CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-google-cloud-pubsub

Google Cloud Pub/Sub API client library for reliable, many-to-many, asynchronous messaging between applications

Pending
Overview
Eval results
Files

configuration.mddocs/

Configuration

Google Cloud Pub/Sub provides comprehensive configuration options for controlling client behavior, batching, flow control, retry logic, and timeouts. These configuration types enable fine-tuning for different use cases and performance requirements.

Capabilities

Batch Settings

Configure message batching behavior for the publisher client.

class BatchSettings(NamedTuple):
    """
    Settings for batch publishing messages.

    Attributes:
    - max_bytes: Maximum total size of messages in a batch (default: 1MB)
    - max_latency: Maximum time to wait before publishing batch (default: 0.01s)
    - max_messages: Maximum number of messages in a batch (default: 100)
    """
    
    max_bytes: int = 1000000
    """
    Maximum total size of messages to collect before automatically
    publishing the batch, including byte size overhead of the publish
    request. Maximum server-side limit is 10,000,000 bytes.
    """
    
    max_latency: float = 0.01
    """
    Maximum number of seconds to wait for additional messages before
    automatically publishing the batch.
    """
    
    max_messages: int = 100
    """
    Maximum number of messages to collect before automatically
    publishing the batch.
    """

Publisher Options

Configure publisher client behavior including message ordering, flow control, and OpenTelemetry.

class PublisherOptions(NamedTuple):
    """
    Options for the publisher client.

    Attributes:
    - enable_message_ordering: Whether to order messages by ordering key
    - flow_control: Flow control settings for message publishing
    - retry: Retry settings for publish operations
    - timeout: Timeout settings for publish operations
    - enable_open_telemetry_tracing: Whether to enable OpenTelemetry tracing
    """
    
    enable_message_ordering: bool = False
    """
    Whether to order messages in a batch by a supplied ordering key.
    """
    
    flow_control: PublishFlowControl = PublishFlowControl()
    """
    Flow control settings for message publishing by the client.
    By default the publisher client does not do any throttling.
    """
    
    retry: OptionalRetry = ...
    """
    Retry settings for message publishing by the client.
    Should be an instance of google.api_core.retry.Retry.
    """
    
    timeout: OptionalTimeout = ConstantTimeout(60)
    """
    Timeout settings for message publishing by the client.
    Should be compatible with pubsub_v1.types.TimeoutType.
    """
    
    enable_open_telemetry_tracing: bool = False
    """
    Whether to enable OpenTelemetry tracing for publish operations.
    """

Publish Flow Control

Configure flow control limits for the publisher client to prevent resource exhaustion.

class PublishFlowControl(NamedTuple):
    """
    Client flow control settings for message publishing.

    Attributes:
    - message_limit: Maximum number of messages awaiting publication
    - byte_limit: Maximum total size of messages awaiting publication
    - limit_exceeded_behavior: Action when flow control limits are exceeded
    """
    
    message_limit: int = 1000
    """
    Maximum number of messages awaiting to be published.
    """
    
    byte_limit: int = 10000000
    """
    Maximum total size of messages awaiting to be published.
    """
    
    limit_exceeded_behavior: LimitExceededBehavior = LimitExceededBehavior.IGNORE
    """
    Action to take when publish flow control limits are exceeded.
    """

Limit Exceeded Behavior

Enum defining actions when flow control limits are exceeded.

class LimitExceededBehavior(str, Enum):
    """
    Possible actions when exceeding publish flow control limits.
    """
    
    IGNORE = "ignore"
    """
    Ignore flow control limits and continue publishing.
    """
    
    BLOCK = "block"
    """
    Block publishing until flow control limits are no longer exceeded.
    """
    
    ERROR = "error"
    """
    Raise an error when flow control limits are exceeded.
    """

Subscriber Flow Control

Configure flow control limits for the subscriber client to manage message processing load.

class FlowControl(NamedTuple):
    """
    Settings for controlling the rate at which messages are pulled
    with an asynchronous subscription.

    Attributes:
    - max_bytes: Maximum total size of outstanding messages
    - max_messages: Maximum number of outstanding messages
    - max_lease_duration: Maximum time to hold a lease on a message
    - min_duration_per_lease_extension: Minimum lease extension duration
    - max_duration_per_lease_extension: Maximum lease extension duration
    """
    
    max_bytes: int = 104857600  # 100 MiB
    """
    Maximum total size of received - but not yet processed - messages
    before pausing the message stream.
    """
    
    max_messages: int = 1000
    """
    Maximum number of received - but not yet processed - messages
    before pausing the message stream.
    """
    
    max_lease_duration: float = 3600  # 1 hour
    """
    Maximum amount of time in seconds to hold a lease on a message
    before dropping it from lease management.
    """
    
    min_duration_per_lease_extension: float = 0
    """
    Minimum amount of time in seconds for a single lease extension attempt.
    Must be between 10 and 600 seconds (inclusive). Ignored by default,
    but set to 60 seconds if subscription has exactly-once delivery enabled.
    """
    
    max_duration_per_lease_extension: float = 0
    """
    Maximum amount of time in seconds for a single lease extension attempt.
    Bounds the delay before message redelivery if subscriber fails to extend
    the deadline. Must be between 10 and 600 seconds (inclusive).
    Ignored if set to 0.
    """

Subscriber Options

Configure subscriber client behavior including OpenTelemetry tracing.

class SubscriberOptions(NamedTuple):
    """
    Options for the subscriber client.

    Attributes:
    - enable_open_telemetry_tracing: Whether to enable OpenTelemetry tracing
    """
    
    enable_open_telemetry_tracing: bool = False
    """
    Whether to enable OpenTelemetry tracing for subscriber operations.
    """

Usage Examples

Custom Batch Settings

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types

# Configure custom batching
batch_settings = types.BatchSettings(
    max_bytes=500000,      # 500KB batches
    max_latency=0.05,      # 50ms max wait
    max_messages=50        # Max 50 messages per batch
)

# Create publisher with custom batching
publisher = pubsub_v1.PublisherClient(batch_settings=batch_settings)

Publisher with Message Ordering

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types

# Configure publisher for message ordering
publisher_options = types.PublisherOptions(
    enable_message_ordering=True,
    enable_open_telemetry_tracing=True
)

publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options)

# Publish ordered messages
topic_path = publisher.topic_path("my-project", "my-topic")
for i in range(10):
    future = publisher.publish(
        topic_path,
        f"Message {i}".encode(),
        ordering_key="user-123"
    )

Publisher Flow Control

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types

# Configure publisher flow control
flow_control = types.PublishFlowControl(
    message_limit=500,      # Max 500 pending messages
    byte_limit=5000000,     # Max 5MB pending bytes
    limit_exceeded_behavior=types.LimitExceededBehavior.BLOCK
)

publisher_options = types.PublisherOptions(
    flow_control=flow_control
)

publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options)

Subscriber Flow Control

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types

# Configure subscriber flow control
flow_control = types.FlowControl(
    max_messages=50,           # Process max 50 messages concurrently
    max_bytes=50 * 1024 * 1024,  # Max 50MB outstanding
    max_lease_duration=300,    # 5 minute max lease
    min_duration_per_lease_extension=60,  # Min 60s lease extensions
    max_duration_per_lease_extension=120  # Max 120s lease extensions
)

subscriber = pubsub_v1.SubscriberClient(flow_control=flow_control)

Retry and Timeout Configuration

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types
from google.api_core import retry
from google.api_core.timeout import ConstantTimeout

# Configure custom retry policy
custom_retry = retry.Retry(
    initial=1.0,           # Initial delay
    maximum=60.0,          # Maximum delay
    multiplier=2.0,        # Backoff multiplier
    deadline=300.0         # Total deadline
)

# Configure custom timeout
custom_timeout = ConstantTimeout(120)

publisher_options = types.PublisherOptions(
    retry=custom_retry,
    timeout=custom_timeout
)

publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options)

High-Throughput Configuration

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types

# High-throughput publisher configuration
batch_settings = types.BatchSettings(
    max_bytes=5000000,     # 5MB batches
    max_latency=0.1,       # 100ms max wait
    max_messages=1000      # Large batches
)

flow_control = types.PublishFlowControl(
    message_limit=10000,   # High message limit
    byte_limit=100000000,  # 100MB limit
    limit_exceeded_behavior=types.LimitExceededBehavior.BLOCK
)

publisher_options = types.PublisherOptions(
    flow_control=flow_control
)

publisher = pubsub_v1.PublisherClient(
    batch_settings=batch_settings,
    publisher_options=publisher_options
)

# High-throughput subscriber configuration
subscriber_flow_control = types.FlowControl(
    max_messages=1000,              # High concurrency
    max_bytes=200 * 1024 * 1024,   # 200MB outstanding
    max_lease_duration=1800         # 30 minute max lease
)

subscriber = pubsub_v1.SubscriberClient(flow_control=subscriber_flow_control)

Low-Latency Configuration

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types

# Low-latency publisher configuration
batch_settings = types.BatchSettings(
    max_bytes=100000,      # Small batches for low latency
    max_latency=0.001,     # 1ms max wait
    max_messages=10        # Small batches
)

publisher = pubsub_v1.PublisherClient(batch_settings=batch_settings)

# Low-latency subscriber configuration
flow_control = types.FlowControl(
    max_messages=10,       # Low concurrency for predictable latency
    max_bytes=1024 * 1024, # 1MB outstanding
    max_lease_duration=60  # Short lease duration
)

subscriber = pubsub_v1.SubscriberClient(flow_control=flow_control)

Install with Tessl CLI

npx tessl i tessl/pypi-google-cloud-pubsub

docs

configuration.md

exceptions.md

index.md

message-handling.md

publisher.md

schedulers.md

schema-service.md

subscriber.md

types.md

tile.json