Skip to content

Commit a284b82

Browse files
justinyeh1995JiangJiaWei1103win5923Future-Outlier
authored
[Fix][RayJob SidecarMode] Prevent premature job termination during transient head node spikes (#4399)
* [Fix][1/N] defer sidecar submitter failure until dashboard check Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com> * [Feature][SidecarMode] Add SidecarSubmitterRestart feature gate for per-container restart Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com> * [Feature][SidecarMode] Add SidecarSubmitterRestart feature gate unit test Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com> * [Feature][SidecarMode] Add K8s version check if SidecarSubmitterRestart feature gate is enabled at operator startup Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com> * [Chore] better explainations Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com> * [Chore] remove incorrect nil check Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com> * [Fix] SidecarMode should not fail job based on container exit status Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com> * Trigger CI Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com> * [Fix][SidecarMode] add e2e tests Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com> * [Fix][SidecarMode] clarify RestartPolicyRules exit code rationale Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com> * [Fix][Sidecar Mode] Use ParseGeneric + GitVersion, and add patch version for more robust k8s version comparison Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com> * Apply suggestions Co-authored-by: Jia-Wei Jiang <36886416+JiangJiaWei1103@users.noreply.github.com> Signed-off-by: JustinYeh <justinyeh1995@gmail.com> * [Fix] use ContainerRestartPolicyOnFailure to express the intention better; update the comment to explain the intention better Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com> * [Fix] bump version check to v1.35 and update sidecar mode test to check onFailure restart policy Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com> * [Fix] check the existence of the feature gate instead of relying on the derivate result Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com> * improve the why for using K8s v1.35 Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com> * [Fix] Revert checkSidecarContainerStatus removal; scope PR to K8s v1.35+ use case Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com> * [Fix] use correct health command constant and argument order in the feature gate test. Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com> * [Fix] remove leftover changes Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com> * [Fix] prevent panicing due to failing to fetch head pod Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com> * add warning about the risk of having kubelet version skew Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com> * document ray v2.54.0+ is needed Co-authored-by: Jun-Hao Wan <ken89@kimo.com> Signed-off-by: JustinYeh <justinyeh1995@gmail.com> * add max restart count annotaion, envVar, and default max restart count Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com> * do not fail the job if the submitter container exited successfully Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com> * fix BasePythonHealthCommand argument Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com> * trigger ci Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com> * address suggestion part 1 Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com> * k8s 1.35 version Signed-off-by: Future-Outlier <eric901201@gmail.com> * add override Signed-off-by: Future-Outlier <eric901201@gmail.com> * update Signed-off-by: Future-Outlier <eric901201@gmail.com> * update Signed-off-by: Future-Outlier <eric901201@gmail.com> * upd for helm and kustomization Signed-off-by: Future-Outlier <eric901201@gmail.com> * update Signed-off-by: Future-Outlier <eric901201@gmail.com> * fix typo Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com> * address comment, assert no resubmission Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com> * bump kind version to the latest version which supports v1.35.0 by default Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com> * incremental upgrade use kind-config-buildkite.yml (v1.35) Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com> * replace hardcoded value to use rayJob.Spec.SubmitterConfig.BackoffLimit Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com> --------- Signed-off-by: justinyeh1995 <justinyeh1995@gmail.com> Signed-off-by: JustinYeh <justinyeh1995@gmail.com> Signed-off-by: Future-Outlier <eric901201@gmail.com> Co-authored-by: Jia-Wei Jiang <36886416+JiangJiaWei1103@users.noreply.github.com> Co-authored-by: Jun-Hao Wan <ken89@kimo.com> Co-authored-by: Future-Outlier <eric901201@gmail.com>
1 parent ff0e074 commit a284b82

16 files changed

Lines changed: 448 additions & 28 deletions

File tree

‎.buildkite/setup-env.sh‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ export PATH=$PATH:/usr/local/go/bin
77
export DOCKER_API_VERSION=1.43
88

99
# Install kind
10-
curl -Lo ./kind https://kind.sigs.k8s.io/dl/v0.22.0/kind-linux-amd64
10+
curl -Lo ./kind https://kind.sigs.k8s.io/dl/v0.31.0/kind-linux-amd64
1111
chmod +x ./kind
1212
mv ./kind /usr/local/bin/kind
1313

‎.buildkite/test-e2e.yml‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@
8383
image: golang:1.26-bookworm
8484
commands:
8585
- source .buildkite/setup-env.sh
86-
- kind create cluster --wait 900s --config ./ci/kind-config-buildkite-1-29.yml
86+
- kind create cluster --wait 900s --config ./ci/kind-config-buildkite.yml
8787
- kubectl config set clusters.kind-kind.server https://docker:6443
8888

8989
# Install MetalLB for LoadBalancer IPs on Kind

‎.buildkite/values-kuberay-operator-override.yaml‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,5 @@ featureGates:
2020
enabled: true
2121
- name: RayServiceIncrementalUpgrade
2222
enabled: true
23+
- name: SidecarSubmitterRestart
24+
enabled: true

‎ci/kind-config-buildkite.yml‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ networking:
99
# https://blog.scottlowe.org/2019/07/30/adding-a-name-to-kubernetes-api-server-certificate/
1010
nodes:
1111
- role: control-plane
12-
image: kindest/node:v1.29.0
12+
image: kindest/node:v1.35.0
1313
kubeadmConfigPatches:
1414
- |
1515
kind: ClusterConfiguration

‎docs/reference/api.md‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -593,7 +593,7 @@ _Appears in:_
593593

594594
| Field | Description | Default | Validation |
595595
| --- | --- | --- | --- |
596-
| `backoffLimit` _integer_ | BackoffLimit of the submitter k8s job. | | |
596+
| `backoffLimit` _integer_ | BackoffLimit of the submitter. In K8sJobMode, this is the K8s Job backoffLimit.<br />In SidecarMode with SidecarSubmitterRestart enabled, this is the maximum container restart count. | | |
597597

598598

599599
#### UpscalingMode

‎ray-operator/apis/ray/v1/rayjob_types.go‎

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,8 @@ const (
187187
)
188188

189189
type SubmitterConfig struct {
190-
// BackoffLimit of the submitter k8s job.
190+
// BackoffLimit of the submitter. In K8sJobMode, this is the K8s Job backoffLimit.
191+
// In SidecarMode with SidecarSubmitterRestart enabled, this is the maximum container restart count.
191192
// +optional
192193
BackoffLimit *int32 `json:"backoffLimit,omitempty"`
193194
}

‎ray-operator/config/overlays/test-overrides/deployment-override.yaml‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,4 @@ spec:
99
containers:
1010
- name: kuberay-operator
1111
args:
12-
- --feature-gates=RayClusterStatusConditions=true,RayJobDeletionPolicy=true,RayMultiHostIndexing=true,RayCronJob=true,RayServiceIncrementalUpgrade=true
12+
- --feature-gates=RayClusterStatusConditions=true,RayJobDeletionPolicy=true,RayMultiHostIndexing=true,RayCronJob=true,RayServiceIncrementalUpgrade=true,SidecarSubmitterRestart=true

‎ray-operator/controllers/ray/common/job.go‎

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
1616
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
17+
"github.com/ray-project/kuberay/ray-operator/pkg/features"
1718
pkgutils "github.com/ray-project/kuberay/ray-operator/pkg/utils"
1819
)
1920

@@ -150,18 +151,21 @@ func BuildJobSubmitCommand(rayJobInstance *rayv1.RayJob, submissionMode rayv1.Jo
150151
}
151152
cmd = append(cmd, waitLoop...)
152153

153-
// In Sidecar mode, we only support RayJob level retry, which means that the submitter retry won't happen,
154+
// In Sidecar mode without SidecarSubmitterRestart feature gate enabled, we only support RayJob level retry, which means that the submitter retry won't happen,
154155
// so we won't have to check if the job has been submitted.
155-
if submissionMode == rayv1.K8sJobMode {
156-
// Only check job status in K8s mode to handle duplicated submission gracefully
156+
// In K8sJobMode (submitter Job may retry) or Sidecar mode with SidecarSubmitterRestart feature gate enabled (submitter container may restart on failure).
157+
// we check job status before submitting to handle duplicated submission gracefully.
158+
needsStatusCheck := submissionMode == rayv1.K8sJobMode || (submissionMode == rayv1.SidecarMode && features.Enabled(features.SidecarSubmitterRestart))
159+
160+
if needsStatusCheck {
157161
cmd = append(cmd, "if", "!")
158162
cmd = append(cmd, jobStatusCommand...)
159163
cmd = append(cmd, ";", "then")
160164
}
161165

162166
cmd = append(cmd, jobSubmitCommand...)
163167

164-
if submissionMode == rayv1.K8sJobMode {
168+
if needsStatusCheck {
165169
cmd = append(cmd, "--no-wait")
166170
}
167171

@@ -199,7 +203,7 @@ func BuildJobSubmitCommand(rayJobInstance *rayv1.RayJob, submissionMode rayv1.Jo
199203

200204
// "--" is used to separate the entrypoint from the Ray Job CLI command and its arguments.
201205
cmd = append(cmd, "--", entrypoint, ";")
202-
if submissionMode == rayv1.K8sJobMode {
206+
if needsStatusCheck {
203207
cmd = append(cmd, "fi", ";")
204208
cmd = append(cmd, jobFollowCommand...)
205209
}

‎ray-operator/controllers/ray/common/job_test.go‎

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
1414
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
15+
"github.com/ray-project/kuberay/ray-operator/pkg/features"
1516
)
1617

1718
func rayJobTemplate() *rayv1.RayJob {
@@ -213,6 +214,54 @@ func TestBuildJobSubmitCommandWithK8sJobModeHealthWaitLoop(t *testing.T) {
213214
assert.NotContains(t, command[1], "wget")
214215
}
215216

217+
func TestBuildJobSubmitCommandWithSidecarModeAndFeatureGate(t *testing.T) {
218+
// Enable the SidecarSubmitterRestart feature gate for this test
219+
features.SetFeatureGateDuringTest(t, features.SidecarSubmitterRestart, true)
220+
221+
testRayJob := rayJobTemplate()
222+
testRayJob.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers = []corev1.Container{
223+
{
224+
Ports: []corev1.ContainerPort{
225+
{
226+
Name: utils.DashboardPortName,
227+
ContainerPort: utils.DefaultDashboardPort,
228+
},
229+
},
230+
},
231+
}
232+
233+
// With SidecarSubmitterRestart enabled, the command should include:
234+
// - status check (if ! ray job status ...)
235+
// - --no-wait flag
236+
// - job logs follow at the end
237+
healthURL := fmt.Sprintf("http://localhost:%d/%s", utils.DefaultDashboardPort, utils.RayDashboardGCSHealthPath)
238+
expected := []string{
239+
"until",
240+
fmt.Sprintf(
241+
utils.BasePythonHealthCommand,
242+
healthURL,
243+
utils.RayDashboardGCSHealthCheckTimeoutSeconds,
244+
),
245+
">/dev/null", "2>&1", ";",
246+
"do", "echo", strconv.Quote("Waiting for Ray Dashboard GCS to become healthy at http://127.0.0.1:8265 ..."), ";", "sleep", "2", ";", "done", ";",
247+
"if", "!", "ray", "job", "status", "--address", "http://127.0.0.1:8265", "testJobId", ">/dev/null", "2>&1", ";", "then",
248+
"ray", "job", "submit", "--address", "http://127.0.0.1:8265", "--no-wait",
249+
"--runtime-env-json", strconv.Quote(`{"test":"test"}`),
250+
"--metadata-json", strconv.Quote(`{"testKey":"testValue"}`),
251+
"--submission-id", "testJobId",
252+
"--entrypoint-num-cpus", "1.000000",
253+
"--entrypoint-num-gpus", "0.500000",
254+
"--entrypoint-resources", strconv.Quote(`{"Custom_1": 1, "Custom_2": 5.5}`),
255+
"--",
256+
"echo no quote 'single quote' \"double quote\"",
257+
";", "fi", ";",
258+
"ray", "job", "logs", "--address", "http://127.0.0.1:8265", "--follow", "testJobId",
259+
}
260+
command, err := BuildJobSubmitCommand(testRayJob, rayv1.SidecarMode)
261+
require.NoError(t, err)
262+
assert.Equal(t, expected, command)
263+
}
264+
216265
func TestBuildJobSubmitCommandWithK8sJobModeAndYAML(t *testing.T) {
217266
rayJobWithYAML := &rayv1.RayJob{
218267
Spec: rayv1.RayJobSpec{

‎ray-operator/controllers/ray/rayjob_controller.go‎

Lines changed: 71 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,19 @@ func getSubmitterContainer(rayJobInstance *rayv1.RayJob, rayClusterInstance *ray
588588
return corev1.Container{}, err
589589
}
590590

591+
// When SidecarSubmitterRestart feature gate is enabled, configure per-container restart rules.
592+
// This requires Kubernetes 1.35+ with ContainerRestartRules feature gate enabled.
593+
if features.Enabled(features.SidecarSubmitterRestart) {
594+
// OnFailure restarts only the submitter container (not all containers in the pod) on non-zero exit.
595+
// The non-zero exit can come from `ray job submit --no-wait` or `ray job logs --follow`.
596+
// The key case is `ray job logs --follow` exiting non-zero on a transient
597+
// WebSocket closure even when the Ray job is still running.
598+
// On restart, the submitter checks ray job status first.
599+
// Since the job is still running, the submitter simply reattaches to the log stream.
600+
// See BuildJobSubmitCommand in ray-operator/controllers/ray/common/job.go for more details.
601+
submitterContainer.RestartPolicy = ptr.To(corev1.ContainerRestartPolicyOnFailure)
602+
}
603+
591604
return submitterContainer, nil
592605
}
593606

@@ -992,8 +1005,8 @@ func (r *RayJobReconciler) constructRayClusterForRayJob(rayJobInstance *rayv1.Ra
9921005
rayCluster.Spec.HeadGroupSpec.Template.Spec.Containers, sidecar)
9931006
// In K8sJobMode, the submitter Job relies on the K8s Job backoffLimit API to restart if it fails.
9941007
// This mainly handles WebSocket connection failures caused by transient network issues.
995-
// In SidecarMode, however, the submitter container shares the same network namespace as the Ray dashboard,
996-
// so restarts are no longer needed.
1008+
// In SidecarMode, the pod-level RestartPolicy is set to Never.
1009+
// The submitter container may override this with per-container restart rules when the SidecarSubmitterRestart feature gate is enabled.
9971010
rayCluster.Spec.HeadGroupSpec.Template.Spec.RestartPolicy = corev1.RestartPolicyNever
9981011
}
9991012

@@ -1052,20 +1065,44 @@ func (r *RayJobReconciler) checkSubmitterAndUpdateStatusIfNeeded(ctx context.Con
10521065
return
10531066
}
10541067

1055-
shouldUpdate, submitterContainerStatus = checkSidecarContainerStatus(headPod)
1056-
if shouldUpdate {
1057-
logger.Info("The submitter sidecar container has failed. Attempting to transition the status to `Failed`.",
1058-
"Submitter sidecar container", submitterContainerStatus.Name, "Reason", submitterContainerStatus.State.Terminated.Reason, "Message", submitterContainerStatus.State.Terminated.Message)
1059-
rayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusFailed
1060-
// The submitter sidecar container needs to wait for the user code to finish and retrieve its logs.
1061-
// Therefore, a failed Submitter sidecar container indicates that the submission itself has failed or the user code has thrown an error.
1062-
// If the failure is due to user code, the JobStatus and Job message will be updated accordingly from the previous reconciliation.
1063-
if rayJob.Status.JobStatus == rayv1.JobStatusFailed {
1064-
rayJob.Status.Reason = rayv1.AppFailed
1065-
} else {
1066-
rayJob.Status.Reason = rayv1.SubmissionFailed
1067-
rayJob.Status.Message = fmt.Sprintf("Ray head pod container %s terminated with exit code %d: %s",
1068-
submitterContainerStatus.Name, submitterContainerStatus.State.Terminated.ExitCode, submitterContainerStatus.State.Terminated.Reason)
1068+
// Only check exit code when the feature gate is disabled.
1069+
// When SidecarSubmitterRestart is enabled, the container restarts on non-zero exit,
1070+
// so a terminated container is transient — not a permanent failure.
1071+
if !features.Enabled(features.SidecarSubmitterRestart) {
1072+
shouldUpdate, submitterContainerStatus = checkSidecarContainerStatus(headPod)
1073+
if shouldUpdate {
1074+
logger.Info("The submitter sidecar container has failed. Attempting to transition the status to `Failed`.",
1075+
"Submitter sidecar container", submitterContainerStatus.Name, "Reason", submitterContainerStatus.State.Terminated.Reason, "Message", submitterContainerStatus.State.Terminated.Message)
1076+
rayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusFailed
1077+
// The submitter sidecar container needs to wait for the user code to finish and retrieve its logs.
1078+
// Therefore, a failed Submitter sidecar container indicates that the submission itself has failed or the user code has thrown an error.
1079+
// If the failure is due to user code, the JobStatus and Job message will be updated accordingly from the previous reconciliation.
1080+
if rayJob.Status.JobStatus == rayv1.JobStatusFailed {
1081+
rayJob.Status.Reason = rayv1.AppFailed
1082+
} else {
1083+
rayJob.Status.Reason = rayv1.SubmissionFailed
1084+
rayJob.Status.Message = fmt.Sprintf("Ray head pod container %s terminated with exit code %d: %s",
1085+
submitterContainerStatus.Name, submitterContainerStatus.State.Terminated.ExitCode, submitterContainerStatus.State.Terminated.Reason)
1086+
}
1087+
}
1088+
} else {
1089+
submitterBackoffLimit := int32(2)
1090+
if rayJob.Spec.SubmitterConfig != nil && rayJob.Spec.SubmitterConfig.BackoffLimit != nil {
1091+
submitterBackoffLimit = *rayJob.Spec.SubmitterConfig.BackoffLimit
1092+
}
1093+
shouldUpdate, submitterContainerStatus = checkIsRestartCountExceeded(headPod, submitterBackoffLimit)
1094+
if shouldUpdate {
1095+
logger.Info("The submitter sidecar container has exceeded the max restart count. Attempting to transition the status to `Failed`.",
1096+
"Submitter sidecar container", submitterContainerStatus.Name,
1097+
"RestartCount", submitterContainerStatus.RestartCount)
1098+
rayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusFailed
1099+
if rayJob.Status.JobStatus == rayv1.JobStatusFailed {
1100+
rayJob.Status.Reason = rayv1.AppFailed
1101+
} else {
1102+
rayJob.Status.Reason = rayv1.SubmissionFailed
1103+
rayJob.Status.Message = fmt.Sprintf("Ray head pod submitter container %s terminated after exceeding the maximum restart count",
1104+
submitterContainerStatus.Name)
1105+
}
10691106
}
10701107
}
10711108

@@ -1149,6 +1186,24 @@ func checkSidecarContainerStatus(headPod *corev1.Pod) (bool, *corev1.ContainerSt
11491186
return false, nil
11501187
}
11511188

1189+
func checkIsRestartCountExceeded(headPod *corev1.Pod, backoffLimit int32) (bool, *corev1.ContainerStatus) {
1190+
for _, containerStatus := range headPod.Status.ContainerStatuses {
1191+
if containerStatus.Name == utils.SubmitterContainerName {
1192+
// Only check when the container has been terminated at least once.
1193+
// When the submitter container fails in a CrashLoopBackOff fashion, LastTerminationState.Terminated is populated
1194+
if containerStatus.LastTerminationState.Terminated != nil {
1195+
// If the container exited successfully, we do not fail the job.
1196+
if containerStatus.State.Terminated != nil && containerStatus.State.Terminated.ExitCode == 0 {
1197+
break
1198+
}
1199+
return containerStatus.RestartCount >= backoffLimit, &containerStatus
1200+
}
1201+
break
1202+
}
1203+
}
1204+
return false, nil
1205+
}
1206+
11521207
func checkActiveDeadlineAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob) bool {
11531208
logger := ctrl.LoggerFrom(ctx)
11541209
if rayJob.Spec.ActiveDeadlineSeconds == nil || time.Now().Before(rayJob.Status.StartTime.Add(time.Duration(*rayJob.Spec.ActiveDeadlineSeconds)*time.Second)) {

0 commit comments

Comments
 (0)