Class: Roby::ExecutionEngine Private

Inherits:
Object
  • Object
show all
Extended by:
Logger::Hierarchy, PropagationHandlerMethods
Includes:
Logger::Hierarchy, Roby::EventLogging::Mixin, PropagationHandlerMethods
Defined in:
lib/roby/execution_engine.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

The core execution algorithm

It is in charge of handling event and exception propagation, as well as running cleanup processes (e.g. garbage collection).

The main method is #process_events. When executing a Roby application, it is called periodically by #event_loop.

Defined Under Namespace

Modules: PropagationHandlerMethods Classes: AlreadyRunning, EventLoopExitState, ExceptionPropagationVisitor, JoinAllWaitingWorkTimeout, NotPropagationContext, PollBlockDefinition, PropagationInfo, RecursivePropagationContext

Constant Summary collapse

PENDING_PROPAGATION_FORWARD =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

1
PENDING_PROPAGATION_SIGNAL =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

2
SLEEP_MIN_TIME =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Do not sleep or call Thread#pass if there is less that this much time left in the cycle

0.01
INTERRUPT_FORCE_EXIT_DEAD_ZONE =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

How many seconds between two Interrupt before the execution engine's loop can forcefully quit

10
EXCEPTION_NONFATAL =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Exception kind passed to #on_exception handlers for non-fatal, unhandled exceptions

:nonfatal
EXCEPTION_FATAL =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Exception kind passed to #on_exception handlers for fatal, unhandled exceptions

:fatal
EXCEPTION_HANDLED =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Exception kind passed to #on_exception handlers for handled exceptions

:handled
EXCEPTION_FREE_EVENT =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Exception kind passed to #on_exception handlers for free event exceptions

:free_event

Instance Attribute Summary collapse

Attributes included from PropagationHandlerMethods

#external_events_handlers, #propagation_handlers, #side_work_handlers

Class Method Summary collapse

Instance Method Summary collapse

Methods included from PropagationHandlerMethods

add_propagation_handler, add_side_work_handler, at_cycle_begin, create_propagation_handler, each_cycle, remove_propagation_handler, remove_side_work_handler

Methods included from Roby::EventLogging::Mixin

#log_flush_cycle, #log_queue_size, #log_timepoint, #log_timepoint_group, #log_timepoint_group_end, #log_timepoint_group_start

Constructor Details

#initialize(plan, control: Roby::DecisionControl.new) ⇒ ExecutionEngine

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Create an execution engine acting on plan, using control as the decision control object

Parameters:

  • plan (ExecutablePlan)

    the plan on which this engine acts

  • control (DecisionControl) (defaults to: Roby::DecisionControl.new)

    the policy object, i.e. the object that embeds policies in cases where multiple reactions would be possible



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/roby/execution_engine.rb', line 75

def initialize(plan, control: Roby::DecisionControl.new)
    @plan = plan
    @event_logger = EventLogging::AggregateEventLogger.new
    @on_log_handlers = []

    @use_oob_gc = ExecutionEngine.use_oob_gc?

    @control = control
    @scheduler = Schedulers::Null.new(plan)
    reset_thread_pool
    @thread = Thread.current

    @propagation = nil
    @propagation_id = 0
    @propagation_exceptions = nil
    @application_exceptions = nil
    @delayed_events = []
    @event_ordering = []
    @event_priorities = {}
    @propagation_handlers = []
    @external_events_handlers = []
    @side_work_handlers = []
    @at_cycle_end_handlers = []
    @process_every = []
    @waiting_work = Concurrent::Array.new
    @emitted_events = []
    @exception_listeners = []

    @worker_threads_mtx = Mutex.new
    @worker_threads = []
    @once_blocks = Queue.new

    @pending_exceptions = {}

    each_cycle(&ExecutionEngine.method(:call_every))

    @quit = 0
    @allow_propagation = true
    @cycle_index = 0
    @cycle_start = Time.now
    @cycle_length = 0.1
    @last_stop_count = 0
    @finalizers = []
    @gc_warning = true

    refresh_relations

    @exception_display_handler = Roby.null_disposable
    self.display_exceptions = true
end

Instance Attribute Details

#additional_errorsObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Used during exception propagation to inject new errors in the process

It shall not be accessed directly. Instead, Plan#add_error should be called



1477
1478
1479
# File 'lib/roby/execution_engine.rb', line 1477

def additional_errors
  @additional_errors
end

#application_exceptionsObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The set of errors which have been generated outside of the plan's control. For now, those errors cause the whole controller to shut down.



1462
1463
1464
# File 'lib/roby/execution_engine.rb', line 1462

def application_exceptions
  @application_exceptions
end

#at_cycle_end_handlersObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

A set of blocks that are called at each cycle end



2203
2204
2205
# File 'lib/roby/execution_engine.rb', line 2203

def at_cycle_end_handlers
  @at_cycle_end_handlers
end

#controlObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The DecisionControl object associated with this engine



181
182
183
# File 'lib/roby/execution_engine.rb', line 181

def control
  @control
end

#cycle_indexObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The number of this cycle since the beginning



2262
2263
2264
# File 'lib/roby/execution_engine.rb', line 2262

def cycle_index
  @cycle_index
end

#cycle_lengthObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The cycle length in seconds



2256
2257
2258
# File 'lib/roby/execution_engine.rb', line 2256

def cycle_length
  @cycle_length
end

#cycle_startObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The starting Time of this cycle



2259
2260
2261
# File 'lib/roby/execution_engine.rb', line 2259

def cycle_start
  @cycle_start
end

#delayed_eventsObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The set of pending delayed events. This is an array of the form

[[time, is_forward, source, target, context], ...]

See #add_event_delay for more information



546
547
548
# File 'lib/roby/execution_engine.rb', line 546

def delayed_events
  @delayed_events
end

#dependency_graphObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Cached graph object for TaskStructure::Dependency

This is here for performance reasons, to avoid resolving the same graph over and over



170
171
172
# File 'lib/roby/execution_engine.rb', line 170

def dependency_graph
  @dependency_graph
end

#emitted_eventsArray<Event> (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The set of events that have been emitted within the last call to #process_events (i.e. the last execution of the event loop)

Returns:



188
189
190
# File 'lib/roby/execution_engine.rb', line 188

def emitted_events
  @emitted_events
end

#event_loggerObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The underlying DRoby::EventLogger

It is usually the same than the #plan's. Pass a DRoby::NullEventLogger at construction time to disable logging of execution events.



179
180
181
# File 'lib/roby/execution_engine.rb', line 179

def event_logger
  @event_logger
end

#event_orderingObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The topological ordering of events w.r.t. the Precedence relation. This gets updated on-demand when the event relations change.



1006
1007
1008
# File 'lib/roby/execution_engine.rb', line 1006

def event_ordering
  @event_ordering
end

#event_prioritiesObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The event => index hash which give the propagation priority for each event



1009
1010
1011
# File 'lib/roby/execution_engine.rb', line 1009

def event_priorities
  @event_priorities
end

#exception_listenersArray<#call> (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The blocks that are currently listening to exceptions

Returns:

  • (Array<#call>)


191
192
193
# File 'lib/roby/execution_engine.rb', line 191

def exception_listeners
  @exception_listeners
end

#finalizersObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

A set of proc objects which are to be called when the execution engine quits.



2577
2578
2579
# File 'lib/roby/execution_engine.rb', line 2577

def finalizers
  @finalizers
end

#forward_graphObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Cached graph object for EventStructure::Forward

This is here for performance reasons, to avoid resolving the same graph over and over



164
165
166
# File 'lib/roby/execution_engine.rb', line 164

def forward_graph
  @forward_graph
end

#last_stop_countObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

:nodoc:



2348
2349
2350
# File 'lib/roby/execution_engine.rb', line 2348

def last_stop_count
  @last_stop_count
end

#once_blocksQueue (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Thread-safe queue to push work to the execution engine

Do not access directly, use #once instead

Returns:

  • (Queue)

    blocks that should be executed at the beginning of the next execution cycle. It is the only thread safe way to queue work to be executed by the engine



199
200
201
# File 'lib/roby/execution_engine.rb', line 199

def once_blocks
  @once_blocks
end

#planObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The Plan this engine is acting on



173
174
175
# File 'lib/roby/execution_engine.rb', line 173

def plan
  @plan
end

#precedence_graphObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Cached graph object for Roby::EventStructure::Precedence

This is here for performance reasons, to avoid resolving the same graph over and over



152
153
154
# File 'lib/roby/execution_engine.rb', line 152

def precedence_graph
  @precedence_graph
end

#process_everyObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

A set of blocks which are called every cycle



2225
2226
2227
# File 'lib/roby/execution_engine.rb', line 2225

def process_every
  @process_every
end

#propagation_idObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

A numeric ID giving the count of the current propagation cycle



183
184
185
# File 'lib/roby/execution_engine.rb', line 183

def propagation_id
  @propagation_id
end

#propagation_sourcesObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The set of source events for the current propagation action. This is a mix of EventGenerator and Event objects.



515
516
517
# File 'lib/roby/execution_engine.rb', line 515

def propagation_sources
  @propagation_sources
end

#schedulerObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The scheduler is the object which handles non-generic parts of the propagation cycle. For now, its #initial_events method is called at the beginning of each propagation cycle and can call or emit a set of events.

See Schedulers::Basic



486
487
488
# File 'lib/roby/execution_engine.rb', line 486

def scheduler
  @scheduler
end

#signal_graphObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Cached graph object for EventStructure::Signal

This is here for performance reasons, to avoid resolving the same graph over and over



158
159
160
# File 'lib/roby/execution_engine.rb', line 158

def signal_graph
  @signal_graph
end

#threadObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The execution thread if there is one running



2250
2251
2252
# File 'lib/roby/execution_engine.rb', line 2250

def thread
  @thread
end

#thread_poolConcurrent::CachedThreadPool (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

A thread pool on which async work should be executed

Returns:

  • (Concurrent::CachedThreadPool)

See Also:



146
147
148
# File 'lib/roby/execution_engine.rb', line 146

def thread_pool
  @thread_pool
end

#waiting_workArray<#fail,#complete?> (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

A list of threaded objects waiting for the control thread

Objects registered here will be notified them by calling #fail on them when it quits. In addition, #join_all_waiting_work will wait for all pending jobs to finish.

Note that all Concurrent::Obligation subclasses fit the bill

Returns:

  • (Array<#fail,#complete?>)


2177
2178
2179
# File 'lib/roby/execution_engine.rb', line 2177

def waiting_work
  @waiting_work
end

Class Method Details

.call_every(plan) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Calls the periodic blocks which should be called



2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
# File 'lib/roby/execution_engine.rb', line 2145

def self.call_every(plan) # :nodoc:
    engine = plan.execution_engine
    now = engine.cycle_start
    length = engine.cycle_length
    engine.process_every.map! do |handler, last_call, duration|
        next if handler.disposed?

        # Check if the nearest timepoint is the beginning of
        # this cycle or of the next cycle
        if !last_call || (duration - (now - last_call)) < length / 2
            unless handler.call(engine, engine.plan)
                next
            end

            next if handler.once?
            next if handler.disposed?

            last_call = now
        end
        [handler, last_call, duration]
    end.compact!
end

.make_delay(timeref, source, target, timespec) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a Time object which represents the absolute point in time referenced by timespec in the context of delaying a propagation between source and target.

See validate_timespec for more information



996
997
998
999
1000
1001
1002
# File 'lib/roby/execution_engine.rb', line 996

def self.make_delay(timeref, source, target, timespec)
    if delay = timespec[:delay] then timeref + delay
    elsif at = timespec[:at] then at
    else
        raise ArgumentError, "invalid timespec #{timespec}"
    end
end

.validate_timespec(timespec) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Validates timespec as a delay specification. A valid delay specification is either nil or a hash, in which case two forms are possible:

at: absolute_time
delay: number


985
986
987
988
989
# File 'lib/roby/execution_engine.rb', line 985

def self.validate_timespec(timespec)
    if timespec
        timespec = validate_options timespec, %i[delay at]
    end
end

Instance Method Details

#add_error(e, propagate_through: nil) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Register a LocalizedError for future propagation

This method must be called in a error-gathering context (i.e. #gather_errors).

Parameters:

  • e (#to_execution_exception)

    the exception

Raises:



639
640
641
642
643
644
645
646
647
648
649
# File 'lib/roby/execution_engine.rb', line 639

def add_error(e, propagate_through: nil)
    plan_exception = e.to_execution_exception
    if @propagation_exceptions
        @propagation_exceptions << [plan_exception, propagate_through]
    else
        Roby.log_exception_with_backtrace(e, self, :fatal)
        raise NotPropagationContext,
              "#add_error called outside an error-gathering "\
              "context (#add_error)"
    end
end

#add_event_delay(time, is_forward, source, target, context) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Adds a propagation step to be performed when the current time is greater than time. The propagation step is a signal if is_forward is false and a forward otherwise.

This method should not be called directly. Use #add_event_propagation with the appropriate timespec argument.

See also #delayed_events and #execute_delayed_events



556
557
558
# File 'lib/roby/execution_engine.rb', line 556

def add_event_delay(time, is_forward, source, target, context)
    delayed_events << [time, is_forward, source, target, context]
end

#add_event_logger(logger) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



126
127
128
# File 'lib/roby/execution_engine.rb', line 126

def add_event_logger(logger)
    @event_logger.add(logger)
end

#add_event_propagation(is_forward, sources, target, context, timespec) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Adds a propagation to the next propagation step: it registers a propagation step to be performed between source and target with the given context. If is_forward is true, the propagation will be a forwarding, otherwise it is a signal.

If timespec is not nil, it defines a delay to be applied before calling the target event.

See #gather_propagation



765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
# File 'lib/roby/execution_engine.rb', line 765

def add_event_propagation(is_forward, sources, target, context, timespec)
    if target.plan != plan
        raise Roby::EventNotExecutable.new(target), "#{target} not in executed plan"
    end

    target.pending(sources.find_all { |ev| ev.kind_of?(Event) })

    @propagation_step_id += 1
    target_info = (@propagation[target] ||= [@propagation_step_id, [], []])
    step = target_info[is_forward ? PENDING_PROPAGATION_FORWARD : PENDING_PROPAGATION_SIGNAL]
    if sources.empty?
        step << nil << context << timespec
    else
        sources.each do |ev|
            step << ev << context << timespec
        end
    end
end

#add_exceptions_for_inhibition(fatal_errors) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Register a set of fatal exceptions to ensure that they will be inhibited in the next exception propagation cycles



1975
1976
1977
1978
1979
1980
1981
1982
# File 'lib/roby/execution_engine.rb', line 1975

def add_exceptions_for_inhibition(fatal_errors)
    fatal_errors.each do |exception, involved_tasks|
        involved_tasks.each do |t|
            (@pending_exceptions[t] ||= Set.new) <<
                [exception.exception.class, exception.origin]
        end
    end
end

#add_framework_error(error, source) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Registers the given error and a description of its source in the list of application/framework errors

It must be called within an exception-gathering context, that is either within #process_events, or within #gather_framework_errors

These errors will terminate the event loop

Parameters:



718
719
720
721
722
723
724
725
# File 'lib/roby/execution_engine.rb', line 718

def add_framework_error(error, source)
    if @application_exceptions
        @application_exceptions << [error, source]
    else
        Roby.log_exception_with_backtrace(error, self, :fatal)
        raise NotPropagationContext, "#add_framework_error called outside an exception-gathering context"
    end
end

#at_cycle_end(description: "at_cycle_end", **options) {|plan| ... } ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Adds a block to be called at the end of each execution cycle

Yield Parameters:

  • plan (Plan)

    the plan on which this engine runs

Returns:



2211
2212
2213
2214
2215
# File 'lib/roby/execution_engine.rb', line 2211

def at_cycle_end(description: "at_cycle_end", **options, &block)
    handler = PollBlockDefinition.new(description, block, **options)
    at_cycle_end_handlers << handler
    handler
end

#call_poll_blocks(blocks, late = false) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Helper that calls the propagation handlers in propagation_handlers (which are expected to be instances of PollBlockDefinition) and handles the errors according of each handler's policy



805
806
807
808
809
810
811
812
813
814
815
816
817
818
# File 'lib/roby/execution_engine.rb', line 805

def call_poll_blocks(blocks, late = false)
    blocks.delete_if do |handler|
        if handler.disabled? || (handler.late? ^ late)
            next(handler.disposed?)
        end

        log_timepoint_group handler.description do
            unless handler.call(self, plan)
                handler.disabled = true
            end
        end
        handler.once? || handler.disposed?
    end
end

#call_propagation_handlersObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
# File 'lib/roby/execution_engine.rb', line 841

def call_propagation_handlers
    process_once_blocks
    if scheduler.enabled?
        gather_framework_errors("scheduler") do
            log_timepoint_group "scheduler-initial-events" do
                scheduler.initial_events
            end
        end
    end
    call_poll_blocks(self.class.propagation_handlers, false)
    call_poll_blocks(self.propagation_handlers, false)

    unless has_queued_events?
        call_poll_blocks(self.class.propagation_handlers, true)
        call_poll_blocks(self.propagation_handlers, true)
    end
end

#clearObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Sets up the plan for clearing: it discards all missions and undefines all permanent tasks and events.

Returns nil if the plan is cleared, and the set of remaining tasks otherwise. Note that quaranteened tasks are not counted as remaining, as it is not possible for the execution engine to stop them.



2356
2357
2358
2359
2360
2361
2362
2363
2364
2365
2366
2367
2368
2369
2370
2371
2372
2373
2374
2375
2376
2377
2378
2379
2380
# File 'lib/roby/execution_engine.rb', line 2356

def clear
    plan.mission_tasks.dup.each { |t| plan.unmark_mission_task(t) }
    plan.permanent_tasks.dup.each { |t| plan.unmark_permanent_task(t) }
    plan.permanent_events.dup.each { |t| plan.unmark_permanent_event(t) }
    plan.force_gc.merge(plan.tasks)

    quaranteened_subplan = plan.compute_useful_tasks(plan.quarantined_tasks)
    remaining = plan.tasks - quaranteened_subplan

    @pending_exceptions.clear

    if remaining.empty?
        # Have to call #garbage_collect one more to make
        # sure that unneeded events are removed as well
        garbage_collect
        # Done cleaning the tasks, clear the remains
        plan.transactions.each do |trsc|
            trsc.discard_transaction if trsc.self_owned?
        end
        plan.clear
        emitted_events.clear
        return
    end
    remaining
end

#clear_application_exceptionsObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



1464
1465
1466
1467
1468
1469
1470
1471
# File 'lib/roby/execution_engine.rb', line 1464

def clear_application_exceptions
    unless @application_exceptions
        raise RecursivePropagationContext, "unbalanced call to #clear_application_exceptions"
    end

    result, @application_exceptions = @application_exceptions, nil
    result
end

#compute_errors(events_errors) ⇒ PropagationInfo

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Compute the set of fatal errors in the current execution state

Parameters:

  • events_errors (Array)

    the set of errors gathered during event propagation

Returns:



1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
# File 'lib/roby/execution_engine.rb', line 1484

def compute_errors(events_errors)
    # Generate exceptions from task structure
    structure_errors = plan.check_structure
    log_timepoint "structure_check"

    # Propagate the errors. Note that the plan repairs are taken into
    # account in ExecutionEngine.propagate_exceptions directly.  We keep
    # event and structure errors separate since in the first case there
    # is not two-stage handling (all errors that have not been handled
    # are fatal), and in the second case we call #check_structure
    # again to errors that are remaining after the call to the exception
    # handlers
    events_errors, free_events_errors, events_handled =
        propagate_exceptions(events_errors)
    _, structure_handled = propagate_exceptions(structure_errors)
    log_timepoint "exception_propagation"

    # Get the remaining problems in the plan structure, and act on it
    structure_errors, structure_inhibited =
        remove_inhibited_exceptions(plan.check_structure)

    # Partition them by fatal/nonfatal
    fatal_errors, nonfatal_errors = [], []
    (structure_errors + events_errors).each do |e, involved_tasks|
        if e.fatal?
            fatal_errors << [e, involved_tasks]
        else
            nonfatal_errors << [e, involved_tasks]
        end
    end
    kill_tasks =
        compute_kill_tasks_for_unhandled_fatal_errors(fatal_errors).to_set
    handled_errors = structure_handled + events_handled

    debug "#{fatal_errors.size} fatal errors found and "\
          "#{free_events_errors.size} errors involving free events"
    debug "the fatal errors involve #{kill_tasks.size} non-finalized tasks"
    PropagationInfo.new(
        Set.new, Set.new, kill_tasks, fatal_errors, nonfatal_errors,
        free_events_errors, handled_errors, structure_inhibited
    )
end

#compute_kill_tasks_for_unhandled_fatal_errors(fatal_errors) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Compute the set of unhandled fatal exceptions



921
922
923
924
925
926
927
928
# File 'lib/roby/execution_engine.rb', line 921

def compute_kill_tasks_for_unhandled_fatal_errors(fatal_errors)
    kill_tasks = fatal_errors.inject(Set.new) do |tasks, (_exception, affected_tasks)|
        tasks.merge(affected_tasks)
    end
    # Tasks might have been finalized during exception handling, filter
    # those out
    kill_tasks.find_all(&:plan)
end

#cycle_end(stats, raise_framework_errors: Roby.app.abort_on_application_exception?) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Called at each cycle end



2609
2610
2611
2612
2613
# File 'lib/roby/execution_engine.rb', line 2609

def cycle_end(stats, raise_framework_errors: Roby.app.abort_on_application_exception?)
    gather_framework_errors("#cycle_end", raise_caught_exceptions: raise_framework_errors) do
        call_poll_blocks(at_cycle_end_handlers)
    end
end

#delayed(delay, description: "delayed block", **options, &block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Schedules block to be called once after delay seconds passed, in the propagation context



1451
1452
1453
1454
1455
1456
1457
# File 'lib/roby/execution_engine.rb', line 1451

def delayed(delay, description: "delayed block", **options, &block)
    handler = PollBlockDefinition.new(description, block, once: true, **options)
    once do
        process_every << [handler, cycle_start, delay]
    end
    handler
end

#display_exceptions=(flag) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Controls whether this engine should indiscriminately display all fatal exceptions

This is on by default



2766
2767
2768
2769
2770
2771
2772
2773
2774
2775
2776
2777
2778
2779
2780
2781
2782
2783
2784
2785
2786
2787
2788
2789
2790
2791
2792
2793
# File 'lib/roby/execution_engine.rb', line 2766

def display_exceptions=(flag)
    unless flag
        @exception_display_handler.dispose
        return
    end
    return unless @exception_display_handler.disposed?

    @exception_display_handler = on_exception do |kind, error, tasks|
        level = if kind == EXCEPTION_HANDLED then :debug
                else
                    :warn
                end

        send(level) do
            send(level, "encountered a #{kind} exception")
            Roby.log_exception_with_backtrace(error.exception, self, level)
            if kind == EXCEPTION_HANDLED
                send(level, "the exception was handled by")
            else
                send(level, "the exception involved")
            end
            tasks.each do |t|
                send(level, "  #{t}")
            end
            break
        end
    end
end

#display_exceptions?Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

whether this engine should indiscriminately display all fatal exceptions

Returns:

  • (Boolean)


2797
2798
2799
# File 'lib/roby/execution_engine.rb', line 2797

def display_exceptions?
    !@exception_display_handler.disposed?
end

#error_handling_phase(events_errors) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Compute errors in plan and handle the results



906
907
908
909
910
911
912
913
914
915
916
917
918
# File 'lib/roby/execution_engine.rb', line 906

def error_handling_phase(events_errors)
    # Do the exception handling phase
    errors = compute_errors(events_errors)
    notify_about_error_handling_results(errors)

    # nonfatal errors are only notified. Fatal errors (kill_tasks) are
    # handled in the propagation loop during garbage collection. Only
    # the free events errors have to be handled here.
    errors.free_events_errors.each do |exception, generators|
        generators.each { |g| g.unreachable!(exception.exception) }
    end
    errors
end

#event_loopObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The main event loop. It returns when the execution engine is asked to quit. In general, this does not need to be called direclty: use #run to start the event loop in a separate thread.



2411
2412
2413
2414
2415
2416
2417
2418
2419
2420
2421
2422
2423
2424
2425
2426
2427
2428
2429
2430
2431
2432
2433
2434
2435
2436
2437
2438
2439
2440
2441
2442
2443
2444
2445
2446
2447
2448
2449
2450
2451
2452
2453
2454
2455
# File 'lib/roby/execution_engine.rb', line 2411

def event_loop
    @cycle_start = Time.now
    @cycle_index = 0

    exit_state = EventLoopExitState.new(0, Time.now, nil)
    @interrupted = false
    loop do
        GC::Profiler.enable if profile_gc?

        if @interrupted
            @interrupted = false
            event_loop_handle_interrupt(exit_state)
        end

        if quitting?
            return if forced_exit?
            return if event_loop_teardown(exit_state)
        end

        log_timepoint_group "cycle" do
            execute_one_cycle
        end

        GC::Profiler.disable if profile_gc?
    rescue Exception => e
        if quitting?
            fatal "Execution thread FORCEFULLY quitting "\
                  "because of unhandled exception"
            Roby.log_exception_with_backtrace(e, self, :fatal)
            raise
        else
            quit

            fatal "Execution thread quitting because of unhandled exception"
            Roby.log_exception_with_backtrace(e, self, :fatal)
        end
    end
ensure
    unless plan.tasks.empty?
        warn "the following tasks are still present in the plan:"
        plan.tasks.each do |t|
            warn "  #{t}"
        end
    end
end

#event_loop_handle_interrupt(exit_state) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Handle a received Interrupt for #event_loop



2486
2487
2488
2489
2490
2491
2492
2493
2494
2495
2496
2497
2498
2499
2500
2501
2502
2503
2504
2505
2506
# File 'lib/roby/execution_engine.rb', line 2486

def event_loop_handle_interrupt(exit_state)
    if quitting?
        exit_state.force_exit_deadline ||=
            Time.now + INTERRUPT_FORCE_EXIT_DEADLINE
        time_until_deadline = exit_state.force_exit_deadline - Time.now
        if time_until_deadline < 0
            fatal "Quitting without cleaning up"
            force_quit
        else
            fatal "Still #{time_until_deadline.ceil}s before "\
                  "interruption will quit without cleaning up"
        end
    else
        fatal "Received interruption request"
        fatal "Interrupt again in #{INTERRUPT_FORCE_EXIT_DEAD_ZONE}s "\
              "to quit without cleaning up"
        quit
        exit_state.force_exit_deadline =
            Time.now + INTERRUPT_FORCE_EXIT_DEAD_ZONE
    end
end

#event_loop_teardown(exit_state) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Handle the teardown logic for #event_loop

Returns:

  • (Boolean)

    whether #event_loop can stop now (true), or not (false)



2462
2463
2464
2465
2466
2467
2468
2469
2470
2471
2472
2473
2474
2475
2476
2477
2478
2479
2480
2481
# File 'lib/roby/execution_engine.rb', line 2462

def event_loop_teardown(exit_state)
    return true unless (remaining = clear)

    display_warning = (exit_state.last_stop_count != remaining.size) ||
                      (Time.now - exit_state.last_quit_warning) > 10
    return unless display_warning

    if display_warning
        Robot.info "Roby quitting ..." if exit_state.last_stop_count == 0

        issue_quit_progression_warning(remaining)
        exit_state.last_quit_warning = Time.now
        exit_state.last_stop_count = remaining.size
    end
    false
rescue Exception => e
    Robot.warn "Execution thread failed to clean up"
    Roby.log_exception_with_backtrace(e, Robot, :warn, filter: false)
    true
end

#event_propagation_phase(initial_events, propagation_info) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Calls its block in a #gather_propagation context and propagate events that have been called and/or emitted by the block

If a block is given, it is called with the initial set of events: the events we should consider as already emitted in the following propagation. seeds si a list of procs which should be called to initiate the propagation (i.e. build an initial set of events)



891
892
893
894
895
896
897
898
899
900
901
902
903
# File 'lib/roby/execution_engine.rb', line 891

def event_propagation_phase(initial_events, propagation_info)
    @propagation_id += 1

    gather_errors do
        next_steps = initial_events
        until next_steps.empty?
            until next_steps.empty?
                next_steps = event_propagation_step(next_steps, propagation_info)
            end
            next_steps = gather_propagation { call_propagation_handlers }
        end
    end
end

#event_propagation_step(current_step, propagation_info) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Propagate one step

current_step describes all pending emissions and calls.

This method calls ExecutionEngine.next_event to get the description of the next event to call. If there are signals going to this event, they are processed and the forwardings will be treated in the next step.

The method returns the next set of pending emissions and calls, adding the forwardings and signals that the propagation of the considered event have added.



1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
# File 'lib/roby/execution_engine.rb', line 1118

def event_propagation_step(current_step, propagation_info)
    signalled, _step_id, forward_info, call_info = next_event(current_step)

    next_step = nil
    if !call_info.empty?
        source_events, source_generators, context =
            prepare_propagation(signalled, false, call_info)
        if source_events
            log(:generator_propagate_events, false, source_events, signalled)

            if signalled.self_owned?
                next_step = gather_propagation(current_step) do
                    propagation_context(source_events | source_generators) do
                        begin
                            propagation_info.add_generator_call(signalled)
                            signalled.call_without_propagation(context)
                        rescue Roby::LocalizedError => e
                            if signalled.command_emitted?
                                add_error(e)
                            else
                                signalled.emit_failed(e)
                            end
                        rescue Exception => e
                            if signalled.command_emitted?
                                add_error(Roby::CommandFailed.new(e, signalled))
                            else
                                signalled.emit_failed(Roby::CommandFailed.new(e, signalled))
                            end
                        end
                    end
                end
            end
        end

        if forward_info
            next_step ||= {}
            target_info = (next_step[signalled] ||= [@propagation_step_id += 1, [], []])
            target_info[PENDING_PROPAGATION_FORWARD].concat(forward_info)
        end

    elsif !forward_info.empty?
        source_events, source_generators, context =
            prepare_propagation(signalled, true, forward_info)
        if source_events
            log(:generator_propagate_events, true, source_events, signalled)

            # If the destination event is not owned, but if the peer is not
            # connected, the event is our responsibility now.
            if signalled.self_owned? || signalled.owners.none? { |peer| peer != plan.local_owner && peer.connected? }
                next_step = gather_propagation(current_step) do
                    propagation_context(source_events | source_generators) do
                        begin
                            if event = signalled.emit_without_propagation(context)
                                propagation_info.add_event_emission(event)
                                emitted_events << event
                            end
                        rescue Roby::LocalizedError => e
                            Roby.warn(
                                "Internal Error: #emit_without_propagation "\
                                "emitted a LocalizedError exception. This is "\
                                "unsupported and will become a fatal error in "\
                                "the future. You should usually replace raise "\
                                "with engine.add_error"
                            )
                            Roby.display_exception(
                                Roby.logger.io(:warn), e, false
                            )
                            add_error(e)
                        rescue Exception => e
                            Roby.warn(
                                "Internal Error: #emit_without_propagation "\
                                "emitted an exception. This is unsupported "\
                                "and will become a fatal error in the future. "\
                                "You should create a proper localized error "\
                                "and replace raise with engine.add_error"
                            )
                            Roby.display_exception(
                                Roby.logger.io(:warn), e, false
                            )
                            add_error(Roby::EmissionFailed.new(e, signalled))
                        end
                    end
                end
            end
        end
    end

    current_step.merge!(next_step) if next_step
    current_step
end

#every(duration, description: "periodic handler", **options, &block) ⇒ #dispose

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Call block every duration seconds. Note that duration is round up to the cycle size (time between calls is at least duration)

Returns:

  • (#dispose)

    an object whose dispose method deregisters the handler



2232
2233
2234
2235
2236
2237
2238
2239
2240
# File 'lib/roby/execution_engine.rb', line 2232

def every(duration, description: "periodic handler", **options, &block)
    handler = PollBlockDefinition.new(description, block, **options)
    once do
        if handler.call(self, plan)
            process_every << [handler, cycle_start, duration]
        end
    end
    handler
end

#execute(catch: [], type: :external_events, &block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Block until the given block is executed by the execution thread, at the beginning of the event loop, in propagation context. If the block raises, the exception is raised back in the calling thread.

Raises:

  • (ArgumentError)


2618
2619
2620
2621
2622
2623
2624
2625
2626
2627
2628
2629
2630
2631
2632
2633
2634
2635
2636
2637
2638
2639
2640
2641
2642
2643
2644
2645
2646
2647
2648
2649
2650
2651
2652
2653
2654
2655
2656
2657
2658
# File 'lib/roby/execution_engine.rb', line 2618

def execute(catch: [], type: :external_events, &block)
    raise ArgumentError, "a block is required" unless block_given?
    if inside_control?
        return yield
    end

    capture_catch = lambda do |symbol, *other|
        caught = catch(symbol) do
            if other.empty?
                return [:ret, yield]
            else
                return capture_catch(block, *other)
            end
        end
        [:throw, [symbol, caught]]
    end

    ivar = Concurrent::IVar.new
    once(sync: ivar, type: type) do
        begin
            if !catch.empty?
                result = capture_catch.call(*catch, &block)
                ivar.set(result)
            else
                ivar.set([:ret, yield])
            end
        rescue ::Exception => e # rubocop:disable Lint/RescueException
            ivar.set([:raise, e])
        end
    end

    mode, value = ivar.value!
    case mode
    when :ret
        value
    when :throw
        throw(*value)
    else
        raise value
    end
end

#execute_delayed_eventsObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Adds the events in delayed_events whose time has passed into the propagation. This must be called in propagation context.

See #add_event_delay and #delayed_events



564
565
566
567
568
569
570
571
572
# File 'lib/roby/execution_engine.rb', line 564

def execute_delayed_events
    reftime = Time.now
    delayed_events.delete_if do |time, forward, source, signalled, context|
        if time <= reftime
            add_event_propagation(forward, [source], signalled, context, nil)
            true
        end
    end
end

#execute_one_cycle(time = Time.now) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



2508
2509
2510
2511
2512
2513
2514
2515
2516
2517
2518
2519
2520
2521
2522
2523
2524
2525
2526
2527
2528
2529
2530
2531
2532
2533
2534
2535
2536
2537
2538
2539
2540
2541
2542
2543
2544
2545
2546
2547
2548
2549
2550
2551
2552
2553
2554
2555
2556
2557
2558
2559
2560
2561
2562
2563
2564
2565
# File 'lib/roby/execution_engine.rb', line 2508

def execute_one_cycle(time = Time.now)
    last_process_times = Process.times
    last_dump_time = plan.event_logger.dump_time

    while time > cycle_start + cycle_length
        @cycle_start += cycle_length
        @cycle_index += 1
    end
    stats = {}
    stats[:start] = [cycle_start.tv_sec, cycle_start.tv_usec]
    stats[:actual_start] = time - cycle_start
    stats[:cycle_index] = cycle_index

    log_timepoint_group "process_events" do
        process_events
    end

    execute_side_work
    log_timepoint "side-work"

    if use_oob_gc?
        stats[:pre_oob_gc] = GC.stat
        GC::OOB.run
    end

    # Sleep if there is enough time for it
    remaining_cycle_time = cycle_length - (Time.now - cycle_start)
    if remaining_cycle_time > SLEEP_MIN_TIME
        sleep(remaining_cycle_time)
    end
    log_timepoint "sleep"

    # Log cycle statistics
    process_times = Process.times
    dump_time = plan.event_logger.dump_time
    stats[:log_queue_size]   = plan.log_queue_size
    stats[:plan_task_count]  = plan.num_tasks
    stats[:plan_event_count] = plan.num_free_events
    stats[:gc] = GC.stat
    stats[:utime] = process_times.utime - last_process_times.utime
    stats[:stime] = process_times.stime - last_process_times.stime
    stats[:dump_time] = dump_time - last_dump_time
    stats[:state] = Roby::State
    stats[:end] = Time.now - cycle_start
    if profile_gc?
        stats[:gc_profile_data] = GC::Profiler.raw_data
        stats[:gc_total_time] = GC::Profiler.total_time
    else
        stats[:gc_profile_data] = nil
        stats[:gc_total_time] = 0
    end

    cycle_end(stats)
    log_flush_cycle :cycle_end, stats

    @cycle_start += cycle_length
    @cycle_index += 1
end

#execute_side_workObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Execute the work registered with Roby::ExecutionEngine::PropagationHandlerMethods#add_side_work_handler



430
431
432
433
# File 'lib/roby/execution_engine.rb', line 430

def execute_side_work
    call_poll_blocks(self.side_work_handlers, false)
    call_poll_blocks(self.class.side_work_handlers, false)
end

#finalized_event(event) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Called by #plan when an event has been finalized



585
586
587
588
589
# File 'lib/roby/execution_engine.rb', line 585

def finalized_event(event)
    @propagation&.delete(event)
    event.unreachable!("finalized", plan)
    # since the event is already finalized,
end

#finalized_task(task) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Called by #plan when a task has been finalized



580
581
582
# File 'lib/roby/execution_engine.rb', line 580

def finalized_task(task)
    @pending_exceptions.delete(task)
end

#force_quitObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Force quitting, without cleaning up



2599
2600
2601
# File 'lib/roby/execution_engine.rb', line 2599

def force_quit
    @quit = 2
end

#forced_exit?Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

True if the control thread is currently quitting

Returns:

  • (Boolean)


2585
2586
2587
# File 'lib/roby/execution_engine.rb', line 2585

def forced_exit?
    @quit > 1
end

#garbage_collect(force_on = nil) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Kills and removes unneeded tasks for which there are no dependencies

Parameters:

  • force_on (Array<Task>, nil) (defaults to: nil)

    a set of task whose garbage-collection must be performed, even though those tasks are actually useful for the system. This is used to properly kill tasks for which errors have been detected.

Returns:

  • (Boolean)

    true if events have been called (thus requiring some propagation) and false otherwise



2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
# File 'lib/roby/execution_engine.rb', line 2012

def garbage_collect(force_on = nil)
    if force_on && !force_on.empty?
        info "GC: adding #{force_on.size} tasks in the force_gc set"
        valid_plan, mismatching_plan = force_on.partition do |t|
            t.plan == self.plan
        end
        plan.force_gc.merge(valid_plan)

        unless mismatching_plan.empty?
            mismatches_s = mismatching_plan.map { |t| "#{t}(plan=#{t.plan})" }
                                           .join(", ")
            raise ArgumentError,
                  "#{mismatches_s} have been given to #{self}.garbage_collect, "\
                  "but they are not tasks in #{plan}"
        end
    end

    unmark_finished_missions_and_permanent_tasks

    # The set of tasks for which we will queue stop! at this cycle
    # #finishing? is false until the next event propagation cycle
    finishing = Set.new

    # Loop until no tasks have been removed
    loop do
        tasks = plan.unneeded_tasks | plan.force_gc
        local_tasks = plan.local_tasks & tasks
        remote_tasks = tasks - local_tasks

        # Remote tasks are simply removed, regardless of other concerns
        for t in remote_tasks
            debug { "GC: removing the remote task #{t}" }
            plan.garbage_task(t)
        end

        break if local_tasks.empty?

        debug do
            debug "#{local_tasks.size} tasks are unneeded in this plan"
            local_tasks.each do |t|
                debug "  #{t} mission=#{plan.mission_task?(t)} "\
                      "permanent=#{plan.permanent_task?(t)}"
            end
            break
        end

        # Find the roots, that is the tasks we should be trying to
        # stop. They are the tasks which have no running parents.
        roots = local_tasks.dup - finishing
        plan.default_useful_task_graphs.each do |g|
            roots.delete_if do |t|
                g.each_in_neighbour(t).any? { |p| !p.finished? }
            end

            break if roots.empty?
        end

        new_finishing_tasks =
            roots.find_all { |local_task| garbage_collect_stop_task(local_task) }
        finishing.merge(new_finishing_tasks)

        break unless roots.any?(&:finalized?)
    end

    finishing.each(&:stop!)

    plan.unneeded_events.each do |event|
        plan.garbage_event(event)
    end

    !finishing.empty?
end

#garbage_collect_stop_task(local_task) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Handle a single root task in the #garbage_collect process

Returns:

  • (Boolean)

    true if the task should be stopped, false otherwise. Test if the task has been removed from the plan using PlanObject#finalized?



2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
2107
2108
2109
2110
2111
2112
2113
2114
2115
2116
2117
2118
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128
# File 'lib/roby/execution_engine.rb', line 2091

def garbage_collect_stop_task(local_task)
    if local_task.pending?
        info "GC: removing pending task #{local_task}"
        plan.garbage_task(local_task)
    elsif local_task.failed_to_start?
        info "GC: removing task that failed to start #{local_task}"
        plan.garbage_task(local_task)
    elsif local_task.starting?
        # wait for task to be started before killing it
        debug "GC: #{local_task} is starting"
    elsif local_task.finished?
        debug "GC: #{local_task} is not running, removed"
        plan.garbage_task(local_task)
    elsif local_task.finishing?
        debug do
            debug "GC: waiting for #{local_task} to finish"
            local_task.history.each do |ev|
                debug "GC:   #{ev}"
            end
            break
        end
    elsif local_task.quarantined?
        warn "GC: #{local_task} is running but in quarantine"
    elsif local_task.event(:stop).controlable?
        debug { "GC: attempting to stop #{local_task}" }
        if !local_task.respond_to?(:stop!)
            warn "something fishy: #{local_task}/stop is controlable but "\
                 "there is no #stop! method, putting in quarantine"
            plan.quarantine_task(local_task)
        else
            return true
        end
    else
        warn "GC: #{local_task} cannot be stopped, putting in quarantine"
        plan.quarantine_task(local_task)
    end
    false
end

#gather_errorsArray<ExecutionException>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Executes the given block while gathering errors, and returns the errors that have been declared with #add_error

Returns:



867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
# File 'lib/roby/execution_engine.rb', line 867

def gather_errors
    if @propagation_exceptions
        raise InternalError, "recursive call to #gather_errors"
    end

    # The ensure clause must NOT apply to the recursive check above.
    # Otherwise, we end up resetting @propagation_exceptions to nil,
    # which wreaks havoc
    begin
        @propagation_exceptions = []
        yield
        @propagation_exceptions
    ensure
        @propagation_exceptions = nil
    end
end

#gather_external_eventsObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Gather the events that come out of this plan manager



834
835
836
837
838
839
# File 'lib/roby/execution_engine.rb', line 834

def gather_external_events
    process_once_blocks
    gather_framework_errors("delayed events") { execute_delayed_events }
    call_poll_blocks(self.class.external_events_handlers)
    call_poll_blocks(self.external_events_handlers)
end

#gather_framework_errors(source, raise_caught_exceptions: true) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Yields to the block and registers any raised exception using #add_framework_error

If the method is called within an exception-gathering context (either #process_events or #gather_framework_errors itself), nothing else is done. Otherwise, #process_pending_application_exceptions is called to re-raise any caught exception



658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
# File 'lib/roby/execution_engine.rb', line 658

def gather_framework_errors(source, raise_caught_exceptions: true)
    if @application_exceptions
        recursive_error_gathering_context = true
    else
        @application_exceptions = []
    end

    yield

    if !recursive_error_gathering_context && !raise_caught_exceptions
        clear_application_exceptions
    end
rescue Exception => e
    add_framework_error(e, source)
    if !recursive_error_gathering_context && !raise_caught_exceptions
        clear_application_exceptions
    end
ensure
    if !recursive_error_gathering_context && raise_caught_exceptions
        process_pending_application_exceptions
    end
end

#gather_propagation(initial_set = {}, &block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Sets up a propagation context, yielding the block in it. During this propagation stage, all calls to #emit and #call are stored in an internal hash of the form:

target => [forward_sources, signal_sources]

where the two _sources are arrays of the form [[source, context], ...]

The method returns the resulting hash. Use #in_propagation_context? to know if the current engine is in a propagation context, and #add_event_propagation to add a new entry to this set.



607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
# File 'lib/roby/execution_engine.rb', line 607

def gather_propagation(initial_set = {}, &block)
    if in_propagation_context?
        raise InternalError, "nested call to #gather_propagation"
    end

    old_allow_propagation, @allow_propagation = @allow_propagation, true

    # The ensure clause must NOT apply to the recursive check above.
    # Otherwise, we end up resetting @propagation_exceptions to nil,
    # which wreaks havoc
    begin
        @propagation = initial_set
        @propagation_sources = nil
        @propagation_step_id = 0

        propagation_context([], &block)

        result, @propagation = @propagation, nil
        result
    ensure
        @propagation = nil
        @allow_propagation = old_allow_propagation
    end
end

#gathering?Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns:

  • (Boolean)


499
500
501
502
503
# File 'lib/roby/execution_engine.rb', line 499

def gathering?
    Roby.warn_deprecated "#gathering? is deprecated, use "\
                         "#in_propagation_context? instead"
    in_propagation_context?
end

#gathering_errors?Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns:

  • (Boolean)


859
860
861
# File 'lib/roby/execution_engine.rb', line 859

def gathering_errors?
    !!@propagation_exceptions
end

#has_pending_exception_matching?(e, object) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Tests whether there is an exception registered by #add_fatal_exceptions_for_inhibition for a given error and object

Parameters:

Returns:

  • (Boolean)


1969
1970
1971
# File 'lib/roby/execution_engine.rb', line 1969

def has_pending_exception_matching?(e, object)
    @pending_exceptions[object]&.include?([e.exception.class, e.origin])
end

#has_pending_forward?(from, to, expected_context) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Whether a forward matching this signature is currently pending

Returns:

  • (Boolean)


785
786
787
788
789
790
791
# File 'lib/roby/execution_engine.rb', line 785

def has_pending_forward?(from, to, expected_context)
    if pending = @propagation[to]
        pending[PENDING_PROPAGATION_FORWARD].each_slice(3).any? do |event, context, timespec|
            (from === event.generator) && (expected_context === context)
        end
    end
end

#has_pending_signal?(from, to, expected_context) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Whether a signal matching this signature is currently pending

Returns:

  • (Boolean)


794
795
796
797
798
799
800
# File 'lib/roby/execution_engine.rb', line 794

def has_pending_signal?(from, to, expected_context)
    if pending = @propagation[to]
        pending[PENDING_PROPAGATION_SIGNAL].each_slice(3).any? do |event, context, timespec|
            (from === event.generator) && (expected_context === context)
        end
    end
end

#has_propagation_for?(target) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns:

  • (Boolean)


739
740
741
# File 'lib/roby/execution_engine.rb', line 739

def has_propagation_for?(target)
    @propagation&.has_key?(target)
end

#has_queued_events?Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns true if some events are queued

Returns:

  • (Boolean)


592
593
594
# File 'lib/roby/execution_engine.rb', line 592

def has_queued_events?
    !@propagation.empty?
end

#has_waiting_work?Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Whether this EE has asynchronous waiting work waiting to be processed

Returns:

  • (Boolean)


1528
1529
1530
1531
1532
# File 'lib/roby/execution_engine.rb', line 1528

def has_waiting_work?
    # Filter out unscheduled promises (promises on which #execute was
    # not called). If they are unscheduled, we're not waiting on them
    waiting_work.any? { |w| !w.unscheduled? }
end

#in_propagation_context?Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

True if we are within a propagation context (i.e. within event processing)

Returns:

  • (Boolean)


507
508
509
# File 'lib/roby/execution_engine.rb', line 507

def in_propagation_context?
    !!@propagation
end

#inhibited_exception?(exception) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Query whether the given exception is inhibited in this plan

Returns:

  • (Boolean)


1431
1432
1433
1434
# File 'lib/roby/execution_engine.rb', line 1431

def inhibited_exception?(exception)
    unhandled, = remove_inhibited_exceptions([exception.to_execution_exception])
    unhandled.empty?
end

#inside_control?Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

True if the current thread is the execution thread of this engine

See #outside_control? for a discussion of the use of #inside_control? and #outside_control? when testing the threading context

Returns:

  • (Boolean)


2268
2269
2270
2271
# File 'lib/roby/execution_engine.rb', line 2268

def inside_control?
    t = thread
    !t || t == Thread.current
end

#interruptObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



2589
2590
2591
# File 'lib/roby/execution_engine.rb', line 2589

def interrupt
    @interrupted = true
end

#issue_quit_progression_warning(remaining) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



2385
2386
2387
2388
2389
2390
2391
2392
2393
2394
2395
2396
2397
2398
# File 'lib/roby/execution_engine.rb', line 2385

def issue_quit_progression_warning(remaining)
    Robot.info(
        "Waiting for #{remaining.size} tasks to finish "\
        "(#{plan.num_tasks} tasks still in plan) and "\
        "#{waiting_work.size} async work jobs"
    )
    remaining.each do |task|
        Robot.info "  #{task}"
    end
    quarantined = remaining.find_all(&:quarantined?)
    unless quarantined.empty?
        Robot.info "#{quarantined.size} tasks in quarantine"
    end
end

#join_all_waiting_work(timeout: nil) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Waits for all obligations in #waiting_work to finish



436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
# File 'lib/roby/execution_engine.rb', line 436

def join_all_waiting_work(timeout: nil)
    return [], PropagationInfo.new if waiting_work.empty?

    deadline = if timeout
                   Time.now + timeout
               end

    finished = []
    propagation_info = PropagationInfo.new
    loop do
        framework_errors = gather_framework_errors(
            "#join_all_waiting_work",
            raise_caught_exceptions: false
        ) do
            next_steps = nil
            event_errors = gather_errors do
                next_steps = gather_propagation do
                    finished.concat(process_waiting_work)
                    blocks = []
                    until once_blocks.empty?
                        blocks << once_blocks.pop.last
                    end
                    call_poll_blocks(blocks)
                end
            end

            this_propagation = propagate_events_and_errors(
                next_steps, event_errors, garbage_collect_pass: false
            )
            propagation_info.merge(this_propagation)
        end
        propagation_info.add_framework_errors(framework_errors)

        Thread.pass
        has_scheduled_promises = has_waiting_work?
        if deadline && (Time.now > deadline) && has_scheduled_promises
            raise JoinAllWaitingWorkTimeout.new(waiting_work)
        end

        break unless has_waiting_work?
    end
    [finished, propagation_info]
end

#killallObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Kill all tasks that are currently running in the plan



2700
2701
2702
2703
2704
2705
2706
2707
2708
2709
2710
2711
2712
2713
2714
2715
2716
2717
2718
2719
2720
2721
2722
# File 'lib/roby/execution_engine.rb', line 2700

def killall
    scheduler_enabled = scheduler.enabled?

    plan.permanent_tasks.clear
    plan.permanent_events.clear
    plan.mission_tasks.clear

    scheduler.enabled = false
    quit

    start_new_cycle
    process_events
    cycle_end({})

    plan.transactions.each(&:discard_transaction!)

    start_new_cycle
    Thread.pass
    process_events
    cycle_end({})
ensure
    scheduler.enabled = scheduler_enabled
end

#log(event_name, *values) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Log an event in the system

See



2183
2184
2185
2186
2187
2188
2189
# File 'lib/roby/execution_engine.rb', line 2183

def log(event_name, *values)
    super

    @on_log_handlers.delete_if do |handler|
        !handler.call(self, event_name, values)
    end
end

#next_event(pending) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

call-seq:

next_event(pending) => event, propagation_info

Determines the event in current_step which should be signalled now. Removes it from the set and returns the event and the associated propagation information.

See #gather_propagation for the format of the returned # propagation_info



1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
# File 'lib/roby/execution_engine.rb', line 1019

def next_event(pending)
    # this variable is 2 if selected_event is being forwarded, 1 if it
    # is both forwarded and signalled and 0 if it is only signalled
    priority, step_id, selected_event = nil
    for propagation_step in pending
        target_event = propagation_step[0]
        target_step_id, forwards, signals = *propagation_step[1]
        target_priority = if forwards.empty? && signals.empty? then 2
                          elsif forwards.empty? then 0
                          else
                              1
                          end

        do_select = if selected_event
                        if precedence_graph.reachable?(selected_event, target_event)
                            false
                        elsif precedence_graph.reachable?(target_event, selected_event)
                            true
                        elsif priority < target_priority
                            true
                        elsif priority == target_priority
                            # If they are of the same priority, handle
                            # earlier events first
                            step_id > target_step_id
                        else
                            false
                        end
                    else
                        true
                    end

        if do_select
            selected_event = target_event
            priority       = target_priority
            step_id        = target_step_id
        end
    end
    [selected_event, *pending.delete(selected_event)]
end

#notify_about_error_handling_results(errors) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Issue the warning message and log notifications related to tasks being killed because of unhandled fatal exceptions



932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
# File 'lib/roby/execution_engine.rb', line 932

def notify_about_error_handling_results(errors)
    kill_tasks, fatal_errors, nonfatal_errors, free_events_errors, handled_errors =
        errors.kill_tasks, errors.fatal_errors, errors.nonfatal_errors, errors.free_events_errors, errors.handled_errors

    unless nonfatal_errors.empty?
        if display_exceptions?
            warn "#{nonfatal_errors.size} unhandled non-fatal exceptions"
        end
        nonfatal_errors.each do |exception, tasks|
            notify_exception(EXCEPTION_NONFATAL, exception, tasks)
        end
    end

    unless handled_errors.empty?
        if display_exceptions?
            warn "#{handled_errors.size} handled errors"
        end
        handled_errors.each do |exception, tasks|
            notify_exception(EXCEPTION_HANDLED, exception, tasks)
        end
    end

    unless free_events_errors.empty?
        if display_exceptions?
            warn "#{free_events_errors.size} free event exceptions"
        end
        free_events_errors.each do |exception, events|
            notify_exception(EXCEPTION_FREE_EVENT, exception, events)
        end
    end

    unless fatal_errors.empty?
        if display_exceptions?
            warn "#{fatal_errors.size} unhandled fatal exceptions, involving #{kill_tasks.size} tasks that will be forcefully killed"
        end
        fatal_errors.each do |exception, tasks|
            notify_exception(EXCEPTION_FATAL, exception, tasks)
        end
        if display_exceptions?
            kill_tasks.each do |task|
                log_pp :warn, task
            end
        end
    end
end

#notify_exception(kind, error, involved_objects) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Call to notify the listeners registered with #on_exception of the occurence of an exception



2803
2804
2805
2806
2807
2808
# File 'lib/roby/execution_engine.rb', line 2803

def notify_exception(kind, error, involved_objects)
    log(:exception_notification, plan.droby_id, kind, error, involved_objects)
    exception_listeners.each do |listener|
        listener.call(self, kind, error, involved_objects)
    end
end

#on_exception(description: "exception listener", on_error: :disable) {|kind, error, tasks| ... } ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Registers a callback that will be called when exceptions are propagated in the plan

Yield Parameters:

Returns:



2748
2749
2750
2751
2752
# File 'lib/roby/execution_engine.rb', line 2748

def on_exception(description: "exception listener", on_error: :disable, &block)
    handler = PollBlockDefinition.new(description, block, on_error: on_error)
    exception_listeners << handler
    Roby.disposable { exception_listeners.delete(handler) }
end

#on_log(on_error: :disable, &block) ⇒ #dispose

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Set of blocks called with log events

Parameters:

  • on_error (Symbol) (defaults to: :disable)

    one of :ignore, :disable or :raise, see PollBlockDefinition.new

Returns:

  • (#dispose)

    a disposable that allows to remove the handler



2196
2197
2198
2199
2200
# File 'lib/roby/execution_engine.rb', line 2196

def on_log(on_error: :disable, &block)
    handler = PollBlockDefinition.new("", block, on_error: on_error)
    @on_log_handlers << handler
    handler
end

#once(sync: nil, description: "once block", type: :external_events, **options, &block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Schedules block to be called at the beginning of the next execution cycle, in propagation context.

Parameters:

  • sync (#fail) (defaults to: nil)

    a synchronization object that is used to communicate between the once block and the calling thread. The main use of this parameter is to make sure that #fail is called if the execution engine quits

  • type (:external_events, :propagation) (defaults to: :external_events)

    whether the block should be registered as an :external_events block, processed at the beginning of the cycle, or a :propagation block, processed at each propagation loop.

  • description (String) (defaults to: "once block")

    a string describing the block. It will be used when adding timepoints to the event log



1444
1445
1446
1447
# File 'lib/roby/execution_engine.rb', line 1444

def once(sync: nil, description: "once block", type: :external_events, **options, &block)
    waiting_work << sync if sync
    once_blocks << create_propagation_handler(description: description, type: type, once: true, **options, &block)
end

#outside_control?Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

True if the current thread is not the execution thread of this engine, or if there is not control thread. When you check the current thread context, always use a negated form. Do not do

if Roby.inside_control?
ERROR
end

Do instead

if !Roby.outside_control?
ERROR
end

Since the first form will fail if there is no control thread, while the second form will work. Use the first form only if you require that there actually IS a control thread.

Returns:

  • (Boolean)


2290
2291
2292
2293
# File 'lib/roby/execution_engine.rb', line 2290

def outside_control?
    t = thread
    !t || t != Thread.current
end

#prepare_propagation(target, is_forward, info) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

call-seq:

prepare_propagation(target, is_forward, info) => source_events, source_generators, context
prepare_propagation(target, is_forward, info) => nil

Parses the propagation information info in the context of a signalling if is_forward is true and a forwarding otherwise. target is the target event.

The method adds the appropriate delayed events using #add_event_delay, and returns either nil if no propagation is to be performed, or the propagation source events, generators and context.

The format of info is the same as the hash values described in #gather_propagation.



1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
# File 'lib/roby/execution_engine.rb', line 1073

def prepare_propagation(target, is_forward, info)
    timeref = Time.now

    source_events, source_generators, context = Set.new, Set.new, []

    delayed = true
    info.each_slice(3) do |src, ctxt, time|
        if time && (delay = ExecutionEngine.make_delay(timeref, src, target, time))
            add_event_delay(delay, is_forward, src, target, ctxt)
            next
        end

        delayed = false

        # Merge identical signals. Needed because two different event handlers
        # can both call #emit, and two signals are set up
        if src
            if src.respond_to?(:generator)
                source_events << src
                source_generators << src.generator
            else
                source_generators << src
            end
        end
        if ctxt
            context.concat ctxt
        end
    end

    unless delayed
        [source_events, source_generators, context]
    end
end

#process_events(raise_framework_errors: Roby.app.abort_on_application_exception?, garbage_collect_pass: true, &caller_block) ⇒ PropagationInfo

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The inside part of the event loop

It gathers initial events and errors and propagate them

Returns:

Raises:

  • RecursivePropagationContext if called recursively



1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
# File 'lib/roby/execution_engine.rb', line 1786

def process_events(
    raise_framework_errors: Roby.app.abort_on_application_exception?,
    garbage_collect_pass: true, &caller_block
)
    if @application_exceptions
        raise RecursivePropagationContext, "recursive call to process_events"
    end

    # to avoid having a almost-method-global ensure block
    passed_recursive_check = true
    @application_exceptions = []
    @emitted_events = []

    # Gather new events and propagate them
    events_errors = nil
    next_steps = gather_propagation do
        events_errors = gather_errors do
            if caller_block
                yield
                caller_block = nil
            end

            if !quitting? || !garbage_collect([])
                process_waiting_work
                log_timepoint "workers"
                gather_external_events
                log_timepoint "external_events"
                call_propagation_handlers
                log_timepoint "propagation_handlers"
            end
        end
    end

    propagation_info = propagate_events_and_errors(
        next_steps, events_errors, garbage_collect_pass: garbage_collect_pass
    )
    if Roby.app.abort_on_exception? && !all_errors.fatal_errors.empty?
        raise Aborting.new(propagation_info.each_fatal_error.map(&:exception))
    end

    propagation_info.framework_errors.concat(@application_exceptions)
    propagation_info
ensure
    if passed_recursive_check
        process_pending_application_exceptions(raise_framework_errors: raise_framework_errors)
    end
end

#process_events_synchronous(seeds = {}, initial_errors = [], enable_scheduler: false, raise_errors: true, &block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Tests are using a special mode for propagation, in which everything is resolved when #emit or #call is called, including error handling. This mode is implemented using this method

When errors occur in this mode, the exceptions are raised directly. This is useful in tests as, this way, we are sure that the exception will not get overlooked

If multiple errors are raised in a single call (this is possible due to Roby's error handling mechanisms), the method will raise SynchronousEventProcessingMultipleErrors to wrap all the exceptions into one.



1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
# File 'lib/roby/execution_engine.rb', line 1846

def process_events_synchronous(seeds = {}, initial_errors = [], enable_scheduler: false, raise_errors: true, &block)
    Roby.warn_deprecated "#process_events_synchronous is deprecated, use the expect_execution harness instead"

    if @application_exceptions
        raise RecursivePropagationContext, "recursive call to process_events"
    end

    passed_recursive_check = true # to avoid having a almost-method-global ensure block
    @application_exceptions = []

    # Save early for the benefit of the 'ensure' block
    current_scheduler_enabled = scheduler.enabled?

    if block_given?
        if seeds.empty? && initial_errors.empty?
            seeds = gather_propagation do
                initial_errors = gather_errors(&block)
            end
        else
            raise ArgumentError,
                  "cannot provide both seeds/inital errors and a block"
        end
    end

    scheduler.enabled = enable_scheduler

    propagation_info = propagate_events_and_errors(
        seeds, initial_errors, garbage_collect_pass: false
    )
    unless propagation_info.kill_tasks.empty?
        gc_initial_errors = nil
        gc_seeds = gather_propagation do
            gc_initial_errors = gather_errors do
                garbage_collect(propagation_info.kill_tasks)
            end
        end
        gc_errors = propagate_events_and_errors(
            gc_seeds, gc_initial_errors, garbage_collect_pass: false
        )
        propagation_info.merge(gc_errors)
    end

    if raise_errors
        propagation_info = propagation_info.exceptions
        if propagation_info.size == 1
            raise propagation_info.first
        elsif !propagation_info.empty?
            raise SynchronousEventProcessingMultipleErrors.new(propagation_info.map(&:exception))
        end
    else
        propagation_info
    end
rescue SynchronousEventProcessingMultipleErrors => e
    raise SynchronousEventProcessingMultipleErrors.new(e.errors + clear_application_exceptions)
rescue Exception => e
    if passed_recursive_check
        application_exceptions = clear_application_exceptions
        if !application_exceptions.empty?
            raise SynchronousEventProcessingMultipleErrors.new(application_exceptions.map(&:first) + [e])
        else
            raise e
        end
    else
        raise e
    end
ensure
    if passed_recursive_check && @application_exceptions
        process_pending_application_exceptions
    end
    scheduler.enabled = current_scheduler_enabled
end

#process_once_blocksObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Dispatch #once_blocks to the other handler sets for further processing



822
823
824
825
826
827
828
829
830
831
# File 'lib/roby/execution_engine.rb', line 822

def process_once_blocks
    until once_blocks.empty?
        type, block = once_blocks.pop
        if type == :external_events
            external_events_handlers << block
        else
            propagation_handlers << block
        end
    end
end

#process_pending_application_exceptions(application_errors = clear_application_exceptions, raise_framework_errors: Roby.app.abort_on_application_exception?) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
# File 'lib/roby/execution_engine.rb', line 681

def process_pending_application_exceptions(
    application_errors = clear_application_exceptions,
    raise_framework_errors: Roby.app.abort_on_application_exception?
)
    # We don't aggregate exceptions, so report them all and raise one
    if display_exceptions?
        application_errors.each do |error, source|
            unless error.kind_of?(Interrupt)
                fatal "Application error in #{source}"
                Roby.log_exception_with_backtrace(error, self, :fatal)
            end
        end
    end

    error, source = application_errors.find do |error, _|
        raise_framework_errors || error.kind_of?(SignalException)
    end
    if error
        if Roby.app.display_all_threads_state_on_abort?
            fatal "State of all running threads:"
            Roby.log_all_threads_backtraces(self, :fatal)
        end

        raise error, "in #{source}: #{error.message}", error.backtrace
    end
end

#process_waiting_workObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Process asynchronous work registered in #waiting_work to clear completed work and/or handle errors that were not handled by the async object itself (e.g. a Promise without a Promise#on_error handler)



1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
# File 'lib/roby/execution_engine.rb', line 1537

def process_waiting_work
    finished, not_finished = waiting_work.partition(&:complete?)

    unhandled_errors = finished.find_all do |work|
        work.rejected? &&
            (work.respond_to?(:handled_error?) && !work.handled_error?)
    end

    unhandled_errors.each do |work|
        e = work.reason
        e.set_backtrace(e.backtrace)
        add_framework_error(e, work.to_s)

        if work.respond_to?(:error_handling_failure) &&
           (e = work.error_handling_failure)
            e.set_backtrace(e.backtrace)
            add_framework_error(e, work.to_s)
        end
    end

    @waiting_work = not_finished
    finished
end

#promise(description: nil, executor: thread_pool, &block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Create a promise to execute the given block in a separate thread

Note that the returned value is a Promise. This means that callbacks added with #on_success or #rescue will be executed in the execution engine thread by default.



2815
2816
2817
# File 'lib/roby/execution_engine.rb', line 2815

def promise(description: nil, executor: thread_pool, &block)
    Promise.new(self, executor: executor, description: description, &block)
end

#propagate_events_and_errors(next_steps, initial_errors, garbage_collect_pass: true) ⇒ PropagationInfo

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Propagate an initial set of event propagations and errors

Parameters:

  • next_steps (Array)

    the next propagations, as returned by #gather_propagation

  • initial_errors (Array)

    a set of errors that should be propagated

  • garbage_collect_pass (Boolean) (defaults to: true)

    whether the garbage collection pass should be performed or not. It is used in the tests' codepath for Roby::EventGenerator#call and Roby::EventGenerator#emit.

Returns:

  • (PropagationInfo)

    what happened during the propagation and propagated



1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
# File 'lib/roby/execution_engine.rb', line 1929

def propagate_events_and_errors(next_steps, initial_errors, garbage_collect_pass: true)
    propagation_info = PropagationInfo.new
    events_errors = initial_errors.dup
    loop do
        log_timepoint_group "event_propagation_phase" do
            events_errors.concat(event_propagation_phase(next_steps, propagation_info))
        end

        next_steps = gather_propagation do
            exception_propagation_errors, error_phase_results = nil
            log_timepoint_group "error_handling_phase" do
                exception_propagation_errors = gather_errors do
                    error_phase_results = error_handling_phase(events_errors)
                end
            end

            add_exceptions_for_inhibition(error_phase_results.each_fatal_error)
            propagation_info.merge(error_phase_results)
            garbage_collection_errors = gather_errors do
                plan.generate_induced_errors(error_phase_results)
                if garbage_collect_pass
                    garbage_collect(error_phase_results.kill_tasks)
                else
                    []
                end
            end
            events_errors = (exception_propagation_errors + garbage_collection_errors)
            log_timepoint "garbage_collect"
        end

        break if next_steps.empty? && events_errors.empty?
    end
    propagation_info
end

#propagate_exception_in_plan(exceptions) {|exception, handling_object| ... } ⇒ Array<(ExecutionException,Array<Task>)>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The core exception propagation algorithm

Parameters:

  • exceptions (Array<(ExecutionException,Array<Task>)>)

    the set of exceptions to propagate, as well as the parents that towards which we should propagate them (if empty, all parents)

Yield Parameters:

  • exception (ExecutionException)

    the exception that is being propagated

  • handling_object (Task, Plan)

    the object we want to test whether it handles the exception or not

Yield Returns:

  • (Boolean)

    true if the exception is handled, false otherwise

Returns:

  • (Array<(ExecutionException,Array<Task>)>)

    the set of unhandled exceptions, as a mapping from an exception description to the set of tasks that are affected by it



1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
# File 'lib/roby/execution_engine.rb', line 1261

def propagate_exception_in_plan(exceptions, &handler)
    # We usually aim at not having special cases for empty sets ... but this
    # is a hot path and we do a reverse of the dependency graph
    return [[], []] if exceptions.empty?

    propagation_graph = dependency_graph.reverse

    # Propagate the exceptions in the hierarchy
    handled_unhandled = []
    exceptions.each do |exception, parents|
        origin = exception.origin
        if parents
            filtered_parents = parents.find_all { |t| t.depends_on?(origin) }
            if filtered_parents != parents
                warn "some parents specified for #{exception.exception}"\
                     "(#{exception.exception.class}) are actually not parents "\
                     "of #{origin}, they got filtered out"
                (parents - filtered_parents).each do |task|
                    warn "  #{task}"
                end

                if filtered_parents.empty?
                    parents = propagation_graph.out_neighbours(origin)
                else
                    parents = filtered_parents
                end
            end
        else
            parents = propagation_graph.out_neighbours(origin)
        end

        debug do
            debug "propagating exception "
            log_pp :debug, exception
            unless parents.empty?
                debug "  constrained to parents"
                log_nest(2) do
                    parents.each do |p|
                        log_pp :debug, p
                    end
                end
            end
            break
        end

        visitor = ExceptionPropagationVisitor.new(
            propagation_graph, exception, origin, parents, handler
        )
        visitor.visit

        unhandled = visitor.unhandled_exceptions.inject { |a, b| a.merge(b) }
        handled   = visitor.handled_exceptions.inject { |a, b| a.merge(b) }
        handled_unhandled << [handled, unhandled]
    end

    exceptions_handled_by = []
    unhandled_exceptions  = []
    handled_unhandled.each do |handled, e|
        if e
            if e.handled = yield(e, plan)
                if handled
                    handled_by = (handled.propagation_leafs.to_set << plan)
                    exceptions_handled_by << [handled.merge(e), handled_by]
                else
                    handled = e
                    exceptions_handled_by << [e, [plan].to_set]
                end
            else
                affected_tasks = e.trace.vertices.to_set
                if handled
                    affected_tasks -= handled.trace.vertices
                    exceptions_handled_by << [handled, handled.propagation_leafs.to_set]
                end
                unhandled_exceptions << [e, affected_tasks]
            end
        else
            exceptions_handled_by << [handled, handled.propagation_leafs.to_set]
        end
    end

    debug do
        debug "#{unhandled_exceptions.size} unhandled exceptions remain"
        log_nest(2) do
            unhandled_exceptions.each do |e, affected_tasks|
                log_pp :debug, e
                debug "Affects #{affected_tasks.size} tasks"
                log_nest(2) do
                    affected_tasks.each do |t|
                        log_pp :debug, t
                    end
                end
            end
        end
        break
    end
    [unhandled_exceptions, exceptions_handled_by]
end

#propagate_exceptions(exceptions) ⇒ Array<(ExecutionException,Array<Task>)>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Propagation exception phase, checking if tasks and/or the main plan are handling the exceptions

Parameters:

  • exceptions (Array<(ExecutionException,Array<Task>)>)

    the set of exceptions to propagate, as well as the parents that towards which we should propagate them (if empty, all parents)

Returns:

  • (Array<(ExecutionException,Array<Task>)>)

    the set of unhandled exceptions, as a mapping from an exception description to the set of tasks that are affected by it



1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
# File 'lib/roby/execution_engine.rb', line 1366

def propagate_exceptions(exceptions)
    if exceptions.empty?
        return [], [], []
    end

    # Remove all exception that are not associated with a task
    exceptions, free_events_exceptions = exceptions.partition do |e, _|
        e.origin
    end
    # Normalize the free events exceptions
    free_events_exceptions = free_events_exceptions.map do |e, _|
        if e.exception.failed_generator.plan
            [e, Set[e.exception.failed_generator]]
        end
    end.compact

    debug "Filtering inhibited exceptions"
    exceptions = log_nest(2) do
        non_inhibited, = remove_inhibited_exceptions(exceptions)
        # Reset the trace for the real propagation
        non_inhibited.map do |e, _|
            _, propagate_through = exceptions.find { |original_e, _| original_e.exception == e.exception }
            e.reset_trace
            [e, propagate_through]
        end
    end

    debug "Propagating #{exceptions.size} non-inhibited exceptions"
    log_nest(2) do
        # Note that the first half of the method filtered the free
        # events exceptions out of 'exceptions'
        unhandled, handled = propagate_exception_in_plan(exceptions) do |e, object|
            object.handle_exception(e)
        end

        return unhandled, free_events_exceptions, handled
    end
end

#propagation_context(sources) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Sets the source_event and source_generator variables according to source. source is the from argument of #add_event_propagation



729
730
731
732
733
734
735
736
737
# File 'lib/roby/execution_engine.rb', line 729

def propagation_context(sources)
    current_sources = @propagation_sources
    raise InternalError, "not in a gathering context in #propagation_context" unless in_propagation_context?

    @propagation_sources = sources
    yield
ensure
    @propagation_sources = current_sources
end

#propagation_source_eventsObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The set of events extracted from #sources



518
519
520
521
522
523
524
525
526
# File 'lib/roby/execution_engine.rb', line 518

def propagation_source_events
    result = Set.new
    for ev in @propagation_sources
        if ev.respond_to?(:generator)
            result << ev
        end
    end
    result
end

#propagation_source_generatorsObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The set of generators extracted from #sources



529
530
531
532
533
534
535
536
537
538
539
# File 'lib/roby/execution_engine.rb', line 529

def propagation_source_generators
    result = Set.new
    for ev in @propagation_sources
        result << if ev.respond_to?(:generator)
                      ev.generator
                  else
                      ev
                  end
    end
    result
end

#queue_forward(sources, target, context, timespec) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Queue a forwarding to be propagated



749
750
751
# File 'lib/roby/execution_engine.rb', line 749

def queue_forward(sources, target, context, timespec)
    add_event_propagation(true, sources, target, context, timespec)
end

#queue_signal(sources, target, context, timespec) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Queue a signal to be propagated



744
745
746
# File 'lib/roby/execution_engine.rb', line 744

def queue_signal(sources, target, context, timespec)
    add_event_propagation(false, sources, target, context, timespec)
end

#quitObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Make control quit properly



2594
2595
2596
# File 'lib/roby/execution_engine.rb', line 2594

def quit
    @quit = 1
end

#quitting?Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

True if the control thread is currently quitting

Returns:

  • (Boolean)


2580
2581
2582
# File 'lib/roby/execution_engine.rb', line 2580

def quitting?
    @quit > 0
end

#refresh_relationsObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Refresh the value of cached relations

Some often-used relations are cached at #initialize, such as #dependency_graph and #precedence_graph. Call this when the actual graph objects have changed on the plan



135
136
137
138
139
140
# File 'lib/roby/execution_engine.rb', line 135

def refresh_relations
    @dependency_graph = plan.task_relation_graph_for(TaskStructure::Dependency)
    @precedence_graph = plan.event_relation_graph_for(EventStructure::Precedence)
    @signal_graph     = plan.event_relation_graph_for(EventStructure::Signal)
    @forward_graph    = plan.event_relation_graph_for(EventStructure::Forwarding)
end

#remove_at_cycle_end(handler_id) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Removes a handler added by #at_cycle_end

Parameters:



2220
2221
2222
# File 'lib/roby/execution_engine.rb', line 2220

def remove_at_cycle_end(handler_id)
    handler_id.dispose
end

#remove_exception_listener(handler) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Removes an exception listener registered with #on_exception

Parameters:



2758
2759
2760
# File 'lib/roby/execution_engine.rb', line 2758

def remove_exception_listener(handler)
    handler.dispose if handler.respond_to?(:dispose)
end

#remove_inhibited_exceptions(exceptions) ⇒ Array<ExecutionException>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Process the given exceptions to remove the ones that are currently filtered by the plan repairs

The returned exceptions are propagated, i.e. their #trace method contains all the tasks that are affected by the absence of a handling mechanism

Parameters:

  • exceptions ((ExecutionException,Array<Roby::Task>))

    pairs of exceptions as well as the "root tasks", i.e. the parents of origin.task towards which they should be propagated

Returns:



1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
# File 'lib/roby/execution_engine.rb', line 1416

def remove_inhibited_exceptions(exceptions)
    exceptions = exceptions.find_all do |execution_exception, _|
        execution_exception.origin.plan
    end

    propagate_exception_in_plan(exceptions) do |e, object|
        if has_pending_exception_matching?(e, object)
            true
        elsif object.respond_to?(:handles_error?)
            object.handles_error?(e)
        end
    end
end

#remove_periodic_handler(id) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Removes a periodic handler defined by #every. id is the value returned by #every.



2244
2245
2246
2247
# File 'lib/roby/execution_engine.rb', line 2244

def remove_periodic_handler(id)
    id.dispose
    nil
end

#resetObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Make a quit EE ready for reuse



2604
2605
2606
# File 'lib/roby/execution_engine.rb', line 2604

def reset
    @quit = 0
end

#reset_thread_poolObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



2694
2695
2696
2697
# File 'lib/roby/execution_engine.rb', line 2694

def reset_thread_pool
    @thread_pool&.shutdown
    @thread_pool = Concurrent::CachedThreadPool.new(idletime: 10)
end

#run(cycle: 0.1) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Main event loop. Valid options are

cycle

the cycle duration in seconds (default: 0.1)



2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314
2315
2316
2317
2318
2319
2320
2321
2322
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
# File 'lib/roby/execution_engine.rb', line 2299

def run(cycle: 0.1)
    if running?
        raise AlreadyRunning, "#run has already been called"
    end

    self.running = true

    @allow_propagation = false
    @waiting_work = Concurrent::Array.new

    @thread = Thread.current
    @thread.name = "MAIN"

    @cycle_length = cycle
    trap("INT") do
        interrupt
    end
    event_loop
ensure
    self.running = false
    @thread = nil
    waiting_work.delete_if do |w|
        next(true) if w.complete?

        # rubocop:disable Lint/HandleExceptions
        begin
            w.fail ExecutionQuitError
            Roby.warn "forcefully terminated #{w} on quit"
        rescue Concurrent::MultipleAssignmentError
            # Race condition: something completed the promise while
            # we were trying to make it fail
        end
        # rubocop:enable Lint/HandleExceptions

        true
    end
    finalizers.each do |blk|
        begin
            blk.call
        rescue Exception => e
            Roby.warn "finalizer #{blk} failed"
            Roby.log_exception_with_backtrace(e, Roby, :warn)
        end
    end
    @quit = 0
    @allow_propagation = true
    trap("INT", "DEFAULT")
end

#shutdownObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



2689
2690
2691
2692
# File 'lib/roby/execution_engine.rb', line 2689

def shutdown
    killall
    thread_pool.shutdown
end

#start_new_cycle(time = Time.now) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Set the cycle_start attribute and increment cycle_index

This is only used for testing purposes



2570
2571
2572
2573
# File 'lib/roby/execution_engine.rb', line 2570

def start_new_cycle(time = Time.now)
    @cycle_start = time
    @cycle_index += 1
end

#unmark_finished_missions_and_permanent_tasksObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
# File 'lib/roby/execution_engine.rb', line 1984

def unmark_finished_missions_and_permanent_tasks
    to_unmark = plan.task_index.by_predicate[:finished?] |
                plan.task_index.by_predicate[:failed?]

    finished_missions = (plan.mission_tasks & to_unmark)
    # Remove all missions that are finished
    for finished_mission in finished_missions
        unless finished_mission.being_repaired?
            plan.unmark_mission_task(finished_mission)
        end
    end
    finished_permanent = (plan.permanent_tasks & to_unmark)
    for finished_permanent in (plan.permanent_tasks & to_unmark)
        unless finished_permanent.being_repaired?
            plan.unmark_permanent_task(finished_permanent)
        end
    end
end

#unreachable_event(event) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Called by EventGenerator when an event became unreachable



575
576
577
# File 'lib/roby/execution_engine.rb', line 575

def unreachable_event(event)
    delayed_events.delete_if { |_, _, _, signalled, _| signalled == event }
end

#wait_one_cycleObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Blocks until at least once execution cycle has been done



2135
2136
2137
2138
2139
2140
2141
2142
# File 'lib/roby/execution_engine.rb', line 2135

def wait_one_cycle
    current_cycle = execute { cycle_index }
    while current_cycle == execute { cycle_index }
        raise ExecutionQuitError unless running?

        sleep(cycle_length)
    end
end

#wait_until(ev) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Stops the current thread until the given even is emitted. If the event becomes unreachable, an UnreachableEvent exception is raised.



2662
2663
2664
2665
2666
2667
2668
2669
2670
2671
2672
2673
2674
2675
2676
2677
2678
2679
2680
2681
2682
2683
2684
2685
2686
2687
# File 'lib/roby/execution_engine.rb', line 2662

def wait_until(ev)
    if inside_control?
        raise ThreadMismatch, "cannot use #wait_until in execution threads"
    end

    ivar = Concurrent::IVar.new
    result = nil
    once(sync: ivar) do
        if ev.unreachable?
            ivar.fail(UnreachableEvent.new(ev, ev.unreachability_reason))
        else
            ev.if_unreachable(cancel_at_emission: true) do |reason, event|
                ivar.fail(UnreachableEvent.new(event, reason)) unless ivar.complete?
            end
            ev.once do |ev|
                ivar.set(result) unless ivar.complete?
            end
            begin
                result = yield if block_given?
            rescue Exception => e
                ivar.fail(e)
            end
        end
    end
    ivar.value!
end