Comprehensive API reference for Table API integration classes in the Apache Flink Kafka Connector 0.8. These classes enable declarative SQL and Table API access to Kafka topics.
Important Note: All Table API classes are marked as @Internal, indicating they are not part of the stable public API and may change between versions.
Class: Kafka08TableSource
Package: org.apache.flink.streaming.connectors.kafka
Annotations: @Internal
Extends: KafkaTableSourceBase
Description: Kafka StreamTableSource for Kafka 0.8, providing Table API access to Kafka topics as streaming tables.
@Internal
public class Kafka08TableSource extends KafkaTableSourceBase

/**
* Creates a Kafka08TableSource with full configuration options.
*
* @param schema The table schema defining column names and types
* @param proctimeAttribute Optional processing time attribute name
* @param rowtimeAttributeDescriptors List of rowtime attribute descriptors for event time
* @param fieldMapping Optional mapping from table fields to Kafka message fields
* @param topic The Kafka topic name to read from
* @param properties Kafka consumer properties
* @param deserializationSchema Schema to deserialize Kafka messages to Row objects
* @param startupMode How the consumer should start reading (EARLIEST, LATEST, GROUP_OFFSETS, SPECIFIC_OFFSETS)
* @param specificStartupOffsets Map of partition to offset for SPECIFIC_OFFSETS startup mode
*/
public Kafka08TableSource(
TableSchema schema,
Optional<String> proctimeAttribute,
List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
Optional<Map<String, String>> fieldMapping,
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema,
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets)

/**
* Creates a basic Kafka08TableSource with minimal configuration.
*
* @param schema The table schema defining column names and types
* @param topic The Kafka topic name to read from
* @param properties Kafka consumer properties
* @param deserializationSchema Schema to deserialize Kafka messages to Row objects
*/
public Kafka08TableSource(
TableSchema schema,
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema)

/**
* Creates the underlying FlinkKafkaConsumer for this table source.
*
* @param topic The Kafka topic name
* @param properties Kafka consumer properties
* @param deserializationSchema The deserialization schema for Row objects
* @return FlinkKafkaConsumerBase instance configured for Kafka 0.8
*/
protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema)
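
The base class delegates consumer construction to this hook. A minimal sketch of how an override plausibly looks, assuming the FlinkKafkaConsumer08 constructor that takes a topic, a deserialization schema, and consumer properties (not taken verbatim from the connector source):

@Override
protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(
        String topic,
        Properties properties,
        DeserializationSchema<Row> deserializationSchema) {
    // Build a Kafka 0.8 consumer; the base class then applies the configured startup mode
    return new FlinkKafkaConsumer08<>(topic, deserializationSchema, properties);
}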

Example: basic table source configuration
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSource;
// Define table schema
TableSchema schema = TableSchema.builder()
.field("user_id", Types.LONG())
.field("user_name", Types.STRING())
.field("user_email", Types.STRING())
.field("registration_time", Types.SQL_TIMESTAMP())
.build();
// Kafka properties
Properties properties = new Properties();
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "table-api-group");
// Custom deserialization schema for JSON messages
DeserializationSchema<Row> deserializer = new JsonRowDeserializationSchema.Builder(schema)
.build();
// Create table source
Kafka08TableSource tableSource = new Kafka08TableSource(
schema,
"users-topic",
properties,
deserializer
);
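
Once constructed, the source can be registered so SQL can query it. A short sketch, assuming a previously created table environment tEnv (the registration API itself appears in the pipeline example later in this reference):

// tEnv is assumed to be an existing TableEnvironment
tEnv.registerTableSource("Users", tableSource);
Table users = tEnv.sqlQuery("SELECT user_id, user_name FROM Users");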

Example: table source with event time, field mapping, and specific startup offsets
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
// Schema with processing time and event time
TableSchema schema = TableSchema.builder()
.field("transaction_id", Types.STRING())
.field("amount", Types.DECIMAL())
.field("event_time", Types.SQL_TIMESTAMP())
.field("proc_time", Types.SQL_TIMESTAMP())
.build();
// Define rowtime attribute for event time processing
List<RowtimeAttributeDescriptor> rowtimeDescriptors = Arrays.asList(
new RowtimeAttributeDescriptor(
"event_time", // rowtime attribute name
new ExistingField("event_time"), // extract the event timestamp from the existing field
new BoundedOutOfOrderTimestamps(5000L) // watermarks that tolerate 5 seconds of out-of-orderness
)
);
// Field mapping from Kafka message to table columns
Map<String, String> fieldMapping = new HashMap<>();
fieldMapping.put("transaction_id", "txn_id");
fieldMapping.put("amount", "txn_amount");
fieldMapping.put("event_time", "timestamp");
// Specific startup offsets
Map<KafkaTopicPartition, Long> startupOffsets = new HashMap<>();
startupOffsets.put(new KafkaTopicPartition("transactions", 0), 1000L);
startupOffsets.put(new KafkaTopicPartition("transactions", 1), 2000L);
// Create advanced table source
Kafka08TableSource tableSource = new Kafka08TableSource(
schema,
Optional.of("proc_time"), // processing time attribute
rowtimeDescriptors, // rowtime attributes
Optional.of(fieldMapping), // field mapping
"transactions", // topic
properties, // Kafka properties
deserializer, // deserialization schema
StartupMode.SPECIFIC_OFFSETS, // startup mode
startupOffsets // specific offsets
);

Class: Kafka08TableSink
Package: org.apache.flink.streaming.connectors.kafka
Annotations: @Internal
Extends: KafkaTableSinkBase
Description: Kafka 0.8 table sink for writing Table API results to Kafka topics.
@Internal
public class Kafka08TableSink extends KafkaTableSinkBase

/**
* Creates a Kafka08TableSink for writing table data to Kafka.
*
* @param schema The table schema defining the structure of rows to be written
* @param topic The target Kafka topic name
* @param properties Kafka producer properties
* @param partitioner Optional custom partitioner for determining target partition
* @param serializationSchema Schema to serialize Row objects to byte arrays
*/
public Kafka08TableSink(
TableSchema schema,
String topic,
Properties properties,
Optional<FlinkKafkaPartitioner<Row>> partitioner,
SerializationSchema<Row> serializationSchema)

/**
* Creates the underlying FlinkKafkaProducer for this table sink.
*
* @param topic The Kafka topic name
* @param properties Kafka producer properties
* @param serializationSchema The serialization schema for Row objects
* @param partitioner Optional custom partitioner
* @return FlinkKafkaProducerBase instance configured for Kafka 0.8
*/
protected FlinkKafkaProducerBase<Row> createKafkaProducer(
String topic,
Properties properties,
SerializationSchema<Row> serializationSchema,
Optional<FlinkKafkaPartitioner<Row>> partitioner)
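
As with the source, the base class defers producer construction to this hook. A plausible sketch, assuming the FlinkKafkaProducer08 constructor that accepts a topic, serialization schema, properties, and partitioner (not taken verbatim from the connector source):

@Override
protected FlinkKafkaProducerBase<Row> createKafkaProducer(
        String topic,
        Properties properties,
        SerializationSchema<Row> serializationSchema,
        Optional<FlinkKafkaPartitioner<Row>> partitioner) {
    // A null partitioner falls back to default Kafka partitioning (assumption for this sketch)
    return new FlinkKafkaProducer08<>(
            topic, serializationSchema, properties, partitioner.orElse(null));
}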

Example: basic table sink configuration
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSink;
// Define output table schema
TableSchema outputSchema = TableSchema.builder()
.field("result_id", Types.STRING())
.field("computed_value", Types.DOUBLE())
.field("processing_time", Types.SQL_TIMESTAMP())
.build();
// Kafka producer properties
Properties producerProps = new Properties();
producerProps.setProperty("metadata.broker.list", "localhost:9092");
// JSON serialization schema
SerializationSchema<Row> serializer = new JsonRowSerializationSchema.Builder(outputSchema)
.build();
// Create table sink
Kafka08TableSink tableSink = new Kafka08TableSink(
outputSchema,
"results-topic",
producerProps,
Optional.empty(), // No custom partitioner
serializer
);
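
After construction, the sink is registered and fed from a query result. A brief sketch, assuming an existing table environment tEnv and a previously computed Table named resultTable:

// tEnv and resultTable are assumed to exist; insertInto emits the table into the registered sink
tEnv.registerTableSink("Results", tableSink);
resultTable.insertInto("Results");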

Example: table sink with a custom partitioner
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
// Custom partitioner based on result type
FlinkKafkaPartitioner<Row> partitioner = new FlinkKafkaPartitioner<Row>() {
@Override
public int partition(Row record, byte[] key, byte[] value,
String targetTopic, int[] partitions) {
// Partition based on the first field (result_id)
String resultId = (String) record.getField(0);
// Mask the sign bit so the index is non-negative, then map to one of the actual partition ids
return partitions[(resultId.hashCode() & Integer.MAX_VALUE) % partitions.length];
}
};
Kafka08TableSink tableSink = new Kafka08TableSink(
outputSchema,
"partitioned-results",
producerProps,
Optional.of(partitioner), // Custom partitioner
serializer
);

Class: Kafka08TableSourceSinkFactory
Package: org.apache.flink.streaming.connectors.kafka
Extends: KafkaTableSourceSinkFactoryBase
Description: Factory for creating configured instances of Kafka08TableSource and Kafka08TableSink from descriptors.
public class Kafka08TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase

/**
* Returns the Kafka version string for this factory.
*
* @return "0.8" indicating Kafka version 0.8 support
*/
protected String kafkaVersion()
/**
* Indicates whether this Kafka version supports timestamps.
* Kafka 0.8 does not support message timestamps.
*
* @return false, as Kafka 0.8 doesn't support timestamps
*/
protected boolean supportsKafkaTimestamps()
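
Both version hooks are trivial; based on the return values documented here, a sketch of how the overrides plausibly look:

@Override
protected String kafkaVersion() {
    return "0.8"; // matched against the version declared in connector descriptors
}

@Override
protected boolean supportsKafkaTimestamps() {
    return false; // Kafka 0.8 messages carry no timestamps
}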
/**
* Creates a KafkaTableSource instance with the provided configuration.
*
* @param schema The table schema
* @param topic The Kafka topic name
* @param properties Kafka consumer properties
* @param deserializationSchema Row deserialization schema
* @param startupMode Consumer startup mode
* @param specificStartupOffsets Specific startup offsets (if applicable)
* @param proctimeAttribute Processing time attribute name (optional)
* @param rowtimeAttributeDescriptors Rowtime attribute descriptors
* @param fieldMapping Field mapping configuration (optional)
* @return Configured Kafka08TableSource instance
*/
protected KafkaTableSourceBase createKafkaTableSource(
TableSchema schema,
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema,
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets,
Optional<String> proctimeAttribute,
List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
Optional<Map<String, String>> fieldMapping)
/**
* Creates a KafkaTableSink instance with the provided configuration.
*
* @param schema The table schema
* @param topic The Kafka topic name
* @param properties Kafka producer properties
* @param partitioner Optional partitioner
* @param serializationSchema Row serialization schema
* @return Configured Kafka08TableSink instance
*/
protected KafkaTableSinkBase createKafkaTableSink(
TableSchema schema,
String topic,
Properties properties,
Optional<FlinkKafkaPartitioner<Row>> partitioner,
SerializationSchema<Row> serializationSchema)

Example: creating a source and sink through the factory
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceSinkFactory;
Kafka08TableSourceSinkFactory factory = new Kafka08TableSourceSinkFactory();
// Create table source via factory
KafkaTableSourceBase source = factory.createKafkaTableSource(
schema, // Table schema
"input-topic", // Topic name
consumerProperties, // Kafka properties
deserializationSchema, // Deserialization schema
StartupMode.EARLIEST, // Startup mode
Collections.emptyMap(), // No specific offsets
Optional.of("proc_time"), // Processing time attribute
Collections.emptyList(), // No rowtime attributes
Optional.empty() // No field mapping
);
// Create table sink via factory
KafkaTableSinkBase sink = factory.createKafkaTableSink(
outputSchema, // Output table schema
"output-topic", // Topic name
producerProperties, // Kafka properties
Optional.empty(), // No custom partitioner
serializationSchema // Serialization schema
);
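
Note that in descriptor-based setups (see the examples below) the factory is normally not instantiated by hand: Flink discovers TableFactory implementations through Java's ServiceLoader, so the connector jar ships a provider-configuration file, assumed here to be the standard META-INF/services/org.apache.flink.table.factories.TableFactory, containing the line:
org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceSinkFactory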

Example: end-to-end Table API pipeline
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Table;
// Setup Table API environment
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
// Register Kafka source table
TableSchema sourceSchema = TableSchema.builder()
.field("user_id", Types.LONG())
.field("product_id", Types.STRING())
.field("quantity", Types.INT())
.field("price", Types.DECIMAL())
.field("order_time", Types.SQL_TIMESTAMP())
.field("proc_time", Types.SQL_TIMESTAMP())
.build();
Properties sourceProps = new Properties();
sourceProps.setProperty("zookeeper.connect", "localhost:2181");
sourceProps.setProperty("group.id", "analytics-group");
Kafka08TableSource source = new Kafka08TableSource(
sourceSchema,
Optional.of("proc_time"), // declare proc_time as a processing time attribute so TUMBLE(proc_time, ...) below is valid
Collections.emptyList(), // no rowtime attributes in this example
Optional.empty(), // no field mapping
"orders",
sourceProps,
new JsonRowDeserializationSchema.Builder(sourceSchema).build(),
StartupMode.GROUP_OFFSETS, // resume from the committed consumer group offsets
Collections.emptyMap() // no specific startup offsets
);
tEnv.registerTableSource("Orders", source);
// Register Kafka sink table
TableSchema sinkSchema = TableSchema.builder()
.field("product_id", Types.STRING())
.field("total_quantity", Types.LONG())
.field("total_revenue", Types.DECIMAL())
.field("window_start", Types.SQL_TIMESTAMP())
.field("window_end", Types.SQL_TIMESTAMP())
.build();
Properties sinkProps = new Properties();
sinkProps.setProperty("metadata.broker.list", "localhost:9092");
Kafka08TableSink sink = new Kafka08TableSink(
sinkSchema,
"product-analytics",
sinkProps,
Optional.empty(),
new JsonRowSerializationSchema.Builder(sinkSchema).build()
);
tEnv.registerTableSink("ProductAnalytics", sink);
// Execute SQL query
String sql =
"INSERT INTO ProductAnalytics " +
"SELECT " +
"  product_id, " +
"  SUM(quantity) AS total_quantity, " +
"  SUM(quantity * price) AS total_revenue, " +
"  TUMBLE_START(proc_time, INTERVAL '1' HOUR) AS window_start, " +
"  TUMBLE_END(proc_time, INTERVAL '1' HOUR) AS window_end " +
"FROM Orders " +
"GROUP BY " +
"  product_id, " +
"  TUMBLE(proc_time, INTERVAL '1' HOUR)";
tEnv.executeSql(sql);

Example: registering tables with connector descriptors
import org.apache.flink.table.descriptors.*;
// Register source using descriptors (alternative approach)
tEnv.connect(
new Kafka()
.version("0.8")
.topic("user-events")
.property("zookeeper.connect", "localhost:2181")
.property("group.id", "event-processors")
)
.withFormat(
new Json()
.failOnMissingField(false)
.deriveSchema()
)
.withSchema(
new Schema()
.field("event_id", Types.STRING())
.field("user_id", Types.LONG())
.field("event_type", Types.STRING())
.field("event_time", Types.SQL_TIMESTAMP())
.field("proc_time", Types.SQL_TIMESTAMP()).proctime()
)
.createTemporaryTable("UserEvents");
// Register sink using descriptors
tEnv.connect(
new Kafka()
.version("0.8")
.topic("processed-events")
.property("metadata.broker.list", "localhost:9092")
)
.withFormat(
new Json()
)
.withSchema(
new Schema()
.field("user_id", Types.LONG())
.field("event_count", Types.LONG())
.field("processing_time", Types.SQL_TIMESTAMP())
)
.createTemporaryTable("ProcessedEvents");
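
With both temporary tables registered, a continuous query can connect them. A sketch of a possible aggregation; the one-minute window and the query itself are illustrative and not part of the original reference:

tEnv.executeSql(
    "INSERT INTO ProcessedEvents " +
    "SELECT user_id, COUNT(*) AS event_count, " +
    "       TUMBLE_END(proc_time, INTERVAL '1' MINUTE) AS processing_time " +
    "FROM UserEvents " +
    "GROUP BY user_id, TUMBLE(proc_time, INTERVAL '1' MINUTE)");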

Example: JSON row serialization and deserialization schemas
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
// JSON deserialization for table source
JsonRowDeserializationSchema.Builder deserBuilder =
new JsonRowDeserializationSchema.Builder(sourceSchema);
DeserializationSchema<Row> deserializer = deserBuilder
.ignoreParseErrors() // Skip malformed JSON
.build();
// JSON serialization for table sink
JsonRowSerializationSchema.Builder serBuilder =
new JsonRowSerializationSchema.Builder(sinkSchema);
SerializationSchema<Row> serializer = serBuilder.build();

Example: custom CSV deserialization schema
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.typeinfo.TypeInformation;
// Custom deserialization schema that parses delimited text lines into Rows
public class CsvRowDeserializationSchema implements DeserializationSchema<Row> {
private final TableSchema schema;
private final String delimiter;
public CsvRowDeserializationSchema(TableSchema schema, String delimiter) {
this.schema = schema;
this.delimiter = delimiter;
}
@Override
public Row deserialize(byte[] message) throws IOException {
String line = new String(message, StandardCharsets.UTF_8);
String[] fields = line.split(delimiter);
Row row = new Row(schema.getFieldCount());
for (int i = 0; i < fields.length && i < schema.getFieldCount(); i++) {
row.setField(i, convertField(fields[i], schema.getFieldTypes()[i]));
}
return row;
}
@Override
public boolean isEndOfStream(Row nextElement) {
return false;
}
@Override
public TypeInformation<Row> getProducedType() {
return schema.toRowType();
}
private Object convertField(String field, TypeInformation<?> type) {
// Type conversion logic based on schema
if (type == Types.STRING()) return field;
if (type == Types.LONG()) return Long.parseLong(field);
if (type == Types.INT()) return Integer.parseInt(field);
// ... other type conversions
return field;
}
}
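
The custom schema plugs into the table source the same way as the JSON schema. A short usage sketch, reusing the schema and properties from the pipeline example above and a hypothetical comma-delimited topic:

// csv-orders is a hypothetical topic; sourceSchema and sourceProps are defined in the pipeline example
Kafka08TableSource csvSource = new Kafka08TableSource(
    sourceSchema,
    "csv-orders",
    sourceProps,
    new CsvRowDeserializationSchema(sourceSchema, ","));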

Note: Like the table source and sink classes themselves, these extension points are marked @Internal and may change between versions.

Example: recommended Kafka 0.8 properties for reliable pipelines
// Source configuration for reliability
Properties sourceProps = new Properties();
sourceProps.setProperty("zookeeper.connect", "zk1:2181,zk2:2181,zk3:2181");
sourceProps.setProperty("group.id", "table-api-consumer");
sourceProps.setProperty("auto.offset.reset", "earliest");
sourceProps.setProperty("auto.commit.enable", "false"); // Managed by Flink
// Sink configuration for reliability
Properties sinkProps = new Properties();
sinkProps.setProperty("metadata.broker.list", "broker1:9092,broker2:9092");
sinkProps.setProperty("request.required.acks", "1");
sinkProps.setProperty("message.send.max.retries", "3");
// Enable checkpointing for exactly-once processing (source side only)
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

For production Table API usage, consider upgrading to the Kafka 0.9+ connectors, which provide capabilities the 0.8 connector lacks, such as Kafka message timestamp support and a consumer that no longer depends directly on ZooKeeper.