Class: DripDrop::Node

Inherits:
Object
Defined in:
lib/dripdrop/node.rb,
lib/dripdrop/node/nodelet.rb

Defined Under Namespace

Classes: Nodelet

Constructor Details

- (Node) initialize(opts = {}, &block)

A new instance of Node



# File 'lib/dripdrop/node.rb', line 19

def initialize(opts={},&block)
  @zm_reactor = nil # The instance of the zmq_machine reactor
  @block      = block
  @thread     = nil # Thread containing the reactors
  @routing    = {}  # Routing table
  @debug      = opts[:debug]
  @recipients_for       = {}
  @handler_default_opts = {:debug => @debug}
  @nodelets   = {}  # Cache of registered nodelets
end

Instance Attribute Details

- (Object) debug

Returns the value of attribute debug



# File 'lib/dripdrop/node.rb', line 17

def debug
  @debug
end

- (Object) routing (readonly)

Returns the value of attribute routing



# File 'lib/dripdrop/node.rb', line 16

def routing
  @routing
end

- (Object) zm_reactor (readonly)

Returns the value of attribute zm_reactor



# File 'lib/dripdrop/node.rb', line 16

def zm_reactor
  @zm_reactor
end

Instance Method Details

- (Object) http_client(address, opts = {})

An EM HTTP client. Example:

client = http_client(addr)
client.send_message(:name => 'name', :body => 'hi') do |resp_msg|
  puts resp_msg.inspect
end


# File 'lib/dripdrop/node.rb', line 223

def http_client(address,opts={})
  uri     = URI.parse(address)
  h_opts  = handler_opts_given(opts)
  DripDrop::HTTPClientHandler.new(uri, h_opts)
end

- (Object) http_server(address, opts = {}, &block)

Starts a new Thin HTTP server listening on address. Can have an on_recv handler that gets passed msg and response args.

http_server(addr) {|msg,response| response.send_message(msg)}


# File 'lib/dripdrop/node.rb', line 211

def http_server(address,opts={},&block)
  uri     = URI.parse(address)
  h_opts  = handler_opts_given(opts)
  DripDrop::HTTPServerHandler.new(uri, h_opts,&block)
end

- (Object) join

If the reactor has started, this blocks until the thread running the reactor joins. This should block forever unless stop is called.



# File 'lib/dripdrop/node.rb', line 52

def join
  if @thread
    @thread.join
  else
    raise "Can't join on a node that isn't yet started"
  end
end

- (Object) nodelet(name, &block)

Nodelets are a way of segmenting a DripDrop::Node. This can be used for both organization and deployment. For instance, one might want the production deployment of an app to be broken across multiple servers or processes. Additionally, combining nodelets with routes_for makes managing routes a little easier.

Nodelets can be used thusly:

routes_for :heartbeat do
  route :ticker, :zmq_publish, 'tcp://127.0.0.1', :bind
end

nodelet :heartbeat do
  zm_reactor.periodical_timer(500) do
    ticker.send_message(:name => 'tick')
  end
end


# File 'lib/dripdrop/node.rb', line 139

def nodelet(name,&block)
  nlet = @nodelets[name] ||= Nodelet.new(self,name,routing)
  block.call(nlet) if block
  nlet
end
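
The caching behaviour of nodelet can be tried without DripDrop installed. The sketch below is a standalone illustration that mirrors the `||=` lookup in the source above; MiniNode and MiniNodelet are hypothetical stand-ins, not DripDrop classes, and the real Nodelet constructor takes more arguments.

```ruby
# Standalone sketch: MiniNode and MiniNodelet are hypothetical stand-ins,
# not part of DripDrop.
MiniNodelet = Struct.new(:name)

class MiniNode
  def initialize
    @nodelets = {} # Cache of registered nodelets, keyed by name
  end

  # Returns the cached nodelet for name, creating it on first use.
  # The block, if given, is called with the nodelet on every call.
  def nodelet(name, &block)
    nlet = @nodelets[name] ||= MiniNodelet.new(name)
    block.call(nlet) if block
    nlet
  end
end

node   = MiniNode.new
first  = node.nodelet(:heartbeat)
second = node.nodelet(:heartbeat) { |nlet| puts "configuring #{nlet.name}" }
puts first.equal?(second) # prints true: both calls return the same object
```

Because the lookup is keyed by name, calling nodelet twice with the same name reuses the first object rather than building a second one.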

- (Object) recv_internal(dest, identifier, &block)

Defines a subscriber to the channel dest, to receive messages sent with send_internal. identifier is a unique identifier for this receiver; pass the same identifier to remove_recv_internal to delete the subscriber later.



# File 'lib/dripdrop/node.rb', line 250

def recv_internal(dest,identifier,&block)
  if @recipients_for[dest]
    @recipients_for[dest][identifier] =  block
  else
    @recipients_for[dest] = {identifier => block}
  end
end

- (Object) remove_recv_internal(dest, identifier)

Deletes the subscriber to the channel dest that was registered under identifier with recv_internal.



# File 'lib/dripdrop/node.rb', line 260

def remove_recv_internal(dest,identifier)
  return false unless @recipients_for[dest]
  @recipients_for[dest].delete(identifier)
end

- (Object) route(name, handler_type, *handler_args)

Defines a new route. Routes are the recommended way to instantiate handlers. For example:

route :stats_pub, :zmq_publish, 'tcp://127.0.0.1:2200', :bind
route :stats_sub, :zmq_subscribe, stats_pub.address, :connect

Will make the following methods available within the reactor block:

stats_pub  # A regular zmq_publish handler
stats_sub  # A regular zmq_subscribe handler

See the docs for routes_for for more info on grouping routes for nodelets and maintaining sanity in larger apps.



# File 'lib/dripdrop/node.rb', line 84

def route(name,handler_type,*handler_args)
  route_full(nil, name, handler_type, *handler_args)
end

- (Object) route_full(nodelet, name, handler_type, *handler_args)

Probably not useful for most apps. This is used internally to create a route for a given nodelet.



# File 'lib/dripdrop/node.rb', line 90

def route_full(nodelet, name, handler_type, *handler_args)
  # If we're in a routes_for block, prefix the route name with the nodelet name
  full_name = (nodelet && nodelet.name) ? "#{nodelet.name}_#{name}".to_sym : name
  
  handler = self.send(handler_type, *handler_args)
  @routing[full_name] = handler
  
  # Define the route name as a singleton method
  (class << self; self; end).class_eval do
    define_method(full_name) { handler }
  end
  
  handler
end
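
The naming and per-object accessor logic in route_full can be seen in isolation. The following is a minimal sketch, not DripDrop code: MiniRouter is a hypothetical stand-in for the node, the handlers are plain strings rather than socket handlers, and `define_singleton_method` is used in place of the `class << self` idiom from the source (it has the same effect on Ruby 1.9 and later).

```ruby
# Standalone sketch: MiniRouter is a hypothetical stand-in for DripDrop::Node,
# and handlers are plain strings instead of real socket handlers.
class MiniRouter
  attr_reader :routing

  def initialize
    @routing = {} # Routing table, keyed by full route name
  end

  # nodelet_name may be nil, in which case the route keeps its short name.
  def route_full(nodelet_name, name, handler)
    full_name = nodelet_name ? "#{nodelet_name}_#{name}".to_sym : name
    @routing[full_name] = handler

    # Define the accessor on this object only, not on every MiniRouter.
    define_singleton_method(full_name) { handler }
    handler
  end
end

router = MiniRouter.new
router.route_full(nil, :stats_pub, "pub handler")
router.route_full(:forwarder, :input, "sub handler")

puts router.stats_pub                        # prints "pub handler"
puts router.forwarder_input                  # prints "sub handler"
puts MiniRouter.new.respond_to?(:stats_pub)  # prints false
```

Defining the method on the singleton class keeps routes private to the node that declared them, so two nodes in one process do not see each other's route names.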

- (Object) routes_for(nodelet_name, &block)

Defines a group of routes, to be used as the interface for a nodelet later on.

All routes defined within the routes_for block will be prefixed with the nodelet_name and an underscore. So, the following routes:

routes_for :forwarder do
  route :input,  :zmq_subscribe, 'tcp://127.0.0.1:2200', :bind
  route :output, :zmq_publish,   f.in.address, :connect
end

Will yield the routes: forwarder_input and forwarder_output globally. Within the block scope of the forwarder nodelet however, the routes are additionally available with their own short names. See the nodelet method for details.



# File 'lib/dripdrop/node.rb', line 119

def routes_for(nodelet_name,&block)
  nlet = nodelet(nodelet_name,&block)
  block.call(nlet)
end

- (Object) send_internal(dest, data)

An in-process pub/sub queue that works similarly to EM::Channel, but with manually specified identifiers for subscribers, which lets you delete subscribers without tracking generated ids.

This is useful for situations where you want to broadcast messages across your app, but need a way to properly delete listeners.

dest is the name of the pub/sub channel. data is any Ruby object you'd like to send.



# File 'lib/dripdrop/node.rb', line 238

def send_internal(dest,data)
  return false unless @recipients_for[dest]
  blocks = @recipients_for[dest].values
  return false unless blocks
  blocks.each do |block|
    block.call(data)
  end
end
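
How send_internal, recv_internal and remove_recv_internal fit together can be shown with a small standalone sketch. MiniChannels below is a hypothetical class that copies the logic of the three listings into one place so it can run without DripDrop or a reactor; the channel and identifier names are made up for illustration.

```ruby
# Standalone sketch: MiniChannels is a hypothetical class that mirrors the
# internal pub/sub methods shown in the listings; it is not part of DripDrop.
class MiniChannels
  def initialize
    @recipients_for = {} # dest => { identifier => block }
  end

  # Registers block under identifier; reusing an identifier replaces the block.
  def recv_internal(dest, identifier, &block)
    (@recipients_for[dest] ||= {})[identifier] = block
  end

  # Returns false if the channel has never had a subscriber.
  def remove_recv_internal(dest, identifier)
    return false unless @recipients_for[dest]
    @recipients_for[dest].delete(identifier)
  end

  # Calls every subscriber of dest synchronously, in registration order.
  def send_internal(dest, data)
    return false unless @recipients_for[dest]
    @recipients_for[dest].values.each { |block| block.call(data) }
  end
end

channels = MiniChannels.new
received = []
channels.recv_internal(:alerts, :logger) { |data| received << [:logger, data] }
channels.recv_internal(:alerts, :pager)  { |data| received << [:pager, data] }

channels.send_internal(:alerts, "disk full")
channels.remove_recv_internal(:alerts, :pager)
channels.send_internal(:alerts, "disk ok")

p received
```

After the :pager subscriber is removed, only :logger sees the second message. Delivery is a plain method call, so a slow subscriber delays every subscriber registered after it.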

- (Object) start

Starts the reactors and runs the block passed to initialize. This is non-blocking.



# File 'lib/dripdrop/node.rb', line 32

def start
  @thread = Thread.new do
    EM.run do
      ZM::Reactor.new(:my_reactor).run do |zm_reactor|
        @zm_reactor = zm_reactor
        if @block
          self.instance_eval(&@block)
        elsif self.respond_to?(:action)
          self.action
        else
          raise "Could not start, no block or action specified"
        end
      end
    end
  end
end

- (Object) start!

Blocking version of start, equivalent to calling start and then join.



# File 'lib/dripdrop/node.rb', line 61

def start!
  self.start
  self.join
end
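
The relationship between start, join and start! can be sketched without EventMachine or ZMQMachine. MiniRunner below is a hypothetical stand-in for the node: it keeps the block-or-action dispatch and the thread handling from the listings, but the thread simply returns when the block finishes instead of running reactors until stop is called. The result attribute is an addition for the demonstration only.

```ruby
# Standalone sketch: MiniRunner is a hypothetical stand-in for DripDrop::Node
# with the reactors removed, so the thread ends as soon as the block returns.
class MiniRunner
  attr_reader :result # Added for the demo; the real node has no such attribute

  def initialize(&block)
    @block  = block
    @thread = nil
  end

  # Non-blocking: runs the block (or the subclass's action method) in a thread.
  def start
    @thread = Thread.new do
      if @block
        @result = instance_eval(&@block)
      elsif respond_to?(:action)
        @result = action
      else
        raise "Could not start, no block or action specified"
      end
    end
  end

  # Blocks until the thread finishes; only valid after start.
  def join
    raise "Can't join on a node that isn't yet started" unless @thread
    @thread.join
  end

  def start!
    start
    join
  end
end

# A subclass can supply action instead of passing a block.
class Greeter < MiniRunner
  def action
    "hello from action"
  end
end

from_block = MiniRunner.new { self.class.name }
from_block.start!
puts from_block.result # prints "MiniRunner": the block ran via instance_eval

greeter = Greeter.new
greeter.start!
puts greeter.result    # prints "hello from action"

begin
  MiniRunner.new { nil }.join
rescue RuntimeError => e
  puts e.message       # join before start raises
end
```

Because the block is run with instance_eval, self inside it is the node itself, which is why route names and handler constructors can be called without a receiver.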

- (Object) stop

Stops the reactors. If you were blocked on #join, that will unblock.



# File 'lib/dripdrop/node.rb', line 67

def stop
  @zm_reactor.stop
  EM.stop
end

- (Object) websocket(address, opts = {})

Binds an EM websocket connection to address. Takes blocks for on_open, on_recv, on_close and on_error.

For example on_recv could be used to echo incoming messages thusly:

websocket(addr).on_open {|ws|
  ws.send_message(:name => 'ws_open_ack')
}.on_recv {|msg,ws|
  ws.send(msg)
}.on_close {|ws|
}.on_error {|ws|
}

The ws object that's passed into the handlers is not the DripDrop::WebSocketHandler object, but an em-websocket object.



# File 'lib/dripdrop/node.rb', line 202

def websocket(address,opts={})
  uri     = URI.parse(address)
  h_opts  = handler_opts_given(opts)
  DripDrop::WebSocketHandler.new(uri,h_opts)
end

- (Object) zmq_publish(address, socket_ctype, opts = {})

Creates a ZMQ::PUB type socket. Can only send messages via send_message.



# File 'lib/dripdrop/node.rb', line 153

def zmq_publish(address,socket_ctype,opts={})
  zmq_handler(DripDrop::ZMQPubHandler,:pub_socket,address,socket_ctype,opts)
end

- (Object) zmq_pull(address, socket_ctype, opts = {}, &block)

Creates a ZMQ::PULL type socket. Can only receive messages via on_recv.



# File 'lib/dripdrop/node.rb', line 158

def zmq_pull(address,socket_ctype,opts={},&block)
  zmq_handler(DripDrop::ZMQPullHandler,:pull_socket,address,socket_ctype,opts)
end

- (Object) zmq_push(address, socket_ctype, opts = {})

Creates a ZMQ::PUSH type socket. Can only send messages via send_message.



# File 'lib/dripdrop/node.rb', line 163

def zmq_push(address,socket_ctype,opts={})
  zmq_handler(DripDrop::ZMQPushHandler,:push_socket,address,socket_ctype,opts)
end

- (Object) zmq_subscribe(address, socket_ctype, opts = {}, &block)

Creates a ZMQ::SUB type socket. Can only receive messages via on_recv. zmq_subscribe sockets have a topic_filter option, which restricts which messages they can receive. It takes a regexp as an option.



# File 'lib/dripdrop/node.rb', line 148

def zmq_subscribe(address,socket_ctype,opts={},&block)
  zmq_handler(DripDrop::ZMQSubHandler,:sub_socket,address,socket_ctype,opts)
end

- (Object) zmq_xrep(address, socket_ctype, opts = {})

Creates a ZMQ::XREP type socket, which both sends and receives. XREP sockets are extremely powerful, so their functionality is currently limited. XREP sockets in DripDrop can reply to the original source of the message.

Receiving with XREP sockets in DripDrop differs from other socket types: on_recv passes two arguments to its callback, message and response. A minimal example is shown below:

zmq_xrep(z_addr, :bind).on_recv do |message,response|
  response.send_message(message)
end


# File 'lib/dripdrop/node.rb', line 179

def zmq_xrep(address,socket_ctype,opts={})
  zmq_handler(DripDrop::ZMQXRepHandler,:xrep_socket,address,socket_ctype,opts)
end

- (Object) zmq_xreq(address, socket_ctype, opts = {})

See the documentation for zmq_xrep for more info.



# File 'lib/dripdrop/node.rb', line 184

def zmq_xreq(address,socket_ctype,opts={})
  zmq_handler(DripDrop::ZMQXReqHandler,:xreq_socket,address,socket_ctype,opts)
end