# Configuration and Performance Tuning

The configuration system provides advanced options for optimizing Kinesis stream consumption, including retry logic, timeouts, storage levels, CloudWatch metrics, and checkpointing intervals. These settings are crucial for production deployments that require high performance and reliability.

## Core Configuration API

### KinesisReadConfigurations

Controls retry behavior and timeouts for Kinesis API calls, which is particularly important for handling transient network issues and rate limiting.

```scala { .api }
case class KinesisReadConfigurations(
  maxRetries: Int,       // Maximum number of retry attempts for Kinesis API calls
  retryWaitTimeMs: Long, // Wait time between retry attempts in milliseconds
  retryTimeoutMs: Long   // Total timeout for Kinesis operations in milliseconds
)

object KinesisReadConfigurations {
  // Create with default values
  def apply(): KinesisReadConfigurations

  // Create with values from StreamingContext configuration
  def apply(ssc: StreamingContext): KinesisReadConfigurations

  // Configuration keys for SparkConf
  val RETRY_MAX_ATTEMPTS_KEY: String = "spark.streaming.kinesis.retry.maxAttempts"
  val RETRY_WAIT_TIME_KEY: String = "spark.streaming.kinesis.retry.waitTime"

  // Default values
  val DEFAULT_MAX_RETRIES: Int = 3
  val DEFAULT_RETRY_WAIT_TIME: String = "100ms"
  val DEFAULT_RETRY_TIMEOUT: Long = 10000 // 10 seconds
}
```
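These three settings combine into a bounded retry loop: give up once either the attempt budget or the overall deadline is exhausted. The sketch below is illustrative only, not the connector's actual internal code; it simply models how the three fields interact:

```scala
// Illustrative sketch only: a retry loop honoring maxRetries,
// retryWaitTimeMs, and retryTimeoutMs in the way the settings suggest.
// The connector's real retry logic is internal and may differ in detail.
def retryWithConfig[T](maxRetries: Int,
                       retryWaitTimeMs: Long,
                       retryTimeoutMs: Long)(op: () => T): T = {
  val deadline = System.currentTimeMillis() + retryTimeoutMs
  var attempt = 0
  var result: Option[T] = None
  while (result.isEmpty) {
    try {
      result = Some(op())
    } catch {
      case e: Exception =>
        attempt += 1
        // Stop when the retry budget or the overall deadline is spent
        if (attempt > maxRetries ||
            System.currentTimeMillis() + retryWaitTimeMs > deadline) throw e
        Thread.sleep(retryWaitTimeMs)
    }
  }
  result.get
}

// Succeeds on the third attempt, within a budget of 3 retries
var calls = 0
val outcome = retryWithConfig(maxRetries = 3, retryWaitTimeMs = 10L, retryTimeoutMs = 5000L) { () =>
  calls += 1
  if (calls < 3) throw new RuntimeException("transient error") else "ok"
}
// outcome == "ok", calls == 3
```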

### Builder Configuration Methods

Additional configuration options available through the KinesisInputDStream.Builder:

```scala { .api }
class Builder {
  // Performance and reliability
  def checkpointInterval(interval: Duration): Builder
  def storageLevel(storageLevel: StorageLevel): Builder

  // CloudWatch metrics
  def metricsLevel(metricsLevel: MetricsLevel): Builder
  def metricsEnabledDimensions(dimensions: Set[String]): Builder

  // AWS service endpoints and regions
  def endpointUrl(url: String): Builder
  def regionName(regionName: String): Builder
}
```

## Retry Configuration

### Using Default Configuration

```scala
import org.apache.spark.streaming.kinesis.KinesisReadConfigurations

// Uses the default values: 3 retries, 100ms wait, 10s timeout
val defaultConfig = KinesisReadConfigurations()

// The builder likewise falls back to these defaults when nothing is configured
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .build()
```

### Using StreamingContext Configuration

Configure the retry settings through SparkConf and let the system read them from the StreamingContext:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisReadConfigurations

val conf = new SparkConf()
  .setAppName("KinesisApp")
  .set("spark.streaming.kinesis.retry.maxAttempts", "5")
  .set("spark.streaming.kinesis.retry.waitTime", "200ms")

val ssc = new StreamingContext(conf, Seconds(10))

// This will pick up the SparkConf values set above
val configFromContext = KinesisReadConfigurations(ssc)

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .build()
```

### Custom Retry Configuration

```scala
import org.apache.spark.streaming.kinesis.KinesisReadConfigurations

val customConfig = KinesisReadConfigurations(
  maxRetries = 5,          // Retry up to 5 times
  retryWaitTimeMs = 500L,  // Wait 500ms between retries
  retryTimeoutMs = 30000L  // 30 second total timeout
)

// Note: KinesisReadConfigurations is used internally by KinesisBackedBlockRDD.
// The main builder uses default configurations, but you can influence them
// through SparkConf settings.
```

## Storage Level Configuration

Configure how DStream blocks are stored in memory and on disk, trading off fault tolerance against performance.

```scala
import org.apache.spark.storage.StorageLevel

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .storageLevel(StorageLevel.MEMORY_ONLY_2) // Store in memory with 2x replication
  .build()
```

### Storage Level Options

```scala
// Memory only (fastest, but not fault tolerant to node failures)
.storageLevel(StorageLevel.MEMORY_ONLY)
.storageLevel(StorageLevel.MEMORY_ONLY_2) // With replication

// Memory and disk (balanced performance and fault tolerance)
.storageLevel(StorageLevel.MEMORY_AND_DISK)
.storageLevel(StorageLevel.MEMORY_AND_DISK_2) // Default, with replication

// Memory with serialization (more memory efficient)
.storageLevel(StorageLevel.MEMORY_ONLY_SER)
.storageLevel(StorageLevel.MEMORY_ONLY_SER_2)

// Disk only (most fault tolerant, slowest)
.storageLevel(StorageLevel.DISK_ONLY)
.storageLevel(StorageLevel.DISK_ONLY_2)
```

## Checkpoint Interval Configuration

Controls how frequently the KCL checkpoints its progress to DynamoDB. This affects both fault tolerance and performance.

```scala
import org.apache.spark.streaming.Seconds

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .checkpointInterval(Seconds(30)) // Checkpoint every 30 seconds
  .build()
```

### Checkpoint Interval Guidelines

- **Shorter intervals** (5-10 seconds): Better fault tolerance and less data loss on failure, but higher DynamoDB costs
- **Longer intervals** (30-60 seconds): Lower DynamoDB costs, but more potential data loss on failure
- **Default**: Uses the streaming batch duration (recommended starting point)
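To make the cost side of this tradeoff concrete, here is a rough back-of-envelope helper (hypothetical shard count, not part of the connector API) estimating DynamoDB checkpoint writes per hour:

```scala
// Rough estimate: one DynamoDB write per shard per checkpoint interval.
// This ignores KCL lease heartbeats and lease-takeover writes, so treat
// the result as a lower bound.
def checkpointWritesPerHour(numShards: Int, intervalSeconds: Int): Int =
  numShards * (3600 / intervalSeconds)

val frequent = checkpointWritesPerHour(numShards = 8, intervalSeconds = 10) // 2880 writes/hour
val relaxed  = checkpointWritesPerHour(numShards = 8, intervalSeconds = 60) // 480 writes/hour
```

At provisioned-capacity DynamoDB pricing, the sixfold difference between these two intervals translates directly into table write capacity.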

## CloudWatch Metrics Configuration

Configure the collection and reporting of metrics to AWS CloudWatch for monitoring and alerting.

```scala
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .metricsLevel(MetricsLevel.SUMMARY)
  .metricsEnabledDimensions(Set("Operation", "ShardId", "WorkerId"))
  .build()
```

### Metrics Levels

```scala
// No metrics (best performance)
.metricsLevel(MetricsLevel.NONE)

// Summary metrics only (recommended for production)
.metricsLevel(MetricsLevel.SUMMARY)

// Detailed metrics (useful for debugging, higher overhead)
.metricsLevel(MetricsLevel.DETAILED)
```

### Common Metrics Dimensions

```scala
val productionDimensions = Set(
  "Operation", // Type of operation (ProcessRecords, Checkpoint, etc.)
  "ShardId",   // Kinesis shard identifier
  "WorkerId"   // KCL worker identifier
)

val debugDimensions = Set(
  "Operation",
  "ShardId",
  "WorkerId",
  "StreamName" // Additional dimension for debugging
)
```

## Regional and Endpoint Configuration

Configure AWS service endpoints for different regions or custom endpoints.

```scala
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .regionName("us-west-2")
  .endpointUrl("https://kinesis.us-west-2.amazonaws.com")
  .build()
```

### Region Configuration Best Practices

- **Match your compute region**: Use the same region as your Spark cluster to minimize latency
- **Consider data locality**: Place processing near your data sources and sinks
- **Compliance requirements**: Some regions may be required for regulatory compliance
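Note that when only `regionName` is set, the client resolves the region's standard endpoint on its own, so an explicit `endpointUrl` is mainly useful for VPC endpoints or local test stacks. For the commercial AWS partition, the public endpoints follow a predictable pattern (a simplification: GovCloud and China partitions use different domain suffixes):

```scala
// Standard public Kinesis endpoint pattern for the commercial AWS
// partition; other partitions (GovCloud, China) use different suffixes.
def kinesisEndpoint(region: String): String =
  s"https://kinesis.$region.amazonaws.com"

val endpoint = kinesisEndpoint("us-west-2")
// Matches the explicit endpointUrl shown in the example above
```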

## Complete Production Configuration Example

```scala
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions, SparkAWSCredentials}
import org.apache.spark.storage.StorageLevel
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
import org.apache.spark.SparkConf

// Configure Spark with Kinesis-specific settings
val conf = new SparkConf()
  .setAppName("ProductionKinesisApp")
  .set("spark.streaming.kinesis.retry.maxAttempts", "5")
  .set("spark.streaming.kinesis.retry.waitTime", "250ms")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val ssc = new StreamingContext(conf, Seconds(10))

// Configure credentials with an assumed role
val credentials = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/ProductionKinesisRole", "prod-session")
  .build()

// Create a production-ready stream
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("production-data-stream")
  .checkpointAppName("production-consumer-v2")
  .regionName("us-west-2")
  .initialPosition(new KinesisInitialPositions.Latest())
  .checkpointInterval(Seconds(30))
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
  .kinesisCredentials(credentials)
  .dynamoDBCredentials(credentials)
  .cloudWatchCredentials(credentials)
  .metricsLevel(MetricsLevel.SUMMARY)
  .metricsEnabledDimensions(Set("Operation", "ShardId"))
  .build()
```

## Performance Tuning Guidelines

### For High Throughput

```scala
val highThroughputStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("high-volume-stream")
  .checkpointAppName("high-throughput-consumer")
  .storageLevel(StorageLevel.MEMORY_ONLY_2) // Fastest storage
  .checkpointInterval(Seconds(60))          // Less frequent checkpointing
  .metricsLevel(MetricsLevel.NONE)          // Disable metrics overhead
  .build()
```

### For Maximum Reliability

```scala
val reliableStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("critical-data-stream")
  .checkpointAppName("reliable-consumer")
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2) // Fault-tolerant storage
  .checkpointInterval(Seconds(10))              // Frequent checkpointing
  .metricsLevel(MetricsLevel.DETAILED)          // Full monitoring
  .build()
```

### For Cost Optimization

```scala
val costOptimizedStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("batch-processing-stream")
  .checkpointAppName("cost-optimized-consumer")
  .storageLevel(StorageLevel.DISK_ONLY) // Cheapest storage
  .checkpointInterval(Seconds(300))     // Minimize DynamoDB writes
  .metricsLevel(MetricsLevel.SUMMARY)   // Basic monitoring only
  .build()
```

## Monitoring and Troubleshooting

### Key SparkConf Settings for Debugging

```scala
val debugConf = new SparkConf()
  .set("spark.streaming.kinesis.retry.maxAttempts", "10")
  .set("spark.streaming.kinesis.retry.waitTime", "1s")
  .set("spark.sql.adaptive.enabled", "false")
  .set("spark.sql.adaptive.coalescePartitions.enabled", "false")
```

### CloudWatch Metrics to Monitor

- **IncomingRecords**: Records received by the Kinesis stream
- **RecordsProcessed**: Records successfully processed by the KCL
- **MillisBehindLatest**: How far behind the stream tip the consumer is
- **Success/Failure counts**: For each operation type
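`MillisBehindLatest` is usually the first metric to alarm on. A small helper (hypothetical, purely for dashboard or alert formatting) that turns the raw millisecond reading into a readable lag:

```scala
// Convert a raw MillisBehindLatest reading into a human-readable lag,
// handy when choosing CloudWatch alarm thresholds.
def lagDescription(millisBehindLatest: Long): String = {
  val seconds = millisBehindLatest / 1000
  if (seconds < 60) s"${seconds}s behind"
  else s"${seconds / 60}m ${seconds % 60}s behind"
}

val lag = lagDescription(125000L) // "2m 5s behind"
```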

### Common Configuration Issues

1. **Insufficient retries**: Increase `maxRetries` for unreliable networks
2. **Too frequent checkpointing**: Causes DynamoDB throttling and high costs
3. **Wrong storage level**: Unreplicated `MEMORY_ONLY` can lose received blocks and fail jobs if a node dies
4. **Incorrect region settings**: Causes high latency and potential failures
5. **Missing IAM permissions**: For DynamoDB, CloudWatch, or Kinesis access
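Several of these issues can be caught before deployment. A hypothetical sanity-check helper (not part of the connector API, just encoding the guidance from this section) for the retry settings:

```scala
// Hypothetical pre-flight checks for the retry settings; not part of
// the connector API, just encoding the guidance from this section.
def validateRetryConfig(maxRetries: Int,
                        retryWaitTimeMs: Long,
                        retryTimeoutMs: Long): Seq[String] = {
  val warnings = Seq.newBuilder[String]
  if (maxRetries.toLong * retryWaitTimeMs > retryTimeoutMs)
    warnings += s"worst-case retry wait (${maxRetries * retryWaitTimeMs}ms) exceeds retryTimeoutMs (${retryTimeoutMs}ms)"
  if (retryWaitTimeMs < 50L)
    warnings += "retryWaitTimeMs under 50ms may hammer a rate-limited API"
  warnings.result()
}

val ok  = validateRetryConfig(maxRetries = 3, retryWaitTimeMs = 100L, retryTimeoutMs = 10000L)
val bad = validateRetryConfig(maxRetries = 5, retryWaitTimeMs = 5000L, retryTimeoutMs = 10000L)
// ok is empty; bad contains one warning about the timeout budget
```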