Metadata Import and Export

Import metadata from external sources and export metastore data to Google Cloud Storage. Both directions support MySQL and PostgreSQL database dumps, run as long-running operations, and surface state and error details, which suits large-scale data migration scenarios.

Capabilities

List Metadata Imports

Retrieve all metadata import operations for a metastore service with filtering and pagination support.

def list_metadata_imports(
    self,
    request: Optional[ListMetadataImportsRequest] = None,
    *,
    parent: Optional[str] = None,
    retry: OptionalRetry = gapic_v1.method.DEFAULT,
    timeout: Union[float, object] = gapic_v1.method.DEFAULT,
    metadata: Sequence[Tuple[str, str]] = ()
) -> pagers.ListMetadataImportsPager:
    """
    Lists imports in a service.

    Args:
        request: The request object containing list parameters
        parent: Required. The relative resource name of the service
                Format: projects/{project_id}/locations/{location_id}/services/{service_id}
        retry: Retry configuration for the request
        timeout: Request timeout in seconds
        metadata: Additional metadata for the request

    Returns:
        ListMetadataImportsPager: Pageable list of metadata imports

    Raises:
        google.api_core.exceptions.GoogleAPICallError: If the request fails
    """

Usage example:

from google.cloud import metastore

client = metastore.DataprocMetastoreClient()
parent = "projects/my-project/locations/us-central1/services/my-metastore"

# List all imports
for metadata_import in client.list_metadata_imports(parent=parent):
    print(f"Import: {metadata_import.name}")
    print(f"State: {metadata_import.state.name}")
    print(f"Database dump: {metadata_import.database_dump.gcs_uri}")

# Filter by state, newest first
request = metastore.ListMetadataImportsRequest(
    parent=parent,
    filter="state=SUCCEEDED",
    order_by="create_time desc"
)

for metadata_import in client.list_metadata_imports(request=request):
    print(f"Succeeded import: {metadata_import.name}")

Get Metadata Import

Retrieve detailed information about a specific metadata import operation including progress and error details.

def get_metadata_import(
    self,
    request: Optional[GetMetadataImportRequest] = None,
    *,
    name: Optional[str] = None,
    retry: OptionalRetry = gapic_v1.method.DEFAULT,
    timeout: Union[float, object] = gapic_v1.method.DEFAULT,
    metadata: Sequence[Tuple[str, str]] = ()
) -> MetadataImport:
    """
    Gets details of a single import.

    Args:
        request: The request object
        name: Required. The relative resource name of the metadata import
              Format: projects/{project_id}/locations/{location_id}/services/{service_id}/metadataImports/{import_id}
        retry: Retry configuration
        timeout: Request timeout in seconds
        metadata: Additional metadata

    Returns:
        MetadataImport: The metadata import resource

    Raises:
        google.api_core.exceptions.NotFound: If the import doesn't exist
    """

Create Metadata Import

Import metadata from external database dumps stored in Google Cloud Storage.

def create_metadata_import(
    self,
    request: Optional[CreateMetadataImportRequest] = None,
    *,
    parent: Optional[str] = None,
    metadata_import: Optional[MetadataImport] = None,
    metadata_import_id: Optional[str] = None,
    retry: OptionalRetry = gapic_v1.method.DEFAULT,
    timeout: Union[float, object] = gapic_v1.method.DEFAULT,
    metadata: Sequence[Tuple[str, str]] = ()
) -> operation.Operation:
    """
    Creates a new MetadataImport in a given project and location.

    Args:
        request: The request object
        parent: Required. The relative resource name of the service
        metadata_import: Required. The metadata import configuration
        metadata_import_id: Required. The ID to use for the import
        retry: Retry configuration
        timeout: Request timeout in seconds
        metadata: Additional metadata

    Returns:
        Operation: Long-running operation for metadata import

    Raises:
        google.api_core.exceptions.AlreadyExists: If import_id already exists
        google.api_core.exceptions.InvalidArgument: If configuration is invalid
    """

Usage example:

from google.cloud import metastore

client = metastore.DataprocMetastoreClient()

# Import from MySQL dump
import_config = metastore.MetadataImport(
    description="Import production MySQL metastore data",
    database_dump=metastore.MetadataImport.DatabaseDump(
        gcs_uri="gs://my-bucket/metastore-dumps/prod-metastore-20240115.sql",
        database_type=metastore.MetadataImport.DatabaseDump.DatabaseType.MYSQL
    )
)

operation = client.create_metadata_import(
    parent="projects/my-project/locations/us-central1/services/my-metastore",
    metadata_import_id="mysql-import-20240115",
    metadata_import=import_config
)

# Monitor import progress
print("Starting metadata import...")
result = operation.result(timeout=7200)  # Can take up to 2 hours for large dumps
print(f"Import completed: {result.name}")

Update Metadata Import

Update metadata import configuration such as description and labels.

def update_metadata_import(
    self,
    request: Optional[UpdateMetadataImportRequest] = None,
    *,
    metadata_import: Optional[MetadataImport] = None,
    update_mask: Optional[field_mask_pb2.FieldMask] = None,
    retry: OptionalRetry = gapic_v1.method.DEFAULT,
    timeout: Union[float, object] = gapic_v1.method.DEFAULT,
    metadata: Sequence[Tuple[str, str]] = ()
) -> operation.Operation:
    """
    Updates a single import.

    Args:
        request: The request object
        metadata_import: Required. The import to update
        update_mask: Required. Field mask specifying which fields to update
        retry: Retry configuration
        timeout: Request timeout in seconds
        metadata: Additional metadata

    Returns:
        Operation: Long-running operation for import update

    Raises:
        google.api_core.exceptions.NotFound: If the import doesn't exist
    """

Export Metadata

Export metastore metadata to Google Cloud Storage in various formats.

def export_metadata(
    self,
    request: Optional[ExportMetadataRequest] = None,
    *,
    service: Optional[str] = None,
    retry: OptionalRetry = gapic_v1.method.DEFAULT,
    timeout: Union[float, object] = gapic_v1.method.DEFAULT,
    metadata: Sequence[Tuple[str, str]] = ()
) -> operation.Operation:
    """
    Exports metadata from a service.

    Args:
        request: The request object
        service: Required. The relative resource name of the service
        retry: Retry configuration
        timeout: Request timeout in seconds
        metadata: Additional metadata

    Returns:
        Operation: Long-running operation for metadata export

    Raises:
        google.api_core.exceptions.NotFound: If the service doesn't exist
        google.api_core.exceptions.FailedPrecondition: If export cannot be performed
    """

Usage example:

from google.cloud import metastore

client = metastore.DataprocMetastoreClient()

# Export to Cloud Storage
export_request = metastore.ExportMetadataRequest(
    service="projects/my-project/locations/us-central1/services/my-metastore",
    destination_gcs_uri="gs://my-exports/metastore-export-20240115/",
    database_dump_type=metastore.DatabaseDumpSpec.Type.MYSQL
)

operation = client.export_metadata(request=export_request)

# Wait for export completion
print("Starting metadata export...")
metadata_export = operation.result(timeout=3600)
print(f"Export completed to: {metadata_export.destination_gcs_uri}")

Core Types

Metadata Import Resource

class MetadataImport:
    name: str
    description: str
    create_time: timestamp_pb2.Timestamp
    update_time: timestamp_pb2.Timestamp
    end_time: timestamp_pb2.Timestamp
    state: State
    database_dump: DatabaseDump

    class State(enum.Enum):
        STATE_UNSPECIFIED = 0
        RUNNING = 1
        SUCCEEDED = 2
        UPDATING = 3
        FAILED = 4

    class DatabaseDump:
        gcs_uri: str
        database_type: DatabaseType
        type: Optional[str]  # Deprecated

        class DatabaseType(enum.Enum):
            DATABASE_TYPE_UNSPECIFIED = 0
            MYSQL = 1
            POSTGRESQL = 2

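The State enum drives monitoring logic. As an illustration, a hypothetical helper (not part of the library) that summarizes an import from these fields:

from google.cloud import metastore

# Hypothetical helper: summarize an import from its state and timestamps
def describe_import(mi: metastore.MetadataImport) -> str:
    if mi.state == metastore.MetadataImport.State.RUNNING:
        return f"{mi.name}: running since {mi.create_time}"
    if mi.state == metastore.MetadataImport.State.SUCCEEDED:
        return f"{mi.name}: succeeded at {mi.end_time}"
    return f"{mi.name}: state={mi.state.name}"
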
Metadata Export Resource

class MetadataExport:
    destination_gcs_uri: str
    start_time: timestamp_pb2.Timestamp
    end_time: timestamp_pb2.Timestamp
    state: State
    database_dump_type: DatabaseDumpSpec.Type

    class State(enum.Enum):
        STATE_UNSPECIFIED = 0
        RUNNING = 1
        SUCCEEDED = 2
        FAILED = 3
        CANCELLED = 4

Database Dump Specification

class DatabaseDumpSpec:
    gcs_uri: str
    type: Type

    class Type(enum.Enum):
        TYPE_UNSPECIFIED = 0
        MYSQL = 1
        POSTGRESQL = 2

Request/Response Types

class ListMetadataImportsRequest:
    parent: str
    page_size: int
    page_token: str
    filter: str
    order_by: str

class ListMetadataImportsResponse:
    metadata_imports: List[MetadataImport]
    next_page_token: str
    unreachable: List[str]

class GetMetadataImportRequest:
    name: str

class CreateMetadataImportRequest:
    parent: str
    metadata_import_id: str
    metadata_import: MetadataImport
    request_id: str

class UpdateMetadataImportRequest:
    update_mask: field_mask_pb2.FieldMask
    metadata_import: MetadataImport
    request_id: str

class ExportMetadataRequest:
    service: str
    destination_gcs_uri: str
    request_id: str
    database_dump_type: DatabaseDumpSpec.Type
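
For manual pagination, ListMetadataImportsRequest pairs with the pager's pages iterator. A sketch (the page size is arbitrary):

from google.cloud import metastore

client = metastore.DataprocMetastoreClient()
request = metastore.ListMetadataImportsRequest(
    parent="projects/my-project/locations/us-central1/services/my-metastore",
    page_size=50,
)

# Iterate page by page rather than item by item
for page in client.list_metadata_imports(request=request).pages:
    print(f"Fetched {len(page.metadata_imports)} imports")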

Usage Patterns

Large-Scale Migration Workflow

import logging
from typing import List

from google.api_core import operation
from google.cloud import metastore

class MetastoreMigrator:
    def __init__(self, project_id: str, location: str, service_id: str):
        self.metastore_client = metastore.DataprocMetastoreClient()
        self.service_name = f"projects/{project_id}/locations/{location}/services/{service_id}"

    def import_from_multiple_dumps(self, dump_uris: List[str]) -> List[operation.Operation]:
        """Start one metadata import per database dump and return the operations."""
        import_operations = []

        for i, dump_uri in enumerate(dump_uris):
            import_config = metastore.MetadataImport(
                description=f"Batch import {i + 1} of {len(dump_uris)}",
                database_dump=metastore.MetadataImport.DatabaseDump(
                    gcs_uri=dump_uri,
                    database_type=metastore.MetadataImport.DatabaseDump.DatabaseType.MYSQL
                )
            )

            op = self.metastore_client.create_metadata_import(
                parent=self.service_name,
                metadata_import_id=f"batch-import-{i + 1:03d}",
                metadata_import=import_config
            )

            import_operations.append(op)
            logging.info(f"Started import {i + 1} of {len(dump_uris)}")

        return import_operations

    def wait_for_imports(self, operations: List[operation.Operation], timeout: int = 7200):
        """Block until every import operation completes; raises if any import failed."""
        total = len(operations)

        for completed, op in enumerate(operations, start=1):
            op.result(timeout=timeout)  # Raises GoogleAPICallError on failure
            logging.info(f"Import progress: {completed}/{total} completed")
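
A sketch of how the migrator might be driven (bucket paths are illustrative):

migrator = MetastoreMigrator("my-project", "us-central1", "my-metastore")
operations = migrator.import_from_multiple_dumps([
    "gs://my-bucket/metastore-dumps/part-1.sql",
    "gs://my-bucket/metastore-dumps/part-2.sql",
])
migrator.wait_for_imports(operations)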

Export with Validation

import logging
from datetime import datetime

from google.cloud import metastore, storage

def export_with_validation(service_name: str, export_bucket: str) -> str:
    """Export metadata, then confirm the export files landed in Cloud Storage."""
    client = metastore.DataprocMetastoreClient()

    # Create a timestamped export location
    export_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    export_uri = f"gs://{export_bucket}/exports/{export_timestamp}/"

    # Start the export
    export_request = metastore.ExportMetadataRequest(
        service=service_name,
        destination_gcs_uri=export_uri,
        database_dump_type=metastore.DatabaseDumpSpec.Type.MYSQL
    )

    operation = client.export_metadata(request=export_request)

    try:
        # Wait for the export to complete
        operation.result(timeout=3600)

        # Validate that export files exist in Cloud Storage
        storage_client = storage.Client()
        bucket = storage_client.bucket(export_bucket)

        export_files = list(bucket.list_blobs(prefix=f"exports/{export_timestamp}/"))
        if not export_files:
            raise ValueError("Export completed but no files found in Cloud Storage")

        total_size = sum(blob.size for blob in export_files)
        logging.info(f"Export validated: {len(export_files)} files, {total_size} bytes")

        return export_uri

    except Exception as e:
        logging.error(f"Export failed or validation error: {e}")
        raise
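
Called with the service and bucket from the earlier examples (names illustrative):

export_uri = export_with_validation(
    service_name="projects/my-project/locations/us-central1/services/my-metastore",
    export_bucket="my-exports",
)
print(f"Validated export at {export_uri}")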
