CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-google-cloud-pubsub

Google Cloud Pub/Sub API client library for reliable, many-to-many, asynchronous messaging between applications

Pending
Overview
Eval results
Files

docs/schedulers.md

Schedulers and Utilities

Google Cloud Pub/Sub provides scheduler classes and utility functions to control message processing behavior and resource management in subscriber operations.

Capabilities

Thread Scheduler

Custom scheduler for controlling how messages are processed in subscriber operations.

class ThreadScheduler:
    """
    A thread pool-based scheduler for processing subscriber messages.

    Message callbacks are executed on a configurable thread pool, which
    gives the caller control over concurrency and resource usage during
    message processing.

    NOTE(review): this block is an API summary, not the implementation;
    confirm each signature against the installed google-cloud-pubsub
    release before relying on it.
    """

    def __init__(self, executor: Optional[ThreadPoolExecutor] = None):
        """
        Initialize the thread scheduler.

        Parameters:
        - executor: Optional pre-configured ThreadPoolExecutor used to run
                   callbacks. If omitted, a default executor is created.
        """

    def schedule(self, callback: Callable, *args, **kwargs) -> Future:
        """
        Schedule a callback function for execution.

        Parameters:
        - callback: Function to execute
        - *args: Positional arguments passed to the callback
        - **kwargs: Keyword arguments passed to the callback

        Returns:
        Future representing the scheduled execution

        NOTE(review): the upstream scheduler's ``schedule`` appears to
        return None rather than a Future; confirm before using the
        return value.
        """

    def shutdown(self, wait: bool = True) -> None:
        """
        Shut down the scheduler and stop accepting new tasks.

        Parameters:
        - wait: Whether to wait for currently executing tasks to complete

        NOTE(review): upstream releases appear to name this flag
        ``await_msg_callbacks`` (default False); confirm the keyword
        before passing it by name.
        """

    @property
    def executor(self) -> ThreadPoolExecutor:
        """
        Get the underlying thread pool executor.

        Returns:
        ThreadPoolExecutor instance used by this scheduler

        NOTE(review): verify that this property is part of the public
        upstream API before depending on it.
        """

Future Classes

Future objects returned by the publisher and subscriber clients for tracking asynchronous operations: streaming pulls, publishes, and acknowledgments.

class StreamingPullFuture:
    """
    Future object for managing streaming pull operations.

    The future represents the lifetime of one streaming pull subscription:
    it can be polled for state, blocked on until the pull stops, and used
    to cancel the pull.
    """

    def cancel(self) -> bool:
        """
        Cancel the streaming pull operation.

        Stops the streaming pull and releases associated resources.

        Returns:
        True if the operation was successfully cancelled
        """

    def cancelled(self) -> bool:
        """
        Check if the streaming pull operation was cancelled.

        Returns:
        True if the operation is cancelled
        """

    def running(self) -> bool:
        """
        Check if the streaming pull operation is currently running.

        Returns:
        True if the operation is active
        """

    def result(self, timeout: Optional[float] = None) -> None:
        """
        Wait for the streaming pull operation to complete.

        This method blocks until the streaming pull stops due to
        cancellation, error, or other termination condition. It carries
        no return value; it is used purely to wait.

        Parameters:
        - timeout: Maximum time to wait in seconds; None (the default)
                   places no limit on the wait

        Raises:
        TimeoutError: If the timeout is exceeded
                      (NOTE(review): presumably
                      concurrent.futures.TimeoutError; confirm which
                      exception class callers should catch)
        """

    def add_done_callback(self, callback: Callable[["StreamingPullFuture"], None]) -> None:
        """
        Add a callback to be executed when the future completes.

        Parameters:
        - callback: Function to call when the future is done; per the
                    annotation it receives this future as its only
                    argument
        """

class PublisherFuture:
    """
    Future object for publisher operations that return message IDs.

    One future corresponds to one publish operation and resolves to the
    server-assigned message ID once the publish succeeds.
    """

    def result(self, timeout: Optional[float] = None) -> str:
        """
        Get the message ID from the publish operation.

        Blocks until the publish operation has finished or the timeout
        elapses.

        Parameters:
        - timeout: Maximum time to wait in seconds; None (the default)
                   places no limit on the wait

        Returns:
        Server-assigned message ID

        Raises:
        TimeoutError: If the timeout is exceeded
        PublishError: If the publish operation failed
        """

    def add_done_callback(self, callback: Callable[["PublisherFuture"], None]) -> None:
        """
        Add a callback to be executed when the future completes.

        Parameters:
        - callback: Function to call with the future as argument; call
                    ``result()`` on that argument to obtain the message ID
                    or the failure
        """

    def cancel(self) -> bool:
        """
        Attempt to cancel the publish operation.

        Returns:
        Always False (Pub/Sub publish operations cannot be cancelled)
        """

    def cancelled(self) -> bool:
        """
        Check if the publish operation was cancelled.

        Returns:
        Always False (Pub/Sub publish operations cannot be cancelled)
        """

class AcknowledgeFuture:
    """
    Future object for acknowledgment operations in exactly-once delivery.

    One future corresponds to one ack or nack request and resolves to an
    AcknowledgeStatus describing how that request ended.
    """

    def result(self, timeout: Optional[float] = None) -> AcknowledgeStatus:
        """
        Get the acknowledgment status.

        Blocks until the ack/nack request has finished or the timeout
        elapses.

        Parameters:
        - timeout: Maximum time to wait in seconds; None (the default)
                   places no limit on the wait

        Returns:
        AcknowledgeStatus indicating the result (for example
        AcknowledgeStatus.SUCCESS)

        Raises:
        TimeoutError: If the timeout is exceeded
        AcknowledgeError: If the acknowledgment operation failed
        """

    def add_done_callback(self, callback: Callable[["AcknowledgeFuture"], None]) -> None:
        """
        Add a callback to be executed when the future completes.

        Parameters:
        - callback: Function to call with the future as argument; call
                    ``result()`` on that argument to obtain the status or
                    the failure
        """

Utility Functions

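Helper functions for building Pub/Sub resource path strings. The usage examples below call them as methods on the client classes, for example `PublisherClient.common_project_path(project_id)`.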

def common_project_path(project: str) -> str:
    """
    Construct a project path string.

    Parameters:
    - project: Project ID

    Returns:
    Project path in the format "projects/{project}"; for example,
    project "my-project" gives "projects/my-project"
    """

def common_location_path(project: str, location: str) -> str:
    """
    Construct a location path string.

    Parameters:
    - project: Project ID
    - location: Location/region name

    Returns:
    Location path in the format "projects/{project}/locations/{location}";
    for example, project "my-project" and location "us-central1" give
    "projects/my-project/locations/us-central1"
    """

Usage Examples

Custom Thread Scheduler

import concurrent.futures
import time

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler

# Create custom thread pool executor
executor = concurrent.futures.ThreadPoolExecutor(
    max_workers=20,
    thread_name_prefix="pubsub-callback",
)

# Create scheduler with custom executor
scheduler = ThreadScheduler(executor=executor)

# Create subscriber with custom scheduler
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")


def message_callback(message):
    """Handle one received message, then acknowledge it."""
    print(f"Processing message: {message.message_id}")
    # Simulate processing work
    time.sleep(1)
    message.ack()


# Use custom scheduler in subscription
streaming_pull_future = subscriber.subscribe(
    subscription_path,
    callback=message_callback,
    scheduler=scheduler,
)

try:
    # Let it run for 60 seconds
    streaming_pull_future.result(timeout=60)
except (concurrent.futures.TimeoutError, KeyboardInterrupt):
    # result() raises TimeoutError when the 60 seconds elapse while the pull
    # is still active, so the timeout is the normal way out of this example.
    streaming_pull_future.cancel()
    # Block until the shutdown started by cancel() has finished.
    streaming_pull_future.result()
finally:
    # Shutdown scheduler. No keyword is passed on purpose.
    # NOTE(review): the stub on this page names the flag `wait`, while
    # released google-cloud-pubsub versions appear to name it
    # `await_msg_callbacks`; confirm before passing it by name.
    scheduler.shutdown()
    # Release the client's underlying connection.
    subscriber.close()

Managing Streaming Pull Future

import threading
import time

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")


def callback(message):
    """Print the decoded payload and acknowledge the message."""
    print(f"Received: {message.data.decode('utf-8')}")
    message.ack()


# Start streaming pull
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

print(f"Streaming pull running: {streaming_pull_future.running()}")
print(f"Streaming pull cancelled: {streaming_pull_future.cancelled()}")


# Add completion callback
def on_done(future):
    """Report how the streaming pull ended."""
    print("Streaming pull completed")
    print(f"Was cancelled: {future.cancelled()}")


streaming_pull_future.add_done_callback(on_done)


# Run in background thread to allow monitoring
def monitor_future():
    """Block on the future and report any error that stopped the pull."""
    try:
        # Wait for streaming pull to complete
        streaming_pull_future.result()
    except Exception as e:
        print(f"Streaming pull error: {e}")


monitor_thread = threading.Thread(target=monitor_future)
monitor_thread.start()

try:
    # Let it run for 30 seconds, then cancel
    time.sleep(30)
finally:
    # Cancel in `finally`: if the sleep is interrupted (for example by
    # Ctrl-C) the pull must still be stopped, otherwise the non-daemon
    # monitor thread stays blocked in result() and the process cannot exit.
    print("Cancelling streaming pull...")
    streaming_pull_future.cancel()

    # Wait for monitor thread to complete
    monitor_thread.join()

    # Release the client's underlying connection.
    subscriber.close()

Publisher Future Handling

import concurrent.futures

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")

# Publish multiple messages and collect futures
futures = [
    publisher.publish(topic_path, f"Message {i}".encode())
    for i in range(10)
]


# Add callbacks to futures
def on_publish_complete(future):
    """Report the outcome of a single publish operation."""
    try:
        message_id = future.result()
        print(f"Published successfully: {message_id}")
    except Exception as e:
        print(f"Publish failed: {e}")


for future in futures:
    future.add_done_callback(on_publish_complete)

# Wait for all futures to complete
try:
    # Use concurrent.futures.as_completed for efficient waiting
    for completed in concurrent.futures.as_completed(futures, timeout=30):
        # result() re-raises the publish error for a failed future. Handle it
        # per future so one failed publish does not abort the wait for the
        # remaining ones.
        try:
            message_id = completed.result()
        except Exception as e:
            print(f"Failed: {e}")
        else:
            print(f"Completed: {message_id}")
except concurrent.futures.TimeoutError:
    print("Some publishes did not complete within timeout")

Exactly-Once Delivery with Acknowledge Futures

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.exceptions import AcknowledgeStatus

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")


def process_message(data):
    """Placeholder for the application's processing logic; replace it."""
    print(f"Processing {len(data)} bytes")


def on_nack_complete(nack_future):
    """Report the outcome of a nack request.

    result() is guarded so that a failed nack is reported here instead of
    raising inside the done-callback.
    """
    try:
        print(f"Nack status: {nack_future.result()}")
    except Exception as e:
        print(f"Nack operation failed: {e}")


def callback(message):
    """Process a message, then ack it on success or nack it on failure."""
    try:
        # Process the message
        process_message(message.data)
    except Exception as e:
        print(f"Message processing failed: {e}")
        # Nack with response
        nack_future = message.nack_with_response()
        nack_future.add_done_callback(on_nack_complete)
        return

    # The ack path sits outside the try block above, so an error raised
    # while acknowledging cannot trigger a nack of a message that was
    # processed successfully.
    # Acknowledge with response for exactly-once delivery
    ack_future = message.ack_with_response()

    # Add callback to handle ack result
    def on_ack_complete(ack_future):
        try:
            ack_status = ack_future.result()
            if ack_status == AcknowledgeStatus.SUCCESS:
                print(f"Message {message.message_id} acknowledged successfully")
            else:
                print(f"Ack failed with status: {ack_status}")
                # Handle failed acknowledgment
        except Exception as e:
            print(f"Ack operation failed: {e}")

    ack_future.add_done_callback(on_ack_complete)


streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

# subscribe() returns immediately and messages are delivered on background
# threads, so keep the main thread alive until the pull is stopped.
with subscriber:
    try:
        streaming_pull_future.result()
    except KeyboardInterrupt:
        streaming_pull_future.cancel()
        # Block until the shutdown started by cancel() has finished.
        streaming_pull_future.result()

Resource Path Utilities

from google.cloud.pubsub_v1 import PublisherClient, SubscriberClient

# Identifiers the resource paths are built from
project_id = "my-project"
topic_name = "my-topic"
subscription_name = "my-subscription"

# Build every path up front, using the helpers exposed on the client classes
topic_path = PublisherClient.topic_path(project_id, topic_name)
subscription_path = SubscriberClient.subscription_path(
    project_id,
    subscription_name,
)
project_path = PublisherClient.common_project_path(project_id)

# Split the topic and subscription paths back into their components
topic_components = PublisherClient.parse_topic_path(topic_path)
subscription_components = SubscriberClient.parse_subscription_path(
    subscription_path
)

# Show the constructed paths
print(f"Topic path: {topic_path}")
print(f"Subscription path: {subscription_path}")
print(f"Project path: {project_path}")

# Show the parsed components
print(f"Topic components: {topic_components}")
# Output: {'project': 'my-project', 'topic': 'my-topic'}

print(f"Subscription components: {subscription_components}")
# Output: {'project': 'my-project', 'subscription': 'my-subscription'}

Thread Pool Configuration

import concurrent.futures
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler

# Configure different thread pools for different workloads
def create_scheduler_for_workload(workload_type: str) -> ThreadScheduler:
    """
    Build a ThreadScheduler whose thread pool is sized for a workload type.

    Parameters:
    - workload_type: "io_intensive" or "cpu_intensive"; any other value
                     selects the default configuration

    Returns:
    ThreadScheduler wrapping a newly created ThreadPoolExecutor
    """
    # (max_workers, thread_name_prefix) for each recognised workload type.
    pool_settings = {
        # More threads for I/O bound work
        "io_intensive": (50, "pubsub-io"),
        # Limit threads for CPU bound work (4 is meant to match CPU cores)
        "cpu_intensive": (4, "pubsub-cpu"),
    }
    # Default configuration for every other workload type
    fallback = (10, "pubsub-default")

    worker_count, name_prefix = pool_settings.get(workload_type, fallback)
    pool = concurrent.futures.ThreadPoolExecutor(
        max_workers=worker_count,
        thread_name_prefix=name_prefix,
    )
    return ThreadScheduler(executor=pool)

def io_intensive_callback(message):
    """Placeholder handler for I/O-bound work; replace with real logic."""
    message.ack()


def cpu_intensive_callback(message):
    """Placeholder handler for CPU-bound work; replace with real logic."""
    message.ack()


# Use different schedulers for different subscriptions
io_scheduler = create_scheduler_for_workload("io_intensive")
cpu_scheduler = create_scheduler_for_workload("cpu_intensive")

subscriber = pubsub_v1.SubscriberClient()

# I/O intensive subscription (e.g., API calls, database operations)
io_subscription = subscriber.subscription_path("my-project", "io-intensive-sub")
io_future = subscriber.subscribe(
    io_subscription,
    callback=io_intensive_callback,
    scheduler=io_scheduler,
)

# CPU intensive subscription (e.g., data processing, calculations)
cpu_subscription = subscriber.subscription_path("my-project", "cpu-intensive-sub")
cpu_future = subscriber.subscribe(
    cpu_subscription,
    callback=cpu_intensive_callback,
    scheduler=cpu_scheduler,
)

# subscribe() returns immediately and messages are delivered on background
# threads, so keep the main thread alive until the pulls are stopped.
with subscriber:
    try:
        io_future.result()
        cpu_future.result()
    except KeyboardInterrupt:
        print("Stopping subscriptions...")
    finally:
        # Stop both pulls regardless of which one ended first or why.
        io_future.cancel()
        cpu_future.cancel()

Install with Tessl CLI

npx tessl i tessl/pypi-google-cloud-pubsub

docs

configuration.md

exceptions.md

index.md

message-handling.md

publisher.md

schedulers.md

schema-service.md

subscriber.md

types.md

tile.json