tessl install tessl/golang-cloud-google-com--go--pubsub@1.50.5Google Cloud Pub/Sub client library for Go providing high-level idiomatic APIs for publishing and receiving messages with automatic batching, flow control, and support for advanced features including message ordering, schema validation, exactly-once delivery, and ingestion from external data sources
Pub/Sub supports multiple delivery methods beyond pull subscriptions: push to HTTP endpoints, BigQuery tables, and Cloud Storage buckets. This document covers all delivery configuration options.
Subscriptions can use one of the following mutually exclusive delivery methods:
type PushConfig struct {
Endpoint string
Attributes map[string]string
AuthenticationMethod AuthenticationMethod
Wrapper Wrapper
}Configuration for push delivery to an HTTP/HTTPS endpoint.
Fields:
Endpoint: URL to receive messages (must be HTTPS for production)Attributes: Endpoint configuration attributesAuthenticationMethod: Authentication method for push endpointWrapper: Message wrapper format (PubsubWrapper or NoWrapper)Example:
sub, err := client.CreateSubscription(ctx, "push-sub", pubsub.SubscriptionConfig{
Topic: topic,
PushConfig: pubsub.PushConfig{
Endpoint: "https://example.com/push-handler",
Attributes: map[string]string{
"x-goog-version": "v1",
},
},
})type AuthenticationMethod interface {
isAuthMethod() bool
}Interface for push endpoint authentication methods.
type OIDCToken struct {
ServiceAccountEmail string
Audience string
}
func (oidcToken *OIDCToken) isAuthMethod() boolOpenID Connect token authentication for push endpoints.
Fields:
ServiceAccountEmail: Service account email for generating OIDC tokenAudience: Audience claim for JWT (defaults to push endpoint URL if not specified)Permissions Required: Caller needs iam.serviceAccounts.actAs permission on the service account.
Example:
sub, err := client.CreateSubscription(ctx, "authenticated-push-sub", pubsub.SubscriptionConfig{
Topic: topic,
PushConfig: pubsub.PushConfig{
Endpoint: "https://secure.example.com/webhook",
AuthenticationMethod: &pubsub.OIDCToken{
ServiceAccountEmail: "push-sa@my-project.iam.gserviceaccount.com",
Audience: "https://secure.example.com",
},
},
})type Wrapper interface {
isWrapper() bool
}Interface for message wrapper formats.
type PubsubWrapper struct{}
func (p *PubsubWrapper) isWrapper() boolSends payload in JSON representation of PubsubMessage format.
Format:
{
"message": {
"data": "base64-encoded-data",
"attributes": {
"key1": "value1",
"key2": "value2"
},
"messageId": "1234567890",
"publishTime": "2024-01-15T10:00:00Z",
"orderingKey": "key1"
},
"subscription": "projects/my-project/subscriptions/my-sub"
}Example:
sub, err := client.CreateSubscription(ctx, "wrapped-push-sub", pubsub.SubscriptionConfig{
Topic: topic,
PushConfig: pubsub.PushConfig{
Endpoint: "https://example.com/pubsub-handler",
Wrapper: &pubsub.PubsubWrapper{},
},
})type NoWrapper struct {
WriteMetadata bool
}
func (n *NoWrapper) isWrapper() boolSends raw message payload without wrapping.
Fields:
WriteMetadata: Include Pub/Sub metadata as HTTP headersExample:
sub, err := client.CreateSubscription(ctx, "unwrapped-push-sub", pubsub.SubscriptionConfig{
Topic: topic,
PushConfig: pubsub.PushConfig{
Endpoint: "https://example.com/raw-handler",
Wrapper: &pubsub.NoWrapper{
WriteMetadata: true, // Send metadata as headers
},
},
})// Update existing pull subscription to push
config, err := sub.Update(ctx, pubsub.SubscriptionConfigToUpdate{
PushConfig: &pubsub.PushConfig{
Endpoint: "https://new-endpoint.example.com/handler",
},
})
// Convert back to pull
config, err = sub.Update(ctx, pubsub.SubscriptionConfigToUpdate{
PushConfig: &pubsub.PushConfig{}, // Empty config reverts to pull
})type BigQueryConfig struct {
Table string
UseTopicSchema bool
UseTableSchema bool
WriteMetadata bool
DropUnknownFields bool
State BigQueryConfigState
}Configuration for BigQuery subscription.
Fields:
Table: BigQuery table name (format: projectId:datasetId.tableId)UseTopicSchema: Use topic's schema for table columns (mutually exclusive with UseTableSchema)UseTableSchema: Use table's schema for columns (mutually exclusive with UseTopicSchema)WriteMetadata: Write Pub/Sub metadata (subscription name, message_id, publish_time, attributes, ordering_key) to additional columnsDropUnknownFields: Drop fields not in table schema (requires UseTopicSchema=true)State: Current state (output only)Example:
sub, err := client.CreateSubscription(ctx, "bigquery-sub", pubsub.SubscriptionConfig{
Topic: topic,
BigQueryConfig: pubsub.BigQueryConfig{
Table: "my-project:my_dataset.my_table",
UseTopicSchema: true,
WriteMetadata: true,
},
})type BigQueryConfigState int
const (
BigQueryConfigStateUnspecified BigQueryConfigState = iota
BigQueryConfigActive
BigQueryConfigPermissionDenied
BigQueryConfigNotFound
BigQueryConfigSchemaMismatch
)Possible states for BigQuery subscription:
BigQueryConfigStateUnspecified: Default/unusedBigQueryConfigActive: Subscription can actively send messages to BigQueryBigQueryConfigPermissionDenied: Permission denied writing to BigQuery tableBigQueryConfigNotFound: BigQuery table does not existBigQueryConfigSchemaMismatch: Schema mismatch with BigQuery tableMonitoring State:
config, err := sub.Config(ctx)
if err != nil {
log.Fatal(err)
}
switch config.BigQueryConfig.State {
case pubsub.BigQueryConfigActive:
fmt.Println("BigQuery subscription is active")
case pubsub.BigQueryConfigPermissionDenied:
fmt.Println("Permission denied - check IAM permissions")
case pubsub.BigQueryConfigNotFound:
fmt.Println("Table not found - create table")
case pubsub.BigQueryConfigSchemaMismatch:
fmt.Println("Schema mismatch - update schema or set DropUnknownFields")
}With Topic Schema:
// Topic has Avro schema
topic, err := client.CreateTopicWithConfig(ctx, "bq-topic", &pubsub.TopicConfig{
SchemaSettings: &pubsub.SchemaSettings{
Schema: "projects/my-project/schemas/user-schema",
Encoding: pubsub.EncodingJSON,
},
})
// Subscription uses topic schema
sub, err := client.CreateSubscription(ctx, "bq-sub", pubsub.SubscriptionConfig{
Topic: topic,
BigQueryConfig: pubsub.BigQueryConfig{
Table: "my-project:dataset.users",
UseTopicSchema: true,
DropUnknownFields: true, // Drop extra fields
WriteMetadata: true, // Add metadata columns
},
})With Table Schema:
sub, err := client.CreateSubscription(ctx, "bq-table-schema-sub", pubsub.SubscriptionConfig{
Topic: topic,
BigQueryConfig: pubsub.BigQueryConfig{
Table: "my-project:dataset.events",
UseTableSchema: true, // Use existing table schema
},
})type CloudStorageConfig struct {
Bucket string
FilenamePrefix string
FilenameSuffix string
OutputFormat isCloudStorageOutputFormat
MaxDuration optional.Duration
MaxBytes int64
State CloudStorageConfigState
}Configuration for Cloud Storage subscription.
Fields:
Bucket: Cloud Storage bucket name (without gs:// prefix)FilenamePrefix: Prefix for generated filenamesFilenameSuffix: Suffix for generated filenamesOutputFormat: Output format (Text or Avro)MaxDuration: Maximum duration before creating new file (1 min to 10 min, default: 5 min)MaxBytes: Maximum bytes before creating new file (1 KB to 10 GiB)State: Current state (output only)Example:
sub, err := client.CreateSubscription(ctx, "gcs-sub", pubsub.SubscriptionConfig{
Topic: topic,
CloudStorageConfig: pubsub.CloudStorageConfig{
Bucket: "my-bucket",
FilenamePrefix: "pubsub/messages-",
FilenameSuffix: ".json",
OutputFormat: &pubsub.CloudStorageOutputFormatTextConfig{},
MaxDuration: optional.Duration(5 * time.Minute),
MaxBytes: 100 * 1024 * 1024, // 100 MB
},
})type isCloudStorageOutputFormat interface {
isCloudStorageOutputFormat()
}Interface for Cloud Storage output formats.
type CloudStorageOutputFormatTextConfig struct{}
func (*CloudStorageOutputFormatTextConfig) isCloudStorageOutputFormat()Write message data as raw text, separated by newlines.
Example:
sub, err := client.CreateSubscription(ctx, "gcs-text-sub", pubsub.SubscriptionConfig{
Topic: topic,
CloudStorageConfig: pubsub.CloudStorageConfig{
Bucket: "my-bucket",
OutputFormat: &pubsub.CloudStorageOutputFormatTextConfig{},
},
})type CloudStorageOutputFormatAvroConfig struct {
WriteMetadata bool
}
func (*CloudStorageOutputFormatAvroConfig) isCloudStorageOutputFormat()Write message data as Avro binary.
Fields:
WriteMetadata: Write Pub/Sub metadata (subscription name, message_id, publish_time, attributes, ordering_key) to Avro outputExample:
sub, err := client.CreateSubscription(ctx, "gcs-avro-sub", pubsub.SubscriptionConfig{
Topic: topic,
CloudStorageConfig: pubsub.CloudStorageConfig{
Bucket: "my-bucket",
OutputFormat: &pubsub.CloudStorageOutputFormatAvroConfig{
WriteMetadata: true,
},
},
})type CloudStorageConfigState int
const (
CloudStorageConfigStateUnspecified CloudStorageConfigState = iota
CloudStorageConfigActive
CloudStorageConfigPermissionDenied
CloudStorageConfigNotFound
)Possible states for Cloud Storage subscription:
CloudStorageConfigStateUnspecified: Default/unusedCloudStorageConfigActive: Subscription can actively write to Cloud StorageCloudStorageConfigPermissionDenied: Permission denied writing to bucketCloudStorageConfigNotFound: Cloud Storage bucket does not existMonitoring State:
config, err := sub.Config(ctx)
if err != nil {
log.Fatal(err)
}
switch config.CloudStorageConfig.State {
case pubsub.CloudStorageConfigActive:
fmt.Println("Cloud Storage subscription is active")
case pubsub.CloudStorageConfigPermissionDenied:
fmt.Println("Permission denied - check bucket IAM")
case pubsub.CloudStorageConfigNotFound:
fmt.Println("Bucket not found - create bucket")
}Cloud Storage files are created with the pattern:
{FilenamePrefix}{YYYY}-{MM}-{DD}T{HH}:{MM}:{SS}.{NNNNNNNNN}_{uuid}{FilenameSuffix}Example filenames:
messages-2024-01-15T10:30:45.123456789_a1b2c3d4.json
events-2024-01-15T10:30:45.987654321_e5f6g7h8.avrosub := client.Subscription("my-sub")
// Convert pull to push
config, err := sub.Update(ctx, pubsub.SubscriptionConfigToUpdate{
PushConfig: &pubsub.PushConfig{
Endpoint: "https://example.com/handler",
},
})config, err := sub.Update(ctx, pubsub.SubscriptionConfigToUpdate{
BigQueryConfig: &pubsub.BigQueryConfig{
Table: "my-project:dataset.table",
UseTopicSchema: true,
},
})config, err := sub.Update(ctx, pubsub.SubscriptionConfigToUpdate{
CloudStorageConfig: &pubsub.CloudStorageConfig{
Bucket: "my-bucket",
OutputFormat: &pubsub.CloudStorageOutputFormatTextConfig{},
},
})// Use empty config to revert to pull
config, err := sub.Update(ctx, pubsub.SubscriptionConfigToUpdate{
PushConfig: &pubsub.PushConfig{},
})No additional permissions required - Pub/Sub uses its own service account.
Grant the Pub/Sub service account permissions on the BigQuery dataset:
roles/bigquery.dataEditorGrant the Pub/Sub service account permissions on the bucket:
roles/storage.objectCreatorFinding Service Account:
service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.comPush Endpoints:
BigQuery:
DropUnknownFields for schema evolutionState field for errorsWriteMetadata for debuggingCloud Storage:
MaxDuration and MaxBytes for file sizeState field for errorsGeneral: