# Metadata Query Operations
Execute Hive and Spark SQL queries directly against metastore metadata for advanced analytics and metadata management. This section also covers moving tables between databases, managing resource locations, and other metadata transformations for data lake operations.
## Capabilities
### Query Metadata
Execute SQL queries against the metastore's metadata store for analytics, reporting, and metadata management operations.
```python { .api }
def query_metadata(
    self,
    request: Optional[QueryMetadataRequest] = None,
    *,
    retry: OptionalRetry = gapic_v1.method.DEFAULT,
    timeout: Union[float, object] = gapic_v1.method.DEFAULT,
    metadata: Sequence[Tuple[str, Union[str, bytes]]] = ()
) -> operation.Operation:
    """
    Query Dataproc Metastore (DPMS) metadata with a read-only SQL query.

    Args:
        request: The request object containing the service name and query
        retry: Retry configuration
        timeout: Request timeout in seconds
        metadata: Additional request metadata

    Returns:
        Operation: Long-running operation for query execution

    Raises:
        google.api_core.exceptions.InvalidArgument: If the query is malformed or not read-only
        google.api_core.exceptions.PermissionDenied: If permissions are insufficient
    """
```
Usage example:
```python
from google.cloud import metastore

client = metastore.DataprocMetastoreClient()
service_name = "projects/my-project/locations/us-central1/services/my-metastore"

# Query table metadata
query_request = metastore.QueryMetadataRequest(
    service=service_name,
    query="""
        SELECT
            d.NAME as database_name,
            t.TBL_NAME as table_name,
            t.TBL_TYPE as table_type,
            s.LOCATION as table_location,
            t.CREATE_TIME as create_time
        FROM TBLS t
        JOIN DBS d ON t.DB_ID = d.DB_ID
        JOIN SDS s ON t.SD_ID = s.SD_ID
        WHERE d.NAME = 'production'
        ORDER BY t.CREATE_TIME DESC
        LIMIT 100
    """
)

operation = client.query_metadata(request=query_request)

# Wait for query completion
response = operation.result()
print("Query executed successfully")
print(f"Result metadata: {response.result_metadata}")
if response.result_manifest:
    print(f"Results available at: {response.result_manifest.file_uri}")
```
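The response only references where results were written; fetching them is a separate step. A minimal sketch, assuming the manifest's `file_uri` is a `gs://` URI readable by your credentials and using the separate `google-cloud-storage` package (the helper name `fetch_query_results` is illustrative, not part of this client):

```python
from google.cloud import storage

def fetch_query_results(file_uri: str) -> str:
    """Download the query result file referenced by the result manifest."""
    storage_client = storage.Client()
    # Blob.from_string parses a gs:// URI into bucket and object names.
    blob = storage.Blob.from_string(file_uri, client=storage_client)
    return blob.download_as_text()
```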
### Move Table to Database
Move tables between databases within the same metastore service for data organization and management.
```python { .api }
def move_table_to_database(
    self,
    request: Optional[MoveTableToDatabaseRequest] = None,
    *,
    service: Optional[str] = None,
    table_name: Optional[str] = None,
    db_name: Optional[str] = None,
    destination_db_name: Optional[str] = None,
    retry: OptionalRetry = gapic_v1.method.DEFAULT,
    timeout: Union[float, object] = gapic_v1.method.DEFAULT,
    metadata: Sequence[Tuple[str, Union[str, bytes]]] = ()
) -> operation.Operation:
    """
    Move a table to another database.

    Args:
        request: The request object
        service: Required. The relative resource name of the service
        table_name: Required. The name of the table to move
        db_name: Required. The name of the source database
        destination_db_name: Required. The name of the destination database
        retry: Retry configuration
        timeout: Request timeout in seconds
        metadata: Additional request metadata

    Returns:
        Operation: Long-running operation for the table move

    Raises:
        google.api_core.exceptions.NotFound: If the table or database doesn't exist
        google.api_core.exceptions.AlreadyExists: If the table already exists in the destination database
    """
```
Usage example:
```python
from google.cloud import metastore

client = metastore.DataprocMetastoreClient()

# Move table from staging to production database
move_request = metastore.MoveTableToDatabaseRequest(
    service="projects/my-project/locations/us-central1/services/my-metastore",
    table_name="customer_data",
    db_name="staging",
    destination_db_name="production"
)

operation = client.move_table_to_database(request=move_request)

# Wait for completion
response = operation.result(timeout=300)
print("Table moved successfully")
print(f"Table now in: {response.db_name}.{response.table_name}")
```
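Since the documented failure modes are `NotFound` and `AlreadyExists`, batch callers typically want to catch them rather than abort. A minimal sketch under that assumption (the helper name `move_table_safely` is illustrative, not part of the client):

```python
from google.api_core import exceptions
from google.cloud import metastore

def move_table_safely(client, service: str, table: str,
                      source_db: str, target_db: str) -> bool:
    """Attempt a table move, treating the documented failures as non-fatal."""
    request = metastore.MoveTableToDatabaseRequest(
        service=service,
        table_name=table,
        db_name=source_db,
        destination_db_name=target_db,
    )
    try:
        client.move_table_to_database(request=request).result(timeout=300)
        return True
    except exceptions.NotFound:
        print(f"Skipping {source_db}.{table}: table or database not found")
    except exceptions.AlreadyExists:
        print(f"Skipping {source_db}.{table}: already present in {target_db}")
    return False
```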
### Alter Metadata Resource Location
Update the storage location of metadata resources for data migration and reorganization scenarios.
```python { .api }
def alter_metadata_resource_location(
    self,
    request: Optional[AlterMetadataResourceLocationRequest] = None,
    *,
    service: Optional[str] = None,
    resource_name: Optional[str] = None,
    location_uri: Optional[str] = None,
    retry: OptionalRetry = gapic_v1.method.DEFAULT,
    timeout: Union[float, object] = gapic_v1.method.DEFAULT,
    metadata: Sequence[Tuple[str, str]] = ()
) -> operation.Operation:
    """
    Alter metadata resource location. The metadata resource can be a database,
    table, or partition.

    Args:
        request: The request object
        service: Required. The relative resource name of the service
        resource_name: Required. The relative resource name of the metadata resource
        location_uri: Required. The new location URI for the resource
        retry: Retry configuration
        timeout: Request timeout in seconds
        metadata: Additional metadata

    Returns:
        Operation: Long-running operation for the location alteration

    Raises:
        google.api_core.exceptions.NotFound: If the resource doesn't exist
        google.api_core.exceptions.InvalidArgument: If the location URI is invalid
    """
```
Usage example:
```python
from google.cloud import metastore

client = metastore.DataprocMetastoreClient()

# Point the table's metadata at a new Cloud Storage location
alter_request = metastore.AlterMetadataResourceLocationRequest(
    service="projects/my-project/locations/us-central1/services/my-metastore",
    resource_name="production.sales_data",
    location_uri="gs://new-data-bucket/sales-data/"
)

operation = client.alter_metadata_resource_location(request=alter_request)

# Wait for completion
response = operation.result(timeout=600)
print("Resource location updated")
print(f"New location: {response.location_uri}")
```
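Note that this call updates only the metastore's record of the location; it does not transfer the underlying files. If the data itself must follow, copy it separately. A minimal sketch using the `google-cloud-storage` package (the `copy_prefix` helper and the bucket/prefix arguments are illustrative assumptions, not part of this client):

```python
from google.cloud import storage

def copy_prefix(src_bucket: str, src_prefix: str,
                dst_bucket: str, dst_prefix: str) -> None:
    """Copy every object under src_prefix to the new location."""
    client = storage.Client()
    source = client.bucket(src_bucket)
    destination = client.bucket(dst_bucket)
    for blob in client.list_blobs(src_bucket, prefix=src_prefix):
        # Preserve the path suffix under the new prefix.
        new_name = dst_prefix + blob.name[len(src_prefix):]
        source.copy_blob(blob, destination, new_name)
```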
## Core Types
### Query Request and Response
```python { .api }
class QueryMetadataRequest:
    service: str
    query: str

class QueryMetadataResponse:
    result_metadata: ResultMetadata
    result_manifest: Optional[ResultManifest]

class ResultMetadata:
    row_count: int
    execution_time_ms: int
    schema: List[ColumnMetadata]

class ColumnMetadata:
    name: str
    data_type: str
    nullable: bool

class ResultManifest:
    file_uri: str
    file_type: str
```
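As a small illustration of how these types compose, a hypothetical helper (not part of the library) can flatten the returned schema into a plain mapping for quick validation:

```python
from typing import Dict

def schema_as_dict(result_metadata: "ResultMetadata") -> Dict[str, str]:
    """Map column names to declared data types for quick validation."""
    return {col.name: col.data_type for col in result_metadata.schema}
```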
### Table Movement Types
```python { .api }
class MoveTableToDatabaseRequest:
    service: str
    table_name: str
    db_name: str
    destination_db_name: str

class MoveTableToDatabaseResponse:
    table_name: str
    db_name: str
```
### Resource Location Types
```python { .api }
class AlterMetadataResourceLocationRequest:
    service: str
    resource_name: str
    location_uri: str

class AlterMetadataResourceLocationResponse:
    service: str
    resource_name: str
    location_uri: str
```
## Usage Patterns
### Metadata Analytics Queries
```python
from typing import List, Dict

from google.cloud import metastore

class MetadataAnalytics:
    def __init__(self, service_name: str):
        self.client = metastore.DataprocMetastoreClient()
        self.service_name = service_name

    def get_database_statistics(self) -> List[Dict]:
        """Get comprehensive statistics for all databases."""
        query = """
            SELECT
                d.NAME as database_name,
                d.DESC as description,
                COUNT(DISTINCT t.TBL_ID) as table_count,
                COUNT(DISTINCT CASE WHEN t.TBL_TYPE = 'EXTERNAL_TABLE' THEN t.TBL_ID END) as external_tables,
                COUNT(DISTINCT CASE WHEN t.TBL_TYPE = 'MANAGED_TABLE' THEN t.TBL_ID END) as managed_tables,
                MIN(t.CREATE_TIME) as oldest_table,
                MAX(t.CREATE_TIME) as newest_table
            FROM DBS d
            LEFT JOIN TBLS t ON d.DB_ID = t.DB_ID
            GROUP BY d.DB_ID, d.NAME, d.DESC
            ORDER BY table_count DESC
        """
        response = self._run_query(query)
        return self._parse_query_results(response)

    def find_unused_tables(self, days_threshold: int = 90) -> List[str]:
        """Find tables that haven't been accessed recently."""
        query = f"""
            SELECT
                d.NAME as database_name,
                t.TBL_NAME as table_name,
                t.CREATE_TIME as create_time,
                t.LAST_ACCESS_TIME as last_access_time
            FROM TBLS t
            JOIN DBS d ON t.DB_ID = d.DB_ID
            WHERE t.LAST_ACCESS_TIME < UNIX_TIMESTAMP() - ({days_threshold} * 24 * 3600)
               OR t.LAST_ACCESS_TIME = 0
            ORDER BY t.LAST_ACCESS_TIME ASC
        """
        response = self._run_query(query)
        results = self._parse_query_results(response)
        return [f"{row['database_name']}.{row['table_name']}" for row in results]

    def get_storage_usage_by_location(self) -> List[Dict]:
        """Analyze storage usage by location prefix."""
        query = """
            SELECT
                CASE
                    WHEN s.LOCATION LIKE 'gs://%' THEN 'Google Cloud Storage'
                    WHEN s.LOCATION LIKE 's3://%' THEN 'Amazon S3'
                    WHEN s.LOCATION LIKE 'hdfs://%' THEN 'HDFS'
                    ELSE 'Other'
                END as storage_type,
                REGEXP_EXTRACT(s.LOCATION, '^[^/]+//[^/]+') as storage_root,
                COUNT(DISTINCT t.TBL_ID) as table_count,
                COUNT(DISTINCT d.DB_ID) as database_count
            FROM SDS s
            JOIN TBLS t ON s.SD_ID = t.SD_ID
            JOIN DBS d ON t.DB_ID = d.DB_ID
            WHERE s.LOCATION IS NOT NULL
            GROUP BY storage_type, storage_root
            ORDER BY table_count DESC
        """
        response = self._run_query(query)
        return self._parse_query_results(response)

    def _run_query(self, query: str) -> metastore.QueryMetadataResponse:
        """Submit a query and block until the long-running operation completes."""
        request = metastore.QueryMetadataRequest(
            service=self.service_name,
            query=query
        )
        operation = self.client.query_metadata(request=request)
        return operation.result()

    def _parse_query_results(self, response: metastore.QueryMetadataResponse) -> List[Dict]:
        """Parse query response into structured data."""
        # In a real implementation, you would fetch and parse the result
        # file referenced by the response; this is a simplified stub.
        return []
```
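A brief usage sketch for the class above (the service name is illustrative, and real output depends on how `_parse_query_results` is implemented):

```python
analytics = MetadataAnalytics(
    "projects/my-project/locations/us-central1/services/my-metastore"
)

# Summarize databases by table count
for db in analytics.get_database_statistics():
    print(db["database_name"], db["table_count"])

# List tables untouched for roughly six months
stale = analytics.find_unused_tables(days_threshold=180)
print(f"{len(stale)} tables unused for 180+ days")
```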
### Batch Table Operations
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List
import logging

from google.cloud import metastore

class BatchTableManager:
    def __init__(self, service_name: str):
        self.client = metastore.DataprocMetastoreClient()
        self.service_name = service_name

    def bulk_move_tables(self, table_moves: List[Dict[str, str]], max_workers: int = 5):
        """Move multiple tables in parallel."""
        operations = []

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all move operations
            future_to_move = {
                executor.submit(
                    self._move_single_table,
                    table_move['table_name'],
                    table_move['source_db'],
                    table_move['target_db']
                ): table_move
                for table_move in table_moves
            }

            # Collect results
            for future in as_completed(future_to_move):
                table_move = future_to_move[future]
                try:
                    operation = future.result()
                    operations.append(operation)
                    logging.info(f"Started move for {table_move['table_name']}")
                except Exception as e:
                    logging.error(f"Failed to start move for {table_move['table_name']}: {e}")

        return operations

    def _move_single_table(self, table_name: str, source_db: str, target_db: str):
        """Move a single table."""
        move_request = metastore.MoveTableToDatabaseRequest(
            service=self.service_name,
            table_name=table_name,
            db_name=source_db,
            destination_db_name=target_db
        )
        return self.client.move_table_to_database(request=move_request)

    def migrate_storage_locations(self, location_mappings: Dict[str, str]):
        """Migrate tables from old storage locations to new ones."""
        # First, find all tables in old locations
        for old_location, new_location in location_mappings.items():
            query = f"""
                SELECT
                    CONCAT(d.NAME, '.', t.TBL_NAME) as full_table_name,
                    s.LOCATION as location
                FROM TBLS t
                JOIN DBS d ON t.DB_ID = d.DB_ID
                JOIN SDS s ON t.SD_ID = s.SD_ID
                WHERE s.LOCATION LIKE '{old_location}%'
            """
            request = metastore.QueryMetadataRequest(
                service=self.service_name,
                query=query
            )
            response = self.client.query_metadata(request=request).result()

            # Point each table's metadata at the new location
            for result in self._parse_query_results(response):
                table_name = result['full_table_name']
                new_table_location = result['location'].replace(old_location, new_location)

                alter_request = metastore.AlterMetadataResourceLocationRequest(
                    service=self.service_name,
                    resource_name=table_name,
                    location_uri=new_table_location
                )
                operation = self.client.alter_metadata_resource_location(request=alter_request)
                logging.info(f"Started location migration for {table_name}")

    def _parse_query_results(self, response: metastore.QueryMetadataResponse) -> List[Dict]:
        """Parse query response into structured rows (simplified stub, as above)."""
        return []
```
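A brief usage sketch (the service, database, and table names are illustrative):

```python
manager = BatchTableManager(
    "projects/my-project/locations/us-central1/services/my-metastore"
)

operations = manager.bulk_move_tables([
    {"table_name": "orders", "source_db": "staging", "target_db": "production"},
    {"table_name": "customers", "source_db": "staging", "target_db": "production"},
])

# Block until every move finishes (or raises)
for op in operations:
    op.result(timeout=300)
```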