Module: Ciri::DevP2P::Actor

Included in:
Peer, Server::Scheduler
Defined in:
lib/ciri/devp2p/actor.rb

Overview

Simple actor model implementation. Example:

# Example actor: mixes in Actor and exposes a single message handler.
class Hello
  include Actor

  # Prints 'hello world' and returns the same string, so a caller using
  # `actor.call(:say_hello).value` receives 'hello world'.
  def say_hello
    puts 'hello world'
    'hello world'
  end
end

actor = Hello.new
# start the actor loop
actor.start
# push a message to the actor inbox
actor << :say_hello
# push a message and wait until the response is available
actor.call(:say_hello).value

# raise error
actor.call(:hello).value # NoMethodError

# stop actor
actor.send_stop
actor.wait

Defined Under Namespace

Classes: Error, Future, StateError, StopError

Constant Summary collapse

# Logger used by the actor loop to report errors; writes to STDERR at INFO
# level with a 'YYYY-MM-DD HH:MM:SS' timestamp format.
LOGGER =
Logger.new(STDERR, datetime_format: '%Y-%m-%d %H:%M:%S', level: Logger::INFO)

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#executor ⇒ Object

Returns the value of attribute executor


110
111
112
# File 'lib/ciri/devp2p/actor.rb', line 110

# Returns the executor held in @executor (assigned from the `executor:`
# keyword in #initialize). #start posts the actor loop to this object.
def executor
  @executor
end

Instance Method Details

#<<(args) ⇒ Object


124
125
126
# File 'lib/ciri/devp2p/actor.rb', line 124

# Pushes a raw message onto the actor's inbox.
#
# @param args [Object] the message; the actor loop destructures it as
#   `method, *args`, optionally preceded by a Future for sync calls
# @return [Object] whatever the inbox returns from the push
def <<(args)
  @inbox.push(args)
end

#call(method, *args) ⇒ Object

Synchronous call: pushes the message to the inbox and returns a future.

Example:

future = actor.call(:result) # returns a future
future.value # blocks and waits for the result

134
135
136
137
138
# File 'lib/ciri/devp2p/actor.rb', line 134

# Sync-style call: queues `[future, method, *args]` on the inbox and hands
# the future back to the caller.
#
# @param method [Symbol] name of the method the actor loop should invoke
# @param args [Array] arguments forwarded to that method
# @return [Future] the future that was queued together with the message
def call(method, *args)
  Future.new.tap { |pending| self << [pending, method, *args] }
end

#enqueue(method, *args) ⇒ Object

Asynchronous call: pushes the message to the inbox without waiting for a result.


120
121
122
# File 'lib/ciri/devp2p/actor.rb', line 120

# Async-style call: queues `[method, *args]` on the inbox; no future is
# attached, so the result of the invocation is not reported back.
#
# @param method [Symbol] name of the method the actor loop should invoke
# @param args [Array] arguments forwarded to that method
# @return [Object] the return value of `self << message`
def enqueue(method, *args)
  message = [method, *args]
  self << message
end

#initialize(executor: nil) ⇒ Object


112
113
114
115
116
117
# File 'lib/ciri/devp2p/actor.rb', line 112

# Sets up the actor's state; the actor is not running until #start is called.
#
# @param executor [Object, nil] object the actor loop is posted to by #start
#   (must respond to `post`); may be nil at construction time
def initialize(executor: nil)
  @executor = executor
  @inbox    = Queue.new   # pending messages, consumed by the actor loop
  @future   = Future.new  # settled by the actor loop when it terminates
  @running  = false
end

#loop_callback ⇒ Object

Allows injecting a callback into the actor loop. Example:

class A
  include Actor

  def loop_callback
    # before handle msg
    yield
    # after handle msg
  end
end

214
215
216
# File 'lib/ciri/devp2p/actor.rb', line 214

# Hook wrapped around each iteration of the actor loop. The default simply
# yields, which runs the message-handling block once. Overrides can run code
# before/after the yield; the block accepts a `wait_message:` keyword
# (default true) and, when it is false, skips an empty inbox instead of
# blocking on it.
def loop_callback
  yield
end

#raise_error(e) ⇒ Object


218
219
220
# File 'lib/ciri/devp2p/actor.rb', line 218

# Raises the given exception. #send_stop queues this method with a StopError
# so the error is raised from inside the actor loop.
#
# @param e [Exception] the exception to raise
def raise_error(e)
  raise e
end

#send_stop ⇒ Object

send stop to actor

Example:

actor.send_stop
# wait for the actor to actually stop
actor.wait

157
158
159
# File 'lib/ciri/devp2p/actor.rb', line 157

# Asks the actor to stop by queueing a `raise_error` message carrying a
# StopError; the actor loop rescues StopError and terminates. Messages
# queued earlier are handled first.
#
# @return [Object] the return value of `self << message`
def send_stop
  stop_signal = StopError.new
  self << [:raise_error, stop_signal]
end

#start ⇒ Object

start actor

Raises:


141
142
143
144
145
146
147
148
# File 'lib/ciri/devp2p/actor.rb', line 141

# Starts the actor: marks it as running and posts the actor loop to the
# executor.
#
# @raise [Error] if no executor has been set
# @return [Object] whatever `executor.post` returns
def start
  raise Error, "must set executor before start" unless executor

  @running = true
  executor.post { start_loop }
end

#start_loop ⇒ Object

start loop


168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/ciri/devp2p/actor.rb', line 168

# Runs the actor loop until a handler raises.
#
# Each iteration goes through #loop_callback, takes one message from the
# inbox and dispatches it with `send`. A message is `method, *args`,
# optionally preceded by a Future (sync call); in that case the future
# receives the handler's return value, or the error it raised.
#
# Termination:
# * StopError         -> the actor's own future is set to nil
# * any StandardError -> the actor's own future receives the error and the
#                        error is logged
# In every case @running is reset to false.
def start_loop
  # A plain `while true` is used on purpose: Kernel#loop would swallow a
  # StopIteration raised by a handler instead of letting it reach the
  # rescue clauses below.
  while true
    loop_callback do |wait_message: true|
      # Non-blocking mode: give up the time slice rather than block on pop.
      if !wait_message && @inbox.empty?
        Thread.pass
        next
      end

      message = @inbox.pop

      # A leading Future marks a sync call; the rest is the real message.
      name, *params = message
      reply = nil
      if name.is_a?(Future)
        reply = name
        name, *params = params
      end

      result =
        begin
          send(name, *params)
        rescue StandardError => failure
          # Report the failure to the caller, then let it end the loop.
          reply.raise_error(failure) if reply
          raise
        end

      reply.value = result if reply
    end
  end
rescue StopError
  # Regular shutdown requested through a queued StopError.
  @future.value = nil
rescue StandardError => fatal
  @future.raise_error fatal
  LOGGER.error("Actor #{self}") { "#{fatal}\n#{fatal.backtrace.join("\n")}" }
ensure
  @running = false
end

#wait ⇒ Object

Waits until the actor stops or an error occurs.

Raises:


162
163
164
165
# File 'lib/ciri/devp2p/actor.rb', line 162

# Waits on the actor's own future, which the actor loop settles when it
# terminates.
#
# @raise [StateError] if the actor is not currently marked as running
# @return [Object] the value of the actor's future
def wait
  return @future.value if @running

  raise StateError, 'actor not running!'
end