# Configuration

Advanced configuration options for controlling retry behavior, timeouts, and performance tuning of Kinesis stream processing.

## Capabilities

### KinesisReadConfigurations

Configuration case class for controlling Kinesis request retry behavior and timeout settings.

```scala { .api }
/**
 * Configuration for Kinesis reading behavior during record recovery and processing
 * @param maxRetries Maximum number of retry attempts for failed Kinesis requests
 * @param retryWaitTimeMs Wait time in milliseconds between retry attempts
 * @param retryTimeoutMs Timeout in milliseconds for individual Kinesis requests
 */
case class KinesisReadConfigurations(
    maxRetries: Int,
    retryWaitTimeMs: Long,
    retryTimeoutMs: Long)
```

### Configuration Factory

Factory methods for creating KinesisReadConfigurations with default or context-specific values.

```scala { .api }
object KinesisReadConfigurations {
  /**
   * Creates configuration with system defaults
   * @return KinesisReadConfigurations with default retry and timeout values
   */
  def apply(): KinesisReadConfigurations

  /**
   * Creates configuration using StreamingContext settings and batch duration
   * @param ssc StreamingContext to extract configuration from
   * @return KinesisReadConfigurations with context-specific timeout settings
   */
  def apply(ssc: StreamingContext): KinesisReadConfigurations
}
```
44
45
**Usage Examples:**
46
47
```scala
48
import org.apache.spark.streaming.kinesis.KinesisReadConfigurations
49
import org.apache.spark.streaming.StreamingContext
50
51
// Use default configuration
52
val defaultConfig = KinesisReadConfigurations()
53
// maxRetries = 3, retryWaitTimeMs = 100, retryTimeoutMs = 10000
54
55
// Use configuration derived from StreamingContext
56
val contextConfig = KinesisReadConfigurations(ssc)
57
// Uses batch duration for retryTimeoutMs, respects SparkConf overrides
58
59
// Custom configuration
60
val customConfig = KinesisReadConfigurations(
61
maxRetries = 5,
62
retryWaitTimeMs = 200,
63
retryTimeoutMs = 30000
64
)
65
```

### Configuration Constants

Spark configuration keys and default values for Kinesis retry behavior.

```scala { .api }
object KinesisReadConfigurations {
  /** SparkConf key for configuring maximum retry attempts */
  val RETRY_MAX_ATTEMPTS_KEY: String = "spark.streaming.kinesis.retry.maxAttempts"

  /** SparkConf key for configuring retry wait time */
  val RETRY_WAIT_TIME_KEY: String = "spark.streaming.kinesis.retry.waitTime"

  /** Default maximum number of retry attempts */
  val DEFAULT_MAX_RETRIES: Int = 3

  /** Default wait time between retries */
  val DEFAULT_RETRY_WAIT_TIME: String = "100ms"

  /** Default timeout for Kinesis requests in milliseconds */
  val DEFAULT_RETRY_TIMEOUT: Long = 10000
}
```
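
Note that the wait time is expressed as a time string (`"100ms"`) rather than a raw number. The sketch below shows, in plain Scala, how such strings can be interpreted as milliseconds; `parseTimeString` is a hypothetical helper for illustration, not part of the Spark API.

```scala
// Illustrative sketch: interpret time strings such as "100ms" or "2s"
// (and bare numbers, treated as milliseconds) as a Long millisecond value.
// This mirrors the spirit of how spark.streaming.kinesis.retry.waitTime
// values are written; it is NOT Spark's actual parser.
object TimeStringParser {
  def parseTimeString(s: String): Long = {
    val t = s.trim.toLowerCase
    if (t.endsWith("ms")) t.dropRight(2).trim.toLong
    else if (t.endsWith("s")) t.dropRight(1).trim.toLong * 1000L
    else t.toLong // bare number: already milliseconds
  }
}
```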

### Spark Configuration Integration

Configure retry behavior through SparkConf settings that override the default values.

**SparkConf Configuration:**

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisReadConfigurations

// Configure through SparkConf
val conf = new SparkConf()
  .setAppName("KinesisStreamingApp")
  .set("spark.streaming.kinesis.retry.maxAttempts", "5")
  .set("spark.streaming.kinesis.retry.waitTime", "200ms")

val ssc = new StreamingContext(conf, Seconds(10))

// Configuration will use SparkConf values
val config = KinesisReadConfigurations(ssc)
// maxRetries = 5, retryWaitTimeMs = 200, retryTimeoutMs = 10000 (batch duration)
```

**Configuration File Example (spark-defaults.conf):**

```properties
spark.streaming.kinesis.retry.maxAttempts=5
spark.streaming.kinesis.retry.waitTime=250ms
```
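
The override precedence (explicit settings win, library defaults fill the gaps) can be sketched in plain Scala with a `Map` standing in for SparkConf; the key names match the real configuration keys, but `RetrySettingsResolver` itself is illustrative, not a Spark class.

```scala
// Sketch of override precedence: values set via SparkConf or
// spark-defaults.conf take priority over the library defaults.
// A plain Map stands in for SparkConf here.
object RetrySettingsResolver {
  val DefaultMaxRetries = 3
  val DefaultWaitTime = "100ms"

  def resolve(conf: Map[String, String]): (Int, String) = {
    val maxRetries = conf.get("spark.streaming.kinesis.retry.maxAttempts")
      .map(_.toInt)
      .getOrElse(DefaultMaxRetries)
    val waitTime =
      conf.getOrElse("spark.streaming.kinesis.retry.waitTime", DefaultWaitTime)
    (maxRetries, waitTime)
  }
}
```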

### Configuration Usage Context

KinesisReadConfigurations is used internally by KinesisBackedBlockRDD for data recovery scenarios:

```scala
// Internal usage - configurations are applied automatically
// when using the KinesisInputDStream.builder pattern

val stream = KinesisInputDStream.builder
  .streamingContext(ssc) // ssc carries the SparkConf with retry settings
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .build()

// The resulting stream uses KinesisReadConfigurations(ssc) internally
// for any data recovery operations
```

### Performance Tuning Guidelines

**Retry Configuration:**

- **Low-latency applications**: Use fewer retries (1-2) with shorter wait times (50-100ms)
- **High-reliability applications**: Use more retries (5-10) with longer wait times
- **High-throughput applications**: Balance retries to avoid cascading delays

**Timeout Configuration:**

- **Batch duration relationship**: Set `retryTimeoutMs` below the batch duration to prevent blocking
- **Network latency**: Account for network latency to AWS endpoints
- **Shard count**: Higher shard counts may need longer timeouts
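
The batch-duration rule can be sanity-checked with simple arithmetic: in the worst case, a read can consume roughly the per-request timeout plus the retry wait for every attempt, and that budget should stay below the batch duration. The helper below is a rough illustration under that assumption, not part of any Spark API.

```scala
// Rough worst-case retry budget: each attempt may consume up to the request
// timeout plus the wait before the next attempt. Illustrative approximation
// only; real timing depends on when within the timeout a request fails.
object RetryBudget {
  def worstCaseMs(maxRetries: Int, retryWaitTimeMs: Long, retryTimeoutMs: Long): Long =
    maxRetries * (retryTimeoutMs + retryWaitTimeMs)

  def fitsInBatch(maxRetries: Int, waitMs: Long, timeoutMs: Long, batchMs: Long): Boolean =
    worstCaseMs(maxRetries, waitMs, timeoutMs) < batchMs
}
```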

**Example Configurations by Use Case:**

```scala
// Low-latency real-time processing
val lowLatencyConfig = KinesisReadConfigurations(
  maxRetries = 2,
  retryWaitTimeMs = 50,
  retryTimeoutMs = 5000)

// High-reliability batch processing
val highReliabilityConfig = KinesisReadConfigurations(
  maxRetries = 10,
  retryWaitTimeMs = 500,
  retryTimeoutMs = 60000)

// High-throughput streaming
val highThroughputConfig = KinesisReadConfigurations(
  maxRetries = 3,
  retryWaitTimeMs = 100,
  retryTimeoutMs = 15000)
```

### Monitoring and Observability

Configure retry behavior based on observed failure patterns:

```scala
import org.apache.spark.SparkConf

// Tune retry settings and route Spark metrics to an external sink
val conf = new SparkConf()
  .set("spark.streaming.kinesis.retry.maxAttempts", "5")
  .set("spark.streaming.kinesis.retry.waitTime", "200ms")
  // Example: a third-party CloudWatch metrics sink (not bundled with Spark);
  // substitute any org.apache.spark.metrics.sink implementation on the classpath
  .set("spark.metrics.conf.*.sink.cloudwatch.class",
    "org.apache.spark.metrics.sink.CloudWatchSink")

// Monitor retry attempts in the logs and metrics, and adjust configuration accordingly
```

### Error Scenarios and Recovery

KinesisReadConfigurations helps handle various failure scenarios:

**Network Issues:**

- Transient network failures between Spark and Kinesis
- DNS resolution problems
- Connection timeouts

**Service Throttling:**

- Kinesis API rate limiting
- DynamoDB checkpoint table throttling
- AWS service quotas

**Data Recovery:**

- Block recovery after executor failures
- Sequence number range reconstruction
- Checkpoint inconsistency resolution
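
All of these scenarios are handled by a bounded retry loop driven by settings like `maxRetries` and `retryWaitTimeMs`. The sketch below shows the general shape of such a loop in plain Scala; it is illustrative only and is not Spark's internal implementation.

```scala
import scala.util.{Failure, Success, Try}

// Illustrative bounded-retry loop in the spirit of how maxRetries and
// retryWaitTimeMs drive recovery; NOT Spark's internal code.
object RetryLoop {
  @annotation.tailrec
  def withRetries[T](maxRetries: Int, waitMs: Long)(op: () => T): T =
    Try(op()) match {
      case Success(v) => v
      case Failure(_) if maxRetries > 0 =>
        Thread.sleep(waitMs) // back off before the next attempt
        withRetries(maxRetries - 1, waitMs)(op)
      case Failure(e) => throw e // retries exhausted: surface the error
    }
}
```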
### Integration with Other Components
215
216
**Relationship to Checkpointing:**
217
218
```scala
219
// Retry configurations affect checkpoint recovery reliability
220
val stream = KinesisInputDStream.builder
221
.streamingContext(ssc)
222
.streamName("my-stream")
223
.checkpointAppName("my-app")
224
.checkpointInterval(Seconds(30)) // Checkpoint frequency
225
.build()
226
227
// If checkpointInterval is 30s, ensure retryTimeoutMs << 30000
228
// to prevent checkpoint delays
229
```

**Relationship to Storage Levels:**

```scala
import org.apache.spark.storage.StorageLevel

// Higher reliability storage levels benefit from more retries
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .storageLevel(StorageLevel.MEMORY_AND_DISK_SER_2) // Serialized + replicated
  .build()

// Configure higher retry counts for replicated storage to ensure consistency
```

### Advanced Configuration Patterns

**Environment-specific Configuration:**

```scala
import org.apache.spark.SparkConf

def createSparkConf(environment: String): SparkConf = {
  val conf = new SparkConf().setAppName("KinesisApp")

  environment match {
    case "development" =>
      conf.set("spark.streaming.kinesis.retry.maxAttempts", "2")
        .set("spark.streaming.kinesis.retry.waitTime", "100ms")
    case "staging" =>
      conf.set("spark.streaming.kinesis.retry.maxAttempts", "3")
        .set("spark.streaming.kinesis.retry.waitTime", "150ms")
    case "production" =>
      conf.set("spark.streaming.kinesis.retry.maxAttempts", "5")
        .set("spark.streaming.kinesis.retry.waitTime", "200ms")
    case _ =>
      conf // unknown environment: fall back to the library defaults
  }
}
```

**Dynamic Configuration Updates:**

KinesisReadConfigurations is fixed at stream creation time, so applications that need to change retry settings must implement a restart-based refresh pattern:

```scala
// Configuration refresh requires a restart: the retry settings are read from
// the SparkConf of the underlying SparkContext when the stream is created,
// so the SparkContext must be recreated for new values to take effect
def recreateStreamWithNewConfig(ssc: StreamingContext, newMaxRetries: Int): StreamingContext = {
  // getConf returns a copy, so it is safe to modify
  val newConf = ssc.sparkContext.getConf
    .set("spark.streaming.kinesis.retry.maxAttempts", newMaxRetries.toString)

  // Stop both the streaming context and the SparkContext
  ssc.stop(stopSparkContext = true, stopGracefully = true)

  val newSsc = new StreamingContext(newConf, Seconds(10))
  // Recreate the stream against newSsc with the new configuration...
  newSsc
}
```