Class: Bud::BudDbmTable
- Inherits:
-
BudPersistentCollection
show all
- Defined in:
- lib/bud/storage/dbm.rb
Overview
Persistent table implementation based on dbm.
Instance Attribute Summary
#accumulate_tick_deltas, #bud_instance, #cols, #delta, #invalidated, #is_source, #key_cols, #new_delta, #pending, #rescan, #scanner_cnt, #storage, #struct, #tabname, #tick_delta, #wired_by
Instance Method Summary
(collapse)
#invalidate_at_tick
#*, #<=, #add_rescan_invalidate, #argagg, #argmax, #argmin, #bootstrap, #canonicalize_col, #do_insert, #each_from_sym, #each_tick_delta, #each_with_index, #exists?, #flat_map, #group, #init_schema, #inspect, #inspected, #invalidate_at_tick, #keys, #merge, #non_temporal_predecessors, #notin, #null_tuple, #pending_merge, #positive_predecessors, #prep_aggpairs, #pro, #qualified_tabname, #reduce, #register_coll_expr, #rename, #schema, #sort, #tick_metrics, #to_push_elem, #uniquify_tabname, #val_cols, #values
Methods included from Enumerable
#pro
Constructor Details
- (BudDbmTable) initialize(name, bud_instance, given_schema)
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
# File 'lib/bud/storage/dbm.rb', line 6
def initialize(name, bud_instance, given_schema)
dbm_dir = bud_instance.options[:dbm_dir]
raise Bud::Error, "dbm support must be enabled via 'dbm_dir'" unless dbm_dir
if bud_instance.port.nil?
raise Bud::Error, "use of dbm storage requires an explicit port to be specified in Bud initialization options"
end
unless File.exists?(dbm_dir)
Dir.mkdir(dbm_dir)
puts "Created directory: #{dbm_dir}" unless bud_instance.options[:quiet]
end
dirname = "#{dbm_dir}/bud_#{bud_instance.port}"
unless File.exists?(dirname)
Dir.mkdir(dirname)
puts "Created directory: #{dirname}" unless bud_instance.options[:quiet]
end
super(name, bud_instance, given_schema)
@to_delete = []
@invalidated = true
db_fname = "#{dirname}/#{name}.dbm"
flags = DBM::WRCREAT
if bud_instance.options[:dbm_truncate] == true
flags |= DBM::NEWDB
end
@dbm = DBM.open(db_fname, 0666, flags)
if @dbm.nil?
raise Bud::Error, "failed to open dbm database '#{db_fname}': #{@dbm.errmsg}"
end
end
|
Instance Method Details
- (Object) [](key)
44
45
46
47
48
49
50
51
52
53
|
# File 'lib/bud/storage/dbm.rb', line 44
def [](key)
check_enumerable(key)
key_s = MessagePack.pack(key)
val_s = @dbm[key_s]
if val_s
return make_tuple(key, MessagePack.unpack(val_s))
else
return @delta[key]
end
end
|
- (Object) close
117
118
119
120
|
# File 'lib/bud/storage/dbm.rb', line 117
def close
@dbm.close unless @dbm.nil?
@dbm = nil
end
|
- (Object) each(&block)
83
84
85
86
|
# File 'lib/bud/storage/dbm.rb', line 83
def each(&block)
each_from([@delta], &block)
each_storage(&block)
end
|
- (Object) each_from(bufs, &block)
92
93
94
95
96
97
98
99
100
101
102
103
|
# File 'lib/bud/storage/dbm.rb', line 92
def each_from(bufs, &block)
bufs.each do |b|
if b == @storage then
each_storage(&block)
else
b.each_value do |v|
tick_metrics if bud_instance.options[:metrics]
yield v
end
end
end
end
|
- (Object) each_raw(&block)
88
89
90
|
# File 'lib/bud/storage/dbm.rb', line 88
def each_raw(&block)
each_storage(&block)
end
|
- (Object) each_storage(&block)
105
106
107
108
109
110
111
112
|
# File 'lib/bud/storage/dbm.rb', line 105
def each_storage(&block)
@dbm.each do |k,v|
k_ary = MessagePack.unpack(k)
v_ary = MessagePack.unpack(v)
tick_metrics if bud_instance.options[:metrics]
yield make_tuple(k_ary, v_ary)
end
end
|
- (Boolean) empty?
222
223
224
|
# File 'lib/bud/storage/dbm.rb', line 222
def empty?
@dbm.empty?
end
|
- (Object) flush
114
115
|
# File 'lib/bud/storage/dbm.rb', line 114
def flush
end
|
- (Object) flush_deltas
159
160
161
162
163
164
165
166
167
|
# File 'lib/bud/storage/dbm.rb', line 159
def flush_deltas
unless @delta.empty?
merge_to_db(@delta)
@tick_delta.concat(@delta.values) if accumulate_tick_deltas
@delta.clear
end
merge_to_db(@new_delta)
@new_delta = {}
end
|
- (Boolean) has_key?(k)
59
60
61
62
63
64
|
# File 'lib/bud/storage/dbm.rb', line 59
def has_key?(k)
check_enumerable(k)
key_s = MessagePack.pack(k)
return true if @dbm.has_key? key_s
return @delta.has_key? k
end
|
- (Boolean) include?(tuple)
66
67
68
69
70
|
# File 'lib/bud/storage/dbm.rb', line 66
def include?(tuple)
key = get_key_vals(tuple)
value = self[key]
return (value == tuple)
end
|
- (Object) init_storage
38
39
40
41
42
|
# File 'lib/bud/storage/dbm.rb', line 38
def init_storage
@storage = nil
end
|
- (Object) insert(tuple)
Also known as:
<<
185
186
187
188
|
# File 'lib/bud/storage/dbm.rb', line 185
def insert(tuple)
key = get_key_vals(tuple)
merge_tuple_to_db(key, tuple)
end
|
- (Object) invalidate_cache
217
218
|
# File 'lib/bud/storage/dbm.rb', line 217
def invalidate_cache
end
|
- (Object) length
55
56
57
|
# File 'lib/bud/storage/dbm.rb', line 55
def length
@dbm.length + @delta.length
end
|
- (Object) make_tuple(k_ary, v_ary)
72
73
74
75
76
77
78
79
80
81
|
# File 'lib/bud/storage/dbm.rb', line 72
def make_tuple(k_ary, v_ary)
t = @struct.new
@key_colnums.each_with_index do |k,i|
t[k] = k_ary[i]
end
val_cols.each_with_index do |c,i|
t[cols.index(c)] = v_ary[i]
end
t
end
|
- (Object) merge_to_db(buf)
122
123
124
125
126
|
# File 'lib/bud/storage/dbm.rb', line 122
def merge_to_db(buf)
buf.each do |key,tuple|
merge_tuple_to_db(key, tuple)
end
end
|
- (Object) merge_tuple_to_db(key, tuple)
128
129
130
131
132
133
134
135
136
137
|
# File 'lib/bud/storage/dbm.rb', line 128
def merge_tuple_to_db(key, tuple)
key_s = MessagePack.pack(key)
if @dbm.has_key?(key_s)
old_tuple = self[key]
raise_pk_error(tuple, old_tuple) if tuple != old_tuple
else
val = val_cols.map{|c| tuple[cols.index(c)]}
@dbm[key_s] = MessagePack.pack(val)
end
end
|
- (Object) pending_delete(o)
172
173
174
175
176
177
178
179
180
|
# File 'lib/bud/storage/dbm.rb', line 172
def pending_delete(o)
if o.class <= Bud::PushElement
o.wire_to(self, :delete)
elsif o.class <= Bud::BudCollection
o.pro.wire_to(self, :delete)
else
@to_delete.concat(o.map{|t| prep_tuple(t) unless t.nil?})
end
end
|
- (Object) tick
Remove to_delete and then move pending => delta.
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
|
# File 'lib/bud/storage/dbm.rb', line 193
def tick
deleted = nil
@to_delete.each do |tuple|
k = get_key_vals(tuple)
k_str = MessagePack.pack(k)
cols_str = @dbm[k_str]
unless cols_str.nil?
db_cols = MessagePack.unpack(cols_str)
delete_cols = val_cols.map{|c| tuple[cols.index(c)]}
if db_cols == delete_cols
deleted ||= @dbm.delete k_str
end
end
end
@to_delete = []
@invalidated = !deleted.nil?
unless @pending.empty?
@delta = @pending
@pending = {}
end
flush
end
|
- (Object) tick_deltas
move deltas to on-disk storage, and new_deltas to deltas
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
|
# File 'lib/bud/storage/dbm.rb', line 140
def tick_deltas
unless @delta.empty?
merge_to_db(@delta)
@tick_delta.concat(@delta.values) if accumulate_tick_deltas
@delta.clear
end
unless @new_delta.empty?
@new_delta.reject! {|key, val| self[key] == val}
@delta = @new_delta
@new_delta = {}
end
return !(@delta.empty?)
end
|