Module: Wukong::HadoopCommand
- Included in:
- Script
- Defined in:
- lib/wukong/script/hadoop_command.rb
Defined Under Namespace
Modules: ClassMethods
Class Method Summary
+ (Object) included(base)
Standard ClassMethods-on-include trick.
Instance Method Summary
- (Object) execute_hadoop_workflow
Assemble the hadoop command to execute and launch the hadoop runner to execute the script across all tasktrackers.
- (Object) hadoop_jobconf_options
- (Object) hadoop_other_args
- (Object) hadoop_recycle_env
- (Object) hadoop_runner
The path to the hadoop runner script.
- (Object) jobconf(option)
Emit a Hadoop job configuration option (-D, formerly -jobconf) if the simplified command-line arg is present; if not, the resulting nil is elided later.
Class Method Details
+ (Object) included(base)
Standard ClassMethods-on-include trick
    # File 'lib/wukong/script/hadoop_command.rb', line 182
    def self.included base
      base.class_eval do
        extend ClassMethods
      end
    end
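The hook above can be tried in isolation. The following is a minimal sketch of the same pattern; the names Greeter and Person are hypothetical and are not part of Wukong.

```ruby
# Hypothetical module illustrating the included-hook pattern.
module Greeter
  module ClassMethods
    # Becomes a class-level method on any class that includes Greeter.
    def greeting_prefix
      "Hello"
    end
  end

  # Ruby calls this hook with the including class as `base`.
  def self.included(base)
    base.class_eval do
      extend ClassMethods
    end
  end

  # Ordinary instance method, mixed in by `include`.
  def greet(name)
    "#{self.class.greeting_prefix}, #{name}"
  end
end

class Person
  include Greeter
end
```

A single `include Greeter` therefore gives Person both the instance method `greet` and the class method `greeting_prefix`.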
Instance Method Details
- (Object) execute_hadoop_workflow
Assemble the hadoop command to execute and launch the hadoop runner to execute the script across all tasktrackers.
FIXME: Should add some simple logic to ensure that commands are in the right order, or hadoop will complain; i.e., -D options MUST come before others.
    # File 'lib/wukong/script/hadoop_command.rb', line 59
    def execute_hadoop_workflow
      # Input paths joined by ','
      input_paths = @input_paths.join(',')
      #
      # Use Settings[:hadoop_home] to set the path to your hadoop install.
      hadoop_commandline = [
        hadoop_runner,
        "jar #{options[:hadoop_home]}/contrib/streaming/hadoop-*streaming*.jar",
        hadoop_jobconf_options,
        "-D mapred.job.name='#{job_name}'",
        hadoop_other_args,
        "-mapper '#{mapper_commandline(:hadoop)}'",
        "-reducer '#{reducer_commandline(:hadoop)}'",
        "-input '#{input_paths}'",
        "-output '#{output_path}'",
        "-file '#{this_script_filename}'",
        hadoop_recycle_env,
      ].flatten.compact.join(" \t\\\n ")
      Log.info " Launching hadoop!"
      execute_command!(hadoop_commandline)
    end
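The assembly step relies on `flatten.compact.join` to merge nested option arrays and drop nil entries. A minimal sketch of that idea follows; `build_commandline` and all the sample values are hypothetical stand-ins, not Wukong's API.

```ruby
# Hypothetical helper: joins command parts the same way the method above does.
def build_commandline(runner:, jobconf_options:, other_args:, input_paths:, output_path:)
  [
    runner,
    jobconf_options,                      # array of "-D key=value" strings
    other_args,                           # array that may contain nil entries
    "-input '#{input_paths.join(',')}'",
    "-output '#{output_path}'",
  ].flatten.compact.join(" \t\\\n  ")     # one part per line, backslash-continued
end

cmd = build_commandline(
  runner: "/usr/lib/hadoop/bin/hadoop",   # assumed install path
  jobconf_options: ["-D mapred.reduce.tasks=0"],
  other_args: [nil, "-lazyOutput"],
  input_paths: ["in/a.tsv", "in/b.tsv"],
  output_path: "out/run1"
)
puts cmd
```

Placing the jobconf array before the streaming options keeps the -D options first, which is the ordering constraint the FIXME describes.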
- (Object) hadoop_jobconf_options
    # File 'lib/wukong/script/hadoop_command.rb', line 81
    def hadoop_jobconf_options
      jobconf_options = []
      # Fixup these options
      options[:reuse_jvms] = '-1' if (options[:reuse_jvms] == true)
      options[:respect_exit_status] = 'false' if (options[:ignore_exit_status] == true)
      # If no reducer and no reduce_command, then skip the reduce phase
      options[:reduce_tasks] = 0 if (! reducer) && (! options[:reduce_command]) && (! options[:reduce_tasks])
      # Fields hadoop should use to distribute records to reducers
      unless options[:partition_fields].blank?
        jobconf_options += [
          jobconf(:partition_fields),
          jobconf(:output_field_separator),
        ]
      end
      jobconf_options += [
        :io_sort_mb, :io_sort_record_percent,
        :map_speculative, :map_tasks,
        :max_maps_per_cluster, :max_maps_per_node,
        :max_node_map_tasks, :max_node_reduce_tasks,
        :max_reduces_per_cluster, :max_reduces_per_node,
        :max_record_length, :min_split_size,
        :output_field_separator, :key_field_separator,
        :partition_fields, :sort_fields,
        :reduce_tasks, :respect_exit_status,
        :reuse_jvms, :timeout,
        :max_tracker_failures, :max_map_attempts,
        :max_reduce_attempts
      ].map{|opt| jobconf(opt)}
      jobconf_options.flatten.compact
    end
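The three fixups at the top of this method can be illustrated on a plain Hash. This is only a sketch: `fixup_options` is a hypothetical helper, and unlike the method above it works on a copy rather than modifying the options in place.

```ruby
# Hypothetical helper mirroring the option fixups on a plain Hash.
def fixup_options(options, reducer: nil)
  opts = options.dup
  # A bare `true` flag becomes the string value Hadoop expects.
  opts[:reuse_jvms] = '-1' if opts[:reuse_jvms] == true
  opts[:respect_exit_status] = 'false' if opts[:ignore_exit_status] == true
  # Map-only job: no reducer, no reduce command, no explicit task count.
  opts[:reduce_tasks] = 0 if !reducer && !opts[:reduce_command] && !opts[:reduce_tasks]
  opts
end

fixed    = fixup_options({ reuse_jvms: true, ignore_exit_status: true })
explicit = fixup_options({ reduce_tasks: 4 })
```

An explicit `:reduce_tasks` value is left untouched; only a missing one is defaulted to 0 when there is nothing to reduce with.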
- (Object) hadoop_other_args
    # File 'lib/wukong/script/hadoop_command.rb', line 112
    def hadoop_other_args
      extra_str_args = [ options[:extra_args] ]
      if options.split_on_xml_tag
        extra_str_args << %Q{-inputreader 'StreamXmlRecordReader,begin=<#{options.split_on_xml_tag}>,end=</#{options.split_on_xml_tag}>'}
      end
      extra_str_args << ' -lazyOutput' if options[:noempty] # don't create reduce file if no records
      extra_str_args << ' -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner' unless options[:partition_fields].blank?
      extra_str_args
    end
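To see what the XML splitting argument expands to, here is a small sketch. The helper name `xml_inputreader_arg` and the tag name "page" are illustrative assumptions.

```ruby
# Hypothetical helper: builds the -inputreader argument for a given XML tag,
# using the same string template as the method above.
def xml_inputreader_arg(tag)
  %Q{-inputreader 'StreamXmlRecordReader,begin=<#{tag}>,end=</#{tag}>'}
end

arg = xml_inputreader_arg("page")
puts arg
```

Each record handed to the mapper would then be the text between the opening and closing tag.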
- (Object) hadoop_recycle_env
    # File 'lib/wukong/script/hadoop_command.rb', line 122
    def hadoop_recycle_env
      %w[RUBYLIB].map do |var|
        %Q{-cmdenv '#{var}=#{ENV[var]}'} if ENV[var]
      end.compact
    end
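The same forwarding logic can be sketched with the environment passed in explicitly, which makes it easy to try without touching the real ENV. The helper `recycle_env` and the extra variable GEM_HOME are hypothetical; the method above forwards only RUBYLIB.

```ruby
# Hypothetical helper: emits one -cmdenv argument per variable that is set.
def recycle_env(names, env = ENV)
  names.map do |var|
    %Q{-cmdenv '#{var}=#{env[var]}'} if env[var]
  end.compact   # unset variables produce nil and are dropped here
end

args = recycle_env(%w[RUBYLIB GEM_HOME], { "RUBYLIB" => "/opt/lib" })
```

Here GEM_HOME is unset in the sample environment, so only the RUBYLIB argument is produced.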
- (Object) hadoop_runner
The path to the hadoop runner script
    # File 'lib/wukong/script/hadoop_command.rb', line 129
    def hadoop_runner
      options[:hadoop_runner] || (options[:hadoop_home]+'/bin/hadoop')
    end
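The fallback can be illustrated with a plain Hash. `runner_path` and the sample paths are hypothetical stand-ins.

```ruby
# Hypothetical helper: an explicit runner wins, otherwise derive it from hadoop_home.
def runner_path(options)
  options[:hadoop_runner] || (options[:hadoop_home] + '/bin/hadoop')
end

derived  = runner_path({ hadoop_home: "/usr/lib/hadoop" })
explicit = runner_path({ hadoop_runner: "/opt/bin/hadoop", hadoop_home: "/usr/lib/hadoop" })
```

With this shape, leaving both keys unset raises a NoMethodError on nil, so at least one of the two settings has to be present.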
- (Object) jobconf(option)
Emit a Hadoop job configuration option (-D, formerly -jobconf) if the simplified command-line arg is present; if not, the resulting nil is elided later.
    # File 'lib/wukong/script/hadoop_command.rb', line 44
    def jobconf option
      if options[option]
        # "-jobconf %s=%s" % [options.definition_of(option, :description), options[option]]
        "-D %s=%s" % [options.definition_of(option, :description), options[option]]
      end
    end
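A minimal sketch of the emit-or-nil behaviour follows. The lookup in `hadoop_property` is an assumed stand-in for `options.definition_of(option, :description)`, and both helper names are hypothetical.

```ruby
# Assumed mapping from simplified option names to Hadoop property names.
def hadoop_property(option)
  {
    reduce_tasks: "mapred.reduce.tasks",
    timeout:      "mapred.task.timeout",
  }.fetch(option)
end

# Hypothetical helper: returns a "-D key=value" string, or nil when the option is unset.
def jobconf_arg(options, option)
  return nil unless options[option]
  "-D %s=%s" % [hadoop_property(option), options[option]]
end

opts = { reduce_tasks: 0 }
args = [:reduce_tasks, :timeout].map { |opt| jobconf_arg(opts, opt) }.compact
```

Because 0 is truthy in Ruby, a `:reduce_tasks` value of 0 still produces an option, while the unset `:timeout` yields nil and is removed by `compact`.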