
[Data] Add sql_params support to read_sql#60030

Merged
bveeramani merged 2 commits into ray-project:master from myandpr:read-sql-para
Jan 12, 2026

Conversation

@myandpr
Member

@myandpr myandpr commented Jan 10, 2026

Description

Add sql_params support to read_sql so callers can pass DB‑API 2 parameter bindings instead of formatting values into the SQL string. This enables safe, parameterized queries, and the parameters are propagated through all SQL execution paths (count, sharding checks, and reads). Also adds a SQLite parameterized-query test and updates the docstring.

Related issues

Fixes #54098.

Additional information

Design/implementation notes:

  • API: add optional sql_params to read_sql, matching DB‑API 2 cursor.execute(operation[, parameters]).
  • Call chain:
    read_sql(...)
    → SQLDatasource(sql_params=...)
    → get_read_tasks(...)
    → supports_sharding/_get_num_rows/fallback read/per‑shard read
    → _execute(cursor, sql, sql_params).
  • No paramstyle parsing: Ray doesn’t interpret placeholders; it passes sql_params through to the driver as‑is.
  • Behavior: if sql_params is None, _execute falls back to cursor.execute(sql), preserving existing behavior.

Tests:

  • pytest python/ray/data/tests/test_sql.py

  • Local quick check (example):

Python 3.10.19 | packaged by conda-forge | (main, Oct 22 2025, 22:46:49) [Clang 19.1.7 ] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import sqlite3
>>> import ray
>>> 
>>> db = "example.db"
>>> conn = sqlite3.connect(db)
>>> conn.execute("DROP TABLE IF EXISTS movie")
<sqlite3.Cursor object at 0x1035af6c0>
>>> conn.execute("CREATE TABLE movie(title, year, score)")
<sqlite3.Cursor object at 0x1055c7040>
>>> conn.executemany(
...     "INSERT INTO movie VALUES (?, ?, ?)",
...     [
...         ("Monty Python and the Holy Grail", 1975, 8.2),
...         ("And Now for Something Completely Different", 1971, 7.5),
...         ("Monty Python's Life of Brian", 1979, 8.0),
...     ],
... )
<sqlite3.Cursor object at 0x1035af6c0>
>>> conn.commit()
>>> conn.close()
>>> 
>>> def create_connection():
...     return sqlite3.connect(db)
... 
>>> # tuple 
>>> ds_tuple = ray.data.read_sql(
...     "SELECT * FROM movie WHERE year >= ?",
...     create_connection,
...     sql_params=(1975,),
... )
2026-01-11 00:26:54,103 INFO worker.py:2007 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265 
/Users/XXX/miniforge3/envs/clion-ray-ce/lib/python3.10/site-packages/ray/_private/worker.py:2055: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(
2026-01-11 00:26:54,700 INFO sql_datasource.py:153 -- Sharding is not supported. Falling back to reading all data in a single task.
>>> print("tuple:", ds_tuple.take_all())
2026-01-11 00:26:56,226 INFO logging.py:397 -- Registered dataset logger for dataset dataset_0_0
2026-01-11 00:26:56,240 INFO sql_datasource.py:153 -- Sharding is not supported. Falling back to reading all data in a single task.
2026-01-11 00:26:56,241 INFO sql_datasource.py:153 -- Sharding is not supported. Falling back to reading all data in a single task.
2026-01-11 00:26:56,242 INFO streaming_executor.py:182 -- Starting execution of Dataset dataset_0_0. Full logs are in /tmp/ray/session_2026-01-11_00-26-50_843083_56953/logs/ray-data
2026-01-11 00:26:56,242 INFO streaming_executor.py:183 -- Execution plan of Dataset dataset_0_0: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadSQL]
2026-01-11 00:26:56,242 INFO sql_datasource.py:153 -- Sharding is not supported. Falling back to reading all data in a single task.
2026-01-11 00:26:56,246 WARNING resource_manager.py:134 -- ⚠️  Ray's object store is configured to use only 25.2% of available memory (2.0GiB out of 7.9GiB total). For optimal Ray Data performance, we recommend setting the object store to at least 50% of available memory. You can do this by setting the 'object_store_memory' parameter when calling ray.init() or by setting the RAY_DEFAULT_OBJECT_STORE_MEMORY_PROPORTION environment variable.
2026-01-11 00:26:56,246 INFO streaming_executor.py:661 -- [dataset]: A new progress UI is available. To enable, set `ray.data.DataContext.get_current().enable_rich_progress_bars = True` and `ray.data.DataContext.get_current().use_ray_tqdm = False`.
Running Dataset dataset_0_0.: 0.00 row [00:00, ? row/s]
2026-01-11 00:26:56,267 WARNING resource_manager.py:791 -- Cluster resources are not enough to run any task from TaskPoolMapOperator[ReadSQL]. The job may hang forever unless the cluster scales up.
✔️  Dataset dataset_0_0 execution finished in 0.46 seconds: 100%|█████████████████████████████████████████████████████████████████| 2.00/2.00 [00:00<00:00, 4.48 row/s]
- ReadSQL->SplitBlocks(200): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 99.0B object store: 100%|██████████| 2.00/2.00 [00:00<00:00, 4.48 row/s]
2026-01-11 00:26:56,702 INFO streaming_executor.py:302 -- ✔️  Dataset dataset_0_0 execution finished in 0.46 seconds                                                   
tuple: [{'title': 'Monty Python and the Holy Grail', 'year': 1975, 'score': 8.2}, {'title': "Monty Python's Life of Brian", 'year': 1979, 'score': 8.0}]
>>> # list 
>>> ds_list = ray.data.read_sql(
...     "SELECT * FROM movie WHERE year >= ?",
...     create_connection,
...     sql_params=[1975],
... )
2026-01-11 00:27:07,304 INFO sql_datasource.py:153 -- Sharding is not supported. Falling back to reading all data in a single task.
>>> print("list:", ds_list.take_all())
2026-01-11 00:27:08,867 INFO logging.py:397 -- Registered dataset logger for dataset dataset_1_0
2026-01-11 00:27:08,871 INFO sql_datasource.py:153 -- Sharding is not supported. Falling back to reading all data in a single task.
2026-01-11 00:27:08,872 INFO sql_datasource.py:153 -- Sharding is not supported. Falling back to reading all data in a single task.
2026-01-11 00:27:08,873 INFO streaming_executor.py:182 -- Starting execution of Dataset dataset_1_0. Full logs are in /tmp/ray/session_2026-01-11_00-26-50_843083_56953/logs/ray-data
2026-01-11 00:27:08,873 INFO streaming_executor.py:183 -- Execution plan of Dataset dataset_1_0: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadSQL]
2026-01-11 00:27:08,874 INFO sql_datasource.py:153 -- Sharding is not supported. Falling back to reading all data in a single task.
Running Dataset dataset_1_0.: 0.00 row [00:00, ? row/s]
2026-01-11 00:27:08,881 WARNING resource_manager.py:791 -- Cluster resources are not enough to run any task from TaskPoolMapOperator[ReadSQL]. The job may hang forever unless the cluster scales up.
✔️  Dataset dataset_1_0 execution finished in 0.06 seconds: : 2.00 row [00:00, 38.9 row/s]
- ReadSQL->SplitBlocks(200): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 0.0B object store: 100%|███████████| 2.00/2.00 [00:00<00:00, 37.6 row/s]
2026-01-11 00:27:08,932 INFO streaming_executor.py:302 -- ✔️  Dataset dataset_1_0 execution finished in 0.06 seconds                                                   
list: [{'title': 'Monty Python and the Holy Grail', 'year': 1975, 'score': 8.2}, {'title': "Monty Python's Life of Brian", 'year': 1979, 'score': 8.0}]
>>> # dict 
>>> ds_dict = ray.data.read_sql(
...     "SELECT * FROM movie WHERE year >= :year",
...     create_connection,
...     sql_params={"year": 1975},
... )
2026-01-11 00:27:19,155 INFO sql_datasource.py:153 -- Sharding is not supported. Falling back to reading all data in a single task.
>>> print("dict:", ds_dict.take_all())
2026-01-11 00:27:19,807 INFO logging.py:397 -- Registered dataset logger for dataset dataset_2_0
2026-01-11 00:27:19,811 INFO sql_datasource.py:153 -- Sharding is not supported. Falling back to reading all data in a single task.
2026-01-11 00:27:19,812 INFO sql_datasource.py:153 -- Sharding is not supported. Falling back to reading all data in a single task.
2026-01-11 00:27:19,813 INFO streaming_executor.py:182 -- Starting execution of Dataset dataset_2_0. Full logs are in /tmp/ray/session_2026-01-11_00-26-50_843083_56953/logs/ray-data
2026-01-11 00:27:19,813 INFO streaming_executor.py:183 -- Execution plan of Dataset dataset_2_0: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadSQL]
2026-01-11 00:27:19,814 INFO sql_datasource.py:153 -- Sharding is not supported. Falling back to reading all data in a single task.
Running Dataset dataset_2_0.: 0.00 row [00:00, ? row/s]
2026-01-11 00:27:19,821 WARNING resource_manager.py:791 -- Cluster resources are not enough to run any task from TaskPoolMapOperator[ReadSQL]. The job may hang forever unless the cluster scales up.
✔️  Dataset dataset_2_0 execution finished in 0.04 seconds: 100%|█████████████████████████████████████████████████████████████████| 2.00/2.00 [00:00<00:00, 51.6 row/s]
- ReadSQL->SplitBlocks(200): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 99.0B object store: 100%|██████████| 2.00/2.00 [00:00<00:00, 49.0 row/s]
2026-01-11 00:27:19,859 INFO streaming_executor.py:302 -- ✔️  Dataset dataset_2_0 execution finished in 0.04 seconds                                                   
dict: [{'title': 'Monty Python and the Holy Grail', 'year': 1975, 'score': 8.2}, {'title': "Monty Python's Life of Brian", 'year': 1979, 'score': 8.0}]
>>> 
>>> 

Signed-off-by: yaommen <myanstu@163.com>
@myandpr myandpr requested a review from a team as a code owner January 10, 2026 16:56
Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces support for sql_params in ray.data.read_sql, which is a significant improvement for writing safe, parameterized SQL queries. The changes are well-implemented, propagating the parameters through all necessary SQL execution paths. The addition of a test case is good, but it could be expanded to cover more parameter types as supported by the DB-API 2 specification.
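The broader parameter-type coverage suggested here could look like the following sketch. It uses plain sqlite3 rather than Ray's actual test harness, and all names are illustrative; it exercises the three DB-API 2 binding shapes shown in the PR's quick check (tuple, list, and dict):

```python
import sqlite3

# Illustrative coverage of the three parameter shapes against one seeded row.
def check_param_styles():
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE movie(title, year, score)")
    conn.execute("INSERT INTO movie VALUES ('Holy Grail', 1975, 8.2)")
    cases = [
        ("SELECT * FROM movie WHERE year >= ?", (1975,)),             # tuple
        ("SELECT * FROM movie WHERE year >= ?", [1975]),              # list
        ("SELECT * FROM movie WHERE year >= :year", {"year": 1975}),  # dict
    ]
    # Each style binds the same value and should match the single row.
    return [len(conn.execute(sql, params).fetchall()) for sql, params in cases]

results = check_param_styles()
```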

@ray-gardener ray-gardener bot added the docs, data, and community-contribution labels on Jan 10, 2026
Signed-off-by: yaommen <myanstu@163.com>
@owenowenisme owenowenisme added the go (add ONLY when ready to merge, run all tests) label on Jan 11, 2026
Member

@owenowenisme owenowenisme left a comment


Nice!

Member

@bveeramani bveeramani left a comment


LGTM. ty

@bveeramani bveeramani merged commit 67bfeef into ray-project:master Jan 12, 2026
7 checks passed
AYou0207 pushed a commit to AYou0207/ray that referenced this pull request Jan 13, 2026
ryanaoleary pushed a commit to ryanaoleary/ray that referenced this pull request Feb 3, 2026
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
Running Dataset dataset_0_0.: 0.00 row [00:00, ? row/s]
2026-01-11 00:26:56,267 WARNING resource_manager.py:791 -- Cluster resources are not enough to run any task from TaskPoolMapOperator[ReadSQL]. The job may hang forever unless the cluster scales up.
✔️  Dataset dataset_0_0 execution finished in 0.46 seconds: 100%|█████████████████████████████████████████████████████████████████| 2.00/2.00 [00:00<00:00, 4.48 row/s]
- ReadSQL->SplitBlocks(200): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 99.0B object store: 100%|██████████| 2.00/2.00 [00:00<00:00, 4.48 row/s]
2026-01-11 00:26:56,702 INFO streaming_executor.py:302 -- ✔️  Dataset dataset_0_0 execution finished in 0.46 seconds
tuple: [{'title': 'Monty Python and the Holy Grail', 'year': 1975, 'score': 8.2}, {'title': "Monty Python's Life of Brian", 'year': 1979, 'score': 8.0}]
>>> # list
>>> ds_list = ray.data.read_sql(
...     "SELECT * FROM movie WHERE year >= ?",
...     create_connection,
...     sql_params=[1975],
... )
2026-01-11 00:27:07,304 INFO sql_datasource.py:153 -- Sharding is not supported. Falling back to reading all data in a single task.
>>> print("list:", ds_list.take_all())
2026-01-11 00:27:08,867 INFO logging.py:397 -- Registered dataset logger for dataset dataset_1_0
2026-01-11 00:27:08,871 INFO sql_datasource.py:153 -- Sharding is not supported. Falling back to reading all data in a single task.
2026-01-11 00:27:08,872 INFO sql_datasource.py:153 -- Sharding is not supported. Falling back to reading all data in a single task.
2026-01-11 00:27:08,873 INFO streaming_executor.py:182 -- Starting execution of Dataset dataset_1_0. Full logs are in /tmp/ray/session_2026-01-11_00-26-50_843083_56953/logs/ray-data
2026-01-11 00:27:08,873 INFO streaming_executor.py:183 -- Execution plan of Dataset dataset_1_0: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadSQL]
2026-01-11 00:27:08,874 INFO sql_datasource.py:153 -- Sharding is not supported. Falling back to reading all data in a single task.
Running Dataset dataset_1_0.: 0.00 row [00:00, ? row/s]
2026-01-11 00:27:08,881 WARNING resource_manager.py:791 -- Cluster resources are not enough to run any task from TaskPoolMapOperator[ReadSQL]. The job may hang forever unless the cluster scales up.
✔️  Dataset dataset_1_0 execution finished in 0.06 seconds: : 2.00 row [00:00, 38.9 row/s]
- ReadSQL->SplitBlocks(200): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 0.0B object store: 100%|███████████| 2.00/2.00 [00:00<00:00, 37.6 row/s]
2026-01-11 00:27:08,932 INFO streaming_executor.py:302 -- ✔️  Dataset dataset_1_0 execution finished in 0.06 seconds
list: [{'title': 'Monty Python and the Holy Grail', 'year': 1975, 'score': 8.2}, {'title': "Monty Python's Life of Brian", 'year': 1979, 'score': 8.0}]
>>> # dict
>>> ds_dict = ray.data.read_sql(
...     "SELECT * FROM movie WHERE year >= :year",
...     create_connection,
...     sql_params={"year": 1975},
... )
2026-01-11 00:27:19,155 INFO sql_datasource.py:153 -- Sharding is not supported. Falling back to reading all data in a single task.
>>> print("dict:", ds_dict.take_all())
2026-01-11 00:27:19,807 INFO logging.py:397 -- Registered dataset logger for dataset dataset_2_0
2026-01-11 00:27:19,811 INFO sql_datasource.py:153 -- Sharding is not supported. Falling back to reading all data in a single task.
2026-01-11 00:27:19,812 INFO sql_datasource.py:153 -- Sharding is not supported. Falling back to reading all data in a single task.
2026-01-11 00:27:19,813 INFO streaming_executor.py:182 -- Starting execution of Dataset dataset_2_0. Full logs are in /tmp/ray/session_2026-01-11_00-26-50_843083_56953/logs/ray-data
2026-01-11 00:27:19,813 INFO streaming_executor.py:183 -- Execution plan of Dataset dataset_2_0: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadSQL]
2026-01-11 00:27:19,814 INFO sql_datasource.py:153 -- Sharding is not supported. Falling back to reading all data in a single task.
Running Dataset dataset_2_0.: 0.00 row [00:00, ? row/s]
2026-01-11 00:27:19,821 WARNING resource_manager.py:791 -- Cluster resources are not enough to run any task from TaskPoolMapOperator[ReadSQL]. The job may hang forever unless the cluster scales up.
✔️  Dataset dataset_2_0 execution finished in 0.04 seconds: 100%|█████████████████████████████████████████████████████████████████| 2.00/2.00 [00:00<00:00, 51.6 row/s]
- ReadSQL->SplitBlocks(200): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 99.0B object store: 100%|██████████| 2.00/2.00 [00:00<00:00, 49.0 row/s]
2026-01-11 00:27:19,859 INFO streaming_executor.py:302 -- ✔️  Dataset dataset_2_0 execution finished in 0.04 seconds
dict: [{'title': 'Monty Python and the Holy Grail', 'year': 1975, 'score': 8.2}, {'title': "Monty Python's Life of Brian", 'year': 1979, 'score': 8.0}]
>>>
>>>
```
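
As a Ray-free distillation of what the transcript exercises, the snippet below checks the two sqlite paramstyles side by side (qmark with a tuple, named with a dict) using plain `sqlite3` DB‑API calls; it is not the actual test code.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE movie(title TEXT, year INTEGER, score REAL);
    INSERT INTO movie VALUES ('Monty Python and the Holy Grail', 1975, 8.2);
    INSERT INTO movie VALUES ('And Now for Something Completely Different', 1971, 7.5);
    """
)

# qmark style: positional bindings passed as a tuple.
qmark = conn.execute(
    "SELECT title FROM movie WHERE year >= ?", (1975,)
).fetchall()

# named style: bindings passed as a dict; the driver resolves :year itself,
# which is why Ray can forward sql_params without parsing placeholders.
named = conn.execute(
    "SELECT title FROM movie WHERE year >= :year", {"year": 1975}
).fetchall()

assert qmark == named == [("Monty Python and the Holy Grail",)]
print("ok")
```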

---------

Signed-off-by: yaommen <myanstu@163.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026