From 6dbfa9271fa02e878a6b18935cbf4eccde614096 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Tue, 28 Apr 2015 13:57:17 +0200 Subject: [PATCH 01/16] Add ns_initialize which is always called inside synchronize block --- .../ext/SynchronizationLibrary.java | 14 +++++---- lib/concurrent/actor/core.rb | 2 +- lib/concurrent/at_exit.rb | 13 ++++----- lib/concurrent/atomic/count_down_latch.rb | 9 ++++-- lib/concurrent/atomic/cyclic_barrier.rb | 12 ++++---- lib/concurrent/atomic/event.rb | 9 +++--- lib/concurrent/edge/future.rb | 16 ++-------- .../executor/serialized_execution.rb | 13 ++++----- lib/concurrent/synchronization.rb | 29 +++++++++---------- .../synchronization/abstract_object.rb | 12 +++++--- .../synchronization/java_pure_object.rb | 3 +- .../synchronization/monitor_object.rb | 3 +- .../synchronization/mutex_object.rb | 3 +- lib/concurrent/synchronization/rbx_object.rb | 7 +++-- 14 files changed, 72 insertions(+), 73 deletions(-) diff --git a/ext/com/concurrent_ruby/ext/SynchronizationLibrary.java b/ext/com/concurrent_ruby/ext/SynchronizationLibrary.java index ae979b42a..35b7cbdc9 100644 --- a/ext/com/concurrent_ruby/ext/SynchronizationLibrary.java +++ b/ext/com/concurrent_ruby/ext/SynchronizationLibrary.java @@ -47,9 +47,11 @@ public JavaObject(Ruby runtime, RubyClass metaClass) { super(runtime, metaClass); } - @JRubyMethod - public IRubyObject initialize(ThreadContext context) { - return context.nil; + @JRubyMethod(rest = true) + public IRubyObject initialize(ThreadContext context, IRubyObject[] args, Block block) { + synchronized (this) { + return callMethod(context, "ns_initialize", args, block); + } } @JRubyMethod(name = "synchronize") @@ -59,7 +61,7 @@ public IRubyObject rubySynchronize(ThreadContext context, Block block) { } } - @JRubyMethod(name = "ns_wait", optional = 1) + @JRubyMethod(name = "ns_wait", optional = 1, visibility = 'private') public IRubyObject nsWait(ThreadContext context, IRubyObject[] args) { Ruby runtime = context.runtime; if (args.length > 1) { @@ -91,13 +93,13 @@ public IRubyObject nsWait(ThreadContext context, IRubyObject[] args) { return this; } - @JRubyMethod(name = "ns_signal") + @JRubyMethod(name = "ns_signal", visibility = 'private') public IRubyObject nsSignal(ThreadContext context) { notify(); return this; } - @JRubyMethod(name = "ns_broadcast") + @JRubyMethod(name = "ns_broadcast", visibility = 'private') public IRubyObject nsBroadcast(ThreadContext context) { notifyAll(); return this; diff --git a/lib/concurrent/actor/core.rb b/lib/concurrent/actor/core.rb index 8fe6f6981..d593bc7f0 100644 --- a/lib/concurrent/actor/core.rb +++ b/lib/concurrent/actor/core.rb @@ -47,7 +47,7 @@ class Core < Synchronization::Object # any logging system # @param [Proc] block for class instantiation def initialize(opts = {}, &block) - super(&nil) + super(&nil) # TODO use ns_initialize synchronize do @mailbox = Array.new @serialized_execution = SerializedExecution.new diff --git a/lib/concurrent/at_exit.rb b/lib/concurrent/at_exit.rb index c5dfa2be1..eb530e5fd 100644 --- a/lib/concurrent/at_exit.rb +++ b/lib/concurrent/at_exit.rb @@ -8,14 +8,6 @@ module Concurrent class AtExitImplementation < Synchronization::Object include Logging - def initialize(enabled = true) - super() - synchronize do - @handlers = {} - @enabled = enabled - end - end - # Add a handler to be run at `Kernel#at_exit` # @param [Object] handler_id optionally provide an id, if allready present, handler is replaced # @yield the handler @@ -80,6 +72,11 @@ def run private + def ns_initialize(enabled = true) + @handlers = {} + @enabled = enabled + end + def runner run if synchronize { @enabled } end diff --git a/lib/concurrent/atomic/count_down_latch.rb b/lib/concurrent/atomic/count_down_latch.rb index 38bc349ac..c0131b149 100644 --- a/lib/concurrent/atomic/count_down_latch.rb +++ b/lib/concurrent/atomic/count_down_latch.rb @@ -21,11 +21,10 @@ class PureCountDownLatch < Synchronization::Object # # @raise [ArgumentError] if `count` is not an integer or is less than zero def initialize(count = 1) - super() unless count.is_a?(Fixnum) && count >= 0 raise ArgumentError.new('count must be in integer greater than or equal zero') end - synchronize { @count = count } + super(count) end # @!macro [attach] count_down_latch_method_wait @@ -58,6 +57,12 @@ def count_down def count synchronize { @count } end + + private + + def ns_initialize(count) + @count = count + end end if Concurrent.on_jruby? diff --git a/lib/concurrent/atomic/cyclic_barrier.rb b/lib/concurrent/atomic/cyclic_barrier.rb index 69dc8e5d4..c6130fe9b 100644 --- a/lib/concurrent/atomic/cyclic_barrier.rb +++ b/lib/concurrent/atomic/cyclic_barrier.rb @@ -15,15 +15,10 @@ class CyclicBarrier < Synchronization::Object # # @raise [ArgumentError] if `parties` is not an integer or is less than zero def initialize(parties, &block) - super(&nil) if !parties.is_a?(Fixnum) || parties < 1 raise ArgumentError.new('count must be in integer greater than or equal zero') end - synchronize do - @parties = parties - @action = block - ns_next_generation - end + super(parties, &block) end # @return [Fixnum] the number of threads needed to pass the barrier @@ -101,6 +96,11 @@ def ns_next_generation @number_waiting = 0 end + def ns_initialize(parties, &block) + @parties = parties + @action = block + ns_next_generation + end end end diff --git a/lib/concurrent/atomic/event.rb b/lib/concurrent/atomic/event.rb index ac320828a..2877aca80 100644 --- a/lib/concurrent/atomic/event.rb +++ b/lib/concurrent/atomic/event.rb @@ -19,10 +19,6 @@ class Event < Synchronization::Object # `Event` will block. def initialize super - synchronize do - @set = false - @iteration = 0 - end end # Is the object in the set state? @@ -83,5 +79,10 @@ def ns_set end true end + + def ns_initialize + @set = false + @iteration = 0 + end end end diff --git a/lib/concurrent/edge/future.rb b/lib/concurrent/edge/future.rb index a0a4bb1c9..7a1f571cf 100644 --- a/lib/concurrent/edge/future.rb +++ b/lib/concurrent/edge/future.rb @@ -33,8 +33,8 @@ def future(default_executor = :io, &task) alias_method :async, :future - # Constructs new Future which will be completed after block is evaluated on executor. Evaluation is delays until - # requested by {Future#wait} method, {Future#value} and {Future#value!} methods are calling {Future#wait} internally. + # Constructs new Future which will be completed after block is evaluated on executor. Evaluation is delayed until + # requested by `#wait`, `#value`, `#value!`, etc. # @return [Delay] def delay(default_executor = :io, &task) Delay.new(default_executor).event.chain(&task) @@ -84,12 +84,6 @@ def post_on(executor, *args, &job) class Event < Synchronization::Object extend FutureShortcuts - # @api private - def initialize(promise, default_executor = :io) - super() - synchronize { ns_initialize(promise, default_executor) } - end - # Is obligation completion still pending? # @return [Boolean] def pending? @@ -624,12 +618,6 @@ def evaluate_to!(*args, &block) # @abstract class AbstractPromise < Synchronization::Object - # @api private - def initialize(*args, &block) - super(&nil) - synchronize { ns_initialize(*args, &block) } - end - def default_executor future.default_executor end diff --git a/lib/concurrent/executor/serialized_execution.rb b/lib/concurrent/executor/serialized_execution.rb index b8c4e0d45..1a84379c5 100644 --- a/lib/concurrent/executor/serialized_execution.rb +++ b/lib/concurrent/executor/serialized_execution.rb @@ -15,14 +15,6 @@ def call end end - def initialize - super(&nil) - synchronize do - @being_executed = false - @stash = [] - end - end - # Submit a task to the executor for asynchronous processing. # # @param [Executor] executor to be used for this job @@ -71,6 +63,11 @@ def posts(posts) private + def ns_initialize + @being_executed = false + @stash = [] + end + def call_job(job) did_it_run = begin job.executor.post { work(job) } diff --git a/lib/concurrent/synchronization.rb b/lib/concurrent/synchronization.rb index 8d379da3b..96a4ce7ba 100644 --- a/lib/concurrent/synchronization.rb +++ b/lib/concurrent/synchronization.rb @@ -7,22 +7,21 @@ module Concurrent module Synchronization - class Object < case - when Concurrent.on_jruby? - JavaObject + Implementation = case + when Concurrent.on_jruby? + JavaObject + when Concurrent.on_cruby? && (RUBY_VERSION.split('.').map(&:to_i) <=> [1, 9, 3]) <= 0 + MonitorObject + when Concurrent.on_cruby? + MutexObject + when Concurrent.on_rbx? + RbxObject + else + MutexObject + end + private_constant :Implementation - when Concurrent.on_cruby? && (RUBY_VERSION.split('.').map(&:to_i) <=> [1, 9, 3]) <= 0 - MonitorObject - - when Concurrent.on_cruby? - MutexObject - - when Concurrent.on_rbx? - RbxObject - - else - MutexObject - end + class Object < Implementation end end end diff --git a/lib/concurrent/synchronization/abstract_object.rb b/lib/concurrent/synchronization/abstract_object.rb index 7a3cfe52c..b25da15af 100644 --- a/lib/concurrent/synchronization/abstract_object.rb +++ b/lib/concurrent/synchronization/abstract_object.rb @@ -8,8 +8,8 @@ module Synchronization # the classes using it. Use {Synchronization::Object} not this abstract class. # # @note this object does not support usage together with - # [Thread#wakeup](http://ruby-doc.org/core-2.2.0/Thread.html#method-i-wakeup) - # and [Thread#raise](http://ruby-doc.org/core-2.2.0/Thread.html#method-i-raise). + # [`Thread#wakeup`](http://ruby-doc.org/core-2.2.0/Thread.html#method-i-wakeup) + # and [`Thread#raise`](http://ruby-doc.org/core-2.2.0/Thread.html#method-i-raise). # `Thread#sleep` and `Thread#wakeup` will work as expected but mixing `Synchronization::Object#wait` and # `Thread#wakeup` will not work on all platforms. # @@ -29,8 +29,8 @@ module Synchronization class AbstractObject # @abstract for helper ivar initialization if needed, - # otherwise it can be left empty. - def initialize + # otherwise it can be left empty. It has to call ns_initialize. + def initialize(*args, &block) raise NotImplementedError end @@ -52,6 +52,10 @@ def wait(timeout = nil) self end + # initialization of the object called inside synchronize block + def ns_initialize(*args, &block) + end + # Wait until condition is met or timeout passes, # protects against spurious wake-ups. # @param [Numeric, nil] timeout in seconds, `nil` means no timeout diff --git a/lib/concurrent/synchronization/java_pure_object.rb b/lib/concurrent/synchronization/java_pure_object.rb index 12eee3b74..88f4088a0 100644 --- a/lib/concurrent/synchronization/java_pure_object.rb +++ b/lib/concurrent/synchronization/java_pure_object.rb @@ -5,7 +5,8 @@ module Synchronization require 'jruby' class JavaPureObject < AbstractObject - def initialize + def initialize(*args, &block) + synchronize { ns_initialize(*args, &block) } end def synchronize diff --git a/lib/concurrent/synchronization/monitor_object.rb b/lib/concurrent/synchronization/monitor_object.rb index 3ff2fac44..4ff75dee6 100644 --- a/lib/concurrent/synchronization/monitor_object.rb +++ b/lib/concurrent/synchronization/monitor_object.rb @@ -1,9 +1,10 @@ module Concurrent module Synchronization class MonitorObject < MutexObject - def initialize + def initialize(*args, &block) @__lock__do_not_use_directly = ::Monitor.new @__condition__do_not_use_directly = @__lock__do_not_use_directly.new_cond + synchronize { ns_initialize(*args, &block) } end def synchronize diff --git a/lib/concurrent/synchronization/mutex_object.rb b/lib/concurrent/synchronization/mutex_object.rb index 523b9fde1..7ad63cf33 100644 --- a/lib/concurrent/synchronization/mutex_object.rb +++ b/lib/concurrent/synchronization/mutex_object.rb @@ -1,9 +1,10 @@ module Concurrent module Synchronization class MutexObject < AbstractObject - def initialize + def initialize(*args, &block) @__lock__do_not_use_directly = ::Mutex.new @__condition__do_not_use_directly = ::ConditionVariable.new + synchronize { ns_initialize(*args, &block) } end def synchronize diff --git a/lib/concurrent/synchronization/rbx_object.rb b/lib/concurrent/synchronization/rbx_object.rb index 8c5c4e267..6a749594e 100644 --- a/lib/concurrent/synchronization/rbx_object.rb +++ b/lib/concurrent/synchronization/rbx_object.rb @@ -2,8 +2,11 @@ module Concurrent module Synchronization if Concurrent.on_rbx? class RbxObject < AbstractObject - def initialize - @waiters = [] + def initialize(*args, &block) + synchronize do + @waiters = [] + ns_initialize(*args, &block) + end end def synchronize(&block) From 2a182b4331f18b9e2d394a8dc870aade9196afc2 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Tue, 28 Apr 2015 14:00:25 +0200 Subject: [PATCH 02/16] Remove wait, signal, broadcast in favor of ns_ counterparts - no to pollute the object's space - it can be easily added as noted in documentation --- .../synchronization/abstract_object.rb | 75 +++++++++---------- 1 file changed, 34 insertions(+), 41 deletions(-) diff --git a/lib/concurrent/synchronization/abstract_object.rb b/lib/concurrent/synchronization/abstract_object.rb index b25da15af..bb836f5c6 100644 --- a/lib/concurrent/synchronization/abstract_object.rb +++ b/lib/concurrent/synchronization/abstract_object.rb @@ -42,16 +42,6 @@ def synchronize private - # wait until another thread calls #signal or #broadcast, - # spurious wake-ups can happen. - # @param [Numeric, nil] timeout in seconds, `nil` means no timeout - # @return [self] - # @note intended to be made public if required in child classes - def wait(timeout = nil) - synchronize { ns_wait(timeout) } - self - end - # initialization of the object called inside synchronize block def ns_initialize(*args, &block) end @@ -61,32 +51,14 @@ def ns_initialize(*args, &block) # @param [Numeric, nil] timeout in seconds, `nil` means no timeout # @yield condition to be met # @yieldreturn [true, false] - # @return [true, false] - # @note intended to be made public if required in child classes - def wait_until(timeout = nil, &condition) - synchronize { ns_wait_until(timeout, &condition) } - end - - # signal one waiting thread - # @return [self] - # @note intended to be made public if required in child classes - def signal - synchronize { ns_signal } - self - end - - # broadcast to all waiting threads - # @return [self] - # @note intended to be made public if required in child classes - def broadcast - synchronize { ns_broadcast } - self - end - + # @return [true, false] if condition met # @note only to be used inside synchronized block - # @yield condition - # @return [true, false] - # see #wait_until + # @note to provide direct access to this method in a descendant add method + # ``` + # def wait_until(timeout = nil, &condition) + # synchronize { ns_wait_until(timeout, &condition) } + # end + # ``` def ns_wait_until(timeout, &condition) if timeout wait_until = Concurrent.monotonic_time + timeout @@ -104,23 +76,44 @@ def ns_wait_until(timeout, &condition) end end - # @note only to be used inside synchronized block + # Wait until another thread calls #signal or #broadcast, + # spurious wake-ups can happen. + # + # @param [Numeric, nil] timeout in seconds, `nil` means no timeout # @return [self] - # @see #wait + # @note only to be used inside synchronized block + # @note to provide direct access to this method in a descendant add method + # ``` + # def wait(timeout = nil) + # synchronize { ns_wait(timeout) } + # end + # ``` def ns_wait(timeout = nil) raise NotImplementedError end - # @note only to be used inside synchronized block + # Signal one waiting thread # @return [self] - # @see #signal + # @note only to be used inside synchronized block + # @note to provide direct access to this method in a descendant add method + # ``` + # def signal + # synchronize { ns_signal } + # end + # ``` def ns_signal raise NotImplementedError end - # @note only to be used inside synchronized block + # Broadcast to all waiting threads # @return [self] - # @see #broadcast + # @note only to be used inside synchronized block + # @note to provide direct access to this method in a descendant add method + # ``` + # def broadcast + # synchronize { ns_broadcast } + # end + # ``` def ns_broadcast raise NotImplementedError end From 505d6dd57f784b047ae61e6a8293981ef8d39c6a Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Tue, 28 Apr 2015 14:15:17 +0200 Subject: [PATCH 03/16] Make synchronize private by default can be made public if required in descendant --- ext/com/concurrent_ruby/ext/SynchronizationLibrary.java | 9 +++++---- lib/concurrent/delay.rb | 2 ++ lib/concurrent/ivar.rb | 2 ++ lib/concurrent/synchronization/abstract_object.rb | 7 ++++--- lib/concurrent/synchronization/java_pure_object.rb | 4 ++-- lib/concurrent/synchronization/monitor_object.rb | 4 ++-- lib/concurrent/synchronization/mutex_object.rb | 4 ++-- lib/concurrent/synchronization/rbx_object.rb | 4 ++-- 8 files changed, 21 insertions(+), 15 deletions(-) diff --git a/ext/com/concurrent_ruby/ext/SynchronizationLibrary.java b/ext/com/concurrent_ruby/ext/SynchronizationLibrary.java index 35b7cbdc9..ebf8cad8a 100644 --- a/ext/com/concurrent_ruby/ext/SynchronizationLibrary.java +++ b/ext/com/concurrent_ruby/ext/SynchronizationLibrary.java @@ -13,6 +13,7 @@ import org.jruby.runtime.builtin.IRubyObject; import org.jruby.runtime.load.Library; import org.jruby.runtime.Block; +import org.jruby.runtime.Visibility; import org.jruby.RubyBoolean; import org.jruby.RubyNil; import org.jruby.runtime.ThreadContext; @@ -54,14 +55,14 @@ public IRubyObject initialize(ThreadContext context, IRubyObject[] args, Block b } } - @JRubyMethod(name = "synchronize") + @JRubyMethod(name = "synchronize", visibility = Visibility.PRIVATE) public IRubyObject rubySynchronize(ThreadContext context, Block block) { synchronized (this) { return block.yield(context, null); } } - @JRubyMethod(name = "ns_wait", optional = 1, visibility = 'private') + @JRubyMethod(name = "ns_wait", optional = 1, visibility = Visibility.PRIVATE) public IRubyObject nsWait(ThreadContext context, IRubyObject[] args) { Ruby runtime = context.runtime; if (args.length > 1) { @@ -93,13 +94,13 @@ public IRubyObject nsWait(ThreadContext context, IRubyObject[] args) { return this; } - @JRubyMethod(name = "ns_signal", visibility = 'private') + @JRubyMethod(name = "ns_signal", visibility = Visibility.PRIVATE) public IRubyObject nsSignal(ThreadContext context) { notify(); return this; } - @JRubyMethod(name = "ns_broadcast", visibility = 'private') + @JRubyMethod(name = "ns_broadcast", visibility = Visibility.PRIVATE) public IRubyObject nsBroadcast(ThreadContext context) { notifyAll(); return this; diff --git a/lib/concurrent/delay.rb b/lib/concurrent/delay.rb index 043456b5b..100ea00ac 100644 --- a/lib/concurrent/delay.rb +++ b/lib/concurrent/delay.rb @@ -79,6 +79,8 @@ def initialize(opts = {}, &block) @computing = false end + protected :synchronize + # Return the value this object represents after applying the options # specified by the `#set_deref_options` method. If the delayed operation # raised an exception this method will return nil. The execption object diff --git a/lib/concurrent/ivar.rb b/lib/concurrent/ivar.rb index 6d4531e36..87a5c4247 100644 --- a/lib/concurrent/ivar.rb +++ b/lib/concurrent/ivar.rb @@ -66,6 +66,8 @@ def initialize(value = NO_VALUE, opts = {}) set(value) unless value == NO_VALUE end + protected :synchronize + # Add an observer on this object that will receive notification on update. # # Upon completion the `IVar` will notify all observers in a thread-safe way. diff --git a/lib/concurrent/synchronization/abstract_object.rb b/lib/concurrent/synchronization/abstract_object.rb index bb836f5c6..3f7b48e9a 100644 --- a/lib/concurrent/synchronization/abstract_object.rb +++ b/lib/concurrent/synchronization/abstract_object.rb @@ -34,14 +34,15 @@ def initialize(*args, &block) raise NotImplementedError end + private + # @yield runs the block synchronized against this object, - # equvivalent of java's `synchronize(this) {}` + # equivalent of java's `synchronize(this) {}` + # @note can by made public in descendants if required by `public :synchronize` def synchronize raise NotImplementedError end - private - # initialization of the object called inside synchronize block def ns_initialize(*args, &block) end diff --git a/lib/concurrent/synchronization/java_pure_object.rb b/lib/concurrent/synchronization/java_pure_object.rb index 88f4088a0..306e22314 100644 --- a/lib/concurrent/synchronization/java_pure_object.rb +++ b/lib/concurrent/synchronization/java_pure_object.rb @@ -9,12 +9,12 @@ def initialize(*args, &block) synchronize { ns_initialize(*args, &block) } end + private + def synchronize JRuby.reference0(self).synchronized { yield } end - private - def ns_wait(timeout = nil) success = JRuby.reference0(Thread.current).wait_timeout(JRuby.reference0(self), timeout) self diff --git a/lib/concurrent/synchronization/monitor_object.rb b/lib/concurrent/synchronization/monitor_object.rb index 4ff75dee6..5b146bdba 100644 --- a/lib/concurrent/synchronization/monitor_object.rb +++ b/lib/concurrent/synchronization/monitor_object.rb @@ -7,12 +7,12 @@ def initialize(*args, &block) synchronize { ns_initialize(*args, &block) } end + private + def synchronize @__lock__do_not_use_directly.synchronize { yield } end - private - def ns_wait(timeout = nil) @__condition__do_not_use_directly.wait timeout self diff --git a/lib/concurrent/synchronization/mutex_object.rb b/lib/concurrent/synchronization/mutex_object.rb index 7ad63cf33..85aecb0ba 100644 --- a/lib/concurrent/synchronization/mutex_object.rb +++ b/lib/concurrent/synchronization/mutex_object.rb @@ -7,6 +7,8 @@ def initialize(*args, &block) synchronize { ns_initialize(*args, &block) } end + private + def synchronize if @__lock__do_not_use_directly.owned? yield @@ -15,8 +17,6 @@ def synchronize end end - private - def ns_signal @__condition__do_not_use_directly.signal self diff --git a/lib/concurrent/synchronization/rbx_object.rb b/lib/concurrent/synchronization/rbx_object.rb index 6a749594e..5f6d2f7c0 100644 --- a/lib/concurrent/synchronization/rbx_object.rb +++ b/lib/concurrent/synchronization/rbx_object.rb @@ -9,12 +9,12 @@ def initialize(*args, &block) end end + private + def synchronize(&block) Rubinius.synchronize(self, &block) end - private - def ns_wait(timeout = nil) wchan = Rubinius::Channel.new From 08c24238b4ab84f3c31bed76f935e5e5005a1b97 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Tue, 28 Apr 2015 15:24:06 +0200 Subject: [PATCH 04/16] Add ensure_ivar_visibility! and volatile attributes --- lib/concurrent/synchronization.rb | 2 + .../synchronization/abstract_object.rb | 38 +++++- lib/concurrent/synchronization/java_object.rb | 14 +++ .../synchronization/java_pure_object.rb | 4 + .../synchronization/mutex_object.rb | 5 + lib/concurrent/synchronization/rbx_object.rb | 23 ++++ spec/concurrent/synchronized_object_spec.rb | 109 ++++++++++++++++++ 7 files changed, 192 insertions(+), 3 deletions(-) create mode 100644 lib/concurrent/synchronization/java_object.rb create mode 100644 spec/concurrent/synchronized_object_spec.rb diff --git a/lib/concurrent/synchronization.rb b/lib/concurrent/synchronization.rb index 96a4ce7ba..d4863b559 100644 --- a/lib/concurrent/synchronization.rb +++ b/lib/concurrent/synchronization.rb @@ -1,6 +1,7 @@ require 'concurrent/utility/engine' require 'concurrent/synchronization/abstract_object' require 'concurrent/native_extensions' # JavaObject +require 'concurrent/synchronization/java_object' # JavaObject require 'concurrent/synchronization/mutex_object' require 'concurrent/synchronization/monitor_object' require 'concurrent/synchronization/rbx_object' @@ -21,6 +22,7 @@ module Synchronization end private_constant :Implementation + # @see AbstractObject class Object < Implementation end end diff --git a/lib/concurrent/synchronization/abstract_object.rb b/lib/concurrent/synchronization/abstract_object.rb index 3f7b48e9a..9cc6587b0 100644 --- a/lib/concurrent/synchronization/abstract_object.rb +++ b/lib/concurrent/synchronization/abstract_object.rb @@ -1,9 +1,7 @@ module Concurrent - # TODO rename to Synchronization - # TODO add newCondition module Synchronization # Safe synchronization under any Ruby implementation. - # It provides methods like {#synchronize}, {#wait}, {#signal} and {#broadcast}. + # It provides methods like {#synchronize}, {#ns_wait}, {#ns_signal} and {#ns_broadcast}. # Provides a single layer which can improve its implementation over time without changes needed to # the classes using it. Use {Synchronization::Object} not this abstract class. # @@ -118,6 +116,40 @@ def ns_signal def ns_broadcast raise NotImplementedError end + + # Allows to construct immutable objects where all fields are visible after initialization, not requiring + # further synchronization on access. + # @example + # class AClass + # attr_reader :val + # def initialize(val) + # @val = val # final value, after assignment it's not changed (just convention, not enforced) + # ensure_ivar_visibility! + # # now it can be shared as Java's final field + # end + # end + def ensure_ivar_visibility! + raise NotImplementedError + end + + # creates methods for reading and writing to a instance variable with volatile (Java semantic) instance variable + # return [Array] names of defined method names + def self.attr_volatile(*names) + names.each do |name| + ivar = :"@volatile_#{name}" + class_eval <<-RUBY, __FILE__, __LINE__ + 1 + def #{name} + #{ivar} + end + + def #{name}=(value) + #{ivar} = value + end + RUBY + end + names.map { |n| [n, :"#{n}="] }.flatten + end + end end end diff --git a/lib/concurrent/synchronization/java_object.rb b/lib/concurrent/synchronization/java_object.rb new file mode 100644 index 000000000..134665a81 --- /dev/null +++ b/lib/concurrent/synchronization/java_object.rb @@ -0,0 +1,14 @@ +module Concurrent + module Synchronization + + if Concurrent.on_jruby? + require 'jruby' + + class JavaObject < AbstractObject + def ensure_ivar_visibility! + # relying on undocumented behavior of JRuby, ivar access is volatile + end + end + end + end +end diff --git a/lib/concurrent/synchronization/java_pure_object.rb b/lib/concurrent/synchronization/java_pure_object.rb index 306e22314..9e3b47f26 100644 --- a/lib/concurrent/synchronization/java_pure_object.rb +++ b/lib/concurrent/synchronization/java_pure_object.rb @@ -33,6 +33,10 @@ def ns_signal JRuby.reference0(self).notify self end + + def ensure_ivar_visibility! + # relying on undocumented behavior of JRuby, ivar access is volatile + end end end end diff --git a/lib/concurrent/synchronization/mutex_object.rb b/lib/concurrent/synchronization/mutex_object.rb index 85aecb0ba..f73e5fd29 100644 --- a/lib/concurrent/synchronization/mutex_object.rb +++ b/lib/concurrent/synchronization/mutex_object.rb @@ -31,6 +31,11 @@ def ns_wait(timeout = nil) @__condition__do_not_use_directly.wait @__lock__do_not_use_directly, timeout self end + + def ensure_ivar_visibility! + # relying on undocumented behavior of CRuby, GVL acquire has lock which ensures visibility of ivars + # https://github.com/ruby/ruby/blob/ruby_2_2/thread_pthread.c#L204-L211 + end end end end diff --git a/lib/concurrent/synchronization/rbx_object.rb b/lib/concurrent/synchronization/rbx_object.rb index 5f6d2f7c0..3bed5f6ed 100644 --- a/lib/concurrent/synchronization/rbx_object.rb +++ b/lib/concurrent/synchronization/rbx_object.rb @@ -44,6 +44,29 @@ def ns_broadcast @waiters.shift << true until @waiters.empty? self end + + def ensure_ivar_visibility! + # Rubinius instance variables are not volatile so we need to insert barrier + Rubinius.memory_barrier + end + + def self.attr_volatile *names + names.each do |name| + ivar = :"@volatile_#{name}" + class_eval <<-RUBY, __FILE__, __LINE__ + 1 + def #{name} + Rubinius.memory_barrier + #{ivar} + end + + def #{name}=(value) + #{ivar} = value + Rubinius.memory_barrier + end + RUBY + end + names.map { |n| [n, :"#{n}="] }.flatten + end end end end diff --git a/spec/concurrent/synchronized_object_spec.rb b/spec/concurrent/synchronized_object_spec.rb new file mode 100644 index 000000000..da6437e9d --- /dev/null +++ b/spec/concurrent/synchronized_object_spec.rb @@ -0,0 +1,109 @@ +module Concurrent + + describe Synchronization::Object do + + class AClass < Synchronization::Object + attr_volatile :volatile + attr_accessor :not_volatile + + def initialize(value = nil) + super() + @Final = value + ensure_ivar_visibility! + end + + def final + @Final + end + + def count + synchronize { @count += 1 } + end + + def wait(timeout = nil) + synchronize { ns_wait(timeout) } + end + + private + + def ns_initialize + @count = 0 + end + end + + subject { AClass.new } + + describe '#wait' do + + it 'waiting thread is sleeping' do + t = Thread.new do + Thread.abort_on_exception = true + subject.wait + end + sleep 0.1 + expect(t.status).to eq 'sleep' + end + + it 'sleeping thread can be killed' do + t = Thread.new do + Thread.abort_on_exception = true + subject.wait rescue nil + end + sleep 0.1 + t.kill + sleep 0.1 + expect(t.status).to eq false + expect(t.alive?).to eq false + end + end + + describe '#synchronize' do + it 'allows only one thread to execute count' do + threads = 10.times.map { Thread.new(subject) { 100.times { subject.count } } } + threads.each(&:join) + expect(subject.count).to eq 1001 + end + end + + describe 'signaling' do + pending 'for now pending, tested pretty well by Event' + end + + specify 'final field always visible' do + store = AClass.new 'asd' + t1 = Thread.new { 1000000000.times { |i| store = AClass.new i.to_s } } + t2 = Thread.new { 10.times { expect(store.final).not_to be_nil; Thread.pass } } + t2.join + t1.kill + end + + describe 'attr volatile' do + specify 'older writes are always visible' do + store = AClass.new + store.not_volatile = 0 + store.volatile = 0 + + t1 = Thread.new do + Thread.abort_on_exception = true + 1000000000.times do |i| + store.not_volatile = i + store.volatile = i + end + end + + t2 = Thread.new do + 10.times do + volatile = store.volatile + not_volatile = store.not_volatile + expect(not_volatile).to be >= volatile + Thread.pass + end + end + + t2.join + t1.kill + end + end + + end +end From 41b7adf0d2f81ccc2bc9c7dc26d14978b8364e8d Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Tue, 28 Apr 2015 22:51:44 +0200 Subject: [PATCH 05/16] Add ImmutableStruct implementation --- lib/concurrent/synchronization.rb | 2 ++ .../synchronization/immutable_struct.rb | 23 +++++++++++++++++++ spec/concurrent/synchronized_object_spec.rb | 11 +++++++++ 3 files changed, 36 insertions(+) create mode 100644 lib/concurrent/synchronization/immutable_struct.rb diff --git a/lib/concurrent/synchronization.rb b/lib/concurrent/synchronization.rb index d4863b559..d1fbb36bc 100644 --- a/lib/concurrent/synchronization.rb +++ b/lib/concurrent/synchronization.rb @@ -27,3 +27,5 @@ class Object < Implementation end end end + +require 'concurrent/synchronization/immutable_struct' diff --git a/lib/concurrent/synchronization/immutable_struct.rb b/lib/concurrent/synchronization/immutable_struct.rb new file mode 100644 index 000000000..f09f09009 --- /dev/null +++ b/lib/concurrent/synchronization/immutable_struct.rb @@ -0,0 +1,23 @@ +module Concurrent + module Synchronization + class ImmutableStruct < Synchronization::Object + def self.with_fields(*names, &block) + Class.new(self) do + attr_reader(*names) + instance_eval &block if block + + class_eval <<-RUBY, __FILE__, __LINE__ + 1 + def initialize(#{names.join(', ')}) + #{names.map { |n| '@' + n.to_s }.join(', ')} = #{names.join(', ')} + ensure_ivar_visibility! + end + RUBY + end + end + + def self.[](*args) + new *args + end + end + end +end diff --git a/spec/concurrent/synchronized_object_spec.rb b/spec/concurrent/synchronized_object_spec.rb index da6437e9d..07a4244c4 100644 --- a/spec/concurrent/synchronized_object_spec.rb +++ b/spec/concurrent/synchronized_object_spec.rb @@ -105,5 +105,16 @@ def ns_initialize end end + describe Synchronization::ImmutableStruct do + let(:klass) { described_class.with_fields(:a, :b) } + subject { klass[1, 'a'] } + + specify do + expect(klass.superclass).to eq described_class + expect(subject.a).to eq 1 + expect(subject.b).to eq 'a' + end + end + end end From 5c4425f63db206aeaf880d883d6df564c886392a Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Tue, 28 Apr 2015 22:54:09 +0200 Subject: [PATCH 06/16] Apply new Synchronization::Object capabilities to Edge::Future --- examples/benchmark_new_futures.rb | 51 ++++++++ lib/concurrent/edge/future.rb | 199 +++++++++++++----------------- 2 files changed, 138 insertions(+), 112 deletions(-) create mode 100644 examples/benchmark_new_futures.rb diff --git a/examples/benchmark_new_futures.rb b/examples/benchmark_new_futures.rb new file mode 100644 index 000000000..3931ee3dd --- /dev/null +++ b/examples/benchmark_new_futures.rb @@ -0,0 +1,51 @@ +require 'benchmark/ips' +require 'concurrent' +require 'concurrent-edge' + +scale = 1 +time = 10 * scale +warmup = 2 * scale +warmup *= 10 if Concurrent.on_jruby? + + +Benchmark.ips(time, warmup) do |x| + of = Concurrent::Promise.execute { 1 } + nf = Concurrent.future { 1 } + x.report('value-old') { of.value! } + x.report('value-new') { nf.value! } + x.compare! +end + + +Benchmark.ips(time, warmup) do |x| + ohead = Concurrent::Promise.execute { 1 } + x.report('graph-old') do + head = ohead + branch1 = head.then(&:succ) + branch2 = head.then(&:succ).then(&:succ) + Concurrent::Promise.zip(branch1, branch2).then { |(a, b)| a + b }.value! + end + nhead = Concurrent.future { 1 } + x.report('graph-new') do + head = nhead + branch1 = head.then(&:succ) + branch2 = head.then(&:succ).then(&:succ) + (branch1 + branch2).then { |(a, b)| a + b }.value! + end + x.compare! +end + +Benchmark.ips(time, warmup) do |x| + x.report('immediate-old') { Concurrent::Promise.execute { nil }.value! } + x.report('immediate-new') { Concurrent.future { nil }.value! } + x.compare! +end + +Benchmark.ips(time, warmup) do |x| + of = Concurrent::Promise.execute { 1 } + nf = Concurrent.future { 1 } + x.report('then-old') { of.then(&:succ).then(&:succ).value! } + x.report('then-new') { nf.then(&:succ).then(&:succ).value! } + x.compare! +end + diff --git a/lib/concurrent/edge/future.rb b/lib/concurrent/edge/future.rb index 7a1f571cf..bb9cb465b 100644 --- a/lib/concurrent/edge/future.rb +++ b/lib/concurrent/edge/future.rb @@ -84,6 +84,14 @@ def post_on(executor, *args, &job) class Event < Synchronization::Object extend FutureShortcuts + def initialize(promise, default_executor = :io) + @Promise = promise + @DefaultExecutor = default_executor + @Touched = AtomicBoolean.new(false) + super() + ensure_ivar_visibility! + end + # Is obligation completion still pending? # @return [Boolean] def pending? @@ -105,7 +113,9 @@ def wait(timeout = nil) end def touch - pr_touch synchronize { ns_promise_to_touch } + if (promise = promise_to_touch) + promise.touch + end end def state @@ -113,7 +123,7 @@ def state end def default_executor - synchronize { ns_default_executor } + @DefaultExecutor end # @yield [success, value, reason] of the parent @@ -134,7 +144,7 @@ def schedule(intended_time) # @yield [success, value, reason] executed async on `executor` when completed # @return self def on_completion(executor = nil, &callback) - synchronize { ns_on_completion(ns_default_executor, executor, &callback) } + synchronize { ns_on_completion(@DefaultExecutor, executor, &callback) } end # @yield [success, value, reason] executed sync when completed @@ -184,7 +194,12 @@ def add_callback(method, *args) # @api private, only for inspection def promise - synchronize { ns_promise } + @Promise + end + + # @api private, only for inspection + def touched + @Touched.value end def with_default_executor(executor = default_executor) @@ -193,12 +208,9 @@ def with_default_executor(executor = default_executor) private - def ns_initialize(promise, default_executor = :io) - @promise = promise - @state = :pending - @callbacks = [] - @default_executor = default_executor - @touched = false # TODO use atom to avoid locking + def ns_initialize + @state = :pending + @callbacks = [] end def ns_wait_until_complete(timeout = nil) @@ -220,23 +232,8 @@ def ns_completed? ns_state == :completed end - def ns_promise_to_touch - unless @touched - @touched = true - ns_promise - end - end - - def pr_touch(promise) - promise.touch if promise - end - - def ns_promise - @promise - end - - def ns_default_executor - @default_executor + def promise_to_touch + @Promise if @Touched.make_true end def pr_chain(default_executor, executor = nil, &callback) @@ -411,13 +408,13 @@ def or(*futures) # @yield [value] executed async on `executor` when success # @return self def on_success(executor = nil, &callback) - synchronize { ns_on_success(ns_default_executor, executor, &callback) } + synchronize { ns_on_success(@DefaultExecutor, executor, &callback) } end # @yield [reason] executed async on `executor` when failed? # @return self def on_failure(executor = nil, &callback) - synchronize { ns_on_failure(ns_default_executor, executor, &callback) } + synchronize { ns_on_failure(@DefaultExecutor, executor, &callback) } end # @yield [value] executed sync when success @@ -450,8 +447,8 @@ def ns_add_callback(method, *args) private - def ns_initialize(promise, default_executor = :io) - super(promise, default_executor) + def ns_initialize + super @value = nil @reason = nil end @@ -618,16 +615,22 @@ def evaluate_to!(*args, &block) # @abstract class AbstractPromise < Synchronization::Object - def default_executor - future.default_executor + def initialize(future, *args, &block) + super(*args, &block) + @Future = future + ensure_ivar_visibility! end def future - synchronize { ns_future } + @Future end alias_method :event, :future + def default_executor + future.default_executor + end + def state future.state end @@ -645,16 +648,8 @@ def inspect private - def ns_initialize(future) - @future = future - end - - def ns_future - @future - end - def complete(*args) - pr_complete(synchronize { ns_future }, *args) + pr_complete(@Future, *args) end def pr_complete(future, *args) @@ -662,7 +657,7 @@ def pr_complete(future, *args) end def evaluate_to(*args, &block) - pr_evaluate_to(synchronize { ns_future }, *args, &block) + pr_evaluate_to(@Future, *args, &block) end # @return [Future] @@ -676,9 +671,7 @@ def pr_evaluate_to(future, *args, &block) class CompletableEventPromise < AbstractPromise public :complete - private - - def ns_initialize(default_executor = :io) + def initialize(default_executor = :io) super CompletableEvent.new(self, default_executor) end end @@ -688,6 +681,10 @@ def ns_initialize(default_executor = :io) # Concurrent.promise # @note TODO consider to allow being blocked_by class CompletableFuturePromise < AbstractPromise + def initialize(default_executor = :io) + super CompletableFuture.new(self, default_executor) + end + # Set the `Future` to a value and wake or notify all threads waiting on it. # # @param [Object] value the value to store in the `Future` @@ -721,12 +718,6 @@ def try_fail(reason = StandardError.new) def evaluate_to!(*args, &block) evaluate_to(*args, &block).wait! end - - private - - def ns_initialize(default_executor = :io) - super CompletableFuture.new(self, default_executor) - end end # @abstract @@ -769,8 +760,7 @@ def inspect private - def ns_initialize(future, blocked_by_futures) - super future + def ns_initialize(blocked_by_futures) @blocked_by = Array(blocked_by_futures) @countdown = @blocked_by.size end @@ -781,7 +771,7 @@ def ns_done(future) end def ns_completable_args(done_future, blocked_by) - [done_future, blocked_by, ns_future] + [done_future, blocked_by, @Future] end def pr_completable(_, _, _) @@ -795,33 +785,25 @@ def ns_blocked_by # @abstract class BlockedTaskPromise < BlockedPromise - def executor - synchronize { ns_executor } - end - - private - - def ns_initialize(blocked_by_future, default_executor = :io, executor = default_executor, &task) + def initialize(blocked_by_future, default_executor = :io, executor = default_executor, &task) raise ArgumentError, 'no block given' unless block_given? - super Future.new(self, default_executor), [blocked_by_future] - @task = task - @executor = executor + @Executor = executor + @Task = task + super Future.new(self, default_executor), blocked_by_future end - def ns_executor - @executor + def executor + @Executor end - def ns_task - @task - end + private - def task - synchronize { ns_task } + def ns_initialize(blocked_by_future) + super [blocked_by_future] end def ns_completable_args(done_future, blocked_by) - [done_future, blocked_by, ns_future, ns_executor, ns_task] + [done_future, blocked_by, @Future, @Executor, @Task] end def pr_completable(_, _, _, _, _) @@ -832,10 +814,9 @@ def pr_completable(_, _, _, _, _) class ThenPromise < BlockedTaskPromise private - def ns_initialize(blocked_by_future, default_executor = :io, executor = default_executor, &task) - blocked_by_future.is_a? Future or - raise ArgumentError, 'only Future can be appended with then' - super(blocked_by_future, default_executor, executor, &task) + def ns_initialize(blocked_by_future) + raise ArgumentError, 'only Future can be appended with then' unless blocked_by_future.is_a? Future + super(blocked_by_future) end def pr_completable(done_future, _, future, executor, task) @@ -850,10 +831,9 @@ def pr_completable(done_future, _, future, executor, task) class RescuePromise < BlockedTaskPromise private - def ns_initialize(blocked_by_future, default_executor = :io, executor = default_executor, &task) - blocked_by_future.is_a? Future or - raise ArgumentError, 'only Future can be rescued' - super(blocked_by_future, default_executor, executor, &task) + def ns_initialize(blocked_by_future) + raise ArgumentError, 'only Future can be rescued' unless blocked_by_future.is_a? Future + super(blocked_by_future) end def pr_completable(done_future, _, future, executor, task) @@ -879,16 +859,8 @@ def pr_completable(done_future, _, _, executor, task) # will be immediately completed class ImmediatePromise < InnerPromise - def self.new(*args) - promise = super(*args) - Concurrent.post_on(:fast, promise) { |promise| promise.future.complete } - promise - end - - private - - def ns_initialize(default_executor = :io) - super Event.new(self, default_executor) + def initialize(default_executor = :io) + super Event.new(self, default_executor).complete end end @@ -913,10 +885,14 @@ def ns_done(future) super future end - def ns_initialize(blocked_by_future, levels = 1, default_executor = :io) + def initialize(blocked_by_future, levels = 1, default_executor = :io) + super Future.new(self, default_executor), blocked_by_future, levels + end + + def ns_initialize(blocked_by_future, levels = 1) blocked_by_future.is_a? Future or raise ArgumentError, 'only Future can be flatten' - super(Future.new(self, default_executor), [blocked_by_future]) + super([blocked_by_future]) @levels = levels end @@ -929,7 +905,7 @@ def pr_completable(_, blocked_by, future) class AllPromise < BlockedPromise private - def ns_initialize(blocked_by_futures, default_executor = :io) + def initialize(blocked_by_futures, default_executor = :io) klass = blocked_by_futures.any? { |f| f.is_a?(Future) } ? Future : Event # noinspection RubyArgCount super(klass.new(self, default_executor), blocked_by_futures) @@ -955,7 +931,7 @@ class AnyPromise < BlockedPromise private - def ns_initialize(blocked_by_futures, default_executor = :io) + def initialize(blocked_by_futures, default_executor = :io) blocked_by_futures.all? { |f| f.is_a? Future } or raise ArgumentError, 'accepts only Futures not Events' super(Future.new(self, default_executor), blocked_by_futures) @@ -972,12 +948,12 @@ def pr_completable(done_future, _, future) class Delay < InnerPromise def touch - pr_complete synchronize { ns_future } + pr_complete @Future end private - def ns_initialize(default_executor = :io) + def initialize(default_executor = :io) super Event.new(self, default_executor) end end @@ -985,34 +961,33 @@ def ns_initialize(default_executor = :io) # will be evaluated to task in intended_time class ScheduledPromise < InnerPromise def intended_time - synchronize { ns_intended_time } + @IntendedTime end def inspect - "#{to_s[0..-2]} intended_time:[#{intended_time}}>" + "#{to_s[0..-2]} intended_time:[#{@IntendedTime}}>" end private - def ns_initialize(intended_time, default_executor = :io) + def initialize(intended_time, default_executor = :io) + @IntendedTime = intended_time super Event.new(self, default_executor) + end + + def ns_initialize in_seconds = begin - @intended_time = intended_time - now = Time.now - schedule_time = if intended_time.is_a? Time - intended_time - else - now + intended_time - end + now = Time.now + schedule_time = if @IntendedTime.is_a? Time + @IntendedTime + else + now + @IntendedTime + end [0, schedule_time.to_f - now.to_f].max end Concurrent.global_timer_set.post(in_seconds) { complete } end - - def ns_intended_time - @intended_time - end end end From 1ba4f88b18df3063ae2dcb8c0338ca52fe2af1e2 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Wed, 29 Apr 2015 11:23:25 +0200 Subject: [PATCH 07/16] ImmutableStruct Improvements --- .../synchronization/immutable_struct.rb | 46 ++++- spec/concurrent/synchronized_object_spec.rb | 177 ++++++++++-------- 2 files changed, 141 insertions(+), 82 deletions(-) diff --git a/lib/concurrent/synchronization/immutable_struct.rb b/lib/concurrent/synchronization/immutable_struct.rb index f09f09009..dcbcfce28 100644 --- a/lib/concurrent/synchronization/immutable_struct.rb +++ b/lib/concurrent/synchronization/immutable_struct.rb @@ -1,23 +1,67 @@ module Concurrent module Synchronization + # Similar to Struct but the fields are immutable and always visible (like Java final fields) + # @example + # Person = ImmutableStruct.with_fields :name, :age + # Person.new 'John Doe', 15 + # Person['John Doe', 15] + # Person['John Doe', 15].members # => [:name, :age] + # Person['John Doe', 15].values # => ['John Doe', 15] class ImmutableStruct < Synchronization::Object def self.with_fields(*names, &block) Class.new(self) do attr_reader(*names) - instance_eval &block if block class_eval <<-RUBY, __FILE__, __LINE__ + 1 def initialize(#{names.join(', ')}) #{names.map { |n| '@' + n.to_s }.join(', ')} = #{names.join(', ')} ensure_ivar_visibility! end + + def members + #{names.inspect} + end + + def self.members + #{names.inspect} + end RUBY + + instance_eval &block if block end end + # Define equality based on class and members' equality. This is optional since for CAS operation + # it may be required to compare references which is default behaviour of this class. + def self.define_equality! + class_eval <<-RUBY, __FILE__, __LINE__ + 1 + def ==(other) + self.class == other.class && + #{members.map { |name| "self.#{name} == other.#{name}" }.join(" && ")} + end + RUBY + end + def self.[](*args) new *args end + + include Enumerable + + def each(&block) + return to_enum unless block_given? + members.zip(values).each(&block) + end + + def size + members.size + end + + def values + members.map { |name| send name } + end + + alias_method :to_a, :values end end end diff --git a/spec/concurrent/synchronized_object_spec.rb b/spec/concurrent/synchronized_object_spec.rb index 07a4244c4..b8f6f8d5e 100644 --- a/spec/concurrent/synchronized_object_spec.rb +++ b/spec/concurrent/synchronized_object_spec.rb @@ -1,118 +1,133 @@ module Concurrent - describe Synchronization::Object do + describe Synchronization do + describe Synchronization::Object do - class AClass < Synchronization::Object - attr_volatile :volatile - attr_accessor :not_volatile + class AClass < Synchronization::Object + attr_volatile :volatile + attr_accessor :not_volatile - def initialize(value = nil) - super() - @Final = value - ensure_ivar_visibility! - end + def initialize(value = nil) + super() + @Final = value + ensure_ivar_visibility! + end - def final - @Final - end + def final + @Final + end - def count - synchronize { @count += 1 } - end + def count + synchronize { @count += 1 } + end - def wait(timeout = nil) - synchronize { ns_wait(timeout) } - end + def wait(timeout = nil) + synchronize { ns_wait(timeout) } + end - private + private - def ns_initialize - @count = 0 + def ns_initialize + @count = 0 + end end - end - subject { AClass.new } + subject { AClass.new } - describe '#wait' do + describe '#wait' do - it 'waiting thread is sleeping' do - t = Thread.new do - Thread.abort_on_exception = true - subject.wait + it 'waiting thread is sleeping' do + t = Thread.new do + Thread.abort_on_exception = true + subject.wait + end + sleep 0.1 + expect(t.status).to eq 'sleep' end - sleep 0.1 - expect(t.status).to eq 'sleep' - end - it 'sleeping thread can be killed' do - t = Thread.new do - Thread.abort_on_exception = true - subject.wait rescue nil + it 'sleeping thread can be killed' do + t = Thread.new do + Thread.abort_on_exception = true + subject.wait rescue nil + end + sleep 0.1 + t.kill + sleep 0.1 + expect(t.status).to eq false + expect(t.alive?).to eq false end - sleep 0.1 - t.kill - sleep 0.1 - expect(t.status).to eq false - expect(t.alive?).to eq false end - end - describe '#synchronize' do - it 'allows only one thread to execute count' do - threads = 10.times.map { Thread.new(subject) { 100.times { subject.count } } } - threads.each(&:join) - expect(subject.count).to eq 1001 + describe '#synchronize' do + it 'allows only one thread to execute count' do + threads = 10.times.map { Thread.new(subject) { 100.times { subject.count } } } + threads.each(&:join) + expect(subject.count).to eq 1001 + end end - end - describe 'signaling' do - pending 'for now pending, tested pretty well by Event' - end + describe 'signaling' do + pending 'for now pending, tested pretty well by Event' + end - specify 'final field always visible' do - store = AClass.new 'asd' - t1 = Thread.new { 1000000000.times { |i| store = AClass.new i.to_s } } - t2 = Thread.new { 10.times { expect(store.final).not_to be_nil; Thread.pass } } - t2.join - t1.kill - end + specify 'final field always visible' do + store = AClass.new 'asd' + t1 = Thread.new { 1000000000.times { |i| store = AClass.new i.to_s } } + t2 = Thread.new { 10.times { expect(store.final).not_to be_nil; Thread.pass } } + t2.join + t1.kill + end - describe 'attr volatile' do - specify 'older writes are always visible' do - store = AClass.new - store.not_volatile = 0 - store.volatile = 0 - - t1 = Thread.new do - Thread.abort_on_exception = true - 1000000000.times do |i| - store.not_volatile = i - store.volatile = i + describe 'attr volatile' do + specify 'older writes are always visible' do + store = AClass.new + store.not_volatile = 0 + store.volatile = 0 + + t1 = Thread.new do + Thread.abort_on_exception = true + 1000000000.times do |i| + store.not_volatile = i + store.volatile = i + end end - end - t2 = Thread.new do - 10.times do - volatile = store.volatile - not_volatile = store.not_volatile - expect(not_volatile).to be >= volatile - Thread.pass + t2 = Thread.new do + 10.times do + volatile = store.volatile + not_volatile = store.not_volatile + expect(not_volatile).to be >= volatile + Thread.pass + end end - end - t2.join - t1.kill + t2.join + t1.kill + end end end describe Synchronization::ImmutableStruct do - let(:klass) { described_class.with_fields(:a, :b) } - subject { klass[1, 'a'] } + AB = described_class.with_fields(:a, :b) + subject { AB[1, 'a'] } specify do - expect(klass.superclass).to eq described_class + expect(AB.superclass).to eq described_class expect(subject.a).to eq 1 expect(subject.b).to eq 'a' + expect(subject.values).to eq [1, 'a'] + expect(subject.to_a).to eq [1, 'a'] + expect(subject.size).to eq 2 + expect(subject.members).to eq [:a, :b] + expect(subject.each.to_a).to eq [[:a, 1], [:b, 'a']] + expect(subject.inspect).to match /#/ + end + + specify 'equality' do + klass = described_class.with_fields(:a, :b) + expect(klass[1, 'a']).not_to be == klass[1, 'a'] + klass.define_equality! + expect(klass[1, 'a']).to be == klass[1, 'a'] end end From 47cde7bade71a6b6a838bf9600ab20fcb2e6fa32 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Wed, 29 Apr 2015 11:24:08 +0200 Subject: [PATCH 08/16] Test stabilization --- spec/concurrent/atomic/condition_spec.rb | 2 +- spec/concurrent/atomic/cyclic_barrier_spec.rb | 7 ++----- spec/concurrent/synchronized_object_spec.rb | 1 - 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/spec/concurrent/atomic/condition_spec.rb b/spec/concurrent/atomic/condition_spec.rb index d00258637..9d9a396ea 100644 --- a/spec/concurrent/atomic/condition_spec.rb +++ b/spec/concurrent/atomic/condition_spec.rb @@ -284,7 +284,7 @@ module Concurrent mutex.synchronize { subject.broadcast } threads.each do |t| - expect(t.join(0.1)).to eq t + expect(t.join(5)).to eq t end end end diff --git a/spec/concurrent/atomic/cyclic_barrier_spec.rb b/spec/concurrent/atomic/cyclic_barrier_spec.rb index 0dec109bc..7359a8507 100644 --- a/spec/concurrent/atomic/cyclic_barrier_spec.rb +++ b/spec/concurrent/atomic/cyclic_barrier_spec.rb @@ -37,11 +37,8 @@ module Concurrent context 'with waiting threads' do it 'should be equal to the waiting threads count' do - Thread.new { barrier.wait } - Thread.new { barrier.wait } - - sleep(0.1) - + threads = [Thread.new { barrier.wait }, Thread.new { barrier.wait }] + Thread.pass until threads.all? { |t| t.status == 'sleep' } expect(barrier.number_waiting).to eq 2 end end diff --git a/spec/concurrent/synchronized_object_spec.rb b/spec/concurrent/synchronized_object_spec.rb index b8f6f8d5e..cffa6e64b 100644 --- a/spec/concurrent/synchronized_object_spec.rb +++ b/spec/concurrent/synchronized_object_spec.rb @@ -120,7 +120,6 @@ def ns_initialize expect(subject.size).to eq 2 expect(subject.members).to eq [:a, :b] expect(subject.each.to_a).to eq [[:a, 1], [:b, 'a']] - expect(subject.inspect).to match /#/ end specify 'equality' do From 310b7df38aca54fb0145757d6aaae6285df9d87b Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Wed, 29 Apr 2015 21:13:17 +0200 Subject: [PATCH 09/16] Synchronization module documentation --- doc/synchronization.md | 121 ++++++++++++++++++ lib/concurrent/configuration.rb | 2 +- lib/concurrent/edge/future.rb | 10 +- lib/concurrent/synchronization.rb | 1 + .../synchronization/abstract_object.rb | 4 +- lib/concurrent/synchronization/java_object.rb | 4 +- 6 files changed, 131 insertions(+), 11 deletions(-) create mode 100644 doc/synchronization.md diff --git a/doc/synchronization.md b/doc/synchronization.md new file mode 100644 index 000000000..549a01ffb --- /dev/null +++ b/doc/synchronization.md @@ -0,0 +1,121 @@ +`Synchronization` module provides common layer for synchronization. It provides same guaranties independent of any particular Ruby implementation. + +*This is a new module, it is expected to fully stabilize for 1.0 release.* + +## Synchronization::Object + +Provides common parent for all objects which need to be synchronized or be using other synchronization tools. It provides: + +- Synchronized block +- Methods for waiting and signaling +- Volatile fields +- Ensure visibility of final fields +- Fields with CAS operations + +## Synchronized block + +`Synchronization::Object` provides private method `#synchronize(&block)`. For a given object only one Thread can enter one of the blocks synchronized against this object. Object is locked when a thread enters one of the synchronized blocks. + +Example of a simple counter which can be used by multiple threads: + +```ruby +class SafeCounter < Concurrent::Synchronization::Object + def initialize + super + synchronize { @count = 0 } + end + + def increment + synchronize { @count += 1 } + end + + def count + synchronize { @count } + end +end +``` + +### Naming conventions + +Methods starting with `ns_` are marking methods that are not using synchronization by themselves, they have to be used inside synchronize block. They are usually used in pairs to separate the synchronization from behavior: + +```ruby +def compute + service.report synchronize { ns_compute } +end + +private + +def ns_compute + ns_compute_reduce ns_compute_map +end +``` +where `compute` defines how is it synchronized and `ns_compute` handles the behavior (in this case the computation). `ns_` methods should only call other `ns_` methods or `pr_` methods. They can call normal methods on other objects, but that should be done with care (better to avoid) because the thread escapes this object while the lock is still held, which can lead to deadlock. That's why the `report` method is called in `compute` and not in `ns_compute`. + +`pr_` methods are pure functions they can be used in and outside of synchronized blocks. + +## Methods for waiting and signaling + +Sometimes while already inside the synchronized block some condition is not met. Then the thread needs to wait (releasing the lock) until the condition is met. The waiting thread is then signaled that it can continue. + +To fulfill these needs there are private methods: + +- `ns_wait` {include:Concurrent::Synchronization::AbstractObject#ns_wait} +- `ns_wait_until` {include:Concurrent::Synchronization::AbstractObject#ns_wait_until} +- `ns_signal` {include:Concurrent::Synchronization::AbstractObject#ns_signal} +- `ns_broadcast` {include:Concurrent::Synchronization::AbstractObject#ns_broadcast} + +All methods have to be called inside synchronized block. + +## Volatile fields + +`Synchronization::Object` can have volatile fields (Java semantic). They are defined by `attr_volatile :field_name`. `attr_volatile` defines reader and writer with the `field_name`. Any write is always immediately visible for any subsequent reads of the same field. + +## Ensure visibility of final fields + +Instance variables assigned only once in `initialize` method are not guaranteed to be visible to all threads. For that user can call `ensure_ivar_visibility!` method, like in following example taken from `Edge::AbstractPromise` implementation: + +```ruby +class AbstractPromise < Synchronization::Object + def initialize(future, *args, &block) + super(*args, &block) + @Future = future + ensure_ivar_visibility! + end + # ... +end +``` + +### Naming conventions + +Instance variables with camel case names are final and never reassigned. + +## Fields with CAS operations + +They are not supported directly, but AtomicReference can be stored in final field and then CAS operations can be done on it, like in following example taken from `Edge::Event` implementation: + +```ruby +class Event < Synchronization::Object + extend FutureShortcuts + + def initialize(promise, default_executor = :io) + @Promise = promise + @DefaultExecutor = default_executor + @Touched = AtomicBoolean.new(false) + super() + ensure_ivar_visibility! + end + # ... + def touch + # distribute touch to promise only once + @Promise.touch if @Touched.make_true + self + end + # ... +end +``` + +## Memory model (sort of) + +// TODO + diff --git a/lib/concurrent/configuration.rb b/lib/concurrent/configuration.rb index 7494df800..7041fd46c 100644 --- a/lib/concurrent/configuration.rb +++ b/lib/concurrent/configuration.rb @@ -106,7 +106,7 @@ def self.global_timer_set end # General access point to global executors. - # @param [Symbol, Executor] maps symbols: + # @param [Symbol, Executor] executor_identifier symbols: # - :fast - {Concurrent.global_fast_executor} # - :io - {Concurrent.global_io_executor} # - :immediate - {Concurrent.global_immediate_executor} diff --git a/lib/concurrent/edge/future.rb b/lib/concurrent/edge/future.rb index bb9cb465b..26d7877e7 100644 --- a/lib/concurrent/edge/future.rb +++ b/lib/concurrent/edge/future.rb @@ -113,9 +113,9 @@ def wait(timeout = nil) end def touch - if (promise = promise_to_touch) - promise.touch - end + # distribute touch to promise only once + @Promise.touch if @Touched.make_true + self end def state @@ -232,10 +232,6 @@ def ns_completed? ns_state == :completed end - def promise_to_touch - @Promise if @Touched.make_true - end - def pr_chain(default_executor, executor = nil, &callback) ChainPromise.new(self, default_executor, executor || default_executor, &callback).future end diff --git a/lib/concurrent/synchronization.rb b/lib/concurrent/synchronization.rb index d1fbb36bc..d55816fa3 100644 --- a/lib/concurrent/synchronization.rb +++ b/lib/concurrent/synchronization.rb @@ -7,6 +7,7 @@ require 'concurrent/synchronization/rbx_object' module Concurrent + # {include:file:doc/synchronization.md} module Synchronization Implementation = case when Concurrent.on_jruby? diff --git a/lib/concurrent/synchronization/abstract_object.rb b/lib/concurrent/synchronization/abstract_object.rb index 9cc6587b0..de43d55f2 100644 --- a/lib/concurrent/synchronization/abstract_object.rb +++ b/lib/concurrent/synchronization/abstract_object.rb @@ -91,7 +91,7 @@ def ns_wait(timeout = nil) raise NotImplementedError end - # Signal one waiting thread + # Signal one waiting thread. # @return [self] # @note only to be used inside synchronized block # @note to provide direct access to this method in a descendant add method @@ -104,7 +104,7 @@ def ns_signal raise NotImplementedError end - # Broadcast to all waiting threads + # Broadcast to all waiting threads. # @return [self] # @note only to be used inside synchronized block # @note to provide direct access to this method in a descendant add method diff --git a/lib/concurrent/synchronization/java_object.rb b/lib/concurrent/synchronization/java_object.rb index 134665a81..2fcd39060 100644 --- a/lib/concurrent/synchronization/java_object.rb +++ b/lib/concurrent/synchronization/java_object.rb @@ -5,7 +5,9 @@ module Synchronization require 'jruby' class JavaObject < AbstractObject - def ensure_ivar_visibility! + private + + def ensure_ivar_visibility! # TODO move to java version # relying on undocumented behavior of JRuby, ivar access is volatile end end From 290702637de345ebf11fa3879e8c2c1d168e110e Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Wed, 29 Apr 2015 21:21:32 +0200 Subject: [PATCH 10/16] Move ensure_ivar_visibility! ruby method the the Java extension on JRuby --- .../ext/SynchronizationLibrary.java | 5 +++++ lib/concurrent/synchronization.rb | 1 - lib/concurrent/synchronization/java_object.rb | 16 ---------------- ...ed_object_spec.rb => synchronization_spec.rb} | 0 4 files changed, 5 insertions(+), 17 deletions(-) delete mode 100644 lib/concurrent/synchronization/java_object.rb rename spec/concurrent/{synchronized_object_spec.rb => synchronization_spec.rb} (100%) diff --git a/ext/com/concurrent_ruby/ext/SynchronizationLibrary.java b/ext/com/concurrent_ruby/ext/SynchronizationLibrary.java index ebf8cad8a..1eb0b9177 100644 --- a/ext/com/concurrent_ruby/ext/SynchronizationLibrary.java +++ b/ext/com/concurrent_ruby/ext/SynchronizationLibrary.java @@ -105,5 +105,10 @@ public IRubyObject nsBroadcast(ThreadContext context) { notifyAll(); return this; } + + @JRubyMethod(name = "ensure_ivar_visibility!", visibility = Visibility.PRIVATE) + public IRubyObject ensureIvarVisibilityBang(ThreadContext context) { + return context.nil; + } } } diff --git a/lib/concurrent/synchronization.rb b/lib/concurrent/synchronization.rb index d55816fa3..e818b607d 100644 --- a/lib/concurrent/synchronization.rb +++ b/lib/concurrent/synchronization.rb @@ -1,7 +1,6 @@ require 'concurrent/utility/engine' require 'concurrent/synchronization/abstract_object' require 'concurrent/native_extensions' # JavaObject -require 'concurrent/synchronization/java_object' # JavaObject require 'concurrent/synchronization/mutex_object' require 'concurrent/synchronization/monitor_object' require 'concurrent/synchronization/rbx_object' diff --git a/lib/concurrent/synchronization/java_object.rb b/lib/concurrent/synchronization/java_object.rb deleted file mode 100644 index 2fcd39060..000000000 --- a/lib/concurrent/synchronization/java_object.rb +++ /dev/null @@ -1,16 +0,0 @@ -module Concurrent - module Synchronization - - if Concurrent.on_jruby? - require 'jruby' - - class JavaObject < AbstractObject - private - - def ensure_ivar_visibility! # TODO move to java version - # relying on undocumented behavior of JRuby, ivar access is volatile - end - end - end - end -end diff --git a/spec/concurrent/synchronized_object_spec.rb b/spec/concurrent/synchronization_spec.rb similarity index 100% rename from spec/concurrent/synchronized_object_spec.rb rename to spec/concurrent/synchronization_spec.rb From b94357b401d324c799a45fd5066f1872398a9945 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Thu, 30 Apr 2015 08:01:14 +0200 Subject: [PATCH 11/16] Move Object definition to its own file --- lib/concurrent/synchronization.rb | 21 +++------------------ lib/concurrent/synchronization/object.rb | 21 +++++++++++++++++++++ 2 files changed, 24 insertions(+), 18 deletions(-) create mode 100644 lib/concurrent/synchronization/object.rb diff --git a/lib/concurrent/synchronization.rb b/lib/concurrent/synchronization.rb index e818b607d..b6327bd9e 100644 --- a/lib/concurrent/synchronization.rb +++ b/lib/concurrent/synchronization.rb @@ -4,28 +4,13 @@ require 'concurrent/synchronization/mutex_object' require 'concurrent/synchronization/monitor_object' require 'concurrent/synchronization/rbx_object' +require 'concurrent/synchronization/object' + +require 'concurrent/synchronization/immutable_struct' module Concurrent # {include:file:doc/synchronization.md} module Synchronization - Implementation = case - when Concurrent.on_jruby? - JavaObject - when Concurrent.on_cruby? && (RUBY_VERSION.split('.').map(&:to_i) <=> [1, 9, 3]) <= 0 - MonitorObject - when Concurrent.on_cruby? - MutexObject - when Concurrent.on_rbx? - RbxObject - else - MutexObject - end - private_constant :Implementation - - # @see AbstractObject - class Object < Implementation - end end end -require 'concurrent/synchronization/immutable_struct' diff --git a/lib/concurrent/synchronization/object.rb b/lib/concurrent/synchronization/object.rb new file mode 100644 index 000000000..41979e13c --- /dev/null +++ b/lib/concurrent/synchronization/object.rb @@ -0,0 +1,21 @@ +module Concurrent + module Synchronization + Implementation = case + when Concurrent.on_jruby? + JavaObject + when Concurrent.on_cruby? && (RUBY_VERSION.split('.').map(&:to_i) <=> [1, 9, 3]) <= 0 + MonitorObject + when Concurrent.on_cruby? + MutexObject + when Concurrent.on_rbx? + RbxObject + else + MutexObject + end + private_constant :Implementation + + # @see AbstractObject AbstractObject which defines interface of this class. + class Object < Implementation + end + end +end From 8c92b51ba7930da7b9263380094e78b4ab2bfabf Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Thu, 30 Apr 2015 08:11:36 +0200 Subject: [PATCH 12/16] Remove unnecessary definition of members methods by eval --- lib/concurrent/synchronization/immutable_struct.rb | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/lib/concurrent/synchronization/immutable_struct.rb b/lib/concurrent/synchronization/immutable_struct.rb index dcbcfce28..94439b0a0 100644 --- a/lib/concurrent/synchronization/immutable_struct.rb +++ b/lib/concurrent/synchronization/immutable_struct.rb @@ -11,17 +11,12 @@ class ImmutableStruct < Synchronization::Object def self.with_fields(*names, &block) Class.new(self) do attr_reader(*names) - class_eval <<-RUBY, __FILE__, __LINE__ + 1 def initialize(#{names.join(', ')}) #{names.map { |n| '@' + n.to_s }.join(', ')} = #{names.join(', ')} ensure_ivar_visibility! end - def members - #{names.inspect} - end - def self.members #{names.inspect} end @@ -46,6 +41,10 @@ def self.[](*args) new *args end + def members + self.class.members + end + include Enumerable def each(&block) From 32162e0fe1c1e15487a43bf5ee508d20ca1301bb Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Thu, 30 Apr 2015 08:58:40 +0200 Subject: [PATCH 13/16] Extract ruby_version comparison --- ext/concurrent/extconf.rb | 2 +- lib/concurrent/synchronization/object.rb | 5 +++-- lib/concurrent/utility/engine.rb | 10 ++++++++++ 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/ext/concurrent/extconf.rb b/ext/concurrent/extconf.rb index dc74e10ce..716b114a4 100644 --- a/ext/concurrent/extconf.rb +++ b/ext/concurrent/extconf.rb @@ -11,7 +11,7 @@ def create_dummy_makefile end end -if defined?(JRUBY_VERSION) || ! Concurrent.allow_c_extensions? +if Concurrent.on_jruby? || ! Concurrent.allow_c_extensions? create_dummy_makefile warn 'C optimizations are not supported on this version of Ruby.' else diff --git a/lib/concurrent/synchronization/object.rb b/lib/concurrent/synchronization/object.rb index 41979e13c..44e808a5c 100644 --- a/lib/concurrent/synchronization/object.rb +++ b/lib/concurrent/synchronization/object.rb @@ -3,13 +3,14 @@ module Synchronization Implementation = case when Concurrent.on_jruby? JavaObject - when Concurrent.on_cruby? && (RUBY_VERSION.split('.').map(&:to_i) <=> [1, 9, 3]) <= 0 + when Concurrent.on_cruby? && Concurrent.ruby_version(:<=, 1, 9, 3) MonitorObject - when Concurrent.on_cruby? + when Concurrent.on_cruby? && Concurrent.ruby_version(:>, 1, 9, 3) MutexObject when Concurrent.on_rbx? RbxObject else + warn 'Possibly unsupported Ruby implementation' MutexObject end private_constant :Implementation diff --git a/lib/concurrent/utility/engine.rb b/lib/concurrent/utility/engine.rb index 0df014d9c..bf9bcb9fc 100644 --- a/lib/concurrent/utility/engine.rb +++ b/lib/concurrent/utility/engine.rb @@ -16,6 +16,16 @@ def on_rbx? def ruby_engine defined?(RUBY_ENGINE) ? RUBY_ENGINE : 'ruby' end + + def ruby_version(comparison, major, minor, patch) + result = (RUBY_VERSION.split('.').map(&:to_i) <=> [major, minor, patch]) + comparisons = { :== => [0], + :>= => [1, 0], + :<= => [-1, 0], + :> => [1], + :< => [-1] } + comparisons.fetch(comparison).include? result + end end extend EngineDetector From 3dc0e879591c40190911a6548a8f7a6538e82491 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Thu, 30 Apr 2015 12:59:04 +0200 Subject: [PATCH 14/16] Making Event and Future fields volatile to remove few more synchronizations --- examples/benchmark_new_futures.rb | 7 +- lib/concurrent/edge/future.rb | 391 +++++++++++++----------------- 2 files changed, 177 insertions(+), 221 deletions(-) diff --git a/examples/benchmark_new_futures.rb b/examples/benchmark_new_futures.rb index 3931ee3dd..642ad6b31 100644 --- a/examples/benchmark_new_futures.rb +++ b/examples/benchmark_new_futures.rb @@ -16,18 +16,15 @@ x.compare! end - Benchmark.ips(time, warmup) do |x| - ohead = Concurrent::Promise.execute { 1 } x.report('graph-old') do - head = ohead + head = Concurrent::Promise.execute { 1 } branch1 = head.then(&:succ) branch2 = head.then(&:succ).then(&:succ) Concurrent::Promise.zip(branch1, branch2).then { |(a, b)| a + b }.value! end - nhead = Concurrent.future { 1 } x.report('graph-new') do - head = nhead + head = Concurrent.future { 1 } branch1 = head.then(&:succ) branch2 = head.then(&:succ).then(&:succ) (branch1 + branch2).then { |(a, b)| a + b }.value! diff --git a/lib/concurrent/edge/future.rb b/lib/concurrent/edge/future.rb index 26d7877e7..e65f36779 100644 --- a/lib/concurrent/edge/future.rb +++ b/lib/concurrent/edge/future.rb @@ -84,32 +84,39 @@ def post_on(executor, *args, &job) class Event < Synchronization::Object extend FutureShortcuts + attr_volatile :state + private :state= + def initialize(promise, default_executor = :io) @Promise = promise @DefaultExecutor = default_executor @Touched = AtomicBoolean.new(false) + self.state = :pending super() ensure_ivar_visibility! end - # Is obligation completion still pending? + # Is Future still pending? # @return [Boolean] def pending? - synchronize { ns_pending? } + state == :pending end alias_method :incomplete?, :pending? + # Is Future still completed? + # @return [Boolean] def completed? - synchronize { ns_completed? } + state == :completed end # wait until Obligation is #complete? # @param [Numeric] timeout the maximum time in second to wait. - # @return [Obligation] self + # @return [Event] self def wait(timeout = nil) touch - synchronize { ns_wait_until_complete(timeout) } + wait_until_complete timeout + self end def touch @@ -118,39 +125,40 @@ def touch self end - def state - synchronize { ns_state } - end - def default_executor @DefaultExecutor end # @yield [success, value, reason] of the parent def chain(executor = nil, &callback) - pr_chain(default_executor, executor, &callback) + ChainPromise.new(self, @DefaultExecutor, executor || @DefaultExecutor, &callback).future end alias_method :then, :chain + # TODO take block optionally + def join(*futures) + AllPromise.new([self, *futures], @DefaultExecutor).future + end + def delay - pr_delay(default_executor) + join(Delay.new(@DefaultExecutor).future) end def schedule(intended_time) - pr_schedule(default_executor, intended_time) + chain { ScheduledPromise.new(intended_time).future.join(self) }.flat end # @yield [success, value, reason] executed async on `executor` when completed # @return self def on_completion(executor = nil, &callback) - synchronize { ns_on_completion(@DefaultExecutor, executor, &callback) } + add_callback :pr_async_callback_on_completion, executor || @DefaultExecutor, callback end # @yield [success, value, reason] executed sync when completed # @return self def on_completion!(&callback) - synchronize { ns_on_completion!(&callback) } + add_callback :pr_callback_on_completion, callback end # @return [Array] @@ -166,17 +174,12 @@ def inspect synchronize { "#{ns_to_s[0..-2]} blocks:[#{pr_blocks(@callbacks).map(&:to_s).join(', ')}]>" } end - # TODO take block optionally - def join(*futures) - pr_join(default_executor, *futures) - end - alias_method :+, :join alias_method :and, :join # @api private def complete(raise = true) - callbacks = synchronize { ns_complete(raise) } + callbacks = synchronize { ns_complete raise } pr_call_callbacks callbacks self end @@ -189,7 +192,20 @@ def callbacks # @api private def add_callback(method, *args) - synchronize { ns_add_callback(method, *args) } + call = if completed? + true + else + synchronize do + if completed? + true + else + @callbacks << [method, *args] + false + end + end + end + pr_call_callback method, *args if call + self end # @api private, only for inspection @@ -202,60 +218,23 @@ def touched @Touched.value end - def with_default_executor(executor = default_executor) + def with_default_executor(executor = @DefaultExecutor) AllPromise.new([self], executor).future end private def ns_initialize - @state = :pending @callbacks = [] end - def ns_wait_until_complete(timeout = nil) - ns_wait_until(timeout) { ns_completed? } + def wait_until_complete(timeout) + unless completed? + synchronize { ns_wait_until(timeout) { completed? } } + end self end - def ns_state - @state - end - - def ns_pending? - ns_state == :pending - end - - alias_method :ns_incomplete?, :ns_pending? - - def ns_completed? - ns_state == :completed - end - - def pr_chain(default_executor, executor = nil, &callback) - ChainPromise.new(self, default_executor, executor || default_executor, &callback).future - end - - def pr_delay(default_executor) - pr_join(default_executor, Delay.new(default_executor).future) - end - - def pr_schedule(default_executor, intended_time) - pr_chain(default_executor) { ScheduledPromise.new(intended_time).future.join(self) }.flat - end - - def pr_join(default_executor, *futures) - AllPromise.new([self, *futures], default_executor).future - end - - def ns_on_completion(default_executor, executor = nil, &callback) - ns_add_callback :pr_async_callback_on_completion, executor || default_executor, callback - end - - def ns_on_completion!(&callback) - ns_add_callback :pr_callback_on_completion, callback - end - def pr_blocks(callbacks) callbacks.each_with_object([]) do |callback, promises| promises.push *callback.select { |v| v.is_a? AbstractPromise } @@ -263,7 +242,7 @@ def pr_blocks(callbacks) end def ns_to_s - "<##{self.class}:0x#{'%x' % (object_id << 1)} #{ns_state}>" + "<##{self.class}:0x#{'%x' % (object_id << 1)} #{state}>" # TODO check ns status end def ns_complete(raise = true) @@ -274,21 +253,12 @@ def ns_complete(raise = true) callbacks end - def ns_add_callback(method, *args) - if ns_completed? - pr_call_callback method, *args - else - @callbacks << [method, *args] - end - self - end - def ns_complete_state - @state = :completed + self.state = :completed end def ns_check_multiple_assignment(raise, reason = nil) - if ns_completed? + if completed? if raise raise reason || Concurrent::MultipleAssignmentError.new('multiple assignment') else @@ -310,7 +280,7 @@ def pr_callback_on_completion(callback) end def pr_notify_blocked(promise) - promise.done self + promise.on_done self end def pr_call_callback(method, *args) @@ -325,60 +295,79 @@ def pr_call_callbacks(callbacks) class Future < Event - # Has the obligation been success? + private *attr_volatile(:value_field, :reason_field) + + def initialize(promise, default_executor = :io) + self.value_field = nil + self.reason_field = nil + super promise, default_executor + end + + # Has the Future been success? # @return [Boolean] def success? - synchronize { ns_success? } + state == :success end - # Has the obligation been failed? + # Has the Future been failed? # @return [Boolean] def failed? state == :failed end + # Has the Future been completed? + # @return [Boolean] + def completed? + [:success, :failed].include? state + end + # @return [Object] see Dereferenceable#deref def value(timeout = nil) touch - synchronize { ns_value timeout } + wait_until_complete timeout + value_field end def reason(timeout = nil) touch - synchronize { ns_reason timeout } + wait_until_complete timeout + reason_field end def result(timeout = nil) touch - synchronize { ns_result timeout } + wait_until_complete timeout + [success?, value_field, reason_field] end # wait until Obligation is #complete? # @param [Numeric] timeout the maximum time in second to wait. - # @return [Obligation] self + # @return [Event] self # @raise [Exception] when #failed? it raises #reason def wait!(timeout = nil) touch - synchronize { ns_wait_until_complete! timeout } + wait_until_complete! timeout end # @raise [Exception] when #failed? it raises #reason # @return [Object] see Dereferenceable#deref def value!(timeout = nil) touch - synchronize { ns_value! timeout } + wait_until_complete!(timeout) + value_field end # @example allows failed Future to be risen # raise Concurrent.future.fail def exception(*args) touch - synchronize { ns_exception(*args) } + raise 'obligation is not failed' unless failed? + reason_field.exception(*args) end # @yield [value] executed only on parent success def then(executor = nil, &callback) - pr_then(default_executor, executor, &callback) + ThenPromise.new(self, @DefaultExecutor, executor || @DefaultExecutor, &callback).future end # Creates new future where its value is result of asking actor with value of this Future. @@ -388,15 +377,15 @@ def then_ask(actor) # @yield [reason] executed only on parent failure def rescue(executor = nil, &callback) - pr_rescue(default_executor, executor, &callback) + RescuePromise.new(self, @DefaultExecutor, executor || @DefaultExecutor, &callback).future end def flat(level = 1) - FlattingPromise.new(self, level, default_executor).future + FlattingPromise.new(self, level, @DefaultExecutor).future end def or(*futures) - AnyPromise.new([self, *futures], default_executor).future + AnyPromise.new([self, *futures], @DefaultExecutor).future end alias_method :|, :or @@ -404,119 +393,60 @@ def or(*futures) # @yield [value] executed async on `executor` when success # @return self def on_success(executor = nil, &callback) - synchronize { ns_on_success(@DefaultExecutor, executor, &callback) } + add_callback :pr_async_callback_on_success, executor || @DefaultExecutor, callback end # @yield [reason] executed async on `executor` when failed? # @return self def on_failure(executor = nil, &callback) - synchronize { ns_on_failure(@DefaultExecutor, executor, &callback) } + add_callback :pr_async_callback_on_failure, executor || @DefaultExecutor, callback end # @yield [value] executed sync when success # @return self def on_success!(&callback) - synchronize { ns_on_success!(&callback) } + add_callback :pr_callback_on_success, callback end # @yield [reason] executed sync when failed? # @return self def on_failure!(&callback) - synchronize { ns_on_failure!(&callback) } + add_callback :pr_callback_on_failure, callback end # @api private def complete(success, value, reason, raise = true) - callbacks = synchronize { ns_complete(success, value, reason, raise) } + callbacks = synchronize { ns_complete success, value, reason, raise } pr_call_callbacks callbacks, success, value, reason self end - def ns_add_callback(method, *args) - if ns_completed? - pr_call_callback method, ns_completed?, ns_value, ns_reason, *args - else - @callbacks << [method, *args] - end + def add_callback(method, *args) + call = if completed? + true + else + synchronize do + if completed? + true + else + @callbacks << [method, *args] + false + end + end + end + pr_call_callback method, success?, value_field, reason_field, *args if call self end private - def ns_initialize - super - @value = nil - @reason = nil - end - - def ns_success? - ns_state == :success - end - - def ns_failed? - ns_state == :failed - end - - def ns_completed? - [:success, :failed].include? ns_state - end - - def ns_value(timeout = nil) - ns_wait_until_complete timeout - @value - end - - def ns_reason(timeout = nil) - ns_wait_until_complete timeout - @reason - end - - def ns_result(timeout = nil) - value = ns_value(timeout) - [ns_success?, value, ns_reason] - end - - def ns_wait_until_complete!(timeout = nil) - ns_wait_until_complete(timeout) - raise self if ns_failed? + def wait_until_complete!(timeout = nil) + wait_until_complete(timeout) + raise self if failed? self end - def ns_value!(timeout = nil) - ns_wait_until_complete!(timeout) - @value - end - - def ns_exception(*args) - raise 'obligation is not failed' unless ns_failed? - ns_reason.exception(*args) - end - - def pr_then(default_executor, executor = nil, &callback) - ThenPromise.new(self, default_executor, executor || default_executor, &callback).future - end - - def pr_rescue(default_executor, executor = nil, &callback) - RescuePromise.new(self, default_executor, executor || default_executor, &callback).future - end - - def ns_on_success(default_executor, executor = nil, &callback) - ns_add_callback :pr_async_callback_on_success, executor || default_executor, callback - end - - def ns_on_failure(default_executor, executor = nil, &callback) - ns_add_callback :pr_async_callback_on_failure, executor || default_executor, callback - end - - def ns_on_success!(&callback) - ns_add_callback :pr_callback_on_success, callback - end - - def ns_on_failure!(&callback) - ns_add_callback :pr_callback_on_failure, callback - end - - def ns_complete(success, value, reason, raise = true) + def ns_complete(success, value, reason, raise) ns_check_multiple_assignment raise, reason ns_complete_state(success, value, reason) ns_broadcast @@ -526,11 +456,11 @@ def ns_complete(success, value, reason, raise = true) def ns_complete_state(success, value, reason) if success - @value = value - @state = :success + self.value_field = value + self.state = :success else - @reason = reason - @state = :failed + self.reason_field = reason + self.state = :failed end end @@ -728,26 +658,41 @@ def self.new(*args) promise end + def initialize(future, blocked_by_futures, *args, &block) + @BlockedBy = Array(blocked_by_futures) + @Countdown = AtomicFixnum.new @BlockedBy.size + super(future, blocked_by_futures, *args, &block) + end + # @api private - def done(future) # FIXME pass in success/value/reason to avoid locking + def on_done(future) # futures could be deleted from blocked_by one by one here, but that would be too expensive, # it's done once when all are done to free the reference - completable, *args = synchronize do - completable = ns_done(future) - blocked_by, @blocked_by = @blocked_by, [] if completable - [completable, *ns_completable_args(future, blocked_by)] + + countdown = process_on_done(future, @Countdown.decrement) + completable = completable?(countdown) + + if completable + pr_on_completable(*pr_on_completable_args(future, blocked_by)) + clear_blocked_by! end - pr_completable(*args) if completable end def touch - synchronize { ns_blocked_by }.each(&:touch) + blocked_by.each(&:touch) end # @api private # for inspection only def blocked_by - synchronize { ns_blocked_by } + @BlockedBy + end + + def clear_blocked_by! + # not synchronized because we do not care when this change propagates + blocked_by = @BlockedBy + @BlockedBy = [] + blocked_by end def inspect @@ -756,27 +701,22 @@ def inspect private - def ns_initialize(blocked_by_futures) - @blocked_by = Array(blocked_by_futures) - @countdown = @blocked_by.size + # @return [true,false] if completable + def completable?(countdown) + countdown.zero? end - # @return [true,false] if completable - def ns_done(future) - (@countdown -= 1).zero? + def process_on_done(future, countdown) + countdown end - def ns_completable_args(done_future, blocked_by) + def pr_on_completable_args(done_future, blocked_by) [done_future, blocked_by, @Future] end - def pr_completable(_, _, _) + def pr_on_completable(_, _, _) raise NotImplementedError end - - def ns_blocked_by - @blocked_by - end end # @abstract @@ -798,11 +738,11 @@ def ns_initialize(blocked_by_future) super [blocked_by_future] end - def ns_completable_args(done_future, blocked_by) + def pr_on_completable_args(done_future, blocked_by) [done_future, blocked_by, @Future, @Executor, @Task] end - def pr_completable(_, _, _, _, _) + def pr_on_completable(_, _, _, _, _) raise NotImplementedError end end @@ -815,7 +755,7 @@ def ns_initialize(blocked_by_future) super(blocked_by_future) end - def pr_completable(done_future, _, future, executor, task) + def pr_on_completable(done_future, _, future, executor, task) if done_future.success? Concurrent.post_on(executor, done_future, task) { |done_future, task| evaluate_to done_future.value, &task } else @@ -832,7 +772,7 @@ def ns_initialize(blocked_by_future) super(blocked_by_future) end - def pr_completable(done_future, _, future, executor, task) + def pr_on_completable(done_future, _, future, executor, task) if done_future.failed? Concurrent.post_on(executor, done_future, task) { |done_future, task| evaluate_to done_future.reason, &task } else @@ -844,7 +784,7 @@ def pr_completable(done_future, _, future, executor, task) class ChainPromise < BlockedTaskPromise private - def pr_completable(done_future, _, _, executor, task) + def pr_on_completable(done_future, _, _, executor, task) if Future === done_future Concurrent.post_on(executor, done_future, task) { |future, task| evaluate_to *future.result, &task } else @@ -861,40 +801,59 @@ def initialize(default_executor = :io) end class FlattingPromise < BlockedPromise + def blocked_by + synchronize { ns_blocked_by } + end + private - def ns_done(future) - value = future.value # TODO get the value as argument - if @levels > 0 + def process_on_done(future, countdown) + value = future.value + if @Levels.value > 0 case value when Future - @countdown += 1 - @blocked_by << value - @levels -= 1 + @Countdown.increment + @Levels.decrement + synchronize { @blocked_by << value } value.add_callback :pr_notify_blocked, self + countdown + 1 when Event raise TypeError, 'cannot flatten to Event' else raise TypeError, "returned value '#{value}' is not a Future" end + else + countdown end - super future end def initialize(blocked_by_future, levels = 1, default_executor = :io) - super Future.new(self, default_executor), blocked_by_future, levels + raise ArgumentError, 'levels has to be higher than 0' if levels < 1 + @Levels = AtomicFixnum.new levels + super Future.new(self, default_executor), blocked_by_future + @BlockedBy = nil # its not used in FlattingPromise end - def ns_initialize(blocked_by_future, levels = 1) + def ns_initialize(blocked_by_future) blocked_by_future.is_a? Future or raise ArgumentError, 'only Future can be flatten' - super([blocked_by_future]) - @levels = levels + @blocked_by = Array(blocked_by_future) end - def pr_completable(_, blocked_by, future) + def pr_on_completable(_, blocked_by, future) pr_complete future, *blocked_by.last.result end + + def ns_blocked_by + @blocked_by + end + + def clear_blocked_by! + # not synchronized because we do not care when this change propagates + blocked_by = @blocked_by + @blocked_by = [] + blocked_by + end end # used internally to support #with_default_executor @@ -907,7 +866,7 @@ def initialize(blocked_by_futures, default_executor = :io) super(klass.new(self, default_executor), blocked_by_futures) end - def pr_completable(done_future, blocked_by, future) + def pr_on_completable(done_future, blocked_by, future) results = blocked_by.select { |f| f.is_a?(Future) }.map(&:result) if results.empty? pr_complete future @@ -933,11 +892,11 @@ def initialize(blocked_by_futures, default_executor = :io) super(Future.new(self, default_executor), blocked_by_futures) end - def ns_done(future) + def completable?(countdown) true end - def pr_completable(done_future, _, future) + def pr_on_completable(done_future, _, future) pr_complete future, *done_future.result, false end end From cab6395b756ba8ad2a25e6dc93615b6497dbbe49 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Sat, 2 May 2015 15:00:32 +0200 Subject: [PATCH 15/16] Shorten the lock and condition ivar names --- lib/concurrent/synchronization/monitor_object.rb | 8 ++++---- lib/concurrent/synchronization/mutex_object.rb | 14 +++++++------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/lib/concurrent/synchronization/monitor_object.rb b/lib/concurrent/synchronization/monitor_object.rb index 5b146bdba..466c2bb6f 100644 --- a/lib/concurrent/synchronization/monitor_object.rb +++ b/lib/concurrent/synchronization/monitor_object.rb @@ -2,19 +2,19 @@ module Concurrent module Synchronization class MonitorObject < MutexObject def initialize(*args, &block) - @__lock__do_not_use_directly = ::Monitor.new - @__condition__do_not_use_directly = @__lock__do_not_use_directly.new_cond + @__lock__ = ::Monitor.new + @__condition__ = @__lock__.new_cond synchronize { ns_initialize(*args, &block) } end private def synchronize - @__lock__do_not_use_directly.synchronize { yield } + @__lock__.synchronize { yield } end def ns_wait(timeout = nil) - @__condition__do_not_use_directly.wait timeout + @__condition__.wait timeout self end end diff --git a/lib/concurrent/synchronization/mutex_object.rb b/lib/concurrent/synchronization/mutex_object.rb index f73e5fd29..fd87635fc 100644 --- a/lib/concurrent/synchronization/mutex_object.rb +++ b/lib/concurrent/synchronization/mutex_object.rb @@ -2,33 +2,33 @@ module Concurrent module Synchronization class MutexObject < AbstractObject def initialize(*args, &block) - @__lock__do_not_use_directly = ::Mutex.new - @__condition__do_not_use_directly = ::ConditionVariable.new + @__lock__ = ::Mutex.new + @__condition__ = ::ConditionVariable.new synchronize { ns_initialize(*args, &block) } end private def synchronize - if @__lock__do_not_use_directly.owned? + if @__lock__.owned? yield else - @__lock__do_not_use_directly.synchronize { yield } + @__lock__.synchronize { yield } end end def ns_signal - @__condition__do_not_use_directly.signal + @__condition__.signal self end def ns_broadcast - @__condition__do_not_use_directly.broadcast + @__condition__.broadcast self end def ns_wait(timeout = nil) - @__condition__do_not_use_directly.wait @__lock__do_not_use_directly, timeout + @__condition__.wait @__lock__, timeout self end From 67a82f776f465356f457dcf2e06e69136e40289d Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Sat, 2 May 2015 15:03:06 +0200 Subject: [PATCH 16/16] Ensure task arguments are supplied by block arguments not by captured locals --- lib/concurrent/edge/future.rb | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/lib/concurrent/edge/future.rb b/lib/concurrent/edge/future.rb index e65f36779..f999404bf 100644 --- a/lib/concurrent/edge/future.rb +++ b/lib/concurrent/edge/future.rb @@ -267,8 +267,8 @@ def ns_check_multiple_assignment(raise, reason = nil) end end - def pr_with_async(executor, &block) - Concurrent.post_on(executor, &block) + def pr_with_async(executor, *args, &block) + Concurrent.post_on(executor, *args, &block) end def pr_async_callback_on_completion(executor, callback) @@ -469,11 +469,15 @@ def pr_call_callbacks(callbacks, success, value, reason) end def pr_async_callback_on_success(success, value, reason, executor, callback) - pr_with_async(executor) { pr_callback_on_success success, value, reason, callback } + pr_with_async(executor, success, value, reason, callback) do |success, value, reason, callback| + pr_callback_on_success success, value, reason, callback + end end def pr_async_callback_on_failure(success, value, reason, executor, callback) - pr_with_async(executor) { pr_callback_on_failure success, value, reason, callback } + pr_with_async(executor, success, value, reason, callback) do |success, value, reason, callback| + pr_callback_on_failure success, value, reason, callback + end end def pr_callback_on_success(success, value, reason, callback) @@ -493,7 +497,9 @@ def pr_notify_blocked(success, value, reason, promise) end def pr_async_callback_on_completion(success, value, reason, executor, callback) - pr_with_async(executor) { pr_callback_on_completion success, value, reason, callback } + pr_with_async(executor, success, value, reason, callback) do |success, value, reason, callback| + pr_callback_on_completion success, value, reason, callback + end end end