Class: CloudCrowd::Node

Inherits:
Sinatra::Base
Defined in:
lib/cloud_crowd/node.rb

Overview

A Node is a Sinatra/Thin application that runs as a single instance per machine. It registers with the central server, receives WorkUnits, and forks off Workers to process them. The actions are:

get /heartbeat

Returns 200 OK to let monitoring tools know the server's up.

post /work

The central server hits /work to dispatch a WorkUnit to this Node.
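For illustration only, here is a gem-free sketch of how those two routes could be dispatched as a bare Rack-style app (any object whose call(env) returns [status, headers, body]). NODE_APP and the response bodies are hypothetical. The real Node is a Sinatra::Base subclass, and its /work handler forks a Worker, which is not modeled here.

```ruby
# Hypothetical, dependency-free sketch of the Node's two routes as a bare Rack app.
# A Rack app is any object whose #call(env) returns [status, headers, body].
NODE_APP = lambda do |env|
  case [env['REQUEST_METHOD'], env['PATH_INFO']]
  when ['GET', '/heartbeat']
    # Monitoring tools only need a 200 to know the node is up.
    [200, { 'content-type' => 'text/plain' }, ['OK']]
  when ['POST', '/work']
    # Assumption: the real handler reads the WorkUnit from the request and
    # forks a Worker; this sketch only acknowledges receipt.
    [200, { 'content-type' => 'text/plain' }, ['accepted']]
  else
    [404, { 'content-type' => 'text/plain' }, ['Not Found']]
  end
end

status, _headers, body = NODE_APP.call('REQUEST_METHOD' => 'GET', 'PATH_INFO' => '/heartbeat')
puts "#{status} #{body.join}" # prints "200 OK"
```

Dispatching on the (method, path) pair is essentially what Sinatra's get and post declarations set up for you.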

Constant Summary

DEFAULT_PORT = 9063

A Node's default port. You only run a single node per machine, so they can all use the same port without any problems.

SCRAPE_UPTIME = /\d+\.\d+/
SCRAPE_LINUX_MEMORY = /MemFree:\s+(\d+) kB/
SCRAPE_MAC_MEMORY = /Pages free:\s+(\d+)./
SCRAPE_MAC_PAGE = /page size of (\d+) bytes/

Regex scrapers that extract the one-minute load average (from uptime) and the amount of free memory on different flavors of UNIX (from /proc/meminfo on Linux, and from the free-page count and page size reported by vm_stat on macOS).

MONITOR_INTERVAL = 3

The interval, in seconds, at which the node monitors the machine's load and memory use (if configured to do so in config.yml).

CHECK_IN_INTERVAL = 300

The interval, in seconds, at which the node regularly checks in with central (5 minutes).

OVERLOADED_MESSAGE = 'Node Overloaded'

The response sent back when this node is overloaded.

Constructor Details

- (Node) initialize(options = {})

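When creating a node, you can specify the port it should run on (:port, defaulting to DEFAULT_PORT), whether to daemonize (:daemonize), and an optional :tag that is reported to the central server on check-in.
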
# File 'lib/cloud_crowd/node.rb', line 67

def initialize(options={})
  super()
  require 'json'
  CloudCrowd.identity = :node
  @central          = CloudCrowd.central_server
  @host             = Socket.gethostname
  @enabled_actions  = CloudCrowd.actions.keys - (CloudCrowd.config[:disabled_actions] || [])
  @port             = options[:port] || DEFAULT_PORT
  @id               = "#{@host}:#{@port}"
  @daemon           = !!options[:daemonize]
  @tag              = options[:tag]
  @overloaded       = false
  @max_load         = CloudCrowd.config[:max_load]
  @min_memory       = CloudCrowd.config[:min_free_memory]
  start unless ENV['RACK_ENV'] == 'test'
end
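The derived fields in initialize are plain arithmetic on the options and the config. A minimal sketch, assuming a hypothetical node_identity helper and made-up action names, reproduces the id and enabled-action calculations without touching CloudCrowd itself:

```ruby
# Hypothetical helper mirroring the arithmetic in Node#initialize:
# the node id is "host:port", and the enabled actions are the installed
# actions minus any listed under :disabled_actions in config.yml.
DEFAULT_PORT = 9063

def node_identity(host, installed_actions, config, options = {})
  port = options[:port] || DEFAULT_PORT
  enabled = installed_actions - (config[:disabled_actions] || [])
  { id: "#{host}:#{port}", enabled_actions: enabled, daemon: !!options[:daemonize] }
end

info = node_identity('worker-1', %w[word_count process_pdfs graphics_magick],
                     { disabled_actions: ['graphics_magick'] })
p info[:id]              # "worker-1:9063"
p info[:enabled_actions] # ["word_count", "process_pdfs"]
```

Because the id includes the port, two nodes on the same host would only be distinguishable if they used different ports, which is why a single node per machine can safely share DEFAULT_PORT.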

Instance Attribute Details

- (Object) central (readonly)

Returns the value of attribute central

# File 'lib/cloud_crowd/node.rb', line 34

def central
  @central
end

- (Object) enabled_actions (readonly)

Returns the value of attribute enabled_actions

# File 'lib/cloud_crowd/node.rb', line 34

def enabled_actions
  @enabled_actions
end

- (Object) host (readonly)

Returns the value of attribute host

# File 'lib/cloud_crowd/node.rb', line 34

def host
  @host
end

- (Object) port (readonly)

Returns the value of attribute port

# File 'lib/cloud_crowd/node.rb', line 34

def port
  @port
end

- (Object) tag (readonly)

Returns the value of attribute tag

# File 'lib/cloud_crowd/node.rb', line 34

def tag
  @tag
end

Instance Method Details

- (Object) asset_store

Lazy-initialize the asset_store, preferably after the Node has launched.

# File 'lib/cloud_crowd/node.rb', line 124

def asset_store
  @asset_store ||= AssetStore.new
end
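The ||= idiom builds the store on first access and returns the cached object afterwards. This sketch uses hypothetical FakeAssetStore and LazyNode classes (not part of CloudCrowd) that count constructions to make the behavior visible:

```ruby
# Illustrates the ||= lazy-initialization idiom used by Node#asset_store.
# FakeAssetStore is a hypothetical stand-in that counts how often it is built.
class FakeAssetStore
  class << self
    attr_accessor :built
  end
  self.built = 0

  def initialize
    self.class.built += 1
  end
end

class LazyNode
  # The store is created on the first call and reused afterwards.
  def asset_store
    @asset_store ||= FakeAssetStore.new
  end
end

node = LazyNode.new
first = node.asset_store
second = node.asset_store
puts first.equal?(second) # true
puts FakeAssetStore.built # 1
```

Since ||= only assigns when the current value is nil or false, the store is constructed at most once per Node, at whatever moment it is first requested.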

- (Object) check_in(critical = false)

Checking in with the central server informs it of the location and configuration of this Node. If the Node can't check in at startup, there's no point in starting: when critical is true, a failed check-in raises SystemExit; otherwise the failure is only logged and the method returns nil.

# File 'lib/cloud_crowd/node.rb', line 105

def check_in(critical=false)
  @central["/node/#{@id}"].put(
    :busy             => @overloaded,
    :tag              => @tag,
    :max_workers      => CloudCrowd.config[:max_workers],
    :enabled_actions  => @enabled_actions.join(',')
  )
rescue RestClient::Exception, Errno::ECONNREFUSED
  CloudCrowd.log "Failed to connect to the central server (#{@central.to_s})."
  raise SystemExit if critical
end
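To see roughly what check_in puts on the wire, the following sketch builds (but does not send) a comparable form-encoded PUT with the standard library's Net::HTTP instead of RestClient. The helper name, the central URL, and the sample values are assumptions, and it is also an assumption that RestClient encodes the payload identically.

```ruby
require 'net/http'
require 'uri'

# Hypothetical sketch: builds, but does not send, a form-encoded PUT similar
# to the one Node#check_in issues through RestClient.
def build_check_in_request(central_url, node_id, busy:, tag:, max_workers:, enabled_actions:)
  uri = URI.join(central_url, "/node/#{node_id}")
  request = Net::HTTP::Put.new(uri)
  request.set_form_data(
    'busy'            => busy,
    'tag'             => tag.to_s, # nil would encode as a bare key, so use ""
    'max_workers'     => max_workers,
    'enabled_actions' => enabled_actions.join(',')
  )
  request
end

req = build_check_in_request('http://localhost:9173', 'worker-1:9063',
                             busy: false, tag: 'ocr', max_workers: 5,
                             enabled_actions: %w[word_count process_pdfs])
puts req.method # PUT
puts req.path   # /node/worker-1:9063
puts req.body   # busy=false&tag=ocr&max_workers=5&enabled_actions=word_count%2Cprocess_pdfs
```

Note that the comma separating action names is percent-encoded in the body; the server sees the original comma-separated string after decoding.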

- (Object) check_in_periodically (private)

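If communication is interrupted for external reasons, the central server will assume that the node has gone down. Checking in regularly lets central know it's still online. The check-in thread waits CHECK_IN_INTERVAL seconds between check-ins; after a failure it retries with a delay of CHECK_IN_INTERVAL multiplied by the attempt number, and it gives up and raises SystemExit after five consecutive failed attempts.
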
# File 'lib/cloud_crowd/node.rb', line 175

def check_in_periodically
  @check_in_thread = Thread.new do
    loop do
      reply = ""
      1.upto(5) do |attempt_number|
        # sleep for an ever-increasing amount of time to avoid overloading the server
        sleep CHECK_IN_INTERVAL * attempt_number
        reply = check_in
        # if we did not receive a reply, the server has gone away; it
        # will reply with an empty string if the check-in succeeds
        if reply.nil?
          CloudCrowd.log "Failed on attempt # #{attempt_number} to check in with server"
        else
          break
        end
      end
      if reply.nil?
        CloudCrowd.log "Giving up after repeated attempts to contact server"
        raise SystemExit
      end
    end
  end
end
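Assuming every attempt fails, the waits in check_in_periodically add up as follows. MAX_ATTEMPTS is introduced here only to name the literal 5 in the loop; it is not a CloudCrowd constant.

```ruby
# Worked example of the retry schedule in Node#check_in_periodically:
# before attempt n the thread sleeps CHECK_IN_INTERVAL * n seconds, and it
# gives up after five consecutive failures.
CHECK_IN_INTERVAL = 300
MAX_ATTEMPTS = 5 # hypothetical name for the 1.upto(5) bound

delays = (1..MAX_ATTEMPTS).map { |attempt| CHECK_IN_INTERVAL * attempt }
puts delays.inspect # [300, 600, 900, 1200, 1500]
puts delays.sum     # 4500 seconds, i.e. 75 minutes before giving up
```

After a successful check-in the inner loop breaks and the outer loop starts again at attempt 1, so while central stays reachable the node checks in every CHECK_IN_INTERVAL seconds.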

- (Object) check_out

Before exiting, the Node checks out with the central server, releasing all of its WorkUnits for other Nodes to handle.

# File 'lib/cloud_crowd/node.rb', line 119

def check_out
  @central["/node/#{@id}"].delete
end

- (Object) free_memory

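The current amount of free memory in megabytes, scraped from vm_stat on macOS and from /proc/meminfo on Linux. Other platforms raise NotImplementedError, since 'min_free_memory' cannot be enforced there.
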
# File 'lib/cloud_crowd/node.rb', line 142

def free_memory
  case RUBY_PLATFORM
  when /darwin/
    stats = `vm_stat`
    @mac_page_size ||= stats.match(SCRAPE_MAC_PAGE)[1].to_f / 1048576.0
    stats.match(SCRAPE_MAC_MEMORY)[1].to_f * @mac_page_size
  when /linux/
    `cat /proc/meminfo`.match(SCRAPE_LINUX_MEMORY)[1].to_f / 1024.0
  else
    raise NotImplementedError, "'min_free_memory' is not yet implemented on your platform"
  end
end
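The unit conversions in free_memory are easiest to check on captured text rather than live commands. This sketch uses hypothetical helpers linux_free_mb and mac_free_mb, with the same regexes as the constants above, applied to illustrative (made-up) sample output:

```ruby
# Hypothetical pure-function versions of Node#free_memory's conversions,
# operating on captured command output instead of shelling out.
SCRAPE_LINUX_MEMORY = /MemFree:\s+(\d+) kB/
SCRAPE_MAC_MEMORY   = /Pages free:\s+(\d+)./
SCRAPE_MAC_PAGE     = /page size of (\d+) bytes/

# /proc/meminfo reports kB; 1024 kB = 1 MB.
def linux_free_mb(meminfo)
  meminfo.match(SCRAPE_LINUX_MEMORY)[1].to_f / 1024.0
end

# vm_stat reports a free-page count and the page size in bytes; 1048576 bytes = 1 MB.
def mac_free_mb(vm_stat)
  mb_per_page = vm_stat.match(SCRAPE_MAC_PAGE)[1].to_f / 1048576.0
  vm_stat.match(SCRAPE_MAC_MEMORY)[1].to_f * mb_per_page
end

linux_sample = "MemTotal:       16384000 kB\nMemFree:         2048000 kB\n"
mac_sample   = "Mach Virtual Memory Statistics: (page size of 4096 bytes)\n" \
               "Pages free:                256000.\n"

puts linux_free_mb(linux_sample) # 2000.0
puts mac_free_mb(mac_sample)     # 1000.0
```

On macOS: 4096 bytes per page is 4096 / 1048576 = 0.00390625 MB, and 256000 pages times 0.00390625 MB gives 1000 MB.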

- (Object) load_average

The current one-minute load average, scraped from the output of uptime (the first decimal number it prints). If nothing matches, this returns 0.0.

# File 'lib/cloud_crowd/node.rb', line 137

def load_average
  `uptime`.match(SCRAPE_UPTIME).to_s.to_f
end
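The same expression can be exercised on captured uptime output. In this sketch, parse_load_average is a hypothetical helper and the sample line is illustrative:

```ruby
SCRAPE_UPTIME = /\d+\.\d+/

# Same expression as Node#load_average, applied to captured output.
# The first decimal number in uptime's output is the one-minute average;
# if nothing matches, nil.to_s.to_f quietly yields 0.0.
def parse_load_average(uptime_output)
  uptime_output.match(SCRAPE_UPTIME).to_s.to_f
end

sample = " 10:15:02 up 12 days,  3:04,  2 users,  load average: 0.42, 0.37, 0.30"
puts parse_load_average(sample)       # 0.42
puts parse_load_average('no numbers') # 0.0
```

One consequence of the 0.0 fallback: an unparsable uptime line reads as an idle machine, so the 'max_load' check would not report the node as overloaded.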

- (Object) monitor_system (private)

Launch a monitoring thread that periodically checks the node's load average and the amount of free memory remaining. If we transition out of the overloaded state, let central know.

# File 'lib/cloud_crowd/node.rb', line 161

def monitor_system
  @monitor_thread = Thread.new do
    loop do
      was_overloaded = @overloaded
      @overloaded = overloaded?
      check_in if was_overloaded && !@overloaded
      sleep MONITOR_INTERVAL
    end
  end
end
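The monitoring loop only calls check_in on the transition from overloaded to available. A hypothetical replay function, check_in_ticks, makes that rule concrete over a made-up sequence of overloaded? readings, one per MONITOR_INTERVAL tick:

```ruby
# Hypothetical replay of Node#monitor_system's decision logic. Given one
# overloaded? reading per tick, it returns the ticks at which the node would
# call check_in, i.e. the moments it leaves the overloaded state.
def check_in_ticks(readings)
  overloaded = false # the Node starts out not overloaded
  ticks = []
  readings.each_with_index do |reading, tick|
    was_overloaded = overloaded
    overloaded = reading
    ticks << tick if was_overloaded && !overloaded
  end
  ticks
end

p check_in_ticks([false, true, true, false, false, true, false]) # [3, 6]
```

Entering the overloaded state (ticks 1 and 5) triggers no check-in from this loop; only recovery does.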

- (Boolean) overloaded?

Is the node overloaded? If configured, checks whether the load average is greater than 'max_load', or whether the available RAM (in megabytes) is less than 'min_free_memory'.

# File 'lib/cloud_crowd/node.rb', line 131

def overloaded?
  (@max_load && load_average > @max_load) ||
  (@min_memory && free_memory < @min_memory)
end
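Because each comparison is guarded by its threshold with &&, a reading is only taken when its threshold is configured. This standalone sketch (a hypothetical top-level overloaded? that takes the readings as lambdas) shows the short-circuiting and the return values:

```ruby
# Hypothetical standalone version of Node#overloaded?. The load and memory
# readings are passed as lambdas so that, as in the original, each reading is
# only taken when its threshold is configured (&& short-circuits).
def overloaded?(max_load, min_memory, load_average:, free_memory:)
  (max_load && load_average.call > max_load) ||
    (min_memory && free_memory.call < min_memory)
end

unsupported = -> { raise NotImplementedError, 'no free-memory probe on this platform' }

p overloaded?(4.0, nil, load_average: -> { 5.2 }, free_memory: unsupported)   # true
p overloaded?(4.0, 512, load_average: -> { 1.0 }, free_memory: -> { 256.0 })  # true
p overloaded?(4.0, 512, load_average: -> { 1.0 }, free_memory: -> { 2048.0 }) # false
p overloaded?(nil, nil, load_average: -> { 99.0 }, free_memory: unsupported)  # nil
```

The last case shows that with neither threshold configured the method returns nil rather than false (still falsey), and that free_memory's NotImplementedError can only surface when 'min_free_memory' is set.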

- (Object) shut_down (private)

At shutdown, stop the check-in and monitoring threads, de-register with the central server, stop the server thread, and exit the process.

# File 'lib/cloud_crowd/node.rb', line 208

def shut_down
  @check_in_thread.kill if @check_in_thread
  @monitor_thread.kill if @monitor_thread
  check_out
  @server_thread.kill if @server_thread
  Process.exit
end

- (Object) start

Starting up a Node registers with the central server and begins to listen for incoming WorkUnits.

# File 'lib/cloud_crowd/node.rb', line 86

def start
  FileUtils.mkdir_p(CloudCrowd.log_path) if @daemon && !File.exist?(CloudCrowd.log_path)
  @server          = Thin::Server.new('0.0.0.0', @port, self, :signals => false)
  @server.tag      = 'cloud-crowd-node'
  @server.pid_file = CloudCrowd.pid_path('node.pid')
  @server.log_file = CloudCrowd.log_path('node.log')
  @server.daemonize if @daemon
  trap_signals
  asset_store
  @server_thread   = Thread.new { @server.start }
  check_in(true)
  check_in_periodically
  monitor_system if @max_load || @min_memory
  @server_thread.join
end
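The existence check before FileUtils.mkdir_p in start is an optimization rather than a necessity: mkdir_p creates any missing parent directories and does nothing when the directory already exists. A quick check, using a temporary directory as a stand-in for CloudCrowd.log_path:

```ruby
require 'fileutils'
require 'tmpdir'

# FileUtils.mkdir_p creates missing parents and is a no-op for an existing
# directory, so calling it twice is safe. The path below is a temporary
# stand-in for CloudCrowd.log_path.
Dir.mktmpdir do |root|
  log_path = File.join(root, 'log', 'cloud_crowd')
  2.times { FileUtils.mkdir_p(log_path) } # the second call changes nothing
  puts File.directory?(log_path)          # true
end
```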

- (Object) trap_signals (private)

Trap exit signals in order to shut down cleanly.

# File 'lib/cloud_crowd/node.rb', line 200

def trap_signals
  Signal.trap('QUIT') { shut_down }
  Signal.trap('INT')  { shut_down }
  # SIGKILL cannot be caught, and Ruby raises if asked to trap it, so it is omitted
  Signal.trap('TERM') { shut_down }
end
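Only catchable signals can be given a shutdown handler. SIGKILL can never be caught by a process, so the operating system rejects a handler for it and Ruby raises instead of installing one. This sketch uses a hypothetical try_trap helper that installs a throwaway handler, restores the previous one, and reports what happened:

```ruby
# Hypothetical helper: tries to trap a signal, restores the previous handler,
# and returns :trapped, or the exception class if Ruby refuses the trap.
def try_trap(signal_name)
  previous = Signal.trap(signal_name) { }
  Signal.trap(signal_name, previous) # put the earlier handler back
  :trapped
rescue ArgumentError, SystemCallError => e
  e.class
end

p try_trap('TERM') # :trapped
p try_trap('KILL') # the exception class Ruby raised for the refused trap
```

This is why a node killed with SIGKILL cannot check out: shut_down never runs, and central only learns the node is gone when its periodic check-ins stop.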