CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-spark--spark-streaming-kafka-0-10-assembly

Assembly JAR providing Apache Spark integration with Apache Kafka 0.10 for reliable distributed streaming data processing

Pending
Overview
Eval results
Files

per-partition-config.mddocs/

Per-Partition Configuration

Configuration interface for controlling processing rates and other settings on a per-partition basis. This provides fine-grained control over resource usage and processing behavior for different Kafka topic partitions.

Capabilities

PerPartitionConfig Abstract Class

Interface for user-supplied configurations that can't be set via Spark properties because they need tweaking on a per-partition basis.

abstract class PerPartitionConfig extends Serializable {
  /**
   * Maximum rate (number of records per second) at which data will be read
   * from each Kafka partition.
   */
  def maxRatePerPartition(topicPartition: TopicPartition): Long
  
  /**
   * Minimum rate (number of records per second) at which data will be read
   * from each Kafka partition. Default implementation returns 1.
   */
  def minRatePerPartition(topicPartition: TopicPartition): Long = 1
}

Abstract Methods:

  • maxRatePerPartition(topicPartition): Returns the maximum records per second for the given partition
  • minRatePerPartition(topicPartition): Returns the minimum records per second (default: 1)

Default Implementation

Spark provides a default implementation that uses global Spark configuration values:

// Internal default implementation (not directly accessible)
private class DefaultPerPartitionConfig(conf: SparkConf) extends PerPartitionConfig {
  val maxRate = conf.get("spark.streaming.kafka.maxRatePerPartition", "0").toLong
  val minRate = conf.get("spark.streaming.kafka.minRatePerPartition", "1").toLong
  
  def maxRatePerPartition(topicPartition: TopicPartition): Long = maxRate
  override def minRatePerPartition(topicPartition: TopicPartition): Long = minRate
}

Custom Implementations

Topic-Based Rate Limiting

Configure different rates for different topics based on their characteristics:

import org.apache.spark.streaming.kafka010.PerPartitionConfig
import org.apache.kafka.common.TopicPartition

class TopicBasedConfig extends PerPartitionConfig {
  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    topicPartition.topic() match {
      case "high-volume-logs" => 5000      // High rate for log processing
      case "user-events" => 1000           // Medium rate for user events  
      case "critical-alerts" => 100        // Low rate for critical processing
      case "batch-imports" => 10000        // Very high rate for bulk imports
      case _ => 500                        // Default rate
    }
  }
  
  override def minRatePerPartition(topicPartition: TopicPartition): Long = {
    topicPartition.topic() match {
      case "critical-alerts" => 1          // Ensure critical alerts always process
      case _ => 10                         // Higher minimum for other topics
    }
  }
}

// Usage
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams),
  new TopicBasedConfig()
)

Partition-Based Load Balancing

Balance load based on specific partition characteristics:

class PartitionLoadBalancedConfig extends PerPartitionConfig {
  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    val topic = topicPartition.topic()
    val partition = topicPartition.partition()
    
    (topic, partition) match {
      // Partition 0 typically gets more traffic in some systems
      case (_, 0) => 2000
      case (_, p) if p <= 2 => 1500      // Partitions 1-2 get medium rate
      case (_, p) if p <= 5 => 1000      // Partitions 3-5 get standard rate  
      case _ => 500                      // Higher partition numbers get lower rate
    }
  }
  
  override def minRatePerPartition(topicPartition: TopicPartition): Long = {
    // Ensure all partitions maintain minimum processing
    50
  }
}

Time-Based Rate Configuration

Adjust rates based on time of day or other temporal factors:

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

class TimeBasedConfig extends PerPartitionConfig {
  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    val hour = LocalDateTime.now().getHour
    val topic = topicPartition.topic()
    
    // Adjust rates based on expected traffic patterns
    val baseRate = topic match {
      case "web-events" => 1000
      case "api-calls" => 2000
      case "background-jobs" => 500
      case _ => 800
    }
    
    // Scale based on time of day
    val timeMultiplier = hour match {
      case h if h >= 9 && h <= 17 => 2.0   // Business hours: double rate
      case h if h >= 18 && h <= 22 => 1.5  // Evening: 1.5x rate
      case _ => 1.0                        // Night/early morning: normal rate
    }
    
    (baseRate * timeMultiplier).toLong
  }
}

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams),
  new TimeBasedConfig()
)

Resource-Aware Configuration

Adjust rates based on available cluster resources:

import org.apache.spark.SparkContext

class ResourceAwareConfig(sc: SparkContext) extends PerPartitionConfig {
  private val executorCount = sc.getExecutorMemoryStatus.size
  private val totalCores = sc.defaultParallelism
  
  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    val topic = topicPartition.topic()
    
    // Base rate per core
    val ratePerCore = topic match {
      case "cpu-intensive" => 200    // Lower rate for CPU-heavy processing
      case "memory-intensive" => 300 // Medium rate for memory-heavy processing  
      case "io-intensive" => 500     // Higher rate for I/O heavy processing
      case _ => 400
    }
    
    // Scale based on available resources
    val availableCoresPerPartition = math.max(1, totalCores / getPartitionCount(topic))
    ratePerCore * availableCoresPerPartition
  }
  
  private def getPartitionCount(topic: String): Int = {
    // This would typically come from metadata or configuration
    topic match {
      case "high-partition-topic" => 50
      case "medium-partition-topic" => 20
      case _ => 10
    }
  }
}

Dynamic Rate Adjustment

Implement feedback-based rate adjustment:

import java.util.concurrent.atomic.AtomicLong
import scala.collection.concurrent.TrieMap

class AdaptiveConfig extends PerPartitionConfig {
  private val partitionLatencies = TrieMap[TopicPartition, AtomicLong]()
  private val partitionRates = TrieMap[TopicPartition, AtomicLong]()
  
  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    val currentRate = partitionRates.getOrElseUpdate(
      topicPartition, 
      new AtomicLong(1000) // Default starting rate
    )
    
    val latency = partitionLatencies.get(topicPartition)
      .map(_.get()).getOrElse(0L)
    
    // Adjust rate based on processing latency
    val adjustedRate = if (latency > 5000) { // High latency
      math.max(100, currentRate.get() * 0.8).toLong  // Reduce rate
    } else if (latency < 1000) { // Low latency
      math.min(10000, currentRate.get() * 1.2).toLong // Increase rate
    } else {
      currentRate.get() // Keep current rate
    }
    
    currentRate.set(adjustedRate)
    adjustedRate
  }
  
  // Method to update latency measurements (called from processing logic)
  def updateLatency(topicPartition: TopicPartition, latencyMs: Long): Unit = {
    partitionLatencies.getOrElseUpdate(
      topicPartition, 
      new AtomicLong(0)
    ).set(latencyMs)
  }
}

// Usage with latency tracking
val adaptiveConfig = new AdaptiveConfig()

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams),
  adaptiveConfig
)

stream.foreachRDD { rdd =>
  val startTime = System.currentTimeMillis()
  
  // Process the RDD
  rdd.foreach { record =>
    processMessage(record)
  }
  
  val endTime = System.currentTimeMillis()
  val processingTime = endTime - startTime
  
  // Update latency information for rate adjustment
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { range =>
    val topicPartition = new TopicPartition(range.topic, range.partition)
    adaptiveConfig.updateLatency(topicPartition, processingTime)
  }
}

Integration with Backpressure

Per-partition configuration works alongside Spark's backpressure mechanism:

// Configure backpressure in SparkConf
val conf = new SparkConf()
  .setAppName("KafkaStreamingApp")
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.backpressure.initialRate", "1000")
  .set("spark.streaming.kafka.maxRatePerPartition", "2000") // Global max
  .set("spark.streaming.kafka.minRatePerPartition", "100")  // Global min

class BackpressureAwareConfig extends PerPartitionConfig {
  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    // Per-partition config overrides global settings
    topicPartition.topic() match {
      case "priority-topic" => 5000     // Higher than global max
      case "throttled-topic" => 500     // Lower than global max
      case _ => 2000                    // Match global max
    }
  }
}

Monitoring and Metrics

Track per-partition configuration effectiveness:

class MonitoredConfig extends PerPartitionConfig {
  private val metricsCollector = new MetricsCollector()
  
  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    val rate = calculateRateForPartition(topicPartition)
    
    // Log rate assignments for monitoring
    metricsCollector.recordRateAssignment(topicPartition, rate)
    
    rate
  }
  
  private def calculateRateForPartition(tp: TopicPartition): Long = {
    // Your rate calculation logic
    tp.topic() match {
      case "monitored-topic" => 1500
      case _ => 1000
    }
  }
}

class MetricsCollector {
  def recordRateAssignment(tp: TopicPartition, rate: Long): Unit = {
    // Send to your metrics system (Prometheus, CloudWatch, etc.)
    println(s"Assigned rate $rate to ${tp.topic()}-${tp.partition()}")
  }
}

Best Practices

  1. Start conservative: Begin with lower rates and increase based on monitoring.

  2. Consider downstream capacity: Ensure downstream systems can handle the configured rates.

  3. Monitor resource usage: Track CPU, memory, and network usage to optimize rates.

  4. Topic characteristics matter: Consider message size, processing complexity, and business priority.

  5. Implement gradual changes: Avoid sudden rate changes that could overwhelm the system.

  6. Test under load: Validate your configuration under realistic load conditions.

  7. Document your strategy: Make your rate assignment logic clear for operations teams.

  8. Plan for failures: Ensure minimum rates allow for message processing even under resource constraints.

Error Handling

Per-partition configuration should be resilient to failures:

class ResilientConfig extends PerPartitionConfig {
  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    try {
      calculateOptimalRate(topicPartition)
    } catch {
      case ex: Exception =>
        // Log error and return safe default
        println(s"Error calculating rate for $topicPartition: ${ex.getMessage}")
        1000L // Safe default rate
    }
  }
  
  private def calculateOptimalRate(tp: TopicPartition): Long = {
    // Complex rate calculation that might fail
    // (e.g., network calls to monitoring systems)
    ???
  }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-streaming-kafka-0-10-assembly

docs

consumer-strategies.md

index.md

location-strategies.md

offset-management.md

per-partition-config.md

stream-creation.md

tile.json