Kafka Producer API

Comprehensive API reference for Kafka 0.8 producer classes in the Apache Flink Kafka Connector.

FlinkKafkaProducer08<IN>

Package: org.apache.flink.streaming.connectors.kafka
Annotations: @PublicEvolving
Extends: FlinkKafkaProducerBase<IN>
Description: Flink sink to produce data into a Kafka topic. Compatible with Kafka 0.8.x. Important Note: This producer does not have reliability guarantees and may lose messages on failures.

Class Declaration

@PublicEvolving
public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>

Constants

private static final long serialVersionUID = 1L;

Constructors

Key-less Serialization Constructors

/**
 * Creates a FlinkKafkaProducer08 using a broker list and SerializationSchema.
 * 
 * @param brokerList Comma-separated list of Kafka brokers (e.g., "localhost:9092,broker2:9092")
 * @param topicId Target Kafka topic name
 * @param serializationSchema Schema to serialize objects to byte arrays
 */
public FlinkKafkaProducer08(String brokerList, 
                           String topicId, 
                           SerializationSchema<IN> serializationSchema)

/**
 * Creates a FlinkKafkaProducer08 using Properties and SerializationSchema.
 *
 * @param topicId Target Kafka topic name
 * @param serializationSchema Schema to serialize objects to byte arrays
 * @param producerConfig Properties containing Kafka producer configuration
 */
public FlinkKafkaProducer08(String topicId, 
                           SerializationSchema<IN> serializationSchema, 
                           Properties producerConfig)

/**
 * Creates a FlinkKafkaProducer08 with custom partitioner and SerializationSchema.
 *
 * @param topicId Target Kafka topic name
 * @param serializationSchema Schema to serialize objects to byte arrays
 * @param producerConfig Properties containing Kafka producer configuration
 * @param customPartitioner Custom partitioner for determining target partition (can be null)
 */
public FlinkKafkaProducer08(String topicId, 
                           SerializationSchema<IN> serializationSchema, 
                           Properties producerConfig, 
                           @Nullable FlinkKafkaPartitioner<IN> customPartitioner)
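For quick experiments, the broker-list constructor is the shortest path to a working sink. A minimal sketch (broker address and topic name are placeholders; imports as in the usage examples below):

// Minimal sketch: simplest key-less constructor (broker list + SerializationSchema)
FlinkKafkaProducer08<String> sink = new FlinkKafkaProducer08<>(
    "localhost:9092",          // brokerList (placeholder)
    "demo-topic",              // topicId (placeholder)
    new SimpleStringSchema()); // serializationSchema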

Key/Value Serialization Constructors

/**
 * Creates a FlinkKafkaProducer08 using broker list and KeyedSerializationSchema.
 *
 * @param brokerList Comma-separated list of Kafka brokers
 * @param topicId Target Kafka topic name  
 * @param serializationSchema Schema to serialize objects to key/value byte arrays
 */
public FlinkKafkaProducer08(String brokerList, 
                           String topicId, 
                           KeyedSerializationSchema<IN> serializationSchema)

/**
 * Creates a FlinkKafkaProducer08 using Properties and KeyedSerializationSchema.
 *
 * @param topicId Target Kafka topic name
 * @param serializationSchema Schema to serialize objects to key/value byte arrays
 * @param producerConfig Properties containing Kafka producer configuration
 */
public FlinkKafkaProducer08(String topicId, 
                           KeyedSerializationSchema<IN> serializationSchema, 
                           Properties producerConfig)

/**
 * Creates a FlinkKafkaProducer08 with custom partitioner and KeyedSerializationSchema.
 *
 * @param topicId Target Kafka topic name
 * @param serializationSchema Schema to serialize objects to key/value byte arrays
 * @param producerConfig Properties containing Kafka producer configuration
 * @param customPartitioner Custom partitioner for determining target partition (can be null)
 */
public FlinkKafkaProducer08(String topicId, 
                           KeyedSerializationSchema<IN> serializationSchema, 
                           Properties producerConfig, 
                           @Nullable FlinkKafkaPartitioner<IN> customPartitioner)

Deprecated Constructors

/**
 * @deprecated Use constructor with FlinkKafkaPartitioner instead of KafkaPartitioner
 */
@Deprecated
public FlinkKafkaProducer08(String topicId, 
                           SerializationSchema<IN> serializationSchema, 
                           Properties producerConfig, 
                           KafkaPartitioner<IN> customPartitioner)

/**
 * @deprecated Use constructor with FlinkKafkaPartitioner instead of KafkaPartitioner
 */
@Deprecated
public FlinkKafkaProducer08(String topicId, 
                           KeyedSerializationSchema<IN> serializationSchema, 
                           Properties producerConfig, 
                           KafkaPartitioner<IN> customPartitioner)
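When migrating off these deprecated constructors, existing partitioning logic usually ports over directly; the main practical change is that FlinkKafkaPartitioner.partition() receives the array of available partition IDs rather than a plain partition count. A minimal migration sketch (MyEvent and getShardKey() are hypothetical application types):

// Replacement for a deprecated KafkaPartitioner: select a partition ID
// from the supplied array instead of computing an index from a count.
public class MigratedPartitioner extends FlinkKafkaPartitioner<MyEvent> {
    @Override
    public int partition(MyEvent record, byte[] key, byte[] value,
                         String targetTopic, int[] partitions) {
        // getShardKey() is a hypothetical accessor on the application type
        return partitions[Math.abs(record.getShardKey() % partitions.length)];
    }
}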

Configuration Methods (Inherited from FlinkKafkaProducerBase)

/**
 * Configures whether the producer should only log failures instead of throwing exceptions.
 * Default is false (failures cause exceptions).
 *
 * @param logFailuresOnly If true, only log failures; if false, throw exceptions on failures
 */
public void setLogFailuresOnly(boolean logFailuresOnly)

/**
 * Configures whether the producer should flush data on checkpoint operations.
 * This can improve reliability at the cost of performance.
 *
 * @param flush If true, flush on checkpoints; if false, don't flush
 */
public void setFlushOnCheckpoint(boolean flush)

Lifecycle Methods (Inherited from FlinkKafkaProducerBase)

/**
 * Opens the producer and initializes resources.
 *
 * @param configuration The task configuration
 */
public void open(Configuration configuration)

/**
 * Sends a record to Kafka.
 *
 * @param next The record to send
 * @param context The sink context (provides the element timestamp and current watermark)
 * @throws Exception If sending fails
 */
public void invoke(IN next, Context context) throws Exception

/**
 * Closes the producer and cleans up resources.
 *
 * @throws Exception If cleanup fails
 */
public void close() throws Exception

Checkpointing Interface Methods (Inherited)

/**
 * Initializes the state of the function from a checkpoint.
 *
 * @param context The initialization context
 * @throws Exception If state initialization fails
 */
public void initializeState(FunctionInitializationContext context) throws Exception

/**
 * Snapshots the function's state during checkpointing.
 *
 * @param ctx The snapshot context  
 * @throws Exception If state snapshotting fails
 */
public void snapshotState(FunctionSnapshotContext ctx) throws Exception

Static Utility Methods (Inherited)

/**
 * Converts a comma-separated broker list to Properties.
 *
 * @param brokerList Comma-separated list of brokers (e.g., "broker1:9092,broker2:9092")
 * @return Properties object with "metadata.broker.list" set
 */
public static Properties getPropertiesFromBrokerList(String brokerList)
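This helper is convenient when starting from a plain broker list and layering extra settings on top. A small sketch:

// Build a Properties object from a broker list, then add further settings
Properties props = FlinkKafkaProducer08.getPropertiesFromBrokerList("broker1:9092,broker2:9092");
props.setProperty("request.required.acks", "1");

FlinkKafkaProducer08<String> sink =
    new FlinkKafkaProducer08<>("my-topic", new SimpleStringSchema(), props);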

Protected Methods

/**
 * Kafka 0.8 specific flush implementation (no-op since Kafka 0.8 doesn't support flushing).
 */
protected void flush()

FlinkKafkaPartitioner<T> Abstract Class

Package: org.apache.flink.streaming.connectors.kafka.partitioner
Annotations: @PublicEvolving
Description: Abstract base class for custom partitioning strategies in Flink Kafka producers, determining which Kafka partition each record is written to.

@PublicEvolving
public abstract class FlinkKafkaPartitioner<T> implements Serializable {
    
    /**
     * Returns the partition ID for a given record.
     *
     * @param record The record to partition
     * @param key The serialized key (can be null)
     * @param value The serialized value  
     * @param targetTopic The target topic name
     * @param partitions Array of available partition IDs
     * @return The target partition ID, typically selected from the partitions array
     */
    public abstract int partition(T record, byte[] key, byte[] value, 
                                 String targetTopic, int[] partitions);
    
    /**
     * Called when the partitioner is opened. Override for initialization logic.
     *
     * @param parallelInstanceId The 0-indexed ID of this parallel sink subtask
     * @param parallelInstances The total number of parallel instances
     */
    public void open(int parallelInstanceId, int parallelInstances) {
        // Default implementation is empty
    }
}

Custom Partitioner Examples

// Hash-based partitioning by customer ID
public class CustomerHashPartitioner extends FlinkKafkaPartitioner<CustomerEvent> {
    
    @Override
    public int partition(CustomerEvent record, byte[] key, byte[] value, 
                        String targetTopic, int[] partitions) {
        // Map the customer ID onto one of the available partition IDs
        int customerId = record.getCustomerId();
        return partitions[Math.abs(customerId % partitions.length)];
    }
}

// Round-robin partitioning
public class RoundRobinPartitioner<T> extends FlinkKafkaPartitioner<T> {
    private int counter = 0;
    
    @Override  
    public int partition(T record, byte[] key, byte[] value,
                        String targetTopic, int[] partitions) {
        // floorMod keeps the index non-negative even if the counter overflows
        int index = Math.floorMod(counter++, partitions.length);
        return partitions[index];
    }
}

// Time-based partitioning
public class TimeBasedPartitioner extends FlinkKafkaPartitioner<TimestampedEvent> {
    
    @Override
    public int partition(TimestampedEvent record, byte[] key, byte[] value,
                        String targetTopic, int[] partitions) {
        // Partition based on hour of day
        long timestamp = record.getTimestamp();
        int hour = (int) ((timestamp / 3600000) % 24);
        return partitions[hour % partitions.length];
    }
}

Usage Examples

Basic Producer Setup

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("metadata.broker.list", "localhost:9092");

FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
    "my-topic",
    new SimpleStringSchema(),
    properties
);

DataStream<String> stream = env.fromElements("Hello", "World", "Kafka");
stream.addSink(producer);

env.execute("Kafka Producer Job");

Producer with Custom Partitioner

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Custom partitioner implementation
FlinkKafkaPartitioner<MyEvent> partitioner = new FlinkKafkaPartitioner<MyEvent>() {
    @Override
    public int partition(MyEvent record, byte[] key, byte[] value, 
                        String targetTopic, int[] partitions) {
        // Partition by event type, selecting from the available partition IDs
        return partitions[Math.abs(record.getEventType().hashCode() % partitions.length)];
    }
};

FlinkKafkaProducer08<MyEvent> producer = new FlinkKafkaProducer08<>(
    "events-topic",
    new MyEventSerializer(),
    properties,
    partitioner
);

// Configure for reliability
producer.setLogFailuresOnly(false);  // Fail on errors
producer.setFlushOnCheckpoint(true); // Flush on checkpoints

DataStream<MyEvent> events = // ... your event stream
events.addSink(producer);

Producer with Key/Value Serialization

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

import java.nio.charset.StandardCharsets;

KeyedSerializationSchema<CustomerOrder> schema = new KeyedSerializationSchema<CustomerOrder>() {
    @Override
    public byte[] serializeKey(CustomerOrder element) {
        // Use customer ID as key for partitioning
        return String.valueOf(element.getCustomerId()).getBytes(StandardCharsets.UTF_8);
    }
    
    @Override
    public byte[] serializeValue(CustomerOrder element) {
        // Serialize order details as JSON
        return element.toJson().getBytes(StandardCharsets.UTF_8);
    }
    
    @Override
    public String getTargetTopic(CustomerOrder element) {
        // Dynamic topic selection based on order type
        return "orders-" + element.getOrderType().toLowerCase();
    }
};

FlinkKafkaProducer08<CustomerOrder> producer = new FlinkKafkaProducer08<>(
    "orders-default", // default topic; used only when getTargetTopic() returns null
    schema,
    properties
);

DataStream<CustomerOrder> orders = // ... your order stream
orders.addSink(producer);

Producer Configuration Examples

Properties producerProps = new Properties();

// Required: Broker connection
producerProps.setProperty("metadata.broker.list", "broker1:9092,broker2:9092");

// Performance tuning (batching settings apply to the async producer)
producerProps.setProperty("producer.type", "async");
producerProps.setProperty("batch.num.messages", "200");
producerProps.setProperty("queue.buffering.max.ms", "1000");
producerProps.setProperty("queue.buffering.max.messages", "10000");

// Compression (optional)
producerProps.setProperty("compression.codec", "snappy"); // or "gzip", "lz4"

// Network settings
producerProps.setProperty("send.buffer.bytes", "102400");
producerProps.setProperty("client.id", "flink-kafka-producer");

// Reliability settings (Kafka 0.8 limitations apply)
producerProps.setProperty("request.required.acks", "1"); // 0, 1, or -1
producerProps.setProperty("request.timeout.ms", "10000");
producerProps.setProperty("message.send.max.retries", "3");
producerProps.setProperty("retry.backoff.ms", "100");

FlinkKafkaProducer08<MyData> producer = new FlinkKafkaProducer08<>(
    "my-topic", new MyDataSerializer(), producerProps);

Error Handling and Reliability

// Configure producer for different reliability levels
FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
    "my-topic", new SimpleStringSchema(), properties);

// Option 1: Fail fast on errors (default)
producer.setLogFailuresOnly(false);

// Option 2: Log errors but continue processing
producer.setLogFailuresOnly(true);

// Enable flushing on checkpoints for better reliability
producer.setFlushOnCheckpoint(true);

// Note: Kafka 0.8 producer limitations
// - No transactional guarantees
// - No exactly-once semantics
// - Messages may be lost on producer failures
// - Retries (message.send.max.retries) are best-effort and can duplicate or reorder messages

Custom Serialization Examples

// Simple SerializationSchema example (uses Jackson for JSON)
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.SerializationSchema;

public class JsonSerializationSchema<T> implements SerializationSchema<T> {
    // ObjectMapper is not Serializable, so create it lazily on the task manager
    private transient ObjectMapper objectMapper;
    
    @Override
    public byte[] serialize(T element) {
        if (objectMapper == null) {
            objectMapper = new ObjectMapper();
        }
        try {
            return objectMapper.writeValueAsBytes(element);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize to JSON", e);
        }
    }
}

// KeyedSerializationSchema with dynamic routing
// (Keyed and Routable are application-defined marker interfaces)
public class RoutingKeyedSchema<T> implements KeyedSerializationSchema<T> {
    
    @Override
    public byte[] serializeKey(T element) {
        if (element instanceof Keyed) {
            return ((Keyed) element).getKey().getBytes(StandardCharsets.UTF_8);
        }
        return null; // No key
    }
    
    @Override
    public byte[] serializeValue(T element) {
        return element.toString().getBytes(StandardCharsets.UTF_8);
    }
    
    @Override
    public String getTargetTopic(T element) {
        if (element instanceof Routable) {
            return ((Routable) element).getTargetTopic();
        }
        return null; // Use default topic
    }
}
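Either schema plugs into the constructors documented above. A brief sketch wiring the JSON schema into a key-less producer (MyData is a placeholder POJO):

// MyData is a hypothetical POJO used only for illustration
FlinkKafkaProducer08<MyData> jsonProducer = new FlinkKafkaProducer08<>(
    "json-topic",
    new JsonSerializationSchema<MyData>(),
    properties);

DataStream<MyData> data = // ... your data stream
data.addSink(jsonProducer);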

Deprecated FlinkKafkaProducer<IN>

Package: org.apache.flink.streaming.connectors.kafka
Annotations: @Deprecated
Extends: FlinkKafkaProducer08<IN>

@Deprecated
public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN>

Deprecated Constructors

/**
 * @deprecated Use FlinkKafkaProducer08 instead
 */
@Deprecated
public FlinkKafkaProducer(String brokerList, String topicId, 
                         SerializationSchema<IN> serializationSchema)

/**
 * @deprecated Use FlinkKafkaProducer08 instead  
 */
@Deprecated
public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, 
                         Properties producerConfig)

/**
 * @deprecated Use FlinkKafkaProducer08 instead
 */
@Deprecated
public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, 
                         Properties producerConfig, KafkaPartitioner customPartitioner)

/**
 * @deprecated Use FlinkKafkaProducer08 instead
 */
@Deprecated  
public FlinkKafkaProducer(String brokerList, String topicId, 
                         KeyedSerializationSchema<IN> serializationSchema)

/**
 * @deprecated Use FlinkKafkaProducer08 instead
 */
@Deprecated
public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, 
                         Properties producerConfig)

/**
 * @deprecated Use FlinkKafkaProducer08 instead
 */
@Deprecated
public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, 
                         Properties producerConfig, KafkaPartitioner customPartitioner)

Kafka 0.8 Producer Limitations

Important Reliability Considerations

  1. No Exactly-Once Guarantees: Kafka 0.8 producers cannot provide exactly-once delivery semantics
  2. No Transactional Support: No atomic writes across multiple partitions/topics
  3. No Built-in Flushing: The flush() method is a no-op in Kafka 0.8
  4. Limited Error Recovery: Failed messages may be lost without explicit retry logic
  5. No Idempotent Writes: Duplicate messages may occur during retries

Recommended Practices

// Enable checkpointing for better reliability
env.enableCheckpointing(60000); // Checkpoint every minute

// Use flush on checkpoint
producer.setFlushOnCheckpoint(true);

// Configure appropriate retry settings in Kafka properties
Properties props = new Properties();
props.setProperty("message.send.max.retries", "3");
props.setProperty("retry.backoff.ms", "100");
props.setProperty("request.required.acks", "1"); // Wait for leader acknowledgment

// Consider using exactly-once sink if available in newer Flink versions
// For Kafka 0.8, implement custom retry logic if needed

Migration Recommendations

For production systems requiring strong delivery guarantees:

  1. Upgrade to Kafka 0.9+: Use flink-connector-kafka-0.9 or newer for exactly-once semantics
  2. Implement Custom Reliability: Add application-level retry and deduplication logic
  3. Monitor Producer Metrics: Track failed sends and implement alerting
  4. Use Synchronous Sending: If throughput allows, consider synchronous sends for critical data (see the sketch below)
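For item 4, the Kafka 0.8 "old" producer selects sync versus async delivery through its own producer.type setting. A hedged sketch:

// Hedged sketch: synchronous sends with the Kafka 0.8 producer.
// producer.type defaults to "sync" in Kafka 0.8; setting it explicitly
// documents the intent and disables async batching.
Properties syncProps = new Properties();
syncProps.setProperty("metadata.broker.list", "broker1:9092,broker2:9092");
syncProps.setProperty("producer.type", "sync");
syncProps.setProperty("request.required.acks", "1");

FlinkKafkaProducer08<String> syncProducer = new FlinkKafkaProducer08<>(
    "critical-topic", new SimpleStringSchema(), syncProps);
syncProducer.setFlushOnCheckpoint(true);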