/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package unpack
import (
"context"
"crypto/rand"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"slices"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/containerd/errdefs"
"github.com/containerd/log"
"github.com/containerd/platforms"
"github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/identity"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"golang.org/x/sync/errgroup"
"github.com/containerd/containerd/v2/core/content"
"github.com/containerd/containerd/v2/core/diff"
"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/core/mount"
"github.com/containerd/containerd/v2/core/snapshots"
"github.com/containerd/containerd/v2/internal/cleanup"
"github.com/containerd/containerd/v2/internal/kmutex"
"github.com/containerd/containerd/v2/pkg/labels"
"github.com/containerd/containerd/v2/pkg/tracing"
)
const (
labelSnapshotRef = "containerd.io/snapshot.ref"
unpackSpanPrefix = "pkg.unpack.unpacker"
)
// Result contains information about the unpacks which were completed.
type Result struct {
Unpacks int
}
type unpackerConfig struct {
platforms []*Platform
content content.Store
limiter Limiter
duplicationSuppressor KeyedLocker
unpackLimiter Limiter
}
// Platform represents a platform-specific unpack configuration which includes
// the platform matcher as well as the snapshotter and applier.
type Platform struct {
Platform platforms.Matcher
SnapshotterKey string
Snapshotter snapshots.Snapshotter
SnapshotOpts []snapshots.Opt
SnapshotterExports map[string]string
SnapshotterCapabilities []string
Applier diff.Applier
ApplyOpts []diff.ApplyOpt
// ConfigType is the supported config type to be considered for unpacking
// Defaults to OCI image config
ConfigType string
// LayerTypes are the supported types to be considered layers
// Defaults to OCI image layers
LayerTypes []string
}
// KeyedLocker is an interface for managing job duplication by
// locking on a given key.
type KeyedLocker interface {
Lock(ctx context.Context, key string) error
Unlock(key string)
}
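// The kmutex package imported above provides implementations: kmutex.New()
// returns a real keyed lock, while kmutex.NewNoop() (the default set by
// NewUnpacker) performs no locking at all.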
// Limiter interface is used to restrict the number of concurrent operations by
// requiring operations to first acquire from the limiter and release when complete.
type Limiter interface {
Acquire(context.Context, int64) error
Release(int64)
}
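// A *semaphore.Weighted from golang.org/x/sync/semaphore satisfies this
// interface as-is; a sketch limiting fetch concurrency to 3 (the weight here
// is an illustrative choice, not a package default):
//
//	WithLimiter(semaphore.NewWeighted(3))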
type UnpackerOpt func(*unpackerConfig) error
func WithUnpackPlatform(u Platform) UnpackerOpt {
return UnpackerOpt(func(c *unpackerConfig) error {
if u.Platform == nil {
u.Platform = platforms.All
}
if u.Snapshotter == nil {
return fmt.Errorf("snapshotter must be provided to unpack")
}
if u.SnapshotterKey == "" {
if s, ok := u.Snapshotter.(fmt.Stringer); ok {
u.SnapshotterKey = s.String()
} else {
u.SnapshotterKey = "unknown"
}
}
if u.Applier == nil {
return fmt.Errorf("applier must be provided to unpack")
}
c.platforms = append(c.platforms, &u)
return nil
})
}
func WithLimiter(l Limiter) UnpackerOpt {
return UnpackerOpt(func(c *unpackerConfig) error {
c.limiter = l
return nil
})
}
func WithDuplicationSuppressor(d KeyedLocker) UnpackerOpt {
return UnpackerOpt(func(c *unpackerConfig) error {
c.duplicationSuppressor = d
return nil
})
}
func WithUnpackLimiter(l Limiter) UnpackerOpt {
return UnpackerOpt(func(c *unpackerConfig) error {
c.unpackLimiter = l
return nil
})
}
// Unpacker unpacks images by hooking into the image handler process.
// Unpacks happen in the background and are waited on to complete.
type Unpacker struct {
unpackerConfig
unpacks atomic.Int32
ctx context.Context
eg *errgroup.Group
}
// NewUnpacker creates a new instance of the unpacker which can be used to wrap an
// image handler and unpack in parallel with handling. The unpacker will handle
// calling the blob handlers when they are needed by the unpack process.
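//
// A minimal usage sketch, assuming the caller already has a content store cs,
// a snapshotter sn, an applier, and a fetch handler h (hypothetical names):
//
//	u, err := NewUnpacker(ctx, cs, WithUnpackPlatform(Platform{
//		SnapshotterKey: "overlayfs",
//		Snapshotter:    sn,
//		Applier:        applier,
//	}))
//	if err != nil {
//		return err
//	}
//	if err := images.Dispatch(ctx, u.Unpack(h), nil, desc); err != nil {
//		return err
//	}
//	_, err = u.Wait()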
func NewUnpacker(ctx context.Context, cs content.Store, opts ...UnpackerOpt) (*Unpacker, error) {
eg, ctx := errgroup.WithContext(ctx)
u := &Unpacker{
unpackerConfig: unpackerConfig{
content: cs,
duplicationSuppressor: kmutex.NewNoop(),
},
ctx: ctx,
eg: eg,
}
for _, opt := range opts {
if err := opt(&u.unpackerConfig); err != nil {
return nil, err
}
}
if len(u.platforms) == 0 {
return nil, fmt.Errorf("no unpack platforms defined: %w", errdefs.ErrInvalidArgument)
}
return u, nil
}
// Unpack wraps an image handler to filter out blob handling, scheduling the
// blobs for unpacking instead. When an image config is encountered, the
// unpack process is started in a goroutine.
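// Manifest children are split into layers and non-layers; the layer
// descriptors are recorded against each non-layer digest so that when the
// config blob is later handled, the matching layers can be unpacked.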
func (u *Unpacker) Unpack(h images.Handler) images.Handler {
var (
lock sync.Mutex
layers = map[digest.Digest][]ocispec.Descriptor{}
)
var layerTypes map[string]bool
var configTypes map[string]bool
for _, p := range u.platforms {
if p.ConfigType != "" {
if configTypes == nil {
configTypes = make(map[string]bool)
}
configTypes[p.ConfigType] = true
}
if len(p.LayerTypes) > 0 {
if layerTypes == nil {
layerTypes = make(map[string]bool)
}
for _, t := range p.LayerTypes {
layerTypes[t] = true
}
}
}
return images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
ctx, span := tracing.StartSpan(ctx, tracing.Name(unpackSpanPrefix, "UnpackHandler"))
defer span.End()
span.SetAttributes(
tracing.Attribute("descriptor.media.type", desc.MediaType),
tracing.Attribute("descriptor.digest", desc.Digest.String()))
unlock, err := u.lockBlobDescriptor(ctx, desc)
if err != nil {
return nil, err
}
children, err := h.Handle(ctx, desc)
unlock()
if err != nil {
return children, err
}
if images.IsManifestType(desc.MediaType) {
var nonLayers []ocispec.Descriptor
var manifestLayers []ocispec.Descriptor
// Split layers from non-layers; layers will be handled after
// the config
for i, child := range children {
span.SetAttributes(
tracing.Attribute("descriptor.child."+strconv.Itoa(i), []string{child.MediaType, child.Digest.String()}),
)
if images.IsLayerType(child.MediaType) || layerTypes[child.MediaType] {
manifestLayers = append(manifestLayers, child)
} else {
nonLayers = append(nonLayers, child)
}
}
lock.Lock()
for _, nl := range nonLayers {
layers[nl.Digest] = manifestLayers
}
lock.Unlock()
children = nonLayers
} else if images.IsConfigType(desc.MediaType) || configTypes[desc.MediaType] {
lock.Lock()
l := layers[desc.Digest]
lock.Unlock()
if len(l) > 0 {
u.eg.Go(func() error {
return u.unpack(h, desc, l)
})
}
}
return children, nil
})
}
// Wait waits for any ongoing unpack processes to complete and then returns
// the result.
func (u *Unpacker) Wait() (Result, error) {
if err := u.eg.Wait(); err != nil {
return Result{}, err
}
return Result{
Unpacks: int(u.unpacks.Load()),
}, nil
}
// unpackConfig is a subset of the OCI config used for resolving the rootfs and
// platform; any config type which supports the platform and rootfs fields can
// be supported.
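//
// For a standard OCI image config the relevant fields look like (digest
// truncated for illustration):
//
//	{
//	  "architecture": "amd64",
//	  "os": "linux",
//	  "rootfs": {"type": "layers", "diff_ids": ["sha256:..."]}
//	}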
type unpackConfig struct {
// Platform describes the platform which the image in the manifest runs on.
ocispec.Platform
// RootFS references the layer content addresses used by the image.
RootFS ocispec.RootFS `json:"rootfs"`
}
type unpackStatus struct {
err error
desc ocispec.Descriptor
bottomF func(bool) error
span *tracing.Span
startAt time.Time
}
func (u *Unpacker) unpack(
h images.Handler,
config ocispec.Descriptor,
layers []ocispec.Descriptor,
) error {
ctx := u.ctx
ctx, layerSpan := tracing.StartSpan(ctx, tracing.Name(unpackSpanPrefix, "unpack"))
defer layerSpan.End()
unpackStart := time.Now()
p, err := content.ReadBlob(ctx, u.content, config)
if err != nil {
return err
}
var i unpackConfig
if err := json.Unmarshal(p, &i); err != nil {
return fmt.Errorf("unmarshal image config: %w", err)
}
diffIDs := i.RootFS.DiffIDs
if len(layers) != len(diffIDs) {
return fmt.Errorf("number of layers and diffIDs don't match: %d != %d", len(layers), len(diffIDs))
}
// TODO: Support multiple unpacks rather than just first match
var unpack *Platform
imgPlatform := platforms.Normalize(i.Platform)
for _, up := range u.platforms {
if up.ConfigType != "" && up.ConfigType != config.MediaType {
continue
}
// "layers" is only supported rootfs value for OCI images
if (up.ConfigType == "" || images.IsConfigType(up.ConfigType)) && i.RootFS.Type != "" && i.RootFS.Type != "layers" {
continue
}
if up.Platform.Match(imgPlatform) {
unpack = up
break
}
}
if unpack == nil {
log.G(ctx).WithField("image", config.Digest).WithField("platform", platforms.Format(imgPlatform)).Debugf("unpacker does not support platform, only fetching layers")
return u.fetch(ctx, h, layers, nil)
}
u.unpacks.Add(1)
var (
sn = unpack.Snapshotter
a = unpack.Applier
cs = u.content
fetchOffset int
fetchC []chan struct{}
fetchErr []chan error
parallel = u.supportParallel(unpack)
)
// If there is an early return, ensure any ongoing
// fetches get their context cancelled
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// pre-calculate chain ids for each layer
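// Per the OCI image spec: ChainID(L0) = DiffID(L0), and
// ChainID(L0..Ln) = Digest(ChainID(L0..Ln-1) + " " + DiffID(Ln)).
// identity.ChainIDs computes this over the copied slice in place.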
chainIDs := make([]digest.Digest, len(diffIDs))
copy(chainIDs, diffIDs)
chainIDs = identity.ChainIDs(chainIDs)
topHalf := func(i int, desc ocispec.Descriptor, span *tracing.Span, startAt time.Time) (<-chan *unpackStatus, error) {
var (
err error
parent string
chainID string
)
if i > 0 && !parallel {
parent = chainIDs[i-1].String()
}
chainID = chainIDs[i].String()
unlock, err := u.lockSnChainID(ctx, chainID, unpack.SnapshotterKey)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
unlock()
}
}()
// Inherit annotations which are provided as snapshot labels.
snapshotLabels := snapshots.FilterInheritedLabels(desc.Annotations)
if snapshotLabels == nil {
snapshotLabels = make(map[string]string)
}
snapshotLabels[labelSnapshotRef] = chainID
var (
key string
mounts []mount.Mount
opts = append(unpack.SnapshotOpts, snapshots.WithLabels(snapshotLabels))
)
for try := 1; try <= 3; try++ {
// Prepare the snapshot from the parent, labeled with the snapshot ref (the chain ID)
key = fmt.Sprintf(snapshots.UnpackKeyFormat, uniquePart(), chainID)
mounts, err = sn.Prepare(ctx, key, parent, opts...)
if err != nil {
if errdefs.IsAlreadyExists(err) {
if _, err := sn.Stat(ctx, chainID); err != nil {
if !errdefs.IsNotFound(err) {
return nil, fmt.Errorf("failed to stat snapshot %s: %w", chainID, err)
}
// Try again, this should be rare, log it
log.G(ctx).WithField("key", key).WithField("chainid", chainID).Debug("extraction snapshot already exists, chain id not found")
} else {
// nothing to do, a committed snapshot already exists for this chain ID
return nil, nil
}
} else {
return nil, fmt.Errorf("failed to prepare extraction snapshot %q: %w", key, err)
}
} else {
break
}
}
if err != nil {
return nil, fmt.Errorf("unable to prepare extraction snapshot: %w", err)
}
// Abort the snapshot if commit does not happen
abort := func(ctx context.Context) {
if err := sn.Remove(ctx, key); err != nil {
log.G(ctx).WithError(err).Errorf("failed to cleanup %q", key)
}
}
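// Lazily start a single background fetch for this and all remaining
// layers: fetchC[j] is closed once layer fetchOffset+j has been
// fetched, and fetchErr[j] receives any fetch failure.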
if fetchErr == nil {
fetchOffset = i
n := len(layers) - fetchOffset
fetchErr = make([]chan error, n)
fetchC = make([]chan struct{}, n)
for i := range n {
fetchC[i] = make(chan struct{})
fetchErr[i] = make(chan error, 1)
}
go func(i int) {
err := u.fetch(ctx, h, layers[i:], fetchC)
if err != nil {
for _, fc := range fetchErr {
fc <- err
close(fc)
}
}
}(i)
}
if err = u.acquire(ctx, u.unpackLimiter); err != nil {
cleanup.Do(ctx, abort)
return nil, err
}
resCh := make(chan *unpackStatus, 1)
go func() {
defer func() {
u.release(u.unpackLimiter)
close(resCh)
}()
status := &unpackStatus{
desc: desc,
span: span,
startAt: startAt,
bottomF: func(shouldAbort bool) error {
defer unlock()
if shouldAbort {
cleanup.Do(ctx, abort)
return nil
}
if i > 0 && parallel {
parent = chainIDs[i-1].String()
opts = append(opts, snapshots.WithParent(parent))
}
if err = sn.Commit(ctx, chainID, key, opts...); err != nil {
cleanup.Do(ctx, abort)
if errdefs.IsAlreadyExists(err) {
return nil
}
return fmt.Errorf("failed to commit snapshot %s: %w", key, err)
}
// Set the uncompressed label after the uncompressed
// digest has been verified through apply.
cinfo := content.Info{
Digest: desc.Digest,
Labels: map[string]string{
labels.LabelUncompressed: diffIDs[i].String(),
},
}
if _, err := cs.Update(ctx, cinfo, "labels."+labels.LabelUncompressed); err != nil {
return err
}
return nil
},
}
select {
case <-ctx.Done():
cleanup.Do(ctx, abort)
status.err = ctx.Err()
resCh <- status
return
case err := <-fetchErr[i-fetchOffset]:
if err != nil {
cleanup.Do(ctx, abort)
status.err = err
resCh <- status
return
}
case <-fetchC[i-fetchOffset]:
}
diff, err := a.Apply(ctx, desc, mounts, unpack.ApplyOpts...)
if err != nil {
cleanup.Do(ctx, abort)
status.err = fmt.Errorf("failed to extract layer (%s %s) to %s as %q: %w", desc.MediaType, desc.Digest, unpack.SnapshotterKey, key, err)
resCh <- status
return
}
if diff.Digest != diffIDs[i] {
cleanup.Do(ctx, abort)
status.err = fmt.Errorf("wrong diff id %q calculated on extraction %q, desc %q", diff.Digest, diffIDs[i], desc.Digest)
resCh <- status
return
}
resCh <- status
}()
return resCh, nil
}
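// bottomHalf finalizes a layer prepared by topHalf: on success it commits
// the snapshot (rebasing onto the parent chain ID in parallel mode) and
// records the uncompressed digest; on failure, or if an earlier layer
// failed, it aborts the prepared snapshot. Commits must run in layer
// order so each snapshot's parent already exists.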
bottomHalf := func(s *unpackStatus, prevErrs error) error {
var err error
if s.err != nil {
s.bottomF(true)
err = s.err
} else if prevErrs != nil {
s.bottomF(true)
err = fmt.Errorf("aborted")
} else {
err = s.bottomF(false)
}
s.span.SetStatus(err)
s.span.End()
if err == nil {
log.G(ctx).WithFields(log.Fields{
"layer": s.desc.Digest,
"duration": time.Since(s.startAt),
}).Debug("layer unpacked")
}
return err
}
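// Drive topHalf for each layer in order. In sequential mode each layer is
// committed before the next is prepared; in parallel mode the statuses are
// collected and committed afterwards.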
var statusChans []<-chan *unpackStatus
for i, desc := range layers {
_, layerSpan := tracing.StartSpan(ctx, tracing.Name(unpackSpanPrefix, "unpackLayer"))
unpackLayerStart := time.Now()
layerSpan.SetAttributes(
tracing.Attribute("layer.media.type", desc.MediaType),
tracing.Attribute("layer.media.size", desc.Size),
tracing.Attribute("layer.media.digest", desc.Digest.String()),
)
statusCh, err := topHalf(i, desc, layerSpan, unpackLayerStart)
if err != nil {
if parallel {
break
} else {
layerSpan.SetStatus(err)
layerSpan.End()
return err
}
}
if statusCh == nil {
// nothing to do, already exists
layerSpan.End()
continue
}
if parallel {
statusChans = append(statusChans, statusCh)
} else {
if err = bottomHalf(<-statusCh, nil); err != nil {
return err
}
}
}
// In parallel mode, snapshots still need to be committed and rebased sequentially
if parallel {
var errs error
for _, sc := range statusChans {
if err := bottomHalf(<-sc, errs); err != nil {
errs = errors.Join(errs, err)
}
}
if errs != nil {
return errs
}
}
var chainID string
if len(chainIDs) > 0 {
chainID = chainIDs[len(chainIDs)-1].String()
}
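// Reference the top-level snapshot from the config blob via a GC label so
// the snapshot chain is not garbage collected while the config exists.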
cinfo := content.Info{
Digest: config.Digest,
Labels: map[string]string{
fmt.Sprintf("containerd.io/gc.ref.snapshot.%s", unpack.SnapshotterKey): chainID,
},
}
_, err = cs.Update(ctx, cinfo, fmt.Sprintf("labels.containerd.io/gc.ref.snapshot.%s", unpack.SnapshotterKey))
if err != nil {
return err
}
log.G(ctx).WithFields(log.Fields{
"config": config.Digest,
"chainID": chainID,
"parallel": parallel,
"duration": time.Since(unpackStart),
}).Debug("image unpacked")
return nil
}
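// fetch dispatches the handler for each layer, bounded by the fetch
// limiter, closing the corresponding done channel as each layer completes.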
func (u *Unpacker) fetch(ctx context.Context, h images.Handler, layers []ocispec.Descriptor, done []chan struct{}) error {
eg, ctx2 := errgroup.WithContext(ctx)
for i, desc := range layers {
ctx2, layerSpan := tracing.StartSpan(ctx2, tracing.Name(unpackSpanPrefix, "fetchLayer"))
layerSpan.SetAttributes(
tracing.Attribute("layer.media.type", desc.MediaType),
tracing.Attribute("layer.media.size", desc.Size),
tracing.Attribute("layer.media.digest", desc.Digest.String()),
)
var ch chan struct{}
if done != nil {
ch = done[i]
}
if err := u.acquire(ctx, u.limiter); err != nil {
return err
}
eg.Go(func() error {
defer layerSpan.End()
unlock, err := u.lockBlobDescriptor(ctx2, desc)
if err != nil {
u.release(u.limiter)
return err
}
_, err = h.Handle(ctx2, desc)
unlock()
u.release(u.limiter)
if err != nil && !errors.Is(err, images.ErrSkipDesc) {
return err
}
if ch != nil {
close(ch)
}
return nil
})
}
return eg.Wait()
}
func (u *Unpacker) acquire(ctx context.Context, l Limiter) error {
if l == nil {
return nil
}
return l.Acquire(ctx, 1)
}
func (u *Unpacker) release(l Limiter) {
if l == nil {
return
}
l.Release(1)
}
func (u *Unpacker) lockSnChainID(ctx context.Context, chainID, snapshotter string) (func(), error) {
key := u.makeChainIDKeyWithSnapshotter(chainID, snapshotter)
if err := u.duplicationSuppressor.Lock(ctx, key); err != nil {
return nil, err
}
return func() {
u.duplicationSuppressor.Unlock(key)
}, nil
}
func (u *Unpacker) lockBlobDescriptor(ctx context.Context, desc ocispec.Descriptor) (func(), error) {
key := u.makeBlobDescriptorKey(desc)
if err := u.duplicationSuppressor.Lock(ctx, key); err != nil {
return nil, err
}
return func() {
u.duplicationSuppressor.Unlock(key)
}, nil
}
func (u *Unpacker) makeChainIDKeyWithSnapshotter(chainID, snapshotter string) string {
return fmt.Sprintf("sn://%s/%v", snapshotter, chainID)
}
func (u *Unpacker) makeBlobDescriptorKey(desc ocispec.Descriptor) string {
return fmt.Sprintf("blob://%v", desc.Digest)
}
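// supportParallel reports whether layers can be applied in parallel: it
// requires an unpack limiter and a snapshotter advertising the "rebase"
// capability, since parallel snapshots are prepared without a parent and
// rebased onto the chain at commit time.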
func (u *Unpacker) supportParallel(unpack *Platform) bool {
if u.unpackLimiter == nil {
return false
}
if !slices.Contains(unpack.SnapshotterCapabilities, "rebase") {
log.L.Infof("snapshotter does not support rebase capability, unpacking will be sequential")
return false
}
return true
}
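// uniquePart returns a short random suffix used to make extraction
// snapshot keys unique across concurrent unpacks.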
func uniquePart() string {
t := time.Now()
var b [3]byte
// Ignore read failures, just decreases uniqueness
rand.Read(b[:])
return fmt.Sprintf("%d-%s", t.Nanosecond(), base64.URLEncoding.EncodeToString(b[:]))
}