Producer Base Classes

Abstract base implementations for Kafka producers that provide delivery guarantees, serialization handling, and partitioning logic. These classes handle the complexities of dependable message production while delegating version-specific operations to concrete implementations.

Capabilities

FlinkKafkaProducerBase

The core abstract base class that all Flink Kafka producers extend. Provides comprehensive functionality for producing to Kafka topics with configurable delivery guarantees and checkpoint-integrated flushing.

public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> 
    implements CheckpointedFunction {
    
    public FlinkKafkaProducerBase(
        String defaultTopicId,
        KeyedSerializationSchema<IN> serializationSchema,
        Properties producerConfig,
        FlinkKafkaPartitioner<IN> customPartitioner
    );
}

Parameters:

  • defaultTopicId - Default target topic for messages (can be overridden by serialization schema)
  • serializationSchema - Schema for serializing elements to Kafka key-value messages
  • producerConfig - Kafka producer configuration properties
  • customPartitioner - Custom partitioner for determining target partitions (optional, can be null)

Usage Example:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("transaction.timeout.ms", "900000");

FlinkKafkaProducerBase<MyEvent> producer = new MyKafkaProducer(
    "events-topic",
    new MyEventSerializationSchema(),
    props,
    new FlinkFixedPartitioner<>()
);
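
MyKafkaProducer and MyEventSerializationSchema above are placeholder names, not classes shipped with the connector. As a rough sketch of what the serialization schema could look like (MyEvent and its getId()/toJson() accessors are assumed purely for illustration):

public class MyEventSerializationSchema implements KeyedSerializationSchema<MyEvent> {

    @Override
    public byte[] serializeKey(MyEvent element) {
        // The key is used for partitioning when no custom partitioner is configured
        return element.getId().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(MyEvent element) {
        return element.toJson().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(MyEvent element) {
        // Returning null keeps the producer's default topic ("events-topic" above)
        return null;
    }
}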

Error Handling Configuration

Configure how the producer handles failures during message production.

public void setLogFailuresOnly(boolean logFailuresOnly);

Parameters:

  • logFailuresOnly - If true, failures are only logged (not thrown). If false, failures cause job failure.

Usage Example:

// Log failures but continue processing (not recommended for production)
producer.setLogFailuresOnly(true);

// Fail job on any production failure (recommended for exactly-once)
producer.setLogFailuresOnly(false);

Checkpoint Flush Behavior

Control whether messages are flushed synchronously during checkpoints for exactly-once guarantees.

public void setFlushOnCheckpoint(boolean flush);

Parameters:

  • flush - If true, all pending messages are flushed during checkpoints so that buffered records are covered by the completed checkpoint (required for at-least-once and exactly-once guarantees)

Usage Example:

// Enable checkpoint flushing for exactly-once guarantees
producer.setFlushOnCheckpoint(true);

Utility Methods

Static utility methods for common configuration tasks.

public static Properties getPropertiesFromBrokerList(String brokerList);

Parameters:

  • brokerList - Comma-separated list of Kafka brokers (host:port format)

Returns: Properties object with bootstrap.servers configured

Usage Example:

Properties props = FlinkKafkaProducerBase.getPropertiesFromBrokerList("broker1:9092,broker2:9092");
// Additional properties can be set on the returned Properties object
props.setProperty("transaction.timeout.ms", "900000");

Message Production

Core method for sending messages to Kafka (called by Flink runtime).

public void invoke(IN next, Context context) throws Exception;

Parameters:

  • next - The record to be sent to Kafka
  • context - Sink context providing additional information

This method is called by the Flink runtime for each record and should not be called directly by user code.
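
Instead, the producer is attached to a stream as a sink and the runtime invokes it once per record. A minimal sketch, assuming the producer built in the constructor example above and a hypothetical MyEventSource:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<MyEvent> events = env.addSource(new MyEventSource()); // hypothetical source

// Flink calls producer.invoke(record, context) internally for each element
events.addSink(producer);

env.execute("kafka-producer-job");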

State Management

Handle checkpoint coordination for reliable delivery (implemented by the base class).

public void initializeState(FunctionInitializationContext context) throws Exception;
public void snapshotState(FunctionSnapshotContext context) throws Exception;

These methods coordinate the producer with Flink checkpoints: on a checkpoint, pending records are flushed (when checkpoint flushing is enabled) and any asynchronous send failures are surfaced before the checkpoint completes.
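
These callbacks only run when checkpointing is enabled on the job. A minimal sketch (the interval and mode are illustrative):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// snapshotState() runs on every checkpoint; initializeState() runs on job (re)start
env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);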

Resource Management

public void open(Configuration configuration);
public void close() throws Exception;

These methods handle producer lifecycle management including Kafka client initialization and cleanup.

Abstract Methods

Concrete implementations must implement this version-specific method:

protected abstract void flush();

This method must flush all pending records to ensure they are sent to Kafka. It is called during checkpoints when setFlushOnCheckpoint(true) is configured, guaranteeing that no records remain buffered only in the Kafka client once a checkpoint completes.
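
A concrete implementation typically delegates to the Kafka client's own flush. A rough sketch, assuming the base class exposes its internal KafkaProducer to subclasses as a protected producer field (it is not part of the documented fields below):

public class MyKafkaProducer extends FlinkKafkaProducerBase<MyEvent> {

    public MyKafkaProducer(String topic, KeyedSerializationSchema<MyEvent> schema,
                           Properties props, FlinkKafkaPartitioner<MyEvent> partitioner) {
        super(topic, schema, props, partitioner);
    }

    @Override
    protected void flush() {
        // Block until the Kafka client has sent, and the broker has acknowledged,
        // everything currently buffered; asynchronous send failures are surfaced
        // by the base class via checkErroneous() during the checkpoint.
        if (producer != null) {
            producer.flush();
        }
    }
}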

Protected API for Subclasses

Methods and fields available to concrete implementations:

protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props);
protected void checkErroneous() throws Exception;
protected static int[] getPartitionsByTopic(String topic, KafkaProducer<byte[], byte[]> producer);
protected long numPendingRecords();

Protected Fields:

protected final Properties producerConfig;
protected final String defaultTopicId; 
protected final KeyedSerializationSchema<IN> schema;
protected final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;
protected boolean logFailuresOnly;
protected boolean flushOnCheckpoint;

Methods:

  • getKafkaProducer() - Factory method for creating Kafka producer instances
  • checkErroneous() - Check for and throw any pending async exceptions
  • getPartitionsByTopic() - Utility to discover partitions for a topic
  • numPendingRecords() - Get count of unacknowledged records (useful for monitoring; see the sketch below)
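
Continuing the MyKafkaProducer sketch above, a subclass might expose the pending-record count as a metric (the gauge name is illustrative):

@Override
public void open(Configuration configuration) {
    super.open(configuration);
    // Register a gauge so unacknowledged records can be monitored externally
    getRuntimeContext().getMetricGroup().gauge("pendingKafkaRecords", (Gauge<Long>) this::numPendingRecords);
}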

Constants:

public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";

Configuration Best Practices

Exactly-Once Configuration

For exactly-once processing guarantees, configure the producer as follows:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("transaction.timeout.ms", "900000");
props.setProperty("max.in.flight.requests.per.connection", "1");
props.setProperty("retries", "2147483647");
props.setProperty("enable.idempotence", "true");

producer.setFlushOnCheckpoint(true);
producer.setLogFailuresOnly(false);

At-Least-Once Configuration

For at-least-once style processing with higher throughput (note that without checkpoint flushing, records still buffered in the Kafka client when a failure occurs can be lost; strict at-least-once additionally requires setFlushOnCheckpoint(true)):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("acks", "all");
props.setProperty("retries", "2147483647");

producer.setFlushOnCheckpoint(false);
producer.setLogFailuresOnly(false);

High-Throughput Configuration

For maximum throughput when occasional data loss is acceptable (acks=1 and logFailuresOnly weaken the delivery guarantee):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("acks", "1");
props.setProperty("batch.size", "16384");
props.setProperty("linger.ms", "5");
props.setProperty("compression.type", "lz4");

producer.setFlushOnCheckpoint(false);
producer.setLogFailuresOnly(true);

Error Handling

The producer handles various types of failures:

  • Retriable Errors: Automatically retried based on retries configuration
  • Non-Retriable Errors: Cause immediate failure or logging based on logFailuresOnly setting
  • Transaction Errors: Handled through checkpoint coordination and transaction abort/retry
  • Network Errors: Handled through connection retry and failover mechanisms

When logFailuresOnly is false (recommended), any production failure will cause the Flink job to fail and restart, ensuring no data loss with proper checkpoint configuration.
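
A failed job restarts from the last completed checkpoint and replays records from there, so checkpointing and a restart strategy are usually configured together; a minimal sketch (values are illustrative):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000);

// Retry the job a bounded number of times before giving up entirely
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));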