Google Cloud Pub/Sub API client library for reliable, many-to-many, asynchronous messaging between applications
—
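The SubscriberClient provides high-level functionality for receiving messages from Google Cloud Pub/Sub subscriptions. It handles streaming pull, automatic lease management (extending acknowledgment deadlines while a callback is still processing a message), flow control, callback scheduling, and OpenTelemetry integration. Messages are not acknowledged automatically: each callback must call `message.ack()` or `message.nack()`, as the examples below do.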
Create and configure a SubscriberClient with custom flow control and subscriber options.
class SubscriberClient:
def __init__(
self,
flow_control: Optional[FlowControl] = None,
subscriber_options: Optional[SubscriberOptions] = None,
**kwargs
):
"""
Initialize the subscriber client.
Parameters:
- flow_control: Settings for message flow control
- subscriber_options: Options for subscriber client behavior
- **kwargs: Additional arguments passed to underlying GAPIC client
"""
@classmethod
def from_service_account_file(
cls,
filename: str,
**kwargs
) -> "SubscriberClient":
"""
Create client from service account file.
Parameters:
- filename: Path to service account JSON file
- **kwargs: Additional arguments for client initialization
Returns:
SubscriberClient instance
"""
Subscribe to messages from subscriptions with callback-based processing.
def subscribe(
self,
subscription: str,
callback: Callable[[Message], Any],
flow_control: Union[FlowControl, Sequence] = (),
scheduler: Optional[ThreadScheduler] = None,
use_legacy_flow_control: bool = False,
await_callbacks_on_shutdown: bool = False
) -> StreamingPullFuture:
"""
Subscribe to messages from a subscription.
Parameters:
- subscription: Full subscription path (e.g., "projects/my-project/subscriptions/my-sub")
- callback: Function to process received messages
- flow_control: Flow control settings or legacy sequence format
- scheduler: Custom scheduler for message processing
- use_legacy_flow_control: Whether to use legacy flow control behavior
- await_callbacks_on_shutdown: Whether to wait for callbacks on shutdown
Returns:
StreamingPullFuture that can be used to control the subscription
"""
Create, retrieve, update, and delete subscriptions using the underlying GAPIC client methods.
def create_subscription(
self,
request: Optional[CreateSubscriptionRequest] = None,
*,
name: Optional[str] = None,
topic: Optional[str] = None,
**kwargs
) -> Subscription:
"""
Create a new subscription.
Parameters:
- request: The request object for creating a subscription
- name: Subscription name (e.g., "projects/my-project/subscriptions/my-sub")
- topic: Topic name to subscribe to
- **kwargs: Additional keyword arguments
Returns:
Created Subscription object
"""
def get_subscription(
self,
request: Optional[GetSubscriptionRequest] = None,
*,
subscription: Optional[str] = None,
**kwargs
) -> Subscription:
"""
Get a subscription.
Parameters:
- request: The request object for getting a subscription
- subscription: Subscription name to retrieve
- **kwargs: Additional keyword arguments
Returns:
Subscription object
"""
def update_subscription(
self,
request: Optional[UpdateSubscriptionRequest] = None,
*,
subscription: Optional[Subscription] = None,
update_mask: Optional[FieldMask] = None,
**kwargs
) -> Subscription:
"""
Update a subscription.
Parameters:
- request: The request object for updating a subscription
- subscription: Updated subscription configuration
- update_mask: Fields to update
- **kwargs: Additional keyword arguments
Returns:
Updated Subscription object
"""
def list_subscriptions(
self,
request: Optional[ListSubscriptionsRequest] = None,
*,
project: Optional[str] = None,
**kwargs
) -> ListSubscriptionsResponse:
"""
List subscriptions in a project.
Parameters:
- request: The request object for listing subscriptions
- project: Project path (e.g., "projects/my-project")
- **kwargs: Additional keyword arguments
Returns:
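Iterable of the project's subscriptions. The underlying GAPIC method returns a pager: iterate over the result to fetch every ListSubscriptionsResponse page, rather than reading only the first page.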
"""
def delete_subscription(
self,
request: Optional[DeleteSubscriptionRequest] = None,
*,
subscription: Optional[str] = None,
**kwargs
) -> None:
"""
Delete a subscription.
Parameters:
- request: The request object for deleting a subscription
- subscription: Subscription name to delete
- **kwargs: Additional keyword arguments
"""
Low-level message operations for acknowledgment and deadline modification.
def acknowledge(
self,
request: Optional[AcknowledgeRequest] = None,
*,
subscription: Optional[str] = None,
ack_ids: Optional[Sequence[str]] = None,
**kwargs
) -> None:
"""
Acknowledge messages by their acknowledgment IDs.
Parameters:
- request: The request object for acknowledging messages
- subscription: Subscription name
- ack_ids: List of acknowledgment IDs to acknowledge
- **kwargs: Additional keyword arguments
"""
def modify_ack_deadline(
self,
request: Optional[ModifyAckDeadlineRequest] = None,
*,
subscription: Optional[str] = None,
ack_ids: Optional[Sequence[str]] = None,
ack_deadline_seconds: Optional[int] = None,
**kwargs
) -> None:
"""
Modify acknowledgment deadline for messages.
Parameters:
- request: The request object for modifying acknowledgment deadlines
- subscription: Subscription name
- ack_ids: List of acknowledgment IDs to modify
- ack_deadline_seconds: New deadline in seconds
- **kwargs: Additional keyword arguments
"""
def pull(
self,
request: Optional[PullRequest] = None,
*,
subscription: Optional[str] = None,
max_messages: Optional[int] = None,
**kwargs
) -> PullResponse:
"""
Pull messages from a subscription synchronously.
Parameters:
- request: The request object for pulling messages
- subscription: Subscription name to pull from
- max_messages: Maximum number of messages to return
- **kwargs: Additional keyword arguments
Returns:
PullResponse with received messages
"""
def streaming_pull(
self,
requests: Iterator[StreamingPullRequest],
**kwargs
) -> Iterator[StreamingPullResponse]:
"""
Establish a streaming pull connection.
Parameters:
- requests: Iterator of streaming pull requests
- **kwargs: Additional keyword arguments
Returns:
Iterator of streaming pull responses
"""
Create and manage snapshots for seeking to specific points in time.
def create_snapshot(
self,
request: Optional[CreateSnapshotRequest] = None,
*,
name: Optional[str] = None,
subscription: Optional[str] = None,
**kwargs
) -> Snapshot:
"""
Create a snapshot of a subscription.
Parameters:
- request: The request object for creating a snapshot
- name: Snapshot name (e.g., "projects/my-project/snapshots/my-snapshot")
- subscription: Subscription to create snapshot from
- **kwargs: Additional keyword arguments
Returns:
Created Snapshot object
"""
def get_snapshot(
self,
request: Optional[GetSnapshotRequest] = None,
*,
snapshot: Optional[str] = None,
**kwargs
) -> Snapshot:
"""
Get a snapshot.
Parameters:
- request: The request object for getting a snapshot
- snapshot: Snapshot name to retrieve
- **kwargs: Additional keyword arguments
Returns:
Snapshot object
"""
def list_snapshots(
self,
request: Optional[ListSnapshotsRequest] = None,
*,
project: Optional[str] = None,
**kwargs
) -> ListSnapshotsResponse:
"""
List snapshots in a project.
Parameters:
- request: The request object for listing snapshots
- project: Project path (e.g., "projects/my-project")
- **kwargs: Additional keyword arguments
Returns:
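Iterable of the project's snapshots. The underlying GAPIC method returns a pager: iterate over the result to fetch every ListSnapshotsResponse page, rather than reading only the first page.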
"""
def update_snapshot(
self,
request: Optional[UpdateSnapshotRequest] = None,
*,
snapshot: Optional[Snapshot] = None,
update_mask: Optional[FieldMask] = None,
**kwargs
) -> Snapshot:
"""
Update a snapshot.
Parameters:
- request: The request object for updating a snapshot
- snapshot: Updated snapshot configuration
- update_mask: Fields to update
- **kwargs: Additional keyword arguments
Returns:
Updated Snapshot object
"""
def delete_snapshot(
self,
request: Optional[DeleteSnapshotRequest] = None,
*,
snapshot: Optional[str] = None,
**kwargs
) -> None:
"""
Delete a snapshot.
Parameters:
- request: The request object for deleting a snapshot
- snapshot: Snapshot name to delete
- **kwargs: Additional keyword arguments
"""
def seek(
self,
request: Optional[SeekRequest] = None,
*,
subscription: Optional[str] = None,
**kwargs
) -> SeekResponse:
"""
Seek a subscription to a specific snapshot or time.
Parameters:
- request: The request object for seeking
- subscription: Subscription name to seek
- **kwargs: Additional keyword arguments
Returns:
SeekResponse indicating seek result
"""
Utility methods for constructing and parsing resource paths.
@staticmethod
def subscription_path(project: str, subscription: str) -> str:
"""
Construct a subscription path from project ID and subscription name.
Parameters:
- project: Project ID
- subscription: Subscription name
Returns:
Full subscription path string
"""
@staticmethod
def snapshot_path(project: str, snapshot: str) -> str:
"""
Construct a snapshot path from project ID and snapshot name.
Parameters:
- project: Project ID
- snapshot: Snapshot name
Returns:
Full snapshot path string
"""
@staticmethod
def topic_path(project: str, topic: str) -> str:
"""
Construct a topic path from project ID and topic name.
Parameters:
- project: Project ID
- topic: Topic name
Returns:
Full topic path string
"""
@staticmethod
def parse_subscription_path(path: str) -> Dict[str, str]:
"""
Parse a subscription path into its components.
Parameters:
- path: Subscription path string
Returns:
Dictionary with 'project' and 'subscription' keys
"""
@staticmethod
def parse_snapshot_path(path: str) -> Dict[str, str]:
"""
Parse a snapshot path into its components.
Parameters:
- path: Snapshot path string
Returns:
Dictionary with 'project' and 'snapshot' keys
"""
@staticmethod
def parse_topic_path(path: str) -> Dict[str, str]:
"""
Parse a topic path into its components.
Parameters:
- path: Topic path string
Returns:
Dictionary with 'project' and 'topic' keys
"""
Control client lifecycle and access underlying components.
def close(self) -> None:
"""
Close the subscriber client and stop all subscriptions.
"""
@property
def target(self) -> str:
"""
Get the target endpoint for the client.
Returns:
Target endpoint URL
"""
@property
def api(self):
"""
Get the underlying GAPIC subscriber client.
Returns:
GAPIC SubscriberClient instance
"""
@property
def closed(self) -> bool:
"""
Check if the client is closed.
Returns:
True if client is closed
"""
@property
def open_telemetry_enabled(self) -> bool:
"""
Check if OpenTelemetry tracing is enabled.
Returns:
True if OpenTelemetry is enabled
"""
Use SubscriberClient as a context manager for automatic cleanup.
def __enter__(self) -> "SubscriberClient":
"""
Enter context manager.
Returns:
Self
"""
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"""
Exit context manager and close client.
"""
Control and monitor streaming pull operations.
class StreamingPullFuture:
def cancel(self) -> bool:
"""
Cancel the streaming pull operation.
Returns:
True if cancellation was successful
"""
def cancelled(self) -> bool:
"""
Check if the operation was cancelled.
Returns:
True if operation is cancelled
"""
def running(self) -> bool:
"""
Check if the operation is currently running.
Returns:
True if operation is running
"""
def result(self, timeout: Optional[float] = None) -> None:
"""
Wait for the streaming pull to complete.
Parameters:
- timeout: Maximum time to wait in seconds
Raises:
concurrent.futures.TimeoutError: If the timeout elapses before the streaming pull terminates
"""
from google.cloud import pubsub_v1
# Create subscriber client
subscriber = pubsub_v1.SubscriberClient()
def callback(message):
print(f"Received: {message.data.decode('utf-8')}")
print(f"Attributes: {message.attributes}")
message.ack()
# Subscribe to messages
subscription_path = subscriber.subscription_path("my-project", "my-subscription")
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}...")
# Keep the main thread running
try:
streaming_pull_future.result()
except KeyboardInterrupt:
streaming_pull_future.cancel()
subscriber.close()
from google.cloud.pubsub_v1 import types
# Configure flow control
flow_control = types.FlowControl(
max_messages=100, # Allow at most 100 outstanding (received but not yet processed) messages
max_bytes=10 * 1024 * 1024, # 10MB max outstanding bytes
max_lease_duration=600 # 10 minute max lease duration
)
subscriber = pubsub_v1.SubscriberClient(flow_control=flow_control)
def callback(message):
try:
# Process the message
data = message.data.decode('utf-8')
print(f"Processing: {data}")
# Simulate processing
process_message(data)
# Acknowledge successful processing
message.ack()
except Exception as e:
print(f"Error processing message: {e}")
# Negative acknowledge to retry later
message.nack()
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
from google.cloud import pubsub_v1
def callback(message):
print(f"Received: {message.data.decode('utf-8')}")
message.ack()
# Use subscriber as context manager
with pubsub_v1.SubscriberClient() as subscriber:
subscription_path = subscriber.subscription_path("my-project", "my-subscription")
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
try:
# Listen for messages for 30 seconds
streaming_pull_future.result(timeout=30)
except Exception:
streaming_pull_future.cancel()
# Client automatically closed when exiting context
import concurrent.futures
from google.cloud import pubsub_v1
subscriber = pubsub_v1.SubscriberClient()
def callback_a(message):
print(f"Subscription A: {message.data.decode('utf-8')}")
message.ack()
def callback_b(message):
print(f"Subscription B: {message.data.decode('utf-8')}")
message.ack()
# Subscribe to multiple subscriptions
sub_a_path = subscriber.subscription_path("my-project", "subscription-a")
sub_b_path = subscriber.subscription_path("my-project", "subscription-b")
future_a = subscriber.subscribe(sub_a_path, callback=callback_a)
future_b = subscriber.subscribe(sub_b_path, callback=callback_b)
# Wait for any subscription to complete or fail
futures = [future_a, future_b]
try:
concurrent.futures.wait(futures, timeout=300, return_when=concurrent.futures.FIRST_COMPLETED)
except KeyboardInterrupt:
for future in futures:
future.cancel()
finally:
subscriber.close()
def callback(message):
print(f"Processing: {message.data.decode('utf-8')}")
# Extend deadline if processing takes longer
message.modify_ack_deadline(60) # Reset the ack deadline to 60 seconds from now
try:
# Long processing operation
long_running_task(message.data)
message.ack()
except Exception as e:
print(f"Processing failed: {e}")
message.nack()
Install with Tessl CLI
npx tessl i tessl/pypi-google-cloud-pubsub