Module: Resque::Scheduler::DelayingExtensions

Included in:
Extension
Defined in:
lib/resque/scheduler/delaying_extensions.rb

Instance Method Summary collapse

Instance Method Details

#count_all_scheduled_jobsObject


284
285
286
287
288
289
290
# File 'lib/resque/scheduler/delaying_extensions.rb', line 284

def count_all_scheduled_jobs
  total_jobs = 0
  Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |ts|
    total_jobs += redis.llen("delayed:#{ts}").to_i
  end
  total_jobs
end

#delayed?(klass, *args) ⇒ Boolean

Discover if a job has been delayed. Examples

Resque.delayed?(MyJob)
Resque.delayed?(MyJob, id: 1)

Returns true if the job has been delayed

Returns:

  • (Boolean)

297
298
299
# File 'lib/resque/scheduler/delaying_extensions.rb', line 297

def delayed?(klass, *args)
  !scheduled_at(klass, *args).empty?
end

#delayed_push(timestamp, item) ⇒ Object

Used internally to stuff the item into the schedule sorted list. timestamp can be either in seconds or a datetime object Insertion if O(log(n)). Returns true if it's the first job to be scheduled at that time, else false


64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/resque/scheduler/delaying_extensions.rb', line 64

def delayed_push(timestamp, item)
  # First add this item to the list for this timestamp
  redis.rpush("delayed:#{timestamp.to_i}", encode(item))

  # Store the timestamps at with this item occurs
  redis.sadd("timestamps:#{encode(item)}", "delayed:#{timestamp.to_i}")

  # Now, add this timestamp to the zsets.  The score and the value are
  # the same since we'll be querying by timestamp, and we don't have
  # anything else to store.
  redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i
end

#delayed_queue_peek(start, count) ⇒ Object

Returns an array of timestamps based on start and count


78
79
80
81
82
# File 'lib/resque/scheduler/delaying_extensions.rb', line 78

def delayed_queue_peek(start, count)
  result = redis.zrange(:delayed_queue_schedule, start,
                        start + count - 1)
  Array(result).map(&:to_i)
end

#delayed_queue_schedule_sizeObject

Returns the size of the delayed queue schedule


85
86
87
# File 'lib/resque/scheduler/delaying_extensions.rb', line 85

def delayed_queue_schedule_size
  redis.zcard :delayed_queue_schedule
end

#delayed_timestamp_peek(timestamp, start, count) ⇒ Object

Returns an array of delayed items for the given timestamp


96
97
98
99
100
101
102
103
# File 'lib/resque/scheduler/delaying_extensions.rb', line 96

def delayed_timestamp_peek(timestamp, start, count)
  if 1 == count
    r = list_range "delayed:#{timestamp.to_i}", start, count
    r.nil? ? [] : [r]
  else
    list_range "delayed:#{timestamp.to_i}", start, count
  end
end

#delayed_timestamp_size(timestamp) ⇒ Object

Returns the number of jobs for a given timestamp in the delayed queue schedule


91
92
93
# File 'lib/resque/scheduler/delaying_extensions.rb', line 91

def delayed_timestamp_size(timestamp)
  redis.llen("delayed:#{timestamp.to_i}").to_i
end

#enqueue_at(timestamp, klass, *args) ⇒ Object

This method is nearly identical to enqueue only it also takes a timestamp which will be used to schedule the job for queueing. Until timestamp is in the past, the job will sit in the schedule list.


13
14
15
16
17
18
# File 'lib/resque/scheduler/delaying_extensions.rb', line 13

def enqueue_at(timestamp, klass, *args)
  validate(klass)
  enqueue_at_with_queue(
    queue_from_class(klass), timestamp, klass, *args
  )
end

#enqueue_at_with_queue(queue, timestamp, klass, *args) ⇒ Object

Identical to enqueue_at, except you can also specify a queue in which the job will be placed after the timestamp has passed. It respects Resque.inline option, by creating the job right away instead of adding to the queue.


24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/resque/scheduler/delaying_extensions.rb', line 24

def enqueue_at_with_queue(queue, timestamp, klass, *args)
  return false unless plugin.run_before_schedule_hooks(klass, *args)

  if Resque.inline? || timestamp.to_i < Time.now.to_i
    # Just create the job and let resque perform it right away with
    # inline.  If the class is a custom job class, call self#scheduled
    # on it. This allows you to do things like
    # Resque.enqueue_at(timestamp, CustomJobClass, :opt1 => val1).
    # Otherwise, pass off to Resque.
    if klass.respond_to?(:scheduled)
      klass.scheduled(queue, klass.to_s, *args)
    else
      Resque::Job.create(queue, klass, *args)
    end
  else
    delayed_push(timestamp, job_to_hash_with_queue(queue, klass, args))
  end

  plugin.run_after_schedule_hooks(klass, *args)
end

#enqueue_delayed(klass, *args) ⇒ Object

Given an encoded item, enqueue it now


147
148
149
150
151
152
# File 'lib/resque/scheduler/delaying_extensions.rb', line 147

def enqueue_delayed(klass, *args)
  hash = job_to_hash(klass, args)
  remove_delayed(klass, *args).times do
    Resque::Scheduler.enqueue_from_config(hash)
  end
end

#enqueue_delayed_selection(klass = nil) ⇒ Object

Given a block, enqueue jobs now that return true from a block

This allows for enqueuing of delayed jobs that have arguments matching certain criteria


230
231
232
233
234
235
236
237
238
239
240
241
# File 'lib/resque/scheduler/delaying_extensions.rb', line 230

def enqueue_delayed_selection(klass = nil)
  fail ArgumentError, 'Please supply a block' unless block_given?

  found_jobs = find_delayed_selection(klass) do |payload|
    yield(payload['args'])
  end
  found_jobs.reduce(0) do |sum, encoded_job|
    decoded_job = decode(encoded_job)
    klass = Util.constantize(decoded_job['class'])
    sum + enqueue_delayed(klass, *decoded_job['args'])
  end
end

#enqueue_in(number_of_seconds_from_now, klass, *args) ⇒ Object

Identical to enqueue_at but takes number_of_seconds_from_now instead of a timestamp.


47
48
49
# File 'lib/resque/scheduler/delaying_extensions.rb', line 47

def enqueue_in(number_of_seconds_from_now, klass, *args)
  enqueue_at(Time.now + number_of_seconds_from_now, klass, *args)
end

#enqueue_in_with_queue(queue, number_of_seconds_from_now, klass, *args) ⇒ Object

Identical to enqueue_in, except you can also specify a queue in which the job will be placed after the number of seconds has passed.


54
55
56
57
58
# File 'lib/resque/scheduler/delaying_extensions.rb', line 54

def enqueue_in_with_queue(queue, number_of_seconds_from_now,
                          klass, *args)
  enqueue_at_with_queue(queue, Time.now + number_of_seconds_from_now,
                        klass, *args)
end

#find_delayed_selection(klass = nil, &block) ⇒ Object

Given a block, find jobs that return true from a block

This allows for finding of delayed jobs that have arguments matching certain criteria


247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# File 'lib/resque/scheduler/delaying_extensions.rb', line 247

def find_delayed_selection(klass = nil, &block)
  fail ArgumentError, 'Please supply a block' unless block_given?

  found_jobs = []
  start = nil
  while start = search_first_delayed_timestamp_in_range(start, nil)
    job = "delayed:#{start}"
    start += 1
    index = Resque.redis.llen(job) - 1
    while index >= 0
      payload = Resque.redis.lindex(job, index)
      decoded_payload = decode(payload)
      if payload_matches_selection?(decoded_payload, klass, &block)
        found_jobs.push(payload)
      end
      index -= 1
    end
  end
  found_jobs
end

#get_last_enqueued_at(job_name) ⇒ Object


313
314
315
# File 'lib/resque/scheduler/delaying_extensions.rb', line 313

def get_last_enqueued_at(job_name)
  redis.hget('delayed:last_enqueued_at', job_name)
end

#last_enqueued_at(job_name, date) ⇒ Object


309
310
311
# File 'lib/resque/scheduler/delaying_extensions.rb', line 309

def last_enqueued_at(job_name, date)
  redis.hset('delayed:last_enqueued_at', job_name, date)
end

#next_delayed_timestamp(at_time = nil) ⇒ Object

Returns the next delayed queue timestamp (don't call directly)


107
108
109
# File 'lib/resque/scheduler/delaying_extensions.rb', line 107

def next_delayed_timestamp(at_time = nil)
  search_first_delayed_timestamp_in_range(nil, at_time || Time.now)
end

#next_item_for_timestamp(timestamp) ⇒ Object

Returns the next item to be processed for a given timestamp, nil if done. (don't call directly) timestamp can either be in seconds or a datetime


114
115
116
117
118
119
120
121
122
123
124
# File 'lib/resque/scheduler/delaying_extensions.rb', line 114

def next_item_for_timestamp(timestamp)
  key = "delayed:#{timestamp.to_i}"

  encoded_item = redis.lpop(key)
  redis.srem("timestamps:#{encoded_item}", key)
  item = decode(encoded_item)

  # If the list is empty, remove it.
  clean_up_timestamp(key, timestamp)
  item
end

#remove_delayed(klass, *args) ⇒ Object

Given an encoded item, remove it from the delayed_queue


141
142
143
144
# File 'lib/resque/scheduler/delaying_extensions.rb', line 141

def remove_delayed(klass, *args)
  search = encode(job_to_hash(klass, args))
  remove_delayed_job(search)
end

#remove_delayed_job_from_timestamp(timestamp, klass, *args) ⇒ Object

Given a timestamp and job (klass + args) it removes all instances and returns the count of jobs removed.

O(N) where N is the number of jobs scheduled to fire at the given timestamp


273
274
275
276
277
278
279
280
281
282
# File 'lib/resque/scheduler/delaying_extensions.rb', line 273

def remove_delayed_job_from_timestamp(timestamp, klass, *args)
  key = "delayed:#{timestamp.to_i}"
  encoded_job = encode(job_to_hash(klass, args))

  redis.srem("timestamps:#{encoded_job}", key)
  count = redis.lrem(key, 0, encoded_job)
  clean_up_timestamp(key, timestamp)

  count
end

#remove_delayed_selection(klass = nil) ⇒ Object

Given a block, remove jobs that return true from a block

This allows for removal of delayed jobs that have arguments matching certain criteria

Give you only the arguments passed to the jobs at its creation :

For example, with the following job:

  Resque.enqueue_at(
    5.days.from_now,
    SendFollowUpEmail,
    :account_id => 0,
    :user_id => 1
  )

It gives you this as parameter for your block:

  [{"account_id": 0, "user_id": 1}]

173
174
175
176
177
178
179
180
181
# File 'lib/resque/scheduler/delaying_extensions.rb', line 173

def remove_delayed_selection(klass = nil)
  fail ArgumentError, 'Please supply a block' unless block_given?

  abstract_remove_delayed_selection(
    find_delayed_selection(klass) do |payload|
      yield(payload['args'])
    end
  )
end

#remove_delayed_selection_with_all_job_infosObject

Given a block, remove jobs that return true from a block

This allows for removal of delayed jobs matching certain criteria

Give you all the infos of the job.

For example, with the following job:

  Resque.enqueue_at(
    5.days.from_now,
    SendFollowUpEmail,
    :account_id => 0,
    :user_id => 1
  )

It gives you this as parameter for your block:

  {
    "class": "SendFollowUpEmail",
    "args": [{"account_id": 1, "user_id": 1}],
    "queue": "queue_name"
  }

Usefull if, in your passed block, you want to match by class and/or queue (not only args) !

For example:

Resque.remove_delayed_selection_with_all_job_infos { |job|
  [SendFollowUpEmail, SendFollowUpSms].any? { |klass|
    klass.to_s == job["class"]
  } && job["args"][0]['account_id'] == current_account.id]
}

216
217
218
219
220
221
222
223
224
# File 'lib/resque/scheduler/delaying_extensions.rb', line 216

def remove_delayed_selection_with_all_job_infos
  fail ArgumentError, 'Please supply a block' unless block_given?

  abstract_remove_delayed_selection(
    find_delayed_selection do |payload|
      yield(payload)
    end
  )
end

#reset_delayed_queueObject

Clears all jobs created with enqueue_at or enqueue_in


127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/resque/scheduler/delaying_extensions.rb', line 127

def reset_delayed_queue
  Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |item|
    key = "delayed:#{item}"
    items = redis.lrange(key, 0, -1)
    redis.pipelined do
      items.each { |ts_item| redis.del("timestamps:#{ts_item}") }
    end
    redis.del key
  end

  redis.del :delayed_queue_schedule
end

#scheduled_at(klass, *args) ⇒ Object

Returns delayed jobs schedule timestamp for klass, args.


302
303
304
305
306
307
# File 'lib/resque/scheduler/delaying_extensions.rb', line 302

def scheduled_at(klass, *args)
  search = encode(job_to_hash(klass, args))
  redis.smembers("timestamps:#{search}").map do |key|
    key.tr('delayed:', '').to_i
  end
end