Class: Riak::Client::Pool

Inherits:
Object show all
Defined in:
lib/riak/client/pool.rb

Overview

A re-entrant thread-safe resource pool. Generates new resources on demand.

Defined Under Namespace

Classes: BadResource, Element

Instance Attribute Summary (collapse)

Instance Method Summary (collapse)

Constructor Details

- (Pool) initialize(open, close)

Open is a callable which returns a new object for the pool. Close is called with an object before it is freed.



51
52
53
54
55
56
57
58
# File 'lib/riak/client/pool.rb', line 51

def initialize(open, close)
  @open = open
  @close = close
  @lock = Mutex.new
  @iterator = Mutex.new
  @element_released = ConditionVariable.new
  @pool = Set.new
end

Instance Attribute Details

- (Object) close=(value)

Sets the attribute close

Parameters:

  • value

    the value to set the attribute close to.



47
48
49
# File 'lib/riak/client/pool.rb', line 47

def close=(value)
  @close = value
end

- (Object) open

Returns the value of attribute open



46
47
48
# File 'lib/riak/client/pool.rb', line 46

def open
  @open
end

- (Object) pool

Returns the value of attribute pool



45
46
47
# File 'lib/riak/client/pool.rb', line 45

def pool
  @pool
end

Instance Method Details

- (Object) clear Also known as: close

On each element of the pool, calls close(element) and removes it.



62
63
64
65
66
# File 'lib/riak/client/pool.rb', line 62

def clear
  each_element do |e|
    delete_element e
  end
end

- (Object) delete_element(e)

Deletes an element of the pool. Calls close on its object. Not intendend for external use.



71
72
73
74
75
76
# File 'lib/riak/client/pool.rb', line 71

def delete_element(e)
  @close.call(e.object)
  @lock.synchronize do
    @pool.delete e
  end
end

- (Object) delete_if

Locks each element in turn and closes/deletes elements for which the object passes the block.

Raises:

  • (ArgumentError)


80
81
82
83
84
85
86
87
88
# File 'lib/riak/client/pool.rb', line 80

def delete_if
  raise ArgumentError, "block required" unless block_given?

  each_element do |e|
    if yield e.object
      delete_element e
    end
  end
end

- (Object) each

As each_element, but yields objects, not wrapper elements.



169
170
171
172
173
# File 'lib/riak/client/pool.rb', line 169

def each
  each_element do |e|
    yield e.object
  end
end

- (Object) each_element

Iterate over a snapshot of the pool. Yielded objects are locked for the duration of the block. This may block the current thread until elements are released by other threads.



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/riak/client/pool.rb', line 145

def each_element
  targets = @pool.to_a
  unlocked = []

  @iterator.synchronize do
    until targets.empty?
      @lock.synchronize do
        unlocked, targets = targets.partition {|e| e.unlocked? }
        unlocked.each {|e| e.lock }
      end

      unlocked.each do |e|
        begin
          yield e
        ensure
          e.unlock
        end
      end
      @element_released.wait(@iterator) unless targets.empty?
    end
  end
end

- (Object) size



175
176
177
# File 'lib/riak/client/pool.rb', line 175

def size
  @lock.synchronize { @pool.size }
end

- (Object) take(opts = {}) {|obj| ... } Also known as: >>

Acquire an element of the pool. Yields the object. If all elements are claimed, it will create another one.

Parameters:

  • :filter (callable)

    a callable which receives objects and has the opportunity to reject each in turn.

  • :default (Object)

    if no resources are available, use this object instead of calling #open.

Yields:

  • (obj)

    a block that will perform some action with the element of the pool

Yield Parameters:

  • resource (Object)

    a resource managed by the pool. Locked for the duration of the block



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/riak/client/pool.rb', line 101

def take(opts = {})
  unless block_given?
    raise ArgumentError, "block required"
  end

  r = nil
  begin
    e = nil
    @lock.synchronize do
      # Find an existing element.
      if f = opts[:filter]
        e = pool.find { |e| e.unlocked? and f.call(e.object) }
      else
        e = pool.find { |e| e.unlocked? }
      end

      unless e
        # No objects were acceptable
        resource = opts[:default] || @open.call
        e = Element.new(resource)
        pool << e
      end
      e.lock
    end

    r = yield e.object
  rescue BadResource
    delete_element e
    raise
  ensure
    # Unlock
    if e
      e.unlock
      @element_released.signal
    end
  end
  r
end