Google Cloud Pub/Sub API client library for reliable, many-to-many, asynchronous messaging between applications.

Google Cloud Pub/Sub provides scheduler classes and utility functions to control message processing behavior and resource management in subscriber operations.

Custom scheduler for controlling how messages are processed in subscriber operations:
class ThreadScheduler:
    """
    A thread pool-based scheduler for processing subscriber messages.

    This scheduler manages the execution of message callbacks using a
    configurable thread pool, allowing control over concurrency and
    resource usage in message processing.
    """

    def __init__(self, executor: Optional[ThreadPoolExecutor] = None):
        """
        Initialize the thread scheduler.

        Parameters:
        - executor: Optional pre-configured ThreadPoolExecutor.
          If not provided, a default executor will be created.
        """
        if executor is None:
            # Default pool; message callbacks are typically I/O bound, so a
            # modest fixed pool is a safe general-purpose choice.
            executor = ThreadPoolExecutor(
                max_workers=10, thread_name_prefix="pubsub-scheduler"
            )
        self._executor = executor

    def schedule(self, callback: Callable, *args, **kwargs) -> Future:
        """
        Schedule a callback function for execution.

        Parameters:
        - callback: Function to execute
        - *args: Positional arguments for the callback
        - **kwargs: Keyword arguments for the callback

        Returns:
            Future representing the scheduled execution

        Raises:
            RuntimeError: If the scheduler has already been shut down.
        """
        return self._executor.submit(callback, *args, **kwargs)

    def shutdown(self, wait: bool = True) -> None:
        """
        Shutdown the scheduler and stop accepting new tasks.

        Parameters:
        - wait: Whether to wait for currently executing tasks to complete
        """
        self._executor.shutdown(wait=wait)

    @property
    def executor(self) -> ThreadPoolExecutor:
        """
        Get the underlying thread pool executor.

        Returns:
            ThreadPoolExecutor instance used by this scheduler
        """
        return self._executor

# Future objects for handling asynchronous operations in publisher and subscriber.
class StreamingPullFuture:
    """
    Future object for managing a streaming pull operation.

    This future controls the lifecycle of a streaming pull subscription
    and allows monitoring and cancellation of the operation. Unlike a
    value-producing future, it resolves to None: it completes when the
    underlying stream terminates.
    """

    def cancel(self) -> bool:
        """
        Cancel the streaming pull operation.

        Stops the streaming pull and releases associated resources.

        Returns:
            True if the operation was successfully cancelled
        """

    def cancelled(self) -> bool:
        """
        Check whether the streaming pull operation was cancelled.

        Returns:
            True if the operation is cancelled
        """

    def running(self) -> bool:
        """
        Check whether the streaming pull operation is currently running.

        Returns:
            True if the operation is active
        """

    def result(self, timeout: Optional[float] = None) -> None:
        """
        Block until the streaming pull operation completes.

        This method blocks until the streaming pull stops due to
        cancellation, error, or other termination condition.

        Parameters:
        - timeout: Maximum time to wait in seconds

        Raises:
            TimeoutError: If the timeout is exceeded
        """

    def add_done_callback(self, callback: Callable[["StreamingPullFuture"], None]) -> None:
        """
        Add a callback to be executed when the future completes.

        Parameters:
        - callback: Function invoked with this future once it is done
        """
class PublisherFuture:
    """
    Future object for publisher operations that resolve to message IDs.

    This future represents the result of a publish operation and
    resolves to the server-assigned message ID.
    """

    def result(self, timeout: Optional[float] = None) -> str:
        """
        Block until the publish completes and return the message ID.

        Parameters:
        - timeout: Maximum time to wait in seconds

        Returns:
            Server-assigned message ID

        Raises:
            TimeoutError: If the timeout is exceeded
            PublishError: If the publish operation failed
        """

    def add_done_callback(self, callback: Callable[["PublisherFuture"], None]) -> None:
        """
        Add a callback to be executed when the future completes.

        Parameters:
        - callback: Function to call with the future as argument
        """

    def cancel(self) -> bool:
        """
        Attempt to cancel the publish operation.

        Returns:
            Always False (Pub/Sub publish operations cannot be cancelled)
        """

    def cancelled(self) -> bool:
        """
        Check if the publish operation was cancelled.

        Returns:
            Always False (Pub/Sub publish operations cannot be cancelled)
        """
class AcknowledgeFuture:
    """
    Future object for acknowledgment operations in exactly-once delivery.

    This future represents the result of an ack/nack operation and
    resolves to an AcknowledgeStatus indicating the result.
    """

    def result(self, timeout: Optional[float] = None) -> AcknowledgeStatus:
        """
        Block until the acknowledgment completes and return its status.

        Parameters:
        - timeout: Maximum time to wait in seconds

        Returns:
            AcknowledgeStatus indicating the result

        Raises:
            TimeoutError: If the timeout is exceeded
            AcknowledgeError: If the acknowledgment operation failed
        """

    def add_done_callback(self, callback: Callable[["AcknowledgeFuture"], None]) -> None:
        """
        Add a callback to be executed when the future completes.

        Parameters:
        - callback: Function to call with the future as argument
        """

# Helper functions for working with Pub/Sub resources and operations.
def common_project_path(project: str) -> str:
    """
    Construct a project resource path string.

    Parameters:
    - project: Project ID

    Returns:
        Project path in the format "projects/{project}"
    """
    # Declared -> str but previously had no body (returned None); build the
    # path exactly as documented.
    return f"projects/{project}"
def common_location_path(project: str, location: str) -> str:
    """
    Construct a location resource path string.

    Parameters:
    - project: Project ID
    - location: Location/region name

    Returns:
        Location path in the format "projects/{project}/locations/{location}"
    """
    # Declared -> str but previously had no body (returned None); build the
    # path exactly as documented.
    return f"projects/{project}/locations/{location}"

import concurrent.futures
# Example: subscribing with a custom thread-pool scheduler.
import concurrent.futures
import time  # fix: time.sleep() was used below without importing time

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler

# Create custom thread pool executor
executor = concurrent.futures.ThreadPoolExecutor(
    max_workers=20,
    thread_name_prefix="pubsub-callback",
)

# Create scheduler with custom executor
scheduler = ThreadScheduler(executor=executor)

# Create subscriber with custom scheduler
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")

def message_callback(message):
    """Process one message on the scheduler's thread pool, then ack it."""
    print(f"Processing message: {message.message_id}")
    # Simulate processing work
    time.sleep(1)
    message.ack()

# Use custom scheduler in subscription
streaming_pull_future = subscriber.subscribe(
    subscription_path,
    callback=message_callback,
    scheduler=scheduler,
)

try:
    # Let it run for 60 seconds
    streaming_pull_future.result(timeout=60)
except concurrent.futures.TimeoutError:
    # fix: the 60s timeout previously propagated as an unhandled exception;
    # the demo's intent is to stop the stream after the time window.
    streaming_pull_future.cancel()
except KeyboardInterrupt:
    streaming_pull_future.cancel()
finally:
    # Shutdown scheduler
    scheduler.shutdown(wait=True)

from google.cloud import pubsub_v1
# Example: monitoring and cancelling a streaming pull from a background thread.
import threading
import time

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")

def callback(message):
    # Decode the payload and acknowledge each received message.
    print(f"Received: {message.data.decode('utf-8')}")
    message.ack()

# Start streaming pull
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Streaming pull running: {streaming_pull_future.running()}")
print(f"Streaming pull cancelled: {streaming_pull_future.cancelled()}")

# Add completion callback
def on_done(future):
    # Runs once when the streaming pull terminates (for any reason).
    print("Streaming pull completed")
    print(f"Was cancelled: {future.cancelled()}")

streaming_pull_future.add_done_callback(on_done)

# Run in background thread to allow monitoring
def monitor_future():
    try:
        # Wait for streaming pull to complete
        streaming_pull_future.result()
    except Exception as e:
        # result() re-raises whatever terminated the stream.
        print(f"Streaming pull error: {e}")

monitor_thread = threading.Thread(target=monitor_future)
monitor_thread.start()

# Let it run for 30 seconds, then cancel
time.sleep(30)
print("Cancelling streaming pull...")
streaming_pull_future.cancel()

# Wait for monitor thread to complete
monitor_thread.join()

from google.cloud import pubsub_v1
# Example: publishing a batch of messages and waiting on their futures.
import concurrent.futures

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")

# Publish multiple messages and collect futures
futures = []
for i in range(10):
    future = publisher.publish(topic_path, f"Message {i}".encode())
    futures.append(future)

# Add callbacks to futures
def on_publish_complete(future):
    # result() returns the server-assigned message ID, or raises on failure.
    try:
        message_id = future.result()
        print(f"Published successfully: {message_id}")
    except Exception as e:
        print(f"Publish failed: {e}")

for future in futures:
    future.add_done_callback(on_publish_complete)

# Wait for all futures to complete
try:
    # Use concurrent.futures.as_completed for efficient waiting
    for future in concurrent.futures.as_completed(futures, timeout=30):
        message_id = future.result()
        print(f"Completed: {message_id}")
except concurrent.futures.TimeoutError:
    print("Some publishes did not complete within timeout")

from google.cloud import pubsub_v1
# Example: ack/nack with response futures for exactly-once delivery.
from google.cloud.pubsub_v1.subscriber.exceptions import AcknowledgeStatus

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")

def callback(message):
    try:
        # Process the message
        # NOTE(review): process_message is not defined in this example;
        # assumed to be application-provided — confirm before running.
        result = process_message(message.data)
        # Acknowledge with response for exactly-once delivery
        ack_future = message.ack_with_response()

        # Add callback to handle ack result
        def on_ack_complete(ack_future):
            try:
                ack_status = ack_future.result()
                if ack_status == AcknowledgeStatus.SUCCESS:
                    print(f"Message {message.message_id} acknowledged successfully")
                else:
                    print(f"Ack failed with status: {ack_status}")
                    # Handle failed acknowledgment
            except Exception as e:
                print(f"Ack operation failed: {e}")

        ack_future.add_done_callback(on_ack_complete)
    except Exception as e:
        print(f"Message processing failed: {e}")
        # Nack with response
        nack_future = message.nack_with_response()
        nack_future.add_done_callback(
            lambda f: print(f"Nack status: {f.result()}")
        )

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

from google.cloud.pubsub_v1 import PublisherClient, SubscriberClient
# Using path construction utilities
project_id = "my-project"
topic_name = "my-topic"
subscription_name = "my-subscription"
# Construct paths
topic_path = PublisherClient.topic_path(project_id, topic_name)
subscription_path = SubscriberClient.subscription_path(project_id, subscription_name)
project_path = PublisherClient.common_project_path(project_id)
print(f"Topic path: {topic_path}")
print(f"Subscription path: {subscription_path}")
print(f"Project path: {project_path}")
# Parse paths back to components
topic_components = PublisherClient.parse_topic_path(topic_path)
print(f"Topic components: {topic_components}")
# Output: {'project': 'my-project', 'topic': 'my-topic'}
subscription_components = SubscriberClient.parse_subscription_path(subscription_path)
print(f"Subscription components: {subscription_components}")
# Output: {'project': 'my-project', 'subscription': 'my-subscription'}import concurrent.futures
# Example: sizing thread pools per workload and using one scheduler per
# subscription.
import concurrent.futures

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler

# Configure different thread pools for different workloads
def create_scheduler_for_workload(workload_type: str) -> ThreadScheduler:
    """
    Build a ThreadScheduler whose thread pool is sized for a workload.

    Parameters:
    - workload_type: "io_intensive", "cpu_intensive", or any other value
      for the default configuration.

    Returns:
        ThreadScheduler backed by an appropriately sized executor.
    """
    # fix: the thread_name_prefix strings had pointless f-prefixes
    # (no placeholders) — plain string literals are used instead.
    if workload_type == "io_intensive":
        # More threads for I/O bound work
        executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=50,
            thread_name_prefix="pubsub-io",
        )
    elif workload_type == "cpu_intensive":
        # Limit threads for CPU bound work
        executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=4,  # Match CPU cores
            thread_name_prefix="pubsub-cpu",
        )
    else:
        # Default configuration
        executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=10,
            thread_name_prefix="pubsub-default",
        )
    return ThreadScheduler(executor=executor)

# Use different schedulers for different subscriptions
io_scheduler = create_scheduler_for_workload("io_intensive")
cpu_scheduler = create_scheduler_for_workload("cpu_intensive")

subscriber = pubsub_v1.SubscriberClient()

# I/O intensive subscription (e.g., API calls, database operations)
# NOTE(review): io_intensive_callback / cpu_intensive_callback are assumed
# to be application-provided — they are not defined in this example.
io_subscription = subscriber.subscription_path("my-project", "io-intensive-sub")
io_future = subscriber.subscribe(
    io_subscription,
    callback=io_intensive_callback,
    scheduler=io_scheduler,
)

# CPU intensive subscription (e.g., data processing, calculations)
cpu_subscription = subscriber.subscription_path("my-project", "cpu-intensive-sub")
cpu_future = subscriber.subscribe(
    cpu_subscription,
    callback=cpu_intensive_callback,
    scheduler=cpu_scheduler,
)

# Install with Tessl CLI
npx tessl i tessl/pypi-google-cloud-pubsub