
# Data Catalog Integration

Integration with data catalogs for metadata management, table discovery, and governance. Supported backends include Unity Catalog, Apache Iceberg, AWS Glue, S3 Tables, and custom catalog implementations, each with namespace and table management.

## Capabilities

### Catalog Interface

Abstract catalog interface for connecting to various catalog systems.

```python { .api }
class Catalog:
    """Interface for data catalog implementations."""

    @property
    def name(self) -> str:
        """Returns the catalog's name."""

    def list_namespaces(self, pattern: Optional[str] = None) -> List[Identifier]:
        """
        List namespaces in the catalog.

        Parameters:
        - pattern: Optional pattern to filter namespaces

        Returns:
        List[Identifier]: List of namespace identifiers
        """

    def list_tables(self, pattern: Optional[str] = None) -> List[Identifier]:
        """
        List tables in the catalog.

        Parameters:
        - pattern: Optional pattern to filter tables

        Returns:
        List[Identifier]: List of table identifiers
        """

    def get_table(self, identifier: Union[Identifier, str]) -> Table:
        """
        Get table by identifier.

        Parameters:
        - identifier: Table identifier or name

        Returns:
        Table: Table instance
        """

    def create_table(
        self,
        identifier: Union[Identifier, str],
        source: Union[Schema, DataFrame],
        properties: Optional[Dict[str, Any]] = None
    ) -> Table:
        """
        Create new table in catalog.

        Parameters:
        - identifier: Table identifier or name
        - source: Schema or DataFrame to create table from
        - properties: Additional table properties

        Returns:
        Table: Created table instance
        """

    def drop_table(self, identifier: Union[Identifier, str]) -> None:
        """
        Drop table from catalog.

        Parameters:
        - identifier: Table identifier or name
        """

    def has_table(self, identifier: Union[Identifier, str]) -> bool:
        """
        Check if table exists in catalog.

        Parameters:
        - identifier: Table identifier or name

        Returns:
        bool: True if table exists
        """
```
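
Every factory method in the next section returns an object exposing this same interface, so lookup and creation logic can be written once and reused across backends. A minimal sketch under that assumption (the `read_or_create` helper is illustrative, not part of Daft's API):

```python
import daft
from daft import DataFrame
from daft.catalog import Catalog


def read_or_create(catalog: Catalog, identifier: str, source: DataFrame) -> DataFrame:
    """Read `identifier` if it exists; otherwise create it from `source` first."""
    if not catalog.has_table(identifier):
        # create_table accepts either a Schema or a DataFrame as the source
        catalog.create_table(identifier, source)
    return catalog.get_table(identifier).read()


# Demo against an in-memory catalog; the same helper works for any Catalog
catalog = Catalog.from_pydict({"events": {"id": [1, 2, 3]}}, name="scratch")
df = read_or_create(catalog, "events", daft.from_pydict({"id": [1, 2, 3]}))
```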

### Catalog Factory Methods

Create catalog instances from various systems.

```python { .api }
class Catalog:
    @staticmethod
    def from_pydict(tables: Dict[Union[Identifier, str], Any], name: str = "default") -> Catalog:
        """
        Create in-memory catalog from dictionary.

        Parameters:
        - tables: Dictionary of table-like objects
        - name: Catalog name

        Returns:
        Catalog: In-memory catalog instance
        """

    @staticmethod
    def from_iceberg(catalog: Any) -> Catalog:
        """
        Create catalog from PyIceberg catalog.

        Parameters:
        - catalog: PyIceberg catalog instance

        Returns:
        Catalog: Daft catalog wrapping Iceberg catalog
        """

    @staticmethod
    def from_unity(catalog: Any) -> Catalog:
        """
        Create catalog from Unity Catalog client.

        Parameters:
        - catalog: Unity Catalog client instance

        Returns:
        Catalog: Daft catalog wrapping Unity catalog
        """

    @staticmethod
    def from_glue(
        name: str,
        client: Optional[Any] = None,
        session: Optional[Any] = None
    ) -> Catalog:
        """
        Create catalog from AWS Glue.

        Parameters:
        - name: Glue database name
        - client: Optional boto3 Glue client
        - session: Optional boto3 session

        Returns:
        Catalog: Daft catalog wrapping Glue catalog
        """

    @staticmethod
    def from_s3tables(
        table_bucket_arn: str,
        client: Optional[Any] = None,
        session: Optional[Any] = None
    ) -> Catalog:
        """
        Create catalog from S3 Tables.

        Parameters:
        - table_bucket_arn: S3 Tables bucket ARN
        - client: Optional boto3 client
        - session: Optional boto3 session

        Returns:
        Catalog: Daft catalog for S3 Tables
        """
```

### Table Interface

Abstract table interface for catalog tables.

```python { .api }
class Table:
    """Interface for catalog table implementations."""

    @property
    def name(self) -> str:
        """Returns the table's name."""

    def schema(self) -> Schema:
        """
        Returns the table's schema.

        Returns:
        Schema: Table schema definition
        """

    def read(self, **options: Any) -> DataFrame:
        """
        Read table as DataFrame.

        Parameters:
        - options: Additional read options

        Returns:
        DataFrame: Table data as DataFrame
        """

    def write(
        self,
        df: DataFrame,
        mode: Literal["append", "overwrite"] = "append",
        **options: Any
    ) -> None:
        """
        Write DataFrame to table.

        Parameters:
        - df: DataFrame to write
        - mode: Write mode ('append' or 'overwrite')
        - options: Additional write options
        """

    def append(self, df: DataFrame, **options: Any) -> None:
        """
        Append DataFrame to table.

        Parameters:
        - df: DataFrame to append
        - options: Additional options
        """

    def overwrite(self, df: DataFrame, **options: Any) -> None:
        """
        Overwrite table with DataFrame.

        Parameters:
        - df: DataFrame to overwrite with
        - options: Additional options
        """
```
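
Per this interface, `write` defaults to append semantics, `write(df, mode="overwrite")` replaces the table's contents, and `append`/`overwrite` are the corresponding convenience methods. A short, hedged sketch (it assumes `catalog` is an already-connected, writable catalog such as Iceberg or Glue, and that an `analytics.metrics` table exists):

```python
import daft

# `catalog` is assumed to be a writable Catalog (e.g. from_iceberg or from_glue)
table = catalog.get_table("analytics.metrics")

new_rows = daft.from_pydict({"day": ["2024-01-02"], "value": [2.0]})

table.append(new_rows)                   # add rows, keeping existing data
table.write(new_rows)                    # default mode is "append"
table.overwrite(new_rows)                # replace the table's contents
table.write(new_rows, mode="overwrite")  # equivalent to overwrite()

df = table.read()  # read back as a DataFrame
```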

### Identifier System

Hierarchical identifiers for catalog objects.

```python { .api }
class Identifier:
    """Reference to catalog object (namespace.table or catalog.namespace.table)."""

    def __init__(self, *parts: str):
        """
        Create identifier from parts.

        Parameters:
        - parts: Identifier components (namespace, table name, etc.)
        """

    @staticmethod
    def from_str(input: str) -> Identifier:
        """
        Parse identifier from dot-delimited string.

        Parameters:
        - input: Dot-delimited identifier string

        Returns:
        Identifier: Parsed identifier
        """

    @staticmethod
    def from_sql(input: str, normalize: bool = False) -> Identifier:
        """
        Parse identifier from SQL string.

        Parameters:
        - input: SQL identifier string
        - normalize: Whether to normalize case

        Returns:
        Identifier: Parsed SQL identifier
        """

    def drop(self, n: int = 1) -> Identifier:
        """
        Drop first n parts from identifier.

        Parameters:
        - n: Number of parts to drop

        Returns:
        Identifier: New identifier with parts dropped
        """
```
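
Identifiers can be built from parts or parsed from strings, and `drop` peels leading parts off a fully qualified name. A small sketch using only the methods above:

```python
from daft.catalog import Identifier

# Build an identifier from parts
table_id = Identifier("analytics", "customer_data", "orders")

# Parse the same identifier from a dot-delimited string
same_id = Identifier.from_str("analytics.customer_data.orders")

# Parse a quoted SQL identifier, optionally normalizing case
sql_id = Identifier.from_sql('"Analytics"."Orders"', normalize=True)

# Drop the leading part: analytics.customer_data.orders -> customer_data.orders
unqualified = table_id.drop(1)
```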

## Usage Examples

### In-Memory Catalog

```python
import daft
from daft.catalog import Catalog

# Create catalog from Python data
data = {
    "users": {
        "id": [1, 2, 3],
        "name": ["Alice", "Bob", "Charlie"],
        "email": ["alice@example.com", "bob@example.com", "charlie@example.com"]
    },
    "orders": {
        "order_id": [101, 102, 103],
        "user_id": [1, 2, 1],
        "amount": [250.0, 180.0, 320.0]
    }
}

catalog = Catalog.from_pydict(data, name="sales_catalog")

# List available tables
tables = catalog.list_tables()
print(f"Available tables: {tables}")

# Read table as DataFrame
users_df = catalog.get_table("users").read()
orders_df = catalog.get_table("orders").read()
```

### Iceberg Catalog Integration

```python
# Connect to Iceberg catalog
try:
    from pyiceberg.catalog import load_catalog

    # Load Iceberg catalog configuration
    iceberg_catalog = load_catalog(
        "my_iceberg_catalog",
        uri="http://localhost:8181",
        warehouse="s3://my-warehouse/",
    )

    # Create Daft catalog wrapper
    catalog = Catalog.from_iceberg(iceberg_catalog)

    # List namespaces and tables
    namespaces = catalog.list_namespaces()
    tables = catalog.list_tables()

    # Read Iceberg table
    sales_table = catalog.get_table("sales.transactions")
    sales_df = sales_table.read()

except ImportError:
    print("Install iceberg support: pip install 'daft[iceberg]'")
```

### Unity Catalog Integration

```python
# Connect to Unity Catalog
try:
    from daft.unity_catalog import UnityCatalog

    # Create Unity Catalog client using Daft's bundled helper
    unity_client = UnityCatalog(
        endpoint="https://unity-catalog-server.com",
        token="your-access-token",
    )

    # Create Daft catalog
    catalog = Catalog.from_unity(unity_client)

    # Work with Unity Catalog tables
    table = catalog.get_table("main.sales.customers")
    customers_df = table.read()

    # Write back to Unity Catalog
    processed_df = customers_df.filter(daft.col("active") == True)
    table.append(processed_df)

except ImportError:
    print("Install Unity support: pip install 'daft[unity]'")
```

### AWS Glue Catalog

```python
# Connect to AWS Glue
try:
    import boto3

    # Create Glue catalog
    catalog = Catalog.from_glue(
        name="my-glue-database",
        session=boto3.Session(region_name="us-west-2"),
    )

    # List Glue tables
    tables = catalog.list_tables()

    # Read from Glue table
    glue_table = catalog.get_table("customer_data")
    df = glue_table.read()

except ImportError:
    print("Install AWS support: pip install 'daft[aws]'")
```

### S3 Tables Integration

```python
# Connect to S3 Tables
try:
    catalog = Catalog.from_s3tables(
        table_bucket_arn="arn:aws:s3tables:us-west-2:123456789012:bucket/my-table-bucket"
    )

    # List S3 Tables
    tables = catalog.list_tables()

    # Read S3 Table
    s3_table = catalog.get_table("analytics.user_events")
    events_df = s3_table.read()

except ImportError:
    print("Install AWS support: pip install 'daft[aws]'")
```

### Multi-Catalog Operations

```python
# Work with multiple catalogs
iceberg_catalog = Catalog.from_iceberg(iceberg_instance)
unity_catalog = Catalog.from_unity(unity_instance)

# Read from different catalogs
source_df = iceberg_catalog.get_table("source.raw_data").read()
reference_df = unity_catalog.get_table("reference.lookup_table").read()

# Join data from different catalogs
joined_df = source_df.join(
    reference_df,
    left_on="key",
    right_on="reference_key",
)

# Write the result to another catalog: create the table from the joined
# schema, then append the data
result_table = iceberg_catalog.create_table("processed.joined_data", joined_df.schema())
result_table.append(joined_df)
```

### Namespace Management

```python
from daft.catalog import Identifier

# Create hierarchical namespace
namespace_id = Identifier("analytics", "customer_data")

# Check if namespace exists
if catalog.has_namespace(namespace_id):
    print("Namespace exists")

# Create namespace if needed
catalog.create_namespace_if_not_exists(namespace_id)

# List tables in namespace
tables_in_namespace = catalog.list_tables(pattern="analytics.customer_data.*")
```

### Table Management

```python
# Create table from DataFrame
df = daft.from_pydict({
    "product_id": [1, 2, 3],
    "name": ["Widget A", "Widget B", "Widget C"],
    "price": [19.99, 29.99, 39.99]
})

# Create table in catalog
table_id = Identifier("inventory", "products")
products_table = catalog.create_table(table_id, df)

# Check table properties
schema = products_table.schema()
table_name = products_table.name

# Update table data
new_products = daft.from_pydict({
    "product_id": [4, 5],
    "name": ["Widget D", "Widget E"],
    "price": [49.99, 59.99]
})

products_table.append(new_products)
```

### Advanced Table Operations

```python
# Table with partitioning and properties
partitioned_df = daft.read_parquet("s3://data/sales/*.parquet")

# Create partitioned table
properties = {
    "format-version": "2",
    "write.parquet.compression-codec": "snappy"
}

sales_table = catalog.create_table(
    "sales.transactions",
    partitioned_df,
    properties=properties
)

# Read with column pruning and a predicate (read options are backend-specific)
filtered_df = sales_table.read(
    columns=["date", "amount", "customer_id"],
    predicate=daft.col("date") >= "2024-01-01"
)
```

### Catalog Discovery and Metadata

```python
# Discover catalog structure
def explore_catalog(catalog: Catalog):
    print(f"Catalog: {catalog.name}")

    # List all namespaces
    namespaces = catalog.list_namespaces()
    for namespace in namespaces:
        print(f"  Namespace: {namespace}")

        # List tables in namespace
        tables = catalog.list_tables(pattern=f"{namespace}.*")
        for table_id in tables:
            table = catalog.get_table(table_id)
            schema = table.schema()
            print(f"    Table: {table.name}")
            print(f"    Columns: {schema.column_names()}")

# Explore all connected catalogs
explore_catalog(catalog)
```

### Error Handling

```python
from daft.catalog import NotFoundError

try:
    # Attempt to get non-existent table
    table = catalog.get_table("non_existent.table")
except NotFoundError:
    print("Table not found")

# Safe table access
if catalog.has_table("sales.customers"):
    customers = catalog.get_table("sales.customers").read()
else:
    print("Creating customers table...")
    # Create table logic here
```

### Integration with Session Management

```python
# Register catalog in the session
daft.attach_catalog(catalog, alias="main_catalog")

# Use catalog tables in SQL
result = daft.sql("SELECT * FROM main_catalog.sales.customers WHERE active = true")

# List registered catalogs
catalogs = daft.list_catalogs()
```

## Data Catalog Types

```python { .api }
class DataCatalogTable:
    """Representation of table in data catalog."""

class DataCatalogType:
    """Enumeration of supported catalog types."""
```

Daft's catalog integration provides a unified interface for working with diverse data catalog systems, enabling metadata-driven data discovery and governance across different platforms and storage systems.