Google Cloud Data Catalog API client library for data discovery and metadata management.
—
Long-running operations for bulk entry import and tag reconciliation, designed for large-scale metadata management tasks and synchronization operations.
Import large numbers of entries from external sources using long-running operations that can handle thousands of entries asynchronously.
def import_entries(
    self,
    request: ImportEntriesRequest = None,
    *,
    parent: str = None,
    **kwargs
) -> Operation:
    """
    Import entries from an external source.

    Args:
        request: The request object containing import configuration
        parent: str - Required. Format: projects/{project}/locations/{location}

    Returns:
        Operation: Long-running operation that resolves to ImportEntriesResponse

    Raises:
        google.api_core.exceptions.InvalidArgument: Invalid import configuration
        google.api_core.exceptions.PermissionDenied: Insufficient permissions
"""Reconcile tags across entries to ensure consistency and apply bulk tag updates using configurable reconciliation policies.
def reconcile_tags(
    self,
    request: ReconcileTagsRequest = None,
    *,
    parent: str = None,
    tag_template: str = None,
    force_delete_missing: bool = None,
    **kwargs
) -> Operation:
    """
    Reconcile tags on entries.

    Args:
        request: The request object
        parent: str - Required. Format: projects/{project}/locations/{location}/entryGroups/{entry_group}
        tag_template: str - Required. Tag template to reconcile
        force_delete_missing: bool - Optional. Delete tags not in reconciliation state

    Returns:
        Operation: Long-running operation that resolves to ReconcileTagsResponse

    Raises:
        google.api_core.exceptions.NotFound: Tag template not found
        google.api_core.exceptions.InvalidArgument: Invalid reconciliation request
"""Usage Example:
from google.cloud import datacatalog_v1
from google.api_core import operation
client = datacatalog_v1.DataCatalogClient()
# Start bulk entry import
import_request = datacatalog_v1.ImportEntriesRequest(
    parent="projects/my-project/locations/us-central1",
    gcs_bucket_path="gs://my-bucket/metadata-export/",
    job_id="import-job-001"
)
import_operation = client.import_entries(request=import_request)
print(f"Import operation started: {import_operation.name}")
# Wait for completion (optional)
print("Waiting for import to complete...")
import_result = import_operation.result(timeout=3600) # 1 hour timeout
print(f"Imported {import_result.upserted_entries_count} entries")
print(f"Deleted {import_result.deleted_entries_count} entries")
# Start tag reconciliation
reconcile_request = datacatalog_v1.ReconcileTagsRequest(
    parent="projects/my-project/locations/us-central1/entryGroups/my-group",
    tag_template="projects/my-project/locations/us-central1/tagTemplates/data-quality",
    force_delete_missing=False,
    tags=[
        datacatalog_v1.Tag(
            template="projects/my-project/locations/us-central1/tagTemplates/data-quality",
            fields={
                "quality_score": datacatalog_v1.TagField(double_value=0.85)
            }
        )
    ]
)
reconcile_operation = client.reconcile_tags(request=reconcile_request)
print(f"Reconciliation operation started: {reconcile_operation.name}")
# Check operation status
if not reconcile_operation.done():
    print("Reconciliation in progress...")
else:
    reconcile_result = reconcile_operation.result()
    print(f"Reconciled {reconcile_result.created_tags_count} tags")
    print(f"Updated {reconcile_result.updated_tags_count} tags")
    print(f"Deleted {reconcile_result.deleted_tags_count} tags")

class ImportEntriesRequest:
    parent: str  # Required parent location
    gcs_bucket_path: str  # Optional GCS bucket path for import source
    job_id: str  # Optional job identifier for tracking
    aspect_types: Sequence[str]  # Optional aspect types to import
    import_state: ImportState  # Optional import state configuration

    class ImportState(proto.Enum):
        IMPORT_STATE_UNSPECIFIED = 0
        FULL = 1
        INCREMENTAL = 2
class ReconcileTagsRequest:
    parent: str  # Required parent entry group
    tag_template: str  # Required tag template name
    force_delete_missing: bool  # Optional force delete missing tags
    tags: Sequence[Tag]  # Optional tags to reconcile
    reconciliation_state: ReconciliationState  # Optional reconciliation configuration

    class ReconciliationState(proto.Enum):
        RECONCILIATION_STATE_UNSPECIFIED = 0
        RECONCILIATION_REQUIRED = 1
        RECONCILIATION_DONE = 2

class ImportEntriesResponse:
    upserted_entries_count: int  # Number of entries created or updated
    deleted_entries_count: int  # Number of entries deleted
    job_errors: Sequence[str]  # List of errors encountered during import

class ReconcileTagsResponse:
    created_tags_count: int  # Number of tags created
    updated_tags_count: int  # Number of tags updated
    deleted_tags_count: int  # Number of tags deleted

class ImportEntriesMetadata:
    state: State  # Current operation state
    errors: Sequence[str]  # Errors encountered
    partial_failures: Sequence[str]  # Partial failures

    class State(proto.Enum):
        STATE_UNSPECIFIED = 0
        RUNNING = 1
        SUCCEEDED = 2
        FAILED = 3
        CANCELLED = 4

class ReconcileTagsMetadata:
    state: State  # Current operation state
    errors: Sequence[str]  # Errors encountered

    class State(proto.Enum):
        STATE_UNSPECIFIED = 0
        RUNNING = 1
        SUCCEEDED = 2
        FAILED = 3
        CANCELLED = 4

All bulk operations return Google Cloud long-running operations that can be monitored and managed:
# Check operation status
if operation.done():
    if operation.exception():
        print(f"Operation failed: {operation.exception()}")
    else:
        result = operation.result()
        print("Operation completed successfully")
else:
    print("Operation still running...")

# Get operation metadata
metadata = operation.metadata
print(f"Operation state: {metadata.state}")

# Cancel operation (if supported)
operation.cancel()

Install with the Tessl CLI:
npx tessl i tessl/pypi-google-cloud-datacatalog