Class: Ruote::Exp::ConcurrenceExpression

Inherits:
FlowExpression show all
Includes:
MergeMixin
Defined in:
lib/ruote/exp/fe_concurrence.rb

Overview

The 'concurrence' expression applies its child branches in parallel (well it makes a best effort to make them run in parallel).

concurrence do
  alpha
  bravo
end

attributes

The concurrence expression takes a number of attributes that allow for sophisticated control (especially at merge time).

:count

concurrence :count => 1 do
  alpha
  bravo
end

in that example, the concurrence will terminate as soon as 1 (count) of the branches replies. The other branch will get cancelled.

:remaining

As said for :count, the remaining branches get cancelled. By setting :remaining to :forget (or 'forget'), the remaining branches will continue their execution, forgotten.

concurrence :count => 1, :remaining => :forget do
  alpha
  bravo
end

:merge

By default, the workitems override each others. By default, the first workitem to reply will win.

sequence do
  concurrence do
    alpha
    bravo
  end
  charly
end

In that example, if 'alpha' replied first, the workitem that reaches 'charly' once 'bravo' replied will have the payload as seen/modified by 'alpha'.

The :merge attribute determines which branch wins the merge.

  • first (default)

  • last

  • highest

  • lowest

highest and lowest refer to the position in the list of branch. It's useful to set a fixed winner.

concurrence :merge => :highest do
  alpha
  bravo
end

makes sure that alpha's version of the workitem wins.

:merge_type

:override

By default, the merge type is set to 'override', which means that the 'winning' workitem's payload supplants all other workitems' payloads.

:mix

Setting :merge_type to :mix, will actually attempt to merge field by field, making sure that the field value of the winner(s) are used.

:isolate

:isolate will rearrange the resulting workitem payload so that there is a new field for each branch. The name of each field is the index of the branch from '0' to …

:stack

:stack will stack the workitems coming back from the concurrence branches in an array whose order is determined by the :merge attributes. The array is placed in the 'stack' field of the resulting workitem. Note that the :stack merge_type also creates a 'stack_attributes' field and populates it with the expanded attributes of the concurrence.

Thus

sequence do
  concurrence :merge => :highest, :merge_type => :stack do
    reviewer1
    reviewer2
  end
  editor
end

will see the 'editor' receive a workitem whose fields look like :

{ 'stack' => [{ ... reviewer1 fields ... }, { ... reviewer2 fields ... }],
  'stack_attributes' => { 'merge'=> 'highest', 'merge_type' => 'stack' } }

This could prove useful for participant having to deal with multiple merge strategy results.

:over_if (and :over_unless)

Like the :count attribute controls how many branches have to reply before a concurrence ends, the :over attribute is used to specify a condition upon which the concurrence will [prematurely] end.

concurrence :over_if => '${f:over}'
  alpha
  bravo
  charly
end

will end the concurrence as soon as one of the branches replies with a workitem whose field 'over' is set to true. (the remaining branches will get cancelled unless :remaining => :forget is set).

:over_unless needs no explanation.

Direct Known Subclasses

ConcurrentIteratorExpression

Constant Summary

Constant Summary

Constants inherited from FlowExpression

FlowExpression::COMMON_ATT_KEYS

Instance Attribute Summary

Attributes inherited from FlowExpression

#context, #h

Instance Method Summary (collapse)

Methods included from MergeMixin

#merge_workitems

Methods inherited from FlowExpression

#ancestor?, #att, #attribute, #attribute_text, #attributes, #cancel, #compile_atts, #compile_variables, do_action, #do_apply, #do_cancel, #do_fail, #do_persist, #do_reply, #do_unpersist, #expand_atts, #fei, fetch, from_h, #handle_on_error, #has_attribute, #initial_persist, #initialize, #iterative_var_lookup, #launch_sub, #lookup_on_error, #lookup_val, #lookup_val_prefix, #lookup_variable, #name, names, #parent, #parent_id, #persist_or_raise, #set_variable, #to_h, #tree, #tree_children, #try_persist, #try_unpersist, #unpersist_or_raise, #unset_variable, #update_tree, #variables

Methods included from WithMeta

#class_def, included

Methods included from WithH

included

Constructor Details

This class inherits a constructor from Ruote::Exp::FlowExpression

Instance Method Details

- (Object) apply



169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/ruote/exp/fe_concurrence.rb', line 169

def apply

  h.ccount = attribute(:count).to_i rescue 0
  h.ccount = nil if h.ccount < 1

  h.cmerge = att(:merge, %w[ first last highest lowest ])
  h.cmerge_type = att(:merge_type, %w[ override mix isolate stack ])
  h.remaining = att(:remaining, %w[ cancel forget ])

  h.workitems = (h.cmerge == 'first' || h.cmerge == 'last') ? [] : {}

  h.over = false

  apply_children
end

- (Object) reply(workitem)



185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
# File 'lib/ruote/exp/fe_concurrence.rb', line 185

def reply(workitem)

  if h.cmerge == 'first' || h.cmerge == 'last'
    h.workitems << workitem
  else
    h.workitems[workitem['fei']['expid']] = workitem
  end

  over = h.over
  h.over = over || over?(workitem)

  if (not over) && h.over
    # just became 'over'

    reply_to_parent(nil)

  elsif h.children.empty?

    do_unpersist || return

    @context.storage.put_msg(
      'ceased',
      'wfid' => h.fei['wfid'], 'fei' => h.fei, 'workitem' => workitem)
  else

    do_persist
  end
end