Google Cloud Pub/Sub API client library for reliable, many-to-many, asynchronous messaging between applications
Google Cloud Pub/Sub provides a comprehensive set of protobuf message types for all API operations. These types define the structure for topics, subscriptions, messages, requests, responses, and configuration objects.
Fundamental types for Pub/Sub messaging operations.
class PubsubMessage:
"""
A message published to or received from Pub/Sub.
Attributes:
- data: Message payload as bytes
- attributes: Key-value pairs for message metadata
- message_id: Server-assigned message ID
- publish_time: Time when message was published
- ordering_key: Optional key for message ordering
"""
data: bytes
attributes: MutableMapping[str, str]
message_id: str
publish_time: Timestamp
ordering_key: str
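Because `data` is raw bytes and `attributes` maps strings to strings, structured payloads have to be serialized before publishing. The following is a minimal stdlib-only sketch of one way to do that; the helper names `encode_message` and `decode_message` are hypothetical and not part of the library, and JSON is an assumed encoding choice.

```python
import json
from typing import Dict, Tuple


def encode_message(payload: dict, **attributes) -> Tuple[bytes, Dict[str, str]]:
    """Serialize a payload to bytes and coerce attribute values to strings."""
    data = json.dumps(payload, sort_keys=True).encode("utf-8")
    attrs = {key: str(value) for key, value in attributes.items()}
    return data, attrs


def decode_message(data: bytes) -> dict:
    """Reverse of encode_message for the payload part."""
    return json.loads(data.decode("utf-8"))


# The resulting values fit the `data` and `attributes` fields described above.
data, attrs = encode_message({"action": "login"}, user_id=12345)
```

The returned pair can be passed as the `data` and `attributes` arguments when constructing a `PubsubMessage`.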
class ReceivedMessage:
"""
Message received from a pull operation.
Attributes:
- ack_id: Acknowledgment ID for the message
- message: The actual Pub/Sub message
- delivery_attempt: Number of delivery attempts
"""
ack_id: str
message: PubsubMessage
delivery_attempt: int

Types representing Pub/Sub resources like topics and subscriptions.
class Topic:
"""
A Pub/Sub topic resource.
Attributes:
- name: Topic resource name
- labels: User-defined labels
- message_storage_policy: Message storage policy
- kms_key_name: KMS key for encryption
- schema_settings: Schema validation settings
- satisfies_pzs: Whether topic satisfies PZS
- message_retention_duration: Message retention duration
"""
name: str
labels: MutableMapping[str, str]
message_storage_policy: MessageStoragePolicy
kms_key_name: str
schema_settings: SchemaSettings
satisfies_pzs: bool
message_retention_duration: Duration
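Resource `name` fields take the full path form used elsewhere in this document, such as `projects/my-project/topics/my-topic`. The sketch below builds and splits such names with plain string handling; the function names are hypothetical illustrations, and the client classes may offer their own path helpers that should be preferred when available.

```python
from typing import Tuple


def build_topic_name(project: str, topic: str) -> str:
    """Return a topic name in the projects/{project}/topics/{topic} form."""
    return f"projects/{project}/topics/{topic}"


def build_subscription_name(project: str, subscription: str) -> str:
    """Return a subscription name in the projects/{project}/subscriptions/{id} form."""
    return f"projects/{project}/subscriptions/{subscription}"


def split_resource_name(name: str) -> Tuple[str, str, str]:
    """Split a resource name into (project, collection, resource_id)."""
    parts = name.split("/")
    if len(parts) != 4 or parts[0] != "projects":
        raise ValueError(f"unexpected resource name: {name!r}")
    return parts[1], parts[2], parts[3]


topic_name = build_topic_name("my-project", "user-events")
```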
class Subscription:
"""
A Pub/Sub subscription resource.
Attributes:
- name: Subscription resource name
- topic: Topic name this subscription receives messages from
- push_config: Push delivery configuration
- bigquery_config: BigQuery delivery configuration
- cloud_storage_config: Cloud Storage delivery configuration
- ack_deadline_seconds: Message acknowledgment deadline
- retain_acked_messages: Whether to retain acknowledged messages
- message_retention_duration: How long to retain messages
- labels: User-defined labels
- enable_message_ordering: Whether to enable message ordering
- expiration_policy: Subscription expiration policy
- filter: Message filter expression
- dead_letter_policy: Dead letter queue policy
- retry_policy: Message retry policy
- detached: Whether subscription is detached from topic
- enable_exactly_once_delivery: Whether to enable exactly-once delivery
- topic_message_retention_duration: Topic's message retention duration
- state: Current subscription state
"""
name: str
topic: str
push_config: PushConfig
bigquery_config: BigQueryConfig
cloud_storage_config: CloudStorageConfig
ack_deadline_seconds: int
retain_acked_messages: bool
message_retention_duration: Duration
labels: MutableMapping[str, str]
enable_message_ordering: bool
expiration_policy: ExpirationPolicy
filter: str
dead_letter_policy: DeadLetterPolicy
retry_policy: RetryPolicy
detached: bool
enable_exactly_once_delivery: bool
topic_message_retention_duration: Duration
state: Subscription.State
class Snapshot:
"""
A Pub/Sub snapshot resource.
Attributes:
- name: Snapshot resource name
- topic: Topic name the snapshot was created from
- expire_time: When the snapshot expires
- labels: User-defined labels
"""
name: str
topic: str
expire_time: Timestamp
labels: MutableMapping[str, str]

Types for API requests and responses.
class PublishRequest:
"""
Request to publish messages to a topic.
Attributes:
- topic: Topic name to publish to
- messages: Messages to publish
"""
topic: str
messages: Sequence[PubsubMessage]
class PublishResponse:
"""
Response from publish operation.
Attributes:
- message_ids: IDs of published messages
"""
message_ids: Sequence[str]
class PullRequest:
"""
Request to pull messages from subscription.
Attributes:
- subscription: Subscription name to pull from
- return_immediately: Deprecated; whether to return immediately if no messages are available
- max_messages: Maximum number of messages to return
"""
subscription: str
return_immediately: bool
max_messages: int
class PullResponse:
"""
Response from pull operation.
Attributes:
- received_messages: Messages received from subscription
"""
received_messages: Sequence[ReceivedMessage]
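A pull response pairs each message with an `ack_id`, and only the IDs of successfully handled messages should be acknowledged. The sketch below shows that bookkeeping using small stand-in dataclasses rather than the real types; `FakeMessage`, `FakeReceived`, and `process_batch` are hypothetical names introduced for illustration.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, List


@dataclass
class FakeMessage:
    """Stand-in for PubsubMessage (payload only)."""
    data: bytes


@dataclass
class FakeReceived:
    """Stand-in for ReceivedMessage."""
    ack_id: str
    message: FakeMessage


def process_batch(
    received: Iterable[FakeReceived],
    handler: Callable[[FakeMessage], None],
) -> List[str]:
    """Run handler on each message and return the ack_ids that succeeded."""
    ack_ids = []
    for item in received:
        try:
            handler(item.message)
        except Exception:
            continue  # leave unacknowledged so the message can be redelivered
        ack_ids.append(item.ack_id)
    return ack_ids


def handler(message: FakeMessage) -> None:
    if message.data == b"bad":
        raise ValueError("cannot process")


batch = [FakeReceived("a1", FakeMessage(b"ok")), FakeReceived("a2", FakeMessage(b"bad"))]
to_ack = process_batch(batch, handler)
```

The resulting list is what would go into the `ack_ids` field of an `AcknowledgeRequest`.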
class StreamingPullRequest:
"""
Request for streaming pull operation.
Attributes:
- subscription: Subscription name
- ack_ids: Acknowledgment IDs to acknowledge
- modify_deadline_seconds: New deadlines, in seconds, for the IDs in modify_deadline_ack_ids
- modify_deadline_ack_ids: Acknowledgment IDs whose deadlines should be modified
- stream_ack_deadline_seconds: Stream acknowledgment deadline
- client_id: Client identifier
- max_extension_period: Maximum lease extension period
"""
subscription: str
ack_ids: Sequence[str]
modify_deadline_seconds: Sequence[int]
modify_deadline_ack_ids: Sequence[str]
stream_ack_deadline_seconds: int
client_id: str
max_extension_period: Duration
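The `modify_deadline_seconds` and `modify_deadline_ack_ids` fields are parallel lists. The sketch below assumes that the deadline at index i applies to the acknowledgment ID at index i and that both lists must have the same length; `build_deadline_fields` is a hypothetical helper, not a library function.

```python
from typing import Dict, List, Tuple


def build_deadline_fields(new_deadlines: Dict[str, int]) -> Tuple[List[str], List[int]]:
    """Turn an {ack_id: seconds} mapping into two index-aligned lists."""
    ack_ids = list(new_deadlines)
    seconds = [new_deadlines[ack_id] for ack_id in ack_ids]
    return ack_ids, seconds


modify_ids, modify_seconds = build_deadline_fields({"ack-1": 30, "ack-2": 60})
```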
class StreamingPullResponse:
"""
Response from streaming pull operation.
Attributes:
- received_messages: Messages received
- acknowledge_confirmation: Acknowledgment confirmations
- modify_ack_deadline_confirmation: Deadline modification confirmations
- subscription_properties: Subscription properties
"""
received_messages: Sequence[ReceivedMessage]
acknowledge_confirmation: AcknowledgeConfirmation
modify_ack_deadline_confirmation: ModifyAckDeadlineConfirmation
subscription_properties: StreamingPullResponse.SubscriptionProperties

Types for configuring Pub/Sub behavior.
class PushConfig:
"""
Configuration for push delivery to HTTP endpoints.
Attributes:
- push_endpoint: URL to push messages to
- attributes: Endpoint configuration attributes
- oidc_token: OIDC token configuration
- pubsub_wrapper: Pub/Sub wrapper format
- no_wrapper: No wrapper configuration
"""
push_endpoint: str
attributes: MutableMapping[str, str]
oidc_token: PushConfig.OidcToken
pubsub_wrapper: PushConfig.PubsubWrapper
no_wrapper: PushConfig.NoWrapper
class BigQueryConfig:
"""
Configuration for BigQuery delivery.
Attributes:
- table: BigQuery table name
- use_topic_schema: Whether to use topic schema
- write_metadata: Whether to write metadata
- drop_unknown_fields: Whether to drop unknown fields
- state: Current configuration state
- use_table_schema: Whether to use table schema
- service_account_email: Service account for BigQuery access
"""
table: str
use_topic_schema: bool
write_metadata: bool
drop_unknown_fields: bool
state: BigQueryConfig.State
use_table_schema: bool
service_account_email: str
class CloudStorageConfig:
"""
Configuration for Cloud Storage delivery.
Attributes:
- bucket: Cloud Storage bucket name
- filename_prefix: Prefix for generated filenames
- filename_suffix: Suffix for generated filenames
- filename_datetime_format: Datetime format for filenames
- text_config: Text format configuration
- avro_config: Avro format configuration
- max_duration: Maximum duration before file write
- max_bytes: Maximum bytes before file write
- max_messages: Maximum messages before file write
- state: Current configuration state
- service_account_email: Service account for Cloud Storage access
"""
bucket: str
filename_prefix: str
filename_suffix: str
filename_datetime_format: str
text_config: CloudStorageConfig.TextConfig
avro_config: CloudStorageConfig.AvroConfig
max_duration: Duration
max_bytes: int
max_messages: int
state: CloudStorageConfig.State
service_account_email: str
class DeadLetterPolicy:
"""
Dead letter queue policy for failed messages.
Attributes:
- dead_letter_topic: Topic to send failed messages to
- max_delivery_attempts: Maximum delivery attempts before dead lettering
"""
dead_letter_topic: str
max_delivery_attempts: int
class RetryPolicy:
"""
Retry policy for message delivery.
Attributes:
- minimum_backoff: Minimum backoff duration
- maximum_backoff: Maximum backoff duration
"""
minimum_backoff: Duration
maximum_backoff: Duration
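To give a feel for how `minimum_backoff` and `maximum_backoff` bound redelivery delays, here is a rough sketch that assumes a simple doubling schedule clamped between the two values. The exact schedule the service uses is not specified in this document, so the formula and the function name `backoff_seconds` are illustrative assumptions only.

```python
def backoff_seconds(attempt: int, minimum: float = 10.0, maximum: float = 300.0) -> float:
    """Return an assumed delay before the given (1-based) redelivery attempt."""
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    # Double the delay each attempt, never exceeding the maximum.
    return min(maximum, minimum * 2 ** (attempt - 1))


delays = [backoff_seconds(n) for n in range(1, 8)]
```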
class ExpirationPolicy:
"""
Subscription expiration policy.
Attributes:
- ttl: Time-to-live for subscription
"""
ttl: Duration
class MessageStoragePolicy:
"""
Policy for message storage location.
Attributes:
- allowed_persistence_regions: Allowed storage regions
- enforce_in_transit: Whether to enforce in-transit requirements
"""
allowed_persistence_regions: Sequence[str]
enforce_in_transit: bool
class SchemaSettings:
"""
Schema validation settings.
Attributes:
- schema: Schema resource name
- encoding: Message encoding (JSON or BINARY)
- first_revision_id: First revision ID to validate against
- last_revision_id: Last revision ID to validate against
"""
schema: str
encoding: Encoding
first_revision_id: str
last_revision_id: str

Types for listing resources and their responses.
class ListTopicsRequest:
"""
Request to list topics.
Attributes:
- project: Project to list topics from
- page_size: Maximum number of topics to return
- page_token: Token for pagination
"""
project: str
page_size: int
page_token: str
class ListTopicsResponse:
"""
Response from list topics operation.
Attributes:
- topics: Topics in the project
- next_page_token: Token for next page
"""
topics: Sequence[Topic]
next_page_token: str
class ListSubscriptionsRequest:
"""
Request to list subscriptions.
Attributes:
- project: Project to list subscriptions from
- page_size: Maximum number of subscriptions to return
- page_token: Token for pagination
"""
project: str
page_size: int
page_token: str
class ListSubscriptionsResponse:
"""
Response from list subscriptions operation.
Attributes:
- subscriptions: Subscriptions in the project
- next_page_token: Token for next page
"""
subscriptions: Sequence[Subscription]
next_page_token: str
class ListSnapshotsRequest:
"""
Request to list snapshots.
Attributes:
- project: Project to list snapshots from
- page_size: Maximum number of snapshots to return
- page_token: Token for pagination
"""
project: str
page_size: int
page_token: str
class ListSnapshotsResponse:
"""
Response from list snapshots operation.
Attributes:
- snapshots: Snapshots in the project
- next_page_token: Token for next page
"""
snapshots: Sequence[Snapshot]
next_page_token: str

Types for creating, updating, and deleting resources.
class CreateTopicRequest:
"""
Request to create a topic.
Attributes:
- name: Topic name
- topic: Topic configuration
"""
name: str
topic: Topic
class UpdateTopicRequest:
"""
Request to update a topic.
Attributes:
- topic: Updated topic configuration
- update_mask: Fields to update
"""
topic: Topic
update_mask: FieldMask
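An update request carries both the full resource and an `update_mask`; only the fields named in the mask are meant to change. The sketch below models that idea on plain dictionaries with top-level paths only. It is an illustration of the concept under those assumptions, not the service's implementation, and `apply_field_mask` is a hypothetical name.

```python
from typing import Any, Dict, List


def apply_field_mask(
    current: Dict[str, Any],
    updated: Dict[str, Any],
    paths: List[str],
) -> Dict[str, Any]:
    """Copy only the masked top-level fields from updated onto current."""
    result = dict(current)
    for path in paths:
        if path in updated:
            result[path] = updated[path]
        else:
            # Assumption: a masked field missing from the update is cleared.
            result.pop(path, None)
    return result


current = {"labels": {"env": "dev"}, "kms_key_name": "key-1"}
updated = {"labels": {"env": "prod"}, "kms_key_name": "key-2"}
merged = apply_field_mask(current, updated, ["labels"])
```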
class DeleteTopicRequest:
"""
Request to delete a topic.
Attributes:
- topic: Topic name to delete
"""
topic: str
class CreateSubscriptionRequest:
"""
Request to create a subscription.
Attributes:
- name: Subscription name
- subscription: Subscription configuration
"""
name: str
subscription: Subscription
class UpdateSubscriptionRequest:
"""
Request to update a subscription.
Attributes:
- subscription: Updated subscription configuration
- update_mask: Fields to update
"""
subscription: Subscription
update_mask: FieldMask
class DeleteSubscriptionRequest:
"""
Request to delete a subscription.
Attributes:
- subscription: Subscription name to delete
"""
subscription: str
class DetachSubscriptionRequest:
"""
Request to detach a subscription from its topic.
Attributes:
- subscription: Subscription name to detach
"""
subscription: str
class DetachSubscriptionResponse:
"""
Response from detach subscription operation.
"""
pass

Types for schema service operations.
class Schema:
"""
A schema resource for message validation.
Attributes:
- name: Schema resource name
- type: Schema type (AVRO or PROTOCOL_BUFFER)
- definition: Schema definition string
- revision_id: Current revision ID
- revision_create_time: When revision was created
"""
name: str
type: Schema.Type
definition: str
revision_id: str
revision_create_time: Timestamp
class CreateSchemaRequest:
"""
Request to create a schema.
Attributes:
- parent: Parent project path
- schema: Schema definition
- schema_id: ID for the new schema
"""
parent: str
schema: Schema
schema_id: str
class ValidateSchemaRequest:
"""
Request to validate a schema.
Attributes:
- parent: Parent project path
- schema: Schema to validate
"""
parent: str
schema: Schema
class ValidateSchemaResponse:
"""
Response from schema validation.
"""
pass
class ValidateMessageRequest:
"""
Request to validate a message against a schema.
Attributes:
- parent: Parent project path
- name: Schema name (alternative to schema)
- schema: Schema definition (alternative to name)
- message: Message to validate
- encoding: Message encoding
"""
parent: str
name: str
schema: Schema
message: bytes
encoding: Encoding
class ValidateMessageResponse:
"""
Response from message validation.
"""
pass

Types for acknowledgment operations and their responses.
class AcknowledgeRequest:
"""
Request for acknowledging messages.
Attributes:
- subscription: Subscription name
- ack_ids: List of acknowledgment IDs to acknowledge
"""
subscription: str
ack_ids: Sequence[str]
class ModifyAckDeadlineRequest:
"""
Request for modifying acknowledgment deadlines.
Attributes:
- subscription: Subscription name
- ack_ids: List of acknowledgment IDs to modify
- ack_deadline_seconds: New acknowledgment deadline in seconds
"""
subscription: str
ack_ids: Sequence[str]
ack_deadline_seconds: int
class AcknowledgeConfirmation:
"""
Confirmation response for acknowledgment operations.
Attributes:
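- ack_ids: Acknowledgment IDs that were successfully acknowledged
- invalid_ack_ids: Acknowledgment IDs that were malformed or whose deadline has expired
- unordered_ack_ids: Acknowledgment IDs that were out of order
- temporary_failed_ack_ids: Acknowledgment IDs that failed with temporary issues
"""
ack_ids: Sequence[str]
invalid_ack_ids: Sequence[str]
unordered_ack_ids: Sequence[str]
temporary_failed_ack_ids: Sequence[str]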
class ModifyAckDeadlineConfirmation:
"""
Confirmation response for deadline modification operations.
Attributes:
- ack_ids: Acknowledgment IDs whose deadlines were successfully modified
- invalid_ack_ids: Acknowledgment IDs that were malformed or whose deadline has expired
- temporary_failed_ack_ids: Acknowledgment IDs that failed with temporary issues
"""
ack_ids: Sequence[str]
invalid_ack_ids: Sequence[str]
temporary_failed_ack_ids: Sequence[str]

Types for seeking subscriptions to specific points in time.
class SeekRequest:
"""
Request to seek a subscription.
Attributes:
- subscription: Subscription name to seek
- time: Seek to a specific timestamp
- snapshot: Seek to a specific snapshot
"""
subscription: str
time: Timestamp
snapshot: str
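The `time` and `snapshot` fields are alternatives: a seek targets either a timestamp or a snapshot, not both. The sketch below validates that choice before a request would be built; it uses a plain dictionary rather than the real type, and `build_seek_target` is a hypothetical helper.

```python
from typing import Any, Dict, Optional


def build_seek_target(
    time: Optional[Any] = None,
    snapshot: Optional[str] = None,
) -> Dict[str, Any]:
    """Return the single seek target, rejecting zero or two targets."""
    if (time is None) == (snapshot is None):
        raise ValueError("provide exactly one of time or snapshot")
    if time is not None:
        return {"time": time}
    return {"snapshot": snapshot}


target = build_seek_target(snapshot="projects/my-project/snapshots/my-snapshot")
```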
class SeekResponse:
"""
Response from seek operation.
"""
pass

Supplementary request types for various operations.
class GetTopicRequest:
"""
Request to get a topic.
Attributes:
- topic: Topic name to retrieve
"""
topic: str
class GetSubscriptionRequest:
"""
Request to get a subscription.
Attributes:
- subscription: Subscription name to retrieve
"""
subscription: str
class GetSnapshotRequest:
"""
Request to get a snapshot.
Attributes:
- snapshot: Snapshot name to retrieve
"""
snapshot: str
class ListTopicSubscriptionsRequest:
"""
Request to list subscriptions for a topic.
Attributes:
- topic: Topic name
- page_size: Maximum number of subscriptions to return
- page_token: Token for pagination
"""
topic: str
page_size: int
page_token: str
class ListTopicSubscriptionsResponse:
"""
Response from listing topic subscriptions.
Attributes:
- subscriptions: List of subscription names
- next_page_token: Token for next page
"""
subscriptions: Sequence[str]
next_page_token: str
class ListTopicSnapshotsRequest:
"""
Request to list snapshots for a topic.
Attributes:
- topic: Topic name
- page_size: Maximum number of snapshots to return
- page_token: Token for pagination
"""
topic: str
page_size: int
page_token: str
class ListTopicSnapshotsResponse:
"""
Response from listing topic snapshots.
Attributes:
- snapshots: List of snapshot names
- next_page_token: Token for next page
"""
snapshots: Sequence[str]
next_page_token: str
class CreateSnapshotRequest:
"""
Request to create a snapshot.
Attributes:
- name: Snapshot name
- subscription: Subscription to create snapshot from
- labels: User-defined labels
"""
name: str
subscription: str
labels: MutableMapping[str, str]
class UpdateSnapshotRequest:
"""
Request to update a snapshot.
Attributes:
- snapshot: Updated snapshot configuration
- update_mask: Fields to update
"""
snapshot: Snapshot
update_mask: FieldMask

Types for field masks and common protobuf types.
class FieldMask:
"""
Field mask for specifying which fields to update.
Attributes:
- paths: List of field paths to update
"""
paths: Sequence[str]
class Timestamp:
"""
Protobuf timestamp representation.
Attributes:
- seconds: Seconds since Unix epoch
- nanos: Nanoseconds within the second
"""
seconds: int
nanos: int
class Duration:
"""
Protobuf duration representation.
Attributes:
- seconds: Duration in seconds
- nanos: Nanoseconds within the second
"""
seconds: int
nanos: int

Enumeration types used throughout the API.
class Encoding(Enum):
"""
Message encoding types.
"""
JSON = "JSON"
BINARY = "BINARY"
class SchemaView(Enum):
"""
Schema view types for retrieval.
"""
BASIC = "BASIC" # Name and type only
FULL = "FULL" # Complete schema definition
class State(Enum):  # Nested in Subscription; referenced as Subscription.State
"""
Subscription states.
"""
ACTIVE = "ACTIVE"
RESOURCE_ERROR = "RESOURCE_ERROR"

from google.cloud.pubsub_v1 import types

# Create a PubsubMessage
message = types.PubsubMessage(
    data=b"Hello, World!",
    attributes={
        "event_type": "user_action",
        "user_id": "12345",
    },
    ordering_key="user-12345",
)

# Create a publish request
publish_request = types.PublishRequest(
    topic="projects/my-project/topics/my-topic",
    messages=[message],
)

from google.cloud.pubsub_v1 import types
from google.protobuf.duration_pb2 import Duration

# Create topic with schema settings
schema_settings = types.SchemaSettings(
    schema="projects/my-project/schemas/user-events",
    encoding=types.Encoding.JSON,
)
topic = types.Topic(
    name="projects/my-project/topics/user-events",
    schema_settings=schema_settings,
    labels={"environment": "production"},
)

# Create subscription with dead letter policy
dead_letter_policy = types.DeadLetterPolicy(
    dead_letter_topic="projects/my-project/topics/user-events-dlq",
    max_delivery_attempts=5,
)
retry_policy = types.RetryPolicy(
    minimum_backoff=Duration(seconds=10),
    maximum_backoff=Duration(seconds=300),
)
subscription = types.Subscription(
    name="projects/my-project/subscriptions/user-events-sub",
    topic="projects/my-project/topics/user-events",
    ack_deadline_seconds=60,
    dead_letter_policy=dead_letter_policy,
    retry_policy=retry_policy,
    enable_message_ordering=True,
    enable_exactly_once_delivery=True,
)

# Configure push endpoint
push_config = types.PushConfig(
    push_endpoint="https://my-app.example.com/webhook",
    attributes={
        "x-goog-version": "v1",
    },
)

# Subscription with push delivery
push_subscription = types.Subscription(
    name="projects/my-project/subscriptions/push-sub",
    topic="projects/my-project/topics/my-topic",
    push_config=push_config,
)

# Configure BigQuery delivery
bigquery_config = types.BigQueryConfig(
    # Table name in the form {projectId}.{datasetId}.{tableId}
    table="my-project.my_dataset.my_table",
    use_topic_schema=True,
    write_metadata=True,
    drop_unknown_fields=False,
)
bq_subscription = types.Subscription(
    name="projects/my-project/subscriptions/bq-sub",
    topic="projects/my-project/topics/my-topic",
    bigquery_config=bigquery_config,
)

Install with Tessl CLI:
npx tessl i tessl/pypi-google-cloud-pubsub