Class: Que::Scheduler::ActiveJobType
- Defined in:
- lib/que/scheduler/to_enqueue.rb
Overview
For jobs of type ActiveJob
Class Method Summary collapse
-
.extract_scheduled_at(scheduled_at) ⇒ Object
ActiveJob scheduled_at is returned as a float, or a string post Rails 7.1, and we want a Time for consistency.
Instance Method Summary collapse
Methods inherited from ToEnqueue
active_job_defined?, active_job_version, active_job_version_supports_queues?, create, valid_job_class?
Class Method Details
.extract_scheduled_at(scheduled_at) ⇒ Object
ActiveJob scheduled_at is returned as a float, or a string post Rails 7.1, and we want a Time for consistency
138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/que/scheduler/to_enqueue.rb', line 138 def extract_scheduled_at(scheduled_at) # rubocop:disable Style/EmptyElse if scheduled_at.is_a?(Float) Que::Scheduler::TimeZone.time_zone.at(scheduled_at) elsif scheduled_at.is_a?(String) Que::Scheduler::TimeZone.time_zone.parse(scheduled_at) else nil end # rubocop:enable Style/EmptyElse end |
Instance Method Details
#calculate_enqueued_values(job) ⇒ Object
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/que/scheduler/to_enqueue.rb', line 94 def calculate_enqueued_values(job) # Now read the just inserted job back out of the DB to get the actual values that will # be used when the job is worked. data = JSON.parse(job.to_json, symbolize_names: true) scheduled_at = self.class.extract_scheduled_at(data[:scheduled_at]) # Rails didn't support queues for ActiveJob for a while used_queue = data[:queue_name] if ToEnqueue.active_job_version_supports_queues? # We can't get the priority out of the DB, as the returned `job` doesn't give us access # to the underlying ActiveJob that was scheduled. We have no option but to assume # it was what we told it to use. If no priority was specified, we must assume it was # the Que default, which is 100 t.ly/1jRK5 assume_used_priority = priority.nil? ? 100 : priority { args: data.fetch(:arguments), queue: used_queue, priority: assume_used_priority, run_at: scheduled_at, job_class: job_class.to_s, job_id: data.fetch(:provider_job_id), } end |
#enqueue ⇒ Object
85 86 87 88 89 90 91 92 |
# File 'lib/que/scheduler/to_enqueue.rb', line 85 def enqueue job = enqueue_active_job return nil if job.nil? || !job # nil in Rails < 6.1, false after. enqueued_values = calculate_enqueued_values(job) EnqueuedJobType.new(enqueued_values) end |
#enqueue_active_job ⇒ Object
120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/que/scheduler/to_enqueue.rb', line 120 def enqueue_active_job job_settings = { priority: priority, wait_until: run_at, queue: queue || Que::Scheduler::VersionSupport.default_scheduler_queue, }.compact job_class_set = job_class.set(**job_settings) if args.is_a?(Hash) job_class_set.perform_later(**args) else job_class_set.perform_later(*args) end end |