[Data] - Port over changes from lance-ray into Ray Data#60497
Code Review
The pull request introduces support for Lance namespaces, write retry mechanisms, and driver-side commit flow to Ray Data's LanceDatasink. The changes involve modifying the LanceDatasink and _write_fragment functions to handle new parameters related to namespaces and retries, and adding a new utility module lance_utils.py for namespace management. The pydoclint-baseline.txt file was updated to remove previously reported docstring issues, indicating improved documentation. A new test case was added to verify the correct passing of namespace arguments. Overall, the changes are well-structured and align with the described features. The introduction of call_with_retry and namespace handling enhances the robustness and flexibility of Lance integration.
Hey @goutamvenkat-anyscale, I implemented the #60147 migration (retry/namespace/driver commit). Could you take a quick look at the PR's approach when you have a moment? I'd love your review and to hear whether it aligns with your expectations. Thanks!
Force-pushed: 3d21e0b to 1c5363e
Note: the CI failure looks infra-related (Docker client/daemon API mismatch) and is likely tied to recent infra updates, not this PR's changes.
Force-pushed: 46f1ffd to 096ba61
Force-pushed: 096ba61 to a7e60e7
goutamvenkat-anyscale left a comment:
Thanks for the change. Left a few comments.
    @@ -77,21 +125,97 @@ class _BaseLanceDatasink(Datasink):

         def __init__(
             self,
    -        uri: str,
    +        uri: Optional[str] = None,
             schema: Optional[pa.Schema] = None,
             mode: Literal["create", "append", "overwrite"] = "create",
Let's use SaveMode enum
    describe_request = DescribeTableRequest(id=table_id)
    describe_response = namespace.describe_table(describe_request)
    self.uri = describe_response.location
    if describe_response.storage_options:
        merged_storage_options.update(describe_response.storage_options)
Append and overwrite seem to be functionally the same?
Thanks for calling this out. This branch was ported from lance-ray and keeps the same mode semantics (source: lance_ray/datasink.py, _BaseLanceDatasink.__init__, append/overwrite handling: https://github.com/lance-format/lance-ray/blob/342949e6ee0f7cfe2355951addfccaae57e39301/lance_ray/datasink.py#L79). The two modes behave similarly when the table already exists, but differ when it does not: append should fail, while overwrite falls back to _declare_table_with_fallback. Commit behavior also differs (LanceOperation.Append vs LanceOperation.Overwrite).
Update: the mode-handling branch has since been removed.
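The append/overwrite split described in this reply can be sketched with an in-memory stand-in. This is only an illustration under stated assumptions: `FakeNamespace`, `resolve_table_location`, the `memory://` locations, and the use of `KeyError` for a missing table are all hypothetical and are not lance-ray or Ray Data APIs.

```python
from typing import Dict, Optional


class FakeNamespace:
    """Hypothetical in-memory stand-in for a Lance namespace client."""

    def __init__(self, tables: Optional[Dict[str, str]] = None):
        self.tables = dict(tables or {})

    def describe_table(self, table_id: str) -> str:
        # Raises KeyError for an unknown table (a real client would raise its own error type).
        return self.tables[table_id]

    def declare_table(self, table_id: str) -> str:
        location = f"memory://{table_id}"
        self.tables[table_id] = location
        return location


def resolve_table_location(ns: FakeNamespace, table_id: str, mode: str) -> str:
    """Resolve the write location, following the append/overwrite split above."""
    if mode == "append":
        # Append requires an existing table, so a missing table is an error.
        return ns.describe_table(table_id)
    if mode == "overwrite":
        # Overwrite reuses the existing location, or declares the table if absent.
        try:
            return ns.describe_table(table_id)
        except KeyError:
            return ns.declare_table(table_id)
    raise ValueError(f"unsupported mode: {mode!r}")
```

With an existing table both modes resolve to the same location; only the missing-table case separates them.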
    captured = {}

    class _FakeLanceDatasink:
        def __init__(self, path, **kwargs):
            captured["path"] = path
            captured["kwargs"] = kwargs

    def _fake_write_datasink(self, datasink, **kwargs):
        captured["datasink"] = datasink
        captured["write_kwargs"] = kwargs

    monkeypatch.setattr(ray.data.dataset, "LanceDatasink", _FakeLanceDatasink)
    monkeypatch.setattr(ray.data.Dataset, "write_datasink", _fake_write_datasink)
Let's create a test fixture to create a fake lancedb
Great point. I updated the test to use a fixture-backed fake namespace/LanceDB setup.
    self.table_id = table_id
    has_namespace_storage_options = True

    if mode == "append":
Can we separate the different mode handling in a different PR?
Force-pushed: a786aac to 6e68942
Force-pushed: 6e68942 to 7c2d623
Force-pushed: 7c2d623 to 26d5a54
Note: the CI failure looks unrelated to this PR (https://app.readthedocs.com/projects/anyscale-ray/builds/3732139/).
    retry_params = {
        "description": "write lance fragments",
        "match": [],
        "max_attempts": 1,
Curious why only 1 max_attempt? Is this what lance_ray is doing?
This default was ported from lance-ray. In lance-ray, when retry_params is None, it explicitly falls back to a single attempt (max_attempts=1, max_backoff_s=0):
https://github.com/lance-format/lance-ray/blob/main/lance_ray/fragment.py#L77-L84
In Ray’s current main write path, _write_fragment is called from LanceDatasink.write(...) with explicit retry params (now sourced from DataContext.lance_config), so this value of 1 is effectively only a defensive fallback and not the primary retry strategy.
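A minimal sketch of the fallback described here, using only the standard library. The dictionary keys come from the snippet under review; `resolve_retry_params` and `run_with_retry` are hypothetical names, and the substring matching on the error message is an assumption for illustration, not a copy of Ray's retry helper.

```python
import time
from typing import Any, Callable, Dict, Optional

# Fallback used when no retry parameters are supplied: one attempt, no backoff.
_FALLBACK_RETRY_PARAMS: Dict[str, Any] = {
    "description": "write lance fragments",
    "match": [],
    "max_attempts": 1,
    "max_backoff_s": 0,
}


def resolve_retry_params(retry_params: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Return a copy of the caller's params, or the single-attempt fallback."""
    return dict(_FALLBACK_RETRY_PARAMS) if retry_params is None else dict(retry_params)


def run_with_retry(fn: Callable[[], Any], params: Dict[str, Any]) -> Any:
    """Hypothetical stand-in for a retry wrapper (not Ray's implementation)."""
    max_attempts = params.get("max_attempts", 1)
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as exc:
            # Assumption: an error is retryable if its message contains a pattern.
            retryable = any(pattern in str(exc) for pattern in params.get("match", []))
            if not retryable or attempt == max_attempts:
                raise
            # Exponential backoff, capped by max_backoff_s.
            time.sleep(min(2 ** (attempt - 1), params.get("max_backoff_s", 0)))
```

With the fallback params the wrapped function runs exactly once and any error propagates; retries only happen when the caller passes explicit params with `max_attempts` greater than 1.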
Force-pushed: 26d5a54 to e07f2d7
This pull request has been automatically marked as stale because it has not had [...]. You can always ask for help on our discussion forum or Ray's public Slack channel. If you'd like to keep this open, just leave any comment, and the stale label will be removed.
prrao87 left a comment:
Made a few observations on the state of lance-ray 0.2.0 (latest release), but overall, looks good. I also think it's worth bumping the pylance SDK version to 2.x as there have been numerous bugfixes and performance improvements since 1.x.
LGTM!
    self.table_id = table_id
    if mode != SaveMode.CREATE:
        raise ValueError(
            "Namespace writes currently only support mode='create'. "
This currently rejects namespace writes unless mode == SaveMode.CREATE. Since the PR description says this ports lance-ray namespace support, should we also support namespace APPEND and OVERWRITE here? lance-ray handles both by resolving existing table location via describe_table and only declaring when needed.
Thanks for calling this out. This gap is real relative to lance-ray parity, but I kept namespace writes scoped to SaveMode.CREATE here to align with earlier review feedback (#60497 (comment)) to keep mode-handling out of this PR for now. I have updated the Ray Data write_lance() / LanceDatasink docs to make that constraint explicit, and added a test to lock in the current behavior for namespace-backed writes. I plan to treat namespace APPEND / OVERWRITE parity as follow-up work rather than expanding the scope of this PR.
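The constraint discussed in this thread can be sketched as a small guard. The `SaveMode` enum below is an illustrative stand-in (the real Ray Data enum may have different members), `validate_namespace_mode` is a hypothetical helper name, and typing `table_id` as a list of strings is an assumption. Only the first sentence of the error message comes from the snippet under review.

```python
from enum import Enum
from typing import List, Optional


class SaveMode(str, Enum):
    """Illustrative stand-in; the real Ray Data enum may differ."""

    CREATE = "create"
    APPEND = "append"
    OVERWRITE = "overwrite"


def validate_namespace_mode(table_id: Optional[List[str]], mode: SaveMode) -> None:
    """Reject namespace-backed writes unless the mode is CREATE."""
    # URI-only writes (no table_id) are not restricted by this check.
    if table_id is not None and mode != SaveMode.CREATE:
        raise ValueError(
            "Namespace writes currently only support mode='create'. "
            f"Got mode={mode.value!r}."
        )
```

Failing fast in the constructor keeps the unsupported combinations visible to callers until APPEND / OVERWRITE parity lands in a follow-up.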
    @@ -5442,16 +5445,21 @@ def write_lance(
        ds.write_lance("/tmp/data/")
We're adding namespace args to write_lance(), but read_lance() still appears URI-only (python/ray/data/read_api.py was untouched). Should we add namespace-based read parity as well (or explicitly flag the namespace scope in the PR description to state that it focuses on write-only namespace support)? Otherwise the end-to-end namespace workflow remains incomplete compared to lance-ray.
Good catch. I am keeping namespace support in this PR scoped to the write path only, rather than expanding into read_lance() here. That scope follows the target of issue #60147, which is specifically about porting functionality into Ray Data LanceDatasink. I have updated the PR description to make that explicit: this change covers write-side namespace support in Dataset.write_lance() / LanceDatasink, while namespace-based read parity remains out of scope for this PR.
    ) -> List[Tuple["FragmentMetadata", "pa.Schema"]]:
        import pandas as pd
        from lance.fragment import DEFAULT_MAX_BYTES_PER_FILE, write_fragments

        stream = list(stream)
This materializes all blocks with stream = list(stream), which could increase peak memory for large writes compared with the prior streaming iterator path. Is this required, or can we preserve streaming behavior (or at least add a comment documenting the tradeoff) to avoid O(total input size) buffering on the writer?
Good point. The original list(stream) was there to make the input replayable under call_with_retry, but it did mean we eagerly materialized all blocks even on the common single-attempt path. I updated _write_fragment() so we now preserve streaming behavior when max_attempts == 1, and only materialize the input stream when retries are actually enabled (max_attempts > 1) and we need replayable input. I also added a focused test to cover both cases.
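To illustrate why retries need replayable input, here is a small sketch with integers standing in for blocks. The names `one_shot_blocks`, `streaming_factory`, and `replayable_factory` are hypothetical, and the empty-stream check from the real code is omitted for brevity.

```python
from itertools import chain
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def one_shot_blocks() -> Iterator[int]:
    """Stand-in for an input stream of blocks; integers keep the example small."""
    yield from (1, 2, 3)


def streaming_factory(stream: Iterable[T]) -> Callable[[], Iterator[T]]:
    """Single-attempt path: peek at the first item without buffering the rest.

    Assumes a non-empty stream.
    """
    it = iter(stream)
    first = next(it)
    return lambda: chain([first], it)


def replayable_factory(stream: Iterable[T]) -> Callable[[], Iterator[T]]:
    """Retry path: buffer everything so each attempt sees the full input."""
    buffered: List[T] = list(stream)
    return lambda: iter(buffered)


streaming = streaming_factory(one_shot_blocks())
first_pass = list(streaming())
second_pass = list(streaming())  # the underlying generator is already exhausted

replayable = replayable_factory(one_shot_blocks())
replay_passes = [list(replayable()), list(replayable())]
```

In this sketch the second pass over the streaming factory yields only the peeked first item, so a retried write would see truncated input. The buffered variant replays the full input on every attempt, at the cost of holding all blocks in memory.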
I think the scope of this PR largely makes sense. More feature parity can be incrementally added in future PRs. Thanks for your work, @myandpr!
Thanks for the thoughtful review, @prrao87. I made a few small follow-up updates and clarifications based on the review comments. I agree that the remaining parity work can be handled incrementally in future PRs. I also plan to keep the pylance / lance-ray 0.2.0 upgrade as a separate follow-up PR so it can be evaluated independently.
Force-pushed: 3663c16 to 3b20c2c
The CI failure is not related to the current PR.
Hi @goutamvenkat-anyscale, could you take another look when you have a chance?
    max_attempts = retry_params.get("max_attempts", 1)
    if max_attempts > 1:
        # Retries need a replayable input stream, so materialize blocks only when
        # the write may be attempted more than once.
        replayable_stream = list(stream)
        if not replayable_stream:
            return []
        first = replayable_stream[0]

        def stream_factory():
            return iter(replayable_stream)

    else:
        stream = iter(stream)
        first = next(stream, None)
        if first is None:
            return []

        def stream_factory():
            return chain([first], stream)
Let's make this a separate function:
    def _make_stream_factory(
        stream: Iterable[Block], replayable: bool
    ) -> Optional[Callable[[], Iterator[Block]]]:
        """Returns a stream factory, or None if the stream is empty."""
        if replayable:
            blocks = list(stream)
            return (lambda: iter(blocks)) if blocks else None
        else:
            it = iter(stream)
            first = next(it, None)
            return (lambda: chain([first], it)) if first is not None else None
Ideally, let's also reduce the amount of function nesting so that it's easier to follow.
    def _make_stream_factory(
        stream: Iterable[Union["pa.Table", "pd.DataFrame"]], replayable: bool
Why not just Iterable[Block]?
Good point, Block is the better type here. I kept the concrete union when I first extracted the helper, but that was unnecessary. Updated to use Iterable[Block] / Block.
Signed-off-by: yaommen <myanstu@163.com>
Force-pushed: a909fc5 to e72217d
@goutamvenkat-anyscale @prrao87 Thanks for the thorough review; much appreciated!
…60497)

## Description

Port lance-ray datasink features into Ray Data LanceDatasink: write retry, Lance namespaces, and driver-side commit flow.

## Related issues

> Link related issues: "Fixes ray-project#60147", "Closes ray-project#60147", or "Related to ray-project#60147".

## Additional information

#### implementation details

- Write retry
  - Logic/parameters (from lance-ray): retry on LanceError(IO) + DataContext.retried_io_errors, with max_attempts=10 and max_backoff_s=32.
  - Execution framework (Ray-native): use ray._common.retry.call_with_retry to wrap write_fragments.
- Lance namespaces (from lance-ray)
  - This PR focuses on write-side namespace support in Ray Data (`Dataset.write_lance()` / `LanceDatasink`).
  - Add table_id / namespace_impl / namespace_properties to LanceDatasink.
  - Resolve/declare tables via DescribeTableRequest / DeclareTableRequest / CreateEmptyTableRequest.
  - Merge user storage_options with namespace-provided options.
  - Create and pass storage_options_provider to write_fragments, LanceDataset, and commit.
  - New helper module: python/ray/data/_internal/datasource/lance_utils.py.
  - Namespace read parity via `read_lance()` is not included in this PR.
- Driver commit (aligned with lance-ray)
  - Keep the same driver-side commit behavior as lance-ray: collect fragments and use the last observed schema from write results, then commit.
  - This does not implement true schema merge/unification, since lance-ray itself doesn’t either.
- High-level API
  - Dataset.write_lance() now accepts and forwards table_id / namespace_impl / namespace_properties.
- Tests
  - Added a parameterized test to verify write_lance() forwards namespace arguments.

#### Testing

- python -m pytest python/ray/data/tests/datasource/test_lance.py -q

#### Notes

- Requires a lance/pylance version that supports storage_options_provider (validated locally with 1.0.3).

---------

Signed-off-by: yaommen <myanstu@163.com>
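Two of the steps in this description, merging storage options and picking the schema for the driver-side commit, can be sketched in plain Python. The helper names `merge_storage_options` and `collect_for_commit` are hypothetical, and the shape of the write results (one list of `(fragment, schema)` pairs per write task) is an assumption based on the `_write_fragment` return type quoted in the review. Letting namespace-provided options override user options follows the `merged_storage_options.update(...)` call in the reviewed snippet.

```python
from typing import Any, Dict, List, Optional, Tuple


def merge_storage_options(
    user_options: Optional[Dict[str, str]],
    namespace_options: Optional[Dict[str, str]],
) -> Dict[str, str]:
    """Start from user options, then let namespace-provided options override."""
    merged = dict(user_options or {})
    if namespace_options:
        merged.update(namespace_options)
    return merged


def collect_for_commit(
    write_results: List[List[Tuple[Any, Any]]],
) -> Tuple[List[Any], Optional[Any]]:
    """Flatten per-task (fragment, schema) pairs and keep the last observed schema.

    No schema merge or unification is attempted, matching the description above.
    """
    fragments: List[Any] = []
    schema: Optional[Any] = None
    for task_result in write_results:
        for fragment, fragment_schema in task_result:
            fragments.append(fragment)
            schema = fragment_schema
    return fragments, schema
```

Because the last schema wins, this approach relies on every write task producing a compatible schema; tasks with diverging schemas would not be reconciled.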