| /* |
| Copyright 2016 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package garbagecollector |
| |
import (
	"context"
	goerrors "errors"
	"fmt"
	"reflect"
	"sync"
	"time"

	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/discovery"
	clientset "k8s.io/client-go/kubernetes" // import known versions
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/metadata"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/controller-manager/controller"
	"k8s.io/controller-manager/pkg/informerfactory"
	"k8s.io/klog/v2"
	c "k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/controller/garbagecollector/metrics"
)
| |
// ResourceResyncTime defines the resync period of the garbage collector's
// informers. Its value is zero.
// NOTE(review): for client-go informers a zero resync period presumably
// disables periodic resyncs — confirm at the call sites that consume it.
const ResourceResyncTime = time.Duration(0)
| |
// GarbageCollector runs reflectors to watch for changes of managed API
// objects, funnels the results to a single-threaded dependencyGraphBuilder,
// which builds a graph caching the dependencies among objects. Triggered by the
// graph changes, the dependencyGraphBuilder enqueues objects that can
// potentially be garbage-collected to the `attemptToDelete` queue, and enqueues
// objects whose dependents need to be orphaned to the `attemptToOrphan` queue.
// The GarbageCollector has workers who consume these two queues, send requests
// to the API server to delete/update the objects accordingly.
// Note that having the dependencyGraphBuilder notify the garbage collector
// ensures that the garbage collector operates with a graph that is at least as
// up to date as the notification is sent.
type GarbageCollector struct {
	// restMapper resolves owner reference kinds to resources; Sync resets it
	// whenever discovery reports a change in the available resources.
	restMapper meta.ResettableRESTMapper
	// metadataClient fetches owner metadata from the API server (see isDangling).
	metadataClient metadata.Interface
	// garbage collector attempts to delete the items in attemptToDelete queue when the time is ripe.
	attemptToDelete workqueue.RateLimitingInterface
	// garbage collector attempts to orphan the dependents of the items in the attemptToOrphan queue,
	// then removes the orphan finalizer from those items.
	attemptToOrphan workqueue.RateLimitingInterface
	// dependencyGraphBuilder maintains the owner/dependent graph and supplies
	// the two work queues and absentOwnerCache (see NewComposedGarbageCollector).
	dependencyGraphBuilder *GraphBuilder
	// GC caches the owners that do not exist according to the API server.
	absentOwnerCache *ReferenceCache

	// kubeClient provides the core/v1 Events sink that Run records events to.
	kubeClient clientset.Interface
	// eventBroadcaster is taken from dependencyGraphBuilder; Run starts it and shuts it down.
	eventBroadcaster record.EventBroadcaster

	// workerLock is held for reading by each worker while it processes one
	// item, and for writing by Sync while it resyncs monitors, which pauses
	// all workers until the resync completes.
	workerLock sync.RWMutex
}
| |
| var _ controller.Interface = (*GarbageCollector)(nil) |
| var _ controller.Debuggable = (*GarbageCollector)(nil) |
| |
| // NewGarbageCollector creates a new GarbageCollector. |
| func NewGarbageCollector( |
| ctx context.Context, |
| kubeClient clientset.Interface, |
| metadataClient metadata.Interface, |
| mapper meta.ResettableRESTMapper, |
| ignoredResources map[schema.GroupResource]struct{}, |
| sharedInformers informerfactory.InformerFactory, |
| informersStarted <-chan struct{}, |
| ) (*GarbageCollector, error) { |
| graphBuilder := NewDependencyGraphBuilder(ctx, metadataClient, mapper, ignoredResources, sharedInformers, informersStarted) |
| return NewComposedGarbageCollector(ctx, kubeClient, metadataClient, mapper, graphBuilder) |
| } |
| |
| func NewComposedGarbageCollector( |
| ctx context.Context, |
| kubeClient clientset.Interface, |
| metadataClient metadata.Interface, |
| mapper meta.ResettableRESTMapper, |
| graphBuilder *GraphBuilder, |
| ) (*GarbageCollector, error) { |
| attemptToDelete, attemptToOrphan, absentOwnerCache := graphBuilder.GetGraphResources() |
| |
| gc := &GarbageCollector{ |
| metadataClient: metadataClient, |
| restMapper: mapper, |
| attemptToDelete: attemptToDelete, |
| attemptToOrphan: attemptToOrphan, |
| absentOwnerCache: absentOwnerCache, |
| kubeClient: kubeClient, |
| eventBroadcaster: graphBuilder.eventBroadcaster, |
| dependencyGraphBuilder: graphBuilder, |
| } |
| |
| metrics.Register() |
| |
| return gc, nil |
| } |
| |
| // resyncMonitors starts or stops resource monitors as needed to ensure that all |
| // (and only) those resources present in the map are monitored. |
| func (gc *GarbageCollector) resyncMonitors(logger klog.Logger, deletableResources map[schema.GroupVersionResource]struct{}) error { |
| if err := gc.dependencyGraphBuilder.syncMonitors(logger, deletableResources); err != nil { |
| return err |
| } |
| gc.dependencyGraphBuilder.startMonitors(logger) |
| return nil |
| } |
| |
// Run starts garbage collector workers.
//
// It starts the event pipeline (structured logging at verbosity 3 plus
// recording to the core/v1 Events sink) and launches the dependency graph
// builder in its own goroutine. It then waits until the builder reports
// synced; if ctx is done first, Run returns without starting any worker.
// Once synced, it starts `workers` goroutines for each of the attemptToDelete
// and attemptToOrphan queues (driven by wait.UntilWithContext / wait.Until
// with a one-second period) and blocks until ctx is done.
//
// On return, the deferred calls run in reverse order. First the shutdown
// message is logged and the event broadcaster is shut down. Then the
// graphChanges, attemptToOrphan and attemptToDelete queues are shut down, and
// finally HandleCrash runs.
func (gc *GarbageCollector) Run(ctx context.Context, workers int) {
	defer utilruntime.HandleCrash()
	defer gc.attemptToDelete.ShutDown()
	defer gc.attemptToOrphan.ShutDown()
	defer gc.dependencyGraphBuilder.graphChanges.ShutDown()

	// Start events processing pipeline.
	gc.eventBroadcaster.StartStructuredLogging(3)
	gc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: gc.kubeClient.CoreV1().Events("")})
	defer gc.eventBroadcaster.Shutdown()

	logger := klog.FromContext(ctx)
	logger.Info("Starting controller", "controller", "garbagecollector")
	defer logger.Info("Shutting down controller", "controller", "garbagecollector")

	go gc.dependencyGraphBuilder.Run(ctx)

	// Workers must not start before the graph reflects the informer caches.
	if !cache.WaitForNamedCacheSync("garbage collector", ctx.Done(), func() bool {
		return gc.dependencyGraphBuilder.IsSynced(logger)
	}) {
		return
	}

	logger.Info("All resource monitors have synced. Proceeding to collect garbage")

	// gc workers
	for i := 0; i < workers; i++ {
		go wait.UntilWithContext(ctx, gc.runAttemptToDeleteWorker, 1*time.Second)
		go wait.Until(func() { gc.runAttemptToOrphanWorker(logger) }, 1*time.Second, ctx.Done())
	}

	// Block until the controller is asked to stop; the deferred shutdowns
	// above then release the workers.
	<-ctx.Done()
}
| |
| // Sync periodically resyncs the garbage collector when new resources are |
| // observed from discovery. When new resources are detected, Sync will stop all |
| // GC workers, reset gc.restMapper, and resync the monitors. |
| // |
| // Note that discoveryClient should NOT be shared with gc.restMapper, otherwise |
| // the mapper's underlying discovery client will be unnecessarily reset during |
| // the course of detecting new resources. |
| func (gc *GarbageCollector) Sync(ctx context.Context, discoveryClient discovery.ServerResourcesInterface, period time.Duration) { |
| oldResources := make(map[schema.GroupVersionResource]struct{}) |
| wait.UntilWithContext(ctx, func(ctx context.Context) { |
| logger := klog.FromContext(ctx) |
| |
| // Get the current resource list from discovery. |
| newResources, err := GetDeletableResources(logger, discoveryClient) |
| |
| if len(newResources) == 0 { |
| logger.V(2).Info("no resources reported by discovery, skipping garbage collector sync") |
| metrics.GarbageCollectorResourcesSyncError.Inc() |
| return |
| } |
| if groupLookupFailures, isLookupFailure := discovery.GroupDiscoveryFailedErrorGroups(err); isLookupFailure { |
| // In partial discovery cases, preserve existing synced informers for resources in the failed groups, so resyncMonitors will only add informers for newly seen resources |
| for k, v := range oldResources { |
| if _, failed := groupLookupFailures[k.GroupVersion()]; failed && gc.dependencyGraphBuilder.IsResourceSynced(k) { |
| newResources[k] = v |
| } |
| } |
| } |
| |
| // Decide whether discovery has reported a change. |
| if reflect.DeepEqual(oldResources, newResources) { |
| logger.V(5).Info("no resource updates from discovery, skipping garbage collector sync") |
| return |
| } |
| |
| // Ensure workers are paused to avoid processing events before informers |
| // have resynced. |
| gc.workerLock.Lock() |
| defer gc.workerLock.Unlock() |
| |
| // Once we get here, we should not unpause workers until we've successfully synced |
| attempt := 0 |
| wait.PollImmediateUntilWithContext(ctx, 100*time.Millisecond, func(ctx context.Context) (bool, error) { |
| attempt++ |
| |
| // On a reattempt, check if available resources have changed |
| if attempt > 1 { |
| newResources, err = GetDeletableResources(logger, discoveryClient) |
| |
| if len(newResources) == 0 { |
| logger.V(2).Info("no resources reported by discovery", "attempt", attempt) |
| metrics.GarbageCollectorResourcesSyncError.Inc() |
| return false, nil |
| } |
| if groupLookupFailures, isLookupFailure := discovery.GroupDiscoveryFailedErrorGroups(err); isLookupFailure { |
| // In partial discovery cases, preserve existing synced informers for resources in the failed groups, so resyncMonitors will only add informers for newly seen resources |
| for k, v := range oldResources { |
| if _, failed := groupLookupFailures[k.GroupVersion()]; failed && gc.dependencyGraphBuilder.IsResourceSynced(k) { |
| newResources[k] = v |
| } |
| } |
| } |
| } |
| |
| logger.V(2).Info( |
| "syncing garbage collector with updated resources from discovery", |
| "attempt", attempt, |
| "diff", printDiff(oldResources, newResources), |
| ) |
| |
| // Resetting the REST mapper will also invalidate the underlying discovery |
| // client. This is a leaky abstraction and assumes behavior about the REST |
| // mapper, but we'll deal with it for now. |
| gc.restMapper.Reset() |
| logger.V(4).Info("reset restmapper") |
| |
| // Perform the monitor resync and wait for controllers to report cache sync. |
| // |
| // NOTE: It's possible that newResources will diverge from the resources |
| // discovered by restMapper during the call to Reset, since they are |
| // distinct discovery clients invalidated at different times. For example, |
| // newResources may contain resources not returned in the restMapper's |
| // discovery call if the resources appeared in-between the calls. In that |
| // case, the restMapper will fail to map some of newResources until the next |
| // attempt. |
| if err := gc.resyncMonitors(logger, newResources); err != nil { |
| utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors (attempt %d): %v", attempt, err)) |
| metrics.GarbageCollectorResourcesSyncError.Inc() |
| return false, nil |
| } |
| logger.V(4).Info("resynced monitors") |
| |
| // wait for caches to fill for a while (our sync period) before attempting to rediscover resources and retry syncing. |
| // this protects us from deadlocks where available resources changed and one of our informer caches will never fill. |
| // informers keep attempting to sync in the background, so retrying doesn't interrupt them. |
| // the call to resyncMonitors on the reattempt will no-op for resources that still exist. |
| // note that workers stay paused until we successfully resync. |
| if !cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(ctx.Done(), period), func() bool { |
| return gc.dependencyGraphBuilder.IsSynced(logger) |
| }) { |
| utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync (attempt %d)", attempt)) |
| metrics.GarbageCollectorResourcesSyncError.Inc() |
| return false, nil |
| } |
| |
| // success, break out of the loop |
| return true, nil |
| }) |
| |
| // Finally, keep track of our new state. Do this after all preceding steps |
| // have succeeded to ensure we'll retry on subsequent syncs if an error |
| // occurred. |
| oldResources = newResources |
| logger.V(2).Info("synced garbage collector") |
| }, period) |
| } |
| |
| // printDiff returns a human-readable summary of what resources were added and removed |
| func printDiff(oldResources, newResources map[schema.GroupVersionResource]struct{}) string { |
| removed := sets.NewString() |
| for oldResource := range oldResources { |
| if _, ok := newResources[oldResource]; !ok { |
| removed.Insert(fmt.Sprintf("%+v", oldResource)) |
| } |
| } |
| added := sets.NewString() |
| for newResource := range newResources { |
| if _, ok := oldResources[newResource]; !ok { |
| added.Insert(fmt.Sprintf("%+v", newResource)) |
| } |
| } |
| return fmt.Sprintf("added: %v, removed: %v", added.List(), removed.List()) |
| } |
| |
// waitForStopOrTimeout returns a stop channel that closes when the provided
// stop channel closes or when the specified timeout is reached, whichever
// happens first.
//
// A helper goroutine waits on both events and always exits after closing the
// returned channel. It uses an explicit timer and stops it on return. Unlike
// time.After, the timer therefore does not linger until the full timeout when
// stopCh closes first.
func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan struct{} {
	stopChWithTimeout := make(chan struct{})
	go func() {
		defer close(stopChWithTimeout)
		timer := time.NewTimer(timeout)
		defer timer.Stop()
		select {
		case <-stopCh:
		case <-timer.C:
		}
	}()
	return stopChWithTimeout
}
| |
| // IsSynced returns true if dependencyGraphBuilder is synced. |
| func (gc *GarbageCollector) IsSynced(logger klog.Logger) bool { |
| return gc.dependencyGraphBuilder.IsSynced(logger) |
| } |
| |
| func (gc *GarbageCollector) runAttemptToDeleteWorker(ctx context.Context) { |
| for gc.processAttemptToDeleteWorker(ctx) { |
| } |
| } |
| |
var (
	// enqueuedVirtualDeleteEventErr is returned by attemptToDeleteItem after it
	// has enqueued a virtual delete event for the node instead of acting on it.
	enqueuedVirtualDeleteEventErr = goerrors.New("enqueued virtual delete event")

	// namespacedOwnerOfClusterScopedObjectErr is returned by isDangling when a
	// cluster-scoped object lists a namespaced owner; retrying cannot fix it.
	namespacedOwnerOfClusterScopedObjectErr = goerrors.New("cluster-scoped objects cannot refer to namespaced owners")
)
| |
| func (gc *GarbageCollector) processAttemptToDeleteWorker(ctx context.Context) bool { |
| item, quit := gc.attemptToDelete.Get() |
| gc.workerLock.RLock() |
| defer gc.workerLock.RUnlock() |
| if quit { |
| return false |
| } |
| defer gc.attemptToDelete.Done(item) |
| |
| action := gc.attemptToDeleteWorker(ctx, item) |
| switch action { |
| case forgetItem: |
| gc.attemptToDelete.Forget(item) |
| case requeueItem: |
| gc.attemptToDelete.AddRateLimited(item) |
| } |
| |
| return true |
| } |
| |
// workQueueItemAction tells a queue-processing loop what to do with an item
// after a worker has handled it.
type workQueueItemAction int

// The constants are typed as workQueueItemAction (previously they were untyped
// ints), so the compiler ties them to the action type they represent.
const (
	// requeueItem makes the processing loop re-add the item with rate limiting.
	requeueItem workQueueItemAction = iota
	// forgetItem makes the processing loop call Forget on the item without requeueing it.
	forgetItem
)
| |
| func (gc *GarbageCollector) attemptToDeleteWorker(ctx context.Context, item interface{}) workQueueItemAction { |
| n, ok := item.(*node) |
| if !ok { |
| utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", item)) |
| return forgetItem |
| } |
| |
| logger := klog.FromContext(ctx) |
| |
| if !n.isObserved() { |
| nodeFromGraph, existsInGraph := gc.dependencyGraphBuilder.uidToNode.Read(n.identity.UID) |
| if !existsInGraph { |
| // this can happen if attemptToDelete loops on a requeued virtual node because attemptToDeleteItem returned an error, |
| // and in the meantime a deletion of the real object associated with that uid was observed |
| logger.V(5).Info("item no longer in the graph, skipping attemptToDeleteItem", "item", n.identity) |
| return forgetItem |
| } |
| if nodeFromGraph.isObserved() { |
| // this can happen if attemptToDelete loops on a requeued virtual node because attemptToDeleteItem returned an error, |
| // and in the meantime the real object associated with that uid was observed |
| logger.V(5).Info("item no longer virtual in the graph, skipping attemptToDeleteItem on virtual node", "item", n.identity) |
| return forgetItem |
| } |
| } |
| |
| err := gc.attemptToDeleteItem(ctx, n) |
| if err == enqueuedVirtualDeleteEventErr { |
| // a virtual event was produced and will be handled by processGraphChanges, no need to requeue this node |
| return forgetItem |
| } else if err == namespacedOwnerOfClusterScopedObjectErr { |
| // a cluster-scoped object referring to a namespaced owner is an error that will not resolve on retry, no need to requeue this node |
| return forgetItem |
| } else if err != nil { |
| if _, ok := err.(*restMappingError); ok { |
| // There are at least two ways this can happen: |
| // 1. The reference is to an object of a custom type that has not yet been |
| // recognized by gc.restMapper (this is a transient error). |
| // 2. The reference is to an invalid group/version. We don't currently |
| // have a way to distinguish this from a valid type we will recognize |
| // after the next discovery sync. |
| // For now, record the error and retry. |
| logger.V(5).Error(err, "error syncing item", "item", n.identity) |
| } else { |
| utilruntime.HandleError(fmt.Errorf("error syncing item %s: %v", n, err)) |
| } |
| // retry if garbage collection of an object failed. |
| return requeueItem |
| } else if !n.isObserved() { |
| // requeue if item hasn't been observed via an informer event yet. |
| // otherwise a virtual node for an item added AND removed during watch reestablishment can get stuck in the graph and never removed. |
| // see https://issue.k8s.io/56121 |
| logger.V(5).Info("item hasn't been observed via informer yet", "item", n.identity) |
| return requeueItem |
| } |
| |
| return forgetItem |
| } |
| |
// isDangling checks if a reference is pointing to an object that doesn't exist.
// If isDangling looks up the referenced object at the API server, it also
// returns its latest state.
//
// The lookup proceeds in this order:
//  1. absentOwnerCache, first keyed as a cluster-scoped owner, then as an owner
//     in item's namespace. A hit reports dangling without contacting the server.
//  2. reference.APIVersion/Kind is resolved to a resource via gc.apiResource.
//     A resolution error is returned as-is.
//  3. A cluster-scoped item that refers to a namespaced owner yields
//     namespacedOwnerOfClusterScopedObjectErr.
//  4. The owner is fetched through metadataClient. A NotFound error, or an
//     object whose UID differs from reference.UID, records the owner in
//     absentOwnerCache and reports dangling. Other errors are returned.
//
// owner is non-nil only when dangling is false and err is nil.
func (gc *GarbageCollector) isDangling(ctx context.Context, reference metav1.OwnerReference, item *node) (
	dangling bool, owner *metav1.PartialObjectMetadata, err error) {

	logger := klog.FromContext(ctx)
	// check for recorded absent cluster-scoped parent
	absentOwnerCacheKey := objectReference{OwnerReference: ownerReferenceCoordinates(reference)}
	if gc.absentOwnerCache.Has(absentOwnerCacheKey) {
		logger.V(5).Info("according to the absentOwnerCache, item's owner does not exist",
			"item", item.identity,
			"owner", reference,
		)
		return true, nil, nil
	}

	// check for recorded absent namespaced parent
	absentOwnerCacheKey.Namespace = item.identity.Namespace
	if gc.absentOwnerCache.Has(absentOwnerCacheKey) {
		logger.V(5).Info("according to the absentOwnerCache, item's owner does not exist in namespace",
			"item", item.identity,
			"owner", reference,
		)
		return true, nil, nil
	}

	// TODO: we need to verify the reference resource is supported by the
	// system. If it's not a valid resource, the garbage collector should i)
	// ignore the reference when decide if the object should be deleted, and
	// ii) should update the object to remove such references. This is to
	// prevent objects having references to an old resource from being
	// deleted during a cluster upgrade.
	resource, namespaced, err := gc.apiResource(reference.APIVersion, reference.Kind)
	if err != nil {
		return false, nil, err
	}
	// A cluster-scoped owner is cached without a namespace, matching the
	// first absentOwnerCache lookup above.
	if !namespaced {
		absentOwnerCacheKey.Namespace = ""
	}

	if len(item.identity.Namespace) == 0 && namespaced {
		// item is a cluster-scoped object referring to a namespace-scoped owner, which is not valid.
		// return a marker error, rather than retrying on the lookup failure forever.
		logger.V(2).Info("item is cluster-scoped, but refers to a namespaced owner",
			"item", item.identity,
			"owner", reference,
		)
		return false, nil, namespacedOwnerOfClusterScopedObjectErr
	}

	// TODO: It's only necessary to talk to the API server if the owner node
	// is a "virtual" node. The local graph could lag behind the real
	// status, but in practice, the difference is small.
	owner, err = gc.metadataClient.Resource(resource).Namespace(resourceDefaultNamespace(namespaced, item.identity.Namespace)).Get(ctx, reference.Name, metav1.GetOptions{})
	switch {
	case errors.IsNotFound(err):
		gc.absentOwnerCache.Add(absentOwnerCacheKey)
		logger.V(5).Info("item's owner is not found",
			"item", item.identity,
			"owner", reference,
		)
		return true, nil, nil
	case err != nil:
		return false, nil, err
	}

	// An object with the same name but a different UID is a different
	// (re-created) object, so the referenced owner is gone.
	if owner.GetUID() != reference.UID {
		logger.V(5).Info("item's owner is not found, UID mismatch",
			"item", item.identity,
			"owner", reference,
		)
		gc.absentOwnerCache.Add(absentOwnerCacheKey)
		return true, nil, nil
	}
	return false, owner, nil
}
| |
| // classify the latestReferences to three categories: |
| // solid: the owner exists, and is not "waitingForDependentsDeletion" |
| // dangling: the owner does not exist |
| // waitingForDependentsDeletion: the owner exists, its deletionTimestamp is non-nil, and it has |
| // FinalizerDeletingDependents |
| // This function communicates with the server. |
| func (gc *GarbageCollector) classifyReferences(ctx context.Context, item *node, latestReferences []metav1.OwnerReference) ( |
| solid, dangling, waitingForDependentsDeletion []metav1.OwnerReference, err error) { |
| for _, reference := range latestReferences { |
| isDangling, owner, err := gc.isDangling(ctx, reference, item) |
| if err != nil { |
| return nil, nil, nil, err |
| } |
| if isDangling { |
| dangling = append(dangling, reference) |
| continue |
| } |
| |
| ownerAccessor, err := meta.Accessor(owner) |
| if err != nil { |
| return nil, nil, nil, err |
| } |
| if ownerAccessor.GetDeletionTimestamp() != nil && hasDeleteDependentsFinalizer(ownerAccessor) { |
| waitingForDependentsDeletion = append(waitingForDependentsDeletion, reference) |
| } else { |
| solid = append(solid, reference) |
| } |
| } |
| return solid, dangling, waitingForDependentsDeletion, nil |
| } |
| |
| func ownerRefsToUIDs(refs []metav1.OwnerReference) []types.UID { |
| var ret []types.UID |
| for _, ref := range refs { |
| ret = append(ret, ref.UID) |
| } |
| return ret |
| } |
| |
// attemptToDeleteItem looks up the live API object associated with the node,
// and issues a delete IFF the uid matches, the item is not blocked on deleting dependents,
// and all owner references are dangling.
//
// if the API get request returns a NotFound error, or the retrieved item's uid does not match,
// a virtual delete event for the node is enqueued and enqueuedVirtualDeleteEventErr is returned.
//
// Other outcomes, in order of evaluation:
//   - an item that is being deleted but is not deleting its dependents is left
//     alone (nil), before any API call;
//   - an item that is deleting its dependents is handed to
//     processDeletingDependentsItem;
//   - an item without owner references is left alone (nil);
//   - if any owner is solid, all dangling and waiting-for-dependents-deletion
//     references (if any) are patched out of the item's ownerReferences;
//   - if some owner is waiting for dependents deletion and the item itself has
//     dependents, the item is deleted with Foreground propagation. Its
//     ownerReferences are first made non-blocking when one of its dependents
//     is also deleting dependents;
//   - otherwise the item is deleted. An existing orphan or foreground finalizer
//     decides the propagation policy, and Background is the default.
func (gc *GarbageCollector) attemptToDeleteItem(ctx context.Context, item *node) error {
	logger := klog.FromContext(ctx)

	logger.V(2).Info("Processing item",
		"item", item.identity,
		"virtual", !item.isObserved(),
	)

	// "being deleted" is an one-way trip to the final deletion. We'll just wait for the final deletion, and then process the object's dependents.
	if item.isBeingDeleted() && !item.isDeletingDependents() {
		logger.V(5).Info("processing item returned at once, because its DeletionTimestamp is non-nil",
			"item", item.identity,
		)
		return nil
	}
	// TODO: It's only necessary to talk to the API server if this is a
	// "virtual" node. The local graph could lag behind the real status, but in
	// practice, the difference is small.
	latest, err := gc.getObject(item.identity)
	switch {
	case errors.IsNotFound(err):
		// the GraphBuilder can add "virtual" node for an owner that doesn't
		// exist yet, so we need to enqueue a virtual Delete event to remove
		// the virtual node from GraphBuilder.uidToNode.
		logger.V(5).Info("item not found, generating a virtual delete event",
			"item", item.identity,
		)
		gc.dependencyGraphBuilder.enqueueVirtualDeleteEvent(item.identity)
		return enqueuedVirtualDeleteEventErr
	case err != nil:
		return err
	}

	// A live object with a different UID is a re-created object, not the one
	// this node tracks.
	if latest.GetUID() != item.identity.UID {
		logger.V(5).Info("UID doesn't match, item not found, generating a virtual delete event",
			"item", item.identity,
		)
		gc.dependencyGraphBuilder.enqueueVirtualDeleteEvent(item.identity)
		return enqueuedVirtualDeleteEventErr
	}

	// TODO: attemptToOrphanWorker() routine is similar. Consider merging
	// attemptToOrphanWorker() into attemptToDeleteItem() as well.
	if item.isDeletingDependents() {
		return gc.processDeletingDependentsItem(logger, item)
	}

	// compute if we should delete the item
	ownerReferences := latest.GetOwnerReferences()
	if len(ownerReferences) == 0 {
		logger.V(2).Info("item doesn't have an owner, continue on next item",
			"item", item.identity,
		)
		return nil
	}

	solid, dangling, waitingForDependentsDeletion, err := gc.classifyReferences(ctx, item, ownerReferences)
	if err != nil {
		return err
	}
	logger.V(5).Info("classify item's references",
		"item", item.identity,
		"solid", solid,
		"dangling", dangling,
		"waitingForDependentsDeletion", waitingForDependentsDeletion,
	)

	switch {
	case len(solid) != 0:
		logger.V(2).Info("item has at least one existing owner, will not garbage collect",
			"item", item.identity,
			"owner", solid,
		)
		if len(dangling) == 0 && len(waitingForDependentsDeletion) == 0 {
			return nil
		}
		logger.V(2).Info("remove dangling references and waiting references for item",
			"item", item.identity,
			"dangling", dangling,
			"waitingForDependentsDeletion", waitingForDependentsDeletion,
		)
		// waitingForDependentsDeletion needs to be deleted from the
		// ownerReferences, otherwise the referenced objects will be stuck with
		// the FinalizerDeletingDependents and never get deleted.
		ownerUIDs := append(ownerRefsToUIDs(dangling), ownerRefsToUIDs(waitingForDependentsDeletion)...)
		p, err := c.GenerateDeleteOwnerRefStrategicMergeBytes(item.identity.UID, ownerUIDs)
		if err != nil {
			return err
		}
		// The callback builds a JSON merge patch for the same change; it is
		// passed to gc.patch alongside the strategic merge patch.
		_, err = gc.patch(item, p, func(n *node) ([]byte, error) {
			return gc.deleteOwnerRefJSONMergePatch(n, ownerUIDs...)
		})
		return err
	case len(waitingForDependentsDeletion) != 0 && item.dependentsLength() != 0:
		deps := item.getDependents()
		for _, dep := range deps {
			if dep.isDeletingDependents() {
				// this circle detection has false positives, we need to
				// apply a more rigorous detection if this turns out to be a
				// problem.
				// there are multiple workers run attemptToDeleteItem in
				// parallel, the circle detection can fail in a race condition.
				logger.V(2).Info("processing item, some of its owners and its dependent have FinalizerDeletingDependents, to prevent potential cycle, its ownerReferences are going to be modified to be non-blocking, then the item is going to be deleted with Foreground",
					"item", item.identity,
					"dependent", dep.identity,
				)
				patch, err := item.unblockOwnerReferencesStrategicMergePatch()
				if err != nil {
					return err
				}
				if _, err := gc.patch(item, patch, gc.unblockOwnerReferencesJSONMergePatch); err != nil {
					return err
				}
				// One unblocking patch is enough; this break exits the for loop.
				break
			}
		}
		logger.V(2).Info("at least one owner of item has FinalizerDeletingDependents, and the item itself has dependents, so it is going to be deleted in Foreground",
			"item", item.identity,
		)
		// the deletion event will be observed by the graphBuilder, so the item
		// will be processed again in processDeletingDependentsItem. If it
		// doesn't have dependents, the function will remove the
		// FinalizerDeletingDependents from the item, resulting in the final
		// deletion of the item.
		policy := metav1.DeletePropagationForeground
		return gc.deleteObject(item.identity, &policy)
	default:
		// item doesn't have any solid owner, so it needs to be garbage
		// collected. Also, none of item's owners is waiting for the deletion of
		// the dependents, so set propagationPolicy based on existing finalizers.
		var policy metav1.DeletionPropagation
		switch {
		case hasOrphanFinalizer(latest):
			// if an existing orphan finalizer is already on the object, honor it.
			policy = metav1.DeletePropagationOrphan
		case hasDeleteDependentsFinalizer(latest):
			// if an existing foreground finalizer is already on the object, honor it.
			policy = metav1.DeletePropagationForeground
		default:
			// otherwise, default to background.
			policy = metav1.DeletePropagationBackground
		}
		logger.V(2).Info("Deleting item",
			"item", item.identity,
			"propagationPolicy", policy,
		)
		return gc.deleteObject(item.identity, &policy)
	}
}
| |
| // process item that's waiting for its dependents to be deleted |
| func (gc *GarbageCollector) processDeletingDependentsItem(logger klog.Logger, item *node) error { |
| blockingDependents := item.blockingDependents() |
| if len(blockingDependents) == 0 { |
| logger.V(2).Info("remove DeleteDependents finalizer for item", "item", item.identity) |
| return gc.removeFinalizer(logger, item, metav1.FinalizerDeleteDependents) |
| } |
| for _, dep := range blockingDependents { |
| if !dep.isDeletingDependents() { |
| logger.V(2).Info("adding dependent to attemptToDelete, because its owner is deletingDependents", |
| "item", item.identity, |
| "dependent", dep.identity, |
| ) |
| gc.attemptToDelete.Add(dep) |
| } |
| } |
| return nil |
| } |
| |
| // dependents are copies of pointers to the owner's dependents, they don't need to be locked. |
| func (gc *GarbageCollector) orphanDependents(logger klog.Logger, owner objectReference, dependents []*node) error { |
| errCh := make(chan error, len(dependents)) |
| wg := sync.WaitGroup{} |
| wg.Add(len(dependents)) |
| for i := range dependents { |
| go func(dependent *node) { |
| defer wg.Done() |
| // the dependent.identity.UID is used as precondition |
| p, err := c.GenerateDeleteOwnerRefStrategicMergeBytes(dependent.identity.UID, []types.UID{owner.UID}) |
| if err != nil { |
| errCh <- fmt.Errorf("orphaning %s failed, %v", dependent.identity, err) |
| return |
| } |
| _, err = gc.patch(dependent, p, func(n *node) ([]byte, error) { |
| return gc.deleteOwnerRefJSONMergePatch(n, owner.UID) |
| }) |
| // note that if the target ownerReference doesn't exist in the |
| // dependent, strategic merge patch will NOT return an error. |
| if err != nil && !errors.IsNotFound(err) { |
| errCh <- fmt.Errorf("orphaning %s failed, %v", dependent.identity, err) |
| } |
| }(dependents[i]) |
| } |
| wg.Wait() |
| close(errCh) |
| |
| var errorsSlice []error |
| for e := range errCh { |
| errorsSlice = append(errorsSlice, e) |
| } |
| |
| if len(errorsSlice) != 0 { |
| return fmt.Errorf("failed to orphan dependents of owner %s, got errors: %s", owner, utilerrors.NewAggregate(errorsSlice).Error()) |
| } |
| logger.V(5).Info("successfully updated all dependents", "owner", owner) |
| return nil |
| } |
| |
| func (gc *GarbageCollector) runAttemptToOrphanWorker(logger klog.Logger) { |
| for gc.processAttemptToOrphanWorker(logger) { |
| } |
| } |
| |
| // processAttemptToOrphanWorker dequeues a node from the attemptToOrphan, then finds its |
| // dependents based on the graph maintained by the GC, then removes it from the |
| // OwnerReferences of its dependents, and finally updates the owner to remove |
| // the "Orphan" finalizer. The node is added back into the attemptToOrphan if any of |
| // these steps fail. |
| func (gc *GarbageCollector) processAttemptToOrphanWorker(logger klog.Logger) bool { |
| item, quit := gc.attemptToOrphan.Get() |
| gc.workerLock.RLock() |
| defer gc.workerLock.RUnlock() |
| if quit { |
| return false |
| } |
| defer gc.attemptToOrphan.Done(item) |
| |
| action := gc.attemptToOrphanWorker(logger, item) |
| switch action { |
| case forgetItem: |
| gc.attemptToOrphan.Forget(item) |
| case requeueItem: |
| gc.attemptToOrphan.AddRateLimited(item) |
| } |
| |
| return true |
| } |
| |
| func (gc *GarbageCollector) attemptToOrphanWorker(logger klog.Logger, item interface{}) workQueueItemAction { |
| owner, ok := item.(*node) |
| if !ok { |
| utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", item)) |
| return forgetItem |
| } |
| // we don't need to lock each element, because they never get updated |
| owner.dependentsLock.RLock() |
| dependents := make([]*node, 0, len(owner.dependents)) |
| for dependent := range owner.dependents { |
| dependents = append(dependents, dependent) |
| } |
| owner.dependentsLock.RUnlock() |
| |
| err := gc.orphanDependents(logger, owner.identity, dependents) |
| if err != nil { |
| utilruntime.HandleError(fmt.Errorf("orphanDependents for %s failed with %v", owner.identity, err)) |
| return requeueItem |
| } |
| // update the owner, remove "orphaningFinalizer" from its finalizers list |
| err = gc.removeFinalizer(logger, owner, metav1.FinalizerOrphanDependents) |
| if err != nil { |
| utilruntime.HandleError(fmt.Errorf("removeOrphanFinalizer for %s failed with %v", owner.identity, err)) |
| return requeueItem |
| } |
| return forgetItem |
| } |
| |
| // *FOR TEST USE ONLY* |
| // GraphHasUID returns if the GraphBuilder has a particular UID store in its |
| // uidToNode graph. It's useful for debugging. |
| // This method is used by integration tests. |
| func (gc *GarbageCollector) GraphHasUID(u types.UID) bool { |
| _, ok := gc.dependencyGraphBuilder.uidToNode.Read(u) |
| return ok |
| } |
| |
| // GetDeletableResources returns all resources from discoveryClient that the |
| // garbage collector should recognize and work with. More specifically, all |
| // preferred resources which support the 'delete', 'list', and 'watch' verbs. |
| // |
| // If an error was encountered fetching resources from the server, |
| // it is included as well, along with any resources that were successfully resolved. |
| // |
| // All discovery errors are considered temporary. Upon encountering any error, |
| // GetDeletableResources will log and return any discovered resources it was |
| // able to process (which may be none). |
| func GetDeletableResources(logger klog.Logger, discoveryClient discovery.ServerResourcesInterface) (map[schema.GroupVersionResource]struct{}, error) { |
| preferredResources, lookupErr := discoveryClient.ServerPreferredResources() |
| if lookupErr != nil { |
| if groupLookupFailures, isLookupFailure := discovery.GroupDiscoveryFailedErrorGroups(lookupErr); isLookupFailure { |
| logger.Info("failed to discover some groups", "groups", groupLookupFailures) |
| } else { |
| logger.Info("failed to discover preferred resources", "error", lookupErr) |
| } |
| } |
| if preferredResources == nil { |
| return map[schema.GroupVersionResource]struct{}{}, lookupErr |
| } |
| |
| // This is extracted from discovery.GroupVersionResources to allow tolerating |
| // failures on a per-resource basis. |
| deletableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete", "list", "watch"}}, preferredResources) |
| deletableGroupVersionResources := map[schema.GroupVersionResource]struct{}{} |
| for _, rl := range deletableResources { |
| gv, err := schema.ParseGroupVersion(rl.GroupVersion) |
| if err != nil { |
| logger.Info("ignoring invalid discovered resource", "groupversion", rl.GroupVersion, "error", err) |
| continue |
| } |
| for i := range rl.APIResources { |
| deletableGroupVersionResources[schema.GroupVersionResource{Group: gv.Group, Version: gv.Version, Resource: rl.APIResources[i].Name}] = struct{}{} |
| } |
| } |
| |
| return deletableGroupVersionResources, lookupErr |
| } |
| |
| func (gc *GarbageCollector) Name() string { |
| return "garbagecollector" |
| } |
| |
| // GetDependencyGraphBuilder return graph builder which is particularly helpful for testing where controllerContext is not available |
| func (gc *GarbageCollector) GetDependencyGraphBuilder() *GraphBuilder { |
| return gc.dependencyGraphBuilder |
| } |