[Data] Track peak USS memory per task via DistributionTracker #63489

Merged
bveeramani merged 18 commits into master from balaji/track-max-uss-per-task
May 20, 2026

Conversation

@bveeramani bveeramani commented May 19, 2026

Member

Stacked on #63488.

Description

Track the peak USS (Unique Set Size) memory per task and record it in a DistributionTracker on OpRuntimeMetrics, giving per-operator memory distribution stats (mean, p50, p99, etc.).

Also removes the per-block max_uss_bytes field from BlockExecStats since MemoryProfiler already tracks the running max across its lifetime — we just pass profiler.estimate_max_uss() directly to TaskExecWorkerStats at yield time. The average_max_uss_per_task metric property now reads from the DistributionTracker instead of a manual accumulator.

The motivation for this change is that I want a reliable way to decide how to configure logical memory. Averages can be misleading when per-task usage is widely spread.
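
To illustrate the point about spread, here is a small sketch using only the Python standard library. The sample values are made up for illustration and are not measurements from Ray; the variable names are hypothetical.

```python
import statistics

# Hypothetical per-task peak USS samples in bytes (illustrative values only).
MIB = 1024 * 1024
peak_uss_samples = [200 * MIB] * 95 + [2000 * MIB] * 5

mean_bytes = statistics.mean(peak_uss_samples)
# quantiles(n=100) returns the 99 cut points p1..p99; index 98 is p99.
p99_bytes = statistics.quantiles(peak_uss_samples, n=100, method="inclusive")[98]

print(f"mean: {mean_bytes / MIB:.0f} MiB")
print(f"p99:  {p99_bytes / MIB:.0f} MiB")
```

With a distribution like this, a memory setting based on the mean would sit well below what the largest tasks need, which is why a percentile is the more useful number to look at.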

Related issues

None.

Replace the narrow TaskDurationStats class (mean/stddev only) with a
general-purpose DistributionTracker that also tracks min, max, and
approximate percentiles (p50/p90/p95/p99) via datasketches KLL sketch.

This is a pure refactor — the only consumer (hanging detector) continues
to use mean/stddev, and no new metric fields are added beyond the
renamed op_task_duration_stats.
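
For readers unfamiliar with the idea, the following is a rough sketch of what a distribution tracker of this kind might look like. It is not the Ray implementation: `SimpleDistributionTracker` and its methods are hypothetical names, and it stores every sample and computes exact nearest-rank percentiles, whereas the commit describes approximate percentiles from a datasketches KLL sketch.

```python
import math
from typing import Dict, List, Optional


class SimpleDistributionTracker:
    """Illustrative stand-in: keeps every sample, unlike a KLL sketch."""

    def __init__(self) -> None:
        self._samples: List[float] = []
        self._count = 0
        self._mean = 0.0
        self._m2 = 0.0  # Sum of squared deviations (Welford's algorithm).

    def add(self, value: float) -> None:
        self._samples.append(value)
        self._count += 1
        delta = value - self._mean
        self._mean += delta / self._count
        self._m2 += delta * (value - self._mean)

    def mean(self) -> Optional[float]:
        return self._mean if self._count else None

    def stddev(self) -> Optional[float]:
        # Population standard deviation; None until there is a sample.
        return math.sqrt(self._m2 / self._count) if self._count else None

    def percentile(self, p: float) -> Optional[float]:
        # Exact nearest-rank percentile; a KLL sketch would approximate this.
        if not self._samples:
            return None
        ordered = sorted(self._samples)
        rank = max(1, math.ceil(p / 100 * len(ordered)))
        return ordered[rank - 1]

    def as_dict(self) -> Dict[str, Optional[float]]:
        return {
            "min": min(self._samples) if self._samples else None,
            "max": max(self._samples) if self._samples else None,
            "mean": self.mean(),
            "stddev": self.stddev(),
            "p50": self.percentile(50),
            "p90": self.percentile(90),
            "p95": self.percentile(95),
            "p99": self.percentile(99),
        }


tracker = SimpleDistributionTracker()
for duration_s in [1.0, 2.0, 3.0, 4.0, 10.0]:
    tracker.add(duration_s)
print(tracker.as_dict())
```

Keeping all samples is fine for a sketch like this but grows without bound, which is presumably why a streaming quantile sketch is used in the real change.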

Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Code Review

This pull request refactors the tracking of peak memory (USS) usage by moving the max_uss_bytes metric from per-block execution stats to per-task execution stats and utilizing a DistributionTracker for aggregation. A significant issue was identified in map_operator.py where the peak memory for a task is incorrectly reported as the peak of only the last block due to the profiler being reset within the block processing loop.

@@ -761,7 +760,8 @@ def transform_iter_factory():
  block_meta,
  exec_stats=exec_stats,
  task_exec_stats=TaskExecWorkerStats(
- task_wall_time_s=task_dur_s
+ task_wall_time_s=task_dur_s,
+ max_uss_bytes=profiler.estimate_max_uss() or 0,

Contributor

Priority: high

The value from profiler.estimate_max_uss() will only represent the peak memory usage for the current block, not for the entire task. This is because profiler.reset() is called on line 774 at the end of each iteration in the block processing loop.

Since task_exec_stats from the last block is used to represent the entire task's stats, this will incorrectly report the peak USS of just the last block as the peak for the whole task.

To fix this and correctly report the peak USS for the entire task, profiler.reset() should not be called inside the loop. Please consider removing line 774.
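
The concern can be reproduced with a toy model. `ToyProfiler` and `last_reported_peak` below are hypothetical and only mimic the behavior the comment describes (a running max that is cleared by `reset()`); whether the real `MemoryProfiler` behaves this way is not verified here.

```python
from typing import List, Optional


class ToyProfiler:
    """Hypothetical profiler that remembers the largest value it has seen."""

    def __init__(self) -> None:
        self._max: Optional[int] = None

    def observe(self, uss_bytes: int) -> None:
        if self._max is None or uss_bytes > self._max:
            self._max = uss_bytes

    def estimate_max_uss(self) -> Optional[int]:
        return self._max

    def reset(self) -> None:
        self._max = None


def last_reported_peak(
    block_peaks: List[int], reset_each_block: bool
) -> Optional[int]:
    """Return the peak attached to the final block of a task."""
    profiler = ToyProfiler()
    reported = None
    for peak in block_peaks:
        profiler.observe(peak)
        reported = profiler.estimate_max_uss()  # Value sent with this block.
        if reset_each_block:
            profiler.reset()
    return reported


block_peaks = [900, 300, 100]  # Made-up per-block peaks in bytes.
print(last_reported_peak(block_peaks, reset_each_block=True))
print(last_reported_peak(block_peaks, reset_each_block=False))
```

In this model, resetting per block makes the last block's value (100) stand in for the task, while keeping the running max reports the true task peak (900).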

metric_field with default_factory is misleading since OpRuntimeMetrics
has a custom __init__ that never invokes the dataclass-generated one.
Switch to metric_property which cleanly separates metric registration
from instance state.
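
The underlying Python behavior can be shown with plain `dataclasses.field`, which stands in here for `metric_field` (its exact signature is not shown in this PR, so that substitution is an assumption). The `Metrics` class is hypothetical.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Metrics:
    # Looks as if every instance gets a fresh list...
    samples: List[int] = field(default_factory=list)

    # ...but a hand-written __init__ takes precedence over the generated one,
    # so the default_factory is never called.
    def __init__(self) -> None:
        self.count = 0


m = Metrics()
print(hasattr(m, "count"))
print(hasattr(m, "samples"))
```

Because the factory never runs, the declared default says nothing about the instance's actual state, which is the sense in which it is misleading.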

Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
@bveeramani bveeramani force-pushed the balaji/track-max-uss-per-task branch from 84a1e0c to eb79f3f Compare May 19, 2026 04:27

@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Reviewed by Cursor Bugbot for commit eb79f3f83af4fd4a66b9bdf8ba6e25f73447b84e.

Comment thread python/ray/data/_internal/execution/operators/map_operator.py
@ray-gardener ray-gardener Bot added the data (Ray Data-related issues) and observability (Issues related to the Ray Dashboard, Logging, Metrics, Tracing, and/or Profiling) labels May 19, 2026
- Use hasattr instead of isinstance in as_dict() to avoid TypeError
  when DistributionTracker is mocked in tests
- Add op_task_duration_stats to expected metrics in test_stats.py
- Add test_distribution_tracker to BUILD.bazel
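
A minimal sketch of the failure mode behind the first bullet, assuming the test replaces the class name with a `MagicMock`. The `DistributionTracker` class below is a hypothetical stand-in, not the real one.

```python
from unittest.mock import MagicMock


class DistributionTracker:  # Hypothetical minimal stand-in.
    def as_dict(self) -> dict:
        return {"mean": 1.0}


value = DistributionTracker()

# A test that patches the class name leaves a MagicMock instance in its place.
patched_name = MagicMock()

try:
    isinstance(value, patched_name)
    isinstance_failed = False
except TypeError:
    # isinstance() requires a type (or tuple of types) as its second argument.
    isinstance_failed = True

# Duck typing does not care what the class name currently points to.
converted = value.as_dict() if hasattr(value, "as_dict") else value

print(isinstance_failed, converted)
```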

Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
- Use hasattr instead of isinstance in as_dict() to avoid TypeError
  when DistributionTracker is mocked in tests
- Add op_task_duration_stats to expected metrics in test_stats.py
- Canonicalize percentile values (N or None) to P for datasketches
  portability
- Add test_distribution_tracker to BUILD.bazel

Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
Add a max_uss_bytes field to TaskExecWorkerStats so workers report the
peak USS observed across all blocks in a task. The driver records these
samples in a new max_uss_bytes DistributionTracker on OpRuntimeMetrics,
giving per-operator memory distribution stats (mean, p50, p99, etc.).

Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
MemoryProfiler already tracks the running max across its lifetime, so
we pass profiler.estimate_max_uss() directly to TaskExecWorkerStats
instead of accumulating a separate per-block max. Remove the now-unused
max_uss_bytes field from BlockExecStats and the _cum_max_uss_bytes
accumulator, rewriting average_max_uss_per_task to read from the
DistributionTracker instead.

Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
None means USS measurement is unavailable (non-Linux), rather than
masking that with 0 and needing a > 0 guard.
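
As a sketch of why `None` is the cleaner signal, the hypothetical helpers below skip unavailable measurements instead of recording zeros that would drag the statistics down. The function names are illustrative and not part of Ray.

```python
from typing import List, Optional


def record_peak(samples: List[int], max_uss_bytes: Optional[int]) -> None:
    """Record a task's peak USS, skipping tasks with no measurement."""
    if max_uss_bytes is None:
        return  # Measurement unavailable; do not pollute the distribution.
    samples.append(max_uss_bytes)


def average(samples: List[int]) -> Optional[float]:
    return sum(samples) / len(samples) if samples else None


samples: List[int] = []
for reported in [None, 400, None, 600]:
    record_peak(samples, reported)

print(average(samples))
```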

Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
metric_field with default_factory is misleading since OpRuntimeMetrics
has a custom __init__ that never invokes the dataclass-generated one.
Switch to metric_property which cleanly separates metric registration
from instance state.

Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
@bveeramani bveeramani force-pushed the balaji/track-max-uss-per-task branch from eb79f3f to eb087e0 Compare May 19, 2026 19:14
Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
@bveeramani bveeramani added the go (add ONLY when ready to merge, run all tests) label May 19, 2026
bveeramani added a commit that referenced this pull request May 19, 2026
## Description

This PR replaces the narrow `TaskDurationStats` class (mean/stddev only)
with a general-purpose `DistributionTracker` that also tracks min, max,
and approximate percentiles (p50/p90/p95/p99) via a datasketches KLL
sketch.

This is a pure refactor — the only consumer (hanging detector) continues
to use mean/stddev, and no new metric fields are added beyond the
renamed `op_task_duration_stats`.

The intention of this change is so that I can track memory distribution
statistics in a follow-up PR:
#63489.

## Related issues

None.

---------

Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
Base automatically changed from balaji/refactor-distribution-tracker to master May 19, 2026 22:00
Resolve merge conflicts:
- op_runtime_metrics.py: keep DistributionTracker for max_uss_bytes
- test_distribution_tracker.py: adopt master's not-None checks on percentiles

Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
…askExecWorkerStats

- test_op_runtime_metrics: rewrite test_average_max_uss_per_task to use
  on_task_finished with TaskExecWorkerStats instead of on_task_output_generated
  with BlockExecStats; remove max_uss_bytes=0 from BlockExecStats constructors
- test_stats: add max_uss_bytes DistributionTracker to expected metrics;
  remove "Peak heap memory usage" from expected output (memory_stats is now None);
  update memory= in repr expectations; add canonicalization for max_uss_bytes dict
- conftest: remove unused uss_bytes from block_params

Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
- Set enable_get_object_locations_for_metrics=False on mock operator in
  test_average_max_uss_per_task to prevent ray.experimental.get_object_locations
  from being called without a Ray cluster
- Populate OperatorStatsSummary.memory from the max_uss_bytes
  DistributionTracker in extra_metrics, restoring the peak-USS-per-task
  stats that were lost when max_uss_bytes was removed from BlockExecStats

Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
MagicMock returns truthy values for all attribute accesses, so
enable_get_object_locations_for_metrics must be explicitly set to
False to prevent on_task_finished from calling into Ray.
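
The `MagicMock` behavior described here is easy to check in isolation. In this sketch `op` is a bare mock, not the operator used in the actual test.

```python
from unittest.mock import MagicMock

op = MagicMock()

# An attribute that was never configured is itself a MagicMock, which is truthy.
unset_flag_is_truthy = bool(op.enable_get_object_locations_for_metrics)

# Setting the attribute explicitly gives the code under test a real False.
op.enable_get_object_locations_for_metrics = False
explicit_flag_is_truthy = bool(op.enable_get_object_locations_for_metrics)

print(unset_flag_is_truthy, explicit_flag_is_truthy)
```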

Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
USS measurement requires /proc/self/smaps (Linux only). On other
platforms the DistributionTracker has no samples, so memory is None.
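
For context, a rough sketch of a USS estimate read from `/proc/self/smaps` follows. It assumes USS is the sum of the `Private_Clean` and `Private_Dirty` entries, which is a common definition but not necessarily how `MemoryProfiler` computes it; `estimate_uss_bytes` is a hypothetical helper.

```python
from typing import Optional


def estimate_uss_bytes(smaps_path: str = "/proc/self/smaps") -> Optional[int]:
    """Sum the private pages listed in smaps, or return None if unreadable."""
    try:
        with open(smaps_path, "r") as f:
            lines = f.readlines()
    except OSError:
        return None  # Not Linux, or /proc is not accessible.

    total_kib = 0
    for line in lines:
        # Lines look like "Private_Dirty:        12 kB".
        if line.startswith(("Private_Clean:", "Private_Dirty:")):
            total_kib += int(line.split()[1])
    return total_kib * 1024


print(estimate_uss_bytes())
```

Returning `None` when the file cannot be read matches the behavior described above, where platforms without smaps simply contribute no samples.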

Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
The memory field was populated from BlockExecStats.max_uss_bytes, which
was removed in favor of per-task USS tracking via DistributionTracker.
The same data is now available in extra_metrics, so the redundant
stats summary field and its consumers are no longer needed.

Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
@bveeramani bveeramani enabled auto-merge (squash) May 20, 2026 02:02
@bveeramani bveeramani merged commit 4427ec7 into master May 20, 2026
7 checks passed
@bveeramani bveeramani deleted the balaji/track-max-uss-per-task branch May 20, 2026 02:57
TruongQuangPhat pushed a commit to cyhapun/ray-fix-issue that referenced this pull request May 27, 2026
…t#63488)

## Description

This PR replaces the narrow `TaskDurationStats` class (mean/stddev only)
with a general-purpose `DistributionTracker` that also tracks min, max,
and approximate percentiles (p50/p90/p95/p99) via a datasketches KLL
sketch.

This is a pure refactor — the only consumer (hanging detector) continues
to use mean/stddev, and no new metric fields are added beyond the
renamed `op_task_duration_stats`.

The intention of this change is so that I can track memory distribution
statistics in a follow-up PR:
ray-project#63489.

## Related issues

None.

---------

Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
Signed-off-by: phattruong <23120318@student.hcmus.edu.vn>
TruongQuangPhat pushed a commit to cyhapun/ray-fix-issue that referenced this pull request May 27, 2026
…oject#63489)

Stacked on ray-project#63488.

## Description

Track the peak USS (Unique Set Size) memory per task and record it in a
`DistributionTracker` on `OpRuntimeMetrics`, giving per-operator memory
distribution stats (mean, p50, p99, etc.).

Also removes the per-block `max_uss_bytes` field from `BlockExecStats`
since `MemoryProfiler` already tracks the running max across its
lifetime — we just pass `profiler.estimate_max_uss()` directly to
`TaskExecWorkerStats` at yield time. The `average_max_uss_per_task`
metric property now reads from the `DistributionTracker` instead of a
manual accumulator.

The intention of this change is because I want a good way to know how to
configure logical `memory`, and averages can be ineffective because of
spread.

## Related issues

None.

---------

Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
Signed-off-by: phattruong <23120318@student.hcmus.edu.vn>

Labels

data (Ray Data-related issues), go (add ONLY when ready to merge, run all tests), observability (Issues related to the Ray Dashboard, Logging, Metrics, Tracing, and/or Profiling)

2 participants