Class: Gitlab::SidekiqMiddleware::ConcurrencyLimit::QueueManager

Inherits:
Object
  • Object
show all
Includes:
ExclusiveLeaseGuard
Defined in:
lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb

Constant Summary collapse

MAX_PROCESSING_TIME =
5.minutes
LEASE_TIMEOUT =
MAX_PROCESSING_TIME + 2.seconds
MAX_BATCH_SIZE =
5_000

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from ExclusiveLeaseGuard

#exclusive_lease, #lease_release?, #lease_taken_message, #log_lease_taken, #release_lease, #renew_lease!, #try_obtain_lease

Constructor Details

#initialize(worker_name:, prefix:) ⇒ QueueManager

Returns a new instance of QueueManager.



15
16
17
18
19
# File 'lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb', line 15

def initialize(worker_name:, prefix:)
  @worker_name = worker_name
  @redis_key = "#{prefix}:throttled_jobs:{#{worker_name.underscore}}"
  @metadata_key = "#{prefix}:resume_meta:{#{worker_name.underscore}}"
end

Instance Attribute Details

#metadata_keyObject (readonly)

Returns the value of attribute metadata_key.



13
14
15
# File 'lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb', line 13

def 
  @metadata_key
end

#redis_keyObject (readonly)

Returns the value of attribute redis_key.



13
14
15
# File 'lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb', line 13

def redis_key
  @redis_key
end

#worker_nameObject (readonly)

Returns the value of attribute worker_name.



13
14
15
# File 'lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb', line 13

def worker_name
  @worker_name
end

Instance Method Details

#add_to_queue!(job, context) ⇒ Object



21
22
23
24
25
26
27
# File 'lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb', line 21

def add_to_queue!(job, context)
  with_redis do |redis|
    redis.rpush(@redis_key, serialize(job, context))
  end

  deferred_job_counter.increment({ worker: @worker_name })
end

#has_jobs_in_queue?Boolean

Returns:

  • (Boolean)


33
34
35
# File 'lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb', line 33

def has_jobs_in_queue?
  queue_size != 0
end

#queue_sizeObject



29
30
31
# File 'lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb', line 29

def queue_size
  with_redis { |redis| redis.llen(@redis_key) }
end

#resume_processing!Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb', line 37

def resume_processing!
  try_obtain_lease do
    with_redis do |redis|
      unless Feature.enabled?(:concurrency_limit_eager_resume_processing, :instance, type: :ops)
        resumed_jobs_count = resume_processing_once!(redis)
        break resumed_jobs_count
      end

      deadline = MAX_PROCESSING_TIME.from_now
      total_resumed_jobs = 0
      while deadline.future?
        resumed_jobs_count = resume_processing_once!(redis)
        break if resumed_jobs_count == 0

        total_resumed_jobs += resumed_jobs_count
      end
      total_resumed_jobs
    end
  end
end