
[Data] Added tracking of block serialization time #60574

Merged
alexeykudinkin merged 19 commits into master from ak/serde-mtrcs-add on Jan 31, 2026

@alexeykudinkin

Contributor

Changes

  1. Modified Ray Core's generator handling sequence to send object creation and serialization durations back into the generator
  2. Updated `task_completion_time_excl_backpressure_s` to track both UDF block generation time and block serialization overhead
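
As a rough illustration of the second change, the sketch below assumes the metric is simply the sum of per-block UDF time and per-block serialization time. `BlockTimings` and `completion_time_excl_backpressure_s` are hypothetical names used only for this example; they are not Ray's actual classes or functions.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass
class BlockTimings:
    # Hypothetical container for illustration; not Ray's BlockExecStats.
    udf_time_s: float        # time spent in the UDF producing the block
    block_ser_time_s: float  # time spent creating/serializing the block


def completion_time_excl_backpressure_s(timings: Iterable[BlockTimings]) -> float:
    # Assumption: the metric adds both components for every block.
    return sum(t.udf_time_s + t.block_ser_time_s for t in timings)


timings = [BlockTimings(0.5, 0.25), BlockTimings(1.0, 0.5)]
total = completion_time_excl_backpressure_s(timings)
print(total)
```

Before this change, only the UDF component would have been counted, so serialization-heavy tasks could look cheaper than they were.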


Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Updated `task_completion_time_excl_backpressure_s` to include block serde time

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
…des serde time;

Updated tests;

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
@alexeykudinkin alexeykudinkin requested review from a team as code owners January 29, 2026 01:07

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Code Review

This pull request introduces tracking for block serialization time, which is a valuable addition for performance monitoring in Ray Data. The core change involves modifying the generator execution flow in `_raylet.pyx` to feed back serialization duration to the caller using `gen.send()`. This new metric is then integrated throughout the data stack, including `BlockExecStats`, `OpRuntimeMetrics`, and relevant operators like `MapOperator` and `HashShuffleOperator`. The tests have also been updated to cover these new metrics.

The implementation is clean and effective. The use of yield expressions to pass data back into generators is a good pattern for this use case. I have one minor suggestion to improve code clarity.
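
The `gen.send()` pattern mentioned above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the code in `_raylet.pyx`: `GeneratorStats`, `produce_blocks`, and `drive` are hypothetical names, and `pickle.dumps` stands in for Ray's object creation and serialization.

```python
import pickle
import time
from dataclasses import dataclass
from typing import Generator, List, Optional


@dataclass(frozen=True)
class GeneratorStats:
    # Hypothetical stand-in for the PR's StreamingGeneratorStats.
    object_creation_dur_s: float


def produce_blocks(
    recorded: List[float],
) -> Generator[list, Optional[GeneratorStats], None]:
    for i in range(3):
        block = list(range(i * 1000, (i + 1) * 1000))
        # The value passed to send() becomes the result of the yield expression.
        stats = yield block
        if stats is not None:
            recorded.append(stats.object_creation_dur_s)


def drive(gen: Generator[list, Optional[GeneratorStats], None]) -> List[bytes]:
    outputs: List[bytes] = []
    try:
        block = next(gen)  # prime the generator to get the first block
        while True:
            start = time.perf_counter()
            payload = pickle.dumps(block)  # stand-in for object creation
            dur_s = time.perf_counter() - start
            outputs.append(payload)
            # Feed the measured duration back and receive the next block.
            block = gen.send(GeneratorStats(dur_s))
    except StopIteration:
        pass
    return outputs


recorded: List[float] = []
outputs = drive(produce_blocks(recorded))
print(len(outputs), len(recorded))
```

The driver has to prime the generator with `next()` before the first `send()`, because a just-started generator cannot receive a non-`None` value.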

Comment on lines +277 to +280
def udf_time_s(self, reset: bool) -> float:
    cur_time_s = self._udf_time_s
    self._udf_time_s = 0
    return cur_time_s

Contributor

medium

The reset parameter in udf_time_s is not used in the function body; the timer self._udf_time_s is always reset to 0. Since the only call site in map_operator.py passes reset=True, the behavior is correct for now. However, to improve clarity and prevent potential misuse in the future, I suggest removing the reset parameter from the method signature and updating the call in map_operator.py:772 to map_transformer.udf_time_s().

Suggested change
- def udf_time_s(self, reset: bool) -> float:
-     cur_time_s = self._udf_time_s
-     self._udf_time_s = 0
-     return cur_time_s
+ def udf_time_s(self) -> float:
+     cur_time_s = self._udf_time_s
+     self._udf_time_s = 0
+     return cur_time_s
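
The read-and-reset behavior discussed in this comment can be illustrated with a small standalone class. This is only a sketch: `UdfTimer`, `add`, and `take_udf_time_s` are hypothetical names, not part of `map_transformer.py`. Naming the accessor after what it does (take, then clear) is one way to make the reset explicit without an unused parameter.

```python
class UdfTimer:
    """Hypothetical accumulator illustrating a read-and-reset accessor."""

    def __init__(self) -> None:
        self._udf_time_s = 0.0

    def add(self, dur_s: float) -> None:
        # Accumulate time spent in the UDF since the last read.
        self._udf_time_s += dur_s

    def take_udf_time_s(self) -> float:
        # Return the accumulated time and reset the counter in one step.
        cur_time_s, self._udf_time_s = self._udf_time_s, 0.0
        return cur_time_s


timer = UdfTimer()
timer.add(0.5)
timer.add(0.25)
first = timer.take_udf_time_s()
second = timer.take_udf_time_s()
print(first, second)
```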
Comment thread python/ray/data/_internal/execution/operators/map_transformer.py
@ray-gardener ray-gardener Bot added the data (Ray Data-related issues) and observability (Issues related to the Ray Dashboard, Logging, Metrics, Tracing, and/or Profiling) labels Jan 29, 2026
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
@alexeykudinkin alexeykudinkin added the go (add ONLY when ready to merge, run all tests) label Jan 29, 2026
Comment thread python/ray/data/block.py
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>

@edoakes edoakes left a comment

Collaborator

From offline discussion, core change looks ok to me. Let's consider it a private API for now. I will follow up to add a disclaimer & context in code comments.

I defer to others to review the data changes. You may want to consider an alternative name to "serialization time", since it is not purely serialization but also includes memory allocation and copy time. Perhaps "block_write_time_s" or "block_output_time_s".

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>

@cursor cursor Bot left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.

Comment thread python/ray/data/_internal/execution/operators/hash_shuffle.py Outdated
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
@alexeykudinkin alexeykudinkin enabled auto-merge (squash) January 30, 2026 23:12
Comment thread python/ray/_raylet.pyx

@dataclass(frozen=True)
class StreamingGeneratorStats:
    object_creation_dur_s: float

Contributor

Can we rename this to object_serialization_time_s?

Contributor Author

See the conversation above. This is not just serialization time; it also includes memory allocation and copy time.

        self._init_fn = init_fn if init_fn is not None else lambda: None
        self._output_block_size_option_override = output_block_size_option_override
-       self._udf_time = 0
+       self._udf_time_s = 0

Contributor

Nit: Maybe a type var to denote secs?

Contributor Author

That's what the `_s` suffix is for

# Yield block and retrieve its Ray object serialization timing
stats: StreamingGeneratorStats = yield block
if stats:
    exec_stats.block_ser_time_s = stats.object_creation_dur_s
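
The `if stats:` guard in the snippet above matters because a yield expression evaluates to `None` whenever the consumer advances the generator with plain iteration rather than `send()`. The sketch below shows both cases with a hypothetical generator, `yield_blocks`, that records whatever it receives; floats stand in for the stats object.

```python
from typing import Generator, List, Optional


def yield_blocks(
    received: List[Optional[float]],
) -> Generator[str, Optional[float], None]:
    # Hypothetical generator: records the value each yield expression produced.
    for name in ("a", "b"):
        stats = yield name
        received.append(stats)


# Case 1: plain iteration. Nothing is sent, so every yield evaluates to None.
plain: List[Optional[float]] = []
blocks = list(yield_blocks(plain))

# Case 2: the consumer sends a duration back after each block.
fed: List[Optional[float]] = []
gen = yield_blocks(fed)
first_block = next(gen)          # prime: runs up to the first yield
second_block = gen.send(0.01)    # resumes the generator, returns the next block
try:
    gen.send(0.02)               # generator finishes after the last block
except StopIteration:
    pass

print(plain, fed)
```

With the guard in place, the same generator code works whether or not the caller supports feeding timings back.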

Contributor

When operators are fused, can we assert in that test that this value is 0?

Contributor Author

That won't be 0

@alexeykudinkin alexeykudinkin merged commit 2d85efa into master Jan 31, 2026
7 checks passed
@alexeykudinkin alexeykudinkin deleted the ak/serde-mtrcs-add branch January 31, 2026 01:55
elliot-barn pushed a commit that referenced this pull request Feb 9, 2026
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
elliot-barn pushed a commit that referenced this pull request Feb 9, 2026
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
ans9868 pushed a commit to ans9868/ray that referenced this pull request Feb 18, 2026
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Adel Nour <ans9868@nyu.edu>

Labels

data (Ray Data-related issues), go (add ONLY when ready to merge, run all tests), observability (Issues related to the Ray Dashboard, Logging, Metrics, Tracing, and/or Profiling)

3 participants