Module: Bud
- Included in: DepAnalysis, ReblBase, VizHelper
- Defined in: lib/bud.rb, lib/bud/aggs.rb, lib/bud/state.rb, lib/bud/errors.rb, lib/bud/storage/dbm.rb, lib/bud/collections.rb, lib/bud/executor/join.rb, lib/bud/executor/group.rb, lib/bud/executor/elements.rb, lib/bud/storage/zookeeper.rb
Overview
The root Bud module. There are three main ways to make an instance of Bud begin executing:
1. Synchronously. Instantiate your program and then call tick() one or more times; each call evaluates a single Bud timestep. In this mode, any network messages or timer events that occur are buffered until the next call to tick(). This is mostly intended for “one-shot” programs that compute a single result and then terminate, or for interactively “single-stepping” through the execution of an event-driven system.
2. In the foreground. Instantiate your program and then call run_fg(). The Bud interpreter then runs, handling network events and evaluating new timesteps as appropriate. The run_fg() method blocks the caller's thread and does not return until Bud is halted or an error occurs.
3. In a separate thread in the background. Instantiate your program and then call run_bg(). The Bud interpreter runs asynchronously. To interact with Bud (e.g., to insert additional data or inspect the state of a Bud collection), use the sync_do and async_do methods.
Most programs should use option 3. Note that in all three cases, the stop() method should be used to shut down a Bud instance and release any resources it is using.
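The background mode relies on a single interpreter thread that processes submitted work in order. The sketch below is a hypothetical, self-contained model of that idea (MiniRuntime is not part of Bud, and Bud itself uses EventMachine rather than a plain Queue): async_do enqueues a block and returns, sync_do enqueues a block and waits for its result, and each job is followed by a tick.

```ruby
require "thread"

# Hypothetical sketch, not Bud's API: one worker thread drains a FIFO queue
# of jobs, roughly how run_bg, async_do, sync_do and stop cooperate.
class MiniRuntime
  attr_reader :ticks

  def initialize
    @jobs = Queue.new
    @ticks = 0
    @worker = nil
  end

  # Analogue of tick(): evaluate one timestep on the calling thread.
  def tick
    @ticks += 1
  end

  # Analogue of run_bg: start the worker thread and return immediately.
  def run_bg
    @worker = Thread.new do
      loop do
        job = @jobs.pop
        break if job == :stop
        job.call
      end
    end
  end

  # Analogue of async_do: enqueue the block followed by a tick; do not wait.
  def async_do(&blk)
    @jobs << lambda { blk.call; tick }
    nil
  end

  # Analogue of sync_do: enqueue the block followed by a tick, then wait
  # for the block's result.
  def sync_do(&blk)
    done = Queue.new
    @jobs << lambda { result = blk.call; tick; done << result }
    done.pop
  end

  # Analogue of stop: jobs queued earlier still run, then the worker exits.
  def stop
    @jobs << :stop
    @worker.join if @worker
  end
end
```

Because the queue is FIFO, a block passed to async_do runs before any block submitted after it, which matches the ordering guarantee documented for async_do.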
Defined Under Namespace
Modules: PushSHOuterJoin
Classes: Accum, AccumPair, Agg, ArgExemplary, Avg, BagLattice, BoolLattice, BooleanAnd, BooleanOr, BudChannel, BudCollExpr, BudCollection, BudDbmTable, BudFileReader, BudInputInterface, BudOutputInterface, BudPeriodic, BudPersistentCollection, BudReadOnly, BudScratch, BudServer, BudTable, BudTemp, BudTerminal, BudZkTable, Choose, ChooseOneRand, CompileError, Count, Error, IllegalSourceError, KeyConstraintError, Lattice, LatticePushElement, LatticeScanner, LatticeWrapper, MapLattice, Max, MaxLattice, Min, MinLattice, PositiveSetLattice, PushApplyMethod, PushArgAgg, PushEachWithIndex, PushElement, PushGroup, PushNotIn, PushPredicate, PushReduce, PushSHJoin, PushSort, PushStatefulElement, ScannerElement, SetLattice, ShutdownWithCallbacksError, Sum, TupleStruct, TypeError
Instance Attribute Summary
- (Object) app_tables (readonly): Returns the value of attribute app_tables.
- (Object) budtime (readonly): Returns the value of attribute budtime.
- (Object) builtin_tables (readonly): Returns the value of attribute builtin_tables.
- (Object) channels (readonly): Returns the value of attribute channels.
- (Object) dbm_tables (readonly): Returns the value of attribute dbm_tables.
- (Object) done_bootstrap (readonly): Returns the value of attribute done_bootstrap.
- (Object) dsock (readonly): Returns the value of attribute dsock.
- (Object) inbound (readonly): Returns the value of attribute inbound.
- (Object) inside_tick (readonly): Returns the value of attribute inside_tick.
- (Object) lattices (readonly): Returns the value of attribute lattices.
- (Object) merge_targets (readonly): Returns the value of attribute merge_targets.
- (Object) meta_parser (readonly): Returns the value of attribute meta_parser.
- (Object) metrics: Returns the value of attribute metrics.
- (Object) options (readonly): Returns the value of attribute options.
- (Object) periodics: Returns the value of attribute periodics.
- (Object) push_elems (readonly): Returns the value of attribute push_elems.
- (Object) push_joins (readonly): Returns the value of attribute push_joins.
- (Object) push_sources (readonly): Returns the value of attribute push_sources.
- (Object) qualified_name: Returns the value of attribute qualified_name.
- (Object) rtracer (readonly): Returns the value of attribute rtracer.
- (Object) rule_orig_src (readonly): Returns the value of attribute rule_orig_src.
- (Object) running_async (readonly): Returns the value of attribute running_async.
- (Object) scanners (readonly): Returns the value of attribute scanners.
- (Object) stratified_rules: Returns the value of attribute stratified_rules.
- (Object) tables (readonly): Returns the value of attribute tables.
- (Object) this_rule (readonly): Returns the value of attribute this_rule.
- (Object) this_rule_context: Returns the value of attribute this_rule_context.
- (Object) this_stratum (readonly): Returns the value of attribute this_stratum.
- (Object) viz (readonly): Returns the value of attribute viz.
- (Object) zk_tables (readonly): Returns the value of attribute zk_tables.
Instance Method Summary
- (Object) accum(x): Aggregate method to be used in Bud::BudCollection.group.
- (Object) accum_pair(x, y): Aggregate method to be used in Bud::BudCollection.group.
- (Object) async_do: Like sync_do, but does not block the caller's thread: the given callback will be invoked at some future time.
- (Object) avg(x): Aggregate method to be used in Bud::BudCollection.group.
- (Object) bool_and(x)
- (Object) bool_or(x)
- (Object) bootstrap: Empty default implementation; programs may override it.
- (Object) bud_clock: Returns the wallclock time associated with the current Bud tick.
- (Object) cancel_shutdown_cb(id)
- (Object) channel(name, schema = nil, loopback = false): Declare a transient network collection.
- (Object) choose(x): Exemplary aggregate method to be used in Bud::BudCollection.group.
- (Object) choose_rand(x = nil): Exemplary aggregate method to be used in Bud::BudCollection.group.
- (Object) coll_expr(name, expr, schema = nil): Declare a collection-generating expression.
- (Object) collection_stratum(collection): Return the stratum number of the given collection.
- (Object) count(x = nil): Aggregate method to be used in Bud::BudCollection.group.
- (Object) delta(out_tbl): A common special case for sync_callback: block on a delta to a table.
- (Object) do_bootstrap: Evaluate all bootstrap blocks and tick deltas.
- (Object) do_register_callback(tbl_name, &blk)
- (Object) file_reader(name, filename): Declare a collection to be read from filename.
- (Object) import_instance(name)
- (Object) initialize(options = {}): Options to the Bud runtime are passed in a hash, covering network configuration, operating system interaction, tracing and output, execution control, and storage configuration.
- (Object) input: Internal (not documented).
- (Object) inspect: Returns a string identifying this instance.
- (Object) int_ip_port: Returns the internal IP and port.
- (Object) interface(mode, name, schema = nil): Declare a transient collection to be an input or output interface.
- (Object) interfaces(direction, collections): An alternative approach to declaring interfaces.
- (Object) ip
- (Object) ip_port: Returns the IP and port of the Bud instance as a string.
- (Object) load_lattice_defs: Define methods to implement the state declarations for every registered kind of lattice.
- (Object) loopback(name, schema = nil): Declare a transient network collection that delivers facts back to the current Bud instance.
- (Object) max(x): Exemplary aggregate method to be used in Bud::BudCollection.group.
- (Object) min(x): Exemplary aggregate method to be used in Bud::BudCollection.group.
- (Object) module_wrapper_class(mod)
- (Object) on_shutdown(&blk): Register a callback that will be invoked when this instance of Bud is shutting down.
- (Object) output: Internal (not documented).
- (Object) pause
- (Object) periodic(name, period = 1): Declare a collection to be auto-populated every period seconds.
- (Object) port
- (Object) post_shutdown(&blk): Register a callback that will be invoked after this instance of Bud has been shut down.
- (Object) readonly(name, schema = nil)
- (Object) register_callback(tbl_name, &blk): Register a new callback.
- (Object) run_bg: Run Bud in the background (in a different thread).
- (Object) run_fg: Run Bud in the “foreground”: the caller's thread blocks until Bud is halted.
- (Object) scratch(name, schema = nil): Declare a transient collection.
- (Object) singleton_class: Metaprogramming support for Ruby and for the rule-rewriting helper, used to define instance methods.
- (Object) start(do_tick = false): Start up a Bud instance if it is not currently started.
- (Object) stop(stop_em = false, do_shutdown_cb = true) (also: #stop_bg): Shut down a Bud instance and release any resources that it was using.
- (Object) store(name, storage, schema = nil)
- (Object) sum(x): Aggregate method to be used in Bud::BudCollection.group.
- (Object) sync(name, storage, schema = nil): Declare a synchronously-flushed persistent collection.
- (Object) sync_callback(in_tbl, tupleset, out_tbl): sync_callback supports synchronous interaction with Bud modules.
- (Object) sync_do: Given a block, evaluate that block inside the background Ruby thread at some time in the future, and then perform a Bloom tick.
- (Object) table(name, schema = nil): Declare an in-memory, non-transient collection.
- (Object) temp(name): Declare a scratch in the lhs of a Bloom statement.
- (Object) terminal(name): Internal (not documented).
- (Object) tick: From client code, manually trigger a timestep of Bloom execution.
- (Object) tick_internal: One timestep of Bloom execution.
- (Object) toplevel
- (Boolean) toplevel?
- (Object) unregister_callback(id): Unregister the callback that has the given ID.
- (Boolean) wiring?: Are we currently in the process of wiring together the dataflow?
Instance Attribute Details
- (Object) app_tables (readonly)
Returns the value of attribute app_tables
```ruby
# File 'lib/bud.rb', line 71
def app_tables
  @app_tables
end
```
- (Object) budtime (readonly)
Returns the value of attribute budtime
```ruby
# File 'lib/bud.rb', line 70
def budtime
  @budtime
end
```
- (Object) builtin_tables (readonly)
Returns the value of attribute builtin_tables
```ruby
# File 'lib/bud.rb', line 71
def builtin_tables
  @builtin_tables
end
```
- (Object) channels (readonly)
Returns the value of attribute channels
```ruby
# File 'lib/bud.rb', line 71
def channels
  @channels
end
```
- (Object) dbm_tables (readonly)
Returns the value of attribute dbm_tables
```ruby
# File 'lib/bud.rb', line 71
def dbm_tables
  @dbm_tables
end
```
- (Object) done_bootstrap (readonly)
Returns the value of attribute done_bootstrap
```ruby
# File 'lib/bud.rb', line 73
def done_bootstrap
  @done_bootstrap
end
```
- (Object) dsock (readonly)
Returns the value of attribute dsock
```ruby
# File 'lib/bud.rb', line 70
def dsock
  @dsock
end
```
- (Object) inbound (readonly)
Returns the value of attribute inbound
```ruby
# File 'lib/bud.rb', line 70
def inbound
  @inbound
end
```
- (Object) inside_tick (readonly)
Returns the value of attribute inside_tick
```ruby
# File 'lib/bud.rb', line 74
def inside_tick
  @inside_tick
end
```
- (Object) lattices (readonly)
Returns the value of attribute lattices
```ruby
# File 'lib/bud.rb', line 71
def lattices
  @lattices
end
```
- (Object) merge_targets (readonly)
Returns the value of attribute merge_targets
```ruby
# File 'lib/bud.rb', line 72
def merge_targets
  @merge_targets
end
```
- (Object) meta_parser (readonly)
Returns the value of attribute meta_parser
```ruby
# File 'lib/bud.rb', line 70
def meta_parser
  @meta_parser
end
```
- (Object) metrics
Returns the value of attribute metrics
```ruby
# File 'lib/bud.rb', line 76
def metrics
  @metrics
end
```
- (Object) options (readonly)
Returns the value of attribute options
```ruby
# File 'lib/bud.rb', line 70
def options
  @options
end
```
- (Object) periodics
Returns the value of attribute periodics
```ruby
# File 'lib/bud.rb', line 76
def periodics
  @periodics
end
```
- (Object) push_elems (readonly)
Returns the value of attribute push_elems
```ruby
# File 'lib/bud.rb', line 72
def push_elems
  @push_elems
end
```
- (Object) push_joins (readonly)
Returns the value of attribute push_joins
```ruby
# File 'lib/bud.rb', line 72
def push_joins
  @push_joins
end
```
- (Object) push_sources (readonly)
Returns the value of attribute push_sources
```ruby
# File 'lib/bud.rb', line 72
def push_sources
  @push_sources
end
```
- (Object) qualified_name
Returns the value of attribute qualified_name
```ruby
# File 'lib/bud.rb', line 77
def qualified_name
  @qualified_name
end
```
- (Object) rtracer (readonly)
Returns the value of attribute rtracer
```ruby
# File 'lib/bud.rb', line 70
def rtracer
  @rtracer
end
```
- (Object) rule_orig_src (readonly)
Returns the value of attribute rule_orig_src
```ruby
# File 'lib/bud.rb', line 73
def rule_orig_src
  @rule_orig_src
end
```
- (Object) running_async (readonly)
Returns the value of attribute running_async
```ruby
# File 'lib/bud.rb', line 78
def running_async
  @running_async
end
```
- (Object) scanners (readonly)
Returns the value of attribute scanners
```ruby
# File 'lib/bud.rb', line 72
def scanners
  @scanners
end
```
- (Object) stratified_rules
Returns the value of attribute stratified_rules
```ruby
# File 'lib/bud.rb', line 75
def stratified_rules
  @stratified_rules
end
```
- (Object) tables (readonly)
Returns the value of attribute tables
```ruby
# File 'lib/bud.rb', line 71
def tables
  @tables
end
```
- (Object) this_rule (readonly)
Returns the value of attribute this_rule
```ruby
# File 'lib/bud.rb', line 73
def this_rule
  @this_rule
end
```
- (Object) this_rule_context
Returns the value of attribute this_rule_context
```ruby
# File 'lib/bud.rb', line 77
def this_rule_context
  @this_rule_context
end
```
- (Object) this_stratum (readonly)
Returns the value of attribute this_stratum
```ruby
# File 'lib/bud.rb', line 73
def this_stratum
  @this_stratum
end
```
- (Object) viz (readonly)
Returns the value of attribute viz
```ruby
# File 'lib/bud.rb', line 70
def viz
  @viz
end
```
- (Object) zk_tables (readonly)
Returns the value of attribute zk_tables
```ruby
# File 'lib/bud.rb', line 71
def zk_tables
  @zk_tables
end
```
Instance Method Details
- (Object) accum(x)
Aggregate method to be used in Bud::BudCollection.group. Accumulates all x inputs into a set.
```ruby
# File 'lib/bud/aggs.rb', line 207
def accum(x)
  [Accum.new, x]
end
```
- (Object) accum_pair(x, y)
Aggregate method to be used in Bud::BudCollection.group. Accumulates x, y inputs into a set of pairs (two-element arrays).
```ruby
# File 'lib/bud/aggs.rb', line 223
def accum_pair(x, y)
  [AccumPair.new, x, y]
end
```
- (Object) async_do
Like sync_do, but does not block the caller's thread: the given callback will be invoked at some future time. Note that calls to async_do respect FIFO order.
```ruby
# File 'lib/bud.rb', line 810
def async_do
  EventMachine::schedule do
    yield if block_given?
    # Do another tick, in case the user-supplied block inserted any data
    tick_internal
  end
end
```
- (Object) avg(x)
Aggregate method to be used in Bud::BudCollection.group. Computes the average of a multiset of x values.
```ruby
# File 'lib/bud/aggs.rb', line 191
def avg(x)
  [Avg.new, x]
end
```
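Each aggregate method above simply returns an array pairing an aggregator object with its input (for example, [Avg.new, x]); group is then responsible for folding that aggregator over every group. The standalone sketch below mimics that shape in plain Ruby. The init/trans/final protocol, the SumAgg/AvgAgg classes, and the avg_of, sum_of and group_with_aggs helpers are all hypothetical; Bud's own Agg classes may use a different interface.

```ruby
# Hypothetical aggregator objects following an assumed init/trans/final
# protocol; Bud's own Agg classes may use a different interface.
class SumAgg
  def init(val); val; end
  def trans(state, val); state + val; end
  def final(state); state; end
end

class AvgAgg
  def init(val); [val, 1]; end                              # state: [sum, count]
  def trans(state, val); [state[0] + val, state[1] + 1]; end
  def final(state); state[0].to_f / state[1]; end
end

# Same shape as the values returned by avg(x) and sum(x): [aggregator, column].
def avg_of(col); [AvgAgg.new, col]; end
def sum_of(col); [SumAgg.new, col]; end

# Groups rows (hashes) by key_col and folds each [aggregator, column] spec
# over the rows of every group, returning [key, result1, result2, ...] rows.
def group_with_aggs(rows, key_col, *specs)
  states = {}
  rows.each do |row|
    key = row[key_col]
    states[key] =
      if states.key?(key)
        specs.each_with_index.map { |(agg, col), i| agg.trans(states[key][i], row[col]) }
      else
        specs.map { |agg, col| agg.init(row[col]) }
      end
  end
  states.map do |key, sts|
    [key] + specs.each_with_index.map { |(agg, _col), i| agg.final(sts[i]) }
  end
end
```

Keeping the aggregator and its column together in one small array lets a single grouping routine handle any mix of aggregates in one pass.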
- (Object) bool_and(x)
```ruby
# File 'lib/bud/aggs.rb', line 81
def bool_and(x)
  [BooleanAnd.new, x]
end
```
- (Object) bool_or(x)
```ruby
# File 'lib/bud/aggs.rb', line 95
def bool_or(x)
  [BooleanOr.new, x]
end
```
- (Object) bootstrap
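Empty default implementation. do_bootstrap calls bootstrap on the top-level instance, so programs may override it to supply their own bootstrap logic.
```ruby
# File 'lib/bud.rb', line 626
def bootstrap # :nodoc: all
end
```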
- (Object) bud_clock
Returns the wallclock time associated with the current Bud tick. That is, this value is guaranteed to remain the same for the duration of a single tick, but will likely change between ticks.
```ruby
# File 'lib/bud.rb', line 1169
def bud_clock
  raise Bud::Error, "bud_clock undefined outside tick" unless @inside_tick
  @tick_clock_time ||= Time.now
  @tick_clock_time
end
```
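The guarantee that bud_clock stays fixed within a tick comes from memoizing Time.now on first use. The sketch below isolates that pattern in a hypothetical TickClock class; it assumes, as the snippet above does not show, that the memoized value is cleared when a tick ends, and it takes an injectable time source so the behaviour can be checked deterministically.

```ruby
# Hypothetical sketch of a per-tick clock: the time source is sampled lazily
# on first use within a tick and then reused until the tick ends.
class TickClock
  class OutsideTickError < StandardError; end

  def initialize(now_source = -> { Time.now })
    @now_source = now_source
    @inside_tick = false
    @tick_time = nil
  end

  # Runs one "tick"; the cached time is discarded afterwards (assumption).
  def tick
    @inside_tick = true
    yield self
  ensure
    @inside_tick = false
    @tick_time = nil
  end

  def clock
    raise OutsideTickError, "clock undefined outside tick" unless @inside_tick
    @tick_time ||= @now_source.call
  end
end
```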
- (Object) cancel_shutdown_cb(id)
```ruby
# File 'lib/bud.rb', line 770
def cancel_shutdown_cb(id)
  schedule_and_wait do
    raise Bud::Error unless @shutdown_callbacks.has_key? id
    @shutdown_callbacks.delete(id)
  end
end
```
- (Object) channel(name, schema = nil, loopback = false)
Declare a transient network collection. Default schema: [:address, :val] => [].
```ruby
# File 'lib/bud/state.rb', line 115
def channel(name, schema=nil, loopback=false)
  define_collection(name)
  @tables[name] = Bud::BudChannel.new(name, self, schema, loopback)
  @channels[name] = @tables[name]
end
```
- (Object) choose(x)
Exemplary aggregate method to be used in Bud::BudCollection.group. Arbitrarily but deterministically chooses among the x entries being aggregated.
```ruby
# File 'lib/bud/aggs.rb', line 114
def choose(x)
  [Choose.new, x]
end
```
- (Object) choose_rand(x = nil)
Exemplary aggregate method to be used in Bud::BudCollection.group. Randomly chooses among the x entries being aggregated.
```ruby
# File 'lib/bud/aggs.rb', line 144
def choose_rand(x=nil)
  [ChooseOneRand.new, x]
end
```
- (Object) coll_expr(name, expr, schema = nil)
Declare a collection-generating expression. Default schema: [:key] => [:val].
```ruby
# File 'lib/bud/state.rb', line 64
def coll_expr(name, expr, schema=nil)
  define_collection(name)
  @tables[name] = Bud::BudCollExpr.new(name, self, expr, schema)
end
```
- (Object) collection_stratum(collection)
Return the stratum number of the given collection. NB: if a collection does not appear on the lhs or rhs of any rule, it is not currently assigned to a stratum, and nil is returned.
```ruby
# File 'lib/bud.rb', line 1178
def collection_stratum(collection)
  t_stratum.each do |t|
    return t.stratum if t.predicate == collection
  end

  return nil
end
```
- (Object) count(x = nil)
Aggregate method to be used in Bud::BudCollection.group. Counts the number of entries aggregated; the argument is ignored.
```ruby
# File 'lib/bud/aggs.rb', line 171
def count(x=nil)
  [Count.new]
end
```
- (Object) delta(out_tbl)
A common special case for sync_callback: block on a delta to a table.
```ruby
# File 'lib/bud.rb', line 900
def delta(out_tbl)
  sync_callback(nil, nil, out_tbl)
end
```
- (Object) do_bootstrap
Evaluate all bootstrap blocks and tick deltas
```ruby
# File 'lib/bud.rb', line 1044
def do_bootstrap
  # Evaluate bootstrap for imported modules
  @this_rule_context = self
  imported = import_defs.keys
  imported.each do |mod_alias|
    wrapper = import_instance mod_alias
    wrapper.do_bootstrap
  end

  self.class.ancestors.reverse.each do |anc|
    anc.instance_methods(false).each do |m|
      if /^__bootstrap__/.match m
        self.method(m.to_sym).call
      end
    end
  end
  bootstrap

  if toplevel == self
    @tables.each_value {|t| t.bootstrap}
    @lattices.each_value {|l| l.bootstrap}
  end
  @done_bootstrap = true
end
```
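The middle loop of do_bootstrap discovers bootstrap blocks by name: it walks the class's ancestors from the most basic one down and calls every instance method whose name starts with __bootstrap__. The sketch below reproduces only that discovery pattern; the BaseFacts, ExtraFacts and Program names are hypothetical stand-ins for modules and a class that would define such methods.

```ruby
# Hypothetical modules, each contributing a __bootstrap__* method.
module BaseFacts
  def __bootstrap__base; facts << :base; end
end

module ExtraFacts
  def __bootstrap__extra; facts << :extra; end
end

class Program
  include BaseFacts
  include ExtraFacts

  def facts
    @facts ||= []
  end

  # Walk ancestors base-most first and invoke each __bootstrap__* method once.
  def do_bootstrap
    self.class.ancestors.reverse.each do |anc|
      anc.instance_methods(false).each do |m|
        send(m) if m.to_s.start_with?("__bootstrap__")
      end
    end
    facts
  end
end
```

Iterating over the reversed ancestor list means a module included earlier has its bootstrap method run before one included later.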
- (Object) do_register_callback(tbl_name, &blk)
```ruby
# File 'lib/bud.rb', line 840
def do_register_callback(tbl_name, &blk)
  unless @tables.has_key? tbl_name
    raise Bud::Error, "no such collection: #{tbl_name}"
  end

  raise Bud::Error if @callbacks.has_key? @callback_id
  @callbacks[@callback_id] = [tbl_name, blk]
  cb_id = @callback_id
  @callback_id += 1
  return cb_id
end
```
- (Object) file_reader(name, filename)
Declare a collection to be read from filename. It may appear only on the rhs of statements.
```ruby
# File 'lib/bud/state.rb', line 131
def file_reader(name, filename)
  define_collection(name)
  @tables[name] = Bud::BudFileReader.new(name, filename, self)
end
```
- (Object) import_instance(name)
```ruby
# File 'lib/bud.rb', line 202
def import_instance(name)
  name = "@" + name.to_s
  instance_variable_get(name) if instance_variable_defined? name
end
```
- (Object) initialize(options = {})
Options to the Bud runtime are passed in a hash, with the following keys:
- network configuration
  - :ip IP address string for this instance
  - :port port number for this instance
  - :ext_ip IP address at which external nodes can contact this instance
  - :ext_port port number to go with :ext_ip
- operating system interaction
  - :stdin if non-nil, reading from the stdio collection results in reading from this IO handle
  - :stdout writing to the stdio collection results in writing to this IO handle; defaults to $stdout
  - :signal_handling how to handle SIGINT and SIGTERM. If :none, these signals are ignored; otherwise, all Bud instances are shut down.
- tracing and output
  - :quiet if true, suppress certain messages
  - :trace if true, generate budvis outputs
  - :rtrace if true, generate budplot outputs
  - :dump_rewrite if true, dump the results of internal rewriting of Bloom code to a file
  - :print_wiring if true, print the wiring diagram of the program to stdout
  - :metrics if true, dump a hash of internal performance metrics
- controlling execution
  - :tag a name for this instance, suitable for display during tracing and visualization
  - :channel_filter a code block that can be used to filter the network messages delivered to this Bud instance. At the start of each tick, the code block is invoked for every channel with any incoming messages; it is passed the name of the channel and an array containing the inbound messages. It should return a two-element array containing the “accepted” and “postponed” messages, respectively. Accepted messages are delivered during this tick, and postponed messages are buffered and passed to the filter in subsequent ticks. Any messages that appear in neither array are dropped.
- storage configuration
  - :dbm_dir filesystem directory to hold DBM-backed collections
  - :dbm_truncate if true, DBM-backed collections are opened with OTRUNC
```ruby
# File 'lib/bud.rb', line 111
def initialize(options={})
  # capture the binding for a subsequent 'eval'. This ensures that local
  # variable names introduced later in this method don't interfere with
  # table names used in the eval block.
  options[:dump_rewrite] ||= ENV["BUD_DUMP_REWRITE"].to_i > 0
  options[:dump_ast] ||= ENV["BUD_DUMP_AST"].to_i > 0
  options[:print_wiring] ||= ENV["BUD_PRINT_WIRING"].to_i > 0
  @qualified_name = ""
  @tables = {}
  @lattices = {}
  @channels = {}
  @dbm_tables = {}
  @zk_tables = {}
  @stratified_rules = []
  @push_elems = {}
  @callbacks = {}
  @callback_id = 0
  @shutdown_callbacks = {}
  @shutdown_callback_id = 0
  @post_shutdown_callbacks = []
  @timers = []
  @app_tables = []
  @inside_tick = false
  @tick_clock_time = nil
  @budtime = 0
  @inbound = {}
  @done_bootstrap = false
  @done_wiring = false
  @instance_id = ILLEGAL_INSTANCE_ID # Assigned when we start running
  @metrics = {}
  @endtime = nil
  @this_stratum = 0
  @push_sorted_elems = nil
  @running_async = false
  @bud_started = false

  # Setup options (named arguments), along with default values
  @options = options.clone
  @options[:ip] ||= "127.0.0.1"
  @ip = @options[:ip]
  @options[:port] ||= 0
  @options[:port] = @options[:port].to_i
  # NB: If using an ephemeral port (specified by port = 0), the actual port
  # number won't be known until we start EM

  load_lattice_defs
  builtin_state
  resolve_imports
  call_state_methods

  @declarations = self.class.instance_methods.select {|m| m =~ /^__bloom__.+$/}.map {|m| m.to_s}

  @viz = VizOnline.new(self) if @options[:trace]
  @rtracer = RTrace.new(self) if @options[:rtrace]

  do_rewrite
  if toplevel == self
    # initialize per-stratum state
    @num_strata = @stratified_rules.length
    @scanners = @num_strata.times.map{{}}
    @push_sources = @num_strata.times.map{{}}
    @push_joins = @num_strata.times.map{[]}
    @merge_targets = @num_strata.times.map{Set.new}
  end
end
```
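The :channel_filter contract described above (accepted messages delivered now, postponed messages re-offered next tick, everything else dropped) can be modelled in isolation. The ChannelInbox class below is a hypothetical driver written only to illustrate those semantics; it is not how Bud implements message delivery internally.

```ruby
# Hypothetical driver for the documented :channel_filter contract.
# filter: ->(channel_name, msgs) { [accepted, postponed] }
class ChannelInbox
  attr_reader :delivered

  def initialize(filter)
    @filter = filter
    @pending = Hash.new { |h, k| h[k] = [] }  # channel => buffered messages
    @delivered = []
  end

  def receive(channel, msg)
    @pending[channel] << msg
  end

  # Called at the start of each tick: run the filter once per channel that
  # has pending messages. Accepted messages are delivered, postponed ones
  # are buffered for the next tick, and anything else is dropped.
  def start_tick
    @delivered = []
    @pending.keys.each do |chan|
      msgs = @pending.delete(chan)
      next if msgs.empty?
      accepted, postponed = @filter.call(chan, msgs)
      @delivered.concat(accepted.map { |m| [chan, m] })
      @pending[chan].concat(postponed) unless postponed.empty?
    end
    @delivered
  end
end
```

As a usage example, a filter that accepts small even numbers, postpones small odd numbers, and drops everything else would be `->(_chan, msgs) { small = msgs.select { |m| m < 10 }; [small.select(&:even?), small.select(&:odd?)] }`.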
- (Object) input
Internal (not documented). Returns true, which interface treats as the mode for an input interface.
```ruby
# File 'lib/bud/state.rb', line 42
def input # :nodoc: all
  true
end
```
- (Object) inspect
Returns a string identifying this instance by its class name and hexadecimal object ID.
```ruby
# File 'lib/bud.rb', line 905
def inspect
  "#{self.class}:#{self.object_id.to_s(16)}"
end
```
- (Object) int_ip_port
Returns the internal IP and port. See ip_port.
```ruby
# File 'lib/bud.rb', line 1033
def int_ip_port
  raise Bud::Error, "int_ip_port called before port defined" if @port.nil? and @options[:port] == 0
  @port.nil? ? "#{@ip}:#{@options[:port]}" : "#{@ip}:#{@port}"
end
```
- (Object) interface(mode, name, schema = nil)
Declare a transient collection to be an input or output interface.
```ruby
# File 'lib/bud/state.rb', line 51
def interface(mode, name, schema=nil)
  define_collection(name)
  t_provides << [name.to_s, mode]
  @tables[name] = (mode ? BudInputInterface : BudOutputInterface).new(name, self, schema)
end
```
- (Object) interfaces(direction, collections)
An alternative approach to declaring interfaces.
```ruby
# File 'lib/bud/state.rb', line 157
def interfaces(direction, collections)
  mode = case direction
         when :input then true
         when :output then false
         else
           raise Bud::CompileError, "unrecognized interface type #{direction}"
         end
  collections.each do |tab|
    t_provides << [tab.to_s, mode]
  end
end
```
- (Object) ip
```ruby
# File 'lib/bud.rb', line 1022
def ip
  options[:ext_ip] ? "#{@options[:ext_ip]}" : "#{@ip}"
end
```
- (Object) ip_port
Returns the IP and port of the Bud instance as a string. In addition to the local IP and port, the user may define an external IP and/or port. For each component, the external value is returned if available; otherwise, the local value is returned. There are use cases for mixing local and external values: for example, local_ip:external_port applies when you have local port forwarding, and external_ip:local_port applies when you are in a DMZ.
```ruby
# File 'lib/bud.rb', line 1017
def ip_port
  raise Bud::Error, "ip_port called before port defined" if port.nil?
  ip.to_s + ":" + port.to_s
end
```
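Taken together, ip, port and ip_port choose each half of the address independently. The hypothetical advertised_endpoint function below folds that selection logic into one standalone method: bound_port stands in for the @port assigned once the server starts, and a configured port of 0 means an ephemeral port that is not yet known. Like ip_port, it refuses to answer before a port is available.

```ruby
# Hypothetical standalone version of the ip/port/ip_port selection logic:
# each component prefers its external value and falls back to the local one.
def advertised_endpoint(ip:, port:, ext_ip: nil, ext_port: nil, bound_port: nil)
  host = ext_ip || ip
  chosen_port =
    if ext_port
      ext_port
    elsif bound_port
      bound_port          # actual port once the server is listening
    elsif port != 0
      port                # explicitly configured local port
    else
      raise ArgumentError, "port not yet defined"  # ephemeral and unbound
    end
  "#{host}:#{chosen_port}"
end
```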
- (Object) load_lattice_defs
Define methods to implement the state declarations for every registered kind of lattice.
```ruby
# File 'lib/bud/state.rb', line 171
def load_lattice_defs
  Bud::Lattice.global_mfuncs.each do |m|
    next if RuleRewriter::MONOTONE_WHITELIST.include? m
    if Bud::BudCollection.instance_methods.include? m.to_s
      puts "monotone method #{m} conflicts with non-monotonic method in BudCollection"
    end
  end

  Bud::Lattice.global_morphs.each do |m|
    next if RuleRewriter::MONOTONE_WHITELIST.include? m
    if Bud::BudCollection.instance_methods.include? m.to_s
      puts "morphism #{m} conflicts with non-monotonic method in BudCollection"
    end
  end

  # Sanity-check lattice definitions
  # XXX: We should do this only once per lattice
  Bud::Lattice.lattice_kinds.each do |wrap_name, klass|
    unless klass.method_defined? :merge
      raise Bud::CompileError, "lattice #{wrap_name} does not define a merge function"
    end

    # If a method is marked as monotone in any lattice, every lattice that
    # declares a method of that name must also mark it as monotone.
    meth_list = klass.instance_methods(false).to_set
    Bud::Lattice.global_mfuncs.each do |m|
      next unless meth_list.include? m.to_s
      unless klass.mfuncs.include? m
        raise Bud::CompileError, "method #{m} in #{wrap_name} must be monotone"
      end
    end

    # Apply a similar check for morphs
    Bud::Lattice.global_morphs.each do |m|
      next unless meth_list.include? m.to_s
      unless klass.morphs.include? m
        raise Bud::CompileError, "method #{m} in #{wrap_name} must be a morph"
      end
    end

    # Similarly, check for non-monotone lattice methods that are found in the
    # builtin list of monotone operators. The "merge" method is implicitly
    # monotone (XXX: should it be declared as a morph or monotone function?)
    meth_list.each do |m_str|
      m = m_str.to_sym
      next unless RuleRewriter::MONOTONE_WHITELIST.include? m
      # XXX: ugly hack. We want to allow lattice class implementations to
      # define their own equality semantics.
      next if m == :==
      unless klass.mfuncs.include?(m) || klass.morphs.include?(m) || m == :merge
        raise Bud::CompileError, "method #{m} in #{wrap_name} must be monotone"
      end
    end

    # XXX: replace "self" with toplevel?
    self.singleton_class.send(:define_method, wrap_name) do |lat_name|
      define_lattice(lat_name)
      @lattices[lat_name] = Bud::LatticeWrapper.new(lat_name, klass, self)
    end
  end
end
```
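The central sanity check in load_lattice_defs is a consistency rule: if any lattice marks a method name as monotone, every lattice that defines a method with that name must mark it monotone too. The sketch below states that rule on its own, using a hypothetical LatticeSpec struct in place of real lattice classes and ArgumentError in place of Bud::CompileError.

```ruby
require "set"

# Hypothetical description of a lattice: its name, the methods it defines,
# and the subset of those it marks as monotone.
LatticeSpec = Struct.new(:name, :defined_methods, :mfuncs)

# A method name marked monotone in any lattice must be marked monotone in
# every lattice that defines a method of that name.
def check_monotone_consistency(specs)
  global_mfuncs = specs.flat_map { |s| s.mfuncs.to_a }.to_set
  specs.each do |spec|
    global_mfuncs.each do |m|
      next unless spec.defined_methods.include?(m)
      next if spec.mfuncs.include?(m)
      raise ArgumentError, "method #{m} in #{spec.name} must be monotone"
    end
  end
  true
end
```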
- (Object) loopback(name, schema = nil)
Declare a transient network collection that delivers facts back to the current Bud instance. This is syntax sugar for a channel that always delivers to the IP/port of the current Bud instance. Default schema: [:key] => [:val].
```ruby
# File 'lib/bud/state.rb', line 125
def loopback(name, schema=nil)
  schema ||= {[:key] => [:val]}
  channel(name, schema, true)
end
```
- (Object) max(x)
Exemplary aggregate method to be used in Bud::BudCollection.group. Computes the maximum of the x entries being aggregated.
```ruby
# File 'lib/bud/aggs.rb', line 67
def max(x)
  [Max.new, x]
end
```
- (Object) min(x)
Exemplary aggregate method to be used in Bud::BudCollection.group. Computes the minimum of the x entries being aggregated.
```ruby
# File 'lib/bud/aggs.rb', line 50
def min(x)
  [Min.new, x]
end
```
- (Object) module_wrapper_class(mod)
```ruby
# File 'lib/bud.rb', line 177
def module_wrapper_class(mod)
  class_name = "#{mod}__wrap"
  begin
    klass = Module.const_get(class_name.to_sym)
    unless klass.is_a? Class
      raise Bud::Error, "internal error: #{class_name} is in use"
    end
  rescue NameError # exception if class class_name doesn't exist
  end
  klass ||= eval "class #{class_name}; include Bud; include #{mod}; end"
  klass
end
```
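module_wrapper_class builds its wrapper class by evaluating a string. The same "create once, then reuse" behaviour can be expressed without eval using Class.new and Object.const_set, as in the hypothetical wrapper_class_for below; BaseRuntime stands in for Bud and Greeter for an imported module. This is an alternative technique shown for illustration, not what Bud does.

```ruby
# Stand-ins for Bud and for an imported module (hypothetical).
module BaseRuntime
  def runtime?; true; end
end

module Greeter
  def greet; "hello"; end
end

# Eval-free variant: build (or reuse) a class named "<Mod>__wrap" that
# includes the base module and then the given module.
def wrapper_class_for(mod, base = BaseRuntime)
  class_name = "#{mod.name}__wrap"
  if Object.const_defined?(class_name, false)
    klass = Object.const_get(class_name)
    raise TypeError, "#{class_name} is in use" unless klass.is_a?(Class)
    return klass
  end
  klass = Class.new do
    include base
    include mod
  end
  Object.const_set(class_name, klass)  # also gives the class its name
end
```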
- (Object) on_shutdown(&blk)
Register a callback that will be invoked when this instance of Bud is shutting down. XXX: The naming of this method (and cancel_shutdown_cb) is inconsistent with the naming of register_callback and friends.
```ruby
# File 'lib/bud.rb', line 758
def on_shutdown(&blk)
  # Start EM if not yet started
  start_reactor
  rv = nil
  schedule_and_wait do
    rv = @shutdown_callback_id
    @shutdown_callbacks[@shutdown_callback_id] = blk
    @shutdown_callback_id += 1
  end
  return rv
end
```
- (Object) output
Internal (not documented). Returns false, which interface treats as the mode for an output interface.
```ruby
# File 'lib/bud/state.rb', line 46
def output # :nodoc: all
  false
end
```
- (Object) pause
```ruby
# File 'lib/bud.rb', line 704
def pause
  schedule_and_wait do
    @running_async = false
  end
end
```
- (Object) periodic(name, period = 1)
Declare a collection to be auto-populated every period seconds. Schema: [:key] => [:val]. It may appear only on the rhs of statements.
```ruby
# File 'lib/bud/state.rb', line 138
def periodic(name, period=1)
  define_collection(name)
  raise Bud::Error if @periodics.has_key? [name]
  @periodics << [name, period]
  @tables[name] = Bud::BudPeriodic.new(name, self)
end
```
- (Object) port
```ruby
# File 'lib/bud.rb', line 1026
def port
  return nil if @port.nil? and @options[:port] == 0 and not @options[:ext_port]
  return @options[:ext_port] ? @options[:ext_port] :
    (@port.nil? ? @options[:port] : @port)
end
```
- (Object) post_shutdown(&blk)
Register a callback that will be invoked after this instance of Bud has been shut down.
```ruby
# File 'lib/bud.rb', line 779
def post_shutdown(&blk)
  # Start EM if not yet started
  start_reactor
  schedule_and_wait do
    @post_shutdown_callbacks << blk
  end
end
```
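on_shutdown, cancel_shutdown_cb and post_shutdown share a simple bookkeeping scheme: shutdown callbacks live in a hash keyed by an increasing integer ID (so they can be cancelled), while post-shutdown callbacks are kept in a plain list. The hypothetical ShutdownHooks class below models only that bookkeeping. It omits the start_reactor and schedule_and_wait calls that the real methods use to run this work on the EventMachine thread, and its stop method is an assumption about ordering (shutdown callbacks first, post-shutdown callbacks last).

```ruby
# Hypothetical sketch of the shutdown-callback bookkeeping (single-threaded;
# the real methods run this on the EventMachine thread).
class ShutdownHooks
  def initialize
    @callbacks = {}        # id => block, run while shutting down
    @next_id = 0
    @post_callbacks = []   # run after shutdown has completed
  end

  def on_shutdown(&blk)
    id = @next_id
    @callbacks[id] = blk
    @next_id += 1
    id
  end

  def cancel_shutdown_cb(id)
    raise KeyError, "no shutdown callback #{id}" unless @callbacks.key?(id)
    @callbacks.delete(id)
  end

  def post_shutdown(&blk)
    @post_callbacks << blk
  end

  # Assumed ordering: shutdown callbacks, then (resources would be released
  # here), then post-shutdown callbacks.
  def stop
    @callbacks.each_value(&:call)
    @post_callbacks.each(&:call)
  end
end
```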
- (Object) readonly(name, schema = nil)
```ruby
# File 'lib/bud/state.rb', line 102
def readonly(name, schema=nil)
  define_collection(name)
  @tables[name] = Bud::BudReadOnly.new(name, self, schema)
end
```
- (Object) register_callback(tbl_name, &blk)
Register a new callback. Given the name of a Bud collection, this method arranges for the given block to be invoked at the end of any tick in which any tuples have been inserted into the specified collection. The code block is passed the collection as an argument; this provides a convenient way to examine the tuples inserted during that fixpoint. (Note that because the Bud runtime is blocked while the callback is invoked, it can also examine any other Bud state freely.)
Note that registering callbacks on persistent collections (e.g., tables, syncs and stores) is probably not wise: as long as any tuples are stored in the collection, the callback will be invoked at the end of every tick.
# File 'lib/bud.rb', line 829
def register_callback(tbl_name, &blk)
  # We allow callbacks to be added before or after EM has been started. To
  # simplify matters, we start EM if it hasn't been started yet.
  start_reactor
  cb_id = nil
  schedule_and_wait do
    cb_id = do_register_callback(tbl_name, &blk)
  end
  return cb_id
end
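The callback contract described above can be modeled with a plain hash. A block is registered against a collection name and receives an ID. It fires at the end of a tick whenever that collection is non-empty, and it can later be removed by ID. This is a minimal sketch, and the MiniCallbacks class is hypothetical. Real Bud performs these operations on the EventMachine thread via schedule_and_wait, and its internal bookkeeping differs.

```ruby
# Hypothetical model of Bud's callback bookkeeping (not Bud's actual code).
class MiniCallbacks
  def initialize
    @callbacks = {}   # id => [collection name, block]
    @next_id = 0
  end

  # Register a block for a collection; return an ID for unregistering later.
  def register(tbl_name, &blk)
    id = @next_id
    @next_id += 1
    @callbacks[id] = [tbl_name, blk]
    id
  end

  # Mirrors unregister_callback: an unknown ID is an error.
  def unregister(id)
    raise ArgumentError, "missing callback: #{id.inspect}" unless @callbacks.key?(id)
    @callbacks.delete(id)
  end

  # Called at the end of a tick with each collection's contents;
  # only callbacks whose collection is non-empty fire.
  def invoke(tables)
    @callbacks.each_value do |tbl_name, blk|
      contents = tables.fetch(tbl_name, [])
      blk.call(contents) unless contents.empty?
    end
  end
end

cbs = MiniCallbacks.new
seen = []
id = cbs.register(:out) { |tuples| seen.concat(tuples) }
cbs.invoke(out: [[1, :a]])   # fires: seen is now [[1, :a]]
cbs.invoke(out: [])          # empty collection: does not fire
cbs.unregister(id)
cbs.invoke(out: [[2, :b]])   # unregistered: does not fire
```

The model also shows why callbacks on persistent collections are costly: any collection that stays non-empty triggers its callback on every tick.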
- (Object) run_bg
Run Bud in the background (in a different thread). This means that the Bud interpreter will run asynchronously from the caller, so care must be taken when interacting with it. For example, it is not safe to directly examine Bud collections from the caller's thread (see async_do and sync_do).
This instance of Bud will run until stop() is called.
# File 'lib/bud.rb', line 643
def run_bg
  start

  schedule_and_wait do
    if @running_async
      raise Bud::Error, "run_bg called on already-running Bud instance"
    end
    @running_async = true

    # Consume any events received while we weren't running async
    tick_internal
  end

  @rtracer.sleep if options[:rtrace]
end
- (Object) run_fg
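Run Bud in the “foreground”: the caller's thread blocks while the Bud interpreter runs, so this method won't return unless an error occurs or Bud is halted. Internally, run_fg calls run_bg and then waits for a post-shutdown callback, so it cannot be invoked from inside the EventMachine thread. It is often more useful to run Bud in a different thread: see run_bg.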
# File 'lib/bud.rb', line 714
def run_fg
  # If we're called from the EventMachine thread (and EM is running), blocking
  # the current thread would imply deadlocking ourselves.
  if Thread.current == EventMachine::reactor_thread and EventMachine::reactor_running?
    raise Bud::Error, "cannot invoke run_fg from inside EventMachine"
  end

  q = Queue.new
  # Note that this must be a post-shutdown callback: if this is the only
  # thread, then the program might exit after run_fg() returns. If run_fg()
  # blocked on a normal shutdown callback, the program might exit before the
  # other shutdown callbacks have a chance to run.
  post_shutdown do
    q.push(true)
  end

  run_bg
  # Block caller's thread until Bud has shutdown
  q.pop
  report_metrics if options[:metrics]
end
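The blocking strategy in run_fg registers a post-shutdown callback that pushes onto a Queue, starts the background runtime, and then pops. That strategy can be sketched with Ruby's stdlib alone. MiniRuntime is hypothetical, and its ticks argument stands in for however long the real runtime runs before being stopped.

```ruby
# Hypothetical model of run_fg's blocking strategy using only Ruby's stdlib.
class MiniRuntime
  def initialize
    @post_shutdown_callbacks = []
  end

  def post_shutdown(&blk)
    @post_shutdown_callbacks << blk
  end

  # Stand-in for run_bg: do some work on another thread, then "shut down"
  # by running the post-shutdown callbacks on that thread.
  def run_bg(ticks, &blk)
    @worker = Thread.new do
      ticks.times { |i| blk&.call(i) }
      @post_shutdown_callbacks.each(&:call)
    end
  end

  # Stand-in for run_fg: block the caller until the post-shutdown callback fires.
  def run_fg(ticks, &blk)
    q = Queue.new
    post_shutdown { q.push(true) }
    run_bg(ticks, &blk)
    q.pop
  end
end

log = []
MiniRuntime.new.run_fg(3) { |i| log << i }
# run_fg returned only after the worker finished, so log == [0, 1, 2]
```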
- (Object) scratch(name, schema = nil)
Declare a transient collection, whose contents are emptied at the start of each tick. Default schema: [:key] => [:val].
# File 'lib/bud/state.rb', line 97
def scratch(name, schema=nil)
  define_collection(name)
  @tables[name] = Bud::BudScratch.new(name, self, schema)
end
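The sync_do documentation states that a scratch is emptied when the next tick begins, and that <+ defers an insert to the next tick. The following minimal model illustrates that timing. MiniScratch and its insert_now (standing in for <=) and insert_pending (standing in for <+) methods are hypothetical names, not Bud's implementation.

```ruby
# Hypothetical model of scratch-collection timing (not Bud's implementation).
class MiniScratch
  attr_reader :contents

  def initialize
    @contents = []   # tuples visible in the current tick
    @pending = []    # tuples deferred to the next tick
  end

  def insert_now(tuple)      # analogous to <=
    @contents << tuple
  end

  def insert_pending(tuple)  # analogous to <+
    @pending << tuple
  end

  # Start of a new tick: scratch contents are discarded and
  # pending tuples become visible.
  def tick
    @contents = @pending
    @pending = []
  end
end

s = MiniScratch.new
s.insert_now([:a, 1])      # like a sync_do block inserting with <=
s.insert_pending([:b, 2])  # like a sync_do block inserting with <+
s.tick
s.contents                 # => [[:b, 2]]; the <= insert was lost
```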
- (Object) singleton_class
Metaprogramming support for Ruby and for rule rewriting: returns the receiver's singleton class, which is used as a helper to define instance methods.
# File 'lib/bud.rb', line 631
def singleton_class # :nodoc: all
  class << self; self; end
end
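The idiom `class << self; self; end` opens and returns the receiver's singleton class. A method defined there affects only that one object. The sketch below uses a hypothetical Greeter class. Modern Ruby (1.9.2 and later) also provides Object#singleton_class as a built-in, which returns the same object.

```ruby
# The idiom used by Bud#singleton_class: open the receiver's singleton class
# and return it. Greeter is a hypothetical example class.
class Greeter
  def my_singleton_class
    class << self; self; end
  end
end

a = Greeter.new
b = Greeter.new

# Define a method on a's singleton class only.
a.my_singleton_class.send(:define_method, :greet) { "hello from a" }

a.greet                                         # => "hello from a"
b.respond_to?(:greet)                           # => false
a.my_singleton_class.equal?(a.singleton_class)  # => true
```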
- (Object) start(do_tick = false)
Start up a Bud instance if it is not already started. This starts EventMachine (if needed) and binds to a UDP server socket. If do_tick is true, a single Bloom timestep is also executed. In either case, calling this method does NOT cause Bud to begin running asynchronously (see run_bg).
# File 'lib/bud.rb', line 663
def start(do_tick=false)
  start_reactor
  schedule_and_wait do
    do_startup unless @bud_started
    tick_internal if do_tick
  end
end
- (Object) stop(stop_em = false, do_shutdown_cb = true) Also known as: stop_bg
Shut down a Bud instance and release any resources that it was using. This method blocks until Bud has been shut down. If stop_em is true, the EventMachine event loop is also shut down; this will interfere with the execution of any other Bud instances in the same process (as well as anything else that happens to use EventMachine).
# File 'lib/bud.rb', line 741
def stop(stop_em=false, do_shutdown_cb=true)
  schedule_and_wait do
    do_shutdown(do_shutdown_cb)
  end

  if stop_em
    Bud.stop_em_loop
    EventMachine::reactor_thread.join
  end
  report_metrics if options[:metrics]
end
- (Object) store(name, storage, schema = nil)
# File 'lib/bud/state.rb', line 81
def store(name, storage, schema=nil)
  define_collection(name)
  case storage
  when :zookeeper
    # treat "schema" as a hash of options
    options = schema
    raise Bud::Error, "Zookeeper tables require a :path option" if options[:path].nil?
    options[:addr] ||= "localhost:2181"
    @tables[name] = Bud::BudZkTable.new(name, options[:path], options[:addr], self)
    @zk_tables[name] = @tables[name]
  else
    raise Bud::Error, "unknown async storage engine #{storage.to_s}"
  end
end
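For the :zookeeper engine, store reinterprets its schema argument as an options hash. That hash must contain :path, and :addr defaults to "localhost:2181". The following hypothetical helper, zk_options, isolates that validation. It differs from the method above in three stated ways: it raises ArgumentError instead of Bud::Error, it treats a nil argument as an empty hash, and it returns a new hash rather than mutating the caller's via `||=`.

```ruby
# Hypothetical helper mirroring the :zookeeper option handling in Bud#store
# (not part of Bud).
def zk_options(schema)
  options = schema || {}
  raise ArgumentError, "Zookeeper tables require a :path option" if options[:path].nil?
  # Same effect as `options[:addr] ||= "localhost:2181"`, but returns a new
  # hash instead of mutating the caller's.
  options.merge(addr: options[:addr] || "localhost:2181")
end

zk_options(path: "/bud/kvs")[:addr]                   # => "localhost:2181"
zk_options(path: "/bud/kvs", addr: "zk1:2181")[:addr] # => "zk1:2181"
```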
- (Object) sum(x)
Aggregate method for use in Bud::BudCollection.group: computes the sum of the x values in each group.
# File 'lib/bud/aggs.rb', line 156
def sum(x)
  [Sum.new, x]
end
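sum returns a pair of an aggregate object and a column, and group folds that column's values within each group. The sketch below shows how such pairs could drive grouping. MiniSum, its init/trans/final methods, and the group helper are hypothetical, and Bud's actual Agg interface may differ in detail.

```ruby
# Hypothetical sketch of how an [aggregate, column] pair such as [Sum.new, x]
# can drive grouping (not Bud's actual Agg interface).
class MiniSum
  def init(val)          # state after the first value in a group
    val
  end

  def trans(state, val)  # fold one more value into the state
    state + val
  end

  def final(state)       # value emitted for the group
    state
  end
end

# Group tuples by the columns in key_cols and apply each [agg, col] pair.
def group(tuples, key_cols, *aggs)
  states = {}
  tuples.each do |t|
    key = key_cols.map { |c| t[c] }
    states[key] =
      if states.key?(key)
        states[key].each_with_index.map { |s, i| aggs[i][0].trans(s, t[aggs[i][1]]) }
      else
        aggs.map { |agg, col| agg.init(t[col]) }
      end
  end
  states.map { |key, ss| key + ss.each_with_index.map { |s, i| aggs[i][0].final(s) } }
end

# Same shape as Bud's sum: an aggregate object paired with a column.
def sum(x)
  [MiniSum.new, x]
end

sales = [["east", 10], ["west", 5], ["east", 7]]
group(sales, [0], sum(1))   # => [["east", 17], ["west", 5]]
```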
- (Object) sync(name, storage, schema = nil)
Declare a synchronously-flushed persistent collection. Default schema: [:key] => [:val]. The only storage engine accepted here is :dbm.
# File 'lib/bud/state.rb', line 70
def sync(name, storage, schema=nil)
  define_collection(name)
  case storage
  when :dbm
    @tables[name] = Bud::BudDbmTable.new(name, self, schema)
    @dbm_tables[name] = @tables[name]
  else
    raise Bud::Error, "unknown synchronous storage engine #{storage.to_s}"
  end
end
- (Object) sync_callback(in_tbl, tupleset, out_tbl)
sync_callback supports synchronous interaction with Bud modules. The caller supplies the name of an input collection, a set of tuples to insert, and an output collection on which to “listen.” The call blocks until tuples are inserted into the output collection; these tuples are returned to the caller. If the Bud instance shuts down first, Bud::ShutdownWithCallbacksError is raised.
# File 'lib/bud.rb', line 864
def sync_callback(in_tbl, tupleset, out_tbl)
  q = Queue.new
  # If the runtime shuts down before we see anything in the output collection,
  # make sure we hear about it so we can raise an error
  # XXX: Using two separate callbacks here is ugly.
  shutdown_cb = on_shutdown do
    q.push :callback
  end

  cb = nil
  sync_do {
    if in_tbl
      t = @tables[in_tbl]
      if t.class <= Bud::BudChannel or t.class <= Bud::BudZkTable
        t <~ tupleset
      else
        t <+ tupleset
      end
    end

    cb = do_register_callback(out_tbl) do |c|
      q.push c.to_a
    end
  }

  result = q.pop
  if result == :callback
    # Don't try to unregister the callbacks first: runtime is already shutdown
    raise Bud::ShutdownWithCallbacksError, "Bud instance shutdown before sync_callback completed"
  end
  unregister_callback(cb)
  cancel_shutdown_cb(shutdown_cb)
  return result
end
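sync_callback makes the caller wait on a single Queue that either of two sources can fill. The output callback pushes the tuples, while the shutdown callback pushes the :callback sentinel. The following stdlib-only sketch shows that wait-on-either-outcome pattern. wait_for_result and ShutdownBeforeResult are hypothetical names, and the producer lambda stands in for the Bud runtime.

```ruby
# Hypothetical model of sync_callback's wait-on-either-outcome pattern
# (stdlib only; not Bud's implementation).
class ShutdownBeforeResult < StandardError; end

# producer: a lambda that receives two callbacks and eventually calls one.
def wait_for_result(producer)
  q = Queue.new
  on_output   = ->(tuples) { q.push(tuples) }
  on_shutdown = -> { q.push(:callback) }   # sentinel, as in sync_callback
  worker = Thread.new { producer.call(on_output, on_shutdown) }
  result = q.pop                           # blocks until either source pushes
  worker.join
  raise ShutdownBeforeResult, "runtime shut down before a result arrived" if result == :callback
  result
end

ok = wait_for_result(->(out, _down) { out.call([[1, "pong"]]) })
# ok == [[1, "pong"]]

begin
  wait_for_result(->(_out, down) { down.call })
rescue ShutdownBeforeResult => e
  puts "sync call failed: #{e.message}"
end
```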
- (Object) sync_do
Given a block, evaluate that block inside the background Ruby thread at some time in the future, and then perform a Bloom tick. Because the block is evaluated inside the background Ruby thread, it can safely examine Bud state. Calling sync_do blocks the caller until the block has been evaluated; for a non-blocking version, see async_do.
Note that the block is invoked after one Bud timestep has ended but before the next timestep begins. Hence, synchronous accumulation (<=) into a Bud scratch collection in a callback is typically not a useful thing to do: when the next tick begins, the content of any scratch collections will be emptied, which includes anything inserted by a sync_do block using <=. To avoid this behavior, insert into scratches using <+.
# File 'lib/bud.rb', line 799
def sync_do
  schedule_and_wait do
    yield if block_given?
    # Do another tick, in case the user-supplied block inserted any data
    tick_internal
  end
end
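sync_do relies on schedule_and_wait to run a block on the EventMachine thread and block the caller until the block finishes. schedule_and_wait is defined elsewhere in lib/bud.rb and is not shown here. The sketch below models both sync_do and the non-blocking async_do with one worker thread that owns all state. MiniReactor is hypothetical, its worker thread stands in for the reactor thread, and incrementing budtime stands in for tick_internal.

```ruby
# Hypothetical model of sync_do/async_do on top of a single worker thread
# that owns all state (stands in for the EventMachine reactor thread).
class MiniReactor
  attr_reader :budtime

  def initialize
    @jobs = Queue.new
    @budtime = 0
    @state = []
    @thread = Thread.new do
      while (job = @jobs.pop) != :stop
        job.call
      end
    end
  end

  # Run the block on the worker thread; block the caller until it finishes
  # and re-raise any error it raised.
  def schedule_and_wait(&blk)
    done = Queue.new
    job = lambda do
      begin
        blk.call
        done.push(nil)
      rescue StandardError => e
        done.push(e)
      end
    end
    @jobs.push(job)
    err = done.pop
    raise err if err
  end

  # Like sync_do: evaluate the block, then "tick", on the worker thread.
  def sync_do(&blk)
    schedule_and_wait do
      blk&.call(@state)
      @budtime += 1   # stand-in for tick_internal
    end
  end

  # Like async_do: queue the same work but return immediately.
  def async_do(&blk)
    job = lambda do
      blk&.call(@state)
      @budtime += 1
    end
    @jobs.push(job)
  end

  # Jobs queued before :stop still run, because the queue is FIFO.
  def stop
    @jobs.push(:stop)
    @thread.join
  end
end

r = MiniReactor.new
r.sync_do { |state| state << :hello }
after_sync = r.budtime                  # 1: sync_do waited for its block and tick
r.async_do { |state| state << :world }  # returns immediately
r.stop
r.budtime                               # => 2
```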
- (Object) table(name, schema = nil)
Declare an in-memory, non-transient collection. Default schema: [:key] => [:val].
# File 'lib/bud/state.rb', line 58
def table(name, schema=nil)
  define_collection(name)
  @tables[name] = Bud::BudTable.new(name, self, schema)
end
- (Object) temp(name)
Declare a scratch collection on the left-hand side of a Bloom statement; its schema is inferred from the right-hand side.
# File 'lib/bud/state.rb', line 108
def temp(name)
  define_collection(name)
  # defer schema definition until merge
  @tables[name] = Bud::BudTemp.new(name, self, nil, true)
end
- (Object) terminal(name)
# File 'lib/bud/state.rb', line 145
def terminal(name) # :nodoc: all
  if defined?(@terminal) && @terminal != name
    raise Bud::Error, "can't register IO collection #{name} in addition to #{@terminal}"
  else
    @terminal = name
  end
  define_collection(name)
  @tables[name] = Bud::BudTerminal.new(name, self)
  @channels[name] = @tables[name]
end
- (Object) tick
From client code, manually trigger a timestep of Bloom execution.
# File 'lib/bud.rb', line 1039
def tick
  start(true)
end
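tick is simply start(true). start performs startup only once, and each call with do_tick set runs exactly one timestep. The following minimal model shows that relationship. MiniInstance, startups and budtime are hypothetical stand-ins, and the real method does its work inside schedule_and_wait on the EventMachine thread.

```ruby
# Hypothetical model of the start/tick relationship (not Bud's implementation).
class MiniInstance
  attr_reader :startups, :budtime

  def initialize
    @started = false
    @startups = 0
    @budtime = 0
  end

  # Like Bud#start: perform startup only once; optionally run one timestep.
  def start(do_tick = false)
    unless @started
      @startups += 1
      @started = true
    end
    @budtime += 1 if do_tick
  end

  # Like Bud#tick: start if necessary, then run exactly one timestep.
  def tick
    start(true)
  end
end

m = MiniInstance.new
m.start                   # startup only; no timestep
3.times { m.tick }
[m.startups, m.budtime]   # => [1, 3]
```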
- (Object) tick_internal
One timestep of Bloom execution. This MUST be invoked from the EventMachine thread; it is not intended to be called directly by client code.
# File 'lib/bud.rb', line 1070
def tick_internal
  puts "#{object_id}/#{port} : ============================================= (#{@budtime})" if $BUD_DEBUG
  begin
    starttime = Time.now if options[:metrics]
    if options[:metrics] and not @endtime.nil?
      @metrics[:betweentickstats] ||= initialize_stats
      @metrics[:betweentickstats] = running_stats(@metrics[:betweentickstats], starttime - @endtime)
    end
    @inside_tick = true

    unless @done_bootstrap
      do_bootstrap
      do_wiring
    else
      # inform tables and elements about beginning of tick.
      @app_tables.each {|t| t.tick}
      @default_rescan.each {|elem| elem.rescan = true}
      @default_invalidate.each {|elem|
        elem.invalidated = true
        # Call tick on tables here itself. The rest below
        elem.invalidate_cache unless elem.class <= PushElement
      }

      # The following loop invalidates additional (non-default) elements and
      # tables that depend on the run-time invalidation state of a table.
      # Loop once to set the flags.
      each_scanner do |scanner, stratum|
        if scanner.rescan
          scanner.rescan_set.each {|e| e.rescan = true}
          scanner.invalidate_set.each {|e|
            e.invalidated = true
            e.invalidate_cache unless e.class <= PushElement
          }
        end
      end

      # Loop a second time to actually call invalidate_cache. We can't merge
      # this with the loops above because some versions of invalidate_cache
      # (e.g., join) depend on the rescan state of other elements.
      @num_strata.times do |stratum|
        @push_sorted_elems[stratum].each {|e| e.invalidate_cache if e.invalidated}
      end
    end

    receive_inbound
    # compute fixpoint for each stratum in order
    @stratified_rules.each_with_index do |rules,stratum|
      fixpoint = false
      first_iter = true
      until fixpoint
        @scanners[stratum].each_value {|s| s.scan(first_iter)}
        fixpoint = true
        first_iter = false
        # flush any tuples in the pipes
        @push_sorted_elems[stratum].each {|p| p.flush}
        # tick deltas on any merge targets and look for more deltas
        # check to see if any joins saw a delta
        @push_joins[stratum].each do |p|
          if p.found_delta
            fixpoint = false
            p.tick_deltas
          end
        end
        @merge_targets[stratum].each do |t|
          fixpoint = false if t.tick_deltas
        end
      end

      # push end-of-fixpoint
      @push_sorted_elems[stratum].each do |p|
        p.stratum_end
      end
      @merge_targets[stratum].each do |t|
        t.flush_deltas
      end
    end
    @viz.do_cards if @options[:trace]
    do_flush

    invoke_callbacks
    @budtime += 1
    @inbound.clear
    @reset_list.each {|e| e.invalidated = false; e.rescan = false}

  ensure
    @inside_tick = false
    @tick_clock_time = nil
  end

  if options[:metrics]
    @endtime = Time.now
    @metrics[:tickstats] ||= initialize_stats
    @metrics[:tickstats] = running_stats(@metrics[:tickstats], @endtime - starttime)
  end
end
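Within each stratum, tick_internal keeps scanning and flushing until no join reports found_delta and no merge target's tick_deltas returns true. The sketch below shows the same "iterate until no new deltas" idea on a single recursive rule, transitive closure over a link relation. Each round joins only the previous round's new tuples against the base relation, in semi-naive style. This is a hypothetical illustration of a fixpoint loop and not Bud's push-based executor.

```ruby
require "set"

# Hypothetical sketch of a per-stratum fixpoint loop (not Bud's executor).
# Rules: path(a, b) <- link(a, b); path(a, c) <- link(a, b), path(b, c).
def transitive_closure(links)
  path = Set.new(links)
  delta = Set.new(links)          # tuples that were new in the last iteration
  until delta.empty?              # fixpoint: an iteration derived nothing new
    new_delta = Set.new
    links.each do |a, b|
      delta.each do |b2, c|
        next unless b == b2
        new_delta << [a, c] unless path.include?([a, c])
      end
    end
    path.merge(new_delta)
    delta = new_delta
  end
  path
end

links = [[:a, :b], [:b, :c], [:c, :d]]
transitive_closure(links).size   # => 6
```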
- (Object) toplevel
# File 'lib/bud.rb', line 190
def toplevel
  @toplevel = (@options[:toplevel] || self)
end
- (Boolean) toplevel?
# File 'lib/bud.rb', line 198
def toplevel?
  toplevel.object_id == self.object_id
end
- (Object) unregister_callback(id)
Unregister the callback that has the given ID.
# File 'lib/bud.rb', line 853
def unregister_callback(id)
  schedule_and_wait do
    raise Bud::Error, "missing callback: #{id.inspect}" unless @callbacks.has_key? id
    @callbacks.delete(id)
  end
end
- (Boolean) wiring?
Are we currently in the process of wiring together the dataflow?
# File 'lib/bud.rb', line 212
def wiring?
  toplevel? ? (@done_bootstrap && !@done_wiring) : toplevel.wiring?
end
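An instance whose options include :toplevel delegates wiring? to that toplevel instance, and only the toplevel consults its own @done_bootstrap and @done_wiring flags. The following minimal model shows that delegation. MiniNode and its finish_bootstrap!/finish_wiring! methods are hypothetical. The model uses equal? for the identity check, which is equivalent to the object_id comparison in toplevel?.

```ruby
# Hypothetical model of toplevel delegation (not Bud's implementation).
class MiniNode
  def initialize(options = {})
    @options = options
    @done_bootstrap = false
    @done_wiring = false
  end

  def toplevel
    @toplevel = (@options[:toplevel] || self)
  end

  def toplevel?
    toplevel.equal?(self)   # identity check, like comparing object_ids
  end

  # Only the toplevel instance's flags matter; other instances delegate.
  def wiring?
    toplevel? ? (@done_bootstrap && !@done_wiring) : toplevel.wiring?
  end

  def finish_bootstrap!
    @done_bootstrap = true
  end

  def finish_wiring!
    @done_wiring = true
  end
end

root = MiniNode.new
child = MiniNode.new(toplevel: root)
root.finish_bootstrap!
child.wiring?   # => true (answered by root)
root.finish_wiring!
child.wiring?   # => false
```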