tessl install tessl/golang-cloud-google-com--go--pubsub@1.50.5
Google Cloud Pub/Sub client library for Go providing high-level idiomatic APIs for publishing and receiving messages, with automatic batching, flow control, and support for advanced features including message ordering, schema validation, exactly-once delivery, and ingestion from external data sources.
Pub/Sub supports ingestion from external data sources into topics. This document covers all supported ingestion sources and their configuration.
type IngestionDataSourceSettings struct {
Source IngestionDataSource
PlatformLogsSettings *PlatformLogsSettings
}
Settings for data ingestion from external sources.
Fields:
- Source: The ingestion data source (AWS Kinesis, Cloud Storage, Azure Event Hubs, Amazon MSK, or Confluent Cloud)
- PlatformLogsSettings: Platform logging configuration
type IngestionDataSource interface {
isIngestionDataSource() bool
}
Interface for ingestion data sources.
// PlatformLogsSettings configures the platform logs produced by Pub/Sub
// ingestion. According to the accompanying documentation it is currently
// only valid for Cloud Storage ingestion.
type PlatformLogsSettings struct {
// Severity selects which platform logs are written; the documented levels
// are cumulative (for example, Info means "Info and higher"). See the
// PlatformLogsSeverity constants for the available values.
Severity PlatformLogsSeverity
}
type PlatformLogsSeverity int32
const (
PlatformLogsSeverityUnspecified PlatformLogsSeverity = iota
PlatformLogsSeverityDisabled
PlatformLogsSeverityDebug
PlatformLogsSeverityInfo
PlatformLogsSeverityWarning
PlatformLogsSeverityError
)
Configuration for platform logs produced by Pub/Sub ingestion (currently only valid for Cloud Storage ingestion).
Severity Levels:
- PlatformLogsSeverityUnspecified: Default value; logs disabled
- PlatformLogsSeverityDisabled: Logs disabled
- PlatformLogsSeverityDebug: Debug and higher
- PlatformLogsSeverityInfo: Info and higher
- PlatformLogsSeverityWarning: Warning and higher
- PlatformLogsSeverityError: Error only
type IngestionDataSourceAWSKinesis struct {
State AWSKinesisState
StreamARN string
ConsumerARN string
AWSRoleARN string
GCPServiceAccount string
}
func (i *IngestionDataSourceAWSKinesis) isIngestionDataSource() bool
Ingestion settings for Amazon Kinesis Data Streams.
Fields:
- State: Current state (output only)
- StreamARN: Kinesis stream ARN
- ConsumerARN: Kinesis consumer ARN for Enhanced Fan-Out mode
- AWSRoleARN: AWS role ARN for Federated Identity authentication
- GCPServiceAccount: GCP service account for Federated Identity authentication
type AWSKinesisState int
const (
AWSKinesisStateUnspecified = iota
AWSKinesisStateActive
AWSKinesisStatePermissionDenied
AWSKinesisStatePublishPermissionDenied
AWSKinesisStateStreamNotFound
AWSKinesisStateConsumerNotFound
)
Possible states for AWS Kinesis ingestion:
- AWSKinesisStateUnspecified: Default/unused
- AWSKinesisStateActive: Ingestion is active
- AWSKinesisStatePermissionDenied: Error consuming from Kinesis (check AWS role/permissions)
- AWSKinesisStatePublishPermissionDenied: Permission denied publishing to topic
- AWSKinesisStateStreamNotFound: Kinesis stream does not exist
- AWSKinesisStateConsumerNotFound: Kinesis consumer does not exist
Example:
topic, err := client.CreateTopicWithConfig(ctx, "kinesis-topic", &pubsub.TopicConfig{
IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
Source: &pubsub.IngestionDataSourceAWSKinesis{
StreamARN: "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",
ConsumerARN: "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream/consumer/my-consumer:1234567890",
AWSRoleARN: "arn:aws:iam::123456789012:role/PubSubKinesisRole",
GCPServiceAccount: "pubsub-kinesis@my-project.iam.gserviceaccount.com",
},
},
})
if err != nil {
log.Fatalf("Failed to create topic: %v", err)
}
// Check ingestion state
config, err := topic.Config(ctx)
if err != nil {
	log.Fatal(err)
}
// IngestionDataSourceSettings is a pointer and may be nil, and Source is an
// interface that can hold any ingestion source type. Check both before use:
// a nil dereference or a failed single-value type assertion would panic.
if config.IngestionDataSourceSettings == nil {
	log.Fatal("Topic has no ingestion data source settings")
}
kinesisSource, ok := config.IngestionDataSourceSettings.Source.(*pubsub.IngestionDataSourceAWSKinesis)
if !ok {
	log.Fatalf("Unexpected ingestion source type: %T", config.IngestionDataSourceSettings.Source)
}
switch kinesisSource.State {
case pubsub.AWSKinesisStateActive:
fmt.Println("Kinesis ingestion is active")
case pubsub.AWSKinesisStatePermissionDenied:
fmt.Println("Permission denied - check AWS IAM configuration")
}
type IngestionDataSourceAmazonMSK struct {
State AmazonMSKState
ClusterARN string
Topic string
AWSRoleARN string
GCPServiceAccount string
}
func (i *IngestionDataSourceAmazonMSK) isIngestionDataSource() bool
Ingestion settings for Amazon Managed Streaming for Apache Kafka (MSK).
Fields:
- State: Current state (output only)
- ClusterARN: Amazon Resource Name of MSK cluster
- Topic: Kafka topic name to import from
- AWSRoleARN: AWS role ARN for Federated Identity authentication
- GCPServiceAccount: GCP service account for Federated Identity authentication
type AmazonMSKState int
const (
AmazonMSKStateUnspecified = iota
AmazonMSKActive
AmazonMSKPermissionDenied
AmazonMSKPublishPermissionDenied
AmazonMSKClusterNotFound
AmazonMSKTopicNotFound
)
Possible states for Amazon MSK ingestion:
- AmazonMSKStateUnspecified: Default/unused
- AmazonMSKActive: MSK ingestion is active
- AmazonMSKPermissionDenied: Permission denied consuming from MSK
- AmazonMSKPublishPermissionDenied: Permission denied publishing to topic
- AmazonMSKClusterNotFound: MSK cluster not found
- AmazonMSKTopicNotFound: Kafka topic not found
Example:
topic, err := client.CreateTopicWithConfig(ctx, "msk-topic", &pubsub.TopicConfig{
IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
Source: &pubsub.IngestionDataSourceAmazonMSK{
ClusterARN: "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abcd-1234",
Topic: "events",
AWSRoleARN: "arn:aws:iam::123456789012:role/PubSubMSKRole",
GCPServiceAccount: "pubsub-msk@my-project.iam.gserviceaccount.com",
},
},
})
type IngestionDataSourceAzureEventHubs struct {
State EventHubsState
ResourceGroup string
Namespace string
EventHub string
ClientID string
TenantID string
SubscriptionID string
GCPServiceAccount string
}
func (i *IngestionDataSourceAzureEventHubs) isIngestionDataSource() bool
Ingestion settings for Azure Event Hubs.
Fields:
- State: Current state (output only)
- ResourceGroup: Azure resource group name
- Namespace: Event Hubs namespace
- EventHub: Event Hub name (the field is named EventHub, not EventHubName)
- ClientID: Azure application client ID for authentication
- TenantID: Azure application tenant ID for authentication
- SubscriptionID: Azure subscription ID
- GCPServiceAccount: GCP service account for Federated Identity authentication
type EventHubsState int
const (
EventHubsStateUnspecified = iota
EventHubsStateActive
EventHubsStatePermissionDenied
EventHubsStatePublishPermissionDenied
EventHubsStateNamespaceNotFound
EventHubsStateNotFound
EventHubsStateSubscriptionNotFound
EventHubsStateResourceGroupNotFound
)
Possible states for Azure Event Hubs ingestion:
- EventHubsStateUnspecified: Default/unused
- EventHubsStateActive: Ingestion is active
- EventHubsStatePermissionDenied: Permission denied consuming from Event Hubs
- EventHubsStatePublishPermissionDenied: Permission denied publishing to topic
- EventHubsStateNamespaceNotFound: Event Hubs namespace not found
- EventHubsStateNotFound: Event Hub not found
- EventHubsStateSubscriptionNotFound: Azure subscription not found
- EventHubsStateResourceGroupNotFound: Resource group not found
Example:
topic, err := client.CreateTopicWithConfig(ctx, "eventhubs-topic", &pubsub.TopicConfig{
IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
Source: &pubsub.IngestionDataSourceAzureEventHubs{
ResourceGroup: "my-resource-group",
Namespace: "my-eventhubs-namespace",
EventHub: "my-eventhub",
ClientID: "12345678-1234-1234-1234-123456789012",
TenantID: "87654321-4321-4321-4321-210987654321",
SubscriptionID: "abcdef12-3456-7890-abcd-ef1234567890",
GCPServiceAccount: "pubsub-eventhubs@my-project.iam.gserviceaccount.com",
},
},
})
type IngestionDataSourceConfluentCloud struct {
State ConfluentCloudState
BootstrapServer string
ClusterID string
Topic string
IdentityPoolID string
GCPServiceAccount string
}
func (i *IngestionDataSourceConfluentCloud) isIngestionDataSource() bool
Ingestion settings for Confluent Cloud.
Fields:
- State: Current state (output only)
- BootstrapServer: Bootstrap server address (format: url:port)
- ClusterID: Cluster ID
- Topic: Kafka topic name to import from
- IdentityPoolID: Identity pool ID for Federated Identity authentication
- GCPServiceAccount: GCP service account for Federated Identity authentication
type ConfluentCloudState int
const (
ConfluentCloudStateUnspecified = iota
ConfluentCloudActive
ConfluentCloudPermissionDenied
ConfluentCloudPublishPermissionDenied
ConfluentCloudUnreachableBootstrapServer
ConfluentCloudClusterNotFound
ConfluentCloudTopicNotFound
)
Possible states for Confluent Cloud ingestion:
- ConfluentCloudStateUnspecified: Default/unused
- ConfluentCloudActive: Ingestion is active
- ConfluentCloudPermissionDenied: Permission denied consuming from Confluent Cloud
- ConfluentCloudPublishPermissionDenied: Permission denied publishing to topic
- ConfluentCloudUnreachableBootstrapServer: Bootstrap server unreachable
- ConfluentCloudClusterNotFound: Cluster not found
- ConfluentCloudTopicNotFound: Kafka topic not found
Example:
topic, err := client.CreateTopicWithConfig(ctx, "confluent-topic", &pubsub.TopicConfig{
IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
Source: &pubsub.IngestionDataSourceConfluentCloud{
BootstrapServer: "pkc-abcde.us-east-1.aws.confluent.cloud:9092",
ClusterID: "lkc-12345",
Topic: "events",
IdentityPoolID: "pool-abcd-1234",
GCPServiceAccount: "pubsub-confluent@my-project.iam.gserviceaccount.com",
},
},
})
type IngestionDataSourceCloudStorage struct {
State CloudStorageIngestionState
Bucket string
InputFormat ingestionDataSourceCloudStorageInputFormat
MinimumObjectCreateTime time.Time
MatchGlob string
}
func (i *IngestionDataSourceCloudStorage) isIngestionDataSource() bool
Ingestion settings for Cloud Storage.
Fields:
- State: Current state (output only)
- Bucket: Cloud Storage bucket (without gs:// prefix)
- InputFormat: Format of objects (Text, Avro, or PubSub Avro)
- MinimumObjectCreateTime: Only ingest objects created after this time (EXPERIMENTAL)
- MatchGlob: Glob pattern for matching objects (EXPERIMENTAL)
type CloudStorageIngestionState int
const (
CloudStorageIngestionStateUnspecified = iota
CloudStorageIngestionStateActive
CloudStorageIngestionPermissionDenied
CloudStorageIngestionPublishPermissionDenied
CloudStorageIngestionBucketNotFound
CloudStorageIngestionTooManyObjects
)
Possible states for Cloud Storage ingestion:
- CloudStorageIngestionStateUnspecified: Default/unused
- CloudStorageIngestionStateActive: Ingestion is active
- CloudStorageIngestionPermissionDenied: Error calling Cloud Storage API
- CloudStorageIngestionPublishPermissionDenied: Permission denied publishing to topic
- CloudStorageIngestionBucketNotFound: Bucket does not exist
- CloudStorageIngestionTooManyObjects: Bucket has too many objects (ingestion paused)
type IngestionDataSourceCloudStorageTextFormat struct {
Delimiter string
}
func (i *IngestionDataSourceCloudStorageTextFormat) isCloudStorageIngestionInputFormat() bool
Text format with configurable delimiter.
Example:
topic, err := client.CreateTopicWithConfig(ctx, "gcs-text-topic", &pubsub.TopicConfig{
IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
Source: &pubsub.IngestionDataSourceCloudStorage{
Bucket: "my-bucket",
InputFormat: &pubsub.IngestionDataSourceCloudStorageTextFormat{
Delimiter: "\n",
},
MinimumObjectCreateTime: time.Now().Add(-24 * time.Hour),
MatchGlob: "data/events/*.txt",
},
PlatformLogsSettings: &pubsub.PlatformLogsSettings{
Severity: pubsub.PlatformLogsSeverityInfo,
},
},
})
type IngestionDataSourceCloudStorageAvroFormat struct{}
func (i *IngestionDataSourceCloudStorageAvroFormat) isCloudStorageIngestionInputFormat() bool
Avro format for Cloud Storage ingestion.
Example:
topic, err := client.CreateTopicWithConfig(ctx, "gcs-avro-topic", &pubsub.TopicConfig{
IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
Source: &pubsub.IngestionDataSourceCloudStorage{
Bucket: "my-bucket",
InputFormat: &pubsub.IngestionDataSourceCloudStorageAvroFormat{},
},
},
})
type IngestionDataSourceCloudStoragePubSubAvroFormat struct{}
func (i *IngestionDataSourceCloudStoragePubSubAvroFormat) isCloudStorageIngestionInputFormat() bool
Pub/Sub Avro format (for data written by Cloud Storage subscriptions).
Example:
topic, err := client.CreateTopicWithConfig(ctx, "gcs-pubsub-avro-topic", &pubsub.TopicConfig{
IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
Source: &pubsub.IngestionDataSourceCloudStorage{
Bucket: "my-bucket",
InputFormat: &pubsub.IngestionDataSourceCloudStoragePubSubAvroFormat{},
},
},
})
// Update ingestion settings
config, err := topic.Update(ctx, pubsub.TopicConfigToUpdate{
	IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
		Source: &pubsub.IngestionDataSourceCloudStorage{
			Bucket:      "new-bucket",
			InputFormat: &pubsub.IngestionDataSourceCloudStorageTextFormat{},
		},
	},
})
// Check the error before issuing the next call; otherwise the second Update
// below overwrites err and a failed update goes unnoticed.
if err != nil {
	log.Fatalf("Failed to update topic: %v", err)
}
// Remove ingestion settings
config, err = topic.Update(ctx, pubsub.TopicConfigToUpdate{
	IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{}, // Zero value removes
})
if err != nil {
	log.Fatalf("Failed to update topic: %v", err)
}
// Check topic state
config, err := topic.Config(ctx)
if err != nil {
log.Fatal(err)
}
// Check topic state
if config.State == pubsub.TopicStateIngestionResourceError {
fmt.Println("Ingestion has encountered an error")
// Check specific ingestion source state
if config.IngestionDataSourceSettings != nil {
switch source := config.IngestionDataSourceSettings.Source.(type) {
case *pubsub.IngestionDataSourceAWSKinesis:
fmt.Printf("Kinesis state: %v\n", source.State)
case *pubsub.IngestionDataSourceCloudStorage:
fmt.Printf("Cloud Storage state: %v\n", source.State)
case *pubsub.IngestionDataSourceAzureEventHubs:
fmt.Printf("Event Hubs state: %v\n", source.State)
case *pubsub.IngestionDataSourceAmazonMSK:
fmt.Printf("MSK state: %v\n", source.State)
case *pubsub.IngestionDataSourceConfluentCloud:
fmt.Printf("Confluent Cloud state: %v\n", source.State)
}
}
}
Set up Federated Identity with AssumeRoleWithWebIdentity:
- iam.serviceAccounts.getOpenIdToken permission
- storage.objects.list
- storage.objects.get
- storage.buckets.get