[Data] Non-blocking Default Autoscaling Coordinator#62725
Conversation
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
There was a problem hiding this comment.
Pull request overview
This PR updates Ray Data’s DefaultAutoscalingCoordinator.get_allocated_resources() to avoid blocking the scheduling loop by issuing the actor RPC asynchronously and returning the last cached allocation until the in-flight response is ready.
Changes:
- Add per-requester in-flight tracking (
_pending_allocated_resources) and cached fallback behavior for non-blockingget_allocated_resources(). - Introduce unit tests for in-flight caching, cache update on success, and failure-counter behavior for actor errors and timeouts.
- Update an existing test to poll until the async
get_allocated_resources()result arrives.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| python/ray/data/_internal/cluster_autoscaler/default_autoscaling_coordinator.py | Implements non-blocking get_allocated_resources() with pending-request tracking, cancellation on timeout, and failure tracking/logging. |
| python/ray/data/tests/test_autoscaling_coordinator.py | Adds unit tests for the new async/cached behavior and updates an existing test to poll for eventual allocation. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| result = ray.get(ref) | ||
| self._cached_allocated_resources[requester_id] = result | ||
| self._consecutive_failures_get_allocated_resources = 0 | ||
| except Exception as exc: |
There was a problem hiding this comment.
get_allocated_resources() currently catches Exception from ray.get(ref) and converts it into cached fallback behavior via _record_get_allocated_resources_failure(). This swallows non-Ray/programming errors (e.g., TypeError/ValueError) that should surface to the caller, and it also conflicts with the PR description’s claim that non-Ray errors are propagated. Consider only treating Ray-originated failures as recoverable here (e.g., catch ray.exceptions.RayError / relevant Ray exceptions), and re-raise unexpected exceptions after cleaning up _pending_allocated_resources so real bugs aren’t masked.
| except Exception as exc: | |
| except ray.exceptions.RayError as exc: |
There was a problem hiding this comment.
Code Review
This pull request refactors get_allocated_resources in DefaultAutoscalingCoordinator to be non-blocking, utilizing a cache and tracking in-flight asynchronous requests. The review feedback recommends using typing.Tuple for Python 3.8 compatibility and time.monotonic() for reliable duration tracking.
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit 7e0cb1e. Configure here.
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
…llocated_resources Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
…or get_allocated_resources Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
…Coordinator Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: HFFuture <ray.huang@anyscale.com>
bveeramani
left a comment
There was a problem hiding this comment.
Implementation LGTM. Left comments on tests
| Single-tenant: every instance is owned by exactly one | ||
| DefaultClusterAutoscalerV2 and is called with a single, fixed requester_id | ||
| for its entire lifetime. Violating this is undefined behavior. |
There was a problem hiding this comment.
I felt confused by this sentence. What does multi-tenancy mean in this context? I don't the jargon helps clarify the use of the class
I don't think we should state that it's owned by DefaultClusterAutoscalerV2 because the caller can be any Ray Data autoscaler implementation or even Ray Train. In general, I don't think we should make strong assumptions about who the caller is
There was a problem hiding this comment.
Updated comments to more accurately and directly describe the use of the DefaultAutoscalingCoordinator class.
| DefaultClusterAutoscalerV2 and is called with a single, fixed requester_id | ||
| for its entire lifetime. Violating this is undefined behavior. | ||
| ``get_allocated_resources`` tracks a single in-flight ref and falls back | ||
| to the cached value on actor errors; ``request_resources`` and |
There was a problem hiding this comment.
On actor error or if the value isn't ready?
There was a problem hiding this comment.
Both; the new docstring should explain the behavior more clearly.
| """Fire-and-forget: submit a resource request to the coordinator actor. | ||
|
|
||
| Returns immediately without observing the result or errors. Actor-side | ||
| errors (e.g. type mismatches) are not surfaced to the caller. |
There was a problem hiding this comment.
What type mismatch can happen? Like invalid inputs?
There was a problem hiding this comment.
Invalid inputs could lead to type mismatch. Upon review, this example seems too specific as an example for the docstring, considering there are much more meaningful designed ValueErrors that _AutoscalingCoordinatorActor can emit. Since all errors from the actor are swallowed in the current implementation, I've removed the example in the new commit.
| if ready: | ||
| self._pending_allocated_resources = None | ||
| try: | ||
| self._cached_allocated_resources = ray.get(ref) |
There was a problem hiding this comment.
We need a timeout here.
If the the wait returns the reference as ready, and then the actor dies from CPU overload or something, then this ray.get will hang until the actor gets reconstructed.
Unlikely but good to be safe.
There was a problem hiding this comment.
Fixed this using a timeout of 0 for get.
| call_method, | ||
| counter_attr, | ||
| error_msg_prefix, | ||
| def _make_coordinator_with_mock_actor(): |
There was a problem hiding this comment.
The tests in this file will be brittle because they assert against and mock several internal attributes. They're also a bit hard to read without understanding the implementation.
Here's a sketch of how you could refactor these:
- Expose a seam so that we can pass in the autoscaling coordinator actor to the client. This allows us to test against an actual actor implementation (and avoid a heavy mock) without using the shared state of the default named actor.
- Test only against the public methods (e.g., call
request_resourcesand wait for that to be consistent rather than mocking _cached_allocated_resources) - Minimize mocking to just ray.wait/ray.get. This still makes some assumptions about implementation, but I think that might be unavoidable
@pytest.fixture
def autoscaling_coordinator_actor(ray_start_regular_shared):
actor_cls = ray.remote(num_cpus=0)(_AutoscalingCoordinatorActor)
actor = actor_cls.remote(
send_resources_request=lambda b: None,
get_cluster_nodes=lambda: [
{"Alive": True, "Resources": {"CPU": 4}, "NodeID": "n1"}
],
)
yield actor
ray.kill(actor)
def test_get_allocated_resources_eventually_consistent(autoscaling_coordinator_actor):
coordinator = DefaultAutoscalingCoordinator(
requester_id="test",
autoscaling_coordinator_actor=autoscaling_coordinator_actor,
)
coordinator.request_resources(resources=[{"CPU": 1}], expire_after_s=60)
wait_for_condition(lambda: coordinator.get_allocated_resources("test") == [{"CPU": 1}], timeout=5)
def test_get_allocated_resources_returns_cached_while_pending(autoscaling_coordinator_actor, monkeypatch):
coordinator = DefaultAutoscalingCoordinator(
requester_id="test",
autoscaling_coordinator_actor=autoscaling_coordinator_actor,
)
coordinator.request_resources(resources=[{"CPU": 1}], expire_after_s=30)
wait_for_condition(
lambda: coordinator.get_allocated_resources("test") == [{"CPU": 1}],
timeout=5
)
# Make ray.wait report all refs as pending.
def fake_wait(refs, *args, **kwargs):
return [], refs
monkeypatch.setattr(ray, "wait", fake_wait)
coordinator.request_resources(resources=[{"CPU": 2}], expire_after_s=30)
# Should return the stale cached value, not block or error.
result = coordinator.get_allocated_resources("test")
assert result == [{"CPU": 1}]
etc. for other interface (not implementation) behaviorsSigned-off-by: Sirui Huang <ray.huang@anyscale.com>
| if autoscaling_coordinator_actor is not None: | ||
| # Bypass the cached_property by injecting the actor directly. | ||
| # Used in tests to avoid the shared named actor. | ||
| self.__dict__["_autoscaling_coordinator"] = autoscaling_coordinator_actor |
There was a problem hiding this comment.
Think we should avoid Python magic unless absolutely necessary.
This could be written alternatively like this:
def __init__(...):
self._autoscaling_coordinator = autoscaling_coordinator
def _get_or_create_autoscaling_coordinator(...):
if self._autoscaling_coordinator is None:
self._autoscaling_coordinator = # Create named actor
return self._autoscaling_coordinator
def request_resources(...):
autoscaling_coordinator = self._get_or_create_autoscaling_coordinator(...)## Description Problem: `get_allocated_resources` was called every ~1s from the scheduling loop but used a blocking `ray.get()`, so any actor queue delay or result transfer latency directly stalled dataset execution. This PR makes `get_allocated_resources` non-blocking: it fires the remote call in the background and immediately returns the last cached value, updating the cache when the response arrives on the next loop step. The first call for a new requester returns [] while the initial response is in-flight, resolving ~1s later. ## Additional information Currently, only `DefaultAutoscalingCoordinator.get_allocated_resources` is made non-blocking. `request_resources` and `cancel_request` remain blocking since they are not on the hot path. Unit tests cover each behavior independently: in-flight caching, cache update on success, non-Ray error propagation, and failure counter escalation for both actor exceptions and timeouts. --------- Signed-off-by: Sirui Huang <ray.huang@anyscale.com> Signed-off-by: HFFuture <ray.huang@anyscale.com>
… persistently starved (#63969) ## Description A dataset's resource allocator depends on the `AutoscalingCoordinator` server to get its share of allocated resources. To improve reliability, #62725 made calls to the server non-blocking. One consequence of this change is that the dataset gets zero resources at the very start of execution while it waits for the first response from the autoscaling coordnanator. As a result, we'd consistently emit spurious warnings like this at the start of execution: ``` Cluster resources are not enough to run any task from TaskPoolMapOperator[ReadRange]. The job may hang forever unless the cluster scales up. ``` To avoid this confusion, I've made it so that we only emit the warning after the first eligible operator has been starved for a minute. ## Related issues ## Additional information --------- Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
… persistently starved (ray-project#63969) ## Description A dataset's resource allocator depends on the `AutoscalingCoordinator` server to get its share of allocated resources. To improve reliability, ray-project#62725 made calls to the server non-blocking. One consequence of this change is that the dataset gets zero resources at the very start of execution while it waits for the first response from the autoscaling coordnanator. As a result, we'd consistently emit spurious warnings like this at the start of execution: ``` Cluster resources are not enough to run any task from TaskPoolMapOperator[ReadRange]. The job may hang forever unless the cluster scales up. ``` To avoid this confusion, I've made it so that we only emit the warning after the first eligible operator has been starved for a minute. ## Related issues ## Additional information --------- Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
… persistently starved (ray-project#63969) ## Description A dataset's resource allocator depends on the `AutoscalingCoordinator` server to get its share of allocated resources. To improve reliability, ray-project#62725 made calls to the server non-blocking. One consequence of this change is that the dataset gets zero resources at the very start of execution while it waits for the first response from the autoscaling coordnanator. As a result, we'd consistently emit spurious warnings like this at the start of execution: ``` Cluster resources are not enough to run any task from TaskPoolMapOperator[ReadRange]. The job may hang forever unless the cluster scales up. ``` To avoid this confusion, I've made it so that we only emit the warning after the first eligible operator has been starved for a minute. ## Related issues ## Additional information --------- Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>

Description
Problem:
get_allocated_resourceswas called every ~1s from the scheduling loop but used a blockingray.get(), so any actor queue delay or result transfer latency directly stalled dataset execution.This PR makes all three
DefaultAutoscalingCoordinatorpublic methods non-blocking:get_allocated_resources: fires a background request and immediately returns the last cached value; the cache is updated when the response arrives on the next loop iteration.request_resourcesandcancel_request: fire-and-forget; only send the request to the backingAutoscalingCoordinatorActorwithout any result observation.Additional information
This change is backed by #62838:
AutoscalingCoordinatorabstract methods no longer takerequester_id;DefaultAutoscalingCoordinator.__init__now requires it.The underlying
_AutoscalingCoordinatorActoris unchanged.Unit tests cover behaviors independently: in-flight caching, cache update on success, actor-error fallback to cached value, non-Ray error propagation, and client-side state cleanup on cancel.