Skip to content
Merged

Atom #308

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ This library contains a variety of concurrency abstractions at high and low leve
### High-level, general-purpose asynchronous concurrency abstractions

* [Async](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Async.html): A mixin module that provides simple asynchronous behavior to any standard class/object or object.
* [Atom](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Atom.html): A way to manage shared, synchronous, independent state.
* [Future](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Future.html): An asynchronous operation that produces a value.
* [Dataflow](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent.html#dataflow-class_method): Built on Futures, Dataflow allows you to create a task that will be scheduled when all of its data dependencies are available.
* [Promise](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Promise.html): Similar to Futures, with more features.
Expand Down
82 changes: 0 additions & 82 deletions doc/agent.md

This file was deleted.

1 change: 1 addition & 0 deletions lib/concurrent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
require 'concurrent/struct'

require 'concurrent/atomic/atomic_reference'
require 'concurrent/atom'
require 'concurrent/async'
require 'concurrent/dataflow'
require 'concurrent/delay'
Expand Down
108 changes: 82 additions & 26 deletions lib/concurrent/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,74 @@

module Concurrent

# {include:file:doc/agent.md}
# `Agent`s are inspired by [Clojure's](http://clojure.org/) [agent](http://clojure.org/agents) function. An `Agent` is a single atomic value that represents an identity. The current value of the `Agent` can be requested at any time (`deref`). Each `Agent` has a work queue and operates on the global thread pool (see below). Consumers can `post` code blocks to the `Agent`. The code block (function) will receive the current value of the `Agent` as its sole parameter. The return value of the block will become the new value of the `Agent`. `Agent`s support two error handling modes: fail and continue. A good example of an `Agent` is a shared incrementing counter, such as the score in a video game.
#
# An `Agent` must be initialize with an initial value. This value is always accessible via the `value` (or `deref`) methods. Code blocks sent to the `Agent` will be processed in the order received. As each block is processed the current value is updated with the result from the block. This update is an atomic operation so a `deref` will never block and will always return the current value.
#
# When an `Agent` is created it may be given an optional `validate` block and zero or more `rescue` blocks. When a new value is calculated the value will be checked against the validator, if present. If the validator returns `true` the new value will be accepted. If it returns `false` it will be rejected. If a block raises an exception during execution the list of `rescue` blocks will be seacrhed in order until one matching the current exception is found. That `rescue` block will then be called an passed the exception object. If no matching `rescue` block is found, or none were configured, then the exception will be suppressed.
#
# `Agent`s also implement Ruby's [Observable](http://ruby-doc.org/stdlib-2.0/libdoc/observer/rdoc/Observable.html). Code that observes an `Agent` will receive a callback with the new value any time the value is changed.
#
# @!macro copy_options
#
# @example Simple Example
#
# require 'concurrent'
#
# score = Concurrent::Agent.new(10)
# score.value #=> 10
#
# score << proc{|current| current + 100 }
# sleep(0.1)
# score.value #=> 110
#
# score << proc{|current| current * 2 }
# sleep(0.1)
# score.value #=> 220
#
# score << proc{|current| current - 50 }
# sleep(0.1)
# score.value #=> 170
#
# @example With Validation and Error Handling
#
# score = Concurrent::Agent.new(0).validate{|value| value <= 1024 }.
# rescue(NoMethodError){|ex| puts "Bam!" }.
# rescue(ArgumentError){|ex| puts "Pow!" }.
# rescue{|ex| puts "Boom!" }
# score.value #=> 0
#
# score << proc{|current| current + 2048 }
# sleep(0.1)
# score.value #=> 0
#
# score << proc{|current| raise ArgumentError }
# sleep(0.1)
# #=> puts "Pow!"
# score.value #=> 0
#
# score << proc{|current| current + 100 }
# sleep(0.1)
# score.value #=> 100
#
# @example With Observation
#
# bingo = Class.new{
# def update(time, score)
# puts "Bingo! [score: #{score}, time: #{time}]" if score >= 100
# end
# }.new
#
# score = Concurrent::Agent.new(0)
# score.add_observer(bingo)
#
# score << proc{|current| sleep(0.1); current += 30 }
# score << proc{|current| sleep(0.1); current += 30 }
# score << proc{|current| sleep(0.1); current += 30 }
# score << proc{|current| sleep(0.1); current += 30 }
#
# sleep(1)
# #=> Bingo! [score: 120, time: 2013-07-22 21:26:08 -0400]
#
# @!attribute [r] timeout
# @return [Fixnum] the maximum number of seconds before an update is cancelled
Expand All @@ -21,18 +88,7 @@ class Agent
#
# @param [Object] initial the initial value
#
# @!macro [attach] executor_and_deref_options
#
# @param [Hash] opts the options used to define the behavior at update and deref
# and to specify the executor on which to perform actions
# @option opts [Executor] :executor when set use the given `Executor` instance.
# Three special values are also supported: `:task` returns the global task pool,
# `:operation` returns the global operation pool, and `:immediate` returns a new
# `ImmediateExecutor` object.
# @option opts [Boolean] :dup_on_deref (false) call `#dup` before returning the data
# @option opts [Boolean] :freeze_on_deref (false) call `#freeze` before returning the data
# @option opts [Proc] :copy_on_deref (nil) call the given `Proc` passing
# the internal value and returning the value returned from the proc
# @!macro executor_and_deref_options
def initialize(initial, opts = {})
@value = initial
@rescuers = []
Expand Down Expand Up @@ -128,19 +184,19 @@ def post(&block)
def post_off(timeout = nil, &block)
warn '[DEPRECATED] post_off with timeout options is deprecated and will be removed'
task = if timeout
lambda do |value|
future = Future.execute do
block.call(value)
end
if future.wait(timeout)
future.value!
else
raise Concurrent::TimeoutError
end
end
else
block
end
lambda do |value|
future = Future.execute do
block.call(value)
end
if future.wait(timeout)
future.value!
else
raise Concurrent::TimeoutError
end
end
else
block
end
post_on(@io_executor, &task)
end

Expand Down
136 changes: 136 additions & 0 deletions lib/concurrent/atom.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
require 'concurrent/dereferenceable'
require 'concurrent/atomic/atomic_reference'
require 'concurrent/synchronization/object'

module Concurrent

# Atoms provide a way to manage shared, synchronous, independent state.
#
# An atom is initialized with an initial value and an optional validation
# proc. At any time the value of the atom can be synchronously and safely
# changed. If a validator is given at construction then any new value
# will be checked against the validator and will be rejected if the
# validator returns false or raises an exception.
#
# There are two ways to change the value of an atom: {#compare_and_set} and
# {#swap}. The former will set the new value if and only if it validates and
# the current value matches the new value. The latter will atomically set the
# new value to the result of running the given block if and only if that
# value validates.
#
# @!macro copy_options
#
# @see http://clojure.org/atoms Clojure Atoms
class Atom < Synchronization::Object
include Dereferenceable

# @!macro [attach] atom_initialize
#
# Create a new atom with the given initial value.
#
# @param [Object] value The initial value
# @param [Hash] opts The options used to configure the atom
# @option opts [Proc] :validator (nil) Optional proc used to validate new
# values. It must accept one and only one argument which will be the
# intended new value. The validator will return true if the new value
# is acceptable else return false (preferrably) or raise an exception.
#
# @!macro deref_options
#
# @raise [ArgumentError] if the validator is not a `Proc` (when given)
def initialize(value, opts = {})
super()
synchronize{ ns_initialize(value, opts) }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it can be either synchronized but then all later access has to be synchronized too, which is not what you want probably. ensure_ivar_visibility! will ensure visibility but later access synchronization is not required.

end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The instance variable visibility should be enforced here, otherwise it's unsafe.


# The current value of the atom.
#
# @return [Object] The current value.
def value
apply_deref_options(@value.value)
end
alias_method :deref, :value

# Atomically swaps the value of atom using the given block. The current
# value will be passed to the block, as will any arguments passed as
# arguments to the function. The new value will be validated against the
# (optional) validator proc given at construction. If validation fails the
# value will not be changed.
#
# Internally, {#swap} reads the current value, applies the block to it, and
# attempts to compare-and-set it in. Since another thread may have changed
# the value in the intervening time, it may have to retry, and does so in a
# spin loop. The net effect is that the value will always be the result of
# the application of the supplied block to a current value, atomically.
# However, because the block might be called multiple times, it must be free
# of side effects.
#
# @note The given block may be called multiple times, and thus should be free
# of side effects.
#
# @param [Object] args Zero or more arguments passed to the block.
#
# @yield [value, args] Calculates a new value for the atom based on the
# current value and any supplied agruments.
# @yieldparam value [Object] The current value of the atom.
# @yieldparam args [Object] All arguments passed to the function, in order.
# @yieldreturn [Object] The intended new value of the atom.
#
# @return [Object] The final value of the atom after all operations and
# validations are complete.
#
# @raise [ArgumentError] When no block is given.
def swap(*args)
raise ArgumentError.new('no block given') unless block_given?

begin
loop do
old_value = @value.value
new_value = yield(old_value, *args)
return old_value unless @validator.call(new_value)
return new_value if compare_and_set!(old_value, new_value)
end
rescue
return @value.value
end
end

# @!macro [attach] atom_compare_and_set
# Atomically sets the value of atom to the new value if and only if the
# current value of the atom is identical to the old value and the new
# value successfully validates against the (optional) validator given
# at construction.
#
# @param [Object] old_value The expected current value.
# @param [Object] new_value The intended new value.
#
# @return [Boolean] True if the value is changed else false.
def compare_and_set(old_value, new_value)
compare_and_set!(old_value, new_value)
rescue
false
end

private

# @!macro atom_initialize
# @!visibility private
def ns_initialize(value, opts)
@validator = opts.fetch(:validator, ->(v){ true })
raise ArgumentError.new('validator must be a proc') unless @validator.is_a? Proc
@value = Concurrent::AtomicReference.new(value)
ns_set_deref_options(opts)
end

# @!macro atom_compare_and_set
# @raise [Exception] if the validator proc raises an exception
# @!visibility private
def compare_and_set!(old_value, new_value)
if @validator.call(new_value) # may raise exception
@value.compare_and_set(old_value, new_value)
else
false
end
end
end
end
12 changes: 11 additions & 1 deletion lib/concurrent/delay.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ module Concurrent
# `Delay` includes the `Concurrent::Dereferenceable` mixin to support thread
# safety of the reference returned by `#value`.
#
# @!macro copy_options
#
# @!macro [attach] delay_note_regarding_blocking
# @note The default behavior of `Delay` is to block indefinitely when
# calling either `value` or `wait`, executing the delayed operation on
Expand All @@ -51,7 +53,15 @@ class Delay < Synchronization::Object

# Create a new `Delay` in the `:pending` state.
#
# @!macro executor_and_deref_options
# @!macro [attach] executor_and_deref_options
#
# @param [Hash] opts the options used to define the behavior at update and deref
# and to specify the executor on which to perform actions
# @option opts [Executor] :executor when set use the given `Executor` instance.
# Three special values are also supported: `:task` returns the global task pool,
# `:operation` returns the global operation pool, and `:immediate` returns a new
# `ImmediateExecutor` object.
# @!macro deref_options
#
# @yield the delayed operation to perform
#
Expand Down
Loading