[Data] Replace on_exit hook with __ray_shutdown__ to fix UDF cleanup race condition #61700

Merged
raulchen merged 38 commits into ray-project:master from rayhhome:to-exit-to-shutdown on Mar 20, 2026

@rayhhome rayhhome commented Mar 12, 2026

Description

Replaces _MapWorker.on_exit() with _MapWorker.__ray_shutdown__() and removes the DataContext._enable_actor_pool_on_exit_hook workaround flag.

What changed and why:

The old approach called actor.on_exit.remote() (a regular actor task) in _release_running_actor, then used ray.wait(..., timeout=30s) to block until the hook finished. This had two problems:

  • Opt-in only. The hook was gated behind DataContext._enable_actor_pool_on_exit_hook, which defaulted to False. UDF cleanup was silently skipped unless users knew to set the private flag.
  • Fault-tolerance race condition. Because on_exit was submitted as a regular task, a lineage-reconstruction retry could be routed to the same actor after on_exit had already deleted the UDF. This may cause the retried task to execute against a None UDF instance.

The new approach:

  • Renames on_exit() to __ray_shutdown__() on _MapWorker, using Ray Core's native actor shutdown hook, which is called directly by the worker process before it exits.
  • Replaces .options().remote() with ._remote() for actor task submission. ActorMethod.options() creates a FuncWrapper closure that captures the ActorMethod (and therefore the ActorHandle) in a closure cell, forming a reference cycle. This cycle prevents actor handles from being collected by reference counting alone, meaning __ray_shutdown__ would never fire without explicit gc.collect(). Using ._remote() directly avoids the FuncWrapper entirely, so actor handles are collected properly by reference counting once all strong references are dropped.
  • Relies on passive GC (reference counting) to trigger __ray_shutdown__. During graceful shutdown, the actor pool drops its references to actor handles in _release_running_actor.
  • UDF cleanup is now unconditional. __ray_shutdown__ is always called on graceful actor exit with no flag, no timeout, and no explicit termination task.
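
The reference-cycle argument in the list above can be reproduced without Ray. The sketch below is a pure-Python model under stated assumptions: `Handle`, `Method`, and `handle_survives` are hypothetical stand-ins rather than Ray's actual classes, and the `options()` body only imitates the shape described in this PR (a class defined inside the method, whose `remote()` closes over the method object). In CPython a class object always sits in a reference cycle, so anything it captures stays alive until the cyclic collector runs.

```python
import gc
import weakref


class Handle:
    """Hypothetical stand-in for an actor handle."""


class Method:
    """Hypothetical stand-in for an actor method bound to a handle."""

    def __init__(self, handle):
        self._handle = handle  # strong reference, as the PR describes

    def _remote(self, *args):
        return ("submitted", args)

    def options(self, **opts):
        method = self  # lives in a closure cell of FuncWrapper.remote

        class FuncWrapper:  # class objects sit in a reference cycle in CPython
            def remote(self, *args):
                return method._remote(*args)

        return FuncWrapper()


def handle_survives(submit):
    """Return (alive after refcounting only, alive after gc.collect())."""
    gc.collect()
    gc.disable()  # only reference counting runs until gc.collect() below
    try:
        handle = Handle()
        ref = weakref.ref(handle)
        submit(Method(handle))
        del handle  # drop the last explicit strong reference
        after_refcount = ref() is not None
        gc.collect()
        after_gc = ref() is not None
        return after_refcount, after_gc
    finally:
        gc.enable()


chained = handle_survives(lambda m: m.options(num_returns=1).remote(1))
direct = handle_survives(lambda m: m._remote(1))
print(chained, direct)
```

In this model the chained call keeps the handle alive until `gc.collect()` runs, while the direct `_remote()` call lets reference counting free it immediately, which is the behavior the PR relies on to trigger the shutdown hook.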

Removed:

  • DataContext._enable_actor_pool_on_exit_hook (the flag is no longer needed because cleanup is now zero-cost and unconditional).
  • _MapWorker.on_exit() (replaced by __ray_shutdown__()).
  • The on_exit_refs collection and ray.wait() call in _release_running_actors.
  • _ActorPool._ACTOR_POOL_GRACEFUL_SHUTDOWN_TIMEOUT_S.

Related issues

Related to #53249 and partially resolves #60453.

Additional information

The race condition in the old on_exit approach:

  • Actor A is processing Task T.
  • _release_running_actor submits actor.on_exit.remote(), which is added to the actor's task queue.
  • Task T fails, and its retry is routed back to Actor A.
  • on_exit runs and deletes the UDF.
  • The retry arrives and executes against a None UDF, which crashes the task.
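
The sequence above can be sketched as a small single-process simulation. This is a minimal model that assumes a plain FIFO queue; `MapWorkerSketch` and `simulate_old_shutdown` are hypothetical names used only for illustration and do not correspond to Ray Data's real implementation.

```python
from collections import deque


class MapWorkerSketch:
    """Hypothetical stand-in for an actor that owns a UDF."""

    def __init__(self):
        self.udf = lambda x: x * 2

    def run(self, x):
        if self.udf is None:
            raise RuntimeError("UDF already cleaned up")
        return self.udf(x)

    def on_exit(self):
        # Old-style cleanup, submitted as a regular actor task.
        self.udf = None


def simulate_old_shutdown():
    worker = MapWorkerSketch()
    queue = deque()  # models the actor's FIFO task queue
    log = []

    queue.append(("on_exit", None))  # the pool submits the cleanup task
    queue.append(("run", 21))        # the retry of Task T is routed back

    while queue:  # the actor drains its queue in order
        name, arg = queue.popleft()
        try:
            if name == "on_exit":
                worker.on_exit()
                log.append("on_exit")
            else:
                log.append(worker.run(arg))
        except RuntimeError as exc:
            log.append(f"error: {exc}")
    return log


print(simulate_old_shutdown())
```

Because the cleanup is just another queued task, nothing stops later work from landing behind it, so the retry finds the UDF already gone.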

With the new approach:

  • _release_running_actor drops all pool references to the actor handle.
  • Once _data_tasks are cleared during shutdown, the actor handle's refcount reaches zero and the actor exits gracefully.
  • Ray Core calls __ray_shutdown__ directly in the worker process before exit, after all pending tasks complete.
  • __ray_shutdown__ runs as part of the actor's exit sequence and is guaranteed to be the last thing that runs before the process terminates. Because it is never queued alongside regular tasks, the FIFO ordering race cannot occur.
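
The ordering described in these steps can be illustrated with a rough pure-Python model. It assumes CPython's immediate reference counting, and `WorkerSketch`, `HandleSketch`, and `simulate_new_shutdown` are hypothetical names; the real hook is invoked by Ray Core in the worker process, not by a Python `__del__`.

```python
from collections import deque


class WorkerSketch:
    """Hypothetical stand-in for the actor process."""

    def __init__(self):
        self.udf = lambda x: x * 2
        self.pending = deque()
        self.log = []

    def submit(self, x):
        self.pending.append(x)

    def shutdown(self):
        # Finish every pending task first, then run the cleanup hook last.
        while self.pending:
            self.log.append(self.udf(self.pending.popleft()))
        self.udf = None
        self.log.append("shutdown hook")


class HandleSketch:
    """Hypothetical handle; the worker exits when the last handle is dropped."""

    def __init__(self, worker):
        self._worker = worker

    def __del__(self):
        self._worker.shutdown()


def simulate_new_shutdown():
    worker = WorkerSketch()
    handle = HandleSketch(worker)
    pool_refs = [handle]   # reference held by the actor pool
    task_refs = [handle]   # reference held by in-flight data tasks
    del handle

    worker.submit(1)
    worker.submit(2)

    pool_refs.clear()                # the pool releases the actor
    before = list(worker.log)        # nothing has run yet: a reference remains
    task_refs.clear()                # last reference gone, exit sequence starts
    return before, list(worker.log)


print(simulate_new_shutdown())
```

In this model the cleanup cannot interleave with regular work, because it only starts after the last reference disappears and the pending tasks have drained.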

The old _enable_actor_pool_on_exit_hook was a private, temporary workaround documented as having this race condition. It has been removed entirely as UDF cleanup is now unconditional and safe by default. Users who were setting ctx._enable_actor_pool_on_exit_hook = True will get the same behavior automatically with no code changes.

Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
@rayhhome rayhhome self-assigned this Mar 12, 2026
@rayhhome rayhhome requested a review from a team as a code owner March 12, 2026 23:57
@rayhhome rayhhome added the data Ray Data-related issues label Mar 12, 2026
Copilot AI review requested due to automatic review settings March 12, 2026 23:57

Copilot AI left a comment


Pull request overview

This PR replaces the old _MapWorker.on_exit() actor cleanup mechanism with Ray Core's native __ray_shutdown__() hook, fixing a race condition where lineage-reconstruction retries could be routed to an actor after its UDF had already been deleted. The DataContext._enable_actor_pool_on_exit_hook workaround flag is removed, making UDF cleanup unconditional.

Changes:

  • Renamed on_exit() to __ray_shutdown__() on _MapWorker and replaced the actor.on_exit.remote() + ray.wait() pattern with actor.__ray_terminate__.remote() in _release_running_actor, leveraging Ray Core's FIFO guarantee to prevent the retry race condition.
  • Removed DataContext._enable_actor_pool_on_exit_hook flag and _ActorPool._ACTOR_POOL_GRACEFUL_SHUTDOWN_TIMEOUT_S, along with associated plumbing (on_exit_refs collection, ray.wait() call).
  • Updated tests to remove the now-unnecessary ctx._enable_actor_pool_on_exit_hook = True setup and renamed mock method accordingly.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated no comments.

| File | Description |
| --- | --- |
| python/ray/data/_internal/execution/operators/actor_pool_map_operator.py | Core changes: rename on_exit → __ray_shutdown__, replace shutdown mechanism with __ray_terminate__, add graceful parameter, remove flag plumbing |
| python/ray/data/context.py | Remove _enable_actor_pool_on_exit_hook field |
| python/ray/data/tests/test_map.py | Remove flag setup from test_actor_udf_cleanup |
| python/ray/data/tests/test_actor_pool_map_operator.py | Rename mock on_exit → __ray_shutdown__ |

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request replaces the custom on_exit hook with Ray Core's __ray_shutdown__ mechanism for actor cleanup in _MapWorker. This is a solid improvement that resolves a race condition during fault-tolerance retries by leveraging actor.__ray_terminate__.remote(). The changes correctly remove the opt-in flag, making UDF cleanup unconditional and more reliable. The implementation is clean, and the updated docstrings in _MapWorker and _release_running_actor clearly explain the new shutdown behavior. The associated test changes confirm the new logic works as expected. Overall, this is a well-executed and important fix for UDF cleanup reliability.

@rayhhome rayhhome added the go add ONLY when ready to merge, run all tests label Mar 13, 2026

@iamjustinhsu iamjustinhsu left a comment


Left some comments. Also, for the future, I think we need to add some documentation somewhere about how callable classes work with __del__ vs. __ray_shutdown__.

Comment thread python/ray/data/_internal/execution/operators/actor_pool_map_operator.py Outdated
Comment thread python/ray/data/_internal/execution/operators/actor_pool_map_operator.py Outdated
Comment thread python/ray/data/_internal/execution/operators/actor_pool_map_operator.py Outdated
Comment thread python/ray/data/tests/test_map.py Outdated
@rayhhome rayhhome force-pushed the to-exit-to-shutdown branch from 9987c77 to b351140 Compare March 17, 2026 20:21
@rayhhome rayhhome requested review from a team as code owners March 17, 2026 20:21
Comment thread python/ray/data/_internal/tensor_extensions/arrow.py Outdated
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
# Call _remote() directly instead of .options().remote() to
# avoid the FuncWrapper closure in ActorMethod.options(), which
# creates a reference cycle that prevents the ActorHandle from
# being collected by reference counting alone.


@raulchen raulchen enabled auto-merge (squash) March 19, 2026 20:47
@github-actions github-actions Bot disabled auto-merge March 19, 2026 20:55
Signed-off-by: HFFuture <ray.huang@anyscale.com>
@github-actions github-actions Bot disabled auto-merge March 20, 2026 21:42

@cursor cursor Bot left a comment


Cursor Bugbot has reviewed your changes and found 2 potential issues.

Comment thread python/ray/data/_internal/execution/operators/actor_pool_map_operator.py Outdated
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
@raulchen raulchen merged commit 16eaed0 into ray-project:master Mar 20, 2026
6 checks passed
raulchen added a commit that referenced this pull request Mar 25, 2026
## Description
In #61700, a direct call to `_remote()` was used as a workaround for
the issue described in #61922. That issue has been fixed in #61934, so
this PR changes the direct `_remote()` call back to the method
chaining style.

## Related issues
Related to #61922.

---------

Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: HFFuture <ray.huang@anyscale.com>
Co-authored-by: Hao Chen <chenh1024@gmail.com>
ryanaoleary pushed a commit to ryanaoleary/ray that referenced this pull request Mar 25, 2026
…race condition (ray-project#61700)

ryanaoleary pushed a commit to ryanaoleary/ray that referenced this pull request Mar 25, 2026
Lucas61000 pushed a commit to Lucas61000/ray that referenced this pull request May 15, 2026
…race condition (ray-project#61700)

Lucas61000 pushed a commit to Lucas61000/ray that referenced this pull request May 15, 2026