[Data] Fix silent credential drop for fsspec-S3 in download expression#62897
Conversation
There was a problem hiding this comment.
Code Review
This pull request enhances the obstore download path in Ray Data by implementing a robust credential extraction and routing mechanism. It introduces _plan_obstore_routing to decide between obstore and threaded PyArrow downloads, specifically handling session-backed s3fs configurations (e.g., Okta, STS) where credentials are resolved lazily. The changes ensure that if credentials cannot be extracted, the system falls back to the authoritative user filesystem rather than silently using default credentials. Review feedback suggests using isawaitable instead of iscoroutine to better handle various asynchronous return types from fsspec implementations.
When a user passes an fsspec ``S3FileSystem`` with Okta / STS / profile-
based auth (wrapped as ``PyFileSystem(FSSpecHandler(s3fs_fs))``), the
credential extraction path only read static attrs (``key`` / ``secret``
/ ``token``, ``storage_options``). Those attrs are ``None`` when
credentials are resolved lazily via ``fs.session.get_credentials()``,
so extraction returned ``{}`` and obstore silently fell back to its
own credential chain (IMDS / env). Users never saw a warning.
Under concurrent S3 workloads on EC2 this manifested as intermittent
``NoCredentialsError`` — IMDS rate-limited per instance, the user's
Okta credentials had been silently dropped.
Changes:
- ``_extract_credentials_from_filesystem`` now returns ``Optional[Dict]``.
``None`` signals "route to threaded" so the user's filesystem stays
authoritative instead of being silently replaced by obstore's chain.
Unrecognized non-``None`` filesystems also return ``None``.
- New ``_frozen_s3fs_credentials`` helper snapshots credentials via
``session.get_credentials().get_frozen_credentials()``, handling both
sync botocore sessions and async aiobotocore sessions.
- New ``_plan_obstore_routing(fs) -> (use_obstore, fs_kwargs)``
centralizes the dispatch decision at plan time and emits a one-shot
WARNING per filesystem when credentials can't be extracted.
- ``plan_download_op`` wires the routing helper so both the partition
actor choice and the download function choice use the same gate.
- ``download_bytes_async`` extracts credentials once in sync context and
threads the kwargs through to ``_download_uris_with_obstore`` — fixes a
correctness issue where aiobotocore's async ``get_credentials`` can't
run from inside an already-running event loop.
- ``AsyncPartitionActor`` and ``_download_uris_with_obstore`` both fail
closed with a clear ``RuntimeError`` when constructed / called with a
filesystem whose credentials can't be extracted. The dispatch should
route such filesystems to the PyArrow path; reaching these code paths
anyway indicates a dispatch bug, and the guard prevents a regression
back to the silent-drop behavior.
Tests cover session-based credential extraction (sync + aiobotocore),
unresolvable sessions, static-attrs-win-over-session precedence,
``anon=True`` skipping the session, warning dedup (one WARNING per
filesystem instance), and the fail-closed behavior of
``AsyncPartitionActor`` and ``_download_uris_with_obstore``.
The pre-resolve fix for the threaded PyArrow path (which is the matching
second half of the IMDS herd story when the threaded fallback is used)
is split out to a separate PR so each change can be reviewed independently.
Review fixes for PR ray-project#62897. Two issues addressed: 1. The warning in ``_plan_obstore_routing`` previously fired for every non-None filesystem whose extraction returned ``None``. That path now also includes unrecognized native filesystems (``LocalFileSystem``, ``HdfsFileSystem``, custom ``fs.FileSystem`` subclasses). The message hardcodes fsspec-specific advice ("pass credentials via fsspec ``storage_options``"), so emitting it for a LocalFileSystem is misleading. Fix: route unrecognized filesystems to the threaded path silently (DEBUG log), and only emit the WARNING when the input is specifically ``PyFileSystem(FSSpecHandler(s3/s3a))``. Added ``_is_fsspec_s3_filesystem`` helper and renamed the warn helper to ``_warn_fsspec_s3_credentials_unextractable`` so the narrower contract is explicit at the call site. 2. CI test failures for ``test_routing_unextractable_fsspec_s3_...`` and ``test_routing_warns_once_per_filesystem`` — the warnings fired (visible in captured stderr) but ``caplog.records`` came back empty. Ray's internal logger hierarchy doesn't reliably propagate to pytest's caplog handler in Bazel-sandboxed CI environments. Fix: replace ``caplog`` with direct ``patch`` of the module's ``logger`` object. More reliable, also decouples the test from Ray's logging propagation configuration. Also shortened test and class names that risked tripping the 255-byte file-path limit in Bazel's test-log archive naming ("File name too long" error on CI): TestSessionBackedFsspecCredentials -> TestFsspecSessionCreds test_routing_unextractable_fsspec_s3_routes_to_threaded_with_warning -> test_fsspec_s3_unextractable_warns test_extract_credentials_session_preferred_only_when_static_missing -> test_static_wins_over_session (and similar for the other long names in both classes) Added ``test_native_fs_silent_routing`` to lock in the fsspec-vs-native warning distinction as a regression test.
Test-only cleanup. Same coverage, fewer tests, less boilerplate. - Parametrized the three ``_frozen_s3fs_credentials`` failure modes (no session, session raises, empty creds) into one parametrized ``test_frozen_returns_none_on_failure`` with pytest param ids. - Merged ``test_sync_session`` + ``test_underscore_session_fallback`` into ``test_frozen_sync_and_underscore_fallback`` — same MagicMock setup, identical expected output. - Merged ``test_non_s3_fsspec_to_threaded`` + ``test_native_fs_silent_routing`` into ``test_silent_routing_for_unrecognized_fs`` that iterates both filesystem shapes — same assertion (routes to threaded, no warning). - Merged ``test_fsspec_s3_unextractable_warns`` + ``test_warns_once_per_fs`` into ``test_fsspec_s3_unextractable_warns_once`` — both test the warning path; dedup is just a property of the same code path. - Collapsed the two near-duplicate ``_make_fsspec_wrapper`` / ``_make_fsspec_s3_wrapper`` helpers into a single ``_wrap_s3fs``. Replaced the verbose ``_make_sync_session`` / ``_make_raising_session`` with shorter ``_sync_session`` / ``_raising_session``, and used MagicMock kwargs init to flatten the attribute setup. Net: TestFsspecSessionCreds 10 -> 6 tests, TestPlanObstoreRouting 8 -> 6, total new tests 18 -> 12. Same coverage surface.
``asyncio.iscoroutine`` only matches coroutine objects. ``inspect.isawaitable`` is broader: it matches Tasks, Futures, and any object with an ``__await__`` method. Some fsspec implementations and custom wrappers around aiobotocore can return any of these from ``get_credentials`` / ``get_frozen_credentials``, so the broader check is safer. ``asyncio.run`` only accepts coroutines — drive arbitrary awaitables through a small ``async def _await(awaitable): return await awaitable`` shim so the same code path works for Tasks, Futures, and custom awaitables. Updated test docstring to reflect the wider compatibility surface.
basedpyright flagged the existing tests on PR ray-project#62897 because ``_extract_credentials_from_filesystem`` now returns ``Optional[Dict]`` (``None`` when extraction was attempted but yielded nothing usable), and several pre-existing tests subscript / call ``.get()`` / ``"x" in result`` directly on the returned value. Add ``assert result is not None`` to narrow the type at each call site where the test is exercising the populated-dict branch. The behavior under test is unchanged — these branches all return a dict — the asserts are purely for the type checker.
51e9478 to
814d08b
Compare
|
General Comment: Might be better to use https://developmentseed.org/obstore/latest/authentication/ let me know what you think |
| async def _await(awaitable): | ||
| """Drive an arbitrary awaitable (coroutine, Task, Future, ...) to a value. | ||
|
|
||
| ``asyncio.run`` only accepts coroutines, but ``inspect.isawaitable`` | ||
| matches a wider set (Task, Future, custom ``__await__`` objects). | ||
| Wrapping with ``await`` here lets ``asyncio.run`` drive any of them. | ||
| """ | ||
| return await awaitable |
There was a problem hiding this comment.
Can you expand more on why this function is needed?
There was a problem hiding this comment.
_await was only needed to drive Tasks/Futures through asyncio.run from sync code; the new
_S3FSSessionCredentialProvider.__call__ runs inside obstore's event loop and uses plain await via _await_maybe, so the wrapper is no longer required.
| return await awaitable | ||
|
|
||
|
|
||
| def _frozen_s3fs_credentials(fsspec_fs) -> Optional[Dict[str, str]]: |
There was a problem hiding this comment.
Can we strongly type this param?
There was a problem hiding this comment.
The helper was replaced by _S3FSSessionCredentialProvider.from_fsspec_fs(fsspec_fs: "s3fs.S3FileSystem").
| def _is_fsspec_s3_filesystem(filesystem: "pyarrow.fs.FileSystem") -> bool: | ||
| """True if *filesystem* is a PyFileSystem wrapping an fsspec s3/s3a handler.""" | ||
| if isinstance(filesystem, RetryingPyFileSystem): | ||
| filesystem = filesystem.unwrap() | ||
| PyFileSystem = getattr(pyarrow.fs, "PyFileSystem", None) | ||
| FSSpecHandler = getattr(pyarrow.fs, "FSSpecHandler", None) | ||
| if ( | ||
| PyFileSystem is None | ||
| or FSSpecHandler is None | ||
| or not isinstance(filesystem, PyFileSystem) | ||
| or not isinstance(filesystem.handler, FSSpecHandler) | ||
| ): | ||
| return False | ||
| fsspec_fs = getattr(filesystem.handler, "fs", None) | ||
| if fsspec_fs is None: | ||
| return False | ||
| protocol = getattr(fsspec_fs, "protocol", None) | ||
| if isinstance(protocol, tuple): | ||
| protocol = protocol[0] if protocol else None | ||
| return protocol in ("s3", "s3a") |
There was a problem hiding this comment.
We can replace the getattr lookups. Does this not suffice?
def _is_fsspec_s3_filesystem(filesystem: "pyarrow.fs.FileSystem") -> bool:
"""True if *filesystem* is a PyFileSystem wrapping an fsspec s3/s3a handler."""
if isinstance(filesystem, RetryingPyFileSystem):
filesystem = filesystem.unwrap()
if not isinstance(filesystem, pyarrow.fs.PyFileSystem):
return False
if not isinstance(filesystem.handler, pyarrow.fs.FSSpecHandler):
return False
try:
from s3fs import S3FileSystem
except ImportError:
return False
return isinstance(filesystem.handler.fs, S3FileSystem)
Addresses cursor review: the `elif OBSTORE_AVAILABLE` and `else` branches both assigned `download_fn = download_bytes_threaded`, differing only in the `_log_fallback_warning()` call. The "warning already emitted by _plan_obstore_routing" comment was misleading — _plan_obstore_routing only warns for fsspec-S3-unextractable, not for the _obstore_filesystem_requires_threaded_download early-return. Collapse to a single else with a conditional warning gated on OBSTORE_AVAILABLE, matching what each branch actually meant. Signed-off-by: xyuzh <xinyzng@gmail.com>
Addresses goutam review (general): replace the snapshot-based ``_frozen_s3fs_credentials`` helper with a ``_S3FSSessionCredentialProvider`` class that obstore invokes on demand. Long-running jobs no longer hit stale-credential failures when STS / Okta keys rotate mid-job — obstore calls the provider, which re-enters the session and returns refreshed keys (cached until ``expires_at``). Also addresses two related review points in the same file: - cursor: the inner ``except RuntimeError: return None`` blocks around ``asyncio.run`` were too broad — they swallowed RuntimeErrors raised by the coroutine itself (e.g. ``Session is closed`` from aiobotocore). The provider's ``__call__`` runs inside obstore's event loop, so it uses plain ``await`` and propagates errors. The sync validator ``can_fetch_credentials()`` now catches all exceptions and logs at DEBUG, with no special-case for RuntimeError. - goutam: ``_is_fsspec_s3_filesystem`` replaces getattr lookups with direct ``isinstance`` checks against ``pyarrow.fs.PyFileSystem`` / ``FSSpecHandler`` / ``s3fs.S3FileSystem``. The protocol-string match is dropped in favor of the real class check. The ``_await`` helper (goutam: "expand on why this is needed") is gone along with ``_frozen_s3fs_credentials`` — there's no more nested ``asyncio.run`` to drive arbitrary awaitables in sync code. Tests: - New ``TestS3FSSessionCredentialProvider`` covers sync/async sessions, caching, refresh-after-expiry, session-supplied expiry_time, session/_session fallback, and the sync ``can_fetch_credentials`` validator (success and failure paths). - ``TestExtractCredentialsFromFilesystemFsspecSession`` verifies the provider is installed in the kwargs (not static keys), static keys still win, anon skips the provider, and unresolvable sessions return None so the planner routes to the threaded path. - ``_wrap_s3fs`` now uses ``MagicMock(spec=s3fs.S3FileSystem)`` so the stricter ``_is_fsspec_s3_filesystem`` recognizes the mock. Signed-off-by: xyuzh <xinyzng@gmail.com>
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Reviewed by Cursor Bugbot for commit 87d1442. Configure here.
| # filesystem directly and resolves credentials correctly. | ||
| use_obstore_path = False | ||
| if OBSTORE_AVAILABLE: | ||
| use_obstore_path, _ = _plan_obstore_routing(filesystem) |
There was a problem hiding this comment.
Redundant credential validation with I/O discarded at plan time
Low Severity
_plan_obstore_routing(filesystem) in plan_download_op creates a _S3FSSessionCredentialProvider, calls can_fetch_credentials() (which performs asyncio.run → real session get_credentials() potentially involving network I/O to STS/Okta), and then discards the result with _. The identical validation and provider creation is repeated on every worker in download_bytes_async. For session-backed filesystems requiring network calls, this adds unnecessary latency to the planning phase with no reuse of the fetched credentials.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit 87d1442. Configure here.
|
|
||
| with self._lock: | ||
| self._cached = creds | ||
| return dict(creds) |
There was a problem hiding this comment.
Race condition in async credential provider cache refresh
Low Severity
_S3FSSessionCredentialProvider.__call__ releases the lock after detecting a cache miss (line 162) before performing the credential fetch (line 164). Multiple concurrent callers (from obstore's async runtime) can all observe the cache miss and simultaneously call self._session.get_credentials(). For botocore sessions that are not guaranteed thread-safe, this thundering-herd pattern could cause undefined behavior under concurrent refresh. A double-checked lock after fetching, or holding the lock across the full refresh (using an asyncio lock instead of a threading lock), would be more robust.
Reviewed by Cursor Bugbot for commit 87d1442. Configure here.
- black: collapse multi-line `__init__` signature in _obstore_download.py and a wrapped `isinstance(...)` assert in test_obstore_download.py. - Merge `test_returns_obstore_dict_sync_session` / `test_returns_obstore_dict_async_session` into one parametrized ``test_returns_obstore_dict`` with sync-botocore / async-aiobotocore cases. New ``_async_session`` helper at module level mirrors ``_sync_session`` so the test body stays a 3-line assertion. - Merge `test_can_fetch_credentials_true` / `test_can_fetch_credentials_false` into one parametrized ``test_can_fetch_credentials`` with success / session-raises / empty-access-key cases. Net: same coverage, ~50 fewer lines, single test per behavior. Signed-off-by: xyuzh <xinyzng@gmail.com>
|
Looks good. Just running the release test to see if there's any regression |
ray-project#62897) ## Summary When a user passes an fsspec `S3FileSystem` with Okta/STS/profile-based auth (wrapped as `PyFileSystem(FSSpecHandler(s3fs_fs))`), `_extract_credentials_from_filesystem` read only static attrs (`key`/`secret`/`token`, `storage_options`). Those attrs are `None` for session-backed s3fs — credentials live on `fs.session.get_credentials()` and may rotate. The function returned `{}`, obstore's `from_url` got no keys, and obstore silently fell back to its own credential chain (IMDS on EC2). Under concurrent S3 workloads this manifested as intermittent `NoCredentialsError` — users never saw a warning, and their Okta credentials had been silently dropped. This is the first of two independent PRs addressing related bugs in the `download` expression pipeline. The matching threaded-path pre-resolve fix (which avoids an IMDS thundering herd on the PyArrow fallback path, and is triggered whenever this PR routes an unextractable filesystem to threaded) is in a separate PR. Either can land first. ## Changes - **`_extract_credentials_from_filesystem`** now returns `Optional[Dict]`. `None` signals "route to threaded" so the user's filesystem stays authoritative. Unrecognized non-`None` filesystems also return `None` per the new contract. - **`_frozen_s3fs_credentials`** (new helper) snapshots credentials via `session.get_credentials().get_frozen_credentials()` for both sync (botocore) and async (aiobotocore) sessions. - **`_plan_obstore_routing(fs) -> (use_obstore, fs_kwargs)`** (new helper) centralizes dispatch at plan time with a one-shot `WARNING` per filesystem when credentials can't be extracted. - **`plan_download_op`** wires the routing helper so both the partition actor choice (`AsyncPartitionActor` vs `PartitionActor`) and the download function choice (`download_bytes_async` vs `download_bytes_threaded`) use the same gate. - **`download_bytes_async`** extracts credentials once in sync context and threads the kwargs through to `_download_uris_with_obstore`. Fixes a latent correctness issue where aiobotocore's async `get_credentials` can't run from inside an already-running event loop. - **`AsyncPartitionActor`** and **`_download_uris_with_obstore`** both fail closed with a clear `RuntimeError` when constructed / called with a filesystem whose credentials can't be statically extracted. Dispatch should route such filesystems to the PyArrow path; reaching these code paths anyway indicates a dispatch bug and the guard prevents regressions back to the silent-drop behavior. ## Test plan - [x] `pre-commit run` passes (ruff, pydoclint, black, docstyle, semgrep, import order, mock-method / logger checks). - [x] New unit tests in `test_obstore_download.py`: - `TestSessionBackedFsspecCredentials` — sync botocore session, aiobotocore async session, `_session` fallback for older s3fs, unresolvable-session → `None`, no-access-key → `None`, `anon=True` skips the session, static attrs win over session. - `TestPlanObstoreRouting` — `None` filesystem → obstore, non-S3 fsspec → threaded, unextractable fsspec-S3 → threaded with warning, warning dedup (same FS twice = one warning), extractable fsspec-S3 → obstore, `AsyncPartitionActor` raises on unextractable creds, `_download_uris_with_obstore` raises on unextractable creds. - [ ] End-to-end against moto/minio with Okta-style fsspec `S3FileSystem` over ~100 URIs (no `NoCredentialsError`, moto sees signed requests with the session's current keys). --------- Signed-off-by: xyuzh <xinyzng@gmail.com> Signed-off-by: phattruong <23120318@student.hcmus.edu.vn>


Summary
When a user passes an fsspec
S3FileSystemwith Okta/STS/profile-based auth (wrapped asPyFileSystem(FSSpecHandler(s3fs_fs))),_extract_credentials_from_filesystemread only static attrs (key/secret/token,storage_options). Those attrs areNonefor session-backed s3fs — credentials live onfs.session.get_credentials()and may rotate. The function returned{}, obstore'sfrom_urlgot no keys, and obstore silently fell back to its own credential chain (IMDS on EC2). Under concurrent S3 workloads this manifested as intermittentNoCredentialsError— users never saw a warning, and their Okta credentials had been silently dropped.This is the first of two independent PRs addressing related bugs in the
downloadexpression pipeline. The matching threaded-path pre-resolve fix (which avoids an IMDS thundering herd on the PyArrow fallback path, and is triggered whenever this PR routes an unextractable filesystem to threaded) is in a separate PR. Either can land first.Changes
_extract_credentials_from_filesystemnow returnsOptional[Dict].Nonesignals "route to threaded" so the user's filesystem stays authoritative. Unrecognized non-Nonefilesystems also returnNoneper the new contract._frozen_s3fs_credentials(new helper) snapshots credentials viasession.get_credentials().get_frozen_credentials()for both sync (botocore) and async (aiobotocore) sessions._plan_obstore_routing(fs) -> (use_obstore, fs_kwargs)(new helper) centralizes dispatch at plan time with a one-shotWARNINGper filesystem when credentials can't be extracted.plan_download_opwires the routing helper so both the partition actor choice (AsyncPartitionActorvsPartitionActor) and the download function choice (download_bytes_asyncvsdownload_bytes_threaded) use the same gate.download_bytes_asyncextracts credentials once in sync context and threads the kwargs through to_download_uris_with_obstore. Fixes a latent correctness issue where aiobotocore's asyncget_credentialscan't run from inside an already-running event loop.AsyncPartitionActorand_download_uris_with_obstoreboth fail closed with a clearRuntimeErrorwhen constructed / called with a filesystem whose credentials can't be statically extracted. Dispatch should route such filesystems to the PyArrow path; reaching these code paths anyway indicates a dispatch bug and the guard prevents regressions back to the silent-drop behavior.Test plan
pre-commit runpasses (ruff, pydoclint, black, docstyle, semgrep, import order, mock-method / logger checks).test_obstore_download.py:TestSessionBackedFsspecCredentials— sync botocore session, aiobotocore async session,_sessionfallback for older s3fs, unresolvable-session →None, no-access-key →None,anon=Trueskips the session, static attrs win over session.TestPlanObstoreRouting—Nonefilesystem → obstore, non-S3 fsspec → threaded, unextractable fsspec-S3 → threaded with warning, warning dedup (same FS twice = one warning), extractable fsspec-S3 → obstore,AsyncPartitionActorraises on unextractable creds,_download_uris_with_obstoreraises on unextractable creds.S3FileSystemover ~100 URIs (noNoCredentialsError, moto sees signed requests with the session's current keys).