Class: Wukong::Source::SourceDriver
- Inherits: Local::StdioDriver
  - Object
  - EM::P::LineAndTextProtocol
  - Local::StdioDriver
  - Wukong::Source::SourceDriver
- Includes: Logging
- Defined in: lib/wukong/source/source_driver.rb
Overview
A driver which works just like the Wukong::Local::StdioDriver
except it ignores input from STDIN and instead generates its
own input records according to some periodic schedule. Each
consecutive record produced will be an incrementing positive
integer (as a string), starting with '1'.
Instance Attribute Summary
- #batch_size ⇒ Object
  The number of records after which a Processor#finalize will be called.
- #index ⇒ Object
  The index of the record.
Attributes included from DriverMethods
Class Method Summary
- .start(label, settings = {}) ⇒ Object
  Starts periodically feeding the processor or dataflow given by label using the given settings.
Instance Method Summary
- #create_event ⇒ Object
  Creates a new event (feeds the current index as a record, increments it, and finalizes on batch boundaries).
- #post_init ⇒ Object
  Sets the initial value of index to 1 and sets the batch size (only if it's positive).
- #process(record) ⇒ Object
  Outputs a record from the dataflow or processor to STDOUT.
Methods included from Logging
Methods inherited from Local::StdioDriver
add_signal_traps, #initialize, #receive_line, #setup, #unbind
Methods included from DriverMethods
#construct_dataflow, #finalize, #finalize_and_stop_dataflow, #finalize_dataflow, #send_through_dataflow, #setup, #setup_dataflow, #stop
Constructor Details
This class inherits a constructor from Wukong::Local::StdioDriver
Instance Attribute Details
#batch_size ⇒ Object
The number of records after which a Processor#finalize will
be called.
    # File 'lib/wukong/source/source_driver.rb', line 18
    def batch_size
      @batch_size
    end
#index ⇒ Object
The index of the record.
    # File 'lib/wukong/source/source_driver.rb', line 14
    def index
      @index
    end
Class Method Details
.start(label, settings = {}) ⇒ Object
Starts periodically feeding the processor or dataflow given by
label using the given settings.
    # File 'lib/wukong/source/source_driver.rb', line 33
    def self.start(label, settings={})
      driver = new(:foobar, label, settings) # i don't think the 1st argument matters here...
      driver.post_init
      period = case
               when settings[:period]  then settings[:period]
               when settings[:per_sec] then (1.0 / settings[:per_sec]) rescue 1.0
               else 1.0
               end
      driver.create_event
      EventMachine::PeriodicTimer.new(period) { driver.create_event }
    end
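The way .start picks its timer period can be read off the case expression above: an explicit :period wins, otherwise :per_sec is inverted, otherwise the period defaults to one second. The following is a minimal standalone sketch of that selection logic, not the library's code; the helper name period_for is hypothetical and exists only for illustration.

```ruby
# Hypothetical helper mirroring the period selection in .start.
# It is not part of Wukong; it only restates the case expression.
def period_for(settings)
  if settings[:period]
    # An explicit period (in seconds) takes precedence.
    settings[:period]
  elsif settings[:per_sec]
    begin
      # Convert "events per second" into "seconds per event".
      1.0 / settings[:per_sec]
    rescue StandardError
      # Fall back to one second if the division raises
      # (for example when :per_sec is not numeric).
      1.0
    end
  else
    1.0
  end
end

p period_for(period: 5)              # => 5
p period_for(per_sec: 4)             # => 0.25
p period_for(period: 2, per_sec: 4)  # => 2
p period_for({})                     # => 1.0
```

Note that the fallback only applies when the division raises. Floating-point division by zero does not raise in Ruby, so a :per_sec of 0 would yield Infinity rather than 1.0 in this sketch.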
Instance Method Details
#create_event ⇒ Object
Creates a new event using the following steps:
- Feeds a record with the existing index to the dataflow.
- Increments the index.
- Finalizes the dataflow if the number of records is a multiple of the batch_size.
    # File 'lib/wukong/source/source_driver.rb', line 54
    def create_event
      receive_line(index.to_s)
      self.index += 1
      finalize_dataflow if self.batch_size && (self.index % self.batch_size) == 0
    end
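To see when finalization fires relative to the emitted records, the three steps can be simulated without EventMachine or a real dataflow. This is a sketch under stated assumptions: the SketchSource class and its emitted and finalized_after arrays are hypothetical stand-ins for receive_line and finalize_dataflow, and only the counting logic is copied from the listing above.

```ruby
# Hypothetical stand-in for the driver; records what would be
# fed to the dataflow and when a finalize would be triggered.
class SketchSource
  attr_accessor :index, :batch_size
  attr_reader :emitted, :finalized_after

  def initialize(batch_size = nil)
    @index = 1                # post_init starts the index at 1
    @batch_size = batch_size
    @emitted = []             # stands in for receive_line
    @finalized_after = []     # stands in for finalize_dataflow
  end

  def create_event
    emitted << index.to_s     # 1. feed the current index as a string
    self.index += 1           # 2. increment the index
    # 3. finalize when the (already incremented) index is a multiple
    #    of batch_size; remember how many records had been emitted.
    finalized_after << emitted.size if batch_size && (index % batch_size) == 0
  end
end

source = SketchSource.new(3)
7.times { source.create_event }

p source.emitted          # => ["1", "2", "3", "4", "5", "6", "7"]
p source.finalized_after  # => [2, 5]
```

In this sketch the check runs after the increment, so with a batch_size of 3 the first finalize comes after two records and later ones arrive every three records. Whether that offset matters depends on the processor being driven.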
#post_init ⇒ Object
Sets the initial value of index to 1 and sets the batch size
(only if it's positive).
    # File 'lib/wukong/source/source_driver.rb', line 22
    def post_init
      super()
      self.index      = 1
      self.batch_size = settings[:batch_size].to_i if settings[:batch_size] && settings[:batch_size].to_i > 0
    end
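The "only if it's positive" rule means that a missing, zero, negative, or non-numeric :batch_size leaves batch_size unset, which in turn disables batch finalization in create_event. A minimal sketch of that rule follows; the helper name batch_size_from is hypothetical and not part of Wukong.

```ruby
# Hypothetical helper restating the batch size rule from post_init:
# return a positive Integer, or nil when no usable value is given.
def batch_size_from(settings)
  raw = settings[:batch_size]
  return nil unless raw

  size = raw.to_i             # "10" => 10, "abc" => 0
  size > 0 ? size : nil
end

p batch_size_from(batch_size: "10")   # => 10
p batch_size_from(batch_size: 0)      # => nil
p batch_size_from(batch_size: -3)     # => nil
p batch_size_from(batch_size: "abc")  # => nil
p batch_size_from({})                 # => nil
```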
#process(record) ⇒ Object
Outputs a record from the dataflow or processor to STDOUT.
STDOUT is flushed after each record so that output appears
immediately, avoiding the impression of "no output" when the
looping period is long.
    # File 'lib/wukong/source/source_driver.rb', line 67
    def process record
      $stdout.puts record
      $stdout.flush
    end
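The write-then-flush pattern can be tried in isolation. The sketch below assumes a hypothetical emit helper that takes the output stream as a parameter, so it can be pointed at a StringIO instead of STDOUT. On a StringIO the flush is effectively a no-op, but on a buffered pipe or file it pushes each record out as soon as it is written.

```ruby
require 'stringio'

# Hypothetical helper: write one record per line, then flush so the
# record is not held in the stream's buffer until the next write.
def emit(record, out = $stdout)
  out.puts record
  out.flush
end

buffer = StringIO.new
emit("1", buffer)
emit("2", buffer)

p buffer.string  # => "1\n2\n"
```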