Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion lib/concurrent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
require 'concurrent/configuration'

require 'concurrent/atomics'
require 'concurrent/collections'
require 'concurrent/errors'
require 'concurrent/executors'
require 'concurrent/utilities'
Expand Down
3 changes: 2 additions & 1 deletion lib/concurrent/agent.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'thread'
require 'concurrent/collection/copy_on_write_observer_set'
require 'concurrent/concern/dereferenceable'
require 'concurrent/concern/observable'
require 'concurrent/concern/logging'
Expand Down Expand Up @@ -95,7 +96,7 @@ def initialize(initial, opts = {})
@value = initial
@rescuers = []
@validator = Proc.new { |result| true }
self.observers = CopyOnWriteObserverSet.new
self.observers = Collection::CopyOnWriteObserverSet.new
@serialized_execution = SerializedExecution.new
@io_executor = Executor.executor_from_options(opts) || Concurrent.global_io_executor
@fast_executor = Executor.executor_from_options(opts) || Concurrent.global_fast_executor
Expand Down
111 changes: 0 additions & 111 deletions lib/concurrent/atomic/copy_on_notify_observer_set.rb

This file was deleted.

115 changes: 0 additions & 115 deletions lib/concurrent/atomic/copy_on_write_observer_set.rb

This file was deleted.

2 changes: 0 additions & 2 deletions lib/concurrent/atomics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@
require 'concurrent/atomic/atomic_boolean'
require 'concurrent/atomic/atomic_fixnum'
require 'concurrent/atomic/condition'
require 'concurrent/atomic/copy_on_notify_observer_set'
require 'concurrent/atomic/copy_on_write_observer_set'
require 'concurrent/atomic/cyclic_barrier'
require 'concurrent/atomic/count_down_latch'
require 'concurrent/atomic/event'
Expand Down
113 changes: 113 additions & 0 deletions lib/concurrent/collection/copy_on_notify_observer_set.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
require 'concurrent/synchronization'

module Concurrent
module Collection

# A thread safe observer set implemented using copy-on-read approach:
# observers are added and removed from a thread safe collection; every time
# a notification is required the internal data structure is copied to
# prevent concurrency issues
class CopyOnNotifyObserverSet < Synchronization::Object

def initialize
super()
synchronize { ns_initialize }
end

# Adds an observer to this set. If a block is passed, the observer will be
# created by this method and no other params should be passed
#
# @param [Object] observer the observer to add
# @param [Symbol] func the function to call on the observer during notification.
# Default is :update
# @return [Object] the added observer
def add_observer(observer=nil, func=:update, &block)
if observer.nil? && block.nil?
raise ArgumentError, 'should pass observer as a first argument or block'
elsif observer && block
raise ArgumentError.new('cannot provide both an observer and a block')
end

if block
observer = block
func = :call
end

synchronize do
@observers[observer] = func
observer
end
end

# @param [Object] observer the observer to remove
# @return [Object] the deleted observer
def delete_observer(observer)
synchronize do
@observers.delete(observer)
observer
end
end

# Deletes all observers
# @return [CopyOnWriteObserverSet] self
def delete_observers
synchronize do
@observers.clear
self
end
end

# @return [Integer] the observers count
def count_observers
synchronize { @observers.count }
end

# Notifies all registered observers with optional args
# @param [Object] args arguments to be passed to each observer
# @return [CopyOnWriteObserverSet] self
def notify_observers(*args, &block)
observers = duplicate_observers
notify_to(observers, *args, &block)
self
end

# Notifies all registered observers with optional args and deletes them.
#
# @param [Object] args arguments to be passed to each observer
# @return [CopyOnWriteObserverSet] self
def notify_and_delete_observers(*args, &block)
observers = duplicate_and_clear_observers
notify_to(observers, *args, &block)
self
end

protected

def ns_initialize
@observers = {}
end

private

def duplicate_and_clear_observers
synchronize do
observers = @observers.dup
@observers.clear
observers
end
end

def duplicate_observers
synchronize { observers = @observers.dup }
end

def notify_to(observers, *args)
raise ArgumentError.new('cannot give arguments and a block') if block_given? && !args.empty?
observers.each do |observer, function|
args = yield if block_given?
observer.send(function, *args)
end
end
end
end
end
Loading