[BugFix][Metrics] Fix Prometheus Multiprocess Metrics Issues and Add ZMQ Communication Metrics #5185
Conversation

Thanks for your contribution!
fastdeploy/__init__.py
Outdated

import typing

# first import prometheus setup to set PROMETHEUS_MULTIPROC_DIR
# otherwise multi-process mode cannot be configured correctly, because the
# Prometheus package would already have been imported
Pull request overview
This PR addresses critical bugs in Prometheus metrics collection for multi-process environments and adds ZMQ communication observability metrics.
- Fixes metric aggregation failures in multi-process mode by separating Gauge metrics (read from memory) from Counter/Histogram metrics (read from shared filesystem)
- Corrects initialization order by setting PROMETHEUS_MULTIPROC_DIR before the Prometheus client loads in __init__.py
- Adds comprehensive ZMQ metrics (fastdeploy:zmq:*) to monitor message throughput, failures, and latency
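The Gauge-vs-Counter split described above can be sketched with prometheus_client. This is a minimal illustration under assumed names (`num_requests_running`, `requests_total`, `collect_metrics`), not the PR's actual MetricsManager code; a full implementation would also filter Gauge families out of the aggregated output to avoid duplicates.

```python
import os
import tempfile

# Multiprocess mode must be configured before prometheus_client is imported.
os.environ.setdefault("PROMETHEUS_MULTIPROC_DIR", tempfile.mkdtemp())

from prometheus_client import CollectorRegistry, Counter, Gauge, generate_latest
from prometheus_client import multiprocess

# Gauges stay in a process-local registry: aggregating a Gauge across
# processes is ambiguous (last write? max? sum?), so it is read from memory.
gauge_registry = CollectorRegistry()
num_requests_running = Gauge(
    "num_requests_running", "In-flight requests", registry=gauge_registry
)

# Counters (and Histograms) are persisted to per-process value files under
# PROMETHEUS_MULTIPROC_DIR, so a collector can sum them across processes.
requests_total = Counter("requests_total", "Total requests")
requests_total.inc()

def collect_metrics() -> bytes:
    # Counter/Histogram samples: aggregate the shared value files on disk.
    agg = CollectorRegistry()
    multiprocess.MultiProcessCollector(agg)
    # Gauge samples: read from this process only.
    return generate_latest(agg) + generate_latest(gauge_registry)
```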
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 13 comments.
| File | Description |
|---|---|
| fastdeploy/__init__.py | Sets up Prometheus multiprocess directory early to ensure proper initialization before client loads |
| fastdeploy/metrics/prometheus_multiprocess_setup.py | New module to handle Prometheus multiprocess directory setup with user environment variable prioritization |
| fastdeploy/metrics/metrics.py | Refactored metric collection logic to properly handle multi-process aggregation; added ZMQ, HTTP, and server metrics; separated Gauge metrics for correct handling |
| fastdeploy/metrics/metrics_middleware.py | New middleware to track HTTP request metrics (requests total, duration) |
| fastdeploy/metrics/stats.py | New dataclass to hold ZMQ metrics statistics |
| fastdeploy/inter_communicator/zmq_server.py | Added ZMQ metrics collection with message wrapping for latency tracking |
| fastdeploy/inter_communicator/zmq_client.py | Added ZMQ metrics collection with message wrapping for latency tracking |
| fastdeploy/entrypoints/openai/api_server.py | Simplified metrics endpoint and integrated PrometheusMiddleware; removed redundant setup calls |
| fastdeploy/entrypoints/openai/utils.py | Added ZMQ metrics recording for dealer connections |
| fastdeploy/entrypoints/openai/serving_chat.py | Updated to use main_process_metrics instead of deprecated work_process_metrics |
| fastdeploy/entrypoints/engine_client.py | Updated to use main_process_metrics instead of deprecated work_process_metrics |
| fastdeploy/splitwise/internal_adapter_utils.py | Simplified metrics collection call by removing unused parameters |
| fastdeploy/metrics/work_metrics.py | Removed deprecated file; metrics moved to main MetricsManager |
| tests/metrics/test_prometheus_multiprocess_setup.py | New test suite for multiprocess setup logic |
| tests/metrics/test_metrics_middleware.py | New test suite for HTTP metrics middleware |
| tests/metrics/test_metrics.py | Updated test to reflect simplified metrics API |
| tests/entrypoints/openai/test_metrics_routes.py | Removed obsolete test for deprecated setup function |
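Several rows above mention "message wrapping for latency tracking". A minimal sketch of the envelope format visible in the diff (`{"__meta": {"send_ts": ...}, "data": ...}`), using plain `pickle` in place of `ForkingPickler` and hypothetical helper names:

```python
import pickle
import time

def wrap_message(payload) -> bytes:
    # Sender side: attach a send timestamp so the receiver can compute latency.
    return pickle.dumps({"__meta": {"send_ts": time.perf_counter()}, "data": payload})

def unwrap_message(data_bytes: bytes):
    # Receiver side: legacy/unwrapped messages pass through with no latency.
    envelope = pickle.loads(data_bytes)
    meta = envelope.get("__meta") if isinstance(envelope, dict) else None
    if isinstance(meta, dict) and "send_ts" in meta:
        latency = time.perf_counter() - meta["send_ts"]
        return envelope["data"], latency
    return envelope, None
```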
_zmq_metrics_stats.msg_bytes_send_total += len(msg)

def recv_json(self):
return self.socket.send(msg, flags=flags)
Missing _ensure_socket() call before using self.socket.send(). This will cause an AttributeError if the socket hasn't been created yet. Add self._ensure_socket() at the beginning of the method, similar to recv_json() at line 89.
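The fix the comment asks for can be sketched as follows. Class and method names here are illustrative stand-ins for the reviewed client, and a stub socket replaces pyzmq:

```python
class LazySocketSender:
    """Illustrative stand-in for the reviewed client: the socket is created
    lazily, so every method touching self.socket must call _ensure_socket()."""

    def __init__(self, socket_factory):
        self._socket_factory = socket_factory
        self.socket = None

    def _ensure_socket(self):
        if self.socket is None:
            self.socket = self._socket_factory()

    def send_json(self, msg: bytes, flags: int = 0):
        self._ensure_socket()  # without this, self.socket may still be None here
        return self.socket.send(msg, flags=flags)

class StubSocket:
    """Records sent messages; a real pyzmq socket would be used in practice."""
    def __init__(self):
        self.sent = []
    def send(self, msg, flags=0):
        self.sent.append(msg)
        return len(msg)

sender = LazySocketSender(StubSocket)
sender.send_json(b'{"k": 1}')  # works even though no socket existed before
```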
if _zmq_metrics_stats is not None:
    _zmq_metrics_stats.msg_recv_total += 1
    if "zmq_send_time" in response:
        _zmq_metrics_stats.zmq_latency = time.perf_counter() - response["zmq_send_time"]
    address = dealer.transport.getsockopt(zmq.LAST_ENDPOINT)
    main_process_metrics.record_zmq_stats(_zmq_metrics_stats, address)
Unnecessary null check: _zmq_metrics_stats is guaranteed to be non-None since it's instantiated on the previous line. The condition if _zmq_metrics_stats is not None: will always be true and can be removed.
Suggested change:

-if _zmq_metrics_stats is not None:
-    _zmq_metrics_stats.msg_recv_total += 1
-    if "zmq_send_time" in response:
-        _zmq_metrics_stats.zmq_latency = time.perf_counter() - response["zmq_send_time"]
-    address = dealer.transport.getsockopt(zmq.LAST_ENDPOINT)
-    main_process_metrics.record_zmq_stats(_zmq_metrics_stats, address)
+_zmq_metrics_stats.msg_recv_total += 1
+if "zmq_send_time" in response:
+    _zmq_metrics_stats.zmq_latency = time.perf_counter() - response["zmq_send_time"]
+address = dealer.transport.getsockopt(zmq.LAST_ENDPOINT)
+main_process_metrics.record_zmq_stats(_zmq_metrics_stats, address)
envelope = ForkingPickler.loads(data_bytes)
if isinstance(envelope, dict):
    if "__meta" in envelope and "send_ts" in envelope["__meta"]:
        _zmq_metrics_stats.msg_recv_total += 1
        _zmq_metrics_stats.msg_bytes_recv_total += len(data_bytes)
        _zmq_metrics_stats.zmq_latency = time.perf_counter() - envelope["__meta"]["send_ts"]
        main_process_metrics.record_zmq_stats(_zmq_metrics_stats, self.address)
        return envelope["data"]
return envelope
Metrics are not recorded when the envelope is not a dict or doesn't have the __meta key. In these cases, the function returns early without calling record_zmq_stats(). Consider wrapping the logic in a try-finally block similar to recv_json() to ensure metrics are always recorded.
Suggested change:

-envelope = ForkingPickler.loads(data_bytes)
-if isinstance(envelope, dict):
-    if "__meta" in envelope and "send_ts" in envelope["__meta"]:
-        _zmq_metrics_stats.msg_recv_total += 1
-        _zmq_metrics_stats.msg_bytes_recv_total += len(data_bytes)
-        _zmq_metrics_stats.zmq_latency = time.perf_counter() - envelope["__meta"]["send_ts"]
-        main_process_metrics.record_zmq_stats(_zmq_metrics_stats, self.address)
-        return envelope["data"]
-return envelope
+try:
+    envelope = ForkingPickler.loads(data_bytes)
+    if isinstance(envelope, dict) and "__meta" in envelope and "send_ts" in envelope["__meta"]:
+        _zmq_metrics_stats.msg_recv_total += 1
+        _zmq_metrics_stats.msg_bytes_recv_total += len(data_bytes)
+        _zmq_metrics_stats.zmq_latency = time.perf_counter() - envelope["__meta"]["send_ts"]
+        return envelope["data"]
+    else:
+        # Record metrics for malformed envelope
+        _zmq_metrics_stats.msg_recv_total += 1
+        _zmq_metrics_stats.msg_bytes_recv_total += len(data_bytes)
+        return envelope
+finally:
+    main_process_metrics.record_zmq_stats(_zmq_metrics_stats, self.address)
data_bytes = self.socket.recv(flags=flags)
envelope = ForkingPickler.loads(data_bytes)
if isinstance(envelope, dict):
    if "__meta" in envelope and "send_ts" in envelope["__meta"]:
        _zmq_metrics_stats.msg_recv_total += 1
        _zmq_metrics_stats.msg_bytes_recv_total += len(data_bytes)
        _zmq_metrics_stats.zmq_latency = time.perf_counter() - envelope["__meta"]["send_ts"]
        main_process_metrics.record_zmq_stats(_zmq_metrics_stats, self.address)
        return envelope["data"]
return envelope
Metrics are not recorded when the envelope is not a dict or doesn't have the __meta key. In these cases, the function returns early without calling record_zmq_stats(). Consider wrapping the logic in a try-finally block similar to recv_json() to ensure metrics are always recorded.
Suggested change:

-data_bytes = self.socket.recv(flags=flags)
-envelope = ForkingPickler.loads(data_bytes)
-if isinstance(envelope, dict):
-    if "__meta" in envelope and "send_ts" in envelope["__meta"]:
-        _zmq_metrics_stats.msg_recv_total += 1
-        _zmq_metrics_stats.msg_bytes_recv_total += len(data_bytes)
-        _zmq_metrics_stats.zmq_latency = time.perf_counter() - envelope["__meta"]["send_ts"]
-        main_process_metrics.record_zmq_stats(_zmq_metrics_stats, self.address)
-        return envelope["data"]
-return envelope
+try:
+    data_bytes = self.socket.recv(flags=flags)
+    envelope = ForkingPickler.loads(data_bytes)
+    if isinstance(envelope, dict):
+        if "__meta" in envelope and "send_ts" in envelope["__meta"]:
+            _zmq_metrics_stats.msg_recv_total += 1
+            _zmq_metrics_stats.msg_bytes_recv_total += len(data_bytes)
+            _zmq_metrics_stats.zmq_latency = time.perf_counter() - envelope["__meta"]["send_ts"]
+            return envelope["data"]
+    return envelope
+finally:
+    main_process_metrics.record_zmq_stats(_zmq_metrics_stats, self.address)
if hasattr(main_process_metrics, "spec_decode_draft_acceptance_rate"):
    self.register_speculative_metrics(registry)
The register_all() method doesn't register ZMQ metrics even when they are initialized. When init_zmq_metrics() is called (when FD_DEBUG is enabled), the ZMQ metrics are created but they won't be registered in non-multiprocess mode (line 121). Consider adding a loop to register ZMQ_METRICS if _collect_zmq_metrics is True.
Suggested change:

# Register ZMQ metrics if they are being collected
if getattr(self, "_collect_zmq_metrics", False):
    for metric in getattr(self, "ZMQ_METRICS", []):
        registry.register(metric)
"msg_recv_total": {
    "type": Counter,
    "name": "fastdeploy:zmq:msg_recv_total",
    "description": "Total number of zmq messages recieved",
Typo in description: 'recieved' should be 'received'.
"msg_bytes_recv_total": {
    "type": Counter,
    "name": "fastdeploy:zmq:msg_bytes_recv_total",
    "description": "Total number of bytes recieved over zmq",
Typo in description: 'recieved' should be 'received'.
"""Initializes the Prometheus metrics and starts the HTTP server if not already initialized."""

# set the Prometheus environment variable at module load, before metric registration
setup_multiprocess_prometheus()
The function setup_multiprocess_prometheus() is called both in fastdeploy/__init__.py (line 33) and in MetricsManager.__init__() (line 595). This creates duplicate setup calls. Since setup_multiprocess_prometheus() is already called at module import in __init__.py, the call in MetricsManager.__init__() is redundant and should be removed to avoid confusion.
Suggested change:

-setup_multiprocess_prometheus()
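For reference, the behaviour the PR describes for prometheus_multiprocess_setup.py (user-specified value wins, random UUID fallback, harmless to call more than once) might look roughly like this sketch; the real module's code is not reproduced here:

```python
import os
import tempfile
import uuid

def setup_multiprocess_prometheus() -> str:
    """Set PROMETHEUS_MULTIPROC_DIR before prometheus_client is imported.

    A user-provided value takes priority; otherwise fall back to a random
    UUID directory. Idempotent, so a duplicate call is harmless (though a
    redundant call site should still be removed for clarity).
    """
    if "PROMETHEUS_MULTIPROC_DIR" not in os.environ:
        path = os.path.join(tempfile.gettempdir(), f"prom_multiproc_{uuid.uuid4().hex}")
        os.makedirs(path, exist_ok=True)
        os.environ["PROMETHEUS_MULTIPROC_DIR"] = path
    return os.environ["PROMETHEUS_MULTIPROC_DIR"]
```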
_zmq_metrics_stats.msg_bytes_send_total += len(msg)

def recv_json(self):
return self.socket.send(msg, flags=flags)
Missing _ensure_socket() call before using self.socket.send(). This will cause an AttributeError if the socket hasn't been created yet. Add self._ensure_socket() at the beginning of the method, similar to recv_json() at line 91.
test_dir = "/tmp/prom_main_test-uuid"
# create a temporary directory using tmp_path
os.makedirs(test_dir, exist_ok=True)
Hardcoded path /tmp/prom_main_test-uuid is used instead of the tmp_path fixture provided by pytest. This could cause test failures or side effects on systems where /tmp is not writable or tests run in parallel. Consider using str(tmp_path / "prom_main_test-uuid") instead.
23a31cf to a4fa504 (Compare)
Codecov Report

❌ Patch coverage is

Additional details and impacted files

@@           Coverage Diff            @@
##           develop    #5185   +/-   ##
=========================================
  Coverage         ?   59.84%
=========================================
  Files            ?      319
  Lines            ?    38974
  Branches         ?     5866
=========================================
  Hits             ?    23325
  Misses           ?    13810
  Partials         ?     1839

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
Motivation
This PR addresses two primary concerns: fixing data integrity issues with Prometheus metrics in multi-process environments and enhancing observability for ZMQ communication.
1. Prometheus Multi-process Issues:
- Metrics were read only from the current process, causing Counter and Histogram data from other processes to be lost.
- PROMETHEUS_MULTIPROC_DIR was being set after load_engine(). This caused the Engine process and API Server process to write to different directories (or disabled multi-process mode entirely if the Prometheus client was loaded too early).

2. Lack of ZMQ Observability:
Modifications
Prometheus Fixes:
- Gauge metrics are now exclusively read from the current process memory (as multi-process aggregation for Gauges is ambiguous).
- Counter and Histogram metrics are now correctly read from the multi-process file storage to ensure proper aggregation.
- Moved the setting of PROMETHEUS_MULTIPROC_DIR to the very beginning of __init__. This ensures the environment is configured before the Prometheus client loads, guaranteeing that both the Engine and API Server share the correct directory.
- The setup now prioritizes a user-specified PROMETHEUS_MULTIPROC_DIR environment variable. It now falls back to a random UUID directory only if the user has not specified one.

New ZMQ Metrics:
Added the fastdeploy:zmq:* metric series to monitor ZMQ performance:
- Send: msg_send_total, msg_send_failed_total, msg_bytes_send_total
- Receive: msg_recv_total, msg_bytes_recv_total
- Latency: fastdeploy:zmq:latency (Histogram)

Usage or Command
Verification:
Start the service in a multi-process environment and request the metrics endpoint.
Expected Output (ZMQ Section):
You should see the aggregated metrics and the new ZMQ entries:
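As a rough illustration of the series listed under New ZMQ Metrics, they could be declared with prometheus_client as below. The exact declarations and the `address` label are assumptions based on the PR description (`record_zmq_stats(stats, address)`), not the actual MetricsManager code:

```python
from prometheus_client import CollectorRegistry, Counter, Histogram, generate_latest

registry = CollectorRegistry()
labels = ["address"]  # assumed label, based on record_zmq_stats(stats, address)

msg_send_total = Counter("fastdeploy:zmq:msg_send_total",
                         "Total number of zmq messages sent", labels, registry=registry)
msg_send_failed_total = Counter("fastdeploy:zmq:msg_send_failed_total",
                                "Total number of zmq send failures", labels, registry=registry)
msg_bytes_send_total = Counter("fastdeploy:zmq:msg_bytes_send_total",
                               "Total number of bytes sent over zmq", labels, registry=registry)
msg_recv_total = Counter("fastdeploy:zmq:msg_recv_total",
                         "Total number of zmq messages received", labels, registry=registry)
msg_bytes_recv_total = Counter("fastdeploy:zmq:msg_bytes_recv_total",
                               "Total number of bytes received over zmq", labels, registry=registry)
zmq_latency = Histogram("fastdeploy:zmq:latency",
                        "ZMQ transmission latency in seconds", labels, registry=registry)

addr = "ipc:///tmp/engine.sock"  # example endpoint
msg_send_total.labels(address=addr).inc()
zmq_latency.labels(address=addr).observe(0.002)
print(generate_latest(registry).decode())
```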
Accuracy Tests
- Verified that Counter metrics correctly sum up values from multiple worker processes.
- Verified that fastdeploy:zmq:latency correctly records transmission time between the API Server and Engine.
- Verified that the setup respects a PROMETHEUS_MULTIPROC_DIR specified by the environment variable.

Checklist
[BugFix],[Metrics],[Feature]