Google Cloud Dataproc Metastore API client library for managing fully managed, highly available metastore services
—
Execute Hive and Spark SQL queries directly against metastore metadata for advanced analytics and metadata management operations. Includes table movement between databases, resource location management, and complex metadata transformations for data lake operations.
Execute SQL queries against the metastore's metadata store for analytics, reporting, and metadata management operations.
def query_metadata(
    self,
    request: Optional[QueryMetadataRequest] = None,
    retry: OptionalRetry = gapic_v1.method.DEFAULT,
    timeout: Union[float, object] = gapic_v1.method.DEFAULT,
    metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
) -> operation.Operation:
    """Query DPMS metadata.

    Runs a read-only SQL query against the metastore service's metadata
    store and returns a long-running operation that resolves to the
    query results.

    Args:
        request: The request object containing the service name and query.
        retry: Retry configuration.
        timeout: Request timeout in seconds.
        metadata: Additional metadata sent with the request.

    Returns:
        Operation: Long-running operation for query execution.

    Raises:
        google.api_core.exceptions.InvalidArgument: If the query is
            malformed or not read-only.
        google.api_core.exceptions.PermissionDenied: If the caller has
            insufficient permissions.
    """
    # API-shape stub: the implementation lives in the generated GAPIC client.
    ...

# Usage example:
from google.cloud import metastore

client = metastore.DataprocMetastoreClient()
service_name = "projects/my-project/locations/us-central1/services/my-metastore"

# Query table metadata: the 100 most recently created tables in the
# 'production' database, with their type and storage location.
query_request = metastore.QueryMetadataRequest(
    service=service_name,
    query="""
        SELECT
            d.NAME as database_name,
            t.TBL_NAME as table_name,
            t.TBL_TYPE as table_type,
            s.LOCATION as table_location,
            t.CREATE_TIME as create_time
        FROM TBLS t
        JOIN DBS d ON t.DB_ID = d.DB_ID
        JOIN SDS s ON t.SD_ID = s.SD_ID
        WHERE d.NAME = 'production'
        ORDER BY t.CREATE_TIME DESC
        LIMIT 100
    """,
)

# query_metadata returns a long-running operation; block until it completes.
operation = client.query_metadata(request=query_request)
response = operation.result()

print("Query executed successfully")
print(f"Result metadata: {response.result_metadata}")
# result_manifest is only present when results were materialized to a file.
if hasattr(response, 'result_manifest'):
    print(f"Results available at: {response.result_manifest.file_uri}")

# Move tables between databases within the same metastore service for
# data organization and management.
def move_table_to_database(
    self,
    request: Optional[MoveTableToDatabaseRequest] = None,
    retry: OptionalRetry = gapic_v1.method.DEFAULT,
    timeout: Union[float, object] = gapic_v1.method.DEFAULT,
    metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
) -> operation.Operation:
    """Move a table to another database.

    Args:
        request: The request object. Its fields are:
            service: Required. The relative resource name of the service.
            table_name: Required. The name of the table to move.
            db_name: Required. The name of the source database.
            destination_db_name: Required. The name of the destination database.
        retry: Retry configuration.
        timeout: Request timeout in seconds.
        metadata: Additional metadata sent with the request.

    Returns:
        Operation: Long-running operation for the table move.

    Raises:
        google.api_core.exceptions.NotFound: If the table or database
            doesn't exist.
        google.api_core.exceptions.AlreadyExists: If the table already
            exists in the destination database.
    """
    # API-shape stub: the implementation lives in the generated GAPIC client.
    ...

# Usage example:
from google.cloud import metastore

client = metastore.DataprocMetastoreClient()

# Move the 'customer_data' table from the staging to the production database.
move_request = metastore.MoveTableToDatabaseRequest(
    service="projects/my-project/locations/us-central1/services/my-metastore",
    table_name="customer_data",
    db_name="staging",
    destination_db_name="production",
)
operation = client.move_table_to_database(request=move_request)

# Block until the move finishes (allow up to 5 minutes).
response = operation.result(timeout=300)
print("Table moved successfully")
print(f"New table location: {response.table_name}")

# Update the storage location of metadata resources for data migration
# and reorganization scenarios.
def alter_metadata_resource_location(
    self,
    request: Optional[AlterMetadataResourceLocationRequest] = None,
    *,
    service: Optional[str] = None,
    resource_name: Optional[str] = None,
    location_uri: Optional[str] = None,
    retry: OptionalRetry = gapic_v1.method.DEFAULT,
    timeout: Union[float, object] = gapic_v1.method.DEFAULT,
    metadata: Sequence[Tuple[str, str]] = (),
) -> operation.Operation:
    """Alter a metadata resource's storage location.

    The metadata resource can be a database, table, or partition.

    Args:
        request: The request object. If flattened fields below are set,
            they must not also be set on the request.
        service: Required. The relative resource name of the service.
        resource_name: Required. The relative resource name of the
            metadata resource.
        location_uri: Required. The new location URI for the resource.
        retry: Retry configuration.
        timeout: Request timeout in seconds.
        metadata: Additional metadata sent with the request.

    Returns:
        Operation: Long-running operation for the location alteration.

    Raises:
        google.api_core.exceptions.NotFound: If the resource doesn't exist.
        google.api_core.exceptions.InvalidArgument: If the location URI
            is invalid.
    """
    # API-shape stub: the implementation lives in the generated GAPIC client.
    ...

# Usage example:
from google.cloud import metastore

client = metastore.DataprocMetastoreClient()

# Move table data to a new Cloud Storage location.
alter_request = metastore.AlterMetadataResourceLocationRequest(
    service="projects/my-project/locations/us-central1/services/my-metastore",
    resource_name="production.sales_data",
    location_uri="gs://new-data-bucket/sales-data/",
)
operation = client.alter_metadata_resource_location(request=alter_request)

# Location changes can take a while; allow up to 10 minutes.
response = operation.result(timeout=600)
print("Resource location updated")
print(f"New location: {response.location_uri}")


class QueryMetadataRequest:
    # Relative resource name of the metastore service to query.
    service: str
    # Read-only SQL query to run against the metadata store.
    query: str
# Message shapes for the metadata APIs above. Classes are ordered so that
# every annotation referring to another class is defined after it; the two
# quoted annotations are deliberately left unevaluated because `List` and
# `Optional` are only imported further down this document.
class ColumnMetadata:
    # Column name as reported by the metadata store.
    name: str
    data_type: str
    nullable: bool


class ResultMetadata:
    row_count: int
    execution_time_ms: int
    schema: "List[ColumnMetadata]"


class ResultManifest:
    # URI of the file holding the query results.
    file_uri: str
    file_type: str


class QueryMetadataResponse:
    result_metadata: ResultMetadata
    # Present only when results were materialized to a file.
    result_manifest: "Optional[ResultManifest]"


class MoveTableToDatabaseRequest:
    service: str
    table_name: str
    db_name: str
    destination_db_name: str


class MoveTableToDatabaseResponse:
    table_name: str
    db_name: str


class AlterMetadataResourceLocationRequest:
    service: str
    resource_name: str
    location_uri: str


class AlterMetadataResourceLocationResponse:
    service: str
    resource_name: str
    location_uri: str


# Imports for the analytics examples that follow.
from google.cloud import metastore
import pandas as pd
from typing import Dict, List, Optional
class MetadataAnalytics:
    """Read-only analytics over a Dataproc Metastore service's metadata."""

    def __init__(self, service_name: str):
        self.client = metastore.DataprocMetastoreClient()
        self.service_name = service_name

    def _run_query(self, query: str):
        """Submit a metadata query and block until its operation completes."""
        # query_metadata accepts a request object (not service=/query= kwargs)
        # and returns a long-running operation; wait for the actual response.
        request = metastore.QueryMetadataRequest(
            service=self.service_name,
            query=query,
        )
        return self.client.query_metadata(request=request).result()

    def get_database_statistics(self) -> List[Dict]:
        """Get per-database table counts, types, and creation-time range."""
        query = """
            SELECT
                d.NAME as database_name,
                d.DESC as description,
                COUNT(DISTINCT t.TBL_ID) as table_count,
                COUNT(DISTINCT CASE WHEN t.TBL_TYPE = 'EXTERNAL_TABLE' THEN t.TBL_ID END) as external_tables,
                COUNT(DISTINCT CASE WHEN t.TBL_TYPE = 'MANAGED_TABLE' THEN t.TBL_ID END) as managed_tables,
                MIN(t.CREATE_TIME) as oldest_table,
                MAX(t.CREATE_TIME) as newest_table
            FROM DBS d
            LEFT JOIN TBLS t ON d.DB_ID = t.DB_ID
            GROUP BY d.DB_ID, d.NAME, d.DESC
            ORDER BY table_count DESC
        """
        return self._parse_query_results(self._run_query(query))

    def find_unused_tables(self, days_threshold: int = 90) -> List[str]:
        """Find 'database.table' names not accessed within days_threshold days."""
        # Coerce to int so the value interpolated into the SQL below cannot
        # carry injected text.
        threshold = int(days_threshold)
        query = f"""
            SELECT
                d.NAME as database_name,
                t.TBL_NAME as table_name,
                t.CREATE_TIME as create_time,
                t.LAST_ACCESS_TIME as last_access_time
            FROM TBLS t
            JOIN DBS d ON t.DB_ID = d.DB_ID
            WHERE t.LAST_ACCESS_TIME < UNIX_TIMESTAMP() - ({threshold} * 24 * 3600)
            OR t.LAST_ACCESS_TIME = 0
            ORDER BY t.LAST_ACCESS_TIME ASC
        """
        rows = self._parse_query_results(self._run_query(query))
        return [f"{row['database_name']}.{row['table_name']}" for row in rows]

    def get_storage_usage_by_location(self) -> List[Dict]:
        """Group table and database counts by storage system and root."""
        query = """
            SELECT
                CASE
                    WHEN s.LOCATION LIKE 'gs://%' THEN 'Google Cloud Storage'
                    WHEN s.LOCATION LIKE 's3://%' THEN 'Amazon S3'
                    WHEN s.LOCATION LIKE 'hdfs://%' THEN 'HDFS'
                    ELSE 'Other'
                END as storage_type,
                REGEXP_EXTRACT(s.LOCATION, '^[^/]+//[^/]+') as storage_root,
                COUNT(DISTINCT t.TBL_ID) as table_count,
                COUNT(DISTINCT d.DB_ID) as database_count
            FROM SDS s
            JOIN TBLS t ON s.SD_ID = t.SD_ID
            JOIN DBS d ON t.DB_ID = d.DB_ID
            WHERE s.LOCATION IS NOT NULL
            GROUP BY storage_type, storage_root
            ORDER BY table_count DESC
        """
        return self._parse_query_results(self._run_query(query))

    def _parse_query_results(self, response: "metastore.QueryMetadataResponse") -> List[Dict]:
        """Parse a query response into a list of row dicts.

        Placeholder: a real implementation would download and decode the
        result files referenced by response.result_manifest.
        """
        return []


# The batch-management examples below fan table moves out across threads.
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging


class BatchTableManager:
    """Bulk table-management operations against a Dataproc Metastore service."""

    def __init__(self, service_name: str):
        self.client = metastore.DataprocMetastoreClient()
        self.service_name = service_name

    def bulk_move_tables(self, table_moves: List[Dict[str, str]], max_workers: int = 5):
        """Move multiple tables in parallel.

        Each entry in table_moves needs 'table_name', 'source_db', and
        'target_db' keys. Returns the list of operations that started
        successfully; failures are logged and skipped.
        """
        operations = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all move operations first, then collect as they finish.
            future_to_move = {
                executor.submit(
                    self._move_single_table,
                    move['table_name'],
                    move['source_db'],
                    move['target_db'],
                ): move
                for move in table_moves
            }
            for future in as_completed(future_to_move):
                move = future_to_move[future]
                try:
                    operations.append(future.result())
                    logging.info("Started move for %s", move['table_name'])
                except Exception as e:
                    # One failed submission should not abort the batch.
                    logging.error("Failed to start move for %s: %s", move['table_name'], e)
        return operations

    def _move_single_table(self, table_name: str, source_db: str, target_db: str):
        """Start a move_table_to_database operation for one table."""
        move_request = metastore.MoveTableToDatabaseRequest(
            service=self.service_name,
            table_name=table_name,
            db_name=source_db,
            destination_db_name=target_db,
        )
        return self.client.move_table_to_database(request=move_request)

    def migrate_storage_locations(self, location_mappings: Dict[str, str]):
        """Migrate tables from old storage locations to new ones."""
        for old_location, new_location in location_mappings.items():
            # NOTE(review): old_location is interpolated directly into the
            # query (no parameter binding is available) — pass trusted values
            # only. s.LOCATION is selected because the rewrite below needs it.
            query = f"""
                SELECT
                    CONCAT(d.NAME, '.', t.TBL_NAME) as full_table_name,
                    s.LOCATION as location
                FROM TBLS t
                JOIN DBS d ON t.DB_ID = d.DB_ID
                JOIN SDS s ON t.SD_ID = s.SD_ID
                WHERE s.LOCATION LIKE '{old_location}%'
            """
            # query_metadata takes a request object and returns a long-running
            # operation; wait for the response before parsing it.
            request = metastore.QueryMetadataRequest(
                service=self.service_name,
                query=query,
            )
            response = self.client.query_metadata(request=request).result()
            # Point each matching table at its rewritten location.
            for result in self._parse_query_results(response):
                table_name = result['full_table_name']
                new_table_location = result['location'].replace(old_location, new_location)
                alter_request = metastore.AlterMetadataResourceLocationRequest(
                    service=self.service_name,
                    resource_name=table_name,
                    location_uri=new_table_location,
                )
                self.client.alter_metadata_resource_location(request=alter_request)
                logging.info("Started location migration for %s", table_name)

    def _parse_query_results(self, response) -> List[Dict]:
        """Parse a query response into a list of row dicts.

        The original snippet called this without defining it on this class;
        placeholder pending a real result-manifest decoder.
        """
        return []

# Install with Tessl CLI:
npx tessl i tessl/pypi-google-cloud-dataproc-metastore