0
# Configuration
1
2
Google Cloud Pub/Sub provides comprehensive configuration options for controlling client behavior, batching, flow control, retry logic, and timeouts. These configuration types enable fine-tuning for different use cases and performance requirements.
3
4
## Capabilities
5
6
### Batch Settings
7
8
Configure message batching behavior for the publisher client.
9
10
```python { .api }
11
class BatchSettings(NamedTuple):
12
"""
13
Settings for batch publishing messages.
14
15
Attributes:
16
- max_bytes: Maximum total size of messages in a batch (default: 1MB)
17
- max_latency: Maximum time to wait before publishing batch (default: 0.01s)
18
- max_messages: Maximum number of messages in a batch (default: 100)
19
"""
20
21
max_bytes: int = 1000000
22
"""
23
Maximum total size of messages to collect before automatically
24
publishing the batch, including byte size overhead of the publish
25
request. Maximum server-side limit is 10,000,000 bytes.
26
"""
27
28
max_latency: float = 0.01
29
"""
30
Maximum number of seconds to wait for additional messages before
31
automatically publishing the batch.
32
"""
33
34
max_messages: int = 100
35
"""
36
Maximum number of messages to collect before automatically
37
publishing the batch.
38
"""
39
```
40
41
### Publisher Options
42
43
Configure publisher client behavior including message ordering, flow control, and OpenTelemetry.
44
45
```python { .api }
46
class PublisherOptions(NamedTuple):
47
"""
48
Options for the publisher client.
49
50
Attributes:
51
- enable_message_ordering: Whether to order messages by ordering key
52
- flow_control: Flow control settings for message publishing
53
- retry: Retry settings for publish operations
54
- timeout: Timeout settings for publish operations
55
- enable_open_telemetry_tracing: Whether to enable OpenTelemetry tracing
56
"""
57
58
enable_message_ordering: bool = False
59
"""
60
Whether to order messages in a batch by a supplied ordering key.
61
"""
62
63
flow_control: PublishFlowControl = PublishFlowControl()
64
"""
65
Flow control settings for message publishing by the client.
66
By default the publisher client does not do any throttling.
67
"""
68
69
retry: OptionalRetry = ...
70
"""
71
Retry settings for message publishing by the client.
72
Should be an instance of google.api_core.retry.Retry.
73
"""
74
75
timeout: OptionalTimeout = ConstantTimeout(60)
76
"""
77
Timeout settings for message publishing by the client.
78
Should be compatible with pubsub_v1.types.TimeoutType.
79
"""
80
81
enable_open_telemetry_tracing: bool = False
82
"""
83
Whether to enable OpenTelemetry tracing for publish operations.
84
"""
85
```
86
87
### Publish Flow Control
88
89
Configure flow control limits for the publisher client to prevent resource exhaustion.
90
91
```python { .api }
92
class PublishFlowControl(NamedTuple):
93
"""
94
Client flow control settings for message publishing.
95
96
Attributes:
97
- message_limit: Maximum number of messages awaiting publication
98
- byte_limit: Maximum total size of messages awaiting publication
99
- limit_exceeded_behavior: Action when flow control limits are exceeded
100
"""
101
102
message_limit: int = 1000
103
"""
104
Maximum number of messages awaiting to be published.
105
"""
106
107
byte_limit: int = 10000000
108
"""
109
Maximum total size of messages awaiting to be published.
110
"""
111
112
limit_exceeded_behavior: LimitExceededBehavior = LimitExceededBehavior.IGNORE
113
"""
114
Action to take when publish flow control limits are exceeded.
115
"""
116
```
117
118
### Limit Exceeded Behavior
119
120
Enum defining actions when flow control limits are exceeded.
121
122
```python { .api }
123
class LimitExceededBehavior(str, Enum):
124
"""
125
Possible actions when exceeding publish flow control limits.
126
"""
127
128
IGNORE = "ignore"
129
"""
130
Ignore flow control limits and continue publishing.
131
"""
132
133
BLOCK = "block"
134
"""
135
Block publishing until flow control limits are no longer exceeded.
136
"""
137
138
ERROR = "error"
139
"""
140
Raise an error when flow control limits are exceeded.
141
"""
142
```
143
144
### Subscriber Flow Control
145
146
Configure flow control limits for the subscriber client to manage message processing load.
147
148
```python { .api }
149
class FlowControl(NamedTuple):
150
"""
151
Settings for controlling the rate at which messages are pulled
152
with an asynchronous subscription.
153
154
Attributes:
155
- max_bytes: Maximum total size of outstanding messages
156
- max_messages: Maximum number of outstanding messages
157
- max_lease_duration: Maximum time to hold a lease on a message
158
- min_duration_per_lease_extension: Minimum lease extension duration
159
- max_duration_per_lease_extension: Maximum lease extension duration
160
"""
161
162
max_bytes: int = 104857600 # 100 MiB
163
"""
164
Maximum total size of received - but not yet processed - messages
165
before pausing the message stream.
166
"""
167
168
max_messages: int = 1000
169
"""
170
Maximum number of received - but not yet processed - messages
171
before pausing the message stream.
172
"""
173
174
max_lease_duration: float = 3600 # 1 hour
175
"""
176
Maximum amount of time in seconds to hold a lease on a message
177
before dropping it from lease management.
178
"""
179
180
min_duration_per_lease_extension: float = 0
181
"""
182
Minimum amount of time in seconds for a single lease extension attempt.
183
Must be between 10 and 600 seconds (inclusive). Ignored by default,
184
but set to 60 seconds if subscription has exactly-once delivery enabled.
185
"""
186
187
max_duration_per_lease_extension: float = 0
188
"""
189
Maximum amount of time in seconds for a single lease extension attempt.
190
Bounds the delay before message redelivery if subscriber fails to extend
191
the deadline. Must be between 10 and 600 seconds (inclusive).
192
Ignored if set to 0.
193
"""
194
```
195
196
### Subscriber Options
197
198
Configure subscriber client behavior including OpenTelemetry tracing.
199
200
```python { .api }
201
class SubscriberOptions(NamedTuple):
202
"""
203
Options for the subscriber client.
204
205
Attributes:
206
- enable_open_telemetry_tracing: Whether to enable OpenTelemetry tracing
207
"""
208
209
enable_open_telemetry_tracing: bool = False
210
"""
211
Whether to enable OpenTelemetry tracing for subscriber operations.
212
"""
213
```
214
215
## Usage Examples
216
217
### Custom Batch Settings
218
219
```python
220
from google.cloud import pubsub_v1
221
from google.cloud.pubsub_v1 import types
222
223
# Configure custom batching
224
batch_settings = types.BatchSettings(
225
max_bytes=500000, # 500KB batches
226
max_latency=0.05, # 50ms max wait
227
max_messages=50 # Max 50 messages per batch
228
)
229
230
# Create publisher with custom batching
231
publisher = pubsub_v1.PublisherClient(batch_settings=batch_settings)
232
```
233
234
### Publisher with Message Ordering
235
236
```python
237
from google.cloud import pubsub_v1
238
from google.cloud.pubsub_v1 import types
239
240
# Configure publisher for message ordering
241
publisher_options = types.PublisherOptions(
242
enable_message_ordering=True,
243
enable_open_telemetry_tracing=True
244
)
245
246
publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options)
247
248
# Publish ordered messages
249
topic_path = publisher.topic_path("my-project", "my-topic")
250
for i in range(10):
251
future = publisher.publish(
252
topic_path,
253
f"Message {i}".encode(),
254
ordering_key="user-123"
255
)
256
```
257
258
### Publisher Flow Control
259
260
```python
261
from google.cloud import pubsub_v1
262
from google.cloud.pubsub_v1 import types
263
264
# Configure publisher flow control
265
flow_control = types.PublishFlowControl(
266
message_limit=500, # Max 500 pending messages
267
byte_limit=5000000, # Max 5MB pending bytes
268
limit_exceeded_behavior=types.LimitExceededBehavior.BLOCK
269
)
270
271
publisher_options = types.PublisherOptions(
272
flow_control=flow_control
273
)
274
275
publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options)
276
```
277
278
### Subscriber Flow Control
279
280
```python
281
from google.cloud import pubsub_v1
282
from google.cloud.pubsub_v1 import types
283
284
# Configure subscriber flow control
285
flow_control = types.FlowControl(
286
max_messages=50, # Process max 50 messages concurrently
287
max_bytes=50 * 1024 * 1024, # Max 50MB outstanding
288
max_lease_duration=300, # 5 minute max lease
289
min_duration_per_lease_extension=60, # Min 60s lease extensions
290
max_duration_per_lease_extension=120 # Max 120s lease extensions
291
)
292
293
subscriber = pubsub_v1.SubscriberClient(flow_control=flow_control)
294
```
295
296
### Retry and Timeout Configuration
297
298
```python
299
from google.cloud import pubsub_v1
300
from google.cloud.pubsub_v1 import types
301
from google.api_core import retry
302
from google.api_core.timeout import ConstantTimeout
303
304
# Configure custom retry policy
305
custom_retry = retry.Retry(
306
initial=1.0, # Initial delay
307
maximum=60.0, # Maximum delay
308
multiplier=2.0, # Backoff multiplier
309
deadline=300.0 # Total deadline
310
)
311
312
# Configure custom timeout
313
custom_timeout = ConstantTimeout(120)
314
315
publisher_options = types.PublisherOptions(
316
retry=custom_retry,
317
timeout=custom_timeout
318
)
319
320
publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options)
321
```
322
323
### High-Throughput Configuration
324
325
```python
326
from google.cloud import pubsub_v1
327
from google.cloud.pubsub_v1 import types
328
329
# High-throughput publisher configuration
330
batch_settings = types.BatchSettings(
331
max_bytes=5000000, # 5MB batches
332
max_latency=0.1, # 100ms max wait
333
max_messages=1000 # Large batches
334
)
335
336
flow_control = types.PublishFlowControl(
337
message_limit=10000, # High message limit
338
byte_limit=100000000, # 100MB limit
339
limit_exceeded_behavior=types.LimitExceededBehavior.BLOCK
340
)
341
342
publisher_options = types.PublisherOptions(
343
flow_control=flow_control
344
)
345
346
publisher = pubsub_v1.PublisherClient(
347
batch_settings=batch_settings,
348
publisher_options=publisher_options
349
)
350
351
# High-throughput subscriber configuration
352
subscriber_flow_control = types.FlowControl(
353
max_messages=1000, # High concurrency
354
max_bytes=200 * 1024 * 1024, # 200MB outstanding
355
max_lease_duration=1800 # 30 minute max lease
356
)
357
358
subscriber = pubsub_v1.SubscriberClient(flow_control=subscriber_flow_control)
359
```
360
361
### Low-Latency Configuration
362
363
```python
364
from google.cloud import pubsub_v1
365
from google.cloud.pubsub_v1 import types
366
367
# Low-latency publisher configuration
368
batch_settings = types.BatchSettings(
369
max_bytes=100000, # Small batches for low latency
370
max_latency=0.001, # 1ms max wait
371
max_messages=10 # Small batches
372
)
373
374
publisher = pubsub_v1.PublisherClient(batch_settings=batch_settings)
375
376
# Low-latency subscriber configuration
377
flow_control = types.FlowControl(
378
max_messages=10, # Low concurrency for predictable latency
379
max_bytes=1024 * 1024, # 1MB outstanding
380
max_lease_duration=60 # Short lease duration
381
)
382
383
subscriber = pubsub_v1.SubscriberClient(flow_control=flow_control)
384
```