Bridge component for Apache Flink's Table/SQL API that enables Java developers to write table programs that seamlessly interact with Flink's streaming and batch processing APIs.
—
The Flink Table API Java Bridge includes several built-in connectors designed for development, testing, and specific use cases. These connectors provide ready-to-use implementations for common scenarios.
The DataGen connector generates random data for testing and development purposes. It supports various data types and generation strategies.
public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
public static final String IDENTIFIER = "datagen";
// Factory methods for creating DataGen sources
}
The DataGen connector supports the following configuration options through DataGenConnectorOptions:
public class DataGenConnectorOptions {
// Row generation options
public static final ConfigOption<Long> ROWS_PER_SECOND;
public static final ConfigOption<Long> NUMBER_OF_ROWS;
// Field-specific options
public static final ConfigOption<String> FIELDS_PREFIX;
public static final ConfigOption<String> KIND_OPTION;
public static final ConfigOption<String> START_OPTION;
public static final ConfigOption<String> END_OPTION;
public static final ConfigOption<String> MAX_PAST_OPTION;
public static final ConfigOption<String> LENGTH_OPTION;
}
Basic Random Data Generation:
CREATE TABLE random_source (
id BIGINT,
name STRING,
age INT,
score DOUBLE,
birthday TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '100',
'number-of-rows' = '1000'
);
Field-Specific Configuration:
CREATE TABLE configured_source (
user_id BIGINT,
username STRING,
age INT,
balance DECIMAL(10,2),
registration_time TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '50',
-- Configure user_id as sequence
'fields.user_id.kind' = 'sequence',
'fields.user_id.start' = '1',
'fields.user_id.end' = '1000',
-- Configure username with specific length
'fields.username.length' = '10',
-- Configure age with range
'fields.age.min' = '18',
'fields.age.max' = '65',
-- Configure balance with range
'fields.balance.min' = '0.00',
'fields.balance.max' = '10000.00'
);
Programmatic Creation:
// Create DataGen source programmatically
TableDescriptor sourceDescriptor = TableDescriptor.forConnector("datagen")
.option(DataGenConnectorOptions.ROWS_PER_SECOND, 100L)
.option(DataGenConnectorOptions.NUMBER_OF_ROWS, 5000L)
.schema(Schema.newBuilder()
.column("id", DataTypes.BIGINT())
.column("name", DataTypes.STRING())
.column("value", DataTypes.DOUBLE())
.build())
.build();
tableEnv.createTable("generated_data", sourceDescriptor);
The DataGen connector supports different generation strategies, such as random value generation (the default) and bounded sequences configured via the per-field 'kind', 'start', and 'end' options.
The Print connector outputs table data to standard output, useful for debugging and development.
public class PrintTableSinkFactory implements DynamicTableSinkFactory {
public static final String IDENTIFIER = "print";
// Factory methods for creating Print sinks
}
Basic Print Sink:
CREATE TABLE print_sink (
id BIGINT,
name STRING,
value DOUBLE
) WITH (
'connector' = 'print'
);
-- Insert data to see output
INSERT INTO print_sink SELECT id, name, value FROM source_table;
Print with Identifier:
CREATE TABLE debug_print (
user_id BIGINT,
event_type STRING,
timestamp_col TIMESTAMP(3)
) WITH (
'connector' = 'print',
'print-identifier' = 'DEBUG'
);
Programmatic Creation:
// Create Print sink programmatically
TableDescriptor printDescriptor = TableDescriptor.forConnector("print")
.option("print-identifier", "MyOutput")
.schema(Schema.newBuilder()
.column("id", DataTypes.BIGINT())
.column("message", DataTypes.STRING())
.column("timestamp_col", DataTypes.TIMESTAMP(3))
.build())
.build();
tableEnv.createTable("debug_output", printDescriptor);
// Use in statement set
StreamStatementSet statementSet = tableEnv.createStatementSet();
statementSet.addInsert("debug_output", sourceTable);
statementSet.attachAsDataStream();
The Print connector outputs data in a readable format:
DEBUG> +I[1001, Alice, 2023-12-01T10:30:00]
DEBUG> +I[1002, Bob, 2023-12-01T10:31:00]
DEBUG> -U[1001, Alice, 2023-12-01T10:30:00]
DEBUG> +U[1001, Alice Updated, 2023-12-01T10:30:00]
Where:
+I: Insert operation
-U: Update before (retract)
+U: Update after
-D: Delete operation
The BlackHole connector discards all data, useful for performance testing and benchmarking.
public class BlackHoleTableSinkFactory implements DynamicTableSinkFactory {
public static final String IDENTIFIER = "blackhole";
// Factory methods for creating BlackHole sinks
}
Basic BlackHole Sink:
CREATE TABLE blackhole_sink (
id BIGINT,
data STRING,
timestamp_col TIMESTAMP(3)
) WITH (
'connector' = 'blackhole'
);
-- All data inserted here will be discarded
INSERT INTO blackhole_sink SELECT * FROM high_volume_source;
Performance Testing:
// Create BlackHole sink for performance testing
TableDescriptor blackholeDescriptor = TableDescriptor.forConnector("blackhole")
.schema(Schema.newBuilder()
.column("id", DataTypes.BIGINT())
.column("payload", DataTypes.STRING())
.column("processing_time", DataTypes.TIMESTAMP(3))
.build())
.build();
tableEnv.createTable("perf_test_sink", blackholeDescriptor);
// Test query performance
Table testQuery = tableEnv.sqlQuery("""
SELECT
id,
UPPER(payload) as payload,
CURRENT_TIMESTAMP as processing_time
FROM source_table
WHERE id % 1000 = 0
""");
StreamStatementSet statementSet = tableEnv.createStatementSet();
statementSet.addInsert("perf_test_sink", testQuery);
statementSet.attachAsDataStream();
The CSV connector is maintained only for testing the legacy connector stack and should not be used in production.
@Deprecated
public class CsvTableSource extends InputFormatTableSource<Row> {
// Constructor and configuration methods
public CsvTableSource(String path, String[] fieldNames, TypeInformation<?>[] fieldTypes);
public CsvTableSource(String path, String[] fieldNames, TypeInformation<?>[] fieldTypes,
String fieldDelim, String rowDelim, Character quoteCharacter,
boolean ignoreFirstLine, String ignoreComments, boolean lenient);
}
@Deprecated
public class CsvTableSink extends OutputFormatTableSink<Row> {
// Constructor and configuration methods
public CsvTableSink(String path, String fieldDelim, int numFiles, WriteMode writeMode);
}
// Legacy CSV source (deprecated - use modern file connectors instead)
CsvTableSource csvSource = CsvTableSource.builder()
.path("/path/to/input.csv")
.field("id", Types.LONG)
.field("name", Types.STRING)
.field("age", Types.INT)
.fieldDelimiter(",")
.ignoreFirstLine()
.build();
tableEnv.registerTableSource("csv_input", csvSource);
// Legacy CSV sink (deprecated)
CsvTableSink csvSink = new CsvTableSink(
"/path/to/output.csv",
",", // field delimiter
1, // num files
WriteMode.OVERWRITE
);
tableEnv.registerTableSink("csv_output", csvSink);
// Create test data with DataGen
TableDescriptor testDataDesc = TableDescriptor.forConnector("datagen")
.option(DataGenConnectorOptions.ROWS_PER_SECOND, 1000L)
.option(DataGenConnectorOptions.NUMBER_OF_ROWS, 10000L)
.schema(Schema.newBuilder()
.column("transaction_id", DataTypes.BIGINT())
.column("user_id", DataTypes.STRING())
.column("amount", DataTypes.DECIMAL(10, 2))
.column("transaction_time", DataTypes.TIMESTAMP(3))
.build())
.build();
tableEnv.createTable("test_transactions", testDataDesc);
// Process the data
Table processedData = tableEnv.sqlQuery("""
SELECT
user_id,
COUNT(*) as transaction_count,
SUM(amount) as total_amount,
TUMBLE_START(transaction_time, INTERVAL '1' MINUTE) as window_start
FROM test_transactions
GROUP BY
user_id,
TUMBLE(transaction_time, INTERVAL '1' MINUTE)
""");
// Output for debugging
TableDescriptor printDesc = TableDescriptor.forConnector("print")
.option("print-identifier", "PROCESSED")
.schema(processedData.getResolvedSchema())
.build();
tableEnv.createTable("processed_output", printDesc);
// Execute
StreamStatementSet statements = tableEnv.createStatementSet();
statements.addInsert("processed_output", processedData);
statements.attachAsDataStream();
// Generate high-volume test data
TableDescriptor highVolumeSource = TableDescriptor.forConnector("datagen")
.option(DataGenConnectorOptions.ROWS_PER_SECOND, 10000L)
.schema(Schema.newBuilder()
.column("id", DataTypes.BIGINT())
.column("data", DataTypes.STRING())
.column("timestamp_col", DataTypes.TIMESTAMP(3))
.build())
.build();
tableEnv.createTable("benchmark_source", highVolumeSource);
// Complex processing query
Table benchmarkQuery = tableEnv.sqlQuery("""
SELECT
id,
UPPER(data) as processed_data,
COUNT(*) OVER (
PARTITION BY id % 100
ORDER BY timestamp_col
RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
) as windowed_count
FROM benchmark_source
""");
// Discard results for pure processing benchmark
TableDescriptor blackholeDesc = TableDescriptor.forConnector("blackhole")
.schema(benchmarkQuery.getResolvedSchema())
.build();
tableEnv.createTable("benchmark_sink", blackholeDesc);
// Measure processing throughput
StreamStatementSet benchmark = tableEnv.createStatementSet();
benchmark.addInsert("benchmark_sink", benchmarkQuery);
benchmark.attachAsDataStream();
// Debug intermediate results in complex pipelines
public void debugPipeline() {
// Step 1: Raw data
Table rawData = tableEnv.sqlQuery("SELECT * FROM source_table LIMIT 100");
createPrintSink("debug_raw", rawData);
// Step 2: After filtering
Table filtered = tableEnv.sqlQuery("""
SELECT * FROM source_table
WHERE status = 'ACTIVE' AND amount > 100
LIMIT 100
""");
createPrintSink("debug_filtered", filtered);
// Step 3: After aggregation
Table aggregated = tableEnv.sqlQuery("""
SELECT
user_id,
COUNT(*) as count,
SUM(amount) as total
FROM source_table
WHERE status = 'ACTIVE'
GROUP BY user_id
LIMIT 50
""");
createPrintSink("debug_aggregated", aggregated);
}
private void createPrintSink(String name, Table table) {
TableDescriptor desc = TableDescriptor.forConnector("print")
.option("print-identifier", name.toUpperCase())
.schema(table.getResolvedSchema())
.build();
tableEnv.createTable(name + "_sink", desc);
StreamStatementSet statements = tableEnv.createStatementSet();
statements.addInsert(name + "_sink", table);
statements.attachAsDataStream();
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-api-java-bridge-2-11