Module: ZMQ::CommonSocketBehavior

Includes:
Util
Included in:
Socket
Defined in:
lib/ffi-rzmq/socket.rb

Instance Attribute Summary (collapse)

Class Method Summary (collapse)

Instance Method Summary (collapse)

Methods included from Util

bind_to_random_tcp_port, errno, error_string, resultcode_ok?, version

Instance Attribute Details

- (Object) name (readonly)

Returns the value of attribute name



7
8
9
# File 'lib/ffi-rzmq/socket.rb', line 7

def name
  @name
end

- (Object) socket (readonly)

Returns the value of attribute socket



7
8
9
# File 'lib/ffi-rzmq/socket.rb', line 7

def socket
  @socket
end

Class Method Details

+ (Object) create(context_ptr, type, opts = {:receiver_class => ZMQ::Message})

Allocates a socket of type type for sending and receiving data.

type can be one of ZMQ::REQ, ZMQ::REP, ZMQ::PUB, ZMQ::SUB, ZMQ::PAIR, ZMQ::PULL, ZMQ::PUSH, ZMQ::XREQ, ZMQ::REP, ZMQ::DEALER or ZMQ::ROUTER.

By default, this class uses ZMQ::Message for manual memory management. For automatic garbage collection of received messages, it is possible to override the :receiver_class to use ZMQ::ManagedMessage.

sock = Socket.create(Context.create, ZMQ::REQ, :receiver_class => ZMQ::ManagedMessage)

Advanced users may want to replace the receiver class with their own custom class. The custom class must conform to the same public API as ZMQ::Message.

Creation of a new Socket object can return nil when socket creation fails.

if (socket = Socket.new(context.pointer, ZMQ::REQ))
  ...
else
  STDERR.puts "Socket creation failed"
end


34
35
36
# File 'lib/ffi-rzmq/socket.rb', line 34

def self.create context_ptr, type, opts = {:receiver_class => ZMQ::Message}
  new(context_ptr, type, opts) rescue nil
end

Instance Method Details

- (Object) bind(address)

Binds the socket to an address.

socket.bind("tcp://127.0.0.1:5555")


181
182
183
# File 'lib/ffi-rzmq/socket.rb', line 181

def bind address
  LibZMQ.zmq_bind @socket, address
end

- (Object) close

Closes the socket. Any unprocessed messages in queue are sent or dropped depending upon the value of the socket option ZMQ::LINGER.

Returns 0 upon success or when the socket has already been closed. Returns -1 when the operation fails. Check ZMQ.errno for the error code.

rc = socket.close
puts("Given socket was invalid!") unless 0 == rc


202
203
204
205
206
207
208
209
210
211
212
# File 'lib/ffi-rzmq/socket.rb', line 202

def close
  if @socket
    remove_finalizer
    rc = LibZMQ.zmq_close @socket
    @socket = nil
    release_cache
    rc
  else
    0
  end
end

- (Object) connect(address)

Connects the socket to an address.

rc = socket.connect("tcp://127.0.0.1:5555")


189
190
191
# File 'lib/ffi-rzmq/socket.rb', line 189

def connect address
  rc = LibZMQ.zmq_connect @socket, address
end

- (CommonSocketBehavior) initialize(context_ptr, type, opts = {:receiver_class => ZMQ::Message})

To avoid rescuing exceptions, use the factory method #create for all socket creation.

Allocates a socket of type type for sending and receiving data.

type can be one of ZMQ::REQ, ZMQ::REP, ZMQ::PUB, ZMQ::SUB, ZMQ::PAIR, ZMQ::PULL, ZMQ::PUSH, ZMQ::XREQ, ZMQ::REP, ZMQ::DEALER or ZMQ::ROUTER.

By default, this class uses ZMQ::Message for manual memory management. For automatic garbage collection of received messages, it is possible to override the :receiver_class to use ZMQ::ManagedMessage.

sock = Socket.new(Context.new, ZMQ::REQ, :receiver_class => ZMQ::ManagedMessage)

Advanced users may want to replace the receiver class with their own custom class. The custom class must conform to the same public API as ZMQ::Message.

Creation of a new Socket object can raise an exception. This occurs when the context_ptr is null or when the allocation of the 0mq socket within the context fails.

begin
  socket = Socket.new(context.pointer, ZMQ::REQ)
rescue ContextError => e
  # error handling
end

Returns:



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/ffi-rzmq/socket.rb', line 67

def initialize context_ptr, type, opts = {:receiver_class => ZMQ::Message}
  # users may override the classes used for receiving; class must conform to the
  # same public API as ZMQ::Message
  @receiver_klass = opts[:receiver_class]

  context_ptr = context_ptr.pointer if context_ptr.kind_of?(ZMQ::Context)

  unless context_ptr.null?
    @socket = LibZMQ.zmq_socket context_ptr, type
    if @socket && !@socket.null?
      @name = SocketTypeNameMap[type]
    else
      raise ContextError.new 'zmq_socket', 0, ETERM, "Socket pointer was null"
    end
  else
    raise ContextError.new 'zmq_socket', 0, ETERM, "Context pointer was null"
  end

  @longlong_cache = @int_cache = nil
  @more_parts_array = []
  @option_lookup = []
  populate_option_lookup

  define_finalizer
end

- (Boolean) more_parts?

Convenience method for checking on additional message parts.

Equivalent to calling Socket#getsockopt with ZMQ::RCVMORE.

Warning: if the call to #getsockopt fails, this method will return false and swallow the error.

message_parts = []
message = Message.new
rc = socket.recvmsg(message)
if ZMQ::Util.resultcode_ok?(rc)
  message_parts << message
  while more_parts?
    message = Message.new
    rc = socket.recvmsg(message)
    message_parts.push(message) if resulcode_ok?(rc)
  end
end

Returns:

  • (Boolean)


171
172
173
174
175
# File 'lib/ffi-rzmq/socket.rb', line 171

def more_parts?
  rc = getsockopt ZMQ::RCVMORE, @more_parts_array

  Util.resultcode_ok?(rc) ? @more_parts_array.at(0) : false
end

- (Object) recv_multipart(list, routing_envelope, flag = 0)

Should only be used for XREQ, XREP, DEALER and ROUTER type sockets. Takes a list for receiving the message body parts and a routing_envelope for receiving the message parts comprising the 0mq routing information.



427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
# File 'lib/ffi-rzmq/socket.rb', line 427

def recv_multipart list, routing_envelope, flag = 0
  parts = []
  rc = recvmsgs parts, flag

  if Util.resultcode_ok?(rc)
    routing = true
    parts.each do |part|
      if routing
        routing_envelope << part
        routing = part.size > 0
      else
        list << part
      end
    end
  end

  rc
end

- (Object) recv_string(string, flags = 0)

Helper method to make a new #Message instance and convert its payload to a string.

flags may be ZMQ::NonBlocking.

Returns 0 when the message was successfully dequeued. Returns -1 under two conditions.

  1. The message could not be dequeued

  2. When flags is set with ZMQ::NonBlocking and the socket returned EAGAIN.

With a -1 return code, the user must check ZMQ.errno to determine the cause.

The application code is responsible for handling the message object lifecycle when #recv returns an error code.



359
360
361
362
363
364
365
# File 'lib/ffi-rzmq/socket.rb', line 359

def recv_string string, flags = 0
  message = @receiver_klass.new
  rc = recvmsg message, flags
  string.replace(message.copy_out_string) if Util.resultcode_ok?(rc)
  message.close
  rc
end

- (Object) recv_strings(list, flag = 0)

Receive a multipart message as a list of strings.

flag may be ZMQ::NonBlocking. Any other flag will be removed.



372
373
374
375
376
377
378
379
380
381
382
383
384
# File 'lib/ffi-rzmq/socket.rb', line 372

def recv_strings list, flag = 0
  array = []
  rc = recvmsgs array, flag

  if Util.resultcode_ok?(rc)
    array.each do |message|
      list << message.copy_out_string
      message.close
    end
  end

  rc
end

- (Object) recvmsg(message, flags = 0)

Dequeues a message from the underlying queue. By default, this is a blocking operation.

flags may take two values:

0 (default) - blocking operation
ZMQ::NonBlocking - non-blocking operation

Returns 0 when the message was successfully dequeued. Returns -1 under two conditions.

  1. The message could not be dequeued

  2. When flags is set with ZMQ::NonBlocking and the socket returned EAGAIN.

With a -1 return code, the user must check ZMQ.errno to determine the cause.

The application code is responsible for handling the message object lifecycle when #recv returns an error code.



338
339
340
341
# File 'lib/ffi-rzmq/socket.rb', line 338

def recvmsg message, flags = 0
  #LibZMQ.zmq_recvmsg @socket, message.address, flags
  __recvmsg__(@socket, message.address, flags)
end

- (Object) recvmsgs(list, flag = 0)

Receive a multipart message as an array of objects (by default these are instances of Message).

flag may be ZMQ::NonBlocking. Any other flag will be removed.



392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
# File 'lib/ffi-rzmq/socket.rb', line 392

def recvmsgs list, flag = 0
  flag = NonBlocking if dontwait?(flag)

  message = @receiver_klass.new
  rc = recvmsg message, flag

  if Util.resultcode_ok?(rc)
    list << message

    # check rc *first*; necessary because the call to #more_parts? can reset
    # the zmq_errno to a weird value, so the zmq_errno that was set on the
    # call to #recv gets lost
    while Util.resultcode_ok?(rc) && more_parts?
      message = @receiver_klass.new
      rc = recvmsg message, flag

      if Util.resultcode_ok?(rc)
        list << message
      else
        message.close
        list.each { |msg| msg.close }
        list.clear
      end
    end
  else
    message.close
  end

  rc
end

- (Object) send_and_close(message, flags = 0)

Sends a message. This will automatically close the message for both successful and failed sends.

Returns 0 when the message was successfully enqueued. Returns -1 under two conditions.

  1. The message could not be enqueued

  2. When flags is set with ZMQ::NonBlocking and the socket returned EAGAIN.

With a -1 return code, the user must check ZMQ.errno to determine the cause.



315
316
317
318
319
# File 'lib/ffi-rzmq/socket.rb', line 315

def send_and_close message, flags = 0
  rc = sendmsg message, flags
  message.close
  rc
end

- (Object) send_string(string, flags = 0)

Helper method to make a new #Message instance out of the string passed in for transmission.

flags may be ZMQ::NonBlocking and ZMQ::SNDMORE.

Returns 0 when the message was successfully enqueued. Returns -1 under two conditions.

  1. The message could not be enqueued

  2. When flags is set with ZMQ::NonBlocking and the socket returned EAGAIN.

With a -1 return code, the user must check ZMQ.errno to determine the cause.



247
248
249
250
# File 'lib/ffi-rzmq/socket.rb', line 247

def send_string string, flags = 0
  message = Message.new string
  send_and_close message, flags
end

- (Object) send_strings(parts, flags = 0)

Send a sequence of strings as a multipart message out of the parts passed in for transmission. Every element of parts should be a String.

flags may be ZMQ::NonBlocking.

Returns 0 when the messages were successfully enqueued. Returns -1 under two conditions.

  1. A message could not be enqueued

  2. When flags is set with ZMQ::NonBlocking and the socket returned EAGAIN.

With a -1 return code, the user must check ZMQ.errno to determine the cause.



266
267
268
269
270
271
272
273
274
275
276
# File 'lib/ffi-rzmq/socket.rb', line 266

def send_strings parts, flags = 0
  return -1 if !parts || parts.empty?
  flags = NonBlocking if dontwait?(flags)

  parts[0..-2].each do |part|
    rc = send_string part, (flags | ZMQ::SNDMORE)
    return rc unless Util.resultcode_ok?(rc)
  end

  send_string parts[-1], flags
end

- (Object) sendmsg(message, flags = 0)

Queues the message for transmission. Message is assumed to conform to the same public API as #Message.

flags may take two values:

  • 0 (default) - blocking operation

  • ZMQ::NonBlocking - non-blocking operation

  • ZMQ::SNDMORE - this message is part of a multi-part message

Returns 0 when the message was successfully enqueued. Returns -1 under two conditions.

  1. The message could not be enqueued

  2. When flags is set with ZMQ::NonBlocking and the socket returned EAGAIN.

With a -1 return code, the user must check ZMQ.errno to determine the cause.



230
231
232
# File 'lib/ffi-rzmq/socket.rb', line 230

def sendmsg message, flags = 0
  __sendmsg__(@socket, message.address, flags)
end

- (Object) sendmsgs(parts, flags = 0)

Send a sequence of messages as a multipart message out of the parts passed in for transmission. Every element of parts should be a Message (or subclass).

flags may be ZMQ::NonBlocking.

Returns 0 when the messages were successfully enqueued. Returns -1 under two conditions.

  1. A message could not be enqueued

  2. When flags is set with ZMQ::NonBlocking and the socket returned EAGAIN.

With a -1 return code, the user must check ZMQ.errno to determine the cause.



292
293
294
295
296
297
298
299
300
301
302
# File 'lib/ffi-rzmq/socket.rb', line 292

def sendmsgs parts, flags = 0
  return -1 if !parts || parts.empty?
  flags = NonBlocking if dontwait?(flags)

  parts[0..-2].each do |part|
    rc = sendmsg part, (flags | ZMQ::SNDMORE)
    return rc unless Util.resultcode_ok?(rc)
  end

  sendmsg parts[-1], flags
end

- (Object) setsockopt(name, value, length = nil)

Set the queue options on this socket.

Valid name values that take a numeric value are:

ZMQ::HWM
ZMQ::SWAP (version 2 only)
ZMQ::AFFINITY
ZMQ::RATE
ZMQ::RECOVERY_IVL
ZMQ::MCAST_LOOP (version 2 only)
ZMQ::LINGER
ZMQ::RECONNECT_IVL
ZMQ::BACKLOG
ZMQ::RECOVER_IVL_MSEC (version 2 only)
ZMQ::RECONNECT_IVL_MAX (version 3 only)
ZMQ::MAXMSGSIZE (version 3 only)
ZMQ::SNDHWM (version 3 only)
ZMQ::RCVHWM (version 3 only)
ZMQ::MULTICAST_HOPS (version 3 only)
ZMQ::RCVTIMEO (version 3 only)
ZMQ::SNDTIMEO (version 3 only)

Valid name values that take a string value are:

ZMQ::IDENTITY (version 2/3 only)
ZMQ::SUBSCRIBE
ZMQ::UNSUBSCRIBE

Returns 0 when the operation completed successfully. Returns -1 when this operation failed.

With a -1 return code, the user must check ZMQ.errno to determine the cause.

rc = socket.setsockopt(ZMQ::LINGER, 1_000)
ZMQ::Util.resultcode_ok?(rc) ? puts("succeeded") : puts("failed")


128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/ffi-rzmq/socket.rb', line 128

def setsockopt name, value, length = nil
  if 1 == @option_lookup[name]
    length = 8
    pointer = LibC.malloc length
    pointer.write_long_long value

  elsif 0 == @option_lookup[name]
    length = 4
    pointer = LibC.malloc length
    pointer.write_int value

  elsif 2 == @option_lookup[name]
    length ||= value.size

    # note: not checking errno for failed memory allocations :(
    pointer = LibC.malloc length
    pointer.write_string value
  end

  rc = LibZMQ.zmq_setsockopt @socket, name, pointer, length
  LibC.free(pointer) unless pointer.nil? || pointer.null?
  rc
end