[data] Support multiple datasets in a cluster (2/2): partition cluster resources by subcluster label #63375

Merged
justinvyu merged 13 commits into
ray-project:master from
TimothySeah:tseah/2-datasets-prototype-2
Jun 9, 2026

Conversation

@TimothySeah TimothySeah commented May 15, 2026

Contributor

Summary

The end goal is to support two Ray Data datasets in one cluster using subcluster label scheduling. In that setup, two datasets and two trainers share the same AutoscalingCoordinator. The previous PR in this stack (#63331) ensured that each dataset's tasks land in the correct subcluster. This PR ensures that every requester, whether a trainer or a dataset, only requests and receives resources within its own subcluster.

To this end, the main change is to AutoscalingCoordinator._tick, which runs at regular intervals. _tick calls three helper methods, which this PR changes as follows:

  1. merge_and_send_requests: each autoscaling request now includes the subcluster label of the requester
  2. update_cluster_node_resources: we now group cluster nodes by subcluster
  3. _reallocate_resources: we now update OngoingRequests with their subcluster-scoped resources.
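The grouping step in update_cluster_node_resources, and the way a requester's view of the cluster is then narrowed to its own subcluster, can be sketched as follows. This is a minimal illustration, not the coordinator's code: NodeInfo, group_node_resources_by_subcluster, resources_for_requester, and the label key value are hypothetical, and the sketch assumes a requester without a subcluster label can see the whole cluster.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List, Optional

# Assumed label key; the coordinator's real constant may differ.
SUBCLUSTER_LABEL_KEY = "__subcluster__"

Resources = Dict[str, float]


@dataclass
class NodeInfo:
    """Hypothetical stand-in for one alive node: its total resources and labels."""

    resources: Resources
    labels: Dict[str, str] = field(default_factory=dict)


def group_node_resources_by_subcluster(
    nodes: List[NodeInfo],
) -> Dict[Optional[str], List[Resources]]:
    """Bucket each node's resources under its subcluster label (None if unlabeled)."""
    buckets: Dict[Optional[str], List[Resources]] = defaultdict(list)
    for node in nodes:
        buckets[node.labels.get(SUBCLUSTER_LABEL_KEY)].append(dict(node.resources))
    return dict(buckets)


def resources_for_requester(
    subcluster: Optional[str],
    buckets: Dict[Optional[str], List[Resources]],
) -> List[Resources]:
    """Return only the node resources a requester may be allocated.

    Assumption: a requester with no subcluster sees every node.
    """
    if subcluster is None:
        return [res for node_list in buckets.values() for res in node_list]
    return list(buckets.get(subcluster, []))
```

With this split, reallocation for a validation requester can only ever draw from the validation bucket, so it can no longer be handed GPUs that belong to the training subcluster.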

I also changed the try_trigger_scaling method, which creates dataset autoscaling requests. Before this change, it tried to scale up every node in the cluster; now it only scales up the dataset's own subcluster. This applies only to dataset requesters. Trainer requesters attempt scale-up by requesting resource bundles with their corresponding label selectors (which include subcluster labels), so that path didn't need changes.
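A rough sketch of the narrowed scale-up request follows. It assumes, hypothetically, that each node is modeled as a (resources, labels) pair and that the subcluster is attached to every bundle as a label selector; build_scale_up_request and the label key value are illustrative names, not the actual try_trigger_scaling code.

```python
from typing import Dict, List, Optional, Tuple

# Assumed label key used for subcluster scheduling.
SUBCLUSTER_LABEL_KEY = "__subcluster__"

Bundle = Dict[str, float]
LabelSelector = Dict[str, str]


def build_scale_up_request(
    nodes: List[Tuple[Bundle, Dict[str, str]]],
    subcluster: Optional[str],
) -> List[Tuple[Bundle, LabelSelector]]:
    """Return (bundle, label_selector) pairs for a dataset's scale-up request.

    Without a subcluster, every node is included (the old behavior). With one,
    only nodes in that subcluster are included, and each bundle carries a
    selector so new capacity is requested in the same subcluster.
    """
    request: List[Tuple[Bundle, LabelSelector]] = []
    for resources, labels in nodes:
        if subcluster is None:
            request.append((dict(resources), {}))
        elif labels.get(SUBCLUSTER_LABEL_KEY) == subcluster:
            request.append((dict(resources), {SUBCLUSTER_LABEL_KEY: subcluster}))
        # Nodes in other subclusters are skipped entirely.
    return request
```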

API summary

To use subcluster scheduling, set the __subcluster__ label on each node type in the compute config:

- name: training_node
  instance_type: p4d.24xlarge
  max_workers: 2
  min_workers: 2
  use_spot: false
  labels:
    __subcluster__: training

- name: validation_node
  instance_type: p4d.24xlarge
  max_workers: 1
  min_workers: 1
  use_spot: false
  labels:
    __subcluster__: validation

and set a label_selector on each dataset:

import ray
from ray.data import ExecutionOptions
from ray.train.torch import TorchTrainer

# Assumes train_fn, train_ds, val_ds, and DataExecutionOptions are already defined or imported.

# Option 1: set per-dataset selectors through ray.train.DataConfig
trainer = TorchTrainer(
    train_fn,
    datasets={"train": train_ds, "val": val_ds},
    dataset_config=ray.train.DataConfig(
        datasets_to_split=["train"],
        data_execution_options=DataExecutionOptions(
            per_dataset_execution_options={
                "train": ExecutionOptions(
                    label_selector={"__subcluster__": "training"}
                ),
                "val": ExecutionOptions(
                    label_selector={"__subcluster__": "validation"}
                ),
            }
        ),
    ),
)

# Option 2: set directly on the dataset's DataContext execution options
train_ds.context.execution_options.label_selector = {"__subcluster__": "training"}
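Because each dataset's selector has to line up with the labels in the compute config, a selector value that differs from the config (for example `train` versus `training`) matches no node at all. The sketch below models this with a simplified, equality-only matcher; Ray's actual label selector syntax is richer, and NODE_TYPE_LABELS, selector_matches, and eligible_node_types are hypothetical helpers.

```python
from typing import Dict, List

# Node labels from the compute config above, keyed by node type name.
NODE_TYPE_LABELS: Dict[str, Dict[str, str]] = {
    "training_node": {"__subcluster__": "training"},
    "validation_node": {"__subcluster__": "validation"},
}


def selector_matches(selector: Dict[str, str], labels: Dict[str, str]) -> bool:
    """Equality-only check: every selector key must exist with the same value."""
    return all(labels.get(key) == value for key, value in selector.items())


def eligible_node_types(selector: Dict[str, str]) -> List[str]:
    """Node types whose labels satisfy the selector."""
    return [
        name
        for name, labels in NODE_TYPE_LABELS.items()
        if selector_matches(selector, labels)
    ]


print(eligible_node_types({"__subcluster__": "training"}))  # ['training_node']
print(eligible_node_types({"__subcluster__": "train"}))  # []
```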

Testing

Ran a multitenancy stress test based on this PR (PR: #63737, test: https://buildkite.com/ray-project/release/builds/95982).

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Code Review

This pull request implements label_selector and subcluster_label_key support across Ray Data and Ray Train, allowing users to constrain task and actor placement to specific labeled subsets of a cluster. The changes include updates to ExecutionOptions, the AutoscalingCoordinator for resource bucketing, and broad propagation of these selectors through physical operators, planners, and data source implementations. Feedback was provided regarding the merge_label_selector utility, suggesting that it should always return a new dictionary to resolve a contradiction in its docstring and prevent potential mutation bugs.
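The "always return a new dictionary" suggestion can be illustrated with a small sketch. The signature and the "overrides win on key conflicts" rule are assumptions for illustration; the real merge_label_selector in python/ray/data/_internal/execution/util.py may differ.

```python
from typing import Dict, Optional


def merge_label_selector(
    base: Optional[Dict[str, str]],
    overrides: Optional[Dict[str, str]],
) -> Dict[str, str]:
    """Return a fresh dict containing base's entries updated with overrides.

    Neither input is mutated, and the result never aliases either input, so
    callers can modify it freely.
    """
    merged: Dict[str, str] = dict(base or {})
    merged.update(overrides or {})
    return merged


shared = {"__subcluster__": "training"}
result = merge_label_selector(shared, None)
result["accelerator"] = "gpu"
print(shared)  # {'__subcluster__': 'training'}
```

Returning a copy even when there is nothing to merge is what keeps the caller's edit to `result` from leaking into `shared`.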

Comment thread python/ray/data/_internal/execution/util.py

@justinvyu justinvyu left a comment

Contributor

Great work!

Comment thread python/ray/data/_internal/cluster_autoscaler/default_autoscaling_coordinator.py Outdated
Comment thread python/ray/data/_internal/cluster_autoscaler/default_autoscaling_coordinator.py Outdated
Comment thread python/ray/data/_internal/cluster_autoscaler/default_autoscaling_coordinator.py Outdated
Comment thread python/ray/data/tests/test_autoscaling_coordinator.py Outdated
@TimothySeah TimothySeah marked this pull request as ready for review May 29, 2026 17:40
@TimothySeah TimothySeah requested review from a team as code owners May 29, 2026 17:40
@TimothySeah TimothySeah changed the title [data] AutoscalingCoordinator _tick loop respects subcluster boundaries May 29, 2026
@ray-gardener ray-gardener Bot added the data Ray Data-related issues label May 29, 2026
Signed-off-by: Timothy Seah <tseah@anyscale.com>
Signed-off-by: Timothy Seah <tseah@anyscale.com>
Signed-off-by: Timothy Seah <tseah@anyscale.com>
Signed-off-by: Timothy Seah <tseah@anyscale.com>
@TimothySeah TimothySeah force-pushed the tseah/2-datasets-prototype-2 branch from 60ccec0 to 7061322 Compare May 30, 2026 00:40
Comment thread python/ray/data/_internal/cluster_autoscaler/default_cluster_autoscaler_v2.py Outdated
…resources + request_remaining=True

Signed-off-by: Timothy Seah <tseah@anyscale.com>

@justinvyu justinvyu left a comment

Contributor

Thanks!

Comment thread python/ray/data/_internal/cluster_autoscaler/base_autoscaling_coordinator.py Outdated
Comment thread python/ray/data/_internal/cluster_autoscaler/default_autoscaling_coordinator.py Outdated
Comment thread python/ray/data/_internal/execution/interfaces/execution_options.py Outdated
Comment thread python/ray/data/_internal/cluster_autoscaler/default_cluster_autoscaler_v2.py Outdated
Comment thread python/ray/data/_internal/execution/interfaces/execution_options.py Outdated
Comment thread python/ray/data/_internal/cluster_autoscaler/default_autoscaling_coordinator.py Outdated
Signed-off-by: Timothy Seah <tseah@anyscale.com>
Signed-off-by: Timothy Seah <tseah@anyscale.com>
Comment thread python/ray/data/_internal/cluster_autoscaler/default_autoscaling_coordinator.py Outdated
Comment thread python/ray/data/_internal/cluster_autoscaler/default_autoscaling_coordinator.py Outdated
Signed-off-by: Timothy Seah <tseah@anyscale.com>
Signed-off-by: Timothy Seah <tseah@anyscale.com>

@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes using default effort and found 1 potential issue.

Reviewed by Cursor Bugbot for commit ef0e3e5.

@justinvyu justinvyu left a comment

Contributor

when is request_resources(label_selectors) used? what happens if you do request_resources(label_selectors, subcluster_selector)? Does one overwrite the other?

Is that meant to be "non-subcluster related labels"?

Can we also raise an error to explicitly disallow one requester trying to request bundles from multiple subclusters?

around here:

  if subcluster_selector and label_selectors:
      req_subcluster = subcluster_selector.get(SUBCLUSTER_LABEL_KEY)
      for i, sel in enumerate(label_selectors):
          bundle_subcluster = sel.get(SUBCLUSTER_LABEL_KEY)
          if bundle_subcluster is not None and bundle_subcluster != req_subcluster:
              raise ValueError(
                  f"Bundle {i} label_selector targets subcluster "
                  f"{bundle_subcluster!r}, but requester is registered to "
                  f"{req_subcluster!r}. Per-bundle cross-subcluster "
                  f"allocation is not supported."
              )
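The suggested check can be packaged as a standalone function and exercised on its own. In this sketch the function name and the constant's value are assumptions; the body follows the snippet above.

```python
from typing import Dict, List, Optional

# Assumed value of the coordinator's SUBCLUSTER_LABEL_KEY constant.
SUBCLUSTER_LABEL_KEY = "__subcluster__"


def validate_bundle_subclusters(
    subcluster_selector: Optional[Dict[str, str]],
    label_selectors: Optional[List[Dict[str, str]]],
) -> None:
    """Raise if any bundle targets a subcluster other than the requester's."""
    if not subcluster_selector or not label_selectors:
        return
    req_subcluster = subcluster_selector.get(SUBCLUSTER_LABEL_KEY)
    for i, sel in enumerate(label_selectors):
        bundle_subcluster = sel.get(SUBCLUSTER_LABEL_KEY)
        # Bundles that omit the subcluster label are not checked.
        if bundle_subcluster is not None and bundle_subcluster != req_subcluster:
            raise ValueError(
                f"Bundle {i} label_selector targets subcluster "
                f"{bundle_subcluster!r}, but requester is registered to "
                f"{req_subcluster!r}. Per-bundle cross-subcluster "
                f"allocation is not supported."
            )
```

For example, a requester registered to `training` may still ask for a bundle whose selector only pins an accelerator label, but a bundle pinned to `validation` is rejected.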
Comment thread python/ray/data/_internal/cluster_autoscaler/default_autoscaling_coordinator.py Outdated
Comment thread python/ray/data/_internal/cluster_autoscaler/default_cluster_autoscaler_v2.py Outdated
Comment thread python/ray/data/_internal/cluster_autoscaler/default_cluster_autoscaler_v2.py Outdated
Comment thread python/ray/data/_internal/cluster_autoscaler/default_autoscaling_coordinator.py Outdated
Comment thread python/ray/data/_internal/cluster_autoscaler/default_autoscaling_coordinator.py Outdated
Comment thread python/ray/data/_internal/cluster_autoscaler/default_cluster_autoscaler_v2.py Outdated
Signed-off-by: Timothy Seah <tseah@anyscale.com>
…ataset subcluster changes

Signed-off-by: Timothy Seah <tseah@anyscale.com>
@TimothySeah

Contributor Author

when is request_resources(label_selectors) used?
Is that meant to be "non-subcluster related labels"?

Check out #58845 and #63287. In the former PR, my goal was to support placing Ray Train workers on nodes with particular attributes. These attributes would usually be subcluster labels, but could also identify specific nodes within a subcluster. For example, we may want to place Ray Train workers on the GPU nodes within the training subcluster, while placing Ray Data workers for the training dataset on the CPU nodes within the same subcluster. However, I forgot to update the AutoscalingCoordinator to scale up these nodes when they don't exist yet; @liulehui added that in the latter PR.

Right now there are two types of requesters: datasets and Ray Train. Datasets always request their subcluster via subcluster_selector, while Ray Train requests all desired node attributes, including the subcluster, together via label_selectors. I agree this split is a bit clunky and confusing, so I'm open to suggestions on how to clean it up.

what happens if you do request_resources(label_selectors, subcluster_selector)? Does one overwrite the other?
Can we also raise an error to explicitly disallow one requester trying to request bundles from multiple subclusters?

subcluster_selector takes precedence: https://github.com/ray-project/ray/pull/63375/changes#diff-23e42254510d06fc2e4595cb52c69872e0b16f6c52932f06b502d63548e72067R361. I also implemented your ValueError suggestion, so we now raise an error before reaching that point.
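The precedence rule and the two requester shapes described above can be condensed into one hypothetical helper. effective_subcluster is an illustrative name; reading the subcluster out of Ray Train's per-bundle label_selectors, and rejecting bundles that disagree, are assumptions of this sketch rather than the coordinator's actual logic.

```python
from typing import Dict, List, Optional

# Assumed label key.
SUBCLUSTER_LABEL_KEY = "__subcluster__"


def effective_subcluster(
    subcluster_selector: Optional[Dict[str, str]],
    label_selectors: Optional[List[Dict[str, str]]],
) -> Optional[str]:
    """Pick the subcluster a requester is scoped to.

    Dataset requesters pass subcluster_selector, which takes precedence.
    Ray Train requesters embed the subcluster in each bundle's label_selector;
    this sketch assumes those bundles agree and rejects them otherwise.
    """
    if subcluster_selector and SUBCLUSTER_LABEL_KEY in subcluster_selector:
        return subcluster_selector[SUBCLUSTER_LABEL_KEY]
    found = {
        sel[SUBCLUSTER_LABEL_KEY]
        for sel in (label_selectors or [])
        if SUBCLUSTER_LABEL_KEY in sel
    }
    if len(found) > 1:
        raise ValueError(f"Bundles span multiple subclusters: {sorted(found)}")
    return next(iter(found), None)
```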

@TimothySeah TimothySeah requested a review from justinvyu June 8, 2026 23:17

@justinvyu justinvyu left a comment

Contributor

Thanks! Can you update the PR description?

Comment thread python/ray/data/_internal/cluster_autoscaler/default_cluster_autoscaler_v2.py Outdated
@TimothySeah TimothySeah changed the title [data] Support multiple datasets in a cluster (2/2): AutoscalingCoordinator _tick loop respects subcluster boundaries Jun 9, 2026
TimothySeah and others added 2 commits June 8, 2026 18:01
…utoscaler_v2.py

Co-authored-by: Justin Yu <justin.v.yu@gmail.com>
Signed-off-by: Timothy Seah <tseah@anyscale.com>
Signed-off-by: Timothy Seah <tseah@anyscale.com>
@justinvyu justinvyu enabled auto-merge (squash) June 9, 2026 02:07
@github-actions github-actions Bot added the go add ONLY when ready to merge, run all tests label Jun 9, 2026
@justinvyu justinvyu merged commit 5d2c4e7 into ray-project:master Jun 9, 2026
8 checks passed
sampan-s-nayak pushed a commit to sampan-s-nayak/ray that referenced this pull request Jun 10, 2026
…r resources by subcluster label (ray-project#63375)

The end goal is to support 2 ray data datasets in 1 cluster with
subcluster label scheduling. In such a setup, we have 2 datasets sharing the same AutoscalingCoordinator. The previous PR in
this stack (ray-project#63331) made sure
that each dataset's tasks ended up in the correct subcluster. This PR
ensures that all requesters, whether they are trainers or datasets, only
request and receive resources in their subcluster.

---------

Signed-off-by: Timothy Seah <tseah@anyscale.com>
Co-authored-by: Justin Yu <justin.v.yu@gmail.com>
elliot-barn pushed a commit that referenced this pull request Jun 10, 2026
…ter (#63982)

#63375 doesn't work because
`__subcluster__` is not a valid label name. I am testing whether
`subcluster` works on this PR
(#63737) and cherrypicked that
change here.

---------

Signed-off-by: Timothy Seah <tseah@anyscale.com>
TimothySeah added a commit to TimothySeah/ray that referenced this pull request Jun 18, 2026
…r resources by subcluster label (ray-project#63375)
TimothySeah added a commit to TimothySeah/ray that referenced this pull request Jun 18, 2026
…r resources by subcluster label (ray-project#63375)
elliot-barn added a commit that referenced this pull request Jun 18, 2026
…ter (#64003)

#63375 doesn't work because
`__subcluster__` is not a valid label name. I am testing whether `subcluster`
works on this PR (#63737) and
cherrypicked that change here.

Merged to 2.56.0 release branch already
#63982

---------

Signed-off-by: Timothy Seah <tseah@anyscale.com>
Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
Co-authored-by: Timothy Seah <tseah@anyscale.com>
limarkdcunha pushed a commit to limarkdcunha/ray that referenced this pull request Jun 30, 2026
…r resources by subcluster label (ray-project#63375)
limarkdcunha pushed a commit to limarkdcunha/ray that referenced this pull request Jun 30, 2026
…ter (ray-project#64003)