Class: ZK::Threadpool

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/zk/threadpool.rb

Overview

a simple threadpool for running blocks of code off the main thread

Constant Summary

DEFAULT_SIZE =
5

Class Attribute Summary (collapse)

Instance Attribute Summary (collapse)

Instance Method Summary (collapse)

Constructor Details

- (Threadpool) initialize(size = nil)



17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/zk/threadpool.rb', line 17

def initialize(size=nil)
  @size = size || self.class.default_size

  @threadpool = []
  @threadqueue = ::Queue.new

  @mutex = Monitor.new

  @error_callbacks = []

  start!
end

Class Attribute Details

+ (Object) default_size

size of the ZK.defer threadpool (defaults to 5)



10
11
12
# File 'lib/zk/threadpool.rb', line 10

def default_size
  @default_size
end

Instance Attribute Details

- (Object) size (readonly)

the size of this threadpool



15
16
17
# File 'lib/zk/threadpool.rb', line 15

def size
  @size
end

Instance Method Details

- (Object) defer(callable = nil, &blk)

Queue an operation to be run on an internal threadpool. You may either provide an object that responds_to?(:call) or pass a block. There is no mechanism for retrieving the result of the operation, it is purely fire-and-forget, so the user is expected to make arrangements for this in their code.

Raises:

  • (ArgumentError)


36
37
38
39
40
41
42
43
44
45
# File 'lib/zk/threadpool.rb', line 36

def defer(callable=nil, &blk)
  callable ||= blk

  # XXX(slyphon): do we care if the threadpool is not running?
#       raise Exceptions::ThreadpoolIsNotRunningException unless running?
  raise ArgumentError, "Argument to Threadpool#defer must respond_to?(:call)" unless callable.respond_to?(:call)

  @threadqueue << callable
  nil
end

- (Object) on_exception(&blk)

Note:

if your exception callback block itself raises an exception, I will make fun of you.

register a block to be called back with unhandled exceptions that occur in the threadpool.



73
74
75
76
77
# File 'lib/zk/threadpool.rb', line 73

def on_exception(&blk)
  @mutex.synchronize do
    @error_callbacks << blk
  end
end

- (Boolean) on_threadpool?

returns true if the current thread is one of the threadpool threads



52
53
54
55
# File 'lib/zk/threadpool.rb', line 52

def on_threadpool?
  tp = @mutex.synchronize { @threadpool.dup }
  tp and tp.respond_to?(:include?) and tp.include?(Thread.current)
end

- (Boolean) running?



47
48
49
# File 'lib/zk/threadpool.rb', line 47

def running?
  @mutex.synchronize { @running }
end

- (Object) shutdown(timeout = 2)

join all threads in this threadpool, they will be given a maximum of +timeout+ seconds to exit before they are considered hung and will be ignored (this is an issue with threads in general: see http://blog.headius.com/2008/02/rubys-threadraise-threadkill-timeoutrb.html for more info)

the default timeout is 2 seconds per thread



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/zk/threadpool.rb', line 86

def shutdown(timeout=2)
  @mutex.synchronize do
    return unless @running
    @running = false
    @threadqueue.clear
    @size.times { @threadqueue << KILL_TOKEN }

    threads, @threadpool = @threadpool, []

    while th = threads.shift
      begin
        th.join(timeout)
      rescue Exception => e
        logger.error { "Caught exception shutting down threadpool" }
        logger.error { e.to_std_format }
      end
    end

    @threadqueue = ::Queue.new
  end

  nil
end

- (Object) start!

starts the threadpool if not already running



58
59
60
61
62
63
64
65
# File 'lib/zk/threadpool.rb', line 58

def start!
  @mutex.synchronize do
    return false if @running
    @running = true
    spawn_threadpool
  end
  true
end