diff --git a/lib/concurrent/exchanger.rb b/lib/concurrent/exchanger.rb index f0e83ad14..f52a63a86 100644 --- a/lib/concurrent/exchanger.rb +++ b/lib/concurrent/exchanger.rb @@ -12,7 +12,7 @@ module Concurrent # @see Concurrent::MVar # @see Concurrent::Dereferenceable # @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Exchanger.html java.util.concurrent.Exchanger - class Exchanger + class MVarExchanger EMPTY = Object.new @@ -34,23 +34,104 @@ def initialize(opts = {}) # thread. nil (default value) means no timeout # @return [Object] the value exchanged by the other thread; nil if timed out def exchange(value, timeout = nil) - first = @first.take(timeout) - if first == MVar::TIMEOUT - nil - elsif first == EMPTY - @first.put value - second = @second.take timeout - if second == MVar::TIMEOUT - nil + + # Both threads modify the first variable + first_result = @first.modify(timeout) do |first| + # Does it currently contain the special empty value? + if first == EMPTY + # If so, modify it to contain our value + value else - second + # Otherwise, modify it back to the empty state + EMPTY end + end + + # If that timed out, the whole operation timed out + return nil if first_result == MVar::TIMEOUT + + # What was in @first before we modified it? + if first_result == EMPTY + # We stored our object - someone else will turn up with the second + # object at some point in the future + + # Wait for the second object to appear + second_result = @second.take(timeout) + + # If that timed out, the whole operation timed out + return nil if second_result == MVar::TIMEOUT + + # BUT HOW DO WE CANCEL OUR RESULT BEING AVAILABLE IN @first? + + # Return that second object + second_result else - @first.put EMPTY - @second.put value - first + # We reset @first to be empty again - so the other value is in + # first_result and we need to tell the other thread about our value + + # Tell the other thread about our object + second_result = @second.put(value, timeout) + + # If that timed out, the whole operation timed out + return nil if second_result == MVar::TIMEOUT + + # We already have its object + first_result end end + end + + class SlotExchanger + + def initialize + @mutex = Mutex.new + @condition = Condition.new + @slot = new_slot + end + + def exchange(value, timeout = nil) + @mutex.synchronize do + + replace_slot_if_fulfilled + slot = @slot + + if slot.state == :empty + slot.value_1 = value + slot.state = :waiting + wait_for_value(slot, timeout) + slot.value_2 + else + slot.value_2 = value + slot.state = :fulfilled + @condition.broadcast + slot.value_1 + end + end + end + + Slot = Struct.new(:value_1, :value_2, :state) + + private_constant :Slot + + private + + def replace_slot_if_fulfilled + @slot = new_slot if @slot.state == :fulfilled + end + + def wait_for_value(slot, timeout) + remaining = Condition::Result.new(timeout) + while slot.state == :waiting && remaining.can_wait? + remaining = @condition.wait(@mutex, remaining.remaining_time) + end + end + + def new_slot + Slot.new(nil, nil, :empty) + end end + + Exchanger = SlotExchanger + end diff --git a/spec/concurrent/exchanger_spec.rb b/spec/concurrent/exchanger_spec.rb index 3cfe577c2..92ad6f6fc 100644 --- a/spec/concurrent/exchanger_spec.rb +++ b/spec/concurrent/exchanger_spec.rb @@ -7,62 +7,51 @@ module Concurrent context 'without timeout' do it 'should block' do - latch_1 = Concurrent::CountDownLatch.new - latch_2 = Concurrent::CountDownLatch.new + latch = Concurrent::CountDownLatch.new t = Thread.new do - latch_1.count_down subject.exchange(1) - latch_2.count_down + latch.count_down end - latch_1.wait(1) - latch_2.wait(0.1) - expect(latch_2.count).to eq 1 + t.join(0.3) + expect(latch.count).to eq 1 t.kill end it 'should receive the other value' do first_value = nil second_value = nil - latch = Concurrent::CountDownLatch.new(2) threads = [ - Thread.new { first_value = subject.exchange(2); latch.count_down }, - Thread.new { second_value = subject.exchange(4); latch.count_down } + Thread.new { first_value = subject.exchange(2) }, + Thread.new { second_value = subject.exchange(4) } ] - latch.wait(1) - + threads.each {|t| t.join(1) } expect(first_value).to eq 4 expect(second_value).to eq 2 - - threads.each {|t| t.kill } end it 'can be reused' do first_value = nil second_value = nil - latch_1 = Concurrent::CountDownLatch.new(2) - latch_2 = Concurrent::CountDownLatch.new(2) threads = [ - Thread.new { first_value = subject.exchange(1); latch_1.count_down }, - Thread.new { second_value = subject.exchange(0); latch_1.count_down } + Thread.new { first_value = subject.exchange(1) }, + Thread.new { second_value = subject.exchange(0) } ] - latch_1.wait(1) - threads.each {|t| t.kill } + threads.each {|t| t.join(1) } threads = [ - Thread.new { first_value = subject.exchange(10); latch_2.count_down }, - Thread.new { second_value = subject.exchange(12); latch_2.count_down } + Thread.new { first_value = subject.exchange(10) }, + Thread.new { second_value = subject.exchange(12) } ] - latch_2.wait(1) + threads.each {|t| t.join(1) } expect(first_value).to eq 12 expect(second_value).to eq 10 - threads.each {|t| t.kill } end end diff --git a/spec/concurrent/utility/timeout_spec.rb b/spec/concurrent/utility/timeout_spec.rb index 1858bee4d..1bc9ef3d7 100644 --- a/spec/concurrent/utility/timeout_spec.rb +++ b/spec/concurrent/utility/timeout_spec.rb @@ -27,19 +27,19 @@ module Concurrent it 'kills the thread on success' do result = Concurrent::timeout(1) { 42 } - expect(Thread).to receive(:kill).with(any_args()) + expect(Thread).to receive(:kill).at_least(:once).with(any_args()) Concurrent::timeout(1){ 42 } end it 'kills the thread on timeout' do - expect(Thread).to receive(:kill).with(any_args()) + expect(Thread).to receive(:kill).at_least(:once).with(any_args()) expect { Concurrent::timeout(1){ sleep } }.to raise_error end it 'kills the thread on exception' do - expect(Thread).to receive(:kill).with(any_args()) + expect(Thread).to receive(:kill).at_least(:once).with(any_args()) expect { Concurrent::timeout(1){ raise NotImplementedError } }.to raise_error diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 9a893c615..9d4817d1d 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -33,7 +33,7 @@ RSpec.configure do |config| #config.raise_errors_for_deprecations! - config.order = 'random' + #config.order = 'random' config.before(:each) do #TODO: Better configuration management in individual test suites