[Serve] Resolve deadlock when replica result is accessed from two eve…#59385
Conversation
…nt loops Signed-off-by: abrar <abrar@anyscale.com>
There was a problem hiding this comment.
Code Review
This pull request effectively resolves a critical deadlock issue that occurred when awaiting chained DeploymentResponse objects from different event loops. The root cause was correctly identified as the non-thread-safe nature of asyncio.Lock across event loops. The fix, which replaces it with a non-blocking acquire pattern on a threading.Lock, is a robust and well-implemented solution for cross-loop synchronization. The new logic in to_object_ref_async is sound, and the comprehensive new test test_chained_deployment_response_await_order thoroughly validates the fix by checking various await orders. Overall, this is an excellent change that significantly improves the stability of deployment composition in Ray Serve. I have one suggestion for a minor code cleanup.
Signed-off-by: abrar <abrar@anyscale.com>
|
in the repro, why does the user event loop being blocked prevent the router loop from making progress to resolve b? |
discussed offline, here is a small toy example to demo the behavior def demo_the_actual_error():
"""
Show the actual error that occurs.
"""
print()
print("=" * 70)
print("DEMO 3: The actual error when using asyncio.Lock across loops")
print("=" * 70)
print()
lock = asyncio.Lock()
lock_acquired = threading.Event()
def thread_a():
async def hold_lock():
async with lock:
print("[Thread A] Acquired lock, holding for 1 second...")
lock_acquired.set()
await asyncio.sleep(1)
print("[Thread A] Releasing lock...")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(hold_lock())
loop.close()
def thread_b():
lock_acquired.wait() # Wait for Thread A to acquire the lock
time.sleep(0.1) # Small delay to ensure we try to acquire after
async def try_acquire():
print("[Thread B] Trying to acquire lock...")
async with lock: # This will fail!
print("[Thread B] Got the lock!")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(try_acquire())
except RuntimeError as e:
print(f"[Thread B] ERROR: {e}")
finally:
loop.close()
t1 = threading.Thread(target=thread_a)
t2 = threading.Thread(target=thread_b)
t1.start()
t2.start()
t1.join()
t2.join() |
Summary
Fixes a deadlock that occurs when awaiting intermediate
DeploymentResponseobjects in a chain of deployment calls.Problem
The following code would hang indefinitely:
Interestingly, reversing the await order (
await cthenawait b) worked fine.Root Cause
When chained
DeploymentResponseobjects are used, the sameReplicaResultcan be accessed from two different event loops concurrently:await bbas an argument tocBoth code paths call
ReplicaResult.to_object_ref_async(), which used anasyncio.Lockfor synchronization. However,asyncio.Lockis not thread-safe and not designed for cross-event-loop usage, causing a deadlock.Fix
Replace the
asyncio.Lockwith a non-blocking acquire pattern using the existing thread-safethreading.Lock:Testing
Added
test_chained_deployment_response_await_orderwhich tests both await orders (b_firstandc_first) to ensure they complete without hanging. Ran this test with all combinations ofRAY_SERVE_RUN_USER_CODE_IN_SEPARATE_THREADandRAY_SERVE_RUN_ROUTER_IN_SEPARATE_LOOPenv varsTested against the repro in #54201
Performance
Use the same repro script as given in the GH issue except i switch the order, i'e
await cthenawait bso that i can run it on master.locust --headless -u 100 -r 10 --host http://localhost:8000