From d66f6161d78c6509712d75209d48b0854a9b79fb Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Mon, 6 Apr 2015 23:01:48 +0200 Subject: [PATCH 01/16] Add EngineDetector and fix extension_helper path It was polluting global Ruby load space --- Rakefile | 12 +++++----- concurrent-ruby-ext.gemspec | 2 +- ext/concurrent/extconf.rb | 2 +- lib/concurrent/atomic.rb | 9 ++++---- lib/concurrent/atomic/atomic_boolean.rb | 4 ++-- lib/concurrent/atomic/atomic_fixnum.rb | 4 ++-- lib/concurrent/atomic/count_down_latch.rb | 2 +- lib/concurrent/atomic/semaphore.rb | 2 +- lib/concurrent/atomic/thread_local_var.rb | 4 ++-- lib/concurrent/atomic_reference/jruby.rb | 2 +- lib/concurrent/atomic_reference/ruby.rb | 2 +- lib/concurrent/collection/priority_queue.rb | 2 +- lib/concurrent/executor/cached_thread_pool.rb | 2 +- lib/concurrent/executor/executor.rb | 4 ++-- lib/concurrent/executor/fixed_thread_pool.rb | 2 +- .../executor/java_cached_thread_pool.rb | 2 +- .../executor/java_fixed_thread_pool.rb | 2 +- .../executor/java_single_thread_executor.rb | 2 +- .../executor/java_thread_pool_executor.rb | 2 +- .../executor/single_thread_executor.rb | 2 +- .../executor/thread_pool_executor.rb | 2 +- lib/{ => concurrent}/extension_helper.rb | 11 ++++------ lib/concurrent/utility/engine.rb | 22 +++++++++++++++++++ lib/concurrent/utility/monotonic_time.rb | 2 +- lib/concurrent/utility/processor_count.rb | 2 +- spec/concurrent/atomic/atomic_boolean_spec.rb | 4 ++-- spec/concurrent/atomic/atomic_fixnum_spec.rb | 4 ++-- .../atomic/count_down_latch_spec.rb | 4 ++-- spec/concurrent/atomic/semaphore_spec.rb | 4 ++-- .../atomic/thread_local_var_spec.rb | 6 ++--- spec/concurrent/atomic_spec.rb | 6 ++--- .../collection/priority_queue_spec.rb | 4 ++-- .../executor/java_cached_thread_pool_spec.rb | 2 +- .../executor/java_fixed_thread_pool_spec.rb | 2 +- .../java_single_thread_executor_spec.rb | 2 +- .../java_thread_pool_executor_spec.rb | 2 +- .../executor/thread_pool_class_cast_spec.rb | 8 +++---- spec/support/example_group_extensions.rb | 14 ++---------- 38 files changed, 88 insertions(+), 78 deletions(-) rename lib/{ => concurrent}/extension_helper.rb (83%) create mode 100644 lib/concurrent/utility/engine.rb diff --git a/Rakefile b/Rakefile index 735254f29..4e9d644c3 100644 --- a/Rakefile +++ b/Rakefile @@ -1,6 +1,6 @@ #!/usr/bin/env rake -require_relative './lib/extension_helper' +require 'concurrent/extension_helper' ## load the two gemspec files CORE_GEMSPEC = Gem::Specification.load('concurrent-ruby.gemspec') @@ -12,7 +12,7 @@ GEM_NAME = 'concurrent-ruby' EXTENSION_NAME = 'extension' JAVA_EXT_NAME = 'concurrent_ruby_ext' -if Concurrent.jruby? +if Concurrent.on_jruby? CORE_GEM = "#{GEM_NAME}-#{Concurrent::VERSION}-java.gem" else CORE_GEM = "#{GEM_NAME}-#{Concurrent::VERSION}.gem" @@ -35,7 +35,7 @@ Dir.glob('tasks/**/*.rake').each do |rakefile| safe_load rakefile end -if Concurrent.jruby? +if Concurrent.on_jruby? ## create the compile task for the JRuby-specific gem require 'rake/javaextensiontask' @@ -95,7 +95,7 @@ end namespace :build do build_deps = [:clean] - build_deps << :compile if Concurrent.jruby? + build_deps << :compile if Concurrent.on_jruby? desc "Build #{CORE_GEM} into the pkg directory" task :core => build_deps do @@ -103,7 +103,7 @@ namespace :build do sh 'mv *.gem pkg/' end - unless Concurrent.jruby? + unless Concurrent.on_jruby? desc "Build #{EXTENSION_GEM} into the pkg directory" task :ext => [:clean] do sh "gem build #{EXT_GEMSPEC.name}.gemspec" @@ -120,7 +120,7 @@ namespace :build do end end -if Concurrent.jruby? +if Concurrent.on_jruby? desc 'Build JRuby-specific core gem (alias for `build:core`)' task :build => ['build:core'] else diff --git a/concurrent-ruby-ext.gemspec b/concurrent-ruby-ext.gemspec index c0e6040c0..d9598f7a5 100644 --- a/concurrent-ruby-ext.gemspec +++ b/concurrent-ruby-ext.gemspec @@ -20,7 +20,7 @@ Gem::Specification.new do |s| s.files = Dir['ext/**/*.{h,c,cpp}'] s.files += [ - 'lib/extension_helper.rb', + 'lib/concurrent/extension_helper.rb', 'lib/concurrent/atomic_reference/concurrent_update_error.rb', 'lib/concurrent/atomic_reference/direct_update.rb', 'lib/concurrent/atomic_reference/numeric_cas_wrapper.rb', diff --git a/ext/concurrent/extconf.rb b/ext/concurrent/extconf.rb index 76cb706dd..3d7ea5eda 100644 --- a/ext/concurrent/extconf.rb +++ b/ext/concurrent/extconf.rb @@ -1,6 +1,6 @@ require 'fileutils' -require_relative '../../lib/extension_helper' +require 'concurrent/extension_helper' EXTENSION_NAME = 'extension' diff --git a/lib/concurrent/atomic.rb b/lib/concurrent/atomic.rb index d6c2bd0bc..a40fb4e17 100644 --- a/lib/concurrent/atomic.rb +++ b/lib/concurrent/atomic.rb @@ -3,16 +3,17 @@ # user that they should use the new implementation instead. if defined?(Atomic) - warn <<-RUBY + warn <<-TXT [ATOMIC] Detected an `Atomic` class, which may indicate a dependency on the ruby-atomic gem. That gem has been deprecated and merged into the concurrent-ruby gem. Please use the Concurrent::Atomic class for atomic references and not the Atomic class. -RUBY + TXT end ##################################################################### -require_relative '../extension_helper' +require 'concurrent/extension_helper' +require 'concurrent/utility/engine' require 'concurrent/atomic_reference/concurrent_update_error' require 'concurrent/atomic_reference/mutex_atomic' @@ -21,7 +22,7 @@ if /[^0fF]/ =~ ENV['FORCE_ATOMIC_FALLBACK'] ruby_engine = 'mutex_atomic' else - ruby_engine = defined?(RUBY_ENGINE)? RUBY_ENGINE : 'ruby' + ruby_engine = Concurrent.ruby_engine end require "concurrent/atomic_reference/#{ruby_engine}" diff --git a/lib/concurrent/atomic/atomic_boolean.rb b/lib/concurrent/atomic/atomic_boolean.rb index 3bca953d7..915c08a16 100644 --- a/lib/concurrent/atomic/atomic_boolean.rb +++ b/lib/concurrent/atomic/atomic_boolean.rb @@ -1,4 +1,4 @@ -require_relative '../../extension_helper' +require 'concurrent/extension_helper' module Concurrent @@ -113,7 +113,7 @@ def make_false end end - if RUBY_PLATFORM == 'java' + if Concurrent.on_jruby? class AtomicBoolean < JavaAtomicBoolean end diff --git a/lib/concurrent/atomic/atomic_fixnum.rb b/lib/concurrent/atomic/atomic_fixnum.rb index f6968b751..b8807beb4 100644 --- a/lib/concurrent/atomic/atomic_fixnum.rb +++ b/lib/concurrent/atomic/atomic_fixnum.rb @@ -1,4 +1,4 @@ -require_relative '../../extension_helper' +require 'concurrent/extension_helper' module Concurrent @@ -118,7 +118,7 @@ def compare_and_set(expect, update) end end - if RUBY_PLATFORM == 'java' + if Concurrent.on_jruby? # @!macro atomic_fixnum class AtomicFixnum < JavaAtomicFixnum diff --git a/lib/concurrent/atomic/count_down_latch.rb b/lib/concurrent/atomic/count_down_latch.rb index 24a5eb146..f7ad0f650 100644 --- a/lib/concurrent/atomic/count_down_latch.rb +++ b/lib/concurrent/atomic/count_down_latch.rb @@ -69,7 +69,7 @@ def count end end - if RUBY_PLATFORM == 'java' + if Concurrent.on_jruby? # @!macro count_down_latch class JavaCountDownLatch diff --git a/lib/concurrent/atomic/semaphore.rb b/lib/concurrent/atomic/semaphore.rb index b2a5f84de..0d5f2d3e7 100644 --- a/lib/concurrent/atomic/semaphore.rb +++ b/lib/concurrent/atomic/semaphore.rb @@ -150,7 +150,7 @@ def try_acquire_timed(permits, timeout) end end - if RUBY_PLATFORM == 'java' + if Concurrent.on_jruby? # @!macro semaphore # diff --git a/lib/concurrent/atomic/thread_local_var.rb b/lib/concurrent/atomic/thread_local_var.rb index 26e06a5f8..0ee96ccca 100644 --- a/lib/concurrent/atomic/thread_local_var.rb +++ b/lib/concurrent/atomic/thread_local_var.rb @@ -35,7 +35,7 @@ module ThreadLocalRubyStorage protected - unless RUBY_PLATFORM == 'java' + unless Concurrent.on_jruby? require 'ref' end @@ -119,7 +119,7 @@ def bind(value, &block) # @!macro abstract_thread_local_var class ThreadLocalVar < AbstractThreadLocalVar - if RUBY_PLATFORM == 'java' + if Concurrent.on_jruby? include ThreadLocalJavaStorage else include ThreadLocalRubyStorage diff --git a/lib/concurrent/atomic_reference/jruby.rb b/lib/concurrent/atomic_reference/jruby.rb index adb7e23e4..103b8cd8c 100644 --- a/lib/concurrent/atomic_reference/jruby.rb +++ b/lib/concurrent/atomic_reference/jruby.rb @@ -1,4 +1,4 @@ -require_relative '../../extension_helper' +require 'concurrent/extension_helper' if defined?(Concurrent::JavaAtomic) require 'concurrent/atomic_reference/direct_update' diff --git a/lib/concurrent/atomic_reference/ruby.rb b/lib/concurrent/atomic_reference/ruby.rb index 6f9b76631..7ae3f520b 100644 --- a/lib/concurrent/atomic_reference/ruby.rb +++ b/lib/concurrent/atomic_reference/ruby.rb @@ -1,5 +1,5 @@ if defined? Concurrent::CAtomic - require_relative '../../extension_helper' + require 'concurrent/extension_helper' require 'concurrent/atomic_reference/direct_update' require 'concurrent/atomic_reference/numeric_cas_wrapper' diff --git a/lib/concurrent/collection/priority_queue.rb b/lib/concurrent/collection/priority_queue.rb index cbf3c3c7d..03d5a3fdd 100644 --- a/lib/concurrent/collection/priority_queue.rb +++ b/lib/concurrent/collection/priority_queue.rb @@ -219,7 +219,7 @@ def swim(k) end end - if RUBY_PLATFORM == 'java' + if Concurrent.on_jruby? # @!macro priority_queue class JavaPriorityQueue diff --git a/lib/concurrent/executor/cached_thread_pool.rb b/lib/concurrent/executor/cached_thread_pool.rb index 46dd7a341..ab50f8ce0 100644 --- a/lib/concurrent/executor/cached_thread_pool.rb +++ b/lib/concurrent/executor/cached_thread_pool.rb @@ -2,7 +2,7 @@ module Concurrent - if RUBY_PLATFORM == 'java' + if Concurrent.on_jruby? require 'concurrent/executor/java_cached_thread_pool' # @!macro [attach] cached_thread_pool # A thread pool that dynamically grows and shrinks to fit the current workload. diff --git a/lib/concurrent/executor/executor.rb b/lib/concurrent/executor/executor.rb index 46ef26a06..7ed5bba62 100644 --- a/lib/concurrent/executor/executor.rb +++ b/lib/concurrent/executor/executor.rb @@ -69,7 +69,7 @@ def auto_terminate? def enable_at_exit_handler!(opts = {}) if opts.fetch(:stop_on_exit, true) @auto_terminate = true - if RUBY_PLATFORM == 'ruby' + if Concurrent.on_cruby? create_mri_at_exit_handler!(self.object_id) else create_at_exit_handler!(self) @@ -270,7 +270,7 @@ def kill_execution end end - if RUBY_PLATFORM == 'java' + if Concurrent.on_jruby? module JavaExecutor include Executor diff --git a/lib/concurrent/executor/fixed_thread_pool.rb b/lib/concurrent/executor/fixed_thread_pool.rb index e9e0c2e64..c37ce6dda 100644 --- a/lib/concurrent/executor/fixed_thread_pool.rb +++ b/lib/concurrent/executor/fixed_thread_pool.rb @@ -2,7 +2,7 @@ module Concurrent - if RUBY_PLATFORM == 'java' + if Concurrent.on_jruby? require 'concurrent/executor/java_fixed_thread_pool' diff --git a/lib/concurrent/executor/java_cached_thread_pool.rb b/lib/concurrent/executor/java_cached_thread_pool.rb index 3a067d740..eec042702 100644 --- a/lib/concurrent/executor/java_cached_thread_pool.rb +++ b/lib/concurrent/executor/java_cached_thread_pool.rb @@ -1,4 +1,4 @@ -if RUBY_PLATFORM == 'java' +if Concurrent.on_jruby? require 'concurrent/executor/java_thread_pool_executor' diff --git a/lib/concurrent/executor/java_fixed_thread_pool.rb b/lib/concurrent/executor/java_fixed_thread_pool.rb index d3d349b89..df7eead5b 100644 --- a/lib/concurrent/executor/java_fixed_thread_pool.rb +++ b/lib/concurrent/executor/java_fixed_thread_pool.rb @@ -1,4 +1,4 @@ -if RUBY_PLATFORM == 'java' +if Concurrent.on_jruby? require 'concurrent/executor/java_thread_pool_executor' diff --git a/lib/concurrent/executor/java_single_thread_executor.rb b/lib/concurrent/executor/java_single_thread_executor.rb index 62691324b..6fc3134a3 100644 --- a/lib/concurrent/executor/java_single_thread_executor.rb +++ b/lib/concurrent/executor/java_single_thread_executor.rb @@ -1,4 +1,4 @@ -if RUBY_PLATFORM == 'java' +if Concurrent.on_jruby? require_relative 'executor' module Concurrent diff --git a/lib/concurrent/executor/java_thread_pool_executor.rb b/lib/concurrent/executor/java_thread_pool_executor.rb index 21af2b537..9a5ad34b8 100644 --- a/lib/concurrent/executor/java_thread_pool_executor.rb +++ b/lib/concurrent/executor/java_thread_pool_executor.rb @@ -1,4 +1,4 @@ -if RUBY_PLATFORM == 'java' +if Concurrent.on_jruby? require_relative 'executor' module Concurrent diff --git a/lib/concurrent/executor/single_thread_executor.rb b/lib/concurrent/executor/single_thread_executor.rb index 5d617f7ad..85fa2e5ec 100644 --- a/lib/concurrent/executor/single_thread_executor.rb +++ b/lib/concurrent/executor/single_thread_executor.rb @@ -2,7 +2,7 @@ module Concurrent - if RUBY_PLATFORM == 'java' + if Concurrent.on_jruby? require 'concurrent/executor/java_single_thread_executor' diff --git a/lib/concurrent/executor/thread_pool_executor.rb b/lib/concurrent/executor/thread_pool_executor.rb index 3c234e44b..d97e9f09f 100644 --- a/lib/concurrent/executor/thread_pool_executor.rb +++ b/lib/concurrent/executor/thread_pool_executor.rb @@ -2,7 +2,7 @@ module Concurrent - if RUBY_PLATFORM == 'java' + if Concurrent.on_jruby? require 'concurrent/executor/java_thread_pool_executor' # @!macro [attach] thread_pool_executor # diff --git a/lib/extension_helper.rb b/lib/concurrent/extension_helper.rb similarity index 83% rename from lib/extension_helper.rb rename to lib/concurrent/extension_helper.rb index 8ddf1bb1b..23b79aac4 100644 --- a/lib/extension_helper.rb +++ b/lib/concurrent/extension_helper.rb @@ -1,3 +1,5 @@ +require 'concurrent/utility/engine' + module Concurrent @@c_ext_loaded ||= false @@ -5,12 +7,7 @@ module Concurrent # @!visibility private def self.allow_c_extensions? - defined?(RUBY_ENGINE) && RUBY_ENGINE == 'ruby' - end - - # @!visibility private - def self.jruby? - RUBY_PLATFORM == 'java' + on_cruby? end if allow_c_extensions? && !@@c_ext_loaded @@ -26,7 +23,7 @@ def self.jruby? warn 'Performance on MRI may be improved with the concurrent-ruby-ext gem. Please see http://concurrent-ruby.com' end end - elsif jruby? && !@@java_ext_loaded + elsif on_jruby? && !@@java_ext_loaded begin require 'concurrent_ruby_ext' @@java_ext_loaded = true diff --git a/lib/concurrent/utility/engine.rb b/lib/concurrent/utility/engine.rb new file mode 100644 index 000000000..0df014d9c --- /dev/null +++ b/lib/concurrent/utility/engine.rb @@ -0,0 +1,22 @@ +module Concurrent + + module EngineDetector + def on_jruby? + ruby_engine == 'jruby' + end + + def on_cruby? + ruby_engine == 'ruby' + end + + def on_rbx? + ruby_engine == 'rbx' + end + + def ruby_engine + defined?(RUBY_ENGINE) ? RUBY_ENGINE : 'ruby' + end + end + + extend EngineDetector +end diff --git a/lib/concurrent/utility/monotonic_time.rb b/lib/concurrent/utility/monotonic_time.rb index 4a035ee2e..3dd493484 100644 --- a/lib/concurrent/utility/monotonic_time.rb +++ b/lib/concurrent/utility/monotonic_time.rb @@ -10,7 +10,7 @@ module Concurrent def get_time Process.clock_gettime(Process::CLOCK_MONOTONIC) end - elsif RUBY_PLATFORM == 'java' + elsif Concurrent.on_jruby? # @!visibility private def get_time java.lang.System.nanoTime() / 1_000_000_000.0 diff --git a/lib/concurrent/utility/processor_count.rb b/lib/concurrent/utility/processor_count.rb index b168e1d16..56dccb20e 100644 --- a/lib/concurrent/utility/processor_count.rb +++ b/lib/concurrent/utility/processor_count.rb @@ -71,7 +71,7 @@ def physical_processor_count private def compute_processor_count - if RUBY_PLATFORM == 'java' + if Concurrent.on_jruby? java.lang.Runtime.getRuntime.availableProcessors else os_name = RbConfig::CONFIG["target_os"] diff --git a/spec/concurrent/atomic/atomic_boolean_spec.rb b/spec/concurrent/atomic/atomic_boolean_spec.rb index b0d3e6406..e42bcb0d2 100644 --- a/spec/concurrent/atomic/atomic_boolean_spec.rb +++ b/spec/concurrent/atomic/atomic_boolean_spec.rb @@ -156,7 +156,7 @@ module Concurrent end end - if TestHelpers.jruby? + if Concurrent.on_jruby? describe JavaAtomicBoolean do it_should_behave_like :atomic_boolean @@ -170,7 +170,7 @@ module Concurrent end end - if TestHelpers.jruby? + if Concurrent.on_jruby? it 'inherits from JavaAtomicBoolean' do expect(AtomicBoolean.ancestors).to include(JavaAtomicBoolean) end diff --git a/spec/concurrent/atomic/atomic_fixnum_spec.rb b/spec/concurrent/atomic/atomic_fixnum_spec.rb index 959651ec7..075ceea34 100644 --- a/spec/concurrent/atomic/atomic_fixnum_spec.rb +++ b/spec/concurrent/atomic/atomic_fixnum_spec.rb @@ -170,7 +170,7 @@ module Concurrent end end - if TestHelpers.jruby? + if Concurrent.on_jruby? describe JavaAtomicFixnum do it_should_behave_like :atomic_fixnum @@ -184,7 +184,7 @@ module Concurrent end end - if TestHelpers.jruby? + if Concurrent.on_jruby? it 'inherits from JavaAtomicFixnum' do expect(AtomicFixnum.ancestors).to include(JavaAtomicFixnum) end diff --git a/spec/concurrent/atomic/count_down_latch_spec.rb b/spec/concurrent/atomic/count_down_latch_spec.rb index e6967e33d..e0b3a966c 100644 --- a/spec/concurrent/atomic/count_down_latch_spec.rb +++ b/spec/concurrent/atomic/count_down_latch_spec.rb @@ -132,7 +132,7 @@ def subject.simulate_spurious_wake_up end end - if TestHelpers.jruby? + if Concurrent.on_jruby? describe JavaCountDownLatch do @@ -141,7 +141,7 @@ def subject.simulate_spurious_wake_up end describe CountDownLatch do - if jruby? + if Concurrent.on_jruby? it 'inherits from JavaCountDownLatch' do expect(CountDownLatch.ancestors).to include(JavaCountDownLatch) end diff --git a/spec/concurrent/atomic/semaphore_spec.rb b/spec/concurrent/atomic/semaphore_spec.rb index af105d9f2..8330df0cb 100644 --- a/spec/concurrent/atomic/semaphore_spec.rb +++ b/spec/concurrent/atomic/semaphore_spec.rb @@ -148,14 +148,14 @@ def subject.simulate_spurious_wake_up end end - if TestHelpers.jruby? + if Concurrent.on_jruby? describe JavaSemaphore do it_should_behave_like :semaphore end end describe Semaphore do - if jruby? + if Concurrent.on_jruby? it 'inherits from JavaSemaphore' do expect(Semaphore.ancestors).to include(JavaSemaphore) end diff --git a/spec/concurrent/atomic/thread_local_var_spec.rb b/spec/concurrent/atomic/thread_local_var_spec.rb index f7f042689..e769fabaa 100644 --- a/spec/concurrent/atomic/thread_local_var_spec.rb +++ b/spec/concurrent/atomic/thread_local_var_spec.rb @@ -28,7 +28,7 @@ module Concurrent expect(t2.value).to eq 14 end - if jruby? + if Concurrent.on_jruby? it 'uses ThreadLocalJavaStorage' do expect(subject.class.ancestors).to include(Concurrent::AbstractThreadLocalVar::ThreadLocalJavaStorage) end @@ -39,7 +39,7 @@ module Concurrent end end - unless jruby? + unless Concurrent.on_jruby? context 'GC' do it 'does not leave values behind when bind is used' do var = ThreadLocalVar.new(0) @@ -51,7 +51,7 @@ module Concurrent end it 'does not leave values behind when bind is not used' do - skip 'GC.run works reliably only on MRI' unless mri? # TODO + skip 'GC.run works reliably only on MRI' unless Concurrent.on_cruby? # TODO result = 7.times.any? do |i| var = ThreadLocalVar.new(0) diff --git a/spec/concurrent/atomic_spec.rb b/spec/concurrent/atomic_spec.rb index 513e653c1..cb23c0106 100644 --- a/spec/concurrent/atomic_spec.rb +++ b/spec/concurrent/atomic_spec.rb @@ -150,15 +150,15 @@ module Concurrent end describe Atomic do - if TestHelpers.jruby? + if Concurrent.on_jruby? it 'inherits from JavaAtomic' do expect(Atomic.ancestors).to include(Concurrent::JavaAtomic) end - elsif TestHelpers.use_c_extensions? + elsif Concurrent.allow_c_extensions? it 'inherits from CAtomic' do expect(Atomic.ancestors).to include(Concurrent::CAtomic) end - elsif TestHelpers.rbx? + elsif Concurrent.on_rbx? it 'inherits from RbxAtomic' do expect(Atomic.ancestors).to include(Concurrent::RbxAtomic) end diff --git a/spec/concurrent/collection/priority_queue_spec.rb b/spec/concurrent/collection/priority_queue_spec.rb index e3bf8e499..ae5b16256 100644 --- a/spec/concurrent/collection/priority_queue_spec.rb +++ b/spec/concurrent/collection/priority_queue_spec.rb @@ -293,7 +293,7 @@ module Concurrent it_should_behave_like :priority_queue end - if TestHelpers.jruby? + if Concurrent.on_jruby? describe JavaPriorityQueue do @@ -302,7 +302,7 @@ module Concurrent end describe PriorityQueue do - if jruby? + if Concurrent.on_jruby? it 'inherits from JavaPriorityQueue' do expect(PriorityQueue.ancestors).to include(JavaPriorityQueue) end diff --git a/spec/concurrent/executor/java_cached_thread_pool_spec.rb b/spec/concurrent/executor/java_cached_thread_pool_spec.rb index 08523cf4d..041385a82 100644 --- a/spec/concurrent/executor/java_cached_thread_pool_spec.rb +++ b/spec/concurrent/executor/java_cached_thread_pool_spec.rb @@ -1,4 +1,4 @@ -if Concurrent::TestHelpers.jruby? +if Concurrent.on_jruby? require_relative 'cached_thread_pool_shared' diff --git a/spec/concurrent/executor/java_fixed_thread_pool_spec.rb b/spec/concurrent/executor/java_fixed_thread_pool_spec.rb index 12f585a31..efb11f07c 100644 --- a/spec/concurrent/executor/java_fixed_thread_pool_spec.rb +++ b/spec/concurrent/executor/java_fixed_thread_pool_spec.rb @@ -1,4 +1,4 @@ -if Concurrent::TestHelpers.jruby? +if Concurrent.on_jruby? require_relative 'fixed_thread_pool_shared' diff --git a/spec/concurrent/executor/java_single_thread_executor_spec.rb b/spec/concurrent/executor/java_single_thread_executor_spec.rb index c88a021fe..41e8404e5 100644 --- a/spec/concurrent/executor/java_single_thread_executor_spec.rb +++ b/spec/concurrent/executor/java_single_thread_executor_spec.rb @@ -1,4 +1,4 @@ -if Concurrent::TestHelpers.jruby? +if Concurrent.on_jruby? require_relative 'executor_service_shared' diff --git a/spec/concurrent/executor/java_thread_pool_executor_spec.rb b/spec/concurrent/executor/java_thread_pool_executor_spec.rb index 09209416b..c1c207039 100644 --- a/spec/concurrent/executor/java_thread_pool_executor_spec.rb +++ b/spec/concurrent/executor/java_thread_pool_executor_spec.rb @@ -1,4 +1,4 @@ -if Concurrent::TestHelpers.jruby? +if Concurrent.on_jruby? require_relative 'thread_pool_executor_shared' diff --git a/spec/concurrent/executor/thread_pool_class_cast_spec.rb b/spec/concurrent/executor/thread_pool_class_cast_spec.rb index ad8575d89..18a676175 100644 --- a/spec/concurrent/executor/thread_pool_class_cast_spec.rb +++ b/spec/concurrent/executor/thread_pool_class_cast_spec.rb @@ -1,7 +1,7 @@ module Concurrent describe SingleThreadExecutor do - if jruby? + if Concurrent.on_jruby? it 'inherits from JavaSingleThreadExecutor' do expect(SingleThreadExecutor.ancestors).to include(JavaSingleThreadExecutor) end @@ -13,7 +13,7 @@ module Concurrent end describe ThreadPoolExecutor do - if jruby? + if Concurrent.on_jruby? it 'inherits from JavaThreadPoolExecutor' do expect(ThreadPoolExecutor.ancestors).to include(JavaThreadPoolExecutor) end @@ -25,7 +25,7 @@ module Concurrent end describe CachedThreadPool do - if jruby? + if Concurrent.on_jruby? it 'inherits from JavaCachedThreadPool' do expect(CachedThreadPool.ancestors).to include(JavaCachedThreadPool) end @@ -37,7 +37,7 @@ module Concurrent end describe FixedThreadPool do - if jruby? + if Concurrent.on_jruby? it 'inherits from JavaFixedThreadPool' do expect(FixedThreadPool.ancestors).to include(JavaFixedThreadPool) end diff --git a/spec/support/example_group_extensions.rb b/spec/support/example_group_extensions.rb index f97a0d7ef..fc90bff78 100644 --- a/spec/support/example_group_extensions.rb +++ b/spec/support/example_group_extensions.rb @@ -1,5 +1,5 @@ require 'rbconfig' -require_relative '../../lib/extension_helper.rb' +require 'concurrent/extension_helper.rb' module Concurrent module TestHelpers @@ -13,17 +13,7 @@ def delta(v1, v2) return (v1 - v2).abs end - def mri? - RUBY_ENGINE == 'ruby' - end - - def jruby? - RUBY_ENGINE == 'jruby' - end - - def rbx? - RUBY_ENGINE == 'rbx' - end + include EngineDetector def use_c_extensions? Concurrent.allow_c_extensions? # from extension_helper.rb From 26258637de4cdf9f332e65d842a2c2f5c762c1a5 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Tue, 7 Apr 2015 12:34:04 +0200 Subject: [PATCH 02/16] Add synchronization layer --- lib/concurrent.rb | 1 + lib/concurrent/actor.rb | 2 +- lib/concurrent/actor/core.rb | 4 +- lib/concurrent/atomic/synchronization.rb | 51 ------ lib/concurrent/atomics.rb | 1 - .../executor/serialized_execution.rb | 6 +- lib/concurrent/synchronized_object.rb | 159 ++++++++++++++++++ 7 files changed, 166 insertions(+), 58 deletions(-) delete mode 100644 lib/concurrent/atomic/synchronization.rb create mode 100644 lib/concurrent/synchronized_object.rb diff --git a/lib/concurrent.rb b/lib/concurrent.rb index 48af73a6b..51a690d8f 100644 --- a/lib/concurrent.rb +++ b/lib/concurrent.rb @@ -9,6 +9,7 @@ require 'concurrent/errors' require 'concurrent/executors' require 'concurrent/utilities' +require 'concurrent/synchronized_object' require 'concurrent/atomic' require 'concurrent/agent' diff --git a/lib/concurrent/actor.rb b/lib/concurrent/actor.rb index 6ff250456..7eb4fb6ed 100644 --- a/lib/concurrent/actor.rb +++ b/lib/concurrent/actor.rb @@ -3,7 +3,7 @@ require 'concurrent/executor/serialized_execution' require 'concurrent/ivar' require 'concurrent/logging' -require 'concurrent/atomic/synchronization' +require 'concurrent/synchronized_object' module Concurrent # TODO https://github.com/celluloid/celluloid/wiki/Supervision-Groups ? diff --git a/lib/concurrent/actor/core.rb b/lib/concurrent/actor/core.rb index 49718b7c5..aafc0cf23 100644 --- a/lib/concurrent/actor/core.rb +++ b/lib/concurrent/actor/core.rb @@ -9,10 +9,9 @@ module Actor # @note devel: core should not block on anything, e.g. it cannot wait on # children to terminate that would eat up all threads in task pool and # deadlock - class Core + class Core < SynchronizedObject include TypeCheck include Concurrent::Logging - include Synchronization # @!attribute [r] reference # @return [Reference] reference to this actor which can be safely passed around @@ -48,6 +47,7 @@ class Core # any logging system # @param [Proc] block for class instantiation def initialize(opts = {}, &block) + super(&nil) synchronize do @mailbox = Array.new @serialized_execution = SerializedExecution.new diff --git a/lib/concurrent/atomic/synchronization.rb b/lib/concurrent/atomic/synchronization.rb deleted file mode 100644 index c236ea41c..000000000 --- a/lib/concurrent/atomic/synchronization.rb +++ /dev/null @@ -1,51 +0,0 @@ -module Concurrent - - # Safe synchronization under JRuby, prevents reading uninitialized @mutex variable. - # @note synchronized needs to be called in #initialize for this module to work properly - # @example usage - # class AClass - # include Synchronized - # - # def initialize - # synchronize do - # # body of the constructor ... - # end - # end - # - # def a_method - # synchronize do - # # body of a_method ... - # end - # end - # end - module Synchronization - - engine = defined?(RUBY_ENGINE) && RUBY_ENGINE - - case engine - when 'jruby' - require 'jruby' - - def synchronize - JRuby.reference0(self).synchronized { yield } - end - - when 'rbx' - - def synchronize - Rubinius.lock(self) - yield - ensure - Rubinius.unlock(self) - end - - else - - def synchronize - @mutex ||= Mutex.new - @mutex.synchronize { yield } - end - - end - end -end diff --git a/lib/concurrent/atomics.rb b/lib/concurrent/atomics.rb index 1c812ac00..1b2af04a0 100644 --- a/lib/concurrent/atomics.rb +++ b/lib/concurrent/atomics.rb @@ -8,6 +8,5 @@ require 'concurrent/atomic/count_down_latch' require 'concurrent/atomic/event' require 'concurrent/atomic/read_write_lock' -require 'concurrent/atomic/synchronization' require 'concurrent/atomic/semaphore' require 'concurrent/atomic/thread_local_var' diff --git a/lib/concurrent/executor/serialized_execution.rb b/lib/concurrent/executor/serialized_execution.rb index aee2b8d46..1f3825f88 100644 --- a/lib/concurrent/executor/serialized_execution.rb +++ b/lib/concurrent/executor/serialized_execution.rb @@ -1,14 +1,13 @@ require 'delegate' require 'concurrent/executor/executor' require 'concurrent/logging' -require 'concurrent/atomic/synchronization' +require 'concurrent/synchronized_object' module Concurrent # Ensures passed jobs in a serialized order never running at the same time. - class SerializedExecution + class SerializedExecution < SynchronizedObject include Logging - include Synchronization Job = Struct.new(:executor, :args, :block) do def call @@ -17,6 +16,7 @@ def call end def initialize + super(&nil) synchronize do @being_executed = false @stash = [] diff --git a/lib/concurrent/synchronized_object.rb b/lib/concurrent/synchronized_object.rb new file mode 100644 index 000000000..3d12b9264 --- /dev/null +++ b/lib/concurrent/synchronized_object.rb @@ -0,0 +1,159 @@ +module Concurrent + + # Safe synchronization under any Ruby implementation + # Provides a single layer which can improve its implementation over time without changes needed to + # the classes using it. Use {SynchronizedObject} not this abstract class. + # @example + # class AnClass < SynchronizedObject + # def initialize + # super + # synchronize { @value = 'asd' } + # end + # + # def value + # synchronize { @value } + # end + # end + class AbstractSynchronizedObject + + # @abstract for helper ivar initialization if needed, + # otherwise it can be left empty. + def initialize + raise NotImplementedError + end + + # @yield runs the block synchronized against this object, + # equvivalent of java's `synchronize(this) {}` + def synchronize + raise NotImplementedError + end + + # wait until another thread calls #signal or #broadcast, + # spurious wake-ups can happen. + # @param [Numeric, nil] timeout in seconds, `nil` means no timeout + def wait(timeout = nil) + synchronize { ns_wait(timeout) } + end + + # Wait until condition is met or timeout passes, + # protects against spurious wake-ups. + # @param [Numeric, nil] timeout in seconds, `nil` means no timeout + # @yield condition to be met + # @yieldreturn [true, false] + def wait_until(timeout = nil, &condition) + synchronize { ns_wait_until(timeout, &condition) } + end + + # signal one waiting thread + def signal + synchronize { ns_signal } + end + + # broadcast to all waiting threads + def broadcast + synchronize { ns_broadcast } + end + + private + + # @yield condition + def ns_wait_until(timeout, &condition) + if timeout + wait_until = Concurrent.monotonic_time + timeout + ns_wait timeout + while (now = Concurrent.monotonic_time) < wait_until + ns_wait wait_until - now + end + condition.call + else + ns_wait timeout until condition.call + true + end + end + + def ns_wait(timeout) + raise NotImplementedError + end + + def ns_signal + raise NotImplementedError + end + + def ns_broadcast + raise NotImplementedError + end + + end + + begin + require 'jruby' + + # roughly more than 2x faster + class JavaSynchronizedObject < AbstractSynchronizedObject + def initialize + end + + def synchronize + JRuby.reference0(self).synchronized { yield } + end + + private + + def ns_wait(timeout) + if timeout + JRuby.reference0(self).wait(timeout * 1000) + else + JRuby.reference0(self).wait + end + end + + def ns_broadcast + JRuby.reference0(self).notifyAll + end + + def ns_signal + JRuby.reference0(self).notify + end + end + rescue LoadError + # ignore + end + + class RubySynchronizedObject < AbstractSynchronizedObject + def initialize + @__mutex__do_not_use_directly = Mutex.new + @__condition__do_not_use_directly = ConditionVariable.new + end + + def synchronize + if @__mutex__do_not_use_directly.owned? + yield + else + @__mutex__do_not_use_directly.synchronize { yield } + end + end + + private + + def ns_signal + @__condition__do_not_use_directly.signal + end + + def ns_broadcast + @__condition__do_not_use_directly.broadcast + end + + def ns_wait(timeout) + @__condition__do_not_use_directly.wait @__mutex__do_not_use_directly, timeout + end + end + + # TODO add rbx implementation + SynchronizedObject = Class.new case + when Concurrent.on_jruby? + JavaSynchronizedObject + else + RubySynchronizedObject + end + +end From 93a4fc27a430db4b8fa10ff92164b04c80e1df5c Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Tue, 7 Apr 2015 12:44:38 +0200 Subject: [PATCH 03/16] Migrate Event to SynchronizedObject --- lib/concurrent/atomic/event.rb | 75 ++++++++++++---------------- spec/concurrent/atomic/event_spec.rb | 6 +-- 2 files changed, 35 insertions(+), 46 deletions(-) diff --git a/lib/concurrent/atomic/event.rb b/lib/concurrent/atomic/event.rb index b5393a73d..13719244d 100644 --- a/lib/concurrent/atomic/event.rb +++ b/lib/concurrent/atomic/event.rb @@ -1,5 +1,5 @@ require 'thread' -require 'concurrent/atomic/condition' +require 'concurrent/synchronized_object' module Concurrent @@ -13,24 +13,23 @@ module Concurrent # `#reset` at any time once it has been set. # # @see http://msdn.microsoft.com/en-us/library/windows/desktop/ms682655.aspx - class Event + class Event < SynchronizedObject # Creates a new `Event` in the unset state. Threads calling `#wait` on the # `Event` will block. def initialize - @set = false - @mutex = Mutex.new - @condition = Condition.new + super + synchronize do + @set = false + @iteration = 0 + end end # Is the object in the set state? # # @return [Boolean] indicating whether or not the `Event` has been set def set? - @mutex.lock - @set - ensure - @mutex.unlock + synchronize { @set } end # Trigger the event, setting the state to `set` and releasing all threads @@ -38,29 +37,11 @@ def set? # # @return [Boolean] should always return `true` def set - @mutex.lock - unless @set - @set = true - @condition.broadcast - end - true - ensure - @mutex.unlock + synchronize { ns_set } end def try? - @mutex.lock - - if @set - false - else - @set = true - @condition.broadcast - true - end - - ensure - @mutex.unlock + synchronize { @set ? false : ns_set } end # Reset a previously set event back to the `unset` state. @@ -68,11 +49,13 @@ def try? # # @return [Boolean] should always return `true` def reset - @mutex.lock - @set = false - true - ensure - @mutex.unlock + synchronize do + if @set + @set = false + @iteration +=1 + end + true + end end # Wait a given number of seconds for the `Event` to be set by another @@ -81,18 +64,24 @@ def reset # # @return [Boolean] true if the `Event` was set before timeout else false def wait(timeout = nil) - @mutex.lock - - unless @set - remaining = Condition::Result.new(timeout) - while !@set && remaining.can_wait? - remaining = @condition.wait(@mutex, remaining.remaining_time) + synchronize do + unless @set + iteration = @iteration + ns_wait_until(timeout) { iteration != @iteration || @set } + else + true end end + end - @set - ensure - @mutex.unlock + private + + def ns_set + unless @set + @set = true + ns_broadcast + end + true end end end diff --git a/spec/concurrent/atomic/event_spec.rb b/spec/concurrent/atomic/event_spec.rb index 4a7a102c0..7b6142f6b 100644 --- a/spec/concurrent/atomic/event_spec.rb +++ b/spec/concurrent/atomic/event_spec.rb @@ -147,9 +147,9 @@ module Concurrent before(:each) do def subject.simulate_spurious_wake_up - @mutex.synchronize do - @condition.signal - @condition.broadcast + synchronize do + ns_signal + ns_broadcast end end end From d4a21ea5672c2129f8c94e6a777d4a5776365c37 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Thu, 9 Apr 2015 16:01:23 +0200 Subject: [PATCH 04/16] Fix ns_wait_until check the condition at the begging too --- lib/concurrent/synchronized_object.rb | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/lib/concurrent/synchronized_object.rb b/lib/concurrent/synchronized_object.rb index 3d12b9264..8cbaacc7f 100644 --- a/lib/concurrent/synchronized_object.rb +++ b/lib/concurrent/synchronized_object.rb @@ -60,11 +60,12 @@ def broadcast def ns_wait_until(timeout, &condition) if timeout wait_until = Concurrent.monotonic_time + timeout - ns_wait timeout - while (now = Concurrent.monotonic_time) < wait_until + while true + now = Concurrent.monotonic_time + condition_result = condition.call + return condition_result if now >= wait_until || condition_result ns_wait wait_until - now end - condition.call else ns_wait timeout until condition.call true From 2e2a464cdf8ce10014d57a74732a21e7c80d87ea Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Thu, 9 Apr 2015 20:10:19 +0200 Subject: [PATCH 05/16] Support 1.9.3 Ruby --- lib/concurrent/synchronized_object.rb | 32 ++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/lib/concurrent/synchronized_object.rb b/lib/concurrent/synchronized_object.rb index 8cbaacc7f..a24a6d704 100644 --- a/lib/concurrent/synchronized_object.rb +++ b/lib/concurrent/synchronized_object.rb @@ -61,7 +61,7 @@ def ns_wait_until(timeout, &condition) if timeout wait_until = Concurrent.monotonic_time + timeout while true - now = Concurrent.monotonic_time + now = Concurrent.monotonic_time condition_result = condition.call return condition_result if now >= wait_until || condition_result ns_wait wait_until - now @@ -122,15 +122,15 @@ def ns_signal class RubySynchronizedObject < AbstractSynchronizedObject def initialize - @__mutex__do_not_use_directly = Mutex.new + @__lock__do_not_use_directly = Mutex.new @__condition__do_not_use_directly = ConditionVariable.new end def synchronize - if @__mutex__do_not_use_directly.owned? + if @__lock__do_not_use_directly.owned? yield else - @__mutex__do_not_use_directly.synchronize { yield } + @__lock__do_not_use_directly.synchronize { yield } end end @@ -145,7 +145,24 @@ def ns_broadcast end def ns_wait(timeout) - @__condition__do_not_use_directly.wait @__mutex__do_not_use_directly, timeout + @__condition__do_not_use_directly.wait @__lock__do_not_use_directly, timeout + end + end + + class Ruby19SynchronizedObject < RubySynchronizedObject + def initialize + @__lock__do_not_use_directly = Monitor.new + @__condition__do_not_use_directly = @__lock__do_not_use_directly.new_cond + end + + def synchronize + @__lock__do_not_use_directly.synchronize { yield } + end + + private + + def ns_wait(timeout) + @__condition__do_not_use_directly.wait timeout end end @@ -153,8 +170,11 @@ def ns_wait(timeout) SynchronizedObject = Class.new case when Concurrent.on_jruby? JavaSynchronizedObject + when Concurrent.on_cruby? && (RUBY_VERSION.split('.').map(&:to_i) <=> [1, 9, 3]) >= 0 + Ruby19SynchronizedObject + when Concurrent.on_cruby? + RubySynchronizedObject else RubySynchronizedObject end - end From 41daaf655d95c5f5d0d13210ca170ad36d098c23 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Fri, 10 Apr 2015 10:08:11 +0200 Subject: [PATCH 06/16] Fix rounding error --- lib/concurrent/atomic/event.rb | 2 +- lib/concurrent/synchronized_object.rb | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/lib/concurrent/atomic/event.rb b/lib/concurrent/atomic/event.rb index 13719244d..3f53c8f5b 100644 --- a/lib/concurrent/atomic/event.rb +++ b/lib/concurrent/atomic/event.rb @@ -67,7 +67,7 @@ def wait(timeout = nil) synchronize do unless @set iteration = @iteration - ns_wait_until(timeout) { iteration != @iteration || @set } + ns_wait_until(timeout) { iteration < @iteration || @set } else true end diff --git a/lib/concurrent/synchronized_object.rb b/lib/concurrent/synchronized_object.rb index a24a6d704..93fcb66a5 100644 --- a/lib/concurrent/synchronized_object.rb +++ b/lib/concurrent/synchronized_object.rb @@ -28,6 +28,8 @@ def synchronize raise NotImplementedError end + private + # wait until another thread calls #signal or #broadcast, # spurious wake-ups can happen. # @param [Numeric, nil] timeout in seconds, `nil` means no timeout @@ -54,8 +56,6 @@ def broadcast synchronize { ns_broadcast } end - private - # @yield condition def ns_wait_until(timeout, &condition) if timeout @@ -63,7 +63,9 @@ def ns_wait_until(timeout, &condition) while true now = Concurrent.monotonic_time condition_result = condition.call - return condition_result if now >= wait_until || condition_result + # 0.001 correction to avoid error when `wait_until - now` is smaller than 0.0005 and rounded to 0 + # when passed to java #wait(long timeout) + return condition_result if (now + 0.001) >= wait_until || condition_result ns_wait wait_until - now end else @@ -123,7 +125,7 @@ def ns_signal class RubySynchronizedObject < AbstractSynchronizedObject def initialize @__lock__do_not_use_directly = Mutex.new - @__condition__do_not_use_directly = ConditionVariable.new + @__condition__do_not_use_directly = ::ConditionVariable.new end def synchronize From 07ee51f14253289b59fa27d92afaee8bd90c2db3 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Fri, 10 Apr 2015 12:55:27 +0200 Subject: [PATCH 07/16] Add basic rbx support --- lib/concurrent/synchronized_object.rb | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/lib/concurrent/synchronized_object.rb b/lib/concurrent/synchronized_object.rb index 93fcb66a5..a64eb7afa 100644 --- a/lib/concurrent/synchronized_object.rb +++ b/lib/concurrent/synchronized_object.rb @@ -122,7 +122,7 @@ def ns_signal # ignore end - class RubySynchronizedObject < AbstractSynchronizedObject + class MutexSynchronizedObject < AbstractSynchronizedObject def initialize @__lock__do_not_use_directly = Mutex.new @__condition__do_not_use_directly = ::ConditionVariable.new @@ -151,7 +151,7 @@ def ns_wait(timeout) end end - class Ruby19SynchronizedObject < RubySynchronizedObject + class MonitorSynchronizedObject < MutexSynchronizedObject def initialize @__lock__do_not_use_directly = Monitor.new @__condition__do_not_use_directly = @__lock__do_not_use_directly.new_cond @@ -173,10 +173,13 @@ def ns_wait(timeout) when Concurrent.on_jruby? JavaSynchronizedObject when Concurrent.on_cruby? && (RUBY_VERSION.split('.').map(&:to_i) <=> [1, 9, 3]) >= 0 - Ruby19SynchronizedObject + MonitorSynchronizedObject when Concurrent.on_cruby? - RubySynchronizedObject + MutexSynchronizedObject + when Concurrent.on_rbx? + # TODO better implementation + MonitorSynchronizedObject else - RubySynchronizedObject + MutexSynchronizedObject end end From 06fc7777377f0a67005cd71feb6703c46585bc89 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Sun, 12 Apr 2015 12:03:16 +0200 Subject: [PATCH 08/16] Clarify return values --- lib/concurrent/synchronized_object.rb | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/lib/concurrent/synchronized_object.rb b/lib/concurrent/synchronized_object.rb index a64eb7afa..5a5858c4d 100644 --- a/lib/concurrent/synchronized_object.rb +++ b/lib/concurrent/synchronized_object.rb @@ -33,8 +33,10 @@ def synchronize # wait until another thread calls #signal or #broadcast, # spurious wake-ups can happen. # @param [Numeric, nil] timeout in seconds, `nil` means no timeout + # @return [self] def wait(timeout = nil) synchronize { ns_wait(timeout) } + self end # Wait until condition is met or timeout passes, @@ -42,18 +44,23 @@ def wait(timeout = nil) # @param [Numeric, nil] timeout in seconds, `nil` means no timeout # @yield condition to be met # @yieldreturn [true, false] + # @return [true, false] def wait_until(timeout = nil, &condition) synchronize { ns_wait_until(timeout, &condition) } end # signal one waiting thread + # @return [self] def signal synchronize { ns_signal } + self end # broadcast to all waiting threads + # @return [self] def broadcast synchronize { ns_broadcast } + self end # @yield condition @@ -74,21 +81,24 @@ def ns_wait_until(timeout, &condition) end end + # @return [self] def ns_wait(timeout) raise NotImplementedError end + # @return [self] def ns_signal raise NotImplementedError end + # @return [self] def ns_broadcast raise NotImplementedError end end - begin + if Concurrent.on_jruby? require 'jruby' # roughly more than 2x faster @@ -108,18 +118,19 @@ def ns_wait(timeout) else JRuby.reference0(self).wait end + self end def ns_broadcast JRuby.reference0(self).notifyAll + self end def ns_signal JRuby.reference0(self).notify + self end end - rescue LoadError - # ignore end class MutexSynchronizedObject < AbstractSynchronizedObject @@ -140,14 +151,17 @@ def synchronize def ns_signal @__condition__do_not_use_directly.signal + self end def ns_broadcast @__condition__do_not_use_directly.broadcast + self end def ns_wait(timeout) @__condition__do_not_use_directly.wait @__lock__do_not_use_directly, timeout + self end end From 52fdb3e4b111f5d69d19aef29006b815e4cceaac Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Sun, 12 Apr 2015 12:03:36 +0200 Subject: [PATCH 09/16] Add better Rbx support --- lib/concurrent/synchronized_object.rb | 72 +++++++++++++++++++++------ 1 file changed, 58 insertions(+), 14 deletions(-) diff --git a/lib/concurrent/synchronized_object.rb b/lib/concurrent/synchronized_object.rb index 5a5858c4d..66ca6851f 100644 --- a/lib/concurrent/synchronized_object.rb +++ b/lib/concurrent/synchronized_object.rb @@ -179,21 +179,65 @@ def synchronize def ns_wait(timeout) @__condition__do_not_use_directly.wait timeout + self + end + end + + if Concurrent.on_rbx? + class RbxSynchronizedObject < AbstractSynchronizedObject + def initialize + @waiters = [] + end + + def synchronize(&block) + Rubinius.synchronize(self, &block) + end + + private + + def ns_wait(timeout = nil) + wchan = Rubinius::Channel.new + + begin + @waiters.push wchan + Rubinius.unlock(self) + signaled = wchan.receive_timeout timeout + ensure + Rubinius.lock(self) + + if !signaled && !@waiters.delete(wchan) + # we timed out, but got signaled afterwards, + # so pass that signal on to the next waiter + @waiters.shift << true unless @waiters.empty? + end + end + + self + end + + def ns_signal + @waiters.shift << true unless @waiters.empty? + self + end + + def ns_broadcast + @waiters.shift << true until @waiters.empty? + self + end end end - # TODO add rbx implementation - SynchronizedObject = Class.new case - when Concurrent.on_jruby? - JavaSynchronizedObject - when Concurrent.on_cruby? && (RUBY_VERSION.split('.').map(&:to_i) <=> [1, 9, 3]) >= 0 - MonitorSynchronizedObject - when Concurrent.on_cruby? - MutexSynchronizedObject - when Concurrent.on_rbx? - # TODO better implementation - MonitorSynchronizedObject - else - MutexSynchronizedObject - end + class SynchronizedObject < case + when Concurrent.on_jruby? + JavaSynchronizedObject + when Concurrent.on_cruby? && (RUBY_VERSION.split('.').map(&:to_i) <=> [1, 9, 3]) >= 0 + MonitorSynchronizedObject + when Concurrent.on_cruby? + MutexSynchronizedObject + when Concurrent.on_rbx? + RbxSynchronizedObject + else + MutexSynchronizedObject + end + end end From 3a8aedc1b904e749713d5302b540c0d92ee38085 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Sun, 12 Apr 2015 14:56:33 +0200 Subject: [PATCH 10/16] Migrate CountDownLatch to SynchronizedObject --- lib/concurrent/atomic/count_down_latch.rb | 27 +++++++------------ .../atomic/count_down_latch_spec.rb | 10 +++---- 2 files changed, 14 insertions(+), 23 deletions(-) diff --git a/lib/concurrent/atomic/count_down_latch.rb b/lib/concurrent/atomic/count_down_latch.rb index f7ad0f650..ea86607d0 100644 --- a/lib/concurrent/atomic/count_down_latch.rb +++ b/lib/concurrent/atomic/count_down_latch.rb @@ -1,4 +1,4 @@ -require 'concurrent/atomic/condition' +require 'concurrent/synchronized_object' module Concurrent @@ -11,7 +11,7 @@ module Concurrent # method. Each of the other threads calls `#count_down` when done with its work. # When the latch counter reaches zero the waiting thread is unblocked and continues # with its work. A `CountDownLatch` can be used only once. Its value cannot be reset. - class MutexCountDownLatch + class PureCountDownLatch < SynchronizedObject # @!macro [attach] count_down_latch_method_initialize # @@ -21,12 +21,11 @@ class MutexCountDownLatch # # @raise [ArgumentError] if `count` is not an integer or is less than zero def initialize(count = 1) + super() unless count.is_a?(Fixnum) && count >= 0 raise ArgumentError.new('count must be in integer greater than or equal zero') end - @mutex = Mutex.new - @condition = Condition.new - @count = count + synchronize { @count = count } end # @!macro [attach] count_down_latch_method_wait @@ -37,15 +36,7 @@ def initialize(count = 1) # to block indefinitely # @return [Boolean] `true` if the `count` reaches zero else false on `timeout` def wait(timeout = nil) - @mutex.synchronize do - - remaining = Condition::Result.new(timeout) - while @count > 0 && remaining.can_wait? - remaining = @condition.wait(@mutex, remaining.remaining_time) - end - - @count == 0 - end + synchronize { ns_wait_until(timeout) { @count == 0 } } end # @!macro [attach] count_down_latch_method_count_down @@ -53,9 +44,9 @@ def wait(timeout = nil) # Signal the latch to decrement the counter. Will signal all blocked threads when # the `count` reaches zero. def count_down - @mutex.synchronize do + synchronize do @count -= 1 if @count > 0 - @condition.broadcast if @count == 0 + ns_broadcast if @count == 0 end end @@ -65,7 +56,7 @@ def count_down # # @return [Fixnum] the current value of the counter def count - @mutex.synchronize { @count } + synchronize { @count } end end @@ -110,7 +101,7 @@ class CountDownLatch < JavaCountDownLatch else # @!macro count_down_latch - class CountDownLatch < MutexCountDownLatch + class CountDownLatch < PureCountDownLatch end end end diff --git a/spec/concurrent/atomic/count_down_latch_spec.rb b/spec/concurrent/atomic/count_down_latch_spec.rb index e0b3a966c..71b8d12c2 100644 --- a/spec/concurrent/atomic/count_down_latch_spec.rb +++ b/spec/concurrent/atomic/count_down_latch_spec.rb @@ -88,7 +88,7 @@ module Concurrent - describe MutexCountDownLatch do + describe PureCountDownLatch do it_should_behave_like :count_down_latch @@ -98,9 +98,9 @@ module Concurrent before(:each) do def subject.simulate_spurious_wake_up - @mutex.synchronize do - @condition.signal - @condition.broadcast + synchronize do + signal + broadcast end end end @@ -147,7 +147,7 @@ def subject.simulate_spurious_wake_up end else it 'inherits from MutexCountDownLatch' do - expect(CountDownLatch.ancestors).to include(MutexCountDownLatch) + expect(CountDownLatch.ancestors).to include(PureCountDownLatch) end end end From 662ba4a772104df6c4d1c9e7d7f64720e26a3781 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Sun, 12 Apr 2015 16:27:51 +0200 Subject: [PATCH 11/16] Migrate CyclicBarrier to SynchronizedObject --- lib/concurrent/atomic/cyclic_barrier.rb | 51 ++++++++----------- spec/concurrent/atomic/cyclic_barrier_spec.rb | 6 +-- 2 files changed, 25 insertions(+), 32 deletions(-) diff --git a/lib/concurrent/atomic/cyclic_barrier.rb b/lib/concurrent/atomic/cyclic_barrier.rb index 73a431ad0..cceacf130 100644 --- a/lib/concurrent/atomic/cyclic_barrier.rb +++ b/lib/concurrent/atomic/cyclic_barrier.rb @@ -1,6 +1,8 @@ +require 'concurrent/synchronized_object' + module Concurrent - class CyclicBarrier + class CyclicBarrier < SynchronizedObject Generation = Struct.new(:status) private_constant :Generation @@ -13,23 +15,26 @@ class CyclicBarrier # # @raise [ArgumentError] if `parties` is not an integer or is less than zero def initialize(parties, &block) - raise ArgumentError.new('count must be in integer greater than or equal zero') if !parties.is_a?(Fixnum) || parties < 1 - @parties = parties - @mutex = Mutex.new - @condition = Condition.new - @number_waiting = 0 - @action = block - @generation = Generation.new(:waiting) + super(&nil) + if !parties.is_a?(Fixnum) || parties < 1 + raise ArgumentError.new('count must be in integer greater than or equal zero') + end + synchronize do + @parties = parties + @number_waiting = 0 + @action = block + @generation = Generation.new(:waiting) + end end # @return [Fixnum] the number of threads needed to pass the barrier def parties - @parties + synchronize { @parties } end # @return [Fixnum] the number of threads currently waiting on the barrier def number_waiting - @number_waiting + synchronize { @number_waiting } end # Blocks on the barrier until the number of waiting threads is equal to @@ -41,7 +46,7 @@ def number_waiting # @return [Boolean] `true` if the `count` reaches zero else false on # `timeout` or on `reset` or if the barrier is broken def wait(timeout = nil) - @mutex.synchronize do + synchronize do return false unless @generation.status == :waiting @@ -58,7 +63,6 @@ def wait(timeout = nil) end - # resets the barrier to its initial state # If there is at least one waiting thread, it will be woken up, the `wait` # method will return false and the barrier will be broken @@ -66,9 +70,7 @@ def wait(timeout = nil) # # @return [nil] def reset - @mutex.synchronize do - set_status_and_restore(:reset) - end + synchronize { set_status_and_restore(:reset) } end # A barrier can be broken when: @@ -78,35 +80,26 @@ def reset # A broken barrier can be restored using `reset` it's safer to create a new one # @return [Boolean] true if the barrier is broken otherwise false def broken? - @mutex.synchronize { @generation.status != :waiting } + synchronize { @generation.status != :waiting } end private def set_status_and_restore(new_status) @generation.status = new_status - @condition.broadcast - @generation = Generation.new(:waiting) + ns_broadcast + @generation = Generation.new(:waiting) @number_waiting = 0 end def wait_for_wake_up(generation, timeout) - if wait_while_waiting(generation, timeout) + if ns_wait_until(timeout) { generation.status != :waiting } generation.status == :fulfilled else generation.status = :broken - @condition.broadcast + ns_broadcast false end end - - def wait_while_waiting(generation, timeout) - remaining = Condition::Result.new(timeout) - while generation.status == :waiting && remaining.can_wait? - remaining = @condition.wait(@mutex, remaining.remaining_time) - end - remaining.woken_up? - end - end end diff --git a/spec/concurrent/atomic/cyclic_barrier_spec.rb b/spec/concurrent/atomic/cyclic_barrier_spec.rb index 453197ea9..cfe3d2d75 100644 --- a/spec/concurrent/atomic/cyclic_barrier_spec.rb +++ b/spec/concurrent/atomic/cyclic_barrier_spec.rb @@ -211,9 +211,9 @@ module Concurrent before(:each) do def barrier.simulate_spurious_wake_up - @mutex.synchronize do - @condition.signal - @condition.broadcast + synchronize do + ns_signal + ns_broadcast end end end From e78b4a5bbbd20354239fbcea039f34d879981c72 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Tue, 14 Apr 2015 14:47:36 +0200 Subject: [PATCH 12/16] Make Thread#status work --- lib/concurrent/synchronized_object.rb | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/lib/concurrent/synchronized_object.rb b/lib/concurrent/synchronized_object.rb index 66ca6851f..62efc99ce 100644 --- a/lib/concurrent/synchronized_object.rb +++ b/lib/concurrent/synchronized_object.rb @@ -112,12 +112,8 @@ def synchronize private - def ns_wait(timeout) - if timeout - JRuby.reference0(self).wait(timeout * 1000) - else - JRuby.reference0(self).wait - end + def ns_wait(timeout = nil) + JRuby.reference0(Thread.current).wait_timeout(self, timeout) self end From 18d56fa7161876b194e1711b0c53fbf41b70567f Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Tue, 14 Apr 2015 14:57:24 +0200 Subject: [PATCH 13/16] Improve documentation and unify default parameters --- lib/concurrent/synchronized_object.rb | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/lib/concurrent/synchronized_object.rb b/lib/concurrent/synchronized_object.rb index 62efc99ce..989c5a65e 100644 --- a/lib/concurrent/synchronized_object.rb +++ b/lib/concurrent/synchronized_object.rb @@ -1,9 +1,17 @@ module Concurrent - # Safe synchronization under any Ruby implementation + # Safe synchronization under any Ruby implementation. + # It provides methods like {#synchronize}, {#wait}, {#signal} and {#broadcast}. # Provides a single layer which can improve its implementation over time without changes needed to # the classes using it. Use {SynchronizedObject} not this abstract class. - # @example + # + # @note this object does not support usage together with {Thread#wakeup} and {Thread#raise}. + # `Thread#sleep` and `Thread#wakeup` will work as expected but mixing `SynchronizedObject#wait` and + # `Thread#wakeup` will not work on all platforms. + # + # @see {Event} implementation as an example of this class use + # + # @example simple # class AnClass < SynchronizedObject # def initialize # super @@ -82,7 +90,7 @@ def ns_wait_until(timeout, &condition) end # @return [self] - def ns_wait(timeout) + def ns_wait(timeout = nil) raise NotImplementedError end @@ -155,7 +163,7 @@ def ns_broadcast self end - def ns_wait(timeout) + def ns_wait(timeout = nil) @__condition__do_not_use_directly.wait @__lock__do_not_use_directly, timeout self end @@ -173,7 +181,7 @@ def synchronize private - def ns_wait(timeout) + def ns_wait(timeout = nil) @__condition__do_not_use_directly.wait timeout self end From 524592adec2a7b55c1bcf3044ad09969a0631c43 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Sat, 18 Apr 2015 23:20:58 +0200 Subject: [PATCH 14/16] Add java implementation of JavaSynchronizedObject --- ext/ConcurrentRubyExtService.java | 1 + .../ext/JavaSynchronizedObjectLibrary.java | 100 ++++++++++++++++++ lib/concurrent.rb | 3 +- lib/concurrent/atomic/cyclic_barrier.rb | 37 +++---- lib/concurrent/extension_helper.rb | 3 + lib/concurrent/synchronized_object.rb | 13 ++- spec/concurrent/atomic/cyclic_barrier_spec.rb | 10 +- 7 files changed, 143 insertions(+), 24 deletions(-) create mode 100644 ext/com/concurrent_ruby/ext/JavaSynchronizedObjectLibrary.java diff --git a/ext/ConcurrentRubyExtService.java b/ext/ConcurrentRubyExtService.java index 8697c8b46..1b4e92ec1 100644 --- a/ext/ConcurrentRubyExtService.java +++ b/ext/ConcurrentRubyExtService.java @@ -9,6 +9,7 @@ public boolean basicLoad(final Ruby runtime) throws IOException { new com.concurrent_ruby.ext.JavaAtomicBooleanLibrary().load(runtime, false); new com.concurrent_ruby.ext.JavaAtomicFixnumLibrary().load(runtime, false); new com.concurrent_ruby.ext.JavaSemaphoreLibrary().load(runtime, false); + new com.concurrent_ruby.ext.JavaSynchronizedObjectLibrary().load(runtime, false); return true; } } diff --git a/ext/com/concurrent_ruby/ext/JavaSynchronizedObjectLibrary.java b/ext/com/concurrent_ruby/ext/JavaSynchronizedObjectLibrary.java new file mode 100644 index 000000000..d0c5c329f --- /dev/null +++ b/ext/com/concurrent_ruby/ext/JavaSynchronizedObjectLibrary.java @@ -0,0 +1,100 @@ +package com.concurrent_ruby.ext; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.jruby.Ruby; +import org.jruby.RubyClass; +import org.jruby.RubyModule; +import org.jruby.RubyObject; +import org.jruby.anno.JRubyClass; +import org.jruby.anno.JRubyMethod; +import org.jruby.runtime.ObjectAllocator; +import org.jruby.runtime.builtin.IRubyObject; +import org.jruby.runtime.load.Library; +import org.jruby.runtime.Block; +import org.jruby.RubyBoolean; +import org.jruby.RubyNil; +import org.jruby.runtime.ThreadContext; + +public class JavaSynchronizedObjectLibrary implements Library { + + public void load(Ruby runtime, boolean wrap) throws IOException { + RubyModule concurrentMod = runtime.defineModule("Concurrent"); + RubyClass javaSynchronizedObjectClass = concurrentMod.defineClassUnder( + "JavaSynchronizedObject", + concurrentMod.getClass("AbstractSynchronizedObject"), + JRUBYREFERENCE_ALLOCATOR); + javaSynchronizedObjectClass.defineAnnotatedMethods(JavaSynchronizedObject.class); + } + + private static final ObjectAllocator JRUBYREFERENCE_ALLOCATOR = new ObjectAllocator() { + public IRubyObject allocate(Ruby runtime, RubyClass klazz) { + return new JavaSynchronizedObject(runtime, klazz); + } + }; + + @JRubyClass(name = "JavaSynchronizedObject", parent = "AbstractSynchronizedObject") + public static class JavaSynchronizedObject extends RubyObject { + + public JavaSynchronizedObject(Ruby runtime, RubyClass metaClass) { + super(runtime, metaClass); + } + + @JRubyMethod + public IRubyObject initialize(ThreadContext context) { + return context.nil; + } + + @JRubyMethod(name = "synchronize") + public IRubyObject rubySynchronize(ThreadContext context, Block block) { + synchronized (this) { + return block.yield(context, null); + } + } + + @JRubyMethod(name = "ns_wait", optional = 1) + public IRubyObject nsWait(ThreadContext context, IRubyObject[] args) { + Ruby runtime = context.runtime; + if (args.length > 1) { + throw runtime.newArgumentError(args.length, 1); + } + Double timeout = null; + if (args.length > 0 && !args[0].isNil()) { + timeout = args[0].convertToFloat().getDoubleValue(); + if (timeout < 0) { + throw runtime.newArgumentError("time interval must be positive"); + } + } + if (Thread.interrupted()) { + throw runtime.newConcurrencyError("thread interrupted"); + } + boolean success = false; + try { + success = context.getThread().wait_timeout(this, timeout); + } catch (InterruptedException ie) { + throw runtime.newConcurrencyError(ie.getLocalizedMessage()); + } finally { + // An interrupt or timeout may have caused us to miss + // a notify that we consumed, so do another notify in + // case someone else is available to pick it up. + if (!success) { + this.notify(); + } + } + return this; + } + + @JRubyMethod(name = "ns_signal") + public IRubyObject nsSignal(ThreadContext context) { + notify(); + return this; + } + + @JRubyMethod(name = "ns_broadcast") + public IRubyObject nsBroadcast(ThreadContext context) { + notifyAll(); + return this; + } + } +} diff --git a/lib/concurrent.rb b/lib/concurrent.rb index 51a690d8f..5a2a2a1aa 100644 --- a/lib/concurrent.rb +++ b/lib/concurrent.rb @@ -1,5 +1,7 @@ require 'concurrent/version' +require 'concurrent/synchronized_object' + require 'concurrent/configuration' require 'concurrent/actor' @@ -9,7 +11,6 @@ require 'concurrent/errors' require 'concurrent/executors' require 'concurrent/utilities' -require 'concurrent/synchronized_object' require 'concurrent/atomic' require 'concurrent/agent' diff --git a/lib/concurrent/atomic/cyclic_barrier.rb b/lib/concurrent/atomic/cyclic_barrier.rb index cceacf130..cde0d254b 100644 --- a/lib/concurrent/atomic/cyclic_barrier.rb +++ b/lib/concurrent/atomic/cyclic_barrier.rb @@ -20,10 +20,9 @@ def initialize(parties, &block) raise ArgumentError.new('count must be in integer greater than or equal zero') end synchronize do - @parties = parties - @number_waiting = 0 - @action = block - @generation = Generation.new(:waiting) + @parties = parties + @action = block + ns_next_generation end end @@ -54,10 +53,16 @@ def wait(timeout = nil) if @number_waiting == @parties @action.call if @action - set_status_and_restore(:fulfilled) + ns_generation_done @generation, :fulfilled true else - wait_for_wake_up(@generation, timeout) + generation = @generation + if ns_wait_until(timeout) { generation.status != :waiting } + generation.status == :fulfilled + else + ns_generation_done generation, :broken, false + false + end end end end @@ -70,7 +75,7 @@ def wait(timeout = nil) # # @return [nil] def reset - synchronize { set_status_and_restore(:reset) } + synchronize { ns_generation_done @generation, :reset } end # A barrier can be broken when: @@ -85,21 +90,17 @@ def broken? private - def set_status_and_restore(new_status) - @generation.status = new_status + def ns_generation_done(generation, status, continue = true) + generation.status = status + ns_next_generation if continue ns_broadcast + end + + def ns_next_generation @generation = Generation.new(:waiting) @number_waiting = 0 end - def wait_for_wake_up(generation, timeout) - if ns_wait_until(timeout) { generation.status != :waiting } - generation.status == :fulfilled - else - generation.status = :broken - ns_broadcast - false - end - end + end end diff --git a/lib/concurrent/extension_helper.rb b/lib/concurrent/extension_helper.rb index 23b79aac4..5849906b3 100644 --- a/lib/concurrent/extension_helper.rb +++ b/lib/concurrent/extension_helper.rb @@ -2,6 +2,9 @@ module Concurrent + class AbstractSynchronizedObject # FIXME has to be present before Java extensions are loaded + end + @@c_ext_loaded ||= false @@java_ext_loaded ||= false diff --git a/lib/concurrent/synchronized_object.rb b/lib/concurrent/synchronized_object.rb index 989c5a65e..4dccbfa67 100644 --- a/lib/concurrent/synchronized_object.rb +++ b/lib/concurrent/synchronized_object.rb @@ -1,3 +1,5 @@ +require 'concurrent/utility/engine' + module Concurrent # Safe synchronization under any Ruby implementation. @@ -75,7 +77,7 @@ def broadcast def ns_wait_until(timeout, &condition) if timeout wait_until = Concurrent.monotonic_time + timeout - while true + loop do now = Concurrent.monotonic_time condition_result = condition.call # 0.001 correction to avoid error when `wait_until - now` is smaller than 0.0005 and rounded to 0 @@ -106,11 +108,12 @@ def ns_broadcast end + require 'concurrent/extension_helper' # FIXME weird order + if Concurrent.on_jruby? require 'jruby' - # roughly more than 2x faster - class JavaSynchronizedObject < AbstractSynchronizedObject + class JavaPureSynchronizedObject < AbstractSynchronizedObject def initialize end @@ -121,8 +124,10 @@ def synchronize private def ns_wait(timeout = nil) - JRuby.reference0(Thread.current).wait_timeout(self, timeout) + success = JRuby.reference0(Thread.current).wait_timeout(JRuby.reference0(self), timeout) self + ensure + ns_signal unless success end def ns_broadcast diff --git a/spec/concurrent/atomic/cyclic_barrier_spec.rb b/spec/concurrent/atomic/cyclic_barrier_spec.rb index cfe3d2d75..0dec109bc 100644 --- a/spec/concurrent/atomic/cyclic_barrier_spec.rb +++ b/spec/concurrent/atomic/cyclic_barrier_spec.rb @@ -173,6 +173,15 @@ module Concurrent expect(barrier).to be_broken end + it 'breaks the barrier and release all other threads 2' do + t1 = Thread.new { barrier.wait(0.1) } + t2 = Thread.new { barrier.wait(0.1) } + + [t1, t2].each(&:join) + + expect(barrier).to be_broken + end + it 'does not execute the block on timeout' do counter = AtomicFixnum.new barrier = described_class.new(parties) { counter.increment } @@ -242,5 +251,4 @@ def barrier.simulate_spurious_wake_up end end end - end From 44b386320032ac87f2e1285dd34450485c68c01b Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Mon, 20 Apr 2015 14:29:04 +0200 Subject: [PATCH 15/16] Cleanup the load order in synchronized object --- Rakefile | 2 +- concurrent-ruby-ext.gemspec | 1 - .../ext/JavaSynchronizedObjectLibrary.java | 25 +- ext/concurrent/extconf.rb | 2 +- lib/concurrent/atomic.rb | 2 +- lib/concurrent/atomic/atomic_boolean.rb | 2 +- lib/concurrent/atomic/atomic_fixnum.rb | 2 +- lib/concurrent/atomic_reference/jruby.rb | 2 +- lib/concurrent/atomic_reference/ruby.rb | 2 +- lib/concurrent/extension_helper.rb | 37 --- lib/concurrent/native_extensions.rb | 2 + lib/concurrent/native_extensions/before.rb | 1 + lib/concurrent/native_extensions/load.rb | 46 ++++ lib/concurrent/synchronized_object.rb | 259 ++---------------- .../abstract.rb | 121 ++++++++ .../java_pure.rb | 36 +++ .../monitor.rb | 21 ++ .../mutex.rb | 35 +++ .../rbx.rb | 47 ++++ spec/support/example_group_extensions.rb | 2 +- 20 files changed, 351 insertions(+), 296 deletions(-) delete mode 100644 lib/concurrent/extension_helper.rb create mode 100644 lib/concurrent/native_extensions.rb create mode 100644 lib/concurrent/native_extensions/before.rb create mode 100644 lib/concurrent/native_extensions/load.rb create mode 100644 lib/concurrent/synchronized_object_implementations/abstract.rb create mode 100644 lib/concurrent/synchronized_object_implementations/java_pure.rb create mode 100644 lib/concurrent/synchronized_object_implementations/monitor.rb create mode 100644 lib/concurrent/synchronized_object_implementations/mutex.rb create mode 100644 lib/concurrent/synchronized_object_implementations/rbx.rb diff --git a/Rakefile b/Rakefile index 4e9d644c3..3d670f4df 100644 --- a/Rakefile +++ b/Rakefile @@ -1,6 +1,6 @@ #!/usr/bin/env rake -require 'concurrent/extension_helper' +require 'concurrent/native_extensions' ## load the two gemspec files CORE_GEMSPEC = Gem::Specification.load('concurrent-ruby.gemspec') diff --git a/concurrent-ruby-ext.gemspec b/concurrent-ruby-ext.gemspec index d9598f7a5..0cf8b46a6 100644 --- a/concurrent-ruby-ext.gemspec +++ b/concurrent-ruby-ext.gemspec @@ -20,7 +20,6 @@ Gem::Specification.new do |s| s.files = Dir['ext/**/*.{h,c,cpp}'] s.files += [ - 'lib/concurrent/extension_helper.rb', 'lib/concurrent/atomic_reference/concurrent_update_error.rb', 'lib/concurrent/atomic_reference/direct_update.rb', 'lib/concurrent/atomic_reference/numeric_cas_wrapper.rb', diff --git a/ext/com/concurrent_ruby/ext/JavaSynchronizedObjectLibrary.java b/ext/com/concurrent_ruby/ext/JavaSynchronizedObjectLibrary.java index d0c5c329f..0ad5ced50 100644 --- a/ext/com/concurrent_ruby/ext/JavaSynchronizedObjectLibrary.java +++ b/ext/com/concurrent_ruby/ext/JavaSynchronizedObjectLibrary.java @@ -20,24 +20,29 @@ public class JavaSynchronizedObjectLibrary implements Library { public void load(Ruby runtime, boolean wrap) throws IOException { - RubyModule concurrentMod = runtime.defineModule("Concurrent"); - RubyClass javaSynchronizedObjectClass = concurrentMod.defineClassUnder( - "JavaSynchronizedObject", - concurrentMod.getClass("AbstractSynchronizedObject"), - JRUBYREFERENCE_ALLOCATOR); - javaSynchronizedObjectClass.defineAnnotatedMethods(JavaSynchronizedObject.class); + RubyModule concurrentModule = runtime. + defineModule("Concurrent"). + defineModuleUnder("SynchronizedObjectImplementations"); + RubyClass parent = concurrentModule. + getClass("Abstract"); + if (parent == null) throw runtime.newRuntimeError("Concurrent::SynchronizedObject::Abstract is missing"); + + RubyClass synchronizedObjectJavaClass = concurrentModule. + defineClassUnder("Java", parent, JRUBYREFERENCE_ALLOCATOR); + + synchronizedObjectJavaClass.defineAnnotatedMethods(Java.class); } private static final ObjectAllocator JRUBYREFERENCE_ALLOCATOR = new ObjectAllocator() { public IRubyObject allocate(Ruby runtime, RubyClass klazz) { - return new JavaSynchronizedObject(runtime, klazz); + return new Java(runtime, klazz); } }; - @JRubyClass(name = "JavaSynchronizedObject", parent = "AbstractSynchronizedObject") - public static class JavaSynchronizedObject extends RubyObject { + @JRubyClass(name = "Java", parent = "Abstract") + public static class Java extends RubyObject { - public JavaSynchronizedObject(Ruby runtime, RubyClass metaClass) { + public Java(Ruby runtime, RubyClass metaClass) { super(runtime, metaClass); } diff --git a/ext/concurrent/extconf.rb b/ext/concurrent/extconf.rb index 3d7ea5eda..dc74e10ce 100644 --- a/ext/concurrent/extconf.rb +++ b/ext/concurrent/extconf.rb @@ -1,6 +1,6 @@ require 'fileutils' -require 'concurrent/extension_helper' +require 'concurrent/native_extensions' EXTENSION_NAME = 'extension' diff --git a/lib/concurrent/atomic.rb b/lib/concurrent/atomic.rb index a40fb4e17..7cc0160a1 100644 --- a/lib/concurrent/atomic.rb +++ b/lib/concurrent/atomic.rb @@ -12,7 +12,7 @@ end ##################################################################### -require 'concurrent/extension_helper' +require 'concurrent/native_extensions' require 'concurrent/utility/engine' require 'concurrent/atomic_reference/concurrent_update_error' require 'concurrent/atomic_reference/mutex_atomic' diff --git a/lib/concurrent/atomic/atomic_boolean.rb b/lib/concurrent/atomic/atomic_boolean.rb index 915c08a16..059ca5f14 100644 --- a/lib/concurrent/atomic/atomic_boolean.rb +++ b/lib/concurrent/atomic/atomic_boolean.rb @@ -1,4 +1,4 @@ -require 'concurrent/extension_helper' +require 'concurrent/native_extensions' module Concurrent diff --git a/lib/concurrent/atomic/atomic_fixnum.rb b/lib/concurrent/atomic/atomic_fixnum.rb index b8807beb4..006a7d0e0 100644 --- a/lib/concurrent/atomic/atomic_fixnum.rb +++ b/lib/concurrent/atomic/atomic_fixnum.rb @@ -1,4 +1,4 @@ -require 'concurrent/extension_helper' +require 'concurrent/native_extensions' module Concurrent diff --git a/lib/concurrent/atomic_reference/jruby.rb b/lib/concurrent/atomic_reference/jruby.rb index 103b8cd8c..2c3b6f135 100644 --- a/lib/concurrent/atomic_reference/jruby.rb +++ b/lib/concurrent/atomic_reference/jruby.rb @@ -1,4 +1,4 @@ -require 'concurrent/extension_helper' +require 'concurrent/native_extensions' if defined?(Concurrent::JavaAtomic) require 'concurrent/atomic_reference/direct_update' diff --git a/lib/concurrent/atomic_reference/ruby.rb b/lib/concurrent/atomic_reference/ruby.rb index 7ae3f520b..6a3be3b0c 100644 --- a/lib/concurrent/atomic_reference/ruby.rb +++ b/lib/concurrent/atomic_reference/ruby.rb @@ -1,5 +1,5 @@ if defined? Concurrent::CAtomic - require 'concurrent/extension_helper' + require 'concurrent/native_extensions' require 'concurrent/atomic_reference/direct_update' require 'concurrent/atomic_reference/numeric_cas_wrapper' diff --git a/lib/concurrent/extension_helper.rb b/lib/concurrent/extension_helper.rb deleted file mode 100644 index 5849906b3..000000000 --- a/lib/concurrent/extension_helper.rb +++ /dev/null @@ -1,37 +0,0 @@ -require 'concurrent/utility/engine' - -module Concurrent - - class AbstractSynchronizedObject # FIXME has to be present before Java extensions are loaded - end - - @@c_ext_loaded ||= false - @@java_ext_loaded ||= false - - # @!visibility private - def self.allow_c_extensions? - on_cruby? - end - - if allow_c_extensions? && !@@c_ext_loaded - begin - require 'concurrent/extension' - @@c_ext_loaded = true - rescue LoadError - # may be a Windows cross-compiled native gem - begin - require "concurrent/#{RUBY_VERSION[0..2]}/extension" - @@c_ext_loaded = true - rescue LoadError - warn 'Performance on MRI may be improved with the concurrent-ruby-ext gem. Please see http://concurrent-ruby.com' - end - end - elsif on_jruby? && !@@java_ext_loaded - begin - require 'concurrent_ruby_ext' - @@java_ext_loaded = true - rescue LoadError - warn 'Performance on JRuby may be improved by installing the pre-compiled Java extensions. Please see http://concurrent-ruby.com' - end - end -end diff --git a/lib/concurrent/native_extensions.rb b/lib/concurrent/native_extensions.rb new file mode 100644 index 000000000..8095aeb93 --- /dev/null +++ b/lib/concurrent/native_extensions.rb @@ -0,0 +1,2 @@ +require 'concurrent/native_extensions/before' +require 'concurrent/native_extensions/load' diff --git a/lib/concurrent/native_extensions/before.rb b/lib/concurrent/native_extensions/before.rb new file mode 100644 index 000000000..2a0346aef --- /dev/null +++ b/lib/concurrent/native_extensions/before.rb @@ -0,0 +1 @@ +require 'concurrent/synchronized_object_implementations/abstract' diff --git a/lib/concurrent/native_extensions/load.rb b/lib/concurrent/native_extensions/load.rb new file mode 100644 index 000000000..0ccc7ec70 --- /dev/null +++ b/lib/concurrent/native_extensions/load.rb @@ -0,0 +1,46 @@ +require 'concurrent/utility/engine' + +module Concurrent + + @c_ext_loaded ||= false + @java_ext_loaded ||= false + + # @!visibility private + def self.allow_c_extensions? + on_cruby? + end + + if allow_c_extensions? && !@c_ext_loaded + tries = [ + lambda do + require 'concurrent/extension' + @c_ext_loaded = true + end, + lambda do + # may be a Windows cross-compiled native gem + require "concurrent/#{RUBY_VERSION[0..2]}/extension" + @c_ext_loaded = true + end, + lambda do + warn 'Performance on MRI may be improved with the concurrent-ruby-ext gem. Please see http://concurrent-ruby.com' + end] + + tries.each do |try| + begin + try.call + break + rescue LoadError + next + end + end + end + + if on_jruby? && !@java_ext_loaded + begin + require 'concurrent_ruby_ext' + @java_ext_loaded = true + rescue LoadError + warn 'Performance on JRuby may be improved by installing the pre-compiled Java extensions. Please see http://concurrent-ruby.com' + end + end +end diff --git a/lib/concurrent/synchronized_object.rb b/lib/concurrent/synchronized_object.rb index 4dccbfa67..20b2312b5 100644 --- a/lib/concurrent/synchronized_object.rb +++ b/lib/concurrent/synchronized_object.rb @@ -1,252 +1,31 @@ require 'concurrent/utility/engine' +require 'concurrent/synchronized_object_implementations/abstract' +require 'concurrent/native_extensions' +require 'concurrent/synchronized_object_implementations/mutex' +require 'concurrent/synchronized_object_implementations/monitor' +require 'concurrent/synchronized_object_implementations/rbx' module Concurrent + module SynchronizedObjectImplementations + class Implementation < case + when Concurrent.on_jruby? + Java - # Safe synchronization under any Ruby implementation. - # It provides methods like {#synchronize}, {#wait}, {#signal} and {#broadcast}. - # Provides a single layer which can improve its implementation over time without changes needed to - # the classes using it. Use {SynchronizedObject} not this abstract class. - # - # @note this object does not support usage together with {Thread#wakeup} and {Thread#raise}. - # `Thread#sleep` and `Thread#wakeup` will work as expected but mixing `SynchronizedObject#wait` and - # `Thread#wakeup` will not work on all platforms. - # - # @see {Event} implementation as an example of this class use - # - # @example simple - # class AnClass < SynchronizedObject - # def initialize - # super - # synchronize { @value = 'asd' } - # end - # - # def value - # synchronize { @value } - # end - # end - class AbstractSynchronizedObject + when Concurrent.on_cruby? && (RUBY_VERSION.split('.').map(&:to_i) <=> [1, 9, 3]) >= 0 + Monitor - # @abstract for helper ivar initialization if needed, - # otherwise it can be left empty. - def initialize - raise NotImplementedError - end - - # @yield runs the block synchronized against this object, - # equvivalent of java's `synchronize(this) {}` - def synchronize - raise NotImplementedError - end - - private - - # wait until another thread calls #signal or #broadcast, - # spurious wake-ups can happen. - # @param [Numeric, nil] timeout in seconds, `nil` means no timeout - # @return [self] - def wait(timeout = nil) - synchronize { ns_wait(timeout) } - self - end - - # Wait until condition is met or timeout passes, - # protects against spurious wake-ups. - # @param [Numeric, nil] timeout in seconds, `nil` means no timeout - # @yield condition to be met - # @yieldreturn [true, false] - # @return [true, false] - def wait_until(timeout = nil, &condition) - synchronize { ns_wait_until(timeout, &condition) } - end - - # signal one waiting thread - # @return [self] - def signal - synchronize { ns_signal } - self - end - - # broadcast to all waiting threads - # @return [self] - def broadcast - synchronize { ns_broadcast } - self - end - - # @yield condition - def ns_wait_until(timeout, &condition) - if timeout - wait_until = Concurrent.monotonic_time + timeout - loop do - now = Concurrent.monotonic_time - condition_result = condition.call - # 0.001 correction to avoid error when `wait_until - now` is smaller than 0.0005 and rounded to 0 - # when passed to java #wait(long timeout) - return condition_result if (now + 0.001) >= wait_until || condition_result - ns_wait wait_until - now - end - else - ns_wait timeout until condition.call - true - end - end - - # @return [self] - def ns_wait(timeout = nil) - raise NotImplementedError - end + when Concurrent.on_cruby? + Mutex - # @return [self] - def ns_signal - raise NotImplementedError - end + when Concurrent.on_rbx? + Rbx - # @return [self] - def ns_broadcast - raise NotImplementedError + else + Mutex + end end - end - require 'concurrent/extension_helper' # FIXME weird order - - if Concurrent.on_jruby? - require 'jruby' - - class JavaPureSynchronizedObject < AbstractSynchronizedObject - def initialize - end - - def synchronize - JRuby.reference0(self).synchronized { yield } - end - - private - - def ns_wait(timeout = nil) - success = JRuby.reference0(Thread.current).wait_timeout(JRuby.reference0(self), timeout) - self - ensure - ns_signal unless success - end + SynchronizedObject = SynchronizedObjectImplementations::Implementation - def ns_broadcast - JRuby.reference0(self).notifyAll - self - end - - def ns_signal - JRuby.reference0(self).notify - self - end - end - end - - class MutexSynchronizedObject < AbstractSynchronizedObject - def initialize - @__lock__do_not_use_directly = Mutex.new - @__condition__do_not_use_directly = ::ConditionVariable.new - end - - def synchronize - if @__lock__do_not_use_directly.owned? - yield - else - @__lock__do_not_use_directly.synchronize { yield } - end - end - - private - - def ns_signal - @__condition__do_not_use_directly.signal - self - end - - def ns_broadcast - @__condition__do_not_use_directly.broadcast - self - end - - def ns_wait(timeout = nil) - @__condition__do_not_use_directly.wait @__lock__do_not_use_directly, timeout - self - end - end - - class MonitorSynchronizedObject < MutexSynchronizedObject - def initialize - @__lock__do_not_use_directly = Monitor.new - @__condition__do_not_use_directly = @__lock__do_not_use_directly.new_cond - end - - def synchronize - @__lock__do_not_use_directly.synchronize { yield } - end - - private - - def ns_wait(timeout = nil) - @__condition__do_not_use_directly.wait timeout - self - end - end - - if Concurrent.on_rbx? - class RbxSynchronizedObject < AbstractSynchronizedObject - def initialize - @waiters = [] - end - - def synchronize(&block) - Rubinius.synchronize(self, &block) - end - - private - - def ns_wait(timeout = nil) - wchan = Rubinius::Channel.new - - begin - @waiters.push wchan - Rubinius.unlock(self) - signaled = wchan.receive_timeout timeout - ensure - Rubinius.lock(self) - - if !signaled && !@waiters.delete(wchan) - # we timed out, but got signaled afterwards, - # so pass that signal on to the next waiter - @waiters.shift << true unless @waiters.empty? - end - end - - self - end - - def ns_signal - @waiters.shift << true unless @waiters.empty? - self - end - - def ns_broadcast - @waiters.shift << true until @waiters.empty? - self - end - end - end - - class SynchronizedObject < case - when Concurrent.on_jruby? - JavaSynchronizedObject - when Concurrent.on_cruby? && (RUBY_VERSION.split('.').map(&:to_i) <=> [1, 9, 3]) >= 0 - MonitorSynchronizedObject - when Concurrent.on_cruby? - MutexSynchronizedObject - when Concurrent.on_rbx? - RbxSynchronizedObject - else - MutexSynchronizedObject - end - end end diff --git a/lib/concurrent/synchronized_object_implementations/abstract.rb b/lib/concurrent/synchronized_object_implementations/abstract.rb new file mode 100644 index 000000000..4e76ffed3 --- /dev/null +++ b/lib/concurrent/synchronized_object_implementations/abstract.rb @@ -0,0 +1,121 @@ +module Concurrent + module SynchronizedObjectImplementations + # Safe synchronization under any Ruby implementation. + # It provides methods like {#synchronize}, {#wait}, {#signal} and {#broadcast}. + # Provides a single layer which can improve its implementation over time without changes needed to + # the classes using it. Use {SynchronizedObject} not this abstract class. + # + # @note this object does not support usage together with {Thread#wakeup} and {Thread#raise}. + # `Thread#sleep` and `Thread#wakeup` will work as expected but mixing `SynchronizedObject#wait` and + # `Thread#wakeup` will not work on all platforms. + # + # @see {Event} implementation as an example of this class use + # + # @example simple + # class AnClass < SynchronizedObject + # def initialize + # super + # synchronize { @value = 'asd' } + # end + # + # def value + # synchronize { @value } + # end + # end + class Abstract + + # @abstract for helper ivar initialization if needed, + # otherwise it can be left empty. + def initialize + raise NotImplementedError + end + + # @yield runs the block synchronized against this object, + # equvivalent of java's `synchronize(this) {}` + def synchronize + raise NotImplementedError + end + + private + + # wait until another thread calls #signal or #broadcast, + # spurious wake-ups can happen. + # @param [Numeric, nil] timeout in seconds, `nil` means no timeout + # @return [self] + # @note intended to be made public if required in child classes + def wait(timeout = nil) + synchronize { ns_wait(timeout) } + self + end + + # Wait until condition is met or timeout passes, + # protects against spurious wake-ups. + # @param [Numeric, nil] timeout in seconds, `nil` means no timeout + # @yield condition to be met + # @yieldreturn [true, false] + # @return [true, false] + # @note intended to be made public if required in child classes + def wait_until(timeout = nil, &condition) + synchronize { ns_wait_until(timeout, &condition) } + end + + # signal one waiting thread + # @return [self] + # @note intended to be made public if required in child classes + def signal + synchronize { ns_signal } + self + end + + # broadcast to all waiting threads + # @return [self] + # @note intended to be made public if required in child classes + def broadcast + synchronize { ns_broadcast } + self + end + + # @note only to be used inside synchronized block + # @yield condition + # @return [true, false] + # see #wait_until + def ns_wait_until(timeout, &condition) + if timeout + wait_until = Concurrent.monotonic_time + timeout + loop do + now = Concurrent.monotonic_time + condition_result = condition.call + # 0.001 correction to avoid error when `wait_until - now` is smaller than 0.0005 and rounded to 0 + # when passed to java #wait(long timeout) + return condition_result if (now + 0.001) >= wait_until || condition_result + ns_wait wait_until - now + end + else + ns_wait timeout until condition.call + true + end + end + + # @note only to be used inside synchronized block + # @return [self] + # @see #wait + def ns_wait(timeout = nil) + raise NotImplementedError + end + + # @note only to be used inside synchronized block + # @return [self] + # @see #signal + def ns_signal + raise NotImplementedError + end + + # @note only to be used inside synchronized block + # @return [self] + # @see #broadcast + def ns_broadcast + raise NotImplementedError + end + end + end +end diff --git a/lib/concurrent/synchronized_object_implementations/java_pure.rb b/lib/concurrent/synchronized_object_implementations/java_pure.rb new file mode 100644 index 000000000..72c3f232c --- /dev/null +++ b/lib/concurrent/synchronized_object_implementations/java_pure.rb @@ -0,0 +1,36 @@ +module Concurrent + module SynchronizedObjectImplementations + + if Concurrent.on_jruby? + require 'jruby' + + class JavaPure < Abstract + def initialize + end + + def synchronize + JRuby.reference0(self).synchronized { yield } + end + + private + + def ns_wait(timeout = nil) + success = JRuby.reference0(Thread.current).wait_timeout(JRuby.reference0(self), timeout) + self + ensure + ns_signal unless success + end + + def ns_broadcast + JRuby.reference0(self).notifyAll + self + end + + def ns_signal + JRuby.reference0(self).notify + self + end + end + end + end +end diff --git a/lib/concurrent/synchronized_object_implementations/monitor.rb b/lib/concurrent/synchronized_object_implementations/monitor.rb new file mode 100644 index 000000000..51255231c --- /dev/null +++ b/lib/concurrent/synchronized_object_implementations/monitor.rb @@ -0,0 +1,21 @@ +module Concurrent + module SynchronizedObjectImplementations + class Monitor < Mutex + def initialize + @__lock__do_not_use_directly = ::Monitor.new + @__condition__do_not_use_directly = @__lock__do_not_use_directly.new_cond + end + + def synchronize + @__lock__do_not_use_directly.synchronize { yield } + end + + private + + def ns_wait(timeout = nil) + @__condition__do_not_use_directly.wait timeout + self + end + end + end +end diff --git a/lib/concurrent/synchronized_object_implementations/mutex.rb b/lib/concurrent/synchronized_object_implementations/mutex.rb new file mode 100644 index 000000000..30b029085 --- /dev/null +++ b/lib/concurrent/synchronized_object_implementations/mutex.rb @@ -0,0 +1,35 @@ +module Concurrent + module SynchronizedObjectImplementations + class Mutex < Abstract + def initialize + @__lock__do_not_use_directly = ::Mutex.new + @__condition__do_not_use_directly = ::ConditionVariable.new + end + + def synchronize + if @__lock__do_not_use_directly.owned? + yield + else + @__lock__do_not_use_directly.synchronize { yield } + end + end + + private + + def ns_signal + @__condition__do_not_use_directly.signal + self + end + + def ns_broadcast + @__condition__do_not_use_directly.broadcast + self + end + + def ns_wait(timeout = nil) + @__condition__do_not_use_directly.wait @__lock__do_not_use_directly, timeout + self + end + end + end +end diff --git a/lib/concurrent/synchronized_object_implementations/rbx.rb b/lib/concurrent/synchronized_object_implementations/rbx.rb new file mode 100644 index 000000000..1598af111 --- /dev/null +++ b/lib/concurrent/synchronized_object_implementations/rbx.rb @@ -0,0 +1,47 @@ +module Concurrent + module SynchronizedObjectImplementations + if Concurrent.on_rbx? + class Rbx < Abstract + def initialize + @waiters = [] + end + + def synchronize(&block) + Rubinius.synchronize(self, &block) + end + + private + + def ns_wait(timeout = nil) + wchan = Rubinius::Channel.new + + begin + @waiters.push wchan + Rubinius.unlock(self) + signaled = wchan.receive_timeout timeout + ensure + Rubinius.lock(self) + + if !signaled && !@waiters.delete(wchan) + # we timed out, but got signaled afterwards, + # so pass that signal on to the next waiter + @waiters.shift << true unless @waiters.empty? + end + end + + self + end + + def ns_signal + @waiters.shift << true unless @waiters.empty? + self + end + + def ns_broadcast + @waiters.shift << true until @waiters.empty? + self + end + end + end + end +end diff --git a/spec/support/example_group_extensions.rb b/spec/support/example_group_extensions.rb index fc90bff78..a952ebd6f 100644 --- a/spec/support/example_group_extensions.rb +++ b/spec/support/example_group_extensions.rb @@ -1,5 +1,5 @@ require 'rbconfig' -require 'concurrent/extension_helper.rb' +require 'concurrent/native_extensions' module Concurrent module TestHelpers From 759c02595cec907e8700784deb74d5ce5acc6d88 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Mon, 20 Apr 2015 17:52:11 +0200 Subject: [PATCH 16/16] Cleanup names --- ext/ConcurrentRubyExtService.java | 2 +- ...brary.java => SynchronizationLibrary.java} | 27 +++--- lib/concurrent.rb | 2 +- lib/concurrent/actor.rb | 2 +- lib/concurrent/actor/core.rb | 2 +- lib/concurrent/atomic/count_down_latch.rb | 4 +- lib/concurrent/atomic/cyclic_barrier.rb | 4 +- lib/concurrent/atomic/event.rb | 4 +- .../executor/serialized_execution.rb | 4 +- lib/concurrent/native_extensions/before.rb | 2 +- lib/concurrent/synchronization.rb | 28 +++++++ .../abstract_object.rb} | 84 +++++++++++++++++-- .../java_pure_object.rb} | 10 ++- .../monitor_object.rb} | 8 +- .../mutex_object.rb} | 8 +- .../rbx.rb => synchronization/rbx_object.rb} | 23 ++++- lib/concurrent/synchronized_object.rb | 31 ------- 17 files changed, 175 insertions(+), 70 deletions(-) rename ext/com/concurrent_ruby/ext/{JavaSynchronizedObjectLibrary.java => SynchronizationLibrary.java} (77%) create mode 100644 lib/concurrent/synchronization.rb rename lib/concurrent/{synchronized_object_implementations/abstract.rb => synchronization/abstract_object.rb} (61%) rename lib/concurrent/{synchronized_object_implementations/java_pure.rb => synchronization/java_pure_object.rb} (69%) rename lib/concurrent/{synchronized_object_implementations/monitor.rb => synchronization/monitor_object.rb} (66%) rename lib/concurrent/{synchronized_object_implementations/mutex.rb => synchronization/mutex_object.rb} (77%) rename lib/concurrent/{synchronized_object_implementations/rbx.rb => synchronization/rbx_object.rb} (65%) delete mode 100644 lib/concurrent/synchronized_object.rb diff --git a/ext/ConcurrentRubyExtService.java b/ext/ConcurrentRubyExtService.java index 1b4e92ec1..df70917c5 100644 --- a/ext/ConcurrentRubyExtService.java +++ b/ext/ConcurrentRubyExtService.java @@ -9,7 +9,7 @@ public boolean basicLoad(final Ruby runtime) throws IOException { new com.concurrent_ruby.ext.JavaAtomicBooleanLibrary().load(runtime, false); new com.concurrent_ruby.ext.JavaAtomicFixnumLibrary().load(runtime, false); new com.concurrent_ruby.ext.JavaSemaphoreLibrary().load(runtime, false); - new com.concurrent_ruby.ext.JavaSynchronizedObjectLibrary().load(runtime, false); + new com.concurrent_ruby.ext.SynchronizationLibrary().load(runtime, false); return true; } } diff --git a/ext/com/concurrent_ruby/ext/JavaSynchronizedObjectLibrary.java b/ext/com/concurrent_ruby/ext/SynchronizationLibrary.java similarity index 77% rename from ext/com/concurrent_ruby/ext/JavaSynchronizedObjectLibrary.java rename to ext/com/concurrent_ruby/ext/SynchronizationLibrary.java index 0ad5ced50..ae979b42a 100644 --- a/ext/com/concurrent_ruby/ext/JavaSynchronizedObjectLibrary.java +++ b/ext/com/concurrent_ruby/ext/SynchronizationLibrary.java @@ -17,32 +17,33 @@ import org.jruby.RubyNil; import org.jruby.runtime.ThreadContext; -public class JavaSynchronizedObjectLibrary implements Library { +public class SynchronizationLibrary implements Library { public void load(Ruby runtime, boolean wrap) throws IOException { - RubyModule concurrentModule = runtime. + RubyModule synchronizationModule = runtime. defineModule("Concurrent"). - defineModuleUnder("SynchronizedObjectImplementations"); - RubyClass parent = concurrentModule. - getClass("Abstract"); - if (parent == null) throw runtime.newRuntimeError("Concurrent::SynchronizedObject::Abstract is missing"); + defineModuleUnder("Synchronization"); + RubyClass parentClass = synchronizationModule.getClass("AbstractObject"); - RubyClass synchronizedObjectJavaClass = concurrentModule. - defineClassUnder("Java", parent, JRUBYREFERENCE_ALLOCATOR); + if (parentClass == null) + throw runtime.newRuntimeError("Concurrent::Synchronization::AbstractObject is missing"); - synchronizedObjectJavaClass.defineAnnotatedMethods(Java.class); + RubyClass synchronizedObjectJavaClass = + synchronizationModule.defineClassUnder("JavaObject", parentClass, JRUBYREFERENCE_ALLOCATOR); + + synchronizedObjectJavaClass.defineAnnotatedMethods(JavaObject.class); } private static final ObjectAllocator JRUBYREFERENCE_ALLOCATOR = new ObjectAllocator() { public IRubyObject allocate(Ruby runtime, RubyClass klazz) { - return new Java(runtime, klazz); + return new JavaObject(runtime, klazz); } }; - @JRubyClass(name = "Java", parent = "Abstract") - public static class Java extends RubyObject { + @JRubyClass(name = "JavaObject", parent = "AbstractObject") + public static class JavaObject extends RubyObject { - public Java(Ruby runtime, RubyClass metaClass) { + public JavaObject(Ruby runtime, RubyClass metaClass) { super(runtime, metaClass); } diff --git a/lib/concurrent.rb b/lib/concurrent.rb index 5a2a2a1aa..535e8d89c 100644 --- a/lib/concurrent.rb +++ b/lib/concurrent.rb @@ -1,6 +1,6 @@ require 'concurrent/version' -require 'concurrent/synchronized_object' +require 'concurrent/synchronization' require 'concurrent/configuration' diff --git a/lib/concurrent/actor.rb b/lib/concurrent/actor.rb index 7eb4fb6ed..a75b2234b 100644 --- a/lib/concurrent/actor.rb +++ b/lib/concurrent/actor.rb @@ -3,7 +3,7 @@ require 'concurrent/executor/serialized_execution' require 'concurrent/ivar' require 'concurrent/logging' -require 'concurrent/synchronized_object' +require 'concurrent/synchronization' module Concurrent # TODO https://github.com/celluloid/celluloid/wiki/Supervision-Groups ? diff --git a/lib/concurrent/actor/core.rb b/lib/concurrent/actor/core.rb index aafc0cf23..0c8bc301c 100644 --- a/lib/concurrent/actor/core.rb +++ b/lib/concurrent/actor/core.rb @@ -9,7 +9,7 @@ module Actor # @note devel: core should not block on anything, e.g. it cannot wait on # children to terminate that would eat up all threads in task pool and # deadlock - class Core < SynchronizedObject + class Core < Synchronization::Object include TypeCheck include Concurrent::Logging diff --git a/lib/concurrent/atomic/count_down_latch.rb b/lib/concurrent/atomic/count_down_latch.rb index ea86607d0..38bc349ac 100644 --- a/lib/concurrent/atomic/count_down_latch.rb +++ b/lib/concurrent/atomic/count_down_latch.rb @@ -1,4 +1,4 @@ -require 'concurrent/synchronized_object' +require 'concurrent/synchronization' module Concurrent @@ -11,7 +11,7 @@ module Concurrent # method. Each of the other threads calls `#count_down` when done with its work. # When the latch counter reaches zero the waiting thread is unblocked and continues # with its work. A `CountDownLatch` can be used only once. Its value cannot be reset. - class PureCountDownLatch < SynchronizedObject + class PureCountDownLatch < Synchronization::Object # @!macro [attach] count_down_latch_method_initialize # diff --git a/lib/concurrent/atomic/cyclic_barrier.rb b/lib/concurrent/atomic/cyclic_barrier.rb index cde0d254b..69dc8e5d4 100644 --- a/lib/concurrent/atomic/cyclic_barrier.rb +++ b/lib/concurrent/atomic/cyclic_barrier.rb @@ -1,8 +1,8 @@ -require 'concurrent/synchronized_object' +require 'concurrent/synchronization' module Concurrent - class CyclicBarrier < SynchronizedObject + class CyclicBarrier < Synchronization::Object Generation = Struct.new(:status) private_constant :Generation diff --git a/lib/concurrent/atomic/event.rb b/lib/concurrent/atomic/event.rb index 3f53c8f5b..ac320828a 100644 --- a/lib/concurrent/atomic/event.rb +++ b/lib/concurrent/atomic/event.rb @@ -1,5 +1,5 @@ require 'thread' -require 'concurrent/synchronized_object' +require 'concurrent/synchronization' module Concurrent @@ -13,7 +13,7 @@ module Concurrent # `#reset` at any time once it has been set. # # @see http://msdn.microsoft.com/en-us/library/windows/desktop/ms682655.aspx - class Event < SynchronizedObject + class Event < Synchronization::Object # Creates a new `Event` in the unset state. Threads calling `#wait` on the # `Event` will block. diff --git a/lib/concurrent/executor/serialized_execution.rb b/lib/concurrent/executor/serialized_execution.rb index 1f3825f88..a63583ad3 100644 --- a/lib/concurrent/executor/serialized_execution.rb +++ b/lib/concurrent/executor/serialized_execution.rb @@ -1,12 +1,12 @@ require 'delegate' require 'concurrent/executor/executor' require 'concurrent/logging' -require 'concurrent/synchronized_object' +require 'concurrent/synchronization' module Concurrent # Ensures passed jobs in a serialized order never running at the same time. - class SerializedExecution < SynchronizedObject + class SerializedExecution < Synchronization::Object include Logging Job = Struct.new(:executor, :args, :block) do diff --git a/lib/concurrent/native_extensions/before.rb b/lib/concurrent/native_extensions/before.rb index 2a0346aef..361abe434 100644 --- a/lib/concurrent/native_extensions/before.rb +++ b/lib/concurrent/native_extensions/before.rb @@ -1 +1 @@ -require 'concurrent/synchronized_object_implementations/abstract' +require 'concurrent/synchronization/abstract_object' diff --git a/lib/concurrent/synchronization.rb b/lib/concurrent/synchronization.rb new file mode 100644 index 000000000..8d379da3b --- /dev/null +++ b/lib/concurrent/synchronization.rb @@ -0,0 +1,28 @@ +require 'concurrent/utility/engine' +require 'concurrent/synchronization/abstract_object' +require 'concurrent/native_extensions' # JavaObject +require 'concurrent/synchronization/mutex_object' +require 'concurrent/synchronization/monitor_object' +require 'concurrent/synchronization/rbx_object' + +module Concurrent + module Synchronization + class Object < case + when Concurrent.on_jruby? + JavaObject + + when Concurrent.on_cruby? && (RUBY_VERSION.split('.').map(&:to_i) <=> [1, 9, 3]) <= 0 + MonitorObject + + when Concurrent.on_cruby? + MutexObject + + when Concurrent.on_rbx? + RbxObject + + else + MutexObject + end + end + end +end diff --git a/lib/concurrent/synchronized_object_implementations/abstract.rb b/lib/concurrent/synchronization/abstract_object.rb similarity index 61% rename from lib/concurrent/synchronized_object_implementations/abstract.rb rename to lib/concurrent/synchronization/abstract_object.rb index 4e76ffed3..585db070c 100644 --- a/lib/concurrent/synchronized_object_implementations/abstract.rb +++ b/lib/concurrent/synchronization/abstract_object.rb @@ -1,18 +1,20 @@ module Concurrent - module SynchronizedObjectImplementations + # TODO rename to Synchronization + # TODO add newCondition + module Synchronization # Safe synchronization under any Ruby implementation. # It provides methods like {#synchronize}, {#wait}, {#signal} and {#broadcast}. # Provides a single layer which can improve its implementation over time without changes needed to - # the classes using it. Use {SynchronizedObject} not this abstract class. + # the classes using it. Use {Synchronization::Object} not this abstract class. # # @note this object does not support usage together with {Thread#wakeup} and {Thread#raise}. - # `Thread#sleep` and `Thread#wakeup` will work as expected but mixing `SynchronizedObject#wait` and + # `Thread#sleep` and `Thread#wakeup` will work as expected but mixing `Synchronization::Object#wait` and # `Thread#wakeup` will not work on all platforms. # # @see {Event} implementation as an example of this class use # # @example simple - # class AnClass < SynchronizedObject + # class AnClass < Synchronization::Object # def initialize # super # synchronize { @value = 'asd' } @@ -22,7 +24,7 @@ module SynchronizedObjectImplementations # synchronize { @value } # end # end - class Abstract + class AbstractObject # @abstract for helper ivar initialization if needed, # otherwise it can be left empty. @@ -116,6 +118,78 @@ def ns_signal def ns_broadcast raise NotImplementedError end + + # @example + # def initialize + # @val = :val # final never changed value + # ensure_ivar_visibility! + # # not it can be shared as Java's immutable objects with final fields + # end + def ensure_ivar_visibility! + raise NotImplementedError + end + + def self.attr_volatile *names + attr_accessor *names.map { |name| :"volatile_#{name}" } + end + + module CasAttributes + def list_attr_volatile_cas + @attr_volatile_cas_names ||= [] + # @attr_volatile_cas_names + + # if superclass.respond_to?(:list_attr_volatile_cas) + # superclass.list_attr_volatile_cas + # else + # [] + # end + end + + def attr_volatile_cas *names + names.each do |name| + class_eval <<-RUBY + def #(name} + #{CasAttributes.ivar_name(name)}.get + end + + def #(name}=(value) + #{CasAttributes.ivar_name(name)}.set value + end + + def #(name}_cas(old, value) + #{CasAttributes.ivar_name(name)}.compare_and_set old, value + end + + RUBY + + define_method name do + instance_variable_get CasAttributes.ivar_name(name) + end + + define_method "#{name}=" do |value| + instance_variable_set CasAttributes.ivar_name(name), value + Rubinius.memory_barrier + end + end + end + + def self.ivar_name(name) + :"@volatile_cas_#{name}" + end + + def self.extended(base) + base.include InstanceMethods + end + + module InstanceMethods + def initialize + self.class.list_attr_volatile_cas.each do |name| + isntance_variable_set CasAttributes.ivar_name(name), Atomic.new(nil) + end + ensure_ivar_visibility! + super + end + end + end end end end diff --git a/lib/concurrent/synchronized_object_implementations/java_pure.rb b/lib/concurrent/synchronization/java_pure_object.rb similarity index 69% rename from lib/concurrent/synchronized_object_implementations/java_pure.rb rename to lib/concurrent/synchronization/java_pure_object.rb index 72c3f232c..38633769b 100644 --- a/lib/concurrent/synchronized_object_implementations/java_pure.rb +++ b/lib/concurrent/synchronization/java_pure_object.rb @@ -1,10 +1,10 @@ module Concurrent - module SynchronizedObjectImplementations + module Synchronization if Concurrent.on_jruby? require 'jruby' - class JavaPure < Abstract + class JavaPureObject < AbstractObject def initialize end @@ -17,6 +17,8 @@ def synchronize def ns_wait(timeout = nil) success = JRuby.reference0(Thread.current).wait_timeout(JRuby.reference0(self), timeout) self + rescue java.lang.InterruptedException => e + raise ThreadError(e.message) ensure ns_signal unless success end @@ -30,6 +32,10 @@ def ns_signal JRuby.reference0(self).notify self end + + def ensure_ivar_visibility! + # relying on undocumented behavior of JRuby, ivar access is volatile + end end end end diff --git a/lib/concurrent/synchronized_object_implementations/monitor.rb b/lib/concurrent/synchronization/monitor_object.rb similarity index 66% rename from lib/concurrent/synchronized_object_implementations/monitor.rb rename to lib/concurrent/synchronization/monitor_object.rb index 51255231c..024335d53 100644 --- a/lib/concurrent/synchronized_object_implementations/monitor.rb +++ b/lib/concurrent/synchronization/monitor_object.rb @@ -1,6 +1,6 @@ module Concurrent - module SynchronizedObjectImplementations - class Monitor < Mutex + module Synchronization + class MonitorObject < MutexObject def initialize @__lock__do_not_use_directly = ::Monitor.new @__condition__do_not_use_directly = @__lock__do_not_use_directly.new_cond @@ -16,6 +16,10 @@ def ns_wait(timeout = nil) @__condition__do_not_use_directly.wait timeout self end + + def ensure_ivar_visibility! + # relying on undocumented behavior of CRuby, GVL quire has lock which ensures visibility of ivars + end end end end diff --git a/lib/concurrent/synchronized_object_implementations/mutex.rb b/lib/concurrent/synchronization/mutex_object.rb similarity index 77% rename from lib/concurrent/synchronized_object_implementations/mutex.rb rename to lib/concurrent/synchronization/mutex_object.rb index 30b029085..4829cc54e 100644 --- a/lib/concurrent/synchronized_object_implementations/mutex.rb +++ b/lib/concurrent/synchronization/mutex_object.rb @@ -1,6 +1,6 @@ module Concurrent - module SynchronizedObjectImplementations - class Mutex < Abstract + module Synchronization + class MutexObject < AbstractObject def initialize @__lock__do_not_use_directly = ::Mutex.new @__condition__do_not_use_directly = ::ConditionVariable.new @@ -30,6 +30,10 @@ def ns_wait(timeout = nil) @__condition__do_not_use_directly.wait @__lock__do_not_use_directly, timeout self end + + def ensure_ivar_visibility! + # relying on undocumented behavior of CRuby, GVL quire has lock which ensures visibility of ivars + end end end end diff --git a/lib/concurrent/synchronized_object_implementations/rbx.rb b/lib/concurrent/synchronization/rbx_object.rb similarity index 65% rename from lib/concurrent/synchronized_object_implementations/rbx.rb rename to lib/concurrent/synchronization/rbx_object.rb index 1598af111..02d6e7d86 100644 --- a/lib/concurrent/synchronized_object_implementations/rbx.rb +++ b/lib/concurrent/synchronization/rbx_object.rb @@ -1,7 +1,7 @@ module Concurrent - module SynchronizedObjectImplementations + module Synchronization if Concurrent.on_rbx? - class Rbx < Abstract + class RbxObject < AbstractObject def initialize @waiters = [] end @@ -42,6 +42,25 @@ def ns_broadcast self end end + + def ensure_ivar_visibility! + Rubinius.memory_barrier + end + + def self.attr_volatile *names + names.each do |name| + ivar = :"@volatile_#{name}" + define_method name do + Rubinius.memory_barrier + instance_variable_get ivar + end + + define_method "#{name}=" do |value| + instance_variable_set ivar, value + Rubinius.memory_barrier + end + end + end end end end diff --git a/lib/concurrent/synchronized_object.rb b/lib/concurrent/synchronized_object.rb deleted file mode 100644 index 20b2312b5..000000000 --- a/lib/concurrent/synchronized_object.rb +++ /dev/null @@ -1,31 +0,0 @@ -require 'concurrent/utility/engine' -require 'concurrent/synchronized_object_implementations/abstract' -require 'concurrent/native_extensions' -require 'concurrent/synchronized_object_implementations/mutex' -require 'concurrent/synchronized_object_implementations/monitor' -require 'concurrent/synchronized_object_implementations/rbx' - -module Concurrent - module SynchronizedObjectImplementations - class Implementation < case - when Concurrent.on_jruby? - Java - - when Concurrent.on_cruby? && (RUBY_VERSION.split('.').map(&:to_i) <=> [1, 9, 3]) >= 0 - Monitor - - when Concurrent.on_cruby? - Mutex - - when Concurrent.on_rbx? - Rbx - - else - Mutex - end - end - end - - SynchronizedObject = SynchronizedObjectImplementations::Implementation - -end