
[data] Support multiple datasets in a cluster (1/2): Pipe DataContext.ExecutionOptions.label_selector to task submissions #63331

Merged
justinvyu merged 4 commits into ray-project:master from TimothySeah:tseah/2-datasets-prototype
May 29, 2026

Conversation

@TimothySeah TimothySeah commented May 14, 2026


Summary

The end goal is to support running 2 ray data datasets in 1 cluster with subcluster label scheduling. To that end, I will create the following PR stack:

  1. This PR: Allow users to set dataset-level label_selector through DataContext.execution_options.label_selector.
  2. Followup PR: AutoscalingCoordinator _tick loop should respect subcluster labels when scaling up the cluster and allocating resources to datasets.
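As a rough illustration of what subcluster label scheduling means for resource accounting, here is a minimal sketch. It assumes equality-only matching on node labels; the `matches` and `cpus_in_subcluster` helpers and the node dictionaries are hypothetical and are not part of Ray or of the AutoscalingCoordinator.

```python
from typing import Dict, List


def matches(node_labels: Dict[str, str], selector: Dict[str, str]) -> bool:
    """True when every selector key is present on the node with an equal value."""
    return all(node_labels.get(key) == value for key, value in selector.items())


def cpus_in_subcluster(nodes: List[dict], selector: Dict[str, str]) -> float:
    """Sum the CPUs of the nodes that satisfy the selector (hypothetical helper)."""
    return sum(node["cpus"] for node in nodes if matches(node["labels"], selector))


# Toy cluster: two training nodes and one validation node.
nodes = [
    {"labels": {"subcluster": "train"}, "cpus": 16},
    {"labels": {"subcluster": "train"}, "cpus": 16},
    {"labels": {"subcluster": "validation"}, "cpus": 8},
]

train_cpus = cpus_in_subcluster(nodes, {"subcluster": "train"})
validation_cpus = cpus_in_subcluster(nodes, {"subcluster": "validation"})
all_cpus = cpus_in_subcluster(nodes, {})  # an empty selector matches every node
```

In this toy model each dataset only ever sees the capacity of the nodes its selector matches, which is the property the PR stack is aiming for.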

In the same way that the executor pulls scheduling_strategy from the DataContext before submitting certain Ray Data tasks (the if "scheduling_strategy" not in ray_remote_args: check), after this PR, the executor will do the same for label_selector. Note that label_selector is in INHERITABLE_REMOTE_ARGS so operator fusion will respect it.
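The pattern described above can be sketched as follows. Only the `"scheduling_strategy" not in ray_remote_args` check comes from the description; the helper name `fill_from_context` and its parameters are hypothetical and stand in for whatever the executor actually does.

```python
from typing import Any, Dict, Optional


def fill_from_context(
    ray_remote_args: Dict[str, Any],
    ctx_scheduling_strategy: Optional[str],
    ctx_label_selector: Optional[Dict[str, str]],
) -> Dict[str, Any]:
    """Return a copy of ray_remote_args with context-level defaults filled in."""
    args = dict(ray_remote_args)  # never mutate the caller's dict
    # Existing behavior: only fall back to the context when the key is absent.
    if "scheduling_strategy" not in args and ctx_scheduling_strategy is not None:
        args["scheduling_strategy"] = ctx_scheduling_strategy
    # Assumed analogous behavior for label_selector.
    if "label_selector" not in args and ctx_label_selector:
        args["label_selector"] = dict(ctx_label_selector)
    return args


filled = fill_from_context({"num_cpus": 1}, "SPREAD", {"subcluster": "train"})
explicit = fill_from_context(
    {"label_selector": {"subcluster": "validation"}},
    "SPREAD",
    {"subcluster": "train"},
)
```

Here `filled` picks up both context-level values, while `explicit` keeps the selector that was already set on the remote args.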

I piped the labels through the following types of callsites. Let me know if I'm missing any and/or if any are unnecessary/problematic.

  1. Map operators. These already merge other values (like scheduling_strategy) from the Dataset or Operator to ray_remote_args.
  2. Various operators (like LimitOperator and ZipOperator) that call .remote(). These places now pipe label_selector through.
  3. Exchange schedulers (sort, repartition, aggregation, random_shuffle). These places now pipe label_selector through.
  4. Planning-time tasks (file metadata fetch, parquet sampling)
  5. Construction-time tasks (from_pandas/numpy/arrow)
  6. Conversion-time tasks (to_pandas/numpy/arrow, block-num-rows)
  7. Niche features (RandomAccess, checkpoint)

Note that I may have missed piping label_selector through other paths, but this is no worse than today's behavior and we can close these gaps when we encounter them. I intentionally did not close the following gaps in this PR:

  1. _PushBasedShuffleStage uses NodeAffinitySchedulingStrategy instead of the dataset's label_selector. In practice this is fine since the blocks should already be in-subcluster from the preceding steps.
  2. Multiple datasinks fan out nested .remote() calls from within an already-placed write task. Fixing may require reading the DataContext from inside the task.

Testing

Unit tests. I will do more e2e testing in the followup PR. I unit tested some but not all callsites since many of them are deeply entangled and would require a lot of mocking; at the very least existing unit tests should verify that piping an empty label_selector through doesn't break anything.

I cherrypicked this PR and #63375 into a different PR and ran an async checkpointing and validation benchmark. I dove into the training and validation datasets and confirmed that all of their tasks were placed on the correct subcluster.

Alternative Considered

With this PR, the API for interleaved validation is simple - the user can simply set training and validation dataset execution options in the dataset_config:

dataset_config = ray.train.DataConfig(
    datasets_to_split=["train", "test"],
    execution_options={
        "train": ExecutionOptions(label_selector={"subcluster": "train"}),
        "test": ExecutionOptions(label_selector={"subcluster": "validation"}),
    },
)

However, the API for async validation is a bit more confusing because we do not split the validation dataset in the overall trainer and therefore need to set it within the validation function itself:

dataset_config = ray.train.DataConfig(
    datasets_to_split=["train"],
    execution_options={
        "train": ExecutionOptions(label_selector={"subcluster": "train"}),
    },
)

...
def validate_fn(checkpoint):
    validation_dataset = ...
    validation_dataset.context.execution_options.label_selector = {
        "subcluster": "validation"
    }
    ...

One alternative @justinvyu and I discussed is to make DataConfig the sole public API even in the async validation case. However, this would mean that 1) we need to pass the dataset around from the driver to the trainer to the validator, as opposed to just creating it in the validator; 2) we need to remember not to split the validation dataset up front; and 3) we need a way for the validation function to get the un-split Dataset (as opposed to the DataIterator):

dataset_config = ray.train.DataConfig(
    datasets_to_split=["train"],
    execution_options={
        "train": ExecutionOptions(label_selector={"subcluster": "train"}),
        "test": ExecutionOptions(label_selector={"subcluster": "validation"}),
    },
)

def validate_fn(checkpoint, validation_dataset):
    validation_dataset.map_batches(...)


def train_fn():
    ...
    validation_dataset = ray.train.get_dataset_shard("train", split=False)
    ray.train.report(..., validate_fn=validate_fn, validation=ValidationTaskConfig(validation_dataset=validation_dataset))
    

Ultimately we decided that this PR is fine for now since modifying execution_options is already a mostly recommended public API right now anyway. Though this alternate approach is more "public facing," it moves more data around and might actually be even more confusing.

…ssions

Signed-off-by: Timothy Seah <tseah@anyscale.com>

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces a label_selector to ExecutionOptions, allowing users to constrain Ray Data tasks and actors to specific nodes within a cluster. The implementation propagates this selector across various components, including data sources, execution operators (Map, Shuffle, Limit, Zip), and planners (Aggregate, Sort, Repartition). A new utility, merge_label_selector, is added to handle merging context-level selectors with operator-level arguments while ensuring that existing node-pin selectors are preserved. Review feedback identified critical issues where method signatures in SplitRepartitionTaskScheduler were not updated to accept new arguments, which would lead to TypeError and NameError at runtime. Additionally, a suggestion was made to improve the robustness of the merging utility by handling null inputs for remote arguments.
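A minimal sketch of the kind of merge the review summary describes. The name `merge_label_selector_sketch`, the `"node-id"` key, and the precedence rule (keys already present on the operator win over context-level keys) are assumptions made for illustration; the actual merge_label_selector utility in the PR may behave differently.

```python
from typing import Any, Dict, Optional


def merge_label_selector_sketch(
    ray_remote_args: Optional[Dict[str, Any]],
    context_selector: Optional[Dict[str, str]],
) -> Dict[str, Any]:
    """Merge a context-level selector into remote args without dropping existing keys."""
    args = dict(ray_remote_args or {})  # tolerate None, as the review suggested
    if not context_selector:
        return args  # nothing to merge
    existing = args.get("label_selector") or {}
    # Assumed precedence: selectors already on the operator (for example a
    # node pin) override context-level keys with the same name.
    args["label_selector"] = {**context_selector, **existing}
    return args


from_none = merge_label_selector_sketch(None, {"subcluster": "train"})
with_pin = merge_label_selector_sketch(
    {"label_selector": {"node-id": "n1"}}, {"subcluster": "train"}
)
untouched = merge_label_selector_sketch({"num_cpus": 2}, None)
```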

Comment thread python/ray/data/_internal/planner/repartition.py
Comment thread python/ray/data/_internal/execution/util.py
Signed-off-by: Timothy Seah <tseah@anyscale.com>
@TimothySeah TimothySeah marked this pull request as ready for review May 14, 2026 21:02
@TimothySeah TimothySeah requested a review from a team as a code owner May 14, 2026 21:02
@ray-gardener ray-gardener Bot added the data Ray Data-related issues label May 15, 2026

@justinvyu justinvyu left a comment


The changes make sense. Thanks for being so thorough and catching all the cases where operators spawn tasks outside of the typical maps.

One concern I have is that any future modifications to operators / new operators will need to remember to add these label options. One idea I had was to automatically merge the label in cached_remote_fn. But then you'd need to fetch the global DataContext.get_current() which might not be consistent with the current dataset's context (ex: if you set it differently per dataset). So we can just go with the current way where we explicitly pass the arg into every remote call.

Also, could you make a note of the places where we pull the label selector from DataContext.get_current()? Can this cause issues?

DataContext.get_current().label_selector = "A"
ds = ray.data.read_parquet().map()

DataContext.get_current().label_selector = "B"
ds.materialize()  # <-- some places will use "B" and some will use "A"?
…t.get_current()

Signed-off-by: Timothy Seah <tseah@anyscale.com>
TimothySeah commented May 22, 2026


One concern I have is that any future modifications to operators / new operators will need to remember to add these label options.

I agree this isn't ideal, but forgetting to do this isn't that bad since we would just have a bit of resource leakage, which is no worse than what would happen today. Is there a good data test I can run to test this? I was thinking of running an "interesting" data workload and verifying that all the tasks that were created landed on the right subcluster.

Also, could you make a note of the places where we pull the label selector from DataContext.get_current()? Can this cause issues?

DataContext.get_current().label_selector = "A"
ds = ray.data.read_parquet().map()

DataContext.get_current().label_selector = "B"
ds.materialize()  # <-- some places will use "B" and some will use "A"?

Good callout. This PR adds 5 places that pull the label selector from DataContext.get_current(). 4 of them happen at dataset construction time (parquet_datasource.py, file_meta_provider.py, random_access_dataset.py, read_api.py) so all of them would use A in your example. The Ray Train DataConfig.execution_options path that we agreed to recommend to users unfortunately won't be respected here. It's not great but I think we can either:

  1. Tell users to set the DataContext.get_current().execution_options before constructing each dataset.
  2. Do nothing. IIUC these tasks are relatively lightweight, so they won't mess up backpressure.
  3. Pipe label_selector through these "construction" paths as well.

Open to suggestions here.

The 5th place is CheckpointManager.load_checkpoint/CheckpointManager._clean_pending_checkpoints. I changed these to use the DataContext passed from the Dataset that created them instead of DataContext.get_current(), which should close the mismatch here.

@TimothySeah TimothySeah requested a review from justinvyu May 27, 2026 00:35
@justinvyu


Is there a good data test I can run to test this?

heterogeneous_memory_batch_inference would be good to test (multiple node types).

Tell users to set the DataContext.get_current().execution_options before constructing each dataset.

Gotcha, let's just make sure to publicly document these caveats.


Kind of related question: how will DataConfig.execution_options configure async validation datasets?

Ex:

train_ds = ray.data.read_parquet(...)
valid_ds = ray.data.read_parquet(...)

# We don't actually pass `valid_ds` through `TorchTrainer`
TorchTrainer(train_loop_config={"valid_ds": valid_ds}, datasets={"train": train_ds}, dataset_config=DataConfig(execution_options={"train": ...}))

def valid_fn(valid_ds):
    # Does the user need to actually use the DataContext API?
    valid_ds.context.label_selector = ...
    valid_ds.materialize()

def train_fn_per_worker(config):
    ray.train.report(..., valid_fn, config["valid_ds"])
@TimothySeah


Kind of related question: how will DataConfig.execution_options configure async validation datasets?

Ex:

train_ds = ray.data.read_parquet(...)
valid_ds = ray.data.read_parquet(...)

# We don't actually pass `valid_ds` through `TorchTrainer`
TorchTrainer(train_loop_config={"valid_ds": valid_ds}, datasets={"train": train_ds}, dataset_config=DataConfig(execution_options={"train": ...}))

def valid_fn(valid_ds):
    # Does the user need to actually use the DataContext API?
    valid_ds.context.label_selector = ...
    valid_ds.materialize()

def train_fn_per_worker(config):
    ray.train.report(..., valid_fn, config["valid_ds"])

Yeah the user would need to set valid_ds.context.execution_options in the driver or in the validation function.

@justinvyu justinvyu left a comment


Can you call out the alternative API proposal we discussed? Ok to go with modifying the context since we already recommend that as a sort of public API. Let's label this somehow as an Alpha configuration.

TimothySeah commented May 29, 2026

  1. Try to centralize task options rather than piping them everywhere. This can be a follow-up for other global settings that we use, e.g. scheduling_strategy.
  2. Raise an error if the label selector conflicts with execution options, e.g. if map_batches sets something different from what is on the base dataset. This might also be easier if we do 1) first.

I already filed a bug for the above.
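The second item above could look roughly like the following check. This is a sketch under assumptions: the function name `check_selector_conflict` is hypothetical, and a conflict is taken to mean that the same label key carries different values on the dataset and on the operator.

```python
from typing import Dict, Optional


def check_selector_conflict(
    dataset_selector: Optional[Dict[str, str]],
    operator_selector: Optional[Dict[str, str]],
) -> None:
    """Raise ValueError when an operator selector contradicts the dataset selector."""
    base = dataset_selector or {}
    override = operator_selector or {}
    # Collect keys that appear in both selectors with different values.
    conflicts = {
        key: (base[key], value)
        for key, value in override.items()
        if key in base and base[key] != value
    }
    if conflicts:
        raise ValueError(
            f"label_selector conflicts (dataset value, operator value): {conflicts}"
        )


# Compatible: the operator adds a new key without contradicting the dataset.
check_selector_conflict({"subcluster": "train"}, {"zone": "a"})

# Conflicting: the same key has two different values.
try:
    check_selector_conflict({"subcluster": "train"}, {"subcluster": "validation"})
    conflict_detected = False
except ValueError:
    conflict_detected = True
```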

@TimothySeah TimothySeah changed the title [data] Pipe DataContext.ExecutionOptions.label_selector to task submissions May 29, 2026
@TimothySeah


Can you call out the alternative API proposal we discussed? Ok to go with modifying the context since we already recommend that as a sort of public API. Let's label this somehow as an Alpha configuration.

Added to PR description.

@justinvyu justinvyu enabled auto-merge (squash) May 29, 2026 22:13
@github-actions github-actions Bot added the go add ONLY when ready to merge, run all tests label May 29, 2026
@github-actions github-actions Bot disabled auto-merge May 29, 2026 23:00
@justinvyu justinvyu enabled auto-merge (squash) May 29, 2026 23:10
@justinvyu justinvyu merged commit 750ef4e into ray-project:master May 29, 2026
7 checks passed
rueian pushed a commit to rueian/ray that referenced this pull request Jun 4, 2026
….ExecutionOptions.label_selector to task submissions (ray-project#63331)

The end goal is to support running 2 ray data datasets in 1 cluster with
subcluster label scheduling. To that end, I will create the following PR
stack:
1) This PR: Allow users to set dataset-level `label_selector` through
`DataContext.execution_options.label_selector`.

---------

Signed-off-by: Timothy Seah <tseah@anyscale.com>
justinvyu added a commit that referenced this pull request Jun 9, 2026
…r resources by subcluster label (#63375)

The end goal is to support 2 ray data datasets in 1 cluster with
subcluster label scheduling. In such a setup, we have 2 datasets sharing the same AutoscalingCoordinator. The previous PR in
this stack (#63331) made sure
that each dataset's tasks ended up in the correct subcluster. This PR
ensures that all requesters, whether they are trainers or datasets, only
request and receive resources in their subcluster.

---------

Signed-off-by: Timothy Seah <tseah@anyscale.com>
Co-authored-by: Justin Yu <justin.v.yu@gmail.com>
sampan-s-nayak pushed a commit to sampan-s-nayak/ray that referenced this pull request Jun 10, 2026
…r resources by subcluster label (ray-project#63375)

TimothySeah added a commit to TimothySeah/ray that referenced this pull request Jun 18, 2026
…r resources by subcluster label (ray-project#63375)

TimothySeah added a commit to TimothySeah/ray that referenced this pull request Jun 18, 2026
…r resources by subcluster label (ray-project#63375)

limarkdcunha pushed a commit to limarkdcunha/ray that referenced this pull request Jun 30, 2026
….ExecutionOptions.label_selector to task submissions (ray-project#63331)

limarkdcunha pushed a commit to limarkdcunha/ray that referenced this pull request Jun 30, 2026
…r resources by subcluster label (ray-project#63375)


Labels

data Ray Data-related issues go add ONLY when ready to merge, run all tests

2 participants