# Asynchronous Operations

Asynchronous client implementations for all operations with full async/await support, enabling high-performance concurrent operations and integration with async Python frameworks like FastAPI, aiohttp, and asyncio-based applications.

## Capabilities

### Async Service Management

Asynchronous versions of all service management operations for non-blocking service lifecycle management.

```python { .api }
class DataprocMetastoreAsyncClient:
    async def list_services(
        self,
        request: Optional[ListServicesRequest] = None,
        *,
        parent: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> pagers.ListServicesAsyncPager: ...

    async def get_service(
        self,
        request: Optional[GetServiceRequest] = None,
        *,
        name: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> Service: ...

    async def create_service(
        self,
        request: Optional[CreateServiceRequest] = None,
        *,
        parent: Optional[str] = None,
        service: Optional[Service] = None,
        service_id: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> operation_async.AsyncOperation: ...

    async def update_service(
        self,
        request: Optional[UpdateServiceRequest] = None,
        *,
        service: Optional[Service] = None,
        update_mask: Optional[field_mask_pb2.FieldMask] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> operation_async.AsyncOperation: ...

    async def delete_service(
        self,
        request: Optional[DeleteServiceRequest] = None,
        *,
        name: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> operation_async.AsyncOperation: ...
```

### Async Backup Operations

Asynchronous backup and restore operations for non-blocking data protection workflows.

```python { .api }
class DataprocMetastoreAsyncClient:
    async def list_backups(
        self,
        request: Optional[ListBackupsRequest] = None,
        *,
        parent: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> pagers.ListBackupsAsyncPager: ...

    async def get_backup(
        self,
        request: Optional[GetBackupRequest] = None,
        *,
        name: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> Backup: ...

    async def create_backup(
        self,
        request: Optional[CreateBackupRequest] = None,
        *,
        parent: Optional[str] = None,
        backup: Optional[Backup] = None,
        backup_id: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> operation_async.AsyncOperation: ...

    async def delete_backup(
        self,
        request: Optional[DeleteBackupRequest] = None,
        *,
        name: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> operation_async.AsyncOperation: ...

    async def restore_service(
        self,
        request: Optional[RestoreServiceRequest] = None,
        *,
        service: Optional[str] = None,
        backup: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> operation_async.AsyncOperation: ...
```

### Async Metadata Operations

Asynchronous metadata import, export, and query operations for high-throughput data processing workflows.

```python { .api }
class DataprocMetastoreAsyncClient:
    async def list_metadata_imports(
        self,
        request: Optional[ListMetadataImportsRequest] = None,
        *,
        parent: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> pagers.ListMetadataImportsAsyncPager: ...

    async def create_metadata_import(
        self,
        request: Optional[CreateMetadataImportRequest] = None,
        *,
        parent: Optional[str] = None,
        metadata_import: Optional[MetadataImport] = None,
        metadata_import_id: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> operation_async.AsyncOperation: ...

    async def export_metadata(
        self,
        request: Optional[ExportMetadataRequest] = None,
        *,
        service: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> operation_async.AsyncOperation: ...

    async def query_metadata(
        self,
        request: Optional[QueryMetadataRequest] = None,
        *,
        service: Optional[str] = None,
        query: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> operation_async.AsyncOperation: ...
```

### Async Federation Management

Asynchronous federation operations for managing distributed metastore architectures.

```python { .api }
class DataprocMetastoreFederationAsyncClient:
    async def list_federations(
        self,
        request: Optional[ListFederationsRequest] = None,
        *,
        parent: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> pagers.ListFederationsAsyncPager: ...

    async def get_federation(
        self,
        request: Optional[GetFederationRequest] = None,
        *,
        name: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> Federation: ...

    async def create_federation(
        self,
        request: Optional[CreateFederationRequest] = None,
        *,
        parent: Optional[str] = None,
        federation: Optional[Federation] = None,
        federation_id: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> operation_async.AsyncOperation: ...
```

## Usage Patterns

### Async Service Management

```python
import asyncio
from google.cloud import metastore

async def manage_multiple_services():
    """Manage multiple metastore services concurrently."""
    async_client = metastore.DataprocMetastoreAsyncClient()
    parent = "projects/my-project/locations/us-central1"

    # Create multiple services concurrently
    service_configs = [
        {
            "service_id": "dev-metastore",
            "tier": metastore.Service.Tier.DEVELOPER,
            "description": "Development environment metastore"
        },
        {
            "service_id": "staging-metastore",
            "tier": metastore.Service.Tier.ENTERPRISE,
            "description": "Staging environment metastore"
        },
        {
            "service_id": "prod-metastore",
            "tier": metastore.Service.Tier.ENTERPRISE,
            "description": "Production environment metastore"
        }
    ]

    # Start all service creations concurrently
    create_tasks = []
    for config in service_configs:
        service = metastore.Service(
            tier=config["tier"],
            hive_metastore_config=metastore.HiveMetastoreConfig(version="3.1.0")
        )

        task = async_client.create_service(
            parent=parent,
            service_id=config["service_id"],
            service=service
        )
        create_tasks.append(task)

    # Wait for all operations to start
    operations = await asyncio.gather(*create_tasks)

    # Monitor all operations concurrently
    async def wait_for_operation(operation):
        result = await operation.result()
        return result

    # Wait for all services to be created
    services = await asyncio.gather(*[wait_for_operation(op) for op in operations])

    for service in services:
        print(f"Service created: {service.name}")
        print(f"Endpoint: {service.endpoint_uri}")

# Run the async function
asyncio.run(manage_multiple_services())
```

### Concurrent Backup Operations

```python
import asyncio
import time
from typing import List
from google.cloud import metastore

class AsyncBackupManager:
    def __init__(self):
        self.async_client = metastore.DataprocMetastoreAsyncClient()

    async def create_backups_for_all_services(self, service_names: List[str]) -> List[str]:
        """Create backups for multiple services concurrently."""
        backup_tasks = []

        for service_name in service_names:
            backup_config = metastore.Backup(
                description=f"Automated backup for {service_name}",
                labels={"type": "automated", "batch": "true"}
            )

            # Extract service ID for backup naming
            service_id = service_name.split('/')[-1]
            backup_id = f"backup-{service_id}-{int(time.time())}"

            task = self.async_client.create_backup(
                parent=service_name,
                backup_id=backup_id,
                backup=backup_config
            )
            backup_tasks.append(task)

        # Start all backup operations
        operations = await asyncio.gather(*backup_tasks)

        # Return operation names for monitoring
        return [op.name for op in operations]

    async def monitor_backup_operations(self, operation_names: List[str]):
        """Monitor multiple backup operations concurrently."""
        async def check_operation(operation_name: str):
            # In practice, you would use the operations client
            # This is a simplified example
            while True:
                # Check operation status
                await asyncio.sleep(30)  # Check every 30 seconds
                # If operation is complete, return result
                break

        # Monitor all operations concurrently
        await asyncio.gather(*[check_operation(name) for name in operation_names])
```

### FastAPI Integration

```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from google.cloud import metastore
import asyncio

app = FastAPI()

# Initialize async clients
metastore_client = metastore.DataprocMetastoreAsyncClient()
federation_client = metastore.DataprocMetastoreFederationAsyncClient()

class ServiceCreateRequest(BaseModel):
    project_id: str
    location: str
    service_id: str
    tier: str
    hive_version: str = "3.1.0"

class BackupCreateRequest(BaseModel):
    service_name: str
    backup_id: str
    description: str = ""

@app.post("/services")
async def create_service(request: ServiceCreateRequest):
    """Create a new metastore service asynchronously."""
    try:
        parent = f"projects/{request.project_id}/locations/{request.location}"

        # Map string tier to enum
        tier_map = {
            "developer": metastore.Service.Tier.DEVELOPER,
            "enterprise": metastore.Service.Tier.ENTERPRISE
        }

        service_config = metastore.Service(
            tier=tier_map.get(request.tier.lower(), metastore.Service.Tier.DEVELOPER),
            hive_metastore_config=metastore.HiveMetastoreConfig(
                version=request.hive_version
            )
        )

        operation = await metastore_client.create_service(
            parent=parent,
            service_id=request.service_id,
            service=service_config
        )

        return {
            "operation_name": operation.name,
            "status": "CREATING",
            "message": f"Service creation started for {request.service_id}"
        }

    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.post("/backups")
async def create_backup(request: BackupCreateRequest):
    """Create a backup asynchronously."""
    try:
        backup_config = metastore.Backup(
            description=request.description or f"API-created backup for {request.service_name}"
        )

        operation = await metastore_client.create_backup(
            parent=request.service_name,
            backup_id=request.backup_id,
            backup=backup_config
        )

        return {
            "operation_name": operation.name,
            "status": "CREATING",
            "message": f"Backup creation started: {request.backup_id}"
        }

    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.get("/services/{project_id}/{location}")
async def list_services(project_id: str, location: str):
    """List all services in a location asynchronously."""
    try:
        parent = f"projects/{project_id}/locations/{location}"

        services = []
        async for service in await metastore_client.list_services(parent=parent):
            services.append({
                "name": service.name,
                "state": service.state.name,
                "tier": service.tier.name,
                "endpoint_uri": service.endpoint_uri,
                "create_time": service.create_time.strftime("%Y-%m-%d %H:%M:%S") if service.create_time else None
            })

        return {"services": services}

    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.get("/health")
async def health_check():
    """Health check endpoint that verifies async client connectivity."""
    try:
        # Test connectivity by listing locations (lightweight operation)
        parent = "projects/test-project"  # This would fail but tests client initialization
        return {"status": "healthy", "client": "initialized"}
    except Exception:
        return {"status": "healthy", "note": "Client ready for authenticated requests"}
```

### Async Data Pipeline Integration

```python
import asyncio
import aiofiles
from typing import List, Dict
from google.cloud import metastore

class AsyncMetastorePipeline:
    def __init__(self, service_name: str):
        self.client = metastore.DataprocMetastoreAsyncClient()
        self.service_name = service_name

    async def process_metadata_batch(self, metadata_files: List[str]) -> List[Dict]:
        """Process multiple metadata files concurrently."""

        # Create import operations for all files
        import_tasks = []
        for i, file_uri in enumerate(metadata_files):
            import_config = metastore.MetadataImport(
                description=f"Batch import {i+1} from {file_uri}",
                database_dump=metastore.MetadataImport.DatabaseDump(
                    gcs_uri=file_uri,
                    database_type=metastore.MetadataImport.DatabaseDump.DatabaseType.MYSQL
                )
            )

            task = self.client.create_metadata_import(
                parent=self.service_name,
                metadata_import_id=f"batch-import-{i+1:03d}",
                metadata_import=import_config
            )
            import_tasks.append(task)

        # Start all imports concurrently
        operations = await asyncio.gather(*import_tasks)

        # Monitor progress
        results = []
        for operation in operations:
            try:
                result = await operation.result()
                results.append({
                    "name": result.name,
                    "state": result.state.name,
                    "success": True
                })
            except Exception as e:
                results.append({
                    "operation": operation.name,
                    "error": str(e),
                    "success": False
                })

        return results

    async def concurrent_metadata_queries(self, queries: List[str]) -> List[Dict]:
        """Execute multiple metadata queries concurrently."""
        query_tasks = [
            self.client.query_metadata(service=self.service_name, query=query)
            for query in queries
        ]

        responses = await asyncio.gather(*query_tasks, return_exceptions=True)

        results = []
        for i, response in enumerate(responses):
            if isinstance(response, Exception):
                results.append({
                    "query_index": i,
                    "error": str(response),
                    "success": False
                })
            else:
                results.append({
                    "query_index": i,
                    "row_count": response.result_metadata.row_count if hasattr(response, 'result_metadata') else 0,
                    "success": True
                })

        return results
```