CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-google-cloud-dataproc-metastore

Google Cloud Dataproc Metastore API client library for managing fully managed, highly available metastore services

Pending
Overview
Eval results
Files

async-operations.mddocs/

Asynchronous Operations

Asynchronous client implementations for all operations with full async/await support, enabling high-performance concurrent operations and integration with async Python frameworks like FastAPI, aiohttp, and asyncio-based applications.

Capabilities

Async Service Management

Asynchronous versions of all service management operations for non-blocking service lifecycle management.

class DataprocMetastoreAsyncClient:
    # Service-lifecycle surface of the async client. Every method accepts a
    # request object plus flattened keyword fields (GAPIC convention is that
    # the two are mutually exclusive — verify against the library docs), and
    # the standard retry/timeout/metadata controls.

    # List services; returns an async pager suitable for `async for`.
    # `parent` is the location resource name, e.g.
    # "projects/{project}/locations/{location}" — TODO confirm format.
    async def list_services(
        self,
        request: Optional[ListServicesRequest] = None,
        *,
        parent: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> pagers.ListServicesAsyncPager: ...

    # Fetch one service by fully-qualified resource `name`.
    async def get_service(
        self,
        request: Optional[GetServiceRequest] = None,
        *,
        name: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> Service: ...

    # Create a service under `parent` with the given `service_id`.
    # Returns a long-running operation; await its result() for the Service.
    async def create_service(
        self,
        request: Optional[CreateServiceRequest] = None,
        *,
        parent: Optional[str] = None,
        service: Optional[Service] = None,
        service_id: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> operation_async.AsyncOperation: ...

    # Update the fields of `service` selected by `update_mask`; long-running.
    async def update_service(
        self,
        request: Optional[UpdateServiceRequest] = None,
        *,
        service: Optional[Service] = None,
        update_mask: Optional[field_mask_pb2.FieldMask] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> operation_async.AsyncOperation: ...

    # Delete the service identified by `name`; long-running.
    async def delete_service(
        self,
        request: Optional[DeleteServiceRequest] = None,
        *,
        name: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> operation_async.AsyncOperation: ...

Async Backup Operations

Asynchronous backup and restore operations for non-blocking data protection workflows.

class DataprocMetastoreAsyncClient:
    # Backup/restore surface of the async client. `parent` for backup calls
    # is the owning SERVICE resource name (not a location) — note that
    # create_backup/list_backups below take the service as parent.

    # List backups of the service `parent`; async pager result.
    async def list_backups(
        self,
        request: Optional[ListBackupsRequest] = None,
        *,
        parent: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> pagers.ListBackupsAsyncPager: ...

    # Fetch one backup by fully-qualified resource `name`.
    async def get_backup(
        self,
        request: Optional[GetBackupRequest] = None,
        *,
        name: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> Backup: ...

    # Start a backup of service `parent` identified by `backup_id`;
    # long-running operation.
    async def create_backup(
        self,
        request: Optional[CreateBackupRequest] = None,
        *,
        parent: Optional[str] = None,
        backup: Optional[Backup] = None,
        backup_id: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> operation_async.AsyncOperation: ...

    # Delete the backup identified by `name`; long-running operation.
    async def delete_backup(
        self,
        request: Optional[DeleteBackupRequest] = None,
        *,
        name: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> operation_async.AsyncOperation: ...

    # Restore `service` from `backup`; both are resource-name strings here,
    # unlike create_backup which takes a Backup message. Long-running.
    async def restore_service(
        self,
        request: Optional[RestoreServiceRequest] = None,
        *,
        service: Optional[str] = None,
        backup: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> operation_async.AsyncOperation: ...

Async Metadata Operations

Asynchronous metadata import, export, and query operations for high-throughput data processing workflows.

class DataprocMetastoreAsyncClient:
    # Metadata import/export/query surface of the async client.
    # Note the parameter name collision: the trailing `metadata` kwarg is
    # gRPC call metadata (headers), not metastore metadata.

    # List metadata imports of the service `parent`; async pager result.
    async def list_metadata_imports(
        self,
        request: Optional[ListMetadataImportsRequest] = None,
        *,
        parent: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> pagers.ListMetadataImportsAsyncPager: ...

    # Start importing metadata into service `parent`; long-running.
    async def create_metadata_import(
        self,
        request: Optional[CreateMetadataImportRequest] = None,
        *,
        parent: Optional[str] = None,
        metadata_import: Optional[MetadataImport] = None,
        metadata_import_id: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> operation_async.AsyncOperation: ...

    # Export the metadata of `service`; long-running operation.
    async def export_metadata(
        self,
        request: Optional[ExportMetadataRequest] = None,
        *,
        service: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> operation_async.AsyncOperation: ...

    # Run `query` against the metadata of `service`. Returns a long-running
    # operation — the query response is only available via its result().
    async def query_metadata(
        self,
        request: Optional[QueryMetadataRequest] = None,
        *,
        service: Optional[str] = None,
        query: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> operation_async.AsyncOperation: ...

Async Federation Management

Asynchronous federation operations for managing distributed metastore architectures.

class DataprocMetastoreFederationAsyncClient:
    # Federation surface — a separate async client from the service client.
    # Same GAPIC calling shape: request object or flattened fields, plus
    # retry/timeout/metadata controls.

    # List federations under the location `parent`; async pager result.
    async def list_federations(
        self,
        request: Optional[ListFederationsRequest] = None,
        *,
        parent: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> pagers.ListFederationsAsyncPager: ...

    # Fetch one federation by fully-qualified resource `name`.
    async def get_federation(
        self,
        request: Optional[GetFederationRequest] = None,
        *,
        name: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> Federation: ...

    # Create a federation under `parent` with the given `federation_id`;
    # long-running operation.
    async def create_federation(
        self,
        request: Optional[CreateFederationRequest] = None,
        *,
        parent: Optional[str] = None,
        federation: Optional[Federation] = None,
        federation_id: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = ()
    ) -> operation_async.AsyncOperation: ...

Usage Patterns

Async Service Management

import asyncio
from google.cloud import metastore

async def manage_multiple_services():
    """Manage multiple metastore services concurrently."""
    async_client = metastore.DataprocMetastoreAsyncClient()
    parent = "projects/my-project/locations/us-central1"

    # One entry per environment; every service below is created in parallel.
    service_configs = [
        {
            "service_id": "dev-metastore",
            "tier": metastore.Service.Tier.DEVELOPER,
            "description": "Development environment metastore"
        },
        {
            "service_id": "staging-metastore",
            "tier": metastore.Service.Tier.ENTERPRISE,
            "description": "Staging environment metastore"
        },
        {
            "service_id": "prod-metastore",
            "tier": metastore.Service.Tier.ENTERPRISE,
            "description": "Production environment metastore"
        }
    ]

    def build_service(cfg):
        # NOTE: only the tier is applied; cfg["description"] is informational.
        return metastore.Service(
            tier=cfg["tier"],
            hive_metastore_config=metastore.HiveMetastoreConfig(version="3.1.0")
        )

    # Kick off all creations at once; gather yields the started
    # long-running operations.
    operations = await asyncio.gather(*(
        async_client.create_service(
            parent=parent,
            service_id=cfg["service_id"],
            service=build_service(cfg)
        )
        for cfg in service_configs
    ))

    # Then await every operation's completion concurrently.
    created = await asyncio.gather(*(op.result() for op in operations))

    for service in created:
        print(f"Service created: {service.name}")
        print(f"Endpoint: {service.endpoint_uri}")

# Run the async function
asyncio.run(manage_multiple_services())

Concurrent Backup Operations

import asyncio
from typing import List
from google.cloud import metastore

class AsyncBackupManager:
    """Create and monitor metastore backups concurrently with the async client."""

    def __init__(self):
        self.async_client = metastore.DataprocMetastoreAsyncClient()

    async def create_backups_for_all_services(self, service_names: List[str]) -> List[str]:
        """Create backups for multiple services concurrently.

        Args:
            service_names: Fully-qualified service resource names
                ("projects/{p}/locations/{l}/services/{s}" — confirm format).

        Returns:
            Names of the started long-running backup operations.
        """
        # BUG FIX: `time` was used below but never imported in this example.
        import time

        backup_tasks = []
        for service_name in service_names:
            backup_config = metastore.Backup(
                description=f"Automated backup for {service_name}",
                labels={"type": "automated", "batch": "true"}
            )

            # Derive a unique, human-readable backup ID from the service ID
            # plus the current Unix timestamp.
            service_id = service_name.split('/')[-1]
            backup_id = f"backup-{service_id}-{int(time.time())}"

            task = self.async_client.create_backup(
                parent=service_name,
                backup_id=backup_id,
                backup=backup_config
            )
            backup_tasks.append(task)

        # Start all backup operations concurrently.
        operations = await asyncio.gather(*backup_tasks)

        # Return operation names so callers can poll them later.
        return [op.name for op in operations]

    async def monitor_backup_operations(self, operation_names: List[str]):
        """Monitor multiple backup operations concurrently.

        NOTE(review): simplified placeholder — each checker sleeps once and
        exits. In practice, poll via the operations client until each
        operation reports done.
        """
        async def check_operation(operation_name: str):
            while True:
                await asyncio.sleep(30)  # poll interval
                break  # placeholder: replace with a real completion check

        # Monitor all operations concurrently.
        await asyncio.gather(*[check_operation(name) for name in operation_names])

FastAPI Integration

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from google.cloud import metastore
import asyncio

app = FastAPI()

# Initialize async clients
# Module-level singletons shared by all request handlers — presumably safe
# to reuse within one event loop; verify against the client library docs.
metastore_client = metastore.DataprocMetastoreAsyncClient()
federation_client = metastore.DataprocMetastoreFederationAsyncClient()

class ServiceCreateRequest(BaseModel):
    """Request body for POST /services."""
    project_id: str       # GCP project that will own the service
    location: str         # region, e.g. "us-central1"
    service_id: str       # short ID for the new service
    tier: str             # "developer" or "enterprise" (case-insensitive)
    hive_version: str = "3.1.0"  # Hive metastore version to provision

class BackupCreateRequest(BaseModel):
    """Request body for POST /backups."""
    service_name: str     # fully-qualified service resource name
    backup_id: str        # short ID for the new backup
    description: str = ""  # optional; a default is generated when empty

@app.post("/services")
async def create_service(request: ServiceCreateRequest):
    """Create a new metastore service asynchronously.

    Returns immediately with the long-running operation name; the service
    is not ready until that operation completes.
    """
    try:
        parent = f"projects/{request.project_id}/locations/{request.location}"
        
        # Map string tier to enum
        # Unknown tier strings silently fall back to DEVELOPER (see .get below).
        tier_map = {
            "developer": metastore.Service.Tier.DEVELOPER,
            "enterprise": metastore.Service.Tier.ENTERPRISE
        }
        
        service_config = metastore.Service(
            tier=tier_map.get(request.tier.lower(), metastore.Service.Tier.DEVELOPER),
            hive_metastore_config=metastore.HiveMetastoreConfig(
                version=request.hive_version
            )
        )
        
        # Awaiting the call only starts the operation; it does not wait for
        # the service to finish provisioning.
        operation = await metastore_client.create_service(
            parent=parent,
            service_id=request.service_id,
            service=service_config
        )
        
        return {
            "operation_name": operation.name,
            "status": "CREATING",
            "message": f"Service creation started for {request.service_id}"
        }
        
    except Exception as e:
        # All failures are surfaced as HTTP 400 with the raw message —
        # deliberately simple for an example; real code should distinguish
        # client vs server errors.
        raise HTTPException(status_code=400, detail=str(e))

@app.post("/backups")
async def create_backup(request: BackupCreateRequest):
    """Create a backup asynchronously."""
    try:
        # Fall back to a generated description when the caller supplies none.
        description = request.description or f"API-created backup for {request.service_name}"

        # Starting the long-running backup; does not wait for completion.
        operation = await metastore_client.create_backup(
            parent=request.service_name,
            backup_id=request.backup_id,
            backup=metastore.Backup(description=description)
        )

        return {
            "operation_name": operation.name,
            "status": "CREATING",
            "message": f"Backup creation started: {request.backup_id}"
        }

    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.get("/services/{project_id}/{location}")
async def list_services(project_id: str, location: str):
    """List all services in a location asynchronously."""
    try:
        parent = f"projects/{project_id}/locations/{location}"
        
        services = []
        # Awaiting the call yields an async pager; `async for` then walks
        # all result pages transparently.
        async for service in await metastore_client.list_services(parent=parent):
            services.append({
                "name": service.name,
                "state": service.state.name,
                "tier": service.tier.name,
                "endpoint_uri": service.endpoint_uri,
                # Render create_time as a string; None when unset.
                "create_time": service.create_time.strftime("%Y-%m-%d %H:%M:%S") if service.create_time else None
            })
        
        return {"services": services}
        
    except Exception as e:
        # Any client failure surfaces as HTTP 400 with the raw message.
        raise HTTPException(status_code=400, detail=str(e))

@app.get("/health")
async def health_check():
    """Liveness endpoint.

    Reports that the process is up and that the module-level async clients
    were constructed at import time. No RPC is issued here, so this does
    NOT verify credentials or network reachability.

    The original try/except was dead code: its body only assigned an unused
    string and returned a constant, so the except branch could never run.
    The observable response is unchanged.
    """
    return {"status": "healthy", "client": "initialized"}

Async Data Pipeline Integration

import asyncio
import aiofiles
from typing import List, Dict
from google.cloud import metastore

class AsyncMetastorePipeline:
    """Run batched metadata imports and queries against one metastore service."""

    def __init__(self, service_name: str):
        self.client = metastore.DataprocMetastoreAsyncClient()
        self.service_name = service_name

    async def process_metadata_batch(self, metadata_files: List[str]) -> List[Dict]:
        """Process multiple metadata dump files concurrently.

        Args:
            metadata_files: GCS URIs of MySQL database dumps.

        Returns:
            One dict per import: the completed import's name/state on
            success, or the operation name and error message on failure.
        """
        # Create import operations for all files.
        import_tasks = []
        for i, file_uri in enumerate(metadata_files):
            import_config = metastore.MetadataImport(
                description=f"Batch import {i+1} from {file_uri}",
                database_dump=metastore.MetadataImport.DatabaseDump(
                    gcs_uri=file_uri,
                    database_type=metastore.MetadataImport.DatabaseDump.DatabaseType.MYSQL
                )
            )

            task = self.client.create_metadata_import(
                parent=self.service_name,
                metadata_import_id=f"batch-import-{i+1:03d}",
                metadata_import=import_config
            )
            import_tasks.append(task)

        # Start all imports concurrently.
        operations = await asyncio.gather(*import_tasks)

        # Await each operation's completion, recording per-item outcomes.
        results = []
        for operation in operations:
            try:
                result = await operation.result()
                results.append({
                    "name": result.name,
                    "state": result.state.name,
                    "success": True
                })
            except Exception as e:
                results.append({
                    "operation": operation.name,
                    "error": str(e),
                    "success": False
                })

        return results

    async def concurrent_metadata_queries(self, queries: List[str]) -> List[Dict]:
        """Execute multiple metadata queries concurrently.

        BUG FIX: query_metadata returns a long-running operation, so the
        query response is only available by awaiting operation.result().
        The original read `result_metadata` off the operation itself, and
        its hasattr guard silently reported row_count 0 for every query.
        """
        query_tasks = [
            self.client.query_metadata(service=self.service_name, query=query)
            for query in queries
        ]

        # Starting a query can itself fail per item; keep exceptions in-line.
        operations = await asyncio.gather(*query_tasks, return_exceptions=True)

        results = []
        for i, op in enumerate(operations):
            if isinstance(op, Exception):
                results.append({
                    "query_index": i,
                    "error": str(op),
                    "success": False
                })
                continue
            try:
                response = await op.result()
                results.append({
                    "query_index": i,
                    # Guarded: the response schema may not expose
                    # result_metadata — TODO confirm against the API docs.
                    "row_count": response.result_metadata.row_count if hasattr(response, 'result_metadata') else 0,
                    "success": True
                })
            except Exception as e:
                results.append({
                    "query_index": i,
                    "error": str(e),
                    "success": False
                })

        return results

Install with Tessl CLI

npx tessl i tessl/pypi-google-cloud-dataproc-metastore

docs

async-operations.md

backup-restore.md

federation-management.md

index.md

metadata-import-export.md

metadata-query.md

service-management.md

tile.json