Class: Sidekiq::Queue
Overview
Represents a queue within Sidekiq. Allows enumeration of all jobs within the queue and deletion of jobs. NB: this queue data is real-time and is changing within Redis moment by moment.
    queue = Sidekiq::Queue.new("mailer")
    queue.each do |job|
      job.klass # => 'MyWorker'
      job.args # => [1, 2, 3]
      job.delete if job.jid == 'abcdef1234567890'
    end
Instance Attribute Summary
- #name ⇒ Object (readonly)
  Returns the value of attribute name.
Class Method Summary
- .all ⇒ Array<Sidekiq::Queue>
  Fetch all known queues within Redis.
Instance Method Summary
- #clear ⇒ Boolean (also: #💣)
  Delete all jobs within this queue.
- #each ⇒ Object
- #find_job(jid) ⇒ Sidekiq::JobRecord?
  Find the job with the given JID within this queue.
- #initialize(name = "default") ⇒ Queue (constructor)
  A new instance of Queue.
- #latency ⇒ Float
  Calculates this queue’s latency: the number of seconds since the oldest job in the queue was enqueued.
- #paused? ⇒ Boolean
  Whether the queue is currently paused.
- #size ⇒ Integer
  The current size of the queue within Redis.
Constructor Details
#initialize(name = "default") ⇒ Queue
Returns a new instance of Queue.
    # File 'lib/sidekiq/api.rb', line 250
    def initialize(name = "default")
      @name = name.to_s
      @rname = "queue:#{name}"
    end
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
    # File 'lib/sidekiq/api.rb', line 247
    def name
      @name
    end
Class Method Details
.all ⇒ Array<Sidekiq::Queue>
Fetch all known queues within Redis.
    # File 'lib/sidekiq/api.rb', line 243
    def self.all
      Sidekiq.redis { |c| c.sscan("queues").to_a }.sort.map { |q| Sidekiq::Queue.new(q) }
    end
Instance Method Details
#clear ⇒ Boolean Also known as: 💣
Delete all jobs within this queue.
    # File 'lib/sidekiq/api.rb', line 331
    def clear
      Sidekiq.redis do |conn|
        conn.multi do |transaction|
          transaction.unlink(@rname)
          transaction.srem("queues", [name])
        end
      end
      true
    end
#each ⇒ Object
    # File 'lib/sidekiq/api.rb', line 295
    def each
      initial_size = size
      deleted_size = 0
      page = 0
      page_size = 50

      loop do
        range_start = page * page_size - deleted_size
        range_end = range_start + page_size - 1
        entries = Sidekiq.redis { |conn|
          conn.lrange @rname, range_start, range_end
        }
        break if entries.empty?
        page += 1
        entries.each do |entry|
          yield JobRecord.new(entry, @name)
        end
        deleted_size = initial_size - size
      end
    end
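The #each listing pages through the list 50 entries at a time and shifts each new window left by the number of entries that have disappeared since iteration began. The following is a minimal sketch of that idea only, with Redis replaced by an in-memory array. `FakeList` and `each_paged` are hypothetical names invented for this illustration and are not part of Sidekiq.

```ruby
# In-memory stand-in for a Redis list. FakeList and each_paged are
# hypothetical names used only for this illustration.
class FakeList
  def initialize(items)
    @items = items.dup
  end

  def size
    @items.size
  end

  # Inclusive index range; returns [] once the start is past the end.
  def lrange(range_start, range_end)
    @items[range_start..range_end] || []
  end

  def delete(item)
    @items.delete(item)
  end
end

# Same paging arithmetic as the #each listing, applied to a FakeList.
def each_paged(list, page_size: 50)
  initial_size = list.size
  deleted_size = 0
  page = 0

  loop do
    # Shift the window left by however many entries have been removed so far.
    range_start = page * page_size - deleted_size
    range_end = range_start + page_size - 1
    entries = list.lrange(range_start, range_end)
    break if entries.empty?
    page += 1
    entries.each { |entry| yield entry }
    deleted_size = initial_size - list.size
  end
end

list = FakeList.new((1..10).to_a)
seen = []
each_paged(list, page_size: 4) do |n|
  seen << n
  list.delete(n) if n.even? # delete entries while iterating
end
```

In this sketch every entry is still visited once even though half of them are deleted mid-iteration. Without the `deleted_size` adjustment, each later page would start too far to the right and skip entries.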
#find_job(jid) ⇒ Sidekiq::JobRecord?
Find the job with the given JID within this queue.
This is a *slow, inefficient* operation. Do not use under normal conditions.
    # File 'lib/sidekiq/api.rb', line 325
    def find_job(jid)
      detect { |j| j.jid == jid }
    end
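The slowness follows from the implementation: `detect` walks the queue through #each, one entry at a time, until a JID matches. The sketch below illustrates that cost with a hypothetical in-memory class. `TinyQueue`, `FakeJob`, and the `visited` counter are assumptions made for this example and do not exist in Sidekiq.

```ruby
# Hypothetical stand-ins: TinyQueue defines only `each`, and Enumerable
# supplies `detect`, which makes a lookup by JID a linear scan.
FakeJob = Struct.new(:jid)

class TinyQueue
  include Enumerable

  attr_reader :visited

  def initialize(jobs)
    @jobs = jobs
    @visited = 0
  end

  def each
    @jobs.each do |job|
      @visited += 1 # count every entry the scan touches
      yield job
    end
  end

  def find_job(jid)
    detect { |j| j.jid == jid }
  end
end

queue = TinyQueue.new((1..1000).map { |i| FakeJob.new("jid-#{i}") })
found = queue.find_job("jid-900")  # stops at the first match
missing = queue.find_job("nope")   # scans every entry, returns nil
```

A lookup that misses has to inspect the whole queue, which is why a scan like this is best kept for debugging or one-off administrative tasks.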
#latency ⇒ Float
Calculates this queue’s latency: the number of seconds since the oldest job in the queue was enqueued. Returns 0.0 if the queue is empty or the job has no enqueued_at timestamp.
    # File 'lib/sidekiq/api.rb', line 273
    def latency
      entry = Sidekiq.redis { |conn|
        conn.lindex(@rname, -1)
      }
      return 0.0 unless entry

      job = Sidekiq.load_json(entry)
      enqueued_at = job["enqueued_at"]
      if enqueued_at
        if enqueued_at.is_a?(Float)
          # old format
          now = Time.now.to_f
          now - enqueued_at
        else
          now = ::Process.clock_gettime(::Process::CLOCK_REALTIME, :millisecond)
          (now - enqueued_at) / 1000.0
        end
      else
        0.0
      end
    end
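The listing handles two timestamp formats: a Float holding epoch seconds (the old format) and a non-Float value holding epoch milliseconds. The sketch below isolates that branch so it can be tried without Redis. `latency_seconds` is a hypothetical helper, not part of Sidekiq, and it takes the current time in milliseconds as an argument (an assumption made here for determinism) instead of reading the clock.

```ruby
# Hypothetical helper mirroring the timestamp branches in #latency.
# `now_ms` is passed in so the result does not depend on the wall clock.
def latency_seconds(job, now_ms)
  enqueued_at = job["enqueued_at"]
  return 0.0 unless enqueued_at

  if enqueued_at.is_a?(Float)
    # Old format: epoch seconds stored as a Float.
    now_ms / 1000.0 - enqueued_at
  else
    # Newer format: epoch milliseconds stored as an Integer.
    (now_ms - enqueued_at) / 1000.0
  end
end

now_ms = 1_700_000_010_000
old_style = latency_seconds({"enqueued_at" => 1_700_000_000.0}, now_ms)
new_style = latency_seconds({"enqueued_at" => 1_700_000_000_000}, now_ms)
no_stamp  = latency_seconds({}, now_ms)
```

Both timestamped jobs here were enqueued ten seconds before `now_ms`, so both formats should yield the same latency, and the job without a timestamp falls back to 0.0.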
#paused? ⇒ Boolean
Returns whether the queue is currently paused. The implementation shown here always returns false.
    # File 'lib/sidekiq/api.rb', line 264
    def paused?
      false
    end
#size ⇒ Integer
The current size of the queue within Redis. This value is real-time and can change between calls.
    # File 'lib/sidekiq/api.rb', line 259
    def size
      Sidekiq.redis { |con| con.llen(@rname) }
    end