CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-google-cloud-dataproc-metastore

Google Cloud Dataproc Metastore API client library for managing fully managed, highly available metastore services

Pending
Overview
Eval results
Files

metadata-query.mddocs/

Metadata Query Operations

Execute Hive and Spark SQL queries directly against metastore metadata for advanced analytics and metadata management operations. Includes table movement between databases, resource location management, and complex metadata transformations for data lake operations.

Capabilities

Query Metadata

Execute SQL queries against the metastore's metadata store for analytics, reporting, and metadata management operations.

def query_metadata(
    self,
    request: Optional[QueryMetadataRequest] = None,
    retry: OptionalRetry = gapic_v1.method.DEFAULT,
    timeout: Union[float, object] = gapic_v1.method.DEFAULT,
    metadata: Sequence[Tuple[str, Union[str, bytes]]] = ()
) -> operation.Operation:
    """
    Query DPMS metadata.

    Submits a read-only SQL query against the metastore's metadata store and
    returns a long-running operation that, once complete, yields the query's
    QueryMetadataResponse.

    Args:
        request: The request object carrying the service resource name and
            the SQL query text to execute.
        retry: Retry configuration applied to transient failures.
        timeout: Request timeout in seconds.
        metadata: Additional metadata sent with the request.

    Returns:
        Operation: Long-running operation for query execution; call
        ``result()`` to block until the QueryMetadataResponse is available.

    Raises:
        google.api_core.exceptions.InvalidArgument: If query is malformed or not read-only
        google.api_core.exceptions.PermissionDenied: If insufficient permissions
    """

Usage example:

from google.cloud import metastore

client = metastore.DataprocMetastoreClient()
service_name = "projects/my-project/locations/us-central1/services/my-metastore"

# Read-only query: the 100 newest tables in the 'production' database,
# joined with their storage descriptors to include each table's location.
table_listing_sql = """
    SELECT 
        d.NAME as database_name,
        t.TBL_NAME as table_name,
        t.TBL_TYPE as table_type,
        s.LOCATION as table_location,
        t.CREATE_TIME as create_time
    FROM TBLS t
    JOIN DBS d ON t.DB_ID = d.DB_ID
    JOIN SDS s ON t.SD_ID = s.SD_ID
    WHERE d.NAME = 'production'
    ORDER BY t.CREATE_TIME DESC
    LIMIT 100
    """

req = metastore.QueryMetadataRequest(service=service_name, query=table_listing_sql)

# query_metadata returns a long-running operation; block until it finishes.
op = client.query_metadata(request=req)
response = op.result()

print(f"Query executed successfully")
print(f"Result metadata: {response.result_metadata}")
if hasattr(response, 'result_manifest'):
    print(f"Results available at: {response.result_manifest.file_uri}")

Move Table to Database

Move tables between databases within the same metastore service for data organization and management.

def move_table_to_database(
    self,
    request: Optional[MoveTableToDatabaseRequest] = None,
    retry: OptionalRetry = gapic_v1.method.DEFAULT,
    timeout: Union[float, object] = gapic_v1.method.DEFAULT,
    metadata: Sequence[Tuple[str, Union[str, bytes]]] = ()
) -> operation.Operation:
    """
    Move a table to another database.

    The move runs asynchronously; the returned long-running operation
    completes once the table has been moved.

    Args:
        request: The request object. Note the signature accepts these fields
            only via ``request`` (there are no flattened parameters). Required
            request fields:
                service: The relative resource name of the service.
                table_name: The name of the table to move.
                db_name: The name of the source database.
                destination_db_name: The name of the destination database.
        retry: Retry configuration
        timeout: Request timeout in seconds
        metadata: Additional metadata

    Returns:
        Operation: Long-running operation for table move; call ``result()``
        to wait for completion.

    Raises:
        google.api_core.exceptions.NotFound: If table or database doesn't exist
        google.api_core.exceptions.AlreadyExists: If table already exists in destination
    """

Usage example:

from google.cloud import metastore

client = metastore.DataprocMetastoreClient()

# Promote customer_data out of staging into the production database.
req = metastore.MoveTableToDatabaseRequest(
    service="projects/my-project/locations/us-central1/services/my-metastore",
    table_name="customer_data",
    db_name="staging",
    destination_db_name="production"
)

op = client.move_table_to_database(request=req)

# Block for up to five minutes while the move runs.
response = op.result(timeout=300)
print(f"Table moved successfully")
print(f"New table location: {response.table_name}")

Alter Metadata Resource Location

Update the storage location of metadata resources for data migration and reorganization scenarios.

def alter_metadata_resource_location(
    self,
    request: Optional[AlterMetadataResourceLocationRequest] = None,
    *,
    service: Optional[str] = None,
    resource_name: Optional[str] = None,
    location_uri: Optional[str] = None,
    retry: OptionalRetry = gapic_v1.method.DEFAULT,
    timeout: Union[float, object] = gapic_v1.method.DEFAULT,
    metadata: Sequence[Tuple[str, str]] = ()
) -> operation.Operation:
    """
    Alter metadata resource location. The metadata resource can be a database, table, or partition.

    NOTE(review): the flattened keyword-only arguments mirror the request
    fields; presumably they are an alternative to passing ``request`` rather
    than usable together with it -- confirm against the client library's
    conventions.

    Args:
        request: The request object
        service: Required. The relative resource name of the service
        resource_name: Required. The relative resource name of the metadata resource
        location_uri: Required. The new location URI for the resource
        retry: Retry configuration
        timeout: Request timeout in seconds
        metadata: Additional metadata

    Returns:
        Operation: Long-running operation for location alteration; call
        ``result()`` to wait for completion.

    Raises:
        google.api_core.exceptions.NotFound: If resource doesn't exist
        google.api_core.exceptions.InvalidArgument: If location URI is invalid
    """

Usage example:

from google.cloud import metastore

client = metastore.DataprocMetastoreClient()

# Point the production.sales_data table at a new Cloud Storage bucket.
req = metastore.AlterMetadataResourceLocationRequest(
    service="projects/my-project/locations/us-central1/services/my-metastore",
    resource_name="production.sales_data",
    location_uri="gs://new-data-bucket/sales-data/"
)

op = client.alter_metadata_resource_location(request=req)

# Allow up to ten minutes for the relocation to finish.
response = op.result(timeout=600)
print(f"Resource location updated")
print(f"New location: {response.location_uri}")

Core Types

Query Request and Response

class QueryMetadataRequest:
    """Request for query_metadata: names the target service and the SQL to run."""
    service: str  # relative resource name of the metastore service
    query: str  # read-only SQL query to execute against the metadata store

class QueryMetadataResponse:
    """Result of a completed metadata query."""
    result_metadata: ResultMetadata  # summary of the executed query
    result_manifest: Optional[ResultManifest]  # where result files live, when any were produced

class ResultMetadata:
    """Summary statistics for an executed query."""
    row_count: int  # number of rows the query produced
    execution_time_ms: int  # wall-clock execution time, milliseconds
    schema: List[ColumnMetadata]  # column descriptions for the result set

class ColumnMetadata:
    """Describes one column of a query result."""
    name: str  # column name
    data_type: str  # column type name
    nullable: bool  # whether the column may contain nulls

class ResultManifest:
    """Pointer to materialized query results."""
    file_uri: str  # URI of the result file
    file_type: str  # format of the result file

Table Movement Types

class MoveTableToDatabaseRequest:
    """Request for move_table_to_database."""
    service: str  # relative resource name of the metastore service
    table_name: str  # table to move
    db_name: str  # source database
    destination_db_name: str  # destination database

class MoveTableToDatabaseResponse:
    """Result of a completed table move."""
    table_name: str  # name of the moved table
    db_name: str  # database holding the table -- presumably the destination; confirm

Resource Location Types

class AlterMetadataResourceLocationRequest:
    """Request for alter_metadata_resource_location."""
    service: str  # relative resource name of the metastore service
    resource_name: str  # metadata resource (database, table, or partition) to relocate
    location_uri: str  # new storage location URI for the resource

class AlterMetadataResourceLocationResponse:
    """Echoes the altered resource and its new location."""
    service: str  # relative resource name of the metastore service
    resource_name: str  # resource that was relocated
    location_uri: str  # the resource's new location URI

Usage Patterns

Metadata Analytics Queries

from google.cloud import metastore
import pandas as pd
from typing import List, Dict

class MetadataAnalytics:
    """Read-only analytics over a Dataproc Metastore service's metadata tables.

    All queries go through ``_run_query``, which submits the SQL via the
    ``query_metadata`` long-running operation and waits for its result.
    """

    def __init__(self, service_name: str):
        self.client = metastore.DataprocMetastoreClient()
        self.service_name = service_name

    def _run_query(self, query: str):
        """Submit *query* via query_metadata and block until it completes.

        Returns:
            The QueryMetadataResponse produced by the finished operation.
        """
        # query_metadata accepts only a request object (there are no flattened
        # service/query kwargs) and returns a long-running operation, so the
        # response must be obtained via result().
        request = metastore.QueryMetadataRequest(
            service=self.service_name,
            query=query,
        )
        operation = self.client.query_metadata(request=request)
        return operation.result()

    def get_database_statistics(self) -> List[Dict]:
        """Get comprehensive statistics for all databases."""
        query = """
        SELECT 
            d.NAME as database_name,
            d.DESC as description,
            COUNT(DISTINCT t.TBL_ID) as table_count,
            COUNT(DISTINCT CASE WHEN t.TBL_TYPE = 'EXTERNAL_TABLE' THEN t.TBL_ID END) as external_tables,
            COUNT(DISTINCT CASE WHEN t.TBL_TYPE = 'MANAGED_TABLE' THEN t.TBL_ID END) as managed_tables,
            MIN(t.CREATE_TIME) as oldest_table,
            MAX(t.CREATE_TIME) as newest_table
        FROM DBS d
        LEFT JOIN TBLS t ON d.DB_ID = t.DB_ID
        GROUP BY d.DB_ID, d.NAME, d.DESC
        ORDER BY table_count DESC
        """

        return self._parse_query_results(self._run_query(query))

    def find_unused_tables(self, days_threshold: int = 90) -> List[str]:
        """Find tables that haven't been accessed recently.

        Args:
            days_threshold: tables not accessed within this many days (or
                never accessed, LAST_ACCESS_TIME = 0) are reported.

        Returns:
            Fully qualified ``database.table`` names, least recently
            accessed first.
        """
        # days_threshold is interpolated into the SQL; it is typed as int, so
        # this is safe as long as callers respect the annotation.
        query = f"""
        SELECT 
            d.NAME as database_name,
            t.TBL_NAME as table_name,
            t.CREATE_TIME as create_time,
            t.LAST_ACCESS_TIME as last_access_time
        FROM TBLS t
        JOIN DBS d ON t.DB_ID = d.DB_ID
        WHERE t.LAST_ACCESS_TIME < UNIX_TIMESTAMP() - ({days_threshold} * 24 * 3600)
           OR t.LAST_ACCESS_TIME = 0
        ORDER BY t.LAST_ACCESS_TIME ASC
        """

        results = self._parse_query_results(self._run_query(query))
        return [f"{row['database_name']}.{row['table_name']}" for row in results]

    def get_storage_usage_by_location(self) -> List[Dict]:
        """Analyze storage usage by location prefix."""
        query = """
        SELECT 
            CASE 
                WHEN s.LOCATION LIKE 'gs://%' THEN 'Google Cloud Storage'
                WHEN s.LOCATION LIKE 's3://%' THEN 'Amazon S3'
                WHEN s.LOCATION LIKE 'hdfs://%' THEN 'HDFS'
                ELSE 'Other'
            END as storage_type,
            REGEXP_EXTRACT(s.LOCATION, '^[^/]+//[^/]+') as storage_root,
            COUNT(DISTINCT t.TBL_ID) as table_count,
            COUNT(DISTINCT d.DB_ID) as database_count
        FROM SDS s
        JOIN TBLS t ON s.SD_ID = t.SD_ID
        JOIN DBS d ON t.DB_ID = d.DB_ID
        WHERE s.LOCATION IS NOT NULL
        GROUP BY storage_type, storage_root
        ORDER BY table_count DESC
        """

        return self._parse_query_results(self._run_query(query))

    def _parse_query_results(self, response: metastore.QueryMetadataResponse) -> List[Dict]:
        """Parse query response into structured data."""
        # In a real implementation, you would parse the actual response format
        # This is a simplified example
        return []

Batch Table Operations

from concurrent.futures import ThreadPoolExecutor, as_completed
import logging

class BatchTableManager:
    """Bulk table-move and storage-migration helpers for a metastore service."""

    def __init__(self, service_name: str):
        self.client = metastore.DataprocMetastoreClient()
        self.service_name = service_name

    def bulk_move_tables(self, table_moves: List[Dict[str, str]], max_workers: int = 5):
        """Move multiple tables in parallel.

        Args:
            table_moves: dicts with 'table_name', 'source_db', 'target_db' keys.
            max_workers: maximum concurrent move submissions.

        Returns:
            The long-running operations that were successfully started.
        """
        operations = []

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all move operations
            future_to_move = {
                executor.submit(
                    self._move_single_table,
                    table_move['table_name'],
                    table_move['source_db'],
                    table_move['target_db']
                ): table_move
                for table_move in table_moves
            }

            # Collect results; a failed submission is logged but does not
            # abort the remaining moves.
            for future in as_completed(future_to_move):
                table_move = future_to_move[future]
                try:
                    operation = future.result()
                    operations.append(operation)
                    logging.info(f"Started move for {table_move['table_name']}")
                except Exception as e:
                    logging.error(f"Failed to start move for {table_move['table_name']}: {e}")

        return operations

    def _move_single_table(self, table_name: str, source_db: str, target_db: str):
        """Move a single table; returns the long-running move operation."""
        move_request = metastore.MoveTableToDatabaseRequest(
            service=self.service_name,
            table_name=table_name,
            db_name=source_db,
            destination_db_name=target_db
        )

        return self.client.move_table_to_database(request=move_request)

    def migrate_storage_locations(self, location_mappings: Dict[str, str]):
        """Migrate tables from old storage locations to new ones.

        For each (old_location, new_location) prefix pair, finds every table
        whose storage location starts with the old prefix and starts an
        alter_metadata_resource_location operation pointing it at the new one.
        """
        for old_location, new_location in location_mappings.items():
            # NOTE(review): old_location is interpolated directly into the
            # LIKE pattern -- only pass trusted prefixes here.
            # s.LOCATION must be selected because the loop below reads
            # result['location'] to build the new URI.
            query = f"""
            SELECT 
                CONCAT(d.NAME, '.', t.TBL_NAME) as full_table_name,
                s.LOCATION as location
            FROM TBLS t
            JOIN DBS d ON t.DB_ID = d.DB_ID
            JOIN SDS s ON t.SD_ID = s.SD_ID
            WHERE s.LOCATION LIKE '{old_location}%'
            """

            # query_metadata accepts only a request object and returns a
            # long-running operation; wait for the response before parsing.
            request = metastore.QueryMetadataRequest(
                service=self.service_name,
                query=query
            )
            response = self.client.query_metadata(request=request).result()

            # Move each table to new location
            for result in self._parse_query_results(response):
                table_name = result['full_table_name']
                new_table_location = result['location'].replace(old_location, new_location)

                alter_request = metastore.AlterMetadataResourceLocationRequest(
                    service=self.service_name,
                    resource_name=table_name,
                    location_uri=new_table_location
                )

                operation = self.client.alter_metadata_resource_location(request=alter_request)
                logging.info(f"Started location migration for {table_name}")

    def _parse_query_results(self, response) -> List[Dict]:
        """Parse a QueryMetadataResponse into a list of row dicts.

        Placeholder implementation: the original class called this method
        without ever defining it, which raised AttributeError at runtime.
        """
        return []

Install with Tessl CLI

npx tessl i tessl/pypi-google-cloud-dataproc-metastore

docs

async-operations.md

backup-restore.md

federation-management.md

index.md

metadata-import-export.md

metadata-query.md

service-management.md

tile.json