Class: Gitlab::EventStore::Subscription
- Inherits:
-
Object
- Object
- Gitlab::EventStore::Subscription
- Defined in:
- lib/gitlab/event_store/subscription.rb
Constant Summary collapse
- DEFAULT_GROUP_SIZE =
10
- SCHEDULING_BATCH_SIZE =
100
- SCHEDULING_BATCH_DELAY =
10.seconds
Instance Attribute Summary collapse
-
#condition ⇒ Object
readonly
Returns the value of attribute condition.
-
#delay ⇒ Object
readonly
Returns the value of attribute delay.
-
#group_size ⇒ Object
readonly
Returns the value of attribute group_size.
-
#worker ⇒ Object
readonly
Returns the value of attribute worker.
Instance Method Summary collapse
- #consume_event(event) ⇒ Object
- #consume_events(events) ⇒ Object
-
#initialize(worker, condition, delay, group_size) ⇒ Subscription
constructor
A new instance of Subscription.
Constructor Details
#initialize(worker, condition, delay, group_size) ⇒ Subscription
Returns a new instance of Subscription.
12 13 14 15 16 17 |
# File 'lib/gitlab/event_store/subscription.rb', line 12 def initialize(worker, condition, delay, group_size) @worker = worker @condition = condition @delay = delay @group_size = group_size || DEFAULT_GROUP_SIZE end |
Instance Attribute Details
#condition ⇒ Object (readonly)
Returns the value of attribute condition.
10 11 12 |
# File 'lib/gitlab/event_store/subscription.rb', line 10 def condition @condition end |
#delay ⇒ Object (readonly)
Returns the value of attribute delay.
10 11 12 |
# File 'lib/gitlab/event_store/subscription.rb', line 10 def delay @delay end |
#group_size ⇒ Object (readonly)
Returns the value of attribute group_size.
10 11 12 |
# File 'lib/gitlab/event_store/subscription.rb', line 10 def group_size @group_size end |
#worker ⇒ Object (readonly)
Returns the value of attribute worker.
10 11 12 |
# File 'lib/gitlab/event_store/subscription.rb', line 10 def worker @worker end |
Instance Method Details
#consume_event(event) ⇒ Object
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/gitlab/event_store/subscription.rb', line 19 def consume_event(event) return unless condition_met?(event) if delay worker.perform_in(delay, event.class.name, event.data.deep_stringify_keys.to_h) else worker.perform_async(event.class.name, event.data.deep_stringify_keys.to_h) end # We rescue and track any exceptions here because we don't want to # impact other subscribers if one is faulty. # The method `condition_met?`, since it can run a block, it might encounter # a bug. By raising an exception here we could interrupt the publishing # process, preventing other subscribers from consuming the event. rescue StandardError => e Gitlab::ErrorTracking.track_and_raise_for_dev_exception(e, event_class: event.class.name, event_data: event.data) end |
#consume_events(events) ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/gitlab/event_store/subscription.rb', line 37 def consume_events(events) event_class = events.first.class unless events.all? { |e| e.class < Event && e.instance_of?(event_class) } raise InvalidEvent, "Events being published are not an instance of Gitlab::EventStore::Event" end matched_events = events.select { |event| condition_met?(event) } worker_args = events_worker_args(event_class, matched_events) # rubocop:disable Scalability/BulkPerformWithContext -- Context info is already available in `ApplicationContext` here. if worker_args.size > SCHEDULING_BATCH_SIZE # To reduce the number of concurrent jobs, we batch the group of events and add delay between each batch. # We add a delay of 1s as bulk_perform_in does not support 0s delay. worker.bulk_perform_in(delay || 1.second, worker_args, batch_size: SCHEDULING_BATCH_SIZE, batch_delay: SCHEDULING_BATCH_DELAY) elsif delay worker.bulk_perform_in(delay, worker_args) else worker.bulk_perform_async(worker_args) end # rubocop:enable Scalability/BulkPerformWithContext rescue StandardError => e Gitlab::ErrorTracking.track_and_raise_for_dev_exception(e, event_class: event_class, events: events.map(&:data)) end |