Class: CloudCrowd::Job
- Inherits:
- ActiveRecord::Base
- Inheritance chain:
- CloudCrowd::Job < ActiveRecord::Base < Object
- Includes:
- ModelStatus
- Defined in:
- lib/cloud_crowd/models/job.rb
Overview
A chunk of work that will be farmed out into many WorkUnits to be processed in parallel by each active CloudCrowd::Worker. Jobs are defined by a list of inputs (usually public URLs to files), an action (the name of a script that CloudCrowd knows how to run), and, eventually, a corresponding list of outputs.
Constant Summary
- CLEANUP_GRACE_PERIOD = 7
That's a week (the value is a number of days).
Class Method Summary
-
+ (Object) cleanup_all(opts = {})
Clean up all jobs beyond a certain age.
-
+ (Object) create_from_request(h)
Create a Job from an incoming JSON request, and add it to the queue.
Instance Method Summary
-
- (Object) action_class
Retrieve the class for this Job's Action.
-
- (Boolean) all_work_units_complete?
Have all of the WorkUnits finished?
-
- (Boolean) any_work_units_failed?
Have any of the WorkUnits failed?
-
- (Object) check_for_completion
After work units are marked successful, we check to see if all of them have finished. If so, we continue on to the next phase of the job.
-
- (Object) cleanup_assets
Cleaning up after a job will remove all of its files from S3 or the filesystem.
-
- (Object) color
Generate a stable 6-digit hex color code, based on the Job's id.
-
- (Boolean) done_splitting?
This job is done splitting if it's finished with its splitting work units.
-
- (Object) fire_callback
If a callback_url is defined, post the Job's JSON to it upon completion.
-
- (Object) gather_outputs_from_work_units
private
When the WorkUnits are all finished, gather all their outputs together before removing them from the database entirely.
-
- (Boolean) mergeable?
This job is mergeable if its Action has a merge method.
-
- (Object) percent_complete
How complete is this Job? Unfortunately, with the current processing sequence, the percent_complete can pull a fast one and go backwards.
-
- (Object) queue_for_workers(input = nil)
private
When starting a new job, or moving to a new stage, split up the inputs into WorkUnits, and queue them.
-
- (Object) set_initial_status
private
A Job starts out either splitting or processing, depending on its action.
-
- (Object) set_next_status
Transition this Job's current status to the appropriate next one, based on the state of the WorkUnits and the nature of the Action.
-
- (Boolean) splittable?
This job is splittable if its Action has a split method.
-
- (Object) time_taken
How long has this Job taken?
-
- (Object) to_json(opts = {})
A JSON representation of this job includes the statuses of its component WorkUnits, as well as any completed outputs.
Methods included from ModelStatus
#complete?, #display_status, #failed?, included, #incomplete?, #merging?, #processing?, #splitting?, #succeeded?
Class Method Details
+ (Object) cleanup_all(opts = {})
Clean up all jobs beyond a certain age.
# File 'lib/cloud_crowd/models/job.rb', line 35
def self.cleanup_all(opts = {})
  days = opts[:days] || CLEANUP_GRACE_PERIOD
  self.complete.older_than(days).find_in_batches(:batch_size => 100) do |jobs|
    jobs.each {|job| job.destroy }
  end
end
+ (Object) create_from_request(h)
Create a Job from an incoming JSON request, and add it to the queue.
# File 'lib/cloud_crowd/models/job.rb', line 24
def self.create_from_request(h)
  self.create(
    :inputs       => h['inputs'].to_json,
    :action       => h['action'],
    :options      => (h['options'] || {}).to_json,
    :email        => h['email'],
    :callback_url => h['callback_url']
  )
end
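As a rough illustration of the attribute mapping above, the following standalone sketch builds the same hash that create_from_request hands to create, without touching ActiveRecord. The helper name job_attributes_from_request, the sample URLs, and the action name are hypothetical.

```ruby
require 'json'

# Hypothetical helper: mirrors the attribute hash that
# create_from_request passes to create, but needs no database.
def job_attributes_from_request(h)
  {
    :inputs       => h['inputs'].to_json,
    :action       => h['action'],
    :options      => (h['options'] || {}).to_json,
    :email        => h['email'],
    :callback_url => h['callback_url']
  }
end

# Sample request; the URLs and the action name are made up.
request = {
  'inputs' => ['http://example.com/one.pdf', 'http://example.com/two.pdf'],
  'action' => 'word_count'
}

attrs = job_attributes_from_request(request)
puts attrs[:inputs]   # the inputs list, serialized as a JSON string
puts attrs[:options]  # a missing 'options' key falls back to an empty JSON object
```

Note that inputs and options are stored as JSON strings, while a missing email or callback_url simply stays nil.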
Instance Method Details
- (Object) action_class
Retrieve the class for this Job's Action.
# File 'lib/cloud_crowd/models/job.rb', line 118
def action_class
  @action_class ||= CloudCrowd.actions[self.action]
  return @action_class if @action_class
  raise Error::ActionNotFound, "no action named: '#{self.action}' could be found"
end
- (Boolean) all_work_units_complete?
Have all of the WorkUnits finished?
# File 'lib/cloud_crowd/models/job.rb', line 93
def all_work_units_complete?
  self.work_units.incomplete.count <= 0
end
- (Boolean) any_work_units_failed?
Have any of the WorkUnits failed?
# File 'lib/cloud_crowd/models/job.rb', line 98
def any_work_units_failed?
  self.work_units.failed.count > 0
end
- (Object) check_for_completion
After work units are marked successful, we check to see if all of them have finished. If so, we continue on to the next phase of the job.
# File 'lib/cloud_crowd/models/job.rb', line 44
def check_for_completion
  return unless all_work_units_complete?
  set_next_status
  outs = gather_outputs_from_work_units
  return queue_for_workers([outs]) if merging?
  if complete?
    update_attributes(:outputs => outs, :time => time_taken)
    puts "Job ##{id} (#{action}) #{display_status}." unless ENV['RACK_ENV'] == 'test'
    Thread.new { fire_callback } if callback_url
  end
  self
end
- (Object) cleanup_assets
Cleaning up after a job will remove all of its files from S3 or the filesystem. Destroying a Job will cleanup_assets first. Run this in a separate thread to get out of the transaction's way. TODO: Convert this into a 'cleanup' work unit that gets run by a worker.
# File 'lib/cloud_crowd/models/job.rb', line 88
def cleanup_assets
  AssetStore.new.cleanup(self)
end
- (Object) color
Generate a stable 6-digit hex color code, based on the Job's id.
# File 'lib/cloud_crowd/models/job.rb', line 144
def color
  @color ||= Digest::MD5.hexdigest(self.id.to_s)[-7...-1]
end
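The slice [-7...-1] takes six characters from near the end of the 32-character MD5 hex digest, which is what makes the result usable as an RGB color. A minimal standalone sketch, assuming only Ruby's standard digest library (the helper name job_color and the id 42 are illustrative):

```ruby
require 'digest/md5'

# Same slice as Job#color, applied to a bare id instead of a record.
def job_color(id)
  Digest::MD5.hexdigest(id.to_s)[-7...-1]
end

color = job_color(42)
puts "##{color}"                   # six lowercase hex digits after the '#'
puts job_color(42) == job_color(42)  # the same id always maps to the same color
```

Because the color depends only on the id, a given Job keeps its color across requests and restarts.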
- (Boolean) done_splitting?
This job is done splitting if it's finished with its splitting work units.
# File 'lib/cloud_crowd/models/job.rb', line 108
def done_splitting?
  splittable? && work_units.splitting.count <= 0
end
- (Object) fire_callback
If a callback_url is defined, post the Job's JSON to it upon completion. The callback_url may include HTTP basic authentication, if you like:
http://user:password@example.com/job_complete
If the callback URL returns a '201 Created' HTTP status code, CloudCrowd will assume that the resource has been successfully created, and the Job will be cleaned up.
# File 'lib/cloud_crowd/models/job.rb', line 75
def fire_callback
  begin
    response = RestClient.post(callback_url, {:job => self.to_json})
    Thread.new { self.destroy } if response && response.code == 201
  rescue RestClient::Exception => e
    puts "Job ##{id} (#{action}) failed to fire callback: #{callback_url}"
  end
end
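On the receiving side, the callback arrives as a POST whose job parameter holds the Job's JSON as a string. The following is only a sketch of what a receiver might do with it: the function name handle_job_callback, the choice of 200 for "not stored", and the assumption that a finished job reports its status as the string 'succeeded' are all hypothetical and not taken from the source above.

```ruby
require 'json'

# Hypothetical receiver logic: takes the decoded form parameters of the
# callback POST and returns the HTTP status code to respond with.
def handle_job_callback(params)
  job = JSON.parse(params['job'])
  # Assumption: a successful job reports the status string 'succeeded'.
  return 200 unless job['status'] == 'succeeded'
  # A real receiver would persist job['outputs'] somewhere durable here.
  201 # signals to CloudCrowd that the Job may be cleaned up
end

ok     = handle_job_callback('job' => { 'status' => 'succeeded', 'outputs' => ['a'] }.to_json)
failed = handle_job_callback('job' => { 'status' => 'failed' }.to_json)
puts ok
puts failed
```

Responding with 201 only after the outputs are safely stored avoids losing results, since CloudCrowd destroys the Job once it sees that status.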
- (Object) gather_outputs_from_work_units (private)
When the WorkUnits are all finished, gather all their outputs together before removing them from the database entirely. Returns their merged JSON.
# File 'lib/cloud_crowd/models/job.rb', line 169
def gather_outputs_from_work_units
  units = self.work_units.complete
  outs = self.work_units.complete.map {|u| u.parsed_output }
  self.work_units.complete.destroy_all
  outs.to_json
end
- (Boolean) mergeable?
This job is mergeable if its Action has a merge method.
# File 'lib/cloud_crowd/models/job.rb', line 113
def mergeable?
  self.processing? && self.action_class.public_instance_methods.map {|m| m.to_sym }.include?(:merge)
end
- (Object) percent_complete
How complete is this Job? Unfortunately, with the current processing sequence, the percent_complete can pull a fast one and go backwards. This happens when there's a single large input that takes a long time to split, and when it finally does, it creates a whole swarm of work units. This seems unavoidable.
# File 'lib/cloud_crowd/models/job.rb', line 129
def percent_complete
  return 99  if merging?
  return 100 if complete?
  unit_count = work_units.count
  return 100 if unit_count <= 0
  (work_units.complete.count / unit_count.to_f * 100).round
end
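The arithmetic can be tried in isolation. This sketch restates the method as a pure function over two counts; the function signature and the sample counts are hypothetical, chosen only to show how a growing unit total lowers the percentage even though no finished work was lost.

```ruby
# Pure-function restatement of the arithmetic in percent_complete.
# The counts are passed in instead of being read from work_units.
def percent_complete(complete_units, total_units, merging: false, complete: false)
  return 99  if merging
  return 100 if complete
  return 100 if total_units <= 0
  (complete_units / total_units.to_f * 100).round
end

before_split = percent_complete(1, 2)   # one of two units finished
after_split  = percent_complete(1, 21)  # same finished unit, many new units queued
puts before_split
puts after_split
```

The to_f conversion matters: without it, integer division would truncate every partial result to 0.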
- (Object) queue_for_workers(input = nil) (private)
When starting a new job, or moving to a new stage, split up the inputs into WorkUnits, and queue them. Workers will start picking them up right away.
# File 'lib/cloud_crowd/models/job.rb', line 179
def queue_for_workers(input=nil)
  input ||= JSON.parse(self.inputs)
  input.each {|i| WorkUnit.start(self, action, i, status) }
  self
end
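One detail worth spelling out is the optional input argument. When a job starts, each stored input becomes its own work unit; when check_for_completion moves a job into merging, it passes [outs], so the whole gathered JSON string becomes the input of a single work unit. The sketch below imitates that behaviour with a plain array in place of WorkUnit.start; the QUEUED constant, the two-argument signature, and the file names are hypothetical.

```ruby
require 'json'

# Hypothetical stand-in for WorkUnit.start: records what would be queued.
QUEUED = []

def queue_for_workers(stored_inputs_json, input = nil)
  input ||= JSON.parse(stored_inputs_json)
  input.each {|i| QUEUED << i }
end

stored = ['a.pdf', 'b.pdf'].to_json

# Starting a job: one work unit per stored input.
queue_for_workers(stored)

# Merge stage: the gathered outputs are wrapped in a one-element array,
# so exactly one work unit is queued, carrying all outputs as its input.
outs = ['result-a', 'result-b'].to_json
queue_for_workers(stored, [outs])

puts QUEUED.length
```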
- (Object) set_initial_status (private)
A Job starts out either splitting or processing, depending on its action.
# File 'lib/cloud_crowd/models/job.rb', line 186
def set_initial_status
  self.status = self.splittable? ? SPLITTING : PROCESSING
end
- (Object) set_next_status
Transition this Job's current status to the appropriate next one, based on the state of the WorkUnits and the nature of the Action.
# File 'lib/cloud_crowd/models/job.rb', line 59
def set_next_status
  update_attribute(:status,
    any_work_units_failed? ? FAILED     :
    self.splitting?        ? PROCESSING :
    self.mergeable?        ? MERGING    :
                             SUCCEEDED
  )
end
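The chained ternaries are evaluated top to bottom, so a failure wins over everything else and success is the fallback. A minimal sketch of the same decision order as a pure function, using symbols as stand-ins for the FAILED, PROCESSING, MERGING and SUCCEEDED constants (the function name and keyword arguments are hypothetical):

```ruby
# Pure-function restatement of the decision chain in set_next_status.
# Symbols stand in for the status constants used by the real model.
def next_status(any_failed:, splitting:, mergeable:)
  return :failed     if any_failed
  return :processing if splitting
  return :merging    if mergeable
  :succeeded
end

puts next_status(any_failed: false, splitting: true,  mergeable: false)
puts next_status(any_failed: false, splitting: false, mergeable: true)
puts next_status(any_failed: false, splitting: false, mergeable: false)
puts next_status(any_failed: true,  splitting: true,  mergeable: true)
```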
- (Boolean) splittable?
This job is splittable if its Action has a split method.
# File 'lib/cloud_crowd/models/job.rb', line 103
def splittable?
  self.action_class.public_instance_methods.map {|m| m.to_sym }.include? :split
end
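Both splittable? and mergeable? rely on the same reflection check against the Action class. The sketch below applies that check to two hypothetical classes that stand in for a Job's Action; the class names and the helper has_public_method? are illustrative only.

```ruby
# Hypothetical stand-ins for a Job's Action class.
class PlainAction
  def process; end
end

class SplitMergeAction
  def process; end
  def split; end
  def merge; end
end

# Same check as in splittable? and mergeable?, parameterized by name.
def has_public_method?(klass, name)
  klass.public_instance_methods.map {|m| m.to_sym }.include?(name)
end

puts has_public_method?(PlainAction, :split)
puts has_public_method?(SplitMergeAction, :split)
puts has_public_method?(SplitMergeAction, :merge)
```

The to_sym mapping is harmless on current Rubies, where public_instance_methods already returns symbols; presumably it is there for Ruby 1.8, which returned strings.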
- (Object) time_taken
How long has this Job taken?
# File 'lib/cloud_crowd/models/job.rb', line 138
def time_taken
  return self.time if self.time
  Time.now - self.created_at
end
- (Object) to_json(opts = {})
A JSON representation of this job includes the statuses of its component WorkUnits, as well as any completed outputs.
# File 'lib/cloud_crowd/models/job.rb', line 150
def to_json(opts={})
  atts = {
    'id'               => id,
    'color'            => color,
    'status'           => display_status,
    'percent_complete' => percent_complete,
    'work_units'       => work_units.count,
    'time_taken'       => time_taken
  }
  atts['outputs'] = JSON.parse(outputs) if outputs
  atts['email']   = email if email
  atts.to_json
end