Class: RubyChannel::Channel

Inherits:
Object
  • Object
show all
Defined in:
lib/ruby_channel/channel.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeChannel

Creates a new channel.



8
9
10
11
12
# File 'lib/ruby_channel/channel.rb', line 8

def initialize
  @queue   = []
  @waiting = []
  @mutex   = Mutex.new
end

Instance Attribute Details

#mutexObject (readonly)

Returns the value of attribute mutex.



3
4
5
# File 'lib/ruby_channel/channel.rb', line 3

def mutex
  @mutex
end

#queueObject (readonly)

Returns the value of attribute queue.



3
4
5
# File 'lib/ruby_channel/channel.rb', line 3

def queue
  @queue
end

Instance Method Details

#clearObject

Removes all objects from the channel.



90
91
92
# File 'lib/ruby_channel/channel.rb', line 90

def clear
  @queue.clear
end

#empty?Boolean

Returns true if the channel is empty.

Returns:

  • (Boolean)


83
84
85
# File 'lib/ruby_channel/channel.rb', line 83

def empty?
  @queue.empty?
end

#lengthObject Also known as: size

Returns the length of the channel.



97
98
99
# File 'lib/ruby_channel/channel.rb', line 97

def length
  @queue.length
end

#popObject

Retrieves data from channel. If the channel is empty, the calling thread is suspended until data is pushed onto channel.



38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/ruby_channel/channel.rb', line 38

def pop
  @mutex.synchronize do
    loop do
      if @queue.empty?
        @waiting.push Thread.current
        @mutex.sleep
      else
        return @queue.shift
      end
    end
  end
end

#pop!Object

Retrieves data from channel like method pop, but if thread isn’t suspended, and an exception is raised.



55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/ruby_channel/channel.rb', line 55

def pop!
  @mutex.synchronize do
    loop do
      if @queue.empty?
        raise ThreadError, "Empty Channel"
        @waiting.push Thread.current
        @mutex.sleep
      else
        return @queue.shift
      end
    end
  end
end

#push(obj) ⇒ Object Also known as: <<

Pushes obj to the queue.



17
18
19
20
21
22
23
24
25
26
27
# File 'lib/ruby_channel/channel.rb', line 17

def push(obj)
  @mutex.synchronize do
    @queue.push obj
    begin
      t = @waiting.shift
      t.wakeup if t
    rescue ThreadError
      retry
    end
  end
end

#redirect_to(channel, callback_method = nil, *args) {|value| ... } ⇒ Object Also known as: >>

Redirect signal to other channel

Yields:

  • (value)


72
73
74
75
76
77
# File 'lib/ruby_channel/channel.rb', line 72

def redirect_to(channel, callback_method=nil, *args)
  value = self.pop
  value.send(callback_method, *args) if callback_method
  yield value if block_given?
  channel << value
end

#subscribe(selector) ⇒ Object

Method called only by selector to subscribe listeners, dont use it, unless you understand exactly what you’re doing!



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/ruby_channel/channel.rb', line 117

def subscribe(selector)
  channel = self
  @mutex.synchronize do
    loop do
      return selector.result unless selector.waiting?
      if @queue.empty?
        @waiting.push Thread.current
        @mutex.sleep
      else
        selector.mutex.synchronize do
          if selector.waiting?
            result = selector.update_result(channel, @queue.shift)
            yield result
          end
        end
        selector.release_result
        return selector.result
      end
    end
  end
end

#waiting_sizeObject

Returns the number of threads waiting on the queue.



109
110
111
# File 'lib/ruby_channel/channel.rb', line 109

def waiting_size
  @waiting.size
end