Table API Integration

Comprehensive API reference for Table API integration classes in the Apache Flink Kafka Connector 0.8. These classes enable declarative SQL and Table API access to Kafka topics.

Important Note: All Table API classes are marked as @Internal, indicating they are not part of the stable public API and may change between versions.

Kafka08TableSource

Package: org.apache.flink.streaming.connectors.kafka
Annotations: @Internal
Extends: KafkaTableSourceBase
Description: Kafka StreamTableSource for Kafka 0.8, providing Table API access to Kafka topics as streaming tables.

Class Declaration

@Internal
public class Kafka08TableSource extends KafkaTableSourceBase

Constructors

Full Constructor

/**
 * Creates a Kafka08TableSource with full configuration options.
 *
 * @param schema The table schema defining column names and types
 * @param proctimeAttribute Optional processing time attribute name
 * @param rowtimeAttributeDescriptors List of rowtime attribute descriptors for event time
 * @param fieldMapping Optional mapping from table fields to Kafka message fields
 * @param topic The Kafka topic name to read from
 * @param properties Kafka consumer properties
 * @param deserializationSchema Schema to deserialize Kafka messages to Row objects
 * @param startupMode How the consumer should start reading (EARLIEST, LATEST, GROUP_OFFSETS, SPECIFIC_OFFSETS)
 * @param specificStartupOffsets Map of partition to offset for SPECIFIC_OFFSETS startup mode
 */
public Kafka08TableSource(
    TableSchema schema,
    Optional<String> proctimeAttribute,
    List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
    Optional<Map<String, String>> fieldMapping,
    String topic,
    Properties properties,
    DeserializationSchema<Row> deserializationSchema,
    StartupMode startupMode,
    Map<KafkaTopicPartition, Long> specificStartupOffsets)

Basic Constructor

/**
 * Creates a basic Kafka08TableSource with minimal configuration.
 *
 * @param schema The table schema defining column names and types
 * @param topic The Kafka topic name to read from
 * @param properties Kafka consumer properties
 * @param deserializationSchema Schema to deserialize Kafka messages to Row objects
 */
public Kafka08TableSource(
    TableSchema schema,
    String topic,
    Properties properties,
    DeserializationSchema<Row> deserializationSchema)

Protected Methods

/**
 * Creates the underlying FlinkKafkaConsumer for this table source.
 *
 * @param topic The Kafka topic name
 * @param properties Kafka consumer properties
 * @param deserializationSchema The deserialization schema for Row objects
 * @return FlinkKafkaConsumerBase instance configured for Kafka 0.8
 */
protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(
    String topic, 
    Properties properties, 
    DeserializationSchema<Row> deserializationSchema)
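
This hook is invoked by the KafkaTableSourceBase machinery when the table source is translated into a DataStream. Below is a minimal, purely illustrative sketch of how a subclass could override it, assuming Kafka08TableSource is not final and using the standard three-argument FlinkKafkaConsumer08 constructor; Kafka08TableSource already supplies equivalent behavior out of the box.

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSource;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;

import java.util.Properties;

// Illustrative subclass that customizes the consumer backing the table source
public class CustomKafka08TableSource extends Kafka08TableSource {

    public CustomKafka08TableSource(TableSchema schema, String topic,
                                    Properties properties,
                                    DeserializationSchema<Row> deserializationSchema) {
        super(schema, topic, properties, deserializationSchema);
    }

    @Override
    protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(
            String topic, Properties properties,
            DeserializationSchema<Row> deserializationSchema) {
        // Standard Kafka 0.8 consumer; adjust the properties here before handing them over if needed
        return new FlinkKafkaConsumer08<>(topic, deserializationSchema, properties);
    }
}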

Usage Examples

Basic Table Source Setup

import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSource;

import java.util.Properties;

// Define table schema
TableSchema schema = TableSchema.builder()
    .field("user_id", Types.LONG())
    .field("user_name", Types.STRING())  
    .field("user_email", Types.STRING())
    .field("registration_time", Types.SQL_TIMESTAMP())
    .build();

// Kafka properties
Properties properties = new Properties();
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "table-api-group");

// JSON deserialization schema (the Builder expects a TypeInformation<Row>)
DeserializationSchema<Row> deserializer = new JsonRowDeserializationSchema.Builder(schema.toRowType())
    .build();

// Create table source
Kafka08TableSource tableSource = new Kafka08TableSource(
    schema,
    "users-topic", 
    properties,
    deserializer
);

Advanced Table Source with Time Attributes

import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

// Schema with processing time and event time
TableSchema schema = TableSchema.builder()
    .field("transaction_id", Types.STRING())
    .field("amount", Types.DECIMAL())
    .field("event_time", Types.SQL_TIMESTAMP())
    .field("proc_time", Types.SQL_TIMESTAMP())
    .build();

// Define rowtime attribute for event time processing
List<RowtimeAttributeDescriptor> rowtimeDescriptors = Arrays.asList(
    new RowtimeAttributeDescriptor(
        "event_time",                           // rowtime attribute name
        new ExistingField("event_time"),        // extract the timestamp from the field itself
        new BoundedOutOfOrderTimestamps(5000L)  // watermarks: up to 5s of out-of-orderness
    )
);

// Field mapping from Kafka message to table columns
Map<String, String> fieldMapping = new HashMap<>();
fieldMapping.put("transaction_id", "txn_id");
fieldMapping.put("amount", "txn_amount");
fieldMapping.put("event_time", "timestamp");

// Specific startup offsets
Map<KafkaTopicPartition, Long> startupOffsets = new HashMap<>();
startupOffsets.put(new KafkaTopicPartition("transactions", 0), 1000L);
startupOffsets.put(new KafkaTopicPartition("transactions", 1), 2000L);

// Create advanced table source
Kafka08TableSource tableSource = new Kafka08TableSource(
    schema,
    Optional.of("proc_time"),           // processing time attribute
    rowtimeDescriptors,                 // rowtime attributes
    Optional.of(fieldMapping),          // field mapping
    "transactions",                     // topic
    properties,                         // Kafka properties
    deserializer,                       // deserialization schema
    StartupMode.SPECIFIC_OFFSETS,       // startup mode
    startupOffsets                      // specific offsets
);

Kafka08TableSink

Package: org.apache.flink.streaming.connectors.kafka
Annotations: @Internal
Extends: KafkaTableSinkBase
Description: Kafka 0.8 table sink for writing Table API results to Kafka topics.

Class Declaration

@Internal
public class Kafka08TableSink extends KafkaTableSinkBase

Constructor

/**
 * Creates a Kafka08TableSink for writing table data to Kafka.
 *
 * @param schema The table schema defining the structure of rows to be written
 * @param topic The target Kafka topic name
 * @param properties Kafka producer properties
 * @param partitioner Optional custom partitioner for determining target partition
 * @param serializationSchema Schema to serialize Row objects to byte arrays
 */
public Kafka08TableSink(
    TableSchema schema,
    String topic,
    Properties properties,
    Optional<FlinkKafkaPartitioner<Row>> partitioner,
    SerializationSchema<Row> serializationSchema)

Protected Methods

/**
 * Creates the underlying FlinkKafkaProducer for this table sink.
 *
 * @param topic The Kafka topic name
 * @param properties Kafka producer properties
 * @param serializationSchema The serialization schema for Row objects
 * @param partitioner Optional custom partitioner
 * @return FlinkKafkaProducerBase instance configured for Kafka 0.8
 */
protected FlinkKafkaProducerBase<Row> createKafkaProducer(
    String topic, 
    Properties properties, 
    SerializationSchema<Row> serializationSchema, 
    Optional<FlinkKafkaPartitioner<Row>> partitioner)
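
The sink-side counterpart works the same way. The sketch below shows the rough shape of an override, assuming Kafka08TableSink is not final and that FlinkKafkaProducer08 offers a (topic, serializationSchema, properties, partitioner) constructor; Kafka08TableSink already implements this, so the subclass is illustrative only.

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;

import java.util.Optional;
import java.util.Properties;

// Illustrative subclass that customizes the producer backing the table sink
public class CustomKafka08TableSink extends Kafka08TableSink {

    public CustomKafka08TableSink(TableSchema schema, String topic, Properties properties,
                                  Optional<FlinkKafkaPartitioner<Row>> partitioner,
                                  SerializationSchema<Row> serializationSchema) {
        super(schema, topic, properties, partitioner, serializationSchema);
    }

    @Override
    protected FlinkKafkaProducerBase<Row> createKafkaProducer(
            String topic, Properties properties,
            SerializationSchema<Row> serializationSchema,
            Optional<FlinkKafkaPartitioner<Row>> partitioner) {
        // A null partitioner lets Kafka distribute records with its default partitioning
        return new FlinkKafkaProducer08<>(
            topic, serializationSchema, properties, partitioner.orElse(null));
    }
}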

Usage Examples

Basic Table Sink Setup

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSink;

import java.util.Properties;

// Define output table schema
TableSchema outputSchema = TableSchema.builder()
    .field("result_id", Types.STRING())
    .field("computed_value", Types.DOUBLE())
    .field("processing_time", Types.SQL_TIMESTAMP())
    .build();

// Kafka producer properties
Properties producerProps = new Properties();
producerProps.setProperty("metadata.broker.list", "localhost:9092");

// JSON serialization schema (the Builder expects a TypeInformation<Row>)
SerializationSchema<Row> serializer = new JsonRowSerializationSchema.Builder(outputSchema.toRowType())
    .build();

// Create table sink
Kafka08TableSink tableSink = new Kafka08TableSink(
    outputSchema,
    "results-topic",
    producerProps,
    Optional.empty(),  // No custom partitioner
    serializer
);

Table Sink with Custom Partitioner

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Custom partitioner that routes records by the first field (result_id)
FlinkKafkaPartitioner<Row> partitioner = new FlinkKafkaPartitioner<Row>() {
    @Override
    public int partition(Row record, byte[] key, byte[] value,
                         String targetTopic, int[] partitions) {
        // Select one of the available partition IDs based on result_id's hash
        String resultId = (String) record.getField(0);
        return partitions[(resultId.hashCode() & Integer.MAX_VALUE) % partitions.length];
    }
};

Kafka08TableSink tableSink = new Kafka08TableSink(
    outputSchema,
    "partitioned-results",
    producerProps,
    Optional.of(partitioner),  // Custom partitioner
    serializer
);

Kafka08TableSourceSinkFactory

Package: org.apache.flink.streaming.connectors.kafka
Extends: KafkaTableSourceSinkFactoryBase
Description: Factory for creating configured instances of Kafka08TableSource and Kafka08TableSink from descriptors.

Class Declaration

public class Kafka08TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase

Protected Methods

/**
 * Returns the Kafka version string for this factory.
 *
 * @return "0.8" indicating Kafka version 0.8 support
 */
protected String kafkaVersion()

/**
 * Indicates whether this Kafka version supports timestamps.
 * Kafka 0.8 does not support message timestamps.
 *
 * @return false, as Kafka 0.8 doesn't support timestamps
 */
protected boolean supportsKafkaTimestamps()

/**
 * Creates a KafkaTableSource instance with the provided configuration.
 *
 * @param schema The table schema
 * @param topic The Kafka topic name
 * @param properties Kafka consumer properties
 * @param deserializationSchema Row deserialization schema
 * @param startupMode Consumer startup mode
 * @param specificStartupOffsets Specific startup offsets (if applicable)
 * @param proctimeAttribute Processing time attribute name (optional)
 * @param rowtimeAttributeDescriptors Rowtime attribute descriptors
 * @param fieldMapping Field mapping configuration (optional)
 * @return Configured Kafka08TableSource instance
 */
protected KafkaTableSourceBase createKafkaTableSource(
    TableSchema schema,
    String topic,
    Properties properties,
    DeserializationSchema<Row> deserializationSchema,
    StartupMode startupMode,
    Map<KafkaTopicPartition, Long> specificStartupOffsets,
    Optional<String> proctimeAttribute,
    List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
    Optional<Map<String, String>> fieldMapping)

/**
 * Creates a KafkaTableSink instance with the provided configuration.
 *
 * @param schema The table schema
 * @param topic The Kafka topic name
 * @param properties Kafka producer properties
 * @param partitioner Optional partitioner
 * @param serializationSchema Row serialization schema
 * @return Configured Kafka08TableSink instance
 */
protected KafkaTableSinkBase createKafkaTableSink(
    TableSchema schema,
    String topic, 
    Properties properties,
    Optional<FlinkKafkaPartitioner<Row>> partitioner,
    SerializationSchema<Row> serializationSchema)

Factory Usage Examples

Programmatic Factory Usage

Note that createKafkaTableSource and createKafkaTableSink are protected, so direct calls like the ones below only compile from a subclass or from code in the same package. In typical use the factory is discovered and invoked through the descriptor API (see "Dynamic Table Registration" below).

import org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceSinkFactory;

Kafka08TableSourceSinkFactory factory = new Kafka08TableSourceSinkFactory();

// Create table source via factory
KafkaTableSourceBase source = factory.createKafkaTableSource(
    schema,                    // Table schema
    "input-topic",            // Topic name
    consumerProperties,       // Kafka properties
    deserializationSchema,    // Deserialization schema
    StartupMode.EARLIEST,     // Startup mode
    Collections.emptyMap(),   // No specific offsets
    Optional.of("proc_time"), // Processing time attribute
    Collections.emptyList(), // No rowtime attributes
    Optional.empty()         // No field mapping
);

// Create table sink via factory  
KafkaTableSinkBase sink = factory.createKafkaTableSink(
    outputSchema,          // Output table schema
    "output-topic",        // Topic name
    producerProperties,    // Kafka properties
    Optional.empty(),      // No custom partitioner
    serializationSchema   // Serialization schema
);

Table API Integration Examples

Complete Table API Workflow

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Table;

// Setup Table API environment
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .inStreamingMode()
    .build();
TableEnvironment tEnv = TableEnvironment.create(settings);

// Register Kafka source table
TableSchema sourceSchema = TableSchema.builder()
    .field("user_id", Types.LONG())
    .field("product_id", Types.STRING())
    .field("quantity", Types.INT())
    .field("price", Types.DECIMAL())
    .field("order_time", Types.SQL_TIMESTAMP())
    .field("proc_time", Types.SQL_TIMESTAMP())
    .build();

Properties sourceProps = new Properties();
sourceProps.setProperty("zookeeper.connect", "localhost:2181");
sourceProps.setProperty("group.id", "analytics-group");

Kafka08TableSource source = new Kafka08TableSource(
    sourceSchema,
    "orders",
    sourceProps,
    new JsonRowDeserializationSchema.Builder(sourceSchema.toRowType()).build()
);

tEnv.registerTableSource("Orders", source);

// Register Kafka sink table
TableSchema sinkSchema = TableSchema.builder()
    .field("product_id", Types.STRING())
    .field("total_quantity", Types.LONG())
    .field("total_revenue", Types.DECIMAL())
    .field("window_start", Types.SQL_TIMESTAMP())
    .field("window_end", Types.SQL_TIMESTAMP())
    .build();

Properties sinkProps = new Properties();
sinkProps.setProperty("metadata.broker.list", "localhost:9092");

Kafka08TableSink sink = new Kafka08TableSink(
    sinkSchema,
    "product-analytics",
    sinkProps,
    Optional.empty(),
    new JsonRowSerializationSchema.Builder(sinkSchema.toRowType()).build()
);

tEnv.registerTableSink("ProductAnalytics", sink);

// Execute SQL query (plain string concatenation keeps the example Java 8 compatible)
String sql =
    "INSERT INTO ProductAnalytics " +
    "SELECT " +
    "  product_id, " +
    "  SUM(quantity) AS total_quantity, " +
    "  SUM(quantity * price) AS total_revenue, " +
    "  TUMBLE_START(proc_time, INTERVAL '1' HOUR) AS window_start, " +
    "  TUMBLE_END(proc_time, INTERVAL '1' HOUR) AS window_end " +
    "FROM Orders " +
    "GROUP BY " +
    "  product_id, " +
    "  TUMBLE(proc_time, INTERVAL '1' HOUR)";

// On older Flink releases that still bundle the Kafka 0.8 connector,
// use sqlUpdate(sql) followed by execute() instead of executeSql.
tEnv.executeSql(sql);

Dynamic Table Registration

import org.apache.flink.table.descriptors.*;

// Register source using descriptors (alternative approach).
// Note: descriptor method names vary across Flink releases; older versions use
// registerTableSource/registerTableSink where newer ones use createTemporaryTable.
tEnv.connect(
    new Kafka()
        .version("0.8")
        .topic("user-events")
        .property("zookeeper.connect", "localhost:2181")
        .property("group.id", "event-processors")
)
.withFormat(
    new Json()
        .failOnMissingField(false)
        .deriveSchema()
)
.withSchema(
    new Schema()
        .field("event_id", Types.STRING())
        .field("user_id", Types.LONG())  
        .field("event_type", Types.STRING())
        .field("event_time", Types.SQL_TIMESTAMP())
        .field("proc_time", Types.SQL_TIMESTAMP()).proctime()
)
.createTemporaryTable("UserEvents");

// Register sink using descriptors
tEnv.connect(
    new Kafka()
        .version("0.8")
        .topic("processed-events")
        .property("metadata.broker.list", "localhost:9092")
)
.withFormat(
    new Json()
)
.withSchema(
    new Schema()
        .field("user_id", Types.LONG())
        .field("event_count", Types.LONG())
        .field("processing_time", Types.SQL_TIMESTAMP())
)
.createTemporaryTable("ProcessedEvents");

Serialization Schemas for Table API

JSON Row Serialization

import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.formats.json.JsonRowSerializationSchema;

// JSON deserialization for table source
JsonRowDeserializationSchema.Builder deserBuilder = 
    new JsonRowDeserializationSchema.Builder(sourceSchema);

DeserializationSchema<Row> deserializer = deserBuilder
    .ignoreParseErrors()    // Skip malformed JSON
    .build();

// JSON serialization for table sink  
JsonRowSerializationSchema.Builder serBuilder = 
    new JsonRowSerializationSchema.Builder(sinkSchema);

SerializationSchema<Row> serializer = serBuilder.build();

Custom Row Serialization

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.types.Row;

import java.io.IOException;

// Custom deserialization schema for delimiter-separated (CSV-style) messages
public class CsvRowDeserializationSchema implements DeserializationSchema<Row> {
    private final TableSchema schema;
    private final String delimiter;
    
    public CsvRowDeserializationSchema(TableSchema schema, String delimiter) {
        this.schema = schema;
        this.delimiter = delimiter;
    }
    
    @Override
    public Row deserialize(byte[] message) throws IOException {
        String line = new String(message);
        String[] fields = line.split(delimiter);
        
        Row row = new Row(schema.getFieldCount());
        for (int i = 0; i < fields.length && i < schema.getFieldCount(); i++) {
            row.setField(i, convertField(fields[i], schema.getFieldTypes()[i]));
        }
        return row;
    }
    
    @Override
    public boolean isEndOfStream(Row nextElement) {
        return false;
    }
    
    @Override
    public TypeInformation<Row> getProducedType() {
        return schema.toRowType();
    }
    
    private Object convertField(String field, TypeInformation<?> type) {
        // Type conversion logic based on the schema's field types
        if (type.equals(Types.STRING())) return field;
        if (type.equals(Types.LONG())) return Long.parseLong(field);
        if (type.equals(Types.INT())) return Integer.parseInt(field);
        // ... other type conversions
        return field;
    }
}
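
A short usage sketch wiring the custom schema above into the basic Kafka08TableSource constructor; the topic name and delimiter are placeholders.

// Use the CSV schema defined above as the table source's deserializer
Properties csvProps = new Properties();
csvProps.setProperty("zookeeper.connect", "localhost:2181");
csvProps.setProperty("group.id", "csv-table-group");

Kafka08TableSource csvSource = new Kafka08TableSource(
    sourceSchema,
    "csv-orders",                                       // placeholder topic
    csvProps,
    new CsvRowDeserializationSchema(sourceSchema, ",")  // comma-delimited messages
);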

Configuration and Limitations

Kafka 0.8 Table API Limitations

  1. No Timestamp Support: Kafka 0.8 doesn't support message timestamps, affecting rowtime attributes
  2. Internal API: All classes are marked @Internal and may change between versions
  3. Limited Watermark Strategies: Restricted by Kafka 0.8's metadata capabilities
  4. ZooKeeper Dependency: Requires ZooKeeper configuration for consumer operations

Recommended Configuration

// Source configuration for reliability
Properties sourceProps = new Properties();
sourceProps.setProperty("zookeeper.connect", "zk1:2181,zk2:2181,zk3:2181");
sourceProps.setProperty("group.id", "table-api-consumer");
sourceProps.setProperty("auto.offset.reset", "earliest");
sourceProps.setProperty("auto.commit.enable", "false"); // Managed by Flink

// Sink configuration for reliability  
Properties sinkProps = new Properties();
sinkProps.setProperty("metadata.broker.list", "broker1:9092,broker2:9092");
sinkProps.setProperty("request.required.acks", "1");
sinkProps.setProperty("message.send.max.retries", "3");

// Enable checkpointing on the StreamExecutionEnvironment for exactly-once reads
// (the Kafka 0.8 producer side remains at-least-once)
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

Migration Considerations

For production Table API usage, consider upgrading to the Kafka 0.9+ connectors, which provide the following (a migration sketch follows this list):

  1. Stable Public API: Non-internal classes with stability guarantees
  2. Message Timestamp Support: Better rowtime attribute support
  3. Improved Reliability: Exactly-once semantics for both source and sink
  4. Enhanced Performance: Better resource utilization and throughput
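
As a sketch of the upgrade path, the descriptor-based registration from above can be pointed at a newer connector by changing the version and broker configuration; the exact descriptor methods vary by Flink release, and the topic, fields, and broker address below are placeholders.

import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;

// Same logical table on a newer connector: bootstrap.servers replaces ZooKeeper,
// and rowtime attributes can be derived from Kafka message timestamps.
tEnv.connect(
    new Kafka()
        .version("0.10")                                // or "universal" on recent releases
        .topic("user-events")
        .property("bootstrap.servers", "broker1:9092")  // no ZooKeeper required
        .startFromEarliest()
)
.withFormat(
    new Json()
        .failOnMissingField(false)
        .deriveSchema()
)
.withSchema(
    new Schema()
        .field("event_id", Types.STRING())
        .field("user_id", Types.LONG())
        .field("event_type", Types.STRING())
        .field("event_time", Types.SQL_TIMESTAMP())
            .rowtime(new Rowtime()
                .timestampsFromSource()                 // use the Kafka message timestamp
                .watermarksPeriodicBounded(5000))
)
.createTemporaryTable("UserEvents");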