diff --git a/README.md b/README.md index 6e4dc2371..875e40043 100644 --- a/README.md +++ b/README.md @@ -53,6 +53,7 @@ This library contains a variety of concurrency abstractions at high and low leve ### High-level, general-purpose asynchronous concurrency abstractions * [Async](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Async.html): A mixin module that provides simple asynchronous behavior to any standard class/object or object. +* [Atom](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Atom.html): A way to manage shared, synchronous, independent state. * [Future](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Future.html): An asynchronous operation that produces a value. * [Dataflow](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent.html#dataflow-class_method): Built on Futures, Dataflow allows you to create a task that will be scheduled when all of its data dependencies are available. * [Promise](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Promise.html): Similar to Futures, with more features. diff --git a/doc/agent.md b/doc/agent.md deleted file mode 100644 index c9f97567d..000000000 --- a/doc/agent.md +++ /dev/null @@ -1,82 +0,0 @@ -`Agent`s are inspired by [Clojure's](http://clojure.org/) [agent](http://clojure.org/agents) function. An `Agent` is a single atomic value that represents an identity. The current value of the `Agent` can be requested at any time (`deref`). Each `Agent` has a work queue and operates on the global thread pool (see below). Consumers can `post` code blocks to the `Agent`. The code block (function) will receive the current value of the `Agent` as its sole parameter. The return value of the block will become the new value of the `Agent`. `Agent`s support two error handling modes: fail and continue. A good example of an `Agent` is a shared incrementing counter, such as the score in a video game. - -An `Agent` must be initialize with an initial value. This value is always accessible via the `value` (or `deref`) methods. Code blocks sent to the `Agent` will be processed in the order received. As each block is processed the current value is updated with the result from the block. This update is an atomic operation so a `deref` will never block and will always return the current value. - -When an `Agent` is created it may be given an optional `validate` block and zero or more `rescue` blocks. When a new value is calculated the value will be checked against the validator, if present. If the validator returns `true` the new value will be accepted. If it returns `false` it will be rejected. If a block raises an exception during execution the list of `rescue` blocks will be seacrhed in order until one matching the current exception is found. That `rescue` block will then be called an passed the exception object. If no matching `rescue` block is found, or none were configured, then the exception will be suppressed. - -`Agent`s also implement Ruby's [Observable](http://ruby-doc.org/stdlib-2.0/libdoc/observer/rdoc/Observable.html). Code that observes an `Agent` will receive a callback with the new value any time the value is changed. - -## Copy Options - -Object references in Ruby are mutable. This can lead to serious problems when the value of an `Agent` is a mutable reference. Which is always the case unless the value is a `Fixnum`, `Symbol`, or similar "primative" data type. Each `Agent` instance can be configured with a few options that can help protect the program from potentially dangerous operations. Each of these options can be optionally set when the `Agent` is created: - -* `:dup_on_deref` when true the `Agent` will call the `#dup` method on the `value` object every time the `#value` methid is called (default: false) -* `:freeze_on_deref` when true the `Agent` will call the `#freeze` method on the `value` object every time the `#value` method is called (default: false) -* `:copy_on_deref` when given a `Proc` object the `Proc` will be run every time the `#value` method is called. The `Proc` will be given the current `value` as its only parameter and the result returned by the block will be the return value of the `#value` call. When `nil` this option will be ignored (default: nil) - -## Examples - -A simple example: - -```ruby -require 'concurrent' - -score = Concurrent::Agent.new(10) -score.value #=> 10 - -score << proc{|current| current + 100 } -sleep(0.1) -score.value #=> 110 - -score << proc{|current| current * 2 } -sleep(0.1) -score.value #=> 220 - -score << proc{|current| current - 50 } -sleep(0.1) -score.value #=> 170 -``` - -With validation and error handling: - -```ruby -score = Concurrent::Agent.new(0).validate{|value| value <= 1024 }. - rescue(NoMethodError){|ex| puts "Bam!" }. - rescue(ArgumentError){|ex| puts "Pow!" }. - rescue{|ex| puts "Boom!" } -score.value #=> 0 - -score << proc{|current| current + 2048 } -sleep(0.1) -score.value #=> 0 - -score << proc{|current| raise ArgumentError } -sleep(0.1) -#=> puts "Pow!" -score.value #=> 0 - -score << proc{|current| current + 100 } -sleep(0.1) -score.value #=> 100 -``` - -With observation: - -```ruby -bingo = Class.new{ - def update(time, score) - puts "Bingo! [score: #{score}, time: #{time}]" if score >= 100 - end -}.new - -score = Concurrent::Agent.new(0) -score.add_observer(bingo) - -score << proc{|current| sleep(0.1); current += 30 } -score << proc{|current| sleep(0.1); current += 30 } -score << proc{|current| sleep(0.1); current += 30 } -score << proc{|current| sleep(0.1); current += 30 } - -sleep(1) -#=> Bingo! [score: 120, time: 2013-07-22 21:26:08 -0400] -``` diff --git a/lib/concurrent.rb b/lib/concurrent.rb index 54dce984c..1d2f59714 100644 --- a/lib/concurrent.rb +++ b/lib/concurrent.rb @@ -13,6 +13,7 @@ require 'concurrent/struct' require 'concurrent/atomic/atomic_reference' +require 'concurrent/atom' require 'concurrent/async' require 'concurrent/dataflow' require 'concurrent/delay' diff --git a/lib/concurrent/agent.rb b/lib/concurrent/agent.rb index 482dfdd8d..bd7d5bbc3 100644 --- a/lib/concurrent/agent.rb +++ b/lib/concurrent/agent.rb @@ -6,7 +6,74 @@ module Concurrent - # {include:file:doc/agent.md} + # `Agent`s are inspired by [Clojure's](http://clojure.org/) [agent](http://clojure.org/agents) function. An `Agent` is a single atomic value that represents an identity. The current value of the `Agent` can be requested at any time (`deref`). Each `Agent` has a work queue and operates on the global thread pool (see below). Consumers can `post` code blocks to the `Agent`. The code block (function) will receive the current value of the `Agent` as its sole parameter. The return value of the block will become the new value of the `Agent`. `Agent`s support two error handling modes: fail and continue. A good example of an `Agent` is a shared incrementing counter, such as the score in a video game. + # + # An `Agent` must be initialize with an initial value. This value is always accessible via the `value` (or `deref`) methods. Code blocks sent to the `Agent` will be processed in the order received. As each block is processed the current value is updated with the result from the block. This update is an atomic operation so a `deref` will never block and will always return the current value. + # + # When an `Agent` is created it may be given an optional `validate` block and zero or more `rescue` blocks. When a new value is calculated the value will be checked against the validator, if present. If the validator returns `true` the new value will be accepted. If it returns `false` it will be rejected. If a block raises an exception during execution the list of `rescue` blocks will be seacrhed in order until one matching the current exception is found. That `rescue` block will then be called an passed the exception object. If no matching `rescue` block is found, or none were configured, then the exception will be suppressed. + # + # `Agent`s also implement Ruby's [Observable](http://ruby-doc.org/stdlib-2.0/libdoc/observer/rdoc/Observable.html). Code that observes an `Agent` will receive a callback with the new value any time the value is changed. + # + # @!macro copy_options + # + # @example Simple Example + # + # require 'concurrent' + # + # score = Concurrent::Agent.new(10) + # score.value #=> 10 + # + # score << proc{|current| current + 100 } + # sleep(0.1) + # score.value #=> 110 + # + # score << proc{|current| current * 2 } + # sleep(0.1) + # score.value #=> 220 + # + # score << proc{|current| current - 50 } + # sleep(0.1) + # score.value #=> 170 + # + # @example With Validation and Error Handling + # + # score = Concurrent::Agent.new(0).validate{|value| value <= 1024 }. + # rescue(NoMethodError){|ex| puts "Bam!" }. + # rescue(ArgumentError){|ex| puts "Pow!" }. + # rescue{|ex| puts "Boom!" } + # score.value #=> 0 + # + # score << proc{|current| current + 2048 } + # sleep(0.1) + # score.value #=> 0 + # + # score << proc{|current| raise ArgumentError } + # sleep(0.1) + # #=> puts "Pow!" + # score.value #=> 0 + # + # score << proc{|current| current + 100 } + # sleep(0.1) + # score.value #=> 100 + # + # @example With Observation + # + # bingo = Class.new{ + # def update(time, score) + # puts "Bingo! [score: #{score}, time: #{time}]" if score >= 100 + # end + # }.new + # + # score = Concurrent::Agent.new(0) + # score.add_observer(bingo) + # + # score << proc{|current| sleep(0.1); current += 30 } + # score << proc{|current| sleep(0.1); current += 30 } + # score << proc{|current| sleep(0.1); current += 30 } + # score << proc{|current| sleep(0.1); current += 30 } + # + # sleep(1) + # #=> Bingo! [score: 120, time: 2013-07-22 21:26:08 -0400] # # @!attribute [r] timeout # @return [Fixnum] the maximum number of seconds before an update is cancelled @@ -21,18 +88,7 @@ class Agent # # @param [Object] initial the initial value # - # @!macro [attach] executor_and_deref_options - # - # @param [Hash] opts the options used to define the behavior at update and deref - # and to specify the executor on which to perform actions - # @option opts [Executor] :executor when set use the given `Executor` instance. - # Three special values are also supported: `:task` returns the global task pool, - # `:operation` returns the global operation pool, and `:immediate` returns a new - # `ImmediateExecutor` object. - # @option opts [Boolean] :dup_on_deref (false) call `#dup` before returning the data - # @option opts [Boolean] :freeze_on_deref (false) call `#freeze` before returning the data - # @option opts [Proc] :copy_on_deref (nil) call the given `Proc` passing - # the internal value and returning the value returned from the proc + # @!macro executor_and_deref_options def initialize(initial, opts = {}) @value = initial @rescuers = [] @@ -128,19 +184,19 @@ def post(&block) def post_off(timeout = nil, &block) warn '[DEPRECATED] post_off with timeout options is deprecated and will be removed' task = if timeout - lambda do |value| - future = Future.execute do - block.call(value) - end - if future.wait(timeout) - future.value! - else - raise Concurrent::TimeoutError - end - end - else - block - end + lambda do |value| + future = Future.execute do + block.call(value) + end + if future.wait(timeout) + future.value! + else + raise Concurrent::TimeoutError + end + end + else + block + end post_on(@io_executor, &task) end diff --git a/lib/concurrent/atom.rb b/lib/concurrent/atom.rb new file mode 100644 index 000000000..d2221c493 --- /dev/null +++ b/lib/concurrent/atom.rb @@ -0,0 +1,136 @@ +require 'concurrent/dereferenceable' +require 'concurrent/atomic/atomic_reference' +require 'concurrent/synchronization/object' + +module Concurrent + + # Atoms provide a way to manage shared, synchronous, independent state. + # + # An atom is initialized with an initial value and an optional validation + # proc. At any time the value of the atom can be synchronously and safely + # changed. If a validator is given at construction then any new value + # will be checked against the validator and will be rejected if the + # validator returns false or raises an exception. + # + # There are two ways to change the value of an atom: {#compare_and_set} and + # {#swap}. The former will set the new value if and only if it validates and + # the current value matches the new value. The latter will atomically set the + # new value to the result of running the given block if and only if that + # value validates. + # + # @!macro copy_options + # + # @see http://clojure.org/atoms Clojure Atoms + class Atom < Synchronization::Object + include Dereferenceable + + # @!macro [attach] atom_initialize + # + # Create a new atom with the given initial value. + # + # @param [Object] value The initial value + # @param [Hash] opts The options used to configure the atom + # @option opts [Proc] :validator (nil) Optional proc used to validate new + # values. It must accept one and only one argument which will be the + # intended new value. The validator will return true if the new value + # is acceptable else return false (preferrably) or raise an exception. + # + # @!macro deref_options + # + # @raise [ArgumentError] if the validator is not a `Proc` (when given) + def initialize(value, opts = {}) + super() + synchronize{ ns_initialize(value, opts) } + end + + # The current value of the atom. + # + # @return [Object] The current value. + def value + apply_deref_options(@value.value) + end + alias_method :deref, :value + + # Atomically swaps the value of atom using the given block. The current + # value will be passed to the block, as will any arguments passed as + # arguments to the function. The new value will be validated against the + # (optional) validator proc given at construction. If validation fails the + # value will not be changed. + # + # Internally, {#swap} reads the current value, applies the block to it, and + # attempts to compare-and-set it in. Since another thread may have changed + # the value in the intervening time, it may have to retry, and does so in a + # spin loop. The net effect is that the value will always be the result of + # the application of the supplied block to a current value, atomically. + # However, because the block might be called multiple times, it must be free + # of side effects. + # + # @note The given block may be called multiple times, and thus should be free + # of side effects. + # + # @param [Object] args Zero or more arguments passed to the block. + # + # @yield [value, args] Calculates a new value for the atom based on the + # current value and any supplied agruments. + # @yieldparam value [Object] The current value of the atom. + # @yieldparam args [Object] All arguments passed to the function, in order. + # @yieldreturn [Object] The intended new value of the atom. + # + # @return [Object] The final value of the atom after all operations and + # validations are complete. + # + # @raise [ArgumentError] When no block is given. + def swap(*args) + raise ArgumentError.new('no block given') unless block_given? + + begin + loop do + old_value = @value.value + new_value = yield(old_value, *args) + return old_value unless @validator.call(new_value) + return new_value if compare_and_set!(old_value, new_value) + end + rescue + return @value.value + end + end + + # @!macro [attach] atom_compare_and_set + # Atomically sets the value of atom to the new value if and only if the + # current value of the atom is identical to the old value and the new + # value successfully validates against the (optional) validator given + # at construction. + # + # @param [Object] old_value The expected current value. + # @param [Object] new_value The intended new value. + # + # @return [Boolean] True if the value is changed else false. + def compare_and_set(old_value, new_value) + compare_and_set!(old_value, new_value) + rescue + false + end + + private + + # @!macro atom_initialize + # @!visibility private + def ns_initialize(value, opts) + @validator = opts.fetch(:validator, ->(v){ true }) + raise ArgumentError.new('validator must be a proc') unless @validator.is_a? Proc + @value = Concurrent::AtomicReference.new(value) + ns_set_deref_options(opts) + end + + # @!macro atom_compare_and_set + # @raise [Exception] if the validator proc raises an exception + # @!visibility private + def compare_and_set!(old_value, new_value) + if @validator.call(new_value) # may raise exception + @value.compare_and_set(old_value, new_value) + else + false + end + end + end +end diff --git a/lib/concurrent/delay.rb b/lib/concurrent/delay.rb index 55329b50e..2dccb1625 100644 --- a/lib/concurrent/delay.rb +++ b/lib/concurrent/delay.rb @@ -29,6 +29,8 @@ module Concurrent # `Delay` includes the `Concurrent::Dereferenceable` mixin to support thread # safety of the reference returned by `#value`. # + # @!macro copy_options + # # @!macro [attach] delay_note_regarding_blocking # @note The default behavior of `Delay` is to block indefinitely when # calling either `value` or `wait`, executing the delayed operation on @@ -51,7 +53,15 @@ class Delay < Synchronization::Object # Create a new `Delay` in the `:pending` state. # - # @!macro executor_and_deref_options + # @!macro [attach] executor_and_deref_options + # + # @param [Hash] opts the options used to define the behavior at update and deref + # and to specify the executor on which to perform actions + # @option opts [Executor] :executor when set use the given `Executor` instance. + # Three special values are also supported: `:task` returns the global task pool, + # `:operation` returns the global operation pool, and `:immediate` returns a new + # `ImmediateExecutor` object. + # @!macro deref_options # # @yield the delayed operation to perform # diff --git a/lib/concurrent/dereferenceable.rb b/lib/concurrent/dereferenceable.rb index 6a9cf4a6c..a9f3408ef 100644 --- a/lib/concurrent/dereferenceable.rb +++ b/lib/concurrent/dereferenceable.rb @@ -6,37 +6,12 @@ module Concurrent # Most classes in this library that expose a `#value` getter method do so using the # `Dereferenceable` mixin module. # - # Objects with this mixin can be configured with a few options that can help protect - # the program from potentially dangerous operations. - # - # * `:dup_on_deref` when true will call the `#dup` method on the `value` - # object every time the `#value` method is called (default: false) - # * `:freeze_on_deref` when true will call the `#freeze` method on the `value` object - # every time the `#value` method is called (default: false) - # * `:copy_on_deref` when given a `Proc` object the `Proc` will be run every - # time the `#value` method is called. The `Proc` will be given the current - # `value` as its only parameter and the result returned by the block will - # be the return value of the `#value` call. When `nil` this option will be - # ignored (default: nil) + # @!macro copy_options module Dereferenceable # Return the value this object represents after applying the options specified # by the `#set_deref_options` method. # - # When multiple deref options are set the order of operations is strictly defined. - # The order of deref operations is: - # * `:copy_on_deref` - # * `:dup_on_deref` - # * `:freeze_on_deref` - # - # Because of this ordering there is no need to `#freeze` an object created by a - # provided `:copy_on_deref` block. Simply set `:freeze_on_deref` to `true`. - # Setting both `:dup_on_deref` to `true` and `:freeze_on_deref` to `true` is - # as close to the behavior of a "pure" functional language (like Erlang, Clojure, - # or Haskell) as we are likely to get in Ruby. - # - # This method is thread-safe and synchronized with the internal `#mutex`. - # # @return [Object] the current value of the object def value mutex.synchronize { apply_deref_options(@value) } @@ -71,30 +46,35 @@ def init_mutex(mutex = Mutex.new) @mutex = mutex end - # Set the options which define the operations #value performs before - # returning data to the caller (dereferencing). - # - # @note Most classes that include this module will call `#set_deref_options` - # from within the constructor, thus allowing these options to be set at - # object creation. - # - # @param [Hash] opts the options defining dereference behavior. - # @option opts [String] :dup_on_deref (false) call `#dup` before returning the data - # @option opts [String] :freeze_on_deref (false) call `#freeze` before returning the data - # @option opts [String] :copy_on_deref (nil) call the given `Proc` passing - # the internal value and returning the value returned from the proc + # @!macro [attach] dereferenceable_set_deref_options + # Set the options which define the operations #value performs before + # returning data to the caller (dereferencing). + # + # @note Most classes that include this module will call `#set_deref_options` + # from within the constructor, thus allowing these options to be set at + # object creation. + # + # @param [Hash] opts the options defining dereference behavior. + # @option opts [String] :dup_on_deref (false) call `#dup` before returning the data + # @option opts [String] :freeze_on_deref (false) call `#freeze` before returning the data + # @option opts [String] :copy_on_deref (nil) call the given `Proc` passing + # the internal value and returning the value returned from the proc def set_deref_options(opts = {}) - mutex.synchronize do - @dup_on_deref = opts[:dup_on_deref] || opts[:dup] - @freeze_on_deref = opts[:freeze_on_deref] || opts[:freeze] - @copy_on_deref = opts[:copy_on_deref] || opts[:copy] - @do_nothing_on_deref = !(@dup_on_deref || @freeze_on_deref || @copy_on_deref) - nil - end + mutex.synchronize{ ns_set_deref_options(opts) } + end + + # @!macro dereferenceable_set_deref_options + # @!visibility private + def ns_set_deref_options(opts) + @dup_on_deref = opts[:dup_on_deref] || opts[:dup] + @freeze_on_deref = opts[:freeze_on_deref] || opts[:freeze] + @copy_on_deref = opts[:copy_on_deref] || opts[:copy] + @do_nothing_on_deref = !(@dup_on_deref || @freeze_on_deref || @copy_on_deref) + nil end # @!visibility private - def apply_deref_options(value) # :nodoc: + def apply_deref_options(value) return nil if value.nil? return value if @do_nothing_on_deref value = @copy_on_deref.call(value) if @copy_on_deref diff --git a/lib/concurrent/future.rb b/lib/concurrent/future.rb index ca38a8ce2..d6f26da11 100644 --- a/lib/concurrent/future.rb +++ b/lib/concurrent/future.rb @@ -8,6 +8,8 @@ module Concurrent # {include:file:doc/future.md} # + # @!macro copy_options + # # @see http://ruby-doc.org/stdlib-2.1.1/libdoc/observer/rdoc/Observable.html Ruby Observable module # @see http://clojuredocs.org/clojure_core/clojure.core/future Clojure's future function # @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Future.html java.util.concurrent.Future diff --git a/lib/concurrent/ivar.rb b/lib/concurrent/ivar.rb index c80558c85..a1a624729 100644 --- a/lib/concurrent/ivar.rb +++ b/lib/concurrent/ivar.rb @@ -26,19 +26,59 @@ module Concurrent # when the values they depend on are ready you want `dataflow`. `IVar` is # generally a low-level primitive. # - # **See Also:** + # @!macro [attach] copy_options + # ## Copy Options # - # * For the theory: Arvind, R. Nikhil, and K. Pingali. - # [I-Structures: Data structures for parallel computing](http://dl.acm.org/citation.cfm?id=69562). - # In Proceedings of Workshop on Graph Reduction, 1986. - # * For recent application: - # [DataDrivenFuture in Habanero Java from Rice](http://www.cs.rice.edu/~vs3/hjlib/doc/edu/rice/hj/api/HjDataDrivenFuture.html). + # Object references in Ruby are mutable. This can lead to serious + # problems when the {#value} of an object is a mutable reference. Which + # is always the case unless the value is a `Fixnum`, `Symbol`, or similar + # "primative" data type. Each instance can be configured with a few + # options that can help protect the program from potentially dangerous + # operations. Each of these options can be optionally set when the oject + # instance is created: # - # @example Create, set and get an `IVar` - # ivar = Concurrent::IVar.new - # ivar.set 14 - # ivar.get #=> 14 - # ivar.set 2 # would now be an error + # * `:dup_on_deref` When true the object will call the `#dup` method on + # the `value` object every time the `#value` methid is called + # (default: false) + # * `:freeze_on_deref` When true the object will call the `#freeze` + # method on the `value` object every time the `#value` method is called + # (default: false) + # * `:copy_on_deref` When given a `Proc` object the `Proc` will be run + # every time the `#value` method is called. The `Proc` will be given + # the current `value` as its only argument and the result returned by + # the block will be the return value of the `#value` call. When `nil` + # this option will be ignored (default: nil) + # + # When multiple deref options are set the order of operations is strictly defined. + # The order of deref operations is: + # * `:copy_on_deref` + # * `:dup_on_deref` + # * `:freeze_on_deref` + # + # Because of this ordering there is no need to `#freeze` an object created by a + # provided `:copy_on_deref` block. Simply set `:freeze_on_deref` to `true`. + # Setting both `:dup_on_deref` to `true` and `:freeze_on_deref` to `true` is + # as close to the behavior of a "pure" functional language (like Erlang, Clojure, + # or Haskell) as we are likely to get in Ruby. + # + # ## Examples + # + # Create, set and get an `IVar` + # + # ```ruby + # ivar = Concurrent::IVar.new + # ivar.set 14 + # ivar.get #=> 14 + # ivar.set 2 # would now be an error + # ``` + # + # ## See Also + # + # 1. For the theory: Arvind, R. Nikhil, and K. Pingali. + # [I-Structures: Data structures for parallel computing](http://dl.acm.org/citation.cfm?id=69562). + # In Proceedings of Workshop on Graph Reduction, 1986. + # 2. For recent application: + # [DataDrivenFuture in Habanero Java from Rice](http://www.cs.rice.edu/~vs3/hjlib/doc/edu/rice/hj/api/HjDataDrivenFuture.html). class IVar < Synchronization::Object include Obligation include Observable @@ -50,12 +90,15 @@ class IVar < Synchronization::Object # # @param [Object] value the initial value # @param [Hash] opts the options to create a message with - # @option opts [String] :dup_on_deref (false) call `#dup` before returning - # the data - # @option opts [String] :freeze_on_deref (false) call `#freeze` before - # returning the data - # @option opts [String] :copy_on_deref (nil) call the given `Proc` passing - # the internal value and returning the value returned from the proc + # + # @!macro [attach] deref_options + # @option opts [Boolean] :dup_on_deref (false) Call `#dup` before + # returning the data from {#value} + # @option opts [Boolean] :freeze_on_deref (false) Call `#freeze` before + # returning the data from {#value} + # @option opts [Proc] :copy_on_deref (nil) When calling the {#value} + # method, call the given proc passing the internal value as the sole + # argument then return the new value returned from the proc. def initialize(value = NO_VALUE, opts = {}, &block) if value != NO_VALUE && block_given? raise ArgumentError.new('provide only a value or a block') diff --git a/lib/concurrent/mvar.rb b/lib/concurrent/mvar.rb index 1f486e3c2..b20934889 100644 --- a/lib/concurrent/mvar.rb +++ b/lib/concurrent/mvar.rb @@ -24,13 +24,16 @@ module Concurrent # Note that unlike the original Haskell paper, our `#take` is blocking. This is how # Haskell and Scala do it today. # - # **See Also:** + # @!macro copy_options + # + # ## See Also # # 1. P. Barth, R. Nikhil, and Arvind. [M-Structures: Extending a parallel, non- strict, functional language with state](http://dl.acm.org/citation.cfm?id=652538). In Proceedings of the 5th - # ACM Conference on Functional Programming Languages and Computer Architecture (FPCA), 1991. + # ACM Conference on Functional Programming Languages and Computer Architecture (FPCA), 1991. + # # 2. S. Peyton Jones, A. Gordon, and S. Finne. [Concurrent Haskell](http://dl.acm.org/citation.cfm?id=237794). - # In Proceedings of the 23rd Symposium on Principles of Programming Languages - # (PoPL), 1996. + # In Proceedings of the 23rd Symposium on Principles of Programming Languages + # (PoPL), 1996. class MVar include Dereferenceable @@ -45,17 +48,8 @@ class MVar # Create a new `MVar`, either empty or with an initial value. # # @param [Hash] opts the options controlling how the future will be processed - # @option opts [Boolean] :operation (false) when `true` will execute the - # future on the global operation pool (for long-running operations), when - # `false` will execute the future on the global task pool (for - # short-running tasks) - # @option opts [object] :executor when provided will run all operations on - # this executor rather than the global thread pool (overrides :operation) - # @option opts [String] :dup_on_deref (false) call `#dup` before returning the data - # @option opts [String] :freeze_on_deref (false) call `#freeze` before - # returning the data - # @option opts [String] :copy_on_deref (nil) call the given `Proc` passing - # the internal value and returning the value returned from the proc + # + # @!macro deref_options def initialize(value = EMPTY, opts = {}) @value = value @mutex = Mutex.new diff --git a/lib/concurrent/promise.rb b/lib/concurrent/promise.rb index b21cb1ebb..4bbadab1a 100644 --- a/lib/concurrent/promise.rb +++ b/lib/concurrent/promise.rb @@ -56,6 +56,8 @@ module Concurrent # # Promises run on the global thread pool. # + # @!macro copy_options + # # ### Examples # # Start by requiring promises diff --git a/lib/concurrent/scheduled_task.rb b/lib/concurrent/scheduled_task.rb index ab20c7c1f..cbfc73023 100644 --- a/lib/concurrent/scheduled_task.rb +++ b/lib/concurrent/scheduled_task.rb @@ -51,6 +51,8 @@ module Concurrent # module from the Ruby standard library. With one exception `ScheduledTask` # behaves identically to [Future](Observable) with regard to these modules. # + # @!macro copy_options + # # @example Basic usage # # require 'concurrent' diff --git a/lib/concurrent/timer_task.rb b/lib/concurrent/timer_task.rb index 6566ea345..9d69599ac 100644 --- a/lib/concurrent/timer_task.rb +++ b/lib/concurrent/timer_task.rb @@ -49,6 +49,8 @@ module Concurrent # interval is exceeded the observer will receive a `Concurrent::TimeoutError` # object as the third argument. # + # @!macro copy_options + # # @example Basic usage # task = Concurrent::TimerTask.new{ puts 'Boom!' } # task.execute @@ -169,6 +171,8 @@ class TimerTask < RubyExecutorService # upon instantiation or to wait until the first # execution_interval # has passed (default: false) # + # @!macro deref_options + # # @raise ArgumentError when no block is given. # # @yield to the block after :execution_interval seconds have passed since @@ -178,13 +182,7 @@ class TimerTask < RubyExecutorService # refer to the execution context of the block rather than the running # `TimerTask`. # - # @note Calls Concurrent::Dereferenceable# set_deref_options passing `opts`. - # All options supported by Concurrent::Dereferenceable can be set - # during object initialization. - # # @return [TimerTask] the new `TimerTask` - # - # @see Concurrent::Dereferenceable# set_deref_options def initialize(opts = {}, &task) raise ArgumentError.new('no block given') unless block_given? super diff --git a/spec/concurrent/atom_spec.rb b/spec/concurrent/atom_spec.rb new file mode 100644 index 000000000..1422f508e --- /dev/null +++ b/spec/concurrent/atom_spec.rb @@ -0,0 +1,165 @@ +require_relative 'dereferenceable_shared' + +module Concurrent + + describe Atom do + + it_should_behave_like :dereferenceable do + def dereferenceable_subject(value, opts = {}) + Atom.new(value, opts) + end + end + + context 'construction' do + + it 'sets the initial value to the given value' do + atom = Atom.new(42) + expect(atom.value).to eq 42 + end + + it 'raises an exception if the validator is not a proc' do + expect { + Atom.new(42, validator: 42) + }.to raise_error(ArgumentError) + end + end + + context '#compare_and_set' do + + it 'sets the new value if the current value matches' do + atom = Atom.new(42) + atom.compare_and_set(42, :foo) + expect(atom.value).to eq :foo + end + + it 'returns true if the current value matches' do + atom = Atom.new(42) + expect(atom.compare_and_set(42, :foo)).to be true + end + + it 'rejects the new value if the current value does not match' do + atom = Atom.new(42) + atom.compare_and_set(:foo, 'bar') + expect(atom.value).to eq 42 + end + + it 'returns false if the current value does not match' do + atom = Atom.new(42) + expect(atom.compare_and_set(:foo, 'bar')).to be false + end + + it 'rejects the new value if the validator returns false' do + validator = ->(value){ false } + atom = Atom.new(42, validator: validator) + atom.compare_and_set(42, :foo) + expect(atom.value).to eq 42 + end + + it 'rejects the new value if the validator raises an exception' do + validator = ->(value){ raise StandardError } + atom = Atom.new(42, validator: validator) + atom.compare_and_set(42, :foo) + expect(atom.value).to eq 42 + end + + it 'returns false if the validator returns false' do + validator = ->(value){ false } + atom = Atom.new(42, validator: validator) + expect(atom.compare_and_set(42, :foo)).to be false + end + + it 'returns false if the validator raises an exception' do + validator = ->(value){ raise StandardError } + atom = Atom.new(42, validator: validator) + expect(atom.compare_and_set(42, :foo)).to be false + end + end + + context 'swap' do + + it 'raises an exception when no block is given' do + atom = Atom.new(42) + expect { + atom.swap + }.to raise_error(ArgumentError) + end + + it 'passes the current value to the block' do + actual = nil + expected = 42 + atom = Atom.new(expected) + atom.swap do |value| + actual = value + end + expect(actual).to eq expected + end + + it 'passes all arguments to the block' do + actual = nil + expected = [1, 2, 3] + atom = Atom.new(42) + atom.swap(*expected) do |value, *args| + actual = args + end + expect(actual).to eq expected + end + + it 'sets the new value to the result of the block' do + atom = Atom.new(42) + atom.swap{ :foo } + expect(atom.value).to eq :foo + end + + it 'rejects the new value if the validator returns false' do + validator = ->(value){ false } + atom = Atom.new(42, validator: validator) + atom.swap{ 100 } + expect(atom.value).to eq 42 + end + + it 'rejects the new value if the validator raises an exception' do + validator = ->(value){ raise StandardError } + atom = Atom.new(42, validator: validator) + atom.swap{ 100 } + expect(atom.value).to eq 42 + end + + it 'returns the new value on success' do + atom = Atom.new(42) + expect(atom.swap{ :foo }).to eq :foo + end + + it 'returns the old value if the validator returns false' do + validator = ->(value){ false } + atom = Atom.new(42, validator: validator) + expect(atom.swap{ 100 }).to eq 42 + end + + it 'returns the old value if the validator raises an exception' do + validator = ->(value){ raise StandardError } + atom = Atom.new(42, validator: validator) + expect(atom.swap{ 100 }).to eq 42 + end + + #it 'calls the block more than once if the value changes underneath' do + #latch = Concurrent::CountDownLatch.new + #counter = Concurrent::AtomicBoolean.new(0) + #atom = Atom.new(0) + + #t = Thread.new do + #atom.swap do |value| + #counter.increment + #latch.wait + #42 + #end + #end + + #atom.swap{ 100 } + #latch.count_down + #t.join(1) + + #expect(counter.value).to eq 2 + #end + end + end +end