Comprehensive uber JAR that consolidates all Java APIs for Apache Flink's Table/SQL ecosystem, enabling developers to write table programs and integrate with other Flink APIs through a single dependency.
—
Extensible connector architecture with built-in connectors for testing and development, plus a framework for custom connector development in Apache Flink's Table API.
Base interfaces for implementing custom table sources and sinks.
/**
 * Base interface for dynamic table sources.
 */
public interface DynamicTableSource extends TableSource {

    /**
     * Creates a copy of this source.
     *
     * @return the new source instance
     */
    DynamicTableSource copy();

    /**
     * Returns a string summary of this source.
     *
     * @return human-readable summary
     */
    String asSummaryString();
}
/**
 * Scan-based table source, usable for both batch and streaming reads.
 */
public interface ScanTableSource extends DynamicTableSource {

    /**
     * Returns the changelog mode this source supports.
     *
     * @return the changelog mode (INSERT_ONLY, UPSERT, etc.)
     */
    ChangelogMode getChangelogMode();

    /**
     * Creates the provider that supplies the actual source at runtime.
     *
     * @param context context with parallelism and other runtime info
     * @return the source provider for the Flink runtime
     */
    ScanRuntimeProvider getScanRuntimeProvider(ScanContext context);
}
/**
 * Lookup table source, used for joins.
 */
public interface LookupTableSource extends DynamicTableSource {

    /**
     * Creates the provider used to perform lookups at runtime.
     *
     * @param context context with lookup configuration
     * @return the runtime provider for lookup operations
     */
    LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context);
}
/**
 * Base interface for dynamic table sinks.
 */
public interface DynamicTableSink extends TableSink {

    /**
     * Returns the changelog mode consumed by this sink.
     *
     * @param requestedMode changelog mode requested by the query
     * @return the changelog mode accepted by this sink
     */
    ChangelogMode getChangelogMode(ChangelogMode requestedMode);

    /**
     * Creates the provider that supplies the actual sink at runtime.
     *
     * @param context context with parallelism and other runtime info
     * @return the sink provider for the Flink runtime
     */
    SinkRuntimeProvider getSinkRuntimeProvider(SinkContext context);

    /**
     * Creates a copy of this sink.
     *
     * @return the new sink instance
     */
    DynamicTableSink copy();

    /**
     * Returns a string summary of this sink.
     *
     * @return human-readable summary
     */
    String asSummaryString();
}
Interface for creating connectors dynamically based on configuration.
/**
 * Factory that creates dynamic table sources.
 */
public interface DynamicTableSourceFactory extends Factory {

    /**
     * Creates a table source from the given context.
     *
     * @param context creation context with options and schema
     * @return the created table source instance
     */
    DynamicTableSource createDynamicTableSource(Context context);
}
/**
 * Factory that creates dynamic table sinks.
 */
public interface DynamicTableSinkFactory extends Factory {

    /**
     * Creates a table sink from the given context.
     *
     * @param context creation context with options and schema
     * @return the created table sink instance
     */
    DynamicTableSink createDynamicTableSink(Context context);
}
/**
 * Base factory interface carrying the metadata common to all factories.
 */
public interface Factory {

    /**
     * Returns the unique identifier of this factory.
     *
     * @return the factory identifier (e.g., "kafka", "filesystem")
     */
    String factoryIdentifier();

    /**
     * Returns the configuration options that must be provided.
     *
     * @return set of required configuration options
     */
    Set<ConfigOption<?>> requiredOptions();

    /**
     * Returns the configuration options that may be provided.
     *
     * @return set of optional configuration options
     */
    Set<ConfigOption<?>> optionalOptions();
}
Ready-to-use connectors included in the uber JAR for testing and development.
/**
 * Factory for the data generation connector (for testing), registered under the
 * identifier {@code "datagen"}.
 */
public class DataGenTableSourceFactory implements DynamicTableSourceFactory {

    /** @return the connector identifier, {@code "datagen"} */
    @Override
    public String factoryIdentifier() {
        return "datagen";
    }

    /** @return an empty set; this connector has no required options */
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    /** @return the rows-per-second, number-of-rows and fields options */
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        final Set<ConfigOption<?>> supported = Set.of(
                DataGenConnectorOptions.ROWS_PER_SECOND,
                DataGenConnectorOptions.NUMBER_OF_ROWS,
                DataGenConnectorOptions.FIELDS);
        return supported;
    }
}
/**
 * Factory for the print sink connector (for debugging), registered under the
 * identifier {@code "print"}.
 */
public class PrintTableSinkFactory implements DynamicTableSinkFactory {

    /** @return the connector identifier, {@code "print"} */
    @Override
    public String factoryIdentifier() {
        return "print";
    }

    /** @return an empty set; this connector has no required options */
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    /** @return the print-identifier and standard-error options */
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        final Set<ConfigOption<?>> supported = Set.of(
                PrintConnectorOptions.PRINT_IDENTIFIER,
                PrintConnectorOptions.STANDARD_ERROR);
        return supported;
    }
}
/**
 * Factory for the blackhole sink connector (for performance testing), registered
 * under the identifier {@code "blackhole"}.
 */
public class BlackHoleTableSinkFactory implements DynamicTableSinkFactory {

    /** @return the connector identifier, {@code "blackhole"} */
    @Override
    public String factoryIdentifier() {
        return "blackhole";
    }

    /** @return an empty set; this connector has no required options */
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    /** @return an empty set; this connector has no optional options */
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }
}
Built-in Connector Usage:
// Data generation source for testing
tEnv.executeSql("CREATE TABLE test_source (" +
"id BIGINT," +
"name STRING," +
"amount DECIMAL(10,2)," +
"event_time TIMESTAMP(3)" +
") WITH (" +
"'connector' = 'datagen'," +
"'rows-per-second' = '100'," +
"'number-of-rows' = '10000'," +
"'fields.id.kind' = 'sequence'," +
"'fields.id.start' = '1'," +
"'fields.id.end' = '10000'," +
"'fields.name.length' = '10'," +
"'fields.amount.min' = '1.00'," +
"'fields.amount.max' = '1000.00'" +
")");
// Print sink for debugging output
tEnv.executeSql("CREATE TABLE debug_sink (" +
"id BIGINT," +
"name STRING," +
"amount DECIMAL(10,2)" +
") WITH (" +
"'connector' = 'print'," +
"'print-identifier' = 'debug'," +
"'standard-error' = 'false'" +
")");
// Blackhole sink for performance testing
tEnv.executeSql("CREATE TABLE perf_sink (" +
"id BIGINT," +
"name STRING," +
"processed_time TIMESTAMP(3)" +
") WITH (" +
"'connector' = 'blackhole'" +
")");
// Use the connectors
tEnv.executeSql("INSERT INTO debug_sink SELECT id, name, amount FROM test_source");Configuration utilities for connector options and validation.
/**
 * Configuration options understood by the DataGen connector.
 */
public class DataGenConnectorOptions {

    /** Key {@code rows-per-second}: rows to generate per second; defaults to 10000. */
    public static final ConfigOption<Long> ROWS_PER_SECOND =
            ConfigOptions.key("rows-per-second")
                    .longType()
                    .defaultValue(10000L)
                    .withDescription("Rows per second to generate");

    /** Key {@code number-of-rows}: total number of rows to generate; no default. */
    public static final ConfigOption<Long> NUMBER_OF_ROWS =
            ConfigOptions.key("number-of-rows")
                    .longType()
                    .noDefaultValue()
                    .withDescription("Total number of rows to generate");

    /** Key {@code fields}: map of field-specific generation options; no default. */
    public static final ConfigOption<Map<String, String>> FIELDS =
            ConfigOptions.key("fields")
                    .mapType()
                    .noDefaultValue()
                    .withDescription("Field-specific generation options");
}
/**
 * Configuration options understood by the Print connector.
 */
public class PrintConnectorOptions {

    /** Key {@code print-identifier}: identifier for the print output; no default. */
    public static final ConfigOption<String> PRINT_IDENTIFIER =
            ConfigOptions.key("print-identifier")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Identifier for print output");

    /** Key {@code standard-error}: print to standard error instead of standard out; defaults to false. */
    public static final ConfigOption<Boolean> STANDARD_ERROR =
            ConfigOptions.key("standard-error")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription("Print to standard error instead of standard out");
}
Interface for extending connector capabilities with additional features.
/**
 * Ability for sources that can read metadata.
 */
public interface SupportsReadingMetadata {

    /**
     * Lists the metadata keys that can be read.
     *
     * @return map of metadata key to data type
     */
    Map<String, DataType> listReadableMetadata();

    /**
     * Applies the metadata reading configuration.
     *
     * @param metadataKeys list of metadata keys to read
     * @param producedDataType data type that includes the metadata
     */
    void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType);
}
/**
 * Ability for sinks that can write metadata.
 */
public interface SupportsWritingMetadata {

    /**
     * Lists the metadata keys that can be written.
     *
     * @return map of metadata key to data type
     */
    Map<String, DataType> listWritableMetadata();

    /**
     * Applies the metadata writing configuration.
     *
     * @param metadataKeys list of metadata keys to write
     * @param consumedDataType data type that includes the metadata
     */
    void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType);
}
/**
 * Ability for sources that accept projection pushdown.
 */
public interface SupportsProjectionPushDown {

    /**
     * Applies the projection pushdown optimization.
     *
     * @param projectedFields indices of the fields to project
     * @param producedDataType data type after projection
     */
    void applyProjection(int[][] projectedFields, DataType producedDataType);
}
/**
 * Ability for sources that accept filter pushdown.
 */
public interface SupportsFilterPushDown {

    /**
     * Applies the filter pushdown optimization.
     *
     * @param filters list of filters to push down
     * @return result indicating which filters were accepted
     */
    Result applyFilters(List<ResolvedExpression> filters);

    /**
     * Result of a filter pushdown application: the filters the source accepted and
     * the filters that remain. Instances are created through
     * {@link #of(List, List)}.
     */
    final class Result {

        // The listing referenced these fields and the constructor below without
        // declaring them; they are declared here so the class is self-consistent.
        private final List<ResolvedExpression> acceptedFilters;
        private final List<ResolvedExpression> remainingFilters;

        private Result(List<ResolvedExpression> acceptedFilters,
                       List<ResolvedExpression> remainingFilters) {
            this.acceptedFilters = acceptedFilters;
            this.remainingFilters = remainingFilters;
        }

        /**
         * Creates a result from the two filter lists.
         *
         * @param acceptedFilters filters accepted by the source
         * @param remainingFilters filters that remain after pushdown
         * @return the new result
         */
        public static Result of(List<ResolvedExpression> acceptedFilters,
                                List<ResolvedExpression> remainingFilters) {
            return new Result(acceptedFilters, remainingFilters);
        }

        /** @return the accepted filters, as passed to {@link #of(List, List)} */
        public List<ResolvedExpression> getAcceptedFilters() {
            return acceptedFilters;
        }

        /** @return the remaining filters, as passed to {@link #of(List, List)} */
        public List<ResolvedExpression> getRemainingFilters() {
            return remainingFilters;
        }
    }
}
/**
 * Ability for sinks that support overwrite mode.
 */
public interface SupportsOverwrite {

    /**
     * Applies the overwrite mode configuration.
     *
     * @param overwrite whether to overwrite existing data
     */
    void applyOverwrite(boolean overwrite);
}
/**
 * Ability for sinks that support partitioning.
 */
public interface SupportsPartitioning {

    /**
     * Applies the partitioning configuration.
     *
     * @param partition the static partition as a string-to-string map (presumably
     *     partition key name to value; confirm against the Flink Javadoc)
     */
    void applyStaticPartition(Map<String, String> partition);
}
Template and utilities for developing custom connectors.
/**
 * Example custom source implementation: an insert-only scan source that can also
 * expose the {@code "timestamp"} and {@code "source-id"} metadata keys.
 */
public class CustomTableSource implements ScanTableSource, SupportsReadingMetadata {

    private final ResolvedSchema schema;
    private final Map<String, String> options;

    /** Metadata keys selected via {@link #applyReadableMetadata}; empty until then. */
    private List<String> metadataKeys = new ArrayList<>();

    /**
     * @param schema resolved schema of the table
     * @param options connector options
     */
    public CustomTableSource(ResolvedSchema schema, Map<String, String> options) {
        this.schema = schema;
        this.options = options;
    }

    /** @return insert-only changelog mode */
    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    /** Builds the runtime provider from a source function carrying schema, options and metadata keys. */
    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        final CustomSourceFunction sourceFunction =
                new CustomSourceFunction(schema, options, metadataKeys);
        return SourceProvider.of(sourceFunction);
    }

    /** @return the readable metadata keys and their data types */
    @Override
    public Map<String, DataType> listReadableMetadata() {
        final Map<String, DataType> readable = new HashMap<>();
        readable.put("timestamp", DataTypes.TIMESTAMP_LTZ(3));
        readable.put("source-id", DataTypes.STRING());
        return readable;
    }

    /** Remembers the metadata keys to read; {@code producedDataType} is not used. */
    @Override
    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
        this.metadataKeys = metadataKeys;
    }

    /** @return a new source with the same schema and options and a copied metadata key list */
    @Override
    public DynamicTableSource copy() {
        final CustomTableSource duplicate = new CustomTableSource(schema, options);
        duplicate.metadataKeys = new ArrayList<>(metadataKeys);
        return duplicate;
    }

    /** @return the fixed summary {@code "CustomSource"} */
    @Override
    public String asSummaryString() {
        return "CustomSource";
    }
}
/**
 * Example custom sink implementation: an insert-only sink that can also write the
 * {@code "timestamp"} and {@code "partition"} metadata keys.
 */
public class CustomTableSink implements DynamicTableSink, SupportsWritingMetadata {

    private final ResolvedSchema schema;
    private final Map<String, String> options;

    /** Metadata keys selected via {@link #applyWritableMetadata}; empty until then. */
    private List<String> metadataKeys;

    /**
     * Creates a sink for the given schema and options.
     *
     * <p>The original listing declared no constructor although {@link #copy()}
     * invokes this two-argument form, and it left the final fields unassigned.
     * The metadata key list starts out empty so that {@link #copy()} is safe to
     * call before any metadata has been applied.
     *
     * @param schema resolved schema of the table
     * @param options connector options
     */
    public CustomTableSink(ResolvedSchema schema, Map<String, String> options) {
        this.schema = schema;
        this.options = options;
        this.metadataKeys = new ArrayList<>();
    }

    /**
     * @param requestedMode changelog mode requested by the query (ignored)
     * @return insert-only changelog mode
     */
    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return ChangelogMode.insertOnly();
    }

    /** Builds the runtime provider from a sink function carrying schema, options and metadata keys. */
    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(SinkContext context) {
        return SinkProvider.of(new CustomSinkFunction(schema, options, metadataKeys));
    }

    /** @return the writable metadata keys and their data types */
    @Override
    public Map<String, DataType> listWritableMetadata() {
        Map<String, DataType> metadata = new HashMap<>();
        metadata.put("timestamp", DataTypes.TIMESTAMP_LTZ(3));
        metadata.put("partition", DataTypes.STRING());
        return metadata;
    }

    /** Remembers the metadata keys to write; {@code consumedDataType} is not used. */
    @Override
    public void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType) {
        this.metadataKeys = metadataKeys;
    }

    /** @return a new sink with the same schema and options and a copied metadata key list */
    @Override
    public DynamicTableSink copy() {
        CustomTableSink copy = new CustomTableSink(schema, options);
        copy.metadataKeys = new ArrayList<>(this.metadataKeys);
        return copy;
    }

    /** @return the fixed summary {@code "CustomSink"} */
    @Override
    public String asSummaryString() {
        return "CustomSink";
    }
}
/**
 * Custom factory implementation that creates both the custom source and the custom
 * sink, registered under the identifier {@code "custom"}.
 */
public class CustomConnectorFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    /** @return the connector identifier, {@code "custom"} */
    @Override
    public String factoryIdentifier() {
        return "custom";
    }

    /** @return the host and port options */
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        final Set<ConfigOption<?>> required = Set.of(CustomOptions.HOST, CustomOptions.PORT);
        return required;
    }

    /** @return the username and password options */
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        final Set<ConfigOption<?>> optional = Set.of(CustomOptions.USERNAME, CustomOptions.PASSWORD);
        return optional;
    }

    /** Validates the options, then builds a {@link CustomTableSource} from the table's resolved schema. */
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        final FactoryUtil.TableFactoryHelper factoryHelper =
                FactoryUtil.createTableFactoryHelper(this, context);
        factoryHelper.validate();

        final ReadableConfig validatedOptions = factoryHelper.getOptions();
        final ResolvedSchema tableSchema = context.getCatalogTable().getResolvedSchema();
        return new CustomTableSource(tableSchema, validatedOptions.toMap());
    }

    /** Validates the options, then builds a {@link CustomTableSink} from the table's resolved schema. */
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        final FactoryUtil.TableFactoryHelper factoryHelper =
                FactoryUtil.createTableFactoryHelper(this, context);
        factoryHelper.validate();

        final ReadableConfig validatedOptions = factoryHelper.getOptions();
        final ResolvedSchema tableSchema = context.getCatalogTable().getResolvedSchema();
        return new CustomTableSink(tableSchema, validatedOptions.toMap());
    }
}
Helper utilities for connector development and configuration.
/**
 * Helper utilities for connector factories.
 */
public class FactoryUtil {

    /**
     * Creates a table factory helper for option validation and extraction.
     *
     * @param factory the connector factory
     * @param context the creation context
     * @return helper for option validation and extraction
     */
    public static TableFactoryHelper createTableFactoryHelper(
            Factory factory,
            DynamicTableFactory.Context context);

    /**
     * Helper returned by {@link #createTableFactoryHelper}.
     */
    public static final class TableFactoryHelper {

        /**
         * Validates the required and optional options.
         */
        public void validate();

        /**
         * Returns the validated configuration options.
         *
         * @return ReadableConfig with validated options
         */
        public ReadableConfig getOptions();
    }
}
/**
 * Utility for changelog mode operations.
 */
public class ChangelogMode {

    /**
     * Creates the insert-only changelog mode.
     *
     * @return ChangelogMode for append-only sources/sinks
     */
    public static ChangelogMode insertOnly();

    /**
     * Creates the upsert changelog mode.
     *
     * @return ChangelogMode supporting inserts, updates, and deletes
     */
    public static ChangelogMode upsert();

    /**
     * Creates the all-changes changelog mode.
     *
     * @return ChangelogMode supporting all change types
     */
    public static ChangelogMode all();
}
Utilities and patterns for testing custom connectors.
/**
 * Test utilities for connector development.
 */
public class ConnectorTestUtils {

    /**
     * Creates a table environment for connector testing, built from environment
     * settings in streaming mode.
     *
     * @return TableEnvironment configured for testing
     */
    public static TableEnvironment createTestTableEnvironment() {
        return TableEnvironment.create(
                EnvironmentSettings.newInstance()
                        .inStreamingMode()
                        .build());
    }

    /**
     * Executes a connector test with sample data. The body is a placeholder and
     * currently does nothing.
     *
     * @param sourceConnector source connector configuration
     * @param sinkConnector sink connector configuration
     * @param testData sample data for testing
     */
    public static void executeConnectorTest(
            String sourceConnector,
            String sinkConnector,
            List<Row> testData) {
        // Implementation for automated connector testing
    }
}
// Example connector test: reads from a 'custom' source table and writes to a 'print' sink
@Test
public void testCustomConnector() {
    final TableEnvironment tableEnv = ConnectorTestUtils.createTestTableEnvironment();

    // Source table backed by the 'custom' connector
    final String sourceDdl =
            "CREATE TABLE source_table ("
            + "id BIGINT,"
            + "name STRING,"
            + "amount DECIMAL(10,2)"
            + ") WITH ("
            + "'connector' = 'custom',"
            + "'host' = 'localhost',"
            + "'port' = '9092'"
            + ")";
    tableEnv.executeSql(sourceDdl);

    // Sink table backed by the built-in 'print' connector
    final String sinkDdl =
            "CREATE TABLE sink_table ("
            + "id BIGINT,"
            + "name STRING,"
            + "amount DECIMAL(10,2)"
            + ") WITH ("
            + "'connector' = 'print'"
            + ")";
    tableEnv.executeSql(sinkDdl);

    // Run the pipeline from source to sink
    final TableResult insertResult =
            tableEnv.executeSql("INSERT INTO sink_table SELECT * FROM source_table");

    // The insert must have produced a job client
    assertThat(insertResult.getJobClient()).isPresent();
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-api-java-uber