Class: Bunny::Session
- Inherits:
-
Object
- Object
- Bunny::Session
- Defined in:
- lib/bunny/session.rb
Overview
Represents an AMQP 0.9.1 connection (to a RabbitMQ node).
Constant Summary collapse
- DEFAULT_HOST =
Default host used for connection
"127.0.0.1"
- DEFAULT_VHOST =
Default virtual host used for connection
"/"
- DEFAULT_USER =
Default username used for connection
"guest"
- DEFAULT_PASSWORD =
Default password used for connection
"guest"
- DEFAULT_HEARTBEAT =
Default heartbeat interval, the same value as RabbitMQ 3.0 uses.
:server
- DEFAULT_CHANNEL_MAX =
2047
- DEFAULT_CLIENT_PROPERTIES =
RabbitMQ client metadata
{
  :capabilities => {
    :publisher_confirms => true,
    :consumer_cancel_notify => true,
    :exchange_exchange_bindings => true,
    :"basic.nack" => true,
    :"connection.blocked" => true,
    # See http://www.rabbitmq.com/auth-notification.html
    :authentication_failure_close => true
  },
  :product => "Bunny",
  :platform => ::RUBY_DESCRIPTION,
  :version => Bunny::VERSION,
  :information => "https://github.com/ruby-amqp/bunny",
}
- DEFAULT_NETWORK_RECOVERY_INTERVAL =
Default reconnection interval for TCP connection failures
5.0
- DEFAULT_RECOVERABLE_EXCEPTIONS =
[StandardError, TCPConnectionFailedForAllHosts, TCPConnectionFailed, AMQ::Protocol::EmptyResponseError, SystemCallError, Timeout::Error, Bunny::ConnectionLevelException, Bunny::ConnectionClosedError]
Instance Attribute Summary collapse
-
#channel_id_allocator ⇒ Object
readonly
Returns the value of attribute channel_id_allocator.
-
#channel_max ⇒ Object
readonly
Returns the value of attribute channel_max.
-
#connection_name ⇒ Object
readonly
Returns the value of attribute connection_name.
-
#continuation_timeout ⇒ Integer
readonly
Timeout for blocking protocol operations (queue.declare, queue.bind, etc), in milliseconds.
-
#frame_max ⇒ Object
readonly
Returns the value of attribute frame_max.
-
#heartbeat ⇒ Object
readonly
Returns the value of attribute heartbeat.
- #logger ⇒ Logger readonly
-
#mechanism ⇒ String
readonly
Authentication mechanism, e.g. "PLAIN" or "EXTERNAL".
-
#network_recovery_interval ⇒ Object
readonly
Returns the value of attribute network_recovery_interval.
-
#pass ⇒ Object
readonly
Returns the value of attribute pass.
-
#recoverable_exceptions ⇒ Object
Returns the value of attribute recoverable_exceptions.
-
#server_authentication_mechanisms ⇒ Object
readonly
Returns the value of attribute server_authentication_mechanisms.
-
#server_capabilities ⇒ Object
readonly
Returns the value of attribute server_capabilities.
-
#server_locales ⇒ Object
readonly
Returns the value of attribute server_locales.
-
#server_properties ⇒ Object
readonly
Returns the value of attribute server_properties.
-
#session_error_handler ⇒ Object
readonly
Returns the session error handler.
-
#socket_configurator ⇒ Object
Returns the value of attribute socket_configurator.
-
#status ⇒ Object
readonly
Returns the value of attribute status.
-
#threaded ⇒ Object
readonly
Returns the value of attribute threaded.
- #topology_registry ⇒ Bunny::TopologyRegistry readonly
- #transport ⇒ Bunny::Transport readonly
-
#user ⇒ Object
readonly
Returns the value of attribute user.
-
#vhost ⇒ Object
readonly
Returns the value of attribute vhost.
Class Method Summary collapse
-
.parse_uri(uri) ⇒ Hash
Parses an amqp[s] URI into a hash that #initialize accepts.
Instance Method Summary collapse
-
#after_recovery_attempts_exhausted(&block) ⇒ Object
Defines a callable (e.g. a block) that will be called when connection recovery has failed after the specified number of recovery attempts.
-
#after_recovery_completed(&block) ⇒ Object
Defines a callable (e.g. a block) that will be called after successful connection recovery.
-
#automatically_recover? ⇒ Boolean
True if this connection has automatic recovery from network failure enabled.
-
#before_recovery_attempt_starts(&block) ⇒ Object
Defines a callable (e.g. a block) that will be called before every connection recovery attempt.
-
#blocked? ⇒ Boolean
True if the connection is currently blocked by RabbitMQ because it's running low on RAM, disk space, or other resource; false otherwise.
- #clean_up_and_fail_on_connection_close!(method) ⇒ Object
- #clean_up_on_shutdown ⇒ Object
-
#clear_exceptions ⇒ Array<Exception>
Clears all accumulated exceptions from the session error handler.
-
#close(await_response = true) ⇒ Object
(also: #stop)
Closes the connection.
-
#closed? ⇒ Boolean
True if this AMQP 0.9.1 connection is closed.
-
#closing? ⇒ Boolean
private
True if this AMQP 0.9.1 connection is closing.
-
#configure_socket(&block) ⇒ Object
Provides a way to fine tune the socket used by connection.
-
#connecting? ⇒ Boolean
True if this connection is still not fully open.
-
#create_channel(n = nil, consumer_pool_size = 1, consumer_pool_abort_on_exception = false, consumer_pool_shutdown_timeout = 60) ⇒ Bunny::Channel
(also: #channel)
Opens a new channel and returns it.
-
#exception_occurred? ⇒ Boolean
Returns true if any exceptions have been accumulated by the session error handler.
-
#exceptions ⇒ Array<Exception>
Returns all accumulated exceptions from the session error handler.
-
#exchange_exists?(name) ⇒ Boolean
Checks if an exchange with the given name exists.
- #heartbeat_disabled?(val) ⇒ Boolean protected
- #heartbeat_interval ⇒ Integer deprecated Deprecated. Returns the heartbeat timeout (not an interval); use #heartbeat_timeout instead.
-
#heartbeat_timeout ⇒ Integer
Heartbeat timeout used.
- #host ⇒ Object
-
#hostname ⇒ String
RabbitMQ hostname (or IP address) used.
- #ignoring_io_errors(&block) ⇒ Object protected
-
#initialize(connection_string_or_opts = ENV['RABBITMQ_URL'], optz = Hash.new) ⇒ Session
constructor
A new instance of Session.
- #inspect ⇒ Object
-
#local_port ⇒ Integer
Client socket port.
-
#manually_closed? ⇒ Boolean
True if this AMQP 0.9.1 connection has been closed by the user (as opposed to the server).
- #normalize_auth_mechanism(value) ⇒ Object protected
- #normalize_client_channel_max(n) ⇒ Object protected
-
#on_blocked {|AMQ::Protocol::Connection::Blocked| ... } ⇒ Object
Defines a callback that will be executed when RabbitMQ blocks the connection because it is running low on memory or disk space (as configured via config file and/or rabbitmqctl).
-
#on_unblocked(&block) ⇒ Object
Defines a callback that will be executed when RabbitMQ unblocks the connection that was previously blocked, e.g. because the memory or disk space alarm has cleared.
-
#open? ⇒ Boolean
(also: #connected?)
True if this AMQP 0.9.1 connection is open.
-
#password ⇒ String
Password used.
- #port ⇒ Object
-
#queue_exists?(name) ⇒ Boolean
Checks if a queue with given name exists.
-
#raise_on_exception! ⇒ Object
Raises the first accumulated exception if any exist.
- #record_exchange_with(ch, name, type, durable, auto_delete, arguments) ⇒ Object
- #record_queue_with(ch, name, server_named, durable, auto_delete, exclusive, arguments) ⇒ Object
- #reset_address_index ⇒ Object
-
#start ⇒ Object
Starts the connection process.
-
#threaded? ⇒ Boolean
True if this connection uses a separate thread for I/O activity.
- #to_s ⇒ String
- #update_secret(value, reason) ⇒ Object
-
#username ⇒ String
Username used.
-
#uses_ssl? ⇒ Boolean
(also: #ssl?)
True if this connection uses TLS (SSL).
-
#uses_tls? ⇒ Boolean
(also: #tls?)
True if this connection uses TLS (SSL).
- #validate_connection_options(options) ⇒ Object
-
#virtual_host ⇒ String
Virtual host used.
-
#with_channel(n = nil) ⇒ Bunny::Session
Creates a temporary channel, yields it to the block given to this method and closes it.
Constructor Details
#initialize(connection_string_or_opts = ENV['RABBITMQ_URL'], optz = Hash.new) ⇒ Session
Returns a new instance of Session.
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 |
# File 'lib/bunny/session.rb', line 146 def initialize(connection_string_or_opts = ENV['RABBITMQ_URL'], optz = Hash.new) opts = case (connection_string_or_opts) when nil then Hash.new when String then self.class.parse_uri(connection_string_or_opts) when Hash then connection_string_or_opts end.merge(optz) @default_hosts_shuffle_strategy = Proc.new { |hosts| hosts.shuffle } @opts = opts log_file = opts[:log_file] || opts[:logfile] || STDOUT log_level = opts[:log_level] || ENV["BUNNY_LOG_LEVEL"] || Logger::WARN # we might need to log a warning about ill-formatted IPv6 address but # progname includes hostname, so init like this first @logger = opts.fetch(:logger, init_default_logger_without_progname(log_file, log_level)) @addresses = self.addresses_from(opts) @address_index = 0 @transport = nil @user = self.username_from(opts) @pass = self.password_from(opts) @vhost = self.vhost_from(opts) @threaded = opts.fetch(:threaded, true) # re-init, see above @logger = opts.fetch(:logger, init_default_logger(log_file, log_level)) validate_connection_options(opts) @last_connection_error = nil # should automatic recovery from network failures be used? @automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil? true else opts[:automatically_recover] | opts[:automatic_recovery] end @recovering_from_network_failure = false @max_recovery_attempts = opts[:recovery_attempts] @recovery_attempts = @max_recovery_attempts # When this is set, connection attempts won't be reset after # successful reconnection. Some find this behavior more sensible # than the per-failure attempt counter. MK. 
@reset_recovery_attempt_counter_after_reconnection = opts.fetch(:reset_recovery_attempts_after_reconnection, true) @network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL) @recover_from_connection_close = opts.fetch(:recover_from_connection_close, true) # in ms @continuation_timeout = opts.fetch(:continuation_timeout, DEFAULT_CONTINUATION_TIMEOUT) @status = :not_connected @manually_closed = false @blocked = false # these are negotiated with the broker during the connection tuning phase @client_frame_max = opts.fetch(:frame_max, DEFAULT_FRAME_MAX) @client_channel_max = normalize_client_channel_max(opts.fetch(:channel_max, DEFAULT_CHANNEL_MAX)) # will be-renegotiated during connection tuning steps. MK. @channel_max = @client_channel_max @heartbeat_sender = nil @client_heartbeat = self.heartbeat_from(opts) client_props = opts[:properties] || opts[:client_properties] || {} @connection_name = client_props[:connection_name] || opts[:connection_name] @client_properties = DEFAULT_CLIENT_PROPERTIES.merge(client_props) .merge(connection_name: connection_name) @mechanism = normalize_auth_mechanism(opts.fetch(:auth_mechanism, "PLAIN")) @credentials_encoder = credentials_encoder_for(@mechanism) @locale = @opts.fetch(:locale, DEFAULT_LOCALE) @mutex_impl = @opts.fetch(:mutex_impl, Monitor) # mutex for the channel id => channel hash @channel_mutex = @mutex_impl.new # transport operations/continuations mutex. A workaround for # the non-reentrant Ruby mutexes. MK. 
@transport_mutex = @mutex_impl.new @status_mutex = @mutex_impl.new @address_index_mutex = @mutex_impl.new @channels = Hash.new trf = @opts.fetch(:topology_recovery_filter, DefaultTopologyRecoveryFilter.new) @topology_registry = TopologyRegistry.new(topology_recovery_filter: trf) @recovery_attempt_started = opts[:recovery_attempt_started] @recovery_completed = opts[:recovery_completed] @recovery_attempts_exhausted = opts[:recovery_attempts_exhausted] @session_error_handler = opts.fetch(:session_error_handler, ExceptionAccumulator.new) @recoverable_exceptions = DEFAULT_RECOVERABLE_EXCEPTIONS.dup self.reset_continuations self.initialize_transport end |
Instance Attribute Details
#channel_id_allocator ⇒ Object (readonly)
Returns the value of attribute channel_id_allocator.
85 86 87 |
# File 'lib/bunny/session.rb', line 85 def channel_id_allocator @channel_id_allocator end |
#channel_max ⇒ Object (readonly)
Returns the value of attribute channel_max.
83 84 85 |
# File 'lib/bunny/session.rb', line 83 def channel_max @channel_max end |
#connection_name ⇒ Object (readonly)
Returns the value of attribute connection_name.
96 97 98 |
# File 'lib/bunny/session.rb', line 96 def connection_name @connection_name end |
#continuation_timeout ⇒ Integer (readonly)
Returns Timeout for blocking protocol operations (queue.declare, queue.bind, etc), in milliseconds. Default is 15000.
94 95 96 |
# File 'lib/bunny/session.rb', line 94 def continuation_timeout @continuation_timeout end |
#frame_max ⇒ Object (readonly)
Returns the value of attribute frame_max.
83 84 85 |
# File 'lib/bunny/session.rb', line 83 def frame_max @frame_max end |
#heartbeat ⇒ Object (readonly)
Returns the value of attribute heartbeat.
83 84 85 |
# File 'lib/bunny/session.rb', line 83 def heartbeat @heartbeat end |
#logger ⇒ Logger (readonly)
92 93 94 |
# File 'lib/bunny/session.rb', line 92 def logger @logger end |
#mechanism ⇒ String (readonly)
Authentication mechanism, e.g. "PLAIN" or "EXTERNAL"
90 91 92 |
# File 'lib/bunny/session.rb', line 90 def mechanism @mechanism end |
#network_recovery_interval ⇒ Object (readonly)
Returns the value of attribute network_recovery_interval.
95 96 97 |
# File 'lib/bunny/session.rb', line 95 def network_recovery_interval @network_recovery_interval end |
#pass ⇒ Object (readonly)
Returns the value of attribute pass.
83 84 85 |
# File 'lib/bunny/session.rb', line 83 def pass @pass end |
#recoverable_exceptions ⇒ Object
Returns the value of attribute recoverable_exceptions.
98 99 100 |
# File 'lib/bunny/session.rb', line 98 def recoverable_exceptions @recoverable_exceptions end |
#server_authentication_mechanisms ⇒ Object (readonly)
Returns the value of attribute server_authentication_mechanisms.
84 85 86 |
# File 'lib/bunny/session.rb', line 84 def server_authentication_mechanisms @server_authentication_mechanisms end |
#server_capabilities ⇒ Object (readonly)
Returns the value of attribute server_capabilities.
84 85 86 |
# File 'lib/bunny/session.rb', line 84 def server_capabilities @server_capabilities end |
#server_locales ⇒ Object (readonly)
Returns the value of attribute server_locales.
84 85 86 |
# File 'lib/bunny/session.rb', line 84 def server_locales @server_locales end |
#server_properties ⇒ Object (readonly)
Returns the value of attribute server_properties.
84 85 86 |
# File 'lib/bunny/session.rb', line 84 def server_properties @server_properties end |
#session_error_handler ⇒ Object (readonly)
Returns the session error handler. By default, this is a Bunny::ExceptionAccumulator instance.
516 517 518 |
# File 'lib/bunny/session.rb', line 516 def session_error_handler @session_error_handler end |
#socket_configurator ⇒ Object
Returns the value of attribute socket_configurator.
97 98 99 |
# File 'lib/bunny/session.rb', line 97 def socket_configurator @socket_configurator end |
#status ⇒ Object (readonly)
Returns the value of attribute status.
83 84 85 |
# File 'lib/bunny/session.rb', line 83 def status @status end |
#threaded ⇒ Object (readonly)
Returns the value of attribute threaded.
83 84 85 |
# File 'lib/bunny/session.rb', line 83 def threaded @threaded end |
#topology_registry ⇒ Bunny::TopologyRegistry (readonly)
87 88 89 |
# File 'lib/bunny/session.rb', line 87 def topology_registry @topology_registry end |
#transport ⇒ Bunny::Transport (readonly)
82 83 84 |
# File 'lib/bunny/session.rb', line 82 def transport @transport end |
#user ⇒ Object (readonly)
Returns the value of attribute user.
83 84 85 |
# File 'lib/bunny/session.rb', line 83 def user @user end |
#vhost ⇒ Object (readonly)
Returns the value of attribute vhost.
83 84 85 |
# File 'lib/bunny/session.rb', line 83 def vhost @vhost end |
Class Method Details
.parse_uri(uri) ⇒ Hash
Parses an amqp[s] URI into a hash that #initialize accepts.
565 566 567 |
# File 'lib/bunny/session.rb', line 565 def self.parse_uri(uri) AMQ::Settings.configure(uri) end |
Instance Method Details
#after_recovery_attempts_exhausted(&block) ⇒ Object
Defines a callable (e.g. a block) that will be called when connection recovery has failed after the specified number of recovery attempts.
626 627 628 |
# File 'lib/bunny/session.rb', line 626 def after_recovery_attempts_exhausted(&block) @recovery_attempts_exhausted = block end |
#after_recovery_completed(&block) ⇒ Object
Defines a callable (e.g. a block) that will be called after successful connection recovery.
619 620 621 |
# File 'lib/bunny/session.rb', line 619 def after_recovery_completed(&block) @recovery_completed = block end |
#automatically_recover? ⇒ Boolean
Returns true if this connection has automatic recovery from network failure enabled.
479 480 481 |
# File 'lib/bunny/session.rb', line 479 def automatically_recover? @automatically_recover end |
#before_recovery_attempt_starts(&block) ⇒ Object
Defines a callable (e.g. a block) that will be called before every connection recovery attempt.
613 614 615 |
# File 'lib/bunny/session.rb', line 613 def before_recovery_attempt_starts(&block) @recovery_attempt_started = block end |
#blocked? ⇒ Boolean
Returns true if the connection is currently blocked by RabbitMQ because it's running low on RAM, disk space, or other resource; false otherwise.
507 508 509 |
# File 'lib/bunny/session.rb', line 507 def blocked? @blocked end |
#clean_up_and_fail_on_connection_close!(method) ⇒ Object
1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 |
# File 'lib/bunny/session.rb', line 1207 def clean_up_and_fail_on_connection_close!(method) @last_connection_error = instantiate_connection_level_exception(method) @continuations.push(method) clean_up_on_shutdown if threaded? @session_error_handler.raise(@last_connection_error) else raise @last_connection_error end end |
#clean_up_on_shutdown ⇒ Object
1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 |
# File 'lib/bunny/session.rb', line 1219 def clean_up_on_shutdown begin shut_down_all_consumer_work_pools! maybe_shutdown_reader_loop maybe_shutdown_heartbeat_sender rescue ShutdownSignal => _sse # no-op rescue Exception => e @logger.warn "Caught an exception when cleaning up after receiving connection.close: #{e.message}" ensure close_transport end end |
#clear_exceptions ⇒ Array<Exception>
Clears all accumulated exceptions from the session error handler. Only works when using the default ExceptionAccumulator handler.
544 545 546 |
# File 'lib/bunny/session.rb', line 544 def clear_exceptions @session_error_handler.clear end |
#close(await_response = true) ⇒ Object Also known as: stop
Closes the connection. This involves closing all of its channels.
411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 |
# File 'lib/bunny/session.rb', line 411 def close(await_response = true) @status_mutex.synchronize { @status = :closing } ignoring_io_errors do if @transport.open? @logger.debug "Transport is still open..." close_all_channels @logger.debug "Will close all channels...." self.close_connection(await_response) end clean_up_on_shutdown end @status_mutex.synchronize do @status = :closed @manually_closed = true end @logger.debug "Connection is closed" true end |
#closed? ⇒ Boolean
Returns true if this AMQP 0.9.1 connection is closed.
461 462 463 |
# File 'lib/bunny/session.rb', line 461 def closed? @status_mutex.synchronize { @status == :closed } end |
#closing? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns true if this AMQP 0.9.1 connection is closing.
456 457 458 |
# File 'lib/bunny/session.rb', line 456 def closing? @status_mutex.synchronize { @status == :closing } end |
#configure_socket(&block) ⇒ Object
Provides a way to fine tune the socket used by connection. Accepts a block that the socket will be yielded to.
307 308 309 310 311 312 313 |
# File 'lib/bunny/session.rb', line 307 def configure_socket(&block) raise ArgumentError, "No block provided!" if block.nil? @transport_mutex.synchronize do @transport.configure_socket(&block) end end |
#connecting? ⇒ Boolean
Returns true if this connection is still not fully open.
450 451 452 |
# File 'lib/bunny/session.rb', line 450 def connecting? status == :connecting end |
#create_channel(n = nil, consumer_pool_size = 1, consumer_pool_abort_on_exception = false, consumer_pool_shutdown_timeout = 60) ⇒ Bunny::Channel Also known as: channel
Opens a new channel and returns it. This method will block the calling thread until the response is received and the channel is guaranteed to be opened (this operation is very fast and inexpensive).
390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 |
# File 'lib/bunny/session.rb', line 390 def create_channel(n = nil, consumer_pool_size = 1, consumer_pool_abort_on_exception = false, consumer_pool_shutdown_timeout = 60) raise ArgumentError, "channel number 0 is reserved in the protocol and cannot be used" if 0 == n raise ConnectionAlreadyClosed if manually_closed? raise RuntimeError, "this connection is not open. Was Bunny::Session#start invoked? Is automatic recovery enabled?" if !connected? @channel_mutex.synchronize do if n && (ch = @channels[n]) ch else work_pool = ConsumerWorkPool.new(consumer_pool_size || 1, consumer_pool_abort_on_exception, consumer_pool_shutdown_timeout) ch = Bunny::Channel.new(self, n, { work_pool: work_pool }) ch.open ch end end end |
#exception_occurred? ⇒ Boolean
Returns true if any exceptions have been accumulated by the session error handler. Only works when using the default ExceptionAccumulator handler.
524 525 526 |
# File 'lib/bunny/session.rb', line 524 def exception_occurred? @session_error_handler.any? end |
#exceptions ⇒ Array<Exception>
Returns all accumulated exceptions from the session error handler. Only works when using the default ExceptionAccumulator handler.
534 535 536 |
# File 'lib/bunny/session.rb', line 534 def exceptions @session_error_handler.all end |
#exchange_exists?(name) ⇒ Boolean
Checks if an exchange with the given name exists.
Implemented using exchange.declare with passive set to true and a one-off (short lived) channel under the hood.
599 600 601 602 603 604 605 606 607 608 609 |
# File 'lib/bunny/session.rb', line 599 def exchange_exists?(name) ch = create_channel begin ch.exchange(name, :passive => true) true rescue Bunny::NotFound => _ false ensure ch.close if ch.open? end end |
#heartbeat_disabled?(val) ⇒ Boolean (protected)
1634 1635 1636 |
# File 'lib/bunny/session.rb', line 1634 def heartbeat_disabled?(val) 0 == val || val.nil? end |
#heartbeat_interval ⇒ Integer
Returns Heartbeat timeout (not interval) used.
268 |
# File 'lib/bunny/session.rb', line 268 def heartbeat_interval; self.heartbeat; end |
#heartbeat_timeout ⇒ Integer
Returns Heartbeat timeout used.
271 |
# File 'lib/bunny/session.rb', line 271 def heartbeat_timeout; self.heartbeat; end |
#host ⇒ Object
290 291 292 |
# File 'lib/bunny/session.rb', line 290 def host @transport ? @transport.host : host_from_address(@addresses[@address_index]) end |
#hostname ⇒ String
Returns RabbitMQ hostname (or IP address) used.
258 |
# File 'lib/bunny/session.rb', line 258 def hostname; self.host; end |
#ignoring_io_errors(&block) ⇒ Object (protected)
1786 1787 1788 1789 1790 1791 1792 |
# File 'lib/bunny/session.rb', line 1786 def ignoring_io_errors(&block) begin block.call rescue AMQ::Protocol::EmptyResponseError, IOError, SystemCallError, Bunny::NetworkFailure => _ # ignore end end |
#inspect ⇒ Object
1505 1506 1507 |
# File 'lib/bunny/session.rb', line 1505 def inspect to_s end |
#local_port ⇒ Integer
Returns Client socket port.
316 317 318 |
# File 'lib/bunny/session.rb', line 316 def local_port @transport.local_address.ip_port end |
#manually_closed? ⇒ Boolean
Returns true if this AMQP 0.9.1 connection has been closed by the user (as opposed to the server).
466 467 468 |
# File 'lib/bunny/session.rb', line 466 def manually_closed? @status_mutex.synchronize { @manually_closed == true } end |
#normalize_auth_mechanism(value) ⇒ Object (protected)
1775 1776 1777 1778 1779 1780 1781 1782 1783 1784 |
# File 'lib/bunny/session.rb', line 1775 def normalize_auth_mechanism(value) case value when [] then "PLAIN" when nil then "PLAIN" else value end end |
#normalize_client_channel_max(n) ⇒ Object (protected)
1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 |
# File 'lib/bunny/session.rb', line 1763 def normalize_client_channel_max(n) return CHANNEL_MAX_LIMIT if n.nil? return CHANNEL_MAX_LIMIT if n > CHANNEL_MAX_LIMIT case n when 0 then CHANNEL_MAX_LIMIT else n end end |
#on_blocked {|AMQ::Protocol::Connection::Blocked| ... } ⇒ Object
Defines a callback that will be executed when RabbitMQ blocks the connection because it is running low on memory or disk space (as configured via config file and/or rabbitmqctl).
490 491 492 |
# File 'lib/bunny/session.rb', line 490 def on_blocked(&block) @block_callback = block end |
#on_unblocked(&block) ⇒ Object
Defines a callback that will be executed when RabbitMQ unblocks the connection that was previously blocked, e.g. because the memory or disk space alarm has cleared.
499 500 501 |
# File 'lib/bunny/session.rb', line 499 def on_unblocked(&block) @unblock_callback = block end |
#open? ⇒ Boolean Also known as: connected?
Returns true if this AMQP 0.9.1 connection is open.
471 472 473 474 475 |
# File 'lib/bunny/session.rb', line 471 def open? @status_mutex.synchronize do (status == :open || status == :connected || status == :connecting) && @transport.open? end end |
#password ⇒ String
Returns Password used.
262 |
# File 'lib/bunny/session.rb', line 262 def password; self.pass; end |
#port ⇒ Object
294 295 296 |
# File 'lib/bunny/session.rb', line 294 def port @transport ? @transport.port : port_from_address(@addresses[@address_index]) end |
#queue_exists?(name) ⇒ Boolean
Checks if a queue with given name exists.
Implemented using queue.declare with passive set to true and a one-off (short lived) channel under the hood.
577 578 579 580 581 582 583 584 585 586 587 588 589 |
# File 'lib/bunny/session.rb', line 577 def queue_exists?(name) ch = create_channel begin ch.queue(name, :passive => true) true rescue Bunny::ResourceLocked => _ true rescue Bunny::NotFound => _ false ensure ch.close if ch.open? end end |
#raise_on_exception! ⇒ Object
Raises the first accumulated exception if any exist. Only works when using the default ExceptionAccumulator handler.
This is useful for checking errors at safe points in your application, such as after completing a batch of operations.
557 558 559 |
# File 'lib/bunny/session.rb', line 557 def raise_on_exception! @session_error_handler.raise_first! end |
#record_exchange_with(ch, name, type, durable, auto_delete, arguments) ⇒ Object
880 881 882 |
# File 'lib/bunny/session.rb', line 880 def record_exchange_with(ch, name, type, durable, auto_delete, arguments) @topology_registry.record_exchange_with(ch, name, type, durable, auto_delete, arguments) end |
#record_queue_with(ch, name, server_named, durable, auto_delete, exclusive, arguments) ⇒ Object
852 853 854 |
# File 'lib/bunny/session.rb', line 852 def record_queue_with(ch, name, server_named, durable, auto_delete, exclusive, arguments) @topology_registry.record_queue_with(ch, name, server_named, durable, auto_delete, exclusive, arguments) end |
#reset_address_index ⇒ Object
298 299 300 |
# File 'lib/bunny/session.rb', line 298 def reset_address_index @address_index_mutex.synchronize { @address_index = 0 } end |
#start ⇒ Object
Starts the connection process.
325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 |
# File 'lib/bunny/session.rb', line 325 def start return self if connected? @status_mutex.synchronize { @status = :connecting } # reset here for cases when automatic network recovery kicks in # when we were blocked. MK. @blocked = false self.reset_continuations begin begin # close existing transport if we have one, # to not leak sockets @transport_mutex.synchronize do @transport.maybe_initialize_socket @transport.post_initialize_socket @transport.connect end self.init_connection self.open_connection @reader_loop = nil self.start_reader_loop if threaded? rescue TCPConnectionFailed => e @logger.warn e.message self.initialize_transport @logger.warn "Will try to connect to the next endpoint in line: #{@transport.host}:#{@transport.port}" return self.start rescue @status_mutex.synchronize { @status = :not_connected } raise end rescue HostListDepleted self.reset_address_index @status_mutex.synchronize { @status = :not_connected } raise TCPConnectionFailedForAllHosts end @status_mutex.synchronize { @manually_closed = false } self end |
#threaded? ⇒ Boolean
Returns true if this connection uses a separate thread for I/O activity.
286 287 288 |
# File 'lib/bunny/session.rb', line 286 def threaded? @threaded end |
#to_s ⇒ String
1500 1501 1502 1503 |
# File 'lib/bunny/session.rb', line 1500 def to_s oid = ("0x%x" % (self.object_id << 1)) "#<#{self.class.name}:#{oid} #{@user}@#{host}:#{port}, vhost=#{@vhost}, addresses=[#{@addresses.join(',')}]>" end |
#update_secret(value, reason) ⇒ Object
370 371 372 373 374 375 376 |
# File 'lib/bunny/session.rb', line 370 def update_secret(value, reason) @transport.send_frame(AMQ::Protocol::Connection::UpdateSecret.encode(value, reason)) @last_update_secret_ok = wait_on_continuations raise_if_continuation_resulted_in_a_connection_error! @last_update_secret_ok end |
#username ⇒ String
Returns Username used.
260 |
# File 'lib/bunny/session.rb', line 260 def username; self.user; end |
#uses_ssl? ⇒ Boolean Also known as: ssl?
Returns true if this connection uses TLS (SSL).
280 281 282 |
# File 'lib/bunny/session.rb', line 280 def uses_ssl? @transport.uses_ssl? end |
#uses_tls? ⇒ Boolean Also known as: tls?
Returns true if this connection uses TLS (SSL).
274 275 276 |
# File 'lib/bunny/session.rb', line 274 def uses_tls? @transport.uses_tls? end |
#validate_connection_options(options) ⇒ Object
247 248 249 250 251 252 253 254 255 |
# File 'lib/bunny/session.rb', line 247 def validate_connection_options(options) if options[:hosts] && options[:addresses] raise ArgumentError, "Connection options can't contain hosts and addresses at the same time" end if (options[:host] || options[:hostname]) && (options[:hosts] || options[:addresses]) @logger.warn "Connection options contain both a host and an array of hosts (addresses), please pick one." end end |
#virtual_host ⇒ String
Returns Virtual host used.
264 |
# File 'lib/bunny/session.rb', line 264 def virtual_host; self.vhost; end |
#with_channel(n = nil) ⇒ Bunny::Session
Creates a temporary channel, yields it to the block given to this method and closes it.
438 439 440 441 442 443 444 445 446 447 |
# File 'lib/bunny/session.rb', line 438 def with_channel(n = nil) ch = create_channel(n) begin yield ch ensure ch.close if ch.open? end self end |