Class: MiGA::Daemon

Inherits:
MiGA
  • Object
show all
Defined in:
lib/miga/daemon.rb

Overview

MiGA Daemons handling job submissions.

Constant Summary

Constants included from MiGA

CITATION, VERSION, VERSION_DATE, VERSION_NAME

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from MiGA

CITATION, DEBUG, DEBUG_OFF, DEBUG_ON, DEBUG_TRACE_OFF, DEBUG_TRACE_ON, FULL_VERSION, LONG_VERSION, VERSION, VERSION_DATE, initialized?, #result_files_exist?, root_path, tabulate

Constructor Details

#initialize(project) ⇒ Daemon

Initialize an unactive daemon for the MiGA::Project project. See #daemon to wake the daemon.


35
36
37
38
39
40
41
42
43
# File 'lib/miga/daemon.rb', line 35

def initialize(project)
  @project = project
  @runopts = JSON.parse(
    File.read(File.expand_path("daemon/daemon.json", project.path)),
      {:symbolize_names=>true})
  @jobs_to_run = []
  @jobs_running = []
  @loop_i = -1
end

Instance Attribute Details

#jobs_runningObject (readonly)

Array of jobs currently running.


28
29
30
# File 'lib/miga/daemon.rb', line 28

def jobs_running
  @jobs_running
end

#jobs_to_runObject (readonly)

Array of jobs next to be executed.


26
27
28
# File 'lib/miga/daemon.rb', line 26

def jobs_to_run
  @jobs_to_run
end

#loop_iObject (readonly)

Integer indicating the current iteration.


30
31
32
# File 'lib/miga/daemon.rb', line 30

def loop_i
  @loop_i
end

#optionsObject (readonly)

Options used to setup the daemon.


24
25
26
# File 'lib/miga/daemon.rb', line 24

def options
  @options
end

#projectObject (readonly)

MiGA::Project in which the daemon is running.


22
23
24
# File 'lib/miga/daemon.rb', line 22

def project
  @project
end

Class Method Details

.last_alive(project) ⇒ Object

When was the last time a daemon for the MiGA::Project project was seen active? Returns DateTime.


15
16
17
18
19
# File 'lib/miga/daemon.rb', line 15

def self.last_alive(project)
  f = File.expand_path("daemon/alive", project.path)
  return nil unless File.size? f
  DateTime.parse(File.read(f))
end

Instance Method Details

#check_datasetsObject

Traverse datasets


122
123
124
125
126
127
128
129
130
131
132
# File 'lib/miga/daemon.rb', line 122

def check_datasets
  project.each_dataset do |n, ds|
    if ds.nil?
      say "Warning: Dataset #{n} listed but not loaded, reloading project."
      project.load
    else
      to_run = ds.next_preprocessing(true)
      queue_job(to_run, ds) unless to_run.nil?
    end
  end
end

#check_projectObject

Check if all reference datasets are pre-processed. If yes, check the project-level tasks


137
138
139
140
141
142
143
144
# File 'lib/miga/daemon.rb', line 137

def check_project
  return if project.dataset_names.empty?
  if project.done_preprocessing?(false)
    to_run = project.next_distances(true)
    to_run = project.next_inclade(true) if to_run.nil?
    queue_job(to_run) unless to_run.nil?
  end
end

#daemon(task, opts = []) ⇒ Object

Launches the task with options opts (as command-line arguments). Supported tasks include: start, stop, restart, status.


105
106
107
108
109
110
# File 'lib/miga/daemon.rb', line 105

def daemon(task, opts=[])
  options = default_options
  opts.unshift(task)
  options[:ARGV] = opts
  Daemons.run_proc("MiGA:#{project.name}", options) { loop { in_loop } }
end

#declare_aliveObject

Tell the world that you're alive


114
115
116
117
118
# File 'lib/miga/daemon.rb', line 114

def declare_alive
  f = File.open(File.expand_path("daemon/alive", project.path), "w")
  f.print Time.now.to_s
  f.close
end

#default_optionsObject

Returns Hash containing the default options for the daemon.


54
55
56
57
# File 'lib/miga/daemon.rb', line 54

def default_options
  { dir_mode: :normal, dir: File.expand_path("daemon", project.path),
    multiple: false, log_output: true }
end

#flush!Object

Remove finished jobs from the internal queue and launch as many as possible respecting #maxjobs.


192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/miga/daemon.rb', line 192

def flush!
  # Check for finished jobs
  @jobs_running.select! do |job|
    r = (job[:ds].nil? ? project : job[:ds]).add_result(job[:job], false)
    say "Completed pid:#{job[:pid]} for #{job[:task_name]}." unless r.nil?
    r.nil?
  end
  # Avoid single datasets hogging resources
  @jobs_to_run.rotate! rand(jobs_to_run.size)
  # Launch as many +jobs_to_run+ as possible
  while jobs_running.size < maxjobs
    break if jobs_to_run.empty?
    job = @jobs_to_run.shift
    if runopts(:type) == "bash"
      job[:pid] = spawn job[:cmd]
      Process.detach job[:pid]
    else
      job[:pid] = `#{job[:cmd]}`.chomp
    end
    @jobs_running << job
    say "Spawned pid:#{job[:pid]} for #{job[:task_name]}."
  end
end

#get_job(job, ds = nil) ⇒ Object

Get the taks with key symbol job in dataset ds. For project-wide tasks let ds be nil.


179
180
181
182
183
184
185
186
187
# File 'lib/miga/daemon.rb', line 179

def get_job(job, ds=nil)
  (jobs_to_run + jobs_running).find do |j|
    if ds==nil
      j[:ds].nil? and j[:job]==job
    else
      (! j[:ds].nil?) and j[:ds].name==ds.name and j[:job]==job
    end
  end
end

#in_loopObject

Run one loop step.


226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/miga/daemon.rb', line 226

def in_loop
  if loop_i == -1
    say "-----------------------------------"
    say "MiGA:#{project.name} launched."
    say "-----------------------------------"
    @loop_i = 0
  end
  @loop_i += 1
  declare_alive
  check_datasets
  check_project
  flush!
  if loop_i==12
    say "Housekeeping for sanity"
    @loop_i = 0
    purge!
    project.load
  end
  sleep(latency)
end

#last_aliveObject

When was the last time a daemon for the current project was seen active? Returns DateTime.


48
49
50
# File 'lib/miga/daemon.rb', line 48

def last_alive
  MiGA::Daemon.last_alive project
end

#latencyObject

Returns Integer indicating the number of seconds to sleep between checks.


76
# File 'lib/miga/daemon.rb', line 76

def latency() runopts(:latency) ; end

#maxjobsObject

Returns Integer indicating the maximum number of concurrent jobs to run.


80
# File 'lib/miga/daemon.rb', line 80

def maxjobs() runopts(:maxjobs) ; end

#ppnObject

Returns Integer indicating the number of CPUs per job.


84
# File 'lib/miga/daemon.rb', line 84

def ppn() runopts(:ppn) ; end

#purge!Object

Remove dead jobs.


218
219
220
221
222
# File 'lib/miga/daemon.rb', line 218

def purge!
  @jobs_running.select! do |job|
    `#{sprintf(runopts(:alive), job[:pid])}`.chomp.to_i == 1
  end
end

#queue_job(job, ds = nil) ⇒ Object

Add the task to the internal queue with symbol key job. If the task is dataset-specific, ds specifies the dataset. To submit jobs to the scheduler (or to bash) see #flush!.


150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/miga/daemon.rb', line 150

def queue_job(job, ds=nil)
  return nil unless get_job(job, ds).nil?
  ds_name = (ds.nil? ? "miga-project" : ds.name)
  say "Queueing ", ds_name, ":#{job}"
  vars = { "PROJECT"=>project.path, "RUNTYPE"=>runopts(:type),
    "CORES"=>ppn, "MIGA"=>MiGA::MiGA.root_path }
  vars["DATASET"] = ds.name unless ds.nil?
  log_dir = File.expand_path("daemon/#{job}", project.path)
  Dir.mkdir(log_dir) unless Dir.exist? log_dir
  task_name = "#{project.metadata[:name][0..9]}:#{job}:#{ds_name}"
  to_run = {ds: ds, job: job, task_name: task_name,
    cmd: sprintf(runopts(:cmd),
      # 1: script
      File.expand_path("scripts/#{job}.bash", vars["MIGA"]),
      # 2: vars
      vars.keys.map { |k|
 sprintf(runopts(:var), k, vars[k]) }.join(runopts(:varsep)),
      # 3: CPUs
      ppn,
      # 4: log file
      File.expand_path("#{ds_name}.log", log_dir),
      # 5: task name
      task_name)}
  @jobs_to_run << to_run
end

#restart(opts = []) ⇒ Object

Restarts the daemon with opts.


96
# File 'lib/miga/daemon.rb', line 96

def restart(opts=[]) daemon("restart", opts) ; end

#runopts(k, v = nil, force = false) ⇒ Object

Set/get #options, where k is the Symbol of the option and v is the value (or nil to use as getter). Skips consistency tests if force. Returns new value.


63
64
65
66
67
68
69
70
71
72
# File 'lib/miga/daemon.rb', line 63

def runopts(k, v=nil, force=false)
  k = k.to_sym
  unless v.nil?
    v = v.to_i if [:latency, :maxjobs, :ppn].include? k
    raise "Daemon's #{k} cannot be set to zero." if
      !force and v.is_a? Integer and v==0
    @runopts[k] = v
  end
  @runopts[k]
end

#say(*opts) ⇒ Object

Send a datestamped message to the log.


249
250
251
# File 'lib/miga/daemon.rb', line 249

def say(*opts)
  print "[#{Time.new.inspect}] ", *opts, "\n"
end

#start(opts = []) ⇒ Object

Initializes the daemon with opts.


88
# File 'lib/miga/daemon.rb', line 88

def start(opts=[]) daemon("start", opts) ; end

#status(opts = []) ⇒ Object

Returns the status of the daemon with opts.


100
# File 'lib/miga/daemon.rb', line 100

def status(opts=[]) daemon("status", opts) ; end

#stop(opts = []) ⇒ Object

Stops the daemon with opts.


92
# File 'lib/miga/daemon.rb', line 92

def stop(opts=[]) daemon("stop", opts) ; end