| /* |
| Copyright 2020 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package storageversiongc |
| |
| import ( |
| "context" |
| "fmt" |
| "time" |
| |
| apiserverinternalv1alpha1 "k8s.io/api/apiserverinternal/v1alpha1" |
| coordinationv1 "k8s.io/api/coordination/v1" |
| apierrors "k8s.io/apimachinery/pkg/api/errors" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| utilerrors "k8s.io/apimachinery/pkg/util/errors" |
| utilruntime "k8s.io/apimachinery/pkg/util/runtime" |
| "k8s.io/apimachinery/pkg/util/wait" |
| "k8s.io/apiserver/pkg/storageversion" |
| apiserverinternalinformers "k8s.io/client-go/informers/apiserverinternal/v1alpha1" |
| coordinformers "k8s.io/client-go/informers/coordination/v1" |
| "k8s.io/client-go/kubernetes" |
| coordlisters "k8s.io/client-go/listers/coordination/v1" |
| "k8s.io/client-go/tools/cache" |
| "k8s.io/client-go/util/workqueue" |
| "k8s.io/kubernetes/pkg/controlplane" |
| |
| "k8s.io/klog/v2" |
| ) |
| |
| // Controller watches kube-apiserver leases and storageversions, and delete stale |
| // storage version entries and objects. |
| type Controller struct { |
| kubeclientset kubernetes.Interface |
| |
| leaseLister coordlisters.LeaseLister |
| leasesSynced cache.InformerSynced |
| |
| storageVersionSynced cache.InformerSynced |
| |
| leaseQueue workqueue.RateLimitingInterface |
| storageVersionQueue workqueue.RateLimitingInterface |
| } |
| |
| // NewStorageVersionGC creates a new Controller. |
| func NewStorageVersionGC(ctx context.Context, clientset kubernetes.Interface, leaseInformer coordinformers.LeaseInformer, storageVersionInformer apiserverinternalinformers.StorageVersionInformer) *Controller { |
| c := &Controller{ |
| kubeclientset: clientset, |
| leaseLister: leaseInformer.Lister(), |
| leasesSynced: leaseInformer.Informer().HasSynced, |
| storageVersionSynced: storageVersionInformer.Informer().HasSynced, |
| leaseQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "storage_version_garbage_collector_leases"), |
| storageVersionQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "storage_version_garbage_collector_storageversions"), |
| } |
| logger := klog.FromContext(ctx) |
| leaseInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ |
| DeleteFunc: func(obj interface{}) { |
| c.onDeleteLease(logger, obj) |
| }, |
| }) |
| // use the default resync period from the informer |
| storageVersionInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ |
| AddFunc: func(obj interface{}) { |
| c.onAddStorageVersion(logger, obj) |
| }, |
| UpdateFunc: func(old, newObj interface{}) { |
| c.onUpdateStorageVersion(logger, old, newObj) |
| }, |
| }) |
| |
| return c |
| } |
| |
| // Run starts one worker. |
| func (c *Controller) Run(ctx context.Context) { |
| logger := klog.FromContext(ctx) |
| defer utilruntime.HandleCrash() |
| defer c.leaseQueue.ShutDown() |
| defer c.storageVersionQueue.ShutDown() |
| defer logger.Info("Shutting down storage version garbage collector") |
| |
| logger.Info("Starting storage version garbage collector") |
| |
| if !cache.WaitForCacheSync(ctx.Done(), c.leasesSynced, c.storageVersionSynced) { |
| utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) |
| return |
| } |
| |
| // Identity lease deletion and storageversion update don't happen too often. Start one |
| // worker for each of them. |
| // runLeaseWorker handles legit identity lease deletion, while runStorageVersionWorker |
| // handles storageversion creation/update with non-existing id. The latter should rarely |
| // happen. It's okay for the two workers to conflict on update. |
| go wait.UntilWithContext(ctx, c.runLeaseWorker, time.Second) |
| go wait.UntilWithContext(ctx, c.runStorageVersionWorker, time.Second) |
| |
| <-ctx.Done() |
| } |
| |
| func (c *Controller) runLeaseWorker(ctx context.Context) { |
| for c.processNextLease(ctx) { |
| } |
| } |
| |
| func (c *Controller) processNextLease(ctx context.Context) bool { |
| key, quit := c.leaseQueue.Get() |
| if quit { |
| return false |
| } |
| defer c.leaseQueue.Done(key) |
| |
| err := c.processDeletedLease(ctx, key.(string)) |
| if err == nil { |
| c.leaseQueue.Forget(key) |
| return true |
| } |
| |
| utilruntime.HandleError(fmt.Errorf("lease %v failed with: %v", key, err)) |
| c.leaseQueue.AddRateLimited(key) |
| return true |
| } |
| |
| func (c *Controller) runStorageVersionWorker(ctx context.Context) { |
| for c.processNextStorageVersion(ctx) { |
| } |
| } |
| |
| func (c *Controller) processNextStorageVersion(ctx context.Context) bool { |
| key, quit := c.storageVersionQueue.Get() |
| if quit { |
| return false |
| } |
| defer c.storageVersionQueue.Done(key) |
| |
| err := c.syncStorageVersion(ctx, key.(string)) |
| if err == nil { |
| c.storageVersionQueue.Forget(key) |
| return true |
| } |
| |
| utilruntime.HandleError(fmt.Errorf("storage version %v failed with: %v", key, err)) |
| c.storageVersionQueue.AddRateLimited(key) |
| return true |
| } |
| |
| func (c *Controller) processDeletedLease(ctx context.Context, name string) error { |
| _, err := c.kubeclientset.CoordinationV1().Leases(metav1.NamespaceSystem).Get(ctx, name, metav1.GetOptions{}) |
| // the lease isn't deleted, nothing we need to do here |
| if err == nil { |
| return nil |
| } |
| if !apierrors.IsNotFound(err) { |
| return err |
| } |
| // the frequency of this call won't be too high because we only trigger on identity lease deletions |
| storageVersionList, err := c.kubeclientset.InternalV1alpha1().StorageVersions().List(ctx, metav1.ListOptions{}) |
| if err != nil { |
| return err |
| } |
| |
| var errors []error |
| for _, sv := range storageVersionList.Items { |
| var serverStorageVersions []apiserverinternalv1alpha1.ServerStorageVersion |
| hasStaleRecord := false |
| for _, ssv := range sv.Status.StorageVersions { |
| if ssv.APIServerID == name { |
| hasStaleRecord = true |
| continue |
| } |
| serverStorageVersions = append(serverStorageVersions, ssv) |
| } |
| if !hasStaleRecord { |
| continue |
| } |
| if err := c.updateOrDeleteStorageVersion(ctx, &sv, serverStorageVersions); err != nil { |
| errors = append(errors, err) |
| } |
| } |
| |
| return utilerrors.NewAggregate(errors) |
| } |
| |
| func (c *Controller) syncStorageVersion(ctx context.Context, name string) error { |
| sv, err := c.kubeclientset.InternalV1alpha1().StorageVersions().Get(ctx, name, metav1.GetOptions{}) |
| if apierrors.IsNotFound(err) { |
| // The problematic storage version that was added/updated recently is gone. |
| // Nothing we need to do here. |
| return nil |
| } |
| if err != nil { |
| return err |
| } |
| |
| hasInvalidID := false |
| var serverStorageVersions []apiserverinternalv1alpha1.ServerStorageVersion |
| for _, v := range sv.Status.StorageVersions { |
| lease, err := c.kubeclientset.CoordinationV1().Leases(metav1.NamespaceSystem).Get(ctx, v.APIServerID, metav1.GetOptions{}) |
| if err != nil || lease == nil || lease.Labels == nil || |
| lease.Labels[controlplane.IdentityLeaseComponentLabelKey] != controlplane.KubeAPIServer { |
| // We cannot find a corresponding identity lease from apiserver as well. |
| // We need to clean up this storage version. |
| hasInvalidID = true |
| continue |
| } |
| serverStorageVersions = append(serverStorageVersions, v) |
| } |
| if !hasInvalidID { |
| return nil |
| } |
| return c.updateOrDeleteStorageVersion(ctx, sv, serverStorageVersions) |
| } |
| |
| func (c *Controller) onAddStorageVersion(logger klog.Logger, obj interface{}) { |
| castObj := obj.(*apiserverinternalv1alpha1.StorageVersion) |
| c.enqueueStorageVersion(logger, castObj) |
| } |
| |
| func (c *Controller) onUpdateStorageVersion(logger klog.Logger, oldObj, newObj interface{}) { |
| castNewObj := newObj.(*apiserverinternalv1alpha1.StorageVersion) |
| c.enqueueStorageVersion(logger, castNewObj) |
| } |
| |
| // enqueueStorageVersion enqueues the storage version if it has entry for invalid apiserver |
| func (c *Controller) enqueueStorageVersion(logger klog.Logger, obj *apiserverinternalv1alpha1.StorageVersion) { |
| for _, sv := range obj.Status.StorageVersions { |
| lease, err := c.leaseLister.Leases(metav1.NamespaceSystem).Get(sv.APIServerID) |
| if err != nil || lease == nil || lease.Labels == nil || |
| lease.Labels[controlplane.IdentityLeaseComponentLabelKey] != controlplane.KubeAPIServer { |
| // we cannot find a corresponding identity lease in cache, enqueue the storageversion |
| logger.V(4).Info("Observed storage version with invalid apiserver entry", "objName", obj.Name) |
| c.storageVersionQueue.Add(obj.Name) |
| return |
| } |
| } |
| } |
| |
| func (c *Controller) onDeleteLease(logger klog.Logger, obj interface{}) { |
| castObj, ok := obj.(*coordinationv1.Lease) |
| if !ok { |
| tombstone, ok := obj.(cache.DeletedFinalStateUnknown) |
| if !ok { |
| utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) |
| return |
| } |
| castObj, ok = tombstone.Obj.(*coordinationv1.Lease) |
| if !ok { |
| utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Lease %#v", obj)) |
| return |
| } |
| } |
| |
| if castObj.Namespace == metav1.NamespaceSystem && |
| castObj.Labels != nil && |
| castObj.Labels[controlplane.IdentityLeaseComponentLabelKey] == controlplane.KubeAPIServer { |
| logger.V(4).Info("Observed lease deleted", "castObjName", castObj.Name) |
| c.enqueueLease(castObj) |
| } |
| } |
| |
| func (c *Controller) enqueueLease(obj *coordinationv1.Lease) { |
| c.leaseQueue.Add(obj.Name) |
| } |
| |
| func (c *Controller) updateOrDeleteStorageVersion(ctx context.Context, sv *apiserverinternalv1alpha1.StorageVersion, serverStorageVersions []apiserverinternalv1alpha1.ServerStorageVersion) error { |
| if len(serverStorageVersions) == 0 { |
| return c.kubeclientset.InternalV1alpha1().StorageVersions().Delete( |
| ctx, sv.Name, metav1.DeleteOptions{}) |
| } |
| sv.Status.StorageVersions = serverStorageVersions |
| storageversion.SetCommonEncodingVersion(sv) |
| _, err := c.kubeclientset.InternalV1alpha1().StorageVersions().UpdateStatus( |
| ctx, sv, metav1.UpdateOptions{}) |
| return err |
| } |