The Flink Kafka 0.11 connector integrates with the Table API and SQL through factory classes that allow Kafka topics to be defined as tables declaratively and queried directly from Flink's SQL engine.
The modern factory implementation provides SQL DDL support and dynamic table creation.
/**
* Factory for creating dynamic table sources and sinks for Kafka 0.11.x
* Supports the new Dynamic Table API introduced in Flink 1.11+
*/
class Kafka011DynamicTableFactory extends KafkaDynamicTableFactoryBase {
/**
* Factory identifier used in SQL DDL CREATE TABLE statements
* @return "kafka-0.11" identifier for connector specification
*/
String factoryIdentifier();
}

Usage Examples:
-- SQL DDL using the kafka-0.11 connector identifier
CREATE TABLE user_events (
user_id BIGINT,
event_type STRING,
event_data STRING,
event_timestamp TIMESTAMP(3),
WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'user-events',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'flink-consumer-group',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
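Either DDL statement can also be submitted programmatically. The sketch below registers the source table and runs a continuous query over it, assuming a Flink 1.11+ TableEnvironment; the class name and the query itself are illustrative. The sink table defined next is registered the same way.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UserEventsQuery {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Register the Kafka-backed table; the 'kafka-0.11' factory is resolved
        // from the 'connector' option at execution time.
        tableEnv.executeSql(
                "CREATE TABLE user_events (" +
                "  user_id BIGINT," +
                "  event_type STRING," +
                "  event_data STRING," +
                "  event_timestamp TIMESTAMP(3)," +
                "  WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '5' SECOND" +
                ") WITH (" +
                "  'connector' = 'kafka-0.11'," +
                "  'topic' = 'user-events'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'properties.group.id' = 'flink-consumer-group'," +
                "  'scan.startup.mode' = 'earliest-offset'," +
                "  'format' = 'json'" +
                ")");

        // Continuous aggregation over the stream; print() attaches a client-side sink.
        tableEnv.executeSql(
                "SELECT event_type, COUNT(*) AS event_count FROM user_events GROUP BY event_type")
                .print();
    }
}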
-- Create sink table
CREATE TABLE processed_events (
user_id BIGINT,
event_count BIGINT,
processing_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'processed-events',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.transaction.timeout.ms' = '900000',
'sink.semantic' = 'exactly-once',
'format' = 'json'
);

Kafka table source implementation for the modern Dynamic Table API.
/**
* Dynamic table source for Kafka 0.11.x supporting advanced features
* like watermark generation and projection pushdown
*/
@Internal
class Kafka011DynamicSource extends KafkaDynamicSourceBase {
/**
* Constructor for dynamic table source
* @param outputDataType the output data type of the source
* @param topic the Kafka topic name
* @param properties Kafka consumer properties
* @param decodingFormat format for deserializing records
* @param startupMode how to start consuming (earliest, latest, etc.)
* @param specificStartupOffsets specific partition offsets for startup
* @param startupTimestampMillis timestamp for timestamp-based startup
*/
Kafka011DynamicSource(
DataType outputDataType,
String topic,
Properties properties,
DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets,
long startupTimestampMillis
);
/**
* Create a copy of this source for runtime instantiation
* @return copied source instance
*/
DynamicTableSource copy();
/**
* Summary string for debugging and logging
* @return "Kafka-0.11" description
*/
String asSummaryString();
}

Kafka table sink implementation for the modern Dynamic Table API.
/**
* Dynamic table sink for Kafka 0.11.x supporting transactional writes
* and exactly-once semantics for table operations
*/
@Internal
class Kafka011DynamicSink extends KafkaDynamicSinkBase {
/**
* Constructor for dynamic table sink
* @param consumedDataType the data type consumed by the sink
* @param topic the target Kafka topic name
* @param properties Kafka producer properties
* @param partitioner optional custom partitioner for records
* @param encodingFormat format for serializing records
*/
Kafka011DynamicSink(
DataType consumedDataType,
String topic,
Properties properties,
Optional<FlinkKafkaPartitioner<RowData>> partitioner,
EncodingFormat<SerializationSchema<RowData>> encodingFormat
);
/**
* Create a copy of this sink for runtime instantiation
* @return copied sink instance
*/
DynamicTableSink copy();
/**
* Summary string for debugging and logging
* @return "Kafka 0.11 table sink" description
*/
String asSummaryString();
}

Legacy factory and table implementations for backward compatibility with older Flink versions.
/**
* Legacy table source for Kafka 0.11.x (pre-1.11 Table API)
* Maintained for backward compatibility
*/
@Internal
class Kafka011TableSource extends KafkaTableSourceBase {
// Full constructor with all table configuration options
Kafka011TableSource(
TableSchema schema,
Optional<String> proctimeAttribute,
List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
Optional<Map<String, String>> fieldMapping,
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema,
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets,
long startupTimestampMillis
);
// Simplified constructor for basic use cases
Kafka011TableSource(
TableSchema schema,
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema
);
}
/**
* Legacy table sink for Kafka 0.11.x (pre-1.11 Table API)
* Maintained for backward compatibility
*/
@Internal
class Kafka011TableSink extends KafkaTableSinkBase {
/**
* Constructor for legacy table sink
* @param schema table schema definition
* @param topic target Kafka topic name
* @param properties Kafka producer properties
* @param partitioner optional custom partitioner
* @param serializationSchema schema for serializing rows
*/
Kafka011TableSink(
TableSchema schema,
String topic,
Properties properties,
Optional<FlinkKafkaPartitioner<Row>> partitioner,
SerializationSchema<Row> serializationSchema
);
}
/**
* Legacy factory for creating table sources and sinks
* Maintained for backward compatibility with older Flink versions
*/
class Kafka011TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {
// Factory methods inherited from base class
// Used by Table API environment for table registration
}

Usage Examples:
// Programmatic table registration (legacy approach)
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "table-consumer");
// Register source table
TableSchema sourceSchema = TableSchema.builder()
    .field("user_id", DataTypes.BIGINT())
    .field("event_data", DataTypes.STRING())
    .field("event_time", DataTypes.TIMESTAMP(3))
    .build();
Kafka011TableSource source = new Kafka011TableSource(
    sourceSchema,
    "input-topic",
    properties,
    // Row-based JSON deserializer from flink-json
    new JsonRowDeserializationSchema.Builder(sourceSchema.toRowType()).build()
);
tableEnv.registerTableSource("kafka_source", source);
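// Illustrative alternative (not part of the original example): the full constructor can
// declare event-time metadata directly. The rowtime descriptor, ExistingField extractor,
// BoundedOutOfOrderTimestamps strategy, 5-second delay, and table name below are assumptions.
Kafka011TableSource sourceWithRowtime = new Kafka011TableSource(
    sourceSchema,
    Optional.empty(),                                   // no processing-time attribute
    Collections.singletonList(new RowtimeAttributeDescriptor(
        "event_time",
        new ExistingField("event_time"),                // read rowtime from an existing field
        new BoundedOutOfOrderTimestamps(5000L))),       // allow 5s of out-of-orderness
    Optional.empty(),                                   // no field mapping
    "input-topic",
    properties,
    new JsonRowDeserializationSchema.Builder(sourceSchema.toRowType()).build(),
    StartupMode.EARLIEST,
    Collections.emptyMap(),                             // no partition-specific offsets
    0L                                                  // only used in TIMESTAMP startup mode
);
tableEnv.registerTableSource("kafka_source_rowtime", sourceWithRowtime);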
// Register sink table
TableSchema sinkSchema = TableSchema.builder()
    .field("result", DataTypes.STRING())
    .field("count", DataTypes.BIGINT())
    .build();
Kafka011TableSink sink = new Kafka011TableSink(
    sinkSchema,
    "output-topic",
    properties,
    Optional.empty(),
    // Row-based JSON serializer from flink-json
    new JsonRowSerializationSchema.Builder(sinkSchema.toRowType()).build()
);
tableEnv.registerTableSink("kafka_sink", sink);

SQL DDL configuration options for Kafka connector tables.
// Required configuration options
'connector' = 'kafka-0.11' // Connector identifier
'topic' = 'topic-name' // Kafka topic name
'properties.bootstrap.servers' = 'host:port' // Kafka broker addresses

// Consumer-specific options for source tables
'properties.group.id' = 'consumer-group' // Consumer group ID
'scan.startup.mode' = 'mode' // Startup mode: earliest-offset, latest-offset, group-offsets, specific-offsets, timestamp
'scan.startup.specific-offsets' = 'offsets' // Partition-specific offsets, e.g. 'partition:0,offset:42;partition:1,offset:300'
'scan.startup.timestamp-millis' = 'timestamp' // Timestamp for timestamp mode

// Producer-specific options for sink tables
'sink.partitioner' = 'partitioner' // Partitioner: fixed, round-robin, or custom class name
'sink.semantic' = 'semantic' // Delivery semantic: exactly-once, at-least-once, none
'properties.transaction.timeout.ms' = 'timeout' // Transaction timeout for exactly-once

Complete SQL DDL Examples:
-- Source table with watermarks and specific startup configuration
CREATE TABLE orders_source (
order_id BIGINT,
customer_id BIGINT,
product_id BIGINT,
quantity INT,
price DECIMAL(10,2),
order_timestamp TIMESTAMP(3),
WATERMARK FOR order_timestamp AS order_timestamp - INTERVAL '10' SECOND
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'orders',
'properties.bootstrap.servers' = 'kafka-cluster:9092',
'properties.group.id' = 'orders-processor',
'properties.auto.offset.reset' = 'earliest',
'scan.startup.mode' = 'timestamp',
'scan.startup.timestamp-millis' = '1609459200000',
'format' = 'avro-confluent',
'avro-confluent.url' = 'http://schema-registry:8081'
);
-- Sink table with exactly-once semantics
CREATE TABLE order_aggregates_sink (
customer_id BIGINT,
total_orders BIGINT,
total_amount DECIMAL(15,2),
window_start TIMESTAMP(3),
window_end TIMESTAMP(3)
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'order-aggregates',
'properties.bootstrap.servers' = 'kafka-cluster:9092',
'properties.transaction.timeout.ms' = '900000',
'sink.semantic' = 'exactly-once',
'sink.partitioner' = 'fixed',
'format' = 'json'
);
-- Use tables in SQL query
INSERT INTO order_aggregates_sink
SELECT
customer_id,
COUNT(*) as total_orders,
SUM(price * quantity) as total_amount,
TUMBLE_START(order_timestamp, INTERVAL '1' HOUR) as window_start,
TUMBLE_END(order_timestamp, INTERVAL '1' HOUR) as window_end
FROM orders_source
GROUP BY customer_id, TUMBLE(order_timestamp, INTERVAL '1' HOUR);

The connector registers its factories through Java's Service Loader mechanism for automatic discovery.
// Service registration files:
// META-INF/services/org.apache.flink.table.factories.Factory
// -> org.apache.flink.streaming.connectors.kafka.table.Kafka011DynamicTableFactory
//
// META-INF/services/org.apache.flink.table.factories.TableFactory
// -> org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceSinkFactory

This enables automatic discovery and instantiation of the Kafka connector when using SQL DDL or programmatic table registration.
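To illustrate what this lookup amounts to, the sketch below uses the plain JDK ServiceLoader to find the dynamic factory by its identifier, roughly mirroring what Flink's factory discovery does internally when it sees 'connector' = 'kafka-0.11'. The class name is illustrative, and applications never need to perform this lookup themselves.

import java.util.ServiceLoader;
import org.apache.flink.table.factories.Factory;

public class FactoryDiscoveryCheck {
    public static void main(String[] args) {
        // Iterates over every Factory registered via
        // META-INF/services/org.apache.flink.table.factories.Factory on the classpath
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            if ("kafka-0.11".equals(factory.factoryIdentifier())) {
                System.out.println("Found Kafka 0.11 factory: " + factory.getClass().getName());
                return;
            }
        }
        System.out.println("kafka-0.11 factory not on the classpath");
    }
}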