Module: Wukong::EmrCommand
- Defined in:
- lib/wukong/script/emr_command.rb
Overview
EMR Options
Defined Under Namespace
Classes: S3Util
Constant Summary
- ABSOLUTE_URI =
%r{^/|^\w+://}
Instance Method Summary (collapse)
- - (Object) bootstrap_s3_uri
- - (Object) copy_jars_to_cloud
- - (Object) copy_script_to_cloud
- - (Object) emr_credentials
-
- (Object) emr_s3_path(*path_segs)
Produces an s3 URI within the Wukong emr sandbox from a set of path segments.
- - (Object) execute_emr_runner
- - (Object) execute_emr_workflow
-
- (Object) fix_paths!
Walk through the input paths and the output path.
- - (Object) hadoop_options_for_emr_runner
-
- (Object) job_handle
A short name for this job.
- - (Object) log_s3_uri
- - (Object) mapper_s3_uri
- - (Object) reducer_s3_uri
- - (Object) wukong_libs_s3_uri
Instance Method Details
- (Object) bootstrap_s3_uri
118 119 120 |
# File 'lib/wukong/script/emr_command.rb', line 118
#
# S3 location of this job's EMR bootstrap script, built with emr_s3_path
# under the job handle's 'bin' directory.
def bootstrap_s3_uri
  segments = [job_handle, 'bin', "emr_bootstrap.sh"]
  emr_s3_path(*segments)
end
- (Object) copy_jars_to_cloud
42 43 44 45 |
# File 'lib/wukong/script/emr_command.rb', line 42
#
# Uploads the locally built wukong-libs jar to wukong_libs_s3_uri.
#
# @param local_jar [String] path of the jar on the local filesystem; it is
#   expanded to an absolute path before upload (defaults to the historical
#   hard-coded location)
# @return whatever S3Util.store returns
def copy_jars_to_cloud(local_jar = '/tmp/wukong-libs.jar')
  S3Util.store(File.expand_path(local_jar), wukong_libs_s3_uri)
  # "--cache-archive=#{wukong_libs_s3_uri}#vendor",
end
- (Object) copy_script_to_cloud
35 36 37 38 39 40 |
# File 'lib/wukong/script/emr_command.rb', line 35
#
# Uploads this script to S3 twice, once as the mapper and once as the
# reducer. Also uploads the bootstrap script named by
# Settings.emr_bootstrap_script, with its path expanded to absolute form.
def copy_script_to_cloud
  Log.info " Copying this script to the cloud."
  # One script file serves both roles; execute_emr_runner passes the two
  # S3 copies as --mapper and --reducer.
  S3Util.store(this_script_filename, mapper_s3_uri)
  S3Util.store(this_script_filename, reducer_s3_uri)
  S3Util.store(File.expand_path(Settings.emr_bootstrap_script), bootstrap_s3_uri)
end
- (Object) emr_credentials
82 83 84 85 86 87 88 89 90 |
# File 'lib/wukong/script/emr_command.rb', line 82
#
# Credential arguments for the EMR runner.
#
# If Settings.emr_credentials_file is set, returns a --credentials flag
# pointing at that file, with the path expanded. Otherwise returns the
# access key and secret key inline as --access-id / --private-key.
#
# @return [Array<String>] a one-element list, appended to the runner's
#   command arguments by execute_emr_runner
def emr_credentials
  if Settings.emr_credentials_file
    ["--credentials #{File.expand_path(Settings.emr_credentials_file)}"]
  else
    # NOTE(review): the secret key presumably ends up on the runner's command
    # line, where other local users may see it in the process list. Prefer
    # setting emr_credentials_file.
    [%Q{--access-id #{Settings.access_key} --private-key #{Settings.secret_access_key} }]
  end
end
- (Object) emr_s3_path(*path_segs)
Produces an s3 URI within the Wukong emr sandbox from a set of path segments
105 106 107 |
# File 'lib/wukong/script/emr_command.rb', line 105
#
# Builds an S3 URI inside the Wukong EMR sandbox rooted at Settings.emr_root.
# Nested arrays in +path_segs+ are flattened and nil segments are dropped,
# then everything is joined with File.join.
def emr_s3_path(*path_segs)
  segments = path_segs.flatten.compact
  File.join(Settings.emr_root, segments)
end
- (Object) execute_emr_runner
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/wukong/script/emr_command.rb', line 53
#
# Assembles the EMR runner's command-line arguments and runs the runner
# (Settings.emr_runner, path expanded) via execute_command!.
#
# If Settings.jobflow is set, only the jobflow flag is passed. Otherwise
# "--create" is passed along with the alive, instance, availability-zone,
# key-pair and bootstrap-action flags.
#
# Settings[:emr_extra_args] may be a single String or an Array of Strings.
# Either way its contents are appended verbatim, followed by the translated
# Hadoop options from hadoop_options_for_emr_runner.
def execute_emr_runner
  # fix_paths!
  command_args = []
  if Settings.jobflow
    command_args << Settings.dashed_flag_for(:jobflow)
  else
    command_args << "--create --name=#{job_name}"
    command_args << Settings.dashed_flag_for(:alive)
    command_args << Settings.dashed_flags(:num_instances, [:instance_type, :slave_instance_type], :master_instance_type, :hadoop_version).join(' ')
    command_args << Settings.dashed_flags(:availability_zone, :key_pair, :key_pair_file).join(' ')
    command_args << "--bootstrap-action=#{bootstrap_s3_uri}"
  end
  command_args << Settings.dashed_flags(:enable_debugging, :step_action, [:emr_runner_verbose, :verbose], [:emr_runner_debug, :debug]).join(' ')
  command_args += emr_credentials
  command_args += [
    "--log-uri=#{log_s3_uri}",
    "--stream",
    "--mapper=#{mapper_s3_uri} ",
    "--reducer=#{reducer_s3_uri} ",
    "--input=#{input_paths.join(",")} --output=#{output_path}",
  ]
  # eg to specify zero reducers:
  #   Settings[:emr_extra_args] = "--arg '-D mapred.reduce.tasks=0'"
  # Array() wraps a lone String; Array#+ would raise TypeError on a String.
  extra_args = Settings[:emr_extra_args]
  command_args += Array(extra_args) unless extra_args.blank?
  command_args += hadoop_options_for_emr_runner
  Log.info 'Follow along at http://localhost:9000/job'
  execute_command!( File.expand_path(Settings.emr_runner), *command_args )
end
- (Object) execute_emr_workflow
30 31 32 33 |
# File 'lib/wukong/script/emr_command.rb', line 30
#
# Full EMR launch. First stages the scripts in S3 via copy_script_to_cloud,
# then starts the job via execute_emr_runner and returns the runner's result.
def execute_emr_workflow
  # The uploads must finish first: the runner arguments point at their S3 URIs.
  copy_script_to_cloud
  execute_emr_runner
end
- (Object) fix_paths!
Walk through the input paths and the output path, prepending Settings.emr_data_root to any path that does NOT look like an absolute path ("/foo") or a URI ("s3://your-bucket/data"). Does nothing when Settings.emr_data_root is blank.
131 132 133 134 135 136 137 138 139 |
# File 'lib/wukong/script/emr_command.rb', line 131
#
# Rewrites @input_paths and @output_path in place. Settings.emr_data_root is
# prepended to every path that matches neither an absolute path ("/foo")
# nor a URI ("s3://bucket/data"), per ABSOLUTE_URI.
# Does nothing when Settings.emr_data_root is blank; blank input/output
# values are left untouched.
def fix_paths!
  data_root = Settings.emr_data_root
  return if data_root.blank?
  rooted = lambda { |path| (path =~ ABSOLUTE_URI) ? path : File.join(data_root, path) }
  @input_paths = input_paths.map(&rooted) unless input_paths.blank?
  # Keep the output a single path: it is interpolated into "--output=...".
  # An Array there would render as '["..."]'.
  @output_path = rooted.call(output_path) unless output_path.blank?
end
- (Object) hadoop_options_for_emr_runner
47 48 49 50 51 |
# File 'lib/wukong/script/emr_command.rb', line 47
#
# Translates the Hadoop options into EMR-runner arguments.
#
# The options come from hadoop_jobconf_options plus hadoop_other_args,
# flattened, with nils dropped and duplicates removed. Each option is split
# on whitespace, and every piece becomes its own "--arg '<piece>'".
#
# NOTE(review): whitespace splitting breaks option values that contain
# spaces, and pieces are single-quoted without escaping embedded quotes.
def hadoop_options_for_emr_runner
  [hadoop_jobconf_options, hadoop_other_args].flatten.compact.uniq.map do |hdp_opt|
    hdp_opt.split(' ').map { |part| "--arg '#{part}'" }
  end.flatten
end
- (Object) job_handle
A short name for this job
93 94 95 |
# File 'lib/wukong/script/emr_command.rb', line 93
#
# A short name for this job: the running script's file name ($0) with its
# directory and any trailing '.rb' removed.
def job_handle
  script_path = $0
  File.basename(script_path, '.rb')
end
- (Object) log_s3_uri
115 116 117 |
# File 'lib/wukong/script/emr_command.rb', line 115
#
# S3 location passed to the EMR runner as --log-uri: the job handle's
# 'log/emr_jobs' directory, built with emr_s3_path.
def log_s3_uri
  segments = [job_handle, 'log', 'emr_jobs']
  emr_s3_path(*segments)
end
- (Object) mapper_s3_uri
109 110 111 |
# File 'lib/wukong/script/emr_command.rb', line 109
#
# S3 location of this script's mapper copy: <handle>/code/<handle>-mapper.rb,
# built with emr_s3_path.
def mapper_s3_uri
  handle = job_handle
  emr_s3_path(handle, 'code', "#{handle}-mapper.rb")
end
- (Object) reducer_s3_uri
112 113 114 |
# File 'lib/wukong/script/emr_command.rb', line 112
#
# S3 location of this script's reducer copy: <handle>/code/<handle>-reducer.rb,
# built with emr_s3_path.
def reducer_s3_uri
  handle = job_handle
  emr_s3_path(handle, 'code', "#{handle}-reducer.rb")
end
- (Object) wukong_libs_s3_uri
121 122 123 |
# File 'lib/wukong/script/emr_command.rb', line 121
#
# S3 location of the wukong-libs jar uploaded by copy_jars_to_cloud, in the
# job handle's 'code' directory.
def wukong_libs_s3_uri
  jar_name = "wukong-libs.jar"
  emr_s3_path(job_handle, 'code', jar_name)
end