Class: RSMP::Supervisor

Inherits:
Node
  • Object
show all
Defined in:
lib/rsmp/supervisor.rb

Instance Attribute Summary collapse

Attributes inherited from Node

#archive, #clock, #deferred, #error_queue, #task

Attributes included from Logging

#archive

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Node

#author, #check_required_settings, #defer, #do_deferred, #do_start, #exiting, #idle, #notify_error, #process_deferred, #restart, #start

Methods included from Inspect

#inspect, #inspector

Methods included from Wait

#wait_for

Methods included from Logging

#author, #initialize_logging, #log

Constructor Details

#initialize(options = {}) ⇒ Supervisor

Returns a new instance of Supervisor.


9
10
11
12
13
14
# File 'lib/rsmp/supervisor.rb', line 9

def initialize options={}
  handle_supervisor_settings options
  super options
  @proxies = []
  @site_id_condition = Async::Notification.new
end

Instance Attribute Details

#loggerObject (readonly)

Returns the value of attribute logger.


7
8
9
# File 'lib/rsmp/supervisor.rb', line 7

def logger
  @logger
end

#proxiesObject (readonly)

Returns the value of attribute proxies.


7
8
9
# File 'lib/rsmp/supervisor.rb', line 7

def proxies
  @proxies
end

#rsmp_versionsObject (readonly)

Returns the value of attribute rsmp_versions.


7
8
9
# File 'lib/rsmp/supervisor.rb', line 7

def rsmp_versions
  @rsmp_versions
end

#site_idObject (readonly)

Returns the value of attribute site_id.


7
8
9
# File 'lib/rsmp/supervisor.rb', line 7

def site_id
  @site_id
end

#supervisor_settingsObject (readonly)

Returns the value of attribute supervisor_settings.


7
8
9
# File 'lib/rsmp/supervisor.rb', line 7

def supervisor_settings
  @supervisor_settings
end

Class Method Details

.build_id_from_ip_port(ip, port) ⇒ Object


264
265
266
# File 'lib/rsmp/supervisor.rb', line 264

def self.build_id_from_ip_port ip, port
  Digest::MD5.hexdigest("#{ip}:#{port}")[0..8]
end

Instance Method Details

#accept?(socket, info) ⇒ Boolean

Returns:

  • (Boolean)

108
109
110
# File 'lib/rsmp/supervisor.rb', line 108

def accept? socket, info
  true
end

#aggregated_status_changed(site_proxy, component) ⇒ Object


261
262
# File 'lib/rsmp/supervisor.rb', line 261

def aggregated_status_changed site_proxy, component
end

#authorize_ip(ip) ⇒ Object

Raises:


124
125
126
127
128
# File 'lib/rsmp/supervisor.rb', line 124

def authorize_ip ip
  return if @supervisor_settings['ips'] == 'all'
  return if @supervisor_settings['ips'].include? ip
  raise ConnectionError.new('guest ip not allowed')
end

#build_proxy(settings) ⇒ Object


112
113
114
# File 'lib/rsmp/supervisor.rb', line 112

def build_proxy settings
  SiteProxy.new settings
end

#check_max_sitesObject


130
131
132
133
134
135
136
137
# File 'lib/rsmp/supervisor.rb', line 130

def check_max_sites
  max = @supervisor_settings['max_sites']
  if max
    if @proxies.size >= max
      raise ConnectionError.new("maximum of #{max} sites already connected")
    end
  end
end

#check_site_already_connected(site_id) ⇒ Object


242
243
244
245
# File 'lib/rsmp/supervisor.rb', line 242

def check_site_already_connected site_id
  site = find_site(site_id)
  raise FatalError.new "Site '#{site_id}' already connected" if site != nil && site != self
end

#check_site_id(site_id) ⇒ Object


237
238
239
240
# File 'lib/rsmp/supervisor.rb', line 237

def check_site_id site_id
  #check_site_already_connected site_id
  return site_id_to_site_setting site_id
end

#check_site_sxl_typesObject


44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/rsmp/supervisor.rb', line 44

def check_site_sxl_types
  sites = @supervisor_settings['sites'].clone || {}
  sites['guest'] = @supervisor_settings['guest']
  sites.each do |site_id,settings|
    unless settings
      raise RSMP::ConfigurationError.new("Configuration for site '#{site_id}' is empty")
    end
    sxl = settings['sxl']
    unless sxl
      raise RSMP::ConfigurationError.new("Configuration error for site '#{site_id}': No SXL specified")
    end
    RSMP::Schemer.find_schemas! sxl if sxl
  rescue RSMP::Schemer::UnknownSchemaError => e
    raise RSMP::ConfigurationError.new("Configuration error for site '#{site_id}': #{e}")
  end
end

#close(socket, info) ⇒ Object


195
196
197
198
199
200
201
202
203
# File 'lib/rsmp/supervisor.rb', line 195

def close socket, info
  if info
    log "Connection to #{format_ip_and_port(info)} closed", ip: info[:ip], level: :info, timestamp: Clock.now
  else
    log "Connection closed", level: :info, timestamp: Clock.now
  end

  socket.close
end

#connect(socket, info) ⇒ Object


146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/rsmp/supervisor.rb', line 146

def connect socket, info
  log "Site connected from #{format_ip_and_port(info)}",
      ip: info[:ip],
      port: info[:port],
      level: :info,
      timestamp: Clock.now

  authorize_ip info[:ip]

  stream = Async::IO::Stream.new socket
  protocol = Async::IO::Protocol::Line.new stream, Proxy::WRAPPING_DELIMITER

  settings = {
    supervisor: self,
    ip: info[:ip],
    port: info[:port],
    task: @task,
    settings: {'collect'=>@supervisor_settings['collect']},
    socket: socket,
    stream: stream,
    protocol: protocol,
    info: info,
    logger: @logger,
    archive: @archive
  }

  id = peek_version_message protocol
  proxy = find_site id
  if proxy
    proxy.revive settings
  else
    check_max_sites
    proxy = build_proxy settings
    @proxies.push proxy
  end
  proxy.run     # will run until the site disconnects
ensure
  site_ids_changed
  stop if @supervisor_settings['one_shot']
end

#find_site(site_id) ⇒ Object


216
217
218
219
220
221
# File 'lib/rsmp/supervisor.rb', line 216

def find_site site_id
  @proxies.each do |site|
    return site if site_id == :any || site.site_id == site_id
  end
  nil
end

#find_site_from_ip_port(ip, port) ⇒ Object


209
210
211
212
213
214
# File 'lib/rsmp/supervisor.rb', line 209

def find_site_from_ip_port ip, port
  @proxies.each do |site|
    return site if site.ip == ip && site.port == port
  end
  nil
end

#format_ip_and_port(info) ⇒ Object


116
117
118
119
120
121
122
# File 'lib/rsmp/supervisor.rb', line 116

def format_ip_and_port info
  if @logger.settings['hide_ip_and_port']
     '********'
  else
     "#{info[:ip]}:#{info[:port]}"
  end
end

#handle_connection(socket) ⇒ Object


81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/rsmp/supervisor.rb', line 81

def handle_connection socket
  remote_port = socket.remote_address.ip_port
  remote_hostname = socket.remote_address.ip_address
  remote_ip = socket.remote_address.ip_address

  info = {ip:remote_ip, port:remote_port, hostname:remote_hostname, now:Clock.now}
  if accept? socket, info
    connect socket, info
  else
    reject socket, info
  end
rescue ConnectionError => e
  log "Rejected connection from #{remote_ip}:#{remote_port}, #{e.to_s}", level: :warning
  notify_error e
rescue StandardError => e
  log "Connection: #{e.to_s}", exception: e, level: :error
  notify_error e, level: :internal
ensure
  close socket, info
end

#handle_supervisor_settings(options = {}) ⇒ Object


20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/rsmp/supervisor.rb', line 20

def handle_supervisor_settings options={}
  defaults = {
    'port' => 12111,
    'ips' => 'all',
    'guest' => {
      'rsmp_versions' => 'all',
      'sxl' => 'tlc',
      'intervals' => {
        'timer' => 1,
        'watchdog' => 1
      },
      'timeouts' => {
        'watchdog' => 2,
        'acknowledgement' => 2
      }
    }
  }

  # merge options into defaults
  @supervisor_settings = defaults.deep_merge(options[:supervisor_settings] || {})
  @rsmp_versions = @supervisor_settings["rsmp_versions"]
  check_site_sxl_types
end

#ip_to_site_settings(ip) ⇒ Object


257
258
259
# File 'lib/rsmp/supervisor.rb', line 257

def ip_to_site_settings ip
  @supervisor_settings['sites'][ip] || @supervisor_settings['sites']['guest']
end

#peek_version_message(protocol) ⇒ Object


139
140
141
142
143
144
# File 'lib/rsmp/supervisor.rb', line 139

def peek_version_message protocol
  json = protocol.peek_line
  attributes = Message.parse_attributes json
  message = Message.build attributes, json
  message.attribute('siteId').first['sId']
end

#reject(socket, info) ⇒ Object


191
192
193
# File 'lib/rsmp/supervisor.rb', line 191

def reject socket, info
  log "Site rejected", ip: info[:ip], level: :info
end

#site_connected?(site_id) ⇒ Boolean

Returns:

  • (Boolean)

205
206
207
# File 'lib/rsmp/supervisor.rb', line 205

def site_connected? site_id
  return find_site(site_id) != nil
end

#site_id_to_site_setting(site_id) ⇒ Object


247
248
249
250
251
252
253
254
255
# File 'lib/rsmp/supervisor.rb', line 247

def site_id_to_site_setting site_id
  return {} unless @supervisor_settings['sites']
  @supervisor_settings['sites'].each_pair do |id,settings|
    if id == 'guest' || id == site_id
      return settings
    end
  end
  raise FatalError.new "site id #{site_id} unknown"
end

#site_ids_changedObject


187
188
189
# File 'lib/rsmp/supervisor.rb', line 187

def site_ids_changed
  @site_id_condition.signal
end

#start_actionObject


61
62
63
64
65
66
67
68
69
70
# File 'lib/rsmp/supervisor.rb', line 61

def start_action
  @endpoint = Async::IO::Endpoint.tcp('0.0.0.0', @supervisor_settings["port"])
  @endpoint.accept do |socket|  # creates async tasks
    handle_connection(socket)
  rescue StandardError => e
    notify_error e, level: :internal
  end
rescue StandardError => e
  notify_error e, level: :internal
end

#startingObject


102
103
104
105
106
# File 'lib/rsmp/supervisor.rb', line 102

def starting
  log "Starting supervisor on port #{@supervisor_settings["port"]}", 
      level: :info,
      timestamp: @clock.now
end

#stopObject


72
73
74
75
76
77
78
79
# File 'lib/rsmp/supervisor.rb', line 72

def stop
  log "Stopping supervisor #{@supervisor_settings["site_id"]}", level: :info
  @proxies.each { |proxy| proxy.stop }
  @proxies.clear
  super
  @tcp_server.close if @tcp_server
  @tcp_server = nil
end

#wait_for_site(site_id, timeout) ⇒ Object


223
224
225
226
227
228
229
# File 'lib/rsmp/supervisor.rb', line 223

def wait_for_site site_id, timeout
  site = find_site site_id
  return site if site
  wait_for(@site_id_condition,timeout) { find_site site_id }
rescue Async::TimeoutError
  raise RSMP::TimeoutError.new "Site '#{site_id}' did not connect within #{timeout}s"
end

#wait_for_site_disconnect(site_id, timeout) ⇒ Object


231
232
233
234
235
# File 'lib/rsmp/supervisor.rb', line 231

def wait_for_site_disconnect site_id, timeout
  wait_for(@site_id_condition,timeout) { true unless find_site site_id }
rescue Async::TimeoutError
  raise RSMP::TimeoutError.new "Site '#{site_id}' did not disconnect within #{timeout}s"
end