Class: Roby::Interface::V2::Client
- Defined in:
- lib/roby/interface/v2/client.rb
Overview
The client-side object that gives access to an interface (e.g. a Roby app) from a process other than the Roby controller.
Defined Under Namespace
Classes: BatchContext, Job, NoSuchAction, RemoteError, TimeoutError
Constant Summary
- DEFAULT_CALL_TIMEOUT = 10
Default value for #call_timeout
Instance Attribute Summary
-
#actions ⇒ Array<Roby::Actions::Model::Action>
readonly
Set of known actions.
-
#call_timeout ⇒ Float
Timeout, in seconds, in blocking remote calls.
-
#commands ⇒ Hash
readonly
The set of available commands.
-
#cycle_index ⇒ Integer
readonly
Index of the last processed cycle.
-
#cycle_start_time ⇒ Time
readonly
Time of the last processed cycle.
-
#exception_queue ⇒ Array<Integer,Array>
readonly
List of existing exceptions.
-
#handshake_results ⇒ Hash<Symbol,Object>
readonly
Result of the calls done during the handshake.
-
#io ⇒ Channel
readonly
The IO to the server.
-
#job_progress_queue ⇒ Array<Integer,Array>
readonly
List of existing job progress information.
-
#notification_queue ⇒ Array<Integer,Array>
readonly
List of existing notifications.
-
#pending_async_calls ⇒ Array<Hash>
readonly
List of the pending async calls.
-
#ui_event_queue ⇒ Array<Integer,Array>
readonly
List of queued UI events.
Instance Method Summary
-
#allocate_message_id ⇒ Object
private
Allocation of unique IDs for notification messages.
-
#async_call(path, m, *args, **keywords, &block) ⇒ Object
private
Asynchronously call a method on the interface or on one of the interface's subcommands.
-
#async_call_pending?(a_call) ⇒ Boolean
private
Whether the async call is still pending.
-
#call(path, m, *args, **keywords) ⇒ Object
private
Call a method on the interface or on one of the interface's subcommands.
-
#close ⇒ Object
Close the communication channel.
-
#closed? ⇒ Boolean
Whether the communication channel to the server is closed.
-
#create_batch ⇒ BatchContext
Create a batch context.
-
#each_job ⇒ Object
Enumerate the current jobs.
-
#find_action_by_name(name) ⇒ Actions::Models::Action?
Find an action by its name.
-
#find_all_actions_matching(matcher) ⇒ Array<Actions::Models::Action>
Finds all actions whose name matches a pattern.
-
#find_all_jobs_by_action_name(action_name) ⇒ Array<Job>
Find all the jobs that match the given action name.
- #find_subcommand_by_name(name) ⇒ Object
-
#has_action?(name) ⇒ Boolean
Tests whether the interface has an action with that name.
-
#has_exceptions? ⇒ Boolean
Whether some exception notifications have been queued.
-
#has_job_progress? ⇒ Boolean
Whether some job progress information is currently queued.
-
#has_notifications? ⇒ Boolean
Whether some generic notifications have been queued.
-
#has_subcommand?(name) ⇒ Boolean
Tests whether the remote interface has a given subcommand.
-
#has_ui_event? ⇒ Boolean
Whether some UI events have been queued.
-
#initialize(io, id, handshake: %i[actions commands]) ⇒ Client
constructor
Create a client endpoint to a Roby interface [Server].
-
#method_missing(m, *args, **keywords, &b) ⇒ Object
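Dispatches unknown methods: a subcommand name returns a subcommand client, an async_ prefix triggers an asynchronous call, and any other name is sent as a blocking remote call.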
-
#poll(expected_count = 0, timeout: nil) ⇒ Object
Polls for new data on the IO channel.
-
#pop_exception ⇒ (Integer,Array)
Remove and return the oldest exception notification.
-
#pop_job_progress ⇒ (Integer,Array)
Remove and return the oldest job information message.
-
#pop_log_event ⇒ (String,Array)?
Get a queued received event (processed in FIFO).
-
#pop_notification ⇒ (Integer,Array)
Remove and return the oldest generic notification message.
-
#pop_ui_event ⇒ Object
Remove the oldest UI event and return it.
-
#process_batch(batch) ⇒ Array
Send all commands gathered in a batch for processing on the remote server.
-
#process_packet(m, args = nil) ⇒ Boolean
private
Process a message as received on #io.
-
#process_pending_async_call(error, result) ⇒ Object
private
Remove and call the block of a pending async call.
-
#queue_exception(kind, error, tasks, job_ids) ⇒ Object
private
Push an exception notification to #exception_queue.
-
#queue_job_progress(kind, job_id, job_name, *args) ⇒ Object
private
Push a job notification to #job_progress_queue.
-
#queue_log_event(m, values) ⇒ Object
private
Push a system event to the log event queue.
-
#queue_notification(source, level, message) ⇒ Object
private
Push a generic notification to #notification_queue.
-
#queue_ui_event(event_name, *args) ⇒ Object
private
Push a UI event to #ui_event_queue.
- #reload_actions ⇒ Object
-
#start_job(action_name, **arguments) ⇒ Object
Start a job for the given action on the remote interface.
-
#stats ⇒ Stats
I/O statistics.
-
#subcommand(name) ⇒ Object
Returns a shell object.
-
#to_io ⇒ Object
The underlying IO object.
-
#wait(timeout: nil) ⇒ Boolean
Wait until there is data to process on the IO channel.
Constructor Details
#initialize(io, id, handshake: %i[actions commands]) ⇒ Client
Create a client endpoint to a Roby interface [Server]
# File 'lib/roby/interface/v2/client.rb', line 69
def initialize(io, id, handshake: %i[actions commands])
  @pending_async_calls = []
  @io = io
  @message_id = 0
  @notification_queue = []
  @job_progress_queue = []
  @exception_queue = []
  @log_event_queue = []
  @ui_event_queue = []

  @call_timeout = DEFAULT_CALL_TIMEOUT
  @handshake_results = call([], :handshake, id, handshake)
  @actions = @handshake_results[:actions]
  @commands = Protocol.unmarshal_object(
    @io, @handshake_results[:commands]
  )
end
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(m, *args, **keywords, &b) ⇒ Object
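Dispatches unknown methods: a subcommand name returns a subcommand client, an async_ prefix triggers an asynchronous call, and any other name is sent as a blocking remote call.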
# File 'lib/roby/interface/v2/client.rb', line 655
def method_missing(m, *args, **keywords, &b) # rubocop:disable Style/MethodMissingSuper
  if (sub = find_subcommand_by_name(m.to_s))
    SubcommandClient.new(self, m.to_s, sub.description, sub.commands)
  elsif (match = /^async_(.*)$/.match(m.to_s))
    async_call([], match[1].to_sym, *args, **keywords, &b)
  else
    call([], m, *args, **keywords)
  end
end
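The routing performed by #method_missing can be tried out with a stand-in class. This is a minimal sketch and not Roby code: `DispatchSketch` and its recording `call` / `async_call` methods are hypothetical, subcommand lookup is left out, and nothing is sent to a server.

```ruby
# Hypothetical stand-in that mimics the routing done by Client#method_missing.
# It records what would be sent instead of talking to a server.
class DispatchSketch
  attr_reader :log

  def initialize
    @log = []
  end

  # Stand-in for a blocking remote call.
  def call(path, m, *args, **keywords)
    @log << [:sync, path, m, args, keywords]
  end

  # Stand-in for an asynchronous remote call; a callback block is mandatory.
  def async_call(path, m, *args, **keywords, &block)
    raise "no callback block given" unless block

    @log << [:async, path, m, args, keywords]
  end

  def method_missing(m, *args, **keywords, &b)
    if (match = /^async_(.*)$/.match(m.to_s))
      async_call([], match[1].to_sym, *args, **keywords, &b)
    else
      call([], m, *args, **keywords)
    end
  end

  def respond_to_missing?(_name, _include_private = false)
    true
  end
end

sketch = DispatchSketch.new
sketch.jobs                                # routed to call([], :jobs)
sketch.async_jobs { |error, result| }      # routed to async_call([], :jobs)
```

The `async_` prefix is stripped before the name is forwarded, so both calls above target the same remote method name.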
Instance Attribute Details
#actions ⇒ Array<Roby::Actions::Model::Action> (readonly)
Returns set of known actions.
# File 'lib/roby/interface/v2/client.rb', line 17
def actions
  @actions
end
#call_timeout ⇒ Float
Timeout, in seconds, in blocking remote calls
Defaults to DEFAULT_CALL_TIMEOUT
# File 'lib/roby/interface/v2/client.rb', line 55
def call_timeout
  @call_timeout
end
#commands ⇒ Hash (readonly)
Returns the set of available commands.
# File 'lib/roby/interface/v2/client.rb', line 19
def commands
  @commands
end
#cycle_index ⇒ Integer (readonly)
Returns index of the last processed cycle.
# File 'lib/roby/interface/v2/client.rb', line 39
def cycle_index
  @cycle_index
end
#cycle_start_time ⇒ Time (readonly)
Returns time of the last processed cycle.
# File 'lib/roby/interface/v2/client.rb', line 41
def cycle_start_time
  @cycle_start_time
end
#exception_queue ⇒ Array<Integer,Array> (readonly)
Returns list of existing exceptions. The integer is an ID that can be used to refer to the exception. It is always growing and will never collide with a notification ID.
# File 'lib/roby/interface/v2/client.rb', line 32
def exception_queue
  @exception_queue
end
#handshake_results ⇒ Hash<Symbol,Object> (readonly)
Result of the calls done during the handshake
# File 'lib/roby/interface/v2/client.rb', line 48
def handshake_results
  @handshake_results
end
#io ⇒ Channel (readonly)
Returns the IO to the server.
# File 'lib/roby/interface/v2/client.rb', line 15
def io
  @io
end
#job_progress_queue ⇒ Array<Integer,Array> (readonly)
Returns list of existing job progress information. The integer is an ID that can be used to refer to the job progress information. It is always growing and will never collide with a notification or exception ID.
# File 'lib/roby/interface/v2/client.rb', line 24
def job_progress_queue
  @job_progress_queue
end
#notification_queue ⇒ Array<Integer,Array> (readonly)
Returns list of existing notifications. The integer is an ID that can be used to refer to the notification. It is always growing and will never collide with an exception ID.
# File 'lib/roby/interface/v2/client.rb', line 28
def notification_queue
  @notification_queue
end
#pending_async_calls ⇒ Array<Hash> (readonly)
Returns list of the pending async calls.
# File 'lib/roby/interface/v2/client.rb', line 43
def pending_async_calls
  @pending_async_calls
end
#ui_event_queue ⇒ Array<Integer,Array> (readonly)
Returns list of queued UI events. The integer is an ID that can be used to refer to the UI event. It is always growing and will never collide with a notification ID.
# File 'lib/roby/interface/v2/client.rb', line 36
def ui_event_queue
  @ui_event_queue
end
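The queue attributes above state that their integer IDs keep growing and do not collide across queues. One way to get that property is a single counter shared by every queue. The sketch below illustrates the idea under that assumption; `QueueSketch` is a hypothetical stand-in, not the Roby class, and it models only two of the queues.

```ruby
# Hypothetical sketch: one monotonically increasing counter shared by
# several queues, so that an ID identifies an entry across all of them.
class QueueSketch
  attr_reader :notification_queue, :exception_queue

  def initialize
    @message_id = 0
    @notification_queue = []
    @exception_queue = []
  end

  # Returns the next unique ID.
  def allocate_message_id
    @message_id += 1
  end

  def queue_notification(source, level, message)
    notification_queue.push([allocate_message_id, [source, level, message]])
  end

  def queue_exception(kind, error, tasks, job_ids)
    exception_queue.push([allocate_message_id, [kind, error, tasks, job_ids]])
  end
end

queues = QueueSketch.new
queues.queue_notification("app", :info, "started")
queues.queue_exception(:fatal, "boom", [], [1])
queues.queue_notification("app", :warn, "slow cycle")

notification_ids = queues.notification_queue.map(&:first)
exception_ids = queues.exception_queue.map(&:first)
```

Because every entry draws from the same counter, sorting entries from different queues by ID also restores the order in which they were queued.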
Instance Method Details
#allocate_message_id ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Allocation of unique IDs for notification messages
# File 'lib/roby/interface/v2/client.rb', line 237
def allocate_message_id
  @message_id += 1
end
#async_call(path, m, *args, **keywords, &block) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Asynchronously call a method on the interface or on one of the interface's subcommands
# File 'lib/roby/interface/v2/client.rb', line 409
def async_call(path, m, *args, **keywords, &block)
  raise "no callback block given" unless block_given?

  if (action_match = /(.*)!$/.match(m.to_s))
    action_name = action_match[1]
    unless find_action_by_name(action_name)
      raise NoSuchAction,
            "there is no action called #{action_name} on #{self}"
    end
    path = []
    m = :start_job
    args = [action_name, *args]
  end
  io.write_packet([path, m, args, keywords])
  pending_async_calls <<
    { block: block, path: path, m: m, args: args }
  pending_async_calls.last.freeze
end
#async_call_pending?(a_call) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Whether the async call is still pending
# File 'lib/roby/interface/v2/client.rb', line 434
def async_call_pending?(a_call)
  pending_async_calls.any? { |item| item.equal?(a_call) }
end
#call(path, m, *args, **keywords) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Call a method on the interface or on one of the interface's subcommands
# File 'lib/roby/interface/v2/client.rb', line 383
def call(path, m, *args, **keywords)
  if (action_match = /(.*)!$/.match(m.to_s))
    unless args.empty?
      raise ArgumentError, "jobs only accept keyword arguments"
    end

    start_job(action_match[1], **keywords)
  else
    io.write_packet([path, m, args, keywords])
    result, = poll(1, timeout: @call_timeout)
    result
  end
end
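#call treats a method name ending in `!` as a request to start the job of that name, and any other name as a plain remote call. The name test can be looked at in isolation. The helper `split_action_call` below is hypothetical and exists only for illustration; it reuses the regular expression from the listing above.

```ruby
# Hypothetical helper mirroring the name test in Client#call: a trailing "!"
# marks an action to start as a job, anything else is a plain remote call.
def split_action_call(m)
  if (action_match = /(.*)!$/.match(m.to_s))
    [:start_job, action_match[1]]
  else
    [:call, m.to_s]
  end
end

job_request = split_action_call(:goto!)
plain_request = split_action_call(:jobs)
```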
#close ⇒ Object
Close the communication channel
# File 'lib/roby/interface/v2/client.rb', line 98
def close
  io.close
end
#closed? ⇒ Boolean
Whether the communication channel to the server is closed
# File 'lib/roby/interface/v2/client.rb', line 93
def closed?
  io.closed?
end
#create_batch ⇒ BatchContext
Create a batch context
Messages sent to the returned object are validated as much as possible and gathered in a list. Call #process_batch to send all the gathered calls at once to the remote server
# File 'lib/roby/interface/v2/client.rb', line 617
def create_batch
  BatchContext.new(self)
end
#each_job ⇒ Object
Enumerate the current jobs
# File 'lib/roby/interface/v2/client.rb', line 593
def each_job
  return enum_for(__method__) unless block_given?

  jobs.each do |job_id, (job_state, placeholder_task, job_task)|
    yield(Job.new(job_id, job_state, placeholder_task, job_task))
  end
end
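Because #each_job returns an enumerator when called without a block, it can be chained with the usual Enumerable methods. The following sketch reproduces that pattern with stand-ins: `SketchJob` and `JobListing` are hypothetical, and the hash passed to `JobListing.new` only imitates the shape that the listing above destructures (job ID mapped to state, placeholder task and job task).

```ruby
# Hypothetical stand-ins for the Job class and the remote `jobs` reply.
SketchJob = Struct.new(:job_id, :state, :placeholder_task, :task)

class JobListing
  def initialize(jobs)
    @jobs = jobs
  end

  # Same shape as Client#each_job: returns an Enumerator without a block.
  def each_job
    return enum_for(__method__) unless block_given?

    @jobs.each do |job_id, (state, placeholder_task, task)|
      yield(SketchJob.new(job_id, state, placeholder_task, task))
    end
  end
end

listing = JobListing.new(
  {
    1 => [:running, "placeholder-1", "task-1"],
    2 => [:finished, "placeholder-2", "task-2"]
  }
)

# Without a block, each_job can be chained like any Enumerable.
running_ids = listing.each_job.select { |job| job.state == :running }.map(&:job_id)
```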
#find_action_by_name(name) ⇒ Actions::Models::Action?
Find an action by its name
This is a local operation using the information gathered at connection time
# File 'lib/roby/interface/v2/client.rb', line 119
def find_action_by_name(name)
  actions.find { |act| act.name == name }
end
#find_all_actions_matching(matcher) ⇒ Array<Actions::Models::Action>
Finds all actions whose name matches a pattern
# File 'lib/roby/interface/v2/client.rb', line 130
def find_all_actions_matching(matcher)
  actions.find_all { |act| matcher === act.name }
end
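Since the matcher is applied with `===`, it can be anything that implements case equality, for example a Regexp or a String. The sketch below shows the difference between the two; `SketchAction` is a hypothetical stand-in that only carries a `name`.

```ruby
# Hypothetical action model: only the `name` attribute matters here.
SketchAction = Struct.new(:name)

actions = [
  SketchAction.new("goto"),
  SketchAction.new("goto_home"),
  SketchAction.new("dock")
]

# A Regexp matcher selects every name the pattern matches...
by_regexp = actions.find_all { |act| /^goto/ === act.name }.map(&:name)
# ...while a String matcher selects only names that are equal to it.
by_string = actions.find_all { |act| "goto" === act.name }.map(&:name)
```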
#find_all_jobs_by_action_name(action_name) ⇒ Array<Job>
Find all the jobs that match the given action name
# File 'lib/roby/interface/v2/client.rb', line 604
def find_all_jobs_by_action_name(action_name)
  each_job.find_all do |j|
    j.action_model.name == action_name
  end
end
#find_subcommand_by_name(name) ⇒ Object
# File 'lib/roby/interface/v2/client.rb', line 636
def find_subcommand_by_name(name)
  commands[name]
end
#has_action?(name) ⇒ Boolean
Tests whether the interface has an action with that name
# File 'lib/roby/interface/v2/client.rb', line 108
def has_action?(name)
  find_action_by_name(name)
end
#has_exceptions? ⇒ Boolean
Whether some exception notifications have been queued
# File 'lib/roby/interface/v2/client.rb', line 340
def has_exceptions?
  !exception_queue.empty?
end
#has_job_progress? ⇒ Boolean
Whether some job progress information is currently queued
# File 'lib/roby/interface/v2/client.rb', line 254
def has_job_progress?
  !job_progress_queue.empty?
end
#has_notifications? ⇒ Boolean
Whether some generic notifications have been queued
# File 'lib/roby/interface/v2/client.rb', line 295
def has_notifications?
  !notification_queue.empty?
end
#has_subcommand?(name) ⇒ Boolean
Tests whether the remote interface has a given subcommand
# File 'lib/roby/interface/v2/client.rb', line 641
def has_subcommand?(name)
  commands.key?(name)
end
#has_ui_event? ⇒ Boolean
Whether some UI events have been queued
# File 'lib/roby/interface/v2/client.rb', line 316
def has_ui_event?
  !ui_event_queue.empty?
end
#poll(expected_count = 0, timeout: nil) ⇒ Object
Polls for new data on the IO channel
# File 'lib/roby/interface/v2/client.rb', line 202
def poll(expected_count = 0, timeout: nil)
  result = nil
  timeout = if expected_count > 0 then timeout
            else
              0
            end

  has_cycle_end = false
  while (packet = io.read_packet(timeout))
    has_cycle_end = process_packet(*packet) do |reply_value|
      if result
        raise ProtocolError,
              "got more than one sync reply in a single poll call"
      end
      result = reply_value
      expected_count -= 1
    end

    if expected_count <= 0
      break if has_cycle_end

      timeout = 0
    end
  end
  if expected_count != 0
    within_s = " within #{timeout}s" if timeout
    raise TimeoutError, "failed to receive expected reply#{within_s}"
  end

  [result, has_cycle_end]
end
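The control flow of #poll is easier to follow on a reduced model. The sketch below is a simplification under several assumptions, not the Roby implementation: `ArrayChannel` and `poll_sketch` are hypothetical, timeouts are ignored, only `:reply` and `:cycle_end` packets are modelled, and the check for duplicate replies is left out. It keeps the two key behaviours: the return value is `[result, has_cycle_end]`, and a missing reply raises an error.

```ruby
# Hypothetical in-memory channel: read_packet returns the next queued packet,
# or nil when nothing is left (the timeout argument is ignored).
class ArrayChannel
  def initialize(packets)
    @packets = packets
  end

  def read_packet(_timeout = nil)
    @packets.shift
  end
end

# Simplified model of the loop in Client#poll.
def poll_sketch(io, expected_count = 0)
  result = nil
  has_cycle_end = false
  while (packet = io.read_packet)
    m, args = packet
    # As in the original, the flag reflects the last processed packet only.
    has_cycle_end = (m == :cycle_end)
    if m == :reply
      result = args
      expected_count -= 1
    end
    # Stop early once all replies arrived and a cycle just ended.
    break if expected_count <= 0 && has_cycle_end
  end
  raise "failed to receive expected reply" if expected_count != 0

  [result, has_cycle_end]
end

channel = ArrayChannel.new([[:reply, 42], [:cycle_end, [7, nil]]])
reply, saw_cycle_end = poll_sketch(channel, 1)

# With nothing to read and nothing expected, poll returns immediately.
idle_result = poll_sketch(ArrayChannel.new([]))
```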
#pop_exception ⇒ (Integer,Array)
Remove and return the oldest exception notification
# File 'lib/roby/interface/v2/client.rb', line 349
def pop_exception
  exception_queue.shift
end
#pop_job_progress ⇒ (Integer,Array)
Remove and return the oldest job information message
# File 'lib/roby/interface/v2/client.rb', line 263
def pop_job_progress
  job_progress_queue.shift
end
#pop_log_event ⇒ (String,Array)?
Get a queued received event (processed in FIFO)
In addition to the events generated by the remote Roby app, the client emits the "cycle_end" event
# File 'lib/roby/interface/v2/client.rb', line 281
def pop_log_event
  @log_event_queue.shift
end
#pop_notification ⇒ (Integer,Array)
Remove and return the oldest generic notification message
# File 'lib/roby/interface/v2/client.rb', line 304
def pop_notification
  notification_queue.shift
end
#pop_ui_event ⇒ Object
Remove the oldest UI event and return it
# File 'lib/roby/interface/v2/client.rb', line 321
def pop_ui_event
  ui_event_queue.shift
end
#process_batch(batch) ⇒ Array
Send all commands gathered in a batch for processing on the remote server
# File 'lib/roby/interface/v2/client.rb', line 627
def process_batch(batch)
  ret = call([], :process_batch, batch.__calls)
  BatchContext::Return.from_calls_and_return(batch.__calls, ret)
end
#process_packet(m, args = nil) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Process a message as received on #io
# File 'lib/roby/interface/v2/client.rb', line 142
def process_packet(m, args = nil)
  case m
  when :cycle_end
    @cycle_index, @cycle_start_time = *args
    queue_log_event("cycle_end", [])
    return true
  when :bad_call
    if !pending_async_calls.empty?
      process_pending_async_call(args, nil)
    else
      e = args
      raise RemoteError, e.message, (e.backtrace + caller)
    end
  when :reply
    if !pending_async_calls.empty?
      process_pending_async_call(nil, args)
    else
      yield args
    end
  when :job_progress
    queue_job_progress(*args)
  when :notification
    queue_notification(*args)
  when :ui_event
    queue_ui_event(*args)
  when :exception
    queue_exception(*args)
  when :log_event
    queue_log_event(*args)
  else
    raise ProtocolError,
          "unexpected reply from #{io}: #{m} args=#{args}"
  end
  false
end
#process_pending_async_call(error, result) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Remove and call the block of a pending async call
# File 'lib/roby/interface/v2/client.rb', line 191
def process_pending_async_call(error, result)
  current_call = pending_async_calls.shift
  current_call[:block].call(error, result)
end
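Pending async calls are resolved in first-in, first-out order: each incoming reply or error is handed to the block of the oldest pending call, as `(error, result)`. The sketch below replays that bookkeeping with plain arrays and lambdas. The `deliver` lambda and the sample payloads are hypothetical; only the shape of the pending entries and the identity test with `equal?` follow the listings in this page.

```ruby
# Hypothetical sketch of FIFO reply matching for async calls.
pending = []
results = []

# Register two calls; each entry is frozen, as in Client#async_call.
first = {
  m: :jobs,
  block: ->(error, result) { results << [:jobs, error, result] }
}.freeze
second = {
  m: :actions,
  block: ->(error, result) { results << [:actions, error, result] }
}.freeze
pending << first << second

# Each incoming answer resolves the oldest pending call.
deliver = lambda do |error, result|
  current_call = pending.shift
  current_call[:block].call(error, result)
end

deliver.call(nil, { 1 => :running })    # stands for a :reply packet
deliver.call("no such command", nil)    # stands for a :bad_call packet

# Same identity test as Client#async_call_pending?
still_pending = pending.any? { |item| item.equal?(first) }
```

A consequence of this design is that callbacks cannot be matched to replies out of order: the remote side is expected to answer calls in the order they were written.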
#queue_exception(kind, error, tasks, job_ids) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Push an exception notification to #exception_queue
It can be retrieved with #pop_exception
See the yield parameters of Interface#on_exception for the overall argument format.
# File 'lib/roby/interface/v2/client.rb', line 333
def queue_exception(kind, error, tasks, job_ids)
  exception_queue.push(
    [allocate_message_id, [kind, error, tasks, job_ids]]
  )
end
#queue_job_progress(kind, job_id, job_name, *args) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Push a job notification to #job_progress_queue
See the yield parameters of Interface#on_job_notification for the overall argument format.
# File 'lib/roby/interface/v2/client.rb', line 247
def queue_job_progress(kind, job_id, job_name, *args)
  job_progress_queue.push(
    [allocate_message_id, [kind, job_id, job_name, *args]]
  )
end
#queue_log_event(m, values) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Push a system event to the log event queue
It can be retrieved with #pop_log_event
# File 'lib/roby/interface/v2/client.rb', line 270
def queue_log_event(m, values)
  @log_event_queue.push([m, values])
end
#queue_notification(source, level, message) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Push a generic notification to #notification_queue
# File 'lib/roby/interface/v2/client.rb', line 288
def queue_notification(source, level, message)
  notification_queue.push(
    [allocate_message_id, [source, level, message]]
  )
end
#queue_ui_event(event_name, *args) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Push a UI event to #ui_event_queue
# File 'lib/roby/interface/v2/client.rb', line 311
def queue_ui_event(event_name, *args)
  ui_event_queue.push [allocate_message_id, [event_name, *args]]
end
#reload_actions ⇒ Object
# File 'lib/roby/interface/v2/client.rb', line 632
def reload_actions
  @actions = call([], :reload_actions)
end
#start_job(action_name, **arguments) ⇒ Object
Start a job for the given action on the remote interface
Raises NoSuchAction if no action with that name is known to the client.
# File 'lib/roby/interface/v2/client.rb', line 362
def start_job(action_name, **arguments)
  unless find_action_by_name(action_name)
    raise NoSuchAction,
          "there is no action called #{action_name} on #{self}"
  end

  call([], :start_job, action_name, **arguments)
end
#stats ⇒ Stats
Returns I/O statistics.
# File 'lib/roby/interface/v2/client.rb', line 88
def stats
  io.stats
end
#subcommand(name) ⇒ Object
Returns a shell object
# File 'lib/roby/interface/v2/client.rb', line 646
def subcommand(name)
  unless (sub = find_subcommand_by_name(name))
    raise ArgumentError,
          "#{name} is not a known subcommand on #{self}"
  end

  SubcommandClient.new(self, name, sub.description, sub.commands)
end
#to_io ⇒ Object
The underlying IO object
# File 'lib/roby/interface/v2/client.rb', line 103
def to_io
  io.to_io
end
#wait(timeout: nil) ⇒ Boolean
Wait until there is data to process on the IO channel
# File 'lib/roby/interface/v2/client.rb', line 184
def wait(timeout: nil)
  io.read_wait(timeout: timeout)
end