Skip to content

[Data] Fix silent credential drop for fsspec-S3 in download expression#62897

Merged
goutamvenkat-anyscale merged 8 commits into
ray-project:masterfrom
xyuzh:fix/download-fsspec-creds
May 15, 2026
Merged

[Data] Fix silent credential drop for fsspec-S3 in download expression#62897
goutamvenkat-anyscale merged 8 commits into
ray-project:masterfrom
xyuzh:fix/download-fsspec-creds

Conversation

@xyuzh

@xyuzh xyuzh commented Apr 23, 2026

Copy link
Copy Markdown
Member

Summary

When a user passes an fsspec S3FileSystem with Okta/STS/profile-based auth (wrapped as PyFileSystem(FSSpecHandler(s3fs_fs))), _extract_credentials_from_filesystem read only static attrs (key/secret/token, storage_options). Those attrs are None for session-backed s3fs — credentials live on fs.session.get_credentials() and may rotate. The function returned {}, obstore's from_url got no keys, and obstore silently fell back to its own credential chain (IMDS on EC2). Under concurrent S3 workloads this manifested as intermittent NoCredentialsError — users never saw a warning, and their Okta credentials had been silently dropped.

This is the first of two independent PRs addressing related bugs in the download expression pipeline. The matching threaded-path pre-resolve fix (which avoids an IMDS thundering herd on the PyArrow fallback path, and is triggered whenever this PR routes an unextractable filesystem to threaded) is in a separate PR. Either can land first.

Changes

  • _extract_credentials_from_filesystem now returns Optional[Dict]. None signals "route to threaded" so the user's filesystem stays authoritative. Unrecognized non-None filesystems also return None per the new contract.
  • _frozen_s3fs_credentials (new helper) snapshots credentials via session.get_credentials().get_frozen_credentials() for both sync (botocore) and async (aiobotocore) sessions.
  • _plan_obstore_routing(fs) -> (use_obstore, fs_kwargs) (new helper) centralizes dispatch at plan time with a one-shot WARNING per filesystem when credentials can't be extracted.
  • plan_download_op wires the routing helper so both the partition actor choice (AsyncPartitionActor vs PartitionActor) and the download function choice (download_bytes_async vs download_bytes_threaded) use the same gate.
  • download_bytes_async extracts credentials once in sync context and threads the kwargs through to _download_uris_with_obstore. Fixes a latent correctness issue where aiobotocore's async get_credentials can't run from inside an already-running event loop.
  • AsyncPartitionActor and _download_uris_with_obstore both fail closed with a clear RuntimeError when constructed / called with a filesystem whose credentials can't be statically extracted. Dispatch should route such filesystems to the PyArrow path; reaching these code paths anyway indicates a dispatch bug and the guard prevents regressions back to the silent-drop behavior.

Test plan

  • pre-commit run passes (ruff, pydoclint, black, docstyle, semgrep, import order, mock-method / logger checks).
  • New unit tests in test_obstore_download.py:
    • TestSessionBackedFsspecCredentials — sync botocore session, aiobotocore async session, _session fallback for older s3fs, unresolvable-session → None, no-access-key → None, anon=True skips the session, static attrs win over session.
    • TestPlanObstoreRoutingNone filesystem → obstore, non-S3 fsspec → threaded, unextractable fsspec-S3 → threaded with warning, warning dedup (same FS twice = one warning), extractable fsspec-S3 → obstore, AsyncPartitionActor raises on unextractable creds, _download_uris_with_obstore raises on unextractable creds.
  • End-to-end against moto/minio with Okta-style fsspec S3FileSystem over ~100 URIs (no NoCredentialsError, moto sees signed requests with the session's current keys).

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request enhances the obstore download path in Ray Data by implementing a robust credential extraction and routing mechanism. It introduces _plan_obstore_routing to decide between obstore and threaded PyArrow downloads, specifically handling session-backed s3fs configurations (e.g., Okta, STS) where credentials are resolved lazily. The changes ensure that if credentials cannot be extracted, the system falls back to the authoritative user filesystem rather than silently using default credentials. Review feedback suggests using isawaitable instead of iscoroutine to better handle various asynchronous return types from fsspec implementations.

Comment thread python/ray/data/_internal/planner/_obstore_download.py Outdated
Comment thread python/ray/data/_internal/planner/_obstore_download.py Outdated
Comment thread python/ray/data/_internal/planner/_obstore_download.py
@ray-gardener ray-gardener Bot added the data Ray Data-related issues label Apr 24, 2026
@xyuzh xyuzh added the go add ONLY when ready to merge, run all tests label Apr 24, 2026
Comment thread python/ray/data/_internal/planner/plan_download_op.py Outdated
Comment thread python/ray/data/_internal/planner/_obstore_download.py Outdated
xyuzh added 5 commits April 28, 2026 09:24
When a user passes an fsspec ``S3FileSystem`` with Okta / STS / profile-
based auth (wrapped as ``PyFileSystem(FSSpecHandler(s3fs_fs))``), the
credential extraction path only read static attrs (``key`` / ``secret``
/ ``token``, ``storage_options``). Those attrs are ``None`` when
credentials are resolved lazily via ``fs.session.get_credentials()``,
so extraction returned ``{}`` and obstore silently fell back to its
own credential chain (IMDS / env). Users never saw a warning.

Under concurrent S3 workloads on EC2 this manifested as intermittent
``NoCredentialsError`` — IMDS rate-limited per instance, the user's
Okta credentials had been silently dropped.

Changes:

- ``_extract_credentials_from_filesystem`` now returns ``Optional[Dict]``.
  ``None`` signals "route to threaded" so the user's filesystem stays
  authoritative instead of being silently replaced by obstore's chain.
  Unrecognized non-``None`` filesystems also return ``None``.
- New ``_frozen_s3fs_credentials`` helper snapshots credentials via
  ``session.get_credentials().get_frozen_credentials()``, handling both
  sync botocore sessions and async aiobotocore sessions.
- New ``_plan_obstore_routing(fs) -> (use_obstore, fs_kwargs)``
  centralizes the dispatch decision at plan time and emits a one-shot
  WARNING per filesystem when credentials can't be extracted.
- ``plan_download_op`` wires the routing helper so both the partition
  actor choice and the download function choice use the same gate.
- ``download_bytes_async`` extracts credentials once in sync context and
  threads the kwargs through to ``_download_uris_with_obstore`` — fixes a
  correctness issue where aiobotocore's async ``get_credentials`` can't
  run from inside an already-running event loop.
- ``AsyncPartitionActor`` and ``_download_uris_with_obstore`` both fail
  closed with a clear ``RuntimeError`` when constructed / called with a
  filesystem whose credentials can't be extracted. The dispatch should
  route such filesystems to the PyArrow path; reaching these code paths
  anyway indicates a dispatch bug, and the guard prevents a regression
  back to the silent-drop behavior.

Tests cover session-based credential extraction (sync + aiobotocore),
unresolvable sessions, static-attrs-win-over-session precedence,
``anon=True`` skipping the session, warning dedup (one WARNING per
filesystem instance), and the fail-closed behavior of
``AsyncPartitionActor`` and ``_download_uris_with_obstore``.

The pre-resolve fix for the threaded PyArrow path (which is the matching
second half of the IMDS herd story when the threaded fallback is used)
is split out to a separate PR so each change can be reviewed independently.
Review fixes for PR ray-project#62897.

Two issues addressed:

1. The warning in ``_plan_obstore_routing`` previously fired for every
   non-None filesystem whose extraction returned ``None``. That path now
   also includes unrecognized native filesystems (``LocalFileSystem``,
   ``HdfsFileSystem``, custom ``fs.FileSystem`` subclasses). The message
   hardcodes fsspec-specific advice ("pass credentials via fsspec
   ``storage_options``"), so emitting it for a LocalFileSystem is
   misleading.

   Fix: route unrecognized filesystems to the threaded path silently
   (DEBUG log), and only emit the WARNING when the input is specifically
   ``PyFileSystem(FSSpecHandler(s3/s3a))``. Added
   ``_is_fsspec_s3_filesystem`` helper and renamed the warn helper to
   ``_warn_fsspec_s3_credentials_unextractable`` so the narrower contract
   is explicit at the call site.

2. CI test failures for ``test_routing_unextractable_fsspec_s3_...`` and
   ``test_routing_warns_once_per_filesystem`` — the warnings fired
   (visible in captured stderr) but ``caplog.records`` came back empty.
   Ray's internal logger hierarchy doesn't reliably propagate to
   pytest's caplog handler in Bazel-sandboxed CI environments.

   Fix: replace ``caplog`` with direct ``patch`` of the module's
   ``logger`` object. More reliable, also decouples the test from Ray's
   logging propagation configuration.

Also shortened test and class names that risked tripping the 255-byte
file-path limit in Bazel's test-log archive naming ("File name too
long" error on CI):

  TestSessionBackedFsspecCredentials -> TestFsspecSessionCreds
  test_routing_unextractable_fsspec_s3_routes_to_threaded_with_warning
    -> test_fsspec_s3_unextractable_warns
  test_extract_credentials_session_preferred_only_when_static_missing
    -> test_static_wins_over_session
  (and similar for the other long names in both classes)

Added ``test_native_fs_silent_routing`` to lock in the fsspec-vs-native
warning distinction as a regression test.
Test-only cleanup. Same coverage, fewer tests, less boilerplate.

- Parametrized the three ``_frozen_s3fs_credentials`` failure modes
  (no session, session raises, empty creds) into one parametrized
  ``test_frozen_returns_none_on_failure`` with pytest param ids.
- Merged ``test_sync_session`` + ``test_underscore_session_fallback``
  into ``test_frozen_sync_and_underscore_fallback`` — same MagicMock
  setup, identical expected output.
- Merged ``test_non_s3_fsspec_to_threaded`` + ``test_native_fs_silent_routing``
  into ``test_silent_routing_for_unrecognized_fs`` that iterates both
  filesystem shapes — same assertion (routes to threaded, no warning).
- Merged ``test_fsspec_s3_unextractable_warns`` + ``test_warns_once_per_fs``
  into ``test_fsspec_s3_unextractable_warns_once`` — both test the
  warning path; dedup is just a property of the same code path.
- Collapsed the two near-duplicate ``_make_fsspec_wrapper`` /
  ``_make_fsspec_s3_wrapper`` helpers into a single ``_wrap_s3fs``.
  Replaced the verbose ``_make_sync_session`` / ``_make_raising_session``
  with shorter ``_sync_session`` / ``_raising_session``, and used
  MagicMock kwargs init to flatten the attribute setup.

Net: TestFsspecSessionCreds 10 -> 6 tests, TestPlanObstoreRouting 8 -> 6,
total new tests 18 -> 12. Same coverage surface.
``asyncio.iscoroutine`` only matches coroutine objects.
``inspect.isawaitable`` is broader: it matches Tasks, Futures, and any
object with an ``__await__`` method. Some fsspec implementations and
custom wrappers around aiobotocore can return any of these from
``get_credentials`` / ``get_frozen_credentials``, so the broader check
is safer.

``asyncio.run`` only accepts coroutines — drive arbitrary awaitables
through a small ``async def _await(awaitable): return await awaitable``
shim so the same code path works for Tasks, Futures, and custom
awaitables.

Updated test docstring to reflect the wider compatibility surface.
basedpyright flagged the existing tests on PR ray-project#62897 because
``_extract_credentials_from_filesystem`` now returns ``Optional[Dict]``
(``None`` when extraction was attempted but yielded nothing usable),
and several pre-existing tests subscript / call ``.get()`` /
``"x" in result`` directly on the returned value.

Add ``assert result is not None`` to narrow the type at each call site
where the test is exercising the populated-dict branch. The behavior
under test is unchanged — these branches all return a dict — the
asserts are purely for the type checker.
@xyuzh xyuzh force-pushed the fix/download-fsspec-creds branch from 51e9478 to 814d08b Compare April 28, 2026 16:24
@xyuzh

xyuzh commented May 5, 2026

Copy link
Copy Markdown
Member Author
@goutamvenkat-anyscale

goutamvenkat-anyscale commented May 8, 2026

Copy link
Copy Markdown
Contributor

General Comment:

Might be better to use https://developmentseed.org/obstore/latest/authentication/ credential_provider so that the authentication mechanism can refresh the creds on expiry instead of just using a single cached value.

from botocore.credentials import Credentials, ReadOnlyCredentials
from cachetools import TLRUCache

class _S3FSSession(Protocol):
    def get_credentials(
        self,
    ) -> Optional[Union["Credentials", Awaitable["Credentials"]]]: ...


class _S3FSSessionCredentialProvider:
    """obstore credential provider backed by an s3fs-style session."""

    _CACHE_KEY = "credentials"
    _DEFAULT_TTL = timedelta(minutes=30)

    def __init__(
        self, session: _S3FSSession, ttl: Optional[timedelta] = None
    ) -> None:
        self._session = session
        self._ttl = self._DEFAULT_TTL if ttl is None else ttl
        self._cache = TLRUCache(
            maxsize=1,
            ttu=lambda _key, value, _now: value["expires_at"],
            timer=lambda: datetime.now(timezone.utc),
        )
        self._cache_lock = RLock()

    @classmethod
    def from_fsspec_fs(
        cls, fsspec_fs: Any, *, validate: bool = False
    ) -> Optional["_S3FSSessionCredentialProvider"]:
        session = getattr(fsspec_fs, "session", None) or getattr(
            fsspec_fs, "_session", None
        )
        if session is None or not hasattr(session, "get_credentials"):
            return None

        provider = cls(cast(_S3FSSession, session))
        if validate and not provider.can_fetch_credentials():
            return None
        return provider

    async def __call__(self) -> Dict[str, Any]:
        with self._cache_lock:
            cached = self._cache.get(self._CACHE_KEY)
            if cached is not None:
                return dict(cached)

        session_credentials = await self._get_session_credentials()
        frozen_credentials = await self._get_frozen_credentials(session_credentials)
        credentials = self._to_obstore_credentials(
            session_credentials, frozen_credentials
        )

        with self._cache_lock:
            self._cache[self._CACHE_KEY] = credentials
        return dict(credentials)

    async def _get_session_credentials(self) -> "Credentials":
        credentials = self._session.get_credentials()
        if inspect.isawaitable(credentials):
            credentials = await credentials
        if credentials is None:
            raise RuntimeError("fsspec S3 session returned no credentials")
        return credentials

    async def _get_frozen_credentials(
        self, session_credentials: "Credentials"
    ) -> "ReadOnlyCredentials":
        frozen_credentials = session_credentials.get_frozen_credentials()
        if inspect.isawaitable(frozen_credentials):
            frozen_credentials = await frozen_credentials
        if not frozen_credentials.access_key:
            raise RuntimeError("fsspec S3 session returned no access key")
        return cast("ReadOnlyCredentials", frozen_credentials)

    def _to_obstore_credentials(
        self,
        session_credentials: "Credentials",
        frozen_credentials: "ReadOnlyCredentials",
    ) -> Dict[str, Any]:
        credentials: Dict[str, Any] = {
            "access_key_id": frozen_credentials.access_key,
            "secret_access_key": frozen_credentials.secret_key or "",
            "expires_at": self._expires_at(session_credentials),
        }
        if frozen_credentials.token:
            credentials["token"] = frozen_credentials.token
        return credentials

    def _expires_at(self, session_credentials: "Credentials") -> datetime:
        for attr in ("expiry_time", "_expiry_time"):
            expires_at = getattr(session_credentials, attr, None)
            if isinstance(expires_at, datetime):
                if expires_at.tzinfo is None:
                    return expires_at.replace(tzinfo=timezone.utc)
                return expires_at
        return datetime.now(timezone.utc) + self._ttl

    def can_fetch_credentials(self) -> bool:
        """Return True if credentials can be fetched from sync code now."""
        try:
            asyncio.run(self())
            return True
        except Exception as e:
            logger.debug("Could not fetch fsspec session credentials: %r", e)
            return False

let me know what you think

Comment on lines +117 to +124
async def _await(awaitable):
"""Drive an arbitrary awaitable (coroutine, Task, Future, ...) to a value.

``asyncio.run`` only accepts coroutines, but ``inspect.isawaitable``
matches a wider set (Task, Future, custom ``__await__`` objects).
Wrapping with ``await`` here lets ``asyncio.run`` drive any of them.
"""
return await awaitable

@goutamvenkat-anyscale goutamvenkat-anyscale May 4, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you expand more on why this function is needed?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_await was only needed to drive Tasks/Futures through asyncio.run from sync code; the new
_S3FSSessionCredentialProvider.__call__ runs inside obstore's event loop and uses plain await via _await_maybe, so the wrapper is no longer required.

return await awaitable


def _frozen_s3fs_credentials(fsspec_fs) -> Optional[Dict[str, str]]:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we strongly type this param?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The helper was replaced by _S3FSSessionCredentialProvider.from_fsspec_fs(fsspec_fs: "s3fs.S3FileSystem").

Comment on lines +371 to +390
def _is_fsspec_s3_filesystem(filesystem: "pyarrow.fs.FileSystem") -> bool:
"""True if *filesystem* is a PyFileSystem wrapping an fsspec s3/s3a handler."""
if isinstance(filesystem, RetryingPyFileSystem):
filesystem = filesystem.unwrap()
PyFileSystem = getattr(pyarrow.fs, "PyFileSystem", None)
FSSpecHandler = getattr(pyarrow.fs, "FSSpecHandler", None)
if (
PyFileSystem is None
or FSSpecHandler is None
or not isinstance(filesystem, PyFileSystem)
or not isinstance(filesystem.handler, FSSpecHandler)
):
return False
fsspec_fs = getattr(filesystem.handler, "fs", None)
if fsspec_fs is None:
return False
protocol = getattr(fsspec_fs, "protocol", None)
if isinstance(protocol, tuple):
protocol = protocol[0] if protocol else None
return protocol in ("s3", "s3a")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can replace the getattr lookups. Does this not suffice?

def _is_fsspec_s3_filesystem(filesystem: "pyarrow.fs.FileSystem") -> bool:
    """True if *filesystem* is a PyFileSystem wrapping an fsspec s3/s3a handler."""
    if isinstance(filesystem, RetryingPyFileSystem):
        filesystem = filesystem.unwrap()

    if not isinstance(filesystem, pyarrow.fs.PyFileSystem):
        return False
    if not isinstance(filesystem.handler, pyarrow.fs.FSSpecHandler):
        return False

    try:
        from s3fs import S3FileSystem
    except ImportError:
        return False
    return isinstance(filesystem.handler.fs, S3FileSystem)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed getattr lookup

xyuzh added 2 commits May 11, 2026 14:49
Addresses cursor review: the `elif OBSTORE_AVAILABLE` and `else` branches
both assigned `download_fn = download_bytes_threaded`, differing only in
the `_log_fallback_warning()` call. The "warning already emitted by
_plan_obstore_routing" comment was misleading — _plan_obstore_routing
only warns for fsspec-S3-unextractable, not for the
_obstore_filesystem_requires_threaded_download early-return.

Collapse to a single else with a conditional warning gated on
OBSTORE_AVAILABLE, matching what each branch actually meant.

Signed-off-by: xyuzh <xinyzng@gmail.com>
Addresses goutam review (general): replace the snapshot-based
``_frozen_s3fs_credentials`` helper with a ``_S3FSSessionCredentialProvider``
class that obstore invokes on demand. Long-running jobs no longer hit
stale-credential failures when STS / Okta keys rotate mid-job — obstore
calls the provider, which re-enters the session and returns refreshed
keys (cached until ``expires_at``).

Also addresses two related review points in the same file:

- cursor: the inner ``except RuntimeError: return None`` blocks around
  ``asyncio.run`` were too broad — they swallowed RuntimeErrors raised
  by the coroutine itself (e.g. ``Session is closed`` from aiobotocore).
  The provider's ``__call__`` runs inside obstore's event loop, so it
  uses plain ``await`` and propagates errors. The sync validator
  ``can_fetch_credentials()`` now catches all exceptions and logs at
  DEBUG, with no special-case for RuntimeError.

- goutam: ``_is_fsspec_s3_filesystem`` replaces getattr lookups with
  direct ``isinstance`` checks against ``pyarrow.fs.PyFileSystem`` /
  ``FSSpecHandler`` / ``s3fs.S3FileSystem``. The protocol-string match
  is dropped in favor of the real class check.

The ``_await`` helper (goutam: "expand on why this is needed") is gone
along with ``_frozen_s3fs_credentials`` — there's no more nested
``asyncio.run`` to drive arbitrary awaitables in sync code.

Tests:
- New ``TestS3FSSessionCredentialProvider`` covers sync/async sessions,
  caching, refresh-after-expiry, session-supplied expiry_time,
  session/_session fallback, and the sync ``can_fetch_credentials``
  validator (success and failure paths).
- ``TestExtractCredentialsFromFilesystemFsspecSession`` verifies the
  provider is installed in the kwargs (not static keys), static keys
  still win, anon skips the provider, and unresolvable sessions return
  None so the planner routes to the threaded path.
- ``_wrap_s3fs`` now uses ``MagicMock(spec=s3fs.S3FileSystem)`` so the
  stricter ``_is_fsspec_s3_filesystem`` recognizes the mock.

Signed-off-by: xyuzh <xinyzng@gmail.com>

@cursor cursor Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 2 potential issues.

Fix All in Cursor

Reviewed by Cursor Bugbot for commit 87d1442. Configure here.

# filesystem directly and resolves credentials correctly.
use_obstore_path = False
if OBSTORE_AVAILABLE:
use_obstore_path, _ = _plan_obstore_routing(filesystem)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redundant credential validation with I/O discarded at plan time

Low Severity

_plan_obstore_routing(filesystem) in plan_download_op creates a _S3FSSessionCredentialProvider, calls can_fetch_credentials() (which performs asyncio.run → real session get_credentials() potentially involving network I/O to STS/Okta), and then discards the result with _. The identical validation and provider creation is repeated on every worker in download_bytes_async. For session-backed filesystems requiring network calls, this adds unnecessary latency to the planning phase with no reuse of the fetched credentials.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 87d1442. Configure here.


with self._lock:
self._cached = creds
return dict(creds)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Race condition in async credential provider cache refresh

Low Severity

_S3FSSessionCredentialProvider.__call__ releases the lock after detecting a cache miss (line 162) before performing the credential fetch (line 164). Multiple concurrent callers (from obstore's async runtime) can all observe the cache miss and simultaneously call self._session.get_credentials(). For botocore sessions that are not guaranteed thread-safe, this thundering-herd pattern could cause undefined behavior under concurrent refresh. A double-checked lock after fetching, or holding the lock across the full refresh (using an asyncio lock instead of a threading lock), would be more robust.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 87d1442. Configure here.

- black: collapse multi-line `__init__` signature in _obstore_download.py
  and a wrapped `isinstance(...)` assert in test_obstore_download.py.
- Merge `test_returns_obstore_dict_sync_session` /
  `test_returns_obstore_dict_async_session` into one parametrized
  ``test_returns_obstore_dict`` with sync-botocore / async-aiobotocore
  cases. New ``_async_session`` helper at module level mirrors
  ``_sync_session`` so the test body stays a 3-line assertion.
- Merge `test_can_fetch_credentials_true` /
  `test_can_fetch_credentials_false` into one parametrized
  ``test_can_fetch_credentials`` with success / session-raises /
  empty-access-key cases.

Net: same coverage, ~50 fewer lines, single test per behavior.
Signed-off-by: xyuzh <xinyzng@gmail.com>
@goutamvenkat-anyscale

Copy link
Copy Markdown
Contributor

Looks good. Just running the release test to see if there's any regression

@goutamvenkat-anyscale goutamvenkat-anyscale merged commit 24e5f5b into ray-project:master May 15, 2026
6 checks passed
TruongQuangPhat pushed a commit to cyhapun/ray-fix-issue that referenced this pull request May 27, 2026
ray-project#62897)

## Summary

When a user passes an fsspec `S3FileSystem` with Okta/STS/profile-based
auth (wrapped as `PyFileSystem(FSSpecHandler(s3fs_fs))`),
`_extract_credentials_from_filesystem` read only static attrs
(`key`/`secret`/`token`, `storage_options`). Those attrs are `None` for
session-backed s3fs — credentials live on `fs.session.get_credentials()`
and may rotate. The function returned `{}`, obstore's `from_url` got no
keys, and obstore silently fell back to its own credential chain (IMDS
on EC2). Under concurrent S3 workloads this manifested as intermittent
`NoCredentialsError` — users never saw a warning, and their Okta
credentials had been silently dropped.

This is the first of two independent PRs addressing related bugs in the
`download` expression pipeline. The matching threaded-path pre-resolve
fix (which avoids an IMDS thundering herd on the PyArrow fallback path,
and is triggered whenever this PR routes an unextractable filesystem to
threaded) is in a separate PR. Either can land first.

## Changes

- **`_extract_credentials_from_filesystem`** now returns
`Optional[Dict]`. `None` signals "route to threaded" so the user's
filesystem stays authoritative. Unrecognized non-`None` filesystems also
return `None` per the new contract.
- **`_frozen_s3fs_credentials`** (new helper) snapshots credentials via
`session.get_credentials().get_frozen_credentials()` for both sync
(botocore) and async (aiobotocore) sessions.
- **`_plan_obstore_routing(fs) -> (use_obstore, fs_kwargs)`** (new
helper) centralizes dispatch at plan time with a one-shot `WARNING` per
filesystem when credentials can't be extracted.
- **`plan_download_op`** wires the routing helper so both the partition
actor choice (`AsyncPartitionActor` vs `PartitionActor`) and the
download function choice (`download_bytes_async` vs
`download_bytes_threaded`) use the same gate.
- **`download_bytes_async`** extracts credentials once in sync context
and threads the kwargs through to `_download_uris_with_obstore`. Fixes a
latent correctness issue where aiobotocore's async `get_credentials`
can't run from inside an already-running event loop.
- **`AsyncPartitionActor`** and **`_download_uris_with_obstore`** both
fail closed with a clear `RuntimeError` when constructed / called with a
filesystem whose credentials can't be statically extracted. Dispatch
should route such filesystems to the PyArrow path; reaching these code
paths anyway indicates a dispatch bug and the guard prevents regressions
back to the silent-drop behavior.

## Test plan

- [x] `pre-commit run` passes (ruff, pydoclint, black, docstyle,
semgrep, import order, mock-method / logger checks).
- [x] New unit tests in `test_obstore_download.py`:
- `TestSessionBackedFsspecCredentials` — sync botocore session,
aiobotocore async session, `_session` fallback for older s3fs,
unresolvable-session → `None`, no-access-key → `None`, `anon=True` skips
the session, static attrs win over session.
- `TestPlanObstoreRouting` — `None` filesystem → obstore, non-S3 fsspec
→ threaded, unextractable fsspec-S3 → threaded with warning, warning
dedup (same FS twice = one warning), extractable fsspec-S3 → obstore,
`AsyncPartitionActor` raises on unextractable creds,
`_download_uris_with_obstore` raises on unextractable creds.
- [ ] End-to-end against moto/minio with Okta-style fsspec
`S3FileSystem` over ~100 URIs (no `NoCredentialsError`, moto sees signed
requests with the session's current keys).

---------

Signed-off-by: xyuzh <xinyzng@gmail.com>
Signed-off-by: phattruong <23120318@student.hcmus.edu.vn>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

data Ray Data-related issues go add ONLY when ready to merge, run all tests

2 participants