| /* |
| Copyright 2015 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package namespace |
| |
| import ( |
| "context" |
| "fmt" |
| "time" |
| |
| "golang.org/x/time/rate" |
| |
| v1 "k8s.io/api/core/v1" |
| "k8s.io/apimachinery/pkg/api/errors" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| utilruntime "k8s.io/apimachinery/pkg/util/runtime" |
| "k8s.io/apimachinery/pkg/util/wait" |
| coreinformers "k8s.io/client-go/informers/core/v1" |
| clientset "k8s.io/client-go/kubernetes" |
| corelisters "k8s.io/client-go/listers/core/v1" |
| "k8s.io/client-go/metadata" |
| "k8s.io/client-go/tools/cache" |
| "k8s.io/client-go/util/workqueue" |
| "k8s.io/kubernetes/pkg/controller" |
| "k8s.io/kubernetes/pkg/controller/namespace/deletion" |
| |
| "k8s.io/klog/v2" |
| ) |
| |
| const ( |
| // namespaceDeletionGracePeriod is the time period to wait before processing a received namespace event. |
| // This allows time for the following to occur: |
| // * lifecycle admission plugins on HA apiservers to also observe a namespace |
| // deletion and prevent new objects from being created in the terminating namespace |
| // * non-leader etcd servers to observe last-minute object creations in a namespace |
| // so this controller's cleanup can actually clean up all objects |
| namespaceDeletionGracePeriod = 5 * time.Second |
| ) |
| |
| // NamespaceController is responsible for performing actions dependent upon a namespace phase |
| type NamespaceController struct { |
| // lister that can list namespaces from a shared cache |
| lister corelisters.NamespaceLister |
| // returns true when the namespace cache is ready |
| listerSynced cache.InformerSynced |
| // namespaces that have been queued up for processing by workers |
| queue workqueue.RateLimitingInterface |
| // helper to delete all resources in the namespace when the namespace is deleted. |
| namespacedResourcesDeleter deletion.NamespacedResourcesDeleterInterface |
| } |
| |
| // NewNamespaceController creates a new NamespaceController |
| func NewNamespaceController( |
| ctx context.Context, |
| kubeClient clientset.Interface, |
| metadataClient metadata.Interface, |
| discoverResourcesFn func() ([]*metav1.APIResourceList, error), |
| namespaceInformer coreinformers.NamespaceInformer, |
| resyncPeriod time.Duration, |
| finalizerToken v1.FinalizerName) *NamespaceController { |
| |
| // create the controller so we can inject the enqueue function |
| namespaceController := &NamespaceController{ |
| queue: workqueue.NewNamedRateLimitingQueue(nsControllerRateLimiter(), "namespace"), |
| namespacedResourcesDeleter: deletion.NewNamespacedResourcesDeleter(ctx, kubeClient.CoreV1().Namespaces(), metadataClient, kubeClient.CoreV1(), discoverResourcesFn, finalizerToken), |
| } |
| |
| // configure the namespace informer event handlers |
| namespaceInformer.Informer().AddEventHandlerWithResyncPeriod( |
| cache.ResourceEventHandlerFuncs{ |
| AddFunc: func(obj interface{}) { |
| namespace := obj.(*v1.Namespace) |
| namespaceController.enqueueNamespace(namespace) |
| }, |
| UpdateFunc: func(oldObj, newObj interface{}) { |
| namespace := newObj.(*v1.Namespace) |
| namespaceController.enqueueNamespace(namespace) |
| }, |
| }, |
| resyncPeriod, |
| ) |
| namespaceController.lister = namespaceInformer.Lister() |
| namespaceController.listerSynced = namespaceInformer.Informer().HasSynced |
| |
| return namespaceController |
| } |
| |
| // nsControllerRateLimiter is tuned for a faster than normal recycle time with default backoff speed and default overall |
| // requeing speed. We do this so that namespace cleanup is reliably faster and we know that the number of namespaces being |
| // deleted is smaller than total number of other namespace scoped resources in a cluster. |
| func nsControllerRateLimiter() workqueue.RateLimiter { |
| return workqueue.NewMaxOfRateLimiter( |
| // this ensures that we retry namespace deletion at least every minute, never longer. |
| workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 60*time.Second), |
| // 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item) |
| &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, |
| ) |
| } |
| |
| // enqueueNamespace adds an object to the controller work queue |
| // obj could be an *v1.Namespace, or a DeletionFinalStateUnknown item. |
| func (nm *NamespaceController) enqueueNamespace(obj interface{}) { |
| key, err := controller.KeyFunc(obj) |
| if err != nil { |
| utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) |
| return |
| } |
| |
| namespace := obj.(*v1.Namespace) |
| // don't queue if we aren't deleted |
| if namespace.DeletionTimestamp == nil || namespace.DeletionTimestamp.IsZero() { |
| return |
| } |
| |
| // delay processing namespace events to allow HA api servers to observe namespace deletion, |
| // and HA etcd servers to observe last minute object creations inside the namespace |
| nm.queue.AddAfter(key, namespaceDeletionGracePeriod) |
| } |
| |
| // worker processes the queue of namespace objects. |
| // Each namespace can be in the queue at most once. |
| // The system ensures that no two workers can process |
| // the same namespace at the same time. |
| func (nm *NamespaceController) worker(ctx context.Context) { |
| workFunc := func(ctx context.Context) bool { |
| key, quit := nm.queue.Get() |
| if quit { |
| return true |
| } |
| defer nm.queue.Done(key) |
| |
| err := nm.syncNamespaceFromKey(ctx, key.(string)) |
| if err == nil { |
| // no error, forget this entry and return |
| nm.queue.Forget(key) |
| return false |
| } |
| |
| if estimate, ok := err.(*deletion.ResourcesRemainingError); ok { |
| t := estimate.Estimate/2 + 1 |
| klog.FromContext(ctx).V(4).Info("Content remaining in namespace", "namespace", key, "waitSeconds", t) |
| nm.queue.AddAfter(key, time.Duration(t)*time.Second) |
| } else { |
| // rather than wait for a full resync, re-add the namespace to the queue to be processed |
| nm.queue.AddRateLimited(key) |
| utilruntime.HandleError(fmt.Errorf("deletion of namespace %v failed: %v", key, err)) |
| } |
| return false |
| } |
| for { |
| quit := workFunc(ctx) |
| |
| if quit { |
| return |
| } |
| } |
| } |
| |
| // syncNamespaceFromKey looks for a namespace with the specified key in its store and synchronizes it |
| func (nm *NamespaceController) syncNamespaceFromKey(ctx context.Context, key string) (err error) { |
| startTime := time.Now() |
| logger := klog.FromContext(ctx) |
| defer func() { |
| logger.V(4).Info("Finished syncing namespace", "namespace", key, "duration", time.Since(startTime)) |
| }() |
| |
| namespace, err := nm.lister.Get(key) |
| if errors.IsNotFound(err) { |
| logger.Info("Namespace has been deleted", "namespace", key) |
| return nil |
| } |
| if err != nil { |
| utilruntime.HandleError(fmt.Errorf("Unable to retrieve namespace %v from store: %v", key, err)) |
| return err |
| } |
| return nm.namespacedResourcesDeleter.Delete(ctx, namespace.Name) |
| } |
| |
| // Run starts observing the system with the specified number of workers. |
| func (nm *NamespaceController) Run(ctx context.Context, workers int) { |
| defer utilruntime.HandleCrash() |
| defer nm.queue.ShutDown() |
| logger := klog.FromContext(ctx) |
| logger.Info("Starting namespace controller") |
| defer logger.Info("Shutting down namespace controller") |
| |
| if !cache.WaitForNamedCacheSync("namespace", ctx.Done(), nm.listerSynced) { |
| return |
| } |
| |
| logger.V(5).Info("Starting workers of namespace controller") |
| for i := 0; i < workers; i++ { |
| go wait.UntilWithContext(ctx, nm.worker, time.Second) |
| } |
| <-ctx.Done() |
| } |