Feature/offloading manager stats#35669
Conversation
There was a problem hiding this comment.
Code Review
This pull request effectively adds telemetry support for the OffloadingManager and introduces a FilteredOffloadingManager to gate offloading based on block reuse frequency. The implementation is well-structured, particularly the use of a decorator pattern for the filtering logic and the inclusion of comprehensive unit tests. I have identified a couple of areas for improvement to enhance configurability and robustness.
| store_threshold = int(self.extra_config.get("store_threshold", 0)) | ||
| if store_threshold > 1: | ||
| self._manager = FilteredOffloadingManager( | ||
| backing=self._manager, | ||
| store_threshold=store_threshold, | ||
| ) |
There was a problem hiding this comment.
The FilteredOffloadingManager is initialized without a configurable max_tracker_size. This means the LRU tracker for block reuse frequency will always use the default size. This parameter can have a significant impact on memory usage and filtering effectiveness, and should be configurable for performance tuning. I suggest reading max_tracker_size from self.extra_config, similar to how store_threshold is handled.
store_threshold = int(self.extra_config.get("store_threshold", 0))
if store_threshold > 1:
max_tracker_size = int(
self.extra_config.get("max_tracker_size", 64_000)
)
self._manager = FilteredOffloadingManager(
backing=self._manager,
store_threshold=store_threshold,
max_tracker_size=max_tracker_size,
)| stats = self._backing.get_stats() | ||
| stats["stores_skipped"] = self.stores_skipped | ||
| return stats |
There was a problem hiding this comment.
The get_stats method modifies the dictionary returned by self._backing.get_stats() in-place. While this is currently safe because the existing backing managers return a new empty dictionary, this pattern is fragile. If a backing manager's implementation changes in the future to return a shared or cached dictionary, this could lead to unintended side effects. It's safer to create a new dictionary to avoid mutating the object returned by the backing manager.
| stats = self._backing.get_stats() | |
| stats["stores_skipped"] = self.stores_skipped | |
| return stats | |
| return {**self._backing.get_stats(), "stores_skipped": self.stores_skipped} |
|
Hi @Srinivasoo7, the pre-commit checks have failed. Please run: uv pip install pre-commit
pre-commit install
pre-commit run --all-filesThen, commit the changes and push to your branch. For future commits, Tip Is
|
c13d578 to
944208b
Compare
|
@Srinivasoo7 can you please rebase? |
944208b to
6c0ce3a
Compare
|
The stats dictionary returned by the connector is later used to feed |
|
Hi @orozery |
|
Hi @Srinivasoo7, the pre-commit checks have failed. Please run: uv pip install pre-commit>=4.5.1
pre-commit install
pre-commit run --all-filesThen, commit the changes and push to your branch. For future commits, Tip Is
|
e53557c to
6c0ce3a
Compare
|
This pull request has merge conflicts that must be resolved before it can be |
|
Please rebase :) |
48c4e97 to
5d75841
Compare
|
Done @orozery |
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces telemetry for the offloading manager, which is a valuable addition. The changes to support a new statistics structure with both transfers and gauges are well-implemented in metrics.py and offloading_connector.py. However, I've identified a critical bug in reuse_manager.py that could lead to a runtime error, and another high-risk issue related to in-place data mutation. I have provided suggestions to address both of these concerns.
| bh for bh in block_hashes if self.counts.get(bh, 0) >= self.store_threshold | ||
| ] | ||
|
|
||
| self.stores_skipped += len(block_hashes) - len(eligible) | ||
|
|
There was a problem hiding this comment.
The block_hashes parameter is an Iterable. Calling len(block_hashes) after it has been potentially consumed by the list comprehension on the preceding lines can lead to incorrect behavior or a TypeError if block_hashes is a generator. To ensure correctness, you should first convert the iterable to a list and use that list for both operations.
block_hashes_list = list(block_hashes)
eligible = [
bh for bh in block_hashes_list
if self.counts.get(bh, 0) >= self.store_threshold
]
self.stores_skipped += len(block_hashes_list) - len(eligible)| def get_stats(self) -> dict[str, Any]: | ||
| stats = self._backing.get_stats() | ||
| stats.setdefault("gauges", {})["stores_skipped"] = self.stores_skipped | ||
| return stats |
There was a problem hiding this comment.
The current implementation modifies the dictionary returned by self._backing.get_stats() in-place. This can cause unexpected side effects if the backing manager's returned dictionary is not meant to be mutated. To prevent this, you should operate on a copy. The safest approach is to use copy.deepcopy.
You will need to add import copy to the file.
def get_stats(self) -> dict[str, Any]:
stats = copy.deepcopy(self._backing.get_stats())
stats.setdefault("gauges", {})["stores_skipped"] = self.stores_skipped
return stats|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces telemetry for the OffloadingManager, allowing statistics to be collected and emitted. The changes include adding a get_stats method to the OffloadingManager interface, implementing it in FilterReusedOffloadingManager to report stores_skipped, and updating the metrics aggregation logic to handle these new scalar statistics (gauges) alongside existing transfer metrics. The OffloadingConnector is also modified to collect and aggregate these stats from the scheduler's manager.
My main concern is with the new test file tests/v1/kv_offload/test_reuse_manager.py. It appears to be written for a different version of the code, referencing classes (FilteredOffloadingManager, BlockReuseTracker) that do not exist in vllm/v1/kv_offload/reuse_manager.py. This is a critical issue as it means the new functionality is not being tested correctly. Please see my detailed comment on this file.
| BlockReuseTracker = _mod.BlockReuseTracker # type: ignore[assignment,misc] | ||
| FilteredOffloadingManager = ( # type: ignore[assignment,misc] | ||
| _mod.FilteredOffloadingManager | ||
| ) |
There was a problem hiding this comment.
This test file attempts to import BlockReuseTracker and FilteredOffloadingManager from the reuse_manager module. However, the implementation in vllm/v1/kv_offload/reuse_manager.py defines a class named FilterReusedOffloadingManager and does not contain a BlockReuseTracker class. This will cause an AttributeError at runtime, and these tests will fail to run.
Please ensure the test file is updated to use the correct class names from the module under test. FilteredOffloadingManager should likely be FilterReusedOffloadingManager. The logic for BlockReuseTracker seems to be part of FilterReusedOffloadingManager now, so the tests might need significant refactoring to target the public API of FilterReusedOffloadingManager.
| mgr_stats_data = self.connector_scheduler.manager.get_stats() | ||
| if mgr_stats_data: | ||
| mgr_stats = self.build_kv_connector_stats(mgr_stats_data) | ||
| if mgr_stats is not None: | ||
| if stats is not None: | ||
| stats = stats.aggregate(mgr_stats) | ||
| else: | ||
| stats = mgr_stats |
There was a problem hiding this comment.
I don't think we need aggregation, as it is not possible for both self.connector_scheduler and self.connector_worker to be not None.
|
|
||
| def is_empty(self) -> bool: | ||
| return not self.data | ||
| return not self.data.get("transfers") and not self.data.get("gauges") |
There was a problem hiding this comment.
Why do we need this change?
|
|
||
| def reset(self): | ||
| self.data: dict[str, list[OffloadingOperationMetrics]] = {} | ||
| self.data: dict[str, Any] = {"transfers": {}, "gauges": {}} |
There was a problem hiding this comment.
Let's try and make it clearer by:
- Defining "transfers" and "gauges" as string constants, e..g
TRANSFERS_KEY = "transfers", GAUGUES_KEY = "gauges". - Add a docstring to
OffloadingConnectorStatsdescribing its expected structure.
|
|
||
| def get_stats(self) -> dict[str, Any]: | ||
| stats = copy.deepcopy(self._backing.get_stats()) | ||
| stats.setdefault("gauges", {})["stores_skipped"] = self.stores_skipped |
There was a problem hiding this comment.
Can we create and use a KVConnectorStats.set_gauge(gauge_name, gauge_value) method?
| """ | ||
| return () | ||
|
|
||
| def get_stats(self) -> dict[str, Any]: |
There was a problem hiding this comment.
Let's define it as:
| def get_stats(self) -> dict[str, Any]: | |
| def get_stats(self) -> dict[str, Any] | None: |
to save the dictionary allocation if stats are unused.
| @@ -0,0 +1,360 @@ | |||
| # SPDX-License-Identifier: Apache-2.0 | |||
There was a problem hiding this comment.
Looks like this is an old file?
We already have a unit test in test_cpu_manager.py
You should instead add a test that verifies the stores_skipped gauge.
|
Hi @orozery Thanks |
|
Hi @Srinivasoo7, the pre-commit checks have failed. Please run: uv pip install pre-commit>=4.5.1
pre-commit install
pre-commit run --all-filesThen, commit the changes and push to your branch. For future commits, |
|
@Srinivasoo7 Also need to fix 2 more things:
|
|
Yes boss @orozery, we'll fix it asap! |
Signed-off-by: srinivas_oo7 <sklinkedin0120@gmail.com>
Signed-off-by: Or Ozeri <oro@il.ibm.com>
Signed-off-by: Or Ozeri <oro@il.ibm.com>
Signed-off-by: Or Ozeri <oro@il.ibm.com>
Signed-off-by: Or Ozeri <oro@il.ibm.com>
Signed-off-by: Or Ozeri <oro@il.ibm.com>
orozery
left a comment
There was a problem hiding this comment.
Thanks @Srinivasoo7 for the hard work! (and sorry for all of the nit picking :) )
I added an e2e test and noticed an issue that metric metadata is not serialized.
I changed the stats structs to next the metadata under the self.data dictionary, which is serialized.
Also another issue was we were missing empty worker side stats (until #43877 lands).
I pushed the fixes to your branch.
|
Gotcha @orozery. Also, from this PR we branched out to metrics redesign (#44008) request to help with your views there to start the redesign PR against the RFC. Looking forward to more such contributions! |
Signed-off-by: Sriusa4414@gmail.com Signed-off-by: srinivas_oo7 <Sriusa4414@gmail.com> Signed-off-by: srinivas_oo7 <sklinkedin0120@gmail.com> Signed-off-by: Srinivasoo7 <158864704+Srinivasoo7@users.noreply.github.com> Signed-off-by: Or Ozeri <oro@il.ibm.com> Co-authored-by: srinivas_oo7 <sklinkedin0120@gmail.com> Co-authored-by: Srinivasoo7 <158864704+Srinivasoo7@users.noreply.github.com> Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> Co-authored-by: Or Ozeri <oro@il.ibm.com>
Signed-off-by: Sriusa4414@gmail.com Signed-off-by: srinivas_oo7 <Sriusa4414@gmail.com> Signed-off-by: srinivas_oo7 <sklinkedin0120@gmail.com> Signed-off-by: Srinivasoo7 <158864704+Srinivasoo7@users.noreply.github.com> Signed-off-by: Or Ozeri <oro@il.ibm.com> Co-authored-by: srinivas_oo7 <sklinkedin0120@gmail.com> Co-authored-by: Srinivasoo7 <158864704+Srinivasoo7@users.noreply.github.com> Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> Co-authored-by: Or Ozeri <oro@il.ibm.com>
Signed-off-by: Sriusa4414@gmail.com Signed-off-by: srinivas_oo7 <Sriusa4414@gmail.com> Signed-off-by: srinivas_oo7 <sklinkedin0120@gmail.com> Signed-off-by: Srinivasoo7 <158864704+Srinivasoo7@users.noreply.github.com> Signed-off-by: Or Ozeri <oro@il.ibm.com> Co-authored-by: srinivas_oo7 <sklinkedin0120@gmail.com> Co-authored-by: Srinivasoo7 <158864704+Srinivasoo7@users.noreply.github.com> Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> Co-authored-by: Or Ozeri <oro@il.ibm.com>
Signed-off-by: Sriusa4414@gmail.com Signed-off-by: srinivas_oo7 <Sriusa4414@gmail.com> Signed-off-by: srinivas_oo7 <sklinkedin0120@gmail.com> Signed-off-by: Srinivasoo7 <158864704+Srinivasoo7@users.noreply.github.com> Signed-off-by: Or Ozeri <oro@il.ibm.com> Co-authored-by: srinivas_oo7 <sklinkedin0120@gmail.com> Co-authored-by: Srinivasoo7 <158864704+Srinivasoo7@users.noreply.github.com> Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> Co-authored-by: Or Ozeri <oro@il.ibm.com> Signed-off-by: divineearthly <divineearthly@gmail.com>
Signed-off-by: Sriusa4414@gmail.com Signed-off-by: srinivas_oo7 <Sriusa4414@gmail.com> Signed-off-by: srinivas_oo7 <sklinkedin0120@gmail.com> Signed-off-by: Srinivasoo7 <158864704+Srinivasoo7@users.noreply.github.com> Signed-off-by: Or Ozeri <oro@il.ibm.com> Co-authored-by: srinivas_oo7 <sklinkedin0120@gmail.com> Co-authored-by: Srinivasoo7 <158864704+Srinivasoo7@users.noreply.github.com> Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> Co-authored-by: Or Ozeri <oro@il.ibm.com>
…HPU scheduler, ngram proposer and offloading connector tests to upstream API drift (#1556) ## Bug 1: Forward throttle_prefills in HPUAsyncScheduler.schedule - **State machine id**: hpu_async_scheduler_schedule_positional_arg - **Commit**: 957ba4d ### Root cause vLLM PR #44558 added a throttle_prefills positional arg to Scheduler.schedule(); EngineCore calls it positionally but the HPU override only accepted self. ### Upstream PR vllm-project/vllm#44558 ### Fix Accept throttle_prefills (default False) on the HPUAsyncScheduler.schedule override and forward it to super().schedule(). ## Bug 2: Pass num_speculative_tokens to NgramProposer.propose - **State machine id**: ngram_proposer_propose_missing_positional_arg - **Commit**: 82155ea ### Root cause vLLM PR #32374 (Dynamic SD) added a leading num_speculative_tokens positional arg to NgramProposer.propose(). ### Upstream PR vllm-project/vllm#32374 ### Fix Prepend self.speculative_config.num_speculative_tokens in propose_ngram_draft_token_ids to match the new upstream signature. ## Bug 3: Align OffloadingConnector stats tests with upstream flat-metrics API - **State machine id**: offloading_connector_cpu_to_gpu_metrics_missing - **Commit**: c1eb9e3 ### Root cause vLLM PR #35669 rewrote OffloadingConnectorStats to a self-describing {types, data} flat-metric payload, dropping the per-direction CPU_to_GPU/GPU_to_CPU list shape the tests still asserted. ### Upstream PR vllm-project/vllm#35669 ### Fix Rewrite test_metrics.py to exercise increase_counter/observe_histogram/aggregate/reduce/reset against the new self-describing stats contract. ## Bug 4: Align OffloadingConnector scheduler flush assertions with upstream defer-on-finish - **State machine id**: offloading_connector_flush_on_finish_deferred - **Commit**: 575a178 ### Root cause vLLM commit f428718ffe (PR #45823, "Defer on_request_finished until in-flight transfers drain") changed OffloadingConnectorScheduler: a finishing request with in-flight store jobs no longer flushes those stores immediately — finalization is deferred until transfers drain, and flush now fires only on preemption or block reuse. test_concurrent_lookups_of_the_same_prefix and test_abort_loading_requests still asserted flush-on-finish, so they failed once the target vLLM SHA picked up #45823. ### Upstream PR vllm-project/vllm#45823 ### Fix Drop the stale expected_flushed_gpu_block_indexes assertions in the two affected tests (matching upstream's own equivalents, which assert no flush in these scenarios). test_request_preemption keeps its flush-on-preemption assertion, which upstream still honors. --------- Signed-off-by: Paweł Olejniczak <pawelx.olejniczak@intel.com>
Signed-off-by: Sriusa4414@gmail.com Signed-off-by: srinivas_oo7 <Sriusa4414@gmail.com> Signed-off-by: srinivas_oo7 <sklinkedin0120@gmail.com> Signed-off-by: Srinivasoo7 <158864704+Srinivasoo7@users.noreply.github.com> Signed-off-by: Or Ozeri <oro@il.ibm.com> Co-authored-by: srinivas_oo7 <sklinkedin0120@gmail.com> Co-authored-by: Srinivasoo7 <158864704+Srinivasoo7@users.noreply.github.com> Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> Co-authored-by: Or Ozeri <oro@il.ibm.com>
Purpose
This PR adds support for telemetry emissions from the OffloadingManager interface inside the KV Connector.
As part of isolating the changes from the block-reuse frequency tracking PR, the OffloadingManager interface requires a standardized method to emit statistics up to the scheduler's KVConnector interfaces. This change:
OffloadingConnectorStats.reduceto gracefully support flat scalar statistics (ints, floats) alongside its standard list metrics.self.connector_scheduler.managerwithinOffloadingConnector.get_kv_connector_stats().stores_skippeddirectly from the FilteredOffloadingManager (formerlyStoreReusedOffloadingManager), tracking exactly how many block hashes failed the LRU reuse threshold.Test Plan
pytest tests/v1/kv_offloadto ensure manager abstraction and logic remain structurally sound.ruff formatandruff checkon the modified files to ensure compliance with the repository layout.Test Result