Class: AMQP::Channel
- Inherits: AMQ::Client::Channel (Object < AMQ::Client::Channel < AMQP::Channel)
- Defined in: lib/amqp/channel.rb
Overview
What are AMQP channels
To quote AMQP 0.9.1 specification:
AMQP is a multi-channelled protocol. Channels provide a way to multiplex a heavyweight TCP/IP connection into several light weight connections. This makes the protocol more “firewall friendly” since port usage is predictable. It also means that traffic shaping and other network QoS features can be easily employed. Channels are independent of each other and can perform different functions simultaneously with other channels, the available bandwidth being shared between the concurrent activities.
Opening a channel
Channels are opened asynchronously. There are two ways to do it: with a callback or in pseudo-synchronous mode.
In the callback style, a block is passed to AMQP::Channel.new and runs once the channel is open and ready for use. Unless your application needs multiple channels, this approach is recommended. Alternatively, AMQP::Channel can be instantiated without a block. The returned channel is then not immediately open; however, it can be used as if opening it were a synchronous, blocking operation.
Even though a channel instantiated without a block isn’t immediately open, it is safe to declare exchanges using it. Exchange declaration will be delayed until after the channel is open. The same applies to queue declaration and other operations on exchanges and queues. Library methods that rely on the channel being open will be enqueued and executed in a FIFO manner when the broker confirms channel opening. Note, however, that this “pseudo-synchronous mode” is easy to abuse and can introduce race conditions that the AMQP gem cannot resolve for you. AMQP is an inherently asynchronous protocol and the AMQP gem embraces this fact.
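The FIFO queueing described above can be pictured with a small, self-contained model. This is only a sketch under stated assumptions, not the gem's implementation: `PendingOperations`, `once_open` as written here, and `channel_opened!` are hypothetical names, and the real library relies on an EventMachine-style deferrable rather than a plain array.

```ruby
# A simplified, hypothetical model of "pseudo-synchronous" mode: operations
# requested before the channel is open are queued and replayed in FIFO order.
class PendingOperations
  def initialize
    @open  = false
    @queue = []
  end

  # Runs the block right away if the channel is open, otherwise queues it.
  def once_open(&block)
    if @open
      block.call
    else
      @queue << block
    end
    self
  end

  # Simulates the broker confirming that the channel is open
  # (channel.open-ok): queued operations run in the order they were added.
  def channel_opened!
    @open = true
    @queue.shift.call until @queue.empty?
    self
  end
end

log = []
ops = PendingOperations.new
ops.once_open { log << "declare exchange" }
ops.once_open { log << "declare queue" }
ops.once_open { log << "bind queue" }
# nothing has run yet: the channel is not open

ops.channel_opened!
ops.once_open { log << "publish" } # channel is open, so this runs immediately
```

The model makes the ordering guarantee visible, and it also hints at where races come from: code that is not routed through the queue (for example, code that reads state it expects a queued operation to have produced) may run before the channel is open.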
Key methods
Key methods of the Channel class are listed in the method summaries below; refer to the documentation for those methods for usage examples.
Channel provides a number of convenience methods that instantiate queues and exchanges of various types associated with this channel: #queue, #queue!, #default_exchange, #direct, #fanout, #headers and #topic.
Error handling
It is possible (and, indeed, recommended) to handle channel-level exceptions by defining an errback using #on_error.
When a channel-level exception is indicated by the broker and the errback defined using #on_error is run, the channel is already closed and all queue and exchange objects associated with this channel are reset. The recommended way to recover from channel-level exceptions is to open a new channel and re-instantiate the queues, exchanges and bindings your application needs.
Closing a channel
A channel is opened when the object is instantiated and is closed using the #close method when the application no longer needs it.
RabbitMQ extensions
The AMQP gem supports several RabbitMQ extensions that extend Channel functionality. Learn more in VendorSpecificExtensions.
Instance Attribute Summary
- (Boolean) auto_recovery
  True if this channel is in automatic recovery mode.
- (Connection) connection (also: #conn) readonly
  AMQP connection this channel is part of.
- (Symbol) status readonly
  Status of this channel (one of: :opening, :closing, :open, :closed).
Declaring exchanges
- (Exchange) default_exchange
  Returns exchange object with the same name as default (aka unnamed) exchange.
- (Exchange) direct(name = 'amq.direct', opts = {}, &block)
  Defines, initializes and returns a direct Exchange instance.
- (Exchange) fanout(name = 'amq.fanout', opts = {}, &block)
  Defines, initializes and returns a fanout Exchange instance.
- (Exchange) headers(name = 'amq.match', opts = {}, &block)
  Defines, initializes and returns a headers Exchange instance.
- (Exchange) topic(name = 'amq.topic', opts = {}, &block)
  Defines, initializes and returns a topic Exchange instance.
Declaring queues
- (Queue) queue(name = AMQ::Protocol::EMPTY_STRING, opts = {}) {|queue, declare_ok| ... }
  Declares and returns a Queue instance associated with this channel.
- (Queue) queue!(name, opts = {}, &block)
  Same as #queue, but when a queue with the same name already exists in this channel object’s cache, this method replaces the existing queue with a newly defined one.
Channel lifecycle
- (Object) close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block)
  Closes AMQP channel.
- (Boolean) closing?
- (Object) once_open(&block) (also: #once_opened)
  Takes a block that will be deferred till the moment when channel is considered open (channel.open-ok is received from the broker).
- (Object) open(&block)
  Opens AMQP channel.
- (Boolean) open?
  True if channel is not closed.
QoS and flow handling
- (Object) flow(active = false, &block)
  Asks the peer to pause or restart the flow of content data sent to a consumer.
- (Boolean) flow_is_active?
  True if flow in this channel is active (messages will be delivered to consumers that use this channel).
- (Channel) prefetch(count, global = false, &block)
  Requests the given prefetch count via qos once the channel is open. Returns self.
Message acknowledgements
- (Object) acknowledge(delivery_tag, multiple = false)
  Acknowledge one or all messages on the channel.
- (Channel) recover(requeue = true, &block)
  Notifies AMQ broker that consumer has recovered and unacknowledged messages need to be redelivered.
- (Object) reject(delivery_tag, requeue = true)
  Reject a message with given delivery tag.
Transactions
- (Object) tx_commit(&block)
  Commits AMQP transaction.
- (Object) tx_rollback(&block)
  Rolls AMQP transaction back.
- (Object) tx_select(&block)
  Sets the channel to use standard transactions.
Error handling
- (Object) on_error(&block)
  Defines a callback that will be executed when channel is closed after channel-level exception.
Class Method Summary
+ (Object) method_missing(meth, *args, &blk) (deprecated)
+ (Fixnum) next_channel_id
  Returns next available channel id.
+ (Object) release_channel_id(i)
  Releases previously allocated channel id.
+ (Object) reset_channel_id_allocator
  Resets channel allocator.
Instance Method Summary
- (Object) auto_recover
  Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).
- (Boolean) auto_recovering?
  True if this channel uses automatic recovery mode.
- (Channel) initialize(connection = nil, id = self.class.next_channel_id, options = {}) {|channel, open_ok| ... } constructor
  A new instance of Channel.
- (Object) reuse
  Can be used to recover channels from channel-level exceptions.
- (RPC) rpc(name, obj = nil)
  Instantiates and returns an RPC instance associated with this channel.
- (Object) rpcs
  Returns an array of all RPC proxy objects.
- (Object) validate_parameters_match!(entity, parameters, type) protected
Constructor Details
- (Channel) initialize(connection = nil, id = self.class.next_channel_id, options = {}) {|channel, open_ok| ... }
A new instance of Channel
    # File 'lib/amqp/channel.rb', line 204

    def initialize(connection = nil, id = self.class.next_channel_id, options = {}, &block)
      raise 'AMQP can only be used from within EM.run {}' unless EM.reactor_running?

      @connection = connection || AMQP.connection || AMQP.start
      # this means 2nd argument is options
      if id.kind_of?(Hash)
        options = options.merge(id)
        id      = self.class.next_channel_id
      end

      super(@connection, id, options)

      @rpcs = Hash.new
      # we need this deferrable to mimic what AMQP gem 0.7 does to enable
      # the following (pseudo-synchronous) style of programming some people use in their
      # existing codebases:
      #
      # connection = AMQP.connect
      # channel    = AMQP::Channel.new(connection)
      # queue      = AMQP::Queue.new(channel)
      #
      # ...
      #
      # Read more about EM::Deferrable#callback behavior in EventMachine documentation. MK.
      @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new

      @parameter_checks = {:queue => [:durable, :exclusive, :auto_delete, :arguments], :exchange => [:type, :durable, :arguments]}

      # only send channel.open when connection is actually open. Makes it possible to
      # do c = AMQP.connect; AMQP::Channel.new(c) that is what some people do. MK.
      @connection.on_connection do
        self.open do |ch, open_ok|
          @channel_is_open_deferrable.succeed

          if block
            case block.arity
            when 1 then block.call(ch)
            else block.call(ch, open_ok)
            end # case
          end # if

          self.prefetch(options[:prefetch], false) if options[:prefetch]
        end # self.open
      end # @connection.on_open
    end
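Two small conventions in the constructor are easy to miss: a Hash passed in the id position is treated as options, and the block is invoked according to its arity. The sketch below isolates both in plain Ruby. The helper names `normalize_channel_args` and `invoke_open_callback`, and the `next_id` lambda standing in for the class-level id allocator, are assumptions made for illustration and are not part of the library.

```ruby
# Hypothetical helper mirroring the "2nd argument is options" branch:
# when a Hash is passed where the id is expected, it is merged into the
# options and a fresh id is requested from the supplied allocator.
def normalize_channel_args(id, options, next_id)
  if id.kind_of?(Hash)
    options = options.merge(id)
    id      = next_id.call
  end
  [id, options]
end

# Hypothetical helper mirroring the arity check: blocks that declare
# exactly one parameter receive only the channel; all others are called
# with both the channel and the open_ok argument.
def invoke_open_callback(block, channel, open_ok)
  case block.arity
  when 1 then block.call(channel)
  else        block.call(channel, open_ok)
  end
end

next_id = -> { 42 } # stand-in for the class-level id allocator

normalize_channel_args(7, {}, next_id)
# id stays 7 and the options stay empty
normalize_channel_args({ :prefetch => 1 }, {}, next_id)
# id comes from the allocator and the options now include :prefetch

one_arg  = proc { |ch| [ch] }
two_args = proc { |ch, open_ok| [ch, open_ok] }
invoke_open_callback(one_arg,  :channel, :open_ok)
invoke_open_callback(two_args, :channel, :open_ok)
```

Dispatching on arity lets callers that only care about the channel write a one-parameter block, while callers that want the broker's reply can ask for both values.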
Instance Attribute Details
- (Boolean) auto_recovery
True if this channel is in automatic recovery mode
    # File 'lib/amqp/channel.rb', line 252

    def auto_recovery
      @auto_recovery
    end
- (Connection) connection (readonly) Also known as: conn
AMQP connection this channel is part of
    # File 'lib/amqp/channel.rb', line 152

    def connection
      @connection
    end
- (Symbol) status (readonly)
Status of this channel (one of: :opening, :closing, :open, :closed)
    # File 'lib/amqp/channel.rb', line 157

    def status
      @status
    end
Class Method Details
+ (Object) method_missing(meth, *args, &blk)
Allows for calls to all MQ instance methods. This implicitly calls AMQP::Channel.new so that a new channel is allocated for subsequent operations.
    # File 'lib/amqp/channel.rb', line 1252

    def self.method_missing(meth, *args, &blk)
      self.default.__send__(meth, *args, &blk)
    end
+ (Fixnum) next_channel_id
Returns next available channel id. This method is thread safe.
    # File 'lib/amqp/channel.rb', line 1173

    def self.next_channel_id
      channel_id_mutex.synchronize do
        self.initialize_channel_id_allocator

        @int_allocator.allocate
      end
    end
+ (Object) release_channel_id(i)
Releases previously allocated channel id. This method is thread safe.
    # File 'lib/amqp/channel.rb', line 1187

    def self.release_channel_id(i)
      channel_id_mutex.synchronize do
        self.initialize_channel_id_allocator

        @int_allocator.release(i)
      end
    end
+ (Object) reset_channel_id_allocator
Resets channel allocator. This method is thread safe.
    # File 'lib/amqp/channel.rb', line 1199

    def self.reset_channel_id_allocator
      channel_id_mutex.synchronize do
        initialize_channel_id_allocator

        @int_allocator.reset
      end
    end
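The three class methods above share one pattern: every access to the id allocator is wrapped in a mutex. A minimal sketch of that pattern follows. `ChannelIdAllocator`, its lowest-free-id policy and its 1..65535 range are assumptions made for illustration; the allocator the library actually uses is not shown on this page.

```ruby
# Hypothetical stand-in for the integer allocator behind next_channel_id.
# A single Mutex serializes allocate/release/reset, which is the same idea
# as wrapping each call in channel_id_mutex.synchronize.
class ChannelIdAllocator
  def initialize(lo = 1, hi = 65_535)
    @range  = (lo..hi)
    @in_use = {}
    @mutex  = Mutex.new
  end

  # Returns the lowest id that is not currently in use.
  def allocate
    @mutex.synchronize do
      id = @range.find { |i| !@in_use.key?(i) }
      raise "no free channel ids" if id.nil?
      @in_use[id] = true
      id
    end
  end

  # Marks an id as free again; returns true if it was in use.
  def release(id)
    @mutex.synchronize { !@in_use.delete(id).nil? }
  end

  # Forgets every allocation.
  def reset
    @mutex.synchronize { @in_use.clear }
    nil
  end
end

allocator = ChannelIdAllocator.new
threads   = Array.new(4) { Thread.new { Array.new(25) { allocator.allocate } } }
ids       = threads.flat_map(&:value) # Thread#value waits for the thread
# ids now holds 100 distinct channel ids: no id was handed out twice
```

With a lowest-free-id policy like this one, releasing an id and then allocating hands the same id straight back. That is consistent with the comment in #reuse, which allocates the new id first and releases the old one afterwards.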
Instance Method Details
- (Object) acknowledge(delivery_tag, multiple = false)
Acknowledge one or all messages on the channel.
    # File 'lib/amqp/channel.rb', line 998

    def acknowledge(delivery_tag, multiple = false)
      super(delivery_tag, multiple)
    end
- (Object) auto_recover
Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).
    # File 'lib/amqp/channel.rb', line 263

    def auto_recover
      return unless auto_recovering?

      @channel_is_open_deferrable.fail
      @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new

      self.open do
        @channel_is_open_deferrable.succeed

        # re-establish prefetch
        self.prefetch(@options[:prefetch], false) if @options[:prefetch]

        # exchanges must be recovered first because queue recovery includes recovery of bindings. MK.
        @exchanges.each { |name, e| e.auto_recover }
        @queues.each { |name, q| q.auto_recover }
      end
    end
- (Boolean) auto_recovering?
True if this channel uses automatic recovery mode
    # File 'lib/amqp/channel.rb', line 255

    def auto_recovering?
      @auto_recovery
    end
- (Object) close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block)
Closes AMQP channel.
    # File 'lib/amqp/channel.rb', line 934

    def close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block)
      self.status = :closing
      r = super(reply_code, reply_text, class_id, method_id, &block)

      r
    end
- (Boolean) closing?
    # File 'lib/amqp/channel.rb', line 927

    def closing?
      self.status == :closing
    end
- (Exchange) default_exchange
Returns exchange object with the same name as default (aka unnamed) exchange. Default exchange is a direct exchange and automatically routes messages to queues when routing key matches queue name exactly. This feature is known as “automatic binding” (of queues to default exchange).
Use default exchange when you want to route messages directly to specific queues (queue names are known, you don’t mind this kind of coupling between applications).
    # File 'lib/amqp/channel.rb', line 438

    def default_exchange
      @default_exchange ||= Exchange.default(self)
    end
- (Exchange) direct(name = 'amq.direct', opts = {}, &block)
Defines, initializes and returns a direct Exchange instance.
Learn more about direct exchanges in Exchange class documentation.
    # File 'lib/amqp/channel.rb', line 385

    def direct(name = 'amq.direct', opts = {}, &block)
      if exchange = find_exchange(name)
        extended_opts = Exchange.(:direct, name, opts, block)

        validate_parameters_match!(exchange, extended_opts, :exchange)

        block.call(exchange) if block
        exchange
      else
        register_exchange(Exchange.new(self, :direct, name, opts, &block))
      end
    end
- (Exchange) fanout(name = 'amq.fanout', opts = {}, &block)
Defines, initializes and returns a fanout Exchange instance.
Learn more about fanout exchanges in Exchange class documentation.
    # File 'lib/amqp/channel.rb', line 488

    def fanout(name = 'amq.fanout', opts = {}, &block)
      if exchange = find_exchange(name)
        extended_opts = Exchange.(:fanout, name, opts, block)

        validate_parameters_match!(exchange, extended_opts, :exchange)

        block.call(exchange) if block
        exchange
      else
        register_exchange(Exchange.new(self, :fanout, name, opts, &block))
      end
    end
- (Object) flow(active = false, &block)
Asks the peer to pause or restart the flow of content data sent to a consumer. This is a simple flow-control mechanism that a peer can use to avoid overflowing its queues or otherwise finding itself receiving more messages than it can process. Note that this method is not intended for window control. It does not affect contents returned to Queue#get callers.
    # File 'lib/amqp/channel.rb', line 958

    def flow(active = false, &block)
      super(active, &block)
    end
- (Boolean) flow_is_active?
True if flow in this channel is active (messages will be delivered to consumers that use this channel).
    # File 'lib/amqp/channel.rb', line 965

    def flow_is_active?
      @flow_is_active
    end
- (Exchange) headers(name = 'amq.match', opts = {}, &block)
Defines, initializes and returns a headers Exchange instance.
Learn more about headers exchanges in Exchange class documentation.
    # File 'lib/amqp/channel.rb', line 701

    def headers(name = 'amq.match', opts = {}, &block)
      if exchange = find_exchange(name)
        extended_opts = Exchange.(:headers, name, opts, block)

        validate_parameters_match!(exchange, extended_opts, :exchange)

        block.call(exchange) if block
        exchange
      else
        register_exchange(Exchange.new(self, :headers, name, opts, &block))
      end
    end
- (Object) on_error(&block)
Defines a callback that will be executed when channel is closed after channel-level exception.
    # File 'lib/amqp/channel.rb', line 1067

    def on_error(&block)
      super(&block)
    end
- (Object) once_open(&block) Also known as: once_opened
Takes a block that will be deferred till the moment when channel is considered open (channel.open-ok is received from the broker). If you need to delay an operation till the moment channel is open, this method is what you are looking for.
Multiple callbacks are supported. If the channel is already open when this method is called, the block is executed immediately.
    # File 'lib/amqp/channel.rb', line 916

    def once_open(&block)
      @channel_is_open_deferrable.callback do
        # guards against cases when deferred operations
        # don't complete before the channel is closed
        block.call if open?
      end
    end
- (Object) open(&block)
Opens AMQP channel.
Note: instantiated channels are opened by default. This method should only be used for error recovery after network connection loss.
    # File 'lib/amqp/channel.rb', line 898

    def open(&block)
      super(&block)
    end
- (Boolean) open?
True if channel is not closed.
    # File 'lib/amqp/channel.rb', line 904

    def open?
      self.status == :opened || self.status == :opening
    end
- (Channel) prefetch(count, global = false, &block)
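Requests the given prefetch count via qos once the channel is open. Returns self, so calls can be chained.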
    # File 'lib/amqp/channel.rb', line 977

    def prefetch(count, global = false, &block)
      self.once_open do
        # RabbitMQ as of 2.3.1 does not support prefetch_size.
        self.qos(0, count, global, &block)
      end

      self
    end
- (Queue) queue(name = AMQ::Protocol::EMPTY_STRING, opts = {}) {|queue, declare_ok| ... }
Declares and returns a Queue instance associated with this channel. See Queue class documentation for more information about queues.
To make the broker generate a queue name for you (a classic example is exclusive queues that are only used for a short period of time), pass an empty string as the name value. The queue will then get its name as soon as the broker’s response (queue.declare-ok) arrives. Note that in this case, a block is required.
As with exchanges, queue names starting with ‘amq.’ cannot be modified and should not be used by applications.
    # File 'lib/amqp/channel.rb', line 796

    def queue(name = AMQ::Protocol::EMPTY_STRING, opts = {}, &block)
      raise ArgumentError.new("queue name must not be nil; if you want broker to generate queue name for you, pass an empty string") if name.nil?

      if name && !name.empty? && (queue = find_queue(name))
        extended_opts = Queue.(name, opts, block)

        validate_parameters_match!(queue, extended_opts, :queue)

        block.call(queue) if block
        queue
      else
        self.queue!(name, opts, &block)
      end
    end
- (Queue) queue!(name, opts = {}, &block)
    # File 'lib/amqp/channel.rb', line 819

    def queue!(name, opts = {}, &block)
      queue = if block.nil?
                Queue.new(self, name, opts)
              else
                shim = Proc.new { |q, method|
                  if block.arity == 1
                    block.call(q)
                  else
                    queue = find_queue(method.queue)
                    block.call(queue, method.consumer_count, method.)
                  end
                }
                Queue.new(self, name, opts, &shim)
              end

      register_queue(queue)
    end
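The difference between #queue and #queue! comes down to how the channel's queue cache is used. The sketch below models only that cache behavior. `QueueRegistry` and `FakeQueue` are hypothetical names; nothing here talks to a broker, and the option validation that #queue performs on a cache hit is left out.

```ruby
# Hypothetical registry illustrating cache semantics only; the Struct
# stands in for a real queue object.
FakeQueue = Struct.new(:name, :opts)

class QueueRegistry
  def initialize
    @queues = {}
  end

  # Like #queue: a non-empty name that is already cached returns the cached
  # object; anything else falls through to the replacing variant.
  def queue(name = "", opts = {})
    raise ArgumentError, "queue name must not be nil" if name.nil?
    cached = name.empty? ? nil : @queues[name]
    cached || queue!(name, opts)
  end

  # Like #queue!: always builds a new object and overwrites the cache entry.
  def queue!(name, opts = {})
    @queues[name] = FakeQueue.new(name, opts)
  end
end

registry = QueueRegistry.new
first    = registry.queue("jobs", :durable => true)
again    = registry.queue("jobs", :durable => true)  # the same object as first
replaced = registry.queue!("jobs", :durable => true) # a new object; cache updated
```

Code that holds a reference to the object returned by the first call keeps that object after the replacement, so the two references no longer point at the same queue instance.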
- (Channel) recover(requeue = true, &block)
Notifies AMQ broker that consumer has recovered and unacknowledged messages need to be redelivered.
Note: RabbitMQ as of 2.3.1 does not support basic.recover with requeue = false.
    # File 'lib/amqp/channel.rb', line 1021

    def recover(requeue = true, &block)
      super(requeue, &block)
    end
- (Object) reject(delivery_tag, requeue = true)
Reject a message with given delivery tag.
    # File 'lib/amqp/channel.rb', line 1008

    def reject(delivery_tag, requeue = true)
      super(delivery_tag, requeue)
    end
- (Object) reuse
Can be used to recover channels from channel-level exceptions. Allocates a new channel id and reopens itself with this new id, releasing the old id after the new one is allocated.
This includes recovery of known exchanges, queues and bindings, exactly the same way as when the client recovers from a network failure.
    # File 'lib/amqp/channel.rb', line 288

    def reuse
      old_id = @id
      # must release after we allocate a new id, otherwise we will end up
      # with the same value. MK.
      @id = self.class.next_channel_id
      self.class.release_channel_id(old_id)

      @channel_is_open_deferrable.fail
      @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new

      self.open do
        @channel_is_open_deferrable.succeed

        # re-establish prefetch
        self.prefetch(@options[:prefetch], false) if @options[:prefetch]

        # exchanges must be recovered first because queue recovery includes recovery of bindings. MK.
        @exchanges.each { |name, e| e.auto_recover }
        @queues.each { |name, q| q.auto_recover }
      end
    end
- (RPC) rpc(name, obj = nil)
Instantiates and returns an RPC instance associated with this channel.
The optional object may be a class name, module name or object instance. When given a class or module name, the object is instantiated during this setup. The passed queue is automatically subscribed to so it passes all messages (and their arguments) to the object.
Marshalling and unmarshalling the objects is handled internally. This marshalling is subject to the same restrictions as defined in the Marshal module (http://ruby-doc.org/core/classes/Marshal.html) in the Ruby standard library.
When the optional object is not passed, the returned rpc reference is used to send messages and arguments to the queue. See RPC#method_missing which does all of the heavy lifting with the proxy. Some client elsewhere must call this method with the optional block so that there is a valid destination. Failure to do so will just enqueue marshalled messages that are never consumed.
    # File 'lib/amqp/channel.rb', line 875

    def rpc(name, obj = nil)
      RPC.new(self, name, obj)
    end
- (Object) rpcs
Returns an array of all RPC proxy objects.
Most of the time, this method is not called by application code.
    # File 'lib/amqp/channel.rb', line 886

    def rpcs
      @rpcs.values
    end
- (Exchange) topic(name = 'amq.topic', opts = {}, &block)
Defines, initializes and returns a topic Exchange instance.
Learn more about topic exchanges in Exchange class documentation.
    # File 'lib/amqp/channel.rb', line 599

    def topic(name = 'amq.topic', opts = {}, &block)
      if exchange = find_exchange(name)
        extended_opts = Exchange.(:topic, name, opts, block)

        validate_parameters_match!(exchange, extended_opts, :exchange)

        block.call(exchange) if block
        exchange
      else
        register_exchange(Exchange.new(self, :topic, name, opts, &block))
      end
    end
- (Object) tx_commit(&block)
Commits AMQP transaction.
    # File 'lib/amqp/channel.rb', line 1043

    def tx_commit(&block)
      super(&block)
    end
- (Object) tx_rollback(&block)
Rolls AMQP transaction back.
    # File 'lib/amqp/channel.rb', line 1050

    def tx_rollback(&block)
      super(&block)
    end
- (Object) tx_select(&block)
Sets the channel to use standard transactions. One must use this method at least once on a channel before using the #tx_commit or #tx_rollback methods.
    # File 'lib/amqp/channel.rb', line 1036

    def tx_select(&block)
      super(&block)
    end
- (Object) validate_parameters_match!(entity, parameters, type) (protected)
    # File 'lib/amqp/channel.rb', line 1260

    def validate_parameters_match!(entity, parameters, type)
      unless entity.opts.values_at(*@parameter_checks[type]) == parameters.values_at(*@parameter_checks[type]) || parameters[:passive]
        raise AMQP::IncompatibleOptionsError.new(entity.name, entity.opts, parameters)
      end
    end
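The comparison performed by this check can be tried out in isolation. The sketch below restates it as a predicate in plain Ruby. `parameters_match?` and the `PARAMETER_CHECKS` constant are hypothetical names; the key lists are copied from `@parameter_checks` in the constructor, and the sample option hashes are made up for illustration.

```ruby
# The keys compared for each entity type, copied from @parameter_checks
# in the constructor.
PARAMETER_CHECKS = {
  :queue    => [:durable, :exclusive, :auto_delete, :arguments],
  :exchange => [:type, :durable, :arguments]
}.freeze

# Hypothetical predicate form of the check: true when the cached entity's
# options agree with the requested ones on every checked key, or when the
# request is passive.
def parameters_match?(existing_opts, requested, type)
  keys = PARAMETER_CHECKS.fetch(type)
  existing_opts.values_at(*keys) == requested.values_at(*keys) ||
    !!requested[:passive]
end

existing = { :durable => true, :exclusive => false, :auto_delete => false, :arguments => nil }

parameters_match?(existing, existing.merge(:nowait => true), :queue)
# keys outside the checked list, such as :nowait here, are ignored
parameters_match?(existing, existing.merge(:durable => false), :queue)
# a differing :durable counts as a mismatch
parameters_match?(existing, { :passive => true }, :queue)
# passive declarations skip the comparison
```

Only the listed keys take part in the comparison, so two declarations may differ in other options without being reported as incompatible.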