CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-google-cloud-pubsub

Google Cloud Pub/Sub API client library for reliable, many-to-many, asynchronous messaging between applications

Pending
Overview
Eval results
Files

subscriber.mddocs/

Subscriber Client

The SubscriberClient provides high-level functionality for subscribing to Google Cloud Pub/Sub subscriptions. It handles automatic message acknowledgment, flow control, scheduling, and OpenTelemetry integration.

Capabilities

Client Initialization

Create and configure a SubscriberClient with custom flow control and subscriber options.

class SubscriberClient:
    def __init__(
        self,
        flow_control: Optional[FlowControl] = None,
        subscriber_options: Optional[SubscriberOptions] = None,
        **kwargs
    ):
        """
        Initialize the subscriber client.

        Parameters:
        - flow_control: Settings for message flow control
        - subscriber_options: Options for subscriber client behavior
        - **kwargs: Additional arguments passed to underlying GAPIC client
        """

    @classmethod
    def from_service_account_file(
        cls,
        filename: str,
        **kwargs
    ) -> "SubscriberClient":
        """
        Create client from service account file.

        Parameters:
        - filename: Path to service account JSON file
        - **kwargs: Additional arguments for client initialization

        Returns:
        SubscriberClient instance
        """

Message Subscription

Subscribe to messages from subscriptions with callback-based processing.

def subscribe(
    self,
    subscription: str,
    callback: Callable[[Message], Any],
    flow_control: Union[FlowControl, Sequence] = (),
    scheduler: Optional[ThreadScheduler] = None,
    use_legacy_flow_control: bool = False,
    await_callbacks_on_shutdown: bool = False
) -> StreamingPullFuture:
    """
    Subscribe to messages from a subscription.

    Parameters:
    - subscription: Full subscription path (e.g., "projects/my-project/subscriptions/my-sub")
    - callback: Function to process received messages
    - flow_control: Flow control settings or legacy sequence format
    - scheduler: Custom scheduler for message processing
    - use_legacy_flow_control: Whether to use legacy flow control behavior
    - await_callbacks_on_shutdown: Whether to wait for callbacks on shutdown

    Returns:
    StreamingPullFuture that can be used to control the subscription
    """

Subscription Management

Create, retrieve, update, and delete subscriptions using the underlying GAPIC client methods.

def create_subscription(
    self,
    request: Optional[CreateSubscriptionRequest] = None,
    *,
    name: Optional[str] = None,
    topic: Optional[str] = None,
    **kwargs
) -> Subscription:
    """
    Create a new subscription.

    Parameters:
    - request: The request object for creating a subscription
    - name: Subscription name (e.g., "projects/my-project/subscriptions/my-sub")
    - topic: Topic name to subscribe to
    - **kwargs: Additional keyword arguments

    Returns:
    Created Subscription object
    """

def get_subscription(
    self,
    request: Optional[GetSubscriptionRequest] = None,
    *,
    subscription: Optional[str] = None,
    **kwargs
) -> Subscription:
    """
    Get a subscription.

    Parameters:
    - request: The request object for getting a subscription
    - subscription: Subscription name to retrieve
    - **kwargs: Additional keyword arguments

    Returns:
    Subscription object
    """

def update_subscription(
    self,
    request: Optional[UpdateSubscriptionRequest] = None,
    *,
    subscription: Optional[Subscription] = None,
    update_mask: Optional[FieldMask] = None,
    **kwargs
) -> Subscription:
    """
    Update a subscription.

    Parameters:
    - request: The request object for updating a subscription
    - subscription: Updated subscription configuration
    - update_mask: Fields to update
    - **kwargs: Additional keyword arguments

    Returns:
    Updated Subscription object
    """

def list_subscriptions(
    self,
    request: Optional[ListSubscriptionsRequest] = None,
    *,
    project: Optional[str] = None,
    **kwargs
) -> ListSubscriptionsResponse:
    """
    List subscriptions in a project.

    Parameters:
    - request: The request object for listing subscriptions
    - project: Project path (e.g., "projects/my-project")
    - **kwargs: Additional keyword arguments

    Returns:
    ListSubscriptionsResponse with subscriptions
    """

def delete_subscription(
    self,
    request: Optional[DeleteSubscriptionRequest] = None,
    *,
    subscription: Optional[str] = None,
    **kwargs
) -> None:
    """
    Delete a subscription.

    Parameters:
    - request: The request object for deleting a subscription
    - subscription: Subscription name to delete
    - **kwargs: Additional keyword arguments
    """

Message Operations

Low-level message operations for acknowledgment and deadline modification.

def acknowledge(
    self,
    request: Optional[AcknowledgeRequest] = None,
    *,
    subscription: Optional[str] = None,
    ack_ids: Optional[Sequence[str]] = None,
    **kwargs
) -> None:
    """
    Acknowledge messages by their acknowledgment IDs.

    Parameters:
    - request: The request object for acknowledging messages
    - subscription: Subscription name
    - ack_ids: List of acknowledgment IDs to acknowledge
    - **kwargs: Additional keyword arguments
    """

def modify_ack_deadline(
    self,
    request: Optional[ModifyAckDeadlineRequest] = None,
    *,
    subscription: Optional[str] = None,
    ack_ids: Optional[Sequence[str]] = None,
    ack_deadline_seconds: Optional[int] = None,
    **kwargs
) -> None:
    """
    Modify acknowledgment deadline for messages.

    Parameters:
    - request: The request object for modifying acknowledgment deadlines
    - subscription: Subscription name
    - ack_ids: List of acknowledgment IDs to modify
    - ack_deadline_seconds: New deadline in seconds
    - **kwargs: Additional keyword arguments
    """

def pull(
    self,
    request: Optional[PullRequest] = None,
    *,
    subscription: Optional[str] = None,
    max_messages: Optional[int] = None,
    **kwargs
) -> PullResponse:
    """
    Pull messages from a subscription synchronously.

    Parameters:
    - request: The request object for pulling messages
    - subscription: Subscription name to pull from
    - max_messages: Maximum number of messages to return
    - **kwargs: Additional keyword arguments

    Returns:
    PullResponse with received messages
    """

def streaming_pull(
    self,
    requests: Iterator[StreamingPullRequest],
    **kwargs
) -> Iterator[StreamingPullResponse]:
    """
    Establish a streaming pull connection.

    Parameters:
    - requests: Iterator of streaming pull requests
    - **kwargs: Additional keyword arguments

    Returns:
    Iterator of streaming pull responses
    """

Snapshot Operations

Create and manage snapshots for seeking to specific points in time.

def create_snapshot(
    self,
    request: Optional[CreateSnapshotRequest] = None,
    *,
    name: Optional[str] = None,
    subscription: Optional[str] = None,
    **kwargs
) -> Snapshot:
    """
    Create a snapshot of a subscription.

    Parameters:
    - request: The request object for creating a snapshot
    - name: Snapshot name (e.g., "projects/my-project/snapshots/my-snapshot")
    - subscription: Subscription to create snapshot from
    - **kwargs: Additional keyword arguments

    Returns:
    Created Snapshot object
    """

def get_snapshot(
    self,
    request: Optional[GetSnapshotRequest] = None,
    *,
    snapshot: Optional[str] = None,
    **kwargs
) -> Snapshot:
    """
    Get a snapshot.

    Parameters:
    - request: The request object for getting a snapshot
    - snapshot: Snapshot name to retrieve
    - **kwargs: Additional keyword arguments

    Returns:
    Snapshot object
    """

def list_snapshots(
    self,
    request: Optional[ListSnapshotsRequest] = None,
    *,
    project: Optional[str] = None,
    **kwargs
) -> ListSnapshotsResponse:
    """
    List snapshots in a project.

    Parameters:
    - request: The request object for listing snapshots
    - project: Project path (e.g., "projects/my-project")
    - **kwargs: Additional keyword arguments

    Returns:
    ListSnapshotsResponse with snapshots
    """

def update_snapshot(
    self,
    request: Optional[UpdateSnapshotRequest] = None,
    *,
    snapshot: Optional[Snapshot] = None,
    update_mask: Optional[FieldMask] = None,
    **kwargs
) -> Snapshot:
    """
    Update a snapshot.

    Parameters:
    - request: The request object for updating a snapshot
    - snapshot: Updated snapshot configuration
    - update_mask: Fields to update
    - **kwargs: Additional keyword arguments

    Returns:
    Updated Snapshot object
    """

def delete_snapshot(
    self,
    request: Optional[DeleteSnapshotRequest] = None,
    *,
    snapshot: Optional[str] = None,
    **kwargs
) -> None:
    """
    Delete a snapshot.

    Parameters:
    - request: The request object for deleting a snapshot
    - snapshot: Snapshot name to delete
    - **kwargs: Additional keyword arguments
    """

def seek(
    self,
    request: Optional[SeekRequest] = None,
    *,
    subscription: Optional[str] = None,
    **kwargs
) -> SeekResponse:
    """
    Seek a subscription to a specific snapshot or time.

    Parameters:
    - request: The request object for seeking
    - subscription: Subscription name to seek
    - **kwargs: Additional keyword arguments

    Returns:
    SeekResponse indicating seek result
    """

Path Helper Methods

Utility methods for constructing and parsing resource paths.

@staticmethod
def subscription_path(project: str, subscription: str) -> str:
    """
    Construct a subscription path from project ID and subscription name.

    Parameters:
    - project: Project ID
    - subscription: Subscription name

    Returns:
    Full subscription path string
    """

@staticmethod
def snapshot_path(project: str, snapshot: str) -> str:
    """
    Construct a snapshot path from project ID and snapshot name.

    Parameters:
    - project: Project ID
    - snapshot: Snapshot name

    Returns:
    Full snapshot path string
    """

@staticmethod
def topic_path(project: str, topic: str) -> str:
    """
    Construct a topic path from project ID and topic name.

    Parameters:
    - project: Project ID
    - topic: Topic name

    Returns:
    Full topic path string
    """

@staticmethod
def parse_subscription_path(path: str) -> Dict[str, str]:
    """
    Parse a subscription path into its components.

    Parameters:
    - path: Subscription path string

    Returns:
    Dictionary with 'project' and 'subscription' keys
    """

@staticmethod
def parse_snapshot_path(path: str) -> Dict[str, str]:
    """
    Parse a snapshot path into its components.

    Parameters:
    - path: Snapshot path string

    Returns:
    Dictionary with 'project' and 'snapshot' keys
    """

@staticmethod
def parse_topic_path(path: str) -> Dict[str, str]:
    """
    Parse a topic path into its components.

    Parameters:
    - path: Topic path string

    Returns:
    Dictionary with 'project' and 'topic' keys
    """

Client Management

Control client lifecycle and access underlying components.

def close(self) -> None:
    """
    Close the subscriber client and stop all subscriptions.
    """

@property
def target(self) -> str:
    """
    Get the target endpoint for the client.

    Returns:
    Target endpoint URL
    """

@property
def api(self):
    """
    Get the underlying GAPIC subscriber client.

    Returns:
    GAPIC SubscriberClient instance
    """

@property
def closed(self) -> bool:
    """
    Check if the client is closed.

    Returns:
    True if client is closed
    """

@property
def open_telemetry_enabled(self) -> bool:
    """
    Check if OpenTelemetry tracing is enabled.

    Returns:
    True if OpenTelemetry is enabled
    """

Context Manager Support

Use SubscriberClient as a context manager for automatic cleanup.

def __enter__(self) -> "SubscriberClient":
    """
    Enter context manager.

    Returns:
    Self
    """

def __exit__(self, exc_type, exc_val, exc_tb) -> None:
    """
    Exit context manager and close client.
    """

Streaming Pull Future

Control and monitor streaming pull operations.

class StreamingPullFuture:
    def cancel(self) -> bool:
        """
        Cancel the streaming pull operation.

        Returns:
        True if cancellation was successful
        """

    def cancelled(self) -> bool:
        """
        Check if the operation was cancelled.

        Returns:
        True if operation is cancelled
        """

    def running(self) -> bool:
        """
        Check if the operation is currently running.

        Returns:
        True if operation is running
        """

    def result(self, timeout: Optional[float] = None) -> None:
        """
        Wait for the streaming pull to complete.

        Parameters:
        - timeout: Maximum time to wait in seconds

        Raises:
        TimeoutError: If timeout is reached
        """

Usage Examples

Basic Subscription

from google.cloud import pubsub_v1

# Create subscriber client
subscriber = pubsub_v1.SubscriberClient()

def callback(message):
    print(f"Received: {message.data.decode('utf-8')}")
    print(f"Attributes: {message.attributes}")
    message.ack()

# Subscribe to messages
subscription_path = subscriber.subscription_path("my-project", "my-subscription")
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

print(f"Listening for messages on {subscription_path}...")

# Keep the main thread running
try:
    streaming_pull_future.result()
except KeyboardInterrupt:
    streaming_pull_future.cancel()
    subscriber.close()

Custom Flow Control

from google.cloud.pubsub_v1 import types

# Configure flow control
flow_control = types.FlowControl(
    max_messages=100,        # Process up to 100 messages concurrently
    max_bytes=10 * 1024 * 1024,  # 10MB max outstanding bytes
    max_lease_duration=600   # 10 minute max lease duration
)

subscriber = pubsub_v1.SubscriberClient(flow_control=flow_control)

Message Processing with Error Handling

def callback(message):
    try:
        # Process the message
        data = message.data.decode('utf-8')
        print(f"Processing: {data}")
        
        # Simulate processing
        process_message(data)
        
        # Acknowledge successful processing
        message.ack()
        
    except Exception as e:
        print(f"Error processing message: {e}")
        # Negative acknowledge to retry later
        message.nack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

Context Manager Usage

from google.cloud import pubsub_v1

def callback(message):
    print(f"Received: {message.data.decode('utf-8')}")
    message.ack()

# Use subscriber as context manager
with pubsub_v1.SubscriberClient() as subscriber:
    subscription_path = subscriber.subscription_path("my-project", "my-subscription")
    
    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
    
    try:
        # Listen for messages for 30 seconds
        streaming_pull_future.result(timeout=30)
    except Exception:
        streaming_pull_future.cancel()
# Client automatically closed when exiting context

Multiple Subscriptions

import concurrent.futures
from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()

def callback_a(message):
    print(f"Subscription A: {message.data.decode('utf-8')}")
    message.ack()

def callback_b(message):
    print(f"Subscription B: {message.data.decode('utf-8')}")
    message.ack()

# Subscribe to multiple subscriptions
sub_a_path = subscriber.subscription_path("my-project", "subscription-a")
sub_b_path = subscriber.subscription_path("my-project", "subscription-b")

future_a = subscriber.subscribe(sub_a_path, callback=callback_a)
future_b = subscriber.subscribe(sub_b_path, callback=callback_b)

# Wait for any subscription to complete or fail
futures = [future_a, future_b]
try:
    concurrent.futures.as_completed(futures, timeout=300)
except KeyboardInterrupt:
    for future in futures:
        future.cancel()
finally:
    subscriber.close()

Message Deadline Modification

def callback(message):
    print(f"Processing: {message.data.decode('utf-8')}")
    
    # Extend deadline if processing takes longer
    message.modify_ack_deadline(60)  # Extend by 60 seconds
    
    try:
        # Long processing operation
        long_running_task(message.data)
        message.ack()
    except Exception as e:
        print(f"Processing failed: {e}")
        message.nack()

Install with Tessl CLI

npx tessl i tessl/pypi-google-cloud-pubsub

docs

configuration.md

exceptions.md

index.md

message-handling.md

publisher.md

schedulers.md

schema-service.md

subscriber.md

types.md

tile.json