0
# Long-Running Operations
1
2
This module manages Google API long-running operations, providing polling, cancellation, and both synchronous and asynchronous support. The operations system handles the complexity of tracking operation status, retrieving results, and enforcing timeouts for operations that may take minutes or hours to complete.
3
4
## Capabilities
5
6
### Synchronous Operations
7
8
Synchronous operations block the caller while they are polled until completion, with configurable polling intervals and timeouts.
9
10
```python { .api }
11
class Operation:
    """
    Future-style handle for a Google API long-running operation.

    Args:
        operation: The initial operation response returned by the API
        refresh (Callable): Callable used to refresh the operation status
        cancel (Callable): Callable used to cancel the operation
        result_type (type, optional): Type used to deserialize the operation result
        metadata_type (type, optional): Type used to deserialize the operation metadata
        retry (Retry, optional): Retry configuration applied while polling
    """
    def __init__(self, operation, refresh, cancel, result_type=None, metadata_type=None, retry=None): ...

    def done(self):
        """
        Report whether the operation has finished.

        Returns:
            bool: True once the operation is complete, otherwise False
        """

    def running(self):
        """
        Report whether the operation is still in progress.

        Returns:
            bool: True while the operation is running, otherwise False
        """

    def result(self, timeout=None):
        """
        Block until the operation completes and return its result.

        Args:
            timeout (float, optional): Maximum number of seconds to wait

        Returns:
            Any: The result produced by the operation

        Raises:
            GoogleAPICallError: If the operation failed
            TimeoutError: If the timeout was exceeded
        """

    def exception(self, timeout=None):
        """
        Return the exception raised by a failed operation.

        Args:
            timeout (float, optional): Maximum number of seconds to wait

        Returns:
            Exception or None: The failure, or None if the operation succeeded
        """

    def add_done_callback(self, callback):
        """
        Register a callback to run once the operation completes.

        Args:
            callback (Callable): Function invoked when the operation completes
        """

    def cancel(self):
        """
        Request cancellation of the operation.

        Returns:
            bool: True if the cancellation was successful
        """

    def cancelled(self):
        """
        Report whether the operation was cancelled.

        Returns:
            bool: True if the operation was cancelled
        """

    @property
    def metadata(self):
        """
        Metadata attached to the operation.

        Returns:
            Any: The operation metadata, if available
        """

    @property
    def name(self):
        """
        Name (identifier) of the operation.

        Returns:
            str: The operation name
        """
108
```
109
110
### Asynchronous Operations
111
112
Asynchronous counterparts of the synchronous operations, designed to be awaited from Python's `asyncio` event loop.
113
114
```python { .api }
115
class AsyncOperation:
    """
    An async Future for Google API long-running operations.

    Every status query that may need to poll the service is a coroutine and
    must be awaited; only callback registration and the ``metadata`` /
    ``name`` properties are synchronous.

    Args:
        operation: Initial operation response from the API
        refresh (Callable): Async function to refresh operation status
        cancel (Callable): Async function to cancel the operation
        result_type (type, optional): Type to deserialize operation result
        metadata_type (type, optional): Type to deserialize operation metadata
        retry (AsyncRetry, optional): Async retry configuration for polling
    """
    def __init__(self, operation, refresh, cancel, result_type=None, metadata_type=None, retry=None): ...

    async def done(self):
        """
        Check if the operation is complete.

        Returns:
            bool: True if operation is complete, False otherwise
        """

    # Coroutine, like ``done()``: a bare ``op.running()`` yields a coroutine
    # object (always truthy), so callers must write ``await op.running()``.
    async def running(self):
        """
        Check if the operation is currently running.

        Returns:
            bool: True if operation is running, False otherwise
        """

    async def result(self, timeout=None):
        """
        Get the result of the operation, awaiting until complete.

        Args:
            timeout (float, optional): Maximum time to wait in seconds

        Returns:
            Any: The operation result

        Raises:
            GoogleAPICallError: If operation failed
            asyncio.TimeoutError: If timeout exceeded
        """

    async def exception(self, timeout=None):
        """
        Get the exception from a failed operation.

        Args:
            timeout (float, optional): Maximum time to wait in seconds

        Returns:
            Exception or None: Exception if operation failed, None if successful
        """

    def add_done_callback(self, callback):
        """
        Add a callback to be executed when operation completes.

        Args:
            callback (Callable): Function to call when operation completes
        """

    async def cancel(self):
        """
        Cancel the operation.

        Returns:
            bool: True if cancellation was successful
        """

    # Coroutine, like ``cancel()``: must be awaited to obtain the bool.
    async def cancelled(self):
        """
        Check if the operation was cancelled.

        Returns:
            bool: True if operation was cancelled
        """

    @property
    def metadata(self):
        """
        Get operation metadata.

        Returns:
            Any: Operation metadata if available
        """

    @property
    def name(self):
        """
        Get the operation name/identifier.

        Returns:
            str: Operation name
        """
212
```
213
214
### Extended Operations
215
216
Enhanced operation handling for APIs with additional metadata and progress tracking.
217
218
```python { .api }
219
class ExtendedOperation:
    """
    Operation variant offering richer progress tracking and metadata.

    Args:
        operation: The initial operation response
        refresh (Callable): Callable used to refresh the operation status
        cancel (Callable): Callable used to cancel the operation
        polling_method (Callable, optional): Custom polling implementation
        result_type (type, optional): Type of the operation result
        metadata_type (type, optional): Type of the operation metadata
        retry (Retry, optional): Retry configuration
    """
    def __init__(self, operation, refresh, cancel, polling_method=None, result_type=None, metadata_type=None, retry=None): ...

    def done(self):
        """Report whether the operation is complete."""

    def result(self, timeout=None):
        """Return the operation result, waiting up to the given timeout."""

    def cancel(self):
        """Request cancellation of the operation."""

    @property
    def progress_percent(self):
        """
        Progress of the operation expressed as a percentage.

        Returns:
            float: Progress percentage (0.0 to 100.0)
        """

    @property
    def status_message(self):
        """
        Current status message of the operation.

        Returns:
            str: Human-readable status message
        """
260
```
261
262
### Operation Utility Functions
263
264
Helper functions for creating and managing operations.
265
266
```python { .api }
267
def _refresh_http(api_request, operation_name, retry=None):
268
"""
269
Refresh operation status via HTTP request.
270
271
Args:
272
api_request (Callable): HTTP request function
273
operation_name (str): Name/ID of the operation
274
retry (Retry, optional): Retry configuration for requests
275
276
Returns:
277
dict: Updated operation status
278
"""
279
280
def _cancel_http(api_request, operation_name):
281
"""
282
Cancel operation via HTTP request.
283
284
Args:
285
api_request (Callable): HTTP request function
286
operation_name (str): Name/ID of the operation
287
288
Returns:
289
dict: Cancellation response
290
"""
291
```
292
293
## Usage Examples
294
295
### Basic Operation Usage
296
297
```python
298
from google.api_core import operation
299
from google.api_core import retry
300
import requests
301
302
# Bound every HTTP call: `requests` waits indefinitely by default, so one
# stalled connection would otherwise hang the polling loop forever.
REQUEST_TIMEOUT_SECONDS = 30

def refresh_operation(op_name):
    """Refresh operation status from API; raises on HTTP error responses."""
    response = requests.get(
        f"https://api.example.com/operations/{op_name}",
        timeout=REQUEST_TIMEOUT_SECONDS,
    )
    # Fail loudly on 4xx/5xx instead of treating an error payload as status.
    response.raise_for_status()
    return response.json()

def cancel_operation(op_name):
    """Cancel operation via API; raises on HTTP error responses."""
    response = requests.delete(
        f"https://api.example.com/operations/{op_name}",
        timeout=REQUEST_TIMEOUT_SECONDS,
    )
    response.raise_for_status()
    return response.json()

# Start a long-running operation
initial_response = requests.post(
    "https://api.example.com/long-task",
    json={"task": "data_processing"},
    timeout=REQUEST_TIMEOUT_SECONDS,
)
# Do not build an Operation from an error response body.
initial_response.raise_for_status()
operation_data = initial_response.json()

# Create operation object
op = operation.Operation(
    operation_data,
    refresh=lambda: refresh_operation(operation_data["name"]),
    cancel=lambda: cancel_operation(operation_data["name"]),
    retry=retry.Retry(initial=1.0, maximum=30.0)
)

# Wait for completion
try:
    result = op.result(timeout=300)  # Wait up to 5 minutes
    print("Operation completed:", result)
except Exception as e:
    print("Operation failed:", e)
331
```
332
333
### Async Operation Usage
334
335
```python
336
import asyncio
337
from google.api_core import operation_async
338
from google.api_core import retry
339
import aiohttp
340
341
async def async_refresh_operation(op_name):
    """Async refresh of operation status; raises on HTTP error responses."""
    async with aiohttp.ClientSession() as session:
        async with session.get(f"https://api.example.com/operations/{op_name}") as response:
            # Fail loudly on 4xx/5xx instead of treating an error payload
            # as the operation status.
            response.raise_for_status()
            return await response.json()

async def async_cancel_operation(op_name):
    """Async cancel operation; raises on HTTP error responses."""
    async with aiohttp.ClientSession() as session:
        async with session.delete(f"https://api.example.com/operations/{op_name}") as response:
            response.raise_for_status()
            return await response.json()

async def main():
    # Start async long-running operation
    async with aiohttp.ClientSession() as session:
        async with session.post("https://api.example.com/long-task",
                                json={"task": "async_processing"}) as response:
            # Do not build an operation from an error response body.
            response.raise_for_status()
            operation_data = await response.json()

    # Create async operation
    async_op = operation_async.AsyncOperation(
        operation_data,
        refresh=lambda: async_refresh_operation(operation_data["name"]),
        cancel=lambda: async_cancel_operation(operation_data["name"]),
        retry=retry.AsyncRetry(initial=1.0, maximum=30.0)
    )

    # Await completion
    try:
        result = await async_op.result(timeout=300)
        print("Async operation completed:", result)
    except Exception as e:
        print("Async operation failed:", e)

asyncio.run(main())
376
```
377
378
### Operation with Callbacks
379
380
```python
381
from google.api_core import operation
382
import threading
383
384
def operation_completed_callback(op):
    """Callback executed when operation completes."""
    # Query the outcome once and reuse it rather than calling exception()
    # a second time just to format the message.
    error = op.exception()
    if error:
        print(f"Operation {op.name} failed: {error}")
    else:
        print(f"Operation {op.name} succeeded: {op.result()}")

# Create operation with callback
# (operation_data, refresh_func and cancel_func are placeholders for the
# values built as in the basic usage example)
op = operation.Operation(
    operation_data,
    refresh=refresh_func,
    cancel=cancel_func
)

# Add completion callback
op.add_done_callback(operation_completed_callback)

# Start operation in background thread
def monitor_operation():
    try:
        op.result()  # This will block until complete
    except Exception:
        pass  # Callback will handle the result

thread = threading.Thread(target=monitor_operation)
thread.start()

# Continue with other work...
print("Operation running in background")
413
```
414
415
### Operation Cancellation
416
417
```python
418
from google.api_core import operation
419
import time
420
import threading
421
422
# Build the operation handle
# (operation_data, refresh_func and cancel_func are placeholders for the
# values built as in the basic usage example)
op = operation.Operation(operation_data, refresh=refresh_func, cancel=cancel_func)

# Wait for the outcome on a worker thread so the main thread stays free
def wait_for_outcome():
    try:
        outcome = op.result(timeout=600)  # give up after 10 minutes
    except Exception as err:
        print("Operation error:", err)
    else:
        print("Operation completed:", outcome)

monitor_thread = threading.Thread(target=wait_for_outcome)
monitor_thread.start()

# Give the operation 30 seconds, then cancel it if it has not finished
time.sleep(30)
if op.running():
    print("Cancelling long-running operation...")
    if op.cancel():
        print("Operation cancelled successfully")
    else:
        print("Failed to cancel operation")

monitor_thread.join()
451
```
452
453
### Extended Operation with Progress
454
455
```python
456
import time  # required by the polling loop below (time.sleep)

from google.api_core import extended_operation

# Create extended operation with progress tracking
# (operation_data, refresh_func and cancel_func are placeholders for the
# values built as in the basic usage example)
ext_op = extended_operation.ExtendedOperation(
    operation_data,
    refresh=refresh_func,
    cancel=cancel_func
)

# Monitor progress
while not ext_op.done():
    progress = ext_op.progress_percent
    status = ext_op.status_message
    print(f"Progress: {progress:.1f}% - {status}")
    time.sleep(5)  # pause between polls to avoid hammering the API

# Get final result
# Query the outcome once and reuse it rather than calling exception() twice.
error = ext_op.exception()
if error:
    print("Operation failed:", error)
else:
    print("Operation completed:", ext_op.result())
477
```