| /* |
| Copyright 2015 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| // Package deployment contains all the logic for handling Kubernetes Deployments. |
| // It implements a set of strategies (rolling, recreate) for deploying an application, |
| // the means to rollback to previous versions, proportional scaling for mitigating |
| // risk, cleanup policy, and other useful features of Deployments. |
| package deployment |
| |
| import ( |
| "context" |
| "fmt" |
| "reflect" |
| "time" |
| |
| apps "k8s.io/api/apps/v1" |
| v1 "k8s.io/api/core/v1" |
| "k8s.io/apimachinery/pkg/api/errors" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| "k8s.io/apimachinery/pkg/labels" |
| "k8s.io/apimachinery/pkg/types" |
| utilruntime "k8s.io/apimachinery/pkg/util/runtime" |
| "k8s.io/apimachinery/pkg/util/wait" |
| appsinformers "k8s.io/client-go/informers/apps/v1" |
| coreinformers "k8s.io/client-go/informers/core/v1" |
| clientset "k8s.io/client-go/kubernetes" |
| "k8s.io/client-go/kubernetes/scheme" |
| v1core "k8s.io/client-go/kubernetes/typed/core/v1" |
| appslisters "k8s.io/client-go/listers/apps/v1" |
| corelisters "k8s.io/client-go/listers/core/v1" |
| "k8s.io/client-go/tools/cache" |
| "k8s.io/client-go/tools/record" |
| "k8s.io/client-go/util/workqueue" |
| "k8s.io/klog/v2" |
| "k8s.io/kubernetes/pkg/controller" |
| "k8s.io/kubernetes/pkg/controller/deployment/util" |
| ) |
| |
// maxRetries caps how many times a failing deployment is requeued before it is
// dropped out of the queue. With the rate limiter currently in use
// (5ms*2^(maxRetries-1)) the successive requeue delays are:
//
//	5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
const maxRetries = 15
| |
| // controllerKind contains the schema.GroupVersionKind for this controller type. |
| var controllerKind = apps.SchemeGroupVersion.WithKind("Deployment") |
| |
| // DeploymentController is responsible for synchronizing Deployment objects stored |
| // in the system with actual running replica sets and pods. |
| type DeploymentController struct { |
| // rsControl is used for adopting/releasing replica sets. |
| rsControl controller.RSControlInterface |
| client clientset.Interface |
| |
| eventBroadcaster record.EventBroadcaster |
| eventRecorder record.EventRecorder |
| |
| // To allow injection of syncDeployment for testing. |
| syncHandler func(ctx context.Context, dKey string) error |
| // used for unit testing |
| enqueueDeployment func(deployment *apps.Deployment) |
| |
| // dLister can list/get deployments from the shared informer's store |
| dLister appslisters.DeploymentLister |
| // rsLister can list/get replica sets from the shared informer's store |
| rsLister appslisters.ReplicaSetLister |
| // podLister can list/get pods from the shared informer's store |
| podLister corelisters.PodLister |
| |
| // dListerSynced returns true if the Deployment store has been synced at least once. |
| // Added as a member to the struct to allow injection for testing. |
| dListerSynced cache.InformerSynced |
| // rsListerSynced returns true if the ReplicaSet store has been synced at least once. |
| // Added as a member to the struct to allow injection for testing. |
| rsListerSynced cache.InformerSynced |
| // podListerSynced returns true if the pod store has been synced at least once. |
| // Added as a member to the struct to allow injection for testing. |
| podListerSynced cache.InformerSynced |
| |
| // Deployments that need to be synced |
| queue workqueue.RateLimitingInterface |
| } |
| |
| // NewDeploymentController creates a new DeploymentController. |
| func NewDeploymentController(ctx context.Context, dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) { |
| eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx)) |
| logger := klog.FromContext(ctx) |
| dc := &DeploymentController{ |
| client: client, |
| eventBroadcaster: eventBroadcaster, |
| eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}), |
| queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"), |
| } |
| dc.rsControl = controller.RealRSControl{ |
| KubeClient: client, |
| Recorder: dc.eventRecorder, |
| } |
| |
| dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ |
| AddFunc: func(obj interface{}) { |
| dc.addDeployment(logger, obj) |
| }, |
| UpdateFunc: func(oldObj, newObj interface{}) { |
| dc.updateDeployment(logger, oldObj, newObj) |
| }, |
| // This will enter the sync loop and no-op, because the deployment has been deleted from the store. |
| DeleteFunc: func(obj interface{}) { |
| dc.deleteDeployment(logger, obj) |
| }, |
| }) |
| rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ |
| AddFunc: func(obj interface{}) { |
| dc.addReplicaSet(logger, obj) |
| }, |
| UpdateFunc: func(oldObj, newObj interface{}) { |
| dc.updateReplicaSet(logger, oldObj, newObj) |
| }, |
| DeleteFunc: func(obj interface{}) { |
| dc.deleteReplicaSet(logger, obj) |
| }, |
| }) |
| podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ |
| DeleteFunc: func(obj interface{}) { |
| dc.deletePod(logger, obj) |
| }, |
| }) |
| |
| dc.syncHandler = dc.syncDeployment |
| dc.enqueueDeployment = dc.enqueue |
| |
| dc.dLister = dInformer.Lister() |
| dc.rsLister = rsInformer.Lister() |
| dc.podLister = podInformer.Lister() |
| dc.dListerSynced = dInformer.Informer().HasSynced |
| dc.rsListerSynced = rsInformer.Informer().HasSynced |
| dc.podListerSynced = podInformer.Informer().HasSynced |
| return dc, nil |
| } |
| |
| // Run begins watching and syncing. |
| func (dc *DeploymentController) Run(ctx context.Context, workers int) { |
| defer utilruntime.HandleCrash() |
| |
| // Start events processing pipeline. |
| dc.eventBroadcaster.StartStructuredLogging(3) |
| dc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dc.client.CoreV1().Events("")}) |
| defer dc.eventBroadcaster.Shutdown() |
| |
| defer dc.queue.ShutDown() |
| |
| logger := klog.FromContext(ctx) |
| logger.Info("Starting controller", "controller", "deployment") |
| defer logger.Info("Shutting down controller", "controller", "deployment") |
| |
| if !cache.WaitForNamedCacheSync("deployment", ctx.Done(), dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) { |
| return |
| } |
| |
| for i := 0; i < workers; i++ { |
| go wait.UntilWithContext(ctx, dc.worker, time.Second) |
| } |
| |
| <-ctx.Done() |
| } |
| |
| func (dc *DeploymentController) addDeployment(logger klog.Logger, obj interface{}) { |
| d := obj.(*apps.Deployment) |
| logger.V(4).Info("Adding deployment", "deployment", klog.KObj(d)) |
| dc.enqueueDeployment(d) |
| } |
| |
| func (dc *DeploymentController) updateDeployment(logger klog.Logger, old, cur interface{}) { |
| oldD := old.(*apps.Deployment) |
| curD := cur.(*apps.Deployment) |
| logger.V(4).Info("Updating deployment", "deployment", klog.KObj(oldD)) |
| dc.enqueueDeployment(curD) |
| } |
| |
| func (dc *DeploymentController) deleteDeployment(logger klog.Logger, obj interface{}) { |
| d, ok := obj.(*apps.Deployment) |
| if !ok { |
| tombstone, ok := obj.(cache.DeletedFinalStateUnknown) |
| if !ok { |
| utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) |
| return |
| } |
| d, ok = tombstone.Obj.(*apps.Deployment) |
| if !ok { |
| utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Deployment %#v", obj)) |
| return |
| } |
| } |
| logger.V(4).Info("Deleting deployment", "deployment", klog.KObj(d)) |
| dc.enqueueDeployment(d) |
| } |
| |
| // addReplicaSet enqueues the deployment that manages a ReplicaSet when the ReplicaSet is created. |
| func (dc *DeploymentController) addReplicaSet(logger klog.Logger, obj interface{}) { |
| rs := obj.(*apps.ReplicaSet) |
| |
| if rs.DeletionTimestamp != nil { |
| // On a restart of the controller manager, it's possible for an object to |
| // show up in a state that is already pending deletion. |
| dc.deleteReplicaSet(logger, rs) |
| return |
| } |
| // If it has a ControllerRef, that's all that matters. |
| if controllerRef := metav1.GetControllerOf(rs); controllerRef != nil { |
| d := dc.resolveControllerRef(rs.Namespace, controllerRef) |
| if d == nil { |
| return |
| } |
| logger.V(4).Info("ReplicaSet added", "replicaSet", klog.KObj(rs)) |
| dc.enqueueDeployment(d) |
| return |
| } |
| |
| // Otherwise, it's an orphan. Get a list of all matching Deployments and sync |
| // them to see if anyone wants to adopt it. |
| ds := dc.getDeploymentsForReplicaSet(logger, rs) |
| if len(ds) == 0 { |
| return |
| } |
| logger.V(4).Info("Orphan ReplicaSet added", "replicaSet", klog.KObj(rs)) |
| for _, d := range ds { |
| dc.enqueueDeployment(d) |
| } |
| } |
| |
| // getDeploymentsForReplicaSet returns a list of Deployments that potentially |
| // match a ReplicaSet. |
| func (dc *DeploymentController) getDeploymentsForReplicaSet(logger klog.Logger, rs *apps.ReplicaSet) []*apps.Deployment { |
| deployments, err := util.GetDeploymentsForReplicaSet(dc.dLister, rs) |
| if err != nil || len(deployments) == 0 { |
| return nil |
| } |
| // Because all ReplicaSet's belonging to a deployment should have a unique label key, |
| // there should never be more than one deployment returned by the above method. |
| // If that happens we should probably dynamically repair the situation by ultimately |
| // trying to clean up one of the controllers, for now we just return the older one |
| if len(deployments) > 1 { |
| // ControllerRef will ensure we don't do anything crazy, but more than one |
| // item in this list nevertheless constitutes user error. |
| logger.V(4).Info("user error! more than one deployment is selecting replica set", |
| "replicaSet", klog.KObj(rs), "labels", rs.Labels, "deployment", klog.KObj(deployments[0])) |
| } |
| return deployments |
| } |
| |
| // updateReplicaSet figures out what deployment(s) manage a ReplicaSet when the ReplicaSet |
| // is updated and wake them up. If the anything of the ReplicaSets have changed, we need to |
| // awaken both the old and new deployments. old and cur must be *apps.ReplicaSet |
| // types. |
| func (dc *DeploymentController) updateReplicaSet(logger klog.Logger, old, cur interface{}) { |
| curRS := cur.(*apps.ReplicaSet) |
| oldRS := old.(*apps.ReplicaSet) |
| if curRS.ResourceVersion == oldRS.ResourceVersion { |
| // Periodic resync will send update events for all known replica sets. |
| // Two different versions of the same replica set will always have different RVs. |
| return |
| } |
| |
| curControllerRef := metav1.GetControllerOf(curRS) |
| oldControllerRef := metav1.GetControllerOf(oldRS) |
| controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef) |
| if controllerRefChanged && oldControllerRef != nil { |
| // The ControllerRef was changed. Sync the old controller, if any. |
| if d := dc.resolveControllerRef(oldRS.Namespace, oldControllerRef); d != nil { |
| dc.enqueueDeployment(d) |
| } |
| } |
| // If it has a ControllerRef, that's all that matters. |
| if curControllerRef != nil { |
| d := dc.resolveControllerRef(curRS.Namespace, curControllerRef) |
| if d == nil { |
| return |
| } |
| logger.V(4).Info("ReplicaSet updated", "replicaSet", klog.KObj(curRS)) |
| dc.enqueueDeployment(d) |
| return |
| } |
| |
| // Otherwise, it's an orphan. If anything changed, sync matching controllers |
| // to see if anyone wants to adopt it now. |
| labelChanged := !reflect.DeepEqual(curRS.Labels, oldRS.Labels) |
| if labelChanged || controllerRefChanged { |
| ds := dc.getDeploymentsForReplicaSet(logger, curRS) |
| if len(ds) == 0 { |
| return |
| } |
| logger.V(4).Info("Orphan ReplicaSet updated", "replicaSet", klog.KObj(curRS)) |
| for _, d := range ds { |
| dc.enqueueDeployment(d) |
| } |
| } |
| } |
| |
| // deleteReplicaSet enqueues the deployment that manages a ReplicaSet when |
| // the ReplicaSet is deleted. obj could be an *apps.ReplicaSet, or |
| // a DeletionFinalStateUnknown marker item. |
| func (dc *DeploymentController) deleteReplicaSet(logger klog.Logger, obj interface{}) { |
| rs, ok := obj.(*apps.ReplicaSet) |
| |
| // When a delete is dropped, the relist will notice a pod in the store not |
| // in the list, leading to the insertion of a tombstone object which contains |
| // the deleted key/value. Note that this value might be stale. If the ReplicaSet |
| // changed labels the new deployment will not be woken up till the periodic resync. |
| if !ok { |
| tombstone, ok := obj.(cache.DeletedFinalStateUnknown) |
| if !ok { |
| utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) |
| return |
| } |
| rs, ok = tombstone.Obj.(*apps.ReplicaSet) |
| if !ok { |
| utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a ReplicaSet %#v", obj)) |
| return |
| } |
| } |
| |
| controllerRef := metav1.GetControllerOf(rs) |
| if controllerRef == nil { |
| // No controller should care about orphans being deleted. |
| return |
| } |
| d := dc.resolveControllerRef(rs.Namespace, controllerRef) |
| if d == nil { |
| return |
| } |
| logger.V(4).Info("ReplicaSet deleted", "replicaSet", klog.KObj(rs)) |
| dc.enqueueDeployment(d) |
| } |
| |
| // deletePod will enqueue a Recreate Deployment once all of its pods have stopped running. |
| func (dc *DeploymentController) deletePod(logger klog.Logger, obj interface{}) { |
| pod, ok := obj.(*v1.Pod) |
| |
| // When a delete is dropped, the relist will notice a pod in the store not |
| // in the list, leading to the insertion of a tombstone object which contains |
| // the deleted key/value. Note that this value might be stale. If the Pod |
| // changed labels the new deployment will not be woken up till the periodic resync. |
| if !ok { |
| tombstone, ok := obj.(cache.DeletedFinalStateUnknown) |
| if !ok { |
| utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) |
| return |
| } |
| pod, ok = tombstone.Obj.(*v1.Pod) |
| if !ok { |
| utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj)) |
| return |
| } |
| } |
| d := dc.getDeploymentForPod(logger, pod) |
| if d == nil { |
| return |
| } |
| logger.V(4).Info("Pod deleted", "pod", klog.KObj(pod)) |
| if d.Spec.Strategy.Type == apps.RecreateDeploymentStrategyType { |
| // Sync if this Deployment now has no more Pods. |
| rsList, err := util.ListReplicaSets(d, util.RsListFromClient(dc.client.AppsV1())) |
| if err != nil { |
| return |
| } |
| podMap, err := dc.getPodMapForDeployment(d, rsList) |
| if err != nil { |
| return |
| } |
| numPods := 0 |
| for _, podList := range podMap { |
| numPods += len(podList) |
| } |
| if numPods == 0 { |
| dc.enqueueDeployment(d) |
| } |
| } |
| } |
| |
| func (dc *DeploymentController) enqueue(deployment *apps.Deployment) { |
| key, err := controller.KeyFunc(deployment) |
| if err != nil { |
| utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, err)) |
| return |
| } |
| |
| dc.queue.Add(key) |
| } |
| |
| func (dc *DeploymentController) enqueueRateLimited(deployment *apps.Deployment) { |
| key, err := controller.KeyFunc(deployment) |
| if err != nil { |
| utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, err)) |
| return |
| } |
| |
| dc.queue.AddRateLimited(key) |
| } |
| |
| // enqueueAfter will enqueue a deployment after the provided amount of time. |
| func (dc *DeploymentController) enqueueAfter(deployment *apps.Deployment, after time.Duration) { |
| key, err := controller.KeyFunc(deployment) |
| if err != nil { |
| utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, err)) |
| return |
| } |
| |
| dc.queue.AddAfter(key, after) |
| } |
| |
| // getDeploymentForPod returns the deployment managing the given Pod. |
| func (dc *DeploymentController) getDeploymentForPod(logger klog.Logger, pod *v1.Pod) *apps.Deployment { |
| // Find the owning replica set |
| var rs *apps.ReplicaSet |
| var err error |
| controllerRef := metav1.GetControllerOf(pod) |
| if controllerRef == nil { |
| // No controller owns this Pod. |
| return nil |
| } |
| if controllerRef.Kind != apps.SchemeGroupVersion.WithKind("ReplicaSet").Kind { |
| // Not a pod owned by a replica set. |
| return nil |
| } |
| rs, err = dc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name) |
| if err != nil || rs.UID != controllerRef.UID { |
| logger.V(4).Info("Cannot get replicaset for pod", "ownerReference", controllerRef.Name, "pod", klog.KObj(pod), "err", err) |
| return nil |
| } |
| |
| // Now find the Deployment that owns that ReplicaSet. |
| controllerRef = metav1.GetControllerOf(rs) |
| if controllerRef == nil { |
| return nil |
| } |
| return dc.resolveControllerRef(rs.Namespace, controllerRef) |
| } |
| |
| // resolveControllerRef returns the controller referenced by a ControllerRef, |
| // or nil if the ControllerRef could not be resolved to a matching controller |
| // of the correct Kind. |
| func (dc *DeploymentController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.Deployment { |
| // We can't look up by UID, so look up by Name and then verify UID. |
| // Don't even try to look up by Name if it's the wrong Kind. |
| if controllerRef.Kind != controllerKind.Kind { |
| return nil |
| } |
| d, err := dc.dLister.Deployments(namespace).Get(controllerRef.Name) |
| if err != nil { |
| return nil |
| } |
| if d.UID != controllerRef.UID { |
| // The controller we found with this Name is not the same one that the |
| // ControllerRef points to. |
| return nil |
| } |
| return d |
| } |
| |
| // worker runs a worker thread that just dequeues items, processes them, and marks them done. |
| // It enforces that the syncHandler is never invoked concurrently with the same key. |
| func (dc *DeploymentController) worker(ctx context.Context) { |
| for dc.processNextWorkItem(ctx) { |
| } |
| } |
| |
| func (dc *DeploymentController) processNextWorkItem(ctx context.Context) bool { |
| key, quit := dc.queue.Get() |
| if quit { |
| return false |
| } |
| defer dc.queue.Done(key) |
| |
| err := dc.syncHandler(ctx, key.(string)) |
| dc.handleErr(ctx, err, key) |
| |
| return true |
| } |
| |
| func (dc *DeploymentController) handleErr(ctx context.Context, err error, key interface{}) { |
| logger := klog.FromContext(ctx) |
| if err == nil || errors.HasStatusCause(err, v1.NamespaceTerminatingCause) { |
| dc.queue.Forget(key) |
| return |
| } |
| ns, name, keyErr := cache.SplitMetaNamespaceKey(key.(string)) |
| if keyErr != nil { |
| logger.Error(err, "Failed to split meta namespace cache key", "cacheKey", key) |
| } |
| |
| if dc.queue.NumRequeues(key) < maxRetries { |
| logger.V(2).Info("Error syncing deployment", "deployment", klog.KRef(ns, name), "err", err) |
| dc.queue.AddRateLimited(key) |
| return |
| } |
| |
| utilruntime.HandleError(err) |
| logger.V(2).Info("Dropping deployment out of the queue", "deployment", klog.KRef(ns, name), "err", err) |
| dc.queue.Forget(key) |
| } |
| |
| // getReplicaSetsForDeployment uses ControllerRefManager to reconcile |
| // ControllerRef by adopting and orphaning. |
| // It returns the list of ReplicaSets that this Deployment should manage. |
| func (dc *DeploymentController) getReplicaSetsForDeployment(ctx context.Context, d *apps.Deployment) ([]*apps.ReplicaSet, error) { |
| // List all ReplicaSets to find those we own but that no longer match our |
| // selector. They will be orphaned by ClaimReplicaSets(). |
| rsList, err := dc.rsLister.ReplicaSets(d.Namespace).List(labels.Everything()) |
| if err != nil { |
| return nil, err |
| } |
| deploymentSelector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector) |
| if err != nil { |
| return nil, fmt.Errorf("deployment %s/%s has invalid label selector: %v", d.Namespace, d.Name, err) |
| } |
| // If any adoptions are attempted, we should first recheck for deletion with |
| // an uncached quorum read sometime after listing ReplicaSets (see #42639). |
| canAdoptFunc := controller.RecheckDeletionTimestamp(func(ctx context.Context) (metav1.Object, error) { |
| fresh, err := dc.client.AppsV1().Deployments(d.Namespace).Get(ctx, d.Name, metav1.GetOptions{}) |
| if err != nil { |
| return nil, err |
| } |
| if fresh.UID != d.UID { |
| return nil, fmt.Errorf("original Deployment %v/%v is gone: got uid %v, wanted %v", d.Namespace, d.Name, fresh.UID, d.UID) |
| } |
| return fresh, nil |
| }) |
| cm := controller.NewReplicaSetControllerRefManager(dc.rsControl, d, deploymentSelector, controllerKind, canAdoptFunc) |
| return cm.ClaimReplicaSets(ctx, rsList) |
| } |
| |
| // getPodMapForDeployment returns the Pods managed by a Deployment. |
| // |
| // It returns a map from ReplicaSet UID to a list of Pods controlled by that RS, |
| // according to the Pod's ControllerRef. |
| // NOTE: The pod pointers returned by this method point the pod objects in the cache and thus |
| // shouldn't be modified in any way. |
| func (dc *DeploymentController) getPodMapForDeployment(d *apps.Deployment, rsList []*apps.ReplicaSet) (map[types.UID][]*v1.Pod, error) { |
| // Get all Pods that potentially belong to this Deployment. |
| selector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector) |
| if err != nil { |
| return nil, err |
| } |
| pods, err := dc.podLister.Pods(d.Namespace).List(selector) |
| if err != nil { |
| return nil, err |
| } |
| // Group Pods by their controller (if it's in rsList). |
| podMap := make(map[types.UID][]*v1.Pod, len(rsList)) |
| for _, rs := range rsList { |
| podMap[rs.UID] = []*v1.Pod{} |
| } |
| for _, pod := range pods { |
| // Do not ignore inactive Pods because Recreate Deployments need to verify that no |
| // Pods from older versions are running before spinning up new Pods. |
| controllerRef := metav1.GetControllerOf(pod) |
| if controllerRef == nil { |
| continue |
| } |
| // Only append if we care about this UID. |
| if _, ok := podMap[controllerRef.UID]; ok { |
| podMap[controllerRef.UID] = append(podMap[controllerRef.UID], pod) |
| } |
| } |
| return podMap, nil |
| } |
| |
| // syncDeployment will sync the deployment with the given key. |
| // This function is not meant to be invoked concurrently with the same key. |
| func (dc *DeploymentController) syncDeployment(ctx context.Context, key string) error { |
| logger := klog.FromContext(ctx) |
| namespace, name, err := cache.SplitMetaNamespaceKey(key) |
| if err != nil { |
| logger.Error(err, "Failed to split meta namespace cache key", "cacheKey", key) |
| return err |
| } |
| |
| startTime := time.Now() |
| logger.V(4).Info("Started syncing deployment", "deployment", klog.KRef(namespace, name), "startTime", startTime) |
| defer func() { |
| logger.V(4).Info("Finished syncing deployment", "deployment", klog.KRef(namespace, name), "duration", time.Since(startTime)) |
| }() |
| |
| deployment, err := dc.dLister.Deployments(namespace).Get(name) |
| if errors.IsNotFound(err) { |
| logger.V(2).Info("Deployment has been deleted", "deployment", klog.KRef(namespace, name)) |
| return nil |
| } |
| if err != nil { |
| return err |
| } |
| |
| // Deep-copy otherwise we are mutating our cache. |
| // TODO: Deep-copy only when needed. |
| d := deployment.DeepCopy() |
| |
| everything := metav1.LabelSelector{} |
| if reflect.DeepEqual(d.Spec.Selector, &everything) { |
| dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.") |
| if d.Status.ObservedGeneration < d.Generation { |
| d.Status.ObservedGeneration = d.Generation |
| dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}) |
| } |
| return nil |
| } |
| |
| // List ReplicaSets owned by this Deployment, while reconciling ControllerRef |
| // through adoption/orphaning. |
| rsList, err := dc.getReplicaSetsForDeployment(ctx, d) |
| if err != nil { |
| return err |
| } |
| // List all Pods owned by this Deployment, grouped by their ReplicaSet. |
| // Current uses of the podMap are: |
| // |
| // * check if a Pod is labeled correctly with the pod-template-hash label. |
| // * check that no old Pods are running in the middle of Recreate Deployments. |
| podMap, err := dc.getPodMapForDeployment(d, rsList) |
| if err != nil { |
| return err |
| } |
| |
| if d.DeletionTimestamp != nil { |
| return dc.syncStatusOnly(ctx, d, rsList) |
| } |
| |
| // Update deployment conditions with an Unknown condition when pausing/resuming |
| // a deployment. In this way, we can be sure that we won't timeout when a user |
| // resumes a Deployment with a set progressDeadlineSeconds. |
| if err = dc.checkPausedConditions(ctx, d); err != nil { |
| return err |
| } |
| |
| if d.Spec.Paused { |
| return dc.sync(ctx, d, rsList) |
| } |
| |
| // rollback is not re-entrant in case the underlying replica sets are updated with a new |
| // revision so we should ensure that we won't proceed to update replica sets until we |
| // make sure that the deployment has cleaned up its rollback spec in subsequent enqueues. |
| if getRollbackTo(d) != nil { |
| return dc.rollback(ctx, d, rsList) |
| } |
| |
| scalingEvent, err := dc.isScalingEvent(ctx, d, rsList) |
| if err != nil { |
| return err |
| } |
| if scalingEvent { |
| return dc.sync(ctx, d, rsList) |
| } |
| |
| switch d.Spec.Strategy.Type { |
| case apps.RecreateDeploymentStrategyType: |
| return dc.rolloutRecreate(ctx, d, rsList, podMap) |
| case apps.RollingUpdateDeploymentStrategyType: |
| return dc.rolloutRolling(ctx, d, rsList) |
| } |
| return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type) |
| } |