Class: Roby::Interface::V2::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/roby/interface/v2/server.rb

Overview

The server-side object that gives a single client access to an interface (e.g. a Roby app) through any communication channel

Defined Under Namespace

Classes: LogEventMatcher

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(io, interface, main_thread: Thread.current) ⇒ Server

Returns a new instance of Server.

Parameters:

  • io (Channel)

    a channel to the server

  • interface (Interface)

    the interface object we give remote access to



23
24
25
26
27
28
29
30
31
32
33
# File 'lib/roby/interface/v2/server.rb', line 23

# Create a server that exposes an interface over a communication channel
#
# @param io [Channel] a channel to the server
# @param interface [Interface] the interface object we give remote access to
# @param main_thread [Thread] the thread from which {#queue_packet} writes
#   packets directly; packets queued from any other thread are held until
#   {#flush_pending_packets} runs
def initialize(io, interface, main_thread: Thread.current)
    @notifications_enabled = true
    @io = io
    @interface = interface
    @main_thread = main_thread
    @pending_packets = Queue.new
    @performed_handshake = false

    # These are only assigned later (by #handshake, #listen_to_notifications
    # and a failed deferred write respectively) but are read before that
    # happens. Initialize them explicitly so the object's full state is
    # visible here and no reader touches an undefined instance variable.
    @client_id = nil
    @listeners = nil
    @deferred_exception = nil

    @log_event_subscriptions = {}
    @log_event_match_cache = {}
end

Instance Attribute Details

#client_id ⇒ String (readonly)

Returns a string that allows the user to identify the client.

Returns:

  • (String)

    a string that allows the user to identify the client



14
15
16
# File 'lib/roby/interface/v2/server.rb', line 14

# The identifier received from the client in {#handshake}
#
# @return [String] a string that allows the user to identify the client
def client_id
  @client_id
end

#interface ⇒ Interface (readonly)

Returns the interface object we are giving access to.

Returns:

  • (Interface)

    the interface object we are giving access to



12
13
14
# File 'lib/roby/interface/v2/server.rb', line 12

# The object on which remote calls are executed
#
# @return [Interface] the interface object we are giving access to
def interface
  @interface
end

#io ⇒ Channel (readonly)

Returns the IO to the client.

Returns:

  • (Channel)

    the IO to the client



10
11
12
# File 'lib/roby/interface/v2/server.rb', line 10

# The channel packets are read from and written to
#
# @return [Channel] the IO to the client
def io
  @io
end

Instance Method Details

#close ⇒ Object



155
156
157
158
# File 'lib/roby/interface/v2/server.rb', line 155

# Close the channel to the client and stop listening to the interface
#
# The listeners registered by {#listen_to_notifications} are disposed even
# when closing the channel raises; otherwise they would stay registered on
# the interface and keep writing to a dead channel.
#
# @return [Object, nil] the result of disposing the listeners, or nil when
#   no listeners were registered
# @raise whatever closing the channel raises
def close
    begin
        io.close
    ensure
        # Runs on both the normal and the error path of io.close
        disposed = @listeners&.dispose
    end
    disposed
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


151
152
153
# File 'lib/roby/interface/v2/server.rb', line 151

# Whether the channel to the client is closed
#
# @return [Boolean] what the channel's own #closed? reports
def closed?
    io.closed?
end

#disable_notifications ⇒ Object



116
117
118
# File 'lib/roby/interface/v2/server.rb', line 116

# Stop forwarding notifications to the client
#
# Clears the flag read by {#notifications_enabled?}, which the notification
# listener checks before queueing a :notification packet.
def disable_notifications
    self.notifications_enabled = false
end

#enable_notifications ⇒ Object



112
113
114
# File 'lib/roby/interface/v2/server.rb', line 112

# Forward notifications to the client
#
# Sets the flag read by {#notifications_enabled?}, which the notification
# listener checks before queueing a :notification packet.
def enable_notifications
    self.notifications_enabled = true
end

#flush_pending_packets ⇒ Object

Flush packets queued from #queue_packet



82
83
84
85
86
87
88
89
90
# File 'lib/roby/interface/v2/server.rb', line 82

# Flush packets queued from {#queue_packet}
#
# The queue is emptied into a local array first, and only then are the
# packets written, each with exceptions deferred.
#
# @return [Array] the packets that were taken out of the queue
def flush_pending_packets
    drained = []
    drained << @pending_packets.pop until @pending_packets.empty?
    drained.each { |packet| write_packet(packet, defer_exceptions: true) }
end

#handshake(id, commands) ⇒ Object



96
97
98
99
100
101
102
103
104
105
# File 'lib/roby/interface/v2/server.rb', line 96

# Perform the initial handshake with the client
#
# Records the client ID, evaluates each of the requested argument-less
# commands on the interface, marks the handshake as performed and starts
# listening to the interface's notifications.
#
# @param id [String] the identifier the client announces itself with
# @param commands [Array<Symbol>] names of argument-less interface methods
#   whose results the client wants as part of the handshake
# @return [Hash] the result of each command, keyed by command name
# @raise [NoMethodError] if a command is not a public method of the interface
def handshake(id, commands)
    @client_id = id
    Roby::Interface.info "new interface client: #{id}"
    replies = commands.each_with_object({}) do |command, collected|
        # The command names come from the remote peer: public_send keeps
        # them from reaching private methods (including Kernel's private
        # methods such as exit or system) on the interface object
        collected[command] = interface.public_send(command)
    end
    @performed_handshake = true
    listen_to_notifications
    replies
end

#has_deferred_exception? ⇒ Boolean

Returns:

  • (Boolean)


181
182
183
# File 'lib/roby/interface/v2/server.rb', line 181

# Whether a write performed with `defer_exceptions: true` has failed
#
# The exception itself is kept in @deferred_exception and is raised by the
# next call to {#poll}.
#
# @return [Boolean] true if an exception is waiting to be raised
def has_deferred_exception?
    # Coerce to a real boolean: the predicate is documented as returning a
    # Boolean and should not hand out the stored exception object
    !@deferred_exception.nil?
end

#listen_to_notifications ⇒ Object

Listen to notifications on the underlying interface



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/roby/interface/v2/server.rb', line 36

# Listen to notifications on the underlying interface
#
# Registers one listener per event kind and stores them as a single
# disposable in @listeners. Cycle end, job progress and exception events are
# written right away with exceptions deferred; notifications, UI events and
# log events go through {#queue_packet}.
#
# Listeners from a previous call are disposed first, so calling this twice
# (e.g. through a repeated handshake) does not send every packet twice.
#
# @return [Object] the disposable wrapping all registered listeners
def listen_to_notifications
    # Drop what an earlier call registered. Reset the reference as well so
    # that a failure during registration below cannot lead #close to dispose
    # the same object a second time.
    @listeners&.dispose
    @listeners = nil

    listeners = []
    listeners << @interface.on_cycle_end do
        write_packet(
            [
                :cycle_end,
                [@interface.execution_engine.cycle_index,
                 @interface.execution_engine.cycle_start]
            ], defer_exceptions: true
        )
    end
    listeners << @interface.on_notification do |*args|
        if notifications_enabled?
            queue_packet([:notification, args])
        elsif Thread.current == @main_thread
            # Notifications are disabled: nothing is sent for this event,
            # but packets queued earlier from other threads are flushed
            flush_pending_packets
        end
    end
    listeners << @interface.on_ui_event do |*args|
        queue_packet([:ui_event, args])
    end
    listeners << @interface.on_job_notification do |*args|
        write_packet([:job_progress, args], defer_exceptions: true)
    end
    listeners << @interface.on_exception do |*args|
        write_packet([:exception, args], defer_exceptions: true)
    end
    listeners << @interface.on_log_event do |*args|
        # args[0] is what #log_event_subscribed? matches subscriptions against
        queue_packet([:log_event, args]) if log_event_subscribed?(args[0])
    end
    @listeners = Roby.disposable(*listeners)
end

#log_event_subscribe(matcher) ⇒ Object



135
136
137
138
139
140
# File 'lib/roby/interface/v2/server.rb', line 135

# Subscribe to the log events whose name matches the given object
#
# The matcher is wrapped in a LogEventMatcher whose ID is its own object_id,
# and stored under that ID.
#
# @param matcher [#===] object compared with `===` against log event names
# @return [Integer] the subscription ID, to be given to
#   {#log_event_unsubscribe}
def log_event_subscribe(matcher)
    subscription = LogEventMatcher.new(matcher: matcher).tap do |s|
        s.id = s.object_id
    end
    @log_event_subscriptions[subscription.id] = subscription
    subscription.id
end

#log_event_subscribed?(name) ⇒ Boolean

Returns:

  • (Boolean)


122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/roby/interface/v2/server.rb', line 122

# Whether at least one subscription matches the given log event name
#
# Positive answers are cached per name, together with the first
# subscription that matched, so that {#log_event_unsubscribe} can drop the
# cache entries that depend on a removed subscription.
#
# @param name [#to_s] the log event name
# @return [Boolean]
def log_event_subscribed?(name)
    key = name.to_s
    unless @log_event_match_cache[key]
        hit = @log_event_subscriptions.each_value.find do |candidate|
            candidate.matcher === key
        end
        return false unless hit

        @log_event_match_cache[key] = hit
    end
    true
end

#log_event_unsubscribe(matcher_id) ⇒ Object



142
143
144
145
146
147
148
149
# File 'lib/roby/interface/v2/server.rb', line 142

# Remove a subscription created by {#log_event_subscribe}
#
# Cached positive matches that were established through the removed
# subscription are dropped as well.
#
# @param matcher_id [Integer] the ID returned by {#log_event_subscribe}
# @return [Hash, nil] the match cache after cleanup, or nil if no
#   subscription had this ID
def log_event_unsubscribe(matcher_id)
    return unless @log_event_subscriptions.delete(matcher_id)

    @log_event_match_cache.delete_if { |_name, cached| cached.id == matcher_id }
end

#notifications_enabled? ⇒ Boolean

Returns whether the messages should be forwarded to our clients.

Returns:

  • (Boolean)

    whether the messages should be forwarded to our clients



18
# File 'lib/roby/interface/v2/server.rb', line 18

# Whether the messages should be forwarded to our clients
#
# NOTE(review): attr_predicate is defined outside this file. The trailing
# `true` presumably also generates the `notifications_enabled=` writer that
# #enable_notifications and #disable_notifications call -- confirm.
#
# @return [Boolean]
attr_predicate :notifications_enabled?, true

#performed_handshake? ⇒ Boolean

Whether the remote side already called #handshake

Returns:

  • (Boolean)


108
109
110
# File 'lib/roby/interface/v2/server.rb', line 108

# Whether the remote side already called {#handshake}
#
# The flag starts out false and is set once #handshake has evaluated the
# requested commands.
#
# @return [Boolean]
def performed_handshake?
    @performed_handshake
end

#poll ⇒ Object

Process one command from the client, and send the reply

Raises:

  • (Exception) — the exception stored in @deferred_exception by an earlier deferred write failure, if any


199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/roby/interface/v2/server.rb', line 199

# Process one command from the client, and send the reply
#
# A failure while executing the call is reported to the client as a
# :bad_call packet and is not raised. A failure while sending the reply is
# re-raised; unless it is a ComError, a :protocol_error packet is attempted
# first.
#
# @return [Object, nil] nil when no packet was read or when the call
#   failed, otherwise the result of writing the reply
# @raise [Exception] the exception deferred by an earlier failed write, or
#   the one raised while writing the reply
def poll
    raise @deferred_exception if has_deferred_exception?

    path, method_name, args, keywords = io.read_packet
    return unless method_name

    begin
        batch = (method_name == :process_batch)
        reply = batch ? process_batch(path, args.first) : process_call(path, method_name, args, keywords)
    rescue Exception => e
        # Anything the call raises is sent back to the caller instead of
        # propagating out of the poll loop
        write_packet([:bad_call, e])
        return
    end

    begin
        write_packet([:reply, reply])
    rescue ComError
        # Listed before the generic clause so that no :protocol_error write
        # is attempted for a ComError
        raise
    rescue Exception => e
        write_packet([:protocol_error, e])
        raise
    end
end

#process_batch(path, calls) ⇒ Object



160
161
162
163
164
# File 'lib/roby/interface/v2/server.rb', line 160

# Execute a list of calls in order
#
# @param path [Array] the path of the batch itself, prepended to the path of
#   each call
# @param calls [Array<Array>] the calls, each as
#   `[path, method name, arguments, keywords]`
# @return [Array] the result of each call, in the same order
def process_batch(path, calls)
    calls.map do |call|
        subpath, name, args, keywords = call
        process_call(path + subpath, name, args, keywords)
    end
end

#process_call(path, name, args, keywords) ⇒ Object



166
167
168
169
170
171
172
# File 'lib/roby/interface/v2/server.rb', line 166

# Execute a single call, either on the server itself or on the interface
#
# A call with an empty path whose name the server responds to is executed
# on the server; everything else is forwarded to
# {#process_interface_call}.
#
# @param path [Array] the subcommand path; empty for top-level calls
# @param name [Symbol] the method name
# @param args [Array] positional arguments
# @param keywords [Hash] keyword arguments
# @return [Object] the call's result
def process_call(path, name, args, keywords)
    handled_by_server = path.empty? && respond_to?(name)
    return process_interface_call(path, name, args, keywords) unless handled_by_server

    send(name, *args, **keywords)
end

#process_interface_call(path, name, args, keywords) ⇒ Object



174
175
176
177
178
179
# File 'lib/roby/interface/v2/server.rb', line 174

# Execute a call on the interface, or on one of its subcommand objects
#
# The receiver is resolved by calling each element of `path` in turn,
# starting from the interface.
#
# Path elements and the method name come from the remote peer, so they are
# dispatched with public_send: private methods (including Kernel's private
# methods such as system or exit) are not reachable this way.
#
# @param path [Array<Symbol>] names of the subcommand accessors to traverse
# @param name [Symbol] the method to call on the resolved receiver
# @param args [Array] positional arguments
# @param keywords [Hash] keyword arguments
# @return [Object] the call's result
# @raise [NoMethodError] if a path element or the method is not public
def process_interface_call(path, name, args, keywords)
    receiver = path.inject(interface) do |obj, subcommand|
        obj.public_send(subcommand)
    end
    receiver.public_send(name, *args, **keywords)
end

#queue_packet(call) ⇒ Object

Write or queue a call, depending on whether the current thread is the main thread

Time ordering between out-of-thread and in-thread packets is not guaranteed, so this can only be used in cases where it does not matter.



73
74
75
76
77
78
79
# File 'lib/roby/interface/v2/server.rb', line 73

# Write or queue a call, depending on whether the current thread is the
# main thread
#
# Time ordering between out-of-thread and in-thread packets is not
# guaranteed, so this can only be used in cases where it does not matter.
#
# @param call [Array] the packet to send
# @return [Object] the result of the write when on the main thread,
#   otherwise the pending packet queue
def queue_packet(call)
    on_main_thread = (Thread.current == @main_thread)
    return write_packet(call, defer_exceptions: true) if on_main_thread

    # Held until #flush_pending_packets writes it
    @pending_packets << call
end

#to_io ⇒ Object



92
93
94
# File 'lib/roby/interface/v2/server.rb', line 92

# The IO object underlying the channel
#
# @return [Object] what the channel's own #to_io returns
def to_io
    io.to_io
end

#write_packet(call, defer_exceptions: false) ⇒ Object



185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/roby/interface/v2/server.rb', line 185

# Write a packet to the client, after flushing the pending packets
#
# Nothing is written once an exception has been deferred. The flush happens
# first, so packets queued earlier are sent before this one.
#
# @param call [Array] the packet to send
# @param defer_exceptions [Boolean] if true, a failure is stored and raised
#   by the next {#poll} instead of being raised here
# @return [Object, nil] nil if an exception was already deferred, the
#   stored exception if this write failed with `defer_exceptions`, otherwise
#   the result of the channel's write
# @raise [Exception] the write failure, unless `defer_exceptions` is set
def write_packet(call, defer_exceptions: false)
    unless has_deferred_exception?
        flush_pending_packets
        io.write_packet(call)
    end
rescue Exception => e
    raise unless defer_exceptions

    @deferred_exception = e
end