How Klaviyo built a robust model serving platform with Ray Serve
We, the Data Science platform team at Klaviyo, have been using Ray for a number of different use cases including online model inference and running periodic and ad-hoc training and inference jobs. In this blog post, I want to share insights and lessons from over a year of using Ray Serve as our model serving platform: DART (DAtascience RunTime) Online.
What is Ray Serve?
Ray Serve is a model serving library, built on top of Ray, used for building online inference APIs. The great thing about Ray Serve is that it is model agnostic — which means that you can use it as a generic tool to deploy any kind of ML model, including any and all business logic that goes along with it. Technically, your Ray Serve deployments don’t even need to have a machine learning model at all! At Klaviyo, we use Ray Serve to deploy everything from large transformer models to XGBoost and logistic regression models without needing to write any extra scaffolding.
You can learn more about Ray Serve on their official website. The rest of this post assumes that you are familiar with Ray Serve’s architecture and key concepts.
Why we chose Ray Serve
Up until two years ago, Klaviyo did not have a standardized platform to deploy machine learning models to production. For every new model that needed to be deployed, a new Flask or FastAPI application was built from the ground up. This included setting up all the AWS infrastructure and CI pipelines to build new docker images. This meant that adding new machine learning features to the Klaviyo app took a long time. The Data Science Platform team was created to tackle this problem, and decrease the time it took to deploy new ML models into production.
We looked at a lot of different frameworks for ML model serving, including SageMaker, KServe, BentoML, TorchServe and Ray Serve. After a lot of research and a few PoCs, we decided to move forward with Ray Serve because:
- Ray Serve is platform agnostic: A lot of other ML platforms require different types of setup to deploy different types of models. These platforms usually have good support for common frameworks like Scikit-Learn and PyTorch, but things can get complicated if a data scientist is working with a new cutting-edge package, or something that isn’t very popular.
- It supports adding arbitrary business logic: Some ML platforms make it very easy to deploy ML models, but ML models only. For example, SageMaker lets you deploy certain models where you just need to specify the model location on S3, and the model can be deployed in a single click. This is very useful. But at Klaviyo, models usually require pre and post processing steps with custom business logic. Ray Serve allows you to write arbitrary code and that fits perfectly with our needs.
- Ray / Serve is optimized for running ML models: Ray Serve is optimized for ML. It has features such as request batching, which comes from the realization that many ML models are optimized to run predictions in batches. For example, with one model, I was able to batch 8 requests, with just a 10–20% increase in the overall latency. This allows you to serve high throughput models with a low number of replicas, resulting in significant cost savings. Other such features include model composition and model multiplexing.
- The wider Ray ecosystem: Ray also has a large ecosystem, with a set of high-level libraries maintained by the Ray creators, such as Ray Datasets, Ray Train, Ray Tune, Ray RLib and of course Ray Serve. These are powerful libraries for training and serving large ML models at scale. Using Ray for both training and serving makes it easier to move models into production, keeping all the Ray optimizations in place. Ray also has a lot of third-party integrations with other packages such as Prefect, HuggingFace, and Flyte.
- Great community and developer support: Over the course of building DART, I’ve interacted with the Ray community on Slack quite a lot. My questions are usually answered quickly by other Ray users or the Ray developers. I’ve reported numerous bugs and features requests on Ray and KubeRay’s GitHub repository, and they’ve all been implemented in anywhere between two days and two months of reporting. I’ve also personally met the Ray and KubeRay developers, and they’re always looking for feedback from the Ray users to decide their roadmap.
Deploying Ray Serve on Kubernetes
KubeRay is an open source kubernetes operator that streamlines the deployment and management of Ray clusters on kubernetes. The recommended way to deploy Ray Serve in production is to use the RayService custom resource from KubeRay.
KubeRay includes several features such as:
- Zero downtime upgrade — allows you to upgrade docker images and certain configs without any disruption to your online service. It spins up a parallel Ray cluster and switches over traffic once all deployments in the new cluster are running and healthy.
- High availability — allows you to “outsource” the Global Control Service (GCS) to an external highly-available redis so the cluster can recover after a head node failure.
DART Online
Klaviyo’s DART Online is a robust, highly available, online model serving platform built on top of Ray Serve.
All our models are hosted on the same Ray cluster, as different Ray Serve applications. This helps us share resources across our applications, which helps contain GPU costs. We use the KubeRay RayService resource, with all the high availability settings. This means that we have external Redis instances connected to the Ray clusters to prevent the head node from being a single point of failure.
We use two identical such Ray clusters for better fault tolerance. Incoming traffic is evenly routed between the two. This is done using Route53’s weighted routing feature. The benefit of using Route53 to distribute traffic is that it can be connected to a health check that can be used to stop sending traffic to one of the Ray clusters in case it becomes unhealthy.
We connect the Route53 health check to our own health check service which marks a cluster as unhealthy if any one of the deployments are unhealthy or if the Ray Serve health check endpoint fails. This is a very conservative health check mechanism which could be made less strict.
What does DART Online add to Ray Serve?
Ray Serve is a great framework by itself. It has its own fault tolerance mechanisms, and a bunch of other features that makes it easy to serve machine learning models at scale.
Architecturally, DART Online adds an extra layer of fault tolerance on top of Ray Serve by creating a duplicate Ray cluster and routing traffic based on the health of the cluster. We’ve found this to be critical in cases where the head node goes down, for example if it gets evicted because the node is low on resources or the node is drained manually to apply AMI changes to the underlying kubernetes node.
In terms of developing and deploying new applications, DART Online requires all applications to be defined in a class with a specific template, which inherits from a base class. The base class provides a bunch of helpful methods, including setup and access to Klaviyo’s internal monitoring systems, payload validation, and handling errors by responding with correct status codes. This helps our data scientists focus on writing the business logic for the application, and not worry about the scaffolding.
DART Online also provides a similar base class for running integration tests which spins up Ray Serve with the application(s) we’re testing, and allows data scientists to write tests that interact with the server directly to validate the responses, without worrying about all the scaffolding around it.
Get Smit Kiri’s stories in your inbox
Join Medium for free to get updates from this writer.
We also provide a client package that allows stakeholders to call different applications deployed on DART Online, which makes it easy to integrate DART Online applications with other Klaviyo services.
Lifecycle of a request
Here’s exactly how a client request is handled by DART Online:
- Route53 weighted routing checks the health status of each cluster.
- If both clusters are healthy, it randomly picks one and forwards the request to that cluster’s load balancer (ALB).
- The ALB forwards the request to one of the Ray nodes (kubernetes pods) in a round robin.
- In the Ray node, the request goes to the Ray Serve HTTP proxy which parses the request.
- The proxy randomly selects at most two replicas corresponding to the endpoint.
— In the first iteration, only replicas co-located on the same node as the router are considered.
— If there are no replicas on the same node, it will consider all replicas on all nodes. - For these two replicas, the router will check two things:
— Is it available? i.e. Is number of requests running on the replica > max_concurrent_queries?
— What’s the queue depth? - If both replicas are available, the request is forwarded to the replica with the lowest queue depth.
- If none of the two replicas are available, the entire process is run again with backoff.
- The request then stays in the replica’s queue until the replica is available to process it.
- Once the replica is available, it processes the request and sends back the response to the client.
If the client disconnects when the request is still in the queue, it is dropped. If the client disconnects when the request is still being processed, Ray Serve will try to stop the process.
Two takeaways from this routing behavior:
- Requests for all applications go through “shared” HTTP proxies. Too many requests with large payloads to one endpoint can slow them down, affecting other applications.
- If your cluster is large, and your replicas aren’t spread out well enough throughout the cluster, most requests will need to have an extra hop to a Ray node that actually is running the replica. This can add a tiny bit of latency to the overall lifetime of a request. The same thing applies if your deployment calls any other deployment handles. This shouldn’t be a problem for most people, but could be a problem if your application requires ultra-low latency.
What happens when something goes wrong?
Some (or all) replicas for a deployment go down in one of the two Ray clusters — In this situation, the Route53 health check will mark that cluster unhealthy, and not forward any requests to it. Ray Serve will spin those replicas back up. Once all replicas spin back up, the cluster will be marked healthy again and traffic will start flowing to it.
Some replicas of a deployment go down in both clusters — Both clusters will be marked unhealthy, and traffic will be sent to both clusters 50–50. Ray Serve will forward these requests to the replicas that are running. Requests may be queued up depending on how much traffic it receives and how many replicas are available.
All replicas of a deployment are down in both clusters — In this case, the traffic is sent to both clusters, and Ray Serve will put the request in a queue. It waits for a replica to be available, but the request will be dropped if the client disconnects while waiting for a response.
When should you use extra fault tolerance?
You don’t always need a setup like us with duplicate Ray clusters. Here are situations where multiple clusters is overkill:
- The app doesn’t need high availability: This is the most obvious reason. If you don’t mind your app occasionally returning 5xx errors that usually recover in a few minutes, you don’t need any of the extra setup we have.
- Your Kubernetes pods are stable: This means that the pods aren’t randomly evicted or nodes aren’t occasionally drained. One reason that we have the multi-cluster setup is because we drain our nodes for security patches which take down Ray cluster nodes. Pods can be evicted for many different reasons outlined here, so there is always some risk of your Ray nodes going down.
- The app gets very low traffic: If a head / worker pod goes down, kubernetes will try to schedule it on to a new node. This is fairly quick (a couple seconds) if you have enough resources available in your kubernetes cluster. Depending on the time it takes for your app to start up, a Ray node going down might not impact your traffic at all.
Multiple applications on a cluster
Ray Serve supports deploying multiple applications on the same Ray cluster. If your applications depend on one another and / or if you need to deploy changes to multiple applications at the same time, you should use the same cluster to deploy multiple applications. It also makes it simpler to manage infrastructure. If you have any infra deployed around a Ray cluster, such as load balancers or IAM policies, you don’t need to maintain several copies of them. It also makes it easier to share resources across multiple applications which can sometimes result in cost savings. However, there are several caveats when choosing this architecture:
- Heavy traffic on one app can impact others: If one of your applications suddenly starts receiving heavy traffic and/or the payloads are very big and if you don’t have enough Ray nodes running, traffic to other applications will be affected. This is because all apps share proxies deployed on all nodes and the proxy would become a bottleneck.
- You’ll need 2x resources available: Most updates to the RayService manifest will trigger a “zero downtime upgrade” of the cluster. This means that KubeRay will spin up a new cluster with the new manifest. Once that is ready, it switches all traffic to the new cluster and deletes the old cluster. This means that you need 2x resources available to be able to make updates to your cluster. This is sometimes difficult if you have a large cluster with a lot of GPU nodes.
- One app can block all others from starting up: We’ve run into this issue during incidents where the entire cluster is down, and we need to wait for a new cluster to start up. Because we have all applications on the same Ray cluster, we need to wait for all of them to initialize for traffic to start flowing into it. We run into this when the new cluster is trying to spin up new GPUs for one of the applications, which takes a while, and it increases downtime for other applications even though they might normally start up quickly.
Pitfalls to avoid
- Sharing Redis for multiple Ray clusters: This is something that we learned the hard way. If you share the same Redis instance for multiple Ray clusters, and if Redis goes down, all your clusters will go down with it. This is especially important to avoid if you’re using a different cluster for better fault tolerance. It is always better to have separate Redis instances for your important workloads, if not all.
- Installing too many packages at run time: Ray Serve has a feature where you can specify the pip packages you need for a deployment, and those will be installed at run time in the node. This is great, because you can have a small “base” image and each of your deployments can run in its own environment. However, these packages get installed in ephemeral storage of the kubernetes pod, and the size of these environments can get very large, very quickly. We’ve seen these get to ~10–15 GB, which can cause the pod to be evicted. This is a big problem, especially if the head node is evicted, because it doesn’t automatically restart, and it can take down the whole Ray cluster. (See next point.)
- Running workloads on the head node: Running anything on the head node other than the core Ray processes can increase the chance of the head node going down. For example, we’ve seen the head node pod being evicted by kubernetes because it was running low on a resource such as ephemeral storage. Evicted pods in kubernetes will not restart automatically. So the Ray head node pod will stay down until you manually restart it. If the worker pods cannot reach the head, they will crash too, taking the entire cluster down. Even when you manually restart the evicted pod, there is a possibility that you’ll run into the same error again. It is best to set
num-cpus: "0"in therayStartParamsfield of the head node as specified here. - Replicas aren’t spread out across the cluster: By default, replicas are randomly assigned to nodes. However I’ve noticed that a lot of the times all replicas of my deployment end up on the same Ray node. This is a problem because if the Ray node goes down, it will take down all replicas with it, resulting in a downtime for your application. Make sure you set the
max_replicas_per_nodeparameter for your deployment to spread your replicas across multiple nodes. - Using very large Ray nodes: There’s a balance in choosing the size of your Ray nodes. Larger Ray nodes reduce the internal networking latency between Ray processes which could boost performance. On the other hand, if a large Ray node goes down, it takes down a lot of replicas with it which means a larger impact on your Ray Serve applications. I would recommend starting with your best guess at a node size. Assess the impact of one of those nodes going down. Increase or decrease the node size until you’re comfortable with the impact of one (or maybe two) nodes going down.
Conclusion
We’ve been using Ray Serve for a year and now have ~20 machine learning applications running on it. We’re happy with our choice. We’ve had a huge boost in operational efficiency and our year of experience has provided us with insights into both the capabilities and limitations of this powerful tool. Our next step with DART Online is to address a lesson discussed above. We’re going to migrate each application to its own dedicated Ray cluster with all dependencies installed within the docker image to enhance isolation, cluster spin up times and fault tolerance. I expect we’ll continue to get better and better at operating Ray Serve and I plan to write another blog post when we have new insights to share.
