From b87a79458e7e1f41c0da7c1158b0caf68f1b70e2 Mon Sep 17 00:00:00 2001 From: Jerry D'Antonio Date: Wed, 25 Mar 2015 19:10:20 -0400 Subject: [PATCH 01/15] Promise extends IVar --- lib/concurrent/promise.rb | 39 +++++++++++------ spec/concurrent/promise_spec.rb | 74 +++++++++++++++++++++++++-------- 2 files changed, 82 insertions(+), 31 deletions(-) diff --git a/lib/concurrent/promise.rb b/lib/concurrent/promise.rb index f7cb9b1a0..649c56aff 100644 --- a/lib/concurrent/promise.rb +++ b/lib/concurrent/promise.rb @@ -1,5 +1,6 @@ require 'thread' +require 'concurrent/ivar' require 'concurrent/obligation' require 'concurrent/executor/executor_options' @@ -181,8 +182,7 @@ module Concurrent # - `on_success { |result| ... }` is the same as `then {|result| ... }` # - `rescue { |reason| ... }` is the same as `then(Proc.new { |reason| ... } )` # - `rescue` is aliased by `catch` and `on_error` - class Promise - include Obligation + class Promise < IVar include ExecutorOptions # Initialize a new Promise with the provided options. @@ -203,6 +203,7 @@ class Promise # @see http://promises-aplus.github.io/promises-spec/ def initialize(opts = {}, &block) opts.delete_if { |k, v| v.nil? } + super(IVar::NO_VALUE, opts) @executor = get_executor_from(opts) || Concurrent.global_io_executor @args = get_arguments_from(opts) @@ -214,9 +215,6 @@ def initialize(opts = {}, &block) @promise_body = block || Proc.new { |result| result } @state = :unscheduled @children = [] - - init_obligation - set_deref_options(opts) end # @return [Promise] @@ -224,7 +222,6 @@ def self.fulfill(value, opts = {}) Promise.new(opts).tap { |p| p.send(:synchronized_set_state!, true, value, nil) } end - # @return [Promise] def self.reject(reason, opts = {}) Promise.new(opts).tap { |p| p.send(:synchronized_set_state!, false, nil, reason) } @@ -388,6 +385,18 @@ def self.any?(*promises) aggregate(:any?, *promises) end + def set(value) + raise PromiseExecutionError.new('supported only on root promises') unless root? + super + end + + def fail(reason = StandardError.new) + raise PromiseExecutionError.new('supported only on root promises') unless root? + super + end + + protected :complete + protected # Aggregate a collection of zero or more promises under a composite promise, @@ -444,17 +453,21 @@ def notify_child(child) if_state(:rejected) { child.on_reject(@reason) } end + # @!visibility private + def complete(success, value, reason) + children_to_notify = mutex.synchronize do + set_state!(success, value, reason) + @children.dup + end + + children_to_notify.each { |child| notify_child(child) } + end + # @!visibility private def realize(task) @executor.post do success, value, reason = SafeTaskExecutor.new(task).execute(*@args) - - children_to_notify = mutex.synchronize do - set_state!(success, value, reason) - @children.dup - end - - children_to_notify.each { |child| notify_child(child) } + complete(success, value, reason) end end diff --git a/spec/concurrent/promise_spec.rb b/spec/concurrent/promise_spec.rb index a7d86ec07..5df7c41d5 100644 --- a/spec/concurrent/promise_spec.rb +++ b/spec/concurrent/promise_spec.rb @@ -374,13 +374,13 @@ def get_ivar_from_args(opts) composite = Promise.all?(promise1, promise2, promise3). then { counter.up; latch.count_down }. - rescue { counter.down; latch.count_down }. - execute + rescue { counter.down; latch.count_down }. + execute latch.wait(1) expect(counter.value).to eq 1 - end + end it 'executes the #then condition when no promises are given' do counter = Concurrent::AtomicFixnum.new(0) @@ -388,13 +388,13 @@ def get_ivar_from_args(opts) composite = Promise.all?. then { counter.up; latch.count_down }. - rescue { counter.down; latch.count_down }. - execute + rescue { counter.down; latch.count_down }. + execute latch.wait(1) expect(counter.value).to eq 1 - end + end it 'executes the #rescue handler if even one component fails' do counter = Concurrent::AtomicFixnum.new(0) @@ -402,13 +402,13 @@ def get_ivar_from_args(opts) composite = Promise.all?(promise1, promise2, rejected_subject, promise3). then { counter.up; latch.count_down }. - rescue { counter.down; latch.count_down }. - execute + rescue { counter.down; latch.count_down }. + execute latch.wait(1) expect(counter.value).to eq -1 - end + end end describe '.any?' do @@ -429,13 +429,13 @@ def get_ivar_from_args(opts) composite = Promise.any?(promise1, promise2, rejected_subject, promise3). then { counter.up; latch.count_down }. - rescue { counter.down; latch.count_down }. - execute + rescue { counter.down; latch.count_down }. + execute latch.wait(1) expect(counter.value).to eq 1 - end + end it 'executes the #then condition when no promises are given' do counter = Concurrent::AtomicFixnum.new(0) @@ -443,13 +443,13 @@ def get_ivar_from_args(opts) composite = Promise.any?. then { counter.up; latch.count_down }. - rescue { counter.down; latch.count_down }. - execute + rescue { counter.down; latch.count_down }. + execute latch.wait(1) expect(counter.value).to eq 1 - end + end it 'executes the #rescue handler if all componenst fail' do counter = Concurrent::AtomicFixnum.new(0) @@ -457,18 +457,56 @@ def get_ivar_from_args(opts) composite = Promise.any?(rejected_subject, rejected_subject, rejected_subject, rejected_subject). then { counter.up; latch.count_down }. - rescue { counter.down; latch.count_down }. - execute + rescue { counter.down; latch.count_down }. + execute latch.wait(1) expect(counter.value).to eq -1 - end + end end end context 'fulfillment' do + context '#set' do + + it '#can only be called on the root promise' do + root = Promise.new{ :foo } + child = root.then{ :bar } + + expect { child.set('foo') }.to raise_error PromiseExecutionError + expect { root.set('foo') }.not_to raise_error + end + + it 'triggers children' do + expected = nil + root = Promise.new(executor: :immediate){ nil } + root.then{ |result| expected = result } + root.set(20) + expect(expected).to eq 20 + end + end + + context '#fail' do + + it 'can only be called on the root promise' do + root = Promise.new{ :foo } + child = root.then{ :bar } + + expect { child.fail }.to raise_error PromiseExecutionError + expect { root.fail }.not_to raise_error + end + + it 'rejects children' do + expected = nil + root = Promise.new(executor: :immediate) + root.then(Proc.new{ |reason| expected = reason }) + root.fail(ArgumentError.new('simulated error')) + expect(expected).to be_a ArgumentError + end + end + it 'passes the result of each block to all its children' do expected = nil Promise.new(executor: executor){ 20 }.then{ |result| expected = result }.execute From e789622e63634f399a11533dea760233c7fcb746 Mon Sep 17 00:00:00 2001 From: Jerry D'Antonio Date: Wed, 25 Mar 2015 22:18:10 -0400 Subject: [PATCH 02/15] IVar and Promise can be set with a block. --- lib/concurrent/ivar.rb | 13 +++++++++++-- lib/concurrent/promise.rb | 6 +++--- spec/concurrent/ivar_spec.rb | 34 +++++++++++++++++++++++++++++++++ spec/concurrent/promise_spec.rb | 12 ++++++++++++ 4 files changed, 60 insertions(+), 5 deletions(-) diff --git a/lib/concurrent/ivar.rb b/lib/concurrent/ivar.rb index 9d3d1e292..3c6fa1c70 100644 --- a/lib/concurrent/ivar.rb +++ b/lib/concurrent/ivar.rb @@ -105,8 +105,17 @@ def add_observer(observer = nil, func = :update, &block) # @param [Object] value the value to store in the `IVar` # @raise [Concurrent::MultipleAssignmentError] if the `IVar` has already # been set or otherwise completed - def set(value) - complete(true, value, nil) + def set(value = NO_VALUE) + if (block_given? && value != NO_VALUE) || (!block_given? && value == NO_VALUE) + raise ArgumentError.new('must set with either a value or a block') + end + + begin + value = yield if block_given? + complete(true, value, nil) + rescue => ex + complete(false, nil, ex) + end end # Set the `IVar` to failed due to some error and wake or notify all threads waiting on it. diff --git a/lib/concurrent/promise.rb b/lib/concurrent/promise.rb index 649c56aff..0e4fbe7b1 100644 --- a/lib/concurrent/promise.rb +++ b/lib/concurrent/promise.rb @@ -385,13 +385,13 @@ def self.any?(*promises) aggregate(:any?, *promises) end - def set(value) - raise PromiseExecutionError.new('supported only on root promises') unless root? + def set(value = IVar::NO_VALUE) + raise PromiseExecutionError.new('supported only on root promise') unless root? super end def fail(reason = StandardError.new) - raise PromiseExecutionError.new('supported only on root promises') unless root? + raise PromiseExecutionError.new('supported only on root promise') unless root? super end diff --git a/spec/concurrent/ivar_spec.rb b/spec/concurrent/ivar_spec.rb index d3d007a6f..19990f4cb 100644 --- a/spec/concurrent/ivar_spec.rb +++ b/spec/concurrent/ivar_spec.rb @@ -115,6 +115,33 @@ def trigger_observable(observable) i = IVar.new expect(i.set(42)).to eq i end + + it 'fulfils when given a block which executes successfully' do + i = IVar.new + i.set{ 42 } + expect(i.value).to eq 42 + end + + it 'rejects when given a block which raises an exception' do + i = IVar.new + expected = ArgumentError.new + i.set{ raise expected } + expect(i.reason).to eq expected + end + + it 'raises an exception when given a value and a block' do + i = IVar.new + expect { + i.set(42){ :guide } + }.to raise_error(ArgumentError) + end + + it 'raises an exception when given neither a value nor a block' do + i = IVar.new + expect { + i.set + }.to raise_error(ArgumentError) + end end context '#fail' do @@ -131,6 +158,13 @@ def trigger_observable(observable) expect(i.value).to be_nil end + it 'sets the reason to the given exception' do + i = IVar.new + expected = ArgumentError.new + i.fail(expected) + expect(i.reason).to eq expected + end + it 'raises an exception if set more than once' do i = IVar.new i.fail diff --git a/spec/concurrent/promise_spec.rb b/spec/concurrent/promise_spec.rb index 5df7c41d5..346dc5c74 100644 --- a/spec/concurrent/promise_spec.rb +++ b/spec/concurrent/promise_spec.rb @@ -486,6 +486,18 @@ def get_ivar_from_args(opts) root.set(20) expect(expected).to eq 20 end + + it 'can be called with a block' do + p = Promise.new(executor: executor) + ch = p.then(&:to_s) + p.set { :value } + + expect(p.value).to eq :value + expect(p.state).to eq :fulfilled + + expect(ch.value).to eq 'value' + expect(ch.state).to eq :fulfilled + end end context '#fail' do From 700ddcbd33d6f1d66bd2dea1c86d5f2af9310423 Mon Sep 17 00:00:00 2001 From: Jerry D'Antonio Date: Thu, 26 Mar 2015 12:22:41 -0400 Subject: [PATCH 03/15] Promise#set now calls #execute. --- lib/concurrent/promise.rb | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/lib/concurrent/promise.rb b/lib/concurrent/promise.rb index 0e4fbe7b1..62a092b49 100644 --- a/lib/concurrent/promise.rb +++ b/lib/concurrent/promise.rb @@ -240,6 +240,26 @@ def execute self end + def set(value = IVar::NO_VALUE, &block) + raise PromiseExecutionError.new('supported only on root promise') unless root? + if (block_given? && value != NO_VALUE) || (!block_given? && value == NO_VALUE) + raise ArgumentError.new('must set with either a value or a block') + end + mutex.synchronize do + if @state != :unscheduled + raise PromiseExecutionError.new('execution has already begun') + else + @promise_body = block || Proc.new { |result| value } + end + end + execute + end + + def fail(reason = StandardError.new) + raise PromiseExecutionError.new('supported only on root promise') unless root? + super + end + # Create a new `Promise` object with the given block, execute it, and return the # `:pending` object. # @@ -385,16 +405,6 @@ def self.any?(*promises) aggregate(:any?, *promises) end - def set(value = IVar::NO_VALUE) - raise PromiseExecutionError.new('supported only on root promise') unless root? - super - end - - def fail(reason = StandardError.new) - raise PromiseExecutionError.new('supported only on root promise') unless root? - super - end - protected :complete protected From 49f6718c8ae3bcbcdc3484ea32baa481ae60f35b Mon Sep 17 00:00:00 2001 From: Jerry D'Antonio Date: Thu, 26 Mar 2015 23:44:18 -0400 Subject: [PATCH 04/15] Added shared specs for IVar, Future, and Promise. --- lib/concurrent/async.rb | 7 +-- lib/concurrent/future.rb | 14 ++++- lib/concurrent/ivar.rb | 22 ++++--- lib/concurrent/obligation.rb | 2 +- lib/concurrent/promise.rb | 11 +--- spec/concurrent/future_spec.rb | 23 ++----- spec/concurrent/ivar_shared.rb | 88 +++++++++++++++++++++++++++ spec/concurrent/ivar_spec.rb | 102 +++----------------------------- spec/concurrent/promise_spec.rb | 6 ++ 9 files changed, 140 insertions(+), 135 deletions(-) create mode 100644 spec/concurrent/ivar_shared.rb diff --git a/lib/concurrent/async.rb b/lib/concurrent/async.rb index bbf269d82..762518140 100644 --- a/lib/concurrent/async.rb +++ b/lib/concurrent/async.rb @@ -82,14 +82,11 @@ def method_missing(method, *args, &block) self.define_singleton_method(method) do |*args2| Async::validate_argc(@delegate, method, *args2) ivar = Concurrent::IVar.new - value, reason = nil, nil @serializer.post(@executor.value) do begin - value = @delegate.send(method, *args2, &block) + ivar.set(@delegate.send(method, *args2, &block)) rescue => reason - # caught - ensure - ivar.complete(reason.nil?, value, reason) + ivar.fail(reason) end end ivar.value if @blocking diff --git a/lib/concurrent/future.rb b/lib/concurrent/future.rb index 58d29bbbe..71284c40d 100644 --- a/lib/concurrent/future.rb +++ b/lib/concurrent/future.rb @@ -76,7 +76,19 @@ def self.execute(opts = {}, &block) Future.new(opts, &block).execute end - protected :set, :fail, :complete + def set(value = IVar::NO_VALUE, &block) + check_for_block_or_value!(block_given?, value) + mutex.synchronize do + if @state != :unscheduled + raise MultipleAssignmentError + else + @task = block || Proc.new { value } + end + end + execute + end + + protected :complete private diff --git a/lib/concurrent/ivar.rb b/lib/concurrent/ivar.rb index 3c6fa1c70..0c0072edd 100644 --- a/lib/concurrent/ivar.rb +++ b/lib/concurrent/ivar.rb @@ -60,12 +60,9 @@ def initialize(value = NO_VALUE, opts = {}) init_obligation self.observers = CopyOnWriteObserverSet.new set_deref_options(opts) + @state = :pending - if value == NO_VALUE - @state = :pending - else - set(value) - end + set(value) unless value == NO_VALUE end # Add an observer on this object that will receive notification on update. @@ -108,6 +105,8 @@ def add_observer(observer = nil, func = :update, &block) def set(value = NO_VALUE) if (block_given? && value != NO_VALUE) || (!block_given? && value == NO_VALUE) raise ArgumentError.new('must set with either a value or a block') + elsif ! compare_and_set_state(:processing, :pending) + raise MultipleAssignmentError end begin @@ -124,13 +123,15 @@ def set(value = NO_VALUE) # @raise [Concurrent::MultipleAssignmentError] if the `IVar` has already # been set or otherwise completed def fail(reason = StandardError.new) - complete(false, nil, reason) + set { raise reason } end + protected + # @!visibility private def complete(success, value, reason) # :nodoc: mutex.synchronize do - raise MultipleAssignmentError.new('multiple assignment') if [:fulfilled, :rejected].include? @state + raise MultipleAssignmentError if [:fulfilled, :rejected].include? @state set_state(success, value, reason) event.set end @@ -139,5 +140,12 @@ def complete(success, value, reason) # :nodoc: observers.notify_and_delete_observers{ [time, self.value, reason] } self end + + # @!visibility private + def check_for_block_or_value!(block_given, value) # :nodoc: + if (block_given && value != NO_VALUE) || (! block_given && value == NO_VALUE) + raise ArgumentError.new('must set with either a value or a block') + end + end end end diff --git a/lib/concurrent/obligation.rb b/lib/concurrent/obligation.rb index f3cdf19c0..22b66d1d1 100644 --- a/lib/concurrent/obligation.rb +++ b/lib/concurrent/obligation.rb @@ -59,7 +59,7 @@ def completed? # # @return [Boolean] def incomplete? - [:unscheduled, :pending].include? state + ! complete? end # The current value of the obligation. Will be `nil` while the state is diff --git a/lib/concurrent/promise.rb b/lib/concurrent/promise.rb index 62a092b49..2dca7253c 100644 --- a/lib/concurrent/promise.rb +++ b/lib/concurrent/promise.rb @@ -242,12 +242,10 @@ def execute def set(value = IVar::NO_VALUE, &block) raise PromiseExecutionError.new('supported only on root promise') unless root? - if (block_given? && value != NO_VALUE) || (!block_given? && value == NO_VALUE) - raise ArgumentError.new('must set with either a value or a block') - end + check_for_block_or_value!(block_given?, value) mutex.synchronize do if @state != :unscheduled - raise PromiseExecutionError.new('execution has already begun') + raise MultipleAssignmentError else @promise_body = block || Proc.new { |result| value } end @@ -255,11 +253,6 @@ def set(value = IVar::NO_VALUE, &block) execute end - def fail(reason = StandardError.new) - raise PromiseExecutionError.new('supported only on root promise') unless root? - super - end - # Create a new `Promise` object with the given block, execute it, and return the # `:pending` object. # diff --git a/spec/concurrent/future_spec.rb b/spec/concurrent/future_spec.rb index 78a2d6165..9991ff980 100644 --- a/spec/concurrent/future_spec.rb +++ b/spec/concurrent/future_spec.rb @@ -1,4 +1,5 @@ require_relative 'dereferenceable_shared' +require_relative 'ivar_shared' require_relative 'obligation_shared' require_relative 'observable_shared' require_relative 'thread_arguments_shared' @@ -16,6 +17,11 @@ module Concurrent }.execute.tap{ sleep(0.1) } end + context 'manual completion' do + subject { Future.new(executor: :immediate){ nil } } + it_should_behave_like :ivar + end + context 'behavior' do # thread_arguments @@ -80,23 +86,6 @@ def trigger_observable(observable) it_should_behave_like :observable end - context 'subclassing' do - - subject{ Future.execute(executor: executor){ 42 } } - - it 'protects #set' do - expect{ subject.set(100) }.to raise_error - end - - it 'protects #fail' do - expect{ subject.fail }.to raise_error - end - - it 'protects #complete' do - expect{ subject.complete(true, 100, nil) }.to raise_error - end - end - context '#initialize' do let(:executor) { ImmediateExecutor.new } diff --git a/spec/concurrent/ivar_shared.rb b/spec/concurrent/ivar_shared.rb new file mode 100644 index 000000000..2f95b0338 --- /dev/null +++ b/spec/concurrent/ivar_shared.rb @@ -0,0 +1,88 @@ +shared_examples :ivar do + + context 'initialization' do + + it 'sets the state to incomplete' do + expect(subject).to be_incomplete + end + end + + context '#set' do + + it 'sets the state to be fulfilled' do + subject.set(14) + expect(subject).to be_fulfilled + end + + it 'sets the value' do + subject.set(14) + expect(subject.value).to eq 14 + end + + it 'raises an exception if set more than once' do + subject.set(14) + expect {subject.set(2)}.to raise_error(Concurrent::MultipleAssignmentError) + expect(subject.value).to eq 14 + end + + it 'returns self' do + expect(subject.set(42)).to eq subject + end + it 'fulfils when given a block which executes successfully' do + subject.set{ 42 } + expect(subject.value).to eq 42 + end + + it 'rejects when given a block which raises an exception' do + expected = ArgumentError.new + subject.set{ raise expected } + expect(subject.reason).to eq expected + end + + it 'raises an exception when given a value and a block' do + expect { + subject.set(42){ :guide } + }.to raise_error(ArgumentError) + end + + it 'raises an exception when given neither a value nor a block' do + expect { + subject.set + }.to raise_error(ArgumentError) + end + end + + context '#fail' do + + it 'sets the state to be rejected' do + subject.fail + expect(subject).to be_rejected + end + + it 'sets the value to be nil' do + subject.fail + expect(subject.value).to be_nil + end + + it 'sets the reason to the given exception' do + expected = ArgumentError.new + subject.fail(expected) + expect(subject.reason).to eq expected + end + + it 'raises an exception if set more than once' do + subject.fail + expect {subject.fail}.to raise_error(Concurrent::MultipleAssignmentError) + expect(subject.value).to be_nil + end + + it 'defaults the reason to a StandardError' do + subject.fail + expect(subject.reason).to be_a StandardError + end + + it 'returns self' do + expect(subject.fail).to eq subject + end + end +end diff --git a/spec/concurrent/ivar_spec.rb b/spec/concurrent/ivar_spec.rb index 19990f4cb..4b2b85627 100644 --- a/spec/concurrent/ivar_spec.rb +++ b/spec/concurrent/ivar_spec.rb @@ -1,4 +1,5 @@ require_relative 'dereferenceable_shared' +require_relative 'ivar_shared' require_relative 'obligation_shared' require_relative 'observable_shared' @@ -14,6 +15,11 @@ module Concurrent i end + context 'manual completion' do + subject{ IVar.new } + it_should_behave_like :ivar + end + context 'behavior' do # obligation @@ -85,105 +91,11 @@ def trigger_observable(observable) it 'can set an initial value' do i = IVar.new(14) - expect(i).to be_completed + expect(i).to be_complete end end - context '#set' do - - it 'sets the state to be fulfilled' do - i = IVar.new - i.set(14) - expect(i).to be_fulfilled - end - - it 'sets the value' do - i = IVar.new - i.set(14) - expect(i.value).to eq 14 - end - - it 'raises an exception if set more than once' do - i = IVar.new - i.set(14) - expect {i.set(2)}.to raise_error(Concurrent::MultipleAssignmentError) - expect(i.value).to eq 14 - end - - it 'returns self' do - i = IVar.new - expect(i.set(42)).to eq i - end - - it 'fulfils when given a block which executes successfully' do - i = IVar.new - i.set{ 42 } - expect(i.value).to eq 42 - end - - it 'rejects when given a block which raises an exception' do - i = IVar.new - expected = ArgumentError.new - i.set{ raise expected } - expect(i.reason).to eq expected - end - - it 'raises an exception when given a value and a block' do - i = IVar.new - expect { - i.set(42){ :guide } - }.to raise_error(ArgumentError) - end - - it 'raises an exception when given neither a value nor a block' do - i = IVar.new - expect { - i.set - }.to raise_error(ArgumentError) - end - end - - context '#fail' do - - it 'sets the state to be rejected' do - i = IVar.new - i.fail - expect(i).to be_rejected - end - - it 'sets the value to be nil' do - i = IVar.new - i.fail - expect(i.value).to be_nil - end - - it 'sets the reason to the given exception' do - i = IVar.new - expected = ArgumentError.new - i.fail(expected) - expect(i.reason).to eq expected - end - - it 'raises an exception if set more than once' do - i = IVar.new - i.fail - expect {i.fail}.to raise_error(Concurrent::MultipleAssignmentError) - expect(i.value).to be_nil - end - - it 'defaults the reason to a StandardError' do - i = IVar.new - i.fail - expect(i.reason).to be_a StandardError - end - - it 'returns self' do - i = IVar.new - expect(i.fail).to eq i - end - end - context 'observation' do let(:clazz) do diff --git a/spec/concurrent/promise_spec.rb b/spec/concurrent/promise_spec.rb index 346dc5c74..aad473f77 100644 --- a/spec/concurrent/promise_spec.rb +++ b/spec/concurrent/promise_spec.rb @@ -1,3 +1,4 @@ +require_relative 'ivar_shared' require_relative 'obligation_shared' require_relative 'thread_arguments_shared' @@ -23,6 +24,11 @@ module Concurrent Promise.reject(rejected_reason, executor: executor) end + context 'manual completion' do + subject{ Promise.new(executor: :immediate) } + it_should_behave_like :ivar + end + context 'behavior' do # thread_arguments From 9e3a27f6b569693e5af744f50c5483d05527a834 Mon Sep 17 00:00:00 2001 From: Jerry D'Antonio Date: Fri, 27 Mar 2015 00:09:41 -0400 Subject: [PATCH 05/15] Obligation shared specs now called from IVar shared specs. --- lib/concurrent/ivar.rb | 2 + spec/concurrent/future_spec.rb | 54 +++++++++++---------------- spec/concurrent/ivar_shared.rb | 4 ++ spec/concurrent/ivar_spec.rb | 65 +++++++++++---------------------- spec/concurrent/promise_spec.rb | 16 ++------ 5 files changed, 52 insertions(+), 89 deletions(-) diff --git a/lib/concurrent/ivar.rb b/lib/concurrent/ivar.rb index 0c0072edd..262657a9f 100644 --- a/lib/concurrent/ivar.rb +++ b/lib/concurrent/ivar.rb @@ -102,6 +102,7 @@ def add_observer(observer = nil, func = :update, &block) # @param [Object] value the value to store in the `IVar` # @raise [Concurrent::MultipleAssignmentError] if the `IVar` has already # been set or otherwise completed + # @return [IVar] self def set(value = NO_VALUE) if (block_given? && value != NO_VALUE) || (!block_given? && value == NO_VALUE) raise ArgumentError.new('must set with either a value or a block') @@ -122,6 +123,7 @@ def set(value = NO_VALUE) # @param [Object] reason for the failure # @raise [Concurrent::MultipleAssignmentError] if the `IVar` has already # been set or otherwise completed + # @return [IVar] self def fail(reason = StandardError.new) set { raise reason } end diff --git a/spec/concurrent/future_spec.rb b/spec/concurrent/future_spec.rb index 9991ff980..4d7dde1ba 100644 --- a/spec/concurrent/future_spec.rb +++ b/spec/concurrent/future_spec.rb @@ -1,6 +1,5 @@ require_relative 'dereferenceable_shared' require_relative 'ivar_shared' -require_relative 'obligation_shared' require_relative 'observable_shared' require_relative 'thread_arguments_shared' @@ -17,14 +16,26 @@ module Concurrent }.execute.tap{ sleep(0.1) } end - context 'manual completion' do - subject { Future.new(executor: :immediate){ nil } } - it_should_behave_like :ivar + let!(:fulfilled_value) { 10 } + let!(:rejected_reason) { StandardError.new('mojo jojo') } + + let(:pending_subject) do + Future.new(executor: executor){ sleep(0.1); fulfilled_value }.execute end - context 'behavior' do + let(:fulfilled_subject) do + Future.new(executor: executor){ fulfilled_value }.execute.tap{ sleep(0.1) } + end - # thread_arguments + let(:rejected_subject) do + Future.new(executor: executor){ raise rejected_reason }.execute.tap{ sleep(0.1) } + end + + it_should_behave_like :ivar do + subject { Future.new(executor: :immediate){ nil } } + end + + it_should_behave_like :thread_arguments do def get_ivar_from_no_args Concurrent::Future.execute{|*args| args } @@ -33,29 +44,9 @@ def get_ivar_from_no_args def get_ivar_from_args(opts) Concurrent::Future.execute(opts){|*args| args } end + end - it_should_behave_like :thread_arguments - - # obligation - - let!(:fulfilled_value) { 10 } - let!(:rejected_reason) { StandardError.new('mojo jojo') } - - let(:pending_subject) do - Future.new(executor: executor){ sleep(3); fulfilled_value }.execute - end - - let(:fulfilled_subject) do - Future.new(executor: executor){ fulfilled_value }.execute.tap{ sleep(0.1) } - end - - let(:rejected_subject) do - Future.new(executor: executor){ raise rejected_reason }.execute.tap{ sleep(0.1) } - end - - it_should_behave_like :obligation - - # dereferenceable + it_should_behave_like :dereferenceable do def dereferenceable_subject(value, opts = {}) opts = opts.merge(executor: executor) @@ -71,10 +62,9 @@ def execute_dereferenceable(subject) subject.execute sleep(0.1) end + end - it_should_behave_like :dereferenceable - - # observable + it_should_behave_like :observable do subject{ Future.new{ nil } } @@ -82,8 +72,6 @@ def trigger_observable(observable) observable.execute sleep(0.1) end - - it_should_behave_like :observable end context '#initialize' do diff --git a/spec/concurrent/ivar_shared.rb b/spec/concurrent/ivar_shared.rb index 2f95b0338..d1c178804 100644 --- a/spec/concurrent/ivar_shared.rb +++ b/spec/concurrent/ivar_shared.rb @@ -1,5 +1,9 @@ +require_relative 'obligation_shared' + shared_examples :ivar do + it_should_behave_like :obligation + context 'initialization' do it 'sets the state to incomplete' do diff --git a/spec/concurrent/ivar_spec.rb b/spec/concurrent/ivar_spec.rb index 4b2b85627..908533783 100644 --- a/spec/concurrent/ivar_spec.rb +++ b/spec/concurrent/ivar_spec.rb @@ -1,6 +1,5 @@ require_relative 'dereferenceable_shared' require_relative 'ivar_shared' -require_relative 'obligation_shared' require_relative 'observable_shared' module Concurrent @@ -9,48 +8,33 @@ module Concurrent let!(:value) { 10 } - subject do - i = IVar.new - i.set(14) - i - end - - context 'manual completion' do - subject{ IVar.new } - it_should_behave_like :ivar - end - - context 'behavior' do - - # obligation + let!(:fulfilled_value) { 10 } + let(:rejected_reason) { StandardError.new('Boom!') } - let!(:fulfilled_value) { 10 } - let(:rejected_reason) { StandardError.new('Boom!') } + subject { IVar.new(value) } - let(:pending_subject) do - @i = IVar.new - Thread.new do - sleep(3) - @i.set(fulfilled_value) - end - @i + let(:pending_subject) do + ivar = IVar.new + Thread.new do + sleep(0.1) + ivar.set(fulfilled_value) end + ivar + end - let(:fulfilled_subject) do - i = IVar.new - i.set(fulfilled_value) - i - end + let(:fulfilled_subject) do + IVar.new.set(fulfilled_value) + end - let(:rejected_subject) do - i = IVar.new - i.fail(rejected_reason) - i - end + let(:rejected_subject) do + IVar.new.fail(rejected_reason) + end - it_should_behave_like :obligation + it_should_behave_like :ivar do + subject{ IVar.new } + end - # dereferenceable + it_should_behave_like :dereferenceable do def dereferenceable_subject(value, opts = {}) IVar.new(value, opts) @@ -63,18 +47,15 @@ def dereferenceable_observable(opts = {}) def execute_dereferenceable(subject) subject.set('value') end + end - it_should_behave_like :dereferenceable - - # observable + it_should_behave_like :observable do subject{ IVar.new } def trigger_observable(observable) observable.set('value') end - - it_should_behave_like :observable end context '#initialize' do @@ -93,7 +74,6 @@ def trigger_observable(observable) i = IVar.new(14) expect(i).to be_complete end - end context 'observation' do @@ -154,7 +134,6 @@ def reentrant_observer(i) expect(obs.value).to eq 42 end end - end end end diff --git a/spec/concurrent/promise_spec.rb b/spec/concurrent/promise_spec.rb index aad473f77..699d52c1c 100644 --- a/spec/concurrent/promise_spec.rb +++ b/spec/concurrent/promise_spec.rb @@ -1,5 +1,4 @@ require_relative 'ivar_shared' -require_relative 'obligation_shared' require_relative 'thread_arguments_shared' module Concurrent @@ -13,7 +12,7 @@ module Concurrent let!(:rejected_reason) { StandardError.new('mojo jojo') } let(:pending_subject) do - Promise.new(executor: executor){ sleep(0.3); fulfilled_value }.execute + Promise.new(executor: executor){ sleep(0.1); fulfilled_value }.execute end let(:fulfilled_subject) do @@ -24,14 +23,11 @@ module Concurrent Promise.reject(rejected_reason, executor: executor) end - context 'manual completion' do + it_should_behave_like :ivar do subject{ Promise.new(executor: :immediate) } - it_should_behave_like :ivar end - context 'behavior' do - - # thread_arguments + it_should_behave_like :thread_arguments do def get_ivar_from_no_args Concurrent::Promise.execute{|*args| args } @@ -40,12 +36,6 @@ def get_ivar_from_no_args def get_ivar_from_args(opts) Concurrent::Promise.execute(opts){|*args| args } end - - it_should_behave_like :thread_arguments - - # obligation - - it_should_behave_like :obligation end it 'includes Dereferenceable' do From c7984f3f7b02b5c7fa9b1ddede47c0b054b94c61 Mon Sep 17 00:00:00 2001 From: Jerry D'Antonio Date: Fri, 27 Mar 2015 00:32:03 -0400 Subject: [PATCH 06/15] Dereferenceable shared specs now called from IVar shared specs. --- lib/concurrent/ivar.rb | 1 - lib/concurrent/promise.rb | 3 +-- spec/concurrent/future_spec.rb | 28 ++++++++++++---------------- spec/concurrent/ivar_shared.rb | 2 ++ spec/concurrent/ivar_spec.rb | 4 ---- spec/concurrent/promise_spec.rb | 23 +++++++++++++++++------ 6 files changed, 32 insertions(+), 29 deletions(-) diff --git a/lib/concurrent/ivar.rb b/lib/concurrent/ivar.rb index 262657a9f..c808047cb 100644 --- a/lib/concurrent/ivar.rb +++ b/lib/concurrent/ivar.rb @@ -39,7 +39,6 @@ module Concurrent # ivar.get #=> 14 # ivar.set 2 # would now be an error class IVar - include Obligation include Observable diff --git a/lib/concurrent/promise.rb b/lib/concurrent/promise.rb index 2dca7253c..951190cd4 100644 --- a/lib/concurrent/promise.rb +++ b/lib/concurrent/promise.rb @@ -398,8 +398,6 @@ def self.any?(*promises) aggregate(:any?, *promises) end - protected :complete - protected # Aggregate a collection of zero or more promises under a composite promise, @@ -464,6 +462,7 @@ def complete(success, value, reason) end children_to_notify.each { |child| notify_child(child) } + observers.notify_and_delete_observers{ [Time.now, self.value, reason] } end # @!visibility private diff --git a/spec/concurrent/future_spec.rb b/spec/concurrent/future_spec.rb index 4d7dde1ba..ecf4beb8b 100644 --- a/spec/concurrent/future_spec.rb +++ b/spec/concurrent/future_spec.rb @@ -1,4 +1,3 @@ -require_relative 'dereferenceable_shared' require_relative 'ivar_shared' require_relative 'observable_shared' require_relative 'thread_arguments_shared' @@ -32,21 +31,7 @@ module Concurrent end it_should_behave_like :ivar do - subject { Future.new(executor: :immediate){ nil } } - end - - it_should_behave_like :thread_arguments do - - def get_ivar_from_no_args - Concurrent::Future.execute{|*args| args } - end - - def get_ivar_from_args(opts) - Concurrent::Future.execute(opts){|*args| args } - end - end - - it_should_behave_like :dereferenceable do + subject { Future.new(executor: :immediate){ value } } def dereferenceable_subject(value, opts = {}) opts = opts.merge(executor: executor) @@ -64,6 +49,17 @@ def execute_dereferenceable(subject) end end + it_should_behave_like :thread_arguments do + + def get_ivar_from_no_args + Concurrent::Future.execute{|*args| args } + end + + def get_ivar_from_args(opts) + Concurrent::Future.execute(opts){|*args| args } + end + end + it_should_behave_like :observable do subject{ Future.new{ nil } } diff --git a/spec/concurrent/ivar_shared.rb b/spec/concurrent/ivar_shared.rb index d1c178804..feb9ed2a0 100644 --- a/spec/concurrent/ivar_shared.rb +++ b/spec/concurrent/ivar_shared.rb @@ -1,8 +1,10 @@ +require_relative 'dereferenceable_shared' require_relative 'obligation_shared' shared_examples :ivar do it_should_behave_like :obligation + it_should_behave_like :dereferenceable context 'initialization' do diff --git a/spec/concurrent/ivar_spec.rb b/spec/concurrent/ivar_spec.rb index 908533783..1fa9ad1e3 100644 --- a/spec/concurrent/ivar_spec.rb +++ b/spec/concurrent/ivar_spec.rb @@ -1,4 +1,3 @@ -require_relative 'dereferenceable_shared' require_relative 'ivar_shared' require_relative 'observable_shared' @@ -32,9 +31,6 @@ module Concurrent it_should_behave_like :ivar do subject{ IVar.new } - end - - it_should_behave_like :dereferenceable do def dereferenceable_subject(value, opts = {}) IVar.new(value, opts) diff --git a/spec/concurrent/promise_spec.rb b/spec/concurrent/promise_spec.rb index 699d52c1c..1398891a7 100644 --- a/spec/concurrent/promise_spec.rb +++ b/spec/concurrent/promise_spec.rb @@ -5,6 +5,7 @@ module Concurrent describe Promise do + let!(:value) { 10 } let(:executor) { PerThreadExecutor.new } let(:empty_root) { Promise.new(executor: executor){ nil } } @@ -24,7 +25,22 @@ module Concurrent end it_should_behave_like :ivar do - subject{ Promise.new(executor: :immediate) } + subject{ Promise.new(executor: :immediate){ value } } + + def dereferenceable_subject(value, opts = {}) + opts = opts.merge(executor: executor) + Promise.new(opts){ value }.execute.tap{ sleep(0.1) } + end + + def dereferenceable_observable(opts = {}) + opts = opts.merge(executor: executor) + Promise.new(opts){ 'value' } + end + + def execute_dereferenceable(subject) + subject.execute + sleep(0.1) + end end it_should_behave_like :thread_arguments do @@ -38,11 +54,6 @@ def get_ivar_from_args(opts) end end - it 'includes Dereferenceable' do - promise = Promise.new{ nil } - expect(promise).to be_a(Dereferenceable) - end - context 'initializers' do describe '.fulfill' do From e037eeddb49249f40c8afbb05cf97b194805e7e4 Mon Sep 17 00:00:00 2001 From: Jerry D'Antonio Date: Fri, 27 Mar 2015 00:37:24 -0400 Subject: [PATCH 07/15] Observable shared specs now called from IVar shared specs. --- spec/concurrent/future_spec.rb | 16 +++++----------- spec/concurrent/ivar_shared.rb | 2 ++ spec/concurrent/ivar_spec.rb | 6 ------ spec/concurrent/promise_spec.rb | 5 +++++ 4 files changed, 12 insertions(+), 17 deletions(-) diff --git a/spec/concurrent/future_spec.rb b/spec/concurrent/future_spec.rb index ecf4beb8b..0450a3599 100644 --- a/spec/concurrent/future_spec.rb +++ b/spec/concurrent/future_spec.rb @@ -1,5 +1,4 @@ require_relative 'ivar_shared' -require_relative 'observable_shared' require_relative 'thread_arguments_shared' module Concurrent @@ -47,6 +46,11 @@ def execute_dereferenceable(subject) subject.execute sleep(0.1) end + + def trigger_observable(observable) + observable.execute + sleep(0.1) + end end it_should_behave_like :thread_arguments do @@ -60,16 +64,6 @@ def get_ivar_from_args(opts) end end - it_should_behave_like :observable do - - subject{ Future.new{ nil } } - - def trigger_observable(observable) - observable.execute - sleep(0.1) - end - end - context '#initialize' do let(:executor) { ImmediateExecutor.new } diff --git a/spec/concurrent/ivar_shared.rb b/spec/concurrent/ivar_shared.rb index feb9ed2a0..a2a66fb4a 100644 --- a/spec/concurrent/ivar_shared.rb +++ b/spec/concurrent/ivar_shared.rb @@ -1,10 +1,12 @@ require_relative 'dereferenceable_shared' require_relative 'obligation_shared' +require_relative 'observable_shared' shared_examples :ivar do it_should_behave_like :obligation it_should_behave_like :dereferenceable + it_should_behave_like :observable context 'initialization' do diff --git a/spec/concurrent/ivar_spec.rb b/spec/concurrent/ivar_spec.rb index 1fa9ad1e3..cee303d98 100644 --- a/spec/concurrent/ivar_spec.rb +++ b/spec/concurrent/ivar_spec.rb @@ -1,5 +1,4 @@ require_relative 'ivar_shared' -require_relative 'observable_shared' module Concurrent @@ -43,11 +42,6 @@ def dereferenceable_observable(opts = {}) def execute_dereferenceable(subject) subject.set('value') end - end - - it_should_behave_like :observable do - - subject{ IVar.new } def trigger_observable(observable) observable.set('value') diff --git a/spec/concurrent/promise_spec.rb b/spec/concurrent/promise_spec.rb index 1398891a7..150fe81ed 100644 --- a/spec/concurrent/promise_spec.rb +++ b/spec/concurrent/promise_spec.rb @@ -41,6 +41,11 @@ def execute_dereferenceable(subject) subject.execute sleep(0.1) end + + def trigger_observable(observable) + observable.execute + sleep(0.1) + end end it_should_behave_like :thread_arguments do From ff77ec72b4475612a43fdc23f339d0468cf45513 Mon Sep 17 00:00:00 2001 From: Jerry D'Antonio Date: Fri, 27 Mar 2015 01:32:59 -0400 Subject: [PATCH 08/15] Added IVar#set? to unify Channel::Probe. --- lib/concurrent/ivar.rb | 7 ++++ spec/concurrent/ivar_shared.rb | 62 ++++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+) diff --git a/lib/concurrent/ivar.rb b/lib/concurrent/ivar.rb index c808047cb..dfd8d2119 100644 --- a/lib/concurrent/ivar.rb +++ b/lib/concurrent/ivar.rb @@ -127,6 +127,13 @@ def fail(reason = StandardError.new) set { raise reason } end + def set?(value = NO_VALUE, &block) + set(value, &block) + true + rescue MultipleAssignmentError + false + end + protected # @!visibility private diff --git a/spec/concurrent/ivar_shared.rb b/spec/concurrent/ivar_shared.rb index a2a66fb4a..27b6105c8 100644 --- a/spec/concurrent/ivar_shared.rb +++ b/spec/concurrent/ivar_shared.rb @@ -93,4 +93,66 @@ expect(subject.fail).to eq subject end end + + describe '#set?' do + + context 'when unset' do + + it 'assigns the value' do + subject.set?(32) + expect(subject.value).to eq 32 + end + + it 'assigns the block result' do + subject.set?{ 32 } + expect(subject.value).to eq 32 + end + + it 'returns true' do + expect(subject.set?('hi')).to eq true + end + end + + context 'when fulfilled' do + + before(:each) { subject.set(27) } + + it 'does not assign the value' do + subject.set?(88) + expect(subject.value).to eq 27 + end + + it 'does not assign the block result' do + subject.set?{ 88 } + expect(subject.value).to eq 27 + end + + it 'returns false' do + expect(subject.set?('hello')).to eq false + end + end + + context 'when rejected' do + + before(:each) { subject.fail } + + it 'does not assign the value' do + subject.set?(88) + expect(subject).to be_rejected + end + + it 'does not assign the block result' do + subject.set?{ 88 } + expect(subject).to be_rejected + end + + it 'has a nil value' do + expect(subject.value).to be_nil + end + + it 'returns false' do + expect(subject.set?('hello')).to eq false + end + end + end end From 850dff76c72578209e7e95ff6164dc57025fb3d3 Mon Sep 17 00:00:00 2001 From: Jerry D'Antonio Date: Fri, 27 Mar 2015 01:35:03 -0400 Subject: [PATCH 09/15] Channel::Probe is now just an IVar. --- lib/concurrent/channel/buffered_channel.rb | 4 +-- lib/concurrent/channel/channel.rb | 29 ++----------------- lib/concurrent/channel/unbuffered_channel.rb | 3 +- lib/concurrent/future.rb | 2 -- lib/concurrent/ivar.rb | 7 ++--- .../channel/buffered_channel_spec.rb | 10 +++---- spec/concurrent/channel/probe_spec.rb | 27 +++++++---------- .../channel/unbuffered_channel_spec.rb | 8 ++--- 8 files changed, 27 insertions(+), 63 deletions(-) diff --git a/lib/concurrent/channel/buffered_channel.rb b/lib/concurrent/channel/buffered_channel.rb index 8badab99c..f20770448 100644 --- a/lib/concurrent/channel/buffered_channel.rb +++ b/lib/concurrent/channel/buffered_channel.rb @@ -40,7 +40,7 @@ def select(probe) @probe_set.put(probe) true else - shift_buffer if probe.set_unless_assigned(peek_buffer, self) + shift_buffer if probe.set?([peek_buffer, self]) end end @@ -76,7 +76,7 @@ def set_probe_or_push_into_buffer(value) push_into_buffer(value) true else - @probe_set.take.set_unless_assigned(value, self) + @probe_set.take.set?([value, self]) end end end diff --git a/lib/concurrent/channel/channel.rb b/lib/concurrent/channel/channel.rb index f0357b9f1..0ce22befb 100644 --- a/lib/concurrent/channel/channel.rb +++ b/lib/concurrent/channel/channel.rb @@ -3,37 +3,12 @@ module Concurrent module Channel - class Probe < Concurrent::IVar - - def initialize(value = NO_VALUE, opts = {}) - super(value, opts) - end - - def set_unless_assigned(value, channel) - mutex.synchronize do - return false if [:fulfilled, :rejected].include? @state - - set_state(true, [value, channel], nil) - event.set - true - end - end - - alias_method :composite_value, :value - - def value - composite_value.nil? ? nil : composite_value[0] - end - - def channel - composite_value.nil? ? nil : composite_value[1] - end - end + Probe = IVar def self.select(*channels) probe = Probe.new channels.each { |channel| channel.select(probe) } - result = probe.composite_value + result = probe.value channels.each { |channel| channel.remove_probe(probe) } result end diff --git a/lib/concurrent/channel/unbuffered_channel.rb b/lib/concurrent/channel/unbuffered_channel.rb index da62418b6..e42f811c7 100644 --- a/lib/concurrent/channel/unbuffered_channel.rb +++ b/lib/concurrent/channel/unbuffered_channel.rb @@ -12,8 +12,7 @@ def probe_set_size end def push(value) - # TODO set_unless_assigned define on IVar as #set_state? or #try_set_state - until @probe_set.take.set_unless_assigned(value, self) + until @probe_set.take.set?([value, self]) end end diff --git a/lib/concurrent/future.rb b/lib/concurrent/future.rb index 71284c40d..0499ffb93 100644 --- a/lib/concurrent/future.rb +++ b/lib/concurrent/future.rb @@ -88,8 +88,6 @@ def set(value = IVar::NO_VALUE, &block) execute end - protected :complete - private # @!visibility private diff --git a/lib/concurrent/ivar.rb b/lib/concurrent/ivar.rb index dfd8d2119..9878306ca 100644 --- a/lib/concurrent/ivar.rb +++ b/lib/concurrent/ivar.rb @@ -103,11 +103,8 @@ def add_observer(observer = nil, func = :update, &block) # been set or otherwise completed # @return [IVar] self def set(value = NO_VALUE) - if (block_given? && value != NO_VALUE) || (!block_given? && value == NO_VALUE) - raise ArgumentError.new('must set with either a value or a block') - elsif ! compare_and_set_state(:processing, :pending) - raise MultipleAssignmentError - end + check_for_block_or_value!(block_given?, value) + raise MultipleAssignmentError unless compare_and_set_state(:processing, :pending) begin value = yield if block_given? diff --git a/spec/concurrent/channel/buffered_channel_spec.rb b/spec/concurrent/channel/buffered_channel_spec.rb index dd40b99ee..f4b4b4640 100644 --- a/spec/concurrent/channel/buffered_channel_spec.rb +++ b/spec/concurrent/channel/buffered_channel_spec.rb @@ -47,7 +47,7 @@ module Concurrent it 'should assign value to a probe if probe set is not empty' do channel.select(probe) Thread.new { sleep(0.1); channel.push 3 } - expect(probe.value).to eq 3 + expect(probe.value.first).to eq 3 end end @@ -62,14 +62,14 @@ module Concurrent channel.push 1 result = channel.pop - expect(result).to eq 1 + expect(result.first).to eq 1 end it 'removes the first value from the buffer' do channel.push 'a' channel.push 'b' - expect(channel.pop).to eq 'a' + expect(channel.pop.first).to eq 'a' expect(channel.buffer_queue_size).to eq 1 end end @@ -91,7 +91,7 @@ module Concurrent Thread.new { channel.push 82 } - expect(probe.value).to eq 82 + expect(probe.value.first).to eq 82 end end @@ -120,7 +120,7 @@ module Concurrent expect(channel.buffer_queue_size).to eq 1 - expect(channel.pop).to eq 82 + expect(channel.pop.first).to eq 82 end end diff --git a/spec/concurrent/channel/probe_spec.rb b/spec/concurrent/channel/probe_spec.rb index f11d44601..b6a344f14 100644 --- a/spec/concurrent/channel/probe_spec.rb +++ b/spec/concurrent/channel/probe_spec.rb @@ -20,20 +20,20 @@ def trigger_observable(observable) it_should_behave_like :observable end - describe '#set_unless_assigned' do + describe '#set?' do context 'empty probe' do it 'assigns the value' do - probe.set_unless_assigned(32, channel) - expect(probe.value).to eq 32 + probe.set?([32, channel]) + expect(probe.value.first).to eq 32 end it 'assign the channel' do - probe.set_unless_assigned(32, channel) - expect(probe.channel).to be channel + probe.set?([32, channel]) + expect(probe.value.last).to be channel end it 'returns true' do - expect(probe.set_unless_assigned('hi', channel)).to eq true + expect(probe.set?(['hi', channel])).to eq true end end @@ -41,12 +41,12 @@ def trigger_observable(observable) before(:each) { probe.set([27, nil]) } it 'does not assign the value' do - probe.set_unless_assigned(88, channel) - expect(probe.value).to eq 27 + probe.set?([88, channel]) + expect(probe.value.first).to eq 27 end it 'returns false' do - expect(probe.set_unless_assigned('hello', channel)).to eq false + expect(probe.set?(['hello', channel])).to eq false end end @@ -54,7 +54,7 @@ def trigger_observable(observable) before(:each) { probe.fail } it 'does not assign the value' do - probe.set_unless_assigned(88, channel) + probe.set?([88, channel]) expect(probe).to be_rejected end @@ -62,15 +62,10 @@ def trigger_observable(observable) expect(probe.value).to be_nil end - it 'has a nil channel' do - expect(probe.channel).to be_nil - end - it 'returns false' do - expect(probe.set_unless_assigned('hello', channel)).to eq false + expect(probe.set?(['hello', channel])).to eq false end end end - end end diff --git a/spec/concurrent/channel/unbuffered_channel_spec.rb b/spec/concurrent/channel/unbuffered_channel_spec.rb index 72dd6556d..a2ededc71 100644 --- a/spec/concurrent/channel/unbuffered_channel_spec.rb +++ b/spec/concurrent/channel/unbuffered_channel_spec.rb @@ -39,7 +39,7 @@ module Concurrent sleep(0.1) - expect(result).to eq 42 + expect(result.first).to eq 42 end it 'passes the pushed value to only one thread' do @@ -62,7 +62,7 @@ module Concurrent sleep(0.1) - expect(result).to eq 57 + expect(result.first).to eq 57 end end @@ -81,7 +81,7 @@ module Concurrent Thread.new { channel.push 82 } - expect(probe.value).to eq 82 + expect(probe.value.first).to eq 82 end it 'ignores already set probes and waits for a new one' do @@ -101,7 +101,7 @@ module Concurrent sleep(0.05) - expect(new_probe.value).to eq 72 + expect(new_probe.value.first).to eq 72 end end From e6527f8fde470c12dffdaa3e4a7a0d764ead85cb Mon Sep 17 00:00:00 2001 From: Jerry D'Antonio Date: Tue, 21 Apr 2015 00:21:47 -0400 Subject: [PATCH 10/15] IVar is now a Synchronization::Object. * Dereferenceable supports injection of a mutex * Obligation supports injection of a mutex * IVar extends Synchronization::Object * IVar passes `self` to Obligation as injected mutex * IVar, Promise, and Future no longer call `mutex#lock` --- lib/concurrent/dereferenceable.rb | 32 ++++++++----------- lib/concurrent/ivar.rb | 6 ++-- lib/concurrent/obligation.rb | 51 ++++++++++++------------------- lib/concurrent/promise.rb | 5 +-- 4 files changed, 37 insertions(+), 57 deletions(-) diff --git a/lib/concurrent/dereferenceable.rb b/lib/concurrent/dereferenceable.rb index f5ee523e4..ed0f68f1a 100644 --- a/lib/concurrent/dereferenceable.rb +++ b/lib/concurrent/dereferenceable.rb @@ -39,12 +39,8 @@ module Dereferenceable # # @return [Object] the current value of the object def value - mutex.lock - apply_deref_options(@value) - ensure - mutex.unlock + mutex.synchronize { apply_deref_options(@value) } end - alias_method :deref, :value protected @@ -52,11 +48,8 @@ def value # Set the internal value of this object # # @param [Object] val the new value - def value=(val) - mutex.lock - @value = val - ensure - mutex.unlock + def value=(value) + mutex.synchronize{ @value = value } end # A mutex lock used for synchronizing thread-safe operations. Methods defined @@ -74,8 +67,8 @@ def mutex # @note This method *must* be called from within the constructor of the including class. # # @see #mutex - def init_mutex - @mutex = Mutex.new + def init_mutex(mutex = Mutex.new) + @mutex = mutex end # Set the options which define the operations #value performs before @@ -91,14 +84,13 @@ def init_mutex # @option opts [String] :copy_on_deref (nil) call the given `Proc` passing # the internal value and returning the value returned from the proc def set_deref_options(opts = {}) - mutex.lock - @dup_on_deref = opts[:dup_on_deref] || opts[:dup] - @freeze_on_deref = opts[:freeze_on_deref] || opts[:freeze] - @copy_on_deref = opts[:copy_on_deref] || opts[:copy] - @do_nothing_on_deref = !(@dup_on_deref || @freeze_on_deref || @copy_on_deref) - nil - ensure - mutex.unlock + mutex.synchronize do + @dup_on_deref = opts[:dup_on_deref] || opts[:dup] + @freeze_on_deref = opts[:freeze_on_deref] || opts[:freeze] + @copy_on_deref = opts[:copy_on_deref] || opts[:copy] + @do_nothing_on_deref = !(@dup_on_deref || @freeze_on_deref || @copy_on_deref) + nil + end end # @!visibility private diff --git a/lib/concurrent/ivar.rb b/lib/concurrent/ivar.rb index 9878306ca..086a441f9 100644 --- a/lib/concurrent/ivar.rb +++ b/lib/concurrent/ivar.rb @@ -3,6 +3,7 @@ require 'concurrent/errors' require 'concurrent/obligation' require 'concurrent/observable' +require 'concurrent/synchronization' module Concurrent @@ -38,7 +39,7 @@ module Concurrent # ivar.set 14 # ivar.get #=> 14 # ivar.set 2 # would now be an error - class IVar + class IVar < Synchronization::Object include Obligation include Observable @@ -56,7 +57,8 @@ class IVar # @option opts [String] :copy_on_deref (nil) call the given `Proc` passing # the internal value and returning the value returned from the proc def initialize(value = NO_VALUE, opts = {}) - init_obligation + super(&nil) + init_obligation(self) self.observers = CopyOnWriteObserverSet.new set_deref_options(opts) @state = :pending diff --git a/lib/concurrent/obligation.rb b/lib/concurrent/obligation.rb index 22b66d1d1..1c7441c94 100644 --- a/lib/concurrent/obligation.rb +++ b/lib/concurrent/obligation.rb @@ -113,10 +113,7 @@ def value!(timeout = nil) # # @return [Symbol] the current state def state - mutex.lock - @state - ensure - mutex.unlock + mutex.synchronize { @state } end # If an exception was raised during processing this will return the @@ -125,10 +122,7 @@ def state # # @return [Exception] the exception raised during processing or `nil` def reason - mutex.lock - @reason - ensure - mutex.unlock + mutex.synchronize { @reason } end # @example allows Obligation to be risen @@ -147,8 +141,8 @@ def get_arguments_from(opts = {}) # :nodoc: end # @!visibility private - def init_obligation # :nodoc: - init_mutex + def init_obligation(*args) # :nodoc: + init_mutex(*args) @event = Event.new end @@ -170,10 +164,7 @@ def set_state(success, value, reason) # :nodoc: # @!visibility private def state=(value) # :nodoc: - mutex.lock - @state = value - ensure - mutex.unlock + mutex.synchronize { @state = value } end # Atomic compare and set operation @@ -186,15 +177,14 @@ def state=(value) # :nodoc: # # @!visibility private def compare_and_set_state(next_state, expected_current) # :nodoc: - mutex.lock - if @state == expected_current - @state = next_state - true - else - false + mutex.synchronize do + if @state == expected_current + @state = next_state + true + else + false + end end - ensure - mutex.unlock end # executes the block within mutex if current state is included in expected_states @@ -203,16 +193,15 @@ def compare_and_set_state(next_state, expected_current) # :nodoc: # # @!visibility private def if_state(*expected_states) # :nodoc: - mutex.lock - raise ArgumentError.new('no block given') unless block_given? - - if expected_states.include? @state - yield - else - false + mutex.synchronize do + raise ArgumentError.new('no block given') unless block_given? + + if expected_states.include? @state + yield + else + false + end end - ensure - mutex.unlock end end end diff --git a/lib/concurrent/promise.rb b/lib/concurrent/promise.rb index 951190cd4..f7bf0c3ee 100644 --- a/lib/concurrent/promise.rb +++ b/lib/concurrent/promise.rb @@ -481,10 +481,7 @@ def set_state!(success, value, reason) # @!visibility private def synchronized_set_state!(success, value, reason) - mutex.lock - set_state!(success, value, reason) - ensure - mutex.unlock + mutex.synchronize { set_state!(success, value, reason) } end end end From c23b4e3896f86a9182b1ddeab2afba30feed2650 Mon Sep 17 00:00:00 2001 From: Jerry D'Antonio Date: Tue, 21 Apr 2015 00:24:36 -0400 Subject: [PATCH 11/15] Delay is now Synchronization::Object but is slower. --- lib/concurrent/delay.rb | 40 +++++++++++++++++++++------------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/lib/concurrent/delay.rb b/lib/concurrent/delay.rb index 8b3bebfe1..843ed868d 100644 --- a/lib/concurrent/delay.rb +++ b/lib/concurrent/delay.rb @@ -3,6 +3,7 @@ require 'concurrent/obligation' require 'concurrent/executor/executor_options' require 'concurrent/executor/immediate_executor' +require 'concurrent/synchronization' module Concurrent @@ -37,7 +38,7 @@ module Concurrent # execute on the given executor, allowing the call to timeout. # # @see Concurrent::Dereferenceable - class Delay + class Delay < Synchronization::Object include Obligation include ExecutorOptions @@ -59,7 +60,8 @@ class Delay def initialize(opts = {}, &block) raise ArgumentError.new('no block given') unless block_given? - init_obligation + super(&nil) + init_obligation(self) set_deref_options(opts) @task_executor = get_executor_from(opts) @@ -145,16 +147,15 @@ def wait(timeout = nil) # @yield the delayed operation to perform # @return [true, false] if success def reconfigure(&block) - mutex.lock - raise ArgumentError.new('no block given') unless block_given? - unless @computing - @task = block - true - else - false + mutex.synchronize do + raise ArgumentError.new('no block given') unless block_given? + unless @computing + @task = block + true + else + false + end end - ensure - mutex.unlock end private @@ -163,10 +164,11 @@ def reconfigure(&block) def execute_task_once # :nodoc: # this function has been optimized for performance and # should not be modified without running new benchmarks - mutex.lock - execute = @computing = true unless @computing - task = @task - mutex.unlock + execute = task = nil + mutex.synchronize do + execute = @computing = true unless @computing + task = @task + end if execute @task_executor.post do @@ -176,10 +178,10 @@ def execute_task_once # :nodoc: rescue => ex reason = ex end - mutex.lock - set_state(success, result, reason) - event.set - mutex.unlock + mutex.synchronize do + set_state(success, result, reason) + event.set + end end end end From a9da18314d8018d61aa994294942822e782e561f Mon Sep 17 00:00:00 2001 From: Jerry D'Antonio Date: Wed, 22 Apr 2015 20:08:18 -0400 Subject: [PATCH 12/15] Moved mutex unlocking into ensure clauses. --- .../atomic/copy_on_notify_observer_set.rb | 26 ++++++++----------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/lib/concurrent/atomic/copy_on_notify_observer_set.rb b/lib/concurrent/atomic/copy_on_notify_observer_set.rb index 21534da86..2c354a9b9 100644 --- a/lib/concurrent/atomic/copy_on_notify_observer_set.rb +++ b/lib/concurrent/atomic/copy_on_notify_observer_set.rb @@ -33,11 +33,10 @@ def add_observer(observer=nil, func=:update, &block) begin @mutex.lock @observers[observer] = func + observer ensure @mutex.unlock end - - observer end # @param [Object] observer the observer to remove @@ -45,9 +44,9 @@ def add_observer(observer=nil, func=:update, &block) def delete_observer(observer) @mutex.lock @observers.delete(observer) - @mutex.unlock - observer + ensure + @mutex.unlock end # Deletes all observers @@ -55,18 +54,17 @@ def delete_observer(observer) def delete_observers @mutex.lock @observers.clear - @mutex.unlock - self + ensure + @mutex.unlock end # @return [Integer] the observers count def count_observers @mutex.lock - result = @observers.count + @observers.count + ensure @mutex.unlock - - result end # Notifies all registered observers with optional args @@ -75,7 +73,6 @@ def count_observers def notify_observers(*args, &block) observers = duplicate_observers notify_to(observers, *args, &block) - self end @@ -86,7 +83,6 @@ def notify_observers(*args, &block) def notify_and_delete_observers(*args, &block) observers = duplicate_and_clear_observers notify_to(observers, *args, &block) - self end @@ -96,17 +92,17 @@ def duplicate_and_clear_observers @mutex.lock observers = @observers.dup @observers.clear - @mutex.unlock - observers + ensure + @mutex.unlock end def duplicate_observers @mutex.lock observers = @observers.dup - @mutex.unlock - observers + ensure + @mutex.unlock end def notify_to(observers, *args) From 14d4598dc46788eeb5ac35289f939575e0aebd10 Mon Sep 17 00:00:00 2001 From: Jerry D'Antonio Date: Fri, 24 Apr 2015 05:45:13 -0400 Subject: [PATCH 13/15] Minor refactoring and doc updates. --- lib/concurrent/delay.rb | 2 +- lib/concurrent/dereferenceable.rb | 2 +- lib/concurrent/ivar.rb | 18 ++++++++++++++---- lib/concurrent/promise.rb | 4 ++++ .../synchronization/abstract_object.rb | 4 +++- 5 files changed, 23 insertions(+), 7 deletions(-) diff --git a/lib/concurrent/delay.rb b/lib/concurrent/delay.rb index 843ed868d..0a1dc5222 100644 --- a/lib/concurrent/delay.rb +++ b/lib/concurrent/delay.rb @@ -60,7 +60,7 @@ class Delay < Synchronization::Object def initialize(opts = {}, &block) raise ArgumentError.new('no block given') unless block_given? - super(&nil) + super() init_obligation(self) set_deref_options(opts) @task_executor = get_executor_from(opts) diff --git a/lib/concurrent/dereferenceable.rb b/lib/concurrent/dereferenceable.rb index ed0f68f1a..6a9cf4a6c 100644 --- a/lib/concurrent/dereferenceable.rb +++ b/lib/concurrent/dereferenceable.rb @@ -47,7 +47,7 @@ def value # Set the internal value of this object # - # @param [Object] val the new value + # @param [Object] value the new value def value=(value) mutex.synchronize{ @value = value } end diff --git a/lib/concurrent/ivar.rb b/lib/concurrent/ivar.rb index 086a441f9..b0b66dda3 100644 --- a/lib/concurrent/ivar.rb +++ b/lib/concurrent/ivar.rb @@ -100,9 +100,13 @@ def add_observer(observer = nil, func = :update, &block) # Set the `IVar` to a value and wake or notify all threads waiting on it. # - # @param [Object] value the value to store in the `IVar` - # @raise [Concurrent::MultipleAssignmentError] if the `IVar` has already - # been set or otherwise completed + # @!macro [attach] ivar_set_parameters_and_exceptions + # @param [Object] value the value to store in the `IVar` + # @yield A block operation to use for setting the value + # @raise [ArgumentError] if both a value and a block are given + # @raise [Concurrent::MultipleAssignmentError] if the `IVar` has already + # been set or otherwise completed + # # @return [IVar] self def set(value = NO_VALUE) check_for_block_or_value!(block_given?, value) @@ -123,9 +127,15 @@ def set(value = NO_VALUE) # been set or otherwise completed # @return [IVar] self def fail(reason = StandardError.new) - set { raise reason } + complete(false, nil, reason) end + # Attempt to set the `IVar` with the given value or block. Return a + # boolean indicating the success or failure of the set operation. + # + # @!macro ivar_set_parameters_and_exceptions + # + # @return [Boolean] true if the value was set else false def set?(value = NO_VALUE, &block) set(value, &block) true diff --git a/lib/concurrent/promise.rb b/lib/concurrent/promise.rb index f7bf0c3ee..2c156a549 100644 --- a/lib/concurrent/promise.rb +++ b/lib/concurrent/promise.rb @@ -253,6 +253,10 @@ def set(value = IVar::NO_VALUE, &block) execute end + def fail(reason = StandardError.new) + set { raise reason } + end + # Create a new `Promise` object with the given block, execute it, and return the # `:pending` object. # diff --git a/lib/concurrent/synchronization/abstract_object.rb b/lib/concurrent/synchronization/abstract_object.rb index 2dc81de73..7a3cfe52c 100644 --- a/lib/concurrent/synchronization/abstract_object.rb +++ b/lib/concurrent/synchronization/abstract_object.rb @@ -7,7 +7,9 @@ module Synchronization # Provides a single layer which can improve its implementation over time without changes needed to # the classes using it. Use {Synchronization::Object} not this abstract class. # - # @note this object does not support usage together with {Thread#wakeup} and {Thread#raise}. + # @note this object does not support usage together with + # [Thread#wakeup](http://ruby-doc.org/core-2.2.0/Thread.html#method-i-wakeup) + # and [Thread#raise](http://ruby-doc.org/core-2.2.0/Thread.html#method-i-raise). # `Thread#sleep` and `Thread#wakeup` will work as expected but mixing `Synchronization::Object#wait` and # `Thread#wakeup` will not work on all platforms. # From 65ee9f6fafa7311534b4113c706c0b0a3a975c20 Mon Sep 17 00:00:00 2001 From: Jerry D'Antonio Date: Fri, 24 Apr 2015 06:12:37 -0400 Subject: [PATCH 14/15] Renamed IVar #set? method to #try_set to be more idiomatic. --- lib/concurrent/channel/buffered_channel.rb | 4 +-- lib/concurrent/channel/unbuffered_channel.rb | 2 +- lib/concurrent/ivar.rb | 36 +++++++++++--------- spec/concurrent/channel/probe_spec.rb | 16 ++++----- spec/concurrent/ivar_shared.rb | 20 +++++------ 5 files changed, 40 insertions(+), 38 deletions(-) diff --git a/lib/concurrent/channel/buffered_channel.rb b/lib/concurrent/channel/buffered_channel.rb index f20770448..ec4e80f3b 100644 --- a/lib/concurrent/channel/buffered_channel.rb +++ b/lib/concurrent/channel/buffered_channel.rb @@ -40,7 +40,7 @@ def select(probe) @probe_set.put(probe) true else - shift_buffer if probe.set?([peek_buffer, self]) + shift_buffer if probe.try_set([peek_buffer, self]) end end @@ -76,7 +76,7 @@ def set_probe_or_push_into_buffer(value) push_into_buffer(value) true else - @probe_set.take.set?([value, self]) + @probe_set.take.try_set([value, self]) end end end diff --git a/lib/concurrent/channel/unbuffered_channel.rb b/lib/concurrent/channel/unbuffered_channel.rb index e42f811c7..c516fc8aa 100644 --- a/lib/concurrent/channel/unbuffered_channel.rb +++ b/lib/concurrent/channel/unbuffered_channel.rb @@ -12,7 +12,7 @@ def probe_set_size end def push(value) - until @probe_set.take.set?([value, self]) + until @probe_set.take.try_set([value, self]) end end diff --git a/lib/concurrent/ivar.rb b/lib/concurrent/ivar.rb index b0b66dda3..6d4531e36 100644 --- a/lib/concurrent/ivar.rb +++ b/lib/concurrent/ivar.rb @@ -98,16 +98,17 @@ def add_observer(observer = nil, func = :update, &block) observer end - # Set the `IVar` to a value and wake or notify all threads waiting on it. - # - # @!macro [attach] ivar_set_parameters_and_exceptions - # @param [Object] value the value to store in the `IVar` - # @yield A block operation to use for setting the value - # @raise [ArgumentError] if both a value and a block are given - # @raise [Concurrent::MultipleAssignmentError] if the `IVar` has already - # been set or otherwise completed - # - # @return [IVar] self + # @!macro [attach] ivar_set_method + # Set the `IVar` to a value and wake or notify all threads waiting on it. + # + # @!macro [attach] ivar_set_parameters_and_exceptions + # @param [Object] value the value to store in the `IVar` + # @yield A block operation to use for setting the value + # @raise [ArgumentError] if both a value and a block are given + # @raise [Concurrent::MultipleAssignmentError] if the `IVar` has already + # been set or otherwise completed + # + # @return [IVar] self def set(value = NO_VALUE) check_for_block_or_value!(block_given?, value) raise MultipleAssignmentError unless compare_and_set_state(:processing, :pending) @@ -120,12 +121,13 @@ def set(value = NO_VALUE) end end - # Set the `IVar` to failed due to some error and wake or notify all threads waiting on it. - # - # @param [Object] reason for the failure - # @raise [Concurrent::MultipleAssignmentError] if the `IVar` has already - # been set or otherwise completed - # @return [IVar] self + # @!macro [attach] ivar_fail_method + # Set the `IVar` to failed due to some error and wake or notify all threads waiting on it. + # + # @param [Object] reason for the failure + # @raise [Concurrent::MultipleAssignmentError] if the `IVar` has already + # been set or otherwise completed + # @return [IVar] self def fail(reason = StandardError.new) complete(false, nil, reason) end @@ -136,7 +138,7 @@ def fail(reason = StandardError.new) # @!macro ivar_set_parameters_and_exceptions # # @return [Boolean] true if the value was set else false - def set?(value = NO_VALUE, &block) + def try_set(value = NO_VALUE, &block) set(value, &block) true rescue MultipleAssignmentError diff --git a/spec/concurrent/channel/probe_spec.rb b/spec/concurrent/channel/probe_spec.rb index b6a344f14..bac8ab46d 100644 --- a/spec/concurrent/channel/probe_spec.rb +++ b/spec/concurrent/channel/probe_spec.rb @@ -20,20 +20,20 @@ def trigger_observable(observable) it_should_behave_like :observable end - describe '#set?' do + describe '#try_set' do context 'empty probe' do it 'assigns the value' do - probe.set?([32, channel]) + probe.try_set([32, channel]) expect(probe.value.first).to eq 32 end it 'assign the channel' do - probe.set?([32, channel]) + probe.try_set([32, channel]) expect(probe.value.last).to be channel end it 'returns true' do - expect(probe.set?(['hi', channel])).to eq true + expect(probe.try_set(['hi', channel])).to eq true end end @@ -41,12 +41,12 @@ def trigger_observable(observable) before(:each) { probe.set([27, nil]) } it 'does not assign the value' do - probe.set?([88, channel]) + probe.try_set([88, channel]) expect(probe.value.first).to eq 27 end it 'returns false' do - expect(probe.set?(['hello', channel])).to eq false + expect(probe.try_set(['hello', channel])).to eq false end end @@ -54,7 +54,7 @@ def trigger_observable(observable) before(:each) { probe.fail } it 'does not assign the value' do - probe.set?([88, channel]) + probe.try_set([88, channel]) expect(probe).to be_rejected end @@ -63,7 +63,7 @@ def trigger_observable(observable) end it 'returns false' do - expect(probe.set?(['hello', channel])).to eq false + expect(probe.try_set(['hello', channel])).to eq false end end end diff --git a/spec/concurrent/ivar_shared.rb b/spec/concurrent/ivar_shared.rb index 27b6105c8..b22520fab 100644 --- a/spec/concurrent/ivar_shared.rb +++ b/spec/concurrent/ivar_shared.rb @@ -94,22 +94,22 @@ end end - describe '#set?' do + describe '#try_set' do context 'when unset' do it 'assigns the value' do - subject.set?(32) + subject.try_set(32) expect(subject.value).to eq 32 end it 'assigns the block result' do - subject.set?{ 32 } + subject.try_set{ 32 } expect(subject.value).to eq 32 end it 'returns true' do - expect(subject.set?('hi')).to eq true + expect(subject.try_set('hi')).to eq true end end @@ -118,17 +118,17 @@ before(:each) { subject.set(27) } it 'does not assign the value' do - subject.set?(88) + subject.try_set(88) expect(subject.value).to eq 27 end it 'does not assign the block result' do - subject.set?{ 88 } + subject.try_set{ 88 } expect(subject.value).to eq 27 end it 'returns false' do - expect(subject.set?('hello')).to eq false + expect(subject.try_set('hello')).to eq false end end @@ -137,12 +137,12 @@ before(:each) { subject.fail } it 'does not assign the value' do - subject.set?(88) + subject.try_set(88) expect(subject).to be_rejected end it 'does not assign the block result' do - subject.set?{ 88 } + subject.try_set{ 88 } expect(subject).to be_rejected end @@ -151,7 +151,7 @@ end it 'returns false' do - expect(subject.set?('hello')).to eq false + expect(subject.try_set('hello')).to eq false end end end From 1314eda07c2c3813a7f123e8ced8509906a0cee6 Mon Sep 17 00:00:00 2001 From: Jerry D'Antonio Date: Fri, 24 Apr 2015 06:13:38 -0400 Subject: [PATCH 15/15] Better documentation. --- lib/concurrent/agent.rb | 13 +-------- lib/concurrent/delay.rb | 15 +++++++++-- lib/concurrent/future.rb | 1 + lib/concurrent/promise.rb | 55 +++++++++++++++++++++++++++++++++++---- 4 files changed, 65 insertions(+), 19 deletions(-) diff --git a/lib/concurrent/agent.rb b/lib/concurrent/agent.rb index 4b0628e1f..8b3c67e2e 100644 --- a/lib/concurrent/agent.rb +++ b/lib/concurrent/agent.rb @@ -24,18 +24,7 @@ class Agent # # @param [Object] initial the initial value # - # @!macro [attach] executor_and_deref_options - # - # @param [Hash] opts the options used to define the behavior at update and deref - # and to specify the executor on which to perform actions - # @option opts [Executor] :executor when set use the given `Executor` instance. - # Three special values are also supported: `:task` returns the global task pool, - # `:operation` returns the global operation pool, and `:immediate` returns a new - # `ImmediateExecutor` object. - # @option opts [Boolean] :dup_on_deref (false) call `#dup` before returning the data - # @option opts [Boolean] :freeze_on_deref (false) call `#freeze` before returning the data - # @option opts [Proc] :copy_on_deref (nil) call the given `Proc` passing - # the internal value and returning the value returned from the proc + # @!macro executor_and_deref_options def initialize(initial, opts = {}) @value = initial @rescuers = [] diff --git a/lib/concurrent/delay.rb b/lib/concurrent/delay.rb index 0a1dc5222..034c6ceed 100644 --- a/lib/concurrent/delay.rb +++ b/lib/concurrent/delay.rb @@ -52,9 +52,20 @@ class Delay < Synchronization::Object # Create a new `Delay` in the `:pending` state. # - # @yield the delayed operation to perform + # @!macro [attach] executor_and_deref_options + # + # @param [Hash] opts the options used to define the behavior at update and deref + # and to specify the executor on which to perform actions + # @option opts [Executor] :executor when set use the given `Executor` instance. + # Three special values are also supported: `:task` returns the global task pool, + # `:operation` returns the global operation pool, and `:immediate` returns a new + # `ImmediateExecutor` object. + # @option opts [Boolean] :dup_on_deref (false) call `#dup` before returning the data + # @option opts [Boolean] :freeze_on_deref (false) call `#freeze` before returning the data + # @option opts [Proc] :copy_on_deref (nil) call the given `Proc` passing + # the internal value and returning the value returned from the proc # - # @!macro executor_and_deref_options + # @yield the delayed operation to perform # # @raise [ArgumentError] if no block is given def initialize(opts = {}, &block) diff --git a/lib/concurrent/future.rb b/lib/concurrent/future.rb index 0499ffb93..be78e8791 100644 --- a/lib/concurrent/future.rb +++ b/lib/concurrent/future.rb @@ -76,6 +76,7 @@ def self.execute(opts = {}, &block) Future.new(opts, &block).execute end + # @!macro ivar_set_method def set(value = IVar::NO_VALUE, &block) check_for_block_or_value!(block_given?, value) mutex.synchronize do diff --git a/lib/concurrent/promise.rb b/lib/concurrent/promise.rb index 2c156a549..0831c39cc 100644 --- a/lib/concurrent/promise.rb +++ b/lib/concurrent/promise.rb @@ -197,6 +197,8 @@ class Promise < IVar # @option opts [object, Array] :args zero or more arguments to be passed # the task block on execution # + # @yield The block operation to be performed asynchronously. + # # @raise [ArgumentError] if no block is given # # @see http://wiki.commonjs.org/wiki/Promises/A @@ -217,17 +219,37 @@ def initialize(opts = {}, &block) @children = [] end - # @return [Promise] + # Create a new `Promise` and fulfill it immediately. + # + # @!macro executor_and_deref_options + # + # @!macro promise_init_options + # + # @raise [ArgumentError] if no block is given + # + # @return [Promise] the newly created `Promise` def self.fulfill(value, opts = {}) Promise.new(opts).tap { |p| p.send(:synchronized_set_state!, true, value, nil) } end - # @return [Promise] + # Create a new `Promise` and reject it immediately. + # + # @!macro executor_and_deref_options + # + # @!macro promise_init_options + # + # @raise [ArgumentError] if no block is given + # + # @return [Promise] the newly created `Promise` def self.reject(reason, opts = {}) Promise.new(opts).tap { |p| p.send(:synchronized_set_state!, false, nil, reason) } end - # @return [Promise] + # Execute an `:unscheduled` `Promise`. Immediately sets the state to `:pending` and + # passes the block to a new thread/thread pool for eventual execution. + # Does nothing if the `Promise` is in any state other than `:unscheduled`. + # + # @return [Promise] a reference to `self` def execute if root? if compare_and_set_state(:pending, :unscheduled) @@ -240,6 +262,9 @@ def execute self end + # @!macro ivar_set_method + # + # @raise [Concurrent::PromiseExecutionError] if not the root promise def set(value = IVar::NO_VALUE, &block) raise PromiseExecutionError.new('supported only on root promise') unless root? check_for_block_or_value!(block_given?, value) @@ -253,6 +278,9 @@ def set(value = IVar::NO_VALUE, &block) execute end + # @!macro ivar_fail_method + # + # @raise [Concurrent::PromiseExecutionError] if not the root promise def fail(reason = StandardError.new) set { raise reason } end @@ -275,6 +303,13 @@ def self.execute(opts = {}, &block) new(opts, &block).execute end + # Chain a new promise off the current promise. + # + # @param [Proc] rescuer An optional rescue block to be executed if the + # promise is rejected. + # + # @yield The block operation to be performed asynchronously. + # # @return [Promise] the new promise def then(rescuer = nil, &block) raise ArgumentError.new('rescuers and block are both missing') if rescuer.nil? && !block_given? @@ -296,13 +331,23 @@ def then(rescuer = nil, &block) child end - # @return [Promise] + # Chain onto this promise an action to be undertaken on success + # (fulfillment). + # + # @yield The block to execute + # + # @return [Promise] self def on_success(&block) raise ArgumentError.new('no block given') unless block_given? self.then(&block) end - # @return [Promise] + # Chain onto this promise an action to be undertaken on failure + # (rejection). + # + # @yield The block to execute + # + # @return [Promise] self def rescue(&block) self.then(block) end