Class: Que::Scheduler::ActiveJobType

Inherits:
ToEnqueue
  • Object
show all
Defined in:
lib/que/scheduler/to_enqueue.rb

Overview

For jobs of type ActiveJob

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from ToEnqueue

active_job_defined?, active_job_version, active_job_version_supports_queues?, create, valid_job_class?

Class Method Details

.extract_scheduled_at(scheduled_at) ⇒ Object

ActiveJob scheduled_at is returned as a float, or a string post Rails 7.1, and we want a Time for consistency



138
139
140
141
142
143
144
145
146
147
148
# File 'lib/que/scheduler/to_enqueue.rb', line 138

def extract_scheduled_at(scheduled_at)
  # rubocop:disable Style/EmptyElse
  if scheduled_at.is_a?(Float)
    Que::Scheduler::TimeZone.time_zone.at(scheduled_at)
  elsif scheduled_at.is_a?(String)
    Que::Scheduler::TimeZone.time_zone.parse(scheduled_at)
  else
    nil
  end
  # rubocop:enable Style/EmptyElse
end

Instance Method Details

#calculate_enqueued_values(job) ⇒ Object



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/que/scheduler/to_enqueue.rb', line 94

def calculate_enqueued_values(job)
  # Now read the just inserted job back out of the DB to get the actual values that will
  # be used when the job is worked.
  data = JSON.parse(job.to_json, symbolize_names: true)

  scheduled_at = self.class.extract_scheduled_at(data[:scheduled_at])

  # Rails didn't support queues for ActiveJob for a while
  used_queue = data[:queue_name] if ToEnqueue.active_job_version_supports_queues?

  # We can't get the priority out of the DB, as the returned `job` doesn't give us access
  # to the underlying ActiveJob that was scheduled. We have no option but to assume
  # it was what we told it to use. If no priority was specified, we must assume it was
  # the Que default, which is 100 t.ly/1jRK5
  assume_used_priority = priority.nil? ? 100 : priority

  {
    args: data.fetch(:arguments),
    queue: used_queue,
    priority: assume_used_priority,
    run_at: scheduled_at,
    job_class: job_class.to_s,
    job_id: data.fetch(:provider_job_id),
  }
end

#enqueueObject



85
86
87
88
89
90
91
92
# File 'lib/que/scheduler/to_enqueue.rb', line 85

def enqueue
  job = enqueue_active_job

  return nil if job.nil? || !job # nil in Rails < 6.1, false after.

  enqueued_values = calculate_enqueued_values(job)
  EnqueuedJobType.new(enqueued_values)
end

#enqueue_active_jobObject



120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/que/scheduler/to_enqueue.rb', line 120

def enqueue_active_job
  job_settings = {
    priority: priority,
    wait_until: run_at,
    queue: queue || Que::Scheduler::VersionSupport.default_scheduler_queue,
  }.compact

  job_class_set = job_class.set(**job_settings)
  if args.is_a?(Hash)
    job_class_set.perform_later(**args)
  else
    job_class_set.perform_later(*args)
  end
end