5 changes: 4 additions & 1 deletion flower/app.py
@@ -84,7 +84,10 @@ def start(self):
 
     def stop(self):
         if self.started:
-            self.events.stop()
+            try:
+                self.events.stop()
+            except Exception:
+                logger.debug("Error stopping events", exc_info=True)
             logging.debug("Stopping executors...")
             self.executor.shutdown(wait=False)
             logging.debug("Stopping event loop...")
94 changes: 77 additions & 17 deletions flower/events.py
@@ -1,5 +1,6 @@
 import collections
 import logging
+import queue
 import shelve
 import threading
 import time
@@ -17,6 +18,8 @@
 
 PROMETHEUS_METRICS = None
 
+MAX_RETRY_INTERVAL = 60
+
 
 def get_prometheus_metrics():
     global PROMETHEUS_METRICS  # pylint: disable=global-statement
@@ -112,6 +115,9 @@ def event(self, event):
 
 class Events(threading.Thread):
     events_enable_interval = 5000
+    _BACKPRESSURE_MAXSIZE = 10000
+    _DRAIN_INTERVAL_MS = 100
+    _DRAIN_BATCH_SIZE = 500
 
     # pylint: disable=too-many-arguments
     def __init__(self, capp, io_loop, db=None, persistent=False,
@@ -128,13 +134,21 @@ def __init__(self, capp, io_loop, db=None, persistent=False,
         self.enable_events = enable_events
         self.state = None
         self.state_save_timer = None
+        self._drain_timer = None
+        self._event_queue = queue.Queue(maxsize=self._BACKPRESSURE_MAXSIZE)
+        self._drop_count = 0
+        self._last_drop_log_time = 0.0
 
         if self.persistent:
             logger.debug("Loading state from '%s'...", self.db)
-            state = shelve.open(self.db)
-            if state:
-                self.state = state['events']
-            state.close()
+            try:
+                with shelve.open(self.db) as state:
+                    if state:
+                        self.state = state['events']
+            except KeyError:
+                logger.debug("No existing state found in '%s'", self.db)
+            except Exception:
+                logger.error("Failed to load state from '%s'", self.db, exc_info=True)
 
         if state_save_interval:
             self.state_save_timer = PeriodicCallback(self.save_state,
@@ -156,23 +170,42 @@ def start(self):
logger.debug("Starting state save timer...")
self.state_save_timer.start()

self._drain_timer = PeriodicCallback(self._drain_events,
self._DRAIN_INTERVAL_MS)
self._drain_timer.start()

def stop(self):
if self.enable_events:
logger.debug("Stopping enable events timer...")
self.timer.stop()
try:
if self.enable_events:
logger.debug("Stopping enable events timer...")
try:
self.timer.stop()
except Exception:
logger.debug("Error stopping enable events timer", exc_info=True)

if self.state_save_timer:
logger.debug("Stopping state save timer...")
self.state_save_timer.stop()
if self.state_save_timer:
logger.debug("Stopping state save timer...")
try:
self.state_save_timer.stop()
except Exception:
logger.debug("Error stopping state save timer", exc_info=True)

if self.persistent:
self.save_state()
if self._drain_timer:
try:
self._drain_timer.stop()
except Exception:
logger.debug("Error stopping drain timer", exc_info=True)
finally:
if self.persistent:
self.save_state()

def run(self):
try_interval = 1
while True:
try:
try_interval *= 2
if try_interval > MAX_RETRY_INTERVAL:
try_interval = MAX_RETRY_INTERVAL

with self.capp.connection() as conn:
recv = EventReceiver(conn,
Expand All @@ -196,15 +229,42 @@ def run(self):

def save_state(self):
logger.debug("Saving state to '%s'...", self.db)
state = shelve.open(self.db, flag='n')
state['events'] = self.state
state.close()
try:
with shelve.open(self.db, flag='n') as state:
state['events'] = self.state
except Exception:
logger.error("Failed to save state to '%s'", self.db, exc_info=True)

def on_enable_events(self):
# Periodically enable events for workers
# launched after flower
self.io_loop.run_in_executor(None, self.capp.control.enable_events)

def on_event(self, event):
# Call EventsState.event in ioloop thread to avoid synchronization
self.io_loop.add_callback(partial(self.state.event, event))
# Enqueue event with backpressure — drop if queue is full.
# Rate-limit drop warnings to avoid flooding logs under sustained load.
try:
self._event_queue.put_nowait(event)
except queue.Full:
self._drop_count += 1
now = time.monotonic()
if now - self._last_drop_log_time >= 5.0:
window_start = self._last_drop_log_time or now
duration = now - window_start
logger.warning(
"Event queue full (%d), dropped %d event(s) in last %.0fs",
self._BACKPRESSURE_MAXSIZE, self._drop_count, duration)
self._drop_count = 0
self._last_drop_log_time = now

def _drain_events(self):
"""Process up to _DRAIN_BATCH_SIZE events from the backpressure queue."""
for _ in range(self._DRAIN_BATCH_SIZE):
try:
event = self._event_queue.get_nowait()
except queue.Empty:
break
try:
self.state.event(event)
except Exception:
logger.error("Error processing event", exc_info=True)
156 changes: 156 additions & 0 deletions tests/unit/test_events.py
@@ -0,0 +1,156 @@
import time
import unittest
from unittest.mock import MagicMock, patch

import celery
from celery.events import Event

from flower.events import Events, EventsState


class TestEventsState(unittest.TestCase):
    def test_counter_tracks_events_by_worker(self):
        state = EventsState()
        state.get_or_create_worker('w1')
        e = Event('worker-online', hostname='w1')
        e['clock'] = 0
        e['local_received'] = time.time()
        state.event(e)

        self.assertIn('w1', state.counter)
        self.assertEqual(state.counter['w1']['worker-online'], 1)

    def test_counter_increments(self):
        state = EventsState()
        state.get_or_create_worker('w1')
        for i in range(5):
            e = Event('worker-heartbeat', hostname='w1', active=0)
            e['clock'] = i
            e['local_received'] = time.time()
            state.event(e)

        self.assertEqual(state.counter['w1']['worker-heartbeat'], 5)


class TestEventsBackpressure(unittest.TestCase):
    def test_on_event_drops_when_queue_full(self):
        capp = celery.Celery()
        io_loop = MagicMock()
        events = Events(capp, io_loop)
        # Fill the queue
        for i in range(events._BACKPRESSURE_MAXSIZE):
            events.on_event({'hostname': 'w1', 'type': 'worker-heartbeat'})

        # Next event should be dropped without raising
        events.on_event({'hostname': 'w1', 'type': 'worker-heartbeat'})
        self.assertEqual(events._event_queue.qsize(), events._BACKPRESSURE_MAXSIZE)

    def test_drop_logging_is_rate_limited(self):
        capp = celery.Celery()
        io_loop = MagicMock()
        events = Events(capp, io_loop)
        # Fill the queue
        for i in range(events._BACKPRESSURE_MAXSIZE):
            events.on_event({'hostname': 'w1', 'type': 'worker-heartbeat'})

        # Reset drop state so we control it entirely within the patch.
        # Set _last_drop_log_time far enough in the past to guarantee the
        # 5-second cooldown has elapsed (time.monotonic() can be small on
        # short-lived processes).
        events._drop_count = 0
        events._last_drop_log_time = time.monotonic() - 10.0

        with patch('flower.events.logger') as mock_logger:
            # First drop should trigger a log (cooldown elapsed)
            events.on_event({'hostname': 'w1', 'type': 'worker-heartbeat'})
            self.assertEqual(mock_logger.warning.call_count, 1)

            # Subsequent drops within 5s should NOT trigger more logs
            for _ in range(99):
                events.on_event({'hostname': 'w1', 'type': 'worker-heartbeat'})
            self.assertEqual(mock_logger.warning.call_count, 1)

    def test_drain_events_processes_batch(self):
        capp = celery.Celery()
        io_loop = MagicMock()
        events = Events(capp, io_loop)
        events.state = MagicMock()

        for i in range(10):
            events._event_queue.put({'hostname': 'w1', 'type': 'worker-heartbeat',
                                     'clock': i, 'local_received': time.time()})

        events._drain_events()

        self.assertEqual(events.state.event.call_count, 10)
        self.assertTrue(events._event_queue.empty())

    def test_drain_events_handles_errors_gracefully(self):
        capp = celery.Celery()
        io_loop = MagicMock()
        events = Events(capp, io_loop)
        events.state = MagicMock()
        events.state.event.side_effect = [RuntimeError("test"), None]

        events._event_queue.put({'hostname': 'w1', 'type': 'a'})
        events._event_queue.put({'hostname': 'w1', 'type': 'b'})

        events._drain_events()

        # Both events should be consumed despite the error on the first one
        self.assertEqual(events.state.event.call_count, 2)
        self.assertTrue(events._event_queue.empty())

    def test_drain_respects_batch_size(self):
        capp = celery.Celery()
        io_loop = MagicMock()
        events = Events(capp, io_loop)
        events.state = MagicMock()

        count = events._DRAIN_BATCH_SIZE + 100
        for i in range(count):
            events._event_queue.put({'hostname': 'w1', 'type': 'hb'})

        events._drain_events()

        # Should process exactly _DRAIN_BATCH_SIZE, leaving 100
        self.assertEqual(events.state.event.call_count, events._DRAIN_BATCH_SIZE)
        self.assertEqual(events._event_queue.qsize(), 100)


class TestEventsRetryBackoff(unittest.TestCase):
    def test_retry_interval_caps_at_max(self):
        from flower.events import MAX_RETRY_INTERVAL
        try_interval = 1
        for _ in range(100):
            try_interval *= 2
            if try_interval > MAX_RETRY_INTERVAL:
                try_interval = MAX_RETRY_INTERVAL

        self.assertEqual(try_interval, MAX_RETRY_INTERVAL)
        self.assertEqual(MAX_RETRY_INTERVAL, 60)


class TestEventsStopSafety(unittest.TestCase):
    def test_stop_calls_save_state_even_if_timer_fails(self):
        capp = celery.Celery()
        io_loop = MagicMock()
        events = Events(capp, io_loop, persistent=True, db='test_db')

Comment on lines +139 to +142
Copilot AI Mar 8, 2026

This test uses a hard-coded db='test_db' with persistent=True, which will cause shelve.open() to touch the real filesystem (creating/locking files in the repo working directory) and can make unit tests flaky or fail in read-only/parallel environments. Use a temporary path (e.g., tempfile.TemporaryDirectory()/NamedTemporaryFile) or patch shelve.open so the test doesn't create persistent artifacts.
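One way to apply that suggestion while keeping the test's shape; a sketch, with the temporary path purely illustrative:

```python
import os
import tempfile

def test_stop_calls_save_state_even_if_timer_fails(self):
    # Keep shelve artifacts out of the working tree: use a throwaway directory.
    with tempfile.TemporaryDirectory() as tmpdir:
        events = Events(celery.Celery(), MagicMock(),
                        persistent=True,
                        db=os.path.join(tmpdir, 'test_db'))
        events.timer = MagicMock()
        events.timer.stop.side_effect = RuntimeError("timer error")
        with patch.object(events, 'save_state') as mock_save:
            events.stop()
            mock_save.assert_called_once()
```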

        events.timer = MagicMock()
        events.timer.stop.side_effect = RuntimeError("timer error")
        events.state_save_timer = MagicMock()
        events.state_save_timer.stop.side_effect = RuntimeError("save timer error")
        events._drain_timer = MagicMock()
        events._drain_timer.stop.side_effect = RuntimeError("drain timer error")

        with patch.object(events, 'save_state') as mock_save:
            events.stop()
            mock_save.assert_called_once()


if __name__ == '__main__':
    unittest.main()