
Kinesis Producer

The FlinkKinesisProducer enables high-throughput data ingestion to Amazon Kinesis Data Streams using the Kinesis Producer Library (KPL) with configurable partitioning, error handling, and backpressure management.

Capabilities

FlinkKinesisProducer

Main producer class for writing data to Kinesis streams with configurable partitioning strategies, error handling, and performance tuning options.

@PublicEvolving
public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> 
    implements CheckpointedFunction {
    
    // Metric constants
    public static final String KINESIS_PRODUCER_METRIC_GROUP = "kinesisProducer";
    public static final String METRIC_BACKPRESSURE_CYCLES = "backpressureCycles";
    public static final String METRIC_OUTSTANDING_RECORDS_COUNT = "outstandingRecordsCount";
    public static final String KINESIS_PRODUCER_RELEASE_HOOK_NAME = "kinesisProducer";
    
    /**
     * Create producer with standard serialization schema.
     *
     * @param schema Standard Flink serialization schema
     * @param configProps AWS and producer configuration properties
     */
    public FlinkKinesisProducer(SerializationSchema<OUT> schema, Properties configProps);
    
    /**
     * Create producer with Kinesis-specific serialization schema.
     *
     * @param schema Kinesis serialization schema with target stream specification
     * @param configProps AWS and producer configuration properties
     */
    public FlinkKinesisProducer(KinesisSerializationSchema<OUT> schema, Properties configProps);
    
    /**
     * Configure error handling behavior.
     *
     * @param failOnError If true, fail the job on any production error; if false, log and continue
     */
    public void setFailOnError(boolean failOnError);
    
    /**
     * Set the maximum number of outstanding records before backpressuring.
     *
     * @param queueLimit Maximum outstanding records (default: Integer.MAX_VALUE)
     */
    public void setQueueLimit(int queueLimit);
    
    /**
     * Set the default target stream for records.
     *
     * @param defaultStream Default stream name (can be overridden by serialization schema)
     */
    public void setDefaultStream(String defaultStream);
    
    /**
     * Set the default partition for records.
     *
     * @param defaultPartition Default partition key, used for every record when no custom partitioner is set
     */
    public void setDefaultPartition(String defaultPartition);
    
    /**
     * Set custom partitioner for distributing records across shards.
     *
     * @param partitioner Custom partitioner implementation
     */
    public void setCustomPartitioner(KinesisPartitioner<OUT> partitioner);
    
    /**
     * Initialize the producer with runtime configuration.
     *
     * @param parameters Runtime configuration parameters
     * @throws Exception On initialization errors
     */
    public void open(Configuration parameters) throws Exception;
    
    /**
     * Send a record to Kinesis.
     *
     * @param value Record to send
     * @param context Sink context with additional metadata
     * @throws Exception On send errors
     */
    public void invoke(OUT value, Context context) throws Exception;
    
    /**
     * Close the producer and cleanup resources.
     *
     * @throws Exception On cleanup errors
     */
    public void close() throws Exception;
    
    /**
     * Initialize state for checkpointing.
     *
     * @param context Function initialization context
     * @throws Exception On initialization errors
     */
    public void initializeState(FunctionInitializationContext context) throws Exception;
    
    /**
     * Create snapshot of current state for checkpointing.
     *
     * @param context Function snapshot context
     * @throws Exception On snapshot errors
     */
    public void snapshotState(FunctionSnapshotContext context) throws Exception;
}

Usage Examples

Basic Producer Setup

import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;

// Configure AWS properties
Properties props = new Properties();
props.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");
props.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "your-access-key");
props.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "your-secret-key");

// Create producer with simple string serialization
FlinkKinesisProducer<String> producer = new FlinkKinesisProducer<>(
    new SimpleStringSchema(),
    props
);

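// Configure producer settings
producer.setDefaultStream("my-output-stream");
// Every record needs a partition key; without a custom partitioner this default
// is used for all records, so they all go to the same shard
producer.setDefaultPartition("0");
producer.setFailOnError(true);
producer.setQueueLimit(1000);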

// Add to data stream
dataStream.addSink(producer);

Producer with Custom Serialization

import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Custom serialization schema with per-record target stream selection
KinesisSerializationSchema<MyEvent> customSerializer = new KinesisSerializationSchema<MyEvent>() {
    @Override
    public ByteBuffer serialize(MyEvent element) {
        // Convert to JSON or another format; toJson is an application helper (not shown)
        String json = toJson(element);
        return ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8));
    }
    
    @Override
    public String getTargetStream(MyEvent element) {
        // Route to different streams based on event type;
        // returning null falls back to the default stream
        return "events-" + element.getEventType().toLowerCase();
    }
};

FlinkKinesisProducer<MyEvent> producer = new FlinkKinesisProducer<>(
    customSerializer,
    props
);
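
How the target stream is chosen can be modeled without Flink. The sketch below is a simplified, stdlib-only model assuming the behavior described above: a non-null getTargetStream result wins, otherwise the default stream is used. The handling of a record with no stream at all (throw when failOnError is set, otherwise skip) is an assumption based on the connector's invoke logic and should be checked against the version you use. RoutingModel, Event and toJson are hypothetical stand-ins, not connector API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Optional;

public class RoutingModel {
    // Hypothetical stand-in for MyEvent.
    public static class Event {
        final String eventType;
        final long userId;

        public Event(String eventType, long userId) {
            this.eventType = eventType;
            this.userId = userId;
        }
    }

    // Hypothetical JSON encoding; a real job would use a JSON library.
    public static String toJson(Event e) {
        return "{\"eventType\":\"" + e.eventType + "\",\"userId\":" + e.userId + "}";
    }

    // Plays the role of serialize(): UTF-8 bytes wrapped in a ByteBuffer.
    public static ByteBuffer serialize(Event e) {
        return ByteBuffer.wrap(toJson(e).getBytes(StandardCharsets.UTF_8));
    }

    // Plays the role of getTargetStream(): null means "no preference".
    public static String targetStream(Event e) {
        return e.eventType == null ? null : "events-" + e.eventType.toLowerCase(Locale.ROOT);
    }

    // Schema's stream first, then the default stream. With neither, throw when
    // failOnError is set, otherwise skip the record (empty result).
    public static Optional<String> resolveStream(Event e, String defaultStream, boolean failOnError) {
        String stream = targetStream(e);
        if (stream == null) {
            stream = defaultStream;
        }
        if (stream == null) {
            if (failOnError) {
                throw new IllegalStateException("No target stream set");
            }
            return Optional.empty();
        }
        return Optional.of(stream);
    }

    public static void main(String[] args) {
        Event click = new Event("CLICK", 42);
        System.out.println(resolveStream(click, "my-output-stream", true).orElse("<skipped>"));
        System.out.println(resolveStream(new Event(null, 7), "my-output-stream", true).orElse("<skipped>"));
        System.out.println(serialize(click).remaining() + " bytes");
    }
}
```

Because a non-null stream from the schema always wins in this model, setDefaultStream only acts as a fallback for records where the schema returns null.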

Producer with Custom Partitioning

import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;

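// Custom partitioner: records of the same user share a partition key,
// so they are written to the same shard
KinesisPartitioner<MyEvent> customPartitioner = new KinesisPartitioner<MyEvent>() {
    @Override
    public String getPartitionId(MyEvent element) {
        // 100 key buckets derived from the user ID; Kinesis hashes the key to pick a shard
        return String.valueOf(element.getUserId() % 100);
    }

    // getExplicitHashKey(...) is optional. Its default returns null, which lets Kinesis
    // hash the partition key. If you override it, return a decimal value spread over the
    // whole 128-bit hash-key range; raw IDs such as element.getUserId() are small numbers
    // that all fall into the first shard's range.
};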

producer.setCustomPartitioner(customPartitioner);
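
Kinesis maps a record to a shard by taking the MD5 hash of its partition key as an unsigned 128-bit integer and finding the shard whose hash-key range contains it; an explicit hash key, when supplied, is used directly instead. The sketch below models that for a stream whose shards split the range evenly (an assumption; resharded streams need not be even). ShardMapping and its methods are illustrative names, not part of the connector. It also shows the pitfall of returning a raw user ID from getExplicitHashKey: small integers sit at the very bottom of the 0 to 2^128 - 1 range, so they all land in shard 0.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class ShardMapping {
    // Size of the Kinesis hash-key space: 2^128.
    public static final BigInteger KEY_SPACE = BigInteger.ONE.shiftLeft(128);

    // MD5 of the partition key, read as an unsigned 128-bit integer.
    public static BigInteger hashKeyFor(String partitionKey) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // signum 1: treat the bytes as unsigned
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    // Shard index for a hash key, assuming shardCount evenly split ranges.
    public static int shardFor(BigInteger hashKey, int shardCount) {
        return hashKey.multiply(BigInteger.valueOf(shardCount)).divide(KEY_SPACE).intValueExact();
    }

    // An explicit hash key (decimal string) wins; otherwise the partition key is hashed.
    public static int route(String partitionKey, String explicitHashKey, int shardCount) {
        BigInteger hashKey = explicitHashKey != null
                ? new BigInteger(explicitHashKey)
                : hashKeyFor(partitionKey);
        return shardFor(hashKey, shardCount);
    }

    public static void main(String[] args) {
        int shards = 4;
        for (long userId = 1; userId <= 5; userId++) {
            String partitionKey = String.valueOf(userId % 100);
            System.out.printf("user %d: hashed -> shard %d, explicit -> shard %d%n",
                    userId,
                    route(partitionKey, null, shards),
                    route(partitionKey, String.valueOf(userId), shards));
        }
    }
}
```

In this model the "explicit" column is always shard 0, while the hashed keys spread across shards, which is what leaving getExplicitHashKey at its default achieves.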

Producer with Fixed Partitioning

import org.apache.flink.streaming.connectors.kinesis.FixedKinesisPartitioner;

// Fixed partitioner: each parallel sink subtask uses a single partition key,
// so all records from that subtask go to the same shard
FixedKinesisPartitioner<MyEvent> fixedPartitioner = new FixedKinesisPartitioner<>();
producer.setCustomPartitioner(fixedPartitioner);

Producer with Random Partitioning

import org.apache.flink.streaming.connectors.kinesis.RandomKinesisPartitioner;

// Random partitioner: a random key per record spreads load evenly across shards,
// at the cost of any per-key grouping
RandomKinesisPartitioner<MyEvent> randomPartitioner = new RandomKinesisPartitioner<>();
producer.setCustomPartitioner(randomPartitioner);
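
The practical difference between the two built-in partitioners shows up in how many shards receive traffic. The sketch below counts that under the same evenly split MD5 model (an assumption about shard layout). The key generators are simplified stand-ins: the fixed strategy uses the subtask index as the key, which is how FixedKinesisPartitioner is assumed to behave here, and the random strategy uses UUID.randomUUID(). PartitionerSpread is a hypothetical helper, not the connector classes.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.UUID;

public class PartitionerSpread {
    static final BigInteger KEY_SPACE = BigInteger.ONE.shiftLeft(128);

    // Evenly split shard model: shard = floor(md5(key) * shardCount / 2^128).
    static int shardFor(String partitionKey, int shardCount) {
        try {
            byte[] md5 = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, md5).multiply(BigInteger.valueOf(shardCount))
                    .divide(KEY_SPACE).intValue();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    // Fixed strategy: every record from a subtask carries that subtask's index as its key.
    public static int[] fixedCounts(int parallelism, int recordsPerSubtask, int shardCount) {
        int[] counts = new int[shardCount];
        for (int subtask = 0; subtask < parallelism; subtask++) {
            counts[shardFor(String.valueOf(subtask), shardCount)] += recordsPerSubtask;
        }
        return counts;
    }

    // Random strategy: a fresh UUID key for every record.
    public static int[] randomCounts(int records, int shardCount) {
        int[] counts = new int[shardCount];
        for (int i = 0; i < records; i++) {
            counts[shardFor(UUID.randomUUID().toString(), shardCount)]++;
        }
        return counts;
    }

    // Number of shards that received at least one record.
    public static int shardsUsed(int[] counts) {
        int used = 0;
        for (int c : counts) {
            if (c > 0) used++;
        }
        return used;
    }

    public static void main(String[] args) {
        int shards = 8;
        System.out.println("fixed, parallelism 2: shards used = " + shardsUsed(fixedCounts(2, 1000, shards)));
        System.out.println("random: shards used = " + shardsUsed(randomCounts(2000, shards)));
    }
}
```

With the fixed strategy, a sink with fewer subtasks than shards can never use more shards than it has subtasks; the random strategy touches every shard but gives up keeping related records together.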

High-Throughput Configuration

// KPL settings are passed through the same Properties object; set them before
// constructing the producer. The values below are the KPL defaults, listed
// explicitly as a starting point for tuning.
props.setProperty("RecordMaxBufferedTime", "100");  // max buffering delay (ms); raise to batch more per request
props.setProperty("MaxConnections", "24");          // concurrent connections; raise for more parallelism
props.setProperty("RequestTimeout", "6000");        // per-request timeout (ms)
props.setProperty("RecordTtl", "30000");            // ms a record may wait before it is failed

// Aggregation packs many user records into one Kinesis record
props.setProperty("AggregationEnabled", "true");
props.setProperty("AggregationMaxCount", "4294967295");
props.setProperty("AggregationMaxSize", "51200");

// Allow more outstanding records before backpressure kicks in
producer.setQueueLimit(10000);

// Log failed records and keep processing; failed records are dropped, not retried by Flink
producer.setFailOnError(false);

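Why aggregation raises throughput can be worked out from the per-shard write limits Kinesis documents: 1,000 records per second and 1 MiB per second. The sketch below computes an upper bound on user records per second for one shard, with and without aggregation. It ignores the aggregation format's framing overhead and the KPL's request batching (both simplifying assumptions); ShardThroughput is an illustrative helper.

```java
public class ShardThroughput {
    // Documented per-shard write limits for Kinesis Data Streams.
    static final long MAX_RECORDS_PER_SEC = 1_000;
    static final long MAX_BYTES_PER_SEC = 1_048_576; // 1 MiB

    // Upper bound on user records/sec for one shard.
    // aggregateMaxBytes <= 0 means aggregation is off (one user record per Kinesis record).
    public static long userRecordsPerSecond(long userRecordBytes, long aggregateMaxBytes) {
        long perKinesisRecord = aggregateMaxBytes > 0
                ? Math.max(1, aggregateMaxBytes / userRecordBytes) // user records packed per aggregate
                : 1;
        long byRecordLimit = MAX_RECORDS_PER_SEC * perKinesisRecord;
        long byByteLimit = MAX_BYTES_PER_SEC / userRecordBytes;
        return Math.min(byRecordLimit, byByteLimit);
    }

    public static void main(String[] args) {
        long recordBytes = 100;
        System.out.println("without aggregation: " + userRecordsPerSecond(recordBytes, 0));
        System.out.println("with AggregationMaxSize=51200: " + userRecordsPerSecond(recordBytes, 51_200));
    }
}
```

For 100-byte records the bound rises from 1,000 to 10,485 user records per second per shard: aggregation packs 512 records into each Kinesis record, so the byte limit, not the record-count limit, becomes the constraint.
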
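At-Least-Once Delivery with Checkpointing

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpoint every minute. EXACTLY_ONCE applies to Flink's own state;
// the Kinesis sink itself provides at-least-once delivery
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// Keep externalized checkpoints when the job is cancelled
env.getCheckpointConfig().enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);

// The producer participates in checkpointing: snapshotState() waits until all
// outstanding records are flushed to Kinesis. Records written after the last
// completed checkpoint are sent again on recovery, so consumers may see duplicates.
// Keep setFailOnError(true) so failed records fail the job instead of being dropped.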

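Why checkpointing yields at-least-once rather than exactly-once can be shown with a small simulation. It assumes a replayable source that rewinds to its last checkpointed offset after a failure, and a sink that, like a Kinesis stream, cannot retract records it has already accepted. AtLeastOnceReplay is a hypothetical model, not connector code.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

public class AtLeastOnceReplay {
    // Returns the offsets that end up in the append-only stream.
    // total: records the source produces; checkpointAt: a checkpoint completes after
    // this many records were flushed; failAt: the job fails after emitting this many.
    public static List<Integer> run(int total, int checkpointAt, int failAt) {
        List<Integer> stream = new ArrayList<>();
        int checkpointedOffset = 0; // source position stored in the last completed checkpoint

        // First attempt: emit until the failure.
        for (int offset = 0; offset < failAt; offset++) {
            stream.add(offset);
            if (offset + 1 == checkpointAt) {
                // Everything so far was flushed; the checkpoint records offset + 1
                checkpointedOffset = offset + 1;
            }
        }

        // Recovery: the source rewinds to the checkpoint; the stream keeps what it already has.
        for (int offset = checkpointedOffset; offset < total; offset++) {
            stream.add(offset);
        }
        return stream;
    }

    // Number of extra copies written to the stream.
    public static int duplicates(List<Integer> stream) {
        return stream.size() - new HashSet<>(stream).size();
    }

    public static void main(String[] args) {
        List<Integer> out = run(10, 4, 7);
        System.out.println(out);
        System.out.println("duplicates: " + duplicates(out));
    }
}
```

With run(10, 4, 7), offsets 4, 5 and 6 are written twice and no offset is lost: duplicates are possible, gaps are not (as long as failed records fail the job).
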
Configuration Options

Key KPL configuration properties, passed to the FlinkKinesisProducer through its Properties object:

KPL Configuration

  • RecordMaxBufferedTime: Maximum time a record is buffered before being sent (default: 100 ms)
  • RecordTtl: Maximum time a record may wait to be written before it is failed (default: 30000 ms)
  • RequestTimeout: Timeout for HTTP requests to Kinesis (default: 6000 ms)
  • MaxConnections: Maximum concurrent connections to Kinesis (default: 24)

Aggregation Settings

  • AggregationEnabled: Enable record aggregation (default: true)
  • AggregationMaxCount: Maximum records per aggregate (default: 4294967295)
  • AggregationMaxSize: Maximum aggregate size in bytes (default: 51200)

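Retry and Metrics Settings

  • RecordTtl (listed above) also bounds retries: the KPL keeps retrying a failed put until the record expires
  • FailIfThrottled: Fail throttled records immediately instead of retrying them (default: false)
  • MetricsLevel: CloudWatch metrics level (none, summary, detailed; default: detailed)
  • MetricsGranularity: Metrics granularity (global, stream, shard; default: shard)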

Error Handling and Reliability

The producer provides several mechanisms for handling errors and ensuring reliability:

Automatic Retry

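  • The KPL retries transient failures, including throttled requests, automatically
  • Retries are bounded by RecordTtl: a record not written within that time is reported as failed
  • Setting FailIfThrottled to true fails throttled records immediately instead of retrying them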

Error Handling Modes

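  • Fail-on-Error (setFailOnError(true)): the first asynchronous production error fails the job on the next record or checkpoint
  • Log-and-Continue (setFailOnError(false)): errors are logged and the failed records are dropped
  • Serialization errors: exceptions thrown by the serialization schema fail the job in either mode; handle malformed input inside the schema if needed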

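The two modes differ in when a failure becomes visible. Production errors arrive asynchronously from the KPL, so a producer can only record them in a callback and react on a later call. The sketch below models that pattern, assuming the stored error is checked on the next send and on checkpoint; ErrorModeModel and its methods are hypothetical names, not the connector's API.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

public class ErrorModeModel {
    private final boolean failOnError;
    // First asynchronous failure reported by the (simulated) KPL callback.
    private final AtomicReference<Throwable> asyncError = new AtomicReference<>();
    private final AtomicLong droppedRecords = new AtomicLong();

    public ErrorModeModel(boolean failOnError) {
        this.failOnError = failOnError;
    }

    // Called from the completion callback when a record could not be written.
    public void onRecordFailed(Throwable cause) {
        if (failOnError) {
            asyncError.compareAndSet(null, cause); // keep the first error for a later rethrow
        } else {
            droppedRecords.incrementAndGet(); // log-and-continue: the record is lost
            System.err.println("Dropping record after error: " + cause.getMessage());
        }
    }

    // Rethrows a stored error, if any; used on send and on checkpoint.
    private void checkAndPropagate() {
        Throwable t = asyncError.getAndSet(null);
        if (t != null) {
            throw new RuntimeException("A previously sent record failed", t);
        }
    }

    public void send(String record) {
        checkAndPropagate();
        // A real producer would hand the record to the KPL here.
    }

    public void snapshot() {
        checkAndPropagate();
    }

    public long droppedRecords() {
        return droppedRecords.get();
    }
}
```

In the strict mode the failing record itself has already left the sink, so the error surfaces one call later; in the lenient mode nothing is thrown and only the dropped count grows.
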
Metrics and Monitoring

  • backpressureCycles: Number of wait cycles spent blocked because the queue limit was reached
  • outstandingRecordsCount: Current number of records handed to the KPL but not yet acknowledged
  • KPL metrics: The KPL publishes its own metrics directly to CloudWatch, controlled by MetricsLevel and MetricsGranularity

Memory Management

  • Configurable queue limits to prevent out-of-memory errors
  • Automatic backpressure when queue limits are reached
  • Resource cleanup on job cancellation or failure
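
The queue limit acts as a blocking bound on in-flight records. The sketch below models it with a Java monitor: a send waits in short timed cycles while the outstanding count is at the limit, counting each cycle (analogous to the backpressureCycles metric), and each completion frees a slot. The 100 ms wait interval and the OutstandingLimiter class are assumptions for illustration; the connector's internal implementation may differ.

```java
public class OutstandingLimiter {
    private final int queueLimit;
    private int outstanding = 0;
    private long backpressureCycles = 0;

    public OutstandingLimiter(int queueLimit) {
        if (queueLimit <= 0) {
            throw new IllegalArgumentException("queueLimit must be positive");
        }
        this.queueLimit = queueLimit;
    }

    // Blocks while the limit is reached, then reserves a slot for one record.
    public synchronized void beforeSend() throws InterruptedException {
        while (outstanding >= queueLimit) {
            backpressureCycles++;
            wait(100); // re-check at least every 100 ms, or sooner when notified
        }
        outstanding++;
    }

    // Called when the KPL acknowledges (or fails) a record.
    public synchronized void onComplete() {
        outstanding--;
        notifyAll();
    }

    public synchronized int outstanding() {
        return outstanding;
    }

    public synchronized long backpressureCycles() {
        return backpressureCycles;
    }
}
```

Blocking inside the sink is what turns a slow Kinesis stream into Flink backpressure: upstream operators stall instead of the buffer growing without bound.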

Performance Tuning

Throughput Optimization

  • Increase MaxConnections for higher parallelism
  • Increase RecordMaxBufferedTime so more records are batched per request (at the cost of latency)
  • Keep aggregation enabled (the default) to pack more user records into each Kinesis record
  • Tune queue limits based on memory availability

Latency Optimization

  • Reduce RecordMaxBufferedTime to minimize buffering delay
  • Disable aggregation for lowest latency
  • Spread partition keys evenly; a hot shard gets throttled, and the resulting retries add latency
  • Lower CollectionMaxCount and CollectionMaxSize to send smaller PutRecords batches

Resource Management

  • Monitor outstanding records count to prevent memory issues
  • Use appropriate queue limits based on record size and memory
  • Size the KPL thread pool (ThreadPoolSize) based on available CPU cores
  • Set RequestTimeout and RecordTtl so stalled requests and records are released instead of held indefinitely
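
Choosing a queue limit from record size and memory is simple arithmetic. The hypothetical helper below divides a per-subtask memory budget for buffered payloads by the average serialized record size. It is a rough sketch that assumes payload bytes dominate and ignores per-record overhead; keep in mind that the KPL buffers records in its own native process, so the budget is not only Flink JVM heap.

```java
public class QueueLimitSizing {
    // Queue limit that keeps buffered payload bytes under a budget.
    public static int queueLimitFor(long memoryBudgetBytes, long avgRecordBytes) {
        if (memoryBudgetBytes <= 0 || avgRecordBytes <= 0) {
            throw new IllegalArgumentException("budget and record size must be positive");
        }
        long limit = memoryBudgetBytes / avgRecordBytes;
        // setQueueLimit takes a positive int, so clamp to [1, Integer.MAX_VALUE].
        return (int) Math.max(1, Math.min(Integer.MAX_VALUE, limit));
    }

    public static void main(String[] args) {
        long budget = 64L * 1024 * 1024; // 64 MiB of buffered payload per sink subtask
        System.out.println(queueLimitFor(budget, 2_048));   // ~2 KiB records
        System.out.println(queueLimitFor(budget, 512_000)); // ~500 KB records
    }
}
```

For a 64 MiB budget this gives 32,768 for 2 KiB records and 131 for 500 KB records, which is why one fixed queue limit rarely suits streams with very different record sizes.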