CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-gcloud

Python client library for Google Cloud Platform services including Datastore, Storage, and Pub/Sub

Overall
score

93%

Overview
Eval results
Files

pubsub.mddocs/

Google Cloud Pub/Sub

Google Cloud Pub/Sub is a messaging service for asynchronous communication between applications. It provides topic-based publish/subscribe messaging with support for both push and pull subscriptions, automatic scaling, and at-least-once message delivery.

Capabilities

Client Operations

High-level client for managing Pub/Sub topics and subscriptions.

class Client:
    def __init__(self, project=None, credentials=None, http=None):
        """
        Initialize Pub/Sub client.
        
        Parameters:
        - project (str): Google Cloud project ID
        - credentials: OAuth2 credentials object
        - http: Optional HTTP client
        """
    
    def list_topics(self, page_size=None, page_token=None):
        """
        List topics in project.
        
        Parameters:
        - page_size (int): Maximum results per page
        - page_token (str): Pagination token
        
        Returns:
        Iterator: Topic listing iterator
        """
    
    def list_subscriptions(self, page_size=None, page_token=None, topic_name=None):
        """
        List subscriptions in project.
        
        Parameters:
        - page_size (int): Maximum results per page
        - page_token (str): Pagination token
        - topic_name (str): Optional topic filter
        
        Returns:
        Iterator: Subscription listing iterator
        """
    
    def topic(self, name, timestamp_messages=False):
        """
        Create Topic instance.
        
        Parameters:
        - name (str): Topic name
        - timestamp_messages (bool): Whether to add timestamps to messages
        
        Returns:
        Topic: Topic instance
        """

Topic Operations

Topic management for message publishing with support for batch operations and message attributes.

class Topic:
    def __init__(self, name, client, timestamp_messages=False):
        """
        Initialize topic.
        
        Parameters:
        - name (str): Topic name
        - client (Client): Pub/Sub client
        - timestamp_messages (bool): Add timestamps to messages
        """
    
    def subscription(self, name, ack_deadline=None, push_endpoint=None):
        """
        Create subscription instance for this topic.
        
        Parameters:
        - name (str): Subscription name
        - ack_deadline (int): Message acknowledgment deadline in seconds
        - push_endpoint (str): URL for push subscriptions
        
        Returns:
        Subscription: Subscription instance
        """
    
    @classmethod
    def from_api_repr(cls, resource, client):
        """
        Create topic from API response.
        
        Parameters:
        - resource (dict): API response data
        - client (Client): Pub/Sub client
        
        Returns:
        Topic: Topic instance
        """
    
    def create(self, client=None):
        """
        Create topic.
        
        Parameters:
        - client (Client): Optional client override
        """
    
    def exists(self, client=None):
        """
        Check if topic exists.
        
        Parameters:
        - client (Client): Optional client override
        
        Returns:
        bool: True if topic exists
        """
    
    def publish(self, message, client=None, **attrs):
        """
        Publish single message to topic.
        
        Parameters:
        - message (str): Message data
        - client (Client): Optional client override
        - **attrs: Message attributes as keyword arguments
        
        Returns:
        str: Published message ID
        """
    
    def batch(self, client=None):
        """
        Create batch publisher for efficient message batching.
        
        Parameters:
        - client (Client): Optional client override
        
        Returns:
        Batch: Batch publisher instance
        """
    
    def delete(self, client=None):
        """
        Delete topic.
        
        Parameters:
        - client (Client): Optional client override
        """
    
    @property
    def name(self):
        """
        Topic name.
        
        Returns:
        str: Topic name
        """
    
    @property
    def project(self):
        """
        Project ID.
        
        Returns:
        str: Project ID
        """
    
    @property
    def full_name(self):
        """
        Fully qualified topic name.
        
        Returns:
        str: Full topic name
        """
    
    @property
    def path(self):
        """
        API path for topic.
        
        Returns:
        str: Topic API path
        """
    
    @property
    def timestamp_messages(self):
        """
        Whether messages are automatically timestamped.
        
        Returns:
        bool: Timestamp setting
        """

Topic Batch Operations

Batch publisher for efficient message publishing with automatic batching and context manager support.

class Batch:
    def __init__(self, topic, client):
        """
        Initialize batch publisher.
        
        Parameters:
        - topic (Topic): Associated topic
        - client (Client): Pub/Sub client
        """
    
    def publish(self, message, **attrs):
        """
        Add message to batch.
        
        Parameters:
        - message (str): Message data
        - **attrs: Message attributes as keyword arguments
        """
    
    def commit(self, client=None):
        """
        Send all batched messages.
        
        Parameters:
        - client (Client): Optional client override
        
        Returns:
        list[str]: List of published message IDs
        """
    
    def __enter__(self):
        """
        Enter context manager.
        
        Returns:
        Batch: Self
        """
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Exit context manager and commit batch.
        """
    
    def __iter__(self):
        """
        Iterate over published message IDs (after commit).
        
        Returns:
        Iterator[str]: Message ID iterator
        """

Subscription Operations

Subscription management for message consumption with support for both pull and push delivery models.

class Subscription:
    def __init__(self, name, topic, ack_deadline=None, push_endpoint=None):
        """
        Initialize subscription.
        
        Parameters:
        - name (str): Subscription name
        - topic (Topic): Associated topic
        - ack_deadline (int): Acknowledgment deadline in seconds
        - push_endpoint (str): URL for push subscriptions
        """
    
    @classmethod
    def from_api_repr(cls, resource, client, topics=None):
        """
        Create subscription from API response.
        
        Parameters:
        - resource (dict): API response data
        - client (Client): Pub/Sub client
        - topics (dict): Optional topic lookup cache
        
        Returns:
        Subscription: Subscription instance
        """
    
    def create(self, client=None):
        """
        Create subscription.
        
        Parameters:
        - client (Client): Optional client override
        """
    
    def exists(self, client=None):
        """
        Check if subscription exists.
        
        Parameters:
        - client (Client): Optional client override
        
        Returns:
        bool: True if subscription exists
        """
    
    def reload(self, client=None):
        """
        Reload subscription metadata from API.
        
        Parameters:
        - client (Client): Optional client override
        """
    
    def delete(self, client=None):
        """
        Delete subscription.
        
        Parameters:
        - client (Client): Optional client override
        """
    
    def modify_push_configuration(self, push_endpoint, client=None):
        """
        Modify push endpoint configuration.
        
        Parameters:
        - push_endpoint (str): New push endpoint URL (None for pull)
        - client (Client): Optional client override
        """
    
    def pull(self, return_immediately=False, max_messages=1, client=None):
        """
        Pull messages from subscription.
        
        Parameters:
        - return_immediately (bool): Return immediately if no messages
        - max_messages (int): Maximum messages to return
        - client (Client): Optional client override
        
        Returns:
        list[Message]: List of received messages
        """
    
    def acknowledge(self, ack_ids, client=None):
        """
        Acknowledge received messages.
        
        Parameters:
        - ack_ids (list[str]): List of acknowledgment IDs
        - client (Client): Optional client override
        """
    
    def modify_ack_deadline(self, ack_ids, ack_deadline, client=None):
        """
        Modify acknowledgment deadline for messages.
        
        Parameters:
        - ack_ids (list[str]): List of acknowledgment IDs
        - ack_deadline (int): New deadline in seconds
        - client (Client): Optional client override
        """
    
    @property
    def name(self):
        """
        Subscription name.
        
        Returns:
        str: Subscription name
        """
    
    @property
    def topic(self):
        """
        Associated topic.
        
        Returns:
        Topic: Associated topic
        """
    
    @property
    def ack_deadline(self):
        """
        Acknowledgment deadline in seconds.
        
        Returns:
        int: Deadline in seconds
        """
    
    @property
    def push_endpoint(self):
        """
        Push endpoint URL for push subscriptions.
        
        Returns:
        str or None: Push endpoint URL
        """
    
    @property
    def path(self):
        """
        API path for subscription.
        
        Returns:
        str: Subscription API path
        """

Message Handling

Message objects representing individual Pub/Sub messages with data and attributes.

class Message:
    def __init__(self, data, message_id, attributes=None):
        """
        Initialize message.
        
        Parameters:
        - data (bytes): Message data
        - message_id (str): Message ID assigned by API
        - attributes (dict): Optional message attributes
        """
    
    @property
    def data(self):
        """
        Message data.
        
        Returns:
        str: Message data
        """
    
    @property
    def message_id(self):
        """
        Message ID assigned by the API.
        
        Returns:
        str: Message ID
        """
    
    @property
    def attributes(self):
        """
        Message attributes dictionary.
        
        Returns:
        dict: Message attributes
        """
    
    @property
    def timestamp(self):
        """
        Message timestamp from attributes (if present).
        
        Returns:
        datetime: Timestamp in UTC timezone
        
        Raises:
        ValueError: If timestamp not in attributes or invalid format
        """
    
    @classmethod
    def from_api_repr(cls, api_repr):
        """
        Create message from API representation.
        
        Parameters:
        - api_repr (dict): API response data
        
        Returns:
        Message: Message instance
        """

Usage Examples

Basic Topic and Subscription Operations

from gcloud import pubsub

# Initialize client
client = pubsub.Client(project='my-project')

# Create topic
topic = client.topic('my-topic')
topic.create()

# Create subscription
subscription = topic.subscription('my-subscription', ack_deadline=60)
subscription.create()

# Publish message
message_id = topic.publish('Hello, Pub/Sub!', priority='high', source='app1')
print(f"Published message: {message_id}")

Message Publishing

# Publish single message with attributes
topic.publish('Order processed', order_id='12345', status='completed')

# Batch publishing for efficiency
with topic.batch() as batch:
    for i in range(10):
        batch.publish(f'Message {i}', sequence=str(i))
        
# Get message IDs after batch commit
message_ids = list(batch)
print(f"Published {len(message_ids)} messages")

Message Consumption

# Pull messages from subscription
messages = subscription.pull(max_messages=5, return_immediately=True)

# Process messages
ack_ids = []
for message in messages:
    print(f"Received: {message.data}")
    print(f"Attributes: {message.attributes}")
    
    # Process message here
    process_message(message.data)
    
    # Collect acknowledgment ID
    ack_ids.append(message.ack_id)

# Acknowledge processed messages
if ack_ids:
    subscription.acknowledge(ack_ids)

Subscription Management

# Create pull subscription
pull_subscription = topic.subscription('pull-sub', ack_deadline=30)
pull_subscription.create()

# Create push subscription
push_subscription = topic.subscription(
    'push-sub',
    push_endpoint='https://myapp.com/webhook'
)
push_subscription.create()

# Modify push configuration
subscription.modify_push_configuration('https://newapp.com/webhook')

# Convert push to pull
subscription.modify_push_configuration(None)

Advanced Message Handling

# Pull with immediate return
messages = subscription.pull(return_immediately=True, max_messages=10)

if not messages:
    print("No messages available")
else:
    # Extend acknowledgment deadline for processing time
    ack_ids = [msg.ack_id for msg in messages]
    subscription.modify_ack_deadline(ack_ids, 120)  # 2 minutes
    
    # Process messages
    for message in messages:
        try:
            process_complex_message(message.data)
            subscription.acknowledge([message.ack_id])
        except Exception as e:
            print(f"Processing failed: {e}")
            # Message will be redelivered after ack deadline

Install with Tessl CLI

npx tessl i tessl/pypi-gcloud

docs

datastore.md

index.md

pubsub.md

storage.md

tile.json