[Data] Replace on_exit hook with __ray_shutdown__ to fix UDF cleanup race condition#61700
Conversation
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
There was a problem hiding this comment.
Pull request overview
This PR replaces the old _MapWorker.on_exit() actor cleanup mechanism with Ray Core's native __ray_shutdown__() hook, fixing a race condition where lineage-reconstruction retries could be routed to an actor after its UDF had already been deleted. The DataContext._enable_actor_pool_on_exit_hook workaround flag is removed, making UDF cleanup unconditional.
Changes:
- Renamed
on_exit()to__ray_shutdown__()on_MapWorkerand replaced theactor.on_exit.remote()+ray.wait()pattern withactor.__ray_terminate__.remote()in_release_running_actor, leveraging Ray Core's FIFO guarantee to prevent the retry race condition. - Removed
DataContext._enable_actor_pool_on_exit_hookflag and_ActorPool._ACTOR_POOL_GRACEFUL_SHUTDOWN_TIMEOUT_S, along with associated plumbing (on_exit_refscollection,ray.wait()call). - Updated tests to remove the now-unnecessary
ctx._enable_actor_pool_on_exit_hook = Truesetup and renamed mock method accordingly.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| python/ray/data/_internal/execution/operators/actor_pool_map_operator.py | Core changes: rename on_exit → __ray_shutdown__, replace shutdown mechanism with __ray_terminate__, add graceful parameter, remove flag plumbing |
| python/ray/data/context.py | Remove _enable_actor_pool_on_exit_hook field |
| python/ray/data/tests/test_map.py | Remove flag setup from test_actor_udf_cleanup |
| python/ray/data/tests/test_actor_pool_map_operator.py | Rename mock on_exit → __ray_shutdown__ |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Code Review
This pull request replaces the custom on_exit hook with Ray Core's __ray_shutdown__ mechanism for actor cleanup in _MapWorker. This is a solid improvement that resolves a race condition during fault-tolerance retries by leveraging actor.__ray_terminate__.remote(). The changes correctly remove the opt-in flag, making UDF cleanup unconditional and more reliable. The implementation is clean, and the updated docstrings in _MapWorker and _release_running_actor clearly explain the new shutdown behavior. The associated test changes confirm the new logic works as expected. Overall, this is a well-executed and important fix for UDF cleanup reliability.
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
…-exit-to-shutdown
iamjustinhsu
left a comment
There was a problem hiding this comment.
left some comments. Also for the future, I think we also need to add some documentation about how callable classes work in del vs ray_shutdown somewhere
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
…-exit-to-shutdown
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
…-exit-to-shutdown
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
…-exit-to-shutdown
9987c77 to
b351140
Compare
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
…-exit-to-shutdown
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
| # Call _remote() directly instead of .options().remote() to | ||
| # avoid the FuncWrapper closure in ActorMethod.options(), which | ||
| # creates a reference cycle that prevents the ActorHandle from | ||
| # being collected by reference counting alone. |
Signed-off-by: HFFuture <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
## Description In #61700, a direct call to `_remote()` is used as workaround to get over the issue described in #61922. This issue has been fixed in #61934 and this PR changes the direct `_remote()` call back to the method chaining style. ## Related issues Related to #61922. --------- Signed-off-by: Sirui Huang <ray.huang@anyscale.com> Signed-off-by: HFFuture <ray.huang@anyscale.com> Co-authored-by: Hao Chen <chenh1024@gmail.com>
…race condition (ray-project#61700) ## Description Replaces `_MapWorker.on_exit()` with `_MapWorker.__ray_shutdown__()` and removes the `DataContext._enable_actor_pool_on_exit_hook` workaround flag.. ### What changed and why: The old approach called `actor.on_exit.remote()` (a regular actor task) in _release_running_actor, then used `ray.wait(..., timeout=30s)` to block until the hook finished. This had two problems: - Opt-in only. The hook was gated behind `DataContext._enable_actor_pool_on_exit_hook`, which defaulted to `False`. UDF cleanup was silently skipped unless users knew to set the private flag. - Fault-tolerance race condition. Because `on_exit` was submitted as a regular task, a lineage-reconstruction retry could be routed to the same actor after `on_exit` had already deleted the UDF. This may cause the retried task to execute against a `None` UDF instance. ### The new approach: - Renames `on_exit()` to `__ray_shutdown__()` on `_MapWorker`, using Ray Core's native actor shutdown hook, which is called directly by the worker process before it exits. - Replaces `.options().remote()` with `._remote()` for actor task submission. `ActorMethod.options()` creates a `FuncWrapper` closure that captures the `ActorMethod` (and therefore the `ActorHandle`) in a closure cell, forming a reference cycle. This cycle prevents actor handles from being collected by reference counting alone, meaning `__ray_shutdown__` would never fire without explicit `gc.collect()`. Using `._remote()` directly avoids the `FuncWrapper` entirely, so actor handles are collected properly by reference counting once all strong references are dropped. - Relies on passive GC (reference counting) to trigger `__ray_shutdown__`. During graceful shutdown, the actor pool drops its references to actor handles in `_release_running_actor`. - UDF cleanup is now unconditional. `__ray_shutdown__` is always called on graceful actor exit with no flag, no timeout, and no explicit termination task. ### Removed: - `DataContext._enable_actor_pool_on_exit_hook` (the flag is no longer needed because cleanup is now zero-cost and unconditional). - `_MapWorker.on_exit()` (replaced by `__ray_shutdown__()`). - The on_exit_refs collection and `ray.wait()` call in _release_running_actors. - `_ActorPool._ACTOR_POOL_GRACEFUL_SHUTDOWN_TIMEOUT_S`. ## Related issues Related to ray-project#53249 and partially resolves ray-project#60453. ## Additional information The race condition in question from old `on_exit` approach: - Actor A is processing Task T. - `_release_running_actor` submits `actor.on_exit.remote()`; task added to actor's queue. - Task T fails and retry task is routed back to Actor A. - on_exit runs and deletes UDF. - Retry arrives and executes against `None` UDF, leading to crash. With the new approach: - `_release_running_actor` drops all pool references to the actor handle.. - Once `_data_tasks` are cleared during shutdown, the actor handle's refcount reaches zero and the actor exits gracefully. - Ray Core calls `__ray_shutdown__` directly in the worker process before exit, after all pending tasks complete. - `__ray_shutdown__` runs as part of the actor's exit sequence, guaranteed to be the last thing before the process terminates. No FIFO queuing issue (race conditions) because of this. The old `_enable_actor_pool_on_exit_hook` was a private, temporary workaround documented as having this race condition. It has been removed entirely as UDF cleanup is now unconditional and safe by default. Users who were setting `ctx._enable_actor_pool_on_exit_hook = True` will get the same behavior automatically with no code changes. --------- Signed-off-by: Sirui Huang <ray.huang@anyscale.com> Signed-off-by: HFFuture <ray.huang@anyscale.com>
## Description In ray-project#61700, a direct call to `_remote()` is used as workaround to get over the issue described in ray-project#61922. This issue has been fixed in ray-project#61934 and this PR changes the direct `_remote()` call back to the method chaining style. ## Related issues Related to ray-project#61922. --------- Signed-off-by: Sirui Huang <ray.huang@anyscale.com> Signed-off-by: HFFuture <ray.huang@anyscale.com> Co-authored-by: Hao Chen <chenh1024@gmail.com>
…race condition (ray-project#61700) ## Description Replaces `_MapWorker.on_exit()` with `_MapWorker.__ray_shutdown__()` and removes the `DataContext._enable_actor_pool_on_exit_hook` workaround flag.. ### What changed and why: The old approach called `actor.on_exit.remote()` (a regular actor task) in _release_running_actor, then used `ray.wait(..., timeout=30s)` to block until the hook finished. This had two problems: - Opt-in only. The hook was gated behind `DataContext._enable_actor_pool_on_exit_hook`, which defaulted to `False`. UDF cleanup was silently skipped unless users knew to set the private flag. - Fault-tolerance race condition. Because `on_exit` was submitted as a regular task, a lineage-reconstruction retry could be routed to the same actor after `on_exit` had already deleted the UDF. This may cause the retried task to execute against a `None` UDF instance. ### The new approach: - Renames `on_exit()` to `__ray_shutdown__()` on `_MapWorker`, using Ray Core's native actor shutdown hook, which is called directly by the worker process before it exits. - Replaces `.options().remote()` with `._remote()` for actor task submission. `ActorMethod.options()` creates a `FuncWrapper` closure that captures the `ActorMethod` (and therefore the `ActorHandle`) in a closure cell, forming a reference cycle. This cycle prevents actor handles from being collected by reference counting alone, meaning `__ray_shutdown__` would never fire without explicit `gc.collect()`. Using `._remote()` directly avoids the `FuncWrapper` entirely, so actor handles are collected properly by reference counting once all strong references are dropped. - Relies on passive GC (reference counting) to trigger `__ray_shutdown__`. During graceful shutdown, the actor pool drops its references to actor handles in `_release_running_actor`. - UDF cleanup is now unconditional. `__ray_shutdown__` is always called on graceful actor exit with no flag, no timeout, and no explicit termination task. ### Removed: - `DataContext._enable_actor_pool_on_exit_hook` (the flag is no longer needed because cleanup is now zero-cost and unconditional). - `_MapWorker.on_exit()` (replaced by `__ray_shutdown__()`). - The on_exit_refs collection and `ray.wait()` call in _release_running_actors. - `_ActorPool._ACTOR_POOL_GRACEFUL_SHUTDOWN_TIMEOUT_S`. ## Related issues Related to ray-project#53249 and partially resolves ray-project#60453. ## Additional information The race condition in question from old `on_exit` approach: - Actor A is processing Task T. - `_release_running_actor` submits `actor.on_exit.remote()`; task added to actor's queue. - Task T fails and retry task is routed back to Actor A. - on_exit runs and deletes UDF. - Retry arrives and executes against `None` UDF, leading to crash. With the new approach: - `_release_running_actor` drops all pool references to the actor handle.. - Once `_data_tasks` are cleared during shutdown, the actor handle's refcount reaches zero and the actor exits gracefully. - Ray Core calls `__ray_shutdown__` directly in the worker process before exit, after all pending tasks complete. - `__ray_shutdown__` runs as part of the actor's exit sequence, guaranteed to be the last thing before the process terminates. No FIFO queuing issue (race conditions) because of this. The old `_enable_actor_pool_on_exit_hook` was a private, temporary workaround documented as having this race condition. It has been removed entirely as UDF cleanup is now unconditional and safe by default. Users who were setting `ctx._enable_actor_pool_on_exit_hook = True` will get the same behavior automatically with no code changes. --------- Signed-off-by: Sirui Huang <ray.huang@anyscale.com> Signed-off-by: HFFuture <ray.huang@anyscale.com>
## Description In ray-project#61700, a direct call to `_remote()` is used as workaround to get over the issue described in ray-project#61922. This issue has been fixed in ray-project#61934 and this PR changes the direct `_remote()` call back to the method chaining style. ## Related issues Related to ray-project#61922. --------- Signed-off-by: Sirui Huang <ray.huang@anyscale.com> Signed-off-by: HFFuture <ray.huang@anyscale.com> Co-authored-by: Hao Chen <chenh1024@gmail.com>

Description
Replaces
_MapWorker.on_exit()with_MapWorker.__ray_shutdown__()and removes theDataContext._enable_actor_pool_on_exit_hookworkaround flag..What changed and why:
The old approach called
actor.on_exit.remote()(a regular actor task) in _release_running_actor, then usedray.wait(..., timeout=30s)to block until the hook finished. This had two problems:DataContext._enable_actor_pool_on_exit_hook, which defaulted toFalse. UDF cleanup was silently skipped unless users knew to set the private flag.on_exitwas submitted as a regular task, a lineage-reconstruction retry could be routed to the same actor afteron_exithad already deleted the UDF. This may cause the retried task to execute against aNoneUDF instance.The new approach:
on_exit()to__ray_shutdown__()on_MapWorker, using Ray Core's native actor shutdown hook, which is called directly by the worker process before it exits..options().remote()with._remote()for actor task submission.ActorMethod.options()creates aFuncWrapperclosure that captures theActorMethod(and therefore theActorHandle) in a closure cell, forming a reference cycle. This cycle prevents actor handles from being collected by reference counting alone, meaning__ray_shutdown__would never fire without explicitgc.collect(). Using._remote()directly avoids theFuncWrapperentirely, so actor handles are collected properly by reference counting once all strong references are dropped.__ray_shutdown__. During graceful shutdown, the actor pool drops its references to actor handles in_release_running_actor.__ray_shutdown__is always called on graceful actor exit with no flag, no timeout, and no explicit termination task.Removed:
DataContext._enable_actor_pool_on_exit_hook(the flag is no longer needed because cleanup is now zero-cost and unconditional)._MapWorker.on_exit()(replaced by__ray_shutdown__()).ray.wait()call in _release_running_actors._ActorPool._ACTOR_POOL_GRACEFUL_SHUTDOWN_TIMEOUT_S.Related issues
Related to #53249 and partially resolves #60453.
Additional information
The race condition in question from old
on_exitapproach:_release_running_actorsubmitsactor.on_exit.remote(); task added to actor's queue.NoneUDF, leading to crash.With the new approach:
_release_running_actordrops all pool references to the actor handle.._data_tasksare cleared during shutdown, the actor handle's refcount reaches zero and the actor exits gracefully.__ray_shutdown__directly in the worker process before exit, after all pending tasks complete.__ray_shutdown__runs as part of the actor's exit sequence, guaranteed to be the last thing before the process terminates. No FIFO queuing issue (race conditions) because of this.The old
_enable_actor_pool_on_exit_hookwas a private, temporary workaround documented as having this race condition. It has been removed entirely as UDF cleanup is now unconditional and safe by default. Users who were settingctx._enable_actor_pool_on_exit_hook = Truewill get the same behavior automatically with no code changes.