# Fault Tolerance & Recovery

The integration has built-in fault tolerance that uses Kinesis sequence numbers for reliable stream processing and recovery from failures. It provides automatic checkpointing, sequence number tracking, and stream recovery.

## Core Fault Tolerance Components

### Sequence Number-Based Recovery

The system tracks Kinesis sequence numbers to enable precise recovery from stream processing failures.

```scala { .api }
case class SequenceNumberRange(
  streamName: String,
  shardId: String,
  fromSeqNumber: String,
  toSeqNumber: String
)

case class SequenceNumberRanges(ranges: Seq[SequenceNumberRange]) {
  def isEmpty(): Boolean
  def nonEmpty(): Boolean
  override def toString(): String
}
```

**Usage Pattern:**
- Each processed batch of records is associated with sequence number ranges
- When failures occur, the system can recover by resuming from the last successfully processed sequence number
- Sequence numbers are strings that increase over time within a shard, which provides per-shard ordering

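As a rough illustration of the pattern above, the sketch below models sequence number ranges with a local stand-in for `SequenceNumberRange`, so it has no Spark or AWS dependency. The `SequenceNumbers` object and its `contains` and `lastProcessed` helpers are hypothetical names introduced here. It assumes sequence numbers are decimal strings and therefore compares them numerically with `BigInt` rather than as text.

```scala
// Local stand-in for the SequenceNumberRange case class shown above.
final case class SequenceNumberRange(
  streamName: String,
  shardId: String,
  fromSeqNumber: String,
  toSeqNumber: String
)

// Hypothetical helpers, not part of the integration's API.
object SequenceNumbers {
  // Compare numerically: as text, "100" would sort before "99".
  def contains(range: SequenceNumberRange, seqNumber: String): Boolean = {
    val n = BigInt(seqNumber)
    BigInt(range.fromSeqNumber) <= n && n <= BigInt(range.toSeqNumber)
  }

  // Highest processed sequence number per shard: the point to resume from.
  def lastProcessed(ranges: Seq[SequenceNumberRange]): Map[String, String] =
    ranges.groupBy(_.shardId).map { case (shardId, rs) =>
      shardId -> rs.map(r => BigInt(r.toSeqNumber)).max.toString
    }
}
```

Resuming from the last successfully processed sequence number then amounts to reading each shard starting after the value that `lastProcessed` reports for it.
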
### KinesisBackedBlockRDD

Fault-tolerant RDD implementation that can recover data directly from Kinesis using stored sequence numbers.

```scala { .api }
class KinesisBackedBlockRDD[T: ClassTag](
  sc: SparkContext,
  regionName: String,
  endpointUrl: String,
  blockIds: Array[BlockId],
  arrayOfseqNumberRanges: Array[SequenceNumberRanges],
  isBlockIdValid: Array[Boolean] = Array.empty,
  retryTimeoutMs: Int = 10000,
  messageHandler: Record => T = KinesisUtils.defaultMessageHandler _,
  awsCredentialsOption: Option[SerializableAWSCredentials] = None
) extends BlockRDD[T](sc, blockIds) {

  def isValid(): Boolean
  def getPartitions: Array[Partition]
  def compute(split: Partition, context: TaskContext): Iterator[T]
}
```

**Key Features:**
- Re-reads data from Kinesis when a block is no longer available in Spark's block storage
- Uses sequence number ranges to fetch exactly the records that were lost
- Provides a configurable retry timeout (`retryTimeoutMs`) for recovery operations
- Maintains data lineage for reliable stream processing

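The recovery behaviour listed above can be pictured with a small model. This is a sketch under assumptions, not the class's implementation: `localStore` stands in for Spark's block storage, `fetchFromKinesis` stands in for re-reading a sequence number range, and `BlockRecovery`, `Source`, `FromBlockStore`, and `FromKinesis` are names invented for the example.

```scala
// Where a block's data was obtained from.
sealed trait Source
case object FromBlockStore extends Source
case object FromKinesis extends Source

object BlockRecovery {
  // Use the stored block when it is marked valid and still present;
  // otherwise fall back to re-reading the records from Kinesis.
  def compute[T](
    blockId: String,
    isBlockIdValid: Boolean,
    localStore: Map[String, Seq[T]],
    fetchFromKinesis: String => Seq[T]
  ): (Source, Seq[T]) = {
    val stored = if (isBlockIdValid) localStore.get(blockId) else None
    stored match {
      case Some(records) => (FromBlockStore, records)
      case None          => (FromKinesis, fetchFromKinesis(blockId))
    }
  }
}
```

In this model a block that is flagged invalid is never looked up locally, which mirrors the purpose of the `isBlockIdValid` array in the constructor above.
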
### Checkpointing System

Automatic checkpoint coordination through DynamoDB for tracking stream progress.

```scala { .api }
class KinesisCheckpointer(
  receiver: KinesisReceiver[_],
  checkpointInterval: Duration,
  workerId: String,
  clock: Clock = new SystemClock
) extends Logging {

  def setCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit
  def removeCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit
}
```

**Checkpointing Process:**
1. Periodic checkpointing to DynamoDB based on `checkpointInterval`
2. Per-shard sequence number tracking
3. Coordinated checkpoint management across multiple workers
4. Automatic cleanup when shards are closed or reassigned

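A minimal model of the process above, assuming a simple per-shard timer, might look like the following. `PeriodicCheckpointer`, `record`, `tick`, and `remove` are hypothetical names, the `checkpoint` callback stands in for the DynamoDB write, and time is passed in explicitly so the behaviour is easy to test.

```scala
import scala.collection.mutable

// Hypothetical model; the checkpoint callback stands in for the DynamoDB write.
class PeriodicCheckpointer(intervalMs: Long, checkpoint: (String, String) => Unit) {
  private val latestSeqNumber = mutable.Map.empty[String, String]
  private val lastCheckpointAt = mutable.Map.empty[String, Long]

  // Track the most recent sequence number stored for a shard.
  def record(shardId: String, seqNumber: String): Unit =
    latestSeqNumber(shardId) = seqNumber

  // Checkpoint every tracked shard whose interval has elapsed.
  // Returns the shards that were checkpointed on this tick.
  def tick(nowMs: Long): Seq[String] = {
    val due = latestSeqNumber.keys.toSeq.sorted.filter { shardId =>
      nowMs - lastCheckpointAt.getOrElse(shardId, 0L) >= intervalMs
    }
    due.foreach { shardId =>
      checkpoint(shardId, latestSeqNumber(shardId))
      lastCheckpointAt(shardId) = nowMs
    }
    due
  }

  // Stop tracking a shard that was closed or reassigned.
  def remove(shardId: String): Unit = {
    latestSeqNumber -= shardId
    lastCheckpointAt -= shardId
  }
}
```

Keeping the latest sequence number separate from the time of the last write is what lets the checkpoint frequency be tuned independently of the ingest rate.
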
## Fault Tolerance Configuration

### Checkpoint Intervals

Configure checkpoint frequency to balance fault tolerance and performance:

```scala
// High-frequency checkpointing (less reprocessing after a failure, more DynamoDB writes)
val highFrequencyInterval = Seconds(10)

// Balanced checkpointing (recommended for most applications)
val balancedInterval = Seconds(30)

// Low-frequency checkpointing (more reprocessing after a failure, fewer DynamoDB writes)
val lowFrequencyInterval = Seconds(120)

val stream = KinesisUtils.createStream(
  ssc,
  "fault-tolerant-app",
  "reliable-stream",
  endpointUrl,
  regionName,
  InitialPositionInStream.LATEST,
  balancedInterval, // Checkpoint frequency
  StorageLevel.MEMORY_AND_DISK_2
)
```

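To make the trade-off concrete, here is a back-of-the-envelope sketch. It assumes, as a simplification, that each shard writes one checkpoint per interval and that a failure replays at most one interval's worth of records per shard. `CheckpointTradeoff` and its method names are made up for the example, and the rates are illustrative.

```scala
// Rough estimates only; real behaviour depends on KCL and workload details.
object CheckpointTradeoff {
  // Upper bound on records re-read per shard after a failure.
  def maxReplayedRecords(recordsPerSecond: Long, intervalSeconds: Long): Long =
    recordsPerSecond * intervalSeconds

  // Approximate DynamoDB checkpoint writes per hour across all shards.
  def checkpointWritesPerHour(shards: Int, intervalSeconds: Long): Long =
    shards * 3600L / intervalSeconds
}
```

With 1,000 records per second per shard and 4 shards, a 30-second interval gives up to 30,000 replayed records per shard and about 480 checkpoint writes per hour. A 10-second interval cuts the replay to 10,000 records but triples the writes to 1,440.
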
### Storage Level Configuration

Choose storage levels that provide appropriate fault tolerance:

```scala { .api }
import org.apache.spark.storage.StorageLevel

// Recommended: Memory and disk with replication
StorageLevel.MEMORY_AND_DISK_2  // Best fault tolerance
StorageLevel.MEMORY_AND_DISK    // Good fault tolerance
StorageLevel.MEMORY_ONLY_2      // Memory-only with replication
StorageLevel.DISK_ONLY_2        // Disk-only with replication
```

**MEMORY_AND_DISK_2** provides the best balance of performance and fault tolerance: blocks are cached in memory, spilled to disk when memory runs out, and replicated on two nodes.

### Recovery Timeout Configuration

Configure retry behavior for failed operations:

```scala
// Custom timeout for Kinesis data recovery.
// Note: in the Spark source this class appears to be declared private[kinesis],
// so direct construction may only compile from within that package.
import java.nio.charset.StandardCharsets

val customRDD = new KinesisBackedBlockRDD[String](
  sparkContext,
  "us-east-1",
  "https://kinesis.us-east-1.amazonaws.com",
  blockIds,
  sequenceRanges,
  isBlockIdValid,
  retryTimeoutMs = 30000, // 30 second timeout
  messageHandler = record => new String(record.getData.array(), StandardCharsets.UTF_8),
  awsCredentialsOption = None
)
```

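The idea behind a retry timeout can be sketched independently of Spark: keep retrying a failing read until it succeeds or the time budget runs out. The helper below is an illustration under that assumption, not the code used by `KinesisBackedBlockRDD`. The `Retry` object, `retryOrTimeout`, and the `waitMs` pause between attempts are hypothetical.

```scala
import scala.util.control.NonFatal

object Retry {
  // Runs body at least once, then retries on non-fatal errors until timeoutMs elapses.
  def retryOrTimeout[T](timeoutMs: Long, waitMs: Long = 10L)(body: => T): T = {
    val deadline = System.currentTimeMillis() + timeoutMs
    var result: Option[T] = None
    var lastError: Throwable = null
    var firstAttempt = true

    while (result.isEmpty && (firstAttempt || System.currentTimeMillis() < deadline)) {
      firstAttempt = false
      try {
        result = Some(body)
      } catch {
        case NonFatal(e) =>
          lastError = e
          Thread.sleep(waitMs) // Brief pause before the next attempt
      }
    }

    // Surface the last failure as the cause once the budget is spent.
    result.getOrElse(
      throw new RuntimeException(s"Gave up after $timeoutMs ms", lastError))
  }
}
```

A longer timeout tolerates longer outages at the price of tasks that block for longer before failing, which is the trade-off `retryTimeoutMs` exposes.
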
## Recovery Scenarios

### Worker Node Failure

When a Spark worker node fails:

1. **Block Recovery**: KinesisBackedBlockRDD automatically detects missing blocks
2. **Sequence Number Lookup**: Retrieves sequence number ranges for missing data
3. **Kinesis Re-read**: Fetches data directly from Kinesis using sequence numbers
4. **Processing Continuation**: Resumes processing from the recovered data

```scala
// Illustrative sketch of the recovery process (handled automatically).
// getSequenceRangesForBlocks, createKinesisClient, and recoverRecordsFromKinesis
// are placeholder helpers, not library functions.
def recoverFromFailure(lostBlockIds: Array[BlockId]): Iterator[Record] = {
  val correspondingRanges = getSequenceRangesForBlocks(lostBlockIds)
  val kinesisClient = createKinesisClient()

  correspondingRanges.flatMap { range =>
    recoverRecordsFromKinesis(kinesisClient, range)
  }.iterator
}
```

### Application Restart

When the entire Spark Streaming application restarts:

1. **Checkpoint Recovery**: Loads stream state from the Spark checkpoint directory
2. **DynamoDB Lookup**: Retrieves the last checkpointed sequence numbers from DynamoDB
3. **Stream Resumption**: Continues processing from the last checkpointed position
4. **Gap Detection**: Identifies any unprocessed data and recovers accordingly

```scala
// Restart recovery requires building the context through StreamingContext.getOrCreate.
// sparkConf and the 10-second batch interval are assumed to be defined by the application.
val checkpointDir = "hdfs://cluster/checkpoints/kinesis-app"

// Called only when no Spark checkpoint exists in checkpointDir
def createContext(): StreamingContext = {
  val ssc = new StreamingContext(sparkConf, Seconds(10))
  ssc.checkpoint(checkpointDir)

  val recoveredStream = KinesisUtils.createStream(
    ssc,
    "persistent-app",
    "continuous-stream",
    endpointUrl,
    regionName,
    InitialPositionInStream.LATEST, // Only used if no KCL checkpoint exists in DynamoDB
    Seconds(30),
    StorageLevel.MEMORY_AND_DISK_2
  )
  // Define output operations on recoveredStream here
  ssc
}

// Loads the context from the checkpoint if present, otherwise creates a new one
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
```

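The role of the initial position on restart can be summarised in a few lines. This sketch models only the decision, assuming a shard resumes after its checkpointed sequence number when one exists. `StartPosition`, `AfterSequenceNumber`, `Restart`, and `resolveStart` are hypothetical names, not KCL or Spark types.

```scala
// Hypothetical model of where a shard starts reading after a restart.
sealed trait StartPosition
case object Latest extends StartPosition
case object TrimHorizon extends StartPosition
final case class AfterSequenceNumber(seqNumber: String) extends StartPosition

object Restart {
  // A stored checkpoint wins; the configured initial position is only a fallback.
  def resolveStart(checkpointed: Option[String], initial: StartPosition): StartPosition =
    checkpointed match {
      case Some(seqNumber) => AfterSequenceNumber(seqNumber)
      case None            => initial
    }
}
```

This is why changing the initial position of an application that has already checkpointed has no visible effect until its stored checkpoints are removed or the application name changes.
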
### Shard Splits and Merges

Kinesis shard topology changes are handled automatically:

1. **Shard Discovery**: KCL automatically discovers new shards
2. **Checkpoint Migration**: Sequence numbers are properly migrated
3. **Processing Continuity**: Stream processing continues across shard changes
4. **Resource Adjustment**: Worker allocation adjusts to new shard count

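One way to picture processing continuity across a split or merge is that a child shard becomes eligible only after its parent shards are fully processed. The sketch below encodes that ordering rule as an assumption about how the KCL behaves. `Shard`, `ShardOrdering`, and `eligible` are illustrative names.

```scala
// A shard and the parent shards it was split or merged from (empty for original shards).
final case class Shard(id: String, parents: Set[String])

object ShardOrdering {
  // Shards that are not yet completed and whose parents have all been completed.
  def eligible(shards: Seq[Shard], completed: Set[String]): Seq[String] =
    shards
      .filter(s => !completed(s.id) && s.parents.forall(completed))
      .map(_.id)
}
```

Under this rule, records written before a split are always processed before records written to the resulting child shards, which preserves per-key ordering across the topology change.
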
## Monitoring and Alerting

### Key Metrics to Monitor

Monitor these metrics for fault tolerance health:

```scala
// The counters and timestamps below are assumed to be collected by the application.
// Counts are converted to Double so the ratios are not truncated by integer division.

// Checkpoint success rate
val checkpointSuccessRate = successfulCheckpoints.toDouble / totalCheckpointAttempts

// Recovery frequency
val recoveryRate = recoveryEvents.toDouble / totalProcessingTime

// Processing lag
val processingLag = currentTime - lastProcessedRecordTime

// DynamoDB throttling
val dynamoThrottleRate = throttledRequests.toDouble / totalDynamoRequests
```

### Failure Detection Patterns

```scala
import org.apache.spark.streaming.Duration

// Monitor for stuck processing
def detectStuckProcessing(
  lastCheckpointTime: Long,
  maxAllowedLag: Duration
): Boolean = {
  val currentTime = System.currentTimeMillis()
  val lag = currentTime - lastCheckpointTime
  lag > maxAllowedLag.milliseconds
}

// Monitor for high error rates
def detectHighErrorRate(
  errorCount: Int,
  totalCount: Int,
  threshold: Double = 0.05
): Boolean = {
  // Treat "nothing processed yet" as a zero error rate to avoid dividing by zero
  val errorRate = if (totalCount == 0) 0.0 else errorCount.toDouble / totalCount
  errorRate > threshold
}
```

## Best Practices

### Checkpoint Strategy

1. **Consistent Intervals**: Use consistent checkpoint intervals across restarts
2. **Secure Storage**: Store checkpoints in reliable, secure storage (HDFS, S3)
3. **Regular Cleanup**: Implement checkpoint cleanup policies to prevent disk bloat

```scala
// Shutdown and rate-control settings; neither deletes old checkpoint files.
// Set them on the SparkConf before the StreamingContext is created.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.streaming.stopGracefullyOnShutdown", "true") // Finish in-flight batches on shutdown
  .set("spark.streaming.backpressure.enabled", "true")     // Adapt ingestion rate to processing speed
```

### Error Handling

```scala
import scala.util.control.NonFatal

// Implement graceful error handling in message processing.
// processRecord and logError are assumed to be provided by the application.
val faultTolerantStream = kinesisStream.flatMap { record =>
  try {
    Some(processRecord(record))
  } catch {
    case NonFatal(e) =>
      logError(s"Error processing record: ${e.getMessage}")
      None // Skip the record that failed
  }
}
```

### Monitoring Integration

```scala
// Integrate with monitoring systems.
// metricsCollector, alertingSystem, and isProcessingDelayed are application-defined placeholders.
stream.foreachRDD { rdd =>
  val recordCount = rdd.count()
  val processingTime = System.currentTimeMillis()

  // Send metrics to monitoring system
  metricsCollector.recordGauge("kinesis.records.processed", recordCount)
  metricsCollector.recordGauge("kinesis.processing.timestamp", processingTime)

  // Alert on processing delays
  if (isProcessingDelayed(processingTime)) {
    alertingSystem.sendAlert("Kinesis processing delayed")
  }
}
```

### Resource Management

```scala
// Configure appropriate resource allocation
val sparkConf = new SparkConf()
  .set("spark.streaming.receiver.maxRate", "1000")                       // Rate limiting
  .set("spark.streaming.backpressure.enabled", "true")                   // Automatic backpressure
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // Efficient serialization
  .set("spark.streaming.blockInterval", "200ms")                         // Block interval tuning
```

## Troubleshooting Common Issues

### DynamoDB Throttling

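```scala
// Retry settings documented for the Spark Kinesis integration. They govern
// retries of Kinesis reads; throttling on the DynamoDB checkpoint table is
// usually addressed by raising table capacity or checkpointing less often.
val kinesisConf = new SparkConf()
  .set("spark.streaming.kinesis.retry.maxAttempts", "10")
  .set("spark.streaming.kinesis.retry.waitTime", "1000ms")
```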

### Memory Pressure

```scala
// Configure memory management for large streams
val memoryOptimizedConf = new SparkConf()
  .set("spark.streaming.unpersist", "true")     // Auto-unpersist old RDDs
  .set("spark.streaming.blockInterval", "50ms") // Smaller blocks
  .set("spark.executor.memory", "4g")           // Adequate executor memory
```

### Network Partitions

```scala
// Handle network partitions and connectivity issues
val networkResilientStream = KinesisUtils.createStream(
  ssc,
  "network-resilient-app",
  "reliable-stream",
  endpointUrl,
  regionName,
  InitialPositionInStream.LATEST,
  Seconds(60),                   // Longer checkpoint interval during network issues
  StorageLevel.MEMORY_AND_DISK_2 // Ensure local persistence
)
```