Kafka Producer

The Flink Kafka producer writes messages to Kafka 0.8.x topics with configurable partitioning and serialization strategies. Delivery is best-effort; Kafka 0.8.x cannot provide exactly-once guarantees (see Error Handling below).

Capabilities

FlinkKafkaProducer08

The primary producer class for Kafka 0.8.x integration with extensive configuration options.

/**
 * Kafka producer for Apache Kafka 0.8.x (provides best-effort delivery guarantees)
 */
public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> {
    
    /**
     * Creates producer with broker list, topic, and value-only serialization
     * @param brokerList Comma-separated list of Kafka brokers
     * @param topicId Target Kafka topic name
     * @param serializationSchema Schema for serializing values
     */
    public FlinkKafkaProducer08(String brokerList, String topicId, SerializationSchema<IN> serializationSchema);
    
    /**
     * Creates producer with properties and value-only serialization
     * @param topicId Target Kafka topic name
     * @param serializationSchema Schema for serializing values
     * @param producerConfig Kafka producer properties
     */
    public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig);
    
    /**
     * Creates producer with properties, value serialization, and custom partitioner
     * @param topicId Target Kafka topic name
     * @param serializationSchema Schema for serializing values
     * @param producerConfig Kafka producer properties
     * @param customPartitioner Custom partitioner for message distribution
     */
    public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner);
    
    /**
     * Creates producer with broker list, topic, and key-value serialization
     * @param brokerList Comma-separated list of Kafka brokers
     * @param topicId Target Kafka topic name
     * @param serializationSchema Schema for serializing keys and values
     */
    public FlinkKafkaProducer08(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema);
    
    /**
     * Creates producer with properties and key-value serialization
     * @param topicId Target Kafka topic name
     * @param serializationSchema Schema for serializing keys and values
     * @param producerConfig Kafka producer properties
     */
    public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig);
    
    /**
     * Creates producer with properties, key-value serialization, and custom partitioner
     * @param topicId Target Kafka topic name
     * @param serializationSchema Schema for serializing keys and values
     * @param producerConfig Kafka producer properties
     * @param customPartitioner Custom partitioner for message distribution
     */
    public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner);
    
    /**
     * @deprecated Use FlinkKafkaPartitioner instead of KafkaPartitioner
     */
    @Deprecated
    public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner);
    
    /**
     * @deprecated Use FlinkKafkaPartitioner instead of KafkaPartitioner
     */
    @Deprecated
    public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner);
}

Usage Examples:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Basic producer with broker list
FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
    "localhost:9092",
    "output-topic",
    new SimpleStringSchema()
);

DataStream<String> stream = env.fromElements("Hello", "World", "Kafka");
stream.addSink(producer);

// Producer with properties configuration
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");
producerProps.setProperty("batch.size", "16384");
producerProps.setProperty("linger.ms", "10");
producerProps.setProperty("compression.type", "snappy");

FlinkKafkaProducer08<String> configuredProducer = new FlinkKafkaProducer08<>(
    "events-topic",
    new SimpleStringSchema(),
    producerProps
);

stream.addSink(configuredProducer);

// Producer with custom partitioner
FlinkKafkaProducer08<String> partitionedProducer = new FlinkKafkaProducer08<>(
    "partitioned-topic",
    new SimpleStringSchema(),
    producerProps,
    new MyCustomPartitioner() // FlinkKafkaPartitioner implementation, defined under Custom Partitioning below
);

// Key-value producer. Note: KeyedSerializationSchemaWrapper wraps a single
// value schema and always emits null keys, so a custom KeyedSerializationSchema
// is needed to serialize keys and values separately.
KeyedSerializationSchema<Tuple2<String, String>> keyValueSchema =
    new KeyedSerializationSchema<Tuple2<String, String>>() {
        @Override
        public byte[] serializeKey(Tuple2<String, String> element) {
            return element.f0.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public byte[] serializeValue(Tuple2<String, String> element) {
            return element.f1.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String getTargetTopic(Tuple2<String, String> element) {
            return null; // null falls back to the topic passed to the producer
        }
    };

FlinkKafkaProducer08<Tuple2<String, String>> kvProducer = new FlinkKafkaProducer08<>(
    "key-value-topic",
    keyValueSchema,
    producerProps
);

DataStream<Tuple2<String, String>> kvStream = env.fromElements(
    Tuple2.of("key1", "value1"),
    Tuple2.of("key2", "value2")
);
kvStream.addSink(kvProducer);

FlinkKafkaProducer (Deprecated)

Deprecated subclass retained for backwards compatibility; every constructor forwards to the matching FlinkKafkaProducer08 constructor.

/**
 * @deprecated Use FlinkKafkaProducer08 instead
 */
@Deprecated
public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN> {
    /**
     * @deprecated Use FlinkKafkaProducer08 constructor instead
     */
    @Deprecated
    public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN> serializationSchema);
    
    /**
     * @deprecated Use FlinkKafkaProducer08 constructor instead
     */
    @Deprecated
    public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig);
    
    /**
     * @deprecated Use FlinkKafkaProducer08 constructor instead
     */
    @Deprecated
    public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner);
    
    /**
     * @deprecated Use FlinkKafkaProducer08 constructor instead
     */
    @Deprecated
    public FlinkKafkaProducer(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema);
    
    /**
     * @deprecated Use FlinkKafkaProducer08 constructor instead
     */
    @Deprecated
    public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig);
    
    /**
     * @deprecated Use FlinkKafkaProducer08 constructor instead
     */
    @Deprecated
    public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner);
}
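
Migration is a drop-in class rename; a minimal sketch using the constructors listed above:

// Deprecated
FlinkKafkaProducer<String> legacy =
    new FlinkKafkaProducer<>("localhost:9092", "output-topic", new SimpleStringSchema());

// Replacement: identical arguments, current class name
FlinkKafkaProducer08<String> current =
    new FlinkKafkaProducer08<>("localhost:9092", "output-topic", new SimpleStringSchema());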

Configuration

Required Properties

  • bootstrap.servers: Comma-separated list of Kafka broker addresses

Recommended Properties

  • batch.size: Number of bytes to batch before sending (default: 16384)
  • linger.ms: Time to wait for additional messages in batch (default: 0)
  • compression.type: Compression algorithm (none, gzip, snappy, lz4)
  • acks: Acknowledgment mode (0, 1, all)
  • retries: Number of retry attempts on failure
  • retry.backoff.ms: Backoff time between retries

Advanced Properties

  • buffer.memory: Total memory available for producer-side buffering
  • block.on.buffer.full: Whether send() blocks, rather than fails, when the buffer is full
  • timeout.ms: Maximum time the leader waits for follower acknowledgments when acks > 1
  • metadata.fetch.timeout.ms: Maximum time to block while fetching topic metadata

Note that newer producer settings such as max.block.ms, request.timeout.ms, and delivery.timeout.ms were introduced after Kafka 0.8.x and are not recognized by this client.
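
A sketch combining these settings; the broker addresses and values are illustrative starting points, not tuned recommendations:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092"); // hypothetical brokers
props.setProperty("batch.size", "32768");        // larger batches for throughput
props.setProperty("linger.ms", "20");            // wait briefly to fill batches
props.setProperty("compression.type", "snappy"); // cheap CPU, good network savings
props.setProperty("acks", "1");                  // leader acknowledgment only
props.setProperty("retries", "3");
props.setProperty("retry.backoff.ms", "500");
props.setProperty("buffer.memory", "67108864");  // 64 MiB producer buffer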

Serialization Schemas

The producer supports various serialization schemas:

// Simple string serialization
SerializationSchema<String> stringSchema = new SimpleStringSchema();

// Flink's TypeInformation-based binary serialization
TypeInformationSerializationSchema<MyEvent> typeInfoSchema =
    new TypeInformationSerializationSchema<>(
        TypeInformation.of(MyEvent.class), env.getConfig());

// Custom serialization (e.g., JSON or Avro via the library of your choice)
SerializationSchema<MyCustomType> customSchema = new SerializationSchema<MyCustomType>() {
    @Override
    public byte[] serialize(MyCustomType element) {
        // Application-defined encoding
        return element.toBytes();
    }
};

Custom Partitioning

Implement custom partitioning logic:

public class MyCustomPartitioner extends FlinkKafkaPartitioner<String> {

    @Override
    public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Route urgent messages to the first available partition
        if (record.startsWith("urgent")) {
            return partitions[0];
        }
        // Mask the sign bit so a negative hashCode cannot produce a negative index
        return partitions[(record.hashCode() & Integer.MAX_VALUE) % partitions.length];
    }
}

// Use custom partitioner
FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
    "my-topic",
    new SimpleStringSchema(),
    props,
    new MyCustomPartitioner()
);

Error Handling

The producer handles errors according to Kafka 0.8.x limitations:

  • Best-effort delivery: No exactly-once guarantees with Kafka 0.8.x
  • Retry behavior: Configurable through Kafka producer properties
  • Failure modes: Messages may be lost on producer failures

// Configure retry behavior
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("retries", "3");
props.setProperty("retry.backoff.ms", "1000");
props.setProperty("acks", "1"); // Wait for leader acknowledgment

// Error handling in application
try {
    stream.addSink(producer);
    env.execute("Kafka Producer Job");
} catch (Exception e) {
    logger.error("Kafka producer failed", e);
    // Implement fallback or recovery logic
}
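
The producer also inherits two switches from FlinkKafkaProducerBase that control how send failures interact with checkpointing:

// Fail the job on asynchronous send errors instead of only logging them
producer.setLogFailuresOnly(false);

// Flush pending records on checkpoints, so a completed checkpoint implies
// the records it covers were handed to Kafka
producer.setFlushOnCheckpoint(true);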

Performance Considerations

  • Batching: Use appropriate batch.size and linger.ms for throughput
  • Compression: Enable compression for network efficiency
  • Partitioning: Distribute load evenly across partitions
  • Parallelism: Match producer parallelism to topic partition count
  • Memory: Configure buffer.memory based on throughput requirements
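
For example, if the target topic has four partitions (a hypothetical count), matching the sink parallelism avoids idle or overloaded producer subtasks:

// Adjust to your topic's actual partition count
stream
    .addSink(configuredProducer)
    .setParallelism(4);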