Add automatic Yuanrong startup to interface.py #60
CLA Signature Pass: dpj135, thanks for your pull request. All authors of the commits have signed the CLA. 👍
Signed-off-by: dpj135 <958208521@qq.com>
Pull request overview
Adds automatic lifecycle management for the Yuanrong backend by starting required local services during init() and stopping/cleaning them up in close(), aligning Yuanrong with the existing “auto_init” behavior used for MooncakeStore.
Changes:
- Add Yuanrong auto_init support to start local etcd and a datasystem worker (dscli start) during initialization.
- Add Yuanrong shutdown logic to terminate etcd, remove its temporary data dir, and stop the worker via dscli stop.
- Extend default config.yaml with Yuanrong auto_init, etcd_address, and client_name.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| transfer_queue/interface.py | Starts/stops Yuanrong dependencies (etcd + dscli) as part of TransferQueue init/close lifecycle. |
| transfer_queue/config.yaml | Adds Yuanrong auto-init configuration knobs and adjusts default Yuanrong config fields. |
transfer_queue/config.yaml (outdated)

    # If enable npu transport
    enable_yr_npu_transport: false
    # Client name (optional, default is YuanrongStorageClient)
    client_name: "YuanrongStorageClient"
enable_yr_npu_transport was removed from the default Yuanrong config. In transfer_queue/storage/clients/yuanrong_client.py, the NPU adapter defaults enable_yr_npu_transport to True when the key is missing, so this change flips the default behavior from disabled (previously false) to enabled. Consider re-adding enable_yr_npu_transport with an explicit default (or change the code default) to avoid an unintended behavior change for existing users.
Suggested change:

    - client_name: "YuanrongStorageClient"
    + client_name: "YuanrongStorageClient"
    + # Whether to enable Yuanrong NPU transport (default: False for backward compatibility)
    + enable_yr_npu_transport: False
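The default flip described in this comment can be illustrated with a minimal sketch. It assumes the client reads the key with a fallback of True, as the comment states; the two dictionaries and the helper name npu_transport_enabled are hypothetical and are not taken from yuanrong_client.py.

```python
# Hypothetical config dicts; only the key name comes from the review comment.
old_default_config = {"enable_yr_npu_transport": False}  # key present, as before the PR
new_default_config = {}  # key removed from the default config.yaml


def npu_transport_enabled(config: dict) -> bool:
    """Mirror the described fallback: a missing key is treated as True."""
    return config.get("enable_yr_npu_transport", True)


# The two calls return different values, which is the unintended behavior change.
print(npu_transport_enabled(old_default_config))
print(npu_transport_enabled(new_default_config))
```

Keeping the key in the default config makes the effective value explicit, so it no longer depends on the fallback in code.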
    try:
        etcd_process.wait(timeout=5)
    except subprocess.TimeoutExpired:
        etcd_process.kill()
If etcd_process.kill() is used after a timeout, the code should still call wait() to reap the child process; otherwise a zombie process can be left behind until interpreter exit. After kill(), call etcd_process.wait() (possibly in a nested try/except) to ensure the process is fully collected.
Suggested change:

    - etcd_process.kill()
    + etcd_process.kill()
    + try:
    +     # Ensure the killed process is fully reaped to avoid zombies
    +     etcd_process.wait(timeout=5)
    + except subprocess.TimeoutExpired:
    +     logger.warning("Timed out while waiting for etcd process to exit after kill.")
    + except Exception as e:
    +     logger.warning(f"Error while waiting for etcd process to exit after kill: {e}")
Add etcd_process.wait()
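The terminate, wait, kill, wait sequence discussed here could be factored into one helper so that close() and the startup failure path share it. The following is a sketch only: stop_process is a hypothetical name, and a sleeping Python child stands in for etcd.

```python
import subprocess
import sys


def stop_process(proc: subprocess.Popen, timeout: float = 5.0) -> int:
    """Terminate proc, escalate to kill on timeout, and always reap it."""
    if proc.poll() is None:  # still running
        proc.terminate()
        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            proc.kill()
            # Reap the killed child so it does not linger as a zombie.
            proc.wait()
    return proc.returncode


# Usage with a stand-in child process (not a real etcd).
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
stop_process(child)
```

After the call, child.poll() returns an exit code rather than None, which indicates that the child has been collected.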
transfer_queue/interface.py (outdated)

    subprocess.run(
        ["dscli", "stop", "--worker_address", worker_address],
        timeout=90,
        capture_output=True,
    )
    logger.info(f"Stopped datasystem worker at {worker_address} via dscli stop")
dscli stop is run but its result is ignored, and an info log is emitted unconditionally. This can report a successful shutdown even if dscli fails (non-zero return code) or prints an error. Capture the CompletedProcess, check returncode (or use check=True), and log/raise accordingly (and include stdout/stderr in the error path).
Suggested change:

    -     subprocess.run(
    -         ["dscli", "stop", "--worker_address", worker_address],
    -         timeout=90,
    -         capture_output=True,
    -     )
    -     logger.info(f"Stopped datasystem worker at {worker_address} via dscli stop")
    +     result = subprocess.run(
    +         ["dscli", "stop", "--worker_address", worker_address],
    +         timeout=90,
    +         capture_output=True,
    +         text=True,
    +     )
    +     if result.returncode == 0:
    +         logger.info(f"Stopped datasystem worker at {worker_address} via dscli stop")
    +     else:
    +         logger.error(
    +             "Failed to stop datasystem worker at %s via dscli stop "
    +             "(return code %s). stdout: %r stderr: %r",
    +             worker_address,
    +             result.returncode,
    +             result.stdout,
    +             result.stderr,
    +         )
    + except subprocess.TimeoutExpired as e:
    +     logger.error(
    +         "Timed out while stopping datasystem worker at %s via dscli stop: %s",
    +         worker_address,
    +         e,
    +     )
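The same check can be written as a small reusable helper that reports success or failure to the caller. This is a sketch under assumptions: run_checked is a hypothetical name, and the usage lines run a Python child instead of dscli so the example works without the datasystem installed.

```python
import logging
import subprocess
import sys

logger = logging.getLogger(__name__)


def run_checked(cmd: list[str], timeout: float = 90.0) -> bool:
    """Run cmd and return True only if it exits with return code 0."""
    try:
        result = subprocess.run(cmd, timeout=timeout, capture_output=True, text=True)
    except subprocess.TimeoutExpired as e:
        logger.error("Timed out running %s: %s", cmd, e)
        return False
    except OSError as e:
        # For example, the executable is missing from PATH.
        logger.error("Could not run %s: %s", cmd, e)
        return False
    if result.returncode != 0:
        logger.error(
            "%s failed (return code %s). stdout: %r stderr: %r",
            cmd, result.returncode, result.stdout, result.stderr,
        )
        return False
    return True


# Stand-in commands: one that succeeds and one that exits with code 3.
ok = run_checked([sys.executable, "-c", "pass"])
failed = run_checked([sys.executable, "-c", "raise SystemExit(3)"])
```

A caller could then emit the "Stopped datasystem worker" info message only when the helper returns True.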
    except Exception as e:
        # Clean up on failure
        if etcd_process is not None and etcd_process.poll() is None:
            etcd_process.terminate()
On Yuanrong startup failure, the cleanup path terminates the etcd process but doesn't wait() (or kill() on timeout) to ensure the process is actually stopped/reaped. This can leave a stray/zombie etcd process around after init() raises. Consider mirroring the shutdown logic used in close() (terminate -> wait with timeout -> kill -> wait).
Suggested change:

    - etcd_process.terminate()
    + try:
    +     etcd_process.terminate()
    +     try:
    +         etcd_process.wait(timeout=5)
    +     except subprocess.TimeoutExpired:
    +         etcd_process.kill()
    +         etcd_process.wait()
    + except Exception:
    +     # Best-effort cleanup; ignore failures here
    +     pass
    if conf.backend.storage_backend == "Yuanrong":
        if conf.backend.Yuanrong.auto_init:
            etcd_process = None
            etcd_data_dir = None
            worker_address = None
            if not shutil.which("etcd"):
                raise RuntimeError(
                    "etcd executable not found in PATH. Please install etcd and make sure it's in the PATH."
                )
            if not shutil.which("dscli"):
                raise RuntimeError(
                    "dscli executable not found in PATH. Please run `pip install openyuanrong-datasystem`."
                )
            try:
The new Yuanrong auto-init path (etcd + dscli) introduces substantial side effects during init() but currently has no targeted test coverage. Consider adding a unit test that monkeypatches shutil.which/subprocess.Popen/subprocess.run to validate the happy path and failure cleanup behavior (without requiring real etcd/dscli binaries).
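One way such a test might look is sketched below. Because the real init() body is not shown in this thread, start_services is a hypothetical, simplified stand-in for the auto-init path, and the dscli start call carries no real arguments; only the patching technique (shutil.which, subprocess.Popen, subprocess.run) follows the suggestion above.

```python
import shutil
import subprocess
import unittest
from unittest import mock


def start_services(data_dir: str):
    """Hypothetical stand-in for the auto-init path, not the real TransferQueue code."""
    if not shutil.which("etcd"):
        raise RuntimeError("etcd executable not found in PATH.")
    etcd_process = subprocess.Popen(["etcd", "--data-dir", data_dir])
    try:
        # Placeholder for the worker start step; real arguments are not shown in the review.
        subprocess.run(["dscli", "start"], check=True, timeout=90, capture_output=True)
    except Exception:
        # Failure cleanup: stop and reap etcd before re-raising.
        etcd_process.terminate()
        etcd_process.wait(timeout=5)
        raise
    return etcd_process


class AutoInitTest(unittest.TestCase):
    def test_missing_etcd_raises(self):
        with mock.patch("shutil.which", return_value=None):
            with self.assertRaises(RuntimeError):
                start_services("/tmp/etcd-data")

    def test_happy_path(self):
        fake_proc = mock.Mock()
        with mock.patch("shutil.which", return_value="/usr/bin/etcd"), \
                mock.patch("subprocess.Popen", return_value=fake_proc) as popen, \
                mock.patch("subprocess.run") as run:
            self.assertIs(start_services("/tmp/etcd-data"), fake_proc)
        popen.assert_called_once()
        run.assert_called_once()
        fake_proc.terminate.assert_not_called()

    def test_failure_cleans_up(self):
        fake_proc = mock.Mock()
        error = subprocess.CalledProcessError(1, ["dscli", "start"])
        with mock.patch("shutil.which", return_value="/usr/bin/etcd"), \
                mock.patch("subprocess.Popen", return_value=fake_proc), \
                mock.patch("subprocess.run", side_effect=error):
            with self.assertRaises(subprocess.CalledProcessError):
                start_services("/tmp/etcd-data")
        fake_proc.terminate.assert_called_once()
        fake_proc.wait.assert_called_once()
```

No real etcd or dscli binary is needed, since every process-spawning call is replaced by a mock. The cases can be run with python -m unittest.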
transfer_queue/config.yaml (outdated)

    # Client name (optional, default is YuanrongStorageClient)
    client_name: "YuanrongStorageClient"
We can remove this config option, since the client name must always be YuanrongStorageClient.
        pass
    elif key == "Yuanrong":
        # Stop etcd process and clean up data directory, stop datasystem worker via dscli
        if isinstance(value, dict):
Should etcd be closed only when auto_init is enabled?
Yes. If users start etcd manually, they should close etcd manually.
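The rule agreed on here, stop only what auto_init started, can be sketched as an ownership flag. ManagedService and close_service are hypothetical names for illustration and do not appear in interface.py.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class ManagedService:
    """Hypothetical record of whether this process started the service."""
    started_by_us: bool
    stop: Optional[Callable[[], None]] = None


def close_service(service: ManagedService) -> bool:
    """Stop the service only if auto-init started it; return whether it was stopped."""
    if not service.started_by_us or service.stop is None:
        # Started manually by the user: leave it running.
        return False
    service.stop()
    return True


# Usage: one service started by auto-init, one started manually by the user.
stopped = []
auto = ManagedService(started_by_us=True, stop=lambda: stopped.append("etcd"))
manual = ManagedService(started_by_us=False, stop=lambda: stopped.append("user-etcd"))
close_service(auto)
close_service(manual)
```

Only the auto-started service has its stop callback invoked; the manually started one is left untouched.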
Description
init(), close()