0
# Google Cloud Pub/Sub
1
2
Google Cloud Pub/Sub is a messaging service for asynchronous communication between applications. It provides topic-based publish/subscribe messaging with support for both push and pull subscriptions, automatic scaling, and at-least-once message delivery.
3
4
## Capabilities
5
6
### Client Operations
7
8
High-level client for managing Pub/Sub topics and subscriptions.
9
10
```python { .api }
11
class Client:
12
def __init__(self, project=None, credentials=None, http=None):
13
"""
14
Initialize Pub/Sub client.
15
16
Parameters:
17
- project (str): Google Cloud project ID
18
- credentials: OAuth2 credentials object
19
- http: Optional HTTP client
20
"""
21
22
def list_topics(self, page_size=None, page_token=None):
23
"""
24
List topics in project.
25
26
Parameters:
27
- page_size (int): Maximum results per page
28
- page_token (str): Pagination token
29
30
Returns:
31
Iterator: Topic listing iterator
32
"""
33
34
def list_subscriptions(self, page_size=None, page_token=None, topic_name=None):
35
"""
36
List subscriptions in project.
37
38
Parameters:
39
- page_size (int): Maximum results per page
40
- page_token (str): Pagination token
41
- topic_name (str): Optional topic filter
42
43
Returns:
44
Iterator: Subscription listing iterator
45
"""
46
47
def topic(self, name, timestamp_messages=False):
48
"""
49
Create Topic instance.
50
51
Parameters:
52
- name (str): Topic name
53
- timestamp_messages (bool): Whether to add timestamps to messages
54
55
Returns:
56
Topic: Topic instance
57
"""
58
```
59
60
### Topic Operations
61
62
Topic management for message publishing with support for batch operations and message attributes.
63
64
```python { .api }
65
class Topic:
66
def __init__(self, name, client, timestamp_messages=False):
67
"""
68
Initialize topic.
69
70
Parameters:
71
- name (str): Topic name
72
- client (Client): Pub/Sub client
73
- timestamp_messages (bool): Add timestamps to messages
74
"""
75
76
def subscription(self, name, ack_deadline=None, push_endpoint=None):
77
"""
78
Create subscription instance for this topic.
79
80
Parameters:
81
- name (str): Subscription name
82
- ack_deadline (int): Message acknowledgment deadline in seconds
83
- push_endpoint (str): URL for push subscriptions
84
85
Returns:
86
Subscription: Subscription instance
87
"""
88
89
@classmethod
90
def from_api_repr(cls, resource, client):
91
"""
92
Create topic from API response.
93
94
Parameters:
95
- resource (dict): API response data
96
- client (Client): Pub/Sub client
97
98
Returns:
99
Topic: Topic instance
100
"""
101
102
def create(self, client=None):
103
"""
104
Create topic.
105
106
Parameters:
107
- client (Client): Optional client override
108
"""
109
110
def exists(self, client=None):
111
"""
112
Check if topic exists.
113
114
Parameters:
115
- client (Client): Optional client override
116
117
Returns:
118
bool: True if topic exists
119
"""
120
121
def publish(self, message, client=None, **attrs):
122
"""
123
Publish single message to topic.
124
125
Parameters:
126
- message (str): Message data
127
- client (Client): Optional client override
128
- **attrs: Message attributes as keyword arguments
129
130
Returns:
131
str: Published message ID
132
"""
133
134
def batch(self, client=None):
135
"""
136
Create batch publisher for efficient message batching.
137
138
Parameters:
139
- client (Client): Optional client override
140
141
Returns:
142
Batch: Batch publisher instance
143
"""
144
145
def delete(self, client=None):
146
"""
147
Delete topic.
148
149
Parameters:
150
- client (Client): Optional client override
151
"""
152
153
@property
154
def name(self):
155
"""
156
Topic name.
157
158
Returns:
159
str: Topic name
160
"""
161
162
@property
163
def project(self):
164
"""
165
Project ID.
166
167
Returns:
168
str: Project ID
169
"""
170
171
@property
172
def full_name(self):
173
"""
174
Fully qualified topic name.
175
176
Returns:
177
str: Full topic name
178
"""
179
180
@property
181
def path(self):
182
"""
183
API path for topic.
184
185
Returns:
186
str: Topic API path
187
"""
188
189
@property
190
def timestamp_messages(self):
191
"""
192
Whether messages are automatically timestamped.
193
194
Returns:
195
bool: Timestamp setting
196
"""
197
```
198
199
### Topic Batch Operations
200
201
Batch publisher for efficient message publishing with automatic batching and context manager support.
202
203
```python { .api }
204
class Batch:
205
def __init__(self, topic, client):
206
"""
207
Initialize batch publisher.
208
209
Parameters:
210
- topic (Topic): Associated topic
211
- client (Client): Pub/Sub client
212
"""
213
214
def publish(self, message, **attrs):
215
"""
216
Add message to batch.
217
218
Parameters:
219
- message (str): Message data
220
- **attrs: Message attributes as keyword arguments
221
"""
222
223
def commit(self, client=None):
224
"""
225
Send all batched messages.
226
227
Parameters:
228
- client (Client): Optional client override
229
230
Returns:
231
list[str]: List of published message IDs
232
"""
233
234
def __enter__(self):
235
"""
236
Enter context manager.
237
238
Returns:
239
Batch: Self
240
"""
241
242
def __exit__(self, exc_type, exc_val, exc_tb):
243
"""
244
Exit context manager, committing the batch only if no exception was raised inside the block.
245
"""
246
247
def __iter__(self):
248
"""
249
Iterate over published message IDs (after commit).
250
251
Returns:
252
Iterator[str]: Message ID iterator
253
"""
254
```
255
256
### Subscription Operations
257
258
Subscription management for message consumption with support for both pull and push delivery models.
259
260
```python { .api }
261
class Subscription:
262
def __init__(self, name, topic, ack_deadline=None, push_endpoint=None):
263
"""
264
Initialize subscription.
265
266
Parameters:
267
- name (str): Subscription name
268
- topic (Topic): Associated topic
269
- ack_deadline (int): Acknowledgment deadline in seconds
270
- push_endpoint (str): URL for push subscriptions
271
"""
272
273
@classmethod
274
def from_api_repr(cls, resource, client, topics=None):
275
"""
276
Create subscription from API response.
277
278
Parameters:
279
- resource (dict): API response data
280
- client (Client): Pub/Sub client
281
- topics (dict): Optional topic lookup cache
282
283
Returns:
284
Subscription: Subscription instance
285
"""
286
287
def create(self, client=None):
288
"""
289
Create subscription.
290
291
Parameters:
292
- client (Client): Optional client override
293
"""
294
295
def exists(self, client=None):
296
"""
297
Check if subscription exists.
298
299
Parameters:
300
- client (Client): Optional client override
301
302
Returns:
303
bool: True if subscription exists
304
"""
305
306
def reload(self, client=None):
307
"""
308
Reload subscription metadata from API.
309
310
Parameters:
311
- client (Client): Optional client override
312
"""
313
314
def delete(self, client=None):
315
"""
316
Delete subscription.
317
318
Parameters:
319
- client (Client): Optional client override
320
"""
321
322
def modify_push_configuration(self, push_endpoint, client=None):
323
"""
324
Modify push endpoint configuration.
325
326
Parameters:
327
- push_endpoint (str): New push endpoint URL (None for pull)
328
- client (Client): Optional client override
329
"""
330
331
def pull(self, return_immediately=False, max_messages=1, client=None):
332
"""
333
Pull messages from subscription.
334
335
Parameters:
336
- return_immediately (bool): Return immediately if no messages
337
- max_messages (int): Maximum messages to return
338
- client (Client): Optional client override
339
340
Returns:
341
list[tuple[str, Message]]: List of (ack_id, message) pairs for the received messages
342
"""
343
344
def acknowledge(self, ack_ids, client=None):
345
"""
346
Acknowledge received messages.
347
348
Parameters:
349
- ack_ids (list[str]): List of acknowledgment IDs
350
- client (Client): Optional client override
351
"""
352
353
def modify_ack_deadline(self, ack_ids, ack_deadline, client=None):
354
"""
355
Modify acknowledgment deadline for messages.
356
357
Parameters:
358
- ack_ids (list[str]): List of acknowledgment IDs
359
- ack_deadline (int): New deadline in seconds
360
- client (Client): Optional client override
361
"""
362
363
@property
364
def name(self):
365
"""
366
Subscription name.
367
368
Returns:
369
str: Subscription name
370
"""
371
372
@property
373
def topic(self):
374
"""
375
Associated topic.
376
377
Returns:
378
Topic: Associated topic
379
"""
380
381
@property
382
def ack_deadline(self):
383
"""
384
Acknowledgment deadline in seconds.
385
386
Returns:
387
int: Deadline in seconds
388
"""
389
390
@property
391
def push_endpoint(self):
392
"""
393
Push endpoint URL for push subscriptions.
394
395
Returns:
396
str or None: Push endpoint URL
397
"""
398
399
@property
400
def path(self):
401
"""
402
API path for subscription.
403
404
Returns:
405
str: Subscription API path
406
"""
407
```
408
409
### Message Handling
410
411
Message objects representing individual Pub/Sub messages with data and attributes.
412
413
```python { .api }
414
class Message:
415
def __init__(self, data, message_id, attributes=None):
416
"""
417
Initialize message.
418
419
Parameters:
420
- data (bytes): Message data
421
- message_id (str): Message ID assigned by API
422
- attributes (dict): Optional message attributes
423
"""
424
425
@property
426
def data(self):
427
"""
428
Message data.
429
430
Returns:
431
bytes: Message data
432
"""
433
434
@property
435
def message_id(self):
436
"""
437
Message ID assigned by the API.
438
439
Returns:
440
str: Message ID
441
"""
442
443
@property
444
def attributes(self):
445
"""
446
Message attributes dictionary.
447
448
Returns:
449
dict: Message attributes
450
"""
451
452
@property
453
def timestamp(self):
454
"""
455
Message timestamp from attributes (if present).
456
457
Returns:
458
datetime: Timestamp in UTC timezone
459
460
Raises:
461
ValueError: If timestamp not in attributes or invalid format
462
"""
463
464
@classmethod
465
def from_api_repr(cls, api_repr):
466
"""
467
Create message from API representation.
468
469
Parameters:
470
- api_repr (dict): API response data
471
472
Returns:
473
Message: Message instance
474
"""
475
```
476
477
## Usage Examples
478
479
### Basic Topic and Subscription Operations
480
481
```python
482
from gcloud import pubsub
483
484
# Initialize client
485
client = pubsub.Client(project='my-project')
486
487
# Create topic
488
topic = client.topic('my-topic')
489
topic.create()
490
491
# Create subscription
492
subscription = topic.subscription('my-subscription', ack_deadline=60)
493
subscription.create()
494
495
# Publish message
496
message_id = topic.publish('Hello, Pub/Sub!', priority='high', source='app1')
497
print(f"Published message: {message_id}")
498
```
499
500
### Message Publishing
501
502
```python
503
# Publish single message with attributes
504
topic.publish('Order processed', order_id='12345', status='completed')
505
506
# Batch publishing for efficiency
507
with topic.batch() as batch:
508
for i in range(10):
509
batch.publish(f'Message {i}', sequence=str(i))
510
511
# Get message IDs after batch commit
512
message_ids = list(batch)
513
print(f"Published {len(message_ids)} messages")
514
```
515
516
### Message Consumption
517
518
```python
519
# Pull messages from subscription
520
messages = subscription.pull(max_messages=5, return_immediately=True)
521
522
# Process messages; each pulled item is an (ack_id, message) pair
523
ack_ids = []
524
for ack_id, message in messages:
525
print(f"Received: {message.data}")
526
print(f"Attributes: {message.attributes}")
527
528
# Process message here
529
process_message(message.data)
530
531
# Collect acknowledgment ID
532
ack_ids.append(ack_id)
533
534
# Acknowledge processed messages
535
if ack_ids:
536
subscription.acknowledge(ack_ids)
537
```
538
539
### Subscription Management
540
541
```python
542
# Create pull subscription
543
pull_subscription = topic.subscription('pull-sub', ack_deadline=30)
544
pull_subscription.create()
545
546
# Create push subscription
547
push_subscription = topic.subscription(
548
'push-sub',
549
push_endpoint='https://myapp.com/webhook'
550
)
551
push_subscription.create()
552
553
# Modify push configuration
554
push_subscription.modify_push_configuration('https://newapp.com/webhook')
555
556
# Convert push to pull
557
push_subscription.modify_push_configuration(None)
558
```
559
560
### Advanced Message Handling
561
562
```python
563
# Pull with immediate return
564
messages = subscription.pull(return_immediately=True, max_messages=10)
565
566
if not messages:
567
print("No messages available")
568
else:
569
# Extend acknowledgment deadline for processing time
# (each pulled item is an (ack_id, message) pair)
570
ack_ids = [ack_id for ack_id, _ in messages]
571
subscription.modify_ack_deadline(ack_ids, 120)  # 2 minutes
572
573
# Process messages
574
for ack_id, message in messages:
575
try:
576
process_complex_message(message.data)
577
subscription.acknowledge([ack_id])
578
except Exception as e:
579
print(f"Processing failed: {e}")
580
# Message will be redelivered after ack deadline
581
```