Class: Gitlab::EventStore::Subscription

Inherits:
Object
  • Object
show all
Defined in:
lib/gitlab/event_store/subscription.rb

Constant Summary collapse

# Group size the constructor falls back to when none is supplied.
DEFAULT_GROUP_SIZE =
10
# In `consume_events`: when more worker-argument sets than this are produced,
# scheduling is split into batches; the same value is passed as `batch_size`.
SCHEDULING_BATCH_SIZE =
100
# Passed as `batch_delay` to `bulk_perform_in` when `consume_events` batches
# the scheduling.
SCHEDULING_BATCH_DELAY =
10.seconds

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(worker, condition, delay, group_size) ⇒ Subscription

Returns a new instance of Subscription.



12
13
14
15
16
17
# File 'lib/gitlab/event_store/subscription.rb', line 12

# Stores the collaborators of a subscription.
#
# @param worker [Object] receiver of the `perform_*` / `bulk_perform_*` calls
#   made by `consume_event` and `consume_events`
# @param condition [Object, nil] kept as-is and exposed through `#condition`
#   (presumably what `condition_met?` evaluates; confirm against its definition)
# @param delay [Object, nil] when truthy, jobs are scheduled with this delay
#   instead of being enqueued immediately
# @param group_size [Integer, nil] replaced by DEFAULT_GROUP_SIZE when nil or false
def initialize(worker, condition, delay, group_size)
  @worker = worker
  @condition = condition
  @delay = delay
  @group_size = group_size
  @group_size ||= DEFAULT_GROUP_SIZE
end

Instance Attribute Details

#condition ⇒ Object (readonly)

Returns the value of attribute condition.



10
11
12
# File 'lib/gitlab/event_store/subscription.rb', line 10

# @return [Object, nil] the `condition` value handed to the constructor
def condition = @condition

#delay ⇒ Object (readonly)

Returns the value of attribute delay.



10
11
12
# File 'lib/gitlab/event_store/subscription.rb', line 10

# @return [Object, nil] the `delay` value handed to the constructor; nil means
#   `consume_event` enqueues the job immediately
def delay = @delay

#group_size ⇒ Object (readonly)

Returns the value of attribute group_size.



10
11
12
# File 'lib/gitlab/event_store/subscription.rb', line 10

# @return [Object] the group size set by the constructor (DEFAULT_GROUP_SIZE
#   when none was supplied)
def group_size = @group_size

#worker ⇒ Object (readonly)

Returns the value of attribute worker.



10
11
12
# File 'lib/gitlab/event_store/subscription.rb', line 10

# @return [Object] the worker handed to the constructor; it receives the
#   scheduling calls made by `consume_event` and `consume_events`
def worker = @worker

Instance Method Details

#consume_event(event) ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/gitlab/event_store/subscription.rb', line 19

# Schedules the subscribed worker for a single event, provided the
# subscription's condition accepts it.
#
# @param event [Object] event responding to `data`; its class name and
#   string-keyed data become the job arguments
# @return [Object, nil] nil when the condition is not met, otherwise the
#   result of the scheduling call (or of the error tracker after a failure)
def consume_event(event)
  return unless condition_met?(event)

  job_args = [event.class.name, event.data.deep_stringify_keys.to_h]

  # A configured delay postpones the job; without one it is enqueued right away.
  delay ? worker.perform_in(delay, *job_args) : worker.perform_async(*job_args)
rescue StandardError => error
  # Exceptions are rescued and tracked instead of propagated so that one
  # faulty subscriber does not affect the others. `condition_met?` can run a
  # block and may therefore hit a bug; raising from here could interrupt the
  # publishing process and keep other subscribers from consuming the event.
  Gitlab::ErrorTracking.track_and_raise_for_dev_exception(
    error,
    event_class: event.class.name,
    event_data: event.data
  )
end

#consume_events(events) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/gitlab/event_store/subscription.rb', line 37

# Schedules the subscribed worker for a batch of events that must all be
# instances of the same Event subclass.
#
# Only events accepted by `condition_met?` are turned into worker arguments.
# When more than SCHEDULING_BATCH_SIZE argument sets result, scheduling is
# batched with SCHEDULING_BATCH_DELAY between batches.
#
# Any StandardError (including the InvalidEvent raised by the validation
# below) is handed to the error tracker instead of propagating.
#
# @param events [Array<Event>] events of a single Event subclass
# @return [Object] result of the bulk scheduling call, or of the error tracker
#   after a failure
def consume_events(events)
  event_class = events.first.class
  unless events.all? { |e| e.class < Event && e.instance_of?(event_class) }
    raise InvalidEvent, "Events being published are not an instance of Gitlab::EventStore::Event"
  end

  matched_events = events.select { |event| condition_met?(event) }
  worker_args = events_worker_args(event_class, matched_events)

  # rubocop:disable Scalability/BulkPerformWithContext -- Context info is already available in `ApplicationContext` here.
  if worker_args.size > SCHEDULING_BATCH_SIZE
    # To reduce the number of concurrent jobs, we batch the group of events and add delay between each batch.
    # We add a delay of 1s as bulk_perform_in does not support 0s delay.
    worker.bulk_perform_in(delay || 1.second, worker_args, batch_size: SCHEDULING_BATCH_SIZE, batch_delay: SCHEDULING_BATCH_DELAY)
  elsif delay
    worker.bulk_perform_in(delay, worker_args)
  else
    worker.bulk_perform_async(worker_args)
  end
  # rubocop:enable Scalability/BulkPerformWithContext
rescue StandardError => e
  # The handler itself must not raise: the failure may have been caused by
  # `events` being nil or containing objects without `data` (the InvalidEvent
  # case), so the payload is built defensively. Objects without `data` are
  # reported by class name.
  events_data = Array(events).map { |event| event.respond_to?(:data) ? event.data : event.class.name }
  Gitlab::ErrorTracking.track_and_raise_for_dev_exception(e, event_class: event_class, events: events_data)
end