[data] Add Dataset.mix() public API and user guide for weighted dataset mixing #63168
Conversation
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
There was a problem hiding this comment.
Code Review
This pull request introduces the Dataset.mix public API to Ray Data, enabling weighted interleaving of multiple datasets. The changes include the implementation of the mix method, the addition of a comprehensive user guide, and the promotion of MixStoppingCondition to the public API. Feedback focuses on aligning documentation terminology with enum names, implementing validation for input weights to prevent logical errors, and ensuring consistent context usage in the execution plan.
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
| stats = DatasetStats( | ||
| metadata={"Mix": []}, | ||
| parent=[d._raw_stats() for d in datasets], | ||
| ) | ||
| stats.time_total_s = time.perf_counter() - start_time |
There was a problem hiding this comment.
what's up with all of this stuff?
There was a problem hiding this comment.
This is also done in Union. let's just keep it consistent and consider removing later since it's just measuring a small amount of logical operator creation time.
There was a problem hiding this comment.
ack, i was looking at join and didn't see the equivalent
| ) | ||
|
|
||
| @classmethod | ||
| @PublicAPI(stability="alpha", api_group=SMJ_API_GROUP) |
There was a problem hiding this comment.
feel like this should not be in SMJ group but rather like training ingest group or something
There was a problem hiding this comment.
I think it's ok for now since it falls in "merging datasets." Let's land this and reorganize APIs in a followup.
…ncy with union/zip Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Justin Yu <justinvyu@anyscale.com>
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit aac2eab. Configure here.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
…aset mixing (ray-project#63168) * Adds Dataset.mix() classmethod (alpha) for streaming weighted interleaving of multiple datasets, building on the internal MixOperator from ray-project#62450. * Exports MixStoppingCondition from `ray.data`. * Adds a user guide under Ray Data docs covering per-block mixing, random mixing, stopping conditions, and limitations. Moved the other "scaling collation" user guide to Ray Data docs. * Updates test_mix.py to use the public API instead of the internal helper. --------- Signed-off-by: Justin Yu <justinvyu@anyscale.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Adds a release test benchmark for `Dataset.mix()` (introduced in #63168) that measures mixing throughput and ratio accuracy. Benchmark design: - Creates 8 datasets reading ImageNet parquet, each stamped with a ds_index column - Mixes with Dataset.mix(), repartitions to 4 * batch_size rows per block - Consumes via TorchTrainer to mimic the seen weighting ratio when ingesting multiple local batches which are split across workers. - Tracks per-batch mixing ratios per worker, aggregates mean/std across workers via all_reduce to get the mean and standard deviation across **global batches.** - Asserts ratio mean is within 0.05 of target and std < 0.1 - Tests with and without a shuffling step after mixing with `--num-workers=1` to showcase the effectiveness of shuffling removing the dependency of mixing quality on the number of workers. [See here for more details.](https://docs.ray.io/en/master/data/mixing-data.html#random-mixing) --------- Signed-off-by: Justin Yu <justinvyu@anyscale.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…3286) Adds a release test benchmark for `Dataset.mix()` (introduced in ray-project#63168) that measures mixing throughput and ratio accuracy. Benchmark design: - Creates 8 datasets reading ImageNet parquet, each stamped with a ds_index column - Mixes with Dataset.mix(), repartitions to 4 * batch_size rows per block - Consumes via TorchTrainer to mimic the seen weighting ratio when ingesting multiple local batches which are split across workers. - Tracks per-batch mixing ratios per worker, aggregates mean/std across workers via all_reduce to get the mean and standard deviation across **global batches.** - Asserts ratio mean is within 0.05 of target and std < 0.1 - Tests with and without a shuffling step after mixing with `--num-workers=1` to showcase the effectiveness of shuffling removing the dependency of mixing quality on the number of workers. [See here for more details.](https://docs.ray.io/en/master/data/mixing-data.html#random-mixing) --------- Signed-off-by: Justin Yu <justinvyu@anyscale.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Signed-off-by: phattruong <23120318@student.hcmus.edu.vn>

Description
MixOperatorfor weighted dataset mixing #62450.ray.data.Testing
See additional testing and result here: https://gist.github.com/justinvyu/0b73d66397a3fa0d9286f88b5e3ec3c3
To be added as a release test.