tessl install tessl/golang-cloud-google-com--go--pubsub@1.50.5Google Cloud Pub/Sub client library for Go providing high-level idiomatic APIs for publishing and receiving messages with automatic batching, flow control, and support for advanced features including message ordering, schema validation, exactly-once delivery, and ingestion from external data sources
Snapshots capture the state of a subscription at a specific point in time. Seeking allows subscriptions to jump to a specific timestamp or snapshot, enabling message replay and recovery.
type Snapshot struct {
// Has unexported fields
}
func (s *Snapshot) ID() string
func (s *Snapshot) Delete(ctx context.Context) error
func (s *Snapshot) SetLabels(ctx context.Context, label map[string]string) (*SnapshotConfig, error)A reference to a Pub/Sub snapshot.
Methods:
ID(): Returns the unique identifier of the snapshot within its projectDelete(): Deletes the snapshot (EXPERIMENTAL)SetLabels(): Sets or replaces the labels on the snapshot (EXPERIMENTAL)type SnapshotConfig struct {
*Snapshot
Topic *Topic
Expiration time.Time
Labels map[string]string
}Configuration and metadata for a snapshot.
Fields:
Snapshot: Reference to the snapshotTopic: The topic the snapshot was created fromExpiration: Timestamp when the snapshot will expireLabels: User-defined key-value labelsfunc (s *Subscription) CreateSnapshot(ctx context.Context, name string) (*SnapshotConfig, error)Creates a new snapshot from the subscription's current state. If name is empty, a unique name is assigned.
The snapshot is guaranteed to retain:
Parameters:
ctx: Context for the operationname: Snapshot ID (empty for auto-generated name)Returns: SnapshotConfig and error
Example:
sub := client.Subscription("my-subscription")
// Create snapshot with custom name
snapshot, err := sub.CreateSnapshot(ctx, "backup-snapshot")
if err != nil {
log.Fatalf("Failed to create snapshot: %v", err)
}
fmt.Printf("Created snapshot: %s\n", snapshot.Snapshot.ID())
fmt.Printf("Topic: %s\n", snapshot.Topic.ID())
fmt.Printf("Expiration: %v\n", snapshot.Expiration)
// Create snapshot with auto-generated name
snapshot, err = sub.CreateSnapshot(ctx, "")
if err != nil {
log.Fatalf("Failed to create snapshot: %v", err)
}
fmt.Printf("Created snapshot: %s\n", snapshot.Snapshot.ID())func (c *Client) Snapshot(id string) *SnapshotCreates a reference to a snapshot in the client's project.
Example:
snapshot := client.Snapshot("my-snapshot")
fmt.Printf("Snapshot ID: %s\n", snapshot.ID())func (c *Client) Snapshots(ctx context.Context) *SnapshotConfigIteratorReturns an iterator for listing all snapshots in the project.
type SnapshotConfigIterator struct {
// Has unexported fields
}
func (it *SnapshotConfigIterator) Next() (*SnapshotConfig, error)Iterator for listing snapshot configurations. Call Next() repeatedly until it returns iterator.Done.
Example:
it := client.Snapshots(ctx)
for {
snapshot, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
log.Fatalf("Failed to iterate snapshots: %v", err)
}
fmt.Printf("Snapshot: %s\n", snapshot.Snapshot.ID())
fmt.Printf(" Topic: %s\n", snapshot.Topic.ID())
fmt.Printf(" Expiration: %v\n", snapshot.Expiration)
fmt.Printf(" Labels: %v\n", snapshot.Labels)
}func (s *Snapshot) Delete(ctx context.Context) errorDeletes the snapshot. This is an EXPERIMENTAL feature.
Example:
snapshot := client.Snapshot("old-snapshot")
if err := snapshot.Delete(ctx); err != nil {
log.Fatalf("Failed to delete snapshot: %v", err)
}func (s *Snapshot) SetLabels(ctx context.Context, label map[string]string) (*SnapshotConfig, error)Sets or replaces the labels on a snapshot. This is an EXPERIMENTAL feature.
Example:
snapshot := client.Snapshot("my-snapshot")
// Set labels
config, err := snapshot.SetLabels(ctx, map[string]string{
"env": "production",
"purpose": "disaster-recovery",
})
if err != nil {
log.Fatalf("Failed to set labels: %v", err)
}
fmt.Printf("Updated labels: %v\n", config.Labels)
// Clear all labels
config, err = snapshot.SetLabels(ctx, map[string]string{})func (s *Subscription) SeekToTime(ctx context.Context, t time.Time) errorSeeks the subscription to a point in time, marking messages as acknowledged or unacknowledged based on publish time.
Behavior:
Parameters:
ctx: Context for the operationt: Time to seek toExample:
sub := client.Subscription("my-subscription")
// Seek to 24 hours ago
seekTime := time.Now().Add(-24 * time.Hour)
if err := sub.SeekToTime(ctx, seekTime); err != nil {
log.Fatalf("Failed to seek: %v", err)
}
fmt.Printf("Subscription seeked to %v\n", seekTime)
// Seek to specific timestamp
specificTime := time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC)
if err := sub.SeekToTime(ctx, specificTime); err != nil {
log.Fatalf("Failed to seek: %v", err)
}func (s *Subscription) SeekToSnapshot(ctx context.Context, snap *Snapshot) errorSeeks the subscription to a snapshot, restoring the subscription state to match the snapshot.
Requirements:
Parameters:
ctx: Context for the operationsnap: Snapshot to seek toExample:
sub := client.Subscription("my-subscription")
snapshot := client.Snapshot("backup-snapshot")
if err := sub.SeekToSnapshot(ctx, snapshot); err != nil {
log.Fatalf("Failed to seek to snapshot: %v", err)
}
fmt.Printf("Subscription seeked to snapshot: %s\n", snapshot.ID())Create regular snapshots for point-in-time recovery:
// Create daily backup snapshots
func createDailySnapshot(ctx context.Context, sub *pubsub.Subscription) error {
snapshotName := fmt.Sprintf("daily-backup-%s", time.Now().Format("2006-01-02"))
snapshot, err := sub.CreateSnapshot(ctx, snapshotName)
if err != nil {
return fmt.Errorf("failed to create snapshot: %v", err)
}
// Add labels for tracking
_, err = snapshot.Snapshot.SetLabels(ctx, map[string]string{
"type": "backup",
"created": time.Now().Format(time.RFC3339),
})
return err
}
// Recover from disaster
func recoverFromSnapshot(ctx context.Context, sub *pubsub.Subscription, snapshotID string) error {
snapshot := sub.c.Snapshot(snapshotID)
if err := sub.SeekToSnapshot(ctx, snapshot); err != nil {
return fmt.Errorf("failed to recover: %v", err)
}
return nil
}Replay messages from a specific time:
// Replay messages from the last hour
func replayLastHour(ctx context.Context, sub *pubsub.Subscription) error {
replayTime := time.Now().Add(-1 * time.Hour)
if err := sub.SeekToTime(ctx, replayTime); err != nil {
return fmt.Errorf("failed to replay: %v", err)
}
fmt.Printf("Replaying messages from %v\n", replayTime)
return nil
}
// Replay messages from specific event
func replayFromEvent(ctx context.Context, sub *pubsub.Subscription, eventTime time.Time) error {
if err := sub.SeekToTime(ctx, eventTime); err != nil {
return fmt.Errorf("failed to replay from event: %v", err)
}
fmt.Printf("Replaying messages from event at %v\n", eventTime)
return nil
}Use snapshots to create consistent test environments:
// Setup test snapshot
func setupTestData(ctx context.Context, sub *pubsub.Subscription) (*pubsub.SnapshotConfig, error) {
// Publish test messages
topic := client.Topic("test-topic")
for i := 0; i < 10; i++ {
msg := &pubsub.Message{
Data: []byte(fmt.Sprintf("test message %d", i)),
}
result := topic.Publish(ctx, msg)
if _, err := result.Get(ctx); err != nil {
return nil, err
}
}
// Create snapshot of test data
return sub.CreateSnapshot(ctx, "test-data-snapshot")
}
// Reset to test snapshot before each test
func resetToTestSnapshot(ctx context.Context, sub *pubsub.Subscription) error {
snapshot := client.Snapshot("test-data-snapshot")
return sub.SeekToSnapshot(ctx, snapshot)
}Snapshot before major changes:
// Create pre-migration snapshot
func createPreMigrationSnapshot(ctx context.Context, sub *pubsub.Subscription) (*pubsub.SnapshotConfig, error) {
snapshotName := fmt.Sprintf("pre-migration-%s", time.Now().Format("20060102-150405"))
snapshot, err := sub.CreateSnapshot(ctx, snapshotName)
if err != nil {
return nil, err
}
_, err = snapshot.Snapshot.SetLabels(ctx, map[string]string{
"purpose": "migration",
"timestamp": time.Now().Format(time.RFC3339),
})
return snapshot, err
}
// Rollback if migration fails
func rollbackMigration(ctx context.Context, sub *pubsub.Subscription, snapshotID string) error {
snapshot := client.Snapshot(snapshotID)
return sub.SeekToSnapshot(ctx, snapshot)
}When using snapshots and seeking, consider the topic's message retention settings:
// Create topic with extended retention for seeking
topic, err := client.CreateTopicWithConfig(ctx, "seekable-topic", &pubsub.TopicConfig{
RetentionDuration: optional.Duration(7 * 24 * time.Hour), // 7 days
})
// Create subscription with retention
sub, err := client.CreateSubscription(ctx, "seekable-sub", pubsub.SubscriptionConfig{
Topic: topic,
RetentionDuration: 7 * 24 * time.Hour, // 7 days
})Snapshots expire after 7 days by default. Plan accordingly:
// Check snapshot expiration
it := client.Snapshots(ctx)
for {
snapshot, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
log.Fatal(err)
}
// Delete snapshots expiring soon
if time.Until(snapshot.Expiration) < 24*time.Hour {
fmt.Printf("Snapshot %s expiring soon: %v\n", snapshot.Snapshot.ID(), snapshot.Expiration)
// Optionally delete or recreate
}
}