Class: Ruote::Exp::FlowExpression

Inherits:
Object
Includes:
WithH, WithMeta
Defined in:
lib/ruote/exp/ro_on_x.rb,
lib/ruote/exp/ro_timers.rb,
lib/ruote/exp/ro_persist.rb,
lib/ruote/exp/ro_filters.rb,
lib/ruote/exp/ro_variables.rb,
lib/ruote/exp/ro_attributes.rb,
lib/ruote/exp/flow_expression.rb

Overview

Ruote is a process definition interpreter. It doesn't directly “read” process definitions; it relies on a parser/generator to produce “abstract syntax trees” that look like

[ expression_name, { ... attributes ... }, [ children_expressions ] ]

The nodes (and leaves) in the trees are expressions. This is the base class for all expressions.

The most visible expressions are “define”, “sequence” and “participant”. Think of:

pdef = Ruote.process_definition do
  sequence do
    participant :ref => 'customer'
    participant :ref => 'accounting'
    participant :ref => 'logistics'
  end
end

Each node is an expression…
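
As a rough illustration of the tree format described above, the definition could correspond to a nested array like the one below. The exact tree emitted by the ruote parser is an assumption here (the real output may differ in detail), and expression_names is a hypothetical helper, not part of ruote.

```ruby
# Hypothetical AST for the process definition above, following the
# [ expression_name, { attributes }, [ children ] ] shape.
# The exact output of the real parser is assumed, not verified.
TREE =
  ['define', {}, [
    ['sequence', {}, [
      ['participant', { 'ref' => 'customer' }, []],
      ['participant', { 'ref' => 'accounting' }, []],
      ['participant', { 'ref' => 'logistics' }, []]
    ]]
  ]]

# Collects the expression names in depth-first order
# (hypothetical helper, not part of ruote).
def expression_names(tree)
  name, _attributes, children = tree
  [name] + children.flat_map { |child| expression_names(child) }
end

p expression_names(TREE)
# => ["define", "sequence", "participant", "participant", "participant"]
```

Every array in the nesting is one node, which is why each of the five entries maps to one expression at runtime.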

the states of an expression

nil

the normal state

'cancelling'

the expression and its children are getting cancelled

'dying'

the expression and its children are getting killed

'failed'

the expression has failed (it is “in error”)

'failing'

the expression just failed and it's cancelling its children

'timing_out'

the expression just timed out and it's cancelling its children

'paused'

the expression is paused; it will store downstream messages and play them only when a 'resume' message comes from upstream.

Direct Known Subclasses

AddBranchesExpression, ApplyExpression, AwaitExpression, CancelProcessExpression, CommandExpression, CommandedExpression, ConcurrenceExpression, CronExpression, DefineExpression, EchoExpression, EqualsExpression, ErrorExpression, FilterExpression, ForgetExpression, IfExpression, ListenExpression, LoseExpression, NoOpExpression, OnErrorExpression, OnceExpression, ParticipantExpression, ReadExpression, RedoExpression, RefExpression, RegisterpExpression, ReserveExpression, RestoreExpression, SaveExpression, SequenceExpression, StallExpression, SubprocessExpression, UndoExpression, WaitExpression

Defined Under Namespace

Classes: HandlerEntry

Constant Summary

COMMON_ATT_KEYS =
%w[
if unless
forget lose flank
timeout timers
on_error on_cancel on_timeout

Methods included from WithMeta

#class_def, included

Methods included from WithH

#h, #h=, included, #to_h

Constructor Details

- (FlowExpression) initialize(context, hash)

Returns a new instance of FlowExpression



# File 'lib/ruote/exp/flow_expression.rb', line 122

def initialize(context, hash)

  @context = context

  @msg = nil
    # contains generally the msg the expression got instantiated for

  self.h = hash

  h._id ||= Ruote.to_storage_id(h.fei)
  h['type'] ||= 'expressions'
  h.name ||= self.class.expression_names.first
  h.children ||= []
  h.applied_workitem['fei'] = h.fei
  h.created_time ||= Ruote.now_to_utc_s

  h.on_cancel ||= attribute(:on_cancel)
  h.on_error ||= attribute(:on_error)
  h.on_timeout ||= attribute(:on_timeout)
  h.on_terminate ||= attribute(:on_terminate)
end

Instance Attribute Details

- (Object) context (readonly)

Returns the value of attribute context



# File 'lib/ruote/exp/flow_expression.rb', line 112

def context
  @context
end

- (Object) error

Mostly used when the expression is returned via Ruote::Engine#ps(wfid) or Ruote::Engine#processes(). If an error occurred for this flow expression, #ps will set this error field so that it yields the ProcessError.

In short, this attribute usually yields nil.



# File 'lib/ruote/exp/flow_expression.rb', line 120

def error
  @error
end

Class Method Details

+ (Object) do_action(context, msg)

Called by the worker when it has something to do for a FlowExpression.



# File 'lib/ruote/exp/flow_expression.rb', line 304

def self.do_action(context, msg)

  fei = msg['fei']
  action = msg['action']

  if action == 'reply' && fei['engine_id'] != context.engine_id
    #
    # the reply has to go to another engine, let's locate the
    # 'engine participant' and give it the workitem/reply
    #
    # see ft_37 for a test/example

    engine_participant =
      context.plist.lookup(fei['engine_id'], msg['workitem'])

    raise(
      "no EngineParticipant found under name '#{fei['engine_id']}'"
    ) unless engine_participant

    engine_participant.reply(fei, msg['workitem'])
    return
  end

  # normal case

  fexp = nil

  n = context.storage.class.name.match(/Couch/) ? 3 : 1
    #
  n.times do |i|
    if fexp = fetch(context, msg['fei']); break; end
    sleep 0.028 unless i == (n - 1)
  end
    #
    # Simplify that once ruote-couch behaves

  fexp.do(action, msg) if fexp
end

+ (Object) dummy(h)

Returns a dummy expression. Only used by the error_handler service.



# File 'lib/ruote/exp/ro_on_x.rb', line 58

def self.dummy(h)

  fe = self.allocate
  fe.instance_variable_set(:@h, h)

  fe
end

+ (Object) fetch(context, fei)

Fetches an expression from the storage and readies it for service.



# File 'lib/ruote/exp/flow_expression.rb', line 277

def self.fetch(context, fei)

  return nil if fei.nil?

  fexp = context.storage.get('expressions', Ruote.to_storage_id(fei))

  fexp ? from_h(context, fexp) : nil
end

+ (Object) from_h(context, h)

Instantiates expression back from hash.



# File 'lib/ruote/exp/flow_expression.rb', line 266

def self.from_h(context, h)

  return self.new(nil, h) unless context

  exp_class = context.expmap.expression_class(h['name'])

  exp_class.new(context, h)
end

+ (Object) names(*exp_names)

Keeps track of the names and aliases of the expression.



# File 'lib/ruote/exp/flow_expression.rb', line 292

def self.names(*exp_names)

  exp_names = exp_names.collect { |n| n.to_s }
  meta_def(:expression_names) { exp_names }
end
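
The registration pattern above can be tried outside of ruote with a small stand-in. This is only a sketch: BaseExpression and SequenceLike are hypothetical classes, the 'seq' alias is made up, and define_singleton_method is used here in place of the meta_def helper that WithMeta provides.

```ruby
# Hypothetical base class mimicking FlowExpression.names.
class BaseExpression
  def self.names(*exp_names)
    exp_names = exp_names.map(&:to_s)
    # stand-in for meta_def(:expression_names) { exp_names }
    define_singleton_method(:expression_names) { exp_names }
  end
end

# Hypothetical subclass registering a name and an alias.
class SequenceLike < BaseExpression
  names :sequence, :seq
end

p SequenceLike.expression_names        # => ["sequence", "seq"]
p SequenceLike.expression_names.first  # => "sequence"
```

The first registered name is the one the constructor falls back to when h.name is not set.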

Instance Method Details

- (Boolean) ancestor?(fei)

Returns true if the given fei points to an expression in the parent chain of this expression.

Returns:

  • (Boolean)


# File 'lib/ruote/exp/flow_expression.rb', line 866

def ancestor?(fei)

  fei = fei.to_h if fei.respond_to?(:to_h)

  return false unless h.parent_id
  return true if h.parent_id == fei

  parent.ancestor?(fei)
end
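
The recursion can be illustrated without a storage. In this sketch the PARENTS table and the ancestor_of? helper are hypothetical, and expression ids are simplified to plain strings (real feis are hashes).

```ruby
# Hypothetical in-memory stand-in for the expression storage:
# maps an expression id to the id of its parent (nil for the root).
PARENTS = { '0' => nil, '0_0' => '0', '0_0_1' => '0_0' }

# Mirrors the recursion of #ancestor?: walk up parent by parent
# until the candidate is found or the root is reached.
def ancestor_of?(id, candidate)
  parent_id = PARENTS[id]
  return false unless parent_id
  return true if parent_id == candidate
  ancestor_of?(parent_id, candidate)
end

p ancestor_of?('0_0_1', '0')      # => true
p ancestor_of?('0', '0_0')        # => false
p ancestor_of?('0_0_1', '0_0_1')  # => false
```

As the last call shows, an expression is not considered its own ancestor.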

- (Object) applied_workitem

Returns a one-off Ruote::Workitem instance (the applied workitem).



# File 'lib/ruote/exp/flow_expression.rb', line 240

def applied_workitem

  @awi ||= Ruote::Workitem.new(h.applied_workitem)
end

- (Object) att(keys, values, opts = {})

Returns the value of the first attribute in 'keys' that is set. This value should be present in the array 'values'; if not, the default value is returned. By default, the default value is the first element of 'values'.



# File 'lib/ruote/exp/ro_attributes.rb', line 75

def att(keys, values, opts={})

  default = opts[:default] || values.first

  val = Array(keys).collect { |key| attribute(key) }.compact.first.to_s

  values.include?(val) ? val : default
end
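
The selection logic can be exercised on a plain Hash. The sketch below assumes attributes are already resolved (the real #attribute also performs dollar substitution through dsub); pick_att, the 'mode' attribute and its values are hypothetical.

```ruby
# Stand-in for #att working on a plain attributes Hash
# (no dollar substitution is performed here).
def pick_att(attributes, keys, values, opts = {})
  default = opts[:default] || values.first
  val = Array(keys).map { |key| attributes[key.to_s] }.compact.first.to_s
  values.include?(val) ? val : default
end

allowed = %w[slow fast]

p pick_att({ 'mode' => 'fast' }, :mode, allowed)                # => "fast"
p pick_att({ 'mode' => 'fast' }, [:m, :mode], allowed)          # => "fast"
p pick_att({ 'mode' => 'bogus' }, :mode, allowed)               # => "slow"
p pick_att({}, :mode, allowed, :default => 'fast')              # => "fast"
```

An unknown or missing value silently falls back to the default instead of raising.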

- (Object) att_text(workitem = h.applied_workitem)

Equivalent to #attribute_text, but will return nil if there is no attribute whose value is nil.



# File 'lib/ruote/exp/ro_attributes.rb', line 139

def att_text(workitem=h.applied_workitem)

  text = attributes.keys.find { |k| attributes[k] == nil }

  text ? dsub(text.to_s, workitem) : nil
end

- (Object) attribute(n, workitem = h.applied_workitem, options = {})

Looks up the value for attribute n.



# File 'lib/ruote/exp/ro_attributes.rb', line 48

def attribute(n, workitem=h.applied_workitem, options={})

  n = n.to_s

  default = options[:default]
  escape = options[:escape]
  string = options[:to_s] || options[:string]

  v = attributes[n]

  v = if v == nil
    default
  elsif escape
    v
  else
    dsub(v, workitem)
  end

  v = v.to_s if v and string

  v
end

- (Object) attribute_text(workitem = h.applied_workitem)

Given something like

sequence do
  participant 'alpha'
end

in the context of the participant expression

attribute_text()

will yield 'alpha'.

Note: an empty text returns '', not nil.



# File 'lib/ruote/exp/ro_attributes.rb', line 129

def attribute_text(workitem=h.applied_workitem)

  text = attributes.keys.find { |k| attributes[k] == nil }

  dsub(text.to_s, workitem)
end
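
The code above looks for an attribute key whose value is nil, which suggests that the text of an expression is stored as such a key. The sketch below assumes that tree shape and leaves out the dsub (dollar substitution) step; text_of and text_or_nil are hypothetical helpers contrasting #attribute_text with #att_text.

```ruby
# Assumed tree for "participant 'alpha'": the text is stored as an
# attribute key whose value is nil.
WITH_TEXT = ['participant', { 'alpha' => nil }, []]
WITHOUT_TEXT = ['participant', { 'ref' => 'alpha' }, []]

# Like #attribute_text (minus dsub): always returns a String.
def text_of(tree)
  attributes = tree[1]
  attributes.keys.find { |k| attributes[k].nil? }.to_s
end

# Like #att_text (minus dsub): returns nil when there is no text.
def text_or_nil(tree)
  attributes = tree[1]
  attributes.keys.find { |k| attributes[k].nil? }
end

p text_of(WITH_TEXT)         # => "alpha"
p text_of(WITHOUT_TEXT)      # => ""
p text_or_nil(WITHOUT_TEXT)  # => nil
```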

- (Object) attributes

Returns the attributes of this expression (like { 'ref' => 'toto' } or { 'timeout' => '2d' }).



# File 'lib/ruote/exp/flow_expression.rb', line 920

def attributes

  tree[1]
end

- (Object) await(att, msg)

If the expression has an :await attribute, the expression gets into a special “awaiting” state until the condition in the value of :await gets triggered and the trigger calls resume on the expression.

Raises:

  • (::ArgumentError)


# File 'lib/ruote/exp/flow_expression.rb', line 436

def await(att, msg)

  action, condition =
    Ruote::Exp::AwaitExpression.extract_await_ac(:await => att)

  raise ::ArgumentError.new(
    ":await does not understand #{att.inspect}"
  ) if action == nil

  msg.merge!('flavour' => 'awaiting')

  h.state = 'awaiting'
  h.paused_apply = msg

  persist_or_raise

  @context.tracker.add_tracker(
    h.fei['wfid'],
    action,
    Ruote.to_storage_id(h.fei),
    condition,
    { '_auto_remove' => true,
      'action' => 'resume',
      'fei' => h.fei,
      'flavour' => 'awaiting' })
end

- (Object) cancel(flavour)

This default implementation cancels all the [registered] children of this expression.



# File 'lib/ruote/exp/flow_expression.rb', line 727

def cancel(flavour)

  return reply_to_parent(h.applied_workitem) if h.children.empty?
    #
    # there are no children, nothing to cancel, let's just reply to
    # the parent expression

  do_persist || return
    #
    # before firing the cancel message to the children
    #
    # if the do_persist returns false, it means it failed, implying this
    # expression is stale, let's return, thus discarding this cancel message

  children.each do |child_fei|
    #
    # let's send a cancel message to each of the children
    #
    # maybe some of them are gone or have not yet been applied, anyway,
    # the messages are sent

    @context.storage.put_msg(
      'cancel',
      'fei' => child_fei,
      'parent_id' => h.fei, # indicating that this is a "cancel child"
      'flavour' => flavour)
  end
end

- (Object) cancel_flanks(flavour)

Emits a cancel message for each flanking expression (if any).



# File 'lib/ruote/exp/flow_expression.rb', line 709

def cancel_flanks(flavour)

  return unless h.flanks

  h.flanks.each do |flank_fei|

    @context.storage.put_msg(
      'cancel',
      'fei' => flank_fei,
      'parent_id' => h.fei,
        # indicating that this is a "cancel child", well...
      'flavour' => flavour)
  end
end

- (Object) cfei_at(i)

Given an index, returns the child fei (among the currently registered children feis) whose fei.expid ends with this index (whose child_id is equal to that index).

Returns nil if not found or a child fei as a Hash.



# File 'lib/ruote/exp/flow_expression.rb', line 251

def cfei_at(i)

  children.find { |cfei| Ruote.extract_child_id(cfei) == i }
end

- (Object) child_id

Returns the child_id for this expression. (The rightmost part of the fei.expid).



# File 'lib/ruote/exp/flow_expression.rb', line 172

def child_id

  fei.child_id
end

- (Object) child_ids

Returns the list of child_ids (last part of the fei.expid) for the currently registered (active) children.



# File 'lib/ruote/exp/flow_expression.rb', line 259

def child_ids

  children.collect { |cfei| Ruote.extract_child_id(cfei) }
end

- (Object) compile_atts(opts = {})

Returns a Hash containing all attributes set for an expression with their values resolved.



# File 'lib/ruote/exp/ro_attributes.rb', line 108

def compile_atts(opts={})

  attributes.keys.each_with_object({}) { |k, r|
    r[dsub(k)] = attribute(k, h.applied_workitem, opts)
  }
end

- (Object) compile_variables

Returns a fresh hash of all the variables visible from this expression.

This is used mainly when forgetting an expression.



# File 'lib/ruote/exp/ro_variables.rb', line 44

def compile_variables

  vars = h.parent_id ? parent.compile_variables : {}
  vars.merge!(h.variables) if h.variables

  vars
end
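
The effect of this parent-first merge can be seen on a small in-memory example. The EXPS table and compile_vars helper are hypothetical stand-ins for the storage and for #compile_variables.

```ruby
# Hypothetical in-memory expressions, keyed by id. Each entry holds
# its parent's id and its own variables (possibly nil).
EXPS = {
  'root' => { 'parent_id' => nil,    'variables' => { 'a' => 1, 'b' => 2 } },
  'mid'  => { 'parent_id' => 'root', 'variables' => nil },
  'leaf' => { 'parent_id' => 'mid',  'variables' => { 'b' => 3 } }
}

# Same shape as #compile_variables: start from the parent's view,
# then let the local variables override it.
def compile_vars(id)
  exp = EXPS[id]
  vars = exp['parent_id'] ? compile_vars(exp['parent_id']) : {}
  vars.merge!(exp['variables']) if exp['variables']
  vars
end

p compile_vars('leaf')  # => {"a"=>1, "b"=>3}
p compile_vars('mid')   # => {"a"=>1, "b"=>2}
```

Because the recursion starts from a fresh Hash at the root, the stored variables of the ancestors are not modified by the merge.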

- (Object) debug_id

Outputs ids like “0_2!d218c1b”, no wfid, only <expid>!<subid>[0, 7]



# File 'lib/ruote/exp/ro_persist.rb', line 39

def debug_id

  "#{h.fei['expid']}!#{h.fei['subid'][0, 7]}"
end
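
A quick way to see the resulting format, using a made-up fei Hash (the values below are illustrative only, and debug_id_for is a hypothetical helper):

```ruby
# Illustrative fei; the wfid and subid values are invented.
SAMPLE_FEI = {
  'wfid' => '20240101-0000-example',
  'expid' => '0_2',
  'subid' => 'd218c1b4f0a9e7'
}

# Same string interpolation as #debug_id, applied to a plain Hash.
def debug_id_for(fei)
  "#{fei['expid']}!#{fei['subid'][0, 7]}"
end

p debug_id_for(SAMPLE_FEI)  # => "0_2!d218c1b"
```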

- (Object) deflate(err, level = 0)

Given this expression and an error, deflates the error into a hash (serializable).



# File 'lib/ruote/exp/ro_on_x.rb', line 37

def deflate(err, level=0)

  {
    'fei' => h.fei,
    'at' => err.respond_to?(:at) ? err.at : Ruote.now_to_utc_s,
    'class' => err.class.to_s,
    'message' => err.message,
    'trace' => err.backtrace,
    'details' => err.respond_to?(:ruote_details) ? err.ruote_details : nil,
    'deviations' => err.respond_to?(:deviations) ? err.deviations : nil,
    'tree' => tree
  }

rescue => errr

  # could degenerate (and get stopped by a SystemStackError)
  deflate(errr, level + 1)
end

- (Object) do(action, msg)

Wraps a call to “apply”, “reply”, etc… Makes sure to set @msg with a deep copy of the msg before.



# File 'lib/ruote/exp/flow_expression.rb', line 346

def do(action, msg)

  @msg = Ruote.fulldup(msg)

  send("do_#{action}", msg)
end
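
The dispatch pattern can be sketched in isolation. TinyExpression and its handlers are hypothetical, and a Marshal round-trip stands in for Ruote.fulldup as the deep copy.

```ruby
# Minimal stand-in showing the "do_#{action}" dispatch used by #do.
class TinyExpression
  attr_reader :msg

  def do(action, msg)
    # deep copy via Marshal (assumption: stands in for Ruote.fulldup)
    @msg = Marshal.load(Marshal.dump(msg))
    send("do_#{action}", msg)
  end

  def do_apply(msg)
    "applied #{msg['fei']}"
  end

  def do_reply(msg)
    "replied #{msg['fei']}"
  end
end

exp = TinyExpression.new
msg = { 'fei' => '0_0', 'workitem' => { 'fields' => {} } }

p exp.do('apply', msg)  # => "applied 0_0"

# later changes to the original msg do not leak into the copy
msg['workitem']['fields']['x'] = 1
p exp.msg['workitem']['fields']  # => {}
```

An action without a matching do_ method raises NoMethodError in this sketch.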

- (Object) do_apply(msg)

Called by the worker when it has just created this FlowExpression and wants to apply it.



# File 'lib/ruote/exp/flow_expression.rb', line 356

def do_apply(msg)

  if msg['state'] == 'paused'

    return pause_on_apply(msg)
  end

  if msg['flavour'].nil? && (aw = attribute(:await))

    return await(aw, msg)
  end

  unless Condition.apply?(attribute(:if), attribute(:unless))

    return do_reply_to_parent(h.applied_workitem)
  end

  pi = h.parent_id
  reply_immediately = false

  if attribute(:scope).to_s == 'true'

    h.variables ||= {}
  end

  if attribute(:forget).to_s == 'true'

    h.variables = compile_variables
    h.parent_id = nil
    h.forgotten = true

    reply_immediately = true

  elsif attribute(:lose).to_s == 'true'

    h.lost = true

  elsif msg['flanking'] or (attribute(:flank).to_s == 'true')

    h.flanking = true

    reply_immediately = true
  end

  if reply_immediately and pi

    @context.storage.put_msg(
      'reply',
      'fei' => pi,
      'workitem' => Ruote.fulldup(h.applied_workitem),
      'flanking' => h.flanking)
  end

  filter

  consider_tag
  consider_timers

  apply
end

- (Object) do_cancel(msg)

The raw handling of cancel messages passed to expressions (the fine handling is done in the #cancel method).



# File 'lib/ruote/exp/flow_expression.rb', line 667

def do_cancel(msg)

  flavour = msg['flavour']

  return if h.state == 'cancelling' && flavour != 'kill'
    # cancel on cancel gets discarded

  return if h.state == 'failed' && flavour == 'timeout'
    # do not timeout expressions that are "in error" (failed)

  h.state = case flavour
    when 'kill' then 'dying'
    when 'timeout' then 'timing_out'
    else 'cancelling'
  end

  if h.state == 'timing_out'

    h.applied_workitem['fields']['__timed_out__'] = [
      h.fei, Ruote.now_to_utc_s, tree.first, compile_atts
    ]

  elsif h.state == 'cancelling'

    if t = msg['on_cancel']

      h.on_cancel = t

    elsif ra_opts = msg['re_apply']

      ra_opts = {} if ra_opts == true
      ra_opts['tree'] ||= tree

      h.on_re_apply = ra_opts
    end
  end

  cancel(flavour)
end
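
The guard clauses and the flavour-to-state mapping above can be restated as two pure functions. This is only a sketch for reasoning about the state transitions; state_for and discarded? are hypothetical names.

```ruby
# Restates the flavour-to-state mapping of #do_cancel.
def state_for(flavour)
  case flavour
  when 'kill' then 'dying'
  when 'timeout' then 'timing_out'
  else 'cancelling'
  end
end

# Restates the two guard clauses: returns true when the cancel
# message would be discarded given the current state.
def discarded?(current_state, flavour)
  return true if current_state == 'cancelling' && flavour != 'kill'
  return true if current_state == 'failed' && flavour == 'timeout'
  false
end

p state_for(nil)                   # => "cancelling"
p state_for('kill')                # => "dying"
p discarded?('cancelling', nil)    # => true
p discarded?('cancelling', 'kill') # => false
p discarded?('failed', 'timeout')  # => true
```

A kill therefore still goes through when a regular cancel is already in progress.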

- (Object) do_fail(msg)

Called when handling an on_error, will place itself in a 'failing' state and cancel the children (when the reply from the children comes back, the on_error will get triggered).



# File 'lib/ruote/exp/flow_expression.rb', line 760

def do_fail(msg)

  h.state = 'failing'
  h.applied_workitem = msg['workitem']

  if h.children.size < 1

    reply_to_parent(h.applied_workitem)

  else

    flavour = msg['immediate'] ? 'kill' : nil

    persist_or_raise

    h.children.each do |i|
      @context.storage.put_msg('cancel', 'fei' => i, 'flavour' => flavour)
    end
  end
end

- (Object) do_pause(msg)

Expression received a “pause” message. Will put the expression in the “paused” state and then pass the message to the children.

If the expression is in a non-nil state (failed, timing_out, …), the message will be ignored.



# File 'lib/ruote/exp/flow_expression.rb', line 787

def do_pause(msg)

  return if h.state != nil

  h.state = 'paused'

  do_persist || return

  h.children.each { |i|
    @context.storage.put_msg('pause', 'fei' => i)
  } unless msg['breakpoint']
end

- (Object) do_persist

Make sure to persist (retry if necessary).



# File 'lib/ruote/exp/ro_persist.rb', line 106

def do_persist

  do_p(true)
end

- (Object) do_reply(msg) Also known as: do_receive

Wraps #reply (does the administrative part of the reply work).



# File 'lib/ruote/exp/flow_expression.rb', line 606

def do_reply(msg)

  workitem = msg['workitem']
  fei = workitem['fei']

  removed = h.children.delete(fei)
    # accept without any check ?

  if msg['flanking']

    (h.flanks ||= []) << fei

    if (not removed) # then it's a timer

      do_persist
      return
    end
  end

  if ut = msg['updated_tree']

    ct = tree.dup
    ct.last[Ruote::FlowExpressionId.child_id(fei)] = ut
    update_tree(ct)
  end

  if h.state == 'paused'

    (h.paused_replies ||= []) << msg

    do_persist

  elsif h.state != nil # failing or timing out ...

    if h.children.size < 1
      reply_to_parent(workitem)
    else
      #persist_or_raise # for the updated h.children
      do_persist
    end

  else # vanilla reply

    reply(workitem)
  end
end

- (Object) do_reply_to_parent(workitem, delete = true)

The essence of the reply_to_parent job…



# File 'lib/ruote/exp/flow_expression.rb', line 477

def do_reply_to_parent(workitem, delete=true)

  # propagate the cancel "flavour" back, so that one can know
  # why a branch got cancelled.

  flavour = if @msg.nil?
    nil
  elsif @msg['action'] == 'cancel'
    @msg['flavour'] || 'cancel'
  elsif h.state.nil?
    nil
  else
    @msg['flavour']
  end

  # deal with the timers and the schedules

  %w[ timeout_schedule_id job_id ].each do |sid|
    @context.storage.delete_schedule(h[sid]) if h[sid]
  end
    #
    # legacy schedule ids, to be removed for ruote 2.4.0

  @context.storage.delete_schedule(h.schedule_id) if h.schedule_id
    #
    # time-driven exps like cron, wait and once now all use h.schedule_id

  h.timers.each do |schedule_id, action|
    @context.storage.delete_schedule(schedule_id)
  end if h.timers

  # cancel flanking expressions if any

  cancel_flanks(h.state == 'dying' ? 'kill' : nil)

  # trigger or vanilla reply

  if h.state == 'failing' # on_error is implicit (#do_fail got called)

    trigger('on_error', workitem)

  elsif h.state == 'cancelling' && h.on_cancel

    trigger('on_cancel', workitem)

  elsif h.state == 'cancelling' && h.on_re_apply

    trigger('on_re_apply', workitem)

  elsif h.state == 'timing_out' && h.on_timeout

    trigger('on_timeout', workitem)

  elsif h.state == nil && h.on_reply

    trigger('on_reply', workitem)

  elsif h.flanking && h.state.nil?
    #
    # do vanish

    do_unpersist

  elsif h.lost && h.state.nil?
    #
    # do not reply, sit here (and wait for cancellation probably)

    do_persist

  elsif h.trigger && workitem['fields']["__#{h.trigger}__"]
    #
    # the "second take"

    trigger(h.trigger, workitem)

  else # vanilla reply

    filter(workitem) if h.state.nil?

    f = h.state.nil? && attribute(:vars_to_f)
    Ruote.set(workitem['fields'], f, h.variables) if f

    workitem['sub_wf_name'] = h.applied_workitem['sub_wf_name']
    workitem['sub_wf_revision'] = h.applied_workitem['sub_wf_revision']

    leave_tag(workitem) if h.tagname

    (do_unpersist || return) if delete
      # remove expression from storage

    if h.parent_id && ! h.attached

      @context.storage.put_msg(
        'reply',
        'fei' => h.parent_id,
        'workitem' => workitem.merge!('fei' => h.fei),
        'updated_tree' => h.updated_tree, # nil most of the time
        'flavour' => flavour)

    else

      @context.storage.put_msg(
        (h.forgotten || h.attached) ? 'ceased' : 'terminated',
        'wfid' => h.fei['wfid'],
        'fei' => h.fei,
        'workitem' => workitem,
        'variables' => h.variables,
        'flavour' => flavour)

      if
        h.state.nil? &&
        h.on_terminate == 'regenerate' &&
        ! (h.forgotten || h.attached)
      then
        @context.storage.put_msg(
          'regenerate',
          'wfid' => h.fei['wfid'],
          'tree' => h.original_tree,
          'workitem' => workitem,
          'variables' => h.variables,
          'flavour' => flavour)
          #'stash' =>
      end
    end
  end
end

- (Object) do_resume(msg)

Will “unpause” the expression (if it was paused), and trigger any 'paused_replies' (replies that came while the expression was paused).



# File 'lib/ruote/exp/flow_expression.rb', line 803

def do_resume(msg)

  return unless h.state == 'paused' || h.state == 'awaiting'

  h.state = nil

  m = h.delete('paused_apply')
  return do_apply(m) if m
    # if it's a paused apply, pipe it directly to #do_apply

  replies = h.delete('paused_replies') || []

  do_persist || return

  h.children.each { |i| @context.storage.put_msg('resume', 'fei' => i) }
    # resume children

  replies.each { |m| @context.storage.put_msg(m.delete('action'), m) }
    # trigger replies
end

- (Object) do_unpersist

Make sure to unpersist (retry if necessary).



# File 'lib/ruote/exp/ro_persist.rb', line 113

def do_unpersist

  do_p(false)
end

- (Object) fei

Returns the Ruote::FlowExpressionId for this expression.



# File 'lib/ruote/exp/flow_expression.rb', line 146

def fei

  Ruote::FlowExpressionId.new(h.fei)
end

- (Object) handle_on_error(msg, error)

Looks up a parent with an on_error attribute and triggers it.



# File 'lib/ruote/exp/ro_on_x.rb', line 68

def handle_on_error(msg, error)

  return false if h.state == 'failing'

  err = deflate(error)
  oe_parent = lookup_on_error(err)

  return false unless oe_parent
    # no parent with on_error attribute found

  handler = oe_parent.local_on_error(err)

  return false if handler.to_s == ''
    # empty on_error handler nullifies ancestor's on_error

  workitem = msg['workitem']
  workitem['fields']['__error__'] = err

  immediate =
    if handler.is_a?(String)
      !! handler.match(/^!/)
    elsif handler.is_a?(Array)
      !! handler.first.to_s.match(/^!/)
    else
      false
    end

  # NOTE: why not pass the handler in the msg?
  #       no, because of HandlerEntry (not JSON serializable)

  @context.storage.put_msg(
    'fail',
    'fei' => oe_parent.h.fei,
    'workitem' => workitem,
    'immediate' => immediate)

  true # yes, error is being handled.
end
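
The way the 'immediate' flag is derived from the handler can be isolated as below. The immediate? helper and the handler names are hypothetical; the logic restates the branch above, where a leading "!" marks the handler as immediate (in #do_fail, an immediate failure cancels the children with the 'kill' flavour).

```ruby
# Restates how #handle_on_error derives the 'immediate' flag
# from an on_error handler (String, Array/tree, or anything else).
def immediate?(handler)
  if handler.is_a?(String)
    !!handler.match(/^!/)
  elsif handler.is_a?(Array)
    !!handler.first.to_s.match(/^!/)
  else
    false
  end
end

p immediate?('!rollback')                    # => true
p immediate?('rollback')                     # => false
p immediate?(['!rollback', {}, []])          # => true
p immediate?({ 'not' => 'a string or array' }) # => false
```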

- (Object) has_attribute(*args) Also known as: has_att

Given a list of attribute names, returns the first attribute name for which there is a value.



# File 'lib/ruote/exp/ro_attributes.rb', line 37

def has_attribute(*args)

  args.each { |a| a = a.to_s; return a if attributes[a] != nil }

  nil
end

- (Object) initial_persist

Persists and fetches the _rev identifier from the storage.

Only used by the worker when creating the expression.



# File 'lib/ruote/exp/ro_persist.rb', line 48

def initial_persist

  r = @context.storage.put(@h, :update_rev => true)

  #t = Thread.current.object_id.to_s[-3..-1]
  #puts "+ per #{debug_id} #{tree[0]} r#{h._rev} t#{t} -> #{r.class}"
  #Ruote.p_caller('+ per')

  raise_or_return('initial_persist failed', r)
end

- (Boolean) is_concurrent?

Concurrent expressions (expressions that apply more than one child at a time) are supposed to return true here.

Returns:

  • (Boolean)


# File 'lib/ruote/exp/flow_expression.rb', line 233

def is_concurrent?

  false
end

- (Object) iterative_var_lookup(k)

TODO : rdoc rewrite needed

This method is mostly used by the worker when looking up a process name or participant name bound under a variable.



# File 'lib/ruote/exp/ro_variables.rb', line 119

def iterative_var_lookup(k)

  v = lookup_variable(k)

  return [ k, v ] unless (v.is_a?(String) or v.is_a?(Symbol))

  iterative_var_lookup(v)
end
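
The method keeps following String or Symbol values until it reaches something else (a tree, nil, and so on). A sketch with a plain Hash in place of #lookup_variable, where VARIABLES and iterative_lookup are hypothetical:

```ruby
# Hypothetical variables, standing in for what #lookup_variable sees.
VARIABLES = {
  'reviewer' => 'alice',
  'alice' => ['participant', { 'ref' => 'alice_inbox' }, []]
}

# Follows name -> name indirections until the value is no longer
# a String or a Symbol, then returns [ last_key, value ].
def iterative_lookup(key)
  value = VARIABLES[key.to_s]
  return [key, value] unless value.is_a?(String) || value.is_a?(Symbol)
  iterative_lookup(value)
end

p iterative_lookup('reviewer')
# => ["alice", ["participant", {"ref"=>"alice_inbox"}, []]]
p iterative_lookup('unknown')
# => ["unknown", nil]
```

Note that in this sketch a variable whose value is its own name would recurse without end.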

- (Object) launch_sub(pos, subtree, opts = {})

Launches a subprocess (usually called from the #apply of certain expression implementations).



# File 'lib/ruote/exp/flow_expression.rb', line 831

def launch_sub(pos, subtree, opts={})

  i = h.fei.merge(
    'subid' => Ruote.generate_subid(h.fei.inspect),
    'expid' => pos)

  if ci = opts[:child_id]
    i['subid'] = "#{i['subid']}k#{ci}"
  end

  #p '=== launch_sub ==='
  #p [ :launcher, h.fei['expid'], h.fei['subid'], h.fei['wfid'] ]
  #p [ :launched, i['expid'], i['subid'], i['wfid'] ]

  forget = opts[:forget]

  register_child(i) unless forget

  variables = (
    forget ? compile_variables : {}
  ).merge!(opts[:variables] || {})

  @context.storage.put_msg(
    'launch',
    'fei' => i,
    'parent_id' => forget ? nil : h.fei,
    'tree' => subtree,
    'workitem' => opts[:workitem] || h.applied_workitem,
    'variables' => variables,
    'forgotten' => forget)
end

- (Object) lookup_val(att_options = {})



# File 'lib/ruote/exp/ro_attributes.rb', line 96

def lookup_val(att_options={})

  lval(
    VV,
    s_cartesian(%w[ v var variable ], VV),
    s_cartesian(%w[ f fld field ], VV),
    att_options)
end

- (Object) lookup_val_prefix(prefix, att_options = {})

prefix = 'on' => will lookup on, on_val, on_value, on_v, on_var, on_variable, on_f, on_fld, on_field…



# File 'lib/ruote/exp/ro_attributes.rb', line 87

def lookup_val_prefix(prefix, att_options={})

  lval(
    [ prefix ] + [ 'val', 'value' ].map { |s| "#{prefix}_#{s}" },
    %w[ v var variable ].map { |s| "#{prefix}_#{s}" },
    %w[ f fld field ].map { |s| "#{prefix}_#{s}" },
    att_options)
end
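
The three key lists passed to lval can be printed for a given prefix. The candidate_keys helper is hypothetical; it only reproduces the array construction shown above and does not call lval.

```ruby
# Builds the same three lists as #lookup_val_prefix:
# literal value keys, variable keys and field keys.
def candidate_keys(prefix)
  [
    [prefix] + %w[ val value ].map { |s| "#{prefix}_#{s}" },
    %w[ v var variable ].map { |s| "#{prefix}_#{s}" },
    %w[ f fld field ].map { |s| "#{prefix}_#{s}" }
  ]
end

value_keys, variable_keys, field_keys = candidate_keys('on')

p value_keys     # => ["on", "on_val", "on_value"]
p variable_keys  # => ["on_v", "on_var", "on_variable"]
p field_keys     # => ["on_f", "on_fld", "on_field"]
```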

- (Object) lookup_variable(var, prefix = nil) Also known as: v, lv

Looks up the value of a variable in the expression tree (seen from a leaf, it looks more like a stack than a tree).



# File 'lib/ruote/exp/ro_variables.rb', line 55

def lookup_variable(var, prefix=nil)

  var, prefix = split_prefix(var, prefix)

  if prefix == '//'
    return @context.storage.get_engine_variable(var)
  end

  if prefix == '/' && par = parent
    return par.lookup_variable(var, prefix)
  end

  if h.variables and Ruote.has_key?(h.variables, var)
    return Ruote.lookup(h.variables, var)
  end

  if h.parent_id && h.parent_id['engine_id'] == @context.engine_id
    #
    # do not lookup variables in a remote engine ...

    (return parent.lookup_variable(var, prefix)) rescue nil
      # if the lookup fails (parent gone) then rescue and let go
  end

  k = var.split('.').first
  vars = { k => @context.storage.get_engine_variable(k) }
    #
    # engine var lookup might be expensive, only lookup the var we need

  Ruote.lookup(vars, var)
end

- (Object) name

Returns the name of this expression, like 'sequence', 'participant', 'cursor', etc…



# File 'lib/ruote/exp/flow_expression.rb', line 912

def name

  tree[0]
end

- (Object) parent

Fetches the parent expression, or returns nil if there is no parent expression.



# File 'lib/ruote/exp/flow_expression.rb', line 180

def parent

  h.parent_id ?
    Ruote::Exp::FlowExpression.fetch(@context, h.parent_id) :
    nil
end

- (Object) parent_id

Returns the Ruote::FlowExpressionId of the parent expression, or nil if there is no parent expression.



# File 'lib/ruote/exp/flow_expression.rb', line 162

def parent_id

  h.parent_id ?
    Ruote::FlowExpressionId.new(h.parent_id) :
    nil
end

- (Object) pause_on_apply(msg)

Called by #do_apply when msg['state'] == 'paused'. Covers the “apply/launch it but it's immediately paused” case. Freezes the apply message in h.paused_apply and saves the expression.



# File 'lib/ruote/exp/flow_expression.rb', line 421

def pause_on_apply(msg)

  msg['state'] = nil

  h.state = 'paused'
  h.paused_apply = msg

  persist_or_raise
end

- (Object) persist_or_raise Also known as: persist



# File 'lib/ruote/exp/ro_persist.rb', line 91

def persist_or_raise

  p_or_raise(true)
end

- (Object) reply(workitem)

A default implementation for all the expressions.



# File 'lib/ruote/exp/flow_expression.rb', line 659

def reply(workitem)

  reply_to_parent(workitem)
end

- (Object) reply_to_parent(workitem, delete = true)

FlowExpressions call this method when they're done and they want their parent expression to take over (it will end up calling the #reply of the parent expression).

Expression implementations are free to override this method. The common behaviour is in #do_reply_to_parent.



# File 'lib/ruote/exp/flow_expression.rb', line 470

def reply_to_parent(workitem, delete=true)

  do_reply_to_parent(workitem, delete)
end

- (Object) root(stubborn = false)

An expensive method, looks up all the expressions with the same wfid in the storage (for some storages this is not expensive at all), and determine the root of this expression. It does this by recursively going from an expression to its parent, starting with this expression. The root is when the parent can't be reached.

By default, this method always returns an expression. If stubborn is set to true and the top expression points to a parent that is gone, nil is returned instead. The default (stubborn=false) is probably what you want anyway.



# File 'lib/ruote/exp/flow_expression.rb', line 199

def root(stubborn=false)

  previous = nil
  current = @h

  exps = @context.storage.find_expressions(
    h.fei['wfid']
  ).each_with_object({}) { |exp, h|
    h[exp['fei']] = exp
  }

  while current && current['parent_id']
    previous = current
    current = exps[current['parent_id']]
  end

  current ||= previous unless stubborn

  current ? Ruote::Exp::FlowExpression.from_h(@context, current) : nil
end
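
The effect of the stubborn flag is easiest to see on a toy data set. This is a minimal sketch, not ruote code: find_root, EXPS and ORPHAN are hypothetical names, the storage lookup is replaced by a plain Hash, and feis are simplified to strings.

```ruby
# Hypothetical stand-in for the parent walk in #root.
# exps maps a (simplified, string) fei to an expression hash.
def find_root(exps, start, stubborn = false)
  previous = nil
  current = start

  # Climb for as long as the current expression names a parent.
  while current && current['parent_id']
    previous = current
    current = exps[current['parent_id']] # nil when the parent is gone
  end

  # Lenient mode: fall back to the last expression that could be reached.
  current ||= previous unless stubborn

  current
end

EXPS = {
  '0'     => { 'fei' => '0',     'parent_id' => nil },
  '0_0'   => { 'fei' => '0_0',   'parent_id' => '0' },
  '0_0_1' => { 'fei' => '0_0_1', 'parent_id' => '0_0' }
}

# An expression whose parent is not (or no longer) in the storage.
ORPHAN = { 'fei' => '9_0', 'parent_id' => '9' }
```

With this data, find_root(EXPS, EXPS['0_0_1']) climbs to the expression '0'. For ORPHAN, the lenient default returns ORPHAN itself, while find_root(EXPS, ORPHAN, true) returns nil.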

- (Object) root_id(stubborn = false)

Returns the fei of the root expression of this expression. The result is an instance of Ruote::FlowExpressionId.

Uses #root behind the scenes, hence the stubborn option.



# File 'lib/ruote/exp/flow_expression.rb', line 225

def root_id(stubborn=false)

  root(stubborn).fei
end

- (Object) set_variable(var, val, override = false)

Sets a variable to a given value (the variable is set at the appropriate level).



# File 'lib/ruote/exp/ro_variables.rb', line 98

def set_variable(var, val, override=false)

  fexp, v = locate_set_var(var, override) || locate_var(var)

  fexp.un_set_variable(:set, v, val, (fexp.h.fei != h.fei)) if fexp
end

- (Object) tree

Returns the current version of the tree (the updated version if the tree got updated, the original one otherwise).



# File 'lib/ruote/exp/flow_expression.rb', line 883

def tree

  h.updated_tree || h.original_tree
end

- (Object) tree_children

Returns the “AST” view on the children of this expression…



# File 'lib/ruote/exp/flow_expression.rb', line 927

def tree_children

  tree[2]
end

- (Object) try_persist



# File 'lib/ruote/exp/ro_persist.rb', line 59

def try_persist

  r = @context.storage.put(@h)

  #t = Thread.current.object_id.to_s[-3..-1]
  #puts "+ per #{debug_id} #{tree[0]} r#{h._rev} t#{t} -> #{r.class}"
  #p self.h.children.collect { |i| Ruote.sid(i) }
  #Ruote.p_caller('+ per')

  r
end

- (Object) try_unpersist



# File 'lib/ruote/exp/ro_persist.rb', line 71

def try_unpersist

  r = @context.storage.delete(@h)

  #t = Thread.current.object_id.to_s[-3..-1]
  #puts "- unp #{debug_id} #{tree[0]} r#{h._rev} t#{t} -> #{r.class}"
  #Ruote.p_caller('- unp')

  return r if r

  #if h.has_error
  err = @context.storage.get('errors', "err_#{Ruote.to_storage_id(h.fei)}")
  @context.storage.delete(err) if err
  #end
    # removes any error in the journal for this expression
    # since it will now be gone, no need to keep track of its errors

  nil
end
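
The control flow of try_unpersist (return any non-nil result untouched, otherwise clean up the matching error document and return nil) can be tried against a toy storage. This is a minimal sketch under stated assumptions: MemoryStorageSketch and unpersist_sketch are hypothetical, the return convention of delete (nil on success, true when the document is already gone, the stored document on a revision mismatch) is assumed for the illustration, and the error id is derived from the document '_id' instead of Ruote.to_storage_id.

```ruby
# Hypothetical in-memory storage; the return values of #delete follow an
# assumed convention: nil = deleted, true = already gone,
# a Hash = the stored document whose revision did not match.
class MemoryStorageSketch
  def initialize(docs)
    @docs = {}
    docs.each { |d| @docs[[d['type'], d['_id']]] = d }
  end

  def get(type, id)
    @docs[[type, id]]
  end

  def delete(doc)
    key = [doc['type'], doc['_id']]
    current = @docs[key]

    return true if current.nil?                      # already gone
    return current if current['_rev'] != doc['_rev'] # stale revision

    @docs.delete(key)
    nil                                              # success
  end
end

# Same shape as try_unpersist: stop on any non-nil result, otherwise
# drop the error document recorded for the expression, if there is one.
def unpersist_sketch(storage, exp)
  r = storage.delete(exp)
  return r if r

  err = storage.get('errors', "err_#{exp['_id']}")
  storage.delete(err) if err

  nil
end

SAMPLE_EXP = { 'type' => 'expressions', '_id' => '0_0!!wfid1', '_rev' => 2 }
SAMPLE_ERR = { 'type' => 'errors', '_id' => 'err_0_0!!wfid1', '_rev' => 0 }
```

Given a storage seeded with SAMPLE_EXP and SAMPLE_ERR, unpersisting a stale copy (a different '_rev') returns the stored document and leaves the error in place. Unpersisting SAMPLE_EXP itself returns nil and removes the error document as well, and a second attempt returns true.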

- (Object) unpersist_or_raise Also known as: unpersist



# File 'lib/ruote/exp/ro_persist.rb', line 96

def unpersist_or_raise

  p_or_raise(false)
end

- (Object) unset_variable(var, override = false)

Unbinds a variable.



# File 'lib/ruote/exp/ro_variables.rb', line 107

def unset_variable(var, override=false)

  fexp, v = locate_set_var(var, override) || locate_var(var)

  fexp.un_set_variable(:unset, v, nil, (fexp.h.fei != h.fei)) if fexp
end

- (Object) update_tree(t = nil)

Updates the tree of this expression.

update_tree(t)

sets the updated tree to t.

update_tree

copies (deep copy) the original tree into updated_tree.

Adding a child to a sequence expression :

seq.update_tree
seq.updated_tree[2] << [ 'participant', { 'ref' => 'bob' }, [] ]
seq.do_persist


# File 'lib/ruote/exp/flow_expression.rb', line 904

def update_tree(t=nil)

  h.updated_tree = t || Ruote.fulldup(h.original_tree)
end
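
How #tree, #tree_children and #update_tree fit together can be shown with a small stand-alone class. This is a minimal sketch, not ruote code: TreeSketch is hypothetical, the trees live in instance variables instead of h, and Marshal is used as the deep copy in place of Ruote.fulldup (an assumption made to keep the example self-contained).

```ruby
# Hypothetical miniature of the tree bookkeeping of an expression.
class TreeSketch
  attr_reader :original_tree, :updated_tree

  def initialize(tree)
    @original_tree = tree
    @updated_tree = nil
  end

  # The updated tree wins as soon as there is one.
  def tree
    @updated_tree || @original_tree
  end

  # Third element of [ name, attributes, children ].
  def tree_children
    tree[2]
  end

  # Without argument, deep copies the original tree so that later edits
  # leave the original untouched.
  def update_tree(t = nil)
    @updated_tree = t || Marshal.load(Marshal.dump(@original_tree))
  end
end

seq = TreeSketch.new(
  ['sequence', {}, [['participant', { 'ref' => 'alice' }, []]]]
)
seq.update_tree
seq.updated_tree[2] << ['participant', { 'ref' => 'bob' }, []]
```

After these calls, seq.tree_children holds both participants while seq.original_tree still holds only the first one, which is the point of the deep copy.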

- (Object) variables

A shortcut to the variables held in the expression (nil if none held).



# File 'lib/ruote/exp/ro_variables.rb', line 35

def variables

  @h['variables']
end

- (Object) wfid

Returns the workflow instance id of the workflow this expression belongs to.



# File 'lib/ruote/exp/flow_expression.rb', line 154

def wfid

  h.fei['wfid']
end