Google Cloud Pub/Sub API client library for reliable, many-to-many, asynchronous messaging between applications
npx @tessl/cli install tessl/pypi-google-cloud-pubsub@2.31.0
Google Cloud Pub/Sub is a fully-managed real-time messaging service that enables reliable, many-to-many, asynchronous messaging between applications. The Python client library provides comprehensive publisher and subscriber functionality with automatic retry logic, flow control, message ordering capabilities, and authentication integration with Google Cloud identity services.
pip install google-cloud-pubsub
from google.cloud import pubsub_v1
Import specific components:
from google.cloud.pubsub_v1 import PublisherClient, SubscriberClient, types
from google.cloud import pubsub_v1
# Create clients
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
# Publishing messages
topic_path = publisher.topic_path("my-project", "my-topic")
message_data = b"Hello, World!"
future = publisher.publish(topic_path, message_data)
message_id = future.result()
# Subscribing to messages
subscription_path = subscriber.subscription_path("my-project", "my-subscription")
def callback(message):
    print(f"Received: {message.data.decode('utf-8')}")
    message.ack()
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
Google Cloud Pub/Sub follows a publisher-subscriber messaging pattern with the following key components:
The library provides both high-level clients with advanced features (batching, flow control, futures) and low-level GAPIC clients for direct API access.
High-level client for publishing messages to Pub/Sub topics with automatic batching, flow control, message ordering, and OpenTelemetry integration.
class PublisherClient:
def __init__(self, batch_settings=None, publisher_options=None, **kwargs): ...
def publish(self, topic, data, ordering_key=None, **attrs): ...
def resume_publish(self, topic, ordering_key): ...
def stop(self): ...
High-level client for subscribing to Pub/Sub subscriptions with automatic message handling, flow control, and OpenTelemetry integration.
class SubscriberClient:
def __init__(self, flow_control=None, subscriber_options=None, **kwargs): ...
def subscribe(self, subscription, callback, flow_control=None, scheduler=None): ...
def close(self): ...
Client for managing Pub/Sub schemas including creation, validation, and evolution of message schemas.
class SchemaServiceClient:
def create_schema(self, request=None, **kwargs): ...
def get_schema(self, request=None, **kwargs): ...
def list_schemas(self, request=None, **kwargs): ...
def validate_schema(self, request=None, **kwargs): ...
def validate_message(self, request=None, **kwargs): ...
Message objects and utilities for handling received messages, including acknowledgment, negative acknowledgment, and deadline modification.
class Message:
def ack(self): ...
def nack(self): ...
def modify_ack_deadline(self, seconds): ...
def ack_with_response(self): ...
def nack_with_response(self): ...
Comprehensive configuration options for client behavior including batching, flow control, publisher options, and subscriber options.
class BatchSettings(NamedTuple):
max_bytes: int = 1000000
max_latency: float = 0.01
max_messages: int = 100
class PublisherOptions(NamedTuple):
enable_message_ordering: bool = False
flow_control: PublishFlowControl = PublishFlowControl()
retry: OptionalRetry = ...
timeout: OptionalTimeout = ...
Complete set of protobuf message types for Pub/Sub operations including topics, subscriptions, messages, and requests/responses.
class PubsubMessage: ...
class Topic: ...
class Subscription: ...
class PublishRequest: ...
class PublishResponse: ...
Comprehensive exception types for handling errors in publishing and subscribing operations, including flow control errors and acknowledgment failures.
# Publisher exceptions
class PublishError(Exception): ...
class MessageTooLargeError(PublishError): ...
class PublishToPausedOrderingKeyException(PublishError): ...
class FlowControlLimitError(PublishError): ...
# Subscriber exceptions
class AcknowledgeError(Exception): ...
class AcknowledgeStatus(Enum): ...
class TimeoutError(Exception): ...
Scheduler classes and utility functions for controlling message processing behavior and resource management in subscriber operations.
class ThreadScheduler:
def __init__(self, executor: Optional[ThreadPoolExecutor] = None): ...
def schedule(self, callback: Callable, *args, **kwargs) -> Future: ...
def shutdown(self, wait: bool = True) -> None: ...
class StreamingPullFuture:
def cancel(self) -> bool: ...
def result(self, timeout: Optional[float] = None) -> None: ...
from typing import Callable, Optional, Union, Any, Sequence
from concurrent.futures import Future as ConcurrentFuture
from google.api_core import retry, gapic_v1
# Core type aliases
MessageCallback = Callable[[Message], Any]
OptionalRetry = Union[retry.Retry, object]
OptionalTimeout = Union[float, object]
DEFAULT = gapic_v1.method.DEFAULT
# Future type for publisher operations
class Future(ConcurrentFuture):
def result(self, timeout: Optional[float] = None) -> str: ...
def add_done_callback(self, callback: Callable[["Future"], None]) -> None: ...
# Scheduler type for subscriber
class ThreadScheduler:
"""Custom scheduler for message processing in subscriber."""
pass
# Streaming pull future for subscriber operations
class StreamingPullFuture:
def cancel(self) -> bool: ...
def cancelled(self) -> bool: ...
def running(self) -> bool: ...
def result(self, timeout: Optional[float] = None) -> None: ...