Assembly JAR providing Apache Spark integration with Apache Kafka 0.10 for reliable distributed streaming data processing
—
Configuration interface for controlling processing rates and other settings on a per-partition basis. This provides fine-grained control over resource usage and processing behavior for different Kafka topic partitions.
Interface for user-supplied configurations that can't be set via Spark properties because they need tweaking on a per-partition basis.
/**
 * Interface for user-supplied configurations that can't be set via Spark
 * properties because they need tweaking on a per-partition basis.
 * Extends Serializable because instances are shipped to executors.
 */
abstract class PerPartitionConfig extends Serializable {
/**
 * Maximum rate (number of records per second) at which data will be read
 * from each Kafka partition.
 */
def maxRatePerPartition(topicPartition: TopicPartition): Long
/**
 * Minimum rate (number of records per second) at which data will be read
 * from each Kafka partition. Default implementation returns 1.
 */
def minRatePerPartition(topicPartition: TopicPartition): Long = 1
}

Abstract Methods:
- maxRatePerPartition(topicPartition): Returns the maximum records per second for the given partition
- minRatePerPartition(topicPartition): Returns the minimum records per second (default: 1)

Spark provides a default implementation that uses global Spark configuration values:
// Internal default implementation (not directly accessible).
// Reads the global Spark properties once at construction and applies the same
// max/min rate to every partition, regardless of topic or partition number.
private class DefaultPerPartitionConfig(conf: SparkConf) extends PerPartitionConfig {
// NOTE(review): the "0" default appears to mean "no per-partition cap" in
// Spark's rate controller — confirm against Spark Streaming docs.
val maxRate = conf.get("spark.streaming.kafka.maxRatePerPartition", "0").toLong
val minRate = conf.get("spark.streaming.kafka.minRatePerPartition", "1").toLong
def maxRatePerPartition(topicPartition: TopicPartition): Long = maxRate
override def minRatePerPartition(topicPartition: TopicPartition): Long = minRate
}

Configure different rates for different topics based on their characteristics:
import org.apache.spark.streaming.kafka010.PerPartitionConfig
import org.apache.kafka.common.TopicPartition
/**
 * Assigns Kafka read rates per topic: each topic name maps to a fixed
 * records-per-second ceiling, with a fallback for unlisted topics.
 */
class TopicBasedConfig extends PerPartitionConfig {
  // Per-topic maximum rates (records/second); unknown topics fall back to 500.
  private val maxRates: Map[String, Long] = Map(
    "high-volume-logs" -> 5000L,  // High rate for log processing
    "user-events"      -> 1000L,  // Medium rate for user events
    "critical-alerts"  -> 100L,   // Low rate for critical processing
    "batch-imports"    -> 10000L  // Very high rate for bulk imports
  )

  def maxRatePerPartition(topicPartition: TopicPartition): Long =
    maxRates.getOrElse(topicPartition.topic(), 500L) // Default rate

  // Critical alerts keep the floor at 1 so they always make progress;
  // every other topic gets a higher minimum of 10.
  override def minRatePerPartition(topicPartition: TopicPartition): Long =
    if (topicPartition.topic() == "critical-alerts") 1L else 10L
}
// Usage: pass the custom PerPartitionConfig as the 4th argument so the
// direct stream applies per-topic rate limits when sizing each batch.
// (ssc, topics and kafkaParams are assumed to be defined elsewhere.)
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams),
new TopicBasedConfig()
)

Balance load based on specific partition characteristics:
/**
 * Rate limits keyed on partition number: lower-numbered partitions (which
 * tend to receive more traffic in some systems) get higher ceilings.
 */
class PartitionLoadBalancedConfig extends PerPartitionConfig {
  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    // Only the partition index drives the rate; the topic is irrelevant here.
    val p = topicPartition.partition()
    if (p == 0) 2000L      // Partition 0 typically gets more traffic in some systems
    else if (p <= 2) 1500L // Partitions 1-2 get medium rate
    else if (p <= 5) 1000L // Partitions 3-5 get standard rate
    else 500L              // Higher partition numbers get lower rate
  }

  // Ensure all partitions maintain a minimum processing rate of 50.
  override def minRatePerPartition(topicPartition: TopicPartition): Long = 50
}

Adjust rates based on time of day or other temporal factors:
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
/**
 * Scales per-topic base rates by the current wall-clock hour so that busier
 * daytime windows read faster than overnight windows.
 *
 * NOTE(review): uses the JVM default time zone via LocalDateTime.now().
 */
class TimeBasedConfig extends PerPartitionConfig {
  // Base records/second for each known topic; anything else gets 800.
  private val baseRates: Map[String, Long] = Map(
    "web-events"      -> 1000L,
    "api-calls"       -> 2000L,
    "background-jobs" -> 500L
  )

  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    val baseRate = baseRates.getOrElse(topicPartition.topic(), 800L)
    val hour = LocalDateTime.now().getHour
    // Business hours (9-17) double the rate, evening (18-22) gets 1.5x,
    // night/early morning runs at the base rate.
    val timeMultiplier =
      if (hour >= 9 && hour <= 17) 2.0
      else if (hour >= 18 && hour <= 22) 1.5
      else 1.0
    (baseRate * timeMultiplier).toLong
  }
}
// Usage: the time-aware config is re-evaluated each time Spark sizes a batch,
// so rates follow the clock without restarting the stream.
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams),
new TimeBasedConfig()
)

Adjust rates based on available cluster resources:
import org.apache.spark.SparkContext
/**
 * Derives per-partition rates from cluster size: a per-core base rate for
 * the topic's workload profile, multiplied by how many cores each partition
 * can expect to get.
 *
 * Fix: removed the unused `executorCount` field — it queried
 * sc.getExecutorMemoryStatus at construction but was never read.
 */
class ResourceAwareConfig(sc: SparkContext) extends PerPartitionConfig {
  // Snapshot of parallelism taken at construction time, not refreshed later.
  private val totalCores = sc.defaultParallelism

  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    val topic = topicPartition.topic()
    // Base rate per core, tuned to the dominant resource of each workload.
    val ratePerCore = topic match {
      case "cpu-intensive"    => 200 // Lower rate for CPU-heavy processing
      case "memory-intensive" => 300 // Medium rate for memory-heavy processing
      case "io-intensive"     => 500 // Higher rate for I/O heavy processing
      case _                  => 400
    }
    // Guarantee at least one core's worth of rate even when partitions
    // outnumber cores (getPartitionCount never returns 0, so no div-by-zero).
    val availableCoresPerPartition = math.max(1, totalCores / getPartitionCount(topic))
    ratePerCore * availableCoresPerPartition
  }

  /** Partition count per topic. This would typically come from metadata or configuration. */
  private def getPartitionCount(topic: String): Int = {
    topic match {
      case "high-partition-topic"   => 50
      case "medium-partition-topic" => 20
      case _                        => 10
    }
  }
}

Implement feedback-based rate adjustment:
import java.util.concurrent.atomic.AtomicLong
import scala.collection.concurrent.TrieMap
/**
 * Feedback-driven rate control: starts each partition at 1000 records/second
 * and nudges the rate down when reported processing latency is high, up when
 * it is low.
 *
 * Fix: a partition with no recorded latency previously read as "latency 0"
 * and ratcheted its rate up 1.2x on every call toward the 10000 cap; the rate
 * now only moves once a real measurement has been reported via updateLatency.
 */
class AdaptiveConfig extends PerPartitionConfig {
  private val partitionLatencies = TrieMap[TopicPartition, AtomicLong]()
  private val partitionRates = TrieMap[TopicPartition, AtomicLong]()

  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    val currentRate = partitionRates.getOrElseUpdate(
      topicPartition,
      new AtomicLong(1000) // Default starting rate
    )
    // Adjust rate based on processing latency, but only once a measurement exists.
    partitionLatencies.get(topicPartition).map(_.get()) match {
      case Some(latency) if latency > 5000 => // High latency
        val reduced = math.max(100, currentRate.get() * 0.8).toLong // Reduce rate
        currentRate.set(reduced)
        reduced
      case Some(latency) if latency < 1000 => // Low latency
        val increased = math.min(10000, currentRate.get() * 1.2).toLong // Increase rate
        currentRate.set(increased)
        increased
      case _ =>
        // In-band latency, or no measurement yet: keep the current rate.
        // NOTE(review): the read-adjust-write above is not atomic; concurrent
        // callers may interleave. Acceptable for a rate hint.
        currentRate.get()
    }
  }

  /** Record the latest processing latency (ms) for a partition; called from processing logic. */
  def updateLatency(topicPartition: TopicPartition, latencyMs: Long): Unit = {
    partitionLatencies.getOrElseUpdate(
      topicPartition,
      new AtomicLong(0)
    ).set(latencyMs)
  }
}
// Usage with latency tracking: the same AdaptiveConfig instance is both
// passed to the stream and fed measurements from the processing loop below.
val adaptiveConfig = new AdaptiveConfig()
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams),
adaptiveConfig
)
stream.foreachRDD { rdd =>
// Wall-clock time for the whole batch; used below as the latency signal.
val startTime = System.currentTimeMillis()
// Process the RDD
rdd.foreach { record =>
processMessage(record)
}
val endTime = System.currentTimeMillis()
val processingTime = endTime - startTime
// Update latency information for rate adjustment.
// NOTE(review): the batch-level time is attributed to every partition in the
// batch, so per-partition latencies are approximations.
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
offsetRanges.foreach { range =>
val topicPartition = new TopicPartition(range.topic, range.partition)
adaptiveConfig.updateLatency(topicPartition, processingTime)
}
}

Per-partition configuration works alongside Spark's backpressure mechanism:
// Configure backpressure in SparkConf. The global max/min below act as
// defaults; a custom PerPartitionConfig (as in BackpressureAwareConfig)
// takes precedence for the streams it is passed to.
val conf = new SparkConf()
.setAppName("KafkaStreamingApp")
.set("spark.streaming.backpressure.enabled", "true")
.set("spark.streaming.backpressure.initialRate", "1000")
.set("spark.streaming.kafka.maxRatePerPartition", "2000") // Global max
.set("spark.streaming.kafka.minRatePerPartition", "100") // Global min
/**
 * Demonstrates that a PerPartitionConfig takes precedence over the global
 * spark.streaming.kafka.* rate settings for the topics it recognizes.
 */
class BackpressureAwareConfig extends PerPartitionConfig {
  // Per-partition config overrides global settings.
  private val topicOverrides: Map[String, Long] = Map(
    "priority-topic"  -> 5000L, // Higher than global max
    "throttled-topic" -> 500L   // Lower than global max
  )

  def maxRatePerPartition(topicPartition: TopicPartition): Long =
    topicOverrides.getOrElse(topicPartition.topic(), 2000L) // Match global max
}

Track per-partition configuration effectiveness:
/**
 * Wraps rate selection with metrics emission so operators can see which
 * rate was handed to each partition.
 */
class MonitoredConfig extends PerPartitionConfig {
  private val metricsCollector = new MetricsCollector()

  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    val rate = calculateRateForPartition(topicPartition)
    // Log rate assignments for monitoring.
    metricsCollector.recordRateAssignment(topicPartition, rate)
    rate
  }

  /** Your rate calculation logic: 1500 for the monitored topic, 1000 otherwise. */
  private def calculateRateForPartition(tp: TopicPartition): Long =
    if (tp.topic() == "monitored-topic") 1500L else 1000L
}
/** Minimal metrics sink; swap println for Prometheus, CloudWatch, etc. */
class MetricsCollector {
  // Send to your metrics system (Prometheus, CloudWatch, etc.)
  def recordRateAssignment(tp: TopicPartition, rate: Long): Unit =
    println(s"Assigned rate $rate to ${tp.topic()}-${tp.partition()}")
}

Start conservative: Begin with lower rates and increase based on monitoring.
Consider downstream capacity: Ensure downstream systems can handle the configured rates.
Monitor resource usage: Track CPU, memory, and network usage to optimize rates.
Topic characteristics matter: Consider message size, processing complexity, and business priority.
Implement gradual changes: Avoid sudden rate changes that could overwhelm the system.
Test under load: Validate your configuration under realistic load conditions.
Document your strategy: Make your rate assignment logic clear for operations teams.
Plan for failures: Ensure minimum rates allow for message processing even under resource constraints.
Per-partition configuration should be resilient to failures:
/**
 * Defensive wrapper: if computing the optimal rate fails, fall back to a
 * safe default instead of failing batch scheduling.
 *
 * Fix: catch only NonFatal throwables (instead of every Exception) so fatal
 * conditions such as OutOfMemoryError or InterruptedException still propagate.
 */
class ResilientConfig extends PerPartitionConfig {
  import scala.util.control.NonFatal

  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    try {
      calculateOptimalRate(topicPartition)
    } catch {
      case NonFatal(ex) =>
        // Log error and return safe default
        println(s"Error calculating rate for $topicPartition: ${ex.getMessage}")
        1000L // Safe default rate
    }
  }

  /**
   * Complex rate calculation that might fail
   * (e.g., network calls to monitoring systems).
   * Placeholder: ??? throws NotImplementedError, which is fatal and is
   * deliberately NOT swallowed by the NonFatal handler above.
   */
  private def calculateOptimalRate(tp: TopicPartition): Long = ???
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-streaming-kafka-0-10-assembly