| /* |
| Copyright 2015 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package job |
| |
| import ( |
| "context" |
| "errors" |
| "fmt" |
| "math" |
| "sort" |
| "strconv" |
| "testing" |
| "time" |
| |
| "github.com/google/go-cmp/cmp" |
| "github.com/google/go-cmp/cmp/cmpopts" |
| batch "k8s.io/api/batch/v1" |
| v1 "k8s.io/api/core/v1" |
| apiequality "k8s.io/apimachinery/pkg/api/equality" |
| apierrors "k8s.io/apimachinery/pkg/api/errors" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| "k8s.io/apimachinery/pkg/runtime/schema" |
| "k8s.io/apimachinery/pkg/types" |
| "k8s.io/apimachinery/pkg/util/rand" |
| "k8s.io/apimachinery/pkg/util/sets" |
| "k8s.io/apimachinery/pkg/util/uuid" |
| "k8s.io/apimachinery/pkg/util/wait" |
| "k8s.io/apimachinery/pkg/watch" |
| "k8s.io/apiserver/pkg/util/feature" |
| "k8s.io/client-go/informers" |
| clientset "k8s.io/client-go/kubernetes" |
| "k8s.io/client-go/kubernetes/fake" |
| restclient "k8s.io/client-go/rest" |
| core "k8s.io/client-go/testing" |
| "k8s.io/client-go/tools/cache" |
| "k8s.io/client-go/util/workqueue" |
| featuregatetesting "k8s.io/component-base/featuregate/testing" |
| metricstestutil "k8s.io/component-base/metrics/testutil" |
| "k8s.io/klog/v2" |
| "k8s.io/klog/v2/ktesting" |
| _ "k8s.io/kubernetes/pkg/apis/core/install" |
| "k8s.io/kubernetes/pkg/controller" |
| "k8s.io/kubernetes/pkg/controller/job/metrics" |
| "k8s.io/kubernetes/pkg/controller/testutil" |
| "k8s.io/kubernetes/pkg/features" |
| "k8s.io/utils/clock" |
| clocktesting "k8s.io/utils/clock/testing" |
| "k8s.io/utils/ptr" |
| ) |
| |
| var realClock = &clock.RealClock{} |
| var alwaysReady = func() bool { return true } |
| |
| const fastSyncJobBatchPeriod = 10 * time.Millisecond |
| const fastJobApiBackoff = 10 * time.Millisecond |
| const fastRequeue = 10 * time.Millisecond |
| |
| // testFinishedAt represents a time one second after the zero time.Time value. |
| // It is used in test cases where we don't want pod back-off to kick in. |
| var testFinishedAt = metav1.NewTime((time.Time{}).Add(time.Second)) |
| |
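| // newJobWithName returns a Job fixture with the given name, parallelism, |
| // completions, backoff limit and completion mode, using a fixed selector |
| // and pod template. |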
| func newJobWithName(name string, parallelism, completions, backoffLimit int32, completionMode batch.CompletionMode) *batch.Job { |
| j := &batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: metav1.ObjectMeta{ |
| Name: name, |
| UID: uuid.NewUUID(), |
| Namespace: metav1.NamespaceDefault, |
| }, |
| Spec: batch.JobSpec{ |
| Selector: &metav1.LabelSelector{ |
| MatchLabels: map[string]string{"foo": "bar"}, |
| }, |
| Template: v1.PodTemplateSpec{ |
| ObjectMeta: metav1.ObjectMeta{ |
| Labels: map[string]string{ |
| "foo": "bar", |
| }, |
| }, |
| Spec: v1.PodSpec{ |
| Containers: []v1.Container{ |
| {Image: "foo/bar"}, |
| }, |
| }, |
| }, |
| }, |
| } |
| if completionMode != "" { |
| j.Spec.CompletionMode = &completionMode |
| } |
| // Special case: -1 for either completions or parallelism means leave the |
| // field nil (negative values are not allowed in practice by validation). |
| if completions >= 0 { |
| j.Spec.Completions = &completions |
| } else { |
| j.Spec.Completions = nil |
| } |
| if parallelism >= 0 { |
| j.Spec.Parallelism = ¶llelism |
| } else { |
| j.Spec.Parallelism = nil |
| } |
| j.Spec.BackoffLimit = &backoffLimit |
| |
| return j |
| } |
| |
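| // newJob is shorthand for newJobWithName with the name "foobar". |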
| func newJob(parallelism, completions, backoffLimit int32, completionMode batch.CompletionMode) *batch.Job { |
| return newJobWithName("foobar", parallelism, completions, backoffLimit, completionMode) |
| } |
| |
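| // newControllerFromClient creates a Job controller and shared informer |
| // factory backed by the given client, using the real clock. |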
| func newControllerFromClient(ctx context.Context, t *testing.T, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) (*Controller, informers.SharedInformerFactory) { |
| t.Helper() |
| return newControllerFromClientWithClock(ctx, t, kubeClient, resyncPeriod, realClock) |
| } |
| |
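| // newControllerFromClientWithClock is like newControllerFromClient, but the |
| // caller supplies the clock; the controller's pod control is replaced with |
| // a fake so tests can observe creations and deletions. |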
| func newControllerFromClientWithClock(ctx context.Context, t *testing.T, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, clock clock.WithTicker) (*Controller, informers.SharedInformerFactory) { |
| t.Helper() |
| sharedInformers := informers.NewSharedInformerFactory(kubeClient, resyncPeriod()) |
| jm, err := newControllerWithClock(ctx, sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), kubeClient, clock) |
| if err != nil { |
| t.Fatalf("Error creating Job controller: %v", err) |
| } |
| jm.podControl = &controller.FakePodControl{} |
| return jm, sharedInformers |
| } |
| |
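| // newPod returns a pod owned by the given job and labeled to match its |
| // selector. |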
| func newPod(name string, job *batch.Job) *v1.Pod { |
| return &v1.Pod{ |
| ObjectMeta: metav1.ObjectMeta{ |
| Name: name, |
| UID: types.UID(name), |
| Labels: job.Spec.Selector.MatchLabels, |
| Namespace: job.Namespace, |
| OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(job, controllerKind)}, |
| }, |
| } |
| } |
| |
| // newPodList creates count pods with the given phase for the given job. |
| func newPodList(count int, status v1.PodPhase, job *batch.Job) []*v1.Pod { |
| var pods []*v1.Pod |
| for i := 0; i < count; i++ { |
| p := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job) |
| p.Status = v1.PodStatus{Phase: status} |
| p.Status.ContainerStatuses = []v1.ContainerStatus{ |
| { |
| State: v1.ContainerState{ |
| Terminated: &v1.ContainerStateTerminated{ |
| FinishedAt: testFinishedAt, |
| }, |
| }, |
| }, |
| } |
| p.Finalizers = append(p.Finalizers, batch.JobTrackingFinalizer) |
| pods = append(pods, p) |
| } |
| return pods |
| } |
| |
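| // setPodsStatuses populates the pod indexer with pods for the given job in |
| // the requested phases. The first readyPods of the running pods are marked |
| // Ready, and terminatingPods running pods are created with a deletion |
| // timestamp. |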
| func setPodsStatuses(podIndexer cache.Indexer, job *batch.Job, pendingPods, activePods, succeededPods, failedPods, terminatingPods, readyPods int) { |
| for _, pod := range newPodList(pendingPods, v1.PodPending, job) { |
| podIndexer.Add(pod) |
| } |
| running := newPodList(activePods, v1.PodRunning, job) |
| for i, p := range running { |
| if i >= readyPods { |
| break |
| } |
| p.Status.Conditions = append(p.Status.Conditions, v1.PodCondition{ |
| Type: v1.PodReady, |
| Status: v1.ConditionTrue, |
| }) |
| } |
| for _, pod := range running { |
| podIndexer.Add(pod) |
| } |
| for _, pod := range newPodList(succeededPods, v1.PodSucceeded, job) { |
| podIndexer.Add(pod) |
| } |
| for _, pod := range newPodList(failedPods, v1.PodFailed, job) { |
| podIndexer.Add(pod) |
| } |
| terminating := newPodList(terminatingPods, v1.PodRunning, job) |
| for _, p := range terminating { |
| now := metav1.Now() |
| p.DeletionTimestamp = &now |
| } |
| for _, pod := range terminating { |
| podIndexer.Add(pod) |
| } |
| } |
| |
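| // setPodsStatusesWithIndexes populates the pod indexer with one pod per |
| // entry in status, setting its phase and, unless the index is noIndex, its |
| // completion index annotation and matching hostname. |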
| func setPodsStatusesWithIndexes(podIndexer cache.Indexer, job *batch.Job, status []indexPhase) { |
| for _, s := range status { |
| p := newPod(fmt.Sprintf("pod-%s", rand.String(10)), job) |
| p.Status = v1.PodStatus{Phase: s.Phase} |
| if s.Phase == v1.PodFailed || s.Phase == v1.PodSucceeded { |
| p.Status.ContainerStatuses = []v1.ContainerStatus{ |
| { |
| State: v1.ContainerState{ |
| Terminated: &v1.ContainerStateTerminated{ |
| FinishedAt: testFinishedAt, |
| }, |
| }, |
| }, |
| } |
| } |
| if s.Index != noIndex { |
| p.Annotations = map[string]string{ |
| batch.JobCompletionIndexAnnotation: s.Index, |
| } |
| p.Spec.Hostname = fmt.Sprintf("%s-%s", job.Name, s.Index) |
| } |
| p.Finalizers = append(p.Finalizers, batch.JobTrackingFinalizer) |
| podIndexer.Add(p) |
| } |
| } |
| |
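| // jobInitialStatus describes the counters and start time preloaded into a |
| // Job's status before running a sync. |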
| type jobInitialStatus struct { |
| active int |
| succeed int |
| failed int |
| startTime *time.Time |
| } |
| |
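| // TestControllerSyncJob runs syncJob against table-driven scenarios and |
| // validates the resulting pod creations and deletions, status counters, |
| // and conditions. |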
| func TestControllerSyncJob(t *testing.T) { |
| _, ctx := ktesting.NewTestContext(t) |
| jobConditionComplete := batch.JobComplete |
| jobConditionFailed := batch.JobFailed |
| jobConditionSuspended := batch.JobSuspended |
| referenceTime := time.Now() |
| |
| testCases := map[string]struct { |
| // job setup |
| parallelism int32 |
| completions int32 |
| backoffLimit int32 |
| deleting bool |
| podLimit int |
| completionMode batch.CompletionMode |
| wasSuspended bool |
| suspend bool |
| podReplacementPolicy *batch.PodReplacementPolicy |
| podFailurePolicy *batch.PodFailurePolicy |
| initialStatus *jobInitialStatus |
| backoffRecord *backoffRecord |
| controllerTime *time.Time |
| |
| // pod setup |
| |
| // If a podControllerError is set, finalizers cannot be removed. |
| // This means there is no status update, so the failedPods and |
| // succeededPods counters cannot be incremented. |
| podControllerError error |
| pendingPods int |
| activePods int |
| readyPods int |
| succeededPods int |
| failedPods int |
| terminatingPods int |
| podsWithIndexes []indexPhase |
| fakeExpectationAtCreation int32 // negative: ExpectDeletions, positive: ExpectCreations |
| |
| // expectations |
| expectedCreations int32 |
| expectedDeletions int32 |
| expectedActive int32 |
| expectedReady *int32 |
| expectedSucceeded int32 |
| expectedCompletedIdxs string |
| expectedFailed int32 |
| expectedTerminating *int32 |
| expectedCondition *batch.JobConditionType |
| expectedConditionStatus v1.ConditionStatus |
| expectedConditionReason string |
| expectedCreatedIndexes sets.Set[int] |
| expectedPodPatches int |
| |
| // features |
| podIndexLabelDisabled bool |
| jobPodReplacementPolicy bool |
| jobPodFailurePolicy bool |
| }{ |
| "job start": { |
| parallelism: 2, |
| completions: 5, |
| backoffLimit: 6, |
| expectedCreations: 2, |
| expectedActive: 2, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "WQ job start": { |
| parallelism: 2, |
| completions: -1, |
| backoffLimit: 6, |
| expectedCreations: 2, |
| expectedActive: 2, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "pending pods": { |
| parallelism: 2, |
| completions: 5, |
| backoffLimit: 6, |
| pendingPods: 2, |
| expectedActive: 2, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "correct # of pods": { |
| parallelism: 3, |
| completions: 5, |
| backoffLimit: 6, |
| activePods: 3, |
| readyPods: 2, |
| expectedActive: 3, |
| expectedReady: ptr.To[int32](2), |
| }, |
| "WQ job: correct # of pods": { |
| parallelism: 2, |
| completions: -1, |
| backoffLimit: 6, |
| activePods: 2, |
| expectedActive: 2, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "too few active pods": { |
| parallelism: 2, |
| completions: 5, |
| backoffLimit: 6, |
| activePods: 1, |
| succeededPods: 1, |
| expectedCreations: 1, |
| expectedActive: 2, |
| expectedSucceeded: 1, |
| expectedPodPatches: 1, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "WQ job: recreate pods when failed": { |
| parallelism: 1, |
| completions: -1, |
| backoffLimit: 6, |
| activePods: 1, |
| failedPods: 1, |
| podReplacementPolicy: podReplacementPolicy(batch.Failed), |
| jobPodReplacementPolicy: true, |
| terminatingPods: 1, |
| expectedTerminating: ptr.To[int32](1), |
| expectedReady: ptr.To[int32](0), |
| // Removes finalizer and deletes one failed pod |
| expectedPodPatches: 1, |
| expectedFailed: 1, |
| expectedActive: 1, |
| }, |
| "WQ job: turn on PodReplacementPolicy but not set PodReplacementPolicy": { |
| parallelism: 1, |
| completions: 1, |
| backoffLimit: 6, |
| activePods: 1, |
| failedPods: 1, |
| jobPodReplacementPolicy: true, |
| expectedTerminating: ptr.To[int32](1), |
| expectedReady: ptr.To[int32](0), |
| terminatingPods: 1, |
| expectedActive: 1, |
| expectedPodPatches: 2, |
| expectedFailed: 2, |
| }, |
| "WQ job: recreate pods when terminating or failed": { |
| parallelism: 1, |
| completions: -1, |
| backoffLimit: 6, |
| activePods: 1, |
| failedPods: 1, |
| podReplacementPolicy: podReplacementPolicy(batch.TerminatingOrFailed), |
| jobPodReplacementPolicy: true, |
| terminatingPods: 1, |
| expectedTerminating: ptr.To[int32](1), |
| expectedReady: ptr.To[int32](0), |
| expectedActive: 1, |
| expectedPodPatches: 2, |
| expectedFailed: 2, |
| }, |
| "more terminating pods than parallelism": { |
| parallelism: 1, |
| completions: 1, |
| backoffLimit: 6, |
| activePods: 2, |
| failedPods: 0, |
| terminatingPods: 4, |
| podReplacementPolicy: podReplacementPolicy(batch.Failed), |
| jobPodReplacementPolicy: true, |
| expectedTerminating: ptr.To[int32](4), |
| expectedReady: ptr.To[int32](0), |
| expectedActive: 1, |
| expectedDeletions: 1, |
| expectedPodPatches: 1, |
| }, |
| "more terminating pods than parallelism; PodFailurePolicy used": { |
| // Repro for https://github.com/kubernetes/kubernetes/issues/122235 |
| parallelism: 1, |
| completions: 1, |
| backoffLimit: 6, |
| activePods: 2, |
| failedPods: 0, |
| terminatingPods: 4, |
| jobPodFailurePolicy: true, |
| podFailurePolicy: &batch.PodFailurePolicy{}, |
| expectedTerminating: nil, |
| expectedReady: ptr.To[int32](0), |
| expectedActive: 1, |
| expectedDeletions: 1, |
| expectedPodPatches: 1, |
| }, |
| "too few active pods and active back-off": { |
| parallelism: 1, |
| completions: 1, |
| backoffLimit: 6, |
| backoffRecord: &backoffRecord{ |
| failuresAfterLastSuccess: 1, |
| lastFailureTime: &referenceTime, |
| }, |
| initialStatus: &jobInitialStatus{ |
| startTime: func() *time.Time { |
| now := time.Now() |
| return &now |
| }(), |
| }, |
| activePods: 0, |
| succeededPods: 0, |
| expectedCreations: 0, |
| expectedActive: 0, |
| expectedSucceeded: 0, |
| expectedPodPatches: 0, |
| expectedReady: ptr.To[int32](0), |
| controllerTime: &referenceTime, |
| }, |
| "too few active pods and no back-offs": { |
| parallelism: 1, |
| completions: 1, |
| backoffLimit: 6, |
| backoffRecord: &backoffRecord{ |
| failuresAfterLastSuccess: 0, |
| lastFailureTime: &referenceTime, |
| }, |
| activePods: 0, |
| succeededPods: 0, |
| expectedCreations: 1, |
| expectedActive: 1, |
| expectedSucceeded: 0, |
| expectedPodPatches: 0, |
| expectedReady: ptr.To[int32](0), |
| controllerTime: &referenceTime, |
| }, |
| "too few active pods with a dynamic job": { |
| parallelism: 2, |
| completions: -1, |
| backoffLimit: 6, |
| activePods: 1, |
| expectedCreations: 1, |
| expectedActive: 2, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "too few active pods, with controller error": { |
| parallelism: 2, |
| completions: 5, |
| backoffLimit: 6, |
| podControllerError: fmt.Errorf("fake error"), |
| activePods: 1, |
| succeededPods: 1, |
| expectedCreations: 1, |
| expectedActive: 1, |
| expectedSucceeded: 0, |
| expectedPodPatches: 1, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "too many active pods": { |
| parallelism: 2, |
| completions: 5, |
| backoffLimit: 6, |
| activePods: 3, |
| expectedDeletions: 1, |
| expectedActive: 2, |
| expectedPodPatches: 1, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "too many active pods, with controller error": { |
| parallelism: 2, |
| completions: 5, |
| backoffLimit: 6, |
| podControllerError: fmt.Errorf("fake error"), |
| activePods: 3, |
| expectedDeletions: 0, |
| expectedPodPatches: 1, |
| expectedActive: 3, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "failed + succeed pods: reset backoff delay": { |
| parallelism: 2, |
| completions: 5, |
| backoffLimit: 6, |
| activePods: 1, |
| succeededPods: 1, |
| failedPods: 1, |
| expectedCreations: 1, |
| expectedActive: 2, |
| expectedSucceeded: 1, |
| expectedFailed: 1, |
| expectedPodPatches: 2, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "new failed pod": { |
| parallelism: 2, |
| completions: 5, |
| backoffLimit: 6, |
| activePods: 1, |
| failedPods: 1, |
| expectedCreations: 1, |
| expectedActive: 2, |
| expectedFailed: 1, |
| expectedPodPatches: 1, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "no new pod; possible finalizer update of failed pod": { |
| parallelism: 1, |
| completions: 1, |
| backoffLimit: 6, |
| initialStatus: &jobInitialStatus{ |
| active: 1, |
| succeed: 0, |
| failed: 1, |
| }, |
| activePods: 1, |
| failedPods: 0, |
| expectedCreations: 0, |
| expectedActive: 1, |
| expectedFailed: 1, |
| expectedPodPatches: 0, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "only new failed pod with controller error": { |
| parallelism: 2, |
| completions: 5, |
| backoffLimit: 6, |
| podControllerError: fmt.Errorf("fake error"), |
| activePods: 1, |
| failedPods: 1, |
| expectedCreations: 1, |
| expectedActive: 1, |
| expectedFailed: 0, |
| expectedPodPatches: 1, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "job finish": { |
| parallelism: 2, |
| completions: 5, |
| backoffLimit: 6, |
| succeededPods: 5, |
| expectedSucceeded: 5, |
| expectedCondition: &jobConditionComplete, |
| expectedConditionStatus: v1.ConditionTrue, |
| expectedPodPatches: 5, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "WQ job finishing": { |
| parallelism: 2, |
| completions: -1, |
| backoffLimit: 6, |
| activePods: 1, |
| succeededPods: 1, |
| expectedActive: 1, |
| expectedSucceeded: 1, |
| expectedPodPatches: 1, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "WQ job all finished": { |
| parallelism: 2, |
| completions: -1, |
| backoffLimit: 6, |
| succeededPods: 2, |
| expectedSucceeded: 2, |
| expectedCondition: &jobConditionComplete, |
| expectedConditionStatus: v1.ConditionTrue, |
| expectedPodPatches: 2, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "WQ job all finished despite one failure": { |
| parallelism: 2, |
| completions: -1, |
| backoffLimit: 6, |
| succeededPods: 1, |
| failedPods: 1, |
| expectedSucceeded: 1, |
| expectedFailed: 1, |
| expectedCondition: &jobConditionComplete, |
| expectedConditionStatus: v1.ConditionTrue, |
| expectedPodPatches: 2, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "more active pods than parallelism": { |
| parallelism: 2, |
| completions: 5, |
| backoffLimit: 6, |
| activePods: 10, |
| expectedDeletions: 8, |
| expectedActive: 2, |
| expectedPodPatches: 8, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "more active pods than remaining completions": { |
| parallelism: 3, |
| completions: 4, |
| backoffLimit: 6, |
| activePods: 3, |
| succeededPods: 2, |
| expectedDeletions: 1, |
| expectedActive: 2, |
| expectedSucceeded: 2, |
| expectedPodPatches: 3, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "status change": { |
| parallelism: 2, |
| completions: 5, |
| backoffLimit: 6, |
| activePods: 2, |
| succeededPods: 2, |
| expectedActive: 2, |
| expectedSucceeded: 2, |
| expectedPodPatches: 2, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "deleting job": { |
| parallelism: 2, |
| completions: 5, |
| backoffLimit: 6, |
| deleting: true, |
| pendingPods: 1, |
| activePods: 1, |
| succeededPods: 1, |
| expectedActive: 2, |
| expectedSucceeded: 1, |
| expectedPodPatches: 3, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "limited pods": { |
| parallelism: 100, |
| completions: 200, |
| backoffLimit: 6, |
| podLimit: 10, |
| expectedCreations: 10, |
| expectedActive: 10, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "too many job failures": { |
| parallelism: 2, |
| completions: 5, |
| deleting: true, |
| failedPods: 1, |
| expectedFailed: 1, |
| expectedCondition: &jobConditionFailed, |
| expectedConditionStatus: v1.ConditionTrue, |
| expectedConditionReason: "BackoffLimitExceeded", |
| expectedPodPatches: 1, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "job failures, unsatisfied expectations": { |
| parallelism: 2, |
| completions: 5, |
| deleting: true, |
| failedPods: 1, |
| fakeExpectationAtCreation: 1, |
| expectedFailed: 1, |
| expectedPodPatches: 1, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "indexed job start": { |
| parallelism: 2, |
| completions: 5, |
| backoffLimit: 6, |
| completionMode: batch.IndexedCompletion, |
| expectedCreations: 2, |
| expectedActive: 2, |
| expectedCreatedIndexes: sets.New(0, 1), |
| expectedReady: ptr.To[int32](0), |
| }, |
| "indexed job with some pods deleted, podReplacementPolicy Failed": { |
| parallelism: 2, |
| completions: 5, |
| backoffLimit: 6, |
| completionMode: batch.IndexedCompletion, |
| expectedCreations: 1, |
| expectedActive: 1, |
| expectedCreatedIndexes: sets.New(0), |
| podReplacementPolicy: podReplacementPolicy(batch.Failed), |
| jobPodReplacementPolicy: true, |
| terminatingPods: 1, |
| expectedTerminating: ptr.To[int32](1), |
| expectedReady: ptr.To[int32](0), |
| }, |
| "indexed job with some pods deleted, podReplacementPolicy TerminatingOrFailed": { |
| parallelism: 2, |
| completions: 5, |
| backoffLimit: 6, |
| completionMode: batch.IndexedCompletion, |
| expectedCreations: 2, |
| expectedActive: 2, |
| expectedCreatedIndexes: sets.New(0, 1), |
| podReplacementPolicy: podReplacementPolicy(batch.TerminatingOrFailed), |
| jobPodReplacementPolicy: true, |
| terminatingPods: 1, |
| expectedTerminating: ptr.To[int32](1), |
| expectedReady: ptr.To[int32](0), |
| expectedPodPatches: 1, |
| }, |
| "indexed job completed": { |
| parallelism: 2, |
| completions: 3, |
| backoffLimit: 6, |
| completionMode: batch.IndexedCompletion, |
| podsWithIndexes: []indexPhase{ |
| {"0", v1.PodSucceeded}, |
| {"1", v1.PodFailed}, |
| {"1", v1.PodSucceeded}, |
| {"2", v1.PodSucceeded}, |
| }, |
| expectedSucceeded: 3, |
| expectedFailed: 1, |
| expectedCompletedIdxs: "0-2", |
| expectedCondition: &jobConditionComplete, |
| expectedConditionStatus: v1.ConditionTrue, |
| expectedPodPatches: 4, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "indexed job repeated completed index": { |
| parallelism: 2, |
| completions: 3, |
| backoffLimit: 6, |
| completionMode: batch.IndexedCompletion, |
| podsWithIndexes: []indexPhase{ |
| {"0", v1.PodSucceeded}, |
| {"1", v1.PodSucceeded}, |
| {"1", v1.PodSucceeded}, |
| }, |
| expectedCreations: 1, |
| expectedActive: 1, |
| expectedSucceeded: 2, |
| expectedCompletedIdxs: "0,1", |
| expectedCreatedIndexes: sets.New(2), |
| expectedPodPatches: 3, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "indexed job some running and completed pods": { |
| parallelism: 8, |
| completions: 20, |
| backoffLimit: 6, |
| completionMode: batch.IndexedCompletion, |
| podsWithIndexes: []indexPhase{ |
| {"0", v1.PodRunning}, |
| {"2", v1.PodSucceeded}, |
| {"3", v1.PodPending}, |
| {"4", v1.PodSucceeded}, |
| {"5", v1.PodSucceeded}, |
| {"7", v1.PodSucceeded}, |
| {"8", v1.PodSucceeded}, |
| {"9", v1.PodSucceeded}, |
| }, |
| expectedCreations: 6, |
| expectedActive: 8, |
| expectedSucceeded: 6, |
| expectedCompletedIdxs: "2,4,5,7-9", |
| expectedCreatedIndexes: sets.New(1, 6, 10, 11, 12, 13), |
| expectedPodPatches: 6, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "indexed job some failed pods": { |
| parallelism: 3, |
| completions: 4, |
| backoffLimit: 6, |
| completionMode: batch.IndexedCompletion, |
| podsWithIndexes: []indexPhase{ |
| {"0", v1.PodFailed}, |
| {"1", v1.PodPending}, |
| {"2", v1.PodFailed}, |
| }, |
| expectedCreations: 2, |
| expectedActive: 3, |
| expectedFailed: 2, |
| expectedCreatedIndexes: sets.New(0, 2), |
| expectedPodPatches: 2, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "indexed job some pods without index": { |
| parallelism: 2, |
| completions: 5, |
| backoffLimit: 6, |
| completionMode: batch.IndexedCompletion, |
| activePods: 1, |
| succeededPods: 1, |
| failedPods: 1, |
| podsWithIndexes: []indexPhase{ |
| {"invalid", v1.PodRunning}, |
| {"invalid", v1.PodSucceeded}, |
| {"invalid", v1.PodFailed}, |
| {"invalid", v1.PodPending}, |
| {"0", v1.PodSucceeded}, |
| {"1", v1.PodRunning}, |
| {"2", v1.PodRunning}, |
| }, |
| expectedDeletions: 3, |
| expectedActive: 2, |
| expectedSucceeded: 1, |
| expectedFailed: 0, |
| expectedCompletedIdxs: "0", |
| expectedPodPatches: 8, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "indexed job repeated indexes": { |
| parallelism: 5, |
| completions: 5, |
| backoffLimit: 6, |
| completionMode: batch.IndexedCompletion, |
| succeededPods: 1, |
| failedPods: 1, |
| podsWithIndexes: []indexPhase{ |
| {"invalid", v1.PodRunning}, |
| {"0", v1.PodSucceeded}, |
| {"1", v1.PodRunning}, |
| {"2", v1.PodRunning}, |
| {"2", v1.PodPending}, |
| }, |
| expectedCreations: 0, |
| expectedDeletions: 2, |
| expectedActive: 2, |
| expectedSucceeded: 1, |
| expectedCompletedIdxs: "0", |
| expectedPodPatches: 5, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "indexed job with indexes outside of range": { |
| parallelism: 2, |
| completions: 5, |
| backoffLimit: 6, |
| completionMode: batch.IndexedCompletion, |
| podsWithIndexes: []indexPhase{ |
| {"0", v1.PodSucceeded}, |
| {"5", v1.PodRunning}, |
| {"6", v1.PodSucceeded}, |
| {"7", v1.PodPending}, |
| {"8", v1.PodFailed}, |
| }, |
| expectedCreations: 0, // only one of creations and deletions can happen in a sync |
| expectedSucceeded: 1, |
| expectedDeletions: 2, |
| expectedCompletedIdxs: "0", |
| expectedActive: 0, |
| expectedFailed: 0, |
| expectedPodPatches: 5, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "suspending a job with satisfied expectations": { |
| // Suspended Job should delete active pods when expectations are |
| // satisfied. |
| suspend: true, |
| parallelism: 2, |
| activePods: 2, // parallelism == active, expectations satisfied |
| completions: 4, |
| backoffLimit: 6, |
| expectedCreations: 0, |
| expectedDeletions: 2, |
| expectedActive: 0, |
| expectedCondition: &jobConditionSuspended, |
| expectedConditionStatus: v1.ConditionTrue, |
| expectedConditionReason: "JobSuspended", |
| expectedPodPatches: 2, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "suspending a job with unsatisfied expectations": { |
| // Unlike the previous test, we expect the controller to NOT suspend the |
| // Job in the syncJob call because the controller will wait for |
| // expectations to be satisfied first. The next syncJob call (not tested |
| // here) will be the same as the previous test. |
| suspend: true, |
| parallelism: 2, |
| activePods: 3, // active > parallelism, expectations unsatisfied |
| fakeExpectationAtCreation: -1, // the controller is expecting a deletion |
| completions: 4, |
| backoffLimit: 6, |
| expectedCreations: 0, |
| expectedDeletions: 0, |
| expectedActive: 3, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "resuming a suspended job": { |
| wasSuspended: true, |
| suspend: false, |
| parallelism: 2, |
| completions: 4, |
| backoffLimit: 6, |
| expectedCreations: 2, |
| expectedDeletions: 0, |
| expectedActive: 2, |
| expectedCondition: &jobConditionSuspended, |
| expectedConditionStatus: v1.ConditionFalse, |
| expectedConditionReason: "JobResumed", |
| expectedReady: ptr.To[int32](0), |
| }, |
| "suspending a deleted job": { |
| // We would normally expect the active pods to be deleted (see a few test |
| // cases above), but since this job is being deleted, we don't expect |
| // anything changed here from before the job was suspended. The |
| // JobSuspended condition is also missing. |
| suspend: true, |
| deleting: true, |
| parallelism: 2, |
| activePods: 2, // parallelism == active, expectations satisfied |
| completions: 4, |
| backoffLimit: 6, |
| expectedCreations: 0, |
| expectedDeletions: 0, |
| expectedActive: 2, |
| expectedPodPatches: 2, |
| expectedReady: ptr.To[int32](0), |
| }, |
| "indexed job with podIndexLabel feature disabled": { |
| parallelism: 2, |
| completions: 5, |
| backoffLimit: 6, |
| completionMode: batch.IndexedCompletion, |
| expectedCreations: 2, |
| expectedActive: 2, |
| expectedCreatedIndexes: sets.New(0, 1), |
| podIndexLabelDisabled: true, |
| expectedReady: ptr.To[int32](0), |
| }, |
| } |
| |
| for name, tc := range testCases { |
| t.Run(name, func(t *testing.T) { |
| logger, _ := ktesting.NewTestContext(t) |
| defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodIndexLabel, !tc.podIndexLabelDisabled)() |
| defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.jobPodReplacementPolicy)() |
| defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.jobPodFailurePolicy)() |
| // job manager setup |
| clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) |
| |
| var fakeClock clock.WithTicker |
| if tc.controllerTime != nil { |
| fakeClock = clocktesting.NewFakeClock(*tc.controllerTime) |
| } else { |
| fakeClock = clocktesting.NewFakeClock(time.Now()) |
| } |
| |
| manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, t, clientSet, controller.NoResyncPeriodFunc, fakeClock) |
| fakePodControl := controller.FakePodControl{Err: tc.podControllerError, CreateLimit: tc.podLimit} |
| manager.podControl = &fakePodControl |
| manager.podStoreSynced = alwaysReady |
| manager.jobStoreSynced = alwaysReady |
| |
| // job & pods setup |
| job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, tc.completionMode) |
| job.Spec.Suspend = ptr.To(tc.suspend) |
| if tc.jobPodReplacementPolicy { |
| job.Spec.PodReplacementPolicy = tc.podReplacementPolicy |
| } |
| if tc.jobPodFailurePolicy { |
| job.Spec.PodFailurePolicy = tc.podFailurePolicy |
| } |
| if tc.initialStatus != nil { |
| startTime := metav1.Now() |
| job.Status.StartTime = &startTime |
| job.Status.Active = int32(tc.initialStatus.active) |
| job.Status.Succeeded = int32(tc.initialStatus.succeed) |
| job.Status.Failed = int32(tc.initialStatus.failed) |
| if tc.initialStatus.startTime != nil { |
| startTime := metav1.NewTime(*tc.initialStatus.startTime) |
| job.Status.StartTime = &startTime |
| } |
| } |
| |
| key, err := controller.KeyFunc(job) |
| if err != nil { |
| t.Errorf("Unexpected error getting job key: %v", err) |
| } |
| |
| if tc.backoffRecord != nil { |
| tc.backoffRecord.key = key |
| manager.podBackoffStore.updateBackoffRecord(*tc.backoffRecord) |
| } |
| if tc.fakeExpectationAtCreation < 0 { |
| manager.expectations.ExpectDeletions(logger, key, int(-tc.fakeExpectationAtCreation)) |
| } else if tc.fakeExpectationAtCreation > 0 { |
| manager.expectations.ExpectCreations(logger, key, int(tc.fakeExpectationAtCreation)) |
| } |
| if tc.wasSuspended { |
| job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobSuspended, v1.ConditionTrue, "JobSuspended", "Job suspended", realClock.Now())) |
| } |
| if tc.deleting { |
| now := metav1.Now() |
| job.DeletionTimestamp = &now |
| } |
| sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) |
| podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() |
| setPodsStatuses(podIndexer, job, tc.pendingPods, tc.activePods, tc.succeededPods, tc.failedPods, tc.terminatingPods, tc.readyPods) |
| setPodsStatusesWithIndexes(podIndexer, job, tc.podsWithIndexes) |
| |
| actual := job |
| manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { |
| actual = job |
| return job, nil |
| } |
| |
| // run |
| err = manager.syncJob(ctx, testutil.GetKey(job, t)) |
| |
| // We need to requeue the syncJob task if there is a podControl error. |
| if tc.podControllerError != nil { |
| if err == nil { |
| t.Error("Syncing jobs expected to return error on podControl exception") |
| } |
| } else if tc.podLimit != 0 && fakePodControl.CreateCallCount > tc.podLimit { |
| if err == nil { |
| t.Error("Syncing jobs expected to return error when reached the podControl limit") |
| } |
| } else if err != nil { |
| t.Errorf("Unexpected error when syncing jobs: %v", err) |
| } |
| // validate created/deleted pods |
| if int32(len(fakePodControl.Templates)) != tc.expectedCreations { |
| t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", tc.expectedCreations, len(fakePodControl.Templates)) |
| } |
| if tc.completionMode == batch.IndexedCompletion { |
| checkIndexedJobPods(t, &fakePodControl, tc.expectedCreatedIndexes, job.Name, tc.podIndexLabelDisabled) |
| } else { |
| for _, p := range fakePodControl.Templates { |
| // The fake pod control does not set GenerateName from the owner reference. |
| if p.GenerateName != "" { |
| t.Errorf("Got pod generate name %s, want %s", p.GenerateName, "") |
| } |
| if p.Spec.Hostname != "" { |
| t.Errorf("Got pod hostname %q, want none", p.Spec.Hostname) |
| } |
| } |
| } |
| if int32(len(fakePodControl.DeletePodName)) != tc.expectedDeletions { |
| t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", tc.expectedDeletions, len(fakePodControl.DeletePodName)) |
| } |
| // Each create should have an accompanying ControllerRef. |
| if len(fakePodControl.ControllerRefs) != int(tc.expectedCreations) { |
| t.Errorf("Unexpected number of ControllerRefs. Expected %d, saw %d\n", tc.expectedCreations, len(fakePodControl.ControllerRefs)) |
| } |
| // Make sure the ControllerRefs are correct. |
| for _, controllerRef := range fakePodControl.ControllerRefs { |
| if got, want := controllerRef.APIVersion, "batch/v1"; got != want { |
| t.Errorf("controllerRef.APIVersion = %q, want %q", got, want) |
| } |
| if got, want := controllerRef.Kind, "Job"; got != want { |
| t.Errorf("controllerRef.Kind = %q, want %q", got, want) |
| } |
| if got, want := controllerRef.Name, job.Name; got != want { |
| t.Errorf("controllerRef.Name = %q, want %q", got, want) |
| } |
| if got, want := controllerRef.UID, job.UID; got != want { |
| t.Errorf("controllerRef.UID = %q, want %q", got, want) |
| } |
| if controllerRef.Controller == nil || *controllerRef.Controller != true { |
| t.Errorf("controllerRef.Controller is not set to true") |
| } |
| } |
| // validate status |
| if actual.Status.Active != tc.expectedActive { |
| t.Errorf("Unexpected number of active pods. Expected %d, saw %d\n", tc.expectedActive, actual.Status.Active) |
| } |
| if diff := cmp.Diff(tc.expectedReady, actual.Status.Ready); diff != "" { |
| t.Errorf("Unexpected number of ready pods (-want,+got): %s", diff) |
| } |
| if actual.Status.Succeeded != tc.expectedSucceeded { |
| t.Errorf("Unexpected number of succeeded pods. Expected %d, saw %d\n", tc.expectedSucceeded, actual.Status.Succeeded) |
| } |
| if diff := cmp.Diff(tc.expectedCompletedIdxs, actual.Status.CompletedIndexes); diff != "" { |
| t.Errorf("Unexpected completed indexes (-want,+got):\n%s", diff) |
| } |
| if actual.Status.Failed != tc.expectedFailed { |
| t.Errorf("Unexpected number of failed pods. Expected %d, saw %d\n", tc.expectedFailed, actual.Status.Failed) |
| } |
| if diff := cmp.Diff(tc.expectedTerminating, actual.Status.Terminating); diff != "" { |
| t.Errorf("Unexpected number of terminating pods (-want,+got): %s", diff) |
| } |
| if actual.Status.StartTime != nil && tc.suspend { |
| t.Error("Unexpected .status.startTime not nil when suspend is true") |
| } |
| if actual.Status.StartTime == nil && !tc.suspend { |
| t.Error("Missing .status.startTime") |
| } |
| // validate conditions |
| if tc.expectedCondition != nil { |
| if !getCondition(actual, *tc.expectedCondition, tc.expectedConditionStatus, tc.expectedConditionReason) { |
| t.Errorf("Expected completion condition. Got %#v", actual.Status.Conditions) |
| } |
| } else { |
| if cond := hasTrueCondition(actual); cond != nil { |
| t.Errorf("Got condition %s, want none", *cond) |
| } |
| } |
| if tc.expectedCondition == nil && tc.suspend && len(actual.Status.Conditions) != 0 { |
| t.Errorf("Unexpected conditions %v", actual.Status.Conditions) |
| } |
| // validate slow start |
| expectedLimit := 0 |
| for pass := uint8(0); expectedLimit <= tc.podLimit; pass++ { |
| expectedLimit += controller.SlowStartInitialBatchSize << pass |
| } |
| if tc.podLimit > 0 && fakePodControl.CreateCallCount > expectedLimit { |
| t.Errorf("Unexpected number of create calls. Expected <= %d, saw %d\n", fakePodControl.CreateLimit*2, fakePodControl.CreateCallCount) |
| } |
| if p := len(fakePodControl.Patches); p != tc.expectedPodPatches { |
| t.Errorf("Got %d pod patches, want %d", p, tc.expectedPodPatches) |
| } |
| }) |
| } |
| } |
| |
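| // checkIndexedJobPods verifies that pods created for an indexed job carry |
| // the expected completion indexes, hostnames and generate names. |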
| func checkIndexedJobPods(t *testing.T, control *controller.FakePodControl, wantIndexes sets.Set[int], jobName string, podIndexLabelDisabled bool) { |
| t.Helper() |
| gotIndexes := sets.New[int]() |
| for _, p := range control.Templates { |
| checkJobCompletionEnvVariable(t, &p.Spec, podIndexLabelDisabled) |
| if !podIndexLabelDisabled { |
| checkJobCompletionLabel(t, &p) |
| } |
| ix := getCompletionIndex(p.Annotations) |
| if ix == -1 { |
| t.Errorf("Created pod %s didn't have completion index", p.Name) |
| } else { |
| gotIndexes.Insert(ix) |
| } |
| expectedName := fmt.Sprintf("%s-%d", jobName, ix) |
| if expectedName != p.Spec.Hostname { |
| t.Errorf("Got pod hostname %s, want %s", p.Spec.Hostname, expectedName) |
| } |
| expectedName += "-" |
| if expectedName != p.GenerateName { |
| t.Errorf("Got pod generate name %s, want %s", p.GenerateName, expectedName) |
| } |
| } |
| if diff := cmp.Diff(sets.List(wantIndexes), sets.List(gotIndexes)); diff != "" { |
| t.Errorf("Unexpected created completion indexes (-want,+got):\n%s", diff) |
| } |
| } |
| |
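| // TestGetNewFinishedPods verifies that getNewFinishedPods, combined with |
| // the counters already in the Job status, accounts for finished pods with |
| // and without the tracking finalizer. |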
| func TestGetNewFinishedPods(t *testing.T) { |
| cases := map[string]struct { |
| job batch.Job |
| pods []*v1.Pod |
| expectedRmFinalizers sets.Set[string] |
| wantSucceeded int32 |
| wantFailed int32 |
| }{ |
| "some counted": { |
| job: batch.Job{ |
| Status: batch.JobStatus{ |
| Succeeded: 2, |
| Failed: 1, |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| }, |
| }, |
| pods: []*v1.Pod{ |
| buildPod().uid("a").phase(v1.PodSucceeded).Pod, |
| buildPod().uid("b").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| buildPod().uid("d").phase(v1.PodFailed).Pod, |
| buildPod().uid("e").phase(v1.PodFailed).trackingFinalizer().Pod, |
| buildPod().uid("f").phase(v1.PodRunning).Pod, |
| }, |
| wantSucceeded: 4, |
| wantFailed: 2, |
| }, |
| "some uncounted": { |
| job: batch.Job{ |
| Status: batch.JobStatus{ |
| Succeeded: 1, |
| Failed: 1, |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{ |
| Succeeded: []types.UID{"a", "c"}, |
| Failed: []types.UID{"e", "f"}, |
| }, |
| }, |
| }, |
| pods: []*v1.Pod{ |
| buildPod().uid("a").phase(v1.PodSucceeded).Pod, |
| buildPod().uid("b").phase(v1.PodSucceeded).Pod, |
| buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| buildPod().uid("d").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| buildPod().uid("e").phase(v1.PodFailed).Pod, |
| buildPod().uid("f").phase(v1.PodFailed).trackingFinalizer().Pod, |
| buildPod().uid("g").phase(v1.PodFailed).trackingFinalizer().Pod, |
| }, |
| wantSucceeded: 4, |
| wantFailed: 4, |
| }, |
| "with expected removed finalizers": { |
| job: batch.Job{ |
| Status: batch.JobStatus{ |
| Succeeded: 2, |
| Failed: 2, |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{ |
| Succeeded: []types.UID{"a"}, |
| Failed: []types.UID{"d"}, |
| }, |
| }, |
| }, |
| expectedRmFinalizers: sets.New("b", "f"), |
| pods: []*v1.Pod{ |
| buildPod().uid("a").phase(v1.PodSucceeded).Pod, |
| buildPod().uid("b").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| buildPod().uid("d").phase(v1.PodFailed).Pod, |
| buildPod().uid("e").phase(v1.PodFailed).trackingFinalizer().Pod, |
| buildPod().uid("f").phase(v1.PodFailed).trackingFinalizer().Pod, |
| buildPod().uid("g").phase(v1.PodFailed).trackingFinalizer().Pod, |
| }, |
| wantSucceeded: 4, |
| wantFailed: 5, |
| }, |
| "deleted pods": { |
| job: batch.Job{ |
| Status: batch.JobStatus{ |
| Succeeded: 1, |
| Failed: 1, |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| }, |
| }, |
| pods: []*v1.Pod{ |
| buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().deletionTimestamp().Pod, |
| buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().deletionTimestamp().Pod, |
| buildPod().uid("c").phase(v1.PodRunning).trackingFinalizer().deletionTimestamp().Pod, |
| buildPod().uid("d").phase(v1.PodPending).trackingFinalizer().deletionTimestamp().Pod, |
| buildPod().uid("e").phase(v1.PodRunning).deletionTimestamp().Pod, |
| buildPod().uid("f").phase(v1.PodPending).deletionTimestamp().Pod, |
| }, |
| wantSucceeded: 2, |
| wantFailed: 4, |
| }, |
| } |
| for name, tc := range cases { |
| t.Run(name, func(t *testing.T) { |
| uncounted := newUncountedTerminatedPods(*tc.job.Status.UncountedTerminatedPods) |
| jobCtx := &syncJobCtx{job: &tc.job, pods: tc.pods, uncounted: uncounted, expectedRmFinalizers: tc.expectedRmFinalizers} |
| succeededPods, failedPods := getNewFinishedPods(jobCtx) |
| succeeded := int32(len(succeededPods)) + tc.job.Status.Succeeded + int32(len(uncounted.succeeded)) |
| failed := int32(len(failedPods)) + tc.job.Status.Failed + int32(len(uncounted.failed)) |
| if succeeded != tc.wantSucceeded { |
| t.Errorf("getStatus reports %d succeeded pods, want %d", succeeded, tc.wantSucceeded) |
| } |
| if failed != tc.wantFailed { |
| t.Errorf("getStatus reports %d succeeded pods, want %d", failed, tc.wantFailed) |
| } |
| }) |
| } |
| } |
| |
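| // TestTrackJobStatusAndRemoveFinalizers verifies the sequence of status |
| // updates and finalizer removals performed while tracking finished pods. |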
| func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { |
| logger, ctx := ktesting.NewTestContext(t) |
| fakeClock := clocktesting.NewFakeClock(time.Now()) |
| now := fakeClock.Now() |
| minuteAgo := now.Add(-time.Minute) |
| completedCond := newCondition(batch.JobComplete, v1.ConditionTrue, "", "", now) |
| succeededCond := newCondition(batch.JobSuccessCriteriaMet, v1.ConditionTrue, "", "", minuteAgo) |
| failedCond := newCondition(batch.JobFailed, v1.ConditionTrue, "", "", now) |
| indexedCompletion := batch.IndexedCompletion |
| mockErr := errors.New("mock error") |
| cases := map[string]struct { |
| job batch.Job |
| pods []*v1.Pod |
| finishedCond *batch.JobCondition |
| expectedRmFinalizers sets.Set[string] |
| needsFlush bool |
| statusUpdateErr error |
| podControlErr error |
| wantErr error |
| wantRmFinalizers int |
| wantStatusUpdates []batch.JobStatus |
| wantSucceededPodsMetric int |
| wantFailedPodsMetric int |
| |
| // features |
| enableJobBackoffLimitPerIndex bool |
| enableJobSuccessPolicy bool |
| }{ |
| "no updates": {}, |
| "new active": { |
| job: batch.Job{ |
| Status: batch.JobStatus{ |
| Active: 1, |
| }, |
| }, |
| needsFlush: true, |
| wantStatusUpdates: []batch.JobStatus{ |
| { |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| Active: 1, |
| }, |
| }, |
| }, |
| "track finished pods": { |
| pods: []*v1.Pod{ |
| buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod, |
| buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().deletionTimestamp().Pod, |
| buildPod().uid("d").phase(v1.PodFailed).trackingFinalizer().deletionTimestamp().Pod, |
| buildPod().uid("e").phase(v1.PodPending).trackingFinalizer().deletionTimestamp().Pod, |
| buildPod().phase(v1.PodPending).trackingFinalizer().Pod, |
| buildPod().phase(v1.PodRunning).trackingFinalizer().Pod, |
| }, |
| wantRmFinalizers: 5, |
| wantStatusUpdates: []batch.JobStatus{ |
| { |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{ |
| Succeeded: []types.UID{"a", "c"}, |
| Failed: []types.UID{"b", "d", "e"}, |
| }, |
| }, |
| { |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| Succeeded: 2, |
| Failed: 3, |
| }, |
| }, |
| wantSucceededPodsMetric: 2, |
| wantFailedPodsMetric: 3, |
| }, |
| "past and new finished pods": { |
| job: batch.Job{ |
| Status: batch.JobStatus{ |
| Active: 1, |
| Succeeded: 2, |
| Failed: 3, |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{ |
| Succeeded: []types.UID{"a", "e"}, |
| Failed: []types.UID{"b", "f"}, |
| }, |
| }, |
| }, |
| pods: []*v1.Pod{ |
| buildPod().uid("e").phase(v1.PodSucceeded).Pod, |
| buildPod().phase(v1.PodFailed).Pod, |
| buildPod().phase(v1.PodPending).Pod, |
| buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod, |
| buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| buildPod().uid("d").phase(v1.PodFailed).trackingFinalizer().Pod, |
| }, |
| wantRmFinalizers: 4, |
| wantStatusUpdates: []batch.JobStatus{ |
| { |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{ |
| Succeeded: []types.UID{"a", "c"}, |
| Failed: []types.UID{"b", "d"}, |
| }, |
| Active: 1, |
| Succeeded: 3, |
| Failed: 4, |
| }, |
| { |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| Active: 1, |
| Succeeded: 5, |
| Failed: 6, |
| }, |
| }, |
| wantSucceededPodsMetric: 3, |
| wantFailedPodsMetric: 3, |
| }, |
| "expecting removed finalizers": { |
| job: batch.Job{ |
| Status: batch.JobStatus{ |
| Succeeded: 2, |
| Failed: 3, |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{ |
| Succeeded: []types.UID{"a", "g"}, |
| Failed: []types.UID{"b", "h"}, |
| }, |
| }, |
| }, |
| expectedRmFinalizers: sets.New("c", "d", "g", "h"), |
| pods: []*v1.Pod{ |
| buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod, |
| buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| buildPod().uid("d").phase(v1.PodFailed).trackingFinalizer().Pod, |
| buildPod().uid("e").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| buildPod().uid("f").phase(v1.PodFailed).trackingFinalizer().Pod, |
| buildPod().uid("g").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| buildPod().uid("h").phase(v1.PodFailed).trackingFinalizer().Pod, |
| }, |
| wantRmFinalizers: 4, |
| wantStatusUpdates: []batch.JobStatus{ |
| { |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{ |
| Succeeded: []types.UID{"a", "e"}, |
| Failed: []types.UID{"b", "f"}, |
| }, |
| Succeeded: 3, |
| Failed: 4, |
| }, |
| { |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| Succeeded: 5, |
| Failed: 6, |
| }, |
| }, |
| wantSucceededPodsMetric: 3, |
| wantFailedPodsMetric: 3, |
| }, |
| "succeeding job by JobSuccessPolicy": { |
| pods: []*v1.Pod{ |
| buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod, |
| buildPod().uid("c").phase(v1.PodPending).trackingFinalizer().Pod, |
| }, |
| finishedCond: succeededCond, |
| wantRmFinalizers: 3, |
| wantStatusUpdates: []batch.JobStatus{ |
| { |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{ |
| Succeeded: []types.UID{"a"}, |
| Failed: []types.UID{"b"}, |
| }, |
| Conditions: []batch.JobCondition{*succeededCond}, |
| }, |
| { |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| Succeeded: 1, |
| Failed: 1, |
| Conditions: []batch.JobCondition{*succeededCond, *completedCond}, |
| CompletionTime: ptr.To(metav1.NewTime(now)), |
| }, |
| }, |
| wantSucceededPodsMetric: 1, |
| wantFailedPodsMetric: 1, |
| enableJobSuccessPolicy: true, |
| }, |
| "completing job": { |
| pods: []*v1.Pod{ |
| buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod, |
| }, |
| finishedCond: completedCond, |
| wantRmFinalizers: 2, |
| wantStatusUpdates: []batch.JobStatus{ |
| { |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{ |
| Succeeded: []types.UID{"a"}, |
| Failed: []types.UID{"b"}, |
| }, |
| }, |
| { |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| Succeeded: 1, |
| Failed: 1, |
| Conditions: []batch.JobCondition{*completedCond}, |
| CompletionTime: &completedCond.LastTransitionTime, |
| }, |
| }, |
| wantSucceededPodsMetric: 1, |
| wantFailedPodsMetric: 1, |
| }, |
| "failing job": { |
| pods: []*v1.Pod{ |
| buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod, |
| buildPod().uid("c").phase(v1.PodRunning).trackingFinalizer().Pod, |
| }, |
| finishedCond: failedCond, |
| // Running pod counts as failed. |
| wantRmFinalizers: 3, |
| wantStatusUpdates: []batch.JobStatus{ |
| { |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{ |
| Succeeded: []types.UID{"a"}, |
| Failed: []types.UID{"b", "c"}, |
| }, |
| }, |
| { |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| Succeeded: 1, |
| Failed: 2, |
| Conditions: []batch.JobCondition{*failedCond}, |
| }, |
| }, |
| wantSucceededPodsMetric: 1, |
| wantFailedPodsMetric: 2, |
| }, |
| "deleted job": { |
| job: batch.Job{ |
| ObjectMeta: metav1.ObjectMeta{ |
| DeletionTimestamp: &metav1.Time{}, |
| }, |
| Status: batch.JobStatus{ |
| Active: 1, |
| }, |
| }, |
| pods: []*v1.Pod{ |
| buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod, |
| buildPod().phase(v1.PodRunning).trackingFinalizer().Pod, |
| }, |
| // The finalizer is removed from the Running pod, but the pod doesn't count as failed. |
| wantRmFinalizers: 3, |
| wantStatusUpdates: []batch.JobStatus{ |
| { |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{ |
| Succeeded: []types.UID{"a"}, |
| Failed: []types.UID{"b"}, |
| }, |
| Active: 1, |
| }, |
| { |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| Active: 1, |
| Succeeded: 1, |
| Failed: 1, |
| }, |
| }, |
| wantSucceededPodsMetric: 1, |
| wantFailedPodsMetric: 1, |
| }, |
| "status update error": { |
| pods: []*v1.Pod{ |
| buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod, |
| }, |
| statusUpdateErr: mockErr, |
| wantErr: mockErr, |
| wantStatusUpdates: []batch.JobStatus{ |
| { |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{ |
| Succeeded: []types.UID{"a"}, |
| Failed: []types.UID{"b"}, |
| }, |
| }, |
| }, |
| }, |
| "pod patch errors": { |
| pods: []*v1.Pod{ |
| buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod, |
| }, |
| podControlErr: mockErr, |
| wantErr: mockErr, |
| wantRmFinalizers: 2, |
| wantStatusUpdates: []batch.JobStatus{ |
| { |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{ |
| Succeeded: []types.UID{"a"}, |
| Failed: []types.UID{"b"}, |
| }, |
| }, |
| }, |
| }, |
| "pod patch errors with partial success": { |
| job: batch.Job{ |
| Status: batch.JobStatus{ |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{ |
| Succeeded: []types.UID{"a"}, |
| Failed: []types.UID{"b"}, |
| }, |
| }, |
| }, |
| pods: []*v1.Pod{ |
| buildPod().uid("a").phase(v1.PodSucceeded).Pod, |
| buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| buildPod().uid("d").phase(v1.PodFailed).trackingFinalizer().Pod, |
| }, |
| podControlErr: mockErr, |
| wantErr: mockErr, |
| wantRmFinalizers: 2, |
| wantStatusUpdates: []batch.JobStatus{ |
| { |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{ |
| Succeeded: []types.UID{"c"}, |
| Failed: []types.UID{"d"}, |
| }, |
| Succeeded: 1, |
| Failed: 1, |
| }, |
| }, |
| }, |
| "indexed job new successful pods": { |
| job: batch.Job{ |
| Spec: batch.JobSpec{ |
| CompletionMode: &indexedCompletion, |
| Completions: ptr.To[int32](6), |
| }, |
| Status: batch.JobStatus{ |
| Active: 1, |
| }, |
| }, |
| pods: []*v1.Pod{ |
| buildPod().phase(v1.PodSucceeded).trackingFinalizer().index("1").Pod, |
| buildPod().phase(v1.PodSucceeded).trackingFinalizer().index("3").Pod, |
| buildPod().phase(v1.PodSucceeded).trackingFinalizer().index("3").Pod, |
| buildPod().phase(v1.PodRunning).trackingFinalizer().index("5").Pod, |
| buildPod().phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| }, |
| wantRmFinalizers: 4, |
| wantStatusUpdates: []batch.JobStatus{ |
| { |
| Active: 1, |
| Succeeded: 2, |
| CompletedIndexes: "1,3", |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| }, |
| }, |
| wantSucceededPodsMetric: 2, |
| }, |
| "indexed job prev successful pods outside current completions index range with no new succeeded pods": { |
| job: batch.Job{ |
| Spec: batch.JobSpec{ |
| CompletionMode: &indexedCompletion, |
| Completions: ptr.To[int32](2), |
| Parallelism: ptr.To[int32](2), |
| }, |
| Status: batch.JobStatus{ |
| Active: 2, |
| Succeeded: 1, |
| CompletedIndexes: "3", |
| }, |
| }, |
| pods: []*v1.Pod{ |
| buildPod().phase(v1.PodRunning).trackingFinalizer().index("0").Pod, |
| buildPod().phase(v1.PodRunning).trackingFinalizer().index("1").Pod, |
| }, |
| wantRmFinalizers: 0, |
| wantStatusUpdates: []batch.JobStatus{ |
| { |
| Active: 2, |
| Succeeded: 0, |
| CompletedIndexes: "", |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| }, |
| }, |
| }, |
| "indexed job prev successful pods outside current completions index range with new succeeded pods in range": { |
| job: batch.Job{ |
| Spec: batch.JobSpec{ |
| CompletionMode: &indexedCompletion, |
| Completions: ptr.To[int32](2), |
| Parallelism: ptr.To[int32](2), |
| }, |
| Status: batch.JobStatus{ |
| Active: 2, |
| Succeeded: 1, |
| CompletedIndexes: "3", |
| }, |
| }, |
| pods: []*v1.Pod{ |
| buildPod().phase(v1.PodRunning).trackingFinalizer().index("0").Pod, |
| buildPod().phase(v1.PodSucceeded).trackingFinalizer().index("1").Pod, |
| }, |
| wantRmFinalizers: 1, |
| wantStatusUpdates: []batch.JobStatus{ |
| { |
| Active: 2, |
| Succeeded: 1, |
| CompletedIndexes: "1", |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| }, |
| }, |
| wantSucceededPodsMetric: 1, |
| }, |
| "indexed job new failed pods": { |
| job: batch.Job{ |
| Spec: batch.JobSpec{ |
| CompletionMode: &indexedCompletion, |
| Completions: ptr.To[int32](6), |
| }, |
| Status: batch.JobStatus{ |
| Active: 1, |
| }, |
| }, |
| pods: []*v1.Pod{ |
| buildPod().uid("a").phase(v1.PodFailed).trackingFinalizer().index("1").Pod, |
| buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().index("3").Pod, |
| buildPod().uid("c").phase(v1.PodFailed).trackingFinalizer().index("3").Pod, |
| buildPod().uid("d").phase(v1.PodRunning).trackingFinalizer().index("5").Pod, |
| buildPod().phase(v1.PodFailed).trackingFinalizer().Pod, |
| }, |
| wantRmFinalizers: 4, |
| wantStatusUpdates: []batch.JobStatus{ |
| { |
| Active: 1, |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{ |
| Failed: []types.UID{"a", "b", "c"}, |
| }, |
| }, |
| { |
| Active: 1, |
| Failed: 3, |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| }, |
| }, |
| wantFailedPodsMetric: 3, |
| }, |
| "indexed job past and new pods": { |
| job: batch.Job{ |
| Spec: batch.JobSpec{ |
| CompletionMode: &indexedCompletion, |
| Completions: ptr.To[int32](7), |
| }, |
| Status: batch.JobStatus{ |
| Failed: 2, |
| Succeeded: 5, |
| CompletedIndexes: "0-2,4,6,7", |
| }, |
| }, |
| pods: []*v1.Pod{ |
| buildPod().phase(v1.PodSucceeded).index("0").Pod, |
| buildPod().phase(v1.PodFailed).index("1").Pod, |
| buildPod().phase(v1.PodSucceeded).trackingFinalizer().index("1").Pod, |
| buildPod().phase(v1.PodSucceeded).trackingFinalizer().index("3").Pod, |
| buildPod().uid("a").phase(v1.PodFailed).trackingFinalizer().index("2").Pod, |
| buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().index("5").Pod, |
| }, |
| wantRmFinalizers: 4, |
| wantStatusUpdates: []batch.JobStatus{ |
| { |
| Succeeded: 6, |
| Failed: 2, |
| CompletedIndexes: "0-4,6", |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{ |
| Failed: []types.UID{"a", "b"}, |
| }, |
| }, |
| { |
| Succeeded: 6, |
| Failed: 4, |
| CompletedIndexes: "0-4,6", |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| }, |
| }, |
| wantSucceededPodsMetric: 1, |
| wantFailedPodsMetric: 2, |
| }, |
| "too many finished": { |
| job: batch.Job{ |
| Status: batch.JobStatus{ |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{ |
| Failed: []types.UID{"a", "b"}, |
| }, |
| }, |
| }, |
| pods: func() []*v1.Pod { |
| pods := make([]*v1.Pod, 500) |
| for i := range pods { |
| pods[i] = buildPod().uid(strconv.Itoa(i)).phase(v1.PodSucceeded).trackingFinalizer().Pod |
| } |
| pods = append(pods, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod) |
| return pods |
| }(), |
| wantRmFinalizers: 499, |
| wantStatusUpdates: []batch.JobStatus{ |
| { |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{ |
| Succeeded: func() []types.UID { |
| uids := make([]types.UID, 499) |
| for i := range uids { |
| uids[i] = types.UID(strconv.Itoa(i)) |
| } |
| return uids |
| }(), |
| Failed: []types.UID{"b"}, |
| }, |
| Failed: 1, |
| }, |
| { |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{ |
| Failed: []types.UID{"b"}, |
| }, |
| Succeeded: 499, |
| Failed: 1, |
| }, |
| }, |
| wantSucceededPodsMetric: 499, |
| wantFailedPodsMetric: 1, |
| }, |
| "too many indexed finished": { |
| job: batch.Job{ |
| Spec: batch.JobSpec{ |
| CompletionMode: &indexedCompletion, |
| Completions: ptr.To[int32](501), |
| }, |
| }, |
| pods: func() []*v1.Pod { |
| pods := make([]*v1.Pod, 501) |
| for i := range pods { |
| pods[i] = buildPod().uid(strconv.Itoa(i)).index(strconv.Itoa(i)).phase(v1.PodSucceeded).trackingFinalizer().Pod |
| } |
| return pods |
| }(), |
| wantRmFinalizers: 500, |
| wantStatusUpdates: []batch.JobStatus{ |
| { |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| CompletedIndexes: "0-499", |
| Succeeded: 500, |
| }, |
| }, |
| wantSucceededPodsMetric: 500, |
| }, |
| "pod flips from failed to succeeded": { |
| job: batch.Job{ |
| Spec: batch.JobSpec{ |
| Completions: ptr.To[int32](2), |
| Parallelism: ptr.To[int32](2), |
| }, |
| Status: batch.JobStatus{ |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{ |
| Failed: []types.UID{"a", "b"}, |
| }, |
| }, |
| }, |
| pods: []*v1.Pod{ |
| buildPod().uid("a").phase(v1.PodFailed).trackingFinalizer().Pod, |
| buildPod().uid("b").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| }, |
| finishedCond: failedCond, |
| wantRmFinalizers: 2, |
| wantStatusUpdates: []batch.JobStatus{ |
| { |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| Failed: 2, |
| Conditions: []batch.JobCondition{*failedCond}, |
| }, |
| }, |
| wantFailedPodsMetric: 2, |
| }, |
| "indexed job with a failed pod with delayed finalizer removal; the pod is not counted": { |
| enableJobBackoffLimitPerIndex: true, |
| job: batch.Job{ |
| Spec: batch.JobSpec{ |
| CompletionMode: &indexedCompletion, |
| Completions: ptr.To[int32](6), |
| BackoffLimitPerIndex: ptr.To[int32](1), |
| }, |
| }, |
| pods: []*v1.Pod{ |
| buildPod().uid("a").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().index("1").Pod, |
| }, |
| wantStatusUpdates: []batch.JobStatus{ |
| { |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| FailedIndexes: ptr.To(""), |
| }, |
| }, |
| }, |
| "indexed job with a failed pod which is recreated by a running pod; the pod is counted": { |
| enableJobBackoffLimitPerIndex: true, |
| job: batch.Job{ |
| Spec: batch.JobSpec{ |
| CompletionMode: &indexedCompletion, |
| Completions: ptr.To[int32](6), |
| BackoffLimitPerIndex: ptr.To[int32](1), |
| }, |
| Status: batch.JobStatus{ |
| Active: 1, |
| }, |
| }, |
| pods: []*v1.Pod{ |
| buildPod().uid("a1").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().index("1").Pod, |
| buildPod().uid("a2").phase(v1.PodRunning).indexFailureCount("1").trackingFinalizer().index("1").Pod, |
| }, |
| wantRmFinalizers: 1, |
| wantStatusUpdates: []batch.JobStatus{ |
| { |
| Active: 1, |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{ |
| Failed: []types.UID{"a1"}, |
| }, |
| FailedIndexes: ptr.To(""), |
| }, |
| { |
| Active: 1, |
| Failed: 1, |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| FailedIndexes: ptr.To(""), |
| }, |
| }, |
| wantFailedPodsMetric: 1, |
| }, |
| "indexed job with a failed pod for a failed index; the pod is counted": { |
| enableJobBackoffLimitPerIndex: true, |
| job: batch.Job{ |
| Spec: batch.JobSpec{ |
| CompletionMode: &indexedCompletion, |
| Completions: ptr.To[int32](6), |
| BackoffLimitPerIndex: ptr.To[int32](1), |
| }, |
| }, |
| pods: []*v1.Pod{ |
| buildPod().uid("a").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().index("1").Pod, |
| }, |
| wantRmFinalizers: 1, |
| wantStatusUpdates: []batch.JobStatus{ |
| { |
| FailedIndexes: ptr.To("1"), |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{ |
| Failed: []types.UID{"a"}, |
| }, |
| }, |
| { |
| Failed: 1, |
| FailedIndexes: ptr.To("1"), |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| }, |
| }, |
| wantFailedPodsMetric: 1, |
| }, |
| } |
| for name, tc := range cases { |
| t.Run(name, func(t *testing.T) { |
| defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, tc.enableJobBackoffLimitPerIndex)() |
| defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobSuccessPolicy, tc.enableJobSuccessPolicy)() |
| |
| clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) |
| manager, _ := newControllerFromClientWithClock(ctx, t, clientSet, controller.NoResyncPeriodFunc, fakeClock) |
| fakePodControl := controller.FakePodControl{Err: tc.podControlErr} |
| metrics.JobPodsFinished.Reset() |
| manager.podControl = &fakePodControl |
| var statusUpdates []batch.JobStatus |
| manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { |
| statusUpdates = append(statusUpdates, *job.Status.DeepCopy()) |
| return job, tc.statusUpdateErr |
| } |
| job := tc.job.DeepCopy() |
| if job.Status.UncountedTerminatedPods == nil { |
| job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{} |
| } |
| jobCtx := &syncJobCtx{ |
| job: job, |
| pods: tc.pods, |
| uncounted: newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods), |
| expectedRmFinalizers: tc.expectedRmFinalizers, |
| finishedCondition: tc.finishedCond, |
| } |
| if isIndexedJob(job) { |
| jobCtx.succeededIndexes = parseIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions)) |
| if tc.enableJobBackoffLimitPerIndex && job.Spec.BackoffLimitPerIndex != nil { |
| jobCtx.failedIndexes = calculateFailedIndexes(logger, job, tc.pods) |
| jobCtx.activePods = controller.FilterActivePods(logger, tc.pods) |
| jobCtx.podsWithDelayedDeletionPerIndex = getPodsWithDelayedDeletionPerIndex(logger, jobCtx) |
| } |
| } |
| |
| err := manager.trackJobStatusAndRemoveFinalizers(ctx, jobCtx, tc.needsFlush) |
| if !errors.Is(err, tc.wantErr) { |
| t.Errorf("Got error %v, want %v", err, tc.wantErr) |
| } |
| if diff := cmp.Diff(tc.wantStatusUpdates, statusUpdates, |
| cmpopts.IgnoreFields(batch.JobCondition{}, "LastProbeTime", "LastTransitionTime")); diff != "" { |
| t.Errorf("Unexpected status updates (-want,+got):\n%s", diff) |
| } |
| rmFinalizers := len(fakePodControl.Patches) |
| if rmFinalizers != tc.wantRmFinalizers { |
| t.Errorf("Removed %d finalizers, want %d", rmFinalizers, tc.wantRmFinalizers) |
| } |
| if tc.wantErr == nil { |
| completionMode := completionModeStr(job) |
| v, err := metricstestutil.GetCounterMetricValue(metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Succeeded)) |
| if err != nil { |
| t.Fatalf("Obtaining succeeded job_pods_finished_total: %v", err) |
| } |
| if float64(tc.wantSucceededPodsMetric) != v { |
| t.Errorf("Metric reports %.0f succeeded pods, want %d", v, tc.wantSucceededPodsMetric) |
| } |
| v, err = metricstestutil.GetCounterMetricValue(metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Failed)) |
| if err != nil { |
| t.Fatalf("Obtaining failed job_pods_finished_total: %v", err) |
| } |
| if float64(tc.wantFailedPodsMetric) != v { |
| t.Errorf("Metric reports %.0f failed pods, want %d", v, tc.wantFailedPodsMetric) |
| } |
| } |
| }) |
| } |
| } |
| |
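| // uncountedTerminatedFlowSketch is a simplified illustration, not the
| // controller's actual implementation, of the two-phase accounting the cases
| // above exercise: terminated pod UIDs are first recorded in
| // status.uncountedTerminatedPods, and only after that update (and the
| // finalizer removals) succeed are they folded into the aggregate counters.
| func uncountedTerminatedFlowSketch(status *batch.JobStatus, succeededUIDs, failedUIDs []types.UID) {
| if status.UncountedTerminatedPods == nil {
| status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}
| }
| // Phase 1: record newly terminated pods as uncounted so the status update
| // is persisted before their finalizers are removed.
| status.UncountedTerminatedPods.Succeeded = append(status.UncountedTerminatedPods.Succeeded, succeededUIDs...)
| status.UncountedTerminatedPods.Failed = append(status.UncountedTerminatedPods.Failed, failedUIDs...)
| // Phase 2: fold the uncounted UIDs into the counters and clear the lists.
| status.Succeeded += int32(len(status.UncountedTerminatedPods.Succeeded))
| status.Failed += int32(len(status.UncountedTerminatedPods.Failed))
| status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}
| }
|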
| // TestSyncJobPastDeadline verifies tracking of active deadline in a single syncJob call. |
| func TestSyncJobPastDeadline(t *testing.T) { |
| _, ctx := ktesting.NewTestContext(t) |
| testCases := map[string]struct { |
| // job setup |
| parallelism int32 |
| completions int32 |
| activeDeadlineSeconds int64 |
| startTime int64 |
| backoffLimit int32 |
| suspend bool |
| |
| // pod setup |
| activePods int |
| succeededPods int |
| failedPods int |
| |
| // expectations |
| expectedDeletions int32 |
| expectedActive int32 |
| expectedSucceeded int32 |
| expectedFailed int32 |
| expectedCondition batch.JobConditionType |
| expectedConditionReason string |
| }{ |
| "activeDeadlineSeconds less than single pod execution": { |
| parallelism: 1, |
| completions: 1, |
| activeDeadlineSeconds: 10, |
| startTime: 15, |
| backoffLimit: 6, |
| activePods: 1, |
| expectedDeletions: 1, |
| expectedFailed: 1, |
| expectedCondition: batch.JobFailed, |
| expectedConditionReason: batch.JobReasonDeadlineExceeded, |
| }, |
| "activeDeadlineSeconds bigger than single pod execution": { |
| parallelism: 1, |
| completions: 2, |
| activeDeadlineSeconds: 10, |
| startTime: 15, |
| backoffLimit: 6, |
| activePods: 1, |
| succeededPods: 1, |
| expectedDeletions: 1, |
| expectedSucceeded: 1, |
| expectedFailed: 1, |
| expectedCondition: batch.JobFailed, |
| expectedConditionReason: batch.JobReasonDeadlineExceeded, |
| }, |
| "activeDeadlineSeconds times-out before any pod starts": { |
| parallelism: 1, |
| completions: 1, |
| activeDeadlineSeconds: 10, |
| startTime: 10, |
| backoffLimit: 6, |
| expectedCondition: batch.JobFailed, |
| expectedConditionReason: batch.JobReasonDeadlineExceeded, |
| }, |
| "activeDeadlineSeconds with backofflimit reach": { |
| parallelism: 1, |
| completions: 1, |
| activeDeadlineSeconds: 1, |
| startTime: 10, |
| failedPods: 1, |
| expectedFailed: 1, |
| expectedCondition: batch.JobFailed, |
| expectedConditionReason: batch.JobReasonBackoffLimitExceeded, |
| }, |
| "activeDeadlineSeconds is not triggered when Job is suspended": { |
| suspend: true, |
| parallelism: 1, |
| completions: 2, |
| activeDeadlineSeconds: 10, |
| startTime: 15, |
| backoffLimit: 6, |
| expectedCondition: batch.JobSuspended, |
| expectedConditionReason: "JobSuspended", |
| }, |
| } |
| |
| for name, tc := range testCases { |
| t.Run(name, func(t *testing.T) { |
| // job manager setup |
| clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) |
| manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientSet, controller.NoResyncPeriodFunc) |
| fakePodControl := controller.FakePodControl{} |
| manager.podControl = &fakePodControl |
| manager.podStoreSynced = alwaysReady |
| manager.jobStoreSynced = alwaysReady |
| var actual *batch.Job |
| manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { |
| actual = job |
| return job, nil |
| } |
| |
| // job & pods setup |
| job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, batch.NonIndexedCompletion) |
| job.Spec.ActiveDeadlineSeconds = &tc.activeDeadlineSeconds |
| job.Spec.Suspend = ptr.To(tc.suspend) |
| start := metav1.Unix(metav1.Now().Time.Unix()-tc.startTime, 0) |
| job.Status.StartTime = &start |
| sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) |
| podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() |
| setPodsStatuses(podIndexer, job, 0, tc.activePods, tc.succeededPods, tc.failedPods, 0, 0) |
| |
| // run |
| err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) |
| if err != nil { |
| t.Errorf("Unexpected error when syncing jobs %v", err) |
| } |
| // validate created/deleted pods |
| if len(fakePodControl.Templates) != 0 {
| t.Errorf("Unexpected number of creates. Expected 0, saw %d\n", len(fakePodControl.Templates)) |
| } |
| if int32(len(fakePodControl.DeletePodName)) != tc.expectedDeletions { |
| t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", tc.expectedDeletions, len(fakePodControl.DeletePodName)) |
| } |
| // validate status |
| if actual.Status.Active != tc.expectedActive { |
| t.Errorf("Unexpected number of active pods. Expected %d, saw %d\n", tc.expectedActive, actual.Status.Active) |
| } |
| if actual.Status.Succeeded != tc.expectedSucceeded { |
| t.Errorf("Unexpected number of succeeded pods. Expected %d, saw %d\n", tc.expectedSucceeded, actual.Status.Succeeded) |
| } |
| if actual.Status.Failed != tc.expectedFailed { |
| t.Errorf("Unexpected number of failed pods. Expected %d, saw %d\n", tc.expectedFailed, actual.Status.Failed) |
| } |
| if actual.Status.StartTime == nil { |
| t.Error("Missing .status.startTime") |
| } |
| // validate conditions |
| if !getCondition(actual, tc.expectedCondition, v1.ConditionTrue, tc.expectedConditionReason) { |
| t.Errorf("Expected fail condition. Got %#v", actual.Status.Conditions) |
| } |
| }) |
| } |
| } |
| |
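| // getCondition reports whether the job has a condition with the given type,
| // status, and reason.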
| func getCondition(job *batch.Job, condition batch.JobConditionType, status v1.ConditionStatus, reason string) bool { |
| for _, v := range job.Status.Conditions { |
| if v.Type == condition && v.Status == status && v.Reason == reason { |
| return true |
| } |
| } |
| return false |
| } |
| |
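| // hasTrueCondition returns the type of the first job condition whose status
| // is True, or nil if there is none.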
| func hasTrueCondition(job *batch.Job) *batch.JobConditionType { |
| for _, v := range job.Status.Conditions { |
| if v.Status == v1.ConditionTrue { |
| return &v.Type |
| } |
| } |
| return nil |
| } |
| |
| // TestPastDeadlineJobFinished ensures that a Job is correctly tracked until |
| // reaching the active deadline, at which point it is marked as Failed. |
| func TestPastDeadlineJobFinished(t *testing.T) { |
| _, ctx := ktesting.NewTestContext(t) |
| clientset := fake.NewSimpleClientset() |
| fakeClock := clocktesting.NewFakeClock(time.Now().Truncate(time.Second)) |
| manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock) |
| manager.podStoreSynced = alwaysReady |
| manager.jobStoreSynced = alwaysReady |
| manager.expectations = FakeJobExpectations{ |
| controller.NewControllerExpectations(), true, func() { |
| }, |
| } |
| ctx, cancel := context.WithCancel(context.Background()) |
| defer cancel() |
| sharedInformerFactory.Start(ctx.Done()) |
| sharedInformerFactory.WaitForCacheSync(ctx.Done()) |
| |
| go manager.Run(ctx, 1) |
| |
| tests := []struct { |
| name string |
| setStartTime bool |
| jobName string |
| }{ |
| { |
| name: "New job created without start time being set", |
| setStartTime: false, |
| jobName: "job1", |
| }, |
| { |
| name: "New job created with start time being set", |
| setStartTime: true, |
| jobName: "job2", |
| }, |
| } |
| for _, tc := range tests { |
| t.Run(tc.name, func(t *testing.T) { |
| job := newJobWithName(tc.jobName, 1, 1, 6, batch.NonIndexedCompletion) |
| job.Spec.ActiveDeadlineSeconds = ptr.To[int64](1) |
| if tc.setStartTime { |
| start := metav1.NewTime(fakeClock.Now()) |
| job.Status.StartTime = &start |
| } |
| |
| _, err := clientset.BatchV1().Jobs(job.GetNamespace()).Create(ctx, job, metav1.CreateOptions{}) |
| if err != nil { |
| t.Errorf("Could not create Job: %v", err) |
| } |
| |
| var j *batch.Job |
| err = wait.PollUntilContextTimeout(ctx, 200*time.Microsecond, 3*time.Second, true, func(ctx context.Context) (done bool, err error) { |
| j, err = clientset.BatchV1().Jobs(metav1.NamespaceDefault).Get(ctx, job.GetName(), metav1.GetOptions{}) |
| if err != nil { |
| return false, err |
| } |
| return j.Status.StartTime != nil, nil |
| }) |
| if err != nil { |
| t.Errorf("Job failed to ensure that start time was set: %v", err) |
| } |
| err = wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 3*time.Second, false, func(ctx context.Context) (done bool, err error) { |
| j, err = clientset.BatchV1().Jobs(metav1.NamespaceDefault).Get(ctx, job.GetName(), metav1.GetOptions{}) |
| if err != nil { |
| return false, nil |
| } |
| if getCondition(j, batch.JobFailed, v1.ConditionTrue, batch.JobReasonDeadlineExceeded) { |
| if manager.clock.Since(j.Status.StartTime.Time) < time.Duration(*j.Spec.ActiveDeadlineSeconds)*time.Second { |
| return true, errors.New("Job contains DeadlineExceeded condition earlier than expected") |
| } |
| return true, nil |
| } |
| manager.clock.Sleep(100 * time.Millisecond) |
| return false, nil |
| }) |
| if err != nil { |
| t.Errorf("Job failed to enforce activeDeadlineSeconds configuration. Expected condition with Reason 'DeadlineExceeded' was not found in %v", j.Status) |
| } |
| }) |
| } |
| } |
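|
| // pastActiveDeadlineSketch is a minimal sketch, not the controller's actual
| // helper, of the deadline check exercised above: a job is past its active
| // deadline once now minus startTime reaches activeDeadlineSeconds.
| func pastActiveDeadlineSketch(job *batch.Job, now time.Time) bool {
| if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil {
| return false
| }
| allowed := time.Duration(*job.Spec.ActiveDeadlineSeconds) * time.Second
| return now.Sub(job.Status.StartTime.Time) >= allowed
| }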
| |
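| // TestSingleJobFailedCondition verifies that a job whose Failed condition is
| // still False gets it flipped to True once the deadline is exceeded, without
| // appending a duplicate Failed condition.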
| func TestSingleJobFailedCondition(t *testing.T) { |
| _, ctx := ktesting.NewTestContext(t) |
| clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) |
| manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc) |
| fakePodControl := controller.FakePodControl{} |
| manager.podControl = &fakePodControl |
| manager.podStoreSynced = alwaysReady |
| manager.jobStoreSynced = alwaysReady |
| var actual *batch.Job |
| manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { |
| actual = job |
| return job, nil |
| } |
| |
| job := newJob(1, 1, 6, batch.NonIndexedCompletion) |
| job.Spec.ActiveDeadlineSeconds = ptr.To[int64](10) |
| start := metav1.Unix(metav1.Now().Time.Unix()-15, 0) |
| job.Status.StartTime = &start |
| job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobFailed, v1.ConditionFalse, "DeadlineExceeded", "Job was active longer than specified deadline", realClock.Now())) |
| sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) |
| err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) |
| if err != nil { |
| t.Errorf("Unexpected error when syncing jobs %v", err) |
| } |
| if len(fakePodControl.DeletePodName) != 0 { |
| t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", 0, len(fakePodControl.DeletePodName)) |
| } |
| if actual == nil {
| t.Fatal("Expected job modification")
| }
| failedConditions := getConditionsByType(actual.Status.Conditions, batch.JobFailed) |
| if len(failedConditions) != 1 {
| t.Fatal("Unexpected number of failed conditions")
| }
| if failedConditions[0].Status != v1.ConditionTrue { |
| t.Errorf("Unexpected status for the failed condition. Expected: %v, saw %v\n", v1.ConditionTrue, failedConditions[0].Status) |
| } |
| }
| |
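| // TestSyncJobComplete verifies that syncing a job that already has a Complete
| // condition leaves its conditions unchanged.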
| func TestSyncJobComplete(t *testing.T) { |
| _, ctx := ktesting.NewTestContext(t) |
| clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) |
| manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc) |
| fakePodControl := controller.FakePodControl{} |
| manager.podControl = &fakePodControl |
| manager.podStoreSynced = alwaysReady |
| manager.jobStoreSynced = alwaysReady |
| |
| job := newJob(1, 1, 6, batch.NonIndexedCompletion) |
| job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobComplete, v1.ConditionTrue, "", "", realClock.Now())) |
| sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) |
| err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) |
| if err != nil { |
| t.Fatalf("Unexpected error when syncing jobs %v", err) |
| } |
| actual, err := manager.jobLister.Jobs(job.Namespace).Get(job.Name) |
| if err != nil { |
| t.Fatalf("Unexpected error when trying to get job from the store: %v", err) |
| } |
| // Verify that after syncing a complete job, the conditions are the same. |
| if got, expected := len(actual.Status.Conditions), 1; got != expected { |
| t.Fatalf("Unexpected job status conditions amount; expected %d, got %d", expected, got) |
| } |
| } |
| |
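| // TestSyncJobDeleted verifies that syncing a job that is absent from the
| // store neither creates nor deletes pods.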
| func TestSyncJobDeleted(t *testing.T) { |
| _, ctx := ktesting.NewTestContext(t) |
| clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) |
| manager, _ := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc) |
| fakePodControl := controller.FakePodControl{} |
| manager.podControl = &fakePodControl |
| manager.podStoreSynced = alwaysReady |
| manager.jobStoreSynced = alwaysReady |
| manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { |
| return job, nil |
| } |
| job := newJob(2, 2, 6, batch.NonIndexedCompletion) |
| err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) |
| if err != nil { |
| t.Errorf("Unexpected error when syncing jobs %v", err) |
| } |
| if len(fakePodControl.Templates) != 0 { |
| t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.Templates)) |
| } |
| if len(fakePodControl.DeletePodName) != 0 { |
| t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", 0, len(fakePodControl.DeletePodName)) |
| } |
| } |
| |
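| // TestSyncJobWhenManagedBy verifies that, when the JobManagedBy feature is
| // enabled, the controller leaves the status of a job untouched if its
| // spec.managedBy names a different controller.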
| func TestSyncJobWhenManagedBy(t *testing.T) { |
| _, ctx := ktesting.NewTestContext(t) |
| now := metav1.Now() |
| baseJob := batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: metav1.ObjectMeta{ |
| Name: "foobar", |
| Namespace: metav1.NamespaceDefault, |
| }, |
| Spec: batch.JobSpec{ |
| Template: v1.PodTemplateSpec{ |
| ObjectMeta: metav1.ObjectMeta{ |
| Labels: map[string]string{ |
| "foo": "bar", |
| }, |
| }, |
| Spec: v1.PodSpec{ |
| Containers: []v1.Container{ |
| {Image: "foo/bar"}, |
| }, |
| }, |
| }, |
| Parallelism: ptr.To[int32](2), |
| Completions: ptr.To[int32](2), |
| BackoffLimit: ptr.To[int32](6), |
| }, |
| Status: batch.JobStatus{ |
| Active: 1, |
| Ready: ptr.To[int32](1), |
| StartTime: &now, |
| }, |
| } |
| |
| testCases := map[string]struct { |
| enableJobManagedBy bool |
| job batch.Job |
| wantStatus batch.JobStatus |
| }{ |
| "job with custom value of managedBy; feature enabled; the status is unchanged": { |
| enableJobManagedBy: true, |
| job: func() batch.Job { |
| job := baseJob.DeepCopy() |
| job.Spec.ManagedBy = ptr.To("custom-managed-by") |
| return *job |
| }(), |
| wantStatus: baseJob.Status, |
| }, |
| "job with well known value of the managedBy; feature enabled; the status is updated": { |
| enableJobManagedBy: true, |
| job: func() batch.Job { |
| job := baseJob.DeepCopy() |
| job.Spec.ManagedBy = ptr.To(batch.JobControllerName) |
| return *job |
| }(), |
| wantStatus: batch.JobStatus{ |
| Active: 2, |
| Ready: ptr.To[int32](0), |
| StartTime: &now, |
| Terminating: ptr.To[int32](0), |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| }, |
| }, |
| "job with custom value of managedBy; feature disabled; the status is updated": { |
| job: func() batch.Job { |
| job := baseJob.DeepCopy() |
| job.Spec.ManagedBy = ptr.To("custom-managed-by") |
| return *job |
| }(), |
| wantStatus: batch.JobStatus{ |
| Active: 2, |
| Ready: ptr.To[int32](0), |
| StartTime: &now, |
| Terminating: ptr.To[int32](0), |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| }, |
| }, |
| "job without the managedBy; feature enabled; the status is updated": { |
| enableJobManagedBy: true, |
| job: baseJob, |
| wantStatus: batch.JobStatus{ |
| Active: 2, |
| Ready: ptr.To[int32](0), |
| StartTime: &now, |
| Terminating: ptr.To[int32](0), |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| }, |
| }, |
| } |
| for name, tc := range testCases { |
| t.Run(name, func(t *testing.T) { |
| defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, tc.enableJobManagedBy)() |
| |
| clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) |
| manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc) |
| fakePodControl := controller.FakePodControl{} |
| manager.podControl = &fakePodControl |
| manager.podStoreSynced = alwaysReady |
| manager.jobStoreSynced = alwaysReady |
| job := &tc.job |
| |
| actual := job |
| manager.updateStatusHandler = func(_ context.Context, job *batch.Job) (*batch.Job, error) { |
| actual = job |
| return job, nil |
| } |
| if err := sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job); err != nil { |
| t.Fatalf("error %v while adding the %v job to the index", err, klog.KObj(job)) |
| } |
| |
| if err := manager.syncJob(ctx, testutil.GetKey(job, t)); err != nil { |
| t.Fatalf("error %v while reconciling the job %v", err, testutil.GetKey(job, t)) |
| } |
| |
| if diff := cmp.Diff(tc.wantStatus, actual.Status); diff != "" { |
| t.Errorf("Unexpected job status (-want,+got):\n%s", diff) |
| } |
| }) |
| } |
| } |
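|
| // isManagedByJobControllerSketch is a simplified sketch, assuming the
| // JobManagedBy gate is enabled, of the guard the cases above exercise: the
| // built-in controller only reconciles jobs whose spec.managedBy is unset or
| // set to the well-known batch.JobControllerName.
| func isManagedByJobControllerSketch(job *batch.Job) bool {
| return job.Spec.ManagedBy == nil || *job.Spec.ManagedBy == batch.JobControllerName
| }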
| |
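| // TestSyncJobWithJobPodFailurePolicy verifies how failed pods are matched
| // against podFailurePolicy rules (Ignore, Count, and FailJob) and how the
| // outcome is reflected in the job's conditions and counters.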
| func TestSyncJobWithJobPodFailurePolicy(t *testing.T) { |
| _, ctx := ktesting.NewTestContext(t) |
| now := metav1.Now() |
| indexedCompletionMode := batch.IndexedCompletion |
| validObjectMeta := metav1.ObjectMeta{ |
| Name: "foobar", |
| UID: uuid.NewUUID(), |
| Namespace: metav1.NamespaceDefault, |
| } |
| validSelector := &metav1.LabelSelector{ |
| MatchLabels: map[string]string{"foo": "bar"}, |
| } |
| validTemplate := v1.PodTemplateSpec{ |
| ObjectMeta: metav1.ObjectMeta{ |
| Labels: map[string]string{ |
| "foo": "bar", |
| }, |
| }, |
| Spec: v1.PodSpec{ |
| Containers: []v1.Container{ |
| {Image: "foo/bar"}, |
| }, |
| }, |
| } |
| |
| onExitCodeRules := []batch.PodFailurePolicyRule{ |
| { |
| Action: batch.PodFailurePolicyActionIgnore, |
| OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{ |
| Operator: batch.PodFailurePolicyOnExitCodesOpIn, |
| Values: []int32{1, 2, 3}, |
| }, |
| }, |
| { |
| Action: batch.PodFailurePolicyActionFailJob, |
| OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{ |
| Operator: batch.PodFailurePolicyOnExitCodesOpIn, |
| Values: []int32{5, 6, 7}, |
| }, |
| }, |
| } |
| |
| testCases := map[string]struct { |
| enableJobPodFailurePolicy bool |
| enablePodDisruptionConditions bool |
| enableJobPodReplacementPolicy bool |
| job batch.Job |
| pods []v1.Pod |
| wantConditions *[]batch.JobCondition |
| wantStatusFailed int32 |
| wantStatusActive int32 |
| wantStatusSucceeded int32 |
| wantStatusTerminating *int32 |
| }{ |
| "default handling for pod failure if the container matching the exit codes does not match the containerName restriction": { |
| enableJobPodFailurePolicy: true, |
| job: batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| Parallelism: ptr.To[int32](1), |
| Completions: ptr.To[int32](1), |
| BackoffLimit: ptr.To[int32](6), |
| PodFailurePolicy: &batch.PodFailurePolicy{ |
| Rules: []batch.PodFailurePolicyRule{ |
| { |
| Action: batch.PodFailurePolicyActionIgnore, |
| OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{ |
| ContainerName: ptr.To("main-container"), |
| Operator: batch.PodFailurePolicyOnExitCodesOpIn, |
| Values: []int32{1, 2, 3}, |
| }, |
| }, |
| { |
| Action: batch.PodFailurePolicyActionFailJob, |
| OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{ |
| ContainerName: ptr.To("main-container"), |
| Operator: batch.PodFailurePolicyOnExitCodesOpIn, |
| Values: []int32{5, 6, 7}, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| { |
| Status: v1.PodStatus{ |
| Phase: v1.PodFailed, |
| ContainerStatuses: []v1.ContainerStatus{ |
| { |
| Name: "monitoring-container", |
| State: v1.ContainerState{ |
| Terminated: &v1.ContainerStateTerminated{ |
| ExitCode: 5, |
| }, |
| }, |
| }, |
| { |
| Name: "main-container", |
| State: v1.ContainerState{ |
| Terminated: &v1.ContainerStateTerminated{ |
| ExitCode: 42, |
| FinishedAt: testFinishedAt, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| wantConditions: nil, |
| wantStatusActive: 1, |
| wantStatusSucceeded: 0, |
| wantStatusFailed: 1, |
| }, |
| "running pod should not result in job fail based on OnExitCodes": { |
| enableJobPodFailurePolicy: true, |
| job: batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| Parallelism: ptr.To[int32](1), |
| Completions: ptr.To[int32](1), |
| BackoffLimit: ptr.To[int32](6), |
| PodFailurePolicy: &batch.PodFailurePolicy{ |
| Rules: onExitCodeRules, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| { |
| Status: v1.PodStatus{ |
| Phase: v1.PodRunning, |
| ContainerStatuses: []v1.ContainerStatus{ |
| { |
| Name: "main-container", |
| State: v1.ContainerState{ |
| Terminated: &v1.ContainerStateTerminated{ |
| ExitCode: 5, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| wantConditions: nil, |
| wantStatusActive: 1, |
| wantStatusFailed: 0, |
| wantStatusSucceeded: 0, |
| }, |
| "fail job based on OnExitCodes": { |
| enableJobPodFailurePolicy: true, |
| job: batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| Parallelism: ptr.To[int32](1), |
| Completions: ptr.To[int32](1), |
| BackoffLimit: ptr.To[int32](6), |
| PodFailurePolicy: &batch.PodFailurePolicy{ |
| Rules: onExitCodeRules, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| { |
| Status: v1.PodStatus{ |
| Phase: v1.PodFailed, |
| ContainerStatuses: []v1.ContainerStatus{ |
| { |
| Name: "main-container", |
| State: v1.ContainerState{ |
| Terminated: &v1.ContainerStateTerminated{ |
| ExitCode: 5, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| wantConditions: &[]batch.JobCondition{ |
| { |
| Type: batch.JobFailed, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonPodFailurePolicy, |
| Message: "Container main-container for pod default/mypod-0 failed with exit code 5 matching FailJob rule at index 1", |
| }, |
| }, |
| wantStatusActive: 0, |
| wantStatusFailed: 1, |
| wantStatusSucceeded: 0, |
| }, |
| "job marked already as failure target with failed pod": { |
| enableJobPodFailurePolicy: true, |
| job: batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| Parallelism: ptr.To[int32](1), |
| Completions: ptr.To[int32](1), |
| BackoffLimit: ptr.To[int32](6), |
| PodFailurePolicy: &batch.PodFailurePolicy{ |
| Rules: onExitCodeRules, |
| }, |
| }, |
| Status: batch.JobStatus{ |
| Conditions: []batch.JobCondition{ |
| { |
| Type: batch.JobFailureTarget, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonPodFailurePolicy, |
| Message: "Container main-container for pod default/mypod-0 failed with exit code 5 matching FailJob rule at index 1", |
| }, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| { |
| Status: v1.PodStatus{ |
| Phase: v1.PodFailed, |
| ContainerStatuses: []v1.ContainerStatus{ |
| { |
| Name: "main-container", |
| State: v1.ContainerState{ |
| Terminated: &v1.ContainerStateTerminated{ |
| ExitCode: 5, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| wantConditions: &[]batch.JobCondition{ |
| { |
| Type: batch.JobFailed, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonPodFailurePolicy, |
| Message: "Container main-container for pod default/mypod-0 failed with exit code 5 matching FailJob rule at index 1", |
| }, |
| }, |
| wantStatusActive: 0, |
| wantStatusFailed: 1, |
| wantStatusSucceeded: 0, |
| }, |
| "job marked already as failure target with failed pod, message based on already deleted pod": { |
| enableJobPodFailurePolicy: true, |
| job: batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| Parallelism: ptr.To[int32](1), |
| Completions: ptr.To[int32](1), |
| BackoffLimit: ptr.To[int32](6), |
| PodFailurePolicy: &batch.PodFailurePolicy{ |
| Rules: onExitCodeRules, |
| }, |
| }, |
| Status: batch.JobStatus{ |
| Conditions: []batch.JobCondition{ |
| { |
| Type: batch.JobFailureTarget, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonPodFailurePolicy, |
| Message: "Container main-container for pod default/already-deleted-pod failed with exit code 5 matching FailJob rule at index 1", |
| }, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| { |
| Status: v1.PodStatus{ |
| Phase: v1.PodFailed, |
| ContainerStatuses: []v1.ContainerStatus{ |
| { |
| Name: "main-container", |
| State: v1.ContainerState{ |
| Terminated: &v1.ContainerStateTerminated{ |
| ExitCode: 5, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| wantConditions: &[]batch.JobCondition{ |
| { |
| Type: batch.JobFailed, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonPodFailurePolicy, |
| Message: "Container main-container for pod default/already-deleted-pod failed with exit code 5 matching FailJob rule at index 1", |
| }, |
| }, |
| wantStatusActive: 0, |
| wantStatusFailed: 1, |
| wantStatusSucceeded: 0, |
| }, |
| "default handling for a failed pod when the feature is disabled even, despite matching rule": { |
| enableJobPodFailurePolicy: false, |
| job: batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| Parallelism: ptr.To[int32](1), |
| Completions: ptr.To[int32](1), |
| BackoffLimit: ptr.To[int32](6), |
| PodFailurePolicy: &batch.PodFailurePolicy{ |
| Rules: onExitCodeRules, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| { |
| Status: v1.PodStatus{ |
| Phase: v1.PodFailed, |
| ContainerStatuses: []v1.ContainerStatus{ |
| { |
| Name: "main-container", |
| State: v1.ContainerState{ |
| Terminated: &v1.ContainerStateTerminated{ |
| ExitCode: 5, |
| FinishedAt: testFinishedAt, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| wantConditions: nil, |
| wantStatusActive: 1, |
| wantStatusFailed: 1, |
| wantStatusSucceeded: 0, |
| }, |
| "fail job with multiple pods": { |
| enableJobPodFailurePolicy: true, |
| job: batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| Parallelism: ptr.To[int32](2), |
| Completions: ptr.To[int32](2), |
| BackoffLimit: ptr.To[int32](6), |
| PodFailurePolicy: &batch.PodFailurePolicy{ |
| Rules: onExitCodeRules, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| { |
| Status: v1.PodStatus{ |
| Phase: v1.PodRunning, |
| }, |
| }, |
| { |
| Status: v1.PodStatus{ |
| Phase: v1.PodFailed, |
| ContainerStatuses: []v1.ContainerStatus{ |
| { |
| Name: "main-container", |
| State: v1.ContainerState{ |
| Terminated: &v1.ContainerStateTerminated{ |
| ExitCode: 5, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| wantConditions: &[]batch.JobCondition{ |
| { |
| Type: batch.JobFailed, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonPodFailurePolicy, |
| Message: "Container main-container for pod default/mypod-1 failed with exit code 5 matching FailJob rule at index 1", |
| }, |
| }, |
| wantStatusActive: 0, |
| wantStatusFailed: 2, |
| wantStatusSucceeded: 0, |
| }, |
| "fail indexed job based on OnExitCodes": { |
| enableJobPodFailurePolicy: true, |
| job: batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| CompletionMode: &indexedCompletionMode, |
| Parallelism: ptr.To[int32](1), |
| Completions: ptr.To[int32](1), |
| BackoffLimit: ptr.To[int32](6), |
| PodFailurePolicy: &batch.PodFailurePolicy{ |
| Rules: onExitCodeRules, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| { |
| Status: v1.PodStatus{ |
| Phase: v1.PodFailed, |
| ContainerStatuses: []v1.ContainerStatus{ |
| { |
| Name: "main-container", |
| State: v1.ContainerState{ |
| Terminated: &v1.ContainerStateTerminated{ |
| ExitCode: 5, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| wantConditions: &[]batch.JobCondition{ |
| { |
| Type: batch.JobFailed, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonPodFailurePolicy, |
| Message: "Container main-container for pod default/mypod-0 failed with exit code 5 matching FailJob rule at index 1", |
| }, |
| }, |
| wantStatusActive: 0, |
| wantStatusFailed: 1, |
| wantStatusSucceeded: 0, |
| }, |
| "fail job based on OnExitCodes with NotIn operator": { |
| enableJobPodFailurePolicy: true, |
| job: batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| Parallelism: ptr.To[int32](1), |
| Completions: ptr.To[int32](1), |
| BackoffLimit: ptr.To[int32](6), |
| PodFailurePolicy: &batch.PodFailurePolicy{ |
| Rules: []batch.PodFailurePolicyRule{ |
| { |
| Action: batch.PodFailurePolicyActionFailJob, |
| OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{ |
| Operator: batch.PodFailurePolicyOnExitCodesOpNotIn, |
| Values: []int32{5, 6, 7}, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| { |
| Status: v1.PodStatus{ |
| Phase: v1.PodFailed, |
| ContainerStatuses: []v1.ContainerStatus{ |
| { |
| Name: "main-container", |
| State: v1.ContainerState{ |
| Terminated: &v1.ContainerStateTerminated{ |
| ExitCode: 42, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| wantConditions: &[]batch.JobCondition{ |
| { |
| Type: batch.JobFailed, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonPodFailurePolicy, |
| Message: "Container main-container for pod default/mypod-0 failed with exit code 42 matching FailJob rule at index 0", |
| }, |
| }, |
| wantStatusActive: 0, |
| wantStatusFailed: 1, |
| wantStatusSucceeded: 0, |
| }, |
| "default handling job based on OnExitCodes with NotIn operator": { |
| enableJobPodFailurePolicy: true, |
| job: batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| Parallelism: ptr.To[int32](1), |
| Completions: ptr.To[int32](1), |
| BackoffLimit: ptr.To[int32](6), |
| PodFailurePolicy: &batch.PodFailurePolicy{ |
| Rules: []batch.PodFailurePolicyRule{ |
| { |
| Action: batch.PodFailurePolicyActionFailJob, |
| OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{ |
| Operator: batch.PodFailurePolicyOnExitCodesOpNotIn, |
| Values: []int32{5, 6, 7}, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| { |
| Status: v1.PodStatus{ |
| Phase: v1.PodFailed, |
| ContainerStatuses: []v1.ContainerStatus{ |
| { |
| Name: "main-container", |
| State: v1.ContainerState{ |
| Terminated: &v1.ContainerStateTerminated{ |
| ExitCode: 5, |
| FinishedAt: testFinishedAt, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| wantConditions: nil, |
| wantStatusActive: 1, |
| wantStatusFailed: 1, |
| wantStatusSucceeded: 0, |
| }, |
| "fail job based on OnExitCodes for InitContainer": { |
| enableJobPodFailurePolicy: true, |
| job: batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| Parallelism: ptr.To[int32](1), |
| Completions: ptr.To[int32](1), |
| BackoffLimit: ptr.To[int32](6), |
| PodFailurePolicy: &batch.PodFailurePolicy{ |
| Rules: onExitCodeRules, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| { |
| Status: v1.PodStatus{ |
| Phase: v1.PodFailed, |
| InitContainerStatuses: []v1.ContainerStatus{ |
| { |
| Name: "init-container", |
| State: v1.ContainerState{ |
| Terminated: &v1.ContainerStateTerminated{ |
| ExitCode: 5, |
| }, |
| }, |
| }, |
| }, |
| ContainerStatuses: []v1.ContainerStatus{ |
| { |
| Name: "main-container", |
| State: v1.ContainerState{ |
| Terminated: &v1.ContainerStateTerminated{ |
| ExitCode: 143, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| wantConditions: &[]batch.JobCondition{ |
| { |
| Type: batch.JobFailed, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonPodFailurePolicy, |
| Message: "Container init-container for pod default/mypod-0 failed with exit code 5 matching FailJob rule at index 1", |
| }, |
| }, |
| wantStatusActive: 0, |
| wantStatusFailed: 1, |
| wantStatusSucceeded: 0, |
| }, |
| "ignore pod failure; both rules are matching, the first is executed only": { |
| enableJobPodFailurePolicy: true, |
| job: batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| Parallelism: ptr.To[int32](1), |
| Completions: ptr.To[int32](1), |
| BackoffLimit: ptr.To[int32](0), |
| PodFailurePolicy: &batch.PodFailurePolicy{ |
| Rules: onExitCodeRules, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| { |
| Status: v1.PodStatus{ |
| Phase: v1.PodFailed, |
| ContainerStatuses: []v1.ContainerStatus{ |
| { |
| Name: "container1", |
| State: v1.ContainerState{ |
| Terminated: &v1.ContainerStateTerminated{ |
| ExitCode: 2, |
| }, |
| }, |
| }, |
| { |
| Name: "container2", |
| State: v1.ContainerState{ |
| Terminated: &v1.ContainerStateTerminated{ |
| ExitCode: 6, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| wantConditions: nil, |
| wantStatusActive: 1, |
| wantStatusFailed: 0, |
| wantStatusSucceeded: 0, |
| }, |
| "ignore pod failure based on OnExitCodes": { |
| enableJobPodFailurePolicy: true, |
| job: batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| Parallelism: ptr.To[int32](1), |
| Completions: ptr.To[int32](1), |
| BackoffLimit: ptr.To[int32](0), |
| PodFailurePolicy: &batch.PodFailurePolicy{ |
| Rules: onExitCodeRules, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| { |
| Status: v1.PodStatus{ |
| Phase: v1.PodFailed, |
| ContainerStatuses: []v1.ContainerStatus{ |
| { |
| State: v1.ContainerState{ |
| Terminated: &v1.ContainerStateTerminated{ |
| ExitCode: 1, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| wantConditions: nil, |
| wantStatusActive: 1, |
| wantStatusFailed: 0, |
| wantStatusSucceeded: 0, |
| }, |
| "default job based on OnExitCodes": { |
| enableJobPodFailurePolicy: true, |
| job: batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| Parallelism: ptr.To[int32](1), |
| Completions: ptr.To[int32](1), |
| BackoffLimit: ptr.To[int32](0), |
| PodFailurePolicy: &batch.PodFailurePolicy{ |
| Rules: onExitCodeRules, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| { |
| Status: v1.PodStatus{ |
| Phase: v1.PodFailed, |
| ContainerStatuses: []v1.ContainerStatus{ |
| { |
| State: v1.ContainerState{ |
| Terminated: &v1.ContainerStateTerminated{ |
| ExitCode: 10, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| wantConditions: &[]batch.JobCondition{ |
| { |
| Type: batch.JobFailed, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonBackoffLimitExceeded, |
| Message: "Job has reached the specified backoff limit", |
| }, |
| }, |
| wantStatusActive: 0, |
| wantStatusFailed: 1, |
| wantStatusSucceeded: 0, |
| }, |
| "count pod failure based on OnExitCodes; both rules are matching, the first is executed only": { |
| enableJobPodFailurePolicy: true, |
| job: batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| Parallelism: ptr.To[int32](1), |
| Completions: ptr.To[int32](1), |
| BackoffLimit: ptr.To[int32](6), |
| PodFailurePolicy: &batch.PodFailurePolicy{ |
| Rules: []batch.PodFailurePolicyRule{ |
| { |
| Action: batch.PodFailurePolicyActionCount, |
| OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{ |
| Operator: batch.PodFailurePolicyOnExitCodesOpIn, |
| Values: []int32{1, 2}, |
| }, |
| }, |
| { |
| Action: batch.PodFailurePolicyActionIgnore, |
| OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{ |
| Operator: batch.PodFailurePolicyOnExitCodesOpIn, |
| Values: []int32{2, 3}, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| { |
| Status: v1.PodStatus{ |
| Phase: v1.PodFailed, |
| ContainerStatuses: []v1.ContainerStatus{ |
| { |
| State: v1.ContainerState{ |
| Terminated: &v1.ContainerStateTerminated{ |
| ExitCode: 2, |
| FinishedAt: testFinishedAt, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| wantConditions: nil, |
| wantStatusActive: 1, |
| wantStatusFailed: 1, |
| wantStatusSucceeded: 0, |
| }, |
| "count pod failure based on OnPodConditions; both rules are matching, the first is executed only": { |
| enableJobPodFailurePolicy: true, |
| job: batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| Parallelism: ptr.To[int32](1), |
| Completions: ptr.To[int32](1), |
| BackoffLimit: ptr.To[int32](6), |
| PodFailurePolicy: &batch.PodFailurePolicy{ |
| Rules: []batch.PodFailurePolicyRule{ |
| { |
| Action: batch.PodFailurePolicyActionCount, |
| OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{ |
| { |
| Type: v1.PodConditionType("ResourceLimitExceeded"), |
| Status: v1.ConditionTrue, |
| }, |
| }, |
| }, |
| { |
| Action: batch.PodFailurePolicyActionIgnore, |
| OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{ |
| { |
| Type: v1.DisruptionTarget, |
| Status: v1.ConditionTrue, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| { |
| Status: v1.PodStatus{ |
| Phase: v1.PodFailed, |
| Conditions: []v1.PodCondition{ |
| { |
| Type: v1.PodConditionType("ResourceLimitExceeded"), |
| Status: v1.ConditionTrue, |
| }, |
| { |
| Type: v1.DisruptionTarget, |
| Status: v1.ConditionTrue, |
| }, |
| }, |
| ContainerStatuses: []v1.ContainerStatus{ |
| { |
| State: v1.ContainerState{ |
| Terminated: &v1.ContainerStateTerminated{ |
| FinishedAt: testFinishedAt, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| wantConditions: nil, |
| wantStatusActive: 1, |
| wantStatusFailed: 1, |
| wantStatusSucceeded: 0, |
| }, |
| "ignore pod failure based on OnPodConditions": { |
| enableJobPodFailurePolicy: true, |
| job: batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| Parallelism: ptr.To[int32](1), |
| Completions: ptr.To[int32](1), |
| BackoffLimit: ptr.To[int32](0), |
| PodFailurePolicy: &batch.PodFailurePolicy{ |
| Rules: []batch.PodFailurePolicyRule{ |
| { |
| Action: batch.PodFailurePolicyActionIgnore, |
| OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{ |
| { |
| Type: v1.DisruptionTarget, |
| Status: v1.ConditionTrue, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| { |
| Status: v1.PodStatus{ |
| Phase: v1.PodFailed, |
| Conditions: []v1.PodCondition{ |
| { |
| Type: v1.DisruptionTarget, |
| Status: v1.ConditionTrue, |
| }, |
| }, |
| }, |
| }, |
| }, |
| wantConditions: nil, |
| wantStatusActive: 1, |
| wantStatusFailed: 0, |
| wantStatusSucceeded: 0, |
| }, |
| "ignore pod failure based on OnPodConditions, ignored failures delays pod recreation": { |
| enableJobPodFailurePolicy: true, |
| job: batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| Parallelism: ptr.To[int32](1), |
| Completions: ptr.To[int32](1), |
| BackoffLimit: ptr.To[int32](0), |
| PodFailurePolicy: &batch.PodFailurePolicy{ |
| Rules: []batch.PodFailurePolicyRule{ |
| { |
| Action: batch.PodFailurePolicyActionIgnore, |
| OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{ |
| { |
| Type: v1.DisruptionTarget, |
| Status: v1.ConditionTrue, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| { |
| ObjectMeta: metav1.ObjectMeta{ |
| DeletionTimestamp: &now, |
| }, |
| Status: v1.PodStatus{ |
| Phase: v1.PodFailed, |
| Conditions: []v1.PodCondition{ |
| { |
| Type: v1.DisruptionTarget, |
| Status: v1.ConditionTrue, |
| }, |
| }, |
| }, |
| }, |
| }, |
| wantConditions: nil, |
| wantStatusActive: 0, |
| wantStatusFailed: 0, |
| wantStatusSucceeded: 0, |
| }, |
| "fail job based on OnPodConditions": { |
| enableJobPodFailurePolicy: true, |
| job: batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| Parallelism: ptr.To[int32](1), |
| Completions: ptr.To[int32](1), |
| BackoffLimit: ptr.To[int32](6), |
| PodFailurePolicy: &batch.PodFailurePolicy{ |
| Rules: []batch.PodFailurePolicyRule{ |
| { |
| Action: batch.PodFailurePolicyActionFailJob, |
| OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{ |
| { |
| Type: v1.DisruptionTarget, |
| Status: v1.ConditionTrue, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| { |
| Status: v1.PodStatus{ |
| Phase: v1.PodFailed, |
| Conditions: []v1.PodCondition{ |
| { |
| Type: v1.DisruptionTarget, |
| Status: v1.ConditionTrue, |
| }, |
| }, |
| }, |
| }, |
| }, |
| wantConditions: &[]batch.JobCondition{ |
| { |
| Type: batch.JobFailed, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonPodFailurePolicy, |
| Message: "Pod default/mypod-0 has condition DisruptionTarget matching FailJob rule at index 0", |
| }, |
| }, |
| wantStatusActive: 0, |
| wantStatusFailed: 1, |
| wantStatusSucceeded: 0, |
| }, |
| "terminating Pod considered failed when PodDisruptionConditions is disabled": { |
| enableJobPodFailurePolicy: true, |
| job: batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Parallelism: ptr.To[int32](1), |
| Selector: validSelector, |
| Template: validTemplate, |
| BackoffLimit: ptr.To[int32](0), |
| PodFailurePolicy: &batch.PodFailurePolicy{ |
| Rules: []batch.PodFailurePolicyRule{ |
| { |
| Action: batch.PodFailurePolicyActionCount, |
| OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{ |
| { |
| Type: v1.DisruptionTarget, |
| Status: v1.ConditionTrue, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| { |
| ObjectMeta: metav1.ObjectMeta{ |
| DeletionTimestamp: &now, |
| }, |
| }, |
| }, |
| }, |
| "terminating Pod not considered failed when PodDisruptionConditions is enabled": { |
| enableJobPodFailurePolicy: true, |
| enablePodDisruptionConditions: true, |
| job: batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Parallelism: ptr.To[int32](1), |
| Selector: validSelector, |
| Template: validTemplate, |
| BackoffLimit: ptr.To[int32](0), |
| PodFailurePolicy: &batch.PodFailurePolicy{ |
| Rules: []batch.PodFailurePolicyRule{ |
| { |
| Action: batch.PodFailurePolicyActionCount, |
| OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{ |
| { |
| Type: v1.DisruptionTarget, |
| Status: v1.ConditionTrue, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| { |
| ObjectMeta: metav1.ObjectMeta{ |
| DeletionTimestamp: &now, |
| }, |
| Status: v1.PodStatus{ |
| Phase: v1.PodRunning, |
| }, |
| }, |
| }, |
| }, |
| } |
| for name, tc := range testCases { |
| t.Run(name, func(t *testing.T) { |
| defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy)() |
| defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, tc.enablePodDisruptionConditions)() |
| defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.enableJobPodReplacementPolicy)() |
| |
| if tc.job.Spec.PodReplacementPolicy == nil { |
| tc.job.Spec.PodReplacementPolicy = podReplacementPolicy(batch.Failed) |
| } |
| clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) |
| manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc) |
| fakePodControl := controller.FakePodControl{} |
| manager.podControl = &fakePodControl |
| manager.podStoreSynced = alwaysReady |
| manager.jobStoreSynced = alwaysReady |
| job := &tc.job |
| |
| actual := job |
| manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { |
| actual = job |
| return job, nil |
| } |
| sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) |
| for i, pod := range tc.pods { |
| pod := pod |
| pb := podBuilder{Pod: &pod}.name(fmt.Sprintf("mypod-%d", i)).job(job) |
| if job.Spec.CompletionMode != nil && *job.Spec.CompletionMode == batch.IndexedCompletion { |
| pb.index(fmt.Sprintf("%v", i)) |
| } |
| pb = pb.trackingFinalizer() |
| sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer().Add(pb.Pod) |
| } |
| |
| manager.syncJob(context.TODO(), testutil.GetKey(job, t)) |
| |
| if tc.wantConditions != nil { |
| for _, wantCondition := range *tc.wantConditions { |
| conditions := getConditionsByType(actual.Status.Conditions, wantCondition.Type) |
| if len(conditions) != 1 { |
| t.Fatalf("Expected a single completion condition. Got %#v for type: %q", conditions, wantCondition.Type) |
| } |
| condition := *conditions[0] |
| if diff := cmp.Diff(wantCondition, condition, cmpopts.IgnoreFields(batch.JobCondition{}, "LastProbeTime", "LastTransitionTime")); diff != "" { |
| t.Errorf("Unexpected job condition (-want,+got):\n%s", diff) |
| } |
| } |
| } else { |
| if cond := hasTrueCondition(actual); cond != nil { |
| t.Errorf("Got condition %s, want none", *cond) |
| } |
| } |
| // validate status |
| if actual.Status.Active != tc.wantStatusActive { |
| t.Errorf("unexpected number of active pods. Expected %d, saw %d\n", tc.wantStatusActive, actual.Status.Active) |
| } |
| if actual.Status.Succeeded != tc.wantStatusSucceeded { |
| t.Errorf("unexpected number of succeeded pods. Expected %d, saw %d\n", tc.wantStatusSucceeded, actual.Status.Succeeded) |
| } |
| if actual.Status.Failed != tc.wantStatusFailed { |
| t.Errorf("unexpected number of failed pods. Expected %d, saw %d\n", tc.wantStatusFailed, actual.Status.Failed) |
| } |
| if ptr.Deref(actual.Status.Terminating, 0) != ptr.Deref(tc.wantStatusTerminating, 0) { |
| t.Errorf("unexpected number of terminating pods. Expected %d, saw %d\n", ptr.Deref(tc.wantStatusTerminating, 0), ptr.Deref(actual.Status.Terminating, 0)) |
| } |
| }) |
| } |
| } |
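|
| // firstMatchingRuleActionSketch is a simplified sketch, not the real matcher
| // (which inspects exit codes and pod conditions itself), of the ordering
| // semantics the cases above rely on: rules are evaluated in order and only
| // the first matching rule determines the action.
| func firstMatchingRuleActionSketch(rules []batch.PodFailurePolicyRule, matches func(batch.PodFailurePolicyRule) bool) *batch.PodFailurePolicyAction {
| for i := range rules {
| if matches(rules[i]) {
| return &rules[i].Action
| }
| }
| return nil
| }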
| |
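| // TestSyncJobWithJobSuccessPolicy verifies that a job gains the
| // SuccessCriteriaMet and Complete conditions once one of its successPolicy
| // rules is satisfied.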
| func TestSyncJobWithJobSuccessPolicy(t *testing.T) { |
| now := time.Now() |
| validTypeMeta := metav1.TypeMeta{ |
| APIVersion: batch.SchemeGroupVersion.String(), |
| Kind: "Job", |
| } |
| validObjectMeta := metav1.ObjectMeta{ |
| Name: "foobar", |
| UID: uuid.NewUUID(), |
| Namespace: metav1.NamespaceDefault, |
| } |
| validSelector := &metav1.LabelSelector{ |
| MatchLabels: map[string]string{"foo": "bar"}, |
| } |
| validTemplate := v1.PodTemplateSpec{ |
| ObjectMeta: metav1.ObjectMeta{ |
| Labels: map[string]string{ |
| "foo": "bar", |
| }, |
| }, |
| Spec: v1.PodSpec{ |
| Containers: []v1.Container{ |
| {Image: "foobar"}, |
| }, |
| }, |
| } |
| |
| testCases := map[string]struct { |
| enableJobFailurePolicy bool |
| enableBackoffLimitPerIndex bool |
| enableJobSuccessPolicy bool |
| job batch.Job |
| pods []v1.Pod |
| wantStatus batch.JobStatus |
| }{ |
| "job with successPolicy; job has SuccessCriteriaMet condition if job meets to successPolicy and some indexes fail": { |
| enableJobSuccessPolicy: true, |
| job: batch.Job{ |
| TypeMeta: validTypeMeta, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| CompletionMode: completionModePtr(batch.IndexedCompletion), |
| Parallelism: ptr.To[int32](3), |
| Completions: ptr.To[int32](3), |
| BackoffLimit: ptr.To[int32](math.MaxInt32), |
| SuccessPolicy: &batch.SuccessPolicy{ |
| Rules: []batch.SuccessPolicyRule{{ |
| SucceededIndexes: ptr.To("0,1"), |
| SucceededCount: ptr.To[int32](1), |
| }}, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| *buildPod().uid("a1").index("0").phase(v1.PodFailed).trackingFinalizer().Pod, |
| *buildPod().uid("a2").index("0").phase(v1.PodRunning).trackingFinalizer().Pod, |
| *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| *buildPod().uid("c").index("2").phase(v1.PodRunning).trackingFinalizer().Pod, |
| }, |
| wantStatus: batch.JobStatus{ |
| Failed: 1, |
| Succeeded: 1, |
| Terminating: ptr.To[int32](0), |
| CompletedIndexes: "1", |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| Conditions: []batch.JobCondition{ |
| { |
| Type: batch.JobSuccessCriteriaMet, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonSuccessPolicy, |
| Message: "Matched rules at index 0", |
| }, |
| { |
| Type: batch.JobComplete, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonSuccessPolicy, |
| Message: "Matched rules at index 0", |
| }, |
| }, |
| }, |
| }, |
| "job with podFailurePolicy and successPolicy; job has SuccessCriteriaMet condition if job meets to successPolicy and doesn't meet to podFailurePolicy": { |
| enableJobSuccessPolicy: true, |
| enableJobFailurePolicy: true, |
| job: batch.Job{ |
| TypeMeta: validTypeMeta, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| CompletionMode: completionModePtr(batch.IndexedCompletion), |
| Parallelism: ptr.To[int32](2), |
| Completions: ptr.To[int32](2), |
| BackoffLimit: ptr.To[int32](math.MaxInt32), |
| SuccessPolicy: &batch.SuccessPolicy{ |
| Rules: []batch.SuccessPolicyRule{{ |
| SucceededIndexes: ptr.To("0,1"), |
| SucceededCount: ptr.To[int32](1), |
| }}, |
| }, |
| PodFailurePolicy: &batch.PodFailurePolicy{ |
| Rules: []batch.PodFailurePolicyRule{{ |
| Action: batch.PodFailurePolicyActionFailJob, |
| OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{{ |
| Type: v1.DisruptionTarget, |
| Status: v1.ConditionTrue, |
| }}, |
| }}, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| *buildPod().uid("a1").index("0").phase(v1.PodFailed).trackingFinalizer().Pod, |
| *buildPod().uid("a2").index("0").phase(v1.PodRunning).trackingFinalizer().Pod, |
| *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| }, |
| wantStatus: batch.JobStatus{ |
| Failed: 1, |
| Succeeded: 1, |
| Terminating: ptr.To[int32](0), |
| CompletedIndexes: "1", |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| Conditions: []batch.JobCondition{ |
| { |
| Type: batch.JobSuccessCriteriaMet, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonSuccessPolicy, |
| Message: "Matched rules at index 0", |
| }, |
| { |
| Type: batch.JobComplete, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonSuccessPolicy, |
| Message: "Matched rules at index 0", |
| }, |
| }, |
| }, |
| }, |
| "job with backoffLimitPerIndex and successPolicy; job has SuccessCriteriaMet condition if job meets to successPolicy and doesn't meet backoffLimitPerIndex": { |
| enableJobSuccessPolicy: true, |
| enableBackoffLimitPerIndex: true, |
| job: batch.Job{ |
| TypeMeta: validTypeMeta, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| CompletionMode: completionModePtr(batch.IndexedCompletion), |
| Parallelism: ptr.To[int32](2), |
| Completions: ptr.To[int32](2), |
| BackoffLimit: ptr.To[int32](math.MaxInt32), |
| BackoffLimitPerIndex: ptr.To[int32](2), |
| SuccessPolicy: &batch.SuccessPolicy{ |
| Rules: []batch.SuccessPolicyRule{{ |
| SucceededIndexes: ptr.To("0,1"), |
| SucceededCount: ptr.To[int32](1), |
| }}, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| *buildPod().uid("a1").index("0").phase(v1.PodFailed).trackingFinalizer().Pod, |
| *buildPod().uid("a2").index("0").phase(v1.PodRunning).trackingFinalizer().Pod, |
| *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| }, |
| wantStatus: batch.JobStatus{ |
| Failed: 1, |
| Succeeded: 1, |
| Terminating: ptr.To[int32](0), |
| CompletedIndexes: "1", |
| FailedIndexes: ptr.To(""), |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| Conditions: []batch.JobCondition{ |
| { |
| Type: batch.JobSuccessCriteriaMet, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonSuccessPolicy, |
| Message: "Matched rules at index 0", |
| }, |
| { |
| Type: batch.JobComplete, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonSuccessPolicy, |
| Message: "Matched rules at index 0", |
| }, |
| }, |
| }, |
| }, |
| "job with successPolicy; job has both Complete and SuccessCriteriaMet condition when job meets to successPolicy and all pods have been already removed": { |
| enableJobSuccessPolicy: true, |
| job: batch.Job{ |
| TypeMeta: validTypeMeta, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| CompletionMode: completionModePtr(batch.IndexedCompletion), |
| Parallelism: ptr.To[int32](2), |
| Completions: ptr.To[int32](2), |
| BackoffLimit: ptr.To[int32](math.MaxInt32), |
| BackoffLimitPerIndex: ptr.To[int32](2), |
| SuccessPolicy: &batch.SuccessPolicy{ |
| Rules: []batch.SuccessPolicyRule{{ |
| SucceededIndexes: ptr.To("0,1"), |
| SucceededCount: ptr.To[int32](1), |
| }}, |
| }, |
| }, |
| Status: batch.JobStatus{ |
| Conditions: []batch.JobCondition{ |
| { |
| Type: batch.JobSuccessCriteriaMet, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonSuccessPolicy, |
| Message: "Matched rules at index 0", |
| }, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| *buildPod().uid("a").index("0").phase(v1.PodFailed).trackingFinalizer().Pod, |
| *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| }, |
| wantStatus: batch.JobStatus{ |
| Failed: 1, |
| Succeeded: 1, |
| Terminating: ptr.To[int32](0), |
| CompletedIndexes: "1", |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| Conditions: []batch.JobCondition{ |
| { |
| Type: batch.JobSuccessCriteriaMet, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonSuccessPolicy, |
| Message: "Matched rules at index 0", |
| }, |
| { |
| Type: batch.JobComplete, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonSuccessPolicy, |
| Message: "Matched rules at index 0", |
| }, |
| }, |
| }, |
| }, |
		// In the current mechanism, the job controller adds the Complete condition
		// to the Job even if some pods are still running.
		// We need to revisit this before graduating JobSuccessPolicy to beta.
| // TODO(#123775): A Job might finish with ready!=0 |
| // REF: https://github.com/kubernetes/kubernetes/issues/123775 |
| "job with successPolicy; job has SuccessCriteriaMet and Complete condition when job meets to successPolicy and some pods still are running": { |
| enableJobSuccessPolicy: true, |
| job: batch.Job{ |
| TypeMeta: validTypeMeta, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| CompletionMode: completionModePtr(batch.IndexedCompletion), |
| Parallelism: ptr.To[int32](3), |
| Completions: ptr.To[int32](3), |
| BackoffLimit: ptr.To[int32](math.MaxInt32), |
| BackoffLimitPerIndex: ptr.To[int32](3), |
| SuccessPolicy: &batch.SuccessPolicy{ |
| Rules: []batch.SuccessPolicyRule{{ |
| SucceededIndexes: ptr.To("0,1"), |
| SucceededCount: ptr.To[int32](1), |
| }}, |
| }, |
| }, |
| Status: batch.JobStatus{ |
| Conditions: []batch.JobCondition{ |
| { |
| Type: batch.JobSuccessCriteriaMet, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonSuccessPolicy, |
| Message: "Matched rules at index 0", |
| }, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| *buildPod().uid("a1").index("0").phase(v1.PodFailed).trackingFinalizer().Pod, |
| *buildPod().uid("a2").index("1").phase(v1.PodRunning).trackingFinalizer().Pod, |
| *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| *buildPod().uid("c").index("2").phase(v1.PodRunning).trackingFinalizer().Pod, |
| }, |
| wantStatus: batch.JobStatus{ |
| Failed: 1, |
| Succeeded: 1, |
| Terminating: ptr.To[int32](0), |
| CompletedIndexes: "1", |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| Conditions: []batch.JobCondition{ |
| { |
| Type: batch.JobSuccessCriteriaMet, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonSuccessPolicy, |
| Message: "Matched rules at index 0", |
| }, |
| { |
| Type: batch.JobComplete, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonSuccessPolicy, |
| Message: "Matched rules at index 0", |
| }, |
| }, |
| }, |
| }, |
| "job with successPolicy and podFailurePolicy; job has a failed condition when job meets to both successPolicy and podFailurePolicy": { |
| enableJobSuccessPolicy: true, |
| enableJobFailurePolicy: true, |
| job: batch.Job{ |
| TypeMeta: validTypeMeta, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| CompletionMode: completionModePtr(batch.IndexedCompletion), |
| Parallelism: ptr.To[int32](2), |
| Completions: ptr.To[int32](2), |
| BackoffLimit: ptr.To[int32](math.MaxInt32), |
| SuccessPolicy: &batch.SuccessPolicy{ |
| Rules: []batch.SuccessPolicyRule{{ |
| SucceededIndexes: ptr.To("0,1"), |
| SucceededCount: ptr.To[int32](1), |
| }}, |
| }, |
| PodFailurePolicy: &batch.PodFailurePolicy{ |
| Rules: []batch.PodFailurePolicyRule{{ |
| Action: batch.PodFailurePolicyActionFailJob, |
| OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{{ |
| Type: v1.DisruptionTarget, |
| Status: v1.ConditionTrue, |
| }}, |
| }}, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| *buildPod().uid("a1").index("0").status(v1.PodStatus{ |
| Phase: v1.PodFailed, |
| Conditions: []v1.PodCondition{{ |
| Type: v1.DisruptionTarget, |
| Status: v1.ConditionTrue, |
| }}, |
| }).trackingFinalizer().Pod, |
| *buildPod().uid("a2").index("0").phase(v1.PodRunning).trackingFinalizer().Pod, |
| *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| }, |
| wantStatus: batch.JobStatus{ |
| Failed: 2, |
| Succeeded: 1, |
| Terminating: ptr.To[int32](0), |
| CompletedIndexes: "1", |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| Conditions: []batch.JobCondition{ |
| { |
| Type: batch.JobFailureTarget, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonPodFailurePolicy, |
| Message: "Pod default/mypod-0 has condition DisruptionTarget matching FailJob rule at index 0", |
| }, |
| { |
| Type: batch.JobFailed, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonPodFailurePolicy, |
| Message: "Pod default/mypod-0 has condition DisruptionTarget matching FailJob rule at index 0", |
| }, |
| }, |
| }, |
| }, |
| "job with successPolicy and backoffLimitPerIndex; job has a failed condition when job meets to both successPolicy and backoffLimitPerIndex": { |
| enableJobSuccessPolicy: true, |
| enableBackoffLimitPerIndex: true, |
| job: batch.Job{ |
| TypeMeta: validTypeMeta, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| CompletionMode: completionModePtr(batch.IndexedCompletion), |
| Parallelism: ptr.To[int32](2), |
| Completions: ptr.To[int32](2), |
| BackoffLimit: ptr.To[int32](math.MaxInt32), |
| BackoffLimitPerIndex: ptr.To[int32](1), |
| SuccessPolicy: &batch.SuccessPolicy{ |
| Rules: []batch.SuccessPolicyRule{{ |
| SucceededIndexes: ptr.To("0,1"), |
| SucceededCount: ptr.To[int32](1), |
| }}, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod, |
| *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| }, |
| wantStatus: batch.JobStatus{ |
| Failed: 1, |
| Succeeded: 1, |
| Terminating: ptr.To[int32](0), |
| CompletedIndexes: "1", |
| FailedIndexes: ptr.To("0"), |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| Conditions: []batch.JobCondition{ |
| { |
| Type: batch.JobFailed, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonFailedIndexes, |
| Message: "Job has failed indexes", |
| }, |
| }, |
| }, |
| }, |
| "job with successPolicy and backoffLimit; job has a failed condition when job meets to both successPolicy and backoffLimit": { |
| enableJobSuccessPolicy: true, |
| job: batch.Job{ |
| TypeMeta: validTypeMeta, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| CompletionMode: completionModePtr(batch.IndexedCompletion), |
| Parallelism: ptr.To[int32](2), |
| Completions: ptr.To[int32](2), |
| BackoffLimit: ptr.To[int32](1), |
| SuccessPolicy: &batch.SuccessPolicy{ |
| Rules: []batch.SuccessPolicyRule{{ |
| SucceededIndexes: ptr.To("0,1"), |
| SucceededCount: ptr.To[int32](1), |
| }}, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| *buildPod().uid("a1").index("0").phase(v1.PodFailed).trackingFinalizer().Pod, |
| *buildPod().uid("a2").index("0").phase(v1.PodFailed).trackingFinalizer().Pod, |
| *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| }, |
| wantStatus: batch.JobStatus{ |
| Failed: 2, |
| Succeeded: 1, |
| Terminating: ptr.To[int32](0), |
| CompletedIndexes: "1", |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| Conditions: []batch.JobCondition{ |
| { |
| Type: batch.JobFailed, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonBackoffLimitExceeded, |
| Message: "Job has reached the specified backoff limit", |
| }, |
| }, |
| }, |
| }, |
| "job with successPolicy and podFailurePolicy; job with SuccessCriteriaMet has never been transitioned to FailureTarget and Failed even if job meets podFailurePolicy": { |
| enableJobSuccessPolicy: true, |
| enableJobFailurePolicy: true, |
| job: batch.Job{ |
| TypeMeta: validTypeMeta, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| CompletionMode: completionModePtr(batch.IndexedCompletion), |
| Parallelism: ptr.To[int32](2), |
| Completions: ptr.To[int32](2), |
| BackoffLimit: ptr.To[int32](math.MaxInt32), |
| SuccessPolicy: &batch.SuccessPolicy{ |
| Rules: []batch.SuccessPolicyRule{{ |
| SucceededCount: ptr.To[int32](1), |
| }}, |
| }, |
| PodFailurePolicy: &batch.PodFailurePolicy{ |
| Rules: []batch.PodFailurePolicyRule{{ |
| Action: batch.PodFailurePolicyActionFailJob, |
| OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{{ |
| Type: v1.DisruptionTarget, |
| Status: v1.ConditionTrue, |
| }}, |
| }}, |
| }, |
| }, |
| Status: batch.JobStatus{ |
| Failed: 0, |
| Succeeded: 1, |
| Terminating: ptr.To[int32](0), |
| CompletedIndexes: "1", |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| Conditions: []batch.JobCondition{ |
| { |
| Type: batch.JobSuccessCriteriaMet, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonSuccessPolicy, |
| Message: "Matched rules at index 0", |
| }, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| *buildPod().uid("a").index("0").status(v1.PodStatus{ |
| Phase: v1.PodFailed, |
| Conditions: []v1.PodCondition{{ |
| Type: v1.DisruptionTarget, |
| Status: v1.ConditionTrue, |
| }}, |
| }).trackingFinalizer().Pod, |
| *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| }, |
| wantStatus: batch.JobStatus{ |
| Failed: 1, |
| Succeeded: 1, |
| Terminating: ptr.To[int32](0), |
| CompletedIndexes: "1", |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| Conditions: []batch.JobCondition{ |
| { |
| Type: batch.JobSuccessCriteriaMet, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonSuccessPolicy, |
| Message: "Matched rules at index 0", |
| }, |
| { |
| Type: batch.JobComplete, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonSuccessPolicy, |
| Message: "Matched rules at index 0", |
| }, |
| }, |
| }, |
| }, |
| "job with successPolicy and backoffLimitPerIndex; job with SuccessCriteriaMet has never been transitioned to FailureTarget and Failed even if job meet backoffLimitPerIndex": { |
| enableJobSuccessPolicy: true, |
| enableBackoffLimitPerIndex: true, |
| job: batch.Job{ |
| TypeMeta: validTypeMeta, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| CompletionMode: completionModePtr(batch.IndexedCompletion), |
| Parallelism: ptr.To[int32](2), |
| Completions: ptr.To[int32](2), |
| BackoffLimit: ptr.To[int32](math.MaxInt32), |
| BackoffLimitPerIndex: ptr.To[int32](1), |
| SuccessPolicy: &batch.SuccessPolicy{ |
| Rules: []batch.SuccessPolicyRule{{ |
| SucceededIndexes: ptr.To("1"), |
| }}, |
| }, |
| }, |
| Status: batch.JobStatus{ |
| Failed: 0, |
| Succeeded: 1, |
| Terminating: ptr.To[int32](0), |
| CompletedIndexes: "1", |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| Conditions: []batch.JobCondition{ |
| { |
| Type: batch.JobSuccessCriteriaMet, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonSuccessPolicy, |
| Message: "Matched rules at index 0", |
| }, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod, |
| *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| }, |
| wantStatus: batch.JobStatus{ |
| Failed: 1, |
| Succeeded: 1, |
| Terminating: ptr.To[int32](0), |
| CompletedIndexes: "1", |
| FailedIndexes: ptr.To("0"), |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| Conditions: []batch.JobCondition{ |
| { |
| Type: batch.JobSuccessCriteriaMet, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonSuccessPolicy, |
| Message: "Matched rules at index 0", |
| }, |
| { |
| Type: batch.JobComplete, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonSuccessPolicy, |
| Message: "Matched rules at index 0", |
| }, |
| }, |
| }, |
| }, |
| "job with successPolicy and backoffLimit: job with SuccessCriteriaMet has never been transitioned to FailureTarget and Failed even if job meets backoffLimit": { |
| enableJobSuccessPolicy: true, |
| job: batch.Job{ |
| TypeMeta: validTypeMeta, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| CompletionMode: completionModePtr(batch.IndexedCompletion), |
| Parallelism: ptr.To[int32](2), |
| Completions: ptr.To[int32](2), |
| BackoffLimit: ptr.To[int32](1), |
| SuccessPolicy: &batch.SuccessPolicy{ |
| Rules: []batch.SuccessPolicyRule{{ |
| SucceededIndexes: ptr.To("0,1"), |
| SucceededCount: ptr.To[int32](1), |
| }}, |
| }, |
| }, |
| Status: batch.JobStatus{ |
| Failed: 0, |
| Succeeded: 1, |
| Terminating: ptr.To[int32](0), |
| CompletedIndexes: "1", |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| Conditions: []batch.JobCondition{ |
| { |
| Type: batch.JobSuccessCriteriaMet, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonSuccessPolicy, |
| Message: "Matched rules at index 0", |
| }, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| *buildPod().uid("a").index("0").phase(v1.PodFailed).trackingFinalizer().Pod, |
| *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| }, |
| wantStatus: batch.JobStatus{ |
| Failed: 1, |
| Succeeded: 1, |
| Terminating: ptr.To[int32](0), |
| CompletedIndexes: "1", |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| Conditions: []batch.JobCondition{ |
| { |
| Type: batch.JobSuccessCriteriaMet, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonSuccessPolicy, |
| Message: "Matched rules at index 0", |
| }, |
| { |
| Type: batch.JobComplete, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonSuccessPolicy, |
| Message: "Matched rules at index 0", |
| }, |
| }, |
| }, |
| }, |
| "job with successPolicy and podFailureTarget; job with FailureTarget has never been transitioned to SuccessCriteriaMet even if job meets successPolicy": { |
| enableJobSuccessPolicy: true, |
| enableJobFailurePolicy: true, |
| job: batch.Job{ |
| TypeMeta: validTypeMeta, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| CompletionMode: completionModePtr(batch.IndexedCompletion), |
| Parallelism: ptr.To[int32](2), |
| Completions: ptr.To[int32](2), |
| BackoffLimit: ptr.To[int32](math.MaxInt32), |
| SuccessPolicy: &batch.SuccessPolicy{ |
| Rules: []batch.SuccessPolicyRule{{ |
| SucceededIndexes: ptr.To("0,1"), |
| SucceededCount: ptr.To[int32](1), |
| }}, |
| }, |
| PodFailurePolicy: &batch.PodFailurePolicy{ |
| Rules: []batch.PodFailurePolicyRule{{ |
| Action: batch.PodFailurePolicyActionFailJob, |
| OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{{ |
| Type: v1.DisruptionTarget, |
| Status: v1.ConditionTrue, |
| }}, |
| }}, |
| }, |
| }, |
| Status: batch.JobStatus{ |
| Failed: 1, |
| Succeeded: 0, |
| Terminating: ptr.To[int32](0), |
| CompletedIndexes: "1", |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| Conditions: []batch.JobCondition{ |
| { |
| Type: batch.JobFailureTarget, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonPodFailurePolicy, |
| Message: "Pod default/mypod-0 has condition DisruptionTarget matching FailJob rule at index 0", |
| }, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| *buildPod().uid("a").index("0").status(v1.PodStatus{ |
| Phase: v1.PodFailed, |
| Conditions: []v1.PodCondition{{ |
| Type: v1.DisruptionTarget, |
| Status: v1.ConditionTrue, |
| }}, |
| }).Pod, |
| *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| }, |
| wantStatus: batch.JobStatus{ |
| Failed: 1, |
| Succeeded: 1, |
| Terminating: ptr.To[int32](0), |
| CompletedIndexes: "1", |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| Conditions: []batch.JobCondition{ |
| { |
| Type: batch.JobFailureTarget, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonPodFailurePolicy, |
| Message: "Pod default/mypod-0 has condition DisruptionTarget matching FailJob rule at index 0", |
| }, |
| { |
| Type: batch.JobFailed, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonPodFailurePolicy, |
| Message: "Pod default/mypod-0 has condition DisruptionTarget matching FailJob rule at index 0", |
| }, |
| }, |
| }, |
| }, |
| "when the JobSuccessPolicy is disabled, the Job never got SuccessCriteriaMet condition even if the Job has the successPolicy field": { |
| job: batch.Job{ |
| TypeMeta: validTypeMeta, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| CompletionMode: completionModePtr(batch.IndexedCompletion), |
| Parallelism: ptr.To[int32](3), |
| Completions: ptr.To[int32](3), |
| BackoffLimit: ptr.To[int32](math.MaxInt32), |
| SuccessPolicy: &batch.SuccessPolicy{ |
| Rules: []batch.SuccessPolicyRule{{ |
| SucceededIndexes: ptr.To("0,1"), |
| SucceededCount: ptr.To[int32](1), |
| }}, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| *buildPod().uid("a2").index("0").phase(v1.PodRunning).trackingFinalizer().Pod, |
| *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod, |
| *buildPod().uid("c").index("2").phase(v1.PodRunning).trackingFinalizer().Pod, |
| }, |
| wantStatus: batch.JobStatus{ |
| Active: 2, |
| Terminating: ptr.To[int32](0), |
| Succeeded: 1, |
| CompletedIndexes: "1", |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| }, |
| }, |
| } |
| for name, tc := range testCases { |
| t.Run(name, func(t *testing.T) { |
| defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobFailurePolicy)() |
| defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, tc.enableBackoffLimitPerIndex)() |
| defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobSuccessPolicy, tc.enableJobSuccessPolicy)() |
| |
| clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) |
| fakeClock := clocktesting.NewFakeClock(now) |
| _, ctx := ktesting.NewTestContext(t) |
| manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, t, clientSet, controller.NoResyncPeriodFunc, fakeClock) |
| manager.podControl = &controller.FakePodControl{} |
| manager.podStoreSynced = alwaysReady |
| manager.jobStoreSynced = alwaysReady |
| job := &tc.job |
| |
| actual := job |
| manager.updateStatusHandler = func(_ context.Context, j *batch.Job) (*batch.Job, error) { |
| actual = j |
| return j, nil |
| } |
| if err := sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job); err != nil { |
| t.Fatalf("Failed to add the Job %q to sharedInformer: %v", klog.KObj(job), err) |
| } |
| for i, pod := range tc.pods { |
| pb := podBuilder{Pod: pod.DeepCopy()}.name(fmt.Sprintf("mypod-%d", i)).job(job) |
| if isIndexedJob(job) { |
| pb.index(strconv.Itoa(getCompletionIndex(pod.Annotations))) |
| } |
| if err := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer().Add(pb.Pod); err != nil { |
| t.Fatalf("Failed to add the Pod %q to sharedInformer: %v", klog.KObj(pb.Pod), err) |
| } |
| } |
| |
| if err := manager.syncJob(ctx, testutil.GetKey(job, t)); err != nil { |
| t.Fatalf("Failed to complete syncJob: %v", err) |
| } |
| |
| if diff := cmp.Diff(tc.wantStatus, actual.Status, |
| cmpopts.IgnoreFields(batch.JobStatus{}, "StartTime", "CompletionTime", "Ready"), |
| cmpopts.IgnoreFields(batch.JobCondition{}, "LastProbeTime", "LastTransitionTime")); diff != "" { |
| t.Errorf("Unexpectd Job status (-want,+got):\n%s", diff) |
| } |
| }) |
| } |
| } |
| |
| func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) { |
| _, ctx := ktesting.NewTestContext(t) |
| now := time.Now() |
| validObjectMeta := metav1.ObjectMeta{ |
| Name: "foobar", |
| UID: uuid.NewUUID(), |
| Namespace: metav1.NamespaceDefault, |
| } |
| validSelector := &metav1.LabelSelector{ |
| MatchLabels: map[string]string{"foo": "bar"}, |
| } |
| validTemplate := v1.PodTemplateSpec{ |
| ObjectMeta: metav1.ObjectMeta{ |
| Labels: map[string]string{ |
| "foo": "bar", |
| }, |
| }, |
| Spec: v1.PodSpec{ |
| Containers: []v1.Container{ |
| {Image: "foo/bar"}, |
| }, |
| }, |
| } |
| |
| testCases := map[string]struct { |
| enableJobBackoffLimitPerIndex bool |
| enableJobPodFailurePolicy bool |
| job batch.Job |
| pods []v1.Pod |
| wantStatus batch.JobStatus |
| }{ |
| "successful job after a single failure within index": { |
| enableJobBackoffLimitPerIndex: true, |
| job: batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| Parallelism: ptr.To[int32](2), |
| Completions: ptr.To[int32](2), |
| BackoffLimit: ptr.To[int32](math.MaxInt32), |
| CompletionMode: completionModePtr(batch.IndexedCompletion), |
| BackoffLimitPerIndex: ptr.To[int32](1), |
| }, |
| }, |
| pods: []v1.Pod{ |
| *buildPod().uid("a1").index("0").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().Pod, |
| *buildPod().uid("a2").index("0").phase(v1.PodSucceeded).indexFailureCount("1").trackingFinalizer().Pod, |
| *buildPod().uid("b").index("1").phase(v1.PodSucceeded).indexFailureCount("0").trackingFinalizer().Pod, |
| }, |
| wantStatus: batch.JobStatus{ |
| Failed: 1, |
| Succeeded: 2, |
| Terminating: ptr.To[int32](0), |
| CompletedIndexes: "0,1", |
| FailedIndexes: ptr.To(""), |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| Conditions: []batch.JobCondition{ |
| { |
| Type: batch.JobComplete, |
| Status: v1.ConditionTrue, |
| }, |
| }, |
| }, |
| }, |
| "single failed pod, not counted as the replacement pod creation is delayed": { |
| enableJobBackoffLimitPerIndex: true, |
| job: batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| Parallelism: ptr.To[int32](2), |
| Completions: ptr.To[int32](2), |
| BackoffLimit: ptr.To[int32](math.MaxInt32), |
| CompletionMode: completionModePtr(batch.IndexedCompletion), |
| BackoffLimitPerIndex: ptr.To[int32](1), |
| }, |
| }, |
| pods: []v1.Pod{ |
| *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().Pod, |
| }, |
| wantStatus: batch.JobStatus{ |
| Active: 2, |
| Terminating: ptr.To[int32](0), |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| FailedIndexes: ptr.To(""), |
| }, |
| }, |
| "single failed pod replaced already": { |
| enableJobBackoffLimitPerIndex: true, |
| job: batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| Parallelism: ptr.To[int32](2), |
| Completions: ptr.To[int32](2), |
| BackoffLimit: ptr.To[int32](math.MaxInt32), |
| CompletionMode: completionModePtr(batch.IndexedCompletion), |
| BackoffLimitPerIndex: ptr.To[int32](1), |
| }, |
| }, |
| pods: []v1.Pod{ |
| *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().Pod, |
| *buildPod().uid("b").index("0").phase(v1.PodPending).indexFailureCount("1").trackingFinalizer().Pod, |
| }, |
| wantStatus: batch.JobStatus{ |
| Active: 2, |
| Failed: 1, |
| Terminating: ptr.To[int32](0), |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| FailedIndexes: ptr.To(""), |
| }, |
| }, |
| "single failed index due to exceeding the backoff limit per index, the job continues": { |
| enableJobBackoffLimitPerIndex: true, |
| job: batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| Parallelism: ptr.To[int32](2), |
| Completions: ptr.To[int32](2), |
| BackoffLimit: ptr.To[int32](math.MaxInt32), |
| CompletionMode: completionModePtr(batch.IndexedCompletion), |
| BackoffLimitPerIndex: ptr.To[int32](1), |
| }, |
| }, |
| pods: []v1.Pod{ |
| *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod, |
| }, |
| wantStatus: batch.JobStatus{ |
| Active: 1, |
| Failed: 1, |
| FailedIndexes: ptr.To("0"), |
| Terminating: ptr.To[int32](0), |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| }, |
| }, |
| "single failed index due to FailIndex action, the job continues": { |
| enableJobBackoffLimitPerIndex: true, |
| enableJobPodFailurePolicy: true, |
| job: batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| Parallelism: ptr.To[int32](2), |
| Completions: ptr.To[int32](2), |
| BackoffLimit: ptr.To[int32](math.MaxInt32), |
| CompletionMode: completionModePtr(batch.IndexedCompletion), |
| BackoffLimitPerIndex: ptr.To[int32](1), |
| PodFailurePolicy: &batch.PodFailurePolicy{ |
| Rules: []batch.PodFailurePolicyRule{ |
| { |
| Action: batch.PodFailurePolicyActionFailIndex, |
| OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{ |
| Operator: batch.PodFailurePolicyOnExitCodesOpIn, |
| Values: []int32{3}, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| *buildPod().uid("a").index("0").status(v1.PodStatus{ |
| Phase: v1.PodFailed, |
| ContainerStatuses: []v1.ContainerStatus{ |
| { |
| State: v1.ContainerState{ |
| Terminated: &v1.ContainerStateTerminated{ |
| ExitCode: 3, |
| }, |
| }, |
| }, |
| }, |
| }).indexFailureCount("0").trackingFinalizer().Pod, |
| }, |
| wantStatus: batch.JobStatus{ |
| Active: 1, |
| Failed: 1, |
| FailedIndexes: ptr.To("0"), |
| Terminating: ptr.To[int32](0), |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| }, |
| }, |
| "job failed index due to FailJob action": { |
| enableJobBackoffLimitPerIndex: true, |
| enableJobPodFailurePolicy: true, |
| job: batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| Parallelism: ptr.To[int32](2), |
| Completions: ptr.To[int32](2), |
| BackoffLimit: ptr.To[int32](6), |
| CompletionMode: completionModePtr(batch.IndexedCompletion), |
| BackoffLimitPerIndex: ptr.To[int32](1), |
| PodFailurePolicy: &batch.PodFailurePolicy{ |
| Rules: []batch.PodFailurePolicyRule{ |
| { |
| Action: batch.PodFailurePolicyActionFailJob, |
| OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{ |
| Operator: batch.PodFailurePolicyOnExitCodesOpIn, |
| Values: []int32{3}, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| *buildPod().uid("a").index("0").status(v1.PodStatus{ |
| Phase: v1.PodFailed, |
| ContainerStatuses: []v1.ContainerStatus{ |
| { |
| Name: "x", |
| State: v1.ContainerState{ |
| Terminated: &v1.ContainerStateTerminated{ |
| ExitCode: 3, |
| }, |
| }, |
| }, |
| }, |
| }).indexFailureCount("0").trackingFinalizer().Pod, |
| }, |
| wantStatus: batch.JobStatus{ |
| Active: 0, |
| Failed: 1, |
| FailedIndexes: ptr.To(""), |
| Terminating: ptr.To[int32](0), |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| Conditions: []batch.JobCondition{ |
| { |
| Type: batch.JobFailureTarget, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonPodFailurePolicy, |
| Message: "Container x for pod default/mypod-0 failed with exit code 3 matching FailJob rule at index 0", |
| }, |
| { |
| Type: batch.JobFailed, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonPodFailurePolicy, |
| Message: "Container x for pod default/mypod-0 failed with exit code 3 matching FailJob rule at index 0", |
| }, |
| }, |
| }, |
| }, |
| "job pod failure ignored due to matching Ignore action": { |
| enableJobBackoffLimitPerIndex: true, |
| enableJobPodFailurePolicy: true, |
| job: batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| Parallelism: ptr.To[int32](2), |
| Completions: ptr.To[int32](2), |
| BackoffLimit: ptr.To[int32](6), |
| CompletionMode: completionModePtr(batch.IndexedCompletion), |
| BackoffLimitPerIndex: ptr.To[int32](1), |
| PodFailurePolicy: &batch.PodFailurePolicy{ |
| Rules: []batch.PodFailurePolicyRule{ |
| { |
| Action: batch.PodFailurePolicyActionIgnore, |
| OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{ |
| Operator: batch.PodFailurePolicyOnExitCodesOpIn, |
| Values: []int32{3}, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| pods: []v1.Pod{ |
| *buildPod().uid("a").index("0").status(v1.PodStatus{ |
| Phase: v1.PodFailed, |
| ContainerStatuses: []v1.ContainerStatus{ |
| { |
| Name: "x", |
| State: v1.ContainerState{ |
| Terminated: &v1.ContainerStateTerminated{ |
| ExitCode: 3, |
| }, |
| }, |
| }, |
| }, |
| }).indexFailureCount("0").trackingFinalizer().Pod, |
| }, |
| wantStatus: batch.JobStatus{ |
| Active: 2, |
| Failed: 0, |
| FailedIndexes: ptr.To(""), |
| Terminating: ptr.To[int32](0), |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| }, |
| }, |
| "job failed due to exceeding backoffLimit before backoffLimitPerIndex": { |
| enableJobBackoffLimitPerIndex: true, |
| job: batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| Parallelism: ptr.To[int32](2), |
| Completions: ptr.To[int32](2), |
| BackoffLimit: ptr.To[int32](1), |
| CompletionMode: completionModePtr(batch.IndexedCompletion), |
| BackoffLimitPerIndex: ptr.To[int32](1), |
| }, |
| }, |
| pods: []v1.Pod{ |
| *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().Pod, |
| *buildPod().uid("b").index("1").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().Pod, |
| }, |
| wantStatus: batch.JobStatus{ |
| Failed: 2, |
| Succeeded: 0, |
| FailedIndexes: ptr.To(""), |
| Terminating: ptr.To[int32](0), |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| Conditions: []batch.JobCondition{ |
| { |
| Type: batch.JobFailed, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonBackoffLimitExceeded, |
| Message: "Job has reached the specified backoff limit", |
| }, |
| }, |
| }, |
| }, |
| "job failed due to failed indexes": { |
| enableJobBackoffLimitPerIndex: true, |
| job: batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| Parallelism: ptr.To[int32](2), |
| Completions: ptr.To[int32](2), |
| BackoffLimit: ptr.To[int32](math.MaxInt32), |
| CompletionMode: completionModePtr(batch.IndexedCompletion), |
| BackoffLimitPerIndex: ptr.To[int32](1), |
| }, |
| }, |
| pods: []v1.Pod{ |
| *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod, |
| *buildPod().uid("b").index("1").phase(v1.PodSucceeded).indexFailureCount("0").trackingFinalizer().Pod, |
| }, |
| wantStatus: batch.JobStatus{ |
| Failed: 1, |
| Succeeded: 1, |
| Terminating: ptr.To[int32](0), |
| FailedIndexes: ptr.To("0"), |
| CompletedIndexes: "1", |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| Conditions: []batch.JobCondition{ |
| { |
| Type: batch.JobFailed, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonFailedIndexes, |
| Message: "Job has failed indexes", |
| }, |
| }, |
| }, |
| }, |
| "job failed due to exceeding max failed indexes": { |
| enableJobBackoffLimitPerIndex: true, |
| job: batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| Parallelism: ptr.To[int32](4), |
| Completions: ptr.To[int32](4), |
| BackoffLimit: ptr.To[int32](math.MaxInt32), |
| CompletionMode: completionModePtr(batch.IndexedCompletion), |
| BackoffLimitPerIndex: ptr.To[int32](1), |
| MaxFailedIndexes: ptr.To[int32](1), |
| }, |
| }, |
| pods: []v1.Pod{ |
| *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod, |
| *buildPod().uid("b").index("1").phase(v1.PodSucceeded).indexFailureCount("0").trackingFinalizer().Pod, |
| *buildPod().uid("c").index("2").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod, |
| *buildPod().uid("d").index("3").phase(v1.PodRunning).indexFailureCount("0").trackingFinalizer().Pod, |
| }, |
| wantStatus: batch.JobStatus{ |
| Failed: 3, |
| Succeeded: 1, |
| Terminating: ptr.To[int32](0), |
| FailedIndexes: ptr.To("0,2"), |
| CompletedIndexes: "1", |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| Conditions: []batch.JobCondition{ |
| { |
| Type: batch.JobFailed, |
| Status: v1.ConditionTrue, |
| Reason: batch.JobReasonMaxFailedIndexesExceeded, |
| Message: "Job has exceeded the specified maximal number of failed indexes", |
| }, |
| }, |
| }, |
| }, |
| "job with finished indexes; failedIndexes are cleaned when JobBackoffLimitPerIndex disabled": { |
| enableJobBackoffLimitPerIndex: false, |
| job: batch.Job{ |
| TypeMeta: metav1.TypeMeta{Kind: "Job"}, |
| ObjectMeta: validObjectMeta, |
| Spec: batch.JobSpec{ |
| Selector: validSelector, |
| Template: validTemplate, |
| Parallelism: ptr.To[int32](3), |
| Completions: ptr.To[int32](3), |
| BackoffLimit: ptr.To[int32](math.MaxInt32), |
| CompletionMode: completionModePtr(batch.IndexedCompletion), |
| BackoffLimitPerIndex: ptr.To[int32](1), |
| }, |
| Status: batch.JobStatus{ |
| FailedIndexes: ptr.To("0"), |
| CompletedIndexes: "1", |
| }, |
| }, |
| pods: []v1.Pod{ |
| *buildPod().uid("c").index("2").phase(v1.PodPending).indexFailureCount("1").trackingFinalizer().Pod, |
| }, |
| wantStatus: batch.JobStatus{ |
| Active: 2, |
| Succeeded: 1, |
| Terminating: ptr.To[int32](0), |
| CompletedIndexes: "1", |
| UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, |
| }, |
| }, |
| } |
| for name, tc := range testCases { |
| t.Run(name, func(t *testing.T) { |
| defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, tc.enableJobBackoffLimitPerIndex)() |
| defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy)() |
| clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) |
| fakeClock := clocktesting.NewFakeClock(now) |
| manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock) |
| fakePodControl := controller.FakePodControl{} |
| manager.podControl = &fakePodControl |
| manager.podStoreSynced = alwaysReady |
| manager.jobStoreSynced = alwaysReady |
| job := &tc.job |
| |
| actual := job |
| manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { |
| actual = job |
| return job, nil |
| } |
| sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) |
| for i, pod := range tc.pods { |
| pod := pod |
| pb := podBuilder{Pod: &pod}.name(fmt.Sprintf("mypod-%d", i)).job(job) |
				if isIndexedJob(job) {
					pb.index(strconv.Itoa(getCompletionIndex(pod.Annotations)))
				}
| pb = pb.trackingFinalizer() |
| sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer().Add(pb.Pod) |
| } |
| |
			if err := manager.syncJob(ctx, testutil.GetKey(job, t)); err != nil {
				t.Fatalf("Failed to complete syncJob: %v", err)
			}
| |
| // validate relevant fields of the status |
| if diff := cmp.Diff(tc.wantStatus, actual.Status, |
| cmpopts.IgnoreFields(batch.JobStatus{}, "StartTime", "CompletionTime", "Ready"), |
| cmpopts.IgnoreFields(batch.JobCondition{}, "LastProbeTime", "LastTransitionTime")); diff != "" { |
| t.Errorf("unexpected job status. Diff: %s\n", diff) |
| } |
| }) |
| } |
| } |
| |
| func TestSyncJobUpdateRequeue(t *testing.T) { |
| _, ctx := ktesting.NewTestContext(t) |
| clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) |
| cases := map[string]struct { |
| updateErr error |
| wantRequeued bool |
| }{ |
| "no error": { |
| wantRequeued: false, |
| }, |
| "generic error": { |
| updateErr: fmt.Errorf("update error"), |
| wantRequeued: true, |
| }, |
| "conflict error": { |
| updateErr: apierrors.NewConflict(schema.GroupResource{}, "", nil), |
| wantRequeued: true, |
| }, |
| } |
| for name, tc := range cases { |
| t.Run(name, func(t *testing.T) { |
| t.Cleanup(setDurationDuringTest(&DefaultJobApiBackOff, fastJobApiBackoff)) |
			fakeClock := clocktesting.NewFakeClock(time.Now())
			manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock)
| fakePodControl := controller.FakePodControl{} |
| manager.podControl = &fakePodControl |
| manager.podStoreSynced = alwaysReady |
| manager.jobStoreSynced = alwaysReady |
| manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { |
| return job, tc.updateErr |
| } |
| job := newJob(2, 2, 6, batch.NonIndexedCompletion) |
| sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) |
| manager.queue.Add(testutil.GetKey(job, t)) |
| manager.processNextWorkItem(context.TODO()) |
| if tc.wantRequeued { |
| verifyEmptyQueueAndAwaitForQueueLen(ctx, t, manager, 1) |
| } else { |
				// We advance the clock to make sure there are no items awaiting
				// addition to the queue. We also sleep a little to give the
				// delaying queue time to move potential items from the pre-queue
				// into the queue asynchronously.
| manager.clock.Sleep(fastJobApiBackoff) |
| time.Sleep(time.Millisecond) |
| verifyEmptyQueue(ctx, t, manager) |
| } |
| }) |
| } |
| } |
| |
| func TestUpdateJobRequeue(t *testing.T) { |
| logger, ctx := ktesting.NewTestContext(t) |
| clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) |
| cases := map[string]struct { |
| oldJob *batch.Job |
| updateFn func(job *batch.Job) |
| wantRequeuedImmediately bool |
| }{ |
| "spec update": { |
| oldJob: newJob(1, 1, 1, batch.IndexedCompletion), |
| updateFn: func(job *batch.Job) { |
| job.Spec.Suspend = ptr.To(false) |
| job.Generation++ |
| }, |
| wantRequeuedImmediately: true, |
| }, |
| "status update": { |
| oldJob: newJob(1, 1, 1, batch.IndexedCompletion), |
| updateFn: func(job *batch.Job) { |
| job.Status.StartTime = &metav1.Time{Time: time.Now()} |
| }, |
| wantRequeuedImmediately: false, |
| }, |
| } |
| for name, tc := range cases { |
| t.Run(name, func(t *testing.T) { |
| manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc) |
| manager.podStoreSynced = alwaysReady |
| manager.jobStoreSynced = alwaysReady |
| |
| sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(tc.oldJob) |
| newJob := tc.oldJob.DeepCopy() |
| if tc.updateFn != nil { |
| tc.updateFn(newJob) |
| } |
| manager.updateJob(logger, tc.oldJob, newJob) |
| gotRequeuedImmediately := manager.queue.Len() > 0 |
| if tc.wantRequeuedImmediately != gotRequeuedImmediately { |
| t.Fatalf("Want immediate requeue: %v, got immediate requeue: %v", tc.wantRequeuedImmediately, gotRequeuedImmediately) |
| } |
| }) |
| } |
| } |
| |
| func TestGetPodCreationInfoForIndependentIndexes(t *testing.T) { |
| logger, ctx := ktesting.NewTestContext(t) |
| now := time.Now() |
| clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) |
| cases := map[string]struct { |
| indexesToAdd []int |
| podsWithDelayedDeletionPerIndex map[int]*v1.Pod |
| wantIndexesToAdd []int |
| wantRemainingTime time.Duration |
| }{ |
| "simple index creation": { |
| indexesToAdd: []int{1, 3}, |
| wantIndexesToAdd: []int{1, 3}, |
| }, |
| "subset of indexes can be recreated now": { |
| indexesToAdd: []int{1, 3}, |
| podsWithDelayedDeletionPerIndex: map[int]*v1.Pod{ |
| 1: buildPod().indexFailureCount("0").index("1").customDeletionTimestamp(now).Pod, |
| }, |
| wantIndexesToAdd: []int{3}, |
| }, |
| "subset of indexes can be recreated now as the pods failed long time ago": { |
| indexesToAdd: []int{1, 3}, |
| podsWithDelayedDeletionPerIndex: map[int]*v1.Pod{ |
| 1: buildPod().indexFailureCount("0").customDeletionTimestamp(now).Pod, |
| 3: buildPod().indexFailureCount("0").customDeletionTimestamp(now.Add(-DefaultJobPodFailureBackOff)).Pod, |
| }, |
| wantIndexesToAdd: []int{3}, |
| }, |
| "no indexes can be recreated now, need to wait default pod failure backoff": { |
| indexesToAdd: []int{1, 2, 3}, |
| podsWithDelayedDeletionPerIndex: map[int]*v1.Pod{ |
| 1: buildPod().indexFailureCount("1").customDeletionTimestamp(now).Pod, |
| 2: buildPod().indexFailureCount("0").customDeletionTimestamp(now).Pod, |
| 3: buildPod().indexFailureCount("2").customDeletionTimestamp(now).Pod, |
| }, |
| wantRemainingTime: DefaultJobPodFailureBackOff, |
| }, |
| "no indexes can be recreated now, need to wait but 1s already passed": { |
| indexesToAdd: []int{1, 2, 3}, |
| podsWithDelayedDeletionPerIndex: map[int]*v1.Pod{ |
| 1: buildPod().indexFailureCount("1").customDeletionTimestamp(now.Add(-time.Second)).Pod, |
| 2: buildPod().indexFailureCount("0").customDeletionTimestamp(now.Add(-time.Second)).Pod, |
| 3: buildPod().indexFailureCount("2").customDeletionTimestamp(now.Add(-time.Second)).Pod, |
| }, |
| wantRemainingTime: DefaultJobPodFailureBackOff - time.Second, |
| }, |
| } |
| for name, tc := range cases { |
| t.Run(name, func(t *testing.T) { |
| fakeClock := clocktesting.NewFakeClock(now) |
| manager, _ := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock) |
| gotIndexesToAdd, gotRemainingTime := manager.getPodCreationInfoForIndependentIndexes(logger, tc.indexesToAdd, tc.podsWithDelayedDeletionPerIndex) |
| if diff := cmp.Diff(tc.wantIndexesToAdd, gotIndexesToAdd); diff != "" { |
| t.Fatalf("Unexpected indexes to add: %s", diff) |
| } |
| if diff := cmp.Diff(tc.wantRemainingTime, gotRemainingTime); diff != "" { |
| t.Fatalf("Unexpected remaining time: %s", diff) |
| } |
| }) |
| } |
| } |
| |
| func TestJobPodLookup(t *testing.T) { |
| _, ctx := ktesting.NewTestContext(t) |
| clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) |
| manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc) |
| manager.podStoreSynced = alwaysReady |
| manager.jobStoreSynced = alwaysReady |
| testCases := []struct { |
| job *batch.Job |
| pod *v1.Pod |
| |
| expectedName string |
| }{ |
| // pods without labels don't match any job |
| { |
| job: &batch.Job{ |
| ObjectMeta: metav1.ObjectMeta{Name: "basic"}, |
| }, |
| pod: &v1.Pod{ |
| ObjectMeta: metav1.ObjectMeta{Name: "foo1", Namespace: metav1.NamespaceAll}, |
| }, |
| expectedName: "", |
| }, |
| // matching labels, different namespace |
| { |
| job: &batch.Job{ |
| ObjectMeta: metav1.ObjectMeta{Name: "foo"}, |
| Spec: batch.JobSpec{ |
| Selector: &metav1.LabelSelector{ |
| MatchLabels: map[string]string{"foo": "bar"}, |
| }, |
| }, |
| }, |
| pod: &v1.Pod{ |
| ObjectMeta: metav1.ObjectMeta{ |
| Name: "foo2", |
| Namespace: "ns", |
| Labels: map[string]string{"foo": "bar"}, |
| }, |
| }, |
| expectedName: "", |
| }, |
| // matching ns and labels returns |
| { |
| job: &batch.Job{ |
| ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "ns"}, |
| Spec: batch.JobSpec{ |
| Selector: &metav1.LabelSelector{ |
| MatchExpressions: []metav1.LabelSelectorRequirement{ |
| { |
| Key: "foo", |
| Operator: metav1.LabelSelectorOpIn, |
| Values: []string{"bar"}, |
| }, |
| }, |
| }, |
| }, |
| }, |
| pod: &v1.Pod{ |
| ObjectMeta: metav1.ObjectMeta{ |
| Name: "foo3", |
| Namespace: "ns", |
| Labels: map[string]string{"foo": "bar"}, |
| }, |
| }, |
| expectedName: "bar", |
| }, |
| } |
| for _, tc := range testCases { |
| sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(tc.job) |
| if jobs := manager.getPodJobs(tc.pod); len(jobs) > 0 { |
| if got, want := len(jobs), 1; got != want { |
| t.Errorf("len(jobs) = %v, want %v", got, want) |
| } |
| job := jobs[0] |
| if tc.expectedName != job.Name { |
| t.Errorf("Got job %+v expected %+v", job.Name, tc.expectedName) |
| } |
| } else if tc.expectedName != "" { |
| t.Errorf("Expected a job %v pod %v, found none", tc.expectedName, tc.pod.Name) |
| } |
| } |
| } |
| |
| func TestGetPodsForJob(t *testing.T) { |
| _, ctx := ktesting.NewTestContext(t) |
| job := newJob(1, 1, 6, batch.NonIndexedCompletion) |
| job.Name = "test_job" |
| otherJob := newJob(1, 1, 6, batch.NonIndexedCompletion) |
| otherJob.Name = "other_job" |
| cases := map[string]struct { |
| jobDeleted bool |
| jobDeletedInCache bool |
| pods []*v1.Pod |
| wantPods []string |
| wantPodsFinalizer []string |
| }{ |
| "only matching": { |
| pods: []*v1.Pod{ |
| buildPod().name("pod1").job(job).trackingFinalizer().Pod, |
| buildPod().name("pod2").job(otherJob).Pod, |
| buildPod().name("pod3").ns(job.Namespace).Pod, |
| buildPod().name("pod4").job(job).Pod, |
| }, |
| wantPods: []string{"pod1", "pod4"}, |
| wantPodsFinalizer: []string{"pod1"}, |
| }, |
| "adopt": { |
| pods: []*v1.Pod{ |
| buildPod().name("pod1").job(job).Pod, |
| buildPod().name("pod2").job(job).clearOwner().Pod, |
| buildPod().name("pod3").job(otherJob).Pod, |
| }, |
| wantPods: []string{"pod1", "pod2"}, |
| wantPodsFinalizer: []string{"pod2"}, |
| }, |
| "no adopt when deleting": { |
| jobDeleted: true, |
| jobDeletedInCache: true, |
| pods: []*v1.Pod{ |
| buildPod().name("pod1").job(job).Pod, |
| buildPod().name("pod2").job(job).clearOwner().Pod, |
| }, |
| wantPods: []string{"pod1"}, |
| }, |
| "no adopt when deleting race": { |
| jobDeleted: true, |
| pods: []*v1.Pod{ |
| buildPod().name("pod1").job(job).Pod, |
| buildPod().name("pod2").job(job).clearOwner().Pod, |
| }, |
| wantPods: []string{"pod1"}, |
| }, |
| "release": { |
| pods: []*v1.Pod{ |
| buildPod().name("pod1").job(job).Pod, |
| buildPod().name("pod2").job(job).clearLabels().Pod, |
| }, |
| wantPods: []string{"pod1"}, |
| }, |
| } |
| for name, tc := range cases { |
| t.Run(name, func(t *testing.T) { |
| job := job.DeepCopy() |
| if tc.jobDeleted { |
| job.DeletionTimestamp = &metav1.Time{} |
| } |
| clientSet := fake.NewSimpleClientset(job, otherJob) |
| jm, informer := newControllerFromClient(ctx, t, clientSet, controller.NoResyncPeriodFunc) |
| jm.podStoreSynced = alwaysReady |
| jm.jobStoreSynced = alwaysReady |
| cachedJob := job.DeepCopy() |
| if tc.jobDeletedInCache { |
| cachedJob.DeletionTimestamp = &metav1.Time{} |
| } |
| informer.Batch().V1().Jobs().Informer().GetIndexer().Add(cachedJob) |
| informer.Batch().V1().Jobs().Informer().GetIndexer().Add(otherJob) |
| for _, p := range tc.pods { |
| informer.Core().V1().Pods().Informer().GetIndexer().Add(p) |
| } |
| |
| pods, err := jm.getPodsForJob(context.TODO(), job) |
| if err != nil { |
| t.Fatalf("getPodsForJob() error: %v", err) |
| } |
| got := make([]string, len(pods)) |
| var gotFinalizer []string |
| for i, p := range pods { |
| got[i] = p.Name |
| if hasJobTrackingFinalizer(p) { |
| gotFinalizer = append(gotFinalizer, p.Name) |
| } |
| } |
| sort.Strings(got) |
| if diff := cmp.Diff(tc.wantPods, got); diff != "" { |
| t.Errorf("getPodsForJob() returned (-want,+got):\n%s", diff) |
| } |
| sort.Strings(gotFinalizer) |
| if diff := cmp.Diff(tc.wantPodsFinalizer, gotFinalizer); diff != "" { |
| t.Errorf("Pods with finalizers (-want,+got):\n%s", diff) |
| } |
| }) |
| } |
| } |
| |
| func TestAddPod(t *testing.T) { |
| t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod)) |
| _, ctx := ktesting.NewTestContext(t) |
| logger := klog.FromContext(ctx) |
| |
| clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) |
| fakeClock := clocktesting.NewFakeClock(time.Now()) |
| jm, informer := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock) |
| jm.podStoreSynced = alwaysReady |
| jm.jobStoreSynced = alwaysReady |
| |
| job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) |
| job1.Name = "job1" |
| job2 := newJob(1, 1, 6, batch.NonIndexedCompletion) |
| job2.Name = "job2" |
| informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) |
| informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) |
| |
| pod1 := newPod("pod1", job1) |
| pod2 := newPod("pod2", job2) |
| informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) |
| informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) |
| |
| jm.addPod(logger, pod1) |
| verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 1) |
| key, done := jm.queue.Get() |
| if key == nil || done { |
| t.Fatalf("failed to enqueue controller for pod %v", pod1.Name) |
| } |
| expectedKey, _ := controller.KeyFunc(job1) |
| if got, want := key.(string), expectedKey; got != want { |
| t.Errorf("queue.Get() = %v, want %v", got, want) |
| } |
| |
| jm.addPod(logger, pod2) |
| verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 1) |
| key, done = jm.queue.Get() |
| if key == nil || done { |
| t.Fatalf("failed to enqueue controller for pod %v", pod2.Name) |
| } |
| expectedKey, _ = controller.KeyFunc(job2) |
| if got, want := key.(string), expectedKey; got != want { |
| t.Errorf("queue.Get() = %v, want %v", got, want) |
| } |
| } |
| |
| func TestAddPodOrphan(t *testing.T) { |
| t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod)) |
| logger, ctx := ktesting.NewTestContext(t) |
| clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) |
| fakeClock := clocktesting.NewFakeClock(time.Now()) |
| jm, informer := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock) |
| jm.podStoreSynced = alwaysReady |
| jm.jobStoreSynced = alwaysReady |
| |
| job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) |
| job1.Name = "job1" |
| job2 := newJob(1, 1, 6, batch.NonIndexedCompletion) |
| job2.Name = "job2" |
| job3 := newJob(1, 1, 6, batch.NonIndexedCompletion) |
| job3.Name = "job3" |
| job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"} |
| informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) |
| informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) |
| informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job3) |
| |
| pod1 := newPod("pod1", job1) |
| // Make pod an orphan. Expect all matching controllers to be queued. |
| pod1.OwnerReferences = nil |
| informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) |
| |
| jm.addPod(logger, pod1) |
| verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 2) |
| } |
| |
| func TestUpdatePod(t *testing.T) { |
| t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod)) |
| _, ctx := ktesting.NewTestContext(t) |
| logger := klog.FromContext(ctx) |
| clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) |
| fakeClock := clocktesting.NewFakeClock(time.Now()) |
| jm, informer := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock) |
| jm.podStoreSynced = alwaysReady |
| jm.jobStoreSynced = alwaysReady |
| |
| job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) |
| job1.Name = "job1" |
| job2 := newJob(1, 1, 6, batch.NonIndexedCompletion) |
| job2.Name = "job2" |
| informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) |
| informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) |
| |
| pod1 := newPod("pod1", job1) |
| pod2 := newPod("pod2", job2) |
| informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) |
| informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) |
| |
| prev := *pod1 |
| bumpResourceVersion(pod1) |
| jm.updatePod(logger, &prev, pod1) |
| verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 1) |
| key, done := jm.queue.Get() |
| if key == nil || done { |
| t.Fatalf("failed to enqueue controller for pod %v", pod1.Name) |
| } |
| expectedKey, _ := controller.KeyFunc(job1) |
| if got, want := key.(string), expectedKey; got != want { |
| t.Errorf("queue.Get() = %v, want %v", got, want) |
| } |
| |
| prev = *pod2 |
| bumpResourceVersion(pod2) |
| jm.updatePod(logger, &prev, pod2) |
| verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 1) |
| key, done = jm.queue.Get() |
| if key == nil || done { |
| t.Fatalf("failed to enqueue controller for pod %v", pod2.Name) |
| } |
| expectedKey, _ = controller.KeyFunc(job2) |
| if got, want := key.(string), expectedKey; got != want { |
| t.Errorf("queue.Get() = %v, want %v", got, want) |
| } |
| } |
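| |
| // A companion sketch (illustrative addition, not from the original suite): |
| // the controller treats a pod update whose ResourceVersion is unchanged as an |
| // informer resync and enqueues nothing. Reuses the helpers above. |
| func TestUpdatePodSameResourceVersion(t *testing.T) { |
| t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, 0)) |
| logger, ctx := ktesting.NewTestContext(t) |
| clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) |
| jm, informer := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc) |
| jm.podStoreSynced = alwaysReady |
| jm.jobStoreSynced = alwaysReady |
| |
| job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) |
| informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) |
| pod1 := newPod("pod1", job1) |
| informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) |
| |
| // Identical ResourceVersion on old and new: updatePod should return early. |
| prev := *pod1 |
| jm.updatePod(logger, &prev, pod1) |
| if got, want := jm.queue.Len(), 0; got != want { |
| t.Errorf("queue.Len() = %v, want %v", got, want) |
| } |
| } |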
| |
| func TestUpdatePodOrphanWithNewLabels(t *testing.T) { |
| t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod)) |
| logger, ctx := ktesting.NewTestContext(t) |
| clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) |
| fakeClock := clocktesting.NewFakeClock(time.Now()) |
| jm, informer := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock) |
| jm.podStoreSynced = alwaysReady |
| jm.jobStoreSynced = alwaysReady |
| |
| job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) |
| job1.Name = "job1" |
| job2 := newJob(1, 1, 6, batch.NonIndexedCompletion) |
| job2.Name = "job2" |
| informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) |
| informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) |
| |
| pod1 := newPod("pod1", job1) |
| pod1.OwnerReferences = nil |
| informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) |
| |
| // Labels changed on orphan. Expect newly matching controllers to queue. |
| prev := *pod1 |
| prev.Labels = map[string]string{"foo2": "bar2"} |
| bumpResourceVersion(pod1) |
| jm.updatePod(logger, &prev, pod1) |
| verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 2) |
| } |
| |
| func TestUpdatePodChangeControllerRef(t *testing.T) { |
| t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod)) |
| _, ctx := ktesting.NewTestContext(t) |
| logger := klog.FromContext(ctx) |
| clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) |
| fakeClock := clocktesting.NewFakeClock(time.Now()) |
| jm, informer := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock) |
| jm.podStoreSynced = alwaysReady |
| jm.jobStoreSynced = alwaysReady |
| |
| job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) |
| job1.Name = "job1" |
| job2 := newJob(1, 1, 6, batch.NonIndexedCompletion) |
| job2.Name = "job2" |
| informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) |
| informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) |
| |
| pod1 := newPod("pod1", job1) |
| informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) |
| |
| // Changed ControllerRef. Expect both old and new to queue. |
| prev := *pod1 |
| prev.OwnerReferences = []metav1.OwnerReference{*metav1.NewControllerRef(job2, controllerKind)} |
| bumpResourceVersion(pod1) |
| jm.updatePod(logger, &prev, pod1) |
| verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 2) |
| } |
| |
| func TestUpdatePodRelease(t *testing.T) { |
| t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod)) |
| _, ctx := ktesting.NewTestContext(t) |
| logger := klog.FromContext(ctx) |
| clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) |
| fakeClock := clocktesting.NewFakeClock(time.Now()) |
| jm, informer := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock) |
| jm.podStoreSynced = alwaysReady |
| jm.jobStoreSynced = alwaysReady |
| |
| job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) |
| job1.Name = "job1" |
| job2 := newJob(1, 1, 6, batch.NonIndexedCompletion) |
| job2.Name = "job2" |
| informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) |
| informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) |
| |
| pod1 := newPod("pod1", job1) |
| informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) |
| |
| // Remove ControllerRef. Expect all matching to queue for adoption. |
| prev := *pod1 |
| pod1.OwnerReferences = nil |
| bumpResourceVersion(pod1) |
| jm.updatePod(logger, &prev, pod1) |
| verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 2) |
| } |
| |
| func TestDeletePod(t *testing.T) { |
| t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod)) |
| logger, ctx := ktesting.NewTestContext(t) |
| clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) |
| fakeClock := clocktesting.NewFakeClock(time.Now()) |
| jm, informer := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock) |
| jm.podStoreSynced = alwaysReady |
| jm.jobStoreSynced = alwaysReady |
| |
| job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) |
| job1.Name = "job1" |
| job2 := newJob(1, 1, 6, batch.NonIndexedCompletion) |
| job2.Name = "job2" |
| informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) |
| informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) |
| |
| pod1 := newPod("pod1", job1) |
| pod2 := newPod("pod2", job2) |
| informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) |
| informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) |
| |
| jm.deletePod(logger, pod1, true) |
| verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 1) |
| key, done := jm.queue.Get() |
| if key == nil || done { |
| t.Fatalf("failed to enqueue controller for pod %v", pod1.Name) |
| } |
| expectedKey, _ := controller.KeyFunc(job1) |
| if got, want := key.(string), expectedKey; got != want { |
| t.Errorf("queue.Get() = %v, want %v", got, want) |
| } |
| |
| jm.deletePod(logger, pod2, true) |
| verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 1) |
| key, done = jm.queue.Get() |
| if key == nil || done { |
| t.Fatalf("failed to enqueue controller for pod %v", pod2.Name) |
| } |
| expectedKey, _ = controller.KeyFunc(job2) |
| if got, want := key.(string), expectedKey; got != want { |
| t.Errorf("queue.Get() = %v, want %v", got, want) |
| } |
| } |
| |
| func TestDeletePodOrphan(t *testing.T) { |
| // Disable batching of pod updates to show that deleting an orphan pod does not enqueue any job at all. |
| t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, 0)) |
| logger, ctx := ktesting.NewTestContext(t) |
| clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) |
| jm, informer := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc) |
| jm.podStoreSynced = alwaysReady |
| jm.jobStoreSynced = alwaysReady |
| |
| job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) |
| job1.Name = "job1" |
| job2 := newJob(1, 1, 6, batch.NonIndexedCompletion) |
| job2.Name = "job2" |
| job3 := newJob(1, 1, 6, batch.NonIndexedCompletion) |
| job3.Name = "job3" |
| job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"} |
| informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) |
| informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) |
| informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job3) |
| |
| pod1 := newPod("pod1", job1) |
| pod1.OwnerReferences = nil |
| informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) |
| |
| jm.deletePod(logger, pod1, true) |
| if got, want := jm.queue.Len(), 0; got != want { |
| t.Fatalf("queue.Len() = %v, want %v", got, want) |
| } |
| } |
| |
| // FakeJobExpectations wraps ControllerExpectations so tests can force the |
| // satisfied result and observe when expectations are checked. |
| type FakeJobExpectations struct { |
| *controller.ControllerExpectations |
| satisfied bool |
| expSatisfied func() |
| } |
| |
| func (fe FakeJobExpectations) SatisfiedExpectations(logger klog.Logger, controllerKey string) bool { |
| fe.expSatisfied() |
| return fe.satisfied |
| } |
| |
| // TestSyncJobExpectations tests that a pod cannot sneak in between counting active pods |
| // and checking expectations. |
| func TestSyncJobExpectations(t *testing.T) { |
| _, ctx := ktesting.NewTestContext(t) |
| clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) |
| manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc) |
| fakePodControl := controller.FakePodControl{} |
| manager.podControl = &fakePodControl |
| manager.podStoreSynced = alwaysReady |
| manager.jobStoreSynced = alwaysReady |
| manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { |
| return job, nil |
| } |
| |
| job := newJob(2, 2, 6, batch.NonIndexedCompletion) |
| sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) |
| pods := newPodList(2, v1.PodPending, job) |
| podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() |
| podIndexer.Add(pods[0]) |
| |
| manager.expectations = FakeJobExpectations{ |
| controller.NewControllerExpectations(), true, func() { |
| // If we check active pods before checking expectations, the job |
| // will create a new replica because it doesn't see this pod, but |
| // has fulfilled its expectations. |
| podIndexer.Add(pods[1]) |
| }, |
| } |
| manager.syncJob(context.TODO(), testutil.GetKey(job, t)) |
| if len(fakePodControl.Templates) != 0 { |
| t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.Templates)) |
| } |
| if len(fakePodControl.DeletePodName) != 0 { |
| t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", 0, len(fakePodControl.DeletePodName)) |
| } |
| } |
| |
| func TestWatchJobs(t *testing.T) { |
| _, ctx := ktesting.NewTestContext(t) |
| clientset := fake.NewSimpleClientset() |
| fakeWatch := watch.NewFake() |
| clientset.PrependWatchReactor("jobs", core.DefaultWatchReactor(fakeWatch, nil)) |
| manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc) |
| manager.podStoreSynced = alwaysReady |
| manager.jobStoreSynced = alwaysReady |
| |
| var testJob batch.Job |
| received := make(chan struct{}) |
| |
| // The update sent through the fakeWatcher should make its way into the workqueue, |
| // and eventually into the syncHandler. |
| manager.syncHandler = func(ctx context.Context, key string) error { |
| defer close(received) |
| ns, name, err := cache.SplitMetaNamespaceKey(key) |
| if err != nil { |
| t.Errorf("Error getting namespace/name from key %v: %v", key, err) |
| } |
| job, err := manager.jobLister.Jobs(ns).Get(name) |
| if err != nil || job == nil { |
| t.Errorf("Expected to find job under key %v: %v", key, err) |
| return nil |
| } |
| if !apiequality.Semantic.DeepDerivative(*job, testJob) { |
| t.Errorf("Expected %#v, but got %#v", testJob, *job) |
| } |
| return nil |
| } |
| // Start only the job watcher and the workqueue, send a watch event, |
| // and make sure it hits the sync method. |
| stopCh := make(chan struct{}) |
| defer close(stopCh) |
| sharedInformerFactory.Start(stopCh) |
| go manager.Run(context.TODO(), 1) |
| |
| // We're sending a new job to see if it reaches the syncHandler. |
| testJob.Namespace = "bar" |
| testJob.Name = "foo" |
| fakeWatch.Add(&testJob) |
| t.Log("Waiting for job to reach syncHandler") |
| <-received |
| } |
| |
| func TestWatchPods(t *testing.T) { |
| _, ctx := ktesting.NewTestContext(t) |
| testJob := newJob(2, 2, 6, batch.NonIndexedCompletion) |
| clientset := fake.NewSimpleClientset(testJob) |
| fakeWatch := watch.NewFake() |
| clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(fakeWatch, nil)) |
| manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc) |
| manager.podStoreSynced = alwaysReady |
| manager.jobStoreSynced = alwaysReady |
| |
| // Put one job into the store; the pod will arrive through the watch. |
| sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(testJob) |
| received := make(chan struct{}) |
| // The pod update sent through the fakeWatcher should figure out the managing job and |
| // send it into the syncHandler. |
| manager.syncHandler = func(ctx context.Context, key string) error { |
| ns, name, err := cache.SplitMetaNamespaceKey(key) |
| if err != nil { |
| t.Errorf("Error getting namespace/name from key %v: %v", key, err) |
| } |
| job, err := manager.jobLister.Jobs(ns).Get(name) |
| if err != nil { |
| t.Errorf("Expected to find job under key %v: %v", key, err) |
| } |
| if !apiequality.Semantic.DeepDerivative(job, testJob) { |
| t.Errorf("\nExpected %#v,\nbut got %#v", testJob, job) |
| close(received) |
| return nil |
| } |
| close(received) |
| return nil |
| } |
| // Start only the pod watcher and the workqueue, send a watch event, |
| // and make sure it hits the sync method for the right job. |
| stopCh := make(chan struct{}) |
| defer close(stopCh) |
| go sharedInformerFactory.Core().V1().Pods().Informer().Run(stopCh) |
| go manager.Run(context.TODO(), 1) |
| |
| pods := newPodList(1, v1.PodRunning, testJob) |
| testPod := pods[0] |
| testPod.Status.Phase = v1.PodFailed |
| fakeWatch.Add(testPod) |
| |
| t.Log("Waiting for pod to reach syncHandler") |
| <-received |
| } |
| |
| func TestWatchOrphanPods(t *testing.T) { |
| _, ctx := ktesting.NewTestContext(t) |
| clientset := fake.NewSimpleClientset() |
| sharedInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc()) |
| manager, err := NewController(ctx, sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset) |
| if err != nil { |
| t.Fatalf("Error creating Job controller: %v", err) |
| } |
| manager.podStoreSynced = alwaysReady |
| manager.jobStoreSynced = alwaysReady |
| |
| stopCh := make(chan struct{}) |
| defer close(stopCh) |
| podInformer := sharedInformers.Core().V1().Pods().Informer() |
| go podInformer.Run(stopCh) |
| cache.WaitForCacheSync(stopCh, podInformer.HasSynced) |
| go manager.Run(context.TODO(), 1) |
| |
| // Some cases create a Job but deliberately keep it out of the informer cache. |
| cases := map[string]struct { |
| job *batch.Job |
| inCache bool |
| }{ |
| "job_does_not_exist": { |
| job: newJob(2, 2, 6, batch.NonIndexedCompletion), |
| }, |
| "orphan": {}, |
| "job_finished": { |
| job: func() *batch.Job { |
| j := newJob(2, 2, 6, batch.NonIndexedCompletion) |
| j.Status.Conditions = append(j.Status.Conditions, batch.JobCondition{ |
| Type: batch.JobComplete, |
| Status: v1.ConditionTrue, |
| }) |
| return j |
| }(), |
| inCache: true, |
| }, |
| } |
| for name, tc := range cases { |
| t.Run(name, func(t *testing.T) { |
| if tc.inCache { |
| if err := sharedInformers.Batch().V1().Jobs().Informer().GetIndexer().Add(tc.job); err != nil { |
| t.Fatalf("Failed to insert job in index: %v", err) |
| } |
| t.Cleanup(func() { |
| sharedInformers.Batch().V1().Jobs().Informer().GetIndexer().Delete(tc.job) |
| }) |
| } |
| |
| podBuilder := buildPod().name(name).deletionTimestamp().trackingFinalizer() |
| if tc.job != nil { |
| podBuilder = podBuilder.job(tc.job) |
| } |
| orphanPod := podBuilder.Pod |
| orphanPod, err := clientset.CoreV1().Pods("default").Create(context.Background(), orphanPod, metav1.CreateOptions{}) |
| if err != nil { |
| t.Fatalf("Creating orphan pod: %v", err) |
| } |
| |
| if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { |
| p, err := clientset.CoreV1().Pods(orphanPod.Namespace).Get(context.Background(), orphanPod.Name, metav1.GetOptions{}) |
| if err != nil { |
| return false, err |
| } |
| return !hasJobTrackingFinalizer(p), nil |
| }); err != nil { |
| t.Errorf("Waiting for Pod to get the finalizer removed: %v", err) |
| } |
| }) |
| } |
| } |
| |
| // bumpResourceVersion increments the object's resourceVersion, simulating an |
| // update as observed through the informer. |
| func bumpResourceVersion(obj metav1.Object) { |
| ver, _ := strconv.ParseInt(obj.GetResourceVersion(), 10, 32) |
| obj.SetResourceVersion(strconv.FormatInt(ver+1, 10)) |
| } |
| |
| func TestJobApiBackoffReset(t *testing.T) { |
| t.Cleanup(setDurationDuringTest(&DefaultJobApiBackOff, fastJobApiBackoff)) |
| _, ctx := ktesting.NewTestContext(t) |
| |
| clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) |
| fakeClock := clocktesting.NewFakeClock(time.Now()) |
| manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock) |
| fakePodControl := controller.FakePodControl{} |
| manager.podControl = &fakePodControl |
| manager.podStoreSynced = alwaysReady |
| manager.jobStoreSynced = alwaysReady |
| manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { |
| return job, nil |
| } |
| |
| job := newJob(1, 1, 2, batch.NonIndexedCompletion) |
| key := testutil.GetKey(job, t) |
| sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) |
| |
| // A returned error causes the key to be requeued. |
| fakePodControl.Err = errors.New("Controller error") |
| manager.queue.Add(key) |
| manager.processNextWorkItem(context.TODO()) |
| retries := manager.queue.NumRequeues(key) |
| if retries != 1 { |
| t.Fatalf("%s: expected exactly 1 retry, got %d", job.Name, retries) |
| } |
| // Wait for the actual requeue after processing of the pending queue is done. |
| awaitForQueueLen(ctx, t, manager, 1) |
| |
| // the queue is emptied on success |
| fakePodControl.Err = nil |
| manager.processNextWorkItem(context.TODO()) |
| verifyEmptyQueue(ctx, t, manager) |
| } |
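| |
| // A small sketch (illustrative addition) of the real workqueue rate-limiting |
| // semantics that TestJobApiBackoffReset relies on: AddRateLimited bumps |
| // NumRequeues for an item until Forget resets it to zero. |
| func TestWorkqueueRequeueSemanticsSketch(t *testing.T) { |
| q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) |
| defer q.ShutDown() |
| key := "default/job" |
| q.AddRateLimited(key) |
| if got := q.NumRequeues(key); got != 1 { |
| t.Errorf("NumRequeues() = %d, want 1", got) |
| } |
| q.Forget(key) |
| if got := q.NumRequeues(key); got != 0 { |
| t.Errorf("NumRequeues() after Forget() = %d, want 0", got) |
| } |
| } |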
| |
| var _ workqueue.RateLimitingInterface = &fakeRateLimitingQueue{} |
| |
| // fakeRateLimitingQueue stubs out the rate-limiting queue, recording the item |
| // and duration of the last AddAfter call so tests can assert on the backoff. |
| type fakeRateLimitingQueue struct { |
| workqueue.Interface |
| requeues int |
| item interface{} |
| duration time.Duration |
| } |
| |
| func (f *fakeRateLimitingQueue) AddRateLimited(item interface{}) {} |
| func (f *fakeRateLimitingQueue) Forget(item interface{}) { |
| f.requeues = 0 |
| } |
| func (f *fakeRateLimitingQueue) NumRequeues(item interface{}) int { |
| return f.requeues |
| } |
| func (f *fakeRateLimitingQueue) AddAfter(item interface{}, duration time.Duration) { |
| f.item = item |
| f.duration = duration |
| } |
| |
| func TestJobBackoff(t *testing.T) { |
| _, ctx := ktesting.NewTestContext(t) |
| logger := klog.FromContext(ctx) |
| job := newJob(1, 1, 1, batch.NonIndexedCompletion) |
| oldPod := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job) |
| oldPod.ResourceVersion = "1" |
| newPod := oldPod.DeepCopy() |
| newPod.ResourceVersion = "2" |
| |
| testCases := map[string]struct { |
| requeues int |
| oldPodPhase v1.PodPhase |
| phase v1.PodPhase |
| wantBackoff time.Duration |
| }{ |
| "failure with pod updates batching": { |
| requeues: 0, |
| phase: v1.PodFailed, |
| wantBackoff: syncJobBatchPeriod, |
| }, |
| } |
| |
| for name, tc := range testCases { |
| t.Run(name, func(t *testing.T) { |
| clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) |
| manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc) |
| fakePodControl := controller.FakePodControl{} |
| manager.podControl = &fakePodControl |
| manager.podStoreSynced = alwaysReady |
| manager.jobStoreSynced = alwaysReady |
| queue := &fakeRateLimitingQueue{} |
| manager.queue = queue |
| sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) |
| |
| queue.requeues = tc.requeues |
| newPod.Status.Phase = tc.phase |
| oldPod.Status.Phase = v1.PodRunning |
| if tc.oldPodPhase != "" { |
| oldPod.Status.Phase = tc.oldPodPhase |
| } |
| manager.updatePod(logger, oldPod, newPod) |
| if queue.duration != tc.wantBackoff { |
| t.Errorf("unexpected backoff %v, expected %v", queue.duration, tc.wantBackoff) |
| } |
| }) |
| } |
| } |
| |
| func TestJobBackoffForOnFailure(t *testing.T) { |
| _, ctx := ktesting.NewTestContext(t) |
| jobConditionComplete := batch.JobComplete |
| jobConditionFailed := batch.JobFailed |
| jobConditionSuspended := batch.JobSuspended |
| |
| testCases := map[string]struct { |
| // job setup |
| parallelism int32 |
| completions int32 |
| backoffLimit int32 |
| suspend bool |
| |
| // pod setup |
| restartCounts []int32 |
| podPhase v1.PodPhase |
| |
| // expectations |
| expectedActive int32 |
| expectedSucceeded int32 |
| expectedFailed int32 |
| expectedCondition *batch.JobConditionType |
| expectedConditionReason string |
| }{ |
| "backoffLimit 0 should have 1 pod active": { |
| 1, 1, 0, |
| false, []int32{0}, v1.PodRunning, |
| 1, 0, 0, nil, "", |
| }, |
| "backoffLimit 1 with restartCount 0 should have 1 pod active": { |
| 1, 1, 1, |
| false, []int32{0}, v1.PodRunning, |
| 1, 0, 0, nil, "", |
| }, |
| "backoffLimit 1 with restartCount 1 and podRunning should have 0 pod active": { |
| 1, 1, 1, |
| false, []int32{1}, v1.PodRunning, |
| 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded", |
| }, |
| "backoffLimit 1 with restartCount 1 and podPending should have 0 pod active": { |
| 1, 1, 1, |
| false, []int32{1}, v1.PodPending, |
| 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded", |
| }, |
| "too many job failures with podRunning - single pod": { |
| 1, 5, 2, |
| false, []int32{2}, v1.PodRunning, |
| 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded", |
| }, |
| "too many job failures with podPending - single pod": { |
| 1, 5, 2, |
| false, []int32{2}, v1.PodPending, |
| 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded", |
| }, |
| "too many job failures with podRunning - multiple pods": { |
| 2, 5, 2, |
| false, []int32{1, 1}, v1.PodRunning, |
| 0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded", |
| }, |
| "too many job failures with podPending - multiple pods": { |
| 2, 5, 2, |
| false, []int32{1, 1}, v1.PodPending, |
| 0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded", |
| }, |
| "not enough failures": { |
| 2, 5, 3, |
| false, []int32{1, 1}, v1.PodRunning, |
| 2, 0, 0, nil, "", |
| }, |
| "suspending a job": { |
| 2, 4, 6, |
| true, []int32{1, 1}, v1.PodRunning, |
| 0, 0, 0, &jobConditionSuspended, "JobSuspended", |
| }, |
| "finshed job": { |
| 2, 4, 6, |
| true, []int32{1, 1, 2, 0}, v1.PodSucceeded, |
| 0, 4, 0, &jobConditionComplete, "", |
| }, |
| } |
| |
| for name, tc := range testCases { |
| t.Run(name, func(t *testing.T) { |
| // job manager setup |
| clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) |
| manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc) |
| fakePodControl := controller.FakePodControl{} |
| manager.podControl = &fakePodControl |
| manager.podStoreSynced = alwaysReady |
| manager.jobStoreSynced = alwaysReady |
| var actual *batch.Job |
| manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { |
| actual = job |
| return job, nil |
| } |
| |
| // job & pods setup |
| job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, batch.NonIndexedCompletion) |
| job.Spec.Template.Spec.RestartPolicy = v1.RestartPolicyOnFailure |
| job.Spec.Suspend = ptr.To(tc.suspend) |
| sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) |
| podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() |
| for i, pod := range newPodList(len(tc.restartCounts), tc.podPhase, job) { |
| pod.Status.ContainerStatuses = []v1.ContainerStatus{{RestartCount: tc.restartCounts[i]}} |
| podIndexer.Add(pod) |
| } |
| |
| // run |
| err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) |
| |
| if err != nil { |
| t.Errorf("unexpected error syncing job. Got %#v", err) |
| } |
| // validate status |
| if actual.Status.Active != tc.expectedActive { |
| t.Errorf("unexpected number of active pods. Expected %d, saw %d\n", tc.expectedActive, actual.Status.Active) |
| } |
| if actual.Status.Succeeded != tc.expectedSucceeded { |
| t.Errorf("unexpected number of succeeded pods. Expected %d, saw %d\n", tc.expectedSucceeded, actual.Status.Succeeded) |
| } |
| if actual.Status.Failed != tc.expectedFailed { |
| t.Errorf("unexpected number of failed pods. Expected %d, saw %d\n", tc.expectedFailed, actual.Status.Failed) |
| } |
| // validate conditions |
| if tc.expectedCondition != nil && !getCondition(actual, *tc.expectedCondition, v1.ConditionTrue, tc.expectedConditionReason) { |
| t.Errorf("expected completion condition. Got %#v", actual.Status.Conditions) |
| } |
| }) |
| } |
| } |
| |
| func TestJobBackoffOnRestartPolicyNever(t *testing.T) { |
| _, ctx := ktesting.NewTestContext(t) |
| jobConditionFailed := batch.JobFailed |
| |
| testCases := map[string]struct { |
| // job setup |
| parallelism int32 |
| completions int32 |
| backoffLimit int32 |
| |
| // pod setup |
| activePodsPhase v1.PodPhase |
| activePods int |
| failedPods int |
| |
| // expectations |
| expectedActive int32 |
| expectedSucceeded int32 |
| expectedFailed int32 |
| expectedCondition *batch.JobConditionType |
| expectedConditionReason string |
| }{ |
| "not enough failures with backoffLimit 0 - single pod": { |
| 1, 1, 0, |
| v1.PodRunning, 1, 0, |
| 1, 0, 0, nil, "", |
| }, |
| "not enough failures with backoffLimit 1 - single pod": { |
| 1, 1, 1, |
| "", 0, 1, |
| 1, 0, 1, nil, "", |
| }, |
| "too many failures with backoffLimit 1 - single pod": { |
| 1, 1, 1, |
| "", 0, 2, |
| 0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded", |
| }, |
| "not enough failures with backoffLimit 6 - multiple pods": { |
| 2, 2, 6, |
| v1.PodRunning, 1, 6, |
| 2, 0, 6, nil, "", |
| }, |
| "too many failures with backoffLimit 6 - multiple pods": { |
| 2, 2, 6, |
| "", 0, 7, |
| 0, 0, 7, &jobConditionFailed, "BackoffLimitExceeded", |
| }, |
| } |
| |
| for name, tc := range testCases { |
| t.Run(name, func(t *testing.T) { |
| // job manager setup |
| clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) |
| manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc) |
| fakePodControl := controller.FakePodControl{} |
| manager.podControl = &fakePodControl |
| manager.podStoreSynced = alwaysReady |
| manager.jobStoreSynced = alwaysReady |
| var actual *batch.Job |
| manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { |
| actual = job |
| return job, nil |
| } |
| |
| // job & pods setup |
| job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, batch.NonIndexedCompletion) |
| job.Spec.Template.Spec.RestartPolicy = v1.RestartPolicyNever |
| sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) |
| podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() |
| for _, pod := range newPodList(tc.failedPods, v1.PodFailed, job) { |
| pod.Status.ContainerStatuses = []v1.ContainerStatus{{State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ |
| FinishedAt: testFinishedAt, |
| }}}} |
| podIndexer.Add(pod) |
| } |
| for _, pod := range newPodList(tc.activePods, tc.activePodsPhase, job) { |
| podIndexer.Add(pod) |
| } |
| |
| // run |
| err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) |
| if err != nil { |
| t.Fatalf("unexpected error syncing job: %#v\n", err) |
| } |
| // validate status |
| if actual.Status.Active != tc.expectedActive { |
| t.Errorf("unexpected number of active pods. Expected %d, saw %d\n", tc.expectedActive, actual.Status.Active) |
| } |
| if actual.Status.Succeeded != tc.expectedSucceeded { |
| t.Errorf("unexpected number of succeeded pods. Expected %d, saw %d\n", tc.expectedSucceeded, actual.Status.Succeeded) |
| } |
| if actual.Status.Failed != tc.expectedFailed { |
| t.Errorf("unexpected number of failed pods. Expected %d, saw %d\n", tc.expectedFailed, actual.Status.Failed) |
| } |
| // validate conditions |
| if tc.expectedCondition != nil && !getCondition(actual, *tc.expectedCondition, v1.ConditionTrue, tc.expectedConditionReason) { |
| t.Errorf("expected completion condition. Got %#v", actual.Status.Conditions) |
| } |
| }) |
| } |
| } |
| |
| func TestEnsureJobConditions(t *testing.T) { |
| testCases := []struct { |
| name string |
| haveList []batch.JobCondition |
| wantType batch.JobConditionType |
| wantStatus v1.ConditionStatus |
| wantReason string |
| expectList []batch.JobCondition |
| expectUpdate bool |
| }{ |
| { |
| name: "append true condition", |
| haveList: []batch.JobCondition{}, |
| wantType: batch.JobSuspended, |
| wantStatus: v1.ConditionTrue, |
| wantReason: "foo", |
| expectList: []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "foo", "", realClock.Now())}, |
| expectUpdate: true, |
| }, |
| { |
| name: "append false condition", |
| haveList: []batch.JobCondition{}, |
| wantType: batch.JobSuspended, |
| wantStatus: v1.ConditionFalse, |
| wantReason: "foo", |
| expectList: []batch.JobCondition{}, |
| expectUpdate: false, |
| }, |
| { |
| name: "update true condition reason", |
| haveList: []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "foo", "", realClock.Now())}, |
| wantType: batch.JobSuspended, |
| wantStatus: v1.ConditionTrue, |
| wantReason: "bar", |
| expectList: []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "bar", "", realClock.Now())}, |
| expectUpdate: true, |
| }, |
| { |
| name: "update true condition status", |
| haveList: []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "foo", "", realClock.Now())}, |
| wantType: batch.JobSuspended, |
| wantStatus: v1.ConditionFalse, |
| wantReason: "foo", |
| expectList: []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionFalse, "foo", "", realClock.Now())}, |
| expectUpdate: true, |
| }, |
| { |
| name: "update false condition status", |
| haveList: []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionFalse, "foo", "", realClock.Now())}, |
| wantType: batch.JobSuspended, |
| wantStatus: v1.ConditionTrue, |
| wantReason: "foo", |
| expectList: []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "foo", "", realClock.Now())}, |
| expectUpdate: true, |
| }, |
| { |
| name: "condition already exists", |
| haveList: []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "foo", "", realClock.Now())}, |
| wantType: batch.JobSuspended, |
| wantStatus: v1.ConditionTrue, |
| wantReason: "foo", |
| expectList: []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "foo", "", realClock.Now())}, |
| expectUpdate: false, |
| }, |
| } |
| for _, tc := range testCases { |
| t.Run(tc.name, func(t *testing.T) { |
| gotList, isUpdated := ensureJobConditionStatus(tc.haveList, tc.wantType, tc.wantStatus, tc.wantReason, "", realClock.Now()) |
| if isUpdated != tc.expectUpdate { |
| t.Errorf("Got isUpdated=%v, want %v", isUpdated, tc.expectUpdate) |
| } |
| if len(gotList) != len(tc.expectList) { |
| t.Errorf("got a list of length %d, want %d", len(gotList), len(tc.expectList)) |
| } |
| if diff := cmp.Diff(tc.expectList, gotList, cmpopts.IgnoreFields(batch.JobCondition{}, "LastProbeTime", "LastTransitionTime")); diff != "" { |
| t.Errorf("Unexpected JobCondition list: (-want,+got):\n%s", diff) |
| } |
| }) |
| } |
| } |
| |
| func TestFinalizersRemovedExpectations(t *testing.T) { |
| _, ctx := ktesting.NewTestContext(t) |
| clientset := fake.NewSimpleClientset() |
| sharedInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc()) |
| manager, err := NewController(ctx, sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset) |
| if err != nil { |
| t.Fatalf("Error creating Job controller: %v", err) |
| } |
| manager.podStoreSynced = alwaysReady |
| manager.jobStoreSynced = alwaysReady |
| manager.podControl = &controller.FakePodControl{Err: errors.New("fake pod controller error")} |
| manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { |
| return job, nil |
| } |
| |
| job := newJob(2, 2, 6, batch.NonIndexedCompletion) |
| sharedInformers.Batch().V1().Jobs().Informer().GetIndexer().Add(job) |
| pods := append(newPodList(2, v1.PodSucceeded, job), newPodList(2, v1.PodFailed, job)...) |
| podInformer := sharedInformers.Core().V1().Pods().Informer() |
| podIndexer := podInformer.GetIndexer() |
| uids := sets.New[string]() |
| for i := range pods { |
| clientset.Tracker().Add(pods[i]) |
| podIndexer.Add(pods[i]) |
| uids.Insert(string(pods[i].UID)) |
| } |
| jobKey := testutil.GetKey(job, t) |
| |
| manager.syncJob(context.TODO(), jobKey) |
| gotExpectedUIDs := manager.finalizerExpectations.getExpectedUIDs(jobKey) |
| if len(gotExpectedUIDs) != 0 { |
| t.Errorf("Got unwanted expectations for removed finalizers after first syncJob with client failures:\n%s", sets.List(gotExpectedUIDs)) |
| } |
| |
| // Remove failures and re-sync. |
| manager.podControl.(*controller.FakePodControl).Err = nil |
| manager.syncJob(context.TODO(), jobKey) |
| gotExpectedUIDs = manager.finalizerExpectations.getExpectedUIDs(jobKey) |
| if diff := cmp.Diff(uids, gotExpectedUIDs); diff != "" { |
| t.Errorf("Different expectations for removed finalizers after syncJob (-want,+got):\n%s", diff) |
| } |
| |
| stopCh := make(chan struct{}) |
| defer close(stopCh) |
| go sharedInformers.Core().V1().Pods().Informer().Run(stopCh) |
| cache.WaitForCacheSync(stopCh, podInformer.HasSynced) |
| |
| // Make sure the first syncJob sets the expectations, even after the caches have synced. |
| gotExpectedUIDs = manager.finalizerExpectations.getExpectedUIDs(jobKey) |
| if diff := cmp.Diff(uids, gotExpectedUIDs); diff != "" { |
| t.Errorf("Different expectations for removed finalizers after syncJob and cacheSync (-want,+got):\n%s", diff) |
| } |
| |
| // Change pods in different ways. |
| |
| podsResource := schema.GroupVersionResource{Version: "v1", Resource: "pods"} |
| |
| update := pods[0].DeepCopy() |
| update.Finalizers = nil |
| update.ResourceVersion = "1" |
| err = clientset.Tracker().Update(podsResource, update, update.Namespace) |
| if err != nil { |
| t.Errorf("Removing finalizer: %v", err) |
| } |
| |
| update = pods[1].DeepCopy() |
| update.Finalizers = nil |
| update.DeletionTimestamp = &metav1.Time{Time: time.Now()} |
| update.ResourceVersion = "1" |
| err = clientset.Tracker().Update(podsResource, update, update.Namespace) |
| if err != nil { |
| t.Errorf("Removing finalizer and setting deletion timestamp: %v", err) |
| } |
| |
| // Preserve the finalizer. |
| update = pods[2].DeepCopy() |
| update.DeletionTimestamp = &metav1.Time{Time: time.Now()} |
| update.ResourceVersion = "1" |
| err = clientset.Tracker().Update(podsResource, update, update.Namespace) |
| if err != nil { |
| t.Errorf("Setting deletion timestamp: %v", err) |
| } |
| |
| err = clientset.Tracker().Delete(podsResource, pods[3].Namespace, pods[3].Name) |
| if err != nil { |
| t.Errorf("Deleting pod that had finalizer: %v", err) |
| } |
| |
| uids = sets.New(string(pods[2].UID)) |
| var diff string |
| if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { |
| gotExpectedUIDs = manager.finalizerExpectations.getExpectedUIDs(jobKey) |
| diff = cmp.Diff(uids, gotExpectedUIDs) |
| return diff == "", nil |
| }); err != nil { |
| t.Errorf("Timeout waiting for expectations (-want, +got):\n%s", diff) |
| } |
| } |
| |
| func TestFinalizerCleanup(t *testing.T) { |
| _, ctx := ktesting.NewTestContext(t) |
| ctx, cancel := context.WithCancel(ctx) |
| defer cancel() |
| |
| clientset := fake.NewSimpleClientset() |
| sharedInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc()) |
| manager, err := NewController(ctx, sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset) |
| if err != nil { |
| t.Fatalf("Error creating Job controller: %v", err) |
| } |
| manager.podStoreSynced = alwaysReady |
| manager.jobStoreSynced = alwaysReady |
| |
| // Start the Pod and Job informers. |
| sharedInformers.Start(ctx.Done()) |
| sharedInformers.WaitForCacheSync(ctx.Done()) |
| // Initialize the controller with 0 workers to make sure the |
| // pod finalizers are not removed by the "syncJob" function. |
| go manager.Run(ctx, 0) |
| |
| // Create a simple Job |
| job := newJob(1, 1, 1, batch.NonIndexedCompletion) |
| job, err = clientset.BatchV1().Jobs(job.GetNamespace()).Create(ctx, job, metav1.CreateOptions{}) |
| if err != nil { |
| t.Fatalf("Creating job: %v", err) |
| } |
| |
| // Wait for the Job to appear in the jobLister, to ensure that its Pod gets tracked correctly. |
| if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { |
| job, _ := manager.jobLister.Jobs(job.GetNamespace()).Get(job.Name) |
| return job != nil, nil |
| }); err != nil { |
| t.Fatalf("Waiting for Job object to appear in jobLister: %v", err) |
| } |
| |
| // Create a Pod with the job tracking finalizer |
| pod := newPod("test-pod", job) |
| pod.Finalizers = append(pod.Finalizers, batch.JobTrackingFinalizer) |
| pod, err = clientset.CoreV1().Pods(pod.GetNamespace()).Create(ctx, pod, metav1.CreateOptions{}) |
| if err != nil { |
| t.Fatalf("Creating pod: %v", err) |
| } |
| |
| // Wait for the Pod to appear in the podStore, to ensure that the pod exists when the Job is cleaned up. |
| // A production environment gives no such guarantee; there, the orphan pod worker cleans up the Pod |
| // once it finishes. |
| if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { |
| pod, _ := manager.podStore.Pods(pod.GetNamespace()).Get(pod.Name) |
| return pod != nil, nil |
| }); err != nil { |
| t.Fatalf("Waiting for Pod to appear in podLister: %v", err) |
| } |
| |
| // Mark Job as complete. |
| job.Status.Conditions = append(job.Status.Conditions, batch.JobCondition{ |
| Type: batch.JobComplete, |
| Status: v1.ConditionTrue, |
| }) |
| _, err = clientset.BatchV1().Jobs(job.GetNamespace()).UpdateStatus(ctx, job, metav1.UpdateOptions{}) |
| if err != nil { |
| t.Fatalf("Updating job status: %v", err) |
| } |
| |
| // Verify the pod finalizer is removed for a finished Job, |
| // even if the Job's pods are not tracked by the main reconciliation loop. |
| if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) { |
| p, err := clientset.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}) |
| if err != nil { |
| return false, err |
| } |
| return !hasJobTrackingFinalizer(p), nil |
| }); err != nil { |
| t.Errorf("Waiting for Pod to get the finalizer removed: %v", err) |
| } |
| } |
| |
| // checkJobCompletionLabel verifies that the pod template carries the |
| // completion index label (it shares its key with the annotation). |
| func checkJobCompletionLabel(t *testing.T, p *v1.PodTemplateSpec) { |
| t.Helper() |
| labels := p.GetLabels() |
| if labels == nil || labels[batch.JobCompletionIndexAnnotation] == "" { |
| t.Errorf("missing expected pod label %s", batch.JobCompletionIndexAnnotation) |
| } |
| } |
| |
| // checkJobCompletionEnvVariable verifies that every container gets the |
| // completion index injected via the JOB_COMPLETION_INDEX environment variable. |
| func checkJobCompletionEnvVariable(t *testing.T, spec *v1.PodSpec, podIndexLabelDisabled bool) { |
| t.Helper() |
| var fieldPath string |
| if podIndexLabelDisabled { |
| fieldPath = fmt.Sprintf("metadata.annotations['%s']", batch.JobCompletionIndexAnnotation) |
| } else { |
| fieldPath = fmt.Sprintf("metadata.labels['%s']", batch.JobCompletionIndexAnnotation) |
| } |
| want := []v1.EnvVar{ |
| { |
| Name: "JOB_COMPLETION_INDEX", |
| ValueFrom: &v1.EnvVarSource{ |
| FieldRef: &v1.ObjectFieldSelector{ |
| FieldPath: fieldPath, |
| }, |
| }, |
| }, |
| } |
| for _, c := range spec.InitContainers { |
| if diff := cmp.Diff(want, c.Env); diff != "" { |
| t.Errorf("Unexpected Env in container %s (-want,+got):\n%s", c.Name, diff) |
| } |
| } |
| for _, c := range spec.Containers { |
| if diff := cmp.Diff(want, c.Env); diff != "" { |
| t.Errorf("Unexpected Env in container %s (-want,+got):\n%s", c.Name, diff) |
| } |
| } |
| } |
| |
| func podReplacementPolicy(m batch.PodReplacementPolicy) *batch.PodReplacementPolicy { |
| return &m |
| } |
| |
| func verifyEmptyQueueAndAwaitForQueueLen(ctx context.Context, t *testing.T, jm *Controller, wantQueueLen int) { |
| t.Helper() |
| verifyEmptyQueue(ctx, t, jm) |
| awaitForQueueLen(ctx, t, jm, wantQueueLen) |
| } |
| |
| func awaitForQueueLen(ctx context.Context, t *testing.T, jm *Controller, wantQueueLen int) { |
| t.Helper() |
| verifyEmptyQueue(ctx, t, jm) |
| if err := wait.PollUntilContextTimeout(ctx, fastRequeue, time.Second, true, func(ctx context.Context) (bool, error) { |
| if requeued := jm.queue.Len() == wantQueueLen; requeued { |
| return true, nil |
| } |
| jm.clock.Sleep(fastRequeue) |
| return false, nil |
| }); err != nil { |
| t.Errorf("Failed to await for expected queue.Len(). want %v, got: %v", wantQueueLen, jm.queue.Len()) |
| } |
| } |
| |
| func verifyEmptyQueue(ctx context.Context, t *testing.T, jm *Controller) { |
| t.Helper() |
| if jm.queue.Len() > 0 { |
| t.Errorf("Unexpected queue.Len(). Want: %d, got: %d", 0, jm.queue.Len()) |
| } |
| } |
| |
| type podBuilder struct { |
| *v1.Pod |
| } |
| |
| func buildPod() podBuilder { |
| return podBuilder{Pod: &v1.Pod{ |
| ObjectMeta: metav1.ObjectMeta{ |
| UID: types.UID(rand.String(5)), |
| }, |
| }} |
| } |
| |
| // getConditionsByType returns pointers to all conditions in list matching the |
| // given type. |
| func getConditionsByType(list []batch.JobCondition, cType batch.JobConditionType) []*batch.JobCondition { |
| var result []*batch.JobCondition |
| for i := range list { |
| if list[i].Type == cType { |
| result = append(result, &list[i]) |
| } |
| } |
| return result |
| } |
| |
| func (pb podBuilder) name(n string) podBuilder { |
| pb.Name = n |
| return pb |
| } |
| |
| func (pb podBuilder) ns(n string) podBuilder { |
| pb.Namespace = n |
| return pb |
| } |
| |
| func (pb podBuilder) uid(u string) podBuilder { |
| pb.UID = types.UID(u) |
| return pb |
| } |
| |
| func (pb podBuilder) job(j *batch.Job) podBuilder { |
| pb.Labels = j.Spec.Selector.MatchLabels |
| pb.Namespace = j.Namespace |
| pb.OwnerReferences = []metav1.OwnerReference{*metav1.NewControllerRef(j, controllerKind)} |
| return pb |
| } |
| |
| func (pb podBuilder) clearOwner() podBuilder { |
| pb.OwnerReferences = nil |
| return pb |
| } |
| |
| func (pb podBuilder) clearLabels() podBuilder { |
| pb.Labels = nil |
| return pb |
| } |
| |
| func (pb podBuilder) index(ix string) podBuilder { |
| return pb.annotation(batch.JobCompletionIndexAnnotation, ix) |
| } |
| |
| func (pb podBuilder) indexFailureCount(count string) podBuilder { |
| return pb.annotation(batch.JobIndexFailureCountAnnotation, count) |
| } |
| |
| func (pb podBuilder) indexIgnoredFailureCount(count string) podBuilder { |
| return pb.annotation(batch.JobIndexIgnoredFailureCountAnnotation, count) |
| } |
| |
| func (pb podBuilder) annotation(key, value string) podBuilder { |
| if pb.Annotations == nil { |
| pb.Annotations = make(map[string]string) |
| } |
| pb.Annotations[key] = value |
| return pb |
| } |
| |
| func (pb podBuilder) status(s v1.PodStatus) podBuilder { |
| pb.Status = s |
| return pb |
| } |
| |
| func (pb podBuilder) phase(p v1.PodPhase) podBuilder { |
| pb.Status.Phase = p |
| return pb |
| } |
| |
| func (pb podBuilder) trackingFinalizer() podBuilder { |
| for _, f := range pb.Finalizers { |
| if f == batch.JobTrackingFinalizer { |
| return pb |
| } |
| } |
| pb.Finalizers = append(pb.Finalizers, batch.JobTrackingFinalizer) |
| return pb |
| } |
| |
| func (pb podBuilder) deletionTimestamp() podBuilder { |
| pb.DeletionTimestamp = &metav1.Time{} |
| return pb |
| } |
| |
| func (pb podBuilder) customDeletionTimestamp(t time.Time) podBuilder { |
| pb.DeletionTimestamp = &metav1.Time{Time: t} |
| return pb |
| } |
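| |
| // Usage note (illustrative): the builder methods above are chained and the |
| // embedded Pod is taken at the end; for example, TestWatchOrphanPods builds |
| // buildPod().name(name).deletionTimestamp().trackingFinalizer(), optionally |
| // adds .job(tc.job), and then reads the .Pod field. |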
| |
| func completionModePtr(m batch.CompletionMode) *batch.CompletionMode { |
| return &m |
| } |
| |
| // setDurationDuringTest overrides *val with newVal and returns a restore |
| // function intended to be registered with t.Cleanup. |
| func setDurationDuringTest(val *time.Duration, newVal time.Duration) func() { |
| origVal := *val |
| *val = newVal |
| return func() { |
| *val = origVal |
| } |
| } |
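| |
| // A minimal, self-contained sketch (illustrative addition) of the |
| // setDurationDuringTest contract: the override is visible while the test runs |
| // and the returned restore func puts the original value back. |
| func TestSetDurationDuringTestSketch(t *testing.T) { |
| d := 5 * time.Second |
| restore := setDurationDuringTest(&d, time.Millisecond) |
| if d != time.Millisecond { |
| t.Errorf("override not applied: got %v, want %v", d, time.Millisecond) |
| } |
| restore() |
| if d != 5*time.Second { |
| t.Errorf("restore not applied: got %v, want %v", d, 5*time.Second) |
| } |
| } |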