| /* |
| Copyright 2016 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package nestedpendingoperations |
| |
| import ( |
| "fmt" |
| "testing" |
| "time" |
| |
| v1 "k8s.io/api/core/v1" |
| "k8s.io/apimachinery/pkg/types" |
| "k8s.io/apimachinery/pkg/util/wait" |
| "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" |
| volumetypes "k8s.io/kubernetes/pkg/volume/util/types" |
| ) |
| |
| const ( |
| // testTimeout is a timeout of goroutines to finish. This _should_ be just a |
| // "context switch" and it should take several ms, however, Clayton says "We |
| // have had flakes due to tests that assumed that 15s is long enough to sleep") |
| testTimeout time.Duration = 1 * time.Minute |
| |
| // initialOperationWaitTimeShort is the initial amount of time the test will |
| // wait for an operation to complete (each successive failure results in |
| // exponential backoff). |
| initialOperationWaitTimeShort time.Duration = 20 * time.Millisecond |
| |
| // initialOperationWaitTimeLong is the initial amount of time the test will |
| // wait for an operation to complete (each successive failure results in |
| // exponential backoff). |
| initialOperationWaitTimeLong time.Duration = 500 * time.Millisecond |
| ) |
| |
| func Test_NestedPendingOperations_Positive_SingleOp(t *testing.T) { |
| // Arrange |
| grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) |
| volumeName := v1.UniqueVolumeName("volume-name") |
| |
| // Act |
| err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: noopFunc}) |
| |
| // Assert |
| if err != nil { |
| t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err) |
| } |
| } |
| |
| func Test_NestedPendingOperations_Positive_TwoOps(t *testing.T) { |
| // Arrange |
| grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) |
| volume1Name := v1.UniqueVolumeName("volume1-name") |
| volume2Name := v1.UniqueVolumeName("volume2-name") |
| |
| // Act |
| err1 := grm.Run(volume1Name, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: noopFunc}) |
| err2 := grm.Run(volume2Name, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: noopFunc}) |
| |
| // Assert |
| if err1 != nil { |
| t.Fatalf("NestedPendingOperations %q failed. Expected: <no error> Actual: <%v>", volume1Name, err1) |
| } |
| |
| if err2 != nil { |
| t.Fatalf("NestedPendingOperations %q failed. Expected: <no error> Actual: <%v>", volume2Name, err2) |
| } |
| } |
| |
| func Test_NestedPendingOperations_Positive_TwoSubOps(t *testing.T) { |
| // Arrange |
| grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) |
| volumeName := v1.UniqueVolumeName("volume-name") |
| operation1PodName := volumetypes.UniquePodName("operation1-podname") |
| operation2PodName := volumetypes.UniquePodName("operation2-podname") |
| |
| // Act |
| err1 := grm.Run(volumeName, operation1PodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: noopFunc}) |
| err2 := grm.Run(volumeName, operation2PodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: noopFunc}) |
| |
| // Assert |
| if err1 != nil { |
| t.Fatalf("NestedPendingOperations %q failed. Expected: <no error> Actual: <%v>", operation1PodName, err1) |
| } |
| |
| if err2 != nil { |
| t.Fatalf("NestedPendingOperations %q failed. Expected: <no error> Actual: <%v>", operation2PodName, err2) |
| } |
| } |
| |
| func Test_NestedPendingOperations_Positive_SingleOpWithExpBackoff(t *testing.T) { |
| // Arrange |
| grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) |
| volumeName := v1.UniqueVolumeName("volume-name") |
| |
| // Act |
| err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: noopFunc}) |
| |
| // Assert |
| if err != nil { |
| t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err) |
| } |
| } |
| |
| func Test_NestedPendingOperations_Positive_SecondOpAfterFirstCompletes(t *testing.T) { |
| // Arrange |
| grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) |
| volumeName := v1.UniqueVolumeName("volume-name") |
| operation1DoneCh := make(chan interface{}) |
| operation1 := generateCallbackFunc(operation1DoneCh) |
| err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1}) |
| if err1 != nil { |
| t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1) |
| } |
| operation2 := noopFunc |
| <-operation1DoneCh // Force operation1 to complete |
| |
| // Act |
| err2 := retryWithExponentialBackOff( |
| time.Duration(initialOperationWaitTimeShort), |
| func() (bool, error) { |
| err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2}) |
| if err != nil { |
| t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err) |
| return false, nil |
| } |
| return true, nil |
| }, |
| ) |
| |
| // Assert |
| if err2 != nil { |
| t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2) |
| } |
| } |
| |
| func Test_NestedPendingOperations_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t *testing.T) { |
| // Arrange |
| grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) |
| volumeName := v1.UniqueVolumeName("volume-name") |
| operation1DoneCh := make(chan interface{}) |
| operation1 := generateCallbackFunc(operation1DoneCh) |
| err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1}) |
| if err1 != nil { |
| t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1) |
| } |
| operation2 := noopFunc |
| <-operation1DoneCh // Force operation1 to complete |
| |
| // Act |
| err2 := retryWithExponentialBackOff( |
| time.Duration(initialOperationWaitTimeShort), |
| func() (bool, error) { |
| err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2}) |
| if err != nil { |
| t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err) |
| return false, nil |
| } |
| return true, nil |
| }, |
| ) |
| |
| // Assert |
| if err2 != nil { |
| t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2) |
| } |
| } |
| |
| func Test_NestedPendingOperations_Positive_SecondOpAfterFirstPanics(t *testing.T) { |
| // Arrange |
| grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) |
| volumeName := v1.UniqueVolumeName("volume-name") |
| operation1 := panicFunc |
| err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1}) |
| if err1 != nil { |
| t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1) |
| } |
| operation2 := noopFunc |
| |
| // Act |
| err2 := retryWithExponentialBackOff( |
| time.Duration(initialOperationWaitTimeShort), |
| func() (bool, error) { |
| err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2}) |
| if err != nil { |
| t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err) |
| return false, nil |
| } |
| return true, nil |
| }, |
| ) |
| |
| // Assert |
| if err2 != nil { |
| t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2) |
| } |
| } |
| |
| func Test_NestedPendingOperations_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *testing.T) { |
| // Arrange |
| grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) |
| volumeName := v1.UniqueVolumeName("volume-name") |
| operation1 := panicFunc |
| err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1}) |
| if err1 != nil { |
| t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1) |
| } |
| operation2 := noopFunc |
| |
| // Act |
| err2 := retryWithExponentialBackOff( |
| time.Duration(initialOperationWaitTimeLong), // Longer duration to accommodate for backoff |
| func() (bool, error) { |
| err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2}) |
| if err != nil { |
| t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err) |
| return false, nil |
| } |
| return true, nil |
| }, |
| ) |
| |
| // Assert |
| if err2 != nil { |
| t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2) |
| } |
| } |
| |
| func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes(t *testing.T) { |
| // Arrange |
| grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) |
| volumeName := v1.UniqueVolumeName("volume-name") |
| operation1DoneCh := make(chan interface{}) |
| operation1 := generateWaitFunc(operation1DoneCh) |
| err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1}) |
| if err1 != nil { |
| t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1) |
| } |
| operation2 := noopFunc |
| |
| // Act |
| err2 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2}) |
| |
| // Assert |
| if err2 == nil { |
| t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName) |
| } |
| if !IsAlreadyExists(err2) { |
| t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) |
| } |
| } |
| |
| func Test_NestedPendingOperations_Negative_SecondThirdOpWithDifferentNames(t *testing.T) { |
| // Arrange |
| grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) |
| volumeName := v1.UniqueVolumeName("volume-name") |
| op1Name := "mount_volume" |
| operation1 := errorFunc |
| err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1, OperationName: op1Name}) |
| if err1 != nil { |
| t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1) |
| } |
| // Shorter than exponential backoff period, so as to trigger exponential backoff error on second |
| // operation. |
| operation2 := errorFunc |
| err2 := retryWithExponentialBackOff( |
| initialOperationWaitTimeShort, |
| func() (bool, error) { |
| err := grm.Run(volumeName, |
| EmptyUniquePodName, |
| EmptyNodeName, |
| volumetypes.GeneratedOperations{OperationFunc: operation2, OperationName: op1Name}) |
| |
| if exponentialbackoff.IsExponentialBackoff(err) { |
| return true, nil |
| } |
| return false, nil |
| }, |
| ) |
| |
| // Assert |
| if err2 != nil { |
| t.Fatalf("Expected NestedPendingOperations to fail with exponential backoff for operationKey : %s and operationName : %s", volumeName, op1Name) |
| } |
| |
| operation3 := noopFunc |
| op3Name := "unmount_volume" |
| // Act |
| err3 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation3, OperationName: op3Name}) |
| if err3 != nil { |
| t.Fatalf("NestedPendingOperations failed. Expected <no error> Actual: <%v>", err3) |
| } |
| } |
| |
| func Test_NestedPendingOperations_Negative_SecondSubOpBeforeFirstCompletes2(t *testing.T) { |
| // Arrange |
| grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) |
| volumeName := v1.UniqueVolumeName("volume-name") |
| operationPodName := volumetypes.UniquePodName("operation-podname") |
| operation1DoneCh := make(chan interface{}) |
| operation1 := generateWaitFunc(operation1DoneCh) |
| err1 := grm.Run(volumeName, operationPodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1}) |
| if err1 != nil { |
| t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1) |
| } |
| operation2 := noopFunc |
| |
| // Act |
| err2 := grm.Run(volumeName, operationPodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2}) |
| |
| // Assert |
| if err2 == nil { |
| t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName) |
| } |
| if !IsAlreadyExists(err2) { |
| t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) |
| } |
| } |
| |
| func Test_NestedPendingOperations_Negative_SecondSubOpBeforeFirstCompletes(t *testing.T) { |
| // Arrange |
| grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) |
| volumeName := v1.UniqueVolumeName("volume-name") |
| operationPodName := volumetypes.UniquePodName("operation-podname") |
| operation1DoneCh := make(chan interface{}) |
| operation1 := generateWaitFunc(operation1DoneCh) |
| err1 := grm.Run(volumeName, operationPodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1}) |
| if err1 != nil { |
| t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1) |
| } |
| operation2 := noopFunc |
| |
| // Act |
| err2 := grm.Run(volumeName, operationPodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2}) |
| |
| // Assert |
| if err2 == nil { |
| t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName) |
| } |
| if !IsAlreadyExists(err2) { |
| t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) |
| } |
| } |
| |
| func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletesWithExpBackoff(t *testing.T) { |
| // Arrange |
| grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) |
| volumeName := v1.UniqueVolumeName("volume-name") |
| operation1DoneCh := make(chan interface{}) |
| operation1 := generateWaitFunc(operation1DoneCh) |
| err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1}) |
| if err1 != nil { |
| t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1) |
| } |
| operation2 := noopFunc |
| |
| // Act |
| err2 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2}) |
| |
| // Assert |
| if err2 == nil { |
| t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName) |
| } |
| if !IsAlreadyExists(err2) { |
| t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) |
| } |
| } |
| |
| func Test_NestedPendingOperations_Positive_ThirdOpAfterFirstCompletes(t *testing.T) { |
| // Arrange |
| grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) |
| volumeName := v1.UniqueVolumeName("volume-name") |
| operation1DoneCh := make(chan interface{}) |
| operation1 := generateWaitFunc(operation1DoneCh) |
| err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1}) |
| if err1 != nil { |
| t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1) |
| } |
| operation2 := noopFunc |
| operation3 := noopFunc |
| |
| // Act |
| err2 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2}) |
| |
| // Assert |
| if err2 == nil { |
| t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName) |
| } |
| if !IsAlreadyExists(err2) { |
| t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) |
| } |
| |
| // Act |
| operation1DoneCh <- true // Force operation1 to complete |
| err3 := retryWithExponentialBackOff( |
| time.Duration(initialOperationWaitTimeShort), |
| func() (bool, error) { |
| err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation3}) |
| if err != nil { |
| t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err) |
| return false, nil |
| } |
| return true, nil |
| }, |
| ) |
| |
| // Assert |
| if err3 != nil { |
| t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err3) |
| } |
| } |
| |
| func Test_NestedPendingOperations_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *testing.T) { |
| // Arrange |
| grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) |
| volumeName := v1.UniqueVolumeName("volume-name") |
| operation1DoneCh := make(chan interface{}) |
| operation1 := generateWaitFunc(operation1DoneCh) |
| err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1}) |
| if err1 != nil { |
| t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1) |
| } |
| operation2 := noopFunc |
| operation3 := noopFunc |
| |
| // Act |
| err2 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2}) |
| |
| // Assert |
| if err2 == nil { |
| t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName) |
| } |
| if !IsAlreadyExists(err2) { |
| t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) |
| } |
| |
| // Act |
| operation1DoneCh <- true // Force operation1 to complete |
| err3 := retryWithExponentialBackOff( |
| time.Duration(initialOperationWaitTimeShort), |
| func() (bool, error) { |
| err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation3}) |
| if err != nil { |
| t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err) |
| return false, nil |
| } |
| return true, nil |
| }, |
| ) |
| |
| // Assert |
| if err3 != nil { |
| t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err3) |
| } |
| } |
| |
| func Test_NestedPendingOperations_Positive_WaitEmpty(t *testing.T) { |
| // Test than Wait() on empty GoRoutineMap always succeeds without blocking |
| // Arrange |
| grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) |
| |
| // Act |
| waitDoneCh := make(chan interface{}, 1) |
| go func() { |
| grm.Wait() |
| waitDoneCh <- true |
| }() |
| |
| // Assert |
| err := waitChannelWithTimeout(waitDoneCh, testTimeout) |
| if err != nil { |
| t.Errorf("Error waiting for GoRoutineMap.Wait: %v", err) |
| } |
| } |
| |
| func Test_NestedPendingOperations_Positive_WaitEmptyWithExpBackoff(t *testing.T) { |
| // Test than Wait() on empty GoRoutineMap always succeeds without blocking |
| // Arrange |
| grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) |
| |
| // Act |
| waitDoneCh := make(chan interface{}, 1) |
| go func() { |
| grm.Wait() |
| waitDoneCh <- true |
| }() |
| |
| // Assert |
| err := waitChannelWithTimeout(waitDoneCh, testTimeout) |
| if err != nil { |
| t.Errorf("Error waiting for GoRoutineMap.Wait: %v", err) |
| } |
| } |
| |
| func Test_NestedPendingOperations_Positive_Wait(t *testing.T) { |
| // Test that Wait() really blocks until the last operation succeeds |
| // Arrange |
| grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) |
| volumeName := v1.UniqueVolumeName("volume-name") |
| operation1DoneCh := make(chan interface{}) |
| operation1 := generateWaitFunc(operation1DoneCh) |
| err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1}) |
| if err != nil { |
| t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err) |
| } |
| |
| // Act |
| waitDoneCh := make(chan interface{}, 1) |
| go func() { |
| grm.Wait() |
| waitDoneCh <- true |
| }() |
| |
| // Finish the operation |
| operation1DoneCh <- true |
| |
| // Assert |
| err = waitChannelWithTimeout(waitDoneCh, testTimeout) |
| if err != nil { |
| t.Fatalf("Error waiting for GoRoutineMap.Wait: %v", err) |
| } |
| } |
| |
| func Test_NestedPendingOperations_Positive_WaitWithExpBackoff(t *testing.T) { |
| // Test that Wait() really blocks until the last operation succeeds |
| // Arrange |
| grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) |
| volumeName := v1.UniqueVolumeName("volume-name") |
| operation1DoneCh := make(chan interface{}) |
| operation1 := generateWaitFunc(operation1DoneCh) |
| err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1}) |
| if err != nil { |
| t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err) |
| } |
| |
| // Act |
| waitDoneCh := make(chan interface{}, 1) |
| go func() { |
| grm.Wait() |
| waitDoneCh <- true |
| }() |
| |
| // Finish the operation |
| operation1DoneCh <- true |
| |
| // Assert |
| err = waitChannelWithTimeout(waitDoneCh, testTimeout) |
| if err != nil { |
| t.Fatalf("Error waiting for GoRoutineMap.Wait: %v", err) |
| } |
| } |
| |
| /* Concurrent operations tests */ |
| |
| // "None" means volume, pod, and node names are all empty |
| // "Volume" means volume name is set, but pod name and node name are empty |
| // "Volume Pod" means volume and pod names are set, but the node name is empty |
| // "Volume Node" means volume and node names are set, but the pod name is empty |
| |
| // The same volume, pod, and node names are used (where they are not empty). |
| |
| // Covered cases: |
| // FIRST OP | SECOND OP | RESULT |
| // None | None | Positive |
| // None | Volume | Positive |
| // None | Volume Pod | Positive |
| // None | Volume Node | Positive |
| // Volume | None | Positive |
| // Volume | Volume | Negative (covered in Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes above) |
| // Volume | Volume Pod | Negative |
| // Volume | Volume Node | Negative |
| // Volume Pod | None | Positive |
| // Volume Pod | Volume | Negative |
| // Volume Pod | Volume Pod | Negative (covered in Test_NestedPendingOperations_Negative_SecondSubOpBeforeFirstCompletes above) |
| // Volume Node | None | Positive |
| // Volume Node | Volume | Negative |
| // Volume Node | Volume Node | Negative |
| |
| // These cases are not covered because they will never occur within the same |
| // binary, so either result works. |
| // Volume Pod | Volume Node |
| // Volume Node | Volume Pod |
| |
| func Test_NestedPendingOperations_SecondOpBeforeFirstCompletes(t *testing.T) { |
| const ( |
| keyNone = iota |
| keyVolume |
| keyVolumePod |
| keyVolumeNode |
| ) |
| |
| type testCase struct { |
| testID int |
| keyTypes []int // only 2 elements are supported |
| expectPass bool |
| } |
| |
| tests := []testCase{ |
| {testID: 1, keyTypes: []int{keyNone, keyNone}, expectPass: true}, |
| {testID: 2, keyTypes: []int{keyNone, keyVolume}, expectPass: true}, |
| {testID: 3, keyTypes: []int{keyNone, keyVolumePod}, expectPass: true}, |
| {testID: 4, keyTypes: []int{keyNone, keyVolumeNode}, expectPass: true}, |
| {testID: 5, keyTypes: []int{keyVolume, keyNone}, expectPass: true}, |
| {testID: 6, keyTypes: []int{keyVolume, keyVolumePod}, expectPass: false}, |
| {testID: 7, keyTypes: []int{keyVolume, keyVolumeNode}, expectPass: false}, |
| {testID: 8, keyTypes: []int{keyVolumePod, keyNone}, expectPass: true}, |
| {testID: 9, keyTypes: []int{keyVolumePod, keyVolume}, expectPass: false}, |
| {testID: 10, keyTypes: []int{keyVolumeNode, keyNone}, expectPass: true}, |
| {testID: 11, keyTypes: []int{keyVolumeNode, keyVolume}, expectPass: false}, |
| {testID: 12, keyTypes: []int{keyVolumeNode, keyVolumeNode}, expectPass: false}, |
| } |
| |
| for _, test := range tests { |
| var ( |
| volumeNames []v1.UniqueVolumeName |
| podNames []volumetypes.UniquePodName |
| nodeNames []types.NodeName |
| ) |
| for _, keyType := range test.keyTypes { |
| var ( |
| v v1.UniqueVolumeName |
| p volumetypes.UniquePodName |
| n types.NodeName |
| ) |
| switch keyType { |
| case keyNone: |
| v = EmptyUniqueVolumeName |
| p = EmptyUniquePodName |
| n = EmptyNodeName |
| case keyVolume: |
| v = v1.UniqueVolumeName("volume-name") |
| p = EmptyUniquePodName |
| n = EmptyNodeName |
| case keyVolumePod: |
| v = v1.UniqueVolumeName("volume-name") |
| p = volumetypes.UniquePodName("operation-podname") |
| n = EmptyNodeName |
| case keyVolumeNode: |
| v = v1.UniqueVolumeName("volume-name") |
| p = EmptyUniquePodName |
| n = types.NodeName("operation-nodename") |
| } |
| volumeNames = append(volumeNames, v) |
| podNames = append(podNames, p) |
| nodeNames = append(nodeNames, n) |
| } |
| |
| t.Run(fmt.Sprintf("Test %d", test.testID), func(t *testing.T) { |
| if test.expectPass { |
| testConcurrentOperationsPositive(t, |
| volumeNames[0], podNames[0], nodeNames[0], |
| volumeNames[1], podNames[1], nodeNames[1], |
| ) |
| } else { |
| testConcurrentOperationsNegative(t, |
| volumeNames[0], podNames[0], nodeNames[0], |
| volumeNames[1], podNames[1], nodeNames[1], |
| ) |
| } |
| }) |
| |
| } |
| |
| } |
| |
| func Test_NestedPendingOperations_Positive_Issue_88355(t *testing.T) { |
| // This test reproduces the scenario that is likely to have caused |
| // kubernetes/kubernetes issue #88355. |
| // Please refer to the issue for more context: |
| // https://github.com/kubernetes/kubernetes/issues/88355 |
| |
| // Below, vx is a volume name, and nx is a node name. |
| |
| // Operation sequence: |
| // opZ(v0) starts (operates on a different volume from all other operations) |
| // op1(v1, n1) starts |
| // op2(v1, n2) starts |
| // opZ(v0) ends with success |
| // op2(v1, n2) ends with an error (exponential backoff should be triggered) |
| // op1(v1, n1) ends with success |
| // op3(v1, n2) starts (continuously retried on exponential backoff error) |
| // op3(v1, n2) ends with success |
| // op4(v1, n2) starts |
| // op4(v1, n2) ends with success |
| |
| const ( |
| mainVolumeName = "main-volume" |
| opZVolumeName = "other-volume" |
| node1 = "node1" |
| node2 = "node2" |
| |
| // delay after an operation is signaled to finish to ensure it actually |
| // finishes before running the next operation. |
| delay = 50 * time.Millisecond |
| |
| // Replicates the default AttachDetachController reconcile period |
| reconcilerPeriod = 100 * time.Millisecond |
| ) |
| |
| grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) |
| opZContinueCh := make(chan interface{}) |
| op1ContinueCh := make(chan interface{}) |
| op2ContinueCh := make(chan interface{}) |
| operationZ := generateWaitFunc(opZContinueCh) |
| operation1 := generateWaitFunc(op1ContinueCh) |
| operation2 := generateWaitWithErrorFunc(op2ContinueCh) |
| operation3 := noopFunc |
| operation4 := noopFunc |
| |
| errZ := grm.Run(opZVolumeName, "" /* podName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operationZ}) |
| if errZ != nil { |
| t.Fatalf("NestedPendingOperations failed for operationZ. Expected: <no error> Actual: <%v>", errZ) |
| } |
| |
| err1 := grm.Run(mainVolumeName, "" /* podName */, node1, volumetypes.GeneratedOperations{OperationFunc: operation1}) |
| if err1 != nil { |
| t.Fatalf("NestedPendingOperations failed for operation1. Expected: <no error> Actual: <%v>", err1) |
| } |
| |
| err2 := grm.Run(mainVolumeName, "" /* podName */, node2, volumetypes.GeneratedOperations{OperationFunc: operation2}) |
| if err2 != nil { |
| t.Fatalf("NestedPendingOperations failed for operation2. Expected: <no error> Actual: <%v>", err2) |
| } |
| |
| opZContinueCh <- true |
| time.Sleep(delay) |
| op2ContinueCh <- true |
| time.Sleep(delay) |
| op1ContinueCh <- true |
| time.Sleep(delay) |
| |
| for { |
| err3 := grm.Run(mainVolumeName, "" /* podName */, node2, volumetypes.GeneratedOperations{OperationFunc: operation3}) |
| if err3 == nil { |
| break |
| } else if !exponentialbackoff.IsExponentialBackoff(err3) { |
| t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err3) |
| } |
| time.Sleep(reconcilerPeriod) |
| } |
| |
| time.Sleep(delay) |
| |
| err4 := grm.Run(mainVolumeName, "" /* podName */, node2, volumetypes.GeneratedOperations{OperationFunc: operation4}) |
| if err4 != nil { |
| t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err4) |
| } |
| } |
| |
| // testConcurrentOperationsPositive passes if the two operations keyed by the |
| // provided parameters are executed in parallel, and fails otherwise. |
| func testConcurrentOperationsPositive( |
| t *testing.T, |
| volumeName1 v1.UniqueVolumeName, |
| podName1 volumetypes.UniquePodName, |
| nodeName1 types.NodeName, |
| volumeName2 v1.UniqueVolumeName, |
| podName2 volumetypes.UniquePodName, |
| nodeName2 types.NodeName) { |
| |
| // Arrange |
| grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) |
| operation1DoneCh := make(chan interface{}) |
| operation1 := generateWaitFunc(operation1DoneCh) |
| err1 := grm.Run(volumeName1, podName1, nodeName1 /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) |
| if err1 != nil { |
| t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1) |
| } |
| operation2 := noopFunc |
| |
| // Act |
| err2 := grm.Run(volumeName2, podName2, nodeName2, volumetypes.GeneratedOperations{OperationFunc: operation2}) |
| |
| // Assert |
| if err2 != nil { |
| t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2) |
| } |
| } |
| |
| // testConcurrentOperationsNegative passes if the creation of the second |
| // operation returns an alreadyExists error, and fails otherwise. |
| func testConcurrentOperationsNegative( |
| t *testing.T, |
| volumeName1 v1.UniqueVolumeName, |
| podName1 volumetypes.UniquePodName, |
| nodeName1 types.NodeName, |
| volumeName2 v1.UniqueVolumeName, |
| podName2 volumetypes.UniquePodName, |
| nodeName2 types.NodeName) { |
| |
| // Arrange |
| grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) |
| operation1DoneCh := make(chan interface{}) |
| operation1 := generateWaitFunc(operation1DoneCh) |
| err1 := grm.Run(volumeName1, podName1, nodeName1 /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) |
| if err1 != nil { |
| t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1) |
| } |
| operation2 := noopFunc |
| |
| // Act |
| err2 := grm.Run(volumeName2, podName2, nodeName2, volumetypes.GeneratedOperations{OperationFunc: operation2}) |
| |
| // Assert |
| if err2 == nil { |
| t.Errorf("NestedPendingOperations did not fail. Expected an operation to already exist") |
| } |
| if !IsAlreadyExists(err2) { |
| t.Errorf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) |
| } |
| } |
| |
| /* END concurrent operations tests */ |
| |
| func generateCallbackFunc(done chan<- interface{}) func() volumetypes.OperationContext { |
| return func() volumetypes.OperationContext { |
| done <- true |
| return volumetypes.NewOperationContext(nil, nil, false) |
| } |
| } |
| |
| func generateWaitFunc(done <-chan interface{}) func() volumetypes.OperationContext { |
| return func() volumetypes.OperationContext { |
| <-done |
| return volumetypes.NewOperationContext(nil, nil, false) |
| } |
| } |
| |
| func panicFunc() volumetypes.OperationContext { |
| panic("testing panic") |
| } |
| |
| func errorFunc() volumetypes.OperationContext { |
| return volumetypes.NewOperationContext(fmt.Errorf("placeholder1"), fmt.Errorf("placeholder2"), false) |
| } |
| |
| func generateWaitWithErrorFunc(done <-chan interface{}) func() volumetypes.OperationContext { |
| return func() volumetypes.OperationContext { |
| <-done |
| return volumetypes.NewOperationContext(fmt.Errorf("placeholder1"), fmt.Errorf("placeholder2"), false) |
| } |
| } |
| |
| func noopFunc() volumetypes.OperationContext { |
| return volumetypes.NewOperationContext(nil, nil, false) |
| } |
| |
| func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error { |
| backoff := wait.Backoff{ |
| Duration: initialDuration, |
| Factor: 3, |
| Jitter: 0, |
| Steps: 4, |
| } |
| return wait.ExponentialBackoff(backoff, fn) |
| } |
| |
| func waitChannelWithTimeout(ch <-chan interface{}, timeout time.Duration) error { |
| timer := time.NewTimer(timeout) |
| defer timer.Stop() |
| |
| select { |
| case <-ch: |
| // Success! |
| return nil |
| case <-timer.C: |
| return fmt.Errorf("timeout after %v", timeout) |
| } |
| } |
| |
| func Test_NestedPendingOperations_OperationExists_PendingFirst(t *testing.T) { |
| // Arrange |
| grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) |
| volumeName := v1.UniqueVolumeName("test-volume") |
| podName1 := volumetypes.UniquePodName("pod1") |
| podName2 := volumetypes.UniquePodName("pod2") |
| podName3 := volumetypes.UniquePodName("pod3") |
| podName4 := EmptyUniquePodName |
| nodeName := EmptyNodeName |
| |
| // delay after an operation is signaled to finish to ensure it actually |
| // finishes before running the next operation. |
| delay := 50 * time.Millisecond |
| |
| // fake operation1 for pod1 failed |
| operation1DoneCh := make(chan interface{}) |
| operation1 := generateWaitWithErrorFunc(operation1DoneCh) |
| err1 := grm.Run(volumeName, podName1, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1, OperationName: "umount"}) |
| if err1 != nil { |
| t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1) |
| } |
| |
| // fake operation2 for pod2 fails |
| operation2DoneCh := make(chan interface{}) |
| operation2 := generateWaitWithErrorFunc(operation2DoneCh) |
| err2 := grm.Run(volumeName, podName2, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2, OperationName: "umount"}) |
| if err2 != nil { |
| t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2) |
| } |
| |
| // fake operation3 for pod3 pending |
| operation3DoneCh := make(chan interface{}) |
| operation3 := generateWaitFunc(operation3DoneCh) |
| defer func() { |
| close(operation3DoneCh) |
| }() |
| err3 := grm.Run(volumeName, podName3, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation3, OperationName: "umount"}) |
| if err3 != nil { |
| t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err3) |
| } |
| |
| operation1DoneCh <- true |
| operation2DoneCh <- true |
| time.Sleep(delay) |
| |
| // fake operation4 for EmptyUniquePodName should be rejected as operation3 is still pending |
| operation4DoneCh := make(chan interface{}) |
| operation4 := generateWaitFunc(operation4DoneCh) |
| defer func() { |
| close(operation4DoneCh) |
| }() |
| err4 := grm.Run(volumeName, podName4, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation4, OperationName: "mount"}) |
| |
| // Assert |
| if err4 == nil { |
| t.Errorf("NestedPendingOperations did not fail. Expected an operation to already exist") |
| } |
| if !IsAlreadyExists(err4) { |
| t.Errorf("NestedPendingOperations did not return alreadyExistsError, got: %v", err4) |
| } |
| } |
| |
| func Test_NestedPendingOperations_OperationExists_ExactMatchFirstNoPending(t *testing.T) { |
| // Arrange |
| grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) |
| volumeName := v1.UniqueVolumeName("test-volume") |
| podName1 := volumetypes.UniquePodName("pod1") |
| podName2 := volumetypes.UniquePodName("pod2") |
| podName3 := volumetypes.UniquePodName("pod3") |
| podName4 := EmptyUniquePodName |
| nodeName := EmptyNodeName |
| |
| // delay after an operation is signaled to finish to ensure it actually |
| // finishes before running the next operation. |
| delay := 50 * time.Millisecond |
| backoffDelay := 500 * time.Millisecond |
| |
| // fake operation1 for pod1 fails |
| operation1DoneCh := make(chan interface{}) |
| operation1 := generateWaitWithErrorFunc(operation1DoneCh) |
| err1 := grm.Run(volumeName, podName1, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1, OperationName: "umount"}) |
| if err1 != nil { |
| t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1) |
| } |
| |
| // fake operation2 for pod2 fails |
| operation2DoneCh := make(chan interface{}) |
| operation2 := generateWaitWithErrorFunc(operation2DoneCh) |
| err2 := grm.Run(volumeName, podName2, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2, OperationName: "umount"}) |
| if err2 != nil { |
| t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2) |
| } |
| |
| // fake operation3 for pod3 fails |
| operation3DoneCh := make(chan interface{}) |
| operation3 := generateWaitWithErrorFunc(operation3DoneCh) |
| err3 := grm.Run(volumeName, podName3, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation3, OperationName: "umount"}) |
| if err3 != nil { |
| t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err3) |
| } |
| |
| operation1DoneCh <- true |
| operation2DoneCh <- true |
| operation3DoneCh <- true |
| time.Sleep(delay) |
| |
| // fake operation4 with EmptyUniquePodName fails |
| operation4DoneCh := make(chan interface{}) |
| operation4 := generateWaitWithErrorFunc(operation4DoneCh) |
| err4 := grm.Run(volumeName, podName4, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation4, OperationName: "mount"}) |
| if err4 != nil { |
| t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err4) |
| } |
| |
| operation4DoneCh <- true |
| |
| // operation for pod2 retry |
| time.Sleep(backoffDelay) |
| operation5 := noopFunc |
| err5 := grm.Run(volumeName, podName2, nodeName, volumetypes.GeneratedOperations{OperationFunc: operation5, OperationName: "umount"}) |
| if err5 != nil { |
| t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err5) |
| } |
| time.Sleep(delay) |
| |
| // Assert |
| // Operation5 will override operation2, since we successfully finished unmount operation on pod2, it should be removed from operations array |
| grm.(*nestedPendingOperations).lock.Lock() |
| defer grm.(*nestedPendingOperations).lock.Unlock() |
| for _, op := range grm.(*nestedPendingOperations).operations { |
| if op.key.podName == podName2 { |
| t.Errorf("NestedPendingOperations failed. Operation for pod2 should be removed") |
| } |
| } |
| |
| } |