Class: Ciri::DevP2P::Server::Scheduler

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Actor
Defined in:
lib/ciri/devp2p/server.rb

Constant Summary

Constants included from Actor

Actor::LOGGER

Instance Attribute Summary collapse

Attributes included from Actor

#executor

Instance Method Summary collapse

Methods included from Actor

#<<, #call, #enqueue, #raise_error, #send_stop, #start, #start_loop, #wait

Constructor Details

#initialize(server, executor:) ⇒ Scheduler

Returns a new instance of Scheduler.


133
134
135
136
137
138
139
140
# File 'lib/ciri/devp2p/server.rb', line 133

# Builds a scheduler bound to +server+.
#
# @param server [Object] the server this scheduler works for; exposed via #server
# @param executor [Object] forwarded unchanged to the superclass initializer
def initialize(server, executor:)
  @server = server
  # Scheduling state starts out empty: nothing queued, nothing running, no peers.
  @queued_tasks, @running_tasks, @peers = [], [], {}

  # init actor
  super(executor: executor)
end

Instance Attribute Details

#server ⇒ Object (readonly)

Returns the value of attribute server


130
131
132
# File 'lib/ciri/devp2p/server.rb', line 130

# Read-only accessor for the @server instance variable.
#
# @return [Object] the current value of @server
def server
  @server
end

Instance Method Details

#loop_callback {|wait_message: false| ... } ⇒ Object

Called by the actor loop on each iteration.

Yields:

  • (wait_message: false)

143
144
145
146
# File 'lib/ciri/devp2p/server.rb', line 143

# Hook invoked by the actor loop: runs one scheduling pass, then hands control
# back to the caller's block.
#
# @yield [wait_message: false] always yields the keyword +wait_message: false+
#   (presumably so the actor loop does not block waiting for a message —
#   confirm against the Actor implementation)
# @return [Object] whatever the given block returns
def loop_callback
  schedule_tasks
  yield wait_message: false
end

#schedule_tasks ⇒ Object

Starts queued tasks, then queues new peer-search tasks when the queue has room.


162
163
164
165
166
167
168
# File 'lib/ciri/devp2p/server.rb', line 162

# Runs one scheduling pass.
#
# First starts whatever queued tasks fit (see #start_tasks) and keeps the ones
# that did not. If fewer than MAX_ACTIVE_DIAL_TASKS remain queued, asks the
# server's dialer for more tasks and appends them to the queue.
#
# @return [Array, nil] the updated queue when new tasks were requested,
#   otherwise nil
def schedule_tasks
  @queued_tasks = start_tasks(@queued_tasks)
  return unless @queued_tasks.size < MAX_ACTIVE_DIAL_TASKS

  dialer = server.dial
  # The dialer is told how many tasks are already running or waiting.
  active_count = @running_tasks.size + @queued_tasks.size
  found = dialer.find_peer_tasks(active_count, @peers, Time.now)
  @queued_tasks += found
end

#start_tasks(tasks) ⇒ Object


148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/ciri/devp2p/server.rb', line 148

# Starts as many of the given tasks as the running-task limit allows.
#
# Works on a copy of +tasks+, so the caller's array is left untouched. Tasks
# are taken from the end of the list and posted to the executor until
# @running_tasks holds MAX_ACTIVE_DIAL_TASKS entries or no tasks remain.
# NOTE(review): taking from the end means the most recently queued task
# starts first — confirm this ordering is intended.
#
# @param tasks [Array<#call>] tasks waiting to be started
# @return [Array<#call>] the tasks that were not started
def start_tasks(tasks)
  pending = tasks.dup
  while @running_tasks.size < MAX_ACTIVE_DIAL_TASKS
    break unless (task = pending.pop)

    # The task is handed to #post as an argument (presumably forwarded to the
    # block) so the block does not close over `task`, which the loop reassigns.
    executor.post(task) do |posted_task|
      begin
        posted_task.call
      ensure
        # Report completion even when the task raises; otherwise a failing
        # task would never send :task_done (presumably the message that lets
        # the scheduler release its entry in @running_tasks).
        self << [:task_done, posted_task]
      end
    end
    @running_tasks << task
  end
  pending
end