or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

catalog-apis.mddata-source-v2-apis.mddistributions-api.mdexpression-apis.mdindex.mdlegacy-data-source-v1.mdmetrics-api.mdstreaming-apis.mdutilities-helpers.mdvectorized-processing.md

catalog-apis.mddocs/

0

# Catalog APIs

1

2

The Catalog APIs provide a comprehensive framework for implementing custom catalog systems in Apache Spark. These APIs allow you to integrate external metadata stores, implement custom table discovery, and manage database objects like tables, views, and functions.

3

4

## Core Interfaces

5

6

### CatalogPlugin

7

8

The base interface for all catalog implementations:

9

10

```java { .api }

11

package org.apache.spark.sql.connector.catalog;

12

13

public interface CatalogPlugin {

14

/**

15

* Initialize the catalog with configuration options

16

*/

17

void initialize(String name, CaseInsensitiveStringMap options);

18

19

/**

20

* Return the name of this catalog

21

*/

22

String name();

23

}

24

```

25

26

**Usage Example:**

27

```java

28

public class MyCustomCatalog implements CatalogPlugin {

29

private String catalogName;

30

private CaseInsensitiveStringMap options;

31

32

@Override

33

public void initialize(String name, CaseInsensitiveStringMap options) {

34

this.catalogName = name;

35

this.options = options;

36

// Initialize connections, load configuration, etc.

37

}

38

39

@Override

40

public String name() {

41

return catalogName;

42

}

43

}

44

```

45

46

### TableCatalog

47

48

Main interface for managing tables in a catalog:

49

50

```java { .api }

51

public interface TableCatalog extends CatalogPlugin {

52

// Table property constants

53

String PROP_LOCATION = "location";

54

String PROP_IS_MANAGED_LOCATION = "is_managed_location";

55

String PROP_EXTERNAL = "external";

56

String PROP_COMMENT = "comment";

57

String PROP_PROVIDER = "provider";

58

String PROP_OWNER = "owner";

59

String OPTION_PREFIX = "option.";

60

61

// Table discovery

62

Identifier[] listTables(String[] namespace);

63

boolean tableExists(Identifier ident);

64

65

// Table loading

66

Table loadTable(Identifier ident);

67

Table loadTable(Identifier ident, Set<TableWritePrivilege> writePrivileges);

68

Table loadTable(Identifier ident, String version);

69

Table loadTable(Identifier ident, long timestamp);

70

void invalidateTable(Identifier ident);

71

72

// Table lifecycle

73

/**

74

* Create a table in the catalog (deprecated - use Column[] version).

75

* @deprecated Use createTable(Identifier, Column[], Transform[], Map) instead

76

*/

77

@Deprecated

78

Table createTable(Identifier ident, StructType schema,

79

Transform[] partitions, Map<String, String> properties);

80

81

/**

82

* Create a table in the catalog.

83

*/

84

Table createTable(Identifier ident, Column[] columns,

85

Transform[] partitions, Map<String, String> properties);

86

Table alterTable(Identifier ident, TableChange... changes);

87

boolean dropTable(Identifier ident);

88

boolean purgeTable(Identifier ident);

89

void renameTable(Identifier oldIdent, Identifier newIdent);

90

91

// Query schema handling

92

/**

93

* If true, mark all fields as nullable when executing CREATE TABLE ... AS SELECT.

94

*/

95

default boolean useNullableQuerySchema() { return true; }

96

97

// Capabilities

98

Set<TableCatalogCapability> capabilities();

99

}

100

```

101

102

**Implementation Example:**

103

```java

104

public class MyTableCatalog extends MyCustomCatalog implements TableCatalog {

105

private final Map<Identifier, Table> tables = new ConcurrentHashMap<>();

106

107

@Override

108

public Identifier[] listTables(String[] namespace) {

109

return tables.keySet()

110

.stream()

111

.filter(id -> Arrays.equals(id.namespace(), namespace))

112

.toArray(Identifier[]::new);

113

}

114

115

@Override

116

public Table loadTable(Identifier ident) {

117

Table table = tables.get(ident);

118

if (table == null) {

119

throw new NoSuchTableException(ident);

120

}

121

return table;

122

}

123

124

@Override

125

public Table createTable(Identifier ident, Column[] columns,

126

Transform[] partitions, Map<String, String> properties) {

127

if (tableExists(ident)) {

128

throw new TableAlreadyExistsException(ident);

129

}

130

131

Table table = new MyCustomTable(ident.name(), columns, partitions, properties);

132

tables.put(ident, table);

133

return table;

134

}

135

136

@Override

137

public boolean dropTable(Identifier ident) {

138

return tables.remove(ident) != null;

139

}

140

141

@Override

142

public Set<TableCatalogCapability> capabilities() {

143

return Set.of(

144

TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_COLUMN_DEFAULT_VALUE,

145

TableCatalogCapability.SUPPORTS_PARTITION_MANAGEMENT

146

);

147

}

148

}

149

```

150

151

### ViewCatalog

152

153

Interface for managing views in a catalog:

154

155

```java { .api }

156

public interface ViewCatalog extends CatalogPlugin {

157

// View discovery

158

Identifier[] listViews(String[] namespace);

159

160

// View lifecycle

161

View loadView(Identifier ident);

162

View createView(Identifier ident, String sql, String currentCatalog,

163

String[] currentNamespace, Column[] schema,

164

Map<String, String> properties);

165

View alterView(Identifier ident, ViewChange... changes);

166

boolean dropView(Identifier ident);

167

void renameView(Identifier oldIdent, Identifier newIdent);

168

}

169

```

170

171

**Implementation Example:**

172

```java

173

public class MyViewCatalog extends MyCustomCatalog implements ViewCatalog {

174

private final Map<Identifier, View> views = new ConcurrentHashMap<>();

175

176

@Override

177

public View createView(Identifier ident, String sql, String currentCatalog,

178

String[] currentNamespace, Column[] schema,

179

Map<String, String> properties) {

180

View view = new MyCustomView(ident, sql, currentCatalog,

181

currentNamespace, schema, properties);

182

views.put(ident, view);

183

return view;

184

}

185

186

@Override

187

public View loadView(Identifier ident) {

188

View view = views.get(ident);

189

if (view == null) {

190

throw new NoSuchViewException(ident);

191

}

192

return view;

193

}

194

}

195

```

196

197

### FunctionCatalog

198

199

Interface for managing user-defined functions:

200

201

```java { .api }

202

public interface FunctionCatalog extends CatalogPlugin {

203

// Function discovery

204

Identifier[] listFunctions(String[] namespace);

205

206

// Function loading

207

UnboundFunction loadFunction(Identifier ident);

208

}

209

```

210

211

**Implementation Example:**

212

```java

213

public class MyFunctionCatalog extends MyCustomCatalog implements FunctionCatalog {

214

private final Map<Identifier, UnboundFunction> functions = new HashMap<>();

215

216

@Override

217

public Identifier[] listFunctions(String[] namespace) {

218

return functions.keySet()

219

.stream()

220

.filter(id -> Arrays.equals(id.namespace(), namespace))

221

.toArray(Identifier[]::new);

222

}

223

224

@Override

225

public UnboundFunction loadFunction(Identifier ident) {

226

UnboundFunction function = functions.get(ident);

227

if (function == null) {

228

throw new NoSuchFunctionException(ident);

229

}

230

return function;

231

}

232

}

233

```

234

235

## Catalog Extensions

236

237

### SupportsNamespaces

238

239

Interface for catalogs that support hierarchical namespaces:

240

241

```java { .api }

242

public interface SupportsNamespaces extends CatalogPlugin {

243

// Namespace discovery

244

String[][] listNamespaces();

245

String[][] listNamespaces(String[] namespace);

246

247

// Namespace metadata

248

Map<String, String> loadNamespaceMetadata(String[] namespace);

249

250

// Namespace lifecycle

251

void createNamespace(String[] namespace, Map<String, String> metadata);

252

void alterNamespace(String[] namespace, NamespaceChange... changes);

253

boolean dropNamespace(String[] namespace, boolean cascade);

254

}

255

```

256

257

**Implementation Example:**

258

```java

259

public class MyNamespaceCatalog extends MyTableCatalog implements SupportsNamespaces {
    // Keyed by List<String> rather than String[]: arrays use identity-based
    // equals/hashCode, so String[] keys would never match in a hash map.
    private final Map<List<String>, Map<String, String>> namespaces = new HashMap<>();

    @Override
    public String[][] listNamespaces() {
        return namespaces.keySet().stream()
            .map(ns -> ns.toArray(new String[0]))
            .toArray(String[][]::new);
    }

    @Override
    public void createNamespace(String[] namespace, Map<String, String> metadata) {
        List<String> key = List.of(namespace);
        if (namespaces.containsKey(key)) {
            throw new NamespaceAlreadyExistsException(namespace);
        }
        namespaces.put(key, new HashMap<>(metadata));
    }

    @Override
    public Map<String, String> loadNamespaceMetadata(String[] namespace) {
        Map<String, String> metadata = namespaces.get(List.of(namespace));
        if (metadata == null) {
            throw new NoSuchNamespaceException(namespace);
        }
        return new HashMap<>(metadata);
    }
}

284

```

285

286

## Table Interfaces

287

288

### Table

289

290

Core interface representing a logical table:

291

292

```java { .api }

293

public interface Table {

294

// Basic metadata

295

String name();

296

Column[] columns();

297

Transform[] partitioning();

298

Map<String, String> properties();

299

300

// Capabilities

301

Set<TableCapability> capabilities();

302

303

// Deprecated - use columns() instead

304

@Deprecated

305

StructType schema();

306

}

307

```

308

309

### Table Support Interfaces

310

311

Tables can implement various support interfaces to provide additional capabilities:

312

313

#### SupportsRead

314

```java { .api }

315

public interface SupportsRead {

316

ScanBuilder newScanBuilder(CaseInsensitiveStringMap options);

317

}

318

```

319

320

#### SupportsWrite

321

```java { .api }

322

public interface SupportsWrite {

323

WriteBuilder newWriteBuilder(LogicalWriteInfo info);

324

}

325

```

326

327

#### SupportsDelete

328

```java { .api }

329

public interface SupportsDelete {

330

void deleteWhere(Filter[] filters);

331

}

332

```

333

334

#### SupportsDeleteV2

335

```java { .api }

336

public interface SupportsDeleteV2 {

337

void deleteWhere(Predicate[] predicates);

338

}

339

```

340

341

#### SupportsPartitionManagement

342

```java { .api }

343

public interface SupportsPartitionManagement {

344

// Partition lifecycle

345

void createPartition(InternalRow ident, Map<String, String> properties);

346

boolean dropPartition(InternalRow ident);

347

void replacePartitionMetadata(InternalRow ident, Map<String, String> properties);

348

349

// Partition metadata

350

Map<String, String> loadPartitionMetadata(InternalRow ident);

351

InternalRow[] listPartitionIdentifiers(String[] names, InternalRow ident);

352

}

353

```

354

355

**Complete Table Implementation Example:**

356

```java

357

public class MyCustomTable implements Table, SupportsRead, SupportsWrite,

358

SupportsDelete, SupportsPartitionManagement {

359

private final String tableName;

360

private final Column[] columns;

361

private final Transform[] partitioning;

362

private final Map<String, String> properties;

363

364

public MyCustomTable(String name, Column[] columns, Transform[] partitioning,

365

Map<String, String> properties) {

366

this.tableName = name;

367

this.columns = columns;

368

this.partitioning = partitioning;

369

this.properties = properties;

370

}

371

372

@Override

373

public String name() {

374

return tableName;

375

}

376

377

@Override

378

public Column[] columns() {

379

return columns.clone();

380

}

381

382

@Override

383

public Transform[] partitioning() {

384

return partitioning.clone();

385

}

386

387

@Override

388

public Map<String, String> properties() {

389

return new HashMap<>(properties);

390

}

391

392

@Override

393

public Set<TableCapability> capabilities() {

394

return Set.of(

395

TableCapability.BATCH_READ,

396

TableCapability.BATCH_WRITE,

397

TableCapability.ACCEPT_ANY_SCHEMA,

398

TableCapability.OVERWRITE_BY_FILTER

399

);

400

}

401

402

@Override

403

public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {

404

return new MyTableScanBuilder(this, options);

405

}

406

407

@Override

408

public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {

409

return new MyTableWriteBuilder(this, info);

410

}

411

412

@Override

413

public void deleteWhere(Filter[] filters) {

414

// Implementation for deleting rows matching filters

415

for (Filter filter : filters) {

416

// Process each filter and delete matching rows

417

}

418

}

419

}

420

```

421

422

## Data Structure Classes

423

424

### Column

425

426

Represents a column in a table schema:

427

428

```java { .api }

429

public interface Column {

430

String name();

431

DataType dataType();

432

boolean nullable();

433

String comment();

434

ColumnDefaultValue defaultValue();

435

MetadataColumn metadataColumn();

436

}

437

```

438

439

**Implementation Example:**

440

```java

441

public class MyColumn implements Column {

442

private final String name;

443

private final DataType dataType;

444

private final boolean nullable;

445

private final String comment;

446

447

public MyColumn(String name, DataType dataType, boolean nullable, String comment) {

448

this.name = name;

449

this.dataType = dataType;

450

this.nullable = nullable;

451

this.comment = comment;

452

}

453

454

@Override

455

public String name() { return name; }

456

457

@Override

458

public DataType dataType() { return dataType; }

459

460

@Override

461

public boolean nullable() { return nullable; }

462

463

@Override

464

public String comment() { return comment; }

465

466

@Override

467

public ColumnDefaultValue defaultValue() { return null; }

468

469

@Override

470

public MetadataColumn metadataColumn() { return null; }

471

}

472

```

473

474

### TableCapability

475

476

Enum defining table capabilities:

477

478

```java { .api }

479

public enum TableCapability {

480

// Read capabilities

481

BATCH_READ,

482

MICRO_BATCH_READ,

483

CONTINUOUS_READ,

484

485

// Write capabilities

486

BATCH_WRITE,

487

STREAMING_WRITE,

488

489

// Schema capabilities

490

ACCEPT_ANY_SCHEMA,

491

492

// Overwrite capabilities

493

OVERWRITE_BY_FILTER,

494

OVERWRITE_DYNAMIC,

495

TRUNCATE

496

}

497

```

498

499

### TableChange

500

501

Interface for table modification operations:

502

503

```java { .api }

504

public interface TableChange {

505

// Common table changes (implemented as nested classes):

506

// - SetProperty

507

// - RemoveProperty

508

// - AddColumn

509

// - RenameColumn

510

// - UpdateColumnType

511

// - UpdateColumnNullability

512

// - UpdateColumnComment

513

// - DeleteColumn

514

// - UpdateColumnPosition

515

}

516

```

517

518

**Usage Example:**

519

```java

520

// Creating table changes for ALTER TABLE operations

521

TableChange[] changes = new TableChange[] {

522

TableChange.setProperty("owner", "new_owner"),

523

TableChange.addColumn("new_column", DataTypes.StringType),

524

TableChange.renameColumn("old_name", "new_name")

525

};

526

527

Table alteredTable = catalog.alterTable(identifier, changes);

528

```

529

530

## Advanced Usage Patterns

531

532

### Multi-Catalog Implementation

533

534

Implement multiple catalog interfaces for full functionality:

535

536

```java

537

public class CompleteCatalog implements TableCatalog, ViewCatalog,

538

FunctionCatalog, SupportsNamespaces {

539

private final TableCatalog tableCatalog;

540

private final ViewCatalog viewCatalog;

541

private final FunctionCatalog functionCatalog;

542

private final SupportsNamespaces namespacesSupport;

543

544

public CompleteCatalog() {

545

this.tableCatalog = new MyTableCatalog();

546

this.viewCatalog = new MyViewCatalog();

547

this.functionCatalog = new MyFunctionCatalog();

548

this.namespacesSupport = new MyNamespaceCatalog();

549

}

550

551

// Delegate methods to appropriate implementations...

552

}

553

```

554

555

### Catalog with External Metadata Store

556

557

```java

558

public class ExternalMetastoreCatalog implements TableCatalog, SupportsNamespaces {

559

// Not final: initialized in initialize(), not in a constructor.
private MetastoreClient metastoreClient;

560

561

@Override

562

public void initialize(String name, CaseInsensitiveStringMap options) {

563

String metastoreUrl = options.get("metastore.url");

564

this.metastoreClient = new MetastoreClient(metastoreUrl);

565

}

566

567

@Override

568

public Identifier[] listTables(String[] namespace) {

569

return metastoreClient.listTables(namespace)

570

.stream()

571

.map(Identifier::of)

572

.toArray(Identifier[]::new);

573

}

574

575

@Override

576

public Table loadTable(Identifier ident) {

577

TableMetadata metadata = metastoreClient.getTable(ident);

578

return convertToTable(metadata);

579

}

580

}

581

```

582

583

### Cached Catalog Implementation

584

585

```java

586

public class CachedCatalog implements TableCatalog {
    private final TableCatalog delegate;
    // LoadingCache so reads can use getUnchecked(); the two-argument
    // Cache.get(key, Callable) declares a checked ExecutionException.
    private final LoadingCache<Identifier, Table> tableCache;

    public CachedCatalog(TableCatalog delegate) {
        this.delegate = delegate;
        this.tableCache = CacheBuilder.newBuilder()
            .maximumSize(1000)
            .expireAfterWrite(10, TimeUnit.MINUTES)
            .build(CacheLoader.from(delegate::loadTable));
    }

    @Override
    public Table loadTable(Identifier ident) {
        return tableCache.getUnchecked(ident);
    }

    @Override
    public void invalidateTable(Identifier ident) {
        tableCache.invalidate(ident);
        delegate.invalidateTable(ident);
    }
}

609

```

610

611

## Configuration and Setup

612

613

### Catalog Registration

614

615

Register custom catalogs in Spark configuration:

616

617

```scala

618

// Scala configuration

619

spark.conf.set("spark.sql.catalog.mycatalog", "com.example.MyCustomCatalog")

620

spark.conf.set("spark.sql.catalog.mycatalog.option1", "value1")

621

spark.conf.set("spark.sql.catalog.mycatalog.option2", "value2")

622

623

// Using SQL

624

CREATE CATALOG mycatalog USING com.example.MyCustomCatalog

625

OPTIONS (

626

option1 'value1',

627

option2 'value2'

628

)

629

```

630

631

### Catalog Usage in SQL

632

633

```sql

634

-- Use catalog for queries

635

USE CATALOG mycatalog;

636

637

-- Fully qualified table names

638

SELECT * FROM mycatalog.myschema.mytable;

639

640

-- Create tables in custom catalog

641

CREATE TABLE mycatalog.myschema.newtable (

642

id INT,

643

name STRING

644

) USING DELTA;

645

```

646

647

## Error Handling

648

649

Implement proper exception handling for catalog operations:

650

651

```java

652

public class MyTableCatalog implements TableCatalog {

653

@Override

654

public Table loadTable(Identifier ident) {

655

try {

656

// Attempt to load table

657

return doLoadTable(ident);

658

} catch (TableNotFoundException e) {

659

throw new NoSuchTableException(ident);

660

} catch (AccessDeniedException e) {

661

throw new UnauthorizedException(

662

String.format("Access denied for table %s", ident));

663

} catch (Exception e) {

664

throw new RuntimeException(

665

String.format("Failed to load table %s", ident), e);

666

}

667

}

668

}

669

```

670

671

## Performance Considerations

672

673

### Lazy Loading

674

```java

675

public class LazyTable implements Table {

676

private volatile Column[] columns;

677

private final Supplier<Column[]> columnsSupplier;

678

679

@Override

680

public Column[] columns() {

681

if (columns == null) {

682

synchronized (this) {

683

if (columns == null) {

684

columns = columnsSupplier.get();

685

}

686

}

687

}

688

return columns;

689

}

690

}

691

```

692

693

### Batch Operations

694

```java

695

public class BatchTableCatalog implements TableCatalog {

696

@Override

697

public Identifier[] listTables(String[] namespace) {

698

// Use batch API to fetch multiple tables efficiently

699

return metastoreClient.batchListTables(namespace);

700

}

701

}

702

```

703

704

The Catalog APIs provide a powerful and flexible foundation for integrating external metadata systems with Spark SQL. They support hierarchical namespaces, comprehensive table management, and extensible capabilities for building robust data platforms.