[Data] Non-blocking Default Autoscaling Coordinator#62725

Merged
bveeramani merged 25 commits into ray-project:master from rayhhome:ray-get-time-out on Apr 24, 2026

@rayhhome rayhhome commented Apr 17, 2026

Description

Problem: get_allocated_resources was called every ~1s from the scheduling loop but used a blocking ray.get(), so any actor queue delay or result transfer latency directly stalled dataset execution.

This PR makes all three DefaultAutoscalingCoordinator public methods non-blocking:

  • get_allocated_resources: fires a background request and immediately returns the last cached value; the cache is updated when the response arrives on the next loop iteration.
  • request_resources and cancel_request: fire-and-forget; they only send the request to the backing AutoscalingCoordinatorActor and do not observe the result.
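
The non-blocking read described in the bullets above can be sketched as follows. This is a minimal illustration, not the PR's implementation: it uses `concurrent.futures` as a stand-in for Ray object refs, and `CachedNonBlockingGetter` and `slow_fetch` are hypothetical names. Error handling is omitted to keep the caching logic visible.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Dict, List, Optional


class CachedNonBlockingGetter:
    """Returns the last known value immediately and refreshes it in the background."""

    def __init__(self, fetch: Callable[[], List[Dict[str, float]]]):
        self._fetch = fetch  # hypothetical slow remote call
        self._executor = ThreadPoolExecutor(max_workers=1)
        self._pending: Optional[Future] = None
        self._cached: List[Dict[str, float]] = []

    def get(self) -> List[Dict[str, float]]:
        # Harvest a finished request; done() never waits.
        if self._pending is not None and self._pending.done():
            finished, self._pending = self._pending, None
            self._cached = finished.result(timeout=0)
        # Keep at most one request in flight.
        if self._pending is None:
            self._pending = self._executor.submit(self._fetch)
        return self._cached

    def close(self) -> None:
        self._executor.shutdown(wait=True)


def slow_fetch() -> List[Dict[str, float]]:
    time.sleep(0.05)  # simulate actor queueing and transfer latency
    return [{"CPU": 1}]


getter = CachedNonBlockingGetter(slow_fetch)
first = getter.get()  # nothing cached yet, so the initial value is returned
latest = first
deadline = time.monotonic() + 5
while latest != [{"CPU": 1}] and time.monotonic() < deadline:
    time.sleep(0.01)  # stands in for one scheduling-loop iteration
    latest = getter.get()
getter.close()
```

The first call returns the empty initial cache while the request is in flight; a later call picks up the response once it has arrived.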

Additional information

This change is backed by #62838: AutoscalingCoordinator abstract methods no longer take requester_id; DefaultAutoscalingCoordinator.__init__ now requires it.

The underlying _AutoscalingCoordinatorActor is unchanged.

Unit tests cover behaviors independently: in-flight caching, cache update on success, actor-error fallback to cached value, non-Ray error propagation, and client-side state cleanup on cancel.

Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
@rayhhome rayhhome self-assigned this Apr 17, 2026
@rayhhome rayhhome added the data (Ray Data-related issues) and go (add ONLY when ready to merge, run all tests) labels Apr 17, 2026
@rayhhome rayhhome requested a review from a team as a code owner April 17, 2026 19:21
Copilot AI review requested due to automatic review settings April 17, 2026 19:21

Copilot AI left a comment

Pull request overview

This PR updates Ray Data’s DefaultAutoscalingCoordinator.get_allocated_resources() to avoid blocking the scheduling loop by issuing the actor RPC asynchronously and returning the last cached allocation until the in-flight response is ready.

Changes:

  • Add per-requester in-flight tracking (_pending_allocated_resources) and cached fallback behavior for non-blocking get_allocated_resources().
  • Introduce unit tests for in-flight caching, cache update on success, and failure-counter behavior for actor errors and timeouts.
  • Update an existing test to poll until the async get_allocated_resources() result arrives.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

| File | Description |
| --- | --- |
| python/ray/data/_internal/cluster_autoscaler/default_autoscaling_coordinator.py | Implements non-blocking get_allocated_resources() with pending-request tracking, cancellation on timeout, and failure tracking/logging. |
| python/ray/data/tests/test_autoscaling_coordinator.py | Adds unit tests for the new async/cached behavior and updates an existing test to poll for eventual allocation. |

Comment thread python/ray/data/tests/test_autoscaling_coordinator.py Outdated
    result = ray.get(ref)
    self._cached_allocated_resources[requester_id] = result
    self._consecutive_failures_get_allocated_resources = 0
except Exception as exc:

Copilot AI Apr 17, 2026

get_allocated_resources() currently catches Exception from ray.get(ref) and converts it into cached fallback behavior via _record_get_allocated_resources_failure(). This swallows non-Ray/programming errors (e.g., TypeError/ValueError) that should surface to the caller, and it also conflicts with the PR description’s claim that non-Ray errors are propagated. Consider only treating Ray-originated failures as recoverable here (e.g., catch ray.exceptions.RayError / relevant Ray exceptions), and re-raise unexpected exceptions after cleaning up _pending_allocated_resources so real bugs aren’t masked.

Suggested change
- except Exception as exc:
+ except ray.exceptions.RayError as exc:
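
To illustrate the distinction this suggestion draws, here is a small sketch that treats only backend failures as recoverable and lets programming errors propagate. `BackendError` is a hypothetical stand-in for `ray.exceptions.RayError`, and `refresh_cache` is an illustrative helper, not a function from the PR.

```python
class BackendError(Exception):
    """Hypothetical stand-in for ray.exceptions.RayError."""


def refresh_cache(cached, fetch):
    """Return the fresh value, or the cached one if the backend failed."""
    try:
        return fetch()
    except BackendError:
        # Recoverable: the remote side failed, so serve the stale value.
        return cached
    # Anything else (TypeError, ValueError, ...) propagates to the caller.


def backend_down():
    raise BackendError("actor unavailable")


def programming_bug():
    raise TypeError("unexpected argument")


stale = [{"CPU": 1}]
fallback = refresh_cache(stale, backend_down)

try:
    refresh_cache(stale, programming_bug)
    bug_surfaced = False
except TypeError:
    bug_surfaced = True
```

With the narrow `except`, a backend outage yields the cached allocation, while a `TypeError` still reaches the caller instead of being masked.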

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request refactors get_allocated_resources in DefaultAutoscalingCoordinator to be non-blocking, utilizing a cache and tracking in-flight asynchronous requests. The review feedback recommends using typing.Tuple for Python 3.8 compatibility and time.monotonic() for reliable duration tracking.
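
As a rough illustration of both recommendations: `typing.Tuple` works on Python 3.8, where the built-in `tuple[...]` generic syntax is not available, and `time.monotonic()` is not affected by wall-clock adjustments, which makes it suitable for measuring durations. `timed_call` is a hypothetical helper written only for this sketch.

```python
import time
from typing import Tuple


def timed_call(fn) -> Tuple[object, float]:
    """Run fn and return (result, elapsed_seconds)."""
    start = time.monotonic()  # monotonic clock: never jumps backwards
    result = fn()
    elapsed = time.monotonic() - start
    return result, elapsed


value, elapsed_s = timed_call(lambda: sum(range(1000)))
```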

Comment thread python/ray/data/_internal/cluster_autoscaler/default_autoscaling_coordinator.py Outdated
Comment thread python/ray/data/_internal/cluster_autoscaler/default_autoscaling_coordinator.py Outdated
Comment thread python/ray/data/_internal/cluster_autoscaler/default_autoscaling_coordinator.py Outdated
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>

@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Reviewed by Cursor Bugbot for commit 7e0cb1e.

Comment thread python/ray/data/_internal/cluster_autoscaler/default_autoscaling_coordinator.py Outdated
@rayhhome rayhhome changed the title [Data] Non-blocking get_allocated_resources Apr 21, 2026

@bveeramani bveeramani left a comment

Implementation LGTM. Left comments on tests

Comment on lines +64 to +66
Single-tenant: every instance is owned by exactly one
DefaultClusterAutoscalerV2 and is called with a single, fixed requester_id
for its entire lifetime. Violating this is undefined behavior.

Member

I felt confused by this sentence. What does multi-tenancy mean in this context? I don't think the jargon helps clarify the use of the class.

I don't think we should state that it's owned by DefaultClusterAutoscalerV2 because the caller can be any Ray Data autoscaler implementation or even Ray Train. In general, I don't think we should make strong assumptions about who the caller is

Contributor Author

Updated comments to more accurately and directly describe the use of the DefaultAutoscalingCoordinator class.

DefaultClusterAutoscalerV2 and is called with a single, fixed requester_id
for its entire lifetime. Violating this is undefined behavior.
``get_allocated_resources`` tracks a single in-flight ref and falls back
to the cached value on actor errors; ``request_resources`` and

Member

On actor error or if the value isn't ready?

Contributor Author

Both; the new docstring should explain the behavior more clearly.

"""Fire-and-forget: submit a resource request to the coordinator actor.

Returns immediately without observing the result or errors. Actor-side
errors (e.g. type mismatches) are not surfaced to the caller.

Member

What type mismatch can happen? Like invalid inputs?

Contributor Author

Invalid inputs could lead to a type mismatch. On review, this example seems too specific for the docstring, since _AutoscalingCoordinatorActor can raise more meaningful, purpose-built ValueErrors. Because all errors from the actor are swallowed in the current implementation, I've removed the example in the new commit.

if ready:
    self._pending_allocated_resources = None
    try:
        self._cached_allocated_resources = ray.get(ref)

Member

We need a timeout here.

If the wait returns the reference as ready, and then the actor dies from CPU overload or something, then this ray.get will hang until the actor gets reconstructed.

Unlikely but good to be safe.

Contributor Author

Fixed this using a timeout of 0 for get.
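
The effect of a zero timeout can be sketched with `concurrent.futures`, used here as a stand-in for Ray object refs: the read either returns a value that is available right now or fails immediately, and never waits. `try_take` is a hypothetical helper. In Ray, the analogous call would presumably be `ray.get(ref, timeout=0)`, which raises `ray.exceptions.GetTimeoutError` when the value is not available.

```python
from concurrent.futures import Future, TimeoutError as FutureTimeoutError


def try_take(future: Future, default):
    """Return the future's value if it is available right now, else default."""
    try:
        return future.result(timeout=0)  # never blocks
    except FutureTimeoutError:
        return default


unfinished: Future = Future()  # no result will ever be set
finished: Future = Future()
finished.set_result([{"CPU": 2}])

still_cached = try_take(unfinished, default=[{"CPU": 1}])
refreshed = try_take(finished, default=[{"CPU": 1}])
```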

call_method,
counter_attr,
error_msg_prefix,
def _make_coordinator_with_mock_actor():

@bveeramani bveeramani Apr 22, 2026

The tests in this file will be brittle because they assert against and mock several internal attributes. They're also a bit hard to read without understanding the implementation.

Here's a sketch of how you could refactor these:

  • Expose a seam so that we can pass in the autoscaling coordinator actor to the client. This allows us to test against an actual actor implementation (and avoid a heavy mock) without using the shared state of the default named actor.
  • Test only against the public methods (e.g., call request_resources and wait for that to be consistent rather than mocking _cached_allocated_resources)
  • Minimize mocking to just ray.wait/ray.get. This still makes some assumptions about implementation, but I think that might be unavoidable
@pytest.fixture
def autoscaling_coordinator_actor(ray_start_regular_shared):
    actor_cls = ray.remote(num_cpus=0)(_AutoscalingCoordinatorActor)
    actor = actor_cls.remote(
        send_resources_request=lambda b: None,
        get_cluster_nodes=lambda: [
            {"Alive": True, "Resources": {"CPU": 4}, "NodeID": "n1"}
        ],
    )
    yield actor
    ray.kill(actor)


def test_get_allocated_resources_eventually_consistent(autoscaling_coordinator_actor):
    coordinator = DefaultAutoscalingCoordinator(
        requester_id="test",
        autoscaling_coordinator_actor=autoscaling_coordinator_actor,
    )

    coordinator.request_resources(resources=[{"CPU": 1}], expire_after_s=60)

    wait_for_condition(lambda: coordinator.get_allocated_resources("test") == [{"CPU": 1}], timeout=5)


def test_get_allocated_resources_returns_cached_while_pending(autoscaling_coordinator_actor, monkeypatch):
    coordinator = DefaultAutoscalingCoordinator(
        requester_id="test",
        autoscaling_coordinator_actor=autoscaling_coordinator_actor,
    )
    coordinator.request_resources(resources=[{"CPU": 1}], expire_after_s=30)
    wait_for_condition(
        lambda: coordinator.get_allocated_resources("test") == [{"CPU": 1}],
        timeout=5
    )

    # Make ray.wait report all refs as pending.
    def fake_wait(refs, *args, **kwargs):
        return [], refs

    monkeypatch.setattr(ray, "wait", fake_wait)
    coordinator.request_resources(resources=[{"CPU": 2}], expire_after_s=30)
    # Should return the stale cached value, not block or error.
    result = coordinator.get_allocated_resources("test")

    assert result == [{"CPU": 1}]

etc. for other interface (not implementation) behaviors

Contributor Author

Addressed!

Comment on lines +76 to +79
if autoscaling_coordinator_actor is not None:
    # Bypass the cached_property by injecting the actor directly.
    # Used in tests to avoid the shared named actor.
    self.__dict__["_autoscaling_coordinator"] = autoscaling_coordinator_actor

Member

Think we should avoid Python magic unless absolutely necessary.

This could be written alternatively like this:

def __init__(...):
    self._autoscaling_coordinator = autoscaling_coordinator

def _get_or_create_autoscaling_coordinator(...):
    if self._autoscaling_coordinator is None:
        self._autoscaling_coordinator = ...  # Create named actor

    return self._autoscaling_coordinator

def request_resources(...):
    autoscaling_coordinator = self._get_or_create_autoscaling_coordinator(...)
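
A self-contained version of this get-or-create pattern might look like the sketch below. `CoordinatorClient`, `create_backend`, and `make_backend` are hypothetical names; the factory stands in for creating the shared named actor, which this sketch does not attempt.

```python
from typing import Callable, Optional


class CoordinatorClient:
    """Hypothetical client that accepts an injected backend or creates one lazily."""

    def __init__(
        self,
        backend: Optional[object] = None,
        create_backend: Callable[[], object] = object,
    ):
        self._backend = backend
        # Stand-in for the code that would create the named actor.
        self._create_backend = create_backend

    def _get_or_create_backend(self) -> object:
        if self._backend is None:
            self._backend = self._create_backend()
        return self._backend


created = []


def make_backend():
    created.append("backend")  # record each creation for inspection
    return "default-backend"


lazy = CoordinatorClient(create_backend=make_backend)
injected = CoordinatorClient(backend="test-backend", create_backend=make_backend)
```

Nothing is created at construction time; the factory runs on first use only, and an injected backend is returned without ever calling it.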
@bveeramani bveeramani merged commit 1112d90 into ray-project:master Apr 24, 2026
5 of 6 checks passed
@rayhhome rayhhome deleted the ray-get-time-out branch April 27, 2026 20:46
Lucas61000 pushed a commit to Lucas61000/ray that referenced this pull request May 15, 2026
## Description

Problem: `get_allocated_resources` was called every ~1s from the
scheduling loop but used a blocking `ray.get()`, so any actor queue
delay or result transfer latency directly stalled dataset execution.

This PR makes `get_allocated_resources` non-blocking: it fires the
remote call in the background and immediately returns the last cached
value, updating the cache when the response arrives on the next loop
step. The first call for a new requester returns [] while the initial
response is in-flight, resolving ~1s later.

## Additional information

Currently, only `DefaultAutoscalingCoordinator.get_allocated_resources`
is made non-blocking. `request_resources` and `cancel_request` remain
blocking since they are not on the hot path.

Unit tests cover each behavior independently: in-flight caching, cache
update on success, non-Ray error propagation, and failure counter
escalation for both actor exceptions and timeouts.

---------

Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: HFFuture <ray.huang@anyscale.com>
bveeramani added a commit that referenced this pull request Jun 9, 2026
… persistently starved (#63969)

## Description

A dataset's resource allocator depends on the `AutoscalingCoordinator`
server to get its share of allocated resources. To improve reliability,
#62725 made calls to the server
non-blocking. One consequence of this change is that the dataset gets
zero resources at the very start of execution while it waits for the
first response from the autoscaling coordinator. As a result, we'd
consistently emit spurious warnings like this at the start of execution:
```
Cluster resources are not enough to run any task from TaskPoolMapOperator[ReadRange]. The job may hang forever unless the cluster scales up.  
```

To avoid this confusion, I've made it so that we only emit the warning
after the first eligible operator has been starved for a minute.
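
One way to picture this kind of delayed warning is the sketch below, which is an assumption-laden illustration and not the code from #63969. `StarvationWarner` is a hypothetical class, and timestamps are passed in explicitly so the behavior is easy to follow.

```python
from typing import Optional


class StarvationWarner:
    """Warns only after starvation has persisted for threshold_s seconds."""

    def __init__(self, threshold_s: float = 60.0):
        self._threshold_s = threshold_s
        self._starved_since: Optional[float] = None
        self._warned = False

    def update(self, starved: bool, now: float) -> bool:
        """Return True exactly once per starvation episode, when it is time to warn."""
        if not starved:
            # Resources arrived: reset so a later episode starts a new timer.
            self._starved_since = None
            self._warned = False
            return False
        if self._starved_since is None:
            self._starved_since = now
        if not self._warned and now - self._starved_since >= self._threshold_s:
            self._warned = True
            return True
        return False


warner = StarvationWarner(threshold_s=60.0)
events = [
    warner.update(starved=True, now=0.0),
    warner.update(starved=True, now=30.0),
    warner.update(starved=True, now=60.0),
    warner.update(starved=True, now=90.0),
    warner.update(starved=False, now=100.0),
    warner.update(starved=True, now=110.0),
]
```

Short starvation at startup, such as the window before the first coordinator response, produces no warning under this scheme.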

## Related issues

## Additional information

---------

Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
sampan-s-nayak pushed a commit to sampan-s-nayak/ray that referenced this pull request Jun 10, 2026
… persistently starved (ray-project#63969)

limarkdcunha pushed a commit to limarkdcunha/ray that referenced this pull request Jun 30, 2026
… persistently starved (ray-project#63969)
