CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-google-cloud-datacatalog

Google Cloud Datacatalog API client library for data discovery and metadata management

Pending
Overview
Eval results
Files

docs/bulk-operations.md

Bulk Operations

Long-running operations for bulk entry import and tag reconciliation, designed for large-scale metadata management tasks and synchronization operations.

Capabilities

Bulk Entry Import

Import large numbers of entries from external sources using long-running operations that can handle thousands of entries asynchronously.

def import_entries(
    self,
    request: ImportEntriesRequest = None,
    *,
    parent: str = None,
    **kwargs
) -> Operation:
    """
    Start a long-running import of entries from an external source.

    Args:
        request: The request object containing the import configuration
            (parent, source location and optional job id)
        parent: str - Required. Target entry group that receives the imported
            entries. Format:
            projects/{project}/locations/{location}/entryGroups/{entry_group}
        **kwargs: Extra per-call options; not described on this page.
            NOTE(review): presumably the usual retry / timeout / metadata
            call options - confirm against the client reference.

    Returns:
        Operation: Long-running operation that resolves to ImportEntriesResponse

    Raises:
        google.api_core.exceptions.InvalidArgument: Invalid import configuration
        google.api_core.exceptions.PermissionDenied: Insufficient permissions
    """

Tag Reconciliation

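Reconcile the tags attached to an entry against a supplied list of tags for one tag template: tags in the list are created or updated, and tags that are missing from the list are deleted only when `force_delete_missing` is set.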

def reconcile_tags(
    self,
    request: ReconcileTagsRequest = None,
    *,
    parent: str = None,
    tag_template: str = None,
    force_delete_missing: bool = None,
    **kwargs
) -> Operation:
    """
    Reconcile the tags attached to an entry with a supplied tag list.

    Args:
        request: The request object
        parent: str - Required. Name of the entry to be tagged. Format:
            projects/{project}/locations/{location}/entryGroups/{entry_group}/entries/{entry}
        tag_template: str - Required. Tag template used for the reconciliation
        force_delete_missing: bool - Optional. If True, delete the entry's tags
            for this template that are not in the supplied tag list; if False,
            keep them
        **kwargs: Extra per-call options; not described on this page.
            NOTE(review): presumably the usual retry / timeout / metadata
            call options - confirm against the client reference.

    Returns:
        Operation: Long-running operation that resolves to ReconcileTagsResponse

    Raises:
        google.api_core.exceptions.NotFound: Tag template not found
        google.api_core.exceptions.InvalidArgument: Invalid reconciliation request
    """

Usage Example:

from google.cloud import datacatalog_v1
from google.api_core import operation

client = datacatalog_v1.DataCatalogClient()

# Start bulk entry import.
# `parent` is the entry group that receives the imported entries.
import_request = datacatalog_v1.ImportEntriesRequest(
    parent="projects/my-project/locations/us-central1/entryGroups/my-group",
    gcs_bucket_path="gs://my-bucket/metadata-export/",
    job_id="import-job-001",
)

import_operation = client.import_entries(request=import_request)

print(f"Import operation started: {import_operation.name}")

# Wait for completion (optional)
print("Waiting for import to complete...")
import_result = import_operation.result(timeout=3600)  # 1 hour timeout

print(f"Imported {import_result.upserted_entries_count} entries")
print(f"Deleted {import_result.deleted_entries_count} entries")

# Start tag reconciliation.
# `parent` is the entry whose tags are reconciled. Every tag in the list must
# use the same template as the request, so the template name is defined once.
template_name = "projects/my-project/locations/us-central1/tagTemplates/data-quality"

reconcile_request = datacatalog_v1.ReconcileTagsRequest(
    parent="projects/my-project/locations/us-central1/entryGroups/my-group/entries/my-entry",
    tag_template=template_name,
    force_delete_missing=False,
    tags=[
        datacatalog_v1.Tag(
            template=template_name,
            fields={
                "quality_score": datacatalog_v1.TagField(double_value=0.85),
            },
        ),
    ],
)

reconcile_operation = client.reconcile_tags(request=reconcile_request)

print(f"Reconciliation operation started: {reconcile_operation.name}")

# Check operation status without blocking; result() is only read once the
# operation reports that it has finished.
if not reconcile_operation.done():
    print("Reconciliation in progress...")
else:
    reconcile_result = reconcile_operation.result()
    print(f"Reconciled {reconcile_result.created_tags_count} tags")
    print(f"Updated {reconcile_result.updated_tags_count} tags")
    print(f"Deleted {reconcile_result.deleted_tags_count} tags")

Request Types

class ImportEntriesRequest:
    """Request message for ``import_entries``."""

    # Declared before the fields so that the ``import_state`` annotation below
    # refers to a name that already exists when the class body is executed.
    # NOTE(review): this enum and the ``aspect_types`` / ``import_state``
    # fields do not appear in the published ImportEntriesRequest reference
    # (which lists parent, gcs_bucket_path and job_id) - confirm before
    # relying on them.
    class ImportState(proto.Enum):
        IMPORT_STATE_UNSPECIFIED = 0
        FULL = 1
        INCREMENTAL = 2

    parent: str  # Required. Target entry group for the imported entries
    gcs_bucket_path: str  # Optional GCS bucket path for import source
    job_id: str  # Optional job identifier for tracking
    aspect_types: Sequence[str]  # Optional aspect types to import
    import_state: ImportState  # Optional import state configuration

class ReconcileTagsRequest:
    """Request message for ``reconcile_tags``."""

    # Declared before the fields so that the ``reconciliation_state``
    # annotation below refers to a name that already exists when the class
    # body is executed.
    # NOTE(review): this enum and the ``reconciliation_state`` field do not
    # appear in the published ReconcileTagsRequest reference (which lists
    # parent, tag_template, force_delete_missing and tags) - confirm before
    # relying on them.
    class ReconciliationState(proto.Enum):
        RECONCILIATION_STATE_UNSPECIFIED = 0
        RECONCILIATION_REQUIRED = 1
        RECONCILIATION_DONE = 2

    parent: str  # Required. Name of the entry to be tagged
    tag_template: str  # Required tag template name
    force_delete_missing: bool  # Optional. Delete the entry's tags for this template that are not in `tags`
    tags: Sequence[Tag]  # Optional tags to reconcile
    reconciliation_state: ReconciliationState  # Optional reconciliation configuration

Response Types

class ImportEntriesResponse:
    """Result that an ``import_entries`` operation resolves to."""

    # Number of entries created or updated
    upserted_entries_count: int

    # Number of entries deleted
    deleted_entries_count: int

    # List of errors encountered during import
    # NOTE(review): not among the fields of the published
    # ImportEntriesResponse reference - confirm before relying on it.
    job_errors: Sequence[str]

class ReconcileTagsResponse:
    """Result that a ``reconcile_tags`` operation resolves to."""

    # Number of tags created
    created_tags_count: int

    # Number of tags updated
    updated_tags_count: int

    # Number of tags deleted
    deleted_tags_count: int

Metadata Types

class ImportEntriesMetadata:
    """Metadata attached to a running ``import_entries`` operation."""

    # Declared before the fields so that the ``state`` annotation below refers
    # to a name that already exists when the class body is executed.
    # NOTE(review): the published ImportEntriesMetadata reference names this
    # enum ``ImportState`` (IMPORT_QUEUED, IMPORT_IN_PROGRESS, IMPORT_DONE,
    # IMPORT_OBSOLETE), types ``errors`` as google.rpc.Status messages and has
    # no ``partial_failures`` field - confirm against the reference.
    class State(proto.Enum):
        STATE_UNSPECIFIED = 0
        RUNNING = 1
        SUCCEEDED = 2
        FAILED = 3
        CANCELLED = 4

    state: State  # Current operation state
    errors: Sequence[str]  # Errors encountered
    partial_failures: Sequence[str]  # Partial failures

class ReconcileTagsMetadata:
    """Metadata attached to a running ``reconcile_tags`` operation."""

    # Declared before the fields so that the ``state`` annotation below refers
    # to a name that already exists when the class body is executed.
    # NOTE(review): the published ReconcileTagsMetadata reference names this
    # enum ``ReconciliationState`` (RECONCILIATION_QUEUED,
    # RECONCILIATION_IN_PROGRESS, RECONCILIATION_DONE) and types ``errors`` as
    # a mapping from column name to google.rpc.Status - confirm against the
    # reference.
    class State(proto.Enum):
        STATE_UNSPECIFIED = 0
        RUNNING = 1
        SUCCEEDED = 2
        FAILED = 3
        CANCELLED = 4

    state: State  # Current operation state
    errors: Sequence[str]  # Errors encountered

Operation Management

All bulk operations return Google Cloud long-running operations that can be monitored and managed:

# Check operation status
if operation.done():
    # Fetch the error once and reuse it for both the check and the message.
    error = operation.exception()
    if error:
        print(f"Operation failed: {error}")
    else:
        result = operation.result()
        print("Operation completed successfully")
else:
    print("Operation still running...")

# Get operation metadata
# NOTE(review): metadata is presumably None until the service attaches it -
# confirm against the Operation reference; the guard avoids reading `.state`
# from None in that case.
metadata = operation.metadata
if metadata is not None:
    print(f"Operation state: {metadata.state}")

# Cancel operation (if supported)
operation.cancel()

Install with Tessl CLI

npx tessl i tessl/pypi-google-cloud-datacatalog

docs

bulk-operations.md

data-catalog.md

entry-metadata.md

index.md

policy-tags.md

tags.md

taxonomy-serialization.md

tile.json