tessl install tessl/golang-cloud-google-com-go-storage@1.59.0
Google Cloud Storage client library for Go, providing comprehensive APIs for bucket and object operations, access control, and advanced features.
This document covers high-performance features for large-scale operations, including parallelized listing (dataflux) and parallelized downloads (transfer manager).
Status: Both packages are in preview and APIs may change.
Import path: cloud.google.com/go/storage/dataflux
Fast, parallelized object listing using a worksteal algorithm. Recommended for buckets with 50,000+ objects.
Parallelized object listing with adaptive algorithms.
/**
 * Creates a new parallelized lister.
 * @param c - Storage client
 * @param in - Lister configuration
 * @returns Lister
 */
func NewLister(c *storage.Client, in *ListerInput) *Lister
/**
* Lister provides parallel object listing with worksteal algorithm.
*/
type Lister struct {
    // contains filtered or unexported fields
}
/**
 * Returns the next batch of objects.
 * Returns iterator.Done as the error when listing is complete.
 * @param ctx - Context for the operation
 * @returns Slice of ObjectAttrs and error
 */
func (l *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error)
/**
* Closes the lister and releases resources.
*/
func (l *Lister) Close()
/**
* ListerInput configures the lister.
*/
type ListerInput struct {
    // BucketName is the bucket to list (required)
    BucketName string

    // Parallelism is the number of parallel workers
    // Default: 10x CPU count
    // Optional
    Parallelism int

    // BatchSize is the minimum objects per batch
    // Default: 50
    // Optional
    BatchSize int

    // Query filters objects (same as storage.Query)
    // Optional
    Query *storage.Query

    // SkipDirectoryObjects excludes directory objects
    // Default: false
    // Optional
    SkipDirectoryObjects bool
}

Usage Example:
import (
    "context"
    "fmt"
    "log"

    "cloud.google.com/go/storage"
    "cloud.google.com/go/storage/dataflux"
    "google.golang.org/api/iterator"
)

ctx := context.Background()
client, err := storage.NewClient(ctx)
if err != nil {
    log.Fatal(err)
}
defer client.Close()

// Create lister
input := &dataflux.ListerInput{
    BucketName:  "my-large-bucket",
    Parallelism: 20,  // 20 parallel workers
    BatchSize:   100, // at least 100 objects per batch
}
lister := dataflux.NewLister(client, input)
defer lister.Close()

// List all objects in batches
totalObjects := 0
for {
    objects, err := lister.NextBatch(ctx)
    if err == iterator.Done {
        break // Done listing
    }
    if err != nil {
        log.Fatal(err)
    }
    totalObjects += len(objects)
    for _, obj := range objects {
        fmt.Printf("Object: %s, Size: %d\n", obj.Name, obj.Size)
    }
}
fmt.Printf("Total objects: %d\n", totalObjects)

// List with query filter
input = &dataflux.ListerInput{
    BucketName: "my-bucket",
    Query: &storage.Query{
        Prefix: "logs/2024/",
    },
    SkipDirectoryObjects: true,
}
lister = dataflux.NewLister(client, input)
defer lister.Close()
for {
    objects, err := lister.NextBatch(ctx)
    if err == iterator.Done {
        break
    }
    if err != nil {
        log.Fatal(err)
    }
    // Process batch
    _ = objects
}

Import path: cloud.google.com/go/storage/transfermanager
Parallelized download with sharding for high-performance scenarios.
Manages parallelized downloads with part-based sharding.
/**
* Creates a new downloader.
* @param c - Storage client
* @param opts - Downloader options
* @returns Downloader and error
*/
func NewDownloader(c *storage.Client, opts ...Option) (*Downloader, error)
/**
* Downloader manages parallelized downloads.
* Thread-safe for concurrent use.
*/
type Downloader struct {
    // contains filtered or unexported fields
}
/**
* Queues a single object download.
* Non-blocking - returns immediately.
* @param ctx - Context for the operation
* @param input - Download specification
* @returns Error if queueing fails
*/
func (d *Downloader) DownloadObject(ctx context.Context, input *DownloadObjectInput) error
/**
* Queues a directory download (all objects with prefix).
* Non-blocking - returns immediately.
* @param ctx - Context for the operation
* @param input - Directory download specification
* @returns Error if queueing fails
*/
func (d *Downloader) DownloadDirectory(ctx context.Context, input *DownloadDirectoryInput) error
/**
* Waits for all downloads to complete and closes the downloader.
* @returns Error if any download failed
*/
func (d *Downloader) WaitAndClose() error
/**
* Returns a channel for receiving download results.
* Only available if WithCallbacks() was not used.
* @returns Receive-only channel of DownloadOutput
*/
func (d *Downloader) Results() <-chan *DownloadOutput

/**
* DownloadObjectInput specifies a single object download.
*/
type DownloadObjectInput struct {
    // Bucket is the GCS bucket name (required)
    Bucket string

    // Object is the object name (required)
    Object string

    // Destination is where to write downloaded data (required)
    // Must implement io.WriterAt for parallel writes
    Destination io.WriterAt

    // Generation specifies object generation (optional)
    Generation int64

    // Conditions are preconditions (optional)
    Conditions *storage.Conditions

    // EncryptionKey is the decryption key (optional)
    EncryptionKey []byte

    // Range specifies byte range to download (optional)
    Range *DownloadRange

    // Callback is called on completion (optional, only if WithCallbacks enabled)
    Callback func(*DownloadOutput)
}
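The usage examples below do not exercise the Generation field; here is a minimal sketch, assuming a downloader, context, and open destination file constructed as in those examples (the object name and generation number are hypothetical):
// Pin the download to a specific object generation so a concurrent
// overwrite cannot change what is read mid-transfer.
// Assumes `downloader`, `ctx`, and an open *os.File `file` as in the examples below.
input := &transfermanager.DownloadObjectInput{
    Bucket:      "my-bucket",
    Object:      "report.csv",      // hypothetical object
    Destination: file,
    Generation:  1712345678901234, // hypothetical generation number
}
if err := downloader.DownloadObject(ctx, input); err != nil {
    log.Fatal(err)
}
Conditions and EncryptionKey are set on the same input struct in the same way.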
/**
* DownloadDirectoryInput specifies a directory download.
*/
type DownloadDirectoryInput struct {
    // Bucket is the GCS bucket name (required)
    Bucket string

    // LocalDirectory is the target directory (required)
    LocalDirectory string

    // Prefix filters objects by name prefix (optional)
    Prefix string

    // StartOffset is the lexicographic start (optional)
    StartOffset string

    // EndOffset is the lexicographic end (optional)
    EndOffset string

    // MatchGlob is a glob pattern filter (optional)
    MatchGlob string

    // StripPrefix removes this prefix from local paths (optional)
    StripPrefix string

    // Callback is called when directory download completes (optional)
    Callback func([]DownloadOutput)

    // OnObjectDownload is called for each object (optional)
    OnObjectDownload func(*DownloadOutput)
}
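MatchGlob and StripPrefix are not shown in the usage examples below; a minimal sketch combining them, assuming a downloader and context as in those examples (bucket, glob pattern, and paths are hypothetical):
// Download only objects matching the glob, writing them under /tmp/exports
// with the "exports/2024/" prefix removed from the local paths.
// Assumes `downloader` and `ctx` as in the examples below.
dirInput := &transfermanager.DownloadDirectoryInput{
    Bucket:         "my-bucket",
    MatchGlob:      "exports/2024/**.csv", // hypothetical glob filter
    StripPrefix:    "exports/2024/",
    LocalDirectory: "/tmp/exports",
}
if err := downloader.DownloadDirectory(ctx, dirInput); err != nil {
    log.Fatal(err)
}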
/**
* DownloadRange specifies a byte range.
*/
type DownloadRange struct {
    // Offset is the starting byte (inclusive)
    Offset int64
    // Length is the number of bytes to read
    Length int64
}
/**
* DownloadOutput contains download results.
*/
type DownloadOutput struct {
    // Bucket is the bucket name
    Bucket string
    // Object is the object name
    Object string
    // Range is the downloaded range (if specified)
    Range *DownloadRange
    // Err is the download error (nil if successful)
    Err error
    // Attrs contains object metadata (if successful)
    Attrs *storage.ObjectAttrs
}

/**
 * Option configures the downloader.
 */
type Option interface {
    // contains filtered or unexported methods
}
/**
* Sets maximum concurrent goroutines.
* Default: CPU count / 2
* Maximum: 200
* @param numWorkers - Number of workers
* @returns Option
*/
func WithWorkers(numWorkers int) Option
/**
* Sets chunk size for sharding.
* Default: 32 MiB
* Set to 0 to disable sharding
* @param partSize - Chunk size in bytes
* @returns Option
*/
func WithPartSize(partSize int64) Option
/**
* Sets per-operation timeout.
* @param timeout - Timeout duration
* @returns Option
*/
func WithPerOpTimeout(timeout time.Duration) Option
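WithPerOpTimeout is not exercised in the usage examples below; a minimal sketch, assuming a client constructed as in those examples:
// Abort any single queued download that runs longer than two minutes.
// Requires the "time" import.
downloader, err := transfermanager.NewDownloader(client,
    transfermanager.WithPerOpTimeout(2*time.Minute),
)
if err != nil {
    log.Fatal(err)
}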
/**
* Enables callback-based processing.
* Disables Results() channel.
* @returns Option
*/
func WithCallbacks() Option
/**
* Skips downloading files that already exist locally.
* Default: overwrite existing files
* @returns Option
*/
func SkipIfExists() Option

Thread-safe buffer for downloads.
/**
 * DownloadBuffer is a thread-safe buffer implementing io.WriterAt.
 */
type DownloadBuffer struct {
    // contains filtered or unexported fields
}
/**
* Creates a download buffer from a byte slice.
* @param buf - Backing buffer
* @returns DownloadBuffer
*/
func NewDownloadBuffer(buf []byte) *DownloadBuffer
/**
* Writes data at the specified offset (thread-safe).
* Implements io.WriterAt.
* @param p - Data to write
* @param off - Offset to write at
* @returns Number of bytes written and error
*/
func (db *DownloadBuffer) WriteAt(p []byte, off int64) (int, error)
/**
* Returns the written data.
* @returns Byte slice
*/
func (db *DownloadBuffer) Bytes() []byte

Usage Examples:
import (
    "context"
    "fmt"
    "log"
    "os"

    "cloud.google.com/go/storage"
    "cloud.google.com/go/storage/transfermanager"
)

ctx := context.Background()
client, err := storage.NewClient(ctx)
if err != nil {
    log.Fatal(err)
}
defer client.Close()
// Create downloader with options
downloader, err := transfermanager.NewDownloader(client,
    transfermanager.WithWorkers(10),
    transfermanager.WithPartSize(16*1024*1024), // 16 MiB chunks
)
if err != nil {
    log.Fatal(err)
}

// Download single object to file
file, err := os.Create("/tmp/downloaded.dat")
if err != nil {
    log.Fatal(err)
}
defer file.Close()

input := &transfermanager.DownloadObjectInput{
    Bucket:      "my-bucket",
    Object:      "large-file.dat",
    Destination: file,
}
err = downloader.DownloadObject(ctx, input)
if err != nil {
    log.Fatal(err)
}

// Wait for completion
err = downloader.WaitAndClose()
if err != nil {
    log.Fatal(err)
}
// Download to memory buffer
buf := make([]byte, 10*1024*1024) // 10 MiB
downloadBuf := transfermanager.NewDownloadBuffer(buf)
downloader, _ = transfermanager.NewDownloader(client)
input = &transfermanager.DownloadObjectInput{
    Bucket:      "my-bucket",
    Object:      "data.bin",
    Destination: downloadBuf,
}
downloader.DownloadObject(ctx, input)
downloader.WaitAndClose()
data := downloadBuf.Bytes()
fmt.Printf("Downloaded %d bytes\n", len(data))
// Download directory using results channel
downloader, _ = transfermanager.NewDownloader(client,
    transfermanager.WithWorkers(20),
)
dirInput := &transfermanager.DownloadDirectoryInput{
    Bucket:         "my-bucket",
    Prefix:         "photos/",
    LocalDirectory: "/tmp/photos",
}
err = downloader.DownloadDirectory(ctx, dirInput)
if err != nil {
    log.Fatal(err)
}

// Process results as they complete
go func() {
    for result := range downloader.Results() {
        if result.Err != nil {
            log.Printf("Failed to download %s: %v", result.Object, result.Err)
        } else {
            log.Printf("Downloaded %s (%d bytes)", result.Object, result.Attrs.Size)
        }
    }
}()
err = downloader.WaitAndClose()
if err != nil {
    log.Fatal(err)
}
// Download with callbacks
downloader, _ = transfermanager.NewDownloader(client,
    transfermanager.WithCallbacks(),
    transfermanager.WithWorkers(15),
)
dirInput = &transfermanager.DownloadDirectoryInput{
    Bucket:         "my-bucket",
    Prefix:         "logs/",
    LocalDirectory: "/var/logs",
    OnObjectDownload: func(output *transfermanager.DownloadOutput) {
        if output.Err != nil {
            log.Printf("Error: %s - %v", output.Object, output.Err)
        } else {
            log.Printf("Success: %s", output.Object)
        }
    },
    Callback: func(outputs []transfermanager.DownloadOutput) {
        log.Printf("Directory download complete: %d objects", len(outputs))
    },
}
downloader.DownloadDirectory(ctx, dirInput)
downloader.WaitAndClose()
// Download byte range
downloader, _ = transfermanager.NewDownloader(client)
file, _ = os.Create("/tmp/partial.dat")
input = &transfermanager.DownloadObjectInput{
    Bucket:      "my-bucket",
    Object:      "large-file.dat",
    Destination: file,
    Range: &transfermanager.DownloadRange{
        Offset: 1024,        // Start at byte 1024
        Length: 1024 * 1024, // Read 1 MiB
    },
}
downloader.DownloadObject(ctx, input)
downloader.WaitAndClose()
// Skip existing files
downloader, _ = transfermanager.NewDownloader(client,
    transfermanager.SkipIfExists(),
)
dirInput = &transfermanager.DownloadDirectoryInput{
    Bucket:         "my-bucket",
    Prefix:         "backups/",
    LocalDirectory: "/backups",
}
downloader.DownloadDirectory(ctx, dirInput)
downloader.WaitAndClose()