Class: Bud::BudCollection
- Inherits:
-
Object
- Object
- Bud::BudCollection
show all
- Includes:
- Enumerable
- Defined in:
- lib/bud/collections.rb
Overview
The collection types.
Each collection is partitioned into 5 buffers:
-
pending holds tuples deferred until the next tick
-
storage holds the “normal” tuples
-
delta holds the delta for the right-hand sides (rhs) of rules during semi-naive evaluation
-
new_delta will hold the left-hand-side (lhs) tuples currently being produced during semi-naive evaluation
-
tick_delta holds the union of all delta_i, where delta_i is the delta processed in
fixpoint iteration i.
++
Instance Attribute Summary (collapse)
Instance Method Summary
(collapse)
-
- (Object) *(collection)
-
- (Object) <<(item)
instantaneously place an individual item from rhs into collection on lhs.
-
- (Object) <=(collection)
instantaneously merge items from collection into this collection (delegates to merge; raises Bud::CompileError when used outside a tick).
-
- (Object) [](k)
-
- (Object) add_rescan_invalidate(rescan, invalidate)
-
- (Object) argagg(aggname, gbkey_cols, collection, &blk)
-
- (Object) argmax(gbkey_cols, col, &blk)
-
- (Object) argmin(gbkey_cols, col, &blk)
-
- (Object) bootstrap
-
- (Object) canonicalize_col(col)
-
- (Object) close
-
- (Object) do_insert(t, store)
-
- (Object) each(&block)
-
- (Object) each_from_sym(buf_syms, &block)
-
- (Object) each_raw(&block)
-
- (Object) each_tick_delta(&block)
-
- (Object) each_with_index(the_name = tabname, the_schema = schema, &blk)
XXX: Although we support each_with_index over Bud collections, using it is
probably not a great idea: the index assigned to a given collection member
is not defined by the language semantics.
-
- (Boolean) empty?
-
- (Boolean) exists?(&block)
-
- (Object) flat_map(&blk)
-
- (Object) flush
-
- (Object) flush_deltas
-
- (Object) group(key_cols, *aggpairs, &blk)
-
- (Boolean) has_key?(k)
-
- (Boolean) include?(item)
-
- (Object) init_schema(given_schema)
-
- (BudCollection) initialize(name, bud_instance, given_schema = nil, defer_schema = false)
constructor
-
- (Object) insert(o, source = nil)
-
- (Object) inspect
-
- (Object) inspected
-
- (Object) invalidate_at_tick
-
- (Object) keys
-
- (Object) length
-
- (Object) merge(o, buf = @delta)
-
- (Object) non_temporal_predecessors
-
- (Object) notin(collection, *preds, &blk)
-
- (Object) null_tuple
-
- (Object) pending_merge(o)
-
- (Object) positive_predecessors
-
- (Object) prep_aggpairs(aggpairs)
-
- (Object) pro(the_name = tabname, the_schema = schema, &blk)
-
- (Object) qualified_tabname
-
- (Object) reduce(initial, &blk)
-
- (Object) register_coll_expr(expr)
-
- (Object) rename(the_name, the_schema = nil, &blk)
-
- (Object) schema
-
- (Object) sort(&blk)
-
- (Object) tick
-
- (Object) tick_deltas
-
- (Object) tick_metrics
-
- (Object) to_push_elem(the_name = tabname, the_schema = schema)
-
- (Object) uniquify_tabname
-
- (Object) val_cols
-
- (Object) values
Constructor Details
- (BudCollection) initialize(name, bud_instance, given_schema = nil, defer_schema = false)
26
27
28
29
30
31
32
33
34
35
36
|
# File 'lib/bud/collections.rb', line 26
# Constructor. Records the collection name and owning Bud instance, resets
# the bookkeeping flags and counters, then sets up the schema and buffers.
#
# name         - stored as @tabname.
# bud_instance - stored as @bud_instance.
# given_schema - handed to init_schema (which substitutes a default schema
#                when it is nil).
# defer_schema - when true AND no schema was given, init_schema is skipped.
def initialize(name, bud_instance, given_schema=nil, defer_schema=false)
  @tabname = name
  @bud_instance = bud_instance
  @invalidated = true
  # The two assignments below were fused onto one line in the listing, which
  # is not valid Ruby; they are separate statements.
  @is_source = true
  @scanner_cnt = 0
  @wired_by = []
  @accumulate_tick_deltas = false
  init_schema(given_schema) unless given_schema.nil? and defer_schema
  init_buffers
end
|
Instance Attribute Details
- (Object) accumulate_tick_deltas
24
25
26
|
# File 'lib/bud/collections.rb', line 24
# Reader for @accumulate_tick_deltas (set to false by the constructor).
# flush_deltas and tick_deltas consult it before appending delta values to
# @tick_delta.
def accumulate_tick_deltas
@accumulate_tick_deltas
end
|
- (Object) bud_instance
17
18
19
|
# File 'lib/bud/collections.rb', line 17
# Reader for @bud_instance, the bud_instance argument given to the constructor.
def bud_instance
@bud_instance
end
|
- (Object) cols
18
19
20
|
# File 'lib/bud/collections.rb', line 18
# Reader for @cols. init_schema fills it from BudCollection.parse_schema and
# resets it to nil when the parsed column list is empty.
def cols
@cols
end
|
- (Object) delta
20
21
22
|
# File 'lib/bud/collections.rb', line 20
# Reader for the @delta buffer (per the class overview: the delta used for
# rule right-hand sides during semi-naive evaluation).
def delta
@delta
end
|
- (Object) invalidated
Returns the value of attribute invalidated
22
23
24
|
# File 'lib/bud/collections.rb', line 22
# Returns the value of attribute invalidated (@invalidated is set to true by
# the constructor).
def invalidated
@invalidated
end
|
- (Object) is_source
Returns the value of attribute is_source
23
24
25
|
# File 'lib/bud/collections.rb', line 23
# Returns the value of attribute is_source (@is_source is set to true by the
# constructor).
def is_source
@is_source
end
|
- (Object) key_cols
18
19
20
|
# File 'lib/bud/collections.rb', line 18
# Reader for @key_cols, the key-column list that init_schema obtains from
# BudCollection.parse_schema.
def key_cols
@key_cols
end
|
- (Object) new_delta
20
21
22
|
# File 'lib/bud/collections.rb', line 20
# Reader for the @new_delta buffer (per the class overview: the lhs tuples
# currently being produced during semi-naive evaluation).
def new_delta
@new_delta
end
|
- (Object) pending
20
21
22
|
# File 'lib/bud/collections.rb', line 20
# Reader for the @pending buffer (per the class overview: tuples deferred
# until the next tick).
def pending
@pending
end
|
- (Object) rescan
Returns the value of attribute rescan
22
23
24
|
# File 'lib/bud/collections.rb', line 22
# Returns the value of attribute rescan.
# NOTE(review): no assignment to @rescan is visible in this listing; confirm
# where it is set.
def rescan
@rescan
end
|
- (Object) scanner_cnt
Returns the value of attribute scanner_cnt
21
22
23
|
# File 'lib/bud/collections.rb', line 21
# Returns the value of attribute scanner_cnt. @scanner_cnt starts at 0 in the
# constructor and is incremented by to_push_elem each time it creates a new
# scanner element.
def scanner_cnt
@scanner_cnt
end
|
- (Object) storage
20
21
22
|
# File 'lib/bud/collections.rb', line 20
# Reader for the @storage buffer (per the class overview: the "normal" tuples).
def storage
@storage
end
|
- (Object) struct
Returns the value of attribute struct
19
20
21
|
# File 'lib/bud/collections.rb', line 19
# Returns the value of attribute struct. init_schema sets @struct to the
# Bud::TupleStruct class built for @cols (only when @cols is non-empty).
def struct
@struct
end
|
- (Object) tabname
18
19
20
|
# File 'lib/bud/collections.rb', line 18
# Reader for @tabname, the name argument given to the constructor
# (uniquify_tabname may later replace it).
def tabname
@tabname
end
|
- (Object) tick_delta
20
21
22
|
# File 'lib/bud/collections.rb', line 20
# Reader for @tick_delta, which flush_deltas/tick_deltas extend with the
# values of @delta when accumulate_tick_deltas is truthy.
def tick_delta
@tick_delta
end
|
- (Object) wired_by
Returns the value of attribute wired_by
21
22
23
|
# File 'lib/bud/collections.rb', line 21
# Returns the value of attribute wired_by. @wired_by starts as an empty array
# in the constructor; non_temporal_predecessors and positive_predecessors
# filter it.
def wired_by
@wired_by
end
|
Instance Method Details
- (Object) *(collection)
781
782
783
|
# File 'lib/bud/collections.rb', line 781
# Join operator: converts this collection to its push element and returns
# that element's join with +collection+.
def *(collection)
  to_push_elem.join(collection)
end
|
- (Object) <<(item)
instantaneously place an individual item from rhs into collection on lhs
512
513
514
|
# File 'lib/bud/collections.rb', line 512
# Instantaneously place an individual item into this collection.
# Simply forwards +item+ to insert and returns insert's result.
def <<(item)
insert(item)
end
|
- (Object) <=(collection)
instantaneously merge items from collection into this collection (delegates to merge; raises Bud::CompileError when used outside a tick)
608
609
610
611
612
613
614
|
# File 'lib/bud/collections.rb', line 608
# Instantaneous merge operator. Only legal while the toplevel Bud instance
# reports inside_tick; otherwise a Bud::CompileError is raised.
# Returns whatever merge(collection) returns.
def <=(collection)
  if bud_instance.toplevel.inside_tick
    merge(collection)
  else
    raise Bud::CompileError, "illegal use of <= outside of bloom block, use <+ instead"
  end
end
|
- (Object) [](k)
366
367
368
369
370
371
372
|
# File 'lib/bud/collections.rb', line 366
# Look up the tuple stored under key +k+.
# The key is first validated with check_enumerable; @storage is consulted
# first and @delta is used only when @storage has no (nil) entry.
def [](k)
  check_enumerable(k)
  stored = @storage[k]
  return stored unless stored.nil?
  @delta[k]
end
|
- (Object) add_rescan_invalidate(rescan, invalidate)
688
689
690
691
692
|
# File 'lib/bud/collections.rb', line 688
# No-op in this class: the body is empty, so +rescan+ and +invalidate+ are
# left untouched and nil is returned.
# NOTE(review): the YARD gutter lists five source lines (688-692) for this
# method, so explanatory comments were probably dropped from this listing.
def add_rescan_invalidate(rescan, invalidate)
end
|
- (Object) argagg(aggname, gbkey_cols, collection, &blk)
753
754
755
756
757
758
759
760
|
# File 'lib/bud/collections.rb', line 753
# Run the aggregate +aggname+ through this collection's push element.
#
# gbkey_cols - grouping columns; each entry goes through canonicalize_col
#              (left as nil when nil).
# collection - the aggregated column; also passed through canonicalize_col.
#
# The element's result is extended with @cols_access before being returned.
def argagg(aggname, gbkey_cols, collection, &blk)
  pusher = to_push_elem
  unless gbkey_cols.nil?
    gbkey_cols = gbkey_cols.map { |c| canonicalize_col(c) }
  end
  result = pusher.argagg(aggname, gbkey_cols, canonicalize_col(collection), &blk)
  result.extend @cols_access
  result
end
|
- (Object) argmax(gbkey_cols, col, &blk)
774
775
776
|
# File 'lib/bud/collections.rb', line 774
# Shorthand for argagg with the :max aggregate over +col+, grouped by
# +gbkey_cols+; the block is forwarded unchanged.
def argmax(gbkey_cols, col, &blk)
argagg(:max, gbkey_cols, col, &blk)
end
|
- (Object) argmin(gbkey_cols, col, &blk)
766
767
768
|
# File 'lib/bud/collections.rb', line 766
# Shorthand for argagg with the :min aggregate over +col+, grouped by
# +gbkey_cols+; the block is forwarded unchanged.
def argmin(gbkey_cols, col, &blk)
argagg(:min, gbkey_cols, col, &blk)
end
|
- (Object) bootstrap
694
695
696
697
698
699
|
# File 'lib/bud/collections.rb', line 694
# Promote any pending tuples: when @pending is non-empty it becomes the new
# @delta and @pending is replaced by a fresh empty hash. Does nothing (and
# returns nil) when @pending is empty.
def bootstrap
  return if @pending.empty?
  @delta = @pending
  @pending = {}
end
|
- (Object) canonicalize_col(col)
808
809
810
|
# File 'lib/bud/collections.rb', line 808
# Resolve a column reference: a Symbol is treated as a method name and sent
# to this object; any other value is returned as-is.
def canonicalize_col(col)
  if col.class <= Symbol
    send(col)
  else
    col
  end
end
|
- (Object) close
353
354
|
# File 'lib/bud/collections.rb', line 353
# No-op: the body is empty, so nothing happens on close and nil is returned.
# (The listing fused `def close` and `end` onto one line, which does not
# parse without a terminator after the method name.)
def close
end
|
- (Object) do_insert(t, store)
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
|
# File 'lib/bud/collections.rb', line 447
# Insert tuple +t+ into the buffer +store+.
#
# When $BUD_DEBUG is set, a trace line naming the target buffer is printed
# first. A nil tuple is ignored. Otherwise the tuple is normalized with
# prep_tuple, its key is computed with get_key_vals, and merge_to_buf is
# asked to place it, given whatever +store+ currently holds under that key.
def do_insert(t, store)
  if $BUD_DEBUG
    # Identify the buffer by object identity so the trace can name it.
    storetype = case store.object_id
                when @storage.object_id then "storage"
                when @pending.object_id then "pending"
                when @delta.object_id then "delta"
                when @new_delta.object_id then "new_delta"
                end
    puts "#{qualified_tabname}.#{storetype} ==> #{t.inspect}"
  end
  # These were fused onto a single line in the listing, which turned the
  # assignment into an argument of nil?; they are two separate statements.
  return if t.nil?
  t = prep_tuple(t)
  key = get_key_vals(t)
  merge_to_buf(store, key, t, store[key])
end
|
- (Object) each(&block)
272
273
274
|
# File 'lib/bud/collections.rb', line 272
# Iterate over the collection's tuples by handing the @storage and @delta
# buffers (in that order) plus the caller's block to each_from.
def each(&block)
  each_from([@storage, @delta], &block)
end
|
- (Object) each_from_sym(buf_syms, &block)
323
324
325
326
327
328
329
330
331
332
333
|
# File 'lib/bud/collections.rb', line 323
# Iterate over the buffers named by +buf_syms+.
# Accepted names are :storage, :delta and :new_delta; any other value raises
# Bud::Error. The resolved buffers and the block are handed to each_from.
def each_from_sym(buf_syms, &block)
  selected = buf_syms.map do |name|
    case name
    when :storage
      @storage
    when :delta
      @delta
    when :new_delta
      @new_delta
    else
      raise Bud::Error, "bad symbol passed into each_from_sym"
    end
  end
  each_from(selected, &block)
end
|
- (Object) each_raw(&block)
277
278
279
|
# File 'lib/bud/collections.rb', line 277
# Yield every value held in @storage to the block; @delta is not visited.
def each_raw(&block)
@storage.each_value(&block)
end
|
- (Object) each_tick_delta(&block)
282
283
284
|
# File 'lib/bud/collections.rb', line 282
# Yield every element of @tick_delta to the block.
def each_tick_delta(&block)
@tick_delta.each(&block)
end
|
- (Object) each_with_index(the_name = tabname, the_schema = schema, &blk)
XXX: Although we support each_with_index over Bud collections, using it is
probably not a great idea: the index assigned to a given collection member
is not defined by the language semantics.
215
216
217
218
219
220
221
222
|
# File 'lib/bud/collections.rb', line 215
# XXX: Although each_with_index is supported over Bud collections, using it
# is probably not a great idea: the index assigned to a given collection
# member is not defined by the language semantics.
#
# While the Bud instance is wiring, the call is forwarded to the push element
# for (the_name, the_schema); otherwise the inherited implementation runs.
def each_with_index(the_name=tabname, the_schema=schema, &blk)
  return super(&blk) unless @bud_instance.wiring?
  to_push_elem(the_name, the_schema).each_with_index(&blk)
end
|
- (Boolean) empty?
387
388
389
|
# File 'lib/bud/collections.rb', line 387
# True when the collection holds no tuples, i.e. length reports zero.
def empty?
  length.zero?
end
|
- (Boolean) exists?(&block)
393
394
395
396
397
398
399
400
401
|
# File 'lib/bud/collections.rb', line 393
# Existence test.
# Without a block: true iff the collection is non-empty.
# With a block: true iff the collection is non-empty and detect finds a
# tuple for which the block yields a truthy value.
def exists?(&block)
  return false if length == 0
  return true unless block_given?
  found = detect { |t| yield t }
  !found.nil?
end
|
- (Object) flat_map(&blk)
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
|
# File 'lib/bud/collections.rb', line 229
# Map the block over the collection and flatten one level.
#
# Outside wiring mode this is simply @storage.flat_map. While wiring, a
# Bud::PushElement is created and fed from pro(&blk); its block pushes each
# member of every incoming value downstream individually and returns nil.
# The element is registered in the toplevel's push_elems under
# [object_id, :flatten] and returned.
def flat_map(&blk)
  return @storage.flat_map(&blk) unless @bud_instance.wiring?

  source = pro(&blk)
  top = @bud_instance.toplevel
  flattener = Bud::PushElement.new(tabname, top.this_rule_context, tabname)
  source.wire_to(flattener)
  flattener.set_block do |t|
    t.each { |i| flattener.push_out(i, false) }
    nil
  end
  top.push_elems[[object_id, :flatten]] = flattener
  flattener
end
|
- (Object) flush
628
|
# File 'lib/bud/collections.rb', line 628
# No-op: the body is empty, so flushing does nothing in this class.
def flush ; end
|
- (Object) flush_deltas
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
|
# File 'lib/bud/collections.rb', line 702
# Fold both delta buffers into @storage.
#
# @delta is merged first (its values are also appended to @tick_delta when
# accumulate_tick_deltas is truthy) and then cleared; @new_delta is merged
# and cleared afterwards. With $BUD_DEBUG set, a trace line is printed for
# each non-empty buffer before anything is moved.
def flush_deltas
  if $BUD_DEBUG
    puts "#{qualified_tabname}.flush delta --> storage" unless @delta.empty?
    puts "#{qualified_tabname}.flush new_delta --> storage" unless @new_delta.empty?
  end

  if !@delta.empty?
    @storage.merge!(@delta)
    @tick_delta.concat(@delta.values) if accumulate_tick_deltas
    @delta.clear
  end

  return if @new_delta.empty?
  @storage.merge!(@new_delta)
  @new_delta.clear
end
|
- (Object) group(key_cols, *aggpairs, &blk)
796
797
798
799
800
|
# File 'lib/bud/collections.rb', line 796
# Group this collection via its push element.
#
# key_cols - grouping columns; each is run through canonicalize_col (nil is
#            passed along untouched).
# aggpairs - aggregate specifications, normalized by prep_aggpairs.
def group(key_cols, *aggpairs, &blk)
  unless key_cols.nil?
    key_cols = key_cols.map { |k| canonicalize_col(k) }
  end
  # Normalize the aggregates before the push element is requested, matching
  # the original evaluation order.
  pairs = prep_aggpairs(aggpairs)
  to_push_elem.group(key_cols, *pairs, &blk)
end
|
- (Boolean) has_key?(k)
358
359
360
361
362
|
# File 'lib/bud/collections.rb', line 358
# True iff +k+ is non-nil and self[k] finds a tuple for it.
# The key is validated with check_enumerable first.
def has_key?(k)
  check_enumerable(k)
  !(k.nil? || self[k].nil?)
end
|
- (Boolean) include?(item)
376
377
378
379
380
381
|
# File 'lib/bud/collections.rb', line 376
# Membership test for +item+.
#
# Returns true outright when key_cols is nil, or when key_cols is empty and
# the collection is non-empty. A nil item is never a member. Otherwise the
# item's key is computed with get_key_vals and the tuple stored under that
# key is compared with the item.
def include?(item)
  return true if key_cols.nil? || (key_cols.empty? && length > 0)
  return false if item.nil?
  item == self[get_key_vals(item)]
end
|
- (Object) init_schema(given_schema)
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
# File 'lib/bud/collections.rb', line 46
# Set up this collection's schema-related state from +given_schema+.
#
# given_schema - schema description understood by BudCollection.parse_schema;
#                nil/false is replaced by the default {[:key]=>[:val]}.
#
# Raises Bud::CompileError when any column name starts with "@".
# Returns whatever setup_accessors returns.
def init_schema(given_schema)
# Fall back to a single key column and a single value column.
given_schema ||= {[:key]=>[:val]}
@given_schema = given_schema
@cols, @key_cols = BudCollection.parse_schema(given_schema)
# Reject column names carrying the "@" location-specifier prefix.
@cols.each do |s|
if s.to_s.start_with? "@"
raise Bud::CompileError, "illegal use of location specifier (@) in column #{s} of non-channel collection #{tabname}"
end
end
# Positions of the key and value columns within @cols. These must be
# computed before @cols can be reset to nil below.
@key_colnums = @key_cols.map {|k| @cols.index(k)}
@val_colnums = val_cols.map {|k| @cols.index(k)}
if @cols.empty?
# An empty column list is represented as nil.
@cols = nil
else
# Reuse the tuple struct class cached in $struct_classes for this column
# list, creating it on first use.
@struct = ($struct_classes[@cols] ||= Bud::TupleStruct.new(*@cols))
@structlen = @struct.members.length
end
setup_accessors
end
|
- (Object) insert(o, source = nil)
506
507
508
509
|
# File 'lib/bud/collections.rb', line 506
# Insert +o+ directly into @storage via do_insert.
# The +source+ argument is accepted but not used by this implementation.
def insert(o, source=nil)
  do_insert(o, @storage)
end
|
- (Object) inspect
76
77
78
|
# File 'lib/bud/collections.rb', line 76
# Short description of the collection: its class, its object_id rendered in
# hexadecimal, and its qualified table name in square brackets.
def inspect
"#{self.class}:#{self.object_id.to_s(16)} [#{qualified_tabname}]"
end
|
- (Object) inspected
190
191
192
|
# File 'lib/bud/collections.rb', line 190
# Project every tuple (through pro) to a one-element array holding the
# tuple's inspect string.
def inspected
  pro do |tuple|
    [tuple.inspect]
  end
end
|
- (Object) invalidate_at_tick
287
288
289
|
# File 'lib/bud/collections.rb', line 287
# Always answers true in this class.
def invalidate_at_tick
  true
end
|
- (Object) keys
178
179
180
|
# File 'lib/bud/collections.rb', line 178
# Project every tuple (through pro) to its key values as computed by
# get_key_vals.
def keys
  pro do |tuple|
    get_key_vals(tuple)
  end
end
|
- (Object) length
383
384
385
|
# File 'lib/bud/collections.rb', line 383
# Number of tuples: the sizes of @storage and @delta added together.
def length
  [@storage, @delta].inject(0) { |total, buf| total + buf.length }
end
|
- (Object) merge(o, buf = @delta)
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
|
# File 'lib/bud/collections.rb', line 570
# Merge +o+ into this collection.
#
# Dataflow sources (push elements, other collections, procs, lattice push
# elements and lattice wrappers) are wired to this collection after
# add_merge_target is called; for push elements and collections the schema
# is deduced from the source when @cols is still nil. Any other non-nil
# value is treated as an enumerable of tuples: it is de-duplicated and
# compacted when it responds to uniq, validated with check_enumerable, and
# every element is inserted into +buf+ (default: @delta). nil is ignored.
#
# The branches are tested in the same order as the original class checks.
def merge(o, buf=@delta)
  case o
  when Bud::PushElement
    add_merge_target
    deduce_schema(o) if @cols.nil?
    o.wire_to self
  when Bud::BudCollection
    add_merge_target
    deduce_schema(o) if @cols.nil?
    o.pro.wire_to self
  when Proc
    add_merge_target
    tbl = register_coll_expr(o)
    tbl.pro.wire_to self
  when Bud::LatticePushElement
    add_merge_target
    o.wire_to self
  when Bud::LatticeWrapper
    add_merge_target
    o.to_push_elem.wire_to self
  else
    unless o.nil?
      o = o.uniq.compact if o.respond_to?(:uniq)
      check_enumerable(o)
      establish_schema(o) if @cols.nil?
      o.each { |tup| do_insert(tup, buf) }
    end
  end
end
|
- (Object) non_temporal_predecessors
292
293
294
|
# File 'lib/bud/collections.rb', line 292
# Elements of @wired_by that list this collection among their outputs.
def non_temporal_predecessors
  @wired_by.select do |elem|
    elem.outputs.include?(self)
  end
end
|
- (Object) notin(collection, *preds, &blk)
802
803
804
805
806
|
# File 'lib/bud/collections.rb', line 802
# Anti-join: converts both this collection and +collection+ to push
# elements and returns the notin of the first against the second,
# forwarding +preds+ and the block.
def notin(collection, *preds, &blk)
  to_push_elem.notin(collection.to_push_elem, *preds, &blk)
end
|
- (Object) null_tuple
172
173
174
|
# File 'lib/bud/collections.rb', line 172
# A fresh instance of the collection's tuple struct (@struct) built with no
# arguments.
def null_tuple
@struct.new
end
|
- (Object) pending_merge(o)
618
619
620
621
622
623
624
625
|
# File 'lib/bud/collections.rb', line 618
# Queue the tuples in +o+ in the @pending buffer.
# nil is ignored. Otherwise +o+ is de-duplicated and compacted when it
# responds to uniq, validated with check_enumerable, used to establish the
# schema when @cols is still nil, and each element is inserted into @pending.
def pending_merge(o)
  return if o.nil?
  o = o.uniq.compact if o.respond_to?(:uniq)
  check_enumerable(o)
  establish_schema(o) if @cols.nil?
  o.each { |tup| do_insert(tup, @pending) }
end
|
- (Object) positive_predecessors
297
298
299
|
# File 'lib/bud/collections.rb', line 297
# Elements of @wired_by that list this collection among their outputs or
# among their pendings.
def positive_predecessors
  @wired_by.select do |elem|
    elem.outputs.include?(self) || elem.pendings.include?(self)
  end
end
|
- (Object) prep_aggpairs(aggpairs)
785
786
787
788
789
790
791
792
793
794
|
# File 'lib/bud/collections.rb', line 785
# Normalize aggregate specifications: each entry keeps its first element
# (the aggregate) and has every remaining element passed through
# canonicalize_col. An entry with nothing after the aggregate becomes [agg].
def prep_aggpairs(aggpairs)
  aggpairs.map do |pair|
    agg, *args = pair
    # With no trailing arguments the mapped list is empty, leaving [agg].
    [agg] + args.map { |c| canonicalize_col(c) }
  end
end
|
- (Object) pro(the_name = tabname, the_schema = schema, &blk)
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
|
# File 'lib/bud/collections.rb', line 196
# Projection over the collection.
#
# While the Bud instance is wiring, returns the push element for
# (the_name, the_schema), further projected through the block when one is
# given. Otherwise iterates the tuples immediately and returns an array of
# the block's non-nil results.
#
# When no block is given outside wiring mode, tuples are passed through
# unchanged (mirroring the wiring branch, which also treats a missing block
# as identity) instead of failing on a nil block.
def pro(the_name=tabname, the_schema=schema, &blk)
  if @bud_instance.wiring?
    pusher = to_push_elem(the_name, the_schema)
    pusher = pusher.pro(&blk) unless blk.nil?
    pusher
  else
    rv = []
    each do |t|
      t = blk.call(t) unless blk.nil?
      rv << t unless t.nil?
    end
    rv
  end
end
|
- (Object) qualified_tabname
72
73
74
|
# File 'lib/bud/collections.rb', line 72
# The table name qualified by the owning Bud instance; computed once and
# cached in @qualified_tabname.
# For a toplevel instance this is tabname itself; otherwise it is the symbol
# "<qualified_name>.<tabname>".
def qualified_tabname
  return @qualified_tabname if @qualified_tabname
  @qualified_tabname =
    if @bud_instance.toplevel?
      tabname
    else
      "#{@bud_instance.qualified_name}.#{tabname}".to_sym
    end
end
|
- (Object) reduce(initial, &blk)
812
813
814
|
# File 'lib/bud/collections.rb', line 812
# Delegates to reduce on this collection's push element, forwarding
# +initial+ and the block.
def reduce(initial, &blk)
  to_push_elem.reduce(initial, &blk)
end
|
- (Object) register_coll_expr(expr)
599
600
601
602
603
604
|
# File 'lib/bud/collections.rb', line 599
# Register +expr+ as a collection expression on the Bud instance and return
# the collection it is exposed as.
# The collection is named "expr_<object_id of expr>". When @cols is known,
# positional column names :c1..:cN (N = @cols.length) are supplied;
# otherwise nil is passed for the columns.
def register_coll_expr(expr)
  coll_name = "expr_#{expr.object_id}"
  col_names =
    if @cols.nil?
      nil
    else
      (1..@cols.length).map { |i| "c#{i}".to_sym }
    end
  @bud_instance.coll_expr(coll_name.to_sym, expr, col_names)
  @bud_instance.send(coll_name)
end
|
- (Object) rename(the_name, the_schema = nil, &blk)
259
260
261
262
263
264
265
266
|
# File 'lib/bud/collections.rb', line 259
# Project this collection under another name/schema (see pro).
#
# Raises Bud::Error when the Bud instance is not wiring, and also when the
# Bud instance does not respond to +the_name+.
def rename(the_name, the_schema=nil, &blk)
  raise Bud::Error unless @bud_instance.wiring?
  if !@bud_instance.respond_to?(the_name)
    raise Bud::Error, "rename failed to define a scratch named #{the_name}"
  end
  pro(the_name, the_schema, &blk)
end
|
- (Object) schema
110
111
112
113
114
|
# File 'lib/bud/collections.rb', line 110
# The collection's schema in declaration form:
# nil when @cols is nil, the key columns alone when there are no value
# columns, otherwise a one-entry hash { key_cols => val_cols }.
def schema
  if @cols.nil?
    nil
  elsif val_cols.empty?
    key_cols
  else
    { key_cols => val_cols }
  end
end
|
- (Object) sort(&blk)
250
251
252
253
254
255
256
257
|
# File 'lib/bud/collections.rb', line 250
# Sort the collection.
# Outside wiring mode the values of @storage are sorted with the block.
# While wiring, the sort is delegated to the element returned by pro, named
# "sort<object_id>" and given the Bud instance and @cols.
def sort(&blk)
  return @storage.values.sort(&blk) unless @bud_instance.wiring?
  pro.sort("sort#{object_id}", @bud_instance, @cols, &blk)
end
|
- (Object) tick
658
659
660
|
# File 'lib/bud/collections.rb', line 658
# Always raises Bud::Error naming the receiver's class: this implementation
# exists only to signal that subclasses must provide their own tick.
def tick
raise Bud::Error, "tick must be overriden in #{self.class}"
end
|
- (Object) tick_deltas
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
|
# File 'lib/bud/collections.rb', line 665
# Advance the delta buffers by one step.
#
# 1. A non-empty @delta is merged into @storage (its values are appended to
#    @tick_delta when accumulate_tick_deltas is truthy) and then cleared.
# 2. A non-empty @new_delta is moved, entry by entry, into @delta through
#    merge_to_buf (which also receives the tuple already in @storage under
#    the same key), and then cleared.
#
# Returns true when step 2 ran and left @delta non-empty, false otherwise.
# With $BUD_DEBUG set, a trace line is printed for each step performed.
def tick_deltas
  unless @delta.empty?
    puts "#{qualified_tabname}.tick_deltas delta --> storage (#{@delta.size} elems)" if $BUD_DEBUG
    @storage.merge!(@delta)
    @tick_delta.concat(@delta.values) if accumulate_tick_deltas
    @delta.clear
  end

  unless @new_delta.empty?
    puts "#{qualified_tabname}.tick_deltas new_delta --> delta (#{@new_delta.size} elems)" if $BUD_DEBUG
    @new_delta.each_pair do |key, tup|
      merge_to_buf(@delta, key, tup, @storage[key])
    end
    @new_delta.clear
    return !(@delta.empty?)
  end

  false
end
|
- (Object) tick_metrics
302
303
304
305
306
307
308
309
310
|
# File 'lib/bud/collections.rb', line 302
# Bump the per-collection counter kept in bud_instance.metrics[:collections].
# The counter key is a hash of the instance address (ip_port, or nil when the
# instance has no port), the qualified table name, and the current stratum
# and rule numbers. Returns the updated count.
def tick_metrics
  strat_num = bud_instance.this_stratum
  rule_num = bud_instance.this_rule
  addr = bud_instance.port.nil? ? nil : bud_instance.ip_port
  counters = (bud_instance.metrics[:collections] ||= {})
  key = {:addr=>addr, :tabname=>qualified_tabname, :strat_num=>strat_num, :rule_num=>rule_num}
  counters[key] ||= 0
  counters[key] += 1
end
|
- (Object) to_push_elem(the_name = tabname, the_schema = schema)
720
721
722
723
724
725
726
727
728
729
730
731
732
733
|
# File 'lib/bud/collections.rb', line 720
# Return the scanner element for this collection under +the_name+ in the
# toplevel's current stratum, creating it on first request.
# A new Bud::ScannerElement is registered in both toplevel.scanners and
# toplevel.push_sources under [object_id, the_name], and @scanner_cnt is
# incremented.
def to_push_elem(the_name=tabname, the_schema=schema)
  toplevel = @bud_instance.toplevel
  stratum = toplevel.this_stratum
  key = [object_id, the_name]
  scanners = toplevel.scanners[stratum]
  unless scanners[key]
    scanner = Bud::ScannerElement.new(the_name, @bud_instance,
                                      self, the_schema)
    scanners[key] = scanner
    toplevel.push_sources[stratum][key] = scanner
    @scanner_cnt += 1
  end
  scanners[key]
end
|
- (Object) uniquify_tabname
817
818
819
820
|
# File 'lib/bud/collections.rb', line 817
# Make @tabname (probabilistically) unique by appending the microsecond part
# of the current time to it; the result is stored back as a Symbol and
# returned.
# (The listing fused the body onto the `def` line, which does not parse for
# a method declared without parentheses.)
def uniquify_tabname
  @tabname = (@tabname.to_s + Time.new.tv_usec.to_s).to_sym
end
|
- (Object) val_cols
118
119
120
|
# File 'lib/bud/collections.rb', line 118
# The non-key columns: @cols with every member of @key_cols removed.
# (The listing fused the body onto the `def` line, which does not parse for
# a method declared without parentheses.)
def val_cols
  @cols - @key_cols
end
|
- (Object) values
184
185
186
|
# File 'lib/bud/collections.rb', line 184
# Project every tuple (through pro) to its non-key fields: the fields at
# positions key_cols.length up to cols.length - 1. The bounds are evaluated
# each time the block runs, as in the original.
def values
  pro do |tuple|
    (key_cols.length...cols.length).map { |i| tuple[i] }
  end
end
|