[refactor] Simplify initialization and improve API usability #26
Merged
0oshowero0 merged 14 commits into Ascend:main on Feb 6, 2026
Conversation
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Contributor
Pull request overview
This PR optimizes the TransferQueue initialization process by introducing a simplified init() function that provides a more user-friendly API. The changes consolidate the initialization logic and rename storage manager types to be more concise.
Changes:
- Added a new `transfer_queue.init()` function that simplifies TransferQueue initialization with automatic controller detection and configuration management
- Renamed storage manager types: `AsyncSimpleStorageManager` → `SimpleStorage`, `MooncakeStorageManager` → `MooncakeStore`, `YuanrongStorageManager` → `Yuanrong`, with backward compatibility through deprecation warnings
- Moved the `process_zmq_server_info` function from `client.py` to `zmq_utils.py` for better organization, and added `config.yaml` for default configuration
Reviewed changes
Copilot reviewed 15 out of 16 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| transfer_queue/interface.py | New file introducing the simplified init(), get_meta(), get_data(), put(), and related wrapper functions for easier API usage |
| transfer_queue/controller.py | Added store_config() and get_config() methods to store/retrieve global TransferQueue configuration |
| transfer_queue/config.yaml | New default configuration file defining controller and backend settings |
| transfer_queue/init.py | Updated exports to include new interface functions and process_zmq_server_info |
| transfer_queue/storage/managers/factory.py | Added deprecation warnings for old manager type names with automatic fallback to new names |
| transfer_queue/storage/managers/simple_backend_manager.py | Updated registration name and added deprecation warning for storage_unit_infos config key |
| transfer_queue/storage/managers/base.py | Changed config parameter type annotation to DictConfig |
| transfer_queue/storage/managers/yuanrong_manager.py | Updated registration name from YuanrongStorageManager to Yuanrong |
| transfer_queue/storage/managers/mooncake_manager.py | Updated registration name from MooncakeStorageManager to MooncakeStore |
| transfer_queue/utils/zmq_utils.py | Added process_zmq_server_info() function moved from client.py |
| transfer_queue/client.py | Removed process_zmq_server_info() function and unused imports |
| tutorial/01_core_components.py | Simplified tutorial to use new tq.init() and module-level functions |
| tests/test_client.py | Updated test cases to use new SimpleStorage manager type name |
| requirements.txt | Added omegaconf dependency |
| pyproject.toml | Added *.yaml to package data for config file inclusion |
| README.md | Updated storage backend names in documentation |
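The deprecated-name fallback mentioned for `factory.py` above can be sketched as an alias table plus a `DeprecationWarning`. This is an illustrative pattern only; the names and structure are assumptions, not the actual TransferQueue internals:

```python
import warnings

# Hypothetical mapping of deprecated manager names to their replacements;
# the real factory.py may organize this differently.
_DEPRECATED_ALIASES = {
    "AsyncSimpleStorageManager": "SimpleStorage",
    "MooncakeStorageManager": "MooncakeStore",
    "YuanrongStorageManager": "Yuanrong",
}

def resolve_manager_type(manager_type: str) -> str:
    """Map a possibly-deprecated manager type name to its current name,
    emitting a DeprecationWarning when a fallback is applied."""
    if manager_type in _DEPRECATED_ALIASES:
        new_name = _DEPRECATED_ALIASES[manager_type]
        warnings.warn(
            f"Manager type '{manager_type}' is deprecated; use '{new_name}' instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return new_name
    return manager_type
```

Resolving the name before the registry lookup keeps old configs working while nudging users toward the new names.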
Contributor
Pull request overview
Copilot reviewed 22 out of 23 changed files in this pull request and generated 15 comments.
0oshowero0 added a commit that referenced this pull request on Feb 11, 2026
## Summary

This PR introduces a **High-Level Key-Value (KV) Interface** to TransferQueue, offering a Redis-style API that retains most of the advanced features provided by TransferQueue.

## Background

In previous versions of TransferQueue, the learning curve was relatively steep for new users. To perform basic operations, users had to:

1. Understand the `BatchMeta`, `SampleMeta`, and `FieldMeta` design (as illustrated in [tutorial/02_metadata_concepts.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/02_metadata_concepts.py)).
2. Navigate the flexible but complex [`TransferQueueClient`](https://github.com/Ascend/TransferQueue/blob/main/transfer_queue/client.py) API.

Although PR #26 simplified the initialization process, the core interaction still exposed low-level details. This PR bridges that gap by providing a familiar, easy-to-use KV abstraction.

## TransferQueue API Architecture

With this PR, TransferQueue now supports a two-level API architecture to satisfy different user needs.
| Level | Tier | Style | Fine-Grained Access | Streaming | Sampler | Multiple Backends |
|---|---|---|---|---|---|---|
| High | **KV Interface** (this PR) | Put/Get/List/Clear | ✓ | ○ | ✗ | ✓ |
| High | **StreamingDataLoader** (#23) | PyTorch DataLoader | ✓ | ✓ | ✓ | ✓ |
| Low | **TransferQueueClient** | Metadata-based | ✓ | ✓ | ✓ | ✓ |

### High-Level API

#### Key-Value based API (This PR)

**Methods**
- **(async_)kv_put**: Insert/update a multi-column sample by key, with an optional metadata tag
- **(async_)kv_batch_put**: Put multiple key-value pairs efficiently in batch
- **(async_)kv_batch_get**: Retrieve samples by keys, supporting column selection by fields
- **(async_)kv_list**: List keys and tags (metadata) in a partition
- **(async_)kv_clear**: Remove key-value pairs from storage

**Key Features**
- **Redis-style Semantics**: Familiar KV interface (Put/Get/List) with a near-zero learning curve
- **Fine-grained Access**: Update or retrieve specific fields (columns) within a key (row) without a full-row operation
- **Partition Isolation**: Logical separation of storage namespaces
- **Metadata Tags**: Lightweight metadata for status tracking
- **Pluggable Backends**: Supports multiple storage backends

#### StreamingDataLoader API

Refer to our [RoadMap](#1) and related PRs (#23). A usage example can be found in [tutorial/06_streaming_dataloader.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/06_streaming_dataloader.py).

### Low-Level API

Directly manipulate the `TransferQueueClient`. Refer to [tutorial/03_metadata_concepts.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/03_metadata_concepts.py), [tutorial/04_understanding_controller.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/04_understanding_controller.py), and [tutorial/05_custom_sampler.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/05_custom_sampler.py) for details.
## Usage Example

Please refer to [tutorial/02_kv_interface.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/02_kv_interface.py) and [tests/e2e/test_kv_interface_e2e.py](https://github.com/Ascend/TransferQueue/blob/main/tests/e2e/test_kv_interface_e2e.py) for details.

```python
import torch
from tensordict import TensorDict

import transfer_queue as tq

# initialize TQ
tq.init()

# prepare data
batch_input_ids = torch.tensor(
    [
        [4, 5, 6],
        [7, 8, 9],
        [10, 11, 12],
        [13, 14, 15],
    ]
)
batch_attention_mask = torch.ones_like(batch_input_ids)
data_batch = TensorDict(
    {
        "input_ids": batch_input_ids,
        "attention_mask": batch_attention_mask,
    },
    batch_size=batch_input_ids.size(0),
)

keys = ["1_0", "1_1", "1_2", "2_0"]  # 4 keys for 4 samples
tags = [{"global_steps": 1, "status": "running", "model_version": 1} for _ in range(len(keys))]
partition_id = "test"

# use the KV interface to put data into TQ
tq.kv_batch_put(keys=keys, partition_id=partition_id, fields=data_batch, tags=tags)

# list all keys and tags
all_keys, all_tags = tq.kv_list(partition_id=partition_id)
for k, t in zip(all_keys, all_tags, strict=False):
    print(f"  - key='{k}' | tag={t}")

# retrieve all data
retrieved_all = tq.kv_batch_get(keys=all_keys, partition_id=partition_id)
print(f"  Fields: {list(retrieved_all.keys())}")
```

## Use Cases & Limitations

**Best For:**
- Scenarios requiring fine-grained data access (e.g., updating a reward score for a specific prompt).
- Integration with external ReplayBuffers or single-controller architectures that manage sample-dispatching logic.

**Limitations (vs. streaming/low-level APIs):**
- No built-in production/consumption tracking: users must manually check status via tags or manage the logic externally.
- No built-in sampler: data dispatch must be implemented externally, e.g., by a ReplayBuffer or a single controller.
- Not fully streaming: consumers typically wait for a controller to dispatch `keys` before fetching, rather than consuming a continuous stream.
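Because the KV interface leaves status tracking to the user, tag-based filtering is a common pattern. A minimal, self-contained sketch of such filtering over the `(keys, tags)` lists that `kv_list` returns; the `select_keys` helper is hypothetical, not part of the TransferQueue API:

```python
def select_keys(keys, tags, **criteria):
    """Return the keys whose tag dict matches every given field=value criterion."""
    return [
        key
        for key, tag in zip(keys, tags)
        if all(tag.get(field) == value for field, value in criteria.items())
    ]

# Same key/tag layout as the usage example above
keys = ["1_0", "1_1", "1_2", "2_0"]
tags = [
    {"global_steps": 1, "status": "running"},
    {"global_steps": 1, "status": "done"},
    {"global_steps": 1, "status": "running"},
    {"global_steps": 2, "status": "done"},
]

finished = select_keys(keys, tags, status="done")  # -> ["1_1", "2_0"]
```

The selected keys can then be passed straight to `kv_batch_get` to fetch only the samples in the desired state.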
0oshowero0 pushed a commit that referenced this pull request on Feb 11, 2026
### Background

In order to align with the backend mentioned in [PR#26](#26), I have extracted the RayStorageManager to manage the RDT backend.

### Use Case

```python
import sys
import time
from pathlib import Path

import ray
import torch
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
from tensordict import TensorDict

parent_dir = Path(__file__).resolve().parent.parent
sys.path.append(str(parent_dir))

import transfer_queue as tq
from transfer_queue.metadata import BatchMeta


def tensordict_memory_mb(td):
    total_bytes = sum(tensor.element_size() * tensor.numel() for tensor in td.values())
    return total_bytes / (1024 * 1024)


@ray.remote
class WriterActor:
    def __init__(self):
        tq.init()
        self.data = None

    def generate_data(self, batch_size: int = 10000, seq_len: int = 10000):
        self.data = TensorDict(
            {"input_ids": torch.randn(batch_size, seq_len, dtype=torch.float32)},
            batch_size=batch_size,
        )
        size = tensordict_memory_mb(self.data)
        print(f"[Writer] Data generated. Memory usage: {size:.2f} MB")

    def put_once(self, partition_id):
        t0 = time.time()
        batch_meta = tq.put(data=self.data, partition_id=partition_id)
        t1 = time.time()
        return t1 - t0, batch_meta


@ray.remote
class ReaderActor:
    def __init__(self):
        tq.init()

    def get_once(self, metadata: BatchMeta):
        t0 = time.perf_counter()
        result = tq.get_data(metadata)
        t1 = time.perf_counter()
        return t1 - t0


def main():
    if not ray.is_initialized():
        ray.init(address="auto")

    print("Initialize TransferQueue System...")
    tq.init()

    # Map node IPs to Ray node IDs so we can pin writer/reader to specific hosts
    nodes = ray.nodes()
    ip_to_nodeid = {}
    for n in nodes:
        addr = n.get("NodeManagerAddress") or n.get("node_ip_address") or n.get("NodeIP")
        node_id = n.get("NodeID") or n.get("node_id")
        if addr and node_id:
            ip_to_nodeid[addr] = node_id

    ip_A = ""  # Writer
    ip_B = ""  # Reader
    node_id_A = ip_to_nodeid.get(ip_A)
    node_id_B = ip_to_nodeid.get(ip_B)
    if not node_id_A or not node_id_B:
        print(f"Warning: Specific nodes not found. Available IPs: {list(ip_to_nodeid.keys())}")
        node_id_A = list(ip_to_nodeid.values())[0]
        node_id_B = list(ip_to_nodeid.values())[-1]

    writer = WriterActor.options(
        scheduling_strategy=NodeAffinitySchedulingStrategy(node_id=node_id_A, soft=False),
    ).remote()
    reader = ReaderActor.options(
        scheduling_strategy=NodeAffinitySchedulingStrategy(node_id=node_id_B, soft=False),
    ).remote()

    batch_size = 512
    seq_len = 32 * 1024
    partition_id = "train_step"

    ray.get(writer.generate_data.remote(batch_size, seq_len))
    cost_put, batch_meta = ray.get(writer.put_once.remote(partition_id))
    cost_get = ray.get(reader.get_once.remote(batch_meta))  # fixed: was undefined `meta`

    tq.close()
    print("Test Finished.")


if __name__ == "__main__":
    main()
```

Signed-off-by: Evelynn-V <liwenlin0223l@gmail.com>
Background
In previous versions of TransferQueue, we lacked an easy-to-use initialization process. Users had to manually initialize and configure TQ, leading to verbose boilerplate code.
Reference for previous complex usage:
- TQ tutorial: TransferQueue/tutorial/01_core_components.py, line 59 in c2bb0fa
- verl integration: https://github.com/verl-project/verl/blob/e4915bd8859e75a8634ec48c20a0bbc781d66eb8/verl/experimental/transfer_queue/ray_trainer.py#L340
Key Changes
- Introduced a unified entry point, `transfer_queue.init(conf)`.
- Exposed `TransferQueueClient` APIs directly in the top-level `transfer_queue` namespace (e.g., `tq.put`, `tq.get_meta`).
- Added a default `config.yaml` to support zero-config initialization and formalize the config structure.
- Renamed storage managers for conciseness:
  - `AsyncSimpleStorageManager` -> `SimpleStorage`
  - `YuanrongStorageManager` -> `Yuanrong`
  - `MooncakeStorageManager` -> `MooncakeStore`
- Related issue: [Enhancement] A suggestion regarding StorageManager initialization #6

Usage Example
For more details, please refer to the updated tutorials.
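The mechanics behind exposing client APIs at the top level can be sketched as a module-level singleton created by `init()`. This is an illustrative pattern only; every name below is hypothetical, not the actual transfer_queue implementation:

```python
# Illustrative sketch: a module-level client singleton backs top-level
# functions such as tq.put / tq.get_meta. All names here are hypothetical.
_client = None

class _FakeClient:
    """Stand-in for TransferQueueClient, just to make the sketch runnable."""
    def __init__(self, conf):
        self.conf = conf
        self._store = {}

    def put(self, key, value):
        self._store[key] = value

    def get(self, key):
        return self._store[key]

def init(conf=None):
    """Create the global client once; later top-level calls delegate to it."""
    global _client
    if _client is None:
        _client = _FakeClient(conf or {})
    return _client

def put(key, value):
    if _client is None:
        raise RuntimeError("transfer_queue is not initialized; call init() first")
    return _client.put(key, value)

def get(key):
    if _client is None:
        raise RuntimeError("transfer_queue is not initialized; call init() first")
    return _client.get(key)
```

The design choice this illustrates: callers write `tq.init()` once and then use flat module functions, instead of constructing and threading a client object through their code.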
Config Structure
TODO
CC: @wuxibin89 @dpj135