Class: Bunny::Queue

Inherits:
Object
Defined in:
lib/bunny/queue.rb

Overview

Represents an AMQP 0.9.1 queue.

Constructor Details

- (Queue) initialize(channel_or_connection, name = AMQ::Protocol::EMPTY_STRING, opts = {})

Returns a new instance of Queue

Parameters:

  • channel_or_connection (Bunny::Channel)

    Channel this queue will use. Session instances are supported only for backwards compatibility with 0.8.

  • name (String) (defaults to: AMQ::Protocol::EMPTY_STRING)

    Queue name. Pass an empty string to make RabbitMQ generate a unique one.

  • opts (Hash) (defaults to: {})

    Queue properties

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should this queue be durable?

  • :auto_delete (Boolean) — default: false

    Should this queue be automatically deleted when the last consumer disconnects?

  • :exclusive (Boolean) — default: false

    Should this queue be exclusive (can only be used by this connection, removed when the connection is closed)?

  • :arguments (Hash) — default: {}

    Additional optional arguments (typically used by RabbitMQ extensions and plugins)

# File 'lib/bunny/queue.rb', line 38

def initialize(channel_or_connection, name = AMQ::Protocol::EMPTY_STRING, opts = {})
  # old Bunny versions pass a connection here. In that case,
  # we just use default channel from it. MK.
  @channel          = channel_from(channel_or_connection)
  @name             = name
  @options          = self.class.add_default_options(name, opts)
  @consumers        = Hash.new

  @durable          = @options[:durable]
  @exclusive        = @options[:exclusive]
  @server_named     = @name.empty?
  @auto_delete      = @options[:auto_delete]
  @arguments        = @options[:arguments]

  @bindings         = Array.new

  @default_consumer = nil

  declare! unless opts[:no_declare]

  @channel.register_queue(self)
end
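
As a rough illustration of how the constructor derives its flags, the sketch below mirrors the assignments above without talking to a broker. describe_queue and DEFAULTS are hypothetical names. The real Queue.add_default_options helper is not shown on this page, so the merge here is an assumption based only on the documented defaults.

```ruby
# Hypothetical stand-in for the option handling in Queue#initialize.
# DEFAULTS is assumed from the documented defaults, not taken from Bunny.
DEFAULTS = {
  :durable     => false,
  :exclusive   => false,
  :auto_delete => false,
  :arguments   => {}
}.freeze

def describe_queue(name = "", opts = {})
  options = DEFAULTS.merge(opts)
  {
    :name         => name,
    :server_named => name.empty?, # mirrors @server_named = @name.empty?
    :durable      => options[:durable],
    :exclusive    => options[:exclusive],
    :auto_delete  => options[:auto_delete],
    :arguments    => options[:arguments]
  }
end

# An empty name asks the broker to generate one, so the queue is server-named.
p describe_queue("", :exclusive => true)
p describe_queue("orders", :durable => true)
```

With a real channel the equivalent calls would go through the constructor shown above, for example Bunny::Queue.new(ch, "", :exclusive => true).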

Instance Attribute Details

- (Bunny::Channel) channel (readonly)

Returns Channel this queue uses

Returns:

  • (Bunny::Channel)

    Channel this queue uses

# File 'lib/bunny/queue.rb', line 18

def channel
  @channel
end

- (String) name (readonly)

Returns Queue name

Returns:

  • (String)

    Queue name



# File 'lib/bunny/queue.rb', line 20

def name
  @name
end

- (Hash) options (readonly)

Returns Options this queue was created with

Returns:

  • (Hash)

    Options this queue was created with



# File 'lib/bunny/queue.rb', line 22

def options
  @options
end

Instance Method Details

- (Hash) arguments

Returns Additional optional arguments (typically used by RabbitMQ extensions and plugins)

Returns:

  • (Hash)

    Additional optional arguments (typically used by RabbitMQ extensions and plugins)



# File 'lib/bunny/queue.rb', line 91

def arguments
  @arguments
end

- (Boolean) auto_delete?

Returns true if this queue was declared as automatically deleted (deleted as soon as its last consumer unsubscribes).

Returns:

  • (Boolean)

    true if this queue was declared as automatically deleted (deleted as soon as its last consumer unsubscribes).

# File 'lib/bunny/queue.rb', line 78

def auto_delete?
  @auto_delete
end

- (Object) bind(exchange, opts = {})

Binds queue to an exchange

Parameters:

  • exchange (Bunny::Exchange, String)

    Exchange to bind to

  • opts (Hash) (defaults to: {})

    Binding properties

Options Hash (opts):

  • :routing_key (String)

    Routing key

  • :arguments (Hash) — default: {}

    Additional optional binding arguments

# File 'lib/bunny/queue.rb', line 106

def bind(exchange, opts = {})
  @channel.queue_bind(@name, exchange, opts)

  exchange_name = if exchange.respond_to?(:name)
                    exchange.name
                  else
                    exchange
                  end


  # store bindings for automatic recovery. We need to be very careful to
  # not cause an infinite rebinding loop here when we recover. MK.
  binding = { :exchange => exchange_name, :routing_key => (opts[:routing_key] || opts[:key]), :arguments => opts[:arguments] }
  @bindings.push(binding) unless @bindings.include?(binding)

  self
end

- (Integer) consumer_count

Returns How many active consumers the queue has

Returns:

  • (Integer)

    How many active consumers the queue has



# File 'lib/bunny/queue.rb', line 311

def consumer_count
  s = self.status
  s[:consumer_count]
end

- (Object) delete(opts = {})

Deletes the queue

Parameters:

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • :if_unused (Boolean) — default: false

    Should this queue be deleted only if it has no consumers?

  • :if_empty (Boolean) — default: false

    Should this queue be deleted only if it has no messages?

# File 'lib/bunny/queue.rb', line 281

def delete(opts = {})
  @channel.deregister_queue(self)
  @channel.queue_delete(@name, opts)
end

- (Boolean) durable?

Returns true if this queue was declared as durable (will survive broker restart).

Returns:

  • (Boolean)

    true if this queue was declared as durable (will survive broker restart).

# File 'lib/bunny/queue.rb', line 64

def durable?
  @durable
end

- (Boolean) exclusive?

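Returns true if this queue was declared as exclusive (can only be used by the connection that declared it, and is removed when that connection is closed)

Returns:

  • (Boolean)

    true if this queue was declared as exclusive (can only be used by the connection that declared it, and is removed when that connection is closed)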

# File 'lib/bunny/queue.rb', line 71

def exclusive?
  @exclusive
end

- (Integer) message_count

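Returns How many messages the queue has ready (i.e. not yet delivered to a consumer; delivered but unacknowledged messages are not counted)

Returns:

  • (Integer)

    How many messages the queue has ready (i.e. not yet delivered to a consumer; delivered but unacknowledged messages are not counted)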



# File 'lib/bunny/queue.rb', line 305

def message_count
  s = self.status
  s[:message_count]
end

- (Array) pop(opts = {:ack => false}, &block) Also known as: get

Returns Triple of delivery info, message properties and message content. If the queue is empty, all three will be nil.

Examples:

conn = Bunny.new
conn.start

ch = conn.create_channel
q  = ch.queue("test1")
x  = ch.default_exchange
x.publish("Hello, everybody!", :routing_key => 'test1')

# payload is nil if the queue is empty
delivery_info, properties, payload = q.pop

# interpolation avoids a TypeError when payload is nil
puts "This is the message: #{payload}\n\n"
conn.close

Parameters:

  • opts (Hash) (defaults to: {:ack => false})

    Options

Options Hash (opts):

  • :ack (Boolean) — default: false

    Will the message be acknowledged manually?

Returns:

  • (Array)

    Triple of delivery info, message properties and message content. If the queue is empty, all three will be nil.

# File 'lib/bunny/queue.rb', line 231

def pop(opts = {:ack => false}, &block)
  delivery_info, properties, content = @channel.basic_get(@name, opts)

  if block
    block.call(delivery_info, properties, content)
  else
    [delivery_info, properties, content]
  end
end
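
Because an empty queue yields three nils, a caller that wants to drain a queue can loop until the payload comes back nil. The following is a minimal sketch of that pattern. FakeChannel is a hypothetical stand-in for Bunny::Channel so the code runs without a broker, and drain is an illustrative helper, not part of Bunny.

```ruby
# Hypothetical stand-in for Bunny::Channel; only basic_get is imitated.
class FakeChannel
  def initialize(messages)
    @messages = messages.dup
  end

  # Returns [delivery_info, properties, payload], or three nils when empty.
  def basic_get(_queue_name, _opts = {})
    payload = @messages.shift
    return [nil, nil, nil] if payload.nil?

    [{ :routing_key => "test1" }, { :content_type => "text/plain" }, payload]
  end
end

# Fetches messages one at a time until the queue reports it is empty.
def drain(channel, queue_name)
  payloads = []
  loop do
    _delivery_info, _properties, payload = channel.basic_get(queue_name, :ack => false)
    break if payload.nil?

    payloads << payload
  end
  payloads
end

p drain(FakeChannel.new(["first", "second"]), "test1")
```

Against a real queue the loop body would call q.pop instead of channel.basic_get, which is what pop delegates to in the listing above.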

- (Hash) pop_as_hash(opts = {:ack => false}, &block)

Deprecated.

Version of #pop that returns data in legacy format (as a hash).

Returns:

  • (Hash)


# File 'lib/bunny/queue.rb', line 246

def pop_as_hash(opts = {:ack => false}, &block)
  delivery_info, properties, content = @channel.basic_get(@name, opts)

  result = {:header => properties, :payload => content, :delivery_details => delivery_info}

  if block
    block.call(result)
  else
    result
  end
end

- (Object) publish(payload, opts = {})

Publishes a message to the queue via the default exchange. Takes the same arguments as Exchange#publish.



# File 'lib/bunny/queue.rb', line 265

def publish(payload, opts = {})
  @channel.default_exchange.publish(payload, opts.merge(:routing_key => @name))

  self
end
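
One consequence of the opts.merge call above is that the queue name always wins: a :routing_key supplied by the caller is overwritten. The sketch below isolates just that merge. publish_options is a hypothetical helper name used only for illustration.

```ruby
# Mirrors opts.merge(:routing_key => @name) from Queue#publish.
# The right-hand hash takes precedence, so the queue name replaces
# any :routing_key the caller passed in.
def publish_options(queue_name, opts = {})
  opts.merge(:routing_key => queue_name)
end

caller_opts = { :routing_key => "somewhere.else", :persistent => true }
p publish_options("test1", caller_opts)
p caller_opts # merge returns a new hash, the caller's hash is unchanged
```

To publish with a different routing key, publish on an exchange directly rather than through the queue.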

- (Object) purge(opts = {})

Purges a queue (removes all messages from it)



# File 'lib/bunny/queue.rb', line 289

def purge(opts = {})
  @channel.queue_purge(@name, opts)

  self
end

- (Boolean) server_named?

Returns true if this queue was declared as server named.

Returns:

  • (Boolean)

    true if this queue was declared as server named.

# File 'lib/bunny/queue.rb', line 85

def server_named?
  @server_named
end

- (Hash) status

Returns A hash with information about the number of queue messages and consumers

Returns:

  • (Hash)

    A hash with information about the number of queue messages and consumers

# File 'lib/bunny/queue.rb', line 298

def status
  queue_declare_ok = @channel.queue_declare(@name, @options.merge(:passive => true))
  {:message_count => queue_declare_ok.message_count,
    :consumer_count => queue_declare_ok.consumer_count}
end
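
Since message_count and consumer_count each call status, and status performs a passive declare, reading both counters separately costs two round trips. A sketch of reading them from a single status call follows. CountingChannel, DeclareOk and status_of are hypothetical stand-ins so the example runs without a broker, and the counts returned are made up.

```ruby
# Hypothetical reply object with the two fields status reads.
DeclareOk = Struct.new(:message_count, :consumer_count)

# Hypothetical channel that counts how often it is asked to declare.
class CountingChannel
  attr_reader :declare_calls

  def initialize
    @declare_calls = 0
  end

  def queue_declare(_name, _opts = {})
    @declare_calls += 1
    DeclareOk.new(3, 1) # made-up counts for illustration
  end
end

# Mirrors Queue#status: one passive declare, two values extracted.
def status_of(channel, name, options = {})
  ok = channel.queue_declare(name, options.merge(:passive => true))
  { :message_count => ok.message_count, :consumer_count => ok.consumer_count }
end

channel = CountingChannel.new
status  = status_of(channel, "test1")
puts "messages: #{status[:message_count]}, consumers: #{status[:consumer_count]}"
puts "declare calls: #{channel.declare_calls}"
```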

- (Object) subscribe(opts = { :consumer_tag => @channel.generate_consumer_tag, :ack => false, :exclusive => false, :block => false, :on_cancellation => nil }, &block)

Adds a consumer to the queue (subscribes for message deliveries).

Parameters:

  • opts (Hash) (defaults to: { :consumer_tag => @channel.generate_consumer_tag, :ack => false, :exclusive => false, :block => false, :on_cancellation => nil })

    Options

Options Hash (opts):

  • :manual_ack (Boolean) — default: false

    Will this consumer use manual acknowledgements? The :ack key is accepted as an equivalent.

  • :exclusive (Boolean) — default: false

    Should this consumer be exclusive for this queue?

  • :block (Boolean) — default: false

    Should the call block the calling thread?

  • :on_cancellation (#call)

    Block to execute when this consumer is cancelled remotely (e.g. via the RabbitMQ Management plugin)

  • :consumer_tag (String)

    Unique consumer identifier. It is usually recommended to let Bunny generate it for you.

  • :arguments (Hash) — default: {}

    Additional (optional) arguments, typically used by RabbitMQ extensions

# File 'lib/bunny/queue.rb', line 163

def subscribe(opts = {
                :consumer_tag    => @channel.generate_consumer_tag,
                :ack             => false,
                :exclusive       => false,
                :block           => false,
                :on_cancellation => nil
              }, &block)

  ctag       = opts.fetch(:consumer_tag, @channel.generate_consumer_tag)
  consumer   = Consumer.new(@channel,
                            self,
                            ctag,
                            !(opts[:ack] || opts[:manual_ack]),
                            opts[:exclusive],
                            opts[:arguments])

  consumer.on_delivery(&block)
  consumer.on_cancellation(&opts[:on_cancellation]) if opts[:on_cancellation]

  @channel.basic_consume_with(consumer)
  if opts[:block]
    # joins current thread with the consumers pool, will block
    # the current thread for as long as the consumer pool is active
    @channel.work_pool.join
  end

  consumer
end
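
The fourth argument passed to Consumer.new above is computed from two option keys. The sketch below isolates that expression. It assumes the argument is the consumer's "no ack" (automatic acknowledgement) flag, which this page does not state explicitly. no_ack? is a hypothetical helper name.

```ruby
# Mirrors !(opts[:ack] || opts[:manual_ack]) from Queue#subscribe.
# Assumed meaning: true  => the broker treats deliveries as acknowledged
#                  false => the consumer must acknowledge manually
def no_ack?(opts = {})
  !(opts[:ack] || opts[:manual_ack])
end

p no_ack?                                       # neither key given
p no_ack?(:manual_ack => true)                  # manual acknowledgements requested
p no_ack?(:ack => false, :manual_ack => true)   # either key being truthy is enough
```

Either key switches the consumer to manual acknowledgements, so passing :ack => false alongside :manual_ack => true still results in manual mode.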

- (Object) subscribe_with(consumer, opts = {:block => false})

Adds a consumer object to the queue (subscribes for message deliveries).

Parameters:

  • consumer (Bunny::Consumer)

    An instance of a Consumer subclass that implements the consumer interface

  • opts (Hash) (defaults to: {:block => false})

    Options

Options Hash (opts):

  • :block (Boolean) — default: false

    Should the call block the calling thread?

# File 'lib/bunny/queue.rb', line 201

def subscribe_with(consumer, opts = {:block => false})
  @channel.basic_consume_with(consumer)

  @channel.work_pool.join if opts[:block]
  consumer
end

- (Object) unbind(exchange, opts = {})

Unbinds queue from an exchange

Parameters:

  • exchange (Bunny::Exchange, String)

    Exchange to unbind from

  • opts (Hash) (defaults to: {})

    Binding properties

Options Hash (opts):

  • :routing_key (String)

    Routing key

  • :arguments (Hash) — default: {}

    Additional optional binding arguments

# File 'lib/bunny/queue.rb', line 135

def unbind(exchange, opts = {})
  @channel.queue_unbind(@name, exchange, opts)

  exchange_name = if exchange.respond_to?(:name)
                    exchange.name
                  else
                    exchange
                  end


  @bindings.delete_if { |b| b[:exchange] == exchange_name && b[:routing_key] == (opts[:routing_key] || opts[:key]) && b[:arguments] == opts[:arguments] }

  self
end
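
The bookkeeping that bind and unbind perform on @bindings can be sketched in isolation. BindingLog and FakeExchange are hypothetical names, not part of Bunny. The sketch only reproduces the local record keeping and sends nothing to a broker.

```ruby
# Hypothetical stand-in for an exchange object that responds to #name.
FakeExchange = Struct.new(:name)

# Hypothetical class reproducing the @bindings handling of bind/unbind.
class BindingLog
  attr_reader :bindings

  def initialize
    @bindings = []
  end

  # Like bind: store the binding once, even if it is requested twice.
  def record(exchange, opts = {})
    entry = entry_for(exchange, opts)
    @bindings.push(entry) unless @bindings.include?(entry)
    self
  end

  # Like unbind: remove only entries whose exchange, routing key
  # and arguments all match.
  def forget(exchange, opts = {})
    target = entry_for(exchange, opts)
    @bindings.delete_if { |b| b == target }
    self
  end

  private

  # Accepts an exchange object or a plain name; :key is an alias
  # for :routing_key, as in the listings above.
  def entry_for(exchange, opts)
    name = exchange.respond_to?(:name) ? exchange.name : exchange
    { :exchange    => name,
      :routing_key => (opts[:routing_key] || opts[:key]),
      :arguments   => opts[:arguments] }
  end
end

log = BindingLog.new
log.record(FakeExchange.new("logs"), :routing_key => "info")
log.record("logs", :key => "info") # same binding, not stored twice
p log.bindings
```

Note that forgetting a binding with different :arguments than it was recorded with leaves the stored entry in place, because all three fields are compared.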