[Data] Convert one-to-one logical operators to frozen dataclasses #61364

Merged
bveeramani merged 2 commits into ray-project:master from myandpr:pr-d1-frozen-one-to-one on Mar 4, 2026

@myandpr myandpr commented Feb 26, 2026

Member

Description

This PR converts the one-to-one logical operators to frozen dataclasses.

Why this is needed:

  • This is the first operator-group step for the frozen logical-operator migration under [Data] Make LogicalPlan stateless and comparable #60312.
  • It removes in-place mutation paths for one-to-one logical operators.
  • It keeps the change scoped to logical-layer behavior needed for D1.

What this PR changes:

  • Converts one-to-one logical operators to frozen dataclasses:
    • Limit
    • Download
  • Applies one-to-one construction cleanup for frozen compatibility:
    • uses InitVar[LogicalOperator] + __post_init__ to initialize _name, _input_dependencies, and _num_outputs
    • makes Download.ray_remote_args a canonical dict field (default_factory=dict)
  • Adds frozen-safe transform behavior:
    • Limit._apply_transform() recreates Limit when input changes
    • Download._apply_transform() recreates Download when input changes
  • Updates optimizer rules to avoid mutating frozen instances:
    • limit_pushdown.py: recreate Limit/Download on input replacement
    • predicate_pushdown.py: recreate Limit on input replacement
  • Adds regression coverage for Limit(Download(...)) under limit pushdown.
  • Scope is intentionally D1-only (one-to-one logical operators); no map/all-to-all or physical-layer changes in this PR.
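
The construction pattern listed above can be sketched roughly as follows. This is a minimal, self-contained illustration rather than the code from this PR: `LogicalOperator` and `Source` are simplified stand-ins, `uri_column` is a hypothetical field name, and using the class name as `Download`'s `_name` is an assumption. Only the `InitVar` + `__post_init__` initialization of `_name`, `_input_dependencies`, and `_num_outputs`, and the `default_factory=dict` default, follow the description.

```python
from dataclasses import FrozenInstanceError, InitVar, dataclass, field
from typing import Any, Dict


class LogicalOperator:
    """Simplified stand-in for the real base class (assumption)."""


class Source(LogicalOperator):
    """Hypothetical leaf operator, used only to have something to wrap."""


@dataclass(frozen=True)
class Limit(LogicalOperator):
    # InitVar: accepted by __init__ and forwarded to __post_init__,
    # but not stored as a dataclass field.
    input_op: InitVar[LogicalOperator]
    limit: int

    def __post_init__(self, input_op: LogicalOperator) -> None:
        # Normal assignment is blocked on a frozen dataclass, so the derived
        # attributes are set through object.__setattr__.
        object.__setattr__(self, "_name", f"limit={self.limit}")
        object.__setattr__(self, "_input_dependencies", [input_op])
        object.__setattr__(self, "_num_outputs", None)


@dataclass(frozen=True)
class Download(LogicalOperator):
    input_op: InitVar[LogicalOperator]
    uri_column: str  # hypothetical field name
    # default_factory gives every instance its own dict.
    ray_remote_args: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self, input_op: LogicalOperator) -> None:
        object.__setattr__(self, "_name", type(self).__name__)  # assumed name
        object.__setattr__(self, "_input_dependencies", [input_op])
        object.__setattr__(self, "_num_outputs", None)


def try_mutate(op: Limit) -> bool:
    """Return True if in-place mutation is rejected."""
    try:
        op.limit = 1  # type: ignore[misc]
    except FrozenInstanceError:
        return True
    return False
```

With this shape, `Limit(Source(), 10)` carries its input in `_input_dependencies`, and any later attribute assignment on the instance is rejected.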

Related issues

Related to #60312.

Additional information

Tests

Added/updated:

  • python/ray/data/tests/test_execution_optimizer_limit_pushdown.py
    • adds regression for Limit(Download(...)) under limit pushdown with frozen operators

Validated with targeted existing tests:

  • python/ray/data/tests/test_execution_optimizer_limit_pushdown.py
  • python/ray/data/tests/test_predicate_pushdown.py
  • python/ray/data/tests/test_operator_fusion.py
  • python/ray/data/tests/test_execution_optimizer_basic.py
  • python/ray/data/tests/test_execution_optimizer_advanced.py
  • python/ray/data/tests/test_projection_fusion.py
  • python/ray/data/tests/test_randomize_block_order.py
  • python/ray/data/tests/test_state_export.py::test_logical_op_args
  • python/ray/data/tests/unit/test_logical_plan.py

Stack Plan

To complete #60312, the original stack was:

  1. [Data] Use input_dependencies property in logical operator #60529
  2. [Data][2/4] Rename logical operator attributes to public #60528
  3. [Data] Drop output_dependencies from logical operators #60530
  4. [Data] Convert logical operators to frozen dataclasses #60531

Since PR4 was still too large, it is being further split:

  1. PR-A: default LogicalOperator naming behavior [Data] Default LogicalOperator name to class name #61020
  2. PR-B: move output_dependencies responsibility to physical side [Data] Move output_dependencies responsibilities to PhysicalOperator #61107
  3. PR-C: make LogicalOperator an ABC with abstract num_outputs [Data] Make LogicalOperator an ABC with abstract num_outputs #61308
  4. PR-D: convert logical operators to frozen dataclasses in small groups (D1/D2/D3)
    • D1: one-to-one operators (this PR)
    • D2: map operators
    • D3: all-to-all + join/read/write groups (as needed)

Planned follow-ups (not blocking this stack):

  • Converting all input_op usage to input_dependencies
  • Potential AbstractFrom restructuring
@myandpr myandpr requested a review from a team as a code owner February 26, 2026 19:39

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Code Review

This pull request refactors the Limit and Download logical operators to be frozen dataclasses, which is a great step towards ensuring the immutability of the logical plan and improving overall robustness. The necessary adjustments in the optimizer rules (limit_pushdown, predicate_pushdown) are correctly implemented, and a new test is added to cover these changes. I have a couple of suggestions to make the optimizer rules more robust and maintainable in light of these changes.

Comment thread python/ray/data/_internal/logical/rules/limit_pushdown.py
Comment thread python/ray/data/_internal/logical/rules/predicate_pushdown.py

@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Comment thread python/ray/data/_internal/logical/operators/one_to_one_operator.py
@ray-gardener ray-gardener Bot added the community-contribution Contributed by the community label Feb 27, 2026
@myandpr myandpr force-pushed the pr-d1-frozen-one-to-one branch from a7be8d5 to 46bcead Compare March 2, 2026 09:45
myandpr commented Mar 2, 2026

Member Author

Heads-up: the current microcheck failure appears unrelated to this PR's D1 changes. The failing test is python/ray/data/tests/test_read_datasource.py::test_read_datasource_compute_strategy, which asserts on read_op._compute (a private field), while Read exposes compute as a public attribute. This may have come from #59633 and can be fixed in a small follow-up PR by switching _compute -> compute in that test.

I have submitted #61423 to fix this issue.

myandpr added 2 commits March 3, 2026 11:05
…semantics

Signed-off-by: yaommen <myanstu@163.com>
@myandpr myandpr force-pushed the pr-d1-frozen-one-to-one branch from 46bcead to d055e17 Compare March 3, 2026 03:06
object.__setattr__(self, "_input_dependencies", [input_op])
object.__setattr__(self, "_num_outputs", None)

def _apply_transform(

Member

What happens if we default to the LogicalOperator._apply_transform implementation and don't override this?

Member Author

We can’t use the default _apply_transform for frozen Limit/Download yet: Operator._apply_transform does copy.copy(self) and then assigns _input_dependencies, which raises FrozenInstanceError on frozen dataclasses. That’s why these one-to-one operators override _apply_transform and rebuild nodes instead of mutating fields.
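
To make the failure mode concrete, here is a small sketch under stated assumptions. The `Operator` base class below only imitates the generic path described in this reply (`copy.copy(self)` followed by assigning `_input_dependencies`); it is not Ray's actual implementation, and `Source`, `NaiveLimit`, and `swap_source` are hypothetical names.

```python
import copy
from dataclasses import FrozenInstanceError, InitVar, dataclass
from typing import Callable


class Operator:
    """Simplified imitation of the generic transform path (assumption)."""

    def _apply_transform(
        self, transform: Callable[["Operator"], "Operator"]
    ) -> "Operator":
        new_inputs = [
            op._apply_transform(transform) for op in self._input_dependencies
        ]
        target = self
        if any(n is not o for n, o in zip(new_inputs, self._input_dependencies)):
            target = copy.copy(self)
            # In-place assignment: rejected by frozen dataclasses.
            target._input_dependencies = new_inputs
        return transform(target)


class Source(Operator):
    """Hypothetical mutable leaf operator."""

    def __init__(self, label: str) -> None:
        self.label = label
        self._input_dependencies = []


@dataclass(frozen=True)
class NaiveLimit(Operator):
    """Frozen operator that relies on the generic path."""

    input_op: InitVar[Operator]
    limit: int

    def __post_init__(self, input_op: Operator) -> None:
        object.__setattr__(self, "_input_dependencies", [input_op])


@dataclass(frozen=True)
class Limit(Operator):
    """Frozen operator that rebuilds itself when its input changes."""

    input_op: InitVar[Operator]
    limit: int

    def __post_init__(self, input_op: Operator) -> None:
        object.__setattr__(self, "_input_dependencies", [input_op])

    def _apply_transform(
        self, transform: Callable[[Operator], Operator]
    ) -> Operator:
        (old_input,) = self._input_dependencies
        new_input = old_input._apply_transform(transform)
        # Rebuild instead of mutating; reuse self when nothing changed.
        target = self if new_input is old_input else Limit(new_input, self.limit)
        return transform(target)


def swap_source(op: Operator) -> Operator:
    """Replace every Source with a fresh one; leave other operators alone."""
    return Source("new") if isinstance(op, Source) else op
```

In this sketch, `NaiveLimit(Source("old"), 5)._apply_transform(swap_source)` raises `FrozenInstanceError` at the assignment, while the overriding `Limit` returns a new node and leaves the original untouched.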

Comment on lines +76 to +78
object.__setattr__(self, "_name", f"limit={self.limit}")
object.__setattr__(self, "_input_dependencies", [input_op])
object.__setattr__(self, "_num_outputs", None)

Member

When LogicalOperator is a pure abstract class (no constructor parameters), I think we should work towards avoiding __setattr__. Maybe it could look like this:

num_outputs: Optional[int] = field(init=False, default=None, repr=False)

@property
def name(self) -> str:
    return f"limit={self.limit}"

@property
def input_dependencies(self):
    return [self.input_op]

Okay as-is because this is a transient state and I trust it'll all look good in the end
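
A runnable version of this suggestion might look like the sketch below. It assumes a hypothetical abstract `LogicalOperator` with `name` and `input_dependencies` as abstract properties, and a hypothetical `Source` leaf; neither is taken from the Ray codebase. The point is only that, once the input is an ordinary field, no `object.__setattr__` calls are needed.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field, replace
from typing import List, Optional


class LogicalOperator(ABC):
    """Hypothetical abstract base with no constructor parameters."""

    @property
    @abstractmethod
    def name(self) -> str:
        ...

    @property
    @abstractmethod
    def input_dependencies(self) -> List["LogicalOperator"]:
        ...


@dataclass(frozen=True)
class Source(LogicalOperator):
    """Hypothetical leaf operator."""

    label: str

    @property
    def name(self) -> str:
        return f"source={self.label}"

    @property
    def input_dependencies(self) -> List[LogicalOperator]:
        return []


@dataclass(frozen=True)
class Limit(LogicalOperator):
    input_op: LogicalOperator  # a regular field instead of an InitVar
    limit: int
    num_outputs: Optional[int] = field(init=False, default=None, repr=False)

    @property
    def name(self) -> str:
        return f"limit={self.limit}"

    @property
    def input_dependencies(self) -> List[LogicalOperator]:
        return [self.input_op]
```

Because `input_op` is a stored field here, `dataclasses.replace(lim, input_op=new_input)` can rebuild the node with a different input while copying `limit` unchanged.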

object.__setattr__(self, "_input_dependencies", [input_op])
object.__setattr__(self, "_num_outputs", None)

def _apply_transform(

Member

Same question here

Member Author

Same root cause as above: default _apply_transform does copy.copy + input_dependencies reassignment, which is invalid for frozen dataclasses; this override keeps Download frozen-safe by rebuilding on input change.

Comment on lines +320 to +322
if isinstance(op, Limit):
    assert len(new_inputs) == 1, len(new_inputs)
    return Limit(new_inputs[0], op.limit)

Member

What's the motivation for this change/how does it relate to this refactor? What happens if we remove it?

Member Author

For predicate_pushdown, the Limit branch is for the same reason: the generic clone path uses copy.copy(op) + input_dependencies reassignment. Removing the Limit special-case would reintroduce FrozenInstanceError when pushing predicates through Limit.

@bveeramani bveeramani enabled auto-merge (squash) March 4, 2026 03:20
@github-actions github-actions Bot added the go add ONLY when ready to merge, run all tests label Mar 4, 2026
@bveeramani bveeramani merged commit 4356f0f into ray-project:master Mar 4, 2026
8 checks passed
bveeramani pushed a commit that referenced this pull request Mar 18, 2026
## Description

This PR implements converting map logical operators to frozen
dataclasses.

#### Why this is needed:

- This is the second operator-group step under #60312.
- It removes in-place mutation paths for map logical operators.
- It keeps the scope limited to logical map operators and the minimum
rule updates needed for frozen compatibility.

#### What this PR changes:

- Converts map logical operators to frozen dataclasses:
  - `MapBatches`
  - `MapRows`
  - `Filter`
  - `Project`
  - `FlatMap`
  - `StreamingRepartition`
- Applies map construction cleanup for frozen compatibility:
  - uses `InitVar[LogicalOperator]` + `__post_init__` to initialize
    `_name`, `_input_dependencies`, and `_num_outputs`
  - uses canonical dict defaults (`default_factory=dict`) for map remote
    args fields
  - keeps `eq=False` intentionally to avoid introducing field-based
    equality/hash semantics for operators with mutable fields (e.g.,
    lists/dicts), and to stay aligned with prior identity-based behavior
- Adds frozen-safe transform behavior for map operators:
  - map operators recreate nodes on input change (no in-place input
    mutation)
- Updates optimizer rules to avoid mutating frozen map operators:
  - `limit_pushdown.py`: uses frozen-safe recreation/replace logic for map
    operators (including per-block-limit path)
  - `predicate_pushdown.py`: uses frozen-safe recreation/replace logic
    when cloning map operators with new inputs
  - these rule changes are required because the generic clone path
    (`copy.copy` + input reassignment / setter mutation) is not valid for
    frozen map operators and can raise `FrozenInstanceError`
- Scope is intentionally D2-only: no all-to-all/join/read/write
conversion in this PR; no physical-layer behavior changes
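
The effect of keeping `eq=False` can be illustrated with plain `dataclasses` behavior. The two classes below are hypothetical and are not Ray operators; they only contrast identity-based semantics with the field-based `__eq__`/`__hash__` that the dataclass default would generate.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass(frozen=True, eq=False)
class IdentityOp:
    """Hypothetical operator that keeps identity-based equality and hashing."""

    fn_name: str
    ray_remote_args: Dict[str, Any] = field(default_factory=dict)


@dataclass(frozen=True)  # eq=True is the dataclass default
class FieldEqOp:
    """Same fields, but with generated field-based __eq__ and __hash__."""

    fn_name: str
    ray_remote_args: Dict[str, Any] = field(default_factory=dict)


def is_hashable(obj: object) -> bool:
    """Return True if hash(obj) succeeds."""
    try:
        hash(obj)
    except TypeError:
        return False
    return True
```

Two `IdentityOp` instances with identical fields still compare unequal and remain usable as dict keys, whereas a `FieldEqOp` compares by value but cannot be hashed, because its generated `__hash__` tries to hash the dict field.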

## Related issues

Related to #60312.

## Additional information

### Tests
Validated with targeted existing tests

### Stack Plan

To complete [#60312](#60312),
the original stack was:

1. #60529
2. #60528
3. #60530
4. #60531

Since PR4 was still too large, it is being further split:

1. PR-A: default `LogicalOperator` naming behavior
#61020
2. PR-B: move `output_dependencies` responsibility to physical side
#61107
3. PR-C: make `LogicalOperator` an ABC with abstract `num_outputs`
#61308
4. PR-D: convert logical operators to frozen dataclasses in small groups
   (D1/D2/D3)
    - D1: one-to-one operators #61364
    - D2: map operators (this PR)
    - D3: all-to-all + join/read/write groups (as needed)

Planned follow-ups (not blocking this stack):
- Converting all `input_op` usage to `input_dependencies`
- Potential `AbstractFrom` restructuring

---------

Signed-off-by: yaommen <myanstu@163.com>
ryanaoleary pushed a commit to ryanaoleary/ray that referenced this pull request Mar 25, 2026
bveeramani pushed a commit that referenced this pull request Apr 13, 2026
#62321)

## Description

This PR implements converting all-to-all, join, read, and write logical
operators to frozen dataclasses.

#### Why this is needed:

- This is the D3 operator-group step under #60312.
- It removes in-place mutation paths for these logical operators.
- It keeps the scope limited to logical all-to-all/join/read/write
operators and the minimum rule updates needed for frozen compatibility.

#### What this PR changes:

- Converts logical operators to frozen dataclasses:
  - all-to-all logical operators:
    - `RandomizeBlocks`
    - `RandomShuffle`
    - `Repartition`
    - `Sort`
    - `Aggregate`
  - `Join`
  - `Read`
  - `Write`
- Applies construction cleanup for frozen compatibility:
  - uses `InitVar[LogicalOperator]` + `__post_init__` where needed to
    initialize `_name`, `_input_dependencies`, and `_num_outputs`
  - keeps `eq=False` intentionally to avoid introducing field-based
    equality/hash semantics as part of this PR
- Adds frozen-safe transform behavior for these operators:
  - operators recreate nodes on input change instead of mutating inputs in
    place
- Updates optimizer rules to avoid mutating frozen instances:
  - `inherit_batch_format.py`: rebuilds frozen all-to-all operators when
    inheriting batch format
  - `limit_pushdown.py`: uses frozen-safe recreation/replace logic for
    `Read` per-block-limit handling
  - `predicate_pushdown.py`: uses frozen-safe recreation/replace logic for
    frozen all-to-all operators and `Join`
  - `set_read_parallelism.py`: adapts `Read` parallelism setting for the
    frozen `Read` operator shape
- Scope is intentionally D3-only:
  - no `input_op` -> `input_dependencies` cleanup in this PR
  - no `AbstractFrom` restructuring in this PR
  - no equality/comparability semantics changes in this PR
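
The commit message does not say whether the rules rebuild operators through their constructors or through `dataclasses.replace`. As a hedged illustration of the second option, the sketch below shows one standard-library detail worth knowing: an `InitVar` is not stored on the instance, so `replace()` must be handed the input again. `Source`, `Repartition`'s `num_blocks` field, and the helper functions are hypothetical names.

```python
from dataclasses import InitVar, dataclass, replace


class Source:
    """Hypothetical leaf operator."""


@dataclass(frozen=True, eq=False)
class Repartition:
    """Hypothetical, heavily simplified frozen operator."""

    input_op: InitVar[object]
    num_blocks: int  # hypothetical field name

    def __post_init__(self, input_op: object) -> None:
        object.__setattr__(self, "_input_dependencies", [input_op])


def with_input(op: Repartition, new_input: object) -> Repartition:
    """Rebuild the operator on top of a different input."""
    return replace(op, input_op=new_input)


def with_num_blocks(op: Repartition, num_blocks: int) -> Repartition:
    """Change a field while keeping the current input."""
    (current_input,) = op._input_dependencies
    # The init-only input has to be passed explicitly.
    return replace(op, input_op=current_input, num_blocks=num_blocks)


def replace_without_input_fails(op: Repartition) -> bool:
    """Return True if replace() rejects a call that omits the InitVar."""
    try:
        replace(op, num_blocks=1)
    except (TypeError, ValueError):  # the exception type varies by Python version
        return True
    return False
```

Both helpers return new instances and leave the original operator unchanged, which matches the "recreate instead of mutate" behavior described above.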

## Related issues

Related to #60312.

## Additional information

### Tests

Validated with targeted existing tests:
- `python/ray/data/tests/test_execution_optimizer_advanced.py`
- `python/ray/data/tests/test_join.py`

### Stack Plan

To complete [#60312](#60312),
the original stack was:

1. #60529
2. #60528
3. #60530
4. #60531

Since PR4 was still too large, it is being further split:

1. PR-A: default `LogicalOperator` naming behavior
#61020
2. PR-B: move `output_dependencies` responsibility to physical side
#61107
3. PR-C: make `LogicalOperator` an ABC with abstract `num_outputs`
#61308
4. PR-D: convert logical operators to frozen dataclasses in small groups
   (D1/D2/D3)
    - D1: one-to-one operators #61364
    - D2: map operators #61481
    - D3: all-to-all + join/read/write operators (this PR)

Planned follow-ups (not blocking this stack):
- Converting all `input_op` usage to `input_dependencies`
- Potential `AbstractFrom` restructuring

---------

Signed-off-by: yaommen <myanstu@163.com>
bveeramani added a commit that referenced this pull request Apr 13, 2026
…es (#62400)

## Description

This PR implements converting the remaining source/simple logical
operators to frozen dataclasses.

#### Why this is needed:

- This is the next operator-group step under #60312 after D1 / D2 / D3.
- It removes in-place mutation paths for the remaining source/simple
logical operators not yet covered by the frozen logical-operator
migration.
- It keeps the scope limited to these logical operators only, without
mixing in follow-up source-operator restructuring or API cleanup.

#### What this PR changes:

- Converts the remaining source/simple logical operators to frozen
dataclasses:
  - `InputData`
  - `Count`
  - `AbstractFrom` and its subclasses:
    - `FromItems`
    - `FromBlocks`
    - `FromNumpy`
    - `FromArrow`
    - `FromPandas`
- Applies frozen construction cleanup for these operators:
  - initializes `_name`, `_input_dependencies`, and `_num_outputs` in
    `__post_init__`
  - keeps `eq=False` intentionally to stay aligned with the current
    transitional operator-group pattern
- Adds frozen-safe transform behavior for `Count`:
  - `Count._apply_transform()` recreates `Count` when the input changes
- Keeps `AbstractFrom` subclasses structurally unchanged in this PR:
  - no source hierarchy restructuring
  - no subclass removal
  - no `input_op` -> `input_dependencies` cleanup
- Scope is intentionally limited to D4-only frozen migration for the
remaining source/simple logical operators.

## Related issues

Related to #60312.

## Additional information

### Tests

Validated with targeted existing tests:
- `python/ray/data/tests/test_split.py`
- `python/ray/data/tests/test_operator_fusion.py`
- `python/ray/data/tests/test_execution_optimizer_basic.py`

### Stack Plan

To complete [#60312](#60312),
the original stack was:

1. #60529
2. #60528
3. #60530
4. #60531

Since PR4 was still too large, it is being further split:

1. PR-A: default `LogicalOperator` naming behavior
#61020
2. PR-B: move `output_dependencies` responsibility to physical side
#61107
3. PR-C: make `LogicalOperator` an ABC with abstract `num_outputs`
#61308
4. PR-D: convert logical operators to frozen dataclasses in small groups
    - D1: one-to-one operators #61364
    - D2: map operators #61481
    - D3: all-to-all + join/read/write operators #62321
    - D4: remaining source/simple logical operators (this PR)

Planned follow-ups (not blocking this stack):
- Converting all `input_op` usage to `input_dependencies`
- Potential `AbstractFrom` restructuring
- Equality/comparability follow-up

---------

Signed-off-by: yaommen <myanstu@163.com>
Signed-off-by: Balaji Veeramani <balaji@anyscale.com>
Co-authored-by: Balaji Veeramani <balaji@anyscale.com>
HLDKNotFound pushed a commit to chichic21039/ray that referenced this pull request Apr 22, 2026
HLDKNotFound pushed a commit to chichic21039/ray that referenced this pull request Apr 22, 2026
Lucas61000 pushed a commit to Lucas61000/ray that referenced this pull request May 15, 2026
Lucas61000 pushed a commit to Lucas61000/ray that referenced this pull request May 15, 2026
Lucas61000 pushed a commit to Lucas61000/ray that referenced this pull request May 15, 2026
Labels

community-contribution Contributed by the community go add ONLY when ready to merge, run all tests

2 participants