
tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-assembly

Apache Spark Streaming integration library for consuming data from Amazon Kinesis streams with fault-tolerant checkpointing and automatic shard management


Fault Tolerance & Recovery

Built-in fault tolerance mechanisms using Kinesis sequence numbers for reliable stream processing and recovery from failures. The integration provides automatic checkpointing, sequence number tracking, and stream recovery capabilities.

Core Fault Tolerance Components

Sequence Number-Based Recovery

The system tracks Kinesis sequence numbers to enable precise recovery from stream processing failures.

case class SequenceNumberRange(
  streamName: String,
  shardId: String,
  fromSeqNumber: String,
  toSeqNumber: String
) 

case class SequenceNumberRanges(ranges: Seq[SequenceNumberRange]) {
  def isEmpty(): Boolean
  def nonEmpty(): Boolean
  override def toString(): String
}

Usage Pattern:

  • Each processed batch of records is associated with sequence number ranges
  • When failures occur, the system can recover by resuming from the last successfully processed sequence number
  • Sequence numbers increase over time within a shard, providing per-shard ordering guarantees
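As a concrete illustration of the pattern above, a batch's provenance can be pictured as a set of ranges. The following is a minimal sketch using the case classes defined above; the stream name, shard ID, and sequence numbers are made-up values:

```scala
// Hypothetical ranges recorded for one processed batch (all values illustrative)
val batchRanges = SequenceNumberRanges(Seq(
  SequenceNumberRange(
    streamName = "reliable-stream",
    shardId = "shardId-000000000000",
    fromSeqNumber = "49590338271490256608559692538361571095921575989136588898",
    toSeqNumber = "49590338271490256608559692540925702759324208523137515586"
  )
))

// On failure, each range identifies exactly which records to re-read from Kinesis
if (batchRanges.nonEmpty()) {
  batchRanges.ranges.foreach { r =>
    println(s"re-read ${r.streamName}/${r.shardId}: ${r.fromSeqNumber} .. ${r.toSeqNumber}")
  }
}
```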

KinesisBackedBlockRDD

Fault-tolerant RDD implementation that can recover data directly from Kinesis using stored sequence numbers.

class KinesisBackedBlockRDD[T: ClassTag](
  sc: SparkContext,
  regionName: String,
  endpointUrl: String,
  blockIds: Array[BlockId],
  arrayOfseqNumberRanges: Array[SequenceNumberRanges],
  isBlockIdValid: Array[Boolean] = Array.empty,
  retryTimeoutMs: Int = 10000,
  messageHandler: Record => T = KinesisUtils.defaultMessageHandler _,
  awsCredentialsOption: Option[SerializableAWSCredentials] = None
) extends BlockRDD[T](sc, blockIds) {
  
  def isValid(): Boolean
  def getPartitions: Array[Partition]
  def compute(split: Partition, context: TaskContext): Iterator[T]
}

Key Features:

  • Automatically recreates data from Kinesis when local storage is unavailable
  • Uses sequence number ranges to fetch exact data ranges that were lost
  • Provides configurable retry timeout for recovery operations
  • Maintains data lineage for reliable stream processing

Checkpointing System

Automatic checkpoint coordination through DynamoDB for tracking stream progress.

class KinesisCheckpointer(
  receiver: KinesisReceiver[_],
  checkpointInterval: Duration,
  workerId: String,
  clock: Clock = new SystemClock
) extends Logging {
  
  def setCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit
  def removeCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit
}

Checkpointing Process:

  1. Periodic checkpointing to DynamoDB based on checkpointInterval
  2. Per-shard sequence number tracking
  3. Coordinated checkpoint management across multiple workers
  4. Automatic cleanup when shards are closed or reassigned
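The steps above are driven by the KCL record processor inside the receiver. A simplified, hypothetical sketch of the per-shard lifecycle (the real implementation lives in the library's record processor; `shardId`, `receiver`, and `kinesisCheckpointer` are assumed to be in scope):

```scala
// Simplified sketch of the per-shard record processor lifecycle
def processRecords(records: java.util.List[Record],
                   checkpointer: IRecordProcessorCheckpointer): Unit = {
  receiver.addRecords(shardId, records)                      // hand records to Spark
  kinesisCheckpointer.setCheckpointer(shardId, checkpointer) // enable periodic checkpoints
}

def shutdown(checkpointer: IRecordProcessorCheckpointer): Unit = {
  // checkpoint one final time, then stop tracking this shard
  kinesisCheckpointer.removeCheckpointer(shardId, checkpointer)
}
```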

Fault Tolerance Configuration

Checkpoint Intervals

Configure checkpoint frequency to balance fault tolerance and performance:

// High frequency checkpointing (lower data loss, higher cost)
val highFrequencyInterval = Seconds(10)

// Balanced checkpointing (recommended for most applications)
val balancedInterval = Seconds(30)

// Low frequency checkpointing (higher potential data loss, lower cost)
val lowFrequencyInterval = Seconds(120)

val stream = KinesisUtils.createStream(
  ssc,
  "fault-tolerant-app",
  "reliable-stream",
  endpointUrl,
  regionName,
  InitialPositionInStream.LATEST,
  balancedInterval, // Checkpoint frequency
  StorageLevel.MEMORY_AND_DISK_2
)

Storage Level Configuration

Choose storage levels that provide appropriate fault tolerance:

import org.apache.spark.storage.StorageLevel

// Recommended: Memory and disk with replication
StorageLevel.MEMORY_AND_DISK_2    // Best fault tolerance
StorageLevel.MEMORY_AND_DISK      // Good fault tolerance
StorageLevel.MEMORY_ONLY_2        // Memory-only with replication
StorageLevel.DISK_ONLY_2          // Disk-only with replication

MEMORY_AND_DISK_2 provides the best balance of performance and fault tolerance with both memory caching and disk persistence, plus replication across nodes.

Recovery Timeout Configuration

Configure retry behavior for failed operations:

// Custom timeout for Kinesis data recovery
val customRDD = new KinesisBackedBlockRDD[String](
  sparkContext,
  "us-east-1",
  "https://kinesis.us-east-1.amazonaws.com",
  blockIds,
  sequenceRanges,
  isBlockIdValid,
  retryTimeoutMs = 30000, // 30 second timeout
  messageHandler = record => new String(record.getData.array()),
  None
)

Recovery Scenarios

Worker Node Failure

When a Spark worker node fails:

  1. Block Recovery: KinesisBackedBlockRDD automatically detects missing blocks
  2. Sequence Number Lookup: Retrieves sequence number ranges for missing data
  3. Kinesis Re-read: Fetches data directly from Kinesis using sequence numbers
  4. Processing Continuation: Resumes processing from the recovered data

// Example recovery process (handled automatically; the helper functions are illustrative)
def recoverFromFailure(lostBlockIds: Array[BlockId]): Iterator[Record] = {
  val correspondingRanges = getSequenceRangesForBlocks(lostBlockIds)
  val kinesisClient = createKinesisClient()
  
  correspondingRanges.flatMap { range =>
    recoverRecordsFromKinesis(kinesisClient, range)
  }.iterator
}

Application Restart

When the entire Spark Streaming application restarts:

  1. Checkpoint Recovery: Loads stream state from Spark checkpoint directory
  2. DynamoDB Lookup: Retrieves last processed sequence numbers from DynamoDB
  3. Stream Resumption: Continues processing from the last checkpointed position
  4. Gap Detection: Identifies any unprocessed data and recovers accordingly

// Use StreamingContext.getOrCreate so that, on restart, the application is
// rebuilt from the checkpoint directory instead of starting from scratch
def createContext(): StreamingContext = {
  val ssc = new StreamingContext(sparkConf, Seconds(10))
  ssc.checkpoint("hdfs://cluster/checkpoints/kinesis-app")

  val stream = KinesisUtils.createStream(
    ssc,
    "persistent-app",
    "continuous-stream",
    endpointUrl,
    regionName,
    InitialPositionInStream.LATEST, // Only used if no checkpoint exists in DynamoDB
    Seconds(30),
    StorageLevel.MEMORY_AND_DISK_2
  )
  // ... attach processing logic to stream before returning ...
  ssc
}

// Loads from the checkpoint if one exists, otherwise creates a fresh context
val recoveredContext =
  StreamingContext.getOrCreate("hdfs://cluster/checkpoints/kinesis-app", createContext _)

Shard Splits and Merges

Kinesis shard topology changes are handled automatically:

  1. Shard Discovery: KCL automatically discovers new shards
  2. Checkpoint Migration: Sequence numbers are properly migrated
  3. Processing Continuity: Stream processing continues across shard changes
  4. Resource Adjustment: Worker allocation adjusts to new shard count

Monitoring and Alerting

Key Metrics to Monitor

Monitor these metrics for fault tolerance health:

// Checkpoint success rate
val checkpointSuccessRate = successfulCheckpoints.toDouble / totalCheckpointAttempts

// Recovery frequency
val recoveryRate = recoveryEvents.toDouble / totalProcessingTime

// Processing lag
val processingLag = currentTime - lastProcessedRecordTime

// DynamoDB throttling
val dynamoThrottleRate = throttledRequests.toDouble / totalDynamoRequests

Failure Detection Patterns

// Monitor for stuck processing
def detectStuckProcessing(
  lastCheckpointTime: Long, 
  maxAllowedLag: Duration
): Boolean = {
  val currentTime = System.currentTimeMillis()
  val lag = currentTime - lastCheckpointTime
  lag > maxAllowedLag.milliseconds
}

// Monitor for high error rates
def detectHighErrorRate(
  errorCount: Int,
  totalCount: Int,
  threshold: Double = 0.05
): Boolean = {
  val errorRate = errorCount.toDouble / totalCount
  errorRate > threshold
}

Best Practices

Checkpoint Strategy

  1. Consistent Intervals: Use consistent checkpoint intervals across restarts
  2. Secure Storage: Store checkpoints in reliable, secure storage (HDFS, S3)
  3. Regular Cleanup: Implement checkpoint cleanup policies to prevent disk bloat

// Set streaming properties on the SparkConf before the StreamingContext is created;
// Spark unpersists old stream RDDs automatically (spark.streaming.unpersist defaults to true)
val conf = new SparkConf()
  .set("spark.streaming.stopGracefullyOnShutdown", "true") // Finish in-flight batches on shutdown
  .set("spark.streaming.backpressure.enabled", "true")     // Adapt ingestion rate to processing capacity

Error Handling

import scala.util.control.NonFatal

// Implement graceful error handling in message processing
val faultTolerantStream = kinesisStream.map { record =>
  try {
    Some(processRecord(record))
  } catch {
    case NonFatal(e) =>
      logError(s"Error processing record: ${e.getMessage}", e)
      None // Skip records that fail to process
  }
}.filter(_.isDefined).map(_.get)

Monitoring Integration

// Integrate with monitoring systems
stream.foreachRDD { rdd =>
  val recordCount = rdd.count()
  val processingTime = System.currentTimeMillis()
  
  // Send metrics to monitoring system
  metricsCollector.recordGauge("kinesis.records.processed", recordCount)
  metricsCollector.recordGauge("kinesis.processing.timestamp", processingTime)
  
  // Alert on processing delays
  if (isProcessingDelayed(processingTime)) {
    alertingSystem.sendAlert("Kinesis processing delayed")
  }
}

Resource Management

// Configure appropriate resource allocation
val sparkConf = new SparkConf()
  .set("spark.streaming.receiver.maxRate", "1000") // Rate limiting
  .set("spark.streaming.backpressure.enabled", "true") // Automatic backpressure
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // Efficient serialization
  .set("spark.streaming.blockInterval", "200ms") // Block interval tuning

Troubleshooting Common Issues

DynamoDB Throttling

// Persistent DynamoDB checkpoint throttling is usually resolved by raising the
// provisioned throughput on the KCL lease table (named after the application)
// or by lengthening the checkpoint interval. The retry properties below govern
// reads from Kinesis itself (available in newer releases of the integration):
val kinesisConf = new SparkConf()
  .set("spark.streaming.kinesis.retry.waitTime", "1000ms")
  .set("spark.streaming.kinesis.retry.maxAttempts", "10")

Memory Pressure

// Configure memory management for large streams
val memoryOptimizedConf = new SparkConf()
  .set("spark.streaming.unpersist", "true") // Auto-unpersist old RDDs
  .set("spark.streaming.blockInterval", "50ms") // Smaller blocks
  .set("spark.executor.memory", "4g") // Adequate executor memory

Network Partitions

// Handle network partitions and connectivity issues
val networkResilientStream = KinesisUtils.createStream(
  ssc,
  "network-resilient-app",
  "reliable-stream",
  endpointUrl,
  regionName,
  InitialPositionInStream.LATEST,
  Seconds(60), // Longer checkpoint interval during network issues
  StorageLevel.MEMORY_AND_DISK_2 // Ensure local persistence
)
