0
# Subscriber Client
1
2
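The SubscriberClient provides high-level functionality for subscribing to Google Cloud Pub/Sub subscriptions. It handles streaming pull, automatic lease management (extending the acknowledgment deadline of messages that are still being processed), flow control, callback scheduling, and OpenTelemetry integration. Messages are not acknowledged automatically: the callback must call `message.ack()` or `message.nack()`.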
3
4
## Capabilities
5
6
### Client Initialization
7
8
Create and configure a SubscriberClient with custom flow control and subscriber options.
9
10
```python { .api }
11
class SubscriberClient:
12
def __init__(
13
self,
14
flow_control: Optional[FlowControl] = None,
15
subscriber_options: Optional[SubscriberOptions] = None,
16
**kwargs
17
):
18
"""
19
Initialize the subscriber client.
20
21
Parameters:
22
- flow_control: Settings for message flow control
23
- subscriber_options: Options for subscriber client behavior
24
- **kwargs: Additional arguments passed to underlying GAPIC client
25
"""
26
27
@classmethod
28
def from_service_account_file(
29
cls,
30
filename: str,
31
**kwargs
32
) -> "SubscriberClient":
33
"""
34
Create client from service account file.
35
36
Parameters:
37
- filename: Path to service account JSON file
38
- **kwargs: Additional arguments for client initialization
39
40
Returns:
41
SubscriberClient instance
42
"""
43
```
44
45
### Message Subscription
46
47
Subscribe to messages from subscriptions with callback-based processing.
48
49
```python { .api }
50
def subscribe(
51
self,
52
subscription: str,
53
callback: Callable[[Message], Any],
54
flow_control: Union[FlowControl, Sequence] = (),
55
scheduler: Optional[ThreadScheduler] = None,
56
use_legacy_flow_control: bool = False,
57
await_callbacks_on_shutdown: bool = False
58
) -> StreamingPullFuture:
59
"""
60
Subscribe to messages from a subscription.
61
62
Parameters:
63
- subscription: Full subscription path (e.g., "projects/my-project/subscriptions/my-sub")
64
- callback: Function to process received messages
65
- flow_control: Flow control settings or legacy sequence format
66
- scheduler: Custom scheduler for message processing
67
- use_legacy_flow_control: Whether to use legacy flow control behavior
68
- await_callbacks_on_shutdown: Whether to wait for callbacks on shutdown
69
70
Returns:
71
StreamingPullFuture that can be used to control the subscription
72
"""
73
```
74
75
### Subscription Management
76
77
Create, retrieve, update, and delete subscriptions using the underlying GAPIC client methods.
78
79
```python { .api }
80
def create_subscription(
81
self,
82
request: Optional[CreateSubscriptionRequest] = None,
83
*,
84
name: Optional[str] = None,
85
topic: Optional[str] = None,
86
**kwargs
87
) -> Subscription:
88
"""
89
Create a new subscription.
90
91
Parameters:
92
- request: The request object for creating a subscription
93
- name: Subscription name (e.g., "projects/my-project/subscriptions/my-sub")
94
- topic: Topic name to subscribe to
95
- **kwargs: Additional keyword arguments
96
97
Returns:
98
Created Subscription object
99
"""
100
101
def get_subscription(
102
self,
103
request: Optional[GetSubscriptionRequest] = None,
104
*,
105
subscription: Optional[str] = None,
106
**kwargs
107
) -> Subscription:
108
"""
109
Get a subscription.
110
111
Parameters:
112
- request: The request object for getting a subscription
113
- subscription: Subscription name to retrieve
114
- **kwargs: Additional keyword arguments
115
116
Returns:
117
Subscription object
118
"""
119
120
def update_subscription(
121
self,
122
request: Optional[UpdateSubscriptionRequest] = None,
123
*,
124
subscription: Optional[Subscription] = None,
125
update_mask: Optional[FieldMask] = None,
126
**kwargs
127
) -> Subscription:
128
"""
129
Update a subscription.
130
131
Parameters:
132
- request: The request object for updating a subscription
133
- subscription: Updated subscription configuration
134
- update_mask: Fields to update
135
- **kwargs: Additional keyword arguments
136
137
Returns:
138
Updated Subscription object
139
"""
140
141
def list_subscriptions(
142
self,
143
request: Optional[ListSubscriptionsRequest] = None,
144
*,
145
project: Optional[str] = None,
146
**kwargs
147
) -> ListSubscriptionsResponse:
148
"""
149
List subscriptions in a project.
150
151
Parameters:
152
- request: The request object for listing subscriptions
153
- project: Project path (e.g., "projects/my-project")
154
- **kwargs: Additional keyword arguments
155
156
Returns:
157
ListSubscriptionsResponse with subscriptions
158
"""
159
160
def delete_subscription(
161
self,
162
request: Optional[DeleteSubscriptionRequest] = None,
163
*,
164
subscription: Optional[str] = None,
165
**kwargs
166
) -> None:
167
"""
168
Delete a subscription.
169
170
Parameters:
171
- request: The request object for deleting a subscription
172
- subscription: Subscription name to delete
173
- **kwargs: Additional keyword arguments
174
"""
175
```
176
177
### Message Operations
178
179
Low-level message operations for acknowledgment and deadline modification.
180
181
```python { .api }
182
def acknowledge(
183
self,
184
request: Optional[AcknowledgeRequest] = None,
185
*,
186
subscription: Optional[str] = None,
187
ack_ids: Optional[Sequence[str]] = None,
188
**kwargs
189
) -> None:
190
"""
191
Acknowledge messages by their acknowledgment IDs.
192
193
Parameters:
194
- request: The request object for acknowledging messages
195
- subscription: Subscription name
196
- ack_ids: List of acknowledgment IDs to acknowledge
197
- **kwargs: Additional keyword arguments
198
"""
199
200
def modify_ack_deadline(
201
self,
202
request: Optional[ModifyAckDeadlineRequest] = None,
203
*,
204
subscription: Optional[str] = None,
205
ack_ids: Optional[Sequence[str]] = None,
206
ack_deadline_seconds: Optional[int] = None,
207
**kwargs
208
) -> None:
209
"""
210
Modify acknowledgment deadline for messages.
211
212
Parameters:
213
- request: The request object for modifying acknowledgment deadlines
214
- subscription: Subscription name
215
- ack_ids: List of acknowledgment IDs to modify
216
- ack_deadline_seconds: New deadline in seconds
217
- **kwargs: Additional keyword arguments
218
"""
219
220
def pull(
221
self,
222
request: Optional[PullRequest] = None,
223
*,
224
subscription: Optional[str] = None,
225
max_messages: Optional[int] = None,
226
**kwargs
227
) -> PullResponse:
228
"""
229
Pull messages from a subscription synchronously.
230
231
Parameters:
232
- request: The request object for pulling messages
233
- subscription: Subscription name to pull from
234
- max_messages: Maximum number of messages to return
235
- **kwargs: Additional keyword arguments
236
237
Returns:
238
PullResponse with received messages
239
"""
240
241
def streaming_pull(
242
self,
243
requests: Iterator[StreamingPullRequest],
244
**kwargs
245
) -> Iterator[StreamingPullResponse]:
246
"""
247
Establish a streaming pull connection.
248
249
Parameters:
250
- requests: Iterator of streaming pull requests
251
- **kwargs: Additional keyword arguments
252
253
Returns:
254
Iterator of streaming pull responses
255
"""
256
```
257
258
### Snapshot Operations
259
260
Create and manage snapshots for seeking to specific points in time.
261
262
```python { .api }
263
def create_snapshot(
264
self,
265
request: Optional[CreateSnapshotRequest] = None,
266
*,
267
name: Optional[str] = None,
268
subscription: Optional[str] = None,
269
**kwargs
270
) -> Snapshot:
271
"""
272
Create a snapshot of a subscription.
273
274
Parameters:
275
- request: The request object for creating a snapshot
276
- name: Snapshot name (e.g., "projects/my-project/snapshots/my-snapshot")
277
- subscription: Subscription to create snapshot from
278
- **kwargs: Additional keyword arguments
279
280
Returns:
281
Created Snapshot object
282
"""
283
284
def get_snapshot(
285
self,
286
request: Optional[GetSnapshotRequest] = None,
287
*,
288
snapshot: Optional[str] = None,
289
**kwargs
290
) -> Snapshot:
291
"""
292
Get a snapshot.
293
294
Parameters:
295
- request: The request object for getting a snapshot
296
- snapshot: Snapshot name to retrieve
297
- **kwargs: Additional keyword arguments
298
299
Returns:
300
Snapshot object
301
"""
302
303
def list_snapshots(
304
self,
305
request: Optional[ListSnapshotsRequest] = None,
306
*,
307
project: Optional[str] = None,
308
**kwargs
309
) -> ListSnapshotsResponse:
310
"""
311
List snapshots in a project.
312
313
Parameters:
314
- request: The request object for listing snapshots
315
- project: Project path (e.g., "projects/my-project")
316
- **kwargs: Additional keyword arguments
317
318
Returns:
319
ListSnapshotsResponse with snapshots
320
"""
321
322
def update_snapshot(
323
self,
324
request: Optional[UpdateSnapshotRequest] = None,
325
*,
326
snapshot: Optional[Snapshot] = None,
327
update_mask: Optional[FieldMask] = None,
328
**kwargs
329
) -> Snapshot:
330
"""
331
Update a snapshot.
332
333
Parameters:
334
- request: The request object for updating a snapshot
335
- snapshot: Updated snapshot configuration
336
- update_mask: Fields to update
337
- **kwargs: Additional keyword arguments
338
339
Returns:
340
Updated Snapshot object
341
"""
342
343
def delete_snapshot(
344
self,
345
request: Optional[DeleteSnapshotRequest] = None,
346
*,
347
snapshot: Optional[str] = None,
348
**kwargs
349
) -> None:
350
"""
351
Delete a snapshot.
352
353
Parameters:
354
- request: The request object for deleting a snapshot
355
- snapshot: Snapshot name to delete
356
- **kwargs: Additional keyword arguments
357
"""
358
359
def seek(
360
self,
361
request: Optional[SeekRequest] = None,
362
*,
363
subscription: Optional[str] = None,
364
**kwargs
365
) -> SeekResponse:
366
"""
367
Seek a subscription to a specific snapshot or time.
368
369
Parameters:
370
- request: The request object for seeking
371
- subscription: Subscription name to seek
372
- **kwargs: Additional keyword arguments
373
374
Returns:
375
SeekResponse indicating seek result
376
"""
377
```
378
379
### Path Helper Methods
380
381
Utility methods for constructing and parsing resource paths.
382
383
```python { .api }
384
@staticmethod
385
def subscription_path(project: str, subscription: str) -> str:
386
"""
387
Construct a subscription path from project ID and subscription name.
388
389
Parameters:
390
- project: Project ID
391
- subscription: Subscription name
392
393
Returns:
394
Full subscription path string
395
"""
396
397
@staticmethod
398
def snapshot_path(project: str, snapshot: str) -> str:
399
"""
400
Construct a snapshot path from project ID and snapshot name.
401
402
Parameters:
403
- project: Project ID
404
- snapshot: Snapshot name
405
406
Returns:
407
Full snapshot path string
408
"""
409
410
@staticmethod
411
def topic_path(project: str, topic: str) -> str:
412
"""
413
Construct a topic path from project ID and topic name.
414
415
Parameters:
416
- project: Project ID
417
- topic: Topic name
418
419
Returns:
420
Full topic path string
421
"""
422
423
@staticmethod
424
def parse_subscription_path(path: str) -> Dict[str, str]:
425
"""
426
Parse a subscription path into its components.
427
428
Parameters:
429
- path: Subscription path string
430
431
Returns:
432
Dictionary with 'project' and 'subscription' keys
433
"""
434
435
@staticmethod
436
def parse_snapshot_path(path: str) -> Dict[str, str]:
437
"""
438
Parse a snapshot path into its components.
439
440
Parameters:
441
- path: Snapshot path string
442
443
Returns:
444
Dictionary with 'project' and 'snapshot' keys
445
"""
446
447
@staticmethod
448
def parse_topic_path(path: str) -> Dict[str, str]:
449
"""
450
Parse a topic path into its components.
451
452
Parameters:
453
- path: Topic path string
454
455
Returns:
456
Dictionary with 'project' and 'topic' keys
457
"""
458
```
459
460
### Client Management
461
462
Control client lifecycle and access underlying components.
463
464
```python { .api }
465
def close(self) -> None:
466
"""
467
Close the subscriber client and stop all subscriptions.
468
"""
469
470
@property
471
def target(self) -> str:
472
"""
473
Get the target endpoint for the client.
474
475
Returns:
476
Target endpoint URL
477
"""
478
479
@property
480
def api(self):
481
"""
482
Get the underlying GAPIC subscriber client.
483
484
Returns:
485
GAPIC SubscriberClient instance
486
"""
487
488
@property
489
def closed(self) -> bool:
490
"""
491
Check if the client is closed.
492
493
Returns:
494
True if client is closed
495
"""
496
497
@property
498
def open_telemetry_enabled(self) -> bool:
499
"""
500
Check if OpenTelemetry tracing is enabled.
501
502
Returns:
503
True if OpenTelemetry is enabled
504
"""
505
```
506
507
### Context Manager Support
508
509
Use SubscriberClient as a context manager for automatic cleanup.
510
511
```python { .api }
512
def __enter__(self) -> "SubscriberClient":
513
"""
514
Enter context manager.
515
516
Returns:
517
Self
518
"""
519
520
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
521
"""
522
Exit context manager and close client.
523
"""
524
```
525
526
### Streaming Pull Future
527
528
Control and monitor streaming pull operations.
529
530
```python { .api }
531
class StreamingPullFuture:
532
def cancel(self) -> bool:
533
"""
534
Cancel the streaming pull operation.
535
536
Returns:
537
True if cancellation was successful
538
"""
539
540
def cancelled(self) -> bool:
541
"""
542
Check if the operation was cancelled.
543
544
Returns:
545
True if operation is cancelled
546
"""
547
548
def running(self) -> bool:
549
"""
550
Check if the operation is currently running.
551
552
Returns:
553
True if operation is running
554
"""
555
556
def result(self, timeout: Optional[float] = None) -> None:
557
"""
558
Wait for the streaming pull to complete.
559
560
Parameters:
561
- timeout: Maximum time to wait in seconds
562
563
Raises:
564
concurrent.futures.TimeoutError: If the timeout elapses before the streaming pull ends
565
"""
566
```
567
568
## Usage Examples
569
570
### Basic Subscription
571
572
```python
573
from google.cloud import pubsub_v1
574
575
# Create subscriber client
576
subscriber = pubsub_v1.SubscriberClient()
577
578
def callback(message):
579
print(f"Received: {message.data.decode('utf-8')}")
580
print(f"Attributes: {message.attributes}")
581
message.ack()
582
583
# Subscribe to messages
584
subscription_path = subscriber.subscription_path("my-project", "my-subscription")
585
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
586
587
print(f"Listening for messages on {subscription_path}...")
588
589
# Keep the main thread running
590
try:
591
streaming_pull_future.result()
592
except KeyboardInterrupt:
593
streaming_pull_future.cancel()
594
subscriber.close()
595
```
596
597
### Custom Flow Control
598
599
```python
600
from google.cloud.pubsub_v1 import types
601
602
# Configure flow control
603
flow_control = types.FlowControl(
604
max_messages=100, # Process up to 100 messages concurrently
605
max_bytes=10 * 1024 * 1024, # 10MB max outstanding bytes
606
max_lease_duration=600 # 10 minute max lease duration
607
)
608
609
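subscriber = pubsub_v1.SubscriberClient()
# Flow control is applied per subscription, so pass it to subscribe()
# (subscription_path and callback as defined in the Basic Subscription example)
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback, flow_control=flow_control)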
610
```
611
612
### Message Processing with Error Handling
613
614
```python
615
def callback(message):
616
try:
617
# Process the message
618
data = message.data.decode('utf-8')
619
print(f"Processing: {data}")
620
621
# Simulate processing
622
process_message(data)
623
624
# Acknowledge successful processing
625
message.ack()
626
627
except Exception as e:
628
print(f"Error processing message: {e}")
629
# Negative acknowledge to retry later
630
message.nack()
631
632
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
633
```
634
635
### Context Manager Usage
636
637
```python
638
from google.cloud import pubsub_v1
639
640
def callback(message):
641
print(f"Received: {message.data.decode('utf-8')}")
642
message.ack()
643
644
# Use subscriber as context manager
645
with pubsub_v1.SubscriberClient() as subscriber:
646
subscription_path = subscriber.subscription_path("my-project", "my-subscription")
647
648
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
649
650
try:
651
# Listen for messages for 30 seconds
652
streaming_pull_future.result(timeout=30)
653
except Exception:
654
streaming_pull_future.cancel()
655
# Client automatically closed when exiting context
656
```
657
658
### Multiple Subscriptions
659
660
```python
661
import concurrent.futures
662
from google.cloud import pubsub_v1
663
664
subscriber = pubsub_v1.SubscriberClient()
665
666
def callback_a(message):
667
print(f"Subscription A: {message.data.decode('utf-8')}")
668
message.ack()
669
670
def callback_b(message):
671
print(f"Subscription B: {message.data.decode('utf-8')}")
672
message.ack()
673
674
# Subscribe to multiple subscriptions
675
sub_a_path = subscriber.subscription_path("my-project", "subscription-a")
676
sub_b_path = subscriber.subscription_path("my-project", "subscription-b")
677
678
future_a = subscriber.subscribe(sub_a_path, callback=callback_a)
679
future_b = subscriber.subscribe(sub_b_path, callback=callback_b)
680
681
# Wait for any subscription to complete or fail
682
futures = [future_a, future_b]
683
try:
684
concurrent.futures.wait(futures, timeout=300, return_when=concurrent.futures.FIRST_COMPLETED)
685
except KeyboardInterrupt:
686
for future in futures:
687
future.cancel()
688
finally:
689
subscriber.close()
690
```
691
692
### Message Deadline Modification
693
694
```python
695
def callback(message):
696
print(f"Processing: {message.data.decode('utf-8')}")
697
698
# Extend deadline if processing takes longer
699
message.modify_ack_deadline(60)  # Reset the ack deadline to 60 seconds from now
700
701
try:
702
# Long processing operation
703
long_running_task(message.data)
704
message.ack()
705
except Exception as e:
706
print(f"Processing failed: {e}")
707
message.nack()
708
```