Class: ActiveSupport::Testing::Parallelization::RoundRobinDistributor
- Inherits:
-
TestDistributor
show all
- Defined in:
- activesupport/lib/active_support/testing/parallelization/test_distributor.rb
Overview
Round-robin distributor - tests are assigned to workers as they arrive.
Tests arrive already shuffled by Minitest based on the seed. Since the arrival order is deterministic for a given seed, round-robin assignment produces reproducible test-to-worker distribution.
This is much simpler than buffering and re-shuffling: tests can start executing immediately as they arrive, and we avoid complex synchronization.
Constant Summary
collapse
- WORK_WAIT_TIMEOUT =
0.1
Instance Method Summary
collapse
Constructor Details
78
79
80
81
82
83
84
85
|
# File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 78
# Sets up one queue per worker plus the shared bookkeeping state.
#
# worker_count - number of per-worker queues to allocate.
def initialize(worker_count:)
  # Synchronization primitives shared by the other methods of this object.
  @mutex = Mutex.new
  @cv = ConditionVariable.new

  # Round-robin state: the cursor starts at the first worker.
  @worker_count = worker_count
  @next_worker = 0
  @queues = Array.new(worker_count) { Queue.new }

  # Becomes true once #close or #interrupt has run.
  @closed = false
end
|
Instance Method Details
#add_test(test) ⇒ Object
87
88
89
90
91
92
93
94
95
96
97
|
# File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 87
# Enqueues +test+ on the queue of the worker the round-robin cursor points
# at, then advances the cursor (wrapping at @worker_count) and signals the
# condition variable.
#
# Does nothing and returns nil once the distributor is closed or when
# @queues is unset. The cursor still advances when the selected queue is
# itself closed; the test is simply not pushed in that case.
def add_test(test)
  @mutex.synchronize do
    next unless @queues && !@closed

    target = @queues[@next_worker]
    @next_worker = @next_worker.succ % @worker_count
    target.push(test) unless target.closed?
    @cv.signal
  end
end
|
#close ⇒ Object
127
128
129
130
131
132
133
|
# File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 127
# Closes every per-worker queue (leaving already-queued tests in place),
# marks the distributor as closed, and broadcasts on the condition variable.
# A nil @queues is tolerated.
def close
  @mutex.synchronize do
    @queues.each { |queue| queue.close } unless @queues.nil?
    @closed = true
    @cv.broadcast
  end
end
|
#interrupt ⇒ Object
110
111
112
113
114
115
116
117
118
119
|
# File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 110
# Discards everything still queued, closes every per-worker queue, marks
# the distributor as closed, and broadcasts on the condition variable.
# A nil @queues is tolerated.
def interrupt
  @mutex.synchronize do
    Array(@queues).each do |pending|
      pending.clear
      pending.close
    end
    @closed = true
    @cv.broadcast
  end
end
|
#pending? ⇒ Boolean
121
122
123
124
125
|
# File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 121
# Reports whether any per-worker queue still holds at least one item.
# Returns nil (falsy) when @queues is nil.
def pending?
  @mutex.synchronize do
    next if @queues.nil?

    !@queues.all?(&:empty?)
  end
end
|
#take(worker_id:) ⇒ Object
99
100
101
102
103
104
105
106
107
108
|
# File 'activesupport/lib/active_support/testing/parallelization/test_distributor.rb', line 99
# Returns the next job for +worker_id+.
#
# Keeps asking next_job until it yields a truthy value or exhausted?
# reports true for this worker. Between unsuccessful attempts it calls
# wait, unless the worker has become exhausted in the meantime. The result
# is the job obtained, or the last falsy value seen (nil if next_job was
# never called).
def take(worker_id:)
  claimed = nil
  loop do
    break if exhausted?(worker_id)

    claimed = next_job(worker_id)
    break if claimed

    # Re-check before waiting so an exhausted worker is not put to sleep.
    wait(worker_id) unless exhausted?(worker_id)
  end
  claimed
end
|