Module: Ruote::LocalParticipant

Includes:
ReceiverMixin
Included in:
BlockParticipant, CodeParticipant, EngineParticipant, Participant, RevParticipant, SmtpParticipant, StorageParticipant
Defined in:
lib/ruote/part/local_participant.rb

Overview

Provides methods for 'local' participants.

Assumes the class that includes this module has a #context method that points to the worker or engine ruote context.

It's “local” because it has access to the ruote storage.

Instance Attribute Summary (collapse)

Instance Method Summary (collapse)

Methods included from ReceiverMixin

#fetch_flow_expression, #fetch_workitem, #flunk, #launch, #receive, #sign

Instance Attribute Details

- (Object) context

The engine context, it's a local participant so it knows about the context in which the engine operates…



46
47
48
# File 'lib/ruote/part/local_participant.rb', line 46

def context
  @context
end

- (Object) error

Set right before a call to #on_error



62
63
64
# File 'lib/ruote/part/local_participant.rb', line 62

def error
  @error
end

- (Object) fei

Returns the current fei (Ruote::FlowExpressionId).



83
84
85
86
# File 'lib/ruote/part/local_participant.rb', line 83

def fei

  @fei ? @fei : @workitem.fei
end

- (Object) flavour

Usually set right before a call to #on_cancel or #cancel



58
59
60
# File 'lib/ruote/part/local_participant.rb', line 58

def flavour
  @flavour
end

- (Object) msg

Set right before a call to #on_error



66
67
68
# File 'lib/ruote/part/local_participant.rb', line 66

def msg
  @msg
end

- (Object) workitem(fei = nil)

Returns the current workitem if no fei is given. If a fei is given, it will return the applied workitem for that fei (if any).

The optional fei is mostly here for backward compatibility (with 2.2.0)



74
75
76
77
78
79
# File 'lib/ruote/part/local_participant.rb', line 74

def workitem(fei=nil)

  return fetch_workitem(fei) if fei

  @workitem ? @workitem : applied_workitem
end

Instance Method Details

- (Boolean) _accept?(wi)

Test shortcut, alleviates the need to set the workitem before calling accept?



274
275
276
# File 'lib/ruote/part/local_participant.rb', line 274

def _accept?(wi)
  Ruote.participant_send(self, :accept?, 'workitem' => wi)
end

- (Boolean) _dont_thread?(wi) Also known as: _do_not_thread, _do_not_thread?

Test shortcut, alleviates the need to set the workitem before calling dont_thread?, do_not_thread? or do_not_thread.



281
282
283
284
285
286
# File 'lib/ruote/part/local_participant.rb', line 281

def _dont_thread?(wi)
  Ruote.participant_send(
    self,
    [ :dont_thread?, :do_not_thread?, :do_not_thread ],
    'workitem' => wi)
end

- (Object) _on_cancel(fei, flavour) Also known as: _cancel

Test shortcut, alleviates the need to set fei and flavour before calling cancel / on_consume.



258
259
260
261
# File 'lib/ruote/part/local_participant.rb', line 258

def _on_cancel(fei, flavour)
  Ruote.participant_send(
    self, [ :on_cancel, :cancel ], 'fei' => fei, 'flavour' => flavour)
end

- (Object) _on_reply(wi)

Test shortcut, alleviates the need to set the workitem before calling on_reply.



267
268
269
# File 'lib/ruote/part/local_participant.rb', line 267

def _on_reply(wi)
  Ruote.participant_send(self, :on_reply, 'workitem' => wi)
end

- (Object) _on_workitem(wi) Also known as: _consume

Test shortcut, alleviates the need to set the workitem before calling consume / on_workitem.



249
250
251
252
# File 'lib/ruote/part/local_participant.rb', line 249

def _on_workitem(wi)
  Ruote.participant_send(
    self, [ :on_workitem, :consume ], 'workitem' => wi)
end

- (Object) _rtimeout(wi)

Test shortcut, alleviates the need to set the workitem before calling rtimeout.



293
294
295
# File 'lib/ruote/part/local_participant.rb', line 293

def _rtimeout(wi)
  Ruote.participant_send(self, :rtimeout, 'workitem' => wi)
end

- (Object) applied_workitem(_fei = nil)

Returns the workitem as was applied when the Ruote::ParticipantExpression was reached.

If the _fei arg is specified, it will return the corresponding applied workitem. This args is mostly here for backward compatibility.



105
106
107
108
# File 'lib/ruote/part/local_participant.rb', line 105

def applied_workitem(_fei=nil)

  fetch_workitem(_fei || fei)
end

- (Object) fexp(wi_or_fei = nil)

Returns the Ruote::ParticipantExpression that corresponds with this participant.

If a wi_or_fei arg is given, will return the corresponding flow expression. This arg is mostly here for backward compatibility.



94
95
96
97
# File 'lib/ruote/part/local_participant.rb', line 94

def fexp(wi_or_fei=nil)

  flow_expression(wi_or_fei || fei)
end

- (Boolean) is_cancelled? Also known as: is_canceled?

Returns true if the underlying participant expression is gone or cancelling.



308
309
310
311
312
313
314
315
# File 'lib/ruote/part/local_participant.rb', line 308

def is_cancelled?

  if fe = fexp
    return fe.h.state == 'cancelling'
  else
    true
  end
end

- (Boolean) is_gone?

Returns true if the underlying participant expression is 'gone' (probably cancelled somehow).



300
301
302
303
# File 'lib/ruote/part/local_participant.rb', line 300

def is_gone?

  fexp.nil?
end

- (Object) lookup_variable(key)

A shortcut for

fexp.lookup_variable(key)


128
129
130
131
# File 'lib/ruote/part/local_participant.rb', line 128

def lookup_variable(key)

  fexp.lookup_variable(key)
end

- (Object) participant_name

Up until ruote 2.3.0, the participant name had to be fetched from the workitem. This is a shortcut, it lets you write participant code that look like

def on_workitem
  (workitem.fields['supervisors'] || []) << participant_name
  reply
end


119
120
121
122
# File 'lib/ruote/part/local_participant.rb', line 119

def participant_name

  workitem.participant_name
end

- (Object) re_dispatch(wi = nil, opts = nil) Also known as: reject

Use this method to re_dispatch the workitem.

It takes two options :in and :at for “later re_dispatch”.

Look at the unschedule_re_dispatch method for an example of participant implementation that uses re_dispatch.

Without one of those options, the method is a “reject”.



156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/ruote/part/local_participant.rb', line 156

def re_dispatch(wi=nil, opts=nil)

  wi, opts = [ nil, wi ] if wi.is_a?(Hash) && opts.nil?
  wi ||= workitem()
  opts ||= {}

  wi.h.re_dispatch_count = wi.h.re_dispatch_count.to_s.to_i + 1

  msg = {
    'action' => 'dispatch',
    'fei' => wi.h.fei,
    'workitem' => wi.to_h,
    'participant_name' => wi.participant_name
  }

  if t = opts[:in] || opts[:at]

    sched_id = @context.storage.put_schedule('at', wi.h.fei, t, msg)

    exp = fexp(wi)
    exp.h['re_dispatch_sched_id'] = sched_id
    exp.try_persist

  else

    @context.storage.put_msg('dispatch', msg)
  end
end

- (Object) reply_to_engine(wi = workitem) Also known as: reply

Participant implementations call this method when their #on_workitem (#consume) methods are done and they want to hand back the workitem to the engine so that the flow can resume.

the (wi=workitem) is mostly for backward compatibility (or for passing a totally different workitem to the engine).



140
141
142
143
# File 'lib/ruote/part/local_participant.rb', line 140

def reply_to_engine(wi=workitem)

  receive(wi)
end

- (Object) unschedule_re_dispatch(fei = nil)

Cancels the scheduled re_dispatch, if any.

An example of 'retrying participant' :

class RetryParticipant
  include Ruote::LocalParticipant

  def initialize(opts)
    @opts = opts
  end

  def on_workitem
    begin
      do_the_job
      reply
    rescue
      re_dispatch(:in => @opts['delay'] || '1s')
    end
  end

  def cancel
    unschedule_re_dispatch
  end
end

Note how unschedule_re_dispatch is used in the cancel method. Warning, this example could loop forever…



213
214
215
216
217
218
# File 'lib/ruote/part/local_participant.rb', line 213

def unschedule_re_dispatch(fei=nil)

  if s = fexp.h.re_dispatch_sched_id
    @context.storage.delete_schedule(s)
  end
end