| /* |
| Copyright 2019 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package network |
| |
| import ( |
| "context" |
| "encoding/json" |
| "fmt" |
| "net" |
| "time" |
| |
| v1 "k8s.io/api/core/v1" |
| discoveryv1 "k8s.io/api/discovery/v1" |
| apierrors "k8s.io/apimachinery/pkg/api/errors" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| types "k8s.io/apimachinery/pkg/types" |
| "k8s.io/apimachinery/pkg/util/intstr" |
| "k8s.io/apimachinery/pkg/util/sets" |
| "k8s.io/apimachinery/pkg/util/wait" |
| "k8s.io/apimachinery/pkg/watch" |
| clientset "k8s.io/client-go/kubernetes" |
| "k8s.io/client-go/util/retry" |
| "k8s.io/kubernetes/test/e2e/framework" |
| e2epod "k8s.io/kubernetes/test/e2e/framework/pod" |
| "k8s.io/kubernetes/test/e2e/network/common" |
| imageutils "k8s.io/kubernetes/test/utils/image" |
| admissionapi "k8s.io/pod-security-admission/api" |
| "k8s.io/utils/pointer" |
| |
| "github.com/onsi/ginkgo/v2" |
| "github.com/onsi/gomega" |
| ) |
| |
| var _ = common.SIGDescribe("EndpointSlice", func() { |
| f := framework.NewDefaultFramework("endpointslice") |
| f.NamespacePodSecurityLevel = admissionapi.LevelBaseline |
| |
| var cs clientset.Interface |
| var podClient *e2epod.PodClient |
| |
| ginkgo.BeforeEach(func() { |
| cs = f.ClientSet |
| podClient = e2epod.NewPodClient(f) |
| }) |
| |
| /* |
| Release: v1.21 |
| Testname: EndpointSlice API |
| Description: The discovery.k8s.io API group MUST exist in the /apis discovery document. |
| The discovery.k8s.io/v1 API group/version MUST exist in the /apis/discovery.k8s.io discovery document. |
| The endpointslices resource MUST exist in the /apis/discovery.k8s.io/v1 discovery document. |
| The cluster MUST have a service named "kubernetes" on the default namespace referencing the API servers. |
| The "kubernetes.default" service MUST have Endpoints and EndpointSlices pointing to each API server instance. |
| */ |
| framework.ConformanceIt("should have Endpoints and EndpointSlices pointing to API Server", func(ctx context.Context) { |
| namespace := "default" |
| name := "kubernetes" |
| // verify "kubernetes.default" service exist |
| _, err := cs.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{}) |
| framework.ExpectNoError(err, "error obtaining API server \"kubernetes\" Service resource on \"default\" namespace") |
| |
| // verify Endpoints for the API servers exist |
| endpoints, err := cs.CoreV1().Endpoints(namespace).Get(ctx, name, metav1.GetOptions{}) |
| framework.ExpectNoError(err, "error obtaining API server \"kubernetes\" Endpoint resource on \"default\" namespace") |
| if len(endpoints.Subsets) == 0 { |
| framework.Failf("Expected at least 1 subset in endpoints, got %d: %#v", len(endpoints.Subsets), endpoints.Subsets) |
| } |
| // verify EndpointSlices for the API servers exist |
| endpointSliceList, err := cs.DiscoveryV1().EndpointSlices(namespace).List(ctx, metav1.ListOptions{ |
| LabelSelector: "kubernetes.io/service-name=" + name, |
| }) |
| framework.ExpectNoError(err, "error obtaining API server \"kubernetes\" EndpointSlice resource on \"default\" namespace") |
| if len(endpointSliceList.Items) == 0 { |
| framework.Failf("Expected at least 1 EndpointSlice, got %d: %#v", len(endpoints.Subsets), endpoints.Subsets) |
| } |
| |
| if !endpointSlicesEqual(endpoints, endpointSliceList) { |
| framework.Failf("Expected EndpointSlice to have same addresses and port as Endpoints, got %#v: %#v", endpoints, endpointSliceList) |
| } |
| |
| }) |
| |
| /* |
| Release: v1.21 |
| Testname: EndpointSlice API |
| Description: The discovery.k8s.io API group MUST exist in the /apis discovery document. |
| The discovery.k8s.io/v1 API group/version MUST exist in the /apis/discovery.k8s.io discovery document. |
| The endpointslices resource MUST exist in the /apis/discovery.k8s.io/v1 discovery document. |
| The endpointslice controller should create and delete EndpointSlices for Pods matching a Service. |
| */ |
| framework.ConformanceIt("should create and delete Endpoints and EndpointSlices for a Service with a selector specified", func(ctx context.Context) { |
| svc := createServiceReportErr(ctx, cs, f.Namespace.Name, &v1.Service{ |
| ObjectMeta: metav1.ObjectMeta{ |
| Name: "example-empty-selector", |
| }, |
| Spec: v1.ServiceSpec{ |
| Selector: map[string]string{ |
| "does-not-match-anything": "endpoints-and-endpoint-slices-should-still-be-created", |
| }, |
| Ports: []v1.ServicePort{{ |
| Name: "example", |
| Port: 80, |
| Protocol: v1.ProtocolTCP, |
| }}, |
| }, |
| }) |
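| // The selector above intentionally matches no Pods; the control plane |
| // should still create an empty Endpoints resource and a placeholder |
| // EndpointSlice for the Service. |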
| |
| // Expect Endpoints resource to be created. |
| if err := wait.PollUntilContextTimeout(ctx, 2*time.Second, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) { |
| _, err := cs.CoreV1().Endpoints(svc.Namespace).Get(ctx, svc.Name, metav1.GetOptions{}) |
| if err != nil { |
| return false, nil |
| } |
| return true, nil |
| }); err != nil { |
| framework.Failf("No Endpoints found for Service %s/%s: %s", svc.Namespace, svc.Name, err) |
| } |
| |
| // Expect EndpointSlice resource to be created. |
| var endpointSlice discoveryv1.EndpointSlice |
| if err := wait.PollUntilContextTimeout(ctx, 2*time.Second, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) { |
| endpointSliceList, err := cs.DiscoveryV1().EndpointSlices(svc.Namespace).List(ctx, metav1.ListOptions{ |
| LabelSelector: "kubernetes.io/service-name=" + svc.Name, |
| }) |
| if err != nil { |
| return false, err |
| } |
| if len(endpointSliceList.Items) == 0 { |
| return false, nil |
| } |
| endpointSlice = endpointSliceList.Items[0] |
| return true, nil |
| }); err != nil { |
| framework.Failf("No EndpointSlice found for Service %s/%s: %s", svc.Namespace, svc.Name, err) |
| } |
| |
| // Ensure EndpointSlice has expected values. |
| managedBy, ok := endpointSlice.Labels[discoveryv1.LabelManagedBy] |
| expectedManagedBy := "endpointslice-controller.k8s.io" |
| if !ok { |
| framework.Failf("Expected EndpointSlice to have %s label, got %#v", discoveryv1.LabelManagedBy, endpointSlice.Labels) |
| } else if managedBy != expectedManagedBy { |
| framework.Failf("Expected EndpointSlice to have %s label with %s value, got %s", discoveryv1.LabelManagedBy, expectedManagedBy, managedBy) |
| } |
| if len(endpointSlice.Endpoints) != 0 { |
| framework.Failf("Expected EndpointSlice to have 0 endpoints, got %d: %#v", len(endpointSlice.Endpoints), endpointSlice.Endpoints) |
| } |
| |
| err := cs.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{}) |
| framework.ExpectNoError(err, "error deleting Service") |
| |
| // Expect Endpoints resource to be deleted when Service is. |
| if err := wait.PollUntilContextTimeout(ctx, 2*time.Second, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) { |
| _, err := cs.CoreV1().Endpoints(svc.Namespace).Get(ctx, svc.Name, metav1.GetOptions{}) |
| if err != nil { |
| if apierrors.IsNotFound(err) { |
| return true, nil |
| } |
| return false, err |
| } |
| return false, nil |
| }); err != nil { |
| framework.Failf("Endpoints resource not deleted after Service %s/%s was deleted: %s", svc.Namespace, svc.Name, err) |
| } |
| |
| // Expect EndpointSlice resource to be deleted when Service is. Wait for |
| // up to 90 seconds since garbage collector only polls every 30 seconds |
| // and may need to retry informer resync at some point during an e2e |
| // run. |
| if err := wait.PollUntilContextTimeout(ctx, 2*time.Second, 90*time.Second, true, func(ctx context.Context) (bool, error) { |
| endpointSliceList, err := cs.DiscoveryV1().EndpointSlices(svc.Namespace).List(ctx, metav1.ListOptions{ |
| LabelSelector: "kubernetes.io/service-name=" + svc.Name, |
| }) |
| if err != nil { |
| return false, err |
| } |
| if len(endpointSliceList.Items) == 0 { |
| return true, nil |
| } |
| return false, nil |
| }); err != nil { |
| framework.Failf("EndpointSlice resource not deleted after Service %s/%s was deleted: %s", svc.Namespace, svc.Name, err) |
| } |
| }) |
| |
| /* |
| Release: v1.21 |
| Testname: EndpointSlice API |
| Description: The discovery.k8s.io API group MUST exist in the /apis discovery document. |
| The discovery.k8s.io/v1 API group/version MUST exist in the /apis/discovery.k8s.io discovery document. |
| The endpointslices resource MUST exist in the /apis/discovery.k8s.io/v1 discovery document. |
| The endpointslice controller must create EndpointSlices for Pods matching a Service. |
| */ |
| framework.ConformanceIt("should create Endpoints and EndpointSlices for Pods matching a Service", func(ctx context.Context) { |
| labelPod1 := "pod1" |
| labelPod2 := "pod2" |
| labelPod3 := "pod3" |
| labelShared12 := "shared12" |
| labelValue := "on" |
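| // pod1 and pod2 each carry a unique label plus a shared label so that |
| // the Services below can select either a single pod or both. |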
| |
| pod1 := podClient.Create(ctx, &v1.Pod{ |
| ObjectMeta: metav1.ObjectMeta{ |
| Name: "pod1", |
| Labels: map[string]string{ |
| labelPod1: labelValue, |
| labelShared12: labelValue, |
| }, |
| }, |
| Spec: v1.PodSpec{ |
| Containers: []v1.Container{ |
| { |
| Name: "container1", |
| Image: imageutils.GetE2EImage(imageutils.Nginx), |
| Ports: []v1.ContainerPort{{ |
| Name: "example-name", |
| ContainerPort: int32(3000), |
| }}, |
| }, |
| }, |
| }, |
| }) |
| |
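| // pod2 reuses the "example-name" port name but maps it to a different |
| // container port, so named-port resolution must be done per pod. |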
| pod2 := podClient.Create(ctx, &v1.Pod{ |
| ObjectMeta: metav1.ObjectMeta{ |
| Name: "pod2", |
| Labels: map[string]string{ |
| labelPod2: labelValue, |
| labelShared12: labelValue, |
| }, |
| }, |
| Spec: v1.PodSpec{ |
| Containers: []v1.Container{ |
| { |
| Name: "container1", |
| Image: imageutils.GetE2EImage(imageutils.Nginx), |
| Ports: []v1.ContainerPort{{ |
| Name: "example-name", |
| ContainerPort: int32(3001), |
| }, { |
| Name: "other-port", |
| ContainerPort: int32(3002), |
| }}, |
| }, |
| }, |
| }, |
| }) |
| |
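| // Three Services cover the different matching modes: svc1 selects only |
| // pod1 via a numeric target port, svc2 selects both pods via the shared |
| // named port, and svc3 selects no pods at all. |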
| svc1 := createServiceReportErr(ctx, cs, f.Namespace.Name, &v1.Service{ |
| ObjectMeta: metav1.ObjectMeta{ |
| Name: "example-int-port", |
| }, |
| Spec: v1.ServiceSpec{ |
| Selector: map[string]string{labelPod1: labelValue}, |
| PublishNotReadyAddresses: true, |
| Ports: []v1.ServicePort{{ |
| Name: "example", |
| Port: 80, |
| TargetPort: intstr.FromInt32(3000), |
| Protocol: v1.ProtocolTCP, |
| }}, |
| }, |
| }) |
| |
| svc2 := createServiceReportErr(ctx, cs, f.Namespace.Name, &v1.Service{ |
| ObjectMeta: metav1.ObjectMeta{ |
| Name: "example-named-port", |
| }, |
| Spec: v1.ServiceSpec{ |
| Selector: map[string]string{labelShared12: labelValue}, |
| PublishNotReadyAddresses: true, |
| Ports: []v1.ServicePort{{ |
| Name: "http", |
| Port: 80, |
| TargetPort: intstr.FromString("example-name"), |
| Protocol: v1.ProtocolTCP, |
| }}, |
| }, |
| }) |
| |
| svc3 := createServiceReportErr(ctx, cs, f.Namespace.Name, &v1.Service{ |
| ObjectMeta: metav1.ObjectMeta{ |
| Name: "example-no-match", |
| }, |
| Spec: v1.ServiceSpec{ |
| Selector: map[string]string{labelPod3: labelValue}, |
| PublishNotReadyAddresses: true, |
| Ports: []v1.ServicePort{{ |
| Name: "example-no-match", |
| Port: 80, |
| TargetPort: intstr.FromInt32(8080), |
| Protocol: v1.ProtocolTCP, |
| }}, |
| }, |
| }) |
| |
| err := wait.PollUntilContextTimeout(ctx, 2*time.Second, 3*time.Minute, true, func(ctx context.Context) (bool, error) { |
| var err error |
| pod1, err = podClient.Get(ctx, pod1.Name, metav1.GetOptions{}) |
| if err != nil { |
| return false, err |
| } |
| if len(pod1.Status.PodIPs) == 0 { |
| return false, nil |
| } |
| |
| pod2, err = podClient.Get(ctx, pod2.Name, metav1.GetOptions{}) |
| if err != nil { |
| return false, err |
| } |
| if len(pod2.Status.PodIPs) == 0 { |
| return false, nil |
| } |
| |
| return true, nil |
| }) |
| framework.ExpectNoError(err, "timed out waiting for Pods to have IPs assigned") |
| |
| ginkgo.By("referencing a single matching pod") |
| expectEndpointsAndSlices(ctx, cs, f.Namespace.Name, svc1, []*v1.Pod{pod1}, 1, 1, false) |
| |
| ginkgo.By("referencing matching pods with named port") |
| expectEndpointsAndSlices(ctx, cs, f.Namespace.Name, svc2, []*v1.Pod{pod1, pod2}, 2, 2, true) |
| |
| ginkgo.By("creating empty Endpoints and EndpointSlices for no matching Pods") |
| expectEndpointsAndSlices(ctx, cs, f.Namespace.Name, svc3, []*v1.Pod{}, 0, 1, false) |
| |
| // TODO: Update test to cover Endpoints recreation after deletes once it |
| // actually works. |
| ginkgo.By("recreating EndpointSlices after they've been deleted") |
| deleteEndpointSlices(ctx, cs, f.Namespace.Name, svc2) |
| expectEndpointsAndSlices(ctx, cs, f.Namespace.Name, svc2, []*v1.Pod{pod1, pod2}, 2, 2, true) |
| }) |
| |
| /* |
| Release: v1.21 |
| Testname: EndpointSlice API |
| Description: The discovery.k8s.io API group MUST exist in the /apis discovery document. |
| The discovery.k8s.io/v1 API group/version MUST exist in the /apis/discovery.k8s.io discovery document. |
| The endpointslices resource MUST exist in the /apis/discovery.k8s.io/v1 discovery document. |
| The endpointslices resource must support create, get, list, watch, update, patch, delete, and deletecollection. |
| */ |
| framework.ConformanceIt("should support creating EndpointSlice API operations", func(ctx context.Context) { |
| // Setup |
| ns := f.Namespace.Name |
| epsVersion := "v1" |
| epsClient := f.ClientSet.DiscoveryV1().EndpointSlices(ns) |
| |
| epsTemplate := &discoveryv1.EndpointSlice{ |
| ObjectMeta: metav1.ObjectMeta{GenerateName: "e2e-example-ing", |
| Labels: map[string]string{ |
| "special-label": f.UniqueName, |
| }}, |
| AddressType: discoveryv1.AddressTypeIPv4, |
| Endpoints: []discoveryv1.Endpoint{ |
| {Addresses: []string{"1.2.3.4", "5.6.7.8"}}, |
| {Addresses: []string{"2.2.3.4", "6.6.7.8"}}, |
| }, |
| } |
| // Discovery |
| ginkgo.By("getting /apis") |
| { |
| discoveryGroups, err := f.ClientSet.Discovery().ServerGroups() |
| framework.ExpectNoError(err) |
| found := false |
| for _, group := range discoveryGroups.Groups { |
| if group.Name == discoveryv1.GroupName { |
| for _, version := range group.Versions { |
| if version.Version == epsVersion { |
| found = true |
| break |
| } |
| } |
| } |
| } |
| if !found { |
| framework.Failf("expected discovery API group/version, got %#v", discoveryGroups.Groups) |
| } |
| } |
| |
| ginkgo.By("getting /apis/discovery.k8s.io") |
| { |
| group := &metav1.APIGroup{} |
| err := f.ClientSet.Discovery().RESTClient().Get().AbsPath("/apis/discovery.k8s.io").Do(ctx).Into(group) |
| framework.ExpectNoError(err) |
| found := false |
| for _, version := range group.Versions { |
| if version.Version == epsVersion { |
| found = true |
| break |
| } |
| } |
| if !found { |
| framework.Failf("expected discovery API version, got %#v", group.Versions) |
| } |
| } |
| |
| ginkgo.By("getting /apis/discovery.k8s.io" + epsVersion) |
| { |
| resources, err := f.ClientSet.Discovery().ServerResourcesForGroupVersion(discoveryv1.SchemeGroupVersion.String()) |
| framework.ExpectNoError(err) |
| foundEPS := false |
| for _, resource := range resources.APIResources { |
| switch resource.Name { |
| case "endpointslices": |
| foundEPS = true |
| } |
| } |
| if !foundEPS { |
| framework.Failf("expected endpointslices, got %#v", resources.APIResources) |
| } |
| } |
| |
| // EndpointSlice resource create/read/update/watch verbs |
| ginkgo.By("creating") |
| _, err := epsClient.Create(ctx, epsTemplate, metav1.CreateOptions{}) |
| framework.ExpectNoError(err) |
| _, err = epsClient.Create(ctx, epsTemplate, metav1.CreateOptions{}) |
| framework.ExpectNoError(err) |
| createdEPS, err := epsClient.Create(ctx, epsTemplate, metav1.CreateOptions{}) |
| framework.ExpectNoError(err) |
| |
| ginkgo.By("getting") |
| queriedEPS, err := epsClient.Get(ctx, createdEPS.Name, metav1.GetOptions{}) |
| framework.ExpectNoError(err) |
| gomega.Expect(queriedEPS.UID).To(gomega.Equal(createdEPS.UID)) |
| |
| ginkgo.By("listing") |
| epsList, err := epsClient.List(ctx, metav1.ListOptions{LabelSelector: "special-label=" + f.UniqueName}) |
| framework.ExpectNoError(err) |
| gomega.Expect(epsList.Items).To(gomega.HaveLen(3), "filtered list should have 3 items") |
| |
| ginkgo.By("watching") |
| framework.Logf("starting watch") |
| epsWatch, err := epsClient.Watch(ctx, metav1.ListOptions{ResourceVersion: epsList.ResourceVersion, LabelSelector: "special-label=" + f.UniqueName}) |
| framework.ExpectNoError(err) |
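| // The watch starts at the list's ResourceVersion, so only modifications |
| // made after this point are delivered. |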
| |
| // Test cluster-wide list and watch |
| clusterEPSClient := f.ClientSet.DiscoveryV1().EndpointSlices("") |
| ginkgo.By("cluster-wide listing") |
| clusterEPSList, err := clusterEPSClient.List(ctx, metav1.ListOptions{LabelSelector: "special-label=" + f.UniqueName}) |
| framework.ExpectNoError(err) |
| gomega.Expect(clusterEPSList.Items).To(gomega.HaveLen(3), "filtered list should have 3 items") |
| |
| ginkgo.By("cluster-wide watching") |
| framework.Logf("starting watch") |
| _, err = clusterEPSClient.Watch(ctx, metav1.ListOptions{ResourceVersion: epsList.ResourceVersion, LabelSelector: "special-label=" + f.UniqueName}) |
| framework.ExpectNoError(err) |
| |
| ginkgo.By("patching") |
| patchedEPS, err := epsClient.Patch(ctx, createdEPS.Name, types.MergePatchType, []byte(`{"metadata":{"annotations":{"patched":"true"}}}`), metav1.PatchOptions{}) |
| framework.ExpectNoError(err) |
| gomega.Expect(patchedEPS.Annotations).To(gomega.HaveKeyWithValue("patched", "true"), "patched object should have the applied annotation") |
| |
| ginkgo.By("updating") |
| var epsToUpdate, updatedEPS *discoveryv1.EndpointSlice |
| err = retry.RetryOnConflict(retry.DefaultRetry, func() error { |
| epsToUpdate, err = epsClient.Get(ctx, createdEPS.Name, metav1.GetOptions{}) |
| if err != nil { |
| return err |
| } |
| epsToUpdate.Annotations["updated"] = "true" |
| updatedEPS, err = epsClient.Update(ctx, epsToUpdate, metav1.UpdateOptions{}) |
| return err |
| }) |
| framework.ExpectNoError(err) |
| gomega.Expect(updatedEPS.Annotations).To(gomega.HaveKeyWithValue("updated", "true"), "updated object should have the applied annotation") |
| |
| framework.Logf("waiting for watch events with expected annotations") |
| for sawAnnotations := false; !sawAnnotations; { |
| select { |
| case evt, ok := <-epsWatch.ResultChan(): |
| if !ok { |
| framework.Fail("watch channel should not close") |
| } |
| gomega.Expect(evt.Type).To(gomega.Equal(watch.Modified)) |
| watchedEPS, isEPS := evt.Object.(*discoveryv1.EndpointSlice) |
| if !isEPS { |
| framework.Failf("expected EndpointSlice, got %T", evt.Object) |
| } |
| if watchedEPS.Annotations["patched"] == "true" { |
| framework.Logf("saw patched and updated annotations") |
| sawAnnotations = true |
| epsWatch.Stop() |
| } else { |
| framework.Logf("missing expected annotations, waiting: %#v", watchedEPS.Annotations) |
| } |
| case <-time.After(wait.ForeverTestTimeout): |
| framework.Fail("timed out waiting for watch event") |
| } |
| } |
| |
| ginkgo.By("deleting") |
| |
| err = epsClient.Delete(ctx, createdEPS.Name, metav1.DeleteOptions{}) |
| framework.ExpectNoError(err) |
| _, err = epsClient.Get(ctx, createdEPS.Name, metav1.GetOptions{}) |
| if !apierrors.IsNotFound(err) { |
| framework.Failf("expected 404, got %v", err) |
| } |
| epsList, err = epsClient.List(ctx, metav1.ListOptions{LabelSelector: "special-label=" + f.UniqueName}) |
| framework.ExpectNoError(err) |
| gomega.Expect(epsList.Items).To(gomega.HaveLen(2), "filtered list should have 2 items") |
| for _, eps := range epsList.Items { |
| if eps.Namespace == createdEPS.Namespace && eps.Name == createdEPS.Name { |
| framework.Fail("listing after deleting createdEPS") |
| } |
| } |
| |
| ginkgo.By("deleting a collection") |
| err = epsClient.DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: "special-label=" + f.UniqueName}) |
| framework.ExpectNoError(err) |
| epsList, err = epsClient.List(ctx, metav1.ListOptions{LabelSelector: "special-label=" + f.UniqueName}) |
| framework.ExpectNoError(err) |
| gomega.Expect(epsList.Items).To(gomega.BeEmpty(), "filtered list should have 0 items") |
| }) |
| |
| ginkgo.It("should support a Service with multiple ports specified in multiple EndpointSlices", func(ctx context.Context) { |
| ns := f.Namespace.Name |
| svc := createServiceReportErr(ctx, cs, f.Namespace.Name, &v1.Service{ |
| ObjectMeta: metav1.ObjectMeta{ |
| Name: "example-custom-endpoints", |
| }, |
| Spec: v1.ServiceSpec{ |
| Ports: []v1.ServicePort{ |
| { |
| Name: "port80", |
| Port: 80, |
| Protocol: v1.ProtocolTCP, |
| }, |
| { |
| Name: "port81", |
| Port: 81, |
| Protocol: v1.ProtocolTCP, |
| }, |
| }, |
| }, |
| }) |
| |
| // Add a backend pod for the service |
| port8090 := []v1.ContainerPort{ |
| { |
| ContainerPort: 8090, |
| Protocol: v1.ProtocolTCP, |
| }, |
| } |
| port9090 := []v1.ContainerPort{ |
| { |
| ContainerPort: 9090, |
| Protocol: v1.ProtocolTCP, |
| }, |
| } |
| |
| serverPod := e2epod.NewAgnhostPodFromContainers( |
| "", "pod-handle-http-request", nil, |
| e2epod.NewAgnhostContainer("container-handle-8090-request", nil, port8090, "netexec", "--http-port", "8090", "--udp-port", "-1"), |
| e2epod.NewAgnhostContainer("container-handle-9090-request", nil, port9090, "netexec", "--http-port", "9090", "--udp-port", "-1"), |
| ) |
| |
| pod := e2epod.NewPodClient(f).CreateSync(ctx, serverPod) |
| |
| if pod.Status.PodIP == "" { |
| framework.Failf("PodIP not assigned for pod %s", pod.Name) |
| } |
| |
| addressType := discoveryv1.AddressTypeIPv4 |
| if framework.TestContext.ClusterIsIPv6() { |
| addressType = discoveryv1.AddressTypeIPv6 |
| } |
| |
| // create custom endpoint slices |
| tcpProtocol := v1.ProtocolTCP |
| readyCondTrue := true |
| epsTemplate := &discoveryv1.EndpointSlice{ |
| ObjectMeta: metav1.ObjectMeta{GenerateName: "e2e-custom-slice", |
| Labels: map[string]string{ |
| discoveryv1.LabelServiceName: svc.Name, |
| discoveryv1.LabelManagedBy: "e2e-test" + ns, |
| }}, |
| AddressType: addressType, |
| Endpoints: []discoveryv1.Endpoint{ |
| { |
| Addresses: []string{pod.Status.PodIP}, |
| Conditions: discoveryv1.EndpointConditions{Ready: &readyCondTrue}, |
| }, |
| }, |
| } |
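| // Each custom slice below maps one Service port to one container port on |
| // the same pod: port80 -> 8090 and port81 -> 9090. |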
| |
| ginkgo.By("creating") |
| eps1 := epsTemplate.DeepCopy() |
| eps1.Ports = []discoveryv1.EndpointPort{{ |
| Name: pointer.String("port80"), |
| Port: pointer.Int32(8090), |
| Protocol: &tcpProtocol, |
| }} |
| |
| _, err := f.ClientSet.DiscoveryV1().EndpointSlices(ns).Create(ctx, eps1, metav1.CreateOptions{}) |
| framework.ExpectNoError(err) |
| eps2 := epsTemplate.DeepCopy() |
| eps2.Ports = []discoveryv1.EndpointPort{{ |
| Name: pointer.String("port81"), |
| Port: pointer.Int32(9090), |
| Protocol: &tcpProtocol, |
| }} |
| |
| _, err = f.ClientSet.DiscoveryV1().EndpointSlices(ns).Create(ctx, eps2, metav1.CreateOptions{}) |
| framework.ExpectNoError(err) |
| |
| // Connecting to the service must work. |
| ginkgo.By("Creating a pause pod that will try to connect to the webserver") |
| pausePod0 := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil) |
| e2epod.NewPodClient(f).CreateSync(ctx, pausePod0) |
| |
| dest1 := net.JoinHostPort(svc.Spec.ClusterIP, "80") |
| dest2 := net.JoinHostPort(svc.Spec.ClusterIP, "81") |
| execHostnameTest(*pausePod0, dest1, serverPod.Name) |
| execHostnameTest(*pausePod0, dest2, serverPod.Name) |
| |
| }) |
| |
| ginkgo.It("should support a Service with multiple endpoint IPs specified in multiple EndpointSlices", func(ctx context.Context) { |
| ns := f.Namespace.Name |
| svc := createServiceReportErr(ctx, cs, f.Namespace.Name, &v1.Service{ |
| ObjectMeta: metav1.ObjectMeta{ |
| Name: "example-custom-endpoints", |
| }, |
| Spec: v1.ServiceSpec{ |
| Ports: []v1.ServicePort{ |
| { |
| Name: "port80", |
| Port: 80, |
| Protocol: v1.ProtocolTCP, |
| }, |
| { |
| Name: "port81", |
| Port: 81, |
| Protocol: v1.ProtocolTCP, |
| }, |
| }, |
| }, |
| }) |
| |
| // Add backend pods for the service |
| port8090 := []v1.ContainerPort{ |
| { |
| ContainerPort: 8090, |
| Protocol: v1.ProtocolTCP, |
| }, |
| } |
| |
| serverPod1 := e2epod.NewAgnhostPodFromContainers( |
| "", "pod1-handle-http-request", nil, |
| e2epod.NewAgnhostContainer("container-handle-8090-request", nil, port8090, "netexec", "--http-port", "8090", "--udp-port", "-1"), |
| ) |
| pod1 := e2epod.NewPodClient(f).CreateSync(ctx, serverPod1) |
| |
| if pod1.Status.PodIP == "" { |
| framework.Failf("PodIP not assigned for pod %s", pod1.Name) |
| } |
| |
| serverPod2 := e2epod.NewAgnhostPodFromContainers( |
| "", "pod2-handle-http-request", nil, |
| e2epod.NewAgnhostContainer("container-handle-8090-request", nil, port8090, "netexec", "--http-port", "8090", "--udp-port", "-1"), |
| ) |
| pod2 := e2epod.NewPodClient(f).CreateSync(ctx, serverPod2) |
| |
| if pod2.Status.PodIP == "" { |
| framework.Failf("PodIP not assigned for pod %s", pod2.Name) |
| } |
| |
| addressType := discoveryv1.AddressTypeIPv4 |
| if framework.TestContext.ClusterIsIPv6() { |
| addressType = discoveryv1.AddressTypeIPv6 |
| } |
| |
| // create custom endpoint slices |
| tcpProtocol := v1.ProtocolTCP |
| readyCondTrue := true |
| epsTemplate := &discoveryv1.EndpointSlice{ |
| ObjectMeta: metav1.ObjectMeta{GenerateName: "e2e-custom-slice", |
| Labels: map[string]string{ |
| discoveryv1.LabelServiceName: svc.Name, |
| discoveryv1.LabelManagedBy: "e2e-test" + ns, |
| }}, |
| AddressType: addressType, |
| } |
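| // Each custom slice below carries a different pod IP: port80 routes to |
| // pod1 and port81 to pod2, both on container port 8090. |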
| |
| ginkgo.By("creating") |
| eps1 := epsTemplate.DeepCopy() |
| eps1.Endpoints = []discoveryv1.Endpoint{ |
| { |
| Addresses: []string{pod1.Status.PodIP}, |
| Conditions: discoveryv1.EndpointConditions{Ready: &readyCondTrue}, |
| }, |
| } |
| eps1.Ports = []discoveryv1.EndpointPort{{ |
| Name: pointer.String("port80"), |
| Port: pointer.Int32(8090), |
| Protocol: &tcpProtocol, |
| }} |
| |
| _, err := f.ClientSet.DiscoveryV1().EndpointSlices(ns).Create(ctx, eps1, metav1.CreateOptions{}) |
| framework.ExpectNoError(err) |
| eps2 := epsTemplate.DeepCopy() |
| eps2.Endpoints = []discoveryv1.Endpoint{ |
| { |
| Addresses: []string{pod2.Status.PodIP}, |
| Conditions: discoveryv1.EndpointConditions{Ready: &readyCondTrue}, |
| }, |
| } |
| eps2.Ports = []discoveryv1.EndpointPort{{ |
| Name: pointer.String("port81"), |
| Port: pointer.Int32(8090), |
| Protocol: &tcpProtocol, |
| }} |
| _, err = f.ClientSet.DiscoveryV1().EndpointSlices(ns).Create(ctx, eps2, metav1.CreateOptions{}) |
| framework.ExpectNoError(err) |
| |
| // Connecting to the service must work. |
| ginkgo.By("Creating a pause pod that will try to connect to the webserver") |
| pausePod0 := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil) |
| e2epod.NewPodClient(f).CreateSync(ctx, pausePod0) |
| |
| dest1 := net.JoinHostPort(svc.Spec.ClusterIP, "80") |
| dest2 := net.JoinHostPort(svc.Spec.ClusterIP, "81") |
| execHostnameTest(*pausePod0, dest1, serverPod1.Name) |
| execHostnameTest(*pausePod0, dest2, serverPod2.Name) |
| |
| }) |
| |
| }) |
| |
| // expectEndpointsAndSlices verifies that Endpoints and EndpointSlices exist for |
| // a given Service and Namespace with the appropriate attributes set. This is a |
| // relatively complex function as the order of attributes or resources is not |
| // necessarily consistent. It is used as a helper for the tests above and |
| // takes some shortcuts on the assumption that those tests are its only |
| // callers. |
| func expectEndpointsAndSlices(ctx context.Context, cs clientset.Interface, ns string, svc *v1.Service, pods []*v1.Pod, numSubsets, numSlices int, namedPort bool) { |
| endpointSlices := []discoveryv1.EndpointSlice{} |
| if err := wait.PollUntilContextTimeout(ctx, 2*time.Second, 2*time.Minute, true, func(ctx context.Context) (bool, error) { |
| endpointSlicesFound, hasMatchingSlices := hasMatchingEndpointSlices(ctx, cs, ns, svc.Name, len(pods), numSlices) |
| if !hasMatchingSlices { |
| return false, nil |
| } |
| endpointSlices = endpointSlicesFound |
| return true, nil |
| }); err != nil { |
| framework.Failf("Timed out waiting for EndpointSlices to match expectations: %v", err) |
| } |
| |
| endpoints := &v1.Endpoints{} |
| if err := wait.PollUntilContextTimeout(ctx, 2*time.Second, 2*time.Minute, true, func(ctx context.Context) (bool, error) { |
| endpointsFound, hasMatchingEndpoints := hasMatchingEndpoints(ctx, cs, ns, svc.Name, len(pods), numSubsets) |
| if !hasMatchingEndpoints { |
| framework.Logf("Matching Endpoints not found") |
| return false, nil |
| } |
| endpoints = endpointsFound |
| return true, nil |
| }); err != nil { |
| framework.Failf("Timed out waiting for Endpoints to match expectations: %v", err) |
| } |
| |
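| // Index the pods by IP so that endpoint addresses can be mapped back to |
| // the pods that own them. |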
| podsByIP := map[string]*v1.Pod{} |
| for _, pod := range pods { |
| podsByIP[pod.Status.PodIP] = pod |
| if len(pod.Spec.Containers) != 1 { |
| framework.Failf("Expected pod to have 1 container, got %d", len(pod.Spec.Containers)) |
| } |
| } |
| |
| if endpoints.Name != svc.Name { |
| framework.Failf("Expected Endpoints name to be %s, got %s", svc.Name, endpoints.Name) |
| } |
| |
| totalEndpointAddresses := 0 |
| for _, subset := range endpoints.Subsets { |
| addresses := append(subset.Addresses, subset.NotReadyAddresses...) |
| totalEndpointAddresses += len(addresses) |
| |
| if len(subset.Ports) != len(svc.Spec.Ports) { |
| framework.Failf("Expected subset to have %d ports, got %d", len(svc.Spec.Ports), len(subset.Ports)) |
| } |
| |
| // If not a named port, the subset ports should directly correspond with |
| // the Service ports. |
| if !namedPort { |
| for i, subsetPort := range subset.Ports { |
| svcPort := svc.Spec.Ports[i] |
| if subsetPort.Name != svcPort.Name { |
| framework.Failf("Expected port name to be %s, got %s", svcPort.Name, subsetPort.Name) |
| } |
| if subsetPort.Protocol != svcPort.Protocol { |
| framework.Failf("Expected protocol to be %s, got %s", svcPort.Protocol, subsetPort.Protocol) |
| } |
| if subsetPort.Port != svcPort.TargetPort.IntVal { |
| framework.Failf("Expected port to be %d, got %d", svcPort.TargetPort.IntVal, subsetPort.Port) |
| } |
| } |
| } |
| |
| for _, address := range addresses { |
| pod, ok := podsByIP[address.IP] |
| if !ok { |
| framework.Failf("Unexpected address with IP: %s", address.IP) |
| } |
| |
| ensurePodTargetRef(pod, address.TargetRef) |
| |
| // If a named port, the subset ports should directly correspond with |
| // each individual pod. |
| if namedPort { |
| container := pod.Spec.Containers[0] |
| for _, port := range container.Ports { |
| if port.Name == svc.Spec.Ports[0].TargetPort.String() { |
| subsetPort := subset.Ports[0] |
| if subsetPort.Port != port.ContainerPort { |
| framework.Failf("Expected subset port to be %d, got %d", port.ContainerPort, subsetPort.Port) |
| } |
| if subsetPort.Name != svc.Spec.Ports[0].Name { |
| framework.Failf("Expected subset port name to be %s, got %s", svc.Spec.Ports[0].Name, subsetPort.Name) |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| if len(pods) != totalEndpointAddresses { |
| framework.Failf("Expected %d addresses, got %d", len(pods), totalEndpointAddresses) |
| } |
| |
| if len(pods) == 0 && len(endpointSlices) != 1 { |
| framework.Failf("Expected 1 EndpointSlice, got %d", len(endpointSlices)) |
| } |
| |
| // Use a set for deduping values. Duplicate addresses are technically valid |
| // here although rare. |
| esAddresses := sets.NewString() |
| for _, endpointSlice := range endpointSlices { |
| for _, endpoint := range endpointSlice.Endpoints { |
| esAddresses.Insert(endpoint.Addresses[0]) |
| } |
| if len(pods) == 0 && len(endpointSlice.Ports) != 0 { |
| framework.Failf("Expected EndpointSlice to have 0 ports, got %d", len(endpointSlice.Ports)) |
| } |
| if len(pods) > 0 && len(endpointSlice.Ports) != len(svc.Spec.Ports) { |
| framework.Failf("Expected EndpointSlice to have %d ports, got %d", len(svc.Spec.Ports), len(endpointSlice.Ports)) |
| } |
| |
| // If not a named port, the EndpointSlice ports should directly |
| // correspond with the Service ports. |
| if !namedPort { |
| for i, esPort := range endpointSlice.Ports { |
| svcPort := svc.Spec.Ports[i] |
| if *esPort.Name != svcPort.Name { |
| framework.Failf("Expected port name to be %s, got %s", svcPort.Name, *esPort.Name) |
| } |
| if *esPort.Protocol != svcPort.Protocol { |
| framework.Failf("Expected protocol to be %s, got %s", svcPort.Protocol, *esPort.Protocol) |
| } |
| if *esPort.Port != svcPort.TargetPort.IntVal { |
| framework.Failf("Expected port to be %d, got %d", svcPort.TargetPort.IntVal, *esPort.Port) |
| } |
| } |
| } |
| |
| for _, endpoint := range endpointSlice.Endpoints { |
| if len(endpoint.Addresses) == 0 { |
| framework.Failf("Expected EndpointSlice endpoint to have at least 1 address") |
| } |
| pod, ok := podsByIP[endpoint.Addresses[0]] |
| if !ok { |
| framework.Failf("Unexpected address with IP: %s", endpoint.Addresses[0]) |
| } |
| |
| ensurePodTargetRef(pod, endpoint.TargetRef) |
| |
| // If a named port, the EndpointSlice ports should directly |
| // correspond with each individual pod. |
| if namedPort { |
| container := pod.Spec.Containers[0] |
| for _, port := range container.Ports { |
| if port.Name == svc.Spec.Ports[0].TargetPort.String() { |
| esPort := endpointSlice.Ports[0] |
| if *esPort.Port != port.ContainerPort { |
| framework.Failf("Expected EndpointSlice port to be %d, got %d", port.ContainerPort, *esPort.Port) |
| } |
| if *esPort.Name != svc.Spec.Ports[0].Name { |
| framework.Failf("Expected EndpointSlice port name to be %s, got %s", svc.Spec.Ports[0].Name, *esPort.Name) |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| if len(pods) != esAddresses.Len() { |
| framework.Failf("Expected %d addresses, got %d", len(pods), esAddresses.Len()) |
| } |
| } |
| |
| // deleteEndpointSlices deletes EndpointSlices for the specified Service. |
| func deleteEndpointSlices(ctx context.Context, cs clientset.Interface, ns string, svc *v1.Service) { |
| listOptions := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, svc.Name)} |
| esList, err := cs.DiscoveryV1().EndpointSlices(ns).List(ctx, listOptions) |
| framework.ExpectNoError(err, "Error fetching EndpointSlices for %s/%s Service", ns, svc.Name) |
| |
| for _, endpointSlice := range esList.Items { |
| err := cs.DiscoveryV1().EndpointSlices(ns).Delete(ctx, endpointSlice.Name, metav1.DeleteOptions{}) |
| framework.ExpectNoError(err, "Error deleting %s/%s EndpointSlice", ns, endpointSlice.Name) |
| } |
| } |
| |
| // hasMatchingEndpointSlices returns any EndpointSlices that match the |
| // conditions along with a boolean indicating if all the conditions have been |
| // met. |
| func hasMatchingEndpointSlices(ctx context.Context, cs clientset.Interface, ns, svcName string, numEndpoints, numSlices int) ([]discoveryv1.EndpointSlice, bool) { |
| listOptions := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, svcName)} |
| esList, err := cs.DiscoveryV1().EndpointSlices(ns).List(ctx, listOptions) |
| framework.ExpectNoError(err, "Error fetching EndpointSlice for Service %s/%s", ns, svcName) |
| |
| if len(esList.Items) == 0 { |
| framework.Logf("EndpointSlice for Service %s/%s not found", ns, svcName) |
| return []discoveryv1.EndpointSlice{}, false |
| } |
| // In some cases the EndpointSlice controller will create more |
| // EndpointSlices than necessary resulting in some duplication. This is |
| // valid and tests should only fail here if fewer EndpointSlices than |
| // expected are added. |
| if len(esList.Items) < numSlices { |
| framework.Logf("Expected at least %d EndpointSlices for Service %s/%s, got %d", numSlices, ns, svcName, len(esList.Items)) |
| for i, epSlice := range esList.Items { |
| epsData, err := json.Marshal(epSlice) |
| if err != nil { |
| framework.Logf("Error marshaling JSON for EndpointSlice: %v", err) |
| } else { |
| framework.Logf("%d - %v", i, string(epsData)) |
| } |
| } |
| return esList.Items, false |
| } |
| |
| actualNumEndpoints := 0 |
| for _, endpointSlice := range esList.Items { |
| actualNumEndpoints += len(endpointSlice.Endpoints) |
| } |
| // For the same reason, endpoints may be duplicated across EndpointSlices. |
| // This is valid and tests should only fail here if fewer endpoints than |
| // expected are found. |
| if actualNumEndpoints < numEndpoints { |
| framework.Logf("EndpointSlices for %s/%s Service have %d/%d endpoints", ns, svcName, actualNumEndpoints, numEndpoints) |
| return esList.Items, false |
| } |
| |
| return esList.Items, true |
| } |
| |
| // hasMatchingEndpoints returns any Endpoints that match the conditions along |
| // with a boolean indicating if all the conditions have been met. |
| func hasMatchingEndpoints(ctx context.Context, cs clientset.Interface, ns, svcName string, numIPs, numSubsets int) (*v1.Endpoints, bool) { |
| endpoints, err := cs.CoreV1().Endpoints(ns).Get(ctx, svcName, metav1.GetOptions{}) |
| if err != nil { |
| if apierrors.IsNotFound(err) { |
| framework.Logf("Endpoints for %s/%s Service not found", ns, svcName) |
| return nil, false |
| } |
| framework.ExpectNoError(err, "Error fetching Endpoints for %s/%s Service", ns, svcName) |
| } |
| if len(endpoints.Subsets) != numSubsets { |
| framework.Logf("Endpoints for %s/%s Service with %d/%d Subsets", ns, svcName, len(endpoints.Subsets), numSubsets) |
| return nil, false |
| } |
| |
| actualNumIPs := 0 |
| for _, endpointSubset := range endpoints.Subsets { |
| actualNumIPs += len(endpointSubset.Addresses) + len(endpointSubset.NotReadyAddresses) |
| } |
| if actualNumIPs != numIPs { |
| framework.Logf("Endpoints for %s/%s Service with %d/%d IPs", ns, svcName, actualNumIPs, numIPs) |
| return nil, false |
| } |
| |
| return endpoints, true |
| } |
| |
| // ensurePodTargetRef ensures that a Pod matches the provided target reference. |
| func ensurePodTargetRef(pod *v1.Pod, targetRef *v1.ObjectReference) { |
| if targetRef == nil { |
| framework.Failf("Expected TargetRef to not be nil") |
| } |
| if targetRef.Kind != "Pod" { |
| framework.Failf("Expected TargetRef.Kind to be Pod, got %s", targetRef.Kind) |
| } |
| if targetRef.Namespace != pod.Namespace { |
| framework.Failf("Expected TargetRef.Namespace to be %s, got %s", pod.Namespace, targetRef.Namespace) |
| } |
| if targetRef.Name != pod.Name { |
| framework.Failf("Expected TargetRef.Name to be %s, got %s", pod.Name, targetRef.Name) |
| } |
| if targetRef.UID != pod.UID { |
| framework.Failf("Expected TargetRef.UID to be %s, got %s", pod.UID, targetRef.UID) |
| } |
| } |
| |
| // createServiceReportErr creates a Service and reports any associated error. |
| func createServiceReportErr(ctx context.Context, cs clientset.Interface, ns string, service *v1.Service) *v1.Service { |
| svc, err := cs.CoreV1().Services(ns).Create(ctx, service, metav1.CreateOptions{}) |
| framework.ExpectNoError(err, "error deleting Service") |
| return svc |
| } |
| |
| // endpointSlicesEqual compares whether the Endpoints and the EndpointSliceList |
| // contain the same endpoint values, i.e. addresses and ports, considering both Ready and NotReady addresses |
| func endpointSlicesEqual(endpoints *v1.Endpoints, endpointSliceList *discoveryv1.EndpointSliceList) bool { |
| // get the apiserver endpoint addresses |
| epAddresses := sets.NewString() |
| epPorts := sets.NewInt32() |
| for _, subset := range endpoints.Subsets { |
| for _, addr := range subset.Addresses { |
| epAddresses.Insert(addr.IP) |
| } |
| for _, addr := range subset.NotReadyAddresses { |
| epAddresses.Insert(addr.IP) |
| } |
| for _, port := range subset.Ports { |
| epPorts.Insert(port.Port) |
| } |
| } |
| framework.Logf("Endpoints addresses: %v , ports: %v", epAddresses.List(), epPorts.List()) |
| |
| // Endpoints are single stack, and must match the primary IP family of the Service kubernetes.default |
| // However, EndpointSlices can be IPv4 or IPv6; we can only compare the slices that match the same IP family |
| // framework.TestContext.ClusterIsIPv6() reports the IP family of the kubernetes.default service |
| var addrType discoveryv1.AddressType |
| if framework.TestContext.ClusterIsIPv6() { |
| addrType = discoveryv1.AddressTypeIPv6 |
| } else { |
| addrType = discoveryv1.AddressTypeIPv4 |
| } |
| |
| // get the apiserver addresses from the endpoint slice list |
| sliceAddresses := sets.NewString() |
| slicePorts := sets.NewInt32() |
| for _, slice := range endpointSliceList.Items { |
| if slice.AddressType != addrType { |
| framework.Logf("Skipping slice %s: wanted %s family, got %s", slice.Name, addrType, slice.AddressType) |
| continue |
| } |
| for _, s := range slice.Endpoints { |
| sliceAddresses.Insert(s.Addresses...) |
| } |
| for _, ports := range slice.Ports { |
| if ports.Port != nil { |
| slicePorts.Insert(*ports.Port) |
| } |
| } |
| } |
| |
| framework.Logf("EndpointSlices addresses: %v , ports: %v", sliceAddresses.List(), slicePorts.List()) |
| return sliceAddresses.Equal(epAddresses) && slicePorts.Equal(epPorts) |
| } |