Class: Servolux::Prefork

Inherits:
Object
Defined in:
lib/servolux/prefork.rb

Overview

Synopsis

The Prefork class provides a pre-forking worker pool for executing tasks in parallel using multiple processes.

Details

A pre-forking worker pool is a technique for executing code in parallel in a UNIX environment. Each worker in the pool forks a child process and then executes user supplied code in that child process. The child process can pull jobs from a queue (beanstalkd for example) or listen on a socket for network requests.
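The general technique can be sketched with nothing but the Ruby standard library. This is a minimal illustration, not Servolux's implementation: the method name `run_prefork` is hypothetical, each child runs the block only once rather than in a loop, and it assumes a UNIX platform where `fork` is available.

```ruby
# Minimal pre-forking sketch (stdlib only, UNIX only). Not Servolux code.
def run_prefork(worker_count)
  pids = Array.new(worker_count) do |index|
    fork do
      # Child process: run the user supplied block once and report the
      # outcome through the exit status (0 = success, 1 = failure).
      status = begin
        yield index
        0
      rescue StandardError
        1
      end
      exit! status   # skip at_exit handlers inherited from the parent
    end
  end

  # Parent process: wait for every child and collect its exit status.
  pids.map { |pid| Process.wait2(pid).last.exitstatus }
end

statuses = run_prefork(3) { |index| sleep 0.05 * index }
puts statuses.inspect
```

Servolux adds the run loop, the communication pipe and the signal handling on top of this basic fork-and-wait pattern.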

The code to execute in the child processes is passed as a block to the Prefork initialize method. Each child process executes this code in a loop; that is, your code block does not need to worry about keeping itself alive. This is handled by the library.

If your code raises an exception, it will be captured by the library code and marshalled back to the parent process. This will halt the child process. The Prefork worker pool does not restart dead workers. A method is provided to iterate over workers that have errors, and it is up to the user to handle errors as they please.
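As a rough illustration of what "marshalled back to the parent" can look like, the sketch below ships an exception from a forked child to its parent over a pipe using `Marshal`. It relies only on the standard library; the method name `run_in_child` is hypothetical and the wire format Servolux actually uses is not shown here.

```ruby
# Sketch: send an exception from a forked child to its parent over a pipe.
# Stdlib only; this is not Servolux's actual protocol.
def run_in_child
  reader, writer = IO.pipe
  reader.binmode
  writer.binmode

  pid = fork do
    reader.close
    result = begin
      yield
      nil                      # nil signals "no error"
    rescue StandardError => err
      err                      # the exception object itself is the payload
    end
    writer.write Marshal.dump(result)
    writer.close
    exit! 0
  end

  writer.close
  payload = reader.read        # returns at EOF, once the child closes its end
  reader.close
  Process.wait(pid)
  Marshal.load(payload)        # nil on success, otherwise the exception
end

error = run_in_child { raise ArgumentError, 'bad job payload' }
puts "#{error.class}: #{error.message}"
```

The parent receives a real exception object and can decide whether to log it, re-raise it or start a replacement worker.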

Instead of passing a block to the initialize method, you can provide a Ruby module that defines an "execute" method. This method will be executed in the child process' run loop. When using a module, you also have the option of defining a "before_executing" method and an "after_executing" method. These methods will be called before the child starts the execute loop and after the execute loop finishes. Each method will be called exactly once. Both methods are optional.

Sending a SIGHUP to a child process will cause that child to stop and restart. The child will send a signal to the parent asking to be shut down. The parent will gracefully halt the child and then start a new child process to replace it. If you define a "hup" method in your worker module, it will be executed when SIGHUP is received by the child. Your "hup" method will be the last method executed in the signal handler.

This has the advantage of calling your before/after_executing methods again and reloading any code or resources your worker code will use. The SIGHUP will call Thread#wakeup on the main child process thread; please write your code to respond accordingly to this wakeup call (a thread waiting on a Queue#pop will not return when wakeup is called on the thread).

Examples

A pre-forking echo server: github.com/TwP/servolux/blob/master/examples/echo.rb

Pulling jobs from a beanstalkd work queue: github.com/TwP/servolux/blob/master/examples/beanstalk.rb

Before / After Executing

In this example, we are creating 42 worker processes that will log the process ID and the current time to a file. Each worker will do this every 2 seconds. The before/after_executing methods are used to open the file before the run loop starts and to close the file after the run loop completes. The execute method uses the stored file descriptor when logging the message.

module RunMe
  def before_executing
    @fd = File.open("#{Process.pid}.txt", 'w')
  end

  def after_executing
    @fd.close
  end

  def execute
    @fd.puts "Process #{Process.pid} @ #{Time.now}"
    sleep 2
  end
end

pool = Servolux::Prefork.new(:module => RunMe)
pool.start 42

Heartbeat

When a :timeout is supplied to the constructor, a "heartbeat" is set up between the parent and the child worker. Each loop through the child's execute code must return before :timeout seconds have elapsed. If one iteration through the loop takes longer than :timeout seconds, then the parent process will halt the child worker. An error will be raised in the parent process.

pool = Servolux::Prefork.new(:timeout => 2) {
  puts "Process #{Process.pid} is running."
  sleep(rand * 5)
}
pool.start 42

Eventually all 42 child processes will be killed by the parent process, because the random number generator will sooner or later cause each child to sleep longer than two seconds.

What is happening here is that each time a child process executes the block of code, the Servolux library code sends a "heartbeat" message to the parent. The parent uses a Kernel#select call on the communications pipe to wait for this message. The timeout is passed to the select call; if no heartbeat arrives in time, select returns nil, and this is the error condition the heartbeat detects.
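The select-with-timeout idea can be sketched in isolation as follows. This is an assumption-laden illustration using only the standard library: the method name `child_alive_within?` and the `'<3'` payload are placeholders, and the child sends a single beat instead of one per loop iteration.

```ruby
# Sketch of a heartbeat check: the parent waits on a pipe with a timeout.
# Stdlib only; names and payload are placeholders, not Servolux internals.
def child_alive_within?(timeout, child_delay)
  reader, writer = IO.pipe

  pid = fork do
    reader.close
    sleep child_delay          # stands in for one pass through the work loop
    writer.write '<3'
    writer.close
    exit! 0
  end
  writer.close

  # IO.select returns nil when nothing becomes readable before the timeout.
  ready = IO.select([reader], nil, nil, timeout)
  !ready.nil? && reader.read(2) == '<3'
ensure
  # A child that missed its deadline is killed, then every child is reaped.
  Process.kill('KILL', pid) if pid && ready.nil?
  Process.wait(pid) if pid
  reader.close if reader
end

puts child_alive_within?(2.0, 0.05)   # child beats well inside the timeout
puts child_alive_within?(0.2, 2.0)    # child is too slow, so select times out
```

The wide gap between the delays and the timeouts in the usage lines is deliberate; a heartbeat timeout that sits close to the real work time will misfire under load.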

Use the heartbeat with caution -- allow margins for timing issues and processor load spikes.

Signals

Forked child processes are configured to respond to two signals: SIGHUP and SIGTERM. The SIGHUP signal when sent to a child process is used to restart just that one child. The SIGTERM signal when sent to a child process is used to forcibly kill the child; it will not be restarted. The parent process uses SIGTERM to halt all the children when it is stopping.

SIGHUP: Child processes are restarted by sending a SIGHUP signal to the child. This will shut down the child worker and then start up a new one to replace it. For the child to shut down gracefully, it needs to return from the "execute" method when it receives the signal. Define a "hup" method that will wake the execute thread from any pending operations -- listening on a socket, reading a file, polling a queue, etc. When the execute method returns, the child will exit.

SIGTERM: Child processes are stopped by the prefork parent by sending a SIGTERM signal to the child. For the child to shut down gracefully, it needs to return from the "execute" method when it receives the signal. Define a "term" method that will wake the execute thread from any pending operations -- listening on a socket, reading a file, polling a queue, etc. When the execute method returns, the child will exit.
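A worker module that cooperates with both signals might look roughly like the sketch below. Only the method names "execute", "hup" and "term" come from the text above; the module name `PollingWorker`, the `@stopping` flag and the small harness that stands in for the library's run loop and signal handlers are all illustrative assumptions. A timed `sleep` is used as the blocking operation because, unlike Queue#pop, it does return when Thread#wakeup is called.

```ruby
# Hypothetical worker module. The harness at the bottom only simulates what
# a signal handler would do; it is not how Servolux is actually driven.
module PollingWorker
  def execute
    @thread = Thread.current
    sleep 30 unless @stopping  # stands in for a blocking wait on a job source
  end

  def term
    @stopping = true
    # Cut the blocking sleep short so that execute can return promptly.
    @thread.wakeup if @thread && @thread.alive?
  end
  alias_method :hup, :term
end

worker = Object.new.extend(PollingWorker)
runner = Thread.new { worker.execute; :returned }
Thread.pass until runner.status == 'sleep'   # wait until execute is blocked
worker.term
puts runner.value
```

Setting a flag before waking the thread means a later call to execute also returns immediately instead of blocking again.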

Constant Summary

CommunicationError =
Class.new(::Servolux::Error)
UnknownSignal =
Class.new(::Servolux::Error)
UnknownResponse =
Class.new(::Servolux::Error)
START =
"\000START".freeze
HALT =
"\000HALT".freeze
ERROR =
"\000SHIT".freeze
HEARTBEAT =
"\000<3".freeze

Constructor Details

- (Prefork) initialize(opts = {}, &block)

call-seq:

Prefork.new { block }
Prefork.new( :module => Module )

Create a new pre-forking worker pool. You must provide a block of code for the workers to execute in their child processes. This code block can be passed either as a block to this method or as a module via the :module option.

If a :timeout is given, then each worker will set up a "heartbeat" between the parent process and the child process. If the child does not respond to the parent within :timeout seconds, then the child process will be halted. If you do not want to use the heartbeat then leave the :timeout unset or manually set it to nil.

Additionally, :min_workers and :max_workers options are available. If :min_workers is given, the method ensure_worker_pool_size will guarantee that at least :min_workers are up and running. If :max_workers is given, then add_workers will NOT allow you to spawn more workers than :max_workers.

The pre-forking worker pool makes no effort to restart dead workers. It is left to the user to implement this functionality.
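One way a user might implement this is a periodic supervision pass built from the methods documented on this page. The helper below is a sketch under assumptions: `supervise_once` and its `on_error` callback are hypothetical names, and the `FakePool` and `FakeWorker` classes exist only so the example runs without the library installed. With a real pool you would pass the Servolux::Prefork instance instead.

```ruby
# Sketch of one supervision pass. `pool` is duck-typed: it only needs the
# `errors` and `ensure_worker_pool_size` methods documented on this page.
def supervise_once(pool, on_error)
  # Inspect failures first, because ensure_worker_pool_size prunes dead workers.
  pool.errors { |worker| on_error.call(worker.error) }
  pool.ensure_worker_pool_size
  pool
end

# Stand-ins so the sketch runs without the library (not Servolux classes).
FakeWorker = Struct.new(:error)

class FakePool
  def initialize(min, workers)
    @min = min
    @workers = workers
  end

  def errors
    @workers.each { |worker| yield worker unless worker.error.nil? }
    self
  end

  # Treats a worker with an error as dead, then refills up to the minimum.
  def ensure_worker_pool_size
    @workers = @workers.select { |worker| worker.error.nil? }
    @workers << FakeWorker.new(nil) while @workers.size < @min
  end

  def size
    @workers.size
  end
end

seen = []
pool = FakePool.new(2, [FakeWorker.new(nil), FakeWorker.new(RuntimeError.new('boom'))])
supervise_once(pool, ->(error) { seen << error.message })
puts seen.inspect
puts pool.size
```

Calling such a helper from a timer or a server run loop keeps the pool at its minimum size while still giving the application a chance to record why a worker died.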

Raises:

  • (ArgumentError)


# File 'lib/servolux/prefork.rb', line 169

def initialize( opts = {}, &block )
  @timeout = opts.fetch(:timeout, nil)
  @module = opts.fetch(:module, nil)
  @max_workers = opts.fetch(:max_workers, nil)
  @min_workers = opts.fetch(:min_workers, nil)
  @module = Module.new { define_method :execute, &block } if block
  @workers = []

  raise ArgumentError, 'No code was given to execute by the workers.' unless @module
end

Instance Attribute Details

- (Object) max_workers

Maximum number of workers.



# File 'lib/servolux/prefork.rb', line 143

def max_workers
  @max_workers
end

- (Object) min_workers

Minimum number of workers.



# File 'lib/servolux/prefork.rb', line 142

def min_workers
  @min_workers
end

- (Object) timeout

Communication timeout in seconds.



# File 'lib/servolux/prefork.rb', line 141

def timeout
  @timeout
end

Instance Method Details

- (Object) add_workers(number = 1)

call-seq:

add_workers( number = 1 )

Adds additional workers to the pool. It will not add more workers than the number set in :max_workers; if the pool is already at that limit, no workers are added.



# File 'lib/servolux/prefork.rb', line 245

def add_workers( number = 1 )
  number.times do
    break if at_max_workers?
    worker = Worker.new( self )
    worker.extend @module
    worker.start
    @workers << worker
    pause
  end
end

- (Boolean) at_max_workers?

call-seq:

at_max_workers?

Returns true if the pool is currently at or above the maximum number of workers allowed, and false otherwise. Always returns false when :max_workers is not set.



# File 'lib/servolux/prefork.rb', line 298

def at_max_workers?
  return false unless @max_workers
  return @workers.size >= @max_workers
end

- (Boolean) below_minimum_workers?

call-seq:

below_minimum_workers?

Reports whether the number of workers is below the minimum threshold. Always returns false when :min_workers is not set.



# File 'lib/servolux/prefork.rb', line 287

def below_minimum_workers?
  return false unless @min_workers
  return @workers.size < @min_workers
end

- (Object) dead_worker_count

call-seq:

dead_worker_count -> Integer

Returns the number of dead workers in the pool



# File 'lib/servolux/prefork.rb', line 339

def dead_worker_count
  worker_counts[:dead]
end

- (Object) each_worker(&block)

call-seq:

each_worker { |worker| block }

Iterates over all the workers and yields each, in turn, to the given block.



# File 'lib/servolux/prefork.rb', line 234

def each_worker( &block )
  @workers.each(&block)
  self
end

- (Object) ensure_worker_pool_size

call-seq:

ensure_worker_pool_size()

Ensures that the worker pool has at least the minimum number of workers without exceeding the maximum number of workers.

Generally, this means pruning dead workers from the pool and then spawning workers up to the :min_workers level. If :min_workers is not set, then only pruning is performed.



# File 'lib/servolux/prefork.rb', line 275

def ensure_worker_pool_size
  prune_workers
  while below_minimum_workers? do
    add_workers
  end
end

- (Object) errors

call-seq:

errors { |worker| block }

Iterates over all the workers and yields the worker to the given block only if the worker has an error condition.



# File 'lib/servolux/prefork.rb', line 309

def errors
  @workers.each { |worker| yield worker unless worker.error.nil? }
  self
end

- (Object) live_worker_count

call-seq:

live_worker_count -> Integer

Returns the number of live workers in the pool



# File 'lib/servolux/prefork.rb', line 331

def live_worker_count
  worker_counts[:alive]
end

- (Object) prune_workers

call-seq:

prune_workers()

Remove workers that are no longer alive from the worker pool



# File 'lib/servolux/prefork.rb', line 261

def prune_workers
  new_workers = @workers.find_all { |w| w.alive? }
  @workers = new_workers
end

- (Prefork) reap

This method should be called periodically in order to clear the return status from child processes that have either died or been restarted (via a HUP signal). This will remove zombie children from the process table.



# File 'lib/servolux/prefork.rb', line 209

def reap
  @workers.each { |worker| worker.alive? }
  self
end
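For readers unfamiliar with zombie processes, the general reaping technique can be sketched with the standard library. How worker.alive? is implemented is not shown on this page, so the following is only an illustration of non-blocking reaping; the method name `reaped?` is hypothetical.

```ruby
# Sketch of non-blocking reaping (stdlib only, UNIX only). Not Servolux code.
def reaped?(pid)
  # WNOHANG makes waitpid return nil immediately if the child is still running;
  # once the child has exited, the call collects its status and returns the pid.
  !Process.waitpid(pid, Process::WNOHANG).nil?
rescue Errno::ECHILD
  true   # no such child remains, so it has already been reaped
end

pid = fork { exit! 0 }
sleep 0.01 until reaped?(pid)
puts "child #{pid} reaped"
```

Until the parent collects the exit status in this way, a finished child lingers in the process table as a zombie, which is why a periodic call is recommended.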

- (Prefork) signal(signal = 'TERM') Also known as: kill

Sends the given signal to all child processes. The default signal is 'TERM'. The method waits for a short period of time after the signal is sent to each child; this is done to avoid a flood of signals being sent simultaneously and overwhelming the CPU.



# File 'lib/servolux/prefork.rb', line 222

def signal( signal = 'TERM' )
  @workers.each { |worker| worker.signal(signal); pause }
  self
end

- (Prefork) start(number)

Start up the given number of workers. Each worker will create a child process and run the user supplied code in that child process.



# File 'lib/servolux/prefork.rb', line 186

def start( number )
  @workers.clear
  add_workers( number )
  self
end

- (Object) stop

Stop all workers. The current process will wait for each child process to exit before this method will return. The worker instances are not destroyed by this method; this means that the each_worker and the errors methods will still function correctly after stopping the workers.



# File 'lib/servolux/prefork.rb', line 197

def stop
  @workers.each { |worker| worker.stop; pause }
  reap
  self
end

- (Object) worker_counts

call-seq:

worker_counts -> { :alive => 2, :dead => 1 }

Returns a hash containing the counts of alive and dead workers



# File 'lib/servolux/prefork.rb', line 318

def worker_counts
  counts = { :alive => 0, :dead => 0 }
  each_worker do |worker|
    state = worker.alive? ? :alive : :dead
    counts[state] += 1
  end
  return counts
end