Class: RubyChannel::Channel
- Inherits:
- Object (Object → RubyChannel::Channel)
- Defined in:
- lib/ruby_channel/channel.rb
Instance Attribute Summary
-
#mutex ⇒ Object
readonly
Returns the value of attribute mutex.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Instance Method Summary
-
#clear ⇒ Object
Removes all objects from the channel.
-
#empty? ⇒ Boolean
Returns true if the channel is empty.
-
#initialize ⇒ Channel
constructor
Creates a new channel.
-
#length ⇒ Object
(also: #size)
Returns the length of the channel.
-
#pop ⇒ Object
Retrieves data from channel.
-
#pop! ⇒ Object
Retrieves data from the channel like #pop, but never suspends the calling thread: if the channel is empty, a ThreadError is raised.
-
#push(obj) ⇒ Object
(also: #<<)
Pushes obj to the queue.
-
#redirect_to(channel, callback_method = nil, *args) {|value| ... } ⇒ Object
(also: #>>)
Redirects a value from this channel to another channel.
-
#subscribe(selector) ⇒ Object
Called only by a selector to subscribe listeners. Don't use it unless you understand exactly what you're doing!
-
#waiting_size ⇒ Object
Returns the number of threads waiting on the queue.
Constructor Details
#initialize ⇒ Channel
Creates a new channel.
# File 'lib/ruby_channel/channel.rb', line 8
def initialize
  @queue = []
  @waiting = []
  @mutex = Mutex.new
end
Instance Attribute Details
#mutex ⇒ Object (readonly)
Returns the value of attribute mutex.
# File 'lib/ruby_channel/channel.rb', line 3
def mutex
  @mutex
end
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
# File 'lib/ruby_channel/channel.rb', line 3
def queue
  @queue
end
Instance Method Details
#clear ⇒ Object
Removes all objects from the channel.
# File 'lib/ruby_channel/channel.rb', line 90
def clear
  @queue.clear
end
#empty? ⇒ Boolean
Returns true if the channel is empty.
# File 'lib/ruby_channel/channel.rb', line 83
def empty?
  @queue.empty?
end
#length ⇒ Object Also known as: size
Returns the length of the channel.
# File 'lib/ruby_channel/channel.rb', line 97
def length
  @queue.length
end
#pop ⇒ Object
Retrieves data from channel. If the channel is empty, the calling thread is suspended until data is pushed onto channel.
# File 'lib/ruby_channel/channel.rb', line 38
def pop
  @mutex.synchronize do
    loop do
      if @queue.empty?
        @waiting.push Thread.current
        @mutex.sleep
      else
        return @queue.shift
      end
    end
  end
end
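The blocking behavior of pop can be sketched with a consumer thread that waits for a producer. To keep the example runnable without the gem installed, it uses a stand-in class, SketchChannel (a hypothetical name), that copies only the push and pop listings shown on this page; the sleep call is there purely to make the suspension likely and is not required for correctness.

```ruby
# Stand-in that mirrors the push/pop listings on this page so the sketch
# runs on its own. SketchChannel is a hypothetical name, not part of the gem.
class SketchChannel
  def initialize
    @queue = []
    @waiting = []
    @mutex = Mutex.new
  end

  def push(obj)
    @mutex.synchronize do
      @queue.push obj
      begin
        t = @waiting.shift
        t.wakeup if t
      rescue ThreadError
        retry # the waiting thread was dead; try the next one
      end
    end
  end

  def pop
    @mutex.synchronize do
      loop do
        if @queue.empty?
          @waiting.push Thread.current
          @mutex.sleep # releases the mutex while suspended
        else
          return @queue.shift
        end
      end
    end
  end
end

channel = SketchChannel.new

# The consumer starts first and waits inside pop until a value arrives.
consumer = Thread.new { channel.pop }

sleep 0.1            # give the consumer time to suspend (illustrative only)
channel.push :hello  # wakes the waiting consumer, if it is already waiting

received = consumer.value # joins the thread and returns the popped value
```

With the real gem, the same pattern should apply to RubyChannel::Channel, since the stand-in copies its listed methods.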
#pop! ⇒ Object
Retrieves data from the channel like #pop, but never suspends the calling thread: if the channel is empty, a ThreadError is raised.
# File 'lib/ruby_channel/channel.rb', line 55
def pop!
  @mutex.synchronize do
    # Never blocks: an empty channel raises immediately.
    raise ThreadError, "Empty Channel" if @queue.empty?
    @queue.shift
  end
end
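A minimal sketch of the non-blocking behavior, assuming only what the listing shows: pop! returns the next value when one is queued and raises ThreadError with the message "Empty Channel" otherwise. SketchChannel is a hypothetical stand-in, reduced to the two methods the example needs, so that the code runs without the gem.

```ruby
# Reduced stand-in for illustration only; SketchChannel is a hypothetical
# name and omits the waiting-thread bookkeeping of the real class.
class SketchChannel
  def initialize
    @queue = []
    @mutex = Mutex.new
  end

  def push(obj)
    @mutex.synchronize { @queue.push obj }
  end

  def pop!
    @mutex.synchronize do
      raise ThreadError, "Empty Channel" if @queue.empty?
      @queue.shift
    end
  end
end

channel = SketchChannel.new
channel.push 1

first = channel.pop! # a value is queued, so it is returned immediately

# The channel is now empty, so the second call raises instead of blocking.
error_message =
  begin
    channel.pop!
    nil
  rescue ThreadError => e
    e.message
  end
```

Rescuing ThreadError is a reasonable way to poll a channel from a thread that must not be suspended.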
#push(obj) ⇒ Object Also known as: <<
Pushes obj to the queue.
# File 'lib/ruby_channel/channel.rb', line 17
def push(obj)
  @mutex.synchronize do
    @queue.push obj
    begin
      t = @waiting.shift
      t.wakeup if t
    rescue ThreadError
      retry
    end
  end
end
#redirect_to(channel, callback_method = nil, *args) {|value| ... } ⇒ Object Also known as: >>
Pops one value from this channel, optionally calls callback_method on it with args and yields it to the block, then pushes it to channel. Blocks like #pop if this channel is empty.
# File 'lib/ruby_channel/channel.rb', line 72
def redirect_to(channel, callback_method=nil, *args)
  value = self.pop
  value.send(callback_method, *args) if callback_method
  yield value if block_given?
  channel << value
end
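The following sketch illustrates one consequence of the listing: the return value of callback_method is discarded, so only a callback that mutates the value in place changes what is forwarded. SketchChannel is a hypothetical stand-in that copies the push, pop, and redirect_to listings from this page so the example runs without the gem.

```ruby
# Stand-in copied from the listings on this page; SketchChannel is a
# hypothetical name used only for this illustration.
class SketchChannel
  def initialize
    @queue = []
    @waiting = []
    @mutex = Mutex.new
  end

  def push(obj)
    @mutex.synchronize do
      @queue.push obj
      begin
        t = @waiting.shift
        t.wakeup if t
      rescue ThreadError
        retry
      end
    end
  end
  alias_method :<<, :push

  def pop
    @mutex.synchronize do
      loop do
        if @queue.empty?
          @waiting.push Thread.current
          @mutex.sleep
        else
          return @queue.shift
        end
      end
    end
  end

  def redirect_to(channel, callback_method = nil, *args)
    value = pop
    value.send(callback_method, *args) if callback_method
    yield value if block_given?
    channel << value
  end
end

source = SketchChannel.new
sink = SketchChannel.new
source << "hello".dup # dup gives a mutable string

seen = []
# :upcase! mutates the popped string in place before it is forwarded.
source.redirect_to(sink, :upcase!) { |value| seen << value }

forwarded = sink.pop
```

A non-mutating callback such as :upcase would leave the forwarded value unchanged, because redirect_to ignores what the callback returns.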
#subscribe(selector) ⇒ Object
Called only by a selector to subscribe listeners. Don't use it unless you understand exactly what you're doing!
# File 'lib/ruby_channel/channel.rb', line 117
def subscribe(selector)
  channel = self
  @mutex.synchronize do
    loop do
      return selector.result unless selector.waiting?
      if @queue.empty?
        @waiting.push Thread.current
        @mutex.sleep
      else
        selector.mutex.synchronize do
          if selector.waiting?
            result = selector.update_result(channel, @queue.shift)
            yield result
          end
        end
        selector.release_result
        return selector.result
      end
    end
  end
end
#waiting_size ⇒ Object
Returns the number of threads waiting on the queue.
# File 'lib/ruby_channel/channel.rb', line 109
def waiting_size
  @waiting.size
end