[Data] Track p50/p90 of streaming scheduling-loop step duration #63586
Add opt-in fixed-bound histogram to `Timer` (O(1) memory, one bisect per `add()`) and use it to report `streaming_exec_schedule_p90_s` alongside the existing avg/max. Opt in only for the scheduling-loop timer in `DatasetStats` — every other `Timer` instance keeps the same per-add cost as before. Surfaced in `collect_dataset_stats` so the `worker_scaling_*` release tests pick it up automatically; useful because max alone hides whether a regression is a true tail blow-up or just bursty noise. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: xgui <xgui@anyscale.com>
Code Review
This pull request introduces percentile tracking for the streaming executor's scheduling loop duration by adding an optional fixed-bound histogram to the Timer class. This allows for O(1) memory overhead when computing approximate percentiles like p90, which is now exposed in the dataset stats summary and benchmark results. Feedback was provided regarding the percentile method, suggesting that the input parameter p should be validated or clamped to the [0, 1] range to prevent unexpected behavior.
Code Review
This pull request introduces percentile tracking for the scheduling loop duration in Ray Data by enhancing the Timer class with an optional fixed-bound histogram. This allows for efficient, O(1) memory percentile approximations (specifically p90) without storing raw samples. The changes include the histogram implementation using bisect for binning, integration into the streaming executor's stats, and updates to the summary and benchmark outputs. Comprehensive unit tests for the histogram logic and end-to-end integration have also been added. I have no feedback to provide.
- Rename ``Timer.percentile`` → ``Timer.approx_percentile`` and expand the docstring to make the approximation explicit (fixed log-spaced histogram, O(1) memory, ~2x bin resolution, overflow tail collapses to max()).
- Rename ``streaming_exec_schedule_p90_s`` → ``..._approx_p90_s`` on ``DatasetStatsSummary`` and add matching ``approx_p50_s`` / ``approx_p75_s`` fields. Report all three from ``collect_dataset_stats`` so release-test dashboards can track the lower percentiles too.
- Extend ``TestTimerHistogram`` with a three-tier distribution that exercises p50/p75/p90 in distinct bins; cover the parametrized uniform case at p75 as well. Smoke test now also asserts monotonic ordering (p50 ≤ p75 ≤ p90 ≤ max).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
- Drop p75; report p70 instead in the scheduling-loop release-test output and on ``DatasetStatsSummary`` (``approx_p70_s``).
- Validate ``p`` in ``Timer.approx_percentile``: raise ``ValueError`` for values outside ``[0.0, 1.0]``. Catches the common ``approx_percentile(90)`` typo, which would otherwise silently return ``max()`` and look like a tail spike.
- Restructure the docstring with proper Args / Returns / Raises sections so pydoclint is happy.
- Update unit tests accordingly; new parametrized tests cover both rejected (``-0.1``, ``1.1``, ``90``) and accepted boundary (``0.0``, ``1.0``) inputs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
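The range check described in this commit can be sketched as a standalone helper. This is a minimal illustration, not the PR's code: the function name `validate_fraction` is hypothetical, and the error message is borrowed from the diff snippet quoted later in this conversation.

```python
def validate_fraction(p: float) -> float:
    """Return ``p`` unchanged if it is a fraction in [0.0, 1.0], else raise.

    Hypothetical helper; the PR performs this check inside the Timer method.
    """
    # The chained comparison is False for NaN too, so NaN is rejected as well.
    if not 0.0 <= p <= 1.0:
        raise ValueError(
            f"p must be in [0.0, 1.0], got {p!r}. "
            "Pass a fraction like 0.9, not a percent like 90."
        )
    return p
```

Rejecting out-of-range input up front means a call such as `validate_fraction(90)` fails loudly instead of being mistaken for a tail spike.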
Add 10s, 20s, 50s, 100s bins so we can distinguish "loop is degraded" (seconds) from "cluster is wedged" (tens of seconds) instead of collapsing everything past 5s into the overflow bucket. Memory cost goes from 13 to 17 ints per Dataset — still trivial. Update the overflow test to use a value clearly above the new largest bound (500s) so it actually exercises the overflow code path. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: xgui <xgui@anyscale.com>
The dataclass field was renamed to ``streaming_exec_schedule_approx_p70_s`` in the previous review pass, but the integration smoke test still read ``..._approx_p75_s`` — would have raised ``AttributeError`` at runtime. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: xgui <xgui@anyscale.com>
A real run reported avg=0.446s, max=0.778s, but all of approx_p50, approx_p70, and approx_p90 collapsed to 0.778, the value of max(). Two compounding issues:

1. ``approx_percentile`` clamped its result to ``max()``. Whenever the percentile lands in the same bin as the max sample, every percentile in that bin collapses to ``max``, erasing the histogram signal we wanted. Now return the bin's upper bound directly; document that it can legitimately exceed ``max`` by up to one bin width since the histogram is coarse.
2. The bin spacing was 2x throughout, leaving the realistic 100ms-2s median-loop-step range covered by only a handful of bins (200ms, 500ms, 1s, 2s). Tighten to ~1.5x in 50ms-2s while keeping ~2x outside it. 23 finite bins now; memory still trivial (~200 bytes per Dataset).

Add a regression test (``test_percentile_not_clamped_to_max``) covering the original failure case: 100 samples all at 0.77 → percentile must report the bin upper bound 1.0, not the clamped 0.77. Smoke test no longer asserts ``p90 <= max`` since that invariant no longer holds.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
A worker_scaling-scale run reported avg=11.6s, max=37.7s, but all percentiles (p50/p70/p90) collapsed to 20.0, the upper bound of the ``(10, 20]`` bin holding the majority of samples. Reporting the bin upper bound was always going to do this whenever a percentile lands in the same bin as the bulk of the distribution.

Switch to per-bin mean: maintain a parallel ``_hist_sum`` array, and when ``approx_percentile`` lands in bin ``i`` return ``sum[i] / count[i]``. This gives:

- Values close to the actual percentile estimate (bin mean reflects where samples sit inside the bin, not where the bin happens to end).
- Natural ``≤ max()`` invariant (bin mean is bounded by max).
- Sub-bin resolution: if 80% of samples sit at 1.05s and 20% at 1.3s in the (1.0, 1.5] bin, we report 1.1, not 1.5 or 1.3.

Also tighten the bin layout in the 1 s-50 s range (added 4, 7, 12, 15, 25, 30, 40, 70) so worker_scaling-scale workloads (thousands of workers, scheduling-loop step ~10-30 s) get good resolution. Memory cost: ~30 ints + ~30 floats per Dataset, still trivial.

Restored ``p90 <= max`` assertion in the smoke test (the invariant bin-mean reporting now guarantees). New regression test ``test_worker_scaling_regression_case`` simulates the failing scenario.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
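To make the per-bin-mean idea concrete, here is a small self-contained sketch. It is an illustration under assumptions, not the PR's implementation: the class name `BinMeanHistogram`, its attribute names, and the bin bounds in the usage example are all hypothetical, and only the "return `sum[i] / count[i]` for the bin the percentile lands in" rule comes from the commit message.

```python
import bisect
from typing import List, Sequence


class BinMeanHistogram:
    """Fixed-bound histogram that reports the mean of the selected bin."""

    def __init__(self, bounds: Sequence[float]) -> None:
        # Sorted finite upper bounds; one extra slot acts as the overflow bin.
        self._bounds: List[float] = list(bounds)
        n_bins = len(self._bounds) + 1
        self._count: List[int] = [0] * n_bins
        self._sum: List[float] = [0.0] * n_bins
        self._total = 0

    def add(self, value: float) -> None:
        # bisect_left maps value into the (lower, upper] bin that contains it.
        i = bisect.bisect_left(self._bounds, value)
        self._count[i] += 1
        self._sum[i] += value
        self._total += 1

    def approx_percentile(self, p: float) -> float:
        if self._total == 0:
            return 0.0
        target = p * self._total
        cum = 0
        mean = 0.0
        for count, total in zip(self._count, self._sum):
            if count == 0:
                continue
            cum += count
            mean = total / count
            # Small tolerance absorbs float error in p * total.
            if cum >= target - 1e-9:
                return mean
        return mean  # only reached if p is above 1.0


hist = BinMeanHistogram([0.5, 1.0, 1.5, 2.0])  # hypothetical bounds
for _ in range(80):
    hist.add(1.05)
for _ in range(20):
    hist.add(1.3)
p50 = hist.approx_percentile(0.5)  # about 1.1, the mean of the (1.0, 1.5] bin
```

Because every sample in the example falls into the same bin, all percentiles report that bin's mean; the estimate only separates percentiles that land in different bins.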
``target = p * self._total_count`` is float math; for some (p, N) combinations the product lands slightly above the ideal integer target. Example: ``0.55 * 100`` evaluates to ``55.00000000000001`` in IEEE 754. With cum reaching exactly 55, the naive ``cum >= target`` check failed and the percentile silently advanced into the next bin — inflating the reported value by one full bin width (~1.5x). Add a 1e-9 tolerance to the comparison. ``cum`` is always an integer count, so the tolerance is comfortably below any meaningful gap and can only absorb sub-ULP float error. Add a regression test (``test_robust_to_float_imprecision_in_target``) that pins this: 55 small samples + 45 large samples at p=0.55 must report the small bin's mean, not the large bin's. Verified the test fails on the un-guarded code path before re-applying the fix. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: xgui <xgui@anyscale.com>
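The failure mode is easy to reproduce outside the PR. The sketch below uses a hypothetical helper, `first_bin_reaching`, that walks cumulative counts the way the commit message describes; the `tol` parameter stands in for the 1e-9 tolerance.

```python
from typing import Sequence


def first_bin_reaching(counts: Sequence[int], p: float, tol: float = 1e-9) -> int:
    """Index of the first bin whose cumulative count reaches p * total.

    Hypothetical helper for illustration; assumes ``counts`` is non-empty.
    """
    target = p * sum(counts)  # float math: may land just above the integer
    cum = 0
    for i, count in enumerate(counts):
        cum += count
        if cum >= target - tol:
            return i
    return len(counts) - 1


counts = [55, 45]  # 55 small samples, 45 large samples
inflated = 0.55 * 100 > 55  # True: the product is slightly above 55
naive_bin = first_bin_reaching(counts, 0.55, tol=0.0)  # skips to the large bin
guarded_bin = first_bin_reaching(counts, 0.55)  # stays in the small bin
```

With `tol=0.0` the comparison `55 >= 55.00000000000001` fails and the walk advances one bin; with the tolerance it stops at the bin that actually holds the 55th sample.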
…etention
The bin-mean histogram was still too coarse — a worker_scaling case
with samples ~8-37s produced ambiguous percentiles even after multiple
rounds of bin tuning. Switch to raw sample retention + exact
linear-interpolation percentiles. This trades unbounded memory for
exactness — fine for short release-test runs but not safe in
production.
Design:
- ``Timer(track_distribution=True)`` retains every sample in a list;
``Timer.percentile(p)`` sorts once per call and interpolates.
Class-level docstring warns the flag is only for tests/benchmarks.
- ``DatasetStats.__init__`` gates sample retention on
``RAY_DATA_TRACK_SCHEDULING_LOOP_SAMPLES=1``. Default off — every
Ray Data job in production keeps the old no-tracking behavior.
- ``release/release_data_tests.yaml`` sets the env var in the
``worker_scaling_*`` runtime_env so the release tests get exact
percentiles.
- Field names lose the ``approx_`` prefix (now exact when populated):
``streaming_exec_schedule_{p50,p70,p90}_s`` and the benchmark output
keys ``p{50,70,90}_scheduling_loop_duration_s``.
Tests reworked into ``TestTimerPercentile``: exact-percentile behavior
(uniform / linear / bimodal / interpolation between ranks) plus two
integration smoke tests covering the env-var-on and env-var-off paths.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
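For reference, "sort once per call and interpolate" can be sketched as below. The commit message does not say which rank convention is used, so the `p * (n - 1)` rank (the same convention as NumPy's default linear method) and the 0.0 return for an empty list are assumptions, as is the function name `exact_percentile`.

```python
import math
from typing import Sequence


def exact_percentile(samples: Sequence[float], p: float) -> float:
    """Exact percentile with linear interpolation between adjacent ranks.

    Assumes p is a fraction in [0.0, 1.0]; returns 0.0 for no samples.
    """
    if not samples:
        return 0.0
    ordered = sorted(samples)  # O(n log n) per call, O(n) retained memory
    rank = p * (len(ordered) - 1)
    lo = math.floor(rank)
    hi = math.ceil(rank)
    frac = rank - lo
    # Interpolate between the two samples that bracket the fractional rank.
    return ordered[lo] + (ordered[hi] - ordered[lo]) * frac


median = exact_percentile([4.0, 1.0, 3.0, 2.0], 0.5)  # halfway between 2.0 and 3.0
```

The retained list is what makes this unsuitable for long-running jobs: memory grows with every sample, which is the tradeoff the commit message calls out.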
- Remove p70 from ``DatasetStatsSummary`` and the benchmark output. Method ``Timer.percentile`` still accepts arbitrary ``p``, but the release tests only need the median and the tail.
- When ``RAY_DATA_TRACK_SCHEDULING_LOOP_SAMPLES`` isn't set, ``collect_dataset_stats`` now omits the percentile keys instead of emitting ``"p50_scheduling_loop_duration_s": 0``. An explicit 0 is indistinguishable from a real measurement and would mislead dashboards; skipping the keys makes "tracking was off" unambiguous.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
The driver speedscope from worker_scaling shows ~80 s in Ray Core's .remote() with most of the time inside `auto_init_wrapper` / `_invocation_remote_span` / `invocation` — Python wrappers around Cython entry points. Without `--native`, the speedscope can't see past the Cython boundary, so cloudpickle / msgpack / raylet RPC leaves are merged into a single opaque frame. Add `--native` to both py-spy invocations (driver + per-worker attach). Works with `--nonblocking` and `--subprocesses`; py-spy just additionally walks DWARF/eh_frame on each sample. Signed-off-by: xgui <xgui@anyscale.com>
Dataset.materialize runs on a deep copy of the dataset, so stats land on the MaterializedDataset it returns. The original ds keeps an empty cache — reading stats from it yields a fresh, never-added-to Timer, which is why test_streaming_exec_schedule_percentiles_off_by_default saw streaming_exec_schedule_max_s == 0. Both percentile tests now capture the materialize result and read stats from there. The populated test also asserts p90 > 0 so it can't silently pass against an empty Timer (the previous "0 <= 0 <= 0 <= 0" assertion was vacuously true). Signed-off-by: xgui <xgui@anyscale.com>
Adds a third matrix axis ``num_operators: [1, 15]`` to the worker_scaling test. The single-operator runs were the existing shape; the 15-operator runs chain that many map_batches together, splitting the worker pool evenly across them (num_workers // 15 each). Why: the previous matrix stressed the dispatch loop with many workers per op, but a single op masks the per-iteration update_usages / _update_allocated_budgets cost that scales with N_ops. Production pipelines routinely have a dozen+ operators in one Dataset; the 15-op variant gives a representative measurement of that shape without changing cluster footprint. Benchmark changes: - New ``--num-operators`` arg (default 1) on worker_scaling_benchmark.py. - Chains that many map_batches operators. Worker pool split: ActorPoolStrategy(size = num_workers // num_operators) for actors, ``concurrency = num_workers // num_operators`` for tasks (caps in-flight task count per operator so the chain doesn't starve). - Emits ``num_operators`` and ``workers_per_operator`` into the output JSON for downstream attribution. Release matrix expands from 8 -> 16 variants. Each cluster_compute is unchanged (the cluster sizes by num_workers, not by num_operators). Signed-off-by: xgui <xgui@anyscale.com>
…e in test name Signed-off-by: xgui <xgui@anyscale.com>
… variants With 15 chained map_batches operators each gets ~num_workers/15 actors (or concurrency=num_workers/15 for tasks). Even with streaming pipelining, end-to-end throughput is bounded by the slowest stage's parallelism, so wall-clock at 5000 workers / 15 ops scales noticeably past the single-op shape's runtime. The 1-op variants finish well under 2700s already, so bumping the shared cap to 90 min only affects the 15-op cells in the matrix. Signed-off-by: xgui <xgui@anyscale.com>
1. UDF assumed input batch had column ``id``. That column comes from
``ray.data.range``, which only feeds the first operator. The output
schema of ``RealisticSchemaUDF`` is ``{scalar_col_*, array_col_*}``
— when the second operator in the 15-op chain runs the same UDF on
the previous op's output, ``batch["id"]`` is a ``KeyError``.
Switch to ``len(next(iter(batch.values())))`` so the UDF works on
any input batch.
2. Worker nodes failed to import the ``profiling`` package when Ray
tried to deserialize ``_UDFPySpyProfiler`` on them. Cause:
``ray.init(runtime_env={"py_modules": benchmark_py_modules()})``
only ships ``benchmark.py`` — not the ``profiling/`` sibling.
Resolve the package's directory and append it to ``py_modules`` so
workers see ``profiling`` on their path.
Signed-off-by: xgui <xgui@anyscale.com>
In the worker_scaling_5000_tasks_15_ops run, both driver and worker py-spy processes exited with code 1 — no .speedscope.json files were produced. Only .log files made it to S3. ``--native`` (paired with ``--nonblocking`` and the Anyscale base image's Python install) turns out to be incompatible; py-spy fails to start the unwind setup before sampling. Revert to the pre-PR shape on both invocations. We lose visibility into Cython / cloudpickle / msgpack leaves, but get a working speedscope back. Revisit later if we want to mix --native with --blocking or upgrade py-spy in the image. Signed-off-by: xgui <xgui@anyscale.com>
…tely) Reverts the chained-ops matrix axis, --num-operators arg, worker-pool split, UDF input-schema-agnostic change, timeout bump, and new test naming. Keeps the ``profiling`` py_modules fix from the same series because it's a real latent bug (worker-side py-spy failing silently with ModuleNotFoundError) that lands cleanly without the 15-op work. Scope of this PR is now just: percentile tracking + materialize test fix + py-spy profiling package shipping. Signed-off-by: xgui <xgui@anyscale.com>
| # env var is unset, percentile fields on the summary read 0. | ||
| self.streaming_exec_schedule_s: Timer = Timer( | ||
| track_distribution=( | ||
| os.environ.get("RAY_DATA_TRACK_SCHEDULING_LOOP_SAMPLES", "0") == "1" |
I think if you go the route I mentioned above, you can remove this env flag, since the DistributionTracker will bound the memory usage, and this should make the tests simpler.
Replace Timer's opt-in sample-retention list with the existing DistributionTracker (KLL sketch, k=200). Memory is now bounded (~few KB) regardless of how many samples are added, so p50/p90 tracking is always on — drop the RAY_DATA_TRACK_SCHEDULING_LOOP_SAMPLES env-var gate from stats.py, benchmark.py, and release_data_tests.yaml. Tradeoff: KLL is approximate (~1.65% rank error at k=200) instead of exact linear interpolation, but well within the noise of the metrics we report. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: xgui <xgui@anyscale.com>
| self._total_count: float = 0 | ||
| # Bounded-memory percentile backend. add() forwards every value | ||
| # to ``add_sample`` and ``percentile`` reads from it. | ||
| self._distribution: DistributionTracker = DistributionTracker() |
Every Timer allocates unused KLL sketch and redundant trackers
Low Severity
Every Timer() now unconditionally creates a DistributionTracker (including a kll_doubles_sketch(200)), but only one out of 14+ Timer instances per DatasetStats — streaming_exec_schedule_s — ever calls percentile(). The iteration timers (iter_wait_s, iter_next_batch_s, etc.) fire on the per-batch hot path, and each add() now redundantly runs Welford's algorithm plus a KLL sketch update via add_sample(), in addition to Timer's own count/min/max tracking. Making the DistributionTracker opt-in (e.g. a constructor flag) would avoid the wasted allocations and per-call overhead on the 13+ timers that never need percentiles.
Reviewed by Cursor Bugbot for commit 7d5e6ec.
kll_doubles_sketch is C++-backed and not picklable natively. Since DistributionTracker now rides on every Timer (which lives on DatasetStats), cloudpickling a Dataset hit ``TypeError: cannot pickle 'kll_doubles_sketch' object`` — breaking test_can_pickle, test_iterator_to_materialized_dataset, test_equal_split, test_context_saved_when_dataset_created, and test_cache_dataset. Round-trip the sketch through its own serialize() / deserialize() bytes in __getstate__ / __setstate__. Add regression tests on both DistributionTracker and Timer. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: xgui <xgui@anyscale.com>
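The round-trip pattern can be shown without `datasketches` installed. In this sketch `FakeSketch` is a hypothetical stand-in for a C++-backed object that refuses to pickle but offers `serialize()` / `deserialize()`, and `Tracker` is an illustrative wrapper, not Ray's `DistributionTracker`.

```python
import pickle
import struct
from typing import List


class FakeSketch:
    """Hypothetical stand-in for a native sketch that cannot be pickled."""

    def __init__(self) -> None:
        self._values: List[float] = []

    def update(self, value: float) -> None:
        self._values.append(float(value))

    def serialize(self) -> bytes:
        return struct.pack(f"<{len(self._values)}d", *self._values)

    @classmethod
    def deserialize(cls, data: bytes) -> "FakeSketch":
        sketch = cls()
        sketch._values = list(struct.unpack(f"<{len(data) // 8}d", data))
        return sketch

    def __reduce__(self):
        # Mimic the native object's behavior under pickle.
        raise TypeError("cannot pickle 'FakeSketch' object")


class Tracker:
    """Illustrative wrapper that swaps the sketch for bytes when pickled."""

    def __init__(self) -> None:
        self._sketch = FakeSketch()

    def add_sample(self, value: float) -> None:
        self._sketch.update(value)

    def __getstate__(self) -> dict:
        state = self.__dict__.copy()
        state["_sketch"] = self._sketch.serialize()  # bytes are picklable
        return state

    def __setstate__(self, state: dict) -> None:
        state = dict(state)
        state["_sketch"] = FakeSketch.deserialize(state["_sketch"])
        self.__dict__.update(state)


tracker = Tracker()
tracker.add_sample(1.5)
restored = pickle.loads(pickle.dumps(tracker))
```

Only the wrapper needs the hooks; any object that holds a `Tracker` then pickles normally because pickle recurses through `__getstate__`.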
| f"p must be in [0.0, 1.0], got {p!r}. " | ||
| "Pass a fraction like 0.9, not a percent like 90." | ||
| ) | ||
| q = self._distribution._quantile(p) |
Timer accesses private _quantile method across module boundary
Low Severity
Timer.percentile() calls self._distribution._quantile(p), reaching into a private method of DistributionTracker from a different module. The single-underscore convention signals this method is an internal implementation detail subject to change. Since Timer now relies on it for production percentile queries, _quantile warrants promotion to a public quantile(q) method on DistributionTracker.
Reviewed by Cursor Bugbot for commit 775123e.
DistributionTracker's KLL sketch needs datasketches; without it, percentile() returns None and the release-test JSON reports ``p50_scheduling_loop_duration_s=0`` / ``p90=0`` even though ``avg`` and ``max`` are populated. Also un-breaks the silent-None percentiles already present on every operator's ``op_task_duration_stats`` / ``max_uss_bytes`` in the BYOD image. Add to the cpu and gpu py3.10 ``.in`` files and regenerate ``ray-base_extra_testdeps_py3.10.lock`` and ``ray-gpu-base_extra_testdeps_py3.10.lock`` via ``bazelisk run //ci/raydepsets:raydepsets -- build ci/raydepsets/configs/rayimg.depsets.yaml --name ray_base_extra_testdeps[_gpu]_310``. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: xgui <xgui@anyscale.com>
The previous attempt (7b9199d) put datasketches in release/ray_release/byod/requirements_byod_3.10.in, which feeds the base_extra_testdeps lockfile — but worker_scaling's actual runtime image is ray-py3.10-cpu-base-extra, built from docker/base-extra/requirements.in via the base_extra (not testdeps) lockfile chain. The custom BYOD layer applied on top only adds env vars (see build_context.fill_build_context_dir), so the testdeps lock never gets pip-installed — datasketches stayed missing in the cluster. Move the entry to docker/base-extra/requirements.in so it lands in ray_base_extra_py3.10.lock (and 3.11/3.12/3.13/3.14, and the gpu/llm/ml testdeps variants for symmetry). Regenerated via ``bazelisk run //ci/raydepsets:raydepsets -- build ci/raydepsets/configs/rayimg.depsets.yaml``. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: xgui <xgui@anyscale.com>
Cursor Bugbot has reviewed your changes using default effort and found 1 potential issue.
There are 3 total unresolved issues (including 2 from previous reviews).
Reviewed by Cursor Bugbot for commit 3cc3e92.
| schedule_max = summary.streaming_exec_schedule_max_s | ||
| # Percentiles are populated, monotonic, and bounded by max. | ||
| assert p90 > 0 | ||
| assert 0 <= p50 <= p90 <= schedule_max |
Tests assert percentile > 0 without datasketches skip guard
Medium Severity
test_streaming_exec_schedule_percentiles_populated asserts p90 > 0, but Timer.percentile() returns 0 when datasketches is not installed (it's an optional dependency, guarded by a try/except ImportError). Unlike other sketch-dependent tests in test_distribution_tracker.py which use @pytest.mark.skipif(datasketches is None, ...), this test and the TestTimerPercentile class (e.g. test_single_sample asserts percentile(p) == approx(0.042)) have no such guard. These tests will fail in any environment where datasketches is absent.
Reviewed by Cursor Bugbot for commit 3cc3e92.
Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
…3825)

## Why

#63586 added a `DistributionTracker` to `Timer` for percentile tracking. `DistributionTracker` is not JSON-serializable, which broke the `training-ingest-benchmark` release test (`image_classification.full_training`, s3_url + parquet):

```
TypeError: Object of type DistributionTracker is not JSON serializable
```

The benchmark checkpoints each `Timer` via `v.__dict__.copy()` + `json.dump`, and `__dict__` now carries the tracker. The tracker's `__getstate__`/`__setstate__` only cover **pickle**, not `json`, so they don't help here.

## Fix

- Add `Timer.as_dict()` / `Timer.from_dict()` that round-trip only the scalar fields (`_total`, `_min`, `_max`, `_total_count`). `_distribution` is excluded: the KLL sketch isn't reconstructable from summary stats and isn't meant to persist across checkpoints, so it restarts empty on restore (same graceful degradation as when `datasketches` isn't installed).
- Switch the benchmark runner to use these instead of poking `__dict__`.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Signed-off-by: xgui <xgui@anyscale.com>
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
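A scalar-only round trip of this kind might look like the sketch below. `MiniTimer` is a hypothetical stand-in, not Ray's `Timer`; the four field names come from the description above, while the initial values and the `object()` placeholder for the tracker are assumptions made for illustration.

```python
import json
from typing import Any, Dict


class MiniTimer:
    """Hypothetical stand-in showing a JSON-safe as_dict / from_dict pair."""

    _SCALAR_FIELDS = ("_total", "_min", "_max", "_total_count")

    def __init__(self) -> None:
        self._total = 0.0
        self._min = float("inf")
        self._max = 0.0
        self._total_count = 0
        # Placeholder for a tracker object that json cannot serialize.
        self._distribution = object()

    def add(self, value: float) -> None:
        self._total += value
        self._min = min(self._min, value)
        self._max = max(self._max, value)
        self._total_count += 1

    def as_dict(self) -> Dict[str, Any]:
        # Export only scalars; the distribution is deliberately left out.
        return {name: getattr(self, name) for name in self._SCALAR_FIELDS}

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "MiniTimer":
        timer = cls()  # starts with a fresh, empty distribution
        for name in cls._SCALAR_FIELDS:
            setattr(timer, name, data[name])
        return timer


timer = MiniTimer()
timer.add(0.2)
timer.add(0.5)
payload = json.dumps(timer.as_dict())
restored = MiniTimer.from_dict(json.loads(payload))
```

Dumping `timer.__dict__` directly would raise `TypeError` because of the placeholder object, which mirrors the failure quoted above.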
…project#63586)

## Summary

- Add `Timer.percentile(p)` backed by a `DistributionTracker` (KLL sketch, k=200). Memory is bounded (~few KB) regardless of sample count, so tracking is always on, with no env-var gate.
- `DatasetStatsSummary` exposes `streaming_exec_schedule_p50_s` / `_p90_s`. `collect_dataset_stats` emits `p50_scheduling_loop_duration_s` / `p90_scheduling_loop_duration_s` unconditionally in release-test output.
- Worker_scaling release benchmark: ship the `profiling/` package via `runtime_env.py_modules` so `_UDFPySpyProfiler` actors can deserialize on workers (they were silently failing `ModuleNotFoundError`, swallowed by a broad `except`).
- Drop `--native` from py-spy (broke on the Anyscale image).
- Two unit tests (`test_streaming_exec_schedule_percentiles_*`) now read stats from the `materialize()` return value. `Dataset.materialize()` runs on a deep copy so stats land on the `MaterializedDataset`, and the original `ds` would yield a fresh, never-added-to Timer.

## Why

`max_scheduling_loop_duration_s` alone hides whether a regression is a true tail blow-up or just bursty noise. KLL gives bounded-memory approximate quantiles (~1.65% rank error at k=200), so we get a useful p50/p90 signal without the production cost concern that gated earlier revisions of this PR.

Confirmed on a worker_scaling release run (5000 tasks): avg 730 ms · p50 442 ms · p90 588 ms · max 12.3 s. The max is ~20× the p90, exactly the tail outlier `max` alone over-reports as the "typical" loop step.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Signed-off-by: xgui <xgui@anyscale.com>
Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: elliot-barn <elliot.barnwell@anyscale.com>
Summary

- Add `Timer.percentile(p)` backed by a `DistributionTracker` (KLL sketch, k=200). Memory is bounded (~few KB) regardless of sample count, so tracking is always on, with no env-var gate.
- `DatasetStatsSummary` exposes `streaming_exec_schedule_p50_s` / `_p90_s`. `collect_dataset_stats` emits `p50_scheduling_loop_duration_s` / `p90_scheduling_loop_duration_s` unconditionally in release-test output.
- Worker_scaling release benchmark: ship the `profiling/` package via `runtime_env.py_modules` so `_UDFPySpyProfiler` actors can deserialize on workers (they were silently failing `ModuleNotFoundError`, swallowed by a broad `except`).
- Drop `--native` from py-spy (broke on the Anyscale image).
- Two unit tests (`test_streaming_exec_schedule_percentiles_*`) now read stats from the `materialize()` return value. `Dataset.materialize()` runs on a deep copy so stats land on the `MaterializedDataset`, and the original `ds` would yield a fresh, never-added-to Timer.

Why

`max_scheduling_loop_duration_s` alone hides whether a regression is a true tail blow-up or just bursty noise. KLL gives bounded-memory approximate quantiles (~1.65% rank error at k=200), so we get a useful p50/p90 signal without the production cost concern that gated earlier revisions of this PR.

Confirmed on a worker_scaling release run (5000 tasks): avg 730 ms · p50 442 ms · p90 588 ms · max 12.3 s. The max is ~20× the p90, exactly the tail outlier `max` alone over-reports as the "typical" loop step.

🤖 Generated with Claude Code