Class: RSMP::Proxy

Inherits:
Object
  • Object
show all
Includes:
Inspect, Logging, Notifier, Wait
Defined in:
lib/rsmp/proxy.rb

Direct Known Subclasses

SiteProxy, SupervisorProxy

Constant Summary collapse

WRAPPING_DELIMITER =
"\f"

Instance Attribute Summary collapse

Attributes included from Logging

#logger

Instance Method Summary collapse

Methods included from Inspect

#inspector

Methods included from Notifier

#add_listener, #initialize_distributor, #notify, #remove_listener

Methods included from Wait

#wait_for

Methods included from Logging

#initialize_logging

Constructor Details

#initialize(options) ⇒ Proxy

Returns a new instance of Proxy.


16
17
18
19
20
21
22
# File 'lib/rsmp/proxy.rb', line 16

# Builds a proxy from an options hash: sets up logging, copies the
# connection options (see #setup), initializes the distributor, optionally
# attaches a collector (when @settings['collect'] is set) and resets all
# per-connection state (see #clear).
def initialize(options)
  initialize_logging(options)
  setup(options)
  initialize_distributor
  prepare_collection(@settings['collect'])
  clear
end

Instance Attribute Details

#archive ⇒ Object (readonly)

Returns the value of attribute archive.


14
15
16
# File 'lib/rsmp/proxy.rb', line 14

# Reader for @archive.
# NOTE(review): @archive is not assigned in any of the methods shown on
# this page — confirm where it is set.
def archive
  @archive
end

#collector ⇒ Object (readonly)

Returns the value of attribute collector.


14
15
16
# File 'lib/rsmp/proxy.rb', line 14

# Reader for @collector, which #prepare_collection assigns when it is
# given a non-nil collection size; otherwise it is never set here.
def collector
  @collector
end

#connection_info ⇒ Object (readonly)

Returns the value of attribute connection_info.


14
15
16
# File 'lib/rsmp/proxy.rb', line 14

# Reader for @connection_info, which #setup copies from options[:info].
def connection_info
  @connection_info
end

#ip ⇒ Object (readonly)

Returns the value of attribute ip.


14
15
16
# File 'lib/rsmp/proxy.rb', line 14

# Reader for @ip, which #setup copies from options[:ip].
def ip
  @ip
end

#port ⇒ Object (readonly)

Returns the value of attribute port.


14
15
16
# File 'lib/rsmp/proxy.rb', line 14

# Reader for @port, which #setup copies from options[:port].
def port
  @port
end

#state ⇒ Object (readonly)

Returns the value of attribute state.


14
15
16
# File 'lib/rsmp/proxy.rb', line 14

# Reader for @state. #setup initializes it to :stopped and #set_state
# updates it (values used on this page: :starting, :ready, :stopping,
# :stopped).
def state
  @state
end

#sxl ⇒ Object (readonly)

Returns the value of attribute sxl.


14
15
16
# File 'lib/rsmp/proxy.rb', line 14

# Reader for @sxl. #setup resets it to nil; #get_schemas only adds an
# SXL schema when this is set.
def sxl
  @sxl
end

#task ⇒ Object (readonly)

Returns the value of attribute task.


14
15
16
# File 'lib/rsmp/proxy.rb', line 14

# Reader for @task, which #setup copies from options[:task]. The reader
# and timer are started as children of it via @task.async.
def task
  @task
end

Instance Method Details

#acknowledge(original) ⇒ Object

Raises:

  • (InvalidArgument)

395
396
397
398
399
400
401
# File 'lib/rsmp/proxy.rb', line 395

# Sends a MessageAck for +original+ and records that an ingoing message
# of that type has been acknowledged.
#
# Raises InvalidArgument when +original+ is nil/false.
# Returns whatever #check_ingoing_acknowledged returns.
def acknowledge(original)
  raise InvalidArgument unless original

  # the ack carries a copy of the message it acknowledges
  ack = MessageAck.build_from(original).tap { |a| a.original = original.clone }
  description = "for #{ack.original.type} #{original.m_id_short}"
  send_message(ack, description)
  check_ingoing_acknowledged(original)
end

#acknowledged_first_ingoing(message) ⇒ Object


473
474
# File 'lib/rsmp/proxy.rb', line 473

# Hook called by #check_ingoing_acknowledged the first time an ingoing
# message of a given type is acknowledged. No-op here; presumably meant
# to be overridden by subclasses.
def acknowledged_first_ingoing(message); end

#acknowledged_first_outgoing(message) ⇒ Object


470
471
# File 'lib/rsmp/proxy.rb', line 470

# Hook called by #check_outgoing_acknowledged the first time an outgoing
# message of a given type is acknowledged. No-op here; presumably meant
# to be overridden by subclasses.
def acknowledged_first_outgoing(message); end

#author ⇒ Object


547
548
549
# File 'lib/rsmp/proxy.rb', line 547

# Returns the site id of the owning node (node.site_id).
# Note that #node raises in this class, so this only works where #node
# has been overridden.
def author
  node.site_id
end

#buffer_message(message) ⇒ Object


282
283
284
285
# File 'lib/rsmp/proxy.rb', line 282

# Called by #send_message when a message cannot be written because of an
# IOError/EOFError. Buffering is not implemented yet, so the message is
# currently dropped silently.
def buffer_message(message)
  # TODO
  #log "Cannot send #{message.type} because the connection is closed.", message: message, level: :error
end

#check_ack_timeout(now) ⇒ Object


224
225
226
227
228
229
230
231
232
233
234
# File 'lib/rsmp/proxy.rb', line 224

# Stops the proxy if any message still awaiting acknowledgement is older
# than the configured acknowledgement timeout.
#
# now - current time, comparable with message.timestamp + timeout
def check_ack_timeout(now)
  timeout = @site_settings['timeouts']['acknowledgement']
  # a hash cannot be modified while it is iterated, so walk a copy
  @awaiting_acknowledgement.clone.each_pair do |_m_id, message|
    next unless now > message.timestamp + timeout

    log "No acknowledgements for #{message.type} #{message.m_id_short} within #{timeout} seconds", level: :error
    stop
  end
end

#check_ingoing_acknowledged(message) ⇒ Object


463
464
465
466
467
468
# File 'lib/rsmp/proxy.rb', line 463

# Fires #acknowledged_first_ingoing the first time a message of a given
# type is seen here; later messages of the same type are ignored.
def check_ingoing_acknowledged(message)
  return if @ingoing_acknowledged[message.type]

  @ingoing_acknowledged[message.type] = true
  acknowledged_first_ingoing(message)
end

#check_outgoing_acknowledged(message) ⇒ Object

TODO this might be better handled by a proper event machine using e.g. the EventMachine gem


456
457
458
459
460
461
# File 'lib/rsmp/proxy.rb', line 456

# Fires #acknowledged_first_outgoing the first time a message of a given
# type is seen here; later messages of the same type are ignored.
def check_outgoing_acknowledged(message)
  return if @outgoing_acknowledged[message.type]

  @outgoing_acknowledged[message.type] = true
  acknowledged_first_outgoing(message)
end

#check_rsmp_version(message) ⇒ Object


381
382
383
384
385
386
387
388
389
390
# File 'lib/rsmp/proxy.rb', line 381

# Picks the newest RSMP version supported by both sides and stores it in
# @rsmp_version.
#
# Raises FatalError when message.versions and #rsmp_versions share no
# version.
def check_rsmp_version(message)
  supported = rsmp_versions
  # versions that both we and the peer support
  shared = message.versions & supported
  unless shared.any?
    raise FatalError.new "RSMP versions [#{message.versions.join(',')}] requested, but only [#{supported.join(',')}] supported."
  end

  # compare as versions, not strings, so that 3.1.10 sorts after 3.1.9
  @rsmp_version = shared.sort_by { |v| Gem::Version.new(v) }.last
end

#check_watchdog_timeout(now) ⇒ Object


236
237
238
239
240
241
242
243
244
# File 'lib/rsmp/proxy.rb', line 236

# Stops the proxy when no Watchdog has been received within the
# configured watchdog timeout, measured from @latest_watchdog_received.
def check_watchdog_timeout(now)
  timeout = @site_settings['timeouts']['watchdog']
  deadline = @latest_watchdog_received + timeout
  return unless (deadline - now) < 0

  log "No Watchdog within #{timeout} seconds", level: :error
  stop
end

#clear ⇒ Object


89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/rsmp/proxy.rb', line 89

# Resets all per-connection bookkeeping: acknowledgement tracking,
# watchdog/version progress and the two notification objects. Called from
# #initialize and #stop.
def clear
  # acknowledgement bookkeeping
  @awaiting_acknowledgement = {}
  @acknowledgements = {}
  @ingoing_acknowledged = {}
  @outgoing_acknowledged = {}

  # watchdog and version handshake progress
  @latest_watchdog_received = @latest_watchdog_send_at = nil
  @watchdog_started = @version_determined = false

  # fresh notifications, used by #set_state and #process_ack/#process_not_ack
  @state_condition = Async::Notification.new
  @acknowledgement_condition = Async::Notification.new
end

#clock ⇒ Object


48
49
50
# File 'lib/rsmp/proxy.rb', line 48

# Returns the clock of the owning node (node.clock); #send_watchdog uses
# its string form as the Watchdog timestamp.
# Note that #node raises in this class, so this only works where #node
# has been overridden.
def clock
  node.clock
end

#close_socket ⇒ Object


103
104
105
106
107
108
109
110
111
112
113
# File 'lib/rsmp/proxy.rb', line 103

# Closes the stream and then the socket, if present, and forgets them.
# Safe to call repeatedly. Always returns nil.
def close_socket
  %i[@stream @socket].each do |ivar|
    io = instance_variable_get(ivar)
    next unless io

    io.close
    instance_variable_set(ivar, nil)
  end
  nil
end

#collect(task, options, &block) ⇒ Object


59
60
61
62
# File 'lib/rsmp/proxy.rb', line 59

# Creates a one-off RSMP::Collector for this proxy with +options+ and
# runs it on +task+, passing +block+ through. Returns the collector's
# #collect result.
def collect(task, options, &block)
  RSMP::Collector.new(self, options).collect(task, &block)
end

#connection_complete ⇒ Object


536
537
538
# File 'lib/rsmp/proxy.rb', line 536

# Marks the connection as fully established by moving to the :ready state.
def connection_complete
  set_state(:ready)
end

#dont_acknowledge(original, prefix = nil, reason = nil) ⇒ Object

Raises:

  • (InvalidArgument)

403
404
405
406
407
408
409
410
411
412
413
# File 'lib/rsmp/proxy.rb', line 403

# Rejects a received message by sending a MessageNotAck for it.
#
# original - the message being rejected (required)
# prefix   - optional text put before the reason in the log line
# reason   - optional rejection reason; sent as "rea" (defaults to
#            "Unknown reason") and, when given, logged as a warning
#
# Raises InvalidArgument when +original+ is nil/false.
def dont_acknowledge(original, prefix = nil, reason = nil)
  raise InvalidArgument unless original

  if reason
    # drop a missing prefix, so the log line doesn't start with a stray space
    log [prefix, reason].compact.join(' '), message: original, level: :warning
  end

  message = MessageNotAck.new({
    "oMId" => original.m_id,
    "rea" => reason || "Unknown reason"
  })
  message.original = original.clone
  send_message message, "for #{original.type} #{original.m_id_short}"
end

#dont_expect_acknowledgement(message) ⇒ Object


367
368
369
# File 'lib/rsmp/proxy.rb', line 367

# Stops waiting for the acknowledgement of the message that +message+
# (an ack/not-ack) refers to via its "oMId" attribute. Returns the removed
# original, or nil if none was pending.
def dont_expect_acknowledgement(message)
  original_id = message.attribute("oMId")
  @awaiting_acknowledgement.delete(original_id)
end

#expect_acknowledgement(message) ⇒ Object


361
362
363
364
365
# File 'lib/rsmp/proxy.rb', line 361

# Registers +message+ as awaiting acknowledgement, keyed by its m_id.
# MessageAck and MessageNotAck are themselves never acknowledged, so they
# are skipped.
def expect_acknowledgement(message)
  return if message.is_a?(MessageAck) || message.is_a?(MessageNotAck)

  @awaiting_acknowledgement[message.m_id] = message
end

#expect_version_message(message) ⇒ Object


530
531
532
533
534
# File 'lib/rsmp/proxy.rb', line 530

# Guard used by #process_packet until the version has been determined:
# only Version, MessageAck and MessageNotAck are allowed through.
#
# Raises FatalError for any other message.
def expect_version_message(message)
  allowed = [Version, MessageAck, MessageNotAck]
  return if allowed.any? { |klass| message.is_a?(klass) }

  raise FatalError.new "Version must be received first"
end

#extraneous_version(message) ⇒ Object


371
372
373
# File 'lib/rsmp/proxy.rb', line 371

# Rejects a Version message that was not expected, by sending a
# MessageNotAck for it.
def extraneous_version(message)
  dont_acknowledge(message, "Received", "extraneous Version message")
end

#find_original_for_message(message) ⇒ Object


451
452
453
# File 'lib/rsmp/proxy.rb', line 451

# Looks up the pending outgoing message that an ack/not-ack refers to
# through its "oMId" attribute. Returns nil when nothing is pending for
# that id.
def find_original_for_message(message)
  original_id = message.attribute("oMId")
  @awaiting_acknowledgement[original_id]
end

#get_schemas ⇒ Object


255
256
257
258
259
260
261
262
263
# File 'lib/rsmp/proxy.rb', line 255

# Returns the schemas (name => version) that messages are validated
# against: always the core schema, plus the SXL schema once an sxl is known.
def get_schemas
  # normally we have an sxl, but during connection it hasn't been established yet;
  # at those times we only validate against the core schema
  # TODO
  # what schema should we use to validate the initial Version and MessageAck messages?
  return { core: '3.1.5' } unless sxl

  { core: '3.1.5', sxl => RSMP::Schemer.sanitize_version(sxl_version) }
end

#inspect ⇒ Object


42
43
44
45
46
# File 'lib/rsmp/proxy.rb', line 42

# Compact description of the proxy: class name, object id and the
# instance variables rendered by #inspector.
def inspect
  details = inspector(:@acknowledgements, :@settings, :@site_settings)
  "#<#{self.class.name}:#{object_id}, #{details}>"
end

#log(str, options = {}) ⇒ Object


251
252
253
# File 'lib/rsmp/proxy.rb', line 251

# Logs through the inherited #log, tagging every entry with this
# connection's ip, port and site id (these override same-named options).
def log(str, options = {})
  connection = { ip: @ip, port: @port, site_id: @site_id }
  super(str, options.merge(connection))
end

#log_acknowledgement_for_original(message, original) ⇒ Object


509
510
511
512
513
514
515
516
517
518
# File 'lib/rsmp/proxy.rb', line 509

# Logs a received MessageAck/MessageNotAck together with the type of the
# message it refers to and the first four characters of its "oMId".
# A MessageNotAck is logged as a warning, with the "rea" text appended
# when present; anything else is logged at level :log.
def log_acknowledgement_for_original(message, original)
  text = "Received #{message.type} for #{original.type} #{message.attribute("oMId")[0..3]}"
  level = :log
  if message.type == 'MessageNotAck'
    reason = message.attributes["rea"]
    text = "#{text}: #{reason}" if reason
    level = :warning
  end
  log text, message: message, level: level
end

#log_acknowledgement_for_unknown(message) ⇒ Object


520
521
522
# File 'lib/rsmp/proxy.rb', line 520

# Logs a warning for an ack/not-ack whose "oMId" does not match any
# message we are waiting on (only the first four characters are shown).
def log_acknowledgement_for_unknown(message)
  short_id = message.attribute("oMId")[0..3]
  log "Received #{message.type} for unknown message #{short_id}", message: message, level: :warning
end

#log_send(message, reason = nil) ⇒ Object


287
288
289
290
291
292
293
294
295
296
297
298
299
# File 'lib/rsmp/proxy.rb', line 287

# Logs that +message+ was sent, optionally followed by +reason+.
# A sent MessageNotAck is logged as a warning, everything else at :log.
def log_send(message, reason = nil)
  str = reason ? "Sent #{message.type} #{reason}" : "Sent #{message.type}"
  level = message.type == "MessageNotAck" ? :warning : :log
  log str, message: message, level: level
end

#node ⇒ Object


543
544
545
# File 'lib/rsmp/proxy.rb', line 543

# Must be overridden to return the node that owns this proxy; the base
# implementation always raises a RuntimeError.
def node
  raise RuntimeError, 'Must be overridden'
end

#notify_error(e, options = {}) ⇒ Object


152
153
154
# File 'lib/rsmp/proxy.rb', line 152

# Forwards an error, with its options, to the owning node.
def notify_error(e, options = {})
  node.notify_error(e, options)
end

#prepare_collection(num) ⇒ Object


52
53
54
55
56
57
# File 'lib/rsmp/proxy.rb', line 52

# When +num+ is given, creates a collector for +num+ messages in both
# directions, stores it in @collector and registers it as a listener.
# Does nothing (returns nil) when +num+ is nil/false.
def prepare_collection(num)
  return unless num

  @collector = RSMP::Collector.new(self, num: num, ingoing: true, outgoing: true)
  add_listener(@collector)
end

#process_ack(message) ⇒ Object


476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
# File 'lib/rsmp/proxy.rb', line 476

# Handles a received MessageAck: matches it with the pending original,
# stops waiting for it, fires the version/first-outgoing hooks, stores the
# ack under the original's m_id and signals anyone waiting on
# @acknowledgement_condition. Acks for unknown messages are only logged.
def process_ack(message)
  original = find_original_for_message(message)
  return log_acknowledgement_for_unknown(message) unless original

  dont_expect_acknowledgement(message)
  message.original = original
  log_acknowledgement_for_original(message, original)

  version_acknowledged if original.type == "Version"
  check_outgoing_acknowledged(original)

  @acknowledgements[original.m_id] = message
  @acknowledgement_condition.signal(message)
end

#process_message(message) ⇒ Object


340
341
342
343
344
345
346
347
348
349
350
351
352
353
# File 'lib/rsmp/proxy.rb', line 340

# Dispatches a received message to its handler by class. Message types
# not handled here are rejected with a MessageNotAck.
def process_message(message)
  case message
  when MessageAck    then process_ack(message)
  when MessageNotAck then process_not_ack(message)
  when Version       then process_version(message)
  when Watchdog      then process_watchdog(message)
  else
    dont_acknowledge(message, "Received", "unknown message (#{message.type})")
  end
end

#process_not_ack(message) ⇒ Object


496
497
498
499
500
501
502
503
504
505
506
507
# File 'lib/rsmp/proxy.rb', line 496

# Handles a received MessageNotAck: matches it with the pending original,
# stops waiting for it, stores the not-ack under the original's m_id and
# signals anyone waiting on @acknowledgement_condition. Not-acks for
# unknown messages are only logged.
def process_not_ack(message)
  original = find_original_for_message(message)
  return log_acknowledgement_for_unknown(message) unless original

  dont_expect_acknowledgement(message)
  message.original = original
  log_acknowledgement_for_original(message, original)
  @acknowledgements[original.m_id] = message
  @acknowledgement_condition.signal(message)
end

#process_packet(json) ⇒ Object


301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
# File 'lib/rsmp/proxy.rb', line 301

# Parses, validates and processes one received JSON packet.
#
# Returns the built message, or nil when the packet could not be turned
# into a message at all (invalid JSON / malformed message).
#
# Errors are reported through #notify_error and handled per type:
# * InvalidPacket, MalformedMessage - logged and ignored (nothing to reject)
# * schema errors, InvalidMessage   - rejected with a MessageNotAck
# * FatalError                      - rejected, then the proxy is stopped
def process_packet(json)
  attributes = Message.parse_attributes json
  message = Message.build attributes, json
  message.validate get_schemas
  notify message
  expect_version_message(message) unless @version_determined
  process_message message
  process_deferred
  message
rescue InvalidPacket => e
  str = "Received invalid package, must be valid JSON but got #{json.size} bytes: #{e.message}"
  notify_error e.exception(str)
  log str, level: :warning
  nil
rescue MalformedMessage => e
  str = "Received malformed message, #{e.message}"
  notify_error e.exception(str)
  log str, message: Malformed.new(attributes), level: :warning
  # cannot send NotAcknowledged for a malformed message since we can't read it, just ignore it
  nil
rescue SchemaError, RSMP::Schemer::Error => e
  # pass the details as the reason, so they are both logged by
  # dont_acknowledge and sent to the peer in the MessageNotAck
  prefix = "Received"
  reason = "invalid #{message.type}, schema errors: #{e.message}"
  notify_error e.exception("#{prefix} #{reason} #{message.json}")
  dont_acknowledge message, prefix, reason
  message
rescue InvalidMessage => e
  prefix = "Received"
  reason = "invalid #{message.type}, #{e.message}"
  notify_error e.exception("#{prefix} #{reason} #{message.json}")
  dont_acknowledge message, prefix, reason
  message
rescue FatalError => e
  str = "Rejected #{message.type},"
  notify_error e.exception("#{str} #{message.json}")
  dont_acknowledge message, str, "#{e.message}"
  stop
  message
end

#process_version(message) ⇒ Object


392
393
# File 'lib/rsmp/proxy.rb', line 392

# Handler for received Version messages, called from #process_message.
# No-op here; presumably overridden by subclasses.
def process_version(message); end

#process_watchdog(message) ⇒ Object


524
525
526
527
528
# File 'lib/rsmp/proxy.rb', line 524

# Handles a received Watchdog: logs it, records the time it arrived
# (used by #check_watchdog_timeout) and acknowledges it.
def process_watchdog(message)
  log("Received #{message.type}", message: message, level: :log)
  @latest_watchdog_received = Clock.now
  acknowledge(message)
end

#ready? ⇒ Boolean

Returns:

  • (Boolean)

71
72
73
# File 'lib/rsmp/proxy.rb', line 71

# True when the proxy is in the :ready state, which is the state
# #connection_complete sets.
def ready?
  @state == :ready
end

#revive(options) ⇒ Object


24
25
26
# File 'lib/rsmp/proxy.rb', line 24

# Re-applies connection options to an existing proxy by running #setup
# again. Unlike #initialize, it does not touch logging, the distributor
# or the collector, and does not call #clear.
def revive(options)
  setup(options)
end

#rsmp_versions ⇒ Object


375
376
377
378
379
# File 'lib/rsmp/proxy.rb', line 375

# Returns the RSMP versions we support, from the "rsmp_versions" site
# setting. 'latest' and 'all' are expanded to explicit version lists; any
# other value is returned as configured.
def rsmp_versions
  configured = @site_settings["rsmp_versions"]
  if configured == 'latest'
    ['3.1.5']
  elsif configured == 'all'
    ['3.1.1','3.1.2','3.1.3','3.1.4','3.1.5']
  else
    configured
  end
end

#run ⇒ Object


64
65
66
67
68
69
# File 'lib/rsmp/proxy.rb', line 64

# Starts the proxy and waits for the reader task (if any) to finish.
# Whatever happens, the proxy is stopped afterwards unless it is already
# stopped or stopping.
def run
  start
  reader = @reader
  reader.wait if reader
ensure
  stop unless %i[stopped stopping].include?(@state)
end

#send_message(message, reason = nil, validate: true) ⇒ Object


265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
# File 'lib/rsmp/proxy.rb', line 265

# Sends +message+ to the peer.
#
# message  - the message to send; marked as outgoing and serialized here
# reason   - optional text appended to the "Sent ..." log line
# validate - pass false to skip schema validation before sending
#
# Without a protocol, or on an IO error, the message is handed to
# #buffer_message. A schema validation failure is logged and reported via
# #notify_error, and the message is not written.
def send_message(message, reason = nil, validate: true)
  raise IOError unless @protocol

  message.direction = :out
  message.generate_json
  message.validate(get_schemas) unless validate == false
  expect_acknowledgement(message)
  @protocol.write_lines(message.json)
  notify(message)
  log_send(message, reason)
rescue IOError # also covers EOFError, which is a subclass of IOError
  buffer_message(message)
rescue SchemaError, RSMP::Schemer::Error => e
  str = "Could not send #{message.type} because schema validation failed: #{e.message}"
  log str, message: message, level: :error
  notify_error e.exception("#{str} #{message.json}")
end

#send_version(site_id, rsmp_versions) ⇒ Object


431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
# File 'lib/rsmp/proxy.rb', line 431

# Builds and sends a Version message.
#
# site_id       - one site id or an array of them
# rsmp_versions - 'latest', 'all', one version or an array of versions
def send_version(site_id, rsmp_versions)
  versions =
    if rsmp_versions == 'latest'
      ['3.1.5']
    elsif rsmp_versions == 'all'
      ['3.1.1','3.1.2','3.1.3','3.1.4','3.1.5']
    else
      [rsmp_versions].flatten
    end

  version_message = Version.new({
    "RSMP" => versions.map { |version| { "vers" => version } },
    "siteId" => [site_id].flatten.map { |id| { "sId" => id } },
    "SXL" => sxl_version
  })
  send_message(version_message)
end

#send_watchdog(now = Clock.now) ⇒ Object


218
219
220
221
222
# File 'lib/rsmp/proxy.rb', line 218

# Sends a Watchdog message stamped with the node clock and remembers
# +now+ as the time the latest watchdog was sent. Returns +now+.
def send_watchdog(now = Clock.now)
  watchdog = Watchdog.new({ "wTs" => clock.to_s })
  send_message(watchdog)
  @latest_watchdog_send_at = now
end

#set_state(state) ⇒ Object


415
416
417
418
# File 'lib/rsmp/proxy.rb', line 415

# Stores the new state and signals it on @state_condition, which
# #wait_for_state waits on.
def set_state(state)
  @state_condition.signal(@state = state)
end

#setup(options) ⇒ Object


28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/rsmp/proxy.rb', line 28

# Copies connection options into instance variables and resets the
# proxy to its initial :stopped state. Missing options become nil.
# Note that options[:info] is stored as @connection_info.
def setup(options)
  @settings, @task, @socket, @stream, @protocol, @ip, @port, @connection_info =
    options.values_at(:settings, :task, :socket, :stream, :protocol, :ip, :port, :info)
  @sxl = nil
  @site_settings = nil # can't pick until we know the site id
  @state = :stopped
end

#start ⇒ Object


75
76
77
# File 'lib/rsmp/proxy.rb', line 75

# Begins starting the proxy; the base implementation only moves to the
# :starting state.
def start
  set_state(:starting)
end

#start_reader ⇒ Object


115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/rsmp/proxy.rb', line 115

# Starts the reader task, which reads form-feed terminated JSON packets
# from the connection and processes them one by one until the connection
# ends or the task is stopped. Processing time for each packet is logged
# at level :statistics.
#
# Connection errors end the reader with a warning; any other error is
# reported through #notify_error.
def start_reader
  @reader = @task.async do |task|
    task.annotate "reader"
    @stream ||= Async::IO::Stream.new(@socket)
    @protocol ||= Async::IO::Protocol::Line.new(@stream,WRAPPING_DELIMITER) # rsmp messages are json terminated with a form-feed

    while json = @protocol.read_line
      beginning = Time.now
      message = process_packet json
      duration = Time.now - beginning
      ms = (duration*1000).round(4)
      # a zero duration (coarse clock) would make the rate infinite, and
      # rounding infinity raises FloatDomainError, which would end the
      # reader; so leave the rate out in that case
      rate = duration > 0 ? "#{(1.0 / duration).round}req/s" : nil
      if message
        type = message.type
        m_id = Logger.shorten_message_id(message.m_id)
      else
        type = 'Unknown'
        m_id = nil
      end
      timing = ["processed in #{ms}ms", rate].compact.join(', ')
      str = [type,m_id,timing].compact.join(' ')
      log str, level: :statistics
    end
  rescue Async::Wrapper::Cancelled
    # ignore
  rescue EOFError
    log "Connection closed", level: :warning
  rescue IOError => e
    log "IOError: #{e}", level: :warning
  rescue Errno::ECONNRESET
    log "Connection reset by peer", level: :warning
  rescue Errno::EPIPE
    log "Broken pipe", level: :warning
  rescue StandardError => e
    notify_error e, level: :internal
  end
end

#start_timer ⇒ Object


162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/rsmp/proxy.rb', line 162

# Starts the timer task, which calls #timer once per timer interval
# (the 'timer' interval from the site settings, default 1 second).
#
# Errors raised by #timer never end the loop: schema and connection
# errors are logged as warnings, anything else is reported through
# #notify_error.
def start_timer
  name = "timer"
  interval = @site_settings['intervals']['timer'] || 1
  log "Starting #{name} with interval #{interval} seconds", level: :debug
  # count the watchdog timeout from now, so it doesn't trigger right away
  @latest_watchdog_received = Clock.now

  @timer = @task.async do |task|
    task.annotate "timer"
    next_time = Time.now.to_f
    loop do
      begin
        now = Clock.now
        timer(now)
      rescue RSMP::Schemer::Error => e
        # go through the logger like every other branch, instead of stdout
        log "Timer: Schema error: #{e}", level: :warning
      rescue EOFError => e
        log "Timer: Connection closed: #{e}", level: :warning
      rescue IOError => e
        log "Timer: IOError", level: :warning
      rescue Errno::ECONNRESET
        log "Timer: Connection reset by peer", level: :warning
      rescue Errno::EPIPE => e
        log "Timer: Broken pipe", level: :warning
      rescue StandardError => e
        notify_error e, level: :internal
      end
    ensure
      # schedule against absolute time, so processing time doesn't
      # accumulate as drift
      next_time += interval
      duration = next_time - Time.now.to_f
      task.sleep duration
    end
  end
end

#start_watchdog ⇒ Object


156
157
158
159
160
# File 'lib/rsmp/proxy.rb', line 156

# Sends the first Watchdog right away and enables periodic sending
# (see #watchdog_send_timer). Returns true.
def start_watchdog
  interval = @site_settings['intervals']['watchdog']
  log("Starting watchdog with interval #{interval} seconds", level: :debug)
  send_watchdog
  @watchdog_started = true
end

#stop ⇒ Object


79
80
81
82
83
84
85
86
87
# File 'lib/rsmp/proxy.rb', line 79

# Stops the proxy: moves to :stopping and stops the timer and reader
# tasks, unless the proxy is already stopped.
#
# The cleanup in the ensure section always runs — also when the proxy was
# already stopped, or when stopping the tasks raises: the socket is
# closed, per-connection state is cleared and the state is set to
# :stopped.
def stop
  unless @state == :stopped
    set_state(:stopping)
    stop_tasks
  end
ensure
  close_socket
  clear
  set_state(:stopped)
end

#stop_tasks ⇒ Object


246
247
248
249
# File 'lib/rsmp/proxy.rb', line 246

# Stops the timer task and then the reader task, skipping any that was
# never started.
def stop_tasks
  if @timer
    @timer.stop
  end
  if @reader
    @reader.stop
  end
end

#timer(now) ⇒ Object


196
197
198
199
200
# File 'lib/rsmp/proxy.rb', line 196

# One timer tick: sends a watchdog if one is due, then checks the
# acknowledgement and watchdog timeouts, in that order.
def timer(now)
  watchdog_send_timer(now)
  check_ack_timeout(now)
  check_watchdog_timeout(now)
end

#version_acknowledged ⇒ Object


540
541
# File 'lib/rsmp/proxy.rb', line 540

# Hook called by #process_ack when the acknowledged original message has
# type "Version". No-op here; presumably overridden by subclasses.
def version_acknowledged
end

#wait_for_acknowledgement(parent_task, options = {}, m_id) ⇒ Object


551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
# File 'lib/rsmp/proxy.rb', line 551

# Collects messages until a MessageAck or MessageNotAck that refers to
# +m_id+ (through its "oMId" attribute) arrives.
#
# parent_task - task handed to #collect
# options     - extra collector options; type and num are set here
# m_id        - id of the message whose acknowledgement we wait for
#
# The block tells the collector whether it is done: true for a matching
# ack/not-ack, false for everything else.
def wait_for_acknowledgement(parent_task, options = {}, m_id)
  filter = options.merge({
    type: ['MessageAck','MessageNotAck'],
    num: 1
  })
  collect(parent_task, filter) do |message|
    rejected = message.is_a?(MessageNotAck)
    next false unless rejected || message.is_a?(MessageAck)
    next false unless message.attribute('oMId') == m_id

    if rejected
      # NOTE(review): the intent appears to be to hand this exception back
      # as the task result, so that waiting on the task raises it in the
      # parent task. As written it is only assigned to a block-local
      # variable and then discarded — confirm and fix.
      m_id_short = RSMP::Message.shorten_m_id m_id, 8
      _result = RSMP::MessageRejected.new "Aggregated status request #{m_id_short} was rejected: #{message.attribute('rea')}"
    end
    true # done, no more messages wanted
  end
end

#wait_for_state(state, timeout) ⇒ Object


420
421
422
423
424
425
426
427
428
429
# File 'lib/rsmp/proxy.rb', line 420

# Waits until the proxy reaches +state+ (one state or an array of
# acceptable states), for at most +timeout+.
#
# Returns nil when already in a wanted state, otherwise the state reached.
# Raises RSMP::TimeoutError when the wait times out.
def wait_for_state(state, timeout)
  wanted = [state].flatten
  return if wanted.include?(@state)

  wait_for(@state_condition, timeout) { wanted.include?(@state) }
  @state
rescue Async::TimeoutError
  raise RSMP::TimeoutError.new "Did not reach state #{state} within #{timeout}s"
end

#watchdog_send_timer(now) ⇒ Object


202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/rsmp/proxy.rb', line 202

# Sends a Watchdog when one is due. Called on every timer tick.
#
# Nothing is sent before #start_watchdog has run, or when the watchdog
# interval is :never. The first watchdog is sent immediately; after that
# one is sent whenever the watchdog interval has (nearly) elapsed.
#
# Returns what #send_watchdog returns, or nil when nothing was sent.
def watchdog_send_timer(now)
  return unless @watchdog_started

  intervals = @site_settings['intervals']
  return if intervals['watchdog'] == :never
  return send_watchdog(now) if @latest_watchdog_send_at.nil?

  # same default as start_timer, so an unset timer interval doesn't raise
  timer_interval = intervals['timer'] || 1
  # we add half the timer interval, to pick the timer event closest to
  # the wanted watchdog interval
  elapsed = now - @latest_watchdog_send_at
  send_watchdog(now) if (elapsed + 0.5 * timer_interval) >= intervals['watchdog']
end

#will_not_handle(message) ⇒ Object


355
356
357
358
359
# File 'lib/rsmp/proxy.rb', line 355

# Refuses a message type that this kind of proxy does not handle: logs
# a warning and rejects the message with a MessageNotAck. The reason names
# the proxy class.
def will_not_handle(message)
  reason = "since we're a #{self.class.name.downcase}"
  log("Ignoring #{message.type}, #{reason}", message: message, level: :warning)
  dont_acknowledge(message, nil, reason)
end