Skip to content
62 changes: 42 additions & 20 deletions python/ray/_private/ray_logging/filters.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,55 @@
import logging
from typing import Any, Dict

import ray
from ray._private.ray_logging.constants import LogKey


class CoreContextFilter(logging.Filter):
    """Logging filter that attaches Ray core context to every log record.

    The filter never drops a record; it only adds attributes to it.
    """

    # Keys whose values describe the currently executing task.
    TASK_LEVEL_LOG_KEYS = [
        LogKey.TASK_ID.value,
        LogKey.TASK_NAME.value,
        LogKey.TASK_FUNCTION_NAME.value,
    ]

    @classmethod
    def get_ray_core_logging_context(cls) -> Dict[str, Any]:
        """
        Get the ray core logging context as a dict.
        Only use this function if you need to add the attributes to the log
        record yourself, bypassing the filter.

        Returns:
            A dict keyed by ``LogKey`` values. It is empty when ray is not
            initialized. Job, worker and node ids are always present
            otherwise; actor and task entries are only added in worker mode.
            Entries are stored exactly as returned by the runtime context;
            ``filter`` skips any that are ``None``.
        """
        if not ray.is_initialized():
            # There is no additional context if ray is not initialized
            return {}

        runtime_context = ray.get_runtime_context()
        ray_core_logging_context = {
            LogKey.JOB_ID.value: runtime_context.get_job_id(),
            LogKey.WORKER_ID.value: runtime_context.get_worker_id(),
            LogKey.NODE_ID.value: runtime_context.get_node_id(),
        }
        if runtime_context.worker.mode == ray.WORKER_MODE:
            ray_core_logging_context[
                LogKey.ACTOR_ID.value
            ] = runtime_context.get_actor_id()
            ray_core_logging_context[
                LogKey.TASK_ID.value
            ] = runtime_context.get_task_id()
            ray_core_logging_context[
                LogKey.TASK_NAME.value
            ] = runtime_context.get_task_name()
            ray_core_logging_context[
                LogKey.TASK_FUNCTION_NAME.value
            ] = runtime_context.get_task_function_name()
            ray_core_logging_context[
                LogKey.ACTOR_NAME.value
            ] = runtime_context.get_actor_name()
        return ray_core_logging_context

    def filter(self, record):
        """Copy the non-None ray core context entries onto ``record``.

        Always returns True, so the record is never filtered out.
        """
        context = self.get_ray_core_logging_context()
        for key, value in context.items():
            # Skip missing values so the record only carries real context.
            if value is not None:
                setattr(record, key, value)
        return True
1 change: 1 addition & 0 deletions python/ray/serve/_private/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ def parse_latency_buckets(bucket_str: str, default_buckets: list) -> list:
"serve_access_log",
"task_id",
"job_id",
"skip_context_filter",
}

RAY_SERVE_HTTP_KEEP_ALIVE_TIMEOUT_S = int(
Expand Down
18 changes: 17 additions & 1 deletion python/ray/serve/_private/logging_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@
buildin_print = builtins.print


def should_skip_context_filter(record: logging.LogRecord) -> bool:
    """Check if the log record should skip the context filter.

    Args:
        record: The log record being processed.

    Returns:
        True when the record carries a truthy ``skip_context_filter``
        attribute; False when the attribute is absent or falsy.
    """
    # Coerce to bool so the result always matches the declared return type,
    # whatever object was stored on the record.
    return bool(getattr(record, "skip_context_filter", False))


class ServeCoreContextFilter(CoreContextFilter):
    """Core context filter that leaves opted-out records untouched."""

    def filter(self, record: logging.LogRecord) -> bool:
        """Delegate to the parent filter unless the record asks to skip it.

        Records for which ``should_skip_context_filter`` is truthy pass
        through unchanged; either way the record is kept.
        """
        if not should_skip_context_filter(record):
            return super().filter(record)
        return True


class ServeComponentFilter(logging.Filter):
"""Serve component filter.

Expand All @@ -56,6 +68,8 @@ def filter(self, record: logging.LogRecord) -> bool:
Note: the filter doesn't do any filtering, it only adds the component
attributes.
"""
if should_skip_context_filter(record):
return True
if self.component_type and self.component_type == ServeComponentType.REPLICA:
setattr(record, SERVE_LOG_DEPLOYMENT, self.component_name)
setattr(record, SERVE_LOG_REPLICA, self.component_id)
Expand All @@ -77,6 +91,8 @@ class ServeContextFilter(logging.Filter):
"""

def filter(self, record):
if should_skip_context_filter(record):
return True
request_context = ray.serve.context._get_serve_request_context()
if request_context.route:
setattr(record, SERVE_LOG_ROUTE, request_context.route)
Expand Down Expand Up @@ -359,7 +375,7 @@ def configure_component_logger(
"'LoggingConfig' to enable json format."
)
if RAY_SERVE_ENABLE_JSON_LOGGING or logging_config.encoding == EncodingType.JSON:
file_handler.addFilter(CoreContextFilter())
file_handler.addFilter(ServeCoreContextFilter())
file_handler.addFilter(ServeContextFilter())
file_handler.addFilter(
ServeComponentFilter(component_name, component_id, component_type)
Expand Down
44 changes: 42 additions & 2 deletions python/ray/serve/_private/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import ray
from ray._common.utils import get_or_create_event_loop
from ray._private.ray_logging.filters import CoreContextFilter
from ray.serve._private.common import (
DeploymentID,
EndpointInfo,
Expand All @@ -33,6 +34,10 @@
REQUEST_LATENCY_BUCKETS_MS,
SERVE_CONTROLLER_NAME,
SERVE_HTTP_REQUEST_ID_HEADER,
SERVE_LOG_COMPONENT,
SERVE_LOG_COMPONENT_ID,
SERVE_LOG_REQUEST_ID,
SERVE_LOG_ROUTE,
Comment thread
abrarsheikh marked this conversation as resolved.
SERVE_LOGGER_NAME,
SERVE_MULTIPLEXED_MODEL_ID,
SERVE_NAMESPACE,
Expand Down Expand Up @@ -79,7 +84,7 @@
from ray.serve.config import HTTPOptions, gRPCOptions
from ray.serve.generated.serve_pb2 import HealthzResponse, ListApplicationsResponse
from ray.serve.handle import DeploymentHandle
from ray.serve.schema import LoggingConfig
from ray.serve.schema import EncodingType, LoggingConfig
from ray.util import metrics

logger = logging.getLogger(SERVE_LOGGER_NAME)
Expand Down Expand Up @@ -127,6 +132,7 @@ def __init__(
is_head: bool,
proxy_router: ProxyRouter,
request_timeout_s: Optional[float] = None,
access_log_context: Dict[str, Any] = None,
):
self.request_timeout_s = request_timeout_s
if self.request_timeout_s is not None and self.request_timeout_s < 0:
Expand Down Expand Up @@ -203,6 +209,8 @@ def __init__(
# The node is not draining if it's None.
self._draining_start_time: Optional[float] = None

self._access_log_context = access_log_context or {}

getattr(ServeUsageTag, f"{self.protocol.upper()}_PROXY_USED").record("1")

@property
Expand Down Expand Up @@ -437,14 +445,16 @@ async def proxy_request(self, proxy_request: ProxyRequest) -> ResponseGenerator:
latency_ms = (time.time() - start_time) * 1000.0
if response_handler_info.should_record_access_log:
request_context = ray.serve.context._get_serve_request_context()
self._access_log_context[SERVE_LOG_ROUTE] = request_context.route
self._access_log_context[SERVE_LOG_REQUEST_ID] = request_context.request_id
Comment thread
abrarsheikh marked this conversation as resolved.
logger.info(
access_log_msg(
method=proxy_request.method,
route=request_context.route,
status=str(status.code),
latency_ms=latency_ms,
),
extra={"log_to_stderr": False, "serve_access_log": True},
extra=self._access_log_context,
)

self.request_counter.inc(
Expand Down Expand Up @@ -704,13 +714,15 @@ def __init__(
proxy_router: ProxyRouter,
self_actor_name: str,
request_timeout_s: Optional[float] = None,
access_log_context: Dict[str, Any] = None,
):
super().__init__(
node_id,
node_ip_address,
is_head,
proxy_router,
request_timeout_s=request_timeout_s,
access_log_context=access_log_context,
)
self.self_actor_name = self_actor_name
self.asgi_receive_queues: Dict[str, MessageQueue] = dict()
Expand Down Expand Up @@ -1053,6 +1065,32 @@ def __init__(
configure_component_memory_profiler(
component_name="proxy", component_id=node_ip_address
)
if logging_config.encoding == EncodingType.JSON:
# Create logging context for access logs as a performance optimization.
# While logging_utils can automatically add Ray core and Serve access log context,
# we pre-compute it here since context evaluation is expensive and this context
# will be reused for multiple access log entries.
ray_core_logging_context = CoreContextFilter.get_ray_core_logging_context()
Comment thread
abrarsheikh marked this conversation as resolved.
# Remove task-level log keys from the ray core logging context. It would be nice
# to have task-level log keys here, but we are letting those go in favor of the
# performance optimization. Also, we cannot include task-level log keys here because
# they would reference the current task (__init__) and not the task that is logging.
for key in CoreContextFilter.TASK_LEVEL_LOG_KEYS:
ray_core_logging_context.pop(key, None)
access_log_context = {
**ray_core_logging_context,
SERVE_LOG_COMPONENT: "proxy",
SERVE_LOG_COMPONENT_ID: self._node_ip_address,
"log_to_stderr": False,
"skip_context_filter": True,
Comment thread
abrarsheikh marked this conversation as resolved.
"serve_access_log": True,
}
else:
access_log_context = {
"log_to_stderr": False,
"skip_context_filter": True,
"serve_access_log": True,
}

is_head = self._node_id == get_head_node_id()
self.proxy_router = ProxyRouter(get_proxy_handle)
Expand All @@ -1063,6 +1101,7 @@ def __init__(
self_actor_name=ray.get_runtime_context().get_actor_name(),
proxy_router=self.proxy_router,
request_timeout_s=self._http_options.request_timeout_s,
access_log_context=access_log_context,
)
self.grpc_proxy = (
gRPCProxy(
Expand All @@ -1071,6 +1110,7 @@ def __init__(
is_head=is_head,
proxy_router=self.proxy_router,
request_timeout_s=self._grpc_options.request_timeout_s,
access_log_context=access_log_context,
)
if grpc_enabled
else None
Expand Down
Loading