Skip to content

Commit 325ddbd

Browse files
krisselberglee1258561
authored andcommitted
[Serve] Remove shutdown() method from TaskProcessorAdapter (ray-project#59713)
## Description Fix @task_consumer decorator's __del__ method calling shutdown() which broadcasts to all Celery workers instead of just the local one. This kills newly started workers during rolling updates. ## Related issues None ## Additional information Removed self._adapter.shutdown() from __del__ - only stop_consumer() should be called since it targets the specific worker hostname. Also removed shutdown() implementation & from interface given it is not used anywhere --------- Signed-off-by: krisselberg <kselberg@princeton.edu>
1 parent 69f6645 commit 325ddbd

File tree

4 files changed

+0
-17
lines changed

4 files changed

+0
-17
lines changed

python/ray/serve/schema.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1582,13 +1582,6 @@ def stop_consumer(self, timeout: float = 10.0):
15821582
"""
15831583
pass
15841584

1585-
@abstractmethod
1586-
def shutdown(self):
1587-
"""
1588-
Shutdown the task processor and clean up resources.
1589-
"""
1590-
pass
1591-
15921585
@abstractmethod
15931586
def cancel_task_sync(self, task_id: str):
15941587
"""

python/ray/serve/task_consumer.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,6 @@ def initialize_callable(self, consumer_concurrency: int):
154154

155155
def __del__(self):
156156
self._adapter.stop_consumer()
157-
self._adapter.shutdown()
158157

159158
if hasattr(target_cls, "__del__"):
160159
target_cls.__del__(self)

python/ray/serve/task_processor.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -225,11 +225,6 @@ def stop_consumer(self, timeout: float = 10.0):
225225

226226
self._worker_thread = None
227227

228-
def shutdown(self):
229-
logger.info("Shutting down Celery worker...")
230-
self._app.control.shutdown()
231-
logger.info("Celery worker shutdown complete...")
232-
233228
def cancel_task_sync(self, task_id):
234229
"""
235230
Cancels a task synchronously. Only supported for Redis and RabbitMQ brokers by Celery.

python/ray/serve/tests/unit/test_task_consumer.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ class MockTaskProcessorAdapter(TaskProcessorAdapter):
2020

2121
_start_consumer_received: bool = False
2222
_stop_consumer_received: bool = False
23-
_shutdown_received: bool = False
2423

2524
def __init__(self, config: TaskProcessorConfig):
2625
self._config = config
@@ -46,9 +45,6 @@ def start_consumer(self, **kwargs):
4645
def stop_consumer(self, timeout: float = 10.0):
4746
self._stop_consumer_received = True
4847

49-
def shutdown(self):
50-
self._shutdown_received = True
51-
5248
def cancel_task_sync(self, task_id) -> bool:
5349
pass
5450

0 commit comments

Comments
 (0)