| /* |
| Copyright The containerd Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package client |
| |
| import ( |
| "context" |
| "errors" |
| "fmt" |
| |
| ocispec "github.com/opencontainers/image-spec/specs-go/v1" |
| "golang.org/x/sync/semaphore" |
| |
| "github.com/containerd/containerd/v2/core/images" |
| "github.com/containerd/containerd/v2/core/remotes" |
| "github.com/containerd/containerd/v2/core/remotes/docker" |
| "github.com/containerd/containerd/v2/core/transfer" |
| "github.com/containerd/containerd/v2/core/unpack" |
| "github.com/containerd/containerd/v2/pkg/tracing" |
| "github.com/containerd/errdefs" |
| "github.com/containerd/platforms" |
| ) |
| |
| const ( |
| pullSpanPrefix = "pull" |
| ) |
| |
| // Pull downloads the provided content into containerd's content store |
| // and returns a platform specific image object |
| func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Image, retErr error) { |
| ctx, span := tracing.StartSpan(ctx, tracing.Name(pullSpanPrefix, "Pull")) |
| defer span.End() |
| |
| pullCtx := defaultRemoteContext() |
| |
| for _, o := range opts { |
| if err := o(c, pullCtx); err != nil { |
| return nil, err |
| } |
| } |
| |
| if resolver, ok := pullCtx.Resolver.(remotes.ResolverWithOptions); ok { |
| resolver.SetOptions( |
| transfer.WithConcurrentLayerFetchBuffer(pullCtx.ConcurrentLayerFetchBuffer), |
| transfer.WithMaxConcurrentDownloads(pullCtx.MaxConcurrentDownloads), |
| transfer.WithDownloadLimiter(pullCtx.DownloadLimiter), |
| ) |
| } |
| |
| if pullCtx.PlatformMatcher == nil { |
| if len(pullCtx.Platforms) > 1 { |
| return nil, errors.New("cannot pull multiplatform image locally, try Fetch") |
| } else if len(pullCtx.Platforms) == 0 { |
| pullCtx.PlatformMatcher = c.platform |
| } else { |
| p, err := platforms.Parse(pullCtx.Platforms[0]) |
| if err != nil { |
| return nil, fmt.Errorf("invalid platform %s: %w", pullCtx.Platforms[0], err) |
| } |
| |
| pullCtx.PlatformMatcher = platforms.Only(p) |
| } |
| } |
| |
| span.SetAttributes( |
| tracing.Attribute("image.ref", ref), |
| tracing.Attribute("unpack", pullCtx.Unpack), |
| tracing.Attribute("max.concurrent.downloads", pullCtx.MaxConcurrentDownloads), |
| tracing.Attribute("concurrent.layer.fetch.buffer", pullCtx.ConcurrentLayerFetchBuffer), |
| tracing.Attribute("platforms.count", len(pullCtx.Platforms)), |
| ) |
| |
| ctx, done, err := c.WithLease(ctx) |
| if err != nil { |
| return nil, err |
| } |
| defer done(ctx) |
| |
| var unpacker *unpack.Unpacker |
| |
| if pullCtx.Unpack { |
| snapshotterName, err := c.resolveSnapshotterName(ctx, pullCtx.Snapshotter) |
| if err != nil { |
| return nil, fmt.Errorf("unable to resolve snapshotter: %w", err) |
| } |
| span.SetAttributes(tracing.Attribute("snapshotter.name", snapshotterName)) |
| |
| snCapabilities, err := c.GetSnapshotterCapabilities(ctx, snapshotterName) |
| if err != nil { |
| return nil, fmt.Errorf("unable to get snapshotter capabilities: %w", err) |
| } |
| |
| var uconfig UnpackConfig |
| for _, opt := range pullCtx.UnpackOpts { |
| if err := opt(ctx, &uconfig); err != nil { |
| return nil, err |
| } |
| } |
| var platformMatcher platforms.Matcher |
| if !uconfig.CheckPlatformSupported { |
| platformMatcher = platforms.All |
| } |
| |
| // Check client Unpack config |
| platform := unpack.Platform{ |
| Platform: platformMatcher, |
| SnapshotterKey: snapshotterName, |
| Snapshotter: c.SnapshotService(snapshotterName), |
| SnapshotOpts: append(pullCtx.SnapshotterOpts, uconfig.SnapshotOpts...), |
| Applier: c.DiffService(), |
| ApplyOpts: uconfig.ApplyOpts, |
| SnapshotterCapabilities: snCapabilities, |
| } |
| uopts := []unpack.UnpackerOpt{unpack.WithUnpackPlatform(platform)} |
| if uconfig.DuplicationSuppressor != nil { |
| uopts = append(uopts, unpack.WithDuplicationSuppressor(uconfig.DuplicationSuppressor)) |
| } |
| if uconfig.Limiter != nil { |
| uopts = append(uopts, unpack.WithUnpackLimiter(uconfig.Limiter)) |
| } |
| unpacker, err = unpack.NewUnpacker(ctx, c.ContentStore(), uopts...) |
| if err != nil { |
| return nil, fmt.Errorf("unable to initialize unpacker: %w", err) |
| } |
| defer func() { |
| if _, err := unpacker.Wait(); err != nil { |
| if retErr == nil { |
| retErr = fmt.Errorf("unpack: %w", err) |
| } |
| } |
| }() |
| wrapper := pullCtx.HandlerWrapper |
| pullCtx.HandlerWrapper = func(h images.Handler) images.Handler { |
| if wrapper == nil { |
| return unpacker.Unpack(h) |
| } |
| return unpacker.Unpack(wrapper(h)) |
| } |
| } |
| |
| img, err := c.fetch(ctx, pullCtx, ref, 1) |
| if err != nil { |
| return nil, err |
| } |
| |
| // NOTE(fuweid): unpacker defers blobs download. before create image |
| // record in ImageService, should wait for unpacking(including blobs |
| // download). |
| var ur unpack.Result |
| if unpacker != nil { |
| _, unpackSpan := tracing.StartSpan(ctx, tracing.Name(pullSpanPrefix, "UnpackWait")) |
| if ur, err = unpacker.Wait(); err != nil { |
| unpackSpan.SetStatus(err) |
| unpackSpan.End() |
| return nil, err |
| } |
| unpackSpan.End() |
| } |
| |
| img, err = c.createNewImage(ctx, img) |
| if err != nil { |
| return nil, err |
| } |
| |
| i := NewImageWithPlatform(c, img, pullCtx.PlatformMatcher) |
| span.SetAttributes(tracing.Attribute("image.ref", i.Name())) |
| |
| if unpacker != nil && ur.Unpacks == 0 { |
| // Unpack was tried previously but nothing was unpacked |
| // This was at least required for schema 1 image. |
| if err := i.Unpack(ctx, pullCtx.Snapshotter, pullCtx.UnpackOpts...); err != nil { |
| return nil, fmt.Errorf("failed to unpack image on snapshotter %s: %w", pullCtx.Snapshotter, err) |
| } |
| } |
| |
| return i, nil |
| } |
| |
| func (c *Client) fetch(ctx context.Context, rCtx *RemoteContext, ref string, limit int) (images.Image, error) { |
| ctx, span := tracing.StartSpan(ctx, tracing.Name(pullSpanPrefix, "fetch")) |
| defer span.End() |
| store := c.ContentStore() |
| name, desc, err := rCtx.Resolver.Resolve(ctx, ref) |
| if err != nil { |
| return images.Image{}, fmt.Errorf("failed to resolve reference %q: %w", ref, err) |
| } |
| |
| fetcher, err := rCtx.Resolver.Fetcher(ctx, name) |
| if err != nil { |
| return images.Image{}, fmt.Errorf("failed to get fetcher for %q: %w", name, err) |
| } |
| |
| var ( |
| handler images.Handler |
| |
| isConvertible bool |
| converterFunc func(context.Context, ocispec.Descriptor) (ocispec.Descriptor, error) |
| limiter *semaphore.Weighted |
| ) |
| if desc.MediaType == images.MediaTypeDockerSchema1Manifest { |
| return images.Image{}, fmt.Errorf("%w: media type %q is no longer supported since containerd v2.1, please rebuild the image as %q or %q", |
| errdefs.ErrNotImplemented, |
| images.MediaTypeDockerSchema1Manifest, images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest) |
| } |
| // Get all the children for a descriptor |
| childrenHandler := images.ChildrenHandler(store) |
| if rCtx.ReferrersProvider != nil { |
| childrenHandler = images.SetReferrers(rCtx.ReferrersProvider, childrenHandler) |
| } |
| // Set any children labels for that content |
| childrenHandler = images.SetChildrenMappedLabels(store, childrenHandler, rCtx.ChildLabelMap) |
| if rCtx.AllMetadata { |
| // Filter manifests by platforms but allow to handle manifest |
| // and configuration for not-target platforms |
| childrenHandler = remotes.FilterManifestByPlatformHandler(childrenHandler, rCtx.PlatformMatcher) |
| } else { |
| // Filter children by platforms if specified. |
| childrenHandler = images.FilterPlatforms(childrenHandler, rCtx.PlatformMatcher) |
| } |
| // Sort and limit manifests if a finite number is needed |
| if limit > 0 { |
| childrenHandler = images.LimitManifests(childrenHandler, rCtx.PlatformMatcher, limit) |
| } |
| |
| // set isConvertible to true if there is application/octet-stream media type |
| convertibleHandler := images.HandlerFunc( |
| func(_ context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { |
| if desc.MediaType == docker.LegacyConfigMediaType { |
| isConvertible = true |
| } |
| |
| return []ocispec.Descriptor{}, nil |
| }, |
| ) |
| |
| appendDistSrcLabelHandler, err := docker.AppendDistributionSourceLabel(store, ref) |
| if err != nil { |
| return images.Image{}, err |
| } |
| |
| handlers := append(rCtx.BaseHandlers, |
| remotes.FetchHandler(store, fetcher), |
| convertibleHandler, |
| childrenHandler, |
| appendDistSrcLabelHandler, |
| ) |
| |
| handler = images.Handlers(handlers...) |
| |
| converterFunc = func(ctx context.Context, desc ocispec.Descriptor) (ocispec.Descriptor, error) { |
| return docker.ConvertManifest(ctx, store, desc) |
| } |
| |
| if rCtx.HandlerWrapper != nil { |
| handler = rCtx.HandlerWrapper(handler) |
| } |
| |
| if err := images.Dispatch(ctx, handler, limiter, desc); err != nil { |
| return images.Image{}, err |
| } |
| |
| if isConvertible { |
| if desc, err = converterFunc(ctx, desc); err != nil { |
| return images.Image{}, err |
| } |
| } |
| |
| return images.Image{ |
| Name: name, |
| Target: desc, |
| Labels: rCtx.Labels, |
| }, nil |
| } |
| |
| func (c *Client) createNewImage(ctx context.Context, img images.Image) (images.Image, error) { |
| ctx, span := tracing.StartSpan(ctx, tracing.Name(pullSpanPrefix, "pull.createNewImage")) |
| defer span.End() |
| is := c.ImageService() |
| for { |
| if created, err := is.Create(ctx, img); err != nil { |
| if !errdefs.IsAlreadyExists(err) { |
| return images.Image{}, err |
| } |
| |
| updated, err := is.Update(ctx, img) |
| if err != nil { |
| // if image was removed, try create again |
| if errdefs.IsNotFound(err) { |
| continue |
| } |
| return images.Image{}, err |
| } |
| |
| img = updated |
| } else { |
| img = created |
| } |
| |
| return img, nil |
| } |
| } |