| // Copyright 2021 Google Inc. All Rights Reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package gcs |
| |
| import ( |
| "context" |
| "errors" |
| "fmt" |
| "io" |
| "net/url" |
| "os" |
| "path" |
| "path/filepath" |
| "strings" |
| |
| "cloud.google.com/go/storage" |
| "github.com/golang/glog" |
| "google.golang.org/api/iterator" |
| ) |
| |
// URL schemes and the GCS HTTP(S) host name recognized when parsing
// object URLs in getGCSVariables.
const (
	schemeGCS   = "gs"
	schemeHttps = "https"
	gcsHostName = "storage.googleapis.com"
)
| |
// GCSBucket represents a bucket which can be used for repeated GCS access.
// If a prefix is provided, all GCSBucket methods will be performed relative to
// that prefix.
type GCSBucket struct {
	handle *storage.BucketHandle // handle used for all object operations on this bucket
	name   string                // bucket name, e.g. "mybucket" in gs://mybucket/obj
	prefix string                // path prefix joined onto all object names; may be empty
}
| |
| // NewGCSBucket returns a GCSBucket for a client with the given bucket name and prefix. |
| func NewGCSBucket(client *storage.Client, bucketName, prefix string) *GCSBucket { |
| handle := client.Bucket(bucketName) |
| return &GCSBucket{handle, bucketName, prefix} |
| } |
| |
| // GCSBucketFromPath returns a GCSBucket for a client with the bucket name and prefix |
| // extracted from a provided GCS path. |
| // |
| // Returns an error if the GCS path parsing fails. |
| func GCSBucketFromPath(client *storage.Client, gcsPath string) (*GCSBucket, error) { |
| bucketName, prefix, err := getGCSVariables(gcsPath) |
| if err != nil { |
| return nil, err |
| } |
| return NewGCSBucket(client, bucketName, prefix), err |
| } |
| |
| // Path returns the path of an object or directory relative to the bucket's prefix. |
| // For instance, if bucket.prefix is "abc/def", then bucket.Path("ghi/jkl.txt") |
| // will return "abc/def/ghi/jkl.txt". |
| func (bucket *GCSBucket) Path(objectOrDirName string) string { |
| return path.Join(bucket.prefix, objectOrDirName) |
| } |
| |
| // URI returns the GCS URI of an object relative to the bucket's prefix. |
| // For instance, if bucket.name is "mybucket" and bucket.prefix is 'abc', |
| // then bucket.URI('def.txt') will return gs://mybucket/abc/def.txt. |
| func (bucket *GCSBucket) URI(objectName string) string { |
| objectPath := bucket.Path(objectName) |
| return fmt.Sprintf("%s://%s", schemeGCS, path.Join(bucket.name, objectPath)) |
| } |
| |
| // UploadObjectFromFile uploads the contents of a local file to an object in the bucket. |
| func (bucket *GCSBucket) UploadObjectFromFile(ctx context.Context, localPath string, objectName string) error { |
| glog.V(1).Infof("uploading %s to %s", localPath, bucket.URI(objectName)) |
| |
| file, err := os.Open(localPath) |
| if err != nil { |
| return fmt.Errorf("failed to create reader for local path %s: %v", localPath, err) |
| } |
| defer file.Close() |
| |
| err = bucket.UploadObject(ctx, file, objectName) |
| if err != nil { |
| return fmt.Errorf("failed to upload contents of %s to %s: %v", localPath, bucket.URI(objectName), err) |
| } |
| |
| glog.V(1).Infof("successfully uploaded %s to %s", localPath, bucket.URI(objectName)) |
| return nil |
| } |
| |
| // UploadObject uploads the contents of a reader to an object in the bucket. |
| func (bucket *GCSBucket) UploadObject(ctx context.Context, reader io.Reader, objectName string) error { |
| objectPath := bucket.Path(objectName) |
| writer := bucket.handle.Object(objectPath).NewWriter(ctx) |
| |
| _, err := io.Copy(writer, reader) |
| if err != nil { |
| return fmt.Errorf("failed to write object %s: %v", bucket.URI(objectName), err) |
| } |
| |
| err = writer.Close() |
| if err != nil { |
| return fmt.Errorf("failed to close writer for object %s: %v", bucket.URI(objectName), err) |
| } |
| |
| return nil |
| } |
| |
| // DownloadObjectToFile downloads an object from the bucket to a local file. |
| // Creates the parent directories of the local file if they don't already exist. |
| func (bucket *GCSBucket) DownloadObjectToFile(ctx context.Context, objectName string, localPath string) error { |
| glog.V(1).Infof("downloading %s to %s", bucket.URI(objectName), localPath) |
| objectPath := bucket.Path(objectName) |
| |
| reader, err := bucket.handle.Object(objectPath).NewReader(ctx) |
| if err != nil { |
| return fmt.Errorf("failed to create the reader from GCS client: %v", err) |
| } |
| defer reader.Close() |
| |
| err = os.MkdirAll(filepath.Dir(localPath), 0777) |
| if err != nil { |
| return fmt.Errorf("failed to create parent directories for output file %s: %v", localPath, err) |
| } |
| |
| writer, err := os.OpenFile(localPath, os.O_WRONLY|os.O_CREATE, 0666) |
| if err != nil { |
| return fmt.Errorf("failed to create output file %s: %v", localPath, err) |
| } |
| |
| _, err = io.Copy(writer, reader) |
| if err != nil { |
| writer_err := writer.Close() |
| if writer_err != nil { |
| glog.Errorf("failed to close writer for %s: %v", localPath, writer_err) |
| } |
| remove_err := os.Remove(localPath) |
| if remove_err != nil { |
| glog.Errorf("failed to remove %s while handling copy error: %v", localPath, remove_err) |
| } |
| return fmt.Errorf("failed to download contents of %s to %s: %v", bucket.URI(objectName), localPath, err) |
| } |
| |
| err = writer.Close() |
| if err != nil { |
| return fmt.Errorf("failed to close writer for %s: %v", localPath, err) |
| } |
| |
| glog.V(1).Infof("successfully downloaded %s to %s", bucket.URI(objectName), localPath) |
| return nil |
| } |
| |
| // DownloadObject downloads an object from the bucket and returns its contents as a byte slice. |
| func (bucket *GCSBucket) DownloadObject(ctx context.Context, objectName string) ([]byte, error) { |
| objectPath := bucket.Path(objectName) |
| reader, err := bucket.handle.Object(objectPath).NewReader(ctx) |
| if err != nil { |
| return []byte{}, fmt.Errorf("failed to create reader for object %s: %v", bucket.URI(objectName), err) |
| } |
| defer reader.Close() |
| |
| bytes, err := io.ReadAll(reader) |
| if err != nil { |
| return []byte{}, fmt.Errorf("failed to read object %s: %v", bucket.URI(objectName), err) |
| } |
| |
| return bytes, nil |
| } |
| |
| // DownloadTextObject downloads an object from the bucket and returns its contents as a string. |
| func (bucket *GCSBucket) DownloadTextObject(ctx context.Context, objectName string) (string, error) { |
| bytes, err := bucket.DownloadObject(ctx, objectName) |
| if err != nil { |
| return "", err |
| } |
| |
| return string(bytes), nil |
| } |
| |
| // DeleteObject deletes an object from the bucket. |
| func (bucket *GCSBucket) DeleteObject(ctx context.Context, objectName string) error { |
| glog.V(1).Infof("deleting %s", bucket.URI(objectName)) |
| objectName = bucket.Path(objectName) |
| |
| err := bucket.handle.Object(objectName).Delete(ctx) |
| if err != nil { |
| return fmt.Errorf("failed to delete object %s: %v", bucket.URI(objectName), err) |
| } |
| return nil |
| } |
| |
| // UploadDir uploads the contents of a local source directory to a destination directory |
| // in the bucket, preserving the directory structure. |
| func (bucket *GCSBucket) UploadDir(ctx context.Context, srcDir string, dstDir string) error { |
| entries, err := os.ReadDir(srcDir) |
| if err != nil { |
| return err |
| } |
| |
| for _, entry := range entries { |
| entrySrc := path.Join(srcDir, entry.Name()) |
| entryDst := path.Join(dstDir, entry.Name()) |
| entryInfo, err := entry.Info() |
| if err != nil { |
| return err |
| } |
| |
| if entryInfo.IsDir() { |
| err = bucket.UploadDir(ctx, entrySrc, entryDst) |
| } else { |
| err = bucket.UploadObjectFromFile(ctx, entrySrc, entryDst) |
| } |
| if err != nil { |
| return err |
| } |
| } |
| |
| return nil |
| } |
| |
| // QueryObjectNames returns the names of all objects in a bucket matching |
| // a query. |
| // If a delimiter is provided, the output may include synthetic "directory" |
| // entries, which match the query but are not real objects. |
| func (bucket *GCSBucket) QueryObjectNames(ctx context.Context, query *storage.Query) ([]string, error) { |
| var objectNames []string |
| it := bucket.handle.Objects(ctx, query) |
| for { |
| attrs, err := it.Next() |
| if err == iterator.Done { |
| break |
| } |
| if err != nil { |
| return []string{}, fmt.Errorf("failure while iterating over GCS entries in bucket %s: %v", bucket.name, err) |
| } |
| name := attrs.Name |
| // "Directory" entries will have an empty name and a non-empty prefix |
| if name == "" { |
| name = attrs.Prefix |
| } |
| relativeName := strings.TrimPrefix(name, bucket.prefix) |
| objectNames = append(objectNames, relativeName) |
| } |
| |
| return objectNames, nil |
| } |
| |
| // ListDir returns the list of all paths in a bucket which are one directory |
| // level below the provided path. |
| // The paths will be relative to the bucket prefix. |
| func (bucket *GCSBucket) ListDir(ctx context.Context, dirName string) ([]string, error) { |
| dirPath := bucket.Path(dirName) |
| query := &storage.Query{ |
| Delimiter: "/", |
| Prefix: dirPath + "/", // path.Join removes trailing slashes, but storage expects the slash in the prefix for directory listing |
| } |
| query.SetAttrSelection([]string{"Name", "Prefix"}) |
| return bucket.QueryObjectNames(ctx, query) |
| } |
| |
| // ListDescendants returns the list of all objects in a bucket which have a path |
| // beginning with the provided directory name. |
| // The object names will be relative to the bucket prefix. |
| func (bucket *GCSBucket) ListDescendants(ctx context.Context, dirName string) ([]string, error) { |
| dirPath := bucket.Path(dirName) |
| query := &storage.Query{Prefix: dirPath} |
| query.SetAttrSelection([]string{"Name"}) |
| return bucket.QueryObjectNames(ctx, query) |
| } |
| |
| // DownloadDir downloads all objects from a bucket which are descendents of the provided |
| // source directory and saves them to the local destination directory, preserving the |
| // directory structure from the bucket. |
| func (bucket *GCSBucket) DownloadDir(ctx context.Context, srcDir string, dstDir string) error { |
| downloads, err := bucket.descendantDownloads(ctx, srcDir, dstDir) |
| if err != nil { |
| return err |
| } |
| |
| return bucket.DownloadObjects(ctx, downloads) |
| } |
| |
| // DownloadDirParallel downloads all objects from a bucket which are descendents of the |
| // provided source directory and saves them to the local destination directory, preserving |
| // the directory structure from the bucket. |
| // |
| // See DownloadObjectsParallel for more info on `workers` and possible errors. |
| func (bucket *GCSBucket) DownloadDirParallel(ctx context.Context, srcDir string, dstDir string, workers int) error { |
| downloads, err := bucket.descendantDownloads(ctx, srcDir, dstDir) |
| if err != nil { |
| return err |
| } |
| |
| return bucket.DownloadObjectsParallel(ctx, downloads, workers) |
| } |
| |
// An ObjectDownload specifies the remote object path for a download and the local
// path to which an object should be downloaded.
type ObjectDownload struct {
	objectPath string // object path within the bucket (relative to the bucket prefix)
	localPath  string // destination path on the local filesystem
}
| |
| // NewObjectDownload returns a new ObjectDownload for the provided remote object |
| // path and local path. |
| func NewObjectDownload(objectPath, localPath string) ObjectDownload { |
| return ObjectDownload{objectPath: objectPath, localPath: localPath} |
| } |
| |
| // descendantDownloads gets the list of ObjectDownloads which corresponding to downloading |
| // all descendants of a bucket directory to a local directory while preserving the directory |
| // structure. |
| func (bucket *GCSBucket) descendantDownloads(ctx context.Context, bucketDir, localDir string) ([]ObjectDownload, error) { |
| descendants, err := bucket.ListDescendants(ctx, bucketDir) |
| if err != nil { |
| return nil, err |
| } |
| |
| var downloads []ObjectDownload |
| for _, objectName := range descendants { |
| // In a GCS bucket, the paths some/path and /some/path are different. |
| // However, since they both represent the same relative local directory, |
| // we can handle both cases by always removing any leading slashes when |
| // computing the relative name. |
| cleanObjectName := strings.TrimPrefix(path.Clean(objectName), "/") |
| cleanSrcDir := strings.TrimPrefix(path.Clean(bucketDir), "/") |
| relativeName := strings.TrimPrefix(cleanObjectName, cleanSrcDir) |
| localPath := path.Join(localDir, relativeName) |
| |
| download := NewObjectDownload(objectName, localPath) |
| downloads = append(downloads, download) |
| } |
| |
| return downloads, nil |
| } |
| |
| // DownloadObjects downloads a list of objects from a bucket. |
| // |
| // This will not stop downloading objects if some downloads fail. After all |
| // downloads have finished or failed, an error will be returned which summarizes |
| // the set of failed downloads. |
| func (bucket *GCSBucket) DownloadObjects(ctx context.Context, downloads []ObjectDownload) error { |
| var errs []error |
| for _, download := range downloads { |
| if err := bucket.DownloadObjectToFile(ctx, download.objectPath, download.localPath); err != nil { |
| errs = append(errs, err) |
| } |
| } |
| |
| if len(errs) > 0 { |
| return errors.Join(errs...) |
| } |
| |
| return nil |
| } |
| |
// DownloadObjectsParallel downloads a list of objects from a bucket in parallel.
//
// If `workers` is negative or 0, this will spawn one worker per object which will be
// downloaded. Otherwise, `workers` will be the maximum number of workers which will be
// downloading at one time.
//
// This will not stop downloading objects if some downloads fail. After all
// downloads have finished or failed, an error will be returned which summarizes
// the set of failed downloads.
func (bucket *GCSBucket) DownloadObjectsParallel(ctx context.Context, downloads []ObjectDownload, workers int) error {
	// A non-positive worker count means "fully parallel": one worker per download.
	if workers <= 0 {
		workers = len(downloads)
	}
	// Never spawn more workers than there are downloads to perform.
	if workers >= len(downloads) {
		workers = len(downloads)
	}

	// Make a set of worker goroutines which will each attempt to fully drain the
	// download queue. When the queue is closed, each goroutine will exit.
	// Each worker will send an error or a nil to the errors channel to signal
	// that its work is done. We iterate over the error channel later to block
	// until all of the work is done and to collect all of the non-nil errors.
	downloadQueue := make(chan ObjectDownload, workers)
	errs := make(chan error)
	for i := 0; i < workers; i++ {
		go func() {
			// Each worker sends exactly one result (possibly nil) per download,
			// so the drain loop below can count len(downloads) messages total.
			for download := range downloadQueue {
				errs <- bucket.DownloadObjectToFile(ctx, download.objectPath, download.localPath)
			}
		}()
	}

	// Enqueue all of the downloads.
	go func() {
		for _, download := range downloads {
			// If the number of downloads is greater than the number of workers, this
			// will block until a worker becomes available.
			downloadQueue <- download
		}
		close(downloadQueue)
	}()

	// Drain the errors channel to populate the errors list and free up workers
	// which have finished their downloads.
	var errList []error
	for i := 0; i < len(downloads); i++ {
		if err := <-errs; err != nil {
			errList = append(errList, err)
		}
	}

	if len(errList) > 0 {
		return errors.Join(errList...)
	}

	return nil
}
| |
| // DeleteDir deletes all objects in a bucket which are descendents of the provided directory. |
| func (bucket *GCSBucket) DeleteDir(ctx context.Context, dirName string) error { |
| glog.V(1).Infof("deleting %s", bucket.URI(dirName)) |
| |
| descendants, err := bucket.ListDescendants(ctx, dirName) |
| if err != nil { |
| return err |
| } |
| |
| for _, objectName := range descendants { |
| err := bucket.DeleteObject(ctx, objectName) |
| if err != nil { |
| return err |
| } |
| } |
| |
| return nil |
| } |
| |
| // Exists returns whether or not there is an object with the given name in the bucket. |
| func (bucket *GCSBucket) Exists(ctx context.Context, objectName string) (bool, error) { |
| objectPath := bucket.Path(objectName) |
| _, err := bucket.handle.Object(objectPath).Attrs(ctx) |
| if err == storage.ErrObjectNotExist || err == storage.ErrBucketNotExist { |
| return false, nil |
| } else if err != nil { |
| return false, fmt.Errorf("failed to check if object %s exists: %v", bucket.URI(objectName), err) |
| } else { |
| return true, nil |
| } |
| } |
| |
| // DownloadGCSObjectFromURL downloads the object at inputURL and saves it at destinationPath |
| func DownloadGCSObjectFromURL(ctx context.Context, gcsClient *storage.Client, inputURL, destinationPath string) error { |
| // GCS Client Reader. |
| gcsBucket, objectName, err := getGCSVariables(inputURL) |
| if err != nil { |
| glog.Errorf("Unable to get gcs variables - gcsBucket, gcsPath with the input url: %s", inputURL) |
| return err |
| } |
| return DownloadGCSObject(ctx, gcsClient, gcsBucket, objectName, destinationPath) |
| } |
| |
| // DownloadGCSObject downloads the object at bucket, objectName and saves it at destinationPath |
| func DownloadGCSObject(ctx context.Context, gcsClient *storage.Client, gcsBucket, objectName, destinationPath string) error { |
| return NewGCSBucket(gcsClient, gcsBucket, "").DownloadObjectToFile(ctx, objectName, destinationPath) |
| } |
| |
| // DownloadGCSObjectString downloads the object at inputURL and saves the contents of the object file to a string |
| func DownloadGCSObjectString(ctx context.Context, |
| gcsClient *storage.Client, inputURL string) (string, error) { |
| gcsBucket, objectName, err := getGCSVariables(inputURL) |
| if err != nil { |
| glog.Errorf("Unable to get gcs variables - gcsBucket, gcsPath with the input url: %s", inputURL) |
| } |
| return NewGCSBucket(gcsClient, gcsBucket, "").DownloadTextObject(ctx, objectName) |
| } |
| |
| // GCSObjectExistsFromURL checks if an object exists at inputURL |
| func GCSObjectExistsFromURL(ctx context.Context, gcsClient *storage.Client, inputURL string) (bool, error) { |
| gcsBucket, objectName, err := getGCSVariables(inputURL) |
| if err != nil { |
| glog.Errorf("Unable to get gcs variables - gcsBucket, gcsPath with the input url: %s", inputURL) |
| return false, err |
| } |
| return GCSObjectExists(ctx, gcsClient, gcsBucket, objectName) |
| } |
| |
| // GCSObjectExists checks if an object with objectName exists at gcsBucket objectName. |
| func GCSObjectExists(ctx context.Context, gcsClient *storage.Client, gcsBucket, objectName string) (bool, error) { |
| return NewGCSBucket(gcsClient, gcsBucket, "").Exists(ctx, objectName) |
| } |
| |
| // ListGCSBucket lists all the objectNames in gcsBucket with prefix. |
| func ListGCSBucket(ctx context.Context, gcsClient *storage.Client, gcsBucket, prefix string) ([]string, error) { |
| return NewGCSBucket(gcsClient, gcsBucket, "").ListDescendants(ctx, prefix) |
| } |
| |
| // UploadGCSObject uploads an object at inputPath to destination URL |
| func UploadGCSObject(ctx context.Context, gcsClient *storage.Client, inputPath, destinationURL string) error { |
| gcsBucket, name, err := getGCSVariables(destinationURL) |
| if err != nil { |
| return fmt.Errorf("error parsing destination URL: %v", err) |
| } |
| return NewGCSBucket(gcsClient, gcsBucket, "").UploadObjectFromFile(ctx, inputPath, name) |
| } |
| |
| // UploadGCSObjectString uploads an input string as a file to destination URL |
| func UploadGCSObjectString(ctx context.Context, gcsClient *storage.Client, inputStr, destinationURL string) error { |
| gcsBucket, name, err := getGCSVariables(destinationURL) |
| if err != nil { |
| return fmt.Errorf("error parsing destination URL: %v", err) |
| } |
| reader := strings.NewReader(inputStr) |
| return NewGCSBucket(gcsClient, gcsBucket, "").UploadObject(ctx, reader, name) |
| } |
| |
| // DeleteGCSObject deletes an object at the input URL |
| func DeleteGCSObject(ctx context.Context, |
| gcsClient *storage.Client, inputURL string) error { |
| gcsBucket, name, err := getGCSVariables(inputURL) |
| if err != nil { |
| return fmt.Errorf("error parsing input URL: %v", err) |
| } |
| return NewGCSBucket(gcsClient, gcsBucket, "").DeleteObject(ctx, name) |
| } |
| |
| // getGCSVariables returns the GCSBucket, GCSPath, errorMsg) based on the gcsPath url input. |
| func getGCSVariables(gcsPath string) (string, string, error) { |
| parsedURL, err := url.Parse(gcsPath) |
| if err != nil { |
| glog.Errorf("Error - %v: Failed to parse gcsPath - %s", gcsPath, err) |
| return "", "", err |
| } |
| if parsedURL.Scheme == schemeGCS { |
| // In this case, the gcsPath looks like: gs://cos-tools/16108.403.42/kernel-headers.tgz |
| // url.Hostname() returns "cos-tools" which is the GCSBucket |
| // url.EscapedPath() returns "/16108.403.42/kernel-headers.tgz" with a leading "/". |
| return parsedURL.Hostname(), strings.TrimLeft(parsedURL.EscapedPath(), "/"), nil |
| } else if parsedURL.Scheme == schemeHttps && strings.ToLower(parsedURL.Host) == gcsHostName { |
| // In this case, the gcsPath looks like: https://storage.googleapis.com/cos-tools/1700.00.0/lakitu/gpu_default_version |
| // Split the path into parts using "/" |
| pathParts := strings.Split(parsedURL.Path, "/") |
| // Check if there are enough parts in the path |
| if len(pathParts) < 3 { |
| glog.Errorf("Error: Invalid gcsPath input: %s - the number of path parts separated by '/' is smaller than 3.", gcsPath) |
| return "", "", fmt.Errorf("error: invalid gcs_path input: %s - the number of path parts separated by '/' is smaller than 3", gcsPath) |
| } |
| gcsBucket := pathParts[1] |
| gcsPath := strings.Join(pathParts[2:], "/") |
| return gcsBucket, gcsPath, nil |
| } else { |
| glog.Errorf("Invalid input gcsPath: %s - unable to parse it to get gcsBucket and gcsPath", gcsPath) |
| return "", "", fmt.Errorf("error: invalid input gcs_path: %s - unable to parse it to get gcsBucket and gcsPath", gcsPath) |
| } |
| } |