Class: Celluloid::PoolManager

Inherits:
Object
  • Object
show all
Includes:
Celluloid
Defined in:
lib/celluloid/pool_manager.rb

Overview

Manages a fixed-size pool of workers. Delegates work (i.e. method calls) to the workers and supervises them. Don't use this class directly; instead, use MyKlass.pool.

Constant Summary

Constants included from Celluloid

BARE_OBJECT_WARNING_MESSAGE, DeadActorError, Error, LINKING_TIMEOUT, NotActorError, OWNER_IVAR, TIMER_QUANTUM, TimeoutError, VERSION

Instance Method Summary collapse

Methods included from Celluloid

#abort, actor?, #after, #async, boot, #call_chain_id, cores, #current_actor, #defer, detect_recursion, #every, exception_handler, #exclusive, #exclusive?, #future, included, init, #link, #linked_to?, #links, mailbox, #monitor, #monitoring?, #receive, register_shutdown, running?, shutdown, #signal, #sleep, stack_dump, start, suspend, #tasks, #terminate, #timeout, #unlink, #unmonitor, uuid, version, #wait

Constructor Details

#initialize(worker_class, options = {}) ⇒ PoolManager

Returns a new instance of PoolManager.

Raises:

  • (ArgumentError)

12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/celluloid/pool_manager.rb', line 12

# Builds the pool and eagerly creates every worker.
#
# @param worker_class [Class] class whose +new_link+ creates each worker
# @param options [Hash] optional settings
# @option options [Integer] :size number of workers; when absent the pool
#   uses Celluloid.cores (or 2 if that is nil), but never fewer than 2
# @option options [Object, Array] :args arguments handed to every worker's
#   +new_link+ (coerced with Array())
# @raise [ArgumentError] if the resolved size is below 2
def initialize(worker_class, options = {})
  requested_size = options[:size]
  @size = requested_size || [Celluloid.cores || 2, 2].max
  raise ArgumentError, "minimum pool size is 2" if @size < 2

  @worker_class = worker_class

  worker_args = options[:args]
  @args = worker_args ? Array(worker_args) : []

  # FIXME: Another data structure (e.g. Set) would be more appropriate
  # here except it causes MRI to crash :o
  @busy = []

  # Every worker starts out idle.
  @idle = @size.times.map do
    worker_class.new_link(*@args)
  end
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(method, *args, &block) ⇒ Object


160
161
162
163
164
165
166
# File 'lib/celluloid/pool_manager.rb', line 160

# Forwards any call the pool can answer (per respond_to?) to a worker via
# _send_; everything else falls through to the default method_missing.
#
# @param method [Symbol] name of the missing method
# @param args [Array] arguments of the original call
# @param block [Proc] block of the original call, if any
# @return [Object] the result of _send_
def method_missing(method, *args, &block)
  return super unless respond_to?(method)

  _send_(method, *args, &block)
end

Instance Method Details

#__crash_handler__(actor, reason) ⇒ Object

Spawn a new worker for every crashed one


129
130
131
132
133
134
135
136
# File 'lib/celluloid/pool_manager.rb', line 129

# Forgets +actor+ and, when an exit reason is given, creates a replacement
# worker and signals :respawn_complete.
#
# @param actor [Object] the worker that exited
# @param reason [Object, nil] exit reason; nil/false means no replacement
#   is spawned
# @return [Object, nil] the result of +signal+, or nil when no reason is given
def __crash_handler__(actor, reason)
  [@busy, @idle].each { |workers| workers.delete(actor) }

  if reason
    replacement = @worker_class.new_link(*@args)
    @idle.push(replacement)
    signal(:respawn_complete)
  end
end

#__provision_worker__ ⇒ Object

Provision a new worker


114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/celluloid/pool_manager.rb', line 114

# Hands out one idle worker and records it as busy.
#
# While the idle list is empty, this receives Response messages and passes
# each one to the current actor's handle_message until a worker is idle.
#
# @return [Object] the worker that was moved from the idle to the busy list
def __provision_worker__
  Task.current.guard_warnings = true

  # Wait for responses from one of the busy workers
  while @idle.empty?
    reply = exclusive do
      receive { |message| message.is_a?(Response) }
    end
    Thread.current[:celluloid_actor].handle_message(reply)
  end

  @idle.shift.tap { |checked_out| @busy << checked_out }
end

#__shutdown__ ⇒ Object


26
27
28
29
30
31
32
33
34
35
# File 'lib/celluloid/pool_manager.rb', line 26

# Asks every worker (idle and busy) to terminate, then waits for each
# termination future. Workers that are already dead are skipped, and errors
# raised while waiting are ignored.
#
# @return [Array] the termination futures that were waited on
def __shutdown__
  pending = []

  (@idle + @busy).each do |worker|
    begin
      pending << worker.future(:terminate)
    rescue DeadActorError
      # Already dead: nothing to terminate.
    end
  end

  pending.compact.each do |future|
    begin
      future.value
    rescue StandardError
      nil
    end
  end
end

#_send_(method, *args, &block) ⇒ Object


37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/celluloid/pool_manager.rb', line 37

# Runs +method+ on a worker taken from the pool and puts the worker back on
# the idle list afterwards.
#
# @param method [Symbol] name of the method to invoke on the worker
# @param args [Array] arguments forwarded to the worker
# @param block [Proc] optional block forwarded to the worker
# @return [Object] whatever the worker's +_send_+ returns
def _send_(method, *args, &block)
  # Check a worker out of the pool (moves it from @idle to @busy).
  worker = __provision_worker__

  begin
    worker._send_ method, *args, &block
  rescue DeadActorError # if we get a dead actor out of the pool
    # Wait for the :respawn_complete signal (emitted by __crash_handler__
    # after it adds a replacement worker), then retry with another worker.
    wait :respawn_complete
    worker = __provision_worker__
    retry
  rescue Exception => ex
    # Every other failure is handed to abort instead of being re-raised here.
    # NOTE(review): presumably abort reports the error to the caller without
    # crashing the pool actor itself -- confirm against Celluloid#abort.
    abort ex
  ensure
    # Only a live worker is returned to the idle list; a dead worker is not
    # touched here (__crash_handler__ deletes it from both lists).
    if worker.alive?
      @idle << worker
      @busy.delete worker

      # Broadcast that worker is done processing and
      # waiting idle
      signal :worker_idle
    end
  end
end

#busy_size ⇒ Object


105
106
107
# File 'lib/celluloid/pool_manager.rb', line 105

# Number of workers currently checked out of the pool.
#
# @return [Integer] length of the busy list
def busy_size
  @busy.size
end

#idle_size ⇒ Object


109
110
111
# File 'lib/celluloid/pool_manager.rb', line 109

# Number of workers currently waiting for work.
#
# @return [Integer] length of the idle list
def idle_size
  @idle.size
end

#inspect ⇒ Object


80
81
82
# File 'lib/celluloid/pool_manager.rb', line 80

# Delegates +inspect+ to a pooled worker, so the result describes a worker
# rather than the pool manager.
#
# @return [Object] the value returned by _send_(:inspect)
def inspect
  _send_(:inspect)
end

#is_a?(klass) ⇒ Boolean

Returns:

  • (Boolean)

64
65
66
# File 'lib/celluloid/pool_manager.rb', line 64

# Delegates the type check to a pooled worker, so the pool answers as a
# worker would.
#
# @param klass [Module] class or module to test against
# @return [Boolean] the worker's answer to is_a?(klass)
def is_a?(klass)
  _send_(:is_a?, klass)
end

#kind_of?(klass) ⇒ Boolean

Returns:

  • (Boolean)

68
69
70
# File 'lib/celluloid/pool_manager.rb', line 68

# Delegates the type check to a pooled worker, so the pool answers as a
# worker would.
#
# @param klass [Module] class or module to test against
# @return [Boolean] the worker's answer to kind_of?(klass)
def kind_of?(klass)
  _send_(:kind_of?, klass)
end

#method(meth) ⇒ Object

Since PoolManager allocates worker objects only just before calling them, we can still help Celluloid::Call detect passing invalid parameters to async methods by checking for those methods on the worker class


171
172
173
174
175
# File 'lib/celluloid/pool_manager.rb', line 171

# Looks +meth+ up on the pool manager first and, if it is not defined
# there, on the worker class.
#
# Workers are only allocated just before a call, so resolving against the
# worker class still lets callers validate calls to worker methods.
#
# @param meth [Symbol, String] method name
# @return [Method, UnboundMethod] a bound method of the pool, or an unbound
#   instance method of the worker class
# @raise [NameError] if neither defines the method
def method(meth)
  begin
    super(meth)
  rescue NameError
    method_name = meth.to_sym
    @worker_class.instance_method(method_name)
  end
end

#methods(include_ancestors = true) ⇒ Object


72
73
74
# File 'lib/celluloid/pool_manager.rb', line 72

# Delegates +methods+ to a pooled worker, so the list describes a worker's
# methods.
#
# @param include_ancestors [Boolean] forwarded unchanged to the worker
# @return [Array] the worker's answer to methods(include_ancestors)
def methods(include_ancestors = true)
  _send_(:methods, include_ancestors)
end

#name ⇒ Object


60
61
62
# File 'lib/celluloid/pool_manager.rb', line 60

# Delegates +name+ to a pooled worker, like the other proxied query methods
# (to_s, inspect, ...).
#
# The first argument of _send_ is the method name. The previous code passed
# @mailbox in that position, so the worker was asked to run a non-method and
# :name was passed along as an argument.
#
# @return [Object] the value returned by _send_(:name)
def name
  _send_ :name
end

#respond_to?(meth, include_private = false) ⇒ Boolean

Returns:

  • (Boolean)

138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/celluloid/pool_manager.rb', line 138

# Reports whether the pool (or a worker, via the lookup done by method())
# can answer +meth+.
#
# String names are normalized to Symbols before the visibility check. The
# method lists compared against hold Symbols, so a String name could never
# match and respond_to?("foo") was false even for public methods.
#
# @param meth [Symbol, String] method name
# @param include_private [Boolean] when true, any resolvable method counts;
#   otherwise only public and protected ones do
# @return [Boolean] false when the method cannot be resolved (NameError)
def respond_to?(meth, include_private = false)
  # NOTE: use method() here since this class
  # shouldn't be used directly, and method() is less
  # likely to be "reimplemented" inconsistently
  # with other Object.*method* methods.

  found = method(meth)
  return (found ? true : false) if include_private

  # Only Strings are converted, so other argument types behave as before.
  name = meth.is_a?(String) ? meth.to_sym : meth

  if found.is_a?(UnboundMethod)
    # Resolved on the worker class: check the defining module's lists.
    owner = found.owner
    owner.public_instance_methods.include?(name) ||
      owner.protected_instance_methods.include?(name)
  else
    # Resolved on the pool manager itself: check the bound receiver.
    receiver = found.receiver
    receiver.public_methods.include?(name) ||
      receiver.protected_methods.include?(name)
  end
rescue NameError
  false
end

#size ⇒ Object


84
85
86
# File 'lib/celluloid/pool_manager.rb', line 84

# Returns the pool size currently recorded in @size (set by the constructor
# and updated by size=).
def size
  @size
end

#size=(new_size) ⇒ Object


88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/celluloid/pool_manager.rb', line 88

# Resizes the pool. Negative values are treated as 0.
#
# Growing adds freshly linked workers to the idle list. Shrinking checks
# workers out one at a time (via __provision_worker__), unlinks and
# terminates each of them.
#
# @param new_size [Integer] desired number of workers
# @return [Integer] the size that was stored
def size=(new_size)
  target = [0, new_size].max
  current = size

  if target > current
    (target - current).times do
      @idle << @worker_class.new_link(*@args)
    end
  else
    (current - target).times do
      retired = __provision_worker__
      unlink(retired)
      @busy.delete(retired)
      retired.terminate
    end
  end

  @size = target
end

#to_s ⇒ Object


76
77
78
# File 'lib/celluloid/pool_manager.rb', line 76

# Delegates +to_s+ to a pooled worker, so the string describes a worker
# rather than the pool manager.
#
# @return [Object] the value returned by _send_(:to_s)
def to_s
  _send_(:to_s)
end