Class: Gitlab::SidekiqMiddleware::ConcurrencyLimit::QueueManager

Inherits:
Object
  • Object
show all
Includes:
ExclusiveLeaseGuard
Defined in:
lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb

Constant Summary collapse

LEASE_TIMEOUT =
10.seconds
RESUME_PROCESSING_BATCH_SIZE =
1_000

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from ExclusiveLeaseGuard

#exclusive_lease, #lease_release?, #lease_taken_log_level, #lease_taken_message, #log_lease_taken, #release_lease, #renew_lease!, #try_obtain_lease

Constructor Details

#initialize(worker_name:, prefix:) ⇒ QueueManager

Returns a new instance of QueueManager.



14
15
16
17
18
# File 'lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb', line 14

# Builds the two Redis key names used for a single worker class.
#
# @param worker_name [String] worker class name; its `underscore` form is
#   embedded in both keys
# @param prefix [String] leading segment shared by both keys
def initialize(worker_name:, prefix:)
  @worker_name = worker_name

  # Both keys end in the same `{...}` segment.
  # NOTE(review): the braces look like a Redis Cluster hash tag — confirm.
  braced_name = "{#{worker_name.underscore}}"

  @redis_key = "#{prefix}:throttled_jobs:#{braced_name}"
  @metadata_key = "#{prefix}:resume_meta:#{braced_name}"
end

Instance Attribute Details

#metadata_key ⇒ Object (readonly)

Returns the value of attribute metadata_key.



12
13
14
# File 'lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb', line 12

# Reader for the metadata key.
#
# @return [Object] the value of @metadata_key
def metadata_key
  @metadata_key
end

#redis_key ⇒ Object (readonly)

Returns the value of attribute redis_key.



12
13
14
# File 'lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb', line 12

# Reader for the Redis key.
#
# @return [Object] the value of @redis_key
def redis_key = @redis_key

#worker_name ⇒ Object (readonly)

Returns the value of attribute worker_name.



12
13
14
# File 'lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb', line 12

# Reader for the worker name.
#
# @return [Object] the value of @worker_name
def worker_name = @worker_name

Instance Method Details

#add_to_queue!(job, context) ⇒ Object



20
21
22
23
24
25
26
# File 'lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb', line 20

# Appends one serialized job to the Redis list at @redis_key, then bumps the
# deferred-job counter labelled with this worker.
#
# @param job [Object] passed to `serialize` together with `context`
# @param context [Object] passed to `serialize` together with `job`
# @return [Object] whatever `deferred_job_counter.increment` returns
def add_to_queue!(job, context)
  with_redis { |conn| conn.rpush(@redis_key, serialize(job, context)) }

  counter_labels = { worker: @worker_name }
  deferred_job_counter.increment(counter_labels)
end

#has_jobs_in_queue? ⇒ Boolean

Returns:

  • (Boolean)


32
33
34
# File 'lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb', line 32

# Reports whether `queue_size` is anything other than zero.
#
# @return [Boolean] false when `queue_size` equals 0, true otherwise
def has_jobs_in_queue?
  return false if queue_size == 0

  true
end

#queue_size ⇒ Object



28
29
30
# File 'lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb', line 28

# Asks Redis for the length of the list stored at @redis_key.
#
# @return [Object] the value `with_redis` returns for the block
#   (presumably the Integer from LLEN — confirm against `with_redis`)
def queue_size
  with_redis do |conn|
    conn.llen(@redis_key)
  end
end

#resume_processing!(limit:) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb', line 36

# Hands up to `limit` queued jobs over for processing and then removes the
# same number of entries from the queue, all inside `try_obtain_lease`.
#
# @param limit [Integer] forwarded to `next_batch_from_queue`
# @return [Object] whatever `try_obtain_lease` returns; the inner block
#   evaluates to the number of jobs handed over, or nil when the batch was
#   empty (presumably passed through when the lease is obtained — confirm)
def resume_processing!(limit:)
  try_obtain_lease do
    with_redis do |conn|
      batch = next_batch_from_queue(conn, limit: limit)

      # `break` leaves the `with_redis` call itself, which then evaluates to nil.
      break if batch.empty?

      # Jobs are handed over before they are removed from the queue.
      bulk_send_to_processing_queue(batch)

      resumed_count = batch.length
      remove_processed_jobs(conn, limit: resumed_count)

      resumed_count
    end
  end
end