Google Cloud Pub/Sub API client library for reliable, many-to-many, asynchronous messaging between applications
—
The PublisherClient provides high-level functionality for publishing messages to Google Cloud Pub/Sub topics. It handles automatic batching, flow control, message ordering, retry logic, and OpenTelemetry integration.
Create and configure a PublisherClient with custom batching and flow control settings.
class PublisherClient:
def __init__(
self,
batch_settings: Optional[BatchSettings] = None,
publisher_options: Optional[PublisherOptions] = None,
**kwargs
):
"""
Initialize the publisher client.
Parameters:
- batch_settings: Settings for message batching behavior
- publisher_options: Options for publisher client behavior
- **kwargs: Additional arguments passed to underlying GAPIC client
"""
@classmethod
def from_service_account_file(
cls,
filename: str,
**kwargs
) -> "PublisherClient":
"""
Create client from service account file.
Parameters:
- filename: Path to service account JSON file
- **kwargs: Additional arguments for client initialization
Returns:
PublisherClient instance
"""Publish messages to topics with support for attributes, ordering keys, and futures for result handling.
def publish(
self,
topic: str,
data: bytes,
ordering_key: str = "",
retry: OptionalRetry = DEFAULT,
timeout: OptionalTimeout = DEFAULT,
**attrs: Union[bytes, str]
) -> Future:
"""
Publish a message to a topic.
Parameters:
- topic: Full topic path (e.g., "projects/my-project/topics/my-topic")
- data: Message payload as bytes
- ordering_key: Optional key for message ordering (default: "")
- retry: Retry configuration for the publish operation
- timeout: Timeout configuration for the publish operation
- **attrs: Message attributes as keyword arguments (values can be bytes or str)
Returns:
Future that resolves to message ID string
"""Resume publishing for an ordering key after an error has occurred.
def resume_publish(self, topic: str, ordering_key: str) -> None:
"""
Resume publishing for ordering key after error.
Parameters:
- topic: Full topic path
- ordering_key: Ordering key to resume
"""Create, retrieve, update, and delete topics using the underlying GAPIC client methods.
def create_topic(
self,
request: Optional[CreateTopicRequest] = None,
*,
name: Optional[str] = None,
**kwargs
) -> Topic:
"""
Create a new topic.
Parameters:
- request: The request object for creating a topic
- name: Topic name (e.g., "projects/my-project/topics/my-topic")
- **kwargs: Additional keyword arguments
Returns:
Created Topic object
"""
def get_topic(
self,
request: Optional[GetTopicRequest] = None,
*,
topic: Optional[str] = None,
**kwargs
) -> Topic:
"""
Get a topic.
Parameters:
- request: The request object for getting a topic
- topic: Topic name to retrieve
- **kwargs: Additional keyword arguments
Returns:
Topic object
"""
def list_topics(
self,
request: Optional[ListTopicsRequest] = None,
*,
project: Optional[str] = None,
**kwargs
) -> ListTopicsResponse:
"""
List topics in a project.
Parameters:
- request: The request object for listing topics
- project: Project path (e.g., "projects/my-project")
- **kwargs: Additional keyword arguments
Returns:
ListTopicsResponse with topics
"""
def list_topic_subscriptions(
self,
request: Optional[ListTopicSubscriptionsRequest] = None,
*,
topic: Optional[str] = None,
**kwargs
) -> ListTopicSubscriptionsResponse:
"""
List subscriptions attached to a topic.
Parameters:
- request: The request object for listing topic subscriptions
- topic: Topic name
- **kwargs: Additional keyword arguments
Returns:
ListTopicSubscriptionsResponse with subscription names
"""
def list_topic_snapshots(
self,
request: Optional[ListTopicSnapshotsRequest] = None,
*,
topic: Optional[str] = None,
**kwargs
) -> ListTopicSnapshotsResponse:
"""
List snapshots for a topic.
Parameters:
- request: The request object for listing topic snapshots
- topic: Topic name
- **kwargs: Additional keyword arguments
Returns:
ListTopicSnapshotsResponse with snapshot names
"""
def update_topic(
self,
request: Optional[UpdateTopicRequest] = None,
*,
topic: Optional[Topic] = None,
update_mask: Optional[FieldMask] = None,
**kwargs
) -> Topic:
"""
Update a topic.
Parameters:
- request: The request object for updating a topic
- topic: Updated topic configuration
- update_mask: Fields to update
- **kwargs: Additional keyword arguments
Returns:
Updated Topic object
"""
def delete_topic(
self,
request: Optional[DeleteTopicRequest] = None,
*,
topic: Optional[str] = None,
**kwargs
) -> None:
"""
Delete a topic.
Parameters:
- request: The request object for deleting a topic
- topic: Topic name to delete
- **kwargs: Additional keyword arguments
"""Utility methods for constructing and parsing resource paths.
@staticmethod
def topic_path(project: str, topic: str) -> str:
"""
Construct a topic path from project ID and topic name.
Parameters:
- project: Project ID
- topic: Topic name
Returns:
Full topic path string
"""
@staticmethod
def subscription_path(project: str, subscription: str) -> str:
"""
Construct a subscription path from project ID and subscription name.
Parameters:
- project: Project ID
- subscription: Subscription name
Returns:
Full subscription path string
"""
@staticmethod
def schema_path(project: str, schema: str) -> str:
"""
Construct a schema path from project ID and schema name.
Parameters:
- project: Project ID
- schema: Schema name
Returns:
Full schema path string
"""
@staticmethod
def parse_topic_path(path: str) -> Dict[str, str]:
"""
Parse a topic path into its components.
Parameters:
- path: Topic path string
Returns:
Dictionary with 'project' and 'topic' keys
"""
@staticmethod
def parse_subscription_path(path: str) -> Dict[str, str]:
"""
Parse a subscription path into its components.
Parameters:
- path: Subscription path string
Returns:
Dictionary with 'project' and 'subscription' keys
"""
@staticmethod
def parse_schema_path(path: str) -> Dict[str, str]:
"""
Parse a schema path into its components.
Parameters:
- path: Schema path string
Returns:
Dictionary with 'project' and 'schema' keys
"""Control client lifecycle and access underlying components.
def stop(self) -> None:
"""
Stop the publisher client and wait for all batches to complete.
"""
@property
def target(self) -> str:
"""
Get the target endpoint for the client.
Returns:
Target endpoint URL
"""
@property
def api(self):
"""
Get the underlying GAPIC publisher client.
Returns:
GAPIC PublisherClient instance
"""
@property
def open_telemetry_enabled(self) -> bool:
"""
Check if OpenTelemetry tracing is enabled.
Returns:
True if OpenTelemetry is enabled
"""Publisher operations return Future objects for asynchronous result handling.
class Future:
def result(self, timeout: Optional[float] = None) -> str:
"""
Get the message ID or raise an exception.
Parameters:
- timeout: Number of seconds to wait before timeout
Returns:
Message ID string
Raises:
TimeoutError: If operation times out
Exception: For other errors in publish operation
"""
def add_done_callback(
self,
callback: Callable[["Future"], None]
) -> None:
"""
Add callback to be called when future completes.
Parameters:
- callback: Function to call with future as argument
"""
def cancel(self) -> bool:
"""
Attempt to cancel the operation.
Returns:
Always False (Pub/Sub operations cannot be cancelled)
"""
def cancelled(self) -> bool:
"""
Check if operation was cancelled.
Returns:
Always False (Pub/Sub operations cannot be cancelled)
"""from google.cloud import pubsub_v1
# Create publisher client
publisher = pubsub_v1.PublisherClient()
# Publish a message
topic_path = publisher.topic_path("my-project", "my-topic")
message_data = b"Hello, World!"
future = publisher.publish(topic_path, message_data)
# Get message ID
try:
message_id = future.result(timeout=30)
print(f"Published message with ID: {message_id}")
except Exception as e:
print(f"Failed to publish: {e}")# Publish message with attributes
future = publisher.publish(
topic_path,
b"Message with metadata",
event_type="user_action",
user_id="12345",
timestamp="2024-01-01T00:00:00Z"
)from google.cloud.pubsub_v1 import types
# Configure for message ordering
publisher_options = types.PublisherOptions(
enable_message_ordering=True
)
publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options)
# Publish ordered messages
for i in range(5):
future = publisher.publish(
topic_path,
f"Message {i}".encode(),
ordering_key="user-123"
)# Configure custom batching
batch_settings = types.BatchSettings(
max_bytes=500000, # 500KB
max_latency=0.05, # 50ms
max_messages=50
)
publisher = pubsub_v1.PublisherClient(batch_settings=batch_settings)def publish_callback(future):
try:
message_id = future.result()
print(f"Published: {message_id}")
except Exception as e:
print(f"Publish failed: {e}")
# Publish with callback
future = publisher.publish(topic_path, b"Async message")
future.add_done_callback(publish_callback)Install with Tessl CLI
npx tessl i tessl/pypi-google-cloud-pubsub