Module: Celluloid

Extended by:
Celluloid
Included in:
Celluloid, IncidentReporter, Notifications::Fanout, PoolManager, Probe, SupervisionGroup
Defined in:
lib/celluloid.rb,
lib/celluloid/fsm.rb,
lib/celluloid/cell.rb,
lib/celluloid/uuid.rb,
lib/celluloid/actor.rb,
lib/celluloid/calls.rb,
lib/celluloid/links.rb,
lib/celluloid/probe.rb,
lib/celluloid/rspec.rb,
lib/celluloid/tasks.rb,
lib/celluloid/future.rb,
lib/celluloid/logger.rb,
lib/celluloid/method.rb,
lib/celluloid/thread.rb,
lib/celluloid/mailbox.rb,
lib/celluloid/signals.rb,
lib/celluloid/handlers.rb,
lib/celluloid/registry.rb,
lib/celluloid/task_set.rb,
lib/celluloid/condition.rb,
lib/celluloid/receivers.rb,
lib/celluloid/responses.rb,
lib/celluloid/call_chain.rb,
lib/celluloid/exceptions.rb,
lib/celluloid/properties.rb,
lib/celluloid/stack_dump.rb,
lib/celluloid/supervisor.rb,
lib/celluloid/cpu_counter.rb,
lib/celluloid/actor_system.rb,
lib/celluloid/pool_manager.rb,
lib/celluloid/internal_pool.rb,
lib/celluloid/notifications.rb,
lib/celluloid/system_events.rb,
lib/celluloid/thread_handle.rb,
lib/celluloid/evented_mailbox.rb,
lib/celluloid/logging/incident.rb,
lib/celluloid/tasks/task_fiber.rb,
lib/celluloid/logging/log_event.rb,
lib/celluloid/supervision_group.rb,
lib/celluloid/tasks/task_thread.rb,
lib/celluloid/proxies/cell_proxy.rb,
lib/celluloid/proxies/sync_proxy.rb,
lib/celluloid/logging/ring_buffer.rb,
lib/celluloid/proxies/actor_proxy.rb,
lib/celluloid/proxies/async_proxy.rb,
lib/celluloid/proxies/block_proxy.rb,
lib/celluloid/proxies/future_proxy.rb,
lib/celluloid/proxies/abstract_proxy.rb,
lib/celluloid/logging/incident_logger.rb,
lib/celluloid/logging/incident_reporter.rb

Defined Under Namespace

Modules: CPUCounter, ClassMethods, FSM, InstanceMethods, Logger, Notifications, Properties, UUID Classes: AbortError, AbstractProxy, Actor, ActorProxy, ActorSystem, AsyncCall, AsyncProxy, BlockCall, BlockProxy, BlockResponse, Call, CallChain, Cell, CellProxy, Condition, ConditionError, DeadTaskError, ErrorResponse, EventedMailbox, ExitEvent, FiberStackError, Future, FutureProxy, Handler, Handlers, Incident, IncidentLogger, IncidentReporter, InternalPool, LinkingRequest, LinkingResponse, Links, LogEvent, Mailbox, MailboxDead, MailboxShutdown, Method, NamingRequest, NotTaskError, PoolManager, Probe, Receiver, Receivers, Registry, Response, ResumableError, RingBuffer, SignalConditionRequest, Signals, StackDump, SuccessResponse, SupervisionGroup, Supervisor, SyncCall, SyncProxy, SystemEvent, Task, TaskFiber, TaskSet, TaskThread, TerminationRequest, Thread, ThreadHandle

Constant Summary collapse

VERSION =
'0.16.0'
LINKING_TIMEOUT =

Linking times out after 5 seconds

5
BARE_OBJECT_WARNING_MESSAGE =

Warning message added to Celluloid objects accessed outside their actors

"WARNING: BARE CELLULOID OBJECT "
OWNER_IVAR =

reference to owning actor

:@celluloid_owner
TIMER_QUANTUM =

Timer accuracy enforced by the tests (50ms)

0.05
Error =

Base class of all Celluloid errors

Class.new(StandardError)
NotActorError =

Don't do Actor-like things outside Actor scope

Class.new(Celluloid::Error)
DeadActorError =

Trying to do something to a dead actor

Class.new(Celluloid::Error)
TimeoutError =

A timeout occured before the given request could complete

Class.new(Celluloid::Error)

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Class Attribute Details

.actor_systemObject


27
28
29
30
31
32
33
# File 'lib/celluloid.rb', line 27

def actor_system
  if Thread.current.celluloid?
    Thread.current[:celluloid_actor_system] or raise Error, "actor system not running"
  else
    Thread.current[:celluloid_actor_system] || @actor_system or raise Error, "Celluloid is not yet started; use Celluloid.boot"
  end
end

.log_actor_crashesObject

Returns the value of attribute log_actor_crashes


23
24
25
# File 'lib/celluloid.rb', line 23

def log_actor_crashes
  @log_actor_crashes
end

.loggerObject

Thread-safe logger class


22
23
24
# File 'lib/celluloid.rb', line 22

def logger
  @logger
end

.shutdown_timeoutObject

How long actors have to terminate


25
26
27
# File 'lib/celluloid.rb', line 25

def shutdown_timeout
  @shutdown_timeout
end

.task_classObject

Default task type to use


24
25
26
# File 'lib/celluloid.rb', line 24

def task_class
  @task_class
end

Class Method Details

.actor?Boolean

Are we currently inside of an actor?


69
70
71
# File 'lib/celluloid.rb', line 69

def actor?
  !!Thread.current[:celluloid_actor]
end

.bootObject


123
124
125
126
# File 'lib/celluloid.rb', line 123

def boot
  init
  start
end

.coresObject Also known as: cpus, ncpus

Obtain the number of CPUs in the system


84
85
86
# File 'lib/celluloid.rb', line 84

def cores
 CPUCounter.cores
end

.detect_recursionObject

Detect if a particular call is recursing through multiple actors


97
98
99
100
101
102
103
104
105
106
# File 'lib/celluloid.rb', line 97

def detect_recursion
  actor = Thread.current[:celluloid_actor]
  return unless actor

  task = Thread.current[:celluloid_task]
  return unless task

  chain_id = CallChain.current_id
  actor.tasks.to_a.any? { |t| t != task && t.chain_id == chain_id }
end

.exception_handler(&block) ⇒ Object

Define an exception handler for actor crashes


109
110
111
# File 'lib/celluloid.rb', line 109

def exception_handler(&block)
  Logger.exception_handler(&block)
end

.included(klass) ⇒ Object


35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/celluloid.rb', line 35

def included(klass)
  klass.send :extend,  ClassMethods
  klass.send :include, InstanceMethods

  klass.send :extend, Properties

  klass.property :mailbox_class, :default => Celluloid::Mailbox
  klass.property :proxy_class,   :default => Celluloid::CellProxy
  klass.property :task_class,    :default => Celluloid.task_class
  klass.property :mailbox_size

  klass.property :exclusive_actor, :default => false
  klass.property :exclusive_methods, :multi => true
  klass.property :execute_block_on_receiver,
    :default => [:after, :every, :receive],
    :multi   => true

  klass.property :finalizer
  klass.property :exit_handler_name

  klass.send(:define_singleton_method, :trap_exit) do |*args|
    exit_handler_name(*args)
  end

  klass.send(:define_singleton_method, :exclusive) do |*args|
    if args.any?
      exclusive_methods(*exclusive_methods, *args)
    else
      exclusive_actor true
    end
  end
end

.initObject


128
129
130
# File 'lib/celluloid.rb', line 128

def init
  @actor_system = ActorSystem.new
end

.mailboxObject

Retrieve the mailbox for the current thread or lazily initialize it


74
75
76
# File 'lib/celluloid.rb', line 74

def mailbox
  Thread.current[:celluloid_mailbox] ||= Celluloid::Mailbox.new
end

.register_shutdownObject


140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/celluloid.rb', line 140

def register_shutdown
  return if defined?(@shutdown_registered) && @shutdown_registered

  # Terminate all actors at exit
  at_exit do
    if defined?(RUBY_ENGINE) && RUBY_ENGINE == "ruby" && RUBY_VERSION >= "1.9"      # workaround for MRI bug losing exit status in at_exit block
      # http://bugs.ruby-lang.org/issues/5218

      exit_status = $!.status if $!.is_a?(SystemExit)
      Celluloid.shutdown
      exit exit_status if exit_status
    else
      Celluloid.shutdown
    end
  end
  @shutdown_registered = true
end

.running?Boolean


136
137
138
# File 'lib/celluloid.rb', line 136

def running?
  actor_system && actor_system.running?
end

.shutdownObject

Shut down all running actors


159
160
161
# File 'lib/celluloid.rb', line 159

def shutdown
  actor_system.shutdown
end

.stack_dump(output = STDERR) ⇒ Object Also known as: dump

Perform a stack dump of all actors to the given output object


91
92
93
# File 'lib/celluloid.rb', line 91

def stack_dump(output = STDERR)
  actor_system.stack_dump.print(output)
end

.startObject


132
133
134
# File 'lib/celluloid.rb', line 132

def start
  actor_system.start
end

.suspend(status, waiter) ⇒ Object


113
114
115
116
117
118
119
120
121
# File 'lib/celluloid.rb', line 113

def suspend(status, waiter)
  task = Thread.current[:celluloid_task]
  if task && !Celluloid.exclusive?
    waiter.before_suspend(task) if waiter.respond_to?(:before_suspend)
    Task.suspend(status)
  else
    waiter.wait
  end
end

.uuidObject

Generate a Universally Unique Identifier


79
80
81
# File 'lib/celluloid.rb', line 79

def uuid
  UUID.generate
end

.versionObject


163
164
165
# File 'lib/celluloid.rb', line 163

def version
  VERSION
end

Instance Method Details

#abort(cause) ⇒ Object

Raise an exception in sender context, but stay running

Raises:


314
315
316
317
318
319
320
321
# File 'lib/celluloid.rb', line 314

def abort(cause)
  cause = case cause
    when String then RuntimeError.new(cause)
    when Exception then cause
    else raise TypeError, "Exception object/String expected, but #{cause.class} received"
  end
  raise AbortError.new(cause)
end

#after(interval, &block) ⇒ Object

Call a block after a given interval, returning a Celluloid::Timer object


428
429
430
# File 'lib/celluloid.rb', line 428

def after(interval, &block)
  Thread.current[:celluloid_actor].after(interval, &block)
end

#async(meth = nil, *args, &block) ⇒ Object

Handle async calls within an actor itself


447
448
449
# File 'lib/celluloid.rb', line 447

def async(meth = nil, *args, &block)
  Thread.current[:celluloid_actor].behavior_proxy.async meth, *args, &block
end

#call_chain_idObject

Obtain the UUID of the current call chain


344
345
346
# File 'lib/celluloid.rb', line 344

def call_chain_id
  CallChain.current_id
end

#current_actorObject

Obtain the current_actor


339
340
341
# File 'lib/celluloid.rb', line 339

def current_actor
  Actor.current
end

#defer(&block) ⇒ Object

Perform a blocking or computationally intensive action inside an asynchronous thread pool, allowing the sender to continue processing other messages in its mailbox in the meantime


440
441
442
443
444
# File 'lib/celluloid.rb', line 440

def defer(&block)
  # This implementation relies on the present implementation of
  # Celluloid::Future, which uses a thread from InternalPool to run the block
  Future.new(&block).value
end

#every(interval, &block) ⇒ Object

Call a block every given interval, returning a Celluloid::Timer object


433
434
435
# File 'lib/celluloid.rb', line 433

def every(interval, &block)
  Thread.current[:celluloid_actor].every(interval, &block)
end

#exclusive(&block) ⇒ Object

Run given block in an exclusive mode: all synchronous calls block the whole actor, not only current message processing.


417
418
419
# File 'lib/celluloid.rb', line 417

def exclusive(&block)
  Thread.current[:celluloid_task].exclusive(&block)
end

#exclusive?Boolean

Are we currently exclusive


422
423
424
425
# File 'lib/celluloid.rb', line 422

def exclusive?
  task = Thread.current[:celluloid_task]
  task && task.exclusive?
end

#future(meth = nil, *args, &block) ⇒ Object

Handle calls to future within an actor itself


452
453
454
# File 'lib/celluloid.rb', line 452

def future(meth = nil, *args, &block)
  Thread.current[:celluloid_actor].behavior_proxy.future meth, *args, &block
end

Link this actor to another, allowing it to crash or react to errors


369
370
371
# File 'lib/celluloid.rb', line 369

def link(actor)
  Actor.link(actor)
end

#linked_to?(actor) ⇒ Boolean

Is this actor linked to another?


384
385
386
# File 'lib/celluloid.rb', line 384

def linked_to?(actor)
  Actor.linked_to?(actor)
end

Obtain the Celluloid::Links for this actor


354
355
356
# File 'lib/celluloid.rb', line 354

def links
  Thread.current[:celluloid_actor].links
end

#monitor(actor) ⇒ Object

Watch for exit events from another actor


359
360
361
# File 'lib/celluloid.rb', line 359

def monitor(actor)
  Actor.monitor(actor)
end

#monitoring?(actor) ⇒ Boolean

Are we monitoring another actor?


379
380
381
# File 'lib/celluloid.rb', line 379

def monitoring?(actor)
  Actor.monitoring?(actor)
end

#receive(timeout = nil, &block) ⇒ Object

Receive an asynchronous message via the actor protocol


389
390
391
392
393
394
395
396
# File 'lib/celluloid.rb', line 389

def receive(timeout = nil, &block)
  actor = Thread.current[:celluloid_actor]
  if actor
    actor.receive(timeout, &block)
  else
    Celluloid.mailbox.receive(timeout, &block)
  end
end

#signal(name, value = nil) ⇒ Object

Send a signal with the given name to all waiting methods


329
330
331
# File 'lib/celluloid.rb', line 329

def signal(name, value = nil)
  Thread.current[:celluloid_actor].signal name, value
end

#sleep(interval) ⇒ Object

Sleep letting the actor continue processing messages


399
400
401
402
403
404
405
406
# File 'lib/celluloid.rb', line 399

def sleep(interval)
  actor = Thread.current[:celluloid_actor]
  if actor
    actor.sleep(interval)
  else
    Kernel.sleep interval
  end
end

#tasksObject

Obtain the running tasks for this actor


349
350
351
# File 'lib/celluloid.rb', line 349

def tasks
  Thread.current[:celluloid_actor].tasks.to_a
end

#terminateObject

Terminate this actor


324
325
326
# File 'lib/celluloid.rb', line 324

def terminate
  Thread.current[:celluloid_actor].behavior_proxy.terminate!
end

#timeout(duration) ⇒ Object

Timeout on task suspension (eg Sync calls to other actors)


409
410
411
412
413
# File 'lib/celluloid.rb', line 409

def timeout(duration)
  Thread.current[:celluloid_actor].timeout(duration) do
    yield
  end
end

Remove links to another actor


374
375
376
# File 'lib/celluloid.rb', line 374

def unlink(actor)
  Actor.unlink(actor)
end

#unmonitor(actor) ⇒ Object

Stop waiting for exit events from another actor


364
365
366
# File 'lib/celluloid.rb', line 364

def unmonitor(actor)
  Actor.unmonitor(actor)
end

#wait(name) ⇒ Object

Wait for the given signal


334
335
336
# File 'lib/celluloid.rb', line 334

def wait(name)
  Thread.current[:celluloid_actor].wait name
end