From a0c3d3d5a6ea2c762748ee08e1f81f8ae30a786e Mon Sep 17 00:00:00 2001
From: dangotbanned <125183946+dangotbanned@users.noreply.github.com>
Date: Wed, 20 Aug 2025 12:01:11 +0000
Subject: [PATCH 1/4] fix: Handle `pa.RecordBatchReader` in `SparkLikeLazyFrame.collect`

Related
- https://github.com/duckdb/duckdb/pull/18642
- https://github.com/narwhals-dev/narwhals/actions/runs/17096107509/job/48483947375
- https://discord.com/channels/1235257048170762310/1272835922924273694/1407676349925036052
---
 narwhals/_spark_like/dataframe.py | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/narwhals/_spark_like/dataframe.py b/narwhals/_spark_like/dataframe.py
index 79c409100f..b70d10a50f 100644
--- a/narwhals/_spark_like/dataframe.py
+++ b/narwhals/_spark_like/dataframe.py
@@ -164,9 +164,9 @@ def _to_arrow_schema(self) -> pa.Schema:  # pragma: no cover
         return pa.schema(schema)
 
     def _collect_to_arrow(self) -> pa.Table:
-        if self._implementation.is_pyspark() and self._backend_version < (4,):
-            import pyarrow as pa  # ignore-banned-import
+        import pyarrow as pa  # ignore-banned-import
 
+        if self._implementation.is_pyspark() and self._backend_version < (4,):
             try:
                 return pa.Table.from_batches(self.native._collect_as_arrow())
             except ValueError as exc:
@@ -178,12 +178,17 @@ def _collect_to_arrow(self) -> pa.Table:
                 return pa.Table.from_pydict(data, schema=pa_schema)
             raise  # pragma: no cover
         elif self._implementation.is_pyspark_connect() and self._backend_version < (4,):
-            import pyarrow as pa  # ignore-banned-import
-
             pa_schema = self._to_arrow_schema()
             return pa.Table.from_pandas(self.native.toPandas(), schema=pa_schema)
         else:
-            return self.native.toArrow()
+            # NOTE: Returns `pa.RecordBatchReader` since https://github.com/duckdb/duckdb/pull/18642
+            to_arrow: Incomplete = self.native.toArrow
+            pa_native: pa.Table | pa.RecordBatchReader = to_arrow()
+            return (
+                pa_native
+                if isinstance(pa_native, pa.Table)
+                else pa.Table.from_batches(pa_native)
+            )
 
     def _iter_columns(self) -> Iterator[Column]:
         for col in self.columns:

From e6b21ca6e5f24f7db343ebf65b68497a33e850d6 Mon Sep 17 00:00:00 2001
From: dangotbanned <125183946+dangotbanned@users.noreply.github.com>
Date: Wed, 20 Aug 2025 12:21:29 +0000
Subject: [PATCH 2/4] fix: handle empty batches for `sqlframe`

Fixes 2 tests

https://github.com/narwhals-dev/narwhals/actions/runs/17097741666/job/48486468595?pr=3015
---
 narwhals/_spark_like/dataframe.py | 44 ++++++++++++++++---------------
 1 file changed, 23 insertions(+), 21 deletions(-)

diff --git a/narwhals/_spark_like/dataframe.py b/narwhals/_spark_like/dataframe.py
index b70d10a50f..b1dea526c8 100644
--- a/narwhals/_spark_like/dataframe.py
+++ b/narwhals/_spark_like/dataframe.py
@@ -27,7 +27,7 @@
 from narwhals.exceptions import InvalidOperationError
 
 if TYPE_CHECKING:
-    from collections.abc import Iterator, Mapping, Sequence
+    from collections.abc import Iterable, Iterator, Mapping, Sequence
     from io import BytesIO
     from pathlib import Path
     from types import ModuleType
@@ -163,32 +163,34 @@ def _to_arrow_schema(self) -> pa.Schema:  # pragma: no cover
             schema.append((key, native_dtype))
         return pa.schema(schema)
 
+    def _to_arrow_from_batches(self, batches: Iterable[pa.RecordBatch]) -> pa.Table:
+        import pyarrow as pa  # ignore-banned-import
+
+        try:
+            return pa.Table.from_batches(batches)
+        except ValueError as exc:
+            if "at least one RecordBatch" in str(exc):
+                # Empty dataframe
+                data: dict[str, list[Any]] = {k: [] for k in self.columns}
+                return pa.Table.from_pydict(data, schema=self._to_arrow_schema())
+            raise  # pragma: no cover
+
     def _collect_to_arrow(self) -> pa.Table:
         import pyarrow as pa  # ignore-banned-import
 
         if self._implementation.is_pyspark() and self._backend_version < (4,):
-            try:
-                return pa.Table.from_batches(self.native._collect_as_arrow())
-            except ValueError as exc:
-                if "at least one RecordBatch" in str(exc):
-                    # Empty dataframe
-
-                    data: dict[str, list[Any]] = {k: [] for k in self.columns}
-                    pa_schema = self._to_arrow_schema()
-                    return pa.Table.from_pydict(data, schema=pa_schema)
-                raise  # pragma: no cover
-        elif self._implementation.is_pyspark_connect() and self._backend_version < (4,):
+            return self._to_arrow_from_batches(self.native._collect_as_arrow())
+        if self._implementation.is_pyspark_connect() and self._backend_version < (4,):
             pa_schema = self._to_arrow_schema()
             return pa.Table.from_pandas(self.native.toPandas(), schema=pa_schema)
-        else:
-            # NOTE: Returns `pa.RecordBatchReader` since https://github.com/duckdb/duckdb/pull/18642
-            to_arrow: Incomplete = self.native.toArrow
-            pa_native: pa.Table | pa.RecordBatchReader = to_arrow()
-            return (
-                pa_native
-                if isinstance(pa_native, pa.Table)
-                else pa.Table.from_batches(pa_native)
-            )
+        # NOTE: Returns `pa.RecordBatchReader` since https://github.com/duckdb/duckdb/pull/18642
+        to_arrow: Incomplete = self.native.toArrow
+        pa_native: pa.Table | pa.RecordBatchReader = to_arrow()
+        return (
+            pa_native
+            if isinstance(pa_native, pa.Table)
+            else self._to_arrow_from_batches(pa_native)
+        )
 
     def _iter_columns(self) -> Iterator[Column]:
         for col in self.columns:
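NOTE (not part of the patch series): the two fixes above boil down to normalising
whatever the native `toArrow()` call returns, either a `pa.Table` or, since
https://github.com/duckdb/duckdb/pull/18642, a `pa.RecordBatchReader`, and falling
back to an explicit schema when there are no record batches at all. A minimal,
self-contained sketch of that normalisation; the helper name `to_arrow_table` and
the example schema are made up for illustration and do not appear in the patches:

    from __future__ import annotations

    import pyarrow as pa


    def to_arrow_table(
        result: pa.Table | pa.RecordBatchReader, schema: pa.Schema
    ) -> pa.Table:
        """Normalise `toArrow()`-style output into a `pa.Table`."""
        if isinstance(result, pa.Table):
            return result
        try:
            # `from_batches` accepts any iterable of RecordBatch, including a reader.
            return pa.Table.from_batches(result)
        except ValueError as exc:
            if "at least one RecordBatch" in str(exc):
                # Empty result: build a 0-row table from the known schema instead.
                return pa.Table.from_pydict(
                    {name: [] for name in schema.names}, schema=schema
                )
            raise


    schema = pa.schema([("a", pa.int64())])
    empty_reader = pa.RecordBatchReader.from_batches(schema, [])
    print(to_arrow_table(empty_reader, schema).num_rows)  # 0
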
From 6482cee0fa23f870668561a334d8136d62f9c07e Mon Sep 17 00:00:00 2001
From: dangotbanned <125183946+dangotbanned@users.noreply.github.com>
Date: Wed, 20 Aug 2025 16:40:57 +0000
Subject: [PATCH 3/4] ci: try removing `duckdb` nightly

https://github.com/narwhals-dev/narwhals/pull/3015#issuecomment-3207149863
---
 .github/workflows/pytest.yml | 2 --
 1 file changed, 2 deletions(-)

diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml
index 8b73a0e548..76ffb0c68a 100644
--- a/.github/workflows/pytest.yml
+++ b/.github/workflows/pytest.yml
@@ -84,8 +84,6 @@ jobs:
           cache-dependency-glob: "pyproject.toml"
       - name: install-reqs
         run: uv pip install -e ".[dask, modin, ibis]" --group core-tests --group extra --system
-      - name: install duckdb nightly
-        run: uv pip install -U --pre duckdb --system
       - name: show-deps
         run: uv pip freeze
      - name: Run pytest
From 12234b7312ba2af9668382a08c5490b68d46f42e Mon Sep 17 00:00:00 2001
From: dangotbanned <125183946+dangotbanned@users.noreply.github.com>
Date: Wed, 20 Aug 2025 16:48:05 +0000
Subject: [PATCH 4/4] revert: (a0c3d3d5a6ea2c762748ee08e1f81f8ae30a786e), (e6b21ca6e5f24f7db343ebf65b68497a33e850d6)

farewell
---
 narwhals/_spark_like/dataframe.py | 43 +++++++++++++------------------
 1 file changed, 18 insertions(+), 25 deletions(-)

diff --git a/narwhals/_spark_like/dataframe.py b/narwhals/_spark_like/dataframe.py
index b1dea526c8..79c409100f 100644
--- a/narwhals/_spark_like/dataframe.py
+++ b/narwhals/_spark_like/dataframe.py
@@ -27,7 +27,7 @@
 from narwhals.exceptions import InvalidOperationError
 
 if TYPE_CHECKING:
-    from collections.abc import Iterable, Iterator, Mapping, Sequence
+    from collections.abc import Iterator, Mapping, Sequence
     from io import BytesIO
     from pathlib import Path
     from types import ModuleType
@@ -163,34 +163,27 @@ def _to_arrow_schema(self) -> pa.Schema:  # pragma: no cover
             schema.append((key, native_dtype))
         return pa.schema(schema)
 
-    def _to_arrow_from_batches(self, batches: Iterable[pa.RecordBatch]) -> pa.Table:
-        import pyarrow as pa  # ignore-banned-import
-
-        try:
-            return pa.Table.from_batches(batches)
-        except ValueError as exc:
-            if "at least one RecordBatch" in str(exc):
-                # Empty dataframe
-                data: dict[str, list[Any]] = {k: [] for k in self.columns}
-                return pa.Table.from_pydict(data, schema=self._to_arrow_schema())
-            raise  # pragma: no cover
-
     def _collect_to_arrow(self) -> pa.Table:
-        import pyarrow as pa  # ignore-banned-import
-
         if self._implementation.is_pyspark() and self._backend_version < (4,):
-            return self._to_arrow_from_batches(self.native._collect_as_arrow())
-        if self._implementation.is_pyspark_connect() and self._backend_version < (4,):
+            import pyarrow as pa  # ignore-banned-import
+
+            try:
+                return pa.Table.from_batches(self.native._collect_as_arrow())
+            except ValueError as exc:
+                if "at least one RecordBatch" in str(exc):
+                    # Empty dataframe
+
+                    data: dict[str, list[Any]] = {k: [] for k in self.columns}
+                    pa_schema = self._to_arrow_schema()
+                    return pa.Table.from_pydict(data, schema=pa_schema)
+                raise  # pragma: no cover
+        elif self._implementation.is_pyspark_connect() and self._backend_version < (4,):
+            import pyarrow as pa  # ignore-banned-import
+
             pa_schema = self._to_arrow_schema()
             return pa.Table.from_pandas(self.native.toPandas(), schema=pa_schema)
-        # NOTE: Returns `pa.RecordBatchReader` since https://github.com/duckdb/duckdb/pull/18642
-        to_arrow: Incomplete = self.native.toArrow
-        pa_native: pa.Table | pa.RecordBatchReader = to_arrow()
-        return (
-            pa_native
-            if isinstance(pa_native, pa.Table)
-            else self._to_arrow_from_batches(pa_native)
-        )
+        else:
+            return self.native.toArrow()
 
     def _iter_columns(self) -> Iterator[Column]:
         for col in self.columns: