Class: ETL::Control::Source

Inherits:
Object
Includes:
Enumerable
Defined in:
lib/etl/control/source.rb

Overview

ETL source. Subclasses must implement the each method.
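
As a minimal sketch of what a subclass has to provide, the example below uses a stand-in base class rather than the real ETL::Control::Source, so that it runs without the library. SketchSource and ArraySketchSource are hypothetical names, and the :rows configuration key is an assumption made only for this illustration. Because the class includes Enumerable, defining each is enough to get map, select, and the other Enumerable methods.

```ruby
# Stand-in for ETL::Control::Source (hypothetical; it mirrors only the
# three-argument constructor and the Enumerable mixin).
class SketchSource
  include Enumerable

  attr_reader :control, :configuration, :definition

  def initialize(control, configuration, definition)
    @control = control
    @configuration = configuration
    @definition = definition
  end
end

# Hypothetical subclass that yields rows held in memory.
class ArraySketchSource < SketchSource
  def each
    # Return an Enumerator when no block is given, as Enumerable users expect.
    return enum_for(:each) unless block_given?
    configuration[:rows].each { |row| yield row }
  end
end

source = ArraySketchSource.new(nil, { rows: [{ id: 1 }, { id: 2 }] }, [:id])
ids = source.map { |row| row[:id] }
```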

Direct Known Subclasses

DatabaseSource, EnumerableSource, FileSource, ModelSource

Constructor Details

- (Source) initialize(control, configuration, definition)

Initialize the Source instance

  • control: The control object

  • configuration: The configuration hash

  • definition: The source layout definition

Configuration options:

  • :store_locally: Set to false to skip storing source data locally (defaults to true)



# File 'lib/etl/control/source.rb', line 38

def initialize(control, configuration, definition)
  @control = control
  @configuration = configuration
  @definition = definition
  
  @store_locally = configuration[:store_locally].nil? ? true : configuration[:store_locally]
end
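
The nil check in the constructor is what lets an explicit false survive: a plain `||` default would turn false back into true. The sketch below reproduces that logic in a standalone helper; resolve_store_locally is a hypothetical name used only for illustration.

```ruby
# Hypothetical helper reproducing the default logic from the constructor.
def resolve_store_locally(configuration)
  value = configuration[:store_locally]
  value.nil? ? true : value
end

default_value  = resolve_store_locally({})                        # key absent
explicit_false = resolve_store_locally({ store_locally: false })  # opt out

# For comparison: an `||` default cannot represent an explicit false.
naive_or = ({ store_locally: false }[:store_locally] || true)
```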

Instance Attribute Details

- (Object) configuration

The configuration Hash



# File 'lib/etl/control/source.rb', line 11

def configuration
  @configuration
end

- (Object) control

The control object



# File 'lib/etl/control/source.rb', line 8

def control
  @control
end

- (Object) definition

The definition Hash



# File 'lib/etl/control/source.rb', line 14

def definition
  @definition
end

- (Object) local_base

Get the local base; defaults to 'source_data'.



# File 'lib/etl/control/source.rb', line 57

def local_base
  @local_base
end

- (Object) store_locally

Returns true if the source data should be stored locally for archival. Returns true by default.



# File 'lib/etl/control/source.rb', line 18

def store_locally
  @store_locally
end

Class Method Details

+ (Object) class_for_name(name)

Convert the name to a Source class.

For example, if name is :database, this returns the DatabaseSource class.



# File 'lib/etl/control/source.rb', line 25

def class_for_name(name)
  ETL::Control.const_get("#{name.to_s.camelize}Source")
end
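
The lookup can be sketched without the library as follows. SketchControl is a hypothetical stand-in namespace, and its camelize helper is a simplified substitute for String#camelize, which the real method gets from ActiveSupport. The pattern is the same: build a constant name from the symbol and resolve it with const_get.

```ruby
# Stand-in namespace (hypothetical) holding two source classes.
module SketchControl
  class DatabaseSource; end
  class FileSource; end

  # Simplified camelize: :database -> "Database", :my_db -> "MyDb".
  def self.camelize(name)
    name.to_s.split('_').map(&:capitalize).join
  end

  # Resolve "<Name>Source" inside this namespace.
  def self.class_for_name(name)
    const_get("#{camelize(name)}Source")
  end
end

klass = SketchControl.class_for_name(:database)
```

In this sketch, a name with no matching class makes const_get raise a NameError.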

Instance Method Details

- (Object) errors

Get the array of errors that occurred while reading from the source



# File 'lib/etl/control/source.rb', line 47

def errors
  @errors ||= []
end

- (Object) last_local_file

Get the last fully written local file



# File 'lib/etl/control/source.rb', line 85

def last_local_file
  File.join(local_directory, File.basename(last_local_file_trigger, '.trig'))
end
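
The data file name is recovered from the trigger name by stripping the '.trig' suffix. A small standalone illustration, using a hypothetical path:

```ruby
trigger = File.join('source_data', '20100101120000.csv.trig')  # hypothetical path

# File.basename with a suffix argument strips that suffix from the name.
data_name = File.basename(trigger, '.trig')

# Rejoin with the directory to get the path of the data file.
data_file = File.join(File.dirname(trigger), data_name)
```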

- (Object) last_local_file_trigger

Get the last local file trigger filename, using the timestamp in the filenames. Filenames have the format YYYYMMDDHHMMSS.csv.trig, but for a file source there is an unpadded sequence number before the file extension. In that case this code may not return the correct “last” file (in particular when there are 10 or more source files). However, at this point only the database source calls this method, and it would not make sense for a file source to use it if multiple files are expected.



# File 'lib/etl/control/source.rb', line 96

def last_local_file_trigger
  trig_files = []
  trig_ext = '.csv.trig'

  # Store the basename (without extension) of all files that end in the
  # desired extension
  Dir.glob(File.join(local_directory, "*" + trig_ext)) do |f|
    # Extract the basename of each file with the extension snipped off
    trig_files << File.basename(f, trig_ext) if File.file?(f)
  end

  # Throw an exception if no trigger files are available
  raise "Local cache trigger file not found" if trig_files.empty?

  # Sort trigger file strings and get the last one
  last_trig = trig_files.sort.last

  # Return the file path including extension
  File.join(local_directory, last_trig + trig_ext)
end
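
The caveat about unpadded sequence numbers can be seen in a short standalone sketch. The timestamp value is hypothetical, and the numeric comparison at the end is only an illustration of the difference, not something the library does.

```ruby
timestamp = '20100101120000'  # hypothetical YYYYMMDDHHMMSS value

# Basenames as a file source would produce them for sequences 1..10.
names = (1..10).map { |seq| "#{timestamp}#{seq}" }

# Plain string sort, as in last_local_file_trigger.
lexical_last = names.sort.last

# Comparing the sequence part numerically picks the intended file instead.
numeric_last = names.max_by { |n| n[timestamp.length..-1].to_i }
```

With a string sort, the name ending in 10 sorts between the names ending in 1 and 2, so the name ending in 9 comes out last.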

- (Object) local_directory

The local directory for storing source data. This method must be overridden by subclasses.



# File 'lib/etl/control/source.rb', line 66

def local_directory
  raise "local_directory method is abstract"
end

- (Object) local_file(sequence = nil)

Return the local file for storing the raw source data. Each call to this method will result in a timestamped file, so you cannot expect to call it multiple times and reference the same file.

An optional sequence can be specified if there are multiple source files.



# File 'lib/etl/control/source.rb', line 75

def local_file(sequence=nil)
  filename = timestamp.to_s
  filename += sequence.to_s if sequence
  
  local_dir = local_directory
  FileUtils.mkdir_p(local_dir)
  File.join(local_dir, "#{filename}.csv")
end

- (Object) local_file_trigger(file)

Get the local trigger file that is used to indicate that the file has been completely written



# File 'lib/etl/control/source.rb', line 119

def local_file_trigger(file)
  Pathname.new(file.to_s + '.trig')
end
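
One plausible way a caller might use the trigger is sketched below, assuming the data file is written in full before the trigger is created. The file name and contents are hypothetical, and the sketch works in a temporary directory rather than a real local_directory.

```ruby
require 'tmpdir'
require 'pathname'
require 'fileutils'

trigger_seen = nil
Dir.mktmpdir do |dir|
  data_file = File.join(dir, '20100101120000.csv')    # hypothetical name
  trigger   = Pathname.new(data_file.to_s + '.trig')  # same rule as local_file_trigger

  File.write(data_file, "id,name\n1,example\n")       # write the data first
  FileUtils.touch(trigger.to_s)                       # then mark it complete

  # A reader would treat the data file as ready only once the trigger exists.
  trigger_seen = trigger.file? &&
                 trigger.basename.to_s == '20100101120000.csv.trig'
end
```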

- (Object) order

Get the order of fields that this source will present to the pipeline



# File 'lib/etl/control/source.rb', line 129

def order
  order = []
  definition.each do |item|
    case item
    when Hash
      order << item[:name]
    else
      order << item
    end
  end
  order
end
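
To illustrate, the sketch below applies the same rule to a hypothetical definition that mixes plain field names with hashes. It uses map instead of an accumulator, and the field names are made up for the example.

```ruby
# Hypothetical definition mixing plain field names and hashes.
definition = [:first_name, { name: :last_name, type: :string }, :email]

# Same rule as the order method: hashes contribute their :name,
# anything else is used as-is.
order = definition.map { |item| item.is_a?(Hash) ? item[:name] : item }
```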

- (Object) read_locally

Return true if the source should read locally.



# File 'lib/etl/control/source.rb', line 124

def read_locally
  Engine.read_locally
end

- (Object) timestamp

Get a timestamp value as a string



# File 'lib/etl/control/source.rb', line 52

def timestamp
  Engine.timestamp
end