Java bridge for seamless integration between Flink's Table/SQL API and DataStream API, enabling conversion between streams and tables with unified batch and stream processing.
—
Ready-to-use connectors for development, testing, and debugging table applications. These connectors are included in the Java Bridge module and provide essential functionality for testing data pipelines and debugging table operations.
High-performance sink connector that discards all input records. Designed for performance testing and scenarios where output is not needed.
-- Create BlackHole sink table
CREATE TABLE sink_table (
user_id STRING,
order_count BIGINT,
total_amount DECIMAL(10, 2)
) WITH (
'connector' = 'blackhole'
);
-- Insert data (will be discarded)
INSERT INTO sink_table
SELECT user_id, COUNT(*), SUM(amount)
FROM orders
GROUP BY user_id;

Java Factory Usage:
/**
* BlackHole table sink factory for discarding all input records
* Identifier: "blackhole"
*/
public class BlackHoleTableSinkFactory implements DynamicTableSinkFactory {
public static final String IDENTIFIER = "blackhole";
public String factoryIdentifier();
public Set<ConfigOption<?>> requiredOptions(); // Empty set
public Set<ConfigOption<?>> optionalOptions(); // Empty set
public DynamicTableSink createDynamicTableSink(Context context);
}

Usage Examples:
// BlackHole sink supports all changelog modes and partitioning
// Automatically filters out UPDATE_BEFORE events for efficiency
// Performance testing setup
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Create source with high data volume
tableEnv.executeSql(
"CREATE TABLE source_table (" +
" id BIGINT," +
" data STRING," +
" ts TIMESTAMP(3)" +
") WITH (" +
" 'connector' = 'datagen'," +
" 'rows-per-second' = '1000000'" +
")"
);
// Create BlackHole sink for performance testing
tableEnv.executeSql(
"CREATE TABLE perf_sink (" +
" id BIGINT," +
" processed_data STRING" +
") WITH (" +
" 'connector' = 'blackhole'" +
")"
);
// Test query performance (output discarded)
tableEnv.executeSql(
"INSERT INTO perf_sink " +
"SELECT id, UPPER(data) FROM source_table"
);

Flexible data generation connector for creating test data with configurable patterns and data types.
/**
* Configuration options for DataGen connector
*/
public class DataGenConnectorOptions {
/** Control emission rate */
public static final ConfigOption<Long> ROWS_PER_SECOND; // Default: 10000
/** Total rows to emit (unbounded if not set) */
public static final ConfigOption<Long> NUMBER_OF_ROWS;
/** Source parallelism */
public static final ConfigOption<Integer> SOURCE_PARALLELISM;
// Field-specific configuration options
public static final ConfigOption<String> FIELD_KIND; // 'random' or 'sequence'
public static final ConfigOption<String> FIELD_MIN; // Minimum value for random
public static final ConfigOption<String> FIELD_MAX; // Maximum value for random
public static final ConfigOption<Duration> FIELD_MAX_PAST; // Max past for timestamps
public static final ConfigOption<Integer> FIELD_LENGTH; // Collection size/string length
public static final ConfigOption<String> FIELD_START; // Sequence start value
public static final ConfigOption<String> FIELD_END; // Sequence end value
public static final ConfigOption<Float> FIELD_NULL_RATE; // Proportion of nulls
public static final ConfigOption<Boolean> FIELD_VAR_LEN; // Variable length data
}

SQL Usage Examples:
-- Basic DataGen source
CREATE TABLE users_source (
user_id BIGINT,
username STRING,
age INT,
created_at TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '100',
'number-of-rows' = '10000'
);
-- Advanced DataGen with field-specific configuration
CREATE TABLE orders_source (
order_id BIGINT,
user_id STRING,
product_name STRING,
quantity INT,
price DECIMAL(10, 2),
order_time TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '50',
-- Sequence generator for order_id
'fields.order_id.kind' = 'sequence',
'fields.order_id.start' = '1',
'fields.order_id.end' = '1000000',
-- Random string for user_id
'fields.user_id.kind' = 'random',
'fields.user_id.length' = '8',
-- Random product names with variable length
'fields.product_name.kind' = 'random',
'fields.product_name.length' = '20',
'fields.product_name.var-len' = 'true',
-- Random quantity with bounds
'fields.quantity.kind' = 'random',
'fields.quantity.min' = '1',
'fields.quantity.max' = '10',
-- Random price with bounds
'fields.price.kind' = 'random',
'fields.price.min' = '10.00',
'fields.price.max' = '999.99',
-- Random timestamps within past 30 days
'fields.order_time.kind' = 'random',
'fields.order_time.max-past' = '30d'
);
-- DataGen with null values
CREATE TABLE sparse_data (
id BIGINT,
optional_field STRING,
another_field INT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.optional_field.null-rate' = '0.3', -- 30% null values
'fields.another_field.null-rate' = '0.1' -- 10% null values
);

Java Factory Usage:
/**
* DataGen table source factory for generating test data
*/
public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
public String factoryIdentifier(); // Returns "datagen"
public Set<ConfigOption<?>> requiredOptions(); // Empty - all options optional
public Set<ConfigOption<?>> optionalOptions(); // All DataGenConnectorOptions
public DynamicTableSource createDynamicTableSource(Context context);
}
/**
* DataGen table source implementation
*/
public class DataGenTableSource implements ScanTableSource, SupportsLimitPushDown {
// Supports limit push-down for bounded data generation
}

Debug connector that outputs table data to console with configurable formatting.
/**
* Configuration options for Print connector
*/
public class PrintConnectorOptions {
// Standard print connector options (inherited from base framework)
// Supports standard print formatting and output configuration
}

SQL Usage Examples:
-- Basic print sink
CREATE TABLE debug_output (
user_id STRING,
username STRING,
score INT
) WITH (
'connector' = 'print'
);
-- Print with custom prefix
CREATE TABLE debug_detailed (
order_id BIGINT,
status STRING,
timestamp TIMESTAMP(3)
) WITH (
'connector' = 'print',
'print-identifier' = 'ORDER_DEBUG'
);
-- Insert data to see output in console
INSERT INTO debug_output
SELECT user_id, username, score
FROM user_scores
WHERE score > 100;

Java Factory Usage:
/**
* Print table sink factory for console output debugging
*/
public class PrintTableSinkFactory implements DynamicTableSinkFactory {
public String factoryIdentifier(); // Returns "print"
public Set<ConfigOption<?>> requiredOptions(); // Empty
public Set<ConfigOption<?>> optionalOptions(); // Print-specific options
public DynamicTableSink createDynamicTableSink(Context context);
}

Usage Examples:
// Debug complex transformations
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Create source
tableEnv.executeSql(
"CREATE TABLE transactions (" +
" txn_id STRING," +
" amount DECIMAL(10, 2)," +
" txn_time TIMESTAMP(3)," +
" WATERMARK FOR txn_time AS txn_time - INTERVAL '5' SECOND" +
") WITH (" +
" 'connector' = 'datagen'," +
" 'fields.amount.min' = '1.00'," +
" 'fields.amount.max' = '1000.00'" +
")"
);
// Create debug sink
tableEnv.executeSql(
"CREATE TABLE debug_aggregates (" +
" window_start TIMESTAMP(3)," +
" window_end TIMESTAMP(3)," +
" total_amount DECIMAL(12, 2)," +
" txn_count BIGINT" +
") WITH (" +
" 'connector' = 'print'," +
" 'print-identifier' = 'WINDOW_AGGREGATES'" +
")"
);
// Debug windowed aggregation
tableEnv.executeSql(
"INSERT INTO debug_aggregates " +
"SELECT " +
" window_start, " +
" window_end, " +
" SUM(amount) as total_amount, " +
" COUNT(*) as txn_count " +
"FROM TABLE(" +
" TUMBLE(TABLE transactions, DESCRIPTOR(txn_time), INTERVAL '1' MINUTE)" +
") " +
"GROUP BY window_start, window_end"
);

Combine connectors for comprehensive testing workflows.
-- Complete testing pipeline
-- 1. Generate test data
CREATE TABLE test_orders (
order_id BIGINT,
customer_id STRING,
amount DECIMAL(10, 2),
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '100',
'fields.order_id.kind' = 'sequence',
'fields.order_id.start' = '1',
'fields.customer_id.kind' = 'random',
'fields.customer_id.length' = '6',
'fields.amount.kind' = 'random',
'fields.amount.min' = '10.00',
'fields.amount.max' = '500.00'
);
-- 2. Debug intermediate results
CREATE TABLE debug_customer_stats (
customer_id STRING,
order_count BIGINT,
total_amount DECIMAL(12, 2),
avg_amount DECIMAL(10, 2)
) WITH (
'connector' = 'print',
'print-identifier' = 'CUSTOMER_STATS'
);
-- 3. Performance test final sink
CREATE TABLE perf_sink (
customer_id STRING,
order_count BIGINT,
total_amount DECIMAL(12, 2)
) WITH (
'connector' = 'blackhole'
);
-- Execute testing workflow
INSERT INTO debug_customer_stats
SELECT
customer_id,
COUNT(*) as order_count,
SUM(amount) as total_amount,
AVG(amount) as avg_amount
FROM test_orders
GROUP BY customer_id;
INSERT INTO perf_sink
SELECT customer_id, order_count, total_amount
FROM debug_customer_stats;

Configure DataGen for realistic business scenarios.
-- E-commerce user behavior simulation
CREATE TABLE user_events (
user_id STRING,
event_type STRING,
product_id STRING,
session_id STRING,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1000',
-- Realistic user IDs
'fields.user_id.kind' = 'random',
'fields.user_id.length' = '10',
-- Event types with realistic distribution (would need custom generator)
'fields.event_type.kind' = 'random',
'fields.event_type.length' = '15',
-- Product catalog simulation
-- NOTE(review): 'sequence' kind targets numeric fields; product_id is STRING here — verify this works or change the column type to BIGINT
'fields.product_id.kind' = 'sequence',
'fields.product_id.start' = '100',
'fields.product_id.end' = '999',
-- Session tracking
'fields.session_id.kind' = 'random',
'fields.session_id.length' = '32'
);

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.connector.datagen.table.DataGenConnectorOptions;
import org.apache.flink.connector.print.table.PrintConnectorOptions;
import java.time.Duration;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.sink.DynamicTableSink;

// Connector identifiers for SQL DDL
public static final String BLACKHOLE_IDENTIFIER = "blackhole";
public static final String DATAGEN_IDENTIFIER = "datagen";
public static final String PRINT_IDENTIFIER = "print";

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-api-java-bridge