
# Configuration and Performance Tuning

The configuration system provides advanced options for optimizing Kinesis stream consumption, including retry logic, timeouts, storage levels, CloudWatch metrics, and checkpointing intervals. These settings are crucial for production deployments that require high performance and reliability.

## Core Configuration API

### KinesisReadConfigurations

Controls retry behavior and timeouts for Kinesis API calls, particularly important for handling transient network issues and rate limiting.

```scala { .api }
case class KinesisReadConfigurations(
  maxRetries: Int,       // Maximum number of retry attempts for Kinesis API calls
  retryWaitTimeMs: Long, // Wait time between retry attempts in milliseconds
  retryTimeoutMs: Long   // Total timeout for Kinesis operations in milliseconds
)

object KinesisReadConfigurations {
  // Create with default values
  def apply(): KinesisReadConfigurations

  // Create with values from StreamingContext configuration
  def apply(ssc: StreamingContext): KinesisReadConfigurations

  // Configuration keys for SparkConf
  val RETRY_MAX_ATTEMPTS_KEY: String = "spark.streaming.kinesis.retry.maxAttempts"
  val RETRY_WAIT_TIME_KEY: String = "spark.streaming.kinesis.retry.waitTime"

  // Default values
  val DEFAULT_MAX_RETRIES: Int = 3
  val DEFAULT_RETRY_WAIT_TIME: String = "100ms"
  val DEFAULT_RETRY_TIMEOUT: Long = 10000 // 10 seconds
}
```

### Builder Configuration Methods

Additional configuration options available through the KinesisInputDStream.Builder:

```scala { .api }
class Builder {
  // Performance and reliability
  def checkpointInterval(interval: Duration): Builder
  def storageLevel(storageLevel: StorageLevel): Builder

  // CloudWatch metrics
  def metricsLevel(metricsLevel: MetricsLevel): Builder
  def metricsEnabledDimensions(dimensions: Set[String]): Builder

  // AWS service endpoints and regions
  def endpointUrl(url: String): Builder
  def regionName(regionName: String): Builder
}
```

## Retry Configuration

### Using Default Configuration

```scala
import org.apache.spark.streaming.kinesis.KinesisReadConfigurations

// Uses default values: 3 retries, 100ms wait, 10s timeout
val defaultConfig = KinesisReadConfigurations()

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .build()
```

### Using StreamingContext Configuration

Configure retry settings through SparkConf and let the system read them from the StreamingContext:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.kinesis.KinesisReadConfigurations

val conf = new SparkConf()
  .setAppName("KinesisApp")
  .set("spark.streaming.kinesis.retry.maxAttempts", "5")
  .set("spark.streaming.kinesis.retry.waitTime", "200ms")

val ssc = new StreamingContext(conf, Seconds(10))

// This will use the SparkConf values above
val configFromContext = KinesisReadConfigurations(ssc)

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .build()
```

### Custom Retry Configuration

```scala
import org.apache.spark.streaming.kinesis.KinesisReadConfigurations

val customConfig = KinesisReadConfigurations(
  maxRetries = 5,          // Retry up to 5 times
  retryWaitTimeMs = 500L,  // Wait 500ms between retries
  retryTimeoutMs = 30000L  // 30 second total timeout
)

// Note: KinesisReadConfigurations is used internally by KinesisBackedBlockRDD.
// The main builder uses default configurations, but you can influence this
// through SparkConf settings.
```

## Storage Level Configuration

Configure how DStream blocks are stored in memory and on disk for fault tolerance and performance.

```scala
import org.apache.spark.storage.StorageLevel

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .storageLevel(StorageLevel.MEMORY_ONLY_2) // Store in memory with 2x replication
  .build()
```

### Storage Level Options

```scala
// Memory only (fastest, but not fault tolerant to node failures)
.storageLevel(StorageLevel.MEMORY_ONLY)
.storageLevel(StorageLevel.MEMORY_ONLY_2) // With replication

// Memory and disk (balanced performance and fault tolerance)
.storageLevel(StorageLevel.MEMORY_AND_DISK)
.storageLevel(StorageLevel.MEMORY_AND_DISK_2) // Default, with replication

// Memory with serialization (more memory efficient)
.storageLevel(StorageLevel.MEMORY_ONLY_SER)
.storageLevel(StorageLevel.MEMORY_ONLY_SER_2)

// Disk only (most fault tolerant, slowest)
.storageLevel(StorageLevel.DISK_ONLY)
.storageLevel(StorageLevel.DISK_ONLY_2)
```

## Checkpoint Interval Configuration

Controls how frequently the KCL checkpoints progress to DynamoDB. This affects both fault tolerance and performance.

```scala
import org.apache.spark.streaming.Seconds

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .checkpointInterval(Seconds(30)) // Checkpoint every 30 seconds
  .build()
```

### Checkpoint Interval Guidelines

- **Shorter intervals** (5-10 seconds): Better fault tolerance and less data loss on failure, but higher DynamoDB costs
- **Longer intervals** (30-60 seconds): Lower DynamoDB costs, but more potential data loss on failure
- **Default**: Uses the streaming batch duration (recommended starting point)
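To put these trade-offs in concrete terms, it helps to estimate the DynamoDB checkpoint traffic a given interval produces. The sketch below is a hypothetical helper, not part of the library: it assumes roughly one checkpoint write per shard per interval and ignores KCL lease-renewal traffic.

```scala
// Hypothetical helper: rough estimate of DynamoDB checkpoint writes per hour.
// Assumes ~1 checkpoint write per shard per interval; ignores lease renewals.
def checkpointWritesPerHour(shardCount: Int, intervalSeconds: Int): Long =
  shardCount.toLong * (3600L / intervalSeconds)

// 8 shards: checkpointing every 10s vs. every 60s
val frequent = checkpointWritesPerHour(8, 10) // 2880 writes/hour
val relaxed  = checkpointWritesPerHour(8, 60) // 480 writes/hour
```

Estimates like these can feed into capacity planning for the KCL lease table.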

## CloudWatch Metrics Configuration

Configure collection and reporting of metrics to AWS CloudWatch for monitoring and alerting.

```scala
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .metricsLevel(MetricsLevel.SUMMARY)
  .metricsEnabledDimensions(Set("Operation", "ShardId", "WorkerId"))
  .build()
```

### Metrics Levels

```scala
// No metrics (best performance)
.metricsLevel(MetricsLevel.NONE)

// Summary metrics only (recommended for production)
.metricsLevel(MetricsLevel.SUMMARY)

// Detailed metrics (useful for debugging, higher overhead)
.metricsLevel(MetricsLevel.DETAILED)
```

### Common Metrics Dimensions

```scala
val productionDimensions = Set(
  "Operation", // Type of operation (ProcessRecords, Checkpoint, etc.)
  "ShardId",   // Kinesis shard identifier
  "WorkerId"   // KCL worker identifier
)

val debugDimensions = Set(
  "Operation",
  "ShardId",
  "WorkerId",
  "StreamName" // Additional dimension for debugging
)
```

## Regional and Endpoint Configuration

Configure AWS service endpoints for different regions or custom endpoints.

```scala
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .regionName("us-west-2")
  .endpointUrl("https://kinesis.us-west-2.amazonaws.com")
  .build()
```

### Region Configuration Best Practices

- **Match your compute region**: Use the same region as your Spark cluster to minimize latency
- **Consider data locality**: Place processing near your data sources and sinks
- **Compliance requirements**: Some regions may be required for regulatory compliance

## Complete Production Configuration Example

```scala
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions, SparkAWSCredentials}
import org.apache.spark.storage.StorageLevel
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
import org.apache.spark.SparkConf

// Configure Spark with Kinesis-specific settings
val conf = new SparkConf()
  .setAppName("ProductionKinesisApp")
  .set("spark.streaming.kinesis.retry.maxAttempts", "5")
  .set("spark.streaming.kinesis.retry.waitTime", "250ms")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val ssc = new StreamingContext(conf, Seconds(10))

// Configure credentials with an assumed IAM role
val credentials = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/ProductionKinesisRole", "prod-session")
  .build()

// Create production-ready stream
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("production-data-stream")
  .checkpointAppName("production-consumer-v2")
  .regionName("us-west-2")
  .initialPosition(new KinesisInitialPositions.Latest())
  .checkpointInterval(Seconds(30))
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
  .kinesisCredentials(credentials)
  .dynamoDBCredentials(credentials)
  .cloudWatchCredentials(credentials)
  .metricsLevel(MetricsLevel.SUMMARY)
  .metricsEnabledDimensions(Set("Operation", "ShardId"))
  .build()
```

## Performance Tuning Guidelines

### For High Throughput

```scala
val highThroughputStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("high-volume-stream")
  .checkpointAppName("high-throughput-consumer")
  .storageLevel(StorageLevel.MEMORY_ONLY_2) // Fastest storage
  .checkpointInterval(Seconds(60))          // Less frequent checkpointing
  .metricsLevel(MetricsLevel.NONE)          // Disable metrics overhead
  .build()
```

### For Maximum Reliability

```scala
val reliableStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("critical-data-stream")
  .checkpointAppName("reliable-consumer")
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2) // Fault-tolerant storage
  .checkpointInterval(Seconds(10))              // Frequent checkpointing
  .metricsLevel(MetricsLevel.DETAILED)          // Full monitoring
  .build()
```

### For Cost Optimization

```scala
val costOptimizedStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("batch-processing-stream")
  .checkpointAppName("cost-optimized-consumer")
  .storageLevel(StorageLevel.DISK_ONLY) // Cheapest storage
  .checkpointInterval(Seconds(300))     // Minimize DynamoDB writes
  .metricsLevel(MetricsLevel.SUMMARY)   // Basic monitoring only
  .build()
```

## Monitoring and Troubleshooting

### Key SparkConf Settings for Debugging

```scala
val debugConf = new SparkConf()
  .set("spark.streaming.kinesis.retry.maxAttempts", "10")
  .set("spark.streaming.kinesis.retry.waitTime", "1s")
  .set("spark.sql.adaptive.enabled", "false")
  .set("spark.sql.adaptive.coalescePartitions.enabled", "false")
```
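Receiver-side logging is another useful lever when retries or checkpoints misbehave. Assuming the classic log4j 1.x setup that older Spark distributions ship with, a fragment like the following (package names as used by the connector and KCL 1.x) raises verbosity for just the Kinesis code paths:

```properties
# Verbose logging for the Kinesis receiver and the embedded KCL
log4j.logger.org.apache.spark.streaming.kinesis=DEBUG
log4j.logger.com.amazonaws.services.kinesis.clientlibrary=INFO
```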

### CloudWatch Metrics to Monitor

- **IncomingRecords**: Records received from Kinesis
- **ProcessedRecords**: Records successfully processed
- **MillisBehindLatest**: How far behind the stream tip you are
- **Success/Failure counts**: For each operation type

### Common Configuration Issues

1. **Insufficient retries**: Increase `maxRetries` for unreliable networks
2. **Too frequent checkpointing**: Causes DynamoDB throttling and high costs
3. **Wrong storage level**: `MEMORY_ONLY` can cause job failures if nodes fail
4. **Incorrect region settings**: Cause high latency and potential failures
5. **Missing IAM permissions**: For DynamoDB, CloudWatch, or Kinesis access
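For issue 5, a reasonable starting point is a policy granting stream reads plus the DynamoDB and CloudWatch access the KCL needs. The sketch below is illustrative only: the ARNs are placeholders, and the DynamoDB table name should match the checkpoint application name.

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords",
        "kinesis:ListShards"
      ],
      "Resource": "arn:aws:kinesis:us-west-2:123456789012:stream/my-stream"
    },
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:CreateTable",
        "dynamodb:DescribeTable",
        "dynamodb:GetItem",
        "dynamodb:PutItem",
        "dynamodb:UpdateItem",
        "dynamodb:DeleteItem",
        "dynamodb:Scan"
      ],
      "Resource": "arn:aws:dynamodb:us-west-2:123456789012:table/my-app"
    },
    {
      "Effect": "Allow",
      "Action": "cloudwatch:PutMetricData",
      "Resource": "*"
    }
  ]
}
```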