Apache Flink connector for integrating with Apache Kafka 0.8.x message broker systems
Comprehensive API reference for Table API integration classes in the Apache Flink Kafka Connector 0.8. These classes enable declarative SQL and Table API access to Kafka topics.
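Important Note: Kafka08TableSource and Kafka08TableSink are marked @Internal, indicating they are not part of the stable public API and may change between versions. Kafka08TableSourceSinkFactory carries no such annotation, but it is normally used indirectly, by creating tables from descriptors.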
Package: org.apache.flink.streaming.connectors.kafka
Annotations: @Internal
Extends: KafkaTableSourceBase
Description: Kafka StreamTableSource for Kafka 0.8, providing Table API access to Kafka topics as streaming tables.
@Internal
public class Kafka08TableSource extends KafkaTableSourceBase

/**
 * Creates a Kafka08TableSource with full configuration options.
 *
 * @param schema The table schema defining column names and types
 * @param proctimeAttribute Optional processing time attribute name
 * @param rowtimeAttributeDescriptors List of rowtime attribute descriptors for event time
 * @param fieldMapping Optional mapping from table fields to Kafka message fields
 * @param topic The Kafka topic name to read from
 * @param properties Kafka consumer properties
 * @param deserializationSchema Schema to deserialize Kafka messages to Row objects
 * @param startupMode How the consumer should start reading (EARLIEST, LATEST, GROUP_OFFSETS, SPECIFIC_OFFSETS)
 * @param specificStartupOffsets Map of partition to offset for SPECIFIC_OFFSETS startup mode
 */
public Kafka08TableSource(
    TableSchema schema,
    Optional<String> proctimeAttribute,
    List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
    Optional<Map<String, String>> fieldMapping,
    String topic,
    Properties properties,
    DeserializationSchema<Row> deserializationSchema,
    StartupMode startupMode,
    Map<KafkaTopicPartition, Long> specificStartupOffsets)

/**
 * Creates a basic Kafka08TableSource with minimal configuration.
 * This variant cannot declare processing-time/rowtime attributes or a field mapping;
 * use the full constructor for those.
 *
 * @param schema The table schema defining column names and types
 * @param topic The Kafka topic name to read from
 * @param properties Kafka consumer properties
 * @param deserializationSchema Schema to deserialize Kafka messages to Row objects
 */
public Kafka08TableSource(
    TableSchema schema,
    String topic,
    Properties properties,
    DeserializationSchema<Row> deserializationSchema)

/**
 * Creates the underlying FlinkKafkaConsumer for this table source.
 *
 * @param topic The Kafka topic name
 * @param properties Kafka consumer properties
 * @param deserializationSchema The deserialization schema for Row objects
 * @return FlinkKafkaConsumerBase instance configured for Kafka 0.8
 */
protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(
    String topic,
    Properties properties,
    DeserializationSchema<Row> deserializationSchema)

import java.util.Properties;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSource;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.types.Row;
// Define table schema
TableSchema schema = TableSchema.builder()
    .field("user_id", Types.LONG())
    .field("user_name", Types.STRING())
    .field("user_email", Types.STRING())
    .field("registration_time", Types.SQL_TIMESTAMP())
    .build();
// Kafka properties: the Kafka 0.8 consumer needs bootstrap.servers (broker list used for
// partition discovery) as well as zookeeper.connect and group.id.
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "table-api-group");
// JSON deserialization schema. The builder expects the row TypeInformation,
// not the TableSchema itself, hence toRowType().
DeserializationSchema<Row> deserializer = new JsonRowDeserializationSchema.Builder(schema.toRowType())
    .build();
// Create table source
Kafka08TableSource tableSource = new Kafka08TableSource(
    schema,
    "users-topic",
    properties,
    deserializer
);

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;
// Schema with processing time and event time
TableSchema schema = TableSchema.builder()
    .field("transaction_id", Types.STRING())
    .field("amount", Types.DECIMAL())
    .field("event_time", Types.SQL_TIMESTAMP())
    .field("proc_time", Types.SQL_TIMESTAMP())
    .build();
// Physical layout of the JSON messages on the topic. The deserializer must produce these
// (Kafka-side) field names; fieldMapping below maps the table columns onto them.
TableSchema payloadSchema = TableSchema.builder()
    .field("txn_id", Types.STRING())
    .field("txn_amount", Types.DECIMAL())
    .field("timestamp", Types.SQL_TIMESTAMP())
    .build();
DeserializationSchema<Row> txnDeserializer =
    new JsonRowDeserializationSchema.Builder(payloadSchema.toRowType()).build();
// Define rowtime attribute for event time processing
List<RowtimeAttributeDescriptor> rowtimeDescriptors = Collections.singletonList(
    new RowtimeAttributeDescriptor(
        "event_time",                          // rowtime attribute name
        new ExistingField("event_time"),       // timestamp extractor: take it from an existing field
        new BoundedOutOfOrderTimestamps(5000L) // watermark strategy: tolerate 5000 ms out-of-orderness
    )
);
// Field mapping from table columns to Kafka message fields
Map<String, String> fieldMapping = new HashMap<>();
fieldMapping.put("transaction_id", "txn_id");
fieldMapping.put("amount", "txn_amount");
fieldMapping.put("event_time", "timestamp");
// Specific startup offsets
Map<KafkaTopicPartition, Long> startupOffsets = new HashMap<>();
startupOffsets.put(new KafkaTopicPartition("transactions", 0), 1000L);
startupOffsets.put(new KafkaTopicPartition("transactions", 1), 2000L);
// Create advanced table source (properties as defined in the basic example)
Kafka08TableSource tableSource = new Kafka08TableSource(
    schema,
    Optional.of("proc_time"),      // processing time attribute
    rowtimeDescriptors,            // rowtime attributes
    Optional.of(fieldMapping),     // field mapping
    "transactions",                // topic
    properties,                    // Kafka properties
    txnDeserializer,               // deserialization schema for the physical payload
    StartupMode.SPECIFIC_OFFSETS,  // startup mode
    startupOffsets                 // specific offsets
);
Package: org.apache.flink.streaming.connectors.kafka
Annotations: @Internal
Extends: KafkaTableSinkBase
Description: Kafka 0.8 table sink for writing Table API results to Kafka topics.
@Internal
public class Kafka08TableSink extends KafkaTableSinkBase

/**
 * Creates a Kafka08TableSink for writing table data to Kafka.
 *
 * @param schema The table schema defining the structure of rows to be written
 * @param topic The target Kafka topic name
 * @param properties Kafka producer properties
 * @param partitioner Optional custom partitioner for determining target partition
 * @param serializationSchema Schema to serialize Row objects to byte arrays
 */
public Kafka08TableSink(
    TableSchema schema,
    String topic,
    Properties properties,
    Optional<FlinkKafkaPartitioner<Row>> partitioner,
    SerializationSchema<Row> serializationSchema)

/**
 * Creates the underlying FlinkKafkaProducer for this table sink.
 *
 * @param topic The Kafka topic name
 * @param properties Kafka producer properties
 * @param serializationSchema The serialization schema for Row objects
 * @param partitioner Optional custom partitioner
 * @return FlinkKafkaProducerBase instance configured for Kafka 0.8
 */
protected FlinkKafkaProducerBase<Row> createKafkaProducer(
    String topic,
    Properties properties,
    SerializationSchema<Row> serializationSchema,
    Optional<FlinkKafkaPartitioner<Row>> partitioner)

import java.util.Optional;
import java.util.Properties;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.types.Row;
// Define output table schema
TableSchema outputSchema = TableSchema.builder()
    .field("result_id", Types.STRING())
    .field("computed_value", Types.DOUBLE())
    .field("processing_time", Types.SQL_TIMESTAMP())
    .build();
// Kafka producer properties. FlinkKafkaProducer08 is built on Kafka's Java producer client,
// which is configured with bootstrap.servers (the legacy metadata.broker.list key is not used).
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");
// JSON serialization schema; the builder takes the row TypeInformation, not the TableSchema
SerializationSchema<Row> serializer = new JsonRowSerializationSchema.Builder(outputSchema.toRowType())
    .build();
// Create table sink
Kafka08TableSink tableSink = new Kafka08TableSink(
    outputSchema,
    "results-topic",
    producerProps,
    Optional.empty(), // No custom partitioner
    serializer
);

import java.util.Objects;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
// Custom partitioner keyed on result_id (the first field of each row)
FlinkKafkaPartitioner<Row> partitioner = new FlinkKafkaPartitioner<Row>() {
    @Override
    public int partition(Row record, byte[] key, byte[] value,
                         String targetTopic, int[] partitions) {
        // Objects.hashCode maps a null result_id to 0 instead of throwing;
        // floorMod keeps the index in [0, partitions.length).
        int index = Math.floorMod(Objects.hashCode(record.getField(0)), partitions.length);
        // `partitions` holds the topic's partition IDs: return one of them, not the raw index.
        return partitions[index];
    }
};
Kafka08TableSink tableSink = new Kafka08TableSink(
    outputSchema,
    "partitioned-results",
    producerProps,
    Optional.of(partitioner), // Custom partitioner
    serializer
);
Package: org.apache.flink.streaming.connectors.kafka
Extends: KafkaTableSourceSinkFactoryBase
Description: Factory for creating configured instances of Kafka08TableSource and Kafka08TableSink from descriptors.
public class Kafka08TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase

/**
 * Returns the Kafka version string for this factory.
 *
 * @return "0.8" indicating Kafka version 0.8 support
 */
protected String kafkaVersion()

/**
 * Indicates whether this Kafka version supports timestamps.
 * Kafka 0.8 does not support message timestamps.
 *
 * @return false, as Kafka 0.8 doesn't support timestamps
 */
protected boolean supportsKafkaTimestamps()

/**
 * Creates a KafkaTableSource instance with the provided configuration.
 *
 * @param schema The table schema
 * @param topic The Kafka topic name
 * @param properties Kafka consumer properties
 * @param deserializationSchema Row deserialization schema
 * @param startupMode Consumer startup mode
 * @param specificStartupOffsets Specific startup offsets (if applicable)
 * @param proctimeAttribute Processing time attribute name (optional)
 * @param rowtimeAttributeDescriptors Rowtime attribute descriptors
 * @param fieldMapping Field mapping configuration (optional)
 * @return Configured Kafka08TableSource instance
 */
protected KafkaTableSourceBase createKafkaTableSource(
    TableSchema schema,
    String topic,
    Properties properties,
    DeserializationSchema<Row> deserializationSchema,
    StartupMode startupMode,
    Map<KafkaTopicPartition, Long> specificStartupOffsets,
    Optional<String> proctimeAttribute,
    List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
    Optional<Map<String, String>> fieldMapping)

/**
 * Creates a KafkaTableSink instance with the provided configuration.
 *
 * @param schema The table schema
 * @param topic The Kafka topic name
 * @param properties Kafka producer properties
 * @param partitioner Optional partitioner
 * @param serializationSchema Row serialization schema
 * @return Configured Kafka08TableSink instance
 */
protected KafkaTableSinkBase createKafkaTableSink(
    TableSchema schema,
    String topic,
    Properties properties,
    Optional<FlinkKafkaPartitioner<Row>> partitioner,
    SerializationSchema<Row> serializationSchema)

import java.util.Collections;
import java.util.Optional;
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceSinkFactory;
import org.apache.flink.streaming.connectors.kafka.KafkaTableSinkBase;
import org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
// NOTE: createKafkaTableSource/createKafkaTableSink are declared protected (see the signatures
// above), so the direct calls below only compile from code in the
// org.apache.flink.streaming.connectors.kafka package. Application code normally reaches this
// factory indirectly, by creating tables from descriptors.
// schema, consumerProperties, deserializationSchema, outputSchema, producerProperties and
// serializationSchema are assumed to be defined as in the earlier examples.
Kafka08TableSourceSinkFactory factory = new Kafka08TableSourceSinkFactory();
// Create table source via factory
KafkaTableSourceBase source = factory.createKafkaTableSource(
    schema,                    // Table schema
    "input-topic",             // Topic name
    consumerProperties,        // Kafka properties
    deserializationSchema,     // Deserialization schema
    StartupMode.EARLIEST,      // Startup mode
    Collections.emptyMap(),    // No specific offsets
    Optional.of("proc_time"),  // Processing time attribute
    Collections.emptyList(),   // No rowtime attributes
    Optional.empty()           // No field mapping
);
// Create table sink via factory
KafkaTableSinkBase sink = factory.createKafkaTableSink(
    outputSchema,              // Output table schema
    "output-topic",            // Topic name
    producerProperties,        // Kafka properties
    Optional.empty(),          // No custom partitioner
    serializationSchema        // Serialization schema
);

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSink;
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSource;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
// Setup Table API environment
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .inStreamingMode()
    .build();
TableEnvironment tEnv = TableEnvironment.create(settings);
// Register Kafka source table
TableSchema sourceSchema = TableSchema.builder()
    .field("user_id", Types.LONG())
    .field("product_id", Types.STRING())
    .field("quantity", Types.INT())
    .field("price", Types.DECIMAL())
    .field("order_time", Types.SQL_TIMESTAMP())
    .field("proc_time", Types.SQL_TIMESTAMP())
    .build();
// Fields actually carried in the JSON messages; proc_time is supplied by Flink, not read from Kafka.
TableSchema orderPayloadSchema = TableSchema.builder()
    .field("user_id", Types.LONG())
    .field("product_id", Types.STRING())
    .field("quantity", Types.INT())
    .field("price", Types.DECIMAL())
    .field("order_time", Types.SQL_TIMESTAMP())
    .build();
Properties sourceProps = new Properties();
sourceProps.setProperty("bootstrap.servers", "localhost:9092");
sourceProps.setProperty("zookeeper.connect", "localhost:2181");
sourceProps.setProperty("group.id", "analytics-group");
// The full constructor is required to declare proc_time as a processing-time attribute:
// the TUMBLE window in the query below only accepts a time attribute, and the
// 4-argument constructor has no way to declare one.
Kafka08TableSource source = new Kafka08TableSource(
    sourceSchema,
    Optional.of("proc_time"),   // processing time attribute
    Collections.emptyList(),    // no rowtime attributes
    Optional.empty(),           // no field mapping
    "orders",
    sourceProps,
    new JsonRowDeserializationSchema.Builder(orderPayloadSchema.toRowType()).build(),
    StartupMode.GROUP_OFFSETS,
    Collections.emptyMap()      // no specific offsets
);
tEnv.registerTableSource("Orders", source);
// Register Kafka sink table
TableSchema sinkSchema = TableSchema.builder()
    .field("product_id", Types.STRING())
    .field("total_quantity", Types.LONG())
    .field("total_revenue", Types.DECIMAL())
    .field("window_start", Types.SQL_TIMESTAMP())
    .field("window_end", Types.SQL_TIMESTAMP())
    .build();
Properties sinkProps = new Properties();
// The Kafka 0.8 producer wrapper uses Kafka's Java producer client, configured via bootstrap.servers.
sinkProps.setProperty("bootstrap.servers", "localhost:9092");
Kafka08TableSink sink = new Kafka08TableSink(
    sinkSchema,
    "product-analytics",
    sinkProps,
    Optional.empty(),
    new JsonRowSerializationSchema.Builder(sinkSchema.toRowType()).build()
);
tEnv.registerTableSink("ProductAnalytics", sink);
// Build the SQL query. Text blocks ("""...""") need Java 15+, which the Flink releases that
// still include the Kafka 0.8 connector do not target, so plain concatenation is used.
// SUM over the INT quantity column yields INT; cast it to match the sink's LONG column.
String sql =
    "INSERT INTO ProductAnalytics "
        + "SELECT "
        + "  product_id, "
        + "  CAST(SUM(quantity) AS BIGINT) AS total_quantity, "
        + "  SUM(quantity * price) AS total_revenue, "
        + "  TUMBLE_START(proc_time, INTERVAL '1' HOUR) AS window_start, "
        + "  TUMBLE_END(proc_time, INTERVAL '1' HOUR) AS window_end "
        + "FROM Orders "
        + "GROUP BY "
        + "  product_id, "
        + "  TUMBLE(proc_time, INTERVAL '1' HOUR)";
// executeSql() only exists from Flink 1.11, the release that dropped the Kafka 0.8 connector.
// Earlier versions buffer the INSERT with sqlUpdate() and submit the job with execute().
tEnv.sqlUpdate(sql);
tEnv.execute("product-analytics");

import org.apache.flink.table.descriptors.*;
// Register source using descriptors (alternative approach).
// The Kafka 0.8 consumer needs bootstrap.servers in addition to zookeeper.connect and group.id.
tEnv.connect(
        new Kafka()
            .version("0.8")
            .topic("user-events")
            .property("bootstrap.servers", "localhost:9092")
            .property("zookeeper.connect", "localhost:2181")
            .property("group.id", "event-processors")
    )
    .withFormat(
        new Json()
            .failOnMissingField(false)
            .deriveSchema()
    )
    .withSchema(
        new Schema()
            .field("event_id", Types.STRING())
            .field("user_id", Types.LONG())
            .field("event_type", Types.STRING())
            .field("event_time", Types.SQL_TIMESTAMP())
            .field("proc_time", Types.SQL_TIMESTAMP()).proctime()
    )
    .createTemporaryTable("UserEvents");
// Register sink using descriptors.
// The producer is configured with bootstrap.servers (not the legacy metadata.broker.list key).
tEnv.connect(
        new Kafka()
            .version("0.8")
            .topic("processed-events")
            .property("bootstrap.servers", "localhost:9092")
    )
    .withFormat(
        new Json()
    )
    .withSchema(
        new Schema()
            .field("user_id", Types.LONG())
            .field("event_count", Types.LONG())
            .field("processing_time", Types.SQL_TIMESTAMP())
    )
    .createTemporaryTable("ProcessedEvents");

import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
// JSON deserialization for table source.
// Both builders take the row TypeInformation, so convert each TableSchema with toRowType().
JsonRowDeserializationSchema.Builder deserBuilder =
    new JsonRowDeserializationSchema.Builder(sourceSchema.toRowType());
DeserializationSchema<Row> deserializer = deserBuilder
    .ignoreParseErrors() // Skip malformed JSON instead of throwing
    .build();
// JSON serialization for table sink
JsonRowSerializationSchema.Builder serBuilder =
    new JsonRowSerializationSchema.Builder(sinkSchema.toRowType());
SerializationSchema<Row> serializer = serBuilder.build();

// Custom deserialization schema
/**
 * Example {@link DeserializationSchema} that parses delimiter-separated text messages into
 * {@link Row}s laid out according to a {@link TableSchema}.
 *
 * <p>Messages are decoded as UTF-8 and the delimiter is matched literally (not as a regular
 * expression). Columns missing from a message stay {@code null}; surplus columns are ignored.
 * Empty LONG/INT fields become {@code null} instead of failing the parse.
 */
public class CsvRowDeserializationSchema implements DeserializationSchema<Row> {

    private final TableSchema schema;
    private final String delimiter;

    /**
     * @param schema    table schema giving the column order and types of each message
     * @param delimiter literal field separator, e.g. "," or "|"
     */
    public CsvRowDeserializationSchema(TableSchema schema, String delimiter) {
        this.schema = schema;
        this.delimiter = delimiter;
    }

    @Override
    public Row deserialize(byte[] message) throws IOException {
        if (message == null) {
            // Null payload: return null instead of failing with a NullPointerException.
            return null;
        }
        // Decode explicitly; new String(byte[]) would use the JVM's platform charset.
        String line = new String(message, java.nio.charset.StandardCharsets.UTF_8);
        // String.split takes a regex, so quote the delimiter ("|" or "." would otherwise misparse).
        // Limit -1 keeps trailing empty fields instead of silently dropping them.
        String[] fields = line.split(java.util.regex.Pattern.quote(delimiter), -1);
        int arity = schema.getFieldCount();
        TypeInformation<?>[] fieldTypes = schema.getFieldTypes(); // hoisted out of the loop
        Row row = new Row(arity);
        for (int i = 0; i < Math.min(fields.length, arity); i++) {
            row.setField(i, convertField(fields[i], fieldTypes[i]));
        }
        return row;
    }

    @Override
    public boolean isEndOfStream(Row nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        return schema.toRowType();
    }

    /** Converts one text field to the Java value expected for {@code type}. */
    private Object convertField(String field, TypeInformation<?> type) {
        // equals() rather than ==, so equivalent TypeInformation instances also match.
        if (Types.STRING().equals(type)) {
            return field;
        }
        if (Types.LONG().equals(type)) {
            return field.isEmpty() ? null : Long.valueOf(field);
        }
        if (Types.INT().equals(type)) {
            return field.isEmpty() ? null : Integer.valueOf(field);
        }
        // ... other type conversions
        return field;
    }
}

Note: Kafka08TableSource and Kafka08TableSink are @Internal and may change between versions.
// Source configuration for reliability
Properties sourceProps = new Properties();
// The Kafka 0.8 consumer needs the broker list (bootstrap.servers) as well as ZooKeeper.
sourceProps.setProperty("bootstrap.servers", "broker1:9092,broker2:9092");
sourceProps.setProperty("zookeeper.connect", "zk1:2181,zk2:2181,zk3:2181");
sourceProps.setProperty("group.id", "table-api-consumer");
sourceProps.setProperty("auto.offset.reset", "earliest");
sourceProps.setProperty("auto.commit.enable", "false"); // Managed by Flink
// Sink configuration for reliability.
// The Kafka 0.8 producer wrapper is built on Kafka's Java producer client, so it takes the
// client's config keys (bootstrap.servers / acks / retries), not the legacy
// metadata.broker.list / request.required.acks / message.send.max.retries keys.
Properties sinkProps = new Properties();
sinkProps.setProperty("bootstrap.servers", "broker1:9092,broker2:9092");
sinkProps.setProperty("acks", "1");
sinkProps.setProperty("retries", "3");
// Enable checkpointing for exactly-once processing (source side only; the Kafka 0.8 producer
// gives no exactly-once delivery). Assumes `env` is the StreamExecutionEnvironment the table
// environment runs on.
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

For production Table API usage, consider upgrading to Kafka 0.9+ connectors that provide:
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-kafka-0-8-2-11