Google Cloud Dataproc Metastore API client library for managing fully managed, highly available metastore services.
—
Asynchronous client implementations for all operations with full async/await support, enabling high-performance concurrent operations and integration with async Python frameworks like FastAPI, aiohttp, and asyncio-based applications.
Asynchronous versions of all service management operations for non-blocking service lifecycle management.
class DataprocMetastoreAsyncClient:
    """Async surface for metastore Service lifecycle operations.

    Every method is a coroutine. ``list_services`` resolves to an async
    pager; create/update/delete resolve to long-running AsyncOperation
    handles whose final value is obtained via ``await operation.result()``
    (see the examples later in this document).
    """

    # Page through Service resources under `parent` (a location resource path).
    async def list_services(
        self,
        request: Optional[ListServicesRequest] = None,
        *,
        parent: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> pagers.ListServicesAsyncPager: ...

    # Fetch a single Service by its fully-qualified resource `name`.
    async def get_service(
        self,
        request: Optional[GetServiceRequest] = None,
        *,
        name: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> operation_async.AsyncOperation if False else Service: ...

    # Begin creating `service` under `parent` with id `service_id`;
    # returns a long-running operation handle.
    async def create_service(
        self,
        request: Optional[CreateServiceRequest] = None,
        *,
        parent: Optional[str] = None,
        service: Optional[Service] = None,
        service_id: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> operation_async.AsyncOperation: ...

    # Update the fields of `service` selected by `update_mask`; long-running.
    async def update_service(
        self,
        request: Optional[UpdateServiceRequest] = None,
        *,
        service: Optional[Service] = None,
        update_mask: Optional[field_mask_pb2.FieldMask] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> operation_async.AsyncOperation: ...

    # Begin deleting the Service named `name`; long-running.
    async def delete_service(
        self,
        request: Optional[DeleteServiceRequest] = None,
        *,
        name: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> operation_async.AsyncOperation: ...
Asynchronous backup and restore operations for non-blocking data protection workflows.
class DataprocMetastoreAsyncClient:
    """Async surface for metastore backup and restore operations.

    ``list_backups`` resolves to an async pager; create/delete/restore
    resolve to long-running AsyncOperation handles that are completed via
    ``await operation.result()``.
    """

    # Page through Backup resources under `parent` (a service resource path).
    async def list_backups(
        self,
        request: Optional[ListBackupsRequest] = None,
        *,
        parent: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> pagers.ListBackupsAsyncPager: ...

    # Fetch a single Backup by its fully-qualified resource `name`.
    async def get_backup(
        self,
        request: Optional[GetBackupRequest] = None,
        *,
        name: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> Backup: ...

    # Begin creating `backup` under `parent` with id `backup_id`; long-running.
    async def create_backup(
        self,
        request: Optional[CreateBackupRequest] = None,
        *,
        parent: Optional[str] = None,
        backup: Optional[Backup] = None,
        backup_id: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> operation_async.AsyncOperation: ...

    # Begin deleting the Backup named `name`; long-running.
    async def delete_backup(
        self,
        request: Optional[DeleteBackupRequest] = None,
        *,
        name: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> operation_async.AsyncOperation: ...

    # Begin restoring `service` (a service resource name) from `backup`
    # (a backup resource name); long-running.
    async def restore_service(
        self,
        request: Optional[RestoreServiceRequest] = None,
        *,
        service: Optional[str] = None,
        backup: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> operation_async.AsyncOperation: ...
Asynchronous metadata import, export, and query operations for high-throughput data processing workflows.
class DataprocMetastoreAsyncClient:
    """Async surface for metadata import, export, and query operations.

    ``list_metadata_imports`` resolves to an async pager; the remaining
    methods resolve to long-running AsyncOperation handles completed via
    ``await operation.result()``.
    """

    # Page through MetadataImport resources under `parent` (a service path).
    async def list_metadata_imports(
        self,
        request: Optional[ListMetadataImportsRequest] = None,
        *,
        parent: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> pagers.ListMetadataImportsAsyncPager: ...

    # Begin creating `metadata_import` under `parent` with id
    # `metadata_import_id`; long-running.
    async def create_metadata_import(
        self,
        request: Optional[CreateMetadataImportRequest] = None,
        *,
        parent: Optional[str] = None,
        metadata_import: Optional[MetadataImport] = None,
        metadata_import_id: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> operation_async.AsyncOperation: ...

    # Begin exporting metadata from `service` (a service resource name);
    # long-running.
    async def export_metadata(
        self,
        request: Optional[ExportMetadataRequest] = None,
        *,
        service: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> operation_async.AsyncOperation: ...

    # Begin executing `query` against `service`; long-running — note the
    # result must be obtained via ``await operation.result()``, not from
    # the operation handle itself.
    async def query_metadata(
        self,
        request: Optional[QueryMetadataRequest] = None,
        *,
        service: Optional[str] = None,
        query: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> operation_async.AsyncOperation: ...
Asynchronous federation operations for managing distributed metastore architectures.
class DataprocMetastoreFederationAsyncClient:
    """Async surface for metastore federation management.

    ``list_federations`` resolves to an async pager; ``create_federation``
    resolves to a long-running AsyncOperation handle.
    """

    # Page through Federation resources under `parent` (a location path).
    async def list_federations(
        self,
        request: Optional[ListFederationsRequest] = None,
        *,
        parent: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> pagers.ListFederationsAsyncPager: ...

    # Fetch a single Federation by its fully-qualified resource `name`.
    async def get_federation(
        self,
        request: Optional[GetFederationRequest] = None,
        *,
        name: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> Federation: ...

    # Begin creating `federation` under `parent` with id `federation_id`;
    # long-running.
    async def create_federation(
        self,
        request: Optional[CreateFederationRequest] = None,
        *,
        parent: Optional[str] = None,
        federation: Optional[Federation] = None,
        federation_id: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> operation_async.AsyncOperation: ...
import asyncio
import asyncio
from google.cloud import metastore


async def manage_multiple_services():
    """Create several metastore services concurrently and report them."""
    async_client = metastore.DataprocMetastoreAsyncClient()
    parent = "projects/my-project/locations/us-central1"
    # One entry per environment; `description` is illustrative metadata and
    # is not passed to the API below.
    service_configs = [
        {
            "service_id": "dev-metastore",
            "tier": metastore.Service.Tier.DEVELOPER,
            "description": "Development environment metastore",
        },
        {
            "service_id": "staging-metastore",
            "tier": metastore.Service.Tier.ENTERPRISE,
            "description": "Staging environment metastore",
        },
        {
            "service_id": "prod-metastore",
            "tier": metastore.Service.Tier.ENTERPRISE,
            "description": "Production environment metastore",
        },
    ]
    # Kick off every create_service call at once; each coroutine resolves
    # to a long-running operation handle.
    create_tasks = [
        async_client.create_service(
            parent=parent,
            service_id=cfg["service_id"],
            service=metastore.Service(
                tier=cfg["tier"],
                hive_metastore_config=metastore.HiveMetastoreConfig(version="3.1.0"),
            ),
        )
        for cfg in service_configs
    ]
    operations = await asyncio.gather(*create_tasks)
    # Await every operation's terminal result concurrently.
    services = await asyncio.gather(*(op.result() for op in operations))
    for svc in services:
        print(f"Service created: {svc.name}")
        print(f"Endpoint: {svc.endpoint_uri}")


# Run the async function
asyncio.run(manage_multiple_services())
import asyncio
import time  # required: backup IDs below are suffixed with a Unix timestamp
from typing import List
from google.cloud import metastore


class AsyncBackupManager:
    """Creates and monitors metastore backups concurrently."""

    def __init__(self):
        self.async_client = metastore.DataprocMetastoreAsyncClient()

    async def create_backups_for_all_services(self, service_names: List[str]) -> List[str]:
        """Create backups for multiple services concurrently.

        Args:
            service_names: Fully-qualified metastore service resource names.

        Returns:
            The names of the started backup operations, for later monitoring.
        """
        backup_tasks = []
        for service_name in service_names:
            backup_config = metastore.Backup(
                description=f"Automated backup for {service_name}",
                labels={"type": "automated", "batch": "true"}
            )
            # Extract service ID for backup naming; the timestamp suffix
            # keeps repeated runs from colliding on backup_id.
            service_id = service_name.split('/')[-1]
            backup_id = f"backup-{service_id}-{int(time.time())}"
            task = self.async_client.create_backup(
                parent=service_name,
                backup_id=backup_id,
                backup=backup_config
            )
            backup_tasks.append(task)
        # Start all backup operations concurrently.
        operations = await asyncio.gather(*backup_tasks)
        # Return operation names for monitoring.
        return [op.name for op in operations]

    async def monitor_backup_operations(self, operation_names: List[str]):
        """Monitor multiple backup operations concurrently.

        NOTE: this is a simplified polling skeleton — in practice you would
        use the operations client to check real status instead of breaking
        after one sleep.
        """
        async def check_operation(operation_name: str):
            while True:
                # Check operation status
                await asyncio.sleep(30)  # Check every 30 seconds
                # If operation is complete, return result
                break
        # Monitor all operations concurrently.
        await asyncio.gather(*[check_operation(name) for name in operation_names])
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from google.cloud import metastore
import asyncio

app = FastAPI()
# Initialize async clients
# NOTE(review): clients are created at import time and shared by all
# request handlers — confirm they are bound to the serving event loop.
metastore_client = metastore.DataprocMetastoreAsyncClient()
federation_client = metastore.DataprocMetastoreFederationAsyncClient()


class ServiceCreateRequest(BaseModel):
    """Request body for POST /services."""
    project_id: str
    location: str
    service_id: str
    tier: str  # free-form; mapped to the Tier enum by the endpoint
    hive_version: str = "3.1.0"


class BackupCreateRequest(BaseModel):
    """Request body for POST /backups."""
    service_name: str  # fully-qualified service resource name
    backup_id: str
    description: str = ""
@app.post("/services")
async def create_service(request: ServiceCreateRequest):
    """Start asynchronous creation of a metastore service.

    Returns the long-running operation name so callers can poll for
    completion; any client error surfaces as HTTP 400.
    """
    try:
        location_path = f"projects/{request.project_id}/locations/{request.location}"
        # Translate the free-form tier string into the client enum,
        # defaulting to DEVELOPER for unrecognized values.
        tier_lookup = {
            "developer": metastore.Service.Tier.DEVELOPER,
            "enterprise": metastore.Service.Tier.ENTERPRISE,
        }
        chosen_tier = tier_lookup.get(request.tier.lower(), metastore.Service.Tier.DEVELOPER)
        hive_cfg = metastore.HiveMetastoreConfig(version=request.hive_version)
        operation = await metastore_client.create_service(
            parent=location_path,
            service_id=request.service_id,
            service=metastore.Service(tier=chosen_tier, hive_metastore_config=hive_cfg),
        )
        return {
            "operation_name": operation.name,
            "status": "CREATING",
            "message": f"Service creation started for {request.service_id}",
        }
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
@app.post("/backups")
async def create_backup(request: BackupCreateRequest):
    """Start an asynchronous backup and report the operation name."""
    try:
        # Fall back to a generated description when none was supplied.
        description = request.description or f"API-created backup for {request.service_name}"
        operation = await metastore_client.create_backup(
            parent=request.service_name,
            backup_id=request.backup_id,
            backup=metastore.Backup(description=description),
        )
        return {
            "operation_name": operation.name,
            "status": "CREATING",
            "message": f"Backup creation started: {request.backup_id}",
        }
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
@app.get("/services/{project_id}/{location}")
async def list_services(project_id: str, location: str):
    """List all metastore services in a location asynchronously."""
    try:
        parent = f"projects/{project_id}/locations/{location}"
        # Awaiting list_services yields an async pager we can iterate.
        pager = await metastore_client.list_services(parent=parent)
        summaries = []
        async for svc in pager:
            created = svc.create_time.strftime("%Y-%m-%d %H:%M:%S") if svc.create_time else None
            summaries.append({
                "name": svc.name,
                "state": svc.state.name,
                "tier": svc.tier.name,
                "endpoint_uri": svc.endpoint_uri,
                "create_time": created,
            })
        return {"services": summaries}
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
@app.get("/health")
async def health_check():
    """Health check endpoint that verifies async client connectivity."""
    try:
        # Test connectivity by listing locations (lightweight operation)
        # NOTE(review): no RPC is actually issued here — `parent` is built
        # but unused, so the except branch is effectively unreachable.
        # Confirm whether a real lightweight call was intended.
        parent = "projects/test-project" # This would fail but tests client initialization
        return {"status": "healthy", "client": "initialized"}
    except Exception:
        return {"status": "healthy", "note": "Client ready for authenticated requests"}
import asyncio
import aiofiles
from typing import List, Dict
from google.cloud import metastore
class AsyncMetastorePipeline:
def __init__(self, service_name: str):
self.client = metastore.DataprocMetastoreAsyncClient()
self.service_name = service_name
async def process_metadata_batch(self, metadata_files: List[str]) -> List[Dict]:
"""Process multiple metadata files concurrently."""
# Create import operations for all files
import_tasks = []
for i, file_uri in enumerate(metadata_files):
import_config = metastore.MetadataImport(
description=f"Batch import {i+1} from {file_uri}",
database_dump=metastore.MetadataImport.DatabaseDump(
gcs_uri=file_uri,
database_type=metastore.MetadataImport.DatabaseDump.DatabaseType.MYSQL
)
)
task = self.client.create_metadata_import(
parent=self.service_name,
metadata_import_id=f"batch-import-{i+1:03d}",
metadata_import=import_config
)
import_tasks.append(task)
# Start all imports concurrently
operations = await asyncio.gather(*import_tasks)
# Monitor progress
results = []
for operation in operations:
try:
result = await operation.result()
results.append({
"name": result.name,
"state": result.state.name,
"success": True
})
except Exception as e:
results.append({
"operation": operation.name,
"error": str(e),
"success": False
})
return results
async def concurrent_metadata_queries(self, queries: List[str]) -> List[Dict]:
"""Execute multiple metadata queries concurrently."""
query_tasks = [
self.client.query_metadata(service=self.service_name, query=query)
for query in queries
]
responses = await asyncio.gather(*query_tasks, return_exceptions=True)
results = []
for i, response in enumerate(responses):
if isinstance(response, Exception):
results.append({
"query_index": i,
"error": str(response),
"success": False
})
else:
results.append({
"query_index": i,
"row_count": response.result_metadata.row_count if hasattr(response, 'result_metadata') else 0,
"success": True
})
return resultsInstall with Tessl CLI
npx tessl i tessl/pypi-google-cloud-dataproc-metastore