or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-operations.md backup-restore.md federation-management.md index.md metadata-import-export.md metadata-query.md service-management.md

async-operations.mddocs/

0

# Asynchronous Operations

1

2

Asynchronous client implementations for all operations with full async/await support, enabling high-performance concurrent operations and integration with async Python frameworks like FastAPI, aiohttp, and asyncio-based applications.

3

4

## Capabilities

5

6

### Async Service Management

7

8

Asynchronous versions of all service management operations for non-blocking service lifecycle management.

9

10

```python { .api }

11

class DataprocMetastoreAsyncClient:
    """Service lifecycle coroutines of the async metastore client (signatures only)."""

    async def list_services(
        self,
        request: Optional[ListServicesRequest] = None,
        *,
        parent: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> pagers.ListServicesAsyncPager:
        """Coroutine resolving to a ``ListServicesAsyncPager`` for ``parent``."""

    async def get_service(
        self,
        request: Optional[GetServiceRequest] = None,
        *,
        name: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> Service:
        """Coroutine resolving to the ``Service`` identified by ``name``."""

    async def create_service(
        self,
        request: Optional[CreateServiceRequest] = None,
        *,
        parent: Optional[str] = None,
        service: Optional[Service] = None,
        service_id: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> operation_async.AsyncOperation:
        """Coroutine resolving to an ``AsyncOperation`` for the create request."""

    async def update_service(
        self,
        request: Optional[UpdateServiceRequest] = None,
        *,
        service: Optional[Service] = None,
        update_mask: Optional[field_mask_pb2.FieldMask] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> operation_async.AsyncOperation:
        """Coroutine resolving to an ``AsyncOperation`` for the update request."""

    async def delete_service(
        self,
        request: Optional[DeleteServiceRequest] = None,
        *,
        name: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> operation_async.AsyncOperation:
        """Coroutine resolving to an ``AsyncOperation`` for the delete request."""

64

```

65

66

### Async Backup Operations

67

68

Asynchronous backup and restore operations for non-blocking data protection workflows.

69

70

```python { .api }

71

class DataprocMetastoreAsyncClient:
    """Backup and restore coroutines of the async metastore client (signatures only)."""

    async def list_backups(
        self,
        request: Optional[ListBackupsRequest] = None,
        *,
        parent: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> pagers.ListBackupsAsyncPager:
        """Coroutine resolving to a ``ListBackupsAsyncPager`` for ``parent``."""

    async def get_backup(
        self,
        request: Optional[GetBackupRequest] = None,
        *,
        name: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> Backup:
        """Coroutine resolving to the ``Backup`` identified by ``name``."""

    async def create_backup(
        self,
        request: Optional[CreateBackupRequest] = None,
        *,
        parent: Optional[str] = None,
        backup: Optional[Backup] = None,
        backup_id: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> operation_async.AsyncOperation:
        """Coroutine resolving to an ``AsyncOperation`` for the create request."""

    async def delete_backup(
        self,
        request: Optional[DeleteBackupRequest] = None,
        *,
        name: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> operation_async.AsyncOperation:
        """Coroutine resolving to an ``AsyncOperation`` for the delete request."""

    async def restore_service(
        self,
        request: Optional[RestoreServiceRequest] = None,
        *,
        service: Optional[str] = None,
        backup: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> operation_async.AsyncOperation:
        """Coroutine resolving to an ``AsyncOperation`` for the restore request.

        Both ``service`` and ``backup`` are passed as strings here, not as
        message objects.
        """

124

```

125

126

### Async Metadata Operations

127

128

Asynchronous metadata import, export, and query operations for high-throughput data processing workflows.

129

130

```python { .api }

131

class DataprocMetastoreAsyncClient:
    """Metadata import, export and query coroutines of the async client (signatures only)."""

    async def list_metadata_imports(
        self,
        request: Optional[ListMetadataImportsRequest] = None,
        *,
        parent: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> pagers.ListMetadataImportsAsyncPager:
        """Coroutine resolving to a ``ListMetadataImportsAsyncPager`` for ``parent``."""

    async def create_metadata_import(
        self,
        request: Optional[CreateMetadataImportRequest] = None,
        *,
        parent: Optional[str] = None,
        metadata_import: Optional[MetadataImport] = None,
        metadata_import_id: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> operation_async.AsyncOperation:
        """Coroutine resolving to an ``AsyncOperation`` for the import request."""

    async def export_metadata(
        self,
        request: Optional[ExportMetadataRequest] = None,
        *,
        service: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> operation_async.AsyncOperation:
        """Coroutine resolving to an ``AsyncOperation`` for the export request."""

    async def query_metadata(
        self,
        request: Optional[QueryMetadataRequest] = None,
        *,
        service: Optional[str] = None,
        query: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> operation_async.AsyncOperation:
        """Coroutine resolving to an ``AsyncOperation`` for the query request.

        Note the return type: the query outcome is obtained from the
        operation, not from this call directly.
        """

174

```

175

176

### Async Federation Management

177

178

Asynchronous federation operations for managing distributed metastore architectures.

179

180

```python { .api }

181

class DataprocMetastoreFederationAsyncClient:
    """Federation coroutines of the async federation client (signatures only)."""

    async def list_federations(
        self,
        request: Optional[ListFederationsRequest] = None,
        *,
        parent: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> pagers.ListFederationsAsyncPager:
        """Coroutine resolving to a ``ListFederationsAsyncPager`` for ``parent``."""

    async def get_federation(
        self,
        request: Optional[GetFederationRequest] = None,
        *,
        name: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> Federation:
        """Coroutine resolving to the ``Federation`` identified by ``name``."""

    async def create_federation(
        self,
        request: Optional[CreateFederationRequest] = None,
        *,
        parent: Optional[str] = None,
        federation: Optional[Federation] = None,
        federation_id: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> operation_async.AsyncOperation:
        """Coroutine resolving to an ``AsyncOperation`` for the create request."""

213

```

214

215

## Usage Patterns

216

217

### Async Service Management

218

219

```python

220

import asyncio

221

from google.cloud import metastore

222

223

async def manage_multiple_services():
    """Provision the dev, staging and prod metastores side by side.

    Issues all three create requests together, waits for every resulting
    long-running operation, then prints each service's name and endpoint.
    """
    client = metastore.DataprocMetastoreAsyncClient()
    location_path = "projects/my-project/locations/us-central1"

    # Declarative description of every service to provision.
    service_configs = [
        {
            "service_id": "dev-metastore",
            "tier": metastore.Service.Tier.DEVELOPER,
            "description": "Development environment metastore"
        },
        {
            "service_id": "staging-metastore",
            "tier": metastore.Service.Tier.ENTERPRISE,
            "description": "Staging environment metastore"
        },
        {
            "service_id": "prod-metastore",
            "tier": metastore.Service.Tier.ENTERPRISE,
            "description": "Production environment metastore"
        }
    ]

    # One create_service coroutine per entry; they only start running once
    # gather() awaits them, which is what makes the requests concurrent.
    pending_creates = [
        client.create_service(
            parent=location_path,
            service_id=cfg["service_id"],
            service=metastore.Service(
                tier=cfg["tier"],
                hive_metastore_config=metastore.HiveMetastoreConfig(version="3.1.0"),
            ),
        )
        for cfg in service_configs
    ]
    operations = await asyncio.gather(*pending_creates)

    # Each operation's result() is itself awaitable, so the completions can
    # be awaited together as well.
    created = await asyncio.gather(*(op.result() for op in operations))

    for svc in created:
        print(f"Service created: {svc.name}")
        print(f"Endpoint: {svc.endpoint_uri}")

276

277

# Run the async function

278

asyncio.run(manage_multiple_services())

279

```

280

281

### Concurrent Backup Operations

282

283

```python

284

import asyncio
import time
from typing import List

from google.cloud import metastore

287

288

class AsyncBackupManager:
    """Create and monitor backups for several metastore services at once."""

    def __init__(self):
        self.async_client = metastore.DataprocMetastoreAsyncClient()

    async def create_backups_for_all_services(self, service_names: List[str]) -> List[str]:
        """Start one backup per service and return the operation names.

        Args:
            service_names: Full resource names of the services to back up.
                The last path segment is used in the generated backup ID.

        Returns:
            The long-running operation names, in the order of
            ``service_names``. The backups are not necessarily finished when
            this returns.
        """
        # One timestamp for the whole batch so its backups share a suffix.
        batch_stamp = int(time.time())

        backup_tasks = []
        for service_name in service_names:
            backup_config = metastore.Backup(
                description=f"Automated backup for {service_name}",
                labels={"type": "automated", "batch": "true"},
            )

            # Extract service ID for backup naming
            service_id = service_name.split('/')[-1]
            backup_id = f"backup-{service_id}-{batch_stamp}"

            backup_tasks.append(
                self.async_client.create_backup(
                    parent=service_name,
                    backup_id=backup_id,
                    backup=backup_config,
                )
            )

        # Start all backup operations
        operations = await asyncio.gather(*backup_tasks)

        # AsyncOperation exposes the underlying operation message through
        # `.operation`; the name is a field of that message, not of the future.
        return [op.operation.name for op in operations]

    async def monitor_backup_operations(
        self, operation_names: List[str], poll_interval: float = 30.0
    ):
        """Monitor multiple backup operations concurrently.

        This is a simplified placeholder: each operation is "checked" by
        sleeping once for ``poll_interval`` seconds. A real implementation
        would query the operations client inside the loop.

        Args:
            operation_names: Names returned by
                ``create_backups_for_all_services``.
            poll_interval: Seconds to wait between status checks.
        """
        async def check_operation(operation_name: str):
            while True:
                # Check operation status (placeholder: just wait one interval)
                await asyncio.sleep(poll_interval)
                # If operation is complete, return result
                break

        # Monitor all operations concurrently
        await asyncio.gather(*[check_operation(name) for name in operation_names])

332

```

333

334

### FastAPI Integration

335

336

```python

337

from fastapi import FastAPI, HTTPException

338

from pydantic import BaseModel

339

from google.cloud import metastore

340

import asyncio

341

342

app = FastAPI()

343

344

# Initialize async clients

345

metastore_client = metastore.DataprocMetastoreAsyncClient()

346

federation_client = metastore.DataprocMetastoreFederationAsyncClient()

347

348

# Request body for POST /services.
class ServiceCreateRequest(BaseModel):
    project_id: str  # interpolated into "projects/{project_id}/locations/{location}"
    location: str
    service_id: str  # passed to create_service as service_id
    tier: str  # lower-cased by the handler before the tier lookup
    hive_version: str = "3.1.0"  # forwarded as HiveMetastoreConfig.version

354

355

# Request body for POST /backups.
class BackupCreateRequest(BaseModel):
    service_name: str  # used as the `parent` of the create_backup call
    backup_id: str
    description: str = ""  # when empty, the handler substitutes a generated description

359

360

@app.post("/services")
async def create_service(request: ServiceCreateRequest):
    """Start creating a metastore service and return the operation handle.

    The response is sent as soon as the long-running operation has been
    accepted; it does not wait for the service to finish provisioning.

    Raises:
        HTTPException: 400 when ``tier`` is not a supported value, or when
            building or submitting the create request fails.
    """
    # Map string tier to enum
    tier_map = {
        "developer": metastore.Service.Tier.DEVELOPER,
        "enterprise": metastore.Service.Tier.ENTERPRISE,
    }
    tier = tier_map.get(request.tier.lower())
    if tier is None:
        # Reject unknown tiers instead of silently provisioning DEVELOPER.
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported tier {request.tier!r}; expected one of {sorted(tier_map)}",
        )

    parent = f"projects/{request.project_id}/locations/{request.location}"

    try:
        service_config = metastore.Service(
            tier=tier,
            hive_metastore_config=metastore.HiveMetastoreConfig(
                version=request.hive_version
            ),
        )
        operation = await metastore_client.create_service(
            parent=parent,
            service_id=request.service_id,
            service=service_config,
        )
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    return {
        # The name is a field of the wrapped operation message; the
        # AsyncOperation future itself has no `name` attribute.
        "operation_name": operation.operation.name,
        "status": "CREATING",
        "message": f"Service creation started for {request.service_id}"
    }

393

394

@app.post("/backups")
async def create_backup(request: BackupCreateRequest):
    """Start creating a backup and return the operation handle.

    The response is sent as soon as the long-running operation has been
    accepted; it does not wait for the backup to complete.

    Raises:
        HTTPException: 400 when building or submitting the request fails.
    """
    try:
        backup_config = metastore.Backup(
            # Fall back to a generated description when none was supplied.
            description=request.description or f"API-created backup for {request.service_name}"
        )
        operation = await metastore_client.create_backup(
            parent=request.service_name,
            backup_id=request.backup_id,
            backup=backup_config,
        )
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    return {
        # The name is a field of the wrapped operation message; the
        # AsyncOperation future itself has no `name` attribute.
        "operation_name": operation.operation.name,
        "status": "CREATING",
        "message": f"Backup creation started: {request.backup_id}"
    }

416

417

@app.get("/services/{project_id}/{location}")
async def list_services(project_id: str, location: str):
    """Return a summary of every metastore service in one location.

    Any failure while listing is reported to the caller as a 400.
    """
    try:
        # Awaiting the call yields the pager; iterating the pager is what
        # walks the services.
        pager = await metastore_client.list_services(
            parent=f"projects/{project_id}/locations/{location}"
        )
        summaries = [
            {
                "name": svc.name,
                "state": svc.state.name,
                "tier": svc.tier.name,
                "endpoint_uri": svc.endpoint_uri,
                "create_time": svc.create_time.strftime("%Y-%m-%d %H:%M:%S") if svc.create_time else None
            }
            async for svc in pager
        ]
        return {"services": summaries}
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

437

438

@app.get("/health")
async def health_check():
    """Liveness probe.

    Makes no metastore call, so it reports only that the application is up
    and able to serve requests; it does not verify connectivity or
    credentials.
    """
    return {"status": "healthy", "client": "initialized"}

447

```

448

449

### Async Data Pipeline Integration

450

451

```python

452

import asyncio

453

import aiofiles

454

from typing import List, Dict

455

from google.cloud import metastore

456

457

class AsyncMetastorePipeline:
    """Run metadata imports and queries against one metastore service."""

    def __init__(self, service_name: str):
        self.client = metastore.DataprocMetastoreAsyncClient()
        self.service_name = service_name

    async def process_metadata_batch(self, metadata_files: List[str]) -> List[Dict]:
        """Import several metadata dumps concurrently.

        Args:
            metadata_files: URIs of the database dumps to import, each
                passed as ``gcs_uri``.

        Returns:
            One dict per file, in input order. Successful imports carry
            ``name``, ``state`` and ``success=True``; failed ones carry
            ``operation``, ``error`` and ``success=False``.

        Raises:
            Exception: whatever ``create_metadata_import`` raises if an
                import cannot even be started.
        """
        # Create import operations for all files
        import_tasks = []
        for position, file_uri in enumerate(metadata_files, start=1):
            import_config = metastore.MetadataImport(
                description=f"Batch import {position} from {file_uri}",
                database_dump=metastore.MetadataImport.DatabaseDump(
                    gcs_uri=file_uri,
                    database_type=metastore.MetadataImport.DatabaseDump.DatabaseType.MYSQL,
                ),
            )
            import_tasks.append(
                self.client.create_metadata_import(
                    parent=self.service_name,
                    metadata_import_id=f"batch-import-{position:03d}",
                    metadata_import=import_config,
                )
            )

        # Start all imports concurrently
        operations = await asyncio.gather(*import_tasks)

        async def collect(operation) -> Dict:
            # Turn one operation's outcome into a result dict; never raises
            # for ordinary failures so one bad import cannot hide the others.
            try:
                result = await operation.result()
                return {
                    "name": result.name,
                    "state": result.state.name,
                    "success": True,
                }
            except Exception as e:
                return {
                    # The name is a field of the wrapped operation message.
                    "operation": operation.operation.name,
                    "error": str(e),
                    "success": False,
                }

        # Wait on every operation at once instead of one after another.
        return list(await asyncio.gather(*(collect(op) for op in operations)))

    async def concurrent_metadata_queries(self, queries: List[str]) -> List[Dict]:
        """Execute multiple metadata queries concurrently.

        ``query_metadata`` resolves to a long-running operation, so each
        query is awaited twice: once to start it and once for its response.

        Returns:
            One dict per query, in input order. Successful queries carry
            ``query_index``, ``result_manifest_uri`` and ``success=True``;
            failed ones carry ``query_index``, ``error`` and
            ``success=False``.
        """
        async def run_query(index: int, query: str) -> Dict:
            try:
                operation = await self.client.query_metadata(
                    service=self.service_name, query=query
                )
                response = await operation.result()
            except Exception as e:
                return {"query_index": index, "error": str(e), "success": False}
            return {
                "query_index": index,
                # The response points at a manifest of the result files
                # rather than carrying rows or a row count inline.
                "result_manifest_uri": response.result_manifest_uri,
                "success": True,
            }

        return list(
            await asyncio.gather(
                *(run_query(index, query) for index, query in enumerate(queries))
            )
        )

530

```