tessl/maven-org-apache-flink--flink-connector-kinesis-2-11

Apache Flink connector for Amazon Kinesis Data Streams that provides both consumer and producer functionality for streaming data integration with AWS Kinesis services


Serialization and Deserialization

Kinesis-specific serialization interfaces that provide access to stream metadata during deserialization and allow custom target stream specification during serialization.

Capabilities

Kinesis Deserialization Schema

Extended deserialization interface that provides access to Kinesis record metadata including partition key, sequence number, approximate arrival timestamp, stream name, and shard ID.

@PublicEvolving
public interface KinesisDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    
    /**
     * Initialize the deserialization schema with context.
     *
     * @param context Initialization context with runtime information
     * @throws Exception On initialization errors
     */
    default void open(DeserializationSchema.InitializationContext context) throws Exception {}
    
    /**
     * Deserialize a Kinesis record with full metadata access.
     *
     * @param recordValue Raw record data bytes
     * @param partitionKey Partition key used for the record
     * @param seqNum Sequence number of the record within the shard
     * @param approxArrivalTimestamp Approximate arrival timestamp in milliseconds
     * @param stream Name of the Kinesis stream
     * @param shardId ID of the shard containing this record
     * @return Deserialized object of type T
     * @throws IOException On deserialization errors
     */
    T deserialize(byte[] recordValue, String partitionKey, String seqNum, 
                  long approxArrivalTimestamp, String stream, String shardId) throws IOException;
}

Kinesis Serialization Schema

Extended serialization interface that allows specifying the target stream for each record, enabling dynamic stream routing based on record content.

@PublicEvolving
public interface KinesisSerializationSchema<T> extends Serializable {
    
    /**
     * Initialize the serialization schema with context.
     *
     * @param context Initialization context with runtime information
     * @throws Exception On initialization errors
     */
    default void open(InitializationContext context) throws Exception {}
    
    /**
     * Serialize an object to ByteBuffer for Kinesis.
     *
     * @param element Object to serialize
     * @return Serialized data as ByteBuffer
     */
    ByteBuffer serialize(T element);
    
    /**
     * Determine the target stream for this record.
     *
     * @param element Object to determine stream for
     * @return Target stream name
     */
    String getTargetStream(T element);
}

Schema Wrapper

Internal wrapper that adapts standard Flink DeserializationSchema to KinesisDeserializationSchema, automatically handling the conversion from Kinesis-specific parameters to standard deserialization.

@Internal
public class KinesisDeserializationSchemaWrapper<T> implements KinesisDeserializationSchema<T> {
    
    /**
     * Create wrapper around standard deserialization schema.
     *
     * @param deserializationSchema Standard Flink deserialization schema to wrap
     */
    public KinesisDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema);
    
    /**
     * Deserialize using wrapped schema, ignoring Kinesis metadata.
     *
     * @param recordValue Raw record data bytes
     * @param partitionKey Partition key (ignored by wrapper)
     * @param seqNum Sequence number (ignored by wrapper)
     * @param approxArrivalTimestamp Arrival timestamp (ignored by wrapper)
     * @param stream Stream name (ignored by wrapper)
     * @param shardId Shard ID (ignored by wrapper)
     * @return Deserialized object using wrapped schema
     * @throws IOException On deserialization errors
     */
    @Override
    public T deserialize(byte[] recordValue, String partitionKey, String seqNum,
                        long approxArrivalTimestamp, String stream, String shardId) throws IOException;
    
    /**
     * Get type information from wrapped schema.
     *
     * @return Type information from wrapped deserialization schema
     */
    @Override
    public TypeInformation<T> getProducedType();
    
    /**
     * Open the wrapped deserialization schema.
     *
     * @param context Initialization context
     * @throws Exception On initialization errors
     */
    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception;
}

Usage Examples

Custom Deserialization with Metadata

import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class EventDeserializationSchema implements KinesisDeserializationSchema<EventWithMetadata> {
    private transient ObjectMapper objectMapper;
    
    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        objectMapper = new ObjectMapper();
    }
    
    @Override
    public EventWithMetadata deserialize(byte[] recordValue, String partitionKey, String seqNum,
                                       long approxArrivalTimestamp, String stream, String shardId) 
                                       throws IOException {
        // Parse the JSON payload
        String json = new String(recordValue, StandardCharsets.UTF_8);
        Event event = objectMapper.readValue(json, Event.class);
        
        // Create enriched event with Kinesis metadata
        EventWithMetadata enrichedEvent = new EventWithMetadata();
        enrichedEvent.setEvent(event);
        enrichedEvent.setPartitionKey(partitionKey);
        enrichedEvent.setSequenceNumber(seqNum);
        enrichedEvent.setArrivalTimestamp(approxArrivalTimestamp);
        enrichedEvent.setStreamName(stream);
        enrichedEvent.setShardId(shardId);
        
        return enrichedEvent;
    }
    
    @Override
    public TypeInformation<EventWithMetadata> getProducedType() {
        return TypeInformation.of(EventWithMetadata.class);
    }
}

Event-Time Extraction from Metadata

public class TimestampedEventSchema implements KinesisDeserializationSchema<TimestampedEvent> {
    private transient ObjectMapper objectMapper;
    
    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        objectMapper = new ObjectMapper();
    }
    
    @Override
    public TimestampedEvent deserialize(byte[] recordValue, String partitionKey, String seqNum,
                                      long approxArrivalTimestamp, String stream, String shardId) 
                                      throws IOException {
        String json = new String(recordValue, StandardCharsets.UTF_8);
        JsonNode node = objectMapper.readTree(json);
        
        TimestampedEvent event = new TimestampedEvent();
        event.setData(node.get("data").asText());
        
        // Use event timestamp if available, otherwise use Kinesis arrival time
        if (node.has("timestamp")) {
            event.setEventTime(node.get("timestamp").asLong());
        } else {
            event.setEventTime(approxArrivalTimestamp);
        }
        
        // Add processing metadata
        event.setProcessingTime(System.currentTimeMillis());
        event.setPartitionKey(partitionKey);
        
        return event;
    }
    
    @Override
    public TypeInformation<TimestampedEvent> getProducedType() {
        return TypeInformation.of(TimestampedEvent.class);
    }
}

Multi-Format Deserialization

public class MultiFormatDeserializationSchema implements KinesisDeserializationSchema<GenericRecord> {
    private transient ObjectMapper jsonMapper;
    private transient AvroDeserializer avroDeserializer;
    
    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        jsonMapper = new ObjectMapper();
        avroDeserializer = new AvroDeserializer();
    }
    
    @Override
    public GenericRecord deserialize(byte[] recordValue, String partitionKey, String seqNum,
                                   long approxArrivalTimestamp, String stream, String shardId) 
                                   throws IOException {
        // Detect format based on stream name or content
        if (stream.endsWith("-json")) {
            return deserializeJson(recordValue);
        } else if (stream.endsWith("-avro")) {
            return deserializeAvro(recordValue);
        } else {
            // Auto-detect based on content
            return autoDetectAndDeserialize(recordValue);
        }
    }
    
    private GenericRecord deserializeJson(byte[] data) throws IOException {
        String json = new String(data, StandardCharsets.UTF_8);
        return jsonMapper.readValue(json, GenericRecord.class);
    }
    
    private GenericRecord deserializeAvro(byte[] data) throws IOException {
        return avroDeserializer.deserialize(data);
    }
    
    private GenericRecord autoDetectAndDeserialize(byte[] data) throws IOException {
        // Simple heuristic: JSON starts with '{' or '['
        if (data.length > 0 && (data[0] == '{' || data[0] == '[')) {
            return deserializeJson(data);
        } else {
            return deserializeAvro(data);
        }
    }
    
    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return TypeInformation.of(GenericRecord.class);
    }
}

Custom Serialization with Stream Routing

import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class EventSerializationSchema implements KinesisSerializationSchema<Event> {
    private transient ObjectMapper objectMapper;
    
    @Override
    public void open(InitializationContext context) throws Exception {
        objectMapper = new ObjectMapper();
    }
    
    @Override
    public ByteBuffer serialize(Event element) {
        try {
            // Add processing timestamp
            element.setProcessedAt(System.currentTimeMillis());
            
            // Serialize to JSON
            String json = objectMapper.writeValueAsString(element);
            return ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8));
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize event", e);
        }
    }
    
    @Override
    public String getTargetStream(Event element) {
        // Route to different streams based on event properties
        if (element.isHighPriority()) {
            return "high-priority-events";
        } else if (element.getEventType().equals("ERROR")) {
            return "error-events";
        } else {
            return "normal-events";
        }
    }
}

Dynamic Stream Routing with Tenant Isolation

public class TenantAwareSerializationSchema implements KinesisSerializationSchema<TenantEvent> {
    private transient ObjectMapper objectMapper;
    
    @Override
    public void open(InitializationContext context) throws Exception {
        objectMapper = new ObjectMapper();
    }
    
    @Override
    public ByteBuffer serialize(TenantEvent element) {
        try {
            // Enrich with metadata
            element.setIngestionTime(System.currentTimeMillis());
            element.setProcessingRegion(System.getProperty("aws.region", "unknown"));
            
            String json = objectMapper.writeValueAsString(element);
            return ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8));
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize tenant event", e);
        }
    }
    
    @Override
    public String getTargetStream(TenantEvent element) {
        // Route to tenant-specific streams for isolation
        String tenantId = element.getTenantId();
        String eventType = element.getEventType();
        
        return String.format("tenant-%s-events-%s", tenantId, eventType.toLowerCase());
    }
}
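
Kinesis stream names are limited to 128 characters drawn from `[a-zA-Z0-9_.-]`, so dynamically generated names like the tenant-based ones above are worth validating before use. A minimal plain-Java sketch (the `tenantStream` helper and tenant values are illustrative, not part of the connector API):

```java
import java.util.regex.Pattern;

public class StreamNameCheck {
    // Kinesis stream names: 1-128 characters from [a-zA-Z0-9_.-]
    private static final Pattern VALID = Pattern.compile("[a-zA-Z0-9_.-]{1,128}");

    static String tenantStream(String tenantId, String eventType) {
        String name = String.format("tenant-%s-events-%s", tenantId, eventType.toLowerCase());
        if (!VALID.matcher(name).matches()) {
            throw new IllegalArgumentException("Invalid Kinesis stream name: " + name);
        }
        return name;
    }

    public static void main(String[] args) {
        System.out.println(tenantStream("acme", "CLICK")); // tenant-acme-events-click
    }
}
```

Validating eagerly in `getTargetStream` surfaces a bad tenant ID as a clear error instead of a rejected `PutRecords` call at runtime.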

Binary Data Serialization

public class BinaryDataSchema implements KinesisSerializationSchema<BinaryDataEvent> {
    
    @Override
    public ByteBuffer serialize(BinaryDataEvent element) {
        // Handle binary data directly
        ByteBuffer buffer = ByteBuffer.allocate(element.getDataSize() + 8);
        
        // Add header with timestamp
        buffer.putLong(element.getTimestamp());
        
        // Add binary payload
        buffer.put(element.getBinaryData());
        
        buffer.flip();
        return buffer;
    }
    
    @Override
    public String getTargetStream(BinaryDataEvent element) {
        // Route based on data type
        return "binary-data-" + element.getDataType();
    }
}
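
The 8-byte timestamp header written above must be read back symmetrically on the consuming side. A self-contained sketch of the framing round trip (the `frame` and `readTimestamp` helper names are illustrative):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BinaryFraming {
    // Frame layout: [8-byte big-endian timestamp][payload bytes]
    static ByteBuffer frame(long timestamp, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(8 + payload.length);
        buf.putLong(timestamp);
        buf.put(payload);
        buf.flip(); // prepare for reading: position 0, limit = bytes written
        return buf;
    }

    static long readTimestamp(ByteBuffer buf) {
        return buf.getLong(0); // absolute read leaves the position untouched
    }

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        ByteBuffer framed = frame(1700000000000L, payload);
        System.out.println(readTimestamp(framed)); // 1700000000000
        System.out.println(framed.remaining());    // 13
    }
}
```

Note the `flip()` call: forgetting it is a common bug that makes the producer see an empty buffer, since the position is still at the end of the written data.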

Compression Support

import java.io.ByteArrayOutputStream;
import java.util.zip.GZIPOutputStream;
import java.util.zip.GZIPInputStream;

public class CompressedSerializationSchema implements KinesisSerializationSchema<LargeEvent> {
    private transient ObjectMapper objectMapper;
    
    @Override
    public void open(InitializationContext context) throws Exception {
        objectMapper = new ObjectMapper();
    }
    
    @Override
    public ByteBuffer serialize(LargeEvent element) {
        try {
            // Serialize to JSON first
            String json = objectMapper.writeValueAsString(element);
            byte[] jsonBytes = json.getBytes(StandardCharsets.UTF_8);
            
            // Compress if data is large
            if (jsonBytes.length > 1024) {
                return ByteBuffer.wrap(compress(jsonBytes));
            } else {
                return ByteBuffer.wrap(jsonBytes);
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize and compress event", e);
        }
    }
    
    private byte[] compress(byte[] data) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (GZIPOutputStream gzipOut = new GZIPOutputStream(baos)) {
            gzipOut.write(data);
        }
        return baos.toByteArray();
    }
    
    @Override
    public String getTargetStream(LargeEvent element) {
        return element.getStreamName();
    }
}
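
Because the schema above only compresses payloads larger than 1024 bytes, the consuming side cannot assume every record is gzipped; it can detect compression from the GZIP magic bytes `0x1f 0x8b`. A hedged sketch of the matching decompression logic (`GzipCodec` and its method names are illustrative):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import java.nio.charset.StandardCharsets;

public class GzipCodec {
    // GZIP streams always start with the magic bytes 0x1f 0x8b
    static boolean isGzip(byte[] data) {
        return data.length > 2 && (data[0] & 0xff) == 0x1f && (data[1] & 0xff) == 0x8b;
    }

    static byte[] maybeDecompress(byte[] data) throws IOException {
        if (!isGzip(data)) {
            return data; // small records were sent uncompressed
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) != -1) {
                out.write(chunk, 0, n);
            }
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] original = "payload".getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(baos)) {
            gz.write(original);
        }
        byte[] restored = maybeDecompress(baos.toByteArray());
        System.out.println(new String(restored, StandardCharsets.UTF_8)); // payload
    }
}
```

The magic-byte check keeps the consumer agnostic to the producer's 1024-byte threshold, so the threshold can be tuned without coordinating a deployment on both sides.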

Error Handling

Robust Deserialization with Fallbacks

public class RobustDeserializationSchema implements KinesisDeserializationSchema<Either<Event, ErrorRecord>> {
    private transient ObjectMapper objectMapper;
    private transient Logger logger;
    
    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        objectMapper = new ObjectMapper();
        logger = LoggerFactory.getLogger(getClass());
    }
    
    @Override
    public Either<Event, ErrorRecord> deserialize(byte[] recordValue, String partitionKey, String seqNum,
                                                 long approxArrivalTimestamp, String stream, String shardId) {
        try {
            String json = new String(recordValue, StandardCharsets.UTF_8);
            Event event = objectMapper.readValue(json, Event.class);
            return Either.Left(event);
        } catch (Exception e) {
            logger.warn("Failed to deserialize record from stream {} shard {}: {}", 
                       stream, shardId, e.getMessage());
            
            ErrorRecord errorRecord = new ErrorRecord();
            errorRecord.setRawData(recordValue);
            errorRecord.setError(e.getMessage());
            errorRecord.setStreamName(stream);
            errorRecord.setShardId(shardId);
            errorRecord.setSequenceNumber(seqNum);
            errorRecord.setTimestamp(approxArrivalTimestamp);
            
            return Either.Right(errorRecord);
        }
    }
    
    @Override
    public TypeInformation<Either<Event, ErrorRecord>> getProducedType() {
        return new EitherTypeInfo<>(
            TypeInformation.of(Event.class),
            TypeInformation.of(ErrorRecord.class)
        );
    }
}

Performance Considerations

Efficient Object Reuse

public class EfficientDeserializationSchema implements KinesisDeserializationSchema<Event> {
    private transient ObjectMapper objectMapper;
    private transient Event reusableEvent; // Reuse objects to reduce GC pressure
    
    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        objectMapper = new ObjectMapper();
        reusableEvent = new Event();
    }
    
    @Override
    public Event deserialize(byte[] recordValue, String partitionKey, String seqNum,
                           long approxArrivalTimestamp, String stream, String shardId) throws IOException {
        // Reuse object and reset fields
        reusableEvent.reset();
        
        // Efficient parsing without creating intermediate objects
        JsonParser parser = objectMapper.getFactory().createParser(recordValue);
        // ... parse fields directly into reusableEvent
        
        return reusableEvent.copy(); // Return copy for thread safety
    }
    
    @Override
    public TypeInformation<Event> getProducedType() {
        return TypeInformation.of(Event.class);
    }
}

Batch Serialization Optimization

public class BatchOptimizedSchema implements KinesisSerializationSchema<List<Event>> {
    private transient ObjectMapper objectMapper;
    private transient ByteArrayOutputStream buffer;
    
    @Override
    public void open(InitializationContext context) throws Exception {
        objectMapper = new ObjectMapper();
        buffer = new ByteArrayOutputStream(4096); // Pre-allocated buffer
    }
    
    @Override
    public ByteBuffer serialize(List<Event> elements) {
        try {
            buffer.reset(); // Reuse buffer
            
            // Efficient batch serialization
            objectMapper.writeValue(buffer, elements);
            
            return ByteBuffer.wrap(buffer.toByteArray());
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize event batch", e);
        }
    }
    
    @Override
    public String getTargetStream(List<Event> elements) {
        // Route based on first event in batch
        return elements.isEmpty() ? "default-stream" : elements.get(0).getStreamName();
    }
}
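
Batching several events into one ByteBuffer makes it easy to exceed the 1 MiB limit that Kinesis enforces on a single record (payload plus partition key). A sketch of a size guard to apply before handing the buffer to the producer (`checkSize` and `MAX_RECORD_BYTES` are illustrative names, not connector API):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RecordSizeGuard {
    // A single Kinesis record (data blob plus partition key) is capped at 1 MiB.
    static final int MAX_RECORD_BYTES = 1024 * 1024;

    static ByteBuffer checkSize(ByteBuffer serialized, String partitionKey) {
        int total = serialized.remaining()
                + partitionKey.getBytes(StandardCharsets.UTF_8).length;
        if (total > MAX_RECORD_BYTES) {
            throw new IllegalStateException(
                    "Record of " + total + " bytes exceeds the 1 MiB Kinesis limit");
        }
        return serialized;
    }

    public static void main(String[] args) {
        ByteBuffer small = ByteBuffer.wrap(new byte[512]);
        System.out.println(checkSize(small, "pk").remaining()); // 512
    }
}
```

Failing fast in the schema produces a clear, attributable error, whereas an oversized record is otherwise rejected asynchronously by the Kinesis API.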

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-kinesis-2-11
