@@ -65,33 +65,34 @@ def self.executor(executor_identifier)
6565 end
6666 end
6767
68+ # @!macro [attach] executor_method_post
69+ #
70+ # Submit a task to the executor for asynchronous processing.
71+ #
72+ # @param [Array] args zero or more arguments to be passed to the task
73+ #
74+ # @yield the asynchronous task to perform
75+ #
76+ # @return [Boolean] `true` if the task is queued, `false` if the executor
77+ # is not running
78+ #
79+ # @raise [ArgumentError] if no task is given
6880 def post ( *args , &task )
6981 raise NotImplementedError
7082 end
7183
84+ # @!macro [attach] executor_method_left_shift
85+ #
86+ # Submit a task to the executor for asynchronous processing.
87+ #
88+ # @param [Proc] task the asynchronous task to perform
89+ #
90+ # @return [self] returns itself
7291 def <<( task )
7392 post ( &task )
7493 self
7594 end
7695
77- ## The policy defining how rejected tasks (tasks received once the
78- ## queue size reaches the configured `max_queue`, or after the
79- ## executor has shut down) are handled. Must be one of the values
80- ## specified in `FALLBACK_POLICIES`.
81- #attr_reader :fallback_policy
82-
83- ## def initialize(opts)
84- ## @auto_terminate = opts.fetch(:auto_terminate, true)
85- ## end
86-
87- #def auto_terminate?
88- #synchronize { ns_auto_terminate? }
89- #end
90-
91- #def auto_terminate=(value)
92- #synchronize { self.ns_auto_terminate = value }
93- #end
94-
9596 # @!macro [attach] executor_module_method_can_overflow_question
9697 #
9798 # Does the task queue have a maximum size?
@@ -103,31 +104,6 @@ def can_overflow?
103104 false
104105 end
105106
106- ## Handler which executes the `fallback_policy` once the queue size
107- ## reaches `max_queue`.
108- ##
109- ## @param [Array] args the arguments to the task which is being handled.
110- ##
111- ## @!visibility private
112- #def handle_fallback(*args)
113- #case @fallback_policy
114- #when :abort
115- #raise RejectedExecutionError
116- #when :discard
117- #false
118- #when :caller_runs
119- #begin
120- #yield(*args)
121- #rescue => ex
122- ## let it fail
123- #log DEBUG, ex
124- #end
125- #true
126- #else
127- #fail "Unknown fallback policy #{@fallback_policy}"
128- #end
129- #end
130-
131107 # @!macro [attach] executor_module_method_serialized_question
132108 #
133109 # Does this executor guarantee serialization of its operations?
@@ -140,30 +116,6 @@ def can_overflow?
140116 def serialized?
141117 false
142118 end
143-
144- #private
145-
146- #def ns_auto_terminate?
147- #!!@auto_terminate
148- #end
149-
150- #def ns_auto_terminate=(value)
151- #case value
152- #when true
153- #AtExit.add(self) { terminate_at_exit }
154- #@auto_terminate = true
155- #when false
156- #AtExit.delete(self)
157- #@auto_terminate = false
158- #else
159- #raise ArgumentError
160- #end
161- #end
162-
163- #def terminate_at_exit
164- #kill # TODO be gentle first
165- #wait_for_termination(10)
166- #end
167119 end
168120
169121 # Indicates that the including `Executor` or `ExecutorService` guarantees
@@ -193,242 +145,4 @@ def serialized?
193145 true
194146 end
195147 end
196-
197- #module RubyExecutor
198- #include Executor
199- #include Logging
200-
201- ## The set of possible fallback policies that may be set at thread pool creation.
202- #FALLBACK_POLICIES = [:abort, :discard, :caller_runs]
203-
204- ## @!macro [attach] executor_method_post
205- ##
206- ## Submit a task to the executor for asynchronous processing.
207- ##
208- ## @param [Array] args zero or more arguments to be passed to the task
209- ##
210- ## @yield the asynchronous task to perform
211- ##
212- ## @return [Boolean] `true` if the task is queued, `false` if the executor
213- ## is not running
214- ##
215- ## @raise [ArgumentError] if no task is given
216- #def post(*args, &task)
217- #raise ArgumentError.new('no block given') unless block_given?
218- #mutex.synchronize do
219- ## If the executor is shut down, reject this task
220- #return handle_fallback(*args, &task) unless running?
221- #execute(*args, &task)
222- #true
223- #end
224- #end
225-
226- ## @!macro [attach] executor_method_left_shift
227- ##
228- ## Submit a task to the executor for asynchronous processing.
229- ##
230- ## @param [Proc] task the asynchronous task to perform
231- ##
232- ## @return [self] returns itself
233- #def <<(task)
234- #post(&task)
235- #self
236- #end
237-
238- ## @!macro [attach] executor_method_running_question
239- ##
240- ## Is the executor running?
241- ##
242- ## @return [Boolean] `true` when running, `false` when shutting down or shutdown
243- #def running?
244- #!stop_event.set?
245- #end
246-
247- ## @!macro [attach] executor_method_shuttingdown_question
248- ##
249- ## Is the executor shuttingdown?
250- ##
251- ## @return [Boolean] `true` when not running and not shutdown, else `false`
252- #def shuttingdown?
253- #!(running? || shutdown?)
254- #end
255-
256- ## @!macro [attach] executor_method_shutdown_question
257- ##
258- ## Is the executor shutdown?
259- ##
260- ## @return [Boolean] `true` when shutdown, `false` when shutting down or running
261- #def shutdown?
262- #stopped_event.set?
263- #end
264-
265- ## @!macro [attach] executor_method_shutdown
266- ##
267- ## Begin an orderly shutdown. Tasks already in the queue will be executed,
268- ## but no new tasks will be accepted. Has no additional effect if the
269- ## thread pool is not running.
270- #def shutdown
271- #mutex.synchronize do
272- #break unless running?
273- #self.ns_auto_terminate = false
274- #stop_event.set
275- #shutdown_execution
276- #end
277- #true
278- #end
279-
280- ## @!macro [attach] executor_method_kill
281- ##
282- ## Begin an immediate shutdown. In-progress tasks will be allowed to
283- ## complete but enqueued tasks will be dismissed and no new tasks
284- ## will be accepted. Has no additional effect if the thread pool is
285- ## not running.
286- #def kill
287- #mutex.synchronize do
288- #break if shutdown?
289- #self.ns_auto_terminate = false
290- #stop_event.set
291- #kill_execution
292- #stopped_event.set
293- #end
294- #true
295- #end
296-
297- ## @!macro [attach] executor_method_wait_for_termination
298- ##
299- ## Block until executor shutdown is complete or until `timeout` seconds have
300- ## passed.
301- ##
302- ## @note Does not initiate shutdown or termination. Either `shutdown` or `kill`
303- ## must be called before this method (or on another thread).
304- ##
305- ## @param [Integer] timeout the maximum number of seconds to wait for shutdown to complete
306- ##
307- ## @return [Boolean] `true` if shutdown complete or false on `timeout`
308- #def wait_for_termination(timeout = nil)
309- #stopped_event.wait(timeout)
310- #end
311-
312- #protected
313-
314- #attr_reader :mutex, :stop_event, :stopped_event
315-
316- ## @!macro [attach] executor_method_init_executor
317- ##
318- ## Initialize the executor by creating and initializing all the
319- ## internal synchronization objects.
320- #def init_executor
321- #@mutex = Mutex.new
322- #@stop_event = Event.new
323- #@stopped_event = Event.new
324- #end
325-
326- ## @!macro [attach] executor_method_execute
327- #def execute(*args, &task)
328- #raise NotImplementedError
329- #end
330-
331- ## @!macro [attach] executor_method_shutdown_execution
332- ##
333- ## Callback method called when an orderly shutdown has completed.
334- ## The default behavior is to signal all waiting threads.
335- #def shutdown_execution
336- #stopped_event.set
337- #end
338-
339- ## @!macro [attach] executor_method_kill_execution
340- ##
341- ## Callback method called when the executor has been killed.
342- ## The default behavior is to do nothing.
343- #def kill_execution
344- ## do nothing
345- #end
346- #end
347-
348- #if Concurrent.on_jruby?
349-
350- #module JavaExecutor
351- #include Executor
352- #java_import 'java.lang.Runnable'
353-
354- ## The set of possible fallback policies that may be set at thread pool creation.
355- #FALLBACK_POLICIES = {
356- #abort: java.util.concurrent.ThreadPoolExecutor::AbortPolicy,
357- #discard: java.util.concurrent.ThreadPoolExecutor::DiscardPolicy,
358- #caller_runs: java.util.concurrent.ThreadPoolExecutor::CallerRunsPolicy
359- #}.freeze
360-
361- ## @!macro executor_method_post
362- #def post(*args, &task)
363- #raise ArgumentError.new('no block given') unless block_given?
364- #return handle_fallback(*args, &task) unless running?
365- #executor_submit = @executor.java_method(:submit, [Runnable.java_class])
366- #executor_submit.call { yield(*args) }
367- #true
368- #rescue Java::JavaUtilConcurrent::RejectedExecutionException
369- #raise RejectedExecutionError
370- #end
371-
372- ## @!macro executor_method_left_shift
373- #def <<(task)
374- #post(&task)
375- #self
376- #end
377-
378- ## @!macro executor_method_running_question
379- #def running?
380- #!(shuttingdown? || shutdown?)
381- #end
382-
383- ## @!macro executor_method_shuttingdown_question
384- #def shuttingdown?
385- #if @executor.respond_to? :isTerminating
386- #@executor.isTerminating
387- #else
388- #false
389- #end
390- #end
391-
392- ## @!macro executor_method_shutdown_question
393- #def shutdown?
394- #@executor.isShutdown || @executor.isTerminated
395- #end
396-
397- ## @!macro executor_method_wait_for_termination
398- #def wait_for_termination(timeout = nil)
399- #if timeout.nil?
400- #ok = @executor.awaitTermination(60, java.util.concurrent.TimeUnit::SECONDS) until ok
401- #true
402- #else
403- #@executor.awaitTermination(1000 * timeout, java.util.concurrent.TimeUnit::MILLISECONDS)
404- #end
405- #end
406-
407- ## @!macro executor_method_shutdown
408- #def shutdown
409- #self.ns_auto_terminate = false
410- #@executor.shutdown
411- #nil
412- #end
413-
414- ## @!macro executor_method_kill
415- #def kill
416- #self.ns_auto_terminate = false
417- #@executor.shutdownNow
418- #nil
419- #end
420-
421- #protected
422-
423- ## FIXME: it's here just for synchronization in auto_terminate methods, should be replaced and solved
424- ## by the synchronization layer
425- #def mutex
426- #self
427- #end
428-
429- #def synchronize
430- #JRuby.reference0(self).synchronized { yield }
431- #end
432- #end
433- #end
434148end
0 commit comments