Class: Roby::Interface::V2::Server
- Defined in:
- lib/roby/interface/v2/server.rb
Overview
The server-side object allowing to access an interface (e.g. a Roby app) through any communication channel from a single client
Defined Under Namespace
Classes: LogEventMatcher
Instance Attribute Summary collapse
-
#client_id ⇒ String
readonly
A string that allows the user to identify the client.
-
#interface ⇒ Interface
readonly
The interface object we are giving access to.
-
#io ⇒ Channel
readonly
The IO to the client.
Instance Method Summary collapse
- #close ⇒ Object
- #closed? ⇒ Boolean
- #disable_notifications ⇒ Object
- #enable_notifications ⇒ Object
-
#flush_pending_packets ⇒ Object
Flush packets queued from #queue_packet.
- #handshake(id, commands) ⇒ Object
- #has_deferred_exception? ⇒ Boolean
-
#initialize(io, interface, main_thread: Thread.current) ⇒ Server
constructor
A new instance of Server.
-
#listen_to_notifications ⇒ Object
Listen to notifications on the underlying interface.
- #log_event_subscribe(matcher) ⇒ Object
- #log_event_subscribed?(name) ⇒ Boolean
- #log_event_unsubscribe(matcher_id) ⇒ Object
-
#notifications_enabled? ⇒ Boolean
Whether the messages should be forwarded to our clients.
-
#performed_handshake? ⇒ Boolean
Whether the remote side already called #handshake.
-
#poll ⇒ Object
Process one command from the client, and send the reply.
- #process_batch(path, calls) ⇒ Object
- #process_call(path, name, args, keywords) ⇒ Object
- #process_interface_call(path, name, args, keywords) ⇒ Object
-
#queue_packet(call) ⇒ Object
Write or queue a call, depending on whether the current thread is the main thread.
- #to_io ⇒ Object
- #write_packet(call, defer_exceptions: false) ⇒ Object
Constructor Details
#initialize(io, interface, main_thread: Thread.current) ⇒ Server
Returns a new instance of Server.
23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/roby/interface/v2/server.rb', line 23 def initialize(io, interface, main_thread: Thread.current) @notifications_enabled = true @io = io @interface = interface @main_thread = main_thread @pending_packets = Queue.new @performed_handshake = false @log_event_subscriptions = {} @log_event_match_cache = {} end |
Instance Attribute Details
#client_id ⇒ String (readonly)
Returns a string that allows the user to identify the client.
14 15 16 |
# File 'lib/roby/interface/v2/server.rb', line 14 def client_id @client_id end |
#interface ⇒ Interface (readonly)
Returns the interface object we are giving access to.
12 13 14 |
# File 'lib/roby/interface/v2/server.rb', line 12 def interface @interface end |
#io ⇒ Channel (readonly)
Returns the IO to the client.
10 11 12 |
# File 'lib/roby/interface/v2/server.rb', line 10 def io @io end |
Instance Method Details
#close ⇒ Object
155 156 157 158 |
# File 'lib/roby/interface/v2/server.rb', line 155 def close io.close @listeners&.dispose end |
#closed? ⇒ Boolean
151 152 153 |
# File 'lib/roby/interface/v2/server.rb', line 151 def closed? io.closed? end |
#disable_notifications ⇒ Object
116 117 118 |
# File 'lib/roby/interface/v2/server.rb', line 116 def disable_notifications self.notifications_enabled = false end |
#enable_notifications ⇒ Object
112 113 114 |
# File 'lib/roby/interface/v2/server.rb', line 112 def enable_notifications self.notifications_enabled = true end |
#flush_pending_packets ⇒ Object
Flush packets queued from #queue_packet
82 83 84 85 86 87 88 89 90 |
# File 'lib/roby/interface/v2/server.rb', line 82 def flush_pending_packets packets = [] until @pending_packets.empty? packets << @pending_packets.pop end packets.each do |p| write_packet(p, defer_exceptions: true) end end |
#handshake(id, commands) ⇒ Object
96 97 98 99 100 101 102 103 104 105 |
# File 'lib/roby/interface/v2/server.rb', line 96 def handshake(id, commands) @client_id = id Roby::Interface.info "new interface client: #{id}" result = commands.each_with_object({}) do |s, result| result[s] = interface.send(s) end @performed_handshake = true listen_to_notifications result end |
#has_deferred_exception? ⇒ Boolean
181 182 183 |
# File 'lib/roby/interface/v2/server.rb', line 181 def has_deferred_exception? @deferred_exception end |
#listen_to_notifications ⇒ Object
Listen to notifications on the underlying interface
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/roby/interface/v2/server.rb', line 36 def listen_to_notifications listeners = [] listeners << @interface.on_cycle_end do write_packet( [ :cycle_end, [@interface.execution_engine.cycle_index, @interface.execution_engine.cycle_start] ], defer_exceptions: true) end listeners << @interface.on_notification do |*args| if notifications_enabled? queue_packet([:notification, args]) elsif Thread.current == @main_thread flush_pending_packets end end listeners << @interface.on_ui_event do |*args| queue_packet([:ui_event, args]) end listeners << @interface.on_job_notification do |*args| write_packet([:job_progress, args], defer_exceptions: true) end listeners << @interface.on_exception do |*args| write_packet([:exception, args], defer_exceptions: true) end listeners << @interface.on_log_event do |*args| queue_packet([:log_event, args]) if log_event_subscribed?(args[0]) end @listeners = Roby.disposable(*listeners) end |
#log_event_subscribe(matcher) ⇒ Object
135 136 137 138 139 140 |
# File 'lib/roby/interface/v2/server.rb', line 135 def log_event_subscribe(matcher) matcher = LogEventMatcher.new(matcher: matcher) matcher.id = matcher.object_id @log_event_subscriptions[matcher.id] = matcher matcher.id end |
#log_event_subscribed?(name) ⇒ Boolean
122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/roby/interface/v2/server.rb', line 122 def log_event_subscribed?(name) name = name.to_s return true if @log_event_match_cache[name] matcher = @log_event_subscriptions.each_value.find do |m| m.matcher === name end return false unless matcher @log_event_match_cache[name] = matcher true end |
#log_event_unsubscribe(matcher_id) ⇒ Object
142 143 144 145 146 147 148 149 |
# File 'lib/roby/interface/v2/server.rb', line 142 def log_event_unsubscribe(matcher_id) matcher = @log_event_subscriptions.delete(matcher_id) return unless matcher @log_event_match_cache.delete_if do |_, m| m.id == matcher_id end end |
#notifications_enabled? ⇒ Boolean
Returns whether the messages should be forwarded to our clients.
18 |
# File 'lib/roby/interface/v2/server.rb', line 18 attr_predicate :notifications_enabled?, true |
#performed_handshake? ⇒ Boolean
Whether the remote side already called #handshake
108 109 110 |
# File 'lib/roby/interface/v2/server.rb', line 108 def performed_handshake? @performed_handshake end |
#poll ⇒ Object
Process one command from the client, and send the reply
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 |
# File 'lib/roby/interface/v2/server.rb', line 199 def poll raise @deferred_exception if has_deferred_exception? path, m, args, keywords = io.read_packet return unless m begin reply = if m == :process_batch process_batch(path, args.first) else process_call(path, m, args, keywords) end true rescue Exception => e write_packet([:bad_call, e]) return end begin write_packet([:reply, reply]) rescue ComError raise rescue Exception => e write_packet([:protocol_error, e]) raise end end |
#process_batch(path, calls) ⇒ Object
160 161 162 163 164 |
# File 'lib/roby/interface/v2/server.rb', line 160 def process_batch(path, calls) calls.map do |p, m, a, kw| process_call(path + p, m, a, kw) end end |
#process_call(path, name, args, keywords) ⇒ Object
166 167 168 169 170 171 172 |
# File 'lib/roby/interface/v2/server.rb', line 166 def process_call(path, name, args, keywords) if path.empty? && respond_to?(name) send(name, *args, **keywords) else process_interface_call(path, name, args, keywords) end end |
#process_interface_call(path, name, args, keywords) ⇒ Object
174 175 176 177 178 179 |
# File 'lib/roby/interface/v2/server.rb', line 174 def process_interface_call(path, name, args, keywords) receiver = path.inject(interface) do |obj, subcommand| obj.send(subcommand) end receiver.send(name, *args, **keywords) end |
#queue_packet(call) ⇒ Object
Write or queue a call, depending on whether the current thread is the main thread
Time ordering between out-of-thread and in-thread packets is not guaranteed, so this can only be used in cases where it does not matter.
73 74 75 76 77 78 79 |
# File 'lib/roby/interface/v2/server.rb', line 73 def queue_packet(call) if Thread.current == @main_thread write_packet(call, defer_exceptions: true) else @pending_packets << call end end |
#to_io ⇒ Object
92 93 94 |
# File 'lib/roby/interface/v2/server.rb', line 92 def to_io io.to_io end |
#write_packet(call, defer_exceptions: false) ⇒ Object
185 186 187 188 189 190 191 192 193 194 195 196 |
# File 'lib/roby/interface/v2/server.rb', line 185 def write_packet(call, defer_exceptions: false) return if has_deferred_exception? flush_pending_packets io.write_packet(call) rescue Exception => e if defer_exceptions @deferred_exception = e else raise end end |