[core][train][data] Keep strong references to fire-and-forget asyncio tasks #63291
… tasks

Python's asyncio documentation explicitly warns that the event loop only holds weak references to tasks created via asyncio.create_task / loop.create_task. A task without any other strong reference can be garbage collected at any time, even before it finishes: https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task

Three call sites in the repository were violating this contract (caught by ruff's RUF006 rule):

* python/ray/_private/async_utils.py: monitor_loop_lag()
  The event-loop lag monitor task could be silently collected, stopping lag monitoring without warning.
* python/ray/train/v2/_internal/execution/checkpoint/checkpoint_manager.py: CheckpointManager._notify()
  The notify task wakes up coroutines waiting on _condition. If it gets collected before notify_all() runs, listeners may wait forever.
* python/ray/data/_internal/planner/plan_udf_map_op.py: _generate_transform_fn_for_async_map()._execute_transform()
  The _reorder background task is responsible for forwarding completed results to the output queue in deterministic order. Losing this task to GC would silently break the entire async map operation.

Fix all three by retaining a strong reference (per-instance set with add_done_callback discard, or a module-level set, or a local variable that is awaited in finally).

Signed-off-by: forwardxu <forwardxu@apache.org>
Code Review
This pull request ensures that background asyncio tasks are not prematurely garbage collected by maintaining strong references to them in async_utils.py, plan_udf_map_op.py, and checkpoint_manager.py. Feedback was provided to use loop.create_task() instead of asyncio.create_task() in plan_udf_map_op.py for consistency with the local scope.
    # reference with the event loop, so without a strong reference the
    # task could be garbage collected mid-execution and the reordering
    # would silently stop.
    reorder_task = asyncio.create_task(_reorder())
For consistency with the rest of this function (e.g., line 899) and other parts of the codebase (like async_utils.py), it is recommended to use loop.create_task() instead of asyncio.create_task() since the loop object is already available in the local scope.
    - reorder_task = asyncio.create_task(_reorder())
    + reorder_task = loop.create_task(_reorder())
Address review comment: use loop.create_task() instead of asyncio.create_task() to be consistent with the surrounding code in the same function (which already uses loop.create_task) and other modules like async_utils.py. Signed-off-by: forwardxu <forwardxu@apache.org>
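A small sketch of why this swap is a consistency change and not a behavioral one, assuming `loop` is the loop the coroutine is currently running on (the PR text does not show how `loop` is obtained, so that is an assumption here). `asyncio.create_task` schedules on the running loop, so both calls yield tasks bound to the same loop. The `noop` coroutine is hypothetical.

```python
import asyncio


async def noop() -> str:
    return "done"


async def main() -> dict:
    # Assumption: `loop` is the running loop, as in the reviewed function.
    loop = asyncio.get_running_loop()
    t1 = asyncio.create_task(noop())
    t2 = loop.create_task(noop())
    results = await asyncio.gather(t1, t2)
    return {
        # Both tasks report the same loop object.
        "same_loop": t1.get_loop() is t2.get_loop() is loop,
        "results": results,
    }


outcome = asyncio.run(main())
```

Neither spelling changes the weak-reference behavior; the strong reference still has to come from the variable or collection that stores the task.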
    # Wait for the reorder task to finish draining ``completed_tasks_queue``
    # and pushing remaining results to the output queue. This both keeps a
    # strong reference to the task alive until completion (preventing GC)
    # and surfaces any unexpected exception raised inside ``_reorder``.
    await reorder_task
@bveeramani PTAL -- is it correct/will it break anything to await the task here?
I think this should be safe.
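To make the question concrete, here is a simplified sketch of the "local variable awaited in `finally`" pattern. It is not Ray's `_execute_transform`: the function `execute_transform`, the `fail` flag, and the sentinel-based shutdown are hypothetical, and the real `_reorder` also restores ordering, which this version skips.

```python
import asyncio


async def execute_transform(items, fail=False):
    """Hypothetical stand-in for the reviewed function (illustration only)."""
    completed = asyncio.Queue()
    output = []
    sentinel = object()

    async def reorder():
        # Drain the queue until the sentinel arrives.
        while True:
            item = await completed.get()
            if item is sentinel:
                return
            if fail:
                raise RuntimeError("reorder failed")
            output.append(item)

    loop = asyncio.get_running_loop()
    # The local variable is the strong reference that keeps the task alive.
    reorder_task = loop.create_task(reorder())
    try:
        for item in items:
            await completed.put(item)
    finally:
        await completed.put(sentinel)
        # Waits for the drain to finish and re-raises anything reorder() raised.
        await reorder_task
    return output


ok = asyncio.run(execute_transform([1, 2, 3]))

try:
    asyncio.run(execute_transform([1], fail=True))
    surfaced = None
except RuntimeError as exc:
    surfaced = str(exc)
```

One thing to keep in mind with this shape: if the `try` body itself raises and the awaited task also fails, the task's exception is the one that propagates out of `finally`, with the original attached as its context.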
… tasks (ray-project#63291)

## Why are these changes needed?

Python's asyncio documentation explicitly warns that the event loop only holds **weak** references to tasks created via `asyncio.create_task` / `loop.create_task`. A task without any other strong reference can be garbage collected at **any** time, even before it finishes:

> Important: Save a reference to the result of this function, to avoid a task disappearing mid-execution. The event loop only keeps weak references to tasks. A task that isn't referenced elsewhere may be garbage collected at any time, even before it's done.
>
> Source: https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task

Three call sites in the repository were violating this contract (caught by ruff's `RUF006` rule). All three create background tasks whose return value is discarded, so the resulting `Task` object is only referenced by the event loop's weak set and may be GC'd at any moment:

- **`python/ray/_private/async_utils.py`**: `enable_monitor_loop_lag()`
  The event-loop lag monitor task could be silently collected, stopping lag monitoring without warning.
- **`python/ray/train/v2/_internal/execution/checkpoint/checkpoint_manager.py`**: `CheckpointManager._notify()`
  The notify task wakes up coroutines waiting on `self._condition`. If it's collected before `notify_all()` runs, listeners may wait forever.
- **`python/ray/data/_internal/planner/plan_udf_map_op.py`**: `_generate_transform_fn_for_async_map()._execute_transform()`
  The `_reorder` background task is responsible for forwarding completed results from `completed_tasks_queue` into the output queue in deterministic order. Losing this task to GC would silently break the entire async map operation.

## What was changed

Each of the three sites now retains a strong reference to the created task using the pattern recommended by the asyncio docs:

| File | Pattern |
|---|---|
| `async_utils.py` | Module-level `_BACKGROUND_TASKS: Set[asyncio.Task]` + `task.add_done_callback(_BACKGROUND_TASKS.discard)` |
| `checkpoint_manager.py` | Per-instance `self._background_tasks: set` + `add_done_callback(self._background_tasks.discard)` |
| `plan_udf_map_op.py` | Local variable `reorder_task = asyncio.create_task(_reorder())`, awaited in the `finally` block of `_execute_transform` (also propagates unexpected exceptions) |

No public API change. No behavior change in the happy path, only correctness under GC pressure.

## Related issues

N/A (silences the `RUF006` lint warnings for these three files).

## Checks

- [x] I've signed off every commit (DCO).
- [x] `ruff check --select RUF006` passes on the three modified files.
- [x] `python -m py_compile` passes for all three files.
- [ ] I've made sure the tests are passing.

---------

Signed-off-by: forwardxu <forwardxu@apache.org>
Signed-off-by: phattruong <23120318@student.hcmus.edu.vn>
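The per-instance row of the table could look roughly like the sketch below. The `Notifier` class, its `publish` and `wait_for_value` methods, and the `"ckpt-1"` value are hypothetical and only illustrate the pattern; the actual `CheckpointManager._notify()` is not shown in this PR text and may differ.

```python
import asyncio


class Notifier:
    """Hypothetical class sketching the per-instance pattern (not Ray's code)."""

    def __init__(self):
        self._condition = asyncio.Condition()
        self._background_tasks = set()
        self.value = None

    def publish(self, value):
        # Synchronous entry point; must be called while the loop is running.
        self.value = value

        async def notify():
            async with self._condition:
                self._condition.notify_all()

        task = asyncio.get_running_loop().create_task(notify())
        # Strong reference scoped to this instance, released on completion.
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    async def wait_for_value(self):
        async with self._condition:
            await self._condition.wait_for(lambda: self.value is not None)
            return self.value


async def main():
    notifier = Notifier()
    waiter = asyncio.create_task(notifier.wait_for_value())
    await asyncio.sleep(0)  # let the waiter block on the condition
    notifier.publish("ckpt-1")
    return await asyncio.wait_for(waiter, timeout=1)


got = asyncio.run(main())
```

Scoping the set to the instance ties the lifetime of the references to the object that created the tasks, which avoids a shared module-level registry when many instances exist.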