or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

catalog-apis.md, data-source-v2-apis.md, distributions-api.md, expression-apis.md, index.md, legacy-data-source-v1.md, metrics-api.md, streaming-apis.md, utilities-helpers.md, vectorized-processing.md
tile.json

docs/catalog-apis.md

Catalog APIs

The Catalog APIs provide a comprehensive framework for implementing custom catalog systems in Apache Spark. These APIs allow you to integrate external metadata stores, implement custom table discovery, and manage database objects like tables, views, and functions.

Core Interfaces

CatalogPlugin

The base interface for all catalog implementations:

package org.apache.spark.sql.connector.catalog;

public interface CatalogPlugin {
    /**
     * Called once, after instantiation, to configure this catalog.
     *
     * @param name the name the catalog was registered under
     *             (the "mycatalog" in spark.sql.catalog.mycatalog)
     * @param options catalog-specific options taken from the Spark configuration
     */
    void initialize(String name, CaseInsensitiveStringMap options);
    
    /**
     * Return the name of this catalog, as provided to {@code initialize}.
     */
    String name();
}

Usage Example:

public class MyCustomCatalog implements CatalogPlugin {
    // Name assigned at registration time plus the raw catalog options.
    private String name;
    private CaseInsensitiveStringMap options;

    @Override
    public void initialize(String name, CaseInsensitiveStringMap options) {
        // Remember what Spark handed us; a real catalog would also open
        // connections, load configuration, etc. here.
        this.name = name;
        this.options = options;
    }

    @Override
    public String name() {
        return this.name;
    }
}

TableCatalog

Main interface for managing tables in a catalog:

public interface TableCatalog extends CatalogPlugin {
    // Reserved table property keys understood by Spark.
    String PROP_LOCATION = "location";
    String PROP_IS_MANAGED_LOCATION = "is_managed_location";
    String PROP_EXTERNAL = "external";
    String PROP_COMMENT = "comment";
    String PROP_PROVIDER = "provider";
    String PROP_OWNER = "owner";
    String OPTION_PREFIX = "option.";

    // Table discovery
    Identifier[] listTables(String[] namespace);
    boolean tableExists(Identifier ident);

    // Table loading (overloads with version/timestamp support time travel)
    Table loadTable(Identifier ident);
    Table loadTable(Identifier ident, Set<TableWritePrivilege> writePrivileges);
    Table loadTable(Identifier ident, String version);
    Table loadTable(Identifier ident, long timestamp);
    void invalidateTable(Identifier ident);

    // Table lifecycle
    /**
     * Create a table in the catalog (deprecated - use Column[] version).
     * @deprecated Use {@link #createTable(Identifier, Column[], Transform[], Map)} instead
     */
    @Deprecated
    Table createTable(Identifier ident, StructType schema,
                     Transform[] partitions, Map<String, String> properties);

    /**
     * Create a table in the catalog.
     *
     * Declared abstract here: an interface method marked {@code default} must
     * supply a body, which the earlier revision of this snippet omitted.
     */
    Table createTable(Identifier ident, Column[] columns,
                     Transform[] partitions, Map<String, String> properties);
    Table alterTable(Identifier ident, TableChange... changes);
    boolean dropTable(Identifier ident);
    boolean purgeTable(Identifier ident);
    void renameTable(Identifier oldIdent, Identifier newIdent);

    // Query schema handling
    /**
     * If true, mark all fields as nullable when executing CREATE TABLE ... AS SELECT.
     * Spark's default is {@code true}.
     */
    default boolean useNullableQuerySchema() {
        return true;
    }

    // Capabilities advertised by this catalog implementation
    Set<TableCatalogCapability> capabilities();
}

Implementation Example:

public class MyTableCatalog extends MyCustomCatalog implements TableCatalog {
    /** Backing store; concurrent so catalog calls are safe across threads. */
    private final Map<Identifier, Table> tables = new ConcurrentHashMap<>();

    /** Return every known identifier whose namespace matches exactly. */
    @Override
    public Identifier[] listTables(String[] namespace) {
        List<Identifier> matches = new ArrayList<>();
        for (Identifier id : tables.keySet()) {
            if (Arrays.equals(id.namespace(), namespace)) {
                matches.add(id);
            }
        }
        return matches.toArray(new Identifier[0]);
    }

    /** Look up a table or fail with Spark's NoSuchTableException. */
    @Override
    public Table loadTable(Identifier ident) {
        Table found = tables.get(ident);
        if (found == null) {
            throw new NoSuchTableException(ident);
        }
        return found;
    }

    /** Create and register a table, rejecting duplicate identifiers. */
    @Override
    public Table createTable(Identifier ident, Column[] columns, 
                           Transform[] partitions, Map<String, String> properties) {
        if (tableExists(ident)) {
            throw new TableAlreadyExistsException(ident);
        }
        Table created = new MyCustomTable(ident.name(), columns, partitions, properties);
        tables.put(ident, created);
        return created;
    }

    /** Remove the table; true if it existed. */
    @Override
    public boolean dropTable(Identifier ident) {
        return tables.remove(ident) != null;
    }

    /** Advertise optional features this catalog supports. */
    @Override
    public Set<TableCatalogCapability> capabilities() {
        return Set.of(
            TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_COLUMN_DEFAULT_VALUE,
            TableCatalogCapability.SUPPORTS_PARTITION_MANAGEMENT
        );
    }
}

ViewCatalog

Interface for managing views in a catalog:

public interface ViewCatalog extends CatalogPlugin {
    // View discovery
    /** List the views in the given namespace. */
    Identifier[] listViews(String[] namespace);
    
    // View lifecycle  
    /** Load a view's definition and metadata. */
    View loadView(Identifier ident);
    /**
     * Create a view from its SQL text; currentCatalog/currentNamespace capture
     * the resolution context the SQL was written in.
     */
    View createView(Identifier ident, String sql, String currentCatalog,
                   String[] currentNamespace, Column[] schema, 
                   Map<String, String> properties);
    /** Apply one or more changes to an existing view. */
    View alterView(Identifier ident, ViewChange... changes);
    /** Drop a view; returns true if it existed. */
    boolean dropView(Identifier ident);
    /** Rename a view within this catalog. */
    void renameView(Identifier oldIdent, Identifier newIdent);
}

Implementation Example:

public class MyViewCatalog extends MyCustomCatalog implements ViewCatalog {
    /** Backing store; concurrent so catalog calls are safe across threads. */
    private final Map<Identifier, View> views = new ConcurrentHashMap<>();
    
    /**
     * Create a view, failing if one is already registered under the identifier.
     * Mirrors the existence check in createTable so duplicates are not
     * silently overwritten.
     */
    @Override
    public View createView(Identifier ident, String sql, String currentCatalog,
                          String[] currentNamespace, Column[] schema,
                          Map<String, String> properties) {
        if (views.containsKey(ident)) {
            throw new ViewAlreadyExistsException(ident);
        }
        View view = new MyCustomView(ident, sql, currentCatalog, 
                                   currentNamespace, schema, properties);
        views.put(ident, view);
        return view;
    }
    
    /** Look up a view or fail with NoSuchViewException. */
    @Override
    public View loadView(Identifier ident) {
        View view = views.get(ident);
        if (view == null) {
            throw new NoSuchViewException(ident);
        }
        return view;
    }
}

FunctionCatalog

Interface for managing user-defined functions:

public interface FunctionCatalog extends CatalogPlugin {
    // Function discovery
    /** List the functions registered in the given namespace. */
    Identifier[] listFunctions(String[] namespace);
    
    // Function loading
    /** Load a function that Spark will later bind to concrete argument types. */
    UnboundFunction loadFunction(Identifier ident);
}

Implementation Example:

public class MyFunctionCatalog extends MyCustomCatalog implements FunctionCatalog {
    // ConcurrentHashMap for consistency with the other catalog examples:
    // Spark may call catalog methods from multiple threads, and a plain
    // HashMap is not safe under concurrent mutation.
    private final Map<Identifier, UnboundFunction> functions = new ConcurrentHashMap<>();
    
    /** Return every registered identifier whose namespace matches exactly. */
    @Override
    public Identifier[] listFunctions(String[] namespace) {
        return functions.keySet()
            .stream()
            .filter(id -> Arrays.equals(id.namespace(), namespace))
            .toArray(Identifier[]::new);
    }
    
    /** Look up a function or fail with NoSuchFunctionException. */
    @Override
    public UnboundFunction loadFunction(Identifier ident) {
        UnboundFunction function = functions.get(ident);
        if (function == null) {
            throw new NoSuchFunctionException(ident);
        }
        return function;
    }
}

Catalog Extensions

SupportsNamespaces

Interface for catalogs that support hierarchical namespaces:

// NOTE(review): in Spark's actual connector API, SupportsNamespaces extends
// CatalogPlugin — confirm against the target Spark version before relying on
// this standalone form.
public interface SupportsNamespaces {
    // Namespace discovery
    /** List all top-level namespaces in the catalog. */
    String[][] listNamespaces();
    /** List namespaces nested directly under the given namespace. */
    String[][] listNamespaces(String[] namespace);
    
    // Namespace metadata
    /** Load the metadata map for a namespace (e.g. comment, location). */
    Map<String, String> loadNamespaceMetadata(String[] namespace);
    
    // Namespace lifecycle
    /** Create a namespace with the given metadata. */
    void createNamespace(String[] namespace, Map<String, String> metadata);
    /** Apply one or more changes to an existing namespace. */
    void alterNamespace(String[] namespace, NamespaceChange... changes);
    /** Drop a namespace; cascade=true also drops contained objects. */
    boolean dropNamespace(String[] namespace, boolean cascade);
}

Implementation Example:

public class MyNamespaceCatalog extends MyTableCatalog implements SupportsNamespaces {
    // Keyed by List<String> rather than String[]: Java arrays use
    // identity-based equals/hashCode, so a HashMap<String[], ...> would never
    // match a lookup with an equal-content array — containsKey/get would
    // silently fail for every caller-supplied namespace.
    private final Map<List<String>, Map<String, String>> namespaces = new HashMap<>();
    
    /** Return all known namespaces as String[] paths. */
    @Override
    public String[][] listNamespaces() {
        return namespaces.keySet()
            .stream()
            .map(ns -> ns.toArray(new String[0]))
            .toArray(String[][]::new);
    }
    
    /** Create a namespace, rejecting duplicates. */
    @Override
    public void createNamespace(String[] namespace, Map<String, String> metadata) {
        List<String> key = Arrays.asList(namespace);
        if (namespaces.containsKey(key)) {
            throw new NamespaceAlreadyExistsException(namespace);
        }
        // Defensive copy so later caller mutation cannot corrupt our state.
        namespaces.put(key, new HashMap<>(metadata));
    }
    
    /** Return a defensive copy of the namespace metadata, or fail if absent. */
    @Override
    public Map<String, String> loadNamespaceMetadata(String[] namespace) {
        Map<String, String> metadata = namespaces.get(Arrays.asList(namespace));
        if (metadata == null) {
            throw new NoSuchNamespaceException(namespace);
        }
        return new HashMap<>(metadata);
    }
}

Table Interfaces

Table

Core interface representing a logical table:

public interface Table {
    // Basic metadata
    /** Human-readable table name, used in logs and error messages. */
    String name();
    /** The table's columns, including types, nullability, and defaults. */
    Column[] columns();
    /** The partitioning transforms applied to this table. */
    Transform[] partitioning();
    /** Free-form string properties attached to the table. */
    Map<String, String> properties();
    
    // Capabilities
    /** The read/write capabilities this table advertises to Spark. */
    Set<TableCapability> capabilities();
    
    // Deprecated - use columns() instead
    @Deprecated
    StructType schema();
}

Table Support Interfaces

Tables can implement various support interfaces to provide additional capabilities:

SupportsRead

// NOTE(review): in Spark's actual connector API this mixin extends Table —
// confirm against the target Spark version.
public interface SupportsRead {
    /** Create a builder for scans of this table, configured by the read options. */
    ScanBuilder newScanBuilder(CaseInsensitiveStringMap options);
}

SupportsWrite

// NOTE(review): in Spark's actual connector API this mixin extends Table —
// confirm against the target Spark version.
public interface SupportsWrite {
    /** Create a builder for writes to this table, using the logical write info. */
    WriteBuilder newWriteBuilder(LogicalWriteInfo info);
}

SupportsDelete

public interface SupportsDelete {
    /**
     * Delete all rows matching every filter (filters are ANDed together).
     * Uses the older V1 Filter API; see SupportsDeleteV2 for the V2 form.
     */
    void deleteWhere(Filter[] filters);
}

SupportsDeleteV2

public interface SupportsDeleteV2 {
    /**
     * Delete all rows matching every predicate (predicates are ANDed).
     * V2 replacement for SupportsDelete's Filter-based overload.
     */
    void deleteWhere(Predicate[] predicates);
}

SupportsPartitionManagement

public interface SupportsPartitionManagement {
    // Partition lifecycle
    /** Create a partition identified by the given row of partition values. */
    void createPartition(InternalRow ident, Map<String, String> properties);
    /** Drop a partition; returns true if it existed. */
    boolean dropPartition(InternalRow ident);
    /** Replace (not merge) the metadata of an existing partition. */
    void replacePartitionMetadata(InternalRow ident, Map<String, String> properties);
    
    // Partition metadata
    /** Load the metadata map for one partition. */
    Map<String, String> loadPartitionMetadata(InternalRow ident);
    /** List partition identifiers matching the given partial partition spec. */
    InternalRow[] listPartitionIdentifiers(String[] names, InternalRow ident);
}

Complete Table Implementation Example:

// NOTE: the SupportsPartitionManagement methods are elided in this example
// for brevity; a complete implementation must provide them.
public class MyCustomTable implements Table, SupportsRead, SupportsWrite, 
                                    SupportsDelete, SupportsPartitionManagement {
    private final String tableName;
    private final Column[] columns;
    private final Transform[] partitioning;
    private final Map<String, String> properties;
    
    /**
     * Snapshot the caller-supplied arrays and map. The accessors below return
     * copies; without copying on the way in as well, a caller could mutate
     * the arrays it passed and change this "immutable" table's state.
     */
    public MyCustomTable(String name, Column[] columns, Transform[] partitioning,
                        Map<String, String> properties) {
        this.tableName = name;
        this.columns = columns.clone();
        this.partitioning = partitioning.clone();
        this.properties = new HashMap<>(properties);
    }
    
    @Override
    public String name() {
        return tableName;
    }
    
    /** Defensive copy so callers cannot mutate internal state. */
    @Override
    public Column[] columns() {
        return columns.clone();
    }
    
    /** Defensive copy so callers cannot mutate internal state. */
    @Override
    public Transform[] partitioning() {
        return partitioning.clone();
    }
    
    /** Defensive copy so callers cannot mutate internal state. */
    @Override
    public Map<String, String> properties() {
        return new HashMap<>(properties);
    }
    
    /** Batch read/write plus filter-based overwrite and flexible schemas. */
    @Override
    public Set<TableCapability> capabilities() {
        return Set.of(
            TableCapability.BATCH_READ,
            TableCapability.BATCH_WRITE,
            TableCapability.ACCEPT_ANY_SCHEMA,
            TableCapability.OVERWRITE_BY_FILTER
        );
    }
    
    @Override
    public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
        return new MyTableScanBuilder(this, options);
    }
    
    @Override
    public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
        return new MyTableWriteBuilder(this, info);
    }
    
    @Override
    public void deleteWhere(Filter[] filters) {
        // Implementation for deleting rows matching filters
        for (Filter filter : filters) {
            // Process each filter and delete matching rows
        }
    }
}

Data Structure Classes

Column

Represents a column in a table schema:

public interface Column {
    /** Column name. */
    String name();
    /** The column's Spark SQL data type. */
    DataType dataType();
    /** Whether the column accepts nulls. */
    boolean nullable();
    /** Optional column comment, or null if none. */
    String comment();
    /** Optional default value used on INSERT, or null if none. */
    ColumnDefaultValue defaultValue();
    /** Metadata-column info if this is a metadata column, or null otherwise. */
    MetadataColumn metadataColumn();
}

Implementation Example:

/** Simple immutable Column with no default value or metadata-column info. */
public class MyColumn implements Column {
    private final String columnName;
    private final DataType type;
    private final boolean acceptsNulls;
    private final String columnComment;

    public MyColumn(String name, DataType dataType, boolean nullable, String comment) {
        this.columnName = name;
        this.type = dataType;
        this.acceptsNulls = nullable;
        this.columnComment = comment;
    }

    @Override
    public String name() {
        return columnName;
    }

    @Override
    public DataType dataType() {
        return type;
    }

    @Override
    public boolean nullable() {
        return acceptsNulls;
    }

    @Override
    public String comment() {
        return columnComment;
    }

    // No default value or metadata-column support in this example.
    @Override
    public ColumnDefaultValue defaultValue() {
        return null;
    }

    @Override
    public MetadataColumn metadataColumn() {
        return null;
    }
}

TableCapability

Enum defining table capabilities:

public enum TableCapability {
    // Read capabilities
    BATCH_READ,        // supports batch (non-streaming) scans
    MICRO_BATCH_READ,  // supports micro-batch streaming reads
    CONTINUOUS_READ,   // supports continuous streaming reads
    
    // Write capabilities  
    BATCH_WRITE,       // supports batch appends
    STREAMING_WRITE,   // supports streaming writes
    
    // Schema capabilities
    ACCEPT_ANY_SCHEMA, // skips Spark's write-schema compatibility check
    
    // Overwrite capabilities
    OVERWRITE_BY_FILTER, // supports replacing rows matched by filters
    OVERWRITE_DYNAMIC,   // supports dynamic partition overwrite
    TRUNCATE             // supports removing all rows
}

TableChange

Interface for table modification operations:

/**
 * Marker interface for ALTER TABLE operations. Concrete changes are created
 * via static factory methods (setProperty, addColumn, renameColumn, ...) and
 * applied through TableCatalog.alterTable.
 */
public interface TableChange {
    // Common table changes (implemented as nested classes):
    // - SetProperty
    // - RemoveProperty  
    // - AddColumn
    // - RenameColumn
    // - UpdateColumnType
    // - UpdateColumnNullability
    // - UpdateColumnComment
    // - DeleteColumn
    // - UpdateColumnPosition
}

Usage Example:

// Creating table changes for ALTER TABLE operations.
// Note: addColumn/renameColumn take the field name as a String[] path so
// nested struct fields (e.g. {"point", "x"}) can be addressed.
TableChange[] changes = new TableChange[] {
    TableChange.setProperty("owner", "new_owner"),
    TableChange.addColumn(new String[] {"new_column"}, DataTypes.StringType),
    TableChange.renameColumn(new String[] {"old_name"}, "new_name")
};

Table alteredTable = catalog.alterTable(identifier, changes);

Advanced Usage Patterns

Multi-Catalog Implementation

Implement multiple catalog interfaces for full functionality:

/**
 * Composes the single-purpose catalogs above into one catalog that supports
 * tables, views, functions, and namespaces. Interface methods delegate to
 * the matching component (delegation bodies elided in this example).
 */
public class CompleteCatalog implements TableCatalog, ViewCatalog, 
                                      FunctionCatalog, SupportsNamespaces {
    private final TableCatalog tableCatalog;
    private final ViewCatalog viewCatalog;
    private final FunctionCatalog functionCatalog;
    private final SupportsNamespaces namespacesSupport;
    
    public CompleteCatalog() {
        this.tableCatalog = new MyTableCatalog();
        this.viewCatalog = new MyViewCatalog(); 
        this.functionCatalog = new MyFunctionCatalog();
        this.namespacesSupport = new MyNamespaceCatalog();
    }
    
    // Delegate methods to appropriate implementations...
}

Catalog with External Metadata Store

public class ExternalMetastoreCatalog implements TableCatalog, SupportsNamespaces {
    // Not final: the CatalogPlugin contract configures catalogs through
    // initialize(), not a constructor, and Java forbids assigning a final
    // field outside a constructor/initializer (the previous revision did
    // not compile for this reason).
    private MetastoreClient metastoreClient;
    
    /** Connect to the external metastore named in the catalog options. */
    @Override
    public void initialize(String name, CaseInsensitiveStringMap options) {
        String metastoreUrl = options.get("metastore.url");
        this.metastoreClient = new MetastoreClient(metastoreUrl);
    }
    
    /** Fetch table names from the metastore and wrap them as identifiers. */
    @Override
    public Identifier[] listTables(String[] namespace) {
        return metastoreClient.listTables(namespace)
            .stream()
            .map(Identifier::of)
            .toArray(Identifier[]::new);
    }
    
    /** Load table metadata from the metastore and convert it to a Table. */
    @Override
    public Table loadTable(Identifier ident) {
        TableMetadata metadata = metastoreClient.getTable(ident);
        return convertToTable(metadata);
    }
}

Cached Catalog Implementation

/**
 * Wraps another TableCatalog with a bounded, time-expiring table cache
 * (Guava CacheBuilder).
 */
public class CachedCatalog implements TableCatalog {
    private final TableCatalog delegate;
    private final Cache<Identifier, Table> tableCache;
    
    public CachedCatalog(TableCatalog delegate) {
        this.delegate = delegate;
        this.tableCache = CacheBuilder.newBuilder()
            .maximumSize(1000)
            .expireAfterWrite(10, TimeUnit.MINUTES)
            .build();
    }
    
    /**
     * Serve from cache, loading through the delegate on a miss. Guava's
     * Cache.get(K, Callable) declares the checked ExecutionException, so the
     * loader's failure must be unwrapped — the previous revision did not
     * handle it and would not compile.
     */
    @Override
    public Table loadTable(Identifier ident) {
        try {
            return tableCache.get(ident, () -> delegate.loadTable(ident));
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause; // rethrow the delegate's failure as-is
            }
            throw new RuntimeException("Failed to load table " + ident, cause);
        }
    }
    
    /** Drop the cached entry and propagate invalidation to the delegate. */
    @Override
    public void invalidateTable(Identifier ident) {
        tableCache.invalidate(ident);
        delegate.invalidateTable(ident);
    }
}

Configuration and Setup

Catalog Registration

Register custom catalogs in Spark configuration:

// Scala configuration
spark.conf.set("spark.sql.catalog.mycatalog", "com.example.MyCustomCatalog")
spark.conf.set("spark.sql.catalog.mycatalog.option1", "value1")
spark.conf.set("spark.sql.catalog.mycatalog.option2", "value2")

// Using SQL
CREATE CATALOG mycatalog USING com.example.MyCustomCatalog
OPTIONS (
  option1 'value1',
  option2 'value2'
)

Catalog Usage in SQL

-- Use catalog for queries
USE CATALOG mycatalog;

-- Fully qualified table names
SELECT * FROM mycatalog.myschema.mytable;

-- Create tables in custom catalog
CREATE TABLE mycatalog.myschema.newtable (
  id INT,
  name STRING
) USING DELTA;

Error Handling

Implement proper exception handling for catalog operations:

public class MyTableCatalog implements TableCatalog {
    /**
     * Translate backend-specific exceptions into the exception types Spark's
     * catalog callers expect.
     */
    @Override
    public Table loadTable(Identifier ident) {
        try {
            // Attempt to load table
            return doLoadTable(ident);
        } catch (TableNotFoundException e) {
            // Map the backend's "missing" error to Spark's catalog exception.
            throw new NoSuchTableException(ident);
        } catch (AccessDeniedException e) {
            // NOTE(review): the original cause `e` is dropped here — pass it
            // along if UnauthorizedException accepts a cause; verify its API.
            throw new UnauthorizedException(
                String.format("Access denied for table %s", ident));
        } catch (Exception e) {
            // Anything unexpected: wrap, preserving the cause for diagnostics.
            throw new RuntimeException(
                String.format("Failed to load table %s", ident), e);
        }
    }
}

Performance Considerations

Lazy Loading

/**
 * Table whose column list is computed on first access and then cached.
 * Thread-safe via double-checked locking on the volatile {@code columns}.
 */
public class LazyTable implements Table {
    private volatile Column[] columns;                 // lazily computed, published via volatile
    private final Supplier<Column[]> columnsSupplier;
    
    /**
     * The supplier performs the (expensive) schema lookup. The previous
     * revision declared the final field without ever assigning it, which
     * does not compile — a constructor is required.
     */
    public LazyTable(Supplier<Column[]> columnsSupplier) {
        this.columnsSupplier = columnsSupplier;
    }
    
    @Override
    public Column[] columns() {
        // Read the volatile once on the fast path.
        Column[] result = columns;
        if (result == null) {
            synchronized (this) {
                result = columns;
                if (result == null) {
                    columns = result = columnsSupplier.get();
                }
            }
        }
        return result;
    }
}

Batch Operations

public class BatchTableCatalog implements TableCatalog {
    /** Fetch all identifiers in one round trip instead of per-table calls. */
    @Override
    public Identifier[] listTables(String[] namespace) {
        // Use batch API to fetch multiple tables efficiently
        // NOTE(review): metastoreClient is not declared in this snippet —
        // presumably a field set in initialize(), elided for brevity.
        return metastoreClient.batchListTables(namespace);
    }
}

The Catalog APIs provide a powerful and flexible foundation for integrating external metadata systems with Spark SQL. They support hierarchical namespaces, comprehensive table management, and extensible capabilities for building robust data platforms.