Class: Roby::Interface::V2::Channel

Inherits:
Object
  • Object
show all
Defined in:
lib/roby/interface/v2/channel.rb

Overview

A wrapper on top of raw IO that uses droby marshalling to communicate

Defined Under Namespace

Classes: Stats

Constant Summary collapse

WEBSOCKET_CLASSES =

This is a workaround for a very bad performance behavior on first load. These classes are auto-loaded and it takes forever to load them in multithreaded contexts.

[
    WebSocket::Frame::Outgoing::Client,
    WebSocket::Frame::Outgoing::Server,
    WebSocket::Frame::Incoming::Client,
    WebSocket::Frame::Incoming::Server
].freeze
ALLOWED_BASIC_TYPES =
[
    TrueClass, FalseClass, NilClass, Integer, Float, String, Symbol, Time,
    Range
].freeze
None =
Object.new

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(io, client, max_write_buffer_size: 25 * 1024**2) ⇒ Channel

Returns a new instance of Channel.



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/roby/interface/v2/channel.rb', line 40

# Create a channel on top of an existing IO
#
# The IO is switched to non-blocking mode through fcntl, and the websocket
# frame classes are selected according to which side of the connection the
# local process is on.
#
# @param [#fcntl, #read_nonblock, #write] io the underlying IO
# @param [Boolean] client truthy if the local process is the client side
#   of the connection, falsy if it is the server side
# @param [Integer] max_write_buffer_size the number of bytes the write
#   buffer may hold before {#guard_buffer_size} raises
def initialize(
    io, client,
    max_write_buffer_size: 25 * 1024**2
)
    @io = io
    @io.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
    @client = client
    @stats = Stats.new(tx: 0, rx: 0)
    # Outgoing frames are instantiated once per packet, so only the class
    # is stored here
    @websocket_packet =
        if client
            WebSocket::Frame::Outgoing::Client
        else
            WebSocket::Frame::Outgoing::Server
        end

    # A single incoming frame parser accumulates the received bytes
    @incoming =
        if client?
            WebSocket::Frame::Incoming::Client.new(type: :binary)
        else
            WebSocket::Frame::Incoming::Server.new(type: :binary)
        end
    @max_write_buffer_size = max_write_buffer_size
    @read_buffer = String.new
    @write_buffer = String.new
    # Both thread guards start unbound; they latch onto the first thread
    # that reads (resp. writes)
    @read_thread = nil
    @write_thread = nil

    @structs = {}
    @marshallers = {}
    @allowed_objects = Set.new
    @resolved_marshallers = {}
    # NOTE(review): presumably registers the protocol's default
    # marshallers on this channel - confirm in Protocol
    Protocol.setup_channel(self)
end

Instance Attribute Details

#io#read_nonblock, #write (readonly)

Returns the channel that allows us to communicate to clients.

Returns:

  • (#read_nonblock, #write)

    the channel that allows us to communicate to clients



10
11
12
# File 'lib/roby/interface/v2/channel.rb', line 10

# The IO object given to the constructor, on which all reads and writes
# are performed
def io
  @io
end

#max_write_buffer_sizeObject (readonly)

The maximum byte count that the channel can hold on the write side until it bails out



18
19
20
# File 'lib/roby/interface/v2/channel.rb', line 18

# The write buffer size limit given to the constructor. guard_buffer_size
# raises once the write buffer grows beyond it
def max_write_buffer_size
  @max_write_buffer_size
end

#statsStats (readonly)

Returns I/O statistics.

Returns:

  • (Stats)

    I/O statistics



38
39
40
# File 'lib/roby/interface/v2/channel.rb', line 38

# The Stats object created by the constructor. Its tx field is incremented
# by push_write_data and its rx field by read_data_from_io
def stats
  @stats
end

Class Method Details

.find_invalid_marshalling_object(object) ⇒ Object



189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/roby/interface/v2/channel.rb', line 189

# Find the object that makes Marshal.dump fail on the given value
#
# Arrays, Structs and Hashes are searched recursively, depth first, so that
# the innermost offending object is reported. If no element is at fault,
# the container itself is tested (e.g. a Hash with a default proc).
#
# @param [Object] object the value to inspect
# @return [Object, nil] the first object that cannot be dumped, or nil if
#   everything can be marshalled
def self.find_invalid_marshalling_object(object)
    case object
    when Array, Struct, Hash
        # Hash#each yields [key, value] pairs, which are searched as Arrays
        object.each do |element|
            invalid = find_invalid_marshalling_object(element)
            return invalid if invalid
        end
        # Do NOT use the value of #each here: it returns its receiver,
        # which would flag every valid container as invalid
    end

    begin
        ::Marshal.dump(object)
        nil
    rescue TypeError
        object
    end
end

Instance Method Details

#add_marshaller(*classes) {|channel, object| ... } ⇒ Object

Define a custom marshaller for objects of the given class

Parameters:

  • classes (Array<Class>)

    the classes to use the given marshaller for. This will match instances of subclasses as well. When several registered classes match an instance, the most specific one (the lowest in the instance's class hierarchy) wins.

Yield Parameters:

Yield Returns:

  • (Object)

    the marshalled object



218
219
220
221
# File 'lib/roby/interface/v2/channel.rb', line 218

# Register a marshalling block for each of the given classes
#
# The per-class resolution cache is rebuilt from the registered marshallers
# afterwards, which discards every previously cached resolution.
#
# @param [Array<Class>] classes the classes handled by the block
# @yieldparam channel the channel doing the marshalling
# @yieldparam object the object to marshal
# @yieldreturn [Object] the marshalled object
def add_marshaller(*classes, &block)
    classes.each do |klass|
        @marshallers[klass] = block
    end
    @resolved_marshallers = @marshallers.dup
end

#allow_classes(*classes) ⇒ Object



206
207
208
# File 'lib/roby/interface/v2/channel.rb', line 206

# Let instances of the given classes go through the channel unchanged
#
# This registers, for each class, a marshaller that returns the object
# as-is.
#
# @param [Array<Class>] classes the classes to allow
def allow_classes(*classes)
    add_marshaller(*classes) { |_channel, object| object }
end

#allow_objects(*objects) ⇒ Object



225
226
227
# File 'lib/roby/interface/v2/channel.rb', line 225

# Add the given objects to the set of objects that marshal_filter_object
# lets through unchanged
#
# @param [Array<Object>] objects the objects to allow
def allow_objects(*objects)
    @allowed_objects.merge(objects)
end

#allowed_object?(object) ⇒ Boolean

Returns:

  • (Boolean)


229
230
231
# File 'lib/roby/interface/v2/channel.rb', line 229

# Tests whether the object is a member of the set filled by allow_objects
#
# @param [Object] object the object to test
# @return [Boolean]
def allowed_object?(object)
    @allowed_objects.include?(object)
end

#client?Boolean

Returns true if the local process is the client, false if it is the server.

Returns:

  • (Boolean)

    true if the local process is the client, false if it is the server



14
# File 'lib/roby/interface/v2/channel.rb', line 14

# Whether the local process is the client side of the connection
#
# NOTE(review): attr_predicate is not defined in this file; presumably it
# generates a client? reader for @client - confirm
attr_predicate :client?

#closeObject



81
82
83
# File 'lib/roby/interface/v2/channel.rb', line 81

# Close the underlying IO
def close
    io.close
end

#closed?Boolean

Returns:

  • (Boolean)


85
86
87
# File 'lib/roby/interface/v2/channel.rb', line 85

# Whether the underlying IO reports itself as closed
#
# @return [Boolean]
def closed?
    io.closed?
end

#eof?Boolean

Returns:

  • (Boolean)


89
90
91
# File 'lib/roby/interface/v2/channel.rb', line 89

# Whether the underlying IO reports end-of-file
#
# @return [Boolean]
def eof?
    io.eof?
end

#find_marshaller(object) ⇒ Object



267
268
269
270
271
272
273
274
275
276
277
# File 'lib/roby/interface/v2/channel.rb', line 267

# Resolve the marshaller to use for the given object
#
# The resolution is looked up in a per-class cache first. On a miss, every
# registered class the object is a kind of is considered and the smallest
# [class, block] pair is picked; since Module#<=> orders a subclass before
# its ancestors, this is the most specific registered class. The result is
# stored in the cache.
#
# @param [Object] object the object to marshal
# @return [#call, nil] the marshaller, or nil if no registered class matches
def find_marshaller(object)
    cached = @resolved_marshallers[object.class]
    return cached if cached

    candidates = @marshallers.to_a.select do |(klass, _)|
        object.kind_of?(klass)
    end
    _, marshaller = candidates.min_by { |entry| entry }
    @resolved_marshallers[object.class] = marshaller
end

#flushObject



93
94
95
# File 'lib/roby/interface/v2/channel.rb', line 93

# Flush the underlying IO
#
# This only calls io.flush; it does not push the channel's own write buffer
# (see push_write_data for that)
def flush
    io.flush
end

#guard_buffer_sizeObject

Raises:



347
348
349
350
351
352
353
354
# File 'lib/roby/interface/v2/channel.rb', line 347

# Verify that the write buffer has not grown beyond the configured limit
#
# @return [nil] if the buffer size is within max_write_buffer_size
# @raise [ComError] if the buffer is bigger than max_write_buffer_size
def guard_buffer_size
    limit = max_write_buffer_size
    return unless @write_buffer.size > limit

    raise ComError,
          "channel reached an internal buffer size of "\
          "#{@write_buffer.size}, which is bigger than the limit "\
          "of #{max_write_buffer_size}, bailing out"
end

#guard_read_threadObject

Raises:



338
339
340
341
342
343
344
345
# File 'lib/roby/interface/v2/channel.rb', line 338

# Verify that reads are always done from the same thread
#
# The first thread calling this method becomes the reading thread, until
# the guard is reset.
#
# @return [nil] if called from the reading thread
# @raise [InternalError] if called from any other thread
def guard_read_thread
    owner = (@read_thread ||= Thread.current)
    return if owner == Thread.current

    raise InternalError,
          "cross-thread access to channel while reading: "\
          "expected #{@read_thread} but got #{Thread.current}"
end

#guard_write_threadObject

Raises:



329
330
331
332
333
334
335
336
# File 'lib/roby/interface/v2/channel.rb', line 329

# Verify that writes are always done from the same thread
#
# The first thread calling this method becomes the writing thread, until
# the guard is reset.
#
# @return [nil] if called from the writing thread
# @raise [InternalError] if called from any other thread
def guard_write_thread
    @write_thread ||= Thread.current
    return if @write_thread == Thread.current

    # Same wording as the read guard ("expected X but got Y")
    raise InternalError,
          "cross-thread access to channel while writing: "\
          "expected #{@write_thread} but got #{Thread.current}"
end

#marshal_basic_object(object) ⇒ Object

rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity



248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
# File 'lib/roby/interface/v2/channel.rb', line 248

# Marshal the objects the channel knows how to handle natively
#
# Arrays, Sets and Hash values are filtered element by element through
# marshal_filter_object (Hash keys are kept as-is), Structs are handed to
# Protocol.marshal_struct_generic and instances of ALLOWED_BASIC_TYPES are
# returned unchanged.
#
# @param [Object] object the object to marshal
# @return [Object] the marshalled object, or None if the object is of none
#   of the natively handled types
def marshal_basic_object(object) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity
    case object
    when Array
        object.map { |element| marshal_filter_object(element) }
    when Set
        Set.new(object.map { |element| marshal_filter_object(element) })
    when Hash
        object.transform_values { |value| marshal_filter_object(value) }
    when Struct then Protocol.marshal_struct_generic(self, object)
    when *ALLOWED_BASIC_TYPES then object
    else None
    end
end

#marshal_filter_object(object) ⇒ Object



233
234
235
236
237
238
239
240
241
242
243
244
245
246
# File 'lib/roby/interface/v2/channel.rb', line 233

# Convert an object into something that is allowed on the channel
#
# The following are tried in order: the natively handled types (see
# marshal_basic_object), the explicitly allowed objects, and finally the
# registered marshallers.
#
# @param [Object] object the object to marshal
# @return [Object] the marshalled object, or the value of report_error if
#   the object is not allowed on this interface
def marshal_filter_object(object)
    basic = marshal_basic_object(object)
    if basic != None
        return basic
    elsif allowed_object?(object)
        return object
    end

    marshaller = find_marshaller(object)
    return marshaller[self, object] if marshaller

    report_error(
        "object '#{object}' of class #{object.class} "\
        "not allowed on this interface"
    )
end

#marshal_object(object) ⇒ Object



169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/roby/interface/v2/channel.rb', line 169

# Filter an object and dump it with Marshal
#
# Failures are not propagated: a TypeError or RuntimeError raised while
# filtering or dumping is turned into the marshalled form of the value
# returned by report_error.
#
# @param [Object] object the object to send
# @return [String] the marshalled bytes
def marshal_object(object)
    begin
        object = marshal_filter_object(object)
        Marshal.dump(object)
    rescue TypeError => error
        # Pinpoint which part of the object Marshal cannot handle
        culprit = self.class.find_invalid_marshalling_object(object)
        description = "failed to marshal #{culprit} of class "\
                      "#{culprit.class} in #{object}: #{error.message}"
        Marshal.dump report_error(description)
    rescue RuntimeError => error
        description = "failed to marshal #{object}: #{error.message}"
        Marshal.dump report_error(description)
    end
end

#push_write_data(new_bytes = nil) ⇒ Boolean

Push queued data

The write I/O is buffered. This method pushes data stored within the internal buffer and/or appends new data to it.

Returns:

  • (Boolean)

    true if there is still data left in the buffer, false otherwise



307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
# File 'lib/roby/interface/v2/channel.rb', line 307

# Push queued data
#
# The write I/O is buffered. This method appends the new bytes, if any, to
# the internal buffer and then attempts a non-blocking write of the whole
# buffer, keeping whatever could not be written for a later call.
#
# @param [String, nil] new_bytes bytes to append to the buffer before
#   writing
# @return [Boolean] true if there is still data left in the buffer, false
#   otherwise
# @raise [ComError] if the communication channel is broken, or if the IO
#   would block while the buffer exceeds max_write_buffer_size
def push_write_data(new_bytes = nil)
    guard_write_thread

    @write_buffer.concat(new_bytes) if new_bytes
    written_bytes = io.write_nonblock(@write_buffer)
    @stats.tx += written_bytes

    @write_buffer = @write_buffer[written_bytes..-1]
    !@write_buffer.empty?
rescue Errno::EWOULDBLOCK, Errno::EAGAIN
    guard_buffer_size
    # Nothing could be written this time: the data stays buffered and must
    # be reported as pending (the guard itself returns nil)
    !@write_buffer.empty?
rescue SystemCallError, IOError
    raise ComError, "broken communication channel"
rescue RuntimeError => e
    # Workaround what seems to be a Ruby bug ...
    if e.message.match?(/can.t modify frozen IOError/)
        raise ComError, "broken communication channel"
    end

    raise
end

#read_data_from_io(remaining_time) ⇒ Object



141
142
143
144
145
146
147
# File 'lib/roby/interface/v2/channel.rb', line 141

# Wait for the IO to be readable and feed what is read to the frame parser
#
# Up to 1024**2 bytes are read in one call. Nothing is done if the IO does
# not become readable within the given time, and EWOULDBLOCK / EAGAIN are
# silently ignored.
#
# @param [Numeric, nil] remaining_time how long to wait for data, as a
#   timeout for IO.select
def read_data_from_io(remaining_time)
    readable = IO.select([@io], [], [], remaining_time)
    return unless readable

    chunk = io.sysread(1024**2, @read_buffer)
    @incoming << @read_buffer if chunk
    @stats.rx += @read_buffer.size
rescue Errno::EWOULDBLOCK, Errno::EAGAIN # rubocop:disable Lint/SuppressedException
end

#read_packet(timeout = 0) ⇒ Object?

Read one packet from #io and unmarshal it

Returns:

  • (Object, nil)

    returns the unmarshalled object, or nil if no full object can be found in the data received so far



111
112
113
114
115
116
117
118
119
# File 'lib/roby/interface/v2/channel.rb', line 111

# Read one packet from the IO and unmarshal it
#
# A packet already available from the frame parser is returned right away;
# the IO is only read if there is none.
#
# @param [Numeric, nil] timeout how long to wait for a packet if none is
#   already available
# @return [Object, nil] the unmarshalled object, or nil if no full object
#   can be found in the data received so far
def read_packet(timeout = 0)
    guard_read_thread

    packet = @incoming.next
    return read_packet_from_io(timeout) unless packet

    unmarshal_packet(packet)
end

#read_packet_from_io(timeout) ⇒ Object



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/roby/interface/v2/channel.rb', line 121

# Read from the IO until a full packet is available or the timeout expires
#
# @param [Numeric, nil] timeout the maximum time to wait; nil waits until a
#   packet is received
# @return [Object, nil] the unmarshalled packet, or nil if the timeout
#   expired first
# @raise [ComError] if reading fails with a SystemCallError or an IOError
def read_packet_from_io(timeout)
    deadline = timeout ? Time.now + timeout : nil
    wait = timeout

    loop do
        read_data_from_io(wait)

        packet = @incoming.next
        return unmarshal_packet(packet) if packet

        # Without a deadline, keep waiting with the same (nil) timeout
        next unless deadline

        wait = deadline - Time.now
        return if wait < 0
    end
rescue SystemCallError, IOError
    raise ComError, "closed communication"
end

#read_wait(timeout: nil) ⇒ Boolean

Wait until there is something to read on the channel

Parameters:

  • timeout (Numeric, nil) (defaults to: nil)

    a timeout after which the method will return. Use nil for no timeout

Returns:

  • (Boolean)

    falsy if the timeout was reached, true otherwise



103
104
105
# File 'lib/roby/interface/v2/channel.rb', line 103

# Wait until there is something to read on the channel
#
# @param [Numeric, nil] timeout a timeout after which the method will
#   return. Use nil for no timeout
# @return the value of IO.select: nil if the timeout was reached, truthy
#   otherwise
def read_wait(timeout: nil)
    IO.select([io], [], [], timeout)
end

#report_error(message) ⇒ Object



182
183
184
185
186
187
# File 'lib/roby/interface/v2/channel.rb', line 182

# Log an error and build the object that reports it to the remote side
#
# The message and the current call stack (one indented line per frame) are
# logged as warnings. The returned error carries the message only, with an
# empty backtrace.
#
# @param [String] message the error message
# @return [Protocol::Error]
def report_error(message)
    logger = Roby::Interface
    logger.warn message
    caller.each do |frame|
        logger.warn("  #{frame}")
    end

    Protocol::Error.new(message: message, backtrace: [])
end

#reset_thread_guard(read_thread = nil, write_thread = nil) ⇒ Object



295
296
297
298
# File 'lib/roby/interface/v2/channel.rb', line 295

# Reset the threads the read and write guards are bound to
#
# A guard reset to nil binds itself again to the next thread that calls it.
#
# @param [Thread, nil] read_thread the only thread allowed to read, or nil
# @param [Thread, nil] write_thread the only thread allowed to write, or nil
def reset_thread_guard(read_thread = nil, write_thread = nil)
    @read_thread = read_thread
    @write_thread = write_thread
end

#resolve_struct(object) ⇒ Object



279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
# File 'lib/roby/interface/v2/channel.rb', line 279

# Resolve the Struct class to use for a received struct description
#
# A description without a class name gets a new anonymous Struct each time.
# Named descriptions are resolved once and then cached by name: the
# constant of that name is used if it can be resolved, and a new Struct is
# created otherwise.
#
# NOTE(review): the anonymous case reads the field names from
# object.contents while the fallback case reads them from object.values -
# confirm that both accessors exist on the description object
#
# @param object the struct description; must respond to #klass
# @return [Class] the resolved class
def resolve_struct(object)
    name = object.klass
    return Struct.new(*object.contents.keys, keyword_init: true) unless name

    cached = @structs[name]
    return cached if cached

    @structs[name] =
        begin
            constant(name)
        rescue NameError
            Struct.new(*object.values.keys, keyword_init: true)
        end
end

#to_ioObject



77
78
79
# File 'lib/roby/interface/v2/channel.rb', line 77

# Returns the result of calling to_io on the underlying IO
def to_io
    io.to_io
end

#unmarshal_packet(packet) ⇒ Object



149
150
151
152
153
154
155
156
# File 'lib/roby/interface/v2/channel.rb', line 149

# Unmarshal a packet received from the remote side
#
# @param [#to_s] packet the received frame; its string form is loaded with
#   Marshal
# @return [Object] the result of Protocol.unmarshal_object on the loaded
#   data
# @raise [ProtocolError] if an ArgumentError or TypeError is raised while
#   loading or unmarshalling the data
def unmarshal_packet(packet)
    # NOTE(review): Marshal.load can instantiate arbitrary objects and is
    # only safe if the remote side is trusted
    raw = Marshal.load(packet.to_s) # rubocop:disable Security/MarshalLoad
    Protocol.unmarshal_object(self, raw)
rescue ArgumentError, TypeError => e
    message = "failed to unmarshal received packet: #{e.message}"
    # Report through the interface logger, like report_error does, instead
    # of printing on stdout
    Roby::Interface.warn message
    raise ProtocolError, message, e.backtrace
end

#write_buffer_sizeObject



73
74
75
# File 'lib/roby/interface/v2/channel.rb', line 73

# The size of the data currently held in the write buffer, i.e. queued by
# push_write_data but not yet written to the IO
def write_buffer_size
    @write_buffer.size
end

#write_packet(object) ⇒ void

This method returns an undefined value.

Write one ruby object (usually an array) as a marshalled packet and send it to #io

Parameters:

  • object (Object)

    the object to be sent



163
164
165
166
167
# File 'lib/roby/interface/v2/channel.rb', line 163

# Write one ruby object (usually an array) as a marshalled packet
#
# The object is marshalled, wrapped in a binary websocket frame and queued
# for writing through push_write_data.
#
# @param [Object] object the object to be sent
# @return the value of push_write_data; callers should not rely on it
def write_packet(object)
    frame = @websocket_packet.new(data: marshal_object(object), type: :binary)
    push_write_data(frame.to_s)
end