Data Production

Streaming data sink functionality for producing to Kafka 0.9.x topics, with configurable serialization, partitioning strategies, and at-least-once delivery (via flush-on-checkpoint) for high-throughput data pipelines.

Capabilities

FlinkKafkaProducer09 Class

Main Kafka producer class for writing data from Flink data streams to Kafka 0.9.x topics.

/**
 * Kafka producer for writing data to Apache Kafka 0.9.x topics.
 * Compatible with Kafka 0.9. Provides no exactly-once guarantees; at-least-once requires enabling flush-on-checkpoint.
 * 
 * @param <IN> The type of records to write to Kafka
 */
@PublicEvolving
public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
    
    /**
     * Creates a producer with broker list and key-less serialization using fixed partitioner.
     * 
     * @param brokerList Comma-separated addresses of Kafka brokers
     * @param topicId ID of the Kafka topic to write to
     * @param serializationSchema User defined key-less serialization schema
     */
    public FlinkKafkaProducer09(String brokerList, String topicId, SerializationSchema<IN> serializationSchema);
    
    /**
     * Creates a producer with properties and key-less serialization using fixed partitioner.
     * 
     * @param topicId ID of the Kafka topic to write to
     * @param serializationSchema User defined key-less serialization schema
     * @param producerConfig Properties with the producer configuration
     */
    public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig);
    
    /**
     * Creates a producer with key-less serialization and custom partitioner.
     * 
     * @param topicId The topic to write data to
     * @param serializationSchema A key-less serializable serialization schema
     * @param producerConfig Configuration properties for the KafkaProducer
     * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions, or null for round-robin
     */
    public FlinkKafkaProducer09(
            String topicId,
            SerializationSchema<IN> serializationSchema,
            Properties producerConfig,
            FlinkKafkaPartitioner<IN> customPartitioner);
    
    /**
     * Creates a producer with broker list and key/value serialization using fixed partitioner.
     * 
     * @param brokerList Comma-separated addresses of Kafka brokers
     * @param topicId ID of the Kafka topic to write to
     * @param serializationSchema User defined serialization schema supporting key/value messages
     */
    public FlinkKafkaProducer09(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema);
    
    /**
     * Creates a producer with properties and key/value serialization using fixed partitioner.
     * 
     * @param topicId ID of the Kafka topic to write to
     * @param serializationSchema User defined serialization schema supporting key/value messages
     * @param producerConfig Properties with the producer configuration
     */
    public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig);
    
    /**
     * Creates a producer with key/value serialization and custom partitioner.
     * 
     * @param topicId The topic to write data to
     * @param serializationSchema A serializable serialization schema for key/value messages
     * @param producerConfig Configuration properties for the KafkaProducer
     * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions, or null for key-based partitioning
     */
    public FlinkKafkaProducer09(
            String topicId,
            KeyedSerializationSchema<IN> serializationSchema,
            Properties producerConfig,
            FlinkKafkaPartitioner<IN> customPartitioner);
    
    /**
     * Sets whether the producer should only log failures instead of throwing exceptions.
     * 
     * @param logFailuresOnly True to only log failures instead of throwing exceptions
     */
    public void setLogFailuresOnly(boolean logFailuresOnly);
    
    /**
     * Sets whether the producer should flush pending records on checkpoint.
     * 
     * @param flush True to flush on checkpoint, false otherwise
     */
    public void setFlushOnCheckpoint(boolean flush);
}
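
A minimal sketch combining these setters with checkpointing for at-least-once behavior; the broker address and checkpoint interval are illustrative assumptions:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5 seconds (illustrative)

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address

FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>(
    "output-topic",
    new SimpleStringSchema(),
    properties
);

// Fail the job on send errors instead of merely logging them
producer.setLogFailuresOnly(false);
// Block each checkpoint until all in-flight records are acknowledged (at-least-once)
producer.setFlushOnCheckpoint(true);

With both settings, records sent before a completed checkpoint are not silently lost, although duplicates remain possible after a restore.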

Deprecated Constructors

/**
 * @deprecated This constructor does not correctly handle partitioning when producing to multiple topics.
 * Use FlinkKafkaProducer09(String, SerializationSchema, Properties, FlinkKafkaPartitioner) instead.
 */
@Deprecated
public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner);

/**
 * @deprecated This constructor does not correctly handle partitioning when producing to multiple topics.
 * Use FlinkKafkaProducer09(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner) instead.
 */
@Deprecated
public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner);
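
As a sketch of the recommended replacement, the same call passes a FlinkKafkaPartitioner instead of the deprecated KafkaPartitioner; the firstPartitionOnly partitioner here is purely illustrative:

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Illustrative partitioner used only to show the non-deprecated constructor
FlinkKafkaPartitioner<String> firstPartitionOnly = new FlinkKafkaPartitioner<String>() {
    @Override
    public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        return partitions[0]; // always write to the first available partition
    }
};

FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>(
    "output-topic",
    new SimpleStringSchema(),
    properties,
    firstPartitionOnly
);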

Usage Examples

Basic String Production

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");

FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>(
    "output-topic",
    new SimpleStringSchema(),
    properties
);

env.fromElements("Hello", "World", "Kafka")
   .addSink(producer);

env.execute("Basic Kafka Producer");

Custom Object Serialization

import org.apache.flink.api.common.serialization.SerializationSchema;
import com.fasterxml.jackson.databind.ObjectMapper;

// Custom POJO
public class User {
    public String name;
    public int age;
    public String email;
    
    public User(String name, int age, String email) {
        this.name = name;
        this.age = age;
        this.email = email;
    }
    
    // getters, setters, no-arg constructor as needed...
}

// JSON serialization schema
SerializationSchema<User> jsonSchema = new SerializationSchema<User>() {
    // ObjectMapper is not Serializable, so keep it transient and create it lazily
    private transient ObjectMapper mapper;
    
    @Override
    public byte[] serialize(User user) {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        try {
            return mapper.writeValueAsBytes(user);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize user", e);
        }
    }
};

FlinkKafkaProducer09<User> producer = new FlinkKafkaProducer09<>(
    "user-events",
    jsonSchema,
    properties
);

env.fromElements(
    new User("Alice", 25, "alice@example.com"),
    new User("Bob", 30, "bob@example.com")
).addSink(producer);

Key/Value Production

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

// Custom keyed serialization for key/value messages
KeyedSerializationSchema<User> keyedSchema = new KeyedSerializationSchema<User>() {
    // ObjectMapper is not Serializable, so keep it transient and create it lazily
    private transient ObjectMapper mapper;
    
    @Override
    public byte[] serializeKey(User user) {
        return user.email.getBytes(); // Use email as key
    }
    
    @Override
    public byte[] serializeValue(User user) {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        try {
            return mapper.writeValueAsBytes(user);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize user", e);
        }
    }
    
    @Override
    public String getTargetTopic(User user) {
        return null; // Use the topic passed to the producer constructor
    }
};

FlinkKafkaProducer09<User> producer = new FlinkKafkaProducer09<>(
    "keyed-users",
    keyedSchema,
    properties
);

Custom Partitioning

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Custom partitioner based on user age
FlinkKafkaPartitioner<User> agePartitioner = new FlinkKafkaPartitioner<User>() {
    @Override
    public int partition(User record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Partition by age groups: 0-29, 30-49, 50+
        int ageGroup = record.age < 30 ? 0 : (record.age < 50 ? 1 : 2);
        return partitions[ageGroup % partitions.length];
    }
};

FlinkKafkaProducer09<User> producer = new FlinkKafkaProducer09<>(
    "age-partitioned-users",
    jsonSchema,
    properties,
    agePartitioner
);

Advanced Producer Configuration

Properties properties = new Properties();
// Required settings
properties.setProperty("bootstrap.servers", "kafka-broker-1:9092,kafka-broker-2:9092");

// Performance tuning
properties.setProperty("batch.size", "16384");
properties.setProperty("linger.ms", "5");
properties.setProperty("compression.type", "snappy");

// Reliability settings (limited in Kafka 0.9)
properties.setProperty("acks", "1"); // 0=no ack, 1=leader ack, all=all replicas
properties.setProperty("retries", "3");
properties.setProperty("max.in.flight.requests.per.connection", "5");

// Buffer management
properties.setProperty("buffer.memory", "33554432");
properties.setProperty("max.block.ms", "60000");

FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>(
    "configured-topic",
    new SimpleStringSchema(),
    properties
);

Dynamic Topic Selection

// Schema that can route to different topics based on record content
KeyedSerializationSchema<String> dynamicSchema = new KeyedSerializationSchema<String>() {
    @Override
    public byte[] serializeKey(String record) {
        return null; // No key
    }
    
    @Override
    public byte[] serializeValue(String record) {
        return record.getBytes();
    }
    
    @Override
    public String getTargetTopic(String record) {
        // Route based on record content
        if (record.startsWith("ERROR")) {
            return "error-logs";
        } else if (record.startsWith("WARN")) {
            return "warning-logs";
        } else {
            return "info-logs";
        }
    }
};

FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>(
    "default-topic", // Fallback topic
    dynamicSchema,
    properties
);

Producer with Data Stream Transformations

import org.apache.flink.streaming.api.datastream.DataStream;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create input stream
DataStream<String> inputStream = env.socketTextStream("localhost", 9999);

// Transform and produce to Kafka
inputStream
    .filter(line -> !line.isEmpty())
    .map(line -> line.toUpperCase())
    .map(line -> "Processed at " + System.currentTimeMillis() + ": " + line)
    .addSink(new FlinkKafkaProducer09<>(
        "processed-data",
        new SimpleStringSchema(),
        properties
    ));

env.execute("Stream Processing to Kafka");

Partitioning Strategies

Default Fixed Partitioner

  • Maps each sink subtask to a single Kafka partition
  • All records from one subtask go to the same partition
  • Preserves per-subtask ordering, but can leave partitions idle or create hotspots when sink parallelism and partition count differ

Round-Robin Partitioning

  • Used when the custom partitioner is explicitly set to null and records carry no key
  • Kafka's default partitioner then distributes records evenly across partitions
  • Good for load balancing without keys (see the sketch after the next list)

Key-Based Partitioning

  • Used when the custom partitioner is set to null and the KeyedSerializationSchema provides keys
  • Records with the same key go to the same partition
  • Maintains ordering for records with the same key (see the sketch below)
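
A minimal sketch of opting out of the fixed partitioner, reusing the keyedSchema and properties defined in the earlier examples. Passing null as the partitioner hands partition assignment to the Kafka producer itself: key-based when keys are present, round-robin otherwise. The cast only disambiguates from the deprecated KafkaPartitioner overload.

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

FlinkKafkaProducer09<User> producer = new FlinkKafkaProducer09<>(
    "keyed-users",
    keyedSchema,
    properties,
    (FlinkKafkaPartitioner<User>) null // no Flink-side partitioner: Kafka partitions by key, or round-robin without keys
);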

Custom Partitioning

  • Extend the FlinkKafkaPartitioner abstract class (see the Custom Partitioning example above)
  • Full control over partition assignment logic
  • Can consider record content, metadata, or external factors

Error Handling and Limitations

Kafka 0.9 Limitations:

  • No built-in exactly-once delivery guarantees
  • No idempotent producer support (introduced later, in Kafka 0.11)
  • No transactional API (also introduced in Kafka 0.11)

Common Issues:

  • SerializationException: Issues with custom serialization schemas
  • TimeoutException: Network connectivity or broker availability
  • RecordTooLargeException: Message exceeds broker limits
  • InvalidTopicException: Topic doesn't exist or invalid name

Best Practices:

  • Configure appropriate timeouts and retries (e.g. retries, request.timeout.ms)
  • Monitor producer metrics and logs
  • Use compression (compression.type) for better throughput
  • Tune batch.size and linger.ms to balance latency against throughput