Examples:
Use of callback with a single argument
EventMachine.run do
exchange = AMQP::Channel.direct("foo queue")
EM.add_periodic_timer(1) do
exchange.publish("random number #{rand(1000)}")
end
queue = AMQP::Channel.queue('foo queue')
queue.subscribe { |body| puts "received payload [#{body}]" }
end
Use of callback with two arguments
EventMachine.run do
connection = AMQP.connect(:host => '127.0.0.1')
puts "Connected to AMQP broker. Running #{AMQP::VERSION} version of the gem..."
channel = AMQP::Channel.new(connection)
queue = channel.queue("amqpgem.examples.hello_world", :auto_delete => true)
exchange = channel.direct("amq.direct")
queue.bind(exchange)
channel.on_error do |ch, channel_close|
puts channel_close.reply_text
connection.close { EventMachine.stop }
end
queue.subscribe do |metadata, payload|
puts "metadata.routing_key : #{metadata.routing_key}"
puts "metadata.content_type: #{metadata.content_type}"
puts "metadata.priority : #{metadata.priority}"
puts "metadata.headers : #{metadata..inspect}"
puts "metadata.timestamp : #{metadata.timestamp.inspect}"
puts "metadata.type : #{metadata.type}"
puts "metadata.delivery_tag: #{metadata.delivery_tag}"
puts "metadata.redelivered : #{metadata.redelivered}"
puts "metadata.app_id : #{metadata.app_id}"
puts "metadata.exchange : #{metadata.exchange}"
puts
puts "Received a message: #{payload}. Disconnecting..."
connection.close {
EventMachine.stop { exit }
}
end
exchange.publish("Hello, world!",
:app_id => "amqpgem.example",
:priority => 8,
:type => "kinda.checkin",
:headers => {
:coordinates => {
:latitude => 59.35,
:longitude => 18.066667
},
:participants => 11,
:venue => "Stockholm"
},
:timestamp => Time.now.to_i)
end
Using object as consumer (message handler), take one
class Consumer
def initialize(channel, queue_name = AMQ::Protocol::EMPTY_STRING)
@queue_name = queue_name
@channel = channel
@channel.on_error(&method(:handle_channel_exception))
end
def start
@queue = @channel.queue(@queue_name, :exclusive => true)
@queue.subscribe(&method(:handle_message))
end
def handle_message(metadata, payload)
puts "Received a message: #{payload}, content_type = #{metadata.content_type}"
end
def handle_channel_exception(channel, channel_close)
puts "Oops... a channel-level exception: code = #{channel_close.reply_code}, message = #{channel_close.reply_text}"
end end
Using object as consumer (message handler), take two: aggregatied handler
class Consumer
def handle_message(metadata, payload)
puts "Received a message: #{payload}, content_type = #{metadata.content_type}"
end end
class Worker
def initialize(channel, queue_name = AMQ::Protocol::EMPTY_STRING, consumer = Consumer.new)
@queue_name = queue_name
@channel = channel
@channel.on_error(&method(:handle_channel_exception))
@consumer = consumer
end
def start
@queue = @channel.queue(@queue_name, :exclusive => true)
@queue.subscribe(&@consumer.method(:handle_message))
end
def handle_channel_exception(channel, channel_close)
puts "Oops... a channel-level exception: code = #{channel_close.reply_code}, message = #{channel_close.reply_text}"
end end
Unit-testing objects that are used as consumers, RSpec style
require "ostruct"
require "json"
describe Consumer do
describe "when a new message arrives" do
subject { described_class.new }
let(:metadata) do
o = OpenStruct.new
o.content_type = "application/json"
o
end
let(:payload) { JSON.encode({ :command => "reload_config" }) }
it "does some useful work" do
subject.handle_message(metadata, payload)
end
end
end