tessl/golang-cloud-google-com-go-storage

tessl install tessl/golang-cloud-google-com-go-storage@1.59.0

Google Cloud Storage client library for Go providing comprehensive APIs for bucket and object operations, access control, and advanced features


Performance Features

This document covers high-performance features for large-scale operations: parallelized listing (dataflux) and parallelized downloads (transfer manager).

Status: Both packages are in preview and APIs may change.

Dataflux - Fast Parallel Listing

Import path: cloud.google.com/go/storage/dataflux

Fast, parallelized object listing using a worksteal algorithm. Recommended for buckets with 50,000+ objects.

Lister

Parallelized object listing with adaptive algorithms.

/**
 * Creates a new parallelized lister.
 * @param c - Storage client
 * @param in - Lister configuration
 * @returns Lister and error
 */
func NewLister(c *storage.Client, in *ListerInput) (*Lister, error)

/**
 * Lister provides parallel object listing with worksteal algorithm.
 */
type Lister struct {
    // contains filtered or unexported fields
}

/**
 * Returns the next batch of objects.
 * Returns empty slice when listing is complete.
 * @param ctx - Context for the operation
 * @returns Slice of ObjectAttrs and error
 */
func (l *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error)

/**
 * Closes the lister and releases resources.
 */
func (l *Lister) Close()

/**
 * ListerInput configures the lister.
 */
type ListerInput struct {
    // BucketName is the bucket to list (required)
    BucketName string

    // Parallelism is the number of parallel workers
    // Default: 10x CPU count
    // Optional
    Parallelism int

    // BatchSize is the minimum objects per batch
    // Default: 50
    // Optional
    BatchSize int

    // Query filters objects (same as storage.Query)
    // Optional
    Query *storage.Query

    // SkipDirectoryObjects excludes directory objects
    // Default: false
    // Optional
    SkipDirectoryObjects bool
}

Usage Example:

import (
    "context"
    "fmt"
    "log"

    "cloud.google.com/go/storage"
    "cloud.google.com/go/storage/dataflux"
)

ctx := context.Background()
client, err := storage.NewClient(ctx)
if err != nil {
    log.Fatal(err)
}
defer client.Close()

// Create lister
input := &dataflux.ListerInput{
    BucketName:  "my-large-bucket",
    Parallelism: 20,           // 20 parallel workers
    BatchSize:   100,          // 100 objects per batch
}
lister, err := dataflux.NewLister(client, input)
if err != nil {
    log.Fatal(err)
}
defer lister.Close()

// List all objects in batches
totalObjects := 0
for {
    objects, err := lister.NextBatch(ctx)
    if err != nil {
        log.Fatal(err)
    }
    if len(objects) == 0 {
        break // Done listing
    }

    totalObjects += len(objects)
    for _, obj := range objects {
        fmt.Printf("Object: %s, Size: %d\n", obj.Name, obj.Size)
    }
}
fmt.Printf("Total objects: %d\n", totalObjects)

// List with query filter
input = &dataflux.ListerInput{
    BucketName: "my-bucket",
    Query: &storage.Query{
        Prefix: "logs/2024/",
    },
    SkipDirectoryObjects: true,
}
lister, err = dataflux.NewLister(client, input)
if err != nil {
    log.Fatal(err)
}
defer lister.Close()

for {
    objects, err := lister.NextBatch(ctx)
    if err != nil {
        log.Fatal(err)
    }
    if len(objects) == 0 {
        break
    }
    // Process batch
}

Transfer Manager - Parallelized Downloads

Import path: cloud.google.com/go/storage/transfermanager

Parallelized downloads with part-based sharding for high-performance scenarios.

Downloader

Manages parallelized downloads with part-based sharding.

/**
 * Creates a new downloader.
 * @param c - Storage client
 * @param opts - Downloader options
 * @returns Downloader and error
 */
func NewDownloader(c *storage.Client, opts ...Option) (*Downloader, error)

/**
 * Downloader manages parallelized downloads.
 * Thread-safe for concurrent use.
 */
type Downloader struct {
    // contains filtered or unexported fields
}

/**
 * Queues a single object download.
 * Non-blocking - returns immediately.
 * @param ctx - Context for the operation
 * @param input - Download specification
 * @returns Error if queueing fails
 */
func (d *Downloader) DownloadObject(ctx context.Context, input *DownloadObjectInput) error

/**
 * Queues a directory download (all objects with prefix).
 * Non-blocking - returns immediately.
 * @param ctx - Context for the operation
 * @param input - Directory download specification
 * @returns Error if queueing fails
 */
func (d *Downloader) DownloadDirectory(ctx context.Context, input *DownloadDirectoryInput) error

/**
 * Waits for all downloads to complete and closes the downloader.
 * @returns Error if any download failed
 */
func (d *Downloader) WaitAndClose() error

/**
 * Returns a channel for receiving download results.
 * Only available if WithCallbacks() was not used.
 * @returns Receive-only channel of DownloadOutput
 */
func (d *Downloader) Results() <-chan *DownloadOutput
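
Because the Downloader is safe for concurrent use, several goroutines can queue work on a single instance. A minimal sketch of concurrent queueing (the bucket, object names, and local paths are illustrative, and the sync, os, path/filepath, and log imports are assumed):

var wg sync.WaitGroup
for _, name := range []string{"a.dat", "b.dat", "c.dat"} {
    wg.Add(1)
    go func(object string) {
        defer wg.Done()
        // Destinations must stay open until WaitAndClose returns,
        // because downloads run asynchronously after queueing.
        f, err := os.Create(filepath.Join("/tmp", object))
        if err != nil {
            log.Printf("create %s: %v", object, err)
            return
        }
        if err := downloader.DownloadObject(ctx, &transfermanager.DownloadObjectInput{
            Bucket:      "my-bucket",
            Object:      object,
            Destination: f,
        }); err != nil {
            log.Printf("queue %s: %v", object, err)
        }
    }(name)
}
wg.Wait()
// Call WaitAndClose exactly once, after all queueing is done.
if err := downloader.WaitAndClose(); err != nil {
    log.Fatal(err)
}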

Download Input Types

/**
 * DownloadObjectInput specifies a single object download.
 */
type DownloadObjectInput struct {
    // Bucket is the GCS bucket name (required)
    Bucket string

    // Object is the object name (required)
    Object string

    // Destination is where to write downloaded data (required)
    // Must implement io.WriterAt for parallel writes
    Destination io.WriterAt

    // Generation specifies object generation (optional)
    Generation int64

    // Conditions are preconditions (optional)
    Conditions *storage.Conditions

    // EncryptionKey is the decryption key (optional)
    EncryptionKey []byte

    // Range specifies byte range to download (optional)
    Range *DownloadRange

    // Callback is called on completion (optional, only if WithCallbacks enabled)
    Callback func(*DownloadOutput)
}
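
The optional fields correspond directly to types in the base storage package. A hedged sketch that pins an object generation and adds a metageneration precondition (the bucket, object, and values are illustrative; file is any io.WriterAt):

input := &transfermanager.DownloadObjectInput{
    Bucket:      "my-bucket",
    Object:      "report.pdf",
    Destination: file,             // any io.WriterAt
    Generation:  1712345678901234, // read exactly this generation
    Conditions: &storage.Conditions{
        MetagenerationMatch: 3, // fail if metadata has changed
    },
}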

/**
 * DownloadDirectoryInput specifies a directory download.
 */
type DownloadDirectoryInput struct {
    // Bucket is the GCS bucket name (required)
    Bucket string

    // LocalDirectory is the target directory (required)
    LocalDirectory string

    // Prefix filters objects by name prefix (optional)
    Prefix string

    // StartOffset is the lexicographic start (optional)
    StartOffset string

    // EndOffset is the lexicographic end (optional)
    EndOffset string

    // MatchGlob is a glob pattern filter (optional)
    MatchGlob string

    // StripPrefix removes this prefix from local paths (optional)
    StripPrefix string

    // Callback is called when directory download completes (optional)
    Callback func([]DownloadOutput)

    // OnObjectDownload is called for each object (optional)
    OnObjectDownload func(*DownloadOutput)
}
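
The filter fields can be combined with StripPrefix to control the local layout. A hedged sketch that mirrors only gzipped log files for one day and writes them directly under the target directory (the bucket and paths are illustrative):

dirInput := &transfermanager.DownloadDirectoryInput{
    Bucket:         "my-bucket",
    LocalDirectory: "/tmp/logs",
    MatchGlob:      "logs/2024/01/15/**.gz", // ** also matches across "/"
    StripPrefix:    "logs/2024/01/15/",      // drop the prefix from local paths
}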

/**
 * DownloadRange specifies a byte range.
 */
type DownloadRange struct {
    // Offset is the starting byte (inclusive)
    Offset int64

    // Length is the number of bytes to read
    Length int64
}

/**
 * DownloadOutput contains download results.
 */
type DownloadOutput struct {
    // Bucket is the bucket name
    Bucket string

    // Object is the object name
    Object string

    // Range is the downloaded range (if specified)
    Range *DownloadRange

    // Err is the download error (nil if successful)
    Err error

    // Attrs contains object metadata (if successful)
    Attrs *storage.ObjectAttrs
}

Download Options

/**
 * Option configures the downloader.
 */
type Option interface {
    // contains filtered or unexported methods
}

/**
 * Sets maximum concurrent goroutines.
 * Default: CPU count / 2
 * Maximum: 200
 * @param numWorkers - Number of workers
 * @returns Option
 */
func WithWorkers(numWorkers int) Option

/**
 * Sets chunk size for sharding.
 * Default: 32 MiB
 * Set to 0 to disable sharding
 * @param partSize - Chunk size in bytes
 * @returns Option
 */
func WithPartSize(partSize int64) Option

/**
 * Sets per-operation timeout.
 * @param timeout - Timeout duration
 * @returns Option
 */
func WithPerOpTimeout(timeout time.Duration) Option

/**
 * Enables callback-based processing.
 * Disables Results() channel.
 * @returns Option
 */
func WithCallbacks() Option

/**
 * Skips downloading files that already exist locally.
 * Default: overwrite existing files
 * @returns Option
 */
func SkipIfExists() Option
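
Options compose. A short sketch combining a per-operation timeout with the other knobs (the values are illustrative, and the time package must be imported):

downloader, err := transfermanager.NewDownloader(client,
    transfermanager.WithWorkers(8),
    transfermanager.WithPartSize(8*1024*1024),       // 8 MiB parts
    transfermanager.WithPerOpTimeout(2*time.Minute), // bound each object download
)
if err != nil {
    log.Fatal(err)
}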

Download Buffer

Thread-safe buffer for downloads.

/**
 * DownloadBuffer is a thread-safe buffer implementing io.WriterAt.
 */
type DownloadBuffer struct {
    // contains filtered or unexported fields
}

/**
 * Creates a download buffer from a byte slice.
 * @param buf - Backing buffer
 * @returns DownloadBuffer
 */
func NewDownloadBuffer(buf []byte) *DownloadBuffer

/**
 * Writes data at the specified offset (thread-safe).
 * Implements io.WriterAt.
 * @param p - Data to write
 * @param off - Offset to write at
 * @returns Number of bytes written and error
 */
func (db *DownloadBuffer) WriteAt(p []byte, off int64) (int, error)

/**
 * Returns the written data.
 * @returns Byte slice
 */
func (db *DownloadBuffer) Bytes() []byte

Usage Examples:

import (
    "context"
    "fmt"
    "log"
    "os"

    "cloud.google.com/go/storage"
    "cloud.google.com/go/storage/transfermanager"
)

ctx := context.Background()
client, err := storage.NewClient(ctx)
if err != nil {
    log.Fatal(err)
}
defer client.Close()

// Create downloader with options
downloader, err := transfermanager.NewDownloader(client,
    transfermanager.WithWorkers(10),
    transfermanager.WithPartSize(16*1024*1024), // 16 MiB chunks
)
if err != nil {
    log.Fatal(err)
}

// Download single object to file
file, err := os.Create("/tmp/downloaded.dat")
if err != nil {
    log.Fatal(err)
}
defer file.Close()

input := &transfermanager.DownloadObjectInput{
    Bucket:      "my-bucket",
    Object:      "large-file.dat",
    Destination: file,
}
err = downloader.DownloadObject(ctx, input)
if err != nil {
    log.Fatal(err)
}

// Wait for completion
err = downloader.WaitAndClose()
if err != nil {
    log.Fatal(err)
}

// Download to memory buffer
buf := make([]byte, 10*1024*1024) // 10 MiB
downloadBuf := transfermanager.NewDownloadBuffer(buf)

downloader, _ = transfermanager.NewDownloader(client)
input = &transfermanager.DownloadObjectInput{
    Bucket:      "my-bucket",
    Object:      "data.bin",
    Destination: downloadBuf,
}
downloader.DownloadObject(ctx, input)
downloader.WaitAndClose()

data := downloadBuf.Bytes()
fmt.Printf("Downloaded %d bytes\n", len(data))

// Download directory using results channel
downloader, _ = transfermanager.NewDownloader(client,
    transfermanager.WithWorkers(20),
)

dirInput := &transfermanager.DownloadDirectoryInput{
    Bucket:         "my-bucket",
    Prefix:         "photos/",
    LocalDirectory: "/tmp/photos",
}
err = downloader.DownloadDirectory(ctx, dirInput)
if err != nil {
    log.Fatal(err)
}

// Process results as they complete
go func() {
    for result := range downloader.Results() {
        if result.Err != nil {
            log.Printf("Failed to download %s: %v", result.Object, result.Err)
        } else {
            log.Printf("Downloaded %s (%d bytes)", result.Object, result.Attrs.Size)
        }
    }
}()

if err = downloader.WaitAndClose(); err != nil {
    log.Fatal(err)
}

// Download with callbacks
downloader, _ = transfermanager.NewDownloader(client,
    transfermanager.WithCallbacks(),
    transfermanager.WithWorkers(15),
)

dirInput = &transfermanager.DownloadDirectoryInput{
    Bucket:         "my-bucket",
    Prefix:         "logs/",
    LocalDirectory: "/var/logs",
    OnObjectDownload: func(output *transfermanager.DownloadOutput) {
        if output.Err != nil {
            log.Printf("Error: %s - %v", output.Object, output.Err)
        } else {
            log.Printf("Success: %s", output.Object)
        }
    },
    Callback: func(outputs []transfermanager.DownloadOutput) {
        log.Printf("Directory download complete: %d objects", len(outputs))
    },
}
downloader.DownloadDirectory(ctx, dirInput)
downloader.WaitAndClose()

// Download byte range
downloader, _ = transfermanager.NewDownloader(client)
file, _ = os.Create("/tmp/partial.dat")

input = &transfermanager.DownloadObjectInput{
    Bucket:      "my-bucket",
    Object:      "large-file.dat",
    Destination: file,
    Range: &transfermanager.DownloadRange{
        Offset: 1024,      // Start at byte 1024
        Length: 1024*1024, // Read 1 MiB
    },
}
downloader.DownloadObject(ctx, input)
downloader.WaitAndClose()

// Skip existing files
downloader, _ = transfermanager.NewDownloader(client,
    transfermanager.SkipIfExists(),
)

dirInput = &transfermanager.DownloadDirectoryInput{
    Bucket:         "my-bucket",
    Prefix:         "backups/",
    LocalDirectory: "/backups",
}
downloader.DownloadDirectory(ctx, dirInput)
downloader.WaitAndClose()