[KV Offloading] Add labeled metrics support#45957
Conversation
orozery
left a comment
There was a problem hiding this comment.
Thanks @Srinivasoo7 for the quick help on this!
| @@ -255,7 +310,9 @@ def __init__( | |||
|
|
|||
| self._observe_deprecated_metrics = issubclass(spec_cls, CPUOffloadingSpec) | |||
| self._offloading_metric_defs: dict[str, PromMetricT] = {} | |||
| self.offloading_metrics: dict[tuple[int, str], PromMetricT] = {} | |||
| self.offloading_metrics: dict[ | |||
| tuple[int, str, tuple[object, ...]], PromMetricT | |||
There was a problem hiding this comment.
From what I understand, label values come from the internal stats payload and may not always be strings (e.g. tier IDs as integers). Prometheus converts them to strings when .labels() is called.
If we use String here, it would require callers to pre-convert values.
Let me know your guidance on this!
There was a problem hiding this comment.
I think we want to force labels to be strings.
| @@ -255,7 +310,9 @@ def __init__( | |||
|
|
|||
| self._observe_deprecated_metrics = issubclass(spec_cls, CPUOffloadingSpec) | |||
| self._offloading_metric_defs: dict[str, PromMetricT] = {} | |||
| self.offloading_metrics: dict[tuple[int, str], PromMetricT] = {} | |||
| self.offloading_metrics: dict[ | |||
There was a problem hiding this comment.
Let's add a comment explaining the type of this variable
| @@ -326,11 +384,35 @@ def _create_metric( | |||
| raise AssertionError(f"Unknown offloading metric metadata: {metadata}") | |||
| return metric_cls(**kwargs) | |||
|
|
|||
| def _get_offloading_metric( | |||
There was a problem hiding this comment.
This should actually be _get_prometheus_metric
There was a problem hiding this comment.
Sure, tried to keep the naming consistent with existing class terminology. Will change this
| if key not in self.offloading_metrics: | ||
| engine_labelvalues = self.per_engine_labelvalues[engine_idx] | ||
| self.offloading_metrics[key] = self._offloading_metric_defs[ | ||
| metric_name | ||
| ].labels(*(engine_labelvalues + list(labelvalues))) | ||
| return self.offloading_metrics[key] |
There was a problem hiding this comment.
avoid double reading from dict:
| if key not in self.offloading_metrics: | |
| engine_labelvalues = self.per_engine_labelvalues[engine_idx] | |
| self.offloading_metrics[key] = self._offloading_metric_defs[ | |
| metric_name | |
| ].labels(*(engine_labelvalues + list(labelvalues))) | |
| return self.offloading_metrics[key] | |
| prom_metric = self.offloading_metrics.get(key) | |
| if prom_metric is None: | |
| engine_labelvalues = self.per_engine_labelvalues[engine_idx] | |
| prom_metric = self._offloading_metric_defs[ | |
| metric_name | |
| ].labels(*(engine_labelvalues + list(labelvalues))) | |
| self.offloading_metrics[key] = prom_metric | |
| return prom_metric |
There was a problem hiding this comment.
Okay, .get() is cleaner and avoids the extra lookup I made.
Will change this!
| if not metadata.labelnames: | ||
| for engine_idx, labelvalues in per_engine_labelvalues.items(): | ||
| self.offloading_metrics[(engine_idx, metric_name, ())] = ( | ||
| self._offloading_metric_defs[metric_name].labels(*labelvalues) | ||
| ) |
There was a problem hiding this comment.
I think we can remove this (covered lazily by _get_offloading_metric).
There was a problem hiding this comment.
Hmm.. yeah wanted your opinion on this.
This was intentional, so unlabeled metrics are registered and visible in Prometheus even before they're observed.
| @staticmethod | ||
| def _is_labeled_value_map(value: Any) -> bool: | ||
| return isinstance(value, Mapping) and all( | ||
| isinstance(labelvalues, tuple) for labelvalues in value | ||
| ) | ||
|
|
There was a problem hiding this comment.
I don't think we need this check. Remove? (along with its callsites)
There was a problem hiding this comment.
Oh, I added this as it will help us with the rolling-upgrade compatibility.
Surely can be removed.
| def _normalize_values(self) -> None: | ||
| values = self.data[_StatsKey.DATA] | ||
| for metric_name, value in list(values.items()): | ||
| if self._is_labeled_value_map(value): | ||
| continue | ||
| values[metric_name] = {(): value} |
There was a problem hiding this comment.
Don't think we need this.
All stats by construction should be labeled.
There was a problem hiding this comment.
Similar intention, it will help us with the rolling-upgrade compatibility.
Happy to remove!
| with patch.object( | ||
| CPUOffloadingSpec, "build_metric_definitions", return_value=metric_definitions | ||
| ): |
There was a problem hiding this comment.
Let's use a mocked spec instead of patching a real (CPU) spec.
Same for the tests below.
| TIER_BYTES: OffloadingCounterMetadata( | ||
| documentation="tier bytes", | ||
| labelnames=("tier",), | ||
| ), |
There was a problem hiding this comment.
It's a bit weird to use TIER_BYTES as the test case.
Why not just e.g. "my_counter" and "my_label"?
| engine0 = prom_metrics.offloading_metrics[(0, LOAD_BYTES)] | ||
| engine1 = prom_metrics.offloading_metrics[(1, LOAD_BYTES)] | ||
| engine0 = prom_metrics.offloading_metrics[(0, LOAD_BYTES, ())] | ||
| engine1 = prom_metrics.offloading_metrics[(1, LOAD_BYTES, ())] | ||
| assert engine0.increments == [] | ||
| assert engine1.increments == [100] | ||
|
|
There was a problem hiding this comment.
Claude:
Missing test: aggregate into self when self already has some labels but not others
The test test_aggregate_labeled_metrics starts with both stats having TIER_BYTES. There's no test for aggregating when self has a labeled metric and other has a different labeled metric that self doesn't have at all (metric name not present in self). The self._values.setdefault(key, {}) handles it, but a test would be good.
There was a problem hiding this comment.
Okay, I'll add a case on the missing-metric path as well!
4383cf9 to
a03fac4
Compare
|
Hi @orozery |
| if not metadata.labelnames: | ||
| for engine_idx, labelvalues in per_engine_labelvalues.items(): | ||
| self.offloading_metrics[(engine_idx, metric_name, ())] = ( | ||
| self._offloading_metric_defs[metric_name].labels(*labelvalues) | ||
| ) |
| ) -> None: | ||
| for observation in value: | ||
| self.offloading_metrics[(engine_idx, metric_name)].observe(observation) | ||
| if not self._observe_deprecated_metrics: | ||
| self._get_prometheus_metric(metric_name, labelvalues, engine_idx).observe( |
There was a problem hiding this comment.
Let's move self._get_prometheus_metric(metric_name, labelvalues, engine_idx) outside of the loop.
|
Hi @Srinivasoo7, the pre-commit checks have failed. Please run: uv pip install pre-commit>=4.5.1
pre-commit install
pre-commit run --all-filesThen, commit the changes and push to your branch. For future commits, |
Signed-off-by: srinivas_oo7 <sklinkedin0120@gmail.com>
a03fac4 to
dac20e2
Compare
Signed-off-by: srinivas_oo7 <sklinkedin0120@gmail.com> Co-authored-by: srinivas_oo7 <sklinkedin0120@gmail.com>
Signed-off-by: srinivas_oo7 <sklinkedin0120@gmail.com> Co-authored-by: srinivas_oo7 <sklinkedin0120@gmail.com>
Signed-off-by: srinivas_oo7 <sklinkedin0120@gmail.com> Co-authored-by: srinivas_oo7 <sklinkedin0120@gmail.com> Signed-off-by: Qiang Li <qiang.li2@amd.com>
Wire SimpleCPUOffloadConnector into the offloading stats / labeled-metrics API so its CPU KV transfers report through the shared vllm:kv_offload_* family (store/load bytes/time/size), plus per-connector CPU-pool gauges (vllm:simple_cpu_offload_*: total/free/used blocks, usage, pending loads/stores). Transfer timing brackets only the DMA: host-side batch-argument preparation is split out of copy_blocks() into prepare_copy()/launch_prepared_copy() and runs before the start CUDA event, and the vllm-project#46278 compute-done wait stays ordered before it, so kv_offload_*_time excludes host enqueue and compute-wait latency. Timing events are recycled via a small pool. Supersedes vllm-project#41790 (built on the pre-vllm-project#45957 metrics API); preserves vllm-project#43877 (scheduler post-completion stats aggregation) and vllm-project#46278 (GPU->CPU store barrier + srcAccessOrder). Tests: tests/v1/simple_kv_offload/test_metrics.py covers stats helpers/reduce, prom observe() end-to-end, serialized round-trip, scheduler pool gauges, worker transfer recording + reset-on-read, and event-pool reuse (no GPU required). AI assistance was used; every line was human-reviewed. Co-authored-by: OCWC22 <OCWC22@users.noreply.github.com> Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Signed-off-by: Change72 <cguo51@asu.edu>
Wire SimpleCPUOffloadConnector into the offloading stats / labeled-metrics API so its CPU KV transfers report through the shared vllm:kv_offload_* family (store/load bytes/time/size), plus per-connector CPU-pool gauges (vllm:simple_cpu_offload_*: total/free/used blocks, usage, pending loads/stores). Transfer timing brackets only the DMA: host-side batch-argument preparation is split out of copy_blocks() into prepare_copy()/launch_prepared_copy() and runs before the start CUDA event; the vllm-project#46278 compute-done wait is enqueued before the prepare so it captures the shared compute event's current state and stays outside the timing bracket. So kv_offload_*_time excludes host enqueue and compute-wait latency. Timing events are recycled via a pool. Supersedes vllm-project#41790 (built on the pre-vllm-project#45957 metrics API); preserves vllm-project#43877 (scheduler post-completion stats aggregation) and vllm-project#46278 (GPU->CPU store barrier + srcAccessOrder). Tests: tests/v1/simple_kv_offload/test_metrics.py covers stats build/reduce, prom observe() end-to-end (transfer + pool gauges), scheduler pool gauges, worker transfer recording + reset-on-read, and event-pool reuse (no GPU required); tests/v1/simple_kv_offload/test_worker.py store-ordering still passes on GPU. AI assistance was used. Co-authored-by: OCWC22 <OCWC22@users.noreply.github.com> Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Signed-off-by: Change72 <cguo51@asu.edu>
Wire SimpleCPUOffloadConnector into the offloading stats / labeled-metrics API so its CPU KV transfers report through the shared vllm:kv_offload_* family (store/load bytes/time/size), plus per-connector CPU-pool gauges (vllm:simple_cpu_offload_*: total/free/used blocks, usage, pending loads/stores). Transfer timing brackets only the DMA: host-side batch-argument preparation is split out of copy_blocks() into prepare_copy()/launch_prepared_copy() and runs before the start CUDA event; the vllm-project#46278 compute-done wait is enqueued before the prepare so it captures the shared compute event's current state and stays outside the timing bracket. So kv_offload_*_time excludes host enqueue and compute-wait latency. Timing events are recycled via a pool. Supersedes vllm-project#41790 (built on the pre-vllm-project#45957 metrics API); preserves vllm-project#43877 (scheduler post-completion stats aggregation) and vllm-project#46278 (GPU->CPU store barrier + srcAccessOrder). Tests: tests/v1/simple_kv_offload/test_metrics.py covers stats build/reduce, prom observe() end-to-end (transfer + pool gauges), scheduler pool gauges, worker transfer recording + reset-on-read, and event-pool reuse (no GPU required); tests/v1/simple_kv_offload/test_worker.py store-ordering still passes on GPU. AI assistance was used. Co-authored-by: OCWC22 <OCWC22@users.noreply.github.com> Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Signed-off-by: Change72 <cguo51@asu.edu>
Summary
labelnamesto offloading metric metadata{metric_name: {labelvalues_tuple: value}}