Class: Bunny::Session
- Inherits: Object
  - Object
  - Bunny::Session
- Defined in: lib/bunny/session.rb
Overview
Represents an AMQP 0.9.1 connection (to a RabbitMQ node).
Constant Summary
- DEFAULT_HOST =
Default host used for connection
"127.0.0.1"
- DEFAULT_VHOST =
Default virtual host used for connection
"/"
- DEFAULT_USER =
Default username used for connection
"guest"
- DEFAULT_PASSWORD =
Default password used for connection
"guest"
- DEFAULT_HEARTBEAT =
Default heartbeat interval, the same value as RabbitMQ 3.0 uses.
:server
- DEFAULT_CHANNEL_MAX =
2047
- DEFAULT_CLIENT_PROPERTIES =
RabbitMQ client metadata
{
  :capabilities => {
    :publisher_confirms => true,
    :consumer_cancel_notify => true,
    :exchange_exchange_bindings => true,
    :"basic.nack" => true,
    :"connection.blocked" => true,
    # See http://www.rabbitmq.com/auth-notification.html
    :authentication_failure_close => true
  },
  :product => "Bunny",
  :platform => ::RUBY_DESCRIPTION,
  :version => Bunny::VERSION,
  :information => "https://github.com/ruby-amqp/bunny",
}
- DEFAULT_NETWORK_RECOVERY_INTERVAL =
Default reconnection interval for TCP connection failures
5.0
- DEFAULT_RECOVERABLE_EXCEPTIONS =
[StandardError, TCPConnectionFailedForAllHosts, TCPConnectionFailed, AMQ::Protocol::EmptyResponseError, SystemCallError, Timeout::Error, Bunny::ConnectionLevelException, Bunny::ConnectionClosedError]
Instance Attribute Summary
-
#channel_id_allocator ⇒ Object
readonly
Returns the value of attribute channel_id_allocator.
-
#channel_max ⇒ Object
readonly
Returns the value of attribute channel_max.
-
#connection_name ⇒ Object
readonly
Returns the value of attribute connection_name.
-
#continuation_timeout ⇒ Integer
readonly
Timeout for blocking protocol operations (queue.declare, queue.bind, etc.), in milliseconds.
-
#frame_max ⇒ Object
readonly
Returns the value of attribute frame_max.
-
#heartbeat ⇒ Object
readonly
Returns the value of attribute heartbeat.
- #logger ⇒ Logger readonly
-
#mechanism ⇒ String
readonly
Authentication mechanism, e.g. "PLAIN" or "EXTERNAL".
-
#network_recovery_interval ⇒ Object
readonly
Returns the value of attribute network_recovery_interval.
-
#pass ⇒ Object
readonly
Returns the value of attribute pass.
-
#recoverable_exceptions ⇒ Object
Returns the value of attribute recoverable_exceptions.
-
#server_authentication_mechanisms ⇒ Object
readonly
Returns the value of attribute server_authentication_mechanisms.
-
#server_capabilities ⇒ Object
readonly
Returns the value of attribute server_capabilities.
-
#server_locales ⇒ Object
readonly
Returns the value of attribute server_locales.
-
#server_properties ⇒ Object
readonly
Returns the value of attribute server_properties.
-
#socket_configurator ⇒ Object
Returns the value of attribute socket_configurator.
-
#status ⇒ Object
readonly
Returns the value of attribute status.
-
#threaded ⇒ Object
readonly
Returns the value of attribute threaded.
- #topology_registry ⇒ Bunny::TopologyRegistry readonly
- #transport ⇒ Bunny::Transport readonly
-
#user ⇒ Object
readonly
Returns the value of attribute user.
-
#vhost ⇒ Object
readonly
Returns the value of attribute vhost.
Class Method Summary
-
.parse_uri(uri) ⇒ Hash
Parses an amqp[s] URI into a hash that #initialize accepts.
Instance Method Summary
-
#after_recovery_attempts_exhausted(&block) ⇒ Object
Defines a callable (e.g. a block) that will be called when connection recovery fails after the specified number of recovery attempts.
-
#after_recovery_completed(&block) ⇒ Object
Defines a callable (e.g. a block) that will be called after successful connection recovery.
-
#automatically_recover? ⇒ Boolean
True if this connection has automatic recovery from network failure enabled.
-
#before_recovery_attempt_starts(&block) ⇒ Object
Defines a callable (e.g. a block) that will be called before every connection recovery attempt.
-
#blocked? ⇒ Boolean
True if the connection is currently blocked by RabbitMQ because it's running low on RAM, disk space, or another resource; false otherwise.
- #clean_up_and_fail_on_connection_close!(method) ⇒ Object
- #clean_up_on_shutdown ⇒ Object
-
#close(await_response = true) ⇒ Object
(also: #stop)
Closes the connection.
-
#closed? ⇒ Boolean
True if this AMQP 0.9.1 connection is closed.
-
#closing? ⇒ Boolean
private
True if this AMQP 0.9.1 connection is closing.
-
#configure_socket(&block) ⇒ Object
Provides a way to fine-tune the socket used by the connection.
-
#connecting? ⇒ Boolean
True if this connection is still not fully open.
-
#create_channel(n = nil, consumer_pool_size = 1, consumer_pool_abort_on_exception = false, consumer_pool_shutdown_timeout = 60) ⇒ Bunny::Channel
(also: #channel)
Opens a new channel and returns it.
-
#exchange_exists?(name) ⇒ Boolean
Checks if an exchange with the given name exists.
- #heartbeat_disabled?(val) ⇒ Boolean protected
- #heartbeat_interval ⇒ Integer deprecated
-
#heartbeat_timeout ⇒ Integer
Heartbeat timeout used.
- #host ⇒ Object
-
#hostname ⇒ String
RabbitMQ hostname (or IP address) used.
- #ignoring_io_errors(&block) ⇒ Object protected
-
#initialize(connection_string_or_opts = ENV['RABBITMQ_URL'], optz = Hash.new) ⇒ Session
constructor
A new instance of Session.
- #inspect ⇒ Object
-
#local_port ⇒ Integer
Client socket port.
-
#manually_closed? ⇒ Boolean
True if this AMQP 0.9.1 connection has been closed by the user (as opposed to the server).
- #normalize_auth_mechanism(value) ⇒ Object protected
- #normalize_client_channel_max(n) ⇒ Object protected
-
#on_blocked {|AMQ::Protocol::Connection::Blocked| ... } ⇒ Object
Defines a callback that will be executed when RabbitMQ blocks the connection because it is running low on memory or disk space (as configured via config file and/or rabbitmqctl).
-
#on_unblocked(&block) ⇒ Object
Defines a callback that will be executed when RabbitMQ unblocks the connection that was previously blocked, e.g. because the memory or disk space alarm has cleared.
-
#open? ⇒ Boolean
(also: #connected?)
True if this AMQP 0.9.1 connection is open.
-
#password ⇒ String
Password used.
- #port ⇒ Object
-
#queue_exists?(name) ⇒ Boolean
Checks if a queue with the given name exists.
- #record_exchange_with(ch, name, type, durable, auto_delete, arguments) ⇒ Object
- #record_queue_with(ch, name, server_named, durable, auto_delete, exclusive, arguments) ⇒ Object
- #reset_address_index ⇒ Object
-
#start ⇒ Object
Starts the connection process.
-
#threaded? ⇒ Boolean
True if this connection uses a separate thread for I/O activity.
- #to_s ⇒ String
- #update_secret(value, reason) ⇒ Object
-
#username ⇒ String
Username used.
-
#uses_ssl? ⇒ Boolean
(also: #ssl?)
True if this connection uses TLS (SSL).
-
#uses_tls? ⇒ Boolean
(also: #tls?)
True if this connection uses TLS (SSL).
- #validate_connection_options(options) ⇒ Object
-
#virtual_host ⇒ String
Virtual host used.
-
#with_channel(n = nil) ⇒ Bunny::Session
Creates a temporary channel, yields it to the block given to this method and closes it.
Constructor Details
#initialize(connection_string_or_opts = ENV['RABBITMQ_URL'], optz = Hash.new) ⇒ Session
Returns a new instance of Session.
# File 'lib/bunny/session.rb', line 145
def initialize(connection_string_or_opts = ENV['RABBITMQ_URL'], optz = Hash.new)
  opts = case (connection_string_or_opts)
         when nil then
           Hash.new
         when String then
           self.class.parse_uri(connection_string_or_opts)
         when Hash then
           connection_string_or_opts
         end.merge(optz)

  @default_hosts_shuffle_strategy = Proc.new { |hosts| hosts.shuffle }

  @opts = opts
  log_file = opts[:log_file] || opts[:logfile] || STDOUT
  log_level = opts[:log_level] || ENV["BUNNY_LOG_LEVEL"] || Logger::WARN
  # we might need to log a warning about ill-formatted IPv6 address but
  # progname includes hostname, so init like this first
  @logger = opts.fetch(:logger, init_default_logger_without_progname(log_file, log_level))

  @addresses = self.addresses_from(opts)
  @address_index = 0

  @transport = nil
  @user = self.username_from(opts)
  @pass = self.password_from(opts)
  @vhost = self.vhost_from(opts)
  @threaded = opts.fetch(:threaded, true)

  # re-init, see above
  @logger = opts.fetch(:logger, init_default_logger(log_file, log_level))

  validate_connection_options(opts)
  @last_connection_error = nil

  # should automatic recovery from network failures be used?
  @automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil?
                             true
                           else
                             opts[:automatically_recover] | opts[:automatic_recovery]
                           end
  @recovering_from_network_failure = false
  @max_recovery_attempts = opts[:recovery_attempts]
  @recovery_attempts = @max_recovery_attempts
  # When this is set, connection attempts won't be reset after
  # successful reconnection. Some find this behavior more sensible
  # than the per-failure attempt counter. MK.
  @reset_recovery_attempt_counter_after_reconnection = opts.fetch(:reset_recovery_attempts_after_reconnection, true)

  @network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL)
  @recover_from_connection_close = opts.fetch(:recover_from_connection_close, true)
  # in ms
  @continuation_timeout = opts.fetch(:continuation_timeout, DEFAULT_CONTINUATION_TIMEOUT)

  @status = :not_connected
  @manually_closed = false
  @blocked = false

  # these are negotiated with the broker during the connection tuning phase
  @client_frame_max = opts.fetch(:frame_max, DEFAULT_FRAME_MAX)
  @client_channel_max = normalize_client_channel_max(opts.fetch(:channel_max, DEFAULT_CHANNEL_MAX))
  # will be-renegotiated during connection tuning steps. MK.
  @channel_max = @client_channel_max
  @heartbeat_sender = nil
  @client_heartbeat = self.heartbeat_from(opts)

  client_props = opts[:properties] || opts[:client_properties] || {}
  @connection_name = client_props[:connection_name] || opts[:connection_name]
  @client_properties = DEFAULT_CLIENT_PROPERTIES.merge(client_props)
                                                .merge(connection_name: connection_name)
  @mechanism = normalize_auth_mechanism(opts.fetch(:auth_mechanism, "PLAIN"))
  @credentials_encoder = credentials_encoder_for(@mechanism)
  @locale = @opts.fetch(:locale, DEFAULT_LOCALE)

  @mutex_impl = @opts.fetch(:mutex_impl, Monitor)

  # mutex for the channel id => channel hash
  @channel_mutex = @mutex_impl.new
  # transport operations/continuations mutex. A workaround for
  # the non-reentrant Ruby mutexes. MK.
  @transport_mutex = @mutex_impl.new
  @status_mutex = @mutex_impl.new
  @address_index_mutex = @mutex_impl.new

  @channels = Hash.new

  trf = @opts.fetch(:topology_recovery_filter, DefaultTopologyRecoveryFilter.new)
  @topology_registry = TopologyRegistry.new(topology_recovery_filter: trf)

  @recovery_attempt_started = opts[:recovery_attempt_started]
  @recovery_completed = opts[:recovery_completed]
  @recovery_attempts_exhausted = opts[:recovery_attempts_exhausted]

  @session_error_handler = opts.fetch(:session_error_handler, Thread.current)

  @recoverable_exceptions = DEFAULT_RECOVERABLE_EXCEPTIONS.dup

  self.reset_continuations
  self.initialize_transport
end
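The argument handling at the top of the constructor can be tried in isolation. The following is a minimal sketch, not Bunny's implementation: `parse_uri_sketch` and `normalize_options_sketch` are hypothetical names, and the URI parsing uses Ruby's standard `URI` library as a stand-in for `AMQ::Settings.configure`, which the real `parse_uri` delegates to.

```ruby
require "uri"

# Hypothetical stand-in for Bunny::Session.parse_uri. The real method
# delegates to AMQ::Settings.configure; only a few fields are extracted here.
def parse_uri_sketch(uri)
  parsed = URI.parse(uri)
  { host: parsed.host, port: parsed.port, user: parsed.user, pass: parsed.password }.compact
end

# Mirrors the first steps of #initialize: accept nil, a URI string or a hash,
# merge the second argument on top, then resolve the automatic recovery flag.
def normalize_options_sketch(connection_string_or_opts = nil, optz = {})
  opts = case connection_string_or_opts
         when nil then {}
         when String then parse_uri_sketch(connection_string_or_opts)
         when Hash then connection_string_or_opts
         end.merge(optz)

  # Recovery defaults to true only when neither spelling of the option is given.
  recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil?
              true
            else
              opts[:automatically_recover] | opts[:automatic_recovery]
            end

  opts.merge(automatically_recover: recover)
end

from_uri = normalize_options_sketch("amqp://guest:secret@rabbit.example.com:5673", automatic_recovery: false)
from_hash = normalize_options_sketch({ host: "a.example.com" }, host: "b.example.com")
```

Because `optz` is merged last, values in the second argument win over those parsed from the URI or given in the first hash.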
Instance Attribute Details
#channel_id_allocator ⇒ Object (readonly)
Returns the value of attribute channel_id_allocator.
# File 'lib/bunny/session.rb', line 84
def channel_id_allocator
  @channel_id_allocator
end
#channel_max ⇒ Object (readonly)
Returns the value of attribute channel_max.
# File 'lib/bunny/session.rb', line 82
def channel_max
  @channel_max
end
#connection_name ⇒ Object (readonly)
Returns the value of attribute connection_name.
# File 'lib/bunny/session.rb', line 95
def connection_name
  @connection_name
end
#continuation_timeout ⇒ Integer (readonly)
Returns the timeout for blocking protocol operations (queue.declare, queue.bind, etc.), in milliseconds. Default is 15000.
# File 'lib/bunny/session.rb', line 93
def continuation_timeout
  @continuation_timeout
end
#frame_max ⇒ Object (readonly)
Returns the value of attribute frame_max.
# File 'lib/bunny/session.rb', line 82
def frame_max
  @frame_max
end
#heartbeat ⇒ Object (readonly)
Returns the value of attribute heartbeat.
# File 'lib/bunny/session.rb', line 82
def heartbeat
  @heartbeat
end
#logger ⇒ Logger (readonly)
# File 'lib/bunny/session.rb', line 91
def logger
  @logger
end
#mechanism ⇒ String (readonly)
Authentication mechanism, e.g. "PLAIN" or "EXTERNAL"
# File 'lib/bunny/session.rb', line 89
def mechanism
  @mechanism
end
#network_recovery_interval ⇒ Object (readonly)
Returns the value of attribute network_recovery_interval.
# File 'lib/bunny/session.rb', line 94
def network_recovery_interval
  @network_recovery_interval
end
#pass ⇒ Object (readonly)
Returns the value of attribute pass.
# File 'lib/bunny/session.rb', line 82
def pass
  @pass
end
#recoverable_exceptions ⇒ Object
Returns the value of attribute recoverable_exceptions.
# File 'lib/bunny/session.rb', line 97
def recoverable_exceptions
  @recoverable_exceptions
end
#server_authentication_mechanisms ⇒ Object (readonly)
Returns the value of attribute server_authentication_mechanisms.
# File 'lib/bunny/session.rb', line 83
def server_authentication_mechanisms
  @server_authentication_mechanisms
end
#server_capabilities ⇒ Object (readonly)
Returns the value of attribute server_capabilities.
# File 'lib/bunny/session.rb', line 83
def server_capabilities
  @server_capabilities
end
#server_locales ⇒ Object (readonly)
Returns the value of attribute server_locales.
# File 'lib/bunny/session.rb', line 83
def server_locales
  @server_locales
end
#server_properties ⇒ Object (readonly)
Returns the value of attribute server_properties.
# File 'lib/bunny/session.rb', line 83
def server_properties
  @server_properties
end
#socket_configurator ⇒ Object
Returns the value of attribute socket_configurator.
# File 'lib/bunny/session.rb', line 96
def socket_configurator
  @socket_configurator
end
#status ⇒ Object (readonly)
Returns the value of attribute status.
# File 'lib/bunny/session.rb', line 82
def status
  @status
end
#threaded ⇒ Object (readonly)
Returns the value of attribute threaded.
# File 'lib/bunny/session.rb', line 82
def threaded
  @threaded
end
#topology_registry ⇒ Bunny::TopologyRegistry (readonly)
# File 'lib/bunny/session.rb', line 86
def topology_registry
  @topology_registry
end
#transport ⇒ Bunny::Transport (readonly)
# File 'lib/bunny/session.rb', line 81
def transport
  @transport
end
#user ⇒ Object (readonly)
Returns the value of attribute user.
# File 'lib/bunny/session.rb', line 82
def user
  @user
end
#vhost ⇒ Object (readonly)
Returns the value of attribute vhost.
# File 'lib/bunny/session.rb', line 82
def vhost
  @vhost
end
Class Method Details
.parse_uri(uri) ⇒ Hash
Parses an amqp[s] URI into a hash that #initialize accepts.
# File 'lib/bunny/session.rb', line 514
def self.parse_uri(uri)
  AMQ::Settings.configure(uri)
end
Instance Method Details
#after_recovery_attempts_exhausted(&block) ⇒ Object
Defines a callable (e.g. a block) that will be called when connection recovery fails after the specified number of recovery attempts.
# File 'lib/bunny/session.rb', line 575
def after_recovery_attempts_exhausted(&block)
  @recovery_attempts_exhausted = block
end
#after_recovery_completed(&block) ⇒ Object
Defines a callable (e.g. a block) that will be called after successful connection recovery.
# File 'lib/bunny/session.rb', line 568
def after_recovery_completed(&block)
  @recovery_completed = block
end
#automatically_recover? ⇒ Boolean
Returns true if this connection has automatic recovery from network failure enabled.
# File 'lib/bunny/session.rb', line 478
def automatically_recover?
  @automatically_recover
end
#before_recovery_attempt_starts(&block) ⇒ Object
Defines a callable (e.g. a block) that will be called before every connection recovery attempt.
# File 'lib/bunny/session.rb', line 562
def before_recovery_attempt_starts(&block)
  @recovery_attempt_started = block
end
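The three recovery hooks (`before_recovery_attempt_starts`, `after_recovery_completed`, `after_recovery_attempts_exhausted`) each store a block in an instance variable. How they might fit together is sketched below; `RecoverySketch` and its `recover` driver are hypothetical and only illustrate the order of calls described above, not Bunny's actual recovery loop.

```ruby
# Hypothetical illustration of when each stored hook would fire.
class RecoverySketch
  def initialize(max_attempts)
    @max_attempts = max_attempts
    @recovery_attempt_started = nil
    @recovery_completed = nil
    @recovery_attempts_exhausted = nil
  end

  def before_recovery_attempt_starts(&block)
    @recovery_attempt_started = block
  end

  def after_recovery_completed(&block)
    @recovery_completed = block
  end

  def after_recovery_attempts_exhausted(&block)
    @recovery_attempts_exhausted = block
  end

  # Assumed driver: `connect` is any callable that raises on failure.
  def recover(connect)
    @max_attempts.times do
      @recovery_attempt_started&.call
      begin
        connect.call
      rescue StandardError
        next # try again until the attempts run out
      end
      @recovery_completed&.call
      return true
    end
    @recovery_attempts_exhausted&.call
    false
  end
end

log = []
attempts = 0
recovery = RecoverySketch.new(3)
recovery.before_recovery_attempt_starts { log << :attempt }
recovery.after_recovery_completed { log << :completed }
recovery.after_recovery_attempts_exhausted { log << :exhausted }
# The first two attempts fail, the third succeeds.
recovered = recovery.recover(-> { attempts += 1; raise IOError, "down" if attempts < 3 })
```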
#blocked? ⇒ Boolean
Returns true if the connection is currently blocked by RabbitMQ because it's running low on RAM, disk space, or another resource; false otherwise.
# File 'lib/bunny/session.rb', line 506
def blocked?
  @blocked
end
#clean_up_and_fail_on_connection_close!(method) ⇒ Object
# File 'lib/bunny/session.rb', line 1156
def clean_up_and_fail_on_connection_close!(method)
  @last_connection_error = instantiate_connection_level_exception(method)
  @continuations.push(method)

  clean_up_on_shutdown
  if threaded?
    @session_error_handler.raise(@last_connection_error)
  else
    raise @last_connection_error
  end
end
#clean_up_on_shutdown ⇒ Object
# File 'lib/bunny/session.rb', line 1168
def clean_up_on_shutdown
  begin
    shut_down_all_consumer_work_pools!
    maybe_shutdown_reader_loop
    maybe_shutdown_heartbeat_sender
  rescue ShutdownSignal => _sse
    # no-op
  rescue Exception => e
    @logger.warn "Caught an exception when cleaning up after receiving connection.close: #{e.message}"
  ensure
    close_transport
  end
end
#close(await_response = true) ⇒ Object Also known as: stop
Closes the connection. This involves closing all of its channels.
# File 'lib/bunny/session.rb', line 410
def close(await_response = true)
  @status_mutex.synchronize { @status = :closing }

  ignoring_io_errors do
    if @transport.open?
      @logger.debug "Transport is still open..."
      close_all_channels

      @logger.debug "Will close all channels...."
      self.close_connection(await_response)
    end

    clean_up_on_shutdown
  end
  @status_mutex.synchronize do
    @status = :closed
    @manually_closed = true
  end
  @logger.debug "Connection is closed"
  true
end
#closed? ⇒ Boolean
Returns true if this AMQP 0.9.1 connection is closed.
# File 'lib/bunny/session.rb', line 460
def closed?
  @status_mutex.synchronize { @status == :closed }
end
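Status predicates such as `closed?` read `@status` under `@status_mutex`, which the constructor creates from `mutex_impl` (`Monitor` by default). A small sketch of why a reentrant lock is convenient here follows; `StatusHolderSketch` and `closed_or_closing?` are hypothetical names used only for illustration.

```ruby
require "monitor"

# Hypothetical holder that guards a status symbol the way the predicates above do.
class StatusHolderSketch
  def initialize(mutex_impl = Monitor)
    @status_mutex = mutex_impl.new
    @status = :not_connected
  end

  def status=(value)
    @status_mutex.synchronize { @status = value }
  end

  def closed?
    @status_mutex.synchronize { @status == :closed }
  end

  # Calls closed? while already holding the lock. A Monitor allows this
  # re-entry; a plain Mutex raises ThreadError on the nested synchronize.
  def closed_or_closing?
    @status_mutex.synchronize { closed? || @status == :closing }
  end
end

holder = StatusHolderSketch.new
holder.status = :closing
closing = holder.closed_or_closing?
```

Passing `Mutex` instead of `Monitor` to the sketch makes `closed_or_closing?` fail with a `ThreadError`, which is the non-reentrancy the constructor comment refers to.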
#closing? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns true if this AMQP 0.9.1 connection is closing.
# File 'lib/bunny/session.rb', line 455
def closing?
  @status_mutex.synchronize { @status == :closing }
end
#configure_socket(&block) ⇒ Object
Provides a way to fine-tune the socket used by the connection. Accepts a block that the socket will be yielded to.
# File 'lib/bunny/session.rb', line 306
def configure_socket(&block)
  raise ArgumentError, "No block provided!" if block.nil?

  @transport_mutex.synchronize do
    @transport.configure_socket(&block)
  end
end
#connecting? ⇒ Boolean
Returns true if this connection is still not fully open.
# File 'lib/bunny/session.rb', line 449
def connecting?
  status == :connecting
end
#create_channel(n = nil, consumer_pool_size = 1, consumer_pool_abort_on_exception = false, consumer_pool_shutdown_timeout = 60) ⇒ Bunny::Channel Also known as: channel
Opens a new channel and returns it. This method will block the calling thread until the response is received and the channel is guaranteed to be opened (this operation is very fast and inexpensive).
# File 'lib/bunny/session.rb', line 389
def create_channel(n = nil, consumer_pool_size = 1, consumer_pool_abort_on_exception = false, consumer_pool_shutdown_timeout = 60)
  raise ArgumentError, "channel number 0 is reserved in the protocol and cannot be used" if 0 == n
  raise ConnectionAlreadyClosed if manually_closed?
  raise RuntimeError, "this connection is not open. Was Bunny::Session#start invoked? Is automatic recovery enabled?" if !connected?

  @channel_mutex.synchronize do
    if n && (ch = @channels[n])
      ch
    else
      work_pool = ConsumerWorkPool.new(consumer_pool_size || 1, consumer_pool_abort_on_exception, consumer_pool_shutdown_timeout)
      ch = Bunny::Channel.new(self, n, { work_pool: work_pool })
      ch.open
      ch
    end
  end
end
#exchange_exists?(name) ⇒ Boolean
Checks if an exchange with the given name exists.
Implemented using exchange.declare with passive set to true and a one-off (short-lived) channel under the hood.
# File 'lib/bunny/session.rb', line 548
def exchange_exists?(name)
  ch = create_channel
  begin
    ch.exchange(name, :passive => true)
    true
  rescue Bunny::NotFound => _
    false
  ensure
    ch.close if ch.open?
  end
end
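The passive-declare check can be exercised without a broker using stand-ins. In this sketch `NotFoundSketch` and `FakeChannelSketch` are hypothetical replacements for `Bunny::NotFound` and `Bunny::Channel`; the fake closes itself on a missing exchange to model the fact that a failed passive declare is a channel-level error, which is why the original guards `close` with `open?`.

```ruby
# Hypothetical stand-ins so the pattern runs without RabbitMQ.
class NotFoundSketch < StandardError; end

class FakeChannelSketch
  def initialize(known_exchanges)
    @known = known_exchanges
    @open = true
  end

  # Models a passive declare: succeeds for known names; otherwise the
  # channel is closed and a not-found error is raised.
  def exchange(name, opts = {})
    return name if @known.include?(name)
    @open = false
    raise NotFoundSketch, "no exchange '#{name}'"
  end

  def open?
    @open
  end

  def close
    @open = false
  end
end

def exchange_exists_sketch?(channel, name)
  channel.exchange(name, passive: true)
  true
rescue NotFoundSketch
  false
ensure
  # The channel may already be closed by the failed declare.
  channel.close if channel.open?
end

found = exchange_exists_sketch?(FakeChannelSketch.new(["amq.direct"]), "amq.direct")
missing = exchange_exists_sketch?(FakeChannelSketch.new(["amq.direct"]), "no.such.exchange")
```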
#heartbeat_disabled?(val) ⇒ Boolean (protected)
# File 'lib/bunny/session.rb', line 1583
def heartbeat_disabled?(val)
  0 == val || val.nil?
end
#heartbeat_interval ⇒ Integer
Returns the heartbeat timeout (not interval) used.
# File 'lib/bunny/session.rb', line 267
def heartbeat_interval; self.heartbeat; end
#heartbeat_timeout ⇒ Integer
Returns the heartbeat timeout used.
# File 'lib/bunny/session.rb', line 270
def heartbeat_timeout; self.heartbeat; end
#host ⇒ Object
# File 'lib/bunny/session.rb', line 289
def host
  @transport ? @transport.host : host_from_address(@addresses[@address_index])
end
#hostname ⇒ String
Returns the RabbitMQ hostname (or IP address) used.
# File 'lib/bunny/session.rb', line 257
def hostname; self.host; end
#ignoring_io_errors(&block) ⇒ Object (protected)
# File 'lib/bunny/session.rb', line 1735
def ignoring_io_errors(&block)
  begin
    block.call
  rescue AMQ::Protocol::EmptyResponseError, IOError, SystemCallError, Bunny::NetworkFailure => _
    # ignore
  end
end
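The effect of this helper is easiest to see with standard-library exceptions only. The sketch below is a reduced, hypothetical version (`ignoring_io_errors_sketch`) that leaves out the two library-specific classes, `AMQ::Protocol::EmptyResponseError` and `Bunny::NetworkFailure`, so it runs on its own.

```ruby
# Reduced version of the helper: swallows I/O and system call errors,
# lets everything else propagate.
def ignoring_io_errors_sketch
  yield
rescue IOError, SystemCallError
  # ignore, as the original helper does
  nil
end

reset = ignoring_io_errors_sketch { raise Errno::ECONNRESET }        # SystemCallError subclass
eof = ignoring_io_errors_sketch { raise EOFError, "socket closed" }  # IOError subclass
value = ignoring_io_errors_sketch { :sent }                          # block result passes through
```

Errors outside the listed classes, such as `ArgumentError`, are not swallowed, so programming mistakes inside the block still surface.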
#inspect ⇒ Object
# File 'lib/bunny/session.rb', line 1454
def inspect
  to_s
end
#local_port ⇒ Integer
Returns the client socket port.
# File 'lib/bunny/session.rb', line 315
def local_port
  @transport.local_address.ip_port
end
#manually_closed? ⇒ Boolean
Returns true if this AMQP 0.9.1 connection has been closed by the user (as opposed to the server).
# File 'lib/bunny/session.rb', line 465
def manually_closed?
  @status_mutex.synchronize { @manually_closed == true }
end
#normalize_auth_mechanism(value) ⇒ Object (protected)
# File 'lib/bunny/session.rb', line 1724
def normalize_auth_mechanism(value)
  case value
  when [] then
    "PLAIN"
  when nil then
    "PLAIN"
  else
    value
  end
end
#normalize_client_channel_max(n) ⇒ Object (protected)
# File 'lib/bunny/session.rb', line 1712
def normalize_client_channel_max(n)
  return CHANNEL_MAX_LIMIT if n.nil?
  return CHANNEL_MAX_LIMIT if n > CHANNEL_MAX_LIMIT

  case n
  when 0 then
    CHANNEL_MAX_LIMIT
  else
    n
  end
end
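The mapping performed by `normalize_client_channel_max` is summarised in the sketch below. `CHANNEL_MAX_LIMIT` is not shown on this page, so the value 65535 used here is an assumption, and `normalize_channel_max_sketch` is a hypothetical name.

```ruby
# Assumed value; the real CHANNEL_MAX_LIMIT constant is not listed above.
CHANNEL_MAX_LIMIT_SKETCH = 65_535

# nil, 0 and anything above the limit all collapse to the limit;
# every other value is returned unchanged.
def normalize_channel_max_sketch(n)
  return CHANNEL_MAX_LIMIT_SKETCH if n.nil?
  return CHANNEL_MAX_LIMIT_SKETCH if n > CHANNEL_MAX_LIMIT_SKETCH
  n.zero? ? CHANNEL_MAX_LIMIT_SKETCH : n
end

results = [nil, 0, 2047, 100_000].map { |n| normalize_channel_max_sketch(n) }
```

Under that assumption, only the third input (the `DEFAULT_CHANNEL_MAX` value 2047) is returned unchanged.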
#on_blocked {|AMQ::Protocol::Connection::Blocked| ... } ⇒ Object
Defines a callback that will be executed when RabbitMQ blocks the connection because it is running low on memory or disk space (as configured via config file and/or rabbitmqctl).
# File 'lib/bunny/session.rb', line 489
def on_blocked(&block)
  @block_callback = block
end
#on_unblocked(&block) ⇒ Object
Defines a callback that will be executed when RabbitMQ unblocks the connection that was previously blocked, e.g. because the memory or disk space alarm has cleared.
# File 'lib/bunny/session.rb', line 498
def on_unblocked(&block)
  @unblock_callback = block
end
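Both `on_blocked` and `on_unblocked` simply remember the block they are given. A minimal sketch of the registration and a later invocation is shown below; `BlockedNotifierSketch` and its `dispatch` method are hypothetical, standing in for whatever code in the session reacts to blocked and unblocked notifications from the server.

```ruby
# Hypothetical holder for the two callbacks.
class BlockedNotifierSketch
  def initialize
    @block_callback = nil
    @unblock_callback = nil
  end

  def on_blocked(&block)
    @block_callback = block
  end

  def on_unblocked(&block)
    @unblock_callback = block
  end

  # Assumed dispatcher: invokes the matching callback if one was registered.
  def dispatch(event, payload = nil)
    case event
    when :blocked then @block_callback&.call(payload)
    when :unblocked then @unblock_callback&.call(payload)
    end
  end
end

events = []
notifier = BlockedNotifierSketch.new
notifier.on_blocked { |reason| events << [:paused, reason] }
notifier.on_unblocked { |_| events << [:resumed] }
notifier.dispatch(:blocked, "low on memory")
notifier.dispatch(:unblocked)
```

A publisher could use such callbacks to pause and resume publishing while the broker's resource alarm is in effect.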
#open? ⇒ Boolean Also known as: connected?
Returns true if this AMQP 0.9.1 connection is open.
# File 'lib/bunny/session.rb', line 470
def open?
  @status_mutex.synchronize do
    (status == :open || status == :connected || status == :connecting) && @transport.open?
  end
end
#password ⇒ String
Returns the password used.
# File 'lib/bunny/session.rb', line 261
def password; self.pass; end
#port ⇒ Object
# File 'lib/bunny/session.rb', line 293
def port
  @transport ? @transport.port : port_from_address(@addresses[@address_index])
end
#queue_exists?(name) ⇒ Boolean
Checks if a queue with the given name exists.
Implemented using queue.declare with passive set to true and a one-off (short-lived) channel under the hood.
# File 'lib/bunny/session.rb', line 526
def queue_exists?(name)
  ch = create_channel
  begin
    ch.queue(name, :passive => true)
    true
  rescue Bunny::ResourceLocked => _
    true
  rescue Bunny::NotFound => _
    false
  ensure
    ch.close if ch.open?
  end
end
#record_exchange_with(ch, name, type, durable, auto_delete, arguments) ⇒ Object
# File 'lib/bunny/session.rb', line 829
def record_exchange_with(ch, name, type, durable, auto_delete, arguments)
  @topology_registry.record_exchange_with(ch, name, type, durable, auto_delete, arguments)
end
#record_queue_with(ch, name, server_named, durable, auto_delete, exclusive, arguments) ⇒ Object
# File 'lib/bunny/session.rb', line 801
def record_queue_with(ch, name, server_named, durable, auto_delete, exclusive, arguments)
  @topology_registry.record_queue_with(ch, name, server_named, durable, auto_delete, exclusive, arguments)
end
#reset_address_index ⇒ Object
# File 'lib/bunny/session.rb', line 297
def reset_address_index
  @address_index_mutex.synchronize { @address_index = 0 }
end
#start ⇒ Object
Starts the connection process.
# File 'lib/bunny/session.rb', line 324
def start
  return self if connected?

  @status_mutex.synchronize { @status = :connecting }
  # reset here for cases when automatic network recovery kicks in
  # when we were blocked. MK.
  @blocked = false
  self.reset_continuations

  begin
    begin
      # close existing transport if we have one,
      # to not leak sockets
      @transport_mutex.synchronize do
        @transport.maybe_initialize_socket
        @transport.post_initialize_socket
        @transport.connect
      end

      self.init_connection
      self.open_connection

      @reader_loop = nil
      self.start_reader_loop if threaded?

    rescue TCPConnectionFailed => e
      @logger.warn e.message
      self.initialize_transport
      @logger.warn "Will try to connect to the next endpoint in line: #{@transport.host}:#{@transport.port}"

      return self.start
    rescue
      @status_mutex.synchronize { @status = :not_connected }
      raise
    end
  rescue HostListDepleted
    self.reset_address_index
    @status_mutex.synchronize { @status = :not_connected }
    raise TCPConnectionFailedForAllHosts
  end
  @status_mutex.synchronize { @manually_closed = false }

  self
end
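The failover behaviour of `start` (try one endpoint, move on to the next on a TCP failure, and give up once the host list is depleted) can be illustrated with a simplified loop. This is a sketch under stated assumptions: it iterates over the addresses instead of recursing as the original does, and `AllHostsFailedSketch`, `connect_with_failover_sketch` and the `connector` callable are hypothetical.

```ruby
# Hypothetical counterpart of TCPConnectionFailedForAllHosts.
class AllHostsFailedSketch < StandardError; end

# Tries each address in order and returns the first successful result.
# `connector` is any callable that raises an I/O or system call error on failure.
def connect_with_failover_sketch(addresses, connector)
  failures = []
  addresses.each do |address|
    begin
      return connector.call(address)
    rescue SystemCallError, IOError => e
      failures << "#{address}: #{e.class}"
    end
  end
  raise AllHostsFailedSketch, "could not connect to any of: #{failures.join(', ')}"
end

# Simulated connector: the first host refuses connections.
connector = lambda do |address|
  raise Errno::ECONNREFUSED if address == "rabbit1.example.com:5672"
  "connected to #{address}"
end

result = connect_with_failover_sketch(
  ["rabbit1.example.com:5672", "rabbit2.example.com:5672"], connector
)
```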
#threaded? ⇒ Boolean
Returns true if this connection uses a separate thread for I/O activity.
# File 'lib/bunny/session.rb', line 285
def threaded?
  @threaded
end
#to_s ⇒ String
# File 'lib/bunny/session.rb', line 1449
def to_s
  oid = ("0x%x" % (self.object_id << 1))
  "#<#{self.class.name}:#{oid} #{@user}@#{host}:#{port}, vhost=#{@vhost}, addresses=[#{@addresses.join(',')}]>"
end
#update_secret(value, reason) ⇒ Object
# File 'lib/bunny/session.rb', line 369
def update_secret(value, reason)
  @transport.send_frame(AMQ::Protocol::Connection::UpdateSecret.encode(value, reason))
  @last_update_secret_ok = wait_on_continuations
  raise_if_continuation_resulted_in_a_connection_error!

  @last_update_secret_ok
end
#username ⇒ String
Returns the username used.
# File 'lib/bunny/session.rb', line 259
def username; self.user; end
#uses_ssl? ⇒ Boolean Also known as: ssl?
Returns true if this connection uses TLS (SSL).
# File 'lib/bunny/session.rb', line 279
def uses_ssl?
  @transport.uses_ssl?
end
#uses_tls? ⇒ Boolean Also known as: tls?
Returns true if this connection uses TLS (SSL).
# File 'lib/bunny/session.rb', line 273
def uses_tls?
  @transport.uses_tls?
end
#validate_connection_options(options) ⇒ Object
# File 'lib/bunny/session.rb', line 246
def validate_connection_options(options)
  if options[:hosts] && options[:addresses]
    raise ArgumentError, "Connection options can't contain hosts and addresses at the same time"
  end

  if (options[:host] || options[:hostname]) && (options[:hosts] || options[:addresses])
    @logger.warn "Connection options contain both a host and an array of hosts (addresses), please pick one."
  end
end
#virtual_host ⇒ String
Returns the virtual host used.
# File 'lib/bunny/session.rb', line 263
def virtual_host; self.vhost; end
#with_channel(n = nil) ⇒ Bunny::Session
Creates a temporary channel, yields it to the block given to this method and closes it.
# File 'lib/bunny/session.rb', line 437
def with_channel(n = nil)
  ch = create_channel(n)
  begin
    yield ch
  ensure
    ch.close if ch.open?
  end

  self
end
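The guarantee `with_channel` gives, namely that the temporary channel is closed even when the block raises and that the session itself is returned, can be demonstrated with stand-ins. `SketchChannel` and `SketchSession` below are hypothetical classes that only mimic the `open?`/`close` and `create_channel` parts of the real API.

```ruby
# Hypothetical channel that only tracks whether it is open.
class SketchChannel
  def initialize
    @open = true
  end

  def open?
    @open
  end

  def close
    @open = false
  end
end

# Hypothetical session exposing the same with_channel shape as above.
class SketchSession
  attr_reader :last_channel

  def create_channel(_n = nil)
    @last_channel = SketchChannel.new
  end

  def with_channel(n = nil)
    ch = create_channel(n)
    begin
      yield ch
    ensure
      ch.close if ch.open?
    end

    self
  end
end

session = SketchSession.new
open_inside_block = nil
returned = session.with_channel { |ch| open_inside_block = ch.open? }
closed_afterwards = !session.last_channel.open?
```

Since the method returns the session rather than the block's value, results computed inside the block need to be captured in a local variable, as `open_inside_block` is here.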