| /* |
| Copyright 2017 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package e2enode |
| |
| import ( |
| "context" |
| "errors" |
| "fmt" |
| "os" |
| "path/filepath" |
| "regexp" |
| "strings" |
| "time" |
| |
| "github.com/onsi/ginkgo/v2" |
| "github.com/onsi/gomega" |
| "github.com/onsi/gomega/gcustom" |
| "github.com/onsi/gomega/types" |
| |
| appsv1 "k8s.io/api/apps/v1" |
| v1 "k8s.io/api/core/v1" |
| "k8s.io/apimachinery/pkg/runtime" |
| "k8s.io/apimachinery/pkg/runtime/serializer" |
| k8stypes "k8s.io/apimachinery/pkg/types" |
| "k8s.io/apimachinery/pkg/util/sets" |
| utilfeature "k8s.io/apiserver/pkg/util/feature" |
| runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" |
| kubeletdevicepluginv1beta1 "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" |
| admissionapi "k8s.io/pod-security-admission/api" |
| |
| "k8s.io/apimachinery/pkg/api/resource" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| "k8s.io/apimachinery/pkg/util/uuid" |
| "k8s.io/kubectl/pkg/util/podutils" |
| kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1" |
| kubeletpodresourcesv1alpha1 "k8s.io/kubelet/pkg/apis/podresources/v1alpha1" |
| "k8s.io/kubernetes/pkg/features" |
| "k8s.io/kubernetes/test/e2e/feature" |
| "k8s.io/kubernetes/test/e2e/framework" |
| e2enode "k8s.io/kubernetes/test/e2e/framework/node" |
| e2epod "k8s.io/kubernetes/test/e2e/framework/pod" |
| e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper" |
| e2etestfiles "k8s.io/kubernetes/test/e2e/framework/testfiles" |
| "k8s.io/kubernetes/test/e2e/nodefeature" |
| ) |
| |
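| // appsScheme and appsCodecs are used to decode the sample device plugin DaemonSet manifests |
| // shipped with the e2e test data. |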
| var ( |
| appsScheme = runtime.NewScheme() |
| appsCodecs = serializer.NewCodecFactory(appsScheme) |
| ) |
| |
| // Serial because the test restarts Kubelet |
| var _ = SIGDescribe("Device Plugin", feature.DevicePluginProbe, nodefeature.DevicePluginProbe, framework.WithSerial(), func() { |
| f := framework.NewDefaultFramework("device-plugin-errors") |
| f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged |
| testDevicePlugin(f, kubeletdevicepluginv1beta1.DevicePluginPath) |
| testDevicePluginNodeReboot(f, kubeletdevicepluginv1beta1.DevicePluginPath) |
| }) |
| |
| // readDaemonSetV1OrDie reads daemonset object from bytes. Panics on error. |
| func readDaemonSetV1OrDie(objBytes []byte) *appsv1.DaemonSet { |
| if err := appsv1.AddToScheme(appsScheme); err != nil { |
| panic(err) |
| } |
| requiredObj, err := runtime.Decode(appsCodecs.UniversalDecoder(appsv1.SchemeGroupVersion), objBytes) |
| if err != nil { |
| panic(err) |
| } |
| return requiredObj.(*appsv1.DaemonSet) |
| } |
| |
| const ( |
| // TODO(vikasc): Instead of hard-coding the number of devices, provide the number of devices in the sample-device-plugin using a configmap |
| // and then use the same value here |
| expectedSampleDevsAmount int64 = 2 |
| |
| // This is the sleep interval specified in the command executed in the pod to ensure the container runs "forever" on the test timescale |
| sleepIntervalForever string = "24h" |
| |
| // This is the sleep interval specified in the command executed in the pod so that the container is restarted within the expected test run time |
| sleepIntervalWithRestart string = "60s" |
| ) |
| |
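| // testDevicePlugin registers the device plugin e2e tests that exercise device assignment |
| // across pod, kubelet and device plugin restarts, using the plugin socket directory pluginSockDir. |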
| func testDevicePlugin(f *framework.Framework, pluginSockDir string) { |
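| // Clean the socket directory path and keep a trailing separator before handing it to the sample device plugin. |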
| pluginSockDir = filepath.Join(pluginSockDir) + "/" |
| f.Context("DevicePlugin", f.WithSerial(), f.WithDisruptive(), func() { |
| var devicePluginPod, dptemplate *v1.Pod |
| var v1alphaPodResources *kubeletpodresourcesv1alpha1.ListPodResourcesResponse |
| var v1PodResources *kubeletpodresourcesv1.ListPodResourcesResponse |
| var err error |
| |
| ginkgo.BeforeEach(func(ctx context.Context) { |
| ginkgo.By("Wait for node to be ready") |
| gomega.Eventually(ctx, func(ctx context.Context) bool { |
| nodes, err := e2enode.TotalReady(ctx, f.ClientSet) |
| framework.ExpectNoError(err) |
| return nodes == 1 |
| }, time.Minute, time.Second).Should(gomega.BeTrue()) |
| |
| // Before we run the device plugin test, we need to ensure |
| // that the cluster is in a clean state and there are no |
| // pods running on this node. |
| // This is done in a gomega.Eventually with retries, since a prior test in a different test suite could have run and the deletion of its resources may still be in progress. |
| // xref: https://issue.k8s.io/115381 |
| gomega.Eventually(ctx, func(ctx context.Context) error { |
| v1alphaPodResources, err = getV1alpha1NodeDevices(ctx) |
| if err != nil { |
| return fmt.Errorf("failed to get node local podresources by accessing the (v1alpha) podresources API endpoint: %v", err) |
| } |
| |
| v1PodResources, err = getV1NodeDevices(ctx) |
| if err != nil { |
| return fmt.Errorf("failed to get node local podresources by accessing the (v1) podresources API endpoint: %v", err) |
| } |
| |
| if len(v1alphaPodResources.PodResources) > 0 { |
| return fmt.Errorf("expected v1alpha pod resources to be empty, but got non-empty resources: %+v", v1alphaPodResources.PodResources) |
| } |
| |
| if len(v1PodResources.PodResources) > 0 { |
| return fmt.Errorf("expected v1 pod resources to be empty, but got non-empty resources: %+v", v1PodResources.PodResources) |
| } |
| return nil |
| }, f.Timeouts.PodDelete, f.Timeouts.Poll).Should(gomega.Succeed()) |
| |
| ginkgo.By("Scheduling a sample device plugin pod") |
| dp := getSampleDevicePluginPod(pluginSockDir) |
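| // Keep a pristine copy of the plugin pod spec so the tests below can recreate the plugin pod after deleting it. |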
| dptemplate = dp.DeepCopy() |
| devicePluginPod = e2epod.NewPodClient(f).CreateSync(ctx, dp) |
| |
| ginkgo.By("Waiting for devices to become available on the local node") |
| gomega.Eventually(ctx, func(ctx context.Context) bool { |
| node, ready := getLocalTestNode(ctx, f) |
| return ready && CountSampleDeviceCapacity(node) > 0 |
| }, 5*time.Minute, framework.Poll).Should(gomega.BeTrue()) |
| framework.Logf("Successfully created device plugin pod") |
| |
| ginkgo.By(fmt.Sprintf("Waiting for the resource exported by the sample device plugin to become available on the local node (instances: %d)", expectedSampleDevsAmount)) |
| gomega.Eventually(ctx, func(ctx context.Context) bool { |
| node, ready := getLocalTestNode(ctx, f) |
| return ready && |
| CountSampleDeviceCapacity(node) == expectedSampleDevsAmount && |
| CountSampleDeviceAllocatable(node) == expectedSampleDevsAmount |
| }, 30*time.Second, framework.Poll).Should(gomega.BeTrue()) |
| }) |
| |
| ginkgo.AfterEach(func(ctx context.Context) { |
| ginkgo.By("Deleting the device plugin pod") |
| e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, metav1.DeleteOptions{}, time.Minute) |
| |
| ginkgo.By("Deleting any Pods created by the test") |
| l, err := e2epod.NewPodClient(f).List(ctx, metav1.ListOptions{}) |
| framework.ExpectNoError(err) |
| for _, p := range l.Items { |
| if p.Namespace != f.Namespace.Name { |
| continue |
| } |
| |
| framework.Logf("Deleting pod: %s", p.Name) |
| e2epod.NewPodClient(f).DeleteSync(ctx, p.Name, metav1.DeleteOptions{}, 2*time.Minute) |
| } |
| |
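| // Restart the kubelet so that the sample device resource is dropped from the node status: |
| // after a restart, only plugins that re-register are advertised, and the plugin pod has just been deleted. |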
| restartKubelet(true) |
| |
| ginkgo.By("Waiting for devices to become unavailable on the local node") |
| gomega.Eventually(ctx, func(ctx context.Context) bool { |
| node, ready := getLocalTestNode(ctx, f) |
| return ready && CountSampleDeviceCapacity(node) <= 0 |
| }, 5*time.Minute, framework.Poll).Should(gomega.BeTrue()) |
| |
| ginkgo.By("devices now unavailable on the local node") |
| }) |
| |
| ginkgo.It("Can schedule a pod that requires a device", func(ctx context.Context) { |
| podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalWithRestart) |
| pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD)) |
| deviceIDRE := "stub devices: (Dev-[0-9]+)" |
| devID1, err := parseLog(ctx, f, pod1.Name, pod1.Name, deviceIDRE) |
| framework.ExpectNoError(err, "getting logs for pod %q", pod1.Name) |
| gomega.Expect(devID1).To(gomega.Not(gomega.Equal("")), "pod1 requested a device but started successfully without") |
| |
| v1alphaPodResources, err = getV1alpha1NodeDevices(ctx) |
| framework.ExpectNoError(err) |
| |
| v1PodResources, err = getV1NodeDevices(ctx) |
| framework.ExpectNoError(err) |
| |
| framework.Logf("v1alphaPodResources.PodResources:%+v\n", v1alphaPodResources.PodResources) |
| framework.Logf("v1PodResources.PodResources:%+v\n", v1PodResources.PodResources) |
| framework.Logf("len(v1alphaPodResources.PodResources):%+v", len(v1alphaPodResources.PodResources)) |
| framework.Logf("len(v1PodResources.PodResources):%+v", len(v1PodResources.PodResources)) |
| |
| gomega.Expect(v1alphaPodResources.PodResources).To(gomega.HaveLen(2)) |
| gomega.Expect(v1PodResources.PodResources).To(gomega.HaveLen(2)) |
| |
| var v1alphaResourcesForOurPod *kubeletpodresourcesv1alpha1.PodResources |
| for _, res := range v1alphaPodResources.GetPodResources() { |
| if res.Name == pod1.Name { |
| v1alphaResourcesForOurPod = res |
| } |
| } |
| |
| var v1ResourcesForOurPod *kubeletpodresourcesv1.PodResources |
| for _, res := range v1PodResources.GetPodResources() { |
| if res.Name == pod1.Name { |
| v1ResourcesForOurPod = res |
| } |
| } |
| |
| gomega.Expect(v1alphaResourcesForOurPod).NotTo(gomega.BeNil()) |
| gomega.Expect(v1ResourcesForOurPod).NotTo(gomega.BeNil()) |
| |
| gomega.Expect(v1alphaResourcesForOurPod.Name).To(gomega.Equal(pod1.Name)) |
| gomega.Expect(v1ResourcesForOurPod.Name).To(gomega.Equal(pod1.Name)) |
| |
| gomega.Expect(v1alphaResourcesForOurPod.Namespace).To(gomega.Equal(pod1.Namespace)) |
| gomega.Expect(v1ResourcesForOurPod.Namespace).To(gomega.Equal(pod1.Namespace)) |
| |
| gomega.Expect(v1alphaResourcesForOurPod.Containers).To(gomega.HaveLen(1)) |
| gomega.Expect(v1ResourcesForOurPod.Containers).To(gomega.HaveLen(1)) |
| |
| gomega.Expect(v1alphaResourcesForOurPod.Containers[0].Name).To(gomega.Equal(pod1.Spec.Containers[0].Name)) |
| gomega.Expect(v1ResourcesForOurPod.Containers[0].Name).To(gomega.Equal(pod1.Spec.Containers[0].Name)) |
| |
| gomega.Expect(v1alphaResourcesForOurPod.Containers[0].Devices).To(gomega.HaveLen(1)) |
| gomega.Expect(v1ResourcesForOurPod.Containers[0].Devices).To(gomega.HaveLen(1)) |
| |
| gomega.Expect(v1alphaResourcesForOurPod.Containers[0].Devices[0].ResourceName).To(gomega.Equal(SampleDeviceResourceName)) |
| gomega.Expect(v1ResourcesForOurPod.Containers[0].Devices[0].ResourceName).To(gomega.Equal(SampleDeviceResourceName)) |
| |
| gomega.Expect(v1alphaResourcesForOurPod.Containers[0].Devices[0].DeviceIds).To(gomega.HaveLen(1)) |
| gomega.Expect(v1ResourcesForOurPod.Containers[0].Devices[0].DeviceIds).To(gomega.HaveLen(1)) |
| }) |
| |
| ginkgo.It("[NodeSpecialFeature:CDI] can make a CDI device accessible in a container", func(ctx context.Context) { |
| e2eskipper.SkipUnlessFeatureGateEnabled(features.DevicePluginCDIDevices) |
| // check if CDI_DEVICE env variable is set |
| // and that only one corresponding device node /tmp/<CDI_DEVICE> is available inside the container |
| podObj := makeBusyboxPod(SampleDeviceResourceName, "[ $(ls /tmp/CDI-Dev-[1,2] | wc -l) -eq 1 -a -b /tmp/$CDI_DEVICE ]") |
| podObj.Spec.RestartPolicy = v1.RestartPolicyNever |
| pod := e2epod.NewPodClient(f).Create(ctx, podObj) |
| framework.ExpectNoError(e2epod.WaitForPodSuccessInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace)) |
| }) |
| |
| // simulate a container restart while all other involved components (kubelet, device plugin) stay stable. To do so, the container |
| // entry point sleeps for a short, limited period of time. The device assignment should be kept and stay stable across container |
| // restarts; for the sake of brevity, however, we check just the first restart. |
| ginkgo.It("Keeps device plugin assignments across pod restarts (no kubelet restart, no device plugin restart)", func(ctx context.Context) { |
| podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalWithRestart) |
| pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD)) |
| deviceIDRE := "stub devices: (Dev-[0-9]+)" |
| devID1, err := parseLog(ctx, f, pod1.Name, pod1.Name, deviceIDRE) |
| framework.ExpectNoError(err, "getting logs for pod %q", pod1.Name) |
| gomega.Expect(devID1).To(gomega.Not(gomega.Equal("")), "pod1 requested a device but started successfully without") |
| |
| pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{}) |
| framework.ExpectNoError(err) |
| |
| ginkgo.By("Waiting for container to restart") |
| ensurePodContainerRestart(ctx, f, pod1.Name, pod1.Name) |
| |
| // check that the device assignment is preserved and stable from the perspective of the container |
| ginkgo.By("Confirming that after a container restart, fake-device assignment is kept") |
| devIDRestart1, err := parseLog(ctx, f, pod1.Name, pod1.Name, deviceIDRE) |
| framework.ExpectNoError(err, "getting logs for pod %q", pod1.Name) |
| gomega.Expect(devIDRestart1).To(gomega.Equal(devID1)) |
| |
| // crosscheck that the device assignment is preserved and stable from the perspective of the kubelet; |
| // it needs to match the container's perspective. |
| ginkgo.By("Verifying the device assignment after container restart using podresources API") |
| v1PodResources, err = getV1NodeDevices(ctx) |
| framework.ExpectNoError(err, "getting pod resources assignment after pod restart") |
| err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) |
| framework.ExpectNoError(err, "inconsistent device assignment after pod restart") |
| |
| ginkgo.By("Creating another pod") |
| pod2 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD)) |
| err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, f.ClientSet, pod2.Name, f.Namespace.Name, 1*time.Minute) |
| framework.ExpectNoError(err) |
| |
| ginkgo.By("Checking that pod got a fake device") |
| devID2, err := parseLog(ctx, f, pod2.Name, pod2.Name, deviceIDRE) |
| framework.ExpectNoError(err, "getting logs for pod %q", pod2.Name) |
| |
| gomega.Expect(devID2).To(gomega.Not(gomega.Equal("")), "pod2 requested a device but started successfully without") |
| |
| ginkgo.By("Verifying the device assignment after extra container start using podresources API") |
| v1PodResources, err = getV1NodeDevices(ctx) |
| framework.ExpectNoError(err, "getting pod resources assignment after extra container start") |
| err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) |
| framework.ExpectNoError(err, "inconsistent device assignment after extra container restart - pod1") |
| err, _ = checkPodResourcesAssignment(v1PodResources, pod2.Namespace, pod2.Name, pod2.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID2}) |
| framework.ExpectNoError(err, "inconsistent device assignment after extra container restart - pod2") |
| }) |
| |
| // simulate a kubelet restart. A compliant device plugin is expected to re-register, while the pod and the container stay running. |
| // The flow with a buggy or slow device plugin is deferred to another test. |
| // The device assignment should be kept and stay stable across the kubelet restart, because it is the kubelet which performs the device allocation, |
| // and both the device plugin and the actual consumer (container) are stable. |
| ginkgo.It("Keeps device plugin assignments across kubelet restarts (no pod restart, no device plugin restart)", func(ctx context.Context) { |
| podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever) |
| pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD)) |
| deviceIDRE := "stub devices: (Dev-[0-9]+)" |
| devID1, err := parseLog(ctx, f, pod1.Name, pod1.Name, deviceIDRE) |
| framework.ExpectNoError(err, "getting logs for pod %q", pod1.Name) |
| gomega.Expect(devID1).To(gomega.Not(gomega.Equal("")), "pod1 requested a device but started successfully without") |
| |
| pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{}) |
| framework.ExpectNoError(err) |
| framework.Logf("testing pod: pre-restart UID=%s namespace=%s name=%s ready=%v", pod1.UID, pod1.Namespace, pod1.Name, podutils.IsPodReady(pod1)) |
| |
| ginkgo.By("Restarting Kubelet") |
| restartKubelet(true) |
| |
| ginkgo.By("Wait for node to be ready again") |
| e2enode.WaitForAllNodesSchedulable(ctx, f.ClientSet, 5*time.Minute) |
| |
| ginkgo.By("Waiting for resource to become available on the local node after restart") |
| gomega.Eventually(ctx, func() bool { |
| node, ready := getLocalTestNode(ctx, f) |
| return ready && |
| CountSampleDeviceCapacity(node) == expectedSampleDevsAmount && |
| CountSampleDeviceAllocatable(node) == expectedSampleDevsAmount |
| }, 30*time.Second, framework.Poll).Should(gomega.BeTrue()) |
| |
| ginkgo.By("Checking the same instance of the pod is still running") |
| gomega.Eventually(ctx, getPodByName). |
| WithArguments(f, pod1.Name). |
| WithTimeout(time.Minute). |
| Should(BeTheSamePodStillRunning(pod1), |
| "the same pod instance not running across kubelet restarts, workload should not be perturbed by kubelet restarts") |
| |
| pod2, err := e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{}) |
| framework.ExpectNoError(err) |
| framework.Logf("testing pod: post-restart UID=%s namespace=%s name=%s ready=%v", pod2.UID, pod2.Namespace, pod2.Name, podutils.IsPodReady(pod2)) |
| |
| // crosscheck that the device assignment is preserved and stable from the perspective of the kubelet. |
| // note that we don't check the container logs again: that check is done at startup and the container |
| // never restarted (it runs "forever" on this test's timescale), so re-doing the check would be useless. |
| ginkgo.By("Verifying the device assignment after kubelet restart using podresources API") |
| gomega.Eventually(ctx, func() error { |
| v1PodResources, err = getV1NodeDevices(ctx) |
| return err |
| }, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart") |
| |
| err, _ = checkPodResourcesAssignment(v1PodResources, pod2.Namespace, pod2.Name, pod2.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) |
| framework.ExpectNoError(err, "inconsistent device assignment after kubelet restart") |
| }) |
| |
| // simulate a kubelet and container restart, *but not* a device plugin restart. |
| // The device assignment should be kept and stay stable across the kubelet and container restart, because it is the kubelet which |
| // performs the device allocation, and the device plugin is stable. |
| ginkgo.It("Keeps device plugin assignments across pod and kubelet restarts (no device plugin restart)", func(ctx context.Context) { |
| podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalWithRestart) |
| pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD)) |
| deviceIDRE := "stub devices: (Dev-[0-9]+)" |
| devID1, err := parseLog(ctx, f, pod1.Name, pod1.Name, deviceIDRE) |
| framework.ExpectNoError(err, "getting logs for pod %q", pod1.Name) |
| |
| gomega.Expect(devID1).To(gomega.Not(gomega.Equal("")), "pod1 requested a device but started successfully without") |
| |
| pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{}) |
| framework.ExpectNoError(err) |
| |
| ginkgo.By("Wait for node to be ready again") |
| e2enode.WaitForAllNodesSchedulable(ctx, f.ClientSet, 5*time.Minute) |
| |
| ginkgo.By("Waiting for container to restart") |
| ensurePodContainerRestart(ctx, f, pod1.Name, pod1.Name) |
| |
| ginkgo.By("Confirming that after a container restart, fake-device assignment is kept") |
| devIDRestart1, err := parseLog(ctx, f, pod1.Name, pod1.Name, deviceIDRE) |
| framework.ExpectNoError(err, "getting logs for pod %q", pod1.Name) |
| gomega.Expect(devIDRestart1).To(gomega.Equal(devID1)) |
| |
| ginkgo.By("Restarting Kubelet") |
| restartKubelet(true) |
| |
| ginkgo.By("Wait for node to be ready again") |
| e2enode.WaitForAllNodesSchedulable(ctx, f.ClientSet, 5*time.Minute) |
| |
| ginkgo.By("Checking an instance of the pod is running") |
| gomega.Eventually(ctx, getPodByName). |
| WithArguments(f, pod1.Name). |
| // The kubelet restarts the pod with an exponential back-off delay, capped at 5 minutes. |
| // Allow 5 minutes and 10 seconds for the pod to start in a slow environment. |
| WithTimeout(5*time.Minute+10*time.Second). |
| Should(gomega.And( |
| BeAPodInPhase(v1.PodRunning), |
| BeAPodReady(), |
| ), |
| "the pod should still be running, the workload should not be perturbed by kubelet restarts") |
| |
| ginkgo.By("Verifying the device assignment after pod and kubelet restart using container logs") |
| var devID1Restarted string |
| gomega.Eventually(ctx, func() string { |
| devID1Restarted, err = parseLog(ctx, f, pod1.Name, pod1.Name, deviceIDRE) |
| if err != nil { |
| framework.Logf("error getting logds for pod %q: %v", pod1.Name, err) |
| return "" |
| } |
| return devID1Restarted |
| }, 30*time.Second, framework.Poll).Should(gomega.Equal(devID1), "pod %s reports a different device after restarts: %s (expected %s)", pod1.Name, devID1Restarted, devID1) |
| |
| ginkgo.By("Verifying the device assignment after pod and kubelet restart using podresources API") |
| gomega.Eventually(ctx, func() error { |
| v1PodResources, err = getV1NodeDevices(ctx) |
| return err |
| }, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart") |
| |
| err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) |
| framework.ExpectNoError(err, "inconsistent device assignment after pod and kubelet restart") |
| }) |
| |
| // simulate a device plugin re-registration, *but not* a container or kubelet restart. |
| // After the device plugin has re-registered, the list of healthy devices is repopulated based on the devices discovered. |
| // Once the pod is running we determine the device that was allocated to it. As long as the device allocation succeeds the |
| // test should pass. |
| ginkgo.It("Keeps device plugin assignments after the device plugin has restarted (no kubelet restart, pod restart)", func(ctx context.Context) { |
| podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever) |
| pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD)) |
| deviceIDRE := "stub devices: (Dev-[0-9]+)" |
| devID1, err := parseLog(ctx, f, pod1.Name, pod1.Name, deviceIDRE) |
| framework.ExpectNoError(err, "getting logs for pod %q", pod1.Name) |
| gomega.Expect(devID1).To(gomega.Not(gomega.Equal("")), "pod1 requested a device but started successfully without") |
| |
| pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{}) |
| framework.ExpectNoError(err) |
| |
| ginkgo.By("Wait for node to be ready again") |
| e2enode.WaitForAllNodesSchedulable(ctx, f.ClientSet, 5*time.Minute) |
| |
| ginkgo.By("Re-Register resources and delete the plugin pod") |
| gp := int64(0) |
| deleteOptions := metav1.DeleteOptions{ |
| GracePeriodSeconds: &gp, |
| } |
| e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, deleteOptions, time.Minute) |
| waitForContainerRemoval(ctx, devicePluginPod.Spec.Containers[0].Name, devicePluginPod.Name, devicePluginPod.Namespace) |
| |
| ginkgo.By("Recreating the plugin pod") |
| devicePluginPod = e2epod.NewPodClient(f).CreateSync(ctx, dptemplate) |
| err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, f.ClientSet, devicePluginPod.Name, devicePluginPod.Namespace, 1*time.Minute) |
| framework.ExpectNoError(err) |
| |
| ginkgo.By("Waiting for resource to become available on the local node after re-registration") |
| gomega.Eventually(ctx, func() bool { |
| node, ready := getLocalTestNode(ctx, f) |
| return ready && |
| CountSampleDeviceCapacity(node) == expectedSampleDevsAmount && |
| CountSampleDeviceAllocatable(node) == expectedSampleDevsAmount |
| }, 30*time.Second, framework.Poll).Should(gomega.BeTrue()) |
| |
| // crosscheck that after device plugin restart the device assignment is preserved and |
| // stable from the kubelet's perspective. |
| // note that we don't check the container logs again: that check is done at startup and the container |
| // never restarted (it runs "forever" on this test's timescale), so re-doing the check would be useless. |
| ginkgo.By("Verifying the device assignment after device plugin restart using podresources API") |
| gomega.Eventually(ctx, func() error { |
| v1PodResources, err = getV1NodeDevices(ctx) |
| return err |
| }, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after device plugin restart") |
| |
| err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) |
| framework.ExpectNoError(err, "inconsistent device assignment after device plugin restart") |
| }) |
| |
| // simulate a kubelet restart *and* a device plugin restart, while the pod and the container stay running. |
| // The device assignment should be kept and stay stable across the kubelet/device plugin restart, as both of these components |
| // orchestrate the device allocation, while the actual consumer (container) is stable. |
| ginkgo.It("Keeps device plugin assignments after kubelet restart and device plugin restart (no pod restart)", func(ctx context.Context) { |
| podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever) // the pod has to run "forever" in the timescale of this test |
| pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD)) |
| deviceIDRE := "stub devices: (Dev-[0-9]+)" |
| devID1, err := parseLog(ctx, f, pod1.Name, pod1.Name, deviceIDRE) |
| framework.ExpectNoError(err, "getting logs for pod %q", pod1.Name) |
| |
| gomega.Expect(devID1).To(gomega.Not(gomega.BeEmpty()), "pod1 requested a device but started successfully without") |
| |
| pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{}) |
| framework.ExpectNoError(err) |
| |
| ginkgo.By("Restarting Kubelet") |
| restartKubelet(true) |
| |
| ginkgo.By("Wait for node to be ready again") |
| e2enode.WaitForAllNodesSchedulable(ctx, f.ClientSet, 5*time.Minute) |
| |
| ginkgo.By("Checking the same instance of the pod is still running after kubelet restart") |
| gomega.Eventually(ctx, getPodByName). |
| WithArguments(f, pod1.Name). |
| WithTimeout(time.Minute). |
| Should(BeTheSamePodStillRunning(pod1), |
| "the same pod instance not running across kubelet restarts, workload should not be perturbed by kubelet restarts") |
| |
| // crosscheck that the device assignment is preserved and stable from the perspective of the kubelet. |
| // note that we don't check the container logs again: that check is done at startup and the container |
| // never restarted (it runs "forever" on this test's timescale), so re-doing the check would be useless. |
| ginkgo.By("Verifying the device assignment after kubelet restart using podresources API") |
| gomega.Eventually(ctx, func() error { |
| v1PodResources, err = getV1NodeDevices(ctx) |
| return err |
| }, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart") |
| |
| err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) |
| framework.ExpectNoError(err, "inconsistent device assignment after kubelet restart") |
| |
| ginkgo.By("Re-Register resources by deleting the plugin pod") |
| gp := int64(0) |
| deleteOptions := metav1.DeleteOptions{ |
| GracePeriodSeconds: &gp, |
| } |
| e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, deleteOptions, time.Minute) |
| waitForContainerRemoval(ctx, devicePluginPod.Spec.Containers[0].Name, devicePluginPod.Name, devicePluginPod.Namespace) |
| |
| ginkgo.By("Recreating the plugin pod") |
| devicePluginPod = e2epod.NewPodClient(f).CreateSync(ctx, dptemplate) |
| |
| ginkgo.By("Waiting for resource to become available on the local node after restart") |
| gomega.Eventually(ctx, func() bool { |
| node, ready := getLocalTestNode(ctx, f) |
| return ready && |
| CountSampleDeviceCapacity(node) == expectedSampleDevsAmount && |
| CountSampleDeviceAllocatable(node) == expectedSampleDevsAmount |
| }, 30*time.Second, framework.Poll).Should(gomega.BeTrue()) |
| |
| ginkgo.By("Checking the same instance of the pod is still running after the device plugin restart") |
| gomega.Eventually(ctx, getPodByName). |
| WithArguments(f, pod1.Name). |
| WithTimeout(time.Minute). |
| Should(BeTheSamePodStillRunning(pod1), |
| "the same pod instance not running across kubelet restarts, workload should not be perturbed by device plugins restarts") |
| }) |
| |
| ginkgo.It("[OrphanedPods] Ensures pods consuming devices deleted while kubelet is down are cleaned up correctly", func(ctx context.Context) { |
| podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalWithRestart) |
| pod := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD)) |
| |
| deviceIDRE := "stub devices: (Dev-[0-9]+)" |
| devID, err := parseLog(ctx, f, pod.Name, pod.Name, deviceIDRE) |
| framework.ExpectNoError(err, "getting logs for pod %q", pod.Name) |
| gomega.Expect(devID).To(gomega.Not(gomega.BeEmpty()), "pod1 requested a device but started successfully without") |
| |
| pod, err = e2epod.NewPodClient(f).Get(ctx, pod.Name, metav1.GetOptions{}) |
| framework.ExpectNoError(err) |
| |
| ginkgo.By("stopping the kubelet") |
| startKubelet := stopKubelet() |
| |
| // wait until the kubelet health check fails |
| gomega.Eventually(ctx, func() bool { |
| ok := kubeletHealthCheck(kubeletHealthCheckURL) |
| framework.Logf("kubelet health check at %q value=%v", kubeletHealthCheckURL, ok) |
| return ok |
| }, f.Timeouts.PodStart, f.Timeouts.Poll).Should(gomega.BeFalse()) |
| |
| framework.Logf("Delete the pod while the kubelet is not running") |
| // Deleting the pod synchronously by name force-deletes it, removing it from the kubelet's config |
| deletePodSyncByName(ctx, f, pod.Name) |
| |
| framework.Logf("Starting the kubelet") |
| startKubelet() |
| |
| // wait until the kubelet health check succeeds |
| gomega.Eventually(ctx, func() bool { |
| ok := kubeletHealthCheck(kubeletHealthCheckURL) |
| framework.Logf("kubelet health check at %q value=%v", kubeletHealthCheckURL, ok) |
| return ok |
| }, f.Timeouts.PodStart, f.Timeouts.Poll).Should(gomega.BeTrue()) |
| |
| framework.Logf("wait for the pod %v to disappear", pod.Name) |
| gomega.Eventually(ctx, func(ctx context.Context) error { |
| err := checkMirrorPodDisappear(ctx, f.ClientSet, pod.Name, pod.Namespace) |
| framework.Logf("pod %s/%s disappear check err=%v", pod.Namespace, pod.Name, err) |
| return err |
| }, f.Timeouts.PodDelete, f.Timeouts.Poll).Should(gomega.BeNil()) |
| |
| waitForAllContainerRemoval(ctx, pod.Name, pod.Namespace) |
| |
| ginkgo.By("Verifying the device assignment after device plugin restart using podresources API") |
| gomega.Eventually(ctx, func() error { |
| v1PodResources, err = getV1NodeDevices(ctx) |
| return err |
| }, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart") |
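| // The pod was deleted while the kubelet was down, so it must no longer show up in the podresources |
| // response: expect either a lookup error or no allocation for it. |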
| err, allocated := checkPodResourcesAssignment(v1PodResources, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, SampleDeviceResourceName, []string{}) |
| if err == nil || allocated { |
| framework.Fail(fmt.Sprintf("stale device assignment after pod deletion while kubelet was down allocated=%v error=%v", allocated, err)) |
| } |
| }) |
| |
| f.It("Can schedule a pod with a restartable init container", nodefeature.SidecarContainers, func(ctx context.Context) { |
| podRECMD := "devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s" |
| sleepOneSecond := "1s" |
| rl := v1.ResourceList{v1.ResourceName(SampleDeviceResourceName): *resource.NewQuantity(1, resource.DecimalSI)} |
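| // The pod has a regular init container, a restartable (sidecar) init container and a regular container, |
| // each requesting one sample device. The device of the completed regular init container is expected to be |
| // reusable by the restartable init container, while the regular container must get a different device. |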
| pod := &v1.Pod{ |
| ObjectMeta: metav1.ObjectMeta{Name: "device-plugin-test-" + string(uuid.NewUUID())}, |
| Spec: v1.PodSpec{ |
| RestartPolicy: v1.RestartPolicyAlways, |
| InitContainers: []v1.Container{ |
| { |
| Image: busyboxImage, |
| Name: "init-1", |
| Command: []string{"sh", "-c", fmt.Sprintf(podRECMD, sleepOneSecond)}, |
| Resources: v1.ResourceRequirements{ |
| Limits: rl, |
| Requests: rl, |
| }, |
| }, |
| { |
| Image: busyboxImage, |
| Name: "restartable-init-2", |
| Command: []string{"sh", "-c", fmt.Sprintf(podRECMD, sleepIntervalForever)}, |
| Resources: v1.ResourceRequirements{ |
| Limits: rl, |
| Requests: rl, |
| }, |
| RestartPolicy: &containerRestartPolicyAlways, |
| }, |
| }, |
| Containers: []v1.Container{{ |
| Image: busyboxImage, |
| Name: "regular-1", |
| Command: []string{"sh", "-c", fmt.Sprintf(podRECMD, sleepIntervalForever)}, |
| Resources: v1.ResourceRequirements{ |
| Limits: rl, |
| Requests: rl, |
| }, |
| }}, |
| }, |
| } |
| |
| pod1 := e2epod.NewPodClient(f).CreateSync(ctx, pod) |
| deviceIDRE := "stub devices: (Dev-[0-9]+)" |
| |
| devID1, err := parseLog(ctx, f, pod1.Name, pod1.Spec.InitContainers[0].Name, deviceIDRE) |
| framework.ExpectNoError(err, "getting logs for pod %q/%q", pod1.Name, pod1.Spec.InitContainers[0].Name) |
| gomega.Expect(devID1).To(gomega.Not(gomega.Equal("")), "pod1's init container requested a device but started successfully without") |
| |
| devID2, err := parseLog(ctx, f, pod1.Name, pod1.Spec.InitContainers[1].Name, deviceIDRE) |
| framework.ExpectNoError(err, "getting logs for pod %q/%q", pod1.Name, pod1.Spec.InitContainers[1].Name) |
| gomega.Expect(devID2).To(gomega.Not(gomega.Equal("")), "pod1's restartable init container requested a device but started successfully without") |
| |
| gomega.Expect(devID2).To(gomega.Equal(devID1), "pod1's init container and restartable init container should share the same device") |
| |
| devID3, err := parseLog(ctx, f, pod1.Name, pod1.Spec.Containers[0].Name, deviceIDRE) |
| framework.ExpectNoError(err, "getting logs for pod %q/%q", pod1.Name, pod1.Spec.Containers[0].Name) |
| gomega.Expect(devID3).To(gomega.Not(gomega.Equal("")), "pod1's regular container requested a device but started successfully without") |
| |
| gomega.Expect(devID3).NotTo(gomega.Equal(devID2), "pod1's restartable init container and regular container should not share the same device") |
| |
| podResources, err := getV1NodeDevices(ctx) |
| framework.ExpectNoError(err) |
| |
| framework.Logf("PodResources.PodResources:%+v\n", podResources.PodResources) |
| framework.Logf("len(PodResources.PodResources):%+v", len(podResources.PodResources)) |
| |
| gomega.Expect(podResources.PodResources).To(gomega.HaveLen(2)) |
| |
| var resourcesForOurPod *kubeletpodresourcesv1.PodResources |
| for _, res := range podResources.GetPodResources() { |
| if res.Name == pod1.Name { |
| resourcesForOurPod = res |
| } |
| } |
| |
| gomega.Expect(resourcesForOurPod).NotTo(gomega.BeNil()) |
| |
| gomega.Expect(resourcesForOurPod.Name).To(gomega.Equal(pod1.Name)) |
| gomega.Expect(resourcesForOurPod.Namespace).To(gomega.Equal(pod1.Namespace)) |
| |
| gomega.Expect(resourcesForOurPod.Containers).To(gomega.HaveLen(2)) |
| |
| for _, container := range resourcesForOurPod.Containers { |
| if container.Name == pod1.Spec.InitContainers[1].Name { |
| gomega.Expect(container.Devices).To(gomega.HaveLen(1)) |
| gomega.Expect(container.Devices[0].ResourceName).To(gomega.Equal(SampleDeviceResourceName)) |
| gomega.Expect(container.Devices[0].DeviceIds).To(gomega.HaveLen(1)) |
| } else if container.Name == pod1.Spec.Containers[0].Name { |
| gomega.Expect(container.Devices).To(gomega.HaveLen(1)) |
| gomega.Expect(container.Devices[0].ResourceName).To(gomega.Equal(SampleDeviceResourceName)) |
| gomega.Expect(container.Devices[0].DeviceIds).To(gomega.HaveLen(1)) |
| } else { |
| framework.Failf("unexpected container name: %s", container.Name) |
| } |
| } |
| }) |
| }) |
| } |
| |
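| // testDevicePluginNodeReboot registers the device plugin e2e tests that simulate a node reboot |
| // by removing all pod sandboxes via CRI while the kubelet is stopped. |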
| func testDevicePluginNodeReboot(f *framework.Framework, pluginSockDir string) { |
| f.Context("DevicePlugin", f.WithSerial(), f.WithDisruptive(), func() { |
| var devicePluginPod *v1.Pod |
| var v1PodResources *kubeletpodresourcesv1.ListPodResourcesResponse |
| var triggerPathFile, triggerPathDir string |
| var err error |
| |
| ginkgo.BeforeEach(func(ctx context.Context) { |
| ginkgo.By("Wait for node to be ready") |
| gomega.Eventually(ctx, func(ctx context.Context) bool { |
| nodes, err := e2enode.TotalReady(ctx, f.ClientSet) |
| framework.ExpectNoError(err) |
| return nodes == 1 |
| }, time.Minute, time.Second).Should(gomega.BeTrue()) |
| |
| // Before we run the device plugin test, we need to ensure |
| // that the cluster is in a clean state and there are no |
| // pods running on this node. |
| // This is done in a gomega.Eventually with retries, since a prior test in a different test suite could have run and the deletion of its resources may still be in progress. |
| // xref: https://issue.k8s.io/115381 |
| gomega.Eventually(ctx, func(ctx context.Context) error { |
| v1PodResources, err = getV1NodeDevices(ctx) |
| if err != nil { |
| return fmt.Errorf("failed to get node local podresources by accessing the (v1) podresources API endpoint: %v", err) |
| } |
| |
| if len(v1PodResources.PodResources) > 0 { |
| return fmt.Errorf("expected v1 pod resources to be empty, but got non-empty resources: %+v", v1PodResources.PodResources) |
| } |
| return nil |
| }, f.Timeouts.SystemDaemonsetStartup, f.Timeouts.Poll).Should(gomega.Succeed()) |
| |
| ginkgo.By("Setting up the directory for controlling registration") |
| triggerPathDir = filepath.Join(devicePluginDir, "sample") |
| if _, err := os.Stat(triggerPathDir); err != nil { |
| if errors.Is(err, os.ErrNotExist) { |
| if err := os.Mkdir(triggerPathDir, os.ModePerm); err != nil { |
| framework.Fail(fmt.Sprintf("registration control directory %q creation failed: %v ", triggerPathDir, err)) |
| } |
| framework.Logf("registration control directory created successfully") |
| } else { |
| framework.Fail(fmt.Sprintf("unexpected error checking %q: %v", triggerPathDir, err)) |
| } |
| } else { |
| framework.Logf("registration control directory %q already present", triggerPathDir) |
| } |
| |
| ginkgo.By("Setting up the file trigger for controlling registration") |
| triggerPathFile = filepath.Join(triggerPathDir, "registration") |
| if _, err := os.Stat(triggerPathFile); err != nil { |
| if errors.Is(err, os.ErrNotExist) { |
| if _, err = os.Create(triggerPathFile); err != nil { |
| framework.Fail(fmt.Sprintf("registration control file %q creation failed: %v", triggerPathFile, err)) |
| } |
| framework.Logf("registration control file created successfully") |
| } else { |
| framework.Fail(fmt.Sprintf("unexpected error checking %q: %v", triggerPathFile, err)) |
| } |
| } else { |
| framework.Logf("registration control file %q already present", triggerPathFile) |
| } |
| |
| ginkgo.By("Scheduling a sample device plugin pod") |
| data, err := e2etestfiles.Read(SampleDevicePluginControlRegistrationDSYAML) |
| if err != nil { |
| framework.Fail(fmt.Sprintf("error reading test data %q: %v", SampleDevicePluginControlRegistrationDSYAML, err)) |
| } |
| ds := readDaemonSetV1OrDie(data) |
| |
| dp := &v1.Pod{ |
| ObjectMeta: metav1.ObjectMeta{ |
| Name: SampleDevicePluginName, |
| }, |
| Spec: ds.Spec.Template.Spec, |
| } |
| |
| devicePluginPod = e2epod.NewPodClient(f).CreateSync(ctx, dp) |
| |
| go func() { |
| // Since autoregistration is disabled for the device plugin (as REGISTER_CONTROL_FILE |
| // environment variable is specified), device plugin registration needs to be triggered |
| // manually. |
| // This is done by deleting the control file at the following path: |
| // `/var/lib/kubelet/device-plugins/sample/registration`. |
| |
| defer ginkgo.GinkgoRecover() |
| framework.Logf("Deleting the control file: %q to trigger registration", triggerPathFile) |
| err := os.Remove(triggerPathFile) |
| framework.ExpectNoError(err) |
| }() |
| |
| ginkgo.By("Waiting for devices to become available on the local node") |
| gomega.Eventually(ctx, func(ctx context.Context) bool { |
| node, ready := getLocalTestNode(ctx, f) |
| return ready && CountSampleDeviceCapacity(node) > 0 |
| }, 5*time.Minute, framework.Poll).Should(gomega.BeTrue()) |
| framework.Logf("Successfully created device plugin pod") |
| |
| ginkgo.By(fmt.Sprintf("Waiting for the resource exported by the sample device plugin to become available on the local node (instances: %d)", expectedSampleDevsAmount)) |
| gomega.Eventually(ctx, func(ctx context.Context) bool { |
| node, ready := getLocalTestNode(ctx, f) |
| return ready && |
| CountSampleDeviceCapacity(node) == expectedSampleDevsAmount && |
| CountSampleDeviceAllocatable(node) == expectedSampleDevsAmount |
| }, 30*time.Second, framework.Poll).Should(gomega.BeTrue()) |
| }) |
| |
| ginkgo.AfterEach(func(ctx context.Context) { |
| ginkgo.By("Deleting the device plugin pod") |
| e2epod.NewPodClient(f).DeleteSync(ctx, devicePluginPod.Name, metav1.DeleteOptions{}, time.Minute) |
| |
| ginkgo.By("Deleting any Pods created by the test") |
| l, err := e2epod.NewPodClient(f).List(ctx, metav1.ListOptions{}) |
| framework.ExpectNoError(err) |
| for _, p := range l.Items { |
| if p.Namespace != f.Namespace.Name { |
| continue |
| } |
| |
| framework.Logf("Deleting pod: %s", p.Name) |
| e2epod.NewPodClient(f).DeleteSync(ctx, p.Name, metav1.DeleteOptions{}, 2*time.Minute) |
| } |
| |
| err = os.Remove(triggerPathDir) |
| framework.ExpectNoError(err) |
| |
| ginkgo.By("Waiting for devices to become unavailable on the local node") |
| gomega.Eventually(ctx, func(ctx context.Context) bool { |
| node, ready := getLocalTestNode(ctx, f) |
| return ready && CountSampleDeviceCapacity(node) <= 0 |
| }, 5*time.Minute, framework.Poll).Should(gomega.BeTrue()) |
| |
| ginkgo.By("devices now unavailable on the local node") |
| }) |
| |
| // simulate a node reboot by removing pods using CRI before the kubelet is started. In addition, a scenario is |
| // intentionally created where, after the node reboot, application pods requesting devices appear before the device plugin pod |
| // exposing those devices as a resource has restarted. The expected behavior is that the application pod fails at admission time. |
| ginkgo.It("Keeps device plugin assignments across node reboots (no pod restart, no device plugin re-registration)", func(ctx context.Context) { |
| podRECMD := fmt.Sprintf("devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs && sleep %s", sleepIntervalForever) |
| pod1 := e2epod.NewPodClient(f).CreateSync(ctx, makeBusyboxPod(SampleDeviceResourceName, podRECMD)) |
| deviceIDRE := "stub devices: (Dev-[0-9]+)" |
| devID1, err := parseLog(ctx, f, pod1.Name, pod1.Name, deviceIDRE) |
| framework.ExpectNoError(err, "getting logs for pod %q", pod1.Name) |
| |
| gomega.Expect(devID1).To(gomega.Not(gomega.Equal(""))) |
| |
| pod1, err = e2epod.NewPodClient(f).Get(ctx, pod1.Name, metav1.GetOptions{}) |
| framework.ExpectNoError(err) |
| |
| ginkgo.By("stopping the kubelet") |
| startKubelet := stopKubelet() |
| |
| ginkgo.By("stopping all the local containers - using CRI") |
| rs, _, err := getCRIClient() |
| framework.ExpectNoError(err) |
| sandboxes, err := rs.ListPodSandbox(ctx, &runtimeapi.PodSandboxFilter{}) |
| framework.ExpectNoError(err) |
| for _, sandbox := range sandboxes { |
| gomega.Expect(sandbox.Metadata).ToNot(gomega.BeNil()) |
| ginkgo.By(fmt.Sprintf("deleting pod using CRI: %s/%s -> %s", sandbox.Metadata.Namespace, sandbox.Metadata.Name, sandbox.Id)) |
| |
| err := rs.RemovePodSandbox(ctx, sandbox.Id) |
| framework.ExpectNoError(err) |
| } |
| |
| ginkgo.By("restarting the kubelet") |
| startKubelet() |
| |
| ginkgo.By("Wait for node to be ready again") |
| e2enode.WaitForAllNodesSchedulable(ctx, f.ClientSet, 5*time.Minute) |
| |
| ginkgo.By("Waiting for the pod to fail with admission error as device plugin hasn't re-registered yet") |
| gomega.Eventually(ctx, getPod). |
| WithArguments(f, pod1.Name). |
| WithTimeout(time.Minute). |
| Should(HaveFailedWithAdmissionError(), |
| "the pod succeeded to start, when it should fail with the admission error") |
| |
| // crosscheck that the device assignment is preserved and stable from the perspective of the kubelet. |
| // note that we don't check the container logs again: that check is done at startup and the container |
| // never restarted (it runs "forever" on this test's timescale), so re-doing the check would be useless. |
| ginkgo.By("Verifying the device assignment after kubelet restart using podresources API") |
| gomega.Eventually(ctx, func() error { |
| v1PodResources, err = getV1NodeDevices(ctx) |
| return err |
| }, 30*time.Second, framework.Poll).ShouldNot(gomega.HaveOccurred(), "cannot fetch the compute resource assignment after kubelet restart") |
| |
| err, _ = checkPodResourcesAssignment(v1PodResources, pod1.Namespace, pod1.Name, pod1.Spec.Containers[0].Name, SampleDeviceResourceName, []string{devID1}) |
| framework.ExpectNoError(err, "inconsistent device assignment after node reboot") |
| |
| }) |
| }) |
| } |
| |
| // makeBusyboxPod returns a simple Pod spec with a busybox container |
| // that requests SampleDeviceResourceName and runs the specified command. |
| func makeBusyboxPod(SampleDeviceResourceName, cmd string) *v1.Pod { |
| podName := "device-plugin-test-" + string(uuid.NewUUID()) |
| rl := v1.ResourceList{v1.ResourceName(SampleDeviceResourceName): *resource.NewQuantity(1, resource.DecimalSI)} |
| |
| return &v1.Pod{ |
| ObjectMeta: metav1.ObjectMeta{Name: podName}, |
| Spec: v1.PodSpec{ |
| RestartPolicy: v1.RestartPolicyAlways, |
| Containers: []v1.Container{{ |
| Image: busyboxImage, |
| Name: podName, |
| // Runs the specified command in the test pod. |
| Command: []string{"sh", "-c", cmd}, |
| Resources: v1.ResourceRequirements{ |
| Limits: rl, |
| Requests: rl, |
| }, |
| }}, |
| }, |
| } |
| } |
| |
| // ensurePodContainerRestart confirms that the pod's first container has restarted at least once |
| func ensurePodContainerRestart(ctx context.Context, f *framework.Framework, podName string, contName string) { |
| var initialCount int32 |
| var currentCount int32 |
| p, err := e2epod.NewPodClient(f).Get(ctx, podName, metav1.GetOptions{}) |
| if err != nil || len(p.Status.ContainerStatuses) < 1 { |
| framework.Failf("ensurePodContainerRestart failed for pod %q: %v", podName, err) |
| } |
| initialCount = p.Status.ContainerStatuses[0].RestartCount |
| gomega.Eventually(ctx, func() bool { |
| p, err = e2epod.NewPodClient(f).Get(ctx, podName, metav1.GetOptions{}) |
| if err != nil || len(p.Status.ContainerStatuses) < 1 { |
| return false |
| } |
| currentCount = p.Status.ContainerStatuses[0].RestartCount |
| framework.Logf("initial %v, current %v", initialCount, currentCount) |
| return currentCount > initialCount |
| }, 5*time.Minute, framework.Poll).Should(gomega.BeTrue()) |
| } |
| |
| // parseLog returns the matching string for the specified regular expression parsed from the container logs. |
| func parseLog(ctx context.Context, f *framework.Framework, podName string, contName string, re string) (string, error) { |
| logs, err := e2epod.GetPodLogs(ctx, f.ClientSet, f.Namespace.Name, podName, contName) |
| if err != nil { |
| return "", err |
| } |
| |
| framework.Logf("got pod logs: %v", logs) |
| regex := regexp.MustCompile(re) |
| matches := regex.FindStringSubmatch(logs) |
| if len(matches) < 2 { |
| return "", fmt.Errorf("unexpected match in logs: %q", logs) |
| } |
| |
| return matches[1], nil |
| } |
| |
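| // checkPodResourcesAssignment verifies that, in the given podresources API response, the specified container |
| // has exactly the devices devs assigned for resourceName. It returns a descriptive error on mismatch and a |
| // bool reporting whether the container was found in the response. |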
| func checkPodResourcesAssignment(v1PodRes *kubeletpodresourcesv1.ListPodResourcesResponse, podNamespace, podName, containerName, resourceName string, devs []string) (error, bool) { |
| for _, podRes := range v1PodRes.PodResources { |
| if podRes.Namespace != podNamespace || podRes.Name != podName { |
| continue |
| } |
| for _, contRes := range podRes.Containers { |
| if contRes.Name != containerName { |
| continue |
| } |
| return matchContainerDevices(podNamespace+"/"+podName+"/"+containerName, contRes.Devices, resourceName, devs) |
| } |
| } |
| err := fmt.Errorf("no resources found for %s/%s/%s", podNamespace, podName, containerName) |
| framework.Logf("%v", err) |
| return err, false |
| } |
| |
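| // matchContainerDevices compares the devices reported for a single container against the expected device IDs |
| // devs for resourceName. The returned bool is always true, since the container was already located by the caller. |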
| func matchContainerDevices(ident string, contDevs []*kubeletpodresourcesv1.ContainerDevices, resourceName string, devs []string) (error, bool) { |
| expected := sets.New[string](devs...) |
| assigned := sets.New[string]() |
| for _, contDev := range contDevs { |
| if contDev.ResourceName != resourceName { |
| continue |
| } |
| assigned = assigned.Insert(contDev.DeviceIds...) |
| } |
| expectedStr := strings.Join(expected.UnsortedList(), ",") |
| assignedStr := strings.Join(assigned.UnsortedList(), ",") |
| framework.Logf("%s: devices expected %q assigned %q", ident, expectedStr, assignedStr) |
| if !assigned.Equal(expected) { |
| return fmt.Errorf("device allocation mismatch for %s expected %s assigned %s", ident, expectedStr, assignedStr), true |
| } |
| return nil, true |
| } |
| |
| // getSampleDevicePluginPod returns the Sample Device Plugin pod to be used in e2e tests. |
| func getSampleDevicePluginPod(pluginSockDir string) *v1.Pod { |
| data, err := e2etestfiles.Read(SampleDevicePluginDSYAML) |
| if err != nil { |
| framework.Fail(err.Error()) |
| } |
| |
| ds := readDaemonSetV1OrDie(data) |
| dp := &v1.Pod{ |
| ObjectMeta: metav1.ObjectMeta{ |
| Name: SampleDevicePluginName, |
| }, |
| Spec: ds.Spec.Template.Spec, |
| } |
| for i := range dp.Spec.Containers[0].Env { |
| if dp.Spec.Containers[0].Env[i].Name == SampleDeviceEnvVarNamePluginSockDir { |
| dp.Spec.Containers[0].Env[i].Value = pluginSockDir |
| } |
| } |
| |
| if utilfeature.DefaultFeatureGate.Enabled(features.DevicePluginCDIDevices) { |
| dp.Spec.Containers[0].Env = append(dp.Spec.Containers[0].Env, v1.EnvVar{Name: "CDI_ENABLED", Value: "1"}) |
| } |
| |
| return dp |
| } |
| |
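| // BeTheSamePodStillRunning matches if the pod is the same instance (same UID) as the expected one, |
| // is in the Running phase, and is reported ready. |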
| func BeTheSamePodStillRunning(expected *v1.Pod) types.GomegaMatcher { |
| return gomega.And( |
| BeTheSamePodAs(expected.UID), |
| BeAPodInPhase(v1.PodRunning), |
| BeAPodReady(), |
| ) |
| } |
| |
| // BeAPodReady matches if the pod is reported ready |
| func BeAPodReady() types.GomegaMatcher { |
| return gcustom.MakeMatcher(func(actual *v1.Pod) (bool, error) { |
| return podutils.IsPodReady(actual), nil |
| }).WithTemplate("Pod {{.Actual.Namespace}}/{{.Actual.Name}} UID {{.Actual.UID}} not ready yet") |
| } |
| |
| // BeAPodInPhase matches if the pod is in the given phase |
| func BeAPodInPhase(phase v1.PodPhase) types.GomegaMatcher { |
| return gcustom.MakeMatcher(func(actual *v1.Pod) (bool, error) { |
| return actual.Status.Phase == phase, nil |
| }).WithTemplate("Pod {{.Actual.Namespace}}/{{.Actual.Name}} failed {{.To}} be in phase {{.Data}} instead is in phase {{.Actual.Status.Phase}}").WithTemplateData(phase) |
| } |
| |
| // BeTheSamePodAs matches if the pod has the given UID |
| func BeTheSamePodAs(podUID k8stypes.UID) types.GomegaMatcher { |
| return gcustom.MakeMatcher(func(actual *v1.Pod) (bool, error) { |
| return actual.UID == podUID, nil |
| }).WithTemplate("Pod {{.Actual.Namespace}}/{{.Actual.Name}} expected UID {{.Data}} has UID instead {{.Actual.UID}}").WithTemplateData(podUID) |
| } |