Class: ActiveSupport::Testing::Parallelization::RoundRobinDistributor

Inherits:
TestDistributor show all
Defined in:
activesupport/lib/active_support/testing/parallelization/test_distributor.rb

Overview

Round-robin distributor - tests are assigned to workers as they arrive.

Tests arrive already shuffled by Minitest based on the seed. Since the arrival order is deterministic for a given seed, round-robin assignment produces reproducible test-to-worker distribution.

This is much simpler than buffering and re-shuffling: tests can start executing immediately as they arrive, and we avoid complex synchronization.

Direct Known Subclasses

RoundRobinWorkStealingDistributor

Constant Summary collapse

WORK_WAIT_TIMEOUT = 0.1

Instance Method Summary collapse

Constructor Details

#initialize(worker_count:) ⇒ RoundRobinDistributor



78
79
80
81
82
83
84
85
# File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 78

def initialize(worker_count:)
  @worker_count = worker_count
  @queues = Array.new(@worker_count) { Queue.new }
  @next_worker = 0
  @mutex = Mutex.new
  @cv = ConditionVariable.new
  @closed = false
end

Instance Method Details

#add_test(test) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
# File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 87

def add_test(test)
  @mutex.synchronize do
    return if @closed || !@queues

    worker_id = @next_worker
    @next_worker = (@next_worker + 1) % @worker_count
    queue = @queues[worker_id]
    queue << test unless queue.closed?
    @cv.signal  # Wake one waiting worker
  end
end

#close ⇒ Object



127
128
129
130
131
132
133
# File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 127

def close
  @mutex.synchronize do
    @queues&.each(&:close)
    @closed = true
    @cv.broadcast  # Wake all waiting workers
  end
end

#interrupt ⇒ Object



110
111
112
113
114
115
116
117
118
119
# File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 110

def interrupt
  @mutex.synchronize do
    @queues&.each do |q|
      q.clear
      q.close
    end
    @closed = true
    @cv.broadcast  # Wake all waiting workers
  end
end

#pending? ⇒ Boolean



121
122
123
124
125
# File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 121

def pending?
  @mutex.synchronize do
    @queues&.any? { |q| !q.empty? }
  end
end

#take(worker_id:) ⇒ Object



99
100
101
102
103
104
105
106
107
108
# File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 99

def take(worker_id:)
  job = nil

  until job || exhausted?(worker_id)
    job = next_job(worker_id)
    wait(worker_id) unless job || exhausted?(worker_id)
  end

  job
end