
tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-2-10

Spark Streaming integration with Amazon Kinesis for real-time data processing using the Kinesis Client Library


docs/data-processing.md

Data Processing

Advanced message processing capabilities including custom message handlers, type-safe transformations, and integration with Kinesis Record metadata such as partition keys and sequence numbers.

Capabilities

Default Message Handler

The default message handler extracts raw byte data from Kinesis Records.

/**
 * Default message handler that extracts byte array from Kinesis Record.
 * Handles null records gracefully and converts ByteBuffer to byte array.
 *
 * @param record Kinesis Record containing message data and metadata
 * @return Array[Byte] Raw message data, or null if record is null
 */
def defaultMessageHandler(record: Record): Array[Byte]

Implementation Details:

  • Extracts data from record.getData() ByteBuffer
  • Returns null for null input records
  • Creates new byte array with remaining buffer data
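The same logic can be sketched against a plain `ByteBuffer` (no AWS SDK on the classpath), mirroring the behavior described above: null-safe, and copies the remaining bytes out of the buffer.

```scala
import java.nio.ByteBuffer

// Sketch of the default-handler logic on a bare ByteBuffer:
// returns null for null input, otherwise copies the remaining bytes.
def extractBytes(data: ByteBuffer): Array[Byte] = {
  if (data == null) null
  else {
    val bytes = new Array[Byte](data.remaining())
    data.get(bytes) // explicit copy; works for read-only buffers too
    bytes
  }
}
```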

Custom Message Handlers

Custom message handlers enable type-safe processing of Kinesis records with access to both message data and metadata.

/**
 * Type alias for custom message handler functions.
 * Transforms Kinesis Record into user-defined type T.
 */
type MessageHandler[T] = com.amazonaws.services.kinesis.model.Record => T

Kinesis Record Structure

Understanding the Kinesis Record structure for custom message handlers:

// AWS Kinesis Record (external dependency)
// Available fields for custom message handlers:
// - record.getData(): java.nio.ByteBuffer - The message payload
// - record.getPartitionKey(): String - Partition key used for sharding  
// - record.getSequenceNumber(): String - Unique sequence number per shard
// - record.getApproximateArrivalTimestamp(): java.util.Date - Arrival time

Usage Examples

Text Message Processing

import com.amazonaws.services.kinesis.model.Record
import java.nio.charset.StandardCharsets

// Simple text message handler
def textMessageHandler(record: Record): String = {
  val buffer = record.getData()
  val bytes = new Array[Byte](buffer.remaining())
  buffer.get(bytes)
  new String(bytes, StandardCharsets.UTF_8)
}

val textStream = KinesisUtils.createStream[String](
  ssc, appName, streamName, endpointUrl, regionName,
  InitialPositionInStream.LATEST, checkpointInterval, storageLevel,
  textMessageHandler
)

JSON Message Processing

import com.amazonaws.services.kinesis.model.Record
import java.nio.charset.StandardCharsets
import spray.json._
import DefaultJsonProtocol._

case class LogEvent(
  timestamp: Long,
  level: String, 
  message: String,
  source: String
)

implicit val logEventFormat: RootJsonFormat[LogEvent] = jsonFormat4(LogEvent)

// JSON message handler with error handling
def jsonLogHandler(record: Record): Option[LogEvent] = {
  try {
    val buffer = record.getData()
    val bytes = new Array[Byte](buffer.remaining())
    buffer.get(bytes) // copy out; ByteBuffer.array() can fail on read-only buffers
    val data = new String(bytes, StandardCharsets.UTF_8)
    Some(data.parseJson.convertTo[LogEvent])
  } catch {
    case _: Exception => None // Handle malformed JSON gracefully
  }
}

val logStream = KinesisUtils.createStream[Option[LogEvent]](
  ssc, appName, streamName, endpointUrl, regionName,
  InitialPositionInStream.LATEST, checkpointInterval, storageLevel,
  jsonLogHandler
)

// Filter out malformed messages and process valid logs
val validLogs = logStream.flatMap(identity)

Message Handler with Metadata

import com.amazonaws.services.kinesis.model.Record
import java.nio.charset.StandardCharsets

case class EnrichedMessage(
  data: String,
  partitionKey: String, 
  sequenceNumber: String,
  arrivalTime: Long,
  shardId: Option[String] = None
)

// Message handler that captures metadata
def enrichedMessageHandler(record: Record): EnrichedMessage = {
  val buffer = record.getData()
  val bytes = new Array[Byte](buffer.remaining())
  buffer.get(bytes) // copy out; ByteBuffer.array() can fail on read-only buffers
  val data = new String(bytes, StandardCharsets.UTF_8)
  val partitionKey = record.getPartitionKey()
  val sequenceNumber = record.getSequenceNumber()
  val arrivalTime = record.getApproximateArrivalTimestamp().getTime()
  
  EnrichedMessage(data, partitionKey, sequenceNumber, arrivalTime)
}

val enrichedStream = KinesisUtils.createStream[EnrichedMessage](
  ssc, appName, streamName, endpointUrl, regionName,
  InitialPositionInStream.LATEST, checkpointInterval, storageLevel,
  enrichedMessageHandler
)

Binary Data Processing

import com.amazonaws.services.kinesis.model.Record
import java.nio.ByteBuffer

case class ImageMetadata(
  format: String,
  width: Int,
  height: Int,
  size: Long
)

// Handler for binary image data
def imageMetadataHandler(record: Record): ImageMetadata = {
  val buffer = record.getData()
  val bytes = new Array[Byte](buffer.remaining())
  buffer.get(bytes)
  
  // Simple image format detection (first few bytes)
  val format = bytes.take(4) match {
    case Array(-1, -40, -1, _*) => "JPEG"
    case Array(-119, 80, 78, 71, _*) => "PNG"
    case Array(71, 73, 70, _*) => "GIF"
    case _ => "UNKNOWN"
  }
  
  ImageMetadata(
    format = format,
    width = 0, // Would parse from headers in real implementation
    height = 0,
    size = bytes.length
  )
}

val imageStream = KinesisUtils.createStream[ImageMetadata](
  ssc, appName, streamName, endpointUrl, regionName,
  InitialPositionInStream.LATEST, checkpointInterval, storageLevel,
  imageMetadataHandler
)

Avro Message Processing

import com.amazonaws.services.kinesis.model.Record
import org.apache.avro.Schema
import org.apache.avro.io.{DecoderFactory, DatumReader}
import org.apache.avro.specific.{SpecificDatumReader, SpecificRecordBase}

// Generic Avro message handler
def avroMessageHandler[T <: SpecificRecordBase](
  schema: Schema, 
  recordClass: Class[T]
)(record: Record): Option[T] = {
  try {
    val buffer = record.getData()
    val bytes = new Array[Byte](buffer.remaining())
    buffer.get(bytes) // copy out; ByteBuffer.array() can fail on read-only buffers
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    val reader: DatumReader[T] = new SpecificDatumReader[T](schema)
    Some(reader.read(null.asInstanceOf[T], decoder))
  } catch {
    case _: Exception => None // Handle malformed Avro gracefully
  }
}

// Usage with specific Avro-generated class
val avroStream = KinesisUtils.createStream[Option[MyAvroRecord]](
  ssc, appName, streamName, endpointUrl, regionName,
  InitialPositionInStream.LATEST, checkpointInterval, storageLevel,
  avroMessageHandler(MyAvroRecord.getClassSchema(), classOf[MyAvroRecord])
)

Message Handler Best Practices

Error Handling

  • Always handle potential exceptions in custom message handlers
  • Consider returning Option[T] or Either[Error, T] for error-prone parsing
  • Log parsing errors for monitoring and debugging
  • Avoid throwing exceptions that could crash the receiver
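As a sketch of the `Either[Error, T]` pattern above, here is a hypothetical parser (`parsePort` is illustrative, not part of the Kinesis API): the failure branch carries a message instead of throwing into the receiver.

```scala
// Error-prone parsing returning Either instead of throwing:
// Right(value) on success, Left(reason) on any failure.
def parsePort(raw: String): Either[String, Int] =
  try {
    val n = raw.trim.toInt
    if (n >= 0 && n <= 65535) Right(n)
    else Left(s"port out of range: $n")
  } catch {
    case _: NumberFormatException => Left(s"not a number: $raw")
  }
```

Downstream, `Left` values can be routed to a logging or dead-letter path while `Right` values continue through the pipeline.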

Performance Considerations

  • Keep message handlers lightweight and fast
  • Avoid expensive operations like network calls or database queries
  • Consider pre-compiling regex patterns or parsers outside the handler
  • Use efficient serialization libraries for structured data
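For example, a regex can be compiled once at object scope rather than inside the handler (the log-line format here is hypothetical); compiling it per record would pay the cost on every message.

```scala
import scala.util.matching.Regex

// Compiled once, reused for every record.
val LogLine: Regex = """(\w+)\s+(.*)""".r

def parseLogLine(line: String): Option[(String, String)] = line match {
  case LogLine(level, msg) => Some((level, msg))
  case _ => None
}
```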

Memory Management

  • Be careful with large message payloads
  • Consider streaming parsers for very large messages
  • Avoid keeping references to the original ByteBuffer
  • Use appropriate data structures for your use case

Type Safety

  • Use case classes for structured data
  • Leverage Scala's type system for compile-time safety
  • Consider using sealed traits for message variants
  • Validate data types and ranges when appropriate
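A sketch of the sealed-trait approach for message variants (all names here are illustrative): the compiler can check match exhaustiveness over the closed set of cases.

```scala
// Sealed trait: new variants must live in this file, so every
// pattern match over ParsedMessage is checked for exhaustiveness.
sealed trait ParsedMessage
final case class MetricMsg(name: String, value: Double) extends ParsedMessage
final case class LogMsg(level: String, text: String) extends ParsedMessage
final case class UnparsedMsg(byteCount: Int) extends ParsedMessage

def describe(m: ParsedMessage): String = m match {
  case MetricMsg(n, v)       => s"metric $n=$v"
  case LogMsg(l, t)          => s"log [$l] $t"
  case UnparsedMsg(len)      => s"unparsed payload of $len bytes"
}
```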

Advanced Processing Patterns

Batch Message Processing

// Process messages in batches for efficiency.
// foreachPartition runs on the executors, avoiding a full collect()
// of each batch onto the driver.
kinesisStream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Batch process one partition's messages
    val processed = processMessageBatch(partition.toSeq)

    // Write to external system
    writeToDatabase(processed)
  }
}

Filtering and Transformation

// Chain filtering and transformation operations
val processedStream = kinesisStream
  .filter(_.nonEmpty) // Filter out empty messages
  .map(parseMessage)  // Parse into structured format
  .filter(_.isValid)  // Filter valid messages only
  .map(enrichMessage) // Add additional metadata

Error Stream Handling

// Separate valid and invalid messages.
// parseMessageSafely is assumed to return scala.util.Try
val parsedStream = kinesisStream.map(parseMessageSafely).cache()

val validStream = parsedStream.flatMap(_.toOption)
val errorStream = parsedStream.flatMap(_.failed.toOption)

// Process streams separately
validStream.foreachRDD(processValidMessages)
errorStream.foreachRDD(logErrors)

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-2-10
