
Workspace: tessl
Visibility: Public
Describes: golangpkg:golang/cloud.google.com/go/pubsub@v1.50.1


tessl/golang-cloud-google-com--go--pubsub

tessl install tessl/golang-cloud-google-com--go--pubsub@1.50.5

Google Cloud Pub/Sub client library for Go providing high-level idiomatic APIs for publishing and receiving messages with automatic batching, flow control, and support for advanced features including message ordering, schema validation, exactly-once delivery, and ingestion from external data sources


Data Ingestion

Pub/Sub supports ingestion from external data sources into topics. This document covers all supported ingestion sources and their configuration.

Ingestion Data Source Settings

IngestionDataSourceSettings

type IngestionDataSourceSettings struct {
    Source               IngestionDataSource
    PlatformLogsSettings *PlatformLogsSettings
}

Settings for data ingestion from external sources.

Fields:

  • Source: The ingestion data source (AWS Kinesis, Cloud Storage, Azure Event Hubs, Amazon MSK, or Confluent Cloud)
  • PlatformLogsSettings: Platform logging configuration

IngestionDataSource Interface

type IngestionDataSource interface {
    isIngestionDataSource() bool
}

Interface for ingestion data sources.
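Because isIngestionDataSource is unexported, code outside the pubsub package effectively cannot add new source types. The set of sources is closed, and callers tell them apart with a type switch. The sketch below reproduces that sealed-interface pattern with hypothetical stand-in types (source, kinesisSource, gcsSource, describe) so it runs without the Pub/Sub module. It is an illustration of the pattern, not the library's code.

```go
package main

import "fmt"

// source mirrors the shape of pubsub.IngestionDataSource: it has one
// unexported method, so only types declared in this package implement it.
type source interface {
	isSource() bool
}

// kinesisSource and gcsSource are hypothetical stand-ins for the library's
// concrete source types.
type kinesisSource struct{ StreamARN string }
type gcsSource struct{ Bucket string }

func (*kinesisSource) isSource() bool { return true }
func (*gcsSource) isSource() bool     { return true }

// describe dispatches on the concrete type with a type switch, the same way
// callers of the real API inspect IngestionDataSourceSettings.Source.
func describe(s source) string {
	switch v := s.(type) {
	case nil:
		return "no ingestion source"
	case *kinesisSource:
		return "kinesis: " + v.StreamARN
	case *gcsSource:
		return "cloud storage: " + v.Bucket
	default:
		return fmt.Sprintf("unexpected source %T", v)
	}
}

func main() {
	fmt.Println(describe(&gcsSource{Bucket: "my-bucket"}))
	fmt.Println(describe(nil))
}
```

The nil case matters in practice: a topic without ingestion has no source to switch on.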

Platform Logs Settings

type PlatformLogsSettings struct {
    Severity PlatformLogsSeverity
}

type PlatformLogsSeverity int32

const (
    PlatformLogsSeverityUnspecified PlatformLogsSeverity = iota
    PlatformLogsSeverityDisabled
    PlatformLogsSeverityDebug
    PlatformLogsSeverityInfo
    PlatformLogsSeverityWarning
    PlatformLogsSeverityError
)

Configuration for platform logs produced by Pub/Sub ingestion (currently only valid for Cloud Storage ingestion).

Severity Levels:

  • PlatformLogsSeverityUnspecified: Logs disabled
  • PlatformLogsSeverityDisabled: Logs disabled
  • PlatformLogsSeverityDebug: Debug and higher
  • PlatformLogsSeverityInfo: Info and higher
  • PlatformLogsSeverityWarning: Warning and higher
  • PlatformLogsSeverityError: Error only
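The "X and higher" wording describes a threshold. The filtering happens on the Pub/Sub side, and the sketch below only models the documented rule locally. It assumes a local Severity type that copies the declaration order of PlatformLogsSeverity. The type and the emitted helper are hypothetical, not library API.

```go
package main

import "fmt"

// Severity copies the declaration order of pubsub.PlatformLogsSeverity
// (Unspecified, Disabled, Debug, Info, Warning, Error) for illustration only.
type Severity int32

const (
	SeverityUnspecified Severity = iota
	SeverityDisabled
	SeverityDebug
	SeverityInfo
	SeverityWarning
	SeverityError
)

// emitted reports whether a log entry at level entry would be written when the
// topic is configured with threshold, following the "X and higher" rule.
func emitted(threshold, entry Severity) bool {
	if threshold <= SeverityDisabled {
		return false // Unspecified and Disabled both turn platform logs off
	}
	return entry >= threshold
}

func main() {
	fmt.Println(emitted(SeverityInfo, SeverityWarning))   // true
	fmt.Println(emitted(SeverityInfo, SeverityDebug))     // false
	fmt.Println(emitted(SeverityDisabled, SeverityError)) // false
}
```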

AWS Kinesis Ingestion

IngestionDataSourceAWSKinesis

type IngestionDataSourceAWSKinesis struct {
    State             AWSKinesisState
    StreamARN         string
    ConsumerARN       string
    AWSRoleARN        string
    GCPServiceAccount string
}

func (i *IngestionDataSourceAWSKinesis) isIngestionDataSource() bool

Ingestion settings for Amazon Kinesis Data Streams.

Fields:

  • State: Current state (output only)
  • StreamARN: Kinesis stream ARN
  • ConsumerARN: Kinesis consumer ARN for Enhanced Fan-Out mode
  • AWSRoleARN: AWS role ARN for Federated Identity authentication
  • GCPServiceAccount: GCP service account for Federated Identity authentication
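A mismatched StreamARN and ConsumerARN typically surfaces only later as a StreamNotFound or ConsumerNotFound state. A local check before creating the topic can catch obvious mistakes earlier. The sketch below is a hypothetical helper (checkKinesisARNs is not part of the pubsub package). It assumes both fields are set, as the example in this section does, and relies on the AWS convention that a consumer ARN is the stream ARN followed by /consumer/<name>:<timestamp>.

```go
package main

import (
	"fmt"
	"strings"
)

// checkKinesisARNs is a hypothetical pre-flight check, not library API.
// It verifies that streamARN names a Kinesis stream and that consumerARN
// belongs to that same stream.
func checkKinesisARNs(streamARN, consumerARN string) error {
	// Generic ARN layout: arn:partition:service:region:account-id:resource
	parts := strings.SplitN(streamARN, ":", 6)
	if len(parts) != 6 || parts[0] != "arn" || parts[2] != "kinesis" {
		return fmt.Errorf("not a Kinesis ARN: %q", streamARN)
	}
	if !strings.HasPrefix(parts[5], "stream/") || strings.Contains(parts[5], "/consumer/") {
		return fmt.Errorf("not a stream ARN: %q", streamARN)
	}
	// Consumer ARNs look like <streamARN>/consumer/<name>:<creation-timestamp>.
	if !strings.HasPrefix(consumerARN, streamARN+"/consumer/") {
		return fmt.Errorf("consumer ARN %q does not belong to stream %q", consumerARN, streamARN)
	}
	return nil
}

func main() {
	stream := "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
	consumer := stream + "/consumer/my-consumer:1234567890"
	fmt.Println(checkKinesisARNs(stream, consumer)) // <nil>
}
```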

AWS Kinesis States

type AWSKinesisState int

const (
    AWSKinesisStateUnspecified = iota
    AWSKinesisStateActive
    AWSKinesisStatePermissionDenied
    AWSKinesisStatePublishPermissionDenied
    AWSKinesisStateStreamNotFound
    AWSKinesisStateConsumerNotFound
)

Possible states for AWS Kinesis ingestion:

  • AWSKinesisStateUnspecified: Default/unused
  • AWSKinesisStateActive: Ingestion is active
  • AWSKinesisStatePermissionDenied: Error consuming from Kinesis (check AWS role/permissions)
  • AWSKinesisStatePublishPermissionDenied: Permission denied publishing to topic
  • AWSKinesisStateStreamNotFound: Kinesis stream does not exist
  • AWSKinesisStateConsumerNotFound: Kinesis consumer does not exist
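The example below handles only two of these states. One way to cover all of them is to map each state to a next step. The sketch assumes a local AWSKinesisState type whose constants copy the declaration order shown above, so it runs on its own. With the real package you would switch on the pubsub constants directly. The hint strings paraphrase the state descriptions in this section, and kinesisHint is hypothetical.

```go
package main

import "fmt"

// AWSKinesisState and its constants copy the declaration order of the
// pubsub package's values; they are local stand-ins for illustration.
type AWSKinesisState int

const (
	AWSKinesisStateUnspecified AWSKinesisState = iota
	AWSKinesisStateActive
	AWSKinesisStatePermissionDenied
	AWSKinesisStatePublishPermissionDenied
	AWSKinesisStateStreamNotFound
	AWSKinesisStateConsumerNotFound
)

// kinesisHint is a hypothetical helper that turns a state into a next step.
func kinesisHint(s AWSKinesisState) string {
	switch s {
	case AWSKinesisStateActive:
		return "ok"
	case AWSKinesisStatePermissionDenied:
		return "check the AWS role, its trust policy, and its Kinesis read permissions"
	case AWSKinesisStatePublishPermissionDenied:
		return "grant the Pub/Sub service account publish permissions on the topic"
	case AWSKinesisStateStreamNotFound:
		return "verify StreamARN"
	case AWSKinesisStateConsumerNotFound:
		return "verify ConsumerARN and that the consumer exists"
	default:
		return "unspecified or unrecognized state"
	}
}

func main() {
	for s := AWSKinesisStateUnspecified; s <= AWSKinesisStateConsumerNotFound; s++ {
		fmt.Printf("%d: %s\n", s, kinesisHint(s))
	}
}
```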

Example:

topic, err := client.CreateTopicWithConfig(ctx, "kinesis-topic", &pubsub.TopicConfig{
    IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
        Source: &pubsub.IngestionDataSourceAWSKinesis{
            StreamARN:         "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",
            ConsumerARN:       "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream/consumer/my-consumer:1234567890",
            AWSRoleARN:        "arn:aws:iam::123456789012:role/PubSubKinesisRole",
            GCPServiceAccount: "pubsub-kinesis@my-project.iam.gserviceaccount.com",
        },
    },
})
if err != nil {
    log.Fatalf("Failed to create topic: %v", err)
}

// Check ingestion state
config, err := topic.Config(ctx)
if err != nil {
    log.Fatal(err)
}
// Guard against missing settings or a different source type before reading State
if settings := config.IngestionDataSourceSettings; settings != nil {
    if kinesisSource, ok := settings.Source.(*pubsub.IngestionDataSourceAWSKinesis); ok {
        switch kinesisSource.State {
        case pubsub.AWSKinesisStateActive:
            fmt.Println("Kinesis ingestion is active")
        case pubsub.AWSKinesisStatePermissionDenied:
            fmt.Println("Permission denied - check AWS IAM configuration")
        }
    }
}

Amazon MSK Ingestion

IngestionDataSourceAmazonMSK

type IngestionDataSourceAmazonMSK struct {
    State             AmazonMSKState
    ClusterARN        string
    Topic             string
    AWSRoleARN        string
    GCPServiceAccount string
}

func (i *IngestionDataSourceAmazonMSK) isIngestionDataSource() bool

Ingestion settings for Amazon Managed Streaming for Apache Kafka (MSK).

Fields:

  • State: Current state (output only)
  • ClusterARN: Amazon Resource Name of MSK cluster
  • Topic: Kafka topic name to import from
  • AWSRoleARN: AWS role ARN for Federated Identity authentication
  • GCPServiceAccount: GCP service account for Federated Identity authentication
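MSK cluster ARNs follow the AWS pattern arn:aws:kafka:<region>:<account>:cluster/<name>/<uuid>. Parsing the value locally is one way to catch an ARN copied from the wrong service before the topic reports ClusterNotFound. The sketch below is a minimal, hypothetical helper (arn, parseARN, and mskClusterName are not library API). It only checks the shape of the string, not whether the cluster exists.

```go
package main

import (
	"fmt"
	"strings"
)

// arn holds the generic components arn:partition:service:region:account-id:resource.
type arn struct {
	Partition, Service, Region, Account, Resource string
}

// parseARN splits an ARN into its six colon-separated fields.
func parseARN(s string) (arn, error) {
	f := strings.SplitN(s, ":", 6)
	if len(f) != 6 || f[0] != "arn" {
		return arn{}, fmt.Errorf("malformed ARN %q", s)
	}
	return arn{Partition: f[1], Service: f[2], Region: f[3], Account: f[4], Resource: f[5]}, nil
}

// mskClusterName extracts <name> from arn:aws:kafka:<region>:<account>:cluster/<name>/<uuid>.
func mskClusterName(clusterARN string) (string, error) {
	a, err := parseARN(clusterARN)
	if err != nil {
		return "", err
	}
	res := strings.Split(a.Resource, "/")
	if a.Service != "kafka" || len(res) != 3 || res[0] != "cluster" {
		return "", fmt.Errorf("not an MSK cluster ARN: %q", clusterARN)
	}
	return res[1], nil
}

func main() {
	name, err := mskClusterName("arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abcd-1234")
	fmt.Println(name, err) // my-cluster <nil>
	role, _ := parseARN("arn:aws:iam::123456789012:role/PubSubMSKRole")
	fmt.Println(role.Service, role.Account) // iam 123456789012
}
```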

Amazon MSK States

type AmazonMSKState int

const (
    AmazonMSKStateUnspecified = iota
    AmazonMSKActive
    AmazonMSKPermissionDenied
    AmazonMSKPublishPermissionDenied
    AmazonMSKClusterNotFound
    AmazonMSKTopicNotFound
)

Possible states for Amazon MSK ingestion:

  • AmazonMSKStateUnspecified: Default/unused
  • AmazonMSKActive: MSK ingestion is active
  • AmazonMSKPermissionDenied: Permission denied consuming from MSK
  • AmazonMSKPublishPermissionDenied: Permission denied publishing to topic
  • AmazonMSKClusterNotFound: MSK cluster not found
  • AmazonMSKTopicNotFound: Kafka topic not found

Example:

topic, err := client.CreateTopicWithConfig(ctx, "msk-topic", &pubsub.TopicConfig{
    IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
        Source: &pubsub.IngestionDataSourceAmazonMSK{
            ClusterARN:        "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abcd-1234",
            Topic:             "events",
            AWSRoleARN:        "arn:aws:iam::123456789012:role/PubSubMSKRole",
            GCPServiceAccount: "pubsub-msk@my-project.iam.gserviceaccount.com",
        },
    },
})
if err != nil {
    log.Fatalf("Failed to create topic: %v", err)
}

Azure Event Hubs Ingestion

IngestionDataSourceAzureEventHubs

type IngestionDataSourceAzureEventHubs struct {
    State             EventHubsState
    ResourceGroup     string
    Namespace         string
    EventHub          string
    ClientID          string
    TenantID          string
    SubscriptionID    string
    GCPServiceAccount string
}

func (i *IngestionDataSourceAzureEventHubs) isIngestionDataSource() bool

Ingestion settings for Azure Event Hubs.

Fields:

  • State: Current state (output only)
  • ResourceGroup: Azure resource group name
  • Namespace: Event Hubs namespace
  • EventHub: Event Hub name (the field is named EventHub, not EventHubName)
  • ClientID: Azure application client ID for authentication
  • TenantID: Azure application tenant ID for authentication
  • SubscriptionID: Azure subscription ID
  • GCPServiceAccount: GCP service account for Federated Identity authentication

Event Hubs States

type EventHubsState int

const (
    EventHubsStateUnspecified = iota
    EventHubsStateActive
    EventHubsStatePermissionDenied
    EventHubsStatePublishPermissionDenied
    EventHubsStateNamespaceNotFound
    EventHubsStateNotFound
    EventHubsStateSubscriptionNotFound
    EventHubsStateResourceGroupNotFound
)

Possible states for Azure Event Hubs ingestion:

  • EventHubsStateUnspecified: Default/unused
  • EventHubsStateActive: Ingestion is active
  • EventHubsStatePermissionDenied: Permission denied consuming from Event Hubs
  • EventHubsStatePublishPermissionDenied: Permission denied publishing to topic
  • EventHubsStateNamespaceNotFound: Event Hubs namespace not found
  • EventHubsStateNotFound: Event Hub not found
  • EventHubsStateSubscriptionNotFound: Azure subscription not found
  • EventHubsStateResourceGroupNotFound: Resource group not found
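The four NotFound states line up with the levels of Azure's resource hierarchy: subscription, resource group, namespace, and event hub. When one of them appears, assembling the full Azure Resource Manager ID from the configured fields and looking it up (for example with the Azure CLI's az resource show --ids) shows which level is wrong. The helper below is a hypothetical sketch that builds that ID. It does not call Azure.

```go
package main

import "fmt"

// eventHubResourceID assembles the Azure Resource Manager ID addressed by the
// IngestionDataSourceAzureEventHubs fields, outermost scope first.
func eventHubResourceID(subscriptionID, resourceGroup, namespace, eventHub string) string {
	return fmt.Sprintf(
		"/subscriptions/%s/resourceGroups/%s/providers/Microsoft.EventHub/namespaces/%s/eventhubs/%s",
		subscriptionID, resourceGroup, namespace, eventHub)
}

func main() {
	fmt.Println(eventHubResourceID(
		"abcdef12-3456-7890-abcd-ef1234567890",
		"my-resource-group",
		"my-eventhubs-namespace",
		"my-eventhub",
	))
}
```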

Example:

topic, err := client.CreateTopicWithConfig(ctx, "eventhubs-topic", &pubsub.TopicConfig{
    IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
        Source: &pubsub.IngestionDataSourceAzureEventHubs{
            ResourceGroup:     "my-resource-group",
            Namespace:         "my-eventhubs-namespace",
            EventHub:          "my-eventhub",
            ClientID:          "12345678-1234-1234-1234-123456789012",
            TenantID:          "87654321-4321-4321-4321-210987654321",
            SubscriptionID:    "abcdef12-3456-7890-abcd-ef1234567890",
            GCPServiceAccount: "pubsub-eventhubs@my-project.iam.gserviceaccount.com",
        },
    },
})
if err != nil {
    log.Fatalf("Failed to create topic: %v", err)
}

Confluent Cloud Ingestion

IngestionDataSourceConfluentCloud

type IngestionDataSourceConfluentCloud struct {
    State             ConfluentCloudState
    BootstrapServer   string
    ClusterID         string
    Topic             string
    IdentityPoolID    string
    GCPServiceAccount string
}

func (i *IngestionDataSourceConfluentCloud) isIngestionDataSource() bool

Ingestion settings for Confluent Cloud.

Fields:

  • State: Current state (output only)
  • BootstrapServer: Bootstrap server address (format: url:port)
  • ClusterID: Cluster ID
  • Topic: Kafka topic name to import from
  • IdentityPoolID: Identity pool ID for Federated Identity authentication
  • GCPServiceAccount: GCP service account for Federated Identity authentication
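BootstrapServer must be a host and port, so a value with a scheme (https://...) or without a port is a likely cause of an UnreachableBootstrapServer state. The sketch below is a hypothetical local check built on the standard library's net.SplitHostPort. It verifies only the format, not reachability.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// checkBootstrapServer reports whether addr has the documented host:port form.
func checkBootstrapServer(addr string) error {
	host, port, err := net.SplitHostPort(addr) // rejects a missing port or a scheme prefix
	if err != nil {
		return fmt.Errorf("bootstrap server %q: %w", addr, err)
	}
	if host == "" {
		return fmt.Errorf("bootstrap server %q: empty host", addr)
	}
	if p, err := strconv.Atoi(port); err != nil || p < 1 || p > 65535 {
		return fmt.Errorf("bootstrap server %q: invalid port %q", addr, port)
	}
	return nil
}

func main() {
	fmt.Println(checkBootstrapServer("pkc-abcde.us-east-1.aws.confluent.cloud:9092")) // <nil>
	fmt.Println(checkBootstrapServer("pkc-abcde.us-east-1.aws.confluent.cloud") != nil) // true
}
```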

Confluent Cloud States

type ConfluentCloudState int

const (
    ConfluentCloudStateUnspecified = iota
    ConfluentCloudActive
    ConfluentCloudPermissionDenied
    ConfluentCloudPublishPermissionDenied
    ConfluentCloudUnreachableBootstrapServer
    ConfluentCloudClusterNotFound
    ConfluentCloudTopicNotFound
)

Possible states for Confluent Cloud ingestion:

  • ConfluentCloudStateUnspecified: Default/unused
  • ConfluentCloudActive: Ingestion is active
  • ConfluentCloudPermissionDenied: Permission denied consuming from Confluent Cloud
  • ConfluentCloudPublishPermissionDenied: Permission denied publishing to topic
  • ConfluentCloudUnreachableBootstrapServer: Bootstrap server unreachable
  • ConfluentCloudClusterNotFound: Cluster not found
  • ConfluentCloudTopicNotFound: Kafka topic not found

Example:

topic, err := client.CreateTopicWithConfig(ctx, "confluent-topic", &pubsub.TopicConfig{
    IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
        Source: &pubsub.IngestionDataSourceConfluentCloud{
            BootstrapServer:   "pkc-abcde.us-east-1.aws.confluent.cloud:9092",
            ClusterID:         "lkc-12345",
            Topic:             "events",
            IdentityPoolID:    "pool-abcd-1234",
            GCPServiceAccount: "pubsub-confluent@my-project.iam.gserviceaccount.com",
        },
    },
})
if err != nil {
    log.Fatalf("Failed to create topic: %v", err)
}

Cloud Storage Ingestion

IngestionDataSourceCloudStorage

type IngestionDataSourceCloudStorage struct {
    State                   CloudStorageIngestionState
    Bucket                  string
    InputFormat             ingestionDataSourceCloudStorageInputFormat
    MinimumObjectCreateTime time.Time
    MatchGlob               string
}

func (i *IngestionDataSourceCloudStorage) isIngestionDataSource() bool

Ingestion settings for Cloud Storage.

Fields:

  • State: Current state (output only)
  • Bucket: Cloud Storage bucket (without gs:// prefix)
  • InputFormat: Format of objects (Text, Avro, or PubSub Avro)
  • MinimumObjectCreateTime: Only ingest objects created after this time (EXPERIMENTAL)
  • MatchGlob: Glob pattern for matching objects (EXPERIMENTAL)

Cloud Storage Ingestion States

type CloudStorageIngestionState int

const (
    CloudStorageIngestionStateUnspecified = iota
    CloudStorageIngestionStateActive
    CloudStorageIngestionPermissionDenied
    CloudStorageIngestionPublishPermissionDenied
    CloudStorageIngestionBucketNotFound
    CloudStorageIngestionTooManyObjects
)

Possible states for Cloud Storage ingestion:

  • CloudStorageIngestionStateUnspecified: Default/unused
  • CloudStorageIngestionStateActive: Ingestion is active
  • CloudStorageIngestionPermissionDenied: Error calling the Cloud Storage API (for example, the Pub/Sub service account lacks bucket permissions)
  • CloudStorageIngestionPublishPermissionDenied: Permission denied publishing to topic
  • CloudStorageIngestionBucketNotFound: Bucket does not exist
  • CloudStorageIngestionTooManyObjects: Bucket has too many objects (ingestion paused)

Cloud Storage Input Formats

IngestionDataSourceCloudStorageTextFormat

type IngestionDataSourceCloudStorageTextFormat struct {
    Delimiter string
}

func (i *IngestionDataSourceCloudStorageTextFormat) isCloudStorageIngestionInputFormat() bool

Text format with configurable delimiter.

Example:

topic, err := client.CreateTopicWithConfig(ctx, "gcs-text-topic", &pubsub.TopicConfig{
    IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
        Source: &pubsub.IngestionDataSourceCloudStorage{
            Bucket: "my-bucket",
            InputFormat: &pubsub.IngestionDataSourceCloudStorageTextFormat{
                Delimiter: "\n",
            },
            MinimumObjectCreateTime: time.Now().Add(-24 * time.Hour),
            MatchGlob:               "data/events/*.txt",
        },
        PlatformLogsSettings: &pubsub.PlatformLogsSettings{
            Severity: pubsub.PlatformLogsSeverityInfo,
        },
    },
})
if err != nil {
    log.Fatalf("Failed to create topic: %v", err)
}

IngestionDataSourceCloudStorageAvroFormat

type IngestionDataSourceCloudStorageAvroFormat struct{}

func (i *IngestionDataSourceCloudStorageAvroFormat) isCloudStorageIngestionInputFormat() bool

Avro format for Cloud Storage ingestion.

Example:

topic, err := client.CreateTopicWithConfig(ctx, "gcs-avro-topic", &pubsub.TopicConfig{
    IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
        Source: &pubsub.IngestionDataSourceCloudStorage{
            Bucket: "my-bucket",
            InputFormat: &pubsub.IngestionDataSourceCloudStorageAvroFormat{},
        },
    },
})
if err != nil {
    log.Fatalf("Failed to create topic: %v", err)
}

IngestionDataSourceCloudStoragePubSubAvroFormat

type IngestionDataSourceCloudStoragePubSubAvroFormat struct{}

func (i *IngestionDataSourceCloudStoragePubSubAvroFormat) isCloudStorageIngestionInputFormat() bool

Pub/Sub Avro format (for data written by Cloud Storage subscriptions).

Example:

topic, err := client.CreateTopicWithConfig(ctx, "gcs-pubsub-avro-topic", &pubsub.TopicConfig{
    IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
        Source: &pubsub.IngestionDataSourceCloudStorage{
            Bucket:      "my-bucket",
            InputFormat: &pubsub.IngestionDataSourceCloudStoragePubSubAvroFormat{},
        },
    },
})
if err != nil {
    log.Fatalf("Failed to create topic: %v", err)
}

Updating Ingestion Settings

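// Update ingestion settings. The whole IngestionDataSourceSettings value is
// applied as given, so include every field (source and logging) you want to keep.
if _, err := topic.Update(ctx, pubsub.TopicConfigToUpdate{
    IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
        Source: &pubsub.IngestionDataSourceCloudStorage{
            Bucket:      "new-bucket",
            InputFormat: &pubsub.IngestionDataSourceCloudStorageTextFormat{},
        },
    },
}); err != nil {
    log.Fatalf("Failed to update ingestion settings: %v", err)
}

// Remove ingestion settings: the zero value clears the data source
if _, err := topic.Update(ctx, pubsub.TopicConfigToUpdate{
    IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{},
}); err != nil {
    log.Fatalf("Failed to remove ingestion settings: %v", err)
}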

Monitoring Ingestion

// Fetch the current topic configuration
config, err := topic.Config(ctx)
if err != nil {
    log.Fatal(err)
}

// Check the topic-level state first
if config.State == pubsub.TopicStateIngestionResourceError {
    fmt.Println("Ingestion has encountered an error")

    // Check specific ingestion source state
    if config.IngestionDataSourceSettings != nil {
        switch source := config.IngestionDataSourceSettings.Source.(type) {
        case *pubsub.IngestionDataSourceAWSKinesis:
            fmt.Printf("Kinesis state: %v\n", source.State)
        case *pubsub.IngestionDataSourceCloudStorage:
            fmt.Printf("Cloud Storage state: %v\n", source.State)
        case *pubsub.IngestionDataSourceAzureEventHubs:
            fmt.Printf("Event Hubs state: %v\n", source.State)
        case *pubsub.IngestionDataSourceAmazonMSK:
            fmt.Printf("MSK state: %v\n", source.State)
        case *pubsub.IngestionDataSourceConfluentCloud:
            fmt.Printf("Confluent Cloud state: %v\n", source.State)
        }
    }
}

IAM and Permissions

AWS Services (Kinesis, MSK)

Set up Federated Identity with AssumeRoleWithWebIdentity:

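  1. Create an AWS IAM role whose trust policy lets the GCP service account assume it (the accounts.google.com:sub condition must match the service account's unique ID)
  2. Grant the role permissions to read from Kinesis/MSK
  3. Grant the Pub/Sub service account the iam.serviceAccounts.getOpenIdToken permission on the GCP service account
  4. Grant the Pub/Sub service account publish permissions on the topic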

Azure Event Hubs

  1. Register Azure application with appropriate permissions
  2. Grant application permissions to Event Hubs namespace
  3. Configure GCP service account for Federated Identity

Confluent Cloud

  1. Create identity pool in Confluent Cloud
  2. Configure OAuth authentication with GCP
  3. Grant appropriate permissions to Kafka topic

Cloud Storage

  1. Grant Pub/Sub service account permissions on bucket:
    • storage.objects.list
    • storage.objects.get
    • storage.buckets.get
  2. Grant Pub/Sub service account publish permissions on topic

Best Practices

  1. Federated Identity: Use Federated Identity for secure cross-cloud authentication
  2. Monitoring: Regularly check ingestion state and topic state
  3. Permissions: Grant minimal required permissions to service accounts
  4. Logging: Enable platform logs for Cloud Storage ingestion to debug issues
  5. Glob Patterns: Use specific glob patterns to limit ingestion scope
  6. Testing: Test ingestion configuration before production use
  7. Error Handling: Monitor error states and set up alerts
  8. Cleanup: Remove unused ingestion configurations
  9. Documentation: Document cross-cloud IAM setup clearly
  10. Security: Rotate credentials regularly for cross-cloud access