tessl/maven-org-apache-flink--flink-connector-kafka-0-10-2-12

Apache Flink Kafka 0.10 connector for streaming data processing with exactly-once processing guarantees


Table API Integration

The Flink Kafka 0.10 connector integrates with Flink's Table API and SQL, enabling declarative stream processing with Kafka sources and sinks. It supports both the legacy table factory and the newer dynamic table API.

Capabilities

Dynamic Table Factory

Modern table factory implementation for creating Kafka sources and sinks through SQL DDL and Table API.

/**
 * Dynamic table factory for Kafka 0.10.x integration
 */
public class Kafka010DynamicTableFactory extends KafkaDynamicTableFactoryBase {
    /** Connector identifier used in SQL DDL */
    public static final String IDENTIFIER = "kafka-0.10";
}

/**
 * Dynamic table source implementation for Kafka 0.10.x
 */
public class Kafka010DynamicSource extends KafkaDynamicSourceBase {
    // Inherits all functionality from base dynamic source
}

/**
 * Dynamic table sink implementation for Kafka 0.10.x  
 */
public class Kafka010DynamicSink extends KafkaDynamicSinkBase {
    // Inherits all functionality from base dynamic sink
}

Usage Examples:

-- Create Kafka source table
CREATE TABLE kafka_source (
    user_id BIGINT,
    event_name STRING,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka-0.10',
    'topic' = 'user-events',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'my-consumer-group',
    'format' = 'json',
    'scan.startup.mode' = 'earliest-offset'
);

-- Create Kafka sink table
CREATE TABLE kafka_sink (
    user_id BIGINT,
    aggregated_count BIGINT,
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3)
) WITH (
    'connector' = 'kafka-0.10',
    'topic' = 'user-aggregates',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json',
    'sink.partitioner' = 'round-robin'
);

-- Query using the tables
INSERT INTO kafka_sink
SELECT 
    user_id,
    COUNT(*) as aggregated_count,
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
    TUMBLE_END(event_time, INTERVAL '1' MINUTE) as window_end
FROM kafka_source
GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' MINUTE);

// Table API usage
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

// Create source table descriptor
TableDescriptor sourceDescriptor = TableDescriptor.forConnector("kafka-0.10")
    .schema(Schema.newBuilder()
        .column("user_id", DataTypes.BIGINT())
        .column("event_name", DataTypes.STRING())
        .column("event_time", DataTypes.TIMESTAMP(3))
        .watermark("event_time", "event_time - INTERVAL '5' SECOND")
        .build())
    .option("topic", "user-events")
    .option("properties.bootstrap.servers", "localhost:9092")
    .option("properties.group.id", "my-consumer-group")
    .option("format", "json")
    .option("scan.startup.mode", "earliest-offset")
    .build();

tableEnv.createTemporaryTable("kafka_source", sourceDescriptor);

Legacy Table Factory

Legacy table factory implementation for backward compatibility with older Table API versions.

/**
 * Legacy table factory for creating Kafka 0.10.x sources and sinks
 */
public class Kafka010TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {
    /**
     * Returns the Kafka version supported by this factory
     * @return Kafka version string "0.10"
     */
    protected String kafkaVersion();
    
    /**
     * Indicates whether this connector supports Kafka timestamps
     * @return true, as Kafka 0.10.x supports timestamps
     */
    protected boolean supportsKafkaTimestamps();
}

/**
 * Legacy table source implementation for consuming from Kafka 0.10.x in Table API
 */
public class Kafka010TableSource extends KafkaTableSourceBase {
    // Internal implementation - extends base Kafka table source
}

/**
 * Legacy table sink implementation for writing to Kafka 0.10.x in Table API
 */
public class Kafka010TableSink extends KafkaTableSinkBase {
    // Internal implementation - extends base Kafka table sink
}

Usage Examples:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

// Legacy descriptor-based approach
TableEnvironment tableEnv = TableEnvironment.create(...);

tableEnv.connect(
    new Kafka()
        .version("0.10")
        .topic("user-events")
        .property("bootstrap.servers", "localhost:9092")
        .property("group.id", "my-consumer-group")
        .startFromEarliest()
)
.withFormat(new Json())
.withSchema(new Schema()
    .field("user_id", DataTypes.BIGINT())
    .field("event_name", DataTypes.STRING())
    .field("event_time", DataTypes.TIMESTAMP(3))
)
.createTemporaryTable("legacy_kafka_source");

Configuration Options

Source Configuration

Required Options:

  • connector: Must be "kafka-0.10"
  • topic: Kafka topic name (or a semicolon-separated list of topics)
  • properties.bootstrap.servers: Kafka broker addresses
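
A minimal source declaration using only the required options (topic and broker address are placeholders; in practice a format option is also needed so records can be deserialized):

```sql
-- Minimal Kafka 0.10 source: required options plus a format
CREATE TABLE minimal_source (
    message STRING
) WITH (
    'connector' = 'kafka-0.10',
    'topic' = 'my-topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);
```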

Startup Mode Options:

  • scan.startup.mode: How to start consuming ("earliest-offset", "latest-offset", "group-offsets", "timestamp", "specific-offsets")
  • scan.startup.timestamp-millis: Start timestamp when mode is "timestamp"
  • scan.startup.specific-offsets: Specific partition offsets when mode is "specific-offsets"
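
For example, resuming from known positions combines the "specific-offsets" mode with a per-partition offset list (topic name and offsets here are illustrative):

```sql
-- Resume consumption from explicit per-partition offsets
CREATE TABLE offset_source (
    message STRING
) WITH (
    'connector' = 'kafka-0.10',
    'topic' = 'my-topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json',
    'scan.startup.mode' = 'specific-offsets',
    'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);
```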

Consumer Properties:

  • properties.group.id: Consumer group ID
  • properties.auto.offset.reset: Offset reset behavior ("earliest", "latest")
  • properties.flink.poll-timeout: Polling timeout in milliseconds
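
Options prefixed with "properties." pass through to the underlying Kafka consumer. A sketch combining the consumer properties above (all values illustrative):

```sql
-- Tuning pass-through consumer properties
CREATE TABLE tuned_source (
    message STRING
) WITH (
    'connector' = 'kafka-0.10',
    'topic' = 'my-topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'tuned-group',
    'properties.auto.offset.reset' = 'earliest',
    'properties.flink.poll-timeout' = '100',
    'format' = 'json'
);
```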

Pattern Subscription:

  • topic-pattern: Regular expression pattern for topic subscription instead of specific topics

Sink Configuration

Required Options:

  • connector: Must be "kafka-0.10"
  • topic: Target Kafka topic name
  • properties.bootstrap.servers: Kafka broker addresses

Partitioning Options:

  • sink.partitioner: Partitioning strategy ("default", "fixed", "round-robin", or the fully qualified class name of a custom FlinkKafkaPartitioner subclass)

Producer Properties:

  • properties.acks: Acknowledgment mode ("all", "1", "0")
  • properties.retries: Number of retries for failed sends
  • properties.enable.idempotence: Enable idempotent producer for exactly-once

Timestamp Options:

  • sink.timestamp-field: Field to use as Kafka record timestamp
  • sink.timestamp-format: Timestamp format specification

Format Integration

The connector works with various format specifications:

-- JSON format
CREATE TABLE kafka_json_source (...) WITH (
    'connector' = 'kafka-0.10',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true',
    'json.timestamp-format.standard' = 'ISO-8601'
);

-- Avro format  
CREATE TABLE kafka_avro_source (...) WITH (
    'connector' = 'kafka-0.10',
    'format' = 'avro',
    'avro.schema-registry.url' = 'http://localhost:8081'
);

-- CSV format
CREATE TABLE kafka_csv_source (...) WITH (
    'connector' = 'kafka-0.10',
    'format' = 'csv',
    'csv.field-delimiter' = ',',
    'csv.ignore-parse-errors' = 'true'
);

Watermark Strategies

Configure watermark generation for event time processing:

-- Bounded out-of-orderness watermarks
CREATE TABLE kafka_source (
    user_id BIGINT,
    event_name STRING,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka-0.10',
    'topic' = 'events'
);

-- Ascending timestamps watermarks
CREATE TABLE kafka_ordered_source (
    user_id BIGINT,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time
) WITH (
    'connector' = 'kafka-0.10',
    'topic' = 'ordered-events'
);

Multiple Topics and Pattern Subscription

Multiple Topics

-- Multiple specific topics
CREATE TABLE multi_topic_source (...) WITH (
    'connector' = 'kafka-0.10',
    'topic' = 'topic1;topic2;topic3',
    'properties.bootstrap.servers' = 'localhost:9092'
);

Pattern-Based Subscription

-- Pattern-based topic subscription
CREATE TABLE pattern_source (...) WITH (
    'connector' = 'kafka-0.10',
    'topic-pattern' = 'logs-.*',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.flink.partition-discovery.interval-millis' = '30000'
);

Exactly-Once Semantics

Configure exactly-once processing for sources and sinks. Note that "read_committed" isolation and idempotent producers require Kafka 0.11+ clients and brokers; against a 0.10 cluster the source achieves exactly-once through Flink's checkpointed offsets, while the sink provides at-least-once delivery.

Source Configuration

CREATE TABLE exactly_once_source (...) WITH (
    'connector' = 'kafka-0.10',
    'properties.isolation.level' = 'read_committed',
    'properties.enable.auto.commit' = 'false'
);

Sink Configuration

CREATE TABLE exactly_once_sink (...) WITH (
    'connector' = 'kafka-0.10',
    'properties.acks' = 'all',
    'properties.retries' = '3',
    'properties.enable.idempotence' = 'true',
    'properties.max.in.flight.requests.per.connection' = '1'
);

Service Provider Registration

The connector automatically registers itself with Flink's service provider mechanism:

META-INF/services/org.apache.flink.table.factories.Factory:

  • org.apache.flink.streaming.connectors.kafka.table.Kafka010DynamicTableFactory

META-INF/services/org.apache.flink.table.factories.TableFactory:

  • org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory

Migration from Legacy to Dynamic API

When migrating from legacy descriptor-based API to modern SQL DDL:

Legacy (Deprecated):

tableEnv.connect(new Kafka().version("0.10").topic("my-topic"))
       .withFormat(new Json())
       .withSchema(new Schema().field("id", DataTypes.BIGINT()))
       .createTemporaryTable("my_table");

Modern (Recommended):

CREATE TABLE my_table (
    id BIGINT
) WITH (
    'connector' = 'kafka-0.10',
    'topic' = 'my-topic',
    'format' = 'json'
);

Error Handling

Table API integration provides several error handling strategies:

  • Parse Errors: Configure format options to ignore or fail on parse errors
  • Serialization Errors: Log-only mode for non-critical failures
  • Connection Errors: Automatic retry mechanisms with exponential backoff
  • Schema Evolution: Support for schema registry integration with Avro/JSON formats
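
For parse errors, the choice between failing fast and skipping bad records is made per format. A sketch using the JSON format's error-handling options:

```sql
-- Skip malformed JSON records instead of failing the job
CREATE TABLE tolerant_source (
    user_id BIGINT,
    event_name STRING
) WITH (
    'connector' = 'kafka-0.10',
    'topic' = 'user-events',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true',
    'json.fail-on-missing-field' = 'false'
);
```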

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-kafka-0-10-2-12
