diff --git a/flower/app.py b/flower/app.py
index 3427e098a..3c4c7b303 100644
--- a/flower/app.py
+++ b/flower/app.py
@@ -84,7 +84,10 @@ def start(self):
 
     def stop(self):
         if self.started:
-            self.events.stop()
+            try:
+                self.events.stop()
+            except Exception:
+                logging.debug("Error stopping events", exc_info=True)
             logging.debug("Stopping executors...")
             self.executor.shutdown(wait=False)
             logging.debug("Stopping event loop...")
diff --git a/flower/events.py b/flower/events.py
index cd15d7a2e..cafa432a6 100644
--- a/flower/events.py
+++ b/flower/events.py
@@ -1,5 +1,6 @@
 import collections
 import logging
+import queue
 import shelve
 import threading
 import time
@@ -17,6 +18,8 @@
 
 PROMETHEUS_METRICS = None
 
+MAX_RETRY_INTERVAL = 60
+
 
 def get_prometheus_metrics():
     global PROMETHEUS_METRICS  # pylint: disable=global-statement
@@ -112,6 +115,9 @@ def event(self, event):
 
 class Events(threading.Thread):
     events_enable_interval = 5000
+    _BACKPRESSURE_MAXSIZE = 10000
+    _DRAIN_INTERVAL_MS = 100
+    _DRAIN_BATCH_SIZE = 500
 
     # pylint: disable=too-many-arguments
     def __init__(self, capp, io_loop, db=None, persistent=False,
@@ -128,13 +134,21 @@ def __init__(self, capp, io_loop, db=None, persistent=False,
         self.enable_events = enable_events
         self.state = None
         self.state_save_timer = None
+        self._drain_timer = None
+        self._event_queue = queue.Queue(maxsize=self._BACKPRESSURE_MAXSIZE)
+        self._drop_count = 0
+        self._last_drop_log_time = 0.0
 
         if self.persistent:
             logger.debug("Loading state from '%s'...", self.db)
-            state = shelve.open(self.db)
-            if state:
-                self.state = state['events']
-            state.close()
+            try:
+                with shelve.open(self.db) as state:
+                    if state:
+                        self.state = state['events']
+            except KeyError:
+                logger.debug("No existing state found in '%s'", self.db)
+            except Exception:
+                logger.error("Failed to load state from '%s'", self.db, exc_info=True)
 
         if state_save_interval:
             self.state_save_timer = PeriodicCallback(self.save_state,
@@ -156,23 +170,42 @@ def start(self):
             logger.debug("Starting state save timer...")
             self.state_save_timer.start()
 
+        self._drain_timer = PeriodicCallback(self._drain_events,
+                                             self._DRAIN_INTERVAL_MS)
+        self._drain_timer.start()
+
     def stop(self):
-        if self.enable_events:
-            logger.debug("Stopping enable events timer...")
-            self.timer.stop()
+        try:
+            if self.enable_events:
+                logger.debug("Stopping enable events timer...")
+                try:
+                    self.timer.stop()
+                except Exception:
+                    logger.debug("Error stopping enable events timer", exc_info=True)
 
-        if self.state_save_timer:
-            logger.debug("Stopping state save timer...")
-            self.state_save_timer.stop()
+            if self.state_save_timer:
+                logger.debug("Stopping state save timer...")
+                try:
+                    self.state_save_timer.stop()
+                except Exception:
+                    logger.debug("Error stopping state save timer", exc_info=True)
 
-        if self.persistent:
-            self.save_state()
+            if self._drain_timer:
+                try:
+                    self._drain_timer.stop()
+                except Exception:
+                    logger.debug("Error stopping drain timer", exc_info=True)
+        finally:
+            if self.persistent:
+                self.save_state()
 
     def run(self):
         try_interval = 1
         while True:
             try:
                 try_interval *= 2
+                if try_interval > MAX_RETRY_INTERVAL:
+                    try_interval = MAX_RETRY_INTERVAL
 
                 with self.capp.connection() as conn:
                     recv = EventReceiver(conn,
@@ -196,9 +229,11 @@ def run(self):
 
     def save_state(self):
         logger.debug("Saving state to '%s'...", self.db)
-        state = shelve.open(self.db, flag='n')
-        state['events'] = self.state
-        state.close()
+        try:
+            with shelve.open(self.db, flag='n') as state:
+                state['events'] = self.state
+        except Exception:
+            logger.error("Failed to save state to '%s'", self.db, exc_info=True)
 
     def on_enable_events(self):
         # Periodically enable events for workers
@@ -206,5 +241,30 @@ def on_enable_events(self):
             self.io_loop.run_in_executor(None, self.capp.control.enable_events)
 
     def on_event(self, event):
-        # Call EventsState.event in ioloop thread to avoid synchronization
-        self.io_loop.add_callback(partial(self.state.event, event))
+        # Enqueue the event with backpressure; drop it if the queue is full.
+        # Rate-limit drop warnings to avoid flooding logs under sustained load.
+        try:
+            self._event_queue.put_nowait(event)
+        except queue.Full:
+            self._drop_count += 1
+            now = time.monotonic()
+            if now - self._last_drop_log_time >= 5.0:
+                window_start = self._last_drop_log_time or now
+                duration = now - window_start
+                logger.warning(
+                    "Event queue full (%d), dropped %d event(s) in last %.0fs",
+                    self._BACKPRESSURE_MAXSIZE, self._drop_count, duration)
+                self._drop_count = 0
+                self._last_drop_log_time = now
+
+    def _drain_events(self):
+        """Process up to _DRAIN_BATCH_SIZE events from the backpressure queue."""
+        for _ in range(self._DRAIN_BATCH_SIZE):
+            try:
+                event = self._event_queue.get_nowait()
+            except queue.Empty:
+                break
+            try:
+                self.state.event(event)
+            except Exception:
+                logger.error("Error processing event", exc_info=True)
diff --git a/tests/unit/test_events.py b/tests/unit/test_events.py
new file mode 100644
index 000000000..4fcc12399
--- /dev/null
+++ b/tests/unit/test_events.py
@@ -0,0 +1,156 @@
+import queue
+import time
+import unittest
+from unittest.mock import MagicMock, patch
+
+from celery.events import Event
+from tornado.ioloop import IOLoop
+
+from flower.events import Events, EventsState, get_prometheus_metrics
+
+import celery
+
+
+class TestEventsState(unittest.TestCase):
+    def test_counter_tracks_events_by_worker(self):
+        state = EventsState()
+        state.get_or_create_worker('w1')
+        e = Event('worker-online', hostname='w1')
+        e['clock'] = 0
+        e['local_received'] = time.time()
+        state.event(e)
+
+        self.assertIn('w1', state.counter)
+        self.assertEqual(state.counter['w1']['worker-online'], 1)
+
+    def test_counter_increments(self):
+        state = EventsState()
+        state.get_or_create_worker('w1')
+        for i in range(5):
+            e = Event('worker-heartbeat', hostname='w1', active=0)
+            e['clock'] = i
+            e['local_received'] = time.time()
+            state.event(e)
+
+        self.assertEqual(state.counter['w1']['worker-heartbeat'], 5)
+
+
+class TestEventsBackpressure(unittest.TestCase):
+    def test_on_event_drops_when_queue_full(self):
+        capp = celery.Celery()
+        io_loop = MagicMock()
+        events = Events(capp, io_loop)
+        # Fill the queue
+        for i in range(events._BACKPRESSURE_MAXSIZE):
+            events.on_event({'hostname': 'w1', 'type': 'worker-heartbeat'})
+
+        # Next event should be dropped without raising
+        events.on_event({'hostname': 'w1', 'type': 'worker-heartbeat'})
+        self.assertEqual(events._event_queue.qsize(), events._BACKPRESSURE_MAXSIZE)
+
+    def test_drop_logging_is_rate_limited(self):
+        capp = celery.Celery()
+        io_loop = MagicMock()
+        events = Events(capp, io_loop)
+        # Fill the queue
+        for i in range(events._BACKPRESSURE_MAXSIZE):
+            events.on_event({'hostname': 'w1', 'type': 'worker-heartbeat'})
+
+        # Reset drop state so we control it entirely within the patch.
+        # Set _last_drop_log_time far enough in the past to guarantee the
+        # 5-second cooldown has elapsed (time.monotonic() can be small on
+        # short-lived processes).
+        events._drop_count = 0
+        events._last_drop_log_time = time.monotonic() - 10.0
+
+        with patch('flower.events.logger') as mock_logger:
+            # First drop should trigger a log (cooldown elapsed)
+            events.on_event({'hostname': 'w1', 'type': 'worker-heartbeat'})
+            self.assertEqual(mock_logger.warning.call_count, 1)
+
+            # Subsequent drops within 5s should NOT trigger more logs
+            for _ in range(99):
+                events.on_event({'hostname': 'w1', 'type': 'worker-heartbeat'})
+            self.assertEqual(mock_logger.warning.call_count, 1)
+
+    def test_drain_events_processes_batch(self):
+        capp = celery.Celery()
+        io_loop = MagicMock()
+        events = Events(capp, io_loop)
+        events.state = MagicMock()
+
+        for i in range(10):
+            events._event_queue.put({'hostname': 'w1', 'type': 'worker-heartbeat',
+                                     'clock': i, 'local_received': time.time()})
+
+        events._drain_events()
+
+        self.assertEqual(events.state.event.call_count, 10)
+        self.assertTrue(events._event_queue.empty())
+
+    def test_drain_events_handles_errors_gracefully(self):
+        capp = celery.Celery()
+        io_loop = MagicMock()
+        events = Events(capp, io_loop)
+        events.state = MagicMock()
+        events.state.event.side_effect = [RuntimeError("test"), None]
+
+        events._event_queue.put({'hostname': 'w1', 'type': 'a'})
+        events._event_queue.put({'hostname': 'w1', 'type': 'b'})
+
+        events._drain_events()
+
+        # Both events should be consumed despite the error on the first one
+        self.assertEqual(events.state.event.call_count, 2)
+        self.assertTrue(events._event_queue.empty())
+
+    def test_drain_respects_batch_size(self):
+        capp = celery.Celery()
+        io_loop = MagicMock()
+        events = Events(capp, io_loop)
+        events.state = MagicMock()
+
+        count = events._DRAIN_BATCH_SIZE + 100
+        for i in range(count):
+            events._event_queue.put({'hostname': 'w1', 'type': 'hb'})
+
+        events._drain_events()
+
+        # Should process exactly _DRAIN_BATCH_SIZE, leaving 100
+        self.assertEqual(events.state.event.call_count, events._DRAIN_BATCH_SIZE)
+        self.assertEqual(events._event_queue.qsize(), 100)
+
+
+class TestEventsRetryBackoff(unittest.TestCase):
+    def test_retry_interval_caps_at_max(self):
+        from flower.events import MAX_RETRY_INTERVAL
+        try_interval = 1
+        for _ in range(100):
+            try_interval *= 2
+            if try_interval > MAX_RETRY_INTERVAL:
+                try_interval = MAX_RETRY_INTERVAL
+
+        self.assertEqual(try_interval, MAX_RETRY_INTERVAL)
+        self.assertEqual(MAX_RETRY_INTERVAL, 60)
+
+
+class TestEventsStopSafety(unittest.TestCase):
+    def test_stop_calls_save_state_even_if_timer_fails(self):
+        capp = celery.Celery()
+        io_loop = MagicMock()
+        events = Events(capp, io_loop, persistent=True, db='test_db')
+
+        events.timer = MagicMock()
+        events.timer.stop.side_effect = RuntimeError("timer error")
+        events.state_save_timer = MagicMock()
+        events.state_save_timer.stop.side_effect = RuntimeError("save timer error")
+        events._drain_timer = MagicMock()
+        events._drain_timer.stop.side_effect = RuntimeError("drain timer error")
+
+        with patch.object(events, 'save_state') as mock_save:
+            events.stop()
+            mock_save.assert_called_once()
+
+
+if __name__ == '__main__':
+    unittest.main()
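For reference, a minimal standalone sketch of the enqueue-with-backpressure and periodic-drain pattern the events.py changes above introduce. The class name BackpressureSink and the tiny limits are illustrative assumptions, and the Tornado PeriodicCallback wiring is replaced by a plain loop so the snippet runs on its own; it is not part of the diff.

# Standalone sketch (not part of the patch): producer never blocks, consumer
# drains in bounded batches, mirroring Events.on_event / Events._drain_events.
import queue
import time


class BackpressureSink:
    _BACKPRESSURE_MAXSIZE = 5   # tiny limits so the drop path is easy to observe
    _DRAIN_BATCH_SIZE = 3

    def __init__(self):
        self._event_queue = queue.Queue(maxsize=self._BACKPRESSURE_MAXSIZE)
        self._drop_count = 0

    def on_event(self, event):
        # Producer side: never block the receiver thread; count drops instead.
        try:
            self._event_queue.put_nowait(event)
        except queue.Full:
            self._drop_count += 1

    def drain(self):
        # Consumer side: process at most one batch per call, like the
        # PeriodicCallback-driven _drain_events() in the diff.
        processed = 0
        for _ in range(self._DRAIN_BATCH_SIZE):
            try:
                self._event_queue.get_nowait()
            except queue.Empty:
                break
            processed += 1  # stand-in for self.state.event(event)
        return processed


if __name__ == '__main__':
    sink = BackpressureSink()
    for i in range(12):                 # burst of 12 events against a queue of 5
        sink.on_event({'seq': i})
    print('dropped:', sink._drop_count)           # 7 events did not fit
    while not sink._event_queue.empty():
        print('drained batch of', sink.drain())   # batches of at most 3
        time.sleep(0.01)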