
[refactor] Simplify initialization and improve API usability#26

Merged
0oshowero0 merged 14 commits into Ascend:main from 0oshowero0:initialize
Feb 6, 2026

Conversation

Collaborator

@0oshowero0 0oshowero0 commented Feb 4, 2026

Background

In previous versions of TransferQueue, we lacked an easy-to-use initialization process. Users had to manually initialize and configure TQ, leading to verbose boilerplate code.

Reference for previous complex usage:

Key Changes

  • Initialization: Introduced a one-click initialization via transfer_queue.init(conf).
  • Interface: Wrapped the core TransferQueueClient APIs directly into the top-level transfer_queue namespace (e.g., tq.put, tq.get_meta).
  • Config: Added a default config.yaml to support zero-config initialization and formalize the config structure.
  • Backend: Renamed the storage manager types to concise backend names (SimpleStorage, Yuanrong, MooncakeStore), keeping the old names as deprecated aliases.
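
The `init(conf)` override path can be sketched as a recursive merge of user-supplied entries onto the packaged defaults. The real implementation relies on OmegaConf; the `deep_merge` helper and the inline dicts below are illustrative assumptions, not TransferQueue internals:

```python
def deep_merge(defaults: dict, overrides: dict) -> dict:
    """Recursively overlay `overrides` onto `defaults`, keeping untouched keys."""
    merged = dict(defaults)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged


# Mirrors the shape of the default config.yaml (abridged)
defaults = {
    "controller": {"sampler": "SequentialSampler", "polling_mode": False},
    "backend": {
        "storage_backend": "SimpleStorage",
        "SimpleStorage": {"total_storage_size": 100000, "num_data_storage_units": 2},
    },
}
# A user override touching a single nested entry
user_conf = {"backend": {"SimpleStorage": {"total_storage_size": 500}}}

conf = deep_merge(defaults, user_conf)
print(conf["backend"]["SimpleStorage"]["total_storage_size"])  # 500
print(conf["controller"]["sampler"])  # SequentialSampler
```

The point of the merge (rather than a wholesale replace) is that a user config only needs to name the entries it changes; everything else falls back to the defaults.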

Usage Example

```python
# In process 0, node A
import transfer_queue as tq
tq.init()   # Initialize the TransferQueue system
tq.put(...) # Then tq can be used directly for data operations

# In process 1, node B (with Ray connected to node A)
import transfer_queue as tq
tq.init()   # Only initializes a TransferQueueClient and links it to the existing TQ system
metadata = tq.get_meta(...)
data = tq.get_data(metadata)
```

For more details, please refer to the updated tutorials.

Config Structure

```yaml
# This is the default configuration of TransferQueue. Users may modify the default
# values and call transfer_queue.init(conf) to override individual config entries.

controller:
  # User-defined sampler. Users can pass a sampler instance to override this string config.
  sampler: SequentialSampler
  # Whether to return an empty BatchMeta (instead of blocking the request) when not enough data is available
  polling_mode: False
  # ZMQ server IP & ports (automatically generated during init)
  zmq_info: null


backend:
  # Pluggable storage/transport backend of TransferQueue. Choose from:
  # SimpleStorage, Yuanrong, MooncakeStore, ...
  storage_backend: SimpleStorage

  # For SimpleStorage:
  SimpleStorage:
    # Total number of samples
    total_storage_size: 100000
    # Number of distributed storage units for the SimpleStorage backend
    num_data_storage_units: 2
    # ZMQ server IP & ports (automatically generated during init)
    zmq_info: null

  # For Yuanrong:
  # TODO

  # For MooncakeStore:
  # TODO
```

TODO

  • Provide Yuanrong backend default config (and initialization) @dpj135
  • Provide MooncakeStore backend default config (and initialization)

CC: @wuxibin89 @dpj135

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Copilot AI review requested due to automatic review settings February 4, 2026 14:21
Contributor

Copilot AI left a comment


Pull request overview

This PR optimizes the TransferQueue initialization process by introducing a simplified init() function that provides a more user-friendly API. The changes consolidate the initialization logic and rename storage manager types to be more concise.

Changes:

  • Added a new transfer_queue.init() function that simplifies TransferQueue initialization with automatic controller detection and configuration management
  • Renamed storage manager types: AsyncSimpleStorageManager → SimpleStorage, MooncakeStorageManager → MooncakeStore, YuanrongStorageManager → Yuanrong, with backward compatibility through deprecation warnings
  • Moved process_zmq_server_info function from client.py to zmq_utils.py for better organization and added config.yaml for default configuration
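
The rename-with-fallback behavior described above could work roughly like the lookup-plus-warning sketch below. `resolve_manager_type` and `_RENAMED_MANAGERS` are assumed names for illustration, not the factory's actual internals:

```python
import warnings

# Old manager type names mapped to their concise replacements (per this PR)
_RENAMED_MANAGERS = {
    "AsyncSimpleStorageManager": "SimpleStorage",
    "MooncakeStorageManager": "MooncakeStore",
    "YuanrongStorageManager": "Yuanrong",
}


def resolve_manager_type(name: str) -> str:
    """Map a possibly-deprecated manager type name to its current name,
    emitting a DeprecationWarning when a legacy name is used."""
    if name in _RENAMED_MANAGERS:
        new_name = _RENAMED_MANAGERS[name]
        warnings.warn(
            f"Manager type '{name}' is deprecated; use '{new_name}' instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return new_name
    return name


print(resolve_manager_type("MooncakeStorageManager"))  # MooncakeStore
```

Callers that already use the new names pass through untouched, so existing configs keep working during the transition.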

Reviewed changes

Copilot reviewed 15 out of 16 changed files in this pull request and generated 10 comments.

| File | Description |
|---|---|
| `transfer_queue/interface.py` | New file introducing the simplified `init()`, `get_meta()`, `get_data()`, `put()`, and related wrapper functions for easier API usage |
| `transfer_queue/controller.py` | Added `store_config()` and `get_config()` methods to store/retrieve global TransferQueue configuration |
| `transfer_queue/config.yaml` | New default configuration file defining controller and backend settings |
| `transfer_queue/__init__.py` | Updated exports to include new interface functions and `process_zmq_server_info` |
| `transfer_queue/storage/managers/factory.py` | Added deprecation warnings for old manager type names with automatic fallback to new names |
| `transfer_queue/storage/managers/simple_backend_manager.py` | Updated registration name and added deprecation warning for `storage_unit_infos` config key |
| `transfer_queue/storage/managers/base.py` | Changed config parameter type annotation to `DictConfig` |
| `transfer_queue/storage/managers/yuanrong_manager.py` | Updated registration name from `YuanrongStorageManager` to `Yuanrong` |
| `transfer_queue/storage/managers/mooncake_manager.py` | Updated registration name from `MooncakeStorageManager` to `MooncakeStore` |
| `transfer_queue/utils/zmq_utils.py` | Added `process_zmq_server_info()` function moved from `client.py` |
| `transfer_queue/client.py` | Removed `process_zmq_server_info()` function and unused imports |
| `tutorial/01_core_components.py` | Simplified tutorial to use new `tq.init()` and module-level functions |
| `tests/test_client.py` | Updated test cases to use new `SimpleStorage` manager type name |
| `requirements.txt` | Added `omegaconf` dependency |
| `pyproject.toml` | Added `*.yaml` to package data for config file inclusion |
| `README.md` | Updated storage backend names in documentation |


Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
@0oshowero0 0oshowero0 changed the title [feat] Optimize TransferQueue initialization [refactor] Simplify initialization and improve API usability Feb 5, 2026
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 22 out of 23 changed files in this pull request and generated 15 comments.



Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
@0oshowero0 0oshowero0 merged commit 7c9e970 into Ascend:main Feb 6, 2026
5 checks passed
0oshowero0 added a commit that referenced this pull request Feb 11, 2026
## Summary

This PR introduces a **High-Level Key-Value (KV) Interface** to
TransferQueue, offering a Redis-style API that retains most of the
advanced features provided by TransferQueue.

## Background

In previous versions of TransferQueue, the learning curve was relatively
steep for new users. To perform basic operations, users had to:

1. Understand the `BatchMeta`, `SampleMeta`, and `FieldMeta` design (as
illustrated in
[tutorial/02_metadata_concepts.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/02_metadata_concepts.py)).
2. Navigate the flexible but complex
[`TransferQueueClient`](https://github.com/Ascend/TransferQueue/blob/main/transfer_queue/client.py)
API.

Although PR #26 simplified
the initialization process, the core interaction still required exposing
low-level details. This PR bridges that gap by providing a familiar,
easy-to-use KV abstraction.

## TransferQueue API Architecture
With this PR, TransferQueue now supports a two-level API architecture to
satisfy different user needs.

| Level | Tier | Style | Fine-Grained Access | Streaming | Sampler | Multiple-Backends |
|---|---|---|---|---|---|---|
| High | **KV Interface** (this PR) | Put/Get/List/Clear | ✓ | ○ | ✗ | ✓ |
| High | **StreamingDataLoader** (#23) | PyTorch DataLoader | ✓ | ✓ | ✓ | ✓ |
| Low | **TransferQueueClient** | Metadata-based | ✓ | ✓ | ✓ | ✓ |

### High-Level API
#### Key-Value based API (This PR)

**Methods**

- **(async_)kv_put**: Insert/Update a multi-column sample by key, with
optional metadata tag
- **(async_)kv_batch_put**: Put multiple key-value pairs efficiently in
batch
- **(async_)kv_batch_get**: Retrieve samples (by keys), supporting
column selection (by fields)
- **(async_)kv_list**: List keys and tags (metadata) in a partition
- **(async_)kv_clear**: Remove key-value pairs from storage

**Key Features**

- **Redis-style Semantics**: Familiar KV interface (Put/Get/List) for
zero learning curve
- **Fine-grained Access**: Update or retrieve specific fields (columns)
within a key (row) without touching the full sample
- **Partition Isolation**: Logical separation of storage namespaces
- **Metadata Tags**: Lightweight metadata for status tracking
- **Pluggable Backends**: Supports multiple backends
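
To make these semantics concrete, here is a minimal in-memory sketch of the KV contract: partition isolation, per-key tags, and field-level updates. `MiniKV` is an illustrative stand-in (its `fields` argument is a plain per-key dict rather than a `TensorDict`) and does not reflect the real `tq.kv_*` implementation:

```python
from collections import defaultdict


class MiniKV:
    """Toy model of the KV interface: partition -> key -> {fields, tag}."""

    def __init__(self):
        self._store = defaultdict(dict)

    def kv_batch_put(self, keys, partition_id, fields, tags=None):
        tags = tags or [None] * len(keys)
        for key, tag in zip(keys, tags):
            entry = self._store[partition_id].setdefault(key, {"fields": {}, "tag": {}})
            # Fine-grained update: only the provided fields (columns) are touched
            entry["fields"].update(fields[key])
            if tag is not None:
                entry["tag"] = tag

    def kv_list(self, partition_id):
        part = self._store[partition_id]
        return list(part), [part[k]["tag"] for k in part]

    def kv_batch_get(self, keys, partition_id, fields=None):
        out = {}
        for key in keys:
            entry = self._store[partition_id][key]["fields"]
            out[key] = {f: entry[f] for f in (fields or entry)}
        return out

    def kv_clear(self, partition_id):
        self._store.pop(partition_id, None)


kv = MiniKV()
kv.kv_batch_put(
    keys=["1_0"], partition_id="test",
    fields={"1_0": {"input_ids": [4, 5, 6], "reward": None}},
    tags=[{"status": "running"}],
)
# Update a single column without rewriting the whole sample
kv.kv_batch_put(keys=["1_0"], partition_id="test", fields={"1_0": {"reward": 0.9}})
print(kv.kv_batch_get(["1_0"], "test", fields=["reward"]))  # {'1_0': {'reward': 0.9}}
```

Note how the second `kv_batch_put` leaves `input_ids` and the tag intact while updating only `reward`; that is the fine-grained access pattern the feature list refers to.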

#### StreamingDataLoader API
Refer to our [RoadMap](#1)
and related PRs (#23).

The usage example can be found in
[tutorial/06_streaming_dataloader.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/06_streaming_dataloader.py).

### Low-Level API
Directly manipulate the `TransferQueueClient`. Refer to
[tutorial/03_metadata_concepts.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/03_metadata_concepts.py),
[tutorial/04_understanding_controller.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/04_understanding_controller.py)
and
[tutorial/05_custom_sampler.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/05_custom_sampler.py)
for details.

## Usage Example

Please refer to
[tutorial/02_kv_interface.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/02_kv_interface.py)
and
[tests/e2e/test_kv_interface_e2e.py](https://github.com/Ascend/TransferQueue/blob/main/tests/e2e/test_kv_interface_e2e.py)
for details.

```python
import torch
from tensordict import TensorDict
import transfer_queue as tq

# initialize TQ
tq.init()

# prepare data
batch_input_ids = torch.tensor(
    [
        [4, 5, 6],
        [7, 8, 9],
        [10, 11, 12],
        [13, 14, 15],
    ]
)
batch_attention_mask = torch.ones_like(batch_input_ids)

data_batch = TensorDict(
    {
        "input_ids": batch_input_ids,
        "attention_mask": batch_attention_mask,
    },
    batch_size=batch_input_ids.size(0),
)

keys = ["1_0", "1_1", "1_2", "2_0"]  # 4 keys for 4 samples
tags = [{"global_steps": 1, "status": "running", "model_version": 1} for _ in range(len(keys))]
partition_id = "test"
# use kv interface to put into TQ
tq.kv_batch_put(keys=keys, partition_id=partition_id, fields=data_batch, tags=tags)

# list all keys and tags
all_keys, all_tags = tq.kv_list(partition_id=partition_id)
for k, t in zip(all_keys, all_tags, strict=False):
    print(f"    - key='{k}' | tag={t}")

# retrieve all data
retrieved_all = tq.kv_batch_get(keys=all_keys, partition_id=partition_id)
print(f"  Fields: {list(retrieved_all.keys())}")
```

## Use Cases & Limitations

**Best For**:

- Scenarios requiring fine-grained data access (e.g., updating a reward
score for a specific prompt).
- Integration with external ReplayBuffers or Single-Controller
architectures that manage sample dispatching logic.

**Limitations (vs. Streaming/Low-level APIs):**
- No built-in production/consumption tracking: Users must manually check
status via tags or manage logic externally.
- No Built-in Sampler: Data dispatch must be implemented externally, e.g. by a
ReplayBuffer or a single controller.
- Not Fully Streaming: Consumers typically wait for a controller to
dispatch `keys` before fetching, rather than a continuous stream.
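
For the first limitation, the manual status-check pattern might look like filtering `kv_list` results by a tag field. `select_by_status` is a hypothetical helper operating on plain lists standing in for a real `tq.kv_list(...)` result:

```python
# Keys and tags as they might come back from a kv_list-style call
all_keys = ["1_0", "1_1", "1_2"]
all_tags = [
    {"global_steps": 1, "status": "done"},
    {"global_steps": 1, "status": "running"},
    {"global_steps": 1, "status": "done"},
]


def select_by_status(keys, tags, status):
    """Return only the keys whose tag matches the requested status."""
    return [k for k, t in zip(keys, tags) if t.get("status") == status]


ready = select_by_status(all_keys, all_tags, "done")
print(ready)  # ['1_0', '1_2']
```

The consumer would then fetch only the `ready` keys via `kv_batch_get`, re-listing periodically since no push-based notification is available at this API level.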

---------

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
0oshowero0 pushed a commit that referenced this pull request Feb 11, 2026
### Background
In order to align with the backend mentioned in
[PR#26](#26), I have
extracted the RayStorageManager to manage the RDT backend.

### Use Case
```python
import ray
import time
import torch
import sys
from pathlib import Path
from omegaconf import OmegaConf
from tensordict import TensorDict
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

parent_dir = Path(__file__).resolve().parent.parent
sys.path.append(str(parent_dir))

import transfer_queue as tq
from transfer_queue.metadata import BatchMeta


def tensordict_memory_mb(td):
    total_bytes = sum(tensor.element_size() * tensor.numel() for tensor in td.values())
    return total_bytes / (1024 * 1024)


@ray.remote
class WriterActor:
    def __init__(self):
        tq.init()
        self.data = None

    def generate_data(self, batch_size: int = 10000, seq_len: int = 10000):
        self.data = TensorDict({
            "input_ids": torch.randn(batch_size, seq_len, dtype=torch.float32),
        }, batch_size=batch_size)

        size = tensordict_memory_mb(self.data)
        print(f"[Writer] Data generated. Memory usage: {size:.2f} MB")

    def put_once(self, partition_id):
        t0 = time.time()
        batch_meta = tq.put(data=self.data, partition_id=partition_id)
        t1 = time.time()
        return t1 - t0, batch_meta


@ray.remote
class ReaderActor:
    def __init__(self):
        tq.init()

    def get_once(self, metadata: BatchMeta):
        t0 = time.perf_counter()
        result = tq.get_data(metadata)
        t1 = time.perf_counter()
        return t1 - t0


def main():
    if not ray.is_initialized():
        ray.init(address="auto")
    print("Initialize TransferQueue System...")
    tq.init()

    nodes = ray.nodes()
    ip_to_nodeid = {}
    for n in nodes:
        addr = n.get("NodeManagerAddress") or n.get("node_ip_address") or n.get("NodeIP")
        node_id = n.get("NodeID") or n.get("node_id")
        if addr and node_id:
            ip_to_nodeid[addr] = node_id

    ip_A = ""  # Writer 
    ip_B = ""  # Reader
    node_id_A = ip_to_nodeid.get(ip_A)
    node_id_B = ip_to_nodeid.get(ip_B)

    if not node_id_A or not node_id_B:
        print(f"Warning: Specific nodes not found. Available IPs: {list(ip_to_nodeid.keys())}")
        node_id_A = list(ip_to_nodeid.values())[0]
        node_id_B = list(ip_to_nodeid.values())[-1]

    writer = WriterActor.options(
        scheduling_strategy=NodeAffinitySchedulingStrategy(node_id=node_id_A, soft=False),
    ).remote()

    reader = ReaderActor.options(
        scheduling_strategy=NodeAffinitySchedulingStrategy(node_id=node_id_B, soft=False),
    ).remote()

    batch_size = 512
    seq_len = 32 * 1024

    partition_id = "train_step"
    ray.get(writer.generate_data.remote(batch_size, seq_len))
    cost_put, batch_meta = ray.get(writer.put_once.remote(partition_id))
    cost_get = ray.get(reader.get_once.remote(batch_meta))

    tq.close()
    print("Test Finished.")

if __name__ == "__main__":
    main()
```

---------

Signed-off-by: Evelynn-V <liwenlin0223l@gmail.com>
@0oshowero0 0oshowero0 deleted the initialize branch April 2, 2026 07:23