Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/concurrent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

require 'concurrent/cached_thread_pool'
require 'concurrent/fixed_thread_pool'
require 'concurrent/immediate_executor'

require 'concurrent/event_machine_defer_proxy' if defined?(EventMachine)

Expand Down
36 changes: 14 additions & 22 deletions lib/concurrent/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,40 +20,42 @@ class Event
def initialize
@set = false
@mutex = Mutex.new
@waiters = []
@condition = ConditionVariable.new
end

# Is the object in the set state?
#
# @return [Boolean] indicating whether or not the `Event` has been set
def set?
return @set == true
@mutex.synchronize do
@set
end
end

# Trigger the event, setting the state to `set` and releasing all threads
# waiting on the event. Has no effect if the `Event` has already been set.
#
# @return [Boolean] should always return `true`
def set
return true if set?
@mutex.synchronize do
return true if @set
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. I expected this to raise a LocalJumpError, which is the normal case when using return within a block. But I've tried it in several version of MRI and it works. So I've learned a new trick with Mutex.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think return always works when there's a method context somewhere up in the stack.

@set = true
@waiters.each {|waiter| waiter.run if waiter.status == 'sleep'}
@condition.broadcast
end
return true

true
end

# Reset a previously set event back to the `unset` state.
# Has no effect if the `Event` has not yet been set.
#
# @return [Boolean] should always return `true`
def reset
return true unless set?
@mutex.synchronize do
@set = false
@waiters.clear # just in case there's garbage
end
return true

true
end

# Wait a given number of seconds for the `Event` to be set by another
Expand All @@ -62,21 +64,11 @@ def reset
#
# @return [Boolean] true if the `Event` was set before timeout else false
def wait(timeout = nil)
return true if set?

@mutex.synchronize { @waiters << Thread.current }
return true if set? # if event was set while waiting for mutex

if timeout.nil?
slept = sleep
else
slept = sleep(timeout)
@mutex.synchronize do
return true if @set
@condition.wait(@mutex, timeout)
@set
end
rescue
# let it fail
ensure
@mutex.synchronize { @waiters.delete(Thread.current) }
return set?
end
end
end
14 changes: 14 additions & 0 deletions lib/concurrent/immediate_executor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
module Concurrent
class ImmediateExecutor

def post(*args, &block)
block.call(*args)
end

def <<(block)
post(&block)
self
end

end
end
19 changes: 10 additions & 9 deletions spec/concurrent/future_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,36 +58,37 @@ module Concurrent

context 'fulfillment' do

before(:each) do
Future.thread_pool = ImmediateExecutor.new
end

it 'passes all arguments to handler' do
@a = @b = @c = nil
f = Future.new(1, 2, 3) do |a, b, c|
@a, @b, @c = a, b, c
result = nil

Future.new(1, 2, 3) do |a, b, c|
result = [a, b, c]
end
sleep(0.1)
[@a, @b, @c].should eq [1, 2, 3]

result.should eq [1, 2, 3]
end

it 'sets the value to the result of the handler' do
f = Future.new(10){|a| a * 2 }
sleep(0.1)
f.value.should eq 20
end

it 'sets the state to :fulfilled when the block completes' do
f = Future.new(10){|a| a * 2 }
sleep(0.1)
f.should be_fulfilled
end

it 'sets the value to nil when the handler raises an exception' do
f = Future.new{ raise StandardError }
sleep(0.1)
f.value.should be_nil
end

it 'sets the state to :rejected when the handler raises an exception' do
f = Future.new{ raise StandardError }
sleep(0.1)
f.should be_rejected
end

Expand Down
33 changes: 33 additions & 0 deletions spec/concurrent/immediate_executor_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
require 'spec_helper'

module Concurrent

describe ImmediateExecutor do

let(:executor) { ImmediateExecutor.new }

context "#post" do
it 'executes the block using the arguments as parameters' do
result = executor.post(1, 2, 3, 4) { |a, b, c, d| [a, b, c, d] }
result.should eq [1, 2, 3, 4]
end
end

context "#<<" do

it "returns true" do
result = executor << proc { false }
result.should be_true
end

it "executes the passed callable" do
x = 0

executor << proc { x = 5 }

x.should eq 5
end

end
end
end