# Per-Partition Configuration

Configuration interface for controlling processing rates and other settings on a per-partition basis. This provides fine-grained control over resource usage and processing behavior for different Kafka topic partitions.

## Capabilities

### PerPartitionConfig Abstract Class

Interface for user-supplied configurations that can't be set via Spark properties because they need tweaking on a per-partition basis.
```scala { .api }
abstract class PerPartitionConfig extends Serializable {
  /**
   * Maximum rate (number of records per second) at which data will be read
   * from each Kafka partition.
   */
  def maxRatePerPartition(topicPartition: TopicPartition): Long

  /**
   * Minimum rate (number of records per second) at which data will be read
   * from each Kafka partition. Default implementation returns 1.
   */
  def minRatePerPartition(topicPartition: TopicPartition): Long = 1
}
```
**Methods:**

- `maxRatePerPartition(topicPartition)`: Returns the maximum records per second for the given partition (abstract; must be implemented)
- `minRatePerPartition(topicPartition)`: Returns the minimum records per second (concrete; defaults to 1)
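A minimal concrete subclass only needs to implement `maxRatePerPartition`. The sketch below (a hypothetical `FixedRateConfig`, not part of the Spark API) applies one fixed cap to every partition:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.PerPartitionConfig

// Hypothetical example: a single fixed cap for every partition.
class FixedRateConfig(maxRate: Long) extends PerPartitionConfig {
  // The only abstract method; every partition gets the same cap.
  def maxRatePerPartition(topicPartition: TopicPartition): Long = maxRate
  // minRatePerPartition is inherited and defaults to 1.
}
```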
### Default Implementation

Spark provides a default implementation that uses global Spark configuration values:
```scala
// Internal default implementation (not directly accessible)
private class DefaultPerPartitionConfig(conf: SparkConf) extends PerPartitionConfig {
  val maxRate = conf.get("spark.streaming.kafka.maxRatePerPartition", "0").toLong
  val minRate = conf.get("spark.streaming.kafka.minRatePerPartition", "1").toLong

  def maxRatePerPartition(topicPartition: TopicPartition): Long = maxRate
  override def minRatePerPartition(topicPartition: TopicPartition): Long = minRate
}
```
## Custom Implementations

### Topic-Based Rate Limiting

Configure different rates for different topics based on their characteristics:
```scala
52
import org.apache.spark.streaming.kafka010.PerPartitionConfig
53
import org.apache.kafka.common.TopicPartition
54
55
class TopicBasedConfig extends PerPartitionConfig {
56
def maxRatePerPartition(topicPartition: TopicPartition): Long = {
57
topicPartition.topic() match {
58
case "high-volume-logs" => 5000 // High rate for log processing
59
case "user-events" => 1000 // Medium rate for user events
60
case "critical-alerts" => 100 // Low rate for critical processing
61
case "batch-imports" => 10000 // Very high rate for bulk imports
62
case _ => 500 // Default rate
63
}
64
}
65
66
override def minRatePerPartition(topicPartition: TopicPartition): Long = {
67
topicPartition.topic() match {
68
case "critical-alerts" => 1 // Ensure critical alerts always process
69
case _ => 10 // Higher minimum for other topics
70
}
71
}
72
}
73
74
// Usage
75
val stream = KafkaUtils.createDirectStream[String, String](
76
ssc,
77
LocationStrategies.PreferConsistent,
78
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams),
79
new TopicBasedConfig()
80
)
81
```
### Partition-Based Load Balancing

Balance load based on specific partition characteristics:
```scala
class PartitionLoadBalancedConfig extends PerPartitionConfig {
  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    val topic = topicPartition.topic()
    val partition = topicPartition.partition()

    (topic, partition) match {
      // Partition 0 typically gets more traffic in some systems
      case (_, 0)           => 2000
      case (_, p) if p <= 2 => 1500 // Partitions 1-2 get medium rate
      case (_, p) if p <= 5 => 1000 // Partitions 3-5 get standard rate
      case _                => 500  // Higher partition numbers get lower rate
    }
  }

  override def minRatePerPartition(topicPartition: TopicPartition): Long = {
    // Ensure all partitions maintain minimum processing
    50
  }
}
```
### Time-Based Rate Configuration

Adjust rates based on time of day or other temporal factors:
```scala
import java.time.LocalDateTime

class TimeBasedConfig extends PerPartitionConfig {
  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    val hour = LocalDateTime.now().getHour
    val topic = topicPartition.topic()

    // Adjust rates based on expected traffic patterns
    val baseRate = topic match {
      case "web-events"      => 1000
      case "api-calls"       => 2000
      case "background-jobs" => 500
      case _                 => 800
    }

    // Scale based on time of day
    val timeMultiplier = hour match {
      case h if h >= 9 && h <= 17  => 2.0 // Business hours: double rate
      case h if h >= 18 && h <= 22 => 1.5 // Evening: 1.5x rate
      case _                       => 1.0 // Night/early morning: normal rate
    }

    (baseRate * timeMultiplier).toLong
  }
}

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams),
  new TimeBasedConfig()
)
```
### Resource-Aware Configuration

Adjust rates based on available cluster resources:
```scala
import org.apache.spark.SparkContext

class ResourceAwareConfig(sc: SparkContext) extends PerPartitionConfig {
  private val executorCount = sc.getExecutorMemoryStatus.size
  private val totalCores = sc.defaultParallelism

  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    val topic = topicPartition.topic()

    // Base rate per core
    val ratePerCore = topic match {
      case "cpu-intensive"    => 200 // Lower rate for CPU-heavy processing
      case "memory-intensive" => 300 // Medium rate for memory-heavy processing
      case "io-intensive"     => 500 // Higher rate for I/O-heavy processing
      case _                  => 400
    }

    // Scale based on available resources
    val availableCoresPerPartition = math.max(1, totalCores / getPartitionCount(topic))
    ratePerCore * availableCoresPerPartition
  }

  private def getPartitionCount(topic: String): Int = {
    // This would typically come from metadata or configuration
    topic match {
      case "high-partition-topic"   => 50
      case "medium-partition-topic" => 20
      case _                        => 10
    }
  }
}
```
### Dynamic Rate Adjustment

Implement feedback-based rate adjustment:
```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.concurrent.TrieMap

class AdaptiveConfig extends PerPartitionConfig {
  private val partitionLatencies = TrieMap[TopicPartition, AtomicLong]()
  private val partitionRates = TrieMap[TopicPartition, AtomicLong]()

  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    val currentRate = partitionRates.getOrElseUpdate(
      topicPartition,
      new AtomicLong(1000) // Default starting rate
    )

    val latency = partitionLatencies.get(topicPartition)
      .map(_.get()).getOrElse(0L)

    // Adjust rate based on processing latency
    val adjustedRate = if (latency > 5000) { // High latency
      math.max(100, currentRate.get() * 0.8).toLong // Reduce rate
    } else if (latency < 1000) { // Low latency
      math.min(10000, currentRate.get() * 1.2).toLong // Increase rate
    } else {
      currentRate.get() // Keep current rate
    }

    currentRate.set(adjustedRate)
    adjustedRate
  }

  // Update latency measurements (called from processing logic on the driver)
  def updateLatency(topicPartition: TopicPartition, latencyMs: Long): Unit = {
    partitionLatencies.getOrElseUpdate(
      topicPartition,
      new AtomicLong(0)
    ).set(latencyMs)
  }
}

// Usage with latency tracking
val adaptiveConfig = new AdaptiveConfig()

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams),
  adaptiveConfig
)

stream.foreachRDD { rdd =>
  val startTime = System.currentTimeMillis()

  // Process the RDD (processMessage is your own handler)
  rdd.foreach { record =>
    processMessage(record)
  }

  val endTime = System.currentTimeMillis()
  val processingTime = endTime - startTime

  // Update latency information for rate adjustment
  // (HasOffsetRanges lives in org.apache.spark.streaming.kafka010)
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { range =>
    val topicPartition = new TopicPartition(range.topic, range.partition)
    adaptiveConfig.updateLatency(topicPartition, processingTime)
  }
}
```
## Integration with Backpressure

Per-partition configuration works alongside Spark's backpressure mechanism: the rate that backpressure estimates for each partition is capped at `maxRatePerPartition` and floored at `minRatePerPartition`:
```scala
// Configure backpressure in SparkConf
val conf = new SparkConf()
  .setAppName("KafkaStreamingApp")
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.backpressure.initialRate", "1000")
  .set("spark.streaming.kafka.maxRatePerPartition", "2000") // Global max
  .set("spark.streaming.kafka.minRatePerPartition", "100")  // Global min

class BackpressureAwareConfig extends PerPartitionConfig {
  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    // Per-partition config overrides global settings
    topicPartition.topic() match {
      case "priority-topic"  => 5000 // Higher than global max
      case "throttled-topic" => 500  // Lower than global max
      case _                 => 2000 // Match global max
    }
  }
}
```
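To wire the two together, the `SparkConf` above feeds the `StreamingContext`, while the custom config is passed to `createDirectStream`. A sketch, assuming `topics` and `kafkaParams` are defined as in the earlier examples:

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

// Assumes `conf`, `topics`, and `kafkaParams` from the surrounding examples.
val ssc = new StreamingContext(conf, Seconds(5))

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams),
  new BackpressureAwareConfig() // takes precedence over the global kafka.* settings
)
```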
## Monitoring and Metrics

Track per-partition configuration effectiveness:
```scala
class MonitoredConfig extends PerPartitionConfig {
  private val metricsCollector = new MetricsCollector()

  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    val rate = calculateRateForPartition(topicPartition)

    // Log rate assignments for monitoring
    metricsCollector.recordRateAssignment(topicPartition, rate)

    rate
  }

  private def calculateRateForPartition(tp: TopicPartition): Long = {
    // Your rate calculation logic
    tp.topic() match {
      case "monitored-topic" => 1500
      case _                 => 1000
    }
  }
}

// Serializable because it is a field of a PerPartitionConfig,
// which itself extends Serializable
class MetricsCollector extends Serializable {
  def recordRateAssignment(tp: TopicPartition, rate: Long): Unit = {
    // Send to your metrics system (Prometheus, CloudWatch, etc.)
    println(s"Assigned rate $rate to ${tp.topic()}-${tp.partition()}")
  }
}
```
## Best Practices

1. **Start conservative**: Begin with lower rates and increase based on monitoring.

2. **Consider downstream capacity**: Ensure downstream systems can handle the configured rates.

3. **Monitor resource usage**: Track CPU, memory, and network usage to optimize rates.

4. **Topic characteristics matter**: Consider message size, processing complexity, and business priority.

5. **Implement gradual changes**: Avoid sudden rate changes that could overwhelm the system.

6. **Test under load**: Validate your configuration under realistic load conditions.

7. **Document your strategy**: Make your rate assignment logic clear for operations teams.

8. **Plan for failures**: Ensure minimum rates allow for message processing even under resource constraints.
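As an illustration of practices 1 and 8, a hypothetical `BoundedConfig` (not part of the API) can clamp whatever rate logic you use between a conservative ceiling and a guaranteed floor:

```scala
// Hypothetical sketch: clamp any rate logic between a safe floor and ceiling.
class BoundedConfig(floor: Long = 10, ceiling: Long = 1000) extends PerPartitionConfig {
  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    val desired = desiredRate(topicPartition)
    math.min(ceiling, math.max(floor, desired)) // Start conservative
  }

  override def minRatePerPartition(topicPartition: TopicPartition): Long =
    floor // Plan for failures: always allow some progress

  // Placeholder for your own rate logic.
  private def desiredRate(tp: TopicPartition): Long = 500
}
```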
## Error Handling

Per-partition configuration should be resilient to failures:
```scala
class ResilientConfig extends PerPartitionConfig {
  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    try {
      calculateOptimalRate(topicPartition)
    } catch {
      case ex: Exception =>
        // Log error and return safe default
        println(s"Error calculating rate for $topicPartition: ${ex.getMessage}")
        1000L // Safe default rate
    }
  }

  private def calculateOptimalRate(tp: TopicPartition): Long = {
    // Complex rate calculation that might fail
    // (e.g., network calls to monitoring systems)
    ???
  }
}
```