Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/v1beta1/sandbox_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ type SandboxStatus struct {
// A pod may have multiple IPs in dual-stack clusters.
// +optional
PodIPs []string `json:"podIPs,omitempty"`

// nodeName is the name of the node where the underlying pod is scheduled.
// +optional
NodeName string `json:"nodeName,omitempty"`
}

// +genclient
Expand Down
2 changes: 2 additions & 0 deletions controllers/sandbox_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,11 @@ func (r *SandboxReconciler) reconcileChildResources(ctx context.Context, sandbox
allErrors = errors.Join(allErrors, err)
if pod == nil {
sandbox.Status.PodIPs = nil
sandbox.Status.NodeName = ""
} else {
sandbox.Status.LabelSelector = fmt.Sprintf("%s=%s", sandboxLabel, NameHash(sandbox.Name))
sandbox.Status.PodIPs = podIPsFromStatus(pod.Status.PodIPs)
sandbox.Status.NodeName = pod.Spec.NodeName
}

// Reconcile Service
Expand Down
4 changes: 4 additions & 0 deletions controllers/sandbox_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@ func TestReconcile(t *testing.T) {
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{Name: "test-container"}},
NodeName: "node-1",
},
Status: corev1.PodStatus{
PodIPs: []corev1.PodIP{{IP: "10.244.0.5"}, {IP: "fd00::5"}},
Expand All @@ -589,6 +590,7 @@ func TestReconcile(t *testing.T) {
ServiceFQDN: "sandbox-name.sandbox-ns.svc.cluster.local",
LabelSelector: "agents.x-k8s.io/sandbox-name-hash=" + nameHash,
PodIPs: []string{"10.244.0.5", "fd00::5"},
NodeName: "node-1",
Conditions: []metav1.Condition{
{
Type: "Ready",
Expand Down Expand Up @@ -634,6 +636,7 @@ func TestReconcile(t *testing.T) {
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{Name: "test-container"}},
NodeName: "node-2",
},
Status: corev1.PodStatus{
PodIPs: []corev1.PodIP{{IP: "10.244.0.5"}},
Expand All @@ -654,6 +657,7 @@ func TestReconcile(t *testing.T) {
wantStatus: sandboxv1beta1.SandboxStatus{
LabelSelector: "agents.x-k8s.io/sandbox-name-hash=" + nameHash,
PodIPs: []string{"10.244.0.5"},
NodeName: "node-2",
Conditions: []metav1.Condition{
{
Type: "Ready",
Expand Down
1 change: 1 addition & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ _Appears in:_
| `conditions` _[Condition](https://kubernetes.io/docs/reference/generated/kubernetes-api/v/#condition-v1-meta) array_ | conditions defines the status conditions array | | Optional: \{\} <br /> |
| `selector` _string_ | selector is the label selector for pods. | | Optional: \{\} <br /> |
| `podIPs` _string array_ | podIPs are the IP addresses of the underlying pod.<br />A pod may have multiple IPs in dual-stack clusters. | | Optional: \{\} <br /> |
| `nodeName` _string_ | nodeName is the name of the node where the underlying pod is scheduled. | | Optional: \{\} <br /> |


#### ShutdownPolicy
Expand Down
59 changes: 58 additions & 1 deletion extensions/controllers/queue/simple_sandbox_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type SandboxKey types.NamespacedName
type SandboxQueue interface {
Add(warmPoolName string, item SandboxKey)
Get(warmPoolName string) (SandboxKey, bool)
GetWithStrategy(warmPoolName string, pick func([]SandboxKey) (SandboxKey, bool)) (SandboxKey, bool)
RemoveQueue(warmPoolName string)
RemoveItem(warmPoolName string, item SandboxKey)
}
Expand Down Expand Up @@ -58,6 +59,15 @@ func (s *SimpleSandboxQueue) Get(warmPoolName string) (SandboxKey, bool) {
return q.(*synchronizedQueue).Pop()
}

// GetWithStrategy pops an item from the specific warm pool's queue using a custom strategy.
func (s *SimpleSandboxQueue) GetWithStrategy(warmPoolName string, pick func([]SandboxKey) (SandboxKey, bool)) (SandboxKey, bool) {
q, ok := s.queues.Load(warmPoolName)
if !ok {
return SandboxKey{}, false
}
return q.(*synchronizedQueue).PopWithStrategy(pick)
}

// RemoveItem deletes a specific sandbox from a warm pool's queue.
func (s *SimpleSandboxQueue) RemoveItem(warmPoolName string, item SandboxKey) {
if q, ok := s.queues.Load(warmPoolName); ok {
Expand Down Expand Up @@ -139,8 +149,55 @@ func (q *synchronizedQueue) Pop() (SandboxKey, bool) {
return item, true
}

// PopWithStrategy applies the strategy function to pick an item from the queue,
// removes it thread-safely, and returns it.
func (q *synchronizedQueue) PopWithStrategy(pick func([]SandboxKey) (SandboxKey, bool)) (SandboxKey, bool) {
for {
q.mu.Lock()
if len(q.items) == 0 {
q.mu.Unlock()
return SandboxKey{}, false
}

// Snapshot the queue items
snapshot := make([]SandboxKey, len(q.items))
copy(snapshot, q.items)
q.mu.Unlock()

key, ok := pick(snapshot)
if !ok {
return SandboxKey{}, false
}

q.mu.Lock()
// Verify the key is still present in the queue
if _, exists := q.set[key]; !exists {
// The picked key was concurrently popped by another goroutine.
// Unlock and retry snapshot and pick.
q.mu.Unlock()
continue
}

// Find the picked key in q.items and remove it
for i, k := range q.items {
if k == key {
// Shift left and clear the tail slot
last := len(q.items) - 1
copy(q.items[i:], q.items[i+1:])
q.items[last] = SandboxKey{}
q.items = q.items[:last]
break
}
}
delete(q.set, key)
q.mu.Unlock()

return key, true
}
}
Comment thread
vicentefb marked this conversation as resolved.

// RemoveQueue completely deletes a warm pool's queue from the sync.Map
// to prevent memory leaks when WarmPools are deleted.
// to prevent memory leaks when SandboxTemplates or WarmPools are deleted.
func (s *SimpleSandboxQueue) RemoveQueue(warmPoolName string) {
s.queues.Delete(warmPoolName)
}
47 changes: 47 additions & 0 deletions extensions/controllers/queue/simple_sandbox_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,50 @@ func TestSimpleSandboxQueue_RemoveQueue_MemoryLeakFix(t *testing.T) {
t.Errorf("Expected queue to be completely removed, but it still existed")
}
}

func TestSimpleSandboxQueue_GetWithStrategy(t *testing.T) {
q := NewSimpleSandboxQueue()
hash := "template-hash-1"

key1 := SandboxKey{Namespace: "default", Name: "sb-1"}
key2 := SandboxKey{Namespace: "default", Name: "sb-2"}
key3 := SandboxKey{Namespace: "default", Name: "sb-3"}

q.Add(hash, key1)
q.Add(hash, key2)
q.Add(hash, key3)

// Custom strategy to pick key2 specifically
pickKey2 := func(items []SandboxKey) (SandboxKey, bool) {
for _, item := range items {
if item.Name == "sb-2" {
return item, true
}
}
return SandboxKey{}, false
}

// Pop with strategy
got, ok := q.GetWithStrategy(hash, pickKey2)
if !ok || got != key2 {
t.Errorf("Expected to pick %v, got %v (ok: %v)", key2, got, ok)
}

// First standard pop should be key1 (since key2 was removed)
got1, _ := q.Get(hash)
if got1 != key1 {
t.Errorf("Expected first remaining item to be %v, got %v", key1, got1)
}

// Second standard pop should be key3
got3, _ := q.Get(hash)
if got3 != key3 {
t.Errorf("Expected second remaining item to be %v, got %v", key3, got3)
}

// Queue should now be empty
_, ok3 := q.Get(hash)
if ok3 {
t.Errorf("Expected queue to be empty, but got an item")
}
}
112 changes: 106 additions & 6 deletions extensions/controllers/sandboxclaim_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,16 +611,37 @@ func (r *SandboxClaimReconciler) getCandidate(ctx context.Context, claim *extens
}
}()

var sandboxList v1beta1.SandboxList
if err := r.List(ctx, &sandboxList, client.InNamespace(claim.Namespace)); err != nil {
Comment thread
vicentefb marked this conversation as resolved.
logger.Error(err, "Failed to list sandboxes for smart selection node counting")
return nil, queue.SandboxKey{}, err
}

nodeCounts := make(map[string]int)
sbMap := make(map[string]*v1beta1.Sandbox)
for i := range sandboxList.Items {
sb := &sandboxList.Items[i]
sbMap[sb.Name] = sb
if _, isWarm := sb.Labels[warmPoolSandboxLabel]; !isWarm {
if sb.Status.NodeName != "" {
nodeCounts[sb.Status.NodeName]++
}
}
}

selector := &smartSelector{
claim: claim,
sbMap: sbMap,
nodeCounts: nodeCounts,
}

for {
adoptedKey, ok := r.WarmSandboxQueue.Get(claim.Spec.WarmPoolRef.Name)
adoptedKey, ok := r.WarmSandboxQueue.GetWithStrategy(claim.Spec.WarmPoolRef.Name, selector.pick)
if !ok {
return nil, queue.SandboxKey{}, nil
}

// 1. Hand the Kubernetes client the empty bucket
adopted := &v1beta1.Sandbox{}

// 2. Fetch from the Informer Cache
err := r.Get(ctx, client.ObjectKey{Namespace: adoptedKey.Namespace, Name: adoptedKey.Name}, adopted)
if err != nil {
if k8errors.IsNotFound(err) {
Expand All @@ -635,8 +656,6 @@ func (r *SandboxClaimReconciler) getCandidate(ctx context.Context, claim *extens

if err := verifySandboxCandidate(adopted, claim); err != nil {
logger.V(1).Info("sandbox candidate can't be adopted", "sandbox", adopted.Name, "warmPool", claim.Spec.WarmPoolRef.Name, "reason", err.Error())
// If it's a good sandbox just in the wrong namespace,
// add it to the skipped list so the defer block puts it back.
if errors.Is(err, ErrCrossNamespaceAdoption) {
skipped = append(skipped, adoptedKey)
}
Expand Down Expand Up @@ -1733,3 +1752,84 @@ func getWarmPoolName(obj metav1.Object) string {
}
return ""
}

type smartSelector struct {
claim *extensionsv1beta1.SandboxClaim
sbMap map[string]*v1beta1.Sandbox
nodeCounts map[string]int
}

func (s *smartSelector) pick(keys []queue.SandboxKey) (queue.SandboxKey, bool) {
var bestKey queue.SandboxKey
var bestSandbox *v1beta1.Sandbox
found := false

for _, key := range keys {
// Upfront filter mismatching namespaces to avoid map lookup
if key.Namespace != s.claim.Namespace {
continue
}

sb, exists := s.sbMap[key.Name]
if !exists {
// The sandbox doesn't exist in cache/cluster or r.List failed.
// Pop it to either discard it (if deleted) or perform a direct Get fallback.
return key, true
}

if err := verifySandboxCandidate(sb, s.claim); err != nil {
// The sandbox is invalid. Pop it to discard it.
return key, true
}

if !found {
bestKey = key
bestSandbox = sb
found = true
continue
}

sbReady := isSandboxReady(sb)
bestReady := isSandboxReady(bestSandbox)

if sbReady != bestReady {
if sbReady {
bestKey = key
bestSandbox = sb
}
continue
}

sbNode := sb.Status.NodeName
bestNode := bestSandbox.Status.NodeName

if sbNode == "" && bestNode != "" {
continue
}
if sbNode != "" && bestNode == "" {
bestKey = key
bestSandbox = sb
continue
}

if sbNode != "" && bestNode != "" && sbNode != bestNode {
sbCount := s.nodeCounts[sbNode]
bestCount := s.nodeCounts[bestNode]
if sbCount < bestCount {
bestKey = key
bestSandbox = sb
continue
}
if sbCount > bestCount {
continue
}
}

if sb.CreationTimestamp.Before(&bestSandbox.CreationTimestamp) {
bestKey = key
bestSandbox = sb
}
}

return bestKey, found
}
Loading