Class: Bud::BudZkTable
- Inherits:
-
BudPersistentCollection
show all
- Defined in:
- lib/bud/storage/zookeeper.rb
Overview
Persistent table implementation based on Zookeeper.
Instance Attribute Summary
#accumulate_tick_deltas, #bud_instance, #cols, #delta, #invalidated, #is_source, #key_cols, #new_delta, #pending, #rescan, #scanner_cnt, #storage, #struct, #tabname, #tick_delta, #wired_by
Instance Method Summary
(collapse)
#*, #[], #add_rescan_invalidate, #argagg, #argmax, #argmin, #bootstrap, #canonicalize_col, #do_insert, #each, #each_from_sym, #each_raw, #each_tick_delta, #each_with_index, #empty?, #exists?, #flat_map, #flush_deltas, #group, #has_key?, #include?, #init_schema, #insert, #inspect, #inspected, #keys, #length, #merge, #non_temporal_predecessors, #notin, #null_tuple, #pending_merge, #positive_predecessors, #prep_aggpairs, #pro, #qualified_tabname, #reduce, #register_coll_expr, #rename, #schema, #sort, #tick_deltas, #tick_metrics, #to_push_elem, #uniquify_tabname, #val_cols, #values
Methods included from Enumerable
#pro
Constructor Details
- (BudZkTable) initialize(name, zk_path, zk_addr, bud_instance)
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
# File 'lib/bud/storage/zookeeper.rb', line 10
def initialize(name, zk_path, zk_addr, bud_instance)
unless defined? Bud::HAVE_ZOOKEEPER
raise Bud::Error, "zookeeper gem is not installed: zookeeper-backed stores cannot be used"
end
super(name, bud_instance, [:key] => [:val, :opts])
@zk = Zookeeper.new(zk_addr)
zk_path = zk_path.chomp("/") unless zk_path == "/"
@zk_path = zk_path
@base_path = @zk_path
@base_path += "/" unless @zk_path.end_with? "/"
@store_mutex = Mutex.new
@zk_mutex = Mutex.new
@next_storage = {}
@saw_delta = false
@child_watch_id = nil
end
|
Instance Method Details
- (Object) <<(o)
182
183
184
|
# File 'lib/bud/storage/zookeeper.rb', line 182
def <<(o)
raise Bud::Error, "illegal use of << with zookeeper store '#{@tabname}' on left"
end
|
- (Object) <=(o)
178
179
180
|
# File 'lib/bud/storage/zookeeper.rb', line 178
def <=(o)
raise Bud::Error, "illegal use of <= with zookeeper store '#{@tabname}' on left"
end
|
- (Object) close
165
166
167
168
|
# File 'lib/bud/storage/zookeeper.rb', line 165
def close
@zk_mutex.synchronize { @zk.close }
end
|
- (Object) flush
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
|
# File 'lib/bud/storage/zookeeper.rb', line 137
def flush
each_from([@pending]) do |t|
path = @base_path + t.key
data = t.val
ephemeral = false
sequence = false
opts = t.opts
unless opts.nil?
if opts[:ephemeral] == true
ephemeral = true
end
if opts[:sequence] == true
sequence = true
end
end
r = @zk.create(:path => path, :data => data,
:ephemeral => ephemeral, :sequence => sequence)
if r[:rc] == Zookeeper::ZNODEEXISTS
puts "Ignoring duplicate insert: #{t.inspect}"
elsif r[:rc] != Zookeeper::ZOK
puts "Failed create of #{path}: #{r.inspect}"
end
end
@pending.clear
end
|
- (Object) get_and_watch
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
# File 'lib/bud/storage/zookeeper.rb', line 87
def get_and_watch
r = @zk.get_children(:path => @zk_path, :watcher => @child_watcher)
return unless r[:stat].exists
@child_watch_id = r[:req_id]
new_children = {}
r[:children].each do |c|
child_path = @base_path + c
get_r = @zk.get(:path => child_path)
unless get_r[:stat].exists
puts "ZK: failed to fetch child: #{child_path}"
return
end
data = get_r[:data]
data ||= ""
new_children[c] = [c, data]
end
need_tick = false
@store_mutex.synchronize {
@next_storage = new_children
if @storage != @next_storage
need_tick = true
@saw_delta = true
end
}
if need_tick and @bud_instance.running_async
EventMachine::schedule {
@bud_instance.tick_internal
}
end
end
|
- (Object) invalidate_at_tick
29
30
31
|
# File 'lib/bud/storage/zookeeper.rb', line 29
def invalidate_at_tick
true
end
|
- (Object) invalidate_cache
33
34
|
# File 'lib/bud/storage/zookeeper.rb', line 33
def invalidate_cache
end
|
- (Object) start_watchers
Since the watcher callbacks might invoke EventMachine, we wait until after
EM startup to start watching for Zk events.
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
# File 'lib/bud/storage/zookeeper.rb', line 38
def start_watchers
@child_watcher = Zookeeper::Callbacks::WatcherCallback.new do
while true
break if @zk.closed?
if @zk_mutex.try_lock
get_and_watch unless @zk.closed?
@zk_mutex.unlock
break
end
end
end
@stat_watcher = Zookeeper::Callbacks::WatcherCallback.new do
while true
break if @zk.closed?
if @zk_mutex.try_lock
stat_and_watch unless @zk.closed?
@zk_mutex.unlock
break
end
end
end
stat_and_watch
end
|
- (Object) stat_and_watch
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
# File 'lib/bud/storage/zookeeper.rb', line 71
def stat_and_watch
r = @zk.stat(:path => @zk_path, :watcher => @stat_watcher)
unless r[:stat].exists
r = @zk.create(:path => @zk_path)
if r[:rc] != Zookeeper::ZOK and r[:rc] != Zookeeper::ZNODEEXISTS
raise
end
end
get_and_watch unless @child_watch_id
end
|
- (Object) tick
128
129
130
131
132
133
134
135
|
# File 'lib/bud/storage/zookeeper.rb', line 128
def tick
@store_mutex.synchronize {
return unless @saw_delta
@storage = @next_storage
@next_storage = {}
@saw_delta = false
}
end
|