Class: Bunny::Queue
- Inherits: Object
  - Object
  - Bunny::Queue
- Defined in: lib/bunny/queue.rb
Overview
Represents an AMQP 0.9.1 queue.
Instance Attribute Summary
- (Bunny::Channel) channel (readonly)
  Channel this queue uses.
- (String) name (readonly)
  Queue name.
- (Hash) options (readonly)
  Options this queue was created with.
Instance Method Summary
- (Hash) arguments
  Additional optional arguments (typically used by RabbitMQ extensions and plugins).
- (Boolean) auto_delete?
  True if this queue was declared as automatically deleted (deleted as soon as the last consumer unsubscribes).
- (Object) bind(exchange, opts = {})
  Binds queue to an exchange.
- (Integer) consumer_count
  How many active consumers the queue has.
- (Object) delete(opts = {})
  Deletes the queue.
- (Boolean) durable?
  True if this queue was declared as durable (will survive broker restart).
- (Boolean) exclusive?
  True if this queue was declared as exclusive (usable only by the connection that declared it).
- (Queue) initialize(channel_or_connection, name = AMQ::Protocol::EMPTY_STRING, opts = {}) (constructor)
  A new instance of Queue.
- (Integer) message_count
  How many messages the queue has ready (i.e. not yet delivered; delivered but unacknowledged messages are not counted).
- (Array) pop(opts = {:ack => false}, &block) (also: #get)
  Triple of delivery info, message properties and message content.
- (Hash) pop_as_hash(opts = {:ack => false}, &block) (deprecated)
  Version of #pop that returns data in legacy format (as a hash).
- (Array) pop_waiting(opts = {:ack => false, :timeout => 1.0}, &block) (also: #get_waiting)
  Triple of delivery info, message properties and message content.
- (Object) publish(payload, opts = {})
  Publishes a message to the queue via default exchange.
- (Object) purge(opts = {})
  Purges a queue (removes all messages from it).
- (Boolean) server_named?
  True if this queue was declared as server named.
- (Hash) status
  A hash with information about the number of queue messages and consumers.
- (Object) subscribe(opts = { :consumer_tag => @channel.generate_consumer_tag, :ack => false, :exclusive => false, :block => false, :on_cancellation => nil }, &block)
  Adds a consumer to the queue (subscribes for message deliveries).
- (Object) subscribe_with(consumer, opts = {:block => false})
  Adds a consumer object to the queue (subscribes for message deliveries).
- (Object) unbind(exchange, opts = {})
  Unbinds queue from an exchange.
Constructor Details
- (Queue) initialize(channel_or_connection, name = AMQ::Protocol::EMPTY_STRING, opts = {})
A new instance of Queue
    # File 'lib/bunny/queue.rb', line 38
    def initialize(channel_or_connection, name = AMQ::Protocol::EMPTY_STRING, opts = {})
      # old Bunny versions pass a connection here. In that case,
      # we just use default channel from it. MK.
      @channel = channel_from(channel_or_connection)
      @name = name
      @options = self.class.(name, opts)
      @consumers = Hash.new

      @durable = @options[:durable]
      @exclusive = @options[:exclusive]
      @server_named = @name.empty?
      @auto_delete = @options[:auto_delete]
      @arguments = @options[:arguments]

      @bindings = Array.new

      @default_consumer = nil

      declare! unless opts[:no_declare]

      @channel.register_queue(self)
    end
Instance Attribute Details
- (Bunny::Channel) channel (readonly)
Channel this queue uses
    # File 'lib/bunny/queue.rb', line 18
    def channel
      @channel
    end
- (String) name (readonly)
Queue name
    # File 'lib/bunny/queue.rb', line 20
    def name
      @name
    end
- (Hash) options (readonly)
Options this queue was created with
    # File 'lib/bunny/queue.rb', line 22
    def options
      @options
    end
Instance Method Details
- (Hash) arguments
Additional optional arguments (typically used by RabbitMQ extensions and plugins)
    # File 'lib/bunny/queue.rb', line 91
    def arguments
      @arguments
    end
- (Boolean) auto_delete?
True if this queue was declared as automatically deleted (deleted as soon as the last consumer unsubscribes).
    # File 'lib/bunny/queue.rb', line 78
    def auto_delete?
      @auto_delete
    end
- (Object) bind(exchange, opts = {})
Binds queue to an exchange
    # File 'lib/bunny/queue.rb', line 107
    def bind(exchange, opts = {})
      @channel.queue_bind(@name, exchange, opts)

      exchange_name = if exchange.respond_to?(:name)
                        exchange.name
                      else
                        exchange
                      end

      # store bindings for automatic recovery. We need to be very careful to
      # not cause an infinite rebinding loop here when we recover. MK.
      binding = { :exchange => exchange_name, :routing_key => (opts[:routing_key] || opts[:key]), :arguments => opts[:arguments] }
      @bindings.push(binding) unless @bindings.include?(binding)

      self
    end
- (Integer) consumer_count
How many active consumers the queue has
    # File 'lib/bunny/queue.rb', line 360
    def consumer_count
      s = self.status
      s[:consumer_count]
    end
- (Object) delete(opts = {})
Deletes the queue
    # File 'lib/bunny/queue.rb', line 330
    def delete(opts = {})
      @channel.deregister_queue(self)
      @channel.queue_delete(@name, opts)
    end
- (Boolean) durable?
True if this queue was declared as durable (will survive broker restart).
    # File 'lib/bunny/queue.rb', line 64
    def durable?
      @durable
    end
- (Boolean) exclusive?
True if this queue was declared as exclusive (usable only by the connection that declared it).
    # File 'lib/bunny/queue.rb', line 71
    def exclusive?
      @exclusive
    end
- (Integer) message_count
How many messages the queue has ready (i.e. not yet delivered; delivered but unacknowledged messages are not counted).
    # File 'lib/bunny/queue.rb', line 354
    def message_count
      s = self.status
      s[:message_count]
    end
- (Array) pop(opts = {:ack => false}, &block) Also known as: get
Triple of delivery info, message properties and message content. If the queue is empty, all three will be nil. When a block is given, the triple is passed to the block and the block's result is returned.
    # File 'lib/bunny/queue.rb', line 232
    def pop(opts = {:ack => false}, &block)
      delivery_info, properties, content = @channel.basic_get(@name, opts)

      if block
        block.call(delivery_info, properties, content)
      else
        [delivery_info, properties, content]
      end
    end
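Because #pop signals an empty queue with nil content, a caller can loop on it until that happens. The following is a minimal sketch under that assumption; `drain` and its `limit` parameter are hypothetical names, not part of Bunny, and the helper accepts any object whose #pop returns the same triple.

```ruby
# Collects message payloads by calling #pop until the queue reports empty.
# `queue` only needs to respond to #pop the way Bunny::Queue#pop does.
# `limit` (hypothetical) caps the number of fetches so the loop always ends.
def drain(queue, limit = 1000)
  payloads = []
  limit.times do
    _delivery_info, _properties, content = queue.pop
    break if content.nil? # empty queue: all three values are nil
    payloads << content
  end
  payloads
end
```

Each #pop is a separate round trip to the broker, so for steady message flow a consumer registered with #subscribe is usually the better fit.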
- (Hash) pop_as_hash(opts = {:ack => false}, &block)
Version of #pop that returns data in legacy format (as a hash).
247 248 249 250 251 252 253 254 255 256 257 |
# File 'lib/bunny/queue.rb', line 247 def pop_as_hash(opts = {:ack => false}, &block) delivery_info, properties, content = @channel.basic_get(@name, opts) result = {:header => properties, :payload => content, :delivery_details => delivery_info} if block block.call(result) else result end end |
- (Array) pop_waiting(opts = {:ack => false, :timeout => 1.0}, &block) Also known as: get_waiting
Triple of delivery info, message properties and message content. Unlike #pop, this method keeps polling the queue until a message is available. If none arrives within :timeout seconds (1.0 by default), the wait is aborted with ClientTimeout rather than returning nils.
    # File 'lib/bunny/queue.rb', line 286
    def pop_waiting(opts = {:ack => false, :timeout => 1.0}, &block)
      delivery_info, properties, content = nil, nil, nil

      Bunny::Timer.timeout(opts[:timeout], ClientTimeout) do
        loop do
          delivery_info, properties, content = @channel.basic_get(@name, opts)

          if !content.nil?
            break
          end
        end
      end

      if block
        block.call(delivery_info, properties, content)
      else
        [delivery_info, properties, content]
      end
    end
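The poll-until-timeout pattern used by #pop_waiting can be tried out without a broker. The sketch below is an illustration only: it swaps in Ruby's standard Timeout module for Bunny::Timer, and `poll_until` and its `interval` parameter are hypothetical names. It also sleeps briefly between attempts, which the method above does not do.

```ruby
require "timeout"

# Calls the given block repeatedly until it returns a non-nil value,
# giving up with Timeout::Error once `seconds` have elapsed.
# Stdlib Timeout stands in for Bunny::Timer (an assumption of this sketch).
def poll_until(seconds, interval = 0.01, &fetch)
  Timeout.timeout(seconds) do
    loop do
      value = fetch.call
      return value unless value.nil?
      sleep interval # short pause so the loop does not spin at full speed
    end
  end
end
```

The pause trades a little latency for far fewer fetch attempts; whether that trade is right depends on how quickly messages are expected to arrive.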
- (Object) publish(payload, opts = {})
Publishes a message to the queue via default exchange. Takes the same arguments as Exchange#publish, except that :routing_key is always set to the queue name.
    # File 'lib/bunny/queue.rb', line 314
    def publish(payload, opts = {})
      @channel.default_exchange.publish(payload, opts.merge(:routing_key => @name))

      self
    end
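The `opts.merge(:routing_key => @name)` call means a caller-supplied :routing_key never survives, because Hash#merge lets the argument's values win. A small stand-alone sketch of that behavior follows; `publish_options` is a hypothetical helper name used only for illustration.

```ruby
# Mirrors the option handling in #publish: the queue name always replaces
# any :routing_key the caller passed. Hash#merge returns a new hash, so the
# caller's own options hash is left untouched.
def publish_options(queue_name, opts = {})
  opts.merge(:routing_key => queue_name)
end

caller_opts = { :routing_key => "ignored", :persistent => true }
effective   = publish_options("jobs", caller_opts)
```

To route on a different key, publish through an exchange object directly instead of through the queue.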
- (Object) purge(opts = {})
Purges a queue (removes all messages from it)
    # File 'lib/bunny/queue.rb', line 338
    def purge(opts = {})
      @channel.queue_purge(@name, opts)

      self
    end
- (Boolean) server_named?
True if this queue was declared as server named.
    # File 'lib/bunny/queue.rb', line 85
    def server_named?
      @server_named
    end
- (Hash) status
A hash with information about the number of queue messages and consumers
    # File 'lib/bunny/queue.rb', line 347
    def status
      queue_declare_ok = @channel.queue_declare(@name, @options.merge(:passive => true))
      {:message_count => queue_declare_ok.message_count,
       :consumer_count => queue_declare_ok.consumer_count}
    end
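Since #message_count and #consumer_count each call #status, and #status issues a passive queue declare every time, reading both counters separately costs two round trips. The sketch below reads them from a single #status call; `backlog_report` and the keys of its result are hypothetical names, and the helper works with any object whose #status returns the hash shown above.

```ruby
# Builds a small report from one #status call instead of two counter calls.
# `queue` only needs to respond to #status like Bunny::Queue does.
def backlog_report(queue)
  status = queue.status
  messages  = status[:message_count]
  consumers = status[:consumer_count]
  {
    :messages   => messages,
    :consumers  => consumers,
    # messages are waiting but nothing is subscribed to take them
    :unattended => messages > 0 && consumers.zero?
  }
end
```

The two counters also come from the same broker reply this way, so they describe the same moment in time.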
- (Object) subscribe(opts = { :consumer_tag => @channel.generate_consumer_tag, :ack => false, :exclusive => false, :block => false, :on_cancellation => nil }, &block)
Adds a consumer to the queue (subscribes for message deliveries).
    # File 'lib/bunny/queue.rb', line 164
    def subscribe(opts = {
                    :consumer_tag    => @channel.generate_consumer_tag,
                    :ack             => false,
                    :exclusive       => false,
                    :block           => false,
                    :on_cancellation => nil
                  }, &block)

      ctag     = opts.fetch(:consumer_tag, @channel.generate_consumer_tag)
      consumer = Consumer.new(@channel,
                              self,
                              ctag,
                              !(opts[:ack] || opts[:manual_ack]),
                              opts[:exclusive],
                              opts[:arguments])

      consumer.on_delivery(&block)
      consumer.on_cancellation(&opts[:on_cancellation]) if opts[:on_cancellation]

      @channel.basic_consume_with(consumer)
      if opts[:block]
        # joins current thread with the consumers pool, will block
        # the current thread for as long as the consumer pool is active
        @channel.work_pool.join
      end

      consumer
    end
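The fourth argument passed to Consumer.new is computed from two option keys, :ack and :manual_ack, and appears to act as a "no acknowledgement needed" flag. The following sketch isolates that expression so its truth table is easy to check; `no_ack?` is a hypothetical helper name, not a Bunny method.

```ruby
# Reproduces the acknowledgement-mode expression from #subscribe.
# Returns true when neither :ack nor :manual_ack is set (automatic mode),
# false as soon as either key is truthy (manual acknowledgements).
def no_ack?(opts = {})
  !(opts[:ack] || opts[:manual_ack])
end

automatic = no_ack?({})
manual    = no_ack?(:manual_ack => true)
```

Either key is enough to switch to manual acknowledgements, so passing `:ack => false` together with `:manual_ack => true` still yields manual mode.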
- (Object) subscribe_with(consumer, opts = {:block => false})
Adds a consumer object to the queue (subscribes for message deliveries).
    # File 'lib/bunny/queue.rb', line 202
    def subscribe_with(consumer, opts = {:block => false})
      @channel.basic_consume_with(consumer)

      @channel.work_pool.join if opts[:block]
      consumer
    end
- (Object) unbind(exchange, opts = {})
Unbinds queue from an exchange
    # File 'lib/bunny/queue.rb', line 136
    def unbind(exchange, opts = {})
      @channel.queue_unbind(@name, exchange, opts)

      exchange_name = if exchange.respond_to?(:name)
                        exchange.name
                      else
                        exchange
                      end

      @bindings.delete_if { |b| b[:exchange] == exchange_name && b[:routing_key] == (opts[:routing_key] || opts[:key]) && b[:arguments] == opts[:arguments] }

      self
    end
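Both #bind and #unbind keep a local list of bindings next to the broker call: #bind records each distinct binding once, and #unbind removes only records whose exchange, routing key and arguments all match. The stand-alone model below illustrates that bookkeeping without any network calls; `BindingLog` is a hypothetical class written for this sketch, not part of Bunny.

```ruby
# A broker-free model of the binding bookkeeping in #bind and #unbind.
class BindingLog
  attr_reader :bindings

  def initialize
    @bindings = []
  end

  # Records the binding unless an identical record already exists.
  def bind(exchange, opts = {})
    record = record_for(exchange, opts)
    @bindings.push(record) unless @bindings.include?(record)
    self
  end

  # Removes only records that match on exchange, routing key and arguments.
  def unbind(exchange, opts = {})
    record = record_for(exchange, opts)
    @bindings.delete_if { |b| b == record }
    self
  end

  private

  # Accepts an exchange-like object (anything with #name) or a plain name,
  # and treats :key as an alias for :routing_key.
  def record_for(exchange, opts)
    {
      :exchange    => exchange.respond_to?(:name) ? exchange.name : exchange,
      :routing_key => opts[:routing_key] || opts[:key],
      :arguments   => opts[:arguments]
    }
  end
end
```

A practical consequence is that calling unbind with a different routing key than the one used for bind leaves the stored record in place.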