tessl/pypi-google-cloud-pubsub

Google Cloud Pub/Sub API client library for reliable, many-to-many, asynchronous messaging between applications

Workspace: tessl
Visibility: Public
Describes: pypipkg:pypi/google-cloud-pubsub@2.31.x

To install, run

npx @tessl/cli install tessl/pypi-google-cloud-pubsub@2.31.0

Google Cloud Pub/Sub

Google Cloud Pub/Sub is a fully managed, real-time messaging service for reliable, many-to-many, asynchronous messaging between applications. The Python client library provides publisher and subscriber clients with automatic retries, flow control, message ordering, and authentication through Google Cloud identity services.

Package Information

  • Package Name: google-cloud-pubsub
  • Language: Python
  • Installation: pip install google-cloud-pubsub

Core Imports

from google.cloud import pubsub_v1

Import specific components:

from google.cloud.pubsub_v1 import PublisherClient, SubscriberClient, types

Basic Usage

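from concurrent.futures import TimeoutError

from google.cloud import pubsub_v1

# Create clients
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

# Publishing messages
topic_path = publisher.topic_path("my-project", "my-topic")
message_data = b"Hello, World!"  # payloads must be bytes
future = publisher.publish(topic_path, message_data)
message_id = future.result()  # blocks until the server returns the message ID

# Subscribing to messages
subscription_path = subscriber.subscription_path("my-project", "my-subscription")

def callback(message):
    print(f"Received: {message.data.decode('utf-8')}")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

# subscribe() returns immediately; block the main thread so callbacks can run
with subscriber:
    try:
        streaming_pull_future.result(timeout=30)  # listen for 30 seconds
    except TimeoutError:
        streaming_pull_future.cancel()  # signal the background stream to stop
        streaming_pull_future.result()  # wait for the shutdown to complete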

Architecture

Google Cloud Pub/Sub follows a publisher-subscriber messaging pattern with the following key components:

  • Publisher: Sends messages to topics with batching, flow control, and ordering support
  • Subscriber: Receives messages from subscriptions with automatic ack-deadline extension (lease management) and flow control; the callback must still acknowledge each message
  • Topics: Named resources to which publishers send messages
  • Subscriptions: Named resources representing a feed of messages from a specific topic
  • Messages: Data payloads with attributes and metadata
  • Schema Service: Manages message schemas for validation and evolution

The library provides both high-level clients with advanced features (batching, flow control, futures) and low-level GAPIC clients for direct API access.
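The fan-out relationship between topics and subscriptions can be illustrated with a small in-memory model. This is only a conceptual sketch built on the standard library: `InMemoryBroker` and its methods are hypothetical names, not part of google-cloud-pubsub, and the real service adds durability, acknowledgment deadlines, and redelivery on top of this idea.

```python
from collections import defaultdict, deque


class InMemoryBroker:
    """Toy model: every subscription on a topic receives its own copy of each message."""

    def __init__(self):
        self._subscriptions_by_topic = defaultdict(list)  # topic -> subscription names
        self._queues = {}  # subscription name -> pending messages

    def create_subscription(self, topic, subscription):
        self._subscriptions_by_topic[topic].append(subscription)
        self._queues[subscription] = deque()

    def publish(self, topic, data, **attributes):
        # Fan out: enqueue one entry per subscription attached to the topic
        for subscription in self._subscriptions_by_topic[topic]:
            self._queues[subscription].append({"data": data, "attributes": attributes})

    def pull(self, subscription):
        queue = self._queues[subscription]
        return queue.popleft() if queue else None


broker = InMemoryBroker()
broker.create_subscription("orders", "billing")
broker.create_subscription("orders", "shipping")
broker.publish("orders", b"order-1", source="web")

billing_msg = broker.pull("billing")
shipping_msg = broker.pull("shipping")
```

Both `billing_msg` and `shipping_msg` hold the same payload, which is the many-to-many behavior the component list describes: publishers only know the topic, and each subscription consumes independently.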

Capabilities

Publisher Client

High-level client for publishing messages to Pub/Sub topics with automatic batching, flow control, message ordering, and OpenTelemetry integration.

class PublisherClient:
    def __init__(self, batch_settings=(), publisher_options=(), **kwargs): ...
    def publish(self, topic, data, ordering_key="", retry=DEFAULT, timeout=DEFAULT, **attrs): ...
    def resume_publish(self, topic, ordering_key): ...
    def stop(self): ...

Full reference: publisher.md
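As a rough sketch of how `publish` and `resume_publish` fit together when message ordering is used, the helper below publishes payloads one at a time under a single ordering key. The function name `publish_in_order` and its parameters are illustrative assumptions; it expects a `PublisherClient` that was created with message ordering enabled and relies only on the `publish`, `resume_publish`, and `future.result` calls listed in this document.

```python
def publish_in_order(publisher, topic_path, ordering_key, payloads, timeout=30.0):
    """Publish byte payloads sequentially under one ordering key.

    `publisher` is assumed to be a PublisherClient created with
    PublisherOptions(enable_message_ordering=True).
    """
    message_ids = []
    for payload in payloads:
        future = publisher.publish(topic_path, payload, ordering_key=ordering_key)
        try:
            # Wait for the server-assigned message ID before sending the next one
            message_ids.append(future.result(timeout=timeout))
        except Exception:
            # A failed publish pauses the ordering key; resume it so later
            # publishes with this key are accepted again, then re-raise.
            publisher.resume_publish(topic_path, ordering_key)
            raise
    return message_ids
```

Waiting on each future keeps the sketch simple at the cost of throughput. In practice the library preserves order for a key without this per-message wait, so the blocking loop is a design choice for clarity rather than a requirement.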

Subscriber Client

High-level client for subscribing to Pub/Sub subscriptions with automatic message handling, flow control, and OpenTelemetry integration.

class SubscriberClient:
    def __init__(self, subscriber_options=(), **kwargs): ...
    def subscribe(self, subscription, callback, flow_control=(), scheduler=None, use_legacy_flow_control=False, await_callbacks_on_shutdown=False): ...
    def close(self): ...

Full reference: subscriber.md
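Because the callback passed to `subscribe` decides whether each message is acknowledged, one common pattern is to wrap the business logic so that success acks and failure nacks. The sketch below is one way to do that; `make_callback` and the `handler(data, attributes)` signature are hypothetical names chosen for illustration, and only `message.data`, `message.attributes`, `message.message_id`, `message.ack()`, and `message.nack()` come from the library.

```python
import logging

logger = logging.getLogger(__name__)


def make_callback(handler):
    """Wrap `handler(data: bytes, attributes: dict)` as a subscriber callback.

    The message is acked when the handler returns normally and nacked when
    it raises, so that Pub/Sub redelivers it later.
    """

    def callback(message):
        try:
            handler(message.data, dict(message.attributes))
        except Exception:
            logger.exception("Handler failed; nacking message %s", message.message_id)
            message.nack()
        else:
            message.ack()

    return callback
```

It would be passed to the client as `subscriber.subscribe(subscription_path, callback=make_callback(process_order))`, where `process_order` stands for your own function.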

Schema Service Client

Client for managing Pub/Sub schemas including creation, validation, and evolution of message schemas.

class SchemaServiceClient:
    def create_schema(self, request=None, **kwargs): ...
    def get_schema(self, request=None, **kwargs): ...
    def list_schemas(self, request=None, **kwargs): ...
    def validate_schema(self, request=None, **kwargs): ...
    def validate_message(self, request=None, **kwargs): ...

Full reference: schema-service.md

Message Handling

Message objects and utilities for handling received messages, including acknowledgment, negative acknowledgment, and deadline modification.

class Message:
    def ack(self): ...
    def nack(self): ...
    def modify_ack_deadline(self, seconds): ...
    def ack_with_response(self): ...
    def nack_with_response(self): ...

Full reference: message-handling.md
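The `*_with_response` variants return a future, which is mainly useful on subscriptions with exactly-once delivery enabled, where an ack can fail and the caller may want to know. The helper below is a minimal sketch of waiting on that future; `ack_and_confirm` is a hypothetical name, and the broad `except` is an assumption made to keep the example import-free.

```python
def ack_and_confirm(message, timeout=10.0):
    """Ack a message and wait for the outcome of the ack request.

    Returns True when the acknowledgment was confirmed, False otherwise.
    """
    ack_future = message.ack_with_response()
    try:
        ack_future.result(timeout=timeout)
    except Exception:
        # With the real library a failed ack is expected to surface as
        # google.cloud.pubsub_v1.subscriber.exceptions.AcknowledgeError;
        # production code should catch that type instead of Exception.
        return False
    return True
```

When this returns False, the message may be redelivered, so the processing that preceded the ack should be safe to repeat.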

Configuration Types

Comprehensive configuration options for client behavior including batching, flow control, publisher options, and subscriber options.

class BatchSettings(NamedTuple):
    max_bytes: int = 1000000
    max_latency: float = 0.01
    max_messages: int = 100

class PublisherOptions(NamedTuple):
    enable_message_ordering: bool = False
    flow_control: PublishFlowControl = PublishFlowControl()
    retry: OptionalRetry = ...
    timeout: OptionalTimeout = ...

Full reference: configuration.md
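The three `BatchSettings` fields act as independent thresholds: a batch is sent once any one of them is reached. The sketch below models that rule with a local stand-in so it runs without the library. `BatchLimits` and `should_flush` are hypothetical names, and the library's internal batching logic differs in detail.

```python
from typing import NamedTuple


class BatchLimits(NamedTuple):
    """Local stand-in mirroring the BatchSettings fields and defaults listed above."""

    max_bytes: int = 1000000
    max_latency: float = 0.01  # seconds
    max_messages: int = 100


def should_flush(limits, pending_messages, pending_bytes, seconds_since_first):
    """Return True once any one of the three thresholds has been reached."""
    return (
        pending_messages >= limits.max_messages
        or pending_bytes >= limits.max_bytes
        or seconds_since_first >= limits.max_latency
    )


defaults = BatchLimits()
flush_on_count = should_flush(defaults, 100, 2048, 0.001)
flush_on_age = should_flush(defaults, 3, 2048, 0.02)
keep_waiting = should_flush(defaults, 3, 2048, 0.001)
```

With the real client, the equivalent tuning is passed at construction time, for example `pubsub_v1.PublisherClient(batch_settings=pubsub_v1.types.BatchSettings(max_messages=10, max_latency=1))`. Larger values favor throughput, smaller values favor latency.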

Protobuf Types

Complete set of protobuf message types for Pub/Sub operations including topics, subscriptions, messages, and requests/responses.

class PubsubMessage: ...
class Topic: ...
class Subscription: ...
class PublishRequest: ...
class PublishResponse: ...

Full reference: types.md

Exception Handling

Comprehensive exception types for handling errors in publishing and subscribing operations, including flow control errors and acknowledgment failures.

# Publisher exceptions
class PublishError(GoogleAPICallError): ...
class MessageTooLargeError(ValueError): ...
class PublishToPausedOrderingKeyException(Exception): ...
class FlowControlLimitError(Exception): ...

# Subscriber exceptions
class AcknowledgeError(GoogleAPICallError): ...
class AcknowledgeStatus(Enum): ...
class TimeoutError(Exception): ...

Full reference: exceptions.md

Schedulers and Utilities

Scheduler classes and utility functions for controlling message processing behavior and resource management in subscriber operations.

class ThreadScheduler:
    def __init__(self, executor: Optional[ThreadPoolExecutor] = None): ...
    def schedule(self, callback: Callable, *args, **kwargs) -> None: ...
    def shutdown(self, await_msg_callbacks: bool = False) -> List[Message]: ...

class StreamingPullFuture:
    def cancel(self) -> bool: ...
    def result(self, timeout: Optional[float] = None) -> None: ...

Full reference: schedulers.md
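A custom scheduler is mostly useful for bounding how many callbacks run at once. The following sketch wires a sized thread pool into `ThreadScheduler` and passes it to `subscribe`. The helper names and the `pubsub-callback` thread prefix are assumptions for illustration, and the library import is done lazily inside the function so the module can be loaded without google-cloud-pubsub installed.

```python
from concurrent.futures import ThreadPoolExecutor


def build_callback_executor(max_workers=4):
    """Create the thread pool that will run subscriber callbacks."""
    return ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="pubsub-callback")


def subscribe_with_bounded_pool(subscriber, subscription_path, callback, max_workers=4):
    """Start a streaming pull whose callbacks run on at most `max_workers` threads."""
    # Imported here so this module stays importable without the library
    from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler

    scheduler = ThreadScheduler(executor=build_callback_executor(max_workers))
    # Returns the StreamingPullFuture; the caller is responsible for
    # blocking on it and cancelling it on shutdown.
    return subscriber.subscribe(subscription_path, callback=callback, scheduler=scheduler)
```

Limiting the pool size caps callback concurrency, while the flow control settings cap how many messages are outstanding; the two are usually tuned together.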

Types

from typing import Callable, Optional, Union, Any, Sequence
from concurrent.futures import Future as ConcurrentFuture
from google.api_core import retry, gapic_v1

# Core type aliases
MessageCallback = Callable[[Message], Any]
OptionalRetry = Union[retry.Retry, object]
OptionalTimeout = Union[float, object]
DEFAULT = gapic_v1.method.DEFAULT

# Future type for publisher operations
class Future(ConcurrentFuture):
    def result(self, timeout: Optional[float] = None) -> str: ...
    def add_done_callback(self, callback: Callable[["Future"], None]) -> None: ...

# Scheduler type for subscriber
class ThreadScheduler:
    """Custom scheduler for message processing in subscriber."""
    pass

# Streaming pull future for subscriber operations
class StreamingPullFuture:
    def cancel(self) -> bool: ...
    def cancelled(self) -> bool: ...
    def running(self) -> bool: ...
    def result(self, timeout: Optional[float] = None) -> None: ...
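Since the publisher `Future` exposes `add_done_callback`, publish outcomes can be recorded without blocking on `result()`. The sketch below shows that pattern using a plain `concurrent.futures.Future` as a stand-in for the object returned by `publisher.publish()`; `track_publish`, `outcomes`, and the message ID value are made up for the demonstration.

```python
from concurrent.futures import Future


def track_publish(future, description, results):
    """Record the outcome of a publish future in `results` without blocking."""

    def _on_done(done_future):
        error = done_future.exception()
        if error is None:
            results[description] = ("ok", done_future.result())
        else:
            results[description] = ("failed", repr(error))

    future.add_done_callback(_on_done)


# Stand-in for the future returned by publisher.publish(); resolved by hand here
demo_future = Future()
outcomes = {}
track_publish(demo_future, "greeting", outcomes)
demo_future.set_result("1234567890")
```

With a real client the callback runs on a background thread once the batch containing the message has been sent, so anything it touches should be thread-safe.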