Apache Spark Streaming integration library for consuming data from Amazon Kinesis streams with fault-tolerant checkpointing and automatic shard management
Built-in fault tolerance for reliable stream processing and recovery from failures, based on Kinesis sequence numbers. The integration provides automatic checkpointing, sequence number tracking, and stream recovery.
The system tracks Kinesis sequence numbers per shard so that processing can resume precisely where it left off after a failure.
case class SequenceNumberRange(
streamName: String,
shardId: String,
fromSeqNumber: String,
toSeqNumber: String
)
case class SequenceNumberRanges(ranges: Seq[SequenceNumberRange]) {
def isEmpty(): Boolean
def nonEmpty(): Boolean
override def toString(): String
}

Usage Pattern:
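A minimal usage sketch. The two case classes are redeclared locally so the snippet runs without Spark on the classpath, and the stream, shard, and sequence-number values are illustrative:

```scala
// Local mirrors of the library's case classes (assumed to match the shapes above)
case class SequenceNumberRange(
  streamName: String,
  shardId: String,
  fromSeqNumber: String,
  toSeqNumber: String
)

case class SequenceNumberRanges(ranges: Seq[SequenceNumberRange]) {
  def isEmpty(): Boolean = ranges.isEmpty
  def nonEmpty(): Boolean = ranges.nonEmpty
}

// One range per shard, describing the span of records a stored block covers
val blockRanges = SequenceNumberRanges(Seq(
  SequenceNumberRange("reliable-stream", "shardId-000000000000", "100", "205"),
  SequenceNumberRange("reliable-stream", "shardId-000000000001", "310", "498")
))

assert(blockRanges.nonEmpty())
assert(blockRanges.ranges.map(_.shardId).distinct.size == 2)
```

During recovery, ranges like these are what allow a lost block to be re-read from Kinesis rather than recomputed from upstream data.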
Fault-tolerant RDD implementation that can recover data directly from Kinesis using stored sequence numbers.
class KinesisBackedBlockRDD[T: ClassTag](
sc: SparkContext,
regionName: String,
endpointUrl: String,
blockIds: Array[BlockId],
arrayOfseqNumberRanges: Array[SequenceNumberRanges],
isBlockIdValid: Array[Boolean] = Array.empty,
retryTimeoutMs: Int = 10000,
messageHandler: Record => T = KinesisUtils.defaultMessageHandler _,
awsCredentialsOption: Option[SerializableAWSCredentials] = None
) extends BlockRDD[T](sc, blockIds) {
def isValid(): Boolean
def getPartitions: Array[Partition]
def compute(split: Partition, context: TaskContext): Iterator[T]
}

Key Features:
Automatic checkpoint coordination through DynamoDB for tracking stream progress.
class KinesisCheckpointer(
receiver: KinesisReceiver[_],
checkpointInterval: Duration,
workerId: String,
clock: Clock = new SystemClock
) extends Logging {
def setCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit
def removeCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit
}

Checkpointing Process:
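A simplified, self-contained model of the process (an illustrative sketch, not the library's implementation): the checkpointer tracks the last checkpoint time per shard and writes a new checkpoint only once the configured interval has elapsed.

```scala
// Hypothetical interval-based checkpointing model: checkpoint a shard's
// position only when checkpointIntervalMs has elapsed since its last checkpoint.
class IntervalCheckpointer(checkpointIntervalMs: Long) {
  private var lastCheckpointMs = Map.empty[String, Long]

  /** Returns true if a checkpoint was (conceptually) written for this shard. */
  def maybeCheckpoint(shardId: String, nowMs: Long): Boolean = {
    val due = nowMs - lastCheckpointMs.getOrElse(shardId, 0L) >= checkpointIntervalMs
    if (due) lastCheckpointMs += shardId -> nowMs
    due
  }
}

val cp = new IntervalCheckpointer(checkpointIntervalMs = 30000L)
cp.maybeCheckpoint("shard-0", nowMs = 30000L) // 30 s elapsed: checkpoint written
cp.maybeCheckpoint("shard-0", nowMs = 45000L) // only 15 s elapsed: skipped
cp.maybeCheckpoint("shard-0", nowMs = 60000L) // 30 s elapsed again: written
```

In the real integration the write goes through `IRecordProcessorCheckpointer` and lands in the DynamoDB lease table.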
The checkpointInterval parameter controls checkpoint frequency; configure it to balance fault tolerance and performance:
// High frequency checkpointing (lower data loss, higher cost)
val highFrequencyInterval = Seconds(10)
// Balanced checkpointing (recommended for most applications)
val balancedInterval = Seconds(30)
// Low frequency checkpointing (higher potential data loss, lower cost)
val lowFrequencyInterval = Seconds(120)
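One way to reason about this trade-off is a back-of-envelope estimate (the ingest rate here is illustrative): the number of records that may be re-read and reprocessed after a failure is roughly the checkpoint interval times the per-shard ingest rate.

```scala
// Illustrative ingest rate: 1,000 records/s
val recordsPerSecond = 1000L

// Upper bound on records replayed after a failure, given at-least-once semantics
def recordsAtRisk(checkpointIntervalSeconds: Long): Long =
  checkpointIntervalSeconds * recordsPerSecond

recordsAtRisk(10)  // up to ~10,000 records reprocessed
recordsAtRisk(30)  // up to ~30,000
recordsAtRisk(120) // up to ~120,000
```

Shorter intervals shrink this replay window at the cost of more frequent DynamoDB writes.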
val stream = KinesisUtils.createStream(
ssc,
"fault-tolerant-app",
"reliable-stream",
endpointUrl,
regionName,
InitialPositionInStream.LATEST,
balancedInterval, // Checkpoint frequency
StorageLevel.MEMORY_AND_DISK_2
)

Choose storage levels that provide appropriate fault tolerance:
import org.apache.spark.storage.StorageLevel
// Recommended: Memory and disk with replication
StorageLevel.MEMORY_AND_DISK_2 // Best fault tolerance
StorageLevel.MEMORY_AND_DISK // Good fault tolerance
StorageLevel.MEMORY_ONLY_2 // Memory-only with replication
StorageLevel.DISK_ONLY_2 // Disk-only with replication

MEMORY_AND_DISK_2 provides the best balance of performance and fault tolerance with both memory caching and disk persistence, plus replication across nodes.
Configure retry behavior for failed operations:
// Custom timeout for Kinesis data recovery
val customRDD = new KinesisBackedBlockRDD[String](
sparkContext,
"us-east-1",
"https://kinesis.us-east-1.amazonaws.com",
blockIds,
sequenceRanges,
isBlockIdValid,
retryTimeoutMs = 30000, // 30 second timeout
messageHandler = record => new String(record.getData.array()),
awsCredentialsOption = None // Must be named: positional arguments cannot follow named ones
)

When a Spark worker node fails:
// Example recovery process (handled automatically)
def recoverFromFailure(lostBlockIds: Array[BlockId]): Iterator[Record] = {
val correspondingRanges = getSequenceRangesForBlocks(lostBlockIds)
val kinesisClient = createKinesisClient()
correspondingRanges.flatMap { range =>
recoverRecordsFromKinesis(kinesisClient, range)
}.iterator
}

When the entire Spark Streaming application restarts:
// Configure checkpoint directory for application restart recovery
ssc.checkpoint("hdfs://cluster/checkpoints/kinesis-app")
// Stream will automatically resume from last checkpoint
val recoveredStream = KinesisUtils.createStream(
ssc, // StreamingContext will load from checkpoint
"persistent-app",
"continuous-stream",
endpointUrl,
regionName,
InitialPositionInStream.LATEST, // Only used if no checkpoint exists
Seconds(30),
StorageLevel.MEMORY_AND_DISK_2
)

Kinesis shard topology changes (shard splits and merges) are handled automatically: the underlying Kinesis Client Library rebalances record processors across workers and updates its DynamoDB lease table without application changes.
Monitor these metrics for fault tolerance health:
// Checkpoint success rate
val checkpointSuccessRate = successfulCheckpoints / totalCheckpointAttempts
// Recovery frequency
val recoveryRate = recoveryEvents / totalProcessingTime
// Processing lag
val processingLag = currentTime - lastProcessedRecordTime
// DynamoDB throttling
val dynamoThrottleRate = throttledRequests / totalDynamoRequests

// Monitor for stuck processing
def detectStuckProcessing(
lastCheckpointTime: Long,
maxAllowedLag: Duration
): Boolean = {
val currentTime = System.currentTimeMillis()
val lag = currentTime - lastCheckpointTime
lag > maxAllowedLag.milliseconds
}
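To sanity-check the logic above in isolation, here is a self-contained variant that uses plain milliseconds in place of Spark's Duration (same semantics assumed):

```scala
// Standalone mirror of detectStuckProcessing using Long milliseconds
def isStuck(lastCheckpointTime: Long, nowMs: Long, maxAllowedLagMs: Long): Boolean =
  (nowMs - lastCheckpointTime) > maxAllowedLagMs

val now = 600000L
isStuck(now - 30000L, now, 60000L)  // 30 s lag within a 60 s budget: healthy
isStuck(now - 120000L, now, 60000L) // 2 min lag exceeds the budget: stuck
```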
// Monitor for high error rates
def detectHighErrorRate(
errorCount: Int,
totalCount: Int,
threshold: Double = 0.05
): Boolean = {
val errorRate = errorCount.toDouble / totalCount
errorRate > threshold
}

// Graceful shutdown and backpressure configuration.
// Set these on the SparkConf before creating the StreamingContext;
// conf changes made after the context is created take no effect.
sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
sparkConf.set("spark.streaming.backpressure.enabled", "true")

// Implement graceful error handling in message processing
import scala.util.control.NonFatal

val faultTolerantStream = kinesisStream.map { record =>
  try {
    Some(processRecord(record)) // Wrap successes so failures can be filtered out
  } catch {
    case NonFatal(e) =>
      logError(s"Error processing record: ${e.getMessage}")
      None // Skip the failed record
  }
}.filter(_.isDefined).map(_.get)

// Integrate with monitoring systems
stream.foreachRDD { rdd =>
val recordCount = rdd.count()
val processingTime = System.currentTimeMillis()
// Send metrics to monitoring system
metricsCollector.recordGauge("kinesis.records.processed", recordCount)
metricsCollector.recordGauge("kinesis.processing.timestamp", processingTime)
// Alert on processing delays
if (isProcessingDelayed(processingTime)) {
alertingSystem.sendAlert("Kinesis processing delayed")
}
}

// Configure appropriate resource allocation
val sparkConf = new SparkConf()
.set("spark.streaming.receiver.maxRate", "1000") // Rate limiting
.set("spark.streaming.backpressure.enabled", "true") // Automatic backpressure
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // Efficient serialization
.set("spark.streaming.blockInterval", "200ms") // Block interval tuning

// Handle DynamoDB throttling
val kinesisConf = new SparkConf()
.set("spark.streaming.kinesis.client.maxRetries", "10")
.set("spark.streaming.kinesis.client.retryDelayMs", "1000")

// Configure memory management for large streams
val memoryOptimizedConf = new SparkConf()
.set("spark.streaming.unpersist", "true") // Auto-unpersist old RDDs
.set("spark.streaming.blockInterval", "50ms") // Smaller blocks
.set("spark.executor.memory", "4g") // Adequate executor memory

// Handle network partitions and connectivity issues
val networkResilientStream = KinesisUtils.createStream(
ssc,
"network-resilient-app",
"reliable-stream",
endpointUrl,
regionName,
InitialPositionInStream.LATEST,
Seconds(60), // Longer checkpoint interval during network issues
StorageLevel.MEMORY_AND_DISK_2 // Ensure local persistence
)

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-assembly