Class: Gitlab::SidekiqMiddleware::ConcurrencyLimit::QueueManager
- Inherits: Object
- Includes: ExclusiveLeaseGuard
- Defined in: lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb
Constant Summary
- MAX_PROCESSING_TIME = 5.minutes
- LEASE_TIMEOUT = MAX_PROCESSING_TIME + 2.seconds
- MAX_BATCH_SIZE = 5_000
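As a quick check of the arithmetic, the lease timeout works out to 302 seconds, two seconds longer than the processing window. The sketch below uses plain integers instead of ActiveSupport durations; the `_SECONDS` constant names are illustrative and do not exist in the class. Reading the extra two seconds as headroom so the lease outlives the processing loop is an assumption, since the source does not state the reason.

```ruby
# Plain-Ruby equivalents of the duration constants (illustrative names only).
MAX_PROCESSING_TIME_SECONDS = 5 * 60                         # 5.minutes
LEASE_TIMEOUT_SECONDS = MAX_PROCESSING_TIME_SECONDS + 2      # + 2.seconds

puts LEASE_TIMEOUT_SECONDS # 302
```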
Instance Attribute Summary
- #metadata_key ⇒ Object (readonly)
  Returns the value of attribute metadata_key.
- #redis_key ⇒ Object (readonly)
  Returns the value of attribute redis_key.
- #worker_name ⇒ Object (readonly)
  Returns the value of attribute worker_name.
Instance Method Summary
- #add_to_queue!(job, context) ⇒ Object
- #has_jobs_in_queue? ⇒ Boolean
- #initialize(worker_name:, prefix:) ⇒ QueueManager (constructor)
  A new instance of QueueManager.
- #queue_size ⇒ Object
- #resume_processing! ⇒ Object
Methods included from ExclusiveLeaseGuard
#exclusive_lease, #lease_release?, #lease_taken_message, #log_lease_taken, #release_lease, #renew_lease!, #try_obtain_lease
Constructor Details
#initialize(worker_name:, prefix:) ⇒ QueueManager
Returns a new instance of QueueManager.
# File 'lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb', line 15
def initialize(worker_name:, prefix:)
  @worker_name = worker_name
  @redis_key = "#{prefix}:throttled_jobs:{#{worker_name.underscore}}"
  @metadata_key = "#{prefix}:resume_meta:{#{worker_name.underscore}}"
end
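The key format built by the constructor can be sketched without Rails. The `simple_underscore` helper below is a hypothetical, simplified stand-in for ActiveSupport's `String#underscore` (it skips acronym inflections), and the prefix and worker name are made-up examples. Both keys wrap the worker name in braces, which matches the Redis Cluster hash-tag syntax; if that is the intent, the queue key and the metadata key for one worker would map to the same hash slot.

```ruby
# Simplified stand-in for ActiveSupport's String#underscore (assumption:
# no acronym handling, which the real inflector supports).
def simple_underscore(name)
  name.gsub("::", "/")
      .gsub(/([A-Z]+)([A-Z][a-z])/, '\1_\2')
      .gsub(/([a-z\d])([A-Z])/, '\1_\2')
      .tr("-", "_")
      .downcase
end

# Builds the two keys in the same shape as the constructor above.
def build_keys(worker_name:, prefix:)
  tag = simple_underscore(worker_name)
  {
    redis_key: "#{prefix}:throttled_jobs:{#{tag}}",
    metadata_key: "#{prefix}:resume_meta:{#{tag}}"
  }
end

# "example_prefix" and "Projects::ImportWorker" are hypothetical inputs.
keys = build_keys(worker_name: "Projects::ImportWorker", prefix: "example_prefix")
puts keys[:redis_key]    # example_prefix:throttled_jobs:{projects/import_worker}
puts keys[:metadata_key] # example_prefix:resume_meta:{projects/import_worker}
```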
Instance Attribute Details
#metadata_key ⇒ Object (readonly)
Returns the value of attribute metadata_key.
# File 'lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb', line 13
def metadata_key
  @metadata_key
end
#redis_key ⇒ Object (readonly)
Returns the value of attribute redis_key.
# File 'lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb', line 13
def redis_key
  @redis_key
end
#worker_name ⇒ Object (readonly)
Returns the value of attribute worker_name.
# File 'lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb', line 13
def worker_name
  @worker_name
end
Instance Method Details
#add_to_queue!(job, context) ⇒ Object
# File 'lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb', line 21
def add_to_queue!(job, context)
  with_redis do |redis|
    redis.rpush(@redis_key, serialize(job, context))
  end

  deferred_job_counter.increment({ worker: @worker_name })
end
#has_jobs_in_queue? ⇒ Boolean
# File 'lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb', line 33
def has_jobs_in_queue?
  queue_size != 0
end
#queue_size ⇒ Object
# File 'lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb', line 29
def queue_size
  with_redis { |redis| redis.llen(@redis_key) }
end
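The three queue methods above rely on only two Redis list commands, RPUSH and LLEN. The sketch below reproduces that behaviour against a hypothetical in-memory `FakeRedis`, so it runs without a Redis server. `SketchQueue` and the JSON payload are assumptions for illustration: the real `serialize` helper is not shown in this page, and the metrics counter is omitted.

```ruby
require "json"

# Hypothetical in-memory stand-in for the two Redis list commands used above.
class FakeRedis
  def initialize
    @lists = Hash.new { |hash, key| hash[key] = [] }
  end

  # Appends a value to the tail of the list and returns the new length (RPUSH).
  def rpush(key, value)
    @lists[key].push(value).size
  end

  # Returns the list length, or 0 when the key does not exist (LLEN).
  def llen(key)
    @lists.key?(key) ? @lists[key].size : 0
  end
end

# Illustrative queue wrapper mirroring the method names documented above.
class SketchQueue
  def initialize(redis, redis_key)
    @redis = redis
    @redis_key = redis_key
  end

  def add_to_queue!(job, context)
    # Assumption: JSON stands in for the real, unshown `serialize` helper.
    @redis.rpush(@redis_key, JSON.generate("job" => job, "context" => context))
  end

  def queue_size
    @redis.llen(@redis_key)
  end

  def has_jobs_in_queue?
    queue_size != 0
  end
end

queue = SketchQueue.new(FakeRedis.new, "example_prefix:throttled_jobs:{my_worker}")
puts queue.has_jobs_in_queue? # false
queue.add_to_queue!({ "class" => "MyWorker", "args" => [1] }, { "user" => "alice" })
puts queue.queue_size         # 1
```

Because jobs are appended with RPUSH, a consumer that reads from the head of the list would see them in first-in, first-out order.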
#resume_processing! ⇒ Object
# File 'lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb', line 37
def resume_processing!
  try_obtain_lease do
    with_redis do |redis|
      # Flag disabled: run a single pass and return its count.
      unless Feature.enabled?(:concurrency_limit_eager_resume_processing, :instance, type: :ops)
        resumed_jobs_count = resume_processing_once!(redis)
        break resumed_jobs_count
      end

      # Flag enabled: keep running passes until one resumes nothing
      # or MAX_PROCESSING_TIME has elapsed.
      deadline = MAX_PROCESSING_TIME.from_now
      total_resumed_jobs = 0

      while deadline.future?
        resumed_jobs_count = resume_processing_once!(redis)
        break if resumed_jobs_count == 0

        total_resumed_jobs += resumed_jobs_count
      end

      total_resumed_jobs
    end
  end
end
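The eager branch of `resume_processing!` follows a common pattern: repeat a batch operation until it returns nothing or a deadline passes. A minimal plain-Ruby sketch of that pattern follows. `drain_in_batches`, its parameters, and the array used as a queue are all hypothetical; the real `resume_processing_once!` is not shown in this page, so removing a batch with `Array#shift` only stands in for it.

```ruby
# Drains `queue` in batches until it is empty or the time budget runs out.
# Returns the total number of items removed. All names here are illustrative.
def drain_in_batches(queue, batch_size:, budget_seconds:)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + budget_seconds
  total = 0

  while Process.clock_gettime(Process::CLOCK_MONOTONIC) < deadline
    batch = queue.shift(batch_size) # removes and returns up to batch_size items
    break if batch.empty?           # a pass that finds nothing ends the loop

    total += batch.size
  end

  total
end

jobs = (1..12).to_a
puts drain_in_batches(jobs, batch_size: 5, budget_seconds: 1.0) # 12
```

With a budget of zero the loop body never runs and the method returns 0, which mirrors how the documented loop stops once `deadline.future?` is false.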