Class: Bunny::Exchange

Inherits:
Object
  • Object
show all
Defined in:
lib/bunny/exchange.rb

Overview

Represents AMQP 0.9.1 exchanges.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(channel, type, name, opts = {}) ⇒ Exchange

Returns a new instance of Exchange.

Parameters:

  • channel (Bunny::Channel)

    Channel this exchange will use.

  • type (Symbol, String)

    Exchange type

  • name (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Exchange properties

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should this exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should this exchange be automatically deleted when it is no longer used?

  • :arguments (Boolean) — default: {}

    Additional optional arguments (typically used by RabbitMQ extensions and plugins)

See Also:



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/bunny/exchange.rb', line 76

def initialize(channel, type, name, opts = {})
  @channel          = channel
  @name             = name
  @type             = type
  @options          = self.class.add_default_options(name, opts)

  @durable          = @options[:durable]
  @auto_delete      = @options[:auto_delete]
  @internal         = @options[:internal]
  @arguments        = @options[:arguments]

  @bindings         = Set.new

  declare! unless opts[:no_declare] || predeclared? || (@name == AMQ::Protocol::EMPTY_STRING)

  # for basic.return dispatch and such
  @channel.register_exchange(self)
  # for topology recovery
  @channel.record_exchange(self)
end

Instance Attribute Details

#channelBunny::Channel (readonly)

Returns:



17
18
19
# File 'lib/bunny/exchange.rb', line 17

def channel
  @channel
end

#nameString (readonly)

Returns:

  • (String)


20
21
22
# File 'lib/bunny/exchange.rb', line 20

def name
  @name
end

#optsHash

Options hash this exchange instance was instantiated with

Returns:

  • (Hash)


32
33
34
# File 'lib/bunny/exchange.rb', line 32

def opts
  @opts
end

#statusSymbol (readonly)

Returns:

  • (Symbol)


28
29
30
# File 'lib/bunny/exchange.rb', line 28

def status
  @status
end

#typeSymbol (readonly)

Type of this exchange (one of: :direct, :fanout, :topic, :headers).

Returns:

  • (Symbol)


24
25
26
# File 'lib/bunny/exchange.rb', line 24

def type
  @type
end

Class Method Details

.default(channel_or_connection) ⇒ Exchange

Note:

Do not confuse the default exchange with amq.direct: amq.direct is a pre-defined direct exchange that doesn't have any special routing semantics.

The default exchange. This exchange is a direct exchange that is predefined by the broker and that cannot be removed. Every queue is bound to this exchange by default with the following routing semantics: messages will be routed to the queue with the same name as the message's routing key. In other words, if a message is published with a routing key of "weather.usa.ca.sandiego" and there is a queue with this name, the message will be routed to the queue.

Examples:

Publishing a messages to the tasks queue

channel     = Bunny::Channel.new(connection)
tasks_queue = channel.queue("tasks")
Bunny::Exchange.default(channel).publish("make clean", :routing_key => "tasks")

Parameters:

  • channel_or_connection (Bunny::Channel)

    Channel to use. Session instances are only supported for backwards compatibility.

Returns:

  • (Exchange)

    An instance that corresponds to the default exchange (of type direct).

See Also:



56
57
58
# File 'lib/bunny/exchange.rb', line 56

def self.default(channel_or_connection)
  self.new(channel_or_connection, :direct, AMQ::Protocol::EMPTY_STRING, :no_declare => true)
end

Instance Method Details

#argumentsHash

Returns Additional optional arguments (typically used by RabbitMQ extensions and plugins).

Returns:

  • (Hash)

    Additional optional arguments (typically used by RabbitMQ extensions and plugins)



117
118
119
# File 'lib/bunny/exchange.rb', line 117

def arguments
  @arguments
end

#auto_delete?Boolean

Returns true if this exchange was declared as automatically deleted (deleted as soon as last consumer unbinds).

Returns:

  • (Boolean)

    true if this exchange was declared as automatically deleted (deleted as soon as last consumer unbinds).



105
106
107
# File 'lib/bunny/exchange.rb', line 105

def auto_delete?
  @auto_delete
end

#bind(source, opts = {}) ⇒ Bunny::Exchange

Binds an exchange to another (source) exchange using exchange.bind AMQP 0.9.1 extension that RabbitMQ provides.

Parameters:

  • source (String)

    Source exchange name

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • routing_key (String) — default: nil

    Routing key used for binding

  • arguments (Hash) — default: {}

    Optional arguments

Returns:

See Also:



179
180
181
182
183
184
# File 'lib/bunny/exchange.rb', line 179

def bind(source, opts = {})
  @channel.exchange_bind(source, self, opts)
  @bindings.add(source: source, opts: opts)

  self
end

#delete(opts = {}) ⇒ Object

Deletes the exchange unless it is predeclared

Parameters:

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • if_unused (Boolean) — default: false

    Should this exchange be deleted only if it is no longer used

See Also:



160
161
162
163
# File 'lib/bunny/exchange.rb', line 160

def delete(opts = {})
  @channel.delete_recorded_exchange(self)
  @channel.exchange_delete(@name, opts) unless predeclared?
end

#durable?Boolean

Returns true if this exchange was declared as durable (will survive broker restart).

Returns:

  • (Boolean)

    true if this exchange was declared as durable (will survive broker restart).



99
100
101
# File 'lib/bunny/exchange.rb', line 99

def durable?
  @durable
end

#internal?Boolean

Returns true if this exchange is internal (used solely for exchange-to-exchange bindings and cannot be published to by clients).

Returns:

  • (Boolean)

    true if this exchange is internal (used solely for exchange-to-exchange bindings and cannot be published to by clients)



111
112
113
# File 'lib/bunny/exchange.rb', line 111

def internal?
  @internal
end

#on_return(&block) ⇒ Object

Defines a block that will handle returned messages



210
211
212
213
214
# File 'lib/bunny/exchange.rb', line 210

def on_return(&block)
  @on_return = block

  self
end

#predefined?Boolean Also known as: predeclared?

Returns true if this exchange is a pre-defined one (amq.direct, amq.fanout, amq.match and so on).

Returns:

  • (Boolean)

    true if this exchange is a pre-defined one (amq.direct, amq.fanout, amq.match and so on)



240
241
242
# File 'lib/bunny/exchange.rb', line 240

def predefined?
  (@name == AMQ::Protocol::EMPTY_STRING) || !!(@name =~ /^amq\.(direct|fanout|topic|headers|match)/i)
end

#publish(payload, opts = {}) ⇒ Bunny::Exchange

Publishes a message

Parameters:

  • payload (String)

    Message payload. It will never be modified by Bunny or RabbitMQ in any way.

  • opts (Hash) (defaults to: {})

    Message properties (metadata) and delivery settings

Options Hash (opts):

  • :routing_key (String)

    Routing key

  • :persistent (Boolean)

    Should the message be persisted to disk?

  • :mandatory (Boolean)

    Should the message be returned if it cannot be routed to any queue?

  • :timestamp (Integer)

    A timestamp associated with this message

  • :expiration (Integer)

    Expiration time after which the message will be deleted

  • :type (String)

    Message type, e.g. what type of event or command this message represents. Can be any string

  • :reply_to (String)

    Queue name other apps should send the response to

  • :content_type (String)

    Message content type (e.g. application/json)

  • :content_encoding (String)

    Message content encoding (e.g. gzip)

  • :correlation_id (String)

    Message correlated to this one, e.g. what request this message is a reply for

  • :priority (Integer)

    Message priority, 0 to 9. Not used by RabbitMQ, only applications

  • :message_id (String)

    Any message identifier

  • :user_id (String)

    Optional user ID. Verified by RabbitMQ against the actual connection username

  • :app_id (String)

    Optional application ID

Returns:

See Also:



145
146
147
148
149
# File 'lib/bunny/exchange.rb', line 145

def publish(payload, opts = {})
  @channel.basic_publish(payload, self.name, (opts.delete(:routing_key) || opts.delete(:key)), opts)

  self
end

#unbind(source, opts = {}) ⇒ Bunny::Exchange

Unbinds an exchange from another (source) exchange using exchange.unbind AMQP 0.9.1 extension that RabbitMQ provides.

Parameters:

  • source (String)

    Source exchange name

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • routing_key (String) — default: nil

    Routing key used for binding

  • arguments (Hash) — default: {}

    Optional arguments

Returns:

See Also:



200
201
202
203
204
205
# File 'lib/bunny/exchange.rb', line 200

def unbind(source, opts = {})
  @channel.exchange_unbind(source, self, opts)
  @bindings.delete(source: source, opts: opts)

  self
end

#wait_for_confirmsObject

Waits until all outstanding publisher confirms on the channel arrive.

This is a convenience method that delegates to Channel#wait_for_confirms



222
223
224
# File 'lib/bunny/exchange.rb', line 222

def wait_for_confirms
  @channel.wait_for_confirms
end