
tessl/maven-org-apache-flink--flink-connector-kafka-0-10-2-12

Apache Flink Kafka 0.10 connector for streaming data processing with exactly-once processing guarantees


Data Stream Consumer

The FlinkKafkaConsumer010 consumes data from Apache Kafka 0.10.x topics with exactly-once processing guarantees, flexible subscription patterns (single topic, topic list, or regex pattern), and advanced features such as rate limiting and dynamic partition discovery.

Capabilities

Single Topic Consumer

Creates a consumer for a single Kafka topic with value-only deserialization.

/**
 * Creates a new Kafka streaming source consumer for Kafka 0.10.x
 * @param topic The name of the topic that should be consumed
 * @param valueDeserializer The de-/serializer used to convert between Kafka's byte messages and Flink's objects
 * @param props The properties used to configure the Kafka consumer client
 */
public FlinkKafkaConsumer010(String topic, DeserializationSchema<T> valueDeserializer, Properties props);

/**
 * Creates a new Kafka streaming source consumer for Kafka 0.10.x with key-value deserialization
 * @param topic The name of the topic that should be consumed
 * @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects
 * @param props The properties used to configure the Kafka consumer client
 */
public FlinkKafkaConsumer010(String topic, KafkaDeserializationSchema<T> deserializer, Properties props);

Usage Examples:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.Properties;

// Simple string deserialization
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-consumer-group");

FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(
    "events-topic", 
    new SimpleStringSchema(), 
    props
);

// Custom key-value deserialization
FlinkKafkaConsumer010<MyEvent> eventConsumer = new FlinkKafkaConsumer010<>(
    "events-topic",
    new KafkaDeserializationSchema<MyEvent>() {
        @Override
        public MyEvent deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
            return MyEvent.fromJson(new String(record.value()));
        }
        
        @Override
        public boolean isEndOfStream(MyEvent nextElement) {
            return false;
        }
        
        @Override
        public TypeInformation<MyEvent> getProducedType() {
            return TypeInformation.of(MyEvent.class);
        }
    },
    props
);

Multiple Topics Consumer

Creates a consumer for multiple Kafka topics specified as a list.

/**
 * Creates a new Kafka streaming source consumer for multiple topics
 * @param topics The Kafka topics to read from
 * @param deserializer The de-/serializer used to convert between Kafka's byte messages and Flink's objects
 * @param props The properties that are used to configure both the fetcher and the offset handler
 */
public FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deserializer, Properties props);

/**
 * Creates a new Kafka streaming source consumer for multiple topics with key-value deserialization
 * @param topics The Kafka topics to read from
 * @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects
 * @param props The properties that are used to configure both the fetcher and the offset handler
 */
public FlinkKafkaConsumer010(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props);

Usage Examples:

import java.util.Arrays;
import java.util.List;

// Multiple topics with simple deserialization
List<String> topics = Arrays.asList("topic1", "topic2", "topic3");
FlinkKafkaConsumer010<String> multiTopicConsumer = new FlinkKafkaConsumer010<>(
    topics,
    new SimpleStringSchema(),
    props
);

// Multiple topics with JSON deserialization
// (JsonNodeDeserializationSchema comes from the flink-json dependency and produces Jackson ObjectNode)
FlinkKafkaConsumer010<ObjectNode> jsonConsumer = new FlinkKafkaConsumer010<>(
    topics,
    new JsonNodeDeserializationSchema(),
    props
);

Pattern-Based Topic Subscription

Creates a consumer that subscribes to topics matching a regular expression pattern, with automatic discovery of new matching topics.

/**
 * Creates a new Kafka streaming source consumer using pattern-based topic subscription
 * @param subscriptionPattern The regular expression for a pattern of topic names to subscribe to
 * @param valueDeserializer The de-/serializer used to convert between Kafka's byte messages and Flink's objects
 * @param props The properties used to configure the Kafka consumer client
 */
public FlinkKafkaConsumer010(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props);

/**
 * Creates a new Kafka streaming source consumer using pattern-based topic subscription with key-value deserialization
 * @param subscriptionPattern The regular expression for a pattern of topic names to subscribe to
 * @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects
 * @param props The properties used to configure the Kafka consumer client
 */
public FlinkKafkaConsumer010(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props);

Usage Examples:

import java.util.regex.Pattern;

// Subscribe to all topics starting with "logs-"
Pattern logTopicsPattern = Pattern.compile("logs-.*");
FlinkKafkaConsumer010<String> patternConsumer = new FlinkKafkaConsumer010<>(
    logTopicsPattern,
    new SimpleStringSchema(),
    props
);

// Subscribe to topics matching environment-specific pattern
Pattern envPattern = Pattern.compile("(prod|staging)-events-.*");
FlinkKafkaConsumer010<String> envConsumer = new FlinkKafkaConsumer010<>(
    envPattern,
    new SimpleStringSchema(),
    props
);

Rate Limiting

Configure rate limiting to throttle the number of bytes read from Kafka per second.

/**
 * Set rate limiter for throttling bytes read from Kafka
 * @param kafkaRateLimiter Rate limiter implementation to control consumption rate
 */
public void setRateLimiter(FlinkConnectorRateLimiter kafkaRateLimiter);

/**
 * Get the configured rate limiter
 * @return The currently configured rate limiter, or null if none is set
 */
public FlinkConnectorRateLimiter getRateLimiter();

Usage Examples:

import org.apache.flink.api.common.io.ratelimiting.FlinkConnectorRateLimiter;
import org.apache.flink.api.common.io.ratelimiting.GuavaFlinkConnectorRateLimiter;

// Create consumer
FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(
    "high-volume-topic",
    new SimpleStringSchema(),
    props
);

// Limit consumption to 1 MB per second. GuavaFlinkConnectorRateLimiter is
// constructed without arguments; the rate is configured via setRate
FlinkConnectorRateLimiter rateLimiter = new GuavaFlinkConnectorRateLimiter();
rateLimiter.setRate(1024 * 1024); // bytes per second
consumer.setRateLimiter(rateLimiter);

// Check current rate limiter
FlinkConnectorRateLimiter currentLimiter = consumer.getRateLimiter();

Configuration Properties

Consumer-Specific Properties

// Configuration key for polling timeout
public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";

// Default polling timeout in milliseconds
public static final long DEFAULT_POLL_TIMEOUT = 100L;

Common Kafka Consumer Properties

Required properties:

  • bootstrap.servers: Comma-separated list of Kafka broker addresses
  • group.id: Consumer group identifier

Optional properties:

  • flink.poll-timeout: Time in milliseconds spent waiting in poll if data is not available (default: 100)
  • auto.offset.reset: What to do when there is no initial offset ("earliest", "latest", "none")
  • enable.auto.commit: Whether to automatically commit offsets (should be false for exactly-once)
  • max.poll.records: Maximum number of records returned in a single poll
  • fetch.min.bytes: Minimum amount of data the server should return for a fetch request
  • fetch.max.wait.ms: Maximum time the server will block before responding to fetch request
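
Taken together, a typical configuration might look like the following sketch. The broker addresses, group id, and tuning values are illustrative, not recommendations:

```java
import java.util.Properties;

Properties props = new Properties();
// Required
props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092"); // illustrative addresses
props.setProperty("group.id", "my-consumer-group");
// Optional tuning
props.setProperty("flink.poll-timeout", "250");      // wait up to 250 ms in poll when no data
props.setProperty("auto.offset.reset", "earliest");  // start from earliest when no committed offset
props.setProperty("enable.auto.commit", "false");    // let Flink checkpoints own the offsets
props.setProperty("max.poll.records", "500");
props.setProperty("fetch.min.bytes", "1024");
props.setProperty("fetch.max.wait.ms", "500");
```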

Dynamic Partition Discovery

Enable automatic discovery of new partitions by setting partition discovery interval:

// Enable partition discovery every 30 seconds
props.setProperty("flink.partition-discovery.interval-millis", "30000");
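
Discovery pairs naturally with pattern-based subscription: topics matching the pattern that are created after the job starts are picked up automatically. A sketch (the pattern and interval are illustrative):

```java
import java.util.regex.Pattern;

// Topics matching "metrics-.*" created after job start are discovered
// and consumed automatically at the configured interval
props.setProperty("flink.partition-discovery.interval-millis", "30000");
FlinkKafkaConsumer010<String> discoveringConsumer = new FlinkKafkaConsumer010<>(
    Pattern.compile("metrics-.*"),
    new SimpleStringSchema(),
    props
);
```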

Exactly-Once Processing

The FlinkKafkaConsumer010 integrates with Flink's checkpointing mechanism to provide exactly-once processing guarantees:

  1. Checkpointing: Offsets are stored in Flink's checkpoints, not committed to Kafka
  2. Recovery: On restart, consumer resumes from the last successful checkpoint
  3. Commit Strategy: Offsets are committed to Kafka only for monitoring purposes

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Configure for exactly-once processing
props.setProperty("enable.auto.commit", "false");
props.setProperty("auto.offset.reset", "earliest");

// In the streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // Checkpoint every 5 seconds
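
With checkpointing enabled, the consumer participates in snapshots as soon as it is added as a source. A sketch, where `consumer` is any of the FlinkKafkaConsumer010 instances constructed above:

```java
import org.apache.flink.streaming.api.datastream.DataStream;

// Wire the consumer into the job; its offsets now travel with Flink checkpoints
DataStream<String> stream = env.addSource(consumer);
stream.print();
```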

Error Handling

The consumer handles various error scenarios:

  • Broker failures: Automatic reconnection with backoff
  • Deserialization errors: Can be configured to skip or fail
  • Partition reassignment: Automatic handling of partition changes
  • Consumer group rebalancing: Graceful handling of consumer group membership changes
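
For deserialization errors, one common skip-on-error pattern is to return `null` from `deserialize`, which causes the consumer to drop the corrupted record instead of failing the job. A sketch, assuming records are JSON and the hypothetical `MyEvent.fromJson` may throw on malformed input:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Lenient deserializer: malformed records yield null and are skipped
KafkaDeserializationSchema<MyEvent> lenientSchema = new KafkaDeserializationSchema<MyEvent>() {
    @Override
    public MyEvent deserialize(ConsumerRecord<byte[], byte[]> record) {
        try {
            return MyEvent.fromJson(new String(record.value()));
        } catch (Exception e) {
            return null; // returning null tells the consumer to skip this record
        }
    }

    @Override
    public boolean isEndOfStream(MyEvent nextElement) {
        return false;
    }

    @Override
    public TypeInformation<MyEvent> getProducedType() {
        return TypeInformation.of(MyEvent.class);
    }
};
```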

Watermarks and Event Time

The consumer supports event time processing with watermark extraction:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

import java.time.Duration;

FlinkKafkaConsumer010<MyEvent> consumer = new FlinkKafkaConsumer010<>(...);

// Assign watermarks and timestamps
consumer.assignTimestampsAndWatermarks(
    WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        .withTimestampAssigner((event, timestamp) -> event.getEventTime())
);

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-kafka-0-10-2-12
