Module: GirlFriday

Defined in:
lib/girl_friday/server.rb,
lib/girl_friday.rb,
lib/girl_friday/batch.rb,
lib/girl_friday/version.rb,
lib/girl_friday/work_queue.rb,
lib/girl_friday/persistence.rb,
lib/girl_friday/error_handler.rb

Overview

QUEUE1 = GirlFriday::Queue.new('ham_cannon', :size => 15) do |msg|

puts msg

end QUEUE2 = GirlFriday::Queue.new('image_crawler', :size => 5) do |msg|

puts msg

end

Defined Under Namespace

Modules: ErrorHandler, Store Classes: Batch, Server, WorkQueue

Constant Summary

VERSION =
"0.9.7"
Queue =
WorkQueue

Class Method Summary (collapse)

Class Method Details

+ (Object) add_queue(ref)



22
23
24
25
26
27
28
# File 'lib/girl_friday.rb', line 22

def self.add_queue(ref)
  @lock.synchronize do
    @queues ||= []
    @queues.reject! { |q| !q.weakref_alive? }
    @queues << ref
  end
end

+ (Object) queues



36
37
38
# File 'lib/girl_friday.rb', line 36

def self.queues
  @queues || []
end

+ (Object) remove_queue(ref)



30
31
32
33
34
# File 'lib/girl_friday.rb', line 30

def self.remove_queue(ref)
  @lock.synchronize do
    @queues.delete ref
  end
end

+ (Object) shutdown!(timeout = 30)

Notify girl_friday to shutdown ASAP. Workers will not pick up any new work; any new work pushed onto the queues will be pushed onto the backlog (and persisted). This method will block until all queues are quiet or the timeout has passed.

Note that shutdown! just works with existing queues. If you create a new queue, it will act as normal.



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/girl_friday.rb', line 58

def self.shutdown!(timeout=30)
  qs = queues.select { |q| q.weakref_alive? }
  count = qs.size

  if count > 0
    m = Mutex.new
    var = ConditionVariable.new

    qs.each do |q|
      next if !q.weakref_alive?
      begin
        q.__getobj__.shutdown do |queue|
          m.synchronize do
            count -= 1
            var.signal if count == 0
          end
        end
      rescue WeakRef::RefError
        m.synchronize do
          count -= 1
          var.signal if count == 0
        end
      end
    end

    m.synchronize do
      var.wait(m, timeout) if count != 0
    end
  end
  count
end

+ (Object) status



40
41
42
43
44
45
46
47
48
# File 'lib/girl_friday.rb', line 40

def self.status
  queues.inject({}) do |memo, queue|
    begin
      memo = memo.merge(queue.__getobj__.status)
    rescue WeakRef::RefError
    end
    memo
  end
end