// Copyright 2021 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package gcs
import (
"context"
"errors"
"fmt"
"io"
"net/url"
"os"
"path"
"path/filepath"
"strings"
"cloud.google.com/go/storage"
"github.com/golang/glog"
"google.golang.org/api/iterator"
)
const schemeGCS = "gs"
const schemeHttps = "https"
const gcsHostName = "storage.googleapis.com"
// GCSBucket represents a bucket which can be used for repeated GCS access.
// If a prefix is provided, all GCSBucket methods will be performed relative to
// that prefix.
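//
// A minimal usage sketch (the ctx, bucket name, prefix, and paths below are
// hypothetical, not defined by this package):
//
//	client, err := storage.NewClient(ctx)
//	if err != nil {
//		// handle error
//	}
//	bucket := NewGCSBucket(client, "my-bucket", "builds/123")
//	// Uploads to gs://my-bucket/builds/123/logs/build.log.
//	err = bucket.UploadObjectFromFile(ctx, "/tmp/build.log", "logs/build.log")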
type GCSBucket struct {
handle *storage.BucketHandle
name string
prefix string
}
// NewGCSBucket returns a GCSBucket for a client with the given bucket name and prefix.
func NewGCSBucket(client *storage.Client, bucketName, prefix string) *GCSBucket {
handle := client.Bucket(bucketName)
return &GCSBucket{handle, bucketName, prefix}
}
// GCSBucketFromPath returns a GCSBucket for a client with the bucket name and prefix
// extracted from a provided GCS path.
//
// Returns an error if the GCS path parsing fails.
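//
// For example (a sketch; the path is hypothetical), the following yields a
// bucket named "my-bucket" with the prefix "builds/123":
//
//	bucket, err := GCSBucketFromPath(client, "gs://my-bucket/builds/123")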
func GCSBucketFromPath(client *storage.Client, gcsPath string) (*GCSBucket, error) {
bucketName, prefix, err := getGCSVariables(gcsPath)
if err != nil {
return nil, err
}
	return NewGCSBucket(client, bucketName, prefix), nil
}
// Path returns the path of an object or directory relative to the bucket's prefix.
// For instance, if bucket.prefix is "abc/def", then bucket.Path("ghi/jkl.txt")
// will return "abc/def/ghi/jkl.txt".
func (bucket *GCSBucket) Path(objectOrDirName string) string {
return path.Join(bucket.prefix, objectOrDirName)
}
// URI returns the GCS URI of an object relative to the bucket's prefix.
// For instance, if bucket.name is "mybucket" and bucket.prefix is "abc",
// then bucket.URI("def.txt") will return "gs://mybucket/abc/def.txt".
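//
// A short sketch (names are hypothetical):
//
//	b := NewGCSBucket(client, "mybucket", "abc")
//	b.Path("def.txt") // "abc/def.txt"
//	b.URI("def.txt")  // "gs://mybucket/abc/def.txt"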
func (bucket *GCSBucket) URI(objectName string) string {
objectPath := bucket.Path(objectName)
return fmt.Sprintf("%s://%s", schemeGCS, path.Join(bucket.name, objectPath))
}
// UploadObjectFromFile uploads the contents of a local file to an object in the bucket.
func (bucket *GCSBucket) UploadObjectFromFile(ctx context.Context, localPath string, objectName string) error {
glog.V(1).Infof("uploading %s to %s", localPath, bucket.URI(objectName))
file, err := os.Open(localPath)
if err != nil {
return fmt.Errorf("failed to create reader for local path %s: %v", localPath, err)
}
defer file.Close()
err = bucket.UploadObject(ctx, file, objectName)
if err != nil {
return fmt.Errorf("failed to upload contents of %s to %s: %v", localPath, bucket.URI(objectName), err)
}
glog.V(1).Infof("successfully uploaded %s to %s", localPath, bucket.URI(objectName))
return nil
}
// UploadObject uploads the contents of a reader to an object in the bucket.
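//
// For example (a sketch; the object name is hypothetical), any io.Reader can
// serve as the source, such as an in-memory string:
//
//	err := bucket.UploadObject(ctx, strings.NewReader("hello"), "notes/hello.txt")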
func (bucket *GCSBucket) UploadObject(ctx context.Context, reader io.Reader, objectName string) error {
objectPath := bucket.Path(objectName)
writer := bucket.handle.Object(objectPath).NewWriter(ctx)
_, err := io.Copy(writer, reader)
if err != nil {
return fmt.Errorf("failed to write object %s: %v", bucket.URI(objectName), err)
}
err = writer.Close()
if err != nil {
return fmt.Errorf("failed to close writer for object %s: %v", bucket.URI(objectName), err)
}
return nil
}
// DownloadObjectToFile downloads an object from the bucket to a local file.
// Creates the parent directories of the local file if they don't already exist.
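//
// For example (a sketch with hypothetical paths), this writes the object into
// a nested local directory, creating the directory as needed:
//
//	err := bucket.DownloadObjectToFile(ctx, "logs/build.log", "/tmp/out/logs/build.log")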
func (bucket *GCSBucket) DownloadObjectToFile(ctx context.Context, objectName string, localPath string) error {
glog.V(1).Infof("downloading %s to %s", bucket.URI(objectName), localPath)
objectPath := bucket.Path(objectName)
reader, err := bucket.handle.Object(objectPath).NewReader(ctx)
if err != nil {
return fmt.Errorf("failed to create the reader from GCS client: %v", err)
}
defer reader.Close()
err = os.MkdirAll(filepath.Dir(localPath), 0777)
if err != nil {
return fmt.Errorf("failed to create parent directories for output file %s: %v", localPath, err)
}
	writer, err := os.OpenFile(localPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
if err != nil {
return fmt.Errorf("failed to create output file %s: %v", localPath, err)
}
_, err = io.Copy(writer, reader)
if err != nil {
		if closeErr := writer.Close(); closeErr != nil {
			glog.Errorf("failed to close writer for %s: %v", localPath, closeErr)
		}
		if removeErr := os.Remove(localPath); removeErr != nil {
			glog.Errorf("failed to remove %s while handling copy error: %v", localPath, removeErr)
		}
return fmt.Errorf("failed to download contents of %s to %s: %v", bucket.URI(objectName), localPath, err)
}
err = writer.Close()
if err != nil {
return fmt.Errorf("failed to close writer for %s: %v", localPath, err)
}
glog.V(1).Infof("successfully downloaded %s to %s", bucket.URI(objectName), localPath)
return nil
}
// DownloadObject downloads an object from the bucket and returns its contents as a byte slice.
func (bucket *GCSBucket) DownloadObject(ctx context.Context, objectName string) ([]byte, error) {
objectPath := bucket.Path(objectName)
reader, err := bucket.handle.Object(objectPath).NewReader(ctx)
if err != nil {
return []byte{}, fmt.Errorf("failed to create reader for object %s: %v", bucket.URI(objectName), err)
}
defer reader.Close()
bytes, err := io.ReadAll(reader)
if err != nil {
return []byte{}, fmt.Errorf("failed to read object %s: %v", bucket.URI(objectName), err)
}
return bytes, nil
}
// DownloadTextObject downloads an object from the bucket and returns its contents as a string.
func (bucket *GCSBucket) DownloadTextObject(ctx context.Context, objectName string) (string, error) {
bytes, err := bucket.DownloadObject(ctx, objectName)
if err != nil {
return "", err
}
return string(bytes), nil
}
// DeleteObject deletes an object from the bucket.
func (bucket *GCSBucket) DeleteObject(ctx context.Context, objectName string) error {
glog.V(1).Infof("deleting %s", bucket.URI(objectName))
objectName = bucket.Path(objectName)
err := bucket.handle.Object(objectName).Delete(ctx)
if err != nil {
return fmt.Errorf("failed to delete object %s: %v", bucket.URI(objectName), err)
}
return nil
}
// UploadDir uploads the contents of a local source directory to a destination directory
// in the bucket, preserving the directory structure.
func (bucket *GCSBucket) UploadDir(ctx context.Context, srcDir string, dstDir string) error {
entries, err := os.ReadDir(srcDir)
if err != nil {
return err
}
for _, entry := range entries {
entrySrc := path.Join(srcDir, entry.Name())
entryDst := path.Join(dstDir, entry.Name())
entryInfo, err := entry.Info()
if err != nil {
return err
}
if entryInfo.IsDir() {
err = bucket.UploadDir(ctx, entrySrc, entryDst)
} else {
err = bucket.UploadObjectFromFile(ctx, entrySrc, entryDst)
}
if err != nil {
return err
}
}
return nil
}
// QueryObjectNames returns the names of all objects in a bucket matching
// a query.
// If a delimiter is provided, the output may include synthetic "directory"
// entries, which match the query but are not real objects.
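//
// Note that, unlike ListDir and ListDescendants, the query prefix is used
// as-is and is not joined with the bucket prefix. A sketch of a delimiter
// query (the prefix value is hypothetical):
//
//	query := &storage.Query{Prefix: "builds/123/logs/", Delimiter: "/"}
//	names, err := bucket.QueryObjectNames(ctx, query)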
func (bucket *GCSBucket) QueryObjectNames(ctx context.Context, query *storage.Query) ([]string, error) {
var objectNames []string
it := bucket.handle.Objects(ctx, query)
for {
attrs, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return []string{}, fmt.Errorf("failure while iterating over GCS entries in bucket %s: %v", bucket.name, err)
}
name := attrs.Name
// "Directory" entries will have an empty name and a non-empty prefix
if name == "" {
name = attrs.Prefix
}
relativeName := strings.TrimPrefix(name, bucket.prefix)
objectNames = append(objectNames, relativeName)
}
return objectNames, nil
}
// ListDir returns the list of all paths in a bucket which are one directory
// level below the provided path.
// The paths will be relative to the bucket prefix.
func (bucket *GCSBucket) ListDir(ctx context.Context, dirName string) ([]string, error) {
dirPath := bucket.Path(dirName)
query := &storage.Query{
Delimiter: "/",
Prefix: dirPath + "/", // path.Join removes trailing slashes, but storage expects the slash in the prefix for directory listing
}
query.SetAttrSelection([]string{"Name", "Prefix"})
return bucket.QueryObjectNames(ctx, query)
}
// ListDescendants returns the list of all objects in a bucket which have a path
// beginning with the provided directory name.
// The object names will be relative to the bucket prefix.
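//
// A rough sketch of how this differs from ListDir (the object layout is
// hypothetical): with objects "a/b.txt" and "a/c/d.txt" under the bucket
// prefix,
//
//	oneLevel, err := bucket.ListDir(ctx, "a")    // b.txt and a synthetic c/ entry
//	all, err := bucket.ListDescendants(ctx, "a") // b.txt and c/d.txt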
func (bucket *GCSBucket) ListDescendants(ctx context.Context, dirName string) ([]string, error) {
dirPath := bucket.Path(dirName)
query := &storage.Query{Prefix: dirPath}
query.SetAttrSelection([]string{"Name"})
return bucket.QueryObjectNames(ctx, query)
}
// DownloadDir downloads all objects from a bucket which are descendants of the provided
// source directory and saves them to the local destination directory, preserving the
// directory structure from the bucket.
func (bucket *GCSBucket) DownloadDir(ctx context.Context, srcDir string, dstDir string) error {
downloads, err := bucket.descendantDownloads(ctx, srcDir, dstDir)
if err != nil {
return err
}
return bucket.DownloadObjects(ctx, downloads)
}
// DownloadDirParallel downloads all objects from a bucket which are descendants of the
// provided source directory and saves them to the local destination directory, preserving
// the directory structure from the bucket.
//
// See DownloadObjectsParallel for more info on `workers` and possible errors.
func (bucket *GCSBucket) DownloadDirParallel(ctx context.Context, srcDir string, dstDir string, workers int) error {
downloads, err := bucket.descendantDownloads(ctx, srcDir, dstDir)
if err != nil {
return err
}
return bucket.DownloadObjectsParallel(ctx, downloads, workers)
}
// An ObjectDownload specifies the remote object path for a download and the local
// path to which an object should be downloaded.
type ObjectDownload struct {
objectPath string
localPath string
}
// NewObjectDownload returns a new ObjectDownload for the provided remote object
// path and local path.
func NewObjectDownload(objectPath, localPath string) ObjectDownload {
return ObjectDownload{objectPath: objectPath, localPath: localPath}
}
// descendantDownloads gets the list of ObjectDownloads that correspond to downloading
// all descendants of a bucket directory to a local directory while preserving the directory
// structure.
func (bucket *GCSBucket) descendantDownloads(ctx context.Context, bucketDir, localDir string) ([]ObjectDownload, error) {
descendants, err := bucket.ListDescendants(ctx, bucketDir)
if err != nil {
return nil, err
}
var downloads []ObjectDownload
for _, objectName := range descendants {
// In a GCS bucket, the paths some/path and /some/path are different.
// However, since they both represent the same relative local directory,
// we can handle both cases by always removing any leading slashes when
// computing the relative name.
cleanObjectName := strings.TrimPrefix(path.Clean(objectName), "/")
cleanSrcDir := strings.TrimPrefix(path.Clean(bucketDir), "/")
relativeName := strings.TrimPrefix(cleanObjectName, cleanSrcDir)
localPath := path.Join(localDir, relativeName)
download := NewObjectDownload(objectName, localPath)
downloads = append(downloads, download)
}
return downloads, nil
}
// DownloadObjects downloads a list of objects from a bucket.
//
// This will not stop downloading objects if some downloads fail. After all
// downloads have finished or failed, an error will be returned which summarizes
// the set of failed downloads.
func (bucket *GCSBucket) DownloadObjects(ctx context.Context, downloads []ObjectDownload) error {
var errs []error
for _, download := range downloads {
if err := bucket.DownloadObjectToFile(ctx, download.objectPath, download.localPath); err != nil {
errs = append(errs, err)
}
}
if len(errs) > 0 {
return errors.Join(errs...)
}
return nil
}
// DownloadObjectsParallel downloads a list of objects from a bucket in parallel.
//
// If `workers` is negative or 0, this will spawn one worker per object which will be
// downloaded. Otherwise, `workers` will be the maximum number of workers which will be
// downloading at one time.
//
// This will not stop downloading objects if some downloads fail. After all
// downloads have finished or failed, an error will be returned which summarizes
// the set of failed downloads.
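//
// A usage sketch (the object and local paths are hypothetical):
//
//	downloads := []ObjectDownload{
//		NewObjectDownload("logs/a.txt", "/tmp/out/a.txt"),
//		NewObjectDownload("logs/b.txt", "/tmp/out/b.txt"),
//	}
//	if err := bucket.DownloadObjectsParallel(ctx, downloads, 4); err != nil {
//		// err joins the errors from every failed download.
//	}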
func (bucket *GCSBucket) DownloadObjectsParallel(ctx context.Context, downloads []ObjectDownload, workers int) error {
if workers <= 0 {
workers = len(downloads)
}
if workers >= len(downloads) {
workers = len(downloads)
}
// Make a set of worker goroutines which will each attempt to fully drain the
// download queue. When the queue is closed, each goroutine will exit.
// Each worker will send an error or a nil to the errors channel to signal
// that its work is done. We iterate over the error channel later to block
// until all of the work is done and to collect all of the non-nil errors.
downloadQueue := make(chan ObjectDownload, workers)
errs := make(chan error)
for i := 0; i < workers; i++ {
go func() {
for download := range downloadQueue {
errs <- bucket.DownloadObjectToFile(ctx, download.objectPath, download.localPath)
}
}()
}
// Enqueue all of the downloads.
go func() {
for _, download := range downloads {
// If the number of downloads is greater than the number of workers, this
// will block until a worker becomes available.
downloadQueue <- download
}
close(downloadQueue)
}()
// Drain the errors channel to populate the errors list and free up workers
// which have finished their downloads.
var errList []error
for i := 0; i < len(downloads); i++ {
if err := <-errs; err != nil {
errList = append(errList, err)
}
}
if len(errList) > 0 {
return errors.Join(errList...)
}
return nil
}
// DeleteDir deletes all objects in a bucket which are descendants of the provided directory.
func (bucket *GCSBucket) DeleteDir(ctx context.Context, dirName string) error {
glog.V(1).Infof("deleting %s", bucket.URI(dirName))
descendants, err := bucket.ListDescendants(ctx, dirName)
if err != nil {
return err
}
for _, objectName := range descendants {
err := bucket.DeleteObject(ctx, objectName)
if err != nil {
return err
}
}
return nil
}
// Exists reports whether an object with the given name exists in the bucket.
func (bucket *GCSBucket) Exists(ctx context.Context, objectName string) (bool, error) {
objectPath := bucket.Path(objectName)
_, err := bucket.handle.Object(objectPath).Attrs(ctx)
	if errors.Is(err, storage.ErrObjectNotExist) || errors.Is(err, storage.ErrBucketNotExist) {
return false, nil
} else if err != nil {
return false, fmt.Errorf("failed to check if object %s exists: %v", bucket.URI(objectName), err)
} else {
return true, nil
}
}
// DownloadGCSObjectFromURL downloads the object at inputURL and saves it at destinationPath.
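//
// For example (a sketch; the URL and local path are hypothetical):
//
//	err := DownloadGCSObjectFromURL(ctx, client, "gs://my-bucket/dir/file.txt", "/tmp/file.txt")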
func DownloadGCSObjectFromURL(ctx context.Context, gcsClient *storage.Client, inputURL, destinationPath string) error {
// GCS Client Reader.
gcsBucket, objectName, err := getGCSVariables(inputURL)
if err != nil {
glog.Errorf("Unable to get gcs variables - gcsBucket, gcsPath with the input url: %s", inputURL)
return err
}
return DownloadGCSObject(ctx, gcsClient, gcsBucket, objectName, destinationPath)
}
// DownloadGCSObject downloads the object objectName from gcsBucket and saves it at destinationPath.
func DownloadGCSObject(ctx context.Context, gcsClient *storage.Client, gcsBucket, objectName, destinationPath string) error {
return NewGCSBucket(gcsClient, gcsBucket, "").DownloadObjectToFile(ctx, objectName, destinationPath)
}
// DownloadGCSObjectString downloads the object at inputURL and returns its contents as a string.
func DownloadGCSObjectString(ctx context.Context,
gcsClient *storage.Client, inputURL string) (string, error) {
gcsBucket, objectName, err := getGCSVariables(inputURL)
	if err != nil {
		glog.Errorf("failed to parse bucket and object path from input URL %s: %v", inputURL, err)
		return "", err
	}
return NewGCSBucket(gcsClient, gcsBucket, "").DownloadTextObject(ctx, objectName)
}
// GCSObjectExistsFromURL checks if an object exists at inputURL
func GCSObjectExistsFromURL(ctx context.Context, gcsClient *storage.Client, inputURL string) (bool, error) {
gcsBucket, objectName, err := getGCSVariables(inputURL)
if err != nil {
glog.Errorf("Unable to get gcs variables - gcsBucket, gcsPath with the input url: %s", inputURL)
return false, err
}
return GCSObjectExists(ctx, gcsClient, gcsBucket, objectName)
}
// GCSObjectExists checks if an object with objectName exists in gcsBucket.
func GCSObjectExists(ctx context.Context, gcsClient *storage.Client, gcsBucket, objectName string) (bool, error) {
return NewGCSBucket(gcsClient, gcsBucket, "").Exists(ctx, objectName)
}
// ListGCSBucket lists all the objectNames in gcsBucket with prefix.
func ListGCSBucket(ctx context.Context, gcsClient *storage.Client, gcsBucket, prefix string) ([]string, error) {
return NewGCSBucket(gcsClient, gcsBucket, "").ListDescendants(ctx, prefix)
}
// UploadGCSObject uploads an object at inputPath to destination URL
func UploadGCSObject(ctx context.Context, gcsClient *storage.Client, inputPath, destinationURL string) error {
gcsBucket, name, err := getGCSVariables(destinationURL)
if err != nil {
return fmt.Errorf("error parsing destination URL: %v", err)
}
return NewGCSBucket(gcsClient, gcsBucket, "").UploadObjectFromFile(ctx, inputPath, name)
}
// UploadGCSObjectString uploads an input string as a file to destination URL
func UploadGCSObjectString(ctx context.Context, gcsClient *storage.Client, inputStr, destinationURL string) error {
gcsBucket, name, err := getGCSVariables(destinationURL)
if err != nil {
return fmt.Errorf("error parsing destination URL: %v", err)
}
reader := strings.NewReader(inputStr)
return NewGCSBucket(gcsClient, gcsBucket, "").UploadObject(ctx, reader, name)
}
// DeleteGCSObject deletes an object at the input URL
func DeleteGCSObject(ctx context.Context,
gcsClient *storage.Client, inputURL string) error {
gcsBucket, name, err := getGCSVariables(inputURL)
if err != nil {
return fmt.Errorf("error parsing input URL: %v", err)
}
return NewGCSBucket(gcsClient, gcsBucket, "").DeleteObject(ctx, name)
}
// getGCSVariables returns the GCS bucket name, the GCS object path, and an error based on the gcsPath URL input.
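//
// For example (hypothetical inputs), both of the following parse to bucket
// "my-bucket" and object path "dir/file.txt":
//
//	getGCSVariables("gs://my-bucket/dir/file.txt")
//	getGCSVariables("https://storage.googleapis.com/my-bucket/dir/file.txt")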
func getGCSVariables(gcsPath string) (string, string, error) {
parsedURL, err := url.Parse(gcsPath)
if err != nil {
glog.Errorf("Error - %v: Failed to parse gcsPath - %s", gcsPath, err)
return "", "", err
}
if parsedURL.Scheme == schemeGCS {
// In this case, the gcsPath looks like: gs://cos-tools/16108.403.42/kernel-headers.tgz
// url.Hostname() returns "cos-tools" which is the GCSBucket
// url.EscapedPath() returns "/16108.403.42/kernel-headers.tgz" with a leading "/".
return parsedURL.Hostname(), strings.TrimLeft(parsedURL.EscapedPath(), "/"), nil
} else if parsedURL.Scheme == schemeHttps && strings.ToLower(parsedURL.Host) == gcsHostName {
// In this case, the gcsPath looks like: https://storage.googleapis.com/cos-tools/1700.00.0/lakitu/gpu_default_version
// Split the path into parts using "/"
pathParts := strings.Split(parsedURL.Path, "/")
// Check if there are enough parts in the path
if len(pathParts) < 3 {
glog.Errorf("Error: Invalid gcsPath input: %s - the number of path parts separated by '/' is smaller than 3.", gcsPath)
return "", "", fmt.Errorf("error: invalid gcs_path input: %s - the number of path parts separated by '/' is smaller than 3", gcsPath)
}
		gcsBucket := pathParts[1]
		objectPath := strings.Join(pathParts[2:], "/")
		return gcsBucket, objectPath, nil
} else {
glog.Errorf("Invalid input gcsPath: %s - unable to parse it to get gcsBucket and gcsPath", gcsPath)
return "", "", fmt.Errorf("error: invalid input gcs_path: %s - unable to parse it to get gcsBucket and gcsPath", gcsPath)
}
}