Class: Celluloid::InternalPool

Inherits:
Object
  • Object
show all
Defined in:
lib/celluloid/internal_pool.rb

Overview

Maintain a thread pool FOR SPEED!!

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ InternalPool

Returns a new instance of InternalPool.


8
9
10
11
12
13
14
15
16
17
18
19
20
# File 'lib/celluloid/internal_pool.rb', line 8

# Build an empty pool: no threads are tracked yet and the pool starts out
# flagged as running.
def initialize
  # Thread bookkeeping: every tracked thread, plus the subset currently idle
  @all_threads  = []
  @idle_threads = []
  @idle_size = 0
  @busy_size = 0

  # Lock taken by get/put/kill/shutdown/to_a around the state above
  @mutex = Mutex.new

  # PID of the process that created the pool
  # NOTE(review): presumably what forked? compares against — confirm there
  @pid = Process.pid

  # TODO: should really adjust this based on usage
  @max_idle = 16
  @running = true
end

Instance Attribute Details

#max_idle ⇒ Object

Returns the value of attribute max_idle


6
7
8
# File 'lib/celluloid/internal_pool.rb', line 6

# Threshold that #put compares against (idle_size + 1 >= @max_idle) when
# deciding whether a returned thread is kept idle or retired.
#
# @return [Object] the current value of @max_idle (16 after #initialize)
def max_idle
  @max_idle
end

Instance Method Details

#active? ⇒ Boolean

Returns:

  • (Boolean)

48
49
50
# File 'lib/celluloid/internal_pool.rb', line 48

# Report whether the pool's counters show any threads at all.
#
# @return [Boolean] true when busy_size and idle_size sum to more than zero
def active?
  tracked = busy_size + idle_size
  tracked > 0
end

#assert_inactive ⇒ Object


34
35
36
37
38
39
40
41
42
# File 'lib/celluloid/internal_pool.rb', line 34

# Complain when the pool still reports threads: on JRuby the problem is
# only logged as a warning, everywhere else it is raised.
#
# @raise [Error] when active? is true and JRUBY_VERSION is not defined
# @return [nil] when the pool is inactive
def assert_inactive
  if active?
    message = "Thread pool is still active"
    raise Error, message unless defined?(JRUBY_VERSION)
    Celluloid.logger.warn message
  end
end

#assert_running ⇒ Object

Raises:


30
31
32
# File 'lib/celluloid/internal_pool.rb', line 30

# Guard called by #get before handing out a thread: fails once the pool
# is no longer flagged as running.
#
# @raise [Error] when running? is falsy
# @return [nil] when the pool is running
def assert_running
  return if running?

  raise Error, "Thread pool is not running"
end

#busy_size ⇒ Object


22
23
24
# File 'lib/celluloid/internal_pool.rb', line 22

# Counter of threads handed out and not yet returned: #get increments it,
# #put decrements it, and #kill / #shutdown reset it to 0.
#
# @return [Integer] the current value of @busy_size
def busy_size
  @busy_size
end

#each ⇒ Object


52
53
54
# File 'lib/celluloid/internal_pool.rb', line 52

# Iterate over the snapshot of tracked threads returned by #to_a.
#
# @yieldparam thread [Object] one entry of the snapshot
# @return [Array] the snapshot that was iterated, when a block is given
# @return [Enumerator] an enumerator over the snapshot, when no block is given
def each
  # Without a block the bare `yield` below would raise LocalJumpError on a
  # non-empty pool; follow the usual Ruby convention and return an Enumerator.
  return to_enum(:each) unless block_given?

  to_a.each { |thread| yield thread }
end

#get(&block) ⇒ Object

Get a thread from the pool, running the given block


62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/celluloid/internal_pool.rb', line 62

# Check a thread out of the pool and queue the given block on it.
#
# An idle thread is reused when one is available; otherwise a new one is
# obtained from #create. Idle threads whose Thread#status is falsy (they
# have terminated) are skipped and dropped from @all_threads so the pool
# stops tracking them.
#
# @param block [Proc] the work pushed onto the thread's :celluloid_queue
# @raise [Error] when the pool is not running (via #assert_running)
# @return [Object] the thread the block was queued on, now marked busy
def get(&block)
  @mutex.synchronize do
    assert_running

    thread = nil
    until thread
      if @idle_threads.empty?
        candidate = create
      else
        candidate = @idle_threads.pop
        @idle_size = @idle_threads.length
      end

      if candidate.status
        thread = candidate
      else
        # Crashed thread: forget it entirely instead of leaving a dead
        # entry behind in @all_threads, then try the next candidate.
        @all_threads.delete(candidate)
      end
    end

    thread.busy = true
    @busy_size += 1
    thread[:celluloid_queue] << block
    thread
  end
end

#idle_size ⇒ Object


26
27
28
# File 'lib/celluloid/internal_pool.rb', line 26

# Cached length of the idle-thread list: #get and #put refresh it from
# @idle_threads.length, and #kill / #shutdown reset it to 0.
#
# @return [Integer] the current value of @idle_size
def idle_size
  @idle_size
end

#kill ⇒ Object


112
113
114
115
116
117
118
119
120
121
122
# File 'lib/celluloid/internal_pool.rb', line 112

# Forcefully stop the pool: run #finalize, flag the pool as not running,
# kill every tracked thread and reset all bookkeeping.
#
# @return [Integer] 0 (the value of the final counter reset)
def kill
  @mutex.synchronize do
    finalize
    @running = false

    # Drain the list from the front so a thread is removed before it is killed
    until @all_threads.empty?
      doomed = @all_threads.shift
      doomed.kill
    end
    @idle_threads.clear

    @busy_size = @idle_size = 0
  end
end

#put(thread) ⇒ Object

Return a thread to the pool


83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/celluloid/internal_pool.rb', line 83

# Hand a thread back to the pool once its work is finished.
#
# While there is room below the idle threshold, the thread is parked on the
# idle list and its thread-locals are cleaned. Otherwise it is retired: nil
# is pushed onto its :celluloid_queue and it is dropped from @all_threads.
#
# @param thread [Object] the thread being returned; marked not busy
# @return [Object] the result of #clean_thread_locals when parked, or of
#   Array#delete when retired
def put(thread)
  @mutex.synchronize do
    thread.busy = false

    if idle_size + 1 < @max_idle
      # Room left: keep the thread around for reuse
      @idle_threads << thread
      @busy_size -= 1
      @idle_size = @idle_threads.size
      clean_thread_locals(thread)
    else
      # Idle list is at its limit: signal the thread via its queue and forget it
      thread[:celluloid_queue] << nil
      @busy_size -= 1
      @all_threads.delete(thread)
    end
  end
end

#running? ⇒ Boolean

Returns:

  • (Boolean)

44
45
46
# File 'lib/celluloid/internal_pool.rb', line 44

# Whether the pool is flagged as running. The flag is set to true by
# #initialize and to false by #kill.
#
# @return [Boolean] the current value of @running
def running?
  @running
end

#shutdown ⇒ Object


99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/celluloid/internal_pool.rb', line 99

# Wind the pool down without killing threads: run #finalize, push nil onto
# every tracked thread's :celluloid_queue, then reset all bookkeeping.
#
# @return [Integer] 0 (the value of the final counter reset)
def shutdown
  @mutex.synchronize do
    finalize

    # NOTE(review): nil presumably tells each worker loop to exit — the loop
    # itself is not visible here, confirm against #create.
    @all_threads.each { |thread| thread[:celluloid_queue] << nil }

    [@all_threads, @idle_threads].each(&:clear)
    @busy_size = @idle_size = 0
  end
end

#to_a ⇒ Object


56
57
58
59
# File 'lib/celluloid/internal_pool.rb', line 56

# Snapshot of the threads the pool is tracking.
#
# @return [Array] an empty array when forked? is true; otherwise a copy of
#   @all_threads taken while holding the pool's mutex
def to_a
  if forked?
    []
  else
    @mutex.synchronize { @all_threads.dup }
  end
end