Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions doc/synchronization.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
`Synchronization` module provides common layer for synchronization. It provides same guaranties independent of any particular Ruby implementation.

*This is a new module, it is expected to fully stabilize for 1.0 release.*

## Synchronization::Object

Provides common parent for all objects which need to be synchronized or be using other synchronization tools. It provides:

- Synchronized block
- Methods for waiting and signaling
- Volatile fields
- Ensure visibility of final fields
- Fields with CAS operations

## Synchronized block

`Synchronization::Object` provides private method `#synchronize(&block)`. For a given object only one Thread can enter one of the blocks synchronized against this object. Object is locked when a thread enters one of the synchronized blocks.

Example of a simple counter which can be used by multiple threads:

```ruby
class SafeCounter < Concurrent::Synchronization::Object
def initialize
super
synchronize { @count = 0 }
end

def increment
synchronize { @count += 1 }
end

def count
synchronize { @count }
end
end
```

### Naming conventions

Methods starting with `ns_` are marking methods that are not using synchronization by themselves, they have to be used inside synchronize block. They are usually used in pairs to separate the synchronization from behavior:

```ruby
def compute
service.report synchronize { ns_compute }
end

private

def ns_compute
ns_compute_reduce ns_compute_map
end
```
where `compute` defines how is it synchronized and `ns_compute` handles the behavior (in this case the computation). `ns_` methods should only call other `ns_` methods or `pr_` methods. They can call normal methods on other objects, but that should be done with care (better to avoid) because the thread escapes this object while the lock is still held, which can lead to deadlock. That's why the `report` method is called in `compute` and not in `ns_compute`.

`pr_` methods are pure functions they can be used in and outside of synchronized blocks.

## Methods for waiting and signaling

Sometimes while already inside the synchronized block some condition is not met. Then the thread needs to wait (releasing the lock) until the condition is met. The waiting thread is then signaled that it can continue.

To fulfill these needs there are private methods:

- `ns_wait` {include:Concurrent::Synchronization::AbstractObject#ns_wait}
- `ns_wait_until` {include:Concurrent::Synchronization::AbstractObject#ns_wait_until}
- `ns_signal` {include:Concurrent::Synchronization::AbstractObject#ns_signal}
- `ns_broadcast` {include:Concurrent::Synchronization::AbstractObject#ns_broadcast}

All methods have to be called inside synchronized block.

## Volatile fields

`Synchronization::Object` can have volatile fields (Java semantic). They are defined by `attr_volatile :field_name`. `attr_volatile` defines reader and writer with the `field_name`. Any write is always immediately visible for any subsequent reads of the same field.

## Ensure visibility of final fields

Instance variables assigned only once in `initialize` method are not guaranteed to be visible to all threads. For that user can call `ensure_ivar_visibility!` method, like in following example taken from `Edge::AbstractPromise` implementation:

```ruby
class AbstractPromise < Synchronization::Object
def initialize(future, *args, &block)
super(*args, &block)
@Future = future
ensure_ivar_visibility!
end
# ...
end
```

### Naming conventions

Instance variables with camel case names are final and never reassigned.

## Fields with CAS operations

They are not supported directly, but AtomicReference can be stored in final field and then CAS operations can be done on it, like in following example taken from `Edge::Event` implementation:

```ruby
class Event < Synchronization::Object
extend FutureShortcuts

def initialize(promise, default_executor = :io)
@Promise = promise
@DefaultExecutor = default_executor
@Touched = AtomicBoolean.new(false)
super()
ensure_ivar_visibility!
end
# ...
def touch
# distribute touch to promise only once
@Promise.touch if @Touched.make_true
self
end
# ...
end
```

## Memory model (sort of)

// TODO

48 changes: 48 additions & 0 deletions examples/benchmark_new_futures.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
require 'benchmark/ips'
require 'concurrent'
require 'concurrent-edge'

scale = 1
time = 10 * scale
warmup = 2 * scale
warmup *= 10 if Concurrent.on_jruby?


Benchmark.ips(time, warmup) do |x|
of = Concurrent::Promise.execute { 1 }
nf = Concurrent.future { 1 }
x.report('value-old') { of.value! }
x.report('value-new') { nf.value! }
x.compare!
end

Benchmark.ips(time, warmup) do |x|
x.report('graph-old') do
head = Concurrent::Promise.execute { 1 }
branch1 = head.then(&:succ)
branch2 = head.then(&:succ).then(&:succ)
Concurrent::Promise.zip(branch1, branch2).then { |(a, b)| a + b }.value!
end
x.report('graph-new') do
head = Concurrent.future { 1 }
branch1 = head.then(&:succ)
branch2 = head.then(&:succ).then(&:succ)
(branch1 + branch2).then { |(a, b)| a + b }.value!
end
x.compare!
end

Benchmark.ips(time, warmup) do |x|
x.report('immediate-old') { Concurrent::Promise.execute { nil }.value! }
x.report('immediate-new') { Concurrent.future { nil }.value! }
x.compare!
end

Benchmark.ips(time, warmup) do |x|
of = Concurrent::Promise.execute { 1 }
nf = Concurrent.future { 1 }
x.report('then-old') { of.then(&:succ).then(&:succ).value! }
x.report('then-new') { nf.then(&:succ).then(&:succ).value! }
x.compare!
end

22 changes: 15 additions & 7 deletions ext/com/concurrent_ruby/ext/SynchronizationLibrary.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.runtime.load.Library;
import org.jruby.runtime.Block;
import org.jruby.runtime.Visibility;
import org.jruby.RubyBoolean;
import org.jruby.RubyNil;
import org.jruby.runtime.ThreadContext;
Expand Down Expand Up @@ -47,19 +48,21 @@ public JavaObject(Ruby runtime, RubyClass metaClass) {
super(runtime, metaClass);
}

@JRubyMethod
public IRubyObject initialize(ThreadContext context) {
return context.nil;
@JRubyMethod(rest = true)
public IRubyObject initialize(ThreadContext context, IRubyObject[] args, Block block) {
synchronized (this) {
return callMethod(context, "ns_initialize", args, block);
}
}

@JRubyMethod(name = "synchronize")
@JRubyMethod(name = "synchronize", visibility = Visibility.PRIVATE)
public IRubyObject rubySynchronize(ThreadContext context, Block block) {
synchronized (this) {
return block.yield(context, null);
}
}

@JRubyMethod(name = "ns_wait", optional = 1)
@JRubyMethod(name = "ns_wait", optional = 1, visibility = Visibility.PRIVATE)
public IRubyObject nsWait(ThreadContext context, IRubyObject[] args) {
Ruby runtime = context.runtime;
if (args.length > 1) {
Expand Down Expand Up @@ -91,16 +94,21 @@ public IRubyObject nsWait(ThreadContext context, IRubyObject[] args) {
return this;
}

@JRubyMethod(name = "ns_signal")
@JRubyMethod(name = "ns_signal", visibility = Visibility.PRIVATE)
public IRubyObject nsSignal(ThreadContext context) {
notify();
return this;
}

@JRubyMethod(name = "ns_broadcast")
@JRubyMethod(name = "ns_broadcast", visibility = Visibility.PRIVATE)
public IRubyObject nsBroadcast(ThreadContext context) {
notifyAll();
return this;
}

@JRubyMethod(name = "ensure_ivar_visibility!", visibility = Visibility.PRIVATE)
public IRubyObject ensureIvarVisibilityBang(ThreadContext context) {
return context.nil;
}
}
}
2 changes: 1 addition & 1 deletion ext/concurrent/extconf.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def create_dummy_makefile
end
end

if defined?(JRUBY_VERSION) || ! Concurrent.allow_c_extensions?
if Concurrent.on_jruby? || ! Concurrent.allow_c_extensions?
create_dummy_makefile
warn 'C optimizations are not supported on this version of Ruby.'
else
Expand Down
2 changes: 1 addition & 1 deletion lib/concurrent/actor/core.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class Core < Synchronization::Object
# any logging system
# @param [Proc] block for class instantiation
def initialize(opts = {}, &block)
super(&nil)
super(&nil) # TODO use ns_initialize
synchronize do
@mailbox = Array.new
@serialized_execution = SerializedExecution.new
Expand Down
13 changes: 5 additions & 8 deletions lib/concurrent/at_exit.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,6 @@ module Concurrent
class AtExitImplementation < Synchronization::Object
include Logging

def initialize(enabled = true)
super()
synchronize do
@handlers = {}
@enabled = enabled
end
end

# Add a handler to be run at `Kernel#at_exit`
# @param [Object] handler_id optionally provide an id, if allready present, handler is replaced
# @yield the handler
Expand Down Expand Up @@ -80,6 +72,11 @@ def run

private

def ns_initialize(enabled = true)
@handlers = {}
@enabled = enabled
end

def runner
run if synchronize { @enabled }
end
Expand Down
9 changes: 7 additions & 2 deletions lib/concurrent/atomic/count_down_latch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ class PureCountDownLatch < Synchronization::Object
#
# @raise [ArgumentError] if `count` is not an integer or is less than zero
def initialize(count = 1)
super()
unless count.is_a?(Fixnum) && count >= 0
raise ArgumentError.new('count must be in integer greater than or equal zero')
end
synchronize { @count = count }
super(count)
end

# @!macro [attach] count_down_latch_method_wait
Expand Down Expand Up @@ -58,6 +57,12 @@ def count_down
def count
synchronize { @count }
end

private

def ns_initialize(count)
@count = count
end
end

if Concurrent.on_jruby?
Expand Down
12 changes: 6 additions & 6 deletions lib/concurrent/atomic/cyclic_barrier.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,10 @@ class CyclicBarrier < Synchronization::Object
#
# @raise [ArgumentError] if `parties` is not an integer or is less than zero
def initialize(parties, &block)
super(&nil)
if !parties.is_a?(Fixnum) || parties < 1
raise ArgumentError.new('count must be in integer greater than or equal zero')
end
synchronize do
@parties = parties
@action = block
ns_next_generation
end
super(parties, &block)
end

# @return [Fixnum] the number of threads needed to pass the barrier
Expand Down Expand Up @@ -101,6 +96,11 @@ def ns_next_generation
@number_waiting = 0
end

def ns_initialize(parties, &block)
@parties = parties
@action = block
ns_next_generation
end

end
end
9 changes: 5 additions & 4 deletions lib/concurrent/atomic/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@ class Event < Synchronization::Object
# `Event` will block.
def initialize
super
synchronize do
@set = false
@iteration = 0
end
end

# Is the object in the set state?
Expand Down Expand Up @@ -83,5 +79,10 @@ def ns_set
end
true
end

def ns_initialize
@set = false
@iteration = 0
end
end
end
2 changes: 1 addition & 1 deletion lib/concurrent/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def self.global_timer_set
end

# General access point to global executors.
# @param [Symbol, Executor] maps symbols:
# @param [Symbol, Executor] executor_identifier symbols:
# - :fast - {Concurrent.global_fast_executor}
# - :io - {Concurrent.global_io_executor}
# - :immediate - {Concurrent.global_immediate_executor}
Expand Down
2 changes: 2 additions & 0 deletions lib/concurrent/delay.rb
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ def initialize(opts = {}, &block)
@computing = false
end

protected :synchronize

# Return the value this object represents after applying the options
# specified by the `#set_deref_options` method. If the delayed operation
# raised an exception this method will return nil. The execption object
Expand Down
Loading