Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,8 @@ jobs:
TQ_TEST_BACKEND=MooncakeStore pytest tests/e2e/test_e2e_lifecycle_consistency.py
pkill -f "[m]ooncake_master" || true
TQ_TEST_BACKEND=MooncakeStore pytest tests/e2e/test_kv_interface_e2e.py
pkill -f "[m]ooncake_master" || true
pkill -f "[m]ooncake_master" || true
- name: Run Yuanrong Backend Specific E2E Tests
run: |
TQ_TEST_BACKEND=Yuanrong pytest tests/e2e/test_e2e_lifecycle_consistency.py
TQ_TEST_BACKEND=Yuanrong pytest tests/e2e/test_kv_interface_e2e.py
7 changes: 4 additions & 3 deletions tests/e2e/test_e2e_lifecycle_consistency.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@
"backend": {
"storage_backend": "Yuanrong",
"Yuanrong": {
"host": "127.0.0.1",
"port": 31501,
"worker_port": 31501,
"metastore_port": 2379,
},
},
},
Expand All @@ -102,11 +102,12 @@ def backend_name():
"""Get the backend name from environment variable.

Environment variables:
TQ_TEST_BACKEND: Backend name (SimpleStorage or MooncakeStore)
TQ_TEST_BACKEND: Backend name (SimpleStorage, MooncakeStore, or Yuanrong)

To run tests for a specific backend:
TQ_TEST_BACKEND=SimpleStorage pytest tests/e2e/test_e2e_lifecycle_consistency.py
TQ_TEST_BACKEND=MooncakeStore pytest tests/e2e/test_e2e_lifecycle_consistency.py
TQ_TEST_BACKEND=Yuanrong pytest tests/e2e/test_e2e_lifecycle_consistency.py
"""
return os.environ.get("TQ_TEST_BACKEND", "SimpleStorage")

Expand Down
15 changes: 14 additions & 1 deletion tests/e2e/test_kv_interface_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,18 @@ def tq_api(request):
},
},
},
"Yuanrong": {
"controller": {
"polling_mode": True,
},
"backend": {
"storage_backend": "Yuanrong",
"Yuanrong": {
"worker_port": 31501,
"metastore_port": 2379,
},
},
},
}


Expand All @@ -112,11 +124,12 @@ def backend_name():
"""Get the backend name from environment variable.

Environment variables:
TQ_TEST_BACKEND: Backend name (SimpleStorage or MooncakeStore)
TQ_TEST_BACKEND: Backend name (SimpleStorage, MooncakeStore, or Yuanrong)

To run tests for a specific backend:
TQ_TEST_BACKEND=SimpleStorage pytest tests/e2e/test_kv_interface_e2e.py
TQ_TEST_BACKEND=MooncakeStore pytest tests/e2e/test_kv_interface_e2e.py
TQ_TEST_BACKEND=Yuanrong pytest tests/e2e/test_kv_interface_e2e.py
"""
return os.environ.get("TQ_TEST_BACKEND", "SimpleStorage")

Expand Down
3 changes: 1 addition & 2 deletions tests/test_kv_storage_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ def test_data():
cfg = {
"controller_info": MagicMock(),
"client_name": "YuanrongStorageClient",
"host": "127.0.0.1",
"port": 31501,
"worker_port": 31501,
"device_id": 0,
}
global_indexes = [8, 9, 10]
Expand Down
2 changes: 1 addition & 1 deletion tests/test_storage_client_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

class Test(unittest.TestCase):
def setUp(self):
self.cfg = {"host": "127.0.0.1", "port": 31501, "device_id": 0}
self.cfg = {"worker_port": 31501, "device_id": 0}

@pytest.mark.skipif(find_spec("datasystem") is None, reason="datasystem is not available")
def test_create_client(self):
Expand Down
2 changes: 1 addition & 1 deletion tests/test_yuanrong_client_zero_copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def mock_kv_client(self, mocker):

@pytest.fixture
def storage_client(self, mock_kv_client):
return GeneralKVClientAdapter({"host": "127.0.0.1", "port": 31501})
return GeneralKVClientAdapter({"worker_port": 31501})

def test_mset_mget_p2p(self, storage_client, mocker):
# Mock serialization/deserialization
Expand Down
2 changes: 1 addition & 1 deletion tests/test_yuanrong_storage_client_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def mock_find_reachable_host(port, timeout=1.0):

@pytest.fixture
def config():
return {"host": "127.0.0.1", "port": 12345, "enable_yr_npu_optimization": True}
return {"worker_port": 12345, "enable_yr_npu_optimization": True}


def assert_tensors_equal(a: torch.Tensor, b: torch.Tensor):
Expand Down
17 changes: 11 additions & 6 deletions transfer_queue/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,15 @@ backend:

# For Yuanrong:
Yuanrong:
# Whether to let TQ automatically start etcd and datasystem services
# Whether to let TQ automatically init yuanrong
auto_init: True
# etcd service address (used to start etcd when auto_init=true)
etcd_address: "127.0.0.1:2379"
# datasystem worker host and port (used to start dscli when auto_init=true)
host: "127.0.0.1"
port: 31501
# Datasystem worker port
worker_port: 31501
# Metastore service port
metastore_port: 2379
# Additional config for yuanrong worker.
# Recommended options for NPU environments:
# --remote_h2d_device_ids Enable RH2D for efficient cross-node data transfer. Specify NPU device IDs (comma-separated).
# --enable_huge_tlb Enable huge page memory to improve performance. Required for >21GB shared memory on 910B.
# Example: "--shared_memory_size_mb 16384 --remote_h2d_device_ids 0,1,2,3 --enable_huge_tlb true"
worker_args: "--shared_memory_size_mb 8192"
179 changes: 8 additions & 171 deletions transfer_queue/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@
import logging
import math
import os
import shutil
import socket
import subprocess
import tempfile
import time
from importlib import resources
from typing import Any, Optional
Expand All @@ -38,6 +35,10 @@
from transfer_queue.sampler import BaseSampler
from transfer_queue.storage.simple_backend import SimpleStorageUnit
from transfer_queue.utils.common import get_placement_group
from transfer_queue.utils.yuanrong_utils import (
cleanup_yuanrong_resources,
initialize_yuanrong_backend,
)
from transfer_queue.utils.zmq_utils import process_zmq_server_info

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -187,129 +188,8 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig:
f"Output:\n{error_msg}"
)
_TRANSFER_QUEUE_STORAGE["MooncakeStore"] = process
if conf.backend.storage_backend == "Yuanrong":
if conf.backend.Yuanrong.auto_init:
etcd_process = None
etcd_data_dir = None
worker_address = None
if not shutil.which("etcd"):
raise RuntimeError(
"etcd executable not found in PATH. Please install etcd and make sure it's in the PATH."
)
if not shutil.which("dscli"):
raise RuntimeError(
"dscli executable not found in PATH. Please run `pip install openyuanrong-datasystem`."
)
try:
# ========== Start etcd ==========
etcd_address = "127.0.0.1:2379"
try:
etcd_address = conf.backend.Yuanrong.etcd_address
except Exception:
pass

# Assume host:port format
parts = etcd_address.split(":")
if len(parts) != 2:
raise ValueError(f"Invalid etcd_address format: {etcd_address}. Expected host:port")
host = parts[0]
port = int(parts[1])

# Create temporary data directory
etcd_data_dir = tempfile.mkdtemp(prefix="tq_etcd_")
logger.info(f"Starting etcd with data directory: {etcd_data_dir}")

cmd = [
"etcd",
f"--data-dir={etcd_data_dir}",
f"--listen-client-urls=http://{host}:{port}",
f"--advertise-client-urls=http://{host}:{port}",
]

etcd_process = subprocess.Popen(
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
text=True,
bufsize=1,
universal_newlines=True,
start_new_session=True,
)
time.sleep(3) # Wait for etcd to start

if etcd_process.poll() is None:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(2)
result = sock.connect_ex((host, port))
sock.close()
if result != 0:
raise RuntimeError(f"etcd process started but not listening on {host}:{port}")
else:
raise RuntimeError(f"etcd exited immediately with return code {etcd_process.returncode}")

logger.info(f"etcd started, PID: {etcd_process.pid}")
time.sleep(2)

# ========== Start datasystem worker ==========
# Assume host:port format
worker_host = conf.backend.Yuanrong.host
worker_port = conf.backend.Yuanrong.port
worker_address = worker_host + ":" + str(worker_port)

cmd = [
"dscli",
"start",
"-w",
"--worker_address",
worker_address,
"--etcd_address",
etcd_address,
]

try:
ds_result = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
timeout=90,
)
except subprocess.TimeoutExpired as err:
raise RuntimeError(f"dscli start timed out: {err}") from err
# Wait for dscli to start and exit (it starts worker and exits)
if ds_result.returncode == 0 and "[ OK ]" in ds_result.stdout:
logger.info(f"dscli started Yuanrong datasystem worker at {worker_address} successfully.")

else:
raise RuntimeError(
f"Failed to start datasystem worker at {worker_address}. "
f"Return code: {ds_result.returncode}, Output: {ds_result.stdout}"
)

# Store processes and data directory
_TRANSFER_QUEUE_STORAGE["Yuanrong"] = {
"etcd": etcd_process,
"etcd_data_dir": etcd_data_dir,
"worker_address": worker_address,
"etcd_address": etcd_address,
}
logger.info("Yuanrong backend (etcd + datasystem) started successfully.")

except Exception as e:
# Clean up on failure
if etcd_process is not None and etcd_process.poll() is None:
etcd_process.terminate()
try:
etcd_process.wait(timeout=5)
except subprocess.TimeoutExpired:
etcd_process.kill()
etcd_process.wait()
if etcd_data_dir is not None:
try:
shutil.rmtree(etcd_data_dir, ignore_errors=True)
except Exception:
pass
raise RuntimeError(f"Failed to start Yuanrong backend: {e}") from e
if conf.backend.storage_backend == "Yuanrong" and conf.backend.Yuanrong.auto_init:
_TRANSFER_QUEUE_STORAGE["Yuanrong"] = initialize_yuanrong_backend(conf)
return conf


Expand All @@ -335,6 +215,7 @@ def _init_from_existing() -> bool:
conf = ray.get(_TRANSFER_QUEUE_CONTROLLER.get_config.remote())
if conf is not None:
_maybe_create_transferqueue_client(conf)

logger.info("TransferQueueClient initialized.")
return True

Expand Down Expand Up @@ -475,51 +356,7 @@ def close():
except Exception:
pass
elif key == "Yuanrong":
# Stop etcd process and clean up data directory, stop datasystem worker via dscli
if isinstance(value, dict):
etcd_process = value.get("etcd")
etcd_data_dir = value.get("etcd_data_dir")
worker_address = value.get("worker_address")

# Stop etcd if running
if etcd_process is not None and etcd_process.poll() is None:
etcd_process.terminate()
try:
etcd_process.wait(timeout=5)
except subprocess.TimeoutExpired:
etcd_process.kill()
etcd_process.wait()

# Clean up etcd data directory
if etcd_data_dir is not None and os.path.exists(etcd_data_dir):
try:
shutil.rmtree(etcd_data_dir, ignore_errors=True)
logger.info(f"Cleaned up etcd data directory: {etcd_data_dir}")
except Exception as e:
logger.warning(f"Failed to clean up etcd data directory {etcd_data_dir}: {e}")

# Stop datasystem worker via dscli command
if worker_address:
try:
result = subprocess.run(
["dscli", "stop", "--worker_address", worker_address],
timeout=90,
capture_output=True,
)
if result.returncode == 0:
logger.info(f"Stopped datasystem worker at {worker_address} via dscli stop")
else:
error_msg = (result.stderr or result.stdout or b"").decode()
logger.warning(
f"Failed to stop datasystem worker at {worker_address}. "
f"Return code: {result.returncode}, Error: {error_msg}"
)
except subprocess.TimeoutExpired as err:
logger.warning(f"dscli stop timed out for {worker_address}: {err}")
except Exception as e:
logger.warning(f"Failed to stop datasystem worker via dscli: {e}")
else:
logger.warning(f"Unexpected Yuanrong storage value: {value}")
cleanup_yuanrong_resources(value)
else:
logger.warning(f"close for _TRANSFER_QUEUE_STORAGE with key {key} is not supported for now.")

Expand Down
Loading
Loading