Streaming Producer

FlinkKafkaProducer011 writes Flink DataStream records to Kafka 0.11.x topics. It supports transactional, exactly-once production, configurable delivery guarantees, and pluggable partitioning.
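Before the detailed capabilities, here is a minimal end-to-end sketch of the producer attached as a regular sink. It assumes a broker at localhost:9092 and the flink-connector-kafka-0.11 dependency on the classpath; the topic and job names are illustrative:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // delivery guarantees are tied to checkpoints

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");

DataStream<String> stream = env.fromElements("a", "b", "c");
stream.addSink(new FlinkKafkaProducer011<>("output-topic", new SimpleStringSchema(), props));

env.execute("kafka-producer-example");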

Capabilities

FlinkKafkaProducer011 Class

Main producer class for writing to Kafka 0.11.x topics with transaction support and exactly-once guarantees.

/**
 * Kafka producer for Kafka 0.11.x with support for transactional writes and exactly-once semantics
 * Extends TwoPhaseCommitSinkFunction for transaction coordination
 */
@PublicEvolving
class FlinkKafkaProducer011<IN> extends TwoPhaseCommitSinkFunction<IN, KafkaTransactionState, KafkaTransactionContext> {

    // Simple constructors for basic use cases
    FlinkKafkaProducer011(String brokerList, String topicId, SerializationSchema<IN> serializationSchema);
    FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig);
    FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner);
    
    // Keyed serialization constructors
    FlinkKafkaProducer011(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema);
    FlinkKafkaProducer011(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig);
    FlinkKafkaProducer011(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, Semantic semantic);
    FlinkKafkaProducer011(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner);
    
    // Full constructor with all options
    FlinkKafkaProducer011(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner, Semantic semantic, int kafkaProducersPoolSize);
}

Usage Examples:

import java.util.Optional;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

// Basic producer setup
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");

FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
    "output-topic",
    new SimpleStringSchema(),
    props
);

// Producer with exactly-once semantics. The Semantic overloads take a
// KeyedSerializationSchema, so wrap the plain schema in KeyedSerializationSchemaWrapper
props.setProperty("transaction.timeout.ms", "900000"); // 15 minutes; must not exceed the broker's transaction.max.timeout.ms
FlinkKafkaProducer011<String> exactlyOnceProducer = new FlinkKafkaProducer011<>(
    "critical-topic",
    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
    props,
    Semantic.EXACTLY_ONCE
);

// Producer with custom partitioner
FlinkFixedPartitioner<String> partitioner = new FlinkFixedPartitioner<>();
FlinkKafkaProducer011<String> partitionedProducer = new FlinkKafkaProducer011<>(
    "partitioned-topic",
    new SimpleStringSchema(),
    props,
    Optional.of(partitioner)
);

Delivery Semantics

Configure delivery guarantees and transaction behavior.

/**
 * Delivery semantics for the Kafka producer
 */
enum Semantic {
    /** Writes records in a Kafka transaction that is committed when a checkpoint completes */
    EXACTLY_ONCE,
    /** Flushes pending records on every checkpoint; duplicates are possible after a failure */
    AT_LEAST_ONCE,
    /** Fire-and-forget; records may be lost or duplicated on failure */
    NONE
}
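Note that AT_LEAST_ONCE and EXACTLY_ONCE only deliver their guarantees when checkpointing is enabled, since pending records are flushed (and, for EXACTLY_ONCE, the transaction committed) at checkpoint boundaries. A minimal setup sketch; the interval is illustrative:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Without checkpoints there is no flush/commit point, so failures behave like Semantic.NONE
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);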

Usage Examples:

// Exactly-once semantics (requires Kafka transactions; the connector creates and
// manages the transactional.id values itself)
Properties exactlyOnceProps = new Properties();
exactlyOnceProps.setProperty("bootstrap.servers", "localhost:9092");
exactlyOnceProps.setProperty("transaction.timeout.ms", "900000"); // must not exceed the broker's transaction.max.timeout.ms (15 minutes by default)

FlinkKafkaProducer011<String> exactlyOnceProducer = new FlinkKafkaProducer011<>(
    "reliable-topic",
    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
    exactlyOnceProps,
    Semantic.EXACTLY_ONCE
);

// At-least-once semantics (faster; duplicates possible after a failure)
FlinkKafkaProducer011<String> atLeastOnceProducer = new FlinkKafkaProducer011<>(
    "fast-topic",
    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
    props,
    Semantic.AT_LEAST_ONCE
);

// No guarantees (fastest, fire-and-forget; records can be lost on failure)
FlinkKafkaProducer011<String> fireForgetProducer = new FlinkKafkaProducer011<>(
    "logs-topic",
    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
    props,
    Semantic.NONE
);

Producer Configuration

Configuration constants exposed by the producer, plus the Kafka producer settings that matter most for this connector.

/**
 * Configuration constants for producer tuning
 */
static final int SAFE_SCALE_DOWN_FACTOR = 5; // Bounds how far the job can be scaled down while still aborting/recovering outstanding transactional IDs
static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5; // Default producer pool size
static final Time DEFAULT_KAFKA_TRANSACTION_TIMEOUT = Time.hours(1); // Default transaction timeout
static final String KEY_DISABLE_METRICS = "flink.disable-metrics"; // Configuration key for disabling metrics

Usage Examples:

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");

// Transaction configuration for exactly-once
// (FlinkKafkaProducer011 generates and manages transactional.id values itself)
producerProps.setProperty("transaction.timeout.ms", "900000"); // 15 minutes; must not exceed the broker's transaction.max.timeout.ms
producerProps.setProperty("retries", "2147483647"); // Max retries
// Note: 0.11.x clients may insist on a single in-flight request per connection when
// idempotence is active (relaxed to 5 in client 1.0, KAFKA-5494)
producerProps.setProperty("max.in.flight.requests.per.connection", "1");
producerProps.setProperty("enable.idempotence", "true");

// Performance tuning
producerProps.setProperty("batch.size", "16384");
producerProps.setProperty("linger.ms", "5");
producerProps.setProperty("buffer.memory", "33554432");
producerProps.setProperty("compression.type", "snappy");

// Flink-specific configuration
producerProps.setProperty("flink.disable-metrics", "false");

// Create producer with custom pool size
// (the anonymous schema implements the three KeyedSerializationSchema methods:
//  serializeKey, serializeValue, getTargetTopic)
FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
    "my-topic",
    new KeyedSerializationSchema<String>() {
        @Override
        public byte[] serializeKey(String element) {
            return null; // no key; the partitioner decides the target partition
        }

        @Override
        public byte[] serializeValue(String element) {
            return element.getBytes(StandardCharsets.UTF_8); // java.nio.charset.StandardCharsets
        }

        @Override
        public String getTargetTopic(String element) {
            return null; // null falls back to the default topic ("my-topic")
        }
    },
    producerProps,
    Optional.empty(),
    Semantic.EXACTLY_ONCE,
    10 // Custom producer pool size (default is 5)
);
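Pool sizing matters under EXACTLY_ONCE: each checkpoint still awaiting completion holds one pooled producer for its pending transaction, and the sink fails with PRODUCERS_POOL_EMPTY when none are free. A sizing sketch; the "+ 1" rule of thumb is an assumption for illustration, not a connector API:

// Simplest safe setup: one in-flight checkpoint, so the default pool of 5 has ample headroom
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// If checkpoints may overlap, keep the pool strictly larger than the overlap
int maxConcurrentCheckpoints = 3;
int kafkaProducersPoolSize = maxConcurrentCheckpoints + 1; // one extra for the currently open transaction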

Advanced Producer Methods

Additional configuration methods for specialized use cases.

/**
 * Configure timestamp writing to Kafka records
 * @param writeTimestampToKafka whether to write Flink timestamps to Kafka
 */
void setWriteTimestampToKafka(boolean writeTimestampToKafka);

/**
 * Configure error handling mode
 * @param logFailuresOnly if true, only log failures instead of failing the job
 */
void setLogFailuresOnly(boolean logFailuresOnly);

/**
 * Disable failing the job when committing a transaction that has presumably already
 * timed out on the broker (e.g. while recovering after a long downtime). This can
 * lose data; use it only when availability matters more than the delivery guarantee.
 * @return this producer instance for method chaining
 */
FlinkKafkaProducer011<IN> ignoreFailuresAfterTransactionTimeout();

Usage Examples:

FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
    "timestamped-topic",
    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
    props,
    Semantic.EXACTLY_ONCE
);

// Configure timestamp writing
producer.setWriteTimestampToKafka(true);

// Configure error handling for non-critical data
// (logging instead of failing trades away the at-least-once guarantee)
FlinkKafkaProducer011<String> loggingProducer = new FlinkKafkaProducer011<>(
    "logs-topic",
    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
    props,
    Semantic.AT_LEAST_ONCE
);
loggingProducer.setLogFailuresOnly(true);

// Handle transaction timeout gracefully
producer.ignoreFailuresAfterTransactionTimeout();

// Add to DataStream
dataStream.addSink(producer).name("Kafka Sink");

Custom Partitioning

Implement custom partitioning strategies for message distribution.

/**
 * Base class for custom Kafka partitioners
 */
@PublicEvolving
abstract class FlinkKafkaPartitioner<T> implements Serializable {
    /**
     * Initialize the partitioner
     * @param parallelInstanceId the parallel instance ID of this subtask
     * @param parallelInstances total number of parallel instances
     */
    void open(int parallelInstanceId, int parallelInstances);
    
    /**
     * Determine the partition for a record
     * @param record the record to partition
     * @param key the serialized record key (may be null)
     * @param value the serialized record value
     * @param targetTopic the target topic name
     * @param partitions available partition IDs
     * @return the partition ID to use
     */
    abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}

/**
 * Fixed partitioner: each parallel sink subtask writes all of its records to a single Kafka partition
 */
@PublicEvolving
class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
    // Implementation ensures deterministic partition assignment
}

Usage Examples:

// Custom partitioner based on record content
// (note: no <String> type parameter on the class; that would shadow java.lang.String)
public class CustomPartitioner extends FlinkKafkaPartitioner<String> {
    @Override
    public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Hash-based partitioning on record content; mask the sign bit so the index
        // is never negative (Math.abs(Integer.MIN_VALUE) is still negative)
        return (record.hashCode() & Integer.MAX_VALUE) % partitions.length;
    }
}

// Use custom partitioner (the full constructor takes a KeyedSerializationSchema)
CustomPartitioner customPartitioner = new CustomPartitioner();
FlinkKafkaProducer011<String> partitionedProducer = new FlinkKafkaProducer011<>(
    "custom-partitioned-topic",
    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
    props,
    Optional.of(customPartitioner),
    Semantic.EXACTLY_ONCE,
    FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE
);

Error Handling

The producer surfaces failures through the Flink Kafka exception hierarchy.

// Producer operations may throw FlinkKafka011Exception for transactional or
// configuration errors; its FlinkKafka011ErrorCode identifies the cause:
// - PRODUCERS_POOL_EMPTY: no free producer in the pool (too many concurrent checkpoints)
// - EXTERNAL_ERROR: Kafka broker or network issues
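Because the error code is carried on the exception, a driver program can tell pool exhaustion apart from broker trouble. A sketch, assuming the FlinkKafka011Exception#getErrorCode accessor from the connector's exception hierarchy and that Flink surfaces the sink failure in the cause chain of env.execute:

try {
    env.execute("kafka-producer-job");
} catch (Exception e) {
    // Walk the cause chain: Flink wraps sink failures in job execution exceptions
    for (Throwable t = e; t != null; t = t.getCause()) {
        if (t instanceof FlinkKafka011Exception) {
            FlinkKafka011ErrorCode code = ((FlinkKafka011Exception) t).getErrorCode();
            if (code == FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY) {
                // Increase kafkaProducersPoolSize or reduce max concurrent checkpoints
            }
        }
    }
    throw e;
}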

Configuration for resilient operation:

Properties resilientProps = new Properties();
resilientProps.setProperty("bootstrap.servers", "localhost:9092");
resilientProps.setProperty("retries", "2147483647");
// Note: 0.11.x clients may insist on a single in-flight request per connection when
// idempotence is active (relaxed to 5 in client 1.0, KAFKA-5494)
resilientProps.setProperty("max.in.flight.requests.per.connection", "1");
resilientProps.setProperty("enable.idempotence", "true");
resilientProps.setProperty("acks", "all");
resilientProps.setProperty("transaction.timeout.ms", "600000"); // 10 minutes
resilientProps.setProperty("request.timeout.ms", "300000");
// delivery.timeout.ms is a Kafka 2.1+ client setting and is not available on 0.11.x clients