From d0e010ae6fe0d8b64990fe53f242f5af4bfca2d8 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Mon, 5 May 2014 15:57:49 +0200 Subject: [PATCH 1/5] Allow dereferencing of Agent values while updating ensure only one update is running --- lib/concurrent/agent.rb | 71 +++++++++++++++++++++++++---------- spec/concurrent/agent_spec.rb | 49 +++++++++++++++++++++--- 2 files changed, 96 insertions(+), 24 deletions(-) diff --git a/lib/concurrent/agent.rb b/lib/concurrent/agent.rb index a4e40aa44..d08a4b297 100644 --- a/lib/concurrent/agent.rb +++ b/lib/concurrent/agent.rb @@ -60,12 +60,14 @@ class Agent # @option opts [String] :copy_on_deref (nil) call the given `Proc` passing the internal value and # returning the value returned from the proc def initialize(initial, opts = {}) - @value = initial - @rescuers = [] - @validator = Proc.new { |result| true } - @timeout = opts.fetch(:timeout, TIMEOUT).freeze - self.observers = CopyOnWriteObserverSet.new - @executor = OptionsParser::get_executor_from(opts) + @value = initial + @rescuers = [] + @validator = Proc.new { |result| true } + @timeout = opts.fetch(:timeout, TIMEOUT).freeze + self.observers = CopyOnWriteObserverSet.new + @executor = OptionsParser::get_executor_from(opts) + @being_executed = false + @stash = [] init_mutex set_deref_options(opts) end @@ -111,7 +113,11 @@ def rescue(clazz = StandardError, &block) # @yieldparam [Object] value the result of the last update operation # @yieldreturn [Boolean] true if the value is valid else false def validate(&block) - @validator = block unless block.nil? + unless block.nil? + mutex.lock + @validator = block + mutex.unlock + end self end alias_method :validates, :validate @@ -124,8 +130,19 @@ def validate(&block) # the new value # @yieldparam [Object] value the current value # @yieldreturn [Object] the new value + # @return [true, nil] nil when no block is given def post(&block) - @executor.post{ work(&block) } unless block.nil? + return nil if block.nil? + mutex.lock + post = if @being_executed + @stash << block + false + else + @being_executed = true + end + mutex.unlock + @executor.post { work(&block) } if post + true end # Update the current value with the result of the given block operation @@ -157,22 +174,38 @@ def try_rescue(ex) # :nodoc: # @!visibility private def work(&handler) # :nodoc: begin - - should_notify = false + should_notify = false + validator, value = mutex.synchronize { [@validator, @value] } + + begin + # FIXME creates second thread + result, valid = Concurrent::timeout(@timeout) do + [result = handler.call(value), + validator.call(result)] + end + rescue Exception => ex + exception = ex + end mutex.synchronize do - result = Concurrent::timeout(@timeout) do - handler.call(@value) - end - if @validator.call(result) - @value = result + if !exception && valid + @value = result should_notify = true end + + if (stashed = @stash.shift) + @executor.post { work(&stashed) } + else + @being_executed = false + end end - time = Time.now - observers.notify_observers{ [time, self.value] } if should_notify - rescue Exception => ex - try_rescue(ex) + + if should_notify + time = Time.now + observers.notify_observers { [time, self.value] } + end + + try_rescue(exception) end end end diff --git a/spec/concurrent/agent_spec.rb b/spec/concurrent/agent_spec.rb index ecb052c76..b4080ed1b 100644 --- a/spec/concurrent/agent_spec.rb +++ b/spec/concurrent/agent_spec.rb @@ -147,18 +147,17 @@ def trigger_observable(observable) context '#post' do it 'adds the given block to the queue' do - executor.should_receive(:post).with(no_args).exactly(3).times - subject.post { sleep(100) } + executor.should_receive(:post).with(no_args).exactly(1).times + subject.post { sleep(1) } subject.post { nil } subject.post { nil } sleep(0.1) + subject.instance_variable_get(:@stash).size.should eq 2 end it 'does not add to the queue when no block is given' do - executor.should_receive(:post).with(no_args).exactly(2).times - subject.post { sleep(100) } + executor.should_receive(:post).with(no_args).exactly(0).times subject.post - subject.post { nil } sleep(0.1) end end @@ -365,6 +364,46 @@ def trigger_observable(observable) end end + context 'clojure-like behaviour' do + it 'does not block dereferencing when updating the value' do + continue = IVar.new + agent = Agent.new(0, executor: executor) + agent.post { |old| old + continue.value } + sleep 0.1 + Concurrent.timeout(0.2) { agent.value.should eq 0 } + continue.set 1 + sleep 0.1 + end + + it 'does not allow to execute two updates at the same time' do + agent = Agent.new(0, executor: executor) + continue1 = IVar.new + continue2 = IVar.new + f1 = f2 = false + agent.post { |old| f1 = true; old + continue1.value } + agent.post { |old| f2 = true; old + continue2.value } + + sleep 0.1 + f1.should eq true + f2.should eq false + agent.value.should eq 0 + + continue1.set 1 + sleep 0.1 + f1.should eq true + f2.should eq true + agent.value.should eq 1 + + continue2.set 1 + sleep 0.1 + f1.should eq true + f2.should eq true + agent.value.should eq 2 + end + + it 'waits with sending functions to other agents until update is done' + end + context 'aliases' do it 'aliases #deref for #value' do From 1d054eb82024c8fa352abae7f889012e60eb33c2 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Tue, 6 May 2014 15:29:34 +0200 Subject: [PATCH 2/5] Make Agent work with ImmediateExecutor --- lib/concurrent/agent.rb | 20 ++++++++------------ spec/concurrent/agent_spec.rb | 7 +++++++ 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/lib/concurrent/agent.rb b/lib/concurrent/agent.rb index d08a4b297..e04cab398 100644 --- a/lib/concurrent/agent.rb +++ b/lib/concurrent/agent.rb @@ -174,7 +174,6 @@ def try_rescue(ex) # :nodoc: # @!visibility private def work(&handler) # :nodoc: begin - should_notify = false validator, value = mutex.synchronize { [@validator, @value] } begin @@ -187,18 +186,15 @@ def work(&handler) # :nodoc: exception = ex end - mutex.synchronize do - if !exception && valid - @value = result - should_notify = true - end + mutex.lock + should_notify = if !exception && valid + @value = result + true + end + stashed = @stash.shift || (@being_executed = false) + mutex.unlock - if (stashed = @stash.shift) - @executor.post { work(&stashed) } - else - @being_executed = false - end - end + @executor.post { work(&stashed) } if stashed if should_notify time = Time.now diff --git a/spec/concurrent/agent_spec.rb b/spec/concurrent/agent_spec.rb index b4080ed1b..31a946cdc 100644 --- a/spec/concurrent/agent_spec.rb +++ b/spec/concurrent/agent_spec.rb @@ -160,6 +160,13 @@ def trigger_observable(observable) subject.post sleep(0.1) end + + it 'works with ImmediateExecutor' do + agent = Agent.new(0, executor: ImmediateExecutor.new) + agent.post { |old| old + 1 } + agent.post { |old| old + 1 } + agent.value.should eq 2 + end end context 'fulfillment' do From af42352fb727ae8098768ff9bb718b52d4bb859d Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Tue, 6 May 2014 15:30:20 +0200 Subject: [PATCH 3/5] whitespace --- lib/concurrent/agent.rb | 2 +- spec/concurrent/agent_spec.rb | 124 +++++++++++++++++----------------- 2 files changed, 63 insertions(+), 63 deletions(-) diff --git a/lib/concurrent/agent.rb b/lib/concurrent/agent.rb index e04cab398..cbc71ef89 100644 --- a/lib/concurrent/agent.rb +++ b/lib/concurrent/agent.rb @@ -164,7 +164,7 @@ def <<(block) # @!visibility private def try_rescue(ex) # :nodoc: rescuer = mutex.synchronize do - @rescuers.find{|r| ex.is_a?(r.clazz) } + @rescuers.find { |r| ex.is_a?(r.clazz) } end rescuer.block.call(ex) if rescuer rescue Exception => ex diff --git a/spec/concurrent/agent_spec.rb b/spec/concurrent/agent_spec.rb index 31a946cdc..b896a3a0a 100644 --- a/spec/concurrent/agent_spec.rb +++ b/spec/concurrent/agent_spec.rb @@ -34,18 +34,18 @@ def dereferenceable_observable(opts = {}) end def execute_dereferenceable(subject) - subject.post{|value| 10 } + subject.post { |value| 10 } sleep(0.1) end it_should_behave_like :dereferenceable # observable - - subject{ Agent.new(0) } - + + subject { Agent.new(0) } + def trigger_observable(observable) - observable.post{ nil } + observable.post { nil } sleep(0.1) end @@ -71,25 +71,25 @@ def trigger_observable(observable) it 'uses the executor given with the :executor option' do executor.should_receive(:post).with(any_args).and_return(0) agent = Agent.new(0, executor: executor) - agent.post{|value| 0 } + agent.post { |value| 0 } end it 'uses the global operation pool when :operation is true' do Concurrent.configuration.should_receive(:global_operation_pool).and_return(executor) agent = Agent.new(0, operation: true) - agent.post{|value| 0 } + agent.post { |value| 0 } end it 'uses the global task pool when :task is true' do Concurrent.configuration.should_receive(:global_task_pool).and_return(executor) agent = Agent.new(0, task: true) - agent.post{|value| 0 } + agent.post { |value| 0 } end it 'uses the global task pool by default' do Concurrent.configuration.should_receive(:global_task_pool).and_return(executor) agent = Agent.new(0) - agent.post{|value| 0 } + agent.post { |value| 0 } end end @@ -245,85 +245,85 @@ def trigger_observable(observable) it 'calls the first exception block with a matching class' do @expected = nil subject. - rescue(StandardError) { |ex| @expected = 1 }. - rescue(StandardError) { |ex| @expected = 2 }. - rescue(StandardError) { |ex| @expected = 3 } - subject.post { raise StandardError } - sleep(0.1) - @expected.should eq 1 - end + rescue(StandardError) { |ex| @expected = 1 }. + rescue(StandardError) { |ex| @expected = 2 }. + rescue(StandardError) { |ex| @expected = 3 } + subject.post { raise StandardError } + sleep(0.1) + @expected.should eq 1 + end it 'matches all with a rescue with no class given' do @expected = nil subject. - rescue(LoadError) { |ex| @expected = 1 }. - rescue { |ex| @expected = 2 }. - rescue(StandardError) { |ex| @expected = 3 } - subject.post { raise NoMethodError } - sleep(0.1) - @expected.should eq 2 - end + rescue(LoadError) { |ex| @expected = 1 }. + rescue { |ex| @expected = 2 }. + rescue(StandardError) { |ex| @expected = 3 } + subject.post { raise NoMethodError } + sleep(0.1) + @expected.should eq 2 + end it 'searches associated rescue handlers in order' do @expected = nil subject. - rescue(ArgumentError) { |ex| @expected = 1 }. - rescue(LoadError) { |ex| @expected = 2 }. - rescue(StandardError) { |ex| @expected = 3 } - subject.post { raise ArgumentError } - sleep(0.1) - @expected.should eq 1 + rescue(ArgumentError) { |ex| @expected = 1 }. + rescue(LoadError) { |ex| @expected = 2 }. + rescue(StandardError) { |ex| @expected = 3 } + subject.post { raise ArgumentError } + sleep(0.1) + @expected.should eq 1 - @expected = nil - subject. - rescue(ArgumentError) { |ex| @expected = 1 }. - rescue(LoadError) { |ex| @expected = 2 }. - rescue(StandardError) { |ex| @expected = 3 } - subject.post { raise LoadError } - sleep(0.1) - @expected.should eq 2 - - @expected = nil - subject. - rescue(ArgumentError) { |ex| @expected = 1 }. + @expected = nil + subject. + rescue(ArgumentError) { |ex| @expected = 1 }. rescue(LoadError) { |ex| @expected = 2 }. rescue(StandardError) { |ex| @expected = 3 } - subject.post { raise StandardError } - sleep(0.1) - @expected.should eq 3 - end + subject.post { raise LoadError } + sleep(0.1) + @expected.should eq 2 + + @expected = nil + subject. + rescue(ArgumentError) { |ex| @expected = 1 }. + rescue(LoadError) { |ex| @expected = 2 }. + rescue(StandardError) { |ex| @expected = 3 } + subject.post { raise StandardError } + sleep(0.1) + @expected.should eq 3 + end it 'passes the exception object to the matched block' do @expected = nil subject. - rescue(ArgumentError) { |ex| @expected = ex }. - rescue(LoadError) { |ex| @expected = ex }. - rescue(StandardError) { |ex| @expected = ex } - subject.post { raise StandardError } - sleep(0.1) - @expected.should be_a(StandardError) - end + rescue(ArgumentError) { |ex| @expected = ex }. + rescue(LoadError) { |ex| @expected = ex }. + rescue(StandardError) { |ex| @expected = ex } + subject.post { raise StandardError } + sleep(0.1) + @expected.should be_a(StandardError) + end it 'ignores rescuers without a block' do @expected = nil subject. - rescue(StandardError). - rescue(StandardError) { |ex| @expected = ex } - subject.post { raise StandardError } - sleep(0.1) - @expected.should be_a(StandardError) - end + rescue(StandardError). + rescue(StandardError) { |ex| @expected = ex } + subject.post { raise StandardError } + sleep(0.1) + @expected.should be_a(StandardError) + end it 'supresses the exception if no rescue matches' do lambda { subject. - rescue(ArgumentError) { |ex| @expected = ex }. - rescue(NotImplementedError) { |ex| @expected = ex }. - rescue(NoMethodError) { |ex| @expected = ex } + rescue(ArgumentError) { |ex| @expected = ex }. + rescue(NotImplementedError) { |ex| @expected = ex }. + rescue(NoMethodError) { |ex| @expected = ex } subject.post { raise StandardError } sleep(0.1) }.should_not raise_error - end + end it 'suppresses exceptions thrown from rescue handlers' do lambda { From 39b650a67e9eda509013bfc9959470d491a3868d Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Tue, 6 May 2014 15:43:27 +0200 Subject: [PATCH 4/5] Add support for await as in Clojure --- lib/concurrent/agent.rb | 11 +++++++++++ spec/concurrent/agent_spec.rb | 19 +++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/lib/concurrent/agent.rb b/lib/concurrent/agent.rb index cbc71ef89..87becd9fb 100644 --- a/lib/concurrent/agent.rb +++ b/lib/concurrent/agent.rb @@ -156,6 +156,16 @@ def <<(block) self end + # Waits/blocks until all the updates sent before this call are done. + # + # @param [Numeric] timeout the maximum time in second to wait. + # @return [Boolean] false on timeout, true otherwise + def await(timeout = nil) + done = Event.new + post { done.set } + done.wait timeout + end + private # @!visibility private @@ -168,6 +178,7 @@ def try_rescue(ex) # :nodoc: end rescuer.block.call(ex) if rescuer rescue Exception => ex + # puts "#{ex} (#{ex.class})\n#{ex.backtrace.join("\n")}" # supress end diff --git a/spec/concurrent/agent_spec.rb b/spec/concurrent/agent_spec.rb index b896a3a0a..f7db37d00 100644 --- a/spec/concurrent/agent_spec.rb +++ b/spec/concurrent/agent_spec.rb @@ -167,6 +167,25 @@ def trigger_observable(observable) agent.post { |old| old + 1 } agent.value.should eq 2 end + + end + + context '#await' do + + it 'waits until already sent updates are done' do + fn = false + subject.post { fn = true; sleep 0.1 } + subject.await + fn.should be_true + end + + it 'does not waits until updates sent after are done' do + fn = false + subject.await + subject.post { fn = true; sleep 0.1 } + fn.should be_false + end + end context 'fulfillment' do From 026641c1d2aa13d6e2b9cbba7e83e433d703b00d Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Tue, 6 May 2014 17:39:19 +0200 Subject: [PATCH 5/5] Extract OneByOne behavior from Agent it can be also later used for Actors --- lib/concurrent/agent.rb | 71 +++++++++++---------------- lib/concurrent/executor/one_by_one.rb | 71 +++++++++++++++++++++++++++ lib/concurrent/executors.rb | 1 + spec/concurrent/agent_spec.rb | 5 +- 4 files changed, 104 insertions(+), 44 deletions(-) create mode 100644 lib/concurrent/executor/one_by_one.rb diff --git a/lib/concurrent/agent.rb b/lib/concurrent/agent.rb index 87becd9fb..86e728533 100644 --- a/lib/concurrent/agent.rb +++ b/lib/concurrent/agent.rb @@ -40,7 +40,7 @@ class Agent # is given at initialization TIMEOUT = 5 - attr_reader :timeout + attr_reader :timeout, :executor # Initialize a new Agent with the given initial value and provided options. # @@ -60,14 +60,12 @@ class Agent # @option opts [String] :copy_on_deref (nil) call the given `Proc` passing the internal value and # returning the value returned from the proc def initialize(initial, opts = {}) - @value = initial - @rescuers = [] - @validator = Proc.new { |result| true } - @timeout = opts.fetch(:timeout, TIMEOUT).freeze - self.observers = CopyOnWriteObserverSet.new - @executor = OptionsParser::get_executor_from(opts) - @being_executed = false - @stash = [] + @value = initial + @rescuers = [] + @validator = Proc.new { |result| true } + @timeout = opts.fetch(:timeout, TIMEOUT).freeze + self.observers = CopyOnWriteObserverSet.new + @executor = OneByOne.new OptionsParser::get_executor_from(opts) init_mutex set_deref_options(opts) end @@ -133,15 +131,7 @@ def validate(&block) # @return [true, nil] nil when no block is given def post(&block) return nil if block.nil? - mutex.lock - post = if @being_executed - @stash << block - false - else - @being_executed = true - end - mutex.unlock - @executor.post { work(&block) } if post + @executor.post { work(&block) } true end @@ -184,36 +174,31 @@ def try_rescue(ex) # :nodoc: # @!visibility private def work(&handler) # :nodoc: + validator, value = mutex.synchronize { [@validator, @value] } + begin - validator, value = mutex.synchronize { [@validator, @value] } - - begin - # FIXME creates second thread - result, valid = Concurrent::timeout(@timeout) do - [result = handler.call(value), - validator.call(result)] - end - rescue Exception => ex - exception = ex + # FIXME creates second thread + result, valid = Concurrent::timeout(@timeout) do + [result = handler.call(value), + validator.call(result)] end + rescue Exception => ex + exception = ex + end - mutex.lock - should_notify = if !exception && valid - @value = result - true - end - stashed = @stash.shift || (@being_executed = false) - mutex.unlock - - @executor.post { work(&stashed) } if stashed - - if should_notify - time = Time.now - observers.notify_observers { [time, self.value] } - end + mutex.lock + should_notify = if !exception && valid + @value = result + true + end + mutex.unlock - try_rescue(exception) + if should_notify + time = Time.now + observers.notify_observers { [time, self.value] } end + + try_rescue(exception) end end end diff --git a/lib/concurrent/executor/one_by_one.rb b/lib/concurrent/executor/one_by_one.rb new file mode 100644 index 000000000..a84385e8d --- /dev/null +++ b/lib/concurrent/executor/one_by_one.rb @@ -0,0 +1,71 @@ +module Concurrent + + # Ensures that jobs are passed to the underlying executor one by one, + # never running at the same time. + class OneByOne + + attr_reader :executor + + Job = Struct.new(:args, :block) do + def call + block.call *args + end + end + + # @param [Executor] executor + def initialize(executor) + @executor = executor + @being_executed = false + @stash = [] + @mutex = Mutex.new + end + + # Submit a task to the executor for asynchronous processing. + # + # @param [Array] args zero or more arguments to be passed to the task + # + # @yield the asynchronous task to perform + # + # @return [Boolean] `true` if the task is queued, `false` if the executor + # is not running + # + # @raise [ArgumentError] if no task is given + def post(*args, &task) + return nil if task.nil? + job = Job.new args, task + @mutex.lock + post = if @being_executed + @stash << job + false + else + @being_executed = true + end + @mutex.unlock + @executor.post { work(job) } if post + true + end + + # Submit a task to the executor for asynchronous processing. + # + # @param [Proc] task the asynchronous task to perform + # + # @return [self] returns itself + def <<(task) + post(&task) + self + end + + private + + # ensures next job is executed if any is stashed + def work(job) + job.call + ensure + @mutex.lock + job = @stash.shift || (@being_executed = false) + @mutex.unlock + @executor.post { work(job) } if job + end + + end +end diff --git a/lib/concurrent/executors.rb b/lib/concurrent/executors.rb index 141d250df..ff6ced453 100644 --- a/lib/concurrent/executors.rb +++ b/lib/concurrent/executors.rb @@ -6,3 +6,4 @@ require 'concurrent/executor/single_thread_executor' require 'concurrent/executor/thread_pool_executor' require 'concurrent/executor/timer_set' +require 'concurrent/executor/one_by_one' diff --git a/spec/concurrent/agent_spec.rb b/spec/concurrent/agent_spec.rb index f7db37d00..1901d7afe 100644 --- a/spec/concurrent/agent_spec.rb +++ b/spec/concurrent/agent_spec.rb @@ -152,7 +152,10 @@ def trigger_observable(observable) subject.post { nil } subject.post { nil } sleep(0.1) - subject.instance_variable_get(:@stash).size.should eq 2 + subject. + executor. + instance_variable_get(:@stash). + size.should eq 2 end it 'does not add to the queue when no block is given' do