Class: DripDrop::Node
- Inherits:
-
Object
- Object
- DripDrop::Node
- Defined in:
- lib/dripdrop/node.rb,
lib/dripdrop/node/nodelet.rb
Defined Under Namespace
Classes: Nodelet
Instance Attribute Summary (collapse)
-
- (Object) debug
Returns the value of attribute debug.
-
- (Object) routing
readonly
Returns the value of attribute routing.
-
- (Object) zm_reactor
readonly
Returns the value of attribute zm_reactor.
Instance Method Summary (collapse)
-
- (Object) http_client(address, opts = {})
An EM HTTP client.
-
- (Object) http_server(address, opts = {}, &block)
Starts a new Thin HTTP server listening on address.
-
- (Node) initialize(opts = {}, &block)
constructor
A new instance of Node.
-
- (Object) join
If the reactor has started, this blocks until the thread running the reactor joins.
-
- (Object) nodelet(name, &block)
Nodelets are a way of segmenting a DripDrop::Node.
-
- (Object) recv_internal(dest, identifier, &block)
Defines a subscriber to the channel dest, to receive messages from send_internal.
-
- (Object) remove_recv_internal(dest, identifier)
Deletes a subscriber to the channel dest previously identified by a reciever created with recv_internal.
-
- (Object) route(name, handler_type, *handler_args)
Defines a new route.
-
- (Object) route_full(nodelet, name, handler_type, *handler_args)
Probably not useful for most, apps.
-
- (Object) routes_for(nodelet_name, &block)
Defines a group of routes, to be used as the interface for a nodelet later on.
-
- (Object) send_internal(dest, data)
An inprocess pub/sub queue that works similarly to EM::Channel, but has manually specified identifiers for subscribers letting you more easily delete subscribers without crazy id tracking.
-
- (Object) start
Starts the reactors and runs the block passed to initialize.
-
- (Object) start!
Blocking version of start, equivalent to start then join.
-
- (Object) stop
Stops the reactors.
-
- (Object) websocket(address, opts = {})
Binds an EM websocket connection to address.
-
- (Object) zmq_publish(address, socket_ctype, opts = {})
Creates a ZMQ::PUB type socket, can only send messages via send_message.
-
- (Object) zmq_pull(address, socket_ctype, opts = {}, &block)
Creates a ZMQ::PULL type socket.
-
- (Object) zmq_push(address, socket_ctype, opts = {})
Creates a ZMQ::PUSH type socket, can only send messages via send_message.
-
- (Object) zmq_subscribe(address, socket_ctype, opts = {}, &block)
Creates a ZMQ::SUB type socket.
-
- (Object) zmq_xrep(address, socket_ctype, opts = {})
Creates a ZMQ::XREP type socket, both sends and receivesc XREP sockets are extremely powerful, so their functionality is currently limited.
-
- (Object) zmq_xreq(address, socket_ctype, opts = {})
See the documentation for zmq_xrep for more info.
Constructor Details
- (Node) initialize(opts = {}, &block)
A new instance of Node
19 20 21 22 23 24 25 26 27 28 |
# File 'lib/dripdrop/node.rb', line 19 def initialize(opts={},&block) @zm_reactor = nil # The instance of the zmq_machine reactor @block = block @thread = nil # Thread containing the reactors @routing = {} # Routing table @debug = opts[:debug] @recipients_for = {} @handler_default_opts = {:debug => @debug} @nodelets = {} # Cache of registered nodelets end |
Instance Attribute Details
- (Object) debug
Returns the value of attribute debug
17 18 19 |
# File 'lib/dripdrop/node.rb', line 17 def debug @debug end |
- (Object) routing (readonly)
Returns the value of attribute routing
16 17 18 |
# File 'lib/dripdrop/node.rb', line 16 def routing @routing end |
- (Object) zm_reactor (readonly)
Returns the value of attribute zm_reactor
16 17 18 |
# File 'lib/dripdrop/node.rb', line 16 def zm_reactor @zm_reactor end |
Instance Method Details
- (Object) http_client(address, opts = {})
An EM HTTP client. Example:
client = http_client(addr)
client.(:name => 'name', :body => 'hi') do |resp_msg|
puts resp_msg.inspect
end
223 224 225 226 227 |
# File 'lib/dripdrop/node.rb', line 223 def http_client(address,opts={}) uri = URI.parse(address) h_opts = handler_opts_given(opts) DripDrop::HTTPClientHandler.new(uri, h_opts) end |
- (Object) http_server(address, opts = {}, &block)
Starts a new Thin HTTP server listening on address. Can have an on_recv handler that gets passed msg and response args.
http_server(addr) {|msg,response| response.(msg)}
211 212 213 214 215 |
# File 'lib/dripdrop/node.rb', line 211 def http_server(address,opts={},&block) uri = URI.parse(address) h_opts = handler_opts_given(opts) DripDrop::HTTPServerHandler.new(uri, h_opts,&block) end |
- (Object) join
If the reactor has started, this blocks until the thread running the reactor joins. This should block forever unless stop is called.
52 53 54 55 56 57 58 |
# File 'lib/dripdrop/node.rb', line 52 def join if @thread @thread.join else raise "Can't join on a node that isn't yet started" end end |
- (Object) nodelet(name, &block)
Nodelets are a way of segmenting a DripDrop::Node. This can be used for both organization and deployment. One might want the production deployment of an app to be broken across multiple servers or processes for instance. Additionally, by combining nodelets with routes_for managing routes becomes a little easier.
Nodelets can be used thusly:
routes_for :heartbeat do
route :ticker, :zmq_publish, 'tcp://127.0.0.1', :bind
end
nodelet :heartbeat do
zm_reactor.periodical_timer(500) do
ticker.send_message(:name => 'tick')
end
139 140 141 142 143 |
# File 'lib/dripdrop/node.rb', line 139 def nodelet(name,&block) nlet = @nodelets[name] ||= Nodelet.new(self,name,routing) block.call(nlet) if block nlet end |
- (Object) recv_internal(dest, identifier, &block)
Defines a subscriber to the channel dest, to receive messages from send_internal. identifier is a unique identifier for this receiver. The identifier can be used by remove_recv_internal
250 251 252 253 254 255 256 |
# File 'lib/dripdrop/node.rb', line 250 def recv_internal(dest,identifier,&block) if @recipients_for[dest] @recipients_for[dest][identifier] = block else @recipients_for[dest] = {identifier => block} end end |
- (Object) remove_recv_internal(dest, identifier)
Deletes a subscriber to the channel dest previously identified by a reciever created with recv_internal
260 261 262 263 |
# File 'lib/dripdrop/node.rb', line 260 def remove_recv_internal(dest,identifier) return false unless @recipients_for[dest] @recipients_for[dest].delete(identifier) end |
- (Object) route(name, handler_type, *handler_args)
Defines a new route. Routes are the recommended way to instantiate handlers. For example:
route :stats_pub, :zmq_publish, 'tcp://127.0.0.1:2200', :bind
route :stats_sub, :zmq_subscribe, stats_pub.address, :connect
Will make the following methods available within the reactor block:
stats_pub # A regular zmq_publish handler
:stats_sub # A regular zmq_subscribe handler
See the docs for routes_for for more info in grouping routes for nodelets and maintaining sanity in larger apps
84 85 86 |
# File 'lib/dripdrop/node.rb', line 84 def route(name,handler_type,*handler_args) route_full(nil, name, handler_type, *handler_args) end |
- (Object) route_full(nodelet, name, handler_type, *handler_args)
Probably not useful for most, apps. This is used internally to create a route for a given nodelet.
90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/dripdrop/node.rb', line 90 def route_full(nodelet, name, handler_type, *handler_args) # If we're in a route_for block, prepend appropriately full_name = (nodelet && nodelet.name) ? "#{nodelet.name}_#{name}".to_sym : name handler = self.send(handler_type, *handler_args) @routing[full_name] = handler # Define the route name as a singleton method (class << self; self; end).class_eval do define_method(full_name) { handler } end handler end |
- (Object) routes_for(nodelet_name, &block)
Defines a group of routes, to be used as the interface for a nodelet later on.
All routes defined with the route_for block will be prepended with the nodelet_name and an underscore. So, the following routes:
routes_for :forwarder do
route :input, :zmq_subscribe, 'tcp://127.0.0.1:2200', :bind
route :output, :zmq_publish, f.in.address, :connect
end
Will yield the routes: forwarder_input and forwarder_output globally. Within the block scope of the forwarder nodelet however, the routes are additionally available with their own short names. See the nodelet method for details.
119 120 121 122 |
# File 'lib/dripdrop/node.rb', line 119 def routes_for(nodelet_name,&block) nlet = nodelet(nodelet_name,&block) block.call(nlet) end |
- (Object) send_internal(dest, data)
An inprocess pub/sub queue that works similarly to EM::Channel, but has manually specified identifiers for subscribers letting you more easily delete subscribers without crazy id tracking.
This is useful for situations where you want to broadcast messages across your app, but need a way to properly delete listeners.
dest is the name of the pub/sub channel. data is any type of ruby var you'd like to send.
238 239 240 241 242 243 244 245 |
# File 'lib/dripdrop/node.rb', line 238 def send_internal(dest,data) return false unless @recipients_for[dest] blocks = @recipients_for[dest].values return false unless blocks blocks.each do |block| block.call(data) end end |
- (Object) start
Starts the reactors and runs the block passed to initialize. This is non-blocking.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/dripdrop/node.rb', line 32 def start @thread = Thread.new do EM.run do ZM::Reactor.new(:my_reactor).run do |zm_reactor| @zm_reactor = zm_reactor if @block self.instance_eval(&@block) elsif self.respond_to?(:action) self.action else raise "Could not start, no block or action specified" end end end end end |
- (Object) start!
Blocking version of start, equivalent to start then join
61 62 63 64 |
# File 'lib/dripdrop/node.rb', line 61 def start! self.start self.join end |
- (Object) stop
Stops the reactors. If you were blocked on #join, that will unblock.
67 68 69 70 |
# File 'lib/dripdrop/node.rb', line 67 def stop @zm_reactor.stop EM.stop end |
- (Object) websocket(address, opts = {})
Binds an EM websocket connection to address. takes blocks for on_open, on_recv, on_close and on_error.
For example on_recv could be used to echo incoming messages thusly:
websocket(addr).on_open {|ws|
ws.(:name => 'ws_open_ack')
}.on_recv {|msg,ws|
ws.send(msg)
}.on_close {|ws|
}.on_error {|ws|
}
The ws object that's passed into the handlers is not the DripDrop::WebSocketHandler object, but an em-websocket object.
202 203 204 205 206 |
# File 'lib/dripdrop/node.rb', line 202 def websocket(address,opts={}) uri = URI.parse(address) h_opts = handler_opts_given(opts) DripDrop::WebSocketHandler.new(uri,h_opts) end |
- (Object) zmq_publish(address, socket_ctype, opts = {})
Creates a ZMQ::PUB type socket, can only send messages via send_message
153 154 155 |
# File 'lib/dripdrop/node.rb', line 153 def zmq_publish(address,socket_ctype,opts={}) zmq_handler(DripDrop::ZMQPubHandler,:pub_socket,address,socket_ctype,opts) end |
- (Object) zmq_pull(address, socket_ctype, opts = {}, &block)
Creates a ZMQ::PULL type socket. Can only receive messages via on_recv
158 159 160 |
# File 'lib/dripdrop/node.rb', line 158 def zmq_pull(address,socket_ctype,opts={},&block) zmq_handler(DripDrop::ZMQPullHandler,:pull_socket,address,socket_ctype,opts) end |
- (Object) zmq_push(address, socket_ctype, opts = {})
Creates a ZMQ::PUSH type socket, can only send messages via send_message
163 164 165 |
# File 'lib/dripdrop/node.rb', line 163 def zmq_push(address,socket_ctype,opts={}) zmq_handler(DripDrop::ZMQPushHandler,:push_socket,address,socket_ctype,opts) end |
- (Object) zmq_subscribe(address, socket_ctype, opts = {}, &block)
Creates a ZMQ::SUB type socket. Can only receive messages via on_recv. zmq_subscribe sockets have a topic_filter option, which restricts which messages they can receive. It takes a regexp as an option.
148 149 150 |
# File 'lib/dripdrop/node.rb', line 148 def zmq_subscribe(address,socket_ctype,opts={},&block) zmq_handler(DripDrop::ZMQSubHandler,:sub_socket,address,socket_ctype,opts) end |
- (Object) zmq_xrep(address, socket_ctype, opts = {})
Creates a ZMQ::XREP type socket, both sends and receivesc XREP sockets are extremely powerful, so their functionality is currently limited. XREP sockets in DripDrop can reply to the original source of the message.
Receiving with XREP sockets in DripDrop is different than other types of sockets, on_recv passes 2 arguments to its callback, message, and response. A minimal example is shown below:
zmq_xrep(z_addr, :bind).on_recv do |,response|
response.()
end
179 180 181 |
# File 'lib/dripdrop/node.rb', line 179 def zmq_xrep(address,socket_ctype,opts={}) zmq_handler(DripDrop::ZMQXRepHandler,:xrep_socket,address,socket_ctype,opts) end |
- (Object) zmq_xreq(address, socket_ctype, opts = {})
See the documentation for zmq_xrep for more info
184 185 186 |
# File 'lib/dripdrop/node.rb', line 184 def zmq_xreq(address,socket_ctype,opts={}) zmq_handler(DripDrop::ZMQXReqHandler,:xreq_socket,address,socket_ctype,opts) end |