diff --git a/lib/concurrent.rb b/lib/concurrent.rb index f2a8ce645..c5386e37d 100644 --- a/lib/concurrent.rb +++ b/lib/concurrent.rb @@ -5,7 +5,6 @@ require 'concurrent/configuration' require 'concurrent/atomics' -require 'concurrent/collections' require 'concurrent/errors' require 'concurrent/executors' require 'concurrent/utilities' diff --git a/lib/concurrent/agent.rb b/lib/concurrent/agent.rb index e0c2e6a0a..f7875c7a0 100644 --- a/lib/concurrent/agent.rb +++ b/lib/concurrent/agent.rb @@ -1,4 +1,5 @@ require 'thread' +require 'concurrent/collection/copy_on_write_observer_set' require 'concurrent/concern/dereferenceable' require 'concurrent/concern/observable' require 'concurrent/concern/logging' @@ -95,7 +96,7 @@ def initialize(initial, opts = {}) @value = initial @rescuers = [] @validator = Proc.new { |result| true } - self.observers = CopyOnWriteObserverSet.new + self.observers = Collection::CopyOnWriteObserverSet.new @serialized_execution = SerializedExecution.new @io_executor = Executor.executor_from_options(opts) || Concurrent.global_io_executor @fast_executor = Executor.executor_from_options(opts) || Concurrent.global_fast_executor diff --git a/lib/concurrent/atomic/copy_on_notify_observer_set.rb b/lib/concurrent/atomic/copy_on_notify_observer_set.rb deleted file mode 100644 index 8c2b5eaab..000000000 --- a/lib/concurrent/atomic/copy_on_notify_observer_set.rb +++ /dev/null @@ -1,111 +0,0 @@ -require 'concurrent/synchronization' - -module Concurrent - - # A thread safe observer set implemented using copy-on-read approach: - # observers are added and removed from a thread safe collection; every time - # a notification is required the internal data structure is copied to - # prevent concurrency issues - class CopyOnNotifyObserverSet < Synchronization::Object - - def initialize - super() - synchronize { ns_initialize } - end - - # Adds an observer to this set. If a block is passed, the observer will be - # created by this method and no other params should be passed - # - # @param [Object] observer the observer to add - # @param [Symbol] func the function to call on the observer during notification. - # Default is :update - # @return [Object] the added observer - def add_observer(observer=nil, func=:update, &block) - if observer.nil? && block.nil? - raise ArgumentError, 'should pass observer as a first argument or block' - elsif observer && block - raise ArgumentError.new('cannot provide both an observer and a block') - end - - if block - observer = block - func = :call - end - - synchronize do - @observers[observer] = func - observer - end - end - - # @param [Object] observer the observer to remove - # @return [Object] the deleted observer - def delete_observer(observer) - synchronize do - @observers.delete(observer) - observer - end - end - - # Deletes all observers - # @return [CopyOnWriteObserverSet] self - def delete_observers - synchronize do - @observers.clear - self - end - end - - # @return [Integer] the observers count - def count_observers - synchronize { @observers.count } - end - - # Notifies all registered observers with optional args - # @param [Object] args arguments to be passed to each observer - # @return [CopyOnWriteObserverSet] self - def notify_observers(*args, &block) - observers = duplicate_observers - notify_to(observers, *args, &block) - self - end - - # Notifies all registered observers with optional args and deletes them. - # - # @param [Object] args arguments to be passed to each observer - # @return [CopyOnWriteObserverSet] self - def notify_and_delete_observers(*args, &block) - observers = duplicate_and_clear_observers - notify_to(observers, *args, &block) - self - end - - protected - - def ns_initialize - @observers = {} - end - - private - - def duplicate_and_clear_observers - synchronize do - observers = @observers.dup - @observers.clear - observers - end - end - - def duplicate_observers - synchronize { observers = @observers.dup } - end - - def notify_to(observers, *args) - raise ArgumentError.new('cannot give arguments and a block') if block_given? && !args.empty? - observers.each do |observer, function| - args = yield if block_given? - observer.send(function, *args) - end - end - end -end diff --git a/lib/concurrent/atomic/copy_on_write_observer_set.rb b/lib/concurrent/atomic/copy_on_write_observer_set.rb deleted file mode 100644 index 085490a64..000000000 --- a/lib/concurrent/atomic/copy_on_write_observer_set.rb +++ /dev/null @@ -1,115 +0,0 @@ -require 'concurrent/synchronization' - -module Concurrent - - # A thread safe observer set implemented using copy-on-write approach: - # every time an observer is added or removed the whole internal data structure is - # duplicated and replaced with a new one. - class CopyOnWriteObserverSet < Synchronization::Object - - def initialize - super() - synchronize { ns_initialize } - end - - # Adds an observer to this set - # If a block is passed, the observer will be created by this method and no - # other params should be passed - # @param [Object] observer the observer to add - # @param [Symbol] func the function to call on the observer during notification. - # Default is :update - # @return [Object] the added observer - def add_observer(observer=nil, func=:update, &block) - if observer.nil? && block.nil? - raise ArgumentError, 'should pass observer as a first argument or block' - elsif observer && block - raise ArgumentError.new('cannot provide both an observer and a block') - end - - if block - observer = block - func = :call - end - - synchronize do - new_observers = @observers.dup - new_observers[observer] = func - @observers = new_observers - observer - end - end - - # @param [Object] observer the observer to remove - # @return [Object] the deleted observer - def delete_observer(observer) - synchronize do - new_observers = @observers.dup - new_observers.delete(observer) - @observers = new_observers - observer - end - end - - # Deletes all observers - # @return [CopyOnWriteObserverSet] self - def delete_observers - self.observers = {} - self - end - - # @return [Integer] the observers count - def count_observers - observers.count - end - - # Notifies all registered observers with optional args - # @param [Object] args arguments to be passed to each observer - # @return [CopyOnWriteObserverSet] self - def notify_observers(*args, &block) - notify_to(observers, *args, &block) - self - end - - # Notifies all registered observers with optional args and deletes them. - # - # @param [Object] args arguments to be passed to each observer - # @return [CopyOnWriteObserverSet] self - def notify_and_delete_observers(*args, &block) - old = clear_observers_and_return_old - notify_to(old, *args, &block) - self - end - - protected - - def ns_initialize - @observers = {} - end - - private - - def notify_to(observers, *args) - raise ArgumentError.new('cannot give arguments and a block') if block_given? && !args.empty? - observers.each do |observer, function| - args = yield if block_given? - observer.send(function, *args) - end - end - - def observers - synchronize { @observers } - end - - def observers=(new_set) - synchronize { @observers = new_set } - end - - def clear_observers_and_return_old - synchronize do - old_observers = @observers - @observers = {} - old_observers - end - end - end -end diff --git a/lib/concurrent/atomics.rb b/lib/concurrent/atomics.rb index 50e327c29..b0827103c 100644 --- a/lib/concurrent/atomics.rb +++ b/lib/concurrent/atomics.rb @@ -41,8 +41,6 @@ require 'concurrent/atomic/atomic_boolean' require 'concurrent/atomic/atomic_fixnum' require 'concurrent/atomic/condition' -require 'concurrent/atomic/copy_on_notify_observer_set' -require 'concurrent/atomic/copy_on_write_observer_set' require 'concurrent/atomic/cyclic_barrier' require 'concurrent/atomic/count_down_latch' require 'concurrent/atomic/event' diff --git a/lib/concurrent/collection/copy_on_notify_observer_set.rb b/lib/concurrent/collection/copy_on_notify_observer_set.rb new file mode 100644 index 000000000..c0f4d06e8 --- /dev/null +++ b/lib/concurrent/collection/copy_on_notify_observer_set.rb @@ -0,0 +1,113 @@ +require 'concurrent/synchronization' + +module Concurrent + module Collection + + # A thread safe observer set implemented using copy-on-read approach: + # observers are added and removed from a thread safe collection; every time + # a notification is required the internal data structure is copied to + # prevent concurrency issues + class CopyOnNotifyObserverSet < Synchronization::Object + + def initialize + super() + synchronize { ns_initialize } + end + + # Adds an observer to this set. If a block is passed, the observer will be + # created by this method and no other params should be passed + # + # @param [Object] observer the observer to add + # @param [Symbol] func the function to call on the observer during notification. + # Default is :update + # @return [Object] the added observer + def add_observer(observer=nil, func=:update, &block) + if observer.nil? && block.nil? + raise ArgumentError, 'should pass observer as a first argument or block' + elsif observer && block + raise ArgumentError.new('cannot provide both an observer and a block') + end + + if block + observer = block + func = :call + end + + synchronize do + @observers[observer] = func + observer + end + end + + # @param [Object] observer the observer to remove + # @return [Object] the deleted observer + def delete_observer(observer) + synchronize do + @observers.delete(observer) + observer + end + end + + # Deletes all observers + # @return [CopyOnWriteObserverSet] self + def delete_observers + synchronize do + @observers.clear + self + end + end + + # @return [Integer] the observers count + def count_observers + synchronize { @observers.count } + end + + # Notifies all registered observers with optional args + # @param [Object] args arguments to be passed to each observer + # @return [CopyOnWriteObserverSet] self + def notify_observers(*args, &block) + observers = duplicate_observers + notify_to(observers, *args, &block) + self + end + + # Notifies all registered observers with optional args and deletes them. + # + # @param [Object] args arguments to be passed to each observer + # @return [CopyOnWriteObserverSet] self + def notify_and_delete_observers(*args, &block) + observers = duplicate_and_clear_observers + notify_to(observers, *args, &block) + self + end + + protected + + def ns_initialize + @observers = {} + end + + private + + def duplicate_and_clear_observers + synchronize do + observers = @observers.dup + @observers.clear + observers + end + end + + def duplicate_observers + synchronize { observers = @observers.dup } + end + + def notify_to(observers, *args) + raise ArgumentError.new('cannot give arguments and a block') if block_given? && !args.empty? + observers.each do |observer, function| + args = yield if block_given? + observer.send(function, *args) + end + end + end + end +end diff --git a/lib/concurrent/collection/copy_on_write_observer_set.rb b/lib/concurrent/collection/copy_on_write_observer_set.rb new file mode 100644 index 000000000..95f8c39da --- /dev/null +++ b/lib/concurrent/collection/copy_on_write_observer_set.rb @@ -0,0 +1,117 @@ +require 'concurrent/synchronization' + +module Concurrent + module Collection + + # A thread safe observer set implemented using copy-on-write approach: + # every time an observer is added or removed the whole internal data structure is + # duplicated and replaced with a new one. + class CopyOnWriteObserverSet < Synchronization::Object + + def initialize + super() + synchronize { ns_initialize } + end + + # Adds an observer to this set + # If a block is passed, the observer will be created by this method and no + # other params should be passed + # @param [Object] observer the observer to add + # @param [Symbol] func the function to call on the observer during notification. + # Default is :update + # @return [Object] the added observer + def add_observer(observer=nil, func=:update, &block) + if observer.nil? && block.nil? + raise ArgumentError, 'should pass observer as a first argument or block' + elsif observer && block + raise ArgumentError.new('cannot provide both an observer and a block') + end + + if block + observer = block + func = :call + end + + synchronize do + new_observers = @observers.dup + new_observers[observer] = func + @observers = new_observers + observer + end + end + + # @param [Object] observer the observer to remove + # @return [Object] the deleted observer + def delete_observer(observer) + synchronize do + new_observers = @observers.dup + new_observers.delete(observer) + @observers = new_observers + observer + end + end + + # Deletes all observers + # @return [CopyOnWriteObserverSet] self + def delete_observers + self.observers = {} + self + end + + # @return [Integer] the observers count + def count_observers + observers.count + end + + # Notifies all registered observers with optional args + # @param [Object] args arguments to be passed to each observer + # @return [CopyOnWriteObserverSet] self + def notify_observers(*args, &block) + notify_to(observers, *args, &block) + self + end + + # Notifies all registered observers with optional args and deletes them. + # + # @param [Object] args arguments to be passed to each observer + # @return [CopyOnWriteObserverSet] self + def notify_and_delete_observers(*args, &block) + old = clear_observers_and_return_old + notify_to(old, *args, &block) + self + end + + protected + + def ns_initialize + @observers = {} + end + + private + + def notify_to(observers, *args) + raise ArgumentError.new('cannot give arguments and a block') if block_given? && !args.empty? + observers.each do |observer, function| + args = yield if block_given? + observer.send(function, *args) + end + end + + def observers + synchronize { @observers } + end + + def observers=(new_set) + synchronize { @observers = new_set } + end + + def clear_observers_and_return_old + synchronize do + old_observers = @observers + @observers = {} + old_observers + end + end + end + end +end diff --git a/lib/concurrent/collection/priority_queue.rb b/lib/concurrent/collection/priority_queue.rb index 1e49b14b7..cc088b319 100644 --- a/lib/concurrent/collection/priority_queue.rb +++ b/lib/concurrent/collection/priority_queue.rb @@ -1,348 +1,350 @@ module Concurrent + module Collection - # @!macro [attach] priority_queue - # - # A queue collection in which the elements are sorted based on their - # comparison (spaceship) operator `<=>`. Items are added to the queue - # at a position relative to their priority. On removal the element - # with the "highest" priority is removed. By default the sort order is - # from highest to lowest, but a lowest-to-highest sort order can be - # set on construction. - # - # The API is based on the `Queue` class from the Ruby standard library. - # - # The pure Ruby implementation, `MutexPriorityQueue` uses a heap algorithm - # stored in an array. The algorithm is based on the work of Robert Sedgewick - # and Kevin Wayne. - # - # The JRuby native implementation is a thin wrapper around the standard - # library `java.util.PriorityQueue`. - # - # When running under JRuby the class `PriorityQueue` extends `JavaPriorityQueue`. - # When running under all other interpreters it extends `MutexPriorityQueue`. - # - # @note This implementation is *not* thread safe. - # - # @see http://en.wikipedia.org/wiki/Priority_queue - # @see http://ruby-doc.org/stdlib-2.0.0/libdoc/thread/rdoc/Queue.html - # - # @see http://algs4.cs.princeton.edu/24pq/index.php#2.6 - # @see http://algs4.cs.princeton.edu/24pq/MaxPQ.java.html - # - # @see http://docs.oracle.com/javase/7/docs/api/java/util/PriorityQueue.html - class MutexPriorityQueue - - # @!macro [attach] priority_queue_method_initialize + # @!macro [attach] priority_queue # - # Create a new priority queue with no items. - # - # @param [Hash] opts the options for creating the queue - # @option opts [Symbol] :order (:max) dictates the order in which items are - # stored: from highest to lowest when `:max` or `:high`; from lowest to - # highest when `:min` or `:low` - def initialize(opts = {}) - order = opts.fetch(:order, :max) - @comparator = [:min, :low].include?(order) ? -1 : 1 - clear - end - - # @!macro [attach] priority_queue_method_clear + # A queue collection in which the elements are sorted based on their + # comparison (spaceship) operator `<=>`. Items are added to the queue + # at a position relative to their priority. On removal the element + # with the "highest" priority is removed. By default the sort order is + # from highest to lowest, but a lowest-to-highest sort order can be + # set on construction. # - # Removes all of the elements from this priority queue. - def clear - @queue = [nil] - @length = 0 - true - end - - # @!macro [attach] priority_queue_method_delete + # The API is based on the `Queue` class from the Ruby standard library. # - # Deletes all items from `self` that are equal to `item`. - # - # @param [Object] item the item to be removed from the queue - # @return [Object] true if the item is found else false - def delete(item) - original_length = @length - k = 1 - while k <= @length - if @queue[k] == item - swap(k, @length) - @length -= 1 - sink(k) - @queue.pop - else - k += 1 - end - end - @length != original_length - end - - # @!macro [attach] priority_queue_method_empty - # - # Returns `true` if `self` contains no elements. - # - # @return [Boolean] true if there are no items in the queue else false - def empty? - size == 0 - end - - # @!macro [attach] priority_queue_method_include + # The pure Ruby implementation, `MutexPriorityQueue` uses a heap algorithm + # stored in an array. The algorithm is based on the work of Robert Sedgewick + # and Kevin Wayne. # - # Returns `true` if the given item is present in `self` (that is, if any - # element == `item`), otherwise returns false. - # - # @param [Object] item the item to search for - # - # @return [Boolean] true if the item is found else false - def include?(item) - @queue.include?(item) - end - alias_method :has_priority?, :include? - - # @!macro [attach] priority_queue_method_length - # - # The current length of the queue. - # - # @return [Fixnum] the number of items in the queue - def length - @length - end - alias_method :size, :length - - # @!macro [attach] priority_queue_method_peek - # - # Retrieves, but does not remove, the head of this queue, or returns `nil` - # if this queue is empty. - # - # @return [Object] the head of the queue or `nil` when empty - def peek - @queue[1] - end - - # @!macro [attach] priority_queue_method_pop - # - # Retrieves and removes the head of this queue, or returns `nil` if this - # queue is empty. - # - # @return [Object] the head of the queue or `nil` when empty - def pop - max = @queue[1] - swap(1, @length) - @length -= 1 - sink(1) - @queue.pop - max - end - alias_method :deq, :pop - alias_method :shift, :pop - - # @!macro [attach] priority_queue_method_push - # - # Inserts the specified element into this priority queue. - # - # @param [Object] item the item to insert onto the queue - def push(item) - @length += 1 - @queue << item - swim(@length) - true - end - alias_method :<<, :push - alias_method :enq, :push - - # @!macro [attach] priority_queue_method_from_list - # - # Create a new priority queue from the given list. - # - # @param [Enumerable] list the list to build the queue from - # @param [Hash] opts the options for creating the queue - # - # @return [PriorityQueue] the newly created and populated queue - def self.from_list(list, opts = {}) - queue = new(opts) - list.each{|item| queue << item } - queue - end - - protected - - # Exchange the values at the given indexes within the internal array. - # - # @param [Integer] x the first index to swap - # @param [Integer] y the second index to swap - # - # @!visibility private - def swap(x, y) - temp = @queue[x] - @queue[x] = @queue[y] - @queue[y] = temp - end - - # Are the items at the given indexes ordered based on the priority - # order specified at construction? + # The JRuby native implementation is a thin wrapper around the standard + # library `java.util.PriorityQueue`. # - # @param [Integer] x the first index from which to retrieve a comparable value - # @param [Integer] y the second index from which to retrieve a comparable value + # When running under JRuby the class `PriorityQueue` extends `JavaPriorityQueue`. + # When running under all other interpreters it extends `MutexPriorityQueue`. # - # @return [Boolean] true if the two elements are in the correct priority order - # else false - # - # @!visibility private - def ordered?(x, y) - (@queue[x] <=> @queue[y]) == @comparator - end - - # Percolate down to maintain heap invariant. - # - # @param [Integer] k the index at which to start the percolation - # - # @!visibility private - def sink(k) - while (j = (2 * k)) <= @length do - j += 1 if j < @length && ! ordered?(j, j+1) - break if ordered?(k, j) - swap(k, j) - k = j - end - end - - # Percolate up to maintain heap invariant. - # - # @param [Integer] k the index at which to start the percolation - # - # @!visibility private - def swim(k) - while k > 1 && ! ordered?(k/2, k) do - swap(k, k/2) - k = k/2 - end - end - end - - if Concurrent.on_jruby? - - # @!macro priority_queue - class JavaPriorityQueue - - # @!macro priority_queue_method_initialize + # @note This implementation is *not* thread safe. + # + # @see http://en.wikipedia.org/wiki/Priority_queue + # @see http://ruby-doc.org/stdlib-2.0.0/libdoc/thread/rdoc/Queue.html + # + # @see http://algs4.cs.princeton.edu/24pq/index.php#2.6 + # @see http://algs4.cs.princeton.edu/24pq/MaxPQ.java.html + # + # @see http://docs.oracle.com/javase/7/docs/api/java/util/PriorityQueue.html + class MutexPriorityQueue + + # @!macro [attach] priority_queue_method_initialize + # + # Create a new priority queue with no items. + # + # @param [Hash] opts the options for creating the queue + # @option opts [Symbol] :order (:max) dictates the order in which items are + # stored: from highest to lowest when `:max` or `:high`; from lowest to + # highest when `:min` or `:low` def initialize(opts = {}) order = opts.fetch(:order, :max) - if [:min, :low].include?(order) - @queue = java.util.PriorityQueue.new(11) # 11 is the default initial capacity - else - @queue = java.util.PriorityQueue.new(11, java.util.Collections.reverseOrder()) - end + @comparator = [:min, :low].include?(order) ? -1 : 1 + clear end - # @!macro priority_queue_method_clear + # @!macro [attach] priority_queue_method_clear + # + # Removes all of the elements from this priority queue. def clear - @queue.clear + @queue = [nil] + @length = 0 true end - # @!macro priority_queue_method_delete + # @!macro [attach] priority_queue_method_delete + # + # Deletes all items from `self` that are equal to `item`. + # + # @param [Object] item the item to be removed from the queue + # @return [Object] true if the item is found else false def delete(item) - found = false - while @queue.remove(item) do - found = true + original_length = @length + k = 1 + while k <= @length + if @queue[k] == item + swap(k, @length) + @length -= 1 + sink(k) + @queue.pop + else + k += 1 + end end - found + @length != original_length end - # @!macro priority_queue_method_empty + # @!macro [attach] priority_queue_method_empty + # + # Returns `true` if `self` contains no elements. + # + # @return [Boolean] true if there are no items in the queue else false def empty? - @queue.size == 0 + size == 0 end - # @!macro priority_queue_method_include + # @!macro [attach] priority_queue_method_include + # + # Returns `true` if the given item is present in `self` (that is, if any + # element == `item`), otherwise returns false. + # + # @param [Object] item the item to search for + # + # @return [Boolean] true if the item is found else false def include?(item) - @queue.contains(item) + @queue.include?(item) end alias_method :has_priority?, :include? - # @!macro priority_queue_method_length + # @!macro [attach] priority_queue_method_length + # + # The current length of the queue. + # + # @return [Fixnum] the number of items in the queue def length - @queue.size + @length end alias_method :size, :length - # @!macro priority_queue_method_peek + # @!macro [attach] priority_queue_method_peek + # + # Retrieves, but does not remove, the head of this queue, or returns `nil` + # if this queue is empty. + # + # @return [Object] the head of the queue or `nil` when empty def peek - @queue.peek + @queue[1] end - # @!macro priority_queue_method_pop + # @!macro [attach] priority_queue_method_pop + # + # Retrieves and removes the head of this queue, or returns `nil` if this + # queue is empty. + # + # @return [Object] the head of the queue or `nil` when empty def pop - @queue.poll + max = @queue[1] + swap(1, @length) + @length -= 1 + sink(1) + @queue.pop + max end alias_method :deq, :pop alias_method :shift, :pop - # @!macro priority_queue_method_push + # @!macro [attach] priority_queue_method_push + # + # Inserts the specified element into this priority queue. + # + # @param [Object] item the item to insert onto the queue def push(item) - @queue.add(item) + @length += 1 + @queue << item + swim(@length) + true end alias_method :<<, :push alias_method :enq, :push - # @!macro priority_queue_method_from_list + # @!macro [attach] priority_queue_method_from_list + # + # Create a new priority queue from the given list. + # + # @param [Enumerable] list the list to build the queue from + # @param [Hash] opts the options for creating the queue + # + # @return [PriorityQueue] the newly created and populated queue def self.from_list(list, opts = {}) queue = new(opts) list.each{|item| queue << item } queue end + + protected + + # Exchange the values at the given indexes within the internal array. + # + # @param [Integer] x the first index to swap + # @param [Integer] y the second index to swap + # + # @!visibility private + def swap(x, y) + temp = @queue[x] + @queue[x] = @queue[y] + @queue[y] = temp + end + + # Are the items at the given indexes ordered based on the priority + # order specified at construction? + # + # @param [Integer] x the first index from which to retrieve a comparable value + # @param [Integer] y the second index from which to retrieve a comparable value + # + # @return [Boolean] true if the two elements are in the correct priority order + # else false + # + # @!visibility private + def ordered?(x, y) + (@queue[x] <=> @queue[y]) == @comparator + end + + # Percolate down to maintain heap invariant. + # + # @param [Integer] k the index at which to start the percolation + # + # @!visibility private + def sink(k) + while (j = (2 * k)) <= @length do + j += 1 if j < @length && ! ordered?(j, j+1) + break if ordered?(k, j) + swap(k, j) + k = j + end + end + + # Percolate up to maintain heap invariant. + # + # @param [Integer] k the index at which to start the percolation + # + # @!visibility private + def swim(k) + while k > 1 && ! ordered?(k/2, k) do + swap(k, k/2) + k = k/2 + end + end end - end - PriorityQueueImplementation = case - when Concurrent.on_jruby? - JavaPriorityQueue - else - MutexPriorityQueue - end - private_constant :PriorityQueueImplementation + if Concurrent.on_jruby? - # @!macro priority_queue - class PriorityQueue < PriorityQueueImplementation + # @!macro priority_queue + class JavaPriorityQueue - alias_method :has_priority?, :include? + # @!macro priority_queue_method_initialize + def initialize(opts = {}) + order = opts.fetch(:order, :max) + if [:min, :low].include?(order) + @queue = java.util.PriorityQueue.new(11) # 11 is the default initial capacity + else + @queue = java.util.PriorityQueue.new(11, java.util.Collections.reverseOrder()) + end + end - alias_method :size, :length + # @!macro priority_queue_method_clear + def clear + @queue.clear + true + end - alias_method :deq, :pop - alias_method :shift, :pop + # @!macro priority_queue_method_delete + def delete(item) + found = false + while @queue.remove(item) do + found = true + end + found + end - alias_method :<<, :push - alias_method :enq, :push + # @!macro priority_queue_method_empty + def empty? + @queue.size == 0 + end - # @!method initialize(opts = {}) - # @!macro priority_queue_method_initialize + # @!macro priority_queue_method_include + def include?(item) + @queue.contains(item) + end + alias_method :has_priority?, :include? - # @!method clear - # @!macro priority_queue_method_clear + # @!macro priority_queue_method_length + def length + @queue.size + end + alias_method :size, :length - # @!method delete(item) - # @!macro priority_queue_method_delete + # @!macro priority_queue_method_peek + def peek + @queue.peek + end - # @!method empty? - # @!macro priority_queue_method_empty + # @!macro priority_queue_method_pop + def pop + @queue.poll + end + alias_method :deq, :pop + alias_method :shift, :pop - # @!method include?(item) - # @!macro priority_queue_method_include + # @!macro priority_queue_method_push + def push(item) + @queue.add(item) + end + alias_method :<<, :push + alias_method :enq, :push + + # @!macro priority_queue_method_from_list + def self.from_list(list, opts = {}) + queue = new(opts) + list.each{|item| queue << item } + queue + end + end + end - # @!method length - # @!macro priority_queue_method_length + PriorityQueueImplementation = case + when Concurrent.on_jruby? + JavaPriorityQueue + else + MutexPriorityQueue + end + private_constant :PriorityQueueImplementation - # @!method peek - # @!macro priority_queue_method_peek + # @!macro priority_queue + class PriorityQueue < PriorityQueueImplementation + + alias_method :has_priority?, :include? + + alias_method :size, :length - # @!method pop - # @!macro priority_queue_method_pop + alias_method :deq, :pop + alias_method :shift, :pop + + alias_method :<<, :push + alias_method :enq, :push - # @!method push(item) - # @!macro priority_queue_method_push + # @!method initialize(opts = {}) + # @!macro priority_queue_method_initialize - # @!method self.from_list(list, opts = {}) - # @!macro priority_queue_method_from_list + # @!method clear + # @!macro priority_queue_method_clear + + # @!method delete(item) + # @!macro priority_queue_method_delete + + # @!method empty? + # @!macro priority_queue_method_empty + + # @!method include?(item) + # @!macro priority_queue_method_include + + # @!method length + # @!macro priority_queue_method_length + + # @!method peek + # @!macro priority_queue_method_peek + + # @!method pop + # @!macro priority_queue_method_pop + + # @!method push(item) + # @!macro priority_queue_method_push + + # @!method self.from_list(list, opts = {}) + # @!macro priority_queue_method_from_list + end end end diff --git a/lib/concurrent/collections.rb b/lib/concurrent/collections.rb deleted file mode 100644 index 8f1e2bb04..000000000 --- a/lib/concurrent/collections.rb +++ /dev/null @@ -1 +0,0 @@ -require 'concurrent/collection/priority_queue' diff --git a/lib/concurrent/concern/observable.rb b/lib/concurrent/concern/observable.rb index 5069c187f..d01a76386 100644 --- a/lib/concurrent/concern/observable.rb +++ b/lib/concurrent/concern/observable.rb @@ -1,5 +1,5 @@ -require 'concurrent/atomic/copy_on_notify_observer_set' -require 'concurrent/atomic/copy_on_write_observer_set' +require 'concurrent/collection/copy_on_notify_observer_set' +require 'concurrent/collection/copy_on_write_observer_set' module Concurrent module Concern diff --git a/lib/concurrent/executor/timer_set.rb b/lib/concurrent/executor/timer_set.rb index 71acd7608..91bb6bca2 100644 --- a/lib/concurrent/executor/timer_set.rb +++ b/lib/concurrent/executor/timer_set.rb @@ -76,7 +76,7 @@ def kill # @param [Hash] opts the options to create the object with. # @!visibility private def ns_initialize(opts) - @queue = PriorityQueue.new(order: :min) + @queue = Collection::PriorityQueue.new(order: :min) @task_executor = Executor.executor_from_options(opts) || Concurrent.global_io_executor @timer_executor = SingleThreadExecutor.new @condition = Event.new diff --git a/lib/concurrent/ivar.rb b/lib/concurrent/ivar.rb index 75c2ab56f..62bb9f712 100644 --- a/lib/concurrent/ivar.rb +++ b/lib/concurrent/ivar.rb @@ -1,6 +1,7 @@ require 'thread' require 'concurrent/errors' +require 'concurrent/collection/copy_on_write_observer_set' require 'concurrent/concern/obligation' require 'concurrent/concern/observable' require 'concurrent/synchronization' @@ -155,7 +156,7 @@ def try_set(value = NO_VALUE, &block) def ns_initialize(value, opts) value = yield if block_given? init_obligation(self) - self.observers = CopyOnWriteObserverSet.new + self.observers = Collection::CopyOnWriteObserverSet.new set_deref_options(opts) if value == NO_VALUE diff --git a/lib/concurrent/scheduled_task.rb b/lib/concurrent/scheduled_task.rb index cbfc73023..de6b7042d 100644 --- a/lib/concurrent/scheduled_task.rb +++ b/lib/concurrent/scheduled_task.rb @@ -1,6 +1,7 @@ require 'concurrent/errors' require 'concurrent/ivar' require 'concurrent/configuration' +require 'concurrent/collection/copy_on_notify_observer_set' require 'concurrent/executor/executor' require 'concurrent/executor/timer_set' require 'concurrent/utility/monotonic_time' @@ -178,7 +179,7 @@ def initialize(delay, opts = {}, &task) @task = task @time = nil @executor = Executor.executor_from_options(opts) || Concurrent.global_io_executor - self.observers = CopyOnNotifyObserverSet.new + self.observers = Collection::CopyOnNotifyObserverSet.new end end diff --git a/lib/concurrent/timer_task.rb b/lib/concurrent/timer_task.rb index faa9685a9..042be35e6 100644 --- a/lib/concurrent/timer_task.rb +++ b/lib/concurrent/timer_task.rb @@ -1,3 +1,4 @@ +require 'concurrent/collection/copy_on_notify_observer_set' require 'concurrent/concern/dereferenceable' require 'concurrent/concern/observable' require 'concurrent/atomic/atomic_boolean' @@ -279,7 +280,7 @@ def ns_initialize(opts, &task) @executor = Concurrent::SafeTaskExecutor.new(task) @running = Concurrent::AtomicBoolean.new(false) - self.observers = CopyOnNotifyObserverSet.new + self.observers = Collection::CopyOnNotifyObserverSet.new end # @!visibility private diff --git a/spec/concurrent/atomic/copy_on_notify_observer_set_spec.rb b/spec/concurrent/atomic/copy_on_notify_observer_set_spec.rb deleted file mode 100644 index 6b1ed6d44..000000000 --- a/spec/concurrent/atomic/copy_on_notify_observer_set_spec.rb +++ /dev/null @@ -1,9 +0,0 @@ -require_relative 'observer_set_shared' - -module Concurrent - - describe CopyOnNotifyObserverSet do - it_behaves_like 'an observer set' - end - -end diff --git a/spec/concurrent/atomic/copy_on_write_observer_set_spec.rb b/spec/concurrent/atomic/copy_on_write_observer_set_spec.rb deleted file mode 100644 index ff7dad77d..000000000 --- a/spec/concurrent/atomic/copy_on_write_observer_set_spec.rb +++ /dev/null @@ -1,9 +0,0 @@ -require_relative 'observer_set_shared' - -module Concurrent - - describe CopyOnWriteObserverSet do - it_behaves_like 'an observer set' - end - -end diff --git a/spec/concurrent/collection/copy_on_notify_observer_set_spec.rb b/spec/concurrent/collection/copy_on_notify_observer_set_spec.rb new file mode 100644 index 000000000..e533252c4 --- /dev/null +++ b/spec/concurrent/collection/copy_on_notify_observer_set_spec.rb @@ -0,0 +1,10 @@ +require_relative 'observer_set_shared' + +module Concurrent + module Collection + + describe CopyOnNotifyObserverSet do + it_behaves_like 'an observer set' + end + end +end diff --git a/spec/concurrent/collection/copy_on_write_observer_set_spec.rb b/spec/concurrent/collection/copy_on_write_observer_set_spec.rb new file mode 100644 index 000000000..a18eb9e75 --- /dev/null +++ b/spec/concurrent/collection/copy_on_write_observer_set_spec.rb @@ -0,0 +1,10 @@ +require_relative 'observer_set_shared' + +module Concurrent + module Collection + + describe CopyOnWriteObserverSet do + it_behaves_like 'an observer set' + end + end +end diff --git a/spec/concurrent/atomic/observer_set_shared.rb b/spec/concurrent/collection/observer_set_shared.rb similarity index 100% rename from spec/concurrent/atomic/observer_set_shared.rb rename to spec/concurrent/collection/observer_set_shared.rb diff --git a/spec/concurrent/collection/priority_queue_spec.rb b/spec/concurrent/collection/priority_queue_spec.rb index ae5b16256..072948f02 100644 --- a/spec/concurrent/collection/priority_queue_spec.rb +++ b/spec/concurrent/collection/priority_queue_spec.rb @@ -287,28 +287,30 @@ end module Concurrent + module Collection - describe MutexPriorityQueue do - - it_should_behave_like :priority_queue - end - - if Concurrent.on_jruby? - - describe JavaPriorityQueue do + describe MutexPriorityQueue do it_should_behave_like :priority_queue end - end - describe PriorityQueue do if Concurrent.on_jruby? - it 'inherits from JavaPriorityQueue' do - expect(PriorityQueue.ancestors).to include(JavaPriorityQueue) + + describe JavaPriorityQueue do + + it_should_behave_like :priority_queue end - else - it 'inherits from MutexPriorityQueue' do - expect(PriorityQueue.ancestors).to include(MutexPriorityQueue) + end + + describe PriorityQueue do + if Concurrent.on_jruby? + it 'inherits from JavaPriorityQueue' do + expect(PriorityQueue.ancestors).to include(JavaPriorityQueue) + end + else + it 'inherits from MutexPriorityQueue' do + expect(PriorityQueue.ancestors).to include(MutexPriorityQueue) + end end end end diff --git a/spec/concurrent/concern/observable_spec.rb b/spec/concurrent/concern/observable_spec.rb index 87f87c106..30b478892 100644 --- a/spec/concurrent/concern/observable_spec.rb +++ b/spec/concurrent/concern/observable_spec.rb @@ -22,7 +22,7 @@ module Concern end it 'uses the given observer set' do - expected = CopyOnWriteObserverSet.new + expected = Collection::CopyOnWriteObserverSet.new subject.observers = expected expect(subject.observers).to eql expected end