Google Cloud Pub/Sub API client library for reliable, many-to-many, asynchronous messaging between applications
—
Google Cloud Pub/Sub provides comprehensive configuration options for controlling client behavior, batching, flow control, retry logic, and timeouts. These configuration types enable fine-tuning for different use cases and performance requirements.
Configure message batching behavior for the publisher client.
class BatchSettings(NamedTuple):
"""
Settings for batch publishing messages.
Attributes:
- max_bytes: Maximum total size of messages in a batch (default: 1MB)
- max_latency: Maximum time to wait before publishing batch (default: 0.01s)
- max_messages: Maximum number of messages in a batch (default: 100)
"""
max_bytes: int = 1000000
"""
Maximum total size of messages to collect before automatically
publishing the batch, including byte size overhead of the publish
request. Maximum server-side limit is 10,000,000 bytes.
"""
max_latency: float = 0.01
"""
Maximum number of seconds to wait for additional messages before
automatically publishing the batch.
"""
max_messages: int = 100
"""
Maximum number of messages to collect before automatically
publishing the batch.
"""Configure publisher client behavior including message ordering, flow control, and OpenTelemetry.
class PublisherOptions(NamedTuple):
"""
Options for the publisher client.
Attributes:
- enable_message_ordering: Whether to order messages by ordering key
- flow_control: Flow control settings for message publishing
- retry: Retry settings for publish operations
- timeout: Timeout settings for publish operations
- enable_open_telemetry_tracing: Whether to enable OpenTelemetry tracing
"""
enable_message_ordering: bool = False
"""
Whether to order messages in a batch by a supplied ordering key.
"""
flow_control: PublishFlowControl = PublishFlowControl()
"""
Flow control settings for message publishing by the client.
By default the publisher client does not do any throttling.
"""
retry: OptionalRetry = ...
"""
Retry settings for message publishing by the client.
Should be an instance of google.api_core.retry.Retry.
"""
timeout: OptionalTimeout = ConstantTimeout(60)
"""
Timeout settings for message publishing by the client.
Should be compatible with pubsub_v1.types.TimeoutType.
"""
enable_open_telemetry_tracing: bool = False
"""
Whether to enable OpenTelemetry tracing for publish operations.
"""Configure flow control limits for the publisher client to prevent resource exhaustion.
class PublishFlowControl(NamedTuple):
"""
Client flow control settings for message publishing.
Attributes:
- message_limit: Maximum number of messages awaiting publication
- byte_limit: Maximum total size of messages awaiting publication
- limit_exceeded_behavior: Action when flow control limits are exceeded
"""
message_limit: int = 1000
"""
Maximum number of messages awaiting to be published.
"""
byte_limit: int = 10000000
"""
Maximum total size of messages awaiting to be published.
"""
limit_exceeded_behavior: LimitExceededBehavior = LimitExceededBehavior.IGNORE
"""
Action to take when publish flow control limits are exceeded.
"""Enum defining actions when flow control limits are exceeded.
class LimitExceededBehavior(str, Enum):
"""
Possible actions when exceeding publish flow control limits.
"""
IGNORE = "ignore"
"""
Ignore flow control limits and continue publishing.
"""
BLOCK = "block"
"""
Block publishing until flow control limits are no longer exceeded.
"""
ERROR = "error"
"""
Raise an error when flow control limits are exceeded.
"""Configure flow control limits for the subscriber client to manage message processing load.
class FlowControl(NamedTuple):
"""
Settings for controlling the rate at which messages are pulled
with an asynchronous subscription.
Attributes:
- max_bytes: Maximum total size of outstanding messages
- max_messages: Maximum number of outstanding messages
- max_lease_duration: Maximum time to hold a lease on a message
- min_duration_per_lease_extension: Minimum lease extension duration
- max_duration_per_lease_extension: Maximum lease extension duration
"""
max_bytes: int = 104857600 # 100 MiB
"""
Maximum total size of received - but not yet processed - messages
before pausing the message stream.
"""
max_messages: int = 1000
"""
Maximum number of received - but not yet processed - messages
before pausing the message stream.
"""
max_lease_duration: float = 3600 # 1 hour
"""
Maximum amount of time in seconds to hold a lease on a message
before dropping it from lease management.
"""
min_duration_per_lease_extension: float = 0
"""
Minimum amount of time in seconds for a single lease extension attempt.
Must be between 10 and 600 seconds (inclusive). Ignored by default,
but set to 60 seconds if subscription has exactly-once delivery enabled.
"""
max_duration_per_lease_extension: float = 0
"""
Maximum amount of time in seconds for a single lease extension attempt.
Bounds the delay before message redelivery if subscriber fails to extend
the deadline. Must be between 10 and 600 seconds (inclusive).
Ignored if set to 0.
"""Configure subscriber client behavior including OpenTelemetry tracing.
class SubscriberOptions(NamedTuple):
"""
Options for the subscriber client.
Attributes:
- enable_open_telemetry_tracing: Whether to enable OpenTelemetry tracing
"""
enable_open_telemetry_tracing: bool = False
"""
Whether to enable OpenTelemetry tracing for subscriber operations.
"""from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types
# Configure custom batching
batch_settings = types.BatchSettings(
max_bytes=500000, # 500KB batches
max_latency=0.05, # 50ms max wait
max_messages=50 # Max 50 messages per batch
)
# Create publisher with custom batching
publisher = pubsub_v1.PublisherClient(batch_settings=batch_settings)from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types
# Configure publisher for message ordering
publisher_options = types.PublisherOptions(
enable_message_ordering=True,
enable_open_telemetry_tracing=True
)
publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options)
# Publish ordered messages
topic_path = publisher.topic_path("my-project", "my-topic")
for i in range(10):
future = publisher.publish(
topic_path,
f"Message {i}".encode(),
ordering_key="user-123"
)from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types
# Configure publisher flow control
flow_control = types.PublishFlowControl(
message_limit=500, # Max 500 pending messages
byte_limit=5000000, # Max 5MB pending bytes
limit_exceeded_behavior=types.LimitExceededBehavior.BLOCK
)
publisher_options = types.PublisherOptions(
flow_control=flow_control
)
publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options)from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types
# Configure subscriber flow control
flow_control = types.FlowControl(
max_messages=50, # Process max 50 messages concurrently
max_bytes=50 * 1024 * 1024, # Max 50MB outstanding
max_lease_duration=300, # 5 minute max lease
min_duration_per_lease_extension=60, # Min 60s lease extensions
max_duration_per_lease_extension=120 # Max 120s lease extensions
)
subscriber = pubsub_v1.SubscriberClient(flow_control=flow_control)from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types
from google.api_core import retry
from google.api_core.timeout import ConstantTimeout
# Configure custom retry policy
custom_retry = retry.Retry(
initial=1.0, # Initial delay
maximum=60.0, # Maximum delay
multiplier=2.0, # Backoff multiplier
deadline=300.0 # Total deadline
)
# Configure custom timeout
custom_timeout = ConstantTimeout(120)
publisher_options = types.PublisherOptions(
retry=custom_retry,
timeout=custom_timeout
)
publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options)from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types
# High-throughput publisher configuration
batch_settings = types.BatchSettings(
max_bytes=5000000, # 5MB batches
max_latency=0.1, # 100ms max wait
max_messages=1000 # Large batches
)
flow_control = types.PublishFlowControl(
message_limit=10000, # High message limit
byte_limit=100000000, # 100MB limit
limit_exceeded_behavior=types.LimitExceededBehavior.BLOCK
)
publisher_options = types.PublisherOptions(
flow_control=flow_control
)
publisher = pubsub_v1.PublisherClient(
batch_settings=batch_settings,
publisher_options=publisher_options
)
# High-throughput subscriber configuration
subscriber_flow_control = types.FlowControl(
max_messages=1000, # High concurrency
max_bytes=200 * 1024 * 1024, # 200MB outstanding
max_lease_duration=1800 # 30 minute max lease
)
subscriber = pubsub_v1.SubscriberClient(flow_control=subscriber_flow_control)from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types
# Low-latency publisher configuration
batch_settings = types.BatchSettings(
max_bytes=100000, # Small batches for low latency
max_latency=0.001, # 1ms max wait
max_messages=10 # Small batches
)
publisher = pubsub_v1.PublisherClient(batch_settings=batch_settings)
# Low-latency subscriber configuration
flow_control = types.FlowControl(
max_messages=10, # Low concurrency for predictable latency
max_bytes=1024 * 1024, # 1MB outstanding
max_lease_duration=60 # Short lease duration
)
subscriber = pubsub_v1.SubscriberClient(flow_control=flow_control)Install with Tessl CLI
npx tessl i tessl/pypi-google-cloud-pubsub