Table API and SQL integration for declarative stream processing with Kafka 0.9 sources and sinks, provided through factory-based configuration and schema-aware record handling.
Factory class for creating Kafka 0.9 table sources and sinks in the Table API ecosystem.
/**
* Factory for creating configured instances of Kafka 0.9 table sources and sinks.
* Extends the base Kafka table factory with version-specific implementations.
*/
public class Kafka09TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {
/**
* Returns the Kafka version identifier for this factory.
*
* @return "0.9" as the version string
*/
@Override
protected String kafkaVersion();
/**
* Indicates whether this version supports Kafka timestamps.
*
* @return false, as Kafka 0.9 does not support message timestamps
*/
@Override
protected boolean supportsKafkaTimestamps();
/**
* Creates a Kafka 0.9 table source with the provided configuration.
*
* @param schema Schema of the produced table
* @param proctimeAttribute Field name of the processing time attribute
* @param rowtimeAttributeDescriptors Descriptors for rowtime attributes
* @param fieldMapping Mapping of table schema fields to fields of the physical returned type
* @param topic Kafka topic to consume
* @param properties Properties for the Kafka consumer
* @param deserializationSchema Deserialization schema for decoding records from Kafka
* @param startupMode Startup mode for the contained consumer
* @param specificStartupOffsets Specific startup offsets (when using SPECIFIC_OFFSETS mode)
* @return Configured Kafka09TableSource instance
*/
@Override
protected KafkaTableSourceBase createKafkaTableSource(
TableSchema schema,
Optional<String> proctimeAttribute,
List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
Map<String, String> fieldMapping,
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema,
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets);
/**
* Creates a Kafka 0.9 table sink with the provided configuration.
*
* @param schema Schema of the table to be written
* @param topic Target Kafka topic
* @param properties Properties for the Kafka producer
* @param partitioner Optional custom partitioner for message distribution
* @param serializationSchema Serialization schema for encoding records to Kafka
* @return Configured Kafka09TableSink instance
*/
@Override
protected KafkaTableSinkBase createKafkaTableSink(
TableSchema schema,
String topic,
Properties properties,
Optional<FlinkKafkaPartitioner<Row>> partitioner,
SerializationSchema<Row> serializationSchema);
}
Table source implementation for consuming Kafka 0.9 data in Table API queries.
/**
* Kafka table source for Kafka 0.9 - internal implementation.
* Provides streaming table source capabilities for Table API and SQL.
*/
@Internal
public class Kafka09TableSource extends KafkaTableSourceBase {
/**
* Creates a Kafka 0.9 table source with full configuration options.
*
* @param schema Schema of the produced table
* @param proctimeAttribute Field name of the processing time attribute
* @param rowtimeAttributeDescriptors Descriptors for rowtime attributes
* @param fieldMapping Optional mapping for table schema fields to physical type fields
* @param topic Kafka topic to consume
* @param properties Properties for the Kafka consumer
* @param deserializationSchema Deserialization schema for decoding records
* @param startupMode Startup mode for the consumer
* @param specificStartupOffsets Specific startup offsets for SPECIFIC_OFFSETS mode
*/
public Kafka09TableSource(
TableSchema schema,
Optional<String> proctimeAttribute,
List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
Optional<Map<String, String>> fieldMapping,
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema,
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets);
/**
* Creates a simple Kafka 0.9 table source with basic configuration.
*
* @param schema Schema of the produced table
* @param topic Kafka topic to consume
* @param properties Properties for the Kafka consumer
* @param deserializationSchema Deserialization schema for decoding records
*/
public Kafka09TableSource(
TableSchema schema,
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema);
}
Table sink implementation for writing to Kafka 0.9 topics from Table API queries.
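For comparison with the DDL-based sink shown later, the sketch below constructs the sink programmatically using the constructor documented in the class skeleton that follows. It is a minimal, illustrative sketch: the FlinkFixedPartitioner choice and the toString-based serializer are assumptions made to keep the snippet self-contained, and a real job would typically plug in a proper format serializer such as JSON.
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.Kafka09TableSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.Properties;
// Schema of the rows written to Kafka
TableSchema sinkSchema = TableSchema.builder()
    .field("user_id", DataTypes.STRING())
    .field("event_count", DataTypes.BIGINT())
    .build();
// Producer configuration
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");
// Placeholder serializer (illustrative only): encodes the row's toString() as UTF-8 bytes
SerializationSchema<Row> serializer =
    row -> row.toString().getBytes(StandardCharsets.UTF_8);
// Build the sink; the partitioner argument is optional
Kafka09TableSink kafkaSink = new Kafka09TableSink(
    sinkSchema,
    "processed-events",
    producerProps,
    Optional.of(new FlinkFixedPartitioner<Row>()),
    serializer
);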
/**
* Kafka table sink for Kafka 0.9 - internal implementation.
* Provides streaming table sink capabilities for Table API and SQL.
*/
@Internal
public class Kafka09TableSink extends KafkaTableSinkBase {
/**
* Creates a Kafka 0.9 table sink with the provided configuration.
*
* @param schema Schema of the table to be written
* @param topic Target Kafka topic
* @param properties Properties for the Kafka producer
* @param partitioner Optional custom partitioner for message distribution
* @param serializationSchema Serialization schema for encoding records
*/
public Kafka09TableSink(
TableSchema schema,
String topic,
Properties properties,
Optional<FlinkKafkaPartitioner<Row>> partitioner,
SerializationSchema<Row> serializationSchema);
}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Table;
// Create table environment
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// Create Kafka source table using DDL
tableEnv.executeSql(
"CREATE TABLE kafka_source (" +
" user_id STRING," +
" event_time TIMESTAMP(3)," +
" event_type STRING," +
" WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
") WITH (" +
" 'connector' = 'kafka-0.9'," +
" 'topic' = 'user-events'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'properties.group.id' = 'table-consumer-group'," +
" 'format' = 'json'" +
")"
);
// Query the Kafka source
Table result = tableEnv.sqlQuery(
"SELECT user_id, COUNT(*) as event_count " +
"FROM kafka_source " +
"WHERE event_type = 'login' " +
"GROUP BY user_id"
);
// Create Kafka sink table
tableEnv.executeSql(
"CREATE TABLE kafka_sink (" +
" user_id STRING," +
" event_count BIGINT" +
") WITH (" +
" 'connector' = 'kafka-0.9'," +
" 'topic' = 'processed-events'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'format' = 'json'" +
")"
);
// Insert query results into Kafka
result.executeInsert("kafka_sink");
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.types.Row;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.streaming.connectors.kafka.Kafka09TableSource;
import java.util.Properties;
// Define table schema
TableSchema schema = TableSchema.builder()
.field("user_id", DataTypes.STRING())
.field("timestamp", DataTypes.TIMESTAMP(3))
.field("action", DataTypes.STRING())
.field("value", DataTypes.DOUBLE())
.build();
// Configure Kafka properties
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "programmatic-consumer");
// Create JSON deserializer
DeserializationSchema<Row> deserializer = new JsonRowDeserializationSchema.Builder(
schema.toRowType()
).build();
// Create Kafka table source
Kafka09TableSource kafkaSource = new Kafka09TableSource(
schema,
"user-actions",
properties,
deserializer
);
// Register as table
tableEnv.registerTableSource("user_actions", kafkaSource);
// Use in SQL
Table queryResult = tableEnv.sqlQuery(
"SELECT user_id, SUM(value) as total_value " +
"FROM user_actions " +
"WHERE action = 'purchase' " +
"GROUP BY user_id"
);
// Multiple Kafka sources and sinks
tableEnv.executeSql(
"CREATE TABLE orders (" +
" order_id STRING," +
" customer_id STRING," +
" product_id STRING," +
" quantity INT," +
" price DECIMAL(10,2)," +
" order_time TIMESTAMP(3)," +
" WATERMARK FOR order_time AS order_time - INTERVAL '1' MINUTE" +
") WITH (" +
" 'connector' = 'kafka-0.9'," +
" 'topic' = 'orders'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'properties.group.id' = 'order-processor'," +
" 'format' = 'json'" +
")"
);
tableEnv.executeSql(
"CREATE TABLE order_summary (" +
" window_start TIMESTAMP(3)," +
" window_end TIMESTAMP(3)," +
" total_orders BIGINT," +
" total_revenue DECIMAL(12,2)," +
" avg_order_value DECIMAL(10,2)" +
") WITH (" +
" 'connector' = 'kafka-0.9'," +
" 'topic' = 'order-summary'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'format' = 'json'" +
")"
);
// Windowed aggregation query
tableEnv.executeSql(
"INSERT INTO order_summary " +
"SELECT " +
" TUMBLE_START(order_time, INTERVAL '1' HOUR) as window_start," +
" TUMBLE_END(order_time, INTERVAL '1' HOUR) as window_end," +
" COUNT(*) as total_orders," +
" SUM(quantity * price) as total_revenue," +
" AVG(quantity * price) as avg_order_value " +
"FROM orders " +
"GROUP BY TUMBLE(order_time, INTERVAL '1' HOUR)"
);
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
// Custom CSV-like format deserializer
DeserializationSchema<Row> csvDeserializer = new DeserializationSchema<Row>() {
@Override
public Row deserialize(byte[] message) throws IOException {
String line = new String(message, StandardCharsets.UTF_8);
String[] fields = line.split(",");
Row row = new Row(3);
row.setField(0, fields[0]); // id
row.setField(1, fields[1]); // name
row.setField(2, Double.parseDouble(fields[2])); // value
return row;
}
@Override
public boolean isEndOfStream(Row nextElement) {
return false;
}
@Override
public TypeInformation<Row> getProducedType() {
return Types.ROW(Types.STRING, Types.STRING, Types.DOUBLE);
}
};
TableSchema csvSchema = TableSchema.builder()
.field("id", DataTypes.STRING())
.field("name", DataTypes.STRING())
.field("value", DataTypes.DOUBLE())
.build();
Kafka09TableSource csvSource = new Kafka09TableSource(
csvSchema,
"csv-data",
properties,
csvDeserializer
);
-- Basic Kafka 0.9 connector configuration
'connector' = 'kafka-0.9'
'topic' = 'my-topic' -- Required: Kafka topic name
'properties.bootstrap.servers' = 'localhost:9092' -- Required: Kafka broker addresses
'properties.group.id' = 'my-consumer-group' -- Required for sources: Consumer group ID
-- Consumer-specific properties (sources)
'properties.auto.offset.reset' = 'earliest' -- latest, earliest, none
'properties.enable.auto.commit' = 'false' -- Flink manages commits
'properties.fetch.min.bytes' = '1024' -- Minimum fetch size
'properties.max.partition.fetch.bytes' = '1048576' -- Maximum per-partition fetch
-- Producer-specific properties (sinks)
'properties.acks' = '1' -- 0, 1, all
'properties.retries' = '3' -- Retry count
'properties.batch.size' = '16384' -- Batch size in bytes
'properties.linger.ms' = '5' -- Batch linger time
'properties.compression.type' = 'snappy' -- none, gzip, snappy, lz4
-- Format configuration
'format' = 'json' -- json, csv, avro, etc.
'json.fail-on-missing-field' = 'false' -- Handle missing JSON fields
'json.ignore-parse-errors' = 'true' -- Skip malformed records
-- Start from earliest available offset
'scan.startup.mode' = 'earliest-offset'
-- Start from latest available offset
'scan.startup.mode' = 'latest-offset'
-- Start from consumer group's committed offset
'scan.startup.mode' = 'group-offsets'
-- Start from specific per-partition offsets (timestamp-based startup requires Kafka 0.10+ message timestamps and is not available for 0.9)
'scan.startup.mode' = 'specific-offsets'
'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
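The specific-offsets startup mode can also be configured programmatically through the full Kafka09TableSource constructor documented earlier. The following is a minimal sketch that reuses the schema, properties, and deserializer variables from the programmatic source example above, assumes the usual Flink connector packages for StartupMode and KafkaTopicPartition, and uses the same offsets as the DDL snippet:
import org.apache.flink.streaming.connectors.kafka.Kafka09TableSource;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
// Per-partition starting offsets: partition 0 -> offset 42, partition 1 -> offset 300
Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
specificOffsets.put(new KafkaTopicPartition("user-actions", 0), 42L);
specificOffsets.put(new KafkaTopicPartition("user-actions", 1), 300L);
// Full constructor: no proctime/rowtime attributes or field mapping, explicit startup offsets
Kafka09TableSource offsetSource = new Kafka09TableSource(
    schema,                        // TableSchema from the programmatic example above
    Optional.empty(),              // no processing-time attribute
    Collections.emptyList(),       // no rowtime attribute descriptors
    Optional.empty(),              // no field mapping
    "user-actions",
    properties,                    // Kafka consumer properties from above
    deserializer,                  // JSON row deserializer from above
    StartupMode.SPECIFIC_OFFSETS,
    specificOffsets
);
This is the programmatic counterpart of the 'scan.startup.specific-offsets' option shown above.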