[StreamingDataLoader, 4/N] feat: Introduce sample pre-allocation for dynamic streaming#16
Conversation
…mingDataLoader Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Pull request overview
Adds support for “fully streamed” producer/consumer workflows by pre-allocating sample indices in TransferQueue, so consumers don’t terminate early when the queue is temporarily empty.
Changes:
- Introduces `TQ_PRE_ALLOC_SAMPLE_NUM` and pre-allocates global indexes per partition to support streamed production/consumption.
- Updates consumption/production status APIs to include pre-allocated (not-yet-produced) indexes.
- Updates the streaming tutorial and adds tests covering pre-allocation behavior.
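To illustrate the status-reporting change above, here is a minimal sketch (illustrative names only, not the actual TransferQueue API) of why counting pre-allocated, not-yet-produced indexes keeps a streaming consumer alive instead of terminating on an empty queue:

```python
# Minimal sketch (illustrative names, not the actual TransferQueue API):
# pre-allocated slots count toward the total the consumer expects, so an
# empty-but-pre-allocated queue does not end the retrieval iteration.

def check_consumption_status(produced: int, consumed: int, pre_allocated: int) -> bool:
    """Return True while the consumer should keep waiting for samples.

    `pre_allocated` is the number of reserved indexes that producers have
    not yet filled with data.
    """
    total_expected = produced + pre_allocated
    return consumed < total_expected

# Before any production: 8 pre-allocated slots keep the consumer waiting.
keep_waiting = check_consumption_status(produced=0, consumed=0, pre_allocated=8)
# Once everything produced has been consumed and nothing is reserved, stop.
done = not check_consumption_status(produced=8, consumed=8, pre_allocated=0)
```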
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| tutorial/05_streaming_dataloader.py | Adds guidance to set TQ_PRE_ALLOC_SAMPLE_NUM for fully streamed usage. |
| transfer_queue/dataloader/streaming_dataset.py | Replaces a TODO with user guidance about TQ_PRE_ALLOC_SAMPLE_NUM. |
| transfer_queue/controller.py | Implements pre-allocation, status reporting changes, and env var deprecations. |
| tests/test_controller_data_partitions.py | Adds unit tests validating pre-allocated index behavior. |
```python
# Pre-allocate global indexes for consumer consumption tracking
global_indexes = self.index_manager.allocate_indexes(partition_id, count=TQ_PRE_ALLOC_SAMPLE_NUM)
self.partitions[partition_id].register_pre_allocated_indexes(global_indexes)
```
Pre-allocation expands tensors to max(pre_allocated_indexes) + 1. Since PartitionIndexManager.allocate_indexes() uses a single monotonically increasing global_index_counter across partitions, creating multiple partitions will make later partitions pre-allocate higher index ranges (e.g., partition2 gets [N..2N-1]) and therefore allocate ~2N rows even though only N samples were pre-allocated. This scales memory with the global index value rather than per-partition sample count. Consider using per-partition local row indices (with a global<->local mapping) or changing the index allocation scheme so each partition’s indices start from 0 (or a compact range) to keep tensors sized to partition cardinality.
Suggested change:

```diff
- # Pre-allocate global indexes for consumer consumption tracking
- global_indexes = self.index_manager.allocate_indexes(partition_id, count=TQ_PRE_ALLOC_SAMPLE_NUM)
- self.partitions[partition_id].register_pre_allocated_indexes(global_indexes)
+ # Pre-allocate compact, partition-local indexes for consumer consumption tracking.
+ # Using local indexes (0..TQ_PRE_ALLOC_SAMPLE_NUM-1) avoids tensor sizes scaling with
+ # any global index counter shared across partitions.
+ pre_alloc_indexes = list(range(TQ_PRE_ALLOC_SAMPLE_NUM))
+ self.partitions[partition_id].register_pre_allocated_indexes(pre_alloc_indexes)
```
This is to simplify addressing using `global_index`.
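To make the reviewer's memory-scaling concern concrete, here is a minimal sketch (illustrative class and variable names, not the actual TransferQueue implementation) showing how a single counter shared across partitions forces later partitions into high index ranges, so a tensor sized to `max(index) + 1` over-allocates relative to the partition's own sample count:

```python
# Hypothetical sketch: a shared monotonically increasing counter hands later
# partitions higher index ranges, so storage sized to max(global_index) + 1
# grows with the global counter, not with per-partition sample count.

class SharedCounterAllocator:
    def __init__(self):
        self._next = 0

    def allocate_indexes(self, count: int) -> list[int]:
        indexes = list(range(self._next, self._next + count))
        self._next += count
        return indexes

N = 4
alloc = SharedCounterAllocator()
p1 = alloc.allocate_indexes(N)   # partition 1 gets [0, 1, 2, 3]
p2 = alloc.allocate_indexes(N)   # partition 2 gets [4, 5, 6, 7]

# Rows needed if partition 2 sizes its status tensor to max(global_index) + 1:
rows_p2 = max(p2) + 1            # 8 rows for only 4 pre-allocated samples
```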
Great work! 👏
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
transfer_queue/controller.py:335
`ensure_samples_capacity()` now expands the tensors by exactly `expansion_needed` every time. For workloads that append samples incrementally (common in streaming), this causes repeated realloc+copy and can devolve into O(n^2) behavior. Consider restoring a minimum growth factor (e.g., geometric growth/doubling, or a configurable minimum expansion) to reduce reallocations.
```python
current_sample_space = self.allocated_samples_num
if required_samples > current_sample_space:
    # Expand rows
    expansion_needed = required_samples - current_sample_space
    new_samples = current_sample_space + expansion_needed
    new_fields = self.production_status.shape[1]
    expanded_tensor = torch.zeros(new_samples, new_fields, dtype=torch.int8)
    expanded_tensor[:current_sample_space, :] = self.production_status
```
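A sketch of the geometric-growth policy the reviewer suggests (illustrative helper, not the actual `ensure_samples_capacity()` implementation), reduced to the capacity computation so the amortization effect is easy to verify:

```python
def next_capacity(current: int, required: int, growth_factor: float = 2.0) -> int:
    """Return the new row capacity for a status tensor.

    Geometric growth: jump to max(required, current * growth_factor) so a
    sequence of incremental appends triggers only O(log n) reallocations
    instead of one realloc+copy per append.
    """
    if required <= current:
        return current
    return max(required, int(current * growth_factor))

# Simulate appending samples one at a time and count reallocations.
capacity, reallocs = 1, 0
for required in range(2, 10_001):
    new_cap = next_capacity(capacity, required)
    if new_cap != capacity:
        capacity, reallocs = new_cap, reallocs + 1
# With doubling, only 14 reallocations cover 10,000 incremental appends;
# exact-fit expansion would reallocate on nearly every append.
```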
| "\nIn real-world usage, please export the environment variable of TQ_PRE_ALLOC_SAMPLE_NUM to " | ||
| "global_batch_size to make sure consumers can accurately determine consumption status even before " | ||
| "producers have generated the samples." |
This user-facing message is a bit unclear/grammatically awkward (“export the environment variable of … to …”). Consider rephrasing to an explicit command (e.g., export TQ_PRE_ALLOC_SAMPLE_NUM=<global_batch_size>) or otherwise clarify that the env var should be set equal to global_batch_size.
| "\nIn real-world usage, please export the environment variable of TQ_PRE_ALLOC_SAMPLE_NUM to " | |
| "global_batch_size to make sure consumers can accurately determine consumption status even before " | |
| "producers have generated the samples." | |
| "\nIn real-world usage, set the environment variable TQ_PRE_ALLOC_SAMPLE_NUM equal to your " | |
| "global batch size so consumers can accurately determine consumption status even before " | |
| "producers have generated the samples, for example:\n" | |
| " export TQ_PRE_ALLOC_SAMPLE_NUM=<global_batch_size>" |
```python
# Sample pre-allocation for StreamingDataLoader compatibility.
# By pre-allocating sample indices (typically global_batch_size), consumers can accurately
# determine consumption status even before producers have generated the samples.
TQ_PRE_ALLOC_SAMPLE_NUM = int(os.environ.get("TQ_PRE_ALLOC_SAMPLE_NUM", 1))
```
TQ_PRE_ALLOC_SAMPLE_NUM is defaulted to 1 and create_partition() always calls allocate_indexes(..., count=TQ_PRE_ALLOC_SAMPLE_NUM). This makes pre-allocation effectively mandatory and also prevents setting the value to 0 (since allocate_indexes raises for count<=0), which is inconsistent with the PR description implying this is an opt-in behavior. Consider defaulting to 0 and skipping pre-allocation when the value is 0, or otherwise document/justify why every partition must always reserve at least one index.
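A sketch of the opt-in variant the reviewer proposes (illustrative helper names, not the shipped controller code): default the variable to 0 and skip pre-allocation entirely when it is disabled, so `allocate_indexes()` is never called with `count <= 0`:

```python
import os

# Opt-in variant: default 0 means "pre-allocation disabled" (the actual
# controller currently defaults to 1 and always allocates).
TQ_PRE_ALLOC_SAMPLE_NUM = int(os.environ.get("TQ_PRE_ALLOC_SAMPLE_NUM", 0))

def maybe_pre_allocate(allocate_indexes, partition_id, count=TQ_PRE_ALLOC_SAMPLE_NUM):
    """Pre-allocate `count` indexes for a partition, or nothing when disabled.

    `allocate_indexes` is the allocator callable (hypothetical signature:
    allocate_indexes(partition_id, count=...) -> list of indexes).
    """
    if count <= 0:
        # Skip allocation so allocate_indexes() never sees count <= 0.
        return []
    return allocate_indexes(partition_id, count=count)
```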
```python
assert partition is not None
batch_global_indexes = partition.activate_pre_allocated_indexes(batch_size)
```
Avoid using assert for runtime validation in production code (assert can be optimized away with python -O). If partition being None is not expected, raise a clear exception instead so the failure mode is deterministic.
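A sketch of the explicit-exception alternative (the error class and helper names are illustrative, not part of the actual TransferQueue codebase):

```python
class PartitionNotFoundError(RuntimeError):
    """Deterministic failure when a partition lookup returns None."""

def get_partition_or_raise(partitions: dict, partition_id):
    # Unlike `assert partition is not None`, this check survives `python -O`,
    # which strips assert statements entirely.
    partition = partitions.get(partition_id)
    if partition is None:
        raise PartitionNotFoundError(
            f"Partition {partition_id!r} not found; cannot activate "
            "pre-allocated indexes."
        )
    return partition
```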
Background
In PR #9, we introduced initial support for the `StreamingDataLoader` interface. Currently, the system assumes prompts are pre-loaded into the TransferQueue. However, a critical use case involves generation workers putting both prompts and responses into the TransferQueue on the fly (e.g., the `rollout_buffer` mechanism in Slime). Since TransferQueue supports dynamic expansion, if the producer has not yet pushed any data, the TransferQueue appears empty. Consequently, the consumer's `check_consumption_status` API incorrectly assumes no data is available and prematurely terminates the data retrieval iteration.
Solution
This PR introduces a new environment variable, `TQ_PRE_ALLOC_SAMPLE_NUM`, to handle sample pre-allocation in TransferQueue.
- When set (typically to `global_batch_size`), the controller pre-allocates a fixed number of global indexes before data production begins.
- The `check_consumption_status` API now accounts for these pre-allocated slots. This ensures the `StreamingDataLoader` waits for the pending data instead of exiting immediately when the TransferQueue is temporarily empty.
Other Changes
Deprecate `TQ_INIT_SAMPLE_NUM`, `TQ_INIT_FIELD_NUM`, and `TQ_SAMPLE_MIN_EXPANSION_SIZE` for simplicity.
CC: @NINGBENZHE