
[Data] Parallelize DefaultCollateFn - arrow_batch_to_tensors #58821

Merged
raulchen merged 15 commits into master from srinathk10/collate_fn_speedup
Nov 22, 2025

Conversation

@srinathk10
Contributor


Description


[Data] Parallelize DefaultCollateFn - arrow_batch_to_tensors

In arrow_batch_to_tensors, use make_async_gen to set up multiple workers that parallelize per-column processing for tensor conversion in convert_ndarray_to_torch_tensor, so that DefaultCollateFn can be sped up.
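The core idea is to fan the per-column conversions out onto a small worker pool. The sketch below is illustrative only: it uses a plain ThreadPoolExecutor and a NumPy cast as a stand-in for Ray's actual make_async_gen / convert_ndarray_to_torch_tensor helpers, and the function and parameter names are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np


def convert_column(arr: np.ndarray) -> np.ndarray:
    # Stand-in for convert_ndarray_to_torch_tensor: an expensive
    # per-column conversion (here, a dtype cast plus a contiguous copy).
    return np.ascontiguousarray(arr.astype(np.float32))


def batch_to_tensors_parallel(batch: dict, num_workers: int = 4) -> dict:
    """Convert each column of `batch` on a thread pool (hypothetical sketch)."""
    if num_workers <= 0:
        # Sequential fallback, mirroring num_workers=0 in the benchmark.
        return {k: convert_column(v) for k, v in batch.items()}
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        # map() preserves input order, so columns line up with their names.
        converted = pool.map(convert_column, batch.values())
        return dict(zip(batch.keys(), converted))
```

Because the heavy lifting (NumPy casts, torch tensor construction) releases the GIL, threads rather than processes are enough to get real speedups here.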



Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
@srinathk10 srinathk10 requested a review from a team as a code owner November 19, 2025 17:48
Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces parallelization to arrow_batch_to_tensors in DefaultCollateFn to improve performance. The changes leverage make_async_gen for parallel tensor conversion, which is a good approach for this CPU-bound task. The implementation correctly handles both combine_chunks and non-combine_chunks cases. The code is well-structured. I have a few suggestions to further improve efficiency and code clarity.

@srinathk10 srinathk10 added the go add ONLY when ready to merge, run all tests label Nov 19, 2025
srinathk10 and others added 2 commits November 19, 2025 10:16
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com>
Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com>
@ray-gardener ray-gardener bot added the data Ray Data-related issues label Nov 19, 2025
@srinathk10
Contributor Author

Train release tests: https://buildkite.com/ray-project/release/builds/68576

@srinathk10
Contributor Author

Parallelization Microbenchmark

Getting around a 1.8x speedup with 4 workers.

Result

2025-11-20 05:23:58,863 INFO worker.py:2014 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
/home/ec2-user/myworkspace/ray/python/ray/_private/worker.py:2062: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(
Creating dataset with image columns...
Dataset: 256 rows, 128 image columns (224x224x3)

Warming up...
2025-11-20 05:24:12,999 INFO logging.py:397 -- Registered dataset logger for dataset dataset_0_0
2025-11-20 05:24:12,007 INFO streaming_executor.py:682 -- [dataset]: A new progress UI is available. To enable, set `ray.data.DataContext.get_current().enable_rich_progress_bars = True` and `ray.data.DataContext.get_current().use_ray_tqdm = False`.
Running 0: 0.00 row [00:00, ? row/s]2025-11-20 05:24:12,009     WARNING resource_manager.py:136 -- ⚠️  Ray's object store is configured to use only 42.9% of available memory (18.3GiB out of 42.7GiB total). For optimal Ray Data performance, we recommend setting the object store to at least 50% of available memory. You can do this by setting the 'object_store_memory' parameter when calling ray.init() or by setting the RAY_DEFAULT_OBJECT_STORE_MEMORY_PROPORTION environment variable.
2025-11-20 05:24:12,030 INFO streaming_executor.py:300 -- ✔️  Dataset dataset_0_0 execution finished in 0.00 seconds
✔️  Dataset dataset_0_0 execution finished in 0.00 seconds: : 256 row [00:00, 11.6k row/s]
2025-11-20 05:24:12,046 INFO util.py:257 -- Exiting prefetcher's background thread
2025-11-20 05:24:13,012 INFO logging.py:397 -- Registered dataset logger for dataset dataset_0_1
Running 0: 0.00 row [00:00, ? row/s]2025-11-20 05:24:13,031     INFO streaming_executor.py:300 -- ✔️  Dataset dataset_0_1 execution finished in 0.00 seconds
✔️  Dataset dataset_0_1 execution finished in 0.00 seconds: : 256 row [00:00, 14.8k row/s]
2025-11-20 05:24:13,036 INFO util.py:257 -- Exiting prefetcher's background thread
Warmup complete.

Testing with threading (4 workers)...
2025-11-20 05:24:14,067 INFO logging.py:397 -- Registered dataset logger for dataset dataset_0_2
Running 0: 0.00 row [00:00, ? row/s]2025-11-20 05:24:14,086     INFO streaming_executor.py:300 -- ✔️  Dataset dataset_0_2 execution finished in 0.00 seconds
✔️  Dataset dataset_0_2 execution finished in 0.00 seconds: : 256 row [00:00, 14.9k row/s]
2025-11-20 05:24:14,090 INFO util.py:257 -- Exiting prefetcher's background thread
Completed in 0.9077s

Testing without threading...
2025-11-20 05:24:14,975 INFO logging.py:397 -- Registered dataset logger for dataset dataset_0_3
Running 0: 0.00 row [00:00, ? row/s]2025-11-20 05:24:14,995     INFO streaming_executor.py:300 -- ✔️  Dataset dataset_0_3 execution finished in 0.00 seconds
✔️  Dataset dataset_0_3 execution finished in 0.00 seconds: : 256 row [00:00, 13.9k row/s]
2025-11-20 05:24:14,999 INFO util.py:257 -- Exiting prefetcher's background thread
Completed in 1.6309s

============================================================
BENCHMARK RESULTS
============================================================
Dataset: 256 rows, 128 image columns
Image size: 224x224x3
With threading (4 workers): 0.9077s
Without threading: 1.6309s
Speedup: 1.80x
✓ Threading provides 1.80x speedup
============================================================

Benchmark

#!/usr/bin/env python3
"""Standalone benchmark comparing iter_torch_batches with/without threadpool.

This benchmark uses image data to make tensor conversion more expensive,
better demonstrating the benefit of parallel processing.
"""

import time

import numpy as np
import pyarrow as pa

import ray
from ray.data.collate_fn import DefaultCollateFn


def create_image_dataset(
    num_rows: int, num_images: int, image_size: tuple = (224, 224, 3)
):
    """Create a dataset with image columns for expensive tensor conversion."""
    image_size_flat = image_size[0] * image_size[1] * image_size[2]
    # Create data as a list of dictionaries (one per row)
    rows = []
    for _ in range(num_rows):
        row = {}
        for i in range(num_images):
            # Each image is a flattened array
            row[f"image_{i}"] = np.random.randint(
                0, 255, size=image_size_flat, dtype=np.uint8
            )
        rows.append(row)

    table = pa.Table.from_pylist(rows)
    return ray.data.from_arrow([table])


def benchmark_iter_torch_batches():
    """Benchmark iter_torch_batches with and without threadpool."""
    print("Creating dataset with image columns...")
    num_rows = 256
    num_images = 128
    image_size = (224, 224, 3)
    ds = create_image_dataset(num_rows, num_images, image_size)

    img_str = f"{image_size[0]}x{image_size[1]}x{image_size[2]}"
    print(f"Dataset: {num_rows} rows, {num_images} image columns ({img_str})")
    print()

    # Warmup runs to avoid cold start effects
    print("Warming up...")
    collate_fn_warmup = DefaultCollateFn(num_workers=4)
    for _ in range(2):
        list(
            ds.iterator().iter_torch_batches(
                collate_fn=collate_fn_warmup, batch_size=1000
            )
        )
    print("Warmup complete.\n")

    # Test with threading
    print("Testing with threading (4 workers)...")
    collate_fn_with_threading = DefaultCollateFn(num_workers=4)
    start = time.perf_counter()
    batches_with_threading = []
    for batch in ds.iterator().iter_torch_batches(
        collate_fn=collate_fn_with_threading, batch_size=1000
    ):
        batches_with_threading.append(batch)
    time_with_threading = time.perf_counter() - start
    print(f"Completed in {time_with_threading:.4f}s\n")

    # Test without threading
    print("Testing without threading...")
    collate_fn_no_threading = DefaultCollateFn(num_workers=0)
    start = time.perf_counter()
    batches_no_threading = []
    for batch in ds.iterator().iter_torch_batches(
        collate_fn=collate_fn_no_threading, batch_size=1000
    ):
        batches_no_threading.append(batch)
    time_no_threading = time.perf_counter() - start
    print(f"Completed in {time_no_threading:.4f}s\n")

    # Verify results are the same
    assert len(batches_with_threading) == len(batches_no_threading)
    for b1, b2 in zip(batches_with_threading, batches_no_threading):
        for col in b1.keys():
            assert len(b1[col]) == len(b2[col])
            # Compare first tensor in each batch
            assert b1[col][0].shape == b2[col][0].shape
            assert b1[col][0].dtype == b2[col][0].dtype

    # Print benchmark results
    print("=" * 60)
    print("BENCHMARK RESULTS")
    print("=" * 60)
    print(f"Dataset: {num_rows} rows, {num_images} image columns")
    img_str = f"{image_size[0]}x{image_size[1]}x{image_size[2]}"
    print(f"Image size: {img_str}")
    print(f"With threading (4 workers): {time_with_threading:.4f}s")
    print(f"Without threading: {time_no_threading:.4f}s")
    if time_with_threading > 0:
        speedup = time_no_threading / time_with_threading
        print(f"Speedup: {speedup:.2f}x")
        if speedup > 1.0:
            print(f"✓ Threading provides {speedup:.2f}x speedup")
        else:
            print("⚠ Threading overhead exceeds parallel benefit")
    print("=" * 60)


if __name__ == "__main__":
    ray.init(num_cpus=8, ignore_reinit_error=True)
    try:
        benchmark_iter_torch_batches()
    finally:
        ray.shutdown()

Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
@srinathk10 srinathk10 requested a review from a team as a code owner November 20, 2025 05:28
Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com>
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
@srinathk10
Contributor Author

/gemini review

Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces parallel processing to DefaultCollateFn by using a ThreadPoolExecutor within arrow_batch_to_tensors. The changes look promising for improving performance. I've identified a few issues: a type inconsistency in the return value of arrow_batch_to_tensors, a bug in CustomArrowCollateFn when running on GPU, and a weakness in the new test case. Addressing these will improve the correctness and robustness of the implementation.

Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
@srinathk10
Contributor Author

/gemini review

Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces parallel processing to arrow_batch_to_tensors using a ThreadPoolExecutor, which should speed up the DefaultCollateFn. The changes are well-structured, with parallel paths for both combine_chunks=True and combine_chunks=False scenarios. The implementation also includes a bug fix for handling None devices and adds comprehensive tests to ensure correctness.

My feedback focuses on a few areas for improvement:

  • Using threadpool.map for more concise and idiomatic code when submitting tasks.
  • Discussing the use of __del__ for thread pool cleanup, which can be unreliable, and suggesting alternatives or at least adding comments to explain the choice.

Overall, this is a solid improvement to the data loading performance.
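Both review points can be sketched together. The class below is a hypothetical toy, not Ray's actual DefaultCollateFn: it shows `threadpool.map` replacing a submit/result loop, and `weakref.finalize` as a more reliable cleanup hook than `__del__`, which may never run at all (reference cycles, interpreter teardown).

```python
import weakref
from concurrent.futures import ThreadPoolExecutor


class ParallelCollate:
    """Toy collate holder showing pool cleanup without relying on __del__."""

    def __init__(self, num_workers: int = 4):
        self._pool = ThreadPoolExecutor(max_workers=num_workers)
        # weakref.finalize fires when `self` is collected or at interpreter
        # exit; unlike __del__, it is guaranteed to run at most once and is
        # exception-safe. Binding `self._pool.shutdown` (not `self`) avoids
        # a reference cycle that would keep the object alive.
        self._finalizer = weakref.finalize(self, self._pool.shutdown, wait=False)

    def __call__(self, columns):
        # map() keeps input order and is a concise, idiomatic replacement
        # for a manual submit()/result() loop over the columns.
        return list(self._pool.map(lambda col: [x * 2 for x in col], columns))

    def close(self):
        # Explicit, idempotent cleanup for callers that want determinism.
        self._finalizer()
```

Callers that want deterministic teardown call `close()`; everything else still gets the finalizer as a safety net.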

Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
@srinathk10
Contributor Author

/gemini review

Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces parallel processing to DefaultCollateFn by using a ThreadPoolExecutor within arrow_batch_to_tensors. This is a great enhancement for performance, especially for datasets with many columns or chunks. The implementation looks solid, and the addition of a correctness test is much appreciated.

My main feedback is regarding code duplication in the benchmark file release/train_tests/benchmark/image_classification/factory.py. The CustomArrowCollateFn duplicates a significant amount of logic from DefaultCollateFn. I've suggested a refactoring to use inheritance, which will make the code cleaner and easier to maintain. I've also included a related comment to update the imports accordingly.

Comment on lines 13 to +14
from ray.data.collate_fn import ArrowBatchCollateFn, CollateFn
from concurrent.futures import ThreadPoolExecutor
from ray.data.dataset import TorchDeviceType
Contributor


Severity: medium

Following the refactoring of CustomArrowCollateFn to inherit from DefaultCollateFn, the imports should be updated. ArrowBatchCollateFn and ThreadPoolExecutor are no longer needed, and DefaultCollateFn should be imported instead.

Suggested change
from ray.data.collate_fn import ArrowBatchCollateFn, CollateFn
from concurrent.futures import ThreadPoolExecutor
from ray.data.dataset import TorchDeviceType
from ray.data.collate_fn import CollateFn, DefaultCollateFn
from ray.data.dataset import TorchDeviceType
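The shape of the suggested refactor can be sketched as follows. Both class bodies here are hypothetical stand-ins (the real DefaultCollateFn API differs): the point is simply that the subclass reuses the base class's conversion logic via `super()` and overrides only its custom post-processing step, instead of duplicating the shared logic.

```python
class DefaultCollateFnSketch:
    """Stand-in for DefaultCollateFn: owns the shared conversion logic."""

    def __init__(self, num_workers: int = 0):
        self.num_workers = num_workers

    def __call__(self, batch: dict) -> dict:
        # Shared Arrow -> tensor conversion lives here (simplified to a
        # list() conversion for illustration).
        return {name: list(col) for name, col in batch.items()}


class CustomArrowCollateFnSketch(DefaultCollateFnSketch):
    """Keeps only the custom step; everything shared comes from the base."""

    def __call__(self, batch: dict) -> dict:
        tensors = super().__call__(batch)  # reuse, don't duplicate
        # Custom post-processing goes here (the real class would move
        # tensors to a device); reversing each column stands in for it.
        return {name: col[::-1] for name, col in tensors.items()}
```

With this structure, improvements to the base conversion path (such as the thread-pool parallelism in this PR) are picked up by the custom class for free.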

Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
@srinathk10
Contributor Author

/gemini review

@gemini-code-assist
Contributor

Warning

You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again!

srinathk10 and others added 2 commits November 19, 2025 22:58
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
@srinathk10
Contributor Author

Parallelization Microbenchmark

Even with just 2 image columns, 4 threads outperform 0 threads.

Result

================================================================================
BENCHMARK RESULTS (Time in seconds)
================================================================================

+--------------+---------------+---------------+---------------+---------------+
|   num_images |   0 thread(s) |   1 thread(s) |   2 thread(s) |   4 thread(s) |
+==============+===============+===============+===============+===============+
|            1 |        0.0315 |        0.0322 |        0.0321 |        0.0327 |
+--------------+---------------+---------------+---------------+---------------+
|            2 |        0.045  |        0.0484 |        0.0401 |        0.0407 |
+--------------+---------------+---------------+---------------+---------------+
|            4 |        0.0746 |        0.0803 |        0.0615 |        0.0595 |
+--------------+---------------+---------------+---------------+---------------+
|            8 |        0.128  |        0.1329 |        0.1012 |        0.0868 |
+--------------+---------------+---------------+---------------+---------------+
|           16 |        0.236  |        0.2462 |        0.1807 |        0.1736 |
+--------------+---------------+---------------+---------------+---------------+
|           32 |        0.4512 |        0.4614 |        0.3328 |        0.307  |
+--------------+---------------+---------------+---------------+---------------+
|           64 |        0.8634 |        0.9315 |        0.6787 |        0.5406 |
+--------------+---------------+---------------+---------------+---------------+
|          128 |        1.8134 |        1.8105 |        1.337  |        1.0154 |
+--------------+---------------+---------------+---------------+---------------+

================================================================================
SPEEDUP TABLE (relative to 1 thread)
================================================================================

+--------------+---------------+---------------+---------------+---------------+
|   num_images | 0 thread(s)   | 1 thread(s)   | 2 thread(s)   | 4 thread(s)   |
+==============+===============+===============+===============+===============+
|            1 | 1.02x         | 1.00x         | 1.00x         | 0.99x         |
+--------------+---------------+---------------+---------------+---------------+
|            2 | 1.07x         | 1.00x         | 1.21x         | 1.19x         |
+--------------+---------------+---------------+---------------+---------------+
|            4 | 1.08x         | 1.00x         | 1.31x         | 1.35x         |
+--------------+---------------+---------------+---------------+---------------+
|            8 | 1.04x         | 1.00x         | 1.31x         | 1.53x         |
+--------------+---------------+---------------+---------------+---------------+
|           16 | 1.04x         | 1.00x         | 1.36x         | 1.42x         |
+--------------+---------------+---------------+---------------+---------------+
|           32 | 1.02x         | 1.00x         | 1.39x         | 1.50x         |
+--------------+---------------+---------------+---------------+---------------+
|           64 | 1.08x         | 1.00x         | 1.37x         | 1.72x         |
+--------------+---------------+---------------+---------------+---------------+
|          128 | 1.00x         | 1.00x         | 1.35x         | 1.78x         |
+--------------+---------------+---------------+---------------+---------------+
================================================================================

Benchmark

#!/usr/bin/env python3
"""Standalone benchmark comparing iter_torch_batches with/without threadpool.

This benchmark uses image data to make tensor conversion more expensive,
better demonstrating the benefit of parallel processing.
"""

import time
from collections import defaultdict

import numpy as np
import pyarrow as pa
from tabulate import tabulate

import ray
from ray.data.collate_fn import DefaultCollateFn


def create_image_dataset(
    num_rows: int, num_images: int, image_size: tuple = (224, 224, 3)
):
    """Create a dataset with image columns for expensive tensor conversion."""
    image_size_flat = image_size[0] * image_size[1] * image_size[2]
    # Create data as a list of dictionaries (one per row)
    rows = []
    for _ in range(num_rows):
        row = {}
        for i in range(num_images):
            # Each image is a flattened array
            row[f"image_{i}"] = np.random.randint(
                0, 255, size=image_size_flat, dtype=np.uint8
            )
        rows.append(row)

    table = pa.Table.from_pylist(rows)
    return ray.data.from_arrow([table])


def benchmark_iter_torch_batches():
    """Benchmark iter_torch_batches with different thread counts."""
    num_rows = 256
    image_size = (224, 224, 3)
    num_threads_list = [0, 1, 2, 4]
    num_images_list = [1, 2, 4, 8, 16, 32, 64, 128]

    # Store results: results[num_threads][num_images] = time
    results = defaultdict(dict)

    print("=" * 80)
    print("BENCHMARK: iter_torch_batches with varying threads and num_images")
    print("=" * 80)
    img_size_str = f"{image_size[0]}x{image_size[1]}x{image_size[2]}"
    print(f"Dataset: {num_rows} rows, image size: {img_size_str}")
    print(f"Testing threads: {num_threads_list}")
    print(f"Testing num_images: {num_images_list}")
    print("=" * 80)
    print()

    for num_images in num_images_list:
        print(f"Creating dataset with {num_images} image columns...")
        ds = create_image_dataset(num_rows, num_images, image_size)

        # Warmup runs to avoid cold start effects
        print("  Warming up...")
        collate_fn_warmup = DefaultCollateFn(num_workers=4)
        for _ in range(2):
            list(
                ds.iterator().iter_torch_batches(
                    collate_fn=collate_fn_warmup, batch_size=1000
                )
            )

        for num_threads in num_threads_list:
            msg = (
                f"  Testing with {num_threads} thread(s), "
                f"{num_images} image(s)..."
            )
            print(msg, end=" ")
            collate_fn = DefaultCollateFn(num_workers=num_threads)
            start = time.perf_counter()
            batches = []
            for batch in ds.iterator().iter_torch_batches(
                collate_fn=collate_fn, batch_size=1000
            ):
                batches.append(batch)
            elapsed_time = time.perf_counter() - start
            results[num_threads][num_images] = elapsed_time
            print(f"{elapsed_time:.4f}s")

        print()

    # Print tabulated results
    print("=" * 80)
    print("BENCHMARK RESULTS (Time in seconds)")
    print("=" * 80)
    print()

    # Build table data for timing results
    headers = ["num_images"] + [f"{t} thread(s)" for t in num_threads_list]
    table_data = []
    for num_images in num_images_list:
        row = [num_images]
        for num_threads in num_threads_list:
            time_val = results[num_threads][num_images]
            row.append(f"{time_val:.4f}")
        table_data.append(row)

    print(tabulate(table_data, headers=headers, tablefmt="grid"))
    print()

    # Print speedup table
    print("=" * 80)
    print("SPEEDUP TABLE (relative to 1 thread)")
    print("=" * 80)
    print()

    # Build table data for speedup results
    speedup_table_data = []
    for num_images in num_images_list:
        row = [num_images]
        baseline = results[1][num_images]
        for num_threads in num_threads_list:
            time_val = results[num_threads][num_images]
            if baseline > 0:
                speedup = baseline / time_val
                row.append(f"{speedup:.2f}x")
            else:
                row.append("N/A")
        speedup_table_data.append(row)

    print(tabulate(speedup_table_data, headers=headers, tablefmt="grid"))
    print("=" * 80)


if __name__ == "__main__":
    ray.init(num_cpus=8, ignore_reinit_error=True)
    try:
        benchmark_iter_torch_batches()
    finally:
        ray.shutdown()

@raulchen raulchen merged commit b9f3d1a into master Nov 22, 2025
6 checks passed
@raulchen raulchen deleted the srinathk10/collate_fn_speedup branch November 22, 2025 00:45
ykdojo pushed a commit to ykdojo/ray that referenced this pull request Nov 27, 2025
…ject#58821)

## Description

### [Data] Parallelize DefaultCollateFn - arrow_batch_to_tensors

In `arrow_batch_to_tensors`, use `make_async_gen` to set up multiple
workers that parallelize per-column processing for tensor conversion in
`convert_ndarray_to_torch_tensor`, so that `DefaultCollateFn` can be sped up.


---------

Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Signed-off-by: YK <1811651+ykdojo@users.noreply.github.com>
SheldonTsen pushed a commit to SheldonTsen/ray that referenced this pull request Dec 1, 2025
…ject#58821)

> Thank you for contributing to Ray! 🚀
> Please review the [Ray Contribution
Guide](https://docs.ray.io/en/master/ray-contribute/getting-involved.html)
before opening a pull request.

> ⚠️ Remove these instructions before submitting your PR.

> 💡 Tip: Mark as draft if you want early feedback, or ready for review
when it's complete.

## Description
> Briefly describe what this PR accomplishes and why it's needed.

### [Data] Parallelize DefaultCollateFn - arrow_batch_to_tensors

In `arrow_batch_to_tensors`, use `make_async_gen` to set up multiple
workers to speed up processing of columns for Tensor conversion in
`convert_ndarray_to_torch_tensor`, so `DefaultCollateFn` can be speedup.


## Related issues
> Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to
ray-project#1234".

## Additional information
> Optional: Add implementation details, API changes, usage examples,
screenshots, etc.

---------

Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
…ject#58821)

> Thank you for contributing to Ray! 🚀
> Please review the [Ray Contribution
Guide](https://docs.ray.io/en/master/ray-contribute/getting-involved.html)
before opening a pull request.

> ⚠️ Remove these instructions before submitting your PR.

> 💡 Tip: Mark as draft if you want early feedback, or ready for review
when it's complete.

## Description
> Briefly describe what this PR accomplishes and why it's needed.

### [Data] Parallelize DefaultCollateFn - arrow_batch_to_tensors

In `arrow_batch_to_tensors`, use `make_async_gen` to set up multiple
workers to speed up processing of columns for Tensor conversion in
`convert_ndarray_to_torch_tensor`, so `DefaultCollateFn` can be speedup.

## Related issues
> Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to
ray-project#1234".

## Additional information
> Optional: Add implementation details, API changes, usage examples,
screenshots, etc.

---------

Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>

Labels

data (Ray Data-related issues), go (add ONLY when ready to merge, run all tests)

Development

Successfully merging this pull request may close these issues: Ray fails to serialize self-reference objects

3 participants
3 participants