Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions flower/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,75 @@ def get_prometheus_metrics():
return PROMETHEUS_METRICS


class SlidingWindowTracker:
"""Tracks unprocessed tasks using sliding window buckets for Prometheus metrics."""

def __init__(self, window_minutes, metrics):
self.window_minutes = window_minutes
self.metrics = metrics
self.time_buckets = {}
self.task_to_bucket = {}
self.current_bucket_key = None
self.next_bucket_key = None

def _get_bucket_key(self, timestamp):
window_seconds = self.window_minutes * 60
return int(timestamp // window_seconds) * window_seconds

def add_task(self, task_id, timestamp):
bucket_changed = False
if self.current_bucket_key is None or timestamp >= self.next_bucket_key:
bucket_key = self._get_bucket_key(timestamp)
self.current_bucket_key = bucket_key
window_seconds = self.window_minutes * 60
self.next_bucket_key = bucket_key + window_seconds
bucket_changed = True
else:
bucket_key = self.current_bucket_key

if bucket_key not in self.time_buckets:
self.time_buckets[bucket_key] = {}

self.time_buckets[bucket_key][task_id] = True
self.task_to_bucket[task_id] = bucket_key

if bucket_changed:
self._update_metrics()

def remove_task(self, task_id):
if task_id in self.task_to_bucket:
bucket_key = self.task_to_bucket[task_id]

if bucket_key in self.time_buckets and task_id in self.time_buckets[bucket_key]:
del self.time_buckets[bucket_key][task_id]
del self.task_to_bucket[task_id]
return True

del self.task_to_bucket[task_id]

return False

def _update_metrics(self):
if len(self.time_buckets) < 3:
return

first_bucket_key = next(iter(self.time_buckets))

# Count tasks in the old bucket
bucket_tasks = self.time_buckets[first_bucket_key]
unprocessed_count = len(bucket_tasks)

# Clean up task mappings
for task_id in bucket_tasks:
if task_id in self.task_to_bucket:
del self.task_to_bucket[task_id]

# Delete the old bucket
del self.time_buckets[first_bucket_key]

self.metrics.unprocessed_tasks_in_window.set(unprocessed_count)


class PrometheusMetrics:
def __init__(self):
self.events = PrometheusCounter('flower_events_total', "Number of events", ['worker', 'type', 'task'])
Expand All @@ -52,6 +121,14 @@ def __init__(self):
"Number of tasks currently executing at a worker",
['worker']
)

if options.unprocessed_tasks_window_minutes > 0:
self.unprocessed_tasks_in_window = Gauge(
'flower_unprocessed_tasks_in_window',
"Number of tasks received but not completed for longer than the configured window"
)
else:
self.unprocessed_tasks_in_window = None


class EventsState(State):
Expand All @@ -61,6 +138,14 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.counter = collections.defaultdict(Counter)
self.metrics = get_prometheus_metrics()

if options.unprocessed_tasks_window_minutes > 0:
self.unprocessed_tasks_tracker = SlidingWindowTracker(
options.unprocessed_tasks_window_minutes,
self.metrics
)
else:
self.unprocessed_tasks_tracker = None

def event(self, event):
# Save the event
Expand Down Expand Up @@ -88,13 +173,17 @@ def event(self, event):

if event_type == 'task-received' and not task.eta and task_received:
self.metrics.number_of_prefetched_tasks.labels(worker_name, task_name).inc()
if self.unprocessed_tasks_tracker:
self.unprocessed_tasks_tracker.add_task(task_id, task_started)

if event_type == 'task-started' and not task.eta and task_started and task_received:
self.metrics.prefetch_time.labels(worker_name, task_name).set(task_started - task_received)
self.metrics.number_of_prefetched_tasks.labels(worker_name, task_name).dec()

if event_type in ['task-succeeded', 'task-failed'] and not task.eta and task_started and task_received:
self.metrics.prefetch_time.labels(worker_name, task_name).set(0)
if self.unprocessed_tasks_tracker:
self.unprocessed_tasks_tracker.remove_task(task_id)

if event_type == 'worker-online':
self.metrics.worker_online.labels(worker_name).set(1)
Expand Down
2 changes: 2 additions & 0 deletions flower/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
define("url_prefix", type=str, help="base url prefix")
define("task_runtime_metric_buckets", type=float, default=Histogram.DEFAULT_BUCKETS,
multiple=True, help="histogram latency bucket value")
define("unprocessed_tasks_window_minutes", type=int, default=0,
help="time window in minutes for tracking unprocessed tasks (e.g., 5, 10, 15, 30, 60). Set to 0 to disable.")


default_options = options