Module: Bj::Runner::Instance_Methods

Defined in:
lib/bj/runner.rb

Instance Method Summary (collapse)

Instance Method Details

- (Object) archive_jobs



322
323
324
325
326
327
328
329
330
331
332
333
334
335
# File 'lib/bj/runner.rb', line 322

def archive_jobs
  Bj.transaction do
    now = Time.now
    too_old = now - Bj.ttl
    jobs = Bj::Table::Job.find :all,
                               :conditions => ["(state = 'finished' or state = 'dead') and submitted_at < ?", too_old]
    jobs.each do |job|
      Bj.logger.info{ "#{ job.title } - archived" }
      hash = job.to_hash.update(:archived_at => now)
      Bj::Table::JobArchive.create! hash 
      job.destroy
    end
  end
end

- (Object) fill_morgue



300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
# File 'lib/bj/runner.rb', line 300

def fill_morgue
  Bj.transaction do
    now = Time.now
    jobs = Bj::Table::Job.find :all,
                               :conditions => ["state = 'running' and runner = ?", Bj.hostname]
    jobs.each do |job|
      if job.is_restartable?
        Bj.logger.info{ "#{ job.title } - found dead and bloated but resubmitted" }
        %w[ runner pid started_at finished_at stdout stderr exit_status ].each do |column|
          job[column] = nil
        end
        job.state = 'pending'
      else
        Bj.logger.info{ "#{ job.title } - found dead and bloated" }
        job.state = 'dead'
        job.finished_at = now
      end
      job.save!
    end
  end
end

- (Instance_Methods) initialize(options = {}, &block)

Returns a new instance of Instance_Methods

Returns:



150
151
152
153
# File 'lib/bj/runner.rb', line 150

def initialize options = {}, &block
  options.to_options!
  @options, @block = options, block
end

- (Object) install_signal_handlers



269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
# File 'lib/bj/runner.rb', line 269

def install_signal_handlers
  Runner.hup_signaled false
  hup_handler = nil
  hup_handler =
    trap Runner.hup_signal do |*a|
      begin
        Runner.hup_signaled true
      rescue Exception => e
        Bj.logger.error{ e } rescue nil
      end
      hup_handler.call *a rescue nil
    end

  Runner.kill_signaled false
  kill_handler = nil
  kill_handler =
    trap Runner.kill_signal do |*a|
      begin
        Runner.kill_signaled true
      rescue Exception => e
        Bj.logger.error{ e } rescue nil
      end
      kill_handler.call *a rescue nil
    end

  begin
    trap("INT"){ exit }
  rescue Exception
  end
end

- (Object) key



363
364
365
# File 'lib/bj/runner.rb', line 363

def key
  @key ||= ( options[:ppid] ? Runner.key(options[:ppid]) : Runner.key )
end

- (Object) ping_parent



257
258
259
260
261
262
263
264
265
266
267
# File 'lib/bj/runner.rb', line 257

def ping_parent
  ppid = options[:ppid]
  return unless ppid 
  begin
    Process.kill 0, Integer(ppid)
  rescue Errno::ESRCH
    Kernel.exit 42
  rescue Exception
    42
  end
end

- (Object) register



337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
# File 'lib/bj/runner.rb', line 337

def register
  Bj.transaction do
    pid = Bj.config[key]
    return false if Util.alive?(pid)
    Bj.config[key] = Process.pid
    unless Bj.util.ipc_signals_supported? # not winblows
      require "drb"
      DRb.start_service "druby://localhost:0", Process
      Bj.config["#{ Process.pid }.uri"] = DRb.uri 
    end
  end
  at_exit{ unregister }
  true
rescue Exception
  false
end

- (Object) run



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
# File 'lib/bj/runner.rb', line 155

def run
  wait = options[:wait] || 42
  limit = options[:limit]
  forever = options[:forever]

  limit = false if forever
  wait = Integer wait
  loopno = 0

  Runner.thread = Thread.current
  Bj.chroot

  register or exit!(EXIT::WARNING)

  Bj.logger.info{ "STARTED" }
  at_exit{ Bj.logger.info{ "STOPPED" } }

  fill_morgue
  install_signal_handlers

  loop do
    ping_parent

    loopno += 1
    break if(limit and loopno > limit)

    archive_jobs

    catch :no_jobs do
      loop do
        job = thread = stdout = stderr = nil

        Bj.transaction(options) do
          now = Time.now

          job = Bj::Table::Job.find :first,
                                    :conditions => ["state = ? and submitted_at <= ?", "pending", now],
                                    :order => "priority DESC, submitted_at ASC", 
                                    :limit => 1,
                                    :lock => true
          throw :no_jobs unless job

          job.state = "running"
          job.save!
          job.reload

          Bj.logger.info{ "#{ job.title } - starting..." }

          command = job.command
          env = job.env ? YAML.load(job.env) : {}
          stdin = job.stdin || ''
          stdout = job.stdout || ''
          stderr = job.stderr || ''

          started_at = Time.now

          thread = Util.start command, :cwd=>Bj.rails_root, :env=>env, :stdin=>stdin, :stdout=>stdout, :stderr=>stderr

          job.state = "running"
          job.runner = Bj.hostname
          job.pid = thread.pid
          job.started_at = started_at 
          job.save!
          job.reload
        end

        job.run!

=begin
        exit_status = thread.value
        finished_at = Time.now

        Bj.transaction(options) do
          job = Bj::Table::Job.find job.id 
          break unless job
          job.state = "finished"
          job.finished_at = finished_at 
          job.stdout = stdout
          job.stderr = stderr
          job.exit_status = exit_status
          job.save!
          job.reload
          Bj.logger.info{ "#{ job.title } - exit_status=#{ job.exit_status }" }
        end
=end

          Bj.logger.info{ "#{ job.title } - exit_status=#{ job.exit_status }" }
      end
    end

    Runner.hup_signaled false
    wait.times do
      break if Runner.hup_signaled?
      break if Runner.kill_signaled?
      sleep 1
    end 

    break unless(limit or limit == false)
    break if Runner.kill_signaled?
  end
end

- (Object) unregister



354
355
356
357
358
359
360
361
# File 'lib/bj/runner.rb', line 354

def unregister
  Bj.transaction do
    Bj.config.delete key
  end
  true
rescue Exception
  false
end