[core][Let the driver I/O context breathe] Resolve task dependencies synchronously when objects already exist#62561
Conversation
There was a problem hiding this comment.
Code Review
This pull request optimizes object retrieval by allowing GetAsync callbacks to execute synchronously when the object is already present in the memory store, reducing I/O event loop overhead. Additionally, PlasmaCallback is refactored to fetch objects directly via a non-blocking Get call after raylet notification, with a fallback mechanism for eviction. A review comment suggests that the Contains check within PlasmaCallback is redundant and could be removed to eliminate an unnecessary IPC and simplify the code.
07b1f39 to
f90305e
Compare
a6191cf to
aec304d
Compare
| // object already exists (see #47833 for why). In workloads like Data shuffle, all map outputs | ||
| // are ready before reduce tasks are submitted, so checking synchronously | ||
| // first avoids flooding the I/O context with callbacks. | ||
| auto existing = in_memory_store_.GetIfExists(obj_id); |
There was a problem hiding this comment.
Let's also benchmark the worst case workload for this change, which is that the object never exists and we make many unnecessary GetIfExists calls.
There was a problem hiding this comment.
Makes sense! Running the benchmark locally where I use a single actor blocks all producers. When consumers send, all producers are stuck at await signal.wait(), so GetIfExists has a 100% miss rate. My PR and master seem the same I just got small noisy results diff, between 0 to 3%(run 1k 10k 100k deps cases).
b89c3a5 to
b330ef2
Compare
…xist Signed-off-by: yicheng <yicheng@anyscale.com>
Signed-off-by: yicheng <yicheng@anyscale.com>
b330ef2 to
f1448b9
Compare
Signed-off-by: yicheng <yicheng@anyscale.com>
|
Do you see any performance difference w/ and w/o this PR in the microbenchmarks? ( |
ran the microbenchmarks in CI (lin) and compared with the latest master run (link). They're the same within noise. None of the microbenchmarks tests with many already ready object ref dependencies. All tasks are either argument free or have a single dep. We can add a dedicated case in a separate PR! |
| // Even if dependencies are already local, the ResolveDependencies callbacks are still | ||
| // called asynchronously in the event loop as a different task. | ||
| // Test that dependencies already in the store are resolved synchronously | ||
| // via GetIfExists, without posting to the I/O event queue. | ||
| TEST(LocalDependencyResolverTest, TestDependenciesAlreadyLocal) { |
There was a problem hiding this comment.
can you check git blame for the history of this test and see if this was added for good reason?
There was a problem hiding this comment.
This test was originally testing synchronous resolution when deps are already local. PR #47833 changed GetAsync to always post callbacks to io_context to avoid potential deadlocks, so this test was updated to wait asynchronously.
For our ifx, we does not modify GetAsync itself, we check if the object exists locally in ResolveDependencies before calling GetAsync, so the callback fires synchronously again.
There was a problem hiding this comment.
I see, and for correctness it's important that GetAsync always posts a callback, but not that the dependency resolver does?
There was a problem hiding this comment.
Yes! To be very specific, in #47833, what they are talking about is:
ray.cancel()fires and it holds lock A and waits for lock B on the Python thread- Meanwhile in the C++ worker thread, a task finishes, it holds lock B and triggers the callback that the dependency resolver registered. This callback could possibly try to hold lock A, so we deadlock.
So, I think Jiajun's reasoning is that a caller holding a lock then calling a function that needs another lock is too dangerous, and he decided to post that function to the I/O thread no matter what.
In our case, we are safe, and the reason is that we, the caller, do not hold any lock when calling that callback.
There was a problem hiding this comment.
This PR would leaks the abstraction a bit since we can't modify GetAsync itself, but we got performance gain!
…synchronously when objects already exist (ray-project#62561) The driver's single threaded I/O context is the bottleneck for many workloads. When it's overwhelmed, everything slows down. Workloads like Ray Data shuffle have two phases: map and reduce. You can't start reduce until all map tasks are done, which means all map objects are already in the memory store. Normally for large shuffles, the reduce tasks have very long object args. In core, what we do is the driver, as the reduce task submitter, needs to check task dependencies before submitting. In our case, track if each object is ready or not. We did this by calling GetAsync, which always puts a checking callback into the driver I/O context. This can create up to 1,028,921 GetAsync.Callback events, taking 17% of I/O thread time in the worst case during my experiments. Each callback takes 0.04ms to execute but waits 5,719ms on average in the queue. The reason GetAsync must be async is to avoid deadlock, you can read more on this: ray-project#47833 So we can't modify GetAsync. What we do instead is when resolving task dependencies, we first check if the object is ready. If it is, mark it resolved immediately. Checking object readiness and marking it resolved don't have any lock issues, so we are safe here. The improvement is great: we almost eliminate all those GetAsync.Callback events, and achieve ~9.2% end-to-end speedup in shuffle. --------- Signed-off-by: yicheng <yicheng@anyscale.com> Co-authored-by: yicheng <yicheng@anyscale.com>
…synchronously when objects already exist (ray-project#62561) The driver's single threaded I/O context is the bottleneck for many workloads. When it's overwhelmed, everything slows down. Workloads like Ray Data shuffle have two phases: map and reduce. You can't start reduce until all map tasks are done, which means all map objects are already in the memory store. Normally for large shuffles, the reduce tasks have very long object args. In core, what we do is the driver, as the reduce task submitter, needs to check task dependencies before submitting. In our case, track if each object is ready or not. We did this by calling GetAsync, which always puts a checking callback into the driver I/O context. This can create up to 1,028,921 GetAsync.Callback events, taking 17% of I/O thread time in the worst case during my experiments. Each callback takes 0.04ms to execute but waits 5,719ms on average in the queue. The reason GetAsync must be async is to avoid deadlock, you can read more on this: ray-project#47833 So we can't modify GetAsync. What we do instead is when resolving task dependencies, we first check if the object is ready. If it is, mark it resolved immediately. Checking object readiness and marking it resolved don't have any lock issues, so we are safe here. The improvement is great: we almost eliminate all those GetAsync.Callback events, and achieve ~9.2% end-to-end speedup in shuffle. --------- Signed-off-by: yicheng <yicheng@anyscale.com> Co-authored-by: yicheng <yicheng@anyscale.com>
Description
The driver's single threaded I/O context is the bottleneck for many workloads. When it's overwhelmed, everything slows down.
Workloads like Ray Data shuffle have two phases: map and reduce. You can't start reduce until all map tasks are done, which means all map objects are already in the memory store.
Normally for large shuffles, the reduce tasks have very long object args. In core, what we do is the driver, as the reduce task submitter, needs to check task dependencies before submitting. In our case, track if each object is ready or not.
We did this by calling GetAsync, which always puts a checking callback into the driver I/O context. This can create up to 1,028,921 GetAsync.Callback events, taking 17% of I/O thread time in the worst case during my experiments. Each callback takes 0.04ms to execute but waits 5,719ms on average in the queue. The reason GetAsync must be async is to avoid deadlock, you can read more on this: #47833
So we can't modify GetAsync. What we do instead is when resolving task dependencies, we first check if the object is ready. If it is, mark it resolved immediately. Checking object readiness and marking it resolved don't have any lock issues, so we are safe here.
The improvement is great: we almost eliminate all those GetAsync.Callback events, and achieve ~9.2% end-to-end speedup in shuffle.