Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 94 additions & 13 deletions lib/concurrent/exchanger.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ module Concurrent
# @see Concurrent::MVar
# @see Concurrent::Dereferenceable
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Exchanger.html java.util.concurrent.Exchanger
class Exchanger
class MVarExchanger

EMPTY = Object.new

Expand All @@ -34,23 +34,104 @@ def initialize(opts = {})
# thread. nil (default value) means no timeout
# @return [Object] the value exchanged by the other thread; nil if timed out
def exchange(value, timeout = nil)
first = @first.take(timeout)
if first == MVar::TIMEOUT
nil
elsif first == EMPTY
@first.put value
second = @second.take timeout
if second == MVar::TIMEOUT
nil

# Both threads modify the first variable
first_result = @first.modify(timeout) do |first|
# Does it currently contain the special empty value?
if first == EMPTY
# If so, modify it to contain our value
value
else
second
# Otherwise, modify it back to the empty state
EMPTY
end
end

# If that timed out, the whole operation timed out
return nil if first_result == MVar::TIMEOUT

# What was in @first before we modified it?
if first_result == EMPTY
# We stored our object - someone else will turn up with the second
# object at some point in the future

# Wait for the second object to appear
second_result = @second.take(timeout)

# If that timed out, the whole operation timed out
return nil if second_result == MVar::TIMEOUT

# BUT HOW DO WE CANCEL OUR RESULT BEING AVAILABLE IN @first?

# Return that second object
second_result
else
@first.put EMPTY
@second.put value
first
# We reset @first to be empty again - so the other value is in
# first_result and we need to tell the other thread about our value

# Tell the other thread about our object
second_result = @second.put(value, timeout)

# If that timed out, the whole operation timed out
return nil if second_result == MVar::TIMEOUT

# We already have its object
first_result
end
end
end

class SlotExchanger

def initialize
@mutex = Mutex.new
@condition = Condition.new
@slot = new_slot
end

def exchange(value, timeout = nil)
@mutex.synchronize do

replace_slot_if_fulfilled

slot = @slot

if slot.state == :empty
slot.value_1 = value
slot.state = :waiting
wait_for_value(slot, timeout)
slot.value_2
else
slot.value_2 = value
slot.state = :fulfilled
@condition.broadcast
slot.value_1
end
end
end

Slot = Struct.new(:value_1, :value_2, :state)

private_constant :Slot

private

def replace_slot_if_fulfilled
@slot = new_slot if @slot.state == :fulfilled
end

def wait_for_value(slot, timeout)
remaining = Condition::Result.new(timeout)
while slot.state == :waiting && remaining.can_wait?
remaining = @condition.wait(@mutex, remaining.remaining_time)
end
end

def new_slot
Slot.new(nil, nil, :empty)
end
end

Exchanger = SlotExchanger

end
37 changes: 13 additions & 24 deletions spec/concurrent/exchanger_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,62 +7,51 @@ module Concurrent
context 'without timeout' do

it 'should block' do
latch_1 = Concurrent::CountDownLatch.new
latch_2 = Concurrent::CountDownLatch.new
latch = Concurrent::CountDownLatch.new

t = Thread.new do
latch_1.count_down
subject.exchange(1)
latch_2.count_down
latch.count_down
end

latch_1.wait(1)
latch_2.wait(0.1)
expect(latch_2.count).to eq 1
t.join(0.3)
expect(latch.count).to eq 1
t.kill
end

it 'should receive the other value' do
first_value = nil
second_value = nil
latch = Concurrent::CountDownLatch.new(2)

threads = [
Thread.new { first_value = subject.exchange(2); latch.count_down },
Thread.new { second_value = subject.exchange(4); latch.count_down }
Thread.new { first_value = subject.exchange(2) },
Thread.new { second_value = subject.exchange(4) }
]

latch.wait(1)

threads.each {|t| t.join(1) }
expect(first_value).to eq 4
expect(second_value).to eq 2

threads.each {|t| t.kill }
end

it 'can be reused' do
first_value = nil
second_value = nil
latch_1 = Concurrent::CountDownLatch.new(2)
latch_2 = Concurrent::CountDownLatch.new(2)

threads = [
Thread.new { first_value = subject.exchange(1); latch_1.count_down },
Thread.new { second_value = subject.exchange(0); latch_1.count_down }
Thread.new { first_value = subject.exchange(1) },
Thread.new { second_value = subject.exchange(0) }
]

latch_1.wait(1)
threads.each {|t| t.kill }
threads.each {|t| t.join(1) }

threads = [
Thread.new { first_value = subject.exchange(10); latch_2.count_down },
Thread.new { second_value = subject.exchange(12); latch_2.count_down }
Thread.new { first_value = subject.exchange(10) },
Thread.new { second_value = subject.exchange(12) }
]

latch_2.wait(1)
threads.each {|t| t.join(1) }
expect(first_value).to eq 12
expect(second_value).to eq 10
threads.each {|t| t.kill }
end
end

Expand Down
6 changes: 3 additions & 3 deletions spec/concurrent/utility/timeout_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,19 @@ module Concurrent

it 'kills the thread on success' do
result = Concurrent::timeout(1) { 42 }
expect(Thread).to receive(:kill).with(any_args())
expect(Thread).to receive(:kill).at_least(:once).with(any_args())
Concurrent::timeout(1){ 42 }
end

it 'kills the thread on timeout' do
expect(Thread).to receive(:kill).with(any_args())
expect(Thread).to receive(:kill).at_least(:once).with(any_args())
expect {
Concurrent::timeout(1){ sleep }
}.to raise_error
end

it 'kills the thread on exception' do
expect(Thread).to receive(:kill).with(any_args())
expect(Thread).to receive(:kill).at_least(:once).with(any_args())
expect {
Concurrent::timeout(1){ raise NotImplementedError }
}.to raise_error
Expand Down
2 changes: 1 addition & 1 deletion spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

RSpec.configure do |config|
#config.raise_errors_for_deprecations!
config.order = 'random'
#config.order = 'random'

config.before(:each) do
#TODO: Better configuration management in individual test suites
Expand Down