Class: Bud::BudCollection

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/bud/collections.rb

Overview

The collection types. Each collection is partitioned into 5 buffers:

  • pending holds tuples deferred until the next tick

  • storage holds the “normal” tuples

  • delta holds the delta for the rhs's of rules during semi-naive evaluation

  • new_delta will hold the lhs tuples currently being produced during semi-naive evaluation

  • tick_delta holds Union(delta_i) for each delta_i processed in fixpoint iteration i.

++

Direct Known Subclasses

BudChannel, BudPersistentCollection, BudReadOnly, BudScratch, PushElement

Instance Attribute Summary (collapse)

Instance Method Summary (collapse)

Constructor Details

- (BudCollection) initialize(name, bud_instance, given_schema = nil, defer_schema = false)

:nodoc: all



25
26
27
28
29
30
31
32
33
34
35
# File 'lib/bud/collections.rb', line 25

# Builds a collection named +name+ for the given +bud_instance+.
#
# The schema is set up immediately via init_schema, except when no schema was
# supplied and +defer_schema+ is true; the buffers are always set up via
# init_buffers.
def initialize(name, bud_instance, given_schema=nil, defer_schema=false) # :nodoc: all
  @bud_instance = bud_instance
  @tabname = name
  @wired_by = []
  @scanner_cnt = 0
  @accumulate_tick_deltas = false
  @is_source = true # unless it shows up on the lhs of some rule
  @invalidated = true
  postpone_schema = given_schema.nil? && defer_schema
  init_schema(given_schema) unless postpone_schema
  init_buffers
end

Instance Attribute Details

- (Object) accumulate_tick_deltas

updated in bud.do_wiring



23
24
25
# File 'lib/bud/collections.rb', line 23

# Flag read by flush_deltas and tick_deltas: when truthy, the values of
# @delta are also appended to @tick_delta as they are moved into storage.
# The constructor initializes it to false.
def accumulate_tick_deltas
  @accumulate_tick_deltas
end

- (Object) bud_instance

:nodoc: all



16
17
18
# File 'lib/bud/collections.rb', line 16

# Returns the bud_instance object that was passed to the constructor.
def bud_instance
  @bud_instance
end

- (Object) cols (readonly)

:nodoc: all



17
18
19
# File 'lib/bud/collections.rb', line 17

# Returns the schema's column list as computed by init_schema; nil when the
# schema has no columns or init_schema has not run yet.
def cols
  @cols
end

- (Object) invalidated

Returns the value of attribute invalidated



21
22
23
# File 'lib/bud/collections.rb', line 21

# Returns @invalidated, which the constructor initializes to true.
def invalidated
  @invalidated
end

- (Object) is_source

Returns the value of attribute is_source



22
23
24
# File 'lib/bud/collections.rb', line 22

# Returns @is_source. The constructor initializes it to true; per the
# constructor's comment it stays true unless the collection shows up on the
# lhs of some rule.
def is_source
  @is_source
end

- (Object) key_cols (readonly)

:nodoc: all



17
18
19
# File 'lib/bud/collections.rb', line 17

# Returns the key columns of the schema as computed by init_schema.
def key_cols
  @key_cols
end

- (Object) new_delta (readonly)

:nodoc: all



19
20
21
# File 'lib/bud/collections.rb', line 19

# Returns the @new_delta buffer, which tick_deltas drains into @delta and
# flush_deltas drains into @storage.
def new_delta
  @new_delta
end

- (Object) pending (readonly)

:nodoc: all



19
20
21
# File 'lib/bud/collections.rb', line 19

# Returns the @pending buffer, which pending_merge fills and bootstrap moves
# into @delta.
def pending
  @pending
end

- (Object) rescan

Returns the value of attribute rescan



21
22
23
# File 'lib/bud/collections.rb', line 21

# Returns @rescan.
# NOTE(review): @rescan is not assigned by any method shown on this page;
# confirm where it is set.
def rescan
  @rescan
end

- (Object) scanner_cnt (readonly)

Returns the value of attribute scanner_cnt



20
21
22
# File 'lib/bud/collections.rb', line 20

# Returns the number of scanner elements to_push_elem has created for this
# collection; starts at 0.
def scanner_cnt
  @scanner_cnt
end

- (Object) struct (readonly)

Returns the value of attribute struct



18
19
20
# File 'lib/bud/collections.rb', line 18

# Returns the tuple struct class that init_schema builds from the columns
# with Bud::TupleStruct.new_struct; unset (nil) when there are no columns.
def struct
  @struct
end

- (Object) tabname (readonly)

:nodoc: all



17
18
19
# File 'lib/bud/collections.rb', line 17

# Returns the collection's name: the value given to the constructor, unless
# uniquify_tabname has since replaced it.
def tabname
  @tabname
end

- (Object) wired_by (readonly)

Returns the value of attribute wired_by



20
21
22
# File 'lib/bud/collections.rb', line 20

# Returns @wired_by, initialized to an empty array by the constructor. Its
# elements are the ones non_temporal_predecessors and positive_predecessors
# filter on.
def wired_by
  @wired_by
end

Instance Method Details

- (Object) *(collection)



774
775
776
# File 'lib/bud/collections.rb', line 774

# Join operator: converts this collection to its push element and delegates
# to that element's join with +collection+.
def *(collection)
  to_push_elem.join(collection)
end

- (Object) <<(item)

instantaneously place an individual item from rhs into collection on lhs



505
506
507
# File 'lib/bud/collections.rb', line 505

# Instantaneously place a single item into this collection by delegating to
# insert. Returns whatever insert returns.
def <<(item)
  insert(item)
end

- (Object) <=(collection)

instantaneously merge items from the given collection into this collection; raises Bud::CompileError when used outside of a bloom block



601
602
603
604
605
606
607
# File 'lib/bud/collections.rb', line 601

# Instantaneous merge operator. Only legal while the toplevel Bud instance is
# inside a tick; otherwise a Bud::CompileError is raised. On success the work
# is delegated to merge.
def <=(collection)
  in_tick = bud_instance.toplevel.inside_tick
  raise Bud::CompileError, "illegal use of <= outside of bloom block, use <+ instead" unless in_tick

  merge(collection)
end

- (Object) [](k)



359
360
361
362
363
364
365
# File 'lib/bud/collections.rb', line 359

# Looks up the tuple stored under key +k+, preferring @storage and falling
# back to @delta.
def [](k)
  # assumes that key is in storage or delta, but not both
  # is this enforced in do_insert?
  check_enumerable(k)
  stored = @storage[k]
  return stored unless stored.nil?
  @delta[k]
end

- (Object) add_rescan_invalidate(rescan, invalidate)



681
682
683
684
685
# File 'lib/bud/collections.rb', line 681

# Hook that is intentionally a no-op in this class: neither +rescan+ nor
# +invalidate+ is touched.
def add_rescan_invalidate(rescan, invalidate)
  # No change. Most collections don't need to rescan on every tick (only do
  # so on negate). Also, there's no cache to invalidate by default.
  # Scratches and PushElements override this method.
end

- (Object) argagg(aggname, gbkey_cols, collection, &blk)



746
747
748
749
750
751
752
753
# File 'lib/bud/collections.rb', line 746

# Runs the aggregate +aggname+ over +collection+ (a column), grouped by
# +gbkey_cols+, by delegating to this collection's push element. Symbol
# columns are resolved through canonicalize_col first. The result is
# extended with @cols_access before it is returned.
def argagg(aggname, gbkey_cols, collection, &blk)
  scanner = to_push_elem
  group_cols = gbkey_cols.nil? ? nil : gbkey_cols.map { |col| canonicalize_col(col) }
  agg_col = canonicalize_col(collection)
  # PushElement inherits the schema accessors from this Collection
  scanner.argagg(aggname, group_cols, agg_col, &blk).extend(@cols_access)
end

- (Object) argmax(gbkey_cols, col, &blk)



767
768
769
# File 'lib/bud/collections.rb', line 767

# Shorthand for argagg with the :max aggregate over column +col+, grouped by
# +gbkey_cols+.
def argmax(gbkey_cols, col, &blk)
  argagg(:max, gbkey_cols, col, &blk)
end

- (Object) argmin(gbkey_cols, col, &blk)



759
760
761
# File 'lib/bud/collections.rb', line 759

# Shorthand for argagg with the :min aggregate over column +col+, grouped by
# +gbkey_cols+.
def argmin(gbkey_cols, col, &blk)
  argagg(:min, gbkey_cols, col, &blk)
end

- (Object) bootstrap



687
688
689
690
691
692
# File 'lib/bud/collections.rb', line 687

# Promotes any pending tuples to the delta buffer: when @pending is
# non-empty it becomes the new @delta and @pending is reset to an empty hash.
def bootstrap
  return if @pending.empty?

  @delta = @pending
  @pending = {}
end

- (Object) canonicalize_col(col)



801
802
803
# File 'lib/bud/collections.rb', line 801

# Resolves a column reference: a Symbol is treated as the name of a method on
# this collection and is replaced by that method's result; anything else is
# returned unchanged.
def canonicalize_col(col)
  return col unless col.is_a?(Symbol)

  send(col)
end

- (Object) close

:nodoc: all



346
347
# File 'lib/bud/collections.rb', line 346

# No-op in this class; does nothing and returns nil.
def close # :nodoc: all
end

- (Object) do_insert(t, store)



440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
# File 'lib/bud/collections.rb', line 440

# Inserts tuple +t+ into the buffer +store+.
#
# With $BUD_DEBUG set, a trace line naming the target buffer is printed
# first (even for nil tuples). A nil tuple is then ignored; otherwise the
# tuple is normalized with prep_tuple, its key is extracted with
# get_key_vals, and the pair is handed to merge_to_buf together with the
# tuple currently stored under that key.
def do_insert(t, store)
  if $BUD_DEBUG
    target_id = store.object_id
    storetype = if target_id == @storage.object_id
                  "storage"
                elsif target_id == @pending.object_id
                  "pending"
                elsif target_id == @delta.object_id
                  "delta"
                elsif target_id == @new_delta.object_id
                  "new_delta"
                end
    puts "#{qualified_tabname}.#{storetype} ==> #{t.inspect}"
  end
  return if t.nil? # silently ignore nils resulting from map predicates failing

  tuple = prep_tuple(t)
  tuple_key = get_key_vals(tuple)
  merge_to_buf(store, tuple_key, tuple, store[tuple_key])
end

- (Object) each(&block)

:nodoc: all



271
272
273
# File 'lib/bud/collections.rb', line 271

# Iterates over the @storage and @delta buffers (in that order) via
# each_from, passing +block+ through.
def each(&block) # :nodoc: all
  each_from([@storage, @delta], &block)
end

- (Object) each_delta(&block)



281
282
283
# File 'lib/bud/collections.rb', line 281

# Iterates over the @delta buffer only, via each_from.
def each_delta(&block)
  each_from([@delta], &block)
end

- (Object) each_raw(&block)



276
277
278
# File 'lib/bud/collections.rb', line 276

# Iterates over the @storage buffer only, via each_from.
def each_raw(&block)
  each_from([@storage], &block)
end

- (Object) each_tick_delta(&block)



286
287
288
# File 'lib/bud/collections.rb', line 286

# Iterates over @tick_delta, the list that flush_deltas / tick_deltas append
# flushed delta tuples to when accumulate_tick_deltas is set.
def each_tick_delta(&block)
  @tick_delta.each(&block)
end

- (Object) each_with_index(the_name = tabname, the_schema = schema, &blk)

XXX: Although we support each_with_index over Bud collections, using it is probably not a great idea: the index assigned to a given collection member is not defined by the language semantics.



214
215
216
217
218
219
220
221
# File 'lib/bud/collections.rb', line 214

# While the Bud instance is wiring, builds a push element for this
# collection (named +the_name+, with schema +the_schema+) and delegates
# each_with_index to it; otherwise falls back to the inherited
# each_with_index.
def each_with_index(the_name=tabname, the_schema=schema, &blk)
  return super(&blk) unless @bud_instance.wiring?

  to_push_elem(the_name, the_schema).each_with_index(&blk)
end

- (Boolean) empty?



380
381
382
# File 'lib/bud/collections.rb', line 380

# True when the collection holds no tuples, i.e. when length is zero.
def empty?
  length.zero?
end

- (Boolean) exists?(&block)



386
387
388
389
390
391
392
393
394
# File 'lib/bud/collections.rb', line 386

# Without a block: true when the collection is non-empty.
# With a block: true when detect finds a non-nil member for which the block
# is truthy. An empty collection always yields false.
def exists?(&block)
  return false if length == 0
  return true unless block_given?

  found = detect { |t| yield t }
  !found.nil?
end

- (Object) flat_map(&blk)



228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
# File 'lib/bud/collections.rb', line 228

# Flattening map over the collection.
#
# While wiring: projects the collection through +blk+ (via pro), wires the
# result into a new Bud::PushElement whose block pushes out every member of
# each incoming value individually, registers that element in the toplevel's
# push_elems under [object_id, :flatten], and returns it.
#
# Otherwise: delegates directly to @storage.flat_map.
# NOTE(review): the non-wiring path reads only @storage (not @delta, unlike
# each), and passes +blk+ whatever @storage itself yields -- confirm this is
# intended.
def flat_map(&blk)
  if @bud_instance.wiring?
    pusher = self.pro(&blk)
    toplevel = @bud_instance.toplevel
    elem = Bud::PushElement.new(tabname, toplevel.this_rule_context, tabname)
    pusher.wire_to(elem)
    f = Proc.new do |t|
      t.each do |i|
        elem.push_out(i, false)
      end
      # the block's own result is nil; the members were already pushed above
      nil
    end
    elem.set_block(&f)
    toplevel.push_elems[[self.object_id, :flatten]] = elem
    elem
  else
    @storage.flat_map(&blk)
  end
end

- (Object) flush



621
# File 'lib/bud/collections.rb', line 621

# No-op in this class; does nothing and returns nil.
def flush ; end

- (Object) flush_deltas



695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
# File 'lib/bud/collections.rb', line 695

# Moves everything still sitting in @delta and @new_delta into @storage and
# empties both buffers. Tuples from @delta (but not from @new_delta) are also
# appended to @tick_delta when accumulate_tick_deltas is set. With $BUD_DEBUG
# set, a trace line is printed for each non-empty buffer.
def flush_deltas
  if $BUD_DEBUG
    puts "#{qualified_tabname}.flush delta --> storage" unless @delta.empty?
    puts "#{qualified_tabname}.flush new_delta --> storage" unless @new_delta.empty?
  end
  unless @delta.empty?
    @storage.merge!(@delta)
    @tick_delta.concat(@delta.values) if accumulate_tick_deltas
    @delta.clear
  end
  unless @new_delta.empty?
    @storage.merge!(@new_delta)
    @new_delta.clear
  end
  # @tick_delta kept around for higher strata.
end

- (Object) group(key_cols, *aggpairs, &blk)



789
790
791
792
793
# File 'lib/bud/collections.rb', line 789

# Groups the collection by +key_cols+ and applies the aggregates given in
# +aggpairs+, delegating to this collection's push element. Symbol columns
# in the keys are resolved with canonicalize_col and the aggregate pairs are
# normalized with prep_aggpairs before delegation.
def group(key_cols, *aggpairs, &blk)
  resolved_keys = key_cols.nil? ? nil : key_cols.map { |col| canonicalize_col(col) }
  resolved_pairs = prep_aggpairs(aggpairs)
  to_push_elem.group(resolved_keys, *resolved_pairs, &blk)
end

- (Boolean) has_key?(k)



351
352
353
354
355
# File 'lib/bud/collections.rb', line 351

# True when +k+ is non-nil and a tuple is found under it via []. The key is
# validated with check_enumerable first.
def has_key?(k)
  check_enumerable(k)
  !(k.nil? || self[k].nil?)
end

- (Boolean) include?(item)



369
370
371
372
373
374
# File 'lib/bud/collections.rb', line 369

# Membership test: extracts the key of +item+ and checks that the tuple
# stored under that key equals +item+. Returns true unconditionally when the
# collection has no key columns, and false for a nil item.
def include?(item)
  return true if key_cols.nil?
  return false if item.nil?

  item == self[get_key_vals(item)]
end

- (Object) init_schema(given_schema)



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/bud/collections.rb', line 45

# Initializes all schema-derived state from +given_schema+.
#
# A nil schema defaults to {[:key]=>[:val]}. The schema is parsed into @cols
# and @key_cols, column names starting with "@" are rejected with a
# Bud::CompileError, the positions of key and value columns are recorded,
# and (when there is at least one column) a tuple struct class is created.
# Finally setup_accessors is called.
def init_schema(given_schema)
  given_schema ||= {[:key]=>[:val]}
  @given_schema = given_schema
  @cols, @key_cols = BudCollection.parse_schema(given_schema)

  # Check that no location specifiers appear in the schema. In the case of
  # channels, the location specifier has already been stripped from the
  # user-specified schema.
  @cols.each do |s|
    if s.to_s.start_with? "@"
      raise Bud::CompileError, "illegal use of location specifier (@) in column #{s} of non-channel collection #{tabname}"
    end
  end

  # Indices into @cols of the key columns and of the remaining (value) columns.
  @key_colnums = @key_cols.map {|k| @cols.index(k)}
  @val_colnums = val_cols.map {|k| @cols.index(k)}

  # An empty column list is normalized to nil; in that case no struct is built.
  if @cols.empty?
    @cols = nil
  else
    @struct = Bud::TupleStruct.new_struct(@cols)
    @structlen = @struct.members.length
  end
  setup_accessors
end

- (Object) insert(o, source = nil)

:nodoc: all



499
500
501
502
# File 'lib/bud/collections.rb', line 499

# Inserts +o+ directly into @storage via do_insert. The +source+ argument is
# accepted but not used by this implementation.
def insert(o, source=nil) # :nodoc: all
  # puts "insert: #{o} into #{qualified_tabname}"
  do_insert(o, @storage)
end

- (Object) inspect



75
76
77
# File 'lib/bud/collections.rb', line 75

def inspect
  "#{self.class}:#{self.object_id.to_s(16)} [#{qualified_tabname}]"
end

- (Object) inspected



189
190
191
# File 'lib/bud/collections.rb', line 189

# Projects every tuple to a one-element array holding the tuple's inspect
# string (via pro).
def inspected
  self.pro{|t| [t.inspect]}
end

- (Object) invalidate_at_tick



291
292
293
# File 'lib/bud/collections.rb', line 291

# Always returns true in this class (the conservative default noted below).
def invalidate_at_tick
  true # being conservative here as a default.
end

- (Object) keys



177
178
179
# File 'lib/bud/collections.rb', line 177

# Projects every tuple to its key values (get_key_vals) via pro.
def keys
  self.pro{|t| get_key_vals(t)}
end

- (Object) length



376
377
378
# File 'lib/bud/collections.rb', line 376

# Number of tuples currently held: the sizes of @storage and @delta added
# together.
def length
  [@storage, @delta].sum(&:length)
end

- (Object) merge(o, buf = @delta)

:nodoc: all



563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
# File 'lib/bud/collections.rb', line 563

# Merges +o+ into this collection. What happens depends on the class of +o+,
# checked in this order:
#
# * Bud::PushElement        - wired directly to this collection
# * Bud::BudCollection      - its projection (pro) is wired to this collection
# * Proc                    - registered as a collection expression, whose
#                             projection is wired to this collection
# * Bud::LatticePushElement - wired directly to this collection
# * Bud::LatticeWrapper     - its push element is wired to this collection
# * anything else non-nil   - treated as an enumerable whose members are
#                             inserted into +buf+ (defaults to @delta)
#
# All wiring branches call add_merge_target first. For push elements and
# collections the schema is deduced from +o+ when this collection has no
# columns yet; for plain enumerables it is established from +o+ instead.
def merge(o, buf=@delta) # :nodoc: all
  if o.class <= Bud::PushElement
    add_merge_target
    deduce_schema(o) if @cols.nil?
    o.wire_to self
  elsif o.class <= Bud::BudCollection
    add_merge_target
    deduce_schema(o) if @cols.nil?
    o.pro.wire_to self
  elsif o.class <= Proc
    add_merge_target
    tbl = register_coll_expr(o)
    tbl.pro.wire_to self
  elsif o.class <= Bud::LatticePushElement
    add_merge_target
    o.wire_to self
  elsif o.class <= Bud::LatticeWrapper
    add_merge_target
    o.to_push_elem.wire_to self
  else
    unless o.nil?
      # drop duplicate and nil members before inserting
      o = o.uniq.compact if o.respond_to?(:uniq)
      check_enumerable(o)
      establish_schema(o) if @cols.nil?
      o.each {|i| do_insert(i, buf)}
    end
  end
end

- (Object) non_temporal_predecessors



296
297
298
# File 'lib/bud/collections.rb', line 296

# Returns the elements of @wired_by whose +outputs+ include this collection.
def non_temporal_predecessors
  @wired_by.select {|e| e.outputs.include? self}
end

- (Object) notin(collection, *preds, &blk)



795
796
797
798
799
# File 'lib/bud/collections.rb', line 795

# Delegates notin to this collection's push element, passing +collection+'s
# push element along with +preds+ and the block.
def notin(collection, *preds, &blk)
  to_push_elem.notin(collection.to_push_elem, *preds, &blk)
end

- (Object) null_tuple



171
172
173
# File 'lib/bud/collections.rb', line 171

# Returns a fresh instance of the collection's tuple struct, created with no
# arguments.
def null_tuple
  @struct.new
end

- (Object) pending_merge(o)

:nodoc: all



611
612
613
614
615
616
617
618
# File 'lib/bud/collections.rb', line 611

# Inserts the members of +o+ into the @pending buffer. A nil +o+ is ignored.
# When +o+ responds to uniq, duplicates and nils are dropped first. The
# schema is established from the data if this collection has no columns yet.
def pending_merge(o) # :nodoc: all
  return if o.nil?

  items = o.respond_to?(:uniq) ? o.uniq.compact : o
  check_enumerable(items)
  establish_schema(items) if @cols.nil?
  items.each { |item| do_insert(item, @pending) }
end

- (Object) positive_predecessors



301
302
303
# File 'lib/bud/collections.rb', line 301

# Returns the elements of @wired_by that list this collection in either
# their +outputs+ or their +pendings+.
def positive_predecessors
  @wired_by.select {|e| e.outputs.include?(self) || e.pendings.include?(self)}
end

- (Object) prep_aggpairs(aggpairs)



778
779
780
781
782
783
784
785
786
787
# File 'lib/bud/collections.rb', line 778

# Normalizes a list of aggregate specifications. Each entry is split into the
# aggregate and its remaining arguments; the arguments are resolved with
# canonicalize_col and the entry is rebuilt as an array starting with the
# aggregate.
def prep_aggpairs(aggpairs)
  aggpairs.map do |pair|
    agg, *agg_args = pair
    [agg, *agg_args.map { |arg| canonicalize_col(arg) }]
  end
end

- (Object) pro(the_name = tabname, the_schema = schema, &blk)



195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/bud/collections.rb', line 195

# Projection over the collection.
#
# While the Bud instance is wiring: returns the push element for this
# collection (named +the_name+, with schema +the_schema+), further projected
# through +blk+ when a block is given.
#
# Otherwise: eagerly returns an array with the result of +blk+ for every
# tuple yielded by each, skipping nil results. When no block is given the
# tuples themselves are collected (identity projection), mirroring the
# wiring branch, instead of failing with NoMethodError on a nil block.
def pro(the_name=tabname, the_schema=schema, &blk)
  if @bud_instance.wiring?
    pusher = to_push_elem(the_name, the_schema)
    # If there is no code block to evaluate, use the scanner directly
    blk.nil? ? pusher : pusher.pro(&blk)
  else
    rv = []
    each do |t|
      t = blk.call(t) unless blk.nil?
      rv << t unless t.nil?
    end
    rv
  end
end

- (Object) qualified_tabname



71
72
73
# File 'lib/bud/collections.rb', line 71

# Returns the (memoized) qualified name of the collection: the plain tabname
# for a toplevel Bud instance, otherwise the instance's qualified name and
# the tabname joined by a dot, as a symbol.
def qualified_tabname
  @qualified_tabname ||=
    if @bud_instance.toplevel?
      tabname
    else
      "#{@bud_instance.qualified_name}.#{tabname}".to_sym
    end
end

- (Object) reduce(initial, &blk)



805
806
807
# File 'lib/bud/collections.rb', line 805

# Delegates reduce, with +initial+ and the block, to this collection's push
# element.
def reduce(initial, &blk)
  to_push_elem.reduce(initial, &blk)
end

- (Object) register_coll_expr(expr)



592
593
594
595
596
597
# File 'lib/bud/collections.rb', line 592

# Registers +expr+ with the Bud instance as a collection expression named
# "expr_<object id of expr>" and returns the result of calling the method of
# that name on the instance. When this collection has columns, generic
# column names c1..cN (one per column) are passed along; otherwise nil.
def register_coll_expr(expr)
  expr_name = "expr_#{expr.object_id}".to_sym
  generic_cols = @cols.nil? ? nil : Array.new(@cols.length) { |idx| "c#{idx + 1}".to_sym }
  @bud_instance.coll_expr(expr_name, expr, generic_cols)
  @bud_instance.send(expr_name)
end

- (Object) rename(the_name, the_schema = nil, &blk)

Raises:



258
259
260
261
262
263
264
265
# File 'lib/bud/collections.rb', line 258

# Projects this collection under a different name (and optionally schema).
# Raises Bud::Error when called while the instance is not wiring, or when the
# Bud instance does not respond to +the_name+.
def rename(the_name, the_schema=nil, &blk)
  raise Bud::Error unless @bud_instance.wiring?

  # a scratch with this name should have been defined during rewriting
  scratch_defined = @bud_instance.respond_to?(the_name)
  raise Bud::Error, "rename failed to define a scratch named #{the_name}" unless scratch_defined

  pro(the_name, the_schema, &blk)
end

- (Object) schema



109
110
111
112
113
# File 'lib/bud/collections.rb', line 109

# Returns the schema in its declared shape: nil when there are no columns,
# just the key columns when there are no value columns, and otherwise a
# one-entry hash mapping key columns to value columns.
def schema
  return nil if @cols.nil?

  val_cols.empty? ? key_cols : { key_cols => val_cols }
end

- (Object) sort(&blk)



249
250
251
252
253
254
255
256
# File 'lib/bud/collections.rb', line 249

# While wiring: delegates to the sort of this collection's projection,
# passing a name derived from the object id, the Bud instance, and the
# columns. Otherwise: returns the values of @storage sorted with +blk+.
def sort(&blk)
  return @storage.values.sort(&blk) unless @bud_instance.wiring?

  sort_name = "sort#{object_id}".to_sym
  pro.sort(sort_name, @bud_instance, @cols, &blk)
end

- (Object) tick

Raises:



651
652
653
# File 'lib/bud/collections.rb', line 651

# Always raises Bud::Error in this class; the message names the receiver's
# class as the one in which tick must be overridden.
def tick
  raise Bud::Error, "tick must be overriden in #{self.class}"
end

- (Object) tick_deltas

:nodoc: all



658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
# File 'lib/bud/collections.rb', line 658

# Advances the delta buffers by one step:
#
# 1. Everything in @delta is merged into @storage (and appended to
#    @tick_delta when accumulate_tick_deltas is set), then @delta is cleared.
# 2. Every tuple in @new_delta is offered to @delta through merge_to_buf,
#    which is also given the tuple currently in @storage under the same key;
#    then @new_delta is cleared.
#
# Returns true when @delta is non-empty afterwards, and false otherwise.
def tick_deltas # :nodoc: all
  unless @delta.empty?
    puts "#{qualified_tabname}.tick_deltas delta --> storage (#{@delta.size} elems)" if $BUD_DEBUG
    @storage.merge!(@delta)
    @tick_delta.concat(@delta.values) if accumulate_tick_deltas
    @delta.clear
  end

  unless @new_delta.empty?
    puts "#{qualified_tabname}.tick_deltas new_delta --> delta (#{@new_delta.size} elems)" if $BUD_DEBUG

    # NB: key conflicts between two new_delta tuples are detected in
    # do_insert().
    @new_delta.each_pair do |key, tup|
      merge_to_buf(@delta, key, tup, @storage[key])
    end
    @new_delta.clear
    return !(@delta.empty?)
  end
  return false # delta empty; another fixpoint iter not required.
end

- (Object) tick_metrics



306
307
308
309
310
311
312
313
314
315
# File 'lib/bud/collections.rb', line 306

# Increments the per-collection counter kept in the Bud instance's metrics
# under :collections. The counter key combines the instance's address (nil
# when the instance has no port), the qualified table name, and the current
# stratum number. Returns the updated count.
def tick_metrics
  strat_num = bud_instance.this_stratum
  addr = bud_instance.port.nil? ? nil : bud_instance.ip_port
  counter_key = { :addr => addr, :tabname => qualified_tabname, :strat_num => strat_num }

  counters = (bud_instance.metrics[:collections] ||= {})
  counters[counter_key] = (counters[counter_key] || 0) + 1
end

- (Object) to_push_elem(the_name = tabname, the_schema = schema)



713
714
715
716
717
718
719
720
721
722
723
724
725
726
# File 'lib/bud/collections.rb', line 713

# Returns the scanner element for this collection under the name +the_name+
# in the toplevel's current stratum, creating and registering one (in both
# the scanners and push_sources tables) on first use. @scanner_cnt is
# incremented each time a new scanner is created.
def to_push_elem(the_name=tabname, the_schema=schema)
  toplevel = @bud_instance.toplevel
  stratum = toplevel.this_stratum
  scan_key = [object_id, the_name]
  # if no push source yet, set one up
  unless toplevel.scanners[stratum][scan_key]
    fresh = Bud::ScannerElement.new(the_name, @bud_instance, self, the_schema)
    toplevel.scanners[stratum][scan_key] = fresh
    toplevel.push_sources[stratum][scan_key] = fresh
    @scanner_cnt += 1
  end
  toplevel.scanners[stratum][scan_key]
end

- (Object) uniquify_tabname

:nodoc: all



810
811
812
813
# File 'lib/bud/collections.rb', line 810

# Renames the collection by appending the microsecond component of the
# current time to its name; the new name is stored as a symbol and returned.
def uniquify_tabname # :nodoc: all
  # just append current number of microseconds
  @tabname = :"#{@tabname}#{Time.new.tv_usec}"
end

- (Object) val_cols

:nodoc: all



117
118
119
# File 'lib/bud/collections.rb', line 117

# Returns the non-key columns: all columns minus the key columns.
def val_cols # :nodoc: all
  @cols - @key_cols
end

- (Object) values



183
184
185
# File 'lib/bud/collections.rb', line 183

# Projects every tuple to an array of its fields at the positions from
# key_cols.length up to the last column, i.e. the fields that follow the
# first key_cols.length positions.
def values
  pro do |t|
    (key_cols.length...cols.length).map { |idx| t[idx] }
  end
end