The Catalog APIs provide a comprehensive framework for implementing custom catalog systems in Apache Spark. These APIs allow you to integrate external metadata stores, implement custom table discovery, and manage database objects like tables, views, and functions.
The base interface for all catalog implementations:
package org.apache.spark.sql.connector.catalog;
public interface CatalogPlugin {
/**
* Initialize the catalog with configuration options
*/
void initialize(String name, CaseInsensitiveStringMap options);
/**
* Return the name of this catalog
*/
String name();
}
Usage Example:
public class MyCustomCatalog implements CatalogPlugin {
private String catalogName;
private CaseInsensitiveStringMap options;
@Override
public void initialize(String name, CaseInsensitiveStringMap options) {
this.catalogName = name;
this.options = options;
// Initialize connections, load configuration, etc.
}
@Override
public String name() {
return catalogName;
}
}
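Spark instantiates a registered catalog class through its no-arg constructor and then calls initialize() with the options collected from the spark.sql.catalog.&lt;name&gt;.* configuration entries. A minimal sketch that exercises this contract by hand (the option key and value are illustrative):
CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(
    Map.of("metastore.url", "thrift://localhost:9083"));
CatalogPlugin plugin = new MyCustomCatalog();
plugin.initialize("mycatalog", options);
assert "mycatalog".equals(plugin.name());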
Main interface for managing tables in a catalog:
public interface TableCatalog extends CatalogPlugin {
// Table property constants
String PROP_LOCATION = "location";
String PROP_IS_MANAGED_LOCATION = "is_managed_location";
String PROP_EXTERNAL = "external";
String PROP_COMMENT = "comment";
String PROP_PROVIDER = "provider";
String PROP_OWNER = "owner";
String OPTION_PREFIX = "option.";
// Table discovery
Identifier[] listTables(String[] namespace);
boolean tableExists(Identifier ident);
// Table loading
Table loadTable(Identifier ident);
Table loadTable(Identifier ident, Set<TableWritePrivilege> writePrivileges);
Table loadTable(Identifier ident, String version);
Table loadTable(Identifier ident, long timestamp);
void invalidateTable(Identifier ident);
// Table lifecycle
/**
* Create a table in the catalog (deprecated - use Column[] version).
* @deprecated Use createTable(Identifier, Column[], Transform[], Map) instead
*/
@Deprecated
Table createTable(Identifier ident, StructType schema,
Transform[] partitions, Map<String, String> properties);
/**
* Create a table in the catalog.
*/
default Table createTable(Identifier ident, Column[] columns,
    Transform[] partitions, Map<String, String> properties) {
  // The default implementation delegates to the deprecated StructType overload.
  return createTable(ident, CatalogV2Util.v2ColumnsToStructType(columns),
      partitions, properties);
}
Table alterTable(Identifier ident, TableChange... changes);
boolean dropTable(Identifier ident);
boolean purgeTable(Identifier ident);
void renameTable(Identifier oldIdent, Identifier newIdent);
// Query schema handling
/**
* If true, mark all fields as nullable when executing CREATE TABLE ... AS SELECT.
*/
default boolean useNullableQuerySchema() { return true; }
// Capabilities
Set<TableCatalogCapability> capabilities();
}
Implementation Example:
public class MyTableCatalog extends MyCustomCatalog implements TableCatalog {
private final Map<Identifier, Table> tables = new ConcurrentHashMap<>();
@Override
public Identifier[] listTables(String[] namespace) {
return tables.keySet()
.stream()
.filter(id -> Arrays.equals(id.namespace(), namespace))
.toArray(Identifier[]::new);
}
@Override
public Table loadTable(Identifier ident) throws NoSuchTableException {
Table table = tables.get(ident);
if (table == null) {
throw new NoSuchTableException(ident);
}
return table;
}
@Override
public Table createTable(Identifier ident, Column[] columns,
    Transform[] partitions, Map<String, String> properties)
    throws TableAlreadyExistsException {
if (tableExists(ident)) {
throw new TableAlreadyExistsException(ident);
}
Table table = new MyCustomTable(ident.name(), columns, partitions, properties);
tables.put(ident, table);
return table;
}
@Override
public boolean dropTable(Identifier ident) {
return tables.remove(ident) != null;
}
@Override
public Set<TableCatalogCapability> capabilities() {
return Set.of(
TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_COLUMN_DEFAULT_VALUE,
TableCatalogCapability.SUPPORTS_PARTITION_MANAGEMENT
);
}
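// A minimal alterTable sketch for this in-memory catalog: it applies only
// property changes and rejects everything else. MyCustomTable is the
// hypothetical table class defined later in this section.
@Override
public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
  Table existing = loadTable(ident);
  Map<String, String> newProps = new HashMap<>(existing.properties());
  for (TableChange change : changes) {
    if (change instanceof TableChange.SetProperty) {
      TableChange.SetProperty set = (TableChange.SetProperty) change;
      newProps.put(set.property(), set.value());
    } else if (change instanceof TableChange.RemoveProperty) {
      newProps.remove(((TableChange.RemoveProperty) change).property());
    } else {
      throw new UnsupportedOperationException("Unsupported change: " + change);
    }
  }
  Table altered = new MyCustomTable(ident.name(), existing.columns(),
      existing.partitioning(), newProps);
  tables.put(ident, altered);
  return altered;
}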
}
Interface for managing views in a catalog:
public interface ViewCatalog extends CatalogPlugin {
// View discovery
Identifier[] listViews(String[] namespace);
// View lifecycle
View loadView(Identifier ident);
View createView(Identifier ident, String sql, String currentCatalog,
String[] currentNamespace, Column[] schema,
Map<String, String> properties);
View alterView(Identifier ident, ViewChange... changes);
boolean dropView(Identifier ident);
void renameView(Identifier oldIdent, Identifier newIdent);
}
Implementation Example:
public class MyViewCatalog extends MyCustomCatalog implements ViewCatalog {
private final Map<Identifier, View> views = new ConcurrentHashMap<>();
@Override
public View createView(Identifier ident, String sql, String currentCatalog,
    String[] currentNamespace, Column[] schema,
    Map<String, String> properties) throws ViewAlreadyExistsException {
  View view = new MyCustomView(ident, sql, currentCatalog,
      currentNamespace, schema, properties);
  if (views.putIfAbsent(ident, view) != null) {
    throw new ViewAlreadyExistsException(ident);
  }
  return view;
}
@Override
public View loadView(Identifier ident) {
View view = views.get(ident);
if (view == null) {
throw new NoSuchViewException(ident);
}
return view;
}
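// Minimal sketches of the remaining lifecycle methods for this in-memory catalog.
@Override
public boolean dropView(Identifier ident) {
  return views.remove(ident) != null;
}

@Override
public void renameView(Identifier oldIdent, Identifier newIdent) throws NoSuchViewException {
  View view = views.remove(oldIdent);
  if (view == null) {
    throw new NoSuchViewException(oldIdent);
  }
  views.put(newIdent, view);
}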
}
Interface for managing user-defined functions:
public interface FunctionCatalog extends CatalogPlugin {
// Function discovery
Identifier[] listFunctions(String[] namespace);
// Function loading
UnboundFunction loadFunction(Identifier ident);
}
Implementation Example:
public class MyFunctionCatalog extends MyCustomCatalog implements FunctionCatalog {
private final Map<Identifier, UnboundFunction> functions = new HashMap<>();
@Override
public Identifier[] listFunctions(String[] namespace) {
return functions.keySet()
.stream()
.filter(id -> Arrays.equals(id.namespace(), namespace))
.toArray(Identifier[]::new);
}
@Override
public UnboundFunction loadFunction(Identifier ident) {
UnboundFunction function = functions.get(ident);
if (function == null) {
throw new NoSuchFunctionException(ident);
}
return function;
}
}
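loadFunction returns an UnboundFunction, which Spark binds to concrete input types at analysis time. A minimal sketch of a scalar addition function that could be registered in the map above (the class and function names are illustrative):
public class MyAddFunction implements UnboundFunction {
  @Override
  public String name() { return "my_add"; }

  @Override
  public String description() { return "my_add(a INT, b INT): returns a + b"; }

  @Override
  public BoundFunction bind(StructType inputType) {
    if (inputType.fields().length != 2) {
      throw new UnsupportedOperationException("my_add takes exactly two arguments");
    }
    return new ScalarFunction<Integer>() {
      @Override
      public DataType[] inputTypes() {
        return new DataType[] { DataTypes.IntegerType, DataTypes.IntegerType };
      }

      @Override
      public DataType resultType() { return DataTypes.IntegerType; }

      @Override
      public String name() { return "my_add"; }

      @Override
      public Integer produceResult(InternalRow input) {
        return input.getInt(0) + input.getInt(1);
      }
    };
  }
}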
Interface for catalogs that support hierarchical namespaces:
public interface SupportsNamespaces extends CatalogPlugin {
// Namespace discovery
String[][] listNamespaces();
String[][] listNamespaces(String[] namespace);
// Namespace metadata
Map<String, String> loadNamespaceMetadata(String[] namespace);
// Namespace lifecycle
void createNamespace(String[] namespace, Map<String, String> metadata);
void alterNamespace(String[] namespace, NamespaceChange... changes);
boolean dropNamespace(String[] namespace, boolean cascade);
}
Implementation Example:
public class MyNamespaceCatalog extends MyTableCatalog implements SupportsNamespaces {
// Key by List<String>: Java arrays do not define value-based equals/hashCode.
private final Map<List<String>, Map<String, String>> namespaces = new ConcurrentHashMap<>();
@Override
public String[][] listNamespaces() {
  return namespaces.keySet().stream()
      .map(ns -> ns.toArray(new String[0]))
      .toArray(String[][]::new);
}
@Override
public void createNamespace(String[] namespace, Map<String, String> metadata)
    throws NamespaceAlreadyExistsException {
  if (namespaces.putIfAbsent(Arrays.asList(namespace), new HashMap<>(metadata)) != null) {
    throw new NamespaceAlreadyExistsException(namespace);
  }
}
@Override
public Map<String, String> loadNamespaceMetadata(String[] namespace)
    throws NoSuchNamespaceException {
  Map<String, String> metadata = namespaces.get(Arrays.asList(namespace));
if (metadata == null) {
throw new NoSuchNamespaceException(namespace);
}
return new HashMap<>(metadata);
}
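// Sketch of a cascade-aware drop. Spark expects a non-cascading drop of a
// non-empty namespace to fail; the exception type is simplified here.
@Override
public boolean dropNamespace(String[] namespace, boolean cascade) {
  if (!cascade && listTables(namespace).length > 0) {
    throw new IllegalStateException(
        "Namespace is not empty: " + String.join(".", namespace));
  }
  return namespaces.remove(Arrays.asList(namespace)) != null;
}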
}
Core interface representing a logical table:
public interface Table {
// Basic metadata
String name();
Column[] columns();
Transform[] partitioning();
Map<String, String> properties();
// Capabilities
Set<TableCapability> capabilities();
// Deprecated - use columns() instead
@Deprecated
StructType schema();
}
Tables can implement various support interfaces to provide additional capabilities:
public interface SupportsRead extends Table {
ScanBuilder newScanBuilder(CaseInsensitiveStringMap options);
}
public interface SupportsWrite extends Table {
WriteBuilder newWriteBuilder(LogicalWriteInfo info);
}
public interface SupportsDelete {
void deleteWhere(Filter[] filters);
}
public interface SupportsDeleteV2 {
void deleteWhere(Predicate[] predicates);
}
public interface SupportsPartitionManagement extends Table {
// Partition lifecycle
void createPartition(InternalRow ident, Map<String, String> properties);
boolean dropPartition(InternalRow ident);
void replacePartitionMetadata(InternalRow ident, Map<String, String> properties);
// Partition metadata
Map<String, String> loadPartitionMetadata(InternalRow ident);
InternalRow[] listPartitionIdentifiers(String[] names, InternalRow ident);
}
Complete Table Implementation Example:
public class MyCustomTable implements Table, SupportsRead, SupportsWrite,
    SupportsDelete, SupportsPartitionManagement {
  // The SupportsPartitionManagement methods (createPartition, dropPartition,
  // and so on) are omitted from this example for brevity.
private final String tableName;
private final Column[] columns;
private final Transform[] partitioning;
private final Map<String, String> properties;
public MyCustomTable(String name, Column[] columns, Transform[] partitioning,
Map<String, String> properties) {
this.tableName = name;
this.columns = columns;
this.partitioning = partitioning;
this.properties = properties;
}
@Override
public String name() {
return tableName;
}
@Override
public Column[] columns() {
return columns.clone();
}
@Override
public Transform[] partitioning() {
return partitioning.clone();
}
@Override
public Map<String, String> properties() {
return new HashMap<>(properties);
}
@Override
public Set<TableCapability> capabilities() {
return Set.of(
TableCapability.BATCH_READ,
TableCapability.BATCH_WRITE,
TableCapability.ACCEPT_ANY_SCHEMA,
TableCapability.OVERWRITE_BY_FILTER
);
}
@Override
public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
return new MyTableScanBuilder(this, options);
}
@Override
public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
return new MyTableWriteBuilder(this, info);
}
@Override
public void deleteWhere(Filter[] filters) {
// Implementation for deleting rows matching filters
for (Filter filter : filters) {
// Process each filter and delete matching rows
}
}
}
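The example above references MyTableScanBuilder and MyTableWriteBuilder, neither of which is defined in this section. A minimal sketch of the scan side, assuming a batch-only source (partition planning and the reader factory are elided):
public class MyTableScanBuilder implements ScanBuilder {
  private final MyCustomTable table;
  private final CaseInsensitiveStringMap options;

  public MyTableScanBuilder(MyCustomTable table, CaseInsensitiveStringMap options) {
    this.table = table;
    this.options = options;
  }

  @Override
  public Scan build() {
    return new Scan() {
      @Override
      public StructType readSchema() {
        // Derive the read schema from the table's column list.
        StructType schema = new StructType();
        for (Column column : table.columns()) {
          schema = schema.add(column.name(), column.dataType(), column.nullable());
        }
        return schema;
      }

      @Override
      public String description() {
        return "scan of " + table.name();
      }

      @Override
      public Batch toBatch() {
        // A real implementation returns a Batch that plans InputPartitions
        // and provides a PartitionReaderFactory; elided in this sketch.
        throw new UnsupportedOperationException("not implemented in this sketch");
      }
    };
  }
}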
Represents a column in a table schema:
public interface Column {
String name();
DataType dataType();
boolean nullable();
String comment();
ColumnDefaultValue defaultValue();
String metadataInJSON();
}
Implementation Example:
public class MyColumn implements Column {
private final String name;
private final DataType dataType;
private final boolean nullable;
private final String comment;
public MyColumn(String name, DataType dataType, boolean nullable, String comment) {
this.name = name;
this.dataType = dataType;
this.nullable = nullable;
this.comment = comment;
}
@Override
public String name() { return name; }
@Override
public DataType dataType() { return dataType; }
@Override
public boolean nullable() { return nullable; }
@Override
public String comment() { return comment; }
@Override
public ColumnDefaultValue defaultValue() { return null; }
@Override
public String metadataInJSON() { return null; }
}
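A hypothetical caller can then assemble a schema from MyColumn instances and hand it to a table catalog (catalog here is any TableCatalog implementation from earlier in this section):
Column[] columns = new Column[] {
    new MyColumn("id", DataTypes.IntegerType, false, "primary key"),
    new MyColumn("name", DataTypes.StringType, true, null)
};
Identifier ident = Identifier.of(new String[] {"myschema"}, "mytable");
Table table = catalog.createTable(ident, columns, new Transform[0],
    Map.of(TableCatalog.PROP_COMMENT, "demo table"));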
Enum defining table capabilities:
public enum TableCapability {
// Read capabilities
BATCH_READ,
MICRO_BATCH_READ,
CONTINUOUS_READ,
// Write capabilities
BATCH_WRITE,
STREAMING_WRITE,
// Schema capabilities
ACCEPT_ANY_SCHEMA,
// Overwrite capabilities
OVERWRITE_BY_FILTER,
OVERWRITE_DYNAMIC,
TRUNCATE
}
Interface for table modification operations:
public interface TableChange {
// Common table changes (implemented as nested classes):
// - SetProperty
// - RemoveProperty
// - AddColumn
// - RenameColumn
// - UpdateColumnType
// - UpdateColumnNullability
// - UpdateColumnComment
// - DeleteColumn
// - UpdateColumnPosition
}
Usage Example:
// Creating table changes for ALTER TABLE operations
TableChange[] changes = new TableChange[] {
TableChange.setProperty("owner", "new_owner"),
TableChange.addColumn(new String[] {"new_column"}, DataTypes.StringType),
TableChange.renameColumn(new String[] {"old_name"}, "new_name")
};
Table alteredTable = catalog.alterTable(identifier, changes);
Implement multiple catalog interfaces for full functionality:
public class CompleteCatalog implements TableCatalog, ViewCatalog,
FunctionCatalog, SupportsNamespaces {
private final TableCatalog tableCatalog;
private final ViewCatalog viewCatalog;
private final FunctionCatalog functionCatalog;
private final SupportsNamespaces namespacesSupport;
public CompleteCatalog() {
this.tableCatalog = new MyTableCatalog();
this.viewCatalog = new MyViewCatalog();
this.functionCatalog = new MyFunctionCatalog();
this.namespacesSupport = new MyNamespaceCatalog();
}
// Delegate methods to appropriate implementations...
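// For example, assuming the fields initialized above:
@Override
public Table loadTable(Identifier ident) throws NoSuchTableException {
  return tableCatalog.loadTable(ident);
}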
}
Back a catalog with an external metastore:
public class ExternalMetastoreCatalog implements TableCatalog, SupportsNamespaces {
// Not final: the client is created in initialize() rather than in a constructor.
private MetastoreClient metastoreClient;
@Override
public void initialize(String name, CaseInsensitiveStringMap options) {
String metastoreUrl = options.get("metastore.url");
this.metastoreClient = new MetastoreClient(metastoreUrl);
}
@Override
public Identifier[] listTables(String[] namespace) {
return metastoreClient.listTables(namespace)
.stream()
.map(Identifier::of)
.toArray(Identifier[]::new);
}
@Override
public Table loadTable(Identifier ident) {
TableMetadata metadata = metastoreClient.getTable(ident);
return convertToTable(metadata);
}
}
Cache table metadata to avoid repeated metastore round trips:
public class CachedCatalog implements TableCatalog {
private final TableCatalog delegate;
private final Cache<Identifier, Table> tableCache;
public CachedCatalog(TableCatalog delegate) {
this.delegate = delegate;
this.tableCache = CacheBuilder.newBuilder()
.maximumSize(1000)
.expireAfterWrite(10, TimeUnit.MINUTES)
.build();
}
@Override
public Table loadTable(Identifier ident) {
  try {
    // Guava's Cache.get(key, loader) wraps loader failures in ExecutionException.
    return tableCache.get(ident, () -> delegate.loadTable(ident));
  } catch (ExecutionException e) {
    throw new RuntimeException("Failed to load table " + ident, e);
  }
}
@Override
public void invalidateTable(Identifier ident) {
tableCache.invalidate(ident);
delegate.invalidateTable(ident);
}
}
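Because the decorator and its delegate share the TableCatalog interface, the classes above compose directly:
// Hypothetical composition of the two catalogs defined above.
TableCatalog catalog = new CachedCatalog(new ExternalMetastoreCatalog());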
Register custom catalogs in Spark configuration:
// Scala configuration
spark.conf.set("spark.sql.catalog.mycatalog", "com.example.MyCustomCatalog")
spark.conf.set("spark.sql.catalog.mycatalog.option1", "value1")
spark.conf.set("spark.sql.catalog.mycatalog.option2", "value2")
// Or register statically via spark-submit / spark-defaults.conf
// (Apache Spark has no CREATE CATALOG statement; catalogs are registered
// through configuration only):
//   --conf spark.sql.catalog.mycatalog=com.example.MyCustomCatalog
//   --conf spark.sql.catalog.mycatalog.option1=value1
//   --conf spark.sql.catalog.mycatalog.option2=value2
-- Use catalog for queries
USE mycatalog.myschema;
-- Fully qualified table names
SELECT * FROM mycatalog.myschema.mytable;
-- Create tables in custom catalog
CREATE TABLE mycatalog.myschema.newtable (
id INT,
name STRING
) USING DELTA;
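Once registered, the same tables are reachable from the DataFrame API; a minimal sketch assuming an active SparkSession named spark:
Dataset<Row> df = spark.table("mycatalog.myschema.mytable");
df.show();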
Implement proper exception handling for catalog operations:
public class MyTableCatalog implements TableCatalog {
@Override
public Table loadTable(Identifier ident) throws NoSuchTableException {
try {
// Attempt to load table
return doLoadTable(ident);
} catch (TableNotFoundException e) {
throw new NoSuchTableException(ident);
} catch (AccessDeniedException e) {
  // UnauthorizedException stands in for an application-specific exception type.
  throw new UnauthorizedException(
      String.format("Access denied for table %s", ident));
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to load table %s", ident), e);
}
}
}
Defer expensive metadata loading until it is first needed:
public class LazyTable implements Table {
private volatile Column[] columns;
private final Supplier<Column[]> columnsSupplier;

public LazyTable(Supplier<Column[]> columnsSupplier) {
  this.columnsSupplier = columnsSupplier;
}
// name(), partitioning(), properties(), and capabilities() are omitted for brevity.
@Override
public Column[] columns() {
if (columns == null) {
synchronized (this) {
if (columns == null) {
columns = columnsSupplier.get();
}
}
}
return columns;
}
}
Batch metadata calls where the backing store supports them:
public class BatchTableCatalog implements TableCatalog {
private MetastoreClient metastoreClient; // assigned in initialize(), omitted here

@Override
public Identifier[] listTables(String[] namespace) {
// Use batch API to fetch multiple tables efficiently
return metastoreClient.batchListTables(namespace);
}
}
The Catalog APIs provide a powerful and flexible foundation for integrating external metadata systems with Spark SQL. They support hierarchical namespaces, comprehensive table management, and extensible capabilities for building robust data platforms.