Table API Integration

Table API and SQL integration for declarative stream processing with Kafka 0.9 sources and sinks, built on factory-based configuration and schema-aware data processing.

Capabilities

Kafka09TableSourceSinkFactory

Factory class for creating Kafka 0.9 table sources and sinks in the Table API ecosystem.

/**
 * Factory for creating configured instances of Kafka 0.9 table sources and sinks.
 * Extends the base Kafka table factory with version-specific implementations.
 */
public class Kafka09TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {
    
    /**
     * Returns the Kafka version identifier for this factory.
     * 
     * @return "0.9" as the version string
     */
    @Override
    protected String kafkaVersion();
    
    /**
     * Indicates whether this version supports Kafka timestamps.
     * 
     * @return false, as Kafka 0.9 does not support message timestamps
     */
    @Override
    protected boolean supportsKafkaTimestamps();
    
    /**
     * Creates a Kafka 0.9 table source with the provided configuration.
     * 
     * @param schema Schema of the produced table
     * @param proctimeAttribute Field name of the processing time attribute
     * @param rowtimeAttributeDescriptors Descriptors for rowtime attributes
     * @param fieldMapping Mapping from table schema fields to fields of the physical produced type
     * @param topic Kafka topic to consume
     * @param properties Properties for the Kafka consumer
     * @param deserializationSchema Deserialization schema for decoding records from Kafka
     * @param startupMode Startup mode for the contained consumer
     * @param specificStartupOffsets Specific startup offsets (when using SPECIFIC_OFFSETS mode)
     * @return Configured Kafka09TableSource instance
     */
    @Override
    protected KafkaTableSourceBase createKafkaTableSource(
            TableSchema schema,
            Optional<String> proctimeAttribute,
            List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
            Map<String, String> fieldMapping,
            String topic,
            Properties properties,
            DeserializationSchema<Row> deserializationSchema,
            StartupMode startupMode,
            Map<KafkaTopicPartition, Long> specificStartupOffsets);
    
    /**
     * Creates a Kafka 0.9 table sink with the provided configuration.
     * 
     * @param schema Schema of the table to be written
     * @param topic Target Kafka topic
     * @param properties Properties for the Kafka producer
     * @param partitioner Optional custom partitioner for message distribution
     * @param serializationSchema Serialization schema for encoding records to Kafka
     * @return Configured Kafka09TableSink instance
     */
    @Override
    protected KafkaTableSinkBase createKafkaTableSink(
            TableSchema schema,
            String topic,
            Properties properties,
            Optional<FlinkKafkaPartitioner<Row>> partitioner,
            SerializationSchema<Row> serializationSchema);
}
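
The overrides themselves are small: the version hooks return the values documented above, and the create methods simply forward to the Kafka 0.9 source and sink constructors. A minimal sketch of what a conforming implementation looks like (illustrative only; the shipped class may differ in detail):

@Override
protected String kafkaVersion() {
    // Version string matched against the connector configuration.
    return "0.9";
}

@Override
protected boolean supportsKafkaTimestamps() {
    // Kafka 0.9 messages carry no timestamps, so timestamp-based features stay disabled.
    return false;
}

// createKafkaTableSource(...) forwards its arguments to the full Kafka09TableSource
// constructor documented below; the sink side does the same for Kafka09TableSink.
@Override
protected KafkaTableSinkBase createKafkaTableSink(
        TableSchema schema,
        String topic,
        Properties properties,
        Optional<FlinkKafkaPartitioner<Row>> partitioner,
        SerializationSchema<Row> serializationSchema) {
    return new Kafka09TableSink(schema, topic, properties, partitioner, serializationSchema);
}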

Kafka09TableSource

Table source implementation for consuming Kafka 0.9 data in Table API queries.

/**
 * Kafka table source for Kafka 0.9 - internal implementation.
 * Provides streaming table source capabilities for Table API and SQL.
 */
@Internal
public class Kafka09TableSource extends KafkaTableSourceBase {
    
    /**
     * Creates a Kafka 0.9 table source with full configuration options.
     * 
     * @param schema Schema of the produced table
     * @param proctimeAttribute Field name of the processing time attribute
     * @param rowtimeAttributeDescriptors Descriptors for rowtime attributes
     * @param fieldMapping Optional mapping from table schema fields to fields of the physical type
     * @param topic Kafka topic to consume
     * @param properties Properties for the Kafka consumer
     * @param deserializationSchema Deserialization schema for decoding records
     * @param startupMode Startup mode for the consumer
     * @param specificStartupOffsets Specific startup offsets for SPECIFIC_OFFSETS mode
     */
    public Kafka09TableSource(
            TableSchema schema,
            Optional<String> proctimeAttribute,
            List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
            Optional<Map<String, String>> fieldMapping,
            String topic,
            Properties properties,
            DeserializationSchema<Row> deserializationSchema,
            StartupMode startupMode,
            Map<KafkaTopicPartition, Long> specificStartupOffsets);
    
    /**
     * Creates a simple Kafka 0.9 table source with basic configuration.
     * 
     * @param schema Schema of the produced table
     * @param topic Kafka topic to consume
     * @param properties Properties for the Kafka consumer
     * @param deserializationSchema Deserialization schema for decoding records
     */
    public Kafka09TableSource(
            TableSchema schema,
            String topic,
            Properties properties,
            DeserializationSchema<Row> deserializationSchema);
}
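
For event-time processing, the full constructor lets you declare a rowtime attribute backed by a payload field. A sketch, assuming the rowtime helpers shipped with flink-table (RowtimeAttributeDescriptor, the ExistingField timestamp extractor, and the BoundedOutOfOrderTimestamps watermark strategy), a TableSchema named schema that contains an event_time TIMESTAMP(3) field, and properties and deserializer configured as in the programmatic example further below:

import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;

import java.util.Collections;
import java.util.Optional;

// Rowtime attribute backed by the "event_time" payload field,
// with watermarks allowing 5 seconds of out-of-orderness.
RowtimeAttributeDescriptor rowtime = new RowtimeAttributeDescriptor(
    "event_time",
    new ExistingField("event_time"),
    new BoundedOutOfOrderTimestamps(5000L));

Kafka09TableSource eventTimeSource = new Kafka09TableSource(
    schema,                              // table schema containing an "event_time" field
    Optional.empty(),                    // no processing-time attribute
    Collections.singletonList(rowtime),  // rowtime attribute descriptors
    Optional.empty(),                    // no field mapping
    "user-events",
    properties,
    deserializer,
    StartupMode.GROUP_OFFSETS,           // resume from the group's committed offsets
    Collections.emptyMap());             // only consulted in SPECIFIC_OFFSETS mode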

Kafka09TableSink

Table sink implementation for writing to Kafka 0.9 topics from Table API queries.

/**
 * Kafka table sink for Kafka 0.9 - internal implementation.
 * Provides streaming table sink capabilities for Table API and SQL.
 */
@Internal
public class Kafka09TableSink extends KafkaTableSinkBase {
    
    /**
     * Creates a Kafka 0.9 table sink with the provided configuration.
     * 
     * @param schema Schema of the table to be written
     * @param topic Target Kafka topic
     * @param properties Properties for the Kafka producer
     * @param partitioner Optional custom partitioner for message distribution
     * @param serializationSchema Serialization schema for encoding records
     */
    public Kafka09TableSink(
            TableSchema schema,
            String topic,
            Properties properties,
            Optional<FlinkKafkaPartitioner<Row>> partitioner,
            SerializationSchema<Row> serializationSchema);
}
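
Constructing the sink programmatically mirrors the source. A sketch, assuming the FlinkFixedPartitioner bundled with the Kafka connector and the JsonRowSerializationSchema builder from flink-json (whose construction API varies between Flink releases):

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;

import java.util.Optional;
import java.util.Properties;

// Schema of the rows written to Kafka.
TableSchema sinkSchema = TableSchema.builder()
    .field("user_id", DataTypes.STRING())
    .field("event_count", DataTypes.BIGINT())
    .build();

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");

// JSON serializer derived from the sink schema.
SerializationSchema<Row> serializer =
    new JsonRowSerializationSchema.Builder(sinkSchema.toRowType()).build();

Kafka09TableSink sink = new Kafka09TableSink(
    sinkSchema,
    "processed-events",                          // target topic
    producerProps,
    Optional.of(new FlinkFixedPartitioner<>()),  // pin each sink subtask to one partition
    serializer);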

Usage Examples

Table API with Kafka Source

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Table;

// Create table environment
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Create Kafka source table using DDL
tableEnv.executeSql(
    "CREATE TABLE kafka_source (" +
    "  user_id STRING," +
    "  event_time TIMESTAMP(3)," +
    "  event_type STRING," +
    "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
    ") WITH (" +
    "  'connector' = 'kafka-0.9'," +
    "  'topic' = 'user-events'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'properties.group.id' = 'table-consumer-group'," +
    "  'format' = 'json'" +
    ")"
);

// Query the Kafka source
Table result = tableEnv.sqlQuery(
    "SELECT user_id, COUNT(*) as event_count " +
    "FROM kafka_source " +
    "WHERE event_type = 'login' " +
    "GROUP BY user_id"
);

Table API with Kafka Sink

// Create Kafka sink table
tableEnv.executeSql(
    "CREATE TABLE kafka_sink (" +
    "  user_id STRING," +
    "  event_count BIGINT" +
    ") WITH (" +
    "  'connector' = 'kafka-0.9'," +
    "  'topic' = 'processed-events'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'json'" +
    ")"
);

// Insert query results into Kafka
result.executeInsert("kafka_sink");

Programmatic Table Source Creation

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.streaming.connectors.kafka.Kafka09TableSource;
import org.apache.flink.types.Row;
import java.util.Properties;

// Define table schema
TableSchema schema = TableSchema.builder()
    .field("user_id", DataTypes.STRING())
    .field("timestamp", DataTypes.TIMESTAMP(3))
    .field("action", DataTypes.STRING())
    .field("value", DataTypes.DOUBLE())
    .build();

// Configure Kafka properties
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "programmatic-consumer");

// Create JSON deserializer
DeserializationSchema<Row> deserializer = new JsonRowDeserializationSchema.Builder(
    schema.toRowType()
).build();

// Create Kafka table source
Kafka09TableSource kafkaSource = new Kafka09TableSource(
    schema,
    "user-actions",
    properties,
    deserializer
);

// Register as table
tableEnv.registerTableSource("user_actions", kafkaSource);

// Use in SQL
Table queryResult = tableEnv.sqlQuery(
    "SELECT user_id, SUM(value) as total_value " +
    "FROM user_actions " +
    "WHERE action = 'purchase' " +
    "GROUP BY user_id"
);

Complex Table Processing Pipeline

// Multiple Kafka sources and sinks
tableEnv.executeSql(
    "CREATE TABLE orders (" +
    "  order_id STRING," +
    "  customer_id STRING," +
    "  product_id STRING," +
    "  quantity INT," +
    "  price DECIMAL(10,2)," +
    "  order_time TIMESTAMP(3)," +
    "  WATERMARK FOR order_time AS order_time - INTERVAL '1' MINUTE" +
    ") WITH (" +
    "  'connector' = 'kafka-0.9'," +
    "  'topic' = 'orders'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'properties.group.id' = 'order-processor'," +
    "  'format' = 'json'" +
    ")"
);

tableEnv.executeSql(
    "CREATE TABLE order_summary (" +
    "  window_start TIMESTAMP(3)," +
    "  window_end TIMESTAMP(3)," +
    "  total_orders BIGINT," +
    "  total_revenue DECIMAL(12,2)," +
    "  avg_order_value DECIMAL(10,2)" +
    ") WITH (" +
    "  'connector' = 'kafka-0.9'," +
    "  'topic' = 'order-summary'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'json'" +
    ")"
);

// Windowed aggregation query
tableEnv.executeSql(
    "INSERT INTO order_summary " +
    "SELECT " +
    "  TUMBLE_START(order_time, INTERVAL '1' HOUR) as window_start," +
    "  TUMBLE_END(order_time, INTERVAL '1' HOUR) as window_end," +
    "  COUNT(*) as total_orders," +
    "  SUM(quantity * price) as total_revenue," +
    "  AVG(quantity * price) as avg_order_value " +
    "FROM orders " +
    "GROUP BY TUMBLE(order_time, INTERVAL '1' HOUR)"
);

Custom Format Integration

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;
import java.io.IOException;

// Custom CSV-like format deserializer
DeserializationSchema<Row> csvDeserializer = new DeserializationSchema<Row>() {
    @Override
    public Row deserialize(byte[] message) throws IOException {
        String line = new String(message);
        String[] fields = line.split(",");
        
        Row row = new Row(3);
        row.setField(0, fields[0]); // id
        row.setField(1, fields[1]); // name
        row.setField(2, Double.parseDouble(fields[2])); // value
        return row;
    }
    
    @Override
    public boolean isEndOfStream(Row nextElement) {
        return false;
    }
    
    @Override
    public TypeInformation<Row> getProducedType() {
        return Types.ROW(Types.STRING, Types.STRING, Types.DOUBLE);
    }
};

TableSchema csvSchema = TableSchema.builder()
    .field("id", DataTypes.STRING())
    .field("name", DataTypes.STRING())
    .field("value", DataTypes.DOUBLE())
    .build();

Kafka09TableSource csvSource = new Kafka09TableSource(
    csvSchema,
    "csv-data",
    properties,
    csvDeserializer
);

Configuration Options

Connector Properties

-- Basic Kafka 0.9 connector configuration
'connector' = 'kafka-0.9'
'topic' = 'my-topic'                                    -- Required: Kafka topic name
'properties.bootstrap.servers' = 'localhost:9092'      -- Required: Kafka broker addresses
'properties.group.id' = 'my-consumer-group'           -- Required for sources: Consumer group ID

-- Consumer-specific properties (sources)
'properties.auto.offset.reset' = 'earliest'            -- latest, earliest, none
'properties.enable.auto.commit' = 'false'              -- Flink manages commits
'properties.fetch.min.bytes' = '1024'                  -- Minimum fetch size
'properties.max.partition.fetch.bytes' = '1048576'     -- Maximum per-partition fetch

-- Producer-specific properties (sinks)
'properties.acks' = '1'                                 -- 0, 1, all
'properties.retries' = '3'                              -- Retry count
'properties.batch.size' = '16384'                       -- Batch size in bytes
'properties.linger.ms' = '5'                            -- Batch linger time
'properties.compression.type' = 'snappy'                -- none, gzip, snappy, lz4

-- Format configuration
'format' = 'json'                                       -- json, csv, avro, etc.
'json.fail-on-missing-field' = 'false'                 -- Handle missing JSON fields
'json.ignore-parse-errors' = 'true'                    -- Skip malformed records

Startup Modes

-- Start from earliest available offset
'scan.startup.mode' = 'earliest-offset'

-- Start from latest available offset
'scan.startup.mode' = 'latest-offset'

-- Start from consumer group's committed offset
'scan.startup.mode' = 'group-offsets'

-- Start from specific per-partition offsets
'scan.startup.mode' = 'specific-offsets'
'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
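
The programmatic counterpart of specific-offsets is the specificStartupOffsets map accepted by the full Kafka09TableSource constructor. A sketch, assuming a two-partition user-events topic and the schema, properties, and deserializer from the programmatic example above:

import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Resume partition 0 at offset 42 and partition 1 at offset 300.
Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
specificOffsets.put(new KafkaTopicPartition("user-events", 0), 42L);
specificOffsets.put(new KafkaTopicPartition("user-events", 1), 300L);

Kafka09TableSource offsetSource = new Kafka09TableSource(
    schema,
    Optional.empty(),             // no processing-time attribute
    Collections.emptyList(),      // no rowtime attributes
    Optional.empty(),             // no field mapping
    "user-events",
    properties,
    deserializer,
    StartupMode.SPECIFIC_OFFSETS,
    specificOffsets);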

Limitations and Considerations

Kafka 0.9 Specific Limitations

  • No timestamp support: Cannot use message timestamps for watermark generation; event time must be derived from a payload field, or fall back to processing time (see the sketch after this list)
  • Limited offset management: Fewer startup mode options compared to newer versions
  • No exactly-once semantics: Producer doesn't support transactions or idempotence
  • Basic consumer features: Missing some advanced consumer configuration options
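
Because the broker supplies no message timestamps, event time must come from a payload field (as in the DDL examples above); otherwise, fall back to processing time. A sketch declaring a processing-time attribute as a computed column (assuming a Flink release whose DDL supports PROCTIME() computed columns):

tableEnv.executeSql(
    "CREATE TABLE kafka_proctime_source (" +
    "  user_id STRING," +
    "  event_type STRING," +
    "  proc_time AS PROCTIME()" +   // processing-time attribute; no broker timestamp required
    ") WITH (" +
    "  'connector' = 'kafka-0.9'," +
    "  'topic' = 'user-events'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'properties.group.id' = 'proctime-consumer'," +
    "  'format' = 'json'" +
    ")"
);

// Processing-time windowed aggregation over the source.
Table proctimeCounts = tableEnv.sqlQuery(
    "SELECT user_id, COUNT(*) AS events_per_minute " +
    "FROM kafka_proctime_source " +
    "GROUP BY user_id, TUMBLE(proc_time, INTERVAL '1' MINUTE)"
);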

Table API Integration Notes

  • Sources and sinks are created through the factory pattern
  • Schema must be explicitly defined in DDL or programmatically
  • Time attributes require careful configuration for event time processing
  • Custom formats require implementing appropriate serialization/deserialization schemas (a serialization-side sketch follows this list)
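
The sink-side counterpart of the custom deserializer shown earlier is a SerializationSchema<Row>. A minimal sketch that writes the same CSV-like format, reusing csvSchema and properties from the custom format example (the csv-output topic name is illustrative):

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.types.Row;

import java.nio.charset.StandardCharsets;
import java.util.Optional;

SerializationSchema<Row> csvSerializer = new SerializationSchema<Row>() {
    @Override
    public byte[] serialize(Row row) {
        // Join the three fields back into a comma-separated line: id,name,value
        String line = row.getField(0) + "," + row.getField(1) + "," + row.getField(2);
        return line.getBytes(StandardCharsets.UTF_8);
    }
};

Kafka09TableSink csvSink = new Kafka09TableSink(
    csvSchema,
    "csv-output",
    properties,
    Optional.empty(),    // default partitioning
    csvSerializer);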

Performance Considerations

  • Use appropriate batch sizes and linger times for producers (see the sketch after this list)
  • Configure consumer fetch sizes based on message volume
  • Consider partition count vs parallelism for optimal throughput
  • Monitor consumer lag and producer throughput metrics
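
The tuning knobs listed under Connector Properties can also be set programmatically on the Properties objects handed to the source and sink constructors. A sketch with illustrative values:

import java.util.Properties;

// Consumer tuning for a Kafka09TableSource
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "localhost:9092");
consumerProps.setProperty("group.id", "table-consumer-group");
consumerProps.setProperty("fetch.min.bytes", "1024");               // batch small fetches together
consumerProps.setProperty("max.partition.fetch.bytes", "1048576");  // cap per-partition fetch size

// Producer tuning for a Kafka09TableSink
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");
producerProps.setProperty("acks", "1");                  // leader acknowledgement only
producerProps.setProperty("batch.size", "16384");        // batch size in bytes
producerProps.setProperty("linger.ms", "5");             // wait briefly to fill batches
producerProps.setProperty("compression.type", "snappy"); // trade CPU for network throughput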