Class: CloudCrowd::WorkUnit
- Inherits: ActiveRecord::Base (Object > ActiveRecord::Base > CloudCrowd::WorkUnit)
- Includes: ModelStatus
- Defined in: lib/cloud_crowd/models/work_unit.rb
Overview
A WorkUnit is an atomic chunk of work from a job, processing a single input through a single action. The WorkUnits are run in parallel, with each worker daemon processing one at a time. The splitting and merging stages of a job are each run as a single WorkUnit.
Constant Summary
- MAX_RESERVATION = 2147483647
We use a random number in (0...MAX_RESERVATION) to reserve work units. This is the maximum signed 32-bit integer, the largest value a MySQL INT column can hold; SQLite does not impose this limit.
- RESERVATION_LIMIT = 25
We only reserve a certain number of WorkUnits in a single go, to avoid reserving the entire table.
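The two constants can be checked with a small standalone sketch. It assumes Ruby's standard-library `SecureRandom` (the source calls the older `ActiveSupport::SecureRandom`, which later Rails versions removed), and only shows that a token drawn from `0...MAX_RESERVATION` fits in a signed 32-bit column.

```ruby
require 'securerandom'

# Values copied from the constant summary above.
MAX_RESERVATION   = 2147483647
RESERVATION_LIMIT = 25

# MAX_RESERVATION is 2**31 - 1, the largest signed 32-bit integer.
raise 'unexpected value' unless MAX_RESERVATION == 2**31 - 1

# With an Integer argument, SecureRandom.random_number(n) returns an Integer in 0...n.
token = SecureRandom.random_number(MAX_RESERVATION)
puts token.between?(0, MAX_RESERVATION - 1) # => true
```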
Class Method Summary
+ (Object) cancel_all_reservations
Cancels all outstanding WorkUnit reservations for all processes.
+ (Object) cancel_reservations(reservation)
Cancels all outstanding WorkUnit reservations for this process.
+ (Object) distribute_to_nodes
Attempt to send a list of WorkUnits to nodes with available capacity.
+ (Object) find_by_worker_name(name)
Look up a WorkUnit by the worker that's currently processing it.
+ (Object) reserve_available(options = {})
Reserves available WorkUnits for this process, up to options[:limit] if given.
+ (Object) start(job, action, input, status)
Convenience method for starting a new WorkUnit.
Instance Method Summary
- (Object) assign_to(node_record, worker_pid)
When a Node checks out a WorkUnit, establish the connection between WorkUnit and NodeRecord and record the worker_pid.
- (Object) cancel_reservation
If the node can't process the unit, cancel its reservation.
- (Object) fail(output, time_taken)
Mark this unit as having failed.
- (Object) finish(result, time_taken)
Mark this unit as having finished successfully.
- (Object) parsed_output(out = self.output)
All output needs to be wrapped in a JSON object for consistency (unfortunately, JSON.parse needs the top-level to be an object or array).
- (Object) to_json
The JSON representation of a WorkUnit shares the Job's options with all its cousin WorkUnits.
- (Object) try_again
Ever tried. Ever failed. No matter. Try again. Releases the unit from its node so that it can be retried.
Methods included from ModelStatus
#complete?, #display_status, #failed?, included, #incomplete?, #merging?, #processing?, #splitting?, #succeeded?
Class Method Details
+ (Object) cancel_all_reservations
Cancels all outstanding WorkUnit reservations for all processes. (Useful in the console for debugging.)
# File 'lib/cloud_crowd/models/work_unit.rb', line 93
def self.cancel_all_reservations
  WorkUnit.update_all('reservation = null')
end
+ (Object) cancel_reservations(reservation)
Cancels all outstanding WorkUnit reservations for this process.
# File 'lib/cloud_crowd/models/work_unit.rb', line 87
def self.cancel_reservations(reservation)
  WorkUnit.reserved(reservation).update_all('reservation = null')
end
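The difference between the two cancel methods is only in which rows the `UPDATE` touches. The sketch below is an in-memory analogue, assuming a hypothetical `Unit` struct in place of the `work_units` table; the real methods issue a single SQL `UPDATE` through `update_all`.

```ruby
# Hypothetical in-memory stand-in for rows of the work_units table.
Unit = Struct.new(:id, :reservation)

units = [Unit.new(1, 111), Unit.new(2, 111), Unit.new(3, 222), Unit.new(4, nil)]

# Like cancel_reservations(reservation): only rows holding this token are cleared.
def cancel_reservations(units, reservation)
  units.select { |u| u.reservation == reservation }.each { |u| u.reservation = nil }
end

# Like cancel_all_reservations: every row is cleared, whichever process reserved it.
def cancel_all_reservations(units)
  units.each { |u| u.reservation = nil }
end

cancel_reservations(units, 111)
p units.map(&:reservation) # => [nil, nil, 222, nil]
cancel_all_reservations(units)
p units.map(&:reservation) # => [nil, nil, nil, nil]
```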
+ (Object) distribute_to_nodes
Attempt to send a list of WorkUnits to nodes with available capacity. The single central server process prevents the same WorkUnit from being distributed to multiple nodes by reserving it first. The algorithm is intended to be lock-free.
We reserve WorkUnits for this process in chunks of RESERVATION_LIMIT size, and try to match them to Nodes that are capable of handling the Action. WorkUnits get removed from the availability list when they are successfully sent, and Nodes get removed when they are busy or have the action in question disabled.
# File 'lib/cloud_crowd/models/work_unit.rb', line 40
def self.distribute_to_nodes
  reservation = nil
  loop do
    # Find the available nodes, and determine what actions we're capable
    # of running at the moment.
    available_nodes = NodeRecord.available
    available_actions = available_nodes.map {|node| node.actions }.flatten.uniq
    filter = "action in (#{available_actions.map{|a| "'#{a}'"}.join(',')})"
    # Reserve a handful of available work units.
    WorkUnit.cancel_reservations(reservation) if reservation
    return unless reservation = WorkUnit.reserve_available(:limit => RESERVATION_LIMIT, :conditions => filter)
    work_units = WorkUnit.reserved(reservation)
    # Round robin through the nodes and units, sending the unit if the node
    # is able to process it.
    work_units.each do |unit|
      available_nodes.each do |node|
        if node.actions.include? unit.action
          if node.send_work_unit unit
            work_units.delete unit
            available_nodes.delete node if node.busy?
            break
          end
        end
      end
    end
    # If we still have units at this point, or we're fresh out of nodes,
    # that means we're done.
    return if work_units.any? || available_nodes.empty?
  end
ensure
  WorkUnit.cancel_reservations(reservation) if reservation
end
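The matching step inside the loop can be sketched without a database. This is a single-round, in-memory illustration, not CloudCrowd's code: `Node`, `WorkItem`, the `capacity` field and the always-succeeding `send_work_unit` are hypothetical stand-ins for `NodeRecord` and the HTTP call to a node. Each unit goes to the first node that supports its action, and a node drops out of the pool once it is busy.

```ruby
# Hypothetical stand-ins; the real code uses NodeRecord and WorkUnit models.
Node = Struct.new(:name, :actions, :capacity, :assigned) do
  def busy?
    assigned.size >= capacity
  end

  # Pretend to send the unit; succeeds while the node has spare capacity.
  def send_work_unit(unit)
    return false if busy?
    assigned << unit
    true
  end
end

WorkItem = Struct.new(:id, :action)

# Match each unit to the first available node that supports its action.
# Iterating over copies (dup) avoids deleting from an Array during `each`.
def distribute(units, nodes)
  pending   = units.dup
  available = nodes.dup
  units.each do |unit|
    available.dup.each do |node|
      next unless node.actions.include?(unit.action)
      next unless node.send_work_unit(unit)
      pending.delete(unit)
      available.delete(node) if node.busy?
      break
    end
  end
  pending # units that could not be placed in this round
end

nodes = [Node.new('a', ['ocr'], 1, []), Node.new('b', ['ocr', 'resize'], 2, [])]
units = [WorkItem.new(1, 'ocr'), WorkItem.new(2, 'ocr'),
         WorkItem.new(3, 'resize'), WorkItem.new(4, 'ocr')]
left = distribute(units, nodes)
p left.map(&:id) # => [4]
```

In the real method, the leftover units are released by `cancel_reservations` and the outer `loop` stops, so they can be picked up on a later call.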
+ (Object) find_by_worker_name(name)
Look up a WorkUnit by the worker that's currently processing it. The worker name has the form pid@host.
# File 'lib/cloud_crowd/models/work_unit.rb', line 99
def self.find_by_worker_name(name)
  pid, host = name.split('@')
  node = NodeRecord.find_by_host(host)
  node && node.work_units.find_by_worker_pid(pid)
end
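The lookup first splits the `pid@host` name, then finds the node by host and the unit by pid. A minimal sketch, assuming a hypothetical `NODES` hash in place of the `NodeRecord` and `work_units` queries. Note that `split('@')` yields the pid as a String, and a missing node short-circuits to `nil`.

```ruby
# Hypothetical in-memory registry: host => { worker_pid => work unit id }.
NODES = {
  'worker1.example.com' => { '4321' => 'unit-17' }
}

def find_by_worker_name(name)
  pid, host = name.split('@') # pid stays a String, e.g. "4321"
  node = NODES[host]
  node && node[pid]           # nil when the host is unknown
end

p find_by_worker_name('4321@worker1.example.com') # => "unit-17"
p find_by_worker_name('4321@unknown-host')        # => nil
```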
+ (Object) reserve_available(options = {})
Reserves available WorkUnits for this process, up to options[:limit] if given. Returns the reservation number, or false if there were none available.
# File 'lib/cloud_crowd/models/work_unit.rb', line 79
def self.reserve_available(options={})
  reservation = ActiveSupport::SecureRandom.random_number(MAX_RESERVATION)
  conditions = "reservation is null and node_record_id is null and status in (#{INCOMPLETE.join(',')}) and #{options[:conditions]}"
  any = WorkUnit.update_all("reservation = #{reservation}", conditions, options) > 0
  any && reservation
end
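The reservation scheme stamps a batch of free rows with one random token and then treats that token as the batch's identity. The sketch below reproduces the selection rules in memory; the `Row` struct and the symbol status values in `INCOMPLETE` are hypothetical, and, unlike the single SQL `UPDATE` in the real method, this version is not safe against concurrent callers.

```ruby
require 'securerandom'

MAX_RESERVATION = 2147483647
INCOMPLETE = [:pending, :processing] # hypothetical status values

Row = Struct.new(:id, :status, :reservation, :node_record_id, :action)

# Stamp up to `limit` free rows with one random token.
# Returns the token, or false if nothing was reserved.
def reserve_available(rows, limit:, actions:)
  reservation = SecureRandom.random_number(MAX_RESERVATION)
  free = rows.select do |r|
    r.reservation.nil? && r.node_record_id.nil? &&
      INCOMPLETE.include?(r.status) && actions.include?(r.action)
  end
  chosen = free.first(limit)
  chosen.each { |r| r.reservation = reservation }
  !chosen.empty? && reservation
end

rows = [Row.new(1, :processing, nil, nil, 'ocr'),  # free
        Row.new(2, :processing, nil, 7, 'ocr'),    # already assigned to a node
        Row.new(3, :succeeded, nil, nil, 'ocr')]   # already complete
token = reserve_available(rows, limit: 25, actions: ['ocr'])
p rows.map { |r| r.reservation == token }                  # => [true, false, false]
p reserve_available(rows, limit: 25, actions: ['ocr'])     # => false
```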
+ (Object) start(job, action, input, status)
Convenience method for starting a new WorkUnit.
# File 'lib/cloud_crowd/models/work_unit.rb', line 106
def self.start(job, action, input, status)
  input = input.to_json unless input.is_a? String
  self.create(:job => job, :action => action, :input => input, :status => status)
end
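`start` stores String inputs verbatim and JSON-encodes everything else. A minimal sketch of that first step, using a hypothetical `normalize_input` helper and the standard `json` library:

```ruby
require 'json'

# Hypothetical helper mirroring the input handling in WorkUnit.start.
def normalize_input(input)
  input.is_a?(String) ? input : input.to_json
end

p normalize_input('http://example.com/doc.pdf') # => "http://example.com/doc.pdf"
p normalize_input({ 'page' => 3 })               # => "{\"page\":3}"
```

Because strings are not JSON-quoted, a consumer of `input` has to cope with both plain strings and JSON documents.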
Instance Method Details
- (Object) assign_to(node_record, worker_pid)
When a Node checks out a WorkUnit, establish the connection between WorkUnit and NodeRecord and record the worker_pid.
# File 'lib/cloud_crowd/models/work_unit.rb', line 166
def assign_to(node_record, worker_pid)
  update_attributes!(:node_record => node_record, :worker_pid => worker_pid)
end
- (Object) cancel_reservation
If the node can't process the unit, cancel its reservation.
# File 'lib/cloud_crowd/models/work_unit.rb', line 160
def cancel_reservation
  update_attributes!(:reservation => nil)
end
- (Object) fail(output, time_taken)
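Mark this unit as having failed. If the attempt count, including this attempt, is still below CloudCrowd.config[:work_unit_retries], the unit is retried instead.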
# File 'lib/cloud_crowd/models/work_unit.rb', line 136
def fail(output, time_taken)
  tries = self.attempts + 1
  return try_again if tries < CloudCrowd.config[:work_unit_retries]
  update_attributes({
    :status => FAILED,
    :node_record => nil,
    :worker_pid => nil,
    :attempts => tries,
    :output => output,
    :time => time_taken
  })
  job && job.check_for_completion
end
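The interplay of `fail` and `try_again` decides how many failures a unit survives. The sketch below models only the attempt counter and status, assuming a hypothetical `UnitState` struct and a retry setting of 3 standing in for `CloudCrowd.config[:work_unit_retries]`. Both branches release the node and count the attempt; only the last one marks the unit failed.

```ruby
# Hypothetical in-memory unit; the real methods persist via update_attributes.
UnitState = Struct.new(:attempts, :status, :node, keyword_init: true)

WORK_UNIT_RETRIES = 3 # assumed value of CloudCrowd.config[:work_unit_retries]

def fail_unit(unit, retries: WORK_UNIT_RETRIES)
  tries = unit.attempts + 1
  unit.node = nil       # fail and try_again both release the node
  unit.attempts = tries # ...and both count the attempt
  unit.status = :failed unless tries < retries
  unit
end

unit = UnitState.new(attempts: 0, status: :processing, node: 'node-a')
statuses = 3.times.map { fail_unit(unit).status }
p statuses # => [:processing, :processing, :failed]
```

With a setting of 3, the unit is therefore marked failed on its third failed attempt.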
- (Object) finish(result, time_taken)
Mark this unit as having finished successfully. Splitting work units are handled differently, as an optimization: they immediately fire off all of their resulting WorkUnits for processing, without waiting for the rest of their splitting cousins to complete.
# File 'lib/cloud_crowd/models/work_unit.rb', line 115
def finish(result, time_taken)
  if splitting?
    [parsed_output(result)].flatten.each do |new_input|
      WorkUnit.start(job, action, new_input, PROCESSING)
    end
    self.destroy
    job.set_next_status if job && job.done_splitting?
  else
    update_attributes({
      :status => SUCCEEDED,
      :node_record => nil,
      :worker_pid => nil,
      :attempts => attempts + 1,
      :output => result,
      :time => time_taken
    })
    job && job.check_for_completion
  end
end
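For a splitting unit, `[parsed_output(result)].flatten` turns either a single value or an array of values into a list of inputs, one per new WorkUnit. A small sketch of just that step, with a hypothetical `split_inputs` helper:

```ruby
require 'json'

# Hypothetical helper: a splitting unit's result becomes the inputs of new units.
def split_inputs(result)
  [JSON.parse(result)['output']].flatten
end

p split_inputs('{"output": ["a.pdf", "b.pdf"]}') # => ["a.pdf", "b.pdf"]
p split_inputs('{"output": "only.pdf"}')         # => ["only.pdf"]
```

`flatten` with no argument flattens recursively, so nested arrays in a split result would also be spread into separate inputs.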
- (Object) parsed_output(out = self.output)
All output needs to be wrapped in a JSON object for consistency (unfortunately, JSON.parse needs the top-level to be an object or array). Convenience method to provide the parsed version.
# File 'lib/cloud_crowd/models/work_unit.rb', line 173
def parsed_output(out = self.output)
  JSON.parse(out)['output']
end
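Wrapping every result under an `output` key means the stored text is always a JSON object, whatever type the action returned. A round-trip sketch, where `wrap_output` is a hypothetical helper for the worker side and `parsed_output` takes the stored string as an argument:

```ruby
require 'json'

# Hypothetical worker-side helper: always emit a top-level JSON object.
def wrap_output(value)
  { 'output' => value }.to_json
end

def parsed_output(out)
  JSON.parse(out)['output']
end

values   = [42, 'done', [1, 2], { 'k' => 'v' }]
restored = values.map { |v| parsed_output(wrap_output(v)) }
p restored == values # => true
```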
- (Object) to_json
The JSON representation of a WorkUnit shares the Job's options with all its cousin WorkUnits.
# File 'lib/cloud_crowd/models/work_unit.rb', line 179
def to_json
  {
    'id' => self.id,
    'job_id' => self.job_id,
    'input' => self.input,
    'attempts' => self.attempts,
    'action' => self.action,
    'options' => JSON.parse(self.job.options),
    'status' => self.status
  }.to_json
end
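Since each unit parses its job's `options` string into the payload, every unit of the same job carries identical options as a nested JSON object rather than as an escaped string. A sketch with hypothetical `Job` and `Unit` structs; the `*_args` parameter is an assumption added so the method also tolerates being called by the `json` generator with a state argument.

```ruby
require 'json'

# Hypothetical stand-ins; the job stores its options as a JSON string.
Job  = Struct.new(:id, :options)
Unit = Struct.new(:id, :job, :input, :attempts, :action, :status) do
  def to_json(*_args)
    {
      'id'       => id,
      'job_id'   => job.id,
      'input'    => input,
      'attempts' => attempts,
      'action'   => action,
      'options'  => JSON.parse(job.options), # shared by every unit of this job
      'status'   => status
    }.to_json
  end
end

job = Job.new(7, '{"language":"en"}')
a = JSON.parse(Unit.new(1, job, 'a.pdf', 0, 'ocr', 'processing').to_json)
b = JSON.parse(Unit.new(2, job, 'b.pdf', 0, 'ocr', 'processing').to_json)
p a['options'] == b['options'] # => true
```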
- (Object) try_again
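Ever tried. Ever failed. No matter. Try again. Fail again. Fail better. Clears the unit's node and worker assignment and increments its attempt count, so that the unit becomes eligible for distribution again.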
# File 'lib/cloud_crowd/models/work_unit.rb', line 151
def try_again
  update_attributes({
    :node_record => nil,
    :worker_pid => nil,
    :attempts => self.attempts + 1
  })
end