Skip to content

Commit 7dfb6ce

Browse files
abrarsheikh authored and dstrodtman committed
use cached contexts for access logs in request path (ray-project#55166)
### Performance Comparison Table

| Metric                   | GET / (Before) | GET / (After)  | Aggregated (Before)  | Aggregated (After)  |
|--------------------------|----------------|----------------|----------------------|---------------------|
| # Requests               | 27,390         | 284,097        | 27,390               | 284,097             |
| # Fails                  | 0              | 0              | 0                    | 0                   |
| Median (ms)              | 450            | 400            | 450                  | 400                 |
| 95%ile (ms)              | 520            | 520            | 520                  | 520                 |
| 99%ile (ms)              | 540            | 570            | 540                  | 570                 |
| Average (ms)             | 451.13         | 400.84         | 451.13               | 400.84              |
| Min (ms)                 | 226            | 160            | 226                  | 160                 |
| Max (ms)                 | 728            | 777            | 728                  | 777                 |
| Average size (bytes)     | 13             | 13             | 13                   | 13                  |
| Current RPS              | 220.5          | 256.8          | 220.5                | 256.8               |
| Current Failures/s       | 0              | 0              | 0                    | 0                   |

---------

Signed-off-by: abrar <abrar@anyscale.com>
Signed-off-by: Douglas Strodtman <douglas@anyscale.com>
1 parent 71e97f4 commit 7dfb6ce

5 files changed

Lines changed: 161 additions & 40 deletions

File tree

Lines changed: 42 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,55 @@
11
import logging
2+
from typing import Any, Dict
23

34
import ray
45
from ray._private.ray_logging.constants import LogKey
56

67

78
class CoreContextFilter(logging.Filter):
8-
def filter(self, record):
9+
TASK_LEVEL_LOG_KEYS = [
10+
LogKey.TASK_ID.value,
11+
LogKey.TASK_NAME.value,
12+
LogKey.TASK_FUNCTION_NAME.value,
13+
]
14+
15+
@classmethod
16+
def get_ray_core_logging_context(cls) -> Dict[str, Any]:
17+
"""
18+
Get the ray core logging context as a dict.
19+
Only use this function if you need to include the attributes in the log record
20+
yourself by bypassing the filter.
21+
"""
922
if not ray.is_initialized():
1023
# There is no additional context if ray is not initialized
11-
return True
24+
return {}
1225

1326
runtime_context = ray.get_runtime_context()
14-
setattr(record, LogKey.JOB_ID.value, runtime_context.get_job_id())
15-
setattr(record, LogKey.WORKER_ID.value, runtime_context.get_worker_id())
16-
setattr(record, LogKey.NODE_ID.value, runtime_context.get_node_id())
27+
ray_core_logging_context = {
28+
LogKey.JOB_ID.value: runtime_context.get_job_id(),
29+
LogKey.WORKER_ID.value: runtime_context.get_worker_id(),
30+
LogKey.NODE_ID.value: runtime_context.get_node_id(),
31+
}
1732
if runtime_context.worker.mode == ray.WORKER_MODE:
18-
actor_id = runtime_context.get_actor_id()
19-
if actor_id is not None:
20-
setattr(record, LogKey.ACTOR_ID.value, actor_id)
21-
task_id = runtime_context.get_task_id()
22-
if task_id is not None:
23-
setattr(record, LogKey.TASK_ID.value, task_id)
24-
task_name = runtime_context.get_task_name()
25-
if task_name is not None:
26-
setattr(record, LogKey.TASK_NAME.value, task_name)
27-
task_function_name = runtime_context.get_task_function_name()
28-
if task_function_name is not None:
29-
setattr(record, LogKey.TASK_FUNCTION_NAME.value, task_function_name)
30-
actor_name = runtime_context.get_actor_name()
31-
if actor_name is not None:
32-
setattr(record, LogKey.ACTOR_NAME.value, actor_name)
33+
ray_core_logging_context[
34+
LogKey.ACTOR_ID.value
35+
] = runtime_context.get_actor_id()
36+
ray_core_logging_context[
37+
LogKey.TASK_ID.value
38+
] = runtime_context.get_task_id()
39+
ray_core_logging_context[
40+
LogKey.TASK_NAME.value
41+
] = runtime_context.get_task_name()
42+
ray_core_logging_context[
43+
LogKey.TASK_FUNCTION_NAME.value
44+
] = runtime_context.get_task_function_name()
45+
ray_core_logging_context[
46+
LogKey.ACTOR_NAME.value
47+
] = runtime_context.get_actor_name()
48+
return ray_core_logging_context
49+
50+
def filter(self, record):
51+
context = self.get_ray_core_logging_context()
52+
for key, value in context.items():
53+
if value is not None:
54+
setattr(record, key, value)
3355
return True

‎python/ray/serve/_private/constants.py‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,7 @@ def parse_latency_buckets(bucket_str: str, default_buckets: list) -> list:
247247
"serve_access_log",
248248
"task_id",
249249
"job_id",
250+
"skip_context_filter",
250251
}
251252

252253
RAY_SERVE_HTTP_KEEP_ALIVE_TIMEOUT_S = int(

‎python/ray/serve/_private/logging_utils.py‎

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,18 @@
3434
buildin_print = builtins.print
3535

3636

37+
def should_skip_context_filter(record: logging.LogRecord) -> bool:
38+
"""Check if the log record should skip the context filter."""
39+
return getattr(record, "skip_context_filter", False)
40+
41+
42+
class ServeCoreContextFilter(CoreContextFilter):
43+
def filter(self, record: logging.LogRecord) -> bool:
44+
if should_skip_context_filter(record):
45+
return True
46+
return super().filter(record)
47+
48+
3749
class ServeComponentFilter(logging.Filter):
3850
"""Serve component filter.
3951
@@ -56,6 +68,8 @@ def filter(self, record: logging.LogRecord) -> bool:
5668
Note: the filter doesn't do any filtering, it only adds the component
5769
attributes.
5870
"""
71+
if should_skip_context_filter(record):
72+
return True
5973
if self.component_type and self.component_type == ServeComponentType.REPLICA:
6074
setattr(record, SERVE_LOG_DEPLOYMENT, self.component_name)
6175
setattr(record, SERVE_LOG_REPLICA, self.component_id)
@@ -77,6 +91,8 @@ class ServeContextFilter(logging.Filter):
7791
"""
7892

7993
def filter(self, record):
94+
if should_skip_context_filter(record):
95+
return True
8096
request_context = ray.serve.context._get_serve_request_context()
8197
if request_context.route:
8298
setattr(record, SERVE_LOG_ROUTE, request_context.route)
@@ -359,7 +375,7 @@ def configure_component_logger(
359375
"'LoggingConfig' to enable json format."
360376
)
361377
if RAY_SERVE_ENABLE_JSON_LOGGING or logging_config.encoding == EncodingType.JSON:
362-
file_handler.addFilter(CoreContextFilter())
378+
file_handler.addFilter(ServeCoreContextFilter())
363379
file_handler.addFilter(ServeContextFilter())
364380
file_handler.addFilter(
365381
ServeComponentFilter(component_name, component_id, component_type)

‎python/ray/serve/_private/proxy.py‎

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import ray
1818
from ray._common.utils import get_or_create_event_loop
19+
from ray._private.ray_logging.filters import CoreContextFilter
1920
from ray.serve._private.common import (
2021
DeploymentID,
2122
EndpointInfo,
@@ -33,6 +34,10 @@
3334
REQUEST_LATENCY_BUCKETS_MS,
3435
SERVE_CONTROLLER_NAME,
3536
SERVE_HTTP_REQUEST_ID_HEADER,
37+
SERVE_LOG_COMPONENT,
38+
SERVE_LOG_COMPONENT_ID,
39+
SERVE_LOG_REQUEST_ID,
40+
SERVE_LOG_ROUTE,
3641
SERVE_LOGGER_NAME,
3742
SERVE_MULTIPLEXED_MODEL_ID,
3843
SERVE_NAMESPACE,
@@ -79,7 +84,7 @@
7984
from ray.serve.config import HTTPOptions, gRPCOptions
8085
from ray.serve.generated.serve_pb2 import HealthzResponse, ListApplicationsResponse
8186
from ray.serve.handle import DeploymentHandle
82-
from ray.serve.schema import LoggingConfig
87+
from ray.serve.schema import EncodingType, LoggingConfig
8388
from ray.util import metrics
8489

8590
logger = logging.getLogger(SERVE_LOGGER_NAME)
@@ -127,6 +132,7 @@ def __init__(
127132
is_head: bool,
128133
proxy_router: ProxyRouter,
129134
request_timeout_s: Optional[float] = None,
135+
access_log_context: Dict[str, Any] = None,
130136
):
131137
self.request_timeout_s = request_timeout_s
132138
if self.request_timeout_s is not None and self.request_timeout_s < 0:
@@ -203,6 +209,8 @@ def __init__(
203209
# The node is not draining if it's None.
204210
self._draining_start_time: Optional[float] = None
205211

212+
self._access_log_context = access_log_context or {}
213+
206214
getattr(ServeUsageTag, f"{self.protocol.upper()}_PROXY_USED").record("1")
207215

208216
@property
@@ -437,14 +445,16 @@ async def proxy_request(self, proxy_request: ProxyRequest) -> ResponseGenerator:
437445
latency_ms = (time.time() - start_time) * 1000.0
438446
if response_handler_info.should_record_access_log:
439447
request_context = ray.serve.context._get_serve_request_context()
448+
self._access_log_context[SERVE_LOG_ROUTE] = request_context.route
449+
self._access_log_context[SERVE_LOG_REQUEST_ID] = request_context.request_id
440450
logger.info(
441451
access_log_msg(
442452
method=proxy_request.method,
443453
route=request_context.route,
444454
status=str(status.code),
445455
latency_ms=latency_ms,
446456
),
447-
extra={"log_to_stderr": False, "serve_access_log": True},
457+
extra=self._access_log_context,
448458
)
449459

450460
self.request_counter.inc(
@@ -704,13 +714,15 @@ def __init__(
704714
proxy_router: ProxyRouter,
705715
self_actor_name: str,
706716
request_timeout_s: Optional[float] = None,
717+
access_log_context: Dict[str, Any] = None,
707718
):
708719
super().__init__(
709720
node_id,
710721
node_ip_address,
711722
is_head,
712723
proxy_router,
713724
request_timeout_s=request_timeout_s,
725+
access_log_context=access_log_context,
714726
)
715727
self.self_actor_name = self_actor_name
716728
self.asgi_receive_queues: Dict[str, MessageQueue] = dict()
@@ -1053,6 +1065,32 @@ def __init__(
10531065
configure_component_memory_profiler(
10541066
component_name="proxy", component_id=node_ip_address
10551067
)
1068+
if logging_config.encoding == EncodingType.JSON:
1069+
# Create logging context for access logs as a performance optimization.
1070+
# While logging_utils can automatically add Ray core and Serve access log context,
1071+
# we pre-compute it here since context evaluation is expensive and this context
1072+
# will be reused for multiple access log entries.
1073+
ray_core_logging_context = CoreContextFilter.get_ray_core_logging_context()
1074+
# remove task level log keys from ray core logging context, it would be nice
1075+
# to have task level log keys here but we are letting those go in favor of
1076+
# performance optimization. Also we cannot include task level log keys here because
1077+
# they would reference the current task (__init__) and not the task that is logging.
1078+
for key in CoreContextFilter.TASK_LEVEL_LOG_KEYS:
1079+
ray_core_logging_context.pop(key, None)
1080+
access_log_context = {
1081+
**ray_core_logging_context,
1082+
SERVE_LOG_COMPONENT: "proxy",
1083+
SERVE_LOG_COMPONENT_ID: self._node_ip_address,
1084+
"log_to_stderr": False,
1085+
"skip_context_filter": True,
1086+
"serve_access_log": True,
1087+
}
1088+
else:
1089+
access_log_context = {
1090+
"log_to_stderr": False,
1091+
"skip_context_filter": True,
1092+
"serve_access_log": True,
1093+
}
10561094

10571095
is_head = self._node_id == get_head_node_id()
10581096
self.proxy_router = ProxyRouter(get_proxy_handle)
@@ -1063,6 +1101,7 @@ def __init__(
10631101
self_actor_name=ray.get_runtime_context().get_actor_name(),
10641102
proxy_router=self.proxy_router,
10651103
request_timeout_s=self._http_options.request_timeout_s,
1104+
access_log_context=access_log_context,
10661105
)
10671106
self.grpc_proxy = (
10681107
gRPCProxy(
@@ -1071,6 +1110,7 @@ def __init__(
10711110
is_head=is_head,
10721111
proxy_router=self.proxy_router,
10731112
request_timeout_s=self._grpc_options.request_timeout_s,
1113+
access_log_context=access_log_context,
10741114
)
10751115
if grpc_enabled
10761116
else None

0 commit comments

Comments
 (0)