Skip to content

[core][train][data] Keep strong references to fire-and-forget asyncio tasks#63291

Merged
edoakes merged 2 commits into
ray-project:masterfrom
XuQianJin-Stars:fix/asyncio-create-task-references
May 14, 2026
Merged

[core][train][data] Keep strong references to fire-and-forget asyncio tasks#63291
edoakes merged 2 commits into
ray-project:masterfrom
XuQianJin-Stars:fix/asyncio-create-task-references

Conversation

@XuQianJin-Stars

Copy link
Copy Markdown
Contributor

Why are these changes needed?

Python's asyncio documentation explicitly warns that the event loop only holds weak references to tasks created via asyncio.create_task / loop.create_task. A task without any other strong reference can be garbage collected at any time — even before it finishes:

Important: Save a reference to the result of this function, to avoid a task disappearing mid-execution. The event loop only keeps weak references to tasks. A task that isn't referenced elsewhere may be garbage collected at any time, even before it's done.

https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task

Three call sites in the repository were violating this contract (caught by ruff's RUF006 rule). All three create background tasks whose return value is discarded, so the resulting Task object is only referenced by the event loop's weak set and may be GC'd at any moment:

  • python/ray/_private/async_utils.pyenable_monitor_loop_lag()
    The event-loop lag monitor task could be silently collected, stopping lag monitoring without warning.

  • python/ray/train/v2/_internal/execution/checkpoint/checkpoint_manager.pyCheckpointManager._notify()
    The notify task wakes up coroutines waiting on self._condition. If it's collected before notify_all() runs, listeners may wait forever.

  • python/ray/data/_internal/planner/plan_udf_map_op.py_generate_transform_fn_for_async_map().._execute_transform()
    The _reorder background task is responsible for forwarding completed results from completed_tasks_queue into the output queue in deterministic order. Losing this task to GC would silently break the entire async map operation.

What was changed

Each of the three sites now retains a strong reference to the created task using the pattern recommended by the asyncio docs:

File Pattern
async_utils.py Module-level _BACKGROUND_TASKS: Set[asyncio.Task] + task.add_done_callback(_BACKGROUND_TASKS.discard)
checkpoint_manager.py Per-instance self._background_tasks: set + add_done_callback(self._background_tasks.discard)
plan_udf_map_op.py Local variable reorder_task = asyncio.create_task(_reorder()), awaited in the finally block of _execute_transform (also propagates unexpected exceptions)

No public API change. No behavior change in the happy path — only correctness under GC pressure.

Related issues

N/A (silences the RUF006 lint warnings for these three files).

Checks

  • I've signed off every commit (DCO).
  • ruff check --select RUF006 passes on the three modified files.
  • python -m py_compile passes for all three files.
  • I've made sure the tests are passing.
… tasks

Python's asyncio documentation explicitly warns that the event loop only
holds weak references to tasks created via asyncio.create_task /
loop.create_task. A task without any other strong reference can be
garbage collected at any time, even before it finishes:

  https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task

Three call sites in the repository were violating this contract (caught
by ruff's RUF006 rule):

* python/ray/_private/async_utils.py: monitor_loop_lag()
  The event-loop lag monitor task could be silently collected, stopping
  lag monitoring without warning.

* python/ray/train/v2/_internal/execution/checkpoint/checkpoint_manager.py:
  CheckpointManager._notify()
  The notify task wakes up coroutines waiting on _condition. If it gets
  collected before notify_all() runs, listeners may wait forever.

* python/ray/data/_internal/planner/plan_udf_map_op.py:
  _generate_transform_fn_for_async_map()._execute_transform()
  The _reorder background task is responsible for forwarding completed
  results to the output queue in deterministic order. Losing this task
  to GC would silently break the entire async map operation.

Fix all three by retaining a strong reference (per-instance set with
add_done_callback discard, or a module-level set, or a local variable
that is awaited in finally).

Signed-off-by: forwardxu <forwardxu@apache.org>
@XuQianJin-Stars XuQianJin-Stars requested review from a team as code owners May 12, 2026 03:04

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request ensures that background asyncio tasks are not prematurely garbage collected by maintaining strong references to them in async_utils.py, plan_udf_map_op.py, and checkpoint_manager.py. Feedback was provided to use loop.create_task() instead of asyncio.create_task() in plan_udf_map_op.py for consistency with the local scope.

# reference with the event loop, so without a strong reference the
# task could be garbage collected mid-execution and the reordering
# would silently stop.
reorder_task = asyncio.create_task(_reorder())

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For consistency with the rest of this function (e.g., line 899) and other parts of the codebase (like async_utils.py), it is recommended to use loop.create_task() instead of asyncio.create_task() since the loop object is already available in the local scope.

Suggested change
reorder_task = asyncio.create_task(_reorder())
reorder_task = loop.create_task(_reorder())
Address review comment: use loop.create_task() instead of asyncio.create_task() to be consistent with the surrounding code in the same function (which already uses loop.create_task) and other modules like async_utils.py.

Signed-off-by: forwardxu <forwardxu@apache.org>
@ray-gardener ray-gardener Bot added core Issues that should be addressed in Ray Core community-contribution Contributed by the community labels May 12, 2026
@pseudo-rnd-thoughts pseudo-rnd-thoughts added train Ray Train Related Issue data Ray Data-related issues go add ONLY when ready to merge, run all tests labels May 12, 2026
Comment on lines +933 to +937
# Wait for the reorder task to finish draining ``completed_tasks_queue``
# and pushing remaining results to the output queue. This both keeps a
# strong reference to the task alive until completion (preventing GC)
# and surfaces any unexpected exception raised inside ``_reorder``.
await reorder_task

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bveeramani PTAL -- is it correct/will it break anything to await the task here?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be safe.

@edoakes edoakes merged commit 12e96eb into ray-project:master May 14, 2026
12 checks passed
TruongQuangPhat pushed a commit to cyhapun/ray-fix-issue that referenced this pull request May 27, 2026
… tasks (ray-project#63291)

## Why are these changes needed?

Python's asyncio documentation explicitly warns that the event loop only
holds **weak** references to tasks created via `asyncio.create_task` /
`loop.create_task`. A task without any other strong reference can be
garbage collected at **any** time — even before it finishes:

> Important: Save a reference to the result of this function, to avoid a
task disappearing mid-execution. The event loop only keeps weak
references to tasks. A task that isn't referenced elsewhere may be
garbage collected at any time, even before it's done.
>
> —
https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task

Three call sites in the repository were violating this contract (caught
by ruff's `RUF006` rule). All three create background tasks whose return
value is discarded, so the resulting `Task` object is only referenced by
the event loop's weak set and may be GC'd at any moment:

- **`python/ray/_private/async_utils.py`** — `enable_monitor_loop_lag()`
The event-loop lag monitor task could be silently collected, stopping
lag monitoring without warning.

-
**`python/ray/train/v2/_internal/execution/checkpoint/checkpoint_manager.py`**
— `CheckpointManager._notify()`
The notify task wakes up coroutines waiting on `self._condition`. If
it's collected before `notify_all()` runs, listeners may wait forever.

- **`python/ray/data/_internal/planner/plan_udf_map_op.py`** —
`_generate_transform_fn_for_async_map().._execute_transform()`
The `_reorder` background task is responsible for forwarding completed
results from `completed_tasks_queue` into the output queue in
deterministic order. Losing this task to GC would silently break the
entire async map operation.

## What was changed

Each of the three sites now retains a strong reference to the created
task using the pattern recommended by the asyncio docs:

| File | Pattern |
|---|---|
| `async_utils.py` | Module-level `_BACKGROUND_TASKS: Set[asyncio.Task]`
+ `task.add_done_callback(_BACKGROUND_TASKS.discard)` |
| `checkpoint_manager.py` | Per-instance `self._background_tasks: set` +
`add_done_callback(self._background_tasks.discard)` |
| `plan_udf_map_op.py` | Local variable `reorder_task =
asyncio.create_task(_reorder())`, awaited in the `finally` block of
`_execute_transform` (also propagates unexpected exceptions) |

No public API change. No behavior change in the happy path — only
correctness under GC pressure.

## Related issues

N/A (silences the `RUF006` lint warnings for these three files).

## Checks

- [x] I've signed off every commit (DCO).
- [x] `ruff check --select RUF006` passes on the three modified files.
- [x] `python -m py_compile` passes for all three files.
- [ ] I've made sure the tests are passing.

---------

Signed-off-by: forwardxu <forwardxu@apache.org>
Signed-off-by: phattruong <23120318@student.hcmus.edu.vn>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-contribution Contributed by the community core Issues that should be addressed in Ray Core data Ray Data-related issues go add ONLY when ready to merge, run all tests train Ray Train Related Issue

4 participants