[data] Clear queue for manually mark_execution_finished operators#58441
Conversation
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
There was a problem hiding this comment.
Code Review
This pull request introduces a mechanism to clear internal operator queues when an execution is manually marked as finished. This is achieved by adding a new clear_internal_queues abstract method to InternalQueueOperatorMixin and calling it from mark_execution_finished. Implementations are provided for several operators. The overall approach is sound and addresses the problem described. I've provided a couple of suggestions to improve consistency and clarity in the implementation for AllToAllOperator and MapOperator.
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
| """Clear internal output queue.""" | ||
| while self._output_queue.has_next(): | ||
| bundle = self._output_queue.get_next() | ||
| self._metrics.on_output_dequeued(bundle) |
There was a problem hiding this comment.
Bug: Output queue accessed before initialization.
The clear_internal_output_queue method accesses self._output_queue without checking if it's None. Since _output_queue is initialized to None in __init__ and only set in start(), calling mark_execution_finished() before start() (which happens in InputDataBuffer.__init__) causes an AttributeError when trying to call self._output_queue.has_next().
There was a problem hiding this comment.
This can't happen because we always call start() on an operator.
…y-project#58441) ## Description Currently, we clear _external_ queues when an operator is manually marked as finished. But we don't clear their _internal_ queues. This PR fixes that ## Related issues Fixes this test https://buildkite.com/ray-project/postmerge/builds/14223#019a5791-3d46-4ab8-9f97-e03ea1c04bb0/642-736 ## Additional information --------- Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
…y-project#58441) ## Description Currently, we clear _external_ queues when an operator is manually marked as finished. But we don't clear their _internal_ queues. This PR fixes that ## Related issues Fixes this test https://buildkite.com/ray-project/postmerge/builds/14223#019a5791-3d46-4ab8-9f97-e03ea1c04bb0/642-736 ## Additional information --------- Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
…y-project#58441) ## Description Currently, we clear _external_ queues when an operator is manually marked as finished. But we don't clear their _internal_ queues. This PR fixes that ## Related issues Fixes this test https://buildkite.com/ray-project/postmerge/builds/14223#019a5791-3d46-4ab8-9f97-e03ea1c04bb0/642-736 ## Additional information --------- Signed-off-by: iamjustinhsu <jhsu@anyscale.com> Signed-off-by: Future-Outlier <eric901201@gmail.com>
Description
Currently, we clear external queues when an operator is manually marked as finished. But we don't clear their internal queues. This PR fixes that
Related issues
Fixes this test https://buildkite.com/ray-project/postmerge/builds/14223#019a5791-3d46-4ab8-9f97-e03ea1c04bb0/642-736
Additional information