# Schedulers and Utilities

Google Cloud Pub/Sub provides scheduler classes and utility functions to control message processing behavior and resource management in subscriber operations.

## Capabilities

### Thread Scheduler

Custom scheduler for controlling how messages are processed in subscriber operations.

```python { .api }
class ThreadScheduler:
    """
    A thread pool-based scheduler for processing subscriber messages.

    This scheduler manages the execution of message callbacks using a
    configurable thread pool, allowing control over concurrency and
    resource usage in message processing.
    """

    def __init__(self, executor: Optional[ThreadPoolExecutor] = None):
        """
        Initialize the thread scheduler.

        Parameters:
        - executor: Optional pre-configured ThreadPoolExecutor.
          If not provided, a default executor will be created.
        """

    def schedule(self, callback: Callable, *args, **kwargs) -> Future:
        """
        Schedule a callback function for execution.

        Parameters:
        - callback: Function to execute
        - *args: Positional arguments for the callback
        - **kwargs: Keyword arguments for the callback

        Returns:
        Future representing the scheduled execution
        """

    def shutdown(self, wait: bool = True) -> None:
        """
        Shutdown the scheduler and stop accepting new tasks.

        Parameters:
        - wait: Whether to wait for currently executing tasks to complete
        """

    @property
    def executor(self) -> ThreadPoolExecutor:
        """
        Get the underlying thread pool executor.

        Returns:
        ThreadPoolExecutor instance used by this scheduler
        """
```

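The scheduler can also be driven directly, outside of `subscribe()`. A minimal sketch assuming the interface above; the `handle` callback and the pool sizing are illustrative, not part of the library:

```python
from concurrent.futures import ThreadPoolExecutor
from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler

# Bounded thread pool so scheduled callbacks cannot exhaust process threads
executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="demo-worker")
scheduler = ThreadScheduler(executor=executor)

def handle(item):
    # Stand-in for real message-handling work
    print(f"handled {item}")

# Forward positional arguments to the callback, as described above
for i in range(3):
    scheduler.schedule(handle, i)

# Stop accepting new work; defaults are used to stay compatible across versions
scheduler.shutdown()
```
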
### Future Classes

Future objects for handling asynchronous operations in publisher and subscriber.

```python { .api }
class StreamingPullFuture:
    """
    Future object for managing streaming pull operations.

    This future controls the lifecycle of a streaming pull subscription
    and allows monitoring and cancellation of the operation.
    """

    def cancel(self) -> bool:
        """
        Cancel the streaming pull operation.

        Stops the streaming pull and releases associated resources.

        Returns:
        True if the operation was successfully cancelled
        """

    def cancelled(self) -> bool:
        """
        Check if the streaming pull operation was cancelled.

        Returns:
        True if the operation is cancelled
        """

    def running(self) -> bool:
        """
        Check if the streaming pull operation is currently running.

        Returns:
        True if the operation is active
        """

    def result(self, timeout: Optional[float] = None) -> None:
        """
        Wait for the streaming pull operation to complete.

        This method blocks until the streaming pull stops due to
        cancellation, error, or other termination condition.

        Parameters:
        - timeout: Maximum time to wait in seconds

        Raises:
        TimeoutError: If the timeout is exceeded
        """

    def add_done_callback(self, callback: Callable[["StreamingPullFuture"], None]) -> None:
        """
        Add a callback to be executed when the future completes.

        Parameters:
        - callback: Function to call when the future is done
        """


class PublisherFuture:
    """
    Future object for publisher operations that return message IDs.

    This future represents the result of a publish operation and
    resolves to the server-assigned message ID.
    """

    def result(self, timeout: Optional[float] = None) -> str:
        """
        Get the message ID from the publish operation.

        Parameters:
        - timeout: Maximum time to wait in seconds

        Returns:
        Server-assigned message ID

        Raises:
        TimeoutError: If the timeout is exceeded
        PublishError: If the publish operation failed
        """

    def add_done_callback(self, callback: Callable[["PublisherFuture"], None]) -> None:
        """
        Add a callback to be executed when the future completes.

        Parameters:
        - callback: Function to call with the future as argument
        """

    def cancel(self) -> bool:
        """
        Attempt to cancel the publish operation.

        Returns:
        Always False (Pub/Sub publish operations cannot be cancelled)
        """

    def cancelled(self) -> bool:
        """
        Check if the publish operation was cancelled.

        Returns:
        Always False (Pub/Sub publish operations cannot be cancelled)
        """


class AcknowledgeFuture:
    """
    Future object for acknowledgment operations in exactly-once delivery.

    This future represents the result of an ack/nack operation and
    resolves to an AcknowledgeStatus indicating the result.
    """

    def result(self, timeout: Optional[float] = None) -> AcknowledgeStatus:
        """
        Get the acknowledgment status.

        Parameters:
        - timeout: Maximum time to wait in seconds

        Returns:
        AcknowledgeStatus indicating the result

        Raises:
        TimeoutError: If the timeout is exceeded
        AcknowledgeError: If the acknowledgment operation failed
        """

    def add_done_callback(self, callback: Callable[["AcknowledgeFuture"], None]) -> None:
        """
        Add a callback to be executed when the future completes.

        Parameters:
        - callback: Function to call with the future as argument
        """
```

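These futures follow the familiar `concurrent.futures` interface, and the usual shutdown pattern for a streaming pull is to cancel the stream and then block on `result()` so background resources are released before the process exits. A minimal sketch; the project and subscription names are placeholders:

```python
from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")

streaming_pull_future = subscriber.subscribe(
    subscription_path, callback=lambda message: message.ack()
)

with subscriber:
    try:
        # Block for up to 30 seconds while messages are processed
        streaming_pull_future.result(timeout=30)
    except TimeoutError:
        streaming_pull_future.cancel()   # Trigger the shutdown
        streaming_pull_future.result()   # Block until the shutdown is complete
```
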
### Utility Functions

Helper functions for working with Pub/Sub resources and operations.

```python { .api }
def common_project_path(project: str) -> str:
    """
    Construct a project path string.

    Parameters:
    - project: Project ID

    Returns:
    Project path in the format "projects/{project}"
    """

def common_location_path(project: str, location: str) -> str:
    """
    Construct a location path string.

    Parameters:
    - project: Project ID
    - location: Location/region name

    Returns:
    Location path in the format "projects/{project}/locations/{location}"
    """
```

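A short sketch of the two helpers above, using the class-method form exposed on the generated clients; the project and location values are placeholders:

```python
from google.cloud.pubsub_v1 import PublisherClient

# "projects/my-project"
project_path = PublisherClient.common_project_path("my-project")

# "projects/my-project/locations/us-central1"
location_path = PublisherClient.common_location_path("my-project", "us-central1")

print(project_path)
print(location_path)
```
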
## Usage Examples

### Custom Thread Scheduler

```python
import concurrent.futures
import time

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler

# Create custom thread pool executor
executor = concurrent.futures.ThreadPoolExecutor(
    max_workers=20,
    thread_name_prefix="pubsub-callback"
)

# Create scheduler with custom executor
scheduler = ThreadScheduler(executor=executor)

# Create subscriber with custom scheduler
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")

def message_callback(message):
    print(f"Processing message: {message.message_id}")
    # Simulate processing work
    time.sleep(1)
    message.ack()

# Use custom scheduler in subscription
streaming_pull_future = subscriber.subscribe(
    subscription_path,
    callback=message_callback,
    scheduler=scheduler
)

try:
    # Let the stream run for up to 60 seconds
    streaming_pull_future.result(timeout=60)
except (KeyboardInterrupt, concurrent.futures.TimeoutError):
    streaming_pull_future.cancel()
finally:
    # Shut down the scheduler's thread pool
    scheduler.shutdown(wait=True)
```

### Managing Streaming Pull Future

```python
from google.cloud import pubsub_v1
import threading
import time

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")

def callback(message):
    print(f"Received: {message.data.decode('utf-8')}")
    message.ack()

# Start streaming pull
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

print(f"Streaming pull running: {streaming_pull_future.running()}")
print(f"Streaming pull cancelled: {streaming_pull_future.cancelled()}")

# Add completion callback
def on_done(future):
    print("Streaming pull completed")
    print(f"Was cancelled: {future.cancelled()}")

streaming_pull_future.add_done_callback(on_done)

# Run in background thread to allow monitoring
def monitor_future():
    try:
        # Wait for streaming pull to complete
        streaming_pull_future.result()
    except Exception as e:
        print(f"Streaming pull error: {e}")

monitor_thread = threading.Thread(target=monitor_future)
monitor_thread.start()

# Let it run for 30 seconds, then cancel
time.sleep(30)
print("Cancelling streaming pull...")
streaming_pull_future.cancel()

# Wait for monitor thread to complete
monitor_thread.join()
```

### Publisher Future Handling

```python
from google.cloud import pubsub_v1
import concurrent.futures

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")

# Publish multiple messages and collect futures
futures = []
for i in range(10):
    future = publisher.publish(topic_path, f"Message {i}".encode())
    futures.append(future)

# Add callbacks to futures
def on_publish_complete(future):
    try:
        message_id = future.result()
        print(f"Published successfully: {message_id}")
    except Exception as e:
        print(f"Publish failed: {e}")

for future in futures:
    future.add_done_callback(on_publish_complete)

# Wait for all futures to complete
try:
    # Use concurrent.futures.as_completed for efficient waiting
    for future in concurrent.futures.as_completed(futures, timeout=30):
        message_id = future.result()
        print(f"Completed: {message_id}")

except concurrent.futures.TimeoutError:
    print("Some publishes did not complete within timeout")
```

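Because publish futures implement the `concurrent.futures` interface, `futures.wait` is an alternative when you only need to block until every publish has settled. A short sketch with placeholder project and topic names:

```python
from concurrent import futures as cf
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")

publish_futures = [
    publisher.publish(topic_path, f"Message {i}".encode()) for i in range(10)
]

# Block until every outstanding publish has either succeeded or failed
cf.wait(publish_futures, return_when=cf.ALL_COMPLETED)
```
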
### Exactly-Once Delivery with Acknowledge Futures

```python
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.exceptions import AcknowledgeStatus

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")

def process_message(data):
    # Stand-in for application-specific processing logic
    return data

def callback(message):
    try:
        # Process the message
        result = process_message(message.data)

        # Acknowledge with response for exactly-once delivery
        ack_future = message.ack_with_response()

        # Add callback to handle ack result
        def on_ack_complete(ack_future):
            try:
                ack_status = ack_future.result()
                if ack_status == AcknowledgeStatus.SUCCESS:
                    print(f"Message {message.message_id} acknowledged successfully")
                else:
                    print(f"Ack failed with status: {ack_status}")
                    # Handle failed acknowledgment
            except Exception as e:
                print(f"Ack operation failed: {e}")

        ack_future.add_done_callback(on_ack_complete)

    except Exception as e:
        print(f"Message processing failed: {e}")
        # Nack with response
        nack_future = message.nack_with_response()
        nack_future.add_done_callback(
            lambda f: print(f"Nack status: {f.result()}")
        )

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
```

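If callback-style handling is not needed, the same acknowledge future can be awaited synchronously inside the message callback. A sketch assuming exactly-once delivery is enabled on the subscription; the timeout value is illustrative:

```python
from google.cloud.pubsub_v1.subscriber import exceptions as sub_exceptions

def blocking_callback(message):
    ack_future = message.ack_with_response()
    try:
        # Blocks until the acknowledgment is confirmed by the service
        ack_future.result(timeout=30)
        print(f"Acked {message.message_id}")
    except sub_exceptions.AcknowledgeError as e:
        # error_code describes why the acknowledgment failed
        print(f"Ack failed: {e.error_code}")
```
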
### Resource Path Utilities

```python
from google.cloud.pubsub_v1 import PublisherClient, SubscriberClient

# Using path construction utilities
project_id = "my-project"
topic_name = "my-topic"
subscription_name = "my-subscription"

# Construct paths
topic_path = PublisherClient.topic_path(project_id, topic_name)
subscription_path = SubscriberClient.subscription_path(project_id, subscription_name)
project_path = PublisherClient.common_project_path(project_id)

print(f"Topic path: {topic_path}")
print(f"Subscription path: {subscription_path}")
print(f"Project path: {project_path}")

# Parse paths back to components
topic_components = PublisherClient.parse_topic_path(topic_path)
print(f"Topic components: {topic_components}")
# Output: {'project': 'my-project', 'topic': 'my-topic'}

subscription_components = SubscriberClient.parse_subscription_path(subscription_path)
print(f"Subscription components: {subscription_components}")
# Output: {'project': 'my-project', 'subscription': 'my-subscription'}
```

### Thread Pool Configuration

```python
import concurrent.futures
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler

# Configure different thread pools for different workloads
def create_scheduler_for_workload(workload_type: str) -> ThreadScheduler:
    if workload_type == "io_intensive":
        # More threads for I/O bound work
        executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=50,
            thread_name_prefix="pubsub-io"
        )
    elif workload_type == "cpu_intensive":
        # Limit threads for CPU bound work
        executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=4,  # Match CPU cores
            thread_name_prefix="pubsub-cpu"
        )
    else:
        # Default configuration
        executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=10,
            thread_name_prefix="pubsub-default"
        )

    return ThreadScheduler(executor=executor)

# Placeholder callbacks; replace with your own processing logic
def io_intensive_callback(message):
    message.ack()

def cpu_intensive_callback(message):
    message.ack()

# Use different schedulers for different subscriptions
io_scheduler = create_scheduler_for_workload("io_intensive")
cpu_scheduler = create_scheduler_for_workload("cpu_intensive")

subscriber = pubsub_v1.SubscriberClient()

# I/O intensive subscription (e.g., API calls, database operations)
io_subscription = subscriber.subscription_path("my-project", "io-intensive-sub")
io_future = subscriber.subscribe(
    io_subscription,
    callback=io_intensive_callback,
    scheduler=io_scheduler
)

# CPU intensive subscription (e.g., data processing, calculations)
cpu_subscription = subscriber.subscription_path("my-project", "cpu-intensive-sub")
cpu_future = subscriber.subscribe(
    cpu_subscription,
    callback=cpu_intensive_callback,
    scheduler=cpu_scheduler
)
```

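The scheduler only controls how callbacks execute; to cap how many messages are outstanding at once, it can be combined with flow control settings on `subscribe()`. A brief sketch with illustrative limits and a placeholder subscription name:

```python
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "bounded-sub")

# Cap outstanding messages so the thread pool is never flooded
flow_control = pubsub_v1.types.FlowControl(max_messages=100)

future = subscriber.subscribe(
    subscription_path,
    callback=lambda message: message.ack(),
    scheduler=ThreadScheduler(),
    flow_control=flow_control,
)
```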