0
# Catalog APIs
1
2
The Catalog APIs provide a comprehensive framework for implementing custom catalog systems in Apache Spark. These APIs allow you to integrate external metadata stores, implement custom table discovery, and manage database objects like tables, views, and functions.
3
4
## Core Interfaces
5
6
### CatalogPlugin
7
8
The base interface for all catalog implementations:
9
10
```java { .api }
11
package org.apache.spark.sql.connector.catalog;
12
13
public interface CatalogPlugin {
14
/**
15
* Initialize the catalog with configuration options
16
*/
17
void initialize(String name, CaseInsensitiveStringMap options);
18
19
/**
20
* Return the name of this catalog
21
*/
22
String name();
23
}
24
```
25
26
**Usage Example:**
27
```java
28
public class MyCustomCatalog implements CatalogPlugin {
29
private String catalogName;
30
private CaseInsensitiveStringMap options;
31
32
@Override
33
public void initialize(String name, CaseInsensitiveStringMap options) {
34
this.catalogName = name;
35
this.options = options;
36
// Initialize connections, load configuration, etc.
37
}
38
39
@Override
40
public String name() {
41
return catalogName;
42
}
43
}
44
```
45
46
### TableCatalog
47
48
Main interface for managing tables in a catalog:
49
50
```java { .api }
51
public interface TableCatalog extends CatalogPlugin {
52
// Table property constants
53
String PROP_LOCATION = "location";
54
String PROP_IS_MANAGED_LOCATION = "is_managed_location";
55
String PROP_EXTERNAL = "external";
56
String PROP_COMMENT = "comment";
57
String PROP_PROVIDER = "provider";
58
String PROP_OWNER = "owner";
59
String OPTION_PREFIX = "option.";
60
61
// Table discovery
62
Identifier[] listTables(String[] namespace);
63
boolean tableExists(Identifier ident);
64
65
// Table loading
66
Table loadTable(Identifier ident);
67
Table loadTable(Identifier ident, Set<TableWritePrivilege> writePrivileges);
68
Table loadTable(Identifier ident, String version);
69
Table loadTable(Identifier ident, long timestamp);
70
void invalidateTable(Identifier ident);
71
72
// Table lifecycle
73
/**
74
* Create a table in the catalog (deprecated - use Column[] version).
75
* @deprecated Use createTable(Identifier, Column[], Transform[], Map) instead
76
*/
77
@Deprecated
78
Table createTable(Identifier ident, StructType schema,
79
Transform[] partitions, Map<String, String> properties);
80
81
/**
82
* Create a table in the catalog.
83
*/
84
default Table createTable(Identifier ident, Column[] columns,
85
Transform[] partitions, Map<String, String> properties);
86
Table alterTable(Identifier ident, TableChange... changes);
87
boolean dropTable(Identifier ident);
88
boolean purgeTable(Identifier ident);
89
void renameTable(Identifier oldIdent, Identifier newIdent);
90
91
// Query schema handling
92
/**
93
* If true, mark all fields as nullable when executing CREATE TABLE ... AS SELECT.
94
*/
95
default boolean useNullableQuerySchema();
96
97
// Capabilities
98
Set<TableCatalogCapability> capabilities();
99
}
100
```
101
102
**Implementation Example:**
103
```java
104
public class MyTableCatalog extends MyCustomCatalog implements TableCatalog {
    private final Map<Identifier, Table> tables = new ConcurrentHashMap<>();

    @Override
    public Identifier[] listTables(String[] namespace) {
        return tables.keySet()
            .stream()
            .filter(id -> Arrays.equals(id.namespace(), namespace))
            .toArray(Identifier[]::new);
    }

    @Override
    public Table loadTable(Identifier ident) {
        Table table = tables.get(ident);
        if (table == null) {
            throw new NoSuchTableException(ident);
        }
        return table;
    }

    @Override
    public Table createTable(Identifier ident, Column[] columns,
                             Transform[] partitions, Map<String, String> properties) {
        Table table = new MyCustomTable(ident.name(), columns, partitions, properties);
        // putIfAbsent makes the existence check and the insert one atomic step;
        // a separate tableExists() check followed by put() would race when two
        // threads create the same table concurrently.
        if (tables.putIfAbsent(ident, table) != null) {
            throw new TableAlreadyExistsException(ident);
        }
        return table;
    }

    @Override
    public boolean dropTable(Identifier ident) {
        return tables.remove(ident) != null;
    }

    @Override
    public Set<TableCatalogCapability> capabilities() {
        return Set.of(
            TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_COLUMN_DEFAULT_VALUE,
            TableCatalogCapability.SUPPORTS_PARTITION_MANAGEMENT
        );
    }
}
149
```
150
151
### ViewCatalog
152
153
Interface for managing views in a catalog:
154
155
```java { .api }
156
public interface ViewCatalog extends CatalogPlugin {
157
// View discovery
158
Identifier[] listViews(String[] namespace);
159
160
// View lifecycle
161
View loadView(Identifier ident);
162
View createView(Identifier ident, String sql, String currentCatalog,
163
String[] currentNamespace, Column[] schema,
164
Map<String, String> properties);
165
View alterView(Identifier ident, ViewChange... changes);
166
boolean dropView(Identifier ident);
167
void renameView(Identifier oldIdent, Identifier newIdent);
168
}
169
```
170
171
**Implementation Example:**
172
```java
173
public class MyViewCatalog extends MyCustomCatalog implements ViewCatalog {
    private final Map<Identifier, View> views = new ConcurrentHashMap<>();

    @Override
    public View createView(Identifier ident, String sql, String currentCatalog,
                           String[] currentNamespace, Column[] schema,
                           Map<String, String> properties) {
        View view = new MyCustomView(ident, sql, currentCatalog,
            currentNamespace, schema, properties);
        // putIfAbsent keeps the create atomic on the concurrent map and lets
        // us detect a pre-existing view, as the ViewCatalog contract requires.
        if (views.putIfAbsent(ident, view) != null) {
            throw new ViewAlreadyExistsException(ident);
        }
        return view;
    }

    @Override
    public View loadView(Identifier ident) {
        View view = views.get(ident);
        if (view == null) {
            throw new NoSuchViewException(ident);
        }
        return view;
    }
}
195
```
196
197
### FunctionCatalog
198
199
Interface for managing user-defined functions:
200
201
```java { .api }
202
public interface FunctionCatalog extends CatalogPlugin {
203
// Function discovery
204
Identifier[] listFunctions(String[] namespace);
205
206
// Function loading
207
UnboundFunction loadFunction(Identifier ident);
208
}
209
```
210
211
**Implementation Example:**
212
```java
213
public class MyFunctionCatalog extends MyCustomCatalog implements FunctionCatalog {
    // ConcurrentHashMap for consistency with the other catalog examples;
    // Spark may invoke catalog methods from multiple threads.
    private final Map<Identifier, UnboundFunction> functions = new ConcurrentHashMap<>();

    @Override
    public Identifier[] listFunctions(String[] namespace) {
        return functions.keySet()
            .stream()
            .filter(id -> Arrays.equals(id.namespace(), namespace))
            .toArray(Identifier[]::new);
    }

    @Override
    public UnboundFunction loadFunction(Identifier ident) {
        UnboundFunction function = functions.get(ident);
        if (function == null) {
            throw new NoSuchFunctionException(ident);
        }
        return function;
    }
}
233
```
234
235
## Catalog Extensions
236
237
### SupportsNamespaces
238
239
Interface for catalogs that support hierarchical namespaces:
240
241
```java { .api }
242
public interface SupportsNamespaces extends CatalogPlugin {
243
// Namespace discovery
244
String[][] listNamespaces();
245
String[][] listNamespaces(String[] namespace);
246
247
// Namespace metadata
248
Map<String, String> loadNamespaceMetadata(String[] namespace);
249
250
// Namespace lifecycle
251
void createNamespace(String[] namespace, Map<String, String> metadata);
252
void alterNamespace(String[] namespace, NamespaceChange... changes);
253
boolean dropNamespace(String[] namespace, boolean cascade);
254
}
255
```
256
257
**Implementation Example:**
258
```java
259
public class MyNamespaceCatalog extends MyTableCatalog implements SupportsNamespaces {
    // Keyed by List<String> rather than String[]: Java arrays use
    // identity-based equals/hashCode, so a HashMap<String[], ...> would
    // never find an entry via containsKey/get with a different array instance.
    private final Map<List<String>, Map<String, String>> namespaces = new HashMap<>();

    @Override
    public String[][] listNamespaces() {
        return namespaces.keySet()
            .stream()
            .map(ns -> ns.toArray(new String[0]))
            .toArray(String[][]::new);
    }

    @Override
    public void createNamespace(String[] namespace, Map<String, String> metadata) {
        List<String> key = List.of(namespace);
        if (namespaces.containsKey(key)) {
            throw new NamespaceAlreadyExistsException(namespace);
        }
        namespaces.put(key, new HashMap<>(metadata));
    }

    @Override
    public Map<String, String> loadNamespaceMetadata(String[] namespace) {
        Map<String, String> metadata = namespaces.get(List.of(namespace));
        if (metadata == null) {
            throw new NoSuchNamespaceException(namespace);
        }
        return new HashMap<>(metadata);
    }
}
284
```
285
286
## Table Interfaces
287
288
### Table
289
290
Core interface representing a logical table:
291
292
```java { .api }
293
public interface Table {
294
// Basic metadata
295
String name();
296
Column[] columns();
297
Transform[] partitioning();
298
Map<String, String> properties();
299
300
// Capabilities
301
Set<TableCapability> capabilities();
302
303
// Deprecated - use columns() instead
304
@Deprecated
305
StructType schema();
306
}
307
```
308
309
### Table Support Interfaces
310
311
Tables can implement various support interfaces to provide additional capabilities:
312
313
#### SupportsRead
314
```java { .api }
315
public interface SupportsRead extends Table {
316
ScanBuilder newScanBuilder(CaseInsensitiveStringMap options);
317
}
318
```
319
320
#### SupportsWrite
321
```java { .api }
322
public interface SupportsWrite extends Table {
323
WriteBuilder newWriteBuilder(LogicalWriteInfo info);
324
}
325
```
326
327
#### SupportsDelete
328
```java { .api }
329
public interface SupportsDelete {
330
void deleteWhere(Filter[] filters);
331
}
332
```
333
334
#### SupportsDeleteV2
335
```java { .api }
336
public interface SupportsDeleteV2 {
337
void deleteWhere(Predicate[] predicates);
338
}
339
```
340
341
#### SupportsPartitionManagement
342
```java { .api }
343
public interface SupportsPartitionManagement {
344
// Partition lifecycle
345
void createPartition(InternalRow ident, Map<String, String> properties);
346
boolean dropPartition(InternalRow ident);
347
void replacePartitionMetadata(InternalRow ident, Map<String, String> properties);
348
349
// Partition metadata
350
Map<String, String> loadPartitionMetadata(InternalRow ident);
351
InternalRow[] listPartitionIdentifiers(String[] names, InternalRow ident);
352
}
353
```
354
355
**Complete Table Implementation Example:**
356
```java
357
public class MyCustomTable implements Table, SupportsRead, SupportsWrite,
358
SupportsDelete, SupportsPartitionManagement {
359
private final String tableName;
360
private final Column[] columns;
361
private final Transform[] partitioning;
362
private final Map<String, String> properties;
363
364
public MyCustomTable(String name, Column[] columns, Transform[] partitioning,
365
Map<String, String> properties) {
366
this.tableName = name;
367
this.columns = columns;
368
this.partitioning = partitioning;
369
this.properties = properties;
370
}
371
372
@Override
373
public String name() {
374
return tableName;
375
}
376
377
@Override
378
public Column[] columns() {
379
return columns.clone();
380
}
381
382
@Override
383
public Transform[] partitioning() {
384
return partitioning.clone();
385
}
386
387
@Override
388
public Map<String, String> properties() {
389
return new HashMap<>(properties);
390
}
391
392
@Override
393
public Set<TableCapability> capabilities() {
394
return Set.of(
395
TableCapability.BATCH_READ,
396
TableCapability.BATCH_WRITE,
397
TableCapability.ACCEPT_ANY_SCHEMA,
398
TableCapability.OVERWRITE_BY_FILTER
399
);
400
}
401
402
@Override
403
public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
404
return new MyTableScanBuilder(this, options);
405
}
406
407
@Override
408
public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
409
return new MyTableWriteBuilder(this, info);
410
}
411
412
@Override
413
public void deleteWhere(Filter[] filters) {
414
// Implementation for deleting rows matching filters
415
for (Filter filter : filters) {
416
// Process each filter and delete matching rows
417
}
418
}
// SupportsPartitionManagement methods (createPartition, dropPartition,
// replacePartitionMetadata, loadPartitionMetadata, listPartitionIdentifiers)
// omitted for brevity.
}
420
```
421
422
## Data Structure Classes
423
424
### Column
425
426
Represents a column in a table schema:
427
428
```java { .api }
429
public interface Column {
430
String name();
431
DataType dataType();
432
boolean nullable();
433
String comment();
434
ColumnDefaultValue defaultValue();
435
MetadataColumn metadataColumn();
436
}
437
```
438
439
**Implementation Example:**
440
```java
441
public class MyColumn implements Column {
442
private final String name;
443
private final DataType dataType;
444
private final boolean nullable;
445
private final String comment;
446
447
public MyColumn(String name, DataType dataType, boolean nullable, String comment) {
448
this.name = name;
449
this.dataType = dataType;
450
this.nullable = nullable;
451
this.comment = comment;
452
}
453
454
@Override
455
public String name() { return name; }
456
457
@Override
458
public DataType dataType() { return dataType; }
459
460
@Override
461
public boolean nullable() { return nullable; }
462
463
@Override
464
public String comment() { return comment; }
465
466
@Override
467
public ColumnDefaultValue defaultValue() { return null; }
468
469
@Override
470
public MetadataColumn metadataColumn() { return null; }
471
}
472
```
473
474
### TableCapability
475
476
Enum defining table capabilities:
477
478
```java { .api }
479
public enum TableCapability {
480
// Read capabilities
481
BATCH_READ,
482
MICRO_BATCH_READ,
483
CONTINUOUS_READ,
484
485
// Write capabilities
486
BATCH_WRITE,
487
STREAMING_WRITE,
488
489
// Schema capabilities
490
ACCEPT_ANY_SCHEMA,
491
492
// Overwrite capabilities
493
OVERWRITE_BY_FILTER,
494
OVERWRITE_DYNAMIC,
495
TRUNCATE
496
}
497
```
498
499
### TableChange
500
501
Interface for table modification operations:
502
503
```java { .api }
504
public interface TableChange {
505
// Common table changes (implemented as nested classes):
506
// - SetProperty
507
// - RemoveProperty
508
// - AddColumn
509
// - RenameColumn
510
// - UpdateColumnType
511
// - UpdateColumnNullability
512
// - UpdateColumnComment
513
// - DeleteColumn
514
// - UpdateColumnPosition
515
}
516
```
517
518
**Usage Example:**
519
```java
520
// Creating table changes for ALTER TABLE operations
521
TableChange[] changes = new TableChange[] {
522
TableChange.setProperty("owner", "new_owner"),
523
TableChange.addColumn("new_column", DataTypes.StringType),
524
TableChange.renameColumn("old_name", "new_name")
525
};
526
527
Table alteredTable = catalog.alterTable(identifier, changes);
528
```
529
530
## Advanced Usage Patterns
531
532
### Multi-Catalog Implementation
533
534
Implement multiple catalog interfaces for full functionality:
535
536
```java
537
public class CompleteCatalog implements TableCatalog, ViewCatalog,
538
FunctionCatalog, SupportsNamespaces {
539
private final TableCatalog tableCatalog;
540
private final ViewCatalog viewCatalog;
541
private final FunctionCatalog functionCatalog;
542
private final SupportsNamespaces namespacesSupport;
543
544
public CompleteCatalog() {
545
this.tableCatalog = new MyTableCatalog();
546
this.viewCatalog = new MyViewCatalog();
547
this.functionCatalog = new MyFunctionCatalog();
548
this.namespacesSupport = new MyNamespaceCatalog();
549
}
550
551
// Delegate methods to appropriate implementations...
552
}
553
```
554
555
### Catalog with External Metadata Store
556
557
```java
558
public class ExternalMetastoreCatalog implements TableCatalog, SupportsNamespaces {
    // Not final: the client is created in initialize(), which Spark invokes
    // after construction, so a final field could not be assigned here.
    private MetastoreClient metastoreClient;

    @Override
    public void initialize(String name, CaseInsensitiveStringMap options) {
        String metastoreUrl = options.get("metastore.url");
        this.metastoreClient = new MetastoreClient(metastoreUrl);
    }

    @Override
    public Identifier[] listTables(String[] namespace) {
        return metastoreClient.listTables(namespace)
            .stream()
            .map(Identifier::of)
            .toArray(Identifier[]::new);
    }

    @Override
    public Table loadTable(Identifier ident) {
        TableMetadata metadata = metastoreClient.getTable(ident);
        return convertToTable(metadata);
    }
}
581
```
582
583
### Cached Catalog Implementation
584
585
```java
586
public class CachedCatalog implements TableCatalog {
    private final TableCatalog delegate;
    private final Cache<Identifier, Table> tableCache;

    public CachedCatalog(TableCatalog delegate) {
        this.delegate = delegate;
        this.tableCache = CacheBuilder.newBuilder()
            .maximumSize(1000)
            .expireAfterWrite(10, TimeUnit.MINUTES)
            .build();
    }

    @Override
    public Table loadTable(Identifier ident) {
        try {
            // Guava's Cache.get(key, loader) declares a checked
            // ExecutionException when the loader throws; unwrap the cause.
            return tableCache.get(ident, () -> delegate.loadTable(ident));
        } catch (ExecutionException e) {
            Throwables.throwIfUnchecked(e.getCause());
            throw new RuntimeException(e.getCause());
        }
    }

    @Override
    public void invalidateTable(Identifier ident) {
        tableCache.invalidate(ident);
        delegate.invalidateTable(ident);
    }
}
609
```
610
611
## Configuration and Setup
612
613
### Catalog Registration
614
615
Register custom catalogs in Spark configuration:
616
617
```scala
618
// Scala configuration
619
spark.conf.set("spark.sql.catalog.mycatalog", "com.example.MyCustomCatalog")
620
spark.conf.set("spark.sql.catalog.mycatalog.option1", "value1")
621
spark.conf.set("spark.sql.catalog.mycatalog.option2", "value2")
622
623
// Using SQL
624
CREATE CATALOG mycatalog USING com.example.MyCustomCatalog
625
OPTIONS (
626
option1 'value1',
627
option2 'value2'
628
)
629
```
630
631
### Catalog Usage in SQL
632
633
```sql
634
-- Use catalog for queries
635
USE CATALOG mycatalog;
636
637
-- Fully qualified table names
638
SELECT * FROM mycatalog.myschema.mytable;
639
640
-- Create tables in custom catalog
641
CREATE TABLE mycatalog.myschema.newtable (
642
id INT,
643
name STRING
644
) USING DELTA;
645
```
646
647
## Error Handling
648
649
Implement proper exception handling for catalog operations:
650
651
```java
652
public class MyTableCatalog implements TableCatalog {
653
@Override
654
public Table loadTable(Identifier ident) {
655
try {
656
// Attempt to load table
657
return doLoadTable(ident);
658
} catch (TableNotFoundException e) {
659
throw new NoSuchTableException(ident);
660
} catch (AccessDeniedException e) {
661
throw new UnauthorizedException(
662
String.format("Access denied for table %s", ident));
663
} catch (Exception e) {
664
throw new RuntimeException(
665
String.format("Failed to load table %s", ident), e);
666
}
667
}
668
}
669
```
670
671
## Performance Considerations
672
673
### Lazy Loading
674
```java
675
public class LazyTable implements Table {
676
private volatile Column[] columns;
677
private final Supplier<Column[]> columnsSupplier;
678
679
@Override
680
public Column[] columns() {
681
if (columns == null) {
682
synchronized (this) {
683
if (columns == null) {
684
columns = columnsSupplier.get();
685
}
686
}
687
}
688
return columns;
689
}
690
}
691
```
692
693
### Batch Operations
694
```java
695
public class BatchTableCatalog implements TableCatalog {
696
@Override
697
public Identifier[] listTables(String[] namespace) {
698
// Use batch API to fetch multiple tables efficiently
699
return metastoreClient.batchListTables(namespace);
700
}
701
}
702
```
703
704
The Catalog APIs provide a powerful and flexible foundation for integrating external metadata systems with Spark SQL. They support hierarchical namespaces, comprehensive table management, and extensible capabilities for building robust data platforms.