Class: Celluloid::Actor

Inherits:
Object
Extended by:
Forwardable
Defined in:
lib/celluloid/actor.rb

Overview

Actors are Celluloid's concurrency primitive. They're implemented as normal Ruby objects wrapped in threads which communicate with asynchronous messages.
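
As a rough illustration of the idea only, the sketch below wraps a plain object in a thread that drains a mailbox. `TinyActor` and its methods are hypothetical names invented for this example and are not part of Celluloid; the mailbox is a core `Queue`.

```ruby
# Hypothetical TinyActor: a sketch of the concept, not Celluloid's implementation.
class TinyActor
  def initialize(target)
    @target  = target
    @mailbox = Queue.new            # thread-safe FIFO from Ruby core
    @thread  = Thread.new { run }   # the thread that wraps the object
  end

  # Enqueue a method call and return immediately.
  def async(meth, *args)
    @mailbox << [meth, args]
    self
  end

  # Ask the loop to stop, then wait for the thread to finish.
  def terminate
    @mailbox << :terminate
    @thread.join
  end

  private

  def run
    loop do
      message = @mailbox.pop        # blocks until a message arrives
      break if message == :terminate

      meth, args = message
      @target.public_send(meth, *args)
    end
  end
end

log   = []
actor = TinyActor.new(log)
actor.async(:push, 1).async(:push, 2)
actor.terminate
```

Because the mailbox is first-in, first-out and only the actor's own thread touches `log`, the two pushes are applied in the order they were sent.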

Defined Under Namespace

Classes: Sleeper

Constructor Details

#initialize(behavior, options) ⇒ Actor

Returns a new instance of Actor.


# File 'lib/celluloid/actor.rb', line 101

def initialize(behavior, options)
  @behavior         = behavior

  @actor_system     = options.fetch(:actor_system)
  @mailbox          = options.fetch(:mailbox_class, Mailbox).new
  @mailbox.max_size = options.fetch(:mailbox_size, nil)

  @task_class   = options[:task_class] || Celluloid.task_class
  @exit_handler = method(:default_exit_handler)
  @exclusive    = options.fetch(:exclusive, false)

  @tasks     = TaskSet.new
  @links     = Links.new
  @signals   = Signals.new
  @timers    = Timers::Group.new
  @receivers = Receivers.new(@timers)
  @handlers  = Handlers.new
  @running   = false
  @name      = nil

  handle(SystemEvent) do |message|
    handle_system_event message
  end
end
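
The constructor reads its options in two different ways, and the difference matters. The sketch below uses a hypothetical options hash and a placeholder symbol `:default_task_class` to show how `Hash#fetch` and `||` behave; it does not construct a real actor.

```ruby
options = { actor_system: :my_system, exclusive: true }

# Required key: Hash#fetch raises KeyError when the key is absent.
actor_system = options.fetch(:actor_system)

# Optional keys: the second argument to fetch is the default value.
mailbox_size = options.fetch(:mailbox_size, nil)
exclusive    = options.fetch(:exclusive, false)

# `||` falls back when the value is nil or false, so it suits an object-valued
# option such as :task_class but would be wrong for a boolean flag.
task_class = options[:task_class] || :default_task_class

# Omitting the required key fails loudly.
missing_error =
  begin
    {}.fetch(:actor_system)
  rescue KeyError => e
    e.class
  end
```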

Instance Attribute Details

#behavior ⇒ Object (readonly)

Returns the value of attribute behavior


# File 'lib/celluloid/actor.rb', line 9

def behavior
  @behavior
end

#exit_handler=(value) ⇒ Object (writeonly)

Sets the attribute exit_handler

Parameters:

  • value

    the value to set the attribute exit_handler to.


# File 'lib/celluloid/actor.rb', line 10

def exit_handler=(value)
  @exit_handler = value
end

#links ⇒ Object (readonly)

Returns the value of attribute links


# File 'lib/celluloid/actor.rb', line 9

def links
  @links
end

#mailbox ⇒ Object (readonly)

Returns the value of attribute mailbox


# File 'lib/celluloid/actor.rb', line 9

def mailbox
  @mailbox
end

#name ⇒ Object (readonly)

Returns the value of attribute name


# File 'lib/celluloid/actor.rb', line 9

def name
  @name
end

#proxy ⇒ Object (readonly)

Returns the value of attribute proxy


# File 'lib/celluloid/actor.rb', line 9

def proxy
  @proxy
end

#tasks ⇒ Object (readonly)

Returns the value of attribute tasks


# File 'lib/celluloid/actor.rb', line 9

def tasks
  @tasks
end

#thread ⇒ Object (readonly)

Returns the value of attribute thread


# File 'lib/celluloid/actor.rb', line 9

def thread
  @thread
end

#timers ⇒ Object (readonly)

Returns the value of attribute timers


# File 'lib/celluloid/actor.rb', line 9

def timers
  @timers
end

Class Method Details

.all ⇒ Object

Obtain all running actors in the system


# File 'lib/celluloid/actor.rb', line 50

def all
  Celluloid.actor_system.running
end

.async(mailbox, meth, *args, &block) ⇒ Object

Invoke a method asynchronously on an actor via its mailbox


# File 'lib/celluloid/actor.rb', line 38

def async(mailbox, meth, *args, &block)
  proxy = AsyncProxy.new(mailbox, "UnknownClass")
  proxy.method_missing(meth, *args, &block)
end
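
Actor.async builds a proxy and calls `method_missing` on it directly, so the method name travels as data rather than being dispatched locally. The sketch below shows that pattern with a hypothetical `SketchAsyncProxy` backed by a core `Queue`; the real AsyncProxy and its message format may differ.

```ruby
# Hypothetical stand-in for an async proxy; Celluloid's AsyncProxy differs.
class SketchAsyncProxy
  def initialize(mailbox)
    @mailbox = mailbox
  end

  # Any unknown method becomes a message; nothing is executed here.
  def method_missing(meth, *args, &block)
    @mailbox << { method: meth, args: args, block: block }
    nil
  end

  def respond_to_missing?(_meth, _include_private = false)
    true
  end
end

mailbox = Queue.new
proxy   = SketchAsyncProxy.new(mailbox)
proxy.method_missing(:greet, "world")   # explicit call, as in Actor.async
proxy.greet("again")                    # equivalent implicit dispatch

first  = mailbox.pop
second = mailbox.pop
```

Both calls return `nil` immediately; whoever drains the mailbox decides when, and on which thread, `greet` actually runs.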

.call(mailbox, meth, *args, &block) ⇒ Object

Invoke a method on the given actor via its mailbox


# File 'lib/celluloid/actor.rb', line 32

def call(mailbox, meth, *args, &block)
  proxy = SyncProxy.new(mailbox, "UnknownClass")
  proxy.method_missing(meth, *args, &block)
end

.current ⇒ Object

Obtain the current actor

Raises:

  • (NotActorError)

# File 'lib/celluloid/actor.rb', line 18

def current
  actor = Thread.current[:celluloid_actor]
  raise NotActorError, "not in actor scope" unless actor
  actor.behavior_proxy
end
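
The lookup relies on a value stored under a key on the current thread. The following sketch reproduces that pattern with a hypothetical key `:sketch_actor` and a hypothetical error class `SketchNotActorError`, so it runs without Celluloid.

```ruby
class SketchNotActorError < StandardError; end   # hypothetical error class

def current_actor
  actor = Thread.current[:sketch_actor]
  raise SketchNotActorError, "not in actor scope" unless actor
  actor
end

# Inside a thread that registered itself, the lookup succeeds.
inside = Thread.new do
  Thread.current[:sketch_actor] = :worker
  current_actor
end.value

# The main thread never registered anything, so the lookup raises.
outside =
  begin
    current_actor
  rescue SketchNotActorError => e
    e.message
  end
```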

.future(mailbox, meth, *args, &block) ⇒ Object

Call a method asynchronously and retrieve its value later


# File 'lib/celluloid/actor.rb', line 44

def future(mailbox, meth, *args, &block)
  proxy = FutureProxy.new(mailbox, "UnknownClass")
  proxy.method_missing(meth, *args, &block)
end

.join(actor, timeout = nil) ⇒ Object

Wait for an actor to terminate


# File 'lib/celluloid/actor.rb', line 95

def join(actor, timeout = nil)
  actor.thread.join(timeout)
  actor
end
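
Note that `join` returns the actor whether or not the timeout elapsed. Assuming the actor's thread handle behaves like a core `Thread` here (an assumption; only `Thread#join` is shown), the sketch below illustrates why a caller who cares about the difference has to check the thread afterwards.

```ruby
slow = Thread.new { sleep }     # sleeps until woken or killed
fast = Thread.new { :done }

timed_out = slow.join(0.05)     # nil: the limit elapsed before the thread ended
finished  = fast.join(1)        # the thread itself: it terminated in time

still_running = slow.alive?     # the reliable way to tell after a timed join

slow.kill                       # clean up the sketch's sleeping thread
slow.join
```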

.kill(actor) ⇒ Object

Forcibly kill a given actor


# File 'lib/celluloid/actor.rb', line 89

def kill(actor)
  actor.thread.kill
  actor.mailbox.shutdown if actor.mailbox.alive?
end

.link(actor) ⇒ Object

Link to another actor


# File 'lib/celluloid/actor.rb', line 67

def link(actor)
  monitor actor
  Thread.current[:celluloid_actor].links << actor
end

.linked_to?(actor) ⇒ Boolean

Are we bidirectionally linked to the given actor?

Returns:

  • (Boolean)

# File 'lib/celluloid/actor.rb', line 84

def linked_to?(actor)
  monitoring?(actor) && Thread.current[:celluloid_actor].links.include?(actor)
end
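
The relationship between monitoring and linking can be modelled with two link collections. This sketch assumes that a linking request makes the receiver add the requester to its own links, which is what `monitoring?` checks; `LinkedNode` and the top-level helpers are hypothetical and skip the mailbox round trip entirely.

```ruby
# Hypothetical stand-in for an actor: only the links collection matters here.
class LinkedNode
  attr_reader :links

  def initialize
    @links = []
  end
end

# b records a, so b would send its exit event to a.
def monitor(a, b)
  b.links << a
end

# Monitoring plus the reverse entry makes the link bidirectional.
def link(a, b)
  monitor(a, b)
  a.links << b
end

def monitoring?(a, b)
  b.links.include?(a)
end

def linked_to?(a, b)
  monitoring?(a, b) && a.links.include?(b)
end

a = LinkedNode.new
b = LinkedNode.new
c = LinkedNode.new

link(a, b)      # both directions
monitor(a, c)   # one direction only
```

Here `a` and `b` are linked to each other, while `a` merely monitors `c`: it would hear about `c` exiting, but `c` would not hear about `a`.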

.monitor(actor) ⇒ Object

Watch for exit events from another actor

Raises:

  • (NotActorError)

# File 'lib/celluloid/actor.rb', line 55

def monitor(actor)
  raise NotActorError, "can't link outside actor context" unless Celluloid.actor?
  Thread.current[:celluloid_actor].linking_request(actor, :link)
end

.monitoring?(actor) ⇒ Boolean

Are we monitoring the given actor?

Returns:

  • (Boolean)

# File 'lib/celluloid/actor.rb', line 79

def monitoring?(actor)
  actor.links.include? Actor.current
end

.registered_name ⇒ Object

Obtain the name of the current actor

Raises:

  • (NotActorError)

# File 'lib/celluloid/actor.rb', line 25

def registered_name
  actor = Thread.current[:celluloid_actor]
  raise NotActorError, "not in actor scope" unless actor
  actor.name
end

.unlink(actor) ⇒ Object

Unlink from another actor


# File 'lib/celluloid/actor.rb', line 73

def unlink(actor)
  unmonitor actor
  Thread.current[:celluloid_actor].links.delete actor
end

.unmonitor(actor) ⇒ Object

Stop waiting for exit events from another actor

Raises:

  • (NotActorError)

# File 'lib/celluloid/actor.rb', line 61

def unmonitor(actor)
  raise NotActorError, "can't link outside actor context" unless Celluloid.actor?
  Thread.current[:celluloid_actor].linking_request(actor, :unlink)
end

Instance Method Details

#after(interval, &block) ⇒ Object

Schedule a block to run once after the given interval


# File 'lib/celluloid/actor.rb', line 235

def after(interval, &block)
  @timers.after(interval) { task(:timer, &block) }
end

#behavior_proxy ⇒ Object


# File 'lib/celluloid/actor.rb', line 137

def behavior_proxy
  @behavior.proxy
end

#cleanup(exit_event) ⇒ Object

Clean up after this actor


# File 'lib/celluloid/actor.rb', line 336

def cleanup(exit_event)
  Celluloid::Probe.actor_died(self) if $CELLULOID_MONITORING
  @mailbox.shutdown
  @links.each do |actor|
    if actor.mailbox.alive?
      actor.mailbox << exit_event
    end
  end

  tasks.to_a.each(&:terminate)
rescue => ex
  # TODO: metadata
  Logger.crash("CLEANUP CRASHED!", ex)
end

#default_exit_handler(event) ⇒ Object


# File 'lib/celluloid/actor.rb', line 313

def default_exit_handler(event)
  raise event.reason if event.reason
end

#every(interval, &block) ⇒ Object

Schedule a block to run repeatedly at the given interval


# File 'lib/celluloid/actor.rb', line 240

def every(interval, &block)
  @timers.every(interval) { task(:timer, &block) }
end

#handle(*patterns, &block) ⇒ Object


# File 'lib/celluloid/actor.rb', line 220

def handle(*patterns, &block)
  @handlers.handle(*patterns, &block)
end

#handle_crash(exception) ⇒ Object

Handle any exceptions that occur within a running actor


# File 'lib/celluloid/actor.rb', line 318

def handle_crash(exception)
  # TODO: add meta info
  Logger.crash("Actor crashed!", exception)
  shutdown ExitEvent.new(behavior_proxy, exception)
rescue => ex
  Logger.crash("ERROR HANDLER CRASHED!", ex)
end

#handle_exit_event(event) ⇒ Object

Handle exit events received by this actor


# File 'lib/celluloid/actor.rb', line 307

def handle_exit_event(event)
  @links.delete event.actor

  @exit_handler.call(event)
end

#handle_message(message) ⇒ Object

Handle standard low-priority messages


# File 'lib/celluloid/actor.rb', line 279

def handle_message(message)
  unless @handlers.handle_message(message)
    unless @receivers.handle_message(message)
      Logger.debug "Discarded message (unhandled): #{message}" if $CELLULOID_DEBUG
    end
  end
  message
end

#handle_system_event(event) ⇒ Object

Handle high-priority system event messages


# File 'lib/celluloid/actor.rb', line 289

def handle_system_event(event)
  if event.instance_of? ExitEvent
    handle_exit_event(event)
  elsif event.instance_of? LinkingRequest
    event.process(links)
  elsif event.instance_of? NamingRequest
    @name = event.name
    Celluloid::Probe.actor_named(self) if $CELLULOID_MONITORING
  elsif event.instance_of? TerminationRequest
    terminate
  elsif event.instance_of? SignalConditionRequest
    event.call
  else
    Logger.debug "Discarded message (unhandled): #{event}" if $CELLULOID_DEBUG
  end
end
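
The dispatch uses `instance_of?`, which matches only the exact class, rather than `is_a?`, which also matches subclasses. The sketch below uses hypothetical event classes to show the practical consequence: an instance of a subclass would fall through to the final branch.

```ruby
class SketchEvent; end                       # hypothetical base event
class SketchExit < SketchEvent; end
class SketchGracefulExit < SketchExit; end   # hypothetical subclass

def classify(event)
  if event.instance_of? SketchExit
    :exit
  else
    :discarded
  end
end

exact    = classify(SketchExit.new)
subclass = classify(SketchGracefulExit.new)
is_a     = SketchGracefulExit.new.is_a?(SketchExit)
```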

#linking_request(receiver, type) ⇒ Object

Perform a linking request with another actor


# File 'lib/celluloid/actor.rb', line 176

def linking_request(receiver, type)
  Celluloid.exclusive do
    receiver.mailbox << LinkingRequest.new(Actor.current, type)
    system_events = []

    Timers::Wait.for(LINKING_TIMEOUT) do |remaining|
      begin
        message = @mailbox.receive(remaining) do |msg|
          msg.is_a?(LinkingResponse) &&
          msg.actor.mailbox.address == receiver.mailbox.address &&
          msg.type == type
        end
      rescue TimeoutError
        next # IO reactor did something, no message in queue yet.
      end

      if message.instance_of? LinkingResponse
        Celluloid::Probe.actors_linked(self, receiver) if $CELLULOID_MONITORING

        # We're done!
        system_events.each { |ev| @mailbox << ev }

        return
      elsif message.is_a? SystemEvent
        # Queue up pending system events to be processed after we've successfully linked
        system_events << message
      else
        raise "Unexpected message type: #{message.class}. Expected LinkingResponse, NilClass, SystemEvent."
      end
    end

    raise TimeoutError, "linking timeout of #{LINKING_TIMEOUT} seconds exceeded"
  end
end
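
The core of this method is a stash-and-requeue pattern: messages that arrive before the awaited response are set aside and put back once it shows up. The sketch below imitates that with a core `Queue` and tagged arrays as hypothetical messages. It omits the timeout and the exclusive section, and a plain `Queue` appends re-queued items at the back, which may not match the ordering of a real Celluloid mailbox.

```ruby
mailbox = Queue.new
mailbox << [:system, :naming]
mailbox << [:system, :termination]
mailbox << [:linking_response, :ok]
mailbox << [:system, :later]

stashed = []

# Set earlier messages aside until the awaited response arrives.
loop do
  message = mailbox.pop
  break if message.first == :linking_response

  stashed << message
end

# Put the stashed messages back so normal processing still sees them.
stashed.each { |ev| mailbox << ev }

remaining = []
remaining << mailbox.pop until mailbox.empty?
```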

#receive(timeout = nil, &block) ⇒ Object

Receive an asynchronous message


# File 'lib/celluloid/actor.rb', line 225

def receive(timeout = nil, &block)
  loop do
    message = @receivers.receive(timeout, &block)
    break message unless message.is_a?(SystemEvent)

    handle_system_event(message)
  end
end
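
The method's return value comes from `break message`: `loop` evaluates to whatever is passed to `break`. A small sketch with an array standing in for the receivers (the tagged arrays are hypothetical messages) shows system events being handled along the way and the first ordinary message being returned.

```ruby
inbox   = [[:system, :ping], [:system, :pong], [:user, "hello"]]
handled = []

result = loop do
  message = inbox.shift
  break message unless message.first == :system   # value of the whole loop

  handled << message.last                          # stand-in for handle_system_event
end
```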

#run ⇒ Object

Run the actor loop


# File 'lib/celluloid/actor.rb', line 147

def run
  while @running
    begin
      @timers.wait do |interval|
        interval = 0 if interval && interval < 0

        if (message = @mailbox.check(interval))
          handle_message(message)

          break unless @running
        end
      end
    rescue MailboxShutdown
      @running = false
    end
  end

  shutdown
rescue Exception => ex
  handle_crash(ex)
  raise unless ex.is_a? StandardError
end
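
The final `rescue Exception` clause reports every failure but only swallows `StandardError` subclasses; anything else is re-raised after the crash handler has run. The sketch below isolates that pattern in a hypothetical `guarded` helper, with an array standing in for the crash handler.

```ruby
def guarded(log)
  yield
rescue Exception => ex              # deliberately broad, as in #run
  log << ex.class                   # stand-in for handle_crash
  raise unless ex.is_a? StandardError
end

log = []

# ArgumentError is a StandardError: logged, then swallowed.
guarded(log) { raise ArgumentError, "bad input" }

# NotImplementedError is a ScriptError, not a StandardError: logged, then re-raised.
reraised =
  begin
    guarded(log) { raise NotImplementedError }
    false
  rescue NotImplementedError
    true
  end
```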

#setup_thread ⇒ Object


# File 'lib/celluloid/actor.rb', line 141

def setup_thread
  Thread.current[:celluloid_actor]   = self
  Thread.current[:celluloid_mailbox] = @mailbox
end

#shutdown(exit_event = ExitEvent.new(behavior_proxy)) ⇒ Object

Handle cleaning up this actor after it exits


# File 'lib/celluloid/actor.rb', line 327

def shutdown(exit_event = ExitEvent.new(behavior_proxy))
  @behavior.shutdown
  cleanup exit_event
ensure
  Thread.current[:celluloid_actor]   = nil
  Thread.current[:celluloid_mailbox] = nil
end
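
The `ensure` clause guarantees that the thread-local references are cleared even if one of the shutdown steps raises. A minimal sketch of that guarantee, using a hypothetical key `:sketch_actor` and a deliberately failing body:

```ruby
def sketch_shutdown
  raise "cleanup failed"                  # stand-in for a failing shutdown step
ensure
  Thread.current[:sketch_actor] = nil     # always runs, error or not
end

Thread.current[:sketch_actor] = :me

error =
  begin
    sketch_shutdown
  rescue RuntimeError => e
    e.message
  end

cleared = Thread.current[:sketch_actor].nil?
```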

#signal(name, value = nil) ⇒ Object

Send a signal with the given name to all waiting methods


# File 'lib/celluloid/actor.rb', line 211

def signal(name, value = nil)
  @signals.broadcast name, value
end

#sleep(interval) ⇒ Object

Sleep for the given amount of time


# File 'lib/celluloid/actor.rb', line 273

def sleep(interval)
  sleeper = Sleeper.new(@timers, interval)
  Celluloid.suspend(:sleeping, sleeper)
end

#start ⇒ Object


# File 'lib/celluloid/actor.rb', line 126

def start
  @running = true
  @thread = ThreadHandle.new(@actor_system, :actor) do
    setup_thread
    run
  end

  @proxy = ActorProxy.new(@thread, @mailbox)
  Celluloid::Probe.actor_created(self) if $CELLULOID_MONITORING
end

#task(task_type, meta = nil) ⇒ Object

Run the given block inside a new task, wrapped in Celluloid.exclusive if the actor is exclusive


# File 'lib/celluloid/actor.rb', line 352

def task(task_type, meta = nil)
  @task_class.new(task_type, meta) {
    if @exclusive
      Celluloid.exclusive { yield }
    else
      yield
    end
  }.resume
end
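
The create-then-`resume` shape suggests a coroutine-style task. Assuming a fiber-based task class (an assumption; the configured task class may work differently), a core `Fiber` gives a feel for what resuming a task means: the block runs until it suspends, and a later `resume` continues it from that point.

```ruby
trace = []

task = Fiber.new do
  trace << :started
  Fiber.yield :suspended   # hand control back to whoever resumed us
  trace << :resumed
  :finished
end

first  = task.resume       # runs until Fiber.yield
second = task.resume       # continues to the end of the block
```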

#terminate ⇒ Object

Terminate this actor


# File 'lib/celluloid/actor.rb', line 171

def terminate
  @running = false
end

#timeout(duration) ⇒ Object


# File 'lib/celluloid/actor.rb', line 244

def timeout(duration)
  bt = caller
  task = Task.current
  timer = @timers.after(duration) do
    exception = Task::TimeoutError.new("execution expired")
    exception.set_backtrace bt
    task.resume exception
  end
  yield
ensure
  timer.cancel if timer
end

#wait(name) ⇒ Object

Wait for the given signal


# File 'lib/celluloid/actor.rb', line 216

def wait(name)
  @signals.wait name
end