Class: GirlFriday::WorkQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/girl_friday/work_queue.rb

Defined Under Namespace

Classes: Ready, Shutdown, Work

Instance Attribute Summary (collapse)

Class Method Summary (collapse)

Instance Method Summary (collapse)

Constructor Details

- (WorkQueue) initialize(name, options = {}, &block)

Returns a new instance of WorkQueue

Raises:

  • (ArgumentError)


9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/girl_friday/work_queue.rb', line 9

def initialize(name, options={}, &block)
  raise ArgumentError, "#{self.class.name} requires a block" unless block_given?
  @name = name.to_s
  @size = options[:size] || 5
  @processor = block
  @error_handlers = (Array(options[:error_handler] || ErrorHandler.default)).map(&:new)

  @shutdown = false
  @busy_workers = []
  @ready_workers = nil
  @created_at = Time.now.to_i
  @total_processed = @total_errors = @total_queued = 0
  @persister = (options[:store] || Store::InMemory).new(name, (options[:store_config] || {}))
  @weakref = WeakRef.new(self)
  start
  GirlFriday.add_queue @weakref
end

Instance Attribute Details

- (Object) name (readonly)

Returns the value of attribute name



8
9
10
# File 'lib/girl_friday/work_queue.rb', line 8

def name
  @name
end

Class Method Details

+ (Object) immediate!



27
28
29
30
# File 'lib/girl_friday/work_queue.rb', line 27

def self.immediate!
  alias_method :push, :push_immediately
  alias_method :<<, :push_immediately
end

+ (Object) queue!



32
33
34
35
# File 'lib/girl_friday/work_queue.rb', line 32

def self.queue!
  alias_method :push, :push_async
  alias_method :<<, :push_async
end

Instance Method Details

- (Object) push_async(work, &block) Also known as: push, <<



43
44
45
# File 'lib/girl_friday/work_queue.rb', line 43

def push_async(work, &block)
  @supervisor << Work[work, block]
end

- (Object) push_immediately(work, &block)



37
38
39
40
41
# File 'lib/girl_friday/work_queue.rb', line 37

def push_immediately(work, &block)
  result = @processor.call(work)
  return yield result if block
  result
end

- (Object) shutdown(&block)



73
74
75
76
77
# File 'lib/girl_friday/work_queue.rb', line 73

def shutdown(&block)
  # Runtime state should never be modified by caller thread,
  # only the Supervisor thread.
  @supervisor << Shutdown[block]
end

- (Object) status



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/girl_friday/work_queue.rb', line 49

def status
  { @name => {
      :pid => $$,
      :pool_size => @size,
      :ready => @ready_workers ? @ready_workers.size : 0,
      :busy => @busy_workers.size,
      :backlog => @persister.size,
      :total_queued => @total_queued,
      :total_processed => @total_processed,
      :total_errors => @total_errors,
      :uptime => Time.now.to_i - @created_at,
      :created_at => @created_at,
    }
  }
end

- (Object) wait_for_empty

Busy wait for the queue to empty. Useful for testing.



67
68
69
70
71
# File 'lib/girl_friday/work_queue.rb', line 67

def wait_for_empty
  until @persister.size == 0
    sleep 0.1
  end
end