[Data] Track peak USS memory per task via DistributionTracker #63489
Replace the narrow TaskDurationStats class (mean/stddev only) with a general-purpose DistributionTracker that also tracks min, max, and approximate percentiles (p50/p90/p95/p99) via datasketches KLL sketch. This is a pure refactor — the only consumer (hanging detector) continues to use mean/stddev, and no new metric fields are added beyond the renamed op_task_duration_stats. Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
Code Review
This pull request refactors the tracking of peak memory (USS) usage by moving the max_uss_bytes metric from per-block execution stats to per-task execution stats and utilizing a DistributionTracker for aggregation. A significant issue was identified in map_operator.py where the peak memory for a task is incorrectly reported as the peak of only the last block due to the profiler being reset within the block processing loop.
@@ -761,7 +760,8 @@ def transform_iter_factory():
     block_meta,
     exec_stats=exec_stats,
     task_exec_stats=TaskExecWorkerStats(
-        task_wall_time_s=task_dur_s
+        task_wall_time_s=task_dur_s,
+        max_uss_bytes=profiler.estimate_max_uss() or 0,
The value from profiler.estimate_max_uss() will only represent the peak memory usage for the current block, not for the entire task. This is because profiler.reset() is called on line 774 at the end of each iteration in the block processing loop.
Since task_exec_stats from the last block is used to represent the entire task's stats, this will incorrectly report the peak USS of just the last block as the peak for the whole task.
To fix this and correctly report the peak USS for the entire task, profiler.reset() should not be called inside the loop. Please consider removing line 774.
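The effect described in this comment can be reproduced with a toy model. The sketch below is illustrative only: `ToyProfiler`, `record`, and the sample values are hypothetical stand-ins and are not Ray's `MemoryProfiler`. It assumes, as the comment does, that the stats attached to the last block represent the whole task.

```python
from typing import Iterable, List, Optional


class ToyProfiler:
    """Hypothetical stand-in: remembers the largest sample seen since the last reset."""

    def __init__(self) -> None:
        self._max: Optional[int] = None

    def record(self, uss_bytes: int) -> None:
        self._max = uss_bytes if self._max is None else max(self._max, uss_bytes)

    def estimate_max_uss(self) -> Optional[int]:
        return self._max

    def reset(self) -> None:
        self._max = None


def peak_reported_for_task(
    block_samples: Iterable[List[int]], reset_per_block: bool
) -> Optional[int]:
    """Return the peak that would be reported with the last block of a task."""
    profiler = ToyProfiler()
    reported = None
    for samples in block_samples:
        for sample in samples:
            profiler.record(sample)
        # Stats from the last block end up standing in for the whole task.
        reported = profiler.estimate_max_uss()
        if reset_per_block:
            profiler.reset()
    return reported


# The first block peaks at 900 bytes, the second at 300 bytes.
blocks = [[100, 900], [200, 300]]
with_reset = peak_reported_for_task(blocks, reset_per_block=True)
without_reset = peak_reported_for_task(blocks, reset_per_block=False)
```

With the per-block reset, the reported value reflects only the last block; without it, the running maximum covers every block in the task.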
metric_field with default_factory is misleading since OpRuntimeMetrics has a custom __init__ that never invokes the dataclass-generated one. Switch to metric_property which cleanly separates metric registration from instance state. Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
Force-pushed from 84a1e0c to eb79f3f.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit eb79f3f83af4fd4a66b9bdf8ba6e25f73447b84e.
- Use hasattr instead of isinstance in as_dict() to avoid TypeError when DistributionTracker is mocked in tests
- Add op_task_duration_stats to expected metrics in test_stats.py
- Add test_distribution_tracker to BUILD.bazel
Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
- Use hasattr instead of isinstance in as_dict() to avoid TypeError when DistributionTracker is mocked in tests
- Add op_task_duration_stats to expected metrics in test_stats.py
- Canonicalize percentile values (N or None) to P for datasketches portability
- Add test_distribution_tracker to BUILD.bazel
Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
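The `hasattr` versus `isinstance` point can be shown with the standard library alone. This is a minimal sketch: the `DistributionTracker` class and the two `serialize_*` helpers below are hypothetical stand-ins, not the code in `op_runtime_metrics.py`. It assumes the test replaces the class itself with a `MagicMock` instance, which is not a valid second argument to `isinstance`.

```python
from unittest.mock import MagicMock


class DistributionTracker:
    """Hypothetical minimal tracker; only as_dict() matters for this example."""

    def as_dict(self):
        return {"count": 0}


def serialize_with_isinstance(value, tracker_cls):
    # Breaks when tracker_cls has been replaced by a mock object.
    if isinstance(value, tracker_cls):
        return value.as_dict()
    return value


def serialize_with_hasattr(value):
    # Duck-typed check: does not care what the tracker class currently is.
    if hasattr(value, "as_dict"):
        return value.as_dict()
    return value


mocked_cls = MagicMock()  # stands in for the class after it has been patched
try:
    serialize_with_isinstance(3.5, mocked_cls)
    isinstance_raised = False
except TypeError:
    isinstance_raised = True

plain_value = serialize_with_hasattr(3.5)
tracker_value = serialize_with_hasattr(DistributionTracker())
```

One trade-off of the duck-typed check is that any object exposing `as_dict` is treated as a tracker, which is usually acceptable inside a single module.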
Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
Add a max_uss_bytes field to TaskExecWorkerStats so workers report the peak USS observed across all blocks in a task. The driver records these samples in a new max_uss_bytes DistributionTracker on OpRuntimeMetrics, giving per-operator memory distribution stats (mean, p50, p99, etc.). Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
MemoryProfiler already tracks the running max across its lifetime, so we pass profiler.estimate_max_uss() directly to TaskExecWorkerStats instead of accumulating a separate per-block max. Remove the now-unused max_uss_bytes field from BlockExecStats and the _cum_max_uss_bytes accumulator, rewriting average_max_uss_per_task to read from the DistributionTracker instead. Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
None means USS measurement is unavailable (non-Linux), rather than masking that with 0 and needing a > 0 guard. Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
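The reason for preferring `None` over `0` can be seen in a small aggregation example. The helper names below (`record_peak_uss`, `mean_or_none`) and the sample values are hypothetical and only illustrate the idea; they are not the functions used in Ray.

```python
from typing import List, Optional


def record_peak_uss(samples: List[int], max_uss_bytes: Optional[int]) -> None:
    """Append a sample only when a measurement exists (None means unavailable)."""
    if max_uss_bytes is not None:
        samples.append(max_uss_bytes)


def mean_or_none(samples: List[int]) -> Optional[float]:
    """Return the mean, or None when nothing was measured."""
    return sum(samples) / len(samples) if samples else None


# Platform where USS is available: every task reports a value.
measured: List[int] = []
for reported in (512, 1024):
    record_peak_uss(measured, reported)

# Platform where USS is unavailable: every task reports None.
unmeasured: List[int] = []
for reported in (None, None):
    record_peak_uss(unmeasured, reported)

measured_mean = mean_or_none(measured)
unmeasured_mean = mean_or_none(unmeasured)

# If "unavailable" were masked as 0, it would drag the mean down instead.
masked_mean = mean_or_none([512, 0])
```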
metric_field with default_factory is misleading since OpRuntimeMetrics has a custom __init__ that never invokes the dataclass-generated one. Switch to metric_property which cleanly separates metric registration from instance state. Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
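The underlying Python behavior can be demonstrated with a plain dataclass. This is a sketch under assumptions: `Metrics` and its fields are hypothetical, and it uses `dataclasses.field` directly rather than Ray's `metric_field` helper, whose internals are not shown here.

```python
from dataclasses import dataclass, field, fields
from typing import List


@dataclass
class Metrics:
    samples: List[float] = field(default_factory=list)

    def __init__(self) -> None:
        # A hand-written __init__ is left in place by @dataclass, so the
        # default_factory declared above is never called for new instances.
        self.other_state = 0


m = Metrics()

# The field is still registered with the dataclass machinery...
registered = [f.name for f in fields(Metrics)]

# ...but no instance attribute was ever created for it.
factory_ran = hasattr(m, "samples")
```

The declaration reads as if every instance starts with an empty list, which is why the commit calls it misleading.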
Force-pushed from eb79f3f to eb087e0.
Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
## Description
This PR replaces the narrow `TaskDurationStats` class (mean/stddev only) with a general-purpose `DistributionTracker` that also tracks min, max, and approximate percentiles (p50/p90/p95/p99) via a datasketches KLL sketch. This is a pure refactor: the only consumer (hanging detector) continues to use mean/stddev, and no new metric fields are added beyond the renamed `op_task_duration_stats`.
The purpose of this change is to enable tracking memory distribution statistics in a follow-up PR: #63489.
## Related issues
None.
---------
Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
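To make the shape of such a tracker concrete, here is a minimal standard-library sketch. It is not the PR's implementation: `SimpleDistributionTracker` and its method names are hypothetical, and it computes exact nearest-rank percentiles from stored samples instead of using a KLL sketch, so its memory grows with the number of samples. Mean and standard deviation use Welford's online update.

```python
import math
from typing import Dict, List, Optional


class SimpleDistributionTracker:
    """Hypothetical tracker: exact percentiles from stored samples, not a KLL sketch."""

    def __init__(self) -> None:
        self._samples: List[float] = []
        self._count = 0
        self._mean = 0.0
        self._m2 = 0.0  # running sum of squared deviations (Welford)

    def add(self, value: float) -> None:
        self._samples.append(value)
        self._count += 1
        delta = value - self._mean
        self._mean += delta / self._count
        self._m2 += delta * (value - self._mean)

    def mean(self) -> Optional[float]:
        return self._mean if self._count else None

    def stddev(self) -> Optional[float]:
        # Population standard deviation; None when there are no samples.
        return math.sqrt(self._m2 / self._count) if self._count else None

    def percentile(self, p: float) -> Optional[float]:
        # Nearest-rank percentile over the sorted samples.
        if not self._samples:
            return None
        ordered = sorted(self._samples)
        rank = max(1, math.ceil(p / 100 * len(ordered)))
        return ordered[rank - 1]

    def as_dict(self) -> Dict[str, Optional[float]]:
        return {
            "min": min(self._samples) if self._samples else None,
            "max": max(self._samples) if self._samples else None,
            "mean": self.mean(),
            "stddev": self.stddev(),
            "p50": self.percentile(50),
            "p90": self.percentile(90),
            "p95": self.percentile(95),
            "p99": self.percentile(99),
        }


tracker = SimpleDistributionTracker()
for duration_s in [1.0, 2.0, 3.0, 4.0, 10.0]:
    tracker.add(duration_s)
summary = tracker.as_dict()
empty_summary = SimpleDistributionTracker().as_dict()
```

A quantile sketch such as KLL trades exactness for bounded memory, which matters when an operator runs a very large number of tasks.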
Resolve merge conflicts:
- op_runtime_metrics.py: keep DistributionTracker for max_uss_bytes
- test_distribution_tracker.py: adopt master's not-None checks on percentiles
Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
…askExecWorkerStats
- test_op_runtime_metrics: rewrite test_average_max_uss_per_task to use on_task_finished with TaskExecWorkerStats instead of on_task_output_generated with BlockExecStats; remove max_uss_bytes=0 from BlockExecStats constructors
- test_stats: add max_uss_bytes DistributionTracker to expected metrics; remove "Peak heap memory usage" from expected output (memory_stats is now None); update memory= in repr expectations; add canonicalization for max_uss_bytes dict
- conftest: remove unused uss_bytes from block_params
Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
- Set enable_get_object_locations_for_metrics=False on mock operator in test_average_max_uss_per_task to prevent ray.experimental.get_object_locations from being called without a Ray cluster
- Populate OperatorStatsSummary.memory from the max_uss_bytes DistributionTracker in extra_metrics, restoring the peak-USS-per-task stats that were lost when max_uss_bytes was removed from BlockExecStats
Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
MagicMock returns truthy values for all attribute accesses, so enable_get_object_locations_for_metrics must be explicitly set to False to prevent on_task_finished from calling into Ray. Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
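The truthiness pitfall is easy to confirm with `unittest.mock`. In this sketch the variable name `mock_op` is hypothetical, and where the flag actually lives on the real operator is not shown here; only the flag name comes from the commit message.

```python
from unittest.mock import MagicMock

mock_op = MagicMock()

# An attribute that was never set is itself a MagicMock, and mocks are truthy,
# so an `if op.enable_get_object_locations_for_metrics:` branch would be taken.
flag_by_default = bool(mock_op.enable_get_object_locations_for_metrics)

# Setting the attribute explicitly makes the branch behave as intended.
mock_op.enable_get_object_locations_for_metrics = False
flag_after_override = bool(mock_op.enable_get_object_locations_for_metrics)
```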
USS measurement requires /proc/self/smaps (Linux only). On other platforms the DistributionTracker has no samples, so memory is None. Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
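For readers unfamiliar with the file, the following sketch shows one common way to estimate USS from smaps text: summing the `Private_Clean` and `Private_Dirty` entries of every mapping. This is an assumption about the general technique, not a description of how Ray's `MemoryProfiler` computes it, and the function names are hypothetical. It returns `None` when the file is unavailable, mirroring the convention described above.

```python
import os
import re
from typing import Optional

# Matches lines such as "Private_Dirty:        12 kB".
_PRIVATE_KB = re.compile(r"^Private_(?:Clean|Dirty):\s+(\d+) kB$", re.MULTILINE)


def uss_bytes_from_smaps(smaps_text: str) -> int:
    """Sum Private_Clean and Private_Dirty across all mappings, in bytes."""
    return sum(int(kb) for kb in _PRIVATE_KB.findall(smaps_text)) * 1024


def read_uss_bytes(path: str = "/proc/self/smaps") -> Optional[int]:
    """Return USS in bytes, or None when the smaps file is unavailable."""
    if not os.path.exists(path):
        return None
    try:
        with open(path) as f:
            return uss_bytes_from_smaps(f.read())
    except OSError:
        return None


# A tiny hand-written excerpt covering two mappings.
sample = (
    "Private_Clean:         4 kB\n"
    "Private_Dirty:        12 kB\n"
    "Shared_Clean:        100 kB\n"
    "Private_Clean:         0 kB\n"
    "Private_Dirty:         8 kB\n"
)
parsed = uss_bytes_from_smaps(sample)
missing = read_uss_bytes("/nonexistent/path/to/smaps")
```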
The memory field was populated from BlockExecStats.max_uss_bytes, which was removed in favor of per-task USS tracking via DistributionTracker. The same data is now available in extra_metrics, so the redundant stats summary field and its consumers are no longer needed. Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
…t#63488)
## Description
This PR replaces the narrow `TaskDurationStats` class (mean/stddev only) with a general-purpose `DistributionTracker` that also tracks min, max, and approximate percentiles (p50/p90/p95/p99) via a datasketches KLL sketch. This is a pure refactor: the only consumer (hanging detector) continues to use mean/stddev, and no new metric fields are added beyond the renamed `op_task_duration_stats`.
The purpose of this change is to enable tracking memory distribution statistics in a follow-up PR: ray-project#63489.
## Related issues
None.
---------
Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
Signed-off-by: phattruong <23120318@student.hcmus.edu.vn>
…oject#63489)
Stacked on ray-project#63488.
## Description
Track the peak USS (Unique Set Size) memory per task and record it in a `DistributionTracker` on `OpRuntimeMetrics`, giving per-operator memory distribution stats (mean, p50, p99, etc.).
Also removes the per-block `max_uss_bytes` field from `BlockExecStats`, since `MemoryProfiler` already tracks the running max across its lifetime: we just pass `profiler.estimate_max_uss()` directly to `TaskExecWorkerStats` at yield time. The `average_max_uss_per_task` metric property now reads from the `DistributionTracker` instead of a manual accumulator.
The goal is a good way to decide how to configure logical `memory`; averages alone can be ineffective when per-task usage has a wide spread.
## Related issues
None.
---------
Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
Signed-off-by: phattruong <23120318@student.hcmus.edu.vn>

Stacked on #63488.
Description
Track the peak USS (Unique Set Size) memory per task and record it in a `DistributionTracker` on `OpRuntimeMetrics`, giving per-operator memory distribution stats (mean, p50, p99, etc.).
Also removes the per-block `max_uss_bytes` field from `BlockExecStats`, since `MemoryProfiler` already tracks the running max across its lifetime: we just pass `profiler.estimate_max_uss()` directly to `TaskExecWorkerStats` at yield time. The `average_max_uss_per_task` metric property now reads from the `DistributionTracker` instead of a manual accumulator.
The goal is a good way to decide how to configure logical `memory`; averages alone can be ineffective when per-task usage has a wide spread.
Related issues
None.