Module: Bud

Included in:
DepAnalysis, ReblBase, VizHelper
Defined in:
lib/bud.rb,
lib/bud/aggs.rb,
lib/bud/state.rb,
lib/bud/errors.rb,
lib/bud/version.rb,
lib/bud/storage/dbm.rb,
lib/bud/collections.rb,
lib/bud/executor/join.rb,
lib/bud/executor/group.rb,
lib/bud/storage/zookeeper.rb,
lib/bud/executor/elements.rb

Overview

The root Bud module. There are three main ways to make an instance of Bud begin executing:

  1. Synchronously. To do this, instantiate your program and then call tick() one or more times; each call evaluates a single Bud timestep. In this mode, any network messages or timer events that occur will be buffered until the next call to tick(). This is mostly intended for “one-shot” programs that compute a single result and then terminate, or for interactively “single-stepping” through the execution of an event-driven system.

  2. In a separate thread in the foreground. To do this, instantiate your program and then call run_fg(). The Bud interpreter will then run, handling network events and evaluating new timesteps as appropriate, while the caller's thread blocks. The run_fg() method does not return until the instance has been shut down or an error occurs.

  3. In a separate thread in the background. To do this, instantiate your program and then call run_bg(). The Bud interpreter will run asynchronously. To interact with Bud (e.g., insert additional data or inspect the state of a Bud collection), use the sync_do and async_do methods.

Most programs should use method #3. Note that in all three cases, the stop() method should be used to shut down a Bud instance and release any resources it is using.
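The pure-Ruby sketch below models option 3 without the bud gem. MiniRuntime and its methods are hypothetical stand-ins, not Bud's implementation. A worker thread plays the role of the interpreter. async_do enqueues a block and returns immediately. sync_do enqueues a block and waits for its result. Because both share one FIFO queue, the sync_do snapshot observes both earlier async_do blocks.

```ruby
# Hypothetical stand-in for a Bud instance (NOT the Bud API): it only models
# "run in a background thread, interact via sync_do / async_do".
class MiniRuntime
  attr_reader :state # only safe to touch from inside sync_do/async_do blocks

  def initialize
    @jobs = Queue.new
    @state = []
    @worker = nil
  end

  # Analogue of run_bg: start the "interpreter" thread and return at once.
  def run_bg
    @worker = Thread.new do
      loop do
        job = @jobs.pop
        break if job == :stop
        job.call # run the caller's block on the interpreter thread
        tick     # then evaluate a timestep, as Bud's async_do does
      end
    end
  end

  # Analogue of async_do: enqueue and return; jobs run in FIFO order.
  def async_do(&blk)
    @jobs.push(blk)
  end

  # Analogue of sync_do: enqueue, then block until the job has run.
  def sync_do(&blk)
    done = Queue.new
    @jobs.push(lambda { done.push(blk.call) })
    done.pop
  end

  # Analogue of stop: ask the worker to exit and wait for it.
  def stop
    @jobs.push(:stop)
    @worker.join
  end

  private

  def tick
    # A real runtime would evaluate its rules to a fixpoint here.
  end
end

rt = MiniRuntime.new
rt.run_bg
rt.async_do { rt.state << :first }
rt.async_do { rt.state << :second }
snapshot = rt.sync_do { rt.state.dup } # both async jobs have already run
rt.stop
# snapshot == [:first, :second]
```

With the real gem the overall shape is similar: instantiate, run_bg, then sync_do/async_do, then stop. See the method details below for the actual semantics.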


Defined Under Namespace

Modules: PushSHOuterJoin

Classes: Accum, AccumPair, Agg, ArgExemplary, Avg, BagLattice, BoolLattice, BooleanAnd, BooleanOr, BudChannel, BudCollExpr, BudCollection, BudDbmTable, BudFileReader, BudInputInterface, BudOutputInterface, BudPeriodic, BudPersistentCollection, BudReadOnly, BudScratch, BudServer, BudTable, BudTemp, BudTerminal, BudZkTable, Choose, ChooseOneRand, CompileError, Count, Error, IllegalSourceError, KeyConstraintError, Lattice, LatticePushElement, LatticeScanner, LatticeWrapper, MapLattice, Max, MaxLattice, Min, MinLattice, PositiveSetLattice, PushApplyMethod, PushArgAgg, PushEachWithIndex, PushElement, PushGroup, PushNotIn, PushPredicate, PushReduce, PushSHJoin, PushSort, PushStatefulElement, ScannerElement, SetLattice, ShutdownWithCallbacksError, Sum, TupleStruct, TypeError

Constant Summary

VERSION = "0.9.8"

Instance Attribute Details

- (Object) app_tables (readonly)

Returns the value of attribute app_tables



# File 'lib/bud.rb', line 71

def app_tables
  @app_tables
end

- (Object) budtime (readonly)

Returns the value of attribute budtime



# File 'lib/bud.rb', line 70

def budtime
  @budtime
end

- (Object) builtin_tables (readonly)

Returns the value of attribute builtin_tables



# File 'lib/bud.rb', line 71

def builtin_tables
  @builtin_tables
end

- (Object) channels (readonly)

Returns the value of attribute channels



# File 'lib/bud.rb', line 71

def channels
  @channels
end

- (Object) dbm_tables (readonly)

Returns the value of attribute dbm_tables



# File 'lib/bud.rb', line 71

def dbm_tables
  @dbm_tables
end

- (Object) done_bootstrap (readonly)

Returns the value of attribute done_bootstrap



# File 'lib/bud.rb', line 73

def done_bootstrap
  @done_bootstrap
end

- (Object) dsock (readonly)

Returns the value of attribute dsock



# File 'lib/bud.rb', line 70

def dsock
  @dsock
end

- (Object) inbound (readonly)

Returns the value of attribute inbound



# File 'lib/bud.rb', line 70

def inbound
  @inbound
end

- (Object) inside_tick (readonly)

Returns the value of attribute inside_tick



# File 'lib/bud.rb', line 74

def inside_tick
  @inside_tick
end

- (Object) lattices (readonly)

Returns the value of attribute lattices



# File 'lib/bud.rb', line 71

def lattices
  @lattices
end

- (Object) merge_targets (readonly)

Returns the value of attribute merge_targets



# File 'lib/bud.rb', line 72

def merge_targets
  @merge_targets
end

- (Object) meta_parser (readonly)

Returns the value of attribute meta_parser



# File 'lib/bud.rb', line 70

def meta_parser
  @meta_parser
end

- (Object) metrics

Returns the value of attribute metrics



# File 'lib/bud.rb', line 76

def metrics
  @metrics
end

- (Object) options (readonly)

Returns the value of attribute options



# File 'lib/bud.rb', line 70

def options
  @options
end

- (Object) periodics

Returns the value of attribute periodics



# File 'lib/bud.rb', line 76

def periodics
  @periodics
end

- (Object) push_elems (readonly)

Returns the value of attribute push_elems



# File 'lib/bud.rb', line 72

def push_elems
  @push_elems
end

- (Object) push_joins (readonly)

Returns the value of attribute push_joins



# File 'lib/bud.rb', line 72

def push_joins
  @push_joins
end

- (Object) push_sources (readonly)

Returns the value of attribute push_sources



# File 'lib/bud.rb', line 72

def push_sources
  @push_sources
end

- (Object) qualified_name

Returns the value of attribute qualified_name



# File 'lib/bud.rb', line 77

def qualified_name
  @qualified_name
end

- (Object) rtracer (readonly)

Returns the value of attribute rtracer



# File 'lib/bud.rb', line 70

def rtracer
  @rtracer
end

- (Object) running_async (readonly)

Returns the value of attribute running_async



# File 'lib/bud.rb', line 78

def running_async
  @running_async
end

- (Object) scanners (readonly)

Returns the value of attribute scanners



# File 'lib/bud.rb', line 72

def scanners
  @scanners
end

- (Object) stratified_rules

Returns the value of attribute stratified_rules



# File 'lib/bud.rb', line 75

def stratified_rules
  @stratified_rules
end

- (Object) tables (readonly)

Returns the value of attribute tables



# File 'lib/bud.rb', line 71

def tables
  @tables
end

- (Object) this_rule_context (readonly)

Returns the value of attribute this_rule_context



# File 'lib/bud.rb', line 73

def this_rule_context
  @this_rule_context
end

- (Object) this_stratum (readonly)

Returns the value of attribute this_stratum



# File 'lib/bud.rb', line 73

def this_stratum
  @this_stratum
end

- (Object) viz (readonly)

Returns the value of attribute viz



# File 'lib/bud.rb', line 70

def viz
  @viz
end

- (Object) zk_tables (readonly)

Returns the value of attribute zk_tables



# File 'lib/bud.rb', line 71

def zk_tables
  @zk_tables
end

Instance Method Details

- (Object) accum(x)

Aggregate method to be used in Bud::BudCollection.group. Accumulates all x inputs into a set.



# File 'lib/bud/aggs.rb', line 207

def accum(x)
  [Accum.new, x]
end
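Helpers such as accum, count and sum do not compute anything themselves. As the code above shows, they return an [aggregator, column] pair for group to consume. The sketch below illustrates that division of labour in plain Ruby. SketchAccum, SketchCount and the mini group function are hypothetical. The init/trans/final interface they use is an assumption made for illustration and does not describe Bud's Agg classes.

```ruby
require "set"

# Hypothetical aggregators imitating the role of Bud's Accum and Count:
# each folds the values of one group into a single result.
class SketchAccum
  def init(val)
    Set[val]
  end

  def trans(state, val)
    state << val
  end

  def final(state)
    state
  end
end

class SketchCount
  def init(_val)
    1
  end

  def trans(state, _val)
    state + 1
  end

  def final(state)
    state
  end
end

# Helpers in the style of Bud's accum/count: return [aggregator, column].
def accum(col)
  [SketchAccum.new, col]
end

def count(col = nil)
  [SketchCount.new, col]
end

# Hypothetical mini "group": rows are grouped by key_cols, and each
# [agg, col] pair is folded over the rows of that group.
def group(rows, key_cols, *aggs)
  rows.group_by { |r| r.values_at(*key_cols) }.map do |key, members|
    results = aggs.map do |agg, col|
      state = nil
      members.each_with_index do |row, i|
        val = col && row[col]
        state = i.zero? ? agg.init(val) : agg.trans(state, val)
      end
      agg.final(state)
    end
    key + results
  end
end

rows = [
  { dept: "eng", name: "ann" },
  { dept: "eng", name: "bob" },
  { dept: "ops", name: "cy" }
]
out = group(rows, [:dept], accum(:name), count)
# out == [["eng", Set["ann", "bob"], 2], ["ops", Set["cy"], 1]]
```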

- (Object) accum_pair(x, y)

Aggregate method to be used in Bud::BudCollection.group. Accumulates the x, y inputs into a set of pairs (two-element arrays).



# File 'lib/bud/aggs.rb', line 223

def accum_pair(x, y)
  [AccumPair.new, x, y]
end

- (Object) async_do

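Like sync_do, but does not block the caller's thread: the given block will be invoked at some future time. After the block runs, Bud evaluates another timestep so that any data the block inserted is processed. Note that calls to async_do respect FIFO order.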



# File 'lib/bud.rb', line 818

def async_do
  EventMachine::schedule do
    yield if block_given?
    # Do another tick, in case the user-supplied block inserted any data
    tick_internal
  end
end

- (Object) avg(x)

Aggregate method to be used in Bud::BudCollection.group. Computes the average of a multiset of x values.



# File 'lib/bud/aggs.rb', line 191

def avg(x)
  [Avg.new, x]
end

- (Object) bool_and(x)



# File 'lib/bud/aggs.rb', line 81

def bool_and(x)
  [BooleanAnd.new, x]
end

- (Object) bool_or(x)



# File 'lib/bud/aggs.rb', line 95

def bool_or(x)
  [BooleanOr.new, x]
end

- (Object) bootstrap

Empty default for the bootstrap hook. do_bootstrap calls it after evaluating any bootstrap blocks.



# File 'lib/bud.rb', line 620

def bootstrap # :nodoc: all
end

- (Object) bud_clock

Returns the wallclock time associated with the current Bud tick. That is, this value is guaranteed to remain the same for the duration of a single tick, but will likely change between ticks.

Raises:

  • (Bud::Error) if called outside a tick



# File 'lib/bud.rb', line 1181

def bud_clock
  raise Bud::Error, "bud_clock undefined outside tick" unless @inside_tick
  @tick_clock_time ||= Time.now
  @tick_clock_time
end
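Here is a minimal sketch of the per-tick memoization that bud_clock relies on. It assumes, hypothetically, that the cached time is cleared when a tick ends. TickClock and OutsideTick are illustrative names, not part of Bud. The first read inside a tick fixes the time, and later reads in the same tick return the same object.

```ruby
# Hypothetical model of bud_clock's behaviour (not Bud's code).
class TickClock
  class OutsideTick < StandardError; end

  def initialize
    @inside_tick = false
    @tick_clock_time = nil
  end

  # Run the block as one "tick"; the cached time is dropped afterwards
  # (an assumption made for this sketch).
  def tick
    @inside_tick = true
    yield self
  ensure
    @inside_tick = false
    @tick_clock_time = nil
  end

  def clock
    raise OutsideTick, "clock undefined outside tick" unless @inside_tick
    @tick_clock_time ||= Time.now # first read in a tick fixes the value
  end
end

c = TickClock.new
same = c.tick { |t| first = t.clock; sleep 0.01; first.equal?(t.clock) }
# same == true: every read within one tick returns the same Time object
```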

- (Object) cancel_shutdown_cb(id)



# File 'lib/bud.rb', line 778

def cancel_shutdown_cb(id)
  schedule_and_wait do
    raise Bud::Error unless @shutdown_callbacks.has_key? id
    @shutdown_callbacks.delete(id)
  end
end

- (Object) channel(name, schema = nil, loopback = false)

Declare a transient network collection. Default schema [:address, :val] => [].



# File 'lib/bud/state.rb', line 115

def channel(name, schema=nil, loopback=false)
  define_collection(name)
  @tables[name] = Bud::BudChannel.new(name, self, schema, loopback)
  @channels[name] = @tables[name]
end

- (Object) choose(x)

Exemplary aggregate method to be used in Bud::BudCollection.group. Arbitrarily but deterministically chooses among the x entries being aggregated.



# File 'lib/bud/aggs.rb', line 114

def choose(x)
  [Choose.new, x]
end

- (Object) choose_rand(x = nil)

Exemplary aggregate method to be used in Bud::BudCollection.group. Randomly chooses among the x entries being aggregated.



# File 'lib/bud/aggs.rb', line 144

def choose_rand(x=nil)
  [ChooseOneRand.new, x]
end

- (Object) coll_expr(name, expr, schema = nil)

Declare a collection-generating expression. Default schema [:key] => [:val].



# File 'lib/bud/state.rb', line 64

def coll_expr(name, expr, schema=nil)
  define_collection(name)
  @tables[name] = Bud::BudCollExpr.new(name, self, expr, schema)
end

- (Object) collection_stratum(collection)

Returns the stratum number of the given collection, or nil if it has none. NB: a collection that does not appear on the lhs or rhs of any rule is not currently assigned to a stratum.



# File 'lib/bud.rb', line 1190

def collection_stratum(collection)
  t_stratum.each do |t|
    return t.stratum if t.predicate == collection
  end

  return nil
end

- (Object) count(x = nil)

Aggregate method to be used in Bud::BudCollection.group. Counts the number of entries aggregated. The argument is ignored.



# File 'lib/bud/aggs.rb', line 171

def count(x=nil)
  [Count.new]
end

- (Object) delta(out_tbl)

A common special case for sync_callback: block on a delta to a table.



# File 'lib/bud.rb', line 908

def delta(out_tbl)
  sync_callback(nil, nil, out_tbl)
end

- (Object) do_bootstrap

Evaluate all bootstrap blocks and tick deltas



# File 'lib/bud.rb', line 1052

def do_bootstrap
  # Evaluate bootstrap for imported modules
  @this_rule_context = self
  imported = import_defs.keys
  imported.each do |mod_alias|
    wrapper = import_instance mod_alias
    wrapper.do_bootstrap
  end
  self.class.ancestors.reverse.each do |anc|
    anc.instance_methods(false).each do |m|
      if /^__bootstrap__/.match m
        self.method(m.to_sym).call
      end
    end
  end
  bootstrap

  if toplevel == self
    @tables.each_value {|t| t.bootstrap}
    @lattices.each_value {|l| l.bootstrap}
  end
  @done_bootstrap = true
end

- (Object) do_invalidate_rescan



# File 'lib/bud.rb', line 1076

def do_invalidate_rescan
  @default_rescan.each {|elem| elem.rescan = true}
  @default_invalidate.each {|elem|
    elem.invalidated = true
    # Call tick on tables here itself. The rest below
    elem.invalidate_cache unless elem.class <= PushElement
  }

  # The following loop invalidates additional (non-default) elements and
  # tables that depend on the run-time invalidation state of a table.  Loop
  # once to set the flags.
  each_scanner do |scanner, stratum|
    if scanner.rescan
      scanner.rescan_set.each {|e| e.rescan = true}
      scanner.invalidate_set.each {|e|
        e.invalidated = true
        e.invalidate_cache unless e.class <= PushElement
      }
    end
  end

  # Loop a second time to actually call invalidate_cache.  We can't merge this
  # with the loops above because some versions of invalidate_cache (e.g.,
  # join) depend on the rescan state of other elements.
  @num_strata.times do |stratum|
    @push_sorted_elems[stratum].each {|e| e.invalidate_cache if e.invalidated}
  end
end
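A toy model can illustrate why the code loops twice. Elem and invalidate_two_pass below are hypothetical. An element whose invalidate_cache reads its inputs' rescan flags (as the comment says join does) must run only after every flag has been set. Otherwise it could observe a partially updated state.

```ruby
# Hypothetical element: like a join, its cache invalidation inspects the
# rescan flags of its inputs, so those flags must be final before it runs.
Elem = Struct.new(:name, :rescan, :invalidated, :deps, :log) do
  def invalidate_cache
    log << [name, deps.map(&:rescan)]
  end
end

def invalidate_two_pass(elems, to_rescan, to_invalidate)
  # Pass 1: set every flag.
  to_rescan.each { |e| e.rescan = true }
  to_invalidate.each { |e| e.invalidated = true }
  # Pass 2: only now act on the flags, in a fixed order.
  elems.each { |e| e.invalidate_cache if e.invalidated }
end

log = []
left  = Elem.new(:left,  false, false, [], log)
right = Elem.new(:right, false, false, [], log)
join  = Elem.new(:join,  false, false, [left, right], log)
invalidate_two_pass([left, right, join], [left, right], [join])
# log == [[:join, [true, true]]]: the join saw both inputs flagged
```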

- (Object) do_register_callback(tbl_name, &blk)

Raises:

  • (Bud::Error) if tbl_name does not name a known collection



# File 'lib/bud.rb', line 848

def do_register_callback(tbl_name, &blk)
  unless @tables.has_key? tbl_name
    raise Bud::Error, "no such collection: #{tbl_name}"
  end

  raise Bud::Error if @callbacks.has_key? @callback_id
  @callbacks[@callback_id] = [tbl_name, blk]
  cb_id = @callback_id
  @callback_id += 1
  return cb_id
end

- (Object) file_reader(name, filename)

Declare a collection to be read from filename. May be used only on the rhs of statements.



# File 'lib/bud/state.rb', line 131

def file_reader(name, filename)
  define_collection(name)
  @tables[name] = Bud::BudFileReader.new(name, filename, self)
end

- (Object) import_instance(name)



# File 'lib/bud.rb', line 198

def import_instance(name)
  name = "@" + name.to_s
  instance_variable_get(name) if instance_variable_defined? name
end

- (Object) initialize(options = {})

Options to the Bud runtime are passed in a hash with the following keys:

  • network configuration

    • :ip IP address string for this instance

    • :port port number for this instance

    • :ext_ip IP address at which external nodes can contact this instance

    • :ext_port port number to go with :ext_ip

  • operating system interaction

    • :stdin if non-nil, reading from the stdio collection results in reading from this IO handle

    • :stdout writing to the stdio collection results in writing to this IO handle; defaults to $stdout

    • :signal_handling how to handle SIGINT and SIGTERM. If :none, these signals are ignored; otherwise, all Bud instances are shut down.

  • tracing and output

    • :quiet if true, suppress certain messages

    • :trace if true, generate budvis outputs

    • :rtrace if true, generate budplot outputs

    • :dump_rewrite if true, dump results of internal rewriting of Bloom code to a file

    • :print_wiring if true, print the wiring diagram of the program to stdout

    • :metrics if true, dumps a hash of internal performance metrics

  • controlling execution

    • :tag a name for this instance, suitable for display during tracing and visualization

    • :channel_filter a code block that can be used to filter the network messages delivered to this Bud instance. At the start of each tick, the code block is invoked for every channel with any incoming messages; the code block is passed the name of the channel and an array containing the inbound messages. It should return a two-element array containing “accepted” and “postponed” messages, respectively. Accepted messages are delivered during this tick, and postponed messages are buffered and passed to the filter in subsequent ticks. Any messages that aren't in either array are dropped.

  • storage configuration

    • :dbm_dir filesystem directory to hold DBM-backed collections

    • :dbm_truncate if true, DBM-backed collections are opened with OTRUNC
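The following example illustrates the :channel_filter contract described above. It shows a filter lambda and a hypothetical driver (simulate_ticks) that imitates the documented semantics: postponed messages are offered again on the next tick, and anything in neither array is dropped. The policy of accepting at most two messages per tick on a channel named :chan_work is arbitrary. The sketch also assumes the channel name is passed as a Symbol.

```ruby
LIMIT = 2 # arbitrary per-tick limit for this example

# Filter in the documented shape: (channel name, inbound messages) ->
# [accepted, postponed].
work_filter = lambda do |chan_name, msgs|
  if chan_name == :chan_work
    [msgs.first(LIMIT), msgs.drop(LIMIT)]
  else
    [msgs, []] # other channels: accept everything
  end
end

# Hypothetical driver: postponed messages are re-offered next tick,
# ahead of newly arrived ones; anything in neither array would be dropped.
def simulate_ticks(filter, chan, arrivals)
  buffered = []
  arrivals.map do |new_msgs|
    accepted, postponed = filter.call(chan, buffered + new_msgs)
    buffered = postponed
    accepted
  end
end

delivered = simulate_ticks(work_filter, :chan_work, [[1, 2, 3], [4], []])
# delivered == [[1, 2], [3, 4], []]
```

With the real runtime, such a lambda would be supplied through the options hash, e.g. SomeProgram.new(:channel_filter => work_filter), where SomeProgram stands for any class that includes Bud.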



# File 'lib/bud.rb', line 111

def initialize(options={})
  options[:dump_rewrite] ||= ENV["BUD_DUMP_REWRITE"].to_i > 0
  options[:dump_ast]     ||= ENV["BUD_DUMP_AST"].to_i > 0
  options[:print_wiring] ||= ENV["BUD_PRINT_WIRING"].to_i > 0
  @qualified_name = ""
  @tables = {}
  @lattices = {}
  @channels = {}
  @dbm_tables = {}
  @zk_tables = {}
  @stratified_rules = []
  @push_elems = {}
  @callbacks = {}
  @callback_id = 0
  @shutdown_callbacks = {}
  @shutdown_callback_id = 0
  @post_shutdown_callbacks = []
  @timers = []
  @app_tables = []
  @inside_tick = false
  @tick_clock_time = nil
  @budtime = 0
  @inbound = {}
  @done_bootstrap = false
  @done_wiring = false
  @instance_id = ILLEGAL_INSTANCE_ID # Assigned when we start running
  @metrics = {}
  @endtime = nil
  @this_stratum = -1
  @this_rule_id = -1
  @push_sorted_elems = nil
  @running_async = false
  @bud_started = false

  # Setup options (named arguments), along with default values
  @options = options.clone
  @options[:ip] ||= "127.0.0.1"
  @ip = @options[:ip]
  @options[:port] ||= 0
  @options[:port] = @options[:port].to_i
  # NB: If using an ephemeral port (specified by port = 0), the actual port
  # number won't be known until we start EM

  load_lattice_defs
  builtin_state
  resolve_imports
  call_state_methods

  @viz = VizOnline.new(self) if @options[:trace]
  @rtracer = RTrace.new(self) if @options[:rtrace]

  do_rewrite
  if toplevel == self
    # initialize per-stratum state
    @num_strata = @stratified_rules.length
    @scanners = @num_strata.times.map{{}}
    @push_sources = @num_strata.times.map{{}}
    @push_joins = @num_strata.times.map{[]}
    @merge_targets = @num_strata.times.map{Set.new}
  end
end

- (Object) input




# File 'lib/bud/state.rb', line 42

def input # :nodoc: all
  true
end

- (Object) inspect

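Returns a short identifying string consisting of the class name and the object ID in hexadecimal. XXX: make tag-specific.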



# File 'lib/bud.rb', line 913

def inspect
  "#{self.class}:#{self.object_id.to_s(16)}"
end

- (Object) int_ip_port

Returns the internal IP and port. See ip_port.

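Raises:

  • (Bud::Error) if called before the port is known (an ephemeral port, 0, was requested and EventMachine has not yet been started)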



# File 'lib/bud.rb', line 1041

def int_ip_port
  raise Bud::Error, "int_ip_port called before port defined" if @port.nil? and @options[:port] == 0
  @port.nil? ? "#{@ip}:#{@options[:port]}" : "#{@ip}:#{@port}"
end

- (Object) interface(mode, name, schema = nil)

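Declare a transient collection to be an input or output interface. A true mode (as returned by input) declares an input interface; a false mode (as returned by output) declares an output interface.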



# File 'lib/bud/state.rb', line 51

def interface(mode, name, schema=nil)
  define_collection(name)
  t_provides << [name.to_s, mode]
  @tables[name] = (mode ? BudInputInterface : BudOutputInterface).new(name, self, schema)
end

- (Object) interfaces(direction, collections)

An alternative approach to declaring interfaces. direction must be :input or :output; any other value raises Bud::CompileError.



# File 'lib/bud/state.rb', line 157

def interfaces(direction, collections)
  mode = case direction
    when :input then true
    when :output then false
  else
    raise Bud::CompileError, "unrecognized interface type #{direction}"
  end
  collections.each do |tab|
    t_provides << [tab.to_s, mode]
  end 
end

- (Object) ip



# File 'lib/bud.rb', line 1030

def ip
  options[:ext_ip] ? "#{@options[:ext_ip]}" : "#{@ip}"
end

- (Object) ip_port

Returns the IP and port of the Bud instance as a string. In addition to the local IP and port, the user may define an external IP and/or port (:ext_ip, :ext_port). The external version of each is returned if available; otherwise, the local version is returned. Mixing local and external values has legitimate uses: local_ip:external_port fits a setup with local port forwarding, and external_ip:local_port fits a host in a DMZ, for example.

Raises:

  • (Bud::Error) if called before the port is known



# File 'lib/bud.rb', line 1025

def ip_port
  raise Bud::Error, "ip_port called before port defined" if port.nil?
  ip.to_s + ":" + port.to_s
end
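The precedence rules implemented by ip, port and ip_port can be restated as one pure function. resolve_ip_port is a hypothetical helper. bound_port stands in for the port that EventMachine actually bound (@port) when an ephemeral port (0) was requested.

```ruby
# Hypothetical restatement of ip/port/ip_port: external values win,
# then the bound port, then a non-zero configured port.
def resolve_ip_port(ip:, port: 0, ext_ip: nil, ext_port: nil, bound_port: nil)
  host = ext_ip || ip
  chosen =
    if ext_port then ext_port
    elsif bound_port then bound_port
    elsif port != 0 then port
    end
  # Mirrors "ip_port called before port defined"
  raise ArgumentError, "port not yet known" if chosen.nil?
  "#{host}:#{chosen}"
end

resolve_ip_port(ip: "10.0.0.5", port: 5000)                        # local only
resolve_ip_port(ip: "10.0.0.5", port: 5000, ext_ip: "203.0.113.7") # DMZ case
resolve_ip_port(ip: "10.0.0.5", ext_port: 8080)                    # port forwarding
```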

- (Object) load_lattice_defs

Define methods to implement the state declarations for every registered kind of lattice.



# File 'lib/bud/state.rb', line 171

def load_lattice_defs
  Bud::Lattice.global_mfuncs.each do |m|
    next if RuleRewriter::MONOTONE_WHITELIST.include? m
    if Bud::BudCollection.instance_methods.include? m.to_s
      puts "monotone method #{m} conflicts with non-monotonic method in BudCollection"
    end
  end

  Bud::Lattice.global_morphs.each do |m|
    next if RuleRewriter::MONOTONE_WHITELIST.include? m
    if Bud::BudCollection.instance_methods.include? m.to_s
      puts "morphism #{m} conflicts with non-monotonic method in BudCollection"
    end
  end

  # Sanity-check lattice definitions
  # XXX: We should do this only once per lattice
  Bud::Lattice.lattice_kinds.each do |wrap_name, klass|
    unless klass.method_defined? :merge
      raise Bud::CompileError, "lattice #{wrap_name} does not define a merge function"
    end

    # If a method is marked as monotone in any lattice, every lattice that
    # declares a method of that name must also mark it as monotone.
    meth_list = klass.instance_methods(false).to_set
    Bud::Lattice.global_mfuncs.each do |m|
      next unless meth_list.include? m.to_s
      unless klass.mfuncs.include? m
        raise Bud::CompileError, "method #{m} in #{wrap_name} must be monotone"
      end
    end

    # Apply a similar check for morphs
    Bud::Lattice.global_morphs.each do |m|
      next unless meth_list.include? m.to_s
      unless klass.morphs.include? m
        raise Bud::CompileError, "method #{m} in #{wrap_name} must be a morph"
      end
    end

    # Similarly, check for non-monotone lattice methods that are found in the
    # builtin list of monotone operators. The "merge" method is implicitly
    # monotone (XXX: should it be declared as a morph or monotone function?)
    meth_list.each do |m_str|
      m = m_str.to_sym
      next unless RuleRewriter::MONOTONE_WHITELIST.include? m
      # XXX: ugly hack. We want to allow lattice class implementations to
      # define their own equality semantics.
      next if m == :==
      unless klass.mfuncs.include?(m) || klass.morphs.include?(m) || m == :merge
        raise Bud::CompileError, "method #{m} in #{wrap_name} must be monotone"
      end
    end

    # XXX: replace "self" with toplevel?
    self.singleton_class.send(:define_method, wrap_name) do |lat_name|
      define_lattice(lat_name)
      @lattices[lat_name] = Bud::LatticeWrapper.new(lat_name, klass, self)
    end
  end
end
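The first consistency rule enforced above says that a method name marked monotone in any lattice must be marked monotone in every lattice that defines it. The sketch below isolates that rule. LatticeSpec and check_monotone_consistency are hypothetical, as are the example lattice names. Real lattices carry this information on their classes (mfuncs, morphs), and the morph check works the same way.

```ruby
require "set"

# Hypothetical description of a lattice: the methods it defines and the
# subset it declares monotone.
LatticeSpec = Struct.new(:name, :meths, :mfuncs)

def check_monotone_consistency(specs)
  # Union of every method name declared monotone in any lattice
  global_mfuncs = specs.map(&:mfuncs).reduce(Set.new, :|)
  errors = []
  specs.each do |spec|
    spec.meths.each do |m|
      next unless global_mfuncs.include?(m)
      next if spec.mfuncs.include?(m)
      errors << "method #{m} in #{spec.name} must be monotone"
    end
  end
  errors
end

specs = [
  LatticeSpec.new(:lmax,   Set[:gt, :merge], Set[:gt]),
  LatticeSpec.new(:lcount, Set[:gt, :size],  Set[])
]
errs = check_monotone_consistency(specs)
# errs == ["method gt in lcount must be monotone"]
```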

- (Object) loopback(name, schema = nil)

Declare a transient network collection that delivers facts back to the current Bud instance. This is syntactic sugar for a channel that always delivers to the IP/port of the current Bud instance. Default schema [:key] => [:val].



# File 'lib/bud/state.rb', line 125

def loopback(name, schema=nil)
  schema ||= {[:key] => [:val]}
  channel(name, schema, true)
end

- (Object) max(x)

Exemplary aggregate method to be used in Bud::BudCollection.group. Computes the maximum of the x entries aggregated.



# File 'lib/bud/aggs.rb', line 67

def max(x)
  [Max.new, x]
end

- (Object) min(x)

Exemplary aggregate method to be used in Bud::BudCollection.group. Computes the minimum of the x entries aggregated.



# File 'lib/bud/aggs.rb', line 50

def min(x)
  [Min.new, x]
end
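On our reading, "exemplary" refers to aggregates whose result is one of the input values. The tuple that produced that value can then serve as an exemplar for its group. The plain-Ruby sketch below shows this idea by returning the whole row with the minimum value in each group. argmin_per_group is a hypothetical helper, not a Bud method, and ties go to the first row encountered.

```ruby
# Hypothetical helper: for each group, return the whole row carrying the
# minimum value rather than just the value itself.
def argmin_per_group(rows, key, val)
  rows.group_by { |r| r[key] }.map do |_k, members|
    members.min_by { |r| r[val] } # first row wins on ties
  end
end

rows = [
  { node: "a", latency: 30, path: "x" },
  { node: "a", latency: 10, path: "y" },
  { node: "b", latency: 20, path: "z" }
]
best = argmin_per_group(rows, :node, :latency)
# best == [{ node: "a", latency: 10, path: "y" }, { node: "b", latency: 20, path: "z" }]
```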

- (Object) module_wrapper_class(mod)



# File 'lib/bud.rb', line 173

def module_wrapper_class(mod)
  class_name = "#{mod}__wrap"
  begin
    klass = Module.const_get(class_name.to_sym)
    unless klass.is_a? Class
      raise Bud::Error, "internal error: #{class_name} is in use"
    end
  rescue NameError # exception if class class_name doesn't exist
  end
  klass ||= eval "class #{class_name}; include Bud; include #{mod}; end"
  klass
end
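module_wrapper_class builds the wrapper with eval. The sketch below is a hypothetical eval-free variant that uses Class.new and Object.const_set. Bloomish stands in for Bud so that the snippet runs without the gem. Unlike the eval version, it handles only top-level (non-namespaced) modules.

```ruby
module Bloomish; end # hypothetical stand-in for Bud

# Look up "#{mod}__wrap"; create a class including Bloomish and mod only
# if no such constant exists yet.
def wrapper_class_for(mod)
  class_name = "#{mod.name}__wrap"
  if Object.const_defined?(class_name, false)
    klass = Object.const_get(class_name)
    raise TypeError, "internal error: #{class_name} is in use" unless klass.is_a?(Class)
    return klass
  end
  klass = Class.new do
    include Bloomish
    include mod # the block is a closure, so the argument is visible here
  end
  Object.const_set(class_name, klass) # also gives the class its name
end

module Greeter
  def greet
    "hi"
  end
end

k1 = wrapper_class_for(Greeter)
k2 = wrapper_class_for(Greeter) # second call reuses the existing class
```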

- (Object) on_shutdown(&blk)

Register a callback that will be invoked when this instance of Bud is shutting down. XXX: The naming of this method (and cancel_shutdown_cb) is inconsistent with the naming of register_callback and friends.



# File 'lib/bud.rb', line 766

def on_shutdown(&blk)
  # Start EM if not yet started
  start_reactor
  rv = nil
  schedule_and_wait do
    rv = @shutdown_callback_id
    @shutdown_callbacks[@shutdown_callback_id] = blk
    @shutdown_callback_id += 1
  end
  return rv
end

- (Object) output




# File 'lib/bud/state.rb', line 46

def output # :nodoc: all
  false
end

- (Object) pause



# File 'lib/bud.rb', line 698

def pause
  schedule_and_wait do
    @running_async = false
  end
end

- (Object) periodic(name, period = 1)

Declare a collection to be auto-populated every period seconds. Schema [:key] => [:val]. May be used only on the rhs of statements.

Raises:

  • (Bud::Error) if a periodic with this name has already been declared



# File 'lib/bud/state.rb', line 138

def periodic(name, period=1)
  define_collection(name)
  raise Bud::Error if @periodics.has_key? [name]
  @periodics << [name, period]
  @tables[name] = Bud::BudPeriodic.new(name, self)
end

- (Object) port



# File 'lib/bud.rb', line 1034

def port
  return nil if @port.nil? and @options[:port] == 0 and not @options[:ext_port]
  return @options[:ext_port] ? @options[:ext_port] :
    (@port.nil? ? @options[:port] : @port)
end

- (Object) post_shutdown(&blk)

Register a callback that will be invoked after this instance of Bud has been shut down.



# File 'lib/bud.rb', line 787

def post_shutdown(&blk)
  # Start EM if not yet started
  start_reactor
  schedule_and_wait do
    @post_shutdown_callbacks << blk
  end
end

- (Object) readonly(name, schema = nil)



# File 'lib/bud/state.rb', line 102

def readonly(name, schema=nil)
  define_collection(name)
  @tables[name] = Bud::BudReadOnly.new(name, self, schema)
end

- (Object) register_callback(tbl_name, &blk)

Register a new callback. Given the name of a Bud collection, this method arranges for the given block to be invoked at the end of any tick in which any tuples have been inserted into the specified collection. The code block is passed the collection as an argument; this provides a convenient way to examine the tuples inserted during that fixpoint. (Note that because the Bud runtime is blocked while the callback is invoked, it can also examine any other Bud state freely.)

Note that registering callbacks on persistent collections (e.g., tables, syncs and stores) is probably not wise: as long as any tuples are stored in the collection, the callback will be invoked at the end of every tick.



# File 'lib/bud.rb', line 837

def register_callback(tbl_name, &blk)
  # We allow callbacks to be added before or after EM has been started. To
  # simplify matters, we start EM if it hasn't been started yet.
  start_reactor
  cb_id = nil
  schedule_and_wait do
    cb_id = do_register_callback(tbl_name, &blk)
  end
  return cb_id
end
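The following toy model shows the bookkeeping behind register_callback and do_register_callback. IDs are handed out in increasing order and unknown collection names are rejected. At the end of a tick, each callback fires only if its collection received tuples. CallbackRegistry, end_of_tick and cancel are hypothetical names chosen for the sketch.

```ruby
# Hypothetical registry modelled on register_callback/do_register_callback.
class CallbackRegistry
  def initialize(collections)
    @collections = collections # name => tuples inserted this tick
    @callbacks = {}
    @next_id = 0
  end

  def register(name, &blk)
    raise ArgumentError, "no such collection: #{name}" unless @collections.key?(name)
    id = @next_id
    @callbacks[id] = [name, blk]
    @next_id += 1
    id
  end

  def cancel(id)
    raise ArgumentError, "unknown callback #{id}" unless @callbacks.delete(id)
  end

  # Called once at the end of each tick.
  def end_of_tick
    @callbacks.each_value do |name, blk|
      tuples = @collections[name]
      blk.call(tuples) unless tuples.empty?
    end
  end
end

colls = { answers: [], log: [] }
reg = CallbackRegistry.new(colls)
seen = []
id = reg.register(:answers) { |t| seen << t.dup }
colls[:answers] << [1, "yes"]
reg.end_of_tick          # fires: :answers received a tuple
colls[:answers].clear    # scratch-like: emptied between ticks
reg.end_of_tick          # nothing inserted, so no call
reg.cancel(id)
# seen == [[[1, "yes"]]]
```

If the collection were never cleared between ticks (as with a persistent table), the callback would fire at the end of every tick, which is the caveat noted above.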

- (Object) run_bg

Run Bud in the background (in a different thread). This means that the Bud interpreter will run asynchronously from the caller, so care must be used when interacting with it. For example, it is not safe to directly examine Bud collections from the caller's thread (see async_do and sync_do).

This instance of Bud will run until stop() is called.



# File 'lib/bud.rb', line 637

def run_bg
  start

  schedule_and_wait do
    if @running_async
      raise Bud::Error, "run_bg called on already-running Bud instance"
    end
    @running_async = true

    # Consume any events received while we weren't running async
    tick_internal
  end

  @rtracer.sleep if options[:rtrace]
end

- (Object) run_fg

Run Bud in the “foreground”: this method won't return until Bud has been shut down or an error occurs. The caller's thread simply blocks. run_fg calls run_bg and then waits on a post-shutdown callback, which is why it cannot be invoked from the EventMachine reactor thread. It is often more useful to run Bud in a different thread: see run_bg.



# File 'lib/bud.rb', line 708

def run_fg
  # If we're called from the EventMachine thread (and EM is running), blocking
  # the current thread would imply deadlocking ourselves.
  if Thread.current == EventMachine::reactor_thread and EventMachine::reactor_running?
    raise Bud::Error, "cannot invoke run_fg from inside EventMachine"
  end

  q = Queue.new
  # Note that this must be a post-shutdown callback: if this is the only
  # thread, then the program might exit after run_fg() returns. If run_fg()
  # blocked on a normal shutdown callback, the program might exit before the
  # other shutdown callbacks have a chance to run.
  post_shutdown do
    q.push(true)
  end

  run_bg
  # Block caller's thread until Bud has shutdown
  q.pop
  report_metrics if options[:metrics]
end
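The blocking pattern used by run_fg can be reproduced in plain Ruby: register a post-shutdown hook that pushes onto a Queue, start in the background, then pop. MiniInstance is a hypothetical stand-in whose stop simply runs the post-shutdown hooks. In this example, a second thread calls stop after a short delay.

```ruby
# Hypothetical stand-in for a Bud instance (not Bud's code).
class MiniInstance
  def initialize
    @post_shutdown = []
    @lock = Mutex.new
  end

  # Register a hook to run after shutdown has completed.
  def post_shutdown(&blk)
    @lock.synchronize { @post_shutdown << blk }
  end

  # Stand-in for run_bg: a real runtime would start its event loop here.
  def run_bg; end

  def run_fg
    q = Queue.new
    post_shutdown { q.push(true) }
    run_bg
    q.pop # the caller's thread blocks until stop has run the hooks
  end

  # Stand-in for stop: run the post-shutdown hooks last.
  def stop
    hooks = @lock.synchronize { @post_shutdown.dup }
    hooks.each(&:call)
  end
end

inst = MiniInstance.new
stopper = Thread.new { sleep 0.05; inst.stop }
t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
inst.run_fg
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0
stopper.join
```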

- (Object) scratch(name, schema = nil)

Declare a transient collection. Default schema [:key] => [:val].



# File 'lib/bud/state.rb', line 97

def scratch(name, schema=nil)
  define_collection(name)
  @tables[name] = Bud::BudScratch.new(name, self, schema)
end

- (Object) singleton_class

Metaprogramming support for Ruby, also used by the rule-rewriting helpers to define instance methods. Returns this object's singleton class.



# File 'lib/bud.rb', line 625

def singleton_class # :nodoc: all
  class << self; self; end
end

- (Object) start(do_tick = false)

Start up a Bud instance if it is not already started. This starts EventMachine (if needed) and binds to a UDP server socket. If do_tick is true, a single Bloom timestep is also executed. Regardless, calling this method does NOT cause Bud to begin running asynchronously (see run_bg).



# File 'lib/bud.rb', line 657

def start(do_tick=false)
  start_reactor
  schedule_and_wait do
    do_startup unless @bud_started
    tick_internal if do_tick
  end
end

- (Object) stop(stop_em = false, do_shutdown_cb = true) Also known as: stop_bg

Shut down a Bud instance and release any resources that it was using. This method blocks until Bud has been shut down. If stop_em is true, the EventMachine event loop is also shut down; this will interfere with the execution of any other Bud instances in the same process (as well as anything else that happens to use EventMachine). The EM loop is always shut down when no running Bud instances remain (this does interfere with other EM-using apps, but it is necessary).



# File 'lib/bud.rb', line 737

def stop(stop_em=false, do_shutdown_cb=true)
  schedule_and_wait do
    do_shutdown(do_shutdown_cb)
  end

  # If we're shutting down the last active Bud instance, shutdown the EM event
  # loop as well. This is probably good practice in general, but it also
  # prevents weird EM behavior -- it seems as though EM::ConnectionNotBound
  # exceptions can be raised if the EM event loop is left running and
  # subsequent events arrive.
  $signal_lock.synchronize {
    stop_em = true if $bud_instances.empty? and EventMachine::reactor_running?
  }

  if stop_em
    Bud.stop_em_loop
    unless Thread.current == EventMachine::reactor_thread
      EventMachine::reactor_thread.join
    end
  end

  report_metrics if options[:metrics]
end

- (Object) store(name, storage, schema = nil)



# File 'lib/bud/state.rb', line 81

def store(name, storage, schema=nil)
  define_collection(name)
  case storage
  when :zookeeper
    # treat "schema" as a hash of options
    options = schema
    raise Bud::Error, "Zookeeper tables require a :path option" if options[:path].nil?
    options[:addr] ||= "localhost:2181"
    @tables[name] = Bud::BudZkTable.new(name, options[:path], options[:addr], self)
    @zk_tables[name] = @tables[name]
  else
    raise Bud::Error, "unknown async storage engine #{storage.to_s}"
  end
end

- (Object) sum(x)

Aggregate method for use with Bud::BudCollection.group: computes the sum of the x values in each group.

# File 'lib/bud/aggs.rb', line 156

def sum(x)
  [Sum.new, x]
end
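sum(x) itself only returns the pair [Sum.new, x]; the summing happens when group evaluates that aggregate for each group. As a rough plain-Ruby illustration of the result (this does not use Bud's Agg classes; group_sum is a hypothetical helper), grouping rows by one column and summing another looks like this:

```ruby
# Hypothetical helper: one output row per distinct key, holding the sum of
# the value column over all rows with that key (what group + sum produces).
def group_sum(rows, key_idx, val_idx)
  totals = Hash.new(0)
  rows.each { |row| totals[row[key_idx]] += row[val_idx] }
  totals.to_a
end

sales = [["east", 10], ["west", 5], ["east", 7]]
p group_sum(sales, 0, 1)  # prints [["east", 17], ["west", 5]]
```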

- (Object) sync(name, storage, schema = nil)

Declare a persistent collection that is flushed synchronously. The only supported storage engine is :dbm. Default schema: [:key] => [:val].

# File 'lib/bud/state.rb', line 70

def sync(name, storage, schema=nil)
  define_collection(name)
  case storage
  when :dbm
    @tables[name] = Bud::BudDbmTable.new(name, self, schema)
    @dbm_tables[name] = @tables[name]
  else
    raise Bud::Error, "unknown synchronous storage engine #{storage.to_s}"
  end
end

- (Object) sync_callback(in_tbl, tupleset, out_tbl)

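sync_callback supports synchronous interaction with Bud modules. The caller supplies the name of an input collection, a set of tuples to insert, and an output collection on which to "listen". If in_tbl is nil, no tuples are inserted. The call blocks until tuples are inserted into the output collection, and those tuples are returned to the caller. If the Bud instance shuts down before any output arrives, Bud::ShutdownWithCallbacksError is raised.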

# File 'lib/bud.rb', line 872

def sync_callback(in_tbl, tupleset, out_tbl)
  q = Queue.new

  # If the runtime shuts down before we see anything in the output collection,
  # make sure we hear about it so we can raise an error
  # XXX: Using two separate callbacks here is ugly.
  shutdown_cb = on_shutdown do
    q.push :callback
  end

  cb = nil
  sync_do {
    if in_tbl
      t = @tables[in_tbl]
      if t.class <= Bud::BudChannel or t.class <= Bud::BudZkTable
        t <~ tupleset
      else
        t <+ tupleset
      end
    end

    cb = do_register_callback(out_tbl) do |c|
      q.push c.to_a
    end
  }
  result = q.pop
  if result == :callback
    # Don't try to unregister the callbacks first: runtime is already shutdown
    raise Bud::ShutdownWithCallbacksError, "Bud instance shutdown before sync_callback completed"
  end
  unregister_callback(cb)
  cancel_shutdown_cb(shutdown_cb)
  return result
end
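The blocking behavior of sync_callback rests on a Queue that is fed either by the output callback (a result) or by a shutdown hook that pushes a :callback sentinel. Below is a stripped-down model of that pattern in plain Ruby; wait_for_output and ShutdownBeforeResult are hypothetical names, and ordinary threads stand in for the Bud runtime.

```ruby
require 'thread'

# Hypothetical error raised when the "runtime" stops before producing output.
class ShutdownBeforeResult < StandardError; end

# Block until a producer pushes either a result or the shutdown sentinel.
def wait_for_output(q)
  result = q.pop
  raise ShutdownBeforeResult, "shut down before output arrived" if result == :callback
  result
end

# Case 1: output arrives from a background thread.
q = Queue.new
producer = Thread.new { q.push([[1, "ok"]]) }
p wait_for_output(q)   # prints [[1, "ok"]]
producer.join

# Case 2: the shutdown hook fires first and pushes the sentinel.
q2 = Queue.new
Thread.new { q2.push(:callback) }.join
begin
  wait_for_output(q2)
rescue ShutdownBeforeResult => e
  puts "error: #{e.message}"
end
```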

- (Object) sync_do

Given a block, evaluate that block inside the background Ruby thread at some time in the future, and then perform a Bloom tick. Because the block is evaluated inside the background Ruby thread, it can safely examine Bud state. Calling sync_do blocks the caller until the block has been evaluated and the subsequent tick has completed; for a non-blocking version, see async_do.

Note that the block is invoked after one Bud timestep has ended but before the next timestep begins. Hence, synchronous accumulation (<=) into a Bud scratch collection inside a sync_do block is usually not useful: when the next tick begins, the contents of all scratch collections are cleared, including anything inserted by the sync_do block using <=. To avoid this, insert into scratches using <+.

# File 'lib/bud.rb', line 807

def sync_do
  schedule_and_wait do
    yield if block_given?
    # Do another tick, in case the user-supplied block inserted any data
    tick_internal
  end
end
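The scratch-collection caveat above can be illustrated with a toy model, assuming (as the note describes) that a scratch discards its contents when a tick begins and promotes any tuples deferred with <+. ToyScratch, merge, and pending_merge are hypothetical stand-ins, not Bud's BudScratch API.

```ruby
# Toy model of scratch semantics across a tick boundary.
class ToyScratch
  attr_reader :storage

  def initialize
    @storage = []   # contents visible during the current tick
    @pending = []   # tuples deferred to the next tick
  end

  def merge(tuples)          # models `<=`
    @storage.concat(tuples)
  end

  def pending_merge(tuples)  # models `<+`
    @pending.concat(tuples)
  end

  def tick                   # a new timestep begins: drop old, promote pending
    @storage = @pending
    @pending = []
  end
end

s = ToyScratch.new
# Inside a sync_do block, i.e. between ticks:
s.merge([[:a]])
s.pending_merge([[:b]])
# sync_do then runs a tick, which starts by ticking each collection.
s.tick
p s.storage   # prints [[:b]]
```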

- (Object) table(name, schema = nil)

Declare an in-memory, non-transient collection. Default schema: [:key] => [:val].

# File 'lib/bud/state.rb', line 58

def table(name, schema=nil)
  define_collection(name)
  @tables[name] = Bud::BudTable.new(name, self, schema)
end

- (Object) temp(name)

Declare a scratch collection on the left-hand side of a Bloom statement; its schema is inferred from the right-hand side.

# File 'lib/bud/state.rb', line 108

def temp(name)
  define_collection(name)
  # defer schema definition until merge
  @tables[name] = Bud::BudTemp.new(name, self, nil, true)
end

- (Object) terminal(name)


# File 'lib/bud/state.rb', line 145

def terminal(name) # :nodoc: all
  if defined?(@terminal) && @terminal != name
    raise Bud::Error, "can't register IO collection #{name} in addition to #{@terminal}"
  else
    @terminal = name
  end
  define_collection(name)
  @tables[name] = Bud::BudTerminal.new(name, self)
  @channels[name] = @tables[name]
end

- (Object) tick

From client code, manually trigger a timestep of Bloom execution.

# File 'lib/bud.rb', line 1047

def tick
  start(true)
end
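tick does not run a timestep on the caller's thread: start(true) wraps tick_internal in schedule_and_wait, so the work is handed to the EventMachine thread and the caller waits for it, which is also why tick_internal must only run there. A minimal thread-and-queue model of that hand-off follows; ToyReactor and its methods are hypothetical, and a plain Ruby thread stands in for the EventMachine reactor.

```ruby
require 'thread'

# Hypothetical model: one dedicated "reactor" thread runs all timesteps.
class ToyReactor
  attr_reader :budtime

  def initialize
    @jobs = Queue.new
    @budtime = 0
    @thread = Thread.new do
      while (job = @jobs.pop) != :stop
        block, done = job
        begin
          done.push([:ok, block.call])
        rescue => e
          done.push([:error, e])
        end
      end
    end
  end

  # Run the block on the reactor thread; wait for it and re-raise errors.
  def schedule_and_wait(&block)
    done = Queue.new
    @jobs.push([block, done])
    status, value = done.pop
    raise value if status == :error
    value
  end

  def tick
    schedule_and_wait { tick_internal }
  end

  def shutdown
    @jobs.push(:stop)
    @thread.join
  end

  private

  # Like Bud#tick_internal, this must only run on the reactor thread.
  def tick_internal
    raise "not on reactor thread" unless Thread.current == @thread
    @budtime += 1
  end
end

r = ToyReactor.new
3.times { r.tick }
p r.budtime   # prints 3
r.shutdown
```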

- (Object) tick_internal

One timestep of Bloom execution. This MUST be invoked from the EventMachine thread; it is not intended to be called directly by client code.

# File 'lib/bud.rb', line 1107

def tick_internal
  puts "#{object_id}/#{port} : ============================================= (#{@budtime})" if $BUD_DEBUG
  begin
    starttime = Time.now if options[:metrics]
    if options[:metrics] and not @endtime.nil?
      @metrics[:betweentickstats] ||= initialize_stats
      @metrics[:betweentickstats] = running_stats(@metrics[:betweentickstats],
                                                  starttime - @endtime)
    end
    @inside_tick = true

    unless @done_bootstrap
      do_bootstrap
      do_wiring
    else
      # inform tables and elements about beginning of tick.
      @app_tables.each {|t| t.tick}
      do_invalidate_rescan
    end

    receive_inbound
    # compute fixpoint for each stratum in order
    @stratified_rules.each_with_index do |rules,stratum|
      fixpoint = false
      first_iter = true
      until fixpoint
        @scanners[stratum].each_value {|s| s.scan(first_iter)}
        fixpoint = true
        first_iter = false
        # flush any tuples in the pipes
        @push_sorted_elems[stratum].each {|p| p.flush}
        # tick deltas on any merge targets and look for more deltas
        # check to see if any joins saw a delta
        @push_joins[stratum].each do |p|
          if p.found_delta
            fixpoint = false
            p.tick_deltas
          end
        end
        @merge_targets[stratum].each do |t|
          fixpoint = false if t.tick_deltas
        end
      end
      # push end-of-fixpoint
      @push_sorted_elems[stratum].each do |p|
        p.stratum_end
      end
      @merge_targets[stratum].each do |t|
        t.flush_deltas
      end
    end
    @viz.do_cards if @options[:trace]
    do_flush

    invoke_callbacks
    @budtime += 1
    @inbound.clear
    @reset_list.each {|e| e.invalidated = false; e.rescan = false}

  ensure
    @inside_tick = false
    @tick_clock_time = nil
  end

  if options[:metrics]
    @endtime = Time.now
    @metrics[:tickstats] ||= initialize_stats
    @metrics[:tickstats] = running_stats(@metrics[:tickstats], @endtime - starttime)
  end
end
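The core of tick_internal is a loop per stratum that re-runs the rules until no new tuples (deltas) appear, and only then moves on to the next stratum. The sketch below shows that general idea on one recursive rule (transitive closure) with delta-driven, semi-naive iteration, followed by a later-stratum negation that reads the finished result. It illustrates the technique in plain Ruby, not Bud's push-element implementation; transitive_closure is a hypothetical helper.

```ruby
require 'set'

# Stratum 0: path(a, d) <- path(a, b), edge(b, d), iterated to a fixpoint.
# Each round joins only the previous round's new tuples (the delta).
def transitive_closure(edges)
  paths = Set.new(edges)
  delta = Set.new(edges)
  until delta.empty?                       # fixpoint: no new tuples derived
    new_delta = Set.new
    delta.each do |(a, b)|
      edges.each do |(c, d)|
        new_delta << [a, d] if b == c && !paths.include?([a, d])
      end
    end
    paths.merge(new_delta)
    delta = new_delta
  end
  paths
end

edges = [[1, 2], [2, 3], [3, 4]]
paths = transitive_closure(edges)
p paths.size          # prints 6

# Stratum 1: negation over stratum 0's result is only safe once that
# stratum has reached its fixpoint.
nodes = edges.flatten.uniq
unreachable = nodes.product(nodes).reject { |pair| paths.include?(pair) }
p unreachable.size    # prints 10
```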

- (Object) toplevel

# File 'lib/bud.rb', line 186

def toplevel
  @toplevel = (@options[:toplevel] || self)
end

- (Boolean) toplevel?

# File 'lib/bud.rb', line 194

def toplevel?
  toplevel.object_id == self.object_id
end

- (Object) unregister_callback(id)

Unregister the callback that has the given ID.

# File 'lib/bud.rb', line 861

def unregister_callback(id)
  schedule_and_wait do
    raise Bud::Error, "missing callback: #{id.inspect}" unless @callbacks.has_key? id
    @callbacks.delete(id)
  end
end
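sync_callback relies on do_register_callback returning an ID that is later passed to unregister_callback, which raises if the ID is unknown. A hypothetical, single-threaded registry with the same contract is sketched below; CallbackRegistry and its methods are illustrative names, and unlike Bud it does not route calls through schedule_and_wait.

```ruby
# Hypothetical ID-keyed callback registry (not Bud's internals).
class CallbackRegistry
  class MissingCallback < StandardError; end

  def initialize
    @callbacks = {}
    @next_id = 0
  end

  # Register a block for a table; return an opaque ID for later removal.
  def register(table_name, &blk)
    id = @next_id
    @next_id += 1
    @callbacks[id] = [table_name, blk]
    id
  end

  # Mirrors unregister_callback: unknown IDs are an error.
  def unregister(id)
    raise MissingCallback, "missing callback: #{id.inspect}" unless @callbacks.key?(id)
    @callbacks.delete(id)
  end

  # Invoke every callback registered on the given table with its tuples.
  def fire(table_name, tuples)
    @callbacks.each_value do |name, blk|
      blk.call(tuples) if name == table_name
    end
  end
end

reg = CallbackRegistry.new
seen = []
id = reg.register(:out) { |tuples| seen.concat(tuples) }
reg.fire(:out, [[1]])
reg.unregister(id)
reg.fire(:out, [[2]])
p seen   # prints [[1]]
```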

- (Boolean) wiring?

Are we currently in the process of wiring together the dataflow?

# File 'lib/bud.rb', line 208

def wiring?
  toplevel? ? (@done_bootstrap && !@done_wiring) : toplevel.wiring?
end
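toplevel, toplevel?, and wiring? together mean that a nested instance (one created with options[:toplevel]) always reports the root instance's wiring state. A small hypothetical model of that delegation follows; Node and its attribute writers are invented for illustration.

```ruby
# Hypothetical model of toplevel delegation for wiring?.
class Node
  attr_writer :done_bootstrap, :done_wiring

  def initialize(options = {})
    @options = options
    @done_bootstrap = false
    @done_wiring = false
  end

  def toplevel
    @options[:toplevel] || self
  end

  def toplevel?
    toplevel.equal?(self)   # same identity check as comparing object_ids
  end

  # Only the root tracks wiring state; nested nodes ask the root.
  def wiring?
    toplevel? ? (@done_bootstrap && !@done_wiring) : toplevel.wiring?
  end
end

root = Node.new
child = Node.new(toplevel: root)
root.done_bootstrap = true   # bootstrap finished, wiring not yet done
p child.wiring?   # prints true
root.done_wiring = true
p child.wiring?   # prints false
```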