Skip to content

Commit fa6db11

Browse files
committed
[feat] Support auto init YR backend based on metastore
Signed-off-by: Haichuan Hu <kaisennhu@gmail.com>
1 parent e04cc05 commit fa6db11

File tree

3 files changed

+333
-177
lines changed

3 files changed

+333
-177
lines changed

transfer_queue/config.yaml

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,9 @@ backend:
4848

4949
# For Yuanrong:
5050
Yuanrong:
51-
# Whether to let TQ automatically start etcd and datasystem services
51+
# Whether to let TQ automatically init yuanrong.
5252
auto_init: True
53-
# etcd service address (used to start etcd when auto_init=true)
54-
etcd_address: "127.0.0.1:2379"
55-
# datasystem worker host and port (used to start dscli when auto_init=true)
56-
host: "127.0.0.1"
57-
port: 31501
53+
# Datasystem worker port (host is auto-detected from ray.nodes())
54+
worker_port: 31501
55+
# Metastore service port (host is auto-detected from ray.nodes())
56+
metastore_port: 2379

transfer_queue/interface.py

Lines changed: 8 additions & 171 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,7 @@
1616
import logging
1717
import math
1818
import os
19-
import shutil
20-
import socket
2119
import subprocess
22-
import tempfile
2320
import time
2421
from importlib import resources
2522
from typing import Any, Optional
@@ -38,6 +35,10 @@
3835
from transfer_queue.sampler import BaseSampler
3936
from transfer_queue.storage.simple_backend import SimpleStorageUnit
4037
from transfer_queue.utils.common import get_placement_group
38+
from transfer_queue.utils.yuanrong_utils import (
39+
cleanup_yuanrong_resources,
40+
initialize_yuanrong_backend,
41+
)
4142
from transfer_queue.utils.zmq_utils import process_zmq_server_info
4243

4344
logger = logging.getLogger(__name__)
@@ -187,129 +188,8 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig:
187188
f"Output:\n{error_msg}"
188189
)
189190
_TRANSFER_QUEUE_STORAGE["MooncakeStore"] = process
190-
if conf.backend.storage_backend == "Yuanrong":
191-
if conf.backend.Yuanrong.auto_init:
192-
etcd_process = None
193-
etcd_data_dir = None
194-
worker_address = None
195-
if not shutil.which("etcd"):
196-
raise RuntimeError(
197-
"etcd executable not found in PATH. Please install etcd and make sure it's in the PATH."
198-
)
199-
if not shutil.which("dscli"):
200-
raise RuntimeError(
201-
"dscli executable not found in PATH. Please run `pip install openyuanrong-datasystem`."
202-
)
203-
try:
204-
# ========== Start etcd ==========
205-
etcd_address = "127.0.0.1:2379"
206-
try:
207-
etcd_address = conf.backend.Yuanrong.etcd_address
208-
except Exception:
209-
pass
210-
211-
# Assume host:port format
212-
parts = etcd_address.split(":")
213-
if len(parts) != 2:
214-
raise ValueError(f"Invalid etcd_address format: {etcd_address}. Expected host:port")
215-
host = parts[0]
216-
port = int(parts[1])
217-
218-
# Create temporary data directory
219-
etcd_data_dir = tempfile.mkdtemp(prefix="tq_etcd_")
220-
logger.info(f"Starting etcd with data directory: {etcd_data_dir}")
221-
222-
cmd = [
223-
"etcd",
224-
f"--data-dir={etcd_data_dir}",
225-
f"--listen-client-urls=http://{host}:{port}",
226-
f"--advertise-client-urls=http://{host}:{port}",
227-
]
228-
229-
etcd_process = subprocess.Popen(
230-
cmd,
231-
stdout=subprocess.DEVNULL,
232-
stderr=subprocess.DEVNULL,
233-
text=True,
234-
bufsize=1,
235-
universal_newlines=True,
236-
start_new_session=True,
237-
)
238-
time.sleep(3) # Wait for etcd to start
239-
240-
if etcd_process.poll() is None:
241-
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
242-
sock.settimeout(2)
243-
result = sock.connect_ex((host, port))
244-
sock.close()
245-
if result != 0:
246-
raise RuntimeError(f"etcd process started but not listening on {host}:{port}")
247-
else:
248-
raise RuntimeError(f"etcd exited immediately with return code {etcd_process.returncode}")
249-
250-
logger.info(f"etcd started, PID: {etcd_process.pid}")
251-
time.sleep(2)
252-
253-
# ========== Start datasystem worker ==========
254-
# Assume host:port format
255-
worker_host = conf.backend.Yuanrong.host
256-
worker_port = conf.backend.Yuanrong.port
257-
worker_address = worker_host + ":" + str(worker_port)
258-
259-
cmd = [
260-
"dscli",
261-
"start",
262-
"-w",
263-
"--worker_address",
264-
worker_address,
265-
"--etcd_address",
266-
etcd_address,
267-
]
268-
269-
try:
270-
ds_result = subprocess.run(
271-
cmd,
272-
stdout=subprocess.PIPE,
273-
stderr=subprocess.STDOUT,
274-
text=True,
275-
timeout=90,
276-
)
277-
except subprocess.TimeoutExpired as err:
278-
raise RuntimeError(f"dscli start timed out: {err}") from err
279-
# Wait for dscli to start and exit (it starts worker and exits)
280-
if ds_result.returncode == 0 and "[ OK ]" in ds_result.stdout:
281-
logger.info(f"dscli started Yuanrong datasystem worker at {worker_address} successfully.")
282-
283-
else:
284-
raise RuntimeError(
285-
f"Failed to start datasystem worker at {worker_address}. "
286-
f"Return code: {ds_result.returncode}, Output: {ds_result.stdout}"
287-
)
288-
289-
# Store processes and data directory
290-
_TRANSFER_QUEUE_STORAGE["Yuanrong"] = {
291-
"etcd": etcd_process,
292-
"etcd_data_dir": etcd_data_dir,
293-
"worker_address": worker_address,
294-
"etcd_address": etcd_address,
295-
}
296-
logger.info("Yuanrong backend (etcd + datasystem) started successfully.")
297-
298-
except Exception as e:
299-
# Clean up on failure
300-
if etcd_process is not None and etcd_process.poll() is None:
301-
etcd_process.terminate()
302-
try:
303-
etcd_process.wait(timeout=5)
304-
except subprocess.TimeoutExpired:
305-
etcd_process.kill()
306-
etcd_process.wait()
307-
if etcd_data_dir is not None:
308-
try:
309-
shutil.rmtree(etcd_data_dir, ignore_errors=True)
310-
except Exception:
311-
pass
312-
raise RuntimeError(f"Failed to start Yuanrong backend: {e}") from e
191+
if conf.backend.storage_backend == "Yuanrong" and conf.backend.Yuanrong.auto_init:
192+
_TRANSFER_QUEUE_STORAGE["Yuanrong"] = initialize_yuanrong_backend(conf)
313193
return conf
314194

315195

@@ -335,6 +215,7 @@ def _init_from_existing() -> bool:
335215
conf = ray.get(_TRANSFER_QUEUE_CONTROLLER.get_config.remote())
336216
if conf is not None:
337217
_maybe_create_transferqueue_client(conf)
218+
338219
logger.info("TransferQueueClient initialized.")
339220
return True
340221

@@ -475,51 +356,7 @@ def close():
475356
except Exception:
476357
pass
477358
elif key == "Yuanrong":
478-
# Stop etcd process and clean up data directory, stop datasystem worker via dscli
479-
if isinstance(value, dict):
480-
etcd_process = value.get("etcd")
481-
etcd_data_dir = value.get("etcd_data_dir")
482-
worker_address = value.get("worker_address")
483-
484-
# Stop etcd if running
485-
if etcd_process is not None and etcd_process.poll() is None:
486-
etcd_process.terminate()
487-
try:
488-
etcd_process.wait(timeout=5)
489-
except subprocess.TimeoutExpired:
490-
etcd_process.kill()
491-
etcd_process.wait()
492-
493-
# Clean up etcd data directory
494-
if etcd_data_dir is not None and os.path.exists(etcd_data_dir):
495-
try:
496-
shutil.rmtree(etcd_data_dir, ignore_errors=True)
497-
logger.info(f"Cleaned up etcd data directory: {etcd_data_dir}")
498-
except Exception as e:
499-
logger.warning(f"Failed to clean up etcd data directory {etcd_data_dir}: {e}")
500-
501-
# Stop datasystem worker via dscli command
502-
if worker_address:
503-
try:
504-
result = subprocess.run(
505-
["dscli", "stop", "--worker_address", worker_address],
506-
timeout=90,
507-
capture_output=True,
508-
)
509-
if result.returncode == 0:
510-
logger.info(f"Stopped datasystem worker at {worker_address} via dscli stop")
511-
else:
512-
error_msg = (result.stderr or result.stdout or b"").decode()
513-
logger.warning(
514-
f"Failed to stop datasystem worker at {worker_address}. "
515-
f"Return code: {result.returncode}, Error: {error_msg}"
516-
)
517-
except subprocess.TimeoutExpired as err:
518-
logger.warning(f"dscli stop timed out for {worker_address}: {err}")
519-
except Exception as e:
520-
logger.warning(f"Failed to stop datasystem worker via dscli: {e}")
521-
else:
522-
logger.warning(f"Unexpected Yuanrong storage value: {value}")
359+
cleanup_yuanrong_resources(value)
523360
else:
524361
logger.warning(f"close for _TRANSFER_QUEUE_STORAGE with key {key} is not supported for now.")
525362

0 commit comments

Comments
 (0)