Method: AMQP::Queue#subscribe

Defined in:
lib/amqp/queue.rb

#subscribe(opts = {}) {|headers, payload| ... } ⇒ Queue

Subscribes to asynchronous message delivery.

The provided block is passed a single message each time the exchange matches a message to this queue.

Attempts to #subscribe multiple times to the same queue will raise a RuntimeError, because a queue can have only one default consumer. If you need more than one consumer per queue, use AMQP::Consumer instead. The documentation guide on queues explains this and other topics in great detail.

If the block takes 2 parameters, both the header and the body will be passed in for processing.

Examples:

Use of callback with a single argument


# Publishes a random number on every tick of a periodic timer (interval 1)
# and prints each payload delivered to the queue. The subscribe block takes
# a single argument, so it receives the message body only.
EventMachine.run do
  exchange = AMQP::Channel.direct("foo queue")
  EM.add_periodic_timer(1) { exchange.publish("random number #{rand(1000)}") }

  queue = AMQP::Channel.queue('foo queue')
  queue.subscribe do |body|
    puts "received payload [#{body}]"
  end
end

Use of callback with two arguments


# Connects to a local broker, binds an auto-deleted queue to the amq.direct
# exchange, publishes one message carrying several message properties and
# prints those properties when the message is delivered back.
EventMachine.run do
  connection = AMQP.connect(:host => '127.0.0.1')
  puts "Connected to AMQP broker. Running #{AMQP::VERSION} version of the gem..."

  channel  = AMQP::Channel.new(connection)
  queue    = channel.queue("amqpgem.examples.hello_world", :auto_delete => true)
  exchange = channel.direct("amq.direct")

  queue.bind(exchange)

  # On a channel-level error, report the reason and shut everything down.
  channel.on_error do |ch, channel_close|
    puts channel_close.reply_text
    connection.close { EventMachine.stop }
  end

  # Two-argument form: the first block argument is the message metadata
  # (headers), the second is the message body.
  queue.subscribe do |metadata, payload|
    puts "metadata.routing_key : #{metadata.routing_key}"
    puts "metadata.content_type: #{metadata.content_type}"
    puts "metadata.priority    : #{metadata.priority}"
    puts "metadata.headers     : #{metadata.headers.inspect}"
    puts "metadata.timestamp   : #{metadata.timestamp.inspect}"
    puts "metadata.type        : #{metadata.type}"
    puts "metadata.delivery_tag: #{metadata.delivery_tag}"
    puts "metadata.redelivered : #{metadata.redelivered}"

    puts "metadata.app_id      : #{metadata.app_id}"
    puts "metadata.exchange    : #{metadata.exchange}"
    puts
    puts "Received a message: #{payload}. Disconnecting..."

    connection.close {
      EventMachine.stop { exit }
    }
  end

  exchange.publish("Hello, world!",
                   :app_id      => "amqpgem.example",
                   :priority    => 8,
                   :type        => "kinda.checkin",
                   # headers table keys can be anything
                   :headers     => {
                     :coordinates => {
                       :latitude  => 59.35,
                       :longitude => 18.066667
                     },
                     :participants => 11,
                     :venue        => "Stockholm"
                   },
                   :timestamp   => Time.now.to_i)
end

Using object as consumer (message handler), take one


# Message handler object: owns a channel, declares an exclusive queue on it
# and routes deliveries and channel-level errors to its own methods.
class Consumer

  #
  # API
  #

  # channel    - channel used to declare the queue; its error handler is set here
  # queue_name - name passed to channel.queue in #start
  def initialize(channel, queue_name = AMQ::Protocol::EMPTY_STRING)
    @queue_name = queue_name

    @channel    = channel
    # Consumer#handle_channel_exception will handle channel
    # exceptions. Keep in mind that you can only register one error handler,
    # so the last one registered "wins".
    @channel.on_error(&method(:handle_channel_exception))
  end # initialize

  # Declares the exclusive queue and subscribes #handle_message to it.
  def start
    @queue = @channel.queue(@queue_name, :exclusive => true)
    # #handle_message method will be handling messages routed to @queue
    @queue.subscribe(&method(:handle_message))
  end # start

  #
  # Implementation
  #

  # Prints the payload together with the content type read from the metadata.
  def handle_message(metadata, payload)
    puts "Received a message: #{payload}, content_type = #{metadata.content_type}"
  end # handle_message(metadata, payload)

  # Prints the reply code and reply text of a channel-level exception.
  def handle_channel_exception(channel, channel_close)
    puts "Oops... a channel-level exception: code = #{channel_close.reply_code}, message = #{channel_close.reply_text}"
  end # handle_channel_exception(channel, channel_close)
end

Using object as consumer (message handler), take two: aggregated handler

# Plain message handler with no knowledge of channels or queues.
class Consumer

  #
  # API
  #

  # Prints the payload together with the content type read from the metadata.
  def handle_message(metadata, payload)
    puts "Received a message: #{payload}, content_type = #{metadata.content_type}"
  end # handle_message(metadata, payload)
end

# Owns the channel and the queue subscription; the actual message handling
# is delegated to a separate consumer object.
class Worker

  #
  # API
  #

  # channel    - channel used to declare the queue; its error handler is set here
  # queue_name - name passed to channel.queue in #start
  # consumer   - object whose #handle_message receives deliveries
  def initialize(channel, queue_name = AMQ::Protocol::EMPTY_STRING, consumer = Consumer.new)
    @channel    = channel
    @queue_name = queue_name
    @consumer   = consumer

    error_handler = method(:handle_channel_exception)
    @channel.on_error(&error_handler)
  end # initialize

  # Declares the exclusive queue, then subscribes the consumer's
  # #handle_message method to it.
  def start
    @queue = @channel.queue(@queue_name, :exclusive => true)

    message_handler = @consumer.method(:handle_message)
    @queue.subscribe(&message_handler)
  end # start

  #
  # Implementation
  #

  # Prints the reply code and reply text of a channel-level exception.
  def handle_channel_exception(channel, channel_close)
    code    = channel_close.reply_code
    message = channel_close.reply_text
    puts "Oops... a channel-level exception: code = #{code}, message = #{message}"
  end # handle_channel_exception(channel, channel_close)
end

Unit-testing objects that are used as consumers, RSpec style


require "ostruct"
require "json"

# RSpec example
describe Consumer do
  describe "when a new message arrives" do
    subject { described_class.new }

    # Stand-in for message metadata: only the attribute the consumer reads.
    let(:metadata) do
      o = OpenStruct.new

      o.content_type = "application/json"
      o
    end
    # JSON.generate is the serializer provided by the stdlib json library
    # (it has no JSON.encode).
    let(:payload)  { JSON.generate({ :command => "reload_config" }) }

    it "does some useful work" do
      # check preconditions here if necessary

      subject.handle_message(metadata, payload)

      # add your code expectations here
    end
  end
end

Parameters:

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :ack (Boolean) — default: false

    If this field is set to false the server does not expect acknowledgments for messages. That is, when a message is delivered to the client the server automatically and silently acknowledges it on behalf of the client. This functionality increases performance but at the cost of reliability. Messages can get lost if a client dies before it can deliver them to the application.

  • :nowait (Boolean) — default: false

    If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

  • :confirm (#call) — default: nil

    If set, this proc will be called when the server confirms subscription to the queue with a basic.consume-ok message. Setting this option will automatically set :nowait => false. This is required for the server to send a confirmation.

  • :exclusive (Boolean) — default: false

    Request exclusive consumer access, meaning only this consumer can access the queue. This is useful when you want a long-lived shared queue to be temporarily accessible by just one application (or thread, or process). If the application that the exclusive consumer is part of crashes or loses its network connection to the broker, the channel is closed and the exclusive consumer is thus cancelled.

Yields:

  • (headers, payload)

    When block only takes one argument, yields payload to it. In case of two arguments, yields headers and payload.

Yield Parameters:

  • headers (AMQP::Header)

    Headers (metadata) associated with this message (for example, routing key).

  • payload (String)

    Message body (content). On Ruby 1.9, you may want to check or enforce content encoding.

Returns:

  • (Queue)

    The queue itself (self).

Raises:

  • (RuntimeError)

See Also:



748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
# File 'lib/amqp/queue.rb', line 748

# Registers the default consumer for this queue and hands deliveries to the
# given block.
#
# The caller's +opts+ hash is only read, never modified, so shared or frozen
# option hashes can be passed safely.
#
# @param opts [Hash] subscription options
# @option opts [Boolean] :ack       negated and passed as the first argument of #basic_consume
# @option opts [Boolean] :exclusive forwarded to #basic_consume
# @option opts [Boolean] :nowait    forwarded to #basic_consume; ignored when :confirm is given
# @option opts [Boolean] :no_local  forwarded to #basic_consume
# @option opts [Proc]    :confirm   passed to #basic_consume as its block and stored in
#                                   @on_confirm_subscribe
# @yield [headers, payload] block registered through #on_delivery
# @return [Queue] self
# @raise [RuntimeError] if this queue already has a default consumer
def subscribe(opts = {}, &block)
  raise RuntimeError, "This queue already has default consumer. Please instantiate AMQP::Consumer directly to register additional consumers." if @default_consumer

  confirm = opts[:confirm]
  @on_confirm_subscribe = confirm

  # A confirmation callback overrides any caller-supplied :nowait. This is
  # kept in a local instead of being written back into the caller's hash.
  nowait = confirm ? false : opts[:nowait]

  @channel.once_open do
    self.once_name_is_available do
      # guards against a pathological case race condition when a channel
      # is opened and closed before delayed operations are completed.
      # NOTE(review): no explicit guard is visible in this method; presumably
      # the deferral through once_open / once_name_is_available is what is meant.
      # Without a delivery block the consume request is always sent as nowait.
      self.basic_consume(!opts[:ack], opts[:exclusive], (nowait || block.nil?), opts[:no_local], nil, &confirm)

      self.on_delivery(&block)
    end
  end

  self
end