diff --git a/.gitignore b/.gitignore index 5cb407a52..97c04221d 100644 --- a/.gitignore +++ b/.gitignore @@ -35,4 +35,3 @@ ext/**/*.bundle ext/**/*.so ext/**/*.jar pkg -*.gem diff --git a/.rspec b/.rspec index 20f9c8574..112cb72c2 100644 --- a/.rspec +++ b/.rspec @@ -1,2 +1,4 @@ --require spec_helper ---format progress +--color +--backtrace +--format documentation diff --git a/Gemfile b/Gemfile index 740894e73..3d6927108 100644 --- a/Gemfile +++ b/Gemfile @@ -1,6 +1,7 @@ source 'https://rubygems.org' gemspec name: 'concurrent-ruby' +gemspec name: 'concurrent-ruby-edge' group :development do gem 'rake', '~> 10.4.2' diff --git a/README.md b/README.md index 74df22078..1ad625c84 100644 --- a/README.md +++ b/README.md @@ -52,7 +52,6 @@ This library contains a variety of concurrency abstractions at high and low leve ### High-level, general-purpose asynchronous concurrency abstractions -* [Actor](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Actor.html): Implements the Actor Model, where concurrent actors exchange messages. * [Agent](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Agent.html): A single atomic value that represents an identity. * [Async](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Async.html): A mixin module that provides simple asynchronous behavior to any standard class/object or object. * [Future](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Future.html): An asynchronous operation that produces a value. @@ -60,7 +59,6 @@ This library contains a variety of concurrency abstractions at high and low leve * [Promise](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Promise.html): Similar to Futures, with more features. * [ScheduledTask](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/ScheduledTask.html): Like a Future scheduled for a specific future time. * [TimerTask](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/TimerTask.html): A Thread that periodically wakes up to perform work at regular intervals. -* [Channel](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Channel.html): Communicating Sequential Processes (CSP). ### Java-inspired ThreadPools and other executors @@ -90,6 +88,19 @@ This library contains a variety of concurrency abstractions at high and low leve * [Software transactional memory](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/TVar.html) (TVar) * [ReadWriteLock](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/ReadWriteLock.html) +### Edge features + +Edge features are available in the `concurrent-ruby-edge` companion gem; install it with `gem install concurrent-ruby-edge`. + +These features are under active development and may change frequently. They are not guaranteed to +maintain backward compatibility and may lack tests and documentation, though semantic versioning will +still be honored. Features developed in `concurrent-ruby-edge` are expected to move to `concurrent-ruby` when final. + +* [Actor](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Actor.html): + Implements the Actor Model, where concurrent actors exchange messages. +* [Channel](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Channel.html): + Communicating Sequential Processes (CSP).
+ ## Usage All abstractions within this gem can be loaded simply by requiring it: @@ -105,9 +116,7 @@ require 'concurrent' # everything # groups -require 'concurrent/actor' # Concurrent::Actor and supporting code require 'concurrent/atomics' # atomic and thread synchronization classes -require 'concurrent/channels' # Concurrent::Channel and supporting code require 'concurrent/executors' # Thread pools and other executors require 'concurrent/utilities' # utility methods such as processor count and timers @@ -127,6 +136,11 @@ require 'concurrent/promise' # Concurrent::Promise require 'concurrent/scheduled_task' # Concurrent::ScheduledTask require 'concurrent/timer_task' # Concurrent::TimerTask require 'concurrent/tvar' # Concurrent::TVar + +# experimental - available in the `concurrent-ruby-edge` companion gem + +require 'concurrent/actor' # Concurrent::Actor and supporting code +require 'concurrent/channel' # Concurrent::Channel and supporting code ``` ## Installation @@ -147,8 +161,8 @@ and run `bundle install` from your shell. Potential performance improvements may be achieved under MRI by installing optional C extensions. To minimize installation errors the C extensions are available in the `concurrent-ruby-ext` extension -gem. The extension gem lists `concurrent-ruby` as a dependency so it is not necessary to install both. -Simply install the extension gen: +gem. `concurrent-ruby` and `concurrent-ruby-ext` are always released together with the same version. +Simply install the extension gem as well: ```ruby gem install concurrent-ruby-ext diff --git a/Rakefile b/Rakefile index 3d670f4df..67e5ee77e 100644 --- a/Rakefile +++ b/Rakefile @@ -1,23 +1,27 @@ #!/usr/bin/env rake +require 'concurrent/version' require 'concurrent/native_extensions' ## load the two gemspec files CORE_GEMSPEC = Gem::Specification.load('concurrent-ruby.gemspec') EXT_GEMSPEC = Gem::Specification.load('concurrent-ruby-ext.gemspec') +EDGE_GEMSPEC = Gem::Specification.load('concurrent-ruby-edge.gemspec') ## constants used for compile/build tasks GEM_NAME = 'concurrent-ruby' -EXTENSION_NAME = 'extension' +EXT_NAME = 'extension' +EDGE_NAME = 'edge' JAVA_EXT_NAME = 'concurrent_ruby_ext' if Concurrent.on_jruby? CORE_GEM = "#{GEM_NAME}-#{Concurrent::VERSION}-java.gem" else CORE_GEM = "#{GEM_NAME}-#{Concurrent::VERSION}.gem" - EXTENSION_GEM = "#{GEM_NAME}-ext-#{Concurrent::VERSION}.gem" + EXT_GEM = "#{GEM_NAME}-ext-#{Concurrent::VERSION}.gem" NATIVE_GEM = "#{GEM_NAME}-ext-#{Concurrent::VERSION}-#{Gem::Platform.new(RUBY_PLATFORM)}.gem" + EDGE_GEM = "#{GEM_NAME}-edge-#{Concurrent::EDGE_VERSION}.gem" end ## safely load all the rake tasks in the `tasks` directory @@ -49,7 +53,7 @@ elsif Concurrent.allow_c_extensions? ## create the compile tasks for the extension gem require 'rake/extensiontask' - Rake::ExtensionTask.new(EXTENSION_NAME, EXT_GEMSPEC) do |ext| + Rake::ExtensionTask.new(EXT_NAME, EXT_GEMSPEC) do |ext| ext.ext_dir = 'ext/concurrent' ext.lib_dir = 'lib/concurrent' ext.source_pattern = '*.{c,h}' @@ -63,9 +67,9 @@ elsif Concurrent.allow_c_extensions?
'x64-mingw32' => 'x86_64-w64-mingw32' } platforms.each do |platform, prefix| - task "copy:#{EXTENSION_NAME}:#{platform}:#{ruby_version}" do |t| + task "copy:#{EXT_NAME}:#{platform}:#{ruby_version}" do |t| %w[lib tmp/#{platform}/stage/lib].each do |dir| - so_file = "#{dir}/#{ruby_version[/^\d+\.\d+/]}/#{EXTENSION_NAME}.so" + so_file = "#{dir}/#{ruby_version[/^\d+\.\d+/]}/#{EXT_NAME}.so" if File.exists?(so_file) sh "#{prefix}-strip -S #{so_file}" end @@ -94,7 +98,11 @@ end namespace :build do - build_deps = [:clean] + task :mkdir_pkg do + mkdir_p 'pkg' + end + + build_deps = [:clean, 'build:mkdir_pkg'] build_deps << :compile if Concurrent.on_jruby? desc "Build #{CORE_GEM} into the pkg directory" @@ -104,8 +112,15 @@ namespace :build do end unless Concurrent.on_jruby? - desc "Build #{EXTENSION_GEM} into the pkg directory" - task :ext => [:clean] do + + desc "Build #{EDGE_GEM} into the pkg directory" + task :edge => 'build:mkdir_pkg' do + sh "gem build #{EDGE_GEMSPEC.name}.gemspec" + sh 'mv *.gem pkg/' + end + + desc "Build #{EXT_GEM} into the pkg directory" + task :ext => build_deps do sh "gem build #{EXT_GEMSPEC.name}.gemspec" sh 'mv *.gem pkg/' end @@ -113,8 +128,8 @@ namespace :build do if Concurrent.allow_c_extensions? desc "Build #{NATIVE_GEM} into the pkg directory" - task :native do - sh "gem compile pkg/#{EXTENSION_GEM}" + task :native => 'build:mkdir_pkg' do + sh "gem compile pkg/#{EXT_GEM}" sh 'mv *.gem pkg/' end end @@ -124,8 +139,8 @@ if Concurrent.on_jruby? desc 'Build JRuby-specific core gem (alias for `build:core`)' task :build => ['build:core'] else - desc 'Build core and extension gems' - task :build => ['build:core', 'build:ext'] + desc 'Build core, extension, and edge gems' + task :build => ['build:core', 'build:ext', 'build:edge'] end ## the RSpec task that compiles extensions when available @@ -134,9 +149,7 @@ begin require 'rspec' require 'rspec/core/rake_task' - RSpec::Core::RakeTask.new(:spec) do |t| - t.rspec_opts = '--color --backtrace --format documentation' - end + RSpec::Core::RakeTask.new(:spec) task :default => [:clean, :compile, :spec] rescue LoadError diff --git a/concurrent-ruby-edge.gemspec b/concurrent-ruby-edge.gemspec new file mode 100644 index 000000000..a43fe8385 --- /dev/null +++ b/concurrent-ruby-edge.gemspec @@ -0,0 +1,31 @@ +$:.push File.join(File.dirname(__FILE__), 'lib') + +require 'concurrent/version' +require 'concurrent/file_map' + +Gem::Specification.new do |s| + git_files = `git ls-files`.split("\n") + + s.name = 'concurrent-ruby-edge' + s.version = Concurrent::EDGE_VERSION + s.platform = Gem::Platform::RUBY + s.authors = ["Jerry D'Antonio", 'The Ruby Concurrency Team'] + s.email = ['jerry.dantonio@gmail.com', 'concurrent-ruby@googlegroups.com'] + s.homepage = 'http://www.concurrent-ruby.com' + s.summary = 'Edge features and additions to the concurrent-ruby gem.' + s.license = 'MIT' + s.date = Time.now.strftime('%Y-%m-%d') + s.files = Concurrent::FILE_MAP.fetch :edge + s.extra_rdoc_files = Dir['README*', 'LICENSE*'] + s.require_paths = ['lib'] + s.description = <<-TXT +These features are under active development and may change frequently. They are not guaranteed to +maintain backward compatibility and may lack tests and documentation, though semantic versioning will +still be honored. Features developed in `concurrent-ruby-edge` are expected to move to `concurrent-ruby` when final. +Please see http://concurrent-ruby.com for more information.
+ TXT + + s.required_ruby_version = '>= 1.9.3' + + s.add_runtime_dependency 'concurrent-ruby', "~> #{Concurrent::VERSION}" +end diff --git a/concurrent-ruby.gemspec b/concurrent-ruby.gemspec index d4e306560..b7b587359 100644 --- a/concurrent-ruby.gemspec +++ b/concurrent-ruby.gemspec @@ -1,29 +1,30 @@ $:.push File.join(File.dirname(__FILE__), 'lib') require 'concurrent/version' +require 'concurrent/file_map' Gem::Specification.new do |s| - s.name = 'concurrent-ruby' - s.version = Concurrent::VERSION - s.platform = Gem::Platform::RUBY - s.author = "Jerry D'Antonio" - s.email = 'jerry.dantonio@gmail.com' - s.homepage = 'http://www.concurrent-ruby.com' - s.summary = 'Modern concurrency tools for Ruby. Inspired by Erlang, Clojure, Scala, Haskell, F#, C#, Java, and classic concurrency patterns.' - s.license = 'MIT' - s.date = Time.now.strftime('%Y-%m-%d') + git_files = `git ls-files`.split("\n") - s.description = <<-EOF - Modern concurrency tools including agents, futures, promises, thread pools, actors, supervisors, and more. - Inspired by Erlang, Clojure, Go, JavaScript, actors, and classic concurrency patterns. - EOF - - s.files = Dir['lib/**/*.rb'] + s.name = 'concurrent-ruby' + s.version = Concurrent::VERSION + s.platform = Gem::Platform::RUBY + s.authors = ["Jerry D'Antonio", 'The Ruby Concurrency Team'] + s.email = ['jerry.dantonio@gmail.com', 'concurrent-ruby@googlegroups.com'] + s.homepage = 'http://www.concurrent-ruby.com' + s.summary = 'Modern concurrency tools for Ruby. Inspired by Erlang, Clojure, Scala, Haskell, F#, C#, Java, and classic concurrency patterns.' + s.license = 'MIT' + s.date = Time.now.strftime('%Y-%m-%d') + s.files = Concurrent::FILE_MAP.fetch :core + s.extra_rdoc_files = Dir['README*', 'LICENSE*', 'CHANGELOG*'] + s.require_paths = ['lib'] + s.description = <<-EOF +Modern concurrency tools including agents, futures, promises, thread pools, actors, supervisors, and more. +Inspired by Erlang, Clojure, Go, JavaScript, actors, and classic concurrency patterns. + EOF if defined?(JRUBY_VERSION) - s.files += Dir['lib/**/*.jar'] + s.files += Dir['lib/**/*.jar'] s.platform = 'java' else s.add_runtime_dependency 'ref', '~> 1.0', '>= 1.0.5' diff --git a/doc/future-promise.md b/doc/future-promise.md new file mode 100644 index 000000000..5be5a6983 --- /dev/null +++ b/doc/future-promise.md @@ -0,0 +1,12 @@ +# Futures and Promises + +The new implementation added in version 0.8 differs from previous versions and has little in common with them. +A {Future} represents a value which will become {#completed?} in the future; it will contain a {#value} if {#success?} or a {#reason} if {#failed?}. It cannot be completed directly; implementations of the abstract {Promise} class exist for that purpose, so a {Promise}'s only job is to complete a given {Future} object. The two are always constructed as a pair, even in chaining methods like {#then}, {#rescue}, {#then_delay}, etc. + +There are a few {Promise} implementations: + +- OuterPromise - the only Promise used directly by users; it can be completed by outer code. Constructed with the {Concurrent::Next.promise} helper method. +- Immediate - an internal Promise implementation used to represent immediate evaluation of a block. Constructed with the {Concurrent::Next.future} helper method. +- Delay - an internal Promise implementation used to represent delayed evaluation of a block. Constructed with the {Concurrent::Next.delay} helper method.
+- ConnectedPromise - used internally to support {Future#with_default_executor} + diff --git a/lib/concurrent-edge.rb b/lib/concurrent-edge.rb new file mode 100644 index 000000000..7dd1a7f20 --- /dev/null +++ b/lib/concurrent-edge.rb @@ -0,0 +1,7 @@ +require 'concurrent' + +require 'concurrent/actor' +require 'concurrent/channel' +require 'concurrent/edge/future' + + diff --git a/lib/concurrent.rb b/lib/concurrent.rb index 4acb31e29..8d8c830ae 100644 --- a/lib/concurrent.rb +++ b/lib/concurrent.rb @@ -5,9 +5,7 @@ require 'concurrent/configuration' -require 'concurrent/actor' require 'concurrent/atomics' -require 'concurrent/channels' require 'concurrent/collections' require 'concurrent/errors' require 'concurrent/executors' diff --git a/lib/concurrent/actor.rb b/lib/concurrent/actor.rb index a75b2234b..0b7055a12 100644 --- a/lib/concurrent/actor.rb +++ b/lib/concurrent/actor.rb @@ -1,9 +1,8 @@ require 'concurrent/configuration' -require 'concurrent/delay' require 'concurrent/executor/serialized_execution' -require 'concurrent/ivar' require 'concurrent/logging' require 'concurrent/synchronization' +require 'concurrent/edge/future' module Concurrent # TODO https://github.com/celluloid/celluloid/wiki/Supervision-Groups ? @@ -40,9 +39,9 @@ def self.current Thread.current[:__current_actor__] end - @root = Delay.new do - Core.new(parent: nil, name: '/', class: Root, initialized: ivar = IVar.new).reference.tap do - ivar.no_error! + @root = Concurrent.delay do + Core.new(parent: nil, name: '/', class: Root, initialized: future = Concurrent.future).reference.tap do + future.wait! end end @@ -77,7 +76,7 @@ def self.spawn(*args, &block) # as {.spawn} but it'll raise when Actor not initialized properly def self.spawn!(*args, &block) - spawn(spawn_optionify(*args).merge(initialized: ivar = IVar.new), &block).tap { ivar.no_error! } + spawn(spawn_optionify(*args).merge(initialized: future = Concurrent.future), &block).tap { future.wait! } end # @overload spawn_optionify(context_class, name, *args) diff --git a/lib/concurrent/actor/behaviour/abstract.rb b/lib/concurrent/actor/behaviour/abstract.rb index 4e4f03780..a697a28d3 100644 --- a/lib/concurrent/actor/behaviour/abstract.rb +++ b/lib/concurrent/actor/behaviour/abstract.rb @@ -38,7 +38,7 @@ def broadcast(event) def reject_envelope(envelope) envelope.reject! ActorTerminated.new(reference) - dead_letter_routing << envelope unless envelope.ivar + dead_letter_routing << envelope unless envelope.future log Logging::DEBUG, "rejected #{envelope.message} from #{envelope.sender_path}" end end diff --git a/lib/concurrent/actor/behaviour/sets_results.rb b/lib/concurrent/actor/behaviour/sets_results.rb index c8f805878..df3594e9e 100644 --- a/lib/concurrent/actor/behaviour/sets_results.rb +++ b/lib/concurrent/actor/behaviour/sets_results.rb @@ -1,7 +1,7 @@ module Concurrent module Actor module Behaviour - # Collects returning value and sets the IVar in the {Envelope} or error on failure. + # Collects returning value and sets the CompletableFuture in the {Envelope} or error on failure. class SetResults < Abstract attr_reader :error_strategy @@ -12,8 +12,8 @@ def initialize(core, subsequent, error_strategy) def on_envelope(envelope) result = pass envelope - if result != MESSAGE_PROCESSED && !envelope.ivar.nil? - envelope.ivar.set result + if result != MESSAGE_PROCESSED && !envelope.future.nil? 
+ envelope.future.success result end nil rescue => error @@ -28,7 +28,7 @@ def on_envelope(envelope) else raise end - envelope.ivar.fail error unless envelope.ivar.nil? + envelope.future.fail error unless envelope.future.nil? end end end diff --git a/lib/concurrent/actor/behaviour/termination.rb b/lib/concurrent/actor/behaviour/termination.rb index d9dd95e85..39d345141 100644 --- a/lib/concurrent/actor/behaviour/termination.rb +++ b/lib/concurrent/actor/behaviour/termination.rb @@ -7,18 +7,18 @@ module Behaviour class Termination < Abstract # @!attribute [r] terminated - # @return [Event] event which will become set when actor is terminated. + # @return [Edge::Event] event which will become set when actor is terminated. attr_reader :terminated def initialize(core, subsequent) super core, subsequent - @terminated = Event.new + @terminated = Concurrent.event end # @note Actor rejects envelopes when terminated. # @return [true, false] if actor is terminated def terminated? - @terminated.set? + @terminated.completed? end def on_envelope(envelope) @@ -43,7 +43,7 @@ def on_envelope(envelope) # Terminates all its children, does not wait until they are terminated. def terminate! return true if terminated? - terminated.set + terminated.complete broadcast(:terminated) # TODO do not end up in Dead Letter Router parent << :remove_child if parent true diff --git a/lib/concurrent/actor/context.rb b/lib/concurrent/actor/context.rb index 77cc34520..15d7e7faa 100644 --- a/lib/concurrent/actor/context.rb +++ b/lib/concurrent/actor/context.rb @@ -20,7 +20,7 @@ class AbstractContext # @abstract override to define Actor's behaviour # @param [Object] message - # @return [Object] a result which will be used to set the IVar supplied to Reference#ask + # @return [Object] a result which will be used to set the Future supplied to Reference#ask # @note self should not be returned (or sent to other actors), {#reference} should be used # instead def on_message(message) @@ -46,7 +46,7 @@ def pass end # Defines an actor responsible for dead letters. Any rejected message send - # with {Reference#tell} is sent there, a message with ivar is considered + # with {Reference#tell} is sent there, a message with future is considered # already monitored for failures. Default behaviour is to use # {AbstractContext#dead_letter_routing} of the parent, so if no # {AbstractContext#dead_letter_routing} method is overridden in diff --git a/lib/concurrent/actor/core.rb b/lib/concurrent/actor/core.rb index 0c8bc301c..8fe6f6981 100644 --- a/lib/concurrent/actor/core.rb +++ b/lib/concurrent/actor/core.rb @@ -40,7 +40,7 @@ class Core < Synchronization::Object # @option opts [Array)>] # behaviour_definition, array of pairs where each pair is behaviour # class and its args, see {Behaviour.basic_behaviour_definition} - # @option opts [IVar, nil] initialized, if present it'll be set or failed + # @option opts [CompletableFuture, nil] initialized, if present it'll be set or failed # after {Context} initialization # @option opts [Proc, nil] logger a proc accepting (level, progname, # message = nil, &block) params, can be used to hook actor instance to @@ -77,7 +77,7 @@ def initialize(opts = {}, &block) @args = opts.fetch(:args, []) @block = block - initialized = Type! opts[:initialized], IVar, NilClass + initialized = Type! 
opts[:initialized], Edge::CompletableFuture, NilClass messages = [] messages << :link if opts[:link] @@ -91,7 +91,7 @@ def initialize(opts = {}, &block) handle_envelope Envelope.new(message, nil, parent, reference) end - initialized.set reference if initialized + initialized.success reference if initialized rescue => ex log ERROR, ex @first_behaviour.terminate! diff --git a/lib/concurrent/actor/envelope.rb b/lib/concurrent/actor/envelope.rb index 5d5145f1e..fa80f111b 100644 --- a/lib/concurrent/actor/envelope.rb +++ b/lib/concurrent/actor/envelope.rb @@ -5,18 +5,18 @@ class Envelope # @!attribute [r] message # @return [Object] a message - # @!attribute [r] ivar - # @return [IVar] an ivar which becomes resolved after message is processed + # @!attribute [r] future + # @return [Edge::Future] a future which becomes resolved after message is processed # @!attribute [r] sender # @return [Reference, Thread] an actor or thread sending the message # @!attribute [r] address # @return [Reference] where this message will be delivered - attr_reader :message, :ivar, :sender, :address + attr_reader :message, :future, :sender, :address - def initialize(message, ivar, sender, address) + def initialize(message, future, sender, address) @message = message - @ivar = Type! ivar, IVar, NilClass + @future = Type! future, Edge::CompletableFuture, NilClass @sender = Type! sender, Reference, Thread @address = Type! address, Reference end @@ -34,7 +34,7 @@ def address_path end def reject!(error) - ivar.fail error unless ivar.nil? + future.fail error unless future.nil? end end end diff --git a/lib/concurrent/actor/internal_delegations.rb b/lib/concurrent/actor/internal_delegations.rb index ca2a4a30a..0b5f12871 100644 --- a/lib/concurrent/actor/internal_delegations.rb +++ b/lib/concurrent/actor/internal_delegations.rb @@ -25,7 +25,7 @@ def dead_letter_routing end def redirect(reference, envelope = self.envelope) - reference.message(envelope.message, envelope.ivar) + reference.message(envelope.message, envelope.future) Behaviour::MESSAGE_PROCESSED end diff --git a/lib/concurrent/actor/reference.rb b/lib/concurrent/actor/reference.rb index de6883e09..05b049444 100644 --- a/lib/concurrent/actor/reference.rb +++ b/lib/concurrent/actor/reference.rb @@ -32,10 +32,13 @@ def tell(message) # # sends message to the actor and asks for the result of its processing, returns immediately # @param [Object] message - # @param [Ivar] ivar to be fulfilled be message's processing result - # @return [IVar] supplied ivar - def ask(message, ivar = IVar.new) - message message, ivar + # @param [Edge::Future] future to be fulfilled be message's processing result + # @return [Edge::Future] supplied future + def ask(message, future = Concurrent.future) + message message, future + # # @return [Future] a future + # def ask(message) + # message message, ConcurrentNext.promise end # @note it's a good practice to use tell whenever possible. Ask should be used only for @@ -45,17 +48,26 @@ def ask(message, ivar = IVar.new) # # sends message to the actor and asks for the result of its processing, blocks # @param [Object] message - # @param [Ivar] ivar to be fulfilled be message's processing result + # @param [Edge::Future] future to be fulfilled be message's processing result # @return [Object] message's processing result - # @raise [Exception] ivar.reason if ivar is #rejected? - def ask!(message, ivar = IVar.new) - ask(message, ivar).value! + # @raise [Exception] future.reason if future is #rejected? 
+ def ask!(message, future = Concurrent.future) + ask(message, future).value! + # # @param [Object] message + # # @return [Object] message's processing result + # # @raise [Exception] future.reason if future is #failed? + # def ask!(message) + # ask(message).value! end - # behaves as {#tell} when no ivar and as {#ask} when ivar - def message(message, ivar = nil) - core.on_envelope Envelope.new(message, ivar, Actor.current || Thread.current, self) - return ivar || self + # behaves as {#tell} when no future and as {#ask} when future + def message(message, future = nil) + core.on_envelope Envelope.new(message, future, Actor.current || Thread.current, self) + return future || self + # # behaves as {#tell} when no promise and as {#ask} when promise + # def message(message, promise = nil) + # core.on_envelope Envelope.new(message, promise, Actor.current || Thread.current, self) + # return promise ? promise.future : self end # @see AbstractContext#dead_letter_routing diff --git a/lib/concurrent/actress.rb b/lib/concurrent/actress.rb deleted file mode 100644 index f2cbcce54..000000000 --- a/lib/concurrent/actress.rb +++ /dev/null @@ -1,3 +0,0 @@ -require 'concurrent/actor' - -Concurrent::Actress = Concurrent::Actor diff --git a/lib/concurrent/agent.rb b/lib/concurrent/agent.rb index 8b3c67e2e..b37254acd 100644 --- a/lib/concurrent/agent.rb +++ b/lib/concurrent/agent.rb @@ -2,7 +2,6 @@ require 'concurrent/dereferenceable' require 'concurrent/observable' -require 'concurrent/executor/executor_options' require 'concurrent/utility/timeout' require 'concurrent/logging' @@ -15,7 +14,6 @@ module Concurrent class Agent include Dereferenceable include Observable - include ExecutorOptions include Logging attr_reader :timeout, :io_executor, :fast_executor @@ -31,8 +29,8 @@ def initialize(initial, opts = {}) @validator = Proc.new { |result| true } self.observers = CopyOnWriteObserverSet.new @serialized_execution = SerializedExecution.new - @io_executor = get_executor_from(opts) || Concurrent.global_io_executor - @fast_executor = get_executor_from(opts) || Concurrent.global_fast_executor + @io_executor = Executor.executor_from_options(opts) || Concurrent.global_io_executor + @fast_executor = Executor.executor_from_options(opts) || Concurrent.global_fast_executor init_mutex set_deref_options(opts) end diff --git a/lib/concurrent/channel.rb b/lib/concurrent/channel.rb new file mode 100644 index 000000000..7990c6300 --- /dev/null +++ b/lib/concurrent/channel.rb @@ -0,0 +1,6 @@ +require 'concurrent/channel/blocking_ring_buffer' +require 'concurrent/channel/buffered_channel' +require 'concurrent/channel/channel' +require 'concurrent/channel/ring_buffer' +require 'concurrent/channel/unbuffered_channel' +require 'concurrent/channel/waitable_list' diff --git a/lib/concurrent/collection/blocking_ring_buffer.rb b/lib/concurrent/channel/blocking_ring_buffer.rb similarity index 100% rename from lib/concurrent/collection/blocking_ring_buffer.rb rename to lib/concurrent/channel/blocking_ring_buffer.rb diff --git a/lib/concurrent/collection/ring_buffer.rb b/lib/concurrent/channel/ring_buffer.rb similarity index 100% rename from lib/concurrent/collection/ring_buffer.rb rename to lib/concurrent/channel/ring_buffer.rb diff --git a/lib/concurrent/channels.rb b/lib/concurrent/channels.rb deleted file mode 100644 index 36a49ec81..000000000 --- a/lib/concurrent/channels.rb +++ /dev/null @@ -1,5 +0,0 @@ -require 'concurrent/collections' - -require 'concurrent/channel/channel' -require 'concurrent/channel/unbuffered_channel' -require 
'concurrent/channel/buffered_channel' diff --git a/lib/concurrent/collections.rb b/lib/concurrent/collections.rb index 3fbb09321..8f1e2bb04 100644 --- a/lib/concurrent/collections.rb +++ b/lib/concurrent/collections.rb @@ -1,3 +1 @@ require 'concurrent/collection/priority_queue' -require 'concurrent/collection/ring_buffer' -require 'concurrent/collection/blocking_ring_buffer' diff --git a/lib/concurrent/configuration.rb b/lib/concurrent/configuration.rb index ce2d58c7f..7494df800 100644 --- a/lib/concurrent/configuration.rb +++ b/lib/concurrent/configuration.rb @@ -105,13 +105,23 @@ def self.global_timer_set GLOBAL_TIMER_SET.value end + # General access point to global executors. + # @param [Symbol, Executor] maps symbols: + # - :fast - {Concurrent.global_fast_executor} + # - :io - {Concurrent.global_io_executor} + # - :immediate - {Concurrent.global_immediate_executor} + # @return [Executor] + def self.executor(executor_identifier) + Executor.executor(executor_identifier) + end + def self.new_fast_executor(opts = {}) FixedThreadPool.new( [2, Concurrent.processor_count].max, auto_terminate: opts.fetch(:auto_terminate, true), idletime: 60, # 1 minute max_queue: 0, # unlimited - fallback_policy: :caller_runs # shouldn't matter -- 0 max queue + fallback_policy: :abort # shouldn't matter -- 0 max queue ) end @@ -123,7 +133,7 @@ def self.new_io_executor(opts = {}) auto_terminate: opts.fetch(:auto_terminate, true), idletime: 60, # 1 minute max_queue: 0, # unlimited - fallback_policy: :caller_runs # shouldn't matter -- 0 max queue + fallback_policy: :abort # shouldn't matter -- 0 max queue ) end diff --git a/lib/concurrent/delay.rb b/lib/concurrent/delay.rb index 034c6ceed..043456b5b 100644 --- a/lib/concurrent/delay.rb +++ b/lib/concurrent/delay.rb @@ -1,7 +1,6 @@ require 'thread' require 'concurrent/configuration' require 'concurrent/obligation' -require 'concurrent/executor/executor_options' require 'concurrent/executor/immediate_executor' require 'concurrent/synchronization' @@ -40,7 +39,6 @@ module Concurrent # @see Concurrent::Dereferenceable class Delay < Synchronization::Object include Obligation - include ExecutorOptions # NOTE: Because the global thread pools are lazy-loaded with these objects # there is a performance hit every time we post a new task to one of these @@ -74,7 +72,7 @@ def initialize(opts = {}, &block) super() init_obligation(self) set_deref_options(opts) - @task_executor = get_executor_from(opts) + @task_executor = Executor.executor_from_options(opts) @task = block @state = :pending diff --git a/lib/concurrent/edge.rb b/lib/concurrent/edge.rb new file mode 100644 index 000000000..0b2fc182d --- /dev/null +++ b/lib/concurrent/edge.rb @@ -0,0 +1,24 @@ +module Concurrent + + # A submodule for unstable, highly experimental features that are likely to + # change often and which may never become part of the core gem. Also for + # new, experimental version of abstractions already in the core gem. + # + # Most new features should start in this module, clearly indicating the + # experimental and unstable nature of the feature. Once a feature becomes + # more stable and is a candidate for inclusion in the core gem it should + # be moved up to the `Concurrent` module, where it would reside once merged + # into the core gem. + # + # The only exception to this is for features which *replace* features from + # the core gem in ways that are breaking and not backward compatible. These + # features should remain in this module until merged into the core gem. 
This + # will prevent namespace collisions. + # + # This file should *never* be used as a global `require` for all files within + # the edge gem. Because these features are experimental users should always + # explicitly require only what they need. + module Edge + + end +end diff --git a/lib/concurrent/edge/future.rb b/lib/concurrent/edge/future.rb new file mode 100644 index 000000000..a0a4bb1c9 --- /dev/null +++ b/lib/concurrent/edge/future.rb @@ -0,0 +1,1033 @@ +require 'concurrent' + +# TODO support Dereferencable ? +# TODO behaviour with Interrupt exceptions is undefined, use Signal.trap to avoid issues + +# @note different name just not to collide for now +module Concurrent + module Edge + + module FutureShortcuts + # TODO to construct event to be set later to trigger rest of the tree + + # User is responsible for completing the event once. + # @return [CompletableEvent] + def event(default_executor = :io) + CompletableEventPromise.new(default_executor).future + end + + # @overload future(default_executor = :io, &task) + # Constructs new Future which will be completed after block is evaluated on executor. Evaluation begins immediately. + # @return [Future] + # @note FIXME allow to pass in variables as Thread.new(args) {|args| _ } does + # @overload future(default_executor = :io) + # User is responsible for completing the future once. + # @return [CompletableFuture] + def future(default_executor = :io, &task) + if task + ImmediatePromise.new(default_executor).event.chain(&task) + else + CompletableFuturePromise.new(default_executor).future + end + end + + alias_method :async, :future + + # Constructs new Future which will be completed after block is evaluated on executor. Evaluation is delays until + # requested by {Future#wait} method, {Future#value} and {Future#value!} methods are calling {Future#wait} internally. + # @return [Delay] + def delay(default_executor = :io, &task) + Delay.new(default_executor).event.chain(&task) + end + + # Schedules the block to be executed on executor in given intended_time. + # @return [Future] + def schedule(intended_time, default_executor = :io, &task) + ScheduledPromise.new(intended_time, default_executor).future.chain(&task) + end + + # fails on first error + # does not block a thread + # @return [Future] + def join(*futures) + AllPromise.new(futures).future + end + + # TODO pick names for join, any on class/instance + # consider renaming to zip as in scala + alias_method :all, :join + alias_method :zip, :join + + def any(*futures) + AnyPromise.new(futures).future + end + + def post!(*args, &job) + post_on(:fast, *args, &job) + end + + def post(*args, &job) + post_on(:io, *args, &job) + end + + def post_on(executor, *args, &job) + Concurrent.executor(executor).post *args, &job + end + + # TODO add first(futures, count=count) + # TODO allow to to have a join point for many futures and process them in batches by 10 + end + + extend FutureShortcuts + include FutureShortcuts + + class Event < Synchronization::Object + extend FutureShortcuts + + # @api private + def initialize(promise, default_executor = :io) + super() + synchronize { ns_initialize(promise, default_executor) } + end + + # Is obligation completion still pending? + # @return [Boolean] + def pending? + synchronize { ns_pending? } + end + + alias_method :incomplete?, :pending? + + def completed? + synchronize { ns_completed? } + end + + # wait until Obligation is #complete? + # @param [Numeric] timeout the maximum time in second to wait. 
+ # @return [Obligation] self + def wait(timeout = nil) + touch + synchronize { ns_wait_until_complete(timeout) } + end + + def touch + pr_touch synchronize { ns_promise_to_touch } + end + + def state + synchronize { ns_state } + end + + def default_executor + synchronize { ns_default_executor } + end + + # @yield [success, value, reason] of the parent + def chain(executor = nil, &callback) + pr_chain(default_executor, executor, &callback) + end + + alias_method :then, :chain + + def delay + pr_delay(default_executor) + end + + def schedule(intended_time) + pr_schedule(default_executor, intended_time) + end + + # @yield [success, value, reason] executed async on `executor` when completed + # @return self + def on_completion(executor = nil, &callback) + synchronize { ns_on_completion(ns_default_executor, executor, &callback) } + end + + # @yield [success, value, reason] executed sync when completed + # @return self + def on_completion!(&callback) + synchronize { ns_on_completion!(&callback) } + end + + # @return [Array] + def blocks + pr_blocks(synchronize { @callbacks }) + end + + def to_s + synchronize { ns_to_s } + end + + def inspect + synchronize { "#{ns_to_s[0..-2]} blocks:[#{pr_blocks(@callbacks).map(&:to_s).join(', ')}]>" } + end + + # TODO take block optionally + def join(*futures) + pr_join(default_executor, *futures) + end + + alias_method :+, :join + alias_method :and, :join + + # @api private + def complete(raise = true) + callbacks = synchronize { ns_complete(raise) } + pr_call_callbacks callbacks + self + end + + # @api private + # just for inspection + def callbacks + synchronize { @callbacks }.clone.freeze + end + + # @api private + def add_callback(method, *args) + synchronize { ns_add_callback(method, *args) } + end + + # @api private, only for inspection + def promise + synchronize { ns_promise } + end + + def with_default_executor(executor = default_executor) + AllPromise.new([self], executor).future + end + + private + + def ns_initialize(promise, default_executor = :io) + @promise = promise + @state = :pending + @callbacks = [] + @default_executor = default_executor + @touched = false # TODO use atom to avoid locking + end + + def ns_wait_until_complete(timeout = nil) + ns_wait_until(timeout) { ns_completed? } + self + end + + def ns_state + @state + end + + def ns_pending? + ns_state == :pending + end + + alias_method :ns_incomplete?, :ns_pending? + + def ns_completed? 
+ ns_state == :completed + end + + def ns_promise_to_touch + unless @touched + @touched = true + ns_promise + end + end + + def pr_touch(promise) + promise.touch if promise + end + + def ns_promise + @promise + end + + def ns_default_executor + @default_executor + end + + def pr_chain(default_executor, executor = nil, &callback) + ChainPromise.new(self, default_executor, executor || default_executor, &callback).future + end + + def pr_delay(default_executor) + pr_join(default_executor, Delay.new(default_executor).future) + end + + def pr_schedule(default_executor, intended_time) + pr_chain(default_executor) { ScheduledPromise.new(intended_time).future.join(self) }.flat + end + + def pr_join(default_executor, *futures) + AllPromise.new([self, *futures], default_executor).future + end + + def ns_on_completion(default_executor, executor = nil, &callback) + ns_add_callback :pr_async_callback_on_completion, executor || default_executor, callback + end + + def ns_on_completion!(&callback) + ns_add_callback :pr_callback_on_completion, callback + end + + def pr_blocks(callbacks) + callbacks.each_with_object([]) do |callback, promises| + promises.push *callback.select { |v| v.is_a? AbstractPromise } + end + end + + def ns_to_s + "<##{self.class}:0x#{'%x' % (object_id << 1)} #{ns_state}>" + end + + def ns_complete(raise = true) + ns_check_multiple_assignment raise + ns_complete_state + ns_broadcast + callbacks, @callbacks = @callbacks, [] + callbacks + end + + def ns_add_callback(method, *args) + if ns_completed? + pr_call_callback method, *args + else + @callbacks << [method, *args] + end + self + end + + def ns_complete_state + @state = :completed + end + + def ns_check_multiple_assignment(raise, reason = nil) + if ns_completed? + if raise + raise reason || Concurrent::MultipleAssignmentError.new('multiple assignment') + else + return nil + end + end + end + + def pr_with_async(executor, &block) + Concurrent.post_on(executor, &block) + end + + def pr_async_callback_on_completion(executor, callback) + pr_with_async(executor) { pr_callback_on_completion callback } + end + + def pr_callback_on_completion(callback) + callback.call + end + + def pr_notify_blocked(promise) + promise.done self + end + + def pr_call_callback(method, *args) + # all methods has to be pure + self.send method, *args + end + + def pr_call_callbacks(callbacks) + callbacks.each { |method, *args| pr_call_callback method, *args } + end + end + + class Future < Event + + # Has the obligation been success? + # @return [Boolean] + def success? + synchronize { ns_success? } + end + + # Has the obligation been failed? + # @return [Boolean] + def failed? + state == :failed + end + + # @return [Object] see Dereferenceable#deref + def value(timeout = nil) + touch + synchronize { ns_value timeout } + end + + def reason(timeout = nil) + touch + synchronize { ns_reason timeout } + end + + def result(timeout = nil) + touch + synchronize { ns_result timeout } + end + + # wait until Obligation is #complete? + # @param [Numeric] timeout the maximum time in second to wait. + # @return [Obligation] self + # @raise [Exception] when #failed? it raises #reason + def wait!(timeout = nil) + touch + synchronize { ns_wait_until_complete! timeout } + end + + # @raise [Exception] when #failed? it raises #reason + # @return [Object] see Dereferenceable#deref + def value!(timeout = nil) + touch + synchronize { ns_value! 
timeout } + end + + # @example allows failed Future to be risen + # raise Concurrent.future.fail + def exception(*args) + touch + synchronize { ns_exception(*args) } + end + + # @yield [value] executed only on parent success + def then(executor = nil, &callback) + pr_then(default_executor, executor, &callback) + end + + # Creates new future where its value is result of asking actor with value of this Future. + def then_ask(actor) + self.then { |v| actor.ask(v) }.flat + end + + # @yield [reason] executed only on parent failure + def rescue(executor = nil, &callback) + pr_rescue(default_executor, executor, &callback) + end + + def flat(level = 1) + FlattingPromise.new(self, level, default_executor).future + end + + def or(*futures) + AnyPromise.new([self, *futures], default_executor).future + end + + alias_method :|, :or + + # @yield [value] executed async on `executor` when success + # @return self + def on_success(executor = nil, &callback) + synchronize { ns_on_success(ns_default_executor, executor, &callback) } + end + + # @yield [reason] executed async on `executor` when failed? + # @return self + def on_failure(executor = nil, &callback) + synchronize { ns_on_failure(ns_default_executor, executor, &callback) } + end + + # @yield [value] executed sync when success + # @return self + def on_success!(&callback) + synchronize { ns_on_success!(&callback) } + end + + # @yield [reason] executed sync when failed? + # @return self + def on_failure!(&callback) + synchronize { ns_on_failure!(&callback) } + end + + # @api private + def complete(success, value, reason, raise = true) + callbacks = synchronize { ns_complete(success, value, reason, raise) } + pr_call_callbacks callbacks, success, value, reason + self + end + + def ns_add_callback(method, *args) + if ns_completed? + pr_call_callback method, ns_completed?, ns_value, ns_reason, *args + else + @callbacks << [method, *args] + end + self + end + + private + + def ns_initialize(promise, default_executor = :io) + super(promise, default_executor) + @value = nil + @reason = nil + end + + def ns_success? + ns_state == :success + end + + def ns_failed? + ns_state == :failed + end + + def ns_completed? + [:success, :failed].include? ns_state + end + + def ns_value(timeout = nil) + ns_wait_until_complete timeout + @value + end + + def ns_reason(timeout = nil) + ns_wait_until_complete timeout + @reason + end + + def ns_result(timeout = nil) + value = ns_value(timeout) + [ns_success?, value, ns_reason] + end + + def ns_wait_until_complete!(timeout = nil) + ns_wait_until_complete(timeout) + raise self if ns_failed? + self + end + + def ns_value!(timeout = nil) + ns_wait_until_complete!(timeout) + @value + end + + def ns_exception(*args) + raise 'obligation is not failed' unless ns_failed? 
+ ns_reason.exception(*args) + end + + def pr_then(default_executor, executor = nil, &callback) + ThenPromise.new(self, default_executor, executor || default_executor, &callback).future + end + + def pr_rescue(default_executor, executor = nil, &callback) + RescuePromise.new(self, default_executor, executor || default_executor, &callback).future + end + + def ns_on_success(default_executor, executor = nil, &callback) + ns_add_callback :pr_async_callback_on_success, executor || default_executor, callback + end + + def ns_on_failure(default_executor, executor = nil, &callback) + ns_add_callback :pr_async_callback_on_failure, executor || default_executor, callback + end + + def ns_on_success!(&callback) + ns_add_callback :pr_callback_on_success, callback + end + + def ns_on_failure!(&callback) + ns_add_callback :pr_callback_on_failure, callback + end + + def ns_complete(success, value, reason, raise = true) + ns_check_multiple_assignment raise, reason + ns_complete_state(success, value, reason) + ns_broadcast + callbacks, @callbacks = @callbacks, [] + callbacks + end + + def ns_complete_state(success, value, reason) + if success + @value = value + @state = :success + else + @reason = reason + @state = :failed + end + end + + def pr_call_callbacks(callbacks, success, value, reason) + callbacks.each { |method, *args| pr_call_callback method, success, value, reason, *args } + end + + def pr_async_callback_on_success(success, value, reason, executor, callback) + pr_with_async(executor) { pr_callback_on_success success, value, reason, callback } + end + + def pr_async_callback_on_failure(success, value, reason, executor, callback) + pr_with_async(executor) { pr_callback_on_failure success, value, reason, callback } + end + + def pr_callback_on_success(success, value, reason, callback) + callback.call value if success + end + + def pr_callback_on_failure(success, value, reason, callback) + callback.call reason unless success + end + + def pr_callback_on_completion(success, value, reason, callback) + callback.call success, value, reason + end + + def pr_notify_blocked(success, value, reason, promise) + super(promise) + end + + def pr_async_callback_on_completion(success, value, reason, executor, callback) + pr_with_async(executor) { pr_callback_on_completion success, value, reason, callback } + end + end + + class CompletableEvent < Event + # Complete the event + # @api public + def complete(raise = true) + super raise + end + end + + class CompletableFuture < Future + # Complete the future + # @api public + def complete(success, value, reason, raise = true) + super success, value, reason, raise + end + + def success(value) + promise.success(value) + end + + def try_success(value) + promise.try_success(value) + end + + def fail(reason = StandardError.new) + promise.fail(reason) + end + + def try_fail(reason = StandardError.new) + promise.try_fail(reason) + end + + def evaluate_to(*args, &block) + promise.evaluate_to(*args, &block) + end + + def evaluate_to!(*args, &block) + promise.evaluate_to!(*args, &block) + end + end + + # TODO modularize blocked_by and notify blocked + + # @abstract + class AbstractPromise < Synchronization::Object + # @api private + def initialize(*args, &block) + super(&nil) + synchronize { ns_initialize(*args, &block) } + end + + def default_executor + future.default_executor + end + + def future + synchronize { ns_future } + end + + alias_method :event, :future + + def state + future.state + end + + def touch + end + + def to_s + "<##{self.class}:0x#{'%x' % (object_id << 1)} 
#{state}>" + end + + def inspect + to_s + end + + private + + def ns_initialize(future) + @future = future + end + + def ns_future + @future + end + + def complete(*args) + pr_complete(synchronize { ns_future }, *args) + end + + def pr_complete(future, *args) + future.complete(*args) + end + + def evaluate_to(*args, &block) + pr_evaluate_to(synchronize { ns_future }, *args, &block) + end + + # @return [Future] + def pr_evaluate_to(future, *args, &block) + pr_complete future, true, block.call(*args), nil + rescue => error + pr_complete future, false, nil, error + end + end + + class CompletableEventPromise < AbstractPromise + public :complete + + private + + def ns_initialize(default_executor = :io) + super CompletableEvent.new(self, default_executor) + end + end + + # @note Be careful not to fullfill the promise twice + # @example initialization + # Concurrent.promise + # @note TODO consider to allow being blocked_by + class CompletableFuturePromise < AbstractPromise + # Set the `Future` to a value and wake or notify all threads waiting on it. + # + # @param [Object] value the value to store in the `Future` + # @raise [Concurrent::MultipleAssignmentError] if the `Future` has already been set or otherwise completed + # @return [Future] + def success(value) + complete(true, value, nil) + end + + def try_success(value) + complete(true, value, nil, false) + end + + # Set the `Future` to failed due to some error and wake or notify all threads waiting on it. + # + # @param [Object] reason for the failure + # @raise [Concurrent::MultipleAssignmentError] if the `Future` has already been set or otherwise completed + # @return [Future] + def fail(reason = StandardError.new) + complete(false, nil, reason) + end + + def try_fail(reason = StandardError.new) + !!complete(false, nil, reason, false) + end + + public :complete + public :evaluate_to + + # @return [Future] + def evaluate_to!(*args, &block) + evaluate_to(*args, &block).wait! + end + + private + + def ns_initialize(default_executor = :io) + super CompletableFuture.new(self, default_executor) + end + end + + # @abstract + class InnerPromise < AbstractPromise + end + + # @abstract + class BlockedPromise < InnerPromise + def self.new(*args) + promise = super(*args) + promise.blocked_by.each { |f| f.add_callback :pr_notify_blocked, promise } + promise + end + + # @api private + def done(future) # FIXME pass in success/value/reason to avoid locking + # futures could be deleted from blocked_by one by one here, but that would be too expensive, + # it's done once when all are done to free the reference + completable, *args = synchronize do + completable = ns_done(future) + blocked_by, @blocked_by = @blocked_by, [] if completable + [completable, *ns_completable_args(future, blocked_by)] + end + pr_completable(*args) if completable + end + + def touch + synchronize { ns_blocked_by }.each(&:touch) + end + + # @api private + # for inspection only + def blocked_by + synchronize { ns_blocked_by } + end + + def inspect + "#{to_s[0..-2]} blocked_by:[#{ blocked_by.map(&:to_s).join(', ')}]>" + end + + private + + def ns_initialize(future, blocked_by_futures) + super future + @blocked_by = Array(blocked_by_futures) + @countdown = @blocked_by.size + end + + # @return [true,false] if completable + def ns_done(future) + (@countdown -= 1).zero? 
+ end + + def ns_completable_args(done_future, blocked_by) + [done_future, blocked_by, ns_future] + end + + def pr_completable(_, _, _) + raise NotImplementedError + end + + def ns_blocked_by + @blocked_by + end + end + + # @abstract + class BlockedTaskPromise < BlockedPromise + def executor + synchronize { ns_executor } + end + + private + + def ns_initialize(blocked_by_future, default_executor = :io, executor = default_executor, &task) + raise ArgumentError, 'no block given' unless block_given? + super Future.new(self, default_executor), [blocked_by_future] + @task = task + @executor = executor + end + + def ns_executor + @executor + end + + def ns_task + @task + end + + def task + synchronize { ns_task } + end + + def ns_completable_args(done_future, blocked_by) + [done_future, blocked_by, ns_future, ns_executor, ns_task] + end + + def pr_completable(_, _, _, _, _) + raise NotImplementedError + end + end + + class ThenPromise < BlockedTaskPromise + private + + def ns_initialize(blocked_by_future, default_executor = :io, executor = default_executor, &task) + blocked_by_future.is_a? Future or + raise ArgumentError, 'only Future can be appended with then' + super(blocked_by_future, default_executor, executor, &task) + end + + def pr_completable(done_future, _, future, executor, task) + if done_future.success? + Concurrent.post_on(executor, done_future, task) { |done_future, task| evaluate_to done_future.value, &task } + else + pr_complete future, false, nil, done_future.reason + end + end + end + + class RescuePromise < BlockedTaskPromise + private + + def ns_initialize(blocked_by_future, default_executor = :io, executor = default_executor, &task) + blocked_by_future.is_a? Future or + raise ArgumentError, 'only Future can be rescued' + super(blocked_by_future, default_executor, executor, &task) + end + + def pr_completable(done_future, _, future, executor, task) + if done_future.failed? + Concurrent.post_on(executor, done_future, task) { |done_future, task| evaluate_to done_future.reason, &task } + else + pr_complete future, true, done_future.value, nil + end + end + end + + class ChainPromise < BlockedTaskPromise + private + + def pr_completable(done_future, _, _, executor, task) + if Future === done_future + Concurrent.post_on(executor, done_future, task) { |future, task| evaluate_to *future.result, &task } + else + Concurrent.post_on(executor, task) { |task| evaluate_to &task } + end + end + end + + # will be immediately completed + class ImmediatePromise < InnerPromise + def self.new(*args) + promise = super(*args) + Concurrent.post_on(:fast, promise) { |promise| promise.future.complete } + promise + end + + private + + def ns_initialize(default_executor = :io) + super Event.new(self, default_executor) + end + end + + class FlattingPromise < BlockedPromise + private + + def ns_done(future) + value = future.value # TODO get the value as argument + if @levels > 0 + case value + when Future + @countdown += 1 + @blocked_by << value + @levels -= 1 + value.add_callback :pr_notify_blocked, self + when Event + raise TypeError, 'cannot flatten to Event' + else + raise TypeError, "returned value '#{value}' is not a Future" + end + end + super future + end + + def ns_initialize(blocked_by_future, levels = 1, default_executor = :io) + blocked_by_future.is_a? 
Future or + raise ArgumentError, 'only Future can be flattened' + super(Future.new(self, default_executor), [blocked_by_future]) + @levels = levels + end + + def pr_completable(_, blocked_by, future) + pr_complete future, *blocked_by.last.result + end + end + + # used internally to support #with_default_executor + class AllPromise < BlockedPromise + private + + def ns_initialize(blocked_by_futures, default_executor = :io) + klass = blocked_by_futures.any? { |f| f.is_a?(Future) } ? Future : Event + # noinspection RubyArgCount + super(klass.new(self, default_executor), blocked_by_futures) + end + + def pr_completable(done_future, blocked_by, future) + results = blocked_by.select { |f| f.is_a?(Future) }.map(&:result) + if results.empty? + pr_complete future + else + if results.all? { |success, _, _| success } + params = results.map { |_, value, _| value } + pr_complete(future, true, params.size == 1 ? params.first : params, nil) + else + # TODO what about other reasons? + pr_complete future, false, nil, results.find { |success, _, _| !success }.last + end + end + end + end + + class AnyPromise < BlockedPromise + + private + + def ns_initialize(blocked_by_futures, default_executor = :io) + blocked_by_futures.all? { |f| f.is_a? Future } or + raise ArgumentError, 'accepts only Futures not Events' + super(Future.new(self, default_executor), blocked_by_futures) + end + + def ns_done(future) + true + end + + def pr_completable(done_future, _, future) + pr_complete future, *done_future.result, false + end + end + + class Delay < InnerPromise + def touch + pr_complete synchronize { ns_future } + end + + private + + def ns_initialize(default_executor = :io) + super Event.new(self, default_executor) + end + end + + # will be evaluated to task at intended_time + class ScheduledPromise < InnerPromise + def intended_time + synchronize { ns_intended_time } + end + + def inspect + "#{to_s[0..-2]} intended_time:[#{intended_time}]>" + end + + private + + def ns_initialize(intended_time, default_executor = :io) + super Event.new(self, default_executor) + in_seconds = begin + @intended_time = intended_time + now = Time.now + schedule_time = if intended_time.is_a? Time + intended_time + else + now + intended_time + end + [0, schedule_time.to_f - now.to_f].max + end + + Concurrent.global_timer_set.post(in_seconds) { complete } + end + + def ns_intended_time + @intended_time + end + end + end + + extend Edge::FutureShortcuts + include Edge::FutureShortcuts +end diff --git a/lib/concurrent/executor/executor.rb b/lib/concurrent/executor/executor.rb index d97b5379c..081784ffc 100644 --- a/lib/concurrent/executor/executor.rb +++ b/lib/concurrent/executor/executor.rb @@ -6,6 +6,65 @@ module Concurrent module Executor + include Logging + + # Get the requested `Executor` based on the values set in the options hash. + # + # @param [Hash] opts the options defining the requested executor + # @option opts [Executor] :executor when set use the given `Executor` instance. + # Three special values are also supported: `:fast` returns the global fast executor, + # `:io` returns the global io executor, and `:immediate` returns a new + # `ImmediateExecutor` object. + # + # @return [Executor, nil] the requested thread pool, or nil when no option specified + # + # @!visibility private + def self.executor_from_options(opts = {}) # :nodoc: + case + when opts.key?(:executor) + if opts[:executor].nil?
+ nil + else + executor(opts[:executor]) + end + when opts.key?(:operation) || opts.key?(:task) + if opts[:operation] == true || opts[:task] == false + Kernel.warn '[DEPRECATED] use `executor: :fast` instead' + return Concurrent.global_fast_executor + end + + if opts[:operation] == false || opts[:task] == true + Kernel.warn '[DEPRECATED] use `executor: :io` instead' + return Concurrent.global_io_executor + end + + raise ArgumentError.new("executor '#{opts[:executor]}' not recognized") + else + nil + end + end + + def self.executor(executor_identifier) + case executor_identifier + when :fast + Concurrent.global_fast_executor + when :io + Concurrent.global_io_executor + when :immediate + Concurrent.global_immediate_executor + when :operation + Kernel.warn '[DEPRECATED] use `executor: :fast` instead' + Concurrent.global_fast_executor + when :task + Kernel.warn '[DEPRECATED] use `executor: :io` instead' + Concurrent.global_io_executor + when Executor + executor_identifier + else + raise ArgumentError, "executor not recognized by '#{executor_identifier}'" + end + end + # The policy defining how rejected tasks (tasks received once the # queue size reaches the configured `max_queue`, or after the # executor has shut down) are handled. Must be one of the values @@ -363,4 +422,6 @@ def synchronize end end end + + end diff --git a/lib/concurrent/executor/executor_options.rb b/lib/concurrent/executor/executor_options.rb deleted file mode 100644 index dfe3f95fc..000000000 --- a/lib/concurrent/executor/executor_options.rb +++ /dev/null @@ -1,62 +0,0 @@ -require 'concurrent/configuration' -require 'concurrent/executor/immediate_executor' - -module Concurrent - - # A mixin module for parsing options hashes related to gem-level configuration. - # @!visibility private - module ExecutorOptions # :nodoc: - - # Get the requested `Executor` based on the values set in the options hash. - # - # @param [Hash] opts the options defining the requested executor - # @option opts [Executor] :executor when set use the given `Executor` instance. - # Three special values are also supported: `:fast` returns the global fast executor, - # `:io` returns the global io executor, and `:immediate` returns a new - # `ImmediateExecutor` object. 
- # - # @return [Executor, nil] the requested thread pool, or nil when no option specified - # - # @!visibility private - def get_executor_from(opts = {}) # :nodoc: - case - when opts.key?(:executor) - case opts[:executor] - when :fast - Concurrent.global_fast_executor - when :io - Concurrent.global_io_executor - when :immediate - Concurrent.global_immediate_executor - when :operation - Kernel.warn '[DEPRECATED] use `executor: :fast` instead' - Concurrent.global_fast_executor - when :task - Kernel.warn '[DEPRECATED] use `executor: :io` instead' - Concurrent.global_io_executor - when Executor - opts[:executor] - when nil - nil - else - raise ArgumentError.new("executor '#{opts[:executor]}' not recognized") - end - - when opts.key?(:operation) || opts.key?(:task) - if opts[:operation] == true || opts[:task] == false - Kernel.warn '[DEPRECATED] use `executor: :fast` instead' - return Concurrent.global_fast_executor - end - - if opts[:operation] == false || opts[:task] == true - Kernel.warn '[DEPRECATED] use `executor: :io` instead' - return Concurrent.global_io_executor - end - - raise ArgumentError.new("executor '#{opts[:executor]}' not recognized") - else - nil - end - end - end -end diff --git a/lib/concurrent/executor/ruby_thread_pool_executor.rb b/lib/concurrent/executor/ruby_thread_pool_executor.rb index e93ccd005..053054dd7 100644 --- a/lib/concurrent/executor/ruby_thread_pool_executor.rb +++ b/lib/concurrent/executor/ruby_thread_pool_executor.rb @@ -187,13 +187,11 @@ def ns_shutdown_execution # @api private def ns_kill_execution - ns_shutdown_execution - unless stopped_event.wait(1) - @pool.each &:kill - @pool.clear - @ready.clear - # TODO log out unprocessed tasks in queue - end + # TODO log out unprocessed tasks in queue + # TODO try to shutdown first? + @pool.each &:kill + @pool.clear + @ready.clear end alias_method :kill_execution, :ns_kill_execution diff --git a/lib/concurrent/executor/timer_set.rb b/lib/concurrent/executor/timer_set.rb index a3d042fea..fa443c257 100644 --- a/lib/concurrent/executor/timer_set.rb +++ b/lib/concurrent/executor/timer_set.rb @@ -4,7 +4,6 @@ require 'concurrent/executor/executor' require 'concurrent/executor/single_thread_executor' require 'concurrent/utility/monotonic_time' -require 'concurrent/executor/executor_options' module Concurrent @@ -15,7 +14,6 @@ module Concurrent # @!macro monotonic_clock_warning class TimerSet include RubyExecutor - include ExecutorOptions # Create a new set of timed tasks. # @@ -28,7 +26,7 @@ class TimerSet # `ImmediateExecutor` object. 
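
The `ns_kill_execution` rewrite above makes `#kill` on the Ruby thread pool stop its workers immediately and drop anything still queued, instead of first attempting a one-second orderly shutdown. A rough sketch of the observable difference (pool options and timings are illustrative only):

```ruby
require 'concurrent'

pool = Concurrent::ThreadPoolExecutor.new(min_threads: 1, max_threads: 1, max_queue: 10)
10.times { |i| pool.post { sleep 0.1; puts "task #{i}" } }

# #shutdown lets the queued tasks drain before the pool stops;
# #kill now kills the worker threads right away and the queued tasks are
# silently discarded (the TODOs note they are not even logged yet).
pool.kill
pool.wait_for_termination(1)
```
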
def initialize(opts = {}) @queue = PriorityQueue.new(order: :min) - @task_executor = get_executor_from(opts) || Concurrent.global_io_executor + @task_executor = Executor.executor_from_options(opts) || Concurrent.global_io_executor @timer_executor = SingleThreadExecutor.new @condition = Condition.new init_executor diff --git a/lib/concurrent/file_map.rb b/lib/concurrent/file_map.rb new file mode 100644 index 000000000..32c1487d7 --- /dev/null +++ b/lib/concurrent/file_map.rb @@ -0,0 +1,17 @@ +module Concurrent + + git_files = `git ls-files`.split("\n") + all_lib_files = Dir['lib/concurrent/**/*.rb'] & git_files + edge_lib_files = Dir['lib/concurrent/actor.rb', + 'lib/concurrent/actor/**/*.rb', + 'lib/concurrent/channel.rb', + 'lib/concurrent/channel/**/*.rb', + 'lib/concurrent/edge/**/*.rb'] & git_files + core_lib_files = all_lib_files - edge_lib_files + + FILE_MAP = { + core: core_lib_files + %w(lib/concurrent.rb lib/concurrent_ruby.rb), + edge: edge_lib_files + %w(lib/concurrent-edge.rb) + } +end + diff --git a/lib/concurrent/future.rb b/lib/concurrent/future.rb index be78e8791..02afa86c7 100644 --- a/lib/concurrent/future.rb +++ b/lib/concurrent/future.rb @@ -2,7 +2,6 @@ require 'concurrent/ivar' require 'concurrent/executor/safe_task_executor' -require 'concurrent/executor/executor_options' module Concurrent @@ -12,7 +11,6 @@ module Concurrent # @see http://clojuredocs.org/clojure_core/clojure.core/future Clojure's future function # @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Future.html java.util.concurrent.Future class Future < IVar - include ExecutorOptions # Create a new `Future` in the `:unscheduled` state. # @@ -29,7 +27,7 @@ def initialize(opts = {}, &block) super(IVar::NO_VALUE, opts) @state = :unscheduled @task = block - @executor = get_executor_from(opts) || Concurrent.global_io_executor + @executor = Executor.executor_from_options(opts) || Concurrent.global_io_executor @args = get_arguments_from(opts) end diff --git a/lib/concurrent/promise.rb b/lib/concurrent/promise.rb index 0831c39cc..fdae3ed12 100644 --- a/lib/concurrent/promise.rb +++ b/lib/concurrent/promise.rb @@ -2,7 +2,6 @@ require 'concurrent/ivar' require 'concurrent/obligation' -require 'concurrent/executor/executor_options' module Concurrent @@ -183,7 +182,6 @@ module Concurrent # - `rescue { |reason| ... }` is the same as `then(Proc.new { |reason| ... } )` # - `rescue` is aliased by `catch` and `on_error` class Promise < IVar - include ExecutorOptions # Initialize a new Promise with the provided options. # @@ -207,7 +205,7 @@ def initialize(opts = {}, &block) opts.delete_if { |k, v| v.nil? 
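
The new `lib/concurrent/file_map.rb` above partitions the tracked `lib` files between the core and edge gems. The gemspec files themselves are not part of this hunk, so the consumption sketch below is an assumption about how `FILE_MAP` is intended to be used:

```ruby
# concurrent-ruby.gemspec -- hypothetical usage of Concurrent::FILE_MAP
require File.expand_path('../lib/concurrent/version', __FILE__)
require File.expand_path('../lib/concurrent/file_map', __FILE__)

Gem::Specification.new do |s|
  s.name    = 'concurrent-ruby'
  s.version = Concurrent::VERSION
  s.files   = Concurrent::FILE_MAP[:core]
end

# concurrent-ruby-edge.gemspec would use Concurrent::FILE_MAP[:edge] and
# Concurrent::EDGE_VERSION in the same way.
```

Because `FILE_MAP` shells out to `git ls-files`, it is only meaningful inside a git checkout, i.e. at gem build time.
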
} super(IVar::NO_VALUE, opts) - @executor = get_executor_from(opts) || Concurrent.global_io_executor + @executor = Executor.executor_from_options(opts) || Concurrent.global_io_executor @args = get_arguments_from(opts) @parent = opts.fetch(:parent) { nil } diff --git a/lib/concurrent/scheduled_task.rb b/lib/concurrent/scheduled_task.rb index d70662885..d6798f637 100644 --- a/lib/concurrent/scheduled_task.rb +++ b/lib/concurrent/scheduled_task.rb @@ -1,7 +1,6 @@ require 'concurrent/ivar' require 'concurrent/utility/timer' require 'concurrent/executor/safe_task_executor' -require 'concurrent/executor/executor_options' module Concurrent @@ -134,7 +133,6 @@ module Concurrent # # @!macro monotonic_clock_warning class ScheduledTask < IVar - include ExecutorOptions attr_reader :delay @@ -166,7 +164,7 @@ def initialize(delay, opts = {}, &block) self.observers = CopyOnNotifyObserverSet.new @state = :unscheduled @task = block - @executor = get_executor_from(opts) || Concurrent.global_io_executor + @executor = Executor.executor_from_options(opts) || Concurrent.global_io_executor end # Execute an `:unscheduled` `ScheduledTask`. Immediately sets the state to `:pending` diff --git a/lib/concurrent/version.rb b/lib/concurrent/version.rb index 6124e5fe0..182ea0f24 100644 --- a/lib/concurrent/version.rb +++ b/lib/concurrent/version.rb @@ -1,3 +1,4 @@ module Concurrent - VERSION = '0.8.0' + VERSION = '0.8.0' + EDGE_VERSION = '0.1.0' end diff --git a/lib/concurrent_ruby.rb b/lib/concurrent_ruby.rb index 2e9e4f3ab..632aa33a3 100644 --- a/lib/concurrent_ruby.rb +++ b/lib/concurrent_ruby.rb @@ -1 +1,2 @@ +warn "'[DEPRECATED] use `require 'concurrent'` instead of `require 'concurrent_ruby'`" require 'concurrent' diff --git a/spec/concurrent/actor_spec.rb b/spec/concurrent/actor_spec.rb index c1557c3ea..b63a472f4 100644 --- a/spec/concurrent/actor_spec.rb +++ b/spec/concurrent/actor_spec.rb @@ -89,7 +89,7 @@ def on_message(message) it 'terminates on failed initialization' do a = AdHoc.spawn(name: :fail, logger: Concurrent.configuration.no_logger) { raise } - expect(a.ask(nil).wait.rejected?).to be_truthy + expect(a.ask(nil).wait.failed?).to be_truthy expect(a.ask!(:terminated?)).to be_truthy end @@ -101,7 +101,7 @@ def on_message(message) it 'terminates on failed message processing' do a = AdHoc.spawn(name: :fail, logger: Concurrent.configuration.no_logger) { -> _ { raise } } - expect(a.ask(nil).wait.rejected?).to be_truthy + expect(a.ask(nil).wait.failed?).to be_truthy expect(a.ask!(:terminated?)).to be_truthy end end @@ -144,8 +144,8 @@ def on_message(message) envelope = subject.ask!('a') expect(envelope).to be_a_kind_of Envelope expect(envelope.message).to eq 'a' - expect(envelope.ivar).to be_complete - expect(envelope.ivar.value).to eq envelope + expect(envelope.future).to be_completed + expect(envelope.future.value).to eq envelope expect(envelope.sender).to eq Thread.current terminate_actors subject end diff --git a/spec/concurrent/atomic/condition_spec.rb b/spec/concurrent/atomic/condition_spec.rb index ed72c0a05..d00258637 100644 --- a/spec/concurrent/atomic/condition_spec.rb +++ b/spec/concurrent/atomic/condition_spec.rb @@ -29,7 +29,7 @@ module Concurrent it 'should block the thread' do latch_1 = Concurrent::CountDownLatch.new latch_2 = Concurrent::CountDownLatch.new - mutex = Mutex.new + mutex = Mutex.new t = Thread.new do mutex.synchronize do @@ -46,8 +46,8 @@ module Concurrent end it 'should return a woken up result when is woken up by #signal' do - result = nil - mutex = Mutex.new + result = nil + 
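
The actor spec updates above follow the switch of `Actor` onto the edge futures: `Envelope#ivar` becomes `Envelope#future`, completion is queried with `completed?`, and a failed ask reports through `failed?` rather than `rejected?`. Roughly, using the same `AdHoc` utility actor as the spec:

```ruby
require 'concurrent'
require 'concurrent-edge'

echo = Concurrent::Actor::Utils::AdHoc.spawn(:echo) { -> message { message } }

reply = echo.ask(:hello)     # an edge Future now, not an IVar
reply.wait
reply.completed?             # => true
reply.value                  # => :hello

boom = Concurrent::Actor::Utils::AdHoc.spawn(:boom) { -> _ { raise 'boom' } }
boom.ask(nil).wait.failed?   # => true, and the actor terminates itself
```
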
mutex = Mutex.new latch_1 = Concurrent::CountDownLatch.new latch_2 = Concurrent::CountDownLatch.new @@ -74,8 +74,8 @@ module Concurrent end it 'should return a woken up result when is woken up by #broadcast' do - result = nil - mutex = Mutex.new + result = nil + mutex = Mutex.new latch_1 = Concurrent::CountDownLatch.new latch_2 = Concurrent::CountDownLatch.new @@ -110,7 +110,7 @@ module Concurrent it 'should block the thread' do latch_1 = Concurrent::CountDownLatch.new latch_2 = Concurrent::CountDownLatch.new - mutex = Mutex.new + mutex = Mutex.new t = Thread.new do mutex.synchronize do @@ -127,8 +127,8 @@ module Concurrent end it 'should return remaining time when is woken up by #signal' do - result = nil - mutex = Mutex.new + result = nil + mutex = Mutex.new latch_1 = Concurrent::CountDownLatch.new latch_2 = Concurrent::CountDownLatch.new @@ -156,8 +156,8 @@ module Concurrent end it 'should return remaining time when is woken up by #broadcast' do - result = nil - mutex = Mutex.new + result = nil + mutex = Mutex.new latch_1 = Concurrent::CountDownLatch.new latch_2 = Concurrent::CountDownLatch.new @@ -186,8 +186,8 @@ module Concurrent it 'should return 0 or negative number if timed out' do result = nil - mutex = Mutex.new - latch = Concurrent::CountDownLatch.new + mutex = Mutex.new + latch = Concurrent::CountDownLatch.new t = Thread.new do mutex.synchronize do @@ -214,7 +214,7 @@ module Concurrent describe '#wait' do it 'should block threads' do - mutex = Mutex.new + mutex = Mutex.new latch_1 = Concurrent::CountDownLatch.new(2) latch_2 = Concurrent::CountDownLatch.new(2) @@ -240,7 +240,7 @@ module Concurrent it 'wakes up only one thread' do latch_1 = Concurrent::CountDownLatch.new(2) latch_2 = Concurrent::CountDownLatch.new(2) - mutex = Mutex.new + mutex = Mutex.new t1 = Thread.new do mutex.synchronize do @@ -270,18 +270,22 @@ module Concurrent describe '#broadcast' do it 'wakes up all threads' do - latch = CountDownLatch.new(2) - mutex = Mutex.new - - t1 = Thread.new { mutex.synchronize { subject.wait(mutex); latch.count_down } } - t2 = Thread.new { mutex.synchronize { subject.wait(mutex); latch.count_down } } - - sleep(0.1) + mutex = Mutex.new + go = CountDownLatch.new(2) + threads = Array.new(2) do + Thread.new do + mutex.synchronize do + go.count_down + subject.wait(mutex) + end + end + end + go.wait mutex.synchronize { subject.broadcast } - sleep(0.2) - expect(latch.count).to eq 0 - [t1, t2].each { |t| t.kill } + threads.each do |t| + expect(t.join(0.1)).to eq t + end end end end diff --git a/spec/concurrent/atomic/count_down_latch_spec.rb b/spec/concurrent/atomic/count_down_latch_spec.rb index 71b8d12c2..0f3c4ef62 100644 --- a/spec/concurrent/atomic/count_down_latch_spec.rb +++ b/spec/concurrent/atomic/count_down_latch_spec.rb @@ -99,8 +99,8 @@ module Concurrent before(:each) do def subject.simulate_spurious_wake_up synchronize do - signal - broadcast + ns_signal + ns_broadcast end end end diff --git a/spec/concurrent/atomic/semaphore_spec.rb b/spec/concurrent/atomic/semaphore_spec.rb index 8330df0cb..de1a98357 100644 --- a/spec/concurrent/atomic/semaphore_spec.rb +++ b/spec/concurrent/atomic/semaphore_spec.rb @@ -20,7 +20,7 @@ context 'not enough permits available' do it 'should block thread until permits are available' do semaphore.drain_permits - Thread.new { sleep(0.2) && semaphore.release } + Thread.new { sleep(0.2); semaphore.release } result = semaphore.acquire expect(result).to be_nil @@ -64,7 +64,7 @@ it 'acquires when permits are available within timeout' do 
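
The semaphore spec change here (and the matching one in the next hunk) swaps `&&` for `;` between `sleep` and `release`. The parenthesis-free form was parsed as a single argument to `sleep`, so the permit was released immediately rather than after the delay; the parenthesised form only worked because `sleep(0.2)` happens to return a truthy `0`. A quick illustration of the precedence:

```ruby
# `&&` binds tighter than a parenthesis-free method argument:
puts 1 && 2                        # prints 2, i.e. puts(1 && 2)

# So the old spec line
#   Thread.new { sleep 0.1 && semaphore.release }
# meant sleep(0.1 && semaphore.release): #release runs first and sleep then
# receives its return value. Sequencing with `;` states the intent directly:
#   Thread.new { sleep 0.1; semaphore.release }
```
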
semaphore.drain_permits - Thread.new { sleep 0.1 && semaphore.release } + Thread.new { sleep 0.1; semaphore.release } result = semaphore.try_acquire(1, 1) expect(result).to be_truthy end @@ -112,15 +112,16 @@ def subject.simulate_spurious_wake_up @condition.broadcast end end + subject.drain_permits end it 'should resist to spurious wake ups without timeout' do actual = Concurrent::AtomicBoolean.new(true) - latch = Concurrent::CountDownLatch.new + latch = Concurrent::CountDownLatch.new # would set actual to false - t = Thread.new { latch.wait(1); actual.value = subject.acquire } + t = Thread.new { latch.wait(1); actual.value = subject.acquire } latch.count_down subject.simulate_spurious_wake_up @@ -132,10 +133,10 @@ def subject.simulate_spurious_wake_up it 'should resist to spurious wake ups with timeout' do actual = Concurrent::AtomicBoolean.new(true) - latch = Concurrent::CountDownLatch.new + latch = Concurrent::CountDownLatch.new # sets actual to false in another thread - t = Thread.new { latch.wait(1); actual.value = subject.try_acquire(1, 0.3) } + t = Thread.new { latch.wait(1); actual.value = subject.try_acquire(1, 0.3) } latch.count_down subject.simulate_spurious_wake_up diff --git a/spec/concurrent/collection/blocking_ring_buffer_spec.rb b/spec/concurrent/channel/blocking_ring_buffer_spec.rb similarity index 100% rename from spec/concurrent/collection/blocking_ring_buffer_spec.rb rename to spec/concurrent/channel/blocking_ring_buffer_spec.rb diff --git a/spec/concurrent/collection/ring_buffer_spec.rb b/spec/concurrent/channel/ring_buffer_spec.rb similarity index 100% rename from spec/concurrent/collection/ring_buffer_spec.rb rename to spec/concurrent/channel/ring_buffer_spec.rb diff --git a/spec/concurrent/edge/future_spec.rb b/spec/concurrent/edge/future_spec.rb new file mode 100644 index 000000000..cc9fde87c --- /dev/null +++ b/spec/concurrent/edge/future_spec.rb @@ -0,0 +1,236 @@ +require 'concurrent' +require 'thread' + +logger = Logger.new($stderr) +logger.level = Logger::DEBUG +Concurrent.configuration.logger = lambda do |level, progname, message = nil, &block| + logger.add level, message, progname, &block +end + +describe 'Concurrent::Edge futures' do + + describe '.post' do + it 'executes tasks asynchronously' do + queue = Queue.new + value = 12 + Concurrent.post { queue.push(value) } + Concurrent.post(:io) { queue.push(value) } + expect(queue.pop).to eq value + expect(queue.pop).to eq value + end + end + + describe '.future' do + it 'executes' do + future = Concurrent.future(:immediate) { 1 + 1 } + expect(future.value).to eq 2 + end + end + + describe '.delay' do + it 'delays execution' do + delay = Concurrent.delay { 1 + 1 } + expect(delay.completed?).to eq false + expect(delay.value).to eq 2 + end + end + + describe '.schedule' do + it 'scheduled execution' do + start = Time.now.to_f + queue = Queue.new + future = Concurrent.schedule(0.1) { 1 + 1 }.then { |v| queue.push(v); queue.push(Time.now.to_f - start); queue } + + expect(future.value).to eq queue + expect(queue.pop).to eq 2 + expect(queue.pop).to be_between(0.1, 0.2) + end + + it 'scheduled execution in graph' do + start = Time.now.to_f + queue = Queue.new + future = Concurrent. + future { sleep 0.1; 1 }. + schedule(0.1). + then { |v| v + 1 }. + then { |v| queue.push(v); queue.push(Time.now.to_f - start); queue } + + future.wait! 
+ expect(future.value).to eq queue + expect(queue.pop).to eq 2 + expect(queue.pop).to be_between(0.2, 0.3) + end + end + + describe '.event' do + specify do + completable_event = Concurrent.event + one = completable_event.chain { 1 } + join = Concurrent.join(completable_event).chain { 1 } + expect(one.completed?).to be false + completable_event.complete + expect(one.value).to eq 1 + expect(join.wait.completed?).to be true + end + end + + describe '.future without block' do + specify do + completable_future = Concurrent.future + one = completable_future.then(&:succ) + join = Concurrent.join(completable_future).then { |v| v } + expect(one.completed?).to be false + completable_future.success 0 + expect(one.value).to eq 1 + expect(join.wait!.completed?).to be true + expect(join.value!).to eq 0 + end + end + + describe '.any' do + it 'continues on first result' do + queue = Queue.new + f1 = Concurrent.future(:io) { queue.pop } + f2 = Concurrent.future(:io) { queue.pop } + + queue.push(1) + queue.push(2) + + anys = [Concurrent.any(f1, f2), + f1 | f2, + f1.or(f2)] + + anys.each do |any| + expect(any.value.to_s).to match /1|2/ + end + + end + end + + describe 'Future' do + it 'has sync and async callbacks' do + queue = Queue.new + future = Concurrent.future { :value } # executed on FAST_EXECUTOR pool by default + future.on_completion(:io) { queue.push(:async) } # async callback overridden to execute on IO_EXECUTOR pool + future.on_completion! { queue.push(:sync) } # sync callback executed right after completion in the same thread-pool + + expect(future.value).to eq :value + expect([queue.pop, queue.pop].sort).to eq [:async, :sync] + end + + it 'chains' do + future0 = Concurrent.future { 1 }.then { |v| v + 2 } # both executed on default FAST_EXECUTOR + future1 = future0.then(:fast) { raise 'boo' } # executed on IO_EXECUTOR + future2 = future1.then { |v| v + 1 } # will fail with 'boo' error, executed on default FAST_EXECUTOR + future3 = future1.rescue { |err| err.message } # executed on default FAST_EXECUTOR + future4 = future0.chain { |success, value, reason| success } # executed on default FAST_EXECUTOR + future5 = future3.with_default_executor(:fast) # connects new future with different executor, the new future is completed when future3 is + future6 = future5.then(&:capitalize) # executes on IO_EXECUTOR because default was set to :io on future5 + future7 = Concurrent.join(future0, future3) + future8 = future0.rescue { raise 'never happens' } # future0 succeeds so future8'll have same value as future 0 + + futures = [future0, future1, future2, future3, future4, future5, future6, future7, future8] + futures.each &:wait + + table = futures.each_with_index.map do |f, i| + '%5i %7s %10s %6s %4s %6s' % [i, f.success?, f.value, f.reason, + (f.promise.executor if f.promise.respond_to?(:executor)), + f.default_executor] + end.unshift('index success value reason pool d.pool') + + expect(table.join("\n")).to eq <<-TABLE.gsub(/^\s+\|/, '').strip + |index success value reason pool d.pool + | 0 true 3 io io + | 1 false boo fast io + | 2 false boo io io + | 3 true boo io io + | 4 true true io io + | 5 true boo fast + | 6 true Boo fast fast + | 7 true [3, "boo"] io + | 8 true 3 io io + TABLE + end + + it 'constructs promise like tree' do + # if head of the tree is not constructed with #future but with #delay it does not start execute, + # it's triggered later by calling wait or value on any of the dependent futures or the delay itself + three = (head = Concurrent.delay { 1 }).then { |v| v.succ }.then(&:succ) + 
four = three.delay.then(&:succ) + + # meaningful to_s and inspect defined for Future and Promise + expect(head.to_s).to match /<#Concurrent::Edge::Future:0x[\da-f]+ pending>/ + expect(head.inspect).to( + match(/<#Concurrent::Edge::Future:0x[\da-f]+ pending blocks:\[<#Concurrent::Edge::ThenPromise:0x[\da-f]+ pending>\]>/)) + + # evaluates only up to three, four is left unevaluated + expect(three.value).to eq 3 + expect(four).not_to be_completed + + expect(four.value).to eq 4 + + # futures hidden behind two delays trigger evaluation of both + double_delay = Concurrent.delay { 1 }.delay.then(&:succ) + expect(double_delay.value).to eq 2 + end + + it 'allows graphs' do + head = Concurrent.future { 1 } + branch1 = head.then(&:succ) + branch2 = head.then(&:succ).delay.then(&:succ) + results = [ + Concurrent.join(branch1, branch2).then { |b1, b2| b1 + b2 }, + branch1.join(branch2).then { |b1, b2| b1 + b2 }, + (branch1 + branch2).then { |b1, b2| b1 + b2 }] + + sleep 0.1 + expect(branch1).to be_completed + expect(branch2).not_to be_completed + + expect(results.map(&:value)).to eq [5, 5, 5] + end + + it 'has flat map' do + f = Concurrent.future { Concurrent.future { 1 } }.flat.then(&:succ) + expect(f.value!).to eq 2 + end + end + + it 'interoperability' do + actor = Concurrent::Actor::Utils::AdHoc.spawn :doubler do + -> v { v * 2 } + end + + expect(Concurrent. + future { 2 }. + then_ask(actor). + then { |v| v + 2 }. + value).to eq 6 + end + +end + +__END__ + +puts '-- connecting existing promises' + +source = Concurrent.delay { 1 } +promise = Concurrent.promise +promise.connect_to source +p promise.future.value # 1 +# or just +p Concurrent.promise.connect_to(source).value + + +puts '-- using shortcuts' + +include Concurrent # includes Future::Shortcuts + +# now methods on Concurrent are accessible directly + +p delay { 1 }.value, future { 1 }.value # => 1\n1 + +promise = promise() +promise.connect_to(future { 3 }) +p promise.future.value # 3 + diff --git a/spec/concurrent/executor/executor_options_spec.rb b/spec/concurrent/executor/executor_options_spec.rb index b81853c2b..2d02584e8 100644 --- a/spec/concurrent/executor/executor_options_spec.rb +++ b/spec/concurrent/executor/executor_options_spec.rb @@ -1,99 +1,93 @@ module Concurrent - describe ExecutorOptions do - - let(:executor){ ImmediateExecutor.new } - - let(:io_executor){ ImmediateExecutor.new } - let(:fast_executor){ ImmediateExecutor.new } - - subject { Class.new{ include ExecutorOptions }.new } - - context '#get_executor_from' do - - it 'returns the given :executor' do - expect(subject.get_executor_from(executor: executor)).to eq executor - end - - it 'returns the global io executor when :executor is :io' do - expect(Concurrent).to receive(:global_io_executor).and_return(:io_executor) - subject.get_executor_from(executor: :io) - end - - it 'returns the global fast executor when :executor is :fast' do - expect(Concurrent).to receive(:global_fast_executor).and_return(:fast_executor) - subject.get_executor_from(executor: :fast) - end - - it 'returns an immediate executor when :executor is :immediate' do - executor = subject.get_executor_from(executor: :immediate) - end - - it 'raises an exception when :executor is an unrecognized symbol' do - expect { - subject.get_executor_from(executor: :bogus) - }.to raise_error(ArgumentError) - end - - it 'returns the global fast executor when :operation is true' do - warn 'deprecated syntax' - expect(Kernel).to receive(:warn).with(anything) - expect(Concurrent).to receive(:global_fast_executor). 
- and_return(:fast_executor) - subject.get_executor_from(operation: true) - end - - it 'returns the global io executor when :operation is false' do - warn 'deprecated syntax' - expect(Kernel).to receive(:warn).with(anything) - expect(Concurrent).to receive(:global_io_executor). - and_return(:io_executor) - subject.get_executor_from(operation: false) - end - - it 'returns the global fast executor when :task is false' do - warn 'deprecated syntax' - expect(Kernel).to receive(:warn).with(anything) - expect(Concurrent).to receive(:global_fast_executor). - and_return(:fast_executor) - subject.get_executor_from(task: false) - end - - it 'returns the global io executor when :task is true' do - warn 'deprecated syntax' - expect(Kernel).to receive(:warn).with(anything) - expect(Concurrent).to receive(:global_io_executor). - and_return(:io_executor) - subject.get_executor_from(task: true) - end - - it 'returns nil when :executor is nil' do - expect(subject.get_executor_from(executor: nil)).to be_nil - end - - it 'returns nil when no option is given' do - expect(subject.get_executor_from).to be_nil - end - - specify ':executor overrides :operation' do - warn 'deprecated syntax' - expect(subject.get_executor_from(executor: executor, operation: true)). + describe 'Executor.executor_from_options' do + + let(:executor) { ImmediateExecutor.new } + let(:io_executor) { ImmediateExecutor.new } + let(:fast_executor) { ImmediateExecutor.new } + + it 'returns the given :executor' do + expect(Executor.executor_from_options(executor: executor)).to eq executor + end + + it 'returns the global io executor when :executor is :io' do + expect(Concurrent).to receive(:global_io_executor).and_return(:io_executor) + Executor.executor_from_options(executor: :io) + end + + it 'returns the global fast executor when :executor is :fast' do + expect(Concurrent).to receive(:global_fast_executor).and_return(:fast_executor) + Executor.executor_from_options(executor: :fast) + end + + it 'returns an immediate executor when :executor is :immediate' do + executor = Executor.executor_from_options(executor: :immediate) + end + + it 'raises an exception when :executor is an unrecognized symbol' do + expect { + Executor.executor_from_options(executor: :bogus) + }.to raise_error(ArgumentError) + end + + it 'returns the global fast executor when :operation is true' do + warn 'deprecated syntax' + expect(Kernel).to receive(:warn).with(anything) + expect(Concurrent).to receive(:global_fast_executor). + and_return(:fast_executor) + Executor.executor_from_options(operation: true) + end + + it 'returns the global io executor when :operation is false' do + warn 'deprecated syntax' + expect(Kernel).to receive(:warn).with(anything) + expect(Concurrent).to receive(:global_io_executor). + and_return(:io_executor) + Executor.executor_from_options(operation: false) + end + + it 'returns the global fast executor when :task is false' do + warn 'deprecated syntax' + expect(Kernel).to receive(:warn).with(anything) + expect(Concurrent).to receive(:global_fast_executor). + and_return(:fast_executor) + Executor.executor_from_options(task: false) + end + + it 'returns the global io executor when :task is true' do + warn 'deprecated syntax' + expect(Kernel).to receive(:warn).with(anything) + expect(Concurrent).to receive(:global_io_executor). 
+ and_return(:io_executor) + Executor.executor_from_options(task: true) + end + + it 'returns nil when :executor is nil' do + expect(Executor.executor_from_options(executor: nil)).to be_nil + end + + it 'returns nil when no option is given' do + expect(Executor.executor_from_options).to be_nil + end + + specify ':executor overrides :operation' do + warn 'deprecated syntax' + expect(Executor.executor_from_options(executor: executor, operation: true)). to eq executor - end + end - specify ':executor overrides :task' do - warn 'deprecated syntax' - expect(subject.get_executor_from(executor: executor, task: true)). + specify ':executor overrides :task' do + warn 'deprecated syntax' + expect(Executor.executor_from_options(executor: executor, task: true)). to eq executor - end - - specify ':operation overrides :task' do - warn 'deprecated syntax' - expect(Kernel).to receive(:warn).with(anything) - expect(Concurrent).to receive(:global_fast_executor). - and_return(:fast_executor) - subject.get_executor_from(operation: true, task: true) - end + end + + specify ':operation overrides :task' do + warn 'deprecated syntax' + expect(Kernel).to receive(:warn).with(anything) + expect(Concurrent).to receive(:global_fast_executor). + and_return(:fast_executor) + Executor.executor_from_options(operation: true, task: true) end end end diff --git a/spec/concurrent/executor/global_thread_pool_shared.rb b/spec/concurrent/executor/global_thread_pool_shared.rb index e10b59cc1..f52dec3cb 100644 --- a/spec/concurrent/executor/global_thread_pool_shared.rb +++ b/spec/concurrent/executor/global_thread_pool_shared.rb @@ -26,8 +26,7 @@ it 'aliases #<<' do latch = Concurrent::CountDownLatch.new(1) subject << proc { latch.count_down } - latch.wait(0.2) - expect(latch.count).to eq 0 + expect(latch.wait(0.2)).to eq true end end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 9a893c615..7b6c025df 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -21,6 +21,7 @@ $VERBOSE = nil # suppress our deprecation warnings require 'concurrent' +require 'concurrent-edge' logger = Logger.new($stderr) logger.level = Logger::WARN