| /* |
| Copyright 2017 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package devicemanager |
| |
| import ( |
| "fmt" |
| "os" |
| "path/filepath" |
| "reflect" |
| goruntime "runtime" |
| "sync" |
| "sync/atomic" |
| "testing" |
| "time" |
| |
| cadvisorapi "github.com/google/cadvisor/info/v1" |
| "github.com/stretchr/testify/assert" |
| "github.com/stretchr/testify/require" |
| v1 "k8s.io/api/core/v1" |
| "k8s.io/apimachinery/pkg/api/resource" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| "k8s.io/apimachinery/pkg/types" |
| "k8s.io/apimachinery/pkg/util/sets" |
| "k8s.io/apimachinery/pkg/util/uuid" |
| "k8s.io/apimachinery/pkg/util/wait" |
| "k8s.io/client-go/tools/record" |
| pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" |
| watcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1" |
| "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" |
| "k8s.io/kubernetes/pkg/kubelet/cm/containermap" |
| "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint" |
| plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1" |
| "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" |
| "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask" |
| "k8s.io/kubernetes/pkg/kubelet/config" |
| "k8s.io/kubernetes/pkg/kubelet/lifecycle" |
| "k8s.io/kubernetes/pkg/kubelet/pluginmanager" |
| schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" |
| ) |
| |
// testResourceName is the resource name the stub device plugins in these tests
// are created with and register under.
const testResourceName = "fake-domain/resource"
| |
| func newWrappedManagerImpl(socketPath string, manager *ManagerImpl) *wrappedManagerImpl { |
| w := &wrappedManagerImpl{ |
| ManagerImpl: manager, |
| callback: manager.genericDeviceUpdateCallback, |
| } |
| w.socketdir, _ = filepath.Split(socketPath) |
| w.server, _ = plugin.NewServer(socketPath, w, w) |
| return w |
| } |
| |
| type wrappedManagerImpl struct { |
| *ManagerImpl |
| socketdir string |
| callback func(string, []pluginapi.Device) |
| } |
| |
| func (m *wrappedManagerImpl) PluginListAndWatchReceiver(r string, resp *pluginapi.ListAndWatchResponse) { |
| var devices []pluginapi.Device |
| for _, d := range resp.Devices { |
| devices = append(devices, *d) |
| } |
| m.callback(r, devices) |
| } |
| |
// tmpSocketDir creates a fresh temporary directory and returns it together
// with two paths inside it: "server.sock" for the manager socket and
// "device-plugin.sock" for the plugin socket. Only the directory is created;
// the socket files are not. On failure every returned path is empty and no
// directory is left behind for the caller to clean up.
func tmpSocketDir() (socketDir, socketName, pluginSocketName string, err error) {
	socketDir, err = os.MkdirTemp("", "device_plugin")
	if err != nil {
		return "", "", "", err
	}
	// MkdirTemp has already created the directory, so this is normally a
	// no-op; its error used to be dropped silently and is now reported.
	if err = os.MkdirAll(socketDir, 0755); err != nil {
		_ = os.RemoveAll(socketDir)
		return "", "", "", err
	}
	socketName = filepath.Join(socketDir, "server.sock")
	pluginSocketName = filepath.Join(socketDir, "device-plugin.sock")
	return socketDir, socketName, pluginSocketName, nil
}
| |
| func TestNewManagerImpl(t *testing.T) { |
| socketDir, socketName, _, err := tmpSocketDir() |
| topologyStore := topologymanager.NewFakeManager() |
| require.NoError(t, err) |
| defer os.RemoveAll(socketDir) |
| _, err = newManagerImpl(socketName, nil, topologyStore) |
| require.NoError(t, err) |
| os.RemoveAll(socketDir) |
| } |
| |
| func TestNewManagerImplStart(t *testing.T) { |
| socketDir, socketName, pluginSocketName, err := tmpSocketDir() |
| require.NoError(t, err) |
| defer os.RemoveAll(socketDir) |
| m, _, p := setup(t, []*pluginapi.Device{}, func(n string, d []pluginapi.Device) {}, socketName, pluginSocketName) |
| cleanup(t, m, p) |
| // Stop should tolerate being called more than once. |
| cleanup(t, m, p) |
| } |
| |
| func TestNewManagerImplStartProbeMode(t *testing.T) { |
| socketDir, socketName, pluginSocketName, err := tmpSocketDir() |
| require.NoError(t, err) |
| defer os.RemoveAll(socketDir) |
| m, _, p, _ := setupInProbeMode(t, []*pluginapi.Device{}, func(n string, d []pluginapi.Device) {}, socketName, pluginSocketName) |
| cleanup(t, m, p) |
| } |
| |
| // Tests that the device plugin manager correctly handles registration and re-registration by |
| // making sure that after registration, devices are correctly updated and if a re-registration |
| // happens, we will NOT delete devices; and no orphaned devices left. |
| func TestDevicePluginReRegistration(t *testing.T) { |
| // TODO: Remove skip once https://github.com/kubernetes/kubernetes/pull/115269 merges. |
| if goruntime.GOOS == "windows" { |
| t.Skip("Skipping test on Windows.") |
| } |
| socketDir, socketName, pluginSocketName, err := tmpSocketDir() |
| require.NoError(t, err) |
| defer os.RemoveAll(socketDir) |
| devs := []*pluginapi.Device{ |
| {ID: "Dev1", Health: pluginapi.Healthy}, |
| {ID: "Dev2", Health: pluginapi.Healthy}, |
| } |
| devsForRegistration := []*pluginapi.Device{ |
| {ID: "Dev3", Health: pluginapi.Healthy}, |
| } |
| for _, preStartContainerFlag := range []bool{false, true} { |
| for _, getPreferredAllocationFlag := range []bool{false, true} { |
| m, ch, p1 := setup(t, devs, nil, socketName, pluginSocketName) |
| p1.Register(socketName, testResourceName, "") |
| |
| select { |
| case <-ch: |
| case <-time.After(5 * time.Second): |
| t.Fatalf("timeout while waiting for manager update") |
| } |
| capacity, allocatable, _ := m.GetCapacity() |
| resourceCapacity := capacity[v1.ResourceName(testResourceName)] |
| resourceAllocatable := allocatable[v1.ResourceName(testResourceName)] |
| require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable") |
| require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.") |
| |
| p2 := plugin.NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, preStartContainerFlag, getPreferredAllocationFlag) |
| err = p2.Start() |
| require.NoError(t, err) |
| p2.Register(socketName, testResourceName, "") |
| |
| select { |
| case <-ch: |
| case <-time.After(5 * time.Second): |
| t.Fatalf("timeout while waiting for manager update") |
| } |
| capacity, allocatable, _ = m.GetCapacity() |
| resourceCapacity = capacity[v1.ResourceName(testResourceName)] |
| resourceAllocatable = allocatable[v1.ResourceName(testResourceName)] |
| require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable") |
| require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices shouldn't change.") |
| |
| // Test the scenario that a plugin re-registers with different devices. |
| p3 := plugin.NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, preStartContainerFlag, getPreferredAllocationFlag) |
| err = p3.Start() |
| require.NoError(t, err) |
| p3.Register(socketName, testResourceName, "") |
| |
| select { |
| case <-ch: |
| case <-time.After(5 * time.Second): |
| t.Fatalf("timeout while waiting for manager update") |
| } |
| capacity, allocatable, _ = m.GetCapacity() |
| resourceCapacity = capacity[v1.ResourceName(testResourceName)] |
| resourceAllocatable = allocatable[v1.ResourceName(testResourceName)] |
| require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable") |
| require.Equal(t, int64(1), resourceAllocatable.Value(), "Devices of plugin previously registered should be removed.") |
| p2.Stop() |
| p3.Stop() |
| cleanup(t, m, p1) |
| } |
| } |
| } |
| |
| // Tests that the device plugin manager correctly handles registration and re-registration by |
| // making sure that after registration, devices are correctly updated and if a re-registration |
| // happens, we will NOT delete devices; and no orphaned devices left. |
| // While testing above scenario, plugin discovery and registration will be done using |
| // Kubelet probe based mechanism |
| func TestDevicePluginReRegistrationProbeMode(t *testing.T) { |
| // TODO: Remove skip once https://github.com/kubernetes/kubernetes/pull/115269 merges. |
| if goruntime.GOOS == "windows" { |
| t.Skip("Skipping test on Windows.") |
| } |
| socketDir, socketName, pluginSocketName, err := tmpSocketDir() |
| require.NoError(t, err) |
| defer os.RemoveAll(socketDir) |
| devs := []*pluginapi.Device{ |
| {ID: "Dev1", Health: pluginapi.Healthy}, |
| {ID: "Dev2", Health: pluginapi.Healthy}, |
| } |
| devsForRegistration := []*pluginapi.Device{ |
| {ID: "Dev3", Health: pluginapi.Healthy}, |
| } |
| |
| m, ch, p1, _ := setupInProbeMode(t, devs, nil, socketName, pluginSocketName) |
| |
| // Wait for the first callback to be issued. |
| select { |
| case <-ch: |
| case <-time.After(5 * time.Second): |
| t.FailNow() |
| } |
| capacity, allocatable, _ := m.GetCapacity() |
| resourceCapacity := capacity[v1.ResourceName(testResourceName)] |
| resourceAllocatable := allocatable[v1.ResourceName(testResourceName)] |
| require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable") |
| require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.") |
| |
| p2 := plugin.NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, false, false) |
| err = p2.Start() |
| require.NoError(t, err) |
| // Wait for the second callback to be issued. |
| select { |
| case <-ch: |
| case <-time.After(5 * time.Second): |
| t.FailNow() |
| } |
| |
| capacity, allocatable, _ = m.GetCapacity() |
| resourceCapacity = capacity[v1.ResourceName(testResourceName)] |
| resourceAllocatable = allocatable[v1.ResourceName(testResourceName)] |
| require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable") |
| require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.") |
| |
| // Test the scenario that a plugin re-registers with different devices. |
| p3 := plugin.NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, false, false) |
| err = p3.Start() |
| require.NoError(t, err) |
| // Wait for the third callback to be issued. |
| select { |
| case <-ch: |
| case <-time.After(5 * time.Second): |
| t.FailNow() |
| } |
| |
| capacity, allocatable, _ = m.GetCapacity() |
| resourceCapacity = capacity[v1.ResourceName(testResourceName)] |
| resourceAllocatable = allocatable[v1.ResourceName(testResourceName)] |
| require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable") |
| require.Equal(t, int64(1), resourceAllocatable.Value(), "Devices of previous registered should be removed") |
| p2.Stop() |
| p3.Stop() |
| cleanup(t, m, p1) |
| } |
| |
| func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, |
| topology []cadvisorapi.Node) (Manager, <-chan interface{}) { |
| topologyStore := topologymanager.NewFakeManager() |
| m, err := newManagerImpl(socketName, topology, topologyStore) |
| require.NoError(t, err) |
| updateChan := make(chan interface{}) |
| |
| w := newWrappedManagerImpl(socketName, m) |
| if callback != nil { |
| w.callback = callback |
| } |
| |
| originalCallback := w.callback |
| w.callback = func(resourceName string, devices []pluginapi.Device) { |
| originalCallback(resourceName, devices) |
| updateChan <- new(interface{}) |
| } |
| activePods := func() []*v1.Pod { |
| return []*v1.Pod{} |
| } |
| |
| // test steady state, initialization where sourcesReady, containerMap and containerRunningSet |
| // are relevant will be tested with a different flow |
| err = w.Start(activePods, &sourcesReadyStub{}, containermap.NewContainerMap(), sets.New[string]()) |
| require.NoError(t, err) |
| |
| return w, updateChan |
| } |
| |
| func setupDevicePlugin(t *testing.T, devs []*pluginapi.Device, pluginSocketName string) *plugin.Stub { |
| p := plugin.NewDevicePluginStub(devs, pluginSocketName, testResourceName, false, false) |
| err := p.Start() |
| require.NoError(t, err) |
| return p |
| } |
| |
| func setupPluginManager(t *testing.T, pluginSocketName string, m Manager) pluginmanager.PluginManager { |
| pluginManager := pluginmanager.NewPluginManager( |
| filepath.Dir(pluginSocketName), /* sockDir */ |
| &record.FakeRecorder{}, |
| ) |
| |
| runPluginManager(pluginManager) |
| pluginManager.AddHandler(watcherapi.DevicePlugin, m.GetWatcherHandler()) |
| return pluginManager |
| } |
| |
| func runPluginManager(pluginManager pluginmanager.PluginManager) { |
| // FIXME: Replace sets.String with sets.Set[string] |
| sourcesReady := config.NewSourcesReady(func(_ sets.String) bool { return true }) |
| go pluginManager.Run(sourcesReady, wait.NeverStop) |
| } |
| |
| func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *plugin.Stub) { |
| m, updateChan := setupDeviceManager(t, devs, callback, socketName, nil) |
| p := setupDevicePlugin(t, devs, pluginSocketName) |
| return m, updateChan, p |
| } |
| |
| func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *plugin.Stub, pluginmanager.PluginManager) { |
| m, updateChan := setupDeviceManager(t, devs, callback, socketName, nil) |
| p := setupDevicePlugin(t, devs, pluginSocketName) |
| pm := setupPluginManager(t, pluginSocketName, m) |
| return m, updateChan, p, pm |
| } |
| |
| func cleanup(t *testing.T, m Manager, p *plugin.Stub) { |
| p.Stop() |
| m.Stop() |
| } |
| |
| func TestUpdateCapacityAllocatable(t *testing.T) { |
| socketDir, socketName, _, err := tmpSocketDir() |
| topologyStore := topologymanager.NewFakeManager() |
| require.NoError(t, err) |
| defer os.RemoveAll(socketDir) |
| testManager, err := newManagerImpl(socketName, nil, topologyStore) |
| as := assert.New(t) |
| as.NotNil(testManager) |
| as.Nil(err) |
| |
| devs := []pluginapi.Device{ |
| {ID: "Device1", Health: pluginapi.Healthy}, |
| {ID: "Device2", Health: pluginapi.Healthy}, |
| {ID: "Device3", Health: pluginapi.Unhealthy}, |
| } |
| callback := testManager.genericDeviceUpdateCallback |
| |
| // Adds three devices for resource1, two healthy and one unhealthy. |
| // Expects capacity for resource1 to be 2. |
| resourceName1 := "domain1.com/resource1" |
| e1 := &endpointImpl{} |
| testManager.endpoints[resourceName1] = endpointInfo{e: e1, opts: nil} |
| callback(resourceName1, devs) |
| capacity, allocatable, removedResources := testManager.GetCapacity() |
| resource1Capacity, ok := capacity[v1.ResourceName(resourceName1)] |
| as.True(ok) |
| resource1Allocatable, ok := allocatable[v1.ResourceName(resourceName1)] |
| as.True(ok) |
| as.Equal(int64(3), resource1Capacity.Value()) |
| as.Equal(int64(2), resource1Allocatable.Value()) |
| as.Equal(0, len(removedResources)) |
| |
| // Deletes an unhealthy device should NOT change allocatable but change capacity. |
| devs1 := devs[:len(devs)-1] |
| callback(resourceName1, devs1) |
| capacity, allocatable, removedResources = testManager.GetCapacity() |
| resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)] |
| as.True(ok) |
| resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)] |
| as.True(ok) |
| as.Equal(int64(2), resource1Capacity.Value()) |
| as.Equal(int64(2), resource1Allocatable.Value()) |
| as.Equal(0, len(removedResources)) |
| |
| // Updates a healthy device to unhealthy should reduce allocatable by 1. |
| devs[1].Health = pluginapi.Unhealthy |
| callback(resourceName1, devs) |
| capacity, allocatable, removedResources = testManager.GetCapacity() |
| resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)] |
| as.True(ok) |
| resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)] |
| as.True(ok) |
| as.Equal(int64(3), resource1Capacity.Value()) |
| as.Equal(int64(1), resource1Allocatable.Value()) |
| as.Equal(0, len(removedResources)) |
| |
| // Deletes a healthy device should reduce capacity and allocatable by 1. |
| devs2 := devs[1:] |
| callback(resourceName1, devs2) |
| capacity, allocatable, removedResources = testManager.GetCapacity() |
| resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)] |
| as.True(ok) |
| resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)] |
| as.True(ok) |
| as.Equal(int64(0), resource1Allocatable.Value()) |
| as.Equal(int64(2), resource1Capacity.Value()) |
| as.Equal(0, len(removedResources)) |
| |
| // Tests adding another resource. |
| resourceName2 := "resource2" |
| e2 := &endpointImpl{} |
| e2.client = plugin.NewPluginClient(resourceName2, socketName, testManager) |
| testManager.endpoints[resourceName2] = endpointInfo{e: e2, opts: nil} |
| callback(resourceName2, devs) |
| capacity, allocatable, removedResources = testManager.GetCapacity() |
| as.Equal(2, len(capacity)) |
| resource2Capacity, ok := capacity[v1.ResourceName(resourceName2)] |
| as.True(ok) |
| resource2Allocatable, ok := allocatable[v1.ResourceName(resourceName2)] |
| as.True(ok) |
| as.Equal(int64(3), resource2Capacity.Value()) |
| as.Equal(int64(1), resource2Allocatable.Value()) |
| as.Equal(0, len(removedResources)) |
| |
| // Expires resourceName1 endpoint. Verifies testManager.GetCapacity() reports that resourceName1 |
| // is removed from capacity and it no longer exists in healthyDevices after the call. |
| e1.setStopTime(time.Now().Add(-1*endpointStopGracePeriod - time.Duration(10)*time.Second)) |
| capacity, allocatable, removed := testManager.GetCapacity() |
| as.Equal([]string{resourceName1}, removed) |
| as.NotContains(capacity, v1.ResourceName(resourceName1)) |
| as.NotContains(allocatable, v1.ResourceName(resourceName1)) |
| val, ok := capacity[v1.ResourceName(resourceName2)] |
| as.True(ok) |
| as.Equal(int64(3), val.Value()) |
| as.NotContains(testManager.healthyDevices, resourceName1) |
| as.NotContains(testManager.unhealthyDevices, resourceName1) |
| as.NotContains(testManager.endpoints, resourceName1) |
| as.Equal(1, len(testManager.endpoints)) |
| |
| // Stops resourceName2 endpoint. Verifies its stopTime is set, allocate and |
| // preStartContainer calls return errors. |
| e2.client.Disconnect() |
| as.False(e2.stopTime.IsZero()) |
| _, err = e2.allocate([]string{"Device1"}) |
| reflect.DeepEqual(err, fmt.Errorf(errEndpointStopped, e2)) |
| _, err = e2.preStartContainer([]string{"Device1"}) |
| reflect.DeepEqual(err, fmt.Errorf(errEndpointStopped, e2)) |
| // Marks resourceName2 unhealthy and verifies its capacity/allocatable are |
| // correctly updated. |
| testManager.markResourceUnhealthy(resourceName2) |
| capacity, allocatable, removed = testManager.GetCapacity() |
| val, ok = capacity[v1.ResourceName(resourceName2)] |
| as.True(ok) |
| as.Equal(int64(3), val.Value()) |
| val, ok = allocatable[v1.ResourceName(resourceName2)] |
| as.True(ok) |
| as.Equal(int64(0), val.Value()) |
| as.Empty(removed) |
| // Writes and re-reads checkpoints. Verifies we create a stopped endpoint |
| // for resourceName2, its capacity is set to zero, and we still consider |
| // it as a DevicePlugin resource. This makes sure any pod that was scheduled |
| // during the time of propagating capacity change to the scheduler will be |
| // properly rejected instead of being incorrectly started. |
| err = testManager.writeCheckpoint() |
| as.Nil(err) |
| testManager.healthyDevices = make(map[string]sets.Set[string]) |
| testManager.unhealthyDevices = make(map[string]sets.Set[string]) |
| err = testManager.readCheckpoint() |
| as.Nil(err) |
| as.Equal(1, len(testManager.endpoints)) |
| as.Contains(testManager.endpoints, resourceName2) |
| capacity, allocatable, removed = testManager.GetCapacity() |
| val, ok = capacity[v1.ResourceName(resourceName2)] |
| as.True(ok) |
| as.Equal(int64(0), val.Value()) |
| val, ok = allocatable[v1.ResourceName(resourceName2)] |
| as.True(ok) |
| as.Equal(int64(0), val.Value()) |
| as.Empty(removed) |
| as.True(testManager.isDevicePluginResource(resourceName2)) |
| } |
| |
| func TestGetAllocatableDevicesMultipleResources(t *testing.T) { |
| socketDir, socketName, _, err := tmpSocketDir() |
| topologyStore := topologymanager.NewFakeManager() |
| require.NoError(t, err) |
| defer os.RemoveAll(socketDir) |
| testManager, err := newManagerImpl(socketName, nil, topologyStore) |
| as := assert.New(t) |
| as.NotNil(testManager) |
| as.Nil(err) |
| |
| resource1Devs := []pluginapi.Device{ |
| {ID: "R1Device1", Health: pluginapi.Healthy}, |
| {ID: "R1Device2", Health: pluginapi.Healthy}, |
| {ID: "R1Device3", Health: pluginapi.Unhealthy}, |
| } |
| resourceName1 := "domain1.com/resource1" |
| e1 := &endpointImpl{} |
| testManager.endpoints[resourceName1] = endpointInfo{e: e1, opts: nil} |
| testManager.genericDeviceUpdateCallback(resourceName1, resource1Devs) |
| |
| resource2Devs := []pluginapi.Device{ |
| {ID: "R2Device1", Health: pluginapi.Healthy}, |
| } |
| resourceName2 := "other.domain2.org/resource2" |
| e2 := &endpointImpl{} |
| testManager.endpoints[resourceName2] = endpointInfo{e: e2, opts: nil} |
| testManager.genericDeviceUpdateCallback(resourceName2, resource2Devs) |
| |
| allocatableDevs := testManager.GetAllocatableDevices() |
| as.Equal(2, len(allocatableDevs)) |
| |
| devInstances1, ok := allocatableDevs[resourceName1] |
| as.True(ok) |
| checkAllocatableDevicesConsistsOf(as, devInstances1, []string{"R1Device1", "R1Device2"}) |
| |
| devInstances2, ok := allocatableDevs[resourceName2] |
| as.True(ok) |
| checkAllocatableDevicesConsistsOf(as, devInstances2, []string{"R2Device1"}) |
| |
| } |
| |
| func TestGetAllocatableDevicesHealthTransition(t *testing.T) { |
| socketDir, socketName, _, err := tmpSocketDir() |
| topologyStore := topologymanager.NewFakeManager() |
| require.NoError(t, err) |
| defer os.RemoveAll(socketDir) |
| testManager, err := newManagerImpl(socketName, nil, topologyStore) |
| as := assert.New(t) |
| as.NotNil(testManager) |
| as.Nil(err) |
| |
| resource1Devs := []pluginapi.Device{ |
| {ID: "R1Device1", Health: pluginapi.Healthy}, |
| {ID: "R1Device2", Health: pluginapi.Healthy}, |
| {ID: "R1Device3", Health: pluginapi.Unhealthy}, |
| } |
| |
| // Adds three devices for resource1, two healthy and one unhealthy. |
| // Expects allocatable devices for resource1 to be 2. |
| resourceName1 := "domain1.com/resource1" |
| e1 := &endpointImpl{} |
| testManager.endpoints[resourceName1] = endpointInfo{e: e1, opts: nil} |
| |
| testManager.genericDeviceUpdateCallback(resourceName1, resource1Devs) |
| |
| allocatableDevs := testManager.GetAllocatableDevices() |
| as.Equal(1, len(allocatableDevs)) |
| devInstances, ok := allocatableDevs[resourceName1] |
| as.True(ok) |
| checkAllocatableDevicesConsistsOf(as, devInstances, []string{"R1Device1", "R1Device2"}) |
| |
| // Unhealthy device becomes healthy |
| resource1Devs = []pluginapi.Device{ |
| {ID: "R1Device1", Health: pluginapi.Healthy}, |
| {ID: "R1Device2", Health: pluginapi.Healthy}, |
| {ID: "R1Device3", Health: pluginapi.Healthy}, |
| } |
| testManager.genericDeviceUpdateCallback(resourceName1, resource1Devs) |
| |
| allocatableDevs = testManager.GetAllocatableDevices() |
| as.Equal(1, len(allocatableDevs)) |
| devInstances, ok = allocatableDevs[resourceName1] |
| as.True(ok) |
| checkAllocatableDevicesConsistsOf(as, devInstances, []string{"R1Device1", "R1Device2", "R1Device3"}) |
| } |
| |
| func checkAllocatableDevicesConsistsOf(as *assert.Assertions, devInstances DeviceInstances, expectedDevs []string) { |
| as.Equal(len(expectedDevs), len(devInstances)) |
| for _, deviceID := range expectedDevs { |
| _, ok := devInstances[deviceID] |
| as.True(ok) |
| } |
| } |
| |
| func constructDevices(devices []string) checkpoint.DevicesPerNUMA { |
| ret := checkpoint.DevicesPerNUMA{} |
| for _, dev := range devices { |
| ret[0] = append(ret[0], dev) |
| } |
| return ret |
| } |
| |
// containerAllocateResponseBuilder collects the inputs from which Build
// assembles a ContainerAllocateResponse.
type containerAllocateResponseBuilder struct {
	// devices maps a host path to the container path of a device spec.
	devices map[string]string
	// mounts maps a container path to the host path of a mount.
	mounts map[string]string
	// envs holds the environment variables copied into the response.
	envs map[string]string
	// cdiDevices lists the names of the CDI devices added to the response.
	cdiDevices []string
}
| |
| // containerAllocateResponseBuilderOption defines a functional option for a containerAllocateResponseBuilder |
| type containerAllocateResponseBuilderOption func(*containerAllocateResponseBuilder) |
| |
| // withDevices sets the devices for the containerAllocateResponseBuilder |
| func withDevices(devices map[string]string) containerAllocateResponseBuilderOption { |
| return func(b *containerAllocateResponseBuilder) { |
| b.devices = devices |
| } |
| } |
| |
| // withMounts sets the mounts for the containerAllocateResponseBuilder |
| func withMounts(mounts map[string]string) containerAllocateResponseBuilderOption { |
| return func(b *containerAllocateResponseBuilder) { |
| b.mounts = mounts |
| } |
| } |
| |
| // withEnvs sets the envs for the containerAllocateResponseBuilder |
| func withEnvs(envs map[string]string) containerAllocateResponseBuilderOption { |
| return func(b *containerAllocateResponseBuilder) { |
| b.envs = envs |
| } |
| } |
| |
| // withCDIDevices sets the cdiDevices for the containerAllocateResponseBuilder |
| func withCDIDevices(cdiDevices ...string) containerAllocateResponseBuilderOption { |
| return func(b *containerAllocateResponseBuilder) { |
| b.cdiDevices = cdiDevices |
| } |
| } |
| |
| // newContainerAllocateResponse creates a ContainerAllocateResponse with the given options. |
| func newContainerAllocateResponse(opts ...containerAllocateResponseBuilderOption) *pluginapi.ContainerAllocateResponse { |
| b := &containerAllocateResponseBuilder{} |
| for _, opt := range opts { |
| opt(b) |
| } |
| |
| return b.Build() |
| } |
| |
| // Build uses the configured builder to create a ContainerAllocateResponse. |
| func (b *containerAllocateResponseBuilder) Build() *pluginapi.ContainerAllocateResponse { |
| resp := &pluginapi.ContainerAllocateResponse{} |
| for k, v := range b.devices { |
| resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{ |
| HostPath: k, |
| ContainerPath: v, |
| Permissions: "mrw", |
| }) |
| } |
| for k, v := range b.mounts { |
| resp.Mounts = append(resp.Mounts, &pluginapi.Mount{ |
| ContainerPath: k, |
| HostPath: v, |
| ReadOnly: true, |
| }) |
| } |
| resp.Envs = make(map[string]string) |
| for k, v := range b.envs { |
| resp.Envs[k] = v |
| } |
| |
| var cdiDevices []*pluginapi.CDIDevice |
| for _, dev := range b.cdiDevices { |
| cdiDevice := pluginapi.CDIDevice{ |
| Name: dev, |
| } |
| cdiDevices = append(cdiDevices, &cdiDevice) |
| } |
| resp.CDIDevices = cdiDevices |
| |
| return resp |
| } |
| |
| func TestCheckpoint(t *testing.T) { |
| resourceName1 := "domain1.com/resource1" |
| resourceName2 := "domain2.com/resource2" |
| resourceName3 := "domain2.com/resource3" |
| as := assert.New(t) |
| tmpDir, err := os.MkdirTemp("", "checkpoint") |
| as.Nil(err) |
| defer os.RemoveAll(tmpDir) |
| ckm, err := checkpointmanager.NewCheckpointManager(tmpDir) |
| as.Nil(err) |
| testManager := &ManagerImpl{ |
| endpoints: make(map[string]endpointInfo), |
| healthyDevices: make(map[string]sets.Set[string]), |
| unhealthyDevices: make(map[string]sets.Set[string]), |
| allocatedDevices: make(map[string]sets.Set[string]), |
| podDevices: newPodDevices(), |
| checkpointManager: ckm, |
| } |
| |
| testManager.podDevices.insert("pod1", "con1", resourceName1, |
| constructDevices([]string{"dev1", "dev2"}), |
| newContainerAllocateResponse( |
| withDevices(map[string]string{"/dev/r1dev1": "/dev/r1dev1", "/dev/r1dev2": "/dev/r1dev2"}), |
| withMounts(map[string]string{"/home/r1lib1": "/usr/r1lib1"}), |
| withCDIDevices("domain1.com/resource1=dev1", "domain1.com/resource1=dev2"), |
| ), |
| ) |
| testManager.podDevices.insert("pod1", "con1", resourceName2, |
| constructDevices([]string{"dev1", "dev2"}), |
| newContainerAllocateResponse( |
| withDevices(map[string]string{"/dev/r2dev1": "/dev/r2dev1", "/dev/r2dev2": "/dev/r2dev2"}), |
| withMounts(map[string]string{"/home/r2lib1": "/usr/r2lib1"}), |
| withEnvs(map[string]string{"r2devices": "dev1 dev2"}), |
| ), |
| ) |
| testManager.podDevices.insert("pod1", "con2", resourceName1, |
| constructDevices([]string{"dev3"}), |
| newContainerAllocateResponse( |
| withDevices(map[string]string{"/dev/r1dev3": "/dev/r1dev3"}), |
| withMounts(map[string]string{"/home/r1lib1": "/usr/r1lib1"}), |
| ), |
| ) |
| testManager.podDevices.insert("pod2", "con1", resourceName1, |
| constructDevices([]string{"dev4"}), |
| newContainerAllocateResponse( |
| withDevices(map[string]string{"/dev/r1dev4": "/dev/r1dev4"}), |
| withMounts(map[string]string{"/home/r1lib1": "/usr/r1lib1"}), |
| ), |
| ) |
| testManager.podDevices.insert("pod3", "con3", resourceName3, |
| checkpoint.DevicesPerNUMA{nodeWithoutTopology: []string{"dev5"}}, |
| newContainerAllocateResponse( |
| withDevices(map[string]string{"/dev/r3dev5": "/dev/r3dev5"}), |
| withMounts(map[string]string{"/home/r3lib1": "/usr/r3lib1"}), |
| ), |
| ) |
| |
| testManager.healthyDevices[resourceName1] = sets.New[string]() |
| testManager.healthyDevices[resourceName1].Insert("dev1") |
| testManager.healthyDevices[resourceName1].Insert("dev2") |
| testManager.healthyDevices[resourceName1].Insert("dev3") |
| testManager.healthyDevices[resourceName1].Insert("dev4") |
| testManager.healthyDevices[resourceName1].Insert("dev5") |
| testManager.healthyDevices[resourceName2] = sets.New[string]() |
| testManager.healthyDevices[resourceName2].Insert("dev1") |
| testManager.healthyDevices[resourceName2].Insert("dev2") |
| testManager.healthyDevices[resourceName3] = sets.New[string]() |
| testManager.healthyDevices[resourceName3].Insert("dev5") |
| |
| expectedPodDevices := testManager.podDevices |
| expectedAllocatedDevices := testManager.podDevices.devices() |
| expectedAllDevices := testManager.healthyDevices |
| |
| err = testManager.writeCheckpoint() |
| |
| as.Nil(err) |
| testManager.podDevices = newPodDevices() |
| err = testManager.readCheckpoint() |
| as.Nil(err) |
| |
| as.Equal(expectedPodDevices.size(), testManager.podDevices.size()) |
| for podUID, containerDevices := range expectedPodDevices.devs { |
| for conName, resources := range containerDevices { |
| for resource := range resources { |
| expDevices := expectedPodDevices.containerDevices(podUID, conName, resource) |
| testDevices := testManager.podDevices.containerDevices(podUID, conName, resource) |
| as.True(reflect.DeepEqual(expDevices, testDevices)) |
| opts1 := expectedPodDevices.deviceRunContainerOptions(podUID, conName) |
| opts2 := testManager.podDevices.deviceRunContainerOptions(podUID, conName) |
| as.Equal(len(opts1.Envs), len(opts2.Envs)) |
| as.Equal(len(opts1.Mounts), len(opts2.Mounts)) |
| as.Equal(len(opts1.Devices), len(opts2.Devices)) |
| } |
| } |
| } |
| as.True(reflect.DeepEqual(expectedAllocatedDevices, testManager.allocatedDevices)) |
| as.True(reflect.DeepEqual(expectedAllDevices, testManager.healthyDevices)) |
| } |
| |
| type activePodsStub struct { |
| activePods []*v1.Pod |
| } |
| |
| func (a *activePodsStub) getActivePods() []*v1.Pod { |
| return a.activePods |
| } |
| |
| func (a *activePodsStub) updateActivePods(newPods []*v1.Pod) { |
| a.activePods = newPods |
| } |
| |
| type MockEndpoint struct { |
| getPreferredAllocationFunc func(available, mustInclude []string, size int) (*pluginapi.PreferredAllocationResponse, error) |
| allocateFunc func(devs []string) (*pluginapi.AllocateResponse, error) |
| initChan chan []string |
| } |
| |
| func (m *MockEndpoint) preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error) { |
| m.initChan <- devs |
| return &pluginapi.PreStartContainerResponse{}, nil |
| } |
| |
| func (m *MockEndpoint) getPreferredAllocation(available, mustInclude []string, size int) (*pluginapi.PreferredAllocationResponse, error) { |
| if m.getPreferredAllocationFunc != nil { |
| return m.getPreferredAllocationFunc(available, mustInclude, size) |
| } |
| return nil, nil |
| } |
| |
| func (m *MockEndpoint) allocate(devs []string) (*pluginapi.AllocateResponse, error) { |
| if m.allocateFunc != nil { |
| return m.allocateFunc(devs) |
| } |
| return nil, nil |
| } |
| |
| func (m *MockEndpoint) setStopTime(t time.Time) {} |
| |
| func (m *MockEndpoint) isStopped() bool { return false } |
| |
| func (m *MockEndpoint) stopGracePeriodExpired() bool { return false } |
| |
| func makePod(limits v1.ResourceList) *v1.Pod { |
| return &v1.Pod{ |
| ObjectMeta: metav1.ObjectMeta{ |
| UID: uuid.NewUUID(), |
| }, |
| Spec: v1.PodSpec{ |
| Containers: []v1.Container{ |
| { |
| Resources: v1.ResourceRequirements{ |
| Limits: limits, |
| }, |
| }, |
| }, |
| }, |
| } |
| } |
| |
| func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestResource) (*wrappedManagerImpl, error) { |
| monitorCallback := func(resourceName string, devices []pluginapi.Device) {} |
| ckm, err := checkpointmanager.NewCheckpointManager(tmpDir) |
| if err != nil { |
| return nil, err |
| } |
| m := &ManagerImpl{ |
| healthyDevices: make(map[string]sets.Set[string]), |
| unhealthyDevices: make(map[string]sets.Set[string]), |
| allocatedDevices: make(map[string]sets.Set[string]), |
| endpoints: make(map[string]endpointInfo), |
| podDevices: newPodDevices(), |
| devicesToReuse: make(PodReusableDevices), |
| topologyAffinityStore: topologymanager.NewFakeManager(), |
| activePods: activePods, |
| sourcesReady: &sourcesReadyStub{}, |
| checkpointManager: ckm, |
| allDevices: NewResourceDeviceInstances(), |
| } |
| testManager := &wrappedManagerImpl{ |
| ManagerImpl: m, |
| socketdir: tmpDir, |
| callback: monitorCallback, |
| } |
| |
| for _, res := range testRes { |
| testManager.healthyDevices[res.resourceName] = sets.New[string](res.devs.Devices().UnsortedList()...) |
| if res.resourceName == "domain1.com/resource1" { |
| testManager.endpoints[res.resourceName] = endpointInfo{ |
| e: &MockEndpoint{allocateFunc: allocateStubFunc()}, |
| opts: nil, |
| } |
| } |
| if res.resourceName == "domain2.com/resource2" { |
| testManager.endpoints[res.resourceName] = endpointInfo{ |
| e: &MockEndpoint{ |
| allocateFunc: func(devs []string) (*pluginapi.AllocateResponse, error) { |
| resp := new(pluginapi.ContainerAllocateResponse) |
| resp.Envs = make(map[string]string) |
| for _, dev := range devs { |
| switch dev { |
| case "dev3": |
| resp.Envs["key2"] = "val2" |
| |
| case "dev4": |
| resp.Envs["key2"] = "val3" |
| } |
| } |
| resps := new(pluginapi.AllocateResponse) |
| resps.ContainerResponses = append(resps.ContainerResponses, resp) |
| return resps, nil |
| }, |
| }, |
| opts: nil, |
| } |
| } |
| testManager.allDevices[res.resourceName] = makeDevice(res.devs, res.topology) |
| |
| } |
| return testManager, nil |
| } |
| |
| type TestResource struct { |
| resourceName string |
| resourceQuantity resource.Quantity |
| devs checkpoint.DevicesPerNUMA |
| topology bool |
| } |
| |
| func TestFilterByAffinity(t *testing.T) { |
| as := require.New(t) |
| allDevices := ResourceDeviceInstances{ |
| "res1": map[string]pluginapi.Device{ |
| "dev1": { |
| ID: "dev1", |
| Topology: &pluginapi.TopologyInfo{ |
| Nodes: []*pluginapi.NUMANode{ |
| { |
| ID: 1, |
| }, |
| }, |
| }, |
| }, |
| "dev2": { |
| ID: "dev2", |
| Topology: &pluginapi.TopologyInfo{ |
| Nodes: []*pluginapi.NUMANode{ |
| { |
| ID: 1, |
| }, |
| { |
| ID: 2, |
| }, |
| }, |
| }, |
| }, |
| "dev3": { |
| ID: "dev3", |
| Topology: &pluginapi.TopologyInfo{ |
| Nodes: []*pluginapi.NUMANode{ |
| { |
| ID: 2, |
| }, |
| }, |
| }, |
| }, |
| "dev4": { |
| ID: "dev4", |
| Topology: &pluginapi.TopologyInfo{ |
| Nodes: []*pluginapi.NUMANode{ |
| { |
| ID: 2, |
| }, |
| }, |
| }, |
| }, |
| "devwithouttopology": { |
| ID: "dev5", |
| }, |
| }, |
| } |
| |
| fakeAffinity, _ := bitmask.NewBitMask(2) |
| fakeHint := topologymanager.TopologyHint{ |
| NUMANodeAffinity: fakeAffinity, |
| Preferred: true, |
| } |
| testManager := ManagerImpl{ |
| topologyAffinityStore: topologymanager.NewFakeManagerWithHint(&fakeHint), |
| allDevices: allDevices, |
| } |
| |
| testCases := []struct { |
| available sets.Set[string] |
| fromAffinityExpected sets.Set[string] |
| notFromAffinityExpected sets.Set[string] |
| withoutTopologyExpected sets.Set[string] |
| }{ |
| { |
| available: sets.New[string]("dev1", "dev2"), |
| fromAffinityExpected: sets.New[string]("dev2"), |
| notFromAffinityExpected: sets.New[string]("dev1"), |
| withoutTopologyExpected: sets.New[string](), |
| }, |
| { |
| available: sets.New[string]("dev1", "dev2", "dev3", "dev4"), |
| fromAffinityExpected: sets.New[string]("dev2", "dev3", "dev4"), |
| notFromAffinityExpected: sets.New[string]("dev1"), |
| withoutTopologyExpected: sets.New[string](), |
| }, |
| } |
| |
| for _, testCase := range testCases { |
| fromAffinity, notFromAffinity, withoutTopology := testManager.filterByAffinity("", "", "res1", testCase.available) |
| as.Truef(fromAffinity.Equal(testCase.fromAffinityExpected), "expect devices from affinity to be %v but got %v", testCase.fromAffinityExpected, fromAffinity) |
| as.Truef(notFromAffinity.Equal(testCase.notFromAffinityExpected), "expect devices not from affinity to be %v but got %v", testCase.notFromAffinityExpected, notFromAffinity) |
| as.Truef(withoutTopology.Equal(testCase.withoutTopologyExpected), "expect devices without topology to be %v but got %v", testCase.notFromAffinityExpected, notFromAffinity) |
| } |
| } |
| |
| func TestPodContainerDeviceAllocation(t *testing.T) { |
| res1 := TestResource{ |
| resourceName: "domain1.com/resource1", |
| resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI), |
| devs: checkpoint.DevicesPerNUMA{0: []string{"dev1", "dev2"}}, |
| topology: true, |
| } |
| res2 := TestResource{ |
| resourceName: "domain2.com/resource2", |
| resourceQuantity: *resource.NewQuantity(int64(1), resource.DecimalSI), |
| devs: checkpoint.DevicesPerNUMA{0: []string{"dev3", "dev4"}}, |
| topology: false, |
| } |
| testResources := make([]TestResource, 2) |
| testResources = append(testResources, res1) |
| testResources = append(testResources, res2) |
| as := require.New(t) |
| podsStub := activePodsStub{ |
| activePods: []*v1.Pod{}, |
| } |
| tmpDir, err := os.MkdirTemp("", "checkpoint") |
| as.Nil(err) |
| defer os.RemoveAll(tmpDir) |
| testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources) |
| as.Nil(err) |
| |
| testPods := []*v1.Pod{ |
| makePod(v1.ResourceList{ |
| v1.ResourceName(res1.resourceName): res1.resourceQuantity, |
| v1.ResourceName("cpu"): res1.resourceQuantity, |
| v1.ResourceName(res2.resourceName): res2.resourceQuantity}), |
| makePod(v1.ResourceList{ |
| v1.ResourceName(res1.resourceName): res2.resourceQuantity}), |
| makePod(v1.ResourceList{ |
| v1.ResourceName(res2.resourceName): res2.resourceQuantity}), |
| } |
| testCases := []struct { |
| description string |
| testPod *v1.Pod |
| expectedContainerOptsLen []int |
| expectedAllocatedResName1 int |
| expectedAllocatedResName2 int |
| expErr error |
| }{ |
| { |
| description: "Successful allocation of two Res1 resources and one Res2 resource", |
| testPod: testPods[0], |
| expectedContainerOptsLen: []int{3, 2, 2}, |
| expectedAllocatedResName1: 2, |
| expectedAllocatedResName2: 1, |
| expErr: nil, |
| }, |
| { |
| description: "Requesting to create a pod without enough resources should fail", |
| testPod: testPods[1], |
| expectedContainerOptsLen: nil, |
| expectedAllocatedResName1: 2, |
| expectedAllocatedResName2: 1, |
| expErr: fmt.Errorf("requested number of devices unavailable for domain1.com/resource1. Requested: 1, Available: 0"), |
| }, |
| { |
| description: "Successful allocation of all available Res1 resources and Res2 resources", |
| testPod: testPods[2], |
| expectedContainerOptsLen: []int{0, 0, 1}, |
| expectedAllocatedResName1: 2, |
| expectedAllocatedResName2: 2, |
| expErr: nil, |
| }, |
| } |
| activePods := []*v1.Pod{} |
| for _, testCase := range testCases { |
| pod := testCase.testPod |
| activePods = append(activePods, pod) |
| podsStub.updateActivePods(activePods) |
| err := testManager.Allocate(pod, &pod.Spec.Containers[0]) |
| if !reflect.DeepEqual(err, testCase.expErr) { |
| t.Errorf("DevicePluginManager error (%v). expected error: %v but got: %v", |
| testCase.description, testCase.expErr, err) |
| } |
| runContainerOpts, err := testManager.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0]) |
| if testCase.expErr == nil { |
| as.Nil(err) |
| } |
| if testCase.expectedContainerOptsLen == nil { |
| as.Nil(runContainerOpts) |
| } else { |
| as.Equal(len(runContainerOpts.Devices), testCase.expectedContainerOptsLen[0]) |
| as.Equal(len(runContainerOpts.Mounts), testCase.expectedContainerOptsLen[1]) |
| as.Equal(len(runContainerOpts.Envs), testCase.expectedContainerOptsLen[2]) |
| } |
| as.Equal(testCase.expectedAllocatedResName1, testManager.allocatedDevices[res1.resourceName].Len()) |
| as.Equal(testCase.expectedAllocatedResName2, testManager.allocatedDevices[res2.resourceName].Len()) |
| } |
| |
| } |
| |
| func TestPodContainerDeviceToAllocate(t *testing.T) { |
| resourceName1 := "domain1.com/resource1" |
| resourceName2 := "domain2.com/resource2" |
| resourceName3 := "domain2.com/resource3" |
| as := require.New(t) |
| tmpDir, err := os.MkdirTemp("", "checkpoint") |
| as.Nil(err) |
| defer os.RemoveAll(tmpDir) |
| |
| testManager := &ManagerImpl{ |
| endpoints: make(map[string]endpointInfo), |
| healthyDevices: make(map[string]sets.Set[string]), |
| unhealthyDevices: make(map[string]sets.Set[string]), |
| allocatedDevices: make(map[string]sets.Set[string]), |
| podDevices: newPodDevices(), |
| activePods: func() []*v1.Pod { return []*v1.Pod{} }, |
| sourcesReady: &sourcesReadyStub{}, |
| } |
| |
| testManager.podDevices.insert("pod1", "con1", resourceName1, |
| constructDevices([]string{"dev1", "dev2"}), |
| newContainerAllocateResponse( |
| withDevices(map[string]string{"/dev/r2dev1": "/dev/r2dev1", "/dev/r2dev2": "/dev/r2dev2"}), |
| withMounts(map[string]string{"/home/r2lib1": "/usr/r2lib1"}), |
| withEnvs(map[string]string{"r2devices": "dev1 dev2"}), |
| ), |
| ) |
| testManager.podDevices.insert("pod2", "con2", resourceName2, |
| checkpoint.DevicesPerNUMA{nodeWithoutTopology: []string{"dev5"}}, |
| newContainerAllocateResponse( |
| withDevices(map[string]string{"/dev/r1dev5": "/dev/r1dev5"}), |
| withMounts(map[string]string{"/home/r1lib1": "/usr/r1lib1"}), |
| ), |
| ) |
| testManager.podDevices.insert("pod3", "con3", resourceName3, |
| checkpoint.DevicesPerNUMA{nodeWithoutTopology: []string{"dev5"}}, |
| newContainerAllocateResponse( |
| withDevices(map[string]string{"/dev/r1dev5": "/dev/r1dev5"}), |
| withMounts(map[string]string{"/home/r1lib1": "/usr/r1lib1"}), |
| ), |
| ) |
| |
| // no healthy devices for resourceName1 and devices corresponding to |
| // resource2 are intentionally omitted to simulate that the resource |
| // hasn't been registered. |
| testManager.healthyDevices[resourceName1] = sets.New[string]() |
| testManager.healthyDevices[resourceName3] = sets.New[string]() |
| // dev5 is no longer in the list of healthy devices |
| testManager.healthyDevices[resourceName3].Insert("dev7") |
| testManager.healthyDevices[resourceName3].Insert("dev8") |
| |
| testCases := []struct { |
| description string |
| podUID string |
| contName string |
| resource string |
| required int |
| reusableDevices sets.Set[string] |
| expectedAllocatedDevices sets.Set[string] |
| expErr error |
| }{ |
| { |
| description: "Admission error in case no healthy devices to allocate present", |
| podUID: "pod1", |
| contName: "con1", |
| resource: resourceName1, |
| required: 2, |
| reusableDevices: sets.New[string](), |
| expectedAllocatedDevices: nil, |
| expErr: fmt.Errorf("no healthy devices present; cannot allocate unhealthy devices %s", resourceName1), |
| }, |
| { |
| description: "Admission error in case resource is not registered", |
| podUID: "pod2", |
| contName: "con2", |
| resource: resourceName2, |
| required: 1, |
| reusableDevices: sets.New[string](), |
| expectedAllocatedDevices: nil, |
| expErr: fmt.Errorf("cannot allocate unregistered device %s", resourceName2), |
| }, |
| { |
| description: "Admission error in case resource not devices previously allocated no longer healthy", |
| podUID: "pod3", |
| contName: "con3", |
| resource: resourceName3, |
| required: 1, |
| reusableDevices: sets.New[string](), |
| expectedAllocatedDevices: nil, |
| expErr: fmt.Errorf("previously allocated devices are no longer healthy; cannot allocate unhealthy devices %s", resourceName3), |
| }, |
| } |
| |
| for _, testCase := range testCases { |
| allocDevices, err := testManager.devicesToAllocate(testCase.podUID, testCase.contName, testCase.resource, testCase.required, testCase.reusableDevices) |
| if !reflect.DeepEqual(err, testCase.expErr) { |
| t.Errorf("devicePluginManager error (%v). expected error: %v but got: %v", |
| testCase.description, testCase.expErr, err) |
| } |
| if !reflect.DeepEqual(allocDevices, testCase.expectedAllocatedDevices) { |
| t.Errorf("devicePluginManager error (%v). expected error: %v but got: %v", |
| testCase.description, testCase.expectedAllocatedDevices, allocDevices) |
| } |
| } |
| |
| } |
| |
| func TestDevicesToAllocateConflictWithUpdateAllocatedDevices(t *testing.T) { |
| podToAllocate := "podToAllocate" |
| containerToAllocate := "containerToAllocate" |
| podToRemove := "podToRemove" |
| containerToRemove := "containerToRemove" |
| deviceID := "deviceID" |
| resourceName := "domain1.com/resource" |
| |
| socket := filepath.Join(os.TempDir(), esocketName()) |
| devs := []*pluginapi.Device{ |
| {ID: deviceID, Health: pluginapi.Healthy}, |
| } |
| p, e := esetup(t, devs, socket, resourceName, func(n string, d []pluginapi.Device) {}) |
| |
| waitUpdateAllocatedDevicesChan := make(chan struct{}) |
| waitSetGetPreferredAllocChan := make(chan struct{}) |
| |
| p.SetGetPreferredAllocFunc(func(r *pluginapi.PreferredAllocationRequest, devs map[string]pluginapi.Device) (*pluginapi.PreferredAllocationResponse, error) { |
| waitSetGetPreferredAllocChan <- struct{}{} |
| <-waitUpdateAllocatedDevicesChan |
| return &pluginapi.PreferredAllocationResponse{ |
| ContainerResponses: []*pluginapi.ContainerPreferredAllocationResponse{ |
| { |
| DeviceIDs: []string{deviceID}, |
| }, |
| }, |
| }, nil |
| }) |
| |
| testManager := &ManagerImpl{ |
| endpoints: make(map[string]endpointInfo), |
| healthyDevices: make(map[string]sets.Set[string]), |
| unhealthyDevices: make(map[string]sets.Set[string]), |
| allocatedDevices: make(map[string]sets.Set[string]), |
| podDevices: newPodDevices(), |
| activePods: func() []*v1.Pod { return []*v1.Pod{} }, |
| sourcesReady: &sourcesReadyStub{}, |
| topologyAffinityStore: topologymanager.NewFakeManager(), |
| } |
| |
| testManager.endpoints[resourceName] = endpointInfo{ |
| e: e, |
| opts: &pluginapi.DevicePluginOptions{ |
| GetPreferredAllocationAvailable: true, |
| }, |
| } |
| testManager.healthyDevices[resourceName] = sets.New[string](deviceID) |
| testManager.podDevices.insert(podToRemove, containerToRemove, resourceName, nil, nil) |
| |
| go func() { |
| <-waitSetGetPreferredAllocChan |
| testManager.UpdateAllocatedDevices() |
| waitUpdateAllocatedDevicesChan <- struct{}{} |
| }() |
| |
| set, err := testManager.devicesToAllocate(podToAllocate, containerToAllocate, resourceName, 1, sets.New[string]()) |
| assert.NoError(t, err) |
| assert.Equal(t, set, sets.New[string](deviceID)) |
| } |
| |
| func TestGetDeviceRunContainerOptions(t *testing.T) { |
| res1 := TestResource{ |
| resourceName: "domain1.com/resource1", |
| resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI), |
| devs: checkpoint.DevicesPerNUMA{0: []string{"dev1", "dev2"}}, |
| topology: true, |
| } |
| res2 := TestResource{ |
| resourceName: "domain2.com/resource2", |
| resourceQuantity: *resource.NewQuantity(int64(1), resource.DecimalSI), |
| devs: checkpoint.DevicesPerNUMA{0: []string{"dev3", "dev4"}}, |
| topology: false, |
| } |
| |
| testResources := make([]TestResource, 2) |
| testResources = append(testResources, res1) |
| testResources = append(testResources, res2) |
| |
| podsStub := activePodsStub{ |
| activePods: []*v1.Pod{}, |
| } |
| as := require.New(t) |
| |
| tmpDir, err := os.MkdirTemp("", "checkpoint") |
| as.Nil(err) |
| defer os.RemoveAll(tmpDir) |
| |
| testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources) |
| as.Nil(err) |
| |
| pod1 := makePod(v1.ResourceList{ |
| v1.ResourceName(res1.resourceName): res1.resourceQuantity, |
| v1.ResourceName(res2.resourceName): res2.resourceQuantity, |
| }) |
| pod2 := makePod(v1.ResourceList{ |
| v1.ResourceName(res2.resourceName): res2.resourceQuantity, |
| }) |
| |
| activePods := []*v1.Pod{pod1, pod2} |
| podsStub.updateActivePods(activePods) |
| |
| err = testManager.Allocate(pod1, &pod1.Spec.Containers[0]) |
| as.Nil(err) |
| err = testManager.Allocate(pod2, &pod2.Spec.Containers[0]) |
| as.Nil(err) |
| |
| // when pod is in activePods, GetDeviceRunContainerOptions should return |
| runContainerOpts, err := testManager.GetDeviceRunContainerOptions(pod1, &pod1.Spec.Containers[0]) |
| as.Nil(err) |
| as.Equal(len(runContainerOpts.Devices), 3) |
| as.Equal(len(runContainerOpts.Mounts), 2) |
| as.Equal(len(runContainerOpts.Envs), 2) |
| |
| activePods = []*v1.Pod{pod2} |
| podsStub.updateActivePods(activePods) |
| testManager.UpdateAllocatedDevices() |
| |
| // when pod is removed from activePods,G etDeviceRunContainerOptions should return error |
| runContainerOpts, err = testManager.GetDeviceRunContainerOptions(pod1, &pod1.Spec.Containers[0]) |
| as.Nil(err) |
| as.Nil(runContainerOpts) |
| } |
| |
| func TestInitContainerDeviceAllocation(t *testing.T) { |
| // Requesting to create a pod that requests resourceName1 in init containers and normal containers |
| // should succeed with devices allocated to init containers reallocated to normal containers. |
| res1 := TestResource{ |
| resourceName: "domain1.com/resource1", |
| resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI), |
| devs: checkpoint.DevicesPerNUMA{0: []string{"dev1", "dev2"}}, |
| topology: false, |
| } |
| res2 := TestResource{ |
| resourceName: "domain2.com/resource2", |
| resourceQuantity: *resource.NewQuantity(int64(1), resource.DecimalSI), |
| devs: checkpoint.DevicesPerNUMA{0: []string{"dev3", "dev4"}}, |
| topology: true, |
| } |
| testResources := make([]TestResource, 2) |
| testResources = append(testResources, res1) |
| testResources = append(testResources, res2) |
| as := require.New(t) |
| podsStub := activePodsStub{ |
| activePods: []*v1.Pod{}, |
| } |
| tmpDir, err := os.MkdirTemp("", "checkpoint") |
| as.Nil(err) |
| defer os.RemoveAll(tmpDir) |
| |
| testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources) |
| as.Nil(err) |
| |
| podWithPluginResourcesInInitContainers := &v1.Pod{ |
| ObjectMeta: metav1.ObjectMeta{ |
| UID: uuid.NewUUID(), |
| }, |
| Spec: v1.PodSpec{ |
| InitContainers: []v1.Container{ |
| { |
| Name: string(uuid.NewUUID()), |
| Resources: v1.ResourceRequirements{ |
| Limits: v1.ResourceList{ |
| v1.ResourceName(res1.resourceName): res2.resourceQuantity, |
| }, |
| }, |
| }, |
| { |
| Name: string(uuid.NewUUID()), |
| Resources: v1.ResourceRequirements{ |
| Limits: v1.ResourceList{ |
| v1.ResourceName(res1.resourceName): res1.resourceQuantity, |
| }, |
| }, |
| }, |
| }, |
| Containers: []v1.Container{ |
| { |
| Name: string(uuid.NewUUID()), |
| Resources: v1.ResourceRequirements{ |
| Limits: v1.ResourceList{ |
| v1.ResourceName(res1.resourceName): res2.resourceQuantity, |
| v1.ResourceName(res2.resourceName): res2.resourceQuantity, |
| }, |
| }, |
| }, |
| { |
| Name: string(uuid.NewUUID()), |
| Resources: v1.ResourceRequirements{ |
| Limits: v1.ResourceList{ |
| v1.ResourceName(res1.resourceName): res2.resourceQuantity, |
| v1.ResourceName(res2.resourceName): res2.resourceQuantity, |
| }, |
| }, |
| }, |
| }, |
| }, |
| } |
| podsStub.updateActivePods([]*v1.Pod{podWithPluginResourcesInInitContainers}) |
| for _, container := range podWithPluginResourcesInInitContainers.Spec.InitContainers { |
| err = testManager.Allocate(podWithPluginResourcesInInitContainers, &container) |
| } |
| for _, container := range podWithPluginResourcesInInitContainers.Spec.Containers { |
| err = testManager.Allocate(podWithPluginResourcesInInitContainers, &container) |
| } |
| as.Nil(err) |
| podUID := string(podWithPluginResourcesInInitContainers.UID) |
| initCont1 := podWithPluginResourcesInInitContainers.Spec.InitContainers[0].Name |
| initCont2 := podWithPluginResourcesInInitContainers.Spec.InitContainers[1].Name |
| normalCont1 := podWithPluginResourcesInInitContainers.Spec.Containers[0].Name |
| normalCont2 := podWithPluginResourcesInInitContainers.Spec.Containers[1].Name |
| initCont1Devices := testManager.podDevices.containerDevices(podUID, initCont1, res1.resourceName) |
| initCont2Devices := testManager.podDevices.containerDevices(podUID, initCont2, res1.resourceName) |
| normalCont1Devices := testManager.podDevices.containerDevices(podUID, normalCont1, res1.resourceName) |
| normalCont2Devices := testManager.podDevices.containerDevices(podUID, normalCont2, res1.resourceName) |
| as.Equal(1, initCont1Devices.Len()) |
| as.Equal(2, initCont2Devices.Len()) |
| as.Equal(1, normalCont1Devices.Len()) |
| as.Equal(1, normalCont2Devices.Len()) |
| as.True(initCont2Devices.IsSuperset(initCont1Devices)) |
| as.True(initCont2Devices.IsSuperset(normalCont1Devices)) |
| as.True(initCont2Devices.IsSuperset(normalCont2Devices)) |
| as.Equal(0, normalCont1Devices.Intersection(normalCont2Devices).Len()) |
| } |
| |
| func TestRestartableInitContainerDeviceAllocation(t *testing.T) { |
| // Requesting to create a pod that requests resourceName1 in restartable |
| // init containers and normal containers should succeed with devices |
| // allocated to init containers not reallocated to normal containers. |
| oneDevice := resource.NewQuantity(int64(1), resource.DecimalSI) |
| twoDevice := resource.NewQuantity(int64(2), resource.DecimalSI) |
| threeDevice := resource.NewQuantity(int64(3), resource.DecimalSI) |
| res1 := TestResource{ |
| resourceName: "domain1.com/resource1", |
| resourceQuantity: *resource.NewQuantity(int64(6), resource.DecimalSI), |
| devs: checkpoint.DevicesPerNUMA{ |
| 0: []string{"dev1", "dev2", "dev3", "dev4", "dev5", "dev6"}, |
| }, |
| topology: false, |
| } |
| testResources := []TestResource{ |
| res1, |
| } |
| as := require.New(t) |
| podsStub := activePodsStub{ |
| activePods: []*v1.Pod{}, |
| } |
| tmpDir, err := os.MkdirTemp("", "checkpoint") |
| as.Nil(err) |
| defer os.RemoveAll(tmpDir) |
| |
| testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources) |
| as.Nil(err) |
| |
| containerRestartPolicyAlways := v1.ContainerRestartPolicyAlways |
| podWithPluginResourcesInRestartableInitContainers := &v1.Pod{ |
| ObjectMeta: metav1.ObjectMeta{ |
| UID: uuid.NewUUID(), |
| }, |
| Spec: v1.PodSpec{ |
| InitContainers: []v1.Container{ |
| { |
| Name: string(uuid.NewUUID()), |
| Resources: v1.ResourceRequirements{ |
| Limits: v1.ResourceList{ |
| v1.ResourceName(res1.resourceName): *threeDevice, |
| }, |
| }, |
| }, |
| { |
| Name: string(uuid.NewUUID()), |
| Resources: v1.ResourceRequirements{ |
| Limits: v1.ResourceList{ |
| v1.ResourceName(res1.resourceName): *oneDevice, |
| }, |
| }, |
| RestartPolicy: &containerRestartPolicyAlways, |
| }, |
| { |
| Name: string(uuid.NewUUID()), |
| Resources: v1.ResourceRequirements{ |
| Limits: v1.ResourceList{ |
| v1.ResourceName(res1.resourceName): *twoDevice, |
| }, |
| }, |
| RestartPolicy: &containerRestartPolicyAlways, |
| }, |
| }, |
| Containers: []v1.Container{ |
| { |
| Name: string(uuid.NewUUID()), |
| Resources: v1.ResourceRequirements{ |
| Limits: v1.ResourceList{ |
| v1.ResourceName(res1.resourceName): *oneDevice, |
| }, |
| }, |
| }, |
| { |
| Name: string(uuid.NewUUID()), |
| Resources: v1.ResourceRequirements{ |
| Limits: v1.ResourceList{ |
| v1.ResourceName(res1.resourceName): *twoDevice, |
| }, |
| }, |
| }, |
| }, |
| }, |
| } |
| podsStub.updateActivePods([]*v1.Pod{podWithPluginResourcesInRestartableInitContainers}) |
| for _, container := range podWithPluginResourcesInRestartableInitContainers.Spec.InitContainers { |
| err = testManager.Allocate(podWithPluginResourcesInRestartableInitContainers, &container) |
| } |
| for _, container := range podWithPluginResourcesInRestartableInitContainers.Spec.Containers { |
| err = testManager.Allocate(podWithPluginResourcesInRestartableInitContainers, &container) |
| } |
| as.Nil(err) |
| podUID := string(podWithPluginResourcesInRestartableInitContainers.UID) |
| regularInitCont1 := podWithPluginResourcesInRestartableInitContainers.Spec.InitContainers[0].Name |
| restartableInitCont2 := podWithPluginResourcesInRestartableInitContainers.Spec.InitContainers[1].Name |
| restartableInitCont3 := podWithPluginResourcesInRestartableInitContainers.Spec.InitContainers[2].Name |
| normalCont1 := podWithPluginResourcesInRestartableInitContainers.Spec.Containers[0].Name |
| normalCont2 := podWithPluginResourcesInRestartableInitContainers.Spec.Containers[1].Name |
| regularInitCont1Devices := testManager.podDevices.containerDevices(podUID, regularInitCont1, res1.resourceName) |
| restartableInitCont2Devices := testManager.podDevices.containerDevices(podUID, restartableInitCont2, res1.resourceName) |
| restartableInitCont3Devices := testManager.podDevices.containerDevices(podUID, restartableInitCont3, res1.resourceName) |
| normalCont1Devices := testManager.podDevices.containerDevices(podUID, normalCont1, res1.resourceName) |
| normalCont2Devices := testManager.podDevices.containerDevices(podUID, normalCont2, res1.resourceName) |
| as.Equal(3, regularInitCont1Devices.Len()) |
| as.Equal(1, restartableInitCont2Devices.Len()) |
| as.Equal(2, restartableInitCont3Devices.Len()) |
| as.Equal(1, normalCont1Devices.Len()) |
| as.Equal(2, normalCont2Devices.Len()) |
| as.True(regularInitCont1Devices.IsSuperset(restartableInitCont2Devices)) |
| as.True(regularInitCont1Devices.IsSuperset(restartableInitCont3Devices)) |
| // regularInitCont1Devices are sharable with other containers |
| |
| dedicatedContainerDevices := []sets.Set[string]{ |
| restartableInitCont2Devices, |
| restartableInitCont3Devices, |
| normalCont1Devices, |
| normalCont2Devices, |
| } |
| |
| for i := 0; i < len(dedicatedContainerDevices)-1; i++ { |
| for j := i + 1; j < len(dedicatedContainerDevices); j++ { |
| t.Logf("containerDevices[%d] = %v", i, dedicatedContainerDevices[i]) |
| t.Logf("containerDevices[%d] = %v", j, dedicatedContainerDevices[j]) |
| as.Empty(dedicatedContainerDevices[i].Intersection(dedicatedContainerDevices[j])) |
| } |
| } |
| } |
| |
| func TestUpdatePluginResources(t *testing.T) { |
| pod := &v1.Pod{} |
| pod.UID = types.UID("testPod") |
| |
| resourceName1 := "domain1.com/resource1" |
| devID1 := "dev1" |
| |
| resourceName2 := "domain2.com/resource2" |
| devID2 := "dev2" |
| |
| as := assert.New(t) |
| monitorCallback := func(resourceName string, devices []pluginapi.Device) {} |
| tmpDir, err := os.MkdirTemp("", "checkpoint") |
| as.Nil(err) |
| defer os.RemoveAll(tmpDir) |
| |
| ckm, err := checkpointmanager.NewCheckpointManager(tmpDir) |
| as.Nil(err) |
| m := &ManagerImpl{ |
| allocatedDevices: make(map[string]sets.Set[string]), |
| healthyDevices: make(map[string]sets.Set[string]), |
| podDevices: newPodDevices(), |
| checkpointManager: ckm, |
| } |
| testManager := wrappedManagerImpl{ |
| ManagerImpl: m, |
| callback: monitorCallback, |
| } |
| testManager.podDevices.devs[string(pod.UID)] = make(containerDevices) |
| |
| // require one of resource1 and one of resource2 |
| testManager.allocatedDevices[resourceName1] = sets.New[string]() |
| testManager.allocatedDevices[resourceName1].Insert(devID1) |
| testManager.allocatedDevices[resourceName2] = sets.New[string]() |
| testManager.allocatedDevices[resourceName2].Insert(devID2) |
| |
| cachedNode := &v1.Node{ |
| Status: v1.NodeStatus{ |
| Allocatable: v1.ResourceList{ |
| // has no resource1 and two of resource2 |
| v1.ResourceName(resourceName2): *resource.NewQuantity(int64(2), resource.DecimalSI), |
| }, |
| }, |
| } |
| nodeInfo := &schedulerframework.NodeInfo{} |
| nodeInfo.SetNode(cachedNode) |
| |
| testManager.UpdatePluginResources(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod}) |
| |
| allocatableScalarResources := nodeInfo.Allocatable.ScalarResources |
| // allocatable in nodeInfo is less than needed, should update |
| as.Equal(1, int(allocatableScalarResources[v1.ResourceName(resourceName1)])) |
| // allocatable in nodeInfo is more than needed, should skip updating |
| as.Equal(2, int(allocatableScalarResources[v1.ResourceName(resourceName2)])) |
| } |
| |
| func TestDevicePreStartContainer(t *testing.T) { |
| // Ensures that if device manager is indicated to invoke `PreStartContainer` RPC |
| // by device plugin, then device manager invokes PreStartContainer at endpoint interface. |
| // Also verifies that final allocation of mounts, envs etc is same as expected. |
| res1 := TestResource{ |
| resourceName: "domain1.com/resource1", |
| resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI), |
| devs: checkpoint.DevicesPerNUMA{0: []string{"dev1", "dev2"}}, |
| topology: false, |
| } |
| as := require.New(t) |
| podsStub := activePodsStub{ |
| activePods: []*v1.Pod{}, |
| } |
| tmpDir, err := os.MkdirTemp("", "checkpoint") |
| as.Nil(err) |
| defer os.RemoveAll(tmpDir) |
| |
| testManager, err := getTestManager(tmpDir, podsStub.getActivePods, []TestResource{res1}) |
| as.Nil(err) |
| |
| ch := make(chan []string, 1) |
| testManager.endpoints[res1.resourceName] = endpointInfo{ |
| e: &MockEndpoint{ |
| initChan: ch, |
| allocateFunc: allocateStubFunc(), |
| }, |
| opts: &pluginapi.DevicePluginOptions{PreStartRequired: true}, |
| } |
| pod := makePod(v1.ResourceList{ |
| v1.ResourceName(res1.resourceName): res1.resourceQuantity}) |
| activePods := []*v1.Pod{} |
| activePods = append(activePods, pod) |
| podsStub.updateActivePods(activePods) |
| err = testManager.Allocate(pod, &pod.Spec.Containers[0]) |
| as.Nil(err) |
| runContainerOpts, err := testManager.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0]) |
| as.Nil(err) |
| var initializedDevs []string |
| select { |
| case <-time.After(time.Second): |
| t.Fatalf("Timed out while waiting on channel for response from PreStartContainer RPC stub") |
| case initializedDevs = <-ch: |
| break |
| } |
| |
| as.Contains(initializedDevs, "dev1") |
| as.Contains(initializedDevs, "dev2") |
| as.Equal(len(initializedDevs), res1.devs.Devices().Len()) |
| |
| expectedResps, err := allocateStubFunc()([]string{"dev1", "dev2"}) |
| as.Nil(err) |
| as.Equal(1, len(expectedResps.ContainerResponses)) |
| expectedResp := expectedResps.ContainerResponses[0] |
| as.Equal(len(runContainerOpts.Devices), len(expectedResp.Devices)) |
| as.Equal(len(runContainerOpts.Mounts), len(expectedResp.Mounts)) |
| as.Equal(len(runContainerOpts.Envs), len(expectedResp.Envs)) |
| |
| pod2 := makePod(v1.ResourceList{ |
| v1.ResourceName(res1.resourceName): *resource.NewQuantity(int64(0), resource.DecimalSI)}) |
| activePods = append(activePods, pod2) |
| podsStub.updateActivePods(activePods) |
| err = testManager.Allocate(pod2, &pod2.Spec.Containers[0]) |
| as.Nil(err) |
| _, err = testManager.GetDeviceRunContainerOptions(pod2, &pod2.Spec.Containers[0]) |
| as.Nil(err) |
| select { |
| case <-time.After(time.Millisecond): |
| t.Log("When pod resourceQuantity is 0, PreStartContainer RPC stub will be skipped") |
| case <-ch: |
| break |
| } |
| } |
| |
| func TestResetExtendedResource(t *testing.T) { |
| as := assert.New(t) |
| tmpDir, err := os.MkdirTemp("", "checkpoint") |
| as.Nil(err) |
| defer os.RemoveAll(tmpDir) |
| ckm, err := checkpointmanager.NewCheckpointManager(tmpDir) |
| as.Nil(err) |
| testManager := &ManagerImpl{ |
| endpoints: make(map[string]endpointInfo), |
| healthyDevices: make(map[string]sets.Set[string]), |
| unhealthyDevices: make(map[string]sets.Set[string]), |
| allocatedDevices: make(map[string]sets.Set[string]), |
| podDevices: newPodDevices(), |
| checkpointManager: ckm, |
| } |
| |
| extendedResourceName := "domain.com/resource" |
| testManager.podDevices.insert("pod", "con", extendedResourceName, |
| constructDevices([]string{"dev1"}), |
| newContainerAllocateResponse( |
| withDevices(map[string]string{"/dev/dev1": "/dev/dev1"}), |
| withMounts(map[string]string{"/home/lib1": "/usr/lib1"}), |
| ), |
| ) |
| |
| testManager.healthyDevices[extendedResourceName] = sets.New[string]() |
| testManager.healthyDevices[extendedResourceName].Insert("dev1") |
| // checkpoint is present, indicating node hasn't been recreated |
| err = testManager.writeCheckpoint() |
| as.Nil(err) |
| |
| as.False(testManager.ShouldResetExtendedResourceCapacity()) |
| |
| // checkpoint is absent, representing node recreation |
| ckpts, err := ckm.ListCheckpoints() |
| as.Nil(err) |
| for _, ckpt := range ckpts { |
| err = ckm.RemoveCheckpoint(ckpt) |
| as.Nil(err) |
| } |
| as.True(testManager.ShouldResetExtendedResourceCapacity()) |
| } |
| |
| func allocateStubFunc() func(devs []string) (*pluginapi.AllocateResponse, error) { |
| return func(devs []string) (*pluginapi.AllocateResponse, error) { |
| resp := new(pluginapi.ContainerAllocateResponse) |
| resp.Envs = make(map[string]string) |
| for _, dev := range devs { |
| switch dev { |
| case "dev1": |
| resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{ |
| ContainerPath: "/dev/aaa", |
| HostPath: "/dev/aaa", |
| Permissions: "mrw", |
| }) |
| |
| resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{ |
| ContainerPath: "/dev/bbb", |
| HostPath: "/dev/bbb", |
| Permissions: "mrw", |
| }) |
| |
| resp.Mounts = append(resp.Mounts, &pluginapi.Mount{ |
| ContainerPath: "/container_dir1/file1", |
| HostPath: "host_dir1/file1", |
| ReadOnly: true, |
| }) |
| |
| case "dev2": |
| resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{ |
| ContainerPath: "/dev/ccc", |
| HostPath: "/dev/ccc", |
| Permissions: "mrw", |
| }) |
| |
| resp.Mounts = append(resp.Mounts, &pluginapi.Mount{ |
| ContainerPath: "/container_dir1/file2", |
| HostPath: "host_dir1/file2", |
| ReadOnly: true, |
| }) |
| |
| resp.Envs["key1"] = "val1" |
| } |
| } |
| resps := new(pluginapi.AllocateResponse) |
| resps.ContainerResponses = append(resps.ContainerResponses, resp) |
| return resps, nil |
| } |
| } |
| |
| func makeDevice(devOnNUMA checkpoint.DevicesPerNUMA, topology bool) map[string]pluginapi.Device { |
| res := make(map[string]pluginapi.Device) |
| var topologyInfo *pluginapi.TopologyInfo |
| for node, devs := range devOnNUMA { |
| if topology { |
| topologyInfo = &pluginapi.TopologyInfo{Nodes: []*pluginapi.NUMANode{{ID: node}}} |
| } else { |
| topologyInfo = nil |
| } |
| for idx := range devs { |
| res[devs[idx]] = pluginapi.Device{ID: devs[idx], Topology: topologyInfo} |
| } |
| } |
| return res |
| } |
| |
// deviceManagerCheckpointFilename is the file name under which
// TestReadPreNUMACheckpoint stores the checkpoint payload it wants the device
// manager to read back.
const deviceManagerCheckpointFilename = "kubelet_internal_checkpoint"

// oldCheckpoint is a serialized checkpoint fixture in which every pod entry
// lists its DeviceIDs as a flat array. Presumably this is the layout written
// before devices were tracked per NUMA node (confirm against the checkpoint
// package).
var oldCheckpoint = `{"Data":{"PodDeviceEntries":[{"PodUID":"13ac2284-0d19-44b7-b94f-055b032dba9b","ContainerName":"centos","ResourceName":"example.com/deviceA","DeviceIDs":["DevA3"],"AllocResp":"CiIKHUVYQU1QTEVDT01ERVZJQ0VBX0RFVkEzX1RUWTEwEgEwGhwKCi9kZXYvdHR5MTASCi9kZXYvdHR5MTAaAnJ3"},{"PodUID":"86b9a017-c9ca-4069-815f-46ca3e53c1e4","ContainerName":"centos","ResourceName":"example.com/deviceA","DeviceIDs":["DevA4"],"AllocResp":"CiIKHUVYQU1QTEVDT01ERVZJQ0VBX0RFVkE0X1RUWTExEgEwGhwKCi9kZXYvdHR5MTESCi9kZXYvdHR5MTEaAnJ3"}],"RegisteredDevices":{"example.com/deviceA":["DevA1","DevA2","DevA3","DevA4"]}},"Checksum":405612085}`
| |
| func TestReadPreNUMACheckpoint(t *testing.T) { |
| socketDir, socketName, _, err := tmpSocketDir() |
| require.NoError(t, err) |
| defer os.RemoveAll(socketDir) |
| |
| err = os.WriteFile(filepath.Join(socketDir, deviceManagerCheckpointFilename), []byte(oldCheckpoint), 0644) |
| require.NoError(t, err) |
| |
| topologyStore := topologymanager.NewFakeManager() |
| nodes := []cadvisorapi.Node{{Id: 0}} |
| m, err := newManagerImpl(socketName, nodes, topologyStore) |
| require.NoError(t, err) |
| |
| // TODO: we should not calling private methods, but among the existing tests we do anyway |
| err = m.readCheckpoint() |
| require.NoError(t, err) |
| } |
| |
| func TestGetTopologyHintsWithUpdates(t *testing.T) { |
| socketDir, socketName, _, err := tmpSocketDir() |
| defer os.RemoveAll(socketDir) |
| require.NoError(t, err) |
| |
| devs := []pluginapi.Device{} |
| for i := 0; i < 1000; i++ { |
| devs = append(devs, pluginapi.Device{ |
| ID: fmt.Sprintf("dev-%d", i), |
| Health: pluginapi.Healthy, |
| Topology: &pluginapi.TopologyInfo{ |
| Nodes: []*pluginapi.NUMANode{ |
| {ID: 0}, |
| }, |
| }}) |
| } |
| testPod := makePod(v1.ResourceList{ |
| testResourceName: *resource.NewQuantity(int64(1), resource.DecimalSI), |
| }) |
| topology := []cadvisorapi.Node{ |
| {Id: 0}, |
| } |
| testCases := []struct { |
| description string |
| count int |
| devices []pluginapi.Device |
| testfunc func(manager *wrappedManagerImpl) |
| }{ |
| { |
| description: "GetTopologyHints data race when update device", |
| count: 10, |
| devices: devs, |
| testfunc: func(manager *wrappedManagerImpl) { |
| manager.GetTopologyHints(testPod, &testPod.Spec.Containers[0]) |
| }, |
| }, |
| { |
| description: "GetPodTopologyHints data race when update device", |
| count: 10, |
| devices: devs, |
| testfunc: func(manager *wrappedManagerImpl) { |
| manager.GetPodTopologyHints(testPod) |
| }, |
| }, |
| } |
| |
| for _, test := range testCases { |
| t.Run(test.description, func(t *testing.T) { |
| m, _ := setupDeviceManager(t, nil, nil, socketName, topology) |
| defer m.Stop() |
| mimpl := m.(*wrappedManagerImpl) |
| |
| wg := sync.WaitGroup{} |
| wg.Add(2) |
| |
| updated := atomic.Bool{} |
| updated.Store(false) |
| go func() { |
| defer wg.Done() |
| for i := 0; i < test.count; i++ { |
| // simulate the device plugin to send device updates |
| mimpl.genericDeviceUpdateCallback(testResourceName, devs) |
| } |
| updated.Store(true) |
| }() |
| go func() { |
| defer wg.Done() |
| for !updated.Load() { |
| // When a data race occurs, golang will throw an error, and recover() cannot catch this error, |
| // Such as: `throw("Concurrent map iteration and map writing")`. |
| // When this test ends quietly, no data race error occurs. |
| // Otherwise, the test process exits automatically and prints all goroutine call stacks. |
| test.testfunc(mimpl) |
| } |
| }() |
| wg.Wait() |
| }) |
| } |
| } |