
[Data] Fixing StatsManager to properly handle StatsActor being killed on disconnect#55163

Merged
alexeykudinkin merged 6 commits into
ray-project:masterfrom
alexeykudinkin:ak/stat-act-fix
Aug 5, 2025

@alexeykudinkin alexeykudinkin commented Aug 2, 2025

Contributor

Why are these changes needed?

Addresses #54841

Right now, StatsManager is overly reliant on the cached ActorHandle for StatsActor. Although there is a check verifying that we're connected to the right cluster, it doesn't verify whether StatsActor is still alive, which can result in failures as highlighted in the linked ticket.

To address this issue:

  1. The _stats_actor method is split into _get_stats_actor and _get_or_create_stats_actor
  2. _get_or_create_stats_actor always skips the cache, forcing an actor lookup every time
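
The split described above can be sketched roughly as follows. This is a minimal, self-contained illustration, not the actual code in python/ray/data/_internal/stats.py: `FakeCluster`, `FakeStatsActor`, `StatsManagerSketch`, and the actor name are hypothetical stand-ins, with a plain dictionary playing the role of the cluster's named-actor lookup.

```python
from typing import Dict, Optional


class FakeStatsActor:
    """Hypothetical stand-in for StatsActor; `alive` flips when it is killed."""

    def __init__(self) -> None:
        self.alive = True


class FakeCluster:
    """Hypothetical stand-in for the cluster's named-actor registry."""

    def __init__(self) -> None:
        self._actors: Dict[str, FakeStatsActor] = {}

    def lookup(self, name: str) -> Optional[FakeStatsActor]:
        return self._actors.get(name)

    def create(self, name: str) -> FakeStatsActor:
        actor = FakeStatsActor()
        self._actors[name] = actor
        return actor

    def kill(self, name: str) -> None:
        actor = self._actors.pop(name, None)
        if actor is not None:
            actor.alive = False


class StatsManagerSketch:
    ACTOR_NAME = "stats_actor"  # assumed name, for illustration only

    def __init__(self, cluster: FakeCluster) -> None:
        self._cluster = cluster
        self._cached_handle: Optional[FakeStatsActor] = None

    def _get_stats_actor(self, skip_cache: bool = False) -> Optional[FakeStatsActor]:
        """Return an existing actor, or None; never creates one."""
        if not skip_cache and self._cached_handle is not None:
            return self._cached_handle
        self._cached_handle = self._cluster.lookup(self.ACTOR_NAME)
        return self._cached_handle

    def _get_or_create_stats_actor(self) -> FakeStatsActor:
        """Always look the actor up afresh; create it if the lookup fails."""
        actor = self._get_stats_actor(skip_cache=True)
        if actor is None:
            actor = self._cluster.create(self.ACTOR_NAME)
            self._cached_handle = actor
        return actor


cluster = FakeCluster()
manager = StatsManagerSketch(cluster)
first = manager._get_or_create_stats_actor()

# Simulate the actor being killed when the driver disconnects.
cluster.kill(StatsManagerSketch.ACTOR_NAME)
stale = manager._get_stats_actor()            # cached handle, now stale
fresh = manager._get_or_create_stats_actor()  # forced lookup, new actor
```

In this sketch the cached path still hands back the dead handle, while the forced lookup notices the actor is gone and replaces it, which is the behavior the description attributes to _get_or_create_stats_actor.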

Related issue number

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(
…tats_actor`;

Revisited its semantics to make `_get_or_create_stats_actor` look up the actor every time

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
@alexeykudinkin alexeykudinkin requested a review from a team as a code owner August 2, 2025 01:16
@alexeykudinkin alexeykudinkin added the go add ONLY when ready to merge, run all tests label Aug 2, 2025

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Summary of Changes

Hello @alexeykudinkin, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses an issue where the StatsManager in Ray Data fails to properly handle cases where the StatsActor is killed when a driver disconnects from the Ray cluster. My changes refactor the actor retrieval logic to ensure that a fresh StatsActor handle is obtained or a new actor is created when the existing one becomes stale, preventing errors and improving robustness.

Highlights

  • Refactored StatsActor Retrieval: I've refactored the _stats_actor method into two distinct methods: _get_stats_actor and _get_or_create_stats_actor. The former retrieves an existing actor, optionally skipping the cache to force a lookup, while the latter ensures an actor is always available, creating one if necessary.
  • Improved Stale Handle Handling: The _get_or_create_stats_actor method now explicitly checks for stale actor handles by attempting a lookup with skip_cache=True. If the actor is not found (e.g., due to a prior driver disconnect), a new StatsActor is created.
  • Updated Actor Calls: All internal calls within StatsManager that interact with the StatsActor have been updated to use the appropriate new retrieval methods, ensuring correct behavior, especially when registering datasets or updating metrics.
  • Added Regression Test: I've introduced a new test case, test_stats_manager_stale_actor_handle, which simulates a driver disconnecting and reconnecting to the Ray cluster, verifying that the StatsManager can gracefully recover and continue to function without errors.

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Code Review

This pull request refactors the StatsManager to more robustly handle cases where the StatsActor might have been killed, for example, when a driver disconnects. This is achieved by introducing _get_stats_actor and _get_or_create_stats_actor methods, with the latter forcing a fresh lookup of the actor to avoid using a stale handle. A new test case is added to cover this scenario.

My review found a critical issue in the implementation. One of the call sites was changed to use _get_stats_actor which does not create the actor if it's missing, whereas the previous logic did. This could lead to an AttributeError at runtime. I've provided a suggestion to fix this by using _get_or_create_stats_actor instead.
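
To illustrate the failure mode the review describes: if a lookup-only accessor returns None when the actor is missing, calling a method on the result raises AttributeError. The snippet below is a hypothetical, self-contained sketch; `Actor`, `get_actor`, `get_or_create_actor`, and the `register_dataset` signature are illustrative names, not the actual call site in stats.py.

```python
from typing import Dict, Optional


class Actor:
    """Hypothetical actor stand-in with a single method."""

    def register_dataset(self, dataset_tag: str) -> str:
        return f"registered {dataset_tag}"


_registry: Dict[str, Actor] = {}  # empty: the actor does not exist yet


def get_actor(name: str) -> Optional[Actor]:
    """Lookup only: returns None when the actor is missing."""
    return _registry.get(name)


def get_or_create_actor(name: str) -> Actor:
    """Lookup, falling back to creation, so the result is never None."""
    return _registry.setdefault(name, Actor())


# Calling a method on the lookup-only result fails when nothing exists yet.
try:
    get_actor("stats").register_dataset("ds_1")
    error = None
except AttributeError as exc:
    error = exc

# The get-or-create variant always returns a usable object.
result = get_or_create_actor("stats").register_dataset("ds_1")
```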

Comment thread python/ray/data/_internal/stats.py Outdated
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
@alexeykudinkin alexeykudinkin enabled auto-merge (squash) August 4, 2025 20:26
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
@github-actions github-actions Bot disabled auto-merge August 5, 2025 22:11
@alexeykudinkin alexeykudinkin merged commit 9a3741e into ray-project:master Aug 5, 2025
5 checks passed
aslonnie pushed a commit that referenced this pull request Aug 7, 2025
…rator state when there is a change (#54623)" (#55333)

Reverts commit 7cb74e8, which broke the
master branch due to a conflict with
#55163.


Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
alanwguo pushed a commit that referenced this pull request Aug 14, 2025
…es (#55355)

## Why are these changes needed?
This PR reverts #55333 and resolves the
conflict with #55163.

Original description:
Some frequently used metadata fields are missing in the export API
schema:
- For both dataset and operator: state, execution start and end time

These fields are important for us to observe the lifecycle of the
datasets and operators, and can be used to improve the accuracy of
reported metrics, such as throughput, which relies on the duration.

Summary of change:
- Add state, execution start and end time to the export API schema
- Add a new state enum `PENDING` for dataset and operator, to represent
the state when they are not running yet.
- Refresh the metadata whenever the state of a dataset/operator is
updated, so the event always contains the latest snapshot of all the
metadata.
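
As a rough illustration of why the start and end times matter, the sketch below models a metadata record with a state and execution timestamps and derives throughput from the duration. The `State` members other than `PENDING`, the `OperatorMetadata` class, and its field names are assumptions made for this example, not the actual export API schema.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class State(Enum):
    PENDING = "PENDING"    # from the description: not running yet
    RUNNING = "RUNNING"    # assumed for illustration
    FINISHED = "FINISHED"  # assumed for illustration


@dataclass
class OperatorMetadata:
    name: str
    state: State = State.PENDING
    execution_start_time: Optional[float] = None  # seconds since epoch
    execution_end_time: Optional[float] = None

    def throughput(self, rows: int) -> Optional[float]:
        """Rows per second, or None until both timestamps are known."""
        if self.execution_start_time is None or self.execution_end_time is None:
            return None
        duration = self.execution_end_time - self.execution_start_time
        return rows / duration if duration > 0 else None


op = OperatorMetadata(name="Map")
pending_throughput = op.throughput(rows=1000)  # no timestamps yet

op.state = State.RUNNING
op.execution_start_time = 100.0
op.state = State.FINISHED
op.execution_end_time = 104.0
finished_throughput = op.throughput(rows=1000)
```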

## Related issue number

<!-- For example: "Closes #1234" -->

## Checks

- [X] I've signed off every commit(by using the -s flag, i.e., `git
commit -s`) in this PR.
- [X] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I
added a
method in Tune, I've added it in `doc/source/tune/api/` under the
           corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [X] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

Signed-off-by: cong.qian <cong.qian@anyscale.com>
dioptre pushed a commit to sourcetable/ray that referenced this pull request Aug 20, 2025
…lled on disconnect (ray-project#55163)

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Andrew Grosser <dioptre@gmail.com>
dioptre pushed a commit to sourcetable/ray that referenced this pull request Aug 20, 2025
…rator state when there is a change (ray-project#54623)" (ray-project#55333)

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Andrew Grosser <dioptre@gmail.com>
dioptre pushed a commit to sourcetable/ray that referenced this pull request Aug 20, 2025
…es (ray-project#55355)

Signed-off-by: cong.qian <cong.qian@anyscale.com>
Signed-off-by: Andrew Grosser <dioptre@gmail.com>
jugalshah291 pushed a commit to jugalshah291/ray_fork that referenced this pull request Sep 11, 2025
…lled on disconnect (ray-project#55163)

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: jugalshah291 <shah.jugal291@gmail.com>
jugalshah291 pushed a commit to jugalshah291/ray_fork that referenced this pull request Sep 11, 2025
…rator state when there is a change (ray-project#54623)" (ray-project#55333)

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: jugalshah291 <shah.jugal291@gmail.com>
jugalshah291 pushed a commit to jugalshah291/ray_fork that referenced this pull request Sep 11, 2025
…es (ray-project#55355)

Signed-off-by: cong.qian <cong.qian@anyscale.com>
Signed-off-by: jugalshah291 <shah.jugal291@gmail.com>
dstrodtman pushed a commit that referenced this pull request Oct 6, 2025
…lled on disconnect (#55163)

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Douglas Strodtman <douglas@anyscale.com>
dstrodtman pushed a commit that referenced this pull request Oct 6, 2025
…rator state when there is a change (#54623)" (#55333)

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Douglas Strodtman <douglas@anyscale.com>
dstrodtman pushed a commit to dstrodtman/ray that referenced this pull request Oct 6, 2025
…es (ray-project#55355)

Signed-off-by: cong.qian <cong.qian@anyscale.com>
Signed-off-by: Douglas Strodtman <douglas@anyscale.com>
bveeramani pushed a commit that referenced this pull request Nov 14, 2025
## Description
Before this PR, the metrics followed this path:
1. `StreamingExecutor` collects metrics per operator
2. `_StatsManager` creates a thread to export metrics
3. `StreamingExecutor` sends metrics to `_StatsManager`, which performs
a copy and holds `_stats_lock`.
4. The stats thread reads the metrics sent from 2)
5. The stats thread sleeps for 5-10 seconds before each export of
metrics to `_StatsActor`. These metrics can come in 2 forms: iteration
and execution metrics.

I believe the purpose of the stats thread created in 2) was two-fold:
- Don't export stats too frequently
- Don't export iteration and execution stats separately (send them in
the same RPC call)

However, this creates a lot of complexity (handling idle threads, etc.)
and also makes it harder to support histogram metrics, which need to
copy an entire list of values. See
#57851 for more details.

By removing the stats thread in 2), we can reduce management
complexity and avoid wasteful copying of metrics. The downside is that
iteration and execution metrics are now sent separately, increasing the
number of RPC calls. I don't think this is a concern, because the async
updates to the `_StatsActor` were already happening previously, and we
can also tweak the update interval.

~~It's important to note that `_stats_lock` still lives on to update the
last timestamps of each dataset. See * below for more details.~~

Now the new flow is:
1. `StreamingExecutor` collects metrics per operator
2. `StreamingExecutor` checks the last time `_StatsActor` was updated.
If more than a default of 5 seconds has passed since the last update, we
send metrics to `_StatsActor` through the `_StatsManager`. Afterwards,
we update the last-updated timestamp. See * below for a caveat.
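
The interval check in the new flow can be sketched roughly like this. It is a minimal illustration under stated assumptions, not the actual Ray Data code: `ThrottledExporter`, `maybe_export`, and the injectable `clock` are hypothetical names, and sending on the very first call is an assumption of this sketch.

```python
import time
from typing import Callable, Dict, List, Optional


class ThrottledExporter:
    """Sends metrics only if the interval has elapsed since the last send."""

    def __init__(
        self,
        send: Callable[[Dict[str, float]], None],
        interval_s: float = 5.0,  # the default interval named in the description
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._send = send
        self._interval_s = interval_s
        self._clock = clock
        self._last_update: Optional[float] = None  # nothing sent yet

    def maybe_export(self, metrics: Dict[str, float]) -> bool:
        """Return True if the metrics were sent, False if the call was throttled."""
        now = self._clock()
        if self._last_update is not None and now - self._last_update <= self._interval_s:
            return False
        self._send(metrics)
        self._last_update = now  # update the timestamp after sending
        return True


# Usage with a fake clock so the behavior is deterministic.
sent: List[Dict[str, float]] = []
fake_now = [0.0]
exporter = ThrottledExporter(sent.append, clock=lambda: fake_now[0])

results = []
for t in (0.0, 2.0, 6.0):
    fake_now[0] = t
    results.append(exporter.maybe_export({"rows": t}))
```

Keeping one such timestamp per caller is one way to avoid shared state between concurrent datasets, at the cost of each caller throttling independently.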

~~\*[important] Ray Data supports running multiple datasets
concurrently. Therefore, I must keep track of each dataset last updated
timestamp. `_stats_lock` is used to update that dictionary[dataset,
last_updated] safely on `register_dataset` and on `shutdown`. On update,
we don't require the lock because it does not update the dictionary's
size. If we want to remove the lock entirely, I can think of 2
workarounds.~~
1. ~~Create a per dataset `StatsManager`. Pros: no thread lock. Cons:
Much more code changes. The iteration metrics go through a separate code
path that is independent of the streaming executor, which will make this
more challenging.~~
2. ~~Update on every unix_epoch_timestamp % interval == 0, so that at
12:00, 12:05, etc.. the updates will be on that interval. Pros: easy to
implement and it's stateless. Cons: Breaks down for slower streaming
executors.~~
3. I can remove the lock by keeping the state in two places:
- BatchIterator
- StreamingExecutor

I also verified that #55163 still solves the original issue.
## Related issues

## Additional information

---------

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
SheldonTsen pushed a commit to SheldonTsen/ray that referenced this pull request Dec 1, 2025
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Future-Outlier pushed a commit to Future-Outlier/ray that referenced this pull request Dec 7, 2025
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
Labels

go add ONLY when ready to merge, run all tests

2 participants