Module: Wukong::EmrCommand

Defined in:
lib/wukong/script/emr_command.rb

Overview

EMR Options

Defined Under Namespace

Classes: S3Util

Constant Summary

# Matches a string that begins with "/" (an absolute path) or with a
# "scheme://" prefix (a URI such as "s3://bucket/key").
# Anchored with \A rather than ^ so that only the very start of the string is
# tested; ^ would also match at the start of any later line inside the string.
ABSOLUTE_URI =
%r{\A/|\A\w+://}

Instance Method Summary (collapse)

Instance Method Details

- (Object) bootstrap_s3_uri



118
119
120
# File 'lib/wukong/script/emr_command.rb', line 118

# S3 location of this job's EMR bootstrap script.
#
# @return [String] the emr_s3_path for "emr_bootstrap.sh" under the job's 'bin' directory
def bootstrap_s3_uri
  bootstrap_segments = [job_handle, 'bin', 'emr_bootstrap.sh']
  emr_s3_path(*bootstrap_segments)
end

- (Object) copy_jars_to_cloud



42
43
44
45
# File 'lib/wukong/script/emr_command.rb', line 42

# Uploads the local jar /tmp/wukong-libs.jar to wukong_libs_s3_uri via S3Util.store.
#
# @return whatever S3Util.store returns
def copy_jars_to_cloud
  local_jar = File.expand_path('/tmp/wukong-libs.jar')
  S3Util.store(local_jar, wukong_libs_s3_uri)
  # "--cache-archive=#{wukong_libs_s3_uri}#vendor",
end

- (Object) copy_script_to_cloud



35
36
37
38
39
40
# File 'lib/wukong/script/emr_command.rb', line 35

# Uploads everything the EMR job needs to run this script.
#
# The running script file is stored twice -- once at the mapper URI and once
# at the reducer URI -- followed by the bootstrap script named by
# Settings.emr_bootstrap_script.
#
# @return whatever the final S3Util.store call returns
def copy_script_to_cloud
  Log.info("  Copying this script to the cloud.")

  # The same script file is uploaded under both the mapper and reducer names.
  S3Util.store(this_script_filename, mapper_s3_uri)
  S3Util.store(this_script_filename, reducer_s3_uri)

  local_bootstrap = File.expand_path(Settings.emr_bootstrap_script)
  S3Util.store(local_bootstrap, bootstrap_s3_uri)
end

- (Object) emr_credentials



82
83
84
85
86
87
88
89
90
# File 'lib/wukong/script/emr_command.rb', line 82

# Command-line arguments that carry the credentials for the EMR runner.
#
# If Settings.emr_credentials_file is set, its expanded path is passed with
# --credentials; otherwise the access key and secret key from Settings are
# passed inline.
#
# @return [Array<String>] a one-element array holding the credential flags
def emr_credentials
  credentials_file = Settings.emr_credentials_file
  return ["--credentials #{File.expand_path(credentials_file)}"] if credentials_file

  [%Q{--access-id #{Settings.access_key} --private-key #{Settings.secret_access_key} }]
end

- (Object) emr_s3_path(*path_segs)

Produces an s3 URI within the Wukong emr sandbox from a set of path segments

Examples:

Settings.emr_root = 's3://emr.yourmom.com/wukong'
emr_s3_path('log', 'my_happy_job', 'run-97.log')
# => "s3://emr.yourmom.com/wukong/log/my_happy_job/run-97.log"


105
106
107
# File 'lib/wukong/script/emr_command.rb', line 105

# Produces an s3 URI within the Wukong emr sandbox from a set of path segments.
# Nested arrays of segments are flattened and nil segments are dropped before
# joining onto Settings.emr_root.
#
# @example
#   Settings.emr_root = 's3://emr.yourmom.com/wukong'
#   emr_s3_path('log', 'my_happy_job', 'run-97.log')
#   # => "s3://emr.yourmom.com/wukong/log/my_happy_job/run-97.log"
#
# @param path_segs [Array] path segments (possibly nested, possibly containing nil)
# @return [String] the joined URI
def emr_s3_path(*path_segs)
  segments = path_segs.flatten.compact
  File.join(Settings.emr_root, *segments)
end

- (Object) execute_emr_runner



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/wukong/script/emr_command.rb', line 53

# Assembles the argument list for the EMR runner program (Settings.emr_runner)
# and hands it to execute_command!.
#
# When Settings.jobflow is set, only the jobflow flag is passed. Otherwise the
# arguments request creation of a new jobflow named after job_name, together
# with the instance, availability-zone / key-pair and bootstrap-action flags.
# In both cases the debugging/verbosity flags, credentials, log URI, streaming
# mapper/reducer URIs, input/output paths, any Settings[:emr_extra_args] and
# the hadoop options are appended.
#
# Settings[:emr_extra_args] may be a single String or an Array of Strings.
#
# @return whatever execute_command! returns
def execute_emr_runner
  # fix_paths!
  command_args = []
  if Settings.jobflow
    command_args << Settings.dashed_flag_for(:jobflow)
  else
    command_args << "--create --name=#{job_name}"
    command_args << Settings.dashed_flag_for(:alive)
    command_args << Settings.dashed_flags(:num_instances, [:instance_type, :slave_instance_type], :master_instance_type, :hadoop_version).join(' ')
    command_args << Settings.dashed_flags(:availability_zone, :key_pair, :key_pair_file).join(' ')
    command_args << "--bootstrap-action=#{bootstrap_s3_uri}"
  end
  command_args << Settings.dashed_flags(:enable_debugging, :step_action, [:emr_runner_verbose, :verbose], [:emr_runner_debug, :debug]).join(' ')
  command_args += emr_credentials
  command_args += [
    "--log-uri=#{log_s3_uri}",
    "--stream",
    "--mapper=#{mapper_s3_uri} ",
    "--reducer=#{reducer_s3_uri} ",
    "--input=#{input_paths.join(",")} --output=#{output_path}",
  ]
  # eg to specify zero reducers:
  # Settings[:emr_extra_args] = "--arg '-D mapred.reduce.tasks=0'"
  #
  # Array() accepts both the String form shown above and an Array of strings;
  # appending a bare String with Array#+ would raise TypeError.
  extra_args = Settings[:emr_extra_args]
  command_args += Array(extra_args) unless extra_args.blank?
  command_args += hadoop_options_for_emr_runner
  Log.info 'Follow along at http://localhost:9000/job'
  execute_command!( File.expand_path(Settings.emr_runner), *command_args )
end

- (Object) execute_emr_workflow



30
31
32
33
# File 'lib/wukong/script/emr_command.rb', line 30

# Runs the EMR workflow: first uploads this script (and the bootstrap script)
# with copy_script_to_cloud, then launches the job via execute_emr_runner.
#
# @return whatever execute_emr_runner returns
def execute_emr_workflow
  copy_script_to_cloud
  execute_emr_runner
end

- (Object) fix_paths!

Walks through the input paths and the output path, prepending Settings.emr_data_root to any path that does NOT look like an absolute path ("/foo") or a URI ("s3://yourmom/data").



131
132
133
134
135
136
137
138
139
# File 'lib/wukong/script/emr_command.rb', line 131

# Walks through the input paths and the output path, prepending
# Settings.emr_data_root to any path that does NOT match ABSOLUTE_URI, i.e.
# does not look like an absolute path ("/foo") or a URI ("s3://yourmom/data").
#
# Does nothing when Settings.emr_data_root is blank. Blank input_paths or a
# blank output_path are left untouched. Results are written to @input_paths
# (an Array) and @output_path (a single String).
def fix_paths!
  data_root = Settings.emr_data_root
  return if data_root.blank?

  # Absolute paths and URIs pass through; everything else is rooted under data_root.
  rooted = lambda { |path| (path =~ ABSOLUTE_URI) ? path : File.join(data_root, path) }

  unless input_paths.blank?
    @input_paths = input_paths.map { |path| rooted.call(path) }
  end
  unless output_path.blank?
    # Keep output_path a plain String: wrapping it in a one-element Array would
    # make "--output=#{output_path}" interpolate the Array's inspect form.
    @output_path = rooted.call(output_path)
  end
end

- (Object) hadoop_options_for_emr_runner



47
48
49
50
51
# File 'lib/wukong/script/emr_command.rb', line 47

# Translates the hadoop jobconf options and other hadoop args into the
# "--arg '...'" form passed to the EMR runner.
#
# The two option lists are flattened, stripped of nils and de-duplicated; each
# remaining option is then split on whitespace and every resulting word is
# wrapped as its own --arg.
#
# @return [Array<String>] one "--arg '<word>'" string per word
def hadoop_options_for_emr_runner
  hadoop_opts = [hadoop_jobconf_options, hadoop_other_args].flatten.compact.uniq
  hadoop_opts.flat_map do |hdp_opt|
    hdp_opt.split(' ').map { |word| "--arg '#{word}'" }
  end
end

- (Object) job_handle

A short name for this job



93
94
95
# File 'lib/wukong/script/emr_command.rb', line 93

# A short name for this job: the running program's file name with its
# directory and any ".rb" extension removed.
#
# @return [String]
def job_handle
  File.basename($PROGRAM_NAME, '.rb')
end

- (Object) log_s3_uri



115
116
117
# File 'lib/wukong/script/emr_command.rb', line 115

# S3 location passed to the EMR runner as its log URI.
#
# @return [String] the emr_s3_path for 'log/emr_jobs' under this job's handle
def log_s3_uri
  log_segments = [job_handle, 'log', 'emr_jobs']
  emr_s3_path(*log_segments)
end

- (Object) mapper_s3_uri



109
110
111
# File 'lib/wukong/script/emr_command.rb', line 109

# S3 location of the mapper copy of this script.
#
# @return [String] the emr_s3_path for "<job_handle>-mapper.rb" in the job's 'code' directory
def mapper_s3_uri
  emr_s3_path(job_handle, 'code', "#{job_handle}-mapper.rb")
end

- (Object) reducer_s3_uri



112
113
114
# File 'lib/wukong/script/emr_command.rb', line 112

# S3 location of the reducer copy of this script.
#
# @return [String] the emr_s3_path for "<job_handle>-reducer.rb" in the job's 'code' directory
def reducer_s3_uri
  emr_s3_path(job_handle, 'code', "#{job_handle}-reducer.rb")
end

- (Object) wukong_libs_s3_uri



121
122
123
# File 'lib/wukong/script/emr_command.rb', line 121

# S3 location of the Wukong libs jar for this job.
#
# @return [String] the emr_s3_path for "wukong-libs.jar" in the job's 'code' directory
def wukong_libs_s3_uri
  jar_segments = [job_handle, 'code', 'wukong-libs.jar']
  emr_s3_path(*jar_segments)
end