| /* |
| Copyright The containerd Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package integration |
| |
| import ( |
| "context" |
| "fmt" |
| "os" |
| "path/filepath" |
| goruntime "runtime" |
| "strconv" |
| "strings" |
| "sync" |
| "syscall" |
| "testing" |
| "time" |
| |
| "github.com/containerd/nri/pkg/api" |
| "github.com/containerd/nri/pkg/stub" |
| "github.com/opencontainers/selinux/go-selinux" |
| runtime "k8s.io/cri-api/pkg/apis/runtime/v1" |
| |
| cri "github.com/containerd/containerd/v2/integration/cri-api/pkg/apis" |
| |
| "github.com/stretchr/testify/assert" |
| "github.com/stretchr/testify/require" |
| |
| "github.com/containerd/containerd/v2/integration/images" |
| ) |
| |
| const ( |
| nriTestSocket = "/var/run/nri-test.sock" |
| pluginSyncTimeout = 3 * time.Second |
| containerStopTimeout = 10 |
| onlineCpus = "/sys/devices/system/cpu/online" |
| normalMems = "/sys/devices/system/node/has_normal_memory" |
| ) |
| |
| var ( |
| availableCpuset []string |
| availableMemset []string |
| ) |
| |
| // skipNriTestIfNecessary skips NRI tests if necessary. |
| func skipNriTestIfNecessary(t *testing.T, extraSkipChecks ...map[string]bool) { |
| if goruntime.GOOS != "linux" { |
| t.Skip("Not running on linux") |
| } |
| |
| if selinux.GetEnabled() { |
| // https://github.com/containerd/containerd/pull/7892#issuecomment-1369825603 |
| t.Skip("SELinux relabeling is not supported for NRI yet") |
| } |
| _, err := os.Stat(nriTestSocket) |
| if err != nil { |
| t.Skip("Containerd test instance does not have NRI enabled") |
| } |
| |
| for _, checks := range extraSkipChecks { |
| for check, skip := range checks { |
| if skip { |
| t.Skip(check) |
| } |
| } |
| } |
| } |
| |
| // Test successful Nri plugin setup. |
| func TestNriPluginSetup(t *testing.T) { |
| skipNriTestIfNecessary(t) |
| |
| t.Log("Test that NRI plugins can connect and get set up.") |
| |
| var ( |
| tc = &nriTest{ |
| t: t, |
| plugins: []*mockPlugin{ |
| {}, |
| {}, |
| {}, |
| }, |
| } |
| ) |
| |
| tc.setup() |
| } |
| |
| // Test NRI runtime/plugin state synchronization. |
| func TestNriPluginSynchronization(t *testing.T) { |
| skipNriTestIfNecessary(t) |
| |
| t.Log("Test that NRI plugins get properly synchronized with the runtime state.") |
| |
| var ( |
| tc = &nriTest{ |
| t: t, |
| } |
| podCount = 3 |
| ctrPerPod = 2 |
| ) |
| |
| tc.setup() |
| |
| for i := 0; i < podCount; i++ { |
| podID := tc.runPod(fmt.Sprintf("pod%d", i)) |
| for j := 0; j < ctrPerPod; j++ { |
| tc.startContainer(podID, fmt.Sprintf("ctr%d", j)) |
| } |
| } |
| |
| for _, plugin := range []*mockPlugin{{}, {}, {}} { |
| tc.connectNewPlugin(plugin) |
| } |
| |
| for _, plugin := range tc.plugins { |
| err := plugin.Wait(PluginSynchronized, time.After(pluginSyncTimeout)) |
| require.NoError(t, err, "plugin sync wait") |
| } |
| |
| for _, id := range tc.pods { |
| for _, plugin := range tc.plugins { |
| _, ok := plugin.pods[id] |
| require.True(tc.t, ok, "runtime sync of pod "+id) |
| } |
| } |
| |
| for _, id := range tc.ctrs { |
| for _, plugin := range tc.plugins { |
| _, ok := plugin.ctrs[id] |
| require.True(t, ok, "runtime sync of container "+id) |
| } |
| } |
| } |
| |
| // Test mount injection into containers by NRI plugins. |
| func TestNriMountInjection(t *testing.T) { |
| skipNriTestIfNecessary(t) |
| |
| t.Log("Test that NRI plugins can inject mounts into containers.") |
| |
| var ( |
| out = t.TempDir() |
| tc = &nriTest{ |
| t: t, |
| plugins: []*mockPlugin{{}}, |
| } |
| injectMount = func(p *mockPlugin, pod *api.PodSandbox, ctr *api.Container) (*api.ContainerAdjustment, []*api.ContainerUpdate, error) { |
| adjust := &api.ContainerAdjustment{} |
| adjust.AddMount(&api.Mount{ |
| Destination: "/out", |
| Source: out, |
| Type: "bind", |
| Options: []string{"bind"}, |
| }) |
| return adjust, nil, nil |
| } |
| ) |
| |
| tc.plugins[0].createContainer = injectMount |
| tc.setup() |
| |
| msg := fmt.Sprintf("hello process %d", os.Getpid()) |
| cmd := "echo " + msg + " > /out/result; sleep 3600" |
| |
| podID := tc.runPod("pod0") |
| tc.startContainer(podID, "ctr0", |
| WithCommand("/bin/sh", "-c", cmd), |
| ) |
| |
| chk, err := waitForFileAndRead(filepath.Join(out, "result"), time.Second) |
| require.NoError(t, err, "read result") |
| require.Equal(t, msg+"\n", string(chk), "check result") |
| } |
| |
| // Test environment variable injection by NRI plugins. |
| func TestNriEnvironmentInjection(t *testing.T) { |
| skipNriTestIfNecessary(t) |
| |
| t.Log("Test that NRI plugins can inject environment variables into containers.") |
| |
| var ( |
| out = t.TempDir() |
| tc = &nriTest{ |
| t: t, |
| plugins: []*mockPlugin{{}}, |
| } |
| injectEnv = func(p *mockPlugin, pod *api.PodSandbox, ctr *api.Container) (*api.ContainerAdjustment, []*api.ContainerUpdate, error) { |
| adjust := &api.ContainerAdjustment{} |
| adjust.AddMount(&api.Mount{ |
| Destination: "/out", |
| Source: out, |
| Type: "bind", |
| Options: []string{"bind"}, |
| }) |
| adjust.AddEnv("TEST_ENV_NAME", "TEST_ENV_VALUE") |
| return adjust, nil, nil |
| } |
| ) |
| |
| tc.plugins[0].createContainer = injectEnv |
| tc.setup() |
| |
| podID := tc.runPod("pod0") |
| tc.startContainer(podID, "ctr0", |
| WithCommand("/bin/sh", "-c", "echo $TEST_ENV_NAME > /out/result; sleep 3600"), |
| ) |
| |
| chk, err := waitForFileAndRead(filepath.Join(out, "result"), time.Second) |
| require.NoError(t, err, "read result") |
| require.Equal(t, "TEST_ENV_VALUE\n", string(chk), "check result") |
| } |
| |
| // Test annotation injection by NRI plugins. |
| func TestNriAnnotationInjection(t *testing.T) { |
| skipNriTestIfNecessary(t) |
| |
| t.Log("Test that NRI plugins can inject annotations into containers.") |
| |
| var ( |
| key = "TEST_ANNOTATION_KEY" |
| value = "TEST_ANNOTATION_VALUE" |
| tc = &nriTest{ |
| t: t, |
| plugins: []*mockPlugin{{}}, |
| } |
| injectAnnotation = func(p *mockPlugin, pod *api.PodSandbox, ctr *api.Container) (*api.ContainerAdjustment, []*api.ContainerUpdate, error) { |
| adjust := &api.ContainerAdjustment{} |
| adjust.AddAnnotation(key, value) |
| return adjust, nil, nil |
| } |
| ) |
| |
| tc.plugins[0].createContainer = injectAnnotation |
| tc.setup() |
| |
| podID := tc.runPod("pod0") |
| id := tc.startContainer(podID, "ctr0") |
| |
| timeout := time.After(pluginSyncTimeout) |
| err := tc.plugins[0].Wait(ContainerEvent(tc.plugins[0].ctrs[id], PostCreateContainer), timeout) |
| require.NoError(t, err, "wait for container post-create event") |
| require.Equal(t, value, tc.plugins[0].ctrs[id].Annotations[key], "updated annotations") |
| } |
| |
| // Test linux device injection by NRI plugins. |
| func TestNriLinuxDeviceInjection(t *testing.T) { |
| skipNriTestIfNecessary(t) |
| |
| t.Log("Test that NRI plugins can inject linux devices into containers.") |
| |
| var ( |
| out = t.TempDir() |
| tc = &nriTest{ |
| t: t, |
| plugins: []*mockPlugin{{}}, |
| } |
| injectDev = func(p *mockPlugin, pod *api.PodSandbox, ctr *api.Container) (*api.ContainerAdjustment, []*api.ContainerUpdate, error) { |
| adjust := &api.ContainerAdjustment{} |
| adjust.AddMount(&api.Mount{ |
| Destination: "/out", |
| Source: out, |
| Type: "bind", |
| Options: []string{"bind"}, |
| }) |
| adjust.AddDevice(&api.LinuxDevice{ |
| Path: "/dev/pie", |
| Type: "c", |
| Major: 31, |
| Minor: 41, |
| Uid: api.UInt32(uint32(11)), |
| Gid: api.UInt32(uint32(22)), |
| FileMode: api.FileMode(uint32(0664)), |
| }) |
| return adjust, nil, nil |
| } |
| ) |
| |
| tc.plugins[0].createContainer = injectDev |
| tc.setup() |
| |
| podID := tc.runPod("pod0") |
| tc.startContainer(podID, "ctr0", |
| WithCommand("/bin/sh", "-c", "stat -c %F-%a-%u:%g-%t:%T /dev/pie > /out/result; sleep 3600"), |
| ) |
| |
| chk, err := waitForFileAndRead(filepath.Join(out, "result"), time.Second) |
| require.NoError(t, err, "read result") |
| require.Equal(t, "character special file-664-11:22-1f:29\n", string(chk), "check result") |
| } |
| |
| // Test linux cpuset adjustment by NRI plugins. |
| func TestNriLinuxCpusetAdjustment(t *testing.T) { |
| skipNriTestIfNecessary(t, |
| map[string]bool{ |
| "not enough online CPUs for test": len(getAvailableCpuset(t)) < 2, |
| }, |
| ) |
| |
| t.Log("Test that NRI plugins can adjust linux cpusets of containers.") |
| |
| var ( |
| out = t.TempDir() |
| tc = &nriTest{ |
| t: t, |
| plugins: []*mockPlugin{{}}, |
| } |
| adjustCpuset = func(p *mockPlugin, pod *api.PodSandbox, ctr *api.Container) (*api.ContainerAdjustment, []*api.ContainerUpdate, error) { |
| adjust := &api.ContainerAdjustment{} |
| adjust.AddMount(&api.Mount{ |
| Destination: "/out", |
| Source: out, |
| Type: "bind", |
| Options: []string{"bind"}, |
| }) |
| adjust.SetLinuxCPUSetCPUs(availableCpuset[1]) |
| return adjust, nil, nil |
| } |
| ) |
| |
| tc.plugins[0].createContainer = adjustCpuset |
| tc.setup() |
| |
| podID := tc.runPod("pod0") |
| tc.startContainer(podID, "ctr0", |
| WithCommand("/bin/sh", "-c", "grep Cpus_allowed_list: /proc/self/status > /out/result; "+ |
| "sleep 3600"), |
| ) |
| |
| chk, err := waitForFileAndRead(filepath.Join(out, "result"), time.Second) |
| require.NoError(t, err, "read result") |
| |
| expected := "Cpus_allowed_list:\t" + availableCpuset[1] + "\n" |
| require.Equal(t, expected, string(chk), "check result") |
| } |
| |
| // Test linux memset adjustment by NRI plugins. |
| func TestNriLinuxMemsetAdjustment(t *testing.T) { |
| skipNriTestIfNecessary(t, |
| map[string]bool{ |
| "not enough online memory nodes for test": len(getAvailableMemset(t)) < 2, |
| }, |
| ) |
| |
| t.Log("Test that NRI plugins can adjust linux memsets of containers.") |
| |
| var ( |
| out = t.TempDir() |
| tc = &nriTest{ |
| t: t, |
| plugins: []*mockPlugin{{}}, |
| } |
| adjustMemset = func(p *mockPlugin, pod *api.PodSandbox, ctr *api.Container) (*api.ContainerAdjustment, []*api.ContainerUpdate, error) { |
| adjust := &api.ContainerAdjustment{} |
| adjust.AddMount(&api.Mount{ |
| Destination: "/out", |
| Source: out, |
| Type: "bind", |
| Options: []string{"bind"}, |
| }) |
| adjust.SetLinuxCPUSetMems(availableMemset[1]) |
| return adjust, nil, nil |
| } |
| ) |
| |
| tc.plugins[0].createContainer = adjustMemset |
| tc.setup() |
| |
| podID := tc.runPod("pod0") |
| tc.startContainer(podID, "ctr0", |
| WithCommand("/bin/sh", "-c", "grep Mems_allowed_list: /proc/self/status > /out/result; "+ |
| "sleep 3600"), |
| ) |
| |
| chk, err := waitForFileAndRead(filepath.Join(out, "result"), time.Second) |
| require.NoError(t, err, "read result") |
| |
| expected := "Mems_allowed_list:\t" + availableMemset[1] + "\n" |
| require.Equal(t, expected, string(chk), "check result") |
| } |
| |
| // Test creation-time linux cpuset update of existing containers by NRI plugins. |
| func TestNriLinuxCpusetAdjustmentUpdate(t *testing.T) { |
| skipNriTestIfNecessary(t, |
| map[string]bool{ |
| "not enough online CPUs for test": len(getAvailableCpuset(t)) < 2, |
| }, |
| ) |
| |
| t.Log("Test that NRI plugins can update linux cpusets of existing containers.") |
| t.Logf("availableCpuset values is %v", availableCpuset) |
| |
| var ( |
| out = t.TempDir() |
| tc = &nriTest{ |
| t: t, |
| plugins: []*mockPlugin{{}}, |
| } |
| ctr0 string |
| updateCpuset = func(p *mockPlugin, pod *api.PodSandbox, ctr *api.Container) (*api.ContainerAdjustment, []*api.ContainerUpdate, error) { |
| var ( |
| adjust *api.ContainerAdjustment |
| update []*api.ContainerUpdate |
| ) |
| if ctr.Name == "ctr0" { |
| ctr0 = ctr.Id |
| adjust = &api.ContainerAdjustment{} |
| adjust.AddMount(&api.Mount{ |
| Destination: "/out", |
| Source: out, |
| Type: "bind", |
| Options: []string{"bind"}, |
| }) |
| t.Logf("ctr0 availableCpuset values is %v", availableCpuset) |
| adjust.SetLinuxCPUSetCPUs(availableCpuset[0]) |
| } else { |
| update = []*api.ContainerUpdate{{}} |
| update[0].SetContainerId(ctr0) |
| t.Logf("ctr1 availableCpuset values is %v", availableCpuset) |
| update[0].SetLinuxCPUSetCPUs(availableCpuset[1]) |
| } |
| return adjust, update, nil |
| } |
| ) |
| |
| tc.plugins[0].createContainer = updateCpuset |
| tc.setup() |
| |
| podID := tc.runPod("pod0") |
| tc.startContainer(podID, "ctr0", |
| WithCommand("/bin/sh", "-c", |
| `prev="" |
| while true; do |
| cpus="$(grep Cpus_allowed_list: /proc/self/status)" |
| [ "$cpus" != "$prev" ] && echo "$cpus" > /out/result |
| cpus="$prev" |
| sleep 0.2 |
| done`), |
| ) |
| tc.startContainer(podID, "ctr1", |
| WithCommand("/bin/sh", "-c", "sleep 3600"), |
| ) |
| |
| time.Sleep(1 * time.Second) |
| |
| chk, err := waitForFileAndRead(filepath.Join(out, "result"), time.Second) |
| require.NoError(t, err, "read result") |
| |
| expected := "Cpus_allowed_list:\t" + availableCpuset[1] + "\n" |
| require.Equal(t, expected, string(chk), "check result") |
| } |
| |
| // Test creation-time linux memset update of existing containers by NRI plugins. |
| func TestNriLinuxMemsetAdjustmentUpdate(t *testing.T) { |
| skipNriTestIfNecessary(t, |
| map[string]bool{ |
| "not enough online memory nodes for test": len(getAvailableMemset(t)) < 2, |
| }, |
| ) |
| |
| t.Log("Test that NRI plugins can update linux memsets of existing containers.") |
| |
| var ( |
| out = t.TempDir() |
| tc = &nriTest{ |
| t: t, |
| plugins: []*mockPlugin{{}}, |
| } |
| ctr0 string |
| updateMemset = func(p *mockPlugin, pod *api.PodSandbox, ctr *api.Container) (*api.ContainerAdjustment, []*api.ContainerUpdate, error) { |
| var ( |
| adjust *api.ContainerAdjustment |
| update []*api.ContainerUpdate |
| ) |
| if ctr.Name == "ctr0" { |
| ctr0 = ctr.Id |
| adjust = &api.ContainerAdjustment{} |
| adjust.AddMount(&api.Mount{ |
| Destination: "/out", |
| Source: out, |
| Type: "bind", |
| Options: []string{"bind"}, |
| }) |
| adjust.SetLinuxCPUSetMems(availableMemset[0]) |
| } else { |
| update = []*api.ContainerUpdate{{}} |
| update[0].SetContainerId(ctr0) |
| update[0].SetLinuxCPUSetMems(availableMemset[1]) |
| } |
| return adjust, update, nil |
| } |
| ) |
| |
| tc.plugins[0].createContainer = updateMemset |
| tc.setup() |
| |
| podID := tc.runPod("pod0") |
| tc.startContainer(podID, "ctr0", |
| WithCommand("/bin/sh", "-c", |
| `prev="" |
| while true; do |
| mems="$(grep Mems_allowed_list: /proc/self/status)" |
| [ "$mems" != "$prev" ] && echo "$mems" > /out/result |
| mems="$prev" |
| sleep 0.2 |
| done`), |
| ) |
| tc.startContainer(podID, "ctr1", |
| WithCommand("/bin/sh", "-c", "sleep 3600"), |
| ) |
| |
| time.Sleep(1 * time.Second) |
| |
| chk, err := waitForFileAndRead(filepath.Join(out, "result"), time.Second) |
| require.NoError(t, err, "read result") |
| |
| expected := "Mems_allowed_list:\t" + availableMemset[1] + "\n" |
| require.Equal(t, expected, string(chk), "check result") |
| } |
| |
| // Test NRI vs. containerd restart. |
| func TestNriPluginContainerdRestart(t *testing.T) { |
| skipNriTestIfNecessary(t) |
| |
| t.Log("Test NRI plugins vs. containerd restart.") |
| |
| var ( |
| tc = &nriTest{ |
| t: t, |
| } |
| podCount = 3 |
| ctrPerPod = 2 |
| ) |
| |
| tc.setup() |
| |
| for i := 0; i < podCount; i++ { |
| podID := tc.runPod(fmt.Sprintf("pod%d", i)) |
| for j := 0; j < ctrPerPod; j++ { |
| tc.startContainer(podID, fmt.Sprintf("ctr%d", j)) |
| } |
| } |
| |
| for _, plugin := range []*mockPlugin{{}, {}, {}} { |
| tc.connectNewPlugin(plugin) |
| } |
| |
| for _, plugin := range tc.plugins { |
| err := plugin.Wait(PluginSynchronized, time.After(pluginSyncTimeout)) |
| require.NoError(t, err, "plugin sync wait") |
| } |
| |
| for _, id := range tc.pods { |
| for _, plugin := range tc.plugins { |
| _, ok := plugin.pods[id] |
| require.True(tc.t, ok, "runtime sync of pod "+id) |
| } |
| } |
| |
| for _, id := range tc.ctrs { |
| for _, plugin := range tc.plugins { |
| _, ok := plugin.ctrs[id] |
| require.True(t, ok, "runtime sync of container "+id) |
| } |
| } |
| |
| t.Logf("Restart containerd") |
| RestartContainerd(t, syscall.SIGTERM) |
| |
| for _, plugin := range tc.plugins { |
| require.True(t, plugin.closed, "plugin connection closed") |
| } |
| } |
| |
| // An NRI test along with its state. |
| type nriTest struct { |
| t *testing.T |
| name string |
| prefix string |
| runtime cri.RuntimeService |
| plugins []*mockPlugin |
| namespace string |
| sbCfg map[string]*runtime.PodSandboxConfig |
| pods []string |
| ctrs []string |
| } |
| |
| // setup prepares the test environment, connects all NRI plugins. |
| func (tc *nriTest) setup() { |
| if tc.name == "" { |
| tc.name = tc.t.Name() |
| } |
| if tc.prefix == "" { |
| tc.prefix = strings.ToLower(tc.name) |
| } |
| if tc.namespace == "" { |
| tc.namespace = tc.prefix + "-" + strconv.Itoa(os.Getpid()) |
| } |
| |
| tc.sbCfg = make(map[string]*runtime.PodSandboxConfig) |
| tc.runtime = runtimeService |
| |
| for idx, p := range tc.plugins { |
| p.namespace = tc.namespace |
| |
| if p.logf == nil { |
| p.logf = tc.t.Logf |
| } |
| if p.idx == "" { |
| p.idx = fmt.Sprintf("%02d", idx) |
| } |
| |
| err := p.Start() |
| require.NoError(tc.t, err, "start plugin") |
| |
| err = p.Wait(PluginSynchronized, time.After(pluginSyncTimeout)) |
| require.NoError(tc.t, err, "wait for plugin setup") |
| } |
| } |
| |
| // Connect a new NRI plugin to the runtime. |
| func (tc *nriTest) connectNewPlugin(p *mockPlugin) { |
| p.namespace = tc.namespace |
| |
| if p.logf == nil { |
| p.logf = tc.t.Logf |
| } |
| if p.idx == "" { |
| p.idx = fmt.Sprintf("%02d", len(tc.plugins)) |
| } |
| |
| tc.plugins = append(tc.plugins, p) |
| |
| err := p.Start() |
| require.NoError(tc.t, err, "start plugin") |
| |
| err = p.Wait(PluginSynchronized, time.After(pluginSyncTimeout)) |
| require.NoError(tc.t, err, "wait for plugin setup") |
| } |
| |
| // Create a pod in a namespace specific to the test case. |
| func (tc *nriTest) runPod(name string) string { |
| id, cfg := PodSandboxConfigWithCleanup(tc.t, name, tc.namespace) |
| tc.sbCfg[id] = cfg |
| tc.pods = append(tc.pods, id) |
| return id |
| } |
| |
| // Start a container in a (testcase-specific) pod. |
| func (tc *nriTest) startContainer(podID, name string, opts ...ContainerOpts) string { |
| podCfg := tc.sbCfg[podID] |
| require.NotNil(tc.t, podCfg, "pod config for "+podID) |
| |
| ctrCfg := ContainerConfig(name, "", opts...) |
| if ctrCfg.Image.Image == "" { |
| image := images.Get(images.BusyBox) |
| EnsureImageExists(tc.t, image) |
| ctrCfg.Image.Image = image |
| } |
| |
| if len(ctrCfg.Command) == 0 { |
| WithCommand("/bin/sh", "-c", "sleep 3600")(ctrCfg) |
| } |
| |
| ctrID, err := tc.runtime.CreateContainer(podID, ctrCfg, podCfg) |
| assert.NoError(tc.t, err, "create container") |
| assert.NoError(tc.t, tc.runtime.StartContainer(ctrID), "start container") |
| |
| tc.ctrs = append(tc.ctrs, ctrID) |
| |
| tc.t.Cleanup(func() { |
| assert.NoError(tc.t, tc.runtime.StopContainer(ctrID, containerStopTimeout)) |
| assert.NoError(tc.t, tc.runtime.RemoveContainer(ctrID)) |
| }) |
| |
| tc.t.Logf("created/started container %s/%s/%s with ID %s", |
| podCfg.Metadata.Namespace, podCfg.Metadata.Name, name, ctrID) |
| |
| return ctrID |
| } |
| |
| // |
| // NRI plugin implementation for integration tests |
| // |
| |
| type mockPlugin struct { |
| name string |
| idx string |
| stub stub.Stub |
| mask stub.EventMask |
| |
| q *eventQ |
| pods map[string]*api.PodSandbox |
| ctrs map[string]*api.Container |
| |
| closed bool |
| namespace string |
| logf func(string, ...interface{}) |
| synchronize func(*mockPlugin, []*api.PodSandbox, []*api.Container) ([]*api.ContainerUpdate, error) |
| runPodSandbox func(*mockPlugin, *api.PodSandbox) error |
| stopPodSandbox func(*mockPlugin, *api.PodSandbox) error |
| removePodSandbox func(*mockPlugin, *api.PodSandbox) error |
| createContainer func(*mockPlugin, *api.PodSandbox, *api.Container) (*api.ContainerAdjustment, []*api.ContainerUpdate, error) |
| postCreateContainer func(*mockPlugin, *api.PodSandbox, *api.Container) |
| updateContainer func(*mockPlugin, *api.PodSandbox, *api.Container) ([]*api.ContainerUpdate, error) |
| postUpdateContainer func(*mockPlugin, *api.PodSandbox, *api.Container) |
| stopContainer func(*mockPlugin, *api.PodSandbox, *api.Container) ([]*api.ContainerUpdate, error) |
| } |
| |
| func (m *mockPlugin) Start() error { |
| var err error |
| |
| if m.name == "" { |
| m.name = "mock-plugin" |
| } |
| if m.idx == "" { |
| m.idx = "00" |
| } |
| |
| if m.mask == 0 { |
| m.mask.Set( |
| api.Event_RUN_POD_SANDBOX, |
| api.Event_STOP_POD_SANDBOX, |
| api.Event_REMOVE_POD_SANDBOX, |
| api.Event_CREATE_CONTAINER, |
| api.Event_POST_CREATE_CONTAINER, |
| api.Event_START_CONTAINER, |
| api.Event_POST_START_CONTAINER, |
| api.Event_UPDATE_CONTAINER, |
| api.Event_POST_UPDATE_CONTAINER, |
| api.Event_STOP_CONTAINER, |
| api.Event_REMOVE_CONTAINER, |
| ) |
| } |
| |
| m.stub, err = stub.New(m, |
| stub.WithPluginName(m.name), |
| stub.WithPluginIdx(m.idx), |
| stub.WithSocketPath(nriTestSocket), |
| stub.WithOnClose(m.onClose), |
| ) |
| if err != nil { |
| return err |
| } |
| |
| if m.logf == nil { |
| m.logf = func(format string, args ...interface{}) { |
| fmt.Printf(format+"\n", args...) |
| } |
| } |
| |
| if m.synchronize == nil { |
| m.synchronize = nopSynchronize |
| } |
| if m.runPodSandbox == nil { |
| m.runPodSandbox = nopRunPodSandbox |
| } |
| if m.stopPodSandbox == nil { |
| m.stopPodSandbox = nopStopPodSandbox |
| } |
| if m.removePodSandbox == nil { |
| m.removePodSandbox = nopRemovePodSandbox |
| } |
| if m.createContainer == nil { |
| m.createContainer = nopCreateContainer |
| } |
| if m.postCreateContainer == nil { |
| m.postCreateContainer = nopEvent |
| } |
| if m.updateContainer == nil { |
| m.updateContainer = nopUpdateContainer |
| } |
| if m.postUpdateContainer == nil { |
| m.postUpdateContainer = nopEvent |
| } |
| if m.stopContainer == nil { |
| m.stopContainer = nopStopContainer |
| } |
| |
| m.q = &eventQ{} |
| m.pods = make(map[string]*api.PodSandbox) |
| m.ctrs = make(map[string]*api.Container) |
| |
| err = m.stub.Start(context.Background()) |
| if err != nil { |
| return err |
| } |
| |
| return nil |
| } |
| |
| func (m *mockPlugin) Stop() { |
| if m.stub != nil { |
| m.stub.Stop() |
| m.stub.Wait() |
| } |
| } |
| |
| func (m *mockPlugin) onClose() { |
| m.closed = true |
| if m.stub != nil { |
| m.stub.Stop() |
| m.stub.Wait() |
| } |
| } |
| |
| func (m *mockPlugin) inNamespace(namespace string) bool { |
| return strings.HasPrefix(namespace, m.namespace) |
| } |
| |
| func (m *mockPlugin) Log(format string, args ...interface{}) { |
| m.logf(fmt.Sprintf("[plugin:%s-%s] ", m.idx, m.name)+format, args...) |
| } |
| |
| func (m *mockPlugin) Configure(ctx context.Context, cfg string) (stub.EventMask, error) { |
| return m.mask, nil |
| } |
| |
| func (m *mockPlugin) Synchronize(ctx context.Context, pods []*api.PodSandbox, ctrs []*api.Container) ([]*api.ContainerUpdate, error) { |
| m.Log("Synchronize") |
| for _, pod := range pods { |
| m.Log(" - pod %s", pod.Id) |
| m.pods[pod.Id] = pod |
| } |
| for _, ctr := range ctrs { |
| m.Log(" - ctr %s", ctr.Id) |
| m.ctrs[ctr.Id] = ctr |
| } |
| |
| m.q.Add(PluginSynchronized) |
| |
| return m.synchronize(m, pods, ctrs) |
| } |
| |
| func (m *mockPlugin) Shutdown(ctx context.Context) { |
| m.Log("Shutdown") |
| } |
| |
| func (m *mockPlugin) RunPodSandbox(ctx context.Context, pod *api.PodSandbox) error { |
| if !m.inNamespace(pod.Namespace) { |
| return nil |
| } |
| |
| m.Log("RunPodSandbox %s/%s", pod.Namespace, pod.Name) |
| m.pods[pod.Id] = pod |
| m.q.Add(PodSandboxEvent(pod, RunPodSandbox)) |
| return m.runPodSandbox(m, pod) |
| } |
| |
| func (m *mockPlugin) StopPodSandbox(ctx context.Context, pod *api.PodSandbox) error { |
| if !m.inNamespace(pod.Namespace) { |
| return nil |
| } |
| |
| m.Log("StopPodSandbox %s/%s", pod.Namespace, pod.Name) |
| m.pods[pod.Id] = pod |
| m.q.Add(PodSandboxEvent(pod, StopPodSandbox)) |
| return m.stopPodSandbox(m, pod) |
| } |
| |
| func (m *mockPlugin) RemovePodSandbox(ctx context.Context, pod *api.PodSandbox) error { |
| if !m.inNamespace(pod.Namespace) { |
| return nil |
| } |
| |
| m.Log("RemovePodSandbox %s/%s", pod.Namespace, pod.Name) |
| delete(m.pods, pod.Id) |
| m.q.Add(PodSandboxEvent(pod, RemovePodSandbox)) |
| return m.removePodSandbox(m, pod) |
| } |
| |
| func (m *mockPlugin) CreateContainer(ctx context.Context, pod *api.PodSandbox, ctr *api.Container) (*api.ContainerAdjustment, []*api.ContainerUpdate, error) { |
| if !m.inNamespace(pod.Namespace) { |
| return nil, nil, nil |
| } |
| |
| m.Log("CreateContainer %s/%s/%s", pod.Namespace, pod.Name, ctr.Name) |
| m.pods[pod.Id] = pod |
| m.ctrs[ctr.Id] = ctr |
| m.q.Add(ContainerEvent(ctr, CreateContainer)) |
| |
| return m.createContainer(m, pod, ctr) |
| } |
| |
| func (m *mockPlugin) PostCreateContainer(ctx context.Context, pod *api.PodSandbox, ctr *api.Container) error { |
| if !m.inNamespace(pod.Namespace) { |
| return nil |
| } |
| |
| m.Log("PostCreateContainer %s/%s/%s", pod.Namespace, pod.Name, ctr.Name) |
| m.pods[pod.Id] = pod |
| m.ctrs[ctr.Id] = ctr |
| m.q.Add(ContainerEvent(ctr, PostCreateContainer)) |
| m.postCreateContainer(m, pod, ctr) |
| return nil |
| } |
| |
| func (m *mockPlugin) StartContainer(ctx context.Context, pod *api.PodSandbox, ctr *api.Container) error { |
| if !m.inNamespace(pod.Namespace) { |
| return nil |
| } |
| |
| m.Log("StartContainer %s/%s/%s", pod.Namespace, pod.Name, ctr.Name) |
| m.pods[pod.Id] = pod |
| m.ctrs[ctr.Id] = ctr |
| m.q.Add(ContainerEvent(ctr, StartContainer)) |
| return nil |
| } |
| |
| func (m *mockPlugin) PostStartContainer(ctx context.Context, pod *api.PodSandbox, ctr *api.Container) error { |
| if !m.inNamespace(pod.Namespace) { |
| return nil |
| } |
| |
| m.Log("PostStartContainer %s/%s/%s", pod.Namespace, pod.Name, ctr.Name) |
| m.pods[pod.Id] = pod |
| m.ctrs[ctr.Id] = ctr |
| m.q.Add(ContainerEvent(ctr, PostStartContainer)) |
| return nil |
| } |
| |
| func (m *mockPlugin) UpdateContainer(ctx context.Context, pod *api.PodSandbox, ctr *api.Container) ([]*api.ContainerUpdate, error) { |
| if !m.inNamespace(pod.Namespace) { |
| return nil, nil |
| } |
| |
| m.Log("UpdateContainer %s/%s/%s", pod.Namespace, pod.Name, ctr.Name) |
| m.pods[pod.Id] = pod |
| m.ctrs[ctr.Id] = ctr |
| m.q.Add(ContainerEvent(ctr, UpdateContainer)) |
| return m.updateContainer(m, pod, ctr) |
| } |
| |
| func (m *mockPlugin) PostUpdateContainer(ctx context.Context, pod *api.PodSandbox, ctr *api.Container) error { |
| if !m.inNamespace(pod.Namespace) { |
| return nil |
| } |
| |
| m.Log("PostUpdateContainer %s/%s/%s", pod.Namespace, pod.Name, ctr.Name) |
| m.pods[pod.Id] = pod |
| m.ctrs[ctr.Id] = ctr |
| m.q.Add(ContainerEvent(ctr, PostUpdateContainer)) |
| m.postUpdateContainer(m, pod, ctr) |
| return nil |
| } |
| |
| func (m *mockPlugin) StopContainer(ctx context.Context, pod *api.PodSandbox, ctr *api.Container) ([]*api.ContainerUpdate, error) { |
| if !m.inNamespace(pod.Namespace) { |
| return nil, nil |
| } |
| |
| m.Log("StopContainer %s/%s/%s", pod.Namespace, pod.Name, ctr.Name) |
| m.pods[pod.Id] = pod |
| m.ctrs[ctr.Id] = ctr |
| m.q.Add(ContainerEvent(ctr, StopContainer)) |
| return m.stopContainer(m, pod, ctr) |
| } |
| |
| func (m *mockPlugin) RemoveContainer(ctx context.Context, pod *api.PodSandbox, ctr *api.Container) error { |
| if !m.inNamespace(pod.Namespace) { |
| return nil |
| } |
| |
| m.Log("RemoveContainer %s/%s/%s", pod.Namespace, pod.Name, ctr.Name) |
| delete(m.pods, pod.Id) |
| delete(m.ctrs, ctr.Id) |
| m.q.Add(ContainerEvent(ctr, RemoveContainer)) |
| return nil |
| } |
| |
| func (m *mockPlugin) Wait(e *Event, deadline <-chan time.Time) error { |
| _, err := m.q.Wait(e, deadline) |
| return err |
| } |
| |
| func nopSynchronize(*mockPlugin, []*api.PodSandbox, []*api.Container) ([]*api.ContainerUpdate, error) { |
| return nil, nil |
| } |
| |
| func nopRunPodSandbox(*mockPlugin, *api.PodSandbox) error { |
| return nil |
| } |
| |
| func nopStopPodSandbox(*mockPlugin, *api.PodSandbox) error { |
| return nil |
| } |
| |
| func nopRemovePodSandbox(*mockPlugin, *api.PodSandbox) error { |
| return nil |
| } |
| |
| func nopCreateContainer(*mockPlugin, *api.PodSandbox, *api.Container) (*api.ContainerAdjustment, []*api.ContainerUpdate, error) { |
| return nil, nil, nil |
| } |
| |
| func nopUpdateContainer(*mockPlugin, *api.PodSandbox, *api.Container) ([]*api.ContainerUpdate, error) { |
| return nil, nil |
| } |
| |
| func nopStopContainer(*mockPlugin, *api.PodSandbox, *api.Container) ([]*api.ContainerUpdate, error) { |
| return nil, nil |
| } |
| |
| func nopEvent(*mockPlugin, *api.PodSandbox, *api.Container) { |
| } |
| |
| // |
| // plugin event and event recording |
| // |
| |
| type EventType string |
| |
| const ( |
| Configured = "configured" |
| Synchronized = "synchronized" |
| StartupError = "startup-error" |
| Shutdown = "shutdown" |
| Disconnected = "closed" |
| Stopped = "stopped" |
| |
| RunPodSandbox = "RunPodSandbox" |
| StopPodSandbox = "StopPodSandbox" |
| RemovePodSandbox = "RemovePodSandbox" |
| CreateContainer = "CreateContainer" |
| StartContainer = "StartContainer" |
| UpdateContainer = "UpdateContainer" |
| StopContainer = "StopContainer" |
| RemoveContainer = "RemoveContainer" |
| PostCreateContainer = "PostCreateContainer" |
| PostStartContainer = "PostStartContainer" |
| PostUpdateContainer = "PostUpdateContainer" |
| |
| Error = "Error" |
| Timeout = "" |
| ) |
| |
| type Event struct { |
| Type EventType |
| Pod string |
| Ctr string |
| } |
| |
| var ( |
| PluginConfigured = &Event{Type: Configured} |
| PluginSynchronized = &Event{Type: Synchronized} |
| PluginStartupError = &Event{Type: StartupError} |
| PluginShutdown = &Event{Type: Shutdown} |
| PluginDisconnected = &Event{Type: Disconnected} |
| PluginStopped = &Event{Type: Stopped} |
| ) |
| |
| func PodSandboxEvent(pod *api.PodSandbox, t EventType) *Event { |
| return &Event{Type: t, Pod: pod.Id} |
| } |
| |
| func ContainerEvent(ctr *api.Container, t EventType) *Event { |
| return &Event{Type: t, Ctr: ctr.Id} |
| } |
| |
| func PodRefEvent(pod string, t EventType) *Event { |
| return &Event{Type: t, Pod: pod} |
| } |
| |
| func ContainerRefEvent(pod, ctr string, t EventType) *Event { |
| return &Event{Type: t, Pod: pod, Ctr: ctr} |
| } |
| |
| func (e *Event) Matches(o *Event) bool { |
| if e.Type != o.Type { |
| return false |
| } |
| if e.Pod != "" && o.Pod != "" { |
| if e.Pod != o.Pod { |
| return false |
| } |
| } |
| if e.Ctr != "" && o.Ctr != "" { |
| if e.Ctr != o.Ctr { |
| return false |
| } |
| } |
| return true |
| } |
| |
| func (e *Event) String() string { |
| str, sep := "", "" |
| if e.Pod != "" { |
| str = e.Pod |
| sep = ":" |
| } |
| if e.Ctr != "" { |
| str += sep + e.Ctr |
| sep = "/" |
| } |
| return str + sep + string(e.Type) |
| } |
| |
| type eventQ struct { |
| sync.Mutex |
| q []*Event |
| c chan *Event |
| } |
| |
| func (q *eventQ) Add(e *Event) { |
| if q == nil { |
| return |
| } |
| q.Lock() |
| defer q.Unlock() |
| q.q = append(q.q, e) |
| if q.c != nil { |
| q.c <- e |
| } |
| } |
| |
| func (q *eventQ) Reset(e *Event) { |
| q.Lock() |
| defer q.Unlock() |
| q.q = []*Event{} |
| } |
| |
| func (q *eventQ) Events() []*Event { |
| q.Lock() |
| defer q.Unlock() |
| var events []*Event |
| events = append(events, q.q...) |
| return events |
| } |
| |
| func (q *eventQ) Has(e *Event) bool { |
| q.Lock() |
| defer q.Unlock() |
| return q.search(e) != nil |
| } |
| |
| func (q *eventQ) search(e *Event) *Event { |
| for _, qe := range q.q { |
| if qe.Matches(e) { |
| return qe |
| } |
| } |
| return nil |
| } |
| |
| func (q *eventQ) Wait(w *Event, deadline <-chan time.Time) (*Event, error) { |
| var unlocked bool |
| q.Lock() |
| defer func() { |
| if !unlocked { |
| q.Unlock() |
| } |
| }() |
| |
| if e := q.search(w); e != nil { |
| return e, nil |
| } |
| |
| if q.c != nil { |
| return nil, fmt.Errorf("event queue already busy Wait()ing") |
| } |
| q.c = make(chan *Event, 16) |
| defer func() { |
| c := q.c |
| q.c = nil |
| close(c) |
| }() |
| |
| q.Unlock() |
| unlocked = true |
| for { |
| select { |
| case e := <-q.c: |
| if e.Matches(w) { |
| return e, nil |
| } |
| case <-deadline: |
| return nil, fmt.Errorf("event queue timed out Wait()ing for %s", w) |
| } |
| } |
| } |
| |
| // |
| // helper functions |
| // |
| |
| // Wait for a file to show up in the filesystem then read its content. |
| func waitForFileAndRead(path string, timeout time.Duration) ([]byte, error) { |
| var ( |
| deadline = time.After(timeout) |
| slack = 100 * time.Millisecond |
| ) |
| |
| WAIT: |
| for { |
| if _, err := os.Stat(path); err == nil { |
| break WAIT |
| } |
| select { |
| case <-deadline: |
| return nil, fmt.Errorf("waiting for %s timed out", path) |
| default: |
| time.Sleep(slack) |
| } |
| } |
| |
| time.Sleep(slack) |
| return os.ReadFile(path) |
| } |
| |
| // getAvailableCpuset returns the set of online CPUs. |
| func getAvailableCpuset(t *testing.T) []string { |
| if availableCpuset == nil { |
| availableCpuset = getXxxset(t, "cpuset", onlineCpus) |
| } |
| return availableCpuset |
| } |
| |
| // getAvailableMemset returns the set of usable NUMA nodes. |
| func getAvailableMemset(t *testing.T) []string { |
| if availableMemset == nil { |
| availableMemset = getXxxset(t, "memset", normalMems) |
| } |
| return availableMemset |
| } |
| |
| // getXxxset reads/parses a CPU/memory set into a slice. |
| func getXxxset(t *testing.T, kind, path string) []string { |
| var ( |
| data []byte |
| set []string |
| one uint64 |
| err error |
| ) |
| |
| data, err = os.ReadFile(path) |
| if err != nil { |
| t.Logf("failed to read %s: %v", path, err) |
| return nil |
| } |
| |
| for _, rng := range strings.Split(strings.TrimSpace(string(data)), ",") { |
| var ( |
| lo int |
| hi = -1 |
| ) |
| loHi := strings.Split(rng, "-") |
| switch len(loHi) { |
| case 2: |
| one, err = strconv.ParseUint(loHi[1], 10, 32) |
| if err != nil { |
| t.Errorf("failed to parse %s range %q: %v", kind, rng, err) |
| return nil |
| } |
| hi = int(one) + 1 |
| fallthrough |
| case 1: |
| one, err = strconv.ParseUint(loHi[0], 10, 32) |
| if err != nil { |
| t.Errorf("failed to parse %s range %q: %v", kind, rng, err) |
| return nil |
| } |
| lo = int(one) |
| default: |
| t.Errorf("invalid %s range %q", kind, rng) |
| return nil |
| } |
| |
| if hi == -1 { |
| hi = lo + 1 |
| } |
| |
| for i := lo; i < hi; i++ { |
| set = append(set, strconv.Itoa(i)) |
| } |
| } |
| |
| return set |
| } |