Fix memory leaks, connection leaks, and optimize for 10k+ queues #1488
ShubhAtWork wants to merge 4 commits into mher:master
Conversation
- Fix unbounded growth in `EventsState.counter`, Prometheus metrics, and `Inspector.workers` by adding periodic purge of offline workers
- Add backpressure queue (bounded to 10k) with rate-limited drop logging to prevent IOLoop callback flooding under sustained event load
- Fix connection leaks across 6 call sites by using context managers
- Fix shelve file handle leaks with proper context managers
- Make `Events.stop()` and `Flower.stop()` exception-safe with try/finally
- Add Redis pipelining for queue stats (40k round trips → 1 pipeline)
- Add queue stats caching with configurable TTL (`queue_cache_ttl` option)
- Add API pagination (`limit`/`offset`) for the `/api/queues/length` endpoint
- Use `frozenset` filtering for RabbitMQ queue responses
- Add `RedisBase.close()` for proper client cleanup
- Fix mutable default argument in `BaseHandler.get_argument`
- Fix `AsyncHTTPClient` singleton misuse (removed erroneous `close()`)
- Cap retry backoff at 60s to prevent unbounded sleep intervals

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
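The bounded backpressure queue with rate-limited drop logging described above can be sketched roughly as follows. This is an illustrative sketch, not the PR's actual code: the class name, `MAX_PENDING_EVENTS`, and `LOG_INTERVAL` are assumptions mirroring the 10k bound and rate-limited logging mentioned in the description.

```python
import collections
import logging
import time

logger = logging.getLogger(__name__)

MAX_PENDING_EVENTS = 10_000   # illustrative cap, mirroring the 10k bound above
LOG_INTERVAL = 5.0            # rate-limit drop warnings to one per interval


class BoundedEventQueue:
    """Drop-oldest queue that logs drops at most once per LOG_INTERVAL."""

    def __init__(self, maxlen=MAX_PENDING_EVENTS):
        self._queue = collections.deque(maxlen=maxlen)
        self._dropped = 0
        self._last_log = 0.0

    def push(self, event):
        # deque(maxlen=...) silently evicts the oldest entry when full;
        # count those evictions and log them at a bounded rate.
        if len(self._queue) == self._queue.maxlen:
            self._dropped += 1
            now = time.monotonic()
            if now - self._last_log >= LOG_INTERVAL:
                logger.warning("Dropped %d events under sustained load", self._dropped)
                self._last_log = now
                self._dropped = 0
        self._queue.append(event)

    def drain(self, limit=100):
        """Pop up to `limit` events for processing on the IOLoop."""
        batch = []
        while self._queue and len(batch) < limit:
            batch.append(self._queue.popleft())
        return batch
```

Draining in bounded batches on a periodic timer, instead of scheduling one IOLoop callback per event, is what prevents callback flooding.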
Pull request overview
This PR targets long-running Flower instances under heavy load (10k+ queues / sustained event throughput) by addressing multiple leak vectors (connections, file handles, in-memory caches/labels) and adding throttling/caching to reduce IOLoop and broker pressure.
Changes:
- Add periodic purging for offline/orphaned workers and Prometheus label cleanup to prevent unbounded memory/label growth.
- Introduce bounded backpressure for Celery events ingestion, plus safer shutdown and capped retry backoff.
- Optimize broker queue-length fetching via Redis pipelining + TTL caching, and add pagination to `/api/queues/length`.
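The Redis pipelining + chunking optimization can be sketched as below, assuming a redis-py style client (`client.pipeline()`, `pipe.llen()`, `pipe.execute()`). `CHUNK_SIZE` and the function name are illustrative, not the PR's actual identifiers.

```python
CHUNK_SIZE = 1000  # queues per pipeline round trip (illustrative)


def queue_lengths(client, queue_names):
    """Return {queue_name: length} using batched LLEN calls.

    One network round trip per CHUNK_SIZE queues instead of one per queue,
    which is where the "40k round trips -> 1 pipeline" win comes from.
    """
    lengths = {}
    for start in range(0, len(queue_names), CHUNK_SIZE):
        chunk = queue_names[start:start + CHUNK_SIZE]
        # transaction=False: we only want batching, not MULTI/EXEC atomicity
        pipe = client.pipeline(transaction=False)
        for name in chunk:
            pipe.llen(name)
        # execute() returns results in the same order the commands were queued
        for name, size in zip(chunk, pipe.execute()):
            lengths[name] = size
    return lengths
```

Chunking also bounds the size of any single pipeline payload, which matters at the 10k+ queue scale this PR targets.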
Reviewed changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| `flower/events.py` | Adds backpressure queue + drain timer, metric label cleanup, capped retry backoff, and safer persistence handling. |
| `flower/app.py` | Adds worker purge timer, transport caching, queue-stats cache, and purge implementation. |
| `flower/utils/broker.py` | Adds Redis pipelining + chunking and `RedisBase.close()`, optimizes RabbitMQ filtering, and adjusts timeouts. |
| `flower/api/tasks.py` | Adds cached broker queue fetching and limit/offset pagination with total in response. |
| `flower/views/broker.py` | Uses connection context manager, adds queue-stats cache usage, and closes broker clients when supported. |
| `flower/views/workers.py` | Avoids a connection leak by using a connection context manager for broker URL rendering. |
| `flower/views/__init__.py` | Fixes mutable default argument in `BaseHandler.get_argument` using an `_UNSET` sentinel. |
| `flower/inspector.py` | Adds `purge_worker()` to delete cached inspector worker state. |
| `flower/options.py` | Adds `queue_cache_ttl` option for queue stats caching. |
| `flower/command.py` | Avoids a connection leak by using a connection context manager for banner logging. |
| `docs/config.rst` | Documents the new `queue_cache_ttl` option. |
| `tests/unit/test_events.py` | Adds tests for backpressure, purgeable metrics, capped retry, and stop safety. |
| `tests/unit/test_app.py` | Adds tests for queue cache behavior, purge logic, and stop safety. |
| `tests/unit/test_inspector.py` | Adds tests for `Inspector.purge_worker()`. |
| `tests/unit/utils/test_broker_queues.py` | Adds tests for Redis pipelining/chunking, `close()`, and RabbitMQ filtering. |
| `tests/unit/views/test_base_handler.py` | Adds tests for `_UNSET` sentinel identity/uniqueness. |
```python
def get_cached_queue_stats(self, names_key):
    """Return cached queue stats if still valid, else None.

    Returns a shallow copy to prevent callers from mutating the cache."""
    if self._queue_cache_ttl <= 0 or self._queue_cache is None:
        return None
    ts, cached_key, result = self._queue_cache
    if cached_key == names_key and (time.time() - ts) < self._queue_cache_ttl:
        return list(result)
    return None
```
get_cached_queue_stats() claims it returns a copy to prevent callers from mutating the cache, but list(result) only copies the list container—mutating any dict elements in the returned list will still mutate the cached objects. If the intent is immutability, return a deep copy (e.g., copy.deepcopy) or store/return an immutable structure (e.g., tuples / MappingProxyType).
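The distinction the review draws can be demonstrated in a few lines: `list(...)` copies only the outer container, so nested dicts remain shared, while `copy.deepcopy` copies the nested objects too.

```python
import copy

# A cache entry shaped like queue stats: a list of dicts.
cache = [{"name": "q1", "messages": 5}]

shallow = list(cache)         # copies only the list container
shallow[0]["messages"] = 99   # mutates the dict shared with the cache
assert cache[0]["messages"] == 99   # the cached object was corrupted

cache[0]["messages"] = 5      # reset
deep = copy.deepcopy(cache)   # copies nested dicts as well
deep[0]["messages"] = 99
assert cache[0]["messages"] == 5    # cache unaffected
```

Deep copies cost more per call, which is a real trade-off on a hot path; immutable structures (tuples, `types.MappingProxyType`) avoid both the mutation risk and the per-call copy.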
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Please split this into separate pull requests with one PR per functionality change, so each change can be reviewed and merged independently. Also, please explain why each change was needed, including the concrete problem it fixes.
This PR has been split into 5 separate PRs as requested, each with one focused change and a detailed explanation of the concrete problem it fixes. Each PR can be reviewed and merged independently.
Summary
- Fix unbounded growth in `EventsState.counter`, Prometheus metric labels, and `Inspector.workers` by adding periodic purge of offline/orphaned workers
- Fix connection leaks across call sites (`app.py`, `views/broker.py`, `views/workers.py`, `api/tasks.py`, `command.py`) using context managers / `with` statements
- Make `Events.stop()` and `Flower.stop()` exception-safe with try/finally
- Add queue stats caching with configurable TTL (`queue_cache_ttl` option)
- Add API pagination (`limit`/`offset`) for the `/api/queues/length` endpoint
- Use `frozenset` filtering for RabbitMQ queue responses
- Add `RedisBase.close()` for proper client cleanup
- Fix mutable default argument in `BaseHandler.get_argument`
- Fix `AsyncHTTPClient` singleton misuse (removed erroneous `close()`)
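The capped retry backoff from the summary ("cap retry backoff at 60s") can be sketched in one function. The 60-second cap comes from the PR description; the base delay, doubling factor, and function name are illustrative assumptions.

```python
MAX_BACKOFF = 60.0  # seconds; cap taken from the PR description


def backoff_delay(attempt, base=1.0):
    """Exponential backoff (base * 2**attempt), capped at MAX_BACKOFF.

    Without the cap, sleep intervals double without bound and a flapping
    broker connection can leave the client dormant for hours.
    """
    return min(base * (2 ** attempt), MAX_BACKOFF)
```

For example, attempts 0..6 yield 1, 2, 4, 8, 16, 32, 60 seconds, and every later attempt stays pinned at 60.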
- `/metrics` endpoint no longer accumulates labels for purged workers
- No `ResourceWarning` for unclosed connections under `python -W all`

🤖 Generated with Claude Code