Google Cloud Dataproc Metastore API client library for managing fully managed, highly available metastore services
—
Import metadata from external sources and export metastore data to Google Cloud Storage. Supports various database formats including MySQL and PostgreSQL dumps with comprehensive validation, error handling, and progress tracking for large-scale data migration scenarios.
Retrieve all metadata import operations for a metastore service with filtering and pagination support.
def list_metadata_imports(
self,
request: Optional[ListMetadataImportsRequest] = None,
*,
parent: Optional[str] = None,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = ()
) -> pagers.ListMetadataImportsPager:
"""
Lists imports in a service.
Args:
request: The request object containing list parameters
parent: Required. The relative resource name of the service
Format: projects/{project_id}/locations/{location_id}/services/{service_id}
retry: Retry configuration for the request
timeout: Request timeout in seconds
metadata: Additional metadata for the request
Returns:
ListMetadataImportsPager: Pageable list of metadata imports
Raises:
google.api_core.exceptions.GoogleAPICallError: If the request fails
"""Usage example:
from google.cloud import metastore
client = metastore.DataprocMetastoreClient()
parent = "projects/my-project/locations/us-central1/services/my-metastore"
# List all imports
for metadata_import in client.list_metadata_imports(parent=parent):
print(f"Import: {metadata_import.name}")
print(f"State: {metadata_import.state.name}")
print(f"Database dump: {metadata_import.database_dump.gcs_uri}")
# Filter by state
request = metastore.ListMetadataImportsRequest(
parent=parent,
filter="state=SUCCEEDED",
order_by="create_time desc"
)Retrieve detailed information about a specific metadata import operation including progress and error details.
def get_metadata_import(
self,
request: Optional[GetMetadataImportRequest] = None,
*,
name: Optional[str] = None,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = ()
) -> MetadataImport:
"""
Gets details of a single import.
Args:
request: The request object
name: Required. The relative resource name of the metadata import
Format: projects/{project_id}/locations/{location_id}/services/{service_id}/metadataImports/{import_id}
retry: Retry configuration
timeout: Request timeout in seconds
metadata: Additional metadata
Returns:
MetadataImport: The metadata import resource
Raises:
google.api_core.exceptions.NotFound: If the import doesn't exist
"""Import metadata from external database dumps stored in Google Cloud Storage.
def create_metadata_import(
self,
request: Optional[CreateMetadataImportRequest] = None,
*,
parent: Optional[str] = None,
metadata_import: Optional[MetadataImport] = None,
metadata_import_id: Optional[str] = None,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = ()
) -> operation.Operation:
"""
Creates a new MetadataImport in a given project and location.
Args:
request: The request object
parent: Required. The relative resource name of the service
metadata_import: Required. The metadata import configuration
metadata_import_id: Required. The ID to use for the import
retry: Retry configuration
timeout: Request timeout in seconds
metadata: Additional metadata
Returns:
Operation: Long-running operation for metadata import
Raises:
google.api_core.exceptions.AlreadyExists: If import_id already exists
google.api_core.exceptions.InvalidArgument: If configuration is invalid
"""Usage example:
from google.cloud import metastore
client = metastore.DataprocMetastoreClient()
# Import from MySQL dump
import_config = metastore.MetadataImport(
description="Import production MySQL metastore data",
database_dump=metastore.MetadataImport.DatabaseDump(
gcs_uri="gs://my-bucket/metastore-dumps/prod-metastore-20240115.sql",
database_type=metastore.MetadataImport.DatabaseDump.DatabaseType.MYSQL
)
)
operation = client.create_metadata_import(
parent="projects/my-project/locations/us-central1/services/my-metastore",
metadata_import_id="mysql-import-20240115",
metadata_import=import_config
)
# Monitor import progress
print("Starting metadata import...")
result = operation.result(timeout=7200) # Can take up to 2 hours for large dumps
print(f"Import completed: {result.name}")Update metadata import configuration such as description and labels.
def update_metadata_import(
self,
request: Optional[UpdateMetadataImportRequest] = None,
*,
metadata_import: Optional[MetadataImport] = None,
update_mask: Optional[field_mask_pb2.FieldMask] = None,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = ()
) -> operation.Operation:
"""
Updates a single import.
Args:
request: The request object
metadata_import: Required. The import to update
update_mask: Required. Field mask specifying which fields to update
retry: Retry configuration
timeout: Request timeout in seconds
metadata: Additional metadata
Returns:
Operation: Long-running operation for import update
Raises:
google.api_core.exceptions.NotFound: If the import doesn't exist
"""Export metastore metadata to Google Cloud Storage in various formats.
def export_metadata(
self,
request: Optional[ExportMetadataRequest] = None,
*,
service: Optional[str] = None,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = ()
) -> operation.Operation:
"""
Exports metadata from a service.
Args:
request: The request object
service: Required. The relative resource name of the service
retry: Retry configuration
timeout: Request timeout in seconds
metadata: Additional metadata
Returns:
Operation: Long-running operation for metadata export
Raises:
google.api_core.exceptions.NotFound: If the service doesn't exist
google.api_core.exceptions.FailedPrecondition: If export cannot be performed
"""Usage example:
from google.cloud import metastore
client = metastore.DataprocMetastoreClient()
# Export to Cloud Storage
export_request = metastore.ExportMetadataRequest(
service="projects/my-project/locations/us-central1/services/my-metastore",
destination_gcs_uri="gs://my-exports/metastore-export-20240115/",
database_dump_type=metastore.DatabaseDumpSpec.Type.MYSQL
)
operation = client.export_metadata(request=export_request)
# Wait for export completion
print("Starting metadata export...")
metadata_export = operation.result(timeout=3600)
print(f"Export completed to: {metadata_export.destination_gcs_uri}")class MetadataImport:
name: str
description: str
create_time: timestamp_pb2.Timestamp
update_time: timestamp_pb2.Timestamp
end_time: timestamp_pb2.Timestamp
state: State
database_dump: DatabaseDump
class State(enum.Enum):
STATE_UNSPECIFIED = 0
RUNNING = 1
SUCCEEDED = 2
UPDATING = 3
FAILED = 4
class DatabaseDump:
gcs_uri: str
database_type: DatabaseType
type: Optional[str] # Deprecated
class DatabaseType(enum.Enum):
DATABASE_TYPE_UNSPECIFIED = 0
MYSQL = 1
POSTGRESQL = 2class MetadataExport:
destination_gcs_uri: str
start_time: timestamp_pb2.Timestamp
end_time: timestamp_pb2.Timestamp
state: State
database_dump_type: DatabaseDumpSpec.Type
class State(enum.Enum):
STATE_UNSPECIFIED = 0
RUNNING = 1
SUCCEEDED = 2
FAILED = 3
CANCELLED = 4class DatabaseDumpSpec:
gcs_uri: str
type: Type
class Type(enum.Enum):
TYPE_UNSPECIFIED = 0
MYSQL = 1
POSTGRESQL = 2class ListMetadataImportsRequest:
parent: str
page_size: int
page_token: str
filter: str
order_by: str
class ListMetadataImportsResponse:
metadata_imports: List[MetadataImport]
next_page_token: str
unreachable: List[str]
class GetMetadataImportRequest:
name: str
class CreateMetadataImportRequest:
parent: str
metadata_import_id: str
metadata_import: MetadataImport
request_id: str
class UpdateMetadataImportRequest:
update_mask: field_mask_pb2.FieldMask
metadata_import: MetadataImport
request_id: str
class ExportMetadataRequest:
service: str
destination_gcs_uri: str
request_id: str
database_dump_type: DatabaseDumpSpec.Typefrom google.cloud import metastore, storage
import logging
import time
from datetime import datetime
from typing import List, Optional
class MetastoreMigrator:
def __init__(self, project_id: str, location: str, service_id: str):
self.metastore_client = metastore.DataprocMetastoreClient()
self.storage_client = storage.Client()
self.service_name = f"projects/{project_id}/locations/{location}/services/{service_id}"
def import_from_multiple_dumps(self, dump_uris: List[str]) -> List[str]:
"""Import metadata from multiple database dumps."""
import_operations = []
for i, dump_uri in enumerate(dump_uris):
import_config = metastore.MetadataImport(
description=f"Batch import {i+1} of {len(dump_uris)}",
database_dump=metastore.MetadataImport.DatabaseDump(
gcs_uri=dump_uri,
database_type=metastore.MetadataImport.DatabaseDump.DatabaseType.MYSQL
)
)
operation = self.metastore_client.create_metadata_import(
parent=self.service_name,
metadata_import_id=f"batch-import-{i+1:03d}",
metadata_import=import_config
)
import_operations.append(operation.name)
logging.info(f"Started import {i+1}: {operation.name}")
return import_operations
def wait_for_imports(self, operation_names: List[str]):
"""Wait for all import operations to complete."""
completed = 0
total = len(operation_names)
while completed < total:
for op_name in operation_names:
# Check operation status
# Implementation would use operations client
pass
time.sleep(60) # Check every minute
logging.info(f"Import progress: {completed}/{total} completed")def export_with_validation(service_name: str, export_bucket: str):
"""Export metadata with validation steps."""
client = metastore.DataprocMetastoreClient()
# Create timestamped export location
export_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
export_uri = f"gs://{export_bucket}/exports/{export_timestamp}/"
# Start export
export_request = metastore.ExportMetadataRequest(
service=service_name,
destination_gcs_uri=export_uri,
database_dump_type=metastore.DatabaseDumpSpec.Type.MYSQL
)
operation = client.export_metadata(request=export_request)
try:
# Wait for export completion
result = operation.result(timeout=3600)
# Validate export files exist in Cloud Storage
storage_client = storage.Client()
bucket = storage_client.bucket(export_bucket)
export_files = list(bucket.list_blobs(prefix=f"exports/{export_timestamp}/"))
if not export_files:
raise ValueError("Export completed but no files found in Cloud Storage")
total_size = sum(blob.size for blob in export_files)
logging.info(f"Export validated: {len(export_files)} files, {total_size} bytes")
return export_uri
except Exception as e:
logging.error(f"Export failed or validation error: {e}")
raiseInstall with Tessl CLI
npx tessl i tessl/pypi-google-cloud-dataproc-metastore