Python client library for Google Cloud Platform services including Datastore, Storage, and Pub/Sub
Overall
score
93%
Google Cloud Pub/Sub is a messaging service for asynchronous communication between applications. It provides topic-based publish/subscribe messaging with support for both push and pull subscriptions, automatic scaling, and at-least-once message delivery.
High-level client for managing Pub/Sub topics and subscriptions.
class Client:
def __init__(self, project=None, credentials=None, http=None):
"""
Initialize Pub/Sub client.
Parameters:
- project (str): Google Cloud project ID
- credentials: OAuth2 credentials object
- http: Optional HTTP client
"""
def list_topics(self, page_size=None, page_token=None):
"""
List topics in project.
Parameters:
- page_size (int): Maximum results per page
- page_token (str): Pagination token
Returns:
Iterator: Topic listing iterator
"""
def list_subscriptions(self, page_size=None, page_token=None, topic_name=None):
"""
List subscriptions in project.
Parameters:
- page_size (int): Maximum results per page
- page_token (str): Pagination token
- topic_name (str): Optional topic filter
Returns:
Iterator: Subscription listing iterator
"""
def topic(self, name, timestamp_messages=False):
"""
Create Topic instance.
Parameters:
- name (str): Topic name
- timestamp_messages (bool): Whether to add timestamps to messages
Returns:
Topic: Topic instance
"""Topic management for message publishing with support for batch operations and message attributes.
class Topic:
def __init__(self, name, client, timestamp_messages=False):
"""
Initialize topic.
Parameters:
- name (str): Topic name
- client (Client): Pub/Sub client
- timestamp_messages (bool): Add timestamps to messages
"""
def subscription(self, name, ack_deadline=None, push_endpoint=None):
"""
Create subscription instance for this topic.
Parameters:
- name (str): Subscription name
- ack_deadline (int): Message acknowledgment deadline in seconds
- push_endpoint (str): URL for push subscriptions
Returns:
Subscription: Subscription instance
"""
@classmethod
def from_api_repr(cls, resource, client):
"""
Create topic from API response.
Parameters:
- resource (dict): API response data
- client (Client): Pub/Sub client
Returns:
Topic: Topic instance
"""
def create(self, client=None):
"""
Create topic.
Parameters:
- client (Client): Optional client override
"""
def exists(self, client=None):
"""
Check if topic exists.
Parameters:
- client (Client): Optional client override
Returns:
bool: True if topic exists
"""
def publish(self, message, client=None, **attrs):
"""
Publish single message to topic.
Parameters:
- message (str): Message data
- client (Client): Optional client override
- **attrs: Message attributes as keyword arguments
Returns:
str: Published message ID
"""
def batch(self, client=None):
"""
Create batch publisher for efficient message batching.
Parameters:
- client (Client): Optional client override
Returns:
Batch: Batch publisher instance
"""
def delete(self, client=None):
"""
Delete topic.
Parameters:
- client (Client): Optional client override
"""
@property
def name(self):
"""
Topic name.
Returns:
str: Topic name
"""
@property
def project(self):
"""
Project ID.
Returns:
str: Project ID
"""
@property
def full_name(self):
"""
Fully qualified topic name.
Returns:
str: Full topic name
"""
@property
def path(self):
"""
API path for topic.
Returns:
str: Topic API path
"""
@property
def timestamp_messages(self):
"""
Whether messages are automatically timestamped.
Returns:
bool: Timestamp setting
"""Batch publisher for efficient message publishing with automatic batching and context manager support.
class Batch:
def __init__(self, topic, client):
"""
Initialize batch publisher.
Parameters:
- topic (Topic): Associated topic
- client (Client): Pub/Sub client
"""
def publish(self, message, **attrs):
"""
Add message to batch.
Parameters:
- message (str): Message data
- **attrs: Message attributes as keyword arguments
"""
def commit(self, client=None):
"""
Send all batched messages.
Parameters:
- client (Client): Optional client override
Returns:
list[str]: List of published message IDs
"""
def __enter__(self):
"""
Enter context manager.
Returns:
Batch: Self
"""
def __exit__(self, exc_type, exc_val, exc_tb):
"""
Exit context manager and commit batch.
"""
def __iter__(self):
"""
Iterate over published message IDs (after commit).
Returns:
Iterator[str]: Message ID iterator
"""Subscription management for message consumption with support for both pull and push delivery models.
class Subscription:
def __init__(self, name, topic, ack_deadline=None, push_endpoint=None):
"""
Initialize subscription.
Parameters:
- name (str): Subscription name
- topic (Topic): Associated topic
- ack_deadline (int): Acknowledgment deadline in seconds
- push_endpoint (str): URL for push subscriptions
"""
@classmethod
def from_api_repr(cls, resource, client, topics=None):
"""
Create subscription from API response.
Parameters:
- resource (dict): API response data
- client (Client): Pub/Sub client
- topics (dict): Optional topic lookup cache
Returns:
Subscription: Subscription instance
"""
def create(self, client=None):
"""
Create subscription.
Parameters:
- client (Client): Optional client override
"""
def exists(self, client=None):
"""
Check if subscription exists.
Parameters:
- client (Client): Optional client override
Returns:
bool: True if subscription exists
"""
def reload(self, client=None):
"""
Reload subscription metadata from API.
Parameters:
- client (Client): Optional client override
"""
def delete(self, client=None):
"""
Delete subscription.
Parameters:
- client (Client): Optional client override
"""
def modify_push_configuration(self, push_endpoint, client=None):
"""
Modify push endpoint configuration.
Parameters:
- push_endpoint (str): New push endpoint URL (None for pull)
- client (Client): Optional client override
"""
def pull(self, return_immediately=False, max_messages=1, client=None):
"""
Pull messages from subscription.
Parameters:
- return_immediately (bool): Return immediately if no messages
- max_messages (int): Maximum messages to return
- client (Client): Optional client override
Returns:
list[Message]: List of received messages
"""
def acknowledge(self, ack_ids, client=None):
"""
Acknowledge received messages.
Parameters:
- ack_ids (list[str]): List of acknowledgment IDs
- client (Client): Optional client override
"""
def modify_ack_deadline(self, ack_ids, ack_deadline, client=None):
"""
Modify acknowledgment deadline for messages.
Parameters:
- ack_ids (list[str]): List of acknowledgment IDs
- ack_deadline (int): New deadline in seconds
- client (Client): Optional client override
"""
@property
def name(self):
"""
Subscription name.
Returns:
str: Subscription name
"""
@property
def topic(self):
"""
Associated topic.
Returns:
Topic: Associated topic
"""
@property
def ack_deadline(self):
"""
Acknowledgment deadline in seconds.
Returns:
int: Deadline in seconds
"""
@property
def push_endpoint(self):
"""
Push endpoint URL for push subscriptions.
Returns:
str or None: Push endpoint URL
"""
@property
def path(self):
"""
API path for subscription.
Returns:
str: Subscription API path
"""Message objects representing individual Pub/Sub messages with data and attributes.
class Message:
def __init__(self, data, message_id, attributes=None):
"""
Initialize message.
Parameters:
- data (bytes): Message data
- message_id (str): Message ID assigned by API
- attributes (dict): Optional message attributes
"""
@property
def data(self):
"""
Message data.
Returns:
str: Message data
"""
@property
def message_id(self):
"""
Message ID assigned by the API.
Returns:
str: Message ID
"""
@property
def attributes(self):
"""
Message attributes dictionary.
Returns:
dict: Message attributes
"""
@property
def timestamp(self):
"""
Message timestamp from attributes (if present).
Returns:
datetime: Timestamp in UTC timezone
Raises:
ValueError: If timestamp not in attributes or invalid format
"""
@classmethod
def from_api_repr(cls, api_repr):
"""
Create message from API representation.
Parameters:
- api_repr (dict): API response data
Returns:
Message: Message instance
"""from gcloud import pubsub
# Initialize client
client = pubsub.Client(project='my-project')
# Create topic
topic = client.topic('my-topic')
topic.create()
# Create subscription
subscription = topic.subscription('my-subscription', ack_deadline=60)
subscription.create()
# Publish message
message_id = topic.publish('Hello, Pub/Sub!', priority='high', source='app1')
print(f"Published message: {message_id}")# Publish single message with attributes
topic.publish('Order processed', order_id='12345', status='completed')
# Batch publishing for efficiency
with topic.batch() as batch:
for i in range(10):
batch.publish(f'Message {i}', sequence=str(i))
# Get message IDs after batch commit
message_ids = list(batch)
print(f"Published {len(message_ids)} messages")# Pull messages from subscription
messages = subscription.pull(max_messages=5, return_immediately=True)
# Process messages
ack_ids = []
for message in messages:
print(f"Received: {message.data}")
print(f"Attributes: {message.attributes}")
# Process message here
process_message(message.data)
# Collect acknowledgment ID
ack_ids.append(message.ack_id)
# Acknowledge processed messages
if ack_ids:
subscription.acknowledge(ack_ids)# Create pull subscription
pull_subscription = topic.subscription('pull-sub', ack_deadline=30)
pull_subscription.create()
# Create push subscription
push_subscription = topic.subscription(
'push-sub',
push_endpoint='https://myapp.com/webhook'
)
push_subscription.create()
# Modify push configuration
subscription.modify_push_configuration('https://newapp.com/webhook')
# Convert push to pull
subscription.modify_push_configuration(None)# Pull with immediate return
messages = subscription.pull(return_immediately=True, max_messages=10)
if not messages:
print("No messages available")
else:
# Extend acknowledgment deadline for processing time
ack_ids = [msg.ack_id for msg in messages]
subscription.modify_ack_deadline(ack_ids, 120) # 2 minutes
# Process messages
for message in messages:
try:
process_complex_message(message.data)
subscription.acknowledge([message.ack_id])
except Exception as e:
print(f"Processing failed: {e}")
# Message will be redelivered after ack deadlineInstall with Tessl CLI
npx tessl i tessl/pypi-gcloudevals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10