Class: AMQP::Channel

Inherits:
Object
Extended by:
ProtocolMethodHandlers, RegisterEntityMixin
Includes:
Entity
Defined in:
lib/amqp/channel.rb

Overview

What are AMQP channels

To quote AMQP 0.9.1 specification:

AMQP is a multi-channelled protocol. Channels provide a way to multiplex a heavyweight TCP/IP connection into several light weight connections. This makes the protocol more “firewall friendly” since port usage is predictable. It also means that traffic shaping and other network QoS features can be easily employed. Channels are independent of each other and can perform different functions simultaneously with other channels, the available bandwidth being shared between the concurrent activities.

Opening a channel

Channels are opened asynchronously. There are two ways to do it: using a callback or pseudo-synchronous mode.

The callback approach is recommended unless your application needs multiple channels. Alternatively, AMQP::Channel can be instantiated without a block. The returned channel is not immediately open, but it can be used as if it had been returned by a synchronous, blocking method (see the examples below).

Even though the channel in the second example below isn’t immediately open, it is safe to declare exchanges using it. The exchange declaration will be delayed until after the channel is open. The same applies to queue declaration and other operations on exchanges and queues. Library methods that rely on the channel being open are enqueued and executed in FIFO order when the broker confirms channel opening. Note, however, that this “pseudo-synchronous mode” is easy to abuse and can introduce race conditions that the AMQP gem cannot resolve for you. AMQP is an inherently asynchronous protocol and the AMQP gem embraces this fact.
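The queuing behaviour described above can be illustrated without a broker. The following is a minimal sketch in plain Ruby, not the gem's implementation: `PendingOperations` is a hypothetical class that stores blocks until `succeed` is called, runs them in FIFO order, and runs any later block immediately.

```ruby
# Hypothetical stand-in for a deferrable; it illustrates the idea only.
class PendingOperations
  def initialize
    @succeeded = false
    @pending   = []
  end

  # Run the block now if already succeeded, otherwise enqueue it.
  def callback(&block)
    if @succeeded
      block.call
    else
      @pending << block
    end
    self
  end

  # Mark as succeeded and flush queued blocks in the order they were added.
  def succeed
    @succeeded = true
    @pending.shift.call until @pending.empty?
    self
  end
end

log = []
ops = PendingOperations.new
ops.callback { log << "declare exchange" }   # queued, channel not open yet
ops.callback { log << "declare queue" }      # queued behind the first one
log << "channel.open-ok received"
ops.succeed                                  # flushes both, in order
ops.callback { log << "bind queue" }         # runs immediately

puts log
```

The same ordering guarantee is also what makes the mode easy to misuse: operations are only ordered relative to the moment the channel opens, not relative to broker responses for each other.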

Key methods

Key methods of Channel class are

refer to documentation for those methods for usage examples.

Channel provides a number of convenience methods that instantiate queues and exchanges of various types associated with this channel:

Error handling

It is possible (and, indeed, recommended) to handle channel-level exceptions by defining an errback using #on_error.

When a channel-level exception is indicated by the broker and the errback defined using #on_error is run, the channel is already closed and all queue and exchange objects associated with it are reset. The recommended way to recover from channel-level exceptions is to open a new channel and re-instantiate the queues, exchanges and bindings your application needs.

Closing a channel

A channel is opened when the object is instantiated and closed using the #close method when the application no longer needs it.

RabbitMQ extensions

The AMQP gem supports several RabbitMQ extensions that extend Channel functionality. Learn more in VendorSpecificExtensions.

Examples:

Opening a channel with a callback

# this assumes EventMachine reactor is running
AMQP.connect("amqp://guest:guest@dev.rabbitmq.com:5672") do |client|
  AMQP::Channel.new(client) do |channel, open_ok|
    # when this block is executed, channel is open and ready for use
  end
end

Instantiating a channel that will be open eventually

# this assumes EventMachine reactor is running
AMQP.connect("amqp://guest:guest@dev.rabbitmq.com:5672") do |client|
  channel  = AMQP::Channel.new(client)
  exchange = channel.default_exchange

  # ...
end

Queue declaration with incompatible attributes results in a channel-level exception

AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672") do |connection, open_ok|
  AMQP::Channel.new do |channel, open_ok|
    puts "Channel ##{channel.id} is now open!"

    channel.on_error do |ch, close|
      puts "Handling channel-level exception"

      connection.close {
        EM.stop { exit }
      }
    end

    EventMachine.add_timer(0.4) do
      # these two definitions result in a race condition. For sake of this example,
      # however, it does not matter. Whatever definition succeeds first, 2nd one will
      # cause a channel-level exception (because attributes are not identical)
      AMQP::Queue.new(channel, "amqpgem.examples.channel_exception", :auto_delete => true, :durable => false) do |queue|
        puts "#{queue.name} is ready to go"
      end

      AMQP::Queue.new(channel, "amqpgem.examples.channel_exception", :auto_delete => true, :durable => true) do |queue|
        puts "#{queue.name} is ready to go"
      end
    end
  end
end

Closing a channel your application no longer needs

# this assumes EventMachine reactor is running
AMQP.connect("amqp://guest:guest@dev.rabbitmq.com:5672") do |client|
  AMQP::Channel.new(client) do |channel, open_ok|
    channel.close do |close_ok|
      # when this block is executed, channel is successfully closed
    end
  end
end

Constant Summary

DEFAULT_REPLY_TEXT =
"Goodbye".freeze
RECOVERY_EVENTS =
[:after_connection_interruption, :before_recovery, :after_recovery].freeze

Constants included from Openable

Openable::VALUES

Instance Attribute Summary

Attributes included from Entity

#callbacks

Declaring exchanges

Declaring queues

Channel lifecycle

QoS and flow handling

Message acknowledgements

Transactions

Error handling

Publisher Confirms

Instance Method Summary

Methods included from ProtocolMethodHandlers

handle, handlers

Methods included from RegisterEntityMixin

register_entity

Methods included from Callbacks

#clear_callbacks, #define_callback, #exec_callback, #exec_callback_once, #exec_callback_once_yielding_self, #exec_callback_yielding_self, #has_callback?, #prepend_callback, #redefine_callback

Methods included from Openable

#closed!, #closed?, #closing!, #opened!, #opened?, #opening!, #opening?

Constructor Details

- (Channel) initialize(connection = nil, id = nil, options = {}) {|channel, open_ok| ... }

Returns a new instance of Channel

Examples:

Instantiating a channel for default connection (accessible as AMQP.connection)


AMQP.start do |connection|
  # no connection argument: the channel falls back to AMQP.connection
  AMQP::Channel.new do |channel, open_ok|
    # channel is ready: set up your messaging flow by creating exchanges,
    # queues, binding them together and so on.
  end
end

Instantiating a channel for explicitly given connection


AMQP.connect do |connection|
  AMQP::Channel.new(connection) do |channel, open_ok|
    # ...
  end
end

Instantiating a channel with a :prefetch option


AMQP.connect do |connection|
  AMQP::Channel.new(connection, :prefetch => 5) do |channel, open_ok|
    # ...
  end
end

Options Hash (options):

  • :prefetch (Integer) — default: nil

    Specifies number of messages to prefetch. Channel-specific. See #prefetch.

  • :auto_recovery (Boolean) — default: the connection's auto-recovery setting

    Turns on automatic network failure recovery mode for this channel.
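As the constructor source further down shows, explicit options are merged over a default taken from the connection, and the result is coerced to a boolean. A minimal sketch of that merge in plain Ruby follows; `effective_auto_recovery` and its arguments are hypothetical names used only for illustration.

```ruby
# Hypothetical helper mirroring `{ :auto_recovery => default }.merge(options)`.
def effective_auto_recovery(connection_default, options = {})
  merged = { :auto_recovery => connection_default }.merge(options)
  !!merged[:auto_recovery] # coerce nil, false and truthy values to true or false
end

p effective_auto_recovery(true)                           # connection default is used
p effective_auto_recovery(true, :auto_recovery => false)  # explicit option wins
p effective_auto_recovery(false, :prefetch => 5)          # unrelated options do not affect it
```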

Yields:

  • (channel, open_ok)

    Yields open channel instance and AMQP method (channel.open-ok) instance. The latter is optional.

Yield Parameters:

  • channel (Channel)

    Channel that is successfully open

  • open_ok (AMQP::Protocol::Channel::OpenOk)

    AMQP method (channel.open-ok) instance
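The second block parameter is optional because the constructor inspects the block's arity before calling it (see the constructor source below). A minimal sketch of that dispatch in plain Ruby follows; `yield_open` is a hypothetical helper, and the string arguments stand in for the channel and the channel.open-ok method.

```ruby
# Hypothetical helper: call the block with one or two arguments
# depending on how many parameters it declares.
def yield_open(channel, open_ok, &block)
  return unless block

  case block.arity
  when 1 then block.call(channel)
  else        block.call(channel, open_ok)
  end
end

yield_open("channel", "open-ok") { |ch| puts "one parameter: #{ch}" }
yield_open("channel", "open-ok") { |ch, ok| puts "two parameters: #{ch}, #{ok}" }
```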

# File 'lib/amqp/channel.rb', line 233

def initialize(connection = nil, id = nil, options = {}, &block)
  raise 'AMQP can only be used from within EM.run {}' unless EM.reactor_running?

  @connection = connection || AMQP.connection || AMQP.start
  # this means 2nd argument is options
  if id.kind_of?(Hash)
    options = options.merge(id)
    id      = @connection.next_channel_id
  end

  super(@connection)

  @id        = id || @connection.next_channel_id
  @exchanges = Hash.new
  @queues    = Hash.new
  @consumers = Hash.new
  @options       = { :auto_recovery => @connection.auto_recovering? }.merge(options)
  @auto_recovery = (!!@options[:auto_recovery])

  # we must synchronize frameset delivery. MK.
  @mutex     = Mutex.new

  reset_state!

  # 65536 is here for cases when channel is opened without passing a callback in,
  # otherwise channel_max would be nil and it causes a lot of needless headaches.
  # let's just have this default. MK.
  channel_max = if @connection.open?
                  @connection.channel_max || 65536
                else
                  65536
                end

  if channel_max != 0 && !(0..channel_max).include?(@id)
    raise ArgumentError.new("Max channel for the connection is #{channel_max}, given: #{@id}")
  end

  # we need this deferrable to mimic what AMQP gem 0.7 does to enable
  # the following (pseudo-synchronous) style of programming some people use in their
  # existing codebases:
  #
  # connection = AMQP.connect
  # channel    = AMQP::Channel.new(connection)
  # queue      = AMQP::Queue.new(channel)
  #
  # ...
  #
  # Read more about EM::Deferrable#callback behavior in EventMachine documentation. MK.
  @channel_is_open_deferrable = AMQP::Deferrable.new

  @parameter_checks = {:queue => [:durable, :exclusive, :auto_delete, :arguments], :exchange => [:type, :durable, :arguments]}

  # only send channel.open when connection is actually open. Makes it possible to
  # do c = AMQP.connect; AMQP::Channel.new(c) that is what some people do. MK.
  @connection.on_connection do
    self.open do |ch, open_ok|
      @channel_is_open_deferrable.succeed

      if block
        case block.arity
        when 1 then block.call(ch)
        else block.call(ch, open_ok)
        end # case
      end # if

      self.prefetch(@options[:prefetch], false) if @options[:prefetch]
    end # self.open
  end # @connection.on_open
end
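Two details of the constructor are easy to miss: a Hash passed in the `id` position is treated as options, and the channel id is validated against the connection's channel limit. The sketch below reproduces only that logic in plain Ruby. `normalize_channel_args` is a hypothetical helper, and its `next_id` argument stands in for `@connection.next_channel_id`.

```ruby
DEFAULT_CHANNEL_MAX = 65536

# Hypothetical helper returning [id, options] after the same checks the constructor makes.
def normalize_channel_args(id, options, next_id, channel_max = DEFAULT_CHANNEL_MAX)
  if id.kind_of?(Hash) # the second constructor argument was really the options hash
    options = options.merge(id)
    id      = next_id
  end
  id ||= next_id

  # A channel_max of 0 means "no limit"; otherwise the id must fall within 0..channel_max.
  if channel_max != 0 && !(0..channel_max).include?(id)
    raise ArgumentError, "Max channel for the connection is #{channel_max}, given: #{id}"
  end

  [id, options]
end

p normalize_channel_args({ :prefetch => 5 }, {}, 1) # options passed in the id position
p normalize_channel_args(nil, {}, 2)                # no id given, the next free id is used
```

This is why `AMQP::Channel.new(connection, :prefetch => 5)` works even though the second positional parameter is `id`.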

Instance Attribute Details

- (Boolean) auto_recovery

Returns true if this channel is in automatic recovery mode

# File 'lib/amqp/channel.rb', line 305

def auto_recovery
  @auto_recovery
end

- (AMQP::Connection) connection (readonly)

AMQP connection this channel belongs to.



# File 'lib/amqp/channel.rb', line 164

def connection
  @connection
end

- (Object) consumers_awaiting_cancel_ok (readonly)

Returns the value of attribute consumers_awaiting_cancel_ok



# File 'lib/amqp/channel.rb', line 177

def consumers_awaiting_cancel_ok
  @consumers_awaiting_cancel_ok
end

- (Object) consumers_awaiting_consume_ok (readonly)

Returns the value of attribute consumers_awaiting_consume_ok



# File 'lib/amqp/channel.rb', line 177

def consumers_awaiting_consume_ok
  @consumers_awaiting_consume_ok
end

- (Object) exchanges_awaiting_bind_ok (readonly)

Returns the value of attribute exchanges_awaiting_bind_ok



# File 'lib/amqp/channel.rb', line 175

def exchanges_awaiting_bind_ok
  @exchanges_awaiting_bind_ok
end

- (Object) exchanges_awaiting_declare_ok (readonly)

Returns the value of attribute exchanges_awaiting_declare_ok



# File 'lib/amqp/channel.rb', line 175

def exchanges_awaiting_declare_ok
  @exchanges_awaiting_declare_ok
end

- (Object) exchanges_awaiting_delete_ok (readonly)

Returns the value of attribute exchanges_awaiting_delete_ok



# File 'lib/amqp/channel.rb', line 175

def exchanges_awaiting_delete_ok
  @exchanges_awaiting_delete_ok
end

- (Object) exchanges_awaiting_unbind_ok (readonly)

Returns the value of attribute exchanges_awaiting_unbind_ok



# File 'lib/amqp/channel.rb', line 175

def exchanges_awaiting_unbind_ok
  @exchanges_awaiting_unbind_ok
end

- (Object) flow_is_active

Returns the value of attribute flow_is_active



# File 'lib/amqp/channel.rb', line 179

def flow_is_active
  @flow_is_active
end

- (Object) id (readonly)

Returns the value of attribute id



# File 'lib/amqp/channel.rb', line 173

def id
  @id
end

- (Integer) publisher_index

Publisher index is the index of the last message published since confirmations were activated, starting at 0. It is incremented by 1 every time a message is published. This is done on both the client and the server, so acknowledged messages can be matched via their delivery tags.



# File 'lib/amqp/channel.rb', line 1289

def publisher_index
  @publisher_index ||= 0
end
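Because both sides count published messages the same way, a confirmation's delivery tag identifies the message it refers to. A minimal sketch of that bookkeeping in plain Ruby follows. `ConfirmTracker` is a hypothetical class, not part of the gem, and it assumes the broker numbers the first message published after confirm.select as 1.

```ruby
# Hypothetical bookkeeping for messages awaiting confirmation.
class ConfirmTracker
  attr_reader :publisher_index

  def initialize
    @publisher_index = 0  # reset when confirmations are activated
    @unconfirmed     = {} # index => payload
  end

  # Record a published payload under the next index and return that index.
  def published(payload)
    @publisher_index += 1
    @unconfirmed[@publisher_index] = payload
    @publisher_index
  end

  # Look up (and forget) the payload that a basic.ack with this delivery tag refers to.
  def confirmed(delivery_tag)
    @unconfirmed.delete(delivery_tag)
  end

  # Payloads still waiting for a confirmation.
  def unconfirmed
    @unconfirmed.values
  end
end

tracker = ConfirmTracker.new
tracker.published("first")
tracker.published("second")
puts tracker.confirmed(1)          # the payload that was published first
puts tracker.unconfirmed.inspect   # what is still waiting for a confirmation
```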

- (Object) queues_awaiting_bind_ok (readonly)

Returns the value of attribute queues_awaiting_bind_ok



# File 'lib/amqp/channel.rb', line 176

def queues_awaiting_bind_ok
  @queues_awaiting_bind_ok
end

- (Object) queues_awaiting_declare_ok (readonly)

Returns the value of attribute queues_awaiting_declare_ok



# File 'lib/amqp/channel.rb', line 176

def queues_awaiting_declare_ok
  @queues_awaiting_declare_ok
end

- (Object) queues_awaiting_delete_ok (readonly)

Returns the value of attribute queues_awaiting_delete_ok



# File 'lib/amqp/channel.rb', line 176

def queues_awaiting_delete_ok
  @queues_awaiting_delete_ok
end

- (Object) queues_awaiting_get_response (readonly)

Returns the value of attribute queues_awaiting_get_response



# File 'lib/amqp/channel.rb', line 176

def queues_awaiting_get_response
  @queues_awaiting_get_response
end

- (Object) queues_awaiting_purge_ok (readonly)

Returns the value of attribute queues_awaiting_purge_ok



# File 'lib/amqp/channel.rb', line 176

def queues_awaiting_purge_ok
  @queues_awaiting_purge_ok
end

- (Object) queues_awaiting_unbind_ok (readonly)

Returns the value of attribute queues_awaiting_unbind_ok



# File 'lib/amqp/channel.rb', line 176

def queues_awaiting_unbind_ok
  @queues_awaiting_unbind_ok
end

- (Symbol) status (readonly)

Status of this channel (one of: :opening, :closing, :open, :closed)



# File 'lib/amqp/channel.rb', line 169

def status
  @status
end

Instance Method Details

- (Object) acknowledge(delivery_tag, multiple = false)

Acknowledges one message or, when multiple is true, all outstanding messages up to and including the given delivery tag.



# File 'lib/amqp/channel.rb', line 1021

def acknowledge(delivery_tag, multiple = false)
  @connection.send_frame(AMQ::Protocol::Basic::Ack.encode(self.id, delivery_tag, multiple))

  self
end
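In AMQP 0.9.1, the `multiple` flag changes the scope of an acknowledgement: when it is true, every outstanding delivery up to and including `delivery_tag` is acknowledged. The sketch below models that rule in plain Ruby; `remaining_after_ack` is a hypothetical helper that operates on a list of unacknowledged delivery tags and does not talk to a broker.

```ruby
# Hypothetical helper: returns the delivery tags still unacknowledged after an ack.
def remaining_after_ack(outstanding, delivery_tag, multiple = false)
  if multiple
    outstanding.reject { |tag| tag <= delivery_tag } # everything up to and including the tag
  else
    outstanding.reject { |tag| tag == delivery_tag } # just the one delivery
  end
end

p remaining_after_ack([1, 2, 3, 4], 3)       # only tag 3 is acknowledged
p remaining_after_ack([1, 2, 3, 4], 3, true) # tags 1 through 3 are acknowledged
```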

- (Object) auto_recover

Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).



# File 'lib/amqp/channel.rb', line 316

def auto_recover
  return unless auto_recovering?

  @channel_is_open_deferrable.fail
  @channel_is_open_deferrable = AMQP::Deferrable.new

  self.open do
    @channel_is_open_deferrable.succeed

    # re-establish prefetch
    self.prefetch(@options[:prefetch], false) if @options[:prefetch]

    # exchanges must be recovered first because queue recovery includes recovery of bindings. MK.
    @exchanges.each { |name, e| e.auto_recover }
    @queues.each    { |name, q| q.auto_recover }
  end
end

- (Boolean) auto_recovering?



# File 'lib/amqp/channel.rb', line 308

def auto_recovering?
  @auto_recovery
end

- (Object) before_recovery(&block)

Defines a callback that will be executed after TCP connection has recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).



# File 'lib/amqp/channel.rb', line 1247

def before_recovery(&block)
  self.redefine_callback(:before_recovery, &block)
end
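The "last one wins" behaviour comes from registering the hook with `redefine_callback`, whereas hooks such as #on_error use `define_callback`. The sketch below contrasts the two in plain Ruby. `CallbackRegistry` is a hypothetical class written for this illustration; it assumes `define_callback` appends and `redefine_callback` replaces, which is what the names and the description above suggest, and it is not the gem's code.

```ruby
# Hypothetical registry illustrating append versus replace semantics.
class CallbackRegistry
  def initialize
    @callbacks = Hash.new { |hash, event| hash[event] = [] }
  end

  # Add another callback for the event.
  def define_callback(event, &block)
    @callbacks[event] << block
    self
  end

  # Replace whatever was registered for the event.
  def redefine_callback(event, &block)
    @callbacks[event] = [block]
    self
  end

  # Run every callback registered for the event and collect the results.
  def exec_callback(event, *args)
    @callbacks[event].map { |callback| callback.call(*args) }
  end
end

registry = CallbackRegistry.new
registry.redefine_callback(:before_recovery) { "first" }
registry.redefine_callback(:before_recovery) { "second" }
p registry.exec_callback(:before_recovery) # only the callback defined last runs
```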

- (Object) close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block)

Closes AMQP channel.



# File 'lib/amqp/channel.rb', line 950

def close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block)
  self.once_open do
    self.status = :closing
    @connection.send_frame(AMQ::Protocol::Channel::Close.encode(@id, reply_code, reply_text, class_id, method_id))

    self.redefine_callback :close, &block
  end
end

- (Boolean) closing?



# File 'lib/amqp/channel.rb', line 943

def closing?
  self.status == :closing
end

- (Object) confirm_select(nowait = false, &block)



# File 'lib/amqp/channel.rb', line 1119

def confirm_select(nowait = false, &block)
  self.once_open do
    if nowait && block
      raise ArgumentError, "confirm.select with nowait = true and a callback makes no sense"
    end

    @uses_publisher_confirmations = true
    reset_publisher_index!

    self.redefine_callback(:confirm_select, &block) unless nowait
    self.redefine_callback(:after_publish) do
      increment_publisher_index!
    end
    @connection.send_frame(AMQ::Protocol::Confirm::Select.encode(@id, nowait))

    self
  end
end

- (Connection) conn

AMQP connection this channel is part of



# File 'lib/amqp/channel.rb', line 165

def connection
  @connection
end

- (Hash<String, Consumer>) consumers



# File 'lib/amqp/channel.rb', line 1183

def consumers
  @consumers
end

- (Exchange) default_exchange

Returns exchange object with the same name as default (aka unnamed) exchange. Default exchange is a direct exchange and automatically routes messages to queues when routing key matches queue name exactly. This feature is known as “automatic binding” (of queues to default exchange).

Use default exchange when you want to route messages directly to specific queues (queue names are known, you don’t mind this kind of coupling between applications).
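The routing rule is simple enough to model directly. The sketch below is plain Ruby, not gem code: `route_via_default_exchange` is a hypothetical function that delivers a message to the queue whose name equals the routing key and drops it otherwise.

```ruby
# Hypothetical model: `queues` maps queue names to arrays of delivered payloads.
def route_via_default_exchange(queues, payload, routing_key)
  queue = queues[routing_key] # exact match on the queue name, no patterns
  return false unless queue

  queue << payload
  true
end

queues = { "queue1" => [], "queue2" => [] }
route_via_default_exchange(queues, "hello", "queue1")
route_via_default_exchange(queues, "lost", "queue") # no queue has exactly this name
p queues
```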

Examples:

Using default exchange to publish messages to queues with known names

AMQP.start(:host => 'localhost') do |connection|
  ch        = AMQP::Channel.new(connection)

  queue1    = ch.queue("queue1").subscribe do |payload|
    puts "[#{queue1.name}] => #{payload}"
  end
  queue2    = ch.queue("queue2").subscribe do |payload|
    puts "[#{queue2.name}] => #{payload}"
  end
  queue3    = ch.queue("queue3").subscribe do |payload|
    puts "[#{queue3.name}] => #{payload}"
  end
  queues    = [queue1, queue2, queue3]

  # Rely on default direct exchange binding, see section 2.1.2.4 Automatic Mode in AMQP 0.9.1 spec.
  exchange = ch.default_exchange
  EM.add_periodic_timer(1) do
    q = queues.sample

    exchange.publish "Some payload from #{Time.now.to_i}", :routing_key => q.name
  end
end

# File 'lib/amqp/channel.rb', line 491

def default_exchange
  @default_exchange ||= Exchange.default(self)
end

- (Exchange) direct(name = 'amq.direct', opts = {}, &block)

Defines, initializes and returns a direct Exchange instance.

Learn more about direct exchanges in Exchange class documentation.

Examples:

Using default pre-declared direct exchange and no callbacks (pseudo-synchronous style)


# an exchange application A will be using to publish updates
# to some search index
exchange = channel.direct("index.updates")

# In the same (or different) process declare a queue that broker will
# generate name for, bind it to aforementioned exchange using method chaining
queue    = channel.queue("").
                   # queue will be receiving messages that were published with
                   # :routing_key attribute value of "search.index.updates"
                   bind(exchange, :routing_key => "search.index.updates").
                   # register a callback that will be run when messages arrive
                   subscribe { |header, message| puts("Received #{message}") }

# now publish a new document contents for indexing,
# message will be delivered to the queue we declared and bound on the line above
exchange.publish(document.content, :routing_key => "search.index.updates")

Instantiating a direct exchange using #direct with a callback


AMQP.connect do |connection|
  AMQP::Channel.new(connection) do |channel|
    channel.direct("email.replies_listener") do |exchange, declare_ok|
      # by now exchange is ready and waiting
    end
  end
end

Options Hash (opts):

  • :passive (Boolean) — default: false

    If set, the server will not create the exchange if it does not already exist. The client can use this to check whether an exchange exists without modifying the server state.

  • :durable (Boolean) — default: false

    If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges and their bindings are recreated upon a server restart (information about them is persisted). Non-durable (transient) exchanges do not survive if/when a server restarts (information about them is stored exclusively in RAM).

  • :auto_delete (Boolean) — default: false

    If set, the exchange is deleted when all queues have finished using it. The server waits for a short period of time before determining the exchange is unused to give time to the client code to bind a queue to it.

  • :internal (Boolean) — default: false

    If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications. This is a RabbitMQ-specific extension.

  • :nowait (Boolean) — default: true

    If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

# File 'lib/amqp/channel.rb', line 438

def direct(name = 'amq.direct', opts = {}, &block)
  if exchange = find_exchange(name)
    extended_opts = Exchange.add_default_options(:direct, name, opts, block)

    validate_parameters_match!(exchange, extended_opts, :exchange)

    block.call(exchange) if block
    exchange
  else
    register_exchange(Exchange.new(self, :direct, name, opts, &block))
  end
end

- (Array<Exchange>) exchanges



# File 'lib/amqp/channel.rb', line 1193

def exchanges
  @exchanges.values
end

- (Exchange) fanout(name = 'amq.fanout', opts = {}, &block)

Defines, initializes and returns a fanout Exchange instance.

Learn more about fanout exchanges in Exchange class documentation.

Examples:

Using fanout exchange to deliver messages to multiple consumers


# open up a channel
# declare a fanout exchange
# declare 3 queues, binds them
# publish a message
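The outline above lists the steps without code. As a broker-free illustration of what a fanout exchange does, the sketch below models it in plain Ruby: every bound queue receives a copy of each message and the routing key is ignored. `FanoutModel` is a hypothetical class, not the gem's API.

```ruby
# Hypothetical in-memory model of a fanout exchange.
class FanoutModel
  def initialize
    @queues = []
  end

  # Bind a queue (modelled as an Array) to the exchange.
  def bind(queue)
    @queues << queue
    self
  end

  # Deliver a copy of the payload to every bound queue; the routing key plays no part.
  # Returns the number of queues that received the message.
  def publish(payload, routing_key = "")
    @queues.each { |queue| queue << payload.dup }
    @queues.size
  end
end

exchange = FanoutModel.new
queues   = Array.new(3) { [] }
queues.each { |queue| exchange.bind(queue) }
exchange.publish("score update", "ignored.key")
p queues
```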

Options Hash (opts):

  • :passive (Boolean) — default: false

    If set, the server will not create the exchange if it does not already exist. The client can use this to check whether an exchange exists without modifying the server state.

  • :durable (Boolean) — default: false

    If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges and their bindings are recreated upon a server restart (information about them is persisted). Non-durable (transient) exchanges do not survive if/when a server restarts (information about them is stored exclusively in RAM).

  • :auto_delete (Boolean) — default: false

    If set, the exchange is deleted when all queues have finished using it. The server waits for a short period of time before determining the exchange is unused to give time to the client code to bind a queue to it.

  • :internal (Boolean) — default: false

    If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications. This is a RabbitMQ-specific extension.

  • :nowait (Boolean) — default: true

    If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

# File 'lib/amqp/channel.rb', line 541

def fanout(name = 'amq.fanout', opts = {}, &block)
  if exchange = find_exchange(name)
    extended_opts = Exchange.add_default_options(:fanout, name, opts, block)

    validate_parameters_match!(exchange, extended_opts, :exchange)

    block.call(exchange) if block
    exchange
  else
    register_exchange(Exchange.new(self, :fanout, name, opts, &block))
  end
end

- (AMQP::Exchange) find_exchange(name)

Finds exchange in the exchanges cache on this channel by name. Exchange only exists in the cache if it was previously instantiated on this channel.



# File 'lib/amqp/channel.rb', line 1398

def find_exchange(name)
  @exchanges[name]
end

- (Object) flow(active = false, &block)

Asks the peer to pause or restart the flow of content data sent to a consumer. This is a simple flow-control mechanism that a peer can use to avoid overflowing its queues or otherwise finding itself receiving more messages than it can process. Note that this method is not intended for window control. It does not affect contents returned to Queue#get callers.



# File 'lib/amqp/channel.rb', line 976

def flow(active = false, &block)
  @connection.send_frame(AMQ::Protocol::Channel::Flow.encode(@id, active))

  self.redefine_callback :flow, &block
  self
end

- (Boolean) flow_is_active?

Returns true if flow in this channel is active (messages will be delivered to consumers that use this channel).



# File 'lib/amqp/channel.rb', line 986

def flow_is_active?
  @flow_is_active
end

- (Object) handle_basic_ack(method)

Handler for Basic.Ack. By default, it just executes the hook registered via the #on_ack method with a single argument, a protocol method class instance (an instance of AMQ::Protocol::Basic::Ack).



# File 'lib/amqp/channel.rb', line 1365

def handle_basic_ack(method)
  self.exec_callback(:ack, method)
end

- (Object) handle_basic_nack(method)

Handler for Basic.Nack. By default, it just executes the hook registered via the #on_nack method with a single argument, a protocol method class instance (an instance of AMQ::Protocol::Basic::Nack).



# File 'lib/amqp/channel.rb', line 1376

def handle_basic_nack(method)
  self.exec_callback(:nack, method)
end

- (Object) handle_select_ok(method)

Handler for Confirm.Select-Ok. By default, it just executes the hook registered via the #confirm_select method with a single argument, a protocol method class instance (an instance of AMQ::Protocol::Confirm::SelectOk), and then deletes the callback, since Confirm.Select is supposed to be sent just once.



# File 'lib/amqp/channel.rb', line 1355

def handle_select_ok(method)
  self.exec_callback_once(:confirm_select, method)
end

- (Exchange) headers(name = 'amq.match', opts = {}, &block)

Defines, initializes and returns a headers Exchange instance.

Learn more about headers exchanges in Exchange class documentation.

Examples:

Using headers exchange to route messages based on multiple attributes (OS, architecture, # of cores)


puts "=> Headers routing example"
puts
AMQP.start do |connection|
  channel   = AMQP::Channel.new(connection)
  channel.on_error do |ch, channel_close|
    puts "A channel-level exception: #{channel_close.inspect}"
  end

  exchange = channel.headers("amq.match", :durable => true)

  channel.queue("", :auto_delete => true).bind(exchange, :arguments => { 'x-match' => 'all', :arch => "x64", :os => 'linux' }).subscribe do |metadata, payload|
    puts "[linux/x64] Got a message: #{payload}"
  end
  channel.queue("", :auto_delete => true).bind(exchange, :arguments => { 'x-match' => 'all', :arch => "x32", :os => 'linux' }).subscribe do |metadata, payload|
    puts "[linux/x32] Got a message: #{payload}"
  end
  channel.queue("", :auto_delete => true).bind(exchange, :arguments => { 'x-match' => 'any', :os => 'linux', :arch => "__any__" }).subscribe do |metadata, payload|
    puts "[linux] Got a message: #{payload}"
  end
  channel.queue("", :auto_delete => true).bind(exchange, :arguments => { 'x-match' => 'any', :os => 'macosx', :cores => 8 }).subscribe do |metadata, payload|
    puts "[macosx|octocore] Got a message: #{payload}"
  end

  EventMachine.add_timer(0.5) do
    exchange.publish "For linux/x64",   :headers => { :arch => "x64", :os => 'linux' }
    exchange.publish "For linux/x32",   :headers => { :arch => "x32", :os => 'linux' }
    exchange.publish "For linux",       :headers => { :os => 'linux'  }
    exchange.publish "For OS X",        :headers => { :os => 'macosx' }
    exchange.publish "For solaris/x64", :headers => { :os => 'solaris', :arch => 'x64' }
    exchange.publish "For octocore",    :headers => { :cores => 8  }
  end

  show_stopper = Proc.new do
    $stdout.puts "Stopping..."
    connection.close {
      EventMachine.stop { exit }
    }
  end

  Signal.trap "INT", show_stopper
  EventMachine.add_timer(2, show_stopper)
end
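The bindings in the example rely on the `x-match` argument: `all` requires every listed header to match, `any` requires at least one. The sketch below models that rule in plain Ruby. `headers_match?` is a hypothetical function; it compares keys as strings and treats a missing `x-match` as `all`, both of which are assumptions made for this illustration.

```ruby
# Hypothetical model of headers-exchange matching.
def headers_match?(binding_arguments, message_headers)
  # Assumption: symbol and string keys are equivalent once sent to the broker.
  args    = binding_arguments.map { |key, value| [key.to_s, value] }.to_h
  headers = message_headers.map   { |key, value| [key.to_s, value] }.to_h
  mode    = args.delete("x-match") || "all"

  matches = args.map { |key, value| headers[key] == value }
  mode == "any" ? matches.any? : matches.all?
end

linux_x64 = { "x-match" => "all", :arch => "x64", :os => "linux" }
any_linux = { "x-match" => "any", :os => "linux", :arch => "__any__" }

p headers_match?(linux_x64, { :arch => "x64", :os => "linux" }) # both headers match
p headers_match?(linux_x64, { :os => "linux" })                 # :arch is missing
p headers_match?(any_linux, { :os => "linux" })                 # one match is enough
```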

Options Hash (opts):

  • :passive (Boolean) — default: false

    If set, the server will not create the exchange if it does not already exist. The client can use this to check whether an exchange exists without modifying the server state.

  • :durable (Boolean) — default: false

    If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges and their bindings are recreated upon a server restart (information about them is persisted). Non-durable (transient) exchanges do not survive if/when a server restarts (information about them is stored exclusively in RAM).

  • :auto_delete (Boolean) — default: false

    If set, the exchange is deleted when all queues have finished using it. The server waits for a short period of time before determining the exchange is unused to give time to the client code to bind a queue to it.

  • :internal (Boolean) — default: false

    If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications. This is a RabbitMQ-specific extension.

  • :nowait (Boolean) — default: true

    If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

# File 'lib/amqp/channel.rb', line 754

def headers(name = 'amq.match', opts = {}, &block)
  if exchange = find_exchange(name)
    extended_opts = Exchange.add_default_options(:headers, name, opts, block)

    validate_parameters_match!(exchange, extended_opts, :exchange)

    block.call(exchange) if block
    exchange
  else
    register_exchange(Exchange.new(self, :headers, name, opts, &block))
  end
end

- (Object) increment_publisher_index!

This method is executed after each message is published via Exchange#publish. Currently it just increments the publisher index by 1, so that confirmations can be matched to messages.



# File 'lib/amqp/channel.rb', line 1306

def increment_publisher_index!
  @publisher_index += 1
end

- (self) on_ack(nowait = false) {|basic_ack| ... }

Registers a callback for basic.ack from the broker if a block is given. As the method body shows, it does not send confirm.select itself; confirmations for this channel are turned on with #confirm_select.

Yields:

  • (basic_ack)

    Callback which will be executed every time we receive Basic.Ack from the broker.

Yield Parameters:

  • basic_ack (AMQ::Protocol::Basic::Ack)

    Protocol method class instance.

Raises:

  • (RuntimeError)

    Occurs when confirmations are already activated.

  • (RuntimeError)

    Occurs when nowait is true and block is given.



# File 'lib/amqp/channel.rb', line 1327

def on_ack(nowait = false, &block)
  self.define_callback(:ack, &block) if block

  self
end

- (Object) on_connection_interruption(&block) Also known as: after_connection_interruption

Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).



# File 'lib/amqp/channel.rb', line 1236

def on_connection_interruption(&block)
  self.redefine_callback(:after_connection_interruption, &block)
end
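
The "only one callback" rule follows from the registration style. The sketch below contrasts the two styles used throughout this class, assuming that `define_callback` appends while `redefine_callback` replaces, as the method names and the note above suggest. `CallbackRegistry` and `fire` are hypothetical and exist only for this illustration.

```ruby
# Hypothetical registry illustrating the two registration styles.
class CallbackRegistry
  def initialize
    @callbacks = Hash.new { |hash, key| hash[key] = [] }
  end

  # Appends: earlier callbacks for the event are kept.
  def define_callback(event, &block)
    @callbacks[event] << block
    self
  end

  # Replaces: only the most recently registered callback survives.
  def redefine_callback(event, &block)
    @callbacks[event] = [block]
    self
  end

  # Runs every callback registered for the event and collects the results.
  def fire(event, *args)
    @callbacks[event].map { |callback| callback.call(*args) }
  end
end

registry = CallbackRegistry.new
registry.redefine_callback(:after_connection_interruption) { :first }
registry.redefine_callback(:after_connection_interruption) { :second }
registry.define_callback(:error) { :log }
registry.define_callback(:error) { :alert }

interruption_results = registry.fire(:after_connection_interruption)
error_results        = registry.fire(:error)
```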

- (Object) on_error(&block)

Defines a callback that will be executed when the channel is closed because of a channel-level exception.



# File 'lib/amqp/channel.rb', line 1110

def on_error(&block)
  self.define_callback(:error, &block)
end

- (self) on_nack(&block)

Registers an error callback for Basic.Nack. It is called when one or more messages are rejected.



# File 'lib/amqp/channel.rb', line 1338

def on_nack(&block)
  self.define_callback(:nack, &block) if block

  self
end

- (Object) on_recovery(&block) Also known as: after_recovery

Defines a callback that will be executed after AMQP connection has recovered after a network failure. Only one callback can be defined (the one defined last replaces previously added ones).



# File 'lib/amqp/channel.rb', line 1265

def on_recovery(&block)
  self.redefine_callback(:after_recovery, &block)
end

- (Object) once_open(&block) Also known as: once_opened

Takes a block that will be deferred until the channel is considered open (channel.open-ok is received from the broker). If you need to delay an operation until the channel is open, this is the method to use.

Multiple callbacks are supported. If the channel is already open when this method is called, the block is executed immediately.



# File 'lib/amqp/channel.rb', line 932

def once_open(&block)
  @channel_is_open_deferrable.callback do
    # guards against cases when deferred operations
    # don't complete before the channel is closed
    block.call if open?
  end
end
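
The deferral described for #once_open can be modelled without EventMachine. This is a minimal sketch, not the gem's implementation: `FakeChannel` and `handle_open_ok` are hypothetical names, and a plain array stands in for the deferrable.

```ruby
# Hypothetical stand-in for a channel with a deferred "open" state.
class FakeChannel
  def initialize
    @open    = false
    @pending = []
  end

  def open?
    @open
  end

  # Runs the block now if the channel is open, otherwise queues it.
  def once_open(&block)
    if open?
      block.call
    else
      @pending << block
    end
    self
  end

  # Simulates arrival of channel.open-ok: flush queued blocks in FIFO order.
  def handle_open_ok
    @open = true
    @pending.shift.call until @pending.empty?
  end
end

log     = []
channel = FakeChannel.new
channel.once_open { log << :declare_exchange }
channel.once_open { log << :declare_queue }
log_before_open = log.dup

channel.handle_open_ok
channel.once_open { log << :publish } # channel is open, runs immediately
```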

- (Object) open(&block) Also known as: reopen

Note:

Instantiated channels are opened by default. This method should only be used for error recovery after network connection loss.

Opens AMQP channel.



# File 'lib/amqp/channel.rb', line 908

def open(&block)
  @connection.send_frame(AMQ::Protocol::Channel::Open.encode(@id, AMQ::Protocol::EMPTY_STRING))
  @connection.channels[@id] = self
  self.status = :opening

  self.redefine_callback :open, &block
end

- (Boolean) open?

Returns true if the channel is open or is in the process of opening.



# File 'lib/amqp/channel.rb', line 920

def open?
  self.status == :opened || self.status == :opening
end

- (Channel) prefetch(count, global = false, &block)

Returns self



# File 'lib/amqp/channel.rb', line 998

def prefetch(count, global = false, &block)
  self.once_open do
    # RabbitMQ does not support prefetch_size.
    self.qos(0, count, global, &block)

    @options[:prefetch] = count
  end

  self
end

- (Object) qos(prefetch_size = 0, prefetch_count = 32, global = false, &block)

Note:

RabbitMQ as of 2.3.1 does not support prefetch_size.

Requests a specific quality of service. The QoS can be specified for the current channel or for all channels on the connection.



# File 'lib/amqp/channel.rb', line 1218

def qos(prefetch_size = 0, prefetch_count = 32, global = false, &block)
  @connection.send_frame(AMQ::Protocol::Basic::Qos.encode(@id, prefetch_size, prefetch_count, global))

  self.redefine_callback :qos, &block
  self
end
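
The effect of `prefetch_count` is easiest to see as a bounded window of unacknowledged deliveries. The sketch below models that window on the broker side. `PrefetchWindow` and its methods are hypothetical and only illustrate the semantics; they are not part of the amqp gem.

```ruby
# Hypothetical model of a broker-side delivery window.
class PrefetchWindow
  attr_reader :in_flight, :backlog

  def initialize(prefetch_count)
    @prefetch_count = prefetch_count
    @backlog        = []
    @in_flight      = []
  end

  def enqueue(message)
    @backlog << message
    deliver
  end

  # Acknowledging a message frees a slot in the window.
  def ack(message)
    @in_flight.delete(message)
    deliver
  end

  private

  # Deliver while fewer than prefetch_count messages are unacknowledged.
  # A prefetch_count of 0 means "no limit".
  def deliver
    while !@backlog.empty? && (@prefetch_count.zero? || @in_flight.size < @prefetch_count)
      @in_flight << @backlog.shift
    end
    self
  end
end

window = PrefetchWindow.new(2)
%w[m1 m2 m3].each { |message| window.enqueue(message) }
in_flight_before_ack = window.in_flight.dup

window.ack('m1')
in_flight_after_ack = window.in_flight.dup
```

With the real library the same limit would be requested through #prefetch or #qos, and the consumer frees slots by acknowledging deliveries.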

- (Queue) queue(name = AMQ::Protocol::EMPTY_STRING, opts = {}) {|queue, declare_ok| ... }

Declares and returns a Queue instance associated with this channel. See Queue class documentation for more information about queues.

To make the broker generate a queue name for you (a classic example is exclusive queues that are only used for a short period of time), pass an empty string as the name. The queue will then get its name as soon as the broker’s response (queue.declare-ok) arrives. Note that in this case, a block is required.

As with exchanges, queue names starting with ‘amq.’ cannot be modified and should not be used by applications.

Examples:

Declaring a queue in a mail delivery app using Channel#queue without a block

AMQP.connect do |connection|
  AMQP::Channel.new(connection) do |ch|
    # message producers will be able to send messages to this queue
    # using direct exchange and routing key = "mail.delivery"
    queue = ch.queue("mail.delivery", :durable => true)
    queue.subscribe do |headers, payload|
      # ...
    end
  end
end

Declaring a server-named exclusive queue that receives all messages related to events, using a block.

AMQP.connect do |connection|
  AMQP::Channel.new(connection) do |ch|
    # message producers will be able to send messages to this queue
    # using amq.topic exchange with routing keys that begin with "events"
    ch.queue("", :exclusive => true) do |queue|
      queue.bind(ch.exchange("amq.topic"), :routing_key => "events.#").subscribe do |headers, payload|
        # ...
      end
    end
  end
end

Options Hash (opts):

  • :passive (Boolean) — default: false

    If set, the server will not create the queue if it does not already exist. The client can use this to check whether a queue exists without modifying the server state.

  • :durable (Boolean) — default: false

    If set when creating a new queue, the queue will be marked as durable. Durable queues are recreated upon a server restart (information about them is persisted). Non-durable (transient) queues do not survive if/when a server restarts (information about them is stored exclusively in RAM). Any remaining messages in the queue will be purged when the queue is deleted regardless of the message’s persistence setting.

  • :auto_delete (Boolean) — default: false

    If set, the queue is deleted when all consumers have finished using it.

  • :exclusive (Boolean) — default: false

    Exclusive queues may only be used by a single connection. Exclusivity also implies that queue is automatically deleted when connection is closed. Only one consumer is allowed to remove messages from exclusive queue.

  • :nowait (Boolean) — default: true

    If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

Yields:

  • (queue, declare_ok)

    Yields successfully declared queue instance and AMQP method (queue.declare-ok) instance. The latter is optional.

Yield Parameters:

  • queue (Queue)

    Queue that is successfully declared and is ready to be used.

  • declare_ok (AMQP::Protocol::Queue::DeclareOk)

    AMQP method (queue.declare-ok) instance.

Raises:

  • (ArgumentError)

# File 'lib/amqp/channel.rb', line 849

def queue(name = AMQ::Protocol::EMPTY_STRING, opts = {}, &block)
  raise ArgumentError.new("queue name must not be nil; if you want broker to generate queue name for you, pass an empty string") if name.nil?

  if name && !name.empty? && (queue = find_queue(name))
    extended_opts = Queue.add_default_options(name, opts, block)

    validate_parameters_match!(queue, extended_opts, :queue)

    block.call(queue) if block
    queue
  else
    self.queue!(name, opts, &block)
  end
end
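
The lookup rules in the listing above (nil is rejected, a non-empty name is served from the channel's cache, an empty name always results in a new declaration) can be sketched as follows. `QueueCache` and `FakeQueue` are hypothetical stand-ins, and option validation is left out for brevity.

```ruby
# Hypothetical cache that mimics the lookup rules of Channel#queue.
class QueueCache
  FakeQueue = Struct.new(:name, :opts)

  def initialize
    @queues = {}
  end

  def queue(name = '', opts = {})
    raise ArgumentError, 'queue name must not be nil' if name.nil?

    # Only named queues can be found in the cache.
    cached = name.empty? ? nil : @queues[name]
    return cached if cached

    declare(name, opts)
  end

  private

  # Simplification: server-named queues are not cached in this sketch.
  def declare(name, opts)
    queue = FakeQueue.new(name, opts)
    @queues[name] = queue unless name.empty?
    queue
  end
end

cache  = QueueCache.new
first  = cache.queue('mail.delivery', durable: true)
second = cache.queue('mail.delivery', durable: true)
anon_a = cache.queue('')
anon_b = cache.queue('')
```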

- (Queue) queue!(name, opts = {}, &block)

Same as #queue but when queue with the same name already exists in this channel object’s cache, this method will replace existing queue with a newly defined one. Consider using #queue instead.

# File 'lib/amqp/channel.rb', line 872

def queue!(name, opts = {}, &block)
  queue = if block.nil?
            Queue.new(self, name, opts)
          else
            shim = Proc.new { |q, method|
              if block.arity == 1
                block.call(q)
              else
                queue = find_queue(method.queue)
                block.call(queue, method.consumer_count, method.message_count)
              end
            }
            Queue.new(self, name, opts, &shim)
          end

  register_queue(queue)
end
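
The shim in the listing above adapts the user's block to the number of arguments it declares. A standalone sketch of that technique follows. `arity_shim` and the `reply` struct are hypothetical and merely stand in for the queue.declare-ok method object.

```ruby
# Wraps a block so that it only receives the arguments it asks for.
def arity_shim(&block)
  proc do |queue, declare_ok|
    if block.arity == 1
      block.call(queue)
    else
      block.call(queue, declare_ok.consumer_count, declare_ok.message_count)
    end
  end
end

# Hypothetical reply object carrying the counters reported by the broker.
reply_class = Struct.new(:queue, :message_count, :consumer_count)
reply       = reply_class.new('tasks', 5, 2)

one_arg   = arity_shim { |q| "got #{q}" }
three_arg = arity_shim { |q, consumers, messages| "got #{q} (#{consumers} consumers, #{messages} messages)" }

one_result   = one_arg.call('tasks', reply)
three_result = three_arg.call('tasks', reply)
```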

- (Array<Queue>) queues



# File 'lib/amqp/channel.rb', line 893

def queues
  @queues
end

- (Channel) recover(requeue = true, &block)

Note:

RabbitMQ as of 2.3.1 does not support basic.recover with requeue = false.

Notifies AMQ broker that consumer has recovered and unacknowledged messages need to be redelivered.



# File 'lib/amqp/channel.rb', line 1052

def recover(requeue = true, &block)
  @connection.send_frame(AMQ::Protocol::Basic::Recover.encode(@id, requeue))

  self.redefine_callback :recover, &block
  self
end

- (Object) register_exchange(exchange)

Implementation

Raises:

  • (ArgumentError)


# File 'lib/amqp/channel.rb', line 1386

def register_exchange(exchange)
  raise ArgumentError, "argument is nil!" if exchange.nil?

  @exchanges[exchange.name] = exchange
end

- (Object) reject(delivery_tag, requeue = true, multi = false)

Reject a message with given delivery tag.



# File 'lib/amqp/channel.rb', line 1033

def reject(delivery_tag, requeue = true, multi = false)
  if multi
    @connection.send_frame(AMQ::Protocol::Basic::Nack.encode(self.id, delivery_tag, multi, requeue))
  else
    @connection.send_frame(AMQ::Protocol::Basic::Reject.encode(self.id, delivery_tag, requeue))
  end

  self
end
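
The branch in #reject selects between two protocol methods: basic.reject, which always refers to a single delivery, and basic.nack, which can cover several deliveries at once. The sketch below restates that decision as a pure function. `rejection_frame` and the hash it returns are hypothetical; the real method encodes and sends a frame instead.

```ruby
# Hypothetical helper describing the frame that would be sent.
def rejection_frame(delivery_tag, requeue = true, multi = false)
  if multi
    # basic.nack can cover every unacknowledged delivery up to delivery_tag
    { method: 'basic.nack', delivery_tag: delivery_tag, multiple: true, requeue: requeue }
  else
    # basic.reject always refers to exactly one delivery
    { method: 'basic.reject', delivery_tag: delivery_tag, requeue: requeue }
  end
end

single = rejection_frame(7)
batch  = rejection_frame(7, false, true)
```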

- (Object) reset_publisher_index!

Resets publisher index to 0



# File 'lib/amqp/channel.rb', line 1296

def reset_publisher_index!
  @publisher_index = 0
end

- (Object) reuse

Can be used to recover channels from channel-level exceptions. Allocates a new channel id and reopens itself with this new id, releasing the old id after the new one is allocated.

This includes recovery of known exchanges, queues and bindings, exactly the same way as when the client recovers from a network failure.



# File 'lib/amqp/channel.rb', line 341

def reuse
  old_id = @id
  # must release after we allocate a new id, otherwise we will end up
  # with the same value. MK.
  @id    = self.class.next_channel_id
  @connection.release_channel_id(old_id)

  @channel_is_open_deferrable.fail
  @channel_is_open_deferrable = AMQP::Deferrable.new

  self.open do
    @channel_is_open_deferrable.succeed

    # re-establish prefetch
    self.prefetch(@options[:prefetch], false) if @options[:prefetch]

    # exchanges must be recovered first because queue recovery includes recovery of bindings. MK.
    @exchanges.each { |name, e| e.auto_recover }
    @queues.each    { |name, q| q.auto_recover }
  end
end
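
The comment in #reuse about releasing the old id only after allocating the new one can be demonstrated with a toy allocator. This sketch assumes an allocator that always hands out the lowest free id; `ChannelIdAllocator` is hypothetical and the gem's actual allocator may work differently.

```ruby
# Hypothetical allocator that always hands out the lowest free id.
class ChannelIdAllocator
  def initialize
    @used = {}
  end

  def allocate
    id = 1
    id += 1 while @used[id]
    @used[id] = true
    id
  end

  def release(id)
    @used.delete(id)
  end
end

# Allocate first, then release: the channel moves to a fresh id.
allocator = ChannelIdAllocator.new
old_id    = allocator.allocate
new_id    = allocator.allocate
allocator.release(old_id)

# Release first, then allocate: the same id comes straight back.
naive     = ChannelIdAllocator.new
naive_old = naive.allocate
naive.release(naive_old)
naive_new = naive.allocate
```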

- (Object) synchronize(&block)

Synchronizes given block using this channel’s mutex.



# File 'lib/amqp/channel.rb', line 1207

def synchronize(&block)
  @mutex.synchronize(&block)
end
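
Delegating to a per-object mutex, as #synchronize does, serializes access to shared state across threads. A minimal sketch of the same pattern on a hypothetical `Counter` class:

```ruby
# Hypothetical object guarding shared state with its own mutex.
class Counter
  attr_reader :value

  def initialize
    @mutex = Mutex.new
    @value = 0
  end

  def synchronize(&block)
    @mutex.synchronize(&block)
  end

  # The read-modify-write happens entirely inside the critical section.
  def increment
    synchronize { @value += 1 }
  end
end

counter = Counter.new
threads = 4.times.map do
  Thread.new { 1_000.times { counter.increment } }
end
threads.each(&:join)
```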

- (Exchange) topic(name = 'amq.topic', opts = {}, &block)

Defines, initializes and returns a topic Exchange instance.

Learn more about topic exchanges in Exchange class documentation.

Examples:

Using topic exchange to deliver relevant news updates

AMQP.connect do |connection|
  channel  = AMQP::Channel.new(connection)
  exchange = channel.topic("pub/sub")

  # Subscribers.
  channel.queue("development").bind(exchange, :key => "technology.dev.#").subscribe do |payload|
    puts "A new dev post: '#{payload}'"
  end
  channel.queue("ruby").bind(exchange, :key => "technology.#.ruby").subscribe do |payload|
    puts "A new post about Ruby: '#{payload}'"
  end

  # Let's publish some data.
  exchange.publish "Ruby post",     :routing_key => "technology.dev.ruby"
  exchange.publish "Erlang post",   :routing_key => "technology.dev.erlang"
  exchange.publish "Sinatra post",  :routing_key => "technology.web.ruby"
  exchange.publish "Jewelery post", :routing_key => "jewelery.ruby"
end

Using topic exchange to deliver geographically-relevant data

AMQP.connect do |connection|
  channel  = AMQP::Channel.new(connection)
  exchange = channel.topic("pub/sub")

  # Subscribers.
  channel.queue("americas.north").bind(exchange, :routing_key => "americas.north.#").subscribe do |headers, payload|
    puts "An update for North America: #{payload}, routing key is #{headers.routing_key}"
  end
  channel.queue("americas.south").bind(exchange, :routing_key => "americas.south.#").subscribe do |headers, payload|
    puts "An update for South America: #{payload}, routing key is #{headers.routing_key}"
  end
  channel.queue("us.california").bind(exchange, :routing_key => "americas.north.us.ca.*").subscribe do |headers, payload|
    puts "An update for US/California: #{payload}, routing key is #{headers.routing_key}"
  end
  channel.queue("us.tx.austin").bind(exchange, :routing_key => "#.tx.austin").subscribe do |headers, payload|
    puts "An update for Austin, TX: #{payload}, routing key is #{headers.routing_key}"
  end
  channel.queue("it.rome").bind(exchange, :routing_key => "europe.italy.rome").subscribe do |headers, payload|
    puts "An update for Rome, Italy: #{payload}, routing key is #{headers.routing_key}"
  end
  channel.queue("asia.hk").bind(exchange, :routing_key => "asia.southeast.hk.#").subscribe do |headers, payload|
    puts "An update for Hong Kong: #{payload}, routing key is #{headers.routing_key}"
  end

  exchange.publish("San Diego update", :routing_key => "americas.north.us.ca.sandiego").
    publish("Berkeley update",         :routing_key => "americas.north.us.ca.berkeley").
    publish("San Francisco update",    :routing_key => "americas.north.us.ca.sanfrancisco").
    publish("New York update",         :routing_key => "americas.north.us.ny.newyork").
    publish("São Paolo update",        :routing_key => "americas.south.brazil.saopaolo").
    publish("Hong Kong update",        :routing_key => "asia.southeast.hk.hongkong").
    publish("Kyoto update",            :routing_key => "asia.southeast.japan.kyoto").
    publish("Shanghai update",         :routing_key => "asia.southeast.prc.shanghai").
    publish("Rome update",             :routing_key => "europe.italy.roma").
    publish("Paris update",            :routing_key => "europe.france.paris")
end
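
To see which of the bindings above receive which messages, it helps to spell out the topic matching rules: routing keys and patterns are split into words on ".", "*" stands for exactly one word, and "#" stands for zero or more words. The following is a broker-free sketch of those rules; `topic_match?` and `match_words` are hypothetical helpers and not part of the amqp gem.

```ruby
# Returns true when routing_key matches a topic binding pattern.
def topic_match?(pattern, routing_key)
  match_words(pattern.split('.'), routing_key.split('.'))
end

# Recursive matcher over word lists.
def match_words(pattern, words)
  return words.empty? if pattern.empty?

  head, *rest = pattern
  case head
  when '#'
    # "#" may absorb any number of leading words, including none
    (0..words.size).any? { |n| match_words(rest, words.drop(n)) }
  when '*'
    # "*" consumes exactly one word
    !words.empty? && match_words(rest, words.drop(1))
  else
    words.first == head && match_words(rest, words.drop(1))
  end
end

california = topic_match?('americas.north.us.ca.*', 'americas.north.us.ca.berkeley')
new_york   = topic_match?('americas.north.us.ca.*', 'americas.north.us.ny.newyork')
ruby_post  = topic_match?('technology.#.ruby', 'technology.web.ruby')
rome       = topic_match?('europe.italy.rome', 'europe.italy.roma')
```

Note that the last check is false: a binding key without wildcards must equal the routing key exactly, so the "Rome update" published with the key "europe.italy.roma" would not reach a queue bound with "europe.italy.rome".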

Options Hash (opts):

  • :passive (Boolean) — default: false

    If set, the server will not create the exchange if it does not already exist. The client can use this to check whether an exchange exists without modifying the server state.

  • :durable (Boolean) — default: false

    If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges and their bindings are recreated upon a server restart (information about them is persisted). Non-durable (transient) exchanges do not survive if/when a server restarts (information about them is stored exclusively in RAM).

  • :auto_delete (Boolean) — default: false

    If set, the exchange is deleted when all queues have finished using it. The server waits for a short period of time before determining the exchange is unused to give time to the client code to bind a queue to it.

  • :internal (Boolean) — default: false

    If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications. This is a RabbitMQ-specific extension.

  • :nowait (Boolean) — default: true

    If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

# File 'lib/amqp/channel.rb', line 652

def topic(name = 'amq.topic', opts = {}, &block)
  if exchange = find_exchange(name)
    extended_opts = Exchange.add_default_options(:topic, name, opts, block)

    validate_parameters_match!(exchange, extended_opts, :exchange)

    block.call(exchange) if block
    exchange
  else
    register_exchange(Exchange.new(self, :topic, name, opts, &block))
  end
end

- (Object) tx_commit(&block)

Commits AMQP transaction.



# File 'lib/amqp/channel.rb', line 1080

def tx_commit(&block)
  @connection.send_frame(AMQ::Protocol::Tx::Commit.encode(@id))

  self.redefine_callback :tx_commit, &block
  self
end

- (Object) tx_rollback(&block)

Rolls AMQP transaction back.



# File 'lib/amqp/channel.rb', line 1090

def tx_rollback(&block)
  @connection.send_frame(AMQ::Protocol::Tx::Rollback.encode(@id))

  self.redefine_callback :tx_rollback, &block
  self
end

- (Object) tx_select(&block)

Sets the channel to use standard transactions. This method must be called at least once on a channel before using the #tx_commit or #tx_rollback methods.



# File 'lib/amqp/channel.rb', line 1070

def tx_select(&block)
  @connection.send_frame(AMQ::Protocol::Tx::Select.encode(@id))

  self.redefine_callback :tx_select, &block
  self
end
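
The relationship between #tx_select, #tx_commit and #tx_rollback can be illustrated with an in-memory model: once the channel is in transactional mode, published messages are held back until a commit and discarded on a rollback. `TxChannel` is a hypothetical class written for this sketch, and it models publishes only.

```ruby
# Hypothetical in-memory model of a transactional channel.
class TxChannel
  attr_reader :delivered

  def initialize
    @delivered = []
    @pending   = []
    @tx_mode   = false
  end

  def tx_select
    @tx_mode = true
    self
  end

  # In transactional mode messages are buffered until commit.
  def publish(message)
    if @tx_mode
      @pending << message
    else
      @delivered << message
    end
    self
  end

  def tx_commit
    raise 'tx_select must be called first' unless @tx_mode

    @delivered.concat(@pending)
    @pending.clear
    self
  end

  def tx_rollback
    raise 'tx_select must be called first' unless @tx_mode

    @pending.clear
    self
  end
end

channel = TxChannel.new.tx_select
channel.publish('order-1').publish('order-2')
visible_before_commit = channel.delivered.dup

channel.tx_commit
channel.publish('order-3').tx_rollback
```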

- (Boolean) uses_publisher_confirmations?



# File 'lib/amqp/channel.rb', line 1311

def uses_publisher_confirmations?
  @uses_publisher_confirmations
end

- (Object) validate_parameters_match!(entity, parameters, type) (protected)



# File 'lib/amqp/channel.rb', line 1550

def validate_parameters_match!(entity, parameters, type)
  unless entity.opts.values_at(*@parameter_checks[type]) == parameters.values_at(*@parameter_checks[type]) || parameters[:passive]
    raise AMQP::IncompatibleOptionsError.new(entity.name, entity.opts, parameters)
  end
end
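
The check above compares only a fixed set of options per entity type and is skipped for passive declarations. A standalone sketch of the same comparison follows. `IncompatibleOptions` and the key lists in `PARAMETER_CHECKS` are assumptions made for illustration; the keys the gem actually compares may differ.

```ruby
# Hypothetical error type standing in for AMQP::IncompatibleOptionsError.
class IncompatibleOptions < StandardError; end

# Options compared per entity type (an assumed, illustrative subset).
PARAMETER_CHECKS = {
  queue:    [:durable, :exclusive, :auto_delete],
  exchange: [:type, :durable]
}.freeze

def validate_parameters_match!(cached_opts, requested_opts, type)
  keys = PARAMETER_CHECKS.fetch(type)

  # Passive declarations only check for existence, so options may differ.
  return true if requested_opts[:passive]
  return true if cached_opts.values_at(*keys) == requested_opts.values_at(*keys)

  raise IncompatibleOptions, "#{type} was already declared with different options"
end

cached = { durable: true, exclusive: false, auto_delete: false }

same_options = validate_parameters_match!(cached, cached.dup, :queue)
passive      = validate_parameters_match!(cached, { passive: true }, :queue)
```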