diff --git a/lib/concurrent.rb b/lib/concurrent.rb index 0cb964cf6..6869c1ebf 100644 --- a/lib/concurrent.rb +++ b/lib/concurrent.rb @@ -21,6 +21,7 @@ require 'concurrent/cached_thread_pool' require 'concurrent/fixed_thread_pool' +require 'concurrent/immediate_executor' require 'concurrent/event_machine_defer_proxy' if defined?(EventMachine) diff --git a/lib/concurrent/event.rb b/lib/concurrent/event.rb index ad19d05eb..17d721d0a 100644 --- a/lib/concurrent/event.rb +++ b/lib/concurrent/event.rb @@ -20,14 +20,16 @@ class Event def initialize @set = false @mutex = Mutex.new - @waiters = [] + @condition = ConditionVariable.new end # Is the object in the set state? # # @return [Boolean] indicating whether or not the `Event` has been set def set? - return @set == true + @mutex.synchronize do + @set + end end # Trigger the event, setting the state to `set` and releasing all threads @@ -35,12 +37,13 @@ def set? # # @return [Boolean] should always return `true` def set - return true if set? @mutex.synchronize do + return true if @set @set = true - @waiters.each {|waiter| waiter.run if waiter.status == 'sleep'} + @condition.broadcast end - return true + + true end # Reset a previously set event back to the `unset` state. @@ -48,12 +51,11 @@ def set # # @return [Boolean] should always return `true` def reset - return true unless set? @mutex.synchronize do @set = false - @waiters.clear # just in case there's garbage end - return true + + true end # Wait a given number of seconds for the `Event` to be set by another @@ -62,21 +64,11 @@ def reset # # @return [Boolean] true if the `Event` was set before timeout else false def wait(timeout = nil) - return true if set? - - @mutex.synchronize { @waiters << Thread.current } - return true if set? # if event was set while waiting for mutex - - if timeout.nil? - slept = sleep - else - slept = sleep(timeout) + @mutex.synchronize do + return true if @set + @condition.wait(@mutex, timeout) + @set end - rescue - # let it fail - ensure - @mutex.synchronize { @waiters.delete(Thread.current) } - return set? end end end diff --git a/lib/concurrent/immediate_executor.rb b/lib/concurrent/immediate_executor.rb new file mode 100644 index 000000000..4365bedac --- /dev/null +++ b/lib/concurrent/immediate_executor.rb @@ -0,0 +1,14 @@ +module Concurrent + class ImmediateExecutor + + def post(*args, &block) + block.call(*args) + end + + def <<(block) + post(&block) + self + end + + end +end \ No newline at end of file diff --git a/spec/concurrent/future_spec.rb b/spec/concurrent/future_spec.rb index 21b01ef61..0a7208b01 100644 --- a/spec/concurrent/future_spec.rb +++ b/spec/concurrent/future_spec.rb @@ -58,36 +58,37 @@ module Concurrent context 'fulfillment' do + before(:each) do + Future.thread_pool = ImmediateExecutor.new + end + it 'passes all arguments to handler' do - @a = @b = @c = nil - f = Future.new(1, 2, 3) do |a, b, c| - @a, @b, @c = a, b, c + result = nil + + Future.new(1, 2, 3) do |a, b, c| + result = [a, b, c] end - sleep(0.1) - [@a, @b, @c].should eq [1, 2, 3] + + result.should eq [1, 2, 3] end it 'sets the value to the result of the handler' do f = Future.new(10){|a| a * 2 } - sleep(0.1) f.value.should eq 20 end it 'sets the state to :fulfilled when the block completes' do f = Future.new(10){|a| a * 2 } - sleep(0.1) f.should be_fulfilled end it 'sets the value to nil when the handler raises an exception' do f = Future.new{ raise StandardError } - sleep(0.1) f.value.should be_nil end it 'sets the state to :rejected when the handler raises an exception' do f = Future.new{ raise StandardError } - sleep(0.1) f.should be_rejected end diff --git a/spec/concurrent/immediate_executor_spec.rb b/spec/concurrent/immediate_executor_spec.rb new file mode 100644 index 000000000..bb9ad2a19 --- /dev/null +++ b/spec/concurrent/immediate_executor_spec.rb @@ -0,0 +1,33 @@ +require 'spec_helper' + +module Concurrent + + describe ImmediateExecutor do + + let(:executor) { ImmediateExecutor.new } + + context "#post" do + it 'executes the block using the arguments as parameters' do + result = executor.post(1, 2, 3, 4) { |a, b, c, d| [a, b, c, d] } + result.should eq [1, 2, 3, 4] + end + end + + context "#<<" do + + it "returns true" do + result = executor << proc { false } + result.should be_true + end + + it "executes the passed callable" do + x = 0 + + executor << proc { x = 5 } + + x.should eq 5 + end + + end + end +end \ No newline at end of file