Class: Riak::Client::Pool
Overview
A re-entrant thread-safe resource pool. Generates new resources on demand.
Defined Under Namespace
Classes: BadResource, Element
Instance Attribute Summary
- (Object) close=(value)
Sets the attribute close.
- (Object) open
Returns the value of attribute open.
- (Object) pool
Returns the value of attribute pool.
Instance Method Summary
- (Object) clear
(also: #close)
On each element of the pool, calls close(element) and removes it.
- (Object) delete_element(e)
Deletes an element of the pool.
- (Object) delete_if
Locks each element in turn and closes/deletes elements for which the object passes the block.
- (Object) each
As each_element, but yields objects, not wrapper elements.
- (Object) each_element
Iterate over a snapshot of the pool.
- (Pool) initialize(open, close)
constructor
Open is a callable which returns a new object for the pool.
- (Object) size
- (Object) take(opts = {}) {|obj| ... }
(also: #>>)
Acquire an element of the pool.
Constructor Details
- (Pool) initialize(open, close)
Open is a callable which returns a new object for the pool. Close is called with an object before it is freed.
# File 'lib/riak/client/pool.rb', line 51
def initialize(open, close)
  @open = open
  @close = close
  @lock = Mutex.new
  @iterator = Mutex.new
  @element_released = ConditionVariable.new
  @pool = Set.new
end
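As a rough illustration of what the two constructor arguments might look like: any object that responds to call should do. In this sketch StringIO is only a stand-in for a real connection, and the names open_fn and close_fn are hypothetical. The commented-out line shows how they would be passed, going by the signature above.

```ruby
require 'stringio'

# Any object that responds to #call can serve as a callable.
open_fn  = lambda { StringIO.new }   # returns a fresh resource on every call
close_fn = :close.to_proc            # called with a resource; invokes resource.close

# pool = Riak::Client::Pool.new(open_fn, close_fn)   # per the signature above

# What a pool would do with them: open on demand, close before freeing.
first  = open_fn.call
second = open_fn.call
close_fn.call(first)
```

Only first is closed here; second is a separate object and stays open until close_fn is called with it.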
Instance Attribute Details
- (Object) close=(value)
Sets the attribute close
# File 'lib/riak/client/pool.rb', line 47
def close=(value)
  @close = value
end
- (Object) open
Returns the value of attribute open
# File 'lib/riak/client/pool.rb', line 46
def open
  @open
end
- (Object) pool
Returns the value of attribute pool
# File 'lib/riak/client/pool.rb', line 45
def pool
  @pool
end
Instance Method Details
- (Object) clear Also known as: close
On each element of the pool, calls close(element) and removes it.
# File 'lib/riak/client/pool.rb', line 62
def clear
  each_element do |e|
    delete_element e
  end
end
- (Object) delete_element(e)
Deletes an element of the pool. Calls close on its object. Not intended for external use.
# File 'lib/riak/client/pool.rb', line 71
def delete_element(e)
  @close.call(e.object)
  @lock.synchronize do
    @pool.delete e
  end
end
- (Object) delete_if
Locks each element in turn and closes/deletes elements for which the object passes the block.
# File 'lib/riak/client/pool.rb', line 80
def delete_if
  raise ArgumentError, "block required" unless block_given?

  each_element do |e|
    if yield e.object
      delete_element e
    end
  end
end
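The close-then-remove behaviour of delete_element, delete_if and clear can be sketched without the gem. PruneSketch below is a hypothetical, single-threaded stand-in, not Riak::Client::Pool: it drops all locking, and its add and objects helpers exist only for the demonstration.

```ruby
require 'set'

# Hypothetical single-threaded stand-in: no mutexes, no element locking.
class PruneSketch
  Element = Struct.new(:object)   # assumed wrapper shape

  def initialize(open, close)
    @open, @close = open, close
    @pool = Set.new
  end

  # Demo helper (not in the documented API): open a resource and pool it.
  def add
    @pool << Element.new(@open.call)
  end

  # Demo helper (not in the documented API): the pooled objects, in order.
  def objects
    @pool.map(&:object)
  end

  # Close the object first, then drop its wrapper from the pool.
  def delete_element(e)
    @close.call(e.object)
    @pool.delete(e)
  end

  # Iterate over a snapshot so deleting during iteration is safe.
  def delete_if
    raise ArgumentError, "block required" unless block_given?
    @pool.to_a.each { |e| delete_element(e) if yield(e.object) }
  end

  def clear
    @pool.to_a.each { |e| delete_element(e) }
  end
end

n = 0
closed = []
pool = PruneSketch.new(lambda { n += 1; "conn-#{n}" },
                       lambda { |obj| closed << obj })
3.times { pool.add }

pool.delete_if { |obj| obj.end_with?("2") }
after_prune = pool.objects.dup    # ["conn-1", "conn-3"]
closed_after_prune = closed.dup   # ["conn-2"]

pool.clear                        # closes and removes everything that is left
```

The block passed to delete_if receives the pooled object rather than the wrapper element, matching the description above.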
- (Object) each
As each_element, but yields objects, not wrapper elements.
# File 'lib/riak/client/pool.rb', line 169
def each
  each_element do |e|
    yield e.object
  end
end
- (Object) each_element
Iterate over a snapshot of the pool. Yielded objects are locked for the duration of the block. This may block the current thread until elements are released by other threads.
# File 'lib/riak/client/pool.rb', line 145
def each_element
  targets = @pool.to_a
  unlocked = []

  @iterator.synchronize do
    until targets.empty?
      @lock.synchronize do
        unlocked, targets = targets.partition {|e| e.unlocked? }
        unlocked.each {|e| e.lock }
      end

      unlocked.each do |e|
        begin
          yield e
        ensure
          e.unlock
        end
      end
      @element_released.wait(@iterator) unless targets.empty?
    end
  end
end
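Each pass of the loop above splits the remaining snapshot with Enumerable#partition: elements that are currently free are processed now, and the rest become the targets for the next pass. A small sketch of that one step, using a hypothetical SketchElement struct in place of the pool's Element class:

```ruby
# Hypothetical stand-in for the pool's wrapper element.
SketchElement = Struct.new(:object, :locked) do
  def unlocked?
    !locked
  end
end

targets = [
  SketchElement.new(:a, false),
  SketchElement.new(:b, true),    # claimed elsewhere, e.g. by another thread
  SketchElement.new(:c, false)
]

# partition returns [elements passing the block, elements failing it].
unlocked, targets = targets.partition { |e| e.unlocked? }

ready   = unlocked.map(&:object)  # [:a, :c] can be yielded in this pass
waiting = targets.map(&:object)   # [:b] is retried after a release
```

In the real method, a non-empty targets list is what causes the wait on the condition variable before the next pass.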
- (Object) size
# File 'lib/riak/client/pool.rb', line 175
def size
  @lock.synchronize { @pool.size }
end
- (Object) take(opts = {}) {|obj| ... } Also known as: >>
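Acquires an element of the pool and yields its object. If every element is claimed, or none passes opts[:filter], a new element is created from opts[:default] or, failing that, by calling open. If the block raises BadResource, the element is closed and deleted, and the exception is re-raised.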
# File 'lib/riak/client/pool.rb', line 101
def take(opts = {})
  unless block_given?
    raise ArgumentError, "block required"
  end

  r = nil
  begin
    e = nil
    @lock.synchronize do
      # Find an existing element.
      if f = opts[:filter]
        e = pool.find { |e| e.unlocked? and f.call(e.object) }
      else
        e = pool.find { |e| e.unlocked? }
      end

      unless e
        # No objects were acceptable
        resource = opts[:default] || @open.call
        e = Element.new(resource)
        pool << e
      end
      e.lock
    end

    r = yield e.object
  rescue BadResource
    delete_element e
    raise
  ensure
    # Unlock
    if e
      e.unlock
      @element_released.signal
    end
  end
  r
end
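To see the re-entrant behaviour in action without the gem, the sketch below copies the core of take into a hypothetical SketchPool class. It is not the library's implementation: the Element wrapper's shape is assumed, the condition-variable signal is omitted, and integers stand in for real connections.

```ruby
require 'set'

# Hypothetical stand-in that mirrors the take logic shown above.
class SketchPool
  class BadResource < RuntimeError; end

  # Assumed wrapper: tracks whether its object is currently claimed.
  class Element
    attr_reader :object
    def initialize(object)
      @object = object
      @locked = false
    end
    def lock;      @locked = true;  end
    def unlock;    @locked = false; end
    def unlocked?; !@locked;        end
  end

  def initialize(open, close)
    @open, @close = open, close
    @lock = Mutex.new
    @pool = Set.new
  end

  def size
    @lock.synchronize { @pool.size }
  end

  def take(opts = {})
    raise ArgumentError, "block required" unless block_given?
    e = nil
    begin
      @lock.synchronize do
        filter = opts[:filter]
        # Reuse the first free element that passes the optional filter.
        e = @pool.find { |el| el.unlocked? && (filter.nil? || filter.call(el.object)) }
        unless e
          e = Element.new(opts[:default] || @open.call)
          @pool << e
        end
        e.lock
      end
      yield e.object
    rescue BadResource
      if e
        @close.call(e.object)                    # close the broken resource
        @lock.synchronize { @pool.delete(e) }    # and drop it from the pool
      end
      raise
    ensure
      e.unlock if e
    end
  end
end

counter = 0
closed = []
pool = SketchPool.new(lambda { counter += 1 }, lambda { |obj| closed << obj })

# A nested take cannot reuse the claimed object, so a second one is opened.
nested = pool.take { |outer| pool.take { |inner| [outer, inner] } }
reused = pool.take { |obj| obj }                                        # both free again
even   = pool.take(:filter => lambda { |obj| obj.even? }) { |obj| obj }

begin
  pool.take { |obj| raise SketchPool::BadResource, "dead connection" }
rescue SketchPool::BadResource
  # the claimed object has been closed and removed from the pool
end
```

After this runs, the pool holds a single object: the nested call grew it to two, and the BadResource path removed one.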