[Data] - Port over changes from lance-ray into Ray Data #60497

Merged
goutamvenkat-anyscale merged 5 commits into ray-project:master from myandpr:migrate-lance-ray
Mar 23, 2026

Conversation

@myandpr

@myandpr myandpr commented Jan 26, 2026

Member

Description

Port lance-ray datasink features into Ray Data LanceDatasink: write retry, Lance namespaces, and driver-side commit flow.

Related issues

Link related issues: "Fixes #60147", "Closes #60147", or "Related to #60147".

Additional information

Implementation details

  • Write retry
    • Logic/parameters (from lance-ray): retry on LanceError(IO) + DataContext.retried_io_errors, with max_attempts=10 and max_backoff_s=32.
    • Execution framework (Ray-native): use ray._common.retry.call_with_retry to wrap write_fragments.
  • Lance namespaces (from lance-ray)
    • This PR focuses on write-side namespace support in Ray Data (Dataset.write_lance() / LanceDatasink).
    • Add table_id / namespace_impl / namespace_properties to LanceDatasink.
    • Resolve/declare tables via DescribeTableRequest / DeclareTableRequest / CreateEmptyTableRequest.
    • Merge user storage_options with namespace-provided options.
    • Create and pass storage_options_provider to write_fragments, LanceDataset, and commit.
    • New helper module: python/ray/data/_internal/datasource/lance_utils.py.
    • Namespace read parity via read_lance() is not included in this PR.
  • Driver commit (aligned with lance-ray)
    • Keep the same driver-side commit behavior as lance-ray: collect fragments and use the last observed schema from write results, then commit.
    • This does not implement true schema merge/unification, since lance-ray itself doesn’t either.
  • High-level API
    • Dataset.write_lance() now accepts and forwards table_id / namespace_impl / namespace_properties.
  • Tests
    • Added a parameterized test to verify write_lance() forwards namespace arguments.
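
A minimal sketch of the write-retry behavior described under "Write retry" above. The helper `call_with_retry_sketch` is a hypothetical stand-in written for illustration; it is not `ray._common.retry.call_with_retry`, whose exact signature is not shown in this PR. The substring matching on the error message, the jittered exponential backoff, and the injectable `sleep` are all assumptions; only the parameter names `description`, `match`, `max_attempts`, and `max_backoff_s` come from the discussion.

```python
import logging
import random
import time
from typing import Callable, Sequence, TypeVar

T = TypeVar("T")
logger = logging.getLogger(__name__)


def call_with_retry_sketch(
    fn: Callable[[], T],
    description: str,
    match: Sequence[str],
    max_attempts: int = 10,
    max_backoff_s: float = 32.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Illustrative retry loop; a stand-in, not Ray's call_with_retry."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as exc:
            # Assumption: an error is retryable if its message contains
            # any of the configured patterns.
            retryable = any(pattern in str(exc) for pattern in match)
            if not retryable or attempt == max_attempts:
                raise
            # Exponential backoff with jitter, capped at max_backoff_s.
            backoff = min(2.0 ** attempt, max_backoff_s) * random.random()
            logger.warning(
                "Retrying %s (attempt %d/%d) in %.2fs: %s",
                description, attempt, max_attempts, backoff, exc,
            )
            sleep(backoff)
    raise AssertionError("unreachable")


attempts = {"count": 0}


def flaky_write() -> list:
    """Hypothetical write that fails twice with an IO-style error."""
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise OSError("LanceError(IO): simulated transient failure")
    return ["fragment-0"]


result = call_with_retry_sketch(
    flaky_write,
    description="write lance fragments",
    match=["LanceError(IO)"],
    sleep=lambda seconds: None,  # skip real sleeping in the example
)
```

Here the third attempt succeeds, so `result` holds the single fake fragment. An error whose message matches none of the patterns is re-raised immediately.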

Testing

  • python -m pytest python/ray/data/tests/datasource/test_lance.py -q

Notes

  • Requires a lance/pylance version that supports storage_options_provider (validated locally with 1.0.3).
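
The driver-side commit flow described under "Driver commit" above can be sketched as follows. `CommitPlan` and `plan_commit` are hypothetical names introduced for illustration, and fragments and schemas are represented by plain strings; the real code works with Lance fragment metadata and `pa.Schema` objects. The sketch only shows the "collect fragments, keep the last observed schema" rule, not the actual Lance commit call.

```python
from dataclasses import dataclass
from typing import Any, List, Optional, Tuple


@dataclass
class CommitPlan:
    """Hypothetical container for what the driver would commit."""
    fragments: List[Any]
    schema: Optional[Any]


def plan_commit(write_results: List[List[Tuple[Any, Any]]]) -> CommitPlan:
    """Flatten per-task (fragment, schema) pairs into one commit plan."""
    fragments: List[Any] = []
    schema: Optional[Any] = None
    for task_result in write_results:
        for fragment, fragment_schema in task_result:
            fragments.append(fragment)
            # Last observed schema wins; no merge or unification happens.
            schema = fragment_schema
    return CommitPlan(fragments=fragments, schema=schema)


plan = plan_commit([[("frag-0", "schema-a")], [("frag-1", "schema-b")]])
empty_plan = plan_commit([])
```

Because the last schema simply overwrites earlier ones, write tasks that produce differing schemas are not reconciled, which matches the "no true schema merge" caveat in the description.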
@myandpr myandpr requested a review from a team as a code owner January 26, 2026 16:32

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Code Review

The pull request introduces support for Lance namespaces, write retry mechanisms, and driver-side commit flow to Ray Data's LanceDatasink. The changes involve modifying the LanceDatasink and _write_fragment functions to handle new parameters related to namespaces and retries, and adding a new utility module lance_utils.py for namespace management. The pydoclint-baseline.txt file was updated to remove previously reported docstring issues, indicating improved documentation. A new test case was added to verify the correct passing of namespace arguments. Overall, the changes are well-structured and align with the described features. The introduction of call_with_retry and namespace handling enhances the robustness and flexibility of Lance integration.

Comment thread python/ray/data/_internal/datasource/lance_datasink.py Outdated
Comment thread python/ray/data/_internal/datasource/lance_datasink.py Outdated
Comment thread python/ray/data/_internal/datasource/lance_datasink.py
Comment thread python/ray/data/_internal/datasource/lance_datasink.py
@myandpr

myandpr commented Jan 26, 2026

Member Author

Hey @goutamvenkat-anyscale, I implemented the #60147 migration (retry/namespace/driver commit). Could you take a quick look at the PR's approach when you have a moment? I would love your review and to hear whether it aligns with your expectations. Thanks!

@ray-gardener ray-gardener Bot added the data (Ray Data-related issues) and community-contribution (Contributed by the community) labels Jan 26, 2026
@alexeykudinkin alexeykudinkin added the go (add ONLY when ready to merge, run all tests) label Feb 3, 2026
@myandpr

myandpr commented Feb 4, 2026

Member Author

Note: the CI failure looks infra-related (Docker client/daemon API mismatch) and is likely tied to recent infra updates rather than this PR's changes.

Comment thread python/ray/data/_internal/datasource/lance_datasink.py Outdated
Comment thread python/ray/data/_internal/datasource/lance_datasink.py
Comment thread python/ray/data/dataset.py

@goutamvenkat-anyscale goutamvenkat-anyscale left a comment

Contributor

Thanks for the change. Left a few comments.

@@ -77,21 +125,97 @@ class _BaseLanceDatasink(Datasink):

     def __init__(
         self,
-        uri: str,
+        uri: Optional[str] = None,
         schema: Optional[pa.Schema] = None,
         mode: Literal["create", "append", "overwrite"] = "create",

Contributor

Let's use SaveMode enum

Member Author

Updated.

Comment on lines +173 to +177
describe_request = DescribeTableRequest(id=table_id)
describe_response = namespace.describe_table(describe_request)
self.uri = describe_response.location
if describe_response.storage_options:
    merged_storage_options.update(describe_response.storage_options)

Contributor

Append and overwrite seem to be functionally the same?

@myandpr myandpr Feb 6, 2026

Member Author

Thanks for calling this out. This branch was ported from lance-ray and keeps the same mode semantics (source: lance_ray/datasink.py, _BaseLanceDatasink.__init__, append/overwrite handling: https://github.com/lance-format/lance-ray/blob/342949e6ee0f7cfe2355951addfccaae57e39301/lance_ray/datasink.py#L79). They are similar when the table already exists, but behavior differs when it does not: append should fail, while overwrite falls back to _declare_table_with_fallback; commit behavior also differs (LanceOperation.Append vs LanceOperation.Overwrite).

The mode-handling branch has now been removed.

Comment on lines +257 to +269
captured = {}

class _FakeLanceDatasink:
    def __init__(self, path, **kwargs):
        captured["path"] = path
        captured["kwargs"] = kwargs

def _fake_write_datasink(self, datasink, **kwargs):
    captured["datasink"] = datasink
    captured["write_kwargs"] = kwargs

monkeypatch.setattr(ray.data.dataset, "LanceDatasink", _FakeLanceDatasink)
monkeypatch.setattr(ray.data.Dataset, "write_datasink", _fake_write_datasink)

Contributor

Let's create a test fixture to create a fake lancedb

Member Author

Great point. I updated the test to use a fixture-backed fake namespace/LanceDB setup.

self.table_id = table_id
has_namespace_storage_options = True

if mode == "append":

Contributor

Can we separate the different mode handling in a different PR?

Comment thread python/ray/data/_internal/datasource/lance_datasink.py Outdated
Comment thread python/ray/data/tests/datasource/test_lance.py
@myandpr myandpr force-pushed the migrate-lance-ray branch 2 times, most recently from a786aac to 6e68942 Compare February 7, 2026 02:45
Comment thread python/ray/data/_internal/datasource/lance_datasink.py Outdated
Comment thread python/ray/data/_internal/datasource/lance_datasink.py Outdated
@myandpr

myandpr commented Feb 8, 2026

Member Author

Note: the CI failure looks unrelated to this PR (https://app.readthedocs.com/projects/anyscale-ray/builds/3732139/)

Comment thread python/ray/data/_internal/datasource/lance_datasink.py
Comment thread python/ray/data/tests/datasource/test_lance.py Outdated
Comment thread python/ray/data/_internal/datasource/lance_datasink.py Outdated
retry_params = {
    "description": "write lance fragments",
    "match": [],
    "max_attempts": 1,

Contributor

Curious why only 1 max_attempt? Is this what lance_ray is doing?

Member Author

This default was ported from lance-ray. In lance-ray, when retry_params is None, it explicitly falls back to a single attempt (max_attempts=1, max_backoff_s=0):
https://github.com/lance-format/lance-ray/blob/main/lance_ray/fragment.py#L77-L84

In Ray's current main write path, _write_fragment is called from LanceDatasink.write(...) with explicit retry params (now sourced from DataContext.lance_config), so this 1 is effectively only a defensive fallback and not the primary retry strategy.
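
A minimal sketch of the defensive fallback described above. `resolve_retry_params` is a hypothetical helper, not a function from Ray or lance-ray; the key names mirror the `retry_params` dict quoted in this thread. Filling in missing keys for a partially specified dict is an assumption made for the example.

```python
from typing import Any, Dict, Optional


def _single_attempt_params() -> Dict[str, Any]:
    """Build a fresh single-attempt config (no retries, no backoff)."""
    return {
        "description": "write lance fragments",
        "match": [],
        "max_attempts": 1,
        "max_backoff_s": 0,
    }


def resolve_retry_params(
    retry_params: Optional[Dict[str, Any]],
) -> Dict[str, Any]:
    """Return caller-supplied params, or the single-attempt fallback."""
    defaults = _single_attempt_params()
    if retry_params is None:
        return defaults
    # Assumption: missing keys fall back to the single-attempt values.
    return {**defaults, **retry_params}


fallback = resolve_retry_params(None)
explicit = resolve_retry_params({"max_attempts": 10, "max_backoff_s": 32})
```

With explicit params supplied by the datasink, the fallback values only matter for callers that invoke the write helper directly without any retry configuration.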

Comment thread python/ray/data/_internal/datasource/lance_datasink.py
@github-actions

This pull request has been automatically marked as stale because it has not had
any activity for 14 days. It will be closed in another 14 days if no further activity occurs.
Thank you for your contributions.

You can always ask for help on our discussion forum or Ray's public slack channel.

If you'd like to keep this open, just leave any comment, and the stale label will be removed.

@github-actions github-actions Bot added the stale (The issue is stale. It will be closed within 7 days unless there are further conversation) label Feb 26, 2026
@myandpr myandpr removed the stale (The issue is stale. It will be closed within 7 days unless there are further conversation) label Feb 26, 2026

@prrao87 prrao87 left a comment

Made a few observations on the state of lance-ray 0.2.0 (latest release), but overall, looks good. I also think it's worth bumping the pylance SDK version to 2.x as there have been numerous bugfixes and performance improvements since 1.x.

LGTM!

self.table_id = table_id
if mode != SaveMode.CREATE:
    raise ValueError(
        "Namespace writes currently only support mode='create'. "

This currently rejects namespace writes unless mode == SaveMode.CREATE. Since the PR description says this ports lance-ray namespace support, should we also support namespace APPEND and OVERWRITE here? lance-ray handles both by resolving existing table location via describe_table and only declaring when needed.

@myandpr myandpr Mar 9, 2026

Member Author

Thanks for calling this out. This gap is real relative to lance-ray parity, but I kept namespace writes scoped to SaveMode.CREATE here to align with earlier review feedback (#60497 (comment)) to keep mode-handling out of this PR for now. I have updated the Ray Data write_lance() / LanceDatasink docs to make that constraint explicit, and added a test to lock in the current behavior for namespace-backed writes. I plan to treat namespace APPEND / OVERWRITE parity as follow-up work rather than expanding the scope of this PR.
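
The constraint can be sketched as a small validation step. `SaveModeSketch` is a hypothetical stand-in for Ray Data's `SaveMode` enum, `validate_namespace_mode` is an illustrative helper rather than the PR's code, and treating `table_id` as a list of strings is an assumption. The first sentence of the error message is taken from the snippet under review; the rest is invented for the example.

```python
from enum import Enum
from typing import Optional, Sequence


class SaveModeSketch(Enum):
    """Stand-in for Ray Data's SaveMode enum (values assumed)."""
    CREATE = "create"
    APPEND = "append"
    OVERWRITE = "overwrite"


def validate_namespace_mode(
    table_id: Optional[Sequence[str]], mode: SaveModeSketch
) -> None:
    """Reject namespace-backed writes unless the mode is CREATE."""
    if table_id is not None and mode != SaveModeSketch.CREATE:
        raise ValueError(
            "Namespace writes currently only support mode='create'. "
            f"Got mode={mode.value!r}."
        )


# URI-only writes (no table_id) are not restricted by this check.
validate_namespace_mode(None, SaveModeSketch.APPEND)
# Namespace-backed CREATE is accepted.
validate_namespace_mode(["db", "events"], SaveModeSketch.CREATE)
```

A namespace-backed call with `SaveModeSketch.APPEND` or `SaveModeSketch.OVERWRITE` raises `ValueError`, which is the behavior the added test locks in.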

@@ -5442,16 +5445,21 @@ def write_lance(
ds.write_lance("/tmp/data/")

We're adding namespace args to write_lance(), but read_lance() still appears URI-only (python/ray/data/read_api.py was untouched). Should we add namespace-based read parity as well (or explicitly flag the namespace scope in the PR description to state that it focuses on write-only namespace support)? Otherwise the end-to-end namespace workflow remains incomplete compared to lance-ray.

@myandpr myandpr Mar 9, 2026

Member Author

Good catch. I am keeping namespace support in this PR scoped to the write path only, rather than expanding into read_lance() here. That scope follows the target of issue #60147, which is specifically about porting functionality into Ray Data LanceDatasink. I have updated the PR description to make that explicit: this change covers write-side namespace support in Dataset.write_lance() / LanceDatasink, while namespace-based read parity remains out of scope for this PR.

) -> List[Tuple["FragmentMetadata", "pa.Schema"]]:
    import pandas as pd
    from lance.fragment import DEFAULT_MAX_BYTES_PER_FILE, write_fragments

    stream = list(stream)

This materializes all blocks with stream = list(stream), which could increase peak memory for large writes vs. the prior streaming iterator path. Is this required, or can we preserve streaming behavior (or at least add a comment to document the tradeoff) to avoid O(total input size) buffering on the writer?

Member Author

Good point. The original list(stream) was there to make the input replayable under call_with_retry, but it did mean we eagerly materialized all blocks even on the common single-attempt path. I updated _write_fragment() so we now preserve streaming behavior when max_attempts == 1, and only materialize the input stream when retries are actually enabled (max_attempts > 1) and we need replayable input. I also added a focused test to cover both cases.

@prrao87

prrao87 commented Mar 9, 2026

I think the scope of this PR largely makes sense. More feature parity can be incrementally added in future PRs. Thanks for your work @myandpr !

@myandpr

myandpr commented Mar 9, 2026

Member Author

> I think the scope of this PR largely makes sense. More feature parity can be incrementally added in future PRs. Thanks for your work @myandpr !

Thanks for the thoughtful review, @prrao87.

I made a few small follow-up updates and clarifications based on the review comments. I agree that the remaining parity work can be handled incrementally in future PRs. I also plan to keep the pylance / lance-ray 0.2.0 upgrade as a separate follow-up PR so it can be evaluated independently.

@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Comment thread python/ray/data/_internal/datasource/lance_datasink.py
@myandpr myandpr force-pushed the migrate-lance-ray branch from 3663c16 to 3b20c2c Compare March 10, 2026 01:26

@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 2 potential issues.

Comment thread python/ray/data/datasource/file_datasink.py
Comment thread python/ray/data/_internal/datasource/lance_datasink.py
@myandpr

myandpr commented Mar 10, 2026

Member Author

The CI failure is not related to this PR: //python/ray/train:test_xgboost_trainer FAILED in 3 out of 3

@myandpr

myandpr commented Mar 10, 2026

Member Author

Hi @goutamvenkat-anyscale, could you take another look when you have a chance?

Comment thread python/ray/data/_internal/datasource/lance_datasink.py
Comment on lines +77 to +96
max_attempts = retry_params.get("max_attempts", 1)
if max_attempts > 1:
    # Retries need a replayable input stream, so materialize blocks only when
    # the write may be attempted more than once.
    replayable_stream = list(stream)
    if not replayable_stream:
        return []
    first = replayable_stream[0]

    def stream_factory():
        return iter(replayable_stream)

else:
    stream = iter(stream)
    first = next(stream, None)
    if first is None:
        return []

    def stream_factory():
        return chain([first], stream)

Contributor

Let's make this a separate function:

def _make_stream_factory(
    stream: Iterable[Block], replayable: bool
) -> Optional[Callable[[], Iterator[Block]]]:
    """Returns a stream factory, or None if the stream is empty."""
    if replayable:
        blocks = list(stream)
        return (lambda: iter(blocks)) if blocks else None
    else:
        it = iter(stream)
        first = next(it, None)
        return (lambda: chain([first], it)) if first is not None else None

and ideally let's also reduce the amount of function nesting going on so that it's easier to follow
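
A small self-contained demonstration of how the suggested helper behaves in each mode. The function body follows the suggestion above; `Block = Any` is a stand-in for Ray Data's `Block` type, the public name `make_stream_factory` is used only for this example, and the string blocks are made up.

```python
from itertools import chain
from typing import Any, Callable, Iterable, Iterator, Optional

Block = Any  # stand-in for ray.data.block.Block in this sketch


def make_stream_factory(
    stream: Iterable[Block], replayable: bool
) -> Optional[Callable[[], Iterator[Block]]]:
    """Returns a stream factory, or None if the stream is empty."""
    if replayable:
        # Materialize once so every call replays the full input.
        blocks = list(stream)
        return (lambda: iter(blocks)) if blocks else None
    # Streaming path: peek at one block, keep the rest lazy.
    it = iter(stream)
    first = next(it, None)
    return (lambda: chain([first], it)) if first is not None else None


replay = make_stream_factory(iter(["b0", "b1", "b2"]), replayable=True)
first_pass = list(replay())
second_pass = list(replay())  # same blocks again, safe for retries

single = make_stream_factory(iter(["b0", "b1", "b2"]), replayable=False)
only_pass = list(single())
after_exhaustion = list(single())  # only the peeked block is left

empty = make_stream_factory(iter([]), replayable=False)  # None
```

The non-replayable factory is only safe to call once: a second call yields just the peeked first block because the underlying iterator is already exhausted. That is why the streaming path is reserved for the single-attempt case.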

@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 2 potential issues.

Comment thread python/ray/data/_internal/datasource/lance_datasink.py
Comment thread python/ray/data/_internal/datasource/lance_datasink.py


def _make_stream_factory(
    stream: Iterable[Union["pa.Table", "pd.DataFrame"]], replayable: bool

Contributor

Why not just Iterable[Block]?

Member Author

Good point, Block is the better type here. I kept the concrete union when I first extracted the helper, but that was unnecessary. Updated to use Iterable[Block] / Block.

@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Comment thread python/ray/data/_internal/datasource/lance_datasink.py
myandpr added 5 commits March 17, 2026 14:50
Signed-off-by: yaommen <myanstu@163.com>
Signed-off-by: yaommen <myanstu@163.com>
Signed-off-by: yaommen <myanstu@163.com>
Signed-off-by: yaommen <myanstu@163.com>
Signed-off-by: yaommen <myanstu@163.com>
@myandpr myandpr force-pushed the migrate-lance-ray branch from a909fc5 to e72217d Compare March 17, 2026 06:52

@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Comment thread python/ray/data/_internal/datasource/lance_datasink.py
@goutamvenkat-anyscale goutamvenkat-anyscale merged commit e22bdee into ray-project:master Mar 23, 2026
6 checks passed
@myandpr

myandpr commented Mar 24, 2026

Member Author

@goutamvenkat-anyscale @prrao87 Thanks for the thorough review — appreciate it!

ryanaoleary pushed a commit to ryanaoleary/ray that referenced this pull request Mar 25, 2026
Lucas61000 pushed a commit to Lucas61000/ray that referenced this pull request May 15, 2026

Labels

community-contribution (Contributed by the community), data (Ray Data-related issues), go (add ONLY when ready to merge, run all tests)

6 participants