or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
golangpkg:golang/cloud.google.com/go/pubsub@v1.50.1

docs

client.mddelivery.mdindex.mdingestion.mdlow-level.mdpublishing.mdreceiving.mdschemas.mdsnapshots.mdsubscriptions.mdtesting.mdtopics.mdtransforms.md
tile.json

tessl/golang-cloud-google-com--go--pubsub

tessl install tessl/golang-cloud-google-com--go--pubsub@1.50.5

Google Cloud Pub/Sub client library for Go providing high-level idiomatic APIs for publishing and receiving messages with automatic batching, flow control, and support for advanced features including message ordering, schema validation, exactly-once delivery, and ingestion from external data sources

snapshots.mddocs/

Snapshots and Seeking

Snapshots capture the state of a subscription at a specific point in time. Seeking allows subscriptions to jump to a specific timestamp or snapshot, enabling message replay and recovery.

Snapshot

type Snapshot struct {
    // Has unexported fields
}

func (s *Snapshot) ID() string
func (s *Snapshot) Delete(ctx context.Context) error
func (s *Snapshot) SetLabels(ctx context.Context, label map[string]string) (*SnapshotConfig, error)

A reference to a Pub/Sub snapshot.

Methods:

  • ID(): Returns the unique identifier of the snapshot within its project
  • Delete(): Deletes the snapshot (EXPERIMENTAL)
  • SetLabels(): Sets or replaces the labels on the snapshot (EXPERIMENTAL)

SnapshotConfig

type SnapshotConfig struct {
    *Snapshot
    Topic      *Topic
    Expiration time.Time
    Labels     map[string]string
}

Configuration and metadata for a snapshot.

Fields:

  • Snapshot: Reference to the snapshot
  • Topic: The topic the snapshot was created from
  • Expiration: Timestamp when the snapshot will expire
  • Labels: User-defined key-value labels

Creating Snapshots

Create from Subscription

func (s *Subscription) CreateSnapshot(ctx context.Context, name string) (*SnapshotConfig, error)

Creates a new snapshot from the subscription's current state. If name is empty, a unique name is assigned.

The snapshot is guaranteed to retain:

  • The existing backlog on the subscription (unacknowledged messages)
  • Any messages published to the topic after the snapshot is created

Parameters:

  • ctx: Context for the operation
  • name: Snapshot ID (empty for auto-generated name)

Returns: SnapshotConfig and error

Example:

sub := client.Subscription("my-subscription")

// Create snapshot with custom name
snapshot, err := sub.CreateSnapshot(ctx, "backup-snapshot")
if err != nil {
    log.Fatalf("Failed to create snapshot: %v", err)
}
fmt.Printf("Created snapshot: %s\n", snapshot.Snapshot.ID())
fmt.Printf("Topic: %s\n", snapshot.Topic.ID())
fmt.Printf("Expiration: %v\n", snapshot.Expiration)

// Create snapshot with auto-generated name
snapshot, err = sub.CreateSnapshot(ctx, "")
if err != nil {
    log.Fatalf("Failed to create snapshot: %v", err)
}
fmt.Printf("Created snapshot: %s\n", snapshot.Snapshot.ID())

Accessing Snapshots

Get Snapshot Reference

func (c *Client) Snapshot(id string) *Snapshot

Creates a reference to a snapshot in the client's project.

Example:

snapshot := client.Snapshot("my-snapshot")
fmt.Printf("Snapshot ID: %s\n", snapshot.ID())

List Snapshots

func (c *Client) Snapshots(ctx context.Context) *SnapshotConfigIterator

Returns an iterator for listing all snapshots in the project.

SnapshotConfigIterator

type SnapshotConfigIterator struct {
    // Has unexported fields
}

func (it *SnapshotConfigIterator) Next() (*SnapshotConfig, error)

Iterator for listing snapshot configurations. Call Next() repeatedly until it returns iterator.Done.

Example:

it := client.Snapshots(ctx)
for {
    snapshot, err := it.Next()
    if err == iterator.Done {
        break
    }
    if err != nil {
        log.Fatalf("Failed to iterate snapshots: %v", err)
    }
    fmt.Printf("Snapshot: %s\n", snapshot.Snapshot.ID())
    fmt.Printf("  Topic: %s\n", snapshot.Topic.ID())
    fmt.Printf("  Expiration: %v\n", snapshot.Expiration)
    fmt.Printf("  Labels: %v\n", snapshot.Labels)
}

Managing Snapshots

Delete Snapshot

func (s *Snapshot) Delete(ctx context.Context) error

Deletes the snapshot. This is an EXPERIMENTAL feature.

Example:

snapshot := client.Snapshot("old-snapshot")
if err := snapshot.Delete(ctx); err != nil {
    log.Fatalf("Failed to delete snapshot: %v", err)
}

Update Snapshot Labels

func (s *Snapshot) SetLabels(ctx context.Context, label map[string]string) (*SnapshotConfig, error)

Sets or replaces the labels on a snapshot. This is an EXPERIMENTAL feature.

Example:

snapshot := client.Snapshot("my-snapshot")

// Set labels
config, err := snapshot.SetLabels(ctx, map[string]string{
    "env":     "production",
    "purpose": "disaster-recovery",
})
if err != nil {
    log.Fatalf("Failed to set labels: %v", err)
}
fmt.Printf("Updated labels: %v\n", config.Labels)

// Clear all labels
config, err = snapshot.SetLabels(ctx, map[string]string{})

Seeking to Timestamps

SeekToTime

func (s *Subscription) SeekToTime(ctx context.Context, t time.Time) error

Seeks the subscription to a point in time, marking messages as acknowledged or unacknowledged based on publish time.

Behavior:

  • Messages published before the time are marked as acknowledged
  • Messages published after the time are marked as unacknowledged
  • Only affects messages retained in the subscription (within retention window)
  • Messages outside retention window are not restored

Parameters:

  • ctx: Context for the operation
  • t: Time to seek to

Example:

sub := client.Subscription("my-subscription")

// Seek to 24 hours ago
seekTime := time.Now().Add(-24 * time.Hour)
if err := sub.SeekToTime(ctx, seekTime); err != nil {
    log.Fatalf("Failed to seek: %v", err)
}
fmt.Printf("Subscription seeked to %v\n", seekTime)

// Seek to specific timestamp
specificTime := time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC)
if err := sub.SeekToTime(ctx, specificTime); err != nil {
    log.Fatalf("Failed to seek: %v", err)
}

Seeking to Snapshots

SeekToSnapshot

func (s *Subscription) SeekToSnapshot(ctx context.Context, snap *Snapshot) error

Seeks the subscription to a snapshot, restoring the subscription state to match the snapshot.

Requirements:

  • The snapshot need not be created from this subscription
  • The snapshot must be for the same topic the subscription is subscribed to

Parameters:

  • ctx: Context for the operation
  • snap: Snapshot to seek to

Example:

sub := client.Subscription("my-subscription")
snapshot := client.Snapshot("backup-snapshot")

if err := sub.SeekToSnapshot(ctx, snapshot); err != nil {
    log.Fatalf("Failed to seek to snapshot: %v", err)
}
fmt.Printf("Subscription seeked to snapshot: %s\n", snapshot.ID())

Use Cases and Patterns

Disaster Recovery

Create regular snapshots for point-in-time recovery:

// Create daily backup snapshots
func createDailySnapshot(ctx context.Context, sub *pubsub.Subscription) error {
    snapshotName := fmt.Sprintf("daily-backup-%s", time.Now().Format("2006-01-02"))
    snapshot, err := sub.CreateSnapshot(ctx, snapshotName)
    if err != nil {
        return fmt.Errorf("failed to create snapshot: %v", err)
    }

    // Add labels for tracking
    _, err = snapshot.Snapshot.SetLabels(ctx, map[string]string{
        "type":    "backup",
        "created": time.Now().Format(time.RFC3339),
    })
    return err
}

// Recover from disaster
func recoverFromSnapshot(ctx context.Context, sub *pubsub.Subscription, snapshotID string) error {
    snapshot := sub.c.Snapshot(snapshotID)
    if err := sub.SeekToSnapshot(ctx, snapshot); err != nil {
        return fmt.Errorf("failed to recover: %v", err)
    }
    return nil
}

Message Replay

Replay messages from a specific time:

// Replay messages from the last hour
func replayLastHour(ctx context.Context, sub *pubsub.Subscription) error {
    replayTime := time.Now().Add(-1 * time.Hour)
    if err := sub.SeekToTime(ctx, replayTime); err != nil {
        return fmt.Errorf("failed to replay: %v", err)
    }

    fmt.Printf("Replaying messages from %v\n", replayTime)
    return nil
}

// Replay messages from specific event
func replayFromEvent(ctx context.Context, sub *pubsub.Subscription, eventTime time.Time) error {
    if err := sub.SeekToTime(ctx, eventTime); err != nil {
        return fmt.Errorf("failed to replay from event: %v", err)
    }

    fmt.Printf("Replaying messages from event at %v\n", eventTime)
    return nil
}

Testing and Development

Use snapshots to create consistent test environments:

// Setup test snapshot
func setupTestData(ctx context.Context, sub *pubsub.Subscription) (*pubsub.SnapshotConfig, error) {
    // Publish test messages
    topic := client.Topic("test-topic")
    for i := 0; i < 10; i++ {
        msg := &pubsub.Message{
            Data: []byte(fmt.Sprintf("test message %d", i)),
        }
        result := topic.Publish(ctx, msg)
        if _, err := result.Get(ctx); err != nil {
            return nil, err
        }
    }

    // Create snapshot of test data
    return sub.CreateSnapshot(ctx, "test-data-snapshot")
}

// Reset to test snapshot before each test
func resetToTestSnapshot(ctx context.Context, sub *pubsub.Subscription) error {
    snapshot := client.Snapshot("test-data-snapshot")
    return sub.SeekToSnapshot(ctx, snapshot)
}

Migration and Cutover

Snapshot before major changes:

// Create pre-migration snapshot
func createPreMigrationSnapshot(ctx context.Context, sub *pubsub.Subscription) (*pubsub.SnapshotConfig, error) {
    snapshotName := fmt.Sprintf("pre-migration-%s", time.Now().Format("20060102-150405"))
    snapshot, err := sub.CreateSnapshot(ctx, snapshotName)
    if err != nil {
        return nil, err
    }

    _, err = snapshot.Snapshot.SetLabels(ctx, map[string]string{
        "purpose":   "migration",
        "timestamp": time.Now().Format(time.RFC3339),
    })
    return snapshot, err
}

// Rollback if migration fails
func rollbackMigration(ctx context.Context, sub *pubsub.Subscription, snapshotID string) error {
    snapshot := client.Snapshot(snapshotID)
    return sub.SeekToSnapshot(ctx, snapshot)
}

Retention Considerations

Topic Message Retention

When using snapshots and seeking, consider the topic's message retention settings:

// Create topic with extended retention for seeking
topic, err := client.CreateTopicWithConfig(ctx, "seekable-topic", &pubsub.TopicConfig{
    RetentionDuration: optional.Duration(7 * 24 * time.Hour), // 7 days
})

// Create subscription with retention
sub, err := client.CreateSubscription(ctx, "seekable-sub", pubsub.SubscriptionConfig{
    Topic:             topic,
    RetentionDuration: 7 * 24 * time.Hour, // 7 days
})

Snapshot Expiration

Snapshots expire after 7 days by default. Plan accordingly:

// Check snapshot expiration
it := client.Snapshots(ctx)
for {
    snapshot, err := it.Next()
    if err == iterator.Done {
        break
    }
    if err != nil {
        log.Fatal(err)
    }

    // Delete snapshots expiring soon
    if time.Until(snapshot.Expiration) < 24*time.Hour {
        fmt.Printf("Snapshot %s expiring soon: %v\n", snapshot.Snapshot.ID(), snapshot.Expiration)
        // Optionally delete or recreate
    }
}

Best Practices

  1. Naming: Use descriptive names with timestamps for snapshots
  2. Labels: Tag snapshots with metadata (purpose, date, version)
  3. Cleanup: Delete old snapshots to avoid clutter
  4. Retention: Configure adequate retention on topics and subscriptions
  5. Testing: Test seek operations before production use
  6. Documentation: Document snapshot purposes and retention policies
  7. Monitoring: Track snapshot creation and usage
  8. Automation: Automate snapshot creation for regular backups
  9. Validation: Verify subscription state after seeking
  10. Recovery Plan: Document recovery procedures using snapshots

Limitations

  • Snapshots expire after 7 days
  • Seeking only affects messages within the retention window
  • Seeking does not restore messages already expired
  • Snapshot must be for the same topic as the subscription
  • Seeking resets all acknowledgment states