CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-table-api-java-uber

Comprehensive uber JAR that consolidates all Java APIs for Apache Flink's Table/SQL ecosystem, enabling developers to write table programs and integrate with other Flink APIs through a single dependency.

Pending
Overview
Eval results
Files

docs/connectors.md

Connector Framework

Extensible connector architecture with built-in connectors for testing and development, plus framework for custom connector development in Apache Flink's Table API.

Capabilities

Connector Framework Interfaces

Base interfaces for implementing custom table sources and sinks.

/**
 * Base interface for dynamic table sources.
 */
public interface DynamicTableSource extends TableSource {
    /**
     * Create a copy of this source.
     *
     * <p>NOTE(review): the previous javadoc documented an {@code abilities}
     * parameter, but the method takes no arguments; abilities are applied to
     * the copy via the {@code Supports*} interfaces instead.
     * @return New source instance equivalent to this one
     */
    public DynamicTableSource copy();
    
    /**
     * Get a string summary of this source
     * @return Human-readable summary
     */
    public String asSummaryString();
}

/**
 * A {@link DynamicTableSource} that scans rows of a table, in either batch or
 * streaming execution.
 */
public interface ScanTableSource extends DynamicTableSource {

    /**
     * Returns the kinds of changes this source can emit.
     *
     * @return Change log mode (INSERT_ONLY, UPSERT, etc.)
     */
    ChangelogMode getChangelogMode();

    /**
     * Builds the provider that supplies the scan implementation at runtime.
     *
     * @param context Context with parallelism and other runtime info
     * @return Source provider for the Flink runtime
     */
    ScanRuntimeProvider getScanRuntimeProvider(ScanContext context);
}

/**
 * A {@link DynamicTableSource} that is queried point-wise by key, e.g. for
 * lookup joins.
 */
public interface LookupTableSource extends DynamicTableSource {

    /**
     * Builds the provider that performs lookups at runtime.
     *
     * @param context Context with lookup configuration
     * @return Runtime provider for lookup operations
     */
    LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context);
}

/**
 * Base interface for dynamic table sinks
 */
public interface DynamicTableSink extends TableSink {
    /**
     * Get the change log mode consumed by this sink
     * @param requestedMode Requested change log mode from query
     * @return Change log mode accepted by this sink
     */
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode);
    
    /**
     * Create the sink runtime provider
     * @param context Context with parallelism and other runtime info
     * @return Sink provider for the Flink runtime
     */
    public SinkRuntimeProvider getSinkRuntimeProvider(SinkContext context);
    
    /**
     * Create a copy of this sink.
     *
     * <p>NOTE(review): the previous javadoc documented an {@code abilities}
     * parameter, but the method takes no arguments; abilities are applied to
     * the copy via the {@code Supports*} interfaces instead.
     * @return New sink instance equivalent to this one
     */
    public DynamicTableSink copy();
    
    /**
     * Get a string summary of this sink
     * @return Human-readable summary
     */
    public String asSummaryString();
}

Connector Factory Interface

Interface for creating connectors dynamically based on configuration.

/**
 * Factory that instantiates a {@link DynamicTableSource} from catalog and
 * configuration information.
 */
public interface DynamicTableSourceFactory extends Factory {

    /**
     * Instantiates a table source for the given creation context.
     *
     * @param context Creation context with options and schema
     * @return Created table source instance
     */
    DynamicTableSource createDynamicTableSource(Context context);
}

/**
 * Factory that instantiates a {@link DynamicTableSink} from catalog and
 * configuration information.
 */
public interface DynamicTableSinkFactory extends Factory {

    /**
     * Instantiates a table sink for the given creation context.
     *
     * @param context Creation context with options and schema
     * @return Created table sink instance
     */
    DynamicTableSink createDynamicTableSink(Context context);
}

/**
 * Base factory contract: a unique identifier plus the declaration of which
 * configuration options the factory requires and which it merely accepts.
 */
public interface Factory {

    /**
     * Returns the unique identifier used to select this factory.
     *
     * @return Factory identifier (e.g., "kafka", "filesystem")
     */
    String factoryIdentifier();

    /**
     * Returns the options that must be present for this factory to work.
     *
     * @return Set of required configuration keys
     */
    Set<ConfigOption<?>> requiredOptions();

    /**
     * Returns the options that may optionally be supplied.
     *
     * @return Set of optional configuration keys
     */
    Set<ConfigOption<?>> optionalOptions();
}

Built-in Connectors

Ready-to-use connectors included in the uber JAR for testing and development.

/**
 * Factory for the "datagen" connector, which generates rows for testing.
 *
 * <p>No options are mandatory; rate, row count and per-field behavior can be
 * tuned through the optional options.
 */
public class DataGenTableSourceFactory implements DynamicTableSourceFactory {

    @Override
    public String factoryIdentifier() {
        return "datagen";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        // Everything has sensible defaults, so nothing is required.
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Set.of(
                DataGenConnectorOptions.ROWS_PER_SECOND,
                DataGenConnectorOptions.NUMBER_OF_ROWS,
                DataGenConnectorOptions.FIELDS);
    }
}

/**
 * Factory for the "print" connector, a sink that writes rows to stdout (or
 * stderr) for debugging.
 */
public class PrintTableSinkFactory implements DynamicTableSinkFactory {

    @Override
    public String factoryIdentifier() {
        return "print";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        // The print sink works out of the box with no configuration.
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Set.of(
                PrintConnectorOptions.PRINT_IDENTIFIER,
                PrintConnectorOptions.STANDARD_ERROR);
    }
}

/**
 * Factory for the "blackhole" connector, a sink that discards every row.
 * Useful for isolating source/pipeline throughput in performance tests.
 */
public class BlackHoleTableSinkFactory implements DynamicTableSinkFactory {

    @Override
    public String factoryIdentifier() {
        return "blackhole";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        // Nothing to configure: the sink simply drops all incoming data.
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }
}

Built-in Connector Usage:

// Data generation source for testing.
// 'fields.<name>.*' options configure per-column generation; here 'id' is a
// bounded sequence and 'name'/'amount' use length/range settings.
tEnv.executeSql("CREATE TABLE test_source (" +
    "id BIGINT," +
    "name STRING," +
    "amount DECIMAL(10,2)," +
    "event_time TIMESTAMP(3)" +
    ") WITH (" +
    "'connector' = 'datagen'," +
    "'rows-per-second' = '100'," +
    "'number-of-rows' = '10000'," +
    "'fields.id.kind' = 'sequence'," +
    "'fields.id.start' = '1'," +
    "'fields.id.end' = '10000'," +
    "'fields.name.length' = '10'," +
    "'fields.amount.min' = '1.00'," +
    "'fields.amount.max' = '1000.00'" +
    ")");

// Print sink for debugging output.
// 'print-identifier' prefixes each printed row; 'standard-error' = 'false'
// sends output to stdout.
tEnv.executeSql("CREATE TABLE debug_sink (" +
    "id BIGINT," +
    "name STRING," +
    "amount DECIMAL(10,2)" +
    ") WITH (" +
    "'connector' = 'print'," +
    "'print-identifier' = 'debug'," +
    "'standard-error' = 'false'" +
    ")");

// Blackhole sink for performance testing — accepts and discards all rows,
// so no sink-side options are needed.
tEnv.executeSql("CREATE TABLE perf_sink (" +
    "id BIGINT," +
    "name STRING," +
    "processed_time TIMESTAMP(3)" +
    ") WITH (" +
    "'connector' = 'blackhole'" +
    ")");

// Wire the datagen source to the print sink to exercise the pipeline.
tEnv.executeSql("INSERT INTO debug_sink SELECT id, name, amount FROM test_source");

Connector Configuration Options

Configuration utilities for connector options and validation.

/**
 * Configuration options for the DataGen connector.
 *
 * <p>Pure constants holder; not meant to be instantiated.
 */
public class DataGenConnectorOptions {

    /** Generation rate; defaults to 10000 rows per second. */
    public static final ConfigOption<Long> ROWS_PER_SECOND = ConfigOptions
        .key("rows-per-second")
        .longType()
        .defaultValue(10000L)
        .withDescription("Rows per second to generate");

    /** Total row count; no default, meaning an unbounded source when unset. */
    public static final ConfigOption<Long> NUMBER_OF_ROWS = ConfigOptions
        .key("number-of-rows")
        .longType()
        .noDefaultValue()
        .withDescription("Total number of rows to generate");

    /** Per-field generator settings keyed as 'fields.<name>.<option>'. */
    public static final ConfigOption<Map<String, String>> FIELDS = ConfigOptions
        .key("fields")
        .mapType()
        .noDefaultValue()
        .withDescription("Field-specific generation options");

    /** Prevent instantiation: this class only exposes static constants. */
    private DataGenConnectorOptions() {}
}

/**
 * Configuration options for the Print connector.
 *
 * <p>Pure constants holder; not meant to be instantiated.
 */
public class PrintConnectorOptions {

    /** Optional prefix attached to every printed row. */
    public static final ConfigOption<String> PRINT_IDENTIFIER = ConfigOptions
        .key("print-identifier")
        .stringType()
        .noDefaultValue()
        .withDescription("Identifier for print output");

    /** When true, rows go to stderr; defaults to stdout. */
    public static final ConfigOption<Boolean> STANDARD_ERROR = ConfigOptions
        .key("standard-error")
        .booleanType()
        .defaultValue(false)
        .withDescription("Print to standard error instead of standard out");

    /** Prevent instantiation: this class only exposes static constants. */
    private PrintConnectorOptions() {}
}

Connector Abilities

Interface for extending connector capabilities with additional features.

/**
 * Ability interface for sources that can expose per-record metadata columns
 * in addition to the physical payload.
 */
public interface SupportsReadingMetadata {

    /**
     * Lists the metadata this source can surface.
     *
     * @return Map of metadata key to data type
     */
    Map<String, DataType> listReadableMetadata();

    /**
     * Tells the source which metadata columns were actually requested.
     *
     * @param metadataKeys List of metadata keys to read
     * @param producedDataType Data type that includes metadata
     */
    void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType);
}

/**
 * Ability interface for sinks that can persist per-record metadata columns
 * in addition to the physical payload.
 */
public interface SupportsWritingMetadata {

    /**
     * Lists the metadata this sink can persist.
     *
     * @return Map of metadata key to data type
     */
    Map<String, DataType> listWritableMetadata();

    /**
     * Tells the sink which metadata columns will actually be written.
     *
     * @param metadataKeys List of metadata keys to write
     * @param consumedDataType Data type that includes metadata
     */
    void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType);
}

/**
 * Ability interface for sources that can restrict their output to a subset
 * of (possibly nested) fields chosen by the planner.
 */
public interface SupportsProjectionPushDown {

    /**
     * Hands the planner-selected projection to the source.
     *
     * @param projectedFields Indices of fields to project (outer array per
     *     column, inner array for nested field paths)
     * @param producedDataType Data type after projection
     */
    void applyProjection(int[][] projectedFields, DataType producedDataType);
}

/**
 * Ability for sources to support filter pushdown
 */
public interface SupportsFilterPushDown {
    /**
     * Apply filter pushdown optimization
     * @param filters List of filters to push down
     * @return Result indicating which filters were accepted
     */
    public Result applyFilters(List<ResolvedExpression> filters);
    
    /**
     * Result of filter pushdown application.
     *
     * <p>Bug fix: the original class declared neither fields nor a
     * constructor, so {@code of(...)} and the getters referenced names that
     * did not exist. Instances are immutable holders partitioning the pushed
     * filters into accepted and remaining ones; create them via {@link #of}.
     */
    public static final class Result {
        private final List<ResolvedExpression> acceptedFilters;
        private final List<ResolvedExpression> remainingFilters;
        
        private Result(List<ResolvedExpression> acceptedFilters,
                       List<ResolvedExpression> remainingFilters) {
            this.acceptedFilters = acceptedFilters;
            this.remainingFilters = remainingFilters;
        }
        
        /**
         * Create a pushdown result.
         * @param acceptedFilters Filters the source will evaluate itself
         * @param remainingFilters Filters the planner must still evaluate
         * @return New result instance
         */
        public static Result of(List<ResolvedExpression> acceptedFilters, 
                              List<ResolvedExpression> remainingFilters) {
            return new Result(acceptedFilters, remainingFilters);
        }
        
        /** @return Filters accepted by the source. */
        public List<ResolvedExpression> getAcceptedFilters() { return acceptedFilters; }
        /** @return Filters the planner must still evaluate. */
        public List<ResolvedExpression> getRemainingFilters() { return remainingFilters; }
    }
}

/**
 * Ability for sinks to support overwrite mode
 */
public interface SupportsOverwrite {
    /**
     * Apply overwrite mode configuration
     * @param overwrite Whether to overwrite existing data
     */
    public void applyOverwrite(boolean overwrite);
}

/**
 * Ability for sinks to support partitioning
 */
public interface SupportsPartitioning {
    /**
     * Apply static partition values for the write.
     *
     * <p>NOTE(review): the previous javadoc documented a
     * {@code partitionKeys} list, which did not match the signature.
     * @param partition Map from partition column name to its static value
     */
    public void applyStaticPartition(Map<String, String> partition);
}

Custom Connector Development

Template and utilities for developing custom connectors.

// Example custom source implementation: an insert-only scan source that can
// additionally surface "timestamp" and "source-id" metadata columns.
public class CustomTableSource implements ScanTableSource, SupportsReadingMetadata {
    private final ResolvedSchema schema;
    private final Map<String, String> options;
    private List<String> metadataKeys;

    public CustomTableSource(ResolvedSchema schema, Map<String, String> options) {
        this.schema = schema;
        this.options = options;
        this.metadataKeys = new ArrayList<>();
    }

    @Override
    public ChangelogMode getChangelogMode() {
        // This source only appends; it never emits updates or deletes.
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        CustomSourceFunction fn = new CustomSourceFunction(schema, options, metadataKeys);
        return SourceProvider.of(fn);
    }

    @Override
    public Map<String, DataType> listReadableMetadata() {
        Map<String, DataType> readable = new HashMap<>();
        readable.put("timestamp", DataTypes.TIMESTAMP_LTZ(3));
        readable.put("source-id", DataTypes.STRING());
        return readable;
    }

    @Override
    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
        // Remember which metadata columns the planner actually requested.
        this.metadataKeys = metadataKeys;
    }

    @Override
    public DynamicTableSource copy() {
        CustomTableSource duplicate = new CustomTableSource(schema, options);
        duplicate.metadataKeys = new ArrayList<>(this.metadataKeys);
        return duplicate;
    }

    @Override
    public String asSummaryString() {
        return "CustomSource";
    }
}

// Example custom sink implementation: an insert-only sink that can persist
// "timestamp" and "partition" metadata columns.
public class CustomTableSink implements DynamicTableSink, SupportsWritingMetadata {
    private final ResolvedSchema schema;
    private final Map<String, String> options;
    private List<String> metadataKeys;

    /**
     * Create a sink for the given schema and connector options.
     *
     * <p>Bug fix: the original class had no constructor, so the final fields
     * were never initialized and {@link #copy()} invoked a nonexistent
     * two-argument constructor. This mirrors {@code CustomTableSource}.
     *
     * @param schema Resolved table schema
     * @param options Connector options from the table definition
     */
    public CustomTableSink(ResolvedSchema schema, Map<String, String> options) {
        this.schema = schema;
        this.options = options;
        this.metadataKeys = new ArrayList<>();
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        // Regardless of what the query produces, this sink accepts appends only.
        return ChangelogMode.insertOnly();
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(SinkContext context) {
        return SinkProvider.of(new CustomSinkFunction(schema, options, metadataKeys));
    }

    @Override
    public Map<String, DataType> listWritableMetadata() {
        Map<String, DataType> metadata = new HashMap<>();
        metadata.put("timestamp", DataTypes.TIMESTAMP_LTZ(3));
        metadata.put("partition", DataTypes.STRING());
        return metadata;
    }

    @Override
    public void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType) {
        // Remember which metadata columns will actually be written.
        this.metadataKeys = metadataKeys;
    }

    @Override
    public DynamicTableSink copy() {
        CustomTableSink copy = new CustomTableSink(schema, options);
        copy.metadataKeys = new ArrayList<>(this.metadataKeys);
        return copy;
    }

    @Override
    public String asSummaryString() {
        return "CustomSink";
    }
}

// Custom factory implementation serving both the source and sink side of the
// "custom" connector. Requires host/port; credentials are optional.
public class CustomConnectorFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    @Override
    public String factoryIdentifier() {
        return "custom";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Set.of(CustomOptions.HOST, CustomOptions.PORT);
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Set.of(CustomOptions.USERNAME, CustomOptions.PASSWORD);
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // Validate declared options before touching them.
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();

        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        ReadableConfig validated = helper.getOptions();
        return new CustomTableSource(schema, validated.toMap());
    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        // Same validation path as the source side.
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();

        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        ReadableConfig validated = helper.getOptions();
        return new CustomTableSink(schema, validated.toMap());
    }
}

Connector Utilities

Helper utilities for connector development and configuration.

/**
 * Utility for connector factory helpers.
 *
 * <p>NOTE(review): method bodies are omitted here — this is an API signature
 * sketch from the documentation, not compilable source.
 */
public class FactoryUtil {
    /**
     * Create a table factory helper for validation and option extraction
     * @param factory The connector factory
     * @param context Creation context
     * @return Helper for option validation and extraction
     */
    public static TableFactoryHelper createTableFactoryHelper(Factory factory, 
                                                             DynamicTableFactory.Context context);
    
    /** Helper bound to one factory/context pair; obtain via the method above. */
    public static final class TableFactoryHelper {
        /**
         * Validate required and optional options
         */
        public void validate();
        
        /**
         * Get validated configuration options
         * @return ReadableConfig with validated options
         */
        public ReadableConfig getOptions();
    }
}

/**
 * Utility for changelog mode operations.
 *
 * <p>NOTE(review): method bodies are omitted here — this is an API signature
 * sketch from the documentation, not compilable source.
 */
public class ChangelogMode {
    /**
     * Create insert-only changelog mode
     * @return ChangelogMode for append-only sources/sinks
     */
    public static ChangelogMode insertOnly();
    
    /**
     * Create upsert changelog mode
     * @return ChangelogMode supporting inserts, updates, and deletes
     */
    public static ChangelogMode upsert();
    
    /**
     * Create all-changes changelog mode
     * @return ChangelogMode supporting all change types
     */
    public static ChangelogMode all();
}

Testing Connectors

Utilities and patterns for testing custom connectors.

// Test utilities for connector development.
public class ConnectorTestUtils {

    /**
     * Builds a streaming-mode {@code TableEnvironment} suitable for
     * connector tests.
     *
     * @return TableEnvironment configured for testing
     */
    public static TableEnvironment createTestTableEnvironment() {
        return TableEnvironment.create(
                EnvironmentSettings.newInstance()
                        .inStreamingMode()
                        .build());
    }

    /**
     * Runs an end-to-end connector test feeding sample rows from a source
     * connector into a sink connector.
     *
     * @param sourceConnector Source connector configuration
     * @param sinkConnector Sink connector configuration
     * @param testData Sample data for testing
     */
    public static void executeConnectorTest(String sourceConnector, 
                                          String sinkConnector,
                                          List<Row> testData) {
        // Implementation for automated connector testing
    }
}

// Example connector test: pipes the "custom" source into a print sink and
// checks that a job was actually submitted.
@Test
public void testCustomConnector() {
    TableEnvironment env = ConnectorTestUtils.createTestTableEnvironment();

    // Source backed by the custom connector; host/port are its required options.
    env.executeSql("CREATE TABLE source_table (" +
        "id BIGINT," +
        "name STRING," +
        "amount DECIMAL(10,2)" +
        ") WITH (" +
        "'connector' = 'custom'," +
        "'host' = 'localhost'," +
        "'port' = '9092'" +
        ")");

    // Print sink so the pipeline has somewhere to write.
    env.executeSql("CREATE TABLE sink_table (" +
        "id BIGINT," +
        "name STRING," +
        "amount DECIMAL(10,2)" +
        ") WITH (" +
        "'connector' = 'print'" +
        ")");

    // Kick off the pipeline and verify a job client was created.
    TableResult insertResult = env.executeSql("INSERT INTO sink_table SELECT * FROM source_table");
    assertThat(insertResult.getJobClient()).isPresent();
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-api-java-uber

docs

connectors.md

data-types.md

datastream-bridge.md

expressions.md

functions.md

index.md

sql-gateway.md

table-operations.md

tile.json