Class: Celluloid::EventedMailbox

Inherits:
Mailbox
  • Object
show all
Defined in:
lib/celluloid/evented_mailbox.rb

Overview

An alternative implementation of Celluloid::Mailbox using Reactor

Instance Attribute Summary collapse

Attributes inherited from Mailbox

#address, #max_size

Instance Method Summary collapse

Methods inherited from Mailbox

#alive?, #each, #inspect, #receive, #size, #to_a

Constructor Details

#initialize(reactor_class) ⇒ EventedMailbox

Returns a new instance of EventedMailbox.


6
7
8
9
10
# File 'lib/celluloid/evented_mailbox.rb', line 6

def initialize(reactor_class)
  super()
  # @condition won't be used in the class.
  @reactor = reactor_class.new
end

Instance Attribute Details

#reactorObject (readonly)

Returns the value of attribute reactor


4
5
6
# File 'lib/celluloid/evented_mailbox.rb', line 4

def reactor
  @reactor
end

Instance Method Details

#<<(message) ⇒ Object

Add a message to the Mailbox


13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/celluloid/evented_mailbox.rb', line 13

def <<(message)
  @mutex.lock
  begin
    if mailbox_full || @dead
      dead_letter(message)
      return
    end
    if message.is_a?(SystemEvent)
      # SystemEvents are high priority messages so they get added to the
      # head of our message queue instead of the end
      @messages.unshift message
    else
      @messages << message
    end
  ensure
    @mutex.unlock rescue nil
  end
  begin
    current_actor = Thread.current[:celluloid_actor]
    @reactor.wakeup unless current_actor && current_actor.mailbox == self
  rescue IOError
    Logger.crash "reactor crashed", $!
    dead_letter(message)
  end
  nil
end

#check(timeout = nil, &block) ⇒ Object

Receive a message from the Mailbox


41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/celluloid/evented_mailbox.rb', line 41

def check(timeout = nil, &block)
  # Get a message if it is available and process it immediately if possible:
  if message = next_message(block)
    return message
  end

  # ... otherwise, run the reactor once, either blocking or will return
  # after the given timeout:
  @reactor.run_once(timeout)

  # No message was received:
  return nil
rescue IOError
  raise MailboxShutdown, "mailbox shutdown called during receive"
end

#next_message(block) ⇒ Object

Obtain the next message from the mailbox that matches the given block


58
59
60
61
62
63
64
65
# File 'lib/celluloid/evented_mailbox.rb', line 58

def next_message(block)
  @mutex.lock
  begin
    super(&block)
  ensure
    @mutex.unlock rescue nil
  end
end

#shutdownObject

Cleanup any IO objects this Mailbox may be using


68
69
70
71
72
# File 'lib/celluloid/evented_mailbox.rb', line 68

def shutdown
  super do
    @reactor.shutdown
  end
end