[Data] Convert one-to-one logical operators to frozen dataclasses#61364
Code Review
This pull request refactors the Limit and Download logical operators to be frozen dataclasses, which is a great step towards ensuring the immutability of the logical plan and improving overall robustness. The necessary adjustments in the optimizer rules (limit_pushdown, predicate_pushdown) are correctly implemented, and a new test is added to cover these changes. I have a couple of suggestions to make the optimizer rules more robust and maintainable in light of these changes.
Heads-up: the current microcheck failure appears unrelated to this PR's D1 changes. The failing test is … I have submitted #61423 to fix this issue.
Signed-off-by: yaommen <myanstu@163.com>
…semantics Signed-off-by: yaommen <myanstu@163.com>
```python
object.__setattr__(self, "_input_dependencies", [input_op])
object.__setattr__(self, "_num_outputs", None)


def _apply_transform(
```
What happens if we default to the LogicalOperator._apply_transform implementation and don't override this?
We can’t use the default `_apply_transform` for frozen `Limit`/`Download` yet: `Operator._apply_transform` does `copy.copy(self)` and then assigns `_input_dependencies`, which raises `FrozenInstanceError` on frozen dataclasses. That’s why these one-to-one operators override `_apply_transform` and rebuild nodes instead of mutating fields.
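A minimal sketch of the failure mode described above, using a hypothetical `ToyOp` class rather than Ray's real operators: `copy.copy` on a frozen dataclass succeeds, but the follow-up attribute assignment raises `FrozenInstanceError`, so the only safe way to change inputs is to construct a new node.

```python
import copy
from dataclasses import FrozenInstanceError, InitVar, dataclass, field
from typing import Any, List, Optional


@dataclass(frozen=True, eq=False)
class ToyOp:
    """Hypothetical frozen operator; a stand-in for Limit/Download, not Ray code."""

    input_op: InitVar[Optional[Any]]
    limit: int
    _input_dependencies: List[Any] = field(init=False, repr=False)

    def __post_init__(self, input_op: Optional[Any]) -> None:
        deps = [] if input_op is None else [input_op]
        # The frozen guard is bypassed only here, during construction.
        object.__setattr__(self, "_input_dependencies", deps)


def copy_then_assign(op: Any, new_inputs: List[Any]) -> Any:
    """Mimics the generic clone path: shallow copy, then reassign the inputs."""
    clone = copy.copy(op)  # copying a frozen dataclass itself works
    clone._input_dependencies = new_inputs  # this assignment is what fails
    return clone


def clone_raises_frozen_error(op: Any, new_inputs: List[Any]) -> bool:
    """Returns True if the copy-then-assign path is rejected for this operator."""
    try:
        copy_then_assign(op, new_inputs)
    except FrozenInstanceError:
        return True
    return False
```

The frozen-safe alternative is simply `ToyOp(new_input, op.limit)`: a fresh node built from the old node's fields.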
```python
object.__setattr__(self, "_name", f"limit={self.limit}")
object.__setattr__(self, "_input_dependencies", [input_op])
object.__setattr__(self, "_num_outputs", None)
```
When `LogicalOperator` is a pure abstract class (no constructor parameters), I think we should work towards avoiding `__setattr__`. Maybe it could look like this:

```python
num_outputs: Optional[int] = field(init=False, default=None, repr=False)

@property
def name(self) -> str:
    return f"limit={self.limit}"

@property
def input_dependencies(self):
    return [self.input_op]
```

Okay as-is because this is a transient state and I trust it'll all look good in the end.
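The reviewer's property-based direction can be fleshed out into a self-contained sketch. This is hypothetical, not Ray's code: `ToyLogicalOperator` stands in for a constructor-less abstract `LogicalOperator`, and `ToySource`/`ToyLimit` are invented names. Because `name` and `input_dependencies` are derived from ordinary dataclass fields, neither `__post_init__` nor `object.__setattr__` is needed.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import List, Optional


class ToyLogicalOperator(ABC):
    """Hypothetical constructor-less base standing in for LogicalOperator."""

    @property
    @abstractmethod
    def name(self) -> str: ...

    @property
    @abstractmethod
    def input_dependencies(self) -> List["ToyLogicalOperator"]: ...


@dataclass(frozen=True, eq=False)
class ToySource(ToyLogicalOperator):
    @property
    def name(self) -> str:
        return "source"

    @property
    def input_dependencies(self) -> List[ToyLogicalOperator]:
        return []  # a source has no upstream operators


@dataclass(frozen=True, eq=False)
class ToyLimit(ToyLogicalOperator):
    input_op: ToyLogicalOperator  # stored as a real field, not an InitVar
    limit: int
    num_outputs: Optional[int] = field(init=False, default=None, repr=False)

    @property
    def name(self) -> str:
        # Computed on access, so nothing has to be written after construction.
        return f"limit={self.limit}"

    @property
    def input_dependencies(self) -> List[ToyLogicalOperator]:
        return [self.input_op]
```

One design consequence of this shape: rebuilding on a new input becomes `dataclasses.replace(op, input_op=new_input)`, since `input_op` is a regular field.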
```python
object.__setattr__(self, "_input_dependencies", [input_op])
object.__setattr__(self, "_num_outputs", None)


def _apply_transform(
```
Same root cause as above: default `_apply_transform` does `copy.copy` + `input_dependencies` reassignment, which is invalid for frozen dataclasses; this override keeps `Download` frozen-safe by rebuilding on input change.
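One way to express "rebuild on input change" for a frozen operator is `dataclasses.replace`, sketched below with a hypothetical `ToyDownload` (the field name `uri_column_name` is an assumption; Ray's `Download` may differ). `replace()` calls `__init__` again, so the `InitVar` input must be passed explicitly, `init=False` fields are recomputed by `__post_init__`, and the remaining fields are copied over by reference.

```python
import dataclasses
from dataclasses import InitVar, dataclass, field
from typing import Any, Dict, List, Optional


@dataclass(frozen=True, eq=False)
class ToyDownload:
    """Hypothetical frozen one-to-one operator; field names are assumptions."""

    input_op: InitVar[Optional[Any]]
    uri_column_name: str
    ray_remote_args: Dict[str, Any] = field(default_factory=dict)
    _input_dependencies: List[Any] = field(init=False, repr=False)

    def __post_init__(self, input_op: Optional[Any]) -> None:
        deps = [] if input_op is None else [input_op]
        object.__setattr__(self, "_input_dependencies", deps)

    def with_new_input(self, new_input: Any) -> "ToyDownload":
        """Builds a new node instead of mutating this one."""
        if self._input_dependencies == [new_input]:
            return self  # input unchanged: reuse the existing node
        # replace() re-runs __init__: the InitVar must be given explicitly,
        # _input_dependencies is recomputed in __post_init__, and other fields
        # are copied by reference (the ray_remote_args dict is shared).
        return dataclasses.replace(self, input_op=new_input)
```

Note that the copy is shallow: if the remote-args dict were ever mutated in place, both nodes would see it, which is another reason the stack treats operators as immutable.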
```python
if isinstance(op, Limit):
    assert len(new_inputs) == 1, len(new_inputs)
    return Limit(new_inputs[0], op.limit)
```
What's the motivation for this change/how does it relate to this refactor? What happens if we remove it?
For `predicate_pushdown`, the `Limit` branch exists for the same reason: the generic clone path uses `copy.copy(op)` + `input_dependencies` reassignment. Removing the `Limit` special case would reintroduce `FrozenInstanceError` when pushing predicates through `Limit`.
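A hypothetical sketch of why the special case matters: a clone helper that dispatches on operator type, rebuilding frozen operators and falling back to copy-then-assign for mutable ones. `MutableOp`, `FrozenLimit`, and `clone_with_new_inputs` are invented names for illustration; Ray's actual rule is structured differently.

```python
import copy
from dataclasses import InitVar, dataclass, field
from typing import Any, List


class MutableOp:
    """Hypothetical non-frozen operator: copy-then-assign still works for it."""

    def __init__(self, inputs: List[Any]) -> None:
        self._input_dependencies = list(inputs)


@dataclass(frozen=True, eq=False)
class FrozenLimit:
    """Hypothetical frozen Limit stand-in."""

    input_op: InitVar[Any]
    limit: int
    _input_dependencies: List[Any] = field(init=False, repr=False)

    def __post_init__(self, input_op: Any) -> None:
        object.__setattr__(self, "_input_dependencies", [input_op])


def clone_with_new_inputs(op: Any, new_inputs: List[Any]) -> Any:
    """Clones `op` onto `new_inputs`, rebuilding frozen operators."""
    if isinstance(op, FrozenLimit):
        # Frozen branch: mirrors the Limit special case by building a new node.
        assert len(new_inputs) == 1, len(new_inputs)
        return FrozenLimit(new_inputs[0], op.limit)
    # Generic branch: shallow copy, then reassign inputs. Reaching this line
    # with a frozen dataclass would raise FrozenInstanceError.
    clone = copy.copy(op)
    clone._input_dependencies = list(new_inputs)
    return clone
```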
## Description

This PR implements converting map logical operators to frozen dataclasses.

#### Why this is needed:

- This is the second operator-group step under #60312.
- It removes in-place mutation paths for map logical operators.
- It keeps the scope limited to logical map operators and the minimum rule updates needed for frozen compatibility.

#### What this PR changes:

- Converts map logical operators to frozen dataclasses:
  - `MapBatches`
  - `MapRows`
  - `Filter`
  - `Project`
  - `FlatMap`
  - `StreamingRepartition`
- Applies map construction cleanup for frozen compatibility:
  - uses `InitVar[LogicalOperator]` + `__post_init__` to initialize `_name`, `_input_dependencies`, and `_num_outputs`
  - uses canonical dict defaults (`default_factory=dict`) for map remote args fields
  - keeps `eq=False` intentionally to avoid introducing field-based equality/hash semantics for operators with mutable fields (e.g., lists/dicts), and to stay aligned with prior identity-based behavior
- Adds frozen-safe transform behavior for map operators:
  - map operators recreate nodes on input change (no in-place input mutation)
- Updates optimizer rules to avoid mutating frozen map operators:
  - `limit_pushdown.py`: uses frozen-safe recreation/replace logic for map operators (including per-block-limit path)
  - `predicate_pushdown.py`: uses frozen-safe recreation/replace logic when cloning map operators with new inputs
  - these rule changes are required because the generic clone path (`copy.copy` + input reassignment / setter mutation) is not valid for frozen map operators and can raise `FrozenInstanceError`
- Scope is intentionally D2-only: no all-to-all/join/read/write conversion in this PR; no physical-layer behavior changes

## Related issues

Link related issues: "Fixes #60312", or "Related to #60312".

## Additional information

### Tests

Validated with targeted existing tests

### Stack Plan

To complete #60312, the original stack was:

1. #60529
2. #60528
3. #60530
4. #60531

Since PR4 was still too large, it is being further split:

1. PR-A: default `LogicalOperator` naming behavior #61020
2. PR-B: move `output_dependencies` responsibility to physical side #61107
3. PR-C: make `LogicalOperator` an ABC with abstract `num_outputs` #61308
4. PR-D: convert logical operators to frozen dataclasses in small groups (D1/D2/D3)
   - D1: one-to-one operators #61364
   - D2: map operators (this PR)
   - D3: all-to-all + join/read/write groups (as needed)

Planned follow-ups (not blocking this stack):

- Converting all `input_op` usage to `input_dependencies`
- Potential `AbstractFrom` restructuring

---------

Signed-off-by: yaommen <myanstu@163.com>
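A small illustration of the `eq=False` rationale, using hypothetical `FieldEqMap`/`IdentityEqMap` classes (not Ray's map operators). With the default `eq=True`, a frozen dataclass compares by field values and derives `__hash__` from them, so a dict field such as remote args makes `hash()` raise `TypeError`. With `eq=False`, equality and hashing stay identity-based, matching the prior behavior of non-dataclass operators.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass(frozen=True)  # eq=True by default: __eq__ and __hash__ use fields
class FieldEqMap:
    fn_name: str
    ray_remote_args: Dict[str, Any] = field(default_factory=dict)


@dataclass(frozen=True, eq=False)  # keeps object identity for == and hash()
class IdentityEqMap:
    fn_name: str
    ray_remote_args: Dict[str, Any] = field(default_factory=dict)


def is_hashable(obj: Any) -> bool:
    """Returns False when hash() fails, e.g. because a field is a dict."""
    try:
        hash(obj)
    except TypeError:
        return False
    return True
```

In practice this means two structurally identical identity-based operators remain distinct nodes in sets and dict keys, which is what plan-rewriting code that tracks visited nodes typically expects.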
#62321)

## Description

This PR implements converting all-to-all, join, read, and write logical operators to frozen dataclasses.

#### Why this is needed:

- This is the D3 operator-group step under #60312.
- It removes in-place mutation paths for these logical operators.
- It keeps the scope limited to logical all-to-all/join/read/write operators and the minimum rule updates needed for frozen compatibility.

#### What this PR changes:

- Converts logical operators to frozen dataclasses:
  - all-to-all logical operators:
    - `RandomizeBlocks`
    - `RandomShuffle`
    - `Repartition`
    - `Sort`
    - `Aggregate`
  - `Join`
  - `Read`
  - `Write`
- Applies construction cleanup for frozen compatibility:
  - uses `InitVar[LogicalOperator]` + `__post_init__` where needed to initialize `_name`, `_input_dependencies`, and `_num_outputs`
  - keeps `eq=False` intentionally to avoid introducing field-based equality/hash semantics as part of this PR
- Adds frozen-safe transform behavior for these operators:
  - operators recreate nodes on input change instead of mutating inputs in place
- Updates optimizer rules to avoid mutating frozen instances:
  - `inherit_batch_format.py`: rebuilds frozen all-to-all operators when inheriting batch format
  - `limit_pushdown.py`: uses frozen-safe recreation/replace logic for `Read` per-block-limit handling
  - `predicate_pushdown.py`: uses frozen-safe recreation/replace logic for frozen all-to-all operators and `Join`
  - `set_read_parallelism.py`: adapts `Read` parallelism setting for the frozen `Read` operator shape
- Scope is intentionally D3-only:
  - no `input_op` -> `input_dependencies` cleanup in this PR
  - no `AbstractFrom` restructuring in this PR
  - no equality/comparability semantics changes in this PR

## Related issues

Related to #60312.

## Additional information

### Tests

Validated with targeted existing tests:

- `python/ray/data/tests/test_execution_optimizer_advanced.py`
- `python/ray/data/tests/test_join.py`

### Stack Plan

To complete #60312, the original stack was:

1. #60529
2. #60528
3. #60530
4. #60531

Since PR4 was still too large, it is being further split:

1. PR-A: default `LogicalOperator` naming behavior #61020
2. PR-B: move `output_dependencies` responsibility to physical side #61107
3. PR-C: make `LogicalOperator` an ABC with abstract `num_outputs` #61308
4. PR-D: convert logical operators to frozen dataclasses in small groups (D1/D2/D3)
   - D1: one-to-one operators #61364
   - D2: map operators #61481
   - D3: all-to-all + join/read/write operators (this PR)

Planned follow-ups (not blocking this stack):

- Converting all `input_op` usage to `input_dependencies`
- Potential `AbstractFrom` restructuring

---------

Signed-off-by: yaommen <myanstu@163.com>
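For a two-input operator such as `Join`, "recreate nodes on input change" means both inputs flow through `InitVar`s and a rebuild passes both. The sketch below is hypothetical: `ToyJoin` and its field names (`join_type`, `left_key_columns`, `right_key_columns`) are assumptions, not Ray's `Join` signature.

```python
import dataclasses
from dataclasses import InitVar, dataclass, field
from typing import Any, List, Tuple


@dataclass(frozen=True, eq=False)
class ToyJoin:
    """Hypothetical two-input frozen operator; field names are assumptions."""

    left_input_op: InitVar[Any]
    right_input_op: InitVar[Any]
    join_type: str
    left_key_columns: Tuple[str, ...]
    right_key_columns: Tuple[str, ...]
    _input_dependencies: List[Any] = field(init=False, repr=False)

    def __post_init__(self, left_input_op: Any, right_input_op: Any) -> None:
        # Both inputs are recorded once, in (left, right) order.
        object.__setattr__(
            self, "_input_dependencies", [left_input_op, right_input_op]
        )

    def with_new_inputs(self, new_inputs: List[Any]) -> "ToyJoin":
        """Rebuilds the join on new inputs; the original node is untouched."""
        assert len(new_inputs) == 2, len(new_inputs)
        left, right = new_inputs
        # Both InitVars must be supplied to replace(); other fields carry over.
        return dataclasses.replace(self, left_input_op=left, right_input_op=right)
```

Storing key columns as tuples rather than lists is a deliberate choice in this sketch: it keeps the node's own data immutable, not just its attribute bindings.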
…es (#62400)

## Description

This PR implements converting the remaining source/simple logical operators to frozen dataclasses.

#### Why this is needed:

- This is the next operator-group step under #60312 after D1 / D2 / D3.
- It removes in-place mutation paths for the remaining source/simple logical operators not yet covered by the frozen logical-operator migration.
- It keeps the scope limited to these logical operators only, without mixing in follow-up source-operator restructuring or API cleanup.

#### What this PR changes:

- Converts the remaining source/simple logical operators to frozen dataclasses:
  - `InputData`
  - `Count`
  - `AbstractFrom` and its subclasses:
    - `FromItems`
    - `FromBlocks`
    - `FromNumpy`
    - `FromArrow`
    - `FromPandas`
- Applies frozen construction cleanup for these operators:
  - initializes `_name`, `_input_dependencies`, and `_num_outputs` in `__post_init__`
  - keeps `eq=False` intentionally to stay aligned with the current transitional operator-group pattern
- Adds frozen-safe transform behavior for `Count`:
  - `Count._apply_transform()` recreates `Count` when the input changes
- Keeps `AbstractFrom` subclasses structurally unchanged in this PR:
  - no source hierarchy restructuring
  - no subclass removal
  - no `input_op` -> `input_dependencies` cleanup
- Scope is intentionally limited to D4-only frozen migration for the remaining source/simple logical operators.

## Related issues

Related to #60312.

## Additional information

### Tests

Validated with targeted existing tests:

- `python/ray/data/tests/test_split.py`
- `python/ray/data/tests/test_operator_fusion.py`
- `python/ray/data/tests/test_execution_optimizer_basic.py`

### Stack Plan

To complete #60312, the original stack was:

1. #60529
2. #60528
3. #60530
4. #60531

Since PR4 was still too large, it is being further split:

1. PR-A: default `LogicalOperator` naming behavior #61020
2. PR-B: move `output_dependencies` responsibility to physical side #61107
3. PR-C: make `LogicalOperator` an ABC with abstract `num_outputs` #61308
4. PR-D: convert logical operators to frozen dataclasses in small groups
   - D1: one-to-one operators #61364
   - D2: map operators #61481
   - D3: all-to-all + join/read/write operators #62321
   - D4: remaining source/simple logical operators (this PR)

Planned follow-ups (not blocking this stack):

- Converting all `input_op` usage to `input_dependencies`
- Potential `AbstractFrom` restructuring
- Equality/comparability follow-up

---------

Signed-off-by: yaommen <myanstu@163.com>
Signed-off-by: Balaji Veeramani <balaji@anyscale.com>
Co-authored-by: Balaji Veeramani <balaji@anyscale.com>
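The D4 pattern for source operators and `Count` can be sketched as follows. `ToyFromItems` and `ToyCount` are hypothetical stand-ins, not Ray's classes: a source operator has no `InitVar` input and an empty `_input_dependencies`, while the `Count`-like operator reuses itself when its input is unchanged and builds a new node otherwise.

```python
from dataclasses import InitVar, dataclass, field
from typing import Any, List, Tuple


@dataclass(frozen=True, eq=False)
class ToyFromItems:
    """Hypothetical source operator: no upstream inputs."""

    items: Tuple[Any, ...]
    _input_dependencies: List[Any] = field(init=False, repr=False)

    def __post_init__(self) -> None:
        # Sources start the plan, so their dependency list is empty.
        object.__setattr__(self, "_input_dependencies", [])


@dataclass(frozen=True, eq=False)
class ToyCount:
    """Hypothetical one-input operator standing in for Count."""

    input_op: InitVar[Any]
    _input_dependencies: List[Any] = field(init=False, repr=False)

    def __post_init__(self, input_op: Any) -> None:
        object.__setattr__(self, "_input_dependencies", [input_op])

    def replace_input(self, new_input: Any) -> "ToyCount":
        # Reuse this node if the input is the same object; otherwise rebuild.
        if new_input is self._input_dependencies[0]:
            return self
        return ToyCount(new_input)
```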
Description
This PR implements converting one-to-one logical operators to frozen dataclasses.
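For the two operators this PR converts, `Limit` and `Download`, the construction pattern can be sketched as below. The classes are hypothetical stand-ins (`ToyLimit`, `ToyDownload`, and the field `uri_column_name` are assumptions; Ray's real constructors may differ): the input arrives as an `InitVar`, and `__post_init__` writes `_name`, `_input_dependencies`, and `_num_outputs` once via `object.__setattr__`, the same call that appears in the review diffs.

```python
from dataclasses import InitVar, dataclass, field
from typing import Any, Dict, List, Optional


@dataclass(frozen=True, eq=False)
class ToyLimit:
    """Hypothetical stand-in for the frozen Limit operator."""

    input_op: InitVar[Any]
    limit: int
    _name: str = field(init=False, repr=False)
    _input_dependencies: List[Any] = field(init=False, repr=False)
    _num_outputs: Optional[int] = field(init=False, repr=False)

    def __post_init__(self, input_op: Any) -> None:
        # Derived attributes are written once, during construction; afterwards
        # the frozen guard rejects ordinary assignment.
        object.__setattr__(self, "_name", f"limit={self.limit}")
        object.__setattr__(self, "_input_dependencies", [input_op])
        object.__setattr__(self, "_num_outputs", None)


@dataclass(frozen=True, eq=False)
class ToyDownload:
    """Hypothetical stand-in for Download; `uri_column_name` is assumed."""

    input_op: InitVar[Any]
    uri_column_name: str
    # default_factory gives every instance its own empty dict.
    ray_remote_args: Dict[str, Any] = field(default_factory=dict)
    _input_dependencies: List[Any] = field(init=False, repr=False)

    def __post_init__(self, input_op: Any) -> None:
        object.__setattr__(self, "_input_dependencies", [input_op])
```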
Why this is needed:
- … `LogicalPlan` stateless and comparable #60312.

What this PR changes:
- Converts one-to-one logical operators to frozen dataclasses:
  - `Limit`
  - `Download`
- Uses `InitVar[LogicalOperator]` + `__post_init__` to initialize `_name`, `_input_dependencies`, and `_num_outputs`
- Makes `Download.ray_remote_args` a canonical dict field (`default_factory=dict`)
- `Limit._apply_transform()` recreates `Limit` when its input changes
- `Download._apply_transform()` recreates `Download` when its input changes
- `limit_pushdown.py`: recreates `Limit`/`Download` on input replacement
- `predicate_pushdown.py`: recreates `Limit` on input replacement
- Adds test coverage for `Limit(Download(...))` under limit pushdown.

Related issues
Link related issues: "Fixes #60312", or "Related to #60312".
Additional information
Tests
Added/updated:
- `python/ray/data/tests/test_execution_optimizer_limit_pushdown.py`
  - `Limit(Download(...))` under limit pushdown with frozen operators

Validated with targeted existing tests:
- `python/ray/data/tests/test_execution_optimizer_limit_pushdown.py`
- `python/ray/data/tests/test_predicate_pushdown.py`
- `python/ray/data/tests/test_operator_fusion.py`
- `python/ray/data/tests/test_execution_optimizer_basic.py`
- `python/ray/data/tests/test_execution_optimizer_advanced.py`
- `python/ray/data/tests/test_projection_fusion.py`
- `python/ray/data/tests/test_randomize_block_order.py`
- `python/ray/data/tests/test_state_export.py::test_logical_op_args`
- `python/ray/data/tests/unit/test_logical_plan.py`

Stack Plan
To complete #60312, the original stack was:
Since PR4 was still too large, it is being further split:
1. PR-A: default `LogicalOperator` naming behavior [Data] Default LogicalOperator name to class name #61020
2. PR-B: move `output_dependencies` responsibility to physical side [Data] Move output_dependencies responsibilities to PhysicalOperator #61107
3. PR-C: make `LogicalOperator` an ABC with abstract `num_outputs` [Data] Make LogicalOperator an ABC with abstract num_outputs #61308

Planned follow-ups (not blocking this stack):
- Converting all `input_op` usage to `input_dependencies`
- Potential `AbstractFrom` restructuring