Class: Roby::ExecutionEngine Private
- Extended by:
- Logger::Hierarchy, PropagationHandlerMethods
- Includes:
- Logger::Hierarchy, Roby::EventLogging::Mixin, PropagationHandlerMethods
- Defined in:
- lib/roby/execution_engine.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
The core execution algorithm
It is in charge of handling event and exception propagation, as well as running cleanup processes (e.g. garbage collection).
The main method is #process_events. When executing a Roby application, it is called periodically by #event_loop.
Defined Under Namespace
Modules: PropagationHandlerMethods Classes: AlreadyRunning, EventLoopExitState, ExceptionPropagationVisitor, JoinAllWaitingWorkTimeout, NotPropagationContext, PollBlockDefinition, PropagationInfo, RecursivePropagationContext
Constant Summary collapse
- PENDING_PROPAGATION_FORWARD =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
1- PENDING_PROPAGATION_SIGNAL =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
2- SLEEP_MIN_TIME =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
Do not sleep or call Thread#pass if there is less that this much time left in the cycle
0.01- INTERRUPT_FORCE_EXIT_DEAD_ZONE =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
How many seconds between two Interrupt before the execution engine's loop can forcefully quit
10- EXCEPTION_NONFATAL =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
Exception kind passed to #on_exception handlers for non-fatal, unhandled exceptions
:nonfatal- EXCEPTION_FATAL =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
Exception kind passed to #on_exception handlers for fatal, unhandled exceptions
:fatal- EXCEPTION_HANDLED =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
Exception kind passed to #on_exception handlers for handled exceptions
:handled- EXCEPTION_FREE_EVENT =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
Exception kind passed to #on_exception handlers for free event exceptions
:free_event
Instance Attribute Summary collapse
-
#additional_errors ⇒ Object
readonly
private
Used during exception propagation to inject new errors in the process.
-
#application_exceptions ⇒ Object
readonly
private
The set of errors which have been generated outside of the plan's control.
-
#at_cycle_end_handlers ⇒ Object
readonly
private
A set of blocks that are called at each cycle end.
-
#control ⇒ Object
private
The DecisionControl object associated with this engine.
-
#cycle_index ⇒ Object
readonly
private
The number of this cycle since the beginning.
-
#cycle_length ⇒ Object
readonly
private
The cycle length in seconds.
-
#cycle_start ⇒ Object
readonly
private
The starting Time of this cycle.
-
#delayed_events ⇒ Object
readonly
private
The set of pending delayed events.
-
#dependency_graph ⇒ Object
readonly
private
Cached graph object for TaskStructure::Dependency.
-
#emitted_events ⇒ Array<Event>
readonly
private
The set of events that have been emitted within the last call to #process_events (i.e. the last execution of the event loop).
-
#event_logger ⇒ Object
private
The underlying DRoby::EventLogger.
-
#event_ordering ⇒ Object
readonly
private
The topological ordering of events w.r.t.
-
#event_priorities ⇒ Object
readonly
private
The event => index hash which give the propagation priority for each event.
-
#exception_listeners ⇒ Array<#call>
readonly
private
The blocks that are currently listening to exceptions.
-
#finalizers ⇒ Object
readonly
private
A set of proc objects which are to be called when the execution engine quits.
-
#forward_graph ⇒ Object
readonly
private
Cached graph object for EventStructure::Forward.
-
#last_stop_count ⇒ Object
readonly
private
:nodoc:.
-
#once_blocks ⇒ Queue
readonly
private
Thread-safe queue to push work to the execution engine.
-
#plan ⇒ Object
private
The Plan this engine is acting on.
-
#precedence_graph ⇒ Object
readonly
private
Cached graph object for Roby::EventStructure::Precedence.
-
#process_every ⇒ Object
readonly
private
A set of blocks which are called every cycle.
-
#propagation_id ⇒ Object
readonly
private
A numeric ID giving the count of the current propagation cycle.
-
#propagation_sources ⇒ Object
readonly
private
The set of source events for the current propagation action.
-
#scheduler ⇒ Object
private
The scheduler is the object which handles non-generic parts of the propagation cycle.
-
#signal_graph ⇒ Object
readonly
private
Cached graph object for EventStructure::Signal.
-
#thread ⇒ Object
private
The execution thread if there is one running.
-
#thread_pool ⇒ Concurrent::CachedThreadPool
readonly
private
A thread pool on which async work should be executed.
-
#waiting_work ⇒ Array<#fail,#complete?>
readonly
private
A list of threaded objects waiting for the control thread.
Attributes included from PropagationHandlerMethods
#external_events_handlers, #propagation_handlers, #side_work_handlers
Class Method Summary collapse
-
.call_every(plan) ⇒ Object
private
Calls the periodic blocks which should be called.
-
.make_delay(timeref, source, target, timespec) ⇒ Object
private
Returns a Time object which represents the absolute point in time referenced by
timespecin the context of delaying a propagation betweensourceandtarget. -
.validate_timespec(timespec) ⇒ Object
private
Validates
timespecas a delay specification.
Instance Method Summary collapse
-
#add_error(e, propagate_through: nil) ⇒ Object
private
Register a LocalizedError for future propagation.
-
#add_event_delay(time, is_forward, source, target, context) ⇒ Object
private
Adds a propagation step to be performed when the current time is greater than
time. - #add_event_logger(logger) ⇒ Object private
-
#add_event_propagation(is_forward, sources, target, context, timespec) ⇒ Object
private
Adds a propagation to the next propagation step: it registers a propagation step to be performed between
sourceandtargetwith the givencontext. -
#add_exceptions_for_inhibition(fatal_errors) ⇒ Object
private
Register a set of fatal exceptions to ensure that they will be inhibited in the next exception propagation cycles.
-
#add_framework_error(error, source) ⇒ Object
private
Registers the given error and a description of its source in the list of application/framework errors.
-
#at_cycle_end(description: "at_cycle_end", **options) {|plan| ... } ⇒ Object
private
Adds a block to be called at the end of each execution cycle.
-
#call_poll_blocks(blocks, late = false) ⇒ Object
private
Helper that calls the propagation handlers in
propagation_handlers(which are expected to be instances of PollBlockDefinition) and handles the errors according of each handler's policy. - #call_propagation_handlers ⇒ Object private
-
#clear ⇒ Object
private
Sets up the plan for clearing: it discards all missions and undefines all permanent tasks and events.
- #clear_application_exceptions ⇒ Object private
-
#compute_errors(events_errors) ⇒ PropagationInfo
private
Compute the set of fatal errors in the current execution state.
-
#compute_kill_tasks_for_unhandled_fatal_errors(fatal_errors) ⇒ Object
private
Compute the set of unhandled fatal exceptions.
-
#cycle_end(stats, raise_framework_errors: Roby.app.abort_on_application_exception?) ⇒ Object
private
Called at each cycle end.
-
#delayed(delay, description: "delayed block", **options, &block) ⇒ Object
private
Schedules
blockto be called once afterdelayseconds passed, in the propagation context. -
#display_exceptions=(flag) ⇒ Object
private
Controls whether this engine should indiscriminately display all fatal exceptions.
-
#display_exceptions? ⇒ Boolean
private
whether this engine should indiscriminately display all fatal exceptions.
-
#error_handling_phase(events_errors) ⇒ Object
private
Compute errors in plan and handle the results.
-
#event_loop ⇒ Object
private
The main event loop.
-
#event_loop_handle_interrupt(exit_state) ⇒ Object
private
Handle a received Interrupt for #event_loop.
-
#event_loop_teardown(exit_state) ⇒ Boolean
private
Handle the teardown logic for #event_loop.
-
#event_propagation_phase(initial_events, propagation_info) ⇒ Object
private
Calls its block in a #gather_propagation context and propagate events that have been called and/or emitted by the block.
-
#event_propagation_step(current_step, propagation_info) ⇒ Object
private
Propagate one step.
-
#every(duration, description: "periodic handler", **options, &block) ⇒ #dispose
private
Call
blockeverydurationseconds. -
#execute(catch: [], type: :external_events, &block) ⇒ Object
private
Block until the given block is executed by the execution thread, at the beginning of the event loop, in propagation context.
-
#execute_delayed_events ⇒ Object
private
Adds the events in
delayed_eventswhose time has passed into the propagation. - #execute_one_cycle(time = Time.now) ⇒ Object private
-
#execute_side_work ⇒ Object
private
Execute the work registered with PropagationHandlerMethods#add_side_work_handler.
-
#finalized_event(event) ⇒ Object
private
Called by #plan when an event has been finalized.
-
#finalized_task(task) ⇒ Object
private
Called by #plan when a task has been finalized.
-
#force_quit ⇒ Object
private
Force quitting, without cleaning up.
-
#forced_exit? ⇒ Boolean
private
True if the control thread is currently quitting.
-
#garbage_collect(force_on = nil) ⇒ Boolean
private
Kills and removes unneeded tasks for which there are no dependencies.
-
#garbage_collect_stop_task(local_task) ⇒ Boolean
private
Handle a single root task in the #garbage_collect process.
-
#gather_errors ⇒ Array<ExecutionException>
private
Executes the given block while gathering errors, and returns the errors that have been declared with #add_error.
-
#gather_external_events ⇒ Object
private
Gather the events that come out of this plan manager.
-
#gather_framework_errors(source, raise_caught_exceptions: true) ⇒ Object
private
Yields to the block and registers any raised exception using #add_framework_error.
-
#gather_propagation(initial_set = {}, &block) ⇒ Object
private
Sets up a propagation context, yielding the block in it.
- #gathering? ⇒ Boolean private
- #gathering_errors? ⇒ Boolean private
-
#has_pending_exception_matching?(e, object) ⇒ Boolean
private
Tests whether there is an exception registered by #add_fatal_exceptions_for_inhibition for a given error and object.
-
#has_pending_forward?(from, to, expected_context) ⇒ Boolean
private
Whether a forward matching this signature is currently pending.
-
#has_pending_signal?(from, to, expected_context) ⇒ Boolean
private
Whether a signal matching this signature is currently pending.
- #has_propagation_for?(target) ⇒ Boolean private
-
#has_queued_events? ⇒ Boolean
private
Returns true if some events are queued.
-
#has_waiting_work? ⇒ Boolean
private
Whether this EE has asynchronous waiting work waiting to be processed.
-
#in_propagation_context? ⇒ Boolean
private
True if we are within a propagation context (i.e. within event processing).
-
#inhibited_exception?(exception) ⇒ Boolean
private
Query whether the given exception is inhibited in this plan.
-
#initialize(plan, control: Roby::DecisionControl.new) ⇒ ExecutionEngine
constructor
private
Create an execution engine acting on
plan, usingcontrolas the decision control object. -
#inside_control? ⇒ Boolean
private
True if the current thread is the execution thread of this engine.
- #interrupt ⇒ Object private
- #issue_quit_progression_warning(remaining) ⇒ Object private
-
#join_all_waiting_work(timeout: nil) ⇒ Object
private
Waits for all obligations in #waiting_work to finish.
-
#killall ⇒ Object
private
Kill all tasks that are currently running in the plan.
-
#log(event_name, *values) ⇒ Object
private
Log an event in the system.
-
#next_event(pending) ⇒ Object
private
call-seq: next_event(pending) => event, propagation_info.
-
#notify_about_error_handling_results(errors) ⇒ Object
private
Issue the warning message and log notifications related to tasks being killed because of unhandled fatal exceptions.
-
#notify_exception(kind, error, involved_objects) ⇒ Object
private
Call to notify the listeners registered with #on_exception of the occurence of an exception.
-
#on_exception(description: "exception listener", on_error: :disable) {|kind, error, tasks| ... } ⇒ Object
private
Registers a callback that will be called when exceptions are propagated in the plan.
-
#on_log(on_error: :disable, &block) ⇒ #dispose
private
Set of blocks called with log events.
-
#once(sync: nil, description: "once block", type: :external_events, **options, &block) ⇒ Object
private
Schedules
blockto be called at the beginning of the next execution cycle, in propagation context. -
#outside_control? ⇒ Boolean
private
True if the current thread is not the execution thread of this engine, or if there is not control thread.
-
#prepare_propagation(target, is_forward, info) ⇒ Object
private
call-seq: prepare_propagation(target, is_forward, info) => source_events, source_generators, context prepare_propagation(target, is_forward, info) => nil.
-
#process_events(raise_framework_errors: Roby.app.abort_on_application_exception?, garbage_collect_pass: true, &caller_block) ⇒ PropagationInfo
private
The inside part of the event loop.
-
#process_events_synchronous(seeds = {}, initial_errors = [], enable_scheduler: false, raise_errors: true, &block) ⇒ Object
private
Tests are using a special mode for propagation, in which everything is resolved when #emit or #call is called, including error handling.
-
#process_once_blocks ⇒ Object
private
Dispatch #once_blocks to the other handler sets for further processing.
- #process_pending_application_exceptions(application_errors = clear_application_exceptions, raise_framework_errors: Roby.app.abort_on_application_exception?) ⇒ Object private
-
#process_waiting_work ⇒ Object
private
Process asynchronous work registered in #waiting_work to clear completed work and/or handle errors that were not handled by the async object itself (e.g. a Promise without a Promise#on_error handler).
-
#promise(description: nil, executor: thread_pool, &block) ⇒ Object
private
Create a promise to execute the given block in a separate thread.
-
#propagate_events_and_errors(next_steps, initial_errors, garbage_collect_pass: true) ⇒ PropagationInfo
private
Propagate an initial set of event propagations and errors.
-
#propagate_exception_in_plan(exceptions) {|exception, handling_object| ... } ⇒ Array<(ExecutionException,Array<Task>)>
private
The core exception propagation algorithm.
-
#propagate_exceptions(exceptions) ⇒ Array<(ExecutionException,Array<Task>)>
private
Propagation exception phase, checking if tasks and/or the main plan are handling the exceptions.
-
#propagation_context(sources) ⇒ Object
private
Sets the source_event and source_generator variables according to
source. -
#propagation_source_events ⇒ Object
private
The set of events extracted from #sources.
-
#propagation_source_generators ⇒ Object
private
The set of generators extracted from #sources.
-
#queue_forward(sources, target, context, timespec) ⇒ Object
private
Queue a forwarding to be propagated.
-
#queue_signal(sources, target, context, timespec) ⇒ Object
private
Queue a signal to be propagated.
-
#quit ⇒ Object
private
Make control quit properly.
-
#quitting? ⇒ Boolean
private
True if the control thread is currently quitting.
-
#refresh_relations ⇒ Object
private
Refresh the value of cached relations.
-
#remove_at_cycle_end(handler_id) ⇒ Object
private
Removes a handler added by #at_cycle_end.
-
#remove_exception_listener(handler) ⇒ void
private
Removes an exception listener registered with #on_exception.
-
#remove_inhibited_exceptions(exceptions) ⇒ Array<ExecutionException>
private
Process the given exceptions to remove the ones that are currently filtered by the plan repairs.
-
#remove_periodic_handler(id) ⇒ Object
private
Removes a periodic handler defined by #every.
-
#reset ⇒ Object
private
Make a quit EE ready for reuse.
- #reset_thread_pool ⇒ Object private
-
#run(cycle: 0.1) ⇒ Object
private
Main event loop.
- #shutdown ⇒ Object private
-
#start_new_cycle(time = Time.now) ⇒ Object
private
Set the cycle_start attribute and increment cycle_index.
- #unmark_finished_missions_and_permanent_tasks ⇒ Object private
-
#unreachable_event(event) ⇒ Object
private
Called by EventGenerator when an event became unreachable.
-
#wait_one_cycle ⇒ Object
private
Blocks until at least once execution cycle has been done.
-
#wait_until(ev) ⇒ Object
private
Stops the current thread until the given even is emitted.
Methods included from PropagationHandlerMethods
add_propagation_handler, add_side_work_handler, at_cycle_begin, create_propagation_handler, each_cycle, remove_propagation_handler, remove_side_work_handler
Methods included from Roby::EventLogging::Mixin
#log_flush_cycle, #log_queue_size, #log_timepoint, #log_timepoint_group, #log_timepoint_group_end, #log_timepoint_group_start
Constructor Details
#initialize(plan, control: Roby::DecisionControl.new) ⇒ ExecutionEngine
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Create an execution engine acting on plan, using control as the
decision control object
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/roby/execution_engine.rb', line 75 def initialize(plan, control: Roby::DecisionControl.new) @plan = plan @event_logger = EventLogging::AggregateEventLogger.new @on_log_handlers = [] @use_oob_gc = ExecutionEngine.use_oob_gc? @control = control @scheduler = Schedulers::Null.new(plan) reset_thread_pool @thread = Thread.current @propagation = nil @propagation_id = 0 @propagation_exceptions = nil @application_exceptions = nil @delayed_events = [] @event_ordering = [] @event_priorities = {} @propagation_handlers = [] @external_events_handlers = [] @side_work_handlers = [] @at_cycle_end_handlers = [] @process_every = [] @waiting_work = Concurrent::Array.new @emitted_events = [] @exception_listeners = [] @worker_threads_mtx = Mutex.new @worker_threads = [] @once_blocks = Queue.new @pending_exceptions = {} each_cycle(&ExecutionEngine.method(:call_every)) @quit = 0 @allow_propagation = true @cycle_index = 0 @cycle_start = Time.now @cycle_length = 0.1 @last_stop_count = 0 @finalizers = [] @gc_warning = true refresh_relations @exception_display_handler = Roby.null_disposable self.display_exceptions = true end |
Instance Attribute Details
#additional_errors ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Used during exception propagation to inject new errors in the process
It shall not be accessed directly. Instead, Plan#add_error should be called
1477 1478 1479 |
# File 'lib/roby/execution_engine.rb', line 1477 def additional_errors @additional_errors end |
#application_exceptions ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The set of errors which have been generated outside of the plan's control. For now, those errors cause the whole controller to shut down.
1462 1463 1464 |
# File 'lib/roby/execution_engine.rb', line 1462 def application_exceptions @application_exceptions end |
#at_cycle_end_handlers ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
A set of blocks that are called at each cycle end
2203 2204 2205 |
# File 'lib/roby/execution_engine.rb', line 2203 def at_cycle_end_handlers @at_cycle_end_handlers end |
#control ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The DecisionControl object associated with this engine
181 182 183 |
# File 'lib/roby/execution_engine.rb', line 181 def control @control end |
#cycle_index ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The number of this cycle since the beginning
2262 2263 2264 |
# File 'lib/roby/execution_engine.rb', line 2262 def cycle_index @cycle_index end |
#cycle_length ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The cycle length in seconds
2256 2257 2258 |
# File 'lib/roby/execution_engine.rb', line 2256 def cycle_length @cycle_length end |
#cycle_start ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The starting Time of this cycle
2259 2260 2261 |
# File 'lib/roby/execution_engine.rb', line 2259 def cycle_start @cycle_start end |
#delayed_events ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The set of pending delayed events. This is an array of the form
[[time, is_forward, source, target, context], ...]
See #add_event_delay for more information
546 547 548 |
# File 'lib/roby/execution_engine.rb', line 546 def delayed_events @delayed_events end |
#dependency_graph ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Cached graph object for TaskStructure::Dependency
This is here for performance reasons, to avoid resolving the same graph over and over
170 171 172 |
# File 'lib/roby/execution_engine.rb', line 170 def dependency_graph @dependency_graph end |
#emitted_events ⇒ Array<Event> (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The set of events that have been emitted within the last call to #process_events (i.e. the last execution of the event loop)
188 189 190 |
# File 'lib/roby/execution_engine.rb', line 188 def emitted_events @emitted_events end |
#event_logger ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The underlying DRoby::EventLogger
It is usually the same than the #plan's. Pass a DRoby::NullEventLogger at construction time to disable logging of execution events.
179 180 181 |
# File 'lib/roby/execution_engine.rb', line 179 def event_logger @event_logger end |
#event_ordering ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The topological ordering of events w.r.t. the Precedence relation. This gets updated on-demand when the event relations change.
1006 1007 1008 |
# File 'lib/roby/execution_engine.rb', line 1006 def event_ordering @event_ordering end |
#event_priorities ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The event => index hash which give the propagation priority for each event
1009 1010 1011 |
# File 'lib/roby/execution_engine.rb', line 1009 def event_priorities @event_priorities end |
#exception_listeners ⇒ Array<#call> (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The blocks that are currently listening to exceptions
191 192 193 |
# File 'lib/roby/execution_engine.rb', line 191 def exception_listeners @exception_listeners end |
#finalizers ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
A set of proc objects which are to be called when the execution engine quits.
2577 2578 2579 |
# File 'lib/roby/execution_engine.rb', line 2577 def finalizers @finalizers end |
#forward_graph ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Cached graph object for EventStructure::Forward
This is here for performance reasons, to avoid resolving the same graph over and over
164 165 166 |
# File 'lib/roby/execution_engine.rb', line 164 def forward_graph @forward_graph end |
#last_stop_count ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
:nodoc:
2348 2349 2350 |
# File 'lib/roby/execution_engine.rb', line 2348 def last_stop_count @last_stop_count end |
#once_blocks ⇒ Queue (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Thread-safe queue to push work to the execution engine
Do not access directly, use #once instead
199 200 201 |
# File 'lib/roby/execution_engine.rb', line 199 def once_blocks @once_blocks end |
#plan ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The Plan this engine is acting on
173 174 175 |
# File 'lib/roby/execution_engine.rb', line 173 def plan @plan end |
#precedence_graph ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Cached graph object for Roby::EventStructure::Precedence
This is here for performance reasons, to avoid resolving the same graph over and over
152 153 154 |
# File 'lib/roby/execution_engine.rb', line 152 def precedence_graph @precedence_graph end |
#process_every ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
A set of blocks which are called every cycle
2225 2226 2227 |
# File 'lib/roby/execution_engine.rb', line 2225 def process_every @process_every end |
#propagation_id ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
A numeric ID giving the count of the current propagation cycle
183 184 185 |
# File 'lib/roby/execution_engine.rb', line 183 def propagation_id @propagation_id end |
#propagation_sources ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The set of source events for the current propagation action. This is a mix of EventGenerator and Event objects.
515 516 517 |
# File 'lib/roby/execution_engine.rb', line 515 def propagation_sources @propagation_sources end |
#scheduler ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The scheduler is the object which handles non-generic parts of the propagation cycle. For now, its #initial_events method is called at the beginning of each propagation cycle and can call or emit a set of events.
See Schedulers::Basic
486 487 488 |
# File 'lib/roby/execution_engine.rb', line 486 def scheduler @scheduler end |
#signal_graph ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Cached graph object for EventStructure::Signal
This is here for performance reasons, to avoid resolving the same graph over and over
158 159 160 |
# File 'lib/roby/execution_engine.rb', line 158 def signal_graph @signal_graph end |
#thread ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The execution thread if there is one running
2250 2251 2252 |
# File 'lib/roby/execution_engine.rb', line 2250 def thread @thread end |
#thread_pool ⇒ Concurrent::CachedThreadPool (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
A thread pool on which async work should be executed
146 147 148 |
# File 'lib/roby/execution_engine.rb', line 146 def thread_pool @thread_pool end |
#waiting_work ⇒ Array<#fail,#complete?> (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
A list of threaded objects waiting for the control thread
Objects registered here will be notified them by calling #fail on them when it quits. In addition, #join_all_waiting_work will wait for all pending jobs to finish.
Note that all Concurrent::Obligation subclasses fit the bill
2177 2178 2179 |
# File 'lib/roby/execution_engine.rb', line 2177 def waiting_work @waiting_work end |
Class Method Details
.call_every(plan) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Calls the periodic blocks which should be called
2145 2146 2147 2148 2149 2150 2151 2152 2153 2154 2155 2156 2157 2158 2159 2160 2161 2162 2163 2164 2165 2166 |
# File 'lib/roby/execution_engine.rb', line 2145 def self.call_every(plan) # :nodoc: engine = plan.execution_engine now = engine.cycle_start length = engine.cycle_length engine.process_every.map! do |handler, last_call, duration| next if handler.disposed? # Check if the nearest timepoint is the beginning of # this cycle or of the next cycle if !last_call || (duration - (now - last_call)) < length / 2 unless handler.call(engine, engine.plan) next end next if handler.once? next if handler.disposed? last_call = now end [handler, last_call, duration] end.compact! end |
.make_delay(timeref, source, target, timespec) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a Time object which represents the absolute point in time
referenced by timespec in the context of delaying a propagation
between source and target.
See validate_timespec for more information
996 997 998 999 1000 1001 1002 |
# File 'lib/roby/execution_engine.rb', line 996 def self.make_delay(timeref, source, target, timespec) if delay = timespec[:delay] then timeref + delay elsif at = timespec[:at] then at else raise ArgumentError, "invalid timespec #{timespec}" end end |
.validate_timespec(timespec) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Validates timespec as a delay specification. A valid delay
specification is either nil or a hash, in which case two forms are
possible:
at: absolute_time
delay: number
985 986 987 988 989 |
# File 'lib/roby/execution_engine.rb', line 985 def self.validate_timespec(timespec) if timespec timespec = timespec, %i[delay at] end end |
Instance Method Details
#add_error(e, propagate_through: nil) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Register a LocalizedError for future propagation
This method must be called in a error-gathering context (i.e. #gather_errors).
639 640 641 642 643 644 645 646 647 648 649 |
# File 'lib/roby/execution_engine.rb', line 639 def add_error(e, propagate_through: nil) plan_exception = e.to_execution_exception if @propagation_exceptions @propagation_exceptions << [plan_exception, propagate_through] else Roby.log_exception_with_backtrace(e, self, :fatal) raise NotPropagationContext, "#add_error called outside an error-gathering "\ "context (#add_error)" end end |
#add_event_delay(time, is_forward, source, target, context) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Adds a propagation step to be performed when the current time is
greater than time. The propagation step is a signal if is_forward
is false and a forward otherwise.
This method should not be called directly. Use #add_event_propagation
with the appropriate timespec argument.
See also #delayed_events and #execute_delayed_events
556 557 558 |
# File 'lib/roby/execution_engine.rb', line 556 def add_event_delay(time, is_forward, source, target, context) delayed_events << [time, is_forward, source, target, context] end |
#add_event_logger(logger) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
126 127 128 |
# File 'lib/roby/execution_engine.rb', line 126 def add_event_logger(logger) @event_logger.add(logger) end |
#add_event_propagation(is_forward, sources, target, context, timespec) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Adds a propagation to the next propagation step: it registers a
propagation step to be performed between source and target with
the given context. If is_forward is true, the propagation will be
a forwarding, otherwise it is a signal.
If timespec is not nil, it defines a delay to be applied before
calling the target event.
See #gather_propagation
765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 |
# File 'lib/roby/execution_engine.rb', line 765 def add_event_propagation(is_forward, sources, target, context, timespec) if target.plan != plan raise Roby::EventNotExecutable.new(target), "#{target} not in executed plan" end target.pending(sources.find_all { |ev| ev.kind_of?(Event) }) @propagation_step_id += 1 target_info = (@propagation[target] ||= [@propagation_step_id, [], []]) step = target_info[is_forward ? PENDING_PROPAGATION_FORWARD : PENDING_PROPAGATION_SIGNAL] if sources.empty? step << nil << context << timespec else sources.each do |ev| step << ev << context << timespec end end end |
#add_exceptions_for_inhibition(fatal_errors) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Register a set of fatal exceptions to ensure that they will be inhibited in the next exception propagation cycles
1975 1976 1977 1978 1979 1980 1981 1982 |
# File 'lib/roby/execution_engine.rb', line 1975 def add_exceptions_for_inhibition(fatal_errors) fatal_errors.each do |exception, involved_tasks| involved_tasks.each do |t| (@pending_exceptions[t] ||= Set.new) << [exception.exception.class, exception.origin] end end end |
#add_framework_error(error, source) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Registers the given error and a description of its source in the list of application/framework errors
It must be called within an exception-gathering context, that is either within #process_events, or within #gather_framework_errors
These errors will terminate the event loop
718 719 720 721 722 723 724 725 |
# File 'lib/roby/execution_engine.rb', line 718 def add_framework_error(error, source) if @application_exceptions @application_exceptions << [error, source] else Roby.log_exception_with_backtrace(error, self, :fatal) raise NotPropagationContext, "#add_framework_error called outside an exception-gathering context" end end |
#at_cycle_end(description: "at_cycle_end", **options) {|plan| ... } ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Adds a block to be called at the end of each execution cycle
2211 2212 2213 2214 2215 |
# File 'lib/roby/execution_engine.rb', line 2211 def at_cycle_end(description: "at_cycle_end", **, &block) handler = PollBlockDefinition.new(description, block, **) at_cycle_end_handlers << handler handler end |
#call_poll_blocks(blocks, late = false) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Helper that calls the propagation handlers in propagation_handlers
(which are expected to be instances of PollBlockDefinition) and
handles the errors according of each handler's policy
805 806 807 808 809 810 811 812 813 814 815 816 817 818 |
# File 'lib/roby/execution_engine.rb', line 805 def call_poll_blocks(blocks, late = false) blocks.delete_if do |handler| if handler.disabled? || (handler.late? ^ late) next(handler.disposed?) end log_timepoint_group handler.description do unless handler.call(self, plan) handler.disabled = true end end handler.once? || handler.disposed? end end |
#call_propagation_handlers ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 |
# File 'lib/roby/execution_engine.rb', line 841 def call_propagation_handlers process_once_blocks if scheduler.enabled? gather_framework_errors("scheduler") do log_timepoint_group "scheduler-initial-events" do scheduler.initial_events end end end call_poll_blocks(self.class.propagation_handlers, false) call_poll_blocks(self.propagation_handlers, false) unless has_queued_events? call_poll_blocks(self.class.propagation_handlers, true) call_poll_blocks(self.propagation_handlers, true) end end |
#clear ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Sets up the plan for clearing: it discards all missions and undefines all permanent tasks and events.
Returns nil if the plan is cleared, and the set of remaining tasks otherwise. Note that quaranteened tasks are not counted as remaining, as it is not possible for the execution engine to stop them.
2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367 2368 2369 2370 2371 2372 2373 2374 2375 2376 2377 2378 2379 2380 |
# File 'lib/roby/execution_engine.rb', line 2356 def clear plan.mission_tasks.dup.each { |t| plan.unmark_mission_task(t) } plan.permanent_tasks.dup.each { |t| plan.unmark_permanent_task(t) } plan.permanent_events.dup.each { |t| plan.unmark_permanent_event(t) } plan.force_gc.merge(plan.tasks) quaranteened_subplan = plan.compute_useful_tasks(plan.quarantined_tasks) remaining = plan.tasks - quaranteened_subplan @pending_exceptions.clear if remaining.empty? # Have to call #garbage_collect one more to make # sure that unneeded events are removed as well garbage_collect # Done cleaning the tasks, clear the remains plan.transactions.each do |trsc| trsc.discard_transaction if trsc.self_owned? end plan.clear emitted_events.clear return end remaining end |
#clear_application_exceptions ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
1464 1465 1466 1467 1468 1469 1470 1471 |
# File 'lib/roby/execution_engine.rb', line 1464 def clear_application_exceptions unless @application_exceptions raise RecursivePropagationContext, "unbalanced call to #clear_application_exceptions" end result, @application_exceptions = @application_exceptions, nil result end |
#compute_errors(events_errors) ⇒ PropagationInfo
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Compute the set of fatal errors in the current execution state
1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 |
# File 'lib/roby/execution_engine.rb', line 1484 def compute_errors(events_errors) # Generate exceptions from task structure structure_errors = plan.check_structure log_timepoint "structure_check" # Propagate the errors. Note that the plan repairs are taken into # account in ExecutionEngine.propagate_exceptions directly. We keep # event and structure errors separate since in the first case there # is not two-stage handling (all errors that have not been handled # are fatal), and in the second case we call #check_structure # again to errors that are remaining after the call to the exception # handlers events_errors, free_events_errors, events_handled = propagate_exceptions(events_errors) _, structure_handled = propagate_exceptions(structure_errors) log_timepoint "exception_propagation" # Get the remaining problems in the plan structure, and act on it structure_errors, structure_inhibited = remove_inhibited_exceptions(plan.check_structure) # Partition them by fatal/nonfatal fatal_errors, nonfatal_errors = [], [] (structure_errors + events_errors).each do |e, involved_tasks| if e.fatal? fatal_errors << [e, involved_tasks] else nonfatal_errors << [e, involved_tasks] end end kill_tasks = compute_kill_tasks_for_unhandled_fatal_errors(fatal_errors).to_set handled_errors = structure_handled + events_handled debug "#{fatal_errors.size} fatal errors found and "\ "#{free_events_errors.size} errors involving free events" debug "the fatal errors involve #{kill_tasks.size} non-finalized tasks" PropagationInfo.new( Set.new, Set.new, kill_tasks, fatal_errors, nonfatal_errors, free_events_errors, handled_errors, structure_inhibited ) end |
#compute_kill_tasks_for_unhandled_fatal_errors(fatal_errors) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Compute the set of unhandled fatal exceptions
921 922 923 924 925 926 927 928 |
# File 'lib/roby/execution_engine.rb', line 921 def compute_kill_tasks_for_unhandled_fatal_errors(fatal_errors) kill_tasks = fatal_errors.inject(Set.new) do |tasks, (_exception, affected_tasks)| tasks.merge(affected_tasks) end # Tasks might have been finalized during exception handling, filter # those out kill_tasks.find_all(&:plan) end |
#cycle_end(stats, raise_framework_errors: Roby.app.abort_on_application_exception?) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Called at each cycle end
2609 2610 2611 2612 2613 |
# File 'lib/roby/execution_engine.rb', line 2609 def cycle_end(stats, raise_framework_errors: Roby.app.abort_on_application_exception?) gather_framework_errors("#cycle_end", raise_caught_exceptions: raise_framework_errors) do call_poll_blocks(at_cycle_end_handlers) end end |
#delayed(delay, description: "delayed block", **options, &block) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Schedules block to be called once after delay seconds passed, in
the propagation context
1451 1452 1453 1454 1455 1456 1457 |
# File 'lib/roby/execution_engine.rb', line 1451 def delayed(delay, description: "delayed block", **, &block) handler = PollBlockDefinition.new(description, block, once: true, **) once do process_every << [handler, cycle_start, delay] end handler end |
#display_exceptions=(flag) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Controls whether this engine should indiscriminately display all fatal exceptions
This is on by default
2766 2767 2768 2769 2770 2771 2772 2773 2774 2775 2776 2777 2778 2779 2780 2781 2782 2783 2784 2785 2786 2787 2788 2789 2790 2791 2792 2793 |
# File 'lib/roby/execution_engine.rb', line 2766 def display_exceptions=(flag) unless flag @exception_display_handler.dispose return end return unless @exception_display_handler.disposed? @exception_display_handler = on_exception do |kind, error, tasks| level = if kind == EXCEPTION_HANDLED then :debug else :warn end send(level) do send(level, "encountered a #{kind} exception") Roby.log_exception_with_backtrace(error.exception, self, level) if kind == EXCEPTION_HANDLED send(level, "the exception was handled by") else send(level, "the exception involved") end tasks.each do |t| send(level, " #{t}") end break end end end |
#display_exceptions? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
whether this engine should indiscriminately display all fatal exceptions
2797 2798 2799 |
# File 'lib/roby/execution_engine.rb', line 2797 def display_exceptions? !@exception_display_handler.disposed? end |
#error_handling_phase(events_errors) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Compute errors in plan and handle the results
906 907 908 909 910 911 912 913 914 915 916 917 918 |
# File 'lib/roby/execution_engine.rb', line 906 def error_handling_phase(events_errors) # Do the exception handling phase errors = compute_errors(events_errors) notify_about_error_handling_results(errors) # nonfatal errors are only notified. Fatal errors (kill_tasks) are # handled in the propagation loop during garbage collection. Only # the free events errors have to be handled here. errors.free_events_errors.each do |exception, generators| generators.each { |g| g.unreachable!(exception.exception) } end errors end |
#event_loop ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The main event loop. It returns when the execution engine is asked to quit. In general, this does not need to be called direclty: use #run to start the event loop in a separate thread.
2411 2412 2413 2414 2415 2416 2417 2418 2419 2420 2421 2422 2423 2424 2425 2426 2427 2428 2429 2430 2431 2432 2433 2434 2435 2436 2437 2438 2439 2440 2441 2442 2443 2444 2445 2446 2447 2448 2449 2450 2451 2452 2453 2454 2455 |
# File 'lib/roby/execution_engine.rb', line 2411 def event_loop @cycle_start = Time.now @cycle_index = 0 exit_state = EventLoopExitState.new(0, Time.now, nil) @interrupted = false loop do GC::Profiler.enable if profile_gc? if @interrupted @interrupted = false event_loop_handle_interrupt(exit_state) end if quitting? return if forced_exit? return if event_loop_teardown(exit_state) end log_timepoint_group "cycle" do execute_one_cycle end GC::Profiler.disable if profile_gc? rescue Exception => e if quitting? fatal "Execution thread FORCEFULLY quitting "\ "because of unhandled exception" Roby.log_exception_with_backtrace(e, self, :fatal) raise else quit fatal "Execution thread quitting because of unhandled exception" Roby.log_exception_with_backtrace(e, self, :fatal) end end ensure unless plan.tasks.empty? warn "the following tasks are still present in the plan:" plan.tasks.each do |t| warn " #{t}" end end end |
#event_loop_handle_interrupt(exit_state) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Handle a received Interrupt for #event_loop
2486 2487 2488 2489 2490 2491 2492 2493 2494 2495 2496 2497 2498 2499 2500 2501 2502 2503 2504 2505 2506 |
# File 'lib/roby/execution_engine.rb', line 2486 def event_loop_handle_interrupt(exit_state) if quitting? exit_state.force_exit_deadline ||= Time.now + INTERRUPT_FORCE_EXIT_DEADLINE time_until_deadline = exit_state.force_exit_deadline - Time.now if time_until_deadline < 0 fatal "Quitting without cleaning up" force_quit else fatal "Still #{time_until_deadline.ceil}s before "\ "interruption will quit without cleaning up" end else fatal "Received interruption request" fatal "Interrupt again in #{INTERRUPT_FORCE_EXIT_DEAD_ZONE}s "\ "to quit without cleaning up" quit exit_state.force_exit_deadline = Time.now + INTERRUPT_FORCE_EXIT_DEAD_ZONE end end |
#event_loop_teardown(exit_state) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Handle the teardown logic for #event_loop
2462 2463 2464 2465 2466 2467 2468 2469 2470 2471 2472 2473 2474 2475 2476 2477 2478 2479 2480 2481 |
# File 'lib/roby/execution_engine.rb', line 2462 def event_loop_teardown(exit_state) return true unless (remaining = clear) display_warning = (exit_state.last_stop_count != remaining.size) || (Time.now - exit_state.last_quit_warning) > 10 return unless display_warning if display_warning Robot.info "Roby quitting ..." if exit_state.last_stop_count == 0 issue_quit_progression_warning(remaining) exit_state.last_quit_warning = Time.now exit_state.last_stop_count = remaining.size end false rescue Exception => e Robot.warn "Execution thread failed to clean up" Roby.log_exception_with_backtrace(e, Robot, :warn, filter: false) true end |
#event_propagation_phase(initial_events, propagation_info) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Calls its block in a #gather_propagation context and propagate events that have been called and/or emitted by the block
If a block is given, it is called with the initial set of events: the
events we should consider as already emitted in the following propagation.
seeds si a list of procs which should be called to initiate the propagation
(i.e. build an initial set of events)
891 892 893 894 895 896 897 898 899 900 901 902 903 |
# File 'lib/roby/execution_engine.rb', line 891 def event_propagation_phase(initial_events, propagation_info) @propagation_id += 1 gather_errors do next_steps = initial_events until next_steps.empty? until next_steps.empty? next_steps = event_propagation_step(next_steps, propagation_info) end next_steps = gather_propagation { call_propagation_handlers } end end end |
#event_propagation_step(current_step, propagation_info) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Propagate one step
current_step describes all pending emissions and calls.
This method calls ExecutionEngine.next_event to get the description of the next event to call. If there are signals going to this event, they are processed and the forwardings will be treated in the next step.
The method returns the next set of pending emissions and calls, adding the forwardings and signals that the propagation of the considered event have added.
1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 |
# File 'lib/roby/execution_engine.rb', line 1118 def event_propagation_step(current_step, propagation_info) signalled, _step_id, forward_info, call_info = next_event(current_step) next_step = nil if !call_info.empty? source_events, source_generators, context = prepare_propagation(signalled, false, call_info) if source_events log(:generator_propagate_events, false, source_events, signalled) if signalled.self_owned? next_step = gather_propagation(current_step) do propagation_context(source_events | source_generators) do begin propagation_info.add_generator_call(signalled) signalled.call_without_propagation(context) rescue Roby::LocalizedError => e if signalled.command_emitted? add_error(e) else signalled.emit_failed(e) end rescue Exception => e if signalled.command_emitted? add_error(Roby::CommandFailed.new(e, signalled)) else signalled.emit_failed(Roby::CommandFailed.new(e, signalled)) end end end end end end if forward_info next_step ||= {} target_info = (next_step[signalled] ||= [@propagation_step_id += 1, [], []]) target_info[PENDING_PROPAGATION_FORWARD].concat(forward_info) end elsif !forward_info.empty? source_events, source_generators, context = prepare_propagation(signalled, true, forward_info) if source_events log(:generator_propagate_events, true, source_events, signalled) # If the destination event is not owned, but if the peer is not # connected, the event is our responsibility now. if signalled.self_owned? || signalled.owners.none? { |peer| peer != plan.local_owner && peer.connected? } next_step = gather_propagation(current_step) do propagation_context(source_events | source_generators) do begin if event = signalled.emit_without_propagation(context) propagation_info.add_event_emission(event) emitted_events << event end rescue Roby::LocalizedError => e Roby.warn( "Internal Error: #emit_without_propagation "\ "emitted a LocalizedError exception. This is "\ "unsupported and will become a fatal error in "\ "the future. You should usually replace raise "\ "with engine.add_error" ) Roby.display_exception( Roby.logger.io(:warn), e, false ) add_error(e) rescue Exception => e Roby.warn( "Internal Error: #emit_without_propagation "\ "emitted an exception. This is unsupported "\ "and will become a fatal error in the future. "\ "You should create a proper localized error "\ "and replace raise with engine.add_error" ) Roby.display_exception( Roby.logger.io(:warn), e, false ) add_error(Roby::EmissionFailed.new(e, signalled)) end end end end end end current_step.merge!(next_step) if next_step current_step end |
#every(duration, description: "periodic handler", **options, &block) ⇒ #dispose
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Call block every duration seconds. Note that duration is round
up to the cycle size (time between calls is at least duration)
2232 2233 2234 2235 2236 2237 2238 2239 2240 |
# File 'lib/roby/execution_engine.rb', line 2232 def every(duration, description: "periodic handler", **, &block) handler = PollBlockDefinition.new(description, block, **) once do if handler.call(self, plan) process_every << [handler, cycle_start, duration] end end handler end |
#execute(catch: [], type: :external_events, &block) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Block until the given block is executed by the execution thread, at the beginning of the event loop, in propagation context. If the block raises, the exception is raised back in the calling thread.
2618 2619 2620 2621 2622 2623 2624 2625 2626 2627 2628 2629 2630 2631 2632 2633 2634 2635 2636 2637 2638 2639 2640 2641 2642 2643 2644 2645 2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656 2657 2658 |
# File 'lib/roby/execution_engine.rb', line 2618 def execute(catch: [], type: :external_events, &block) raise ArgumentError, "a block is required" unless block_given? if inside_control? return yield end capture_catch = lambda do |symbol, *other| caught = catch(symbol) do if other.empty? return [:ret, yield] else return capture_catch(block, *other) end end [:throw, [symbol, caught]] end ivar = Concurrent::IVar.new once(sync: ivar, type: type) do begin if !catch.empty? result = capture_catch.call(*catch, &block) ivar.set(result) else ivar.set([:ret, yield]) end rescue ::Exception => e # rubocop:disable Lint/RescueException ivar.set([:raise, e]) end end mode, value = ivar.value! case mode when :ret value when :throw throw(*value) else raise value end end |
#execute_delayed_events ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Adds the events in delayed_events whose time has passed into the
propagation. This must be called in propagation context.
See #add_event_delay and #delayed_events
564 565 566 567 568 569 570 571 572 |
# File 'lib/roby/execution_engine.rb', line 564 def execute_delayed_events reftime = Time.now delayed_events.delete_if do |time, forward, source, signalled, context| if time <= reftime add_event_propagation(forward, [source], signalled, context, nil) true end end end |
#execute_one_cycle(time = Time.now) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
2508 2509 2510 2511 2512 2513 2514 2515 2516 2517 2518 2519 2520 2521 2522 2523 2524 2525 2526 2527 2528 2529 2530 2531 2532 2533 2534 2535 2536 2537 2538 2539 2540 2541 2542 2543 2544 2545 2546 2547 2548 2549 2550 2551 2552 2553 2554 2555 2556 2557 2558 2559 2560 2561 2562 2563 2564 2565 |
# File 'lib/roby/execution_engine.rb', line 2508 def execute_one_cycle(time = Time.now) last_process_times = Process.times last_dump_time = plan.event_logger.dump_time while time > cycle_start + cycle_length @cycle_start += cycle_length @cycle_index += 1 end stats = {} stats[:start] = [cycle_start.tv_sec, cycle_start.tv_usec] stats[:actual_start] = time - cycle_start stats[:cycle_index] = cycle_index log_timepoint_group "process_events" do process_events end execute_side_work log_timepoint "side-work" if use_oob_gc? stats[:pre_oob_gc] = GC.stat GC::OOB.run end # Sleep if there is enough time for it remaining_cycle_time = cycle_length - (Time.now - cycle_start) if remaining_cycle_time > SLEEP_MIN_TIME sleep(remaining_cycle_time) end log_timepoint "sleep" # Log cycle statistics process_times = Process.times dump_time = plan.event_logger.dump_time stats[:log_queue_size] = plan.log_queue_size stats[:plan_task_count] = plan.num_tasks stats[:plan_event_count] = plan.num_free_events stats[:gc] = GC.stat stats[:utime] = process_times.utime - last_process_times.utime stats[:stime] = process_times.stime - last_process_times.stime stats[:dump_time] = dump_time - last_dump_time stats[:state] = Roby::State stats[:end] = Time.now - cycle_start if profile_gc? stats[:gc_profile_data] = GC::Profiler.raw_data stats[:gc_total_time] = GC::Profiler.total_time else stats[:gc_profile_data] = nil stats[:gc_total_time] = 0 end cycle_end(stats) log_flush_cycle :cycle_end, stats @cycle_start += cycle_length @cycle_index += 1 end |
#execute_side_work ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Execute the work registered with Roby::ExecutionEngine::PropagationHandlerMethods#add_side_work_handler
430 431 432 433 |
# File 'lib/roby/execution_engine.rb', line 430 def execute_side_work call_poll_blocks(self.side_work_handlers, false) call_poll_blocks(self.class.side_work_handlers, false) end |
#finalized_event(event) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Called by #plan when an event has been finalized
585 586 587 588 589 |
# File 'lib/roby/execution_engine.rb', line 585 def finalized_event(event) @propagation&.delete(event) event.unreachable!("finalized", plan) # since the event is already finalized, end |
#finalized_task(task) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Called by #plan when a task has been finalized
580 581 582 |
# File 'lib/roby/execution_engine.rb', line 580 def finalized_task(task) @pending_exceptions.delete(task) end |
#force_quit ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Force quitting, without cleaning up
2599 2600 2601 |
# File 'lib/roby/execution_engine.rb', line 2599 def force_quit @quit = 2 end |
#forced_exit? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
True if the control thread is currently quitting
2585 2586 2587 |
# File 'lib/roby/execution_engine.rb', line 2585 def forced_exit? @quit > 1 end |
#garbage_collect(force_on = nil) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Kills and removes unneeded tasks for which there are no dependencies
2012 2013 2014 2015 2016 2017 2018 2019 2020 2021 2022 2023 2024 2025 2026 2027 2028 2029 2030 2031 2032 2033 2034 2035 2036 2037 2038 2039 2040 2041 2042 2043 2044 2045 2046 2047 2048 2049 2050 2051 2052 2053 2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064 2065 2066 2067 2068 2069 2070 2071 2072 2073 2074 2075 2076 2077 2078 2079 2080 2081 2082 2083 |
# File 'lib/roby/execution_engine.rb', line 2012 def garbage_collect(force_on = nil) if force_on && !force_on.empty? info "GC: adding #{force_on.size} tasks in the force_gc set" valid_plan, mismatching_plan = force_on.partition do |t| t.plan == self.plan end plan.force_gc.merge(valid_plan) unless mismatching_plan.empty? mismatches_s = mismatching_plan.map { |t| "#{t}(plan=#{t.plan})" } .join(", ") raise ArgumentError, "#{mismatches_s} have been given to #{self}.garbage_collect, "\ "but they are not tasks in #{plan}" end end unmark_finished_missions_and_permanent_tasks # The set of tasks for which we will queue stop! at this cycle # #finishing? is false until the next event propagation cycle finishing = Set.new # Loop until no tasks have been removed loop do tasks = plan.unneeded_tasks | plan.force_gc local_tasks = plan.local_tasks & tasks remote_tasks = tasks - local_tasks # Remote tasks are simply removed, regardless of other concerns for t in remote_tasks debug { "GC: removing the remote task #{t}" } plan.garbage_task(t) end break if local_tasks.empty? debug do debug "#{local_tasks.size} tasks are unneeded in this plan" local_tasks.each do |t| debug " #{t} mission=#{plan.mission_task?(t)} "\ "permanent=#{plan.permanent_task?(t)}" end break end # Find the roots, that is the tasks we should be trying to # stop. They are the tasks which have no running parents. roots = local_tasks.dup - finishing plan.default_useful_task_graphs.each do |g| roots.delete_if do |t| g.each_in_neighbour(t).any? { |p| !p.finished? } end break if roots.empty? end new_finishing_tasks = roots.find_all { |local_task| garbage_collect_stop_task(local_task) } finishing.merge(new_finishing_tasks) break unless roots.any?(&:finalized?) end finishing.each(&:stop!) plan.unneeded_events.each do |event| plan.garbage_event(event) end !finishing.empty? end |
#garbage_collect_stop_task(local_task) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Handle a single root task in the #garbage_collect process
2091 2092 2093 2094 2095 2096 2097 2098 2099 2100 2101 2102 2103 2104 2105 2106 2107 2108 2109 2110 2111 2112 2113 2114 2115 2116 2117 2118 2119 2120 2121 2122 2123 2124 2125 2126 2127 2128 |
# File 'lib/roby/execution_engine.rb', line 2091 def garbage_collect_stop_task(local_task) if local_task.pending? info "GC: removing pending task #{local_task}" plan.garbage_task(local_task) elsif local_task.failed_to_start? info "GC: removing task that failed to start #{local_task}" plan.garbage_task(local_task) elsif local_task.starting? # wait for task to be started before killing it debug "GC: #{local_task} is starting" elsif local_task.finished? debug "GC: #{local_task} is not running, removed" plan.garbage_task(local_task) elsif local_task.finishing? debug do debug "GC: waiting for #{local_task} to finish" local_task.history.each do |ev| debug "GC: #{ev}" end break end elsif local_task.quarantined? warn "GC: #{local_task} is running but in quarantine" elsif local_task.event(:stop).controlable? debug { "GC: attempting to stop #{local_task}" } if !local_task.respond_to?(:stop!) warn "something fishy: #{local_task}/stop is controlable but "\ "there is no #stop! method, putting in quarantine" plan.quarantine_task(local_task) else return true end else warn "GC: #{local_task} cannot be stopped, putting in quarantine" plan.quarantine_task(local_task) end false end |
#gather_errors ⇒ Array<ExecutionException>
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Executes the given block while gathering errors, and returns the errors that have been declared with #add_error
867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 |
# File 'lib/roby/execution_engine.rb', line 867 def gather_errors if @propagation_exceptions raise InternalError, "recursive call to #gather_errors" end # The ensure clause must NOT apply to the recursive check above. # Otherwise, we end up resetting @propagation_exceptions to nil, # which wreaks havoc begin @propagation_exceptions = [] yield @propagation_exceptions ensure @propagation_exceptions = nil end end |
#gather_external_events ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Gather the events that come out of this plan manager
834 835 836 837 838 839 |
# File 'lib/roby/execution_engine.rb', line 834 def gather_external_events process_once_blocks gather_framework_errors("delayed events") { execute_delayed_events } call_poll_blocks(self.class.external_events_handlers) call_poll_blocks(self.external_events_handlers) end |
#gather_framework_errors(source, raise_caught_exceptions: true) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Yields to the block and registers any raised exception using #add_framework_error
If the method is called within an exception-gathering context (either #process_events or #gather_framework_errors itself), nothing else is done. Otherwise, #process_pending_application_exceptions is called to re-raise any caught exception
658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 |
# File 'lib/roby/execution_engine.rb', line 658 def gather_framework_errors(source, raise_caught_exceptions: true) if @application_exceptions recursive_error_gathering_context = true else @application_exceptions = [] end yield if !recursive_error_gathering_context && !raise_caught_exceptions clear_application_exceptions end rescue Exception => e add_framework_error(e, source) if !recursive_error_gathering_context && !raise_caught_exceptions clear_application_exceptions end ensure if !recursive_error_gathering_context && raise_caught_exceptions process_pending_application_exceptions end end |
#gather_propagation(initial_set = {}, &block) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Sets up a propagation context, yielding the block in it. During this propagation stage, all calls to #emit and #call are stored in an internal hash of the form:
target => [forward_sources, signal_sources]
where the two _sources are arrays of the form
[[source, context], ...]
The method returns the resulting hash. Use #in_propagation_context? to know if the current engine is in a propagation context, and #add_event_propagation to add a new entry to this set.
607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 |
# File 'lib/roby/execution_engine.rb', line 607 def gather_propagation(initial_set = {}, &block) if in_propagation_context? raise InternalError, "nested call to #gather_propagation" end old_allow_propagation, @allow_propagation = @allow_propagation, true # The ensure clause must NOT apply to the recursive check above. # Otherwise, we end up resetting @propagation_exceptions to nil, # which wreaks havoc begin @propagation = initial_set @propagation_sources = nil @propagation_step_id = 0 propagation_context([], &block) result, @propagation = @propagation, nil result ensure @propagation = nil @allow_propagation = old_allow_propagation end end |
#gathering? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
499 500 501 502 503 |
# File 'lib/roby/execution_engine.rb', line 499 def gathering? Roby.warn_deprecated "#gathering? is deprecated, use "\ "#in_propagation_context? instead" in_propagation_context? end |
#gathering_errors? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
859 860 861 |
# File 'lib/roby/execution_engine.rb', line 859 def gathering_errors? !!@propagation_exceptions end |
#has_pending_exception_matching?(e, object) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Tests whether there is an exception registered by #add_fatal_exceptions_for_inhibition for a given error and object
1969 1970 1971 |
# File 'lib/roby/execution_engine.rb', line 1969 def has_pending_exception_matching?(e, object) @pending_exceptions[object]&.include?([e.exception.class, e.origin]) end |
#has_pending_forward?(from, to, expected_context) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Whether a forward matching this signature is currently pending
785 786 787 788 789 790 791 |
# File 'lib/roby/execution_engine.rb', line 785 def has_pending_forward?(from, to, expected_context) if pending = @propagation[to] pending[PENDING_PROPAGATION_FORWARD].each_slice(3).any? do |event, context, timespec| (from === event.generator) && (expected_context === context) end end end |
#has_pending_signal?(from, to, expected_context) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Whether a signal matching this signature is currently pending
794 795 796 797 798 799 800 |
# File 'lib/roby/execution_engine.rb', line 794 def has_pending_signal?(from, to, expected_context) if pending = @propagation[to] pending[PENDING_PROPAGATION_SIGNAL].each_slice(3).any? do |event, context, timespec| (from === event.generator) && (expected_context === context) end end end |
#has_propagation_for?(target) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
739 740 741 |
# File 'lib/roby/execution_engine.rb', line 739 def has_propagation_for?(target) @propagation&.has_key?(target) end |
#has_queued_events? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns true if some events are queued
592 593 594 |
# File 'lib/roby/execution_engine.rb', line 592 def has_queued_events? !@propagation.empty? end |
#has_waiting_work? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Whether this EE has asynchronous waiting work waiting to be processed
1528 1529 1530 1531 1532 |
# File 'lib/roby/execution_engine.rb', line 1528 def has_waiting_work? # Filter out unscheduled promises (promises on which #execute was # not called). If they are unscheduled, we're not waiting on them waiting_work.any? { |w| !w.unscheduled? } end |
#in_propagation_context? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
True if we are within a propagation context (i.e. within event processing)
507 508 509 |
# File 'lib/roby/execution_engine.rb', line 507 def in_propagation_context? !!@propagation end |
#inhibited_exception?(exception) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Query whether the given exception is inhibited in this plan
1431 1432 1433 1434 |
# File 'lib/roby/execution_engine.rb', line 1431 def inhibited_exception?(exception) unhandled, = remove_inhibited_exceptions([exception.to_execution_exception]) unhandled.empty? end |
#inside_control? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
True if the current thread is the execution thread of this engine
See #outside_control? for a discussion of the use of #inside_control? and #outside_control? when testing the threading context
2268 2269 2270 2271 |
# File 'lib/roby/execution_engine.rb', line 2268 def inside_control? t = thread !t || t == Thread.current end |
#interrupt ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
2589 2590 2591 |
# File 'lib/roby/execution_engine.rb', line 2589 def interrupt @interrupted = true end |
#issue_quit_progression_warning(remaining) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
2385 2386 2387 2388 2389 2390 2391 2392 2393 2394 2395 2396 2397 2398 |
# File 'lib/roby/execution_engine.rb', line 2385 def issue_quit_progression_warning(remaining) Robot.info( "Waiting for #{remaining.size} tasks to finish "\ "(#{plan.num_tasks} tasks still in plan) and "\ "#{waiting_work.size} async work jobs" ) remaining.each do |task| Robot.info " #{task}" end quarantined = remaining.find_all(&:quarantined?) unless quarantined.empty? Robot.info "#{quarantined.size} tasks in quarantine" end end |
#join_all_waiting_work(timeout: nil) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Waits for all obligations in #waiting_work to finish
436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 |
# File 'lib/roby/execution_engine.rb', line 436 def join_all_waiting_work(timeout: nil) return [], PropagationInfo.new if waiting_work.empty? deadline = if timeout Time.now + timeout end finished = [] propagation_info = PropagationInfo.new loop do framework_errors = gather_framework_errors( "#join_all_waiting_work", raise_caught_exceptions: false ) do next_steps = nil event_errors = gather_errors do next_steps = gather_propagation do finished.concat(process_waiting_work) blocks = [] until once_blocks.empty? blocks << once_blocks.pop.last end call_poll_blocks(blocks) end end this_propagation = propagate_events_and_errors( next_steps, event_errors, garbage_collect_pass: false ) propagation_info.merge(this_propagation) end propagation_info.add_framework_errors(framework_errors) Thread.pass has_scheduled_promises = has_waiting_work? if deadline && (Time.now > deadline) && has_scheduled_promises raise JoinAllWaitingWorkTimeout.new(waiting_work) end break unless has_waiting_work? end [finished, propagation_info] end |
#killall ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Kill all tasks that are currently running in the plan
2700 2701 2702 2703 2704 2705 2706 2707 2708 2709 2710 2711 2712 2713 2714 2715 2716 2717 2718 2719 2720 2721 2722 |
# File 'lib/roby/execution_engine.rb', line 2700 def killall scheduler_enabled = scheduler.enabled? plan.permanent_tasks.clear plan.permanent_events.clear plan.mission_tasks.clear scheduler.enabled = false quit start_new_cycle process_events cycle_end({}) plan.transactions.each(&:discard_transaction!) start_new_cycle Thread.pass process_events cycle_end({}) ensure scheduler.enabled = scheduler_enabled end |
#log(event_name, *values) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Log an event in the system
See
2183 2184 2185 2186 2187 2188 2189 |
# File 'lib/roby/execution_engine.rb', line 2183 def log(event_name, *values) super @on_log_handlers.delete_if do |handler| !handler.call(self, event_name, values) end end |
#next_event(pending) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
call-seq:
next_event(pending) => event, propagation_info
Determines the event in current_step which should be signalled now.
Removes it from the set and returns the event and the associated
propagation information.
See #gather_propagation for the format of the returned # propagation_info
1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 |
# File 'lib/roby/execution_engine.rb', line 1019 def next_event(pending) # this variable is 2 if selected_event is being forwarded, 1 if it # is both forwarded and signalled and 0 if it is only signalled priority, step_id, selected_event = nil for propagation_step in pending target_event = propagation_step[0] target_step_id, forwards, signals = *propagation_step[1] target_priority = if forwards.empty? && signals.empty? then 2 elsif forwards.empty? then 0 else 1 end do_select = if selected_event if precedence_graph.reachable?(selected_event, target_event) false elsif precedence_graph.reachable?(target_event, selected_event) true elsif priority < target_priority true elsif priority == target_priority # If they are of the same priority, handle # earlier events first step_id > target_step_id else false end else true end if do_select selected_event = target_event priority = target_priority step_id = target_step_id end end [selected_event, *pending.delete(selected_event)] end |
#notify_about_error_handling_results(errors) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Issue the warning message and log notifications related to tasks being killed because of unhandled fatal exceptions
932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 |
# File 'lib/roby/execution_engine.rb', line 932 def notify_about_error_handling_results(errors) kill_tasks, fatal_errors, nonfatal_errors, free_events_errors, handled_errors = errors.kill_tasks, errors.fatal_errors, errors.nonfatal_errors, errors.free_events_errors, errors.handled_errors unless nonfatal_errors.empty? if display_exceptions? warn "#{nonfatal_errors.size} unhandled non-fatal exceptions" end nonfatal_errors.each do |exception, tasks| notify_exception(EXCEPTION_NONFATAL, exception, tasks) end end unless handled_errors.empty? if display_exceptions? warn "#{handled_errors.size} handled errors" end handled_errors.each do |exception, tasks| notify_exception(EXCEPTION_HANDLED, exception, tasks) end end unless free_events_errors.empty? if display_exceptions? warn "#{free_events_errors.size} free event exceptions" end free_events_errors.each do |exception, events| notify_exception(EXCEPTION_FREE_EVENT, exception, events) end end unless fatal_errors.empty? if display_exceptions? warn "#{fatal_errors.size} unhandled fatal exceptions, involving #{kill_tasks.size} tasks that will be forcefully killed" end fatal_errors.each do |exception, tasks| notify_exception(EXCEPTION_FATAL, exception, tasks) end if display_exceptions? kill_tasks.each do |task| log_pp :warn, task end end end end |
#notify_exception(kind, error, involved_objects) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Call to notify the listeners registered with #on_exception of the occurence of an exception
2803 2804 2805 2806 2807 2808 |
# File 'lib/roby/execution_engine.rb', line 2803 def notify_exception(kind, error, involved_objects) log(:exception_notification, plan.droby_id, kind, error, involved_objects) exception_listeners.each do |listener| listener.call(self, kind, error, involved_objects) end end |
#on_exception(description: "exception listener", on_error: :disable) {|kind, error, tasks| ... } ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Registers a callback that will be called when exceptions are propagated in the plan
2748 2749 2750 2751 2752 |
# File 'lib/roby/execution_engine.rb', line 2748 def on_exception(description: "exception listener", on_error: :disable, &block) handler = PollBlockDefinition.new(description, block, on_error: on_error) exception_listeners << handler Roby.disposable { exception_listeners.delete(handler) } end |
#on_log(on_error: :disable, &block) ⇒ #dispose
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Set of blocks called with log events
2196 2197 2198 2199 2200 |
# File 'lib/roby/execution_engine.rb', line 2196 def on_log(on_error: :disable, &block) handler = PollBlockDefinition.new("", block, on_error: on_error) @on_log_handlers << handler handler end |
#once(sync: nil, description: "once block", type: :external_events, **options, &block) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Schedules block to be called at the beginning of the next execution
cycle, in propagation context.
1444 1445 1446 1447 |
# File 'lib/roby/execution_engine.rb', line 1444 def once(sync: nil, description: "once block", type: :external_events, **, &block) waiting_work << sync if sync once_blocks << create_propagation_handler(description: description, type: type, once: true, **, &block) end |
#outside_control? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
True if the current thread is not the execution thread of this engine, or if there is not control thread. When you check the current thread context, always use a negated form. Do not do
if Roby.inside_control?
ERROR
end
Do instead
if !Roby.outside_control?
ERROR
end
Since the first form will fail if there is no control thread, while the second form will work. Use the first form only if you require that there actually IS a control thread.
2290 2291 2292 2293 |
# File 'lib/roby/execution_engine.rb', line 2290 def outside_control? t = thread !t || t != Thread.current end |
#prepare_propagation(target, is_forward, info) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
call-seq:
prepare_propagation(target, is_forward, info) => source_events, source_generators, context
prepare_propagation(target, is_forward, info) => nil
Parses the propagation information info in the context of a
signalling if is_forward is true and a forwarding otherwise.
target is the target event.
The method adds the appropriate delayed events using #add_event_delay, and returns either nil if no propagation is to be performed, or the propagation source events, generators and context.
The format of info is the same as the hash values described in
#gather_propagation.
1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 |
# File 'lib/roby/execution_engine.rb', line 1073 def prepare_propagation(target, is_forward, info) timeref = Time.now source_events, source_generators, context = Set.new, Set.new, [] delayed = true info.each_slice(3) do |src, ctxt, time| if time && (delay = ExecutionEngine.make_delay(timeref, src, target, time)) add_event_delay(delay, is_forward, src, target, ctxt) next end delayed = false # Merge identical signals. Needed because two different event handlers # can both call #emit, and two signals are set up if src if src.respond_to?(:generator) source_events << src source_generators << src.generator else source_generators << src end end if ctxt context.concat ctxt end end unless delayed [source_events, source_generators, context] end end |
#process_events(raise_framework_errors: Roby.app.abort_on_application_exception?, garbage_collect_pass: true, &caller_block) ⇒ PropagationInfo
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The inside part of the event loop
It gathers initial events and errors and propagate them
1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 |
# File 'lib/roby/execution_engine.rb', line 1786 def process_events( raise_framework_errors: Roby.app.abort_on_application_exception?, garbage_collect_pass: true, &caller_block ) if @application_exceptions raise RecursivePropagationContext, "recursive call to process_events" end # to avoid having a almost-method-global ensure block passed_recursive_check = true @application_exceptions = [] @emitted_events = [] # Gather new events and propagate them events_errors = nil next_steps = gather_propagation do events_errors = gather_errors do if caller_block yield caller_block = nil end if !quitting? || !garbage_collect([]) process_waiting_work log_timepoint "workers" gather_external_events log_timepoint "external_events" call_propagation_handlers log_timepoint "propagation_handlers" end end end propagation_info = propagate_events_and_errors( next_steps, events_errors, garbage_collect_pass: garbage_collect_pass ) if Roby.app.abort_on_exception? && !all_errors.fatal_errors.empty? raise Aborting.new(propagation_info.each_fatal_error.map(&:exception)) end propagation_info.framework_errors.concat(@application_exceptions) propagation_info ensure if passed_recursive_check process_pending_application_exceptions(raise_framework_errors: raise_framework_errors) end end |
#process_events_synchronous(seeds = {}, initial_errors = [], enable_scheduler: false, raise_errors: true, &block) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Tests are using a special mode for propagation, in which everything is resolved when #emit or #call is called, including error handling. This mode is implemented using this method
When errors occur in this mode, the exceptions are raised directly. This is useful in tests as, this way, we are sure that the exception will not get overlooked
If multiple errors are raised in a single call (this is possible due to Roby's error handling mechanisms), the method will raise SynchronousEventProcessingMultipleErrors to wrap all the exceptions into one.
1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 1881 1882 1883 1884 1885 1886 1887 1888 1889 1890 1891 1892 1893 1894 1895 1896 1897 1898 1899 1900 1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916 |
# File 'lib/roby/execution_engine.rb', line 1846 def process_events_synchronous(seeds = {}, initial_errors = [], enable_scheduler: false, raise_errors: true, &block) Roby.warn_deprecated "#process_events_synchronous is deprecated, use the expect_execution harness instead" if @application_exceptions raise RecursivePropagationContext, "recursive call to process_events" end passed_recursive_check = true # to avoid having a almost-method-global ensure block @application_exceptions = [] # Save early for the benefit of the 'ensure' block current_scheduler_enabled = scheduler.enabled? if block_given? if seeds.empty? && initial_errors.empty? seeds = gather_propagation do initial_errors = gather_errors(&block) end else raise ArgumentError, "cannot provide both seeds/inital errors and a block" end end scheduler.enabled = enable_scheduler propagation_info = propagate_events_and_errors( seeds, initial_errors, garbage_collect_pass: false ) unless propagation_info.kill_tasks.empty? gc_initial_errors = nil gc_seeds = gather_propagation do gc_initial_errors = gather_errors do garbage_collect(propagation_info.kill_tasks) end end gc_errors = propagate_events_and_errors( gc_seeds, gc_initial_errors, garbage_collect_pass: false ) propagation_info.merge(gc_errors) end if raise_errors propagation_info = propagation_info.exceptions if propagation_info.size == 1 raise propagation_info.first elsif !propagation_info.empty? raise SynchronousEventProcessingMultipleErrors.new(propagation_info.map(&:exception)) end else propagation_info end rescue SynchronousEventProcessingMultipleErrors => e raise SynchronousEventProcessingMultipleErrors.new(e.errors + clear_application_exceptions) rescue Exception => e if passed_recursive_check application_exceptions = clear_application_exceptions if !application_exceptions.empty? raise SynchronousEventProcessingMultipleErrors.new(application_exceptions.map(&:first) + [e]) else raise e end else raise e end ensure if passed_recursive_check && @application_exceptions process_pending_application_exceptions end scheduler.enabled = current_scheduler_enabled end |
#process_once_blocks ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Dispatch #once_blocks to the other handler sets for further processing
822 823 824 825 826 827 828 829 830 831 |
# File 'lib/roby/execution_engine.rb', line 822 def process_once_blocks until once_blocks.empty? type, block = once_blocks.pop if type == :external_events external_events_handlers << block else propagation_handlers << block end end end |
#process_pending_application_exceptions(application_errors = clear_application_exceptions, raise_framework_errors: Roby.app.abort_on_application_exception?) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 |
# File 'lib/roby/execution_engine.rb', line 681 def process_pending_application_exceptions( application_errors = clear_application_exceptions, raise_framework_errors: Roby.app.abort_on_application_exception? ) # We don't aggregate exceptions, so report them all and raise one if display_exceptions? application_errors.each do |error, source| unless error.kind_of?(Interrupt) fatal "Application error in #{source}" Roby.log_exception_with_backtrace(error, self, :fatal) end end end error, source = application_errors.find do |error, _| raise_framework_errors || error.kind_of?(SignalException) end if error if Roby.app.display_all_threads_state_on_abort? fatal "State of all running threads:" Roby.log_all_threads_backtraces(self, :fatal) end raise error, "in #{source}: #{error.}", error.backtrace end end |
#process_waiting_work ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Process asynchronous work registered in #waiting_work to clear completed work and/or handle errors that were not handled by the async object itself (e.g. a Promise without a Promise#on_error handler)
1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 |
# File 'lib/roby/execution_engine.rb', line 1537 def process_waiting_work finished, not_finished = waiting_work.partition(&:complete?) unhandled_errors = finished.find_all do |work| work.rejected? && (work.respond_to?(:handled_error?) && !work.handled_error?) end unhandled_errors.each do |work| e = work.reason e.set_backtrace(e.backtrace) add_framework_error(e, work.to_s) if work.respond_to?(:error_handling_failure) && (e = work.error_handling_failure) e.set_backtrace(e.backtrace) add_framework_error(e, work.to_s) end end @waiting_work = not_finished finished end |
#promise(description: nil, executor: thread_pool, &block) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Create a promise to execute the given block in a separate thread
Note that the returned value is a Promise. This means that callbacks added with #on_success or #rescue will be executed in the execution engine thread by default.
2815 2816 2817 |
# File 'lib/roby/execution_engine.rb', line 2815 def promise(description: nil, executor: thread_pool, &block) Promise.new(self, executor: executor, description: description, &block) end |
#propagate_events_and_errors(next_steps, initial_errors, garbage_collect_pass: true) ⇒ PropagationInfo
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Propagate an initial set of event propagations and errors
1929 1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 |
# File 'lib/roby/execution_engine.rb', line 1929 def propagate_events_and_errors(next_steps, initial_errors, garbage_collect_pass: true) propagation_info = PropagationInfo.new events_errors = initial_errors.dup loop do log_timepoint_group "event_propagation_phase" do events_errors.concat(event_propagation_phase(next_steps, propagation_info)) end next_steps = gather_propagation do exception_propagation_errors, error_phase_results = nil log_timepoint_group "error_handling_phase" do exception_propagation_errors = gather_errors do error_phase_results = error_handling_phase(events_errors) end end add_exceptions_for_inhibition(error_phase_results.each_fatal_error) propagation_info.merge(error_phase_results) garbage_collection_errors = gather_errors do plan.generate_induced_errors(error_phase_results) if garbage_collect_pass garbage_collect(error_phase_results.kill_tasks) else [] end end events_errors = (exception_propagation_errors + garbage_collection_errors) log_timepoint "garbage_collect" end break if next_steps.empty? && events_errors.empty? end propagation_info end |
#propagate_exception_in_plan(exceptions) {|exception, handling_object| ... } ⇒ Array<(ExecutionException,Array<Task>)>
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The core exception propagation algorithm
1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 |
# File 'lib/roby/execution_engine.rb', line 1261 def propagate_exception_in_plan(exceptions, &handler) # We usually aim at not having special cases for empty sets ... but this # is a hot path and we do a reverse of the dependency graph return [[], []] if exceptions.empty? propagation_graph = dependency_graph.reverse # Propagate the exceptions in the hierarchy handled_unhandled = [] exceptions.each do |exception, parents| origin = exception.origin if parents filtered_parents = parents.find_all { |t| t.depends_on?(origin) } if filtered_parents != parents warn "some parents specified for #{exception.exception}"\ "(#{exception.exception.class}) are actually not parents "\ "of #{origin}, they got filtered out" (parents - filtered_parents).each do |task| warn " #{task}" end if filtered_parents.empty? parents = propagation_graph.out_neighbours(origin) else parents = filtered_parents end end else parents = propagation_graph.out_neighbours(origin) end debug do debug "propagating exception " log_pp :debug, exception unless parents.empty? debug " constrained to parents" log_nest(2) do parents.each do |p| log_pp :debug, p end end end break end visitor = ExceptionPropagationVisitor.new( propagation_graph, exception, origin, parents, handler ) visitor.visit unhandled = visitor.unhandled_exceptions.inject { |a, b| a.merge(b) } handled = visitor.handled_exceptions.inject { |a, b| a.merge(b) } handled_unhandled << [handled, unhandled] end exceptions_handled_by = [] unhandled_exceptions = [] handled_unhandled.each do |handled, e| if e if e.handled = yield(e, plan) if handled handled_by = (handled.propagation_leafs.to_set << plan) exceptions_handled_by << [handled.merge(e), handled_by] else handled = e exceptions_handled_by << [e, [plan].to_set] end else affected_tasks = e.trace.vertices.to_set if handled affected_tasks -= handled.trace.vertices exceptions_handled_by << [handled, handled.propagation_leafs.to_set] end unhandled_exceptions << [e, affected_tasks] end else exceptions_handled_by << [handled, handled.propagation_leafs.to_set] end end debug do debug "#{unhandled_exceptions.size} unhandled exceptions remain" log_nest(2) do unhandled_exceptions.each do |e, affected_tasks| log_pp :debug, e debug "Affects #{affected_tasks.size} tasks" log_nest(2) do affected_tasks.each do |t| log_pp :debug, t end end end end break end [unhandled_exceptions, exceptions_handled_by] end |
#propagate_exceptions(exceptions) ⇒ Array<(ExecutionException,Array<Task>)>
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Propagation exception phase, checking if tasks and/or the main plan are handling the exceptions
1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 |
# File 'lib/roby/execution_engine.rb', line 1366 def propagate_exceptions(exceptions) if exceptions.empty? return [], [], [] end # Remove all exception that are not associated with a task exceptions, free_events_exceptions = exceptions.partition do |e, _| e.origin end # Normalize the free events exceptions free_events_exceptions = free_events_exceptions.map do |e, _| if e.exception.failed_generator.plan [e, Set[e.exception.failed_generator]] end end.compact debug "Filtering inhibited exceptions" exceptions = log_nest(2) do non_inhibited, = remove_inhibited_exceptions(exceptions) # Reset the trace for the real propagation non_inhibited.map do |e, _| _, propagate_through = exceptions.find { |original_e, _| original_e.exception == e.exception } e.reset_trace [e, propagate_through] end end debug "Propagating #{exceptions.size} non-inhibited exceptions" log_nest(2) do # Note that the first half of the method filtered the free # events exceptions out of 'exceptions' unhandled, handled = propagate_exception_in_plan(exceptions) do |e, object| object.handle_exception(e) end return unhandled, free_events_exceptions, handled end end |
#propagation_context(sources) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Sets the source_event and source_generator variables according
to source. source is the from argument of #add_event_propagation
729 730 731 732 733 734 735 736 737 |
# File 'lib/roby/execution_engine.rb', line 729 def propagation_context(sources) current_sources = @propagation_sources raise InternalError, "not in a gathering context in #propagation_context" unless in_propagation_context? @propagation_sources = sources yield ensure @propagation_sources = current_sources end |
#propagation_source_events ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The set of events extracted from #sources
518 519 520 521 522 523 524 525 526 |
# File 'lib/roby/execution_engine.rb', line 518 def propagation_source_events result = Set.new for ev in @propagation_sources if ev.respond_to?(:generator) result << ev end end result end |
#propagation_source_generators ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The set of generators extracted from #sources
529 530 531 532 533 534 535 536 537 538 539 |
# File 'lib/roby/execution_engine.rb', line 529 def propagation_source_generators result = Set.new for ev in @propagation_sources result << if ev.respond_to?(:generator) ev.generator else ev end end result end |
#queue_forward(sources, target, context, timespec) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Queue a forwarding to be propagated
749 750 751 |
# File 'lib/roby/execution_engine.rb', line 749 def queue_forward(sources, target, context, timespec) add_event_propagation(true, sources, target, context, timespec) end |
#queue_signal(sources, target, context, timespec) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Queue a signal to be propagated
744 745 746 |
# File 'lib/roby/execution_engine.rb', line 744 def queue_signal(sources, target, context, timespec) add_event_propagation(false, sources, target, context, timespec) end |
#quit ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Make control quit properly
2594 2595 2596 |
# File 'lib/roby/execution_engine.rb', line 2594 def quit @quit = 1 end |
#quitting? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
True if the control thread is currently quitting
2580 2581 2582 |
# File 'lib/roby/execution_engine.rb', line 2580 def quitting? @quit > 0 end |
#refresh_relations ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Refresh the value of cached relations
Some often-used relations are cached at #initialize, such as #dependency_graph and #precedence_graph. Call this when the actual graph objects have changed on the plan
135 136 137 138 139 140 |
# File 'lib/roby/execution_engine.rb', line 135 def refresh_relations @dependency_graph = plan.task_relation_graph_for(TaskStructure::Dependency) @precedence_graph = plan.event_relation_graph_for(EventStructure::Precedence) @signal_graph = plan.event_relation_graph_for(EventStructure::Signal) @forward_graph = plan.event_relation_graph_for(EventStructure::Forwarding) end |
#remove_at_cycle_end(handler_id) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Removes a handler added by #at_cycle_end
2220 2221 2222 |
# File 'lib/roby/execution_engine.rb', line 2220 def remove_at_cycle_end(handler_id) handler_id.dispose end |
#remove_exception_listener(handler) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Removes an exception listener registered with #on_exception
2758 2759 2760 |
# File 'lib/roby/execution_engine.rb', line 2758 def remove_exception_listener(handler) handler.dispose if handler.respond_to?(:dispose) end |
#remove_inhibited_exceptions(exceptions) ⇒ Array<ExecutionException>
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Process the given exceptions to remove the ones that are currently filtered by the plan repairs
The returned exceptions are propagated, i.e. their #trace method contains all the tasks that are affected by the absence of a handling mechanism
1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 |
# File 'lib/roby/execution_engine.rb', line 1416 def remove_inhibited_exceptions(exceptions) exceptions = exceptions.find_all do |execution_exception, _| execution_exception.origin.plan end propagate_exception_in_plan(exceptions) do |e, object| if has_pending_exception_matching?(e, object) true elsif object.respond_to?(:handles_error?) object.handles_error?(e) end end end |
#remove_periodic_handler(id) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Removes a periodic handler defined by #every. id is the value
returned by #every.
2244 2245 2246 2247 |
# File 'lib/roby/execution_engine.rb', line 2244 def remove_periodic_handler(id) id.dispose nil end |
#reset ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Make a quit EE ready for reuse
2604 2605 2606 |
# File 'lib/roby/execution_engine.rb', line 2604 def reset @quit = 0 end |
#reset_thread_pool ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
2694 2695 2696 2697 |
# File 'lib/roby/execution_engine.rb', line 2694 def reset_thread_pool @thread_pool&.shutdown @thread_pool = Concurrent::CachedThreadPool.new(idletime: 10) end |
#run(cycle: 0.1) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Main event loop. Valid options are
- cycle
the cycle duration in seconds (default: 0.1)
2299 2300 2301 2302 2303 2304 2305 2306 2307 2308 2309 2310 2311 2312 2313 2314 2315 2316 2317 2318 2319 2320 2321 2322 2323 2324 2325 2326 2327 2328 2329 2330 2331 2332 2333 2334 2335 2336 2337 2338 2339 2340 2341 2342 2343 2344 2345 2346 |
# File 'lib/roby/execution_engine.rb', line 2299 def run(cycle: 0.1) if running? raise AlreadyRunning, "#run has already been called" end self.running = true @allow_propagation = false @waiting_work = Concurrent::Array.new @thread = Thread.current @thread.name = "MAIN" @cycle_length = cycle trap("INT") do interrupt end event_loop ensure self.running = false @thread = nil waiting_work.delete_if do |w| next(true) if w.complete? # rubocop:disable Lint/HandleExceptions begin w.fail ExecutionQuitError Roby.warn "forcefully terminated #{w} on quit" rescue Concurrent::MultipleAssignmentError # Race condition: something completed the promise while # we were trying to make it fail end # rubocop:enable Lint/HandleExceptions true end finalizers.each do |blk| begin blk.call rescue Exception => e Roby.warn "finalizer #{blk} failed" Roby.log_exception_with_backtrace(e, Roby, :warn) end end @quit = 0 @allow_propagation = true trap("INT", "DEFAULT") end |
#shutdown ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
2689 2690 2691 2692 |
# File 'lib/roby/execution_engine.rb', line 2689 def shutdown killall thread_pool.shutdown end |
#start_new_cycle(time = Time.now) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Set the cycle_start attribute and increment cycle_index
This is only used for testing purposes
2570 2571 2572 2573 |
# File 'lib/roby/execution_engine.rb', line 2570 def start_new_cycle(time = Time.now) @cycle_start = time @cycle_index += 1 end |
#unmark_finished_missions_and_permanent_tasks ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
1984 1985 1986 1987 1988 1989 1990 1991 1992 1993 1994 1995 1996 1997 1998 1999 2000 2001 |
# File 'lib/roby/execution_engine.rb', line 1984 def unmark_finished_missions_and_permanent_tasks to_unmark = plan.task_index.by_predicate[:finished?] | plan.task_index.by_predicate[:failed?] finished_missions = (plan.mission_tasks & to_unmark) # Remove all missions that are finished for finished_mission in finished_missions unless finished_mission.being_repaired? plan.unmark_mission_task(finished_mission) end end finished_permanent = (plan.permanent_tasks & to_unmark) for finished_permanent in (plan.permanent_tasks & to_unmark) unless finished_permanent.being_repaired? plan.unmark_permanent_task(finished_permanent) end end end |
#unreachable_event(event) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Called by EventGenerator when an event became unreachable
575 576 577 |
# File 'lib/roby/execution_engine.rb', line 575 def unreachable_event(event) delayed_events.delete_if { |_, _, _, signalled, _| signalled == event } end |
#wait_one_cycle ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Blocks until at least once execution cycle has been done
2135 2136 2137 2138 2139 2140 2141 2142 |
# File 'lib/roby/execution_engine.rb', line 2135 def wait_one_cycle current_cycle = execute { cycle_index } while current_cycle == execute { cycle_index } raise ExecutionQuitError unless running? sleep(cycle_length) end end |
#wait_until(ev) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Stops the current thread until the given even is emitted. If the event becomes unreachable, an UnreachableEvent exception is raised.
2662 2663 2664 2665 2666 2667 2668 2669 2670 2671 2672 2673 2674 2675 2676 2677 2678 2679 2680 2681 2682 2683 2684 2685 2686 2687 |
# File 'lib/roby/execution_engine.rb', line 2662 def wait_until(ev) if inside_control? raise ThreadMismatch, "cannot use #wait_until in execution threads" end ivar = Concurrent::IVar.new result = nil once(sync: ivar) do if ev.unreachable? ivar.fail(UnreachableEvent.new(ev, ev.unreachability_reason)) else ev.if_unreachable(cancel_at_emission: true) do |reason, event| ivar.fail(UnreachableEvent.new(event, reason)) unless ivar.complete? end ev.once do |ev| ivar.set(result) unless ivar.complete? end begin result = yield if block_given? rescue Exception => e ivar.fail(e) end end end ivar.value! end |