Class: CircularQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/circular_queue.rb

Overview

A thread-safe queue with a size limitation. When more elements than the capacity are added, the queue either loops back on itself (removing the oldest elements first) or raises an error (if `enq!` is used).

Useful for streaming data where keeping up with real-time is more important than consuming every message if load rises and the queue backs up.

Exposes the same interface as the `Queue` from the Ruby stdlib.

Example:

# Capacity of 3
q = CircularQueue.new(3)

q << 1 # => [1]
q << 2 # => [1, 2]
q << 3 # => [1, 2, 3]

# Elements are replaced when the queue reaches capacity
q << 4 # => [2, 3, 4]
q << 5 # => [3, 4, 5]

Instance Attribute Summary (collapse)

Instance Method Summary (collapse)

Constructor Details

- (CircularQueue) initialize(capacity)

Creates a new queue of the specified capacity

Parameters:

  • capacity (Integer)

    the maximum capacity of the queue



36
37
38
39
40
41
42
43
44
# File 'lib/circular_queue.rb', line 36

def initialize(capacity)
  @capacity = capacity
  @data     = Array.new(capacity)

  @mutex    = Mutex.new
  @waiting  = Array.new

  clear
end

Instance Attribute Details

- (Integer) capacity (readonly)

Returns the maximum number of elements that can be enqueued

Returns:

  • (Integer)


27
28
29
# File 'lib/circular_queue.rb', line 27

def capacity
  @capacity
end

- (Integer) size (readonly) Also known as: length

Returns the number of elements in the queue

Returns:

  • (Integer)


31
32
33
# File 'lib/circular_queue.rb', line 31

def size
  @size
end

Instance Method Details

- (Object) back

Returns the last/most recent item in the queue Peek at last item without removing

Returns:

  • (Object)


124
125
126
127
128
# File 'lib/circular_queue.rb', line 124

def back
  @mutex.synchronize do
    @data[(@back - 1) % @capacity]
  end
end

- (Object) clear

Removes all items from the queue



92
93
94
95
96
97
98
# File 'lib/circular_queue.rb', line 92

def clear
  @mutex.synchronize do
    @size  = 0
    @front = 0
    @back  = 0
  end
end

- (Array) data

Returns the data in the queue Allows for easy iteration of queue from front to back

Returns:

  • (Array)

    the queue



139
140
141
142
143
144
145
146
# File 'lib/circular_queue.rb', line 139

def data
  @mutex.synchronize do
    @data.clone.tap do |data|
      data.rotate!(@front)
      data.slice!(@size..-1)
    end
  end
end

- (Object) deq(non_block = false) Also known as: shift, pop

Removes an item from the queue

Parameters:

  • non_block (Boolean) (defaults to: false)

    true to raise an error if the queue is empty; otherwise, waits for an item to arrive from another thread

Raises:

  • (ThreadError)

    non_block was true and the queue was empty



74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/circular_queue.rb', line 74

def deq(non_block = false)
  @mutex.synchronize do
    while true
      if empty?
        raise ThreadError.new("Queue is empty") if non_block

        @waiting.push(Thread.current) unless @waiting.include?(Thread.current)
        @mutex.sleep
      else
        return deq_item
      end
    end
  end
end

- (Boolean) empty?

Returns whether the queue is empty

Returns:

  • (Boolean)

    queue is empty



102
103
104
# File 'lib/circular_queue.rb', line 102

def empty?
  @size == 0
end

- (Object) enq(item) Also known as: <<, push

Adds an item to the queue

Parameters:

  • item (Object)

    item to add



48
49
50
51
52
53
# File 'lib/circular_queue.rb', line 48

def enq(item)
  @mutex.synchronize do
    enq_item(item)
    wakeup_next_waiter
  end
end

- (Object) enq!(item) Also known as: push!

Adds an item to the queue, raising an error if the queue is full

Parameters:

  • item (Object)

    item to add

Raises:

  • (ThreadError)

    queue is full



60
61
62
63
64
65
66
67
# File 'lib/circular_queue.rb', line 60

def enq!(item)
  @mutex.synchronize do
    raise ThreadError.new("Queue is full") if full?

    enq_item(item)
    wakeup_next_waiter
  end
end

- (Object) front

Returns thee first/oldest item in the queue Peek at first item without removing

Returns:

  • (Object)


115
116
117
118
119
# File 'lib/circular_queue.rb', line 115

def front
  @mutex.synchronize do
    @data[@front]
  end
end

- (Boolean) full?

Returns whether the queue is full

Returns:

  • (Boolean)

    queue is full



108
109
110
# File 'lib/circular_queue.rb', line 108

def full?
  @size == @capacity
end

- (Integer) num_waiting

Returns the number of threads waiting for items to arrive in the queue

Returns:

  • (Integer)

    number of threads waiting



132
133
134
# File 'lib/circular_queue.rb', line 132

def num_waiting
  @waiting.length
end