Class: Ruote::StorageParticipant

Inherits:
Object
Includes:
Enumerable, LocalParticipant
Defined in:
lib/ruote/part/storage_participant.rb

Overview

A participant that stores the workitem in the same storage used by the engine and the worker(s).

part = engine.register_participant 'alfred', Ruote::StorageParticipant

# ... a bit later

puts "workitems still open : "
part.each do |workitem|
  puts "#{workitem.fei.wfid} - #{workitem.fields['params']['task']}"
end

# ... when done with a workitem

part.reply(workitem)
  # this will remove the workitem from the storage and hand it back
  # to the engine

Does not thread by default: the engine will not spawn a dedicated thread to handle delivery to this participant. The workitem is simply stored via the main engine thread.

Methods included from LocalParticipant

#re_dispatch, #unschedule_re_dispatch

Methods included from ReceiverMixin

#applied_workitem, #fetch_flow_expression, #launch, #receive, #reply_to_engine, #sign

Constructor Details

- (StorageParticipant) initialize(engine_or_options = {}, options = nil)

A new instance of StorageParticipant



# File 'lib/ruote/part/storage_participant.rb', line 60

def initialize(engine_or_options={}, options=nil)

  if engine_or_options.respond_to?(:context)
    @context = engine_or_options.context
  elsif engine_or_options.is_a?(Ruote::Context)
    @context = engine_or_options
  else
    @options = engine_or_options
  end

  @options ||= {}

  @store_name = @options['store_name']
end

Instance Attribute Details

- (Object) context

Returns the value of attribute context



# File 'lib/ruote/part/storage_participant.rb', line 58

def context
  @context
end

Class Method Details

+ (Boolean) matches?(hwi, pname, criteria)

Used by #query when filtering workitems.

Returns:

  • (Boolean)


# File 'lib/ruote/part/storage_participant.rb', line 280

def self.matches?(hwi, pname, criteria)

  return false if pname && hwi['participant_name'] != pname

  fields = hwi['fields']

  criteria.each do |fname, fvalue|
    return false if fields[fname] != fvalue
  end

  true
end
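
The filtering rule can be tried outside of ruote on plain hashes. The sketch below copies the logic of matches? into a standalone method to show that a nil participant name skips the name check and that every criterion must equal the corresponding field. The method name workitem_matches? and the sample data are illustrative assumptions, not part of ruote.

```ruby
# Standalone copy of the matching rule, for illustration only.
# `workitem_matches?` is a hypothetical name, not a ruote method.
def workitem_matches?(hwi, pname, criteria)
  # A participant name, when given, must match exactly.
  return false if pname && hwi['participant_name'] != pname

  # Every remaining criterion must equal the field of the same name.
  fields = hwi['fields']
  criteria.all? { |fname, fvalue| fields[fname] == fvalue }
end

# Sample workitem hash (made-up data).
SAMPLE_HWI = {
  'participant_name' => 'alfred',
  'fields' => { 'place' => 'nara', 'priority' => 1 }
}

puts workitem_matches?(SAMPLE_HWI, nil, 'place' => 'nara')
puts workitem_matches?(SAMPLE_HWI, 'alfred', 'place' => 'nara', 'priority' => 1)
puts workitem_matches?(SAMPLE_HWI, 'bob', {})
puts workitem_matches?(SAMPLE_HWI, 'alfred', 'place' => 'heiankyou')
```

An empty criteria hash matches any workitem that passes the participant name check, since there is no field constraint left to fail.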

Instance Method Details

- (Object) [](fei)



# File 'lib/ruote/part/storage_participant.rb', line 106

def [](fei)

  doc = fetch(fei)

  doc ? Ruote::Workitem.new(doc) : nil
end

- (Object) all(opts = {})

Returns all the workitems stored in here.



# File 'lib/ruote/part/storage_participant.rb', line 156

def all(opts={})

  fetch_all(opts).map { |hwi| Ruote::Workitem.new(hwi) }
end

- (Object) by_field(field, value = nil)

field : returns all the workitems with the given field name present.

field and value : returns all the workitems with the given field name and the given value for that field.

Warning : only some storages (like CouchStorage) are optimized for such queries; the others will load all the workitems and then filter them.



# File 'lib/ruote/part/storage_participant.rb', line 207

def by_field(field, value=nil)

  hwis = if @context.storage.respond_to?(:by_field)

    @context.storage.by_field('workitems', field, value)

  else

    fetch_all.select { |hwi|
      hwi['fields'].keys.include?(field) &&
      (value.nil? || hwi['fields'][field] == value)
    }
  end

  hwis.collect { |hwi| Ruote::Workitem.new(hwi) }
end
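
To make the two calling styles concrete, here is a minimal sketch of the fallback branch (the one used when the storage has no by_field of its own), applied to plain hashes. The method name filter_by_field and the sample data are assumptions made for this illustration.

```ruby
# Stand-in for the fallback branch of by_field, applied to plain hashes.
# `filter_by_field` and the sample data are hypothetical.
def filter_by_field(hwis, field, value = nil)
  hwis.select do |hwi|
    hwi['fields'].key?(field) &&
      (value.nil? || hwi['fields'][field] == value)
  end
end

STORED_HWIS = [
  { 'participant_name' => 'alfred', 'fields' => { 'place' => 'nara' } },
  { 'participant_name' => 'bob',    'fields' => { 'place' => 'heiankyou' } },
  { 'participant_name' => 'carol',  'fields' => { 'task' => 'clean' } }
]

# Field name only: any workitem that has the field at all.
WITH_PLACE = filter_by_field(STORED_HWIS, 'place')

# Field name and value: the field must also hold that value.
IN_NARA = filter_by_field(STORED_HWIS, 'place', 'nara')

puts WITH_PLACE.map { |h| h['participant_name'] }.inspect
puts IN_NARA.map { |h| h['participant_name'] }.inspect
```

Because a nil value means "no value constraint", this style of call cannot ask specifically for workitems whose field is set to nil.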

- (Object) by_participant(participant_name, opts = {})

Returns all workitems for the specified participant name.



# File 'lib/ruote/part/storage_participant.rb', line 182

def by_participant(participant_name, opts={})

  hwis = if @context.storage.respond_to?(:by_participant)

    @context.storage.by_participant('workitems', participant_name, opts)

  else

    fetch_all(opts).select { |wi|
      wi['participant_name'] == participant_name
    }
  end

  hwis.collect { |hwi| Ruote::Workitem.new(hwi) }
end

- (Object) by_wfid(wfid)

Returns all workitems for the specified wfid.



# File 'lib/ruote/part/storage_participant.rb', line 173

def by_wfid(wfid)

  @context.storage.get_many('workitems', wfid).collect { |hwi|
    Ruote::Workitem.new(hwi)
  }
end

- (Object) cancel(fei, flavour)

Removes the document/workitem from the storage.



# File 'lib/ruote/part/storage_participant.rb', line 97

def cancel(fei, flavour)

  doc = fetch(fei)

  r = @context.storage.delete(doc)

  cancel(fei, flavour) if r != nil
end

- (Object) consume(workitem) Also known as: update



# File 'lib/ruote/part/storage_participant.rb', line 79

def consume(workitem)

  doc = workitem.to_h

  doc.merge!(
    'type' => 'workitems',
    '_id' => to_id(doc['fei']),
    'participant_name' => doc['participant_name'],
    'wfid' => doc['fei']['wfid'])

  doc['store_name'] = @store_name if @store_name

  @context.storage.put(doc)
end

- (Object) do_not_thread

No need for a separate thread when delivering to this participant.



# File 'lib/ruote/part/storage_participant.rb', line 77

def do_not_thread; true; end

- (Object) each(&block)

Iterates over the workitems stored in here.



# File 'lib/ruote/part/storage_participant.rb', line 149

def each(&block)

  all.each { |wi| block.call(wi) }
end

- (Object) fetch(fei)



# File 'lib/ruote/part/storage_participant.rb', line 113

def fetch(fei)

  hfei = Ruote::FlowExpressionId.extract_h(fei)

  @context.storage.get('workitems', to_id(hfei))
end

- (Object) first

A convenience method (especially useful when testing). Returns the first (possibly the only) workitem in the participant.



# File 'lib/ruote/part/storage_participant.rb', line 164

def first

  hwi = fetch_all.first

  hwi ? Ruote::Workitem.new(hwi) : nil
end

- (Object) per_participant

Mostly a test method. Returns a Hash where keys are participant names and values are lists of workitems.



# File 'lib/ruote/part/storage_participant.rb', line 296

def per_participant

  inject({}) { |h, wi| (h[wi.participant_name] ||= []) << wi; h }
end

- (Object) per_participant_count

Mostly a test method. Returns a Hash where keys are participant names and values are integers, the count of workitems for a given participant name.



# File 'lib/ruote/part/storage_participant.rb', line 305

def per_participant_count

  per_participant.inject({}) { |h, (k, v)| h[k] = v.size; h }
end
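
The two inject expressions used by per_participant and per_participant_count can be run on their own. The sketch below uses a Struct as a stand-in for Ruote::Workitem; FakeWorkitem and the sample tasks are assumptions made for illustration.

```ruby
# Minimal stand-in for a workitem: only participant_name matters here.
# `FakeWorkitem` is hypothetical, not a ruote class.
FakeWorkitem = Struct.new(:participant_name, :task)

FAKE_ITEMS = [
  FakeWorkitem.new('alfred', 'wash car'),
  FakeWorkitem.new('bob', 'buy milk'),
  FakeWorkitem.new('alfred', 'walk dog')
]

# Same expression as per_participant: group workitems by participant name.
GROUPED = FAKE_ITEMS.inject({}) { |h, wi| (h[wi.participant_name] ||= []) << wi; h }

# Same expression as per_participant_count: reduce each list to its size.
COUNTS = GROUPED.inject({}) { |h, (k, v)| h[k] = v.size; h }

puts GROUPED['alfred'].map(&:task).inspect
puts COUNTS.inspect
```

Both methods walk every stored workitem, which is presumably why they are described as test helpers rather than query tools.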

- (Object) purge!

Cleans this participant out completely



# File 'lib/ruote/part/storage_participant.rb', line 273

def purge!

  fetch_all.each { |hwi| @context.storage.delete(hwi) }
end

- (Object) query(criteria)

Queries the store participant for workitems.

Some examples :

part.query(:wfid => @wfid).size
part.query('place' => 'nara').size
part.query('place' => 'heiankyou').size
part.query(:wfid => @wfid, :place => 'heiankyou').size

There are two 'reserved' criteria : 'wfid' and 'participant' ('participant_name' works as well). The remaining criteria are treated as constraints on fields.

'offset' and 'limit' are reserved as well. They should prove useful for pagination. 'skip' can be used instead of 'offset'. The source also consumes a 'count' key as an option rather than as a field constraint.

Note : criteria are combined with AND only, ORs (aggregation) have to be done by the caller.



# File 'lib/ruote/part/storage_participant.rb', line 243

def query(criteria)

  cr = criteria.inject({}) { |h, (k, v)| h[k.to_s] = v; h }

  if @context.storage.respond_to?(:query_workitems)
    return @context.storage.query_workitems(cr)
  end

  opts = {}
  opts[:skip] = cr.delete('offset') || cr.delete('skip')
  opts[:limit] = cr.delete('limit')
  opts[:count] = cr.delete('count')

  wfid = cr.delete('wfid')
  pname = cr.delete('participant_name') || cr.delete('participant')

  hwis = wfid ?
    @context.storage.get_many('workitems', wfid, opts) : fetch_all(opts)

  return hwis if opts[:count]

  hwis.select { |hwi|
    Ruote::StorageParticipant.matches?(hwi, pname, cr)
  }.collect { |hwi|
    Ruote::Workitem.new(hwi)
  }
end
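
Since criteria are ANDed, an OR has to be assembled by the caller. One way to do it, sketched here against an in-memory stand-in, is to run one query per alternative and merge the results, de-duplicating by '_id'. The FakeStore class and its data are assumptions for this example; it only mimics the key normalization and AND semantics shown above and is not ruote's storage.

```ruby
# In-memory stand-in for a storage participant; `FakeStore` is hypothetical
# and only mimics the AND semantics of query.
class FakeStore
  def initialize(hwis)
    @hwis = hwis
  end

  def query(criteria)
    # Symbol keys and string keys are treated alike, as in query.
    cr = criteria.inject({}) { |h, (k, v)| h[k.to_s] = v; h }
    wfid = cr.delete('wfid')
    pname = cr.delete('participant_name') || cr.delete('participant')

    @hwis.select do |hwi|
      (wfid.nil? || hwi['wfid'] == wfid) &&
        (pname.nil? || hwi['participant_name'] == pname) &&
        cr.all? { |k, v| hwi['fields'][k] == v }
    end
  end
end

FAKE_STORE = FakeStore.new([
  { '_id' => 'a', 'wfid' => 'w1', 'participant_name' => 'alfred',
    'fields' => { 'place' => 'nara' } },
  { '_id' => 'b', 'wfid' => 'w1', 'participant_name' => 'bob',
    'fields' => { 'place' => 'heiankyou' } },
  { '_id' => 'c', 'wfid' => 'w2', 'participant_name' => 'alfred',
    'fields' => { 'place' => 'osaka' } }
])

# OR: one AND query per alternative, then merge and de-duplicate by id.
EITHER_PLACE = (
  FAKE_STORE.query('place' => 'nara') +
  FAKE_STORE.query('place' => 'heiankyou')
).uniq { |hwi| hwi['_id'] }

puts EITHER_PLACE.map { |hwi| hwi['_id'] }.inspect
puts FAKE_STORE.query(:wfid => 'w1', :place => 'heiankyou').size
```

De-duplicating matters as soon as the alternatives can overlap, for example when OR-ing a participant criterion with a field criterion.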

- (Object) reply(workitem)

Removes the workitem from the storage and replies to the engine.

TODO : should it raise if the workitem can't be found ?
TODO : should it accept just the fei ?



# File 'lib/ruote/part/storage_participant.rb', line 125

def reply(workitem)

  # TODO: change method name (receiver mess cleanup)

  doc = fetch(Ruote::FlowExpressionId.extract_h(workitem))

  r = @context.storage.delete(doc)

  return reply(workitem) if r != nil

  workitem.h.delete('_rev')

  reply_to_engine(workitem)
end
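
Both reply and cancel call themselves again whenever the delete returns a non-nil value, which suggests that nil signals a successful delete. The sketch below shows that retry pattern in iterative form against a stub. FlakyStorage, delete_with_retry and the max_attempts bound are assumptions introduced for this example; the source above recurses without a bound.

```ruby
# Hypothetical storage stub: the first delete reports a conflict (non-nil),
# the second one succeeds (nil). This is not ruote's storage API.
class FlakyStorage
  attr_reader :attempts

  def initialize
    @attempts = 0
  end

  def delete(_doc)
    @attempts += 1
    @attempts < 2 ? :conflict : nil
  end
end

# Iterative, bounded variant of the retry used by reply and cancel.
# Returns true once a delete returns nil, false if attempts run out.
def delete_with_retry(storage, doc, max_attempts = 5)
  max_attempts.times do
    return true if storage.delete(doc).nil?
  end
  false
end

FLAKY = FlakyStorage.new
puts delete_with_retry(FLAKY, { '_id' => 'example-id' })
puts FLAKY.attempts
```

A real implementation would re-fetch the document before each new attempt, as reply and cancel do, so that the delete is issued against the latest revision.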

- (Object) size

Returns the count of workitems stored in this participant.



# File 'lib/ruote/part/storage_participant.rb', line 142

def size

  fetch_all(:count => true)
end