Class: Ruote::StorageParticipant
- Inherits: Object
  - Object
  - Ruote::StorageParticipant
- Includes: Enumerable, LocalParticipant
- Defined in: lib/ruote/part/storage_participant.rb
Overview
A participant that stores the workitem in the same storage used by the engine and the worker(s).
part = engine.register_participant 'alfred', Ruote::StorageParticipant
# ... a bit later
puts "workitems still open : "
part.each do |workitem|
puts "#{workitem.fei.wfid} - #{workitem.fields['params']['task']}"
end
# ... when done with a workitem
part.proceed(workitem)
# this will remove the workitem from the storage and hand it back
# to the engine
Does not thread by default: the engine will not spawn a dedicated thread to handle delivery to this participant; the workitem is simply stored via the main engine thread.
Instance Attribute Summary
Attributes included from LocalParticipant
#context, #error, #fei, #flavour, #msg, #workitem
Class Method Summary
- .matches?(hwi, pname, criteria) ⇒ Boolean
  Used by #query when filtering workitems.
Instance Method Summary
- #[](fei) ⇒ Object (also: #by_fei)
  Given a fei (or its string version, a sid), returns the corresponding workitem (or nil).
- #all(opts = {}) ⇒ Object
  Returns all the workitems stored in here.
- #by_field(field, value = nil, opts = {}) ⇒ Object
  field : returns all the workitems with the given field name present.
- #by_participant(participant_name, opts = {}) ⇒ Object
  Returns all workitems for the specified participant name.
- #by_wfid(wfid, opts = {}) ⇒ Object
  Returns all workitems for the specified wfid.
- #delegate(workitem, new_owner) ⇒ Object
  Delegates a currently owned workitem to a new owner.
- #do_not_thread ⇒ Object
  No need for a separate thread when delivering to this participant.
- #do_update(workitem = @workitem) ⇒ Object
  Makes sure the workitem gets saved to the storage.
- #each(&block) ⇒ Object
  Iterates over the workitems stored in here.
- #first ⇒ Object
  A convenience method (especially when testing), returns the first (only ?) workitem in the participant.
- #flunk(workitem, err_class_or_instance, *err_arguments) ⇒ Object
  Removes the workitem and hands it back to the flow with an error to raise for the participant expression that emitted the workitem.
- #initialize(engine_or_options = {}, options = nil) ⇒ StorageParticipant (constructor)
  A new instance of StorageParticipant.
- #on_cancel ⇒ Object
  Removes the document/workitem from the storage.
- #on_workitem ⇒ Object
  This is the method called by ruote when passing a workitem to this participant.
- #per_participant ⇒ Object
  Mostly a test method.
- #per_participant_count ⇒ Object
  Mostly a test method.
- #proceed(workitem) ⇒ Object
  Removes the workitem from the storage and replies to the engine.
- #purge! ⇒ Object
  Cleans this participant out completely.
- #query(criteria) ⇒ Object
  Queries the store participant for workitems.
- #reply(workitem) ⇒ Object
  (soon to be removed).
- #reserve(workitem_or_fei, owner) ⇒ Object
  Claims a workitem.
- #size ⇒ Object
  Returns the count of workitems stored in this participant.
- #update(workitem) ⇒ Object
  Used by client code when “saving” a workitem (fields may have changed).
Methods included from Enumerable
Methods included from LocalParticipant
#_accept?, #_dont_thread?, #_on_cancel, #_on_reply, #_on_workitem, #_rtimeout, #applied_workitem, #fexp, #is_cancelled?, #is_gone?, #lookup_variable, #participant_name, #re_dispatch, #reply_to_engine, #unschedule_re_dispatch
Methods included from ReceiverMixin
#fetch_flow_expression, #fetch_workitem, #launch, #receive, #sign
Constructor Details
#initialize(engine_or_options = {}, options = nil) ⇒ StorageParticipant
Returns a new instance of StorageParticipant.
# File 'lib/ruote/part/storage_participant.rb', line 58
def initialize(engine_or_options={}, options=nil)

  if engine_or_options.respond_to?(:context)
    @context = engine_or_options.context
  elsif engine_or_options.is_a?(Ruote::Context)
    @context = engine_or_options
  else
    options = engine_or_options
  end

  options ||= {}

  @store_name = options['store_name']
end
Class Method Details
.matches?(hwi, pname, criteria) ⇒ Boolean
Used by #query when filtering workitems.
# File 'lib/ruote/part/storage_participant.rb', line 331
def self.matches?(hwi, pname, criteria)

  return false if pname && hwi['participant_name'] != pname

  fields = hwi['fields']

  criteria.each do |fname, fvalue|
    return false if fields[fname] != fvalue
  end

  true
end
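The filtering rule can be tried in isolation. The sketch below is a standalone copy of the logic working on plain hashes; the top-level `matches?` helper and the sample `hwi` hash are illustrative and are not loaded from ruote. A workitem hash matches only when the participant name agrees (if one is given) and every criterion equals the corresponding field.

```ruby
# Standalone copy of the matching rule, for illustration only (not ruote code).
def matches?(hwi, pname, criteria)
  return false if pname && hwi['participant_name'] != pname
  fields = hwi['fields']
  criteria.all? { |fname, fvalue| fields[fname] == fvalue }
end

# Hypothetical workitem hash, shaped like the documents built in #on_workitem.
hwi = {
  'participant_name' => 'alfred',
  'fields' => { 'place' => 'nara', 'task' => 'clean' }
}

p matches?(hwi, 'alfred', { 'place' => 'nara' })                  # true
p matches?(hwi, 'bob', { 'place' => 'nara' })                     # false
p matches?(hwi, nil, { 'place' => 'nara', 'task' => 'cook' })     # false
p matches?(hwi, nil, {})                                          # true
```

An empty criteria hash with no participant name matches every workitem, which is why a bare query returns everything.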
Instance Method Details
#[](fei) ⇒ Object Also known as: by_fei
Given a fei (or its string version, a sid), returns the corresponding workitem (or nil).
# File 'lib/ruote/part/storage_participant.rb', line 144
def [](fei)

  doc = fetch(fei)

  doc ? Ruote::Workitem.new(doc) : nil
end
#all(opts = {}) ⇒ Object
Returns all the workitems stored in here.
# File 'lib/ruote/part/storage_participant.rb', line 209
def all(opts={})

  res = fetch_all(opts)

  res.is_a?(Array) ? res.map { |hwi| Ruote::Workitem.new(hwi) } : res
end
#by_field(field, value = nil, opts = {}) ⇒ Object
field : returns all the workitems with the given field name present.
field and value : returns all the workitems with the given field name and the given value for that field.
Warning : only some storages are optimized for such queries (like CouchStorage), the others will load all the workitems and then filter them.
# File 'lib/ruote/part/storage_participant.rb', line 257
def by_field(field, value=nil, opts={})

  (value, opts = nil, value) if value.is_a?(Hash)

  if @context.storage.respond_to?(:by_field)
    return @context.storage.by_field('workitems', field, value, opts)
  end

  do_select(opts) do |hwi|
    hwi['fields'].keys.include?(field) &&
    (value.nil? || hwi['fields'][field] == value)
  end
end
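To make the two modes concrete, here is a minimal sketch of the fallback filter applied to plain hashes. `select_by_field` and the sample data are hypothetical stand-ins; the sketch does not touch a real storage and ignores `opts`.

```ruby
# Illustrative re-creation of the load-then-filter fallback (not ruote code).
def select_by_field(hwis, field, value = nil)
  hwis.select do |hwi|
    hwi['fields'].key?(field) && (value.nil? || hwi['fields'][field] == value)
  end
end

# Hypothetical workitem hashes.
hwis = [
  { 'fields' => { 'place' => 'nara' } },
  { 'fields' => { 'place' => 'heiankyou' } },
  { 'fields' => { 'task' => 'clean' } }
]

puts select_by_field(hwis, 'place').size           # field present: 2
puts select_by_field(hwis, 'place', 'nara').size   # field and value: 1
puts select_by_field(hwis, 'colour').size          # unknown field: 0
```

Because a nil value means "any value", this style of filter cannot select workitems whose field is explicitly set to nil.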
#by_participant(participant_name, opts = {}) ⇒ Object
Returns all workitems for the specified participant name.
# File 'lib/ruote/part/storage_participant.rb', line 237
def by_participant(participant_name, opts={})

  return @context.storage.by_participant(
    'workitems', participant_name, opts
  ) if @context.storage.respond_to?(:by_participant)

  do_select(opts) do |hwi|
    hwi['participant_name'] == participant_name
  end
end
#by_wfid(wfid, opts = {}) ⇒ Object
Returns all workitems for the specified wfid.
# File 'lib/ruote/part/storage_participant.rb', line 226
def by_wfid(wfid, opts={})

  if @context.storage.respond_to?(:by_wfid)
    return @context.storage.by_wfid('workitems', wfid, opts)
  end

  wis(@context.storage.get_many('workitems', wfid, opts))
end
#delegate(workitem, new_owner) ⇒ Object
Delegates a currently owned workitem to a new owner.
Fails if the workitem can’t be found, belongs to no one, or if the workitem passed as argument is out of date (it got modified in the meantime).
It’s OK to delegate to nil, thus freeing the workitem.
See #reserve for an explanation of the reserve/delegate/proceed flow.
# File 'lib/ruote/part/storage_participant.rb', line 405
def delegate(workitem, new_owner)

  hwi = fetch(workitem)

  fail ArgumentError.new(
    "workitem not found"
  ) if hwi == nil

  fail ArgumentError.new(
    "cannot delegate, workitem doesn't belong to anyone"
  ) if hwi['owner'] == nil

  fail ArgumentError.new(
    "cannot delegate, " +
    "workitem owned by '#{hwi['owner']}', not '#{workitem.owner}'"
  ) if hwi['owner'] != workitem.owner

  hwi['owner'] = new_owner

  r = @context.storage.put(hwi, :update_rev => true)

  fail ArgumentError.new("workitem is gone") if r == true
  fail ArgumentError.new("workitem got modified meanwhile") if r != nil

  Workitem.new(hwi)
end
#do_not_thread ⇒ Object
No need for a separate thread when delivering to this participant.
# File 'lib/ruote/part/storage_participant.rb', line 75
def do_not_thread; true; end
#do_update(workitem = @workitem) ⇒ Object
Added for groups.google.com/forum/?fromgroups#!topic/openwferu-users/5bpV2yfKwM0
Makes sure the workitem gets saved to the storage. Fails if the workitem is already gone. Returns nil in case of success.
# File 'lib/ruote/part/storage_participant.rb', line 115
def do_update(workitem=@workitem)

  r = update(workitem)

  fail ArgumentError.new("workitem is gone") if r == true
  return nil if r.nil?

  r.h['fields'] = workitem.fields
  do_update(r)
end
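The retry behaviour can be sketched without a real storage. In the example below, `save_with_retry` and the `put` lambda are hypothetical stand-ins that follow the same contract as #update (nil means saved, true means gone, a Hash is the newer version); the loop re-applies the caller's fields on top of the newer version until the write is accepted.

```ruby
# Illustrative retry loop; `put` is any callable honouring the #update contract
# (nil = saved, true = gone, Hash = newer version of the document).
def save_with_retry(doc, put)
  loop do
    r = put.call(doc)
    raise ArgumentError, 'workitem is gone' if r == true
    return nil if r.nil?
    # Conflict: keep the newer document but overwrite its fields with ours.
    doc = r.merge('fields' => doc['fields'])
  end
end

# Simulated storage that reports one conflict, then accepts the write.
attempts = []
put = lambda do |doc|
  attempts << doc['_rev']
  attempts.size == 1 ? { '_rev' => 2, 'fields' => { 'task' => 'old' } } : nil
end

result = save_with_retry({ '_rev' => 1, 'fields' => { 'task' => 'new' } }, put)
p result     # nil, meaning the write eventually succeeded
p attempts   # revisions tried, in order
```

Note that this strategy is "last writer wins" for fields: whatever the other writer changed in the fields is overwritten.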
#each(&block) ⇒ Object
Iterates over the workitems stored in here.
# File 'lib/ruote/part/storage_participant.rb', line 202
def each(&block)

  all.each { |wi| block.call(wi) }
end
#first ⇒ Object
A convenience method (especially when testing), returns the first (only ?) workitem in the participant.
# File 'lib/ruote/part/storage_participant.rb', line 219
def first

  wi(fetch_all({}).first)
end
#flunk(workitem, err_class_or_instance, *err_arguments) ⇒ Object
Removes the workitem and hands it back to the flow with an error to raise for the participant expression that emitted the workitem.
# File 'lib/ruote/part/storage_participant.rb', line 169
def flunk(workitem, err_class_or_instance, *err_arguments)

  r = remove_workitem('reject', workitem)

  return flunk(workitem) if r != nil

  workitem.h.delete('_rev')

  super(workitem, err_class_or_instance, *err_arguments)
end
#on_cancel ⇒ Object
Removes the document/workitem from the storage.
Warning: this method is called by the engine (worker), i.e. not by you.
# File 'lib/ruote/part/storage_participant.rb', line 130
def on_cancel

  doc = fetch(fei)

  return unless doc

  r = @context.storage.delete(doc)

  on_cancel(fei, flavour) if r != nil
end
#on_workitem ⇒ Object
This is the method called by ruote when passing a workitem to this participant.
# File 'lib/ruote/part/storage_participant.rb', line 80
def on_workitem

  doc = workitem.to_h

  doc.merge!(
    'type' => 'workitems',
    '_id' => to_id(doc['fei']),
    'participant_name' => doc['participant_name'],
    'wfid' => doc['fei']['wfid'])

  doc['store_name'] = @store_name if @store_name

  @context.storage.put(doc)
end
#per_participant ⇒ Object
Mostly a test method. Returns a Hash where keys are participant names and values are lists of workitems.
# File 'lib/ruote/part/storage_participant.rb', line 347
def per_participant

  each_with_object({}) { |wi, h| (h[wi.participant_name] ||= []) << wi }
end
#per_participant_count ⇒ Object
Mostly a test method. Returns a Hash where keys are participant names and values are integers, the count of workitems for a given participant name.
# File 'lib/ruote/part/storage_participant.rb', line 356
def per_participant_count

  per_participant.remap { |(k, v), h| h[k] = v.size }
end
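The shape of the two hashes is easy to see with plain Ruby. The sketch below uses a hypothetical `Item` struct in place of real workitems and the core `group_by` and `transform_values` methods instead of the library's own helpers, so it only illustrates the result, not the implementation.

```ruby
# Stand-in for workitems: only participant_name matters here (hypothetical).
Item = Struct.new(:participant_name)

items = [Item.new('alfred'), Item.new('bob'), Item.new('alfred')]

# participant name => list of items
per_participant = items.group_by(&:participant_name)

# participant name => number of items
per_participant_count = per_participant.transform_values(&:size)

p per_participant.keys
p per_participant_count
```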
#proceed(workitem) ⇒ Object
Removes the workitem from the storage and replies to the engine.
# File 'lib/ruote/part/storage_participant.rb', line 155
def proceed(workitem)

  r = remove_workitem('proceed', workitem)

  return proceed(workitem) if r != nil

  workitem.h.delete('_rev')

  reply_to_engine(workitem)
end
#purge! ⇒ Object
Cleans this participant out completely.
# File 'lib/ruote/part/storage_participant.rb', line 324
def purge!

  fetch_all({}).each { |hwi| @context.storage.delete(hwi) }
end
#query(criteria) ⇒ Object
Queries the store participant for workitems.
Some examples :
part.query(:wfid => @wfid).size
part.query('place' => 'nara').size
part.query('place' => 'heiankyou').size
part.query(:wfid => @wfid, :place => 'heiankyou').size
There are two ‘reserved’ criteria: ‘wfid’ and ‘participant’ (‘participant_name’ as well). The rest of the criteria are considered constraints for fields.
‘offset’ and ‘limit’ are reserved as well. They should prove useful for pagination. ‘skip’ can be used instead of ‘offset’.
Note : the criteria are combined with AND only, you’ll have to do ORs (aggregation) by yourself.
# File 'lib/ruote/part/storage_participant.rb', line 290
def query(criteria)

  cr = Ruote.keys_to_s(criteria)

  if @context.storage.respond_to?(:query_workitems)
    return @context.storage.query_workitems(cr)
  end

  opts = {}
  opts[:skip] = cr.delete('offset') || cr.delete('skip')
  opts[:limit] = cr.delete('limit')
  opts[:count] = cr.delete('count')

  wfid = cr.delete('wfid')

  count = opts[:count]

  pname = cr.delete('participant_name') || cr.delete('participant')
  opts.delete(:count) if pname

  hwis = wfid ?
    @context.storage.get_many('workitems', wfid, opts) : fetch_all(opts)

  return hwis unless hwis.is_a?(Array)

  hwis = hwis.select { |hwi|
    Ruote::StorageParticipant.matches?(hwi, pname, cr)
  }

  count ? hwis.size : wis(hwis)
end
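Since criteria are AND only, an OR has to be assembled by the caller. One possible approach, sketched below on plain hashes, is to run one query per alternative and merge the results, dropping duplicates by document id. The `query` helper here is an illustrative AND-only filter, not the ruote method, and the sample documents are made up.

```ruby
# Illustrative AND-only filter over plain hashes (not the ruote #query).
def query(hwis, criteria)
  hwis.select { |hwi| criteria.all? { |k, v| hwi['fields'][k] == v } }
end

# Hypothetical stored workitem documents.
hwis = [
  { '_id' => 'a', 'fields' => { 'place' => 'nara' } },
  { '_id' => 'b', 'fields' => { 'place' => 'heiankyou' } },
  { '_id' => 'c', 'fields' => { 'place' => 'edo' } }
]

# OR = union of two AND queries, de-duplicated by document id.
merged = query(hwis, { 'place' => 'nara' }) + query(hwis, { 'place' => 'heiankyou' })
either = merged.uniq { |hwi| hwi['_id'] }

p either.map { |hwi| hwi['_id'] }
```

De-duplicating matters as soon as the alternatives can overlap, for instance when the two queries constrain different fields.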
#reply(workitem) ⇒ Object
(soon to be removed)
# File 'lib/ruote/part/storage_participant.rb', line 182
def reply(workitem)

  puts '-' * 80
  puts '*** WARNING : please use the Ruote::StorageParticipant#proceed(wi)'
  puts ' instead of #reply(wi) which is deprecated'
  #caller.each { |l| puts l }
  puts '-' * 80

  proceed(workitem)
end
#reserve(workitem_or_fei, owner) ⇒ Object
Claims a workitem. Returns the [updated] workitem if successful.
Returns nil if the workitem is already reserved.
Fails if the workitem can’t be found, is gone, or got modified elsewhere.
Here is a mini-diagram explaining the reserve/delegate/proceed flow:
   in      delegate(nil)    delegate(other)
   |    +---------------+   +------+
   v    v               |   |      v
 +-------+  reserve   +----------+  proceed
 | ready | ---------> | reserved | ---------> out
 +-------+            +----------+
# File 'lib/ruote/part/storage_participant.rb', line 377
def reserve(workitem_or_fei, owner)

  hwi = fetch(workitem_or_fei)

  fail ArgumentError.new("workitem not found") if hwi.nil?

  return nil if hwi['owner'] && hwi['owner'] != owner

  hwi['owner'] = owner

  r = @context.storage.put(hwi, :update_rev => true)

  fail ArgumentError.new("workitem is gone") if r == true
  fail ArgumentError.new("workitem got modified meanwhile") if r != nil

  Workitem.new(hwi)
end
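The ownership rules behind the reserve/delegate/proceed flow can be modelled in a few lines. The `Desk` class below is a hypothetical in-memory stand-in that tracks only owners by workitem id; it leaves out revisions and the storage entirely, so it shows the state transitions rather than the real implementation.

```ruby
# In-memory model of the ownership rules; all names are illustrative.
class Desk
  def initialize
    @owners = {} # workitem id => owner (nil means "ready")
  end

  def add(id)
    @owners[id] = nil
  end

  # Returns the id when the claim succeeds, nil when someone else holds it.
  def reserve(id, owner)
    raise ArgumentError, 'workitem not found' unless @owners.key?(id)
    return nil if @owners[id] && @owners[id] != owner
    @owners[id] = owner
    id
  end

  # Hands a reserved workitem to new_owner (nil frees it again).
  def delegate(id, current_owner, new_owner)
    raise ArgumentError, 'workitem not found' unless @owners.key?(id)
    raise ArgumentError, "workitem doesn't belong to anyone" if @owners[id].nil?
    raise ArgumentError, 'not the owner' if @owners[id] != current_owner
    @owners[id] = new_owner
    id
  end

  # Removes the workitem: it leaves the participant.
  def proceed(id)
    @owners.delete(id)
  end

  def owner(id)
    @owners[id]
  end
end

desk = Desk.new
desk.add('wi0')
p desk.reserve('wi0', 'alice')        # claim succeeds
p desk.reserve('wi0', 'bob')          # nil, already reserved by alice
desk.delegate('wi0', 'alice', 'bob')  # alice hands it to bob
p desk.owner('wi0')
desk.delegate('wi0', 'bob', nil)      # bob frees it, back to "ready"
p desk.reserve('wi0', 'alice')        # claim succeeds again
```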
#size ⇒ Object
Returns the count of workitems stored in this participant.
# File 'lib/ruote/part/storage_participant.rb', line 195
def size

  fetch_all(:count => true)
end
#update(workitem) ⇒ Object
Used by client code when “saving” a workitem (fields may have changed). Calling #update won’t proceed the workitem.
Returns nil in case of success, true if the workitem is already gone, and the newer version of the workitem if the workitem changed in the meantime.
# File 'lib/ruote/part/storage_participant.rb', line 102
def update(workitem)

  r = @context.storage.put(workitem.h)

  r.is_a?(Hash) ? Ruote::Workitem.new(r) : r
end
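Client code has to tell the three outcomes apart. The sketch below does so with a `case` statement against a hypothetical `FakeStorage` whose `put` mimics the documented contract with a simple integer revision; real storages and their revision format may differ.

```ruby
# Hypothetical stand-in for a storage; it only mimics the three outcomes of put.
class FakeStorage
  def initialize
    @docs = {}
  end

  def seed(doc)
    @docs[doc['_id']] = doc.dup
  end

  def delete(id)
    @docs.delete(id)
  end

  # nil on success, true if the document is gone,
  # the current document if the caller's revision is stale.
  def put(doc)
    current = @docs[doc['_id']]
    return true if current.nil?
    return current.dup if current['_rev'] != doc['_rev']
    @docs[doc['_id']] = doc.merge('_rev' => doc['_rev'] + 1)
    nil
  end
end

# Turns a put/update result into a readable status.
def describe(result)
  case result
  when nil then 'saved'
  when true then 'gone'
  else "stale, latest rev is #{result['_rev']}"
  end
end

storage = FakeStorage.new
storage.seed({ '_id' => 'wi0', '_rev' => 1, 'fields' => {} })

doc = { '_id' => 'wi0', '_rev' => 1, 'fields' => { 'task' => 'clean' } }
puts describe(storage.put(doc))   # first write is accepted
puts describe(storage.put(doc))   # same revision again is now stale
storage.delete('wi0')
puts describe(storage.put(doc))   # document no longer exists
```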