tessl/maven-org-apache-flink--flink-table-api-java-bridge-2-11

Bridge component for Apache Flink's Table/SQL API that enables Java developers to write table programs that seamlessly interact with Flink's streaming and batch processing APIs.


Built-in Connectors

The Flink Table API Java Bridge includes several built-in connectors designed for development, testing, and specific use cases. These connectors provide ready-to-use implementations for common scenarios.

DataGen Connector

The DataGen connector generates random data for testing and development purposes. It supports various data types and generation strategies.

DataGenTableSourceFactory

public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
    public static final String IDENTIFIER = "datagen";
    
    // Factory methods for creating DataGen sources
}

Configuration Options

The DataGen connector supports the following configuration options through DataGenConnectorOptions:

public class DataGenConnectorOptions {
    // Row generation options
    public static final ConfigOption<Long> ROWS_PER_SECOND;
    public static final ConfigOption<Long> NUMBER_OF_ROWS;
    
    // Field-specific options  
    public static final ConfigOption<String> FIELDS_PREFIX;
    public static final ConfigOption<String> KIND_OPTION;
    public static final ConfigOption<String> START_OPTION;
    public static final ConfigOption<String> END_OPTION;
    public static final ConfigOption<String> MAX_PAST_OPTION;
    public static final ConfigOption<String> LENGTH_OPTION;
}
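The field-specific options are not fixed keys; the connector reads keys composed from the `fields.` prefix, the column name, and a suffix such as `kind`, `start`, `end`, `min`, `max`, or `length`. A minimal stdlib sketch of that key composition (the `FieldOptionKeys` helper is hypothetical, not Flink API):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FieldOptionKeys {
    // Compose a per-field option key, e.g. "fields.user_id.kind"
    static String fieldOption(String field, String suffix) {
        return "fields." + field + "." + suffix;
    }

    public static void main(String[] args) {
        // Build the option map that the WITH clause below expresses in SQL
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "datagen");
        options.put(fieldOption("user_id", "kind"), "sequence");
        options.put(fieldOption("user_id", "start"), "1");
        options.put(fieldOption("user_id", "end"), "1000");
        options.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```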

Usage Examples

Basic Random Data Generation:

CREATE TABLE random_source (
    id BIGINT,
    name STRING,
    age INT,
    score DOUBLE,
    birthday TIMESTAMP(3)
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100',
    'number-of-rows' = '1000'
);

Field-Specific Configuration:

CREATE TABLE configured_source (
    user_id BIGINT,
    username STRING,
    age INT,
    balance DECIMAL(10,2),
    registration_time TIMESTAMP(3)
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '50',
    
    -- Configure user_id as sequence  
    'fields.user_id.kind' = 'sequence',
    'fields.user_id.start' = '1',
    'fields.user_id.end' = '1000',
    
    -- Configure username with specific length
    'fields.username.length' = '10',
    
    -- Configure age with range
    'fields.age.min' = '18',
    'fields.age.max' = '65',
    
    -- Configure balance with range
    'fields.balance.min' = '0.00',
    'fields.balance.max' = '10000.00'
);

Programmatic Creation:

// Create DataGen source programmatically
TableDescriptor sourceDescriptor = TableDescriptor.forConnector("datagen")
    .option(DataGenConnectorOptions.ROWS_PER_SECOND, 100L)
    .option(DataGenConnectorOptions.NUMBER_OF_ROWS, 5000L)
    .schema(Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("name", DataTypes.STRING())
        .column("value", DataTypes.DOUBLE())
        .build())
    .build();

tableEnv.createTable("generated_data", sourceDescriptor);

Data Generation Strategies

The DataGen connector supports different generation strategies:

  1. Random Generation: Default behavior for most data types
  2. Sequence Generation: Sequential numeric values
  3. Custom Ranges: Min/max values for numeric types
  4. String Length: Configurable string lengths
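Outside of Flink, the four strategies can be modeled with plain Java suppliers. This sketch is a conceptual model only (the `GenerationStrategies` class and its methods are invented for illustration, not the connector's internal code):

```java
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

public class GenerationStrategies {
    static final Random RANDOM = new Random(42); // fixed seed for reproducibility

    // 1. Random generation: uniform value over the type's range
    static Supplier<Double> randomDouble() {
        return RANDOM::nextDouble;
    }

    // 2. Sequence generation: monotonically increasing values from a start
    static Supplier<Long> sequence(long start) {
        AtomicLong next = new AtomicLong(start);
        return next::getAndIncrement;
    }

    // 3. Custom range: random value bounded by min/max (inclusive)
    static Supplier<Integer> randomInRange(int min, int max) {
        return () -> min + RANDOM.nextInt(max - min + 1);
    }

    // 4. String length: random alphanumeric string of a fixed length
    static Supplier<String> randomString(int length) {
        String alphabet = "abcdefghijklmnopqrstuvwxyz0123456789";
        return () -> {
            StringBuilder sb = new StringBuilder(length);
            for (int i = 0; i < length; i++) {
                sb.append(alphabet.charAt(RANDOM.nextInt(alphabet.length())));
            }
            return sb.toString();
        };
    }

    public static void main(String[] args) {
        Supplier<Long> ids = sequence(1);
        System.out.println(ids.get() + ", " + ids.get()); // 1, 2
        System.out.println(randomInRange(18, 65).get());  // value in [18, 65]
        System.out.println(randomString(10).get());       // 10-char string
    }
}
```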

Print Connector

The Print connector outputs table data to standard output, useful for debugging and development.

PrintTableSinkFactory

public class PrintTableSinkFactory implements DynamicTableSinkFactory {
    public static final String IDENTIFIER = "print";
    
    // Factory methods for creating Print sinks
}

Usage Examples

Basic Print Sink:

CREATE TABLE print_sink (
    id BIGINT,
    name STRING,
    value DOUBLE
) WITH (
    'connector' = 'print'
);

-- Insert data to see output
INSERT INTO print_sink SELECT id, name, value FROM source_table;

Print with Identifier:

CREATE TABLE debug_print (
    user_id BIGINT,
    event_type STRING,
    timestamp_col TIMESTAMP(3)
) WITH (
    'connector' = 'print',
    'print-identifier' = 'DEBUG'
);

Programmatic Creation:

// Create Print sink programmatically
TableDescriptor printDescriptor = TableDescriptor.forConnector("print")
    .option("print-identifier", "MyOutput")
    .schema(Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("message", DataTypes.STRING())
        .column("timestamp_col", DataTypes.TIMESTAMP(3))
        .build())
    .build();

tableEnv.createTable("debug_output", printDescriptor);

// Use in statement set
StreamStatementSet statementSet = tableEnv.createStatementSet();
statementSet.addInsert("debug_output", sourceTable);
statementSet.attachAsDataStream();

Output Format

The Print connector outputs data in a readable format:

DEBUG> +I[1001, Alice, 2023-12-01T10:30:00]
DEBUG> +I[1002, Bob, 2023-12-01T10:31:00]  
DEBUG> -U[1001, Alice, 2023-12-01T10:30:00]
DEBUG> +U[1001, Alice Updated, 2023-12-01T10:30:00]

Where:

  • +I: Insert operation
  • -U: Update before (retract)
  • +U: Update after
  • -D: Delete operation
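When Print output is captured in tests, it can be useful to parse these lines back into their change flag and field values. A simplified stdlib parser (the `PrintLineParser` helper is hypothetical and assumes no field contains `", "`):

```java
import java.util.Arrays;
import java.util.List;

public class PrintLineParser {
    // Parsed form of a line like "DEBUG> +I[1001, Alice, 2023-12-01T10:30:00]"
    record PrintedRow(String rowKind, List<String> fields) {}

    static PrintedRow parse(String line) {
        // Drop the optional "IDENTIFIER> " prefix
        int mark = line.indexOf("> ");
        String body = mark >= 0 ? line.substring(mark + 2) : line;
        int bracket = body.indexOf('[');
        String kind = body.substring(0, bracket); // e.g. "+I", "-U", "+U", "-D"
        String inner = body.substring(bracket + 1, body.length() - 1);
        return new PrintedRow(kind, Arrays.asList(inner.split(", ")));
    }

    public static void main(String[] args) {
        PrintedRow row = parse("DEBUG> +I[1001, Alice, 2023-12-01T10:30:00]");
        System.out.println(row.rowKind() + " " + row.fields());
    }
}
```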

BlackHole Connector

The BlackHole connector discards all data, useful for performance testing and benchmarking.
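The point of a sink that does nothing is that any measured time is attributable to the query, not to output I/O. The same idea in plain Java (a conceptual sketch with an invented `BlackHoleSketch` class, not the connector implementation): time a transformation while feeding its results into a no-op consumer.

```java
import java.util.function.Consumer;
import java.util.stream.LongStream;

public class BlackHoleSketch {
    // A "black hole": accepts every record and discards it
    static final Consumer<String> BLACK_HOLE = record -> { /* intentionally empty */ };

    // Run `count` records through a transformation into the black hole,
    // returning elapsed nanoseconds so the cost measured is processing only.
    static long measure(long count) {
        long start = System.nanoTime();
        LongStream.range(0, count)
            .mapToObj(i -> "record-" + i)
            .map(String::toUpperCase) // the "query" under test
            .forEach(BLACK_HOLE);
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        long nanos = measure(100_000);
        System.out.printf("processed 100000 records in %.1f ms%n", nanos / 1e6);
    }
}
```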

BlackHoleTableSinkFactory

public class BlackHoleTableSinkFactory implements DynamicTableSinkFactory {
    public static final String IDENTIFIER = "blackhole";
    
    // Factory methods for creating BlackHole sinks
}

Usage Examples

Basic BlackHole Sink:

CREATE TABLE blackhole_sink (
    id BIGINT,
    data STRING,
    timestamp_col TIMESTAMP(3)
) WITH (
    'connector' = 'blackhole'
);

-- All data inserted here will be discarded
INSERT INTO blackhole_sink SELECT * FROM high_volume_source;

Performance Testing:

// Create BlackHole sink for performance testing
TableDescriptor blackholeDescriptor = TableDescriptor.forConnector("blackhole")
    .schema(Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("payload", DataTypes.STRING())
        .column("processing_time", DataTypes.TIMESTAMP(3))
        .build())
    .build();

tableEnv.createTable("perf_test_sink", blackholeDescriptor);

// Test query performance
Table testQuery = tableEnv.sqlQuery("""
    SELECT 
        id,
        UPPER(payload) as payload,
        CURRENT_TIMESTAMP as processing_time
    FROM source_table 
    WHERE id % 1000 = 0
""");

StreamStatementSet statementSet = tableEnv.createStatementSet();
statementSet.addInsert("perf_test_sink", testQuery);
statementSet.attachAsDataStream();

Legacy CSV Connector (Testing Only)

The CSV connector is maintained only for testing the legacy connector stack and should not be used in production.

CsvTableSource

@Deprecated
public class CsvTableSource extends InputFormatTableSource<Row> {
    // Constructor and configuration methods
    public CsvTableSource(String path, String[] fieldNames, TypeInformation<?>[] fieldTypes);
    public CsvTableSource(String path, String[] fieldNames, TypeInformation<?>[] fieldTypes, 
                         String fieldDelim, String rowDelim, Character quoteCharacter, 
                         boolean ignoreFirstLine, String ignoreComments, boolean lenient);
}
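Conceptually, the source reads each row, splits it on the field delimiter, and treats text wrapped in the quote character as literal (so delimiters inside quotes do not split fields). A simplified stdlib sketch of that splitting logic (the `CsvLineSplitter` class is illustrative, not the deprecated class's code, and ignores escaped quotes):

```java
import java.util.ArrayList;
import java.util.List;

public class CsvLineSplitter {
    // Split one CSV row on `delim`, treating `quote`-wrapped text as literal
    static List<String> split(String line, char delim, char quote) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (char c : line.toCharArray()) {
            if (c == quote) {
                inQuotes = !inQuotes;          // toggle quoted state, drop the quote
            } else if (c == delim && !inQuotes) {
                fields.add(current.toString()); // field boundary
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString());
        return fields;
    }

    public static void main(String[] args) {
        // The quoted field keeps its embedded comma
        System.out.println(split("1,\"Doe, Jane\",42", ',', '"'));
    }
}
```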

CsvTableSink

@Deprecated  
public class CsvTableSink extends OutputFormatTableSink<Row> {
    // Constructor and configuration methods
    public CsvTableSink(String path, String fieldDelim, int numFiles, WriteMode writeMode);
}

Legacy Usage (Deprecated)

// Legacy CSV source (deprecated - use modern file connectors instead)
CsvTableSource csvSource = CsvTableSource.builder()
    .path("/path/to/input.csv")
    .field("id", Types.LONG)
    .field("name", Types.STRING)  
    .field("age", Types.INT)
    .fieldDelimiter(",")
    .ignoreFirstLine()
    .build();

tableEnv.registerTableSource("csv_input", csvSource);

// Legacy CSV sink (deprecated)
CsvTableSink csvSink = new CsvTableSink(
    "/path/to/output.csv",
    ",",  // field delimiter
    1,    // num files
    WriteMode.OVERWRITE
);

tableEnv.registerTableSink("csv_output", csvSink);

Common Patterns and Best Practices

Testing Data Pipeline

// Create test data with DataGen
TableDescriptor testDataDesc = TableDescriptor.forConnector("datagen")
    .option(DataGenConnectorOptions.ROWS_PER_SECOND, 1000L)
    .option(DataGenConnectorOptions.NUMBER_OF_ROWS, 10000L)
    .schema(Schema.newBuilder()
        .column("transaction_id", DataTypes.BIGINT())
        .column("user_id", DataTypes.STRING())  
        .column("amount", DataTypes.DECIMAL(10, 2))
        .column("transaction_time", DataTypes.TIMESTAMP(3))
        .build())
    .build();

tableEnv.createTable("test_transactions", testDataDesc);

// Process the data
Table processedData = tableEnv.sqlQuery("""
    SELECT 
        user_id,
        COUNT(*) as transaction_count,
        SUM(amount) as total_amount,
        TUMBLE_START(transaction_time, INTERVAL '1' MINUTE) as window_start
    FROM test_transactions
    GROUP BY 
        user_id,
        TUMBLE(transaction_time, INTERVAL '1' MINUTE)
""");

// Output for debugging
TableDescriptor printDesc = TableDescriptor.forConnector("print")
    .option("print-identifier", "PROCESSED")
    .schema(Schema.newBuilder()
        .fromResolvedSchema(processedData.getResolvedSchema())
        .build())
    .build();

tableEnv.createTable("processed_output", printDesc);

// Execute
StreamStatementSet statements = tableEnv.createStatementSet();
statements.addInsert("processed_output", processedData);
statements.attachAsDataStream();

Performance Benchmarking

// Generate high-volume test data
TableDescriptor highVolumeSource = TableDescriptor.forConnector("datagen")
    .option(DataGenConnectorOptions.ROWS_PER_SECOND, 10000L)
    .schema(Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("data", DataTypes.STRING())
        .column("timestamp_col", DataTypes.TIMESTAMP(3))
        .build())
    .build();

tableEnv.createTable("benchmark_source", highVolumeSource);

// Complex processing query
Table benchmarkQuery = tableEnv.sqlQuery("""
    SELECT 
        id,
        UPPER(data) as processed_data,
        COUNT(*) OVER (
            PARTITION BY id % 100 
            ORDER BY timestamp_col 
            RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
        ) as windowed_count
    FROM benchmark_source
""");

// Discard results for pure processing benchmark
TableDescriptor blackholeDesc = TableDescriptor.forConnector("blackhole")
    .schema(Schema.newBuilder()
        .fromResolvedSchema(benchmarkQuery.getResolvedSchema())
        .build())
    .build();

tableEnv.createTable("benchmark_sink", blackholeDesc);

// Measure processing throughput
StreamStatementSet benchmark = tableEnv.createStatementSet();
benchmark.addInsert("benchmark_sink", benchmarkQuery);
benchmark.attachAsDataStream();

Development Debugging

// Debug intermediate results in complex pipelines
public void debugPipeline() {
    // Step 1: Raw data
    Table rawData = tableEnv.sqlQuery("SELECT * FROM source_table LIMIT 100");
    createPrintSink("debug_raw", rawData);
    
    // Step 2: After filtering
    Table filtered = tableEnv.sqlQuery("""
        SELECT * FROM source_table 
        WHERE status = 'ACTIVE' AND amount > 100
        LIMIT 100
    """);
    createPrintSink("debug_filtered", filtered);
    
    // Step 3: After aggregation
    Table aggregated = tableEnv.sqlQuery("""
        SELECT 
            user_id, 
            COUNT(*) as count, 
            SUM(amount) as total
        FROM source_table 
        WHERE status = 'ACTIVE' 
        GROUP BY user_id
        LIMIT 50
    """);
    createPrintSink("debug_aggregated", aggregated);
}

private void createPrintSink(String name, Table table) {
    TableDescriptor desc = TableDescriptor.forConnector("print")
        .option("print-identifier", name.toUpperCase())
        .schema(Schema.newBuilder()
            .fromResolvedSchema(table.getResolvedSchema())
            .build())
        .build();
    
    tableEnv.createTable(name + "_sink", desc);
    
    StreamStatementSet statements = tableEnv.createStatementSet();
    statements.addInsert(name + "_sink", table);
    statements.attachAsDataStream();
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-api-java-bridge-2-11
