Class: Bud::BudServer
- Inherits:
-
EM::Connection
- Object
- EM::Connection
- Bud::BudServer
- Defined in:
- lib/bud/server.rb
Overview
:nodoc: all
Instance Method Summary (collapse)
-
- (BudServer) initialize(bud, channel_filter)
constructor
A new instance of BudServer.
- - (Object) receive_data(data)
- - (Object) recv_message(obj)
Constructor Details
- (BudServer) initialize(bud, channel_filter)
A new instance of BudServer
4 5 6 7 8 9 10 |
# File 'lib/bud/server.rb', line 4
# Builds the per-connection state for a BudServer.
#
# bud            - the object that received tuples are handed to (stored in @bud).
# channel_filter - optional callable; when set, receive_data calls it as
#                  channel_filter.call(table_name, tuples) and expects an
#                  [accepted, saved] pair back. May be nil.
def initialize(bud, channel_filter)
  # Streaming deserializer for the inbound byte stream.
  @pac = MessagePack::Unpacker.new
  # Tuples received but not yet released to @bud, keyed by table name.
  @filter_buf = {}
  @bud, @channel_filter = bud, channel_filter
  # Bare super forwards both arguments to the parent constructor.
  super
end
Instance Method Details
- (Object) receive_data(data)
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/bud/server.rb', line 12
# Consumes a chunk of raw inbound data: deserializes every complete message
# in it, runs the pending tuples through the channel filter (if any), moves
# the accepted tuples into @bud.inbound, and then ticks @bud when it is
# running asynchronously.
#
# data - the raw bytes to feed to the MessagePack unpacker.
#
# Errors raised while ticking are caught and printed, never re-raised.
# Errors raised by recv_message while deserializing are NOT caught here.
def receive_data(data)
  # Feed the received data to the deserializer
  @pac.feed_each(data) do |obj|
    recv_message(obj)
  end

  # apply the channel filter to each channel's pending tuples
  buf_leftover = {}
  @filter_buf.each do |tbl_name, buf|
    if @channel_filter
      accepted, saved = @channel_filter.call(tbl_name, buf)
    else
      accepted = buf
      saved = []
    end

    unless accepted.empty?
      @bud.inbound[tbl_name] ||= []
      @bud.inbound[tbl_name].concat(accepted)
    end
    # Tuples the filter held back stay buffered for the next call.
    buf_leftover[tbl_name] = saved unless saved.empty?
  end
  @filter_buf = buf_leftover

  begin
    @bud.tick_internal if @bud.running_async
  rescue Exception => e
    # If we raise an exception here, EM dies, which causes problems (e.g.,
    # other Bud instances in the same process will crash). Ignoring the
    # error isn't best though -- we should do better (#74).
    # NOTE(review): rescuing Exception (rather than StandardError) is kept
    # on purpose to preserve that behavior; it also swallows interrupts and
    # exit requests raised during the tick -- confirm this is still wanted.
    puts "Exception handling network messages: #{e}"
    puts e.backtrace
    puts "Inbound messages:"
    @bud.inbound.each do |chn_name, t|
      puts "    #{t.inspect} (channel: #{chn_name})"
    end
    # Drop the messages that triggered the failure.
    @bud.inbound.clear
  end

  @bud.rtracer.sleep if @bud.options[:rtrace]
end
- (Object) recv_message(obj)
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/bud/server.rb', line 54
# Validates one deserialized inbound message and appends its tuple to the
# pending buffer for its table.
#
# obj - expected to be a 3-element Array:
#       [table_name, tuple (Array), marshall_indexes (Array of Integer)],
#       where table_name names a table known to @bud and each entry of
#       marshall_indexes is a position in tuple holding a Marshal-dumped value.
#
# Returns the pending-tuple Array for the table.
# Raises Bud::Error if the message is malformed or an index is out of range.
def recv_message(obj)
  well_formed = obj.is_a?(Array) && obj.length == 3 &&
                obj[0].respond_to?(:to_sym) &&
                @bud.tables.include?(obj[0].to_sym) &&
                obj[1].is_a?(Array) && obj[2].is_a?(Array)
  unless well_formed
    raise Bud::Error, "bad inbound message of class #{obj.class}: #{obj.inspect}"
  end

  # Deserialize any nested marshalled values
  tbl_name, tuple, marshall_indexes = obj
  marshall_indexes.each do |i|
    unless i.is_a?(Integer) && i >= 0 && i < tuple.length
      raise Bud::Error, "bad inbound message: marshalled value at index #{i}, #{obj.inspect}"
    end
    # NOTE(review): Marshal.load on data received from the network can
    # instantiate arbitrary objects; only safe if every peer is trusted.
    tuple[i] = Marshal.load(tuple[i])
  end

  obj = [tbl_name, tuple]
  @bud.rtracer.recv(obj) if @bud.options[:rtrace]
  key = tbl_name.to_sym
  (@filter_buf[key] ||= []) << tuple
end