diff --git a/.gitignore b/.gitignore index a291e1f63..9fadacbd5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,4 @@ Gemfile.lock -tests.txt *.gem .rvmrc .ruby-version @@ -21,9 +20,12 @@ coverage .DS_Store TAGS tmtags -*.swo -*.swp +*.sw? .idea .rbx/* -*.py -*.pyc +lib/*.jar +ext/*.bundle +ext/*.so +ext/*.jar +pkg +*.gem diff --git a/Gemfile b/Gemfile index 4d65462ed..be251a5a7 100644 --- a/Gemfile +++ b/Gemfile @@ -2,12 +2,12 @@ source 'https://rubygems.org' gemspec - group :development do gem 'rake', '~> 10.2.2' gem 'countloc', '~> 0.4.0', platforms: :mri gem 'yard', '~> 0.8.7.4' gem 'inch', '~> 0.4.1', platforms: :mri + gem 'rake-compiler', '~> 0.9.2', platforms: [:mri, :mswin, :mingw] end group :testing do diff --git a/Rakefile b/Rakefile index d15de5f71..99371ee0b 100644 --- a/Rakefile +++ b/Rakefile @@ -1,12 +1,9 @@ -$:.push File.join(File.dirname(__FILE__), 'lib') -$:.push File.join(File.dirname(__FILE__), 'tasks/support') - -require 'rubygems' +require 'rake' require 'bundler/gem_tasks' require 'rspec' require 'rspec/core/rake_task' -require 'concurrent' +require_relative 'lib/extension_helper' Bundler::GemHelper.install_tasks @@ -16,8 +13,67 @@ Dir.glob('tasks/**/*.rake').each do|rakefile| load rakefile end +desc "Run benchmarks" +task :bench do + exec "ruby -Ilib -Iext examples/bench_atomic.rb" +end + +if defined?(JRUBY_VERSION) + require 'ant' + + EXTENSION_NAME = 'concurrent_jruby' + + directory "pkg/classes" + + desc "Clean up build artifacts" + task :clean do + rm_rf "pkg/classes" + rm_rf "lib/#{EXTENSION_NAME}.jar" + end + + desc "Compile the extension" + task :compile_java => "pkg/classes" do |t| + ant.javac :srcdir => "ext", :destdir => t.prerequisites.first, + :source => "1.5", :target => "1.5", :debug => true, + :classpath => "${java.class.path}:${sun.boot.class.path}" + end + + desc "Build the jar" + task :jar => :compile_java do + ant.jar :basedir => "pkg/classes", :destfile => "lib/#{EXTENSION_NAME}.jar", :includes => "**/*.class" + end + + task :compile => :jar + +elsif use_c_extensions? + + EXTENSION_NAME = 'concurrent_cruby' + + require 'rake/extensiontask' + + CLEAN.include Rake::FileList['**/*.so', '**/*.bundle', '**/*.o', '**/mkmf.log', '**/Makefile'] + + spec = Gem::Specification.load('concurrent-ruby.gemspec') + Rake::ExtensionTask.new(EXTENSION_NAME, spec) do |ext| + ext.ext_dir = 'ext' + ext.name = EXTENSION_NAME + ext.source_pattern = "**/*.{h,c,cpp}" + end + + desc 'Clean, compile, and build the extension from scratch' + task :compile_c => [ :clean, :compile ] + + task :irb => [:compile] do + sh "irb -r ./lib/#{EXTENSION_NAME}.bundle -I #{File.join(File.dirname(__FILE__), 'lib')}" + end +end + RSpec::Core::RakeTask.new(:travis_spec) do |t| t.rspec_opts = '--tag ~@not_on_travis' end -task :default => [:travis_spec] +if use_c_extensions? + task :default => [:compile_c, :travis_spec] +else + task :default => [:travis_spec] +end diff --git a/concurrent-ruby.gemspec b/concurrent-ruby.gemspec index 81ea1cc79..97294ea1a 100644 --- a/concurrent-ruby.gemspec +++ b/concurrent-ruby.gemspec @@ -25,5 +25,12 @@ Gem::Specification.new do |s| s.extra_rdoc_files = Dir['README*', 'LICENSE*', 'CHANGELOG*'] s.require_paths = ['lib'] + if defined?(JRUBY_VERSION) + s.files = Dir['lib/concurrent_jruby.jar'] + s.platform = 'java' + else + s.extensions = 'ext/extconf.rb' + end + s.required_ruby_version = '>= 1.9.3' end diff --git a/examples/atomic_example.rb b/examples/atomic_example.rb new file mode 100644 index 000000000..5debe298f --- /dev/null +++ b/examples/atomic_example.rb @@ -0,0 +1,12 @@ +require 'concurrent' + +my_atomic = Concurrent::Atomic.new(0) +my_atomic.update {|v| v + 1} +puts "new value: #{my_atomic.value}" + +begin + my_atomic.try_update {|v| v + 1} +rescue Concurrent::Atomic::ConcurrentUpdateError => cue + # deal with it (retry, propagate, etc) +end +puts "new value: #{my_atomic.value}" diff --git a/examples/bench_atomic.rb b/examples/bench_atomic.rb new file mode 100644 index 000000000..ec9fccfa4 --- /dev/null +++ b/examples/bench_atomic.rb @@ -0,0 +1,109 @@ +require 'benchmark' +require 'concurrent' +require 'thread' +Thread.abort_on_exception = true + +$go = false # for synchronizing parallel threads + +# number of updates on the value +N = ARGV[1] ? ARGV[1].to_i : 100_000 + +# number of threads for parallel test +M = ARGV[0] ? ARGV[0].to_i : 100 + + +puts "*** Sequential updates ***" +Benchmark.bm(10) do |x| + value = 0 + x.report "no lock" do + N.times do + value += 1 + end + end + + @lock = Mutex.new + x.report "mutex" do + value = 0 + N.times do + @lock.synchronize do + value += 1 + end + end + end + + @atom = Concurrent::Atomic.new(0) + x.report "atomic" do + N.times do + @atom.update{|x| x += 1} + end + end +end + +def para_setup(num_threads, count, &block) + if num_threads % 2 > 0 + raise ArgumentError, "num_threads must be a multiple of two" + end + raise ArgumentError, "need block" unless block_given? + + # Keep those threads together + tg = ThreadGroup.new + + num_threads.times do |i| + diff = (i % 2 == 0) ? 1 : -1 + + t = Thread.new do + nil until $go + count.times do + yield diff + end + end + + tg.add(t) + end + + # Make sure all threads are started + while tg.list.find{|t| t.status != "run"} + Thread.pass + end + + # For good measure + GC.start + + tg +end + +def para_run(tg) + $go = true + tg.list.each{|t| t.join} + $go = false +end + +puts "*** Parallel updates ***" +Benchmark.bm(10) do |bm| + # This is not secure + value = 0 + tg = para_setup(M, N/M) do |diff| + value += diff + end + bm.report("no lock"){ para_run(tg) } + + + value = 0 + @lock = Mutex.new + tg = para_setup(M, N/M) do |diff| + @lock.synchronize do + value += diff + end + end + bm.report("mutex"){ para_run(tg) } + raise unless value == 0 + + + @atom = Concurrent::Atomic.new(0) + tg = para_setup(M, N/M) do |diff| + @atom.update{|x| x + diff} + end + bm.report("atomic"){ para_run(tg) } + raise unless @atom.value == 0 + +end diff --git a/examples/bench_atomic_1.rb b/examples/bench_atomic_1.rb new file mode 100644 index 000000000..c945e75ea --- /dev/null +++ b/examples/bench_atomic_1.rb @@ -0,0 +1,138 @@ +#!/usr/bin/env ruby + +$: << File.expand_path('../../lib', __FILE__) + +require 'optparse' +require 'thread' +require 'benchmark' + +require 'concurrent' + +Thread.abort_on_exception = true + +$conf = { + :lock => "atomic", + :num_threads => 100, + :count => 100_000, + :count_per_thread => nil, + :slow => nil, +} + +OptionParser.new do |opts| + opts.on("-c", "--count NUM") do |n| + $conf[:count] = n.to_i + end + opts.on("-p", "--count-per-thread") do |n| + $conf[:count_per_thread] = n.to_i + end + opts.on("-t", "--num-threads NUM") do |n| + $conf[:num_threads] = n.to_i + end + opts.on("-s", "--slow NUM") do |n| + $conf[:slow] = n.to_i + end + opts.on("-l", "--lock atomic|mutex") do |x| + $conf[:lock] = x + end + opts.on("-h", "--help"){ puts opts; exit } +end.parse!(ARGV) + +unless $conf[:count_per_thread] + $conf[:count_per_thread] = $conf[:count] / $conf[:num_threads] +end +$conf.delete(:count) + +if $conf[:slow].to_i > 0 + require 'digest/md5' + def slow_down + $conf[:slow].times do |i| + Digest::MD5.hexdigest(i.to_s) + end + end + + ret = [] + 10.times do + m = Benchmark.measure{ slow_down } + ret << m.real + end + + $conf[:slow_time] = [ret.min, ret.max] +else + def slow_down; end +end + +$stderr.puts $conf.inspect + +def para_prepare(&block) + num_threads = $conf[:num_threads] + count = $conf[:count_per_thread] + + if num_threads % 2 > 0 + raise ArgumentError, "num_threads must be a multiple of two" + end + + # Keep those threads together + tg = ThreadGroup.new + + num_threads.times do |i| + diff = (i % 2 == 0) ? 1 : -1 + + t = Thread.new do + nil until $go + count.times do + yield diff + end + end + + tg.add(t) + end + + # Make sure all threads are started + while tg.list.find{|t| t.status != "run"} + Thread.pass + end + + # For good measure + GC.start + + $go = false + + tg +end + + + +$tg = nil +if $conf[:lock] == "atomic" + $atom = Concurrent::Atomic.new(0) + $tg = para_prepare do |diff| + $atom.update do |x| + slow_down + x + diff + end + end +else + $lock = Mutex.new + $value = 0 + $tg = para_prepare do |diff| + $lock.synchronize do + slow_down + $value += diff + end + end +end + + +# Run ! +# +# NOTE: It seems to me that this measurement method +# is sensible to how the system dispatches his resources. +# +# More precise caluclation could be done using +# getrusage's times +ret = Benchmark.measure do + $go = true + $tg.list.each{|t| t.join} + $go = false +end +puts ret.real diff --git a/examples/graph_atomic_bench.rb b/examples/graph_atomic_bench.rb new file mode 100644 index 000000000..abafc6de0 --- /dev/null +++ b/examples/graph_atomic_bench.rb @@ -0,0 +1,69 @@ +#!/usr/bin/env ruby + +require 'optparse' + +conf = { + :vary => "threads", + :lock => "atomic" +} + +OptionParser.new do |opts| + opts.on("-l", "--lock atomic|mutex") do |l| + conf[:lock] = l + end + opts.on("-v", "--vary threads|speed") do |v| + conf[:vary] = v + end + opts.on("-h", "--help"){ puts opts; exit } +end.parse!(ARGV) + +result = File.open("results_#{conf[:lock]}_#{conf[:vary]}.csv", "w") + + +if conf[:vary] == "threads" + # Vary the number of concurrent threads that update the value. + # + # There is a total count of 1mio updates that is distributed + # between the number of threads. + # + # A pair number of threads is used so that even add and odd substract 1. + # This avoid creating instances for Bignum since the number should + # stay in the Fixnum range. + # + (1..100).each do |i| + i = i * 2 + + ret = [] + 10.times do + ret << `ruby ./bench_atomic_1.rb -l #{conf[:lock]} -t #{i}`.to_f + end + + line = ([i] + ret).join(', ') + + puts line + result.puts line + end +elsif conf[:vary] == "speed" + # Varies the execution time of the update block + # by using long calulation (MD5) + # + # NOTE: Thread.pass and sleep() are not usable by the atomic + # lock. It needs to run the whole block without hitting + # another atomic update otherwise it has to retry + # + # The expected result is that the atomic lock's performance + # will hit a certain threshold where it will be worse than mutexes. + # + (1..30).each do |i| + + ret = [] + 10.times do + ret << `ruby ./bench_atomic_1.rb -l #{conf[:lock]} -s #{i}`.to_f + end + + line = ([i] + ret).join(', ') + + puts line + result.puts line + end +end diff --git a/ext/AtomicReferenceService.java b/ext/AtomicReferenceService.java new file mode 100644 index 000000000..649a9f49f --- /dev/null +++ b/ext/AtomicReferenceService.java @@ -0,0 +1,12 @@ +import java.io.IOException; + +import org.jruby.Ruby; +import org.jruby.runtime.load.BasicLibraryService; + +public class AtomicReferenceService implements BasicLibraryService { + public boolean basicLoad(final Ruby runtime) throws IOException { + new com.concurrent_ruby.ext.AtomicReferenceLibrary().load(runtime, false); + return true; + } +} + diff --git a/ext/atomic_reference.c b/ext/atomic_reference.c new file mode 100644 index 000000000..e87fc2556 --- /dev/null +++ b/ext/atomic_reference.c @@ -0,0 +1,78 @@ +#include +#if defined(__sun) +#include +#endif + +#ifdef HAVE_LIBKERN_OSATOMIC_H +#include +#endif + +#include "atomic_reference.h" + +void ir_mark(void *value) { + rb_gc_mark_maybe((VALUE) value); +} + +VALUE ir_alloc(VALUE klass) { + return rb_data_object_alloc(klass, (void *) Qnil, ir_mark, NULL); +} + +VALUE ir_initialize(int argc, VALUE* argv, VALUE self) { + VALUE value = Qnil; + if (rb_scan_args(argc, argv, "01", &value) == 1) { + value = argv[0]; + } + DATA_PTR(self) = (void *) value; + return Qnil; +} + +VALUE ir_get(VALUE self) { + return (VALUE) DATA_PTR(self); +} + +VALUE ir_set(VALUE self, VALUE new_value) { + DATA_PTR(self) = (void *) new_value; + return new_value; +} + +VALUE ir_get_and_set(VALUE self, VALUE new_value) { + VALUE old_value; + old_value = (VALUE) DATA_PTR(self); + DATA_PTR(self) = (void *) new_value; + return old_value; +} + +VALUE ir_compare_and_set(volatile VALUE self, VALUE expect_value, VALUE new_value) { +#if __ENVIRONMENT_MAC_OS_X_VERSION_MIN_REQUIRED__ >= 1050 + if (OSAtomicCompareAndSwap64(expect_value, new_value, &DATA_PTR(self))) { + return Qtrue; + } +#elif defined(__sun) +/* Assuming VALUE is uintptr_t */ +/* Based on the definition of uintptr_t from /usr/include/sys/int_types.h */ +#if defined(_LP64) || defined(_I32LPx) + /* 64-bit: uintptr_t === unsigned long */ + if (atomic_cas_ulong((uintptr_t *) &DATA_PTR(self), expect_value, new_value)) { + return Qtrue; + } +#else + /* 32-bit: uintptr_t === unsigned int */ + if (atomic_cas_uint((uintptr_t *) &DATA_PTR(self), expect_value, new_value)) { + return Qtrue; + } +#endif +#elif defined _MSC_VER && defined _M_AMD64 + if (InterlockedCompareExchange64((LONGLONG*)&DATA_PTR(self), new_value, expect_value)) { + return Qtrue; + } +#elif defined _MSC_VER && defined _M_IX86 + if (InterlockedCompareExchange((LONG*)&DATA_PTR(self), new_value, expect_value)) { + return Qtrue; + } +#else + if (__sync_bool_compare_and_swap(&DATA_PTR(self), expect_value, new_value)) { + return Qtrue; + } +#endif + return Qfalse; +} diff --git a/ext/atomic_reference.h b/ext/atomic_reference.h new file mode 100644 index 000000000..efcb676a3 --- /dev/null +++ b/ext/atomic_reference.h @@ -0,0 +1,12 @@ +#ifndef __ATOMIC_REFERENCE_H__ +#define __ATOMIC_REFERENCE_H__ + +void ir_mark(void*); +VALUE ir_alloc(VALUE); +VALUE ir_initialize(int, VALUE*, VALUE); +VALUE ir_get(VALUE); +VALUE ir_set(VALUE, VALUE); +VALUE ir_get_and_set(VALUE, VALUE); +VALUE ir_compare_and_set(volatile VALUE, VALUE, VALUE); + +#endif diff --git a/ext/com/concurrent_ruby/ext/AtomicReferenceLibrary.java b/ext/com/concurrent_ruby/ext/AtomicReferenceLibrary.java new file mode 100644 index 000000000..2aebb7b98 --- /dev/null +++ b/ext/com/concurrent_ruby/ext/AtomicReferenceLibrary.java @@ -0,0 +1,175 @@ +package com.concurrent_ruby.ext; + +import java.lang.reflect.Field; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import org.jruby.Ruby; +import org.jruby.RubyClass; +import org.jruby.RubyModule; +import org.jruby.RubyNumeric; +import org.jruby.RubyObject; +import org.jruby.anno.JRubyClass; +import org.jruby.anno.JRubyMethod; +import org.jruby.runtime.ObjectAllocator; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; +import org.jruby.runtime.load.Library; + +/** + * This library adds an atomic reference type to JRuby for use in the atomic + * library. We do a native version to avoid the implicit value coercion that + * normally happens through JI. + * + * @author headius + */ +public class AtomicReferenceLibrary implements Library { + public void load(Ruby runtime, boolean wrap) throws IOException { + RubyModule concurrentMod = runtime.defineModule("Concurrent"); + RubyClass atomicCls = concurrentMod.defineClassUnder("Atomic", runtime.getObject(), JRUBYREFERENCE_ALLOCATOR); + try { + sun.misc.Unsafe.class.getMethod("getAndSetObject", Object.class); + atomicCls.setAllocator(JRUBYREFERENCE8_ALLOCATOR); + } catch (Exception e) { + // leave it as Java 6/7 version + } + atomicCls.defineAnnotatedMethods(JRubyReference.class); + } + + private static final ObjectAllocator JRUBYREFERENCE_ALLOCATOR = new ObjectAllocator() { + public IRubyObject allocate(Ruby runtime, RubyClass klazz) { + return new JRubyReference(runtime, klazz); + } + }; + + private static final ObjectAllocator JRUBYREFERENCE8_ALLOCATOR = new ObjectAllocator() { + public IRubyObject allocate(Ruby runtime, RubyClass klazz) { + return new JRubyReference8(runtime, klazz); + } + }; + + @JRubyClass(name="JRubyReference", parent="Object") + public static class JRubyReference extends RubyObject { + volatile IRubyObject reference; + + static final sun.misc.Unsafe UNSAFE; + static final long referenceOffset; + + static { + try { + UNSAFE = UnsafeHolder.U; + Class k = JRubyReference.class; + referenceOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("reference")); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public JRubyReference(Ruby runtime, RubyClass klass) { + super(runtime, klass); + } + + @JRubyMethod + public IRubyObject initialize(ThreadContext context) { + UNSAFE.putObject(this, referenceOffset, context.nil); + return context.nil; + } + + @JRubyMethod + public IRubyObject initialize(ThreadContext context, IRubyObject value) { + UNSAFE.putObject(this, referenceOffset, value); + return context.nil; + } + + @JRubyMethod(name = {"get", "value"}) + public IRubyObject get() { + return reference; + } + + @JRubyMethod(name = {"set", "value="}) + public IRubyObject set(IRubyObject newValue) { + UNSAFE.putObjectVolatile(this, referenceOffset, newValue); + return newValue; + } + + @JRubyMethod(name = {"compare_and_set", "compare_and_swap"}) + public IRubyObject compare_and_set(ThreadContext context, IRubyObject expectedValue, IRubyObject newValue) { + Ruby runtime = context.runtime; + + if (expectedValue instanceof RubyNumeric) { + // numerics are not always idempotent in Ruby, so we need to do slower logic + return compareAndSetNumeric(context, expectedValue, newValue); + } + + return runtime.newBoolean(UNSAFE.compareAndSwapObject(this, referenceOffset, expectedValue, newValue)); + } + + @JRubyMethod(name = {"get_and_set", "swap"}) + public IRubyObject get_and_set(ThreadContext context, IRubyObject newValue) { + // less-efficient version for Java 6 and 7 + while (true) { + IRubyObject oldValue = get(); + if (UNSAFE.compareAndSwapObject(this, referenceOffset, oldValue, newValue)) { + return oldValue; + } + } + } + + private IRubyObject compareAndSetNumeric(ThreadContext context, IRubyObject expectedValue, IRubyObject newValue) { + Ruby runtime = context.runtime; + + // loop until: + // * reference CAS would succeed for same-valued objects + // * current and expected have different values as determined by #equals + while (true) { + IRubyObject current = reference; + + if (!(current instanceof RubyNumeric)) { + // old value is not numeric, CAS fails + return runtime.getFalse(); + } + + RubyNumeric currentNumber = (RubyNumeric)current; + if (!currentNumber.equals(expectedValue)) { + // current number does not equal expected, fail CAS + return runtime.getFalse(); + } + + // check that current has not changed, or else allow loop to repeat + boolean success = UNSAFE.compareAndSwapObject(this, referenceOffset, current, newValue); + if (success) { + // value is same and did not change in interim...success + return runtime.getTrue(); + } + } + } + } + + private static final class UnsafeHolder { + private UnsafeHolder(){} + + public static final sun.misc.Unsafe U = loadUnsafe(); + + private static sun.misc.Unsafe loadUnsafe() { + try { + Class unsafeClass = Class.forName("sun.misc.Unsafe"); + Field f = unsafeClass.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (sun.misc.Unsafe) f.get(null); + } catch (Exception e) { + return null; + } + } + } + + public static class JRubyReference8 extends JRubyReference { + public JRubyReference8(Ruby runtime, RubyClass klass) { + super(runtime, klass); + } + + @Override + public IRubyObject get_and_set(ThreadContext context, IRubyObject newValue) { + // efficient version for Java 8 + return (IRubyObject)UNSAFE.getAndSetObject(this, referenceOffset, newValue); + } + } +} diff --git a/ext/extconf.rb b/ext/extconf.rb new file mode 100644 index 000000000..fd16ebe84 --- /dev/null +++ b/ext/extconf.rb @@ -0,0 +1,45 @@ +require_relative '../lib/extension_helper' + +if defined?(JRUBY_VERSION) + puts 'JRuby detected. Pure Java optimizations will be used.' +elsif ! use_c_extensions? + puts 'C optimizations are only supported on MRI 2.0 and above.' +else + + require 'mkmf' + + extension_name = 'concurrent_cruby' + dir_config(extension_name) + + have_header "libkern/OSAtomic.h" + + def compiler_is_gcc + if CONFIG["GCC"] && CONFIG["GCC"] != "" + return true + elsif ( # This could stand to be more generic... but I am afraid. + CONFIG["CC"] =~ /\bgcc\b/ + ) + return true + end + return false + end + + if compiler_is_gcc + case CONFIG["arch"] + when /mswin32|mingw|solaris/ + $CFLAGS += " -march=native" + when 'i686-linux' + $CFLAGS += " -march=i686" + end + end + + try_run(< + +#include "atomic_reference.h" + +// module and class definitions + +static VALUE rb_mConcurrent; +static VALUE rb_cAtomic; + +// Init_concurrent_cruby + +void Init_concurrent_cruby() { + + // define modules and classes + rb_mConcurrent = rb_define_module("Concurrent"); + rb_cAtomic = rb_define_class_under(rb_mConcurrent, "Atomic", rb_cObject); + + // CAtomic + rb_define_alloc_func(rb_cAtomic, ir_alloc); + rb_define_method(rb_cAtomic, "initialize", ir_initialize, -1); + rb_define_method(rb_cAtomic, "get", ir_get, 0); + rb_define_method(rb_cAtomic, "value", ir_get, 0); + rb_define_method(rb_cAtomic, "set", ir_set, 1); + rb_define_method(rb_cAtomic, "value=", ir_set, 1); + rb_define_method(rb_cAtomic, "get_and_set", ir_get_and_set, 1); + rb_define_method(rb_cAtomic, "swap", ir_get_and_set, 1); + rb_define_method(rb_cAtomic, "compare_and_set", ir_compare_and_set, 2); + rb_define_method(rb_cAtomic, "compare_and_swap", ir_compare_and_set, 2); +} diff --git a/lib/concurrent.rb b/lib/concurrent.rb index 15302c436..58c06ff74 100644 --- a/lib/concurrent.rb +++ b/lib/concurrent.rb @@ -1,4 +1,5 @@ require 'concurrent/version' + require 'concurrent/configuration' require 'concurrent/atomics' @@ -8,6 +9,7 @@ require 'concurrent/executors' require 'concurrent/utilities' +require 'concurrent/atomic' require 'concurrent/agent' require 'concurrent/async' require 'concurrent/dataflow' diff --git a/lib/concurrent/atomic.rb b/lib/concurrent/atomic.rb new file mode 100644 index 000000000..0013c68ce --- /dev/null +++ b/lib/concurrent/atomic.rb @@ -0,0 +1,13 @@ +begin + # force fallback impl with FORCE_ATOMIC_FALLBACK=1 + if /[^0fF]/ =~ ENV['FORCE_ATOMIC_FALLBACK'] + ruby_engine = 'fallback' + else + ruby_engine = defined?(RUBY_ENGINE)? RUBY_ENGINE : 'ruby' + end + + require "concurrent/atomic_reference/#{ruby_engine}" +rescue LoadError + warn "#{__FILE__}:#{__LINE__}: unsupported Ruby engine `#{RUBY_ENGINE}', using less-efficient Atomic impl" + require 'concurrent/atomic_reference/fallback' +end diff --git a/lib/concurrent/atomic/atomic.rb b/lib/concurrent/atomic/atomic.rb deleted file mode 100644 index e328da8b8..000000000 --- a/lib/concurrent/atomic/atomic.rb +++ /dev/null @@ -1,48 +0,0 @@ -module Concurrent - - class MutexAtomic - - def initialize(init = nil) - @value = init - @mutex = Mutex.new - end - - def value - @mutex.lock - @value - ensure - @mutex.unlock - end - - def value=(value) - @mutex.lock - @value = value - ensure - @mutex.unlock - end - - def modify - @mutex.lock - result = yield @value - @value = result - ensure - @mutex.unlock - end - - def compare_and_set(expect, update) - @mutex.lock - if @value == expect - @value = update - true - else - false - end - ensure - @mutex.unlock - end - end - - class Atomic < MutexAtomic - end - -end diff --git a/lib/concurrent/atomic_reference/concurrent_update_error.rb b/lib/concurrent/atomic_reference/concurrent_update_error.rb new file mode 100644 index 000000000..c1b22c24e --- /dev/null +++ b/lib/concurrent/atomic_reference/concurrent_update_error.rb @@ -0,0 +1,9 @@ +module Concurrent + + class Atomic + class ConcurrentUpdateError < ThreadError + # frozen pre-allocated backtrace to speed ConcurrentUpdateError + CONC_UP_ERR_BACKTRACE = ['backtrace elided; set verbose to enable'].freeze + end + end +end diff --git a/lib/concurrent/atomic_reference/delegated_update.rb b/lib/concurrent/atomic_reference/delegated_update.rb new file mode 100644 index 000000000..96771c502 --- /dev/null +++ b/lib/concurrent/atomic_reference/delegated_update.rb @@ -0,0 +1,28 @@ +require 'concurrent/atomic_reference/concurrent_update_error' + +module Concurrent + + # Define update methods that delegate to @ref field + class Atomic + # Pass the current value to the given block, replacing it + # with the block's result. May retry if the value changes + # during the block's execution. + def update + true until @ref.compare_and_set(old_value = @ref.get, new_value = yield(old_value)) + new_value + end + + def try_update + old_value = @ref.get + new_value = yield old_value + unless @ref.compare_and_set(old_value, new_value) + if $VERBOSE + raise ConcurrentUpdateError, "Update failed" + else + raise ConcurrentUpdateError, "Update failed", ConcurrentUpdateError::CONC_UP_ERR_BACKTRACE + end + end + new_value + end + end +end diff --git a/lib/concurrent/atomic_reference/direct_update.rb b/lib/concurrent/atomic_reference/direct_update.rb new file mode 100644 index 000000000..80408dab2 --- /dev/null +++ b/lib/concurrent/atomic_reference/direct_update.rb @@ -0,0 +1,28 @@ +require 'concurrent/atomic_reference/concurrent_update_error' + +module Concurrent + + # Define update methods that use direct paths + class Atomic + # Pass the current value to the given block, replacing it + # with the block's result. May retry if the value changes + # during the block's execution. + def update + true until compare_and_set(old_value = get, new_value = yield(old_value)) + new_value + end + + def try_update + old_value = get + new_value = yield old_value + unless compare_and_set(old_value, new_value) + if $VERBOSE + raise ConcurrentUpdateError, "Update failed" + else + raise ConcurrentUpdateError, "Update failed", ConcurrentUpdateError::CONC_UP_ERR_BACKTRACE + end + end + new_value + end + end +end diff --git a/lib/concurrent/atomic_reference/fallback.rb b/lib/concurrent/atomic_reference/fallback.rb new file mode 100644 index 000000000..74809566d --- /dev/null +++ b/lib/concurrent/atomic_reference/fallback.rb @@ -0,0 +1,45 @@ +require 'thread' +require 'concurrent/atomic_reference/direct_update' + +module Concurrent + + # Portable/generic (but not very memory or scheduling-efficient) fallback + class Atomic #:nodoc: all + def initialize(value = nil) + @mutex = Mutex.new + @value = value + end + + def get + @mutex.synchronize { @value } + end + alias value get + + def set(new_value) + @mutex.synchronize { @value = new_value } + end + alias value= set + + def get_and_set(new_value) + @mutex.synchronize do + old_value = @value + @value = new_value + old_value + end + end + alias swap get_and_set + + def compare_and_set(old_value, new_value) + return false unless @mutex.try_lock + begin + return false unless @value.equal? old_value + @value = new_value + ensure + @mutex.unlock + end + true + end + + require 'concurrent/atomic_reference/numeric_cas_wrapper' + end +end diff --git a/lib/concurrent/atomic_reference/jruby.rb b/lib/concurrent/atomic_reference/jruby.rb new file mode 100644 index 000000000..095104f7f --- /dev/null +++ b/lib/concurrent/atomic_reference/jruby.rb @@ -0,0 +1,2 @@ +require 'concurrent_jruby' +require 'concurrent/atomic_reference/direct_update' diff --git a/lib/concurrent/atomic_reference/numeric_cas_wrapper.rb b/lib/concurrent/atomic_reference/numeric_cas_wrapper.rb new file mode 100644 index 000000000..f313688df --- /dev/null +++ b/lib/concurrent/atomic_reference/numeric_cas_wrapper.rb @@ -0,0 +1,23 @@ +module Concurrent + + class Atomic + alias _compare_and_set compare_and_set + def compare_and_set(expected, new) + if expected.kind_of? Numeric + while true + old = get + + return false unless old.kind_of? Numeric + + return false unless old == expected + + result = _compare_and_set(old, new) + return result if result + end + else + _compare_and_set(expected, new) + end + end + alias compare_and_swap compare_and_set + end +end diff --git a/lib/concurrent/atomic_reference/rbx.rb b/lib/concurrent/atomic_reference/rbx.rb new file mode 100644 index 000000000..c90ec45cb --- /dev/null +++ b/lib/concurrent/atomic_reference/rbx.rb @@ -0,0 +1,12 @@ +module Concurrent + + # extend Rubinius's version adding aliases and numeric logic + class Atomic < Rubinius::AtomicReference + alias value get + alias value= set + alias swap get_and_set + end + + require 'concurrent/atomic_reference/direct_update' + require 'concurrent/atomic_reference/numeric_cas_wrapper' +end diff --git a/lib/concurrent/atomic_reference/ruby.rb b/lib/concurrent/atomic_reference/ruby.rb new file mode 100644 index 000000000..f62d537ca --- /dev/null +++ b/lib/concurrent/atomic_reference/ruby.rb @@ -0,0 +1,3 @@ +require 'concurrent_cruby' +require 'concurrent/atomic_reference/direct_update' +require 'concurrent/atomic_reference/numeric_cas_wrapper' diff --git a/lib/concurrent/atomics.rb b/lib/concurrent/atomics.rb index 31e11ff0e..b3b0d3985 100644 --- a/lib/concurrent/atomics.rb +++ b/lib/concurrent/atomics.rb @@ -1,4 +1,4 @@ -require 'concurrent/atomic/atomic' +require 'concurrent/atomic' require 'concurrent/atomic/atomic_boolean' require 'concurrent/atomic/atomic_fixnum' require 'concurrent/atomic/condition' diff --git a/lib/concurrent_cruby.bundle b/lib/concurrent_cruby.bundle new file mode 100755 index 000000000..babb2f391 Binary files /dev/null and b/lib/concurrent_cruby.bundle differ diff --git a/lib/extension_helper.rb b/lib/extension_helper.rb new file mode 100644 index 000000000..82e0725c1 --- /dev/null +++ b/lib/extension_helper.rb @@ -0,0 +1,7 @@ +require 'rbconfig' + +def use_c_extensions? + host_os = RbConfig::CONFIG['host_os'] + ruby_name = RbConfig::CONFIG['ruby_install_name'] + (ruby_name =~ /^ruby$/i || host_os =~ /mswin32/i || host_os =~ /mingw32/i) && RUBY_VERSION >= '2.0' +end diff --git a/spec/concurrent/atomic/atomic_spec.rb b/spec/concurrent/atomic/atomic_spec.rb deleted file mode 100644 index a22062a15..000000000 --- a/spec/concurrent/atomic/atomic_spec.rb +++ /dev/null @@ -1,133 +0,0 @@ -require 'spec_helper' - -share_examples_for :atomic do - - context 'construction' do - - it 'sets the initial value' do - described_class.new(:foo).value.should eq :foo - end - - it 'defaults the initial value to nil' do - described_class.new.value.should eq nil - end - end - - context '#value' do - - it 'returns the current value' do - counter = described_class.new(:foo) - counter.value.should eq :foo - end - end - - context '#value=' do - - it 'sets the #value to the given object' do - atomic = described_class.new(:foo) - atomic.value = :bar - atomic.value.should eq :bar - end - - it 'returns the new value' do - atomic = described_class.new(:foo) - (atomic.value = :bar).should eq :bar - end - end - - context '#modify' do - - it 'yields the current value' do - atomic = described_class.new(:foo) - current = [] - atomic.modify { |value| current << value } - current.should eq [:foo] - end - - it 'stores the value returned from the yield' do - atomic = described_class.new(:foo) - atomic.modify { |value| :bar } - atomic.value.should eq :bar - end - - it 'returns the new value' do - atomic = described_class.new(:foo) - atomic.modify{ |value| :bar }.should eq :bar - end - end - - context '#compare_and_set' do - - it 'returns false if the value is not found' do - described_class.new(:foo).compare_and_set(:bar, :foo).should eq false - end - - it 'returns true if the value is found' do - described_class.new(:foo).compare_and_set(:foo, :bar).should eq true - end - - it 'sets if the value is found' do - f = described_class.new(:foo) - f.compare_and_set(:foo, :bar) - f.value.should eq :bar - end - - it 'does not set if the value is not found' do - f = described_class.new(:foo) - f.compare_and_set(:bar, :baz) - f.value.should eq :foo - end - end -end - -module Concurrent - - describe MutexAtomic do - - it_should_behave_like :atomic - - specify 'construction is synchronized' do - mutex = double('mutex') - Mutex.should_receive(:new).once.with(no_args).and_return(mutex) - described_class.new - end - - specify 'value is synchronized' do - mutex = double('mutex') - Mutex.stub(:new).with(no_args).and_return(mutex) - mutex.should_receive(:lock) - mutex.should_receive(:unlock) - described_class.new.value - end - - specify 'value= is synchronized' do - mutex = double('mutex') - Mutex.stub(:new).with(no_args).and_return(mutex) - mutex.should_receive(:lock) - mutex.should_receive(:unlock) - described_class.new.value = 10 - end - - specify 'modify is synchronized' do - mutex = double('mutex') - Mutex.stub(:new).with(no_args).and_return(mutex) - mutex.should_receive(:lock) - mutex.should_receive(:unlock) - described_class.new(:foo).modify { |value| value } - end - - specify 'compare_and_set is synchronized' do - mutex = double('mutex') - Mutex.stub(:new).with(no_args).and_return(mutex) - mutex.should_receive(:lock) - mutex.should_receive(:unlock) - described_class.new(14).compare_and_set(14, 2) - end - end - - describe Atomic do - it 'inherits from MutexAtomic' do - Atomic.ancestors.should include(MutexAtomic) - end - end -end diff --git a/spec/concurrent/atomic_spec.rb b/spec/concurrent/atomic_spec.rb new file mode 100644 index 000000000..6de0ff4af --- /dev/null +++ b/spec/concurrent/atomic_spec.rb @@ -0,0 +1,131 @@ +require 'spec_helper' + +module Concurrent + + describe Atomic do + + specify :test_construct do + atomic = Atomic.new + atomic.value.should be_nil + + atomic = Atomic.new(0) + atomic.value.should eq 0 + end + + specify :test_value do + atomic = Atomic.new(0) + atomic.value = 1 + + atomic.value.should eq 1 + end + + specify :test_update do + # use a number outside JRuby's fixnum cache range, to ensure identity is preserved + atomic = Atomic.new(1000) + res = atomic.update {|v| v + 1} + + atomic.value.should eq 1001 + res.should eq 1001 + end + + specify :test_try_update do + # use a number outside JRuby's fixnum cache range, to ensure identity is preserved + atomic = Atomic.new(1000) + res = atomic.try_update {|v| v + 1} + + atomic.value.should eq 1001 + res.should eq 1001 + end + + specify :test_swap do + atomic = Atomic.new(1000) + res = atomic.swap(1001) + + atomic.value.should eq 1001 + res.should eq 1000 + end + + specify :test_try_update_fails do + # use a number outside JRuby's fixnum cache range, to ensure identity is preserved + atomic = Atomic.new(1000) + expect { + # assigning within block exploits implementation detail for test + atomic.try_update{|v| atomic.value = 1001 ; v + 1} + }.to raise_error(Concurrent::Atomic::ConcurrentUpdateError) + end + + specify :test_update_retries do + tries = 0 + # use a number outside JRuby's fixnum cache range, to ensure identity is preserved + atomic = Atomic.new(1000) + # assigning within block exploits implementation detail for test + atomic.update{|v| tries += 1 ; atomic.value = 1001 ; v + 1} + + tries.should eq 2 + end + + specify :test_numeric_cas do + atomic = Atomic.new(0) + + # 9-bit idempotent Fixnum (JRuby) + max_8 = 2**256 - 1 + min_8 = -(2**256) + + atomic.set(max_8) + max_8.upto(max_8 + 2) do |i| + atomic.compare_and_swap(i, i+1).should be_true, "CAS failed for numeric #{i} => #{i + 1}" + end + + atomic.set(min_8) + min_8.downto(min_8 - 2) do |i| + atomic.compare_and_swap(i, i-1).should be_true, "CAS failed for numeric #{i} => #{i - 1}" + end + + # 64-bit idempotent Fixnum (MRI, Rubinius) + max_64 = 2**62 - 1 + min_64 = -(2**62) + + atomic.set(max_64) + max_64.upto(max_64 + 2) do |i| + atomic.compare_and_swap(i, i+1).should be_true, "CAS failed for numeric #{i} => #{i + 1}" + end + + atomic.set(min_64) + min_64.downto(min_64 - 2) do |i| + atomic.compare_and_swap(i, i-1).should be_true, "CAS failed for numeric #{i} => #{i - 1}" + end + + ## 64-bit overflow into Bignum (JRuby) + max_64 = 2**63 - 1 + min_64 = (-2**63) + + atomic.set(max_64) + max_64.upto(max_64 + 2) do |i| + atomic.compare_and_swap(i, i+1).should be_true, "CAS failed for numeric #{i} => #{i + 1}" + end + + atomic.set(min_64) + min_64.downto(min_64 - 2) do |i| + atomic.compare_and_swap(i, i-1).should be_true, "CAS failed for numeric #{i} => #{i - 1}" + end + + # non-idempotent Float (JRuby, Rubinius, MRI < 2.0.0 or 32-bit) + atomic.set(1.0 + 0.1) + atomic.compare_and_set(1.0 + 0.1, 1.2).should be_true, "CAS failed for #{1.0 + 0.1} => 1.2" + + # Bignum + atomic.set(2**100) + atomic.compare_and_set(2**100, 0).should be_true, "CAS failed for #{2**100} => 0" + + # Rational + require 'rational' unless ''.respond_to? :to_r + atomic.set(Rational(1,3)) + atomic.compare_and_set(Rational(1,3), 0).should be_true, "CAS failed for #{Rational(1,3)} => 0" + + # Complex + require 'complex' unless ''.respond_to? :to_c + atomic.set(Complex(1,2)) + atomic.compare_and_set(Complex(1,2), 0).should be_true, "CAS failed for #{Complex(1,2)} => 0" + end + end +end diff --git a/tasks/metrics.rake b/tasks/metrics.rake index 661d06ba6..dfa81d4bd 100644 --- a/tasks/metrics.rake +++ b/tasks/metrics.rake @@ -1,4 +1,7 @@ -desc 'Display LOC (lines of code) report' -task :loc do - puts `countloc -r lib` +unless defined?(JRUBY_VERSION) + + desc 'Display LOC (lines of code) report' + task :loc do + puts `countloc -r lib` + end end