Describes: golangpkg:golang/cloud.google.com/go/pubsub@v1.50.1

tessl/golang-cloud-google-com--go--pubsub

tessl install tessl/golang-cloud-google-com--go--pubsub@1.50.5

Google Cloud Pub/Sub client library for Go providing high-level idiomatic APIs for publishing and receiving messages with automatic batching, flow control, and support for advanced features including message ordering, schema validation, exactly-once delivery, and ingestion from external data sources


Delivery Configurations

Pub/Sub supports multiple delivery methods beyond pull subscriptions: push to HTTP endpoints, BigQuery tables, and Cloud Storage buckets. This document covers all delivery configuration options.

Delivery Types

Subscriptions can use one of the following mutually exclusive delivery methods:

  • Pull: Application pulls messages using the Receive API (default)
  • Push: Pub/Sub pushes messages to HTTP/HTTPS endpoints
  • BigQuery: Pub/Sub writes messages to BigQuery tables
  • Cloud Storage: Pub/Sub writes messages to Cloud Storage buckets
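Because the four methods are mutually exclusive, it can help to classify a subscription by whichever target is set. The following is a minimal sketch of that check, not library code: `subscriptionSummary` and `deliveryMethod` are hypothetical names, and the struct only mirrors the three target fields (`PushConfig.Endpoint`, `BigQueryConfig.Table`, `CloudStorageConfig.Bucket`) so the example runs with the standard library alone.

```go
package main

import "fmt"

// subscriptionSummary is a hypothetical stand-in for the delivery-related
// fields of pubsub.SubscriptionConfig. It is not a library type.
type subscriptionSummary struct {
	PushEndpoint  string // mirrors PushConfig.Endpoint
	BigQueryTable string // mirrors BigQueryConfig.Table
	StorageBucket string // mirrors CloudStorageConfig.Bucket
}

// deliveryMethod reports which delivery method is configured, or an error
// if more than one target is set at the same time.
func deliveryMethod(s subscriptionSummary) (string, error) {
	var methods []string
	if s.PushEndpoint != "" {
		methods = append(methods, "push")
	}
	if s.BigQueryTable != "" {
		methods = append(methods, "bigquery")
	}
	if s.StorageBucket != "" {
		methods = append(methods, "cloud-storage")
	}
	switch len(methods) {
	case 0:
		return "pull", nil // no target configured means pull, the default
	case 1:
		return methods[0], nil
	default:
		return "", fmt.Errorf("delivery methods are mutually exclusive, got %v", methods)
	}
}

func main() {
	method, err := deliveryMethod(subscriptionSummary{BigQueryTable: "my-project:my_dataset.my_table"})
	fmt.Println(method, err)
}
```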

Push Configuration

PushConfig

type PushConfig struct {
    Endpoint             string
    Attributes           map[string]string
    AuthenticationMethod AuthenticationMethod
    Wrapper              Wrapper
}

Configuration for push delivery to an HTTP/HTTPS endpoint.

Fields:

  • Endpoint: URL to receive messages (must be HTTPS for production)
  • Attributes: Endpoint configuration attributes, such as x-goog-version, which selects the format of the push message
  • AuthenticationMethod: Authentication method for push endpoint
  • Wrapper: Message wrapper format (PubsubWrapper or NoWrapper)

Example:

sub, err := client.CreateSubscription(ctx, "push-sub", pubsub.SubscriptionConfig{
    Topic: topic,
    PushConfig: pubsub.PushConfig{
        Endpoint: "https://example.com/push-handler",
        Attributes: map[string]string{
            "x-goog-version": "v1",
        },
    },
})

Push Authentication

AuthenticationMethod

type AuthenticationMethod interface {
    isAuthMethod() bool
}

Interface for push endpoint authentication methods.

OIDCToken

type OIDCToken struct {
    ServiceAccountEmail string
    Audience            string
}

func (oidcToken *OIDCToken) isAuthMethod() bool

OpenID Connect token authentication for push endpoints.

Fields:

  • ServiceAccountEmail: Service account email for generating OIDC token
  • Audience: Audience claim for JWT (defaults to push endpoint URL if not specified)

Permissions Required: The caller needs the iam.serviceAccounts.actAs permission on the service account.

Example:

sub, err := client.CreateSubscription(ctx, "authenticated-push-sub", pubsub.SubscriptionConfig{
    Topic: topic,
    PushConfig: pubsub.PushConfig{
        Endpoint: "https://secure.example.com/webhook",
        AuthenticationMethod: &pubsub.OIDCToken{
            ServiceAccountEmail: "push-sa@my-project.iam.gserviceaccount.com",
            Audience:            "https://secure.example.com",
        },
    },
})
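On the receiving side, an authenticated push request carries the token in the `Authorization: Bearer <JWT>` header. The sketch below shows one way to extract that token and peek at its `aud` and `email` claims for debugging. The names `bearerToken`, `unverifiedClaims`, and `pushClaims` are hypothetical, and the decoding deliberately does not verify the signature. A production endpoint should validate the token with a proper verifier (for example `idtoken.Validate` from `google.golang.org/api/idtoken`) before trusting any claim.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

// pushClaims holds the two claims this sketch inspects.
type pushClaims struct {
	Audience string `json:"aud"`
	Email    string `json:"email"`
}

// bearerToken extracts the JWT from an "Authorization" header value.
func bearerToken(authHeader string) (string, error) {
	const prefix = "Bearer "
	if !strings.HasPrefix(authHeader, prefix) || len(authHeader) == len(prefix) {
		return "", errors.New("authorization header has no bearer token")
	}
	return authHeader[len(prefix):], nil
}

// unverifiedClaims decodes the JWT payload WITHOUT verifying the signature.
// Use it for logging and debugging only, never for authorization decisions.
func unverifiedClaims(token string) (pushClaims, error) {
	var c pushClaims
	parts := strings.Split(token, ".")
	if len(parts) != 3 {
		return c, errors.New("token is not a three-part JWT")
	}
	payload, err := base64.RawURLEncoding.DecodeString(parts[1])
	if err != nil {
		return c, fmt.Errorf("decode payload: %w", err)
	}
	if err := json.Unmarshal(payload, &c); err != nil {
		return c, fmt.Errorf("parse claims: %w", err)
	}
	return c, nil
}

func main() {
	// Build a fake, unsigned token purely for demonstration.
	payload := base64.RawURLEncoding.EncodeToString([]byte(
		`{"aud":"https://secure.example.com","email":"push-sa@my-project.iam.gserviceaccount.com"}`))
	token, err := bearerToken("Bearer header." + payload + ".signature")
	if err != nil {
		panic(err)
	}
	claims, err := unverifiedClaims(token)
	fmt.Println(claims.Audience, claims.Email, err)
}
```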

Message Wrapper Formats

Wrapper Interface

type Wrapper interface {
    isWrapper() bool
}

Interface for message wrapper formats.

PubsubWrapper

type PubsubWrapper struct{}

func (p *PubsubWrapper) isWrapper() bool

Sends the payload as the JSON representation of a PubsubMessage.

Format:

{
  "message": {
    "data": "base64-encoded-data",
    "attributes": {
      "key1": "value1",
      "key2": "value2"
    },
    "messageId": "1234567890",
    "publishTime": "2024-01-15T10:00:00Z",
    "orderingKey": "key1"
  },
  "subscription": "projects/my-project/subscriptions/my-sub"
}

Example:

sub, err := client.CreateSubscription(ctx, "wrapped-push-sub", pubsub.SubscriptionConfig{
    Topic: topic,
    PushConfig: pubsub.PushConfig{
        Endpoint: "https://example.com/pubsub-handler",
        Wrapper:  &pubsub.PubsubWrapper{},
    },
})
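A push endpoint that receives the wrapped format has to decode this envelope before it can use the message. The following sketch shows one way to do that with the standard library. The type and function names (`pushEnvelope`, `pushMessage`, `decodeEnvelope`, `handlePush`) are hypothetical, and the struct tags follow the JSON format shown above.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log"
	"net/http"
	"time"
)

// pushMessage mirrors the "message" object of the wrapped push format.
type pushMessage struct {
	Data        []byte            `json:"data"` // encoding/json base64-decodes into []byte
	Attributes  map[string]string `json:"attributes"`
	MessageID   string            `json:"messageId"`
	PublishTime time.Time         `json:"publishTime"`
	OrderingKey string            `json:"orderingKey"`
}

// pushEnvelope mirrors the top-level push request body.
type pushEnvelope struct {
	Message      pushMessage `json:"message"`
	Subscription string      `json:"subscription"`
}

// decodeEnvelope parses a wrapped push request body.
func decodeEnvelope(body []byte) (pushEnvelope, error) {
	var env pushEnvelope
	if err := json.Unmarshal(body, &env); err != nil {
		return env, fmt.Errorf("decode push envelope: %w", err)
	}
	if env.Message.MessageID == "" {
		return env, errors.New("push envelope has no messageId")
	}
	return env, nil
}

// handlePush can be registered with http.HandleFunc. A 2xx response
// acknowledges the message; any other status leads to redelivery.
func handlePush(w http.ResponseWriter, r *http.Request) {
	body, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(w, "cannot read body", http.StatusBadRequest)
		return
	}
	env, err := decodeEnvelope(body)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	log.Printf("message %s from %s: %s", env.Message.MessageID, env.Subscription, env.Message.Data)
	w.WriteHeader(http.StatusNoContent)
}

func main() {
	body := []byte(`{
	  "message": {"data": "aGVsbG8=", "attributes": {"key1": "value1"},
	              "messageId": "1234567890", "publishTime": "2024-01-15T10:00:00Z"},
	  "subscription": "projects/my-project/subscriptions/my-sub"
	}`)
	env, err := decodeEnvelope(body)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s %s\n", env.Message.MessageID, env.Message.Data)
}
```

Declaring `Data` as `[]byte` lets `encoding/json` handle the base64 decoding, so the handler sees the original payload bytes.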

NoWrapper

type NoWrapper struct {
    WriteMetadata bool
}

func (n *NoWrapper) isWrapper() bool

Sends raw message payload without wrapping.

Fields:

  • WriteMetadata: Include Pub/Sub metadata as HTTP headers

Example:

sub, err := client.CreateSubscription(ctx, "unwrapped-push-sub", pubsub.SubscriptionConfig{
    Topic: topic,
    PushConfig: pubsub.PushConfig{
        Endpoint: "https://example.com/raw-handler",
        Wrapper: &pubsub.NoWrapper{
            WriteMetadata: true, // Send metadata as headers
        },
    },
})
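With NoWrapper, the request body is the raw message data, and any metadata arrives as HTTP headers when WriteMetadata is true. The sketch below collects those headers into a map. It assumes the metadata headers share an `x-goog-pubsub-` prefix (for example `x-goog-pubsub-message-id`); that prefix is an assumption based on the Pub/Sub payload-unwrapping documentation and should be checked against the current reference. `pubsubMetadata` is a hypothetical helper that takes a `map[string][]string`, the underlying type of `http.Header`.

```go
package main

import (
	"fmt"
	"strings"
)

// metadataPrefix is the assumed common prefix of Pub/Sub metadata headers.
const metadataPrefix = "x-goog-pubsub-"

// pubsubMetadata returns the first value of every header whose name starts
// with metadataPrefix (case-insensitive), keyed by the name without the prefix.
func pubsubMetadata(header map[string][]string) map[string]string {
	meta := make(map[string]string)
	for name, values := range header {
		lower := strings.ToLower(name)
		if len(values) > 0 && strings.HasPrefix(lower, metadataPrefix) {
			meta[strings.TrimPrefix(lower, metadataPrefix)] = values[0]
		}
	}
	return meta
}

func main() {
	// In a real handler this would be r.Header from the *http.Request.
	header := map[string][]string{
		"Content-Type":                    {"application/octet-stream"},
		"X-Goog-Pubsub-Message-Id":        {"1234567890"},
		"X-Goog-Pubsub-Subscription-Name": {"projects/my-project/subscriptions/my-sub"},
	}
	meta := pubsubMetadata(header)
	fmt.Println(meta["message-id"], meta["subscription-name"])
}
```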

Updating Push Configuration

// Update existing pull subscription to push
config, err := sub.Update(ctx, pubsub.SubscriptionConfigToUpdate{
    PushConfig: &pubsub.PushConfig{
        Endpoint: "https://new-endpoint.example.com/handler",
    },
})

// Convert back to pull
config, err = sub.Update(ctx, pubsub.SubscriptionConfigToUpdate{
    PushConfig: &pubsub.PushConfig{}, // Empty config reverts to pull
})

BigQuery Configuration

BigQueryConfig

type BigQueryConfig struct {
    Table             string
    UseTopicSchema    bool
    UseTableSchema    bool
    WriteMetadata     bool
    DropUnknownFields bool
    State             BigQueryConfigState
}

Configuration for BigQuery subscription.

Fields:

  • Table: BigQuery table name (format: projectId:datasetId.tableId)
  • UseTopicSchema: Use topic's schema for table columns (mutually exclusive with UseTableSchema)
  • UseTableSchema: Use table's schema for columns (mutually exclusive with UseTopicSchema)
  • WriteMetadata: Write Pub/Sub metadata (subscription name, message_id, publish_time, attributes, ordering_key) to additional columns
  • DropUnknownFields: Drop fields not in table schema (requires UseTopicSchema=true)
  • State: Current state (output only)

Example:

sub, err := client.CreateSubscription(ctx, "bigquery-sub", pubsub.SubscriptionConfig{
    Topic: topic,
    BigQueryConfig: pubsub.BigQueryConfig{
        Table:          "my-project:my_dataset.my_table",
        UseTopicSchema: true,
        WriteMetadata:  true,
    },
})
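The flag constraints in the field list can be checked locally before calling CreateSubscription, which gives a clearer error than a rejected API call. This is a minimal sketch under the rules as stated above; `bigQueryOptions` and `validateBigQueryOptions` are hypothetical names, and the struct only mirrors the relevant BigQueryConfig fields so that the example needs no external module.

```go
package main

import (
	"errors"
	"fmt"
)

// bigQueryOptions is a hypothetical mirror of the BigQueryConfig fields
// that constrain each other. It is not a library type.
type bigQueryOptions struct {
	Table             string
	UseTopicSchema    bool
	UseTableSchema    bool
	DropUnknownFields bool
}

// validateBigQueryOptions applies the constraints from the field list:
// a table is required, the two schema flags are mutually exclusive, and
// DropUnknownFields needs UseTopicSchema.
func validateBigQueryOptions(o bigQueryOptions) error {
	if o.Table == "" {
		return errors.New("Table must be set")
	}
	if o.UseTopicSchema && o.UseTableSchema {
		return errors.New("UseTopicSchema and UseTableSchema are mutually exclusive")
	}
	if o.DropUnknownFields && !o.UseTopicSchema {
		return errors.New("DropUnknownFields requires UseTopicSchema")
	}
	return nil
}

func main() {
	err := validateBigQueryOptions(bigQueryOptions{
		Table:          "my-project:my_dataset.my_table",
		UseTopicSchema: true,
		UseTableSchema: true,
	})
	fmt.Println(err)
}
```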

BigQuery States

type BigQueryConfigState int

const (
    BigQueryConfigStateUnspecified BigQueryConfigState = iota
    BigQueryConfigActive
    BigQueryConfigPermissionDenied
    BigQueryConfigNotFound
    BigQueryConfigSchemaMismatch
)

Possible states for BigQuery subscription:

  • BigQueryConfigStateUnspecified: Default/unused
  • BigQueryConfigActive: Subscription can actively send messages to BigQuery
  • BigQueryConfigPermissionDenied: Permission denied writing to BigQuery table
  • BigQueryConfigNotFound: BigQuery table does not exist
  • BigQueryConfigSchemaMismatch: Schema mismatch with BigQuery table

Monitoring State:

config, err := sub.Config(ctx)
if err != nil {
    log.Fatal(err)
}

switch config.BigQueryConfig.State {
case pubsub.BigQueryConfigActive:
    fmt.Println("BigQuery subscription is active")
case pubsub.BigQueryConfigPermissionDenied:
    fmt.Println("Permission denied - check IAM permissions")
case pubsub.BigQueryConfigNotFound:
    fmt.Println("Table not found - create table")
case pubsub.BigQueryConfigSchemaMismatch:
    fmt.Println("Schema mismatch - update schema or set DropUnknownFields")
}

BigQuery Schema Mapping

With Topic Schema:

// Topic has Avro schema
topic, err := client.CreateTopicWithConfig(ctx, "bq-topic", &pubsub.TopicConfig{
    SchemaSettings: &pubsub.SchemaSettings{
        Schema:   "projects/my-project/schemas/user-schema",
        Encoding: pubsub.EncodingJSON,
    },
})

// Subscription uses topic schema
sub, err := client.CreateSubscription(ctx, "bq-sub", pubsub.SubscriptionConfig{
    Topic: topic,
    BigQueryConfig: pubsub.BigQueryConfig{
        Table:             "my-project:dataset.users",
        UseTopicSchema:    true,
        DropUnknownFields: true, // Drop extra fields
        WriteMetadata:     true, // Add metadata columns
    },
})

With Table Schema:

sub, err := client.CreateSubscription(ctx, "bq-table-schema-sub", pubsub.SubscriptionConfig{
    Topic: topic,
    BigQueryConfig: pubsub.BigQueryConfig{
        Table:          "my-project:dataset.events",
        UseTableSchema: true, // Use existing table schema
    },
})

Cloud Storage Configuration

CloudStorageConfig

type CloudStorageConfig struct {
    Bucket         string
    FilenamePrefix string
    FilenameSuffix string
    OutputFormat   isCloudStorageOutputFormat
    MaxDuration    optional.Duration
    MaxBytes       int64
    State          CloudStorageConfigState
}

Configuration for Cloud Storage subscription.

Fields:

  • Bucket: Cloud Storage bucket name (without gs:// prefix)
  • FilenamePrefix: Prefix for generated filenames
  • FilenameSuffix: Suffix for generated filenames
  • OutputFormat: Output format (Text or Avro)
  • MaxDuration: Maximum duration before creating new file (1 min to 10 min, default: 5 min)
  • MaxBytes: Maximum bytes before creating new file (1 KB to 10 GiB)
  • State: Current state (output only)

Example:

sub, err := client.CreateSubscription(ctx, "gcs-sub", pubsub.SubscriptionConfig{
    Topic: topic,
    CloudStorageConfig: pubsub.CloudStorageConfig{
        Bucket:         "my-bucket",
        FilenamePrefix: "pubsub/messages-",
        FilenameSuffix: ".json",
        OutputFormat:   &pubsub.CloudStorageOutputFormatTextConfig{},
        MaxDuration:    5 * time.Minute, // pass a time.Duration directly; the optional package is internal and cannot be imported
        MaxBytes:       100 * 1024 * 1024, // 100 MB
    },
})
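MaxDuration and MaxBytes both have documented bounds, so a small pre-flight check can catch out-of-range values before the subscription is created. The sketch below encodes the limits from the field list. `validateRollover` and the constant names are hypothetical, treating 1 KB as 1000 bytes is an assumption, and a zero value is treated as "unset" so that the service default applies.

```go
package main

import (
	"fmt"
	"time"
)

// Bounds taken from the field list above. Interpreting 1 KB as 1000 bytes
// is an assumption of this sketch.
const (
	minMaxDuration       = 1 * time.Minute
	maxMaxDuration       = 10 * time.Minute
	minMaxBytes    int64 = 1000
	maxMaxBytes    int64 = 10 << 30 // 10 GiB
)

// validateRollover checks the file rollover settings. Zero values are
// treated as unset and always accepted.
func validateRollover(maxDuration time.Duration, maxBytes int64) error {
	if maxDuration != 0 && (maxDuration < minMaxDuration || maxDuration > maxMaxDuration) {
		return fmt.Errorf("MaxDuration %v is outside [%v, %v]", maxDuration, minMaxDuration, maxMaxDuration)
	}
	if maxBytes != 0 && (maxBytes < minMaxBytes || maxBytes > maxMaxBytes) {
		return fmt.Errorf("MaxBytes %d is outside [%d, %d]", maxBytes, minMaxBytes, maxMaxBytes)
	}
	return nil
}

func main() {
	fmt.Println(validateRollover(5*time.Minute, 100*1024*1024)) // within bounds
	fmt.Println(validateRollover(30*time.Second, 0))            // duration too short
}
```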

Cloud Storage Output Formats

isCloudStorageOutputFormat

type isCloudStorageOutputFormat interface {
    isCloudStorageOutputFormat()
}

Interface for Cloud Storage output formats.

CloudStorageOutputFormatTextConfig

type CloudStorageOutputFormatTextConfig struct{}

func (*CloudStorageOutputFormatTextConfig) isCloudStorageOutputFormat()

Write message data as raw text, separated by newlines.

Example:

sub, err := client.CreateSubscription(ctx, "gcs-text-sub", pubsub.SubscriptionConfig{
    Topic: topic,
    CloudStorageConfig: pubsub.CloudStorageConfig{
        Bucket:       "my-bucket",
        OutputFormat: &pubsub.CloudStorageOutputFormatTextConfig{},
    },
})

CloudStorageOutputFormatAvroConfig

type CloudStorageOutputFormatAvroConfig struct {
    WriteMetadata bool
}

func (*CloudStorageOutputFormatAvroConfig) isCloudStorageOutputFormat()

Write message data as Avro binary.

Fields:

  • WriteMetadata: Write Pub/Sub metadata (subscription name, message_id, publish_time, attributes, ordering_key) to Avro output

Example:

sub, err := client.CreateSubscription(ctx, "gcs-avro-sub", pubsub.SubscriptionConfig{
    Topic: topic,
    CloudStorageConfig: pubsub.CloudStorageConfig{
        Bucket: "my-bucket",
        OutputFormat: &pubsub.CloudStorageOutputFormatAvroConfig{
            WriteMetadata: true,
        },
    },
})

Cloud Storage States

type CloudStorageConfigState int

const (
    CloudStorageConfigStateUnspecified CloudStorageConfigState = iota
    CloudStorageConfigActive
    CloudStorageConfigPermissionDenied
    CloudStorageConfigNotFound
)

Possible states for Cloud Storage subscription:

  • CloudStorageConfigStateUnspecified: Default/unused
  • CloudStorageConfigActive: Subscription can actively write to Cloud Storage
  • CloudStorageConfigPermissionDenied: Permission denied writing to bucket
  • CloudStorageConfigNotFound: Cloud Storage bucket does not exist

Monitoring State:

config, err := sub.Config(ctx)
if err != nil {
    log.Fatal(err)
}

switch config.CloudStorageConfig.State {
case pubsub.CloudStorageConfigActive:
    fmt.Println("Cloud Storage subscription is active")
case pubsub.CloudStorageConfigPermissionDenied:
    fmt.Println("Permission denied - check bucket IAM")
case pubsub.CloudStorageConfigNotFound:
    fmt.Println("Bucket not found - create bucket")
}

File Naming Pattern

Cloud Storage files are created with the pattern:

{FilenamePrefix}{YYYY}-{MM}-{DD}T{hh}:{mm}:{ss}.{NNNNNNNNN}_{uuid}{FilenameSuffix}

Example filenames:

messages-2024-01-15T10:30:45.123456789_a1b2c3d4.json
events-2024-01-15T10:30:45.987654321_e5f6g7h8.avro
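When downstream jobs need to order or filter the generated objects, the timestamp and unique ID can be recovered from the object name. The sketch below splits a name according to the pattern shown above, given the configured prefix and suffix. `parseObjectName` is a hypothetical helper, and the nine-digit fractional-second layout is taken from the example filenames, so adjust it if your objects use a different datetime format.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"time"
)

// timestampLayout matches the timestamp in the example filenames above.
const timestampLayout = "2006-01-02T15:04:05.000000000"

// parseObjectName splits "<prefix><timestamp>_<id><suffix>" into its
// timestamp and ID parts.
func parseObjectName(name, prefix, suffix string) (time.Time, string, error) {
	if len(name) < len(prefix)+len(suffix) ||
		!strings.HasPrefix(name, prefix) || !strings.HasSuffix(name, suffix) {
		return time.Time{}, "", errors.New("name does not match prefix and suffix")
	}
	core := name[len(prefix) : len(name)-len(suffix)]
	sep := strings.LastIndex(core, "_")
	if sep < 0 {
		return time.Time{}, "", errors.New("name has no '_' separator")
	}
	ts, err := time.Parse(timestampLayout, core[:sep])
	if err != nil {
		return time.Time{}, "", fmt.Errorf("parse timestamp: %w", err)
	}
	return ts, core[sep+1:], nil
}

func main() {
	ts, id, err := parseObjectName(
		"messages-2024-01-15T10:30:45.123456789_a1b2c3d4.json", "messages-", ".json")
	if err != nil {
		panic(err)
	}
	fmt.Println(ts.Format(time.RFC3339Nano), id)
}
```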

Switching Delivery Methods

Pull to Push

sub := client.Subscription("my-sub")

// Convert pull to push
config, err := sub.Update(ctx, pubsub.SubscriptionConfigToUpdate{
    PushConfig: &pubsub.PushConfig{
        Endpoint: "https://example.com/handler",
    },
})

Pull to BigQuery

config, err := sub.Update(ctx, pubsub.SubscriptionConfigToUpdate{
    BigQueryConfig: &pubsub.BigQueryConfig{
        Table:          "my-project:dataset.table",
        UseTopicSchema: true,
    },
})

Pull to Cloud Storage

config, err := sub.Update(ctx, pubsub.SubscriptionConfigToUpdate{
    CloudStorageConfig: &pubsub.CloudStorageConfig{
        Bucket:       "my-bucket",
        OutputFormat: &pubsub.CloudStorageOutputFormatTextConfig{},
    },
})

Back to Pull

// An empty PushConfig (no Endpoint) reverts a push subscription to pull
config, err := sub.Update(ctx, pubsub.SubscriptionConfigToUpdate{
    PushConfig: &pubsub.PushConfig{},
})

IAM Permissions

Push Subscriptions

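No additional permissions are required for unauthenticated push; Pub/Sub uses its own service account. With OIDCToken authentication, the caller also needs iam.serviceAccounts.actAs on the configured service account.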

BigQuery Subscriptions

Grant the Pub/Sub service account permissions on the BigQuery dataset:

roles/bigquery.dataEditor

Cloud Storage Subscriptions

Grant the Pub/Sub service account permissions on the bucket:

roles/storage.objectCreator
roles/storage.legacyBucketReader

Finding Service Account:

service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com
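Scripts that grant these roles usually need the account in IAM member form. A small sketch of building it from the project number follows; `pubsubServiceAgent` and `iamMember` are hypothetical helper names, and the project number used is a placeholder.

```go
package main

import "fmt"

// pubsubServiceAgent returns the Pub/Sub service account email for a
// project number, following the pattern shown above.
func pubsubServiceAgent(projectNumber int64) string {
	return fmt.Sprintf("service-%d@gcp-sa-pubsub.iam.gserviceaccount.com", projectNumber)
}

// iamMember returns the account in the "serviceAccount:<email>" form used
// in IAM policy bindings.
func iamMember(projectNumber int64) string {
	return "serviceAccount:" + pubsubServiceAgent(projectNumber)
}

func main() {
	fmt.Println(iamMember(123456789012)) // placeholder project number
}
```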

Best Practices

  1. Push Endpoints:

    • Use HTTPS for production
    • Implement proper authentication (OIDC tokens)
    • Handle retries and errors gracefully
    • Verify the OIDC token (JWT) on incoming requests when authentication is enabled
    • Return 2xx status codes for successful processing
  2. BigQuery:

    • Create tables with appropriate schemas before subscription
    • Use DropUnknownFields for schema evolution
    • Monitor State field for errors
    • Set appropriate partition and clustering
    • Use WriteMetadata for debugging
  3. Cloud Storage:

    • Choose appropriate MaxDuration and MaxBytes for file size
    • Use organized prefix patterns for easier querying
    • Enable versioning on buckets for safety
    • Monitor State field for errors
    • Use Avro for structured data, text for logs
  4. General:

    • Only one delivery method per subscription
    • Monitor subscription states regularly
    • Test delivery configurations before production
    • Configure appropriate retry policies
    • Set up alerting on delivery failures
    • Use labels to track subscription purposes
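Handling retries gracefully on a push endpoint usually means making the handler idempotent, because a message can be delivered more than once. The sketch below tracks message IDs in memory so that a repeated delivery can be acknowledged without being processed again. `seenSet` is a hypothetical type; it is unbounded and local to one process, so a real service would more likely use a shared store with expiry.

```go
package main

import (
	"fmt"
	"sync"
)

// seenSet remembers message IDs that have already been processed.
// It is safe for concurrent use by multiple request handlers.
type seenSet struct {
	mu  sync.Mutex
	ids map[string]struct{}
}

func newSeenSet() *seenSet {
	return &seenSet{ids: make(map[string]struct{})}
}

// firstTime records id and reports whether it had not been seen before.
func (s *seenSet) firstTime(id string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.ids[id]; ok {
		return false
	}
	s.ids[id] = struct{}{}
	return true
}

func main() {
	seen := newSeenSet()
	for _, id := range []string{"1234567890", "1234567891", "1234567890"} {
		if seen.firstTime(id) {
			fmt.Println("processing", id)
		} else {
			fmt.Println("skipping duplicate", id) // still respond 2xx to acknowledge
		}
	}
}
```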