0
# Bulk Operations
1
2
Long-running operations for bulk entry import and tag reconciliation, designed for large-scale metadata management tasks and synchronization operations.
3
4
## Capabilities
5
6
### Bulk Entry Import
7
8
Import large numbers of entries from external sources using long-running operations that can handle thousands of entries asynchronously.
9
10
```python { .api }
11
def import_entries(
12
self,
13
request: ImportEntriesRequest = None,
14
*,
15
parent: str = None,
16
**kwargs
17
) -> Operation:
18
"""
19
Import entries from an external source.
20
21
Args:
22
request: The request object containing import configuration
23
parent: str - Required. Format: projects/{project}/locations/{location}
24
25
Returns:
26
Operation: Long-running operation that resolves to ImportEntriesResponse
27
28
Raises:
29
google.api_core.exceptions.InvalidArgument: Invalid import configuration
30
google.api_core.exceptions.PermissionDenied: Insufficient permissions
31
"""
32
```
33
34
### Tag Reconciliation
35
36
Reconcile tags across entries to ensure consistency and apply bulk tag updates using configurable reconciliation policies.
37
38
```python { .api }
39
def reconcile_tags(
40
self,
41
request: ReconcileTagsRequest = None,
42
*,
43
parent: str = None,
44
tag_template: str = None,
45
force_delete_missing: bool = None,
46
**kwargs
47
) -> Operation:
48
"""
49
Reconcile tags on entries.
50
51
Args:
52
request: The request object
53
parent: str - Required. Format: projects/{project}/locations/{location}/entryGroups/{entry_group}
54
tag_template: str - Required. Tag template to reconcile
55
force_delete_missing: bool - Optional. Delete tags not in reconciliation state
56
57
Returns:
58
Operation: Long-running operation that resolves to ReconcileTagsResponse
59
60
Raises:
61
google.api_core.exceptions.NotFound: Tag template not found
62
google.api_core.exceptions.InvalidArgument: Invalid reconciliation request
63
"""
64
```
65
66
**Usage Example:**
67
68
```python
69
from google.cloud import datacatalog_v1
70
from google.api_core import operation
71
72
client = datacatalog_v1.DataCatalogClient()
73
74
# Start bulk entry import
75
import_request = datacatalog_v1.ImportEntriesRequest(
76
parent="projects/my-project/locations/us-central1",
77
gcs_bucket_path="gs://my-bucket/metadata-export/",
78
job_id="import-job-001"
79
)
80
81
import_operation = client.import_entries(request=import_request)
82
83
print(f"Import operation started: {import_operation.name}")
84
85
# Wait for completion (optional)
86
print("Waiting for import to complete...")
87
import_result = import_operation.result(timeout=3600) # 1 hour timeout
88
89
print(f"Imported {import_result.upserted_entries_count} entries")
90
print(f"Deleted {import_result.deleted_entries_count} entries")
91
92
# Start tag reconciliation
93
reconcile_request = datacatalog_v1.ReconcileTagsRequest(
94
parent="projects/my-project/locations/us-central1/entryGroups/my-group",
95
tag_template="projects/my-project/locations/us-central1/tagTemplates/data-quality",
96
force_delete_missing=False,
97
tags=[
98
datacatalog_v1.Tag(
99
template="projects/my-project/locations/us-central1/tagTemplates/data-quality",
100
fields={
101
"quality_score": datacatalog_v1.TagField(double_value=0.85)
102
}
103
)
104
]
105
)
106
107
reconcile_operation = client.reconcile_tags(request=reconcile_request)
108
109
print(f"Reconciliation operation started: {reconcile_operation.name}")
110
111
# Check operation status
112
if not reconcile_operation.done():
113
print("Reconciliation in progress...")
114
else:
115
reconcile_result = reconcile_operation.result()
116
print(f"Reconciled {reconcile_result.created_tags_count} tags")
117
print(f"Updated {reconcile_result.updated_tags_count} tags")
118
print(f"Deleted {reconcile_result.deleted_tags_count} tags")
119
```
120
121
## Request Types
122
123
```python { .api }
124
class ImportEntriesRequest:
125
parent: str # Required parent location
126
gcs_bucket_path: str # Optional GCS bucket path for import source
127
job_id: str # Optional job identifier for tracking
128
aspect_types: Sequence[str] # Optional aspect types to import
129
import_state: ImportState # Optional import state configuration
130
131
class ImportState(proto.Enum):
132
IMPORT_STATE_UNSPECIFIED = 0
133
FULL = 1
134
INCREMENTAL = 2
135
136
class ReconcileTagsRequest:
137
parent: str # Required parent entry group
138
tag_template: str # Required tag template name
139
force_delete_missing: bool # Optional force delete missing tags
140
tags: Sequence[Tag] # Optional tags to reconcile
141
reconciliation_state: ReconciliationState # Optional reconciliation configuration
142
143
class ReconciliationState(proto.Enum):
144
RECONCILIATION_STATE_UNSPECIFIED = 0
145
RECONCILIATION_REQUIRED = 1
146
RECONCILIATION_DONE = 2
147
```
148
149
## Response Types
150
151
```python { .api }
152
class ImportEntriesResponse:
153
upserted_entries_count: int # Number of entries created or updated
154
deleted_entries_count: int # Number of entries deleted
155
job_errors: Sequence[str] # List of errors encountered during import
156
157
class ReconcileTagsResponse:
158
created_tags_count: int # Number of tags created
159
updated_tags_count: int # Number of tags updated
160
deleted_tags_count: int # Number of tags deleted
161
```
162
163
## Metadata Types
164
165
```python { .api }
166
class ImportEntriesMetadata:
167
state: State # Current operation state
168
errors: Sequence[str] # Errors encountered
169
partial_failures: Sequence[str] # Partial failures
170
171
class State(proto.Enum):
172
STATE_UNSPECIFIED = 0
173
RUNNING = 1
174
SUCCEEDED = 2
175
FAILED = 3
176
CANCELLED = 4
177
178
class ReconcileTagsMetadata:
179
state: State # Current operation state
180
errors: Sequence[str] # Errors encountered
181
182
class State(proto.Enum):
183
STATE_UNSPECIFIED = 0
184
RUNNING = 1
185
SUCCEEDED = 2
186
FAILED = 3
187
CANCELLED = 4
188
```
189
190
## Operation Management
191
192
All bulk operations return Google Cloud long-running operations that can be monitored and managed:
193
194
```python
195
# Check operation status
196
if operation.done():
197
if operation.exception():
198
print(f"Operation failed: {operation.exception()}")
199
else:
200
result = operation.result()
201
print(f"Operation completed successfully")
202
else:
203
print("Operation still running...")
204
205
# Get operation metadata
206
metadata = operation.metadata
207
print(f"Operation state: {metadata.state}")
208
209
# Cancel operation (if supported)
210
operation.cancel()
211
```