
Fix memory leaks, connection leaks, and optimize for 10k+ queues #1488

Closed

ShubhAtWork wants to merge 4 commits into mher:master from twofourlabs:fix/production-memory-leaks-and-queue-optimization

Conversation

ShubhAtWork commented Mar 2, 2026

Summary

  • Fix unbounded growth in EventsState.counter, Prometheus metric labels, and Inspector.workers by adding periodic purge of offline/orphaned workers
  • Add a bounded backpressure queue (10k entries) with rate-limited drop logging to prevent IOLoop flooding under sustained event load (a minimal sketch follows this list)
  • Fix connection leaks across 6 call sites (app.py, views/broker.py, views/workers.py, api/tasks.py, command.py) using context managers (sketched after the test plan)
  • Fix shelve file handle leaks with proper with statements
  • Make Events.stop() and Flower.stop() exception-safe with try/finally
  • Add Redis pipelining for queue stats (N×4 round trips → 1 batched pipeline)
  • Add queue stats caching with configurable TTL (queue_cache_ttl option)
  • Add API pagination (limit/offset) for /api/queues/length endpoint
  • Use frozenset filtering for RabbitMQ queue responses
  • Add RedisBase.close() for proper client cleanup
  • Fix mutable default argument in BaseHandler.get_argument
  • Fix AsyncHTTPClient singleton misuse (removed erroneous close())
  • Cap retry backoff at 60s to prevent unbounded sleep intervals
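
As a minimal sketch of the backpressure pattern in the bullet above: a bounded buffer that drops the newest events when full and throttles its drop logging, drained periodically on the IOLoop. The names here (BoundedEventBuffer, put, drain) are illustrative, not the PR's actual identifiers.

```python
import collections
import logging
import time

logger = logging.getLogger(__name__)


class BoundedEventBuffer:
    """Drop-newest bounded buffer with rate-limited drop logging."""

    def __init__(self, maxsize=10_000, drop_log_interval=10.0):
        self._queue = collections.deque()
        self._maxsize = maxsize
        self._drop_log_interval = drop_log_interval
        self._dropped = 0
        self._last_drop_log = 0.0

    def put(self, event):
        """Buffer an event; drop it (with throttled logging) when full."""
        if len(self._queue) >= self._maxsize:
            self._dropped += 1
            now = time.monotonic()
            if now - self._last_drop_log >= self._drop_log_interval:
                logger.warning("event buffer full (%d); dropped %d events so far",
                               self._maxsize, self._dropped)
                self._last_drop_log = now
            return False
        self._queue.append(event)
        return True

    def drain(self, handler):
        """Hand buffered events to `handler`; called from a periodic timer."""
        while self._queue:
            handler(self._queue.popleft())
```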

Test plan

  • All 204 existing + new unit tests pass (0 failures, 2 skipped)
  • Verify Prometheus /metrics endpoint no longer accumulates labels for purged workers
  • Load test with high event throughput to confirm backpressure queue drops gracefully
  • Deploy with 10k+ queues and confirm Redis pipelining + caching reduces latency
  • Verify no ResourceWarning for unclosed connections under python -W all
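
For reference, the connection-leak fix mentioned in the summary relies on kombu's Connection being a context manager. A minimal before/after sketch, assuming a Celery app handle like the capp attribute Flower keeps (the broker URL is illustrative):

```python
from celery import Celery

capp = Celery(broker="redis://localhost:6379/0")  # illustrative broker URL

# Leaky pattern: the kombu Connection is created but never released, so each
# call can leave a broker connection (and file descriptor) open.
conn = capp.connection()
broker_url = conn.as_uri()

# Fixed pattern: kombu's Connection supports the context-manager protocol,
# so the connection is released even if an exception is raised in the block.
with capp.connection() as conn:
    broker_url = conn.as_uri()
```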

🤖 Generated with Claude Code


Copilot AI left a comment


Pull request overview

This PR targets long-running Flower instances under heavy load (10k+ queues / sustained event throughput) by addressing multiple leak vectors (connections, file handles, in-memory caches/labels) and adding throttling/caching to reduce IOLoop and broker pressure.

Changes:

  • Add periodic purging for offline/orphaned workers and Prometheus label cleanup to prevent unbounded memory/label growth.
  • Introduce bounded backpressure for Celery events ingestion, plus safer shutdown and capped retry backoff.
  • Optimize broker queue-length fetching via Redis pipelining + TTL caching (sketched below), and add pagination to /api/queues/length.
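
A sketch of the pipelining idea using redis-py, simplified to a single LLEN per queue (the PR batches several commands per queue, hence the N×4 figure in the summary); the client setup and queue names are illustrative:

```python
import redis

client = redis.Redis()  # illustrative; Flower derives the client from the broker URL
queue_names = ["celery", "emails", "reports"]  # hypothetical queue list

# Naive approach: one network round trip per queue.
# lengths = [client.llen(name) for name in queue_names]

# Pipelined approach: all LLEN commands travel in a single round trip.
pipe = client.pipeline(transaction=False)
for name in queue_names:
    pipe.llen(name)
lengths = pipe.execute()

queue_stats = dict(zip(queue_names, lengths))
```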

Reviewed changes

Copilot reviewed 16 out of 16 changed files in this pull request and generated 4 comments.

Summary per file:

  • flower/events.py: Adds backpressure queue + drain timer, metric label cleanup, capped retry backoff, and safer persistence handling.
  • flower/app.py: Adds worker purge timer, transport caching, queue-stats cache, and purge implementation.
  • flower/utils/broker.py: Adds Redis pipelining + chunking and RedisBase.close(), optimizes RabbitMQ filtering, and adjusts timeouts.
  • flower/api/tasks.py: Adds cached broker queue fetching and limit/offset pagination with total in response.
  • flower/views/broker.py: Uses a connection context manager, adds queue-stats cache usage, and closes broker clients when supported.
  • flower/views/workers.py: Avoids a connection leak by using a connection context manager for broker URL rendering.
  • flower/views/__init__.py: Fixes mutable default argument in BaseHandler.get_argument using an _UNSET sentinel (sketched after this list).
  • flower/inspector.py: Adds purge_worker() to delete cached inspector worker state.
  • flower/options.py: Adds queue_cache_ttl option for queue stats caching.
  • flower/command.py: Avoids a connection leak by using a connection context manager for banner logging.
  • docs/config.rst: Documents the new queue_cache_ttl option.
  • tests/unit/test_events.py: Adds tests for backpressure, purgeable metrics, capped retry, and stop safety.
  • tests/unit/test_app.py: Adds tests for queue cache behavior, purge logic, and stop safety.
  • tests/unit/test_inspector.py: Adds tests for Inspector.purge_worker().
  • tests/unit/utils/test_broker_queues.py: Adds tests for Redis pipelining/chunking, close(), and RabbitMQ filtering.
  • tests/unit/views/test_base_handler.py: Adds tests for _UNSET sentinel identity/uniqueness.
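
A minimal sketch of the sentinel pattern the flower/views/__init__.py row describes. This is a simplified stand-in, not Flower's actual handler (the real get_argument delegates to Tornado):

```python
_UNSET = object()  # module-level sentinel; identity tells "omitted" apart from None


class BaseHandler:
    def __init__(self, arguments):
        self._arguments = arguments  # simplified stand-in for Tornado's request arguments

    def get_argument(self, name, default=_UNSET):
        # A mutable default such as `default=[]` is created once at function
        # definition time and shared by every call, so mutations would leak
        # across requests. The sentinel avoids that while still letting
        # callers pass default=None explicitly.
        if name in self._arguments:
            return self._arguments[name]
        if default is _UNSET:
            raise KeyError(f"missing argument {name!r}")
        return default
```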


Comment thread (outdated): tests/unit/utils/test_broker_queues.py
Comment thread: flower/app.py
Comment on lines +129 to +138
```python
def get_cached_queue_stats(self, names_key):
    """Return cached queue stats if still valid, else None.

    Returns a shallow copy to prevent callers from mutating the cache."""
    if self._queue_cache_ttl <= 0 or self._queue_cache is None:
        return None
    ts, cached_key, result = self._queue_cache
    if cached_key == names_key and (time.time() - ts) < self._queue_cache_ttl:
        return list(result)
    return None
```

Copilot AI Mar 2, 2026


get_cached_queue_stats() claims it returns a copy to prevent callers from mutating the cache, but list(result) only copies the list container: mutating any dict elements in the returned list will still mutate the cached objects. If the intent is immutability, return a deep copy (e.g., copy.deepcopy) or store and return an immutable structure (e.g., tuples / MappingProxyType). A sketch of the deep-copy variant follows.
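
One way to implement this suggestion, assuming the cached entries are per-queue dicts, as a drop-in replacement for the method quoted above:

```python
import copy
import time


def get_cached_queue_stats(self, names_key):
    """Return cached queue stats if still valid, else None.

    Returns a deep copy so callers cannot mutate the cached dicts."""
    if self._queue_cache_ttl <= 0 or self._queue_cache is None:
        return None
    ts, cached_key, result = self._queue_cache
    if cached_key == names_key and (time.time() - ts) < self._queue_cache_ttl:
        return copy.deepcopy(result)  # copies the list and each stats dict inside it
    return None
```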

Comment thread (outdated): flower/api/tasks.py
Comment thread (outdated): flower/events.py
auvipy and others added 3 commits March 2, 2026 21:08
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
mher (Owner) commented Mar 3, 2026

Please split this into separate pull requests, one PR per functional change, so each change can be reviewed and merged independently. Also, please explain why each change was needed, including the concrete problem it fixes.

mher closed this Mar 3, 2026
ShubhAtWork (Author) commented

This PR has been split into 5 separate PRs as requested, each with one focused change and a detailed explanation of the concrete problem it fixes:

  1. #1489: Fix mutable default argument in BaseHandler.get_argument
  2. #1490: Fix connection leaks in broker and transport access
  3. #1491: Add backpressure queue for Celery event ingestion
  4. #1492: Purge offline workers and clean up Prometheus metric labels
  5. #1493: Optimize broker queue fetching with Redis pipelining, TTL cache, and pagination

Each PR can be reviewed and merged independently.
