
# Fault Tolerance & Recovery

The integration provides built-in fault tolerance based on Kinesis sequence numbers, for reliable stream processing and recovery from failures. It offers automatic checkpointing, sequence number tracking, and stream recovery.

## Core Fault Tolerance Components

### Sequence Number-Based Recovery

The system tracks Kinesis sequence numbers to enable precise recovery from stream processing failures.

```scala { .api }
case class SequenceNumberRange(
  streamName: String,
  shardId: String,
  fromSeqNumber: String,
  toSeqNumber: String
)

case class SequenceNumberRanges(ranges: Seq[SequenceNumberRange]) {
  def isEmpty(): Boolean
  def nonEmpty(): Boolean
  override def toString(): String
}
```

**Usage Pattern:**

- Each processed batch of records is associated with sequence number ranges
- When failures occur, the system can recover by resuming from the last successfully processed sequence number
- Within a shard, sequence numbers generally increase over time, which gives a per-shard ordering; they are numeric strings, so compare them as numbers rather than as text
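
The resume point described above can be sketched with plain Scala. This is a minimal illustration, not the library's code: `SeqRange` is a hypothetical stand-in for `SequenceNumberRange` so the snippet compiles without Spark, and `SequenceNumbers` is an illustrative helper name. It assumes sequence numbers are decimal strings, which is why they are compared through `BigInt`.

```scala
// Hypothetical stand-in for SequenceNumberRange (same four fields), used only in this sketch.
final case class SeqRange(
  streamName: String,
  shardId: String,
  fromSeqNumber: String,
  toSeqNumber: String
)

object SequenceNumbers {
  // Sequence numbers are decimal strings: compare them as integers, not lexicographically.
  def isAfter(a: String, b: String): Boolean = BigInt(a) > BigInt(b)

  // Highest processed sequence number per shard across the ranges of a batch.
  def lastProcessed(ranges: Seq[SeqRange]): Map[String, String] =
    ranges.groupBy(_.shardId).map { case (shardId, shardRanges) =>
      shardId -> shardRanges.map(_.toSeqNumber).maxBy(s => BigInt(s))
    }
}
```

A plain string comparison would order `"100"` before `"99"`, which is why the numeric conversion matters when deciding where to resume.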

### KinesisBackedBlockRDD

`KinesisBackedBlockRDD` is a fault-tolerant RDD implementation that can recover data directly from Kinesis using stored sequence numbers.

```scala { .api }
class KinesisBackedBlockRDD[T: ClassTag](
  sc: SparkContext,
  regionName: String,
  endpointUrl: String,
  blockIds: Array[BlockId],
  arrayOfseqNumberRanges: Array[SequenceNumberRanges],
  isBlockIdValid: Array[Boolean] = Array.empty,
  retryTimeoutMs: Int = 10000,
  messageHandler: Record => T = KinesisUtils.defaultMessageHandler _,
  awsCredentialsOption: Option[SerializableAWSCredentials] = None
) extends BlockRDD[T](sc, blockIds) {

  def isValid(): Boolean
  def getPartitions: Array[Partition]
  def compute(split: Partition, context: TaskContext): Iterator[T]
}
```

**Key Features:**

- Automatically recreates data from Kinesis when local storage is unavailable
- Uses sequence number ranges to fetch exact data ranges that were lost
- Provides configurable retry timeout for recovery operations
- Maintains data lineage for reliable stream processing
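
The choice between local blocks and a Kinesis re-read can be pictured with a small, Spark-free sketch. Everything here is illustrative: `RecoveryPlan`, `chooseSource`, and `plan` are hypothetical names, block IDs are plain strings, and treating an empty validity array as "all blocks valid" is an assumption of this sketch rather than a documented guarantee.

```scala
object RecoveryPlan {
  sealed trait Source
  case object BlockStore extends Source // read the block from local or replicated storage
  case object Kinesis extends Source    // re-read the sequence number range from Kinesis

  // A block is served locally only if it is marked valid and is still present.
  def chooseSource(isBlockIdValid: Boolean, blockPresent: Boolean): Source =
    if (isBlockIdValid && blockPresent) BlockStore else Kinesis

  // Pairs every block ID with the source its data would come from.
  def plan(
    blockIds: Seq[String],
    validity: Seq[Boolean],
    present: Set[String]
  ): Seq[(String, Source)] =
    blockIds.zipWithIndex.map { case (id, i) =>
      // Assumption: an empty validity sequence means every block ID is considered valid.
      val valid = if (validity.isEmpty) true else validity(i)
      id -> chooseSource(valid, present.contains(id))
    }
}
```

For example, with three blocks where the second was lost and the third is marked invalid, only the first is read from the block store and the other two fall back to Kinesis.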

### Checkpointing System

`KinesisCheckpointer` coordinates automatic checkpointing through DynamoDB to track stream progress.

```scala { .api }
class KinesisCheckpointer(
  receiver: KinesisReceiver[_],
  checkpointInterval: Duration,
  workerId: String,
  clock: Clock = new SystemClock
) extends Logging {

  def setCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit
  def removeCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit
}
```

**Checkpointing Process:**

1. Periodic checkpointing to DynamoDB based on `checkpointInterval`
2. Per-shard sequence number tracking
3. Coordinated checkpoint management across multiple workers
4. Automatic cleanup when shards are closed or reassigned
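
The periodic, per-shard part of this process can be sketched without Spark or the KCL. `ShardCheckpointTracker` and its methods are hypothetical names chosen for illustration, and the `persist` callback stands in for whatever actually writes the checkpoint; this is not the `KinesisCheckpointer` implementation.

```scala
import scala.collection.mutable

// Illustrative sketch: tracks the latest sequence number per shard and
// persists it at most once per interval.
final class ShardCheckpointTracker(intervalMs: Long, persist: (String, String) => Unit) {
  private val latest = mutable.Map.empty[String, String]           // shardId -> latest stored sequence number
  private val lastCheckpointed = mutable.Map.empty[String, String] // shardId -> last persisted sequence number
  private var lastRunMs = 0L

  def recordStored(shardId: String, seqNumber: String): Unit =
    latest(shardId) = seqNumber

  // Call on every tick; returns how many shards were checkpointed.
  def maybeCheckpoint(nowMs: Long): Int =
    if (nowMs - lastRunMs < intervalMs) 0
    else {
      lastRunMs = nowMs
      // Only shards whose position advanced since the last checkpoint are written.
      val advanced = latest.filter { case (shard, seq) =>
        !lastCheckpointed.get(shard).contains(seq)
      }.toSeq
      advanced.foreach { case (shard, seq) =>
        persist(shard, seq)
        lastCheckpointed(shard) = seq
      }
      advanced.size
    }

  // Drop all state for a shard that was closed or reassigned.
  def removeShard(shardId: String): Unit = {
    latest -= shardId
    lastCheckpointed -= shardId
  }
}
```

Passing the current time in as a parameter, rather than reading the system clock inside the class, keeps the sketch easy to test, much like the injectable `clock` parameter in the signature above.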

## Fault Tolerance Configuration

### Checkpoint Intervals

Configure checkpoint frequency to balance fault tolerance and performance:

```scala
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kinesis.KinesisUtils

// High-frequency checkpointing (less data to re-read after a failure, higher cost)
val highFrequencyInterval = Seconds(10)

// Balanced checkpointing (recommended for most applications)
val balancedInterval = Seconds(30)

// Low-frequency checkpointing (more data to re-read after a failure, lower cost)
val lowFrequencyInterval = Seconds(120)

// ssc, endpointUrl, and regionName are assumed to be defined elsewhere
val stream = KinesisUtils.createStream(
  ssc,
  "fault-tolerant-app",
  "reliable-stream",
  endpointUrl,
  regionName,
  InitialPositionInStream.LATEST,
  balancedInterval, // Checkpoint frequency
  StorageLevel.MEMORY_AND_DISK_2
)
```
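
To put rough numbers on this trade-off, the following back-of-the-envelope sketch may help. `CheckpointTradeoff` and its methods are hypothetical helpers, and the model is deliberately simple: it assumes a steady ingest rate and one checkpoint write per shard per interval.

```scala
object CheckpointTradeoff {
  // Upper bound on records read again after a restart:
  // everything that arrived since the last checkpoint.
  def maxReplayedRecords(recordsPerSecond: Long, intervalSeconds: Long): Long =
    recordsPerSecond * intervalSeconds

  // Checkpoint writes per hour, assuming one write per shard per interval.
  def checkpointWritesPerHour(shards: Int, intervalSeconds: Long): Long =
    shards * (3600L / intervalSeconds)
}
```

Under these assumptions, an 8-shard stream at 1,000 records per second replays at most 30,000 records with a 30-second interval and issues 960 checkpoint writes per hour; a 10-second interval cuts the replay bound to 10,000 records but triples the writes to 2,880.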

### Storage Level Configuration

Choose storage levels that provide appropriate fault tolerance:

```scala { .api }
import org.apache.spark.storage.StorageLevel

// Recommended: memory and disk with replication
StorageLevel.MEMORY_AND_DISK_2 // Best fault tolerance
StorageLevel.MEMORY_AND_DISK   // Memory and disk, no replication
StorageLevel.MEMORY_ONLY_2     // Memory-only with replication
StorageLevel.DISK_ONLY_2       // Disk-only with replication
```

**MEMORY_AND_DISK_2** provides the best balance of performance and fault tolerance: blocks are cached in memory, spill to disk when memory runs short, and are replicated on two nodes.

### Recovery Timeout Configuration

Configure retry behavior for failed operations:

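```scala
import java.nio.charset.StandardCharsets

// Custom timeout for Kinesis data recovery.
// Note: this RDD is normally created by the Kinesis input stream. In the Spark
// source the class is declared private[kinesis], so direct construction is
// shown here for illustration only.
// sparkContext, blockIds, sequenceRanges, and isBlockIdValid are assumed to exist.
val customRDD = new KinesisBackedBlockRDD[String](
  sparkContext,
  "us-east-1",
  "https://kinesis.us-east-1.amazonaws.com",
  blockIds,
  sequenceRanges,
  isBlockIdValid,
  retryTimeoutMs = 30000, // 30-second timeout
  messageHandler = record => new String(record.getData.array(), StandardCharsets.UTF_8),
  awsCredentialsOption = None
)
```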
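
A retry timeout of this kind is typically paired with a backoff loop. The sketch below shows one way such a loop could look; it is not the library's implementation. `Retry.retryUntil` is a hypothetical helper, and the 10 ms initial wait and the doubling factor are assumed values.

```scala
import scala.util.control.NonFatal

object Retry {
  // Runs `body` until it succeeds, doubling the wait between attempts.
  // Rethrows the last error once the next wait would pass `deadlineMs`.
  @annotation.tailrec
  def retryUntil[T](deadlineMs: Long, waitMs: Long = 10L)(body: () => T): T = {
    val attempt: Either[Throwable, T] =
      try Right(body())
      catch { case NonFatal(e) => Left(e) }

    attempt match {
      case Right(value) => value
      case Left(e) if System.currentTimeMillis() + waitMs >= deadlineMs => throw e
      case Left(_) =>
        Thread.sleep(waitMs)
        retryUntil(deadlineMs, waitMs * 2)(body)
    }
  }
}
```

A 30-second budget would be expressed as `Retry.retryUntil(System.currentTimeMillis() + 30000L) { () => fetchRecords() }`, where `fetchRecords` is a placeholder for the actual Kinesis read.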

## Recovery Scenarios

### Worker Node Failure

When a Spark worker node fails:

1. **Block Recovery**: KinesisBackedBlockRDD automatically detects missing blocks
2. **Sequence Number Lookup**: Retrieves sequence number ranges for missing data
3. **Kinesis Re-read**: Fetches data directly from Kinesis using sequence numbers
4. **Processing Continuation**: Resumes processing from the recovered data

```scala
// Example recovery process (handled automatically).
// getSequenceRangesForBlocks, createKinesisClient, and recoverRecordsFromKinesis
// are placeholder helpers for illustration, not part of the library API.
def recoverFromFailure(lostBlockIds: Array[BlockId]): Iterator[Record] = {
  val correspondingRanges = getSequenceRangesForBlocks(lostBlockIds)
  val kinesisClient = createKinesisClient()

  // Fetch lazily, one sequence number range at a time
  correspondingRanges.iterator.flatMap { range =>
    recoverRecordsFromKinesis(kinesisClient, range)
  }
}
```

### Application Restart

When the entire Spark Streaming application restarts:

1. **Checkpoint Recovery**: Loads stream state from the Spark checkpoint directory
2. **DynamoDB Lookup**: Retrieves the last processed sequence numbers from DynamoDB
3. **Stream Resumption**: Continues processing from the last checkpointed position
4. **Gap Detection**: Identifies any unprocessed data and recovers accordingly

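```scala
// Checkpoint directory for application restart recovery
val checkpointDir = "hdfs://cluster/checkpoints/kinesis-app"

// Build the context and the stream inside a factory function so that
// StreamingContext.getOrCreate can restore them from the checkpoint on restart.
// sparkConf, endpointUrl, and regionName are assumed to be defined elsewhere.
def createContext(): StreamingContext = {
  val ssc = new StreamingContext(sparkConf, Seconds(10)) // Batch interval: example value
  ssc.checkpoint(checkpointDir)

  val recoveredStream = KinesisUtils.createStream(
    ssc,
    "persistent-app",
    "continuous-stream",
    endpointUrl,
    regionName,
    InitialPositionInStream.LATEST, // Only used if no checkpoint exists
    Seconds(30),
    StorageLevel.MEMORY_AND_DISK_2
  )
  // Define output operations on recoveredStream here
  ssc
}

// Restores from the checkpoint directory if one exists, otherwise calls createContext()
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
```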

### Shard Splits and Merges

Kinesis shard topology changes are handled automatically:

1. **Shard Discovery**: KCL automatically discovers new shards
2. **Checkpoint Migration**: Each new shard gets its own checkpoint entry, and parent shards are finished before their child shards start
3. **Processing Continuity**: Stream processing continues across shard changes
4. **Resource Adjustment**: Worker allocation adjusts to the new shard count
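
The ordering constraint behind these steps, that a child shard is read only after its parent shards are finished, can be sketched in plain Scala. `Shard` and `ShardOrdering.readyShards` are hypothetical names used for illustration; lease handling and worker assignment in the KCL are not modeled here.

```scala
// Illustrative model of a shard and the shards it was split or merged from.
final case class Shard(shardId: String, parentShardIds: Set[String])

object ShardOrdering {
  // Shards that may be processed now: not yet finished, and every parent
  // that is still listed in the stream has been consumed to its end.
  def readyShards(shards: Seq[Shard], finished: Set[String]): Seq[String] = {
    val known = shards.map(_.shardId).toSet
    shards
      .filterNot(s => finished.contains(s.shardId))
      .filter(s => s.parentShardIds.filter(known.contains).subsetOf(finished))
      .map(_.shardId)
  }
}
```

With a split of `shard-0` into `shard-1` and `shard-2`, followed by a merge of those two into `shard-3`, the function yields `shard-0` first, then both children, and `shard-3` only after both of its parents are finished.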

## Monitoring and Alerting

### Key Metrics to Monitor

Monitor these metrics for fault tolerance health:

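```scala
// Inputs are counters and timestamps collected by your own instrumentation

// Checkpoint success rate (fraction of attempts that succeeded)
val checkpointSuccessRate = successfulCheckpoints.toDouble / totalCheckpointAttempts

// Recovery frequency (recovery events per unit of processing time)
val recoveryRate = recoveryEvents.toDouble / totalProcessingTime

// Processing lag (time since the last processed record)
val processingLag = currentTime - lastProcessedRecordTime

// DynamoDB throttling (fraction of requests that were throttled)
val dynamoThrottleRate = throttledRequests.toDouble / totalDynamoRequests
```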

### Failure Detection Patterns

```scala
import org.apache.spark.streaming.Duration

// Monitor for stuck processing
def detectStuckProcessing(
  lastCheckpointTime: Long,
  maxAllowedLag: Duration
): Boolean = {
  val currentTime = System.currentTimeMillis()
  val lag = currentTime - lastCheckpointTime
  lag > maxAllowedLag.milliseconds
}

// Monitor for high error rates
def detectHighErrorRate(
  errorCount: Int,
  totalCount: Int,
  threshold: Double = 0.05
): Boolean = {
  // Guard against division by zero when nothing has been processed yet
  if (totalCount == 0) false
  else errorCount.toDouble / totalCount > threshold
}
```

## Best Practices

### Checkpoint Strategy

1. **Consistent Intervals**: Use consistent checkpoint intervals across restarts
2. **Secure Storage**: Store checkpoints in reliable, secure storage (HDFS, S3)
3. **Regular Cleanup**: Implement checkpoint cleanup policies to prevent disk bloat

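```scala
import org.apache.spark.SparkConf

// Set these on the SparkConf before the StreamingContext is created;
// changing the configuration of a running context has no effect.
// Note: these settings control shutdown and ingestion rate, not checkpoint file cleanup.
val checkpointFriendlyConf = new SparkConf()
  .set("spark.streaming.stopGracefullyOnShutdown", "true") // Finish in-flight batches on shutdown
  .set("spark.streaming.backpressure.enabled", "true") // Adapt the receive rate to processing speed
```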

### Error Handling

```scala
import scala.util.control.NonFatal

// Implement graceful error handling in message processing.
// processRecord and logError are assumed to be provided by the application.
val faultTolerantStream = kinesisStream.flatMap { record =>
  try {
    Some(processRecord(record))
  } catch {
    case NonFatal(e) =>
      logError(s"Error processing record: ${e.getMessage}")
      None // Skip records that fail to process
  }
}
```

### Monitoring Integration

```scala
// Integrate with monitoring systems.
// metricsCollector, alertingSystem, and isProcessingDelayed are placeholders
// for your own monitoring code.
stream.foreachRDD { rdd =>
  val recordCount = rdd.count()
  val processingTime = System.currentTimeMillis()

  // Send metrics to monitoring system
  metricsCollector.recordGauge("kinesis.records.processed", recordCount)
  metricsCollector.recordGauge("kinesis.processing.timestamp", processingTime)

  // Alert on processing delays
  if (isProcessingDelayed(processingTime)) {
    alertingSystem.sendAlert("Kinesis processing delayed")
  }
}
```

### Resource Management

```scala
import org.apache.spark.SparkConf

// Configure appropriate resource allocation
val sparkConf = new SparkConf()
  .set("spark.streaming.receiver.maxRate", "1000") // Rate limiting (records per second per receiver)
  .set("spark.streaming.backpressure.enabled", "true") // Automatic backpressure
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // Efficient serialization
  .set("spark.streaming.blockInterval", "200ms") // Block interval tuning
```

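## Troubleshooting Common Issues

### DynamoDB Throttling

```scala
// Retry settings for Kinesis reads, as documented for newer Spark releases
// (check that your Spark version supports them).
// DynamoDB throttling itself is usually addressed by raising the capacity of the
// KCL lease table or by lengthening the checkpoint interval.
val kinesisConf = new SparkConf()
  .set("spark.streaming.kinesis.retry.maxAttempts", "10")
  .set("spark.streaming.kinesis.retry.waitTime", "1000ms")
```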

### Memory Pressure

```scala
// Configure memory management for large streams
val memoryOptimizedConf = new SparkConf()
  .set("spark.streaming.unpersist", "true") // Auto-unpersist old RDDs
  .set("spark.streaming.blockInterval", "50ms") // Smaller blocks
  .set("spark.executor.memory", "4g") // Adequate executor memory
```

### Network Partitions

```scala
// Handle network partitions and connectivity issues
val networkResilientStream = KinesisUtils.createStream(
  ssc,
  "network-resilient-app",
  "reliable-stream",
  endpointUrl,
  regionName,
  InitialPositionInStream.LATEST,
  Seconds(60), // Longer checkpoint interval during network issues
  StorageLevel.MEMORY_AND_DISK_2 // Ensure local persistence
)
```