
tessl/maven-org-apache-flink--flink-connector-kafka-0-10-2-12

Apache Flink Kafka 0.10 connector for streaming data production with at-least-once delivery guarantees


docs/producer.md

Data Stream Producer

The FlinkKafkaProducer010 writes data to Apache Kafka 0.10.x topics with at-least-once delivery guarantees (when checkpointing and flush-on-checkpoint are enabled), custom partitioning strategies, and support for writing Flink event-time timestamps.

Capabilities

Value-Only Serialization Producers

Creates producers that serialize only the record values, without keys.

/**
 * Creates a FlinkKafkaProducer for a given topic using broker list
 * @param brokerList Comma separated addresses of the brokers
 * @param topicId ID of the Kafka topic
 * @param serializationSchema User defined key-less serialization schema
 */
public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema);

/**
 * Creates a FlinkKafkaProducer for a given topic using properties
 * @param topicId ID of the Kafka topic
 * @param serializationSchema User defined key-less serialization schema
 * @param producerConfig Properties with the producer configuration
 */
public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig);

/**
 * Creates a FlinkKafkaProducer with custom partitioning
 * @param topicId The topic to write data to
 * @param serializationSchema A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required property
 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. If set to null, records will be distributed to Kafka partitions in a round-robin fashion
 */
public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner);

Usage Examples:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

import java.util.Properties;

// Simple producer with broker list
FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(
    "localhost:9092",
    "output-topic",
    new SimpleStringSchema()
);

// Producer with properties configuration
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("acks", "all");
props.setProperty("retries", "3");

FlinkKafkaProducer010<String> configuredProducer = new FlinkKafkaProducer010<>(
    "output-topic",
    new SimpleStringSchema(),
    props
);

// Producer with custom partitioner
FlinkKafkaProducer010<MyEvent> eventProducer = new FlinkKafkaProducer010<>(
    "events-topic",
    new MyEventSerializationSchema(),
    props,
    new MyCustomPartitioner<>()
);

Key-Value Serialization Producers

Creates producers that serialize both keys and values, enabling key-based partitioning.

/**
 * Creates a FlinkKafkaProducer with key-value serialization using broker list
 * @param brokerList Comma separated addresses of the brokers
 * @param topicId ID of the Kafka topic
 * @param serializationSchema User defined serialization schema supporting key/value messages
 */
public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<T> serializationSchema);

/**
 * Creates a FlinkKafkaProducer with key-value serialization using properties
 * @param topicId ID of the Kafka topic
 * @param serializationSchema User defined serialization schema supporting key/value messages
 * @param producerConfig Properties with the producer configuration
 */
public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig);

/**
 * Creates a FlinkKafkaProducer with key-value serialization and custom partitioning
 * @param topicId The topic to write data to
 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required property
 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. If set to null, records will be partitioned by the key of each record
 */
public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner);

Usage Examples:

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

import java.nio.charset.StandardCharsets;

// Custom keyed serialization schema
KeyedSerializationSchema<MyEvent> keyedSchema = new KeyedSerializationSchema<MyEvent>() {
    @Override
    public byte[] serializeKey(MyEvent element) {
        // Always specify a charset; the platform default is not portable
        return element.getUserId().getBytes(StandardCharsets.UTF_8);
    }
    
    @Override
    public byte[] serializeValue(MyEvent element) {
        return element.toJson().getBytes(StandardCharsets.UTF_8);
    }
    
    @Override
    public String getTargetTopic(MyEvent element) {
        return null; // Use default topic
    }
};

// Producer with key-value serialization
FlinkKafkaProducer010<MyEvent> keyedProducer = new FlinkKafkaProducer010<>(
    "keyed-events-topic",
    keyedSchema,
    props
);

// Producer with custom partitioner for key-value data
FlinkKafkaProducer010<MyEvent> customKeyedProducer = new FlinkKafkaProducer010<>(
    "partitioned-events-topic",
    keyedSchema,
    props,
    new UserIdPartitioner<>()
);
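
The keyed examples above assume a MyEvent type with getUserId() and toJson() methods. Those names are an assumption of these docs, not part of the connector API; a minimal hypothetical sketch:

```java
import java.io.Serializable;

// Hypothetical event type backing the examples; not part of the connector
public class MyEvent implements Serializable {
    private final String userId;
    private final String action;

    public MyEvent(String userId, String action) {
        this.userId = userId;
        this.action = action;
    }

    public String getUserId() {
        return userId;
    }

    // Naive JSON rendering; a real job would use a JSON library
    public String toJson() {
        return String.format("{\"userId\":\"%s\",\"action\":\"%s\"}", userId, action);
    }
}
```

Implementing Serializable matters: the serialization schema (and any partitioner) is shipped to task managers, so every type it touches must be serializable.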

Timestamp Configuration

Configure the producer to write Flink's event time timestamps to Kafka records.

/**
 * If set to true, Flink will write the (event time) timestamp attached to each record into Kafka. 
 * Timestamps must be positive for Kafka to accept them.
 * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka
 */
public void setWriteTimestampToKafka(boolean writeTimestampToKafka);

Usage Examples:

FlinkKafkaProducer010<MyEvent> producer = new FlinkKafkaProducer010<>(
    "timestamped-events",
    new MyEventSerializationSchema(),
    props
);

// Enable timestamp writing to Kafka
producer.setWriteTimestampToKafka(true);

// Use in streaming pipeline
DataStream<MyEvent> events = env.addSource(new MyEventSource());
events.addSink(producer);

Deprecated Factory Methods

Legacy factory methods for creating producers with timestamp support (deprecated in favor of constructor + setWriteTimestampToKafka).

/**
 * @deprecated Use FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties) and call setWriteTimestampToKafka(boolean)
 */
@Deprecated
public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(
    DataStream<T> inStream,
    String topicId,
    KeyedSerializationSchema<T> serializationSchema,
    Properties producerConfig);

/**
 * @deprecated Use FlinkKafkaProducer010(String, SerializationSchema, Properties) and call setWriteTimestampToKafka(boolean)
 */
@Deprecated  
public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(
    DataStream<T> inStream,
    String topicId,
    SerializationSchema<T> serializationSchema,
    Properties producerConfig);

/**
 * @deprecated Use FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner) and call setWriteTimestampToKafka(boolean)
 */
@Deprecated
public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(
    DataStream<T> inStream,
    String topicId,
    KeyedSerializationSchema<T> serializationSchema,
    Properties producerConfig,
    FlinkKafkaPartitioner<T> customPartitioner);
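
As a reference for migrating off the deprecated factory methods, the two styles compare as in this sketch (events, keyedSchema, and props are placeholders from the earlier examples):

```java
// Before (deprecated): the factory attaches the sink and enables
// timestamp writing in one call
FlinkKafkaProducer010.writeToKafkaWithTimestamps(
    events, "events-topic", keyedSchema, props);

// After: construct the producer, opt in to timestamps explicitly,
// then attach it as a regular sink
FlinkKafkaProducer010<MyEvent> producer =
    new FlinkKafkaProducer010<>("events-topic", keyedSchema, props);
producer.setWriteTimestampToKafka(true);
events.addSink(producer);
```

Note that with the constructor approach, timestamp writing is off until setWriteTimestampToKafka(true) is called.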

Configuration Wrapper (Deprecated)

/**
 * Configuration wrapper for deprecated timestamp-enabled producer factory methods
 * @deprecated Use constructor approach instead
 */
@Deprecated
public static class FlinkKafkaProducer010Configuration<T> extends DataStreamSink<T> {
    /**
     * Configure failure logging behavior
     * @param logFailuresOnly Flag indicating if failures should only be logged instead of causing job failure
     */
    public void setLogFailuresOnly(boolean logFailuresOnly);
    
    /**
     * Configure checkpoint flushing behavior
     * @param flush Flag indicating if producer should flush on checkpoint
     */
    public void setFlushOnCheckpoint(boolean flush);
    
    /**
     * Configure timestamp writing to Kafka
     * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka
     */
    public void setWriteTimestampToKafka(boolean writeTimestampToKafka);
}

Configuration Properties

Producer-Specific Properties

Required properties:

  • bootstrap.servers: Comma-separated list of Kafka broker addresses

Recommended properties for reliable (at-least-once) delivery:

  • acks: Set to "all" for maximum durability
  • retries: Number of retries for failed sends (e.g., "3")
  • max.in.flight.requests.per.connection: Set to "1" to preserve record ordering across retries

Note that enable.idempotence requires Kafka 0.11+ clients and is not available with this 0.10 connector.

Performance tuning properties:

  • batch.size: Batch size for grouping records (default: 16384)
  • linger.ms: Time to wait before sending batch (default: 0)
  • buffer.memory: Total memory available for buffering (default: 33554432)
  • compression.type: Compression algorithm ("none", "gzip", "snappy", "lz4"; "zstd" requires Kafka 2.1+)
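
Put together, a throughput-leaning configuration might look like the following sketch (the values are illustrative starting points, not universal recommendations):

```java
import java.util.Properties;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
// Durability
props.setProperty("acks", "all");
props.setProperty("retries", "3");
// Throughput: bigger batches, a brief linger, cheap compression
props.setProperty("batch.size", "65536");       // 4x the 16384 default
props.setProperty("linger.ms", "5");            // wait up to 5 ms to fill batches
props.setProperty("buffer.memory", "67108864"); // 64 MiB send buffer
props.setProperty("compression.type", "lz4");
```

Larger batches and a non-zero linger.ms trade a few milliseconds of latency for fewer, larger requests to the brokers.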

Custom Partitioning

Implement custom partitioning logic by extending FlinkKafkaPartitioner:

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

public class UserIdPartitioner<T extends MyEvent> extends FlinkKafkaPartitioner<T> {
    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Partition by user ID hash; mask the sign bit so the result is
        // never negative (Math.abs(Integer.MIN_VALUE) is still negative)
        return (record.getUserId().hashCode() & Integer.MAX_VALUE) % partitions.length;
    }
    
    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        // Optional: Initialize partitioner
    }
}

// Use custom partitioner
FlinkKafkaProducer010<MyEvent> producer = new FlinkKafkaProducer010<>(
    "user-events",
    new MyEventSerializationSchema(),
    props,
    new UserIdPartitioner<>()
);

At-Least-Once Delivery

Configure the producer for reliable, at-least-once delivery. This is the strongest guarantee the Kafka 0.10 producer supports; end-to-end exactly-once requires Kafka 0.11+ and the transactional FlinkKafkaProducer011:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("acks", "all");
props.setProperty("retries", "3");
props.setProperty("max.in.flight.requests.per.connection", "1");

FlinkKafkaProducer010<String> reliableProducer = new FlinkKafkaProducer010<>(
    "reliable-topic",
    new SimpleStringSchema(),
    props
);

// Enable checkpointing in streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // Checkpoint every 5 seconds
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

Error Handling

The producer handles various error scenarios:

  • Broker failures: Automatic reconnection, with the retries property controlling resends
  • Send failures: Fail the job by default; setLogFailuresOnly(true) logs them and continues
  • Network partitions: Records are buffered (up to buffer.memory) and retried
  • Topic creation: Topics are created on first write only if auto.create.topics.enable is set on the broker
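
The failure-handling behavior can be tuned through two setters the producer inherits from FlinkKafkaProducerBase; a minimal sketch (topic and props mirror the earlier examples):

```java
FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(
    "output-topic",
    new SimpleStringSchema(),
    props
);

// false (default): an asynchronous send failure fails the job;
// true: failures are only logged and the stream keeps running
producer.setLogFailuresOnly(false);

// Flush all in-flight records on each checkpoint; needed for
// at-least-once delivery under failures
producer.setFlushOnCheckpoint(true);
```

setLogFailuresOnly(true) trades durability for availability: records that fail to send are dropped after logging.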

Fault Tolerance

The producer integrates with Flink's fault tolerance mechanisms:

  1. Checkpointing: Producer state is included in Flink checkpoints
  2. Recovery: On restart, delivery resumes with at-least-once guarantees (duplicates are possible after a failure)
  3. Flush on Checkpoint: Ensures all buffered records are written to Kafka before checkpoint completion

// Configure fault tolerance
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-kafka-0-10-2-12
