| /* |
| Copyright The containerd Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package local |
| |
| import ( |
| "context" |
| "errors" |
| "fmt" |
| "io" |
| "os" |
| "path/filepath" |
| "runtime" |
| "time" |
| |
| "github.com/containerd/errdefs" |
| "github.com/containerd/log" |
| "github.com/opencontainers/go-digest" |
| |
| "github.com/containerd/containerd/v2/core/content" |
| "github.com/containerd/containerd/v2/internal/fsverity" |
| ) |
| |
| // writer represents a write transaction against the blob store. |
| type writer struct { |
| s *store |
| fp *os.File // opened data file |
| path string // path to writer dir |
| ref string // ref key |
| offset int64 |
| total int64 |
| digester digest.Digester |
| startedAt time.Time |
| updatedAt time.Time |
| } |
| |
| func (w *writer) Status() (content.Status, error) { |
| return content.Status{ |
| Ref: w.ref, |
| Offset: w.offset, |
| Total: w.total, |
| StartedAt: w.startedAt, |
| UpdatedAt: w.updatedAt, |
| }, nil |
| } |
| |
| // Digest returns the current digest of the content, up to the current write. |
| // |
| // Cannot be called concurrently with `Write`. |
| func (w *writer) Digest() digest.Digest { |
| return w.digester.Digest() |
| } |
| |
| // Write p to the transaction. |
| // |
| // Note that writes are unbuffered to the backing file. When writing, it is |
| // recommended to wrap in a bufio.Writer or, preferably, use io.CopyBuffer. |
| func (w *writer) Write(p []byte) (n int, err error) { |
| n, err = w.fp.Write(p) |
| w.digester.Hash().Write(p[:n]) |
| w.offset += int64(len(p)) |
| w.updatedAt = time.Now() |
| return n, err |
| } |
| |
| func (w *writer) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { |
| // Ensure even on error the writer is fully closed |
| defer w.s.unlock(w.ref) |
| |
| var base content.Info |
| for _, opt := range opts { |
| if err := opt(&base); err != nil { |
| return err |
| } |
| } |
| |
| fp := w.fp |
| w.fp = nil |
| |
| if fp == nil { |
| return fmt.Errorf("cannot commit on closed writer: %w", errdefs.ErrFailedPrecondition) |
| } |
| |
| if err := fp.Sync(); err != nil { |
| fp.Close() |
| return fmt.Errorf("sync failed: %w", err) |
| } |
| |
| fi, err := fp.Stat() |
| closeErr := fp.Close() |
| if err != nil { |
| return fmt.Errorf("stat on ingest file failed: %w", err) |
| } |
| if closeErr != nil { |
| return fmt.Errorf("failed to close ingest file: %w", closeErr) |
| } |
| |
| if size > 0 && size != fi.Size() { |
| return fmt.Errorf("unexpected commit size %d, expected %d: %w", fi.Size(), size, errdefs.ErrFailedPrecondition) |
| } |
| |
| dgst := w.digester.Digest() |
| if expected != "" && expected != dgst { |
| return fmt.Errorf("unexpected commit digest %s, expected %s: %w", dgst, expected, errdefs.ErrFailedPrecondition) |
| } |
| |
| var ( |
| ingest = filepath.Join(w.path, "data") |
| target, _ = w.s.blobPath(dgst) // ignore error because we calculated this dgst |
| ) |
| |
| // make sure parent directories of blob exist |
| if err := os.MkdirAll(filepath.Dir(target), 0755); err != nil { |
| return err |
| } |
| |
| if _, err := os.Stat(target); err == nil { |
| // collision with the target file! |
| if err := os.RemoveAll(w.path); err != nil { |
| log.G(ctx).WithField("ref", w.ref).WithField("path", w.path).Error("failed to remove ingest directory") |
| } |
| return fmt.Errorf("content %v: %w", dgst, errdefs.ErrAlreadyExists) |
| } |
| |
| if err := os.Rename(ingest, target); err != nil { |
| return err |
| } |
| |
| if err := syncDir(filepath.Dir(target)); err != nil { |
| return err |
| } |
| // Enable content blob integrity verification if supported |
| |
| if w.s.integritySupported { |
| if err := fsverity.Enable(target); err != nil { |
| log.G(ctx).Warnf("failed to enable integrity for blob %v: %s", target, err.Error()) |
| } |
| } |
| |
| // Ingest has now been made available in the content store, attempt to complete |
| // setting metadata but errors should only be logged and not returned since |
| // the content store cannot be cleanly rolled back. |
| |
| commitTime := time.Now() |
| if err := os.Chtimes(target, commitTime, commitTime); err != nil { |
| log.G(ctx).WithField("digest", dgst).Error("failed to change file time to commit time") |
| } |
| |
| // clean up!! |
| if err := os.RemoveAll(w.path); err != nil { |
| log.G(ctx).WithField("ref", w.ref).WithField("path", w.path).Error("failed to remove ingest directory") |
| } |
| |
| if w.s.ls != nil && base.Labels != nil { |
| if err := w.s.ls.Set(dgst, base.Labels); err != nil { |
| log.G(ctx).WithField("digest", dgst).Error("failed to set labels") |
| } |
| } |
| |
| // change to readonly, more important for read, but provides _some_ |
| // protection from this point on. We use the existing perms with a mask |
| // only allowing reads honoring the umask on creation. |
| // |
| // This removes write and exec, only allowing read per the creation umask. |
| // |
| // NOTE: Windows does not support this operation |
| if runtime.GOOS != "windows" { |
| if err := os.Chmod(target, (fi.Mode()&os.ModePerm)&^0333); err != nil { |
| log.G(ctx).WithField("ref", w.ref).Error("failed to make readonly") |
| } |
| } |
| |
| return nil |
| } |
| |
| // Close the writer, flushing any unwritten data and leaving the progress in |
| // tact. |
| // |
| // If one needs to resume the transaction, a new writer can be obtained from |
| // `Ingester.Writer` using the same key. The write can then be continued |
| // from it was left off. |
| // |
| // To abandon a transaction completely, first call close then `IngestManager.Abort` to |
| // clean up the associated resources. |
| func (w *writer) Close() (err error) { |
| if w.fp != nil { |
| w.fp.Sync() |
| err = w.fp.Close() |
| writeTimestampFile(filepath.Join(w.path, "updatedat"), w.updatedAt) |
| w.fp = nil |
| w.s.unlock(w.ref) |
| return |
| } |
| |
| return nil |
| } |
| |
| func (w *writer) Truncate(size int64) error { |
| if size != 0 { |
| return errors.New("Truncate: unsupported size") |
| } |
| w.offset = 0 |
| w.digester.Hash().Reset() |
| if _, err := w.fp.Seek(0, io.SeekStart); err != nil { |
| return err |
| } |
| return w.fp.Truncate(0) |
| } |
| |
| func (w *writer) Sync() error { |
| if w.fp != nil { |
| return w.fp.Sync() |
| } |
| |
| return nil |
| } |