Class: Bunny::Session

Inherits:
Object
Defined in:
lib/bunny/session.rb

Overview

Represents an AMQP 0.9.1 connection (to a RabbitMQ node).

Constant Summary

DEFAULT_HOST =

Default host used for connection

"127.0.0.1"
DEFAULT_VHOST =

Default virtual host used for connection

"/"
DEFAULT_USER =

Default username used for connection

"guest"
DEFAULT_PASSWORD =

Default password used for connection

"guest"
DEFAULT_HEARTBEAT =

Default heartbeat interval, the same value as RabbitMQ 3.0 uses.

:server
DEFAULT_CHANNEL_MAX =
2047
DEFAULT_CLIENT_PROPERTIES =

RabbitMQ client metadata

{
  :capabilities => {
    :publisher_confirms           => true,
    :consumer_cancel_notify       => true,
    :exchange_exchange_bindings   => true,
    :"basic.nack"                 => true,
    :"connection.blocked"         => true,
    # See http://www.rabbitmq.com/auth-notification.html
    :authentication_failure_close => true
  },
  :product      => "Bunny",
  :platform     => ::RUBY_DESCRIPTION,
  :version      => Bunny::VERSION,
  :information  => "https://github.com/ruby-amqp/bunny",
}
DEFAULT_NETWORK_RECOVERY_INTERVAL =

Default reconnection interval for TCP connection failures

5.0
DEFAULT_RECOVERABLE_EXCEPTIONS =
[StandardError, TCPConnectionFailedForAllHosts, TCPConnectionFailed, AMQ::Protocol::EmptyResponseError, SystemCallError, Timeout::Error, Bunny::ConnectionLevelException, Bunny::ConnectionClosedError]

Constructor Details

#initialize(connection_string_or_opts = ENV['RABBITMQ_URL'], optz = Hash.new) ⇒ Session

Returns a new instance of Session.

Parameters:

  • connection_string_or_opts (String, Hash) (defaults to: ENV['RABBITMQ_URL'])

    Connection string or a hash of connection options

  • optz (Hash) (defaults to: Hash.new)

    Extra options not related to connection

Options Hash (connection_string_or_opts):

  • :host (String) — default: "127.0.0.1"

    Hostname or IP address to connect to

  • :hosts (Array<String>) — default: ["127.0.0.1"]

    List of hostnames or IP addresses to select a host from when connecting

  • :addresses (Array<String>) — default: ["127.0.0.1:5672"]

    List of addresses (host:port) to select a hostname and port from when connecting

  • :port (Integer) — default: 5672

    Port RabbitMQ listens on

  • :username (String) — default: "guest"

    Username

  • :password (String) — default: "guest"

    Password

  • :vhost (String) — default: "/"

    Virtual host to use

  • :heartbeat (Integer, Symbol) — default: :server

    Heartbeat timeout to offer to the server. :server means use the value suggested by RabbitMQ. 0 means heartbeats and socket read timeouts will be disabled (not recommended).

  • :network_recovery_interval (Numeric) — default: 5.0

    Interval, in seconds, between network recovery attempts. This includes the initial pause after a network failure.

  • :tls (Boolean) — default: false

    Should TLS/SSL be used?

  • :tls_cert (String) — default: nil

    Path to client TLS/SSL certificate file (.pem)

  • :tls_key (String) — default: nil

    Path to client TLS/SSL private key file (.pem)

  • :tls_ca_certificates (Array<String>)

    Array of paths to TLS/SSL CA files (.pem), by default detected from OpenSSL configuration

  • :verify_peer (Boolean) — default: true

    Whether TLS peer verification should be performed

  • :tls_protocol (Symbol) — default: negotiated

    What TLS version should be used (:TLSv1, :TLSv1_1, or :TLSv1_2)

  • :channel_max (Integer) — default: 2047

    Maximum number of channels allowed on this connection, minus 1 to account for the special channel 0.

  • :continuation_timeout (Integer) — default: 15000

    Timeout for client operations that expect a response (e.g. Queue#get), in milliseconds.

  • :connection_timeout (Integer) — default: 30

    Timeout in seconds for connecting to the server.

  • :read_timeout (Integer) — default: 30

    TCP socket read timeout in seconds. If heartbeats are disabled this will be ignored.

  • :write_timeout (Integer) — default: 30

    TCP socket write timeout in seconds.

  • :hosts_shuffle_strategy (Proc)

    A callable that reorders a list of host strings. Defaults to Array#shuffle.

  • :recovery_completed (Proc)

    a callable that will be called when a network recovery is performed

  • :logger (Logger)

    The logger. If missing, one is created using :log_file and :log_level.

  • :log_file (IO, String)

    The file or path to use when creating a logger. Defaults to STDOUT.

  • :logfile (IO, String)

    DEPRECATED: use :log_file instead. The file or path to use when creating a logger. Defaults to STDOUT.

  • :log_level (Integer)

    The log level to use when creating a logger. Defaults to ENV["BUNNY_LOG_LEVEL"] if set, otherwise Logger::WARN.

  • :automatically_recover (Boolean) — default: true

    Should automatically recover from network failures?

  • :recovery_attempts (Integer) — default: nil

    Max number of recovery attempts, nil means forever

  • :reset_recovery_attempts_after_reconnection (Boolean) — default: true

    Should recovery attempt counter be reset after successful reconnection? When set to false, the attempt counter will last through the entire lifetime of the connection object.

  • :recovery_attempt_started (Proc) — default: nil

    Will be called before every connection recovery attempt

  • :recovery_completed (Proc) — default: nil

    Will be called after successful connection recovery

  • :recovery_attempts_exhausted (Proc) — default: nil

    Will be called when connection recovery fails after the specified number of recovery attempts

  • :recover_from_connection_close (Boolean) — default: true

    Should this connection recover after receiving a server-sent connection.close (e.g. connection was force closed)?

  • :session_error_handler (Object) — default: ExceptionAccumulator.new

    Object which responds to #raise that will act as a session error handler. Defaults to a Bunny::ExceptionAccumulator instance which stores exceptions for safe retrieval later. Can be set to Thread.current for legacy behavior (raises asynchronous exceptions in the creating thread), or any object that responds to #raise.

  • :topology_recovery_filter (Bunny::TopologyRecoveryFilter)

    if provided, will be used for object filtering during topology recovery
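The logging options above fall back on one another. A minimal sketch of that lookup order, assuming a hypothetical helper named `sketch_build_logger` (Bunny's own logger setup is internal and may differ in detail):

```ruby
require "logger"
require "stringio"

# Hypothetical helper, not part of Bunny. It mirrors the option lookups
# the constructor performs: :logger, then :log_file / :logfile, then :log_level.
def sketch_build_logger(opts)
  # An explicitly provided logger is used as is.
  return opts[:logger] if opts[:logger]

  log_file  = opts[:log_file] || opts[:logfile] || STDOUT
  log_level = opts[:log_level] || ENV["BUNNY_LOG_LEVEL"] || Logger::WARN

  logger = Logger.new(log_file)
  logger.level = log_level
  logger
end

# The deprecated :logfile key is still honoured when :log_file is absent.
io = StringIO.new
logger = sketch_build_logger(logfile: io, log_level: Logger::INFO)
logger.info("connection options resolved")
```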

Options Hash (optz):

  • :auth_mechanism (String) — default: "PLAIN"

    Authentication mechanism, PLAIN or EXTERNAL

  • :locale (String) — default: "en_GB"

    Locale RabbitMQ should use

  • :connection_name (String) — default: nil

    Client-provided connection name, if any. Note that the value returned does not uniquely identify a connection and cannot be used as a connection identifier in HTTP API requests.
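The constructor accepts nil, a connection string, or a hash as its first argument and merges optz on top, so keys in optz take precedence. A rough sketch of that dispatch follows. `sketch_parse_uri` and `sketch_resolve_options` are hypothetical names, and the URI handling is a simplification using Ruby's standard library; the real `parse_uri` delegates to `AMQ::Settings.configure`.

```ruby
require "uri"

# Hypothetical stand-in for Bunny::Session.parse_uri (simplified).
def sketch_parse_uri(str)
  uri = URI.parse(str)
  { host: uri.host, port: uri.port, user: uri.user, pass: uri.password }.compact
end

# Mirrors the case expression at the top of #initialize.
def sketch_resolve_options(connection_string_or_opts = ENV["RABBITMQ_URL"], optz = {})
  base = case connection_string_or_opts
         when nil    then {}
         when String then sketch_parse_uri(connection_string_or_opts)
         when Hash   then connection_string_or_opts
         end
  # Keys in optz override keys from the first argument.
  base.merge(optz)
end

resolved = sketch_resolve_options("amqp://guest:secret@localhost:5672", heartbeat: 10)
```

Because the merge is applied last, an option passed in optz silently replaces the same key parsed from the connection string.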

# File 'lib/bunny/session.rb', line 146

def initialize(connection_string_or_opts = ENV['RABBITMQ_URL'], optz = Hash.new)
  opts = case (connection_string_or_opts)
         when nil then
           Hash.new
         when String then
           self.class.parse_uri(connection_string_or_opts)
         when Hash then
           connection_string_or_opts
         end.merge(optz)

  @default_hosts_shuffle_strategy = Proc.new { |hosts| hosts.shuffle }

  @opts            = opts
  log_file         = opts[:log_file] || opts[:logfile] || STDOUT
  log_level        = opts[:log_level] || ENV["BUNNY_LOG_LEVEL"] || Logger::WARN
  # we might need to log a warning about ill-formatted IPv6 address but
  # progname includes hostname, so init like this first
  @logger          = opts.fetch(:logger, init_default_logger_without_progname(log_file, log_level))

  @addresses       = self.addresses_from(opts)
  @address_index   = 0

  @transport       = nil
  @user            = self.username_from(opts)
  @pass            = self.password_from(opts)
  @vhost           = self.vhost_from(opts)
  @threaded        = opts.fetch(:threaded, true)

  # re-init, see above
  @logger          = opts.fetch(:logger, init_default_logger(log_file, log_level))

  validate_connection_options(opts)
  @last_connection_error = nil

  # should automatic recovery from network failures be used?
  @automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil?
                             true
                           else
                             opts[:automatically_recover] | opts[:automatic_recovery]
                           end
  @recovering_from_network_failure = false
  @max_recovery_attempts = opts[:recovery_attempts]
  @recovery_attempts     = @max_recovery_attempts
  # When this is set, connection attempts won't be reset after
  # successful reconnection. Some find this behavior more sensible
  # than the per-failure attempt counter. MK.
  @reset_recovery_attempt_counter_after_reconnection = opts.fetch(:reset_recovery_attempts_after_reconnection, true)

  @network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL)
  @recover_from_connection_close = opts.fetch(:recover_from_connection_close, true)
  # in ms
  @continuation_timeout   = opts.fetch(:continuation_timeout, DEFAULT_CONTINUATION_TIMEOUT)

  @status             = :not_connected
  @manually_closed    = false
  @blocked            = false

  # these are negotiated with the broker during the connection tuning phase
  @client_frame_max   = opts.fetch(:frame_max, DEFAULT_FRAME_MAX)
  @client_channel_max = normalize_client_channel_max(opts.fetch(:channel_max, DEFAULT_CHANNEL_MAX))
  # will be-renegotiated during connection tuning steps. MK.
  @channel_max        = @client_channel_max
  @heartbeat_sender   = nil
  @client_heartbeat   = self.heartbeat_from(opts)

  client_props         = opts[:properties] || opts[:client_properties] || {}
  @connection_name     = client_props[:connection_name] || opts[:connection_name]
  @client_properties   = DEFAULT_CLIENT_PROPERTIES.merge(client_props)
                                                  .merge(connection_name: connection_name)
  @mechanism           = normalize_auth_mechanism(opts.fetch(:auth_mechanism, "PLAIN"))
  @credentials_encoder = credentials_encoder_for(@mechanism)
  @locale              = @opts.fetch(:locale, DEFAULT_LOCALE)

  @mutex_impl          = @opts.fetch(:mutex_impl, Monitor)

  # mutex for the channel id => channel hash
  @channel_mutex       = @mutex_impl.new
  # transport operations/continuations mutex. A workaround for
  # the non-reentrant Ruby mutexes. MK.
  @transport_mutex     = @mutex_impl.new
  @status_mutex        = @mutex_impl.new
  @address_index_mutex = @mutex_impl.new

  @channels            = Hash.new

  trf = @opts.fetch(:topology_recovery_filter, DefaultTopologyRecoveryFilter.new)
  @topology_registry = TopologyRegistry.new(topology_recovery_filter: trf)

  @recovery_attempt_started = opts[:recovery_attempt_started]
  @recovery_completed       = opts[:recovery_completed]
  @recovery_attempts_exhausted = opts[:recovery_attempts_exhausted]

  @session_error_handler = opts.fetch(:session_error_handler, ExceptionAccumulator.new)

  @recoverable_exceptions = DEFAULT_RECOVERABLE_EXCEPTIONS.dup

  self.reset_continuations
  self.initialize_transport

end
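The constructor resolves automatic recovery from two keys, :automatically_recover and :automatic_recovery, and enables recovery when neither is given. The sketch below isolates that expression in a hypothetical helper (`sketch_automatically_recover?` is not a Bunny method) so the combinations can be tried out:

```ruby
# Hypothetical helper reproducing the expression used in #initialize.
def sketch_automatically_recover?(opts)
  if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil?
    true # neither key given: recovery is on by default
  else
    # Non-short-circuit OR: a nil key counts as false here.
    opts[:automatically_recover] | opts[:automatic_recovery]
  end
end

sketch_automatically_recover?({})                              # neither key set
sketch_automatically_recover?({ automatically_recover: false }) # explicitly disabled
```

Note that with this expression, setting either key to true enables recovery even if the other is false.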

Instance Attribute Details

#channel_id_allocator ⇒ Object (readonly)

Returns the value of attribute channel_id_allocator.



# File 'lib/bunny/session.rb', line 85

def channel_id_allocator
  @channel_id_allocator
end

#channel_max ⇒ Object (readonly)

Returns the value of attribute channel_max.



# File 'lib/bunny/session.rb', line 83

def channel_max
  @channel_max
end

#connection_name ⇒ Object (readonly)

Returns the value of attribute connection_name.



# File 'lib/bunny/session.rb', line 96

def connection_name
  @connection_name
end

#continuation_timeout ⇒ Integer (readonly)

Returns Timeout for blocking protocol operations (queue.declare, queue.bind, etc), in milliseconds. Default is 15000.

Returns:

  • (Integer)

    Timeout for blocking protocol operations (queue.declare, queue.bind, etc), in milliseconds. Default is 15000.



# File 'lib/bunny/session.rb', line 94

def continuation_timeout
  @continuation_timeout
end

#frame_max ⇒ Object (readonly)

Returns the value of attribute frame_max.



# File 'lib/bunny/session.rb', line 83

def frame_max
  @frame_max
end

#heartbeat ⇒ Object (readonly)

Returns the value of attribute heartbeat.



# File 'lib/bunny/session.rb', line 83

def heartbeat
  @heartbeat
end

#logger ⇒ Logger (readonly)

Returns:

  • (Logger)


# File 'lib/bunny/session.rb', line 92

def logger
  @logger
end

#mechanism ⇒ String (readonly)

Authentication mechanism, e.g. "PLAIN" or "EXTERNAL"

Returns:

  • (String)


# File 'lib/bunny/session.rb', line 90

def mechanism
  @mechanism
end

#network_recovery_interval ⇒ Object (readonly)

Returns the value of attribute network_recovery_interval.



# File 'lib/bunny/session.rb', line 95

def network_recovery_interval
  @network_recovery_interval
end

#pass ⇒ Object (readonly)

Returns the value of attribute pass.



# File 'lib/bunny/session.rb', line 83

def pass
  @pass
end

#recoverable_exceptions ⇒ Object

Returns the value of attribute recoverable_exceptions.



# File 'lib/bunny/session.rb', line 98

def recoverable_exceptions
  @recoverable_exceptions
end

#server_authentication_mechanisms ⇒ Object (readonly)

Returns the value of attribute server_authentication_mechanisms.



# File 'lib/bunny/session.rb', line 84

def server_authentication_mechanisms
  @server_authentication_mechanisms
end

#server_capabilities ⇒ Object (readonly)

Returns the value of attribute server_capabilities.



# File 'lib/bunny/session.rb', line 84

def server_capabilities
  @server_capabilities
end

#server_locales ⇒ Object (readonly)

Returns the value of attribute server_locales.



# File 'lib/bunny/session.rb', line 84

def server_locales
  @server_locales
end

#server_properties ⇒ Object (readonly)

Returns the value of attribute server_properties.



# File 'lib/bunny/session.rb', line 84

def server_properties
  @server_properties
end

#session_error_handler ⇒ Object (readonly)

Returns the session error handler. By default, this is a Bunny::ExceptionAccumulator instance.

Returns:

  • (Object)

    the session error handler



# File 'lib/bunny/session.rb', line 516

def session_error_handler
  @session_error_handler
end

#socket_configurator ⇒ Object

Returns the value of attribute socket_configurator.



# File 'lib/bunny/session.rb', line 97

def socket_configurator
  @socket_configurator
end

#status ⇒ Object (readonly)

Returns the value of attribute status.



# File 'lib/bunny/session.rb', line 83

def status
  @status
end

#threaded ⇒ Object (readonly)

Returns the value of attribute threaded.



# File 'lib/bunny/session.rb', line 83

def threaded
  @threaded
end

#topology_registry ⇒ Bunny::TopologyRegistry (readonly)



# File 'lib/bunny/session.rb', line 87

def topology_registry
  @topology_registry
end

#transport ⇒ Bunny::Transport (readonly)

Returns:

  • (Bunny::Transport)


# File 'lib/bunny/session.rb', line 82

def transport
  @transport
end

#user ⇒ Object (readonly)

Returns the value of attribute user.



# File 'lib/bunny/session.rb', line 83

def user
  @user
end

#vhost ⇒ Object (readonly)

Returns the value of attribute vhost.



# File 'lib/bunny/session.rb', line 83

def vhost
  @vhost
end

Class Method Details

.parse_uri(uri) ⇒ Hash

Parses an amqp[s] URI into a hash that #initialize accepts.

Parameters:

  • uri (String | Hash)

    amqp or amqps URI to parse

Returns:

  • (Hash)

    Parsed URI as a hash



# File 'lib/bunny/session.rb', line 565

def self.parse_uri(uri)
  AMQ::Settings.configure(uri)
end

Instance Method Details

#after_recovery_attempts_exhausted(&block) ⇒ Object

Defines a callable (e.g. a block) that will be called when connection recovery fails after the specified number of recovery attempts.



# File 'lib/bunny/session.rb', line 626

def after_recovery_attempts_exhausted(&block)
  @recovery_attempts_exhausted = block
end

#after_recovery_completed(&block) ⇒ Object

Defines a callable (e.g. a block) that will be called after successful connection recovery.



# File 'lib/bunny/session.rb', line 619

def after_recovery_completed(&block)
  @recovery_completed = block
end

#automatically_recover? ⇒ Boolean

Returns true if this connection has automatic recovery from network failure enabled.

Returns:

  • (Boolean)

    true if this connection has automatic recovery from network failure enabled



# File 'lib/bunny/session.rb', line 479

def automatically_recover?
  @automatically_recover
end

#before_recovery_attempt_starts(&block) ⇒ Object

Defines a callable (e.g. a block) that will be called before every connection recovery attempt.



# File 'lib/bunny/session.rb', line 613

def before_recovery_attempt_starts(&block)
  @recovery_attempt_started = block
end

#blocked? ⇒ Boolean

Returns true if the connection is currently blocked by RabbitMQ because it's running low on RAM, disk space, or other resource; false otherwise.

Returns:

  • (Boolean)

    true if the connection is currently blocked by RabbitMQ because it's running low on RAM, disk space, or other resource; false otherwise

# File 'lib/bunny/session.rb', line 507

def blocked?
  @blocked
end

#clean_up_and_fail_on_connection_close!(method) ⇒ Object



# File 'lib/bunny/session.rb', line 1207

def clean_up_and_fail_on_connection_close!(method)
  @last_connection_error = instantiate_connection_level_exception(method)
  @continuations.push(method)

  clean_up_on_shutdown
  if threaded?
    @session_error_handler.raise(@last_connection_error)
  else
    raise @last_connection_error
  end
end

#clean_up_on_shutdown ⇒ Object



# File 'lib/bunny/session.rb', line 1219

def clean_up_on_shutdown
  begin
    shut_down_all_consumer_work_pools!
    maybe_shutdown_reader_loop
    maybe_shutdown_heartbeat_sender
  rescue ShutdownSignal => _sse
    # no-op
  rescue Exception => e
    @logger.warn "Caught an exception when cleaning up after receiving connection.close: #{e.message}"
  ensure
    close_transport
  end
end

#clear_exceptions ⇒ Array<Exception>

Clears all accumulated exceptions from the session error handler. Only works when using the default ExceptionAccumulator handler.

Returns:

  • (Array<Exception>)

    the exceptions that were cleared

Raises:

  • (NoMethodError)

    if the session error handler doesn't respond to #clear



# File 'lib/bunny/session.rb', line 544

def clear_exceptions
  @session_error_handler.clear
end

#close(await_response = true) ⇒ Object Also known as: stop

Closes the connection. This involves closing all of its channels.



# File 'lib/bunny/session.rb', line 411

def close(await_response = true)
  @status_mutex.synchronize { @status = :closing }

  ignoring_io_errors do
    if @transport.open?
      @logger.debug "Transport is still open..."
      close_all_channels

      @logger.debug "Will close all channels...."
      self.close_connection(await_response)
    end

    clean_up_on_shutdown
  end
  @status_mutex.synchronize do
    @status = :closed
    @manually_closed = true
  end
  @logger.debug "Connection is closed"
  true
end

#closed? ⇒ Boolean

Returns true if this AMQP 0.9.1 connection is closed.

Returns:

  • (Boolean)

    true if this AMQP 0.9.1 connection is closed



# File 'lib/bunny/session.rb', line 461

def closed?
  @status_mutex.synchronize { @status == :closed }
end

#closing? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns true if this AMQP 0.9.1 connection is closing.

Returns:

  • (Boolean)

    true if this AMQP 0.9.1 connection is closing



# File 'lib/bunny/session.rb', line 456

def closing?
  @status_mutex.synchronize { @status == :closing }
end

#configure_socket(&block) ⇒ Object

Provides a way to fine tune the socket used by connection. Accepts a block that the socket will be yielded to.

Raises:

  • (ArgumentError)


# File 'lib/bunny/session.rb', line 307

def configure_socket(&block)
  raise ArgumentError, "No block provided!" if block.nil?

  @transport_mutex.synchronize do
    @transport.configure_socket(&block)
  end
end

#connecting? ⇒ Boolean

Returns true if this connection is still not fully open.

Returns:

  • (Boolean)

    true if this connection is still not fully open



# File 'lib/bunny/session.rb', line 450

def connecting?
  status == :connecting
end

#create_channel(n = nil, consumer_pool_size = 1, consumer_pool_abort_on_exception = false, consumer_pool_shutdown_timeout = 60) ⇒ Bunny::Channel Also known as: channel

Opens a new channel and returns it. This method will block the calling thread until the response is received and the channel is guaranteed to be opened (this operation is very fast and inexpensive).

Returns:

  • (Bunny::Channel)

Raises:

  • (ArgumentError)


# File 'lib/bunny/session.rb', line 390

def create_channel(n = nil, consumer_pool_size = 1, consumer_pool_abort_on_exception = false, consumer_pool_shutdown_timeout = 60)
  raise ArgumentError, "channel number 0 is reserved in the protocol and cannot be used" if 0 == n
  raise ConnectionAlreadyClosed if manually_closed?
  raise RuntimeError, "this connection is not open. Was Bunny::Session#start invoked? Is automatic recovery enabled?" if !connected?

  @channel_mutex.synchronize do
    if n && (ch = @channels[n])
      ch
    else
      work_pool = ConsumerWorkPool.new(consumer_pool_size || 1, consumer_pool_abort_on_exception, consumer_pool_shutdown_timeout)
      ch = Bunny::Channel.new(self, n, {
        work_pool: work_pool
      })
      ch.open
      ch
    end
  end
end

#exception_occurred? ⇒ Boolean

Returns true if any exceptions have been accumulated by the session error handler. Only works when using the default ExceptionAccumulator handler.

Returns:

  • (Boolean)

    true if exceptions have been accumulated

Raises:

  • (NoMethodError)

    if the session error handler doesn't respond to #any?



# File 'lib/bunny/session.rb', line 524

def exception_occurred?
  @session_error_handler.any?
end

#exceptions ⇒ Array<Exception>

Returns all accumulated exceptions from the session error handler. Only works when using the default ExceptionAccumulator handler.

Returns:

  • (Array<Exception>)

    all accumulated exceptions

Raises:

  • (NoMethodError)

    if the session error handler doesn't respond to #all



# File 'lib/bunny/session.rb', line 534

def exceptions
  @session_error_handler.all
end

#exchange_exists?(name) ⇒ Boolean

Checks if an exchange with the given name exists.

Implemented using exchange.declare with passive set to true and a one-off (short lived) channel under the hood.

Parameters:

  • name (String)

    Exchange name

Returns:

  • (Boolean)

    true if exchange exists



# File 'lib/bunny/session.rb', line 599

def exchange_exists?(name)
  ch = create_channel
  begin
    ch.exchange(name, :passive => true)
    true
  rescue Bunny::NotFound => _
    false
  ensure
    ch.close if ch.open?
  end
end

#heartbeat_disabled?(val) ⇒ Boolean (protected)

Returns:

  • (Boolean)


# File 'lib/bunny/session.rb', line 1634

def heartbeat_disabled?(val)
  0 == val || val.nil?
end

#heartbeat_interval ⇒ Integer

Deprecated.

Returns Heartbeat timeout (not interval) used.

Returns:

  • (Integer)

    Heartbeat timeout (not interval) used



# File 'lib/bunny/session.rb', line 268

def heartbeat_interval; self.heartbeat; end

#heartbeat_timeout ⇒ Integer

Returns Heartbeat timeout used.

Returns:

  • (Integer)

    Heartbeat timeout used



# File 'lib/bunny/session.rb', line 271

def heartbeat_timeout; self.heartbeat; end

#host ⇒ Object



# File 'lib/bunny/session.rb', line 290

def host
  @transport ? @transport.host : host_from_address(@addresses[@address_index])
end

#hostname ⇒ String

Returns RabbitMQ hostname (or IP address) used.

Returns:

  • (String)

    RabbitMQ hostname (or IP address) used



# File 'lib/bunny/session.rb', line 258

def hostname;     self.host;  end

#ignoring_io_errors(&block) ⇒ Object (protected)



# File 'lib/bunny/session.rb', line 1786

def ignoring_io_errors(&block)
  begin
    block.call
  rescue AMQ::Protocol::EmptyResponseError, IOError, SystemCallError, Bunny::NetworkFailure => _
    # ignore
  end
end

#inspect ⇒ Object



# File 'lib/bunny/session.rb', line 1505

def inspect
  to_s
end

#local_port ⇒ Integer

Returns Client socket port.

Returns:

  • (Integer)

    Client socket port



# File 'lib/bunny/session.rb', line 316

def local_port
  @transport.local_address.ip_port
end

#manually_closed? ⇒ Boolean

Returns true if this AMQP 0.9.1 connection has been closed by the user (as opposed to the server).

Returns:

  • (Boolean)

    true if this AMQP 0.9.1 connection has been closed by the user (as opposed to the server)



# File 'lib/bunny/session.rb', line 466

def manually_closed?
  @status_mutex.synchronize { @manually_closed == true }
end

#normalize_auth_mechanism(value) ⇒ Object (protected)



# File 'lib/bunny/session.rb', line 1775

def normalize_auth_mechanism(value)
  case value
  when [] then
    "PLAIN"
  when nil then
    "PLAIN"
  else
    value
  end
end

#normalize_client_channel_max(n) ⇒ Object (protected)



# File 'lib/bunny/session.rb', line 1763

def normalize_client_channel_max(n)
  return CHANNEL_MAX_LIMIT if n.nil?
  return CHANNEL_MAX_LIMIT if n > CHANNEL_MAX_LIMIT

  case n
  when 0 then
    CHANNEL_MAX_LIMIT
  else
    n
  end
end
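The normalization above clamps the requested channel limit: nil, 0, and anything above the limit all map to the limit itself. A standalone sketch of the same logic, assuming a limit of 65535 (the value of `CHANNEL_MAX_LIMIT` is not shown on this page, so `SKETCH_CHANNEL_MAX_LIMIT` is an assumption):

```ruby
# Assumed value; the real CHANNEL_MAX_LIMIT constant is defined elsewhere in Bunny.
SKETCH_CHANNEL_MAX_LIMIT = 65_535

# Hypothetical standalone version of normalize_client_channel_max.
def sketch_normalize_channel_max(n)
  return SKETCH_CHANNEL_MAX_LIMIT if n.nil?
  return SKETCH_CHANNEL_MAX_LIMIT if n > SKETCH_CHANNEL_MAX_LIMIT

  # In AMQP 0.9.1 tuning, 0 means "no specific limit".
  n.zero? ? SKETCH_CHANNEL_MAX_LIMIT : n
end

requested = [nil, 0, 2047, 70_000]
normalized = requested.map { |n| sketch_normalize_channel_max(n) }
```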

#on_blocked {|AMQ::Protocol::Connection::Blocked| ... } ⇒ Object

Defines a callback that will be executed when RabbitMQ blocks the connection because it is running low on memory or disk space (as configured via config file and/or rabbitmqctl).

Yields:

  • (AMQ::Protocol::Connection::Blocked)

    connection.blocked method which provides a reason for blocking



# File 'lib/bunny/session.rb', line 490

def on_blocked(&block)
  @block_callback = block
end

#on_unblocked(&block) ⇒ Object

Defines a callback that will be executed when RabbitMQ unblocks the connection that was previously blocked, e.g. because the memory or disk space alarm has cleared.

# File 'lib/bunny/session.rb', line 499

def on_unblocked(&block)
  @unblock_callback = block
end

#open? ⇒ Boolean Also known as: connected?

Returns true if this AMQP 0.9.1 connection is open.

Returns:

  • (Boolean)

    true if this AMQP 0.9.1 connection is open



# File 'lib/bunny/session.rb', line 471

def open?
  @status_mutex.synchronize do
    (status == :open || status == :connected || status == :connecting) && @transport.open?
  end
end

#password ⇒ String

Returns Password used.

Returns:

  • (String)

    Password used



# File 'lib/bunny/session.rb', line 262

def password;     self.pass;  end

#port ⇒ Object



# File 'lib/bunny/session.rb', line 294

def port
  @transport ? @transport.port : port_from_address(@addresses[@address_index])
end

#queue_exists?(name) ⇒ Boolean

Checks if a queue with the given name exists.

Implemented using queue.declare with passive set to true and a one-off (short lived) channel under the hood.

Parameters:

  • name (String)

    Queue name

Returns:

  • (Boolean)

    true if queue exists



# File 'lib/bunny/session.rb', line 577

def queue_exists?(name)
  ch = create_channel
  begin
    ch.queue(name, :passive => true)
    true
  rescue Bunny::ResourceLocked => _
    true
  rescue Bunny::NotFound => _
    false
  ensure
    ch.close if ch.open?
  end
end
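The existence check maps the outcome of a passive declare to a boolean: success and "resource locked" both mean the queue exists, while "not found" means it does not. A minimal sketch of that mapping, using hypothetical stand-in error classes in place of `Bunny::NotFound` and `Bunny::ResourceLocked` so it runs without a broker:

```ruby
# Hypothetical stand-ins for Bunny::NotFound and Bunny::ResourceLocked.
class SketchNotFound < StandardError; end
class SketchResourceLocked < StandardError; end

# Runs the block (standing in for a passive declare) and maps the result.
def sketch_exists?
  yield
  true
rescue SketchResourceLocked
  # A locked (e.g. exclusive) queue still exists.
  true
rescue SketchNotFound
  false
end

found   = sketch_exists? { :declare_ok }
locked  = sketch_exists? { raise SketchResourceLocked }
missing = sketch_exists? { raise SketchNotFound }
```

Any other exception propagates to the caller, as it does in the method above.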

#raise_on_exception! ⇒ Object

Raises the first accumulated exception if any exist. Only works when using the default ExceptionAccumulator handler.

This is useful for checking errors at safe points in your application, such as after completing a batch of operations.

Raises:

  • (Exception)

    the first accumulated exception

  • (NoMethodError)

    if the session error handler doesn't respond to #raise_first!



# File 'lib/bunny/session.rb', line 557

def raise_on_exception!
  @session_error_handler.raise_first!
end
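The exception helpers on this page (#exception_occurred?, #exceptions, #clear_exceptions, #raise_on_exception!) delegate to the session error handler's #any?, #all, #clear and #raise_first!. The sketch below shows one way an object with that interface could look. `SketchAccumulator` is hypothetical and is not the source of `Bunny::ExceptionAccumulator`, whose behavior may differ.

```ruby
# Hypothetical handler exposing the interface the session delegates to.
class SketchAccumulator
  def initialize
    @mutex = Mutex.new
    @exceptions = []
  end

  # Called by the session instead of raising in another thread.
  def raise(exception)
    @mutex.synchronize { @exceptions << exception }
  end

  def any?
    @mutex.synchronize { !@exceptions.empty? }
  end

  def all
    @mutex.synchronize { @exceptions.dup }
  end

  # Returns the exceptions that were cleared.
  def clear
    @mutex.synchronize do
      cleared = @exceptions.dup
      @exceptions.clear
      cleared
    end
  end

  def raise_first!
    first = @mutex.synchronize { @exceptions.first }
    # Kernel.raise is used explicitly because #raise is redefined above.
    Kernel.raise(first) if first
  end
end

handler = SketchAccumulator.new
handler.raise(RuntimeError.new("connection-level error"))
```

Since any object that responds to #raise is accepted as a :session_error_handler, an object like this could in principle be passed to the constructor; that usage is an assumption and is not demonstrated here.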

#record_exchange_with(ch, name, type, durable, auto_delete, arguments) ⇒ Object

Parameters:

  • ch (Bunny::Channel)
  • name (String)
  • type (String)
  • durable (Boolean)
  • auto_delete (Boolean)
  • arguments (Hash)


# File 'lib/bunny/session.rb', line 880

def record_exchange_with(ch, name, type, durable, auto_delete, arguments)
  @topology_registry.record_exchange_with(ch, name, type, durable, auto_delete, arguments)
end

#record_queue_with(ch, name, server_named, durable, auto_delete, exclusive, arguments) ⇒ Object

Parameters:

  • ch (Bunny::Channel)
  • name (String)
  • server_named (Boolean)
  • durable (Boolean)
  • auto_delete (Boolean)
  • exclusive (Boolean)
  • arguments (Hash)


# File 'lib/bunny/session.rb', line 852

def record_queue_with(ch, name, server_named, durable, auto_delete, exclusive, arguments)
  @topology_registry.record_queue_with(ch, name, server_named, durable, auto_delete, exclusive, arguments)
end

#reset_address_index ⇒ Object



# File 'lib/bunny/session.rb', line 298

def reset_address_index
  @address_index_mutex.synchronize { @address_index = 0 }
end

#start ⇒ Object

Starts the connection process.



# File 'lib/bunny/session.rb', line 325

def start
  return self if connected?

  @status_mutex.synchronize { @status = :connecting }
  # reset here for cases when automatic network recovery kicks in
  # when we were blocked. MK.
  @blocked       = false
  self.reset_continuations

  begin
    begin
      # close existing transport if we have one,
      # to not leak sockets
      @transport_mutex.synchronize do
        @transport.maybe_initialize_socket
        @transport.post_initialize_socket
        @transport.connect
      end

      self.init_connection
      self.open_connection

      @reader_loop = nil
      self.start_reader_loop if threaded?

    rescue TCPConnectionFailed => e
      @logger.warn e.message
      self.initialize_transport
      @logger.warn "Will try to connect to the next endpoint in line: #{@transport.host}:#{@transport.port}"

      return self.start
    rescue
      @status_mutex.synchronize { @status = :not_connected }
      raise
    end
  rescue HostListDepleted
    self.reset_address_index
    @status_mutex.synchronize { @status = :not_connected }
    raise TCPConnectionFailedForAllHosts
  end
  @status_mutex.synchronize { @manually_closed = false }

  self
end
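When a TCP connection attempt fails, #start moves on to the next configured endpoint and raises TCPConnectionFailedForAllHosts once the list is exhausted. The sketch below approximates that failover with a plain loop; the method above uses recursion and transport re-initialization instead. The error classes and `sketch_connect` are hypothetical stand-ins.

```ruby
# Hypothetical stand-ins for Bunny's TCP failure exceptions.
class SketchTCPConnectionFailed < StandardError; end
class SketchTCPConnectionFailedForAllHosts < StandardError; end

# Tries each address in order; the block stands in for the connect step.
def sketch_connect(addresses)
  addresses.each do |addr|
    begin
      return yield(addr)
    rescue SketchTCPConnectionFailed
      # Fall through to the next endpoint in line.
      next
    end
  end
  raise SketchTCPConnectionFailedForAllHosts
end

picked = sketch_connect(["down.example:5672", "up.example:5672"]) do |addr|
  raise SketchTCPConnectionFailed if addr.start_with?("down")
  addr
end
```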

#threaded? ⇒ Boolean

Returns true if this connection uses a separate thread for I/O activity.

Returns:

  • (Boolean)

    true if this connection uses a separate thread for I/O activity



# File 'lib/bunny/session.rb', line 286

def threaded?
  @threaded
end

#to_s ⇒ String

Returns:

  • (String)


# File 'lib/bunny/session.rb', line 1500

def to_s
  oid = ("0x%x" % (self.object_id << 1))
  "#<#{self.class.name}:#{oid} #{@user}@#{host}:#{port}, vhost=#{@vhost}, addresses=[#{@addresses.join(',')}]>"
end

#update_secret(value, reason) ⇒ Object



# File 'lib/bunny/session.rb', line 370

def update_secret(value, reason)
  @transport.send_frame(AMQ::Protocol::Connection::UpdateSecret.encode(value, reason))
  @last_update_secret_ok = wait_on_continuations
  raise_if_continuation_resulted_in_a_connection_error!

  @last_update_secret_ok
end

#username ⇒ String

Returns Username used.

Returns:

  • (String)

    Username used



# File 'lib/bunny/session.rb', line 260

def username;     self.user;  end

#uses_ssl? ⇒ Boolean Also known as: ssl?

Returns true if this connection uses TLS (SSL).

Returns:

  • (Boolean)

    true if this connection uses TLS (SSL)



# File 'lib/bunny/session.rb', line 280

def uses_ssl?
  @transport.uses_ssl?
end

#uses_tls? ⇒ Boolean Also known as: tls?

Returns true if this connection uses TLS (SSL).

Returns:

  • (Boolean)

    true if this connection uses TLS (SSL)



# File 'lib/bunny/session.rb', line 274

def uses_tls?
  @transport.uses_tls?
end

#validate_connection_options(options) ⇒ Object



# File 'lib/bunny/session.rb', line 247

def validate_connection_options(options)
  if options[:hosts] && options[:addresses]
    raise ArgumentError, "Connection options can't contain hosts and addresses at the same time"
  end

  if (options[:host] || options[:hostname]) && (options[:hosts] || options[:addresses])
    @logger.warn "Connection options contain both a host and an array of hosts (addresses), please pick one."
  end
end

#virtual_host ⇒ String

Returns Virtual host used.

Returns:

  • (String)

    Virtual host used



# File 'lib/bunny/session.rb', line 264

def virtual_host; self.vhost; end

#with_channel(n = nil) ⇒ Bunny::Session

Creates a temporary channel, yields it to the block given to this method and closes it.

Returns:

  • (Bunny::Session)



# File 'lib/bunny/session.rb', line 438

def with_channel(n = nil)
  ch = create_channel(n)
  begin
    yield ch
  ensure
    ch.close if ch.open?
  end

  self
end
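The ensure clause above means the temporary channel is closed even when the block raises, and the method returns the session rather than the block's value. The sketch below reproduces that pattern with hypothetical `FakeChannel` and `FakeSession` classes so it can run without a broker:

```ruby
# Hypothetical stand-in for Bunny::Channel.
class FakeChannel
  def initialize
    @open = true
  end

  def open?
    @open
  end

  def close
    @open = false
  end
end

# Hypothetical stand-in for Bunny::Session with the same with_channel body.
class FakeSession
  attr_reader :last_channel

  def create_channel(_n = nil)
    @last_channel = FakeChannel.new
  end

  def with_channel(n = nil)
    ch = create_channel(n)
    begin
      yield ch
    ensure
      # Runs on both normal exit and exceptions.
      ch.close if ch.open?
    end

    self
  end
end

session = FakeSession.new
result = session.with_channel { |ch| ch.open? }
```

Because the return value is the session, anything computed inside the block has to be captured in a variable from the enclosing scope.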