
# Per-Partition Configuration

Configuration interface for controlling processing rates and other settings on a per-partition basis. This provides fine-grained control over resource usage and processing behavior for different Kafka topic partitions.

## Capabilities

### PerPartitionConfig Abstract Class

Interface for user-supplied configurations that can't be set via Spark properties because they need tweaking on a per-partition basis.

```scala { .api }
abstract class PerPartitionConfig extends Serializable {
  /**
   * Maximum rate (number of records per second) at which data will be read
   * from each Kafka partition.
   */
  def maxRatePerPartition(topicPartition: TopicPartition): Long

  /**
   * Minimum rate (number of records per second) at which data will be read
   * from each Kafka partition. Default implementation returns 1.
   */
  def minRatePerPartition(topicPartition: TopicPartition): Long = 1
}
```

**Methods:**

- `maxRatePerPartition(topicPartition)`: Returns the maximum records per second for the given partition (abstract; must be implemented)
- `minRatePerPartition(topicPartition)`: Returns the minimum records per second (concrete; defaults to 1)

### Default Implementation

Spark provides a default implementation that uses global Spark configuration values:

```scala
// Internal default implementation (not directly accessible)
private class DefaultPerPartitionConfig(conf: SparkConf) extends PerPartitionConfig {
  val maxRate = conf.get("spark.streaming.kafka.maxRatePerPartition", "0").toLong
  val minRate = conf.get("spark.streaming.kafka.minRatePerPartition", "1").toLong

  def maxRatePerPartition(topicPartition: TopicPartition): Long = maxRate
  override def minRatePerPartition(topicPartition: TopicPartition): Long = minRate
}
```
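When no `PerPartitionConfig` is supplied, these global properties drive the defaults: the max rate falls back to `0` (effectively no per-partition limit) and the min rate to `1`. The fallback can be sketched in plain Scala, using a `Map` as a stand-in for `SparkConf` (an illustration only, not the actual Spark code):

```scala
// Stand-in for SparkConf: only maxRatePerPartition is set here,
// so minRatePerPartition falls back to its default of "1".
val sparkProps = Map(
  "spark.streaming.kafka.maxRatePerPartition" -> "2000"
)

val maxRate = sparkProps.getOrElse("spark.streaming.kafka.maxRatePerPartition", "0").toLong
val minRate = sparkProps.getOrElse("spark.streaming.kafka.minRatePerPartition", "1").toLong
// maxRate is 2000; minRate falls back to 1
```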

## Custom Implementations

### Topic-Based Rate Limiting

Configure different rates for different topics based on their characteristics:

```scala
import org.apache.spark.streaming.kafka010.PerPartitionConfig
import org.apache.kafka.common.TopicPartition

class TopicBasedConfig extends PerPartitionConfig {
  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    topicPartition.topic() match {
      case "high-volume-logs" => 5000  // High rate for log processing
      case "user-events"      => 1000  // Medium rate for user events
      case "critical-alerts"  => 100   // Low rate for critical processing
      case "batch-imports"    => 10000 // Very high rate for bulk imports
      case _                  => 500   // Default rate
    }
  }

  override def minRatePerPartition(topicPartition: TopicPartition): Long = {
    topicPartition.topic() match {
      case "critical-alerts" => 1  // Ensure critical alerts always process
      case _                 => 10 // Higher minimum for other topics
    }
  }
}

// Usage
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams),
  new TopicBasedConfig()
)
```

### Partition-Based Load Balancing

Balance load based on specific partition characteristics:

```scala
class PartitionLoadBalancedConfig extends PerPartitionConfig {
  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    val topic = topicPartition.topic()
    val partition = topicPartition.partition()

    (topic, partition) match {
      // Partition 0 typically gets more traffic in some systems
      case (_, 0)           => 2000
      case (_, p) if p <= 2 => 1500 // Partitions 1-2 get medium rate
      case (_, p) if p <= 5 => 1000 // Partitions 3-5 get standard rate
      case _                => 500  // Higher partition numbers get lower rate
    }
  }

  override def minRatePerPartition(topicPartition: TopicPartition): Long = {
    // Ensure all partitions maintain minimum processing
    50
  }
}
```

### Time-Based Rate Configuration

Adjust rates based on time of day or other temporal factors:

```scala
import java.time.LocalDateTime

class TimeBasedConfig extends PerPartitionConfig {
  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    val hour = LocalDateTime.now().getHour
    val topic = topicPartition.topic()

    // Adjust rates based on expected traffic patterns
    val baseRate = topic match {
      case "web-events"      => 1000
      case "api-calls"       => 2000
      case "background-jobs" => 500
      case _                 => 800
    }

    // Scale based on time of day
    val timeMultiplier = hour match {
      case h if h >= 9 && h <= 17  => 2.0 // Business hours: double rate
      case h if h >= 18 && h <= 22 => 1.5 // Evening: 1.5x rate
      case _                       => 1.0 // Night/early morning: normal rate
    }

    (baseRate * timeMultiplier).toLong
  }
}

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams),
  new TimeBasedConfig()
)
```

### Resource-Aware Configuration

Adjust rates based on available cluster resources:

```scala
import org.apache.spark.SparkContext

class ResourceAwareConfig(sc: SparkContext) extends PerPartitionConfig {
  private val totalCores = sc.defaultParallelism

  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    val topic = topicPartition.topic()

    // Base rate per core
    val ratePerCore = topic match {
      case "cpu-intensive"    => 200 // Lower rate for CPU-heavy processing
      case "memory-intensive" => 300 // Medium rate for memory-heavy processing
      case "io-intensive"     => 500 // Higher rate for I/O-heavy processing
      case _                  => 400
    }

    // Scale based on available resources
    val availableCoresPerPartition = math.max(1, totalCores / getPartitionCount(topic))
    ratePerCore * availableCoresPerPartition
  }

  private def getPartitionCount(topic: String): Int = {
    // This would typically come from metadata or configuration
    topic match {
      case "high-partition-topic"   => 50
      case "medium-partition-topic" => 20
      case _                        => 10
    }
  }
}
```

### Dynamic Rate Adjustment

Implement feedback-based rate adjustment:

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.concurrent.TrieMap
import org.apache.spark.streaming.kafka010.HasOffsetRanges

class AdaptiveConfig extends PerPartitionConfig {
  private val partitionLatencies = TrieMap[TopicPartition, AtomicLong]()
  private val partitionRates = TrieMap[TopicPartition, AtomicLong]()

  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    val currentRate = partitionRates.getOrElseUpdate(
      topicPartition,
      new AtomicLong(1000) // Default starting rate
    )

    val latency = partitionLatencies.get(topicPartition)
      .map(_.get()).getOrElse(0L)

    // Adjust rate based on processing latency
    val adjustedRate = if (latency > 5000) { // High latency
      math.max(100, currentRate.get() * 0.8).toLong // Reduce rate
    } else if (latency < 1000) { // Low latency
      math.min(10000, currentRate.get() * 1.2).toLong // Increase rate
    } else {
      currentRate.get() // Keep current rate
    }

    currentRate.set(adjustedRate)
    adjustedRate
  }

  // Method to update latency measurements (called from processing logic)
  def updateLatency(topicPartition: TopicPartition, latencyMs: Long): Unit = {
    partitionLatencies.getOrElseUpdate(
      topicPartition,
      new AtomicLong(0)
    ).set(latencyMs)
  }
}

// Usage with latency tracking
val adaptiveConfig = new AdaptiveConfig()

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams),
  adaptiveConfig
)

stream.foreachRDD { rdd =>
  val startTime = System.currentTimeMillis()

  // Process the RDD (processMessage is application-specific)
  rdd.foreach { record =>
    processMessage(record)
  }

  val endTime = System.currentTimeMillis()
  val processingTime = endTime - startTime

  // Update latency information for rate adjustment
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { range =>
    val topicPartition = new TopicPartition(range.topic, range.partition)
    adaptiveConfig.updateLatency(topicPartition, processingTime)
  }
}
```

## Integration with Backpressure

Per-partition configuration works alongside Spark's backpressure mechanism:

```scala
// Configure backpressure in SparkConf
val conf = new SparkConf()
  .setAppName("KafkaStreamingApp")
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.backpressure.initialRate", "1000")
  .set("spark.streaming.kafka.maxRatePerPartition", "2000") // Global max
  .set("spark.streaming.kafka.minRatePerPartition", "100")  // Global min

class BackpressureAwareConfig extends PerPartitionConfig {
  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    // Per-partition config overrides global settings
    topicPartition.topic() match {
      case "priority-topic"  => 5000 // Higher than global max
      case "throttled-topic" => 500  // Lower than global max
      case _                 => 2000 // Match global max
    }
  }
}
```

## Monitoring and Metrics

Track per-partition configuration effectiveness:

```scala
class MonitoredConfig extends PerPartitionConfig {
  private val metricsCollector = new MetricsCollector()

  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    val rate = calculateRateForPartition(topicPartition)

    // Log rate assignments for monitoring
    metricsCollector.recordRateAssignment(topicPartition, rate)

    rate
  }

  private def calculateRateForPartition(tp: TopicPartition): Long = {
    // Your rate calculation logic
    tp.topic() match {
      case "monitored-topic" => 1500
      case _                 => 1000
    }
  }
}

class MetricsCollector {
  def recordRateAssignment(tp: TopicPartition, rate: Long): Unit = {
    // Send to your metrics system (Prometheus, CloudWatch, etc.)
    println(s"Assigned rate $rate to ${tp.topic()}-${tp.partition()}")
  }
}
```

## Best Practices

1. **Start conservative**: Begin with lower rates and increase based on monitoring.
2. **Consider downstream capacity**: Ensure downstream systems can handle the configured rates.
3. **Monitor resource usage**: Track CPU, memory, and network usage to optimize rates.
4. **Topic characteristics matter**: Consider message size, processing complexity, and business priority.
5. **Implement gradual changes**: Avoid sudden rate changes that could overwhelm the system.
6. **Test under load**: Validate your configuration under realistic load conditions.
7. **Document your strategy**: Make your rate assignment logic clear for operations teams.
8. **Plan for failures**: Ensure minimum rates allow for message processing even under resource constraints.
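The "gradual changes" practice can be sketched as a small helper that caps each rate adjustment at a fixed percentage of the current rate (a hypothetical helper, not part of the Spark API); a custom `PerPartitionConfig` could route its computed target rates through it:

```scala
// Hypothetical helper for gradual rate changes: move toward a target rate
// while capping each step at maxStepPct of the current rate.
object RateSmoothing {
  def step(current: Long, target: Long, maxStepPct: Double = 0.2): Long = {
    // Never step by less than 1 record/sec, so small rates can still move
    val maxDelta = math.max(1L, (current * maxStepPct).toLong)
    if (target > current) math.min(target, current + maxDelta)
    else math.max(target, current - maxDelta)
  }
}

// A jump from 1000 to 5000 records/sec is applied incrementally:
// step(1000, 5000) == 1200, step(1200, 5000) == 1440, ...
```

Calling `step` once per batch converges toward the target without the sudden jumps that could overwhelm downstream systems.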

## Error Handling

Per-partition configuration should be resilient to failures:

```scala
class ResilientConfig extends PerPartitionConfig {
  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    try {
      calculateOptimalRate(topicPartition)
    } catch {
      case ex: Exception =>
        // Log error and return safe default
        println(s"Error calculating rate for $topicPartition: ${ex.getMessage}")
        1000L // Safe default rate
    }
  }

  private def calculateOptimalRate(tp: TopicPartition): Long = {
    // Complex rate calculation that might fail
    // (e.g., network calls to monitoring systems)
    ???
  }
}
```