Class: Bundler::ConnectionPool

Inherits:
Object
  • Object
show all
Includes:
ForkTracker
Defined in:
lib/bundler/vendor/connection_pool/lib/connection_pool.rb,
lib/bundler/vendor/connection_pool/lib/connection_pool.rb,
lib/bundler/vendor/connection_pool/lib/connection_pool/version.rb,
lib/bundler/vendor/connection_pool/lib/connection_pool/wrapper.rb

Overview

Generic connection pool class for sharing a limited number of objects or network connections among many threads. Note: pool elements are lazily created.

Example usage with block (faster):

@pool = Bundler::ConnectionPool.new { Redis.new }
@pool.with do |redis|
  redis.lpop('my-list') if redis.llen('my-list') > 0
end

Using optional timeout override (for that single invocation)

@pool.with(timeout: 2.0) do |redis|
  redis.lpop('my-list') if redis.llen('my-list') > 0
end

Example usage replacing an existing connection (slower):

$redis = Bundler::ConnectionPool.wrap { Redis.new }

def do_work
  $redis.lpop('my-list') if $redis.llen('my-list') > 0
end

Accepts the following options:

  • :size - number of connections to pool, defaults to 5

  • :timeout - amount of time to wait for a connection if none currently available, defaults to 5 seconds

  • :auto_reload_after_fork - automatically drop all connections after fork, defaults to true

Direct Known Subclasses

Gem::Net::HTTP::Persistent::Pool

Defined Under Namespace

Modules: ForkTracker Classes: Error, PoolShuttingDownError, TimedStack, TimeoutError, Wrapper

Constant Summary collapse

DEFAULTS =
{size: 5, timeout: 5, auto_reload_after_fork: true}.freeze
VERSION =
"2.5.4"

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from ForkTracker

#_fork

Constructor Details

#initialize(options = {}, &block) ⇒ ConnectionPool

Returns a new instance of ConnectionPool.

Raises:

  • (ArgumentError)


90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/bundler/vendor/connection_pool/lib/connection_pool.rb', line 90

def initialize(options = {}, &block)
  raise ArgumentError, "Connection pool requires a block" unless block

  options = DEFAULTS.merge(options)

  @size = Integer(options.fetch(:size))
  @timeout = options.fetch(:timeout)
  @auto_reload_after_fork = options.fetch(:auto_reload_after_fork)

  @available = TimedStack.new(@size, &block)
  @key = :"pool-#{@available.object_id}"
  @key_count = :"pool-#{@available.object_id}-count"
  @discard_key = :"pool-#{@available.object_id}-discard"
  INSTANCES[self] = self if @auto_reload_after_fork && INSTANCES
end

Instance Attribute Details

#auto_reload_after_forkObject (readonly)

Automatically drop all connections after fork



216
217
218
# File 'lib/bundler/vendor/connection_pool/lib/connection_pool.rb', line 216

def auto_reload_after_fork
  @auto_reload_after_fork
end

#sizeObject (readonly)

Size of this connection pool



214
215
216
# File 'lib/bundler/vendor/connection_pool/lib/connection_pool.rb', line 214

def size
  @size
end

Class Method Details

.after_forkObject



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/bundler/vendor/connection_pool/lib/connection_pool.rb', line 52

def self.after_fork
  INSTANCES.values.each do |pool|
    next unless pool.auto_reload_after_fork

    # We're on after fork, so we know all other threads are dead.
    # All we need to do is to ensure the main thread doesn't have a
    # checked out connection
    pool.checkin(force: true)
    pool.reload do |connection|
      # Unfortunately we don't know what method to call to close the connection,
      # so we try the most common one.
      connection.close if connection.respond_to?(:close)
    end
  end
  nil
end

.wrap(options, &block) ⇒ Object



44
45
46
# File 'lib/bundler/vendor/connection_pool/lib/connection_pool.rb', line 44

def self.wrap(options, &block)
  Wrapper.new(options, &block)
end

Instance Method Details

#availableObject

Number of pool entries available for checkout at this instant.



219
220
221
# File 'lib/bundler/vendor/connection_pool/lib/connection_pool.rb', line 219

def available
  @available.length
end

#checkin(force: false) ⇒ Object



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/bundler/vendor/connection_pool/lib/connection_pool.rb', line 164

def checkin(force: false)
  if ::Thread.current[@key]
    if ::Thread.current[@key_count] == 1 || force
      if ::Thread.current[@discard_key]
        begin
          @available.decrement_created
          ::Thread.current[@discard_key].call(::Thread.current[@key])
        rescue
          nil
        ensure
          ::Thread.current[@discard_key] = nil
        end
      else
        @available.push(::Thread.current[@key])
      end
      ::Thread.current[@key] = nil
      ::Thread.current[@key_count] = nil
    else
      ::Thread.current[@key_count] -= 1
    end
  elsif !force
    raise Bundler::ConnectionPool::Error, "no connections are checked out"
  end

  nil
end

#checkout(options = {}) ⇒ Object



154
155
156
157
158
159
160
161
162
# File 'lib/bundler/vendor/connection_pool/lib/connection_pool.rb', line 154

def checkout(options = {})
  if ::Thread.current[@key]
    ::Thread.current[@key_count] += 1
    ::Thread.current[@key]
  else
    ::Thread.current[@key_count] = 1
    ::Thread.current[@key] = @available.pop(options[:timeout] || @timeout, options)
  end
end

#discard_current_connection {|conn| ... } ⇒ void

This method returns an undefined value.

Marks the current thread’s checked-out connection for discard.

When a connection is marked for discard, it will not be returned to the pool when checked in. Instead, the connection will be discarded. This is useful when a connection has become invalid or corrupted and should not be reused.

Takes an optional block that will be called with the connection to be discarded. The block should perform any necessary clean-up on the connection.

Note: This only affects the connection currently checked out by the calling thread. The connection will be discarded when checkin is called.

Examples:

pool.with do |conn|
  begin
    conn.execute("SELECT 1")
  rescue SomeConnectionError
    pool.discard_current_connection  # Mark connection as bad
    raise
  end
end

Yields:

  • (conn)

Yield Parameters:

  • conn (Object)

    The connection to be discarded.

Yield Returns:

  • (void)


150
151
152
# File 'lib/bundler/vendor/connection_pool/lib/connection_pool.rb', line 150

def discard_current_connection(&block)
  ::Thread.current[@discard_key] = block || proc { |conn| conn }
end

#idleObject

Number of pool entries created and idle in the pool.



224
225
226
# File 'lib/bundler/vendor/connection_pool/lib/connection_pool.rb', line 224

def idle
  @available.idle
end

#reap(idle_seconds = 60, &block) ⇒ Object

Reaps idle connections that have been idle for over idle_seconds. idle_seconds defaults to 60.



209
210
211
# File 'lib/bundler/vendor/connection_pool/lib/connection_pool.rb', line 209

def reap(idle_seconds = 60, &block)
  @available.reap(idle_seconds, &block)
end

#reload(&block) ⇒ Object

Reloads the Bundler::ConnectionPool by passing each connection to block and then removing it the pool. Subsequent checkouts will create new connections as needed.



203
204
205
# File 'lib/bundler/vendor/connection_pool/lib/connection_pool.rb', line 203

def reload(&block)
  @available.shutdown(reload: true, &block)
end

#shutdown(&block) ⇒ Object

Shuts down the Bundler::ConnectionPool by passing each connection to block and then removing it from the pool. Attempting to checkout a connection after shutdown will raise Bundler::ConnectionPool::PoolShuttingDownError.



195
196
197
# File 'lib/bundler/vendor/connection_pool/lib/connection_pool.rb', line 195

def shutdown(&block)
  @available.shutdown(&block)
end

#with(options = {}) ⇒ Object Also known as: then



106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/bundler/vendor/connection_pool/lib/connection_pool.rb', line 106

def with(options = {})
  Thread.handle_interrupt(Exception => :never) do
    conn = checkout(options)
    begin
      Thread.handle_interrupt(Exception => :immediate) do
        yield conn
      end
    ensure
      checkin
    end
  end
end