# Data Catalog Integration

Integration with data catalogs for metadata management, table discovery, and governance. Supports Unity Catalog, Apache Iceberg, AWS Glue, S3 Tables, and custom catalog implementations with comprehensive namespace and table management.

## Capabilities

### Catalog Interface

Abstract catalog interface for connecting to various catalog systems.
```python { .api }
class Catalog:
    """Interface for data catalog implementations."""

    @property
    def name(self) -> str:
        """Returns the catalog's name."""

    def list_namespaces(self, pattern: Optional[str] = None) -> List[Identifier]:
        """
        List namespaces in the catalog.

        Parameters:
        - pattern: Optional pattern to filter namespaces

        Returns:
            List[Identifier]: List of namespace identifiers
        """

    def list_tables(self, pattern: Optional[str] = None) -> List[Identifier]:
        """
        List tables in the catalog.

        Parameters:
        - pattern: Optional pattern to filter tables

        Returns:
            List[Identifier]: List of table identifiers
        """

    def get_table(self, identifier: Union[Identifier, str]) -> Table:
        """
        Get table by identifier.

        Parameters:
        - identifier: Table identifier or name

        Returns:
            Table: Table instance
        """

    def create_table(
        self,
        identifier: Union[Identifier, str],
        source: Union[Schema, DataFrame],
        properties: Optional[Dict[str, Any]] = None
    ) -> Table:
        """
        Create new table in catalog.

        Parameters:
        - identifier: Table identifier or name
        - source: Schema or DataFrame to create table from
        - properties: Additional table properties

        Returns:
            Table: Created table instance
        """

    def drop_table(self, identifier: Union[Identifier, str]) -> None:
        """
        Drop table from catalog.

        Parameters:
        - identifier: Table identifier or name
        """

    def has_table(self, identifier: Union[Identifier, str]) -> bool:
        """
        Check if table exists in catalog.

        Parameters:
        - identifier: Table identifier or name

        Returns:
            bool: True if table exists
        """
```
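
As a minimal sketch of the interface above, the snippet below creates, inspects, and drops a table using an in-memory catalog. The table name and data are illustrative, and it assumes the in-memory catalog supports table creation.

```python
import daft
from daft.catalog import Catalog

# Start from an empty in-memory catalog (illustrative name).
catalog = Catalog.from_pydict({}, name="scratch")

# Create a table from a DataFrame, then check for it and drop it.
df = daft.from_pydict({"id": [1, 2], "value": ["a", "b"]})
catalog.create_table("events", df)

assert catalog.has_table("events")
catalog.get_table("events").read().show()

catalog.drop_table("events")
assert not catalog.has_table("events")
```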

### Catalog Factory Methods

Create catalog instances from various systems.
```python { .api }
class Catalog:
    @staticmethod
    def from_pydict(tables: Dict[Union[Identifier, str], Any], name: str = "default") -> Catalog:
        """
        Create in-memory catalog from dictionary.

        Parameters:
        - tables: Dictionary of table-like objects
        - name: Catalog name

        Returns:
            Catalog: In-memory catalog instance
        """

    @staticmethod
    def from_iceberg(catalog: Any) -> Catalog:
        """
        Create catalog from PyIceberg catalog.

        Parameters:
        - catalog: PyIceberg catalog instance

        Returns:
            Catalog: Daft catalog wrapping Iceberg catalog
        """

    @staticmethod
    def from_unity(catalog: Any) -> Catalog:
        """
        Create catalog from Unity Catalog client.

        Parameters:
        - catalog: Unity Catalog client instance

        Returns:
            Catalog: Daft catalog wrapping Unity catalog
        """

    @staticmethod
    def from_glue(
        name: str,
        client: Optional[Any] = None,
        session: Optional[Any] = None
    ) -> Catalog:
        """
        Create catalog from AWS Glue.

        Parameters:
        - name: Glue database name
        - client: Optional boto3 Glue client
        - session: Optional boto3 session

        Returns:
            Catalog: Daft catalog wrapping Glue catalog
        """

    @staticmethod
    def from_s3tables(
        table_bucket_arn: str,
        client: Optional[Any] = None,
        session: Optional[Any] = None
    ) -> Catalog:
        """
        Create catalog from S3 Tables.

        Parameters:
        - table_bucket_arn: S3 Tables bucket ARN
        - client: Optional boto3 client
        - session: Optional boto3 session

        Returns:
            Catalog: Daft catalog for S3 Tables
        """
```
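
Because `from_pydict` accepts arbitrary table-like values, the small sketch below backs one table with a plain column dictionary and another with an existing DataFrame. The table and catalog names are illustrative, and passing DataFrames as values is an assumption based on the `Any`-typed signature above.

```python
import daft
from daft.catalog import Catalog

# An existing DataFrame can back an in-memory table alongside plain dicts.
metrics_df = daft.from_pydict({"metric": ["latency", "errors"], "value": [12.5, 3.0]})

catalog = Catalog.from_pydict(
    {
        "dimensions": {"region": ["us-west-2", "eu-west-1"], "tier": ["gold", "silver"]},
        "metrics": metrics_df,
    },
    name="observability",
)

print(catalog.list_tables())
```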

### Table Interface

Abstract table interface for catalog tables.
```python { .api }
class Table:
    """Interface for catalog table implementations."""

    @property
    def name(self) -> str:
        """Returns the table's name."""

    def schema(self) -> Schema:
        """
        Returns the table's schema.

        Returns:
            Schema: Table schema definition
        """

    def read(self, **options: Any) -> DataFrame:
        """
        Read table as DataFrame.

        Parameters:
        - options: Additional read options

        Returns:
            DataFrame: Table data as DataFrame
        """

    def write(
        self,
        df: DataFrame,
        mode: Literal["append", "overwrite"] = "append",
        **options: Any
    ) -> None:
        """
        Write DataFrame to table.

        Parameters:
        - df: DataFrame to write
        - mode: Write mode ('append' or 'overwrite')
        - options: Additional write options
        """

    def append(self, df: DataFrame, **options: Any) -> None:
        """
        Append DataFrame to table.

        Parameters:
        - df: DataFrame to append
        - options: Additional options
        """

    def overwrite(self, df: DataFrame, **options: Any) -> None:
        """
        Overwrite table with DataFrame.

        Parameters:
        - df: DataFrame to overwrite with
        - options: Additional options
        """
```
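
A brief sketch of the write paths above; it assumes a `catalog` is already connected and contains an illustrative `inventory.products` table, and that `write(..., mode=...)` dispatches to the same behavior as `append`/`overwrite`.

```python
import daft

# Assumes `catalog` is already connected and has an "inventory.products" table
# (names here are illustrative).
table = catalog.get_table("inventory.products")

new_rows = daft.from_pydict({"product_id": [6], "name": ["Widget F"], "price": [69.99]})

# Append keeps existing rows and adds the new ones.
table.append(new_rows)

# write() with an explicit mode is expected to behave like append/overwrite.
table.write(new_rows, mode="append")

# Overwrite replaces the table's contents with the DataFrame.
table.overwrite(new_rows)
```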

### Identifier System

Hierarchical identifiers for catalog objects.
```python { .api }
class Identifier:
    """Reference to catalog object (namespace.table or catalog.namespace.table)."""

    def __init__(self, *parts: str):
        """
        Create identifier from parts.

        Parameters:
        - parts: Identifier components (namespace, table name, etc.)
        """

    @staticmethod
    def from_str(input: str) -> Identifier:
        """
        Parse identifier from dot-delimited string.

        Parameters:
        - input: Dot-delimited identifier string

        Returns:
            Identifier: Parsed identifier
        """

    @staticmethod
    def from_sql(input: str, normalize: bool = False) -> Identifier:
        """
        Parse identifier from SQL string.

        Parameters:
        - input: SQL identifier string
        - normalize: Whether to normalize case

        Returns:
            Identifier: Parsed SQL identifier
        """

    def drop(self, n: int = 1) -> Identifier:
        """
        Drop first n parts from identifier.

        Parameters:
        - n: Number of parts to drop

        Returns:
            Identifier: New identifier with parts dropped
        """
```
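
A small sketch of the constructors and `drop` shown above; the identifier values are illustrative.

```python
from daft.catalog import Identifier

# Build an identifier from parts, or parse one from a dot-delimited string.
parts_id = Identifier("prod", "analytics", "user_events")
parsed_id = Identifier.from_str("prod.analytics.user_events")

# SQL-style parsing, optionally normalizing case.
sql_id = Identifier.from_sql("PROD.ANALYTICS.USER_EVENTS", normalize=True)

# drop(1) removes the leading part, e.g. catalog.namespace.table -> namespace.table.
table_in_namespace = parsed_id.drop(1)

print(parts_id, parsed_id, sql_id, table_in_namespace)
```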

## Usage Examples

### In-Memory Catalog
```python
import daft
from daft.catalog import Catalog

# Create catalog from Python data
data = {
    "users": {
        "id": [1, 2, 3],
        "name": ["Alice", "Bob", "Charlie"],
        "email": ["alice@example.com", "bob@example.com", "charlie@example.com"]
    },
    "orders": {
        "order_id": [101, 102, 103],
        "user_id": [1, 2, 1],
        "amount": [250.0, 180.0, 320.0]
    }
}

catalog = Catalog.from_pydict(data, name="sales_catalog")

# List available tables
tables = catalog.list_tables()
print(f"Available tables: {tables}")

# Read tables as DataFrames
users_df = catalog.get_table("users").read()
orders_df = catalog.get_table("orders").read()
```

### Iceberg Catalog Integration
```python
# Connect to Iceberg catalog
try:
    from pyiceberg.catalog import load_catalog

    # Load Iceberg catalog configuration
    iceberg_catalog = load_catalog(
        "my_iceberg_catalog",
        uri="http://localhost:8181",
        warehouse="s3://my-warehouse/",
    )

    # Create Daft catalog wrapper
    catalog = Catalog.from_iceberg(iceberg_catalog)

    # List namespaces and tables
    namespaces = catalog.list_namespaces()
    tables = catalog.list_tables()

    # Read Iceberg table
    sales_table = catalog.get_table("sales.transactions")
    sales_df = sales_table.read()

except ImportError:
    print("Install Iceberg support: pip install 'daft[iceberg]'")
```

### Unity Catalog Integration
```python
# Connect to Unity Catalog
try:
    from daft.unity_catalog import UnityCatalog

    # Create Unity Catalog client
    unity_client = UnityCatalog(
        endpoint="https://unity-catalog-server.com",
        token="your-access-token"
    )

    # Create Daft catalog
    catalog = Catalog.from_unity(unity_client)

    # Work with Unity Catalog tables
    table = catalog.get_table("main.sales.customers")
    customers_df = table.read()

    # Write back to Unity Catalog
    processed_df = customers_df.filter(daft.col("active"))
    table.append(processed_df)

except ImportError:
    print("Install Unity support: pip install 'daft[unity]'")
```

### AWS Glue Catalog
```python
# Connect to AWS Glue
try:
    import boto3

    # Create Glue catalog
    catalog = Catalog.from_glue(
        name="my-glue-database",
        session=boto3.Session(region_name="us-west-2")
    )

    # List Glue tables
    tables = catalog.list_tables()

    # Read from Glue table
    glue_table = catalog.get_table("customer_data")
    df = glue_table.read()

except ImportError:
    print("Install AWS support: pip install 'daft[aws]'")
```

### S3 Tables Integration
```python
# Connect to S3 Tables
try:
    catalog = Catalog.from_s3tables(
        table_bucket_arn="arn:aws:s3tables:us-west-2:123456789012:bucket/my-table-bucket"
    )

    # List S3 Tables
    tables = catalog.list_tables()

    # Read S3 Table
    s3_table = catalog.get_table("analytics.user_events")
    events_df = s3_table.read()

except ImportError:
    print("Install AWS support: pip install 'daft[aws]'")
```
413
414
### Multi-Catalog Operations
415
```python
416
# Work with multiple catalogs
417
iceberg_catalog = Catalog.from_iceberg(iceberg_instance)
418
unity_catalog = Catalog.from_unity(unity_instance)
419
420
# Read from different catalogs
421
source_df = iceberg_catalog.get_table("source.raw_data").read()
422
reference_df = unity_catalog.get_table("reference.lookup_table").read()
423
424
# Join data from different catalogs
425
joined_df = source_df.join(
426
reference_df,
427
on=daft.col("key") == daft.col("reference_key")
428
)
429
430
# Write result to another catalog
431
result_table = iceberg_catalog.create_table("processed.joined_data", joined_df)
432
result_table.append(joined_df)
433
```

### Namespace Management
```python
from daft.catalog import Identifier

# Build a hierarchical namespace identifier
namespace_id = Identifier("analytics", "customer_data")

# Check if the namespace exists
if catalog.has_namespace(namespace_id):
    print("Namespace exists")

# Create the namespace if needed
catalog.create_namespace_if_not_exists(namespace_id)

# List tables in the namespace
tables_in_namespace = catalog.list_tables(pattern="analytics.customer_data.*")
```

### Table Management
```python
# Create table from DataFrame
df = daft.from_pydict({
    "product_id": [1, 2, 3],
    "name": ["Widget A", "Widget B", "Widget C"],
    "price": [19.99, 29.99, 39.99]
})

# Create table in catalog
table_id = Identifier("inventory", "products")
products_table = catalog.create_table(table_id, df)

# Check table properties
schema = products_table.schema()
table_name = products_table.name

# Append new rows to the table
new_products = daft.from_pydict({
    "product_id": [4, 5],
    "name": ["Widget D", "Widget E"],
    "price": [49.99, 59.99]
})

products_table.append(new_products)
```

### Advanced Table Operations
```python
# Load source data for a new table
partitioned_df = daft.read_parquet("s3://data/sales/*.parquet")

# Create a table with custom properties
properties = {
    "format-version": "2",
    "write.parquet.compression-codec": "snappy"
}

sales_table = catalog.create_table(
    "sales.transactions",
    partitioned_df,
    properties=properties
)

# Read with column selection and predicate pushdown
# (supported read options depend on the underlying table implementation)
filtered_df = sales_table.read(
    columns=["date", "amount", "customer_id"],
    predicate=daft.col("date") >= "2024-01-01"
)
```

### Catalog Discovery and Metadata
```python
# Discover catalog structure
def explore_catalog(catalog: Catalog):
    print(f"Catalog: {catalog.name}")

    # List all namespaces
    namespaces = catalog.list_namespaces()
    for namespace in namespaces:
        print(f"  Namespace: {namespace}")

        # List tables in the namespace
        tables = catalog.list_tables(pattern=f"{namespace}.*")
        for table_id in tables:
            table = catalog.get_table(table_id)
            schema = table.schema()
            print(f"    Table: {table.name}")
            print(f"      Columns: {schema.column_names()}")

# Explore a connected catalog
explore_catalog(catalog)
```
526
527
### Error Handling
528
```python
529
from daft.catalog import NotFoundError
530
531
try:
532
# Attempt to get non-existent table
533
table = catalog.get_table("non_existent.table")
534
except NotFoundError:
535
print("Table not found")
536
537
# Safe table access
538
if catalog.has_table("sales.customers"):
539
customers = catalog.get_table("sales.customers").read()
540
else:
541
print("Creating customers table...")
542
# Create table logic here
543
```
544
545
### Integration with Session Management
546
```python
547
# Register catalog in session
548
daft.attach_catalog("main_catalog", catalog)
549
550
# Use catalog tables in SQL
551
result = daft.sql("SELECT * FROM main_catalog.sales.customers WHERE active = true")
552
553
# List registered catalogs
554
catalogs = daft.list_catalogs()
555
```
556
557
## Data Catalog Types
558
559
```python { .api }
560
class DataCatalogTable:
561
"""Representation of table in data catalog."""
562
563
class DataCatalogType:
564
"""Enumeration of supported catalog types."""
565
```
566
567
Daft's catalog integration provides a unified interface for working with diverse data catalog systems, enabling metadata-driven data discovery and governance across different platforms and storage systems.