Class: RSMP::Collector

Inherits:
Listener show all
Defined in:
lib/rsmp/collector.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Listener

#listen

Methods included from Inspect

#inspector

Constructor Details

#initialize(proxy, options = {}) ⇒ Collector

Returns a new instance of Collector.


10
11
12
13
14
15
16
17
18
19
# File 'lib/rsmp/collector.rb', line 10

# Builds a collector attached to +proxy+.
#
# Recognised options read here:
#   :ingoing  - collect incoming messages; treated as true when absent/nil
#   :outgoing - collect outgoing messages; treated as false when absent/nil
#   :num      - number of messages after which collection is complete
#
# The options hash is stored by reference (not copied), so later writes to
# @options (e.g. the :timeout override in #collect) are visible to the caller.
def initialize proxy, options={}
  super proxy, options

  ingoing = options[:ingoing]
  outgoing = options[:outgoing]

  @options = options
  @num = options[:num]
  @ingoing = ingoing.nil? ? true : ingoing
  @outgoing = outgoing.nil? ? false : outgoing

  @messages = []
  @done = false
  @condition = Async::Notification.new
end

Instance Attribute Details

#condition ⇒ Object (readonly)

Returns the value of attribute condition.


8
9
10
# File 'lib/rsmp/collector.rb', line 8

# Returns the notification object created in #initialize
# (an Async::Notification). #notify signals it once the requested number of
# messages has been stored; #wait and #collect wait on it.
def condition
  @condition
end

#done ⇒ Object (readonly)

Returns the value of attribute done.


8
9
10
# File 'lib/rsmp/collector.rb', line 8

# Returns the completion flag. It is initialised to false and set to true by
# #notify once at least @num matching messages have been stored.
def done
  @done
end

#messages ⇒ Object (readonly)

Returns the value of attribute messages.


8
9
10
# File 'lib/rsmp/collector.rb', line 8

# Returns the collected messages. This starts out as an empty array that
# #notify appends to. Note that #collect reassigns it to its own return
# value: a single message when exactly one was requested, otherwise an array
# holding at most the requested number of messages.
def messages
  @messages
end

Instance Method Details

#collect(task, options = {}, &block) ⇒ Object


43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/rsmp/collector.rb', line 43

# Waits (unless collection is already done) for messages to be collected and
# returns them.
#
# task    - object responding to with_timeout(seconds) { ... }
#           (presumably an Async::Task - confirm against callers)
# options - :num and :timeout, when given, override the values supplied to
#           the constructor; :timeout is written into the stored options hash
# block   - optional extra filter, stored and consulted by #matches?
#
# Returns a single message when exactly one was requested, otherwise an array
# with at most the requested number of messages. The same value is also
# stored in @messages, replacing the collected array.
def collect task, options={}, &block
  num = options[:num]
  timeout = options[:timeout]
  @num = num if num
  @options[:timeout] = timeout if timeout
  @block = block

  unless @done
    # listen is inherited from Listener; the wait is bounded by the timeout
    listen do
      task.with_timeout(@options[:timeout]) { @condition.wait }
    end
  end

  # unwrap when one message was requested; otherwise never hand back more
  # messages than were asked for
  @messages = @num == 1 ? @messages.first : @messages.first(@num)
end

#collect_for(task, duration) ⇒ Object


37
38
39
40
41
# File 'lib/rsmp/collector.rb', line 37

# Listens for messages for a fixed period of time.
#
# task     - object responding to sleep(duration)
#            (presumably an Async::Task - confirm against callers)
# duration - how long to sleep while listening, passed straight to task.sleep
#
# Uses #listen, the block helper inherited from Listener that #collect also
# relies on; the previously called `siphon` is not defined on this class or
# listed among the inherited methods.
def collect_for task, duration
  listen do
    task.sleep duration
  end
end

#ingoing? ⇒ Boolean

Returns:

  • (Boolean)

25
26
27
# File 'lib/rsmp/collector.rb', line 25

# Whether incoming messages are collected.
# Strict check: only the literal value true counts, other truthy values do not.
def ingoing?
  @ingoing.equal?(true)
end

#inspect ⇒ Object


21
22
23
# File 'lib/rsmp/collector.rb', line 21

# Short description of the collector: class name, object id and whatever
# inspector (from the Inspect mixin) renders for @messages.
def inspect
  format('#<%s:%s, %s>', self.class.name, object_id, inspector(:@messages))
end

#matches?(message) ⇒ Boolean

Returns:

  • (Boolean)

Raises:

  • (ArgumentError)

84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/rsmp/collector.rb', line 84

# Decides whether +message+ satisfies the collector's filters.
#
# Filters, each applied only when configured:
#   @options[:type]      - a single type or an array of accepted types,
#                          compared with message.type
#   @options[:component] - rejects messages whose 'cId' attribute is present
#                          and differs; messages without 'cId' pass
#   @block               - rejects only when the block returns exactly false
#                          (a nil result still counts as a match)
#
# Returns true or false.
# Raises ArgumentError when message is nil/false.
def matches? message
  raise ArgumentError, 'message is required' unless message

  wanted_type = @options[:type]
  if wanted_type
    if wanted_type.is_a? Array
      return false unless wanted_type.include? message.type
    else
      return false unless message.type == wanted_type
    end
  end

  wanted_component = @options[:component]
  if wanted_component
    component_id = message.attributes['cId']
    return false if component_id && component_id != wanted_component
  end

  return false if @block && @block.call(message) == false

  true
end

#notify(message) ⇒ Object

Raises:

  • (ArgumentError)

69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/rsmp/collector.rb', line 69

# Offers +message+ to the collector.
#
# Once collection is done the message is ignored and true is returned.
# Messages whose direction is switched off (:in with @ingoing == false,
# :out with @outgoing == false) and messages rejected by #matches? are
# ignored. A matching message is appended to @messages; when @num is set and
# that many messages have been stored, the collector marks itself done,
# removes itself from the proxy and signals @condition.
#
# Raises ArgumentError when message is nil/false.
def notify message
  raise ArgumentError unless message
  return true if @done

  direction = message.direction
  return if direction == :in && @ingoing == false
  return if direction == :out && @outgoing == false
  return unless matches? message

  @messages << message
  return unless @num && @messages.size >= @num

  # enough messages: stop listening and wake up whoever is waiting
  @done = true
  @proxy.remove_listener self
  @condition.signal
end

#outgoing? ⇒ Boolean

Returns:

  • (Boolean)

29
30
31
# File 'lib/rsmp/collector.rb', line 29

# Whether outgoing messages are collected.
# Strict check: only the literal value true counts, other truthy values do not.
def outgoing?
  @outgoing.equal?(true)
end

#reset ⇒ Object


64
65
66
67
# File 'lib/rsmp/collector.rb', line 64

# Discards collected messages and marks the collector as not done, so it can
# collect again.
#
# A fresh array is assigned rather than clearing in place: #collect may have
# replaced @messages with a single message (which has no array semantics) or
# handed the very same array to its caller, and neither should be disturbed.
def reset
  @messages = []
  @done = false
end

#wait ⇒ Object


33
34
35
# File 'lib/rsmp/collector.rb', line 33

# Waits on @condition, which #notify signals once the requested number of
# messages has been stored. Unlike #collect, no timeout is applied here.
# Returns whatever @condition.wait returns.
def wait
  @condition.wait
end