tessl/maven-org-apache-flink--flink-sql-connector-kafka-0-11_2-11

Apache Flink SQL connector for Apache Kafka 0.11 with shaded dependencies providing streaming and table API integration

Workspace: tessl
Visibility: Public
Describes: maven package pkg:maven/org.apache.flink/flink-sql-connector-kafka-0.11_2.11@1.11.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-sql-connector-kafka-0-11_2-11@1.11.0


Apache Flink Kafka 0.11 SQL Connector

Apache Flink SQL connector for Apache Kafka 0.11.x that provides both DataStream (streaming) and Table/SQL API integration, with Kafka transaction support and exactly-once delivery semantics. This shaded connector packages all Kafka client dependencies to prevent classpath conflicts in Flink deployments.

Package Information

  • Package Name: flink-sql-connector-kafka-0.11_2.11
  • Package Type: maven
  • Language: Java/Scala
  • Group ID: org.apache.flink
  • Artifact ID: flink-sql-connector-kafka-0.11_2.11
  • Version: 1.11.6
  • Installation:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-sql-connector-kafka-0.11_2.11</artifactId>
      <version>1.11.6</version>
    </dependency>

Core Imports

For streaming DataStream programs:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic;

For Table/SQL API integration:

import org.apache.flink.streaming.connectors.kafka.table.Kafka011DynamicTableFactory;
import org.apache.flink.streaming.connectors.kafka.Kafka011TableSource;
import org.apache.flink.streaming.connectors.kafka.Kafka011TableSink;

For serialization and partitioning:

import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;

Basic Usage

Streaming Consumer

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "my-group");

FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
    "my-topic",
    new SimpleStringSchema(),
    properties
);

DataStream<String> stream = env.addSource(consumer);

Streaming Producer with Exactly-Once

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic;

import java.util.Properties;

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// Keep this at or below the broker's transaction.max.timeout.ms (900000 ms by default)
properties.setProperty("transaction.timeout.ms", "900000");

FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
    "output-topic",
    new SimpleStringSchema(),
    properties,
    Semantic.EXACTLY_ONCE
);

stream.addSink(producer);

SQL Table Definition

CREATE TABLE kafka_table (
  id INT,
  name STRING,
  timestamp_col TIMESTAMP(3)
) WITH (
  'connector' = 'kafka-0.11',
  'topic' = 'my-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'my-group',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
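
The same DDL can also be executed from a Java program. The following is a minimal sketch, assuming Flink 1.11's Table API bridge dependency is on the classpath; the continuous query and the id > 10 filter are illustrative.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Register the Kafka-backed table using the DDL shown above
tEnv.executeSql(
    "CREATE TABLE kafka_table (" +
    "  id INT," +
    "  name STRING," +
    "  timestamp_col TIMESTAMP(3)" +
    ") WITH (" +
    "  'connector' = 'kafka-0.11'," +
    "  'topic' = 'my-topic'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'properties.group.id' = 'my-group'," +
    "  'scan.startup.mode' = 'earliest-offset'," +
    "  'format' = 'json'" +
    ")");

// Run a continuous query over the table and bridge back to the DataStream API
Table result = tEnv.sqlQuery("SELECT id, name FROM kafka_table WHERE id > 10");
DataStream<Row> rows = tEnv.toAppendStream(result, Row.class);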

Architecture

The Flink Kafka 0.11 connector is built around several key components:

  • Streaming Consumer: FlinkKafkaConsumer011 for reading from Kafka topics with checkpoint integration
  • Streaming Producer: FlinkKafkaProducer011 supporting transactional writes and exactly-once semantics
  • Table API Integration: Dynamic table factories for SQL DDL support
  • Shaded Dependencies: All Kafka client libraries relocated to prevent conflicts
  • Transaction Support: Two-phase commit protocol for exactly-once guarantees (requires checkpointing, as sketched after this list)
  • Configuration System: Extensive Properties-based configuration with Flink-specific extensions
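
Exactly-once delivery through the two-phase commit path only takes effect when checkpointing is enabled, because Kafka transactions are committed as part of checkpoint completion. A minimal sketch of the checkpoint setup this implies (interval and timeout values are illustrative, not recommendations):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Kafka transactions are committed when a checkpoint completes,
// so EXACTLY_ONCE sinks require checkpointing to be enabled.
env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

// Allow enough time for transactions to commit; keep the producer's
// transaction.timeout.ms setting below the broker's configured maximum.
env.getCheckpointConfig().setCheckpointTimeout(120_000L);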

Capabilities

Streaming Consumer API

Provides FlinkKafkaConsumer011 for consuming from Kafka topics with support for multiple deserialization patterns, startup modes, and partition discovery.

// Primary constructors for different use cases
FlinkKafkaConsumer011<T>(String topic, DeserializationSchema<T> valueDeserializer, Properties props)
FlinkKafkaConsumer011<T>(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props)
FlinkKafkaConsumer011<T>(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props)
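
For example, the Pattern-based constructor subscribes one consumer to every topic whose name matches a regular expression. A short sketch, where the events-.* topic naming and the discovery interval are illustrative:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;
import java.util.regex.Pattern;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "pattern-group");
// Optional: periodically discover new topics/partitions matching the pattern
props.setProperty("flink.partition-discovery.interval-millis", "60000");

// Subscribe to every topic matching the regular expression
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
    Pattern.compile("events-.*"),
    new SimpleStringSchema(),
    props
);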

Streaming Consumer

Streaming Producer API

Provides FlinkKafkaProducer011 with transactional support, multiple delivery semantics, and flexible partitioning options.

// Core producer constructors
FlinkKafkaProducer011<IN>(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, Semantic semantic)
FlinkKafkaProducer011<IN>(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner, Semantic semantic, int kafkaProducersPoolSize)

// Delivery semantics
enum Semantic {
    EXACTLY_ONCE,
    AT_LEAST_ONCE, 
    NONE
}
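
Not every pipeline needs transactional writes. The sketch below relies on the simpler three-argument constructor; for FlinkKafkaProducer011 the default semantic is AT_LEAST_ONCE, which flushes pending records on checkpoints without using Kafka transactions.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

import java.util.Properties;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");

// No Semantic argument: the producer defaults to AT_LEAST_ONCE,
// flushing outstanding records when a checkpoint is taken.
FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
    "output-topic",
    new SimpleStringSchema(),
    props
);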

Streaming Producer

Table/SQL API Integration

Provides factory classes for creating Kafka table sources and sinks in Flink's Table API and SQL, supporting both legacy and modern dynamic table APIs.

// Dynamic table factory for SQL DDL
class Kafka011DynamicTableFactory extends KafkaDynamicTableFactoryBase {
    String factoryIdentifier() // Returns "kafka-0.11"
}

// Dynamic table source and sink
class Kafka011DynamicSource extends KafkaDynamicSourceBase
class Kafka011DynamicSink extends KafkaDynamicSinkBase
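
The value returned by factoryIdentifier() is exactly the string the DDL's 'connector' option must use; that is how the planner resolves Kafka011DynamicSource and Kafka011DynamicSink. A sketch of the sink side is shown below, assuming a source_table already registered in the catalog (the table and topic names are illustrative):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance().inStreamingMode().build());

// 'connector' = 'kafka-0.11' matches Kafka011DynamicTableFactory.factoryIdentifier()
tEnv.executeSql(
    "CREATE TABLE kafka_sink (" +
    "  id INT," +
    "  name STRING" +
    ") WITH (" +
    "  'connector' = 'kafka-0.11'," +
    "  'topic' = 'output-topic'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'json'" +
    ")");

// Writes to the table go through Kafka011DynamicSink
tEnv.executeSql("INSERT INTO kafka_sink SELECT id, name FROM source_table");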

Table API

Serialization and Deserialization

Base interfaces and implementations for converting between Flink data types and Kafka record formats, with access to Kafka metadata.

// Core serialization interfaces
interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
    void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out) throws Exception;
}

interface KafkaSerializationSchema<T> extends Serializable {
    ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp);
}
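
A custom KafkaDeserializationSchema can expose Kafka metadata (topic, partition, offset) alongside the payload. The class below is an illustrative sketch, not part of the connector; note that implementations also supply isEndOfStream and getProducedType (the latter inherited from ResultTypeQueryable), which the excerpt above omits.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.nio.charset.StandardCharsets;

// Illustrative schema: prefixes each value with its source coordinates
public class AnnotatedStringSchema implements KafkaDeserializationSchema<String> {

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false; // unbounded stream, never ends
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) {
        String value = record.value() == null
            ? ""
            : new String(record.value(), StandardCharsets.UTF_8);
        return record.topic() + "/" + record.partition() + "@" + record.offset() + ": " + value;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}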

Serialization

Configuration and Partitioning

Configuration options, startup modes, and custom partitioning strategies for fine-tuning connector behavior.

// Startup mode options
enum StartupMode {
    GROUP_OFFSETS,
    EARLIEST,
    LATEST,
    TIMESTAMP,
    SPECIFIC_OFFSETS
}

// Custom partitioner base class
abstract class FlinkKafkaPartitioner<T> implements Serializable {
    abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}
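
On the DataStream consumer the startup mode is normally selected through setter methods that correspond to the StartupMode values, and a custom FlinkKafkaPartitioner is passed to the producer constructor that accepts one. A brief sketch (the consumer variable is the one built earlier; the timestamp and partitioner are illustrative):

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Pick exactly one startup mode on the consumer built earlier:
consumer.setStartFromGroupOffsets();                // GROUP_OFFSETS (default)
// consumer.setStartFromEarliest();                 // EARLIEST
// consumer.setStartFromLatest();                   // LATEST
// consumer.setStartFromTimestamp(1609459200000L);  // TIMESTAMP (epoch millis)
// (SPECIFIC_OFFSETS uses setStartFromSpecificOffsets with an offsets map)

// Illustrative custom partitioner: route records by a hash of the key bytes
public class KeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {
    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        int hash = (key == null) ? 0 : java.util.Arrays.hashCode(key);
        return partitions[Math.abs(hash % partitions.length)];
    }
}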

Configuration

Types

Core Exception Types

class FlinkKafka011Exception extends FlinkException {
    FlinkKafka011Exception(FlinkKafka011ErrorCode errorCode, String message);
    FlinkKafka011Exception(FlinkKafka011ErrorCode errorCode, String message, Throwable cause);
    FlinkKafka011ErrorCode getErrorCode();
}

enum FlinkKafka011ErrorCode {
    PRODUCERS_POOL_EMPTY,
    EXTERNAL_ERROR
}
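
FlinkKafka011Exception is a checked exception, so code that drives the producer's transactional path can inspect its error code when it surfaces. A hedged sketch; transactionalWrite() is a hypothetical placeholder for application code, not a connector API:

import org.apache.flink.streaming.connectors.kafka.FlinkKafka011ErrorCode;
import org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception;

void writeWithDiagnostics() throws Exception {
    try {
        transactionalWrite(); // hypothetical application code
    } catch (FlinkKafka011Exception e) {
        if (e.getErrorCode() == FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY) {
            // All pooled transactional producers are in use, typically because
            // too many checkpoints are in flight for the configured pool size.
        }
        throw e;
    }
}

void transactionalWrite() throws FlinkKafka011Exception {
    // hypothetical placeholder for code that calls into the producer
}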