Class: AMQ::Client::Queue

Inherits:
Entity
  • Object
show all
Includes:
AnonymousEntityMixin
Defined in:
lib/amq/client/amqp/queue.rb

Constant Summary

Constants included from StatusMixin

StatusMixin::VALUES

Instance Attribute Summary (collapse)

Attributes inherited from Entity

#callbacks

Attributes included from StatusMixin

#status

Instance Method Summary (collapse)

Methods included from AnonymousEntityMixin

#anonymous?, #dup

Methods inherited from Entity

#error, #exec_callback, handle, handlers

Methods included from StatusMixin

#closed!, #closed?, #closing!, #closing?, #opened!, #opened?, #opening!, #opening?

Constructor Details

- (Queue) initialize(client, channel, name = AMQ::Protocol::EMPTY_STRING)

Returns a new instance of Queue.



24
25
26
27
28
29
# File 'lib/amq/client/amqp/queue.rb', line 24

# Builds a queue object tied to the given client and channel.
#
# @param client  [Object] forwarded unchanged to the superclass constructor
# @param channel [Object] stored in @channel; the other methods of this class
#   read its #id and its queues_awaiting_*_ok lists
# @param name    [String] queue name; defaults to AMQ::Protocol::EMPTY_STRING
def initialize(client, channel, name = AMQ::Protocol::EMPTY_STRING)
  super(client)

  @name    = name
  @channel = channel
end

Instance Attribute Details

- (Object) name (readonly)

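The queue name given to the constructor. For an anonymous queue it is replaced by the name carried in the Queue.Declare-Ok reply (see #handle_declare_ok).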



22
23
24
# File 'lib/amq/client/amqp/queue.rb', line 22

# Reader for the queue name held in @name.
#
# @return [Object] the current value of @name (set by the constructor and,
#   for anonymous queues, overwritten in #handle_declare_ok)
def name
  @name
end

Instance Method Details

- (Object) acknowledge(delivery_tag, multiple = false)

Sends a Basic.Ack frame for the given delivery tag on this queue's channel.



183
184
185
# File 'lib/amq/client/amqp/queue.rb', line 183

# Encodes a Basic.Ack frame for this queue's channel and hands it to the client.
#
# @param delivery_tag [Object] passed through to Protocol::Basic::Ack.encode
# @param multiple     [Boolean] passed through to Protocol::Basic::Ack.encode
# @return [Object] whatever @client.send returns
def acknowledge(delivery_tag, multiple = false)
  ack_frame = Protocol::Basic::Ack.encode(@channel.id, delivery_tag, multiple)
  @client.send(ack_frame)
end

- (Boolean) auto_delete?

Whether the queue was declared with the auto_delete flag, as recorded by the most recent #declare call.

Returns:

  • (Boolean)


39
40
41
# File 'lib/amq/client/amqp/queue.rb', line 39

# Tells whether the auto_delete flag was set by the most recent #declare call.
#
# The flag is coerced so the method always yields a strict Boolean: before
# #declare has run @auto_delete is unset and a bare read would return nil.
#
# @return [Boolean] true when @auto_delete is truthy, false otherwise
def auto_delete?
  !!@auto_delete
end

- (Object) bind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, nowait = false, arguments = nil, &block)



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/amq/client/amqp/queue.rb', line 82

# Sends Queue.Bind for this queue and, unless nowait is in effect, registers
# the block as the :bind callback and queues this object for the Bind-Ok reply.
#
# @param exchange    [#name, Object] an object responding to #name (its name is
#   used) or a value used directly as the exchange name
# @param routing_key [String] passed through to the encoder
# @param nowait      [Boolean] forced to true when no block is given
# @param arguments   [Object, nil] passed through to the encoder
# @return [self]
def bind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, nowait = false, arguments = nil, &block)
  # Without a callback there is nothing to run on Bind-Ok, so do not wait for it.
  nowait = true if block.nil?
  exchange_name = exchange.respond_to?(:name) ? exchange.name : exchange

  bind_frame = Protocol::Queue::Bind.encode(@channel.id, @name, exchange_name, routing_key, nowait, arguments)
  @client.send(bind_frame)

  unless nowait
    callbacks[:bind] = block

    # TODO: handle channel & connection-level exceptions
    @channel.queues_awaiting_bind_ok.push(self)
  end

  self
end

- (Object) cancel(nowait = false, &block)



150
151
152
153
154
155
156
157
158
159
160
# File 'lib/amq/client/amqp/queue.rb', line 150

# Sends Basic.Cancel for the consumer tag currently held by this queue.
#
# When a reply is expected (nowait is falsy) the block is stored as the
# :cancel callback, which is the key #handle_cancel_ok executes, and this
# object is queued for the Cancel-Ok reply. The block was previously stored
# under :consume, so it never ran and it overwrote the consume callback.
#
# With nowait the consumer tag is cleared immediately, since no reply will
# arrive to clear it.
#
# @param nowait [Boolean] passed to the encoder; selects the no-reply path
# @return [Array, nil] the channel's awaiting list when a reply is expected,
#   nil otherwise
def cancel(nowait = false, &block)
  @client.send(Protocol::Basic::Cancel.encode(@channel.id, @consumer_tag, nowait))

  if nowait
    @consumer_tag = nil
  else
    callbacks[:cancel] = block

    @channel.queues_awaiting_cancel_ok.push(self)
  end
end

- (Object) consume(no_ack = false, exclusive = false, nowait = false, no_local = false, arguments = nil, &block)

Starts consuming from this queue by sending Basic.Consume with a freshly generated consumer tag.

Raises:

  • (RuntimeError)


129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/amq/client/amqp/queue.rb', line 129

# Sends Basic.Consume with a generated consumer tag and registers this object
# in the client's consumers table under that tag.
#
# The tag is built from the queue name, the current time in whole seconds
# multiplied by 1000, and a random integer.
#
# @param no_ack    [Boolean] passed to the encoder and remembered in @no_ack
# @param exclusive [Boolean] passed through to the encoder
# @param nowait    [Boolean] forced to true when no block is given
# @param no_local  [Boolean] passed through to the encoder
# @param arguments [Object, nil] passed through to the encoder
# @raise [RuntimeError] if this instance already holds a consumer tag
# @return [self]
def consume(no_ack = false, exclusive = false, nowait = false, no_local = false, arguments = nil, &block)
  if @consumer_tag
    raise RuntimeError.new("This instance is already being consumed! Create another one using #dup.")
  end

  nowait = true if block.nil?

  queue_name    = name
  timestamp     = Time.now.to_i * 1000
  random_part   = Kernel.rand(999_999_999_999)
  @consumer_tag = "#{queue_name}-#{timestamp}-#{random_part}"

  consume_frame = Protocol::Basic::Consume.encode(
    @channel.id, @name, @consumer_tag, no_local, no_ack, exclusive, nowait, arguments
  )
  @client.send(consume_frame)

  @client.consumers[@consumer_tag] = self
  @no_ack = no_ack

  unless nowait
    callbacks[:consume] = block

    @channel.queues_awaiting_consume_ok.push(self)
  end

  self
end

- (Object) declare(passive = false, durable = false, exclusive = false, auto_delete = false, nowait = false, arguments = nil, &block)



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/amq/client/amqp/queue.rb', line 45

# Records the durable/exclusive/auto_delete flags on this object and sends
# Queue.Declare. When a reply is expected the block becomes the :declare
# callback and this object is queued for the Declare-Ok reply; on a client
# reporting sync? the call then reads until Declare-Ok is received.
#
# @param passive     [Boolean] passed through to the encoder
# @param durable     [Boolean] stored in @durable and passed to the encoder
# @param exclusive   [Boolean] stored in @exclusive and passed to the encoder
# @param auto_delete [Boolean] stored in @auto_delete and passed to the encoder
# @param nowait      [Boolean] forced to true when no block is given
# @param arguments   [Object, nil] passed through to the encoder
# @return [self]
def declare(passive = false, durable = false, exclusive = false, auto_delete = false, nowait = false, arguments = nil, &block)
  @durable     = durable
  @exclusive   = exclusive
  @auto_delete = auto_delete

  nowait = true if block.nil?
  declare_frame = Protocol::Queue::Declare.encode(
    @channel.id, @name, passive, durable, exclusive, auto_delete, nowait, arguments
  )
  @client.send(declare_frame)

  unless nowait
    callbacks[:declare] = block
    @channel.queues_awaiting_declare_ok.push(self)
  end

  # sync? is consulted on every call; the blocking read happens only when a reply is due.
  @client.read_until_receives(Protocol::Queue::DeclareOk) if @client.sync? && !nowait

  self
end

- (Object) delete(if_unused = false, if_empty = false, nowait = false, &block)



66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/amq/client/amqp/queue.rb', line 66

# Sends Queue.Delete for this queue and, unless nowait is in effect, registers
# the block as the :delete callback and queues this object for the Delete-Ok reply.
#
# @param if_unused [Boolean] passed through to the encoder
# @param if_empty  [Boolean] passed through to the encoder
# @param nowait    [Boolean] forced to true when no block is given
# @return [self]
def delete(if_unused = false, if_empty = false, nowait = false, &block)
  nowait = true if block.nil?
  delete_frame = Protocol::Queue::Delete.encode(@channel.id, @name, if_unused, if_empty, nowait)
  @client.send(delete_frame)

  unless nowait
    callbacks[:delete] = block

    # TODO: delete itself from queues cache
    @channel.queues_awaiting_delete_ok.push(self)
  end

  self
end

- (Boolean) durable?

Returns:

  • (Boolean)


31
32
33
# File 'lib/amq/client/amqp/queue.rb', line 31

# Tells whether the durable flag was set by the most recent #declare call.
#
# The flag is coerced so the method always yields a strict Boolean: before
# #declare has run @durable is unset and a bare read would return nil.
#
# @return [Boolean] true when @durable is truthy, false otherwise
def durable?
  !!@durable
end

- (Boolean) exclusive?

Whether the queue was declared with the exclusive flag, as recorded by the most recent #declare call.

Returns:

  • (Boolean)


35
36
37
# File 'lib/amq/client/amqp/queue.rb', line 35

# Tells whether the exclusive flag was set by the most recent #declare call.
#
# The flag is coerced so the method always yields a strict Boolean: before
# #declare has run @exclusive is unset and a bare read would return nil.
#
# @return [Boolean] true when @exclusive is truthy, false otherwise
def exclusive?
  !!@exclusive
end

- (Object) handle_bind_ok(method)

Runs the :bind callback when Queue.Bind-Ok is received.



211
212
213
# File 'lib/amq/client/amqp/queue.rb', line 211

# Runs the :bind callback with no arguments.
#
# @param method [Object] the reply frame; not inspected here
# @return [Object] whatever #exec_callback returns
def handle_bind_ok(method)
  exec_callback(:bind)
end

- (Object) handle_cancel_ok(method)

Clears the stored consumer tag and runs the :cancel callback with the consumer tag from the Basic.Cancel-Ok reply.



223
224
225
226
# File 'lib/amq/client/amqp/queue.rb', line 223

# Forgets the stored consumer tag, then runs the :cancel callback with the
# consumer tag reported by the reply.
#
# @param method [#consumer_tag] the reply frame
# @return [Object] whatever #exec_callback returns
def handle_cancel_ok(method)
  @consumer_tag = nil
  exec_callback(:cancel, method.consumer_tag)
end

- (Object) handle_consume_ok(method)

Runs the :consume callback with the consumer tag from the Basic.Consume-Ok reply.



203
204
205
# File 'lib/amq/client/amqp/queue.rb', line 203

# Runs the :consume callback with the consumer tag reported by the reply.
#
# @param method [#consumer_tag] the reply frame
# @return [Object] whatever #exec_callback returns
def handle_consume_ok(method)
  exec_callback(:consume, method.consumer_tag)
end

- (Object) handle_declare_ok(method)



193
194
195
196
197
# File 'lib/amq/client/amqp/queue.rb', line 193

# Adopts the queue name from the reply when this queue is anonymous, then runs
# the :declare callback with the queue name, consumer count and message count
# taken from the reply.
#
# @param method [#queue, #consumer_count, #message_count] the reply frame
# @return [Object] whatever #exec_callback returns
def handle_declare_ok(method)
  if anonymous?
    @name = method.queue
  end

  exec_callback(:declare, method.queue, method.consumer_count, method.message_count)
end

- (Object) handle_delete_ok(method)



199
200
201
# File 'lib/amq/client/amqp/queue.rb', line 199

# Runs the :delete callback with the message count reported by the reply.
#
# @param method [#message_count] the reply frame
# @return [Object] whatever #exec_callback returns
def handle_delete_ok(method)
  exec_callback(:delete, method.message_count)
end

- (Object) handle_delivery(method, header, payload)

Runs the :delivery callback with the message header, payload and delivery details taken from the Basic.Deliver frame.



219
220
221
# File 'lib/amq/client/amqp/queue.rb', line 219

# Runs the :delivery callback for one delivered message.
#
# The callback receives, in order: header, payload, then the consumer tag,
# delivery tag, redelivered flag, exchange and routing key read from +method+.
#
# @param method  [#consumer_tag, #delivery_tag, #redelivered, #exchange, #routing_key]
#   the delivery frame
# @param header  [Object] passed to the callback unchanged
# @param payload [Object] passed to the callback unchanged
# @return [Object] whatever #exec_callback returns
def handle_delivery(method, header, payload)
  exec_callback(
    :delivery,
    header,
    payload,
    method.consumer_tag,
    method.delivery_tag,
    method.redelivered,
    method.exchange,
    method.routing_key
  )
end

- (Object) handle_purge_ok(method)

Runs the :purge callback with the message count from the Queue.Purge-Ok reply.



207
208
209
# File 'lib/amq/client/amqp/queue.rb', line 207

# Runs the :purge callback with the message count reported by the reply.
#
# @param method [#message_count] the reply frame
# @return [Object] whatever #exec_callback returns
def handle_purge_ok(method)
  exec_callback(:purge, method.message_count)
end

- (Object) handle_unbind_ok(method)

Runs the :unbind callback when Queue.Unbind-Ok is received.



215
216
217
# File 'lib/amq/client/amqp/queue.rb', line 215

# Runs the :unbind callback with no arguments.
#
# @param method [Object] the reply frame; not inspected here
# @return [Object] whatever #exec_callback returns
def handle_unbind_ok(method)
  exec_callback(:unbind)
end

- (Boolean) no_ack?

Returns:

  • (Boolean)


124
125
126
# File 'lib/amq/client/amqp/queue.rb', line 124

# Tells whether the most recent #consume call was made with no_ack set.
#
# The flag is coerced so the method always yields a strict Boolean: before
# #consume has run @no_ack is unset and a bare read would return nil.
#
# @return [Boolean] true when @no_ack is truthy, false otherwise
def no_ack?
  !!@no_ack
end

- (Object) on_delivery(&block)



179
180
181
# File 'lib/amq/client/amqp/queue.rb', line 179

# Registers the block as the :delivery callback. Calling it without a block
# leaves any existing callback in place.
#
# @return [Proc, nil] the registered block, or nil when no block was given
def on_delivery(&block)
  return unless block

  callbacks[:delivery] = block
end

- (Object) purge(nowait = false, &block)



164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/amq/client/amqp/queue.rb', line 164

# Sends Queue.Purge for this queue and, unless nowait is in effect, registers
# the block as the :purge callback and queues this object for the Purge-Ok reply.
#
# @param nowait [Boolean] forced to true when no block is given
# @return [self]
def purge(nowait = false, &block)
  nowait = true if block.nil?
  purge_frame = Protocol::Queue::Purge.encode(@channel.id, @name, nowait)
  @client.send(purge_frame)

  unless nowait
    callbacks[:purge] = block
    # TODO: handle channel & connection-level exceptions
    @channel.queues_awaiting_purge_ok.push(self)
  end

  self
end

- (Object) reject(delivery_tag, requeue = true)

Sends a Basic.Reject frame for the given delivery tag on this queue's channel.



187
188
189
# File 'lib/amq/client/amqp/queue.rb', line 187

# Encodes a Basic.Reject frame for this queue's channel and hands it to the client.
#
# @param delivery_tag [Object] passed through to Protocol::Basic::Reject.encode
# @param requeue      [Boolean] passed through to Protocol::Basic::Reject.encode
# @return [Object] whatever @client.send returns
def reject(delivery_tag, requeue = true)
  reject_frame = Protocol::Basic::Reject.encode(@channel.id, delivery_tag, requeue)
  @client.send(reject_frame)
end

- (Object) unbind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, arguments = nil, &block)



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/amq/client/amqp/queue.rb', line 105

# Sends Queue.Unbind for this queue, stores the block (possibly nil) as the
# :unbind callback, and queues this object for the Unbind-Ok reply. Unlike the
# other operations here there is no nowait path: the reply is always awaited.
#
# @param exchange    [#name, Object] an object responding to #name (its name is
#   used) or a value used directly as the exchange name
# @param routing_key [String] passed through to the encoder
# @param arguments   [Object, nil] passed through to the encoder
# @return [self]
def unbind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, arguments = nil, &block)
  exchange_name = exchange.respond_to?(:name) ? exchange.name : exchange

  unbind_frame = Protocol::Queue::Unbind.encode(@channel.id, @name, exchange_name, routing_key, arguments)
  @client.send(unbind_frame)

  callbacks[:unbind] = block
  # TODO: handle channel & connection-level exceptions
  @channel.queues_awaiting_unbind_ok.push(self)

  self
end