CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-table-api-java-bridge

Java bridge for seamless integration between Flink's Table/SQL API and DataStream API, enabling conversion between streams and tables with unified batch and stream processing.

Pending
Overview
Eval results
Files

docs/builtin-connectors.md

Built-in Connectors

Ready-to-use connectors for development, testing, and debugging table applications. These connectors are included in the Java Bridge module and provide essential functionality for testing data pipelines and debugging table operations.

Capabilities

BlackHole Connector

High-performance sink connector that discards all input records. Designed for performance testing and scenarios where output is not needed.

-- Create BlackHole sink table
CREATE TABLE sink_table (
    user_id STRING,
    order_count BIGINT,
    total_amount DECIMAL(10, 2)
) WITH (
    'connector' = 'blackhole'
);

-- Insert data (will be discarded)
INSERT INTO sink_table 
SELECT user_id, COUNT(*), SUM(amount) 
FROM orders 
GROUP BY user_id;

Java Factory Usage:

/**
 * BlackHole table sink factory for discarding all input records
 * Identifier: "blackhole"
 */
public class BlackHoleTableSinkFactory implements DynamicTableSinkFactory {
    public static final String IDENTIFIER = "blackhole";
    
    public String factoryIdentifier();
    public Set<ConfigOption<?>> requiredOptions(); // Empty set
    public Set<ConfigOption<?>> optionalOptions(); // Empty set
    public DynamicTableSink createDynamicTableSink(Context context);
}

Usage Examples:

// BlackHole sink supports all changelog modes and partitioning
// Automatically filters out UPDATE_BEFORE events for efficiency

// Performance testing setup
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Create source with high data volume
tableEnv.executeSql(
    "CREATE TABLE source_table (" +
    "  id BIGINT," +
    "  data STRING," +
    "  ts TIMESTAMP(3)" +
    ") WITH (" +
    "  'connector' = 'datagen'," +
    "  'rows-per-second' = '1000000'" +
    ")"
);

// Create BlackHole sink for performance testing
tableEnv.executeSql(
    "CREATE TABLE perf_sink (" +
    "  id BIGINT," +
    "  processed_data STRING" +
    ") WITH (" +
    "  'connector' = 'blackhole'" +
    ")"
);

// Test query performance (output discarded)
tableEnv.executeSql(
    "INSERT INTO perf_sink " +
    "SELECT id, UPPER(data) FROM source_table"
);

DataGen Connector

Flexible data generation connector for creating test data with configurable patterns and data types.

/**
 * Configuration options for DataGen connector
 */
public class DataGenConnectorOptions {
    
    /** Control emission rate */
    public static final ConfigOption<Long> ROWS_PER_SECOND; // Default: 10000
    
    /** Total rows to emit (unbounded if not set) */
    public static final ConfigOption<Long> NUMBER_OF_ROWS;
    
    /** Source parallelism */
    public static final ConfigOption<Integer> SOURCE_PARALLELISM;
    
    // Field-specific configuration options
    public static final ConfigOption<String> FIELD_KIND; // 'random' or 'sequence'
    public static final ConfigOption<String> FIELD_MIN;  // Minimum value for random
    public static final ConfigOption<String> FIELD_MAX;  // Maximum value for random
    public static final ConfigOption<Duration> FIELD_MAX_PAST; // Max past for timestamps
    public static final ConfigOption<Integer> FIELD_LENGTH; // Collection size/string length
    public static final ConfigOption<String> FIELD_START;   // Sequence start value
    public static final ConfigOption<String> FIELD_END;     // Sequence end value
    public static final ConfigOption<Float> FIELD_NULL_RATE; // Proportion of nulls
    public static final ConfigOption<Boolean> FIELD_VAR_LEN; // Variable length data
}

SQL Usage Examples:

-- Basic DataGen source
CREATE TABLE users_source (
    user_id BIGINT,
    username STRING,
    age INT,
    created_at TIMESTAMP(3)
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100',
    'number-of-rows' = '10000'
);

-- Advanced DataGen with field-specific configuration
CREATE TABLE orders_source (
    order_id BIGINT,
    user_id STRING,
    product_name STRING,
    quantity INT,
    price DECIMAL(10, 2),
    order_time TIMESTAMP(3)
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '50',
    
    -- Sequence generator for order_id
    'fields.order_id.kind' = 'sequence',
    'fields.order_id.start' = '1',
    'fields.order_id.end' = '1000000',
    
    -- Random string for user_id
    'fields.user_id.kind' = 'random',
    'fields.user_id.length' = '8',
    
    -- Random product names with variable length
    'fields.product_name.kind' = 'random',
    'fields.product_name.length' = '20',
    'fields.product_name.var-len' = 'true',
    
    -- Random quantity with bounds
    'fields.quantity.kind' = 'random',
    'fields.quantity.min' = '1',
    'fields.quantity.max' = '10',
    
    -- Random price with bounds
    'fields.price.kind' = 'random',
    'fields.price.min' = '10.00',
    'fields.price.max' = '999.99',
    
    -- Random timestamps within past 30 days
    'fields.order_time.kind' = 'random',
    'fields.order_time.max-past' = '30d'
);

-- DataGen with null values
CREATE TABLE sparse_data (
    id BIGINT,
    optional_field STRING,
    another_field INT
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '10',
    
    'fields.optional_field.null-rate' = '0.3', -- 30% null values
    'fields.another_field.null-rate' = '0.1'   -- 10% null values
);

Java Factory Usage:

/**
 * DataGen table source factory for generating test data
 */
public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
    
    public String factoryIdentifier(); // Returns "datagen"
    public Set<ConfigOption<?>> requiredOptions(); // Empty - all options optional
    public Set<ConfigOption<?>> optionalOptions(); // All DataGenConnectorOptions
    public DynamicTableSource createDynamicTableSource(Context context);
}

/**
 * DataGen table source implementation
 */
public class DataGenTableSource implements ScanTableSource, SupportsLimitPushDown {
    // Supports limit push-down for bounded data generation
}

Print Connector

Debug connector that outputs table data to console with configurable formatting.

/**
 * Configuration options for Print connector
 */
public class PrintConnectorOptions {
    // Standard print connector options (inherited from base framework)
    // Supports standard print formatting and output configuration
}

SQL Usage Examples:

-- Basic print sink
CREATE TABLE debug_output (
    user_id STRING,
    username STRING,
    score INT
) WITH (
    'connector' = 'print'
);

-- Print with custom prefix
CREATE TABLE debug_detailed (
    order_id BIGINT,
    status STRING,
    `timestamp` TIMESTAMP(3)  -- `timestamp` is a reserved keyword in Flink SQL and must be escaped with backticks
) WITH (
    'connector' = 'print',
    'print-identifier' = 'ORDER_DEBUG'
);

-- Insert data to see output in console
INSERT INTO debug_output
SELECT user_id, username, score 
FROM user_scores 
WHERE score > 100;

Java Factory Usage:

/**
 * Print table sink factory for console output debugging
 */
public class PrintTableSinkFactory implements DynamicTableSinkFactory {
    
    public String factoryIdentifier(); // Returns "print"
    public Set<ConfigOption<?>> requiredOptions(); // Empty
    public Set<ConfigOption<?>> optionalOptions(); // Print-specific options
    public DynamicTableSink createDynamicTableSink(Context context);
}

Usage Examples:

// Debug complex transformations
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Create source
tableEnv.executeSql(
    "CREATE TABLE transactions (" +
    "  txn_id STRING," +
    "  amount DECIMAL(10, 2)," +
    "  txn_time TIMESTAMP(3)" +
    ") WITH (" +
    "  'connector' = 'datagen'," +
    "  'fields.amount.min' = '1.00'," +
    "  'fields.amount.max' = '1000.00'" +
    ")"
);

// Create debug sink
tableEnv.executeSql(
    "CREATE TABLE debug_aggregates (" +
    "  window_start TIMESTAMP(3)," +
    "  window_end TIMESTAMP(3)," +
    "  total_amount DECIMAL(12, 2)," +
    "  txn_count BIGINT" +
    ") WITH (" +
    "  'connector' = 'print'," +
    "  'print-identifier' = 'WINDOW_AGGREGATES'" +
    ")"
);

// Debug windowed aggregation
tableEnv.executeSql(
    "INSERT INTO debug_aggregates " +
    "SELECT " +
    "  window_start, " +
    "  window_end, " +
    "  SUM(amount) as total_amount, " +
    "  COUNT(*) as txn_count " +
    "FROM TABLE(" +
    "  TUMBLE(TABLE transactions, DESCRIPTOR(txn_time), INTERVAL '1' MINUTE)" +
    ") " +
    "GROUP BY window_start, window_end"
);

Connector Combinations

Testing Pipeline Pattern

Combine connectors for comprehensive testing workflows.

-- Complete testing pipeline
-- 1. Generate test data
CREATE TABLE test_orders (
    order_id BIGINT,
    customer_id STRING,
    amount DECIMAL(10, 2),
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100',
    'fields.order_id.kind' = 'sequence',
    'fields.order_id.start' = '1',
    'fields.customer_id.kind' = 'random',
    'fields.customer_id.length' = '6',
    'fields.amount.kind' = 'random',
    'fields.amount.min' = '10.00',
    'fields.amount.max' = '500.00'
);

-- 2. Debug intermediate results
CREATE TABLE debug_customer_stats (
    customer_id STRING,
    order_count BIGINT,
    total_amount DECIMAL(12, 2),
    avg_amount DECIMAL(10, 2)
) WITH (
    'connector' = 'print',
    'print-identifier' = 'CUSTOMER_STATS'
);

-- 3. Performance test final sink
CREATE TABLE perf_sink (
    customer_id STRING,
    order_count BIGINT,
    total_amount DECIMAL(12, 2)
) WITH (
    'connector' = 'blackhole'
);

-- Execute testing workflow
INSERT INTO debug_customer_stats
SELECT 
    customer_id,
    COUNT(*) as order_count,
    SUM(amount) as total_amount,
    AVG(amount) as avg_amount
FROM test_orders
GROUP BY customer_id;

-- Note: the 'print' connector is sink-only, so debug_customer_stats cannot be
-- read back. Feed the performance sink from the source table directly instead.
INSERT INTO perf_sink
SELECT
    customer_id,
    COUNT(*) as order_count,
    SUM(amount) as total_amount
FROM test_orders
GROUP BY customer_id;

Data Generation Patterns

Realistic Test Data

Configure DataGen for realistic business scenarios.

-- E-commerce user behavior simulation
CREATE TABLE user_events (
    user_id STRING,
    event_type STRING,
    product_id STRING,
    session_id STRING,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1000',
    
    -- Realistic user IDs
    'fields.user_id.kind' = 'random',
    'fields.user_id.length' = '10',
    
    -- Event types with realistic distribution (would need custom generator)
    'fields.event_type.kind' = 'random',
    'fields.event_type.length' = '15',
    
    -- Product catalog simulation
    'fields.product_id.kind' = 'sequence',
    'fields.product_id.start' = '100',
    'fields.product_id.end' = '999',
    
    -- Session tracking
    'fields.session_id.kind' = 'random',
    'fields.session_id.length' = '32'
);

Types

Connector Configuration Types

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.connector.datagen.table.DataGenConnectorOptions;
import org.apache.flink.connector.print.table.PrintConnectorOptions;
import java.time.Duration;

Factory Implementation Types

import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.sink.DynamicTableSink;

Built-in Connector Identifiers

// Connector identifiers for SQL DDL
public static final String BLACKHOLE_IDENTIFIER = "blackhole";
public static final String DATAGEN_IDENTIFIER = "datagen";
public static final String PRINT_IDENTIFIER = "print";

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-api-java-bridge

docs

builtin-connectors.md

changelog-processing.md

datastream-connectors.md

index.md

procedures.md

statement-sets.md

stream-table-environment.md

watermark-strategies.md

tile.json