diff --git a/doc/synchronization.md b/doc/synchronization.md index 549a01ffb..c701afebf 100644 --- a/doc/synchronization.md +++ b/doc/synchronization.md @@ -88,7 +88,7 @@ end ### Naming conventions -Instance variables with camel case names are final and never reassigned. +Instance variables with camel case names are final and never reassigned, e.g. `@FinalVariable`. ## Fields with CAS operations @@ -115,7 +115,61 @@ class Event < Synchronization::Object end ``` -## Memory model (sort of) +Operations on the `@Touched` field have volatile semantics. -// TODO +## Memory model + +*Intended for further revision and extension.* + +When writing libraries in `concurrent-ruby` we reason based on the following memory model, which is further extended by the features provided by `Synchronization::Object` (described above). + +The memory model is constructed from our best effort and knowledge of the 3 main Ruby implementations (CRuby, JRuby, Rubinius). When considering a certain aspect we always choose the weakest guarantee (e.g. local variable updates are always visible in CRuby but not in JRuby, so the JRuby behavior is picked). If some Ruby behavior is omitted here, it is considered unsafe for use in a parallel environment (reasons may be lack of information or difficulty of verification). + +This takes into account the following implementations: + +- CRuby 1.9 - 2.2 (no differences found) +- JRuby 1.7 +- JRuby 9 *(not examined yet; same behavior as in 1.7 assumed)* +- Rubinius 2.5 + +We are interested in the following behaviors: + +- **volatility** - in Java's sense: any written value is immediately visible to any subsequent read, including all writes leading to that value. +- **atomicity** - an operation is either performed as a whole or not at all. + +### Variables + +- **Local variables** - atomic assignment, non-volatile. + - Consequence: a lambda defined on `thread1` executing on `thread2` may not see updated values of local variables captured in its closure.
+ - Reason: local variables are non-volatile on JRuby and Rubinius. +- **Instance variables** - atomic assignment, non-volatile. + - Consequence: a different thread may see old values; a different thread may see a not-fully-initialized object. + - Reason: instance variables are non-volatile on JRuby and Rubinius. +- **Constants** - atomic assignment, volatile. + +Other: + +- **Global variables** - we don't use them, omitted (atomic and volatile on JRuby and CRuby, Rubinius unknown) +- **Class variables** - we don't use them, omitted (atomic and volatile on JRuby and CRuby, Rubinius unknown) + +### Assumptions + +The following operations are **assumed** to be thread-safe, volatile and atomic on all implementations: + +- Class definition +- Method definition +- Library requirement + +It is best practice, though, to eagerly load before entering the parallel part of an application. + +### Issues to be aware of + +- **Initialization** - Since instance variables are not volatile and a particular implementation may preinitialize them with nils, based on object shapes it has already seen, a second thread obtaining a reference to a newly constructed object may still see the old preinitialized values instead of the values set in the `initialize` method. To fix this, `ensure_ivar_visibility!` can be used, or the object can be safely published through a volatile field. +- **`||=`, `+=` and similar** - are not atomic; they expand to a separate read and write. + +### Notes/Sources on implementations + +- [JRuby wiki page on concurrency](https://github.com/jruby/jruby/wiki/Concurrency-in-jruby) +- [Rubinius page on concurrency](http://rubini.us/doc/en/systems/concurrency/) +- CRuby has a GVL. Any GVL release and acquire uses a lock, which means that all writes done by a releasing thread will be visible to a subsequently acquiring thread.
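The `||=` caveat above can be made concrete: the operator expands to a read, a truthiness check, and a conditional write, so two threads can both observe `nil` and both run the initializer. A minimal sketch under stated assumptions (`LazyBox` is a hypothetical illustration class, not part of concurrent-ruby; a plain `Mutex` stands in for `Synchronization::Object`):

```ruby
# `@value ||= yield` is check-then-act, not atomic: two threads may both
# read nil and both evaluate the (possibly expensive) block.
class LazyBox # hypothetical, for illustration only
  def initialize
    @mutex = Mutex.new
    @value = nil
  end

  # Unsafe under concurrency: the read and the write are separate steps.
  def unsafe_fetch
    @value ||= yield
  end

  # Safe: the whole check-then-act runs while holding the lock.
  def safe_fetch
    @mutex.synchronize { @value ||= yield }
  end
end

box = LazyBox.new
box.safe_fetch { :computed } # first call evaluates the block and caches :computed
box.safe_fetch { :other }    # later calls return the cached :computed
```

The same reasoning applies to `+=`, `<<` on shared collections, and any other read-modify-write sequence: either guard it with a lock or use one of the atomic classes (`AtomicFixnum#increment`, `AtomicReference#update`) that perform the whole sequence with CAS.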
See: diff --git a/examples/benchmark_new_futures.rb b/examples/benchmark_new_futures.rb index 642ad6b31..aaad68e60 100644 --- a/examples/benchmark_new_futures.rb +++ b/examples/benchmark_new_futures.rb @@ -7,10 +7,15 @@ warmup = 2 * scale warmup *= 10 if Concurrent.on_jruby? +Benchmark.ips(time, warmup) do |x| + x.report('status-old') { f = Concurrent::Promise.execute { nil }; 100.times { f.complete? } } + x.report('status-new') { f = Concurrent.future(:fast) { nil }; 100.times { f.completed? } } + x.compare! +end Benchmark.ips(time, warmup) do |x| of = Concurrent::Promise.execute { 1 } - nf = Concurrent.future { 1 } + nf = Concurrent.future(:fast) { 1 } x.report('value-old') { of.value! } x.report('value-new') { nf.value! } x.compare! @@ -18,31 +23,36 @@ Benchmark.ips(time, warmup) do |x| x.report('graph-old') do - head = Concurrent::Promise.execute { 1 } - branch1 = head.then(&:succ) - branch2 = head.then(&:succ).then(&:succ) - Concurrent::Promise.zip(branch1, branch2).then { |(a, b)| a + b }.value! + head = Concurrent::Promise.execute { 1 } + 10.times do + branch1 = head.then(&:succ) + branch2 = head.then(&:succ).then(&:succ) + head = Concurrent::Promise.zip(branch1, branch2).then { |a, b| a + b } + end + head.value! end x.report('graph-new') do - head = Concurrent.future { 1 } - branch1 = head.then(&:succ) - branch2 = head.then(&:succ).then(&:succ) - (branch1 + branch2).then { |(a, b)| a + b }.value! + head = Concurrent.future(:fast) { 1 } + 10.times do + branch1 = head.then(&:succ) + branch2 = head.then(&:succ).then(&:succ) + head = (branch1 & branch2).then { |a, b| a + b } + end + head.value! end x.compare! end Benchmark.ips(time, warmup) do |x| x.report('immediate-old') { Concurrent::Promise.execute { nil }.value! } - x.report('immediate-new') { Concurrent.future { nil }.value! } + x.report('immediate-new') { Concurrent.future(:fast) { nil }.value! } x.compare! 
end Benchmark.ips(time, warmup) do |x| of = Concurrent::Promise.execute { 1 } - nf = Concurrent.future { 1 } - x.report('then-old') { of.then(&:succ).then(&:succ).value! } - x.report('then-new') { nf.then(&:succ).then(&:succ).value! } + nf = Concurrent.future(:fast) { 1 } + x.report('then-old') { 100.times.reduce(of) { |of, _| of.then(&:succ) }.value! } + x.report('then-new') { 100.times.reduce(nf) { |nf, _| nf.then(&:succ) }.value! } x.compare! end - diff --git a/examples/edge_futures.in.rb b/examples/edge_futures.in.rb new file mode 100644 index 000000000..fbaa2587b --- /dev/null +++ b/examples/edge_futures.in.rb @@ -0,0 +1,158 @@ +### Simple asynchronous task + +future = Concurrent.future { sleep 0.1; 1 + 1 } # evaluation starts immediately +future.completed? +# block until evaluated +future.value +future.completed? + + +### Failing asynchronous task + +future = Concurrent.future { raise 'Boom' } +future.value +future.value! rescue $! +future.reason +# re-raising +raise future rescue $! + + +### Chaining + +head = Concurrent.future { 1 } # +branch1 = head.then(&:succ) # +branch2 = head.then(&:succ).then(&:succ) # +branch1.zip(branch2).value +(branch1 & branch2).then { |(a, b)| a + b }.value +# pick only first completed +(branch1 | branch2).value + + +### Error handling + +Concurrent.future { Object.new }.then(&:succ).then(&:succ).rescue { |e| e.class }.value # error propagates +Concurrent.future { Object.new }.then(&:succ).rescue { 1 }.then(&:succ).value +Concurrent.future { 1 }.then(&:succ).rescue { |e| e.message }.then(&:succ).value + + +### Delay + +# will not evaluate until asked by #value or other method requiring completion +future = Concurrent.delay { 'lazy' } +sleep 0.1 # +future.completed?
+future.value + +# propagates through the chain, allowing whole or partial lazy chains + +head = Concurrent.delay { 1 } +branch1 = head.then(&:succ) +branch2 = head.delay.then(&:succ) +join = branch1 & branch2 + +sleep 0.1 # nothing will complete +[head, branch1, branch2, join].map(&:completed?) + +branch1.value +sleep 0.1 # forces only head to complete, branch 2 stays incomplete +[head, branch1, branch2, join].map(&:completed?) + +join.value + + +### Flattening + +Concurrent.future { Concurrent.future { 1+1 } }.flat.value # waits for inner future + +# more complicated example +Concurrent.future { Concurrent.future { Concurrent.future { 1 + 1 } } }. + flat(1). + then { |f| f.then(&:succ) }. + flat(1).value + + +### Schedule + +scheduled = Concurrent.schedule(0.1) { 1 } + +scheduled.completed? +scheduled.value # available after 0.1sec + +# and in chain +scheduled = Concurrent.delay { 1 }.schedule(0.1).then(&:succ) +# will not be scheduled until value is requested +sleep 0.1 # +scheduled.value # returns after another 0.1sec + + +### Completable Future and Event + +future = Concurrent.future +event = Concurrent.event + +# will be blocked until completed +t1 = Thread.new { future.value } # +t2 = Thread.new { event.wait } # + +future.success 1 +future.success 1 rescue $! +future.try_success 2 +event.complete + +[t1, t2].each &:join # + + +### Callbacks + +queue = Queue.new +future = Concurrent.delay { 1 + 1 } + +future.on_success { queue << 1 } # evaluated asynchronously +future.on_success! { queue << 2 } # evaluated on completing thread + +queue.empty? +future.value +queue.pop +queue.pop + + +### Thread-pools + +Concurrent.future(:fast) { 2 }.then(:io) { File.read __FILE__ }.wait + + +### Interoperability with actors + +actor = Concurrent::Actor::Utils::AdHoc.spawn :square do + -> v { v ** 2 } +end + +Concurrent. + future { 2 }. + then_ask(actor). + then { |v| v + 2 }.
+ value + +actor.ask(2).then(&:succ).value + + +### Common use-cases Examples + +# simple background processing +Concurrent.future { do_stuff } + +# parallel background processing +jobs = 10.times.map { |i| Concurrent.future { i } } # +Concurrent.zip(*jobs).value + + +# periodic task +def schedule_job + Concurrent.schedule(1) { do_stuff }. + rescue { |e| report_error e }. + then { schedule_job } +end + +schedule_job + + diff --git a/examples/edge_futures.out.rb b/examples/edge_futures.out.rb new file mode 100644 index 000000000..4bafefbd1 --- /dev/null +++ b/examples/edge_futures.out.rb @@ -0,0 +1,185 @@ +### Simple asynchronous task + +future = Concurrent.future { sleep 0.1; 1 + 1 } # evaluation starts immediately + # => <#Concurrent::Edge::Future:0x7fa08385da60 pending blocks:[]> +future.completed? # => false +# block until evaluated +future.value # => 2 +future.completed? # => true + + +### Failing asynchronous task + +future = Concurrent.future { raise 'Boom' } + # => <#Concurrent::Edge::Future:0x7fa083834638 failed blocks:[]> +future.value # => nil +future.value! rescue $! # => # +future.reason # => # +# re-raising +raise future rescue $! 
# => # + + +### Chaining + +head = Concurrent.future { 1 } +branch1 = head.then(&:succ) +branch2 = head.then(&:succ).then(&:succ) +branch1.zip(branch2).value # => [2, 3] +(branch1 & branch2).then { |(a, b)| a + b }.value + # => 5 +# pick only first completed +(branch1 | branch2).value # => 2 + + +### Error handling + +Concurrent.future { Object.new }.then(&:succ).then(&:succ).rescue { |e| e.class }.value # error propagates + # => NoMethodError +Concurrent.future { Object.new }.then(&:succ).rescue { 1 }.then(&:succ).value + # => 2 +Concurrent.future { 1 }.then(&:succ).rescue { |e| e.message }.then(&:succ).value + # => 3 + + +### Delay + +# will not evaluate until asked by #value or other method requiring completion +future = Concurrent.delay { 'lazy' } + # => <#Concurrent::Edge::Future:0x7fa0831917b8 pending blocks:[]> +sleep 0.1 +future.completed? # => false +future.value # => "lazy" + +# propagates through the chain, allowing whole or partial lazy chains + +head = Concurrent.delay { 1 } + # => <#Concurrent::Edge::Future:0x7fa083172ef8 pending blocks:[]> +branch1 = head.then(&:succ) + # => <#Concurrent::Edge::Future:0x7fa083171c88 pending blocks:[]> +branch2 = head.delay.then(&:succ) + # => <#Concurrent::Edge::Future:0x7fa08294f528 pending blocks:[]> +join = branch1 & branch2 + # => <#Concurrent::Edge::Future:0x7fa08294e218 pending blocks:[]> + +sleep 0.1 # nothing will complete # => 0 +[head, branch1, branch2, join].map(&:completed?) # => [false, false, false, false] + +branch1.value # => 2 +sleep 0.1 # forces only head to complete, branch 2 stays incomplete + # => 0 +[head, branch1, branch2, join].map(&:completed?) # => [true, true, false, false] + +join.value # => [2, 2] + + +### Flattening + +Concurrent.future { Concurrent.future { 1+1 } }.flat.value # waits for inner future + # => 2 + +# more complicated example +Concurrent.future { Concurrent.future { Concurrent.future { 1 + 1 } } }. + flat(1). + then { |f| f.then(&:succ) }.
+ flat(1).value # => 3 + + +### Schedule + +scheduled = Concurrent.schedule(0.1) { 1 } + # => <#Concurrent::Edge::Future:0x7fa08224edf0 pending blocks:[]> + +scheduled.completed? # => false +scheduled.value # available after 0.1sec # => 1 + +# and in chain +scheduled = Concurrent.delay { 1 }.schedule(0.1).then(&:succ) + # => <#Concurrent::Edge::Future:0x7fa0831f3d50 pending blocks:[]> +# will not be scheduled until value is requested +sleep 0.1 +scheduled.value # returns after another 0.1sec # => 2 + + +### Completable Future and Event + +future = Concurrent.future + # => <#Concurrent::Edge::CompletableFuture:0x7fa0831e8090 pending blocks:[]> +event = Concurrent.event + # => <#Concurrent::Edge::CompletableEvent:0x7fa0831dae68 pending blocks:[]> + +# will be blocked until completed +t1 = Thread.new { future.value } +t2 = Thread.new { event.wait } + +future.success 1 + # => <#Concurrent::Edge::CompletableFuture:0x7fa0831e8090 success blocks:[]> +future.success 1 rescue $! + # => # +future.try_success 2 # => false +event.complete + # => <#Concurrent::Edge::CompletableEvent:0x7fa0831dae68 completed blocks:[]> + +[t1, t2].each &:join + + +### Callbacks + +queue = Queue.new # => # +future = Concurrent.delay { 1 + 1 } + # => <#Concurrent::Edge::Future:0x7fa0831b96c8 pending blocks:[]> + +future.on_success { queue << 1 } # evaluated asynchronously + # => <#Concurrent::Edge::Future:0x7fa0831b96c8 pending blocks:[]> +future.on_success! { queue << 2 } # evaluated on completing thread + # => <#Concurrent::Edge::Future:0x7fa0831b96c8 pending blocks:[]> + +queue.empty? # => true +future.value # => 2 +queue.pop # => 2 +queue.pop # => 1 + + +### Thread-pools + +Concurrent.future(:fast) { 2 }.then(:io) { File.read __FILE__ }.wait + # => <#Concurrent::Edge::Future:0x7fa08318b070 success blocks:[]> + + +### Interoperability with actors + +actor = Concurrent::Actor::Utils::AdHoc.spawn :square do + -> v { v ** 2 } +end + # => # + +Concurrent. + future { 2 }. + then_ask(actor). 
+ then { |v| v + 2 }. + value # => 6 + +actor.ask(2).then(&:succ).value # => 5 + + +### Common use-cases Examples + +# simple background processing +Concurrent.future { do_stuff } + # => <#Concurrent::Edge::Future:0x7fa0839ee8e8 pending blocks:[]> + +# parallel background processing +jobs = 10.times.map { |i| Concurrent.future { i } } +Concurrent.zip(*jobs).value # => [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] + + +# periodic task +def schedule_job + Concurrent.schedule(1) { do_stuff }. + rescue { |e| report_error e }. + then { schedule_job } +end # => :schedule_job + +schedule_job + # => <#Concurrent::Edge::Future:0x7fa082904f78 pending blocks:[]> + + diff --git a/examples/format.rb b/examples/format.rb new file mode 100644 index 000000000..8d6a457e1 --- /dev/null +++ b/examples/format.rb @@ -0,0 +1,75 @@ +require 'rubygems' +require 'bundler/setup' +require 'pry' +require 'pp' + +input_paths = if ARGV.empty? + Dir.glob("#{File.dirname(__FILE__)}/*.in.rb") + else + ARGV + end.map { |p| File.expand_path p } + +input_paths.each_with_index do |input_path, i| + + pid = fork do + require_relative 'init.rb' + + begin + output_path = input_path.gsub /\.in\.rb$/, '.out.rb' + input = File.readlines(input_path) + + chunks = [] + line = '' + + while !input.empty? + line += input.shift + if Pry::Code.complete_expression? line + chunks << line + line = '' + end + end + + raise unless line.empty? + + chunks.map! { |chunk| [chunk, [chunk.split($/).size, 1].max] } + environment = Module.new.send :binding + evaluate = ->(code, line) do + eval(code, environment, input_path, line) + end + + indent = 50 + + line_count = 1 + output = '' + chunks.each do |chunk, lines| + result = evaluate.(chunk, line_count) + unless chunk.strip.empty? 
|| chunk =~ /\A *#/ + pre_lines = chunk.lines.to_a + last_line = pre_lines.pop + output << pre_lines.join + + if last_line =~ /\#$/ + output << last_line.gsub(/\#$/, '') + else + if last_line.size < indent && result.inspect.size < indent + output << "%-#{indent}s %s" % [last_line.chomp, "# => #{result.inspect}\n"] + else + output << last_line << " # => #{result.inspect}\n" + end + end + else + output << chunk + end + line_count += lines + end + + puts "#{input_path}\n -> #{output_path}" + #puts output + File.write(output_path, output) + rescue => ex + puts "#{ex} (#{ex.class})\n#{ex.backtrace * "\n"}" + end + end + + Process.wait pid +end diff --git a/examples/init.rb b/examples/init.rb new file mode 100644 index 000000000..c3ed8aafb --- /dev/null +++ b/examples/init.rb @@ -0,0 +1,5 @@ +require 'concurrent-edge' + +def do_stuff + :stuff +end diff --git a/ext/com/concurrent_ruby/ext/SynchronizationLibrary.java b/ext/com/concurrent_ruby/ext/SynchronizationLibrary.java index 702a133ec..00791f401 100644 --- a/ext/com/concurrent_ruby/ext/SynchronizationLibrary.java +++ b/ext/com/concurrent_ruby/ext/SynchronizationLibrary.java @@ -17,6 +17,7 @@ import org.jruby.RubyBoolean; import org.jruby.RubyNil; import org.jruby.runtime.ThreadContext; +import org.jruby.util.unsafe.UnsafeHolder; public class SynchronizationLibrary implements Library { @@ -48,11 +49,9 @@ public JavaObject(Ruby runtime, RubyClass metaClass) { super(runtime, metaClass); } - @JRubyMethod(rest = true) - public IRubyObject initialize(ThreadContext context, IRubyObject[] args, Block block) { - synchronized (this) { - return callMethod(context, "ns_initialize", args, block); - } + @JRubyMethod + public IRubyObject initialize(ThreadContext context) { + return this; } @JRubyMethod(name = "synchronize", visibility = Visibility.PROTECTED) @@ -108,7 +107,47 @@ public IRubyObject nsBroadcast(ThreadContext context) { @JRubyMethod(name = "ensure_ivar_visibility!", visibility = Visibility.PROTECTED) public IRubyObject 
ensureIvarVisibilityBang(ThreadContext context) { + if (UnsafeHolder.SUPPORTS_FENCES) + UnsafeHolder.storeFence(); + else + anVolatileField = 1; return context.nil; } + + private volatile int anVolatileField = 0; // TODO unused on JAVA8 + public static final long AN_VOLATILE_FIELD_OFFSET = + UnsafeHolder.fieldOffset(JavaObject.class, "anVolatileField"); + + @JRubyMethod(name = "instance_variable_get_volatile", visibility = Visibility.PROTECTED) + public IRubyObject instanceVariableGetVolatile(ThreadContext context, IRubyObject name) { + if (UnsafeHolder.U == null) { + synchronized (this) { + return instance_variable_get(context, name); + } + } else if (UnsafeHolder.SUPPORTS_FENCES) { + UnsafeHolder.loadFence(); + return instance_variable_get(context, name); + } else { + UnsafeHolder.U.getIntVolatile(this, AN_VOLATILE_FIELD_OFFSET); + return instance_variable_get(context, name); + } + } + + @JRubyMethod(name = "instance_variable_set_volatile", visibility = Visibility.PROTECTED) + public IRubyObject InstanceVariableSetVolatile(ThreadContext context, IRubyObject name, IRubyObject value) { + if (UnsafeHolder.U == null) { + synchronized (this) { + return instance_variable_set(name, value); + } + } else if (UnsafeHolder.SUPPORTS_FENCES) { + IRubyObject result = instance_variable_set(name, value); + UnsafeHolder.storeFence(); + return result; + } else { + UnsafeHolder.U.putIntVolatile(this, AN_VOLATILE_FIELD_OFFSET, 1); + IRubyObject result = instance_variable_set(name, value); + return result; + } + } } } diff --git a/lib/concurrent-edge.rb b/lib/concurrent-edge.rb index c6b6af05e..2fad35da3 100644 --- a/lib/concurrent-edge.rb +++ b/lib/concurrent-edge.rb @@ -7,3 +7,4 @@ require 'concurrent/lazy_register' require 'concurrent/edge/future' +require 'concurrent/edge/lock_free_stack' diff --git a/lib/concurrent/actor/core.rb b/lib/concurrent/actor/core.rb index 4cfb84826..644df720f 100644 --- a/lib/concurrent/actor/core.rb +++ b/lib/concurrent/actor/core.rb @@ -49,7 +49,8 @@ 
class Core < Synchronization::Object # any logging system # @param [Proc] block for class instantiation def initialize(opts = {}, &block) - super + super(&nil) + synchronize { ns_initialize(opts, &block) } end # @return [Reference, nil] of parent actor diff --git a/lib/concurrent/at_exit.rb b/lib/concurrent/at_exit.rb index eb530e5fd..b5ef4c9cd 100644 --- a/lib/concurrent/at_exit.rb +++ b/lib/concurrent/at_exit.rb @@ -8,6 +8,11 @@ module Concurrent class AtExitImplementation < Synchronization::Object include Logging + def initialize(*args) + super() + synchronize { ns_initialize *args } + end + # Add a handler to be run at `Kernel#at_exit` # @param [Object] handler_id optionally provide an id, if allready present, handler is replaced # @yield the handler diff --git a/lib/concurrent/atomic/atomic_boolean.rb b/lib/concurrent/atomic/atomic_boolean.rb index 1df03a989..dc1c41cb3 100644 --- a/lib/concurrent/atomic/atomic_boolean.rb +++ b/lib/concurrent/atomic/atomic_boolean.rb @@ -30,7 +30,8 @@ class MutexAtomicBoolean < Synchronization::Object # # @param [Boolean] initial the initial value def initialize(initial = false) - super(initial) + super() + synchronize { ns_initialize(initial) } end # @!macro [attach] atomic_boolean_method_value_get diff --git a/lib/concurrent/atomic/atomic_fixnum.rb b/lib/concurrent/atomic/atomic_fixnum.rb index 37b806b62..b38ef7cdd 100644 --- a/lib/concurrent/atomic/atomic_fixnum.rb +++ b/lib/concurrent/atomic/atomic_fixnum.rb @@ -35,7 +35,8 @@ class MutexAtomicFixnum < Synchronization::Object # @param [Fixnum] initial the initial value # @raise [ArgumentError] if the initial value is not a `Fixnum` def initialize(initial = 0) - super(initial) + super() + synchronize { ns_initialize(initial) } end # @!macro [attach] atomic_fixnum_method_value_get diff --git a/lib/concurrent/atomic/copy_on_notify_observer_set.rb b/lib/concurrent/atomic/copy_on_notify_observer_set.rb index c1a133e0e..8c2b5eaab 100644 --- 
a/lib/concurrent/atomic/copy_on_notify_observer_set.rb +++ b/lib/concurrent/atomic/copy_on_notify_observer_set.rb @@ -8,6 +8,11 @@ module Concurrent # prevent concurrency issues class CopyOnNotifyObserverSet < Synchronization::Object + def initialize + super() + synchronize { ns_initialize } + end + # Adds an observer to this set. If a block is passed, the observer will be # created by this method and no other params should be passed # @@ -24,7 +29,7 @@ def add_observer(observer=nil, func=:update, &block) if block observer = block - func = :call + func = :call end synchronize do diff --git a/lib/concurrent/atomic/copy_on_write_observer_set.rb b/lib/concurrent/atomic/copy_on_write_observer_set.rb index 2ed2ffcbc..085490a64 100644 --- a/lib/concurrent/atomic/copy_on_write_observer_set.rb +++ b/lib/concurrent/atomic/copy_on_write_observer_set.rb @@ -7,6 +7,11 @@ module Concurrent # duplicated and replaced with a new one. class CopyOnWriteObserverSet < Synchronization::Object + def initialize + super() + synchronize { ns_initialize } + end + # Adds an observer to this set # If a block is passed, the observer will be created by this method and no # other params should be passed diff --git a/lib/concurrent/atomic/count_down_latch.rb b/lib/concurrent/atomic/count_down_latch.rb index 183a98039..628ec53b1 100644 --- a/lib/concurrent/atomic/count_down_latch.rb +++ b/lib/concurrent/atomic/count_down_latch.rb @@ -24,7 +24,8 @@ def initialize(count = 1) unless count.is_a?(Fixnum) && count >= 0 raise ArgumentError.new('count must be in integer greater than or equal zero') end - super(count) + super() + synchronize { ns_initialize count } end # @!macro [attach] count_down_latch_method_wait diff --git a/lib/concurrent/atomic/cyclic_barrier.rb b/lib/concurrent/atomic/cyclic_barrier.rb index 03dac3ed4..065927792 100644 --- a/lib/concurrent/atomic/cyclic_barrier.rb +++ b/lib/concurrent/atomic/cyclic_barrier.rb @@ -18,7 +18,8 @@ def initialize(parties, &block) if 
!parties.is_a?(Fixnum) || parties < 1 raise ArgumentError.new('count must be in integer greater than or equal zero') end - super(parties, &block) + super(&nil) + synchronize { ns_initialize parties, &block } end # @return [Fixnum] the number of threads needed to pass the barrier diff --git a/lib/concurrent/atomic/event.rb b/lib/concurrent/atomic/event.rb index b0f1f80e7..8b4c87a3e 100644 --- a/lib/concurrent/atomic/event.rb +++ b/lib/concurrent/atomic/event.rb @@ -19,6 +19,7 @@ class Event < Synchronization::Object # `Event` will block. def initialize super + synchronize { ns_initialize } end # Is the object in the set state? diff --git a/lib/concurrent/atomic/read_write_lock.rb b/lib/concurrent/atomic/read_write_lock.rb index a7fc388fa..bdd79a2d9 100644 --- a/lib/concurrent/atomic/read_write_lock.rb +++ b/lib/concurrent/atomic/read_write_lock.rb @@ -1,6 +1,7 @@ require 'thread' require 'concurrent/atomic/atomic_reference' require 'concurrent/errors' +require 'concurrent/synchronization' module Concurrent @@ -26,19 +27,19 @@ module Concurrent # This will lead to deadlock # # @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReentrantReadWriteLock.html java.util.concurrent.ReentrantReadWriteLock - class ReadWriteLock + class ReadWriteLock < Synchronization::Object # @!visibility private - WAITING_WRITER = 1 << 15 + WAITING_WRITER = 1 << 15 # @!visibility private - RUNNING_WRITER = 1 << 30 + RUNNING_WRITER = 1 << 30 # @!visibility private - MAX_READERS = WAITING_WRITER - 1 + MAX_READERS = WAITING_WRITER - 1 # @!visibility private - MAX_WRITERS = RUNNING_WRITER - MAX_READERS - 1 + MAX_WRITERS = RUNNING_WRITER - MAX_READERS - 1 # Implementation notes: # A goal is to make the uncontended path for both readers/writers lock-free @@ -53,11 +54,11 @@ class ReadWriteLock # Create a new `ReadWriteLock` in the unlocked state. 
def initialize - @counter = AtomicReference.new(0) # single integer which represents lock state - @reader_q = ConditionVariable.new # queue for waiting readers - @reader_mutex = Mutex.new # to protect reader queue - @writer_q = ConditionVariable.new # queue for waiting writers - @writer_mutex = Mutex.new # to protect writer queue + @Counter = AtomicReference.new(0) # single integer which represents lock state + @ReadLock = Synchronization::Lock.new + @WriteLock = Synchronization::Lock.new + ensure_ivar_visibility! + super() end # Execute a block operation within a read lock. @@ -106,34 +107,28 @@ def with_write_lock # @raise [Concurrent::ResourceLimitError] if the maximum number of readers # is exceeded. def acquire_read_lock - while(true) - c = @counter.value + while true + c = @Counter.value raise ResourceLimitError.new('Too many reader threads') if max_readers?(c) # If a writer is waiting when we first queue up, we need to wait if waiting_writer?(c) - # But it is possible that the writer could finish and decrement @counter right here... - @reader_mutex.synchronize do - # So check again inside the synchronized section - @reader_q.wait(@reader_mutex) if waiting_writer? - end + @ReadLock.wait_until { !waiting_writer? } # after a reader has waited once, they are allowed to "barge" ahead of waiting writers # but if a writer is *running*, the reader still needs to wait (naturally) - while(true) - c = @counter.value + while true + c = @Counter.value if running_writer?(c) - @reader_mutex.synchronize do - @reader_q.wait(@reader_mutex) if running_writer? - end + @ReadLock.wait_until { !running_writer? 
} else - return if @counter.compare_and_swap(c,c+1) + return if @Counter.compare_and_swap(c, c+1) end end else - break if @counter.compare_and_swap(c,c+1) + break if @Counter.compare_and_swap(c, c+1) end - end + end true end @@ -141,12 +136,12 @@ def acquire_read_lock # # @return [Boolean] true if the lock is successfully released def release_read_lock - while(true) - c = @counter.value - if @counter.compare_and_swap(c,c-1) + while true + c = @Counter.value + if @Counter.compare_and_swap(c, c-1) # If one or more writers were waiting, and we were the last reader, wake a writer up if waiting_writer?(c) && running_readers(c) == 1 - @writer_mutex.synchronize { @writer_q.signal } + @WriteLock.signal end break end @@ -161,32 +156,31 @@ def release_read_lock # @raise [Concurrent::ResourceLimitError] if the maximum number of writers # is exceeded. def acquire_write_lock - while(true) - c = @counter.value + while true + c = @Counter.value raise ResourceLimitError.new('Too many writer threads') if max_writers?(c) if c == 0 # no readers OR writers running # if we successfully swap the RUNNING_WRITER bit on, then we can go ahead - break if @counter.compare_and_swap(0,RUNNING_WRITER) - elsif @counter.compare_and_swap(c,c+WAITING_WRITER) - while(true) + break if @Counter.compare_and_swap(0, RUNNING_WRITER) + elsif @Counter.compare_and_swap(c, c+WAITING_WRITER) + while true # Now we have successfully incremented, so no more readers will be able to increment # (they will wait instead) # However, readers OR writers could decrement right here, OR another writer could increment - @writer_mutex.synchronize do + @WriteLock.wait_until do # So we have to do another check inside the synchronized section # If a writer OR reader is running, then go to sleep - c = @counter.value - @writer_q.wait(@writer_mutex) if running_writer?(c) || running_readers?(c) + c = @Counter.value + !running_writer?(c) && !running_readers?(c) end # We just came out of a wait # If we successfully turn the 
RUNNING_WRITER bit on with an atomic swap, # Then we are OK to stop waiting and go ahead # Otherwise go back and wait again - c = @counter.value - break if !running_writer?(c) && !running_readers?(c) && - @counter.compare_and_swap(c,c+RUNNING_WRITER-WAITING_WRITER) + c = @Counter.value + break if !running_writer?(c) && !running_readers?(c) && @Counter.compare_and_swap(c, c+RUNNING_WRITER-WAITING_WRITER) end break end @@ -198,16 +192,9 @@ def acquire_write_lock # # @return [Boolean] true if the lock is successfully released def release_write_lock - while(true) - c = @counter.value - if @counter.compare_and_swap(c,c-RUNNING_WRITER) - @reader_mutex.synchronize { @reader_q.broadcast } - if waiting_writers(c) > 0 # if any writers are waiting... - @writer_mutex.synchronize { @writer_q.signal } - end - break - end - end + c = @Counter.update { |c| c-RUNNING_WRITER } + @ReadLock.broadcast + @WriteLock.signal if waiting_writers(c) > 0 true end @@ -215,50 +202,50 @@ def release_write_lock # # @return [Boolean] true if the write lock is held else false` def write_locked? - @counter.value >= RUNNING_WRITER + @Counter.value >= RUNNING_WRITER end # Queries whether any threads are waiting to acquire the read or write lock. # # @return [Boolean] true if any threads are waiting for a lock else false def has_waiters? 
-    waiting_writer?(@counter.value)
+    waiting_writer?(@Counter.value)
   end

   private

   # @!visibility private
-  def running_readers(c = @counter.value)
+  def running_readers(c = @Counter.value)
     c & MAX_READERS
   end

   # @!visibility private
-  def running_readers?(c = @counter.value)
+  def running_readers?(c = @Counter.value)
     (c & MAX_READERS) > 0
   end

   # @!visibility private
-  def running_writer?(c = @counter.value)
+  def running_writer?(c = @Counter.value)
     c >= RUNNING_WRITER
   end

   # @!visibility private
-  def waiting_writers(c = @counter.value)
+  def waiting_writers(c = @Counter.value)
     (c & MAX_WRITERS) / WAITING_WRITER
   end

   # @!visibility private
-  def waiting_writer?(c = @counter.value)
+  def waiting_writer?(c = @Counter.value)
     c >= WAITING_WRITER
   end

   # @!visibility private
-  def max_readers?(c = @counter.value)
+  def max_readers?(c = @Counter.value)
     (c & MAX_READERS) == MAX_READERS
   end

   # @!visibility private
-  def max_writers?(c = @counter.value)
+  def max_writers?(c = @Counter.value)
     (c & MAX_WRITERS) == MAX_WRITERS
   end
 end
diff --git a/lib/concurrent/atomic/semaphore.rb b/lib/concurrent/atomic/semaphore.rb
index 4a7ac1cc8..a81f66f10 100644
--- a/lib/concurrent/atomic/semaphore.rb
+++ b/lib/concurrent/atomic/semaphore.rb
@@ -23,7 +23,8 @@ def initialize(count)
     unless count.is_a?(Fixnum) && count >= 0
       fail ArgumentError, 'count must be an non-negative integer'
     end
-    super(count)
+    super()
+    synchronize { ns_initialize count }
   end

   # @!macro [attach] semaphore_method_acquire
diff --git a/lib/concurrent/channel/blocking_ring_buffer.rb b/lib/concurrent/channel/blocking_ring_buffer.rb
index 41d7b7cd8..4052a256b 100644
--- a/lib/concurrent/channel/blocking_ring_buffer.rb
+++ b/lib/concurrent/channel/blocking_ring_buffer.rb
@@ -4,7 +4,8 @@ module Concurrent
   class BlockingRingBuffer < Synchronization::Object

     def initialize(capacity)
-      super(capacity)
+      super()
+      synchronize { ns_initialize capacity }
     end

     # @return [Integer] the capacity of the buffer
diff --git a/lib/concurrent/channel/buffered_channel.rb b/lib/concurrent/channel/buffered_channel.rb
index cb6ed870a..3450a2db2 100644
--- a/lib/concurrent/channel/buffered_channel.rb
+++ b/lib/concurrent/channel/buffered_channel.rb
@@ -5,7 +5,6 @@ class BufferedChannel
     def initialize(size)
       @mutex = Mutex.new
-      @condition = ConditionVariable.new
       @buffer_condition = ConditionVariable.new

       @probe_set = WaitableList.new
diff --git a/lib/concurrent/channel/waitable_list.rb b/lib/concurrent/channel/waitable_list.rb
index d7cebedd9..5bb4f759b 100644
--- a/lib/concurrent/channel/waitable_list.rb
+++ b/lib/concurrent/channel/waitable_list.rb
@@ -3,6 +3,11 @@ module Concurrent
   class WaitableList < Synchronization::Object

+    def initialize
+      super()
+      synchronize { ns_initialize }
+    end
+
     def size
       synchronize { @list.size }
     end
diff --git a/lib/concurrent/delay.rb b/lib/concurrent/delay.rb
index 2e661c2ee..55329b50e 100644
--- a/lib/concurrent/delay.rb
+++ b/lib/concurrent/delay.rb
@@ -58,7 +58,8 @@ class Delay < Synchronization::Object
   # @raise [ArgumentError] if no block is given
   def initialize(opts = {}, &block)
     raise ArgumentError.new('no block given') unless block_given?
-    super(opts, &block)
+    super(&nil)
+    synchronize { ns_initialize(opts, &block) }
   end

   # Return the value this object represents after applying the options
diff --git a/lib/concurrent/edge/future.rb b/lib/concurrent/edge/future.rb
index f999404bf..8f099d7e6 100644
--- a/lib/concurrent/edge/future.rb
+++ b/lib/concurrent/edge/future.rb
@@ -1,4 +1,5 @@
-require 'concurrent'
+require 'concurrent' # TODO do not require whole concurrent gem
+require 'concurrent/edge/lock_free_stack'

 # TODO support Dereferencable ?
 # TODO behaviour with Interrupt exceptions is undefined, use Signal.trap to avoid issues
@@ -8,8 +9,6 @@ module Concurrent
 module Edge
   module FutureShortcuts
-    # TODO to construct event to be set later to trigger rest of the tree
-    # User is responsible for completing the event once.
    # @return [CompletableEvent]
    def event(default_executor = :io)
@@ -49,15 +48,10 @@ def schedule(intended_time, default_executor = :io, &task)
    # fails on first error
    # does not block a thread
    # @return [Future]
-    def join(*futures)
+    def zip(*futures)
      AllPromise.new(futures).future
    end

-    # TODO pick names for join, any on class/instance
-    # consider renaming to zip as in scala
-    alias_method :all, :join
-    alias_method :zip, :join
-
    def any(*futures)
      AnyPromise.new(futures).future
    end
@@ -75,7 +69,7 @@ def post_on(executor, *args, &job)
    end

    # TODO add first(futures, count=count)
-    # TODO allow to to have a join point for many futures and process them in batches by 10
+    # TODO allow to have a zip point for many futures and process them in batches by 10
  end

  extend FutureShortcuts
@@ -84,21 +78,24 @@ def post_on(executor, *args, &job)
  class Event < Synchronization::Object
    extend FutureShortcuts

-    attr_volatile :state
-    private :state=
-
    def initialize(promise, default_executor = :io)
      @Promise = promise
      @DefaultExecutor = default_executor
      @Touched = AtomicBoolean.new(false)
-      self.state = :pending
+      @Callbacks = LockFreeStack.new
+      @Waiters = LockFreeStack.new
+      @State = AtomicReference.new :pending
      super()
      ensure_ivar_visibility!
    end

+    def state
+      @State.get
+    end
+
    # Is Future still pending?
    # @return [Boolean]
-    def pending?
+    def pending?(state = self.state)
      state == :pending
    end
@@ -106,7 +103,7 @@ def pending?

    # Is Future still completed?
    # @return [Boolean]
-    def completed?
+    def completed?(state = self.state)
      state == :completed
    end
@@ -137,16 +134,18 @@ def chain(executor = nil, &callback)
    alias_method :then, :chain

    # TODO take block optionally
-    def join(*futures)
+    def zip(*futures)
      AllPromise.new([self, *futures], @DefaultExecutor).future
    end

+    alias_method :&, :zip
+
    def delay
-      join(Delay.new(@DefaultExecutor).future)
+      zip(Delay.new(@DefaultExecutor).future)
    end

    def schedule(intended_time)
-      chain { ScheduledPromise.new(intended_time).future.join(self) }.flat
+      chain { ScheduledPromise.new(intended_time).future.zip(self) }.flat
    end

    # @yield [success, value, reason] executed async on `executor` when completed
@@ -161,50 +160,54 @@ def on_completion!(&callback)
      add_callback :pr_callback_on_completion, callback
    end

-    # @return [Array]
-    def blocks
-      pr_blocks(synchronize { @callbacks })
+    def with_default_executor(executor = @DefaultExecutor)
+      AllPromise.new([self], executor).future
    end

    def to_s
-      synchronize { ns_to_s }
+      "<##{self.class}:0x#{'%x' % (object_id << 1)} #{state}>"
    end

    def inspect
-      synchronize { "#{ns_to_s[0..-2]} blocks:[#{pr_blocks(@callbacks).map(&:to_s).join(', ')}]>" }
+      "#{to_s[0..-2]} blocks:[#{blocks.map(&:to_s).join(', ')}]>"
    end

-    alias_method :+, :join
-    alias_method :and, :join
-
    # @api private
    def complete(raise = true)
-      callbacks = synchronize { ns_complete raise }
-      pr_call_callbacks callbacks
+      if complete_state
+        # go to synchronized block only if there were waiting threads
+        synchronize { ns_broadcast } if @Waiters.clear
+        call_callbacks
+      else
+        raise Concurrent::MultipleAssignmentError.new('multiple assignment') if raise
+        return false
+      end
      self
    end

+    # @api private
+    # just for inspection
+    # @return [Array]
+    def blocks
+      @Callbacks.each_with_object([]) do |callback, promises|
+        promises.push *callback.select { |v| v.is_a? AbstractPromise }
+      end
+    end
+
    # @api private
    # just for inspection
    def callbacks
-      synchronize { @callbacks }.clone.freeze
+      @Callbacks.each.to_a
    end

    # @api private
    def add_callback(method, *args)
-      call = if completed?
-               true
-             else
-               synchronize do
-                 if completed?
-                   true
-                 else
-                   @callbacks << [method, *args]
-                   false
-                 end
-               end
-             end
-      pr_call_callback method, *args if call
+      if completed?
+        call_callback method, *args
+      else
+        @Callbacks.push [method, *args]
+        call_callbacks if completed?
+      end
      self
    end
@@ -218,53 +221,28 @@ def touched
      @Touched.value
    end

-    def with_default_executor(executor = @DefaultExecutor)
-      AllPromise.new([self], executor).future
-    end
-
    private

-    def ns_initialize
-      @callbacks = []
-    end
-
    def wait_until_complete(timeout)
-      unless completed?
-        synchronize { ns_wait_until(timeout) { completed? } }
+      lock = Synchronization::Lock.new
+
+      while true
+        last_waiter = @Waiters.peek # waiters' state before completion
+        break if completed?
+
+        # synchronize so it cannot be signaled before it waits
+        synchronize do
+          # ok only if completing thread did not start signaling
+          next unless @Waiters.compare_and_push last_waiter, lock
+          ns_wait_until(timeout) { completed? }
+          break
+        end
      end
      self
    end

-    def pr_blocks(callbacks)
-      callbacks.each_with_object([]) do |callback, promises|
-        promises.push *callback.select { |v| v.is_a? AbstractPromise }
-      end
-    end
-
-    def ns_to_s
-      "<##{self.class}:0x#{'%x' % (object_id << 1)} #{state}>" # TODO check ns status
-    end
-
-    def ns_complete(raise = true)
-      ns_check_multiple_assignment raise
-      ns_complete_state
-      ns_broadcast
-      callbacks, @callbacks = @callbacks, []
-      callbacks
-    end
-
-    def ns_complete_state
-      self.state = :completed
-    end
-
-    def ns_check_multiple_assignment(raise, reason = nil)
-      if completed?
-        if raise
-          raise reason || Concurrent::MultipleAssignmentError.new('multiple assignment')
-        else
-          return nil
-        end
-      end
+    def complete_state
+      @State.compare_and_set :pending, :completed
    end

    def pr_with_async(executor, *args, &block)
@@ -279,65 +257,80 @@ def pr_callback_on_completion(callback)
      callback.call
    end

-    def pr_notify_blocked(promise)
+    def pr_callback_notify_blocked(promise)
      promise.on_done self
    end

-    def pr_call_callback(method, *args)
-      # all methods has to be pure
+    def call_callback(method, *args)
      self.send method, *args
    end

-    def pr_call_callbacks(callbacks)
-      callbacks.each { |method, *args| pr_call_callback method, *args }
+    def call_callbacks
+      method, *args = @Callbacks.pop
+      while method
+        call_callback method, *args
+        method, *args = @Callbacks.pop
+      end
    end
  end

  class Future < Event
+    Success = ImmutableStruct.new :value do
+      def reason
+        nil
+      end
+
+      def to_s
+        'success'
+      end
+    end

-    private *attr_volatile(:value_field, :reason_field)
+    Failed = ImmutableStruct.new :reason do
+      def value
+        nil
+      end

-    def initialize(promise, default_executor = :io)
-      self.value_field = nil
-      self.reason_field = nil
-      super promise, default_executor
+      def to_s
+        'failed'
+      end
    end

    # Has the Future been success?
    # @return [Boolean]
-    def success?
-      state == :success
+    def success?(state = self.state)
+      Success === state
    end

    # Has the Future been failed?
    # @return [Boolean]
-    def failed?
-      state == :failed
+    def failed?(state = self.state)
+      Failed === state
    end

    # Has the Future been completed?
    # @return [Boolean]
-    def completed?
-      [:success, :failed].include? state
+    def completed?(state = self.state)
+      success? state or failed? state
    end

    # @return [Object] see Dereferenceable#deref
    def value(timeout = nil)
      touch
      wait_until_complete timeout
-      value_field
+      state.value
    end

    def reason(timeout = nil)
      touch
      wait_until_complete timeout
-      reason_field
+      state.reason
    end

    def result(timeout = nil)
      touch
      wait_until_complete timeout
-      [success?, value_field, reason_field]
+      state = self.state
+      [success?(state), state.value, state.reason]
    end

    # wait until Obligation is #complete?
@@ -354,7 +347,7 @@ def wait!(timeout = nil)
    def value!(timeout = nil)
      touch
      wait_until_complete!(timeout)
-      value_field
+      state.value
    end

    # @example allows failed Future to be risen
@@ -362,7 +355,7 @@ def value!(timeout = nil)
    def exception(*args)
      touch
      raise 'obligation is not failed' unless failed?
-      reason_field.exception(*args)
+      state.reason.exception(*args)
    end

    # @yield [value] executed only on parent success
@@ -384,11 +377,11 @@ def flat(level = 1)
      FlattingPromise.new(self, level, @DefaultExecutor).future
    end

-    def or(*futures)
+    def any(*futures)
      AnyPromise.new([self, *futures], @DefaultExecutor).future
    end

-    alias_method :|, :or
+    alias_method :|, :any

    # @yield [value] executed async on `executor` when success
    # @return self
@@ -414,27 +407,34 @@ def on_failure!(&callback)
      add_callback :pr_callback_on_failure, callback
    end

+    # @api private
+    def apply_value(value, block)
+      block.call value
+    end
+
    # @api private
    def complete(success, value, reason, raise = true)
-      callbacks = synchronize { ns_complete success, value, reason, raise }
-      pr_call_callbacks callbacks, success, value, reason
+      if complete_state success, value, reason
+        @Waiters.clear
+        synchronize { ns_broadcast }
+        call_callbacks success, value, reason
+      else
+        raise reason || Concurrent::MultipleAssignmentError.new('multiple assignment') if raise
+        return false
+      end
      self
    end

    def add_callback(method, *args)
-      call = if completed?
-               true
-             else
-               synchronize do
-                 if completed?
-                   true
-                 else
-                   @callbacks << [method, *args]
-                   false
-                 end
-               end
-             end
-      pr_call_callback method, success?, value_field, reason_field, *args if call
+      state = self.state
+      if completed?(state)
+        call_callback method, success?(state), state.value, state.reason, *args
+      else
+        @Callbacks.push [method, *args]
+        state = self.state
+        # take back if it was completed in the meanwhile
+        call_callbacks success?(state), state.value, state.reason if completed?(state)
+      end
      self
    end
@@ -446,26 +446,20 @@ def wait_until_complete!(timeout = nil)
      self
    end

-    def ns_complete(success, value, reason, raise)
-      ns_check_multiple_assignment raise, reason
-      ns_complete_state(success, value, reason)
-      ns_broadcast
-      callbacks, @callbacks = @callbacks, []
-      callbacks
+    def complete_state(success, value, reason)
+      @State.compare_and_set :pending, success ? Success.new(value) : Failed.new(reason)
    end

-    def ns_complete_state(success, value, reason)
-      if success
-        self.value_field = value
-        self.state = :success
-      else
-        self.reason_field = reason
-        self.state = :failed
+    def call_callbacks(success, value, reason)
+      method, *args = @Callbacks.pop
+      while method
+        call_callback method, success, value, reason, *args
+        method, *args = @Callbacks.pop
      end
    end

-    def pr_call_callbacks(callbacks, success, value, reason)
-      callbacks.each { |method, *args| pr_call_callback method, success, value, reason, *args }
+    def call_callback(method, success, value, reason, *args)
+      self.send method, success, value, reason, *args
    end

    def pr_async_callback_on_success(success, value, reason, executor, callback)
@@ -481,7 +475,7 @@ def pr_async_callback_on_failure(success, value, reason, executor, callback)
    end

    def pr_callback_on_success(success, value, reason, callback)
-      callback.call value if success
+      apply_value value, callback if success
    end

    def pr_callback_on_failure(success, value, reason, callback)
@@ -492,7 +486,7 @@ def pr_callback_on_completion(success, value, reason, callback)
      callback.call success, value, reason
    end

-    def pr_notify_blocked(success, value, reason, promise)
+    def pr_callback_notify_blocked(success, value, reason, promise)
      super(promise)
    end
@@ -535,11 +529,11 @@ def try_fail(reason = StandardError.new)
    end

    def evaluate_to(*args, &block)
-      promise.evaluate_to(*args, &block)
+      promise.evaluate_to(*args, block)
    end

    def evaluate_to!(*args, &block)
-      promise.evaluate_to!(*args, &block)
+      promise.evaluate_to!(*args, block)
    end
  end
@@ -547,8 +541,8 @@ def evaluate_to!(*args, &block)
  # @abstract
  class AbstractPromise < Synchronization::Object
-    def initialize(future, *args, &block)
-      super(*args, &block)
+    def initialize(future)
+      super(&nil)
      @Future = future
      ensure_ivar_visibility!
    end
@@ -581,22 +575,14 @@ def inspect
    private

    def complete(*args)
-      pr_complete(@Future, *args)
-    end
-
-    def pr_complete(future, *args)
-      future.complete(*args)
-    end
-
-    def evaluate_to(*args, &block)
-      pr_evaluate_to(@Future, *args, &block)
+      @Future.complete(*args)
    end

    # @return [Future]
-    def pr_evaluate_to(future, *args, &block)
-      pr_complete future, true, block.call(*args), nil
+    def evaluate_to(*args, &block)
+      complete true, block.call(*args), nil
    rescue => error
-      pr_complete future, false, nil, error
+      complete false, nil, error
    end
  end
@@ -609,10 +595,9 @@ def initialize(default_executor = :io)
  end

  # @note Be careful not to fullfill the promise twice
-  # @example initialization
-  #   Concurrent.promise
-  # @note TODO consider to allow being blocked_by
  class CompletableFuturePromise < AbstractPromise
+    # TODO consider allowing being blocked_by
+
    def initialize(default_executor = :io)
      super CompletableFuture.new(self, default_executor)
    end
@@ -647,8 +632,8 @@ def try_fail(reason = StandardError.new)
    public :evaluate_to

    # @return [Future]
-    def evaluate_to!(*args, &block)
-      evaluate_to(*args, &block).wait!
+    def evaluate_to!(*args, block)
+      evaluate_to(*args, block).wait!
    end
  end
@@ -658,28 +643,23 @@ class InnerPromise < AbstractPromise

  # @abstract
  class BlockedPromise < InnerPromise
-    def self.new(*args)
-      promise = super(*args)
-      promise.blocked_by.each { |f| f.add_callback :pr_notify_blocked, promise }
-      promise
-    end
+    def initialize(future, blocked_by_futures, countdown, &block)
+      initialize_blocked_by(blocked_by_futures)
+      @Countdown = AtomicFixnum.new countdown

-    def initialize(future, blocked_by_futures, *args, &block)
-      @BlockedBy = Array(blocked_by_futures)
-      @Countdown = AtomicFixnum.new @BlockedBy.size
-      super(future, blocked_by_futures, *args, &block)
+      super(future)
+      blocked_by.each { |future| future.add_callback :pr_callback_notify_blocked, self }
    end

    # @api private
    def on_done(future)
-      # futures could be deleted from blocked_by one by one here, but that would be too expensive,
-      # it's done once when all are done to free the reference
-
-      countdown = process_on_done(future, @Countdown.decrement)
+      countdown = process_on_done(future)
      completable = completable?(countdown)

      if completable
-        pr_on_completable(*pr_on_completable_args(future, blocked_by))
+        on_completable(future)
+        # futures could be deleted from blocked_by one by one here, but that would be too expensive,
+        # it's done once when all are done to free the reference
        clear_blocked_by!
      end
    end
@@ -694,33 +674,32 @@ def blocked_by
      @BlockedBy
    end

-    def clear_blocked_by!
-      # not synchronized because we do not care when this change propagates
-      blocked_by = @BlockedBy
-      @BlockedBy = []
-      blocked_by
-    end
-
    def inspect
      "#{to_s[0..-2]} blocked_by:[#{ blocked_by.map(&:to_s).join(', ')}]>"
    end

    private

+    def initialize_blocked_by(blocked_by_futures)
+      (@BlockedBy = Array(blocked_by_futures).freeze).size
+    end
+
+    def clear_blocked_by!
+      # not synchronized because we do not care when this change propagates
+      @BlockedBy = []
+      nil
+    end
+
    # @return [true,false] if completable
    def completable?(countdown)
      countdown.zero?
    end

-    def process_on_done(future, countdown)
-      countdown
-    end
-
-    def pr_on_completable_args(done_future, blocked_by)
-      [done_future, blocked_by, @Future]
+    def process_on_done(future)
+      @Countdown.decrement
    end

-    def pr_on_completable(_, _, _)
+    def on_completable(done_future)
      raise NotImplementedError
    end
  end
@@ -731,41 +710,29 @@ def initialize(blocked_by_future, default_executor = :io, executor = default_exe
      raise ArgumentError, 'no block given' unless block_given?
      @Executor = executor
      @Task = task
-      super Future.new(self, default_executor), blocked_by_future
+      super Future.new(self, default_executor), blocked_by_future, 1
    end

    def executor
      @Executor
    end
-
-    private
-
-    def ns_initialize(blocked_by_future)
-      super [blocked_by_future]
-    end
-
-    def pr_on_completable_args(done_future, blocked_by)
-      [done_future, blocked_by, @Future, @Executor, @Task]
-    end
-
-    def pr_on_completable(_, _, _, _, _)
-      raise NotImplementedError
-    end
  end

  class ThenPromise < BlockedTaskPromise
    private

-    def ns_initialize(blocked_by_future)
+    def initialize(blocked_by_future, default_executor = :io, executor = default_executor, &task)
      raise ArgumentError, 'only Future can be appended with then' unless blocked_by_future.is_a? Future
-      super(blocked_by_future)
+      super blocked_by_future, default_executor, executor, &task
    end

-    def pr_on_completable(done_future, _, future, executor, task)
+    def on_completable(done_future)
      if done_future.success?
-        Concurrent.post_on(executor, done_future, task) { |done_future, task| evaluate_to done_future.value, &task }
+        Concurrent.post_on(@Executor, done_future, @Task) do |done_future, task|
+          evaluate_to { done_future.apply_value done_future.value, task }
+        end
      else
-        pr_complete future, false, nil, done_future.reason
+        complete false, nil, done_future.reason
      end
    end
  end
@@ -773,16 +740,16 @@ def pr_on_completable(done_future, _, future, executor, task)
  class RescuePromise < BlockedTaskPromise
    private

-    def ns_initialize(blocked_by_future)
+    def initialize(blocked_by_future, default_executor = :io, executor = default_executor, &task)
      raise ArgumentError, 'only Future can be rescued' unless blocked_by_future.is_a? Future
-      super(blocked_by_future)
+      super blocked_by_future, default_executor, executor, &task
    end

-    def pr_on_completable(done_future, _, future, executor, task)
+    def on_completable(done_future)
      if done_future.failed?
-        Concurrent.post_on(executor, done_future, task) { |done_future, task| evaluate_to done_future.reason, &task }
+        Concurrent.post_on(@Executor, done_future.reason, @Task) { |reason, task| evaluate_to reason, &task }
      else
-        pr_complete future, true, done_future.value, nil
+        complete true, done_future.value, nil
      end
    end
  end
@@ -790,11 +757,11 @@ def pr_on_completable(done_future, _, future, executor, task)
  class ChainPromise < BlockedTaskPromise
    private

-    def pr_on_completable(done_future, _, _, executor, task)
+    def on_completable(done_future)
      if Future === done_future
-        Concurrent.post_on(executor, done_future, task) { |future, task| evaluate_to *future.result, &task }
+        Concurrent.post_on(@Executor, done_future, @Task) { |future, task| evaluate_to *future.result, &task }
      else
-        Concurrent.post_on(executor, task) { |task| evaluate_to &task }
+        Concurrent.post_on(@Executor, @Task) { |task| evaluate_to &task }
      end
    end
  end
@@ -808,82 +775,104 @@ def initialize(default_executor = :io)
  class FlattingPromise < BlockedPromise
    def blocked_by
-      synchronize { ns_blocked_by }
+      @BlockedBy.each.to_a
    end

    private

-    def process_on_done(future, countdown)
-      value = future.value
-      if @Levels.value > 0
+    def process_on_done(future)
+      countdown = super(future)
+      value = future.value
+      if countdown.nonzero?
        case value
        when Future
-          @Countdown.increment
-          @Levels.decrement
-          synchronize { @blocked_by << value }
-          value.add_callback :pr_notify_blocked, self
-          countdown + 1
+          @BlockedBy.push value
+          value.add_callback :pr_callback_notify_blocked, self
+          @Countdown.value
        when Event
          raise TypeError, 'cannot flatten to Event'
        else
-          raise TypeError, "returned value '#{value}' is not a Future"
+          raise TypeError, "returned value #{value.inspect} is not a Future"
        end
-      else
-        countdown
      end
+      countdown
    end

    def initialize(blocked_by_future, levels = 1, default_executor = :io)
      raise ArgumentError, 'levels has to be higher than 0' if levels < 1
-      @Levels = AtomicFixnum.new levels
-      super Future.new(self, default_executor), blocked_by_future
-      @BlockedBy = nil # its not used in FlattingPromise
-    end
-
-    def ns_initialize(blocked_by_future)
      blocked_by_future.is_a? Future or raise ArgumentError, 'only Future can be flatten'
-      @blocked_by = Array(blocked_by_future)
+      super Future.new(self, default_executor), blocked_by_future, 1 + levels
    end

-    def pr_on_completable(_, blocked_by, future)
-      pr_complete future, *blocked_by.last.result
+    def initialize_blocked_by(blocked_by_future)
+      @BlockedBy = LockFreeStack.new.push(blocked_by_future)
+      1
    end

-    def ns_blocked_by
-      @blocked_by
+    def on_completable(done_future)
+      complete *done_future.result
    end

    def clear_blocked_by!
-      # not synchronized because we do not care when this change propagates
-      blocked_by = @blocked_by
-      @blocked_by = []
-      blocked_by
+      @BlockedBy.clear
+      nil
    end
  end

  # used internally to support #with_default_executor
  class AllPromise < BlockedPromise
+
+    class ArrayFuture < Future
+      def apply_value(value, block)
+        block.call(*value)
+      end
+    end
+
    private

    def initialize(blocked_by_futures, default_executor = :io)
-      klass = blocked_by_futures.any? { |f| f.is_a?(Future) } ? Future : Event
+      klass = Event
+      blocked_by_futures.each do |f|
+        if f.is_a?(Future)
+          if klass == Event
+            klass = Future
+          elsif klass == Future
+            klass = ArrayFuture
+            break
+          end
+        end
+      end
+
      # noinspection RubyArgCount
-      super(klass.new(self, default_executor), blocked_by_futures)
+      super(klass.new(self, default_executor), blocked_by_futures, blocked_by_futures.size)
    end

-    def pr_on_completable(done_future, blocked_by, future)
-      results = blocked_by.select { |f| f.is_a?(Future) }.map(&:result)
-      if results.empty?
-        pr_complete future
-      else
-        if results.all? { |success, _, _| success }
-          params = results.map { |_, value, _| value }
-          pr_complete(future, true, params.size == 1 ? params.first : params, nil)
+    def on_completable(done_future)
+      all_success = true
+      reason = nil
+
+      values = blocked_by.each_with_object([]) do |future, values|
+        next unless future.is_a?(Future)
+        success, value, reason = future.result
+
+        unless success
+          all_success = false
+          break
+        end
+        values << value
+      end
+
+      if all_success
+        if values.empty?
+          complete
        else
-          # TODO what about other reasons?
-          pr_complete future.false, nil, results.find { |success, _, _| !success }.last
+          complete(true, values.size == 1 ? values.first : values, nil)
        end
+      else
+        # TODO what about other reasons?
+        complete false, nil, reason
      end
    end
  end
@@ -895,21 +884,21 @@ class AnyPromise < BlockedPromise

    def initialize(blocked_by_futures, default_executor = :io)
      blocked_by_futures.all? { |f| f.is_a? Future } or raise ArgumentError, 'accepts only Futures not Events'
-      super(Future.new(self, default_executor), blocked_by_futures)
+      super(Future.new(self, default_executor), blocked_by_futures, blocked_by_futures.size)
    end

    def completable?(countdown)
      true
    end

-    def pr_on_completable(done_future, _, future)
-      pr_complete future, *done_future.result, false
+    def on_completable(done_future)
+      complete *done_future.result, false
    end
  end

  class Delay < InnerPromise
    def touch
-      pr_complete @Future
+      complete
    end

    private
@@ -933,10 +922,7 @@ def inspect

    def initialize(intended_time, default_executor = :io)
      @IntendedTime = intended_time
-      super Event.new(self, default_executor)
-    end

-    def ns_initialize
      in_seconds = begin
        now = Time.now
        schedule_time = if @IntendedTime.is_a? Time
@@ -947,6 +933,8 @@ def ns_initialize
        [0, schedule_time.to_f - now.to_f].max
      end

+      super Event.new(self, default_executor)
+
      Concurrent.global_timer_set.post(in_seconds) { complete }
    end
  end
diff --git a/lib/concurrent/edge/lock_free_stack.rb b/lib/concurrent/edge/lock_free_stack.rb
new file mode 100644
index 000000000..93e005d74
--- /dev/null
+++ b/lib/concurrent/edge/lock_free_stack.rb
@@ -0,0 +1,76 @@
+module Concurrent
+  module Edge
+    class LockFreeStack < Synchronization::Object
+
+      Node = ImmutableStruct.new(:value, :next) do
+        singleton_class.send :alias_method, :[], :new
+      end
+
+      class Empty < Node
+        def next
+          self
+        end
+      end
+
+      EMPTY = Empty[nil, nil]
+
+      def initialize
+        super()
+        @Head = AtomicReference.new EMPTY
+        ensure_ivar_visibility!
+      end
+
+      def empty?
+        @Head.get.equal? EMPTY
+      end
+
+      def compare_and_push(head, value)
+        @Head.compare_and_set head, Node[value, head]
+      end
+
+      def push(value)
+        @Head.update { |head| Node[value, head] }
+        self
+      end
+
+      def peek
+        @Head.get
+      end
+
+      def compare_and_pop(head)
+        @Head.compare_and_set head, head.next
+      end
+
+      def pop
+        popped = nil
+        @Head.update { |head| (popped = head).next }
+        popped.value
+      end
+
+      def compare_and_clear(head)
+        @Head.compare_and_set head, EMPTY
+      end
+
+      def clear
+        while true
+          head = @Head.get
+          return false if head == EMPTY
+          return true if @Head.compare_and_set head, EMPTY
+        end
+      end
+
+      include Enumerable
+
+      def each
+        return to_enum unless block_given?
+        it = peek
+        until it.equal?(EMPTY)
+          yield it.value
+          it = it.next
+        end
+        self
+      end
+
+    end
+  end
+end
diff --git a/lib/concurrent/executor/executor_service.rb b/lib/concurrent/executor/executor_service.rb
index 683a60aa1..5d2a2c720 100644
--- a/lib/concurrent/executor/executor_service.rb
+++ b/lib/concurrent/executor/executor_service.rb
@@ -99,7 +99,8 @@ class AbstractExecutorService < Synchronization::Object
    attr_reader :fallback_policy

    def initialize(*args, &block)
-      super
+      super(&nil)
+      synchronize { ns_initialize(*args, &block) }
    end

    # @!macro [attach] executor_service_method_shutdown
diff --git a/lib/concurrent/executor/java_single_thread_executor.rb b/lib/concurrent/executor/java_single_thread_executor.rb
index 96e5acc71..eee25d9d5 100644
--- a/lib/concurrent/executor/java_single_thread_executor.rb
+++ b/lib/concurrent/executor/java_single_thread_executor.rb
@@ -23,10 +23,6 @@ def initialize(opts = {})

    protected

-    def ns_initialize(opts)
-      super(opts)
-    end
-
    def ns_initialize(opts)
      @executor = java.util.concurrent.Executors.newSingleThreadExecutor
      @fallback_policy = opts.fetch(:fallback_policy, :discard)
diff --git a/lib/concurrent/executor/serialized_execution.rb b/lib/concurrent/executor/serialized_execution.rb
index 1ec6ec90f..429a7e809 100644
--- a/lib/concurrent/executor/serialized_execution.rb
+++ b/lib/concurrent/executor/serialized_execution.rb
@@ -9,6 +9,11 @@ module Concurrent
  class SerializedExecution < Synchronization::Object
    include Logging

+    def initialize()
+      super()
+      synchronize { ns_initialize }
+    end
+
    Job = Struct.new(:executor, :args, :block) do
      def call
        block.call(*args)
diff --git a/lib/concurrent/ivar.rb b/lib/concurrent/ivar.rb
index 631e63d20..c80558c85 100644
--- a/lib/concurrent/ivar.rb
+++ b/lib/concurrent/ivar.rb
@@ -56,12 +56,12 @@ class IVar < Synchronization::Object
  # returning the data
  # @option opts [String] :copy_on_deref (nil) call the given `Proc` passing
  # the internal value and returning the value returned from the proc
-  def initialize(value = NO_VALUE, opts = {})
+  def initialize(value = NO_VALUE, opts = {}, &block)
    if value != NO_VALUE && block_given?
      raise ArgumentError.new('provide only a value or a block')
    end
-    value = yield if block_given?
-    super
+    super(&nil)
+    synchronize { ns_initialize(value, opts, &block) }
  end

  # Add an observer on this object that will receive notification on update.
@@ -150,6 +150,7 @@ def try_set(value = NO_VALUE, &block)
  # @!visibility private
  def ns_initialize(value, opts)
+    value = yield if block_given?
    init_obligation(self)
    self.observers = CopyOnWriteObserverSet.new
    set_deref_options(opts)
diff --git a/lib/concurrent/struct/abstract_struct.rb b/lib/concurrent/struct/abstract_struct.rb
index 443c0664f..755940feb 100644
--- a/lib/concurrent/struct/abstract_struct.rb
+++ b/lib/concurrent/struct/abstract_struct.rb
@@ -3,6 +3,13 @@ module Concurrent
  # @!visibility private
  module AbstractStruct

+    # @!visibility private
+    def initialize(*values)
+      super()
+      ns_initialize(*values)
+      ensure_ivar_visibility!
+    end
+
    # @!macro [attach] struct_length
    #
    # Returns the number of struct members.
diff --git a/lib/concurrent/struct/immutable_struct.rb b/lib/concurrent/struct/immutable_struct.rb
index 4b0157765..9eaf308b4 100644
--- a/lib/concurrent/struct/immutable_struct.rb
+++ b/lib/concurrent/struct/immutable_struct.rb
@@ -9,15 +9,11 @@ module Concurrent
  module ImmutableStruct
    include AbstractStruct

-    # @!visibility private
-    def initialize(*values)
-      ns_initialize(*values)
-    end
-
    # @!macro struct_values
    def values
      ns_values
    end
+    alias_method :to_a, :values

    # @!macro struct_values_at
@@ -29,6 +25,7 @@ def values_at(*indexes)
    def inspect
      ns_inspect
    end
+    alias_method :to_s, :inspect

    # @!macro struct_merge
@@ -83,7 +80,7 @@ def self.new(*args, &block)
  FACTORY = Class.new(Synchronization::Object) do
    def define_struct(name, members, &block)
      synchronize do
-        AbstractStruct.define_struct_class(ImmutableStruct, nil, name, members, &block)
+        AbstractStruct.define_struct_class(ImmutableStruct, Synchronization::Object, name, members, &block)
      end
    end
  end.new
diff --git a/lib/concurrent/synchronization.rb b/lib/concurrent/synchronization.rb
index 9c63c6259..d11e53846 100644
--- a/lib/concurrent/synchronization.rb
+++ b/lib/concurrent/synchronization.rb
@@ -1,11 +1,14 @@
 require 'concurrent/utility/engine'

 require 'concurrent/synchronization/abstract_object'
-require 'concurrent/native_extensions' # JavaObject
+require 'concurrent/synchronization/java_object'
 require 'concurrent/synchronization/mutex_object'
 require 'concurrent/synchronization/monitor_object'
 require 'concurrent/synchronization/rbx_object'
 require 'concurrent/synchronization/object'

+require 'concurrent/synchronization/condition'
+require 'concurrent/synchronization/lock'
+
 module Concurrent
   # {include:file:doc/synchronization.md}
   module Synchronization
diff --git a/lib/concurrent/synchronization/abstract_object.rb b/lib/concurrent/synchronization/abstract_object.rb
index e56cf84e3..6a1a3423f 100644
--- a/lib/concurrent/synchronization/abstract_object.rb
+++ b/lib/concurrent/synchronization/abstract_object.rb
@@ -42,6 +42,18 @@ def synchronize
    end

    # initialization of the object called inside synchronize block
+    # @note has to be called manually when required in children of this class
+    # @example
+    #   class Child < Concurrent::Synchronization::Object
+    #     def initialize(*args, &block)
+    #       super(&nil)
+    #       synchronize { ns_initialize(*args, &block) }
+    #     end
+    #
+    #     def ns_initialize(*args, &block)
+    #       @args = args
+    #     end
+    #   end
    def ns_initialize(*args, &block)
    end
@@ -149,7 +161,6 @@ def #{name}=(value)
      end
      names.map { |n| [n, :"#{n}="] }.flatten
    end
-
  end
 end
end
diff --git a/lib/concurrent/synchronization/condition.rb b/lib/concurrent/synchronization/condition.rb
new file mode 100644
index 000000000..eeb6996cd
--- /dev/null
+++ b/lib/concurrent/synchronization/condition.rb
@@ -0,0 +1,53 @@
+module Concurrent
+  module Synchronization
+    class Condition < Object
+
+      singleton_class.send :alias_method, :private_new, :new
+      private_class_method :new
+
+      def initialize(lock)
+        @Lock = lock
+        ensure_ivar_visibility!
+ super() + end + + def wait(timeout = nil) + @Lock.synchronize { ns_wait(timeout) } + end + + def ns_wait(timeout = nil) + synchronize { super(timeout) } + end + + def wait_until(timeout = nil, &condition) + @Lock.synchronize { ns_wait_until(timeout, &condition) } + end + + def ns_wait_until(timeout = nil, &condition) + synchronize { super(timeout, &condition) } + end + + def signal + @Lock.synchronize { ns_signal } + end + + def ns_signal + synchronize { super } + end + + def broadcast + @Lock.synchronize { ns_broadcast } + end + + def ns_broadcast + synchronize { super } + end + end + + class Object < Implementation + def new_condition + Condition.private_new(self) + end + end + end +end diff --git a/lib/concurrent/synchronization/java_object.rb b/lib/concurrent/synchronization/java_object.rb new file mode 100644 index 000000000..5ba0efacf --- /dev/null +++ b/lib/concurrent/synchronization/java_object.rb @@ -0,0 +1,33 @@ +require 'concurrent/native_extensions' # load native part first + +module Concurrent + module Synchronization + + if Concurrent.on_jruby? + require 'jruby' + + class JavaObject < AbstractObject + + def self.attr_volatile(*names) + names.each do |name| + + ivar = :"@volatile_#{name}" + + class_eval <<-RUBY, __FILE__, __LINE__ + 1 + def #{name} + instance_variable_get_volatile(:#{ivar}) + end + + def #{name}=(value) + instance_variable_set_volatile(:#{ivar}, value) + end + RUBY + + end + names.map { |n| [n, :"#{n}="] }.flatten + end + + end + end + end +end diff --git a/lib/concurrent/synchronization/java_pure_object.rb b/lib/concurrent/synchronization/java_pure_object.rb deleted file mode 100644 index ff4ec210c..000000000 --- a/lib/concurrent/synchronization/java_pure_object.rb +++ /dev/null @@ -1,43 +0,0 @@ -module Concurrent - module Synchronization - - if Concurrent.on_jruby? 
- require 'jruby' - - class JavaPureObject < AbstractObject - def initialize(*args, &block) - synchronize { ns_initialize(*args, &block) } - end - - protected - - def synchronize - JRuby.reference0(self).synchronized { yield } - end - - def ns_wait(timeout = nil) - success = JRuby.reference0(Thread.current).wait_timeout(JRuby.reference0(self), timeout) - self - rescue java.lang.InterruptedException => e - raise ThreadError(e.message) - ensure - ns_signal unless success - end - - def ns_broadcast - JRuby.reference0(self).notifyAll - self - end - - def ns_signal - JRuby.reference0(self).notify - self - end - - def ensure_ivar_visibility! - # relying on undocumented behavior of JRuby, ivar access is volatile - end - end - end - end -end diff --git a/lib/concurrent/synchronization/lock.rb b/lib/concurrent/synchronization/lock.rb new file mode 100644 index 000000000..5a7c0192c --- /dev/null +++ b/lib/concurrent/synchronization/lock.rb @@ -0,0 +1,32 @@ +module Concurrent + module Synchronization + class Lock < Object + + public :synchronize + + def wait(timeout = nil) + synchronize { ns_wait(timeout) } + end + + public :ns_wait + + def wait_until(timeout = nil, &condition) + synchronize { ns_wait_until(timeout, &condition) } + end + + public :ns_wait_until + + def signal + synchronize { ns_signal } + end + + public :ns_signal + + def broadcast + synchronize { ns_broadcast } + end + + public :ns_broadcast + end + end +end diff --git a/lib/concurrent/synchronization/monitor_object.rb b/lib/concurrent/synchronization/monitor_object.rb index 661e93afd..d7f55bd24 100644 --- a/lib/concurrent/synchronization/monitor_object.rb +++ b/lib/concurrent/synchronization/monitor_object.rb @@ -4,7 +4,6 @@ class MonitorObject < MutexObject def initialize(*args, &block) @__lock__ = ::Monitor.new @__condition__ = @__lock__.new_cond - synchronize { ns_initialize(*args, &block) } end protected diff --git a/lib/concurrent/synchronization/mutex_object.rb 
b/lib/concurrent/synchronization/mutex_object.rb index 1a8a6d95c..eaa785a1e 100644 --- a/lib/concurrent/synchronization/mutex_object.rb +++ b/lib/concurrent/synchronization/mutex_object.rb @@ -4,7 +4,6 @@ class MutexObject < AbstractObject def initialize(*args, &block) @__lock__ = ::Mutex.new @__condition__ = ::ConditionVariable.new - synchronize { ns_initialize(*args, &block) } end protected diff --git a/lib/concurrent/synchronization/rbx_object.rb b/lib/concurrent/synchronization/rbx_object.rb index fd42fad28..e0e5119a4 100644 --- a/lib/concurrent/synchronization/rbx_object.rb +++ b/lib/concurrent/synchronization/rbx_object.rb @@ -3,10 +3,8 @@ module Synchronization if Concurrent.on_rbx? class RbxObject < AbstractObject def initialize(*args, &block) - synchronize do - @waiters = [] - ns_initialize(*args, &block) - end + @waiters = [] + ensure_ivar_visibility! end protected diff --git a/lib/concurrent/utility/monotonic_time.rb b/lib/concurrent/utility/monotonic_time.rb index 2b2bcd263..0b085e8e2 100644 --- a/lib/concurrent/utility/monotonic_time.rb +++ b/lib/concurrent/utility/monotonic_time.rb @@ -2,10 +2,12 @@ module Concurrent - # Clock that cannot be set and represents monotonic time since - # some unspecified starting point. - # @!visibility private - GLOBAL_MONOTONIC_CLOCK = Class.new(Synchronization::Object) { + class_definition = Class.new(Synchronization::Object) do + def initialize + super() + @last_time = Time.now.to_f + ensure_ivar_visibility! + end if defined?(Process::CLOCK_MONOTONIC) # @!visibility private @@ -24,21 +26,20 @@ def get_time synchronize do now = Time.now.to_f if @last_time < now - @last_time = now + @last_time = now else # clock has moved back in time - @last_time += 0.000_001 + @last_time += 0.000_001 end end end - protected - - # @!visibility private - def ns_initialize - @last_time = Time.now.to_f - end end - }.new + end + + # Clock that cannot be set and represents monotonic time since + # some unspecified starting point. 
+ # @!visibility private + GLOBAL_MONOTONIC_CLOCK = class_definition.new private_constant :GLOBAL_MONOTONIC_CLOCK # @!macro [attach] monotonic_get_time @@ -52,5 +53,6 @@ def ns_initialize def monotonic_time GLOBAL_MONOTONIC_CLOCK.get_time end + module_function :monotonic_time end diff --git a/spec/concurrent/actor_spec.rb b/spec/concurrent/actor_spec.rb index b63a472f4..d56b9d9af 100644 --- a/spec/concurrent/actor_spec.rb +++ b/spec/concurrent/actor_spec.rb @@ -15,9 +15,6 @@ module Actor # end describe 'Concurrent::Actor' do - prepend_before do - do_no_reset! - end def terminate_actors(*actors) actors.each do |actor| diff --git a/spec/concurrent/configuration_spec.rb b/spec/concurrent/configuration_spec.rb index 79dd269dd..3fa213c8d 100644 --- a/spec/concurrent/configuration_spec.rb +++ b/spec/concurrent/configuration_spec.rb @@ -3,10 +3,8 @@ module Concurrent describe Configuration do before(:each) do - reset_gem_configuration - end - - after(:each) do + # redundant - done in spec_helper.rb + # done here again for explicitness reset_gem_configuration end @@ -28,9 +26,9 @@ module Concurrent end specify '#terminate_pools! acts on all executors with auto_terminate: true' do - expect(Concurrent.global_fast_executor).to receive(:kill).with(no_args).and_call_original - expect(Concurrent.global_io_executor).to receive(:kill).with(no_args).and_call_original - expect(Concurrent.global_timer_set).to receive(:kill).with(no_args).and_call_original + expect(Concurrent.global_fast_executor).to receive(:kill).once.with(no_args).and_call_original + expect(Concurrent.global_io_executor).to receive(:kill).once.with(no_args).and_call_original + expect(Concurrent.global_timer_set).to receive(:kill).once.with(no_args).and_call_original Concurrent.terminate_pools! 
end end diff --git a/spec/concurrent/edge/future_spec.rb b/spec/concurrent/edge/future_spec.rb index cc9fde87c..7af735066 100644 --- a/spec/concurrent/edge/future_spec.rb +++ b/spec/concurrent/edge/future_spec.rb @@ -66,7 +66,7 @@ specify do completable_event = Concurrent.event one = completable_event.chain { 1 } - join = Concurrent.join(completable_event).chain { 1 } + join = Concurrent.zip(completable_event).chain { 1 } expect(one.completed?).to be false completable_event.complete expect(one.value).to eq 1 @@ -78,7 +78,7 @@ specify do completable_future = Concurrent.future one = completable_future.then(&:succ) - join = Concurrent.join(completable_future).then { |v| v } + join = Concurrent.zip(completable_future).then { |v| v } expect(one.completed?).to be false completable_future.success 0 expect(one.value).to eq 1 @@ -97,8 +97,7 @@ queue.push(2) anys = [Concurrent.any(f1, f2), - f1 | f2, - f1.or(f2)] + f1 | f2] anys.each do |any| expect(any.value.to_s).to match /1|2/ @@ -107,15 +106,42 @@ end end + describe '.zip' do + it 'continues on first result' do + a = Concurrent.future { 1 } + b = Concurrent.future { 2 } + c = Concurrent.future { 3 } + + z1 = a & b + z2 = Concurrent.zip a, b, c + + expect(z1.value).to eq [1, 2] + expect(z2.value).to eq [1, 2, 3] + + q = Queue.new + z1.then { |*args| q << args } + expect(q.pop).to eq [1, 2] + z1.then { |a, b, c| q << [a, b, c] } + expect(q.pop).to eq [1, 2, nil] + + expect(z1.then { |a, b| a+b }.value).to eq 3 + expect(z1.then { |a, b| a+b }.value).to eq 3 + expect(z1.then(&:+).value).to eq 3 + expect(z2.then { |a, b, c| a+b+c }.value).to eq 6 + end + end + describe 'Future' do it 'has sync and async callbacks' do queue = Queue.new future = Concurrent.future { :value } # executed on FAST_EXECUTOR pool by default future.on_completion(:io) { queue.push(:async) } # async callback overridden to execute on IO_EXECUTOR pool future.on_completion! 
{ queue.push(:sync) } # sync callback executed right after completion in the same thread-pool + future.on_success(:io) { queue.push(:async) } # async callback overridden to execute on IO_EXECUTOR pool + future.on_success! { queue.push(:sync) } # sync callback executed right after completion in the same thread-pool expect(future.value).to eq :value - expect([queue.pop, queue.pop].sort).to eq [:async, :sync] + expect([queue.pop, queue.pop, queue.pop, queue.pop].sort).to eq [:async, :async, :sync, :sync] end it 'chains' do @@ -126,7 +152,7 @@ future4 = future0.chain { |success, value, reason| success } # executed on default FAST_EXECUTOR future5 = future3.with_default_executor(:fast) # connects new future with different executor, the new future is completed when future3 is future6 = future5.then(&:capitalize) # executes on IO_EXECUTOR because default was set to :io on future5 - future7 = Concurrent.join(future0, future3) + future7 = future0 & future3 future8 = future0.rescue { raise 'never happens' } # future0 succeeds so future8'll have same value as future 0 futures = [future0, future1, future2, future3, future4, future5, future6, future7, future8] @@ -179,15 +205,16 @@ branch1 = head.then(&:succ) branch2 = head.then(&:succ).delay.then(&:succ) results = [ - Concurrent.join(branch1, branch2).then { |b1, b2| b1 + b2 }, - branch1.join(branch2).then { |b1, b2| b1 + b2 }, - (branch1 + branch2).then { |b1, b2| b1 + b2 }] + Concurrent.zip(branch1, branch2).then { |b1, b2| b1 + b2 }, + branch1.zip(branch2).then { |b1, b2| b1 + b2 }, + (branch1 & branch2).then { |b1, b2| b1 + b2 }] sleep 0.1 expect(branch1).to be_completed expect(branch2).not_to be_completed expect(results.map(&:value)).to eq [5, 5, 5] + expect(Concurrent.zip(branch1, branch2).value).to eq [2, 3] end it 'has flat map' do @@ -208,8 +235,33 @@ value).to eq 6 end + specify do + expect(Concurrent.future { :v }.value!).to eq :v + end + end +# def synchronize +# if @__mutex__do_not_use_directly.owned? 
+# yield +# else +# @__mutex__do_not_use_directly.synchronize { yield } +# # @__mutex__do_not_use_directly.synchronize do +# # locking = (Thread.current[:locking] ||= []) +# # locking.push self +# # puts "locking #{locking.size}" # : #{locking}" +# # begin +# # yield +# # ensure +# # if locking.size > 2 +# # # binding.pry +# # end +# # locking.pop +# # end +# # end +# end +# end + __END__ puts '-- connecting existing promises' diff --git a/spec/concurrent/synchronization_spec.rb b/spec/concurrent/synchronization_spec.rb index 87f4e199c..5351e473a 100644 --- a/spec/concurrent/synchronization_spec.rb +++ b/spec/concurrent/synchronization_spec.rb @@ -10,6 +10,7 @@ class AClass < Synchronization::Object def initialize(value = nil) super() @Final = value + ns_initialize ensure_ivar_visibility! end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 8630751ce..58ee88bd3 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -5,7 +5,7 @@ logger = Logger.new($stderr) logger.level = Logger::WARN Concurrent.global_logger = lambda do |level, progname, message = nil, &block| - logger.add level, message, progname, &block + logger.add level, message, progname, &block end if ENV['COVERAGE'] || ENV['CI'] || ENV['TRAVIS'] @@ -13,8 +13,8 @@ require 'coveralls' SimpleCov.formatter = SimpleCov::Formatter::MultiFormatter[ - SimpleCov::Formatter::HTMLFormatter, - Coveralls::SimpleCov::Formatter + SimpleCov::Formatter::HTMLFormatter, + Coveralls::SimpleCov::Formatter ] SimpleCov.start do @@ -42,9 +42,4 @@ #TODO: Better configuration management in individual test suites reset_gem_configuration end - - config.after(:each) do - #TODO: Better thread management in individual test suites - kill_rogue_threads(false) - end end diff --git a/spec/support/example_group_extensions.rb b/spec/support/example_group_extensions.rb index 06d746351..a550163dd 100644 --- a/spec/support/example_group_extensions.rb +++ b/spec/support/example_group_extensions.rb @@ -16,40 +16,23 @@ def delta(v1,
v2) include EngineDetector def use_c_extensions? - Concurrent.allow_c_extensions? # from extension_helper.rb - end - - def do_no_reset! - @do_not_reset = true + Concurrent.allow_c_extensions? end GLOBAL_EXECUTORS = [ - [:GLOBAL_FAST_EXECUTOR, -> { Delay.new { Concurrent.new_fast_executor(auto_terminate: true) } }], - [:GLOBAL_IO_EXECUTOR, -> { Delay.new { Concurrent.new_io_executor(auto_terminate: true) } }], - [:GLOBAL_TIMER_SET, -> { Delay.new { Concurrent::TimerSet.new(auto_terminate: true) } }], + [:GLOBAL_FAST_EXECUTOR, -> { Delay.new { Concurrent.new_fast_executor(auto_terminate: true) } }], + [:GLOBAL_IO_EXECUTOR, -> { Delay.new { Concurrent.new_io_executor(auto_terminate: true) } }], + [:GLOBAL_TIMER_SET, -> { Delay.new { Concurrent::TimerSet.new(auto_terminate: true) } }], ] - @@killed = false - def reset_gem_configuration - if @@killed - GLOBAL_EXECUTORS.each do |var, factory| - executor = Concurrent.const_get(var).value! - executor.shutdown - executor.kill - Concurrent.const_set(var, factory.call) - end - @@killed = false - end - end - - def kill_rogue_threads(warning = true) - return if @do_not_reset - warn('[DEPRECATED] brute force thread control being used -- tests need updated') if warning - Thread.list.each do |thread| - thread.kill unless thread == Thread.current + GLOBAL_EXECUTORS.each do |var, factory| + executor = Concurrent.const_get(var).value! + executor.shutdown + executor.wait_for_termination(0.2) + executor.kill + Concurrent.const_set(var, factory.call) end - @@killed = true end def monotonic_interval
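The central behavioral change in this patch is that `MutexObject`, `MonitorObject`, and `RbxObject` no longer call `ns_initialize` from their constructors; subclasses must invoke it themselves, as the new `AbstractObject` doc example shows. A minimal pure-Ruby sketch of that pattern, using only the stdlib `Mutex` and hypothetical class names (`SketchMutexObject`, `SketchCounter`) to stand in for the library's classes, might look like:

```ruby
require 'thread'

# Sketch of the base class after this change: it only sets up the lock
# and condition; it deliberately does NOT call ns_initialize any more.
class SketchMutexObject
  def initialize
    @__lock__      = Mutex.new
    @__condition__ = ConditionVariable.new
  end

  protected

  # Reentrant-style synchronize, mirroring MutexObject's owned? check.
  def synchronize
    if @__lock__.owned?
      yield
    else
      @__lock__.synchronize { yield }
    end
  end

  # Default no-op hook; subclasses override and call it explicitly.
  def ns_initialize(*)
  end
end

# A subclass now runs its own ns_initialize under the lock, following
# the pattern documented in the AbstractObject @example above.
class SketchCounter < SketchMutexObject
  def initialize(start = 0)
    super()
    synchronize { ns_initialize(start) }
  end

  def value
    synchronize { @count }
  end

  def increment
    synchronize { @count += 1 }
  end

  protected

  def ns_initialize(start)
    @count = start
  end
end
```

This keeps initialization inside the critical section without the base class having to forward `*args, &block`, which is why the patch can also delete the `initialize` overrides in `ImmutableStruct` and `JavaPureObject`.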