Class: Bud::BudCollection

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/bud/collections.rb

Overview

The collection types. Each collection is partitioned into five buffers:

  • pending holds tuples deferred until the next tick

  • storage holds the “normal” tuples

  • delta holds the delta for the rhs's of rules during semi-naive evaluation

  • new_delta will hold the lhs tuples currently being produced during semi-naive evaluation

  • tick_delta holds Union(delta_i) for each delta_i processed in fixpoint iteration i.

++

Direct Known Subclasses

BudChannel, BudPersistentCollection, BudReadOnly, BudScratch, PushElement

Instance Attribute Summary (collapse)

Instance Method Summary (collapse)

Constructor Details

- (BudCollection) initialize(name, bud_instance, given_schema = nil, defer_schema = false)

:nodoc: all



26
27
28
29
30
31
32
33
34
35
36
# File 'lib/bud/collections.rb', line 26

def initialize(name, bud_instance, given_schema=nil, defer_schema=false) # :nodoc: all
  @tabname = name
  @bud_instance = bud_instance
  @invalidated = true
  @is_source = true # unless it shows up on the lhs of some rule
  @scanner_cnt = 0
  @wired_by = []
  @accumulate_tick_deltas = false
  init_schema(given_schema) unless given_schema.nil? and defer_schema
  init_buffers
end

Instance Attribute Details

- (Object) accumulate_tick_deltas

updated in bud.do_wiring



24
25
26
# File 'lib/bud/collections.rb', line 24

def accumulate_tick_deltas
  @accumulate_tick_deltas
end

- (Object) bud_instance

:nodoc: all



17
18
19
# File 'lib/bud/collections.rb', line 17

def bud_instance
  @bud_instance
end

- (Object) cols (readonly)

:nodoc: all



18
19
20
# File 'lib/bud/collections.rb', line 18

def cols
  @cols
end

- (Object) delta (readonly)

:nodoc: all



20
21
22
# File 'lib/bud/collections.rb', line 20

def delta
  @delta
end

- (Object) invalidated

Returns the value of attribute invalidated



22
23
24
# File 'lib/bud/collections.rb', line 22

def invalidated
  @invalidated
end

- (Object) is_source

Returns the value of attribute is_source



23
24
25
# File 'lib/bud/collections.rb', line 23

def is_source
  @is_source
end

- (Object) key_cols (readonly)

:nodoc: all



18
19
20
# File 'lib/bud/collections.rb', line 18

def key_cols
  @key_cols
end

- (Object) new_delta (readonly)

:nodoc: all



20
21
22
# File 'lib/bud/collections.rb', line 20

def new_delta
  @new_delta
end

- (Object) pending (readonly)

:nodoc: all



20
21
22
# File 'lib/bud/collections.rb', line 20

def pending
  @pending
end

- (Object) rescan

Returns the value of attribute rescan



22
23
24
# File 'lib/bud/collections.rb', line 22

def rescan
  @rescan
end

- (Object) scanner_cnt (readonly)

Returns the value of attribute scanner_cnt



21
22
23
# File 'lib/bud/collections.rb', line 21

def scanner_cnt
  @scanner_cnt
end

- (Object) storage (readonly)

:nodoc: all



20
21
22
# File 'lib/bud/collections.rb', line 20

def storage
  @storage
end

- (Object) struct (readonly)

Returns the value of attribute struct



19
20
21
# File 'lib/bud/collections.rb', line 19

def struct
  @struct
end

- (Object) tabname (readonly)

:nodoc: all



18
19
20
# File 'lib/bud/collections.rb', line 18

def tabname
  @tabname
end

- (Object) tick_delta (readonly)

:nodoc: all



20
21
22
# File 'lib/bud/collections.rb', line 20

def tick_delta
  @tick_delta
end

- (Object) wired_by (readonly)

Returns the value of attribute wired_by



21
22
23
# File 'lib/bud/collections.rb', line 21

def wired_by
  @wired_by
end

Instance Method Details

- (Object) *(collection)



781
782
783
# File 'lib/bud/collections.rb', line 781

def *(collection)
  return to_push_elem.join(collection)
end

- (Object) <<(item)

instantaneously place an individual item from rhs into collection on lhs



512
513
514
# File 'lib/bud/collections.rb', line 512

def <<(item)
  insert(item)
end

- (Object) <=(collection)

instantaneously merge items from collection o into buf



608
609
610
611
612
613
614
# File 'lib/bud/collections.rb', line 608

def <=(collection)
  unless bud_instance.toplevel.inside_tick
    raise Bud::CompileError, "illegal use of <= outside of bloom block, use <+ instead"
  end

  merge(collection)
end

- (Object) [](k)



366
367
368
369
370
371
372
# File 'lib/bud/collections.rb', line 366

def [](k)
  # assumes that key is in storage or delta, but not both
  # is this enforced in do_insert?
  check_enumerable(k)
  t = @storage[k]
  return t.nil? ? @delta[k] : t
end

- (Object) add_rescan_invalidate(rescan, invalidate)



688
689
690
691
692
# File 'lib/bud/collections.rb', line 688

def add_rescan_invalidate(rescan, invalidate)
  # No change. Most collections don't need to rescan on every tick (only do
  # so on negate). Also, there's no cache to invalidate by default.
  # Scratches and PushElements override this method.
end

- (Object) argagg(aggname, gbkey_cols, collection, &blk)



753
754
755
756
757
758
759
760
# File 'lib/bud/collections.rb', line 753

def argagg(aggname, gbkey_cols, collection, &blk)
  elem = to_push_elem
  gbkey_cols = gbkey_cols.map{|k| canonicalize_col(k)} unless gbkey_cols.nil?
  retval = elem.argagg(aggname, gbkey_cols, canonicalize_col(collection), &blk)
  # PushElement inherits the schema accessors from this Collection
  retval.extend @cols_access
  retval
end

- (Object) argmax(gbkey_cols, col, &blk)



774
775
776
# File 'lib/bud/collections.rb', line 774

def argmax(gbkey_cols, col, &blk)
  argagg(:max, gbkey_cols, col, &blk)
end

- (Object) argmin(gbkey_cols, col, &blk)



766
767
768
# File 'lib/bud/collections.rb', line 766

def argmin(gbkey_cols, col, &blk)
  argagg(:min, gbkey_cols, col, &blk)
end

- (Object) bootstrap



694
695
696
697
698
699
# File 'lib/bud/collections.rb', line 694

def bootstrap
  unless @pending.empty?
    @delta = @pending
    @pending = {}
  end
end

- (Object) canonicalize_col(col)



808
809
810
# File 'lib/bud/collections.rb', line 808

def canonicalize_col(col)
  col.class <= Symbol ? self.send(col) : col
end

- (Object) close

:nodoc: all



353
354
# File 'lib/bud/collections.rb', line 353

def close # :nodoc: all
end

- (Object) do_insert(t, store)



447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
# File 'lib/bud/collections.rb', line 447

def do_insert(t, store)
  if $BUD_DEBUG
    storetype = case store.object_id
                  when @storage.object_id; "storage"
                  when @pending.object_id; "pending"
                  when @delta.object_id; "delta"
                  when @new_delta.object_id; "new_delta"
                end
    puts "#{qualified_tabname}.#{storetype} ==> #{t.inspect}"
  end
  return if t.nil? # silently ignore nils resulting from map predicates failing
  t = prep_tuple(t)
  key = get_key_vals(t)
  merge_to_buf(store, key, t, store[key])
end

- (Object) each(&block)

:nodoc: all



272
273
274
# File 'lib/bud/collections.rb', line 272

def each(&block) # :nodoc: all
  each_from([@storage, @delta], &block)
end

- (Object) each_from_sym(buf_syms, &block)

:nodoc: all



323
324
325
326
327
328
329
330
331
332
333
# File 'lib/bud/collections.rb', line 323

def each_from_sym(buf_syms, &block) # :nodoc: all
  bufs = buf_syms.map do |s|
    case s
    when :storage then @storage
    when :delta then @delta
    when :new_delta then @new_delta
    else raise Bud::Error, "bad symbol passed into each_from_sym"
    end
  end
  each_from(bufs, &block)
end

- (Object) each_raw(&block)



277
278
279
# File 'lib/bud/collections.rb', line 277

def each_raw(&block)
  @storage.each_value(&block)
end

- (Object) each_tick_delta(&block)



282
283
284
# File 'lib/bud/collections.rb', line 282

def each_tick_delta(&block)
  @tick_delta.each(&block)
end

- (Object) each_with_index(the_name = tabname, the_schema = schema, &blk)

XXX: Although we support each_with_index over Bud collections, using it is probably not a great idea: the index assigned to a given collection member is not defined by the language semantics.



215
216
217
218
219
220
221
222
# File 'lib/bud/collections.rb', line 215

def each_with_index(the_name=tabname, the_schema=schema, &blk)
  if @bud_instance.wiring?
    pusher = to_push_elem(the_name, the_schema)
    pusher.each_with_index(&blk)
  else
    super(&blk)
  end
end

- (Boolean) empty?

Returns:

  • (Boolean)


387
388
389
# File 'lib/bud/collections.rb', line 387

def empty?
  length == 0
end

- (Boolean) exists?(&block)

Returns:

  • (Boolean)


393
394
395
396
397
398
399
400
401
# File 'lib/bud/collections.rb', line 393

def exists?(&block)
  if length == 0
    return false
  elsif not block_given?
    return true
  else
    return ((detect{|t| yield t}).nil?) ? false : true
  end
end

- (Object) flat_map(&blk)



229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
# File 'lib/bud/collections.rb', line 229

def flat_map(&blk)
  if @bud_instance.wiring?
    pusher = self.pro(&blk)
    toplevel = @bud_instance.toplevel
    elem = Bud::PushElement.new(tabname, toplevel.this_rule_context, tabname)
    pusher.wire_to(elem)
    f = Proc.new do |t|
      t.each do |i|
        elem.push_out(i, false)
      end
      nil
    end
    elem.set_block(&f)
    toplevel.push_elems[[self.object_id, :flatten]] = elem
    elem
  else
    @storage.flat_map(&blk)
  end
end

- (Object) flush



628
# File 'lib/bud/collections.rb', line 628

def flush ; end

- (Object) flush_deltas



702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
# File 'lib/bud/collections.rb', line 702

def flush_deltas
  if $BUD_DEBUG
    puts "#{qualified_tabname}.flush delta --> storage" unless @delta.empty?
    puts "#{qualified_tabname}.flush new_delta --> storage" unless @new_delta.empty?
  end
  unless @delta.empty?
    @storage.merge!(@delta)
    @tick_delta.concat(@delta.values) if accumulate_tick_deltas
    @delta.clear
  end
  unless @new_delta.empty?
    @storage.merge!(@new_delta)
    @new_delta.clear
  end
  # @tick_delta kept around for higher strata.
end

- (Object) group(key_cols, *aggpairs, &blk)



796
797
798
799
800
# File 'lib/bud/collections.rb', line 796

def group(key_cols, *aggpairs, &blk)
  key_cols = key_cols.map{|k| canonicalize_col(k)} unless key_cols.nil?
  aggpairs = prep_aggpairs(aggpairs)
  return to_push_elem.group(key_cols, *aggpairs, &blk)
end

- (Boolean) has_key?(k)

Returns:

  • (Boolean)


358
359
360
361
362
# File 'lib/bud/collections.rb', line 358

def has_key?(k)
  check_enumerable(k)
  return false if k.nil? or self[k].nil?
  return true
end

- (Boolean) include?(item)

Returns:

  • (Boolean)


376
377
378
379
380
381
# File 'lib/bud/collections.rb', line 376

def include?(item)
  return true if key_cols.nil? or (key_cols.empty? and length > 0)
  return false if item.nil?
  key = get_key_vals(item)
  return (item == self[key])
end

- (Object) init_schema(given_schema)



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/bud/collections.rb', line 46

def init_schema(given_schema)
  given_schema ||= {[:key]=>[:val]}
  @given_schema = given_schema
  @cols, @key_cols = BudCollection.parse_schema(given_schema)

  # Check that no location specifiers appear in the schema. In the case of
  # channels, the location specifier has already been stripped from the
  # user-specified schema.
  @cols.each do |s|
    if s.to_s.start_with? "@"
      raise Bud::CompileError, "illegal use of location specifier (@) in column #{s} of non-channel collection #{tabname}"
    end
  end

  @key_colnums = @key_cols.map {|k| @cols.index(k)}
  @val_colnums = val_cols.map {|k| @cols.index(k)}

  if @cols.empty?
    @cols = nil
  else
    @struct = ($struct_classes[@cols] ||= Bud::TupleStruct.new(*@cols))
    @structlen = @struct.members.length
  end
  setup_accessors
end

- (Object) insert(o, source = nil)

:nodoc: all



506
507
508
509
# File 'lib/bud/collections.rb', line 506

def insert(o, source=nil) # :nodoc: all
  # puts "insert: #{o} into #{qualified_tabname}"
  do_insert(o, @storage)
end

- (Object) inspect



76
77
78
# File 'lib/bud/collections.rb', line 76

def inspect
  "#{self.class}:#{self.object_id.to_s(16)} [#{qualified_tabname}]"
end

- (Object) inspected



190
191
192
# File 'lib/bud/collections.rb', line 190

def inspected
  self.pro{|t| [t.inspect]}
end

- (Object) invalidate_at_tick



287
288
289
# File 'lib/bud/collections.rb', line 287

def invalidate_at_tick
  true # being conservative here as a default.
end

- (Object) keys



178
179
180
# File 'lib/bud/collections.rb', line 178

def keys
  self.pro{|t| get_key_vals(t)}
end

- (Object) length



383
384
385
# File 'lib/bud/collections.rb', line 383

def length
  @storage.length + @delta.length
end

- (Object) merge(o, buf = @delta)

:nodoc: all



570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
# File 'lib/bud/collections.rb', line 570

def merge(o, buf=@delta) # :nodoc: all
  if o.class <= Bud::PushElement
    add_merge_target
    deduce_schema(o) if @cols.nil?
    o.wire_to self
  elsif o.class <= Bud::BudCollection
    add_merge_target
    deduce_schema(o) if @cols.nil?
    o.pro.wire_to self
  elsif o.class <= Proc
    add_merge_target
    tbl = register_coll_expr(o)
    tbl.pro.wire_to self
  elsif o.class <= Bud::LatticePushElement
    add_merge_target
    o.wire_to self
  elsif o.class <= Bud::LatticeWrapper
    add_merge_target
    o.to_push_elem.wire_to self
  else
    unless o.nil?
      o = o.uniq.compact if o.respond_to?(:uniq)
      check_enumerable(o)
      establish_schema(o) if @cols.nil?
      o.each {|i| do_insert(i, buf)}
    end
  end
end

- (Object) non_temporal_predecessors



292
293
294
# File 'lib/bud/collections.rb', line 292

def non_temporal_predecessors
  @wired_by.select {|e| e.outputs.include? self}
end

- (Object) notin(collection, *preds, &blk)



802
803
804
805
806
# File 'lib/bud/collections.rb', line 802

def notin(collection, *preds, &blk)
  elem1 = to_push_elem
  elem2 = collection.to_push_elem
  return elem1.notin(elem2, *preds, &blk)
end

- (Object) null_tuple



172
173
174
# File 'lib/bud/collections.rb', line 172

def null_tuple
  @struct.new
end

- (Object) pending_merge(o)

:nodoc: all



618
619
620
621
622
623
624
625
# File 'lib/bud/collections.rb', line 618

def pending_merge(o) # :nodoc: all
  unless o.nil?
    o = o.uniq.compact if o.respond_to?(:uniq)
    check_enumerable(o)
    establish_schema(o) if @cols.nil?
    o.each{|i| self.do_insert(i, @pending)}
  end
end

- (Object) positive_predecessors



297
298
299
# File 'lib/bud/collections.rb', line 297

def positive_predecessors
  @wired_by.select {|e| e.outputs.include?(self) || e.pendings.include?(self)}
end

- (Object) prep_aggpairs(aggpairs)



785
786
787
788
789
790
791
792
793
794
# File 'lib/bud/collections.rb', line 785

def prep_aggpairs(aggpairs)
  aggpairs.map do |ap|
    agg, *rest = ap
    if rest.empty?
      [agg]
    else
      [agg] + rest.map {|c| canonicalize_col(c)}
    end
  end
end

- (Object) pro(the_name = tabname, the_schema = schema, &blk)



196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/bud/collections.rb', line 196

def pro(the_name=tabname, the_schema=schema, &blk)
  if @bud_instance.wiring?
    pusher = to_push_elem(the_name, the_schema)
    # If there is no code block evaluate, use the scanner directly
    pusher = pusher.pro(&blk) unless blk.nil?
    pusher
  else
    rv = []
    self.each do |t|
      t = blk.call(t)
      rv << t unless t.nil?
    end
    rv
  end
end

- (Object) qualified_tabname



72
73
74
# File 'lib/bud/collections.rb', line 72

def qualified_tabname
  @qualified_tabname ||= @bud_instance.toplevel?  ? tabname : "#{@bud_instance.qualified_name}.#{tabname}".to_sym
end

- (Object) reduce(initial, &blk)



812
813
814
# File 'lib/bud/collections.rb', line 812

def reduce(initial, &blk)
  return to_push_elem.reduce(initial, &blk)
end

- (Object) register_coll_expr(expr)



599
600
601
602
603
604
# File 'lib/bud/collections.rb', line 599

def register_coll_expr(expr)
  coll_name = "expr_#{expr.object_id}"
  cols = (1..@cols.length).map{|i| "c#{i}".to_sym} unless @cols.nil?
  @bud_instance.coll_expr(coll_name.to_sym, expr, cols)
  @bud_instance.send(coll_name)
end

- (Object) rename(the_name, the_schema = nil, &blk)

Raises:



259
260
261
262
263
264
265
266
# File 'lib/bud/collections.rb', line 259

def rename(the_name, the_schema=nil, &blk)
  raise Bud::Error unless @bud_instance.wiring?
  # a scratch with this name should have been defined during rewriting
  unless @bud_instance.respond_to? the_name
    raise Bud::Error, "rename failed to define a scratch named #{the_name}"
  end
  pro(the_name, the_schema, &blk)
end

- (Object) schema



110
111
112
113
114
# File 'lib/bud/collections.rb', line 110

def schema
  return nil if @cols.nil?
  return key_cols if val_cols.empty?
  return { key_cols => val_cols }
end

- (Object) sort(&blk)



250
251
252
253
254
255
256
257
# File 'lib/bud/collections.rb', line 250

def sort(&blk)
  if @bud_instance.wiring?
    pusher = self.pro
    pusher.sort("sort#{object_id}", @bud_instance, @cols, &blk)
  else
    @storage.values.sort(&blk)
  end
end

- (Object) tick

Raises:



658
659
660
# File 'lib/bud/collections.rb', line 658

def tick
  raise Bud::Error, "tick must be overriden in #{self.class}"
end

- (Object) tick_deltas

:nodoc: all



665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
# File 'lib/bud/collections.rb', line 665

def tick_deltas # :nodoc: all
  unless @delta.empty?
    puts "#{qualified_tabname}.tick_deltas delta --> storage (#{@delta.size} elems)" if $BUD_DEBUG
    @storage.merge!(@delta)
    @tick_delta.concat(@delta.values) if accumulate_tick_deltas
    @delta.clear
  end

  unless @new_delta.empty?
    puts "#{qualified_tabname}.tick_deltas new_delta --> delta (#{@new_delta.size} elems)" if $BUD_DEBUG

    # NB: key conflicts between two new_delta tuples are detected in
    # do_insert().
    @new_delta.each_pair do |key, tup|
      merge_to_buf(@delta, key, tup, @storage[key])
    end
    @new_delta.clear
    return !(@delta.empty?)
  end
  return false # delta empty; another fixpoint iter not required.
end

- (Object) tick_metrics



302
303
304
305
306
307
308
309
310
# File 'lib/bud/collections.rb', line 302

def tick_metrics
  strat_num = bud_instance.this_stratum
  rule_num = bud_instance.this_rule
  addr = nil
  addr = bud_instance.ip_port unless bud_instance.port.nil?
  bud_instance.metrics[:collections] ||= {}
  bud_instance.metrics[:collections][{:addr=>addr, :tabname=>qualified_tabname, :strat_num=>strat_num, :rule_num=>rule_num}] ||= 0
  bud_instance.metrics[:collections][{:addr=>addr, :tabname=>qualified_tabname, :strat_num=>strat_num, :rule_num=>rule_num}] += 1
end

- (Object) to_push_elem(the_name = tabname, the_schema = schema)



720
721
722
723
724
725
726
727
728
729
730
731
732
733
# File 'lib/bud/collections.rb', line 720

def to_push_elem(the_name=tabname, the_schema=schema)
  # if no push source yet, set one up
  toplevel = @bud_instance.toplevel
  this_stratum = toplevel.this_stratum
  oid = self.object_id
  unless toplevel.scanners[this_stratum][[oid, the_name]]
    scanner = Bud::ScannerElement.new(the_name, @bud_instance,
                                      self, the_schema)
    toplevel.scanners[this_stratum][[oid, the_name]] = scanner
    toplevel.push_sources[this_stratum][[oid, the_name]] = scanner
    @scanner_cnt += 1
  end
  return toplevel.scanners[this_stratum][[oid, the_name]]
end

- (Object) uniquify_tabname

:nodoc: all



817
818
819
820
# File 'lib/bud/collections.rb', line 817

def uniquify_tabname # :nodoc: all
  # just append current number of microseconds
  @tabname = (@tabname.to_s + Time.new.tv_usec.to_s).to_sym
end

- (Object) val_cols

:nodoc: all



118
119
120
# File 'lib/bud/collections.rb', line 118

def val_cols # :nodoc: all
  @cols - @key_cols
end

- (Object) values



184
185
186
# File 'lib/bud/collections.rb', line 184

def values
  self.pro{|t| (self.key_cols.length..self.cols.length-1).map{|i| t[i]}}
end