Skip to content

Commit 3840fae

Browse files
[v3-2-test] Fix ObjectStoragePath NoCredentialsError when using conn_id with remote stores (#64634) (#64646)
* Fix ObjectStoragePath credential resolution by injecting authenticated fs into __wrapped__._fs_cached (cherry picked from commit f391942) Co-authored-by: Rahul Vats <43964496+vatsrahul1001@users.noreply.github.com>
1 parent 959ebd8 commit 3840fae

2 files changed

Lines changed: 147 additions & 0 deletions

File tree

task-sdk/src/airflow/sdk/io/path.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
from __future__ import annotations
1919

20+
import logging
2021
import shutil
2122
from typing import TYPE_CHECKING, Any, ClassVar
2223
from urllib.parse import urlsplit
@@ -33,6 +34,8 @@
3334
from typing_extensions import Self
3435
from upath.types import JoinablePathLike
3536

37+
# Module-level logger; _inject_authenticated_fs uses it to record, at DEBUG level,
# failures to pre-populate the authenticated filesystem on a wrapped path.
log = logging.getLogger(__name__)
38+
3639

3740
class _TrackingFileWrapper:
3841
"""Wrapper that tracks file operations to intercept lineage."""
@@ -116,6 +119,13 @@ def __init__(
116119
# to the underlying fsspec filesystem, which doesn't understand it
117120
self._conn_id = storage_options.pop("conn_id", None)
118121
super().__init__(*args, protocol=protocol, **storage_options)
122+
# ProxyUPath delegates all operations to self.__wrapped__, which was
123+
# constructed with empty storage_options (conn_id stripped above).
124+
# Pre-populating __wrapped__._fs_cached with the Airflow-authenticated
125+
# filesystem fixes every delegated method (exists, mkdir, iterdir, glob,
126+
# walk, rename, read_bytes, write_bytes, …) in one place rather than
127+
# requiring individual overrides for each one.
128+
self._inject_authenticated_fs(self.__wrapped__)
119129

120130
@classmethod_or_method # type: ignore[arg-type]
121131
def _from_upath(cls_or_self, upath, /):
@@ -127,8 +137,36 @@ def _from_upath(cls_or_self, upath, /):
127137
obj = object.__new__(cls)
128138
obj.__wrapped__ = upath
129139
obj._conn_id = getattr(cls_or_self, "_conn_id", None) if is_instance else None
140+
# If the wrapped UPath has not yet had its fs cached (e.g. when _from_upath is
141+
# called on an instance that has a conn_id, with a fresh UPath), inject the authenticated fs now. (As a classmethod, _conn_id is None, so the injection is a no-op.)
142+
# Child UPaths produced by __wrapped__ operations (iterdir, glob, etc.) already
143+
# inherit _fs_cached from the parent UPath, so the hasattr check is a no-op for them.
144+
if not hasattr(upath, "_fs_cached"):
145+
obj._inject_authenticated_fs(upath)
130146
return obj
131147

148+
def _inject_authenticated_fs(self, wrapped: UPath) -> None:
    """
    Pre-populate ``wrapped._fs_cached`` with the filesystem attached for this path's connection.

    The filesystem is looked up with ``attach(protocol, conn_id)``, where the protocol
    is ``wrapped.protocol`` (falling back to ``"file"`` when that is empty) and the
    connection ID is ``self._conn_id``. Storing it on the wrapped path lets operations
    delegated to the wrapped UPath use the connection-aware filesystem instead of one
    built from empty storage_options.

    Nothing happens when no connection ID is set. Any exception raised during the
    lookup or assignment is caught and logged at DEBUG level (with traceback), so
    construction of the path never fails here; problems are left to surface when the
    path is first used.

    :param wrapped: the UPath whose ``_fs_cached`` attribute should be populated.
    """
    conn_id = self._conn_id
    if conn_id is not None:
        try:
            store = attach(wrapped.protocol or "file", conn_id)
            wrapped._fs_cached = store.fs
        except Exception:
            # Deliberately best-effort: record the failure and carry on.
            log.debug(
                "Could not pre-populate authenticated filesystem for %r (conn_id=%r); "
                "operations will attempt lazy resolution at first use.",
                self,
                conn_id,
                exc_info=True,
            )
169+
132170
@property
133171
def conn_id(self) -> str | None:
134172
"""Return the connection ID for this path."""

task-sdk/tests/task_sdk/io/test_path.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import pytest
2727
from fsspec.implementations.local import LocalFileSystem
2828
from fsspec.implementations.memory import MemoryFileSystem
29+
from upath import UPath
2930

3031
from airflow.sdk import Asset, ObjectStoragePath
3132
from airflow.sdk._shared.module_loading import qualname
@@ -228,6 +229,114 @@ def test_standard_extended_api(self, fake_files, fn, args, fn2, path, expected_a
228229
method.assert_called_once_with(expected_args, **expected_kwargs)
229230

230231

232+
class TestConnIdCredentialResolution:
    """
    Regression tests for https://github.com/apache/airflow/issues/64632

    After ObjectStoragePath moved from CloudPath to ProxyUPath (3.2.0), methods such
    as exists(), mkdir(), is_dir() and is_file() were delegated to self.__wrapped__,
    which carries empty storage_options because conn_id is stored separately. Remote
    stores then failed with NoCredentialsError / 401 errors even though a valid
    conn_id had been supplied.

    Each test attaches a fake remote filesystem for conn_id "my_conn" and checks that
    the path operation is served by that exact filesystem instance.
    """

    @pytest.fixture(autouse=True)
    def restore_cache(self):
        """Snapshot _STORE_CACHE before each test and put the snapshot back afterwards."""
        saved = _STORE_CACHE.copy()
        yield
        _STORE_CACHE.clear()
        _STORE_CACHE.update(saved)

    @pytest.fixture
    def fake_fs_with_conn(self):
        """Attach a fake remote filesystem for protocol "ffs2" / conn_id "my_conn" and yield it."""
        remote_fs = _FakeRemoteFileSystem(conn_id="my_conn")
        attach(protocol="ffs2", conn_id="my_conn", fs=remote_fs)
        try:
            yield remote_fs
        finally:
            # Reset the class-level state of the fake filesystem between tests.
            _FakeRemoteFileSystem.store.clear()
            _FakeRemoteFileSystem.pseudo_dirs[:] = [""]

    def test_exists_uses_authenticated_fs(self, fake_fs_with_conn):
        """exists() must be answered by the filesystem attached for the conn_id."""
        present = ObjectStoragePath("ffs2://my_conn@bucket/some_file.txt", conn_id="my_conn")
        # The very same fs instance must have been injected, not just any _FakeRemoteFileSystem.
        assert present.__wrapped__._fs_cached is fake_fs_with_conn
        fake_fs_with_conn.touch("bucket/some_file.txt")

        assert present.exists() is True
        absent = ObjectStoragePath("ffs2://my_conn@bucket/no_such_file.txt", conn_id="my_conn")
        assert absent.exists() is False

    def test_mkdir_uses_authenticated_fs(self, fake_fs_with_conn):
        """mkdir() must create the directory on the filesystem attached for the conn_id."""
        target = ObjectStoragePath("ffs2://my_conn@bucket/new_dir/", conn_id="my_conn")
        target.mkdir(parents=True, exist_ok=True)
        assert fake_fs_with_conn.isdir("bucket/new_dir")

    def test_is_dir_uses_authenticated_fs(self, fake_fs_with_conn):
        """is_dir() must see a directory created on the attached filesystem."""
        fake_fs_with_conn.mkdir("bucket/a_dir")
        target = ObjectStoragePath("ffs2://my_conn@bucket/a_dir", conn_id="my_conn")
        assert target.is_dir() is True

    def test_is_file_uses_authenticated_fs(self, fake_fs_with_conn):
        """is_file() must see a file created on the attached filesystem."""
        fake_fs_with_conn.touch("bucket/a_file.txt")
        target = ObjectStoragePath("ffs2://my_conn@bucket/a_file.txt", conn_id="my_conn")
        assert target.is_file() is True

    def test_touch_uses_authenticated_fs(self, fake_fs_with_conn):
        """touch() must create the file on the attached filesystem."""
        target = ObjectStoragePath("ffs2://my_conn@bucket/touched_file.txt", conn_id="my_conn")
        target.touch()
        assert fake_fs_with_conn.exists("bucket/touched_file.txt")

    def test_unlink_uses_authenticated_fs(self, fake_fs_with_conn):
        """unlink() must remove the file from the attached filesystem."""
        fake_fs_with_conn.touch("bucket/to_delete.txt")
        target = ObjectStoragePath("ffs2://my_conn@bucket/to_delete.txt", conn_id="my_conn")
        target.unlink()
        assert not fake_fs_with_conn.exists("bucket/to_delete.txt")

    def test_rmdir_uses_authenticated_fs(self, fake_fs_with_conn):
        """rmdir() must remove the directory from the attached filesystem."""
        fake_fs_with_conn.mkdir("bucket/empty_dir")
        target = ObjectStoragePath("ffs2://my_conn@bucket/empty_dir", conn_id="my_conn")
        # upath's rmdir(recursive=False) calls next(self.iterdir()) without a default,
        # which raises StopIteration on empty dirs (a upath bug), so rely on the
        # default (recursive=True) here.
        target.rmdir()
        assert not fake_fs_with_conn.exists("bucket/empty_dir")

    def test_conn_id_in_uri_works_for_exists(self, fake_fs_with_conn):
        """A conn_id embedded in the URI (user@host form) must work for exists() as well."""
        fake_fs_with_conn.touch("bucket/target.txt")
        target = ObjectStoragePath("ffs2://my_conn@bucket/target.txt")
        assert target.conn_id == "my_conn"
        assert target.exists() is True

    def test_from_upath_injects_fs_when_no_cache(self, fake_fs_with_conn):
        """_from_upath must inject the authenticated fs into a UPath that has no _fs_cached."""
        # Call _from_upath on an instance, handing it a fresh UPath on which
        # _fs_cached has not been set (e.g. cwd() / home() or a cross-protocol
        # _from_upath call).
        parent = ObjectStoragePath("ffs2://my_conn@bucket/root", conn_id="my_conn")
        fresh_upath = UPath("ffs2://bucket/other")
        assert not hasattr(fresh_upath, "_fs_cached")
        derived = parent._from_upath(fresh_upath)
        assert derived.__wrapped__._fs_cached is fake_fs_with_conn

    def test_iterdir_children_use_authenticated_fs(self, fake_fs_with_conn):
        """Paths yielded by iterdir() must carry the same authenticated filesystem."""
        for name in ("bucket/dir/file1.txt", "bucket/dir/file2.txt"):
            fake_fs_with_conn.touch(name)
        directory = ObjectStoragePath("ffs2://my_conn@bucket/dir", conn_id="my_conn")
        children = list(directory.iterdir())
        assert len(children) == 2
        # Every child must reuse the attached fs rather than a fresh unauthenticated one.
        for child in children:
            assert child.__wrapped__._fs_cached is fake_fs_with_conn
338+
339+
231340
class TestRemotePath:
232341
def test_bucket_key_protocol(self):
233342
bucket = "bkt"

0 commit comments

Comments
 (0)