Class: Wukong::Streamer::Base
Direct Known Subclasses
Size::Reducer, WcReducer, AccumulatingReducer, IdentityMapper, IdentityReducer, JsonStreamer, LineStreamer, RankAndBinReducer, RecordStreamer, Streamer, StructStreamer
Instance Attribute Summary
- (Object) own_options
readonly
Options, initially set from the command-line args -- see Script#process_argv!.
Class Method Summary
+ (Object) mapper(*args, &block)
Creates a new object of this class and injects the given block as the process method.
+ (Object) run(options = {})
Creates a new object of this class and runs it.
Instance Method Summary
- (Object) after_stream
Called exactly once, after streaming completes.
- (Object) bad_record!(key, *args)
To track processing errors inline, pass the line back to bad_record!.
- (Object) before_stream
Called exactly once, before streaming begins.
- (Object) each_record(&block)
Yields each line of standard input to the block.
- (Object) emit(record)
Serializes the record to output.
- (Base) initialize(options = {})
constructor
Accepts option hash from script runner.
- (Object) mapper(&mapper_block)
Defines a process method on the fly to execute the given mapper.
- (Object) monitor
A periodic logger to track progress.
- (Object) options
- (Object) process(*args) {|args| ... }
Process each record in turn, yielding the records to emit.
- (Object) recordize(line)
Default recordizer: returns array of fields by splitting at tabs.
- (Object) run(options = {})
Delegates back to Wukong to run this instance as a mapper.
- (Object) stream
Pass each record to #process.
- (Object) track(record)
Reports each record to the periodic #monitor to log progress.
Constructor Details
- (Base) initialize(options = {})
Accepts option hash from script runner.
    # File 'lib/wukong/streamer/base.rb', line 12
    def initialize options={}
      @own_options = options
    end
Instance Attribute Details
- (Object) own_options (readonly)
Options, initially set from the command-line args -- see Script#process_argv!.
    # File 'lib/wukong/streamer/base.rb', line 7
    def own_options
      @own_options
    end
Class Method Details
+ (Object) mapper(*args, &block)
Creates a new object of this class and injects the given block as the process method.
    # File 'lib/wukong/streamer/base.rb', line 110
    def self.mapper *args, &block
      self.new.mapper(*args, &block)
    end
+ (Object) run(options = {})
Creates a new object of this class and runs it.
    # File 'lib/wukong/streamer/base.rb', line 120
    def self.run options={}
      Wukong.run(self.new, nil, options)
    end
Instance Method Details
- (Object) after_stream
Called exactly once, after streaming completes.
    # File 'lib/wukong/streamer/base.rb', line 49
    def after_stream
    end
- (Object) bad_record!(key, *args)
To track processing errors inline, pass the line back to bad_record!.
    # File 'lib/wukong/streamer/base.rb', line 83
    def bad_record! key, *args
      warn "Bad record #{args.inspect[0..400]}"
      puts ["bad_record-"+key.to_s, *args].join("\t")
    end
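The snippet below is a plain-Ruby sketch, not Wukong code. It rebuilds the line that bad_record! writes to standard output so the format is easy to see. The helper name bad_record_line is hypothetical. The first field is "bad_record-" followed by the key, so downstream steps should be able to separate bad rows from regular output by looking at that field alone.

```ruby
# Hypothetical helper that mirrors the line bad_record! writes to $stdout.
# (bad_record! also warns a truncated inspect of the args to $stderr.)
def bad_record_line(key, *args)
  ["bad_record-" + key.to_s, *args].join("\t")
end

line = bad_record_line(:unparseable, "raw input", 42)
puts line # Array#join calls to_s, so 42 becomes "42"
```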
- (Object) before_stream
Called exactly once, before streaming begins.
    # File 'lib/wukong/streamer/base.rb', line 45
    def before_stream
    end
- (Object) each_record(&block)
Yields each line of standard input to the block.
    # File 'lib/wukong/streamer/base.rb', line 40
    def each_record &block
      $stdin.each(&block)
    end
- (Object) emit(record)
Serializes the record to output.
Emits a single line of tab-separated fields, created by calling #to_flat on the record and joining the result with "\t".
Does no escaping or processing of the record; that's to_flat's job, or yours if you override this method.
    # File 'lib/wukong/streamer/base.rb', line 68
    def emit record
      puts record.to_flat.join("\t")
    end
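The standalone sketch below illustrates the contract that emit relies on. It defines a hypothetical Visit struct and writes its to_flat by hand, so the example runs without the gem. It also defines an emit_to helper that has the same body as emit but writes to a given IO. The second record demonstrates the "no escaping" caveat: a tab inside a field silently turns into an extra column.

```ruby
require "stringio"

# Hypothetical record type; emit only needs #to_flat to return an array of fields.
Visit = Struct.new(:user, :url, :hits) do
  def to_flat
    to_a.map(&:to_s)
  end
end

# Same body as Base#emit, but writing to a caller-supplied IO instead of $stdout.
def emit_to(io, record)
  io.puts record.to_flat.join("\t")
end

out = StringIO.new
emit_to(out, Visit.new("alice", "/home", 3))
emit_to(out, Visit.new("bob", "/a\tb", 1)) # embedded tab is NOT escaped
print out.string
```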
- (Object) mapper(&mapper_block)
Defines a process method on the fly to execute the given mapper.
This is still experimental. Among other limitations, you can't use yield inside the mapper block; you have to call emit() directly.
    # File 'lib/wukong/streamer/base.rb', line 98
    def mapper &mapper_block
      @mapper_block = mapper_block.to_proc
      self.instance_eval do
        def process *args, &block
          instance_exec(*args, &@mapper_block)
        end
      end
      self
    end
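The mechanism that mapper uses can be reproduced outside Wukong. In the sketch below, TinyStreamer is a hypothetical stand-in that keeps only an emit method (which collects records into an array) and the same mapper body. A def inside instance_eval defines a singleton method, so only the receiving object gains process. The block runs through instance_exec, so emit resolves against the streamer. A yield written inside the block would instead refer to the method in which the block was written, which is presumably why the caveat about yield applies.

```ruby
# Hypothetical stand-in for Wukong::Streamer::Base that reproduces only the
# on-the-fly mapper mechanism.
class TinyStreamer
  attr_reader :emitted

  def initialize
    @emitted = []
  end

  # Collects records instead of printing them.
  def emit(record)
    @emitted << record
  end

  # Same technique as Base#mapper: `def` inside instance_eval defines
  # process as a singleton method on this one object.
  def mapper(&mapper_block)
    @mapper_block = mapper_block.to_proc
    instance_eval do
      def process(*args, &block)
        # self is the streamer here, so the block can call emit directly.
        instance_exec(*args, &@mapper_block)
      end
    end
    self
  end
end

streamer = TinyStreamer.new.mapper { |word, count| emit([word.upcase, count]) }
streamer.process("hello", 1)
streamer.process("world", 2)
p streamer.emitted
```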
- (Object) monitor
A periodic logger to track progress.
    # File 'lib/wukong/streamer/base.rb', line 89
    def monitor
      @monitor ||= PeriodicMonitor.new
    end
- (Object) options
    # File 'lib/wukong/streamer/base.rb', line 16
    def options
      Settings.deep_merge own_options
    end
- (Object) process(*args) {|args| ... }
Process each record in turn, yielding the records to emit.
    # File 'lib/wukong/streamer/base.rb', line 75
    def process *args, &block
      yield(args)
    end
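The default process yields the fields it receives unchanged, so a streamer that does not override it acts as an identity mapper. The sketch below uses the hypothetical classes IdentitySketch and WordSketch, written without Wukong so it runs standalone. It contrasts the default behavior with an override that yields one record per word, which shows that a single input record can produce any number of output records.

```ruby
# Hypothetical class reproducing Base#process: yields the fields it was given.
class IdentitySketch
  def process(*args, &block)
    yield(args)
  end
end

# Hypothetical subclass: yields one [word, 1] record per word in the text field.
class WordSketch < IdentitySketch
  def process(_id, text)
    text.split.each { |word| yield [word.downcase, 1] }
  end
end

identity_out = []
IdentitySketch.new.process("a", "b") { |rec| identity_out << rec }

word_out = []
WordSketch.new.process("doc1", "The cat saw the dog") { |rec| word_out << rec }

p identity_out
p word_out
```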
- (Object) recordize(line)
Default recordizer: returns array of fields by splitting at tabs.
    # File 'lib/wukong/streamer/base.rb', line 55
    def recordize line
      line.split("\t") rescue nil
    end
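The snippet below copies the one-line body of recordize to show how String#split treats tab-separated input. Two details are worth knowing. First, interior empty fields survive but trailing empty fields are dropped. Second, the modifier rescue turns any exception into nil, and stream treats nil as a signal to skip the line (`record = recordize(...) or next`).

```ruby
# Same body as Base#recordize.
def recordize(line)
  line.split("\t") rescue nil
end

p recordize("alice\t/home\t3")  # three fields
p recordize("alice\t\t3")       # interior empty field is kept
p recordize("alice\t/home\t\t") # trailing empty fields are dropped by String#split
p recordize(nil)                # NoMethodError is rescued, so nil comes back
```

If trailing empty columns matter, an overriding recordize could call `line.split("\t", -1)`, because a negative limit keeps them.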
- (Object) run(options = {})
Delegates back to Wukong to run this instance as a mapper.
    # File 'lib/wukong/streamer/base.rb', line 115
    def run options={}
      Wukong.run(self, nil, options)
    end
- (Object) stream
Pass each record to #process.
    # File 'lib/wukong/streamer/base.rb', line 23
    def stream
      Log.info("Streaming on:\t%s" % [Script.input_file]) unless Script.input_file.blank?
      before_stream
      each_record do |line|
        record = recordize(line.chomp) or next
        process(*record) do |output_record|
          emit output_record
        end
        track(record)
      end
      after_stream
    end
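The following dependency-free sketch re-creates the stream loop with a few simplifications. Input is read from any IO rather than $stdin. emit joins arrays directly instead of calling #to_flat. The Log/Script banner and the track call are omitted. StreamSketch and CountingSketch are hypothetical names. The subclass uses before_stream and after_stream to emit a summary line once all input has been processed.

```ruby
require "stringio"

# Hypothetical re-creation of the Base#stream loop, reading from an IO.
class StreamSketch
  def initialize(input, output)
    @input, @output = input, output
  end

  def before_stream; end
  def after_stream; end

  def each_record(&block)
    @input.each_line(&block)
  end

  def recordize(line)
    line.split("\t") rescue nil
  end

  def process(*args)
    yield(args)
  end

  # Simplification: join the array directly rather than calling #to_flat.
  def emit(record)
    @output.puts record.join("\t")
  end

  def stream
    before_stream
    each_record do |line|
      record = recordize(line.chomp) or next # nil means "skip this line"
      process(*record) { |output_record| emit output_record }
    end
    after_stream
  end
end

# Counts records in the hooks and emits a summary after all input is seen.
class CountingSketch < StreamSketch
  def before_stream
    @count = 0
  end

  def process(*fields)
    @count += 1
    yield [fields.first, fields.size]
  end

  def after_stream
    emit ["total", @count]
  end
end

out = StringIO.new
CountingSketch.new(StringIO.new("a\tb\tc\nd\te\n"), out).stream
print out.string
```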
- (Object) track(record)
Reports each record to the periodic #monitor to log progress.
    # File 'lib/wukong/streamer/base.rb', line 36
    def track record
      monitor.periodically(record.to_s[0..1000])
    end
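This page does not document Wukong's PeriodicMonitor, so the sketch below does not reproduce it. EveryNMonitor is a hypothetical, count-based stand-in. It accepts the same single-argument periodically call that track makes and logs only every Nth call, which keeps per-record tracking cheap. Note that `record.to_s[0..1000]` keeps up to 1001 characters, because the range is inclusive.

```ruby
require "stringio"

# Hypothetical count-based monitor: NOT Wukong's PeriodicMonitor, only a
# sketch that accepts the single-argument periodically call used by Base#track.
class EveryNMonitor
  def initialize(every, log = $stderr)
    @every = every
    @log   = log
    @calls = 0
  end

  # Logs only on every @every-th call.
  def periodically(message)
    @calls += 1
    @log.puts("#{@calls}\t#{message}") if (@calls % @every).zero?
  end
end

log = StringIO.new
monitor = EveryNMonitor.new(2, log)
# Mirrors Base#track: pass the record's string form, truncated by [0..1000].
%w[r1 r2 r3 r4 r5].each { |rec| monitor.periodically(rec.to_s[0..1000]) }
print log.string
```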