Skip to content

[data] Clear queue for manually mark_execution_finished operators#58441

Merged
raulchen merged 6 commits into
ray-project:masterfrom
iamjustinhsu:jhsu/clear-queue-for-manually-completed-operators
Nov 10, 2025
Merged

[data] Clear queue for manually mark_execution_finished operators#58441
raulchen merged 6 commits into
ray-project:masterfrom
iamjustinhsu:jhsu/clear-queue-for-manually-completed-operators

Conversation

@iamjustinhsu

@iamjustinhsu iamjustinhsu commented Nov 7, 2025

Copy link
Copy Markdown
Contributor

Description

Currently, we clear external queues when an operator is manually marked as finished. But we don't clear their internal queues. This PR fixes that

Related issues

Fixes this test https://buildkite.com/ray-project/postmerge/builds/14223#019a5791-3d46-4ab8-9f97-e03ea1c04bb0/642-736

Additional information

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu requested a review from a team as a code owner November 7, 2025 00:44
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a mechanism to clear internal operator queues when an execution is manually marked as finished. This is achieved by adding a new clear_internal_queues abstract method to InternalQueueOperatorMixin and calling it from mark_execution_finished. Implementations are provided for several operators. The overall approach is sound and addresses the problem described. I've provided a couple of suggestions to improve consistency and clarity in the implementation for AllToAllOperator and MapOperator.

Comment thread python/ray/data/_internal/execution/operators/base_physical_operator.py Outdated
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Comment thread python/ray/data/_internal/execution/operators/map_operator.py Outdated
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@ray-gardener ray-gardener Bot added the data Ray Data-related issues label Nov 7, 2025
@omatthew98 omatthew98 added the go add ONLY when ready to merge, run all tests label Nov 7, 2025
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
"""Clear internal output queue."""
while self._output_queue.has_next():
bundle = self._output_queue.get_next()
self._metrics.on_output_dequeued(bundle)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Output queue accessed before initialization.

The clear_internal_output_queue method accesses self._output_queue without checking if it's None. Since _output_queue is initialized to None in __init__ and only set in start(), calling mark_execution_finished() before start() (which happens in InputDataBuffer.__init__) causes an AttributeError when trying to call self._output_queue.has_next().

Fix in Cursor Fix in Web

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can't happen because we always call start() on an operator.

@raulchen raulchen merged commit 8971f83 into ray-project:master Nov 10, 2025
6 checks passed
landscapepainter pushed a commit to landscapepainter/ray that referenced this pull request Nov 17, 2025
…y-project#58441)

## Description
Currently, we clear _external_ queues when an operator is manually
marked as finished. But we don't clear their _internal_ queues. This PR
fixes that
## Related issues
Fixes this test
https://buildkite.com/ray-project/postmerge/builds/14223#019a5791-3d46-4ab8-9f97-e03ea1c04bb0/642-736
## Additional information

---------

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
SheldonTsen pushed a commit to SheldonTsen/ray that referenced this pull request Dec 1, 2025
…y-project#58441)

## Description
Currently, we clear _external_ queues when an operator is manually
marked as finished. But we don't clear their _internal_ queues. This PR
fixes that
## Related issues
Fixes this test
https://buildkite.com/ray-project/postmerge/builds/14223#019a5791-3d46-4ab8-9f97-e03ea1c04bb0/642-736
## Additional information

---------

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Future-Outlier pushed a commit to Future-Outlier/ray that referenced this pull request Dec 7, 2025
…y-project#58441)

## Description
Currently, we clear _external_ queues when an operator is manually
marked as finished. But we don't clear their _internal_ queues. This PR
fixes that
## Related issues
Fixes this test
https://buildkite.com/ray-project/postmerge/builds/14223#019a5791-3d46-4ab8-9f97-e03ea1c04bb0/642-736
## Additional information

---------

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

data Ray Data-related issues go add ONLY when ready to merge, run all tests

3 participants