/*
Copyright The containerd Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"runtime"
	"sync"
	"syscall"
	"time"

	containerd "github.com/containerd/containerd/v2/client"
	"github.com/containerd/containerd/v2/defaults"
	"github.com/containerd/containerd/v2/integration/remote"
	"github.com/containerd/containerd/v2/pkg/namespaces"
	"github.com/containerd/containerd/v2/plugins"
	"github.com/containerd/log"
	metrics "github.com/docker/go-metrics"
	"github.com/urfave/cli/v2"
)

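// Prometheus metrics recorded during the stress runs; they are created and
// registered in init and shared with the worker implementations.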
var (
	ct              metrics.LabeledTimer
	execTimer       metrics.LabeledTimer
	errCounter      metrics.LabeledCounter
	binarySizeGauge metrics.LabeledGauge
)

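// stressNs is the namespace used both for the exported metrics and for the
// containers created during the stress tests.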
const (
	stressNs string = "stress"
)

func init() {
	ns := metrics.NewNamespace(stressNs, "", nil)
	// For more fine-grained metrics, drill down into the metrics that containerd
	// itself exports to Prometheus.
	ct = ns.NewLabeledTimer("run", "Run time of a full container during the test", "commit")
	execTimer = ns.NewLabeledTimer("exec", "Run time of an exec process during the test", "commit")
	binarySizeGauge = ns.NewLabeledGauge("binary_size", "Binary size of compiled binaries", metrics.Bytes, "name")
	errCounter = ns.NewLabeledCounter("errors", "Errors encountered running the stress tests", "err")
	metrics.Register(ns)

	// set higher ulimits
	if err := setRlimit(); err != nil {
		panic(err)
	}

	cli.HelpFlag = &cli.BoolFlag{
		Name:    "help",
		Aliases: []string{"h"},
		Usage:   "Show help",
	}
}

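// worker is the unit of concurrency in the stress test: each worker repeatedly
// creates and tears down containers (or pods, for CRI) until the test context
// expires, tracking how many runs succeeded or failed.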
type worker interface {
	run(ctx, tctx context.Context)
	getCount() int
	incCount()
	getFailures() int
	incFailures()
}

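// run tracks the wall-clock duration of a stress test run and aggregates the
// per-worker totals.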
type run struct {
	total    int
	failures int

	started time.Time
	ended   time.Time
}

func (r *run) start() {
	r.started = time.Now()
}

func (r *run) end() {
	r.ended = time.Now()
}

func (r *run) seconds() float64 {
	return r.ended.Sub(r.started).Seconds()
}

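// gather sums the counts and failures from all workers and converts them into
// a result for the elapsed run time.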
func (r *run) gather(workers []worker) *result {
	for _, w := range workers {
		r.total += w.getCount()
		r.failures += w.getFailures()
	}
	sec := r.seconds()
	return &result{
		Total:               r.total,
		Failures:            r.failures,
		Seconds:             sec,
		ContainersPerSecond: float64(r.total) / sec,
		SecondsPerContainer: sec / float64(r.total),
	}
}

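// result is the JSON-encodable summary reported at the end of a test run.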
type result struct {
	Total               int     `json:"total"`
	Failures            int     `json:"failures"`
	Seconds             float64 `json:"seconds"`
	ContainersPerSecond float64 `json:"containersPerSecond"`
	SecondsPerContainer float64 `json:"secondsPerContainer"`
	ExecTotal           int     `json:"execTotal"`
	ExecFailures        int     `json:"execFailures"`
}

func main() {
	// more power!
	runtime.GOMAXPROCS(runtime.NumCPU())

	app := cli.NewApp()
	app.Name = "containerd-stress"
	app.Description = "stress test a containerd daemon"
	app.Flags = []cli.Flag{
		&cli.BoolFlag{
			Name:  "debug",
			Usage: "Set debug output in the logs",
		},
		&cli.StringFlag{
			Name:    "address",
			Aliases: []string{"a"},
			Value:   defaults.DefaultAddress,
			Usage:   "Path to the containerd socket",
		},
		&cli.IntFlag{
			Name:    "concurrent",
			Aliases: []string{"c"},
			Value:   1,
			Usage:   "Set the concurrency of the stress test",
		},
		&cli.BoolFlag{
			Name:  "cri",
			Usage: "Utilize CRI to create pods for the stress test. This requires a runtime that matches a CRI runtime handler. Example: --runtime runc",
		},
		&cli.DurationFlag{
			Name:    "duration",
			Aliases: []string{"d"},
			Value:   1 * time.Minute,
			Usage:   "Set the duration of the stress test",
		},
		&cli.BoolFlag{
			Name:  "exec",
			Usage: "Add execs to the stress tests (non-CRI only)",
		},
		&cli.StringFlag{
			Name:    "image",
			Aliases: []string{"i"},
			Value:   "docker.io/library/alpine:latest",
			Usage:   "Image to be utilized for testing",
		},
		&cli.BoolFlag{
			Name:    "json",
			Aliases: []string{"j"},
			Usage:   "Output results in json format",
		},
		&cli.StringFlag{
			Name:    "metrics",
			Aliases: []string{"m"},
			Usage:   "Address to serve the metrics API",
		},
		&cli.StringFlag{
			Name:  "runtime",
			Usage: "Set the runtime to stress test",
			Value: plugins.RuntimeRuncV2,
		},
		&cli.StringFlag{
			Name:  "snapshotter",
			Usage: "Set the snapshotter to use",
			Value: "overlayfs",
		},
	}
	app.Before = func(cliContext *cli.Context) error {
		if cliContext.Bool("json") {
			if err := log.SetLevel("warn"); err != nil {
				return err
			}
		}
		if cliContext.Bool("debug") {
			if err := log.SetLevel("debug"); err != nil {
				return err
			}
		}
		return nil
	}
	app.Commands = []*cli.Command{
		densityCommand,
	}
	app.Action = func(cliContext *cli.Context) error {
		config := config{
			Address:     cliContext.String("address"),
			Duration:    cliContext.Duration("duration"),
			Concurrency: cliContext.Int("concurrent"),
			CRI:         cliContext.Bool("cri"),
			Exec:        cliContext.Bool("exec"),
			Image:       cliContext.String("image"),
			JSON:        cliContext.Bool("json"),
			Metrics:     cliContext.String("metrics"),
			Runtime:     cliContext.String("runtime"),
			Snapshotter: cliContext.String("snapshotter"),
		}
		if config.Metrics != "" {
			return serve(config)
		}

		if config.CRI {
			return criTest(config)
		}

		return test(config)
	}
	if err := app.Run(os.Args); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}

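// config holds the options collected from the command-line flags.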
type config struct {
	Concurrency int
	CRI         bool
	Duration    time.Duration
	Address     string
	Exec        bool
	Image       string
	JSON        bool
	Metrics     string
	Runtime     string
	Snapshotter string
}

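// newClient returns a containerd client connected to the configured address,
// with the configured runtime set as the default.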
func (c config) newClient() (*containerd.Client, error) {
	return containerd.New(c.Address, containerd.WithDefaultRuntime(c.Runtime))
}

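// serve exposes the Prometheus metrics handler on the configured address and
// then runs the selected stress test.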
func serve(c config) error {
	go func() {
		srv := &http.Server{
			Addr:              c.Metrics,
			Handler:           metrics.Handler(),
			ReadHeaderTimeout: 5 * time.Minute, // "G112: Potential Slowloris Attack (gosec)"; not a real concern for our use, so setting a long timeout.
		}
		if err := srv.ListenAndServe(); err != nil {
			log.L.WithError(err).Error("listen and serve")
		}
	}()
	checkBinarySizes()

	if c.CRI {
		return criTest(c)
	}

	return test(c)
}

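// criTest runs the stress test against a CRI runtime service, creating and
// deleting pods through the CRI API.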
func criTest(c config) error {
	var (
		timeout = 1 * time.Minute
		wg      sync.WaitGroup
		ctx     = namespaces.WithNamespace(context.Background(), stressNs)
	)

	client, err := remote.NewRuntimeService(c.Address, timeout)
	if err != nil {
		return fmt.Errorf("failed to create runtime service: %w", err)
	}

	if err := criCleanup(ctx, client); err != nil {
		return err
	}

	tctx, cancel := context.WithTimeout(ctx, c.Duration)
	go func() {
		s := make(chan os.Signal, 1)
		signal.Notify(s, syscall.SIGTERM, syscall.SIGINT)
		<-s
		cancel()
	}()

	// get the runtime name and version used to label the metrics
	version, err := client.Version("")
	if err != nil {
		return fmt.Errorf("failed to get runtime version: %w", err)
	}
	var (
		workers []worker
		r       = &run{}
	)
	log.L.Info("starting stress test run...")
	// create the workers along with their spec
	for i := 0; i < c.Concurrency; i++ {
		wg.Add(1)
		w := &criWorker{
			id:             i,
			wg:             &wg,
			client:         client,
			commit:         fmt.Sprintf("%s-%s", version.RuntimeName, version.RuntimeVersion),
			runtimeHandler: c.Runtime,
			snapshotter:    c.Snapshotter,
		}
		workers = append(workers, w)
	}

	// start the timer and run the workers
	r.start()
	for _, w := range workers {
		go w.run(ctx, tctx)
	}
	// wait and end the timer
	wg.Wait()
	r.end()

	results := r.gather(workers)
	log.L.Infof("ending test run in %0.3f seconds", results.Seconds)

	log.L.WithField("failures", r.failures).Infof(
		"create/start/delete %d containers in %0.3f seconds (%0.3f c/sec) or (%0.3f sec/c)",
		results.Total,
		results.Seconds,
		results.ContainersPerSecond,
		results.SecondsPerContainer,
	)
	if c.JSON {
		if err := json.NewEncoder(os.Stdout).Encode(results); err != nil {
			return err
		}
	}

	return nil
}

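// test runs the stress test directly against the containerd client API,
// optionally adding exec workers alongside the container workers.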
func test(c config) error {
	var (
		wg  sync.WaitGroup
		ctx = namespaces.WithNamespace(context.Background(), stressNs)
	)

	client, err := c.newClient()
	if err != nil {
		return err
	}
	defer client.Close()
	if err := cleanup(ctx, client); err != nil {
		return err
	}

	log.L.Infof("pulling %s", c.Image)
	image, err := client.Pull(ctx, c.Image, containerd.WithPullUnpack, containerd.WithPullSnapshotter(c.Snapshotter))
	if err != nil {
		return err
	}
	v, err := client.Version(ctx)
	if err != nil {
		return err
	}

	tctx, cancel := context.WithTimeout(ctx, c.Duration)
	go func() {
		s := make(chan os.Signal, 1)
		signal.Notify(s, syscall.SIGTERM, syscall.SIGINT)
		<-s
		cancel()
	}()

	var (
		workers []worker
		r       = &run{}
	)
	log.L.Info("starting stress test run...")
	// create the workers along with their spec
	for i := 0; i < c.Concurrency; i++ {
		wg.Add(1)

		w := &ctrWorker{
			id:          i,
			wg:          &wg,
			image:       image,
			client:      client,
			commit:      v.Revision,
			snapshotter: c.Snapshotter,
		}
		workers = append(workers, w)
	}
	var execWorkers []*execWorker
	if c.Exec {
		for i := c.Concurrency; i < c.Concurrency+c.Concurrency; i++ {
			wg.Add(1)
			e := &execWorker{
				ctrWorker: ctrWorker{
					id:          i,
					wg:          &wg,
					image:       image,
					client:      client,
					commit:      v.Revision,
					snapshotter: c.Snapshotter,
				},
			}
			execWorkers = append(execWorkers, e)
			go e.exec(ctx, tctx)
		}
	}

	// start the timer and run the workers
	r.start()
	for _, w := range workers {
		go w.run(ctx, tctx)
	}
	// wait and end the timer
	wg.Wait()
	r.end()

	results := r.gather(workers)
	if c.Exec {
		// aggregate the counts from every exec worker, not just the last one created
		for _, e := range execWorkers {
			results.ExecTotal += e.count
			results.ExecFailures += e.failures
		}
	}
	log.L.Infof("ending test run in %0.3f seconds", results.Seconds)

	log.L.WithField("failures", r.failures).Infof(
		"create/start/delete %d containers in %0.3f seconds (%0.3f c/sec) or (%0.3f sec/c)",
		results.Total,
		results.Seconds,
		results.ContainersPerSecond,
		results.SecondsPerContainer,
	)
	if c.JSON {
		if err := json.NewEncoder(os.Stdout).Encode(results); err != nil {
			return err
		}
	}
	return nil
}

// cleanup cleans up any containers in the "stress" namespace before the test run
func cleanup(ctx context.Context, client *containerd.Client) error {
	containers, err := client.Containers(ctx)
	if err != nil {
		return err
	}
	for _, c := range containers {
		task, err := c.Task(ctx, nil)
		if err == nil {
			task.Delete(ctx, containerd.WithProcessKill)
		}
		if err := c.Delete(ctx, containerd.WithSnapshotCleanup); err != nil {
			if derr := c.Delete(ctx); derr == nil {
				continue
			}
			return err
		}
	}
	return nil
}