Skip to content
Merged
2 changes: 2 additions & 0 deletions python/ray/data/_internal/cluster_autoscaler/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def create_cluster_autoscaler(
execution_id: str,
) -> ClusterAutoscaler:
resource_limits = data_context.execution_options.resource_limits
label_selector = data_context.execution_options.label_selector
cluster_autoscaler_version = os.environ.get(
CLUSTER_AUTOSCALER_ENV_KEY, DEFAULT_CLUSTER_AUTOSCALER_VERSION
)
Expand All @@ -50,6 +51,7 @@ def create_cluster_autoscaler(
resource_manager,
execution_id=execution_id,
resource_limits=resource_limits,
label_selector=label_selector,
)

elif cluster_autoscaler_version == ClusterAutoscalerVersion.V1:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@
logger = logging.getLogger(__name__)

HEAD_NODE_RESOURCE_LABEL = "node:__internal_head__"
# Label key the cluster autoscaler uses to bucket nodes by subcluster.
# Hardcoded so all components agree without per-Dataset configuration.
SUBCLUSTER_LABEL_KEY = "__subcluster__"
# Sentinel for "no subcluster" — used as both a node-label fallback and
# the bucket key for unlabeled nodes in ``_cluster_node_resources``.
DEFAULT_SUBCLUSTER: Optional[str] = None


RAY_DATA_AUTOSCALING_COORDINATOR_LOG_TRACEBACK = env_bool(
"RAY_DATA_AUTOSCALING_COORDINATOR_LOG_TRACEBACK", True
Expand Down Expand Up @@ -71,8 +78,12 @@ def __init__(
self,
requester_id: str,
autoscaling_coordinator_actor=None, # For testing only: injects an actor instead of using the shared named singleton.
subcluster_selector: Optional[Dict[str, str]] = None,
):
self._requester_id = requester_id
# Label selector keyed by ``SUBCLUSTER_LABEL_KEY`` pinning this
# requester to a single subcluster.
self._subcluster_selector = subcluster_selector
self._cached_allocated_resources: List[ResourceDict] = []
# In-flight get_allocated_resources ref, or None if no request is pending.
self._pending_allocated_resources: Optional[ray.ObjectRef] = None
Expand All @@ -83,7 +94,7 @@ def __init__(

@functools.cached_property
def _autoscaling_coordinator(self):
# Create the coordinator actor lazily rather than eagerly in the constructor.
# Lazy: avoids creating the actor in __init__.
return get_or_create_autoscaling_coordinator()

def request_resources(
Expand All @@ -105,6 +116,7 @@ def request_resources(
request_remaining=request_remaining,
priority=priority,
label_selectors=label_selectors,
subcluster_selector=self._subcluster_selector,
)

def cancel_request(self) -> None:
Expand Down Expand Up @@ -190,7 +202,11 @@ def __init__(
self._get_cluster_nodes = get_cluster_nodes

self._ongoing_reqs: Dict[str, OngoingRequest] = {}
self._cluster_node_resources: List[ResourceDict] = []
# Map from requester id to its subcluster selector.
self._subcluster_selectors: Dict[str, Optional[Dict[str, str]]] = {}
# Node resources bucketed by their ``SUBCLUSTER_LABEL_KEY`` value.
# Nodes without the key fall under ``DEFAULT_SUBCLUSTER``.
self._cluster_node_resources: Dict[Optional[str], List[ResourceDict]] = {}
# Lock for thread-safe access to shared state from the background
self._lock = threading.Lock()
self._update_cluster_node_resources()
Expand Down Expand Up @@ -223,12 +239,15 @@ def request_resources(
request_remaining: bool = False,
priority: ResourceRequestPriority = ResourceRequestPriority.MEDIUM,
label_selectors: Optional[List[Dict[str, str]]] = None,
subcluster_selector: Optional[Dict[str, str]] = None,
) -> None:
logger.debug(
"Received request from %s: %s (label_selectors=%s).",
"Received request from %s: %s "
"(label_selectors=%s, subcluster_selector=%s).",
requester_id,
resources,
label_selectors,
subcluster_selector,
)
if label_selectors is None:
label_selectors = [{} for _ in resources]
Expand All @@ -237,6 +256,20 @@ def request_resources(
f"label_selectors length ({len(label_selectors)}) must match "
f"resources length ({len(resources)})."
)
if subcluster_selector and label_selectors:
req_subcluster = subcluster_selector.get(SUBCLUSTER_LABEL_KEY)
for i, sel in enumerate(label_selectors):
bundle_subcluster = sel.get(SUBCLUSTER_LABEL_KEY)
if (
bundle_subcluster is not None
and bundle_subcluster != req_subcluster
):
raise ValueError(
f"Bundle {i} label_selector targets subcluster "
f"{bundle_subcluster!r}, but requester is registered to "
f"{req_subcluster!r}. Per-bundle cross-subcluster "
f"allocation is not supported."
)
with self._lock:
now = self._get_current_time()
request_updated = False
Expand All @@ -248,6 +281,15 @@ def request_resources(
)
if priority.value != old_req.priority:
raise ValueError("Cannot change priority of an ongoing request.")
if (
requester_id in self._subcluster_selectors
and self._subcluster_selectors[requester_id] != subcluster_selector
):
raise ValueError(
"Cannot change subcluster_selector of an ongoing request "
f"from {self._subcluster_selectors[requester_id]!r} to "
f"{subcluster_selector!r}."
)

request_updated = (
resources != old_req.requested_resources
Expand All @@ -267,6 +309,9 @@ def request_resources(
expiration_time=now + expire_after_s,
allocated_resources=[],
)
# Write subcluster after all validations so a rejected call
# never leaves the registry on a new subcluster.
self._subcluster_selectors[requester_id] = subcluster_selector
if request_updated:
# If the request has updated, immediately send
# a new request and reallocate resources.
Expand All @@ -282,25 +327,38 @@ def cancel_request(
if requester_id not in self._ongoing_reqs:
return
del self._ongoing_reqs[requester_id]
self._subcluster_selectors.pop(requester_id, None)
self._merge_and_send_requests()
self._reallocate_resources()

def _purge_expired_requests(self):
now = self._get_current_time()
self._ongoing_reqs = {
live = {
requester_id: req
for requester_id, req in self._ongoing_reqs.items()
if req.expiration_time > now
}
for expired_id in self._ongoing_reqs.keys() - live.keys():
self._subcluster_selectors.pop(expired_id, None)
self._ongoing_reqs = live

def _merge_and_send_requests(self):
"""Merge requests and send them to Ray Autoscaler."""
Comment thread
cursor[bot] marked this conversation as resolved.
"""Merge requests and send them to Ray Autoscaler.

Each bundle's forwarded selector is the union of its per-bundle
``requested_label_selectors`` entry and the requester's
``subcluster_selector``. The subcluster pin wins on key conflict,
so the autoscaler always sees the correct subcluster regardless
of what the per-bundle selectors contain.
"""
self._purge_expired_requests()
merged_req: List[ResourceDict] = []
merged_selectors: List[Dict[str, str]] = []
for req in self._ongoing_reqs.values():
for requester_id, req in self._ongoing_reqs.items():
merged_req.extend(req.requested_resources)
merged_selectors.extend(req.requested_label_selectors)
subcluster_selector = self._subcluster_selectors.get(requester_id) or {}
for per_bundle in req.requested_label_selectors:
merged_selectors.append({**per_bundle, **subcluster_selector})
if any(merged_selectors):
self._send_resources_request(merged_req, label_selectors=merged_selectors)
else:
Expand All @@ -324,7 +382,7 @@ def _maybe_subtract_resources(self, res1: ResourceDict, res2: ResourceDict) -> b
return True

def _update_cluster_node_resources(self) -> bool:
"""Update cluster's total resources. Return True if changed."""
"""Update cluster resources bucketed by subcluster. Return True if changed."""

def _is_node_eligible(node):
# Exclude dead nodes.
Expand All @@ -341,47 +399,69 @@ def _is_node_eligible(node):

nodes = list(filter(_is_node_eligible, self._get_cluster_nodes()))
Comment thread
TimothySeah marked this conversation as resolved.
nodes = sorted(nodes, key=lambda node: node.get("NodeID", ""))
cluster_node_resources = [node["Resources"] for node in nodes]
cluster_node_resources: Dict[Optional[str], List[ResourceDict]] = {}
for node in nodes:
# Safeguard against case where the value of Labels is None.
labels = node.get("Labels") or {}
subcluster = labels.get(SUBCLUSTER_LABEL_KEY, DEFAULT_SUBCLUSTER)
cluster_node_resources.setdefault(subcluster, []).append(node["Resources"])
if cluster_node_resources == self._cluster_node_resources:
return False
else:
logger.debug("Cluster resources updated: %s.", cluster_node_resources)
self._cluster_node_resources = cluster_node_resources
return True
logger.debug("Cluster resources updated: %s.", cluster_node_resources)
self._cluster_node_resources = cluster_node_resources
return True

def _reallocate_resources(self):
"""Reallocate cluster resources."""
"""Reallocate cluster resources.

Each requester's subcluster comes from its ``subcluster_selector``.
A requester without one is eligible only for the ``None`` bucket.
"""
now = self._get_current_time()
cluster_node_resources = copy.deepcopy(self._cluster_node_resources)
ongoing_reqs = sorted(
[req for req in self._ongoing_reqs.values() if req.expiration_time >= now]
cluster_node_resources: Dict[Optional[str], List[ResourceDict]] = copy.deepcopy(
self._cluster_node_resources
)
# Allocate resources to ongoing requests.
live_items = [
(req_id, req)
for req_id, req in self._ongoing_reqs.items()
if req.expiration_time >= now
]
live_items.sort(key=lambda item: item[1])

def _subcluster_of(requester_id: str) -> Optional[str]:
selector = self._subcluster_selectors.get(requester_id)
return (selector or {}).get(SUBCLUSTER_LABEL_KEY, DEFAULT_SUBCLUSTER)

# TODO(hchen): Optimize the following triple loop.
for ongoing_req in ongoing_reqs:
for requester_id, ongoing_req in live_items:
ongoing_req.allocated_resources = []
for req in ongoing_req.requested_resources:
for node_resource in cluster_node_resources:
if self._maybe_subtract_resources(node_resource, req):
ongoing_req.allocated_resources.append(req)
subcluster = _subcluster_of(requester_id)
for bundle in ongoing_req.requested_resources:
for node_resource in cluster_node_resources.get(subcluster, []):
if self._maybe_subtract_resources(node_resource, bundle):
ongoing_req.allocated_resources.append(bundle)
break
# Allocate remaining resources.
# NOTE: to handle the case where multiple datasets are running concurrently,
# we divide remaining resources equally to all requesters with `request_remaining=True`.
Comment thread
TimothySeah marked this conversation as resolved.
remaining_resource_requesters = [
req for req in ongoing_reqs if req.request_remaining

# Allocate remaining resources. Multiple concurrent requesters in
# the same subcluster split that subcluster's leftovers equally.
remaining_items = [
(req_id, req) for req_id, req in live_items if req.request_remaining
]
num_remaining_requesters = len(remaining_resource_requesters)
if num_remaining_requesters > 0:
for node_resource in cluster_node_resources:
# Divide remaining resources equally among requesters.
# NOTE: Integer division may leave some resources unallocated.
divided_resource = {
k: v // num_remaining_requesters for k, v in node_resource.items()
}
for ongoing_req in remaining_resource_requesters:
if any(v > 0 for v in divided_resource.values()):
ongoing_req.allocated_resources.append(divided_resource)
for subcluster, node_resources in cluster_node_resources.items():
eligible = [
req
for req_id, req in remaining_items
if _subcluster_of(req_id) == subcluster
]
if not eligible:
continue
for node_resource in node_resources:
# Integer division may leave some resources unallocated.
divided = {k: v // len(eligible) for k, v in node_resource.items()}
if not any(v > 0 for v in divided.values()):
continue
for r in eligible:
r.allocated_resources.append(divided)

if logger.isEnabledFor(logging.DEBUG):
msg = "Allocated resources:\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import ray
from .base_autoscaling_coordinator import AutoscalingCoordinator, ResourceDict
from .default_autoscaling_coordinator import (
DEFAULT_SUBCLUSTER,
SUBCLUSTER_LABEL_KEY,
DefaultAutoscalingCoordinator,
)
from .resource_utilization_gauge import (
Expand Down Expand Up @@ -69,8 +71,27 @@ def to_bundle(self):
return {"CPU": self.cpu, "GPU": self.gpu, "memory": self.mem}


def _get_node_resource_spec_and_count() -> Dict[_NodeResourceSpec, int]:
"""Get the unique node resource specs and their count in the cluster."""
def _get_node_resource_spec_and_count(
subcluster: Optional[str] = DEFAULT_SUBCLUSTER,
) -> Dict[_NodeResourceSpec, int]:
"""Get the unique node resource specs and their count in the cluster,
scoped to a single subcluster.

``subcluster`` is the value at ``SUBCLUSTER_LABEL_KEY`` to match
against. The default ``DEFAULT_SUBCLUSTER`` (None) selects nodes with
no subcluster label.

Quirk: the returned dict also contains a ``node_type: 0`` (ex: `"m5.xlarge": 0`) entry for every
node type registered in ``cluster_config.node_group_configs`` that
isn't included in this subcluster. ``get_cluster_config()``
reports node types but not labels, so the only way to know a
shape's subcluster is to inspect live nodes. Harmless: for example,
if m5.xlarge nodes only exist in the training subcluster, the validation
dataset will emit pending-bundle scale-up demand for foo nodes
stamped with the validation label, which the autoscaler can never
satisfy.
TODO: get labels from cluster config so the catalog can be filtered.
"""
nodes_resource_spec_count = defaultdict(int)

cluster_config = ray._private.state.state.get_cluster_config()
Expand All @@ -84,11 +105,16 @@ def _get_node_resource_spec_and_count() -> Dict[_NodeResourceSpec, int]:
)
nodes_resource_spec_count[node_resource_spec] = 0

# Filter out the head node.
# Filter out the head node and nodes outside the requester's subcluster.
node_resources = [
node["Resources"]
for node in ray.nodes()
if node["Alive"] and "node:__internal_head__" not in node["Resources"]
if (
node["Alive"]
and "node:__internal_head__" not in node["Resources"]
and (node.get("Labels") or {}).get(SUBCLUSTER_LABEL_KEY, DEFAULT_SUBCLUSTER)
== subcluster
)
]

for r in node_resources:
Expand Down Expand Up @@ -162,10 +188,9 @@ def __init__(
min_gap_between_autoscaling_requests_s: float = MIN_GAP_BETWEEN_AUTOSCALING_REQUESTS, # noqa: E501
low_util_request_release_delay_s: float = DEFAULT_LOW_UTIL_REQUEST_RELEASE_DELAY_S, # noqa: E501
autoscaling_coordinator: Optional[AutoscalingCoordinator] = None,
get_node_counts: Callable[[], Dict[_NodeResourceSpec, int]] = (
_get_node_resource_spec_and_count
),
get_node_counts: Optional[Callable[[], Dict[_NodeResourceSpec, int]]] = None,
get_time: Callable[[], float] = time.time,
label_selector: Optional[Dict[str, str]] = None,
):
assert cluster_scaling_up_delta > 0
assert cluster_util_avg_window_s > 0
Expand All @@ -180,6 +205,7 @@ def __init__(
)

self._resource_limits = resource_limits
self._label_selector = label_selector or {}
self._resource_utilization_calculator = resource_utilization_calculator
# Threshold of cluster utilization to trigger scaling up.
self._cluster_scaling_up_util_threshold = cluster_scaling_up_util_threshold
Expand All @@ -200,9 +226,20 @@ def __init__(
self._requester_id = f"data-{execution_id}"
if autoscaling_coordinator is None:
autoscaling_coordinator = DefaultAutoscalingCoordinator(
requester_id=self._requester_id
requester_id=self._requester_id,
subcluster_selector=label_selector,
Comment thread
TimothySeah marked this conversation as resolved.
)
self._autoscaling_coordinator = autoscaling_coordinator
if get_node_counts is None:
# Scope node-shape/count discovery to this requester's subcluster
# so try_trigger_scaling doesn't pull node shapes / counts from
# other subclusters into ``active_bundles`` / ``pending_bundles``.
subcluster = self._label_selector.get(
SUBCLUSTER_LABEL_KEY, DEFAULT_SUBCLUSTER
)
get_node_counts = lambda: _get_node_resource_spec_and_count( # noqa: E731
subcluster=subcluster
)
self._get_node_counts = get_node_counts
self._get_time = get_time
self._autoscaling_enabled = is_autoscaling_enabled()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def request_resources(
request_remaining: bool = False,
priority: ResourceRequestPriority = ResourceRequestPriority.MEDIUM,
label_selectors: Optional[List[Dict[str, str]]] = None,
subcluster_selector: Optional[Dict[str, str]] = None,
) -> None:
if priority != ResourceRequestPriority.MEDIUM:
raise NotImplementedError(
Expand Down
Loading
Loading