diff --git a/.travis.yml b/.travis.yml index f331e2ff0..f665ab33b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -26,4 +26,4 @@ matrix: - rvm: jruby-head - rvm: 1.9.3 -script: "rake compile && bundle exec rspec --color --backtrace --tag ~unfinished --seed 1 --format documentation ./spec" +script: "rake compile && bundle exec rspec --color --backtrace --tag ~unfinished --seed 1 ./spec" diff --git a/CHANGELOG.md b/CHANGELOG.md index 722139aaa..e46b768ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,33 @@ * Deprecated all clock-time based timer scheduling - Only support scheduling by delay - Effects `Concurrent.timer`, `TimerSet`, and `ScheduledTask` +* Consistent `at_exit` behavior for Java and Ruby thread pools. +* Added `at_exit` handler to Ruby thread pools (already in Java thread pools) + - Ruby handler stores the object id and retrieves from `ObjectSpace` + - JRuby disables `ObjectSpace` by default so that handler stores the object reference +* Added a `:stop_on_exit` option to thread pools to enable/disable `at_exit` handler +* Updated thread pool docs to better explain shutting down thread pools +* Simpler `:executor` option syntax for all abstractions which support this option +* Added `Executor#auto_terminate?` predicate method (for thread pools) +* Added `at_exit` handler to `TimerSet` +* Simplified auto-termination of the global executors + - Can now disable auto-termination of global executors + - Added shutdown/kill/wait_for_termination variants for global executors +* Can now disable auto-termination for *all* executors (the nuclear option) +* Simplified auto-termination of the global executors +* Deprecated terms "task pool" and "operation pool" + - New terms are "io executor" and "fast executor" + - New functions added with new names + - Deprecation warnings added to functions referencing old names +* Moved all thread pool related functions from `Concurrent::Configuration` to `Concurrent` + - Old functions still exist with deprecation warnings + - New functions have updated names as appropriate +* All high-level abstractions default to the "io executor" +* Fixed bug in `Actor` causing it to prematurely warm global thread pools on gem load + - This also fixed a `RejectedExecutionError` bug when running with minitest/autorun via JRuby +* Added `LazyReference`, a simpler and faster varition of `Delay` + - Updated most internal uses of `Delay` with `LazyReference` +* Moved global logger up to the `Concurrent` namespace and refactored the code ## Current Release v0.8.0 (25 January 2015) diff --git a/Gemfile b/Gemfile index bec4a264a..b33a15e16 100644 --- a/Gemfile +++ b/Gemfile @@ -4,21 +4,20 @@ gemspec name: 'concurrent-ruby' group :development do gem 'rake', '~> 10.3.2' - gem 'rake-compiler', '~> 0.9.2' + gem 'rake-compiler', '~> 0.9.5' gem 'gem-compiler', '~> 0.3.0' end group :testing do - gem 'rspec', '~> 3.0.0' - gem 'simplecov', '~> 0.8.2', :require => false - gem 'coveralls', '~> 0.7.0', :require => false - gem 'timecop', '~> 0.7.1' + gem 'rspec', '~> 3.2.0' + gem 'simplecov', '~> 0.9.2', :require => false + gem 'coveralls', '~> 0.7.11', :require => false + gem 'timecop', '~> 0.7.3' end group :documentation do gem 'countloc', '~> 0.4.0', :platforms => :mri, :require => false - gem 'rubycritic', '~> 1.0.2', :platforms => :mri, require: false - gem 'yard', '~> 0.8.7.4', :require => false - gem 'inch', '~> 0.4.6', :platforms => :mri, :require => false - gem 'redcarpet', '~> 3.1.2', platforms: :mri # understands github markdown + gem 'yard', '~> 0.8.7.6', :require => 
false + gem 'inch', '~> 0.5.10', :platforms => :mri, :require => false + gem 'redcarpet', '~> 3.2.2', platforms: :mri # understands github markdown end diff --git a/README.md b/README.md index 6c29822ff..8bd6e6087 100644 --- a/README.md +++ b/README.md @@ -66,36 +66,29 @@ This library contains a variety of concurrency abstractions at high and low leve * See [ThreadPool](http://ruby-concurrency.github.io/concurrent-ruby/file.thread_pools.html) overview, which also contains a list of other Executors available. -### Thread-safe Observers - -* [Concurrent::Observable](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Observable.html) mixin module -* [CopyOnNotifyObserverSet](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/CopyOnNotifyObserverSet.html) -* [CopyOnWriteObserverSet](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/CopyOnWriteObserverSet.html) - ### Thread synchronization classes and algorithms -Lower-level abstractions mainly used as building blocks. - -* [condition](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Condition.html) -* [countdown latch](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/CountDownLatch.html) -* [cyclic barrier](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/CyclicBarrier.html) -* [event](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Event.html) -* [exchanger](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Exchanger.html) -* [semaphore](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Semaphore.html) -* [timeout](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent.html#timeout-class_method) -* [timer](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent.html#timer-class_method) +* [Condition](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Condition.html) +* [CountdownLatch](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/CountDownLatch.html) +* [CyclicBarrier](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/CyclicBarrier.html) +* [Event](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Event.html) +* [Exchanger](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Exchanger.html) +* [Semaphore](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Semaphore.html) +* [Timeout](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent.html#timeout-class_method) +* [Timer](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent.html#timer-class_method) ### Thread-safe variables -Lower-level abstractions mainly used as building blocks. 
- * [AtomicBoolean](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/AtomicBoolean.html) * [AtomicFixnum](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/AtomicFixnum.html) -* AtomicReference (no docs currently available, check source) +* [AtomicReference](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/MutexAtomic.html) +* [Delay](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Delay.html) +* [LazyReference](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/LazyReference.html) +* [LazyRegister](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/LazyRegister.html) * [I-Structures](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/IVar.html) (IVar) * [M-Structures](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/MVar.html) (MVar) -* [thread-local variables](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/ThreadLocalVar.html) -* [software transactional memory](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/TVar.html) (TVar) +* [Thread-local variables](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/ThreadLocalVar.html) +* [Software transactional memory](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/TVar.html) (TVar) ## Usage @@ -128,6 +121,8 @@ require 'concurrent/delay' # Concurrent::Delay require 'concurrent/exchanger' # Concurrent::Exchanger require 'concurrent/future' # Concurrent::Future require 'concurrent/ivar' # Concurrent::IVar +require 'concurrent/lazy_register' # Concurrent::LazyRegister +require 'concurrent/lazy_reference' # Concurrent::LazyReference require 'concurrent/mvar' # Concurrent::MVar require 'concurrent/promise' # Concurrent::Promise require 'concurrent/scheduled_task' # Concurrent::ScheduledTask diff --git a/examples/actor_stress_test.rb b/examples/actor_stress_test.rb new file mode 100755 index 000000000..a41934f82 --- /dev/null +++ b/examples/actor_stress_test.rb @@ -0,0 +1,139 @@ +#!/usr/bin/env ruby + +$: << File.expand_path('../../lib', __FILE__) + +require 'benchmark' +require 'optparse' +require 'thread' +require 'rspec/expectations' + +require 'concurrent/actor' + +class ActorStressTester + include ::RSpec::Matchers + + TESTS_PER_RUN = 5 + THREADS_PER_TEST = 10 + LOOPS_PER_THREAD = 25 + + class Ping < Concurrent::Actor::Context + def initialize(queue) + @queue = queue + end + + def on_message(message) + case message + when :child + Concurrent::Actor::Utils::AdHoc.spawn(:pong, @queue) do |queue| + -> m { queue << m } + end + else + @queue << message + message + end + end + end + + def initialize(opts = {}) + @tests = opts.fetch(:tests, TESTS_PER_RUN) + @threads = opts.fetch(:threads, THREADS_PER_TEST) + @loops = opts.fetch(:loops, LOOPS_PER_THREAD) + end + + def run + plural = ->(number){ number == 1 ? '' : 's' } + + puts "Running #{@tests} test#{plural.call(@tests)} " + + "with #{@threads} thread#{plural.call(@threads)} each " + + "and #{@loops} loop#{plural.call(@loops)} per thread..." 
+ + Benchmark.bm do |bm| + @tests.times do + bm.report do + test(@threads, @loops) + end + end + end + end + + def test(threads, loops) + (1..threads).collect do + Thread.new do + loops.times do + + queue = Queue.new + actor = Ping.spawn(:ping, queue) + + core = Concurrent::Actor.root.send(:core) + children = core.instance_variable_get(:@children) + expect(children).to include(actor) + + actor << 'a' << 1 + expect(queue.pop).to eq 'a' + expect(actor.ask(2).value).to eq 2 + + expect(actor.parent).to eq Concurrent::Actor.root + expect(Concurrent::Actor.root.path).to eq '/' + expect(actor.path).to eq '/ping' + + child = actor.ask(:child).value + expect(child.path).to eq '/ping/pong' + + queue.clear + child.ask(3) + expect(queue.pop).to eq 3 + + actor << :terminate! + expect(actor.ask(:blow_up).wait).to be_rejected + terminate_actors(actor, child) + end + end + end.each(&:join) + end + + def terminate_actors(*actors) + actors.each do |actor| + unless actor.ask!(:terminated?) + actor.ask!(:terminate!) + end + end + end +end + +# def trace! +# set_trace_func proc { |event, file, line, id, binding, classname| +# # thread = eval('Thread.current', binding).object_id.to_s(16) +# printf "%8s %20s %20s %s %s:%-2d\n", event, id, classname, nil, file, line +# } +# yield +# ensure +# set_trace_func nil +# end + +if $0 == __FILE__ + + options = {} + + OptionParser.new do |opts| + opts.banner = "Usage: #{File.basename(__FILE__)} [options]" + + opts.on("--tests=TESTS", "Number of tests per run") do |value| + options[:tests] = value.to_i + end + + opts.on("--threads=THREADS", "Number of threads per test") do |value| + options[:threads] = value.to_i + end + + opts.on("--loops=LOOPS", "Number of loops per thread") do |value| + options[:loops] = value.to_i + end + + opts.on("-h", "--help", "Prints this help") do + puts opts + exit + end + end.parse! + + ActorStressTester.new(options).run +end diff --git a/examples/lazy_and_delay.rb b/examples/lazy_and_delay.rb new file mode 100755 index 000000000..3f50463d2 --- /dev/null +++ b/examples/lazy_and_delay.rb @@ -0,0 +1,23 @@ +#!/usr/bin/env ruby + +$: << File.expand_path('../../lib', __FILE__) + +require 'benchmark' + +require 'concurrent/delay' +require 'concurrent/lazy_reference' + +n = 500_000 + +delay = Concurrent::Delay.new{ nil } +lazy = Concurrent::LazyReference.new{ nil } + +delay.value +lazy.value + +Benchmark.bm do |x| + puts 'Benchmarking Delay...' + x.report { n.times{ delay.value } } + puts 'Benchmarking Lazy...' 
+ x.report { n.times{ lazy.value } } +end diff --git a/lib/concurrent.rb b/lib/concurrent.rb index ba7ddc1d4..1a8910adf 100644 --- a/lib/concurrent.rb +++ b/lib/concurrent.rb @@ -2,28 +2,26 @@ require 'concurrent/configuration' +require 'concurrent/actor' require 'concurrent/atomics' require 'concurrent/channels' require 'concurrent/collections' +require 'concurrent/errors' require 'concurrent/executors' require 'concurrent/utilities' -require 'concurrent/actor' require 'concurrent/atomic' -require 'concurrent/lazy_register' require 'concurrent/agent' require 'concurrent/async' +require 'concurrent/atomic' require 'concurrent/dataflow' require 'concurrent/delay' -require 'concurrent/dereferenceable' -require 'concurrent/errors' require 'concurrent/exchanger' require 'concurrent/future' require 'concurrent/ivar' +require 'concurrent/lazy_reference' +require 'concurrent/lazy_register' require 'concurrent/mvar' -require 'concurrent/obligation' -require 'concurrent/observable' -require 'concurrent/options_parser' require 'concurrent/promise' require 'concurrent/scheduled_task' require 'concurrent/timer_task' diff --git a/lib/concurrent/actor.rb b/lib/concurrent/actor.rb index 8773000aa..b39504f48 100644 --- a/lib/concurrent/actor.rb +++ b/lib/concurrent/actor.rb @@ -1,4 +1,5 @@ require 'concurrent/configuration' +require 'concurrent/delay' require 'concurrent/executor/serialized_execution' require 'concurrent/ivar' require 'concurrent/logging' @@ -39,7 +40,7 @@ def self.current Thread.current[:__current_actor__] end - @root = Delay.new do + @root = Delay.new(executor: :immediate) do Core.new(parent: nil, name: '/', class: Root, initialized: ivar = IVar.new).reference.tap do ivar.no_error! end @@ -59,7 +60,7 @@ def self.root # Actor.spawn name: :ping3, # class: AdHoc, # args: [1] - # executor: Concurrent.configuration.global_task_pool do |add| + # executor: Concurrent.global_io_executor do |add| # lambda { |number| number + add } # end # diff --git a/lib/concurrent/actor/core.rb b/lib/concurrent/actor/core.rb index ff04048d3..49718b7c5 100644 --- a/lib/concurrent/actor/core.rb +++ b/lib/concurrent/actor/core.rb @@ -35,7 +35,7 @@ class Core # @option opts [Class] reference a custom descendant of {Reference} to use # @option opts [Context] actor_class a class to be instantiated defining Actor's behaviour # @option opts [Array] args arguments for actor_class instantiation - # @option opts [Executor] executor, default is `Concurrent.configuration.global_task_pool` + # @option opts [Executor] executor, default is `Concurrent.global_io_executor` # @option opts [true, false] link, atomically link the actor to its parent # @option opts [true, false] supervise, atomically supervise the actor by its parent # @option opts [Array)>] @@ -56,7 +56,7 @@ def initialize(opts = {}, &block) @context_class = Child! opts.fetch(:class), AbstractContext allocate_context - @executor = Type! opts.fetch(:executor, Concurrent.configuration.global_task_pool), Executor + @executor = Type! opts.fetch(:executor, Concurrent.global_io_executor), Executor raise ArgumentError, 'ImmediateExecutor is not supported' if @executor.is_a? ImmediateExecutor @reference = (Child! 
opts[:reference_class] || @context.default_reference_class, Reference).new self diff --git a/lib/concurrent/actor/reference.rb b/lib/concurrent/actor/reference.rb index a43e992c9..de6883e09 100644 --- a/lib/concurrent/actor/reference.rb +++ b/lib/concurrent/actor/reference.rb @@ -27,8 +27,8 @@ def tell(message) # @note it's a good practice to use tell whenever possible. Ask should be used only for # testing and when it returns very shortly. It can lead to deadlock if all threads in - # global_task_pool will block on while asking. It's fine to use it form outside of actors and - # global_task_pool. + # global_io_executor will block on while asking. It's fine to use it form outside of actors and + # global_io_executor. # # sends message to the actor and asks for the result of its processing, returns immediately # @param [Object] message @@ -40,8 +40,8 @@ def ask(message, ivar = IVar.new) # @note it's a good practice to use tell whenever possible. Ask should be used only for # testing and when it returns very shortly. It can lead to deadlock if all threads in - # global_task_pool will block on while asking. It's fine to use it form outside of actors and - # global_task_pool. + # global_io_executor will block on while asking. It's fine to use it form outside of actors and + # global_io_executor. # # sends message to the actor and asks for the result of its processing, blocks # @param [Object] message diff --git a/lib/concurrent/agent.rb b/lib/concurrent/agent.rb index 3cdee2b1b..94716ee81 100644 --- a/lib/concurrent/agent.rb +++ b/lib/concurrent/agent.rb @@ -2,7 +2,7 @@ require 'concurrent/dereferenceable' require 'concurrent/observable' -require 'concurrent/options_parser' +require 'concurrent/executor/executor_options' require 'concurrent/utility/timeout' require 'concurrent/logging' @@ -14,39 +14,41 @@ module Concurrent # @return [Fixnum] the maximum number of seconds before an update is cancelled class Agent include Dereferenceable - include Concurrent::Observable + include Observable + include ExecutorOptions include Logging - attr_reader :timeout, :task_executor, :operation_executor + attr_reader :timeout, :io_executor, :fast_executor # Initialize a new Agent with the given initial value and provided options. # # @param [Object] initial the initial value - # @param [Hash] opts the options used to define the behavior at update and deref # - # @option opts [Boolean] :operation (false) when `true` will execute the future on the global - # operation pool (for long-running operations), when `false` will execute the future on the - # global task pool (for short-running tasks) - # @option opts [object] :executor when provided will run all operations on - # this executor rather than the global thread pool (overrides :operation) - # - # @option opts [String] :dup_on_deref (false) call `#dup` before returning the data - # @option opts [String] :freeze_on_deref (false) call `#freeze` before returning the data - # @option opts [String] :copy_on_deref (nil) call the given `Proc` passing - # the internal value and returning the value returned from the proc + # @!macro [attach] executor_and_deref_options + # + # @param [Hash] opts the options used to define the behavior at update and deref + # and to specify the executor on which to perform actions + # @option opts [Executor] :executor when set use the given `Executor` instance. 
+ # Three special values are also supported: `:task` returns the global task pool, + # `:operation` returns the global operation pool, and `:immediate` returns a new + # `ImmediateExecutor` object. + # @option opts [Boolean] :dup_on_deref (false) call `#dup` before returning the data + # @option opts [Boolean] :freeze_on_deref (false) call `#freeze` before returning the data + # @option opts [Proc] :copy_on_deref (nil) call the given `Proc` passing + # the internal value and returning the value returned from the proc def initialize(initial, opts = {}) @value = initial @rescuers = [] @validator = Proc.new { |result| true } self.observers = CopyOnWriteObserverSet.new @serialized_execution = SerializedExecution.new - @task_executor = OptionsParser.get_task_executor_from(opts) - @operation_executor = OptionsParser.get_operation_executor_from(opts) + @io_executor = get_executor_from(opts) || Concurrent.global_io_executor + @fast_executor = get_executor_from(opts) || Concurrent.global_fast_executor init_mutex set_deref_options(opts) end - # Specifies a block operation to be performed when an update operation raises + # Specifies a block fast to be performed when an update fast raises # an exception. Rescue blocks will be checked in order they were added. The first # block for which the raised exception "is-a" subclass of the given `clazz` will # be called. If no `clazz` is given the block will match any caught exception. @@ -78,13 +80,13 @@ def rescue(clazz = StandardError, &block) alias_method :catch, :rescue alias_method :on_error, :rescue - # A block operation to be performed after every update to validate if the new + # A block fast to be performed after every update to validate if the new # value is valid. If the new value is not valid then the current value is not # updated. If no validator is provided then all updates are considered valid. 
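As a concrete illustration of the `Agent` API documented above (`#validate`, `#rescue`, `#post`, and the dereferenced `#value`), here is a minimal sketch; the trailing `sleep` is only there because updates run asynchronously.

```ruby
require 'concurrent'

score = Concurrent::Agent.new(100)

# Reject any update that would make the score negative.
score.validate { |new_value| new_value >= 0 }

# Capture exceptions raised inside update blocks instead of losing them.
score.rescue(StandardError) { |ex| warn "update failed: #{ex.message}" }

score.post { |value| value + 50 }   # accepted by the validator => 150
score.post { |value| value - 500 }  # result is negative, update is discarded

sleep(0.1)       # updates run asynchronously on the io executor
puts score.value # => 150
```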
# - # @yield the block to be called after every update operation to determine if + # @yield the block to be called after every update fast to determine if # the result is valid - # @yieldparam [Object] value the result of the last update operation + # @yieldparam [Object] value the result of the last update fast # @yieldreturn [Boolean] true if the value is valid else false def validate(&block) @@ -102,24 +104,24 @@ def validate(&block) alias_method :validate_with, :validate alias_method :validates_with, :validate - # Update the current value with the result of the given block operation, + # Update the current value with the result of the given block fast, # block should not do blocking calls, use #post_off for blocking calls # - # @yield the operation to be performed with the current value in order to calculate + # @yield the fast to be performed with the current value in order to calculate # the new value # @yieldparam [Object] value the current value # @yieldreturn [Object] the new value # @return [true, nil] nil when no block is given def post(&block) - post_on(@task_executor, &block) + post_on(@io_executor, &block) end - # Update the current value with the result of the given block operation, + # Update the current value with the result of the given block fast, # block can do blocking calls # # @param [Fixnum, nil] timeout maximum number of seconds before an update is cancelled # - # @yield the operation to be performed with the current value in order to calculate + # @yield the fast to be performed with the current value in order to calculate # the new value # @yieldparam [Object] value the current value # @yieldreturn [Object] the new value @@ -130,13 +132,13 @@ def post_off(timeout = nil, &block) else block end - post_on(@operation_executor, &block) + post_on(@fast_executor, &block) end - # Update the current value with the result of the given block operation, + # Update the current value with the result of the given block fast, # block should not do blocking calls, use #post_off for blocking calls # - # @yield the operation to be performed with the current value in order to calculate + # @yield the fast to be performed with the current value in order to calculate # the new value # @yieldparam [Object] value the current value # @yieldreturn [Object] the new value diff --git a/lib/concurrent/async.rb b/lib/concurrent/async.rb index 9736593d9..b29c390be 100644 --- a/lib/concurrent/async.rb +++ b/lib/concurrent/async.rb @@ -1,6 +1,7 @@ require 'thread' require 'concurrent/configuration' require 'concurrent/delay' +require 'concurrent/lazy_reference' require 'concurrent/errors' require 'concurrent/ivar' require 'concurrent/executor/immediate_executor' @@ -10,8 +11,6 @@ module Concurrent # {include:file:doc/async.md} # - # @since 0.6.0 - # # @see Concurrent::Obligation module Async @@ -194,13 +193,21 @@ def executor=(executor) # @raise [Concurrent::InitializationError] when called more than once def init_mutex raise InitializationError.new('#init_mutex was already called') if @__async_initialized__ + @__async_initialized__ = true serializer = Concurrent::SerializedExecution.new - @__async_executor__ = Delay.new{ Concurrent.configuration.global_operation_pool } - @__await_delegator__ = Delay.new{ AsyncDelegator.new( - self, Delay.new{ Concurrent::ImmediateExecutor.new }, serializer, true) } - @__async_delegator__ = Delay.new{ AsyncDelegator.new( - self, @__async_executor__, serializer, false) } + + @__async_executor__ = Delay.new(executor: :immediate) { + Concurrent.global_io_executor + 
} + + @__await_delegator__ = Delay.new(executor: :immediate) { + AsyncDelegator.new(self, LazyReference.new{ Concurrent::ImmediateExecutor.new }, serializer, true) + } + + @__async_delegator__ = Delay.new(executor: :immediate) { + AsyncDelegator.new(self, @__async_executor__, serializer, false) + } end end end diff --git a/lib/concurrent/configuration.rb b/lib/concurrent/configuration.rb index de0e49e8a..0f8c4d02d 100644 --- a/lib/concurrent/configuration.rb +++ b/lib/concurrent/configuration.rb @@ -1,124 +1,296 @@ require 'thread' -require 'concurrent/delay' +require 'concurrent/lazy_reference' +require 'concurrent/atomics' require 'concurrent/errors' -require 'concurrent/atomic' -require 'concurrent/executor/immediate_executor' -require 'concurrent/executor/thread_pool_executor' -require 'concurrent/executor/timer_set' +require 'concurrent/executors' require 'concurrent/utility/processor_count' module Concurrent extend Logging - # A gem-level configuration object. - class Configuration + # Suppresses all output when used for logging. + NULL_LOGGER = lambda { |level, progname, message = nil, &block| } - # a proc defining how to log messages, its interface has to be: - # lambda { |level, progname, message = nil, &block| _ } - attr_accessor :logger + # initialize the global executors + class << self + + # @!visibility private + @@global_logger = Atomic.new(NULL_LOGGER) + + # @!visibility private + @@auto_terminate_global_executors = AtomicBoolean.new(true) + + # @!visibility private + @@auto_terminate_all_executors = AtomicBoolean.new(true) + + # @!visibility private + @@global_fast_executor = LazyReference.new do + Concurrent.new_fast_executor( + stop_on_exit: @@auto_terminate_global_executors.value) + end + + # @!visibility private + @@global_io_executor = LazyReference.new do + Concurrent.new_io_executor( + stop_on_exit: @@auto_terminate_global_executors.value) + end + + # @!visibility private + @@global_timer_set = LazyReference.new do + TimerSet.new(stop_on_exit: @@auto_terminate_global_executors.value) + end + end + + def self.global_logger + @@global_logger.value + end + + def self.global_logger=(value) + @@global_logger.value = value + end + + # Defines if global executors should be auto-terminated with an + # `at_exit` callback. When set to `false` it will be the application + # programmer's responsibility to ensure that the global thread pools + # are shutdown properly prior to application exit. + # + # @note Only change this option if you know what you are doing! + # When this is set to true (the default) then `at_exit` handlers + # will be registered automatically for the *global* thread pools + # to ensure that they are shutdown when the application ends. When + # changed to false, the `at_exit` handlers will be circumvented + # for all *global* thread pools. This method should *never* be called + # from within a gem. It should *only* be used from within the main + # application and even then it should be used only when necessary. + # + def self.disable_auto_termination_of_global_executors! + @@auto_terminate_global_executors.make_false + end - # defines if executors should be auto-terminated in at_exit callback - attr_accessor :auto_terminate + # Defines if global executors should be auto-terminated with an + # `at_exit` callback. When set to `false` it will be the application + # programmer's responsibility to ensure that the global thread pools + # are shutdown properly prior to application exit. + # + # @note Only change this option if you know what you are doing! 
+ # When this is set to true (the default) then `at_exit` handlers + # will be registered automatically for the *global* thread pools + # to ensure that they are shutdown when the application ends. When + # changed to false, the `at_exit` handlers will be circumvented + # for all *global* thread pools. This method should *never* be called + # from within a gem. It should *only* be used from within the main + # application and even then it should be used only when necessary. + # + # @return [Boolean] true when global thread pools will auto-terminate on + # application exit using an `at_exit` handler; false when no auto-termination + # will occur. + def self.auto_terminate_global_executors? + @@auto_terminate_global_executors.value + end + + # Defines if *ALL* executors should be auto-terminated with an + # `at_exit` callback. When set to `false` it will be the application + # programmer's responsibility to ensure that *all* thread pools, + # including the global thread pools, are shutdown properly prior to + # application exit. + # + # @note Only change this option if you know what you are doing! + # When this is set to true (the default) then `at_exit` handlers + # will be registered automatically for *all* thread pools to + # ensure that they are shutdown when the application ends. When + # changed to false, the `at_exit` handlers will be circumvented + # for *all* Concurrent Ruby thread pools running within the + # application. Even those created within other gems used by the + # application. This method should *never* be called from within a + # gem. It should *only* be used from within the main application. + # And even then it should be used only when necessary. + def self.disable_auto_termination_of_all_executors! + @@auto_terminate_all_executors.make_false + end + + # Defines if *ALL* executors should be auto-terminated with an + # `at_exit` callback. When set to `false` it will be the application + # programmer's responsibility to ensure that *all* thread pools, + # including the global thread pools, are shutdown properly prior to + # application exit. + # + # @note Only change this option if you know what you are doing! + # When this is set to true (the default) then `at_exit` handlers + # will be registered automatically for *all* thread pools to + # ensure that they are shutdown when the application ends. When + # changed to false, the `at_exit` handlers will be circumvented + # for *all* Concurrent Ruby thread pools running within the + # application. Even those created within other gems used by the + # application. This method should *never* be called from within a + # gem. It should *only* be used from within the main application. + # And even then it should be used only when necessary. + # + # @return [Boolean] true when *all* thread pools will auto-terminate on + # application exit using an `at_exit` handler; false when no auto-termination + # will occur. + def self.auto_terminate_all_executors? + @@auto_terminate_all_executors.value + end + + # Global thread pool optimized for short, fast *operations*. + # + # @return [ThreadPoolExecutor] the thread pool + def self.global_fast_executor + @@global_fast_executor.value + end + + # Global thread pool optimized for long, blocking (IO) *tasks*. + # + # @return [ThreadPoolExecutor] the thread pool + def self.global_io_executor + @@global_io_executor.value + end + + # Global thread pool user for global *timers*. 
+ # + # @return [Concurrent::TimerSet] the thread pool + # + # @see Concurrent::timer + def self.global_timer_set + @@global_timer_set.value + end + + def self.shutdown_global_executors + global_fast_executor.shutdown + global_io_executor.shutdown + global_timer_set.shutdown + end + + def self.kill_global_executors + global_fast_executor.kill + global_io_executor.kill + global_timer_set.kill + end + + def self.wait_for_global_executors_termination(timeout = nil) + latch = CountDownLatch.new(3) + [ global_fast_executor, global_io_executor, global_timer_set ].each do |executor| + Thread.new{ executor.wait_for_termination(timeout); latch.count_down } + end + latch.wait(timeout) + end + + def self.new_fast_executor(opts = {}) + FixedThreadPool.new( + [2, Concurrent.processor_count].max, + stop_on_exit: opts.fetch(:stop_on_exit, true), + idletime: 60, # 1 minute + max_queue: 0, # unlimited + fallback_policy: :caller_runs # shouldn't matter -- 0 max queue + ) + end + + def self.new_io_executor(opts = {}) + ThreadPoolExecutor.new( + min_threads: [2, Concurrent.processor_count].max, + max_threads: Concurrent.processor_count * 100, + stop_on_exit: opts.fetch(:stop_on_exit, true), + idletime: 60, # 1 minute + max_queue: 0, # unlimited + fallback_policy: :caller_runs # shouldn't matter -- 0 max queue + ) + end + + # A gem-level configuration object. + class Configuration # Create a new configuration object. def initialize - immediate_executor = ImmediateExecutor.new - @global_task_pool = Delay.new(executor: immediate_executor) { new_task_pool } - @global_operation_pool = Delay.new(executor: immediate_executor) { new_operation_pool } - @global_timer_set = Delay.new(executor: immediate_executor) { Concurrent::TimerSet.new } - @logger = no_logger - @auto_terminate = true end # if assigned to {#logger}, it will log nothing. + # @deprecated Use Concurrent::NULL_LOGGER instead def no_logger - lambda { |level, progname, message = nil, &block| } + warn '[DEPRECATED] Use Concurrent::NULL_LOGGER instead' + NULL_LOGGER end - # Global thread pool optimized for short *tasks*. + # a proc defining how to log messages, its interface has to be: + # lambda { |level, progname, message = nil, &block| _ } # - # @return [ThreadPoolExecutor] the thread pool - def global_task_pool - @global_task_pool.value + # @deprecated Use Concurrent.global_logger instead + def logger + warn '[DEPRECATED] Use Concurrent.global_logger instead' + Concurrent.global_logger.value end - # Global thread pool optimized for long *operations*. 
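Taken together, the methods above give an application explicit control over the lifecycle of the global executors. A hedged sketch of a program that opts out of the automatic `at_exit` shutdown and performs it manually, using only the functions added here:

```ruby
require 'concurrent'

# Must be called by the application (never by a gem), and before the global
# pools are first used, so they are created without their own at_exit handlers.
Concurrent.disable_auto_termination_of_global_executors!

Concurrent.global_io_executor.post   { puts 'long, blocking IO work' }
Concurrent.global_fast_executor.post { puts 'short, CPU-bound work' }

at_exit do
  # Orderly shutdown, with a bounded grace period and a brute-force fallback.
  Concurrent.shutdown_global_executors
  unless Concurrent.wait_for_global_executors_termination(10)
    Concurrent.kill_global_executors
  end
end
```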
+ # a proc defining how to log messages, its interface has to be: + # lambda { |level, progname, message = nil, &block| _ } # - # @return [ThreadPoolExecutor] the thread pool + # @deprecated Use Concurrent.global_logger instead + def logger=(value) + warn '[DEPRECATED] Use Concurrent.global_logger instead' + Concurrent.global_logger = value + end + + # @deprecated Use Concurrent.global_io_executor instead + def global_task_pool + warn '[DEPRECATED] Use Concurrent.global_io_executor instead' + Concurrent.global_io_executor + end + + # @deprecated Use Concurrent.global_fast_executor instead def global_operation_pool - @global_operation_pool.value + warn '[DEPRECATED] Use Concurrent.global_fast_executor instead' + Concurrent.global_fast_executor end - # Global thread pool optimized for *timers* - # - # @return [ThreadPoolExecutor] the thread pool - # - # @see Concurrent::timer + # @deprecated Use Concurrent.global_timer_set instead def global_timer_set - @global_timer_set.value + warn '[DEPRECATED] Use Concurrent.global_timer_set instead' + Concurrent.global_timer_set end - # Global thread pool optimized for short *tasks*. - # - # A global thread pool must be set as soon as the gem is loaded. Setting a new - # thread pool once tasks and operations have been post can lead to unpredictable - # results. The first time a task/operation is post a new thread pool will be - # created using the default configuration. Once set the thread pool cannot be - # changed. Thus, explicitly setting the thread pool must occur *before* any - # tasks/operations are post else an exception will be raised. - # - # @param [Executor] executor the executor to be used for this thread pool - # - # @return [ThreadPoolExecutor] the new thread pool - # - # @raise [Concurrent::ConfigurationError] if this thread pool has already been set + # @deprecated Replacing global thread pools is deprecated. + # Use the :executor constructor option instead. def global_task_pool=(executor) - @global_task_pool.reconfigure { executor } or - raise ConfigurationError.new('global task pool was already set') + warn '[DEPRECATED] Replacing global thread pools is deprecated. Use the :executor constructor option instead.' + var = Concurrent.class_variable_get(:@@global_io_executor) + var.reconfigure { executor } or + raise ConfigurationError.new('global task pool was already set') end - # Global thread pool optimized for long *operations*. - # - # A global thread pool must be set as soon as the gem is loaded. Setting a new - # thread pool once tasks and operations have been post can lead to unpredictable - # results. The first time a task/operation is post a new thread pool will be - # created using the default configuration. Once set the thread pool cannot be - # changed. Thus, explicitly setting the thread pool must occur *before* any - # tasks/operations are post else an exception will be raised. - # - # @param [Executor] executor the executor to be used for this thread pool - # - # @return [ThreadPoolExecutor] the new thread pool - # - # @raise [Concurrent::ConfigurationError] if this thread pool has already been set + # @deprecated Replacing global thread pools is deprecated. + # Use the :executor constructor option instead. def global_operation_pool=(executor) - @global_operation_pool.reconfigure { executor } or - raise ConfigurationError.new('global operation pool was already set') + warn '[DEPRECATED] Replacing global thread pools is deprecated. Use the :executor constructor option instead.' 
+ var = Concurrent.class_variable_get(:@@global_fast_executor) + var.reconfigure { executor } or + raise ConfigurationError.new('global operation pool was already set') end + # @deprecated Use Concurrent.new_io_executor instead def new_task_pool - Concurrent::ThreadPoolExecutor.new( - min_threads: [2, Concurrent.processor_count].max, - max_threads: [20, Concurrent.processor_count * 15].max, - idletime: 2 * 60, # 2 minutes - max_queue: 0, # unlimited - fallback_policy: :abort # raise an exception - ) + warn '[DEPRECATED] Use Concurrent.new_io_executor instead' + Concurrent.new_io_executor end + # @deprecated Use Concurrent.new_fast_executor instead def new_operation_pool - Concurrent::ThreadPoolExecutor.new( - min_threads: [2, Concurrent.processor_count].max, - max_threads: [2, Concurrent.processor_count].max, - idletime: 10 * 60, # 10 minutes - max_queue: [20, Concurrent.processor_count * 15].max, - fallback_policy: :abort # raise an exception - ) + warn '[DEPRECATED] Use Concurrent.new_fast_executor instead' + Concurrent.new_fast_executor + end + + # @deprecated Use Concurrent.disable_auto_termination_of_global_executors! instead + def auto_terminate=(value) + warn '[DEPRECATED] Use Concurrent.disable_auto_termination_of_global_executors! instead' + Concurrent.disable_auto_termination_of_global_executors! if !value + end + + # @deprecated Use Concurrent.auto_terminate_global_executors? instead + def auto_terminate + warn '[DEPRECATED] Use Concurrent.auto_terminate_global_executors? instead' + Concurrent.auto_terminate_global_executors? end end # create the default configuration on load - @configuration = Atomic.new Configuration.new + @configuration = Atomic.new(Configuration.new) # @return [Configuration] def self.configuration @@ -132,36 +304,4 @@ def self.configuration def self.configure yield(configuration) end - - def self.finalize_global_executors - self.finalize_executor(self.configuration.global_timer_set) - self.finalize_executor(self.configuration.global_task_pool) - self.finalize_executor(self.configuration.global_operation_pool) - end - - private - - # Attempt to properly shutdown the given executor using the `shutdown` or - # `kill` method when available. - # - # @param [Executor] executor the executor to shutdown - # - # @return [Boolean] `true` if the executor is successfully shut down or `nil`, else `false` - def self.finalize_executor(executor) - return true if executor.nil? 
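The deprecated `Configuration` methods above all delegate to new module-level functions on `Concurrent`. A quick before/after sketch of the renames; the old calls still work but print `[DEPRECATED]` warnings:

```ruby
require 'concurrent'

# Old, deprecated accessors (each call prints a "[DEPRECATED]" warning):
Concurrent.configuration.global_task_pool       # "task pool"      -> io executor
Concurrent.configuration.global_operation_pool  # "operation pool" -> fast executor

# New module-level functions introduced in this change:
Concurrent.global_io_executor    # long, blocking (IO) tasks
Concurrent.global_fast_executor  # short, fast operations
Concurrent.global_timer_set      # timers

# Constructors for standalone pools with the new naming:
io_pool   = Concurrent.new_io_executor(stop_on_exit: false)
fast_pool = Concurrent.new_fast_executor(stop_on_exit: false)
```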
- if executor.respond_to?(:shutdown) - executor.shutdown - elsif executor.respond_to?(:kill) - executor.kill - end - true - rescue => ex - log DEBUG, ex - false - end - - # set exit hook to shutdown global thread pools - at_exit do - finalize_global_executors if configuration.auto_terminate - end end diff --git a/lib/concurrent/dataflow.rb b/lib/concurrent/dataflow.rb index 2ed0a0ccc..5ed05aa28 100644 --- a/lib/concurrent/dataflow.rb +++ b/lib/concurrent/dataflow.rb @@ -32,7 +32,7 @@ def update(time, value, reason) # @raise [ArgumentError] if no block is given # @raise [ArgumentError] if any of the inputs are not `IVar`s def dataflow(*inputs, &block) - dataflow_with(Concurrent.configuration.global_operation_pool, *inputs, &block) + dataflow_with(Concurrent.global_io_executor, *inputs, &block) end module_function :dataflow @@ -42,7 +42,7 @@ def dataflow_with(executor, *inputs, &block) module_function :dataflow_with def dataflow!(*inputs, &block) - dataflow_with!(Concurrent.configuration.global_task_pool, *inputs, &block) + dataflow_with!(Concurrent.global_io_executor, *inputs, &block) end module_function :dataflow! diff --git a/lib/concurrent/delay.rb b/lib/concurrent/delay.rb index 860aa4896..6cadda2c4 100644 --- a/lib/concurrent/delay.rb +++ b/lib/concurrent/delay.rb @@ -1,16 +1,19 @@ require 'thread' +require 'concurrent/configuration' require 'concurrent/obligation' -require 'concurrent/options_parser' +require 'concurrent/executor/executor_options' module Concurrent - # Lazy evaluation of a block yielding an immutable result. Useful for expensive - # operations that may never be needed. - # - # A `Delay` is similar to `Future` but solves a different problem. - # Where a `Future` schedules an operation for immediate execution and - # performs the operation asynchronously, a `Delay` (as the name implies) - # delays execution of the operation until the result is actually needed. + # Lazy evaluation of a block yielding an immutable result. Useful for + # expensive operations that may never be needed. `Delay` is a more + # complex and feature-rich version of `LazyReference`. It is non-blocking, + # supports the `Obligation` interface, and accepts the injection of + # custom executor upon which to execute the block. Processing of + # block will be deferred until the first time `#value` is called. + # At that time the caller can choose to return immediately and let + # the block execute asynchronously, block indefinitely, or block + # with a timeout. # # When a `Delay` is created its state is set to `pending`. The value and # reason are both `nil`. The first time the `#value` method is called the @@ -25,26 +28,25 @@ module Concurrent # `Delay` includes the `Concurrent::Dereferenceable` mixin to support thread # safety of the reference returned by `#value`. # - # @since 0.6.0 + # Because of its simplicity `LazyReference` is much faster than `Delay`: # - # @see Concurrent::Dereferenceable + # user system total real + # Benchmarking Delay... + # 0.730000 0.000000 0.730000 ( 0.738434) + # Benchmarking LazyReference... + # 0.040000 0.000000 0.040000 ( 0.042322) # - # @see http://clojuredocs.org/clojure_core/clojure.core/delay - # @see http://aphyr.com/posts/306-clojure-from-the-ground-up-state + # @see Concurrent::Dereferenceable + # @see Concurrent::LazyReference class Delay include Obligation + include ExecutorOptions # Create a new `Delay` in the `:pending` state. 
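To make the comparison above concrete, a small sketch contrasting the two; the `executor: :immediate` form is the same one used internally for `Actor` and `Async` elsewhere in this change:

```ruby
require 'concurrent/delay'
require 'concurrent/lazy_reference'

# LazyReference: the block runs once, synchronously, on the first #value call.
lazy = Concurrent::LazyReference.new { Time.now }

# Delay: same lazy semantics, plus the Obligation interface and an
# injectable executor.
delay = Concurrent::Delay.new(executor: :immediate) { Time.now }

delay.pending?                  # => true, nothing has been computed yet
lazy.value.equal?(lazy.value)   # => true, computed once and memoized
delay.value.equal?(delay.value) # => true
delay.fulfilled?                # => true
```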
# # @yield the delayed operation to perform # - # @param [Hash] opts the options to create a message with - # @option opts [String] :dup_on_deref (false) call `#dup` before returning - # the data - # @option opts [String] :freeze_on_deref (false) call `#freeze` before - # returning the data - # @option opts [String] :copy_on_deref (nil) call the given `Proc` passing - # the internal value and returning the value returned from the proc + # @!macro executor_and_deref_options # # @raise [ArgumentError] if no block is given def initialize(opts = {}, &block) @@ -54,16 +56,24 @@ def initialize(opts = {}, &block) @state = :pending @task = block set_deref_options(opts) - @task_executor = OptionsParser.get_task_executor_from(opts) + @task_executor = get_executor_from(opts) || Concurrent.global_io_executor @computing = false end - def wait(timeout) + # Return the value this object represents after applying the options + # specified by the `#set_deref_options` method. + # + # @param [Integer] timeout (nil) the maximum number of seconds to wait for + # the value to be computed. When `nil` the caller will block indefinitely. + # + # @return [Object] the current value of the object + def wait(timeout = nil) execute_task_once - super timeout + super(timeout) end - # reconfigures the block returning the value if still #incomplete? + # Reconfigures the block returning the value if still `#incomplete?` + # # @yield the delayed operation to perform # @return [true, false] if success def reconfigure(&block) @@ -81,7 +91,8 @@ def reconfigure(&block) private - def execute_task_once + # @!visibility private + def execute_task_once # :nodoc: mutex.lock execute = @computing = true unless @computing task = @task diff --git a/lib/concurrent/executor/cached_thread_pool.rb b/lib/concurrent/executor/cached_thread_pool.rb index 734032929..46dd7a341 100644 --- a/lib/concurrent/executor/cached_thread_pool.rb +++ b/lib/concurrent/executor/cached_thread_pool.rb @@ -25,19 +25,15 @@ module Concurrent # # The API and behavior of this class are based on Java's `CachedThreadPool` # - # @note When running on the JVM (JRuby) this class will inherit from `JavaCachedThreadPool`. - # On all other platforms it will inherit from `RubyCachedThreadPool`. - # # @see Concurrent::RubyCachedThreadPool # @see Concurrent::JavaCachedThreadPool # - # @see http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html - # @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html - # @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html + # @!macro thread_pool_options class CachedThreadPool < JavaCachedThreadPool end else # @!macro cached_thread_pool + # @!macro thread_pool_options class CachedThreadPool < RubyCachedThreadPool end end diff --git a/lib/concurrent/executor/executor.rb b/lib/concurrent/executor/executor.rb index d89d6b45f..a8f523433 100644 --- a/lib/concurrent/executor/executor.rb +++ b/lib/concurrent/executor/executor.rb @@ -59,6 +59,38 @@ def handle_fallback(*args) def serialized? false end + + def auto_terminate? + !! @auto_terminate + end + + protected + + def enable_at_exit_handler!(opts = {}) + if opts.fetch(:stop_on_exit, true) + @auto_terminate = true + if RUBY_PLATFORM == 'java' + create_java_at_exit_handler!(self) + else + create_ruby_at_exit_handler!(self.object_id) + end + end + end + + def create_ruby_at_exit_handler!(id) + at_exit do + if Concurrent.auto_terminate_all_executors? 
+ this = ObjectSpace._id2ref(id) + this.kill if this + end + end + end + + def create_java_at_exit_handler!(this) + at_exit do + this.kill if Concurrent.auto_terminate_all_executors? + end + end end # Indicates that the including `Executor` or `ExecutorService` guarantees @@ -308,13 +340,6 @@ def kill @executor.shutdownNow nil end - - protected - - def set_shutdown_hook - # without this the process may fail to exit - at_exit { self.kill } - end end end end diff --git a/lib/concurrent/executor/executor_options.rb b/lib/concurrent/executor/executor_options.rb new file mode 100644 index 000000000..611cae55f --- /dev/null +++ b/lib/concurrent/executor/executor_options.rb @@ -0,0 +1,52 @@ +require 'concurrent/configuration' +require 'concurrent/executor/immediate_executor' + +module Concurrent + + # A mixin module for parsing options hashes related to gem-level configuration. + # @!visibility private + module ExecutorOptions # :nodoc: + + # Get the requested `Executor` based on the values set in the options hash. + # + # @param [Hash] opts the options defining the requested executor + # @option opts [Executor] :executor when set use the given `Executor` instance. + # Three special values are also supported: `:fast` returns the global fast executor, + # `:io` returns the global io executor, and `:immediate` returns a new + # `ImmediateExecutor` object. + # + # @return [Executor, nil] the requested thread pool, or nil when no option specified + # + # @!visibility private + def get_executor_from(opts = {}) # :nodoc: + if (executor = opts[:executor]).is_a? Symbol + case opts[:executor] + when :fast + Concurrent.global_fast_executor + when :io + Concurrent.global_io_executor + when :immediate + Concurrent::ImmediateExecutor.new + when :operation + Kernel.warn '[DEPRECATED] use `executor: :fast` instead' + Concurrent.global_fast_executor + when :task + Kernel.warn '[DEPRECATED] use `executor: :io` instead' + Concurrent.global_io_executor + else + raise ArgumentError.new("executor '#{executor}' not recognized") + end + elsif opts[:executor] + opts[:executor] + elsif opts[:operation] == true || opts[:task] == false + Kernel.warn '[DEPRECATED] use `executor: :fast` instead' + Concurrent.global_fast_executor + elsif opts[:operation] == false || opts[:task] == true + Kernel.warn '[DEPRECATED] use `executor: :io` instead' + Concurrent.global_io_executor + else + nil + end + end + end +end diff --git a/lib/concurrent/executor/fixed_thread_pool.rb b/lib/concurrent/executor/fixed_thread_pool.rb index 889909759..e9e0c2e64 100644 --- a/lib/concurrent/executor/fixed_thread_pool.rb +++ b/lib/concurrent/executor/fixed_thread_pool.rb @@ -3,7 +3,9 @@ module Concurrent if RUBY_PLATFORM == 'java' + require 'concurrent/executor/java_fixed_thread_pool' + # @!macro [attach] fixed_thread_pool # # A thread pool with a set number of threads. The number of threads in the pool @@ -14,19 +16,74 @@ module Concurrent # # The API and behavior of this class are based on Java's `FixedThreadPool` # - # @note When running on the JVM (JRuby) this class will inherit from `JavaFixedThreadPool`. - # On all other platforms it will inherit from `RubyFixedThreadPool`. 
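From the caller's side, the `:executor` option parsed by `ExecutorOptions#get_executor_from` above looks roughly like this (a sketch using `Future`, which mixes in `ExecutorOptions` later in this change):

```ruby
require 'concurrent'

# Symbolic values resolve to the global pools or a fresh ImmediateExecutor.
Concurrent::Future.execute(executor: :io)        { sleep(0.01); :io_bound }
Concurrent::Future.execute(executor: :fast)      { 21 * 2 }
Concurrent::Future.execute(executor: :immediate) { 21 * 2 }.value # => 42

# Any executor instance can be injected directly.
pool = Concurrent::FixedThreadPool.new(2)
Concurrent::Future.execute(executor: pool) { :pooled }

# Deprecated forms still parse, with a warning:
#   executor: :task      => use executor: :io
#   executor: :operation => use executor: :fast
```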
- # # @see Concurrent::RubyFixedThreadPool # @see Concurrent::JavaFixedThreadPool # - # @see http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html - # @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html - # @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html + # @!macro [attach] thread_pool_options + # + # Thread pools support several configuration options: + # + # * `idletime`: The number of seconds that a thread may be idle before being reclaimed. + # * `max_queue`: The maximum number of tasks that may be waiting in the work queue at + # any one time. When the queue size reaches `max_queue` subsequent tasks will be + # rejected in accordance with the configured `fallback_policy`. + # * `stop_on_exit`: When true (default) an `at_exit` handler will be registered which + # will stop the thread pool when the application exits. See below for more information + # on shutting down thread pools. + # * `fallback_policy`: The policy defining how rejected tasks are handled. + # + # Three fallback policies are supported: + # + # * `:abort`: Raise a `RejectedExecutionError` exception and discard the task. + # * `:discard`: Discard the task and return false. + # * `:caller_runs`: Execute the task on the calling thread. + # + # **Shutting Down Thread Pools** + # + # Killing a thread pool while tasks are still being processed, either by calling + # the `#kill` method or at application exit, will have unpredictable results. There + # is no way for the thread pool to know what resources are being used by the + # in-progress tasks. When those tasks are killed the impact on those resources + # cannot be predicted. The *best* practice is to explicitly shutdown all thread + # pools using the provided methods: + # + # * Call `#shutdown` to initiate an orderly termination of all in-progress tasks + # * Call `#wait_for_termination` with an appropriate timeout interval an allow + # the orderly shutdown to complete + # * Call `#kill` *only when* the thread pool fails to shutdown in the allotted time + # + # On some runtime platforms (most notably the JVM) the application will not + # exit until all thread pools have been shutdown. To prevent applications from + # "hanging" on exit all thread pools include an `at_exit` handler that will + # stop the thread pool when the application exists. This handler uses a brute + # force method to stop the pool and makes no guarantees regarding resources being + # used by any tasks still running. Registration of this `at_exit` handler can be + # prevented by setting the thread pool's constructor `:stop_on_exit` option to + # `false` when the thread pool is created. All thread pools support this option. + # + # ```ruby + # pool1 = Concurrent::FixedThreadPool.new(5) # an `at_exit` handler will be registered + # pool2 = Concurrent::FixedThreadPool.new(5, stop_on_exit: false) # prevent `at_exit` handler registration + # ``` + # + # @note Failure to properly shutdown a thread pool can lead to unpredictable results. + # Please read *Shutting Down Thread Pools* for more information. + # + # @note When running on the JVM (JRuby) this class will inherit from `JavaThreadPoolExecutor`. + # On all other platforms it will inherit from `RubyThreadPoolExecutor`. 
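The shutdown guidance above, expressed as code: a minimal sketch of the recommended sequence, where `handle_record` is a hypothetical application method.

```ruby
require 'concurrent'

pool = Concurrent::FixedThreadPool.new(5)

100.times { |i| pool.post { handle_record(i) } } # handle_record is hypothetical

# 1. Stop accepting new work and let in-progress tasks finish.
pool.shutdown

# 2. Wait a bounded amount of time for the queue to drain...
finished = pool.wait_for_termination(30)

# 3. ...and only then resort to the brute-force stop.
pool.kill unless finished
```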
+ # + # @see Concurrent::RubyThreadPoolExecutor + # @see Concurrent::JavaThreadPoolExecutor + # + # @see http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html Java Tutorials: Thread Pools + # @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html Java Executors class + # @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html Java ExecutorService interface + # @see http://ruby-doc.org//core-2.2.0/Kernel.html#method-i-at_exit Kernel#at_exit class FixedThreadPool < JavaFixedThreadPool end else # @!macro fixed_thread_pool + # @!macro thread_pool_options class FixedThreadPool < RubyFixedThreadPool end end diff --git a/lib/concurrent/executor/java_cached_thread_pool.rb b/lib/concurrent/executor/java_cached_thread_pool.rb index 9e6e50857..3a067d740 100644 --- a/lib/concurrent/executor/java_cached_thread_pool.rb +++ b/lib/concurrent/executor/java_cached_thread_pool.rb @@ -5,6 +5,7 @@ module Concurrent # @!macro cached_thread_pool + # @!macro thread_pool_options class JavaCachedThreadPool < JavaThreadPoolExecutor # Create a new thread pool. @@ -25,7 +26,7 @@ def initialize(opts = {}) @executor = java.util.concurrent.Executors.newCachedThreadPool @executor.setRejectedExecutionHandler(FALLBACK_POLICIES[@fallback_policy].new) - set_shutdown_hook + enable_at_exit_handler!(opts) end end end diff --git a/lib/concurrent/executor/java_fixed_thread_pool.rb b/lib/concurrent/executor/java_fixed_thread_pool.rb index 2ceb1a7ce..d3d349b89 100644 --- a/lib/concurrent/executor/java_fixed_thread_pool.rb +++ b/lib/concurrent/executor/java_fixed_thread_pool.rb @@ -5,6 +5,7 @@ module Concurrent # @!macro fixed_thread_pool + # @!macro thread_pool_options class JavaFixedThreadPool < JavaThreadPoolExecutor # Create a new thread pool. 
@@ -24,7 +25,7 @@ def initialize(num_threads, opts = {}) }.merge(opts) super(opts) - set_shutdown_hook + enable_at_exit_handler!(opts) end end end diff --git a/lib/concurrent/executor/java_single_thread_executor.rb b/lib/concurrent/executor/java_single_thread_executor.rb index 074c3e333..62691324b 100644 --- a/lib/concurrent/executor/java_single_thread_executor.rb +++ b/lib/concurrent/executor/java_single_thread_executor.rb @@ -4,6 +4,7 @@ module Concurrent # @!macro single_thread_executor + # @!macro thread_pool_options class JavaSingleThreadExecutor include JavaExecutor include SerialExecutor @@ -21,7 +22,7 @@ def initialize(opts = {}) @executor = java.util.concurrent.Executors.newSingleThreadExecutor @fallback_policy = opts.fetch(:fallback_policy, :discard) raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.keys.include?(@fallback_policy) - set_shutdown_hook + enable_at_exit_handler!(opts) end end end diff --git a/lib/concurrent/executor/java_thread_pool_executor.rb b/lib/concurrent/executor/java_thread_pool_executor.rb index d710b2568..21af2b537 100644 --- a/lib/concurrent/executor/java_thread_pool_executor.rb +++ b/lib/concurrent/executor/java_thread_pool_executor.rb @@ -4,6 +4,7 @@ module Concurrent # @!macro thread_pool_executor + # @!macro thread_pool_options class JavaThreadPoolExecutor include JavaExecutor @@ -75,7 +76,7 @@ def initialize(opts = {}) idletime, java.util.concurrent.TimeUnit::SECONDS, queue, FALLBACK_POLICIES[@fallback_policy].new) - set_shutdown_hook + enable_at_exit_handler!(opts) end # @!macro executor_module_method_can_overflow_question diff --git a/lib/concurrent/executor/ruby_cached_thread_pool.rb b/lib/concurrent/executor/ruby_cached_thread_pool.rb index 951cadc4e..62dd76f96 100644 --- a/lib/concurrent/executor/ruby_cached_thread_pool.rb +++ b/lib/concurrent/executor/ruby_cached_thread_pool.rb @@ -3,6 +3,7 @@ module Concurrent # @!macro cached_thread_pool + # @!macro thread_pool_options class RubyCachedThreadPool < RubyThreadPoolExecutor # Create a new thread pool. diff --git a/lib/concurrent/executor/ruby_fixed_thread_pool.rb b/lib/concurrent/executor/ruby_fixed_thread_pool.rb index ed29a386b..d1e2c871c 100644 --- a/lib/concurrent/executor/ruby_fixed_thread_pool.rb +++ b/lib/concurrent/executor/ruby_fixed_thread_pool.rb @@ -3,6 +3,7 @@ module Concurrent # @!macro fixed_thread_pool + # @!macro thread_pool_options class RubyFixedThreadPool < RubyThreadPoolExecutor # Create a new thread pool. 
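With `enable_at_exit_handler!` wired into the constructors above, the `:stop_on_exit` option and the `auto_terminate?` predicate are visible on every pool. A sketch, assuming the cached pool forwards its options to the base executor the same way the fixed pool does:

```ruby
require 'concurrent'

default_pool = Concurrent::CachedThreadPool.new
default_pool.auto_terminate?  # => true, an at_exit handler was registered

manual_pool = Concurrent::CachedThreadPool.new(stop_on_exit: false)
manual_pool.auto_terminate?   # => false, no handler; the caller owns shutdown

manual_pool.post { :work }
manual_pool.shutdown
manual_pool.wait_for_termination(5)
```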
diff --git a/lib/concurrent/executor/ruby_single_thread_executor.rb b/lib/concurrent/executor/ruby_single_thread_executor.rb index d42345e9f..297bf9c03 100644 --- a/lib/concurrent/executor/ruby_single_thread_executor.rb +++ b/lib/concurrent/executor/ruby_single_thread_executor.rb @@ -3,6 +3,7 @@ module Concurrent # @!macro single_thread_executor + # @!macro thread_pool_options class RubySingleThreadExecutor include RubyExecutor include SerialExecutor @@ -22,6 +23,7 @@ def initialize(opts = {}) @fallback_policy = opts.fetch(:fallback_policy, :discard) raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy) init_executor + enable_at_exit_handler!(opts) end protected diff --git a/lib/concurrent/executor/ruby_thread_pool_executor.rb b/lib/concurrent/executor/ruby_thread_pool_executor.rb index 39fea3464..f4cb42cc0 100644 --- a/lib/concurrent/executor/ruby_thread_pool_executor.rb +++ b/lib/concurrent/executor/ruby_thread_pool_executor.rb @@ -8,6 +8,7 @@ module Concurrent # @!macro thread_pool_executor + # @!macro thread_pool_options class RubyThreadPoolExecutor include RubyExecutor @@ -84,6 +85,7 @@ def initialize(opts = {}) raise ArgumentError.new('min_threads cannot be more than max_threads') if min_length > max_length init_executor + enable_at_exit_handler!(opts) @pool = [] @queue = Queue.new diff --git a/lib/concurrent/executor/single_thread_executor.rb b/lib/concurrent/executor/single_thread_executor.rb index 2d36915b1..5d617f7ad 100644 --- a/lib/concurrent/executor/single_thread_executor.rb +++ b/lib/concurrent/executor/single_thread_executor.rb @@ -16,19 +16,15 @@ module Concurrent # # The API and behavior of this class are based on Java's `SingleThreadExecutor` # - # @note When running on the JVM (JRuby) this class will inherit from `JavaSingleThreadExecutor`. - # On all other platforms it will inherit from `RubySingleThreadExecutor`. - # # @see Concurrent::RubySingleThreadExecutor # @see Concurrent::JavaSingleThreadExecutor # - # @see http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html - # @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html - # @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html + # @!macro thread_pool_options class SingleThreadExecutor < JavaSingleThreadExecutor end else # @!macro single_thread_executor + # @!macro thread_pool_options class SingleThreadExecutor < RubySingleThreadExecutor end end diff --git a/lib/concurrent/executor/thread_pool_executor.rb b/lib/concurrent/executor/thread_pool_executor.rb index 3224f2656..3c234e44b 100644 --- a/lib/concurrent/executor/thread_pool_executor.rb +++ b/lib/concurrent/executor/thread_pool_executor.rb @@ -33,35 +33,12 @@ module Concurrent # > background thread), that preconfigure settings for the most common usage # > scenarios. # - # Thread pools support several configuration options: - # - # * `max_threads`: The maximum number of threads that may be created in the pool. - # * `min_threads`: The minimum number of threads that may be retained in the pool. - # * `idletime`: The number of seconds that a thread may be idle before being reclaimed. - # * `max_queue`: The maximum number of tasks that may be waiting in the work queue at - # any one time. When the queue size reaches `max_queue` subsequent tasks will be - # rejected in accordance with the configured `fallback_policy`. - # * `fallback_policy`: The policy defining how rejected tasks are handled. 
# - # - # Three fallback policies are supported: - # - # * `:abort`: Raise a `RejectedExecutionError` exception and discard the task. - # * `:discard`: Discard the task and return false. - # * `:caller_runs`: Execute the task on the calling thread. - # - # @note When running on the JVM (JRuby) this class will inherit from `JavaThreadPoolExecutor`. - # On all other platforms it will inherit from `RubyThreadPoolExecutor`. - # - # @see Concurrent::RubyThreadPoolExecutor - # @see Concurrent::JavaThreadPoolExecutor - # - # @see http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html - # @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html - # @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html + # @!macro thread_pool_options class ThreadPoolExecutor < JavaThreadPoolExecutor end else # @!macro thread_pool_executor + # @!macro thread_pool_options class ThreadPoolExecutor < RubyThreadPoolExecutor end end diff --git a/lib/concurrent/executor/timer_set.rb b/lib/concurrent/executor/timer_set.rb index 81274ba24..75f2bd110 100644 --- a/lib/concurrent/executor/timer_set.rb +++ b/lib/concurrent/executor/timer_set.rb @@ -1,10 +1,10 @@ require 'thread' -require 'concurrent/options_parser' require 'concurrent/atomic/event' require 'concurrent/collection/priority_queue' require 'concurrent/executor/executor' require 'concurrent/executor/single_thread_executor' require 'concurrent/utility/monotonic_time' +require 'concurrent/executor/executor_options' module Concurrent @@ -15,21 +15,24 @@ module Concurrent # @!macro monotonic_clock_warning class TimerSet include RubyExecutor + include ExecutorOptions # Create a new set of timed tasks. # - # @param [Hash] opts the options controlling how the future will be processed - # @option opts [Boolean] :operation (false) when `true` will execute the future on the global - # operation pool (for long-running operations), when `false` will execute the future on the - # global task pool (for short-running tasks) - # @option opts [object] :executor when provided will run all operations on - # this executor rather than the global thread pool (overrides :operation) + # @!macro [attach] executor_options + # + # @param [Hash] opts the options used to specify the executor on which to perform actions + # @option opts [Executor] :executor when set use the given `Executor` instance. + # Three special values are also supported: `:task` returns the global task pool, + # `:operation` returns the global operation pool, and `:immediate` returns a new + # `ImmediateExecutor` object. def initialize(opts = {}) @queue = PriorityQueue.new(order: :min) - @task_executor = OptionsParser::get_executor_from(opts) || Concurrent.configuration.global_task_pool + @task_executor = get_executor_from(opts) || Concurrent.global_io_executor @timer_executor = SingleThreadExecutor.new @condition = Condition.new init_executor + enable_at_exit_handler!(opts) end # Post a task to be execute run after a given delay (in seconds). 
If the @@ -71,11 +74,8 @@ def <<(task) self end - # For a timer, #kill is like an orderly shutdown, except we need to manually - # (and destructively) clear the queue first + # @!macro executor_method_shutdown def kill - mutex.synchronize { @queue.clear } - # possible race condition shutdown end diff --git a/lib/concurrent/future.rb b/lib/concurrent/future.rb index f075136b8..844090f7c 100644 --- a/lib/concurrent/future.rb +++ b/lib/concurrent/future.rb @@ -1,8 +1,8 @@ require 'thread' -require 'concurrent/options_parser' require 'concurrent/ivar' require 'concurrent/executor/safe_task_executor' +require 'concurrent/executor/executor_options' module Concurrent @@ -12,23 +12,16 @@ module Concurrent # @see http://clojuredocs.org/clojure_core/clojure.core/future Clojure's future function # @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Future.html java.util.concurrent.Future class Future < IVar + include ExecutorOptions # Create a new `Future` in the `:unscheduled` state. # # @yield the asynchronous operation to perform # - # @param [Hash] opts the options controlling how the future will be processed - # @option opts [Boolean] :operation (false) when `true` will execute the future on the global - # operation pool (for long-running operations), when `false` will execute the future on the - # global task pool (for short-running tasks) - # @option opts [object] :executor when provided will run all operations on - # this executor rather than the global thread pool (overrides :operation) + # @!macro executor_and_deref_options + # # @option opts [object, Array] :args zero or more arguments to be passed the task # block on execution - # @option opts [String] :dup_on_deref (false) call `#dup` before returning the data - # @option opts [String] :freeze_on_deref (false) call `#freeze` before returning the data - # @option opts [String] :copy_on_deref (nil) call the given `Proc` passing - # the internal value and returning the value returned from the proc # # @raise [ArgumentError] if no block is given def initialize(opts = {}, &block) @@ -36,8 +29,8 @@ def initialize(opts = {}, &block) super(IVar::NO_VALUE, opts) @state = :unscheduled @task = block - @executor = OptionsParser::get_executor_from(opts) || Concurrent.configuration.global_operation_pool - @args = OptionsParser::get_arguments_from(opts) + @executor = get_executor_from(opts) || Concurrent.global_io_executor + @args = get_arguments_from(opts) end # Execute an `:unscheduled` `Future`. 
Immediately sets the state to `:pending` and @@ -67,24 +60,15 @@ def execute # # @yield the asynchronous operation to perform # - # @param [Hash] opts the options controlling how the future will be processed - # @option opts [Boolean] :operation (false) when `true` will execute the future on the global - # operation pool (for long-running operations), when `false` will execute the future on the - # global task pool (for short-running tasks) - # @option opts [object] :executor when provided will run all operations on - # this executor rather than the global thread pool (overrides :operation) - # @option opts [object, Array] :args zero or more arguments to be passed the - # task block on execution - # - # @option opts [String] :dup_on_deref (false) call `#dup` before returning the data - # @option opts [String] :freeze_on_deref (false) call `#freeze` before returning the data - # @option opts [String] :copy_on_deref (nil) call the given `Proc` passing - # the internal value and returning the value returned from the proc + # @!macro executor_and_deref_options # - # @return [Future] the newly created `Future` in the `:pending` state + # @option opts [object, Array] :args zero or more arguments to be passed the task + # block on execution # # @raise [ArgumentError] if no block is given # + # @return [Future] the newly created `Future` in the `:pending` state + # # @example # future = Concurrent::Future.execute{ sleep(1); 42 } # future.state #=> :pending diff --git a/lib/concurrent/lazy_reference.rb b/lib/concurrent/lazy_reference.rb new file mode 100644 index 000000000..48b34884d --- /dev/null +++ b/lib/concurrent/lazy_reference.rb @@ -0,0 +1,62 @@ +module Concurrent + + # Lazy evaluation of a block yielding an immutable result. Useful for + # expensive operations that may never be needed. `LazyReference` is a simpler, + # blocking version of `Delay` and has an API similar to `AtomicReference`. + # The first time `#value` is called the caller will block until the + # block given at construction is executed. Once the result has been + # computed the value will be immutably set. Any exceptions thrown during + # computation will be suppressed. + # + # Because of its simplicity `LazyReference` is much faster than `Delay`: + # + # user system total real + # Benchmarking Delay... + # 0.730000 0.000000 0.730000 ( 0.738434) + # Benchmarking LazyReference... + # 0.040000 0.000000 0.040000 ( 0.042322) + # + # @see Concurrent::Delay + class LazyReference + + # Creates a new unfulfilled object. + # + # @yield the delayed operation to perform + # @param [Object] default (nil) the default value for the object when + # the block raises an exception + # + # @raise [ArgumentError] if no block is given + def initialize(default = nil, &block) + raise ArgumentError.new('no block given') unless block_given? + @default = default + @task = block + @mutex = Mutex.new + @value = nil + @fulfilled = false + end + + # The calculated value of the object or the default value if one + # was given at construction. The first time this method is called + # it will block indefinitely while the block is processed. + # Subsequent calls will not block.
+ # + # @return [Object] the calculated value + def value + # double-checked locking is safe because we only update once + return @value if @fulfilled + + @mutex.synchronize do + unless @fulfilled + begin + @value = @task.call + rescue + @value = @default + ensure + @fulfilled = true + end + end + return @value + end + end + end +end diff --git a/lib/concurrent/lazy_register.rb b/lib/concurrent/lazy_register.rb index 6ad75f5d9..d862379af 100644 --- a/lib/concurrent/lazy_register.rb +++ b/lib/concurrent/lazy_register.rb @@ -1,58 +1,76 @@ require 'concurrent/atomic' require 'concurrent/delay' - module Concurrent - # Allows to store lazy evaluated values under keys. Uses `Delay`s. + + # Hash-like collection that stores lazily evaluated values. + # # @example - # register = Concurrent::LazyRegister.new - # #=> #> - # register[:key] - # #=> nil - # register.add(:key) { Concurrent::Actor.spawn!(Actor::AdHoc, :ping) { -> message { message } } } - # #=> #> - # register[:key] - # #=> # + # register = Concurrent::LazyRegister.new + # #=> #> + # register[:key] + # #=> nil + # register.add(:key) { Concurrent::Actor.spawn!(Actor::AdHoc, :ping) { -> message { message } } } + # #=> #> + # register[:key] + # #=> # class LazyRegister + def initialize @data = Atomic.new Hash.new end + # Element reference. Retrieves the value object corresponding to the + # key object. Returns nil if the key is not found. Raises an exception + # if the stored item raised an exception when the block was evaluated. + # # @param [Object] key - # @return value stored under the key + # @return [Object] value stored for the key or nil if the key is not found + # # @raise Exception when the initialization block fails def [](key) delay = @data.get[key] - delay.value! if delay + delay ? delay.value! : nil end + # Returns true if the given key is present. + # # @param [Object] key # @return [true, false] if the key is registered def registered?(key) - @data.get.key? key + @data.get.key?(key) end alias_method :key?, :registered? + alias_method :has_key?, :registered? + # Element assignment. Associates the value given by value with the + # key given by key. + # # @param [Object] key # @yield the object to store under the key - # @return self + # + # @return [LazyRegister] self def register(key, &block) - delay = Delay.new(&block) - @data.update { |h| h.merge key => delay } + delay = Delay.new(executor: :immediate, &block) + @data.update { |h| h.merge(key => delay) } self end alias_method :add, :register + alias_method :store, :register - # Un-registers the object under key, realized or not - # @return self + # Un-registers the object under key, realized or not.
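A brief usage sketch of the new `LazyReference`, following the semantics in its class comment above: the block runs at most once, on the first `#value` call, and exceptions are suppressed in favour of the optional default. `Concurrent.processor_count` simply stands in for any expensive computation:

    require 'concurrent'

    processors = Concurrent::LazyReference.new(1) do
      Concurrent.processor_count  # stand-in for an expensive computation
    end

    processors.value  # first call runs the block and memoizes the result
    processors.value  # later calls return the memoized value without locking the mutex

    # A raised exception is swallowed and the default (:fallback here) is returned instead.
    failing = Concurrent::LazyReference.new(:fallback) { raise StandardError }
    failing.value     #=> :fallback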
+ # # @param [Object] key + # + # @return [LazyRegister] self def unregister(key) @data.update { |h| h.dup.tap { |j| j.delete(key) } } self end alias_method :remove, :unregister + alias_method :delete, :unregister end end diff --git a/lib/concurrent/logging.rb b/lib/concurrent/logging.rb index 739185226..99658c3a1 100644 --- a/lib/concurrent/logging.rb +++ b/lib/concurrent/logging.rb @@ -11,7 +11,7 @@ module Logging # @param [String, nil] message when nil block is used to generate the message # @yieldreturn [String] a message def log(level, progname, message = nil, &block) - (@logger || Concurrent.configuration.logger).call level, progname, message, &block + (@logger || Concurrent.global_logger).call level, progname, message, &block rescue => error $stderr.puts "`Concurrent.configuration.logger` failed to log #{[level, progname, message, block]}\n" + "#{error.message} (#{error.class})\n#{error.backtrace.join "\n"}" diff --git a/lib/concurrent/obligation.rb b/lib/concurrent/obligation.rb index 4d7415d6d..99009dd4d 100644 --- a/lib/concurrent/obligation.rb +++ b/lib/concurrent/obligation.rb @@ -99,6 +99,11 @@ def exception(*args) protected + # @!visibility private + def get_arguments_from(opts = {}) # :nodoc: + [*opts.fetch(:args, [])] + end + # @!visibility private def init_obligation # :nodoc: init_mutex diff --git a/lib/concurrent/options_parser.rb b/lib/concurrent/options_parser.rb deleted file mode 100644 index 5e34d8305..000000000 --- a/lib/concurrent/options_parser.rb +++ /dev/null @@ -1,52 +0,0 @@ -module Concurrent - - # A mixin module for parsing options hashes related to gem-level configuration. - module OptionsParser - - # Get the requested `Executor` based on the values set in the options hash. - # - # @param [Hash] opts the options defining the requested executor - # @option opts [Executor] :executor (`nil`) when set use the given `Executor` instance - # @option opts [Boolean] :operation (`false`) when true use the global operation pool - # @option opts [Boolean] :task (`true`) when true use the global task pool - # - # @return [Executor, nil] the requested thread pool, or nil when no option specified - def get_executor_from(opts = {}) - if opts[:executor] - opts[:executor] - elsif opts[:operation] == true || opts[:task] == false - Concurrent.configuration.global_operation_pool - elsif opts[:operation] == false || opts[:task] == true - Concurrent.configuration.global_task_pool - else - nil - end - end - - def get_arguments_from(opts = {}) - [*opts.fetch(:args, [])] - end - - # Get the requested `Executor` based on the values set in the options hash. - # - # @param [Hash] opts the options defining the requested executor - # @option opts [Executor] :task_executor (`nil`) when set use the given `Executor` instance - # - # @return [Executor] the requested thread pool (default: global task pool) - def get_task_executor_from(opts = {}) - opts[:task_executor] || opts[:executor] || Concurrent.configuration.global_task_pool - end - - # Get the requested `Executor` based on the values set in the options hash. 
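With `OptionsParser` deleted, the `:operation`/`:task` booleans it parsed survive only as deprecated inputs to `ExecutorOptions#get_executor_from` (see the new spec below); the replacement is a single `:executor` option taking either an executor instance or one of the supported symbols. A hedged before/after sketch:

    require 'concurrent'

    # Deprecated: boolean flags (still honored, but emit a warning).
    Concurrent::Future.execute(operation: true) { 6 * 7 }  # routed to the global fast executor
    Concurrent::Future.execute(task: true)      { 6 * 7 }  # routed to the global io executor

    # Current: one :executor option.
    Concurrent::Future.execute(executor: :fast)      { 6 * 7 }
    Concurrent::Future.execute(executor: :io)        { 6 * 7 }
    Concurrent::Future.execute(executor: :immediate) { 6 * 7 }
    Concurrent::Future.execute(executor: Concurrent::FixedThreadPool.new(2)) { 6 * 7 }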
- # - # @param [Hash] opts the options defining the requested executor - # @option opts [Executor] :task_executor (`nil`) when set use the given `Executor` instance - # - # @return [Executor] the requested thread pool (default: global operation pool) - def get_operation_executor_from(opts = {}) - opts[:operation_executor] || opts[:executor] || Concurrent.configuration.global_operation_pool - end - - extend self - end -end diff --git a/lib/concurrent/promise.rb b/lib/concurrent/promise.rb index f0c819596..f7cb9b1a0 100644 --- a/lib/concurrent/promise.rb +++ b/lib/concurrent/promise.rb @@ -1,7 +1,7 @@ require 'thread' require 'concurrent/obligation' -require 'concurrent/options_parser' +require 'concurrent/executor/executor_options' module Concurrent @@ -182,39 +182,30 @@ module Concurrent # - `rescue { |reason| ... }` is the same as `then(Proc.new { |reason| ... } )` # - `rescue` is aliased by `catch` and `on_error` class Promise - # TODO unify promise and future to single class, with dataflow include Obligation + include ExecutorOptions # Initialize a new Promise with the provided options. # - # @!macro [attach] promise_init_options + # @!macro executor_and_deref_options # - # @param [Hash] opts the options used to define the behavior at update and deref + # @!macro [attach] promise_init_options # # @option opts [Promise] :parent the parent `Promise` when building a chain/tree # @option opts [Proc] :on_fulfill fulfillment handler # @option opts [Proc] :on_reject rejection handler - # @option opts [Boolean] :operation (false) when `true` will execute the - # future on the global operation pool (for long-running operations), - # when `false` will execute the future on the global task pool (for - # short-running tasks) - # @option opts [object] :executor when provided will run all operations on - # this executor rather than the global thread pool (overrides :operation) # @option opts [object, Array] :args zero or more arguments to be passed # the task block on execution - # @option opts [String] :dup_on_deref (false) call `#dup` before returning the data - # @option opts [String] :freeze_on_deref (false) call `#freeze` before - # returning the data - # @option opts [String] :copy_on_deref (nil) call the given `Proc` passing - # the internal value and returning the value returned from the proc + # + # @raise [ArgumentError] if no block is given # # @see http://wiki.commonjs.org/wiki/Promises/A # @see http://promises-aplus.github.io/promises-spec/ def initialize(opts = {}, &block) opts.delete_if { |k, v| v.nil? } - @executor = OptionsParser::get_executor_from(opts) || Concurrent.configuration.global_operation_pool - @args = OptionsParser::get_arguments_from(opts) + @executor = get_executor_from(opts) || Concurrent.global_io_executor + @args = get_arguments_from(opts) @parent = opts.fetch(:parent) { nil } @on_fulfill = opts.fetch(:on_fulfill) { Proc.new { |result| result } } @@ -255,6 +246,8 @@ def execute # Create a new `Promise` object with the given block, execute it, and return the # `:pending` object. 
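Since `get_arguments_from` now lives on `Obligation`, the `:args` option documented above behaves the same across `Promise` and `Future`: a missing or nil value becomes an empty array, a single value becomes a one-element array, and arrays pass through untouched (see the `obligation_spec` additions below). A small sketch, assuming the immediate executor resolves the promise synchronously:

    require 'concurrent'

    sum = Concurrent::Promise.execute(executor: :immediate, args: [2, 3]) do |a, b|
      a + b
    end
    sum.value  #=> 5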
# + # @!macro executor_and_deref_options + # # @!macro promise_init_options # # @return [Promise] the newly created `Promise` in the `:pending` state diff --git a/lib/concurrent/scheduled_task.rb b/lib/concurrent/scheduled_task.rb index 18c078742..d70662885 100644 --- a/lib/concurrent/scheduled_task.rb +++ b/lib/concurrent/scheduled_task.rb @@ -1,6 +1,7 @@ require 'concurrent/ivar' require 'concurrent/utility/timer' require 'concurrent/executor/safe_task_executor' +require 'concurrent/executor/executor_options' module Concurrent @@ -133,6 +134,7 @@ module Concurrent # # @!macro monotonic_clock_warning class ScheduledTask < IVar + include ExecutorOptions attr_reader :delay @@ -164,10 +166,9 @@ def initialize(delay, opts = {}, &block) self.observers = CopyOnNotifyObserverSet.new @state = :unscheduled @task = block - @executor = OptionsParser::get_executor_from(opts) || Concurrent.configuration.global_operation_pool + @executor = get_executor_from(opts) || Concurrent.global_io_executor end - # Execute an `:unscheduled` `ScheduledTask`. Immediately sets the state to `:pending` # and starts counting down toward execution. Does nothing if the `ScheduledTask` is # in any state other than `:unscheduled`. diff --git a/lib/concurrent/timer_task.rb b/lib/concurrent/timer_task.rb index d3714c129..6eac4beec 100644 --- a/lib/concurrent/timer_task.rb +++ b/lib/concurrent/timer_task.rb @@ -150,7 +150,7 @@ module Concurrent class TimerTask include Dereferenceable include RubyExecutor - include Concurrent::Observable + include Observable # Default `:execution_interval` in seconds. EXECUTION_INTERVAL = 60 @@ -221,8 +221,6 @@ def running? # @example Instance and execute in one line # task = Concurrent::TimerTask.new(execution_interval: 10){ print "Hello World\n" }.execute # task.running? #=> true - # - # @since 0.6.0 def execute mutex.synchronize do if @running.false? @@ -240,8 +238,6 @@ def execute # @example # task = Concurrent::TimerTask.execute(execution_interval: 10){ print "Hello World\n" } # task.running? #=> true - # - # @since 0.6.0 def self.execute(opts = {}, &task) TimerTask.new(opts, &task).execute end diff --git a/lib/concurrent/utility/processor_count.rb b/lib/concurrent/utility/processor_count.rb index 4bc53e85e..984588a36 100644 --- a/lib/concurrent/utility/processor_count.rb +++ b/lib/concurrent/utility/processor_count.rb @@ -1,14 +1,12 @@ require 'rbconfig' -require 'concurrent/delay' -require 'concurrent/executor/immediate_executor' +require 'concurrent/lazy_reference' module Concurrent class ProcessorCounter def initialize - immediate_executor = ImmediateExecutor.new - @processor_count = Delay.new(executor: immediate_executor) { compute_processor_count } - @physical_processor_count = Delay.new(executor: immediate_executor) { compute_physical_processor_count } + @processor_count = LazyReference.new { compute_processor_count } + @physical_processor_count = LazyReference.new { compute_physical_processor_count } end # Number of processors seen by the OS and used for process scheduling. 
For @@ -80,7 +78,7 @@ def compute_processor_count if os_name =~ /mingw|mswin/ require 'win32ole' result = WIN32OLE.connect("winmgmts://").ExecQuery( - "select NumberOfLogicalProcessors from Win32_Processor") + "select NumberOfLogicalProcessors from Win32_Processor") result.to_enum.collect(&:NumberOfLogicalProcessors).reduce(:+) elsif File.readable?("/proc/cpuinfo") IO.read("/proc/cpuinfo").scan(/^processor/).size @@ -129,7 +127,7 @@ def compute_physical_processor_count when /mswin|mingw/ require 'win32ole' result_set = WIN32OLE.connect("winmgmts://").ExecQuery( - "select NumberOfCores from Win32_Processor") + "select NumberOfCores from Win32_Processor") result_set.to_enum.collect(&:NumberOfCores).reduce(:+) else processor_count diff --git a/spec/concurrent/actor_spec.rb b/spec/concurrent/actor_spec.rb index f8360421c..7ee73181c 100644 --- a/spec/concurrent/actor_spec.rb +++ b/spec/concurrent/actor_spec.rb @@ -43,57 +43,10 @@ def on_message(message) end end - # def trace! - # set_trace_func proc { |event, file, line, id, binding, classname| - # # thread = eval('Thread.current', binding).object_id.to_s(16) - # printf "%8s %20s %20s %s %s:%-2d\n", event, id, classname, nil, file, line - # } - # yield - # ensure - # set_trace_func nil - # end - it 'forbids Immediate executor' do expect { Utils::AdHoc.spawn name: 'test', executor: ImmediateExecutor.new }.to raise_error end - describe 'stress test' do - 1.times do |i| - it format('run %3d', i) do - # puts format('run %3d', i) - Array.new(10).map do - Thread.new do - 10.times do - # trace! do - queue = Queue.new - actor = Ping.spawn :ping, queue - - # when spawn returns children are set - expect(Concurrent::Actor.root.send(:core).instance_variable_get(:@children)).to include(actor) - - actor << 'a' << 1 - expect(queue.pop).to eq 'a' - expect(actor.ask(2).value).to eq 2 - - expect(actor.parent).to eq Concurrent::Actor.root - expect(Concurrent::Actor.root.path).to eq '/' - expect(actor.path).to eq '/ping' - child = actor.ask(:child).value - expect(child.path).to eq '/ping/pong' - queue.clear - child.ask(3) - expect(queue.pop).to eq 3 - - actor << :terminate! - expect(actor.ask(:blow_up).wait).to be_rejected - terminate_actors actor, child - end - end - end.each(&:join) - end - end - end - describe 'spawning' do describe 'Actor#spawn' do behaviour = -> v { -> _ { v } } @@ -121,7 +74,7 @@ def on_message(message) subject { super().name } it { is_expected.to eq 'ping' } end - it('executor should be global') { expect(subject.executor).to eq Concurrent.configuration.global_task_pool } + it('executor should be global') { expect(subject.executor).to eq Concurrent.global_io_executor } describe '#reference' do subject { super().reference } @@ -292,7 +245,7 @@ def on_message(message) test = AdHoc.spawn name: :tester, behaviour_definition: resuming_behaviour do actor = AdHoc.spawn name: :pausing, - behaviour_definition: Behaviour.restarting_behaviour_definition do + behaviour_definition: Behaviour.restarting_behaviour_definition do queue << :init -> m { m == :add ? 1 : pass } end @@ -314,22 +267,22 @@ def on_message(message) terminate_actors test test = AdHoc.spawn name: :tester, - behaviour_definition: Behaviour.restarting_behaviour_definition do + behaviour_definition: Behaviour.restarting_behaviour_definition do actor = AdHoc.spawn name: :pausing, - supervise: true, - behaviour_definition: Behaviour.restarting_behaviour_definition do - queue << :init - -> m { m == :object_id ? 
self.object_id : pass } - end + supervise: true, + behaviour_definition: Behaviour.restarting_behaviour_definition do + queue << :init + -> m { m == :object_id ? self.object_id : pass } + end - queue << actor.ask!(:supervisor) - queue << actor.ask!(:object_id) - actor << nil - queue << actor.ask(:object_id) + queue << actor.ask!(:supervisor) + queue << actor.ask!(:object_id) + actor << nil + queue << actor.ask(:object_id) - -> m do - queue << m - end + -> m do + queue << m + end end expect(queue.pop).to eq :init @@ -352,7 +305,7 @@ def on_message(message) test = AdHoc.spawn name: :tester, behaviour_definition: resuming_behaviour do actor = AdHoc.spawn name: :pausing, - behaviour_definition: Behaviour.restarting_behaviour_definition do + behaviour_definition: Behaviour.restarting_behaviour_definition do queue << :init -> m { m == :add ? 1 : pass } end diff --git a/spec/concurrent/agent_spec.rb b/spec/concurrent/agent_spec.rb index 70c7ef9f6..bec0502b2 100644 --- a/spec/concurrent/agent_spec.rb +++ b/spec/concurrent/agent_spec.rb @@ -84,23 +84,33 @@ def trigger_observable(observable) agent.post { |value| 0 } end - it 'uses the global operation pool when :operation is true' do - expect(Concurrent.configuration).to receive(:global_operation_pool).and_return(executor) - agent = Agent.new(0, operation: true) + it 'uses the global io executor when :executor is :io' do + expect(Concurrent).to \ + receive(:global_io_executor).at_least(:once).and_return(executor) + agent = Agent.new(0, executor: :io) agent.post { |value| 0 } end - it 'uses the global task pool when :task is true' do - expect(Concurrent.configuration).to receive(:global_task_pool).and_return(executor) - agent = Agent.new(0, task: true) + it 'uses the global fast executor when :executor is :fast' do + expect(Concurrent).to \ + receive(:global_fast_executor).at_least(:once).and_return(executor) + agent = Agent.new(0, executor: :fast) agent.post { |value| 0 } end - it 'uses the global task pool by default' do - expect(Concurrent.configuration).to receive(:global_task_pool).and_return(executor) + it 'uses the global io executor for #post by default' do + expect(Concurrent).to \ + receive(:global_io_executor).at_least(:once).and_return(executor) agent = Agent.new(0) agent.post { |value| 0 } end + + it 'uses the global io executor for #post_off by default' do + expect(Concurrent).to \ + receive(:global_io_executor).at_least(:once).and_return(executor) + agent = Agent.new(0) + agent.post_off { |value| 0 } + end end context '#rescue' do @@ -163,9 +173,9 @@ def trigger_observable(observable) subject.post { nil } sleep(0.1) expect(subject. - instance_variable_get(:@serialized_execution). - instance_variable_get(:@stash). - size).to eq 2 + instance_variable_get(:@serialized_execution). + instance_variable_get(:@stash). + size).to eq 2 end it 'does not add to the queue when no block is given' do @@ -282,77 +292,77 @@ def trigger_observable(observable) it 'calls the first exception block with a matching class' do expected = nil agent = Agent.new(0, executor: Concurrent::ImmediateExecutor.new). - rescue(StandardError) { |ex| expected = 1 }. - rescue(StandardError) { |ex| expected = 2 }. - rescue(StandardError) { |ex| expected = 3 } - agent.post { raise StandardError } - expect(expected).to eq 1 - end + rescue(StandardError) { |ex| expected = 1 }. + rescue(StandardError) { |ex| expected = 2 }. 
+ rescue(StandardError) { |ex| expected = 3 } + agent.post { raise StandardError } + expect(expected).to eq 1 + end it 'matches all with a rescue with no class given' do expected = nil agent = Agent.new(0, executor: Concurrent::ImmediateExecutor.new). - rescue(LoadError) { |ex| expected = 1 }. - rescue { |ex| expected = 2 }. - rescue(StandardError) { |ex| expected = 3 } - agent.post { raise NoMethodError } - expect(expected).to eq 2 - end + rescue(LoadError) { |ex| expected = 1 }. + rescue { |ex| expected = 2 }. + rescue(StandardError) { |ex| expected = 3 } + agent.post { raise NoMethodError } + expect(expected).to eq 2 + end it 'searches associated rescue handlers in order' do expected = nil agent = Agent.new(0, executor: Concurrent::ImmediateExecutor.new). - rescue(ArgumentError) { |ex| expected = 1 }. - rescue(LoadError) { |ex| expected = 2 }. - rescue(StandardError) { |ex| expected = 3 } - agent.post { raise ArgumentError } - expect(expected).to eq 1 - - expected = nil - agent = Agent.new(0, executor: Concurrent::ImmediateExecutor.new). - rescue(ArgumentError) { |ex| expected = 1 }. - rescue(LoadError) { |ex| expected = 2 }. - rescue(StandardError) { |ex| expected = 3 } - agent.post { raise LoadError } - expect(expected).to eq 2 - - expected = nil - agent = Agent.new(0, executor: Concurrent::ImmediateExecutor.new). - rescue(ArgumentError) { |ex| expected = 1 }. + rescue(ArgumentError) { |ex| expected = 1 }. + rescue(LoadError) { |ex| expected = 2 }. + rescue(StandardError) { |ex| expected = 3 } + agent.post { raise ArgumentError } + expect(expected).to eq 1 + + expected = nil + agent = Agent.new(0, executor: Concurrent::ImmediateExecutor.new). + rescue(ArgumentError) { |ex| expected = 1 }. + rescue(LoadError) { |ex| expected = 2 }. + rescue(StandardError) { |ex| expected = 3 } + agent.post { raise LoadError } + expect(expected).to eq 2 + + expected = nil + agent = Agent.new(0, executor: Concurrent::ImmediateExecutor.new). + rescue(ArgumentError) { |ex| expected = 1 }. rescue(LoadError) { |ex| expected = 2 }. rescue(StandardError) { |ex| expected = 3 } - agent.post { raise StandardError } - expect(expected).to eq 3 - end + agent.post { raise StandardError } + expect(expected).to eq 3 + end it 'passes the exception object to the matched block' do expected = nil agent = Agent.new(0, executor: Concurrent::ImmediateExecutor.new). - rescue(ArgumentError) { |ex| expected = ex }. - rescue(LoadError) { |ex| expected = ex }. - rescue(StandardError) { |ex| expected = ex } - agent.post { raise StandardError } - expect(expected).to be_a(StandardError) - end + rescue(ArgumentError) { |ex| expected = ex }. + rescue(LoadError) { |ex| expected = ex }. + rescue(StandardError) { |ex| expected = ex } + agent.post { raise StandardError } + expect(expected).to be_a(StandardError) + end it 'ignores rescuers without a block' do expected = nil agent = Agent.new(0, executor: Concurrent::ImmediateExecutor.new). - rescue(StandardError). - rescue(StandardError) { |ex| expected = ex } - agent.post { raise StandardError } - expect(expected).to be_a(StandardError) - end + rescue(StandardError). + rescue(StandardError) { |ex| expected = ex } + agent.post { raise StandardError } + expect(expected).to be_a(StandardError) + end it 'supresses the exception if no rescue matches' do expect { agent = Agent.new(0, executor: Concurrent::ImmediateExecutor.new). - rescue(ArgumentError) { |ex| @expected = ex }. - rescue(NotImplementedError) { |ex| @expected = ex }. 
- rescue(NoMethodError) { |ex| @expected = ex } + rescue(ArgumentError) { |ex| @expected = ex }. + rescue(NotImplementedError) { |ex| @expected = ex }. + rescue(NoMethodError) { |ex| @expected = ex } agent.post { raise StandardError } }.not_to raise_error - end + end it 'suppresses exceptions thrown from rescue handlers' do expect { diff --git a/spec/concurrent/async_spec.rb b/spec/concurrent/async_spec.rb index 02a69f44b..4f3122763 100644 --- a/spec/concurrent/async_spec.rb +++ b/spec/concurrent/async_spec.rb @@ -104,7 +104,7 @@ def many(*args, &block) nil; end context 'executor' do it 'returns the default executor when #executor= has never been called' do - expect(Concurrent.configuration).to receive(:global_operation_pool). + expect(Concurrent).to receive(:global_io_executor). and_return(ImmediateExecutor.new) subject = async_class.new subject.async.echo(:foo) diff --git a/spec/concurrent/configuration_spec.rb b/spec/concurrent/configuration_spec.rb index a01df86be..cf1c1c147 100644 --- a/spec/concurrent/configuration_spec.rb +++ b/spec/concurrent/configuration_spec.rb @@ -1,66 +1,62 @@ module Concurrent describe Configuration do - it 'creates a global timer pool' do - expect(Concurrent.configuration.global_timer_set).not_to be_nil - expect(Concurrent.configuration.global_timer_set).to respond_to(:post) + + before(:each) do + Concurrent.class_variable_set( + :@@global_fast_executor, + Concurrent::LazyReference.new{ Concurrent::ImmediateExecutor.new }) + Concurrent.class_variable_set( + :@@global_io_executor, + Concurrent::LazyReference.new{ Concurrent::ImmediateExecutor.new }) + Concurrent.class_variable_set( + :@@global_timer_set, + Concurrent::LazyReference.new{ Concurrent::ImmediateExecutor.new }) end - context 'global task pool' do + after(:each) do + reset_gem_configuration + end - specify 'reader creates a default pool when first called if none exists' do - expect(Concurrent.configuration.global_task_pool).not_to be_nil - expect(Concurrent.configuration.global_task_pool).to respond_to(:post) - end + context 'global executors' do - specify 'writer memoizes the given executor' do - executor = ImmediateExecutor.new - Concurrent.configure do |config| - config.global_task_pool = executor - end - expect(Concurrent.configuration.global_task_pool).to eq executor + it 'creates a global timer set' do + expect(Concurrent.global_timer_set).not_to be_nil + expect(Concurrent.global_timer_set).to respond_to(:post) end - specify 'writer raises an exception if called after initialization' do - executor = ImmediateExecutor.new - Concurrent.configure do |config| - config.global_task_pool = executor - end - Concurrent.configuration.global_task_pool - expect { - Concurrent.configure do |config| - config.global_task_pool = executor - end - }.to raise_error(ConfigurationError) + it 'creates a global fast executor' do + expect(Concurrent.global_fast_executor).not_to be_nil + expect(Concurrent.global_fast_executor).to respond_to(:post) end - end - context 'global operation pool' do + it 'creates a global io executor' do + expect(Concurrent.global_io_executor).not_to be_nil + expect(Concurrent.global_io_executor).to respond_to(:post) + end - specify 'reader creates a default pool when first called if none exists' do - expect(Concurrent.configuration.global_operation_pool).not_to be_nil - expect(Concurrent.configuration.global_operation_pool).to respond_to(:post) + specify '#shutdown_global_executors acts on all global executors' do + expect(Concurrent.global_fast_executor).to 
receive(:shutdown).with(no_args) + expect(Concurrent.global_io_executor).to receive(:shutdown).with(no_args) + expect(Concurrent.global_timer_set).to receive(:shutdown).with(no_args) + Concurrent.shutdown_global_executors end - specify 'writer memoizes the given executor' do - executor = ImmediateExecutor.new - Concurrent.configure do |config| - config.global_operation_pool = executor - end - expect(Concurrent.configuration.global_operation_pool).to eq executor + specify '#kill_global_executors acts on all global executors' do + expect(Concurrent.global_fast_executor).to receive(:kill).with(no_args) + expect(Concurrent.global_io_executor).to receive(:kill).with(no_args) + expect(Concurrent.global_timer_set).to receive(:kill).with(no_args) + Concurrent.kill_global_executors end - specify 'writer raises an exception if called after initialization' do - executor = ImmediateExecutor.new - Concurrent.configure do |config| - config.global_operation_pool = executor + context '#wait_for_global_executors_termination' do + + it 'acts on all global executors' do + expect(Concurrent.global_fast_executor).to receive(:wait_for_termination).with(0.1) + expect(Concurrent.global_io_executor).to receive(:wait_for_termination).with(0.1) + expect(Concurrent.global_timer_set).to receive(:wait_for_termination).with(0.1) + Concurrent.wait_for_global_executors_termination(0.1) end - Concurrent.configuration.global_operation_pool - expect { - Concurrent.configure do |config| - config.global_operation_pool = executor - end - }.to raise_error(ConfigurationError) end end end diff --git a/spec/concurrent/dataflow_spec.rb b/spec/concurrent/dataflow_spec.rb index 588527265..2734ccab3 100644 --- a/spec/concurrent/dataflow_spec.rb +++ b/spec/concurrent/dataflow_spec.rb @@ -10,10 +10,10 @@ module Concurrent expect { Concurrent::dataflow_with(root_executor) }.to raise_error(ArgumentError) end - specify '#dataflow uses the global operation pool' do + specify '#dataflow uses the global fast executor' do input = Future.execute{0} expect(Concurrent).to receive(:dataflow_with).once. 
- with(Concurrent.configuration.global_operation_pool, input) + with(Concurrent.global_io_executor, input) Concurrent::dataflow(input){0} end diff --git a/spec/concurrent/executor/executor_options_spec.rb b/spec/concurrent/executor/executor_options_spec.rb new file mode 100644 index 000000000..b81853c2b --- /dev/null +++ b/spec/concurrent/executor/executor_options_spec.rb @@ -0,0 +1,99 @@ +module Concurrent + + describe ExecutorOptions do + + let(:executor){ ImmediateExecutor.new } + + let(:io_executor){ ImmediateExecutor.new } + let(:fast_executor){ ImmediateExecutor.new } + + subject { Class.new{ include ExecutorOptions }.new } + + context '#get_executor_from' do + + it 'returns the given :executor' do + expect(subject.get_executor_from(executor: executor)).to eq executor + end + + it 'returns the global io executor when :executor is :io' do + expect(Concurrent).to receive(:global_io_executor).and_return(:io_executor) + subject.get_executor_from(executor: :io) + end + + it 'returns the global fast executor when :executor is :fast' do + expect(Concurrent).to receive(:global_fast_executor).and_return(:fast_executor) + subject.get_executor_from(executor: :fast) + end + + it 'returns an immediate executor when :executor is :immediate' do + executor = subject.get_executor_from(executor: :immediate) + end + + it 'raises an exception when :executor is an unrecognized symbol' do + expect { + subject.get_executor_from(executor: :bogus) + }.to raise_error(ArgumentError) + end + + it 'returns the global fast executor when :operation is true' do + warn 'deprecated syntax' + expect(Kernel).to receive(:warn).with(anything) + expect(Concurrent).to receive(:global_fast_executor). + and_return(:fast_executor) + subject.get_executor_from(operation: true) + end + + it 'returns the global io executor when :operation is false' do + warn 'deprecated syntax' + expect(Kernel).to receive(:warn).with(anything) + expect(Concurrent).to receive(:global_io_executor). + and_return(:io_executor) + subject.get_executor_from(operation: false) + end + + it 'returns the global fast executor when :task is false' do + warn 'deprecated syntax' + expect(Kernel).to receive(:warn).with(anything) + expect(Concurrent).to receive(:global_fast_executor). + and_return(:fast_executor) + subject.get_executor_from(task: false) + end + + it 'returns the global io executor when :task is true' do + warn 'deprecated syntax' + expect(Kernel).to receive(:warn).with(anything) + expect(Concurrent).to receive(:global_io_executor). + and_return(:io_executor) + subject.get_executor_from(task: true) + end + + it 'returns nil when :executor is nil' do + expect(subject.get_executor_from(executor: nil)).to be_nil + end + + it 'returns nil when no option is given' do + expect(subject.get_executor_from).to be_nil + end + + specify ':executor overrides :operation' do + warn 'deprecated syntax' + expect(subject.get_executor_from(executor: executor, operation: true)). + to eq executor + end + + specify ':executor overrides :task' do + warn 'deprecated syntax' + expect(subject.get_executor_from(executor: executor, task: true)). + to eq executor + end + + specify ':operation overrides :task' do + warn 'deprecated syntax' + expect(Kernel).to receive(:warn).with(anything) + expect(Concurrent).to receive(:global_fast_executor). 
+ and_return(:fast_executor) + subject.get_executor_from(operation: true, task: true) + end + end + end +end diff --git a/spec/concurrent/executor/java_thread_pool_executor_spec.rb b/spec/concurrent/executor/java_thread_pool_executor_spec.rb index fea47f58b..09209416b 100644 --- a/spec/concurrent/executor/java_thread_pool_executor_spec.rb +++ b/spec/concurrent/executor/java_thread_pool_executor_spec.rb @@ -21,6 +21,8 @@ module Concurrent ) end + it_should_behave_like :thread_pool + it_should_behave_like :thread_pool_executor context '#overload_policy' do diff --git a/spec/concurrent/executor/per_thread_executor_spec.rb b/spec/concurrent/executor/per_thread_executor_spec.rb index 986de2420..78f22c457 100644 --- a/spec/concurrent/executor/per_thread_executor_spec.rb +++ b/spec/concurrent/executor/per_thread_executor_spec.rb @@ -15,7 +15,7 @@ module Concurrent it 'creates a new thread for a call without arguments' do thread = Thread.new{ nil } expect(Thread).to receive(:new).with(no_args()).and_return(thread) - expect(Concurrent.configuration.global_task_pool).not_to receive(:post).with(any_args()) + expect(Concurrent.global_fast_executor).not_to receive(:post).with(any_args()) subject.post{ nil } end @@ -28,7 +28,7 @@ module Concurrent it 'creates a new thread for a call with arguments' do thread = Thread.new{ nil } expect(Thread).to receive(:new).with(1,2,3).and_return(thread) - expect(Concurrent.configuration.global_task_pool).not_to receive(:post).with(any_args()) + expect(Concurrent.global_fast_executor).not_to receive(:post).with(any_args()) subject.post(1,2,3){ nil } end @@ -47,7 +47,7 @@ module Concurrent it 'aliases #<<' do thread = Thread.new{ nil } expect(Thread).to receive(:new).with(no_args()).and_return(thread) - expect(Concurrent.configuration.global_task_pool).not_to receive(:post).with(any_args()) + expect(Concurrent.global_fast_executor).not_to receive(:post).with(any_args()) subject << proc{ nil } end end @@ -59,7 +59,7 @@ module Concurrent it 'creates a new thread for a call without arguments' do thread = Thread.new{ nil } expect(Thread).to receive(:new).with(no_args()).and_return(thread) - expect(Concurrent.configuration.global_task_pool).not_to receive(:post).with(any_args()) + expect(Concurrent.global_fast_executor).not_to receive(:post).with(any_args()) subject.post{ nil } end @@ -72,7 +72,7 @@ module Concurrent it 'creates a new thread for a call with arguments' do thread = Thread.new{ nil } expect(Thread).to receive(:new).with(1,2,3).and_return(thread) - expect(Concurrent.configuration.global_task_pool).not_to receive(:post).with(any_args()) + expect(Concurrent.global_fast_executor).not_to receive(:post).with(any_args()) subject.post(1,2,3){ nil } end @@ -91,7 +91,7 @@ module Concurrent it 'aliases #<<' do thread = Thread.new{ nil } expect(Thread).to receive(:new).with(no_args()).and_return(thread) - expect(Concurrent.configuration.global_task_pool).not_to receive(:post).with(any_args()) + expect(Concurrent.global_fast_executor).not_to receive(:post).with(any_args()) subject << proc{ nil } end end diff --git a/spec/concurrent/executor/ruby_thread_pool_executor_spec.rb b/spec/concurrent/executor/ruby_thread_pool_executor_spec.rb index 270bb866f..9e43020f2 100644 --- a/spec/concurrent/executor/ruby_thread_pool_executor_spec.rb +++ b/spec/concurrent/executor/ruby_thread_pool_executor_spec.rb @@ -19,6 +19,8 @@ module Concurrent ) end + it_should_behave_like :thread_pool + it_should_behave_like :thread_pool_executor context '#remaining_capacity' do diff --git 
a/spec/concurrent/executor/thread_pool_executor_shared.rb b/spec/concurrent/executor/thread_pool_executor_shared.rb index 23ee774e4..6bc51ac46 100644 --- a/spec/concurrent/executor/thread_pool_executor_shared.rb +++ b/spec/concurrent/executor/thread_pool_executor_shared.rb @@ -2,7 +2,6 @@ shared_examples :thread_pool_executor do - after(:each) do subject.kill subject.wait_for_termination(0.1) diff --git a/spec/concurrent/executor/thread_pool_shared.rb b/spec/concurrent/executor/thread_pool_shared.rb index 40eab0210..2a69ca62a 100644 --- a/spec/concurrent/executor/thread_pool_shared.rb +++ b/spec/concurrent/executor/thread_pool_shared.rb @@ -9,6 +9,31 @@ it_should_behave_like :executor_service + context '#auto_terminate?' do + + it 'returns true by default' do + expect(subject.auto_terminate?).to be true + end + + it 'returns true when :enable_at_exit_handler is true' do + if described_class.to_s =~ /FixedThreadPool$/ + subject = described_class.new(1, stop_on_exit: true) + else + subject = described_class.new(stop_on_exit: true) + end + expect(subject.auto_terminate?).to be true + end + + it 'returns false when :enable_at_exit_handler is false' do + if described_class.to_s =~ /FixedThreadPool$/ + subject = described_class.new(1, stop_on_exit: false) + else + subject = described_class.new(stop_on_exit: false) + end + expect(subject.auto_terminate?).to be false + end + end + context '#length' do it 'returns zero on creation' do diff --git a/spec/concurrent/executor/timer_set_spec.rb b/spec/concurrent/executor/timer_set_spec.rb index fe4ed0596..8e8fbb6e0 100644 --- a/spec/concurrent/executor/timer_set_spec.rb +++ b/spec/concurrent/executor/timer_set_spec.rb @@ -14,8 +14,8 @@ module Concurrent sleep(0.1) end - it 'uses the global task pool be default' do - expect(Concurrent.configuration.global_task_pool).to receive(:post).with(no_args) + it 'uses the global io executor be default' do + expect(Concurrent.global_io_executor).to receive(:post).with(no_args) subject = TimerSet.new subject.post(0){ nil } sleep(0.1) diff --git a/spec/concurrent/future_spec.rb b/spec/concurrent/future_spec.rb index f68e21517..2f1713e83 100644 --- a/spec/concurrent/future_spec.rb +++ b/spec/concurrent/future_spec.rb @@ -116,18 +116,8 @@ def trigger_observable(observable) Future.execute(executor: executor){ nil } end - it 'uses the global operation pool when :operation is true' do - expect(Concurrent.configuration).to receive(:global_operation_pool).and_return(executor) - Future.execute(operation: true){ nil } - end - - it 'uses the global task pool when :task is true' do - expect(Concurrent.configuration).to receive(:global_task_pool).and_return(executor) - Future.execute(task: true){ nil } - end - - it 'uses the global operation pool by default' do - expect(Concurrent.configuration).to receive(:global_operation_pool).and_return(executor) + it 'uses the global io executor by default' do + expect(Concurrent).to receive(:global_io_executor).and_return(executor) Future.execute{ nil } end end diff --git a/spec/concurrent/lazy_reference_spec.rb b/spec/concurrent/lazy_reference_spec.rb new file mode 100644 index 000000000..55ff08263 --- /dev/null +++ b/spec/concurrent/lazy_reference_spec.rb @@ -0,0 +1,82 @@ +module Concurrent + + describe LazyReference do + + context '#initialize' do + + it 'raises an exception when no block given' do + expect { + LazyReference.new + }.to raise_error(ArgumentError) + end + end + + context '#value' do + + let(:task){ proc{ nil } } + + it 'does not call the block before #value is called' do 
+ expect(task).to_not receive(:call).with(any_args) + LazyReference.new(&task) + end + + it 'calls the block when #value is called' do + expect(task).to receive(:call).once.with(any_args).and_return(nil) + LazyReference.new(&task).value + end + + it 'only calls the block once no matter how often #value is called' do + expect(task).to receive(:call).once.with(any_args).and_return(nil) + lazy = LazyReference.new(&task) + 5.times{ lazy.value } + end + + it 'does not lock the mutex once the block has been called' do + mutex = Mutex.new + allow(Mutex).to receive(:new).and_return(mutex) + + lazy = LazyReference.new(&task) + lazy.value + + expect(mutex).to_not receive(:synchronize).with(any_args) + expect(mutex).to_not receive(:lock).with(any_args) + expect(mutex).to_not receive(:try_lock).with(any_args) + + 5.times{ lazy.value } + end + + context 'on exception' do + + it 'suppresses the error' do + expect { + LazyReference.new{ raise StandardError } + }.to_not raise_exception + end + + it 'sets the value to nil when no default is given' do + lazy = LazyReference.new{ raise StandardError } + expect(lazy.value).to be_nil + end + + it 'sets the value appropriately when given a default' do + lazy = LazyReference.new(100){ raise StandardError } + expect(lazy.value).to eq 100 + end + + it 'does not try to call the block again' do + mutex = Mutex.new + allow(Mutex).to receive(:new).and_return(mutex) + + lazy = LazyReference.new{ raise StandardError } + lazy.value + + expect(mutex).to_not receive(:synchronize).with(any_args) + expect(mutex).to_not receive(:lock).with(any_args) + expect(mutex).to_not receive(:try_lock).with(any_args) + + 5.times{ lazy.value } + end + end + end + end +end diff --git a/spec/concurrent/obligation_spec.rb b/spec/concurrent/obligation_spec.rb index 8bd669493..22712a2b0 100644 --- a/spec/concurrent/obligation_spec.rb +++ b/spec/concurrent/obligation_spec.rb @@ -279,8 +279,50 @@ def initialize it 'should execute the block within the mutex' do obligation.if_state(:unscheduled) { expect(obligation.mutex).to be_locked } end - end + context '#get_arguments_from' do + + it 'returns an empty array when opts is not given' do + args = obligation.send(:get_arguments_from) + expect(args).to be_a Array + expect(args).to be_empty + end + + it 'returns an empty array when opts is an empty hash' do + args = obligation.send(:get_arguments_from, {}) + expect(args).to be_a Array + expect(args).to be_empty + end + + it 'returns an empty array when there is no :args key' do + args = obligation.send(:get_arguments_from, foo: 'bar') + expect(args).to be_a Array + expect(args).to be_empty + end + + it 'returns an empty array when the :args key has a nil value' do + args = obligation.send(:get_arguments_from, args: nil) + expect(args).to be_a Array + expect(args).to be_empty + end + + it 'returns a one-element array when the :args key has a non-array value' do + args = obligation.send(:get_arguments_from, args: 'foo') + expect(args).to eq ['foo'] + end + + it 'returns an array when when the :args key has an array value' do + expected = [1, 2, 3, 4] + args = obligation.send(:get_arguments_from, args: expected) + expect(args).to eq expected + end + + it 'returns the given array when the :args key has a complex array value' do + expected = [(1..10).to_a, (20..30).to_a, (100..110).to_a] + args = obligation.send(:get_arguments_from, args: expected) + expect(args).to eq expected + end + end end end diff --git a/spec/concurrent/options_parser_spec.rb b/spec/concurrent/options_parser_spec.rb deleted file 
mode 100644 index 9a22635bb..000000000 --- a/spec/concurrent/options_parser_spec.rb +++ /dev/null @@ -1,109 +0,0 @@ -module Concurrent - - describe OptionsParser do - - let(:executor){ ImmediateExecutor.new } - - let(:task_pool){ ImmediateExecutor.new } - let(:operation_pool){ ImmediateExecutor.new } - - context '#get_arguments_from' do - - it 'returns an empty array when opts is not given' do - args = OptionsParser::get_arguments_from - expect(args).to be_a Array - expect(args).to be_empty - end - - it 'returns an empty array when opts is an empty hash' do - args = OptionsParser::get_arguments_from({}) - expect(args).to be_a Array - expect(args).to be_empty - end - - it 'returns an empty array when there is no :args key' do - args = OptionsParser::get_arguments_from(foo: 'bar') - expect(args).to be_a Array - expect(args).to be_empty - end - - it 'returns an empty array when the :args key has a nil value' do - args = OptionsParser::get_arguments_from(args: nil) - expect(args).to be_a Array - expect(args).to be_empty - end - - it 'returns a one-element array when the :args key has a non-array value' do - args = OptionsParser::get_arguments_from(args: 'foo') - expect(args).to eq ['foo'] - end - - it 'returns an array when when the :args key has an array value' do - expected = [1, 2, 3, 4] - args = OptionsParser::get_arguments_from(args: expected) - expect(args).to eq expected - end - - it 'returns the given array when the :args key has a complex array value' do - expected = [(1..10).to_a, (20..30).to_a, (100..110).to_a] - args = OptionsParser::get_arguments_from(args: expected) - expect(args).to eq expected - end - end - - context '#get_executor_from' do - - it 'returns the given :executor' do - expect(OptionsParser::get_executor_from(executor: executor)).to eq executor - end - - it 'returns the global operation pool when :operation is true' do - expect(Concurrent.configuration).to receive(:global_operation_pool). - and_return(:operation_pool) - OptionsParser::get_executor_from(operation: true) - end - - it 'returns the global task pool when :operation is false' do - expect(Concurrent.configuration).to receive(:global_task_pool). - and_return(:task_pool) - OptionsParser::get_executor_from(operation: false) - end - - it 'returns the global operation pool when :task is false' do - expect(Concurrent.configuration).to receive(:global_operation_pool). - and_return(:operation_pool) - OptionsParser::get_executor_from(task: false) - end - - it 'returns the global task pool when :task is true' do - expect(Concurrent.configuration).to receive(:global_task_pool). - and_return(:task_pool) - OptionsParser::get_executor_from(task: true) - end - - it 'returns nil when :executor is nil' do - expect(OptionsParser::get_executor_from(executor: nil)).to be_nil - end - - it 'returns nil task pool when no option is given' do - expect(OptionsParser::get_executor_from).to be_nil - end - - specify ':executor overrides :operation' do - expect(OptionsParser::get_executor_from(executor: executor, operation: true)). - to eq executor - end - - specify ':executor overrides :task' do - expect(OptionsParser::get_executor_from(executor: executor, task: true)). - to eq executor - end - - specify ':operation overrides :task' do - expect(Concurrent.configuration).to receive(:global_operation_pool). 
- and_return(:operation_pool) - OptionsParser::get_executor_from(operation: true, task: true) - end - end - end -end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 439ce5830..9a893c615 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -19,11 +19,12 @@ add_filter '/yardoc/' end +$VERBOSE = nil # suppress our deprecation warnings require 'concurrent' logger = Logger.new($stderr) logger.level = Logger::WARN -Concurrent.configuration.logger = lambda do |level, progname, message = nil, &block| +Concurrent.global_logger = lambda do |level, progname, message = nil, &block| logger.add level, message, progname, &block end diff --git a/spec/support/example_group_extensions.rb b/spec/support/example_group_extensions.rb index c99b64eb9..7fcef8ef1 100644 --- a/spec/support/example_group_extensions.rb +++ b/spec/support/example_group_extensions.rb @@ -3,6 +3,7 @@ module Concurrent module TestHelpers + def delta(v1, v2) if block_given? v1 = yield(v1) @@ -31,11 +32,25 @@ def do_no_reset! @do_not_reset = true end + GLOBAL_EXECUTORS = [ + [:@@global_fast_executor, ->{ LazyReference.new{ Concurrent.new_fast_executor }}], + [:@@global_io_executor, ->{ LazyReference.new{ Concurrent.new_io_executor }}], + [:@@global_timer_set, ->{ LazyReference.new{ Concurrent::TimerSet.new }}], + ] + @@killed = false def reset_gem_configuration - Concurrent.instance_variable_get(:@configuration).value = Concurrent::Configuration.new if @@killed - @@killed = false + if @@killed + GLOBAL_EXECUTORS.each do |var, factory| + executor = Concurrent.class_variable_get(var).value + executor.shutdown + executor.kill + executor = nil + Concurrent.class_variable_set(var, factory.call) + end + @@killed = false + end end def kill_rogue_threads(warning = true)
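Taken together with the `configuration_spec` changes above, the global executors are now reached through plain module-level readers and explicit lifecycle helpers rather than `Concurrent.configuration`. A summary sketch of the public surface exercised by those specs:

    require 'concurrent'

    Concurrent.global_fast_executor  # short, CPU-bound tasks
    Concurrent.global_io_executor    # blocking or long-running tasks (the new default)
    Concurrent.global_timer_set      # delayed and periodic tasks

    # Orderly teardown, e.g. from an application's own at_exit hook.
    Concurrent.shutdown_global_executors
    Concurrent.wait_for_global_executors_termination(0.1)

    # Forceful variant used by the test helpers.
    Concurrent.kill_global_executors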