Class: ETL::Engine
Overview
The main ETL engine class.
Class Attribute Summary
-
+ (Object) average_rows_per_second
Accessor for the average rows per second processed.
-
+ (Object) batch
Access the current ETL::Execution::Batch instance.
-
+ (Object) current_destination
The current destination.
-
+ (Object) current_source
The current source.
-
+ (Object) current_source_row
The current source row.
-
+ (Object) exit_code
Exit code to be passed to the command line.
-
+ (Object) job
Access the current ETL::Execution::Job instance.
-
+ (Object) limit
The limit on rows to load from the source, useful for testing the ETL process prior to executing the entire batch.
-
+ (Object) log_write_mode
Accessor for the log write mode.
-
+ (Object) logger
The logger used by the engine (marked :nodoc: in the source).
-
+ (Object) offset
The offset for the source to begin at, useful for testing the ETL process prior to executing the entire batch.
-
+ (Object) read_locally
Set to true to read locally from the last source cache files.
-
+ (Object) realtime_activity
Set to true to activate realtime activity.
-
+ (Object) rows_read
Accessor for the total number of rows read from sources.
-
+ (Object) rows_written
Accessor for the total number of rows processed.
-
+ (Object) skip_bulk_import
Set to true to skip all bulk importing.
-
+ (Object) timestamped_log
Returns the value of attribute timestamped_log.
-
+ (Object) use_temp_tables
Set to true to use temp tables.
Class Method Summary
-
+ (Object) connection(name)
Get a named connection.
-
+ (Object) finish
Called when a batch job finishes, allowing for cleanup to occur.
-
+ (Object) init(options = {})
Initialization that is run when a job is executed.
-
+ (Object) process(file)
Process the specified file.
-
+ (Object) table(table_name, connection)
Modify the table name if necessary.
-
+ (Object) temp_tables
Get a registry of temp tables.
-
+ (Object) timestamp
Get a timestamp value as a string.
-
+ (Boolean) use_temp_tables?
Return true if using temp tables.
Instance Method Summary
-
- (Object) benchmarks
Get a Hash of benchmark values where each value represents the total amount of time in seconds spent processing in that portion of the ETL pipeline.
-
- (Object) errors
Array of errors encountered during execution of the ETL process.
-
- (Object) process(file)
Process a file, control object or batch object.
-
- (Object) say(message)
Say the specified message, with a newline.
-
- (Object) say_on_own_line(message)
Say the message on its own line.
-
- (Object) say_without_newline(message)
Say the specified message without a newline.
-
- (Object) track_error(control, msg)
Record an error and notify the control's error handlers (a first attempt at centralizing error notifications).
Methods included from Util
#approximate_distance_of_time_in_words, #distance_of_time_in_words
Class Attribute Details
+ (Object) average_rows_per_second
Accessor for the average rows per second processed
# File 'lib/etl/engine.rb', line 138
def average_rows_per_second
  @average_rows_per_second
end
+ (Object) batch
Access the current ETL::Execution::Batch instance
# File 'lib/etl/engine.rb', line 119
def batch
  @batch
end
+ (Object) current_destination
The current destination
# File 'lib/etl/engine.rb', line 97
def current_destination
  @current_destination
end
+ (Object) current_source
The current source
# File 'lib/etl/engine.rb', line 91
def current_source
  @current_source
end
+ (Object) current_source_row
The current source row
# File 'lib/etl/engine.rb', line 94
def current_source_row
  @current_source_row
end
+ (Object) exit_code
Exit code to be passed to the command line
# File 'lib/etl/engine.rb', line 88
def exit_code
  @exit_code
end
+ (Object) job
Access the current ETL::Execution::Job instance
# File 'lib/etl/engine.rb', line 116
def job
  @job
end
+ (Object) limit
The limit on rows to load from the source, useful for testing the ETL process prior to executing the entire batch. Default value is nil, which indicates that there is no limit.
# File 'lib/etl/engine.rb', line 124
def limit
  @limit
end
+ (Object) log_write_mode
Accessor for the log write mode. Default is 'a' for append.
# File 'lib/etl/engine.rb', line 61
def log_write_mode
  @log_write_mode
end
+ (Object) offset
The offset for the source to begin at, useful for testing the ETL process prior to executing the entire batch. Default value is nil, which indicates that there is no offset.
# File 'lib/etl/engine.rb', line 129
def offset
  @offset
end
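The limit and offset attributes are described as a way to test the ETL process on a subset of source rows, with nil meaning "not set". This page does not show how sources apply them, so the following is only a minimal sketch of the idea, assuming rows arrive as a plain Ruby Enumerable. The helper name apply_window is hypothetical and is not part of ETL::Engine.

```ruby
# Hypothetical helper (not part of ETL::Engine): skip `offset` rows, then keep
# at most `limit` rows. nil for either value means "not set", matching the
# documented defaults for ETL::Engine.limit and ETL::Engine.offset.
def apply_window(rows, offset: nil, limit: nil)
  windowed = rows.lazy                           # avoid materializing large sources
  windowed = windowed.drop(offset) if offset
  windowed = windowed.take(limit) if limit
  windowed.to_a
end

rows = (1..10).map { |i| { id: i } }
p apply_window(rows, offset: 2, limit: 3).map { |r| r[:id] }  # => [3, 4, 5]
```

Using a lazy enumerator means a large source is only read as far as the window requires.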
+ (Object) read_locally
Set to true to read locally from the last source cache files
# File 'lib/etl/engine.rb', line 135
def read_locally
  @read_locally
end
+ (Object) realtime_activity
Set to true to activate realtime activity. This will cause certain information messages to be printed to STDOUT.
# File 'lib/etl/engine.rb', line 101
def realtime_activity
  @realtime_activity
end
+ (Object) rows_read
Accessor for the total number of rows read from sources
# File 'lib/etl/engine.rb', line 104
def rows_read
  @rows_read
end
+ (Object) rows_written
Accessor for the total number of rows processed
# File 'lib/etl/engine.rb', line 110
def rows_written
  @rows_written
end
+ (Object) skip_bulk_import
Set to true to skip all bulk importing
# File 'lib/etl/engine.rb', line 132
def skip_bulk_import
  @skip_bulk_import
end
+ (Object) timestamped_log
Returns the value of attribute timestamped_log
# File 'lib/etl/engine.rb', line 58
def timestamped_log
  @timestamped_log
end
+ (Object) use_temp_tables
Set to true to use temp tables
# File 'lib/etl/engine.rb', line 150
def use_temp_tables
  @use_temp_tables
end
Class Method Details
+ (Object) connection(name)
Get a named connection
# File 'lib/etl/engine.rb', line 141
def connection(name)
  logger.debug "Retrieving connection #{name}"
  conn = connections[name] ||= establish_connection(name)
  #conn.verify!(ActiveRecord::Base.verification_timeout)
  conn.reconnect! unless conn.active?
  conn
end
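connection memoizes each named connection with ||= and reconnects it if it is no longer active, so repeated calls return the same object. The sketch below reproduces that caching pattern in isolation. FakeConnection and ConnectionCache are hypothetical stand-ins for an ActiveRecord connection and for the engine's connections hash plus its establish_connection helper (which this page does not document); they are not part of the library.

```ruby
# Hypothetical stand-in for an ActiveRecord connection adapter.
class FakeConnection
  attr_reader :reconnects

  def initialize
    @active = true
    @reconnects = 0
  end

  def active?
    @active
  end

  def disconnect!
    @active = false
  end

  def reconnect!
    @reconnects += 1
    @active = true
  end
end

# Hypothetical cache mirroring the ||= memoization in ETL::Engine.connection.
class ConnectionCache
  def initialize(&establish)
    @establish = establish   # called at most once per connection name
    @connections = {}
  end

  def connection(name)
    conn = @connections[name] ||= @establish.call(name)
    conn.reconnect! unless conn.active?   # revive a dropped connection
    conn
  end
end

cache = ConnectionCache.new { |_name| FakeConnection.new }
a = cache.connection(:etl_execution)
a.disconnect!
b = cache.connection(:etl_execution)
puts a.equal?(b)   # => true (same cached object)
puts b.reconnects  # => 1 (reconnected because it was inactive)
```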
+ (Object) finish
Called when a batch job finishes, allowing for cleanup to occur
# File 'lib/etl/engine.rb', line 158
def finish
  temp_tables.each do |temp_table, mapping|
    actual_table = mapping[:table]
    #puts "move #{temp_table} to #{actual_table}"
    conn = mapping[:connection]
    conn.transaction do
      conn.rename_table(actual_table, "#{actual_table}_old")
      conn.rename_table(temp_table, actual_table)
      conn.drop_table("#{actual_table}_old")
    end
  end
end
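For each registered temp table, finish performs a three-step swap inside a transaction: rename the live table to <name>_old, rename the temp table to the live name, then drop the old copy. The sketch below runs the same sequence against a hypothetical in-memory table store; MemoryDB and swap_in are illustrative names only, and transactions are not modelled.

```ruby
# Hypothetical in-memory "database": table name => array of rows.
class MemoryDB
  attr_reader :tables

  def initialize(tables)
    @tables = tables
  end

  def rename_table(from, to)
    raise ArgumentError, "#{from} does not exist" unless @tables.key?(from)
    raise ArgumentError, "#{to} already exists" if @tables.key?(to)
    @tables[to] = @tables.delete(from)
  end

  def drop_table(name)
    @tables.delete(name)
  end
end

# Same order of operations as ETL::Engine.finish, minus the transaction.
def swap_in(conn, temp_table, actual_table)
  conn.rename_table(actual_table, "#{actual_table}_old")  # keep the live data aside
  conn.rename_table(temp_table, actual_table)             # promote the freshly loaded table
  conn.drop_table("#{actual_table}_old")                  # discard the previous version
end

db = MemoryDB.new("people" => [:old_row], "tmp_people" => [:new_row])
swap_in(db, "tmp_people", "people")
p db.tables  # only "people" remains, now holding the new rows
```

In the engine these steps run inside conn.transaction, so the intent is that readers never observe a state where the live table name is missing.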
+ (Object) init(options = {})
Initialization that is run when a job is executed.
Options:
-
:limit: Limit the number of records returned from sources
-
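:offset: Specify the offset at which to begin reading records from sources
-
:newlog: If true, the log is opened in write mode ('w', overwriting any existing log); otherwise it is appended to (the default 'a')
-
:skip_bulk_import: Set to true to skip bulk import
-
:read_locally: Set to true to read from the local cache
-
:rails_root: Set to the Rails root to boot Rails
-
:config: Path to the database configuration file (defaults to 'database.yml', falling back to 'config/database.yml' if that file does not exist)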
# File 'lib/etl/engine.rb', line 20
def init(options={})
  unless @initialized
    puts "initializing ETL engine\n\n"
    @limit = options[:limit]
    @offset = options[:offset]
    @log_write_mode = 'w' if options[:newlog]
    @skip_bulk_import = options[:skip_bulk_import]
    @read_locally = options[:read_locally]
    @rails_root = options[:rails_root]

    require File.join(@rails_root, 'config/environment') if @rails_root
    options[:config] ||= 'database.yml'
    options[:config] = 'config/database.yml' unless File.exist?(options[:config])
    database_configuration = YAML::load(ERB.new(IO.read(options[:config])).result + "\n")
    ActiveRecord::Base.configurations.merge!(database_configuration)
    ETL::Base.configurations = HashWithIndifferentAccess.new(database_configuration)
    #puts "configurations in init: #{ActiveRecord::Base.configurations.inspect}"

    require 'etl/execution'
    ETL::Execution::Base.establish_connection :etl_execution
    ETL::Execution::Execution.migrate

    @initialized = true
  end
end
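init locates the database configuration (falling back to config/database.yml) and runs the file through ERB before parsing it as YAML, so the file may contain embedded Ruby. A minimal standard-library sketch of both steps follows. The resolve_config_path helper, the inline template, and the ETL_DB_NAME environment variable are hypothetical examples, not names or settings the engine defines.

```ruby
require 'erb'
require 'yaml'

# Hypothetical helper mirroring init's lookup: default to 'database.yml',
# and use 'config/database.yml' whenever the chosen path does not exist.
def resolve_config_path(path = nil)
  path ||= 'database.yml'
  File.exist?(path) ? path : 'config/database.yml'
end

# Hypothetical database.yml contents; the ERB tag is evaluated before YAML parsing.
template = <<~YML
  etl_execution:
    adapter: sqlite3
    database: <%= ENV.fetch('ETL_DB_NAME', 'etl_execution.db') %>
YML

# Same pipeline as init: ERB first, then YAML (init also appends "\n").
config = YAML.load(ERB.new(template).result + "\n")
puts config['etl_execution']['adapter']
puts config['etl_execution']['database']
```

Because ERB runs first, anything inside <% %> tags executes as Ruby with the permissions of the ETL process, so the configuration file should be treated as trusted code.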
+ (Object) process(file)
Process the specified file. Acceptable values for file are:
-
Path to a file
-
File object
-
ETL::Control::Control instance
-
ETL::Batch::Batch instance
The process command will accept either a .ctl or .ctl.rb file for a Control file, or a .ebf or .ebf.rb file for an ETL Batch File.
# File 'lib/etl/engine.rb', line 54
def process(file)
  new().process(file)
end
+ (Object) table(table_name, connection)
Modify the table name if necessary
# File 'lib/etl/engine.rb', line 177
def table(table_name, connection)
  if use_temp_tables?
    temp_table_name = "tmp_#{table_name}"

    if temp_tables[temp_table_name].nil?
      # Create the temp table and add it to the mapping
      begin connection.drop_table(temp_table_name); rescue; end
      connection.copy_table(table_name, temp_table_name)
      temp_tables[temp_table_name] = {
        :table => table_name,
        :connection => connection
      }
    end

    temp_table_name
  else
    table_name
  end
end
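When temp tables are enabled, table returns tmp_<name> and, the first time a given table is seen, copies the real table and records the temp-to-real mapping in the temp_tables registry that finish later consumes. The sketch below keeps only the registry logic; TempTableMapper is a hypothetical class, and the database calls are replaced by a copies log so the "copy once" behavior can be checked without a database.

```ruby
# Hypothetical sketch of ETL::Engine.table's name mapping, without a real database.
class TempTableMapper
  attr_reader :registry, :copies

  def initialize(use_temp_tables:)
    @use_temp_tables = use_temp_tables
    @registry = {}   # temp table name => { table:, connection: }
    @copies = []     # each entry stands in for drop_table + copy_table
  end

  def table(table_name, connection)
    return table_name unless @use_temp_tables

    temp_table_name = "tmp_#{table_name}"
    if @registry[temp_table_name].nil?
      @copies << [table_name, temp_table_name]
      @registry[temp_table_name] = { table: table_name, connection: connection }
    end
    temp_table_name
  end
end

mapper = TempTableMapper.new(use_temp_tables: true)
mapper.table('people', :conn)   # => "tmp_people"
mapper.table('people', :conn)   # reuses the existing mapping, no second copy
puts mapper.copies.length       # => 1
```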
+ (Object) temp_tables
Get a registry of temp tables
# File 'lib/etl/engine.rb', line 153
def temp_tables
  @temp_tables ||= {}
end
+ (Object) timestamp
Get a timestamp value as a string
# File 'lib/etl/engine.rb', line 83
def timestamp
  Time.now.strftime("%Y%m%d%H%M%S")
end
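timestamp formats the current time as a 14-digit string of year, month, day, hour, minute and second, which sorts lexically in chronological order. Applying the same format string to a fixed time (instead of Time.now) makes the output predictable:

```ruby
# Same format string as ETL::Engine.timestamp, applied to a fixed local time.
t = Time.new(2024, 1, 2, 3, 4, 5)
stamp = t.strftime("%Y%m%d%H%M%S")
puts stamp  # => 20240102030405
```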
+ (Boolean) use_temp_tables?
Return true if using temp tables
# File 'lib/etl/engine.rb', line 172
def use_temp_tables?
  use_temp_tables ? true : false
end
Instance Method Details
- (Object) benchmarks
Get a Hash of benchmark values where each value represents the total amount of time in seconds spent processing in that portion of the ETL pipeline. Keys include:
-
:transforms
-
:after_reads
-
:before_writes
-
:writes
# File 'lib/etl/engine.rb', line 254
def benchmarks
  @benchmarks ||= {
    :transforms => 0,
    :after_reads => 0,
    :before_writes => 0,
    :writes => 0,
  }
end
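benchmarks holds running totals, in seconds, for each pipeline stage. This page does not show where the engine updates the hash, so the following is only a sketch of one way such totals can be accumulated, using a monotonic clock so wall-clock adjustments do not skew the numbers. The timed helper and the sleep calls are hypothetical placeholders for real pipeline work.

```ruby
# Same keys and initial values as ETL::Engine#benchmarks.
benchmarks = { transforms: 0, after_reads: 0, before_writes: 0, writes: 0 }

# Hypothetical helper: run the block and add its elapsed time to one stage's total.
def timed(benchmarks, stage)
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  result = yield
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  benchmarks[stage] = benchmarks.fetch(stage) + elapsed  # KeyError for unknown stages
  result
end

3.times do
  timed(benchmarks, :transforms) { sleep 0.002 }
  timed(benchmarks, :writes) { sleep 0.001 }
end

benchmarks.each { |stage, secs| puts format('%-14s %.3fs', stage, secs) }
```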
- (Object) errors
Array of errors encountered during execution of the ETL process
# File 'lib/etl/engine.rb', line 235
def errors
  @errors ||= []
end
- (Object) process(file)
Process a file, control object or batch object. Acceptable values for file are:
-
Path to a file
-
File object
-
ETL::Control::Control instance
-
ETL::Batch::Batch instance
# File 'lib/etl/engine.rb', line 269
def process(file)
  case file
  when String
    process(File.new(file))
  when File
    case file.path
    when /\.ctl(\.rb)?$/; process_control(file)
    when /\.ebf(\.rb)?$/; process_batch(file)
    else raise RuntimeError, "Unsupported file type - #{file.path}"
    end
  when ETL::Control::Control
    process_control(file)
  when ETL::Batch::Batch
    process_batch(file)
  else
    raise RuntimeError, "Process object must be a String, File, Control instance or Batch instance"
  end
end
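process dispatches first on the argument's type and then, for files, on the file name's extension. The sketch below isolates the extension check using the same regular expressions as the source. The classify helper and its :control/:batch return values are hypothetical stand-ins for the real process_control and process_batch calls, so the dispatch can be exercised without the ETL library.

```ruby
# Hypothetical helper mirroring the extension checks in ETL::Engine#process.
def classify(path)
  case path
  when /\.ctl(\.rb)?$/ then :control   # Control file: .ctl or .ctl.rb
  when /\.ebf(\.rb)?$/ then :batch     # ETL Batch File: .ebf or .ebf.rb
  else raise RuntimeError, "Unsupported file type - #{path}"
  end
end

%w[load_people.ctl load_people.ctl.rb nightly.ebf nightly.ebf.rb].each do |path|
  puts "#{path} -> #{classify(path)}"
end
```

In Ruby, $ matches at the end of any line rather than only at the end of the string; \z would be the stricter anchor if paths could ever contain newlines.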
- (Object) say(message)
Say the specified message, with a newline
# File 'lib/etl/engine.rb', line 217
def say(message)
  say_without_newline(message + "\n")
end
- (Object) say_on_own_line(message)
Say the message on its own line
# File 'lib/etl/engine.rb', line 230
def say_on_own_line(message)
  say("\n" + message)
end
- (Object) say_without_newline(message)
Say the specified message without a newline
# File 'lib/etl/engine.rb', line 222
def say_without_newline(message)
  if ETL::Engine.realtime_activity
    $stdout.print message
    $stdout.flush
  end
end
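The three say methods are layered: say_without_newline writes only when realtime_activity is set, say appends a newline, and say_on_own_line also prepends one. The sketch below reproduces that layering with a hypothetical Reporter class whose output stream can be swapped for a StringIO so the result can be inspected; the real methods always write to $stdout and read the flag from ETL::Engine.realtime_activity.

```ruby
require 'stringio'

# Hypothetical stand-in for the engine's say* helpers with an injectable stream.
class Reporter
  def initialize(realtime_activity:, out: $stdout)
    @realtime_activity = realtime_activity
    @out = out
  end

  def say_without_newline(message)
    return unless @realtime_activity   # silent unless realtime activity is on
    @out.print message
    @out.flush
  end

  def say(message)
    say_without_newline(message + "\n")
  end

  def say_on_own_line(message)
    say("\n" + message)
  end
end

buffer = StringIO.new
reporter = Reporter.new(realtime_activity: true, out: buffer)
reporter.say_without_newline('.')   # e.g. progress dots
reporter.say_without_newline('.')
reporter.say_on_own_line('done')
p buffer.string  # => "..\ndone\n"
```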
- (Object) track_error(control, msg)
Record an error and notify the control's error handlers (a first attempt at centralizing error notifications)
# File 'lib/etl/engine.rb', line 240
def track_error(control, msg)
  errors << msg
  control.error_handlers.each do |handler|
    handler.call(msg)
  end
end
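track_error appends the message to errors and then calls each handler registered on the control, in order. Since the engine only invokes handler.call(msg), any object responding to call can act as a handler, and lambdas are a natural fit. In the sketch below, the Control struct is a hypothetical stand-in for ETL::Control::Control, and track_error is a standalone copy that takes the errors array explicitly.

```ruby
# Hypothetical stand-in for a control object that exposes error_handlers.
Control = Struct.new(:error_handlers)

# Standalone copy of the steps in ETL::Engine#track_error: record, then notify.
def track_error(errors, control, msg)
  errors << msg
  control.error_handlers.each { |handler| handler.call(msg) }
end

errors = []
log = []
control = Control.new([
  ->(msg) { log << "first: #{msg}" },
  ->(msg) { log << "second: #{msg}" }
])

track_error(errors, control, 'bad row 7')
p errors  # => ["bad row 7"]
p log     # => ["first: bad row 7", "second: bad row 7"]
```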