Class: Wukong::Streamer::Base

Inherits:
Object
  • Object
Defined in:
lib/wukong/streamer/base.rb

Direct Known Subclasses

Size::Reducer, WcReducer, AccumulatingReducer, IdentityMapper, IdentityReducer, JsonStreamer, LineStreamer, RankAndBinReducer, RecordStreamer, Streamer, StructStreamer

Constructor Details

- (Base) initialize(options = {})

Accepts an options hash from the script runner.



# File 'lib/wukong/streamer/base.rb', line 12

def initialize options={}
  @own_options = options
end

Instance Attribute Details

- (Object) own_options (readonly)

Options, initially set from the command-line args -- see Script#process_argv!



# File 'lib/wukong/streamer/base.rb', line 7

def own_options
  @own_options
end

Class Method Details

+ (Object) mapper(*args, &block)

Creates a new object of this class and injects the given block as the process method



# File 'lib/wukong/streamer/base.rb', line 110

def self.mapper *args, &block
  self.new.mapper(*args, &block)
end

+ (Object) run(options = {})

Creates a new object of this class and runs it



# File 'lib/wukong/streamer/base.rb', line 120

def self.run options={}
  Wukong.run(self.new, nil, options)
end

Instance Method Details

- (Object) after_stream

Called exactly once, after streaming completes



# File 'lib/wukong/streamer/base.rb', line 49

def after_stream
end

- (Object) bad_record!(key, *args)

To track processing errors inline, pass the line back to bad_record!



# File 'lib/wukong/streamer/base.rb', line 83

def bad_record! key, *args
  warn "Bad record #{args.inspect[0..400]}"
  puts ["bad_record-"+key.to_s, *args].join("\t")
end
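
As a rough illustration of the line that bad_record! writes to standard output, the sketch below rebuilds the same string in a standalone helper. The name `bad_record_line` and the sample key and fields are hypothetical and not part of Wukong; the warning written to stderr is left out.

```ruby
# Builds the same tab-separated line that bad_record! prints.
# `bad_record_line` is a hypothetical helper for illustration only.
def bad_record_line(key, *args)
  ["bad_record-" + key.to_s, *args].join("\t")
end

# The key becomes part of the first field; the remaining arguments follow.
line = bad_record_line(:unparseable_date, "alice", "2009-13-45")
```

Because the marker is the first tab-separated field, bad records can be filtered out of the output stream by that prefix.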

- (Object) before_stream

Called exactly once, before streaming begins



# File 'lib/wukong/streamer/base.rb', line 45

def before_stream
end

- (Object) each_record(&block)

Yields each line read from $stdin to the given block.


# File 'lib/wukong/streamer/base.rb', line 40

def each_record &block
  $stdin.each(&block)
end

- (Object) emit(record)

Serializes the record to output.

Emits a single line of tab-separated fields created by calling #to_flat on the record and joining with "\t".

Does no escaping or processing of the record -- that's to_flat's job, or yours if you override this method.



# File 'lib/wukong/streamer/base.rb', line 68

def emit record
  puts record.to_flat.join("\t")
end
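
A minimal sketch of the serialization step, assuming a record type that supplies its own #to_flat. The `Visit` struct and the `flat_line` helper are hypothetical names introduced for illustration; `flat_line` returns the line rather than printing it.

```ruby
# Hypothetical record type; it defines #to_flat itself for this sketch.
Visit = Struct.new(:user, :page, :count) do
  # Flatten to an array of fields, which is what emit expects.
  def to_flat
    to_a
  end
end

# Same expression as the body of emit, minus the puts.
def flat_line(record)
  record.to_flat.join("\t")
end

line = flat_line(Visit.new("alice", "/home", 3))

# No escaping happens: a tab inside a field silently adds a column.
leaky = flat_line(Visit.new("a\tb", "/", 1))
```

The second call shows why escaping is left to to_flat or to an override: splitting `leaky` on tabs gives four fields for a three-field record.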

- (Object) mapper(&mapper_block)

Defines a process method on the fly to execute the given mapper.

This is still experimental. Among other limitations, you can't use yield -- you have to call emit() directly.



# File 'lib/wukong/streamer/base.rb', line 98

def mapper &mapper_block
  @mapper_block = mapper_block.to_proc
  self.instance_eval do
    def process *args, &block
      instance_exec(*args, &@mapper_block)
    end
  end
  self
end
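
The technique used here (store the block, then define a per-instance process that runs it via instance_exec) can be tried in isolation. The sketch below uses a hypothetical `MiniStreamer` class that is not part of Wukong; its emit collects records in an array instead of printing them, purely so the result can be inspected.

```ruby
# MiniStreamer is a hypothetical stand-in, not Wukong's class.
class MiniStreamer
  attr_reader :emitted

  def initialize
    @emitted = []
  end

  # Collects records instead of printing them (an assumption of this sketch).
  def emit(record)
    @emitted << record
  end

  # Same approach as the mapper method above: keep the block, then define
  # a singleton #process that runs it with self set to this instance.
  def mapper(&mapper_block)
    @mapper_block = mapper_block.to_proc
    instance_eval do
      def process(*args, &block)
        instance_exec(*args, &@mapper_block)
      end
    end
    self
  end
end

# The block calls emit directly, since yield is not available to it.
streamer = MiniStreamer.new.mapper { |word, count| emit([word.upcase, count]) }
streamer.process("hello", "2")
```

Because the `def` runs inside instance_eval, process is defined only on that one object; other instances of the class are unaffected.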

- (Object) monitor

A periodic logger to track progress



# File 'lib/wukong/streamer/base.rb', line 89

def monitor
  @monitor ||= PeriodicMonitor.new
end

- (Object) options

Returns the result of deep-merging own_options into Settings.


# File 'lib/wukong/streamer/base.rb', line 16

def options
  Settings.deep_merge own_options
end

- (Object) process(*args) {|args| ... }

Process each record in turn, yielding the records to emit

Yields:

  • (args)


# File 'lib/wukong/streamer/base.rb', line 75

def process *args, &block
  yield(args)
end
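
To show how an override might differ from the default, here is a small sketch written as plain methods rather than a Wukong subclass. Both method names and the sample fields are hypothetical; `default_process` mirrors the body above, and `swapping_process` stands in for what a subclass's process could do.

```ruby
# Mirrors the default body: yields the whole field array as one record.
def default_process(*args)
  yield(args)
end

# Hypothetical override: emit [count, name], but only when the count
# field is made up entirely of digits. Other records yield nothing.
def swapping_process(name, count, *_rest)
  yield [count, name] if count =~ /\A\d+\z/
end

out = []
default_process("alice", "42")  { |rec| out << rec }
swapping_process("alice", "42") { |rec| out << rec }
swapping_process("bob", "n/a")  { |rec| out << rec }
```

A process method may yield zero, one, or many times per input record; each yielded value is what the streaming loop hands to emit.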

- (Object) recordize(line)

Default recordizer: returns array of fields by splitting at tabs



# File 'lib/wukong/streamer/base.rb', line 55

def recordize line
  line.split("\t") rescue nil
end
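
The behavior of this one-liner at its edges can be checked with a standalone copy. The name `recordize_line` is hypothetical and used only for this sketch.

```ruby
# Standalone copy of the default recordizer, for illustration only.
def recordize_line(line)
  line.split("\t") rescue nil
end

fields  = recordize_line("alice\t42\tnyc")  # three tab-separated fields
trimmed = recordize_line("a\tb\t\t")        # String#split drops trailing empty fields
skipped = recordize_line(nil)               # nil has no #split, so the rescue gives nil
```

Note that an empty string splits to an empty array rather than nil, so only input that raises on split produces a nil record.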

- (Object) run(options = {})

Delegates back to Wukong to run this instance as a mapper



# File 'lib/wukong/streamer/base.rb', line 115

def run options={}
  Wukong.run(self, nil, options)
end

- (Object) stream

Pass each record to #process



# File 'lib/wukong/streamer/base.rb', line 23

def stream
  Log.info("Streaming on:\t%s" % [Script.input_file]) unless Script.input_file.blank?
  before_stream
  each_record do |line|
    record = recordize(line.chomp) or next
    process(*record) do |output_record|
      emit output_record
    end
    track(record)
  end
  after_stream
end
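
The recordize, process, emit sequence in the loop above can be sketched without Wukong by feeding an in-memory input. Everything named here (`mini_stream`, `process_fields`, the sample input) is hypothetical; logging, the before/after hooks, and track are omitted, and emitted lines are returned instead of printed.

```ruby
require "stringio"

# Hypothetical process step: emit each record with its field count appended.
def process_fields(*fields)
  yield fields + [fields.size.to_s]
end

# Simplified version of the streaming loop over any IO-like input.
def mini_stream(input)
  emitted = []
  input.each do |line|
    # recordize: split on tabs, skipping the line if that yields nil
    record = (line.chomp.split("\t") rescue nil) or next
    # process: each yielded record is serialized as a tab-joined line
    process_fields(*record) do |output_record|
      emitted << output_record.join("\t")
    end
  end
  emitted
end

lines = mini_stream(StringIO.new("a\tb\nc\n"))
```

Passing the input in as an argument, rather than reading $stdin directly, is a choice made for this sketch so that it can be run and checked in isolation.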

- (Object) track(record)

Passes the start of the record's string form (at most 1001 characters) to the periodic monitor.


# File 'lib/wukong/streamer/base.rb', line 36

def track record
  monitor.periodically(record.to_s[0..1000])
end