Class: Gitlab::SidekiqMiddleware::ConcurrencyLimit::QueueManager
- Inherits:
-
Object
- Object
- Gitlab::SidekiqMiddleware::ConcurrencyLimit::QueueManager
- Includes:
- ExclusiveLeaseGuard
- Defined in:
- lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb
Constant Summary collapse
- LEASE_TIMEOUT =
10.seconds
- RESUME_PROCESSING_BATCH_SIZE =
1_000
Instance Attribute Summary collapse
-
#metadata_key ⇒ Object
readonly
Returns the value of attribute metadata_key.
-
#redis_key ⇒ Object
readonly
Returns the value of attribute redis_key.
-
#worker_name ⇒ Object
readonly
Returns the value of attribute worker_name.
Instance Method Summary collapse
- #add_to_queue!(job, context) ⇒ Object
- #has_jobs_in_queue? ⇒ Boolean
-
#initialize(worker_name:, prefix:) ⇒ QueueManager
constructor
A new instance of QueueManager.
- #queue_size ⇒ Object
- #resume_processing!(limit:) ⇒ Object
Methods included from ExclusiveLeaseGuard
#exclusive_lease, #lease_release?, #lease_taken_log_level, #lease_taken_message, #log_lease_taken, #release_lease, #renew_lease!, #try_obtain_lease
Constructor Details
#initialize(worker_name:, prefix:) ⇒ QueueManager
Returns a new instance of QueueManager.
14 15 16 17 18 |
# File 'lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb', line 14 def initialize(worker_name:, prefix:) @worker_name = worker_name @redis_key = "#{prefix}:throttled_jobs:{#{worker_name.underscore}}" @metadata_key = "#{prefix}:resume_meta:{#{worker_name.underscore}}" end |
Instance Attribute Details
#metadata_key ⇒ Object (readonly)
Returns the value of attribute metadata_key.
12 13 14 |
# File 'lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb', line 12 def @metadata_key end |
#redis_key ⇒ Object (readonly)
Returns the value of attribute redis_key.
12 13 14 |
# File 'lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb', line 12 def redis_key @redis_key end |
#worker_name ⇒ Object (readonly)
Returns the value of attribute worker_name.
12 13 14 |
# File 'lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb', line 12 def worker_name @worker_name end |
Instance Method Details
#add_to_queue!(job, context) ⇒ Object
20 21 22 23 24 25 26 |
# File 'lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb', line 20 def add_to_queue!(job, context) with_redis do |redis| redis.rpush(@redis_key, serialize(job, context)) end deferred_job_counter.increment({ worker: @worker_name }) end |
#has_jobs_in_queue? ⇒ Boolean
32 33 34 |
# File 'lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb', line 32 def has_jobs_in_queue? queue_size != 0 end |
#queue_size ⇒ Object
28 29 30 |
# File 'lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb', line 28 def queue_size with_redis { |redis| redis.llen(@redis_key) } end |
#resume_processing!(limit:) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/gitlab/sidekiq_middleware/concurrency_limit/queue_manager.rb', line 36 def resume_processing!(limit:) try_obtain_lease do with_redis do |redis| jobs = next_batch_from_queue(redis, limit: limit) break if jobs.empty? bulk_send_to_processing_queue(jobs) remove_processed_jobs(redis, limit: jobs.length) jobs.length end end end |