60 changes: 57 additions & 3 deletions doc/synchronization.md
@@ -88,7 +88,7 @@ end

### Naming conventions

Instance variables with camel case names are final and never reassigned.
Instance variables with camel case names are final and never reassigned, e.g. `@FinalVariable`.

## Fields with CAS operations

@@ -115,7 +115,61 @@ class Event < Synchronization::Object
end
```

## Memory model (sort of)
Operations on the `@Touched` field have volatile semantics.
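As a plain-Ruby sketch of what compare-and-set semantics mean (a `Mutex` stands in here for the real lock-free machinery; this only illustrates the contract, not the implementation used by `Synchronization::Object`):

```ruby
# Minimal sketch of compare-and-set semantics.
# The write happens if and only if the current value is identical to
# the expected one, and the check and write happen atomically.
class CasField
  def initialize(value = nil)
    @mutex = Mutex.new
    @value = value
  end

  def get
    @mutex.synchronize { @value }
  end

  def compare_and_set(expected, new_value)
    @mutex.synchronize do
      return false unless @value.equal?(expected)
      @value = new_value
      true
    end
  end
end

touched = CasField.new(false)
touched.compare_and_set(false, true) # => true, this caller won the race
touched.compare_and_set(false, true) # => false, value is no longer false
```

Because only one caller can observe the expected value and replace it, such a field can be used to ensure a side effect runs exactly once even under contention.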

// TODO
## Memory model

*Intended for further revision, and extension.*

When writing libraries in `concurrent-ruby` we reason based on the following memory model, which is further extended by the features provided by `Synchronization::Object` (described above).

The memory model is constructed based on our best effort and knowledge of the 3 main Ruby implementations (CRuby, JRuby, Rubinius). When considering a given aspect we always choose the weakest guarantee (e.g. local variable updates are always visible in CRuby but not in JRuby, so the JRuby behavior is picked). If some Ruby behavior is omitted here, it is considered unsafe for use in a parallel environment (reasons may be lack of information, or difficulty of verification).

This takes into account the following implementations:

- CRuby 1.9 - 2.2 (no differences found)
- JRuby 1.7
- JRuby 9 *not examined yet, same behavior as in 1.7 assumed*
- Rubinius 2.5

We are interested in the following behaviors:

- **volatility** - in Java's sense. A written value is immediately visible to all subsequent reads, including all writes leading up to that value.
- **atomicity** - an operation either happens as a whole or not at all.

### Variables

- **Local variables** - atomic assignment, non-volatile.
    - Consequence: a lambda defined on `thread1` and executed on `thread2` may not see updated values in the local variables captured in its closure.
    - Reason: local variables are non-volatile on JRuby and Rubinius.
- **Instance variables** - atomic assignment, non-volatile.
    - Consequence: a different thread may see old values, or a not-fully-initialized object.
    - Reason: instance variables are non-volatile on JRuby and Rubinius.
- **Constants** - atomic assignment, volatile.

Other:

- **Global variables** - we don't use them, omitted (atomic and volatile on JRuby and CRuby; Rubinius unknown).
- **Class variables** - we don't use them, omitted (atomic and volatile on JRuby and CRuby; Rubinius unknown).
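The local-variable consequence above can be sketched in plain Ruby. Since bare captured locals carry no visibility guarantee across threads, the safe pattern is to hand values over through a synchronized structure such as `Queue`, whose internal lock guarantees that everything written before the push is visible to the popping thread:

```ruby
require 'thread'

# Unsafe (illustrative, not run here): one thread mutating a local
# captured in a closure that another thread reads -- on JRuby/Rubinius
# the update may never become visible to the reader.
#
# Safe: hand the value over via Queue; its lock provides the
# visibility that a bare captured local does not.
queue = Queue.new
producer = Thread.new { queue << (1 + 1) } # write published via the queue's lock
value = queue.pop                          # guaranteed to see the pushed value
producer.join
value # => 2
```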

### Assumptions

The following operations are **assumed** to be thread-safe, volatile, and atomic on all implementations:

- Class definition
- Method definition
- Library requirement

It is nevertheless best practice to eagerly load code before entering the parallel part of an application.

### Issues to be aware of

- **Initialization** - Since instance variables are not volatile, and a particular implementation may preinitialize them with nils based on object shapes it has already seen, a second thread obtaining a reference to a newly constructed object may still see the old preinitialized values instead of the values set in the `initialize` method. To fix this, `ensure_ivar_visibility!` can be used, or the object can be safely published through a volatile field.
- **`||=`, `+=` and similar** - are not atomic.
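For example, `@value ||= compute` expands to a separate read and conditional write, so two threads can both observe `nil` and both run `compute`. A minimal sketch of the problem and a mutex-based fix (the `Memo` class and its method names are illustrative, not part of the library):

```ruby
# `@value ||= compute` is really two steps:
#   @value = @value || compute   # read, then conditional write
# Two threads can both see nil between the steps and both call compute.
class Memo
  def initialize
    @mutex = Mutex.new
    @computed = 0 # counts how many times compute ran
  end

  # Not atomic: check-then-act without a lock.
  def unsafe_value
    @value ||= compute
  end

  # Atomic: the whole read-check-write sequence runs under one lock.
  def safe_value
    @mutex.synchronize { @value ||= compute }
  end

  private

  def compute
    @computed += 1
    :result
  end
end

memo = Memo.new
memo.safe_value # => :result
memo.safe_value # => :result, compute ran only once
```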

### Notes/Sources on implementations

- [JRuby wiki page on concurrency](https://github.com/jruby/jruby/wiki/Concurrency-in-jruby)
- [Rubinius page on concurrency](http://rubini.us/doc/en/systems/concurrency/)
- CRuby has a GVL. Any GVL release and acquire uses a lock, which means that all writes done by a releasing thread will be visible to the next acquiring thread. See: <https://github.com/ruby/ruby/blob/ruby_2_2/thread_pthread.c#L101-L107>

38 changes: 24 additions & 14 deletions examples/benchmark_new_futures.rb
@@ -7,42 +7,52 @@
warmup = 2 * scale
warmup *= 10 if Concurrent.on_jruby?

Benchmark.ips(time, warmup) do |x|
x.report('status-old') { f = Concurrent::Promise.execute { nil }; 100.times { f.complete? } }
x.report('status-new') { f = Concurrent.future(:fast) { nil }; 100.times { f.completed? } }
x.compare!
end

Benchmark.ips(time, warmup) do |x|
of = Concurrent::Promise.execute { 1 }
nf = Concurrent.future { 1 }
nf = Concurrent.future(:fast) { 1 }
x.report('value-old') { of.value! }
x.report('value-new') { nf.value! }
x.compare!
end

Benchmark.ips(time, warmup) do |x|
x.report('graph-old') do
head = Concurrent::Promise.execute { 1 }
branch1 = head.then(&:succ)
branch2 = head.then(&:succ).then(&:succ)
Concurrent::Promise.zip(branch1, branch2).then { |(a, b)| a + b }.value!
head = Concurrent::Promise.execute { 1 }
10.times do
branch1 = head.then(&:succ)
branch2 = head.then(&:succ).then(&:succ)
head = Concurrent::Promise.zip(branch1, branch2).then { |a, b| a + b }
end
head.value!
end
x.report('graph-new') do
head = Concurrent.future { 1 }
branch1 = head.then(&:succ)
branch2 = head.then(&:succ).then(&:succ)
(branch1 + branch2).then { |(a, b)| a + b }.value!
head = Concurrent.future(:fast) { 1 }
10.times do
branch1 = head.then(&:succ)
branch2 = head.then(&:succ).then(&:succ)
head = (branch1 & branch2).then { |a, b| a + b }
end
head.value!
end
x.compare!
end

Benchmark.ips(time, warmup) do |x|
x.report('immediate-old') { Concurrent::Promise.execute { nil }.value! }
x.report('immediate-new') { Concurrent.future { nil }.value! }
x.report('immediate-new') { Concurrent.future(:fast) { nil }.value! }
x.compare!
end

Benchmark.ips(time, warmup) do |x|
of = Concurrent::Promise.execute { 1 }
nf = Concurrent.future { 1 }
x.report('then-old') { of.then(&:succ).then(&:succ).value! }
x.report('then-new') { nf.then(&:succ).then(&:succ).value! }
nf = Concurrent.future(:fast) { 1 }
x.report('then-old') { 100.times.reduce(nf) { |nf, _| nf.then(&:succ) }.value! }
x.report('then-new') { 100.times.reduce(nf) { |nf, _| nf.then(&:succ) }.value! }
x.compare!
end

158 changes: 158 additions & 0 deletions examples/edge_futures.in.rb
@@ -0,0 +1,158 @@
### Simple asynchronous task

future = Concurrent.future { sleep 0.1; 1 + 1 } # evaluation starts immediately
future.completed?
# block until evaluated
future.value
future.completed?


### Failing asynchronous task

future = Concurrent.future { raise 'Boom' }
future.value
future.value! rescue $!
future.reason
# re-raising
raise future rescue $!


### Chaining

head = Concurrent.future { 1 } #
branch1 = head.then(&:succ) #
branch2 = head.then(&:succ).then(&:succ) #
branch1.zip(branch2).value
(branch1 & branch2).then { |(a, b)| a + b }.value
# pick only first completed
(branch1 | branch2).value


### Error handling

Concurrent.future { Object.new }.then(&:succ).then(&:succ).rescue { |e| e.class }.value # error propagates
Concurrent.future { Object.new }.then(&:succ).rescue { 1 }.then(&:succ).value
Concurrent.future { 1 }.then(&:succ).rescue { |e| e.message }.then(&:succ).value


### Delay

# will not evaluate until asked by #value or another method requiring completion
future = Concurrent.delay { 'lazy' }
sleep 0.1 #
future.completed?
future.value

# propagates through the chain, allowing whole or partial lazy chains

head = Concurrent.delay { 1 }
branch1 = head.then(&:succ)
branch2 = head.delay.then(&:succ)
join = branch1 & branch2

sleep 0.1 # nothing will complete
[head, branch1, branch2, join].map(&:completed?)

branch1.value
sleep 0.1 # forces only head and branch1 to complete; branch2 stays incomplete
[head, branch1, branch2, join].map(&:completed?)

join.value


### Flattening

Concurrent.future { Concurrent.future { 1 + 1 } }.flat.value # waits for the inner future

# more complicated example
Concurrent.future { Concurrent.future { Concurrent.future { 1 + 1 } } }.
flat(1).
then { |f| f.then(&:succ) }.
flat(1).value


### Schedule

scheduled = Concurrent.schedule(0.1) { 1 }

scheduled.completed?
scheduled.value # available after 0.1sec

# and in chain
scheduled = Concurrent.delay { 1 }.schedule(0.1).then(&:succ)
# will not be scheduled until value is requested
sleep 0.1 #
scheduled.value # returns after another 0.1sec


### Completable Future and Event

future = Concurrent.future
event = Concurrent.event

# will be blocked until completed
t1 = Thread.new { future.value } #
t2 = Thread.new { event.wait } #

future.success 1
future.success 1 rescue $!
future.try_success 2
event.complete

[t1, t2].each(&:join) #


### Callbacks

queue = Queue.new
future = Concurrent.delay { 1 + 1 }

future.on_success { queue << 1 } # evaluated asynchronously
future.on_success! { queue << 2 } # evaluated on completing thread

queue.empty?
future.value
queue.pop
queue.pop


### Thread-pools

Concurrent.future(:fast) { 2 }.then(:io) { File.read __FILE__ }.wait


### Interoperability with actors

actor = Concurrent::Actor::Utils::AdHoc.spawn :square do
-> v { v ** 2 }
end

Concurrent.
future { 2 }.
then_ask(actor).
then { |v| v + 2 }.
value

actor.ask(2).then(&:succ).value


### Common use-case examples

# simple background processing
Concurrent.future { do_stuff }

# parallel background processing
jobs = 10.times.map { |i| Concurrent.future { i } } #
Concurrent.zip(*jobs).value


# periodic task
def schedule_job
Concurrent.schedule(1) { do_stuff }.
rescue { |e| report_error e }.
then { schedule_job }
end

schedule_job

