Class: Celluloid::Mailbox

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/celluloid/mailbox.rb

Overview

Actors communicate with asynchronous messages. Messages are buffered in Mailboxes until Actors can act upon them.

Direct Known Subclasses

EventedMailbox

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeMailbox

Returns a new instance of Mailbox.


16
17
18
19
20
21
22
23
# File 'lib/celluloid/mailbox.rb', line 16

def initialize
  @address   = Celluloid.uuid
  @messages  = []
  @mutex     = Mutex.new
  @dead      = false
  @condition = ConditionVariable.new
  @max_size  = nil
end

Instance Attribute Details

#addressObject (readonly)

A unique address at which this mailbox can be found


13
14
15
# File 'lib/celluloid/mailbox.rb', line 13

def address
  @address
end

#max_sizeObject

Returns the value of attribute max_size


14
15
16
# File 'lib/celluloid/mailbox.rb', line 14

def max_size
  @max_size
end

Instance Method Details

#<<(message) ⇒ Object

Add a message to the Mailbox


26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/celluloid/mailbox.rb', line 26

def <<(message)
  @mutex.lock
  begin
    if mailbox_full || @dead
      dead_letter(message)
      return
    end
    if message.is_a?(SystemEvent)
      # SystemEvents are high priority messages so they get added to the
      # head of our message queue instead of the end
      @messages.unshift message
    else
      @messages << message
    end

    @condition.signal
    nil
  ensure
    @mutex.unlock rescue nil
  end
end

#alive?Boolean

Is the mailbox alive?

Returns:

  • (Boolean)

105
106
107
# File 'lib/celluloid/mailbox.rb', line 105

def alive?
  !@dead
end

#check(timeout = nil, &block) ⇒ Object

Receive a message from the Mailbox. May return nil and may return before the specified timeout.


50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/celluloid/mailbox.rb', line 50

def check(timeout = nil, &block)
  message = nil

  @mutex.lock
  begin
    raise MailboxDead, "attempted to receive from a dead mailbox" if @dead

    Timers::Wait.for(timeout) do |remaining|
      message = next_message(&block)

      break message if message

      @condition.wait(@mutex, remaining)
    end
  ensure
    @mutex.unlock rescue nil
  end
  
  return message
end

#each(&block) ⇒ Object

Iterate through the mailbox


115
116
117
# File 'lib/celluloid/mailbox.rb', line 115

def each(&block)
  to_a.each(&block)
end

#inspectObject

Inspect the contents of the Mailbox


120
121
122
# File 'lib/celluloid/mailbox.rb', line 120

def inspect
  "#<#{self.class}:#{object_id.to_s(16)} @messages=[#{map(&:inspect).join(', ')}]>"
end

#receive(timeout = nil, &block) ⇒ Object

Receive a letter from the mailbox. Guaranteed to return a message. If timeout is exceeded, raise a TimeoutError.

Raises:


73
74
75
76
77
78
79
80
81
# File 'lib/celluloid/mailbox.rb', line 73

def receive(timeout = nil, &block)
  Timers::Wait.for(timeout) do |remaining|
    if message = check(timeout, &block)
      return message
    end
  end
  
  raise TimeoutError.new("receive timeout exceeded")
end

#shutdownObject

Shut down this mailbox and clean up its contents

Raises:


84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/celluloid/mailbox.rb', line 84

def shutdown
  raise MailboxDead, "mailbox already shutdown" if @dead

  @mutex.lock
  begin
    yield if block_given?
    messages = @messages
    @messages = []
    @dead = true
  ensure
    @mutex.unlock rescue nil
  end

  messages.each do |msg|
    dead_letter msg
    msg.cleanup if msg.respond_to? :cleanup
  end
  true
end

#sizeObject

Number of messages in the Mailbox


125
126
127
# File 'lib/celluloid/mailbox.rb', line 125

def size
  @mutex.synchronize { @messages.size }
end

#to_aObject

Cast to an array


110
111
112
# File 'lib/celluloid/mailbox.rb', line 110

def to_a
  @mutex.synchronize { @messages.dup }
end