Class: Ruote::Exp::ConcurrenceExpression

Inherits:
FlowExpression show all
Defined in:
lib/ruote/exp/fe_concurrence.rb

Overview

The 'concurrence' expression applies its child branches in parallel (well it makes a best effort to make them run in parallel).

concurrence do
  alpha
  bravo
end

attributes

The concurrence expression takes a number of attributes that allow for sophisticated control (especially at merge time).

:count

concurrence :count => 1 do
  alpha
  bravo
end

in that example, the concurrence will terminate as soon as 1 (count) of the branches replies. The other branch will get cancelled.

:count and :wait_for may point to a negative integer, meaning “all but x”.

concurrence :count => -2 do # all the branches replied but 2
  # ...
end

:count can be shortened to :c.

:wait_for

This attribute accepts either an integer, either a list of tags.

When used with the integer, it's equivalent to the :count attribute:

concurrence :wait_for => 1 do
  # ...
end

It waits for 1 branch to respond and then moves on (concurrence over).

When used with a string (or an array), it extracts a list of tags and waits for the branches with those tags. Once all the tags have replied, the concurrence is over.

concurrence :wait_for => 'alpha, bravo' do
  sequence :tag => 'alpha' do
    # ...
  end
  sequence :tag => 'bravo' do
    # ...
  end
  sequence :tag => 'charly' do
    # ...
  end
end

This concurrence will be over when the branches alpha and bravo have replied. The charly branch may have replied or not, it doesn't matter.

:wait_for can be shortened to :wf.

:over_if (and :over_unless) attribute

Like the :count attribute controls how many branches have to reply before a concurrence ends, the :over attribute is used to specify a condition upon which the concurrence will [prematurely] end.

concurrence :over_if => '${f:over}'
  alpha
  bravo
  charly
end

will end the concurrence as soon as one of the branches replies with a workitem whose field 'over' is set to true. (the remaining branches will get cancelled unless :remaining => :forget is set).

:over_unless needs no explanation.

:remaining

As said for :count, the remaining branches get cancelled. By setting :remaining to :forget (or 'forget'), the remaining branches will continue their execution, forgotten.

concurrence :count => 1, :remaining => :forget do
  alpha
  bravo
end

:remaining can be shortened to :rem or :r.

The default is 'cancel', where all the remaining branches are cancelled while the hand is given back to the main flow.

There is a third setting, 'wait'. It behaves like 'cancel', but the concurrence waits for the cancelled children to reply. The workitems from cancelled branches are merged in as well.

:merge

By default, the workitems override each others. By default, the first workitem to reply will win.

sequence do
  concurrence do
    alpha
    bravo
  end
  charly
end

In that example, if 'alpha' replied first, the workitem that reaches 'charly' once 'bravo' replied will have the payload as seen/modified by 'alpha'.

The :merge attribute determines which branch wins the merge.

  • first (default)

  • last

  • highest

  • lowest

highest and lowest refer to the position in the list of branch. It's useful to set a fixed winner.

concurrence :merge => :highest do
  alpha
  bravo
end

makes sure that alpha's version of the workitem wins.

:merge can be shortened to :m.

:merge_type

:merge_type => :override (default)

By default, the merge type is set to 'override', which means that the 'winning' workitem's payload supplants all other workitems' payloads.

:merge_type => :mix

Setting :merge_type to :mix, will actually attempt to merge field by field, making sure that the field value of the winner(s) are used.

:merge_type => :isolate

:isolate will rearrange the resulting workitem payload so that there is a new field for each branch. The name of each field is the index of the branch from '0' to …

:merge_type => :stack

:stack will stack the workitems coming back from the concurrence branches in an array whose order is determined by the :merge attributes. The array is placed in the 'stack' field of the resulting workitem. Note that the :stack merge_type also creates a 'stack_attributes' field and populates it with the expanded attributes of the concurrence.

Thus

sequence do
  concurrence :merge => :highest, :merge_type => :stack do
    reviewer1
    reviewer2
  end
  editor
end

will see the 'editor' receive a workitem whose fields look like :

{ 'stack' => [{ ... reviewer1 fields ... }, { ... reviewer2 fields ... }],
  'stack_attributes' => { 'merge'=> 'highest', 'merge_type' => 'stack' } }

This could prove useful for participant having to deal with multiple merge strategy results.

:merge_type => :union

(Available from ruote 2.3.0)

Will override atomic fields, concat arrays and merge hashes…

The union of those two workitems

{ 'a' => 0, 'b' => [ 'x', 'y' ], 'c' => { 'aa' => 'bb' }
{ 'a' => 1, 'b' => [ 'y', 'z' ], 'c' => { 'cc' => 'dd' }

will be

{ 'a' => 1,
  'b' => [ 'x', 'y', 'z' ],
  'c' => { 'aa' => 'bb', 'cc' => 'dd' } }

Warning: duplicates in arrays present before the merge will be removed as well.

:merge_type => :concat

(Available from ruote 2.3.0)

Much like :union, but duplicates are not removed. Thus

{ 'a' => 0, 'b' => [ 'x', 'y' ], 'c' => { 'aa' => 'bb' }
{ 'a' => 1, 'b' => [ 'y', 'z' ], 'c' => { 'cc' => 'dd' }

will be

{ 'a' => 1,
  'b' => [ 'x', 'y', 'y', 'z' ],
  'c' => { 'aa' => 'bb', 'cc' => 'dd' } }

:merge_type => :deep

(Available from ruote 2.3.0)

Identical to :concat but hashes are merged with deep_merge (ActiveSupport flavour).

:merge_type => :ignore

(Available from ruote 2.3.0)

A very simple merge type, the workitems given back by the branches are simply discarded and the workitem as passed to the concurrence expression is used to reply to the parent expression (of the concurrence expression).

:merge_type can be shortened to :mt.

Direct Known Subclasses

ConcurrentIteratorExpression

Constant Summary

COUNT_R =
/^-?\d+$/

Constants inherited from FlowExpression

FlowExpression::COMMON_ATT_KEYS

Instance Attribute Summary

Attributes inherited from FlowExpression

#context, #error

Instance Method Summary (collapse)

Methods inherited from FlowExpression

#ancestor?, #applied_workitem, #att, #att_text, #attribute, #attribute_text, #attributes, #await, #cancel, #cancel_flanks, #cfei_at, #child_id, #child_ids, #compile_atts, #compile_variables, #debug_id, #deflate, #do, do_action, #do_apply, #do_cancel, #do_fail, #do_pause, #do_persist, #do_reply, #do_reply_to_parent, #do_resume, #do_unpersist, dummy, #fei, fetch, from_h, #handle_on_error, #has_attribute, #initial_persist, #initialize, #iterative_var_lookup, #launch_sub, #lookup_val, #lookup_val_prefix, #lookup_variable, #name, names, #parent, #parent_id, #pause_on_apply, #persist_or_raise, #root, #root_id, #set_variable, #tree, #tree_children, #try_persist, #try_unpersist, #unpersist_or_raise, #unset_variable, #update_tree, #variables, #wfid

Methods included from WithMeta

#class_def, included

Methods included from WithH

#h, #h=, included, #to_h

Constructor Details

This class inherits a constructor from Ruote::Exp::FlowExpression

Instance Method Details

- (Object) apply



281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
# File 'lib/ruote/exp/fe_concurrence.rb', line 281

def apply

  return do_reply_to_parent(h.applied_workitem) if tree_children.empty?

  #
  # count and wait_for

  count = (attribute(:count) || attribute(:c)).to_s
  count = nil unless COUNT_R.match(count)

  wf = count || attribute(:wait_for) || attribute(:wf)

  if COUNT_R.match(wf.to_s)
    h.ccount = wf.to_i
  elsif wf
    h.wait_for = Ruote.comma_split(wf)
  end

  #
  # other attributes

  h.cmerge = att(
    [ :merge, :m ],
    %w[ first last highest lowest ])
  h.cmerge_type = att(
    [ :merge_type, :mt ],
    %w[ override mix isolate stack union ignore concat deep ])
  h.remaining = att(
    [ :remaining, :rem, :r ],
    %w[ cancel forget wait ])

  #h.workitems = (h.cmerge == 'first' || h.cmerge == 'last') ? [] : {}
    #
    # now merging iteratively, not keeping track of all the workitems,
    # but still able to deal with old flows with active h.workitems
    #
  h.workitems = [] if %w[ highest lowest ].include?(h.cmerge)
    #
    # still need to keep track of rank to get the right merging

  h.over = false

  apply_children

  @context.storage.put_msg(
    'reply', 'fei' => h.fei, 'workitem' => h.applied_workitem
  ) if h.ccount == 0
    #
    # force an immediate reply
end

- (Boolean) is_concurrent?

This method is used by some walking routines when analyzsing execution trees. Returns true for concurrence (and concurrent iterator).

Returns:

  • (Boolean)


276
277
278
279
# File 'lib/ruote/exp/fe_concurrence.rb', line 276

def is_concurrent?

  true
end

- (Object) reply(workitem)



332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
# File 'lib/ruote/exp/fe_concurrence.rb', line 332

def reply(workitem)

  workitem = Ruote.fulldup(workitem)
    #
    # since workitem field merging might happen, better to work on
    # a copy of the workitem (so that history, coming afterwards,
    # doesn't see a modified version of the workitem)

  if h.wait_for && tag = workitem['fields']['__left_tag__']
    h.wait_for.delete(tag)
  end

  over = h.over
  h.over = over || over?(workitem)

  keep(workitem)
    # is done after the over? determination for its looks at 'winner'

  if (not over) && h.over
    #
    # just became 'over'

    reply_to_parent(nil)

  elsif h.over && h.remaining == 'wait'

    reply_to_parent(nil)

  elsif h.children.empty?

    do_unpersist || return

    @context.storage.put_msg(
      'ceased',
      'wfid' => h.fei['wfid'], 'fei' => h.fei, 'workitem' => workitem)
  else

    do_persist
  end
end