Google Cloud Dataproc Metastore API client library for managing fully managed, highly available metastore services
—
Manage metastore federation services that provide unified access to multiple backend metastores. Supports cross-cloud and multi-region federation scenarios for enterprise data lake architectures, enabling centralized metadata management across diverse metastore environments.
Retrieve all federation services in a specified location with optional filtering and pagination support.
def list_federations(
self,
request: Optional[ListFederationsRequest] = None,
*,
parent: Optional[str] = None,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = ()
) -> pagers.ListFederationsPager:
"""
Lists federations in a project and location.
Args:
request: The request object containing list parameters
parent: Required. The relative resource name of the location
Format: projects/{project_id}/locations/{location_id}
retry: Retry configuration for the request
timeout: Request timeout in seconds
metadata: Additional metadata for the request
Returns:
ListFederationsPager: Pageable list of federations
Raises:
google.api_core.exceptions.GoogleAPICallError: If the request fails
"""Usage example:
from google.cloud import metastore
# Use the federation-specific client
federation_client = metastore.DataprocMetastoreFederationClient()
parent = "projects/my-project/locations/us-central1"
# List all federations
for federation in federation_client.list_federations(parent=parent):
print(f"Federation: {federation.name}")
print(f"Version: {federation.version}")
print(f"Endpoint URI: {federation.endpoint_uri}")
print(f"Backend metastores: {len(federation.backend_metastores)}")
# With filtering
request = metastore.ListFederationsRequest(
parent=parent,
filter="state=ACTIVE",
order_by="create_time desc"
)Retrieve detailed information about a specific federation including backend metastore configurations.
def get_federation(
self,
request: Optional[GetFederationRequest] = None,
*,
name: Optional[str] = None,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = ()
) -> Federation:
"""
Gets the details of a single federation.
Args:
request: The request object
name: Required. The relative resource name of the federation
Format: projects/{project_id}/locations/{location_id}/federations/{federation_id}
retry: Retry configuration
timeout: Request timeout in seconds
metadata: Additional metadata
Returns:
Federation: The federation resource
Raises:
google.api_core.exceptions.NotFound: If the federation doesn't exist
"""Create a new metastore federation with multiple backend metastores for unified metadata access.
def create_federation(
self,
request: Optional[CreateFederationRequest] = None,
*,
parent: Optional[str] = None,
federation: Optional[Federation] = None,
federation_id: Optional[str] = None,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = ()
) -> operation.Operation:
"""
Creates a metastore federation in a project and location.
Args:
request: The request object
parent: Required. The relative resource name of the location
federation: Required. The federation configuration
federation_id: Required. The ID to use for the federation
retry: Retry configuration
timeout: Request timeout in seconds
metadata: Additional metadata
Returns:
Operation: Long-running operation for federation creation
Raises:
google.api_core.exceptions.AlreadyExists: If federation_id already exists
google.api_core.exceptions.InvalidArgument: If configuration is invalid
"""Usage example:
from google.cloud import metastore
federation_client = metastore.DataprocMetastoreFederationClient()
# Configure backend metastores
backend_metastores = [
metastore.BackendMetastore(
name="production-hive",
metastore_type=metastore.BackendMetastore.MetastoreType.DATA_METASTORE,
hive_metastore_config=metastore.BackendMetastore.HiveMetastoreConfig(
endpoint_uri="thrift://prod-metastore.company.com:9083"
)
),
metastore.BackendMetastore(
name="staging-metastore",
metastore_type=metastore.BackendMetastore.MetastoreType.DATAPROC_METASTORE,
dataproc_metastore_config=metastore.BackendMetastore.DataprocMetastoreConfig(
metastore_service="projects/my-project/locations/us-west1/services/staging-metastore"
)
)
]
# Create federation
federation_config = metastore.Federation(
version="1.0",
backend_metastores=backend_metastores,
labels={
"environment": "multi-region",
"purpose": "unified-access"
}
)
operation = federation_client.create_federation(
parent="projects/my-project/locations/us-central1",
federation_id="enterprise-federation",
federation=federation_config
)
# Wait for completion
federation = operation.result(timeout=600)
print(f"Federation created: {federation.name}")
print(f"Endpoint URI: {federation.endpoint_uri}")Update an existing federation configuration including backend metastore settings.
def update_federation(
self,
request: Optional[UpdateFederationRequest] = None,
*,
federation: Optional[Federation] = None,
update_mask: Optional[field_mask_pb2.FieldMask] = None,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = ()
) -> operation.Operation:
"""
Updates the fields of a federation.
Args:
request: The request object
federation: Required. The federation to update
update_mask: Required. Field mask specifying which fields to update
retry: Retry configuration
timeout: Request timeout in seconds
metadata: Additional metadata
Returns:
Operation: Long-running operation for federation update
Raises:
google.api_core.exceptions.NotFound: If the federation doesn't exist
google.api_core.exceptions.InvalidArgument: If update is invalid
"""Delete a metastore federation and remove unified access to backend metastores.
def delete_federation(
    self,
    request: Optional[DeleteFederationRequest] = None,
    *,
    name: Optional[str] = None,
    retry: OptionalRetry = gapic_v1.method.DEFAULT,
    timeout: Union[float, object] = gapic_v1.method.DEFAULT,
    metadata: Sequence[Tuple[str, str]] = ()
) -> operation.Operation:
    """
    Deletes a single federation.

    Args:
        request: The request object.
        name: Required. The relative resource name of the federation to delete.
        retry: Retry configuration.
        timeout: Request timeout in seconds.
        metadata: Additional metadata.

    Returns:
        Operation: Long-running operation for federation deletion.

    Raises:
        google.api_core.exceptions.NotFound: If the federation doesn't exist.
        google.api_core.exceptions.FailedPrecondition: If federation cannot be deleted.
    """


class Federation:
    """Federation resource as returned by the federation client methods."""

    # Declared before the fields so the ``state`` annotation can resolve it.
    class State(enum.Enum):
        STATE_UNSPECIFIED = 0
        CREATING = 1
        ACTIVE = 2
        UPDATING = 3
        DELETING = 4
        ERROR = 5

    name: str
    create_time: timestamp_pb2.Timestamp
    update_time: timestamp_pb2.Timestamp
    labels: Dict[str, str]
    version: str
    # Mapping with integer keys, not a list.
    # NOTE(review): keys presumably express backend rank/priority - confirm
    # against the API reference.
    backend_metastores: MutableMapping[int, "BackendMetastore"]
    endpoint_uri: str
    state: State
    state_message: str
    uid: str


class BackendMetastore:
    """A single backend metastore referenced by a federation."""

    # Declared before the fields so the ``metastore_type`` annotation can
    # resolve it.
    class MetastoreType(enum.Enum):
        METASTORE_TYPE_UNSPECIFIED = 0
        DATAPROC_METASTORE = 1
        BIGQUERY = 2
        DATA_METASTORE = 3  # Third-party Hive metastore

    name: str
    metastore_type: MetastoreType

    # NOTE(review): the usage examples pass dataproc_metastore_config,
    # bigquery_config and hive_metastore_config keyword arguments, but no
    # such fields are declared on this class - confirm against the
    # published API before relying on them.

    # Configuration for Dataproc Metastore backend
    class DataprocMetastoreConfig:
        metastore_service: str

    # Configuration for BigQuery backend
    class BigQueryConfig:
        # BigQuery-specific configuration fields
        pass

    # Configuration for external Hive metastore
    class HiveMetastoreConfig:
        endpoint_uri: str
        kerberos_config: Optional[KerberosConfig]


class ListFederationsRequest:
    """Request object accepted by ``list_federations``."""

    parent: str
    page_size: int
    page_token: str
    filter: str
    order_by: str


class ListFederationsResponse:
    """One page of federations plus the token for the next page."""

    federations: List[Federation]
    next_page_token: str
    unreachable: List[str]


class GetFederationRequest:
    """Request object accepted by ``get_federation``."""

    name: str


class CreateFederationRequest:
    """Request object accepted by ``create_federation``."""

    parent: str
    federation_id: str
    federation: Federation
    request_id: str


class UpdateFederationRequest:
    """Request object accepted by ``update_federation``."""

    update_mask: field_mask_pb2.FieldMask
    federation: Federation
    request_id: str


class DeleteFederationRequest:
    """Request object accepted by ``delete_federation``."""

    name: str
    request_id: str


from google.cloud import metastore
def setup_multi_cloud_federation():
    """Set up a federation across multiple cloud providers and on-premises.

    Builds three backend metastores (Dataproc Metastore, BigQuery and an
    on-premises Hive metastore with Kerberos), submits a create request and
    blocks until the long-running operation finishes.

    Returns:
        The result of the create operation, waited on for up to 600 seconds.
    """
    federation_client = metastore.DataprocMetastoreFederationClient()

    # Define backend metastores from different sources.
    # Federation.backend_metastores is declared as
    # MutableMapping[int, BackendMetastore], so the backends are passed as a
    # mapping with integer keys rather than as a list.
    # NOTE(review): the keys presumably express evaluation rank/priority -
    # confirm against the API reference.
    backend_metastores = {
        # Google Cloud Dataproc Metastore
        1: metastore.BackendMetastore(
            name="gcp-production",
            metastore_type=metastore.BackendMetastore.MetastoreType.DATAPROC_METASTORE,
            dataproc_metastore_config=metastore.BackendMetastore.DataprocMetastoreConfig(
                metastore_service="projects/gcp-project/locations/us-central1/services/prod-metastore"
            )
        ),
        # BigQuery as metastore backend
        2: metastore.BackendMetastore(
            name="bigquery-analytics",
            metastore_type=metastore.BackendMetastore.MetastoreType.BIGQUERY,
            bigquery_config=metastore.BackendMetastore.BigQueryConfig()
        ),
        # On-premises Hive metastore
        3: metastore.BackendMetastore(
            name="onprem-hadoop",
            metastore_type=metastore.BackendMetastore.MetastoreType.DATA_METASTORE,
            hive_metastore_config=metastore.BackendMetastore.HiveMetastoreConfig(
                endpoint_uri="thrift://onprem-metastore.company.com:9083",
                kerberos_config=metastore.KerberosConfig(
                    keytab=metastore.Secret(cloud_secret="projects/my-project/secrets/hive-keytab/versions/latest"),
                    principal="hive/metastore@COMPANY.COM",
                    krb5_config_gcs_uri="gs://my-config/krb5.conf"
                )
            )
        ),
    }

    # Create federation
    federation_config = metastore.Federation(
        version="1.0",
        backend_metastores=backend_metastores,
        labels={
            "architecture": "multi-cloud",
            "use_case": "unified_data_lake"
        }
    )
    operation = federation_client.create_federation(
        parent="projects/my-project/locations/us-central1",
        federation_id="enterprise-multi-cloud-federation",
        federation=federation_config
    )
    return operation.result(timeout=600)


import logging
from typing import Dict, List
from google.cloud import metastore
class FederationMonitor:
def __init__(self, project_id: str, location: str):
self.federation_client = metastore.DataprocMetastoreFederationClient()
self.parent = f"projects/{project_id}/locations/{location}"
def check_federation_health(self) -> Dict[str, str]:
"""Check health status of all federations."""
health_status = {}
for federation in self.federation_client.list_federations(parent=self.parent):
if federation.state == metastore.Federation.State.ACTIVE:
health_status[federation.name] = "HEALTHY"
elif federation.state == metastore.Federation.State.ERROR:
health_status[federation.name] = f"ERROR: {federation.state_message}"
logging.error(f"Federation {federation.name} in error state: {federation.state_message}")
else:
health_status[federation.name] = f"TRANSITIONING: {federation.state.name}"
return health_status
def validate_backend_connectivity(self, federation_name: str) -> List[Dict[str, str]]:
"""Validate connectivity to all backend metastores in a federation."""
federation = self.federation_client.get_federation(name=federation_name)
backend_status = []
for backend in federation.backend_metastores:
status = {
"name": backend.name,
"type": backend.metastore_type.name,
"status": "UNKNOWN"
}
# In a real implementation, you would test connectivity
# to each backend metastore based on its type and configuration
if backend.metastore_type == metastore.BackendMetastore.MetastoreType.DATAPROC_METASTORE:
# Test Dataproc Metastore connectivity
status["status"] = "CONNECTED"
elif backend.metastore_type == metastore.BackendMetastore.MetastoreType.DATA_METASTORE:
# Test external Hive metastore connectivity
status["status"] = "CONNECTED"
backend_status.append(status)
return backend_statusdef add_backend_to_federation(federation_name: str, new_backend: metastore.BackendMetastore):
"""Dynamically add a new backend metastore to an existing federation."""
federation_client = metastore.DataprocMetastoreFederationClient()
# Get current federation configuration
current_federation = federation_client.get_federation(name=federation_name)
# Add new backend to existing list
updated_backends = list(current_federation.backend_metastores)
updated_backends.append(new_backend)
# Update federation
updated_federation = metastore.Federation(
name=current_federation.name,
version=current_federation.version,
backend_metastores=updated_backends,
labels=current_federation.labels
)
update_request = metastore.UpdateFederationRequest(
federation=updated_federation,
update_mask=field_mask_pb2.FieldMask(paths=["backend_metastores"])
)
operation = federation_client.update_federation(request=update_request)
return operation.result(timeout=300)Install with Tessl CLI
npx tessl i tessl/pypi-google-cloud-dataproc-metastore