Class: Bunny::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/bunny/queue.rb

Overview

Represents AMQP 0.9.1 queue.

See Also:

Instance Attribute Summary (collapse)

Instance Method Summary (collapse)

Constructor Details

- (Queue) initialize(channel_or_connection, name = AMQ::Protocol::EMPTY_STRING, opts = {})

Returns a new instance of Queue

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should this queue be durable?

  • :auto_delete (Boolean) — default: false

    Should this queue be automatically deleted when the last consumer disconnects?

  • :exclusive (Boolean) — default: false

    Should this queue be exclusive (only can be used by this connection, removed when the connection is closed)?

  • :arguments (Boolean) — default: {}

    Additional optional arguments (typically used by RabbitMQ extensions and plugins)

See Also:



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/bunny/queue.rb', line 39

def initialize(channel_or_connection, name = AMQ::Protocol::EMPTY_STRING, opts = {})
  # old Bunny versions pass a connection here. In that case,
  # we just use default channel from it. MK.
  @channel          = channel_from(channel_or_connection)
  @name             = name
  @options          = self.class.add_default_options(name, opts)
  @consumers        = Hash.new

  @durable          = @options[:durable]
  @exclusive        = @options[:exclusive]
  @server_named     = @name.empty?
  @auto_delete      = @options[:auto_delete]
  @arguments        = @options[:arguments]

  @bindings         = Array.new

  @default_consumer = nil

  declare! unless opts[:no_declare]

  @channel.register_queue(self)
end

Instance Attribute Details

- (Bunny::Channel) channel (readonly)



19
20
21
# File 'lib/bunny/queue.rb', line 19

def channel
  @channel
end

- (String) name (readonly)



21
22
23
# File 'lib/bunny/queue.rb', line 21

def name
  @name
end

- (Hash) options (readonly)



23
24
25
# File 'lib/bunny/queue.rb', line 23

def options
  @options
end

Instance Method Details

- (Hash) arguments

Returns Additional optional arguments (typically used by RabbitMQ extensions and plugins)



92
93
94
# File 'lib/bunny/queue.rb', line 92

def arguments
  @arguments
end

- (Boolean) auto_delete?

Returns true if this queue was declared as automatically deleted (deleted as soon as last consumer unbinds).



79
80
81
# File 'lib/bunny/queue.rb', line 79

def auto_delete?
  @auto_delete
end

- (Object) bind(exchange, opts = {})

Binds queue to an exchange

Options Hash (opts):

  • :routing_key (String)

    Routing key

  • :arguments (Hash) — default: {}

    Additional optional binding arguments

See Also:



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/bunny/queue.rb', line 107

def bind(exchange, opts = {})
  @channel.queue_bind(@name, exchange, opts)

  exchange_name = if exchange.respond_to?(:name)
                    exchange.name
                  else
                    exchange
                  end


  # store bindings for automatic recovery. We need to be very careful to
  # not cause an infinite rebinding loop here when we recover. MK.
  binding = { :exchange => exchange_name, :routing_key => (opts[:routing_key] || opts[:key]), :arguments => opts[:arguments] }
  @bindings.push(binding) unless @bindings.include?(binding)

  self
end

- (Integer) consumer_count



320
321
322
323
# File 'lib/bunny/queue.rb', line 320

def consumer_count
  s = self.status
  s[:consumer_count]
end

- (Object) delete(opts = {})

Deletes the queue

Options Hash (opts):

  • if_unused (Boolean) — default: false

    Should this queue be deleted only if it has no consumers?

  • if_empty (Boolean) — default: false

    Should this queue be deleted only if it has no messages?

See Also:



290
291
292
293
# File 'lib/bunny/queue.rb', line 290

def delete(opts = {})
  @channel.deregister_queue(self)
  @channel.queue_delete(@name, opts)
end

- (Boolean) durable?

Returns true if this queue was declared as durable (will survive broker restart).



65
66
67
# File 'lib/bunny/queue.rb', line 65

def durable?
  @durable
end

- (Boolean) exclusive?

Returns true if this queue was declared as exclusive (limited to just one consumer)



72
73
74
# File 'lib/bunny/queue.rb', line 72

def exclusive?
  @exclusive
end

- (Integer) message_count



314
315
316
317
# File 'lib/bunny/queue.rb', line 314

def message_count
  s = self.status
  s[:message_count]
end

- (Array) pop(opts = {:manual_ack => false}, &block) Also known as: get

Returns Triple of delivery info, message properties and message content. If the queue is empty, all three will be nils.

Examples:

conn = Bunny.new
conn.start

ch   = conn.create_channel
q = ch.queue("test1")
x = ch.default_exchange
x.publish("Hello, everybody!", :routing_key => 'test1')

delivery_info, properties, payload = q.pop

puts "This is the message: " + payload + "\n\n"
conn.close

Options Hash (opts):

  • :ack (Boolean) — default: false

    [DEPRECATED] Use :manual_ack instead

  • :manual_ack (Boolean) — default: false

    Will the message be acknowledged manually?

See Also:



239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
# File 'lib/bunny/queue.rb', line 239

def pop(opts = {:manual_ack => false}, &block)
  unless opts[:ack].nil?
    warn "[DEPRECATION] `:ack` is deprecated.  Please use `:manual_ack` instead."
    opts[:manual_ack] = opts[:ack]
  end

  get_response, properties, content = @channel.basic_get(@name, opts)

  if block
    if properties
      di = GetResponse.new(get_response, properties, @channel)
      mp = MessageProperties.new(properties)

      block.call(di, mp, content)
    else
      block.call(nil, nil, nil)
    end
  else
    if properties
      di = GetResponse.new(get_response, properties, @channel)
      mp = MessageProperties.new(properties)
      [di, mp, content]
    else
      [nil, nil, nil]
    end
  end
end

- (Object) publish(payload, opts = {})

Publishes a message to the queue via default exchange. Takes the same arguments as Exchange#publish



274
275
276
277
278
# File 'lib/bunny/queue.rb', line 274

def publish(payload, opts = {})
  @channel.default_exchange.publish(payload, opts.merge(:routing_key => @name))

  self
end

- (Object) purge(opts = {})

Purges a queue (removes all messages from it)



298
299
300
301
302
# File 'lib/bunny/queue.rb', line 298

def purge(opts = {})
  @channel.queue_purge(@name, opts)

  self
end

- (Boolean) server_named?

Returns true if this queue was declared as server named.



86
87
88
# File 'lib/bunny/queue.rb', line 86

def server_named?
  @server_named
end

- (Hash) status

Returns A hash with information about the number of queue messages and consumers



307
308
309
310
311
# File 'lib/bunny/queue.rb', line 307

def status
  queue_declare_ok = @channel.queue_declare(@name, @options.merge(:passive => true))
  {:message_count => queue_declare_ok.message_count,
    :consumer_count => queue_declare_ok.consumer_count}
end

- (Object) subscribe(opts = { :consumer_tag => @channel.generate_consumer_tag, :manual_ack => false, :exclusive => false, :block => false, :on_cancellation => nil }, &block)

Adds a consumer to the queue (subscribes for message deliveries).

Options Hash (opts):

  • :ack (Boolean) — default: false

    [DEPRECATED] Use :manual_ack instead

  • :manual_ack (Boolean) — default: false

    Will this consumer use manual acknowledgements?

  • :exclusive (Boolean) — default: false

    Should this consumer be exclusive for this queue?

  • :block (Boolean) — default: false

    Should the call block calling thread?

  • :on_cancellation (#call)

    Block to execute when this consumer is cancelled remotely (e.g. via the RabbitMQ Management plugin)

  • :consumer_tag (String)

    Unique consumer identifier. It is usually recommended to let Bunny generate it for you.

  • :arguments (Hash) — default: {}

    Additional (optional) arguments, typically used by RabbitMQ extensions

See Also:



165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/bunny/queue.rb', line 165

def subscribe(opts = {
                :consumer_tag    => @channel.generate_consumer_tag,
                :manual_ack      => false,
                :exclusive       => false,
                :block           => false,
                :on_cancellation => nil
              }, &block)

  unless opts[:ack].nil?
    warn "[DEPRECATION] `:ack` is deprecated.  Please use `:manual_ack` instead."
    opts[:manual_ack] = opts[:ack]
  end

  ctag       = opts.fetch(:consumer_tag, @channel.generate_consumer_tag)
  consumer   = Consumer.new(@channel,
                            self,
                            ctag,
                            !opts[:manual_ack],
                            opts[:exclusive],
                            opts[:arguments])

  consumer.on_delivery(&block)
  consumer.on_cancellation(&opts[:on_cancellation]) if opts[:on_cancellation]

  @channel.basic_consume_with(consumer)
  if opts[:block]
    # joins current thread with the consumers pool, will block
    # the current thread for as long as the consumer pool is active
    @channel.work_pool.join
  end

  consumer
end

- (Object) subscribe_with(consumer, opts = {:block => false})

Adds a consumer object to the queue (subscribes for message deliveries).

Options Hash (opts):

  • block (Boolean) — default: false

    Should the call block calling thread?

See Also:



208
209
210
211
212
213
# File 'lib/bunny/queue.rb', line 208

def subscribe_with(consumer, opts = {:block => false})
  @channel.basic_consume_with(consumer)

  @channel.work_pool.join if opts[:block]
  consumer
end

- (Object) unbind(exchange, opts = {})

Unbinds queue from an exchange

Options Hash (opts):

  • :routing_key (String)

    Routing key

  • :arguments (Hash) — default: {}

    Additional optional binding arguments

See Also:



136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/bunny/queue.rb', line 136

def unbind(exchange, opts = {})
  @channel.queue_unbind(@name, exchange, opts)

  exchange_name = if exchange.respond_to?(:name)
                    exchange.name
                  else
                    exchange
                  end


  @bindings.delete_if { |b| b[:exchange] == exchange_name && b[:routing_key] == (opts[:routing_key] || opts[:key]) && b[:arguments] == opts[:arguments] }

  self
end