CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-google-cloud-pubsub

Google Cloud Pub/Sub API client library for reliable, many-to-many, asynchronous messaging between applications

Pending
Overview
Eval results
Files

docs/publisher.md

Publisher Client

The PublisherClient provides high-level functionality for publishing messages to Google Cloud Pub/Sub topics. It handles automatic batching, flow control, message ordering, retry logic, and OpenTelemetry integration.

Capabilities

Client Initialization

Create and configure a PublisherClient with custom batching and flow control settings.

class PublisherClient:
    """
    High-level client for publishing messages to Pub/Sub topics.

    This block is an API reference stub: it shows the constructor and the
    service-account factory signatures only; method bodies are omitted.
    """

    def __init__(
        self,
        batch_settings: Optional[BatchSettings] = None,
        publisher_options: Optional[PublisherOptions] = None,
        **kwargs
    ):
        """
        Initialize the publisher client.

        Parameters:
        - batch_settings: Settings for message batching behavior, e.g. a
          BatchSettings built with max_bytes, max_latency and max_messages.
          Optional; defaults to None.
        - publisher_options: Options for publisher client behavior, e.g. a
          PublisherOptions built with enable_message_ordering=True.
          Optional; defaults to None.
        - **kwargs: Additional arguments passed to underlying GAPIC client
        """

    @classmethod
    def from_service_account_file(
        cls,
        filename: str,
        **kwargs
    ) -> "PublisherClient":
        """
        Create client from service account file.

        Alternate constructor: call it on the class itself, e.g.
        PublisherClient.from_service_account_file("key.json").

        Parameters:
        - filename: Path to service account JSON file
        - **kwargs: Additional arguments for client initialization

        Returns:
        PublisherClient instance
        """

Message Publishing

Publish messages to topics with support for attributes, ordering keys, and futures for result handling.

def publish(
    self,
    topic: str,
    data: bytes,
    ordering_key: str = "",
    retry: OptionalRetry = DEFAULT,
    timeout: OptionalTimeout = DEFAULT,
    **attrs: Union[bytes, str]
) -> Future:
    """
    Publish a message to a topic.

    Parameters:
    - topic: Full topic path (e.g., "projects/my-project/topics/my-topic")
    - data: Message payload as bytes; encode text first
      (e.g., "Message 1".encode())
    - ordering_key: Optional key for message ordering (default: "").
      NOTE(review): the ordering example on this page creates the client with
      PublisherOptions(enable_message_ordering=True) before passing a key;
      presumably that option is required for a non-empty key - confirm.
    - retry: Retry configuration for the publish operation
    - timeout: Timeout configuration for the publish operation
    - **attrs: Message attributes as keyword arguments (values can be bytes or str),
      e.g. event_type="user_action"

    Returns:
    Future that resolves to message ID string; call result() on it to wait for
    the ID, or add_done_callback() to be notified when it completes
    """

Message Ordering

Resume publishing for an ordering key after an error has occurred.

def resume_publish(self, topic: str, ordering_key: str) -> None:
    """
    Resume publishing for ordering key after error.

    Parameters:
    - topic: Full topic path (e.g., "projects/my-project/topics/my-topic")
    - ordering_key: Ordering key to resume, i.e. the ordering_key value that
      was passed to publish()

    NOTE(review): presumably only meaningful when the client was created with
    message ordering enabled, and may raise otherwise - confirm against the
    library documentation.
    """

Topic Management

Create, retrieve, list, update, and delete topics, and list the subscriptions and snapshots attached to a topic, using the underlying GAPIC client methods.

def create_topic(
    self,
    request: Optional[CreateTopicRequest] = None,
    *,
    name: Optional[str] = None,
    **kwargs
) -> Topic:
    """
    Create a new topic.

    Parameters:
    - request: The request object for creating a topic. Optional; defaults
      to None.
    - name: Topic name (e.g., "projects/my-project/topics/my-topic").
      Keyword-only. Optional; defaults to None.
    - **kwargs: Additional keyword arguments

    NOTE(review): request and name look like alternative ways to identify
    the topic; whether both may be given together is not stated here -
    confirm.

    Returns:
    Created Topic object
    """

def get_topic(
    self,
    request: Optional[GetTopicRequest] = None,
    *,
    topic: Optional[str] = None,
    **kwargs
) -> Topic:
    """
    Get a topic.

    Parameters:
    - request: The request object for getting a topic. Optional; defaults
      to None.
    - topic: Topic name to retrieve. Keyword-only. Optional; defaults to
      None. Presumably the full path form
      "projects/my-project/topics/my-topic" - confirm.
    - **kwargs: Additional keyword arguments

    Returns:
    Topic object
    """

def list_topics(
    self,
    request: Optional[ListTopicsRequest] = None,
    *,
    project: Optional[str] = None,
    **kwargs
) -> ListTopicsPager:
    """
    List topics in a project.

    Parameters:
    - request: The request object for listing topics. Optional; defaults
      to None.
    - project: Project path (e.g., "projects/my-project"). Keyword-only.
      Optional; defaults to None.
    - **kwargs: Additional keyword arguments

    Returns:
    ListTopicsPager: an iterable of Topic objects. The pager wraps the
    underlying ListTopicsResponse pages and fetches further pages on demand
    while it is iterated, so a plain
    `for topic in publisher.list_topics(project=...)` visits every topic.
    """

def list_topic_subscriptions(
    self,
    request: Optional[ListTopicSubscriptionsRequest] = None,
    *,
    topic: Optional[str] = None,
    **kwargs
) -> ListTopicSubscriptionsPager:
    """
    List subscriptions attached to a topic.

    Parameters:
    - request: The request object for listing topic subscriptions. Optional;
      defaults to None.
    - topic: Topic name. Keyword-only. Optional; defaults to None.
    - **kwargs: Additional keyword arguments

    Returns:
    ListTopicSubscriptionsPager: an iterable of subscription names (str).
    The pager wraps the underlying ListTopicSubscriptionsResponse pages and
    fetches further pages on demand while it is iterated.
    """

def list_topic_snapshots(
    self,
    request: Optional[ListTopicSnapshotsRequest] = None,
    *,
    topic: Optional[str] = None,
    **kwargs
) -> ListTopicSnapshotsPager:
    """
    List snapshots for a topic.

    Parameters:
    - request: The request object for listing topic snapshots. Optional;
      defaults to None.
    - topic: Topic name. Keyword-only. Optional; defaults to None.
    - **kwargs: Additional keyword arguments

    Returns:
    ListTopicSnapshotsPager: an iterable of snapshot names (str). The pager
    wraps the underlying ListTopicSnapshotsResponse pages and fetches
    further pages on demand while it is iterated.
    """

def update_topic(
    self,
    request: Optional[UpdateTopicRequest] = None,
    *,
    topic: Optional[Topic] = None,
    update_mask: Optional[FieldMask] = None,
    **kwargs
) -> Topic:
    """
    Update a topic.

    Parameters:
    - request: The request object for updating a topic. Optional; defaults
      to None.
    - topic: Updated topic configuration, given as a Topic object (not a
      path string). Keyword-only. Optional; defaults to None.
    - update_mask: Fields to update, given as a FieldMask. Keyword-only.
      Optional; defaults to None.
    - **kwargs: Additional keyword arguments

    Returns:
    Updated Topic object
    """

def delete_topic(
    self,
    request: Optional[DeleteTopicRequest] = None,
    *,
    topic: Optional[str] = None,
    **kwargs
) -> None:
    """
    Delete a topic.

    Parameters:
    - request: The request object for deleting a topic. Optional; defaults
      to None.
    - topic: Topic name to delete. Keyword-only. Optional; defaults to None.
    - **kwargs: Additional keyword arguments

    Returns:
    None
    """

Path Helper Methods

Utility methods for constructing and parsing resource paths.

@staticmethod
def topic_path(project: str, topic: str) -> str:
    """
    Construct a topic path from project ID and topic name.

    Static method: needs no client state, although the usage examples call
    it on an instance (publisher.topic_path("my-project", "my-topic")).

    Parameters:
    - project: Project ID (e.g., "my-project")
    - topic: Topic name (e.g., "my-topic")

    Returns:
    Full topic path string (e.g., "projects/my-project/topics/my-topic"),
    the form expected by publish()
    """

@staticmethod
def subscription_path(project: str, subscription: str) -> str:
    """
    Construct a subscription path from project ID and subscription name.

    Parameters:
    - project: Project ID (e.g., "my-project")
    - subscription: Subscription name

    Returns:
    Full subscription path string. Presumably of the form
    "projects/<project>/subscriptions/<subscription>", by analogy with
    topic paths - confirm.
    """

@staticmethod
def schema_path(project: str, schema: str) -> str:
    """
    Construct a schema path from project ID and schema name.

    Parameters:
    - project: Project ID (e.g., "my-project")
    - schema: Schema name

    Returns:
    Full schema path string. Presumably of the form
    "projects/<project>/schemas/<schema>", by analogy with topic paths -
    confirm.
    """

@staticmethod
def parse_topic_path(path: str) -> Dict[str, str]:
    """
    Parse a topic path into its components.

    Parameters:
    - path: Topic path string
      (e.g., "projects/my-project/topics/my-topic")

    Returns:
    Dictionary with 'project' and 'topic' keys

    NOTE(review): the result for a string that is not a topic path is not
    stated here - confirm (an empty dictionary is a likely outcome).
    """

@staticmethod
def parse_subscription_path(path: str) -> Dict[str, str]:
    """
    Parse a subscription path into its components.

    Parameters:
    - path: Subscription path string, as produced by subscription_path()

    Returns:
    Dictionary with 'project' and 'subscription' keys

    NOTE(review): the result for a string that is not a subscription path
    is not stated here - confirm.
    """

@staticmethod
def parse_schema_path(path: str) -> Dict[str, str]:
    """
    Parse a schema path into its components.

    Parameters:
    - path: Schema path string, as produced by schema_path()

    Returns:
    Dictionary with 'project' and 'schema' keys

    NOTE(review): the result for a string that is not a schema path is not
    stated here - confirm.
    """

Client Management

Control client lifecycle and access underlying components.

def stop(self) -> None:
    """
    Stop the publisher client and wait for all batches to complete.

    NOTE(review): the upstream google-cloud-pubsub documentation describes
    stop() as non-blocking (outstanding messages are sent asynchronously and
    later publish() calls are rejected), so "wait for all batches to
    complete" may overstate what this call guarantees - confirm. To know
    that a message was actually delivered, use the Future returned by
    publish().
    """

@property
def target(self) -> str:
    """
    Get the target endpoint for the client.

    Exposed as a property: read it as `publisher.target`, without calling it.

    Returns:
    Target endpoint URL
    """

@property
def api(self):
    """
    Get the underlying GAPIC publisher client.

    Exposed as a property: read it as `publisher.api`, without calling it.
    The signature carries no return annotation.

    Returns:
    GAPIC PublisherClient instance
    """

@property
def open_telemetry_enabled(self) -> bool:
    """
    Check if OpenTelemetry tracing is enabled.

    Exposed as a property: read it as `publisher.open_telemetry_enabled`,
    without calling it.

    Returns:
    True if OpenTelemetry is enabled, False otherwise
    """

Future Objects

`publish()` returns a Future object for asynchronous result handling; the topic management methods return their results directly.

class Future:
    """
    Handle for the outcome of a publish() call.

    This block is an API reference stub: signatures and contracts only,
    method bodies are omitted.
    """

    def result(self, timeout: Optional[float] = None) -> str:
        """
        Get the message ID or raise an exception.

        Parameters:
        - timeout: Number of seconds to wait before timeout. Defaults to
          None; presumably that means wait without a time limit - confirm.

        Returns:
        Message ID string

        Raises:
        TimeoutError: If operation times out
          (NOTE(review): possibly concurrent.futures.TimeoutError rather than
          the builtin - confirm)
        Exception: For other errors in publish operation
        """

    def add_done_callback(
        self, 
        callback: Callable[["Future"], None]
    ) -> None:
        """
        Add callback to be called when future completes.

        Parameters:
        - callback: Function to call with future as argument; its return
          value is not used (the callable is typed as returning None).
          Inside it, call future.result() to obtain the message ID or the
          publish error.
        """

    def cancel(self) -> bool:
        """
        Attempt to cancel the operation.

        Returns:
        Always False (Pub/Sub operations cannot be cancelled)
        """

    def cancelled(self) -> bool:
        """
        Check if operation was cancelled.

        Returns:
        Always False (Pub/Sub operations cannot be cancelled)
        """

Usage Examples

Basic Publishing

from google.cloud import pubsub_v1

# Create publisher client
publisher = pubsub_v1.PublisherClient()

# Publish a message
# topic_path() builds the full "projects/<project>/topics/<topic>" string
# that publish() expects as its first argument.
topic_path = publisher.topic_path("my-project", "my-topic")
message_data = b"Hello, World!"  # the payload must be bytes, not str
future = publisher.publish(topic_path, message_data)

# Get message ID
# result() waits up to 30 seconds, then returns the message ID or raises.
try:
    message_id = future.result(timeout=30)
    print(f"Published message with ID: {message_id}")
except Exception as e:
    print(f"Failed to publish: {e}")

Publishing with Attributes

# Publish message with attributes
# Reuses `publisher` and `topic_path` from the Basic Publishing example.
# Every keyword argument after the payload becomes a message attribute.
future = publisher.publish(
    topic_path,
    b"Message with metadata",
    event_type="user_action",
    user_id="12345",
    timestamp="2024-01-01T00:00:00Z"
)

Message Ordering

from google.cloud.pubsub_v1 import types

# `pubsub_v1` and `topic_path` come from the Basic Publishing example.

# Configure for message ordering
publisher_options = types.PublisherOptions(
    enable_message_ordering=True
)
# This rebinds `publisher` to a new client that has ordering enabled.
publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options)

# Publish ordered messages
# All five messages share one ordering key; .encode() converts each str into
# the bytes payload that publish() requires.
for i in range(5):
    future = publisher.publish(
        topic_path,
        f"Message {i}".encode(),
        ordering_key="user-123"
    )

Custom Batching

# Configure custom batching
# `types` and `pubsub_v1` are imported in the earlier examples.
batch_settings = types.BatchSettings(
    max_bytes=500000,  # 500KB
    max_latency=0.05,  # 50ms
    max_messages=50
)

# This rebinds `publisher` to a new client that uses the custom batch settings.
publisher = pubsub_v1.PublisherClient(batch_settings=batch_settings)

Callback Handling

def publish_callback(future):
    """
    Report the outcome of a completed publish future on stdout.

    Parameters:
    - future: Completed future; its result() either returns the message ID
      or raises the publish error

    Prints "Published: <id>" on success and "Publish failed: <error>" when
    result() raises. The error is reported rather than propagated.
    """
    try:
        # result() is evaluated inside the try so a publish error is caught.
        print(f"Published: {future.result()}")
    except Exception as exc:
        print(f"Publish failed: {exc}")

# Publish with callback
# Reuses `publisher` and `topic_path` from the earlier examples.
future = publisher.publish(topic_path, b"Async message")
# publish_callback is called with the future once it completes.
future.add_done_callback(publish_callback)

Install with Tessl CLI

npx tessl i tessl/pypi-google-cloud-pubsub

docs

configuration.md

exceptions.md

index.md

message-handling.md

publisher.md

schedulers.md

schema-service.md

subscriber.md

types.md

tile.json