Module: ZMQ::CommonSocketBehavior
Instance Attribute Summary
- (Object) name (readonly)
  Returns the value of attribute name.
- (Object) socket (readonly)
  Returns the value of attribute socket.
Class Method Summary
+ (Object) create(context_ptr, type, opts = {:receiver_class => ZMQ::Message})
  Allocates a socket of type type for sending and receiving data.
Instance Method Summary
- (Object) bind(address)
  Binds the socket to an address.
- (Object) close
  Closes the socket.
- (Object) connect(address)
  Connects the socket to an address.
- (CommonSocketBehavior) initialize(context_ptr, type, opts = {:receiver_class => ZMQ::Message})
  To avoid rescuing exceptions, use the factory method #create for all socket creation.
- (Boolean) more_parts?
  Convenience method for checking on additional message parts.
- (Object) recv_multipart(list, routing_envelope, flag = 0)
  Should only be used for XREQ, XREP, DEALER and ROUTER type sockets.
- (Object) recv_string(string, flags = 0)
  Helper method to make a new #Message instance and convert its payload to a string.
- (Object) recv_strings(list, flag = 0)
  Receive a multipart message as a list of strings.
- (Object) recvmsg(message, flags = 0)
  Dequeues a message from the underlying queue.
- (Object) recvmsgs(list, flag = 0)
  Receive a multipart message as an array of objects (by default these are instances of Message).
- (Object) send_and_close(message, flags = 0)
  Sends a message.
- (Object) send_string(string, flags = 0)
  Helper method to make a new #Message instance out of the string passed in for transmission.
- (Object) send_strings(parts, flags = 0)
  Send a sequence of strings as a multipart message out of the parts passed in for transmission.
- (Object) sendmsg(message, flags = 0)
  Queues the message for transmission.
- (Object) sendmsgs(parts, flags = 0)
  Send a sequence of messages as a multipart message out of the parts passed in for transmission.
- (Object) setsockopt(name, value, length = nil)
  Set the queue options on this socket.
Methods included from Util
bind_to_random_tcp_port, errno, error_string, resultcode_ok?, version
Instance Attribute Details
- (Object) name (readonly)
Returns the value of attribute name
# File 'lib/ffi-rzmq/socket.rb', line 7
def name
  @name
end
- (Object) socket (readonly)
Returns the value of attribute socket
# File 'lib/ffi-rzmq/socket.rb', line 7
def socket
  @socket
end
Class Method Details
+ (Object) create(context_ptr, type, opts = {:receiver_class => ZMQ::Message})
Allocates a socket of type type for sending and receiving data.
type can be one of ZMQ::REQ, ZMQ::REP, ZMQ::PUB, ZMQ::SUB, ZMQ::PAIR, ZMQ::PULL, ZMQ::PUSH, ZMQ::XREQ, ZMQ::XREP, ZMQ::DEALER or ZMQ::ROUTER.
By default, this class uses ZMQ::Message for manual memory management. For automatic garbage collection of received messages, it is possible to override the :receiver_class to use ZMQ::ManagedMessage.
sock = Socket.create(Context.create, ZMQ::REQ, :receiver_class => ZMQ::ManagedMessage)
Advanced users may want to replace the receiver class with their own custom class. The custom class must conform to the same public API as ZMQ::Message.
Creation of a new Socket object can return nil when socket creation fails.
if (socket = Socket.create(context.pointer, ZMQ::REQ))
  ...
else
  STDERR.puts "Socket creation failed"
end
# File 'lib/ffi-rzmq/socket.rb', line 34
def self.create context_ptr, type, opts = {:receiver_class => ZMQ::Message}
  new(context_ptr, type, opts) rescue nil
end
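The relationship between the factory and the constructor can be sketched in plain Ruby without loading the gem. FakeSocket and its CreationError are hypothetical stand-ins invented for this illustration; only the `new(...) rescue nil` idiom is taken from the listing above.

```ruby
# Hypothetical stand-in for a socket class; not part of ffi-rzmq.
class FakeSocket
  class CreationError < StandardError; end

  # Same idiom as the listing: any StandardError raised by the constructor
  # is swallowed and nil is returned to the caller instead.
  def self.create(context_ptr)
    new(context_ptr) rescue nil
  end

  def initialize(context_ptr)
    raise CreationError, "Context pointer was null" if context_ptr.nil?
    @context_ptr = context_ptr
  end
end

# Callers of the factory test for nil rather than rescuing an exception.
[:some_context, nil].each do |ctx|
  if (socket = FakeSocket.create(ctx))
    puts "socket ready"
  else
    STDERR.puts "Socket creation failed"
  end
end
```

The trade-off of this idiom is that the reason for the failure is discarded; code that needs the error details would call the constructor directly and rescue.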
Instance Method Details
- (Object) bind(address)
Binds the socket to an address.
socket.bind("tcp://127.0.0.1:5555")
# File 'lib/ffi-rzmq/socket.rb', line 181
def bind address
  LibZMQ.zmq_bind @socket, address
end
- (Object) close
Closes the socket. Any unprocessed messages in queue are sent or dropped depending upon the value of the socket option ZMQ::LINGER.
Returns 0 upon success or when the socket has already been closed. Returns -1 when the operation fails. Check ZMQ.errno for the error code.
rc = socket.close
puts("Given socket was invalid!") unless 0 == rc
# File 'lib/ffi-rzmq/socket.rb', line 202
def close
  if @socket
    remove_finalizer
    rc = LibZMQ.zmq_close @socket
    @socket = nil
    release_cache
    rc
  else
    0
  end
end
- (Object) connect(address)
Connects the socket to an address.
rc = socket.connect("tcp://127.0.0.1:5555")
# File 'lib/ffi-rzmq/socket.rb', line 189
def connect address
  rc = LibZMQ.zmq_connect @socket, address
end
- (CommonSocketBehavior) initialize(context_ptr, type, opts = {:receiver_class => ZMQ::Message})
To avoid rescuing exceptions, use the factory method #create for all socket creation.
Allocates a socket of type type for sending and receiving data.
type can be one of ZMQ::REQ, ZMQ::REP, ZMQ::PUB, ZMQ::SUB, ZMQ::PAIR, ZMQ::PULL, ZMQ::PUSH, ZMQ::XREQ, ZMQ::XREP, ZMQ::DEALER or ZMQ::ROUTER.
By default, this class uses ZMQ::Message for manual memory management. For automatic garbage collection of received messages, it is possible to override the :receiver_class to use ZMQ::ManagedMessage.
sock = Socket.new(Context.new, ZMQ::REQ, :receiver_class => ZMQ::ManagedMessage)
Advanced users may want to replace the receiver class with their own custom class. The custom class must conform to the same public API as ZMQ::Message.
Creation of a new Socket object can raise an exception. This occurs when the context_ptr is null or when the allocation of the 0mq socket within the context fails.
begin
  socket = Socket.new(context.pointer, ZMQ::REQ)
rescue ContextError => e
  # error handling
end
# File 'lib/ffi-rzmq/socket.rb', line 67
def initialize context_ptr, type, opts = {:receiver_class => ZMQ::Message}
  # users may override the classes used for receiving; class must conform to the
  # same public API as ZMQ::Message
  @receiver_klass = opts[:receiver_class]

  context_ptr = context_ptr.pointer if context_ptr.kind_of?(ZMQ::Context)

  unless context_ptr.null?
    @socket = LibZMQ.zmq_socket context_ptr, type
    if @socket && !@socket.null?
      @name = SocketTypeNameMap[type]
    else
      raise ContextError.new 'zmq_socket', 0, ETERM, "Socket pointer was null"
    end
  else
    raise ContextError.new 'zmq_socket', 0, ETERM, "Context pointer was null"
  end

  @longlong_cache = @int_cache = nil
  @more_parts_array = []
  @option_lookup = []
  populate_option_lookup

  define_finalizer
end
- (Boolean) more_parts?
Convenience method for checking on additional message parts.
Equivalent to calling Socket#getsockopt with ZMQ::RCVMORE.
Warning: if the call to #getsockopt fails, this method will return false and swallow the error.
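message_parts = []
message = Message.new
rc = socket.recvmsg(message)
if ZMQ::Util.resultcode_ok?(rc)
  message_parts << message
  while socket.more_parts?
    message = Message.new
    rc = socket.recvmsg(message)
    message_parts.push(message) if ZMQ::Util.resultcode_ok?(rc)
  end
end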
# File 'lib/ffi-rzmq/socket.rb', line 171
def more_parts?
  rc = getsockopt ZMQ::RCVMORE, @more_parts_array

  Util.resultcode_ok?(rc) ? @more_parts_array.at(0) : false
end
- (Object) recv_multipart(list, routing_envelope, flag = 0)
Should only be used for XREQ, XREP, DEALER and ROUTER type sockets. Takes a list for receiving the message body parts and a routing_envelope for receiving the message parts comprising the 0mq routing information.
# File 'lib/ffi-rzmq/socket.rb', line 427
def recv_multipart list, routing_envelope, flag = 0
  parts = []
  rc = recvmsgs parts, flag

  if Util.resultcode_ok?(rc)
    routing = true
    parts.each do |part|
      if routing
        routing_envelope << part
        routing = part.size > 0
      else
        list << part
      end
    end
  end

  rc
end
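The envelope/body split performed by this method can be tried out on plain strings. The sketch below is an illustration only: split_envelope is a hypothetical helper, and it operates on String frames rather than on the Message objects the real method receives. The rule itself (frames go to the envelope up to and including the first empty frame) is the one in the listing above.

```ruby
# Hypothetical helper, not part of ffi-rzmq. Splits an array of frames into
# [routing_envelope, body] using the same rule as the listing: frames belong
# to the envelope up to and including the first empty frame; everything after
# that is body.
def split_envelope(frames)
  routing_envelope = []
  body = []
  routing = true

  frames.each do |frame|
    if routing
      routing_envelope << frame
      routing = frame.size > 0 # an empty frame ends the envelope
    else
      body << frame
    end
  end

  [routing_envelope, body]
end

envelope, body = split_envelope(["client-1", "", "hello", "world"])
p envelope
p body
```

Note that the empty delimiter frame ends up in the envelope, and that a message with no empty frame at all is treated entirely as envelope, leaving the body empty.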
- (Object) recv_string(string, flags = 0)
Helper method to make a new #Message instance and convert its payload to a string.
flags may be ZMQ::NonBlocking.
Returns 0 when the message was successfully dequeued. Returns -1 under two conditions.
- The message could not be dequeued
- When flags is set with ZMQ::NonBlocking and the socket returned EAGAIN.
With a -1 return code, the user must check ZMQ.errno to determine the cause.
The application code is responsible for handling the message object lifecycle when #recv returns an error code.
# File 'lib/ffi-rzmq/socket.rb', line 359
def recv_string string, flags = 0
  message = @receiver_klass.new
  rc = recvmsg message, flags
  string.replace(message.copy_out_string) if Util.resultcode_ok?(rc)
  message.close
  rc
end
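Because the method returns a result code, the payload comes back through the string argument, which is overwritten in place with String#replace. The following sketch shows that calling convention without a real socket; fake_recv_string and its payload parameter are hypothetical and exist only for this illustration.

```ruby
# Hypothetical stand-in for recv_string, not part of ffi-rzmq. The payload
# argument plays the role of whatever the socket would have dequeued; nil
# means nothing could be dequeued.
def fake_recv_string(string, payload)
  return -1 if payload.nil? # failure: the caller's string is left untouched

  string.replace(payload)   # success: overwrite the caller's buffer in place
  0
end

buffer = String.new
rc = fake_recv_string(buffer, "hello")
puts buffer if rc == 0
```

The caller must pass a mutable String; a frozen string would make the replace call raise.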
- (Object) recv_strings(list, flag = 0)
Receive a multipart message as a list of strings.
flag may be ZMQ::NonBlocking. Any other flag will be removed.
# File 'lib/ffi-rzmq/socket.rb', line 372
def recv_strings list, flag = 0
  array = []
  rc = recvmsgs array, flag

  if Util.resultcode_ok?(rc)
    array.each do |message|
      list << message.copy_out_string
      message.close
    end
  end

  rc
end
- (Object) recvmsg(message, flags = 0)
Dequeues a message from the underlying queue. By default, this is a blocking operation.
flags may take two values:
0 (default) - blocking operation
ZMQ::NonBlocking - non-blocking operation
Returns 0 when the message was successfully dequeued. Returns -1 under two conditions.
- The message could not be dequeued
- When flags is set with ZMQ::NonBlocking and the socket returned EAGAIN.
With a -1 return code, the user must check ZMQ.errno to determine the cause.
The application code is responsible for handling the message object lifecycle when #recvmsg returns an error code.
# File 'lib/ffi-rzmq/socket.rb', line 338
def recvmsg message, flags = 0
  #LibZMQ.zmq_recvmsg @socket, message.address, flags
  __recvmsg__(@socket, message.address, flags)
end
- (Object) recvmsgs(list, flag = 0)
Receive a multipart message as an array of objects (by default these are instances of Message).
flag may be ZMQ::NonBlocking. Any other flag will be removed.
# File 'lib/ffi-rzmq/socket.rb', line 392
def recvmsgs list, flag = 0
  flag = NonBlocking if dontwait?(flag)

  message = @receiver_klass.new
  rc = recvmsg message, flag

  if Util.resultcode_ok?(rc)
    list << message

    # check rc *first*; necessary because the call to #more_parts? can reset
    # the zmq_errno to a weird value, so the zmq_errno that was set on the
    # call to #recv gets lost
    while Util.resultcode_ok?(rc) && more_parts?
      message = @receiver_klass.new
      rc = recvmsg message, flag

      if Util.resultcode_ok?(rc)
        list << message
      else
        message.close
        list.each { |msg| msg.close }
        list.clear
      end
    end
  else
    message.close
  end

  rc
end
- (Object) send_and_close(message, flags = 0)
Sends a message. This will automatically close the message for both successful and failed sends.
Returns 0 when the message was successfully enqueued. Returns -1 under two conditions.
- The message could not be enqueued
- When flags is set with ZMQ::NonBlocking and the socket returned EAGAIN.
With a -1 return code, the user must check ZMQ.errno to determine the cause.
# File 'lib/ffi-rzmq/socket.rb', line 315
def send_and_close message, flags = 0
  rc = sendmsg message, flags
  message.close
  rc
end
- (Object) send_string(string, flags = 0)
Helper method to make a new #Message instance out of the string passed in for transmission.
flags may be ZMQ::NonBlocking, ZMQ::SNDMORE, or both.
Returns 0 when the message was successfully enqueued. Returns -1 under two conditions.
- The message could not be enqueued
- When flags is set with ZMQ::NonBlocking and the socket returned EAGAIN.
With a -1 return code, the user must check ZMQ.errno to determine the cause.
# File 'lib/ffi-rzmq/socket.rb', line 247
def send_string string, flags = 0
  message = Message.new string
  send_and_close message, flags
end
- (Object) send_strings(parts, flags = 0)
Send a sequence of strings as a multipart message out of the parts passed in for transmission. Every element of parts should be a String.
flags may be ZMQ::NonBlocking.
Returns 0 when the messages were successfully enqueued. Returns -1 under two conditions.
- A message could not be enqueued
- When flags is set with ZMQ::NonBlocking and the socket returned EAGAIN.
With a -1 return code, the user must check ZMQ.errno to determine the cause.
# File 'lib/ffi-rzmq/socket.rb', line 266
def send_strings parts, flags = 0
  return -1 if !parts || parts.empty?
  flags = NonBlocking if dontwait?(flags)

  parts[0..-2].each do |part|
    rc = send_string part, (flags | ZMQ::SNDMORE)
    return rc unless Util.resultcode_ok?(rc)
  end

  send_string parts[-1], flags
end
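The way the multipart flag is applied is easier to see with a recorder in place of a real socket. In the sketch below, RecordingSocket and SNDMORE_FLAG are hypothetical stand-ins (the real constant is ZMQ::SNDMORE), every send is assumed to succeed, and the dontwait? normalization from the listing is left out for brevity.

```ruby
# Stand-in flag value for illustration; real code would use ZMQ::SNDMORE.
SNDMORE_FLAG = 2

# Hypothetical recorder standing in for a socket; not part of ffi-rzmq.
class RecordingSocket
  attr_reader :sent

  def initialize
    @sent = []
  end

  # Records the part and its flags, and pretends the send succeeded.
  def send_string(string, flags = 0)
    @sent << [string, flags]
    0
  end

  # Same shape as the listing: every part except the last carries the
  # "more parts follow" flag, and the first failing send aborts the loop.
  def send_strings(parts, flags = 0)
    return -1 if !parts || parts.empty?

    parts[0..-2].each do |part|
      rc = send_string(part, flags | SNDMORE_FLAG)
      return rc unless rc >= 0
    end

    send_string(parts[-1], flags)
  end
end

socket = RecordingSocket.new
socket.send_strings(%w[header body footer])
socket.sent.each { |string, flags| puts "#{string}: flags=#{flags}" }
```

A consequence of the early return is that a failure partway through leaves the earlier parts already handed to the socket, so the caller should treat a -1 as an incomplete multipart message.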
- (Object) sendmsg(message, flags = 0)
Queues the message for transmission. Message is assumed to conform to the same public API as #Message.
flags may take three values:
- 0 (default) - blocking operation
- ZMQ::NonBlocking - non-blocking operation
- ZMQ::SNDMORE - this message is part of a multi-part message
Returns 0 when the message was successfully enqueued. Returns -1 under two conditions.
- The message could not be enqueued
- When flags is set with ZMQ::NonBlocking and the socket returned EAGAIN.
With a -1 return code, the user must check ZMQ.errno to determine the cause.
# File 'lib/ffi-rzmq/socket.rb', line 230
def sendmsg message, flags = 0
  __sendmsg__(@socket, message.address, flags)
end
- (Object) sendmsgs(parts, flags = 0)
Send a sequence of messages as a multipart message out of the parts passed in for transmission. Every element of parts should be a Message (or subclass).
flags may be ZMQ::NonBlocking.
Returns 0 when the messages were successfully enqueued. Returns -1 under two conditions.
- A message could not be enqueued
- When flags is set with ZMQ::NonBlocking and the socket returned EAGAIN.
With a -1 return code, the user must check ZMQ.errno to determine the cause.
# File 'lib/ffi-rzmq/socket.rb', line 292
def sendmsgs parts, flags = 0
  return -1 if !parts || parts.empty?
  flags = NonBlocking if dontwait?(flags)

  parts[0..-2].each do |part|
    rc = sendmsg part, (flags | ZMQ::SNDMORE)
    return rc unless Util.resultcode_ok?(rc)
  end

  sendmsg parts[-1], flags
end
- (Object) setsockopt(name, value, length = nil)
Set the queue options on this socket.
Valid name values that take a numeric value are:
ZMQ::HWM
ZMQ::SWAP (version 2 only)
ZMQ::AFFINITY
ZMQ::RATE
ZMQ::RECOVERY_IVL
ZMQ::MCAST_LOOP (version 2 only)
ZMQ::LINGER
ZMQ::RECONNECT_IVL
ZMQ::BACKLOG
ZMQ::RECOVERY_IVL_MSEC (version 2 only)
ZMQ::RECONNECT_IVL_MAX (version 3 only)
ZMQ::MAXMSGSIZE (version 3 only)
ZMQ::SNDHWM (version 3 only)
ZMQ::RCVHWM (version 3 only)
ZMQ::MULTICAST_HOPS (version 3 only)
ZMQ::RCVTIMEO (version 3 only)
ZMQ::SNDTIMEO (version 3 only)
Valid name values that take a string value are:
ZMQ::IDENTITY (version 2/3 only)
ZMQ::SUBSCRIBE
ZMQ::UNSUBSCRIBE
Returns 0 when the operation completed successfully. Returns -1 when this operation failed.
With a -1 return code, the user must check ZMQ.errno to determine the cause.
rc = socket.setsockopt(ZMQ::LINGER, 1_000)
ZMQ::Util.resultcode_ok?(rc) ? puts("succeeded") : puts("failed")
# File 'lib/ffi-rzmq/socket.rb', line 128
def setsockopt name, value, length = nil
  if 1 == @option_lookup[name]
    length = 8
    pointer = LibC.malloc length
    pointer.write_long_long value

  elsif 0 == @option_lookup[name]
    length = 4
    pointer = LibC.malloc length
    pointer.write_int value

  elsif 2 == @option_lookup[name]
    length ||= value.size

    # note: not checking errno for failed memory allocations :(
    pointer = LibC.malloc length
    pointer.write_string value
  end

  rc = LibZMQ.zmq_setsockopt @socket, name, pointer, length
  LibC.free(pointer) unless pointer.nil? || pointer.null?
  rc
end
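The three branches in the listing size the option buffer by kind: 8 bytes for a long long, 4 bytes for an int, and the value's length for a string. A rough way to see those sizes without FFI is to encode the values with Array#pack. Everything named below (OPTION_KIND, its symbolic option names, encode_option) is hypothetical and only mirrors the 0/1/2 lookup codes used above; unlike the listing, this sketch raises on an unknown option.

```ruby
# Hypothetical kind table mirroring the lookup codes in the listing:
# 0 = int, 1 = long long, 2 = string. The option names are made up.
OPTION_KIND = { :linger => 0, :affinity => 1, :identity => 2 }

# Hypothetical helper, not part of ffi-rzmq. Returns [bytes, length], where
# length is what would be passed on as the option length.
def encode_option(name, value, length = nil)
  case OPTION_KIND[name]
  when 1
    [[value].pack("q"), 8] # native-endian signed 64-bit integer
  when 0
    [[value].pack("l"), 4] # native-endian signed 32-bit integer
  when 2
    length ||= value.size  # same default as the listing
    [value.dup, length]
  else
    raise ArgumentError, "unknown option #{name.inspect}"
  end
end

bytes, length = encode_option(:linger, 1_000)
puts "linger: #{length} bytes, decodes to #{bytes.unpack("l").first}"
```

One detail worth keeping in mind: String#size counts characters rather than bytes, so for string options containing multibyte characters an explicit length may be the safer choice.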