Class: CloudCrowd::Action
- Inherits:
- Object
- CloudCrowd::Action < Object
- Defined in:
- lib/cloud_crowd/action.rb
Overview
As you write your custom actions, have them inherit from CloudCrowd::Action. All actions must implement a process method, which should return a JSON-serializable object that will be used as the output for the work unit. See the default actions for examples.
Optionally, actions may define split and merge methods to do mapping and reducing around the input. split should return an array of URLs – to be mapped into WorkUnits and processed in parallel. In the merge step, input will be an array of all the resulting outputs from calling process.
All actions have use of an individual work_directory, for scratch files, and spend their duration inside of it, so relative paths work well.
Note that actions inherit a backticks (`) method that raises a CommandFailed exception if the external command fails.
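The split, process, and merge flow described above can be sketched as follows. So that the example runs without the gem installed, it subclasses a small stand-in, SketchAction, which is an assumption for illustration and mimics only the input and options readers; a real action would inherit from CloudCrowd::Action. The UrlLength class, its input format, and the symbolic status values are likewise hypothetical.

```ruby
require 'json'

# Hypothetical stand-in for CloudCrowd::Action (not the real class): it only
# stores the constructor arguments and exposes two of the documented readers.
class SketchAction
  attr_reader :input, :options

  def initialize(status, input, options, store)
    @status, @input, @options, @store = status, input, options, store
  end

  def process
    raise NotImplementedError, "subclasses must override 'process'"
  end
end

# Hypothetical action: reports the length of each URL string it is given.
class UrlLength < SketchAction
  # split: return an array of URLs, one per work unit.
  def split
    input
  end

  # process: return a JSON-serializable result for a single work unit.
  def process
    { 'url' => input, 'length' => input.length }
  end

  # merge: input is the array of every result returned by process.
  def merge
    input.inject(0) { |sum, result| sum + result['length'] }
  end
end

urls    = ['http://example.com/a', 'http://example.com/bb']
units   = UrlLength.new(:splitting, urls, {}, nil).split
outputs = units.map { |url| UrlLength.new(:processing, url, {}, nil).process }
total   = UrlLength.new(:merging, outputs, {}, nil).merge

puts JSON.generate(outputs)
puts total
```

In a real deployment the three steps run on different workers; here they are chained by hand only to show what each method receives and returns.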
Constant Summary
- FILE_URL = /\Afile:\/\//
Instance Attribute Summary
- (Object) file_name (readonly)
Returns the value of attribute file_name.
- (Object) input (readonly)
Returns the value of attribute input.
- (Object) input_path (readonly)
Returns the value of attribute input_path.
- (Object) options (readonly)
Returns the value of attribute options.
- (Object) work_directory (readonly)
Returns the value of attribute work_directory.
Instance Method Summary
- (Object) `(command)
Actions have a backticks command that raises a CommandFailed exception on failure, so that processing doesn't just blithely continue.
- (Object) cleanup_work_directory
After the Action has finished, we remove the work directory and return to the root directory (where workers run by default).
- (Object) download(url, path)
Download a file to the specified path.
- (Object) download_input (private)
If the input is a URL, download the file before beginning processing.
- (Action) initialize(status, input, options, store) (constructor)
Initializing an Action sets up all of the read-only variables that form the bulk of the API for action subclasses.
- (Boolean) input_is_url? (private)
- (Object) local_storage_prefix (private)
The directory prefix to use for local storage.
- (Object) parse_input (private)
If we think that the input is JSON, replace it with the parsed form.
- (Object) process
Each Action subclass must implement a process method, overriding this.
- (Object) remote_storage_prefix (private)
The directory prefix to use for remote storage.
- (Object) safe_filename(url) (private)
Convert an unsafe URL into a filesystem-friendly filename.
- (Object) save(file_path)
Takes a local filesystem path, saves the file to S3, and returns the public (or authenticated) URL on S3 where the file can be accessed.
Constructor Details
- (Action) initialize(status, input, options, store)
Initializing an Action sets up all of the read-only variables that form the bulk of the API for action subclasses (paths to read from and write to). It creates the work_directory and moves into it. If we're not merging multiple results, it downloads the input file into the work_directory before starting.
# File 'lib/cloud_crowd/action.rb', line 29
def initialize(status, input, options, store)
  @input, @options, @store = input, options, store
  @job_id, @work_unit_id = options['job_id'], options['work_unit_id']
  @work_directory = File.expand_path(File.join(@store.temp_storage_path, local_storage_prefix))
  FileUtils.mkdir_p(@work_directory) unless File.exists?(@work_directory)
  parse_input
  download_input
end
Instance Attribute Details
- (Object) file_name (readonly)
Returns the value of attribute file_name.
# File 'lib/cloud_crowd/action.rb', line 22
def file_name
  @file_name
end
- (Object) input (readonly)
Returns the value of attribute input.
# File 'lib/cloud_crowd/action.rb', line 22
def input
  @input
end
- (Object) input_path (readonly)
Returns the value of attribute input_path.
# File 'lib/cloud_crowd/action.rb', line 22
def input_path
  @input_path
end
- (Object) options (readonly)
Returns the value of attribute options.
# File 'lib/cloud_crowd/action.rb', line 22
def options
  @options
end
- (Object) work_directory (readonly)
Returns the value of attribute work_directory.
# File 'lib/cloud_crowd/action.rb', line 22
def work_directory
  @work_directory
end
Instance Method Details
- (Object) `(command)
Actions have a backticks command that raises a CommandFailed exception on failure, so that processing doesn't just blithely continue.
# File 'lib/cloud_crowd/action.rb', line 74
def `(command)
  result = super(command)
  exit_code = $?.to_i
  raise Error::CommandFailed.new(result, exit_code) unless exit_code == 0
  result
end
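To illustrate the backticks override in isolation, here is a minimal sketch. The CommandFailed and ShellRunner classes are hypothetical stand-ins, not the library's own, and the sketch reads `$?.exitstatus` rather than `$?.to_i` so that the reported code is the shell exit status instead of the raw status bits. It assumes a POSIX shell is available.

```ruby
# Hypothetical error class standing in for Error::CommandFailed.
class CommandFailed < StandardError
  attr_reader :exit_code

  def initialize(output, exit_code)
    super(output)
    @exit_code = exit_code
  end
end

# Hypothetical class showing how a ` method changes backtick literals.
class ShellRunner
  # Backtick literals evaluated inside this class call this method.
  def `(command)
    result = super(command)      # delegate to Kernel#`
    exit_code = $?.exitstatus    # exit status of the command just run
    raise CommandFailed.new(result, exit_code) unless exit_code == 0
    result
  end

  def run(command)
    `#{command}`
  end
end

runner    = ShellRunner.new
ok_output = runner.run('echo hello')

# A failing command raises instead of silently returning its output.
failure = begin
  runner.run('sh -c "exit 3"')
  nil
rescue CommandFailed => e
  e
end
```

Because the override lives on the class, every backtick literal in a subclass gets the same fail-fast behavior without any extra calls.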
- (Object) cleanup_work_directory
After the Action has finished, we remove the work directory and return to the root directory (where workers run by default).
# File 'lib/cloud_crowd/action.rb', line 68
def cleanup_work_directory
  FileUtils.rm_r(@work_directory) if File.exists?(@work_directory)
end
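The life cycle of a work directory (create it, work inside it with relative paths, then remove it) can be sketched with the standard library alone. The directory names below are made up for illustration, and a temporary root stands in for the store's temp storage path.

```ruby
require 'fileutils'
require 'tmpdir'

# A throwaway root, standing in for the store's temp storage path.
root           = Dir.mktmpdir('action_sketch')
work_directory = File.join(root, 'word_count', 'unit_1')
FileUtils.mkdir_p(work_directory)

# Inside the work directory, relative paths resolve to scratch files there.
written = Dir.chdir(work_directory) do
  File.write('scratch.txt', 'temporary data')
  File.expand_path('scratch.txt')
end
existed_during = File.exist?(written)

# Cleanup mirrors cleanup_work_directory: remove the directory if present.
FileUtils.rm_r(work_directory) if File.exist?(work_directory)
existed_after = File.exist?(work_directory)

FileUtils.rm_rf(root)
```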
- (Object) download(url, path)
Download a file to the specified path.
# File 'lib/cloud_crowd/action.rb', line 44
def download(url, path)
  if url.match(FILE_URL)
    FileUtils.cp(url.sub(FILE_URL, ''), path)
  else
    File.open(path, 'w+') do |file|
      Net::HTTP.get_response(URI(url)) do |response|
        response.read_body do |chunk|
          file.write chunk
        end
      end
    end
  end
  path
end
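The file:// branch of download can be tried without network access. The sketch below covers only that branch; copy_file_url is a hypothetical helper name, and the temporary files exist purely for the demonstration.

```ruby
require 'fileutils'
require 'tmpdir'

FILE_URL = /\Afile:\/\//

# Copies the file named by a file:// URL to path and returns path.
# Other schemes are deliberately out of scope for this sketch.
def copy_file_url(url, path)
  raise ArgumentError, "not a file:// URL: #{url}" unless url.match(FILE_URL)
  FileUtils.cp(url.sub(FILE_URL, ''), path)
  path
end

dir    = Dir.mktmpdir('download_sketch')
source = File.join(dir, 'source.txt')
target = File.join(dir, 'copy.txt')
File.write(source, 'hello')

returned = copy_file_url("file://#{source}", target)
copied   = File.read(target)

FileUtils.rm_rf(dir)
```

Stripping the scheme with FILE_URL leaves an ordinary absolute path, which is why a plain file copy is enough for local inputs.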
- (Object) download_input (private)
If the input is a URL, download the file before beginning processing.
# File 'lib/cloud_crowd/action.rb', line 118
def download_input
  return unless input_is_url?
  Dir.chdir(@work_directory) do
    @input_path = File.join(@work_directory, safe_filename(@input))
    @file_name = File.basename(@input_path, File.extname(@input_path))
    download(@input, @input_path)
  end
end
- (Boolean) input_is_url? (private)
# File 'lib/cloud_crowd/action.rb', line 113
def input_is_url?
  !URI.parse(@input).scheme.nil? rescue false
end
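The check above treats anything that parses as a URI with a scheme as a URL, and anything that fails to parse as not a URL. A standalone sketch of the same idea, using a hypothetical url? helper with an explicit rescue clause in place of the inline modifier:

```ruby
require 'uri'

# True when input parses as a URI that has a scheme (http, file, ...).
# Anything URI.parse rejects, including non-string input, counts as "not a URL".
def url?(input)
  !URI.parse(input).scheme.nil?
rescue StandardError
  false
end

checks = {
  'http://example.com/a.pdf' => url?('http://example.com/a.pdf'),
  'file:///tmp/a.txt'        => url?('file:///tmp/a.txt'),
  'plain'                    => url?('plain'),
  'some plain text'          => url?('some plain text'),
  'parsed JSON hash'         => url?({ 'a' => 1 })
}
checks.each { |label, result| puts "#{label}: #{result}" }
```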
- (Object) local_storage_prefix (private)
The directory prefix to use for local storage.
[action]/unit_[work_unit_id]
# File 'lib/cloud_crowd/action.rb', line 101
def local_storage_prefix
  @local_storage_prefix ||= Inflector.underscore(self.class) +
    (@work_unit_id ? "/unit_#{@work_unit_id}" : '')
end
- (Object) parse_input (private)
If we think that the input is JSON, replace it with the parsed form. It would be great if the JSON module had an is_json? method.
# File 'lib/cloud_crowd/action.rb', line 108
def parse_input
  return unless ['[', '{'].include? @input[0..0]
  @input = JSON.parse(@input) rescue @input
end
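The heuristic is simple: only strings starting with `[` or `{` are handed to the JSON parser, and a parse failure leaves the input untouched. A standalone sketch, with parse_if_json as a hypothetical helper name that returns the value instead of assigning an instance variable:

```ruby
require 'json'

# Returns the parsed structure when input looks like a JSON array or object,
# and the original input otherwise (including when parsing fails).
def parse_if_json(input)
  return input unless input.is_a?(String) && ['[', '{'].include?(input[0..0])
  JSON.parse(input)
rescue JSON::ParserError
  input
end

p parse_if_json('[1, 2, 3]')
p parse_if_json('{"a": 1}')
p parse_if_json('{oops')
p parse_if_json('http://example.com/a.pdf')
```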
- (Object) process
Each Action subclass must implement a process method, overriding this.
# File 'lib/cloud_crowd/action.rb', line 39
def process
  raise NotImplementedError, "CloudCrowd::Actions must override 'process' with their own processing code."
end
- (Object) remote_storage_prefix (private)
The directory prefix to use for remote storage.
[action]/job_[job_id]
# File 'lib/cloud_crowd/action.rb', line 94
def remote_storage_prefix
  @remote_storage_prefix ||= Inflector.underscore(self.class) +
    "/job_#{@job_id}" + (@work_unit_id ? "/unit_#{@work_unit_id}" : '')
end
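To make the two prefix layouts concrete, the sketch below rebuilds them as plain functions. The underscore helper is an assumption standing in for the library's Inflector.underscore (its exact behavior is not shown on this page), and local_prefix and remote_prefix are hypothetical names.

```ruby
# Assumed stand-in for Inflector.underscore: "WordCount" becomes "word_count".
def underscore(name)
  name.to_s.gsub(/::/, '/').
    gsub(/([A-Z]+)([A-Z][a-z])/, '\1_\2').
    gsub(/([a-z\d])([A-Z])/, '\1_\2').
    downcase
end

# Local scratch space: [action], plus /unit_[work_unit_id] when there is one.
def local_prefix(action_name, work_unit_id = nil)
  underscore(action_name) + (work_unit_id ? "/unit_#{work_unit_id}" : '')
end

# Remote storage: [action]/job_[job_id], plus the unit segment when there is one.
def remote_prefix(action_name, job_id, work_unit_id = nil)
  underscore(action_name) + "/job_#{job_id}" +
    (work_unit_id ? "/unit_#{work_unit_id}" : '')
end

puts local_prefix('WordCount', 3)
puts remote_prefix('WordCount', 7, 3)
```

The remote prefix includes the job id, so outputs from different jobs of the same action land in separate remote directories.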
- (Object) safe_filename(url) (private)
Convert an unsafe URL into a filesystem-friendly filename.
# File 'lib/cloud_crowd/action.rb', line 85
def safe_filename(url)
  url = url.sub(/\?.*\Z/, '')
  ext = File.extname(url)
  name = URI.unescape(File.basename(url)).gsub(/[^a-zA-Z0-9_\-.]/, '-').gsub(/-+/, '-')
  File.basename(name, ext).gsub('.', '-') + ext
end
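A worked example helps show what the sanitizing steps do. The sketch below follows the same steps but swaps URI.unescape, which is not available in recent Ruby versions, for URI.decode_www_form_component; that substitution is this example's choice, not the library's. The URLs are made up.

```ruby
require 'uri'

# Same steps as above: drop the query string, decode percent-escapes,
# replace unsafe characters with '-', and keep only the final extension.
def safe_filename(url)
  url  = url.sub(/\?.*\Z/, '')
  ext  = File.extname(url)
  name = URI.decode_www_form_component(File.basename(url)).
           gsub(/[^a-zA-Z0-9_\-.]/, '-').gsub(/-+/, '-')
  File.basename(name, ext).gsub('.', '-') + ext
end

puts safe_filename('http://example.com/files/My%20Report.v2.pdf?token=abc')
puts safe_filename('http://example.com/a/data')
```

For the first URL, the query string is removed, `%20` decodes to a space and becomes `-`, and the inner dot in `Report.v2` becomes `-`, so only `.pdf` survives as an extension.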
- (Object) save(file_path)
Takes a local filesystem path, saves the file to S3, and returns the public (or authenticated) url on S3 where the file can be accessed.
# File 'lib/cloud_crowd/action.rb', line 61
def save(file_path)
  save_path = File.join(remote_storage_prefix, File.basename(file_path))
  @store.save(file_path, save_path)
end