Apache Flink Kafka 0.10 connector for stream processing with exactly-once guarantees
The Flink Kafka 0.10 connector provides comprehensive integration with Flink's Table API and SQL, enabling declarative stream processing with Kafka sources and sinks. It supports both legacy table factories and the new dynamic table API.
Modern table factory implementation for creating Kafka sources and sinks through SQL DDL and Table API.
/**
* Dynamic table factory for Kafka 0.10.x integration
*/
public class Kafka010DynamicTableFactory extends KafkaDynamicTableFactoryBase {
/** Connector identifier used in SQL DDL */
public static final String IDENTIFIER = "kafka-0.10";
}
/**
* Dynamic table source implementation for Kafka 0.10.x
*/
public class Kafka010DynamicSource extends KafkaDynamicSourceBase {
// Inherits all functionality from base dynamic source
}
/**
* Dynamic table sink implementation for Kafka 0.10.x
*/
public class Kafka010DynamicSink extends KafkaDynamicSinkBase {
// Inherits all functionality from base dynamic sink
}

Usage Examples:
-- Create Kafka source table
CREATE TABLE kafka_source (
user_id BIGINT,
event_name STRING,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka-0.10',
'topic' = 'user-events',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'my-consumer-group',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset'
);
-- Create Kafka sink table
CREATE TABLE kafka_sink (
user_id BIGINT,
aggregated_count BIGINT,
window_start TIMESTAMP(3),
window_end TIMESTAMP(3)
) WITH (
'connector' = 'kafka-0.10',
'topic' = 'user-aggregates',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json',
'sink.partitioner' = 'round-robin'
);
-- Query using the tables
INSERT INTO kafka_sink
SELECT
user_id,
COUNT(*) as aggregated_count,
TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
TUMBLE_END(event_time, INTERVAL '1' MINUTE) as window_end
FROM kafka_source
GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' MINUTE);

// Table API usage
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
// Create source table descriptor
TableDescriptor sourceDescriptor = TableDescriptor.forConnector("kafka-0.10")
.schema(Schema.newBuilder()
.column("user_id", DataTypes.BIGINT())
.column("event_name", DataTypes.STRING())
.column("event_time", DataTypes.TIMESTAMP(3))
.watermark("event_time", "event_time - INTERVAL '5' SECOND")
.build())
.option("topic", "user-events")
.option("properties.bootstrap.servers", "localhost:9092")
.option("properties.group.id", "my-consumer-group")
.option("format", "json")
.option("scan.startup.mode", "earliest-offset")
.build();
tableEnv.createTemporaryTable("kafka_source", sourceDescriptor);

Legacy table factory implementation for backward compatibility with older Table API versions.
/**
* Legacy table factory for creating Kafka 0.10.x sources and sinks
*/
public class Kafka010TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {
/**
* Returns the Kafka version supported by this factory
* @return Kafka version string "0.10"
*/
@Override
protected String kafkaVersion() {
    return "0.10";
}
/**
* Indicates whether this connector supports Kafka timestamps
* @return true, as Kafka 0.10.x supports timestamps
*/
@Override
protected boolean supportsKafkaTimestamps() {
    return true;
}
}
/**
* Legacy table source implementation for consuming from Kafka 0.10.x in Table API
*/
public class Kafka010TableSource extends KafkaTableSourceBase {
// Internal implementation - extends base Kafka table source
}
/**
* Legacy table sink implementation for writing to Kafka 0.10.x in Table API
*/
public class Kafka010TableSink extends KafkaTableSinkBase {
// Internal implementation - extends base Kafka table sink
}

Usage Examples:
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Schema;
// Legacy descriptor-based approach
TableEnvironment tableEnv = TableEnvironment.create(...);
tableEnv.connect(
new Kafka()
.version("0.10")
.topic("user-events")
.property("bootstrap.servers", "localhost:9092")
.property("group.id", "my-consumer-group")
.startFromEarliest()
)
.withFormat(new Json())
.withSchema(new Schema()
.field("user_id", DataTypes.BIGINT())
.field("event_name", DataTypes.STRING())
.field("event_time", DataTypes.TIMESTAMP(3))
)
.createTemporaryTable("legacy_kafka_source");

Source Required Options:
- connector: Must be "kafka-0.10"
- topic: Kafka topic name (or a list of topics separated by semicolons)
- properties.bootstrap.servers: Kafka broker addresses

Startup Mode Options:
- scan.startup.mode: How to start consuming ("earliest-offset", "latest-offset", "group-offsets", "timestamp", "specific-offsets")
- scan.startup.timestamp-millis: Start timestamp when mode is "timestamp"
- scan.startup.specific-offsets: Specific partition offsets when mode is "specific-offsets"

Consumer Properties:
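As a sketch of the startup-mode options above (topic, broker address, and the timestamp value are placeholders), a source can begin reading from a point in time or from explicit per-partition offsets:

```sql
-- Start from a fixed point in time (epoch milliseconds)
CREATE TABLE ts_source (
  user_id BIGINT,
  event_name STRING
) WITH (
  'connector' = 'kafka-0.10',
  'topic' = 'user-events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1640995200000'
);

-- Start from explicit per-partition offsets
CREATE TABLE offset_source (
  user_id BIGINT,
  event_name STRING
) WITH (
  'connector' = 'kafka-0.10',
  'topic' = 'user-events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  'scan.startup.mode' = 'specific-offsets',
  'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);
```

Offsets are listed per partition in a single semicolon-separated string; partitions not listed fall back to the group offsets.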
- properties.group.id: Consumer group ID
- properties.auto.offset.reset: Offset reset behavior ("earliest", "latest")
- properties.flink.poll-timeout: Polling timeout in milliseconds

Pattern Subscription:
- topic-pattern: Regular expression pattern for topic subscription instead of specific topics

Sink Required Options:
- connector: Must be "kafka-0.10"
- topic: Target Kafka topic name
- properties.bootstrap.servers: Kafka broker addresses

Partitioning Options:
- sink.partitioner: Partitioning strategy ("default", "round-robin", "custom")
- sink.partitioner-class: Custom partitioner class when using "custom"

Producer Properties:
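Assuming the option names above, a sink routed by a custom partitioner might be declared as follows. `com.example.UserIdPartitioner` is a hypothetical class; it would extend Flink's `FlinkKafkaPartitioner` and must be on the job's classpath:

```sql
CREATE TABLE partitioned_sink (
  user_id BIGINT,
  event_name STRING
) WITH (
  'connector' = 'kafka-0.10',
  'topic' = 'partitioned-events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  'sink.partitioner' = 'custom',
  -- hypothetical partitioner class, not part of this connector
  'sink.partitioner-class' = 'com.example.UserIdPartitioner'
);
```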
- properties.acks: Acknowledgment mode ("all", "1", "0")
- properties.retries: Number of retries for failed sends
- properties.enable.idempotence: Enable idempotent producer for exactly-once

Timestamp Options:
- sink.timestamp-field: Field to use as the Kafka record timestamp
- sink.timestamp-format: Timestamp format specification

The connector works with various format specifications:
-- JSON format
CREATE TABLE kafka_json_source (...) WITH (
'connector' = 'kafka-0.10',
'format' = 'json',
'json.ignore-parse-errors' = 'true',
'json.timestamp-format.standard' = 'ISO-8601'
);
-- Avro format
CREATE TABLE kafka_avro_source (...) WITH (
'connector' = 'kafka-0.10',
'format' = 'avro',
'avro.schema-registry.url' = 'http://localhost:8081'
);
-- CSV format
CREATE TABLE kafka_csv_source (...) WITH (
'connector' = 'kafka-0.10',
'format' = 'csv',
'csv.field-delimiter' = ',',
'csv.ignore-parse-errors' = 'true'
);

Configure watermark generation for event-time processing:
-- Bounded out-of-orderness watermarks
CREATE TABLE kafka_source (
user_id BIGINT,
event_name STRING,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
'connector' = 'kafka-0.10',
'topic' = 'events'
);
-- Ascending timestamps watermarks
CREATE TABLE kafka_ordered_source (
user_id BIGINT,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time
) WITH (
'connector' = 'kafka-0.10',
'topic' = 'ordered-events'
);

-- Multiple specific topics
CREATE TABLE multi_topic_source (...) WITH (
'connector' = 'kafka-0.10',
'topic' = 'topic1;topic2;topic3',
'properties.bootstrap.servers' = 'localhost:9092'
);

-- Pattern-based topic subscription
CREATE TABLE pattern_source (...) WITH (
'connector' = 'kafka-0.10',
'topic-pattern' = 'logs-.*',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.flink.partition-discovery.interval-millis' = '30000'
);

Configure exactly-once processing. Note that with Kafka 0.10 the source achieves exactly-once semantics through Flink's checkpointed offsets, while the sink provides at-least-once delivery; transactional settings such as 'read_committed' and idempotence require Kafka 0.11+ clients and brokers:
CREATE TABLE exactly_once_source (...) WITH (
'connector' = 'kafka-0.10',
'properties.isolation.level' = 'read_committed',
'properties.enable.auto.commit' = 'false'
);

CREATE TABLE exactly_once_sink (...) WITH (
'connector' = 'kafka-0.10',
'properties.acks' = 'all',
'properties.retries' = '3',
'properties.enable.idempotence' = 'true',
'properties.max.in.flight.requests.per.connection' = '1'
);

The connector automatically registers itself with Flink's service provider (SPI) mechanism:
META-INF/services/org.apache.flink.table.factories.Factory:
org.apache.flink.streaming.connectors.kafka.table.Kafka010DynamicTableFactory

META-INF/services/org.apache.flink.table.factories.TableFactory:
org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory

When migrating from the legacy descriptor-based API to modern SQL DDL:
Legacy (Deprecated):
tableEnv.connect(new Kafka().version("0.10").topic("my-topic"))
.withFormat(new Json())
.withSchema(new Schema().field("id", DataTypes.BIGINT()))
.createTemporaryTable("my_table");

Modern (Recommended):
CREATE TABLE my_table (
id BIGINT
) WITH (
'connector' = 'kafka-0.10',
'topic' = 'my-topic',
'format' = 'json'
);

Error handling is configured through format options such as 'json.ignore-parse-errors' and 'csv.ignore-parse-errors' (see the format examples above), which skip malformed records instead of failing the job.
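For example, a JSON source that tolerates malformed or incomplete records might look like this sketch (topic and broker values are placeholders):

```sql
CREATE TABLE tolerant_source (
  user_id BIGINT,
  event_name STRING
) WITH (
  'connector' = 'kafka-0.10',
  'topic' = 'user-events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  -- skip rows that fail to parse instead of failing the job
  'json.ignore-parse-errors' = 'true',
  -- map missing fields to NULL instead of raising an error
  'json.fail-on-missing-field' = 'false'
);
```

Skipped rows are dropped silently, so this trades completeness for availability; keep the options at their strict defaults when every record must be accounted for.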
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-kafka-0-10-2-12