Class: CloudCrowd::Job

Inherits:
ActiveRecord::Base (which in turn inherits from Object)
Includes:
ModelStatus
Defined in:
lib/cloud_crowd/models/job.rb

Overview

A chunk of work that will be farmed out into many WorkUnits to be processed in parallel by each active CloudCrowd::Worker. Jobs are defined by a list of inputs (usually public URLs to files), an action (the name of a script that CloudCrowd knows how to run), and, eventually, a corresponding list of outputs.

Constant Summary

CLEANUP_GRACE_PERIOD = 7

That's a week (the value is measured in days).

Class Method Summary

  • + (Object) cleanup_all(opts = {})
  • + (Object) create_from_request(h)

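Instance Method Summary

  • - (Object) action_class
  • - (Boolean) all_work_units_complete?
  • - (Boolean) any_work_units_failed?
  • - (Object) check_for_completion
  • - (Object) cleanup_assets
  • - (Object) color
  • - (Boolean) done_splitting?
  • - (Object) fire_callback
  • - (Object) gather_outputs_from_work_units (private)
  • - (Boolean) mergeable?
  • - (Object) percent_complete
  • - (Object) queue_for_workers(input = nil) (private)
  • - (Object) set_initial_status (private)
  • - (Object) set_next_status
  • - (Boolean) splittable?
  • - (Object) time_taken
  • - (Object) to_json(opts = {})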

Methods included from ModelStatus

#complete?, #display_status, #failed?, included, #incomplete?, #merging?, #processing?, #splitting?, #succeeded?

Class Method Details

+ (Object) cleanup_all(opts = {})

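Clean up all jobs beyond a certain age: destroy every completed Job older than opts[:days] days (CLEANUP_GRACE_PERIOD by default), loading them in batches of 100.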

# File 'lib/cloud_crowd/models/job.rb', line 35

def self.cleanup_all(opts = {})
  days = opts[:days] || CLEANUP_GRACE_PERIOD
  self.complete.older_than(days).find_in_batches(:batch_size => 100) do |jobs|
    jobs.each {|job| job.destroy }
  end
end
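The selection that cleanup_all performs can be sketched in plain Ruby, without ActiveRecord. This is an illustration only: the JobRecord struct and cleanup_all_sketch helper are hypothetical stand-ins for the complete and older_than scopes, the assumption that "complete" means succeeded or failed is labeled in the code, age is assumed to be measured from an updated_at timestamp, and each_slice plays the role of find_in_batches.

```ruby
# Hypothetical stand-in for a Job row; not part of CloudCrowd.
JobRecord = Struct.new(:id, :status, :updated_at)

CLEANUP_GRACE_PERIOD = 7 # days
SECONDS_PER_DAY      = 24 * 60 * 60

# Assumption: the `complete` scope covers both succeeded and failed jobs.
COMPLETE_STATUSES = [:succeeded, :failed].freeze

# Return the ids of completed jobs older than `days` days, walking them in
# slices of `batch_size` the way find_in_batches(:batch_size => 100) would.
def cleanup_all_sketch(jobs, days: CLEANUP_GRACE_PERIOD, batch_size: 100, now: Time.now)
  cutoff = now - days * SECONDS_PER_DAY
  stale  = jobs.select { |j| COMPLETE_STATUSES.include?(j.status) && j.updated_at < cutoff }
  destroyed = []
  stale.each_slice(batch_size) do |batch|
    batch.each { |job| destroyed << job.id } # the real method calls job.destroy here
  end
  destroyed
end
```

Batching keeps memory bounded when thousands of old jobs are eligible; only one slice of records is held at a time.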

+ (Object) create_from_request(h)

Create a Job from an incoming JSON request and add it to the queue. The inputs and options are stored as JSON strings; options default to an empty hash.

# File 'lib/cloud_crowd/models/job.rb', line 24

def self.create_from_request(h)
  self.create(
    :inputs       => h['inputs'].to_json,
    :action       => h['action'],
    :options      => (h['options'] || {}).to_json,
    :email        => h['email'],
    :callback_url => h['callback_url']
  )
end
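To make the expected request shape concrete, here is a minimal sketch of the attribute hash that create_from_request builds. The keys come from the method above; the request_to_attributes helper, the "word_count" action name, and the example URLs are hypothetical, and no database is touched.

```ruby
require 'json'

# Build the attributes create_from_request passes to Job.create.
# Hypothetical helper for illustration; not part of CloudCrowd.
def request_to_attributes(h)
  {
    :inputs       => h['inputs'].to_json,          # stored as a JSON string
    :action       => h['action'],
    :options      => (h['options'] || {}).to_json, # defaults to "{}"
    :email        => h['email'],
    :callback_url => h['callback_url']
  }
end

# An example incoming request with no options, email, or callback.
request = JSON.parse(<<~JSON)
  {"action": "word_count",
   "inputs": ["http://example.com/a.txt", "http://example.com/b.txt"]}
JSON

attrs = request_to_attributes(request)
```

Storing inputs and options as JSON text keeps the schema simple; the Job parses them back when it queues work units.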

Instance Method Details

- (Object) action_class

Retrieve the class for this Job's Action.

# File 'lib/cloud_crowd/models/job.rb', line 118

def action_class
  @action_class ||= CloudCrowd.actions[self.action]
  return @action_class if @action_class
  raise Error::ActionNotFound, "no action named: '#{self.action}' could be found"
end
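The memoize-then-raise pattern in action_class can be seen in isolation below. The ACTIONS hash is a hypothetical stand-in for CloudCrowd.actions, and JobSketch, WordCount, and ActionNotFound are illustrative names, not the library's classes.

```ruby
# Hypothetical error and action registry standing in for CloudCrowd's.
class ActionNotFound < StandardError; end
class WordCount; end

ACTIONS = { 'word_count' => WordCount }.freeze

class JobSketch
  def initialize(action)
    @action = action
  end

  # Cache a successful lookup; raise when the name is unknown.
  def action_class
    @action_class ||= ACTIONS[@action]
    return @action_class if @action_class
    raise ActionNotFound, "no action named: '#{@action}' could be found"
  end
end
```

Because `||=` does not cache a nil result, a failed lookup is retried (and raises again) on every call rather than being remembered.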

- (Boolean) all_work_units_complete?

Have all of the WorkUnits finished?

Returns:

  • (Boolean)

# File 'lib/cloud_crowd/models/job.rb', line 93

def all_work_units_complete?
  self.work_units.incomplete.count <= 0
end

- (Boolean) any_work_units_failed?

Have any of the WorkUnits failed?

Returns:

  • (Boolean)

# File 'lib/cloud_crowd/models/job.rb', line 98

def any_work_units_failed?
  self.work_units.failed.count > 0
end

- (Object) check_for_completion

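After work units are marked successful, check whether all of them have finished; if so, move the Job on to the next phase. If that phase is merging, the gathered outputs are queued as a single merge work unit. If the Job is complete, its outputs and elapsed time are saved, and the callback (if any) is fired in a background thread.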

# File 'lib/cloud_crowd/models/job.rb', line 44

def check_for_completion
  return unless all_work_units_complete?
  set_next_status
  outs = gather_outputs_from_work_units
  return queue_for_workers([outs]) if merging?
  if complete?
    update_attributes(:outputs => outs, :time => time_taken)
    puts "Job ##{id} (#{action}) #{display_status}." unless ENV['RACK_ENV'] == 'test'
    Thread.new { fire_callback } if callback_url
  end
  self
end

- (Object) cleanup_assets

Cleaning up after a Job removes all of its files from S3 or the filesystem. Destroying a Job calls cleanup_assets first. Run this in a separate thread to stay out of the transaction's way. TODO: Convert this into a 'cleanup' work unit that gets run by a worker.

# File 'lib/cloud_crowd/models/job.rb', line 88

def cleanup_assets
  AssetStore.new.cleanup(self)
end

- (Object) color

Generate a stable 24-bit hex color code (six hex digits), based on the Job's id.

# File 'lib/cloud_crowd/models/job.rb', line 144

def color
  @color ||= Digest::MD5.hexdigest(self.id.to_s)[-7...-1]
end
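The color derivation only needs the standard library's Digest::MD5. The job_color helper below is a hypothetical standalone version of the method; it shows that the [-7...-1] slice takes six characters, stopping just before the last hex digit of the 32-character digest.

```ruby
require 'digest/md5'

# Derive a six-hex-digit color from a job id, as Job#color does.
# Hypothetical helper for illustration.
def job_color(id)
  # [-7...-1] is an exclusive range: indices 25..30 of the 32-char digest.
  Digest::MD5.hexdigest(id.to_s)[-7...-1]
end
```

Prefixing the result with "#" gives a CSS color, and the same id always maps to the same color.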

- (Boolean) done_splitting?

This job is done splitting if it's finished with its splitting work units.

Returns:

  • (Boolean)

# File 'lib/cloud_crowd/models/job.rb', line 108

def done_splitting?
  splittable? && work_units.splitting.count <= 0
end

- (Object) fire_callback

If a callback_url is defined, post the Job's JSON to it upon completion. The callback_url may include HTTP basic authentication, if you like:

http://user:password@example.com/job_complete

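If the callback URL returns a '201 Created' HTTP status code, CloudCrowd assumes that the resource has been successfully created and destroys the Job in a background thread. If the POST raises a RestClient::Exception, the failure is logged and the Job is left in place.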

# File 'lib/cloud_crowd/models/job.rb', line 75

def fire_callback
  begin
    response = RestClient.post(callback_url, {:job => self.to_json})
    Thread.new { self.destroy } if response && response.code == 201
  rescue RestClient::Exception => e
    puts "Job ##{id} (#{action}) failed to fire callback: #{callback_url}"
  end
end
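As an illustration of the two rules above, the standard library's URI can pull basic-auth credentials out of a callback URL, and cleanup hinges on an exact 201 status. Both callback_credentials and destroy_after_callback? are hypothetical helpers; the sketch makes no network request and does not claim to reproduce how RestClient handles the URL internally.

```ruby
require 'uri'

# Extract HTTP basic-auth credentials embedded in a callback URL.
# Hypothetical helper; returns nil when the URL carries no userinfo.
def callback_credentials(callback_url)
  uri = URI.parse(callback_url)
  return nil unless uri.user
  [uri.user, uri.password]
end

# Only '201 Created' tells CloudCrowd it is safe to destroy the Job.
def destroy_after_callback?(status_code)
  status_code == 201
end
```

A plain 200 OK is deliberately not enough: the Job is kept so its results are not lost if the receiver did not actually store them.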

- (Object) gather_outputs_from_work_units (private)

When the WorkUnits are all finished, gather all their outputs together before removing them from the database entirely. Returns their merged JSON.

# File 'lib/cloud_crowd/models/job.rb', line 169

def gather_outputs_from_work_units
  units = self.work_units.complete
  # Read every output before the rows are deleted.
  outs  = units.map {|u| u.parsed_output }
  units.destroy_all
  outs.to_json
end
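The gather-then-delete order can be sketched with plain objects. UnitSketch and gather_outputs are hypothetical, and the assumption that parsed_output decodes a JSON-encoded output string is labeled in the code; Array#clear stands in for destroy_all.

```ruby
require 'json'

# Hypothetical stand-in for a completed WorkUnit.
UnitSketch = Struct.new(:output) do
  # Assumption: parsed_output decodes a JSON-encoded output string.
  def parsed_output
    JSON.parse(output)
  end
end

# Collect outputs first, then remove the units, then return merged JSON.
def gather_outputs(units)
  outs = units.map(&:parsed_output)
  units.clear # stands in for destroy_all
  outs.to_json
end
```

Reversing the first two steps would lose the outputs, which is why the real method reads them before destroying anything.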

- (Boolean) mergeable?

This job is mergeable if it is currently processing and its Action has a merge method.

Returns:

  • (Boolean)

# File 'lib/cloud_crowd/models/job.rb', line 113

def mergeable?
  self.processing? && self.action_class.public_instance_methods.map {|m| m.to_sym }.include?(:merge)
end

- (Object) percent_complete

How complete is this Job? Unfortunately, with the current processing sequence, percent_complete can go backwards. This happens when a single large input takes a long time to split: when the split finally finishes, it creates a whole swarm of new work units, and the larger unit count drags the percentage down. This seems unavoidable.

# File 'lib/cloud_crowd/models/job.rb', line 129

def percent_complete
  return 99  if merging?
  return 100 if complete?
  unit_count = work_units.count
  return 100 if unit_count <= 0
  (work_units.complete.count / unit_count.to_f * 100).round
end
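The arithmetic can be checked with a standalone function. The status symbols are hypothetical stand-ins for the ModelStatus predicates, and treating both succeeded and failed as "complete" is an assumption; the counts replace the work_units queries.

```ruby
# Percent complete from work-unit counts, following Job#percent_complete.
# Assumption: a complete Job is one that has succeeded or failed.
def percent_complete_sketch(status, complete_units, total_units)
  return 99  if status == :merging
  return 100 if [:succeeded, :failed].include?(status)
  return 100 if total_units <= 0
  (complete_units / total_units.to_f * 100).round
end
```

With 1 of 2 units done the Job reports 50; if a split then adds 18 more units, the same single finished unit reports 5, which is the backwards jump described above.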

- (Object) queue_for_workers(input = nil) (private)

When starting a new job, or moving to a new stage, split up the inputs into WorkUnits, and queue them. Workers will start picking them up right away.

# File 'lib/cloud_crowd/models/job.rb', line 179

def queue_for_workers(input=nil)
  input ||= JSON.parse(self.inputs)
  input.each {|i| WorkUnit.start(self, action, i, status) }
  self
end
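A minimal sketch of the fan-out, assuming nothing beyond what the method shows: one unit per input, each tagged with the Job's current status. QueuedUnit and queue_units are hypothetical; the optional input argument mirrors how check_for_completion passes [outs] so the merge phase gets exactly one unit.

```ruby
require 'json'

# Hypothetical record of a queued unit; WorkUnit.start does the real work.
QueuedUnit = Struct.new(:action, :input, :status)

# One unit per input; fall back to the Job's stored JSON inputs.
def queue_units(inputs_json, action, status, input = nil)
  input ||= JSON.parse(inputs_json)
  input.map { |i| QueuedUnit.new(action, i, status) }
end
```

Passing a one-element array is what collapses the merge stage into a single work unit holding all gathered outputs.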

- (Object) set_initial_status (private)

A Job starts out either splitting or processing, depending on its action.

# File 'lib/cloud_crowd/models/job.rb', line 186

def set_initial_status
  self.status = self.splittable? ? SPLITTING : PROCESSING
end

- (Object) set_next_status

Transition this Job's current status to the appropriate next one, based on the state of the WorkUnits and the nature of the Action.

# File 'lib/cloud_crowd/models/job.rb', line 59

def set_next_status
  update_attribute(:status,
    any_work_units_failed? ? FAILED     :
    self.splitting?        ? PROCESSING :
    self.mergeable?        ? MERGING    :
                             SUCCEEDED
  )
end
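The transition rule reads more easily as a pure function. In this hypothetical sketch, next_status takes booleans in place of the Job's predicates, and lifecycle walks a Job with no failures from its initial status (as set_initial_status chooses it) to the end.

```ruby
# The set_next_status rule as a pure function over boolean inputs.
def next_status(any_failed:, splitting:, mergeable:)
  return :failed     if any_failed
  return :processing if splitting
  return :merging    if mergeable
  :succeeded
end

# Walk a failure-free Job through its statuses. mergeable? is only true
# while processing, so a Job that has finished merging falls to :succeeded.
def lifecycle(splittable:, has_merge:)
  status = splittable ? :splitting : :processing
  seen   = [status]
  until [:succeeded, :failed].include?(status)
    status = next_status(any_failed: false,
                         splitting:  status == :splitting,
                         mergeable:  status == :processing && has_merge)
    seen << status
  end
  seen
end
```

Checking failures first means a single failed unit ends the Job regardless of which phase it was in.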

- (Boolean) splittable?

This job is splittable if its Action has a split method.

Returns:

  • (Boolean)

# File 'lib/cloud_crowd/models/job.rb', line 103

def splittable?
  self.action_class.public_instance_methods.map {|m| m.to_sym }.include? :split
end
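Both splittable? and mergeable? rely on reflecting over the Action's public instance methods. The two action classes below are hypothetical; defines_public? mirrors the check in the method above, where the `.map {|m| m.to_sym }` step covers Ruby 1.8, which returned method names as strings.

```ruby
# Hypothetical actions: one defines split and merge, one only process.
class SplitMergeAction
  def split;   end
  def process; end
  def merge;   end
end

class ProcessOnlyAction
  def process; end
end

# Mirrors Job#splittable?: is there a public instance method with this name?
def defines_public?(klass, name)
  klass.public_instance_methods.map { |m| m.to_sym }.include?(name)
end
```

On Ruby 1.9 and later, Module#public_method_defined? answers the same question without building the full method list.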

- (Object) time_taken

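How long has this Job taken? Returns the stored time once the Job has completed; otherwise, the seconds elapsed since it was created.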

# File 'lib/cloud_crowd/models/job.rb', line 138

def time_taken
  return self.time if self.time
  Time.now - self.created_at
end

- (Object) to_json(opts = {})

A JSON representation of this job includes the statuses of its component WorkUnits, as well as any completed outputs.

# File 'lib/cloud_crowd/models/job.rb', line 150

def to_json(opts={})
  atts = {
    'id'                => id,
    'color'             => color,
    'status'            => display_status,
    'percent_complete'  => percent_complete,
    'work_units'        => work_units.count,
    'time_taken'        => time_taken
  }
  atts['outputs'] = JSON.parse(outputs) if outputs
  atts['email']   = email               if email
  atts.to_json
end
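The JSON shape can be reproduced from plain values. The job_json helper and its keyword arguments are hypothetical; the keys and the conditional outputs/email handling follow the method above.

```ruby
require 'json'

# Assemble the same keys Job#to_json emits, from plain values.
# Hypothetical helper for illustration; not part of CloudCrowd.
def job_json(id:, color:, status:, percent:, units:, time_taken:, outputs: nil, email: nil)
  atts = {
    'id'               => id,
    'color'            => color,
    'status'           => status,
    'percent_complete' => percent,
    'work_units'       => units,
    'time_taken'       => time_taken
  }
  atts['outputs'] = JSON.parse(outputs) if outputs # stored as a JSON string
  atts['email']   = email                if email
  atts.to_json
end
```

Parsing the stored outputs before re-encoding nests them as real JSON structures instead of a double-encoded string, and absent outputs or email are omitted rather than sent as null.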