Skip to content

[core][Let the driver I/O context breathe] Resolve task dependencies synchronously when objects already exist#62561

Merged
edoakes merged 4 commits into
ray-project:masterfrom
Yicheng-Lu-llll:getasync-sync-fast-path
Apr 14, 2026
Merged

[core][Let the driver I/O context breathe] Resolve task dependencies synchronously when objects already exist#62561
edoakes merged 4 commits into
ray-project:masterfrom
Yicheng-Lu-llll:getasync-sync-fast-path

Conversation

@Yicheng-Lu-llll

@Yicheng-Lu-llll Yicheng-Lu-llll commented Apr 13, 2026

Copy link
Copy Markdown
Member

Description

The driver's single threaded I/O context is the bottleneck for many workloads. When it's overwhelmed, everything slows down.

Workloads like Ray Data shuffle have two phases: map and reduce. You can't start reduce until all map tasks are done, which means all map objects are already in the memory store.

Normally for large shuffles, the reduce tasks have very long object args. In core, what we do is the driver, as the reduce task submitter, needs to check task dependencies before submitting. In our case, track if each object is ready or not.

We did this by calling GetAsync, which always puts a checking callback into the driver I/O context. This can create up to 1,028,921 GetAsync.Callback events, taking 17% of I/O thread time in the worst case during my experiments. Each callback takes 0.04ms to execute but waits 5,719ms on average in the queue. The reason GetAsync must be async is to avoid deadlock, you can read more on this: #47833

So we can't modify GetAsync. What we do instead is when resolving task dependencies, we first check if the object is ready. If it is, mark it resolved immediately. Checking object readiness and marking it resolved don't have any lock issues, so we are safe here.

The improvement is great: we almost eliminate all those GetAsync.Callback events, and achieve ~9.2% end-to-end speedup in shuffle.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request optimizes object retrieval by allowing GetAsync callbacks to execute synchronously when the object is already present in the memory store, reducing I/O event loop overhead. Additionally, PlasmaCallback is refactored to fetch objects directly via a non-blocking Get call after raylet notification, with a fallback mechanism for eviction. A review comment suggests that the Contains check within PlasmaCallback is redundant and could be removed to eliminate an unnecessary IPC and simplify the code.

Comment thread src/ray/core_worker/core_worker.cc Outdated
@Yicheng-Lu-llll Yicheng-Lu-llll force-pushed the getasync-sync-fast-path branch 2 times, most recently from 07b1f39 to f90305e Compare April 13, 2026 16:41
@Yicheng-Lu-llll Yicheng-Lu-llll changed the title [core][Let the driver I/O context breathe] Invoke GetAsync callback synchronously when object already exists Apr 13, 2026
@Yicheng-Lu-llll Yicheng-Lu-llll force-pushed the getasync-sync-fast-path branch 5 times, most recently from a6191cf to aec304d Compare April 13, 2026 17:03
@Yicheng-Lu-llll Yicheng-Lu-llll marked this pull request as ready for review April 13, 2026 17:29
@Yicheng-Lu-llll Yicheng-Lu-llll requested a review from a team as a code owner April 13, 2026 17:29
// object already exists (see #47833 for why). In workloads like Data shuffle, all map outputs
// are ready before reduce tasks are submitted, so checking synchronously
// first avoids flooding the I/O context with callbacks.
auto existing = in_memory_store_.GetIfExists(obj_id);

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's also benchmark the worst case workload for this change, which is that the object never exists and we make many unnecessary GetIfExists calls.

@Yicheng-Lu-llll Yicheng-Lu-llll Apr 13, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense! Running the benchmark locally where I use a single actor blocks all producers. When consumers send, all producers are stuck at await signal.wait(), so GetIfExists has a 100% miss rate. My PR and master seem the same I just got small noisy results diff, between 0 to 3%(run 1k 10k 100k deps cases).

@ray-gardener ray-gardener Bot added the core Issues that should be addressed in Ray Core label Apr 13, 2026
@Yicheng-Lu-llll Yicheng-Lu-llll added the go add ONLY when ready to merge, run all tests label Apr 13, 2026
@Yicheng-Lu-llll Yicheng-Lu-llll force-pushed the getasync-sync-fast-path branch from b89c3a5 to b330ef2 Compare April 13, 2026 20:26
yicheng added 3 commits April 13, 2026 13:27
…xist

Signed-off-by: yicheng <yicheng@anyscale.com>
Signed-off-by: yicheng <yicheng@anyscale.com>
Signed-off-by: yicheng <yicheng@anyscale.com>
@Yicheng-Lu-llll Yicheng-Lu-llll force-pushed the getasync-sync-fast-path branch from b330ef2 to f1448b9 Compare April 13, 2026 20:27
Signed-off-by: yicheng <yicheng@anyscale.com>
@edoakes

edoakes commented Apr 14, 2026

Copy link
Copy Markdown
Collaborator

Do you see any performance difference w/ and w/o this PR in the microbenchmarks? (ray microbenchmark to run them). If not, probably we are missing some cases! :)

@Yicheng-Lu-llll

Yicheng-Lu-llll commented Apr 14, 2026

Copy link
Copy Markdown
Member Author

Do you see any performance difference w/ and w/o this PR in the microbenchmarks? (ray microbenchmark to run them). If not, probably we are missing some cases! :)

ran the microbenchmarks in CI (lin) and compared with the latest master run (link). They're the same within noise. None of the microbenchmarks tests with many already ready object ref dependencies. All tasks are either argument free or have a single dep. We can add a dedicated case in a separate PR!

Comment on lines -418 to 420
// Even if dependencies are already local, the ResolveDependencies callbacks are still
// called asynchronously in the event loop as a different task.
// Test that dependencies already in the store are resolved synchronously
// via GetIfExists, without posting to the I/O event queue.
TEST(LocalDependencyResolverTest, TestDependenciesAlreadyLocal) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you check git blame for the history of this test and see if this was added for good reason?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test was originally testing synchronous resolution when deps are already local. PR #47833 changed GetAsync to always post callbacks to io_context to avoid potential deadlocks, so this test was updated to wait asynchronously.

For our ifx, we does not modify GetAsync itself, we check if the object exists locally in ResolveDependencies before calling GetAsync, so the callback fires synchronously again.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, and for correctness it's important that GetAsync always posts a callback, but not that the dependency resolver does?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! To be very specific, in #47833, what they are talking about is:

  • ray.cancel() fires and it holds lock A and waits for lock B on the Python thread
  • Meanwhile in the C++ worker thread, a task finishes, it holds lock B and triggers the callback that the dependency resolver registered. This callback could possibly try to hold lock A, so we deadlock.

So, I think Jiajun's reasoning is that a caller holding a lock then calling a function that needs another lock is too dangerous, and he decided to post that function to the I/O thread no matter what.

In our case, we are safe, and the reason is that we, the caller, do not hold any lock when calling that callback.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR would leaks the abstraction a bit since we can't modify GetAsync itself, but we got performance gain!

@edoakes edoakes merged commit 15e8bf4 into ray-project:master Apr 14, 2026
6 checks passed
HLDKNotFound pushed a commit to chichic21039/ray that referenced this pull request Apr 22, 2026
…synchronously when objects already exist (ray-project#62561)

The driver's single threaded I/O context is the bottleneck for many
workloads. When it's overwhelmed, everything slows down.

Workloads like Ray Data shuffle have two phases: map and reduce. You
can't start reduce until all map tasks are done, which means all map
objects are already in the memory store.

Normally for large shuffles, the reduce tasks have very long object
args. In core, what we do is the driver, as the reduce task submitter,
needs to check task dependencies before submitting. In our case, track
if each object is ready or not.

We did this by calling GetAsync, which always puts a checking callback
into the driver I/O context. This can create up to 1,028,921
GetAsync.Callback events, taking 17% of I/O thread time in the worst
case during my experiments. Each callback takes 0.04ms to execute but
waits 5,719ms on average in the queue. The reason GetAsync must be async
is to avoid deadlock, you can read more on this: ray-project#47833

So we can't modify GetAsync. What we do instead is when resolving task
dependencies, we first check if the object is ready. If it is, mark it
resolved immediately. Checking object readiness and marking it resolved
don't have any lock issues, so we are safe here.

The improvement is great: we almost eliminate all those
GetAsync.Callback events, and achieve ~9.2% end-to-end speedup in
shuffle.

---------

Signed-off-by: yicheng <yicheng@anyscale.com>
Co-authored-by: yicheng <yicheng@anyscale.com>
Lucas61000 pushed a commit to Lucas61000/ray that referenced this pull request May 15, 2026
…synchronously when objects already exist (ray-project#62561)

The driver's single threaded I/O context is the bottleneck for many
workloads. When it's overwhelmed, everything slows down.

Workloads like Ray Data shuffle have two phases: map and reduce. You
can't start reduce until all map tasks are done, which means all map
objects are already in the memory store.

Normally for large shuffles, the reduce tasks have very long object
args. In core, what we do is the driver, as the reduce task submitter,
needs to check task dependencies before submitting. In our case, track
if each object is ready or not.

We did this by calling GetAsync, which always puts a checking callback
into the driver I/O context. This can create up to 1,028,921
GetAsync.Callback events, taking 17% of I/O thread time in the worst
case during my experiments. Each callback takes 0.04ms to execute but
waits 5,719ms on average in the queue. The reason GetAsync must be async
is to avoid deadlock, you can read more on this: ray-project#47833

So we can't modify GetAsync. What we do instead is when resolving task
dependencies, we first check if the object is ready. If it is, mark it
resolved immediately. Checking object readiness and marking it resolved
don't have any lock issues, so we are safe here.

The improvement is great: we almost eliminate all those
GetAsync.Callback events, and achieve ~9.2% end-to-end speedup in
shuffle.

---------

Signed-off-by: yicheng <yicheng@anyscale.com>
Co-authored-by: yicheng <yicheng@anyscale.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Issues that should be addressed in Ray Core go add ONLY when ready to merge, run all tests

2 participants