Spark Streaming integration with Amazon Kinesis for real-time data processing using the Kinesis Client Library
—
Advanced message processing capabilities including custom message handlers, type-safe transformations, and integration with Kinesis Record metadata such as partition keys and sequence numbers.
The default message handler extracts raw byte data from Kinesis Records.
/**
* Default message handler that extracts byte array from Kinesis Record.
* Handles null records gracefully and converts ByteBuffer to byte array.
*
* @param record Kinesis Record containing message data and metadata
* @return Array[Byte] Raw message data, or null if record is null
*/
def defaultMessageHandler(record: Record): Array[Byte]

Implementation details: the handler reads the record.getData() ByteBuffer and copies its readable bytes into an Array[Byte].

Custom message handlers enable type-safe processing of Kinesis records with access to both message data and metadata.
/**
* Type alias for custom message handler functions.
* Transforms Kinesis Record into user-defined type T.
*/
type MessageHandler[T] = com.amazonaws.services.kinesis.model.Record => T

Understanding the Kinesis Record structure for custom message handlers:
// AWS Kinesis Record (external dependency)
// Available fields for custom message handlers:
// - record.getData(): java.nio.ByteBuffer - The message payload
// - record.getPartitionKey(): String - Partition key used for sharding
// - record.getSequenceNumber(): String - Unique sequence number per shard
// - record.getApproximateArrivalTimestamp(): java.util.Date - Arrival time

import com.amazonaws.services.kinesis.model.Record
import java.nio.charset.StandardCharsets
import scala.util.control.NonFatal
// Simple text message handler
/**
 * Decodes a Kinesis record payload as a UTF-8 string.
 *
 * Copies only the readable portion of the record's ByteBuffer (honouring its
 * current position and limit) before decoding, which stays safe for read-only
 * or offset-backed buffers.
 *
 * @param record Kinesis record whose payload is UTF-8 text
 * @return the decoded string
 */
def textMessageHandler(record: Record): String = {
  val payload = record.getData()
  val raw = new Array[Byte](payload.remaining())
  payload.get(raw)
  new String(raw, StandardCharsets.UTF_8)
}
val textStream = KinesisUtils.createStream[String](
ssc, appName, streamName, endpointUrl, regionName,
InitialPositionInStream.LATEST, checkpointInterval, storageLevel,
textMessageHandler
)

import com.amazonaws.services.kinesis.model.Record
import spray.json._
import DefaultJsonProtocol._
/**
 * Structured application log event decoded from a Kinesis JSON payload.
 *
 * @param timestamp event time — presumably epoch millis; confirm with the producer
 * @param level     log level string (e.g. "INFO", "ERROR")
 * @param message   log message body
 * @param source    originating component or service name
 */
final case class LogEvent(
  timestamp: Long,
  level: String,
  message: String,
  source: String
)

// Explicit type annotation on the implicit: required for implicits in
// Scala 3 and avoids inference surprises in Scala 2.13.
implicit val logEventFormat: RootJsonFormat[LogEvent] = jsonFormat4(LogEvent)
// JSON message handler with error handling
/**
 * Parses a Kinesis record payload as a UTF-8, JSON-encoded LogEvent.
 *
 * Bug fix: the original used record.getData().array(), which throws
 * ReadOnlyBufferException for non-array-backed buffers and ignores the
 * buffer's position/arrayOffset, so it could decode shifted or stale bytes.
 * The readable bytes are now copied via remaining()/get(), matching
 * textMessageHandler.
 *
 * @param record Kinesis record whose payload should be a JSON LogEvent
 * @return Some(LogEvent) on success, None for malformed or undecodable payloads
 */
def jsonLogHandler(record: Record): Option[LogEvent] = {
  try {
    val buffer = record.getData()
    val bytes = new Array[Byte](buffer.remaining())
    buffer.get(bytes)
    val data = new String(bytes, StandardCharsets.UTF_8)
    Some(data.parseJson.convertTo[LogEvent])
  } catch {
    // NonFatal lets fatal errors (OutOfMemoryError, InterruptedException)
    // propagate instead of being silently swallowed.
    case NonFatal(_) => None
  }
}
val logStream = KinesisUtils.createStream[Option[LogEvent]](
ssc, appName, streamName, endpointUrl, regionName,
InitialPositionInStream.LATEST, checkpointInterval, storageLevel,
jsonLogHandler
)
// Filter out malformed messages and process valid logs
val validLogs = logStream.flatMap(identity)

import com.amazonaws.services.kinesis.model.Record
/**
 * Kinesis message payload joined with per-record metadata.
 *
 * @param data           UTF-8 decoded payload
 * @param partitionKey   partition key used by Kinesis to route the record to a shard
 * @param sequenceNumber sequence number of the record (unique per shard)
 * @param arrivalTime    approximate arrival timestamp, epoch millis
 * @param shardId        optional shard identifier; not populated by enrichedMessageHandler
 */
case class EnrichedMessage(
  data: String,
  partitionKey: String,
  sequenceNumber: String,
  arrivalTime: Long,
  shardId: Option[String] = None
)
// Message handler that captures metadata
/**
 * Builds an EnrichedMessage carrying both the decoded payload and the
 * record's Kinesis metadata (partition key, sequence number, arrival time).
 *
 * Bug fix: replaced record.getData().array() — which throws for
 * non-array-backed buffers and ignores position/arrayOffset — with a proper
 * remaining()/get() copy, matching textMessageHandler.
 *
 * @param record Kinesis record to enrich
 * @return EnrichedMessage with the UTF-8 payload and per-record metadata
 */
def enrichedMessageHandler(record: Record): EnrichedMessage = {
  val buffer = record.getData()
  val bytes = new Array[Byte](buffer.remaining())
  buffer.get(bytes)
  val data = new String(bytes, StandardCharsets.UTF_8)
  // NOTE(review): assumes getApproximateArrivalTimestamp() is non-null for
  // KCL-delivered records — confirm for replayed/aggregated records.
  val arrivalTime = record.getApproximateArrivalTimestamp().getTime()
  EnrichedMessage(
    data = data,
    partitionKey = record.getPartitionKey(),
    sequenceNumber = record.getSequenceNumber(),
    arrivalTime = arrivalTime
  )
}
val enrichedStream = KinesisUtils.createStream[EnrichedMessage](
ssc, appName, streamName, endpointUrl, regionName,
InitialPositionInStream.LATEST, checkpointInterval, storageLevel,
enrichedMessageHandler
)

import com.amazonaws.services.kinesis.model.Record
import java.nio.ByteBuffer
/**
 * Lightweight metadata extracted from a binary image payload.
 *
 * @param format detected container format ("JPEG", "PNG", "GIF", or "UNKNOWN")
 * @param width  image width in pixels (placeholder 0 in this example)
 * @param height image height in pixels (placeholder 0 in this example)
 * @param size   payload size in bytes
 */
case class ImageMetadata(
  format: String,
  width: Int,
  height: Int,
  size: Long
)
// Handler for binary image data
/**
 * Extracts lightweight metadata from a binary image payload carried in a
 * Kinesis record. The container format is sniffed from the leading magic
 * bytes; width and height are placeholders — real header parsing is out of
 * scope for this example.
 *
 * @param record Kinesis record carrying raw image bytes
 * @return ImageMetadata with the detected format and payload size
 */
def imageMetadataHandler(record: Record): ImageMetadata = {
  val payload = record.getData()
  val imageBytes = new Array[Byte](payload.remaining())
  payload.get(imageBytes)

  // Magic-number sniffing: compare the leading bytes against known
  // signatures (JPEG FF D8 FF, PNG 89 50 4E 47, GIF 47 49 46).
  val header = imageBytes.take(4)
  val detectedFormat =
    if (header.startsWith(Array[Byte](-1, -40, -1))) "JPEG"
    else if (header.startsWith(Array[Byte](-119, 80, 78, 71))) "PNG"
    else if (header.startsWith(Array[Byte](71, 73, 70))) "GIF"
    else "UNKNOWN"

  ImageMetadata(
    format = detectedFormat,
    width = 0, // would be parsed from format-specific headers in a real implementation
    height = 0,
    size = imageBytes.length
  )
}
val imageStream = KinesisUtils.createStream[ImageMetadata](
ssc, appName, streamName, endpointUrl, regionName,
InitialPositionInStream.LATEST, checkpointInterval, storageLevel,
imageMetadataHandler
)

import com.amazonaws.services.kinesis.model.Record
import org.apache.avro.io.{DecoderFactory, DatumReader}
import org.apache.avro.specific.{SpecificDatumReader, SpecificRecordBase}
// Generic Avro message handler
/**
 * Curried factory producing an Avro message handler for Kinesis records.
 *
 * Bug fixes:
 *  - record.getData().array() replaced with a remaining()/get() copy: array()
 *    throws for non-array-backed buffers and ignores position/arrayOffset.
 *  - NonFatal instead of a blanket Exception catch, so fatal errors propagate.
 *
 * NOTE(review): recordClass is unused — the reader is driven by `schema`
 * alone; it is retained for backward compatibility. This snippet also needs
 * `import org.apache.avro.Schema` in scope for the signature to compile.
 *
 * @param schema      Avro schema for the generated record type T
 * @param recordClass class of the Avro-generated record (unused; kept for
 *                    interface compatibility)
 * @param record      Kinesis record containing a binary-encoded Avro payload
 * @return Some(decoded record), or None for malformed payloads
 */
def avroMessageHandler[T <: SpecificRecordBase](
  schema: Schema,
  recordClass: Class[T]
)(record: Record): Option[T] = {
  try {
    val buffer = record.getData()
    val bytes = new Array[Byte](buffer.remaining())
    buffer.get(bytes)
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    val reader: DatumReader[T] = new SpecificDatumReader[T](schema)
    Some(reader.read(null.asInstanceOf[T], decoder))
  } catch {
    case NonFatal(_) => None // malformed Avro: skip this record
  }
}
// Usage with specific Avro-generated class
val avroStream = KinesisUtils.createStream[Option[MyAvroRecord]](
ssc, appName, streamName, endpointUrl, regionName,
InitialPositionInStream.LATEST, checkpointInterval, storageLevel,
avroMessageHandler(MyAvroRecord.getClassSchema(), classOf[MyAvroRecord])
)

Prefer returning Option[T] or Either[Error, T] from handlers that perform error-prone parsing.

// Process messages in batches for efficiency
kinesisStream.foreachRDD { rdd =>
val messages = rdd.collect() // Be careful with large batches
// Batch process messages
val processed = processMessageBatch(messages)
// Write to external system
writeToDatabase(processed)
}

// Chain filtering and transformation operations
val processedStream = kinesisStream
.filter(_.nonEmpty) // Filter out empty messages
.map(parseMessage) // Parse into structured format
.filter(_.isValid) // Filter valid messages only
.map(enrichMessage) // Add additional metadata

// Separate valid and invalid messages
val (validMessages, errorMessages) = kinesisStream.map { record =>
parseMessageSafely(record) match {
case Success(msg) => (Some(msg), None)
case Failure(err) => (None, Some(err))
}
}.cache()
val validStream = validMessages.flatMap(_._1)
val errorStream = errorMessages.flatMap(_._2)
// Process streams separately
validStream.foreachRDD(processValidMessages)
errorStream.foreachRDD(logErrors)

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-2-10