Google Cloud Pub/Sub API client library for reliable, many-to-many, asynchronous messaging between applications
npx @tessl/cli install tessl/pypi-google-cloud-pubsub@2.31.00
# Google Cloud Pub/Sub
1
2
Google Cloud Pub/Sub is a fully-managed real-time messaging service that enables reliable, many-to-many, asynchronous messaging between applications. The Python client library provides comprehensive publisher and subscriber functionality with automatic retry logic, flow control, message ordering capabilities, and authentication integration with Google Cloud identity services.
3
4
## Package Information
5
6
- **Package Name**: google-cloud-pubsub
7
- **Language**: Python
8
- **Installation**: `pip install google-cloud-pubsub`
9
10
## Core Imports
11
12
```python
13
from google.cloud import pubsub_v1
14
```
15
16
Import specific components:
17
18
```python
19
from google.cloud.pubsub_v1 import PublisherClient, SubscriberClient, types
20
```
21
22
## Basic Usage
23
24
```python
25
from google.cloud import pubsub_v1
26
27
# Create clients
28
publisher = pubsub_v1.PublisherClient()
29
subscriber = pubsub_v1.SubscriberClient()
30
31
# Publishing messages
32
topic_path = publisher.topic_path("my-project", "my-topic")
33
message_data = b"Hello, World!"
34
future = publisher.publish(topic_path, message_data)
35
message_id = future.result()
36
37
# Subscribing to messages
38
subscription_path = subscriber.subscription_path("my-project", "my-subscription")
39
40
def callback(message):
41
print(f"Received: {message.data.decode('utf-8')}")
42
message.ack()
43
44
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
45
```
46
47
## Architecture
48
49
Google Cloud Pub/Sub follows a publisher-subscriber messaging pattern with the following key components:
50
51
- **Publisher**: Sends messages to topics with batching, flow control, and ordering support
52
- **Subscriber**: Receives messages from subscriptions with automatic acknowledgment and flow control
53
- **Topics**: Named resources to which publishers send messages
54
- **Subscriptions**: Named resources representing a feed of messages from a specific topic
55
- **Messages**: Data payloads with attributes and metadata
56
- **Schema Service**: Manages message schemas for validation and evolution
57
58
The library provides both high-level clients with advanced features (batching, flow control, futures) and low-level GAPIC clients for direct API access.
59
60
## Capabilities
61
62
### Publisher Client
63
64
High-level client for publishing messages to Pub/Sub topics with automatic batching, flow control, message ordering, and OpenTelemetry integration.
65
66
```python { .api }
67
class PublisherClient:
68
def __init__(self, batch_settings=None, publisher_options=None, **kwargs): ...
69
def publish(self, topic, data, ordering_key=None, **attrs): ...
70
def resume_publish(self, topic, ordering_key): ...
71
def stop(self): ...
72
```
73
74
[Publisher Client](./publisher.md)
75
76
### Subscriber Client
77
78
High-level client for subscribing to Pub/Sub subscriptions with automatic message handling, flow control, and OpenTelemetry integration.
79
80
```python { .api }
81
class SubscriberClient:
82
def __init__(self, flow_control=None, subscriber_options=None, **kwargs): ...
83
def subscribe(self, subscription, callback, flow_control=None, scheduler=None): ...
84
def close(self): ...
85
```
86
87
[Subscriber Client](./subscriber.md)
88
89
### Schema Service Client
90
91
Client for managing Pub/Sub schemas including creation, validation, and evolution of message schemas.
92
93
```python { .api }
94
class SchemaServiceClient:
95
def create_schema(self, request=None, **kwargs): ...
96
def get_schema(self, request=None, **kwargs): ...
97
def list_schemas(self, request=None, **kwargs): ...
98
def validate_schema(self, request=None, **kwargs): ...
99
def validate_message(self, request=None, **kwargs): ...
100
```
101
102
[Schema Service](./schema-service.md)
103
104
### Message Handling
105
106
Message objects and utilities for handling received messages, including acknowledgment, negative acknowledgment, and deadline modification.
107
108
```python { .api }
109
class Message:
110
def ack(self): ...
111
def nack(self): ...
112
def modify_ack_deadline(self, seconds): ...
113
def ack_with_response(self): ...
114
def nack_with_response(self): ...
115
```
116
117
[Message Handling](./message-handling.md)
118
119
### Configuration Types
120
121
Comprehensive configuration options for client behavior including batching, flow control, publisher options, and subscriber options.
122
123
```python { .api }
124
class BatchSettings(NamedTuple):
125
max_bytes: int = 1000000
126
max_latency: float = 0.01
127
max_messages: int = 100
128
129
class PublisherOptions(NamedTuple):
130
enable_message_ordering: bool = False
131
flow_control: PublishFlowControl = PublishFlowControl()
132
retry: OptionalRetry = ...
133
timeout: OptionalTimeout = ...
134
```
135
136
[Configuration](./configuration.md)
137
138
### Protobuf Types
139
140
Complete set of protobuf message types for Pub/Sub operations including topics, subscriptions, messages, and requests/responses.
141
142
```python { .api }
143
class PubsubMessage: ...
144
class Topic: ...
145
class Subscription: ...
146
class PublishRequest: ...
147
class PublishResponse: ...
148
```
149
150
[Types](./types.md)
151
152
### Exception Handling
153
154
Comprehensive exception types for handling errors in publishing and subscribing operations, including flow control errors and acknowledgment failures.
155
156
```python { .api }
157
# Publisher exceptions
158
class PublishError(Exception): ...
159
class MessageTooLargeError(PublishError): ...
160
class PublishToPausedOrderingKeyException(PublishError): ...
161
class FlowControlLimitError(PublishError): ...
162
163
# Subscriber exceptions
164
class AcknowledgeError(Exception): ...
165
class AcknowledgeStatus(Enum): ...
166
class TimeoutError(Exception): ...
167
```
168
169
[Exception Handling](./exceptions.md)
170
171
### Schedulers and Utilities
172
173
Scheduler classes and utility functions for controlling message processing behavior and resource management in subscriber operations.
174
175
```python { .api }
176
class ThreadScheduler:
177
def __init__(self, executor: Optional[ThreadPoolExecutor] = None): ...
178
def schedule(self, callback: Callable, *args, **kwargs) -> Future: ...
179
def shutdown(self, wait: bool = True) -> None: ...
180
181
class StreamingPullFuture:
182
def cancel(self) -> bool: ...
183
def result(self, timeout: Optional[float] = None) -> None: ...
184
```
185
186
[Schedulers and Utilities](./schedulers.md)
187
188
## Types
189
190
```python { .api }
191
from typing import Callable, Optional, Union, Any, Sequence
192
from concurrent.futures import Future as ConcurrentFuture
193
from google.api_core import retry, gapic_v1
194
195
# Core type aliases
196
MessageCallback = Callable[[Message], Any]
197
OptionalRetry = Union[retry.Retry, object]
198
OptionalTimeout = Union[float, object]
199
DEFAULT = gapic_v1.method.DEFAULT
200
201
# Future type for publisher operations
202
class Future(ConcurrentFuture):
203
def result(self, timeout: Optional[float] = None) -> str: ...
204
def add_done_callback(self, callback: Callable[["Future"], None]) -> None: ...
205
206
# Scheduler type for subscriber
207
class ThreadScheduler:
208
"""Custom scheduler for message processing in subscriber."""
209
pass
210
211
# Streaming pull future for subscriber operations
212
class StreamingPullFuture:
213
def cancel(self) -> bool: ...
214
def cancelled(self) -> bool: ...
215
def running(self) -> bool: ...
216
def result(self, timeout: Optional[float] = None) -> None: ...
217
```