Apache Flink connector for Amazon Kinesis Data Streams, providing both consumer and producer functionality for streaming-data integration with AWS Kinesis services.
Kinesis-specific serialization interfaces that provide access to stream metadata during deserialization and allow custom target stream specification during serialization.
Extended deserialization interface that provides access to Kinesis record metadata including partition key, sequence number, approximate arrival timestamp, stream name, and shard ID.
@PublicEvolving
public interface KinesisDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
/**
* Initialize the deserialization schema with context.
*
* @param context Initialization context with runtime information
* @throws Exception On initialization errors
*/
default void open(DeserializationSchema.InitializationContext context) throws Exception;
/**
* Deserialize a Kinesis record with full metadata access.
*
* @param recordValue Raw record data bytes
* @param partitionKey Partition key used for the record
* @param seqNum Sequence number of the record within the shard
* @param approxArrivalTimestamp Approximate arrival timestamp in milliseconds
* @param stream Name of the Kinesis stream
* @param shardId ID of the shard containing this record
* @return Deserialized object of type T
* @throws IOException On deserialization errors
*/
T deserialize(byte[] recordValue, String partitionKey, String seqNum,
long approxArrivalTimestamp, String stream, String shardId) throws IOException;
}Extended serialization interface that allows specifying the target stream for each record, enabling dynamic stream routing based on record content.
@PublicEvolving
public interface KinesisSerializationSchema<T> extends Serializable {
/**
* Initialize the serialization schema with context.
*
* @param context Initialization context with runtime information
* @throws Exception On initialization errors
*/
default void open(InitializationContext context) throws Exception;
/**
* Serialize an object to ByteBuffer for Kinesis.
*
* @param element Object to serialize
* @return Serialized data as ByteBuffer
*/
ByteBuffer serialize(T element);
/**
* Determine the target stream for this record.
*
* @param element Object to determine stream for
* @return Target stream name
*/
String getTargetStream(T element);
}Internal wrapper that adapts standard Flink DeserializationSchema to KinesisDeserializationSchema, automatically handling the conversion from Kinesis-specific parameters to standard deserialization.
@Internal
public class KinesisDeserializationSchemaWrapper<T> implements KinesisDeserializationSchema<T> {
/**
* Create wrapper around standard deserialization schema.
*
* @param deserializationSchema Standard Flink deserialization schema to wrap
*/
public KinesisDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema);
/**
* Deserialize using wrapped schema, ignoring Kinesis metadata.
*
* @param recordValue Raw record data bytes
* @param partitionKey Partition key (ignored by wrapper)
* @param seqNum Sequence number (ignored by wrapper)
* @param approxArrivalTimestamp Arrival timestamp (ignored by wrapper)
* @param stream Stream name (ignored by wrapper)
* @param shardId Shard ID (ignored by wrapper)
* @return Deserialized object using wrapped schema
* @throws IOException On deserialization errors
*/
@Override
public T deserialize(byte[] recordValue, String partitionKey, String seqNum,
long approxArrivalTimestamp, String stream, String shardId) throws IOException;
/**
* Get type information from wrapped schema.
*
* @return Type information from wrapped deserialization schema
*/
@Override
public TypeInformation<T> getProducedType();
/**
* Open the wrapped deserialization schema.
*
* @param context Initialization context
* @throws Exception On initialization errors
*/
@Override
public void open(DeserializationSchema.InitializationContext context) throws Exception;
}import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.nio.charset.StandardCharsets;
public class EventDeserializationSchema implements KinesisDeserializationSchema<EventWithMetadata> {
private transient ObjectMapper objectMapper;
@Override
public void open(DeserializationSchema.InitializationContext context) throws Exception {
objectMapper = new ObjectMapper();
}
@Override
public EventWithMetadata deserialize(byte[] recordValue, String partitionKey, String seqNum,
long approxArrivalTimestamp, String stream, String shardId)
throws IOException {
// Parse the JSON payload
String json = new String(recordValue, StandardCharsets.UTF_8);
Event event = objectMapper.readValue(json, Event.class);
// Create enriched event with Kinesis metadata
EventWithMetadata enrichedEvent = new EventWithMetadata();
enrichedEvent.setEvent(event);
enrichedEvent.setPartitionKey(partitionKey);
enrichedEvent.setSequenceNumber(seqNum);
enrichedEvent.setArrivalTimestamp(approxArrivalTimestamp);
enrichedEvent.setStreamName(stream);
enrichedEvent.setShardId(shardId);
return enrichedEvent;
}
@Override
public TypeInformation<EventWithMetadata> getProducedType() {
return TypeInformation.of(EventWithMetadata.class);
}
}public class TimestampedEventSchema implements KinesisDeserializationSchema<TimestampedEvent> {
private transient ObjectMapper objectMapper;
@Override
public void open(DeserializationSchema.InitializationContext context) throws Exception {
objectMapper = new ObjectMapper();
}
@Override
public TimestampedEvent deserialize(byte[] recordValue, String partitionKey, String seqNum,
long approxArrivalTimestamp, String stream, String shardId)
throws IOException {
String json = new String(recordValue, StandardCharsets.UTF_8);
JsonNode node = objectMapper.readTree(json);
TimestampedEvent event = new TimestampedEvent();
event.setData(node.get("data").asText());
// Use event timestamp if available, otherwise use Kinesis arrival time
if (node.has("timestamp")) {
event.setEventTime(node.get("timestamp").asLong());
} else {
event.setEventTime(approxArrivalTimestamp);
}
// Add processing metadata
event.setProcessingTime(System.currentTimeMillis());
event.setPartitionKey(partitionKey);
return event;
}
@Override
public TypeInformation<TimestampedEvent> getProducedType() {
return TypeInformation.of(TimestampedEvent.class);
}
}public class MultiFormatDeserializationSchema implements KinesisDeserializationSchema<GenericRecord> {
private transient ObjectMapper jsonMapper;
private transient AvroDeserializer avroDeserializer;
@Override
public void open(DeserializationSchema.InitializationContext context) throws Exception {
jsonMapper = new ObjectMapper();
avroDeserializer = new AvroDeserializer();
}
@Override
public GenericRecord deserialize(byte[] recordValue, String partitionKey, String seqNum,
long approxArrivalTimestamp, String stream, String shardId)
throws IOException {
// Detect format based on stream name or content
if (stream.endsWith("-json")) {
return deserializeJson(recordValue);
} else if (stream.endsWith("-avro")) {
return deserializeAvro(recordValue);
} else {
// Auto-detect based on content
return autoDetectAndDeserialize(recordValue);
}
}
private GenericRecord deserializeJson(byte[] data) throws IOException {
String json = new String(data, StandardCharsets.UTF_8);
return jsonMapper.readValue(json, GenericRecord.class);
}
private GenericRecord deserializeAvro(byte[] data) throws IOException {
return avroDeserializer.deserialize(data);
}
private GenericRecord autoDetectAndDeserialize(byte[] data) throws IOException {
// Simple heuristic: JSON starts with '{' or '['
if (data.length > 0 && (data[0] == '{' || data[0] == '[')) {
return deserializeJson(data);
} else {
return deserializeAvro(data);
}
}
@Override
public TypeInformation<GenericRecord> getProducedType() {
return TypeInformation.of(GenericRecord.class);
}
}import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
public class EventSerializationSchema implements KinesisSerializationSchema<Event> {
private transient ObjectMapper objectMapper;
@Override
public void open(InitializationContext context) throws Exception {
objectMapper = new ObjectMapper();
}
@Override
public ByteBuffer serialize(Event element) {
try {
// Add processing timestamp
element.setProcessedAt(System.currentTimeMillis());
// Serialize to JSON
String json = objectMapper.writeValueAsString(element);
return ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8));
} catch (Exception e) {
throw new RuntimeException("Failed to serialize event", e);
}
}
@Override
public String getTargetStream(Event element) {
// Route to different streams based on event properties
if (element.isHighPriority()) {
return "high-priority-events";
} else if (element.getEventType().equals("ERROR")) {
return "error-events";
} else {
return "normal-events";
}
}
}public class TenantAwareSerializationSchema implements KinesisSerializationSchema<TenantEvent> {
private transient ObjectMapper objectMapper;
@Override
public void open(InitializationContext context) throws Exception {
objectMapper = new ObjectMapper();
}
@Override
public ByteBuffer serialize(TenantEvent element) {
try {
// Enrich with metadata
element.setIngestionTime(System.currentTimeMillis());
element.setProcessingRegion(System.getProperty("aws.region", "unknown"));
String json = objectMapper.writeValueAsString(element);
return ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8));
} catch (Exception e) {
throw new RuntimeException("Failed to serialize tenant event", e);
}
}
@Override
public String getTargetStream(TenantEvent element) {
// Route to tenant-specific streams for isolation
String tenantId = element.getTenantId();
String eventType = element.getEventType();
return String.format("tenant-%s-events-%s", tenantId, eventType.toLowerCase());
}
}public class BinaryDataSchema implements KinesisSerializationSchema<BinaryDataEvent> {
@Override
public ByteBuffer serialize(BinaryDataEvent element) {
// Handle binary data directly
ByteBuffer buffer = ByteBuffer.allocate(element.getDataSize() + 8);
// Add header with timestamp
buffer.putLong(element.getTimestamp());
// Add binary payload
buffer.put(element.getBinaryData());
buffer.flip();
return buffer;
}
@Override
public String getTargetStream(BinaryDataEvent element) {
// Route based on data type
return "binary-data-" + element.getDataType();
}
}import java.io.ByteArrayOutputStream;
import java.util.zip.GZIPOutputStream;
import java.util.zip.GZIPInputStream;
public class CompressedSerializationSchema implements KinesisSerializationSchema<LargeEvent> {
private transient ObjectMapper objectMapper;
@Override
public void open(InitializationContext context) throws Exception {
objectMapper = new ObjectMapper();
}
@Override
public ByteBuffer serialize(LargeEvent element) {
try {
// Serialize to JSON first
String json = objectMapper.writeValueAsString(element);
byte[] jsonBytes = json.getBytes(StandardCharsets.UTF_8);
// Compress if data is large
if (jsonBytes.length > 1024) {
return ByteBuffer.wrap(compress(jsonBytes));
} else {
return ByteBuffer.wrap(jsonBytes);
}
} catch (Exception e) {
throw new RuntimeException("Failed to serialize and compress event", e);
}
}
private byte[] compress(byte[] data) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (GZIPOutputStream gzipOut = new GZIPOutputStream(baos)) {
gzipOut.write(data);
}
return baos.toByteArray();
}
@Override
public String getTargetStream(LargeEvent element) {
return element.getStreamName();
}
}public class RobustDeserializationSchema implements KinesisDeserializationSchema<Either<Event, ErrorRecord>> {
private transient ObjectMapper objectMapper;
private transient Logger logger;
@Override
public void open(DeserializationSchema.InitializationContext context) throws Exception {
objectMapper = new ObjectMapper();
logger = LoggerFactory.getLogger(getClass());
}
@Override
public Either<Event, ErrorRecord> deserialize(byte[] recordValue, String partitionKey, String seqNum,
long approxArrivalTimestamp, String stream, String shardId) {
try {
String json = new String(recordValue, StandardCharsets.UTF_8);
Event event = objectMapper.readValue(json, Event.class);
return Either.left(event);
} catch (Exception e) {
logger.warn("Failed to deserialize record from stream {} shard {}: {}",
stream, shardId, e.getMessage());
ErrorRecord errorRecord = new ErrorRecord();
errorRecord.setRawData(recordValue);
errorRecord.setError(e.getMessage());
errorRecord.setStreamName(stream);
errorRecord.setShardId(shardId);
errorRecord.setSequenceNumber(seqNum);
errorRecord.setTimestamp(approxArrivalTimestamp);
return Either.right(errorRecord);
}
}
@Override
public TypeInformation<Either<Event, ErrorRecord>> getProducedType() {
return new EitherTypeInfo<>(
TypeInformation.of(Event.class),
TypeInformation.of(ErrorRecord.class)
);
}
}public class EfficientDeserializationSchema implements KinesisDeserializationSchema<Event> {
private transient ObjectMapper objectMapper;
private transient Event reusableEvent; // Reuse objects to reduce GC pressure
@Override
public void open(DeserializationSchema.InitializationContext context) throws Exception {
objectMapper = new ObjectMapper();
reusableEvent = new Event();
}
@Override
public Event deserialize(byte[] recordValue, String partitionKey, String seqNum,
long approxArrivalTimestamp, String stream, String shardId) throws IOException {
// Reuse object and reset fields
reusableEvent.reset();
// Efficient parsing without creating intermediate objects
JsonParser parser = objectMapper.getFactory().createParser(recordValue);
// ... parse fields directly into reusableEvent
return reusableEvent.copy(); // Return copy for thread safety
}
@Override
public TypeInformation<Event> getProducedType() {
return TypeInformation.of(Event.class);
}
}public class BatchOptimizedSchema implements KinesisSerializationSchema<List<Event>> {
private transient ObjectMapper objectMapper;
private transient ByteArrayOutputStream buffer;
@Override
public void open(InitializationContext context) throws Exception {
objectMapper = new ObjectMapper();
buffer = new ByteArrayOutputStream(4096); // Pre-allocated buffer
}
@Override
public ByteBuffer serialize(List<Event> elements) {
try {
buffer.reset(); // Reuse buffer
// Efficient batch serialization
objectMapper.writeValue(buffer, elements);
return ByteBuffer.wrap(buffer.toByteArray());
} catch (Exception e) {
throw new RuntimeException("Failed to serialize event batch", e);
}
}
@Override
public String getTargetStream(List<Event> elements) {
// Route based on first event in batch
return elements.isEmpty() ? "default-stream" : elements.get(0).getStreamName();
}
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-kinesis-2-11