Class: AMQ::Client::Queue
- Inherits:
-
Entity
- Object
- Entity
- AMQ::Client::Queue
- Includes:
- AnonymousEntityMixin
- Defined in:
- lib/amq/client/amqp/queue.rb
Constant Summary
Constant Summary
Constants included from StatusMixin
Instance Attribute Summary (collapse)
-
- (Object) name
readonly
Name of the queue (part of the public API).
Attributes inherited from Entity
Attributes included from StatusMixin
Instance Method Summary (collapse)
-
- (Object) acknowledge(delivery_tag, multiple = false)
Sends a Basic.Ack frame for the given delivery tag.
-
- (Boolean) auto_delete?
Value of the auto_delete flag passed to the most recent #declare call.
- - (Object) bind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, nowait = false, arguments = nil, &block)
- - (Object) cancel(nowait = false, &block)
-
- (Object) consume(no_ack = false, exclusive = false, nowait = false, no_local = false, arguments = nil, &block)
Basic.Consume.
- - (Object) declare(passive = false, durable = false, exclusive = false, auto_delete = false, nowait = false, arguments = nil, &block)
- - (Object) delete(if_unused = false, if_empty = false, nowait = false, &block)
- - (Boolean) durable?
-
- (Boolean) exclusive?
Value of the exclusive flag passed to the most recent #declare call.
-
- (Object) handle_bind_ok(method)
Runs the :bind callback.
-
- (Object) handle_cancel_ok(method)
Clears the stored consumer tag and runs the :cancel callback with the consumer tag from the reply.
-
- (Object) handle_consume_ok(method)
Runs the :consume callback with the consumer tag from the reply.
- - (Object) handle_declare_ok(method)
- - (Object) handle_delete_ok(method)
-
- (Object) handle_delivery(method, header, payload)
Runs the :delivery callback with the header, payload and delivery metadata.
-
- (Object) handle_purge_ok(method)
Runs the :purge callback.
-
- (Object) handle_unbind_ok(method)
Runs the :unbind callback.
-
- (Queue) initialize(client, channel, name = AMQ::Protocol::EMPTY_STRING)
constructor
A new instance of Queue.
- - (Boolean) no_ack?
- - (Object) on_delivery(&block)
- - (Object) purge(nowait = false, &block)
-
- (Object) reject(delivery_tag, requeue = true)
Sends a Basic.Reject frame for the given delivery tag.
- - (Object) unbind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, arguments = nil, &block)
Methods included from AnonymousEntityMixin
Methods inherited from Entity
#error, #exec_callback, handle, handlers
Methods included from StatusMixin
#closed!, #closed?, #closing!, #closing?, #opened!, #opened?, #opening!, #opening?
Constructor Details
- (Queue) initialize(client, channel, name = AMQ::Protocol::EMPTY_STRING)
A new instance of Queue
24 25 26 27 28 29 |
# File 'lib/amq/client/amqp/queue.rb', line 24
#
# Builds a queue entity tied to a channel.
#
# @param client  [Object] passed unchanged to the parent constructor via +super+
# @param channel [Object] stored in @channel; other methods read +id+ and the
#   +queues_awaiting_*+ lists from it
# @param name    [String] queue name, AMQ::Protocol::EMPTY_STRING when omitted
def initialize(client, channel, name = AMQ::Protocol::EMPTY_STRING)
  super(client)

  @name    = name
  @channel = channel
end
Instance Attribute Details
- (Object) name (readonly)
Name of the queue (part of the public API)
22 23 24 |
# File 'lib/amq/client/amqp/queue.rb', line 22
#
# Read-only accessor for the queue name.
#
# @return [Object] whatever is currently stored in @name
def name
  @name
end
Instance Method Details
- (Object) acknowledge(delivery_tag, multiple = false)
Sends a Basic.Ack frame for the given delivery tag
183 184 185 |
# File 'lib/amq/client/amqp/queue.rb', line 183
#
# Encodes a Basic.Ack frame on this queue's channel and hands it to the client.
#
# @param delivery_tag [Object] tag of the delivery being acknowledged
# @param multiple     [Boolean] forwarded as the "multiple" flag of Basic.Ack
# @return [Object] whatever @client.send returns
def acknowledge(delivery_tag, multiple = false)
  frame = Protocol::Basic::Ack.encode(@channel.id, delivery_tag, multiple)
  @client.send(frame)
end
- (Boolean) auto_delete?
Value of the auto_delete flag passed to the most recent #declare call
39 40 41 |
# File 'lib/amq/client/amqp/queue.rb', line 39
#
# Reports the auto_delete flag recorded in @auto_delete.
#
# @return [Boolean, nil] the stored flag; nil if @auto_delete was never assigned
def auto_delete?
  @auto_delete
end
- (Object) bind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, nowait = false, arguments = nil, &block)
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/amq/client/amqp/queue.rb', line 82
#
# Sends Queue.Bind for this queue and, unless nowait applies, registers the
# given block as the :bind callback.
#
# @param exchange    [#name, Object] exchange object (its +name+ is used) or a bare exchange name
# @param routing_key [String] routing key for the binding
# @param nowait      [Boolean] forced to true when no block is given
# @param arguments   [Object, nil] forwarded to the frame encoder untouched
# @return [self]
def bind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, nowait = false, arguments = nil, &block)
  # No callback to run later, so there is no reason to wait for a reply.
  nowait = true unless block

  exchange_name = exchange.respond_to?(:name) ? exchange.name : exchange

  @client.send(Protocol::Queue::Bind.encode(@channel.id, @name, exchange_name, routing_key, nowait, arguments))

  unless nowait
    callbacks[:bind] = block

    # TODO: handle channel & connection-level exceptions
    @channel.queues_awaiting_bind_ok.push(self)
  end

  self
end
- (Object) cancel(nowait = false, &block)
150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/amq/client/amqp/queue.rb', line 150
#
# Sends Basic.Cancel for the current consumer tag.
#
# When nowait is false the block is stored as the :cancel callback, which is
# the key handle_cancel_ok executes, and the queue is added to the channel's
# cancel-ok waiting list. When nowait is true the consumer tag is cleared
# immediately.
#
# @param nowait [Boolean] forwarded to the frame; selects which branch runs
# @return [self]
def cancel(nowait = false, &block)
  @client.send(Protocol::Basic::Cancel.encode(@channel.id, @consumer_tag, nowait))

  if nowait
    @consumer_tag = nil
  else
    # Must be :cancel (not :consume): that is the callback handle_cancel_ok runs.
    callbacks[:cancel] = block
    @channel.queues_awaiting_cancel_ok.push(self)
  end

  self
end
- (Object) consume(no_ack = false, exclusive = false, nowait = false, no_local = false, arguments = nil, &block)
Basic.Consume
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/amq/client/amqp/queue.rb', line 129
#
# Starts consuming from this queue by sending Basic.Consume.
#
# A consumer tag is generated from the queue name, the current time and a
# random number, and this instance is registered in @client.consumers under it.
#
# @param no_ack    [Boolean] forwarded to the frame and remembered in @no_ack
# @param exclusive [Boolean] forwarded to the frame
# @param nowait    [Boolean] forced to true when no block is given
# @param no_local  [Boolean] forwarded to the frame
# @param arguments [Object, nil] forwarded to the frame encoder untouched
# @raise [RuntimeError] if this instance already has a consumer tag
# @return [self]
def consume(no_ack = false, exclusive = false, nowait = false, no_local = false, arguments = nil, &block)
  raise RuntimeError, "This instance is already being consumed! Create another one using #dup." if @consumer_tag

  # No callback to run later, so there is no reason to wait for a reply.
  nowait = true unless block
  @consumer_tag = "#{name}-#{Time.now.to_i * 1000}-#{Kernel.rand(999_999_999_999)}"

  @client.send(Protocol::Basic::Consume.encode(@channel.id, @name, @consumer_tag, no_local, no_ack, exclusive, nowait, arguments))
  @client.consumers[@consumer_tag] = self
  @no_ack = no_ack

  unless nowait
    callbacks[:consume] = block
    @channel.queues_awaiting_consume_ok.push(self)
  end

  self
end
- (Object) declare(passive = false, durable = false, exclusive = false, auto_delete = false, nowait = false, arguments = nil, &block)
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/amq/client/amqp/queue.rb', line 45
#
# Sends Queue.Declare for this queue and remembers the durable, exclusive and
# auto_delete flags on the instance.
#
# When a block is given and nowait is false, the block becomes the :declare
# callback and the queue is added to the channel's declare-ok waiting list.
# If the client reports +sync?+, the call then reads until a Queue.DeclareOk
# is received.
#
# @param passive     [Boolean] forwarded to the frame
# @param durable     [Boolean] forwarded to the frame and stored in @durable
# @param exclusive   [Boolean] forwarded to the frame and stored in @exclusive
# @param auto_delete [Boolean] forwarded to the frame and stored in @auto_delete
# @param nowait      [Boolean] forced to true when no block is given
# @param arguments   [Object, nil] forwarded to the frame encoder untouched
# @return [self]
def declare(passive = false, durable = false, exclusive = false, auto_delete = false, nowait = false, arguments = nil, &block)
  @durable     = durable
  @exclusive   = exclusive
  @auto_delete = auto_delete

  # No callback to run later, so there is no reason to wait for a reply.
  nowait = true unless block
  @client.send(Protocol::Queue::Declare.encode(@channel.id, @name, passive, durable, exclusive, auto_delete, nowait, arguments))

  unless nowait
    callbacks[:declare] = block
    @channel.queues_awaiting_declare_ok.push(self)
  end

  @client.read_until_receives(Protocol::Queue::DeclareOk) if @client.sync? && !nowait

  self
end
- (Object) delete(if_unused = false, if_empty = false, nowait = false, &block)
66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/amq/client/amqp/queue.rb', line 66
#
# Sends Queue.Delete for this queue and, unless nowait applies, registers the
# given block as the :delete callback.
#
# @param if_unused [Boolean] forwarded to the frame
# @param if_empty  [Boolean] forwarded to the frame
# @param nowait    [Boolean] forced to true when no block is given
# @return [self]
def delete(if_unused = false, if_empty = false, nowait = false, &block)
  # No callback to run later, so there is no reason to wait for a reply.
  nowait = true unless block
  @client.send(Protocol::Queue::Delete.encode(@channel.id, @name, if_unused, if_empty, nowait))

  unless nowait
    callbacks[:delete] = block

    # TODO: delete itself from queues cache
    @channel.queues_awaiting_delete_ok.push(self)
  end

  self
end
- (Boolean) durable?
31 32 33 |
# File 'lib/amq/client/amqp/queue.rb', line 31
#
# Reports the durable flag recorded in @durable.
#
# @return [Boolean, nil] the stored flag; nil if @durable was never assigned
def durable?
  @durable
end
- (Boolean) exclusive?
Value of the exclusive flag passed to the most recent #declare call
35 36 37 |
# File 'lib/amq/client/amqp/queue.rb', line 35
#
# Reports the exclusive flag recorded in @exclusive.
#
# @return [Boolean, nil] the stored flag; nil if @exclusive was never assigned
def exclusive?
  @exclusive
end
- (Object) handle_bind_ok(method)
Runs the :bind callback
211 212 213 |
# File 'lib/amq/client/amqp/queue.rb', line 211
#
# Runs the :bind callback with no arguments.
# NOTE(review): presumably invoked when Queue.BindOk arrives; confirm against the dispatcher.
#
# @param method [Object] the reply method; not used here
# @return [Object] whatever exec_callback returns
def handle_bind_ok(method)
  exec_callback(:bind)
end
- (Object) handle_cancel_ok(method)
Clears the stored consumer tag and runs the :cancel callback with the consumer tag from the reply
223 224 225 226 |
# File 'lib/amq/client/amqp/queue.rb', line 223
#
# Forgets the current consumer tag, then runs the :cancel callback with the
# consumer tag carried by the reply.
#
# @param method [#consumer_tag] the reply method
# @return [Object] whatever exec_callback returns
def handle_cancel_ok(method)
  @consumer_tag = nil
  exec_callback(:cancel, method.consumer_tag)
end
- (Object) handle_consume_ok(method)
Runs the :consume callback with the consumer tag from the reply
203 204 205 |
# File 'lib/amq/client/amqp/queue.rb', line 203
#
# Runs the :consume callback with the consumer tag carried by the reply.
#
# @param method [#consumer_tag] the reply method
# @return [Object] whatever exec_callback returns
def handle_consume_ok(method)
  exec_callback(:consume, method.consumer_tag)
end
- (Object) handle_declare_ok(method)
193 194 195 196 197 |
# File 'lib/amq/client/amqp/queue.rb', line 193
#
# Adopts the queue name from the reply when this queue is anonymous, then runs
# the :declare callback with the queue name, consumer count and message count.
#
# NOTE(review): the last callback argument was truncated to "method." in this
# listing; message_count is the only Queue.DeclareOk field not already passed.
# Confirm against the library source.
#
# @param method [#queue, #consumer_count, #message_count] the reply method
# @return [Object] whatever exec_callback returns
def handle_declare_ok(method)
  @name = method.queue if anonymous?

  exec_callback(:declare, method.queue, method.consumer_count, method.message_count)
end
- (Object) handle_delete_ok(method)
199 200 201 |
# File 'lib/amq/client/amqp/queue.rb', line 199
#
# Runs the :delete callback with the message count carried by the reply.
#
# NOTE(review): the argument was truncated to "method." in this listing;
# message_count is the only field of Queue.DeleteOk. Confirm against the
# library source.
#
# @param method [#message_count] the reply method
# @return [Object] whatever exec_callback returns
def handle_delete_ok(method)
  exec_callback(:delete, method.message_count)
end
- (Object) handle_delivery(method, header, payload)
Runs the :delivery callback with the header, payload and delivery metadata
219 220 221 |
# File 'lib/amq/client/amqp/queue.rb', line 219
#
# Runs the :delivery callback for one delivered message.
#
# The callback receives, in order: header, payload, consumer tag, delivery
# tag, redelivered flag, exchange and routing key.
#
# @param method  [#consumer_tag, #delivery_tag, #redelivered, #exchange, #routing_key] the delivery method
# @param header  [Object] passed through to the callback
# @param payload [Object] passed through to the callback
# @return [Object] whatever exec_callback returns
def handle_delivery(method, header, payload)
  exec_callback(
    :delivery,
    header,
    payload,
    method.consumer_tag,
    method.delivery_tag,
    method.redelivered,
    method.exchange,
    method.routing_key
  )
end
- (Object) handle_purge_ok(method)
Runs the :purge callback
207 208 209 |
# File 'lib/amq/client/amqp/queue.rb', line 207
#
# Runs the :purge callback with the message count carried by the reply.
#
# NOTE(review): the argument was truncated to "method." in this listing;
# message_count is the only field of Queue.PurgeOk. Confirm against the
# library source.
#
# @param method [#message_count] the reply method
# @return [Object] whatever exec_callback returns
def handle_purge_ok(method)
  exec_callback(:purge, method.message_count)
end
- (Object) handle_unbind_ok(method)
Runs the :unbind callback
215 216 217 |
# File 'lib/amq/client/amqp/queue.rb', line 215
#
# Runs the :unbind callback with no arguments.
# NOTE(review): presumably invoked when Queue.UnbindOk arrives; confirm against the dispatcher.
#
# @param method [Object] the reply method; not used here
# @return [Object] whatever exec_callback returns
def handle_unbind_ok(method)
  exec_callback(:unbind)
end
- (Boolean) no_ack?
124 125 126 |
# File 'lib/amq/client/amqp/queue.rb', line 124
#
# Reports the no_ack flag recorded in @no_ack.
#
# @return [Boolean, nil] the stored flag; nil if @no_ack was never assigned
def no_ack?
  @no_ack
end
- (Object) on_delivery(&block)
179 180 181 |
# File 'lib/amq/client/amqp/queue.rb', line 179
#
# Registers the given block as the :delivery callback.
# Calling it without a block leaves any existing callback in place.
#
# @return [Proc, nil] the stored block, or nil when no block was given
def on_delivery(&block)
  callbacks[:delivery] = block if block
end
- (Object) purge(nowait = false, &block)
164 165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/amq/client/amqp/queue.rb', line 164
#
# Sends Queue.Purge for this queue and, unless nowait applies, registers the
# given block as the :purge callback.
#
# @param nowait [Boolean] forced to true when no block is given
# @return [self]
def purge(nowait = false, &block)
  # No callback to run later, so there is no reason to wait for a reply.
  nowait = true unless block
  @client.send(Protocol::Queue::Purge.encode(@channel.id, @name, nowait))

  unless nowait
    callbacks[:purge] = block

    # TODO: handle channel & connection-level exceptions
    @channel.queues_awaiting_purge_ok.push(self)
  end

  self
end
- (Object) reject(delivery_tag, requeue = true)
Sends a Basic.Reject frame for the given delivery tag
187 188 189 |
# File 'lib/amq/client/amqp/queue.rb', line 187
#
# Encodes a Basic.Reject frame on this queue's channel and hands it to the client.
#
# @param delivery_tag [Object] tag of the delivery being rejected
# @param requeue      [Boolean] forwarded as the "requeue" flag of Basic.Reject
# @return [Object] whatever @client.send returns
def reject(delivery_tag, requeue = true)
  frame = Protocol::Basic::Reject.encode(@channel.id, delivery_tag, requeue)
  @client.send(frame)
end
- (Object) unbind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, arguments = nil, &block)
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/amq/client/amqp/queue.rb', line 105
#
# Sends Queue.Unbind for this queue, stores the block (possibly nil) as the
# :unbind callback and adds the queue to the channel's unbind-ok waiting list.
# There is no nowait option here, so the callback and waiting list are always
# updated.
#
# @param exchange    [#name, Object] exchange object (its +name+ is used) or a bare exchange name
# @param routing_key [String] routing key of the binding to remove
# @param arguments   [Object, nil] forwarded to the frame encoder untouched
# @return [self]
def unbind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, arguments = nil, &block)
  exchange_name = exchange.respond_to?(:name) ? exchange.name : exchange

  @client.send(Protocol::Queue::Unbind.encode(@channel.id, @name, exchange_name, routing_key, arguments))

  callbacks[:unbind] = block

  # TODO: handle channel & connection-level exceptions
  @channel.queues_awaiting_unbind_ok.push(self)

  self
end