Class: Bunny::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/bunny/queue.rb

Overview

Represents AMQP 0.9.1 queue.

See Also:

Instance Attribute Summary (collapse)

Instance Method Summary (collapse)

Constructor Details

- (Queue) initialize(channel_or_connection, name = AMQ::Protocol::EMPTY_STRING, opts = {})

Returns a new instance of Queue

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should this queue be durable?

  • :auto_delete (Boolean) — default: false

    Should this queue be automatically deleted when the last consumer disconnects?

  • :exclusive (Boolean) — default: false

    Should this queue be exclusive (only can be used by this connection, removed when the connection is closed)?

  • :arguments (Boolean) — default: {}

    Additional optional arguments (typically used by RabbitMQ extensions and plugins)

See Also:



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/bunny/queue.rb', line 39

def initialize(channel_or_connection, name = AMQ::Protocol::EMPTY_STRING, opts = {})
  # old Bunny versions pass a connection here. In that case,
  # we just use default channel from it. MK.
  @channel          = channel_from(channel_or_connection)
  @name             = name
  @options          = self.class.add_default_options(name, opts)
  @consumers        = Hash.new

  @durable          = @options[:durable]
  @exclusive        = @options[:exclusive]
  @server_named     = @name.empty?
  @auto_delete      = @options[:auto_delete]
  @arguments        = @options[:arguments]

  @bindings         = Array.new

  @default_consumer = nil

  declare! unless opts[:no_declare]

  @channel.register_queue(self)
end

Instance Attribute Details

- (Bunny::Channel) channel (readonly)



19
20
21
# File 'lib/bunny/queue.rb', line 19

def channel
  @channel
end

- (String) name (readonly)



21
22
23
# File 'lib/bunny/queue.rb', line 21

def name
  @name
end

- (Hash) options (readonly)



23
24
25
# File 'lib/bunny/queue.rb', line 23

def options
  @options
end

Instance Method Details

- (Hash) arguments

Returns Additional optional arguments (typically used by RabbitMQ extensions and plugins)



92
93
94
# File 'lib/bunny/queue.rb', line 92

def arguments
  @arguments
end

- (Boolean) auto_delete?

Returns true if this queue was declared as automatically deleted (deleted as soon as last consumer unbinds).



79
80
81
# File 'lib/bunny/queue.rb', line 79

def auto_delete?
  @auto_delete
end

- (Object) bind(exchange, opts = {})

Binds queue to an exchange

Options Hash (opts):

  • :routing_key (String)

    Routing key

  • :arguments (Hash) — default: {}

    Additional optional binding arguments

See Also:



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/bunny/queue.rb', line 107

def bind(exchange, opts = {})
  @channel.queue_bind(@name, exchange, opts)

  exchange_name = if exchange.respond_to?(:name)
                    exchange.name
                  else
                    exchange
                  end


  # store bindings for automatic recovery. We need to be very careful to
  # not cause an infinite rebinding loop here when we recover. MK.
  binding = { :exchange => exchange_name, :routing_key => (opts[:routing_key] || opts[:key]), :arguments => opts[:arguments] }
  @bindings.push(binding) unless @bindings.include?(binding)

  self
end

- (Integer) consumer_count



308
309
310
311
# File 'lib/bunny/queue.rb', line 308

def consumer_count
  s = self.status
  s[:consumer_count]
end

- (Object) delete(opts = {})

Deletes the queue

Options Hash (opts):

  • if_unused (Boolean) — default: false

    Should this queue be deleted only if it has no consumers?

  • if_empty (Boolean) — default: false

    Should this queue be deleted only if it has no messages?

See Also:



278
279
280
281
# File 'lib/bunny/queue.rb', line 278

def delete(opts = {})
  @channel.deregister_queue(self)
  @channel.queue_delete(@name, opts)
end

- (Boolean) durable?

Returns true if this queue was declared as durable (will survive broker restart).



65
66
67
# File 'lib/bunny/queue.rb', line 65

def durable?
  @durable
end

- (Boolean) exclusive?

Returns true if this queue was declared as exclusive (limited to just one consumer)



72
73
74
# File 'lib/bunny/queue.rb', line 72

def exclusive?
  @exclusive
end

- (Integer) message_count



302
303
304
305
# File 'lib/bunny/queue.rb', line 302

def message_count
  s = self.status
  s[:message_count]
end

- (Array) pop(opts = {:ack => false}, &block) Also known as: get

Returns Triple of delivery info, message properties and message content. If the queue is empty, all three will be nils.

Examples:

conn = Bunny.new
conn.start

ch   = conn.create_channel
q = ch.queue("test1")
x = ch.default_exchange
x.publish("Hello, everybody!", :routing_key => 'test1')

delivery_info, properties, payload = q.pop

puts "This is the message: " + payload + "\n\n"
conn.close

Options Hash (opts):

  • :ack (Boolean) — default: false

    Will the message be acknowledged manually?

See Also:



232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
# File 'lib/bunny/queue.rb', line 232

def pop(opts = {:ack => false}, &block)
  get_response, properties, content = @channel.basic_get(@name, opts)

  if block
    if properties
      di = GetResponse.new(get_response, properties, @channel)
      mp = MessageProperties.new(properties)

      block.call(di, mp, content)
    else
      block.call(nil, nil, nil)
    end
  else
    if properties
      di = GetResponse.new(get_response, properties, @channel)
      mp = MessageProperties.new(properties)
      [di, mp, content]
    else
      [nil, nil, nil]
    end
  end
end

- (Object) publish(payload, opts = {})

Publishes a message to the queue via default exchange. Takes the same arguments as Exchange#publish



262
263
264
265
266
# File 'lib/bunny/queue.rb', line 262

def publish(payload, opts = {})
  @channel.default_exchange.publish(payload, opts.merge(:routing_key => @name))

  self
end

- (Object) purge(opts = {})

Purges a queue (removes all messages from it)



286
287
288
289
290
# File 'lib/bunny/queue.rb', line 286

def purge(opts = {})
  @channel.queue_purge(@name, opts)

  self
end

- (Boolean) server_named?

Returns true if this queue was declared as server named.



86
87
88
# File 'lib/bunny/queue.rb', line 86

def server_named?
  @server_named
end

- (Hash) status

Returns A hash with information about the number of queue messages and consumers



295
296
297
298
299
# File 'lib/bunny/queue.rb', line 295

def status
  queue_declare_ok = @channel.queue_declare(@name, @options.merge(:passive => true))
  {:message_count => queue_declare_ok.message_count,
    :consumer_count => queue_declare_ok.consumer_count}
end

- (Object) subscribe(opts = { :consumer_tag => @channel.generate_consumer_tag, :ack => false, :exclusive => false, :block => false, :on_cancellation => nil }, &block)

Adds a consumer to the queue (subscribes for message deliveries).

Options Hash (opts):

  • :manual_ack (Boolean) — default: false

    Will this consumer use manual acknowledgements?

  • :exclusive (Boolean) — default: false

    Should this consumer be exclusive for this queue?

  • :block (Boolean) — default: false

    Should the call block calling thread?

  • :on_cancellation (#call)

    Block to execute when this consumer is cancelled remotely (e.g. via the RabbitMQ Management plugin)

  • :consumer_tag (String)

    Unique consumer identifier. It is usually recommended to let Bunny generate it for you.

  • :arguments (Hash) — default: {}

    Additional (optional) arguments, typically used by RabbitMQ extensions

See Also:



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/bunny/queue.rb', line 164

def subscribe(opts = {
                :consumer_tag    => @channel.generate_consumer_tag,
                :ack             => false,
                :exclusive       => false,
                :block           => false,
                :on_cancellation => nil
              }, &block)

  ctag       = opts.fetch(:consumer_tag, @channel.generate_consumer_tag)
  consumer   = Consumer.new(@channel,
                            self,
                            ctag,
                            !(opts[:ack] || opts[:manual_ack]),
                            opts[:exclusive],
                            opts[:arguments])

  consumer.on_delivery(&block)
  consumer.on_cancellation(&opts[:on_cancellation]) if opts[:on_cancellation]

  @channel.basic_consume_with(consumer)
  if opts[:block]
    # joins current thread with the consumers pool, will block
    # the current thread for as long as the consumer pool is active
    @channel.work_pool.join
  end

  consumer
end

- (Object) subscribe_with(consumer, opts = {:block => false})

Adds a consumer object to the queue (subscribes for message deliveries).

Options Hash (opts):

  • block (Boolean) — default: false

    Should the call block calling thread?

See Also:



202
203
204
205
206
207
# File 'lib/bunny/queue.rb', line 202

def subscribe_with(consumer, opts = {:block => false})
  @channel.basic_consume_with(consumer)

  @channel.work_pool.join if opts[:block]
  consumer
end

- (Object) unbind(exchange, opts = {})

Unbinds queue from an exchange

Options Hash (opts):

  • :routing_key (String)

    Routing key

  • :arguments (Hash) — default: {}

    Additional optional binding arguments

See Also:



136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/bunny/queue.rb', line 136

def unbind(exchange, opts = {})
  @channel.queue_unbind(@name, exchange, opts)

  exchange_name = if exchange.respond_to?(:name)
                    exchange.name
                  else
                    exchange
                  end


  @bindings.delete_if { |b| b[:exchange] == exchange_name && b[:routing_key] == (opts[:routing_key] || opts[:key]) && b[:arguments] == opts[:arguments] }

  self
end