CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-sql-connector-kafka-0-11-2-11

Apache Flink SQL connector for Apache Kafka 0.11 with shaded dependencies providing streaming and table API integration

Overview
Eval results
Files

docs/serialization.md

Serialization and Deserialization

The Flink Kafka connector provides comprehensive serialization and deserialization interfaces that enable flexible data format handling with access to Kafka record metadata including headers, timestamps, and partition information.

Capabilities

KafkaContextAware Interface

Interface for serialization schemas that need access to Kafka context information like partition assignments and parallelism details.

/**
 * Interface for context-aware Kafka serialization schemas.
 * Provides access to Flink parallelism and Kafka partition information so an
 * implementation can route elements based on runtime context (e.g. choose a
 * partition per subtask). All set* methods have no-op defaults; implementors
 * only override the context they need. getTargetTopic is the sole required method.
 */
@PublicEvolving
interface KafkaContextAware<T> extends Serializable {
    
    /**
     * Set the parallel instance ID (subtask index) for this serialization instance.
     * Called by the runtime before serialization begins.
     * @param parallelInstanceId the parallel instance ID (0-based)
     */
    default void setParallelInstanceId(int parallelInstanceId) {
        // Default implementation does nothing
    }
    
    /**
     * Set the total number of parallel instances of the operator.
     * @param numParallelInstances total number of parallel instances
     */
    default void setNumParallelInstances(int numParallelInstances) {
        // Default implementation does nothing
    }
    
    /**
     * Set the available Kafka partitions for the target topic.
     * @param partitions array of available partition IDs
     */
    default void setPartitions(int[] partitions) {
        // Default implementation does nothing
    }
    
    /**
     * Determine the target topic for a given element. Required: this is the
     * only method without a default implementation.
     * @param element the element to be serialized
     * @return the target Kafka topic name
     */
    String getTargetTopic(T element);
}

KafkaDeserializationSchema Interface

Advanced deserialization interface providing access to complete Kafka ConsumerRecord metadata.

/**
 * Interface for deserializing Kafka records with access to metadata.
 * Provides both single-record and collector-based deserialization patterns;
 * the collector-based default delegates to the single-record method and
 * silently drops null results (i.e. null means "skip this record").
 */
@PublicEvolving
interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    
    /**
     * Initialization method called once per deserialization instance, before
     * any records are deserialized.
     * @param context initialization context providing metrics and user code classloader
     */
    default void open(DeserializationSchema.InitializationContext context) throws Exception {
        // Default implementation does nothing
    }
    
    /**
     * Check if the given element signals end of input stream.
     * @param nextElement the element to check
     * @return true if this element indicates end of stream
     */
    boolean isEndOfStream(T nextElement);
    
    /**
     * Deserialize a Kafka ConsumerRecord into a single output element.
     * @param record the Kafka ConsumerRecord containing key, value, headers, etc.
     * @return deserialized element, or null to skip this record
     * @throws Exception if deserialization fails
     */
    T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
    
    /**
     * Deserialize a Kafka ConsumerRecord and emit zero or more output elements.
     * This method enables one-to-many deserialization patterns.
     * @param message the Kafka ConsumerRecord to deserialize
     * @param out collector for emitting output elements
     * @throws Exception if deserialization fails
     */
    default void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out) throws Exception {
        // Delegate to the single-record variant; a null return means "skip".
        T deserialized = deserialize(message);
        if (deserialized != null) {
            out.collect(deserialized);
        }
    }
}

Usage Examples:

import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Custom deserialization schema with metadata access
public class UserEventDeserializer implements KafkaDeserializationSchema<UserEvent> {
    
    // Jackson's ObjectMapper is thread-safe and expensive to construct;
    // share one instance instead of allocating a new mapper per record.
    private static final ObjectMapper MAPPER = new ObjectMapper();
    
    @Override
    public boolean isEndOfStream(UserEvent nextElement) {
        return false; // Never end stream
    }
    
    /**
     * Deserialize a single record into a UserEvent, enriched with Kafka
     * metadata (topic, partition, offset, timestamp) and the optional
     * "user_id" header when present.
     * @param record the raw Kafka record
     * @return the parsed event
     * @throws Exception if the value is not valid JSON
     */
    @Override
    public UserEvent deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // Access Kafka metadata
        String topic = record.topic();
        int partition = record.partition();
        long offset = record.offset();
        long timestamp = record.timestamp();
        
        // Access headers: lastHeader returns the most recent "user_id" header or null
        String userId = null;
        if (record.headers().lastHeader("user_id") != null) {
            // Decode explicitly as UTF-8 instead of the platform default charset
            userId = new String(record.headers().lastHeader("user_id").value(), StandardCharsets.UTF_8);
        }
        
        // Deserialize value
        String jsonValue = new String(record.value(), StandardCharsets.UTF_8);
        JsonNode node = MAPPER.readTree(jsonValue);
        
        // Prefer the header-supplied user id; fall back to the JSON payload
        return new UserEvent(
            userId != null ? userId : node.get("user_id").asText(),
            node.get("event_type").asText(),
            node.get("event_data").asText(),
            timestamp,
            topic,
            partition,
            offset
        );
    }
    
    @Override
    public TypeInformation<UserEvent> getProducedType() {
        return TypeInformation.of(UserEvent.class);
    }
}

// Usage with consumer
// FlinkKafkaConsumer011 accepts a KafkaDeserializationSchema directly,
// giving the schema access to the full ConsumerRecord (headers, offsets, etc.).
FlinkKafkaConsumer011<UserEvent> consumer = new FlinkKafkaConsumer011<>(
    "user-events",
    new UserEventDeserializer(),
    properties
);

KafkaSerializationSchema Interface

Advanced serialization interface for creating complete Kafka ProducerRecords with control over topic, partition, key, value, headers, and timestamp.

/**
 * Interface for serializing records to Kafka ProducerRecords.
 * Provides complete control over all aspects of the Kafka record:
 * topic, partition, key, value, headers, and timestamp.
 */
@PublicEvolving
interface KafkaSerializationSchema<T> extends Serializable {
    
    /**
     * Initialization method called once per serialization instance, before
     * any elements are serialized.
     * @param context initialization context providing metrics and user code classloader
     */
    default void open(SerializationSchema.InitializationContext context) throws Exception {
        // Default implementation does nothing
    }
    
    /**
     * Serialize an element into a complete Kafka ProducerRecord.
     * @param element the element to serialize
     * @param timestamp the timestamp of the record (may be null)
     * @return ProducerRecord with topic, partition, key, value, headers, and timestamp
     */
    ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp);
}

Usage Examples:

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;

// Custom serialization schema with full record control
public class UserEventSerializer implements KafkaSerializationSchema<UserEvent> {
    
    private final String defaultTopic;
    // One mapper per schema instance; ObjectMapper is thread-safe and costly to build.
    private final ObjectMapper objectMapper;
    
    /**
     * @param defaultTopic topic used for all non-critical events
     */
    public UserEventSerializer(String defaultTopic) {
        this.defaultTopic = defaultTopic;
        this.objectMapper = new ObjectMapper();
    }
    
    /**
     * Builds a complete ProducerRecord: dynamic topic routing, user-id key,
     * JSON value, propagated timestamp, and custom headers.
     * @param event the event to serialize
     * @param timestamp record timestamp supplied by Flink (may be null)
     * @return the fully-populated producer record
     */
    @Override
    public ProducerRecord<byte[], byte[]> serialize(UserEvent event, Long timestamp) {
        try {
            // Determine target topic (could be dynamic based on event);
            // constant-first equals avoids an NPE if the event type is null
            String topic = "critical".equals(event.getEventType()) ? "critical-events" : defaultTopic;
            
            // Create key for partitioning — always encode explicitly as UTF-8
            byte[] keyBytes = event.getUserId().getBytes(StandardCharsets.UTF_8);
            
            // Serialize value to JSON
            String jsonValue = objectMapper.writeValueAsString(event);
            byte[] valueBytes = jsonValue.getBytes(StandardCharsets.UTF_8);
            
            // Create producer record with headers
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
                topic,
                null, // Let Kafka determine partition based on key
                timestamp, // Use provided timestamp
                keyBytes,
                valueBytes
            );
            
            // Add custom headers; specify UTF-8 rather than relying on the
            // platform default charset of the no-arg getBytes()
            record.headers().add(new RecordHeader("user_id", event.getUserId().getBytes(StandardCharsets.UTF_8)));
            record.headers().add(new RecordHeader("event_type", event.getEventType().getBytes(StandardCharsets.UTF_8)));
            record.headers().add(new RecordHeader("source_system", "flink-processor".getBytes(StandardCharsets.UTF_8)));
            
            return record;
            
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize UserEvent", e);
        }
    }
}

// Usage with producer
// The serialization schema controls the actual target topic per record;
// the constructor topic only serves as a fallback/default.
FlinkKafkaProducer011<UserEvent> producer = new FlinkKafkaProducer011<>(
    "default-topic", // This will be overridden by serialization schema
    new UserEventSerializer("user-events"),
    properties,
    FlinkKafkaProducer011.Semantic.EXACTLY_ONCE
);

Multi-Record Deserialization

Using the collector-based deserialization for one-to-many record processing.

// Example of one-to-many deserialization using collector
public class BatchEventDeserializer implements KafkaDeserializationSchema<UserEvent> {
    
    // Shared, thread-safe mapper; avoid allocating a new ObjectMapper per record.
    private static final ObjectMapper MAPPER = new ObjectMapper();
    
    @Override
    public boolean isEndOfStream(UserEvent nextElement) {
        return false;
    }
    
    /**
     * Single-record variant is intentionally unsupported: this schema only
     * produces elements through the collector-based method below.
     */
    @Override
    public UserEvent deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // Not used when collector-based method is overridden
        throw new UnsupportedOperationException("Use collector-based deserialize method");
    }
    
    /**
     * Parses a JSON-array payload and emits one UserEvent per array element,
     * enriching each with the source record's Kafka coordinates.
     * Non-array payloads are silently skipped (nothing emitted).
     * @param message the Kafka record carrying a JSON array of events
     * @param out collector receiving one element per array entry
     */
    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<UserEvent> out) throws Exception {
        // Deserialize JSON array of events
        String jsonValue = new String(message.value(), StandardCharsets.UTF_8);
        JsonNode arrayNode = MAPPER.readTree(jsonValue);
        
        // Emit multiple events from single Kafka record
        if (arrayNode.isArray()) {
            for (JsonNode eventNode : arrayNode) {
                UserEvent event = MAPPER.treeToValue(eventNode, UserEvent.class);
                // Enrich with Kafka metadata
                event.setKafkaTopic(message.topic());
                event.setKafkaPartition(message.partition());
                event.setKafkaOffset(message.offset());
                event.setKafkaTimestamp(message.timestamp());
                
                out.collect(event);
            }
        }
    }
    
    @Override
    public TypeInformation<UserEvent> getProducedType() {
        return TypeInformation.of(UserEvent.class);
    }
}

Built-in Serialization Formats

The connector works with Flink's built-in serialization schemas for common use cases.

// Using built-in schemas (these work with both simple and Kafka-specific interfaces)
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.DeserializationSchema;

// Simple string serialization/deserialization
SimpleStringSchema stringSchema = new SimpleStringSchema();

// JSON serialization using Flink's built-in support
// Note: For Kafka-specific features, wrap in adapter
// Adapter: expose a plain DeserializationSchema through the Kafka-aware
// interface. Only the record value is consumed; headers/metadata are ignored.
KafkaDeserializationSchema<String> kafkaStringSchema = new KafkaDeserializationSchema<String>() {
    private final SimpleStringSchema delegate = new SimpleStringSchema();
    
    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // Forward only the value bytes to the wrapped schema
        return delegate.deserialize(record.value());
    }
    
    @Override
    public boolean isEndOfStream(String nextElement) {
        return delegate.isEndOfStream(nextElement);
    }
    
    @Override
    public TypeInformation<String> getProducedType() {
        return delegate.getProducedType();
    }
};

Usage Examples:

// Consumer with built-in string deserialization
FlinkKafkaConsumer011<String> stringConsumer = new FlinkKafkaConsumer011<>(
    "text-topic",
    new SimpleStringSchema(), // Flink automatically wraps in adapter
    properties
);

// Consumer with custom Kafka-aware deserialization
FlinkKafkaConsumer011<UserEvent> eventConsumer = new FlinkKafkaConsumer011<>(
    "user-events",
    new UserEventDeserializer(), // Direct KafkaDeserializationSchema usage
    properties
);

// Producer with custom Kafka-aware serialization
// EXACTLY_ONCE enables Kafka transactions for end-to-end exactly-once delivery
FlinkKafkaProducer011<UserEvent> eventProducer = new FlinkKafkaProducer011<>(
    "processed-events",
    new UserEventSerializer("processed-events"),
    properties,
    FlinkKafkaProducer011.Semantic.EXACTLY_ONCE
);

Error Handling in Serialization

Best practices for handling serialization errors and malformed records.

// Robust deserialization with error handling
public class RobustUserEventDeserializer implements KafkaDeserializationSchema<UserEvent> {
    
    // Shared, thread-safe mapper; building one per record is needlessly expensive.
    private static final ObjectMapper MAPPER = new ObjectMapper();
    
    // transient, NOT final: the schema is Serializable, but the metric Counter
    // is runtime state (re)created in open() on each task instance. Declaring
    // it final would not compile, since it is assigned outside a constructor.
    private transient Counter deserializationErrors;
    
    /**
     * Registers the error counter with the task's metric group.
     */
    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        this.deserializationErrors = context.getMetricGroup()
            .counter("deserialization_errors");
    }
    
    /**
     * Attempts to parse the record value as a UserEvent. On failure, logs the
     * Kafka coordinates, increments the error metric, and returns null so the
     * connector skips the record instead of failing the whole job.
     */
    @Override
    public UserEvent deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        try {
            // Attempt deserialization
            String jsonValue = new String(record.value(), StandardCharsets.UTF_8);
            return MAPPER.readValue(jsonValue, UserEvent.class);
            
        } catch (Exception e) {
            // Log error with Kafka metadata for debugging
            // NOTE(review): assumes a static LOG (e.g. SLF4J Logger) is declared on this class
            LOG.warn("Failed to deserialize record from topic {} partition {} offset {}: {}", 
                record.topic(), record.partition(), record.offset(), e.getMessage());
            
            deserializationErrors.inc();
            
            // Return null to skip this record, or create a poison pill record for downstream handling
            return null;
            
            // Alternative: Create error record for downstream error handling
            // return new UserEvent("PARSE_ERROR", record.topic(), record.partition(), record.offset());
        }
    }
    
    @Override
    public boolean isEndOfStream(UserEvent nextElement) {
        return false;
    }
    
    @Override
    public TypeInformation<UserEvent> getProducedType() {
        return TypeInformation.of(UserEvent.class);
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-sql-connector-kafka-0-11-2-11

docs

configuration.md

index.md

serialization.md

streaming-consumer.md

streaming-producer.md

table-api.md

tile.json