[data] Support multiple datasets in a cluster (1/2): Pipe DataContext.ExecutionOptions.label_selector to task submissions #63331
Conversation
…ssions Signed-off-by: Timothy Seah <tseah@anyscale.com>
Code Review
This pull request introduces a label_selector to ExecutionOptions, allowing users to constrain Ray Data tasks and actors to specific nodes within a cluster. The implementation propagates this selector across various components, including data sources, execution operators (Map, Shuffle, Limit, Zip), and planners (Aggregate, Sort, Repartition). A new utility, merge_label_selector, is added to handle merging context-level selectors with operator-level arguments while ensuring that existing node-pin selectors are preserved. Review feedback identified critical issues where method signatures in SplitRepartitionTaskScheduler were not updated to accept new arguments, which would lead to TypeError and NameError at runtime. Additionally, a suggestion was made to improve the robustness of the merging utility by handling null inputs for remote arguments.
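The merging behavior described above can be sketched roughly as follows. This is a minimal illustration, not the PR's actual `merge_label_selector`: the function name `merge_label_selector_sketch`, its signature, the label keys, and the rule that an existing operator-level entry wins on a key conflict are all assumptions made for the example.

```python
from typing import Any, Dict, Optional


def merge_label_selector_sketch(
    ray_remote_args: Optional[Dict[str, Any]],
    context_selector: Optional[Dict[str, str]],
) -> Dict[str, Any]:
    """Return a copy of ray_remote_args with context_selector merged in.

    Hypothetical helper for illustration only.
    """
    # Treat a missing remote-args dict as empty so callers can pass None.
    merged_args = dict(ray_remote_args or {})
    if not context_selector:
        # Nothing to merge: leave any operator-level selector untouched.
        return merged_args

    existing = merged_args.get("label_selector") or {}
    # Assumption: on a key conflict the operator-level entry wins, so a
    # selector that already pins a task to a node is never overwritten.
    merged_args["label_selector"] = {**context_selector, **existing}
    return merged_args


# Hypothetical label keys and values, chosen only for the example.
args = merge_label_selector_sketch(
    {"num_cpus": 1, "label_selector": {"node": "n1"}},
    {"subcluster": "train", "node": "n2"},
)
print(args["label_selector"])
```

Returning a copy rather than mutating the input keeps the caller's remote args reusable across several task submissions.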
Signed-off-by: Timothy Seah <tseah@anyscale.com>
justinvyu
left a comment
The changes make sense. Thanks for being so thorough and catching all the cases where operators spawn tasks outside of the typical maps.
One concern I have is that any future modifications to operators / new operators will need to remember to add these label options. One idea I had was to automatically merge the label in cached_remote_fn. But then you'd need to fetch the global DataContext.get_current(), which might not be consistent with the current dataset's context (e.g., if you set it differently per dataset). So we can just go with the current way where we explicitly pass the arg into every remote call.
Also, could you make a note of the places where we pull the label selector from DataContext.get_current()? Can this cause issues?
DataContext.get_current().label_selector = "A"
ds = ray.data.read_parquet().map()
DataContext.get_current().label_selector = "B"
ds.materialize() # <-- some places will use "B" and some will use "A"?
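The concern in the snippet above can be reproduced with a toy model that does not use Ray at all. The sketch assumes each dataset keeps its own copy of the context taken at creation time, while some callsites re-read the global context at execution time; `ToyContext`, `ToyDataset`, and `get_current` are hypothetical stand-ins, not Ray classes.

```python
import copy
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class ToyContext:
    """Stand-in for a process-wide context object (hypothetical)."""

    label_selector: Optional[str] = None


_GLOBAL_CONTEXT = ToyContext()


def get_current() -> ToyContext:
    """Return the shared, mutable global context."""
    return _GLOBAL_CONTEXT


class ToyDataset:
    def __init__(self) -> None:
        # Snapshot the global context when the dataset is created.
        self.context = copy.deepcopy(get_current())

    def materialize(self) -> Dict[str, Optional[str]]:
        return {
            # Callsite that uses the dataset's own snapshot.
            "from_dataset_context": self.context.label_selector,
            # Callsite that re-reads the global context at execution time.
            "from_global_context": get_current().label_selector,
        }


get_current().label_selector = "A"
ds = ToyDataset()
get_current().label_selector = "B"
seen = ds.materialize()
print(seen)
```

In this model the two callsites disagree after the global context is mutated, which is the inconsistency the question is asking about.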
…t.get_current() Signed-off-by: Timothy Seah <tseah@anyscale.com>
I agree this isn't ideal, but forgetting to do this isn't that bad since we would just have a bit of resource leakage, which is no worse than what would happen today. Is there a good data test I can run to test this? I was thinking of running an "interesting" data workload and verifying that all the tasks that were created landed on the right subcluster.
Good callout. This PR adds 5 places that pull the label selector from
Open to suggestions here. The 5th place is |
Gotcha, let's just make sure to publicly document these caveats. Kind of related question: how will Ex:
train_ds = ray.data.read_parquet(...)
valid_ds = ray.data.read_parquet(...)
# We don't actually pass `valid_ds` through `TorchTrainer`
TorchTrainer(
    train_loop_config={"valid_ds": valid_ds},
    datasets={"train": train_ds},
    dataset_config=DataConfig(execution_options={"train": ...}),
)

def valid_fn(valid_ds):
    # Does the user need to actually use the DataContext API?
    valid_ds.context.label_selector = ...
    valid_ds.materialize()

def train_fn_per_worker(config):
    ray.train.report(..., valid_fn, config["valid_ds"])
Yeah the user would need to set |
I already filed a bug for the above.
Added to PR description.
….ExecutionOptions.label_selector to task submissions (ray-project#63331) The end goal is to support running 2 ray data datasets in 1 cluster with subcluster label scheduling. To that end, I will create the following PR stack: 1) This PR: Allow users to set dataset-level `label_selector` through `DataContext.execution_options.label_selector`. --------- Signed-off-by: Timothy Seah <tseah@anyscale.com>
…r resources by subcluster label (#63375) The end goal is to support 2 ray data datasets in 1 cluster with subcluster label scheduling. In such a setup, we have 2 datasets sharing the same AutoscalingCoordinator. The previous PR in this stack (#63331) made sure that each dataset's tasks ended up in the correct subcluster. This PR ensures that all requesters, whether they are trainers or datasets, only request and receive resources in their subcluster. --------- Signed-off-by: Timothy Seah <tseah@anyscale.com> Co-authored-by: Justin Yu <justin.v.yu@gmail.com>
Summary
The end goal is to support running 2 ray data datasets in 1 cluster with subcluster label scheduling. To that end, I will create the following PR stack:
1) This PR: Allow users to set dataset-level `label_selector` through `DataContext.execution_options.label_selector`.
2) `AutoscalingCoordinator` `_tick` loop should respect subcluster labels when scaling up the cluster and allocating resources to datasets.

In the same way that the executor pulls `scheduling_strategy` from the `DataContext` before submitting certain Ray Data tasks (ray/python/ray/data/_internal/execution/operators/map_operator.py, line 470 in 1fd5697), it now also pulls `label_selector`. Note that `label_selector` is in `INHERITABLE_REMOTE_ARGS`, so operator fusion will respect it.

I piped the labels through the following types of callsites. Let me know if I'm missing any and/or if any are unnecessary/problematic.
- … `scheduling_strategy`) from the `Dataset` or `Operator` to `ray_remote_args`.
- … `LimitOperator` and `ZipOperator`) that call `.remote()`. These places now pipe `label_selector` through.
- … `label_selector` through.

Note that I may have missed piping `label_selector` through other paths, but this is no worse than today's behavior and we can close these gaps when we encounter them. I intentionally did not close the following gaps in this PR:

- `_PushBasedShuffleStage` uses `NodeAffinitySchedulingStrategy` instead of the dataset's `label_selector`. In practice this is fine since the blocks should already be in-subcluster from the preceding steps.
- … `.remote()` calls from within an already-placed write task. Fixing may require reading the `DataContext` from inside the task.

Testing
Unit tests. I will do more e2e testing in the followup PR. I unit tested some but not all callsites since many of them are deeply entangled and would require a lot of mocking; at the very least, existing unit tests should verify that piping an empty `label_selector` through doesn't break anything.

I cherrypicked this PR and #63375 into a different PR and ran an async checkpointing and validation benchmark. I dove into the training and validation datasets and confirmed that all of their tasks were placed on the correct subcluster.
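A placement check like the manual one described above could be automated along these lines. This is only a sketch under stated assumptions: the helper names, the task and node names, and the label values are hypothetical toy data, and the matcher handles exact key/value equality only, not any richer selector syntax.

```python
from typing import Dict, List, Tuple


def node_matches(node_labels: Dict[str, str], selector: Dict[str, str]) -> bool:
    """True if every selector entry equals the node's label for that key."""
    # An empty selector constrains nothing, so every node matches it.
    return all(node_labels.get(key) == value for key, value in selector.items())


def misplaced_tasks(
    placements: List[Tuple[str, str]],
    labels_by_node: Dict[str, Dict[str, str]],
    selector: Dict[str, str],
) -> List[str]:
    """Return the names of tasks that ran on a node not matching selector."""
    return [
        task
        for task, node in placements
        # Unknown nodes are treated as having no labels.
        if not node_matches(labels_by_node.get(node, {}), selector)
    ]


# Toy inputs, not results from a real run.
labels_by_node = {
    "node-1": {"subcluster": "train"},
    "node-2": {"subcluster": "valid"},
}
placements = [("read-0", "node-1"), ("map-0", "node-1"), ("map-1", "node-2")]

bad = misplaced_tasks(placements, labels_by_node, {"subcluster": "train"})
print(bad)
```

With an empty selector the check reports no misplaced tasks, which mirrors the expectation that piping an empty `label_selector` through changes nothing.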
Alternative Considered
With this PR, the API for interleaved validation is simple - the user can simply set training and validation dataset execution options in the dataset_config:
However, the API for async validation is a bit more confusing because we do not split the validation dataset in the overall trainer and therefore need to set it within the validation function itself:
One alternative @justinvyu and I discussed is to make `DataConfig` the sole public API even in the async validation case. However, this would mean that 1) we need to pass the dataset around from the driver to the trainer to the validator as opposed to just creating it in the validator, 2) we need to remember not to split the validation dataset up front, and 3) we need a way for the validation function to get the un-split Dataset (as opposed to the DataIterator).

Ultimately we decided that this PR is fine for now since modifying `execution_options` is already a mostly recommended public API right now anyway. Though this alternate approach is more "public facing," it moves more data around and might actually be even more confusing.