# Configuration

Advanced configuration options for controlling retry behavior, timeouts, and performance tuning of Kinesis stream processing.

## Capabilities

### KinesisReadConfigurations

Configuration case class for controlling Kinesis request retry behavior and timeout settings.

```scala { .api }
/**
 * Configuration for Kinesis reading behavior during record recovery and processing
 * @param maxRetries Maximum number of retry attempts for failed Kinesis requests
 * @param retryWaitTimeMs Wait time in milliseconds between retry attempts
 * @param retryTimeoutMs Timeout in milliseconds for individual Kinesis requests
 */
case class KinesisReadConfigurations(
    maxRetries: Int,
    retryWaitTimeMs: Long,
    retryTimeoutMs: Long
)
```
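To see how the three fields interact: the worst-case time spent sleeping between retries is roughly `maxRetries * retryWaitTimeMs`, which should stay below `retryTimeoutMs`. A minimal sketch, re-declaring the case class locally for illustration (the `worstCaseRetryDelayMs` helper is our own addition, not part of the library):

```scala
// Local re-declaration for illustration; mirrors the documented fields.
case class KinesisReadConfigurations(
    maxRetries: Int,
    retryWaitTimeMs: Long,
    retryTimeoutMs: Long
) {
  // Approximate time spent sleeping between attempts if every retry fails.
  def worstCaseRetryDelayMs: Long = maxRetries * retryWaitTimeMs
}

val cfg = KinesisReadConfigurations(maxRetries = 3, retryWaitTimeMs = 100, retryTimeoutMs = 10000)
// cfg.worstCaseRetryDelayMs == 300, comfortably below retryTimeoutMs
```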

### Configuration Factory

Factory methods for creating KinesisReadConfigurations with default or context-specific values.

```scala { .api }
object KinesisReadConfigurations {
  /**
   * Creates configuration with system defaults
   * @return KinesisReadConfigurations with default retry and timeout values
   */
  def apply(): KinesisReadConfigurations

  /**
   * Creates configuration using StreamingContext settings and batch duration
   * @param ssc StreamingContext to extract configuration from
   * @return KinesisReadConfigurations with context-specific timeout settings
   */
  def apply(ssc: StreamingContext): KinesisReadConfigurations
}
```

**Usage Examples:**

```scala
import org.apache.spark.streaming.kinesis.KinesisReadConfigurations
import org.apache.spark.streaming.StreamingContext

// Use default configuration
val defaultConfig = KinesisReadConfigurations()
// maxRetries = 3, retryWaitTimeMs = 100, retryTimeoutMs = 10000

// Use configuration derived from an existing StreamingContext `ssc`
val contextConfig = KinesisReadConfigurations(ssc)
// Uses batch duration for retryTimeoutMs, respects SparkConf overrides

// Custom configuration
val customConfig = KinesisReadConfigurations(
  maxRetries = 5,
  retryWaitTimeMs = 200,
  retryTimeoutMs = 30000
)
```

### Configuration Constants

Spark configuration keys and default values for Kinesis retry behavior.

```scala { .api }
object KinesisReadConfigurations {
  /** SparkConf key for configuring maximum retry attempts */
  val RETRY_MAX_ATTEMPTS_KEY: String = "spark.streaming.kinesis.retry.maxAttempts"

  /** SparkConf key for configuring retry wait time */
  val RETRY_WAIT_TIME_KEY: String = "spark.streaming.kinesis.retry.waitTime"

  /** Default maximum number of retry attempts */
  val DEFAULT_MAX_RETRIES: Int = 3

  /** Default wait time between retries */
  val DEFAULT_RETRY_WAIT_TIME: String = "100ms"

  /** Default timeout for Kinesis requests in milliseconds */
  val DEFAULT_RETRY_TIMEOUT: Long = 10000
}
```
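The keys and defaults above suggest how values are resolved from configuration. The following is a hedged sketch of that lookup over a plain `Map` (the `ResolveKinesisConfig` object and the simplified duration parsing are illustrative assumptions, not the library's actual implementation):

```scala
object ResolveKinesisConfig {
  val RETRY_MAX_ATTEMPTS_KEY = "spark.streaming.kinesis.retry.maxAttempts"
  val RETRY_WAIT_TIME_KEY = "spark.streaming.kinesis.retry.waitTime"
  val DEFAULT_MAX_RETRIES = 3
  val DEFAULT_RETRY_WAIT_TIME_MS = 100L

  // Simplified stand-in for Spark's time-string parsing ("200ms", "1s").
  def parseMs(s: String): Long =
    if (s.endsWith("ms")) s.dropRight(2).trim.toLong
    else if (s.endsWith("s")) s.dropRight(1).trim.toLong * 1000
    else s.trim.toLong

  // Resolve (maxRetries, retryWaitTimeMs, retryTimeoutMs) from config entries,
  // defaulting the timeout to the batch duration.
  def resolve(conf: Map[String, String], batchDurationMs: Long): (Int, Long, Long) = {
    val maxRetries = conf.get(RETRY_MAX_ATTEMPTS_KEY).map(_.toInt).getOrElse(DEFAULT_MAX_RETRIES)
    val waitMs = conf.get(RETRY_WAIT_TIME_KEY).map(parseMs).getOrElse(DEFAULT_RETRY_WAIT_TIME_MS)
    (maxRetries, waitMs, batchDurationMs)
  }
}
```

With no overrides set, this yields the documented defaults of 3 retries and a 100ms wait, with the timeout tracking the batch duration.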

### Spark Configuration Integration

Configure retry behavior through SparkConf settings that override the default values.

**SparkConf Configuration:**

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisReadConfigurations

// Configure through SparkConf
val conf = new SparkConf()
  .setAppName("KinesisStreamingApp")
  .set("spark.streaming.kinesis.retry.maxAttempts", "5")
  .set("spark.streaming.kinesis.retry.waitTime", "200ms")

val ssc = new StreamingContext(conf, Seconds(10))

// Configuration will use the SparkConf values
val config = KinesisReadConfigurations(ssc)
// maxRetries = 5, retryWaitTimeMs = 200, retryTimeoutMs = 10000 (batch duration)
```

**Configuration File Example (spark-defaults.conf):**

```properties
spark.streaming.kinesis.retry.maxAttempts=5
spark.streaming.kinesis.retry.waitTime=250ms
```

### Configuration Usage Context

KinesisReadConfigurations is used internally by KinesisBackedBlockRDD for data recovery scenarios:

```scala
// Internal usage - configurations are applied automatically
// when using the KinesisInputDStream.builder pattern

val stream = KinesisInputDStream.builder
  .streamingContext(ssc) // ssc contains SparkConf with retry settings
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .build()

// The resulting stream will use KinesisReadConfigurations(ssc) internally
// for any data recovery operations
```

### Performance Tuning Guidelines

**Retry Configuration:**

- **Low-latency applications**: Use fewer retries (1-2) with shorter wait times (50-100ms)
- **High-reliability applications**: Use more retries (5-10) with longer wait times
- **High-throughput applications**: Balance retries to avoid cascading delays

**Timeout Configuration:**

- **Batch duration relationship**: Set `retryTimeoutMs` below the batch duration to prevent blocking
- **Network latency**: Account for network latency to AWS endpoints
- **Shard count**: Higher shard counts may need longer timeouts

**Example Configurations by Use Case:**

```scala
// Low-latency real-time processing
val lowLatencyConfig = KinesisReadConfigurations(
  maxRetries = 2,
  retryWaitTimeMs = 50,
  retryTimeoutMs = 5000
)

// High-reliability batch processing
val highReliabilityConfig = KinesisReadConfigurations(
  maxRetries = 10,
  retryWaitTimeMs = 500,
  retryTimeoutMs = 60000
)

// High-throughput streaming
val highThroughputConfig = KinesisReadConfigurations(
  maxRetries = 3,
  retryWaitTimeMs = 100,
  retryTimeoutMs = 15000
)
```
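The batch-duration guideline above can also be enforced programmatically; a minimal sketch, using a `TimeoutCheck.validate` helper of our own (not part of the library):

```scala
object TimeoutCheck {
  // Returns an error message when retryTimeoutMs would block past the batch duration.
  def validate(retryTimeoutMs: Long, batchDurationMs: Long): Either[String, Long] =
    if (retryTimeoutMs < batchDurationMs) Right(retryTimeoutMs)
    else Left(s"retryTimeoutMs ($retryTimeoutMs) should be below the batch duration ($batchDurationMs)")
}
```

Running this check once at startup, before building the stream, surfaces misconfiguration earlier than a blocked batch would.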

### Monitoring and Observability

Configure retry behavior based on observed failure patterns:

```scala
import org.apache.spark.SparkConf

// Raise retry limits and enable detailed metrics
val conf = new SparkConf()
  .set("spark.streaming.kinesis.retry.maxAttempts", "5")
  .set("spark.streaming.kinesis.retry.waitTime", "200ms")
  // CloudWatch sink class is illustrative; requires a sink implementation on the classpath
  .set("spark.metrics.conf.*.sink.cloudwatch.class", "org.apache.spark.metrics.sink.CloudWatchSink")
  .set("spark.sql.streaming.metricsEnabled", "true")

// Monitor retry attempts and adjust configuration accordingly
```

### Error Scenarios and Recovery

KinesisReadConfigurations helps handle various failure scenarios:

**Network Issues:**
- Transient network failures between Spark and Kinesis
- DNS resolution problems
- Connection timeouts

**Service Throttling:**
- Kinesis API rate limiting
- DynamoDB checkpoint table throttling
- AWS service quotas

**Data Recovery:**
- Block recovery after executor failures
- Sequence number range reconstruction
- Checkpoint inconsistency resolution
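Conceptually, `maxRetries` and `retryWaitTimeMs` drive a retry loop around each Kinesis request when such failures occur. A simplified sketch of that loop (illustrative only, not the library's actual implementation):

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object RetrySketch {
  // Attempt the request, sleeping retryWaitTimeMs between failed attempts,
  // allowing up to maxRetries retries before rethrowing the last failure.
  def withRetries[T](maxRetries: Int, retryWaitTimeMs: Long)(request: () => T): T = {
    @tailrec
    def loop(attempt: Int): T = Try(request()) match {
      case Success(result) => result
      case Failure(_) if attempt < maxRetries =>
        Thread.sleep(retryWaitTimeMs)
        loop(attempt + 1)
      case Failure(e) => throw e
    }
    loop(0)
  }
}
```

Under this model, a transient throttling error is retried up to `maxRetries` times before surfacing to the application.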

### Integration with Other Components

**Relationship to Checkpointing:**

```scala
// Retry configurations affect checkpoint recovery reliability
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .checkpointInterval(Seconds(30)) // Checkpoint frequency
  .build()

// If checkpointInterval is 30s, ensure retryTimeoutMs is well below 30000
// to prevent checkpoint delays
```

**Relationship to Storage Levels:**

```scala
import org.apache.spark.storage.StorageLevel

// Higher-reliability storage levels benefit from more retries
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .storageLevel(StorageLevel.MEMORY_AND_DISK_SER_2) // Serialized + replicated
  .build()

// Configure higher retry counts for replicated storage to ensure consistency
```

### Advanced Configuration Patterns

**Environment-specific Configuration:**

```scala
import org.apache.spark.SparkConf

def createSparkConf(environment: String): SparkConf = {
  val conf = new SparkConf().setAppName("KinesisApp")

  environment match {
    case "development" =>
      conf.set("spark.streaming.kinesis.retry.maxAttempts", "2")
        .set("spark.streaming.kinesis.retry.waitTime", "100ms")
    case "staging" =>
      conf.set("spark.streaming.kinesis.retry.maxAttempts", "3")
        .set("spark.streaming.kinesis.retry.waitTime", "150ms")
    case "production" =>
      conf.set("spark.streaming.kinesis.retry.maxAttempts", "5")
        .set("spark.streaming.kinesis.retry.waitTime", "200ms")
    case _ =>
      conf // fall back to defaults for unknown environments
  }
}
```

**Dynamic Configuration Updates:**

While KinesisReadConfigurations is set at stream creation time, applications can implement configuration refresh patterns:

```scala
// Configuration refresh requires a full restart, since the retry settings
// are read from the SparkConf when the stream is created.
def recreateStreamWithNewConfig(ssc: StreamingContext, newMaxRetries: Int): Unit = {
  val newConf = ssc.sparkContext.getConf
    .set("spark.streaming.kinesis.retry.maxAttempts", newMaxRetries.toString)

  // Stop the old contexts (SparkContext included, so the new conf takes effect)
  ssc.stop(stopSparkContext = true, stopGracefully = true)

  // Create a new StreamingContext with the updated configuration
  val newSsc = new StreamingContext(newConf, Seconds(10))
  // Recreate the stream with the new configuration...
}
```