Class: Roby::Interface::V2::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/roby/interface/v2/client.rb

Overview

The client-side object that allows to access an interface (e.g. a Roby app) from another process than the Roby controller

Defined Under Namespace

Classes: BatchContext, Job, NoSuchAction, RemoteError, TimeoutError

Constant Summary collapse

DEFAULT_CALL_TIMEOUT =

Default value for #call_timeout

10

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(io, id, handshake: %i[actions commands])) ⇒ Client

Create a client endpoint to a Roby interface [Server]

Parameters:

  • io (Channel)

    a channel to the server

  • id (String)

    a unique identifier for this client (e.g. host:port of the local endpoint when using TCP). It is passed to the server through Server#handshake

  • handshake (Array<Symbol>) (defaults to: %i[actions commands]))

    commands executed on the server side during the handshake and stored in the #handshake_results attribute. Include :actions and :commands if you pass this explicitely, unless you know what you are doing

See Also:



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/roby/interface/v2/client.rb', line 69

def initialize(io, id, handshake: %i[actions commands])
    @pending_async_calls = []
    @io = io
    @message_id = 0
    @notification_queue = []
    @job_progress_queue = []
    @exception_queue = []
    @log_event_queue = []
    @ui_event_queue = []
    @call_timeout = DEFAULT_CALL_TIMEOUT

    @handshake_results = call([], :handshake, id, handshake)
    @actions = @handshake_results[:actions]
    @commands = Protocol.unmarshal_object(
        @io, @handshake_results[:commands]
    )
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(m, *args, **keywords, &b) ⇒ Object

rubocop:disable Style/MethodMissingSuper



655
656
657
658
659
660
661
662
663
# File 'lib/roby/interface/v2/client.rb', line 655

def method_missing(m, *args, **keywords, &b) # rubocop:disable Style/MethodMissingSuper
    if (sub = find_subcommand_by_name(m.to_s))
        SubcommandClient.new(self, m.to_s, sub.description, sub.commands)
    elsif (match = /^async_(.*)$/.match(m.to_s))
        async_call([], match[1].to_sym, *args, **keywords, &b)
    else
        call([], m, *args, **keywords)
    end
end

Instance Attribute Details

#actionsArray<Roby::Actions::Model::Action> (readonly)

Returns set of known actions.

Returns:

  • (Array<Roby::Actions::Model::Action>)

    set of known actions



17
18
19
# File 'lib/roby/interface/v2/client.rb', line 17

def actions
  @actions
end

#call_timeoutFloat

Timeout, in seconds, in blocking remote calls

Defaults to DEFAULT_CALL_TIMEOUT

Returns:

  • (Float)


55
56
57
# File 'lib/roby/interface/v2/client.rb', line 55

def call_timeout
  @call_timeout
end

#commandsHash (readonly)

Returns the set of available commands.

Returns:

  • (Hash)

    the set of available commands



19
20
21
# File 'lib/roby/interface/v2/client.rb', line 19

def commands
  @commands
end

#cycle_indexInteger (readonly)

Returns index of the last processed cycle.

Returns:

  • (Integer)

    index of the last processed cycle



39
40
41
# File 'lib/roby/interface/v2/client.rb', line 39

def cycle_index
  @cycle_index
end

#cycle_start_timeTime (readonly)

Returns time of the last processed cycle.

Returns:

  • (Time)

    time of the last processed cycle



41
42
43
# File 'lib/roby/interface/v2/client.rb', line 41

def cycle_start_time
  @cycle_start_time
end

#exception_queueArray<Integer,Array> (readonly)

Returns list of existing exceptions. The integer is an ID that can be used to refer to the exception. It is always growing and will never collide with a notification ID.

Returns:

  • (Array<Integer,Array>)

    list of existing exceptions. The integer is an ID that can be used to refer to the exception. It is always growing and will never collide with a notification ID



32
33
34
# File 'lib/roby/interface/v2/client.rb', line 32

def exception_queue
  @exception_queue
end

#handshake_resultsHash<Symbol,Object> (readonly)

Result of the calls done during the handshake

Returns:



48
49
50
# File 'lib/roby/interface/v2/client.rb', line 48

def handshake_results
  @handshake_results
end

#ioChannel (readonly)

Returns the IO to the server.

Returns:

  • (Channel)

    the IO to the server



15
16
17
# File 'lib/roby/interface/v2/client.rb', line 15

def io
  @io
end

#job_progress_queueArray<Integer,Array> (readonly)

Returns list of existing job progress information. The integer is an ID that can be used to refer to the job progress information. It is always growing and will never collide with a job progress and exception ID.

Returns:

  • (Array<Integer,Array>)

    list of existing job progress information. The integer is an ID that can be used to refer to the job progress information. It is always growing and will never collide with a job progress and exception ID



24
25
26
# File 'lib/roby/interface/v2/client.rb', line 24

def job_progress_queue
  @job_progress_queue
end

#notification_queueArray<Integer,Array> (readonly)

Returns list of existing notifications. The integer is an ID that can be used to refer to the notification. It is always growing and will never collide with an exception ID.

Returns:

  • (Array<Integer,Array>)

    list of existing notifications. The integer is an ID that can be used to refer to the notification. It is always growing and will never collide with an exception ID



28
29
30
# File 'lib/roby/interface/v2/client.rb', line 28

def notification_queue
  @notification_queue
end

#pending_async_callsArray<Hash> (readonly)

Returns list of the pending async calls.

Returns:

  • (Array<Hash>)

    list of the pending async calls



43
44
45
# File 'lib/roby/interface/v2/client.rb', line 43

def pending_async_calls
  @pending_async_calls
end

#ui_event_queueArray<Integer,Array> (readonly)

Returns list of queued UI events. The integer is an ID that can be used to refer to the exception. It is always growing and will never collide with a notification ID.

Returns:

  • (Array<Integer,Array>)

    list of queued UI events. The integer is an ID that can be used to refer to the exception. It is always growing and will never collide with a notification ID



36
37
38
# File 'lib/roby/interface/v2/client.rb', line 36

def ui_event_queue
  @ui_event_queue
end

Instance Method Details

#allocate_message_idObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Allocation of unique IDs for notification messages



237
238
239
# File 'lib/roby/interface/v2/client.rb', line 237

def allocate_message_id
    @message_id += 1
end

#async_call(path, m, *args, **keywords, &block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Asynchronously call a method on the interface or on one of the interface's subcommands

Parameters:

  • path (Array<String>)

    path to the subcommand. Empty means on the interface object itself.

  • m (Symbol)

    command or action name. Actions are always formatted as action_name!

  • args (Object)

    the command or action arguments

Returns:

  • (Object)

    an Object associated with the call

See Also:



409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
# File 'lib/roby/interface/v2/client.rb', line 409

def async_call(path, m, *args, **keywords, &block)
    raise "no callback block given" unless block_given?

    if (action_match = /(.*)!$/.match(m.to_s))
        action_name = action_match[1]
        unless find_action_by_name(action_name)
            raise NoSuchAction,
                  "there is no action called #{action_name} on #{self}"
        end

        path = []
        m = :start_job
        args = [action_name, *args]
    end
    io.write_packet([path, m, args, keywords])
    pending_async_calls << { block: block, path: path, m: m, args: args }
    pending_async_calls.last.freeze
end

#async_call_pending?(a_call) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Whether the async call is still pending

Parameters:

  • call (Object)

    the Object associated with the call

Returns:

  • (Boolean)

    true if the async call is pending, false otherwise



434
435
436
# File 'lib/roby/interface/v2/client.rb', line 434

def async_call_pending?(a_call)
    pending_async_calls.any? { |item| item.equal?(a_call) }
end

#call(path, m, *args, **keywords) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Call a method on the interface or on one of the interface's subcommands

Parameters:

  • path (Array<String>)

    path to the subcommand. Empty means on the interface object itself.

  • m (Symbol)

    command or action name. Actions are always formatted as action_name!

  • args (Object)

    the command or action arguments

Returns:

  • (Object)

    the command result, or -- in the case of an action -- the job ID for the newly created action



383
384
385
386
387
388
389
390
391
392
393
394
395
# File 'lib/roby/interface/v2/client.rb', line 383

def call(path, m, *args, **keywords)
    if (action_match = /(.*)!$/.match(m.to_s))
        unless args.empty?
            raise ArgumentError, "jobs only accept keyword arguments"
        end

        start_job(action_match[1], **keywords)
    else
        io.write_packet([path, m, args, keywords])
        result, = poll(1, timeout: @call_timeout)
        result
    end
end

#closeObject

Close the communication channel



98
99
100
# File 'lib/roby/interface/v2/client.rb', line 98

def close
    io.close
end

#closed?Boolean

Whether the communication channel to the server is closed

Returns:

  • (Boolean)


93
94
95
# File 'lib/roby/interface/v2/client.rb', line 93

def closed?
    io.closed?
end

#create_batchBatchContext

Create a batch context

Messages sent to the returned object are validated as much as possible and gathered in a list. Call #process_batch to send all the gathered calls at once to the remote server

Returns:



617
618
619
# File 'lib/roby/interface/v2/client.rb', line 617

def create_batch
    BatchContext.new(self)
end

#each_jobObject

Enumerate the current jobs



593
594
595
596
597
598
599
# File 'lib/roby/interface/v2/client.rb', line 593

def each_job
    return enum_for(__method__) unless block_given?

    jobs.each do |job_id, (job_state, placeholder_task, job_task)|
        yield(Job.new(job_id, job_state, placeholder_task, job_task))
    end
end

#find_action_by_name(name) ⇒ Actions::Models::Action?

Find an action by its name

This is a local operation using the information gathered at connection time

Parameters:

  • name (String)

    the name of the action to look for

Returns:



119
120
121
# File 'lib/roby/interface/v2/client.rb', line 119

def find_action_by_name(name)
    actions.find { |act| act.name == name }
end

#find_all_actions_matching(matcher) ⇒ Array<Actions::Models::Action>

Finds all actions whose name matches a pattern

Parameters:

  • matcher (#===)

    the matching object (usually a Regexp or String)

Returns:



130
131
132
# File 'lib/roby/interface/v2/client.rb', line 130

def find_all_actions_matching(matcher)
    actions.find_all { |act| matcher === act.name }
end

#find_all_jobs_by_action_name(action_name) ⇒ Array<Job>

Find all the jobs that match the given action name

Returns:



604
605
606
607
608
# File 'lib/roby/interface/v2/client.rb', line 604

def find_all_jobs_by_action_name(action_name)
    each_job.find_all do |j|
        j.action_model.name == action_name
    end
end

#find_subcommand_by_name(name) ⇒ Object



636
637
638
# File 'lib/roby/interface/v2/client.rb', line 636

def find_subcommand_by_name(name)
    commands[name]
end

#has_action?(name) ⇒ Boolean

Tests whether the interface has an action with that name

Returns:

  • (Boolean)


108
109
110
# File 'lib/roby/interface/v2/client.rb', line 108

def has_action?(name)
    find_action_by_name(name)
end

#has_exceptions?Boolean

Whether some exception notifications have been queued

Returns:

  • (Boolean)


340
341
342
# File 'lib/roby/interface/v2/client.rb', line 340

def has_exceptions?
    !exception_queue.empty?
end

#has_job_progress?Boolean

Whether some job progress information is currently queued

Returns:

  • (Boolean)


254
255
256
# File 'lib/roby/interface/v2/client.rb', line 254

def has_job_progress?
    !job_progress_queue.empty?
end

#has_notifications?Boolean

Whether some generic notifications have been queued

Returns:

  • (Boolean)


295
296
297
# File 'lib/roby/interface/v2/client.rb', line 295

def has_notifications?
    !notification_queue.empty?
end

#has_subcommand?(name) ⇒ Boolean

Tests whether the remote interface has a given subcommand

Returns:

  • (Boolean)


641
642
643
# File 'lib/roby/interface/v2/client.rb', line 641

def has_subcommand?(name)
    commands.key?(name)
end

#has_ui_event?Boolean

Whether some UI events have been queued

Returns:

  • (Boolean)


316
317
318
# File 'lib/roby/interface/v2/client.rb', line 316

def has_ui_event?
    !ui_event_queue.empty?
end

#poll(expected_count = 0, timeout: nil) ⇒ Object

Polls for new data on the IO channel

Returns:

Raises:

  • (ComError)

    if the link seem to be broken

  • (ProtocolError)

    if some errors happened when validating the protocol



202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/roby/interface/v2/client.rb', line 202

def poll(expected_count = 0, timeout: nil)
    result = nil
    timeout = if expected_count > 0 then timeout
              else
                  0
              end

    has_cycle_end = false
    while (packet = io.read_packet(timeout))
        has_cycle_end = process_packet(*packet) do |reply_value|
            if result
                raise ProtocolError,
                      "got more than one sync reply in a single poll call"
            end
            result = reply_value
            expected_count -= 1
        end

        if expected_count <= 0
            break if has_cycle_end

            timeout = 0
        end
    end
    if expected_count != 0
        within_s = " within #{timeout}s" if timeout
        raise TimeoutError, "failed to receive expected reply#{within_s}"
    end

    [result, has_cycle_end]
end

#pop_exception(Integer,Array)

Remove and return the oldest exception notification

Returns:

  • ((Integer,Array))

    a unique and monotonically-increasing message ID and the generic notification information as specified by (Interface#on_exception)



349
350
351
# File 'lib/roby/interface/v2/client.rb', line 349

def pop_exception
    exception_queue.shift
end

#pop_job_progress(Integer,Array)

Remove and return the oldest job information message

Returns:

  • ((Integer,Array))

    a unique and monotonically-increasing message ID and the arguments to job progress as specified on Interface#on_job_notification.



263
264
265
# File 'lib/roby/interface/v2/client.rb', line 263

def pop_job_progress
    job_progress_queue.shift
end

#pop_log_event(String,Array)?

Get a queued received event (processed in FIFO)

In addition to the events generated by the remote Roby app, the client emits the "cycle_end" event

Returns:

  • ((String,Array), nil)

    the event, or nil if there are none left



281
282
283
# File 'lib/roby/interface/v2/client.rb', line 281

def pop_log_event
    @log_event_queue.shift
end

#pop_notification(Integer,Array)

Remove and return the oldest generic notification message

Returns:

  • ((Integer,Array))

    a unique and monotonically-increasing message ID and the generic notification information as specified by (Application#notify)



304
305
306
# File 'lib/roby/interface/v2/client.rb', line 304

def pop_notification
    notification_queue.shift
end

#pop_ui_eventObject

Remove the oldest UI event and return it



321
322
323
# File 'lib/roby/interface/v2/client.rb', line 321

def pop_ui_event
    ui_event_queue.shift
end

#process_batch(batch) ⇒ Array

Send all commands gathered in a batch for processing on the remote server

Parameters:

Returns:

  • (Array)

    the return values of each of the calls gathered in the batch



627
628
629
630
# File 'lib/roby/interface/v2/client.rb', line 627

def process_batch(batch)
    ret = call([], :process_batch, batch.__calls)
    BatchContext::Return.from_calls_and_return(batch.__calls, ret)
end

#process_packet(m, args = nil) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Process a message as received on #io

Parameters:

  • args (defaults to: nil)

    the arguments to the given message. The object type is message-dependent

Returns:

  • (Boolean)

    whether the message was a cycle_end message



142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/roby/interface/v2/client.rb', line 142

def process_packet(m, args = nil)
    case m
    when :cycle_end
        @cycle_index, @cycle_start_time = *args
        queue_log_event("cycle_end", [])
        return true
    when :bad_call
        if !pending_async_calls.empty?
            process_pending_async_call(args, nil)
        else
            e = args
            raise RemoteError, e.message, (e.backtrace + caller)
        end
    when :reply
        if !pending_async_calls.empty?
            process_pending_async_call(nil, args)
        else
            yield args
        end
    when :job_progress
        queue_job_progress(*args)
    when :notification
        queue_notification(*args)
    when :ui_event
        queue_ui_event(*args)
    when :exception
        queue_exception(*args)
    when :log_event
        queue_log_event(*args)
    else
        raise ProtocolError,
              "unexpected reply from #{io}: #{m} args=#{args}"
    end
    false
end

#process_pending_async_call(error, result) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Remove and call the block of a pending async call



191
192
193
194
# File 'lib/roby/interface/v2/client.rb', line 191

def process_pending_async_call(error, result)
    current_call = pending_async_calls.shift
    current_call[:block].call(error, result)
end

#queue_exception(kind, error, tasks, job_ids) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Push an exception notification to #exception_queue

It can be retrieved with #pop_exception

See the yield parameters of Interface#on_exception for the overall argument format.



333
334
335
336
337
# File 'lib/roby/interface/v2/client.rb', line 333

def queue_exception(kind, error, tasks, job_ids)
    exception_queue.push(
        [allocate_message_id, [kind, error, tasks, job_ids]]
    )
end

#queue_job_progress(kind, job_id, job_name, *args) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Push a job notification to #job_progress_queue

See the yield parameters of Interface#on_job_notification for the overall argument format.



247
248
249
250
251
# File 'lib/roby/interface/v2/client.rb', line 247

def queue_job_progress(kind, job_id, job_name, *args)
    job_progress_queue.push(
        [allocate_message_id, [kind, job_id, job_name, *args]]
    )
end

#queue_log_event(m, values) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Push a system event to cycle_events



270
271
272
# File 'lib/roby/interface/v2/client.rb', line 270

def queue_log_event(m, values)
    @log_event_queue.push([m, values])
end

#queue_notification(source, level, message) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Push a generic notification to #notification_queue



288
289
290
291
292
# File 'lib/roby/interface/v2/client.rb', line 288

def queue_notification(source, level, message)
    notification_queue.push(
        [allocate_message_id, [source, level, message]]
    )
end

#queue_ui_event(event_name, *args) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Push a UI event to #ui_event_queue



311
312
313
# File 'lib/roby/interface/v2/client.rb', line 311

def queue_ui_event(event_name, *args)
    ui_event_queue.push [allocate_message_id, [event_name, *args]]
end

#reload_actionsObject



632
633
634
# File 'lib/roby/interface/v2/client.rb', line 632

def reload_actions
    @actions = call([], :reload_actions)
end

#start_job(action_name, **arguments) ⇒ Object

Start the given job within the batch

Parameters:

  • action_name (Symbol)

    the action name

  • arguments (Hash<Symbol,Object>)

    the action arguments

Raises:



362
363
364
365
366
367
368
369
# File 'lib/roby/interface/v2/client.rb', line 362

def start_job(action_name, **arguments)
    unless find_action_by_name(action_name)
        raise NoSuchAction,
              "there is no action called #{action_name} on #{self}"
    end

    call([], :start_job, action_name, **arguments)
end

#statsStats

Returns I/O statistics.

Returns:

  • (Stats)

    I/O statistics



88
89
90
# File 'lib/roby/interface/v2/client.rb', line 88

def stats
    io.stats
end

#subcommand(name) ⇒ Object

Returns a shell object



646
647
648
649
650
651
652
653
# File 'lib/roby/interface/v2/client.rb', line 646

def subcommand(name)
    unless (sub = find_subcommand_by_name(name))
        raise ArgumentError,
              "#{name} is not a known subcommand on #{self}"
    end

    SubcommandClient.new(self, name, sub.description, sub.commands)
end

#to_ioObject

The underlying IO object



103
104
105
# File 'lib/roby/interface/v2/client.rb', line 103

def to_io
    io.to_io
end

#wait(timeout: nil) ⇒ Boolean

Wait until there is data to process on the IO channel

Parameters:

  • timeout (Numeric, nil) (defaults to: nil)

    a timeout after which the method will return. Use nil for no timeout

Returns:

  • (Boolean)

    falsy if the timeout was reached, true otherwise



184
185
186
# File 'lib/roby/interface/v2/client.rb', line 184

def wait(timeout: nil)
    io.read_wait(timeout: timeout)
end