CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-google-cloud-dataproc-metastore

Google Cloud Dataproc Metastore API client library for managing fully managed, highly available metastore services

Pending
Overview
Eval results
Files

docs/federation-management.md

Federation Management

Manage metastore federation services, which provide unified access to multiple backend metastores. Federations support cross-cloud and multi-region scenarios for enterprise data lake architectures, enabling centralized metadata management across diverse metastore environments.

Capabilities

List Federations

Retrieve all federation services in a specified location with optional filtering and pagination support.

def list_federations(
    self,
    request: Optional[ListFederationsRequest] = None,
    *,
    parent: Optional[str] = None,
    retry: OptionalRetry = gapic_v1.method.DEFAULT,
    timeout: Union[float, object] = gapic_v1.method.DEFAULT,
    metadata: Sequence[Tuple[str, str]] = ()
) -> pagers.ListFederationsPager:
    """
    List the federations in a project and location.

    Everything except ``request`` is keyword-only.

    Args:
        request: A ``ListFederationsRequest``. Besides ``parent`` it carries
                 the ``page_size``, ``page_token``, ``filter`` and
                 ``order_by`` fields, which have no flattened keyword here.
        parent: Required. The relative resource name of the location
                Format: projects/{project_id}/locations/{location_id}
        retry: Retry configuration for the request
        timeout: Request timeout in seconds
        metadata: Additional metadata for the request, as (key, value)
                  string pairs

    Returns:
        ListFederationsPager: Pageable list of federations. Iterating the
        pager yields ``Federation`` objects.

    Raises:
        google.api_core.exceptions.GoogleAPICallError: If the request fails
    """

Usage example:

from google.cloud import metastore

# Use the federation-specific client
federation_client = metastore.DataprocMetastoreFederationClient()
parent = "projects/my-project/locations/us-central1"

# List all federations
for federation in federation_client.list_federations(parent=parent):
    print(f"Federation: {federation.name}")
    print(f"Version: {federation.version}")
    print(f"Endpoint URI: {federation.endpoint_uri}")
    print(f"Backend metastores: {len(federation.backend_metastores)}")

# With filtering
request = metastore.ListFederationsRequest(
    parent=parent,
    filter="state=ACTIVE",
    order_by="create_time desc"
)

# filter and order_by have no flattened keyword on list_federations, so the
# populated request object has to be passed through the `request` parameter.
for federation in federation_client.list_federations(request=request):
    print(f"Active federation: {federation.name}")

Get Federation

Retrieve detailed information about a specific federation including backend metastore configurations.

def get_federation(
    self,
    request: Optional[GetFederationRequest] = None,
    *,
    name: Optional[str] = None,
    retry: OptionalRetry = gapic_v1.method.DEFAULT,
    timeout: Union[float, object] = gapic_v1.method.DEFAULT,
    metadata: Sequence[Tuple[str, str]] = ()
) -> Federation:
    """
    Get the details of a single federation.

    Everything except ``request`` is keyword-only.

    Args:
        request: A ``GetFederationRequest``; its only field is ``name``
        name: Required. The relative resource name of the federation
              Format: projects/{project_id}/locations/{location_id}/federations/{federation_id}
        retry: Retry configuration
        timeout: Request timeout in seconds
        metadata: Additional metadata, as (key, value) string pairs

    Returns:
        Federation: The federation resource, including its
        ``backend_metastores`` mapping, ``state`` and ``endpoint_uri``

    Raises:
        google.api_core.exceptions.NotFound: If the federation doesn't exist
    """

Create Federation

Create a new metastore federation with multiple backend metastores for unified metadata access.

def create_federation(
    self,
    request: Optional[CreateFederationRequest] = None,
    *,
    parent: Optional[str] = None,
    federation: Optional[Federation] = None,
    federation_id: Optional[str] = None,
    retry: OptionalRetry = gapic_v1.method.DEFAULT,
    timeout: Union[float, object] = gapic_v1.method.DEFAULT,
    metadata: Sequence[Tuple[str, str]] = ()
) -> operation.Operation:
    """
    Create a metastore federation in a project and location.

    Everything except ``request`` is keyword-only. The call returns a
    long-running operation rather than the finished resource.

    Args:
        request: A ``CreateFederationRequest`` (``parent``, ``federation_id``,
                 ``federation``, ``request_id``)
        parent: Required. The relative resource name of the location
                Format: projects/{project_id}/locations/{location_id}
        federation: Required. The federation configuration
        federation_id: Required. The ID to use for the federation
        retry: Retry configuration
        timeout: Request timeout in seconds
        metadata: Additional metadata, as (key, value) string pairs

    Returns:
        Operation: Long-running operation for federation creation. Calling
        ``result()`` on it waits for completion and yields the created
        ``Federation``.

    Raises:
        google.api_core.exceptions.AlreadyExists: If federation_id already exists
        google.api_core.exceptions.InvalidArgument: If configuration is invalid
    """

Usage example:

from google.cloud import metastore

federation_client = metastore.DataprocMetastoreFederationClient()

# Configure backend metastores.
# Federation.backend_metastores is a mapping keyed by an integer, not a
# sequence, so the backends are supplied as a dict.
# NOTE(review): the integer key is presumably the evaluation rank of the
# backend (lower first) -- confirm against the API reference.
# NOTE(review): BackendMetastore declares only `name` and `metastore_type`
# as fields in this document; confirm the *_config keyword arguments below
# are accepted by the installed library version.
backend_metastores = {
    1: metastore.BackendMetastore(
        name="production-hive",
        metastore_type=metastore.BackendMetastore.MetastoreType.DATA_METASTORE,
        hive_metastore_config=metastore.BackendMetastore.HiveMetastoreConfig(
            endpoint_uri="thrift://prod-metastore.company.com:9083"
        )
    ),
    2: metastore.BackendMetastore(
        name="staging-metastore",
        metastore_type=metastore.BackendMetastore.MetastoreType.DATAPROC_METASTORE,
        dataproc_metastore_config=metastore.BackendMetastore.DataprocMetastoreConfig(
            metastore_service="projects/my-project/locations/us-west1/services/staging-metastore"
        )
    ),
}

# Create federation
federation_config = metastore.Federation(
    version="1.0",
    backend_metastores=backend_metastores,
    labels={
        "environment": "multi-region",
        "purpose": "unified-access"
    }
)

operation = federation_client.create_federation(
    parent="projects/my-project/locations/us-central1",
    federation_id="enterprise-federation",
    federation=federation_config
)

# Wait for completion: result() blocks until the long-running operation
# finishes (or the timeout, in seconds, expires) and returns the Federation.
federation = operation.result(timeout=600)
print(f"Federation created: {federation.name}")
print(f"Endpoint URI: {federation.endpoint_uri}")

Update Federation

Update an existing federation configuration including backend metastore settings.

def update_federation(
    self,
    request: Optional[UpdateFederationRequest] = None,
    *,
    federation: Optional[Federation] = None,
    update_mask: Optional[field_mask_pb2.FieldMask] = None,
    retry: OptionalRetry = gapic_v1.method.DEFAULT,
    timeout: Union[float, object] = gapic_v1.method.DEFAULT,
    metadata: Sequence[Tuple[str, str]] = ()
) -> operation.Operation:
    """
    Update the fields of a federation.

    Everything except ``request`` is keyword-only. There is no separate
    ``name`` argument.
    NOTE(review): the target is presumably identified by ``federation.name``
    -- confirm against the API reference.

    Args:
        request: An ``UpdateFederationRequest`` (``update_mask``,
                 ``federation``, ``request_id``)
        federation: Required. The federation to update
        update_mask: Required. Field mask specifying which fields to update,
                     e.g. ``FieldMask(paths=["backend_metastores"])``
        retry: Retry configuration
        timeout: Request timeout in seconds
        metadata: Additional metadata, as (key, value) string pairs

    Returns:
        Operation: Long-running operation for federation update

    Raises:
        google.api_core.exceptions.NotFound: If the federation doesn't exist
        google.api_core.exceptions.InvalidArgument: If update is invalid
    """

Delete Federation

Delete a metastore federation and remove unified access to backend metastores.

def delete_federation(
    self,
    request: Optional[DeleteFederationRequest] = None,
    *,
    name: Optional[str] = None,
    retry: OptionalRetry = gapic_v1.method.DEFAULT,
    timeout: Union[float, object] = gapic_v1.method.DEFAULT,
    metadata: Sequence[Tuple[str, str]] = ()
) -> operation.Operation:
    """
    Delete a single federation.

    Everything except ``request`` is keyword-only. The call returns a
    long-running operation rather than completing the deletion inline.

    Args:
        request: A ``DeleteFederationRequest`` (``name``, ``request_id``)
        name: Required. The relative resource name of the federation to delete
              Format: projects/{project_id}/locations/{location_id}/federations/{federation_id}
        retry: Retry configuration
        timeout: Request timeout in seconds
        metadata: Additional metadata, as (key, value) string pairs

    Returns:
        Operation: Long-running operation for federation deletion

    Raises:
        google.api_core.exceptions.NotFound: If the federation doesn't exist
        google.api_core.exceptions.FailedPrecondition: If federation cannot be deleted
    """

Core Types

Federation Resource

class Federation:
    """A metastore federation: one endpoint in front of several backend metastores."""
    # Relative resource name:
    # projects/{project_id}/locations/{location_id}/federations/{federation_id}
    name: str
    create_time: timestamp_pb2.Timestamp  # creation timestamp
    update_time: timestamp_pb2.Timestamp  # last-update timestamp
    labels: Dict[str, str]  # string key/value labels attached to the federation
    # NOTE(review): presumably the Hive metastore version served by the
    # federation -- confirm the accepted values against the API reference.
    version: str
    # A mapping, not a list: iterating it yields the int keys, so use
    # .values() / .items() to reach the BackendMetastore objects.
    # NOTE(review): the key is presumably the backend's evaluation rank -- confirm.
    backend_metastores: MutableMapping[int, BackendMetastore]
    endpoint_uri: str  # URI of the federation endpoint
    state: State  # current lifecycle state (see State below)
    state_message: str  # extra detail about the current state, e.g. when state is ERROR
    uid: str  # NOTE(review): presumably a service-generated unique id -- confirm

    class State(enum.Enum):
        # Lifecycle states of a federation.
        STATE_UNSPECIFIED = 0
        CREATING = 1
        ACTIVE = 2
        UPDATING = 3
        DELETING = 4
        ERROR = 5

Backend Metastore Configuration

class BackendMetastore:
    """One backend metastore that a federation exposes through its unified endpoint."""
    name: str
    metastore_type: MetastoreType  # which kind of backend this entry is

    # NOTE(review): verify these member names and numeric values against the
    # published google-cloud-dataproc-metastore reference for the library
    # version in use before relying on them.
    class MetastoreType(enum.Enum):
        METASTORE_TYPE_UNSPECIFIED = 0
        DATAPROC_METASTORE = 1
        BIGQUERY = 2
        DATA_METASTORE = 3  # Third-party Hive metastore

    # NOTE(review): no field of BackendMetastore is declared with any of the
    # config types below, yet the examples in this document pass them as
    # `dataproc_metastore_config`, `bigquery_config` and
    # `hive_metastore_config` keyword arguments -- confirm these exist.

    # Configuration for Dataproc Metastore backend
    class DataprocMetastoreConfig:
        # Resource name of the service, e.g.
        # projects/{project}/locations/{location}/services/{service}
        metastore_service: str

    # Configuration for BigQuery backend
    class BigQueryConfig:
        # BigQuery-specific configuration fields
        pass

    # Configuration for external Hive metastore
    class HiveMetastoreConfig:
        endpoint_uri: str  # e.g. a thrift://host:port URI
        # KerberosConfig is not defined in this document.
        kerberos_config: Optional[KerberosConfig]

Request/Response Types

class ListFederationsRequest:
    """Request message accepted by ``list_federations``."""
    parent: str  # location to list: projects/{project_id}/locations/{location_id}
    page_size: int  # NOTE(review): presumably the maximum number of results per page -- confirm
    page_token: str  # NOTE(review): presumably a next_page_token from an earlier response -- confirm
    filter: str  # filter expression, e.g. "state=ACTIVE"
    order_by: str  # sort expression, e.g. "create_time desc"

class ListFederationsResponse:
    """Response message for a federation listing."""
    federations: List[Federation]  # the federations in this response
    next_page_token: str  # NOTE(review): presumably empty when there are no further pages -- confirm
    unreachable: List[str]  # NOTE(review): presumably locations that could not be reached -- confirm

class GetFederationRequest:
    """Request message accepted by ``get_federation``."""
    # projects/{project_id}/locations/{location_id}/federations/{federation_id}
    name: str

class CreateFederationRequest:
    """Request message accepted by ``create_federation``."""
    parent: str  # location to create in: projects/{project_id}/locations/{location_id}
    federation_id: str  # ID to use for the new federation
    federation: Federation  # the federation configuration to create
    request_id: str  # NOTE(review): presumably an idempotency token for retried requests -- confirm

class UpdateFederationRequest:
    """Request message accepted by ``update_federation``."""
    update_mask: field_mask_pb2.FieldMask  # which Federation fields to update
    # There is no separate name field on this request.
    # NOTE(review): presumably federation.name selects the target -- confirm.
    federation: Federation
    request_id: str  # NOTE(review): presumably an idempotency token for retried requests -- confirm

class DeleteFederationRequest:
    """Request message accepted by ``delete_federation``."""
    name: str  # relative resource name of the federation to delete
    request_id: str  # NOTE(review): presumably an idempotency token for retried requests -- confirm

Usage Patterns

Multi-Cloud Federation Setup

from google.cloud import metastore

def setup_multi_cloud_federation():
    """Create a federation spanning GCP, BigQuery and an on-premises Hive metastore.

    Builds three backend metastore entries, wraps them in a ``Federation``
    configuration, submits the create request and blocks until the
    long-running operation completes.

    Returns:
        The result of the create operation (the created federation).

    Raises:
        Whatever ``create_federation`` or ``operation.result`` raise, for
        example when the request is rejected or the 600-second wait expires.
    """
    federation_client = metastore.DataprocMetastoreFederationClient()

    # Federation.backend_metastores is a mapping keyed by an integer, not a
    # sequence, so the backends are supplied as a dict.
    # NOTE(review): the integer key is presumably the evaluation rank of the
    # backend (lower first) -- confirm against the API reference.
    # NOTE(review): BackendMetastore declares only `name` and
    # `metastore_type` as fields in this document; confirm the *_config
    # keyword arguments are accepted by the installed library version.
    backend_metastores = {
        # Google Cloud Dataproc Metastore
        1: metastore.BackendMetastore(
            name="gcp-production",
            metastore_type=metastore.BackendMetastore.MetastoreType.DATAPROC_METASTORE,
            dataproc_metastore_config=metastore.BackendMetastore.DataprocMetastoreConfig(
                metastore_service="projects/gcp-project/locations/us-central1/services/prod-metastore"
            )
        ),

        # BigQuery as metastore backend
        2: metastore.BackendMetastore(
            name="bigquery-analytics",
            metastore_type=metastore.BackendMetastore.MetastoreType.BIGQUERY,
            bigquery_config=metastore.BackendMetastore.BigQueryConfig()
        ),

        # On-premises Hive metastore
        3: metastore.BackendMetastore(
            name="onprem-hadoop",
            metastore_type=metastore.BackendMetastore.MetastoreType.DATA_METASTORE,
            hive_metastore_config=metastore.BackendMetastore.HiveMetastoreConfig(
                endpoint_uri="thrift://onprem-metastore.company.com:9083",
                kerberos_config=metastore.KerberosConfig(
                    keytab=metastore.Secret(cloud_secret="projects/my-project/secrets/hive-keytab/versions/latest"),
                    principal="hive/metastore@COMPANY.COM",
                    krb5_config_gcs_uri="gs://my-config/krb5.conf"
                )
            )
        ),
    }

    # Create federation
    federation_config = metastore.Federation(
        version="1.0",
        backend_metastores=backend_metastores,
        labels={
            "architecture": "multi-cloud",
            "use_case": "unified_data_lake"
        }
    )

    operation = federation_client.create_federation(
        parent="projects/my-project/locations/us-central1",
        federation_id="enterprise-multi-cloud-federation",
        federation=federation_config
    )

    # Block until the long-running create operation finishes (timeout in seconds).
    return operation.result(timeout=600)

Federation Health Monitoring

import logging
from typing import Dict, List
from google.cloud import metastore

class FederationMonitor:
    """Report federation state and per-backend status for one project location."""

    def __init__(self, project_id: str, location: str):
        """Create a federation client and remember the location to inspect.

        Args:
            project_id: Project that owns the federations.
            location: Location (region) whose federations are inspected.
        """
        self.federation_client = metastore.DataprocMetastoreFederationClient()
        self.parent = f"projects/{project_id}/locations/{location}"

    def check_federation_health(self) -> Dict[str, str]:
        """Check health status of all federations.

        Returns:
            A dict mapping each federation's resource name to ``"HEALTHY"``
            (state ACTIVE), ``"ERROR: <state_message>"`` (state ERROR) or
            ``"TRANSITIONING: <state name>"`` (any other state).
        """
        health_status: Dict[str, str] = {}

        for federation in self.federation_client.list_federations(parent=self.parent):
            if federation.state == metastore.Federation.State.ACTIVE:
                health_status[federation.name] = "HEALTHY"
            elif federation.state == metastore.Federation.State.ERROR:
                health_status[federation.name] = f"ERROR: {federation.state_message}"
                # Lazy %-style arguments: the message is only formatted if
                # the record is actually emitted.
                logging.error(
                    "Federation %s in error state: %s",
                    federation.name,
                    federation.state_message,
                )
            else:
                health_status[federation.name] = f"TRANSITIONING: {federation.state.name}"

        return health_status

    def validate_backend_connectivity(self, federation_name: str) -> List[Dict[str, str]]:
        """Validate connectivity to all backend metastores in a federation.

        The connectivity checks themselves are placeholders: Dataproc
        Metastore and external Hive backends are reported as ``"CONNECTED"``
        without being probed, and every other backend type stays
        ``"UNKNOWN"``.

        Args:
            federation_name: Relative resource name of the federation.

        Returns:
            One dict per backend with the keys ``"name"``, ``"type"`` and
            ``"status"``.
        """
        federation = self.federation_client.get_federation(name=federation_name)
        backend_status = []
        metastore_types = metastore.BackendMetastore.MetastoreType

        # backend_metastores is a mapping keyed by int; iterating it directly
        # would yield the keys, so walk the values to get the backends.
        for backend in federation.backend_metastores.values():
            status = {
                "name": backend.name,
                "type": backend.metastore_type.name,
                "status": "UNKNOWN"
            }

            # In a real implementation, you would test connectivity
            # to each backend metastore based on its type and configuration
            if backend.metastore_type == metastore_types.DATAPROC_METASTORE:
                # Test Dataproc Metastore connectivity
                status["status"] = "CONNECTED"
            elif backend.metastore_type == metastore_types.DATA_METASTORE:
                # Test external Hive metastore connectivity
                status["status"] = "CONNECTED"

            backend_status.append(status)

        return backend_status

Dynamic Backend Management

def add_backend_to_federation(federation_name: str, new_backend: metastore.BackendMetastore):
    """Dynamically add a new backend metastore to an existing federation.

    Fetches the federation, copies its backend mapping, adds ``new_backend``
    under a fresh integer key and submits an update restricted to the
    ``backend_metastores`` field.

    Args:
        federation_name: Relative resource name of the federation to update.
        new_backend: The backend metastore entry to add.

    Returns:
        The result of the update operation, after waiting up to 300 seconds.

    Raises:
        Whatever ``get_federation``, ``update_federation`` or
        ``operation.result`` raise, for example when the federation does not
        exist or the wait times out.
    """
    # Imported locally so this snippet is self-contained; the name is used
    # below but never imported elsewhere in this document.
    from google.protobuf import field_mask_pb2

    federation_client = metastore.DataprocMetastoreFederationClient()

    # Get current federation configuration
    current_federation = federation_client.get_federation(name=federation_name)

    # backend_metastores is a mapping keyed by int, so copy it as a dict
    # (list() would only collect the keys) and add the new backend under the
    # next unused key.
    # NOTE(review): the key is presumably the evaluation rank, which would
    # make the new backend the last one consulted -- confirm.
    updated_backends = dict(current_federation.backend_metastores)
    next_key = max(updated_backends, default=0) + 1
    updated_backends[next_key] = new_backend

    # Update federation
    updated_federation = metastore.Federation(
        name=current_federation.name,
        version=current_federation.version,
        backend_metastores=updated_backends,
        labels=current_federation.labels
    )

    update_request = metastore.UpdateFederationRequest(
        federation=updated_federation,
        # Only the backend map is sent for update; other fields are left as-is.
        update_mask=field_mask_pb2.FieldMask(paths=["backend_metastores"])
    )

    operation = federation_client.update_federation(request=update_request)
    return operation.result(timeout=300)

Install with Tessl CLI

npx tessl i tessl/pypi-google-cloud-dataproc-metastore

docs

async-operations.md

backup-restore.md

federation-management.md

index.md

metadata-import-export.md

metadata-query.md

service-management.md

tile.json