Class: Ruote::StorageParticipant
- Inherits: Object
  - Object
  - Ruote::StorageParticipant
- Includes: Enumerable, LocalParticipant
- Defined in: lib/ruote/part/storage_participant.rb
Overview
A participant that stores the workitem in the same storage used by the engine and the worker(s).
part = engine.register_participant 'alfred', Ruote::StorageParticipant
# ... a bit later
puts "workitems still open : "
part.each do |workitem|
puts "#{workitem.fei.wfid} - #{workitem.fields['params']['task']}"
end
# ... when done with a workitem
part.reply(workitem)
# this will remove the workitem from the storage and hand it back
# to the engine
Does not thread by default: the engine will not spawn a dedicated thread to handle delivery to this participant; the workitem is simply stored via the main engine thread.
Instance Attribute Summary
- (Object) context
  Returns the value of attribute context.
Class Method Summary
+ (Boolean) matches?(hwi, pname, criteria)
  Used by #query when filtering workitems.
Instance Method Summary
- (Object) [](fei)
- (Object) all(opts = {})
  Returns all the workitems stored in here.
- (Object) by_field(field, value = nil)
  field : returns all the workitems with the given field name present.
- (Object) by_participant(participant_name, opts = {})
  Returns all workitems for the specified participant name.
- (Object) by_wfid(wfid)
  Returns all workitems for the specified wfid.
- (Object) cancel(fei, flavour)
  Removes the document/workitem from the storage.
- (Object) consume(workitem) (also: #update)
- (Object) do_not_thread
  No need for a separate thread when delivering to this participant.
- (Object) each(&block)
  Iterates over the workitems stored in here.
- (Object) fetch(fei)
- (Object) first
  A convenience method (especially when testing), returns the first (only?) workitem in the participant.
- (StorageParticipant) initialize(engine_or_options = {}, options = nil)
  constructor
  A new instance of StorageParticipant.
- (Object) per_participant
  Mostly a test method.
- (Object) per_participant_count
  Mostly a test method.
- (Object) purge!
  Cleans this participant out completely.
- (Object) query(criteria)
  Queries the store participant for workitems.
- (Object) reply(workitem)
  Removes the workitem from the storage and replies to the engine.
- (Object) size
  Returns the count of workitems stored in this participant.
Methods included from LocalParticipant
#re_dispatch, #unschedule_re_dispatch
Methods included from ReceiverMixin
#applied_workitem, #fetch_flow_expression, #launch, #receive, #reply_to_engine, #sign
Constructor Details
- (StorageParticipant) initialize(engine_or_options = {}, options = nil)
A new instance of StorageParticipant
# File 'lib/ruote/part/storage_participant.rb', line 60
def initialize(engine_or_options={}, options=nil)

  if engine_or_options.respond_to?(:context)
    @context = engine_or_options.context
  elsif engine_or_options.is_a?(Ruote::Context)
    @context = engine_or_options
  else
    @options = engine_or_options
  end

  @options ||= {}

  @store_name = @options['store_name']
end
Instance Attribute Details
- (Object) context
Returns the value of attribute context
# File 'lib/ruote/part/storage_participant.rb', line 58
def context
  @context
end
Class Method Details
+ (Boolean) matches?(hwi, pname, criteria)
Used by #query when filtering workitems.
# File 'lib/ruote/part/storage_participant.rb', line 280
def self.matches?(hwi, pname, criteria)

  return false if pname && hwi['participant_name'] != pname

  fields = hwi['fields']

  criteria.each do |fname, fvalue|
    return false if fields[fname] != fvalue
  end

  true
end
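The filter can be tried on plain hashes, without an engine or a storage. The following is a minimal sketch, assuming workitems in their hash form (hwi) carry 'participant_name' and 'fields' keys as in the listing above; matches_sketch? and the sample data are hypothetical and only mirror that logic.

```ruby
# Standalone copy of the matching logic, for illustration only
# (matches_sketch? is a hypothetical name, not part of ruote).
def matches_sketch?(hwi, pname, criteria)
  # A participant name, when given, must match exactly.
  return false if pname && hwi['participant_name'] != pname

  fields = hwi['fields']

  # Every remaining criterion must equal the corresponding field (AND).
  criteria.all? { |fname, fvalue| fields[fname] == fvalue }
end

# Sample workitem hash, made up for this sketch.
hwi = {
  'participant_name' => 'alfred',
  'fields' => { 'place' => 'nara', 'task' => 'clean' }
}

p matches_sketch?(hwi, 'alfred', { 'place' => 'nara' })   # prints true
p matches_sketch?(hwi, 'bob', { 'place' => 'nara' })      # prints false
p matches_sketch?(hwi, nil, { 'place' => 'heiankyou' })   # prints false
p matches_sketch?(hwi, nil, {})                           # prints true
```

Field names are compared as given, so criteria keyed by symbols would not match string field names here; #query converts its keys to strings before calling matches?.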
Instance Method Details
- (Object) [](fei)
# File 'lib/ruote/part/storage_participant.rb', line 106
def [](fei)

  doc = fetch(fei)

  doc ? Ruote::Workitem.new(doc) : nil
end
- (Object) all(opts = {})
Returns all the workitems stored in here.
# File 'lib/ruote/part/storage_participant.rb', line 156
def all(opts={})

  fetch_all(opts).map { |hwi| Ruote::Workitem.new(hwi) }
end
- (Object) by_field(field, value = nil)
field : returns all the workitems with the given field name present.
field and value : returns all the workitems with the given field name and the given value for that field.
Warning : only some storages are optimized for such queries (like CouchStorage), the others will load all the workitems and then filter them.
# File 'lib/ruote/part/storage_participant.rb', line 207
def by_field(field, value=nil)

  hwis = if @context.storage.respond_to?(:by_field)

    @context.storage.by_field('workitems', field, value)

  else

    fetch_all.select { |hwi|
      hwi['fields'].keys.include?(field) &&
      (value.nil? || hwi['fields'][field] == value)
    }
  end

  hwis.collect { |hwi| Ruote::Workitem.new(hwi) }
end
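The fallback branch (used when the storage has no by_field of its own) is a plain in-memory filter. A minimal sketch of that branch, assuming the workitems have already been loaded as hashes; by_field_sketch and the sample data are hypothetical:

```ruby
# Hypothetical stand-in for the fallback branch of by_field:
# keep the hashes that have the field and, if a value is given,
# whose field equals that value.
def by_field_sketch(hwis, field, value = nil)
  hwis.select do |hwi|
    hwi['fields'].key?(field) &&
      (value.nil? || hwi['fields'][field] == value)
  end
end

# Sample data, made up for this sketch.
hwis = [
  { 'fields' => { 'place' => 'nara' } },
  { 'fields' => { 'place' => 'heiankyou' } },
  { 'fields' => { 'task' => 'clean' } }
]

puts by_field_sketch(hwis, 'place').size          # prints 2
puts by_field_sketch(hwis, 'place', 'nara').size  # prints 1
```

Because a nil value means "any value", this form cannot be used to select only the workitems whose field is explicitly set to nil.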
- (Object) by_participant(participant_name, opts = {})
Returns all workitems for the specified participant name
# File 'lib/ruote/part/storage_participant.rb', line 182
def by_participant(participant_name, opts={})

  hwis = if @context.storage.respond_to?(:by_participant)

    @context.storage.by_participant('workitems', participant_name, opts)

  else

    fetch_all(opts).select { |wi|
      wi['participant_name'] == participant_name
    }
  end

  hwis.collect { |hwi| Ruote::Workitem.new(hwi) }
end
- (Object) by_wfid(wfid)
Returns all workitems for the specified wfid.
# File 'lib/ruote/part/storage_participant.rb', line 173
def by_wfid(wfid)

  @context.storage.get_many('workitems', wfid).collect { |hwi|
    Ruote::Workitem.new(hwi)
  }
end
- (Object) cancel(fei, flavour)
Removes the document/workitem from the storage
# File 'lib/ruote/part/storage_participant.rb', line 97
def cancel(fei, flavour)

  doc = fetch(fei)

  r = @context.storage.delete(doc)

  cancel(fei, flavour) if r != nil
end
- (Object) consume(workitem) Also known as: update
# File 'lib/ruote/part/storage_participant.rb', line 79
def consume(workitem)

  doc = workitem.to_h

  doc.merge!(
    'type' => 'workitems',
    '_id' => to_id(doc['fei']),
    'participant_name' => doc['participant_name'],
    'wfid' => doc['fei']['wfid'])

  doc['store_name'] = @store_name if @store_name

  @context.storage.put(doc)
end
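To see what ends up in the storage, the document construction can be reproduced on a plain hash. This is only a sketch: the to_id helper is not shown on this page, so sketch_id below is a hypothetical replacement, and the 'expid' key of the fei is an assumption made for the example.

```ruby
# Hypothetical id builder; the real to_id helper is not shown here
# and the 'expid' key is assumed for illustration.
def sketch_id(fei)
  "wi!#{fei['expid']}!#{fei['wfid']}"
end

# Mirrors the merge done in consume, on a plain workitem hash.
def build_doc_sketch(workitem_hash, store_name = nil)
  doc = workitem_hash.dup

  doc.merge!(
    'type' => 'workitems',                         # storage document type
    '_id' => sketch_id(doc['fei']),                # document key
    'participant_name' => doc['participant_name'],
    'wfid' => doc['fei']['wfid'])                  # copied to the top level

  doc['store_name'] = store_name if store_name     # only when configured
  doc
end

# Sample workitem hash, made up for this sketch.
wi = {
  'fei' => { 'expid' => '0_0', 'wfid' => '20110101-abc' },
  'participant_name' => 'alfred',
  'fields' => { 'task' => 'clean' }
}

doc = build_doc_sketch(wi, 'main')
puts doc['type']        # prints workitems
puts doc['wfid']        # prints 20110101-abc
puts doc['store_name']  # prints main
```

Copying 'wfid' to the top level of the document is presumably what lets by_wfid and query look workitems up per workflow instance without opening each fei.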
- (Object) do_not_thread
No need for a separate thread when delivering to this participant.
# File 'lib/ruote/part/storage_participant.rb', line 77
def do_not_thread; true; end
- (Object) each(&block)
Iterates over the workitems stored in here.
# File 'lib/ruote/part/storage_participant.rb', line 149
def each(&block)

  all.each { |wi| block.call(wi) }
end
- (Object) fetch(fei)
# File 'lib/ruote/part/storage_participant.rb', line 113
def fetch(fei)

  hfei = Ruote::FlowExpressionId.extract_h(fei)

  @context.storage.get('workitems', to_id(hfei))
end
- (Object) first
A convenience method (especially when testing), returns the first (only ?) workitem in the participant.
# File 'lib/ruote/part/storage_participant.rb', line 164
def first

  hwi = fetch_all.first

  hwi ? Ruote::Workitem.new(hwi) : nil
end
- (Object) per_participant
Mostly a test method. Returns a Hash where keys are participant names and values are lists of workitems.
# File 'lib/ruote/part/storage_participant.rb', line 296
def per_participant

  inject({}) { |h, wi| (h[wi.participant_name] ||= []) << wi; h }
end
- (Object) per_participant_count
Mostly a test method. Returns a Hash where keys are participant names and values are integers, the count of workitems for a given participant name.
# File 'lib/ruote/part/storage_participant.rb', line 305
def per_participant_count

  per_participant.inject({}) { |h, (k, v)| h[k] = v.size; h }
end
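Both helpers are plain inject folds over the Enumerable interface. A minimal sketch of the same grouping and counting on stand-in objects; WorkitemSketch and the two *_sketch methods are hypothetical and only need participant_name to work:

```ruby
# Hypothetical stand-in for a workitem; only participant_name matters here.
WorkitemSketch = Struct.new(:participant_name, :wfid)

# Groups workitems by participant name, as per_participant does.
def per_participant_sketch(workitems)
  workitems.inject({}) { |h, wi| (h[wi.participant_name] ||= []) << wi; h }
end

# Reduces each group to its size, as per_participant_count does.
def per_participant_count_sketch(workitems)
  per_participant_sketch(workitems).inject({}) { |h, (k, v)| h[k] = v.size; h }
end

# Sample data, made up for this sketch.
items = [
  WorkitemSketch.new('alfred', 'wfid-1'),
  WorkitemSketch.new('bob', 'wfid-1'),
  WorkitemSketch.new('alfred', 'wfid-2')
]

per_participant_count_sketch(items).each do |name, count|
  puts "#{name}: #{count}"
end
# prints:
# alfred: 2
# bob: 1
```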
- (Object) purge!
Cleans this participant out completely
# File 'lib/ruote/part/storage_participant.rb', line 273
def purge!

  fetch_all.each { |hwi| @context.storage.delete(hwi) }
end
- (Object) query(criteria)
Queries the store participant for workitems.
Some examples :
part.query(:wfid => @wfid).size
part.query('place' => 'nara').size
part.query('place' => 'heiankyou').size
part.query(:wfid => @wfid, :place => 'heiankyou').size
There are two 'reserved' criteria : 'wfid' and 'participant' ('participant_name' is accepted as well). The remaining criteria are treated as constraints on fields.
'offset' and 'limit' are reserved as well. They should prove useful for pagination. 'skip' can be used instead of 'offset'. 'count' is also consumed as an option rather than as a field constraint.
Note : the criteria are combined with AND only, you'll have to do ORs (aggregation) by yourself.
# File 'lib/ruote/part/storage_participant.rb', line 243
def query(criteria)

  cr = criteria.inject({}) { |h, (k, v)| h[k.to_s] = v; h }

  if @context.storage.respond_to?(:query_workitems)
    return @context.storage.query_workitems(cr)
  end

  opts = {}
  opts[:skip] = cr.delete('offset') || cr.delete('skip')
  opts[:limit] = cr.delete('limit')
  opts[:count] = cr.delete('count')

  wfid = cr.delete('wfid')
  pname = cr.delete('participant_name') || cr.delete('participant')

  hwis = wfid ?
    @context.storage.get_many('workitems', wfid, opts) : fetch_all(opts)

  return hwis if opts[:count]

  hwis.select { |hwi|
    Ruote::StorageParticipant.matches?(hwi, pname, cr)
  }.collect { |hwi|
    Ruote::Workitem.new(hwi)
  }
end
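How a criteria hash is split into reserved keys and field constraints can be checked in isolation. The sketch below reproduces only that part of the listing above (no storage access); split_criteria_sketch and the keys of the Hash it returns are hypothetical names chosen for the example.

```ruby
# Splits a criteria hash the way query does before it hits the storage.
# The returned Hash layout is made up for this sketch.
def split_criteria_sketch(criteria)
  # Keys are normalized to strings, so symbols and strings both work.
  cr = criteria.inject({}) { |h, (k, v)| h[k.to_s] = v; h }

  # Pagination and counting options are removed from the criteria.
  opts = {}
  opts[:skip] = cr.delete('offset') || cr.delete('skip')
  opts[:limit] = cr.delete('limit')
  opts[:count] = cr.delete('count')

  # Reserved lookups; whatever remains is a field constraint.
  wfid = cr.delete('wfid')
  pname = cr.delete('participant_name') || cr.delete('participant')

  { :opts => opts, :wfid => wfid, :pname => pname, :fields => cr }
end

parts = split_criteria_sketch(
  { :wfid => '20110101-abc', :place => 'nara', :limit => 10 })

puts parts[:wfid]            # prints 20110101-abc
puts parts[:opts][:limit]    # prints 10
puts parts[:fields].keys     # prints place
```

Two details follow from the listing: when 'count' is set, the storage result is returned as is, before any field filtering, and when the storage implements query_workitems the whole normalized criteria hash is handed over to it instead.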
- (Object) reply(workitem)
Removes the workitem from the storage and replies to the engine.
TODO : should it raise if the workitem can't be found ?
TODO : should it accept just the fei ?
# File 'lib/ruote/part/storage_participant.rb', line 125
def reply(workitem)

  # TODO: change method name (receiver mess cleanup)

  doc = fetch(Ruote::FlowExpressionId.extract_h(workitem))

  r = @context.storage.delete(doc)

  return reply(workitem) if r != nil

  workitem.h.delete('_rev')
  reply_to_engine(workitem)
end
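Both reply and cancel re-fetch the document and try again whenever delete returns something other than nil. The sketch below shows that fetch, delete, retry pattern against a fake storage; FlakyStorageSketch, its single simulated failure and remove_with_retry_sketch are all hypothetical, and what a real storage returns on failure is not specified on this page.

```ruby
# Hypothetical storage whose delete fails once before succeeding.
class FlakyStorageSketch
  attr_reader :attempts

  def initialize(doc)
    @doc = doc
    @attempts = 0
  end

  # Returns the stored document (nil once deleted).
  def get(_type, _id)
    @doc
  end

  # Returns nil on success, a non-nil value when the caller should retry.
  def delete(_doc)
    @attempts += 1
    return :retry if @attempts == 1   # simulated failure on first attempt
    @doc = nil
    nil
  end
end

# Fetches the latest document and deletes it, retrying until delete
# returns nil, as reply and cancel do in the listings above.
def remove_with_retry_sketch(storage, id)
  doc = storage.get('workitems', id)
  r = storage.delete(doc)
  return remove_with_retry_sketch(storage, id) if r != nil
  :removed
end

storage = FlakyStorageSketch.new({ '_id' => 'wi!example' })
puts remove_with_retry_sketch(storage, 'wi!example')  # prints removed
puts storage.attempts                                 # prints 2
```

Re-fetching before each attempt matters: the retry works on the latest stored version of the document rather than on the stale copy that caused the failed delete.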
- (Object) size
Returns the count of workitems stored in this participant.
# File 'lib/ruote/part/storage_participant.rb', line 142
def size

  fetch_all(:count => true)
end