# Publisher Client

The PublisherClient provides high-level functionality for publishing messages to Google Cloud Pub/Sub topics. It handles automatic batching, flow control, message ordering, retry logic, and OpenTelemetry integration.

## Capabilities

### Client Initialization

Create and configure a PublisherClient with custom batching and flow control settings.

```python { .api }
class PublisherClient:
    def __init__(
        self,
        batch_settings: Optional[BatchSettings] = None,
        publisher_options: Optional[PublisherOptions] = None,
        **kwargs
    ):
        """
        Initialize the publisher client.

        Parameters:
        - batch_settings: Settings for message batching behavior
        - publisher_options: Options for publisher client behavior
        - **kwargs: Additional arguments passed to underlying GAPIC client
        """

    @classmethod
    def from_service_account_file(
        cls,
        filename: str,
        **kwargs
    ) -> "PublisherClient":
        """
        Create client from service account file.

        Parameters:
        - filename: Path to service account JSON file
        - **kwargs: Additional arguments for client initialization

        Returns:
        PublisherClient instance
        """
```

### Message Publishing

Publish messages to topics with support for attributes, ordering keys, and futures for result handling.

```python { .api }
def publish(
    self,
    topic: str,
    data: bytes,
    ordering_key: str = "",
    retry: OptionalRetry = DEFAULT,
    timeout: OptionalTimeout = DEFAULT,
    **attrs: Union[bytes, str]
) -> Future:
    """
    Publish a message to a topic.

    Parameters:
    - topic: Full topic path (e.g., "projects/my-project/topics/my-topic")
    - data: Message payload as bytes
    - ordering_key: Optional key for message ordering (default: "")
    - retry: Retry configuration for the publish operation
    - timeout: Timeout configuration for the publish operation
    - **attrs: Message attributes as keyword arguments (values can be bytes or str)

    Returns:
    Future that resolves to message ID string
    """
```
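
Because `data` must be bytes and attribute values must be `bytes` or `str`, structured payloads need to be serialized before they are passed to `publish`. The following is a minimal sketch of one way to do that, not part of the library: `encode_message` is a hypothetical helper that JSON-encodes a dictionary and converts any non-string attribute value with `str()`.

```python
import json
from typing import Any, Dict, Tuple, Union


def encode_message(
    payload: Dict[str, Any], **attrs: Any
) -> Tuple[bytes, Dict[str, Union[bytes, str]]]:
    """Hypothetical helper: prepare a payload and attributes for publish()."""
    # The message body must be bytes, so serialize the dict as UTF-8 JSON.
    data = json.dumps(payload, sort_keys=True).encode("utf-8")
    # Leave str and bytes values alone; convert everything else with str().
    clean_attrs = {
        key: value if isinstance(value, (bytes, str)) else str(value)
        for key, value in attrs.items()
    }
    return data, clean_attrs
```

The returned pair can then be passed on as `publisher.publish(topic_path, data, **attrs)`.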

### Message Ordering

Resume publishing for an ordering key after an error has occurred.

```python { .api }
def resume_publish(self, topic: str, ordering_key: str) -> None:
    """
    Resume publishing for ordering key after error.

    Parameters:
    - topic: Full topic path
    - ordering_key: Ordering key to resume
    """
```
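
The recovery flow can be sketched as follows. This is an illustration under stated assumptions rather than a recommended implementation: `publish_in_order` is a hypothetical helper, `publisher` is any object exposing the `publish` and `resume_publish` methods documented above, and the policy of stopping at the first failure is a design choice made for the example.

```python
from typing import Any, Iterable, List


def publish_in_order(
    publisher: Any,
    topic: str,
    ordering_key: str,
    payloads: Iterable[bytes],
    timeout: float = 30.0,
) -> List[str]:
    """Hypothetical helper: publish payloads sequentially under one ordering key.

    Returns the IDs of the messages published before the first failure.
    """
    message_ids: List[str] = []
    for data in payloads:
        future = publisher.publish(topic, data, ordering_key=ordering_key)
        try:
            # Block until this message is acknowledged before sending the next.
            message_ids.append(future.result(timeout=timeout))
        except Exception:
            # Re-enable the key so later publishes with it can proceed,
            # then stop and let the caller decide how to retry the rest.
            publisher.resume_publish(topic, ordering_key)
            break
    return message_ids
```

Waiting on each future before sending the next trades throughput for a simple guarantee about which messages were delivered when an error occurs.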

### Topic Management

Create, retrieve, update, and delete topics using the underlying GAPIC client methods.

```python { .api }
def create_topic(
    self,
    request: Optional[CreateTopicRequest] = None,
    *,
    name: Optional[str] = None,
    **kwargs
) -> Topic:
    """
    Create a new topic.

    Parameters:
    - request: The request object for creating a topic
    - name: Topic name (e.g., "projects/my-project/topics/my-topic")
    - **kwargs: Additional keyword arguments

    Returns:
    Created Topic object
    """

def get_topic(
    self,
    request: Optional[GetTopicRequest] = None,
    *,
    topic: Optional[str] = None,
    **kwargs
) -> Topic:
    """
    Get a topic.

    Parameters:
    - request: The request object for getting a topic
    - topic: Topic name to retrieve
    - **kwargs: Additional keyword arguments

    Returns:
    Topic object
    """

def list_topics(
    self,
    request: Optional[ListTopicsRequest] = None,
    *,
    project: Optional[str] = None,
    **kwargs
) -> ListTopicsResponse:
    """
    List topics in a project.

    Parameters:
    - request: The request object for listing topics
    - project: Project path (e.g., "projects/my-project")
    - **kwargs: Additional keyword arguments

    Returns:
    ListTopicsResponse with topics
    """

def list_topic_subscriptions(
    self,
    request: Optional[ListTopicSubscriptionsRequest] = None,
    *,
    topic: Optional[str] = None,
    **kwargs
) -> ListTopicSubscriptionsResponse:
    """
    List subscriptions attached to a topic.

    Parameters:
    - request: The request object for listing topic subscriptions
    - topic: Topic name
    - **kwargs: Additional keyword arguments

    Returns:
    ListTopicSubscriptionsResponse with subscription names
    """

def list_topic_snapshots(
    self,
    request: Optional[ListTopicSnapshotsRequest] = None,
    *,
    topic: Optional[str] = None,
    **kwargs
) -> ListTopicSnapshotsResponse:
    """
    List snapshots for a topic.

    Parameters:
    - request: The request object for listing topic snapshots
    - topic: Topic name
    - **kwargs: Additional keyword arguments

    Returns:
    ListTopicSnapshotsResponse with snapshot names
    """

def update_topic(
    self,
    request: Optional[UpdateTopicRequest] = None,
    *,
    topic: Optional[Topic] = None,
    update_mask: Optional[FieldMask] = None,
    **kwargs
) -> Topic:
    """
    Update a topic.

    Parameters:
    - request: The request object for updating a topic
    - topic: Updated topic configuration
    - update_mask: Fields to update
    - **kwargs: Additional keyword arguments

    Returns:
    Updated Topic object
    """

def delete_topic(
    self,
    request: Optional[DeleteTopicRequest] = None,
    *,
    topic: Optional[str] = None,
    **kwargs
) -> None:
    """
    Delete a topic.

    Parameters:
    - request: The request object for deleting a topic
    - topic: Topic name to delete
    - **kwargs: Additional keyword arguments
    """
```
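
As an illustration of the listing calls, the sketch below collects topic names for a project. It assumes that the object returned by `list_topics` can be iterated directly and yields `Topic` objects with a `name` field, which is how current releases of the library behave; `list_topic_names` is a hypothetical helper, and `publisher` is any object exposing `list_topics` as documented above.

```python
from typing import Any, List


def list_topic_names(publisher: Any, project_id: str) -> List[str]:
    """Hypothetical helper: return the full resource names of a project's topics."""
    # list_topics expects a project path, not a bare project ID.
    project_path = f"projects/{project_id}"
    # Assumption: iterating the result yields Topic objects with a `name` field.
    return [topic.name for topic in publisher.list_topics(project=project_path)]
```

With a real client this would be called as `list_topic_names(pubsub_v1.PublisherClient(), "my-project")`.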

### Path Helper Methods

Utility methods for constructing and parsing resource paths.

```python { .api }
@staticmethod
def topic_path(project: str, topic: str) -> str:
    """
    Construct a topic path from project ID and topic name.

    Parameters:
    - project: Project ID
    - topic: Topic name

    Returns:
    Full topic path string
    """

@staticmethod
def subscription_path(project: str, subscription: str) -> str:
    """
    Construct a subscription path from project ID and subscription name.

    Parameters:
    - project: Project ID
    - subscription: Subscription name

    Returns:
    Full subscription path string
    """

@staticmethod
def schema_path(project: str, schema: str) -> str:
    """
    Construct a schema path from project ID and schema name.

    Parameters:
    - project: Project ID
    - schema: Schema name

    Returns:
    Full schema path string
    """

@staticmethod
def parse_topic_path(path: str) -> Dict[str, str]:
    """
    Parse a topic path into its components.

    Parameters:
    - path: Topic path string

    Returns:
    Dictionary with 'project' and 'topic' keys
    """

@staticmethod
def parse_subscription_path(path: str) -> Dict[str, str]:
    """
    Parse a subscription path into its components.

    Parameters:
    - path: Subscription path string

    Returns:
    Dictionary with 'project' and 'subscription' keys
    """

@staticmethod
def parse_schema_path(path: str) -> Dict[str, str]:
    """
    Parse a schema path into its components.

    Parameters:
    - path: Schema path string

    Returns:
    Dictionary with 'project' and 'schema' keys
    """
```
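
To make the path format concrete, here is a standalone sketch of the round trip for topic paths, based on the `projects/{project}/topics/{topic}` format shown earlier. `build_topic_path` and `split_topic_path` are hypothetical stand-ins written for this example, not the library's code, and returning an empty dictionary for a non-matching path is an assumption of the sketch.

```python
import re
from typing import Dict

# Pattern for the "projects/{project}/topics/{topic}" format.
_TOPIC_PATH = re.compile(r"^projects/(?P<project>.+?)/topics/(?P<topic>.+?)$")


def build_topic_path(project: str, topic: str) -> str:
    """Hypothetical stand-in for topic_path: join the parts into a full path."""
    return f"projects/{project}/topics/{topic}"


def split_topic_path(path: str) -> Dict[str, str]:
    """Hypothetical stand-in for parse_topic_path: split a path into its parts."""
    match = _TOPIC_PATH.match(path)
    # Return an empty dict when the path does not match the expected format.
    return match.groupdict() if match else {}
```

In application code, prefer the client's own helpers so that the format stays in sync with the library.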

### Client Management

Control client lifecycle and access underlying components.

```python { .api }
def stop(self) -> None:
    """
    Immediately send all outstanding messages and prevent further
    calls to publish(). This call does not block; use the futures
    returned by publish() to confirm that each message was sent.
    """

@property
def target(self) -> str:
    """
    Get the target endpoint for the client.

    Returns:
    Target endpoint URL
    """

@property
def api(self):
    """
    Get the underlying GAPIC publisher client.

    Returns:
    GAPIC PublisherClient instance
    """

@property
def open_telemetry_enabled(self) -> bool:
    """
    Check if OpenTelemetry tracing is enabled.

    Returns:
    True if OpenTelemetry is enabled
    """
```
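
A shutdown routine can combine `stop()` with an explicit wait on the futures returned by `publish()`, since those futures are the direct signal that each message was sent. The sketch below assumes the publish futures are compatible with `concurrent.futures.wait`, which holds for current releases of the library; `drain_and_stop` is a hypothetical helper and the 60-second default is an arbitrary choice.

```python
from concurrent import futures
from typing import Any, Iterable, List


def drain_and_stop(
    publisher: Any,
    publish_futures: Iterable[futures.Future],
    timeout: float = 60.0,
) -> List[futures.Future]:
    """Hypothetical helper: flush the client and wait for pending publishes.

    Returns the futures that had not completed within `timeout` seconds.
    """
    # Ask the client to send whatever is still sitting in its batches.
    publisher.stop()
    # Wait until every future has either succeeded or failed, or time runs out.
    _, not_done = futures.wait(
        list(publish_futures), timeout=timeout, return_when=futures.ALL_COMPLETED
    )
    return list(not_done)
```

An empty return value means every publish finished, although individual futures may still hold an exception that `result()` would raise.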

### Future Objects

Publisher operations return Future objects for asynchronous result handling.

```python { .api }
class Future:
    def result(self, timeout: Optional[float] = None) -> str:
        """
        Get the message ID or raise an exception.

        Parameters:
        - timeout: Number of seconds to wait before timeout

        Returns:
        Message ID string

        Raises:
        TimeoutError: If operation times out
        Exception: For other errors in publish operation
        """

    def add_done_callback(
        self,
        callback: Callable[["Future"], None]
    ) -> None:
        """
        Add callback to be called when future completes.

        Parameters:
        - callback: Function to call with future as argument
        """

    def cancel(self) -> bool:
        """
        Attempt to cancel the operation.

        Returns:
        Always False (Pub/Sub operations cannot be cancelled)
        """

    def cancelled(self) -> bool:
        """
        Check if operation was cancelled.

        Returns:
        Always False (Pub/Sub operations cannot be cancelled)
        """
```
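
A timeout from `result()` does not necessarily mean the publish failed, only that it had not finished yet. The sketch below separates that case from real errors. It assumes the timeout surfaces as `concurrent.futures.TimeoutError`, which is what a future derived from `concurrent.futures.Future` raises; `wait_for_message_id` is a hypothetical helper.

```python
import concurrent.futures
from typing import Any, Optional


def wait_for_message_id(future: Any, timeout: float = 30.0) -> Optional[str]:
    """Hypothetical helper: return the message ID, or None if still pending."""
    try:
        return future.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        # Not finished yet; the future can be checked again later.
        return None
```

Any other exception raised by `result()` propagates to the caller, so publish errors are not silently treated as timeouts.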

## Usage Examples

### Basic Publishing

```python
from google.cloud import pubsub_v1

# Create publisher client
publisher = pubsub_v1.PublisherClient()

# Publish a message
topic_path = publisher.topic_path("my-project", "my-topic")
message_data = b"Hello, World!"
future = publisher.publish(topic_path, message_data)

# Get message ID
try:
    message_id = future.result(timeout=30)
    print(f"Published message with ID: {message_id}")
except Exception as e:
    print(f"Failed to publish: {e}")
```

### Publishing with Attributes

```python
# Reuses publisher and topic_path from the Basic Publishing example.
# Publish message with attributes
future = publisher.publish(
    topic_path,
    b"Message with metadata",
    event_type="user_action",
    user_id="12345",
    timestamp="2024-01-01T00:00:00Z"
)
```

### Message Ordering

```python
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types

# Configure for message ordering
publisher_options = types.PublisherOptions(
    enable_message_ordering=True
)
publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options)
topic_path = publisher.topic_path("my-project", "my-topic")

# Publish ordered messages
for i in range(5):
    future = publisher.publish(
        topic_path,
        f"Message {i}".encode(),
        ordering_key="user-123"
    )
```

### Custom Batching

```python
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types

# Configure custom batching
batch_settings = types.BatchSettings(
    max_bytes=500000,  # 500 KB
    max_latency=0.05,  # 50 ms
    max_messages=50
)

publisher = pubsub_v1.PublisherClient(batch_settings=batch_settings)
```

### Callback Handling

```python
# Reuses publisher and topic_path from the Basic Publishing example.
def publish_callback(future):
    try:
        message_id = future.result()
        print(f"Published: {message_id}")
    except Exception as e:
        print(f"Publish failed: {e}")

# Publish with callback
future = publisher.publish(topic_path, b"Async message")
future.add_done_callback(publish_callback)
```