Class: Sequel::ShardedTimedQueueConnectionPool
- Inherits: ConnectionPool
  - Object
  - ConnectionPool
  - Sequel::ShardedTimedQueueConnectionPool
- Defined in: lib/sequel/connection_pool/sharded_timed_queue.rb
Overview
A connection pool allowing multi-threaded access to a sharded pool of connections, using a timed queue (only available in Ruby 3.2+).
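For readers unfamiliar with the timed queue mentioned above: Ruby 3.2 added a `timeout:` keyword to `Queue#pop`, which returns nil instead of blocking forever when nothing arrives in time. The following is a minimal stdlib-only sketch of that behavior, not Sequel's code. The helper name `checkout` and the constant `TIMED_QUEUE_SUPPORTED` are assumptions made for illustration.

```ruby
# Minimal sketch of a timed queue checkout (timeout: needs Ruby 3.2+).
# `checkout` and TIMED_QUEUE_SUPPORTED are hypothetical names, not part of Sequel.
TIMED_QUEUE_SUPPORTED = (RUBY_VERSION.split(".").map(&:to_i) <=> [3, 2]) >= 0

def checkout(queue, timeout)
  # Returns the next item, or nil if none becomes available
  # within `timeout` seconds.
  queue.pop(timeout: timeout)
end

if TIMED_QUEUE_SUPPORTED
  queue = Queue.new
  queue.push(:conn1)
  checkout(queue, 0.1) # an item is waiting, so this returns it immediately
  checkout(queue, 0.1) # the queue is now empty, so this gives up and returns nil
end
```

A pool built on this primitive can turn the nil result into a timeout error rather than leaving the calling thread blocked indefinitely.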
Constant Summary
Constants inherited from ConnectionPool
ConnectionPool::OPTS, ConnectionPool::POOL_CLASS_MAP
Instance Attribute Summary
- #max_size ⇒ Object (readonly)
  The maximum number of connections this pool will create per shard.
Attributes inherited from ConnectionPool
#after_connect, #connect_sqls, #db
Instance Method Summary
- #add_servers(servers) ⇒ Object
  Adds new servers to the connection pool.
- #all_connections ⇒ Object
  Yield all of the available connections, and the one currently allocated to this thread (if one is allocated).
- #disconnect(opts = OPTS) ⇒ Object
  Removes all connections currently in the pool’s queue.
- #hold(server = :default) ⇒ Object
  Chooses the first available connection for the given server, or if none are available, creates a new connection.
- #initialize(db, opts = OPTS) ⇒ ShardedTimedQueueConnectionPool (constructor)
  The following additional options are respected:
  - :max_connections: The maximum number of connections the connection pool will open (default 4).
  - :pool_timeout: The number of seconds to wait to acquire a connection before raising a PoolTimeout (default 5).
  - :servers: A hash of servers to use.
- #num_waiting(server = :default) ⇒ Object
  The number of threads waiting to check out a connection for the given server.
- #pool_type ⇒ Object
- #remove_servers(servers) ⇒ Object
  Remove servers from the connection pool.
- #servers ⇒ Object
  Return an array of symbols for servers in the connection pool.
- #size(server = :default) ⇒ Object
  The total number of connections in the pool.
Methods included from ConnectionPool::ClassMethods
Constructor Details
#initialize(db, opts = OPTS) ⇒ ShardedTimedQueueConnectionPool
The following additional options are respected:
- :max_connections
  The maximum number of connections the connection pool will open per shard (default 4).
- :pool_timeout
  The number of seconds to wait to acquire a connection before raising a PoolTimeout (default 5).
- :servers
  A hash of servers to use. Keys should be symbols. If not present, the pool uses a single :default server.
- :servers_hash
  The base hash to use for the servers. By default, Sequel uses Hash.new(:default). To catch every case where a nonexistent server is used, supply a hash with a default proc that raises an error.
    # File 'lib/sequel/connection_pool/sharded_timed_queue.rb', line 24
    def initialize(db, opts = OPTS)
      super

      @max_size = Integer(opts[:max_connections] || 4)
      raise(Sequel::Error, ':max_connections must be positive') if @max_size < 1
      @mutex = Mutex.new
      @timeout = Float(opts[:pool_timeout] || 5)

      @allocated = {}
      @sizes = {}
      @queues = {}
      @servers = opts.fetch(:servers_hash, Hash.new(:default))

      add_servers([:default])
      add_servers(opts[:servers].keys) if opts[:servers]
    end
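The difference between the default Hash.new(:default) and a stricter :servers_hash can be illustrated with plain hashes. This is a sketch under assumptions: the method names `lenient_servers_hash` and `strict_servers_hash` are hypothetical, and KeyError is an arbitrary choice of exception, not something Sequel prescribes.

```ruby
# Sketch of the two lookup behaviors described for :servers_hash.
# Plain stdlib hashes only; both method names are hypothetical.

# Default behavior: unknown shard names silently map to :default.
def lenient_servers_hash
  Hash.new(:default)
end

# Stricter alternative: a default proc that raises for unknown shard names.
# KeyError is an illustrative choice; any exception class would do.
def strict_servers_hash
  Hash.new { |_hash, key| raise KeyError, "unknown server: #{key.inspect}" }
end

lenient_servers_hash[:typo]  # falls back to :default without complaint
strict = strict_servers_hash
strict[:read_only] = :read_only
strict[:read_only]           # known key, returned as stored
# strict[:typo] would raise KeyError instead of falling back
```

With the strict variant, a misspelled shard name surfaces as an error at lookup time rather than quietly routing queries to the :default server.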
Instance Attribute Details
#max_size ⇒ Object (readonly)
The maximum number of connections this pool will create per shard.
    # File 'lib/sequel/connection_pool/sharded_timed_queue.rb', line 11
    def max_size
      @max_size
    end
Instance Method Details
#add_servers(servers) ⇒ Object
Adds new servers to the connection pool. Allows for dynamic expansion of the potential replicas/shards at runtime. The servers argument should be an array of symbols.
    # File 'lib/sequel/connection_pool/sharded_timed_queue.rb', line 43
    def add_servers(servers)
      sync do
        servers.each do |server|
          next if @servers.has_key?(server)

          @servers[server] = server
          @sizes[server] = 0
          @queues[server] = Queue.new
          (@allocated[server] = {}).compare_by_identity
        end
      end
      nil
    end
#all_connections ⇒ Object
Yield all of the available connections, and the one currently allocated to this thread (if one is allocated). This will not yield connections currently allocated to other threads, as it is not safe to operate on them.
    # File 'lib/sequel/connection_pool/sharded_timed_queue.rb', line 60
    def all_connections
      thread = Sequel.current
      sync{@queues.to_a}.each do |server, queue|
        if conn = owned_connection(thread, server)
          yield conn
        end

        # Use a hash to record all connections already seen. As soon as we
        # come across a connection we've already seen, we stop the loop.
        conns = {}
        conns.compare_by_identity
        while true
          conn = nil
          begin
            break unless (conn = queue.pop(timeout: 0)) && !conns[conn]
            conns[conn] = true
            yield conn
          ensure
            queue.push(conn) if conn
          end
        end
      end

      nil
    end
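The loop in #all_connections visits every queued connection once without removing any of them: pop an item, yield it, push it back, and stop at the first item already seen. A stand-alone sketch of that technique follows. It deliberately swaps in the older non-blocking `pop(true)` form in place of `pop(timeout: 0)` so that it also runs on Rubies before 3.2, and the method name `each_queued` is hypothetical.

```ruby
# Sketch of the "pop, yield, push back, stop at first repeat" loop.
# Uses pop(true) (non-blocking, raises ThreadError when empty) rather than
# pop(timeout: 0). `each_queued` is a hypothetical name, not Sequel's API.
def each_queued(queue)
  seen = {}.compare_by_identity # track objects by identity, not by ==
  loop do
    item = nil
    begin
      begin
        item = queue.pop(true)  # raises ThreadError if the queue is empty
      rescue ThreadError
        break
      end
      break if seen[item]       # wrapped around to an item already yielded
      seen[item] = true
      yield item
    ensure
      queue.push(item) if item  # always return the item to the queue
    end
  end
  nil
end

queue = Queue.new
3.times { queue.push(Object.new) }
each_queued(queue) { |conn| conn.inspect } # visits each of the three objects once
```

Comparing by identity matters because two distinct connection objects could compare as equal. The queue ends up holding the same items, though possibly rotated.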
#disconnect(opts = OPTS) ⇒ Object
Removes all connections currently in the pool’s queue. This method has the effect of disconnecting from the database, assuming that no connections are currently being used.
Once a connection is requested using #hold, the connection pool creates new connections to the database.
If the :server option is provided, it should be a symbol or array of symbols, and the method will then disconnect connections only from the specified shards.
    # File 'lib/sequel/connection_pool/sharded_timed_queue.rb', line 95
    def disconnect(opts=OPTS)
      (opts[:server] ? Array(opts[:server]) : sync{@servers.keys}).each do |server|
        raise Sequel::Error, "invalid server" unless queue = sync{@queues[server]}
        while conn = queue.pop(timeout: 0)
          disconnect_pool_connection(conn, server)
        end
        fill_queue(server)
      end
      nil
    end
#hold(server = :default) ⇒ Object
Chooses the first available connection for the given server, or if none are available, creates a new connection. Passes the connection to the supplied block:
    pool.hold(:server1) {|conn| conn.execute('DROP TABLE posts')}
Pool#hold is re-entrant, meaning it can be called recursively in the same thread without blocking.
If no connection is immediately available and the pool is already using the maximum number of connections, Pool#hold will block until a connection is available or the timeout expires. If the timeout expires before a connection can be acquired, a Sequel::PoolTimeout is raised.
    # File 'lib/sequel/connection_pool/sharded_timed_queue.rb', line 119
    def hold(server=:default)
      server = pick_server(server)
      t = Sequel.current
      if conn = owned_connection(t, server)
        return yield(conn)
      end

      begin
        conn = acquire(t, server)
        yield conn
      rescue Sequel::DatabaseDisconnectError, *@error_classes => e
        if disconnect_error?(e)
          oconn = conn
          conn = nil
          disconnect_pool_connection(oconn, server) if oconn
          sync{@allocated[server].delete(t)}
          fill_queue(server)
        end
        raise
      ensure
        release(t, conn, server) if conn
      end
    end
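The re-entrancy described for #hold can be shown with a much smaller pool. The class below is a hypothetical, single-shard sketch (`TinyPool` is not part of Sequel). It assumes connections are pre-created, keys ownership on Thread.current, and omits timeouts, sharding, and disconnect handling.

```ruby
# Minimal single-shard sketch of a re-entrant hold. TinyPool is hypothetical
# and leaves out timeouts, sharding, connection creation, and error handling.
class TinyPool
  def initialize(connections)
    @queue = Queue.new
    connections.each { |c| @queue.push(c) }
    @allocated = {}.compare_by_identity # thread => connection it holds
    @mutex = Mutex.new
  end

  def hold
    thread = Thread.current
    # Re-entrant path: this thread already holds a connection, so reuse it.
    if (conn = @mutex.synchronize { @allocated[thread] })
      return yield(conn)
    end

    conn = @queue.pop # blocks until a connection is available
    @mutex.synchronize { @allocated[thread] = conn }
    begin
      yield conn
    ensure
      # Only the outermost hold reaches this point and releases the connection.
      @mutex.synchronize { @allocated.delete(thread) }
      @queue.push(conn)
    end
  end

  # Number of idle connections currently in the queue.
  def available
    @queue.size
  end
end

pool = TinyPool.new([:c1])
pool.hold { |outer| pool.hold { |inner| [outer, inner] } } # nested call reuses :c1
```

Because the nested call finds the connection already allocated to the current thread, it never touches the queue, which is why a pool with a single connection does not deadlock here.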
#num_waiting(server = :default) ⇒ Object
The number of threads waiting to check out a connection for the given server.
    # File 'lib/sequel/connection_pool/sharded_timed_queue.rb', line 145
    def num_waiting(server=:default)
      @queues[pick_server(server)].num_waiting
    end
#pool_type ⇒ Object
    # File 'lib/sequel/connection_pool/sharded_timed_queue.rb', line 196
    def pool_type
      :sharded_timed_queue
    end
#remove_servers(servers) ⇒ Object
Remove servers from the connection pool. Similar to disconnecting from all given servers, except that after it is used, future requests for the servers will use the :default server instead.
Note that an error will be raised if there are any connections currently checked out for the given servers.
    # File 'lib/sequel/connection_pool/sharded_timed_queue.rb', line 160
    def remove_servers(servers)
      conns = []
      raise(Sequel::Error, "cannot remove default server") if servers.include?(:default)

      sync do
        servers.each do |server|
          next unless @servers.has_key?(server)

          queue = @queues[server]

          while conn = queue.pop(timeout: 0)
            @sizes[server] -= 1
            conns << conn
          end

          unless @sizes[server] == 0
            raise Sequel::Error, "cannot remove server #{server} as it has allocated connections"
          end

          @servers.delete(server)
          @sizes.delete(server)
          @queues.delete(server)
          @allocated.delete(server)
        end
      end

      nil
    ensure
      disconnect_connections(conns)
    end
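The guard against removing a server with checked-out connections comes down to simple bookkeeping: every idle connection drained from the queue decrements the shard's size, and anything left over must still be in use. A single-threaded sketch of that check is below. `drain_for_removal` and its arguments are hypothetical, and ArgumentError stands in for the Sequel::Error raised by the real method.

```ruby
# Sketch of the removal guard: drain idle connections from the queue and
# refuse removal if the remaining count shows connections still checked out.
# `drain_for_removal` is a hypothetical helper, not Sequel's API, and it is
# only safe single-threaded (no other thread may pop from the queue).
def drain_for_removal(queue, size)
  drained = []
  until queue.empty?
    drained << queue.pop
    size -= 1
  end
  unless size.zero?
    raise ArgumentError, "server still has #{size} allocated connection(s)"
  end
  drained
end

idle = Queue.new
idle.push(:conn_a)
idle.push(:conn_b)
drain_for_removal(idle, 2) # both connections were idle, so removal can proceed
```

If the size passed in were 3 with only two idle connections queued, one connection would be unaccounted for and the sketch would raise, mirroring the error described above.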
#servers ⇒ Object
Return an array of symbols for servers in the connection pool.
    # File 'lib/sequel/connection_pool/sharded_timed_queue.rb', line 192
    def servers
      sync{@servers.keys}
    end
#size(server = :default) ⇒ Object
The total number of connections in the pool for the given server. Using a nonexistent server will return nil.
    # File 'lib/sequel/connection_pool/sharded_timed_queue.rb', line 150
    def size(server=:default)
      sync{@sizes[server]}
    end