| /* |
| Copyright 2023 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package job |
| |
| import ( |
| "fmt" |
| "sort" |
| "time" |
| |
| v1 "k8s.io/api/core/v1" |
| "k8s.io/client-go/tools/cache" |
| "k8s.io/klog/v2" |
| apipod "k8s.io/kubernetes/pkg/api/v1/pod" |
| "k8s.io/utils/clock" |
| "k8s.io/utils/ptr" |
| ) |
| |
// backoffRecord tracks, per job (identified by key), how many pod failures
// have been observed since the last pod success, and when the most recent
// failure finished. It is the input for computing the remaining backoff
// before replacement pods may be created.
type backoffRecord struct {
	key string
	// failuresAfterLastSuccess is the number of consecutive pod failures
	// observed since the most recent pod success (0 when the last observed
	// event was a success).
	failuresAfterLastSuccess int32
	// lastFailureTime is the finish time of the most recent failed pod;
	// nil when there has been no failure since the last success.
	lastFailureTime *time.Time
}
| |
// backoffStore is a thin wrapper around a cache.Store that holds
// *backoffRecord entries indexed by their key (see backoffRecordKeyFunc).
type backoffStore struct {
	store cache.Store
}
| |
| func (s *backoffStore) updateBackoffRecord(record backoffRecord) error { |
| b, ok, err := s.store.GetByKey(record.key) |
| if err != nil { |
| return err |
| } |
| |
| if !ok { |
| err = s.store.Add(&record) |
| if err != nil { |
| return err |
| } |
| } else { |
| backoffRecord := b.(*backoffRecord) |
| backoffRecord.failuresAfterLastSuccess = record.failuresAfterLastSuccess |
| backoffRecord.lastFailureTime = record.lastFailureTime |
| } |
| |
| return nil |
| } |
| |
| func (s *backoffStore) removeBackoffRecord(jobId string) error { |
| b, ok, err := s.store.GetByKey(jobId) |
| if err != nil { |
| return err |
| } |
| |
| if ok { |
| err = s.store.Delete(b) |
| if err != nil { |
| return err |
| } |
| } |
| |
| return nil |
| |
| } |
| |
| func newBackoffStore() *backoffStore { |
| return &backoffStore{ |
| store: cache.NewStore(backoffRecordKeyFunc), |
| } |
| } |
| |
| var backoffRecordKeyFunc = func(obj interface{}) (string, error) { |
| if u, ok := obj.(*backoffRecord); ok { |
| return u.key, nil |
| } |
| return "", fmt.Errorf("could not find key for obj %#v", obj) |
| } |
| |
| func (backoffRecordStore *backoffStore) newBackoffRecord(key string, newSucceededPods []*v1.Pod, newFailedPods []*v1.Pod) backoffRecord { |
| var backoff *backoffRecord |
| |
| if b, exists, _ := backoffRecordStore.store.GetByKey(key); exists { |
| old := b.(*backoffRecord) |
| backoff = &backoffRecord{ |
| key: old.key, |
| failuresAfterLastSuccess: old.failuresAfterLastSuccess, |
| lastFailureTime: old.lastFailureTime, |
| } |
| } else { |
| backoff = &backoffRecord{ |
| key: key, |
| failuresAfterLastSuccess: 0, |
| lastFailureTime: nil, |
| } |
| } |
| |
| sortByFinishedTime(newSucceededPods) |
| sortByFinishedTime(newFailedPods) |
| |
| if len(newSucceededPods) == 0 { |
| if len(newFailedPods) == 0 { |
| return *backoff |
| } |
| |
| backoff.failuresAfterLastSuccess = backoff.failuresAfterLastSuccess + int32(len(newFailedPods)) |
| lastFailureTime := getFinishedTime(newFailedPods[len(newFailedPods)-1]) |
| backoff.lastFailureTime = &lastFailureTime |
| return *backoff |
| |
| } else { |
| if len(newFailedPods) == 0 { |
| backoff.failuresAfterLastSuccess = 0 |
| backoff.lastFailureTime = nil |
| return *backoff |
| } |
| |
| backoff.failuresAfterLastSuccess = 0 |
| backoff.lastFailureTime = nil |
| |
| lastSuccessTime := getFinishedTime(newSucceededPods[len(newSucceededPods)-1]) |
| for i := len(newFailedPods) - 1; i >= 0; i-- { |
| failedTime := getFinishedTime(newFailedPods[i]) |
| if !failedTime.After(lastSuccessTime) { |
| break |
| } |
| if backoff.lastFailureTime == nil { |
| backoff.lastFailureTime = &failedTime |
| } |
| backoff.failuresAfterLastSuccess += 1 |
| } |
| |
| return *backoff |
| |
| } |
| |
| } |
| |
| func sortByFinishedTime(pods []*v1.Pod) { |
| sort.Slice(pods, func(i, j int) bool { |
| p1 := pods[i] |
| p2 := pods[j] |
| p1FinishTime := getFinishedTime(p1) |
| p2FinishTime := getFinishedTime(p2) |
| |
| return p1FinishTime.Before(p2FinishTime) |
| }) |
| } |
| |
| // Returns the pod finish time using the following lookups: |
| // 1. if all containers finished, use the latest time |
| // 2. if the pod has Ready=False condition, use the last transition time |
| // 3. if the pod has been deleted, use the `deletionTimestamp - grace_period` to estimate the moment of deletion |
| // 4. fallback to pod's creation time |
| // |
| // Pods owned by Kubelet are marked with Ready=False condition when |
| // transitioning to terminal phase, thus being handled by (1.) or (2.). |
| // Orphaned pods are deleted by PodGC, thus being handled by (3.). |
| func getFinishedTime(p *v1.Pod) time.Time { |
| if finishTime := getFinishTimeFromContainers(p); finishTime != nil { |
| return *finishTime |
| } |
| if finishTime := getFinishTimeFromPodReadyFalseCondition(p); finishTime != nil { |
| return *finishTime |
| } |
| if finishTime := getFinishTimeFromDeletionTimestamp(p); finishTime != nil { |
| return *finishTime |
| } |
| // This should not happen in clusters with Kubelet and PodGC running. |
| return p.CreationTimestamp.Time |
| } |
| |
| func getFinishTimeFromContainers(p *v1.Pod) *time.Time { |
| var finishTime *time.Time |
| for _, containerState := range p.Status.ContainerStatuses { |
| if containerState.State.Terminated == nil { |
| return nil |
| } |
| if containerState.State.Terminated.FinishedAt.Time.IsZero() { |
| return nil |
| } |
| if finishTime == nil || finishTime.Before(containerState.State.Terminated.FinishedAt.Time) { |
| finishTime = &containerState.State.Terminated.FinishedAt.Time |
| } |
| } |
| return finishTime |
| } |
| |
| func getFinishTimeFromPodReadyFalseCondition(p *v1.Pod) *time.Time { |
| if _, c := apipod.GetPodCondition(&p.Status, v1.PodReady); c != nil && c.Status == v1.ConditionFalse && !c.LastTransitionTime.Time.IsZero() { |
| return &c.LastTransitionTime.Time |
| } |
| return nil |
| } |
| |
| func getFinishTimeFromDeletionTimestamp(p *v1.Pod) *time.Time { |
| if p.DeletionTimestamp != nil { |
| finishTime := p.DeletionTimestamp.Time.Add(-time.Duration(ptr.Deref(p.DeletionGracePeriodSeconds, 0)) * time.Second) |
| return &finishTime |
| } |
| return nil |
| } |
| |
| func (backoff backoffRecord) getRemainingTime(clock clock.WithTicker, defaultBackoff time.Duration, maxBackoff time.Duration) time.Duration { |
| return getRemainingTimeForFailuresCount(clock, defaultBackoff, maxBackoff, backoff.failuresAfterLastSuccess, backoff.lastFailureTime) |
| } |
| |
| // getRemainingTimePerIndex returns the remaining time left for a given index to |
| // create the replacement pods. The number of consecutive pod failures for the |
| // index is retrieved from the `job-index-failure-count` annotation of the |
| // last failed pod within the index (represented by `lastFailedPod`). |
| // The last failed pod is also used to determine the time of the last failure. |
| func getRemainingTimePerIndex(logger klog.Logger, clock clock.WithTicker, defaultBackoff time.Duration, maxBackoff time.Duration, lastFailedPod *v1.Pod) time.Duration { |
| if lastFailedPod == nil { |
| // There is no previous failed pod for this index |
| return time.Duration(0) |
| } |
| failureCount := getIndexAbsoluteFailureCount(logger, lastFailedPod) + 1 |
| lastFailureTime := getFinishedTime(lastFailedPod) |
| return getRemainingTimeForFailuresCount(clock, defaultBackoff, maxBackoff, failureCount, &lastFailureTime) |
| } |
| |
| func getRemainingTimeForFailuresCount(clock clock.WithTicker, defaultBackoff time.Duration, maxBackoff time.Duration, failuresCount int32, lastFailureTime *time.Time) time.Duration { |
| if failuresCount == 0 { |
| return 0 |
| } |
| |
| backoffDuration := defaultBackoff |
| for i := 1; i < int(failuresCount); i++ { |
| backoffDuration = backoffDuration * 2 |
| if backoffDuration >= maxBackoff { |
| backoffDuration = maxBackoff |
| break |
| } |
| } |
| |
| timeElapsedSinceLastFailure := clock.Since(*lastFailureTime) |
| |
| if backoffDuration < timeElapsedSinceLastFailure { |
| return 0 |
| } |
| |
| return backoffDuration - timeElapsedSinceLastFailure |
| } |