Table API Integration

Table sources and sinks for SQL-layer integration, supporting JSON and Avro data formats with automatic schema inference, connector descriptors, and streaming table operations. This enables declarative SQL queries over Kafka streams.

Capabilities

KafkaTableSource

Abstract base class for Kafka table sources providing streaming table functionality with support for projection pushdown, filter pushdown, and watermark extraction.

public abstract class KafkaTableSource implements StreamTableSource<Row>, 
    DefinedProctimeAttribute, DefinedRowtimeAttributes {
    
    // Abstract method implemented by concrete, version-specific subclasses
    protected abstract FlinkKafkaConsumerBase<Row> createKafkaConsumer(
        String topic,
        Properties properties,
        DeserializationSchema<Row> deserializationSchema
    );
}

Key Interfaces:

  • StreamTableSource<Row> - Provides streaming data source for table API
  • DefinedProctimeAttribute - Supports processing time attribute definition
  • DefinedRowtimeAttributes - Supports event time attribute definition
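
A concrete subclass only needs to supply the version-specific consumer. The following is a minimal sketch, assuming the Kafka 0.10 connector classes and a hypothetical MyKafkaTableSource; constructors (which would forward schema, topic, and properties to the base class) are omitted:

import java.util.Properties;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.types.Row;

// Hypothetical concrete source bound to the Kafka 0.10 consumer
public class MyKafkaTableSource extends KafkaTableSource {

    // Constructors omitted: forward schema, topic, and properties to KafkaTableSource

    @Override
    protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(
            String topic,
            Properties properties,
            DeserializationSchema<Row> deserializationSchema) {
        // Delegate to the version-specific consumer implementation
        return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
    }
}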

KafkaTableSink

Abstract base class for Kafka table sinks providing streaming table output functionality with partitioning support.

public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
    
    // Abstract method implemented by concrete, version-specific subclasses
    protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
        String topic,
        Properties properties,
        SerializationSchema<Row> serializationSchema,
        FlinkKafkaPartitioner<Row> partitioner
    );
}

Key Interface:

  • AppendStreamTableSink<Row> - Supports append-only table sink operations
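
The sink side mirrors this. A minimal sketch of a concrete subclass, again assuming the Kafka 0.10 connector classes and a hypothetical MyKafkaTableSink (constructors omitted):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;

// Hypothetical concrete sink bound to the Kafka 0.10 producer
public class MyKafkaTableSink extends KafkaTableSink {

    // Constructors omitted: forward schema, topic, properties, and partitioner to KafkaTableSink

    @Override
    protected FlinkKafkaProducerBase<Row> createKafkaProducer(
            String topic,
            Properties properties,
            SerializationSchema<Row> serializationSchema,
            FlinkKafkaPartitioner<Row> partitioner) {
        // Delegate to the version-specific producer implementation
        return new FlinkKafkaProducer010<>(topic, serializationSchema, properties, partitioner);
    }
}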

JSON Table Sources

KafkaJsonTableSource

Table source for JSON-formatted Kafka messages with field mapping support and schema inference.

public abstract class KafkaJsonTableSource extends KafkaTableSource implements DefinedFieldMapping {
    
    public KafkaJsonTableSource(
        TableSchema schema,
        Optional<String> proctimeAttribute, 
        List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
        Optional<Map<String, String>> fieldMapping,
        String topic,
        Properties properties,
        DeserializationSchema<Row> deserializationSchema,
        boolean failOnMissingField,
        boolean ignoreParseErrors
    );
}

Parameters:

  • schema - Table schema defining column names and types
  • proctimeAttribute - Optional processing time attribute name
  • rowtimeAttributeDescriptors - Event time attribute descriptors
  • fieldMapping - Optional mapping from table fields to JSON fields
  • topic - Kafka topic to consume from
  • properties - Kafka consumer properties
  • deserializationSchema - Row deserialization schema
  • failOnMissingField - Whether to fail on missing JSON fields
  • ignoreParseErrors - Whether to skip records with parse errors

Usage Example:

// Define table schema
TableSchema schema = TableSchema.builder()
    .field("user_id", DataTypes.STRING())
    .field("action", DataTypes.STRING())
    .field("timestamp", DataTypes.TIMESTAMP(3))
    .field("proctime", DataTypes.TIMESTAMP(3))
    .build();

// Create JSON table source
KafkaJsonTableSource source = new MyKafkaJsonTableSource(
    schema,
    Optional.of("proctime"),
    Collections.emptyList(),
    Optional.empty(),
    "user-events",
    kafkaProperties,
    new JsonRowDeserializationSchema(schema.toRowType()),
    false, // Don't fail on missing fields
    true   // Skip parse errors
);

KafkaJsonTableSourceFactory

Factory for creating KafkaJsonTableSource instances from descriptor properties, so tables can be defined via the descriptor API or SQL DDL.

public abstract class KafkaJsonTableSourceFactory implements TableSourceFactory<Row> {
    
    public Map<String, String> requiredContext();
    public List<String> supportedProperties();
    public TableSource<Row> createTableSource(Map<String, String> properties);
}

SQL DDL Example:

CREATE TABLE user_events (
    user_id STRING,
    action STRING,
    event_time TIMESTAMP(3),
    proctime AS PROCTIME(),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'user-events',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'test-group',
    'format' = 'json',
    'scan.startup.mode' = 'earliest-offset'
);
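
When the table is declared through DDL, the planner discovers the factory from the 'connector' and 'format' options; nothing is instantiated by hand. A minimal sketch of submitting the DDL and querying the resulting table from Java, assuming a StreamTableEnvironment named tEnv and a String createUserEventsDdl holding the statement above (executeSql/sqlQuery are the SQL entry points in newer Flink releases):

import org.apache.flink.table.api.Table;

// Submit the DDL shown above; the table factory resolves the 'connector'
// and 'format' options and instantiates the table source.
tEnv.executeSql(createUserEventsDdl); // String containing the CREATE TABLE statement above

// Continuous query over the registered table
Table events = tEnv.sqlQuery("SELECT user_id, action, event_time FROM user_events");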

JSON Table Sinks

KafkaJsonTableSink

Table sink for writing JSON-formatted messages to Kafka with optional partitioning.

public abstract class KafkaJsonTableSink extends KafkaTableSink {
    
    public KafkaJsonTableSink(
        TableSchema schema,
        String topic,
        Properties properties,
        Optional<FlinkKafkaPartitioner<Row>> partitioner,
        SerializationSchema<Row> serializationSchema
    );
}

Usage Example:

// Create JSON table sink
KafkaJsonTableSink sink = new MyKafkaJsonTableSink(
    schema,
    "output-events",
    kafkaProperties,
    Optional.of(new FlinkFixedPartitioner<>()),
    new JsonRowSerializationSchema(schema.toRowType())
);
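
One way to wire the sink into a program is the legacy registration API (deprecated in newer Flink releases); a sketch, assuming a StreamTableEnvironment named tEnv and the sink created above:

import org.apache.flink.table.api.Table;

// Register the sink under a name, then write any append-only Table whose
// schema matches the sink schema into it.
tEnv.registerTableSink("output_events", sink);

Table result = tEnv.from("user_events"); // assumes a registered source with a matching schema
result.insertInto("output_events");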

Avro Table Sources

KafkaAvroTableSource

Table source for Avro-formatted Kafka messages with field mapping support and schema registry integration.

public abstract class KafkaAvroTableSource extends KafkaTableSource implements DefinedFieldMapping {
    
    public KafkaAvroTableSource(
        TableSchema schema,
        Optional<String> proctimeAttribute,
        List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
        Optional<Map<String, String>> fieldMapping,
        String topic,
        Properties properties,
        DeserializationSchema<Row> deserializationSchema
    );
}

Usage Example:

// Properties for Avro with Schema Registry
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("schema.registry.url", "http://localhost:8081");

// Create Avro table source
KafkaAvroTableSource source = new MyKafkaAvroTableSource(
    schema,
    Optional.empty(),
    Collections.emptyList(),
    Optional.empty(),
    "avro-events",
    props,
    new AvroRowDeserializationSchema(avroSchema)
);

Table Descriptors

Kafka Connector Descriptor

Programmatic configuration for Kafka table sources and sinks.

public class Kafka extends ConnectorDescriptor {
    public Kafka();
    public Kafka version(String version);
    public Kafka topic(String topic);
    public Kafka properties(Properties properties);
    public Kafka property(String key, String value);
    public Kafka startFromEarliest();
    public Kafka startFromLatest();
    public Kafka startFromGroupOffsets();
    public Kafka startFromSpecificOffsets(Map<Integer, Long> specificOffsets);
}

Usage Example:

// Create table environment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Define source using descriptor API
tEnv.connect(
    new Kafka()
        .version("universal")
        .topic("input-topic")
        .property("bootstrap.servers", "localhost:9092")
        .property("group.id", "test-group")
        .startFromEarliest()
)
.withFormat(new Json().failOnMissingField(false))
.withSchema(new Schema()
    .field("user_id", DataTypes.STRING())
    .field("action", DataTypes.STRING())
    .field("timestamp", DataTypes.TIMESTAMP(3))
    .field("proctime", DataTypes.TIMESTAMP(3)).proctime()
    .field("rowtime", DataTypes.TIMESTAMP(3))
        .rowtime(new Rowtime()
            .timestampsFromField("timestamp")
            .watermarksPeriodicBounded(5000))
)
.createTemporaryTable("user_events");
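
Once registered, the temporary table behaves like any other table; for example, a continuous Table API query over it (the filter value is illustrative):

import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.table.api.Table;

// Continuous query over the temporary table registered above
Table clicks = tEnv.from("user_events")
    .filter($("action").isEqual("click"))
    .select($("user_id"), $("action"), $("proctime"));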

KafkaValidator

Validator for Kafka table descriptors ensuring proper configuration.

public class KafkaValidator extends ConnectorDescriptorValidator {
    public static final String CONNECTOR_TYPE_VALUE_KAFKA = "kafka";
    public static final String CONNECTOR_VERSION = "connector.version";
    public static final String CONNECTOR_TOPIC = "connector.topic";
    public static final String CONNECTOR_PROPERTIES = "connector.properties";
    public static final String CONNECTOR_STARTUP_MODE = "connector.startup-mode";
    public static final String CONNECTOR_SPECIFIC_OFFSETS = "connector.specific-offsets";
}
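
These keys describe the flat property map produced by the descriptor API and consumed by the table factories. A sketch of such a map, built by hand for illustration (the layout of nested keys such as connector.properties.* varies across Flink versions):

import java.util.HashMap;
import java.util.Map;

// Flat connector properties as validated by KafkaValidator
Map<String, String> connectorProps = new HashMap<>();
connectorProps.put("connector.type", KafkaValidator.CONNECTOR_TYPE_VALUE_KAFKA); // "kafka"
connectorProps.put(KafkaValidator.CONNECTOR_VERSION, "universal");
connectorProps.put(KafkaValidator.CONNECTOR_TOPIC, "user-events");
connectorProps.put(KafkaValidator.CONNECTOR_STARTUP_MODE, "earliest-offset");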

Time Attributes and Watermarks

Processing Time

Define processing time attribute for time-based operations:

// In table source constructor
Optional<String> proctimeAttribute = Optional.of("proctime");

// In SQL DDL
proctime AS PROCTIME()

Event Time and Watermarks

Define event time attributes with watermark strategies:

// Rowtime descriptor with watermark strategy
List<RowtimeAttributeDescriptor> rowtimeAttributes = Arrays.asList(
    new RowtimeAttributeDescriptor(
        "rowtime",
        new ExistingField("timestamp"),
        new BoundedOutOfOrderTimestamps(5000) // 5 second out-of-orderness
    )
);

// In SQL DDL
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

Advanced Usage Examples

Complex Query with Windowing

-- Create source table
CREATE TABLE user_events (
    user_id STRING,
    action STRING,
    amount DECIMAL(10,2),
    event_time TIMESTAMP(3),
    proctime AS PROCTIME(),
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'user-events',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- Create sink table
CREATE TABLE hourly_stats (
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3),
    user_count BIGINT,
    total_amount DECIMAL(10,2)
) WITH (
    'connector' = 'kafka',
    'topic' = 'hourly-stats',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- Windowed aggregation query
INSERT INTO hourly_stats
SELECT 
    TUMBLE_START(event_time, INTERVAL '1' HOUR) as window_start,
    TUMBLE_END(event_time, INTERVAL '1' HOUR) as window_end,
    COUNT(DISTINCT user_id) as user_count,
    SUM(amount) as total_amount
FROM user_events
GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR);
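
For reference, the same tumbling-window aggregation can be expressed with the Table API; a sketch assuming user_events is registered as above and event_time is a rowtime attribute:

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;

// Hourly tumbling window, grouped by the window itself
Table hourlyStats = tEnv.from("user_events")
    .window(Tumble.over(lit(1).hours()).on($("event_time")).as("w"))
    .groupBy($("w"))
    .select(
        $("w").start().as("window_start"),
        $("w").end().as("window_end"),
        $("user_id").count().distinct().as("user_count"),
        $("amount").sum().as("total_amount"));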

Join with Lookup Table

-- Kafka stream
CREATE TABLE orders (
    order_id STRING,
    user_id STRING,
    product_id STRING,
    quantity INT,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- Enriched output
CREATE TABLE enriched_orders (
    order_id STRING,
    user_id STRING,
    product_name STRING,
    quantity INT,
    total_price DECIMAL(10,2),
    order_time TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'enriched-orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- Join with product catalog (assuming JDBC lookup table)
INSERT INTO enriched_orders
SELECT 
    o.order_id,
    o.user_id,
    p.product_name,
    o.quantity,
    o.quantity * p.price as total_price,
    o.order_time
FROM orders o
JOIN product_catalog FOR SYSTEM_TIME AS OF o.order_time AS p
    ON o.product_id = p.product_id;

Configuration Best Practices

Consumer Configuration

Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "localhost:9092");
consumerProps.setProperty("group.id", "flink-table-consumer");
consumerProps.setProperty("auto.offset.reset", "earliest");
consumerProps.setProperty("enable.auto.commit", "false"); // Managed by Flink
consumerProps.setProperty("max.poll.records", "500");

Producer Configuration

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");
producerProps.setProperty("transaction.timeout.ms", "900000");
producerProps.setProperty("enable.idempotence", "true");
producerProps.setProperty("acks", "all");

Schema Registry Integration

Properties avroProps = new Properties();
avroProps.setProperty("bootstrap.servers", "localhost:9092");
avroProps.setProperty("schema.registry.url", "http://localhost:8081");
avroProps.setProperty("specific.avro.reader", "true");