diff --git a/CHANGELOG.md b/CHANGELOG.md index d9aceecf9..722139aaa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -### Next Release v0.8.1 (TBD) +### Next Release v0.9.0 (Target Date: 5 April 2015) * Pure Java implementations of - `AtomicBoolean` @@ -7,7 +7,14 @@ * Fixed bug when pruning Ruby thread pools * Fixed bug in time calculations within `ScheduledTask` * Default `count` in `CountDownLatch` to 1 -* Use monotonic clock for timeouts on all platforms where supported +* Use monotonic clock for all timers via `Concurrent.monotonic_time` + - Use `Process.clock_gettime(Process::CLOCK_MONOTONIC)` when available + - Fallback to `java.lang.System.nanoTime()` on unsupported JRuby versions + - Pure Ruby implementation for everything else + - Effects `Concurrent.timer`, `Concurrent.timeout`, `TimerSet`, `TimerTask`, and `ScheduledTask` +* Deprecated all clock-time based timer scheduling + - Only support scheduling by delay + - Effects `Concurrent.timer`, `TimerSet`, and `ScheduledTask` ## Current Release v0.8.0 (25 January 2015) diff --git a/doc/scheduled_task.md b/doc/scheduled_task.md deleted file mode 100644 index eb2809f6f..000000000 --- a/doc/scheduled_task.md +++ /dev/null @@ -1,148 +0,0 @@ -`ScheduledTask` is a close relative of `Concurrent::Future` but with one important difference. A `Future` is set to execute as soon as possible whereas a `ScheduledTask` is set to execute at a specific time. This implementation is loosely based on Java's [ScheduledExecutorService](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ScheduledExecutorService.html). - -### Example - -```ruby -require 'concurrent' -require 'thread' # for Queue -require 'open-uri' # for open(uri) - -class Ticker - def get_year_end_closing(symbol, year) - uri = "http://ichart.finance.yahoo.com/table.csv?s=#{symbol}&a=11&b=01&c=#{year}&d=11&e=31&f=#{year}&g=m" - data = open(uri) {|f| f.collect{|line| line.strip } } - data[1].split(',')[4].to_f - end -end - -# Future -price = Concurrent::Future.execute{ Ticker.new.get_year_end_closing('TWTR', 2013) } -price.state #=> :pending -sleep(1) # do other stuff -price.value #=> 63.65 -price.state #=> :fulfilled - -# ScheduledTask -task = Concurrent::ScheduledTask.execute(2){ Ticker.new.get_year_end_closing('INTC', 2013) } -task.state #=> :pending -sleep(3) # do other stuff -task.value #=> 25.96 -``` - - -### Scheduling - -The *intended* schedule time of task execution is set on object construction with first argument. The time can be a numeric (floating point or integer) representing a number of seconds in the future or it can ba a `Time` object representing the approximate time of execution. Any other value, a numeric equal to or less than zero, or a time in the past will result in an exception. - -The *actual* schedule time of task execution is set when the `execute` method is called. If the *intended* schedule time was given as a number of seconds then the *actual* schedule time will be calculated from the current time. If the *intended* schedule time was given as a `Time` object the current time will be checked against the *intended* schedule time. If the *intended* schedule time is now in the past an exception will be raised. - -The constructor can also be given zero or more processing options. Currently the only supported options are those recognized by the [Dereferenceable](Dereferenceable) module. - -The final constructor argument is a block representing the task to be performed at the scheduled time. If no block is given an `ArgumentError` will be raised. - -#### States - -`ScheduledTask` mixes in the [Obligation](Obligation) module thus giving it "future" behavior. This includes the expected lifecycle states. `ScheduledTask` has one additional state, however. While the task (block) is being executed the state of the object will be `:in_progress`. This additional state is necessary because it has implications for task cancellation. - -#### Cancellation - -A `:pending` task can be cancelled using the `#cancel` method. A task in any other state, including `:in_progress`, cannot be cancelled. The `#cancel` method returns a boolean indicating the success of the cancellation attempt. A cancelled `ScheduledTask` cannot be restarted. It is immutable. - -### Obligation and Observation - -The result of a `ScheduledTask` can be obtained either synchronously or asynchronously. `ScheduledTask` mixes in both the [Obligation](Obligation) module and the [Observable](http://ruby-doc.org/stdlib-2.0/libdoc/observer/rdoc/Observable.html) module from the Ruby standard library. With one exception `ScheduledTask` behaves identically to [Future](Observable) with regard to these modules. - -Unlike `Future`, however, an observer added to a `ScheduledTask` *after* the task operation has completed will *not* receive notification. The reason for this is the subtle but important difference in intent between the two abstractions. With a `Future` there is no way to know when the operation will complete. Therefore the *expected* behavior of an observer is to be notified. With a `ScheduledTask` however, the approximate time of execution is known. It is often explicitly set as a constructor argument. It is always available via the `#schedule_time` attribute reader. Therefore it is always possible for calling code to know whether the observer is being added prior to task execution. It is also easy to add an observer long before task execution begins (since there is never a reason to create a scheduled task that starts immediately). Subsequently, the *expectation* is that the caller of `#add_observer` is making the call within an appropriate time. - -### Examples - -Successful task execution using seconds for scheduling: - -```ruby -require 'concurrent' - -task = Concurrent::ScheduledTask.new(2){ 'What does the fox say?' } -task.state #=> :unscheduled -task.schedule_time #=> nil -task.execute -task.state #=> pending -task.schedule_time #=> 2013-11-07 12:20:07 -0500 - -# wait for it... -sleep(3) - -task.unscheduled? #=> false -task.pending? #=> false -task.fulfilled? #=> true -task.rejected? #=> false -task.value #=> 'What does the fox say?' -``` - -A `ScheduledTask` can be created and executed in one line: - -```ruby -task = Concurrent::ScheduledTask.new(2){ 'What does the fox say?' }.execute -task.state #=> pending -task.schedule_time #=> 2013-11-07 12:20:07 -0500 -``` - -Failed task execution using a `Time` object for scheduling: - -```ruby -t = Time.now + 2 -task = Concurrent::ScheduledTask.execute(t){ raise StandardError.new('Call me maybe?') } -task.pending? #=> true -task.schedule_time #=> 2013-11-07 12:22:01 -0500 - -# wait for it... -sleep(3) - -task.unscheduled? #=> false -task.pending? #=> false -task.fulfilled? #=> false -task.rejected? #=> true -task.value #=> nil -task.reason #=> # -``` - -An exception will be thrown on creation if the schedule time is in the past: - -```ruby -task = Concurrent::ScheduledTask.new(Time.now - 10){ nil } - #=> ArgumentError: schedule time must be in the future - -task = Concurrent::ScheduledTask.execute(-10){ nil } - #=> ArgumentError: seconds must be greater than zero -``` - -An exception will also be thrown when `#execute` is called if the current time has -progressed past the intended schedule time: - -```ruby -task = Concurrent::ScheduledTask.new(Time.now + 10){ nil } -sleep(20) - -task.execute - #=> ArgumentError: schedule time must be in the future -``` - -Task execution with observation: - -```ruby -observer = Class.new{ - def update(time, value, reason) - puts "The task completed at #{time} with value '#{value}'" - end -}.new - -task = Concurrent::ScheduledTask.new(2){ 'What does the fox say?' } -task.add_observer(observer) -task.execute -task.pending? #=> true -task.schedule_time #=> 2013-11-07 12:20:07 -0500 - -# wait for it... -sleep(3) - -#>> The task completed at 2013-11-07 12:26:09 -0500 with value 'What does the fox say?' -``` diff --git a/lib/concurrent.rb b/lib/concurrent.rb index 3b70829f6..ba7ddc1d4 100644 --- a/lib/concurrent.rb +++ b/lib/concurrent.rb @@ -29,6 +29,24 @@ require 'concurrent/timer_task' require 'concurrent/tvar' +# @!macro [new] monotonic_clock_warning +# +# @note Time calculations one all platforms and languages are sensitive to +# changes to the system clock. To alleviate the potential problems +# associated with changing the system clock while an application is running, +# most modern operating systems provide a monotonic clock that operates +# independently of the system clock. A monotonic clock cannot be used to +# determine human-friendly clock times. A monotonic clock is used exclusively +# for calculating time intervals. Not all Ruby platforms provide access to an +# operating system monotonic clock. On these platforms a pure-Ruby monotonic +# clock will be used as a fallback. An operating system monotonic clock is both +# faster and more reliable than the pure-Ruby implementation. The pure-Ruby +# implementation should be fast and reliable enough for most non-realtime +# operations. At this time the common Ruby platforms that provide access to an +# operating system monotonic clock are MRI 2.1 and above and JRuby (all versions). +# +# @see http://linux.die.net/man/3/clock_gettime Linux clock_gettime(3) + # Modern concurrency tools for Ruby. Inspired by Erlang, Clojure, Scala, Haskell, # F#, C#, Java, and classic concurrency patterns. # diff --git a/lib/concurrent/atomic/condition.rb b/lib/concurrent/atomic/condition.rb index 17e3ac781..2cba62458 100644 --- a/lib/concurrent/atomic/condition.rb +++ b/lib/concurrent/atomic/condition.rb @@ -1,3 +1,5 @@ +require 'concurrent/utility/monotonic_time' + module Concurrent # Condition is a better implementation of standard Ruby ConditionVariable. The @@ -41,14 +43,16 @@ def initialize # @param [Mutex] mutex the locked mutex guarding the wait # @param [Object] timeout nil means no timeout # @return [Result] + # + # @!macro monotonic_clock_warning def wait(mutex, timeout = nil) - start_time = clock_time + start_time = Concurrent.monotonic_time @condition.wait(mutex, timeout) if timeout.nil? Result.new(nil) else - Result.new(start_time + timeout - clock_time) + Result.new(start_time + timeout - Concurrent.monotonic_time) end end @@ -65,21 +69,5 @@ def broadcast @condition.broadcast true end - - private - - if defined?(Process::CLOCK_MONOTONIC) - def clock_time - Process.clock_gettime Process::CLOCK_MONOTONIC - end - elsif RUBY_PLATFORM == 'java' - def clock_time - java.lang.System.nanoTime() / 1_000_000_000.0 - end - else - def clock_time - Time.now.to_f - end - end end end diff --git a/lib/concurrent/executor/ruby_thread_pool_executor.rb b/lib/concurrent/executor/ruby_thread_pool_executor.rb index 312d35ced..39fea3464 100644 --- a/lib/concurrent/executor/ruby_thread_pool_executor.rb +++ b/lib/concurrent/executor/ruby_thread_pool_executor.rb @@ -1,8 +1,9 @@ require 'thread' -require_relative 'executor' require 'concurrent/atomic/event' +require 'concurrent/executor/executor' require 'concurrent/executor/ruby_thread_pool_worker' +require 'concurrent/utility/monotonic_time' module Concurrent @@ -91,7 +92,7 @@ def initialize(opts = {}) @largest_length = 0 @gc_interval = opts.fetch(:gc_interval, 1).to_i # undocumented - @last_gc_time = Time.now.to_f - [1.0, (@gc_interval * 2.0)].max + @last_gc_time = Concurrent.monotonic_time - [1.0, (@gc_interval * 2.0)].max end # @!macro executor_module_method_can_overflow_question @@ -225,13 +226,13 @@ def ensure_capacity? # # @!visibility private def prune_pool - if Time.now.to_f - @gc_interval >= @last_gc_time + if Concurrent.monotonic_time - @gc_interval >= @last_gc_time @pool.delete_if { |worker| worker.dead? } # send :stop for each thread over idletime @pool. - select { |worker| @idletime != 0 && Time.now.to_f - @idletime > worker.last_activity }. + select { |worker| @idletime != 0 && Concurrent.monotonic_time - @idletime > worker.last_activity }. each { @queue << :stop } - @last_gc_time = Time.now.to_f + @last_gc_time = Concurrent.monotonic_time end end diff --git a/lib/concurrent/executor/ruby_thread_pool_worker.rb b/lib/concurrent/executor/ruby_thread_pool_worker.rb index 999b125a0..0949bfc52 100644 --- a/lib/concurrent/executor/ruby_thread_pool_worker.rb +++ b/lib/concurrent/executor/ruby_thread_pool_worker.rb @@ -1,5 +1,6 @@ require 'thread' require 'concurrent/logging' +require 'concurrent/utility/monotonic_time' module Concurrent @@ -12,7 +13,7 @@ def initialize(queue, parent) @queue = queue @parent = parent @mutex = Mutex.new - @last_activity = Time.now.to_f + @last_activity = Concurrent.monotonic_time @thread = nil end @@ -64,7 +65,7 @@ def run(thread = Thread.current) # let it fail log DEBUG, ex ensure - @last_activity = Time.now.to_f + @last_activity = Concurrent.monotonic_time @parent.on_end_task end end diff --git a/lib/concurrent/executor/timer_set.rb b/lib/concurrent/executor/timer_set.rb index d59d7cbe4..81274ba24 100644 --- a/lib/concurrent/executor/timer_set.rb +++ b/lib/concurrent/executor/timer_set.rb @@ -1,15 +1,18 @@ require 'thread' -require_relative 'executor' require 'concurrent/options_parser' require 'concurrent/atomic/event' require 'concurrent/collection/priority_queue' +require 'concurrent/executor/executor' require 'concurrent/executor/single_thread_executor' +require 'concurrent/utility/monotonic_time' module Concurrent - # Executes a collection of tasks at the specified times. A master thread + # Executes a collection of tasks, each after a given delay. A master task # monitors the set and schedules each task for execution at the appropriate # time. Tasks are run on the global task pool or on the supplied executor. + # + # @!macro monotonic_clock_warning class TimerSet include RubyExecutor @@ -29,12 +32,11 @@ def initialize(opts = {}) init_executor end - # Post a task to be execute at the specified time. The given time may be either - # a `Time` object or the number of seconds to wait. If the intended execution - # time is within 1/100th of a second of the current time the task will be - # immediately post to the executor. + # Post a task to be execute run after a given delay (in seconds). If the + # delay is less than 1/100th of a second the task will be immediately post + # to the executor. # - # @param [Object] intended_time the time to schedule the task for execution + # @param [Float] delay the number of seconds to wait for before executing the task # # @yield the task to be performed # @@ -42,17 +44,19 @@ def initialize(opts = {}) # # @raise [ArgumentError] if the intended execution time is not in the future # @raise [ArgumentError] if no block is given - def post(intended_time, *args, &task) - time = TimerSet.calculate_schedule_time(intended_time).to_f + # + # @!macro deprecated_scheduling_by_clock_time + def post(delay, *args, &task) raise ArgumentError.new('no block given') unless block_given? + delay = TimerSet.calculate_delay!(delay) # raises exceptions mutex.synchronize do return false unless running? - if (time - Time.now.to_f) <= 0.01 + if (delay) <= 0.01 @task_executor.post(*args, &task) else - @queue.push(Task.new(time, args, task)) + @queue.push(Task.new(Concurrent.monotonic_time + delay, args, task)) @timer_executor.post(&method(:process_tasks)) end end @@ -61,33 +65,41 @@ def post(intended_time, *args, &task) true end + # @!visibility private + def <<(task) + post(0.0, &task) + self + end + # For a timer, #kill is like an orderly shutdown, except we need to manually # (and destructively) clear the queue first def kill mutex.synchronize { @queue.clear } + # possible race condition shutdown end - # Calculate an Epoch time with milliseconds at which to execute a - # task. If the given time is a `Time` object it will be converted - # accordingly. If the time is an integer value greater than zero - # it will be understood as a number of seconds in the future and - # will be added to the current time to calculate Epoch. + # Schedule a task to be executed after a given delay (in seconds). # - # @param [Object] intended_time the time (as a `Time` object or an integer) - # to schedule the task for execution - # @param [Time] now (Time.now) the time from which to calculate an interval + # @param [Float] delay the number of seconds to wait for before executing the task # - # @return [Fixnum] the intended time as seconds/millis from Epoch + # @return [Float] the number of seconds to delay # # @raise [ArgumentError] if the intended execution time is not in the future - def self.calculate_schedule_time(intended_time, now = Time.now) - if intended_time.is_a?(Time) - raise ArgumentError.new('schedule time must be in the future') if intended_time <= now - intended_time + # @raise [ArgumentError] if no block is given + # + # @!macro deprecated_scheduling_by_clock_time + # + # @!visibility private + def self.calculate_delay!(delay) + if delay.is_a?(Time) + warn '[DEPRECATED] Use an interval not a clock time; schedule is now based on a monotonic clock' + now = Time.now + raise ArgumentError.new('schedule time must be in the future') if delay <= now + delay.to_f - now.to_f else - raise ArgumentError.new('seconds must be greater than zero') if intended_time.to_f < 0.0 - now + intended_time + raise ArgumentError.new('seconds must be greater than zero') if delay.to_f < 0.0 + delay.to_f end end @@ -126,9 +138,11 @@ def process_tasks loop do task = mutex.synchronize { @queue.peek } break unless task - interval = task.time - Time.now.to_f - if interval <= 0 + now = Concurrent.monotonic_time + diff = task.time - now + + if diff <= 0 # We need to remove the task from the queue before passing # it to the executor, to avoid race conditions where we pass # the peek'ed task to the executor and then pop a different @@ -145,7 +159,7 @@ def process_tasks @task_executor.post(*task.args, &task.op) else mutex.synchronize do - @condition.wait(mutex, [interval, 60].min) + @condition.wait(mutex, [diff, 60].min) end end end diff --git a/lib/concurrent/scheduled_task.rb b/lib/concurrent/scheduled_task.rb index dea505c72..18c078742 100644 --- a/lib/concurrent/scheduled_task.rb +++ b/lib/concurrent/scheduled_task.rb @@ -4,47 +4,229 @@ module Concurrent - # {include:file:doc/scheduled_task.md} + # `ScheduledTask` is a close relative of `Concurrent::Future` but with one + # important difference: A `Future` is set to execute as soon as possible + # whereas a `ScheduledTask` is set to execute after a specified delay. This + # implementation is loosely based on Java's + # [ScheduledExecutorService](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ScheduledExecutorService.html). + # + # The *intended* schedule time of task execution is set on object construction + # with the `delay` argument. The delay is a numeric (floating point or integer) + # representing a number of seconds in the future. Any other value or a numeric + # equal to or less than zero will result in an exception. The *actual* schedule + # time of task execution is set when the `execute` method is called. + # + # The constructor can also be given zero or more processing options. Currently + # the only supported options are those recognized by the + # [Dereferenceable](Dereferenceable) module. + # + # The final constructor argument is a block representing the task to be performed. + # If no block is given an `ArgumentError` will be raised. + # + # **States** + # + # `ScheduledTask` mixes in the [Obligation](Obligation) module thus giving it + # "future" behavior. This includes the expected lifecycle states. `ScheduledTask` + # has one additional state, however. While the task (block) is being executed the + # state of the object will be `:in_progress`. This additional state is necessary + # because it has implications for task cancellation. + # + # **Cancellation** + # + # A `:pending` task can be cancelled using the `#cancel` method. A task in any + # other state, including `:in_progress`, cannot be cancelled. The `#cancel` + # method returns a boolean indicating the success of the cancellation attempt. + # A cancelled `ScheduledTask` cannot be restarted. It is immutable. + # + # **Obligation and Observation** + # + # The result of a `ScheduledTask` can be obtained either synchronously or + # asynchronously. `ScheduledTask` mixes in both the [Obligation](Obligation) + # module and the + # [Observable](http://ruby-doc.org/stdlib-2.0/libdoc/observer/rdoc/Observable.html) + # module from the Ruby standard library. With one exception `ScheduledTask` + # behaves identically to [Future](Observable) with regard to these modules. + # + # @example Basic usage + # + # require 'concurrent' + # require 'thread' # for Queue + # require 'open-uri' # for open(uri) + # + # class Ticker + # def get_year_end_closing(symbol, year) + # uri = "http://ichart.finance.yahoo.com/table.csv?s=#{symbol}&a=11&b=01&c=#{year}&d=11&e=31&f=#{year}&g=m" + # data = open(uri) {|f| f.collect{|line| line.strip } } + # data[1].split(',')[4].to_f + # end + # end + # + # # Future + # price = Concurrent::Future.execute{ Ticker.new.get_year_end_closing('TWTR', 2013) } + # price.state #=> :pending + # sleep(1) # do other stuff + # price.value #=> 63.65 + # price.state #=> :fulfilled + # + # # ScheduledTask + # task = Concurrent::ScheduledTask.execute(2){ Ticker.new.get_year_end_closing('INTC', 2013) } + # task.state #=> :pending + # sleep(3) # do other stuff + # task.value #=> 25.96 + # + # @example Successful task execution + # + # task = Concurrent::ScheduledTask.new(2){ 'What does the fox say?' } + # task.state #=> :unscheduled + # task.execute + # task.state #=> pending + # + # # wait for it... + # sleep(3) + # + # task.unscheduled? #=> false + # task.pending? #=> false + # task.fulfilled? #=> true + # task.rejected? #=> false + # task.value #=> 'What does the fox say?' + # + # @example One line creation and execution + # + # task = Concurrent::ScheduledTask.new(2){ 'What does the fox say?' }.execute + # task.state #=> pending + # + # task = Concurrent::ScheduledTask.execute(2){ 'What do you get when you multiply 6 by 9?' } + # task.state #=> pending + # + # @example Failed task execution + # + # task = Concurrent::ScheduledTask.execute(2){ raise StandardError.new('Call me maybe?') } + # task.pending? #=> true + # + # # wait for it... + # sleep(3) + # + # task.unscheduled? #=> false + # task.pending? #=> false + # task.fulfilled? #=> false + # task.rejected? #=> true + # task.value #=> nil + # task.reason #=> # + # + # @example Task execution with observation + # + # observer = Class.new{ + # def update(time, value, reason) + # puts "The task completed at #{time} with value '#{value}'" + # end + # }.new + # + # task = Concurrent::ScheduledTask.new(2){ 'What does the fox say?' } + # task.add_observer(observer) + # task.execute + # task.pending? #=> true + # + # # wait for it... + # sleep(3) + # + # #>> The task completed at 2013-11-07 12:26:09 -0500 with value 'What does the fox say?' + # + # @!macro monotonic_clock_warning class ScheduledTask < IVar - attr_reader :schedule_time + attr_reader :delay - def initialize(intended_time, opts = {}, &block) + # Schedule a task for execution at a specified future time. + # + # @yield the task to be performed + # + # @param [Float] delay the number of seconds to wait for before executing the task + # + # @param [Hash] opts the options controlling how the future will be processed + # @option opts [Boolean] :operation (false) when `true` will execute the future on the global + # operation pool (for long-running operations), when `false` will execute the future on the + # global task pool (for short-running tasks) + # @option opts [object] :executor when provided will run all operations on + # this executor rather than the global thread pool (overrides :operation) + # + # @!macro [attach] deprecated_scheduling_by_clock_time + # + # @note Scheduling is now based on a monotonic clock. This makes the timer much + # more accurate, but only when scheduling based on a delay interval. + # Scheduling a task based on a clock time is deprecated. It will still work + # but will not be supported in the 1.0 release. + def initialize(delay, opts = {}, &block) raise ArgumentError.new('no block given') unless block_given? - TimerSet.calculate_schedule_time(intended_time) # raises exceptons + @delay = TimerSet.calculate_delay!(delay) super(NO_VALUE, opts) self.observers = CopyOnNotifyObserverSet.new - @intended_time = intended_time @state = :unscheduled @task = block @executor = OptionsParser::get_executor_from(opts) || Concurrent.configuration.global_operation_pool end - # @since 0.5.0 + + # Execute an `:unscheduled` `ScheduledTask`. Immediately sets the state to `:pending` + # and starts counting down toward execution. Does nothing if the `ScheduledTask` is + # in any state other than `:unscheduled`. + # + # @return [ScheduledTask] a reference to `self` def execute if compare_and_set_state(:pending, :unscheduled) - now = Time.now - @schedule_time = TimerSet.calculate_schedule_time(@intended_time, now) - Concurrent::timer(@schedule_time.to_f - now.to_f) { @executor.post(&method(:process_task)) } + @schedule_time = Time.now + @delay + Concurrent::timer(@delay) { @executor.post(&method(:process_task)) } self end end - # @since 0.5.0 - def self.execute(intended_time, opts = {}, &block) - return ScheduledTask.new(intended_time, opts, &block).execute + # Create a new `ScheduledTask` object with the given block, execute it, and return the + # `:pending` object. + # + # @param [Float] delay the number of seconds to wait for before executing the task + # + # @param [Hash] opts the options controlling how the future will be processed + # @option opts [Boolean] :operation (false) when `true` will execute the future on the global + # operation pool (for long-running operations), when `false` will execute the future on the + # global task pool (for short-running tasks) + # @option opts [object] :executor when provided will run all operations on + # this executor rather than the global thread pool (overrides :operation) + # + # @return [ScheduledTask] the newly created `ScheduledTask` in the `:pending` state + # + # @raise [ArgumentError] if no block is given + # + # @!macro deprecated_scheduling_by_clock_time + def self.execute(delay, opts = {}, &block) + return ScheduledTask.new(delay, opts, &block).execute + end + + # @deprecated + def schedule_time + warn '[DEPRECATED] time is now based on a monotonic clock' + @schedule_time end + # Has the task been cancelled? + # + # @return [Boolean] true if the task is in the given state else false def cancelled? state == :cancelled end + # In the task execution in progress? + # + # @return [Boolean] true if the task is in the given state else false def in_progress? state == :in_progress end + # Cancel this task and prevent it from executing. A task can only be + # cancelled if it is pending or unscheduled. + # + # @return [Boolean] true if task execution is successfully cancelled + # else false def cancel if_state(:unscheduled, :pending) do @state = :cancelled @@ -54,16 +236,11 @@ def cancel end alias_method :stop, :cancel - def add_observer(*args, &block) - if_state(:unscheduled, :pending, :in_progress) do - observers.add_observer(*args, &block) - end - end - protected :set, :fail, :complete private + # @!visibility private def process_task if compare_and_set_state(:in_progress, :pending) success, val, reason = SafeTaskExecutor.new(@task).execute diff --git a/lib/concurrent/utilities.rb b/lib/concurrent/utilities.rb index 248a21e91..9f5d8f0bb 100644 --- a/lib/concurrent/utilities.rb +++ b/lib/concurrent/utilities.rb @@ -1,3 +1,4 @@ +require 'concurrent/utility/monotonic_time' require 'concurrent/utility/processor_count' require 'concurrent/utility/timeout' require 'concurrent/utility/timer' diff --git a/lib/concurrent/utility/monotonic_time.rb b/lib/concurrent/utility/monotonic_time.rb new file mode 100644 index 000000000..aef2a136c --- /dev/null +++ b/lib/concurrent/utility/monotonic_time.rb @@ -0,0 +1,147 @@ +module Concurrent + + # Clock that cannot be set and represents monotonic time since + # some unspecified starting point. + # @!visibility private + GLOBAL_MONOTONIC_CLOCK = Class.new { + + if defined?(Process::CLOCK_MONOTONIC) + # @!visibility private + def get_time + Process.clock_gettime(Process::CLOCK_MONOTONIC) + end + elsif RUBY_PLATFORM == 'java' + # @!visibility private + def get_time + java.lang.System.nanoTime() / 1_000_000_000.0 + end + else + + require 'thread' + + # @!visibility private + def initialize + @mutex = Mutex.new + @correction = 0 + @last_time = Time.now.to_f + end + + # @!visibility private + def get_time + @mutex.synchronize { + @correction ||= 0 # compensating any back time shifts + now = Time.now.to_f + corrected_now = now + @correction + if @last_time < corrected_now + @last_time = corrected_now + else + @correction += @last_time - corrected_now + 0.000_001 + @last_time = @correction + now + end + } + end + end + }.new + + # @!macro [attach] monotonic_get_time + # + # Returns the current time a tracked by the application monotonic clock. + # + # @return [Float] The current monotonic time when `since` not given else + # the elapsed monotonic time between `since` and the current time + # + # @!macro monotonic_clock_warning + def monotonic_time + GLOBAL_MONOTONIC_CLOCK.get_time + end + module_function :monotonic_time +end + +__END__ + +#!/usr/bin/env ruby + +# $ ./time_test.rb +# Native: 1735.94062338, Ruby: 1425391307.2322402 +# user system total real +# Native time... +# 0.310000 0.000000 0.310000 ( 0.306102) +# Ruby time... +# 1.750000 0.000000 1.750000 ( 1.757991) +# Native interval... +# 0.360000 0.010000 0.370000 ( 0.358779) +# Ruby interval... +# 1.850000 0.000000 1.850000 ( 1.857620) +# Native: 1740.221591108, Ruby: 1425391312.2985182 + +$: << File.expand_path('./lib', __FILE__) + +require 'benchmark' +require 'thread' + +class MonotonicClock + def initialize + @mutex = Mutex.new + @correction = 0 + @last_time = Time.now.to_f + end + + def get_time_native + Process.clock_gettime(Process::CLOCK_MONOTONIC) + end + + def get_interval_native(since) + Process.clock_gettime(Process::CLOCK_MONOTONIC) - since.to_f + end + + def get_time_ruby + @mutex.synchronize do + @correction ||= 0 # compensating any back time shifts + now = Time.now.to_f + corrected_now = now + @correction + if @last_time < corrected_now + return @last_time = corrected_now + else + @correction += @last_time - corrected_now + 0.000_001 + return @last_time = @correction + now + end + end + end + + def get_interval_ruby(since) + get_time_ruby - since.to_f + end +end + +COUNT = 2_000_000 +CLOCK = MonotonicClock.new + +native_now = CLOCK.get_time_native +ruby_now = CLOCK.get_time_ruby + +puts "Native: #{native_now}, Ruby: #{ruby_now}" + +Benchmark.bm do |bm| + + puts "Native time..." + bm.report do + COUNT.times{ CLOCK.get_time_native } + end + + puts "Ruby time..." + bm.report do + COUNT.times{ CLOCK.get_time_ruby } + end + + puts "Native interval..." + bm.report do + COUNT.times{ CLOCK.get_interval_native(native_now) } + end + + puts "Ruby interval..." + bm.report do + COUNT.times{ CLOCK.get_interval_ruby(ruby_now) } + end +end + +puts "Native: #{CLOCK.get_time_native}, Ruby: #{CLOCK.get_time_ruby}" diff --git a/lib/concurrent/utility/timeout.rb b/lib/concurrent/utility/timeout.rb index 75ae9999b..578d84be2 100644 --- a/lib/concurrent/utility/timeout.rb +++ b/lib/concurrent/utility/timeout.rb @@ -6,6 +6,8 @@ module Concurrent # Wait the given number of seconds for the block operation to complete. + # Intended to be a simpler and more reliable replacement to the Ruby + # standard library `Timeout::timeout` method. # # @param [Integer] seconds The number of seconds to wait # @@ -14,8 +16,9 @@ module Concurrent # @raise [Concurrent::TimeoutError] when the block operation does not complete # in the allotted number of seconds. # - # @note This method is intended to be a simpler and more reliable replacement - # to the Ruby standard library `Timeout::timeout` method. + # @see http://ruby-doc.org/stdlib-2.2.0/libdoc/timeout/rdoc/Timeout.html Ruby Timeout::timeout + # + # @!macro monotonic_clock_warning def timeout(seconds) thread = Thread.new do diff --git a/lib/concurrent/utility/timer.rb b/lib/concurrent/utility/timer.rb index 139180192..86851dd7e 100644 --- a/lib/concurrent/utility/timer.rb +++ b/lib/concurrent/utility/timer.rb @@ -10,6 +10,8 @@ module Concurrent # @yield the task to execute # # @return [Boolean] true + # + # @!macro monotonic_clock_warning def timer(seconds, *args, &block) raise ArgumentError.new('no block given') unless block_given? raise ArgumentError.new('interval must be greater than or equal to zero') if seconds < 0 diff --git a/spec/concurrent/executor/timer_set_spec.rb b/spec/concurrent/executor/timer_set_spec.rb index b490c2c01..fe4ed0596 100644 --- a/spec/concurrent/executor/timer_set_spec.rb +++ b/spec/concurrent/executor/timer_set_spec.rb @@ -22,6 +22,7 @@ module Concurrent end it 'executes a given task when given a Time' do + warn 'deprecated syntax' latch = CountDownLatch.new(1) subject.post(Time.now + 0.1){ latch.count_down } expect(latch.wait(0.2)).to be_truthy @@ -70,6 +71,7 @@ module Concurrent end it 'raises an exception when given a task with a past Time value' do + warn 'deprecated syntax' expect { subject.post(Time.now - 10){ nil } }.to raise_error(ArgumentError) diff --git a/spec/concurrent/scheduled_task_spec.rb b/spec/concurrent/scheduled_task_spec.rb index b433ad38a..4167f1df2 100644 --- a/spec/concurrent/scheduled_task_spec.rb +++ b/spec/concurrent/scheduled_task_spec.rb @@ -53,17 +53,20 @@ def trigger_observable(observable) context '#initialize' do it 'accepts a number of seconds (from now) as the schedule time' do + expected = 60 Timecop.freeze do now = Time.now - task = ScheduledTask.new(60){ nil }.execute - expect(task.schedule_time.to_i).to eq now.to_i + 60 + task = ScheduledTask.new(expected){ nil }.execute + expect(task.delay).to be_within(0.1).of(expected) end end - it 'accepts a time object as the schedule time' do - schedule = Time.now + (60*10) + it 'accepts a Time object as the schedule time' do + warn 'deprecated syntax' + expected = 60 * 10 + schedule = Time.now + expected task = ScheduledTask.new(schedule){ nil }.execute - expect(task.schedule_time).to eq schedule + expect(task.delay).to be_within(0.1).of(expected) end it 'raises an exception when seconds is less than zero' do @@ -88,11 +91,6 @@ def trigger_observable(observable) task = ScheduledTask.new(1){ nil } expect(task).to be_unscheduled end - - it 'sets the #schedule_time to nil prior to execution' do - task = ScheduledTask.new(1){ nil } - expect(task.schedule_time).to be_nil - end end context 'instance #execute' do @@ -108,27 +106,6 @@ def trigger_observable(observable) task.execute end - it 'calculates the #schedule_time on execution' do - Timecop.freeze do - now = Time.now - task = ScheduledTask.new(5){ nil } - Timecop.travel(10) - task.execute - expect(task.schedule_time.to_i).to eq now.to_i + 15 - end - end - - it 'raises an exception if expected schedule time is in the past' do - Timecop.freeze do - schedule = Time.now + (10) - task = ScheduledTask.new(schedule){ nil } - Timecop.travel(60) - expect { - task.execute - }.to raise_error(ArgumentError) - end - end - it 'allows setting the execution interval to 0' do expect { 1000.times { ScheduledTask.execute(0) { } } }.not_to raise_error end @@ -267,24 +244,6 @@ def update(time, value, reason) expect(task.add_observer(observer)).to be_truthy end - it 'returns false for an observer added once :cancelled' do - task = ScheduledTask.new(1){ 42 } - task.cancel - expect(task.add_observer(observer)).to be_falsey - end - - it 'returns false for an observer added once :fulfilled' do - task = ScheduledTask.new(0.1){ 42 }.execute - task.value(1) - expect(task.add_observer(observer)).to be_falsey - end - - it 'returns false for an observer added once :rejected' do - task = ScheduledTask.new(0.1){ raise StandardError }.execute - task.value(0.2) - expect(task.add_observer(observer)).to be_falsey - end - it 'notifies all observers on fulfillment' do task = ScheduledTask.new(0.1){ 42 }.execute task.add_observer(observer) @@ -300,30 +259,6 @@ def update(time, value, reason) expect(observer.value).to be_nil expect(observer.reason).to be_a(StandardError) end - - it 'does not notify an observer added after fulfillment' do - expect(observer).not_to receive(:update).with(any_args) - task = ScheduledTask.new(0.1){ 42 }.execute - task.value(1) - task.add_observer(observer) - sleep(0.1) - end - - it 'does not notify an observer added after rejection' do - expect(observer).not_to receive(:update).with(any_args) - task = ScheduledTask.new(0.1){ raise StandardError }.execute - task.value(1) - task.add_observer(observer) - sleep(0.1) - end - - it 'does not notify an observer added after cancellation' do - expect(observer).not_to receive(:update).with(any_args) - task = ScheduledTask.new(0.1){ 42 }.execute - task.cancel - task.add_observer(observer) - task.value(1) - end end end end diff --git a/yardoc b/yardoc index db7483b80..5aff4706f 160000 --- a/yardoc +++ b/yardoc @@ -1 +1 @@ -Subproject commit db7483b80f6efb7df9930f0031508d7e2eb6bad0 +Subproject commit 5aff4706f70d757c15830edd66d74759ff364eda