[Serve] Avoid proxy readiness future timeout race #62194
Signed-off-by: Ziy1-Tan <ajb459684460@gmail.com>
Code Review
This pull request refactors the wrap_as_future function in proxy_state.py to improve timeout handling. By introducing an intermediate asyncio.Future and a propagation helper, the implementation ensures that late completion of the source future does not conflict with a previously triggered timeout. A corresponding test case has been added to verify this behavior. The review feedback suggests simplifying the _propagate_concurrent_future_state logic by centralizing the check for the destination future's completion status.
    def _propagate_concurrent_future_state(
        source_fut: concurrent.futures.Future, dest_fut: asyncio.Future
    ):
        if source_fut.cancelled():
            if not dest_fut.done():
                dest_fut.cancel()
            return

        exception = source_fut.exception()
        if exception is not None:
            _try_set_exception(dest_fut, exception)
            return

        if not dest_fut.done():
            dest_fut.set_result(source_fut.result())
The logic in _propagate_concurrent_future_state can be simplified by checking if dest_fut is already done at the beginning of the function. This avoids redundant checks and makes the control flow more linear, improving maintainability.
Suggested change:

    def _propagate_concurrent_future_state(
        source_fut: concurrent.futures.Future, dest_fut: asyncio.Future
    ):
        if dest_fut.done():
            return
        if source_fut.cancelled():
            dest_fut.cancel()
            return
        exception = source_fut.exception()
        if exception is not None:
            dest_fut.set_exception(exception)
            return
        dest_fut.set_result(source_fut.result())
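As a rough illustration of why the single up-front `dest_fut.done()` check is enough, the sketch below runs the suggested function against a destination future that a timeout has already resolved. The function body follows the suggestion above; the driver `demo_late_completion` and the use of the built-in `TimeoutError` are assumptions made for this example and are not taken from the PR.

```python
import asyncio
import concurrent.futures


def _propagate_concurrent_future_state(
    source_fut: concurrent.futures.Future, dest_fut: asyncio.Future
):
    # If the timeout (or anything else) already resolved dest_fut, do nothing.
    if dest_fut.done():
        return
    if source_fut.cancelled():
        dest_fut.cancel()
        return
    exception = source_fut.exception()
    if exception is not None:
        dest_fut.set_exception(exception)
        return
    dest_fut.set_result(source_fut.result())


async def demo_late_completion() -> str:
    """Hypothetical driver: the timeout wins, then the source completes late."""
    loop = asyncio.get_running_loop()
    source_fut = concurrent.futures.Future()
    dest_fut = loop.create_future()

    # Simulate the timeout path resolving the destination first.
    dest_fut.set_exception(TimeoutError("timed out"))

    # The source finishes afterwards; propagation must be a harmless no-op.
    source_fut.set_result("ready")
    _propagate_concurrent_future_state(source_fut, dest_fut)

    try:
        await dest_fut
    except TimeoutError:
        return "timeout"
    return "result"


if __name__ == "__main__":
    print(asyncio.run(demo_late_completion()))
```

Because both writers run on the event loop thread in this sketch, nothing can interleave between the `done()` check and the subsequent write, which is what makes the plain `set_exception()` call safe here.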
+1. better for readability
abrarsheikh left a comment
Change looks good to me, thanks for the explanation. Let's incorporate the bot's comments.
Signed-off-by: Ziy1-Tan <ajb459684460@gmail.com>
## Summary

This fixes the race in `wrap_as_future()` used by Serve proxy readiness/health/drain checks.

The old implementation combined:

- `asyncio.wrap_future(ref.future())`, which relies on asyncio's internal `_chain_future`
- a timeout callback that directly called `set_exception()` on that same wrapped future

If the Ray future completed just after the timeout path won, asyncio's `_copy_future_state` could still run and trip `assert not dest.done()`.

This change keeps the timeout on an outer asyncio future and propagates the `concurrent.futures.Future` state into that outer future on the event loop. That removes the double-writer race while preserving the existing success/timeout semantics.

## Testing

- `PYTHONPATH=/home/simple/github/ray/.worktrees/fix-issue-60651-proxy-readiness-race/python python -m pytest /home/simple/github/ray/.worktrees/fix-issue-60651-proxy-readiness-race/python/ray/serve/tests/test_proxy_actor_wrapper.py /home/simple/github/ray/.worktrees/fix-issue-60651-proxy-readiness-race/python/ray/serve/tests/unit/test_proxy_state.py -q`
- `uvx pre-commit run --files python/ray/serve/_private/proxy_state.py python/ray/serve/tests/test_proxy_actor_wrapper.py`

Closes ray-project#60651.

---------

Signed-off-by: Ziy1-Tan <ajb459684460@gmail.com>
Co-authored-by: abrar <abrar@anyscale.com>
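The outer-future approach described in the summary can be sketched without Ray as follows. This is a minimal illustration under stated assumptions, not the code merged in the PR: `wrap_with_timeout`, `_propagate`, `_on_timeout`, and `demo` are hypothetical names, a plain `concurrent.futures.Future` stands in for `ref.future()`, and the built-in `TimeoutError` stands in for whatever exception the real helper raises.

```python
import asyncio
import concurrent.futures
import threading


def _propagate(source_fut, dest_fut):
    # Runs on the event loop thread; skip if the timeout already resolved dest_fut.
    if dest_fut.done():
        return
    if source_fut.cancelled():
        dest_fut.cancel()
    elif source_fut.exception() is not None:
        dest_fut.set_exception(source_fut.exception())
    else:
        dest_fut.set_result(source_fut.result())


def _on_timeout(dest_fut, timeout_s):
    # Also runs on the event loop thread, so it cannot interleave with _propagate.
    if not dest_fut.done():
        dest_fut.set_exception(TimeoutError(f"Timed out after {timeout_s}s."))


def wrap_with_timeout(source_fut, timeout_s):
    """Hypothetical helper: return an asyncio future that mirrors source_fut
    or fails with TimeoutError, whichever happens first."""
    loop = asyncio.get_running_loop()
    dest_fut = loop.create_future()

    def _on_source_done(fut):
        # May be called from any thread, so hop onto the event loop first.
        try:
            loop.call_soon_threadsafe(_propagate, fut, dest_fut)
        except RuntimeError:
            pass  # The loop is already closed; nobody is waiting anymore.

    source_fut.add_done_callback(_on_source_done)
    timer = loop.call_later(timeout_s, _on_timeout, dest_fut, timeout_s)
    # Drop the pending timer once the outer future is resolved either way.
    dest_fut.add_done_callback(lambda _: timer.cancel())
    return dest_fut


async def demo():
    # Case 1: the source completes after the timeout has already fired.
    late = concurrent.futures.Future()
    wrapped = wrap_with_timeout(late, timeout_s=0.01)
    threading.Timer(0.05, late.set_result, args=("too late",)).start()
    try:
        await wrapped
        first = "result"
    except TimeoutError:
        first = "timeout"
    await asyncio.sleep(0.1)  # Let the late completion arrive; it is ignored.

    # Case 2: the source completes well before the timeout.
    fast = concurrent.futures.Future()
    wrapped = wrap_with_timeout(fast, timeout_s=1.0)
    threading.Timer(0.01, fast.set_result, args=("ready",)).start()
    second = await wrapped
    return first, second


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The design point this sketch tries to capture is that the outer future has exactly two writers, and both run on the event loop thread and check `done()` before writing, so a late completion of the source cannot hit an already-resolved future.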