Class: Bunny::TopologyRegistry
- Inherits:
-
Object
- Object
- Bunny::TopologyRegistry
- Defined in:
- lib/bunny/topology_registry.rb
Overview
As queues, exchanges, and bindings are created and deleted, connections keep track of the topology using this registry.
Then, when the connection and its channels are recovered, this registry is used as the source of truth during topology recovery.
This registry takes care of dropping auto-delete exchanges or queues when their respective conditions for removal hold.
Queues collapse
Consumers collapse
Exchanges collapse
Instance Attribute Summary collapse
- #exchange_bindings ⇒ Set<Bunny::RecordedExchangeBinding> readonly
- #queue_bindings ⇒ Set<Bunny::RecordedQueueBinding> readonly
Queues collapse
- #delete_recorded_queue(queue) ⇒ Object
- #delete_recorded_queue_named(name) ⇒ Object
- #delete_recorded_queue_named_without_cascading(name) ⇒ Object
- #filtered_queues ⇒ Array<Bunny::RecordedQueue>
- #record_queue(queue) ⇒ Object
- #record_queue_with(ch, name, server_named, durable, auto_delete, exclusive, arguments) ⇒ Object
Consumers collapse
- #delete_recorded_consumer(consumer_tag) ⇒ Object
- #filtered_consumers ⇒ Array<Bunny::RecordedConsumer>
- #record_consumer_with(ch, consumer_tag, queue_name, callable, manual_ack, exclusive, arguments) ⇒ Object
Exchanges collapse
- #delete_recorded_exchange(exchange) ⇒ Object
- #delete_recorded_exchange_named(name) ⇒ Object
- #filtered_exchanges ⇒ Array<Bunny::RecordedExchange>
- #record_exchange(exchange) ⇒ Object
- #record_exchange_with(ch, name, type, durable, auto_delete, arguments) ⇒ Object
Instance Method Summary collapse
- #delete_recorded_exchange_binding(ch, source_name, destination_name, routing_key, arguments) ⇒ Object
- #delete_recorded_queue_binding(ch, exchange_name, queue_name, routing_key, arguments) ⇒ Object
- #filtered_exchange_bindings ⇒ Array<Bunny::RecordedExchangeBinding>
- #filtered_queue_bindings ⇒ Array<Bunny::RecordedQueueBinding>
- #has_more_consumers_on_queue?(consumers, name) ⇒ Boolean
- #has_more_destinations_bound_to_exchange?(queue_bindings, exchange_bindings, name) ⇒ Boolean
-
#initialize(opts = {}) ⇒ TopologyRegistry
constructor
A new instance of TopologyRegistry.
- #maybe_delete_recorded_auto_delete_exchange(name) ⇒ Object
- #maybe_delete_recorded_auto_delete_queue(name) ⇒ Object
- #record_exchange_binding_with(ch, source_name, destination_name, routing_key, arguments) ⇒ Object
- #record_queue_binding_with(ch, exchange_name, queue_name, routing_key, arguments) ⇒ Object
- #remove_recorded_bindings_with_exchange_destination(name) ⇒ Array<Bunny::RecordedBinding>
- #remove_recorded_bindings_with_queue_destination(name) ⇒ Array<Bunny::RecordedBinding>
- #remove_recorded_bindings_with_source(name) ⇒ Array<Bunny::RecordedBinding>
- #reset! ⇒ Object
Constructor Details
#initialize(opts = {}) ⇒ TopologyRegistry
Returns a new instance of TopologyRegistry.
34 35 36 37 38 |
# File 'lib/bunny/topology_registry.rb', line 34

# Builds an empty registry.
#
# @param [Hash] opts
# @option opts [Object] :topology_recovery_filter filter used by the
#   +filtered_*+ readers; a DefaultTopologyRecoveryFilter is used when the
#   key is absent
def initialize(opts = {})
  fallback_filter = DefaultTopologyRecoveryFilter.new
  @filter = opts.fetch(:topology_recovery_filter, fallback_filter)
  reset!
end
Instance Attribute Details
#consumers ⇒ Hash<String, Bunny::RecordedConsumer> (readonly)
124 125 126 |
# File 'lib/bunny/topology_registry.rb', line 124

# Recorded consumers, keyed by consumer tag.
#
# @return [Hash<String, Bunny::RecordedConsumer>]
def consumers
  @consumers
end
#exchange_bindings ⇒ Set<Bunny::RecordedExchangeBinding> (readonly)
250 251 252 |
# File 'lib/bunny/topology_registry.rb', line 250

# Recorded exchange-to-exchange bindings.
#
# @return [Set<Bunny::RecordedExchangeBinding>]
def exchange_bindings
  @exchange_bindings
end
#exchanges ⇒ Hash<String, Bunny::RecordedExchange> (readonly)
181 182 183 |
# File 'lib/bunny/topology_registry.rb', line 181

# Recorded exchanges, keyed by exchange name.
#
# @return [Hash<String, Bunny::RecordedExchange>]
def exchanges
  @exchanges
end
#queue_bindings ⇒ Set<Bunny::RecordedQueueBinding> (readonly)
241 242 243 |
# File 'lib/bunny/topology_registry.rb', line 241

# Recorded exchange-to-queue bindings.
#
# @return [Set<Bunny::RecordedQueueBinding>]
def queue_bindings
  @queue_bindings
end
#queues ⇒ Hash<String, Bunny::RecordedQueue> (readonly)
60 61 62 |
# File 'lib/bunny/topology_registry.rb', line 60

# Recorded queues, keyed by queue name.
#
# @return [Hash<String, Bunny::RecordedQueue>]
def queues
  @queues
end
Instance Method Details
#delete_recorded_consumer(consumer_tag) ⇒ Object
153 154 155 156 157 158 159 |
# File 'lib/bunny/topology_registry.rb', line 153

# Forgets the consumer recorded under +consumer_tag+. When such a consumer
# existed, its queue is handed to #maybe_delete_recorded_auto_delete_queue.
#
# @param [String] consumer_tag
# @return [Object] result of the queue check, or nil when the tag was unknown
def delete_recorded_consumer(consumer_tag)
  @consumer_mutex.synchronize do
    removed = @consumers.delete(consumer_tag)
    maybe_delete_recorded_auto_delete_queue(removed.queue_name) if removed
  end
end
#delete_recorded_exchange(exchange) ⇒ Object
211 212 213 |
# File 'lib/bunny/topology_registry.rb', line 211

# Convenience wrapper: forgets +exchange+ by delegating to
# #delete_recorded_exchange_named with its name.
#
# @param [#name] exchange
def delete_recorded_exchange(exchange)
  exchange_name = exchange.name
  delete_recorded_exchange_named(exchange_name)
end
#delete_recorded_exchange_binding(ch, source_name, destination_name, routing_key, arguments) ⇒ Object
308 309 310 311 312 313 314 315 316 317 318 319 320 |
# File 'lib/bunny/topology_registry.rb', line 308

# Forgets a recorded exchange-to-exchange binding. When the binding was
# actually recorded, the source exchange is then handed to
# #maybe_delete_recorded_auto_delete_exchange.
#
# @param ch channel passed to RecordedExchangeBinding.new
# @param [String] source_name
# @param [String] destination_name
# @param [String] routing_key
# @param [Hash] arguments
# @return [Object] result of the exchange check, or nil when nothing was removed
def delete_recorded_exchange_binding(ch, source_name, destination_name, routing_key, arguments)
  b = RecordedExchangeBinding.new(ch)
    .with_source(source_name)
    .with_destination(destination_name)
    .with_routing_key(routing_key)
    .with_arguments(arguments)

  # Set#delete? returns nil when the element was not in the set.
  removed = @binding_mutex.synchronize { @exchange_bindings.delete?(b) }

  # The cascade takes @exchange_mutex. It runs only after @binding_mutex has
  # been released, because #delete_recorded_exchange_named acquires the two
  # monitors in the opposite order (exchange, then binding); nesting them
  # here would allow two threads to deadlock.
  maybe_delete_recorded_auto_delete_exchange(source_name) if removed
end
#delete_recorded_exchange_named(name) ⇒ Object
216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 |
# File 'lib/bunny/topology_registry.rb', line 216

# Forgets the exchange called +name+ together with every recorded binding
# that has it as source or as exchange destination. The source of each
# removed binding is then handed to
# #maybe_delete_recorded_auto_delete_exchange.
#
# @param [String] name
def delete_recorded_exchange_named(name)
  @exchange_mutex.synchronize do
    @exchanges.delete(name)

    # both removals happen before any cascade starts
    outgoing = remove_recorded_bindings_with_source(name)
    incoming = remove_recorded_bindings_with_exchange_destination(name)

    [outgoing, incoming].each do |removed|
      removed.each { |binding| maybe_delete_recorded_auto_delete_exchange(binding.source) }
    end
    incoming
  end
end
#delete_recorded_queue(queue) ⇒ Object
92 93 94 |
# File 'lib/bunny/topology_registry.rb', line 92

# Convenience wrapper: forgets +queue+ by delegating to
# #delete_recorded_queue_named with its name.
#
# @param [#name] queue
def delete_recorded_queue(queue)
  queue_name = queue.name
  delete_recorded_queue_named(queue_name)
end
#delete_recorded_queue_binding(ch, exchange_name, queue_name, routing_key, arguments) ⇒ Object
278 279 280 281 282 283 284 285 286 |
# File 'lib/bunny/topology_registry.rb', line 278

# Forgets a recorded exchange-to-queue binding.
#
# NOTE(review): unlike #delete_recorded_exchange_binding, this does not hand
# the source exchange to #maybe_delete_recorded_auto_delete_exchange;
# presumably the caller does so where needed — confirm.
#
# @param ch channel passed to RecordedQueueBinding.new
# @param [String] exchange_name binding source
# @param [String] queue_name binding destination
# @param [String] routing_key
# @param [Hash] arguments
def delete_recorded_queue_binding(ch, exchange_name, queue_name, routing_key, arguments)
  target = RecordedQueueBinding.new(ch)
  target = target.with_source(exchange_name)
  target = target.with_destination(queue_name)
  target = target.with_routing_key(routing_key)
  target = target.with_arguments(arguments)

  @binding_mutex.synchronize do
    @queue_bindings.delete(target)
  end
end
#delete_recorded_queue_named(name) ⇒ Object
104 105 106 107 108 109 110 111 112 113 |
# File 'lib/bunny/topology_registry.rb', line 104

# Forgets the queue called +name+ and every recorded binding that targets
# it. The source exchange of each removed binding is then handed to
# #maybe_delete_recorded_auto_delete_exchange.
#
# @param [String] name
def delete_recorded_queue_named(name)
  @queue_mutex.synchronize do
    @queues.delete(name)

    remove_recorded_bindings_with_queue_destination(name).each do |binding|
      maybe_delete_recorded_auto_delete_exchange(binding.source)
    end
  end
end
#delete_recorded_queue_named_without_cascading(name) ⇒ Object
97 98 99 100 101 |
# File 'lib/bunny/topology_registry.rb', line 97

# Forgets the queue called +name+ and nothing else: recorded bindings and
# exchanges are left untouched.
#
# @param [String] name
# @return [Object] the removed entry, or nil when no such queue was recorded
def delete_recorded_queue_named_without_cascading(name)
  @queue_mutex.synchronize { @queues.delete(name) }
end
#filtered_consumers ⇒ Array<Bunny::RecordedConsumer>
128 129 130 |
# File 'lib/bunny/topology_registry.rb', line 128

# Recorded consumers after they have been passed through the configured
# topology recovery filter.
#
# @return [Array<Bunny::RecordedConsumer>]
def filtered_consumers
  recorded = @consumers.values
  @filter.filter_consumers(recorded)
end
#filtered_exchange_bindings ⇒ Array<Bunny::RecordedExchangeBinding>
254 255 256 |
# File 'lib/bunny/topology_registry.rb', line 254

# Recorded exchange-to-exchange bindings after they have been passed through
# the configured topology recovery filter.
#
# The filter receives an Array snapshot rather than the live Set, matching
# the other +filtered_*+ readers (which pass +Hash#values+) and the
# documented return type. Later changes to the registry therefore do not
# affect a list that has already been handed out.
#
# @return [Array<Bunny::RecordedExchangeBinding>]
def filtered_exchange_bindings
  @filter.filter_exchange_bindings(@exchange_bindings.to_a)
end
#filtered_exchanges ⇒ Array<Bunny::RecordedExchange>
185 186 187 |
# File 'lib/bunny/topology_registry.rb', line 185

# Recorded exchanges after they have been passed through the configured
# topology recovery filter.
#
# @return [Array<Bunny::RecordedExchange>]
def filtered_exchanges
  recorded = @exchanges.values
  @filter.filter_exchanges(recorded)
end
#filtered_queue_bindings ⇒ Array<Bunny::RecordedQueueBinding>
245 246 247 |
# File 'lib/bunny/topology_registry.rb', line 245

# Recorded exchange-to-queue bindings after they have been passed through
# the configured topology recovery filter.
#
# The filter receives an Array snapshot rather than the live Set, matching
# the other +filtered_*+ readers (which pass +Hash#values+) and the
# documented return type. Later changes to the registry therefore do not
# affect a list that has already been handed out.
#
# @return [Array<Bunny::RecordedQueueBinding>]
def filtered_queue_bindings
  @filter.filter_queue_bindings(@queue_bindings.to_a)
end
#filtered_queues ⇒ Array<Bunny::RecordedQueue>
64 65 66 |
# File 'lib/bunny/topology_registry.rb', line 64

# Recorded queues after they have been passed through the configured
# topology recovery filter.
#
# @return [Array<Bunny::RecordedQueue>]
def filtered_queues
  recorded = @queues.values
  @filter.filter_queues(recorded)
end
#has_more_consumers_on_queue?(consumers, name) ⇒ Boolean
419 420 421 422 423 |
# File 'lib/bunny/topology_registry.rb', line 419

# Tells whether any of the given consumers is attached to queue +name+.
#
# @param [Enumerable<#queue_name>] consumers
# @param [String] name queue name
# @return [Boolean] false for an empty collection
def has_more_consumers_on_queue?(consumers, name)
  consumers.any? { |consumer| consumer.queue_name == name }
end
#has_more_destinations_bound_to_exchange?(queue_bindings, exchange_bindings, name) ⇒ Boolean
428 429 430 431 432 433 434 435 |
# File 'lib/bunny/topology_registry.rb', line 428

# Tells whether any of the given queue or exchange bindings uses exchange
# +name+ as its source.
#
# @param [Enumerable<#source>] queue_bindings
# @param [Enumerable<#source>] exchange_bindings
# @param [String] name exchange name
# @return [Boolean] false when both collections are empty
def has_more_destinations_bound_to_exchange?(queue_bindings, exchange_bindings, name)
  # both collections are always scanned, then the two answers are combined
  per_collection = [queue_bindings, exchange_bindings].map do |bindings|
    bindings.any? { |binding| binding.source == name }
  end
  per_collection.any?
end
#maybe_delete_recorded_auto_delete_exchange(name) ⇒ Object
372 373 374 375 376 377 378 |
# File 'lib/bunny/topology_registry.rb', line 372

# Drops the recorded exchange +name+ (via #delete_recorded_exchange_named)
# once no recorded queue or exchange binding uses it as a source.
#
# NOTE(review): no auto-delete flag of the recorded exchange is consulted
# here; presumably callers only invoke this where dropping is appropriate —
# confirm.
#
# @param [String] name
def maybe_delete_recorded_auto_delete_exchange(name)
  @exchange_mutex.synchronize do
    # the predicate is handed copies of both binding sets
    still_bound = has_more_destinations_bound_to_exchange?(@queue_bindings.dup, @exchange_bindings.dup, name)
    delete_recorded_exchange_named(name) unless still_bound
  end
end
#maybe_delete_recorded_auto_delete_queue(name) ⇒ Object
361 362 363 364 365 366 367 368 369 |
# File 'lib/bunny/topology_registry.rb', line 361

# Drops the recorded queue +name+ (via #delete_recorded_queue) once no
# recorded consumer is attached to it. Does nothing when no such queue is
# recorded.
#
# NOTE(review): no auto-delete flag of the recorded queue is consulted here;
# presumably callers only invoke this where dropping is appropriate — confirm.
#
# @param [String] name
def maybe_delete_recorded_auto_delete_queue(name)
  @queue_mutex.synchronize do
    next if has_more_consumers_on_queue?(@consumers.values, name)

    recorded = @queues[name]
    delete_recorded_queue(recorded) if recorded
  end
end
#record_consumer_with(ch, consumer_tag, queue_name, callable, manual_ack, exclusive, arguments) ⇒ Object
139 140 141 142 143 144 145 146 147 148 149 150 |
# File 'lib/bunny/topology_registry.rb', line 139

# Records a consumer under its tag, replacing any entry already stored for
# that tag.
#
# @param ch channel passed to RecordedConsumer.new
# @param [String] consumer_tag
# @param [String] queue_name
# @param callable
# @param manual_ack
# @param exclusive
# @param arguments
# @return [Bunny::RecordedConsumer] the stored record
def record_consumer_with(ch, consumer_tag, queue_name, callable, manual_ack, exclusive, arguments)
  @consumer_mutex.synchronize do
    recorded = RecordedConsumer.new(ch, queue_name)
    recorded = recorded.with_consumer_tag(consumer_tag)
    recorded = recorded.with_callable(callable)
    recorded = recorded.with_manual_ack(manual_ack)
    recorded = recorded.with_exclusive(exclusive)
    recorded = recorded.with_arguments(arguments)

    @consumers[consumer_tag] = recorded
  end
end
#record_exchange(exchange) ⇒ Object
190 191 192 |
# File 'lib/bunny/topology_registry.rb', line 190

# Records +exchange+ under its name, using the record built by
# RecordedExchange.from.
#
# @param [#name] exchange
def record_exchange(exchange)
  @exchange_mutex.synchronize do
    @exchanges[exchange.name] = RecordedExchange.from(exchange)
  end
end
#record_exchange_binding_with(ch, source_name, destination_name, routing_key, arguments) ⇒ Object
293 294 295 296 297 298 299 300 301 |
# File 'lib/bunny/topology_registry.rb', line 293

# Records an exchange-to-exchange binding.
#
# @param ch channel passed to RecordedExchangeBinding.new
# @param [String] source_name
# @param [String] destination_name
# @param [String] routing_key
# @param [Hash] arguments
def record_exchange_binding_with(ch, source_name, destination_name, routing_key, arguments)
  recorded = RecordedExchangeBinding.new(ch)
  recorded = recorded.with_source(source_name)
  recorded = recorded.with_destination(destination_name)
  recorded = recorded.with_routing_key(routing_key)
  recorded = recorded.with_arguments(arguments)

  @binding_mutex.synchronize do
    @exchange_bindings << recorded
  end
end
#record_exchange_with(ch, name, type, durable, auto_delete, arguments) ⇒ Object
200 201 202 203 204 205 206 207 208 |
# File 'lib/bunny/topology_registry.rb', line 200

# Builds a RecordedExchange from the given properties and records it under
# its name, replacing any entry already stored for that name.
#
# @param ch channel passed to RecordedExchange.new
# @param [String] name
# @param type
# @param durable
# @param auto_delete
# @param arguments
# @return [Bunny::RecordedExchange] the stored record
def record_exchange_with(ch, name, type, durable, auto_delete, arguments)
  recorded = RecordedExchange.new(ch, name)
  recorded = recorded.with_type(type)
  recorded = recorded.with_durable(durable)
  recorded = recorded.with_auto_delete(auto_delete)
  recorded = recorded.with_arguments(arguments)

  @exchange_mutex.synchronize do
    @exchanges[recorded.name] = recorded
  end
end
#record_queue(queue) ⇒ Object
69 70 71 |
# File 'lib/bunny/topology_registry.rb', line 69

# Records +queue+ under its name, using the record built by
# RecordedQueue.from.
#
# @param [#name] queue
def record_queue(queue)
  @queue_mutex.synchronize do
    @queues[queue.name] = RecordedQueue.from(queue)
  end
end
#record_queue_binding_with(ch, exchange_name, queue_name, routing_key, arguments) ⇒ Object
263 264 265 266 267 268 269 270 271 |
# File 'lib/bunny/topology_registry.rb', line 263

# Records an exchange-to-queue binding.
#
# @param ch channel passed to RecordedQueueBinding.new
# @param [String] exchange_name binding source
# @param [String] queue_name binding destination
# @param [String] routing_key
# @param [Hash] arguments
def record_queue_binding_with(ch, exchange_name, queue_name, routing_key, arguments)
  recorded = RecordedQueueBinding.new(ch)
  recorded = recorded.with_source(exchange_name)
  recorded = recorded.with_destination(queue_name)
  recorded = recorded.with_routing_key(routing_key)
  recorded = recorded.with_arguments(arguments)

  @binding_mutex.synchronize do
    @queue_bindings << recorded
  end
end
#record_queue_with(ch, name, server_named, durable, auto_delete, exclusive, arguments) ⇒ Object
80 81 82 83 84 85 86 87 88 89 |
# File 'lib/bunny/topology_registry.rb', line 80

# Builds a RecordedQueue from the given properties and records it under its
# name, replacing any entry already stored for that name.
#
# @param ch channel passed to RecordedQueue.new
# @param [String] name
# @param server_named
# @param durable
# @param auto_delete
# @param exclusive
# @param arguments
# @return [Bunny::RecordedQueue] the stored record
def record_queue_with(ch, name, server_named, durable, auto_delete, exclusive, arguments)
  recorded = RecordedQueue.new(ch, name)
  recorded = recorded.with_server_named(server_named)
  recorded = recorded.with_durable(durable)
  recorded = recorded.with_auto_delete(auto_delete)
  recorded = recorded.with_exclusive(exclusive)
  recorded = recorded.with_arguments(arguments)

  @queue_mutex.synchronize do
    @queues[recorded.name] = recorded
  end
end
#remove_recorded_bindings_with_exchange_destination(name) ⇒ Array<Bunny::RecordedBinding>
409 410 411 412 413 414 415 |
# File 'lib/bunny/topology_registry.rb', line 409

# Removes every recorded exchange-to-exchange binding whose destination is
# +name+. The binding set is replaced by a new Set holding the remaining
# bindings; the previous Set object is not modified.
#
# @param [String] name destination exchange name
# @return [Array<Bunny::RecordedBinding>] the removed bindings
def remove_recorded_bindings_with_exchange_destination(name)
  @binding_mutex.synchronize do
    removed, kept = @exchange_bindings.partition { |binding| binding.destination == name }
    @exchange_bindings = Set.new(kept)
    removed
  end
end
#remove_recorded_bindings_with_queue_destination(name) ⇒ Array<Bunny::RecordedBinding>
399 400 401 402 403 404 405 |
# File 'lib/bunny/topology_registry.rb', line 399

# Removes every recorded exchange-to-queue binding whose destination is
# +name+. The binding set is replaced by a new Set holding the remaining
# bindings; the previous Set object is not modified.
#
# @param [String] name destination queue name
# @return [Array<Bunny::RecordedBinding>] the removed bindings
def remove_recorded_bindings_with_queue_destination(name)
  @binding_mutex.synchronize do
    removed, kept = @queue_bindings.partition { |binding| binding.destination == name }
    @queue_bindings = Set.new(kept)
    removed
  end
end
#remove_recorded_bindings_with_source(name) ⇒ Array<Bunny::RecordedBinding>
382 383 384 385 386 387 388 389 390 391 392 393 394 395 |
# File 'lib/bunny/topology_registry.rb', line 382

# Removes, in place, every recorded queue and exchange binding whose source
# is exchange +name+.
#
# @param [String] name source exchange name
# @return [Array<Bunny::RecordedBinding>] removed queue bindings followed by
#   removed exchange bindings
def remove_recorded_bindings_with_source(name)
  @binding_mutex.synchronize do
    from_source = ->(binding) { binding.source == name }
    dropped_queue_bindings = @queue_bindings.select(&from_source)
    dropped_exchange_bindings = @exchange_bindings.select(&from_source)

    @queue_bindings.subtract(dropped_queue_bindings)
    @exchange_bindings.subtract(dropped_exchange_bindings)

    dropped_queue_bindings + dropped_exchange_bindings
  end
end
#reset! ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/bunny/topology_registry.rb', line 40

# Discards everything recorded so far and starts over with empty collections
# and fresh monitors.
#
# NOTE(review): the Monitor instances are replaced as well; presumably no
# other thread is inside one of the synchronize blocks when this runs —
# confirm.
def reset!
  # named entities, looked up by name / consumer tag
  @queues = {}
  @exchanges = {}
  @consumers = {}

  # bindings have no single key and live in sets
  @queue_bindings = Set.new
  @exchange_bindings = Set.new

  # one re-entrant lock per kind of entity
  %i[@queue_mutex @exchange_mutex @binding_mutex @consumer_mutex].each do |lock|
    instance_variable_set(lock, Monitor.new)
  end
  @consumer_mutex
end