| /* |
| Copyright 2018 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package util |
| |
| import ( |
| "context" |
| "encoding/json" |
| "errors" |
| "fmt" |
| "net/http" |
| "sync/atomic" |
| "testing" |
| "time" |
| |
| v1 "k8s.io/api/core/v1" |
| policy "k8s.io/api/policy/v1" |
| resourcev1alpha2 "k8s.io/api/resource/v1alpha2" |
| apierrors "k8s.io/apimachinery/pkg/api/errors" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| "k8s.io/apimachinery/pkg/types" |
| "k8s.io/apimachinery/pkg/util/uuid" |
| "k8s.io/apimachinery/pkg/util/wait" |
| "k8s.io/apiserver/pkg/admission" |
| utilfeature "k8s.io/apiserver/pkg/util/feature" |
| cacheddiscovery "k8s.io/client-go/discovery/cached/memory" |
| "k8s.io/client-go/dynamic" |
| "k8s.io/client-go/dynamic/dynamicinformer" |
| "k8s.io/client-go/informers" |
| clientset "k8s.io/client-go/kubernetes" |
| corelisters "k8s.io/client-go/listers/core/v1" |
| "k8s.io/client-go/metadata" |
| "k8s.io/client-go/metadata/metadatainformer" |
| restclient "k8s.io/client-go/rest" |
| "k8s.io/client-go/restmapper" |
| "k8s.io/client-go/scale" |
| "k8s.io/client-go/tools/cache" |
| "k8s.io/client-go/tools/events" |
| cliflag "k8s.io/component-base/cli/flag" |
| pvutil "k8s.io/component-helpers/storage/volume" |
| "k8s.io/controller-manager/pkg/informerfactory" |
| "k8s.io/klog/v2" |
| kubeschedulerconfigv1 "k8s.io/kube-scheduler/config/v1" |
| "k8s.io/kubernetes/cmd/kube-apiserver/app/options" |
| podutil "k8s.io/kubernetes/pkg/api/v1/pod" |
| "k8s.io/kubernetes/pkg/controller/disruption" |
| "k8s.io/kubernetes/pkg/controller/garbagecollector" |
| "k8s.io/kubernetes/pkg/controller/namespace" |
| "k8s.io/kubernetes/pkg/controller/resourceclaim" |
| "k8s.io/kubernetes/pkg/controlplane" |
| "k8s.io/kubernetes/pkg/features" |
| "k8s.io/kubernetes/pkg/scheduler" |
| kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" |
| configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing" |
| schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" |
| "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpreemption" |
| frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" |
| "k8s.io/kubernetes/pkg/scheduler/profile" |
| st "k8s.io/kubernetes/pkg/scheduler/testing" |
| taintutils "k8s.io/kubernetes/pkg/util/taints" |
| "k8s.io/kubernetes/test/integration/framework" |
| imageutils "k8s.io/kubernetes/test/utils/image" |
| "k8s.io/kubernetes/test/utils/ktesting" |
| "k8s.io/utils/ptr" |
| ) |
| |
| // ShutdownFunc represents the function handle to be called, typically in a defer handler, to shut down a running module. |
| type ShutdownFunc func() |
| |
| // StartScheduler configures and starts a scheduler given a handle to the clientSet interface |
| // and event broadcaster. It returns the running scheduler and the shared informer factory. Background goroutines |
| // will keep running until the context is canceled. |
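| // |
| // A minimal usage sketch (clientSet, kubeConfig, and cfg are illustrative names, |
| // assumed to come from an already running test apiserver and a parsed |
| // KubeSchedulerConfiguration): |
| // |
| //	ctx, cancel := context.WithCancel(context.Background()) |
| //	defer cancel() |
| //	sched, informerFactory := StartScheduler(ctx, clientSet, kubeConfig, cfg, nil) |
| //	_ = sched |
| //	_ = informerFactory |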
| func StartScheduler(ctx context.Context, clientSet clientset.Interface, kubeConfig *restclient.Config, cfg *kubeschedulerconfig.KubeSchedulerConfiguration, outOfTreePluginRegistry frameworkruntime.Registry) (*scheduler.Scheduler, informers.SharedInformerFactory) { |
| informerFactory := scheduler.NewInformerFactory(clientSet, 0) |
| evtBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{ |
| Interface: clientSet.EventsV1()}) |
| go func() { |
| <-ctx.Done() |
| evtBroadcaster.Shutdown() |
| }() |
| |
| evtBroadcaster.StartRecordingToSink(ctx.Done()) |
| |
| logger := klog.FromContext(ctx) |
| |
| sched, err := scheduler.New( |
| ctx, |
| clientSet, |
| informerFactory, |
| nil, |
| profile.NewRecorderFactory(evtBroadcaster), |
| scheduler.WithKubeConfig(kubeConfig), |
| scheduler.WithProfiles(cfg.Profiles...), |
| scheduler.WithPercentageOfNodesToScore(cfg.PercentageOfNodesToScore), |
| scheduler.WithPodMaxBackoffSeconds(cfg.PodMaxBackoffSeconds), |
| scheduler.WithPodInitialBackoffSeconds(cfg.PodInitialBackoffSeconds), |
| scheduler.WithExtenders(cfg.Extenders...), |
| scheduler.WithParallelism(cfg.Parallelism), |
| scheduler.WithFrameworkOutOfTreeRegistry(outOfTreePluginRegistry), |
| ) |
| if err != nil { |
| logger.Error(err, "Error creating scheduler") |
| klog.FlushAndExit(klog.ExitFlushTimeout, 1) |
| } |
| |
| informerFactory.Start(ctx.Done()) |
| informerFactory.WaitForCacheSync(ctx.Done()) |
| if err = sched.WaitForHandlersSync(ctx); err != nil { |
| logger.Error(err, "Failed waiting for handlers to sync") |
| klog.FlushAndExit(klog.ExitFlushTimeout, 1) |
| } |
| logger.V(3).Info("Handlers synced") |
| go sched.Run(ctx) |
| |
| return sched, informerFactory |
| } |
| |
| // CreateResourceClaimController creates a resource claim controller and returns |
| // a run function for it. The informer factory needs to be started before |
| // invoking that function. |
| func CreateResourceClaimController(ctx context.Context, tb ktesting.TB, clientSet clientset.Interface, informerFactory informers.SharedInformerFactory) func() { |
| podInformer := informerFactory.Core().V1().Pods() |
| schedulingInformer := informerFactory.Resource().V1alpha2().PodSchedulingContexts() |
| claimInformer := informerFactory.Resource().V1alpha2().ResourceClaims() |
| claimTemplateInformer := informerFactory.Resource().V1alpha2().ResourceClaimTemplates() |
| claimController, err := resourceclaim.NewController(klog.FromContext(ctx), clientSet, podInformer, schedulingInformer, claimInformer, claimTemplateInformer) |
| if err != nil { |
| tb.Fatalf("Error creating claim controller: %v", err) |
| } |
| return func() { |
| go claimController.Run(ctx, 5 /* workers */) |
| } |
| } |
| |
| // StartFakePVController runs a simplified PV controller that sets the PVC's VolumeName and the bind-completed annotation for each PV binding. |
| // TODO(mborsz): Use a real PV controller here. |
| func StartFakePVController(ctx context.Context, clientSet clientset.Interface, informerFactory informers.SharedInformerFactory) { |
| pvInformer := informerFactory.Core().V1().PersistentVolumes() |
| |
| syncPV := func(obj *v1.PersistentVolume) { |
| if obj.Spec.ClaimRef != nil { |
| claimRef := obj.Spec.ClaimRef |
| pvc, err := clientSet.CoreV1().PersistentVolumeClaims(claimRef.Namespace).Get(ctx, claimRef.Name, metav1.GetOptions{}) |
| if err != nil { |
| // Note that the error can be anything, because components like |
| // apiserver are also shutting down at the same time, but this |
| // check is conservative and only ignores the "context canceled" |
| // error while shutting down. |
| if ctx.Err() == nil || !errors.Is(err, context.Canceled) { |
| klog.Errorf("error while getting %v/%v: %v", claimRef.Namespace, claimRef.Name, err) |
| } |
| return |
| } |
| |
| if pvc.Spec.VolumeName == "" { |
| pvc.Spec.VolumeName = obj.Name |
| metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, pvutil.AnnBindCompleted, "yes") |
| _, err := clientSet.CoreV1().PersistentVolumeClaims(claimRef.Namespace).Update(ctx, pvc, metav1.UpdateOptions{}) |
| if err != nil { |
| if ctx.Err() == nil || !errors.Is(err, context.Canceled) { |
| // As above, only ignore the "context canceled" error while shutting down. |
| klog.Errorf("error while updating %v/%v: %v", claimRef.Namespace, claimRef.Name, err) |
| } |
| return |
| } |
| } |
| } |
| } |
| |
| pvInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ |
| AddFunc: func(obj interface{}) { |
| syncPV(obj.(*v1.PersistentVolume)) |
| }, |
| UpdateFunc: func(_, obj interface{}) { |
| syncPV(obj.(*v1.PersistentVolume)) |
| }, |
| }) |
| } |
| |
| // CreateGCController creates a garbage collector controller and returns a run function |
| // for it. The informer factory needs to be started before invoking that |
| // function. |
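| // |
| // A usage sketch (ctx, restConfig, and informerFactory are illustrative names |
| // assumed to exist in the calling test): |
| // |
| //	startGC := CreateGCController(ctx, t, restConfig, informerFactory) |
| //	informerFactory.Start(ctx.Done()) |
| //	informerFactory.WaitForCacheSync(ctx.Done()) |
| //	startGC() |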
| func CreateGCController(ctx context.Context, tb ktesting.TB, restConfig restclient.Config, informerSet informers.SharedInformerFactory) func() { |
| restclient.AddUserAgent(&restConfig, "gc-controller") |
| clientSet := clientset.NewForConfigOrDie(&restConfig) |
| metadataClient, err := metadata.NewForConfig(&restConfig) |
| if err != nil { |
| tb.Fatalf("Failed to create metadataClient: %v", err) |
| } |
| restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(clientSet.Discovery())) |
| restMapper.Reset() |
| metadataInformers := metadatainformer.NewSharedInformerFactory(metadataClient, 0) |
| alwaysStarted := make(chan struct{}) |
| close(alwaysStarted) |
| gc, err := garbagecollector.NewGarbageCollector( |
| ctx, |
| clientSet, |
| metadataClient, |
| restMapper, |
| garbagecollector.DefaultIgnoredResources(), |
| informerfactory.NewInformerFactory(informerSet, metadataInformers), |
| alwaysStarted, |
| ) |
| if err != nil { |
| tb.Fatalf("Failed creating garbage collector") |
| } |
| startGC := func() { |
| syncPeriod := 5 * time.Second |
| go wait.Until(func() { |
| restMapper.Reset() |
| }, syncPeriod, ctx.Done()) |
| go gc.Run(ctx, 1) |
| go gc.Sync(ctx, clientSet.Discovery(), syncPeriod) |
| } |
| return startGC |
| } |
| |
| // CreateNamespaceController creates a namespace controller and returns a run |
| // function for it. The informer factory needs to be started before invoking |
| // that function. |
| func CreateNamespaceController(ctx context.Context, tb ktesting.TB, restConfig restclient.Config, informerSet informers.SharedInformerFactory) func() { |
| restclient.AddUserAgent(&restConfig, "namespace-controller") |
| clientSet := clientset.NewForConfigOrDie(&restConfig) |
| metadataClient, err := metadata.NewForConfig(&restConfig) |
| if err != nil { |
| tb.Fatalf("Failed to create metadataClient: %v", err) |
| } |
| discoverResourcesFn := clientSet.Discovery().ServerPreferredNamespacedResources |
| controller := namespace.NewNamespaceController( |
| ctx, |
| clientSet, |
| metadataClient, |
| discoverResourcesFn, |
| informerSet.Core().V1().Namespaces(), |
| 10*time.Hour, |
| v1.FinalizerKubernetes) |
| return func() { |
| go controller.Run(ctx, 5) |
| } |
| } |
| |
| // TestContext stores the necessary context info. |
| // It also contains some optional parameters for InitTestScheduler. |
| type TestContext struct { |
| // DisableEventSink, if set to true before calling InitTestScheduler, |
| // will skip eventBroadcaster.StartRecordingToSink and thus avoid some |
| // extra goroutines which are tricky to get rid of after a test. |
| DisableEventSink bool |
| |
| NS *v1.Namespace |
| ClientSet clientset.Interface |
| KubeConfig *restclient.Config |
| InformerFactory informers.SharedInformerFactory |
| DynInformerFactory dynamicinformer.DynamicSharedInformerFactory |
| Scheduler *scheduler.Scheduler |
| // This is the top context when initializing the test environment. |
| Ctx context.Context |
| // CloseFn will stop the apiserver and clean up the resources |
| // after itself, including shutting down its storage layer. |
| CloseFn framework.TearDownFunc |
| // This is the context when initializing scheduler. |
| SchedulerCtx context.Context |
| // SchedulerCloseFn will tear down the resources created for the scheduler, |
| // including the scheduler itself. |
| SchedulerCloseFn framework.TearDownFunc |
| |
| // RoundTrip, if set, will be called for every HTTP request going to the apiserver. |
| // It can be used for error injection. |
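| // |
| // For example, a test can inject a transient failure for every DELETE request |
| // (a sketch; the matching logic is up to the caller): |
| // |
| //	var wrapper RoundTripWrapper = func(transport http.RoundTripper, req *http.Request) (*http.Response, error) { |
| //		if req.Method == http.MethodDelete { |
| //			return nil, fmt.Errorf("injected error") |
| //		} |
| //		return transport.RoundTrip(req) |
| //	} |
| //	testCtx.RoundTrip.Store(&wrapper) |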
| RoundTrip atomic.Pointer[RoundTripWrapper] |
| } |
| |
| type RoundTripWrapper func(http.RoundTripper, *http.Request) (*http.Response, error) |
| |
| type roundTripWrapper struct { |
| tc *TestContext |
| transport http.RoundTripper |
| } |
| |
| func (r roundTripWrapper) RoundTrip(req *http.Request) (*http.Response, error) { |
| wrapper := r.tc.RoundTrip.Load() |
| if wrapper != nil { |
| return (*wrapper)(r.transport, req) |
| } |
| return r.transport.RoundTrip(req) |
| } |
| |
| var _ http.RoundTripper = roundTripWrapper{} |
| |
| // CleanupNodes deletes all nodes which were created during the integration test. |
| func CleanupNodes(cs clientset.Interface, t *testing.T) { |
| err := cs.CoreV1().Nodes().DeleteCollection(context.TODO(), *metav1.NewDeleteOptions(0), metav1.ListOptions{}) |
| if err != nil { |
| t.Errorf("error while deleting all nodes: %v", err) |
| } |
| } |
| |
| // PodDeleted returns a condition function that reports true when the pod is no longer found in the given namespace or has a deletion timestamp set. |
| func PodDeleted(ctx context.Context, c clientset.Interface, podNamespace, podName string) wait.ConditionWithContextFunc { |
| return func(context.Context) (bool, error) { |
| pod, err := c.CoreV1().Pods(podNamespace).Get(ctx, podName, metav1.GetOptions{}) |
| if apierrors.IsNotFound(err) { |
| return true, nil |
| } |
| if pod.DeletionTimestamp != nil { |
| return true, nil |
| } |
| return false, nil |
| } |
| } |
| |
| // PodsCleanedUp returns a condition function that reports true when all pods in the given namespace have been deleted. |
| func PodsCleanedUp(ctx context.Context, c clientset.Interface, namespace string) wait.ConditionWithContextFunc { |
| return func(context.Context) (bool, error) { |
| list, err := c.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{}) |
| if err != nil { |
| return false, err |
| } |
| return len(list.Items) == 0, nil |
| } |
| } |
| |
| // SyncSchedulerInformerFactory starts the informer factories and waits for their caches to sync. |
| func SyncSchedulerInformerFactory(testCtx *TestContext) { |
| testCtx.InformerFactory.Start(testCtx.SchedulerCtx.Done()) |
| if testCtx.DynInformerFactory != nil { |
| testCtx.DynInformerFactory.Start(testCtx.SchedulerCtx.Done()) |
| } |
| testCtx.InformerFactory.WaitForCacheSync(testCtx.SchedulerCtx.Done()) |
| if testCtx.DynInformerFactory != nil { |
| testCtx.DynInformerFactory.WaitForCacheSync(testCtx.SchedulerCtx.Done()) |
| } |
| } |
| |
| // CleanupTest cleans up resources which were created during the integration test. |
| func CleanupTest(t *testing.T, testCtx *TestContext) { |
| // Cleanup nodes and namespaces. |
| if err := testCtx.ClientSet.CoreV1().Nodes().DeleteCollection(testCtx.Ctx, *metav1.NewDeleteOptions(0), metav1.ListOptions{}); err != nil { |
| t.Errorf("error while cleaning up nodes, error: %v", err) |
| } |
| framework.DeleteNamespaceOrDie(testCtx.ClientSet, testCtx.NS, t) |
| // Terminate the scheduler and apiserver. |
| testCtx.CloseFn() |
| } |
| |
| // RemovePodFinalizersInNamespace removes the finalizers from all pods in the given namespace. |
| func RemovePodFinalizersInNamespace(ctx context.Context, cs clientset.Interface, t *testing.T, ns string) { |
| t.Helper() |
| pods, err := cs.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{}) |
| if err != nil { |
| t.Fatalf("Failed obtaining list of pods: %v", err) |
| } |
| RemovePodFinalizers(ctx, cs, t, pods.Items...) |
| } |
| |
| // RemovePodFinalizers removes the finalizers from the given pods. |
| func RemovePodFinalizers(ctx context.Context, cs clientset.Interface, t *testing.T, pods ...v1.Pod) { |
| t.Helper() |
| for _, p := range pods { |
| pod, err := cs.CoreV1().Pods(p.Namespace).Get(ctx, p.Name, metav1.GetOptions{}) |
| if err != nil && !apierrors.IsNotFound(err) { |
| t.Errorf("error while removing pod finalizers for %v: %v", klog.KObj(&p), err) |
| } else if pod != nil && len(pod.Finalizers) > 0 { |
| // Use Patch to remove finalizer, instead of Update, to avoid transient |
| // conflicts. |
| patchBytes, _ := json.Marshal(map[string]interface{}{ |
| "metadata": map[string]interface{}{ |
| "$deleteFromPrimitiveList/finalizers": pod.Finalizers, |
| }, |
| }) |
| _, err = cs.CoreV1().Pods(pod.Namespace).Patch(ctx, pod.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) |
| if err != nil { |
| t.Errorf("error while updating pod status for %v: %v", klog.KObj(&p), err) |
| } |
| } |
| } |
| } |
| |
| // CleanupPods deletes the given pods and waits for them to be actually deleted. |
| func CleanupPods(ctx context.Context, cs clientset.Interface, t *testing.T, pods []*v1.Pod) { |
| for _, p := range pods { |
| err := cs.CoreV1().Pods(p.Namespace).Delete(ctx, p.Name, *metav1.NewDeleteOptions(0)) |
| if err != nil && !apierrors.IsNotFound(err) { |
| t.Errorf("error while deleting pod %v/%v: %v", p.Namespace, p.Name, err) |
| } |
| } |
| for _, p := range pods { |
| if err := wait.PollUntilContextTimeout(ctx, time.Microsecond, wait.ForeverTestTimeout, true, |
| PodDeleted(ctx, cs, p.Namespace, p.Name)); err != nil { |
| t.Errorf("error while waiting for pod %v/%v to get deleted: %v", p.Namespace, p.Name, err) |
| } |
| } |
| } |
| |
| // AddTaintToNode adds a taint to the specified node. |
| func AddTaintToNode(cs clientset.Interface, nodeName string, taint v1.Taint) error { |
| node, err := cs.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) |
| if err != nil { |
| return err |
| } |
| node.Spec.Taints = append(node.Spec.Taints, taint) |
| _, err = cs.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{}) |
| return err |
| } |
| |
| // RemoveTaintOffNode removes a specific taint from a node |
| func RemoveTaintOffNode(cs clientset.Interface, nodeName string, taint v1.Taint) error { |
| node, err := cs.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) |
| if err != nil { |
| return err |
| } |
| var taints []v1.Taint |
| for _, t := range node.Spec.Taints { |
| if !t.MatchTaint(&taint) { |
| taints = append(taints, t) |
| } |
| } |
| node.Spec.Taints = taints |
| _, err = cs.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{}) |
| return err |
| } |
| |
| // WaitForNodeTaints waits for a node to have the target taints and returns |
| // an error if the node does not have the target taints within 30 seconds. |
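| // |
| // A usage sketch (cs, node, and the taint values are illustrative): |
| // |
| //	taint := v1.Taint{Key: "example.com/special", Effect: v1.TaintEffectNoSchedule} |
| //	if err := AddTaintToNode(cs, node.Name, taint); err != nil { |
| //		t.Fatalf("failed to add taint: %v", err) |
| //	} |
| //	if err := WaitForNodeTaints(cs, node, []v1.Taint{taint}); err != nil { |
| //		t.Fatalf("taint not observed: %v", err) |
| //	} |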
| func WaitForNodeTaints(cs clientset.Interface, node *v1.Node, taints []v1.Taint) error { |
| return wait.Poll(100*time.Millisecond, 30*time.Second, NodeTainted(cs, node.Name, taints)) |
| } |
| |
| // NodeTainted returns a condition function that reports true if the given node contains |
| // all the given taints. |
| func NodeTainted(cs clientset.Interface, nodeName string, taints []v1.Taint) wait.ConditionFunc { |
| return func() (bool, error) { |
| node, err := cs.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) |
| if err != nil { |
| return false, err |
| } |
| |
| // node.Spec.Taints may have more taints |
| if len(taints) > len(node.Spec.Taints) { |
| return false, nil |
| } |
| |
| for _, taint := range taints { |
| if !taintutils.TaintExists(node.Spec.Taints, &taint) { |
| return false, nil |
| } |
| } |
| |
| return true, nil |
| } |
| } |
| |
| // NodeReadyStatus returns the status of the first condition with type NodeReady. |
| // If none of the conditions is of type NodeReady, it returns an error. |
| func NodeReadyStatus(conditions []v1.NodeCondition) (v1.ConditionStatus, error) { |
| for _, c := range conditions { |
| if c.Type != v1.NodeReady { |
| continue |
| } |
| // Just return the first condition with type NodeReady |
| return c.Status, nil |
| } |
| return v1.ConditionFalse, errors.New("none of the conditions is of type NodeReady") |
| } |
| |
| // GetTolerationSeconds returns the toleration seconds of the first toleration that tolerates the node.kubernetes.io/not-ready:NoExecute taint with operator Exists. |
| func GetTolerationSeconds(tolerations []v1.Toleration) (int64, error) { |
| for _, t := range tolerations { |
| if t.Key == v1.TaintNodeNotReady && t.Effect == v1.TaintEffectNoExecute && t.Operator == v1.TolerationOpExists { |
| return *t.TolerationSeconds, nil |
| } |
| } |
| return 0, fmt.Errorf("cannot find toleration") |
| } |
| |
| // NodeCopyWithConditions duplicates the node object with the given conditions. |
| func NodeCopyWithConditions(node *v1.Node, conditions []v1.NodeCondition) *v1.Node { |
| copy := node.DeepCopy() |
| copy.ResourceVersion = "0" |
| copy.Status.Conditions = conditions |
| for i := range copy.Status.Conditions { |
| copy.Status.Conditions[i].LastHeartbeatTime = metav1.Now() |
| } |
| return copy |
| } |
| |
| // UpdateNodeStatus updates the status of the given node. |
| func UpdateNodeStatus(cs clientset.Interface, node *v1.Node) error { |
| _, err := cs.CoreV1().Nodes().UpdateStatus(context.TODO(), node, metav1.UpdateOptions{}) |
| return err |
| } |
| |
| // InitTestAPIServer initializes a test environment and creates an API server with default |
| // configuration. |
| // It registers cleanup functions with t.Cleanup(); they will be called when the test completes, |
| // so there is no need to do this again. |
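| // |
| // A minimal usage sketch (the namespace prefix is illustrative): |
| // |
| //	testCtx := InitTestAPIServer(t, "my-test", nil) |
| //	pods, err := testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).List(testCtx.Ctx, metav1.ListOptions{}) |
| //	if err != nil { |
| //		t.Fatal(err) |
| //	} |
| //	_ = pods |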
| func InitTestAPIServer(t *testing.T, nsPrefix string, admission admission.Interface) *TestContext { |
| tCtx := ktesting.Init(t) |
| testCtx := &TestContext{Ctx: tCtx} |
| |
| testCtx.ClientSet, testCtx.KubeConfig, testCtx.CloseFn = framework.StartTestServer(tCtx, t, framework.TestServerSetup{ |
| ModifyServerRunOptions: func(options *options.ServerRunOptions) { |
| options.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount", "TaintNodesByCondition", "Priority", "StorageObjectInUseProtection"} |
| if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { |
| options.APIEnablement.RuntimeConfig = cliflag.ConfigurationMap{ |
| resourcev1alpha2.SchemeGroupVersion.String(): "true", |
| } |
| } |
| }, |
| ModifyServerConfig: func(config *controlplane.Config) { |
| if admission != nil { |
| config.GenericConfig.AdmissionControl = admission |
| } |
| }, |
| }) |
| |
| // Support wrapping HTTP requests. |
| testCtx.KubeConfig.Wrap(func(transport http.RoundTripper) http.RoundTripper { |
| return roundTripWrapper{tc: testCtx, transport: transport} |
| }) |
| var err error |
| testCtx.ClientSet, err = clientset.NewForConfig(testCtx.KubeConfig) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| oldCloseFn := testCtx.CloseFn |
| testCtx.CloseFn = func() { |
| tCtx.Cancel("tearing down apiserver") |
| oldCloseFn() |
| } |
| |
| if nsPrefix != "default" { |
| testCtx.NS = framework.CreateNamespaceOrDie(testCtx.ClientSet, nsPrefix+string(uuid.NewUUID()), t) |
| } else { |
| testCtx.NS = framework.CreateNamespaceOrDie(testCtx.ClientSet, "default", t) |
| } |
| |
| t.Cleanup(func() { |
| CleanupTest(t, testCtx) |
| }) |
| |
| return testCtx |
| } |
| |
| // WaitForSchedulerCacheCleanup waits for the cleanup of the scheduler's cache to complete. |
| func WaitForSchedulerCacheCleanup(sched *scheduler.Scheduler, t *testing.T) { |
| schedulerCacheIsEmpty := func() (bool, error) { |
| dump := sched.Cache.Dump() |
| |
| return len(dump.Nodes) == 0 && len(dump.AssumedPods) == 0, nil |
| } |
| |
| if err := wait.Poll(time.Second, wait.ForeverTestTimeout, schedulerCacheIsEmpty); err != nil { |
| t.Errorf("Failed to wait for scheduler cache cleanup: %v", err) |
| } |
| } |
| |
| // InitTestScheduler initializes a test environment and creates a scheduler with default |
| // configuration. |
| func InitTestScheduler( |
| t *testing.T, |
| testCtx *TestContext, |
| ) *TestContext { |
| // Pod preemption is enabled by the default scheduler configuration. |
| return InitTestSchedulerWithOptions(t, testCtx, 0) |
| } |
| |
| // InitTestSchedulerWithOptions initializes a test environment and creates a scheduler with default |
| // configuration and other options. |
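| // |
| // A typical flow, mirroring InitTestSchedulerWithNS below (the namespace prefix |
| // is illustrative): |
| // |
| //	testCtx := InitTestSchedulerWithOptions(t, InitTestAPIServer(t, "my-test", nil), 0) |
| //	SyncSchedulerInformerFactory(testCtx) |
| //	go testCtx.Scheduler.Run(testCtx.SchedulerCtx) |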
| func InitTestSchedulerWithOptions( |
| t *testing.T, |
| testCtx *TestContext, |
| resyncPeriod time.Duration, |
| opts ...scheduler.Option, |
| ) *TestContext { |
| ctx, cancel := context.WithCancel(testCtx.Ctx) |
| testCtx.SchedulerCtx = ctx |
| |
| // 1. Create scheduler |
| testCtx.InformerFactory = scheduler.NewInformerFactory(testCtx.ClientSet, resyncPeriod) |
| if testCtx.KubeConfig != nil { |
| dynClient := dynamic.NewForConfigOrDie(testCtx.KubeConfig) |
| testCtx.DynInformerFactory = dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, 0, v1.NamespaceAll, nil) |
| } |
| |
| var err error |
| eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{ |
| Interface: testCtx.ClientSet.EventsV1(), |
| }) |
| |
| opts = append(opts, scheduler.WithKubeConfig(testCtx.KubeConfig)) |
| testCtx.Scheduler, err = scheduler.New( |
| ctx, |
| testCtx.ClientSet, |
| testCtx.InformerFactory, |
| testCtx.DynInformerFactory, |
| profile.NewRecorderFactory(eventBroadcaster), |
| opts..., |
| ) |
| |
| if err != nil { |
| t.Fatalf("Couldn't create scheduler: %v", err) |
| } |
| |
| if !testCtx.DisableEventSink { |
| eventBroadcaster.StartRecordingToSink(ctx.Done()) |
| } |
| |
| oldCloseFn := testCtx.CloseFn |
| testCtx.CloseFn = func() { |
| oldCloseFn() |
| eventBroadcaster.Shutdown() |
| } |
| |
| testCtx.SchedulerCloseFn = func() { |
| cancel() |
| eventBroadcaster.Shutdown() |
| } |
| |
| return testCtx |
| } |
| |
| // WaitForPodToScheduleWithTimeout waits for a pod to get scheduled and returns |
| // an error if it does not get scheduled within the given timeout. |
| func WaitForPodToScheduleWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error { |
| return wait.PollUntilContextTimeout(context.TODO(), 100*time.Millisecond, timeout, false, PodScheduled(cs, pod.Namespace, pod.Name)) |
| } |
| |
| // WaitForPodToSchedule waits for a pod to get scheduled and returns an error if |
| // it does not get scheduled within the timeout duration (30 seconds). |
| func WaitForPodToSchedule(cs clientset.Interface, pod *v1.Pod) error { |
| return WaitForPodToScheduleWithTimeout(cs, pod, 30*time.Second) |
| } |
| |
| // PodScheduled returns a condition function that reports true if the pod has been assigned to a node. |
| func PodScheduled(c clientset.Interface, podNamespace, podName string) wait.ConditionWithContextFunc { |
| return func(ctx context.Context) (bool, error) { |
| pod, err := c.CoreV1().Pods(podNamespace).Get(ctx, podName, metav1.GetOptions{}) |
| if err != nil { |
| // This could be a connection error so we want to retry. |
| return false, nil |
| } |
| if pod.Spec.NodeName == "" { |
| return false, nil |
| } |
| return true, nil |
| } |
| } |
| |
| // InitDisruptionController initializes and runs a Disruption Controller to properly |
| // update PodDisruptionBudget objects. |
| func InitDisruptionController(t *testing.T, testCtx *TestContext) *disruption.DisruptionController { |
| informers := informers.NewSharedInformerFactory(testCtx.ClientSet, 12*time.Hour) |
| |
| discoveryClient := cacheddiscovery.NewMemCacheClient(testCtx.ClientSet.Discovery()) |
| mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient) |
| |
| config := restclient.CopyConfig(testCtx.KubeConfig) |
| scaleKindResolver := scale.NewDiscoveryScaleKindResolver(testCtx.ClientSet.Discovery()) |
| scaleClient, err := scale.NewForConfig(config, mapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver) |
| if err != nil { |
| t.Fatalf("Error in create scaleClient: %v", err) |
| } |
| |
| dc := disruption.NewDisruptionController( |
| testCtx.Ctx, |
| informers.Core().V1().Pods(), |
| informers.Policy().V1().PodDisruptionBudgets(), |
| informers.Core().V1().ReplicationControllers(), |
| informers.Apps().V1().ReplicaSets(), |
| informers.Apps().V1().Deployments(), |
| informers.Apps().V1().StatefulSets(), |
| testCtx.ClientSet, |
| mapper, |
| scaleClient, |
| testCtx.ClientSet.Discovery()) |
| |
| informers.Start(testCtx.Scheduler.StopEverything) |
| informers.WaitForCacheSync(testCtx.Scheduler.StopEverything) |
| go dc.Run(testCtx.Ctx) |
| return dc |
| } |
| |
| // InitTestSchedulerWithNS initializes a test environment and creates an API server and a scheduler with default |
| // configuration. |
| func InitTestSchedulerWithNS(t *testing.T, nsPrefix string, opts ...scheduler.Option) *TestContext { |
| testCtx := InitTestSchedulerWithOptions(t, InitTestAPIServer(t, nsPrefix, nil), 0, opts...) |
| SyncSchedulerInformerFactory(testCtx) |
| go testCtx.Scheduler.Run(testCtx.SchedulerCtx) |
| return testCtx |
| } |
| |
| // InitTestDisablePreemption initializes a test environment and creates an API server and a scheduler with default |
| // configuration but with pod preemption disabled. |
| func InitTestDisablePreemption(t *testing.T, nsPrefix string) *TestContext { |
| cfg := configtesting.V1ToInternalWithDefaults(t, kubeschedulerconfigv1.KubeSchedulerConfiguration{ |
| Profiles: []kubeschedulerconfigv1.KubeSchedulerProfile{{ |
| SchedulerName: ptr.To(v1.DefaultSchedulerName), |
| Plugins: &kubeschedulerconfigv1.Plugins{ |
| PostFilter: kubeschedulerconfigv1.PluginSet{ |
| Disabled: []kubeschedulerconfigv1.Plugin{ |
| {Name: defaultpreemption.Name}, |
| }, |
| }, |
| }, |
| }}, |
| }) |
| testCtx := InitTestSchedulerWithOptions( |
| t, InitTestAPIServer(t, nsPrefix, nil), |
| 0, |
| scheduler.WithProfiles(cfg.Profiles...)) |
| SyncSchedulerInformerFactory(testCtx) |
| go testCtx.Scheduler.Run(testCtx.SchedulerCtx) |
| return testCtx |
| } |
| |
| // WaitForReflection waits until the passFunc confirms that the object it expects |
| // to see is in the store. Used to observe reflected events. |
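| // |
| // A usage sketch (nodeLister, nodeName, and the label key/value are illustrative): |
| // |
| //	err := WaitForReflection(t, nodeLister, nodeName, func(n interface{}) bool { |
| //		return n != nil && n.(*v1.Node).Labels["example.com/tier"] == "gold" |
| //	}) |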
| func WaitForReflection(t *testing.T, nodeLister corelisters.NodeLister, key string, |
| passFunc func(n interface{}) bool) error { |
| var nodes []*v1.Node |
| err := wait.Poll(time.Millisecond*100, wait.ForeverTestTimeout, func() (bool, error) { |
| n, err := nodeLister.Get(key) |
| |
| switch { |
| case err == nil && passFunc(n): |
| return true, nil |
| case apierrors.IsNotFound(err): |
| nodes = append(nodes, nil) |
| case err != nil: |
| t.Errorf("Unexpected error: %v", err) |
| default: |
| nodes = append(nodes, n) |
| } |
| |
| return false, nil |
| }) |
| if err != nil { |
| t.Logf("Logging consecutive node versions received from store:") |
| for i, n := range nodes { |
| t.Logf("%d: %#v", i, n) |
| } |
| } |
| return err |
| } |
| |
| func UpdateNode(cs clientset.Interface, node *v1.Node) (*v1.Node, error) { |
| return cs.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{}) |
| } |
| |
| func CreateNode(cs clientset.Interface, node *v1.Node) (*v1.Node, error) { |
| return cs.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{}) |
| } |
| |
| func createNodes(cs clientset.Interface, prefix string, wrapper *st.NodeWrapper, numNodes int) ([]*v1.Node, error) { |
| nodes := make([]*v1.Node, numNodes) |
| for i := 0; i < numNodes; i++ { |
| nodeName := fmt.Sprintf("%v-%d", prefix, i) |
| node, err := CreateNode(cs, wrapper.Name(nodeName).Label("kubernetes.io/hostname", nodeName).Obj()) |
| if err != nil { |
| return nodes[:], err |
| } |
| nodes[i] = node |
| } |
| return nodes[:], nil |
| } |
| |
| // CreateAndWaitForNodesInCache calls createNodes(), and waits for the created |
| // nodes to be present in the scheduler cache. |
| func CreateAndWaitForNodesInCache(testCtx *TestContext, prefix string, wrapper *st.NodeWrapper, numNodes int) ([]*v1.Node, error) { |
| existingNodes := testCtx.Scheduler.Cache.NodeCount() |
| nodes, err := createNodes(testCtx.ClientSet, prefix, wrapper, numNodes) |
| if err != nil { |
| return nodes, fmt.Errorf("cannot create nodes: %v", err) |
| } |
| return nodes, WaitForNodesInCache(testCtx.Scheduler, numNodes+existingNodes) |
| } |
| |
| // WaitForNodesInCache ensures at least <nodeCount> nodes are present in the scheduler cache |
| // within wait.ForeverTestTimeout; otherwise it returns an error. |
| func WaitForNodesInCache(sched *scheduler.Scheduler, nodeCount int) error { |
| err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { |
| return sched.Cache.NodeCount() >= nodeCount, nil |
| }) |
| if err != nil { |
| return fmt.Errorf("cannot obtain available nodes in scheduler cache: %v", err) |
| } |
| return nil |
| } |
| |
| type PausePodConfig struct { |
| Name string |
| Namespace string |
| Affinity *v1.Affinity |
| Annotations, Labels, NodeSelector map[string]string |
| Resources *v1.ResourceRequirements |
| Tolerations []v1.Toleration |
| NodeName string |
| SchedulerName string |
| Priority *int32 |
| PreemptionPolicy *v1.PreemptionPolicy |
| PriorityClassName string |
| Volumes []v1.Volume |
| } |
| |
| // InitPausePod initializes a pod API object from the given config. It is used |
| // mainly in the pod creation process. |
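| // |
| // A usage sketch (names are illustrative; testCtx comes from the helpers above): |
| // |
| //	pod := InitPausePod(&PausePodConfig{ |
| //		Name:      "test-pod", |
| //		Namespace: testCtx.NS.Name, |
| //	}) |
| //	runningPod, err := RunPausePod(testCtx.ClientSet, pod) |
| //	if err != nil { |
| //		t.Fatal(err) |
| //	} |
| //	_ = runningPod |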
| func InitPausePod(conf *PausePodConfig) *v1.Pod { |
| pod := &v1.Pod{ |
| ObjectMeta: metav1.ObjectMeta{ |
| Name: conf.Name, |
| Namespace: conf.Namespace, |
| Labels: conf.Labels, |
| Annotations: conf.Annotations, |
| }, |
| Spec: v1.PodSpec{ |
| NodeSelector: conf.NodeSelector, |
| Affinity: conf.Affinity, |
| Containers: []v1.Container{ |
| { |
| Name: conf.Name, |
| Image: imageutils.GetPauseImageName(), |
| }, |
| }, |
| Tolerations: conf.Tolerations, |
| NodeName: conf.NodeName, |
| SchedulerName: conf.SchedulerName, |
| Priority: conf.Priority, |
| PreemptionPolicy: conf.PreemptionPolicy, |
| PriorityClassName: conf.PriorityClassName, |
| Volumes: conf.Volumes, |
| }, |
| } |
| if conf.Resources != nil { |
| pod.Spec.Containers[0].Resources = *conf.Resources |
| } |
| return pod |
| } |
| |
| // CreatePausePod creates the given pod (typically built with InitPausePod) and |
| // returns its pointer and error status. |
| func CreatePausePod(cs clientset.Interface, p *v1.Pod) (*v1.Pod, error) { |
| return cs.CoreV1().Pods(p.Namespace).Create(context.TODO(), p, metav1.CreateOptions{}) |
| } |
| |
| // CreatePausePodWithResource creates a pod with "Pause" image and the given |
| // resources and returns its pointer and error status. The resource list can be |
| // nil. |
| func CreatePausePodWithResource(cs clientset.Interface, podName string, |
| nsName string, res *v1.ResourceList) (*v1.Pod, error) { |
| var conf PausePodConfig |
| if res == nil { |
| conf = PausePodConfig{ |
| Name: podName, |
| Namespace: nsName, |
| } |
| } else { |
| conf = PausePodConfig{ |
| Name: podName, |
| Namespace: nsName, |
| Resources: &v1.ResourceRequirements{ |
| Requests: *res, |
| }, |
| } |
| } |
| return CreatePausePod(cs, InitPausePod(&conf)) |
| } |
| |
| // CreatePVC creates a PersistentVolumeClaim with the given config and returns |
| // its pointer and error status. |
| func CreatePVC(cs clientset.Interface, pvc *v1.PersistentVolumeClaim) (*v1.PersistentVolumeClaim, error) { |
| return cs.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(context.TODO(), pvc, metav1.CreateOptions{}) |
| } |
| |
| // CreatePV creates a PersistentVolume with the given config and returns its |
| // pointer and error status. |
| func CreatePV(cs clientset.Interface, pv *v1.PersistentVolume) (*v1.PersistentVolume, error) { |
| return cs.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{}) |
| } |
| |
| // DeletePVC deletes the given PVC in the given namespace. |
| func DeletePVC(cs clientset.Interface, pvcName string, nsName string) error { |
| return cs.CoreV1().PersistentVolumeClaims(nsName).Delete(context.TODO(), pvcName, *metav1.NewDeleteOptions(0)) |
| } |
| |
| // DeletePV deletes the given PV. |
| func DeletePV(cs clientset.Interface, pvName string) error { |
| return cs.CoreV1().PersistentVolumes().Delete(context.TODO(), pvName, *metav1.NewDeleteOptions(0)) |
| } |
| |
| // RunPausePod creates a pod with "Pause" image and the given config and waits |
| // until it is scheduled. It returns its pointer and error status. |
| func RunPausePod(cs clientset.Interface, pod *v1.Pod) (*v1.Pod, error) { |
| pod, err := cs.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) |
| if err != nil { |
| return nil, fmt.Errorf("failed to create pause pod: %v", err) |
| } |
| if err = WaitForPodToSchedule(cs, pod); err != nil { |
| return pod, fmt.Errorf("Pod %v/%v didn't schedule successfully. Error: %v", pod.Namespace, pod.Name, err) |
| } |
| if pod, err = cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}); err != nil { |
| return pod, fmt.Errorf("failed to get pod %v/%v info: %v", pod.Namespace, pod.Name, err) |
| } |
| return pod, nil |
| } |
| |
| type PodWithContainersConfig struct { |
| Name string |
| Namespace string |
| Containers []v1.Container |
| } |
| |
| // InitPodWithContainers initializes a pod API object from the given config. This is used primarily for generating |
| // pods with containers each having a specific image. |
| func InitPodWithContainers(cs clientset.Interface, conf *PodWithContainersConfig) *v1.Pod { |
| pod := &v1.Pod{ |
| ObjectMeta: metav1.ObjectMeta{ |
| Name: conf.Name, |
| Namespace: conf.Namespace, |
| }, |
| Spec: v1.PodSpec{ |
| Containers: conf.Containers, |
| }, |
| } |
| return pod |
| } |
| |
| // RunPodWithContainers creates a pod with given config and containers and waits |
| // until it is scheduled. It returns its pointer and error status. |
| func RunPodWithContainers(cs clientset.Interface, pod *v1.Pod) (*v1.Pod, error) { |
| pod, err := cs.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) |
| if err != nil { |
| return nil, fmt.Errorf("failed to create pod-with-containers: %v", err) |
| } |
| if err = WaitForPodToSchedule(cs, pod); err != nil { |
| return pod, fmt.Errorf("Pod %v didn't schedule successfully. Error: %v", pod.Name, err) |
| } |
| if pod, err = cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}); err != nil { |
| return pod, fmt.Errorf("failed to get pod %v info: %v", pod.Name, err) |
| } |
| return pod, nil |
| } |
| |
| // PodIsGettingEvicted returns a condition function that reports true if the pod's deletion timestamp is set. |
| func PodIsGettingEvicted(c clientset.Interface, podNamespace, podName string) wait.ConditionWithContextFunc { |
| return func(ctx context.Context) (bool, error) { |
| pod, err := c.CoreV1().Pods(podNamespace).Get(ctx, podName, metav1.GetOptions{}) |
| if err != nil { |
| return false, err |
| } |
| if pod.DeletionTimestamp != nil { |
| return true, nil |
| } |
| return false, nil |
| } |
| } |
| |
| // PodScheduledIn returns a condition function that reports true if the given pod is placed onto one of the expected nodes. |
| func PodScheduledIn(c clientset.Interface, podNamespace, podName string, nodeNames []string) wait.ConditionWithContextFunc { |
| return func(ctx context.Context) (bool, error) { |
| pod, err := c.CoreV1().Pods(podNamespace).Get(ctx, podName, metav1.GetOptions{}) |
| if err != nil { |
| // This could be a connection error so we want to retry. |
| return false, nil |
| } |
| if pod.Spec.NodeName == "" { |
| return false, nil |
| } |
| for _, nodeName := range nodeNames { |
| if pod.Spec.NodeName == nodeName { |
| return true, nil |
| } |
| } |
| return false, nil |
| } |
| } |
| |
| // PodUnschedulable returns a condition function that returns true if the given pod |
| // gets unschedulable status of reason 'Unschedulable'. |
| func PodUnschedulable(c clientset.Interface, podNamespace, podName string) wait.ConditionWithContextFunc { |
| return func(ctx context.Context) (bool, error) { |
| pod, err := c.CoreV1().Pods(podNamespace).Get(ctx, podName, metav1.GetOptions{}) |
| if err != nil { |
| // This could be a connection error so we want to retry. |
| return false, nil |
| } |
| _, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled) |
| return cond != nil && cond.Status == v1.ConditionFalse && |
| cond.Reason == v1.PodReasonUnschedulable && pod.Spec.NodeName == "", nil |
| } |
| } |
| |
| // PodSchedulingError returns a condition function that returns true if the given pod |
| // gets unschedulable status for reasons other than "Unschedulable". The scheduler |
| // records such reasons in case of error. |
| func PodSchedulingError(c clientset.Interface, podNamespace, podName string) wait.ConditionWithContextFunc { |
| return func(ctx context.Context) (bool, error) { |
| pod, err := c.CoreV1().Pods(podNamespace).Get(ctx, podName, metav1.GetOptions{}) |
| if err != nil { |
| // This could be a connection error so we want to retry. |
| return false, nil |
| } |
| _, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled) |
| return cond != nil && cond.Status == v1.ConditionFalse && |
| cond.Reason != v1.PodReasonUnschedulable, nil |
| } |
| } |
| |
| // PodSchedulingGated returns a condition function that returns true if the given pod |
| // gets unschedulable status of reason 'SchedulingGated'. |
| func PodSchedulingGated(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc { |
| return func() (bool, error) { |
| pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{}) |
| if err != nil { |
| // This could be a connection error so we want to retry. |
| return false, nil |
| } |
| _, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled) |
| return cond != nil && cond.Status == v1.ConditionFalse && |
| cond.Reason == v1.PodReasonSchedulingGated && pod.Spec.NodeName == "", nil |
| } |
| } |
| |
| // WaitForPodUnschedulableWithTimeout waits for a pod to fail scheduling and returns |
| // an error if it does not become unschedulable within the given timeout. |
| func WaitForPodUnschedulableWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error { |
| return wait.PollUntilContextTimeout(context.TODO(), 100*time.Millisecond, timeout, false, PodUnschedulable(cs, pod.Namespace, pod.Name)) |
| } |
| |
| // WaitForPodUnschedulable waits for a pod to fail scheduling and returns |
| // an error if it does not become unschedulable within the timeout duration (30 seconds). |
| func WaitForPodUnschedulable(cs clientset.Interface, pod *v1.Pod) error { |
| return WaitForPodUnschedulableWithTimeout(cs, pod, 30*time.Second) |
| } |
| |
| // WaitForPodSchedulingGated waits for a pod to be in scheduling gated state |
| // and returns an error if it does not fall into this state within the given timeout. |
| func WaitForPodSchedulingGated(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error { |
| return wait.Poll(100*time.Millisecond, timeout, PodSchedulingGated(cs, pod.Namespace, pod.Name)) |
| } |
| |
| // WaitForPDBsStable waits for PDBs to have "CurrentHealthy" status equal to |
| // the expected values. |
| func WaitForPDBsStable(testCtx *TestContext, pdbs []*policy.PodDisruptionBudget, pdbPodNum []int32) error { |
| return wait.Poll(time.Second, 60*time.Second, func() (bool, error) { |
| pdbList, err := testCtx.ClientSet.PolicyV1().PodDisruptionBudgets(testCtx.NS.Name).List(context.TODO(), metav1.ListOptions{}) |
| if err != nil { |
| return false, err |
| } |
| if len(pdbList.Items) != len(pdbs) { |
| return false, nil |
| } |
| for i, pdb := range pdbs { |
| found := false |
| for _, cpdb := range pdbList.Items { |
| if pdb.Name == cpdb.Name && pdb.Namespace == cpdb.Namespace { |
| found = true |
| if cpdb.Status.CurrentHealthy != pdbPodNum[i] { |
| return false, nil |
| } |
| } |
| } |
| if !found { |
| return false, nil |
| } |
| } |
| return true, nil |
| }) |
| } |
| |
| // WaitCachedPodsStable waits until scheduler cache has the given pods. |
| func WaitCachedPodsStable(testCtx *TestContext, pods []*v1.Pod) error { |
| return wait.Poll(time.Second, 30*time.Second, func() (bool, error) { |
| cachedPods, err := testCtx.Scheduler.Cache.PodCount() |
| if err != nil { |
| return false, err |
| } |
| if len(pods) != cachedPods { |
| return false, nil |
| } |
| for _, p := range pods { |
| actualPod, err1 := testCtx.ClientSet.CoreV1().Pods(p.Namespace).Get(context.TODO(), p.Name, metav1.GetOptions{}) |
| if err1 != nil { |
| return false, err1 |
| } |
| cachedPod, err2 := testCtx.Scheduler.Cache.GetPod(actualPod) |
| if err2 != nil || cachedPod == nil { |
| return false, err2 |
| } |
| } |
| return true, nil |
| }) |
| } |
| |
| // DeletePod deletes the given pod in the given namespace. |
| func DeletePod(cs clientset.Interface, podName string, nsName string) error { |
| return cs.CoreV1().Pods(nsName).Delete(context.TODO(), podName, *metav1.NewDeleteOptions(0)) |
| } |
| |
| func GetPod(cs clientset.Interface, podName string, podNamespace string) (*v1.Pod, error) { |
| return cs.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{}) |
| } |
| |
| func CreateNamespacesWithLabels(cs clientset.Interface, namespaces []string, labels map[string]string) error { |
| for _, n := range namespaces { |
| ns := v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: n, Labels: labels}} |
| if _, err := cs.CoreV1().Namespaces().Create(context.TODO(), &ns, metav1.CreateOptions{}); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| // timeout returns a timeout error if the given `f` function doesn't |
| // complete within `d` duration; otherwise it returns nil. |
| func timeout(ctx context.Context, d time.Duration, f func()) error { |
| ctx, cancel := context.WithTimeout(ctx, d) |
| defer cancel() |
| |
| done := make(chan struct{}) |
| go func() { |
| f() |
| close(done) |
| }() |
| |
| select { |
| case <-done: |
| return nil |
| case <-ctx.Done(): |
| return ctx.Err() |
| } |
| } |
| |
| // NextPodOrDie returns the next Pod in the scheduler queue. |
| // The operation needs to be completed within 5 seconds; otherwise the test gets aborted. |
| func NextPodOrDie(t *testing.T, testCtx *TestContext) *schedulerframework.QueuedPodInfo { |
| t.Helper() |
| |
| var podInfo *schedulerframework.QueuedPodInfo |
| logger := klog.FromContext(testCtx.Ctx) |
| // NextPod() is a blocking operation. Wrap it in timeout() to avoid relying on |
| // default go testing timeout (10m) to abort. |
| if err := timeout(testCtx.Ctx, time.Second*5, func() { |
| podInfo, _ = testCtx.Scheduler.NextPod(logger) |
| }); err != nil { |
| t.Fatalf("Timed out waiting for the Pod to be popped: %v", err) |
| } |
| return podInfo |
| } |
| |
| // NextPod returns the next Pod in the scheduler queue, with a 5 second timeout, and returns nil on timeout. |
| // Note that this function leaks a goroutine in the case of a timeout; even after this function returns, |
| // the goroutine it spawned keeps waiting to pop a pod from the queue. |
| func NextPod(t *testing.T, testCtx *TestContext) *schedulerframework.QueuedPodInfo { |
| t.Helper() |
| |
| var podInfo *schedulerframework.QueuedPodInfo |
| logger := klog.FromContext(testCtx.Ctx) |
| // NextPod() is a blocking operation. Wrap it in timeout() to avoid relying on |
| // default go testing timeout (10m) to abort. |
| if err := timeout(testCtx.Ctx, time.Second*5, func() { |
| podInfo, _ = testCtx.Scheduler.NextPod(logger) |
| }); err != nil { |
| return nil |
| } |
| return podInfo |
| } |