Assembly JAR providing Apache Spark integration with Apache Kafka 0.10 for reliable distributed streaming data processing
Configuration strategies for creating and managing Kafka consumers with different subscription patterns. Consumer strategies encapsulate the complex setup required for Kafka 0.10 consumers and allow the configuration to be checkpointed with Spark Streaming applications.
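Because the strategy (and its Kafka parameters) can be checkpointed, the whole stream setup can be recovered on restart with the standard StreamingContext.getOrCreate pattern. A minimal sketch, assuming a hypothetical app name and checkpoint directory:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("kafka-strategies-demo") // hypothetical app name
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint("/tmp/kafka-checkpoint") // hypothetical checkpoint directory
  // ... build the Kafka stream here using one of the strategies below ...
  ssc
}
// Rebuilds the context (and its consumer strategy) only when no checkpoint exists;
// otherwise both are recovered from the checkpoint
val ssc = StreamingContext.getOrCreate("/tmp/kafka-checkpoint", createContext _)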
Subscribe to a collection of specific topics. This is the most common pattern for consuming from known topics.
// Scala versions
def Subscribe[K, V](
topics: Iterable[java.lang.String],
kafkaParams: collection.Map[String, Object]
): ConsumerStrategy[K, V]
def Subscribe[K, V](
topics: Iterable[java.lang.String],
kafkaParams: collection.Map[String, Object],
offsets: collection.Map[TopicPartition, Long]
): ConsumerStrategy[K, V]
// Java versions
def Subscribe[K, V](
topics: java.util.Collection[java.lang.String],
kafkaParams: java.util.Map[String, Object]
): ConsumerStrategy[K, V]
def Subscribe[K, V](
topics: java.util.Collection[java.lang.String],
kafkaParams: java.util.Map[String, Object],
offsets: java.util.Map[TopicPartition, java.lang.Long]
): ConsumerStrategy[K, V]
Parameters:
- topics: Collection of topic names to subscribe to
- kafkaParams: Kafka configuration parameters (must include "bootstrap.servers")
- offsets: Optional starting offsets for specific partitions
Usage Example (Scala):
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.TopicPartition
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "my-consumer-group",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("orders", "payments", "inventory")
// Subscribe without specific starting offsets
val strategy1 = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
// Subscribe with specific starting offsets
val startingOffsets = Map[TopicPartition, Long](
new TopicPartition("orders", 0) -> 1000L,
new TopicPartition("orders", 1) -> 2000L,
new TopicPartition("payments", 0) -> 500L
)
val strategy2 = ConsumerStrategies.Subscribe[String, String](
topics,
kafkaParams,
startingOffsets
)
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
strategy1
)
Usage Example (Java):
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.ConsumerStrategy;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "my-consumer-group");
kafkaParams.put("auto.offset.reset", "latest");
Collection<String> topics = Arrays.asList("orders", "payments", "inventory");
ConsumerStrategy<String, String> strategy =
ConsumerStrategies.Subscribe(topics, kafkaParams);
JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
strategy
);
Subscribe to all topics matching a specified regular expression pattern. Useful for consuming from dynamically created topics that follow a naming convention.
// Scala versions
def SubscribePattern[K, V](
pattern: java.util.regex.Pattern,
kafkaParams: collection.Map[String, Object]
): ConsumerStrategy[K, V]
def SubscribePattern[K, V](
pattern: java.util.regex.Pattern,
kafkaParams: collection.Map[String, Object],
offsets: collection.Map[TopicPartition, Long]
): ConsumerStrategy[K, V]
// Java versions
def SubscribePattern[K, V](
pattern: java.util.regex.Pattern,
kafkaParams: java.util.Map[String, Object]
): ConsumerStrategy[K, V]
def SubscribePattern[K, V](
pattern: java.util.regex.Pattern,
kafkaParams: java.util.Map[String, Object],
offsets: java.util.Map[TopicPartition, java.lang.Long]
): ConsumerStrategy[K, V]
Parameters:
- pattern: Regular expression pattern to match topic names
- kafkaParams: Kafka configuration parameters
- offsets: Optional starting offsets for specific partitions
Usage Example (Scala):
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import java.util.regex.Pattern
// Subscribe to all topics starting with "user-events-"
val topicPattern = Pattern.compile("user-events-.*")
val strategy = ConsumerStrategies.SubscribePattern[String, String](
topicPattern,
kafkaParams
)
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
strategy
)
// This will automatically consume from topics like:
// - user-events-clicks
// - user-events-purchases
// - user-events-registrations
// as they are created
Usage Example (Java):
import java.util.regex.Pattern;
Pattern topicPattern = Pattern.compile("log-.*-\\d{4}-\\d{2}-\\d{2}");
ConsumerStrategy<String, String> strategy =
ConsumerStrategies.SubscribePattern(topicPattern, kafkaParams);
Dynamic Topic Discovery:
The pattern is re-evaluated periodically against the topics that exist at the time of each check, so new topics matching the pattern are automatically included in subsequent micro-batches.
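The three-argument SubscribePattern overload shown above can also seed starting offsets for partitions of topics that already match the pattern. A brief sketch reusing kafkaParams from earlier; the topic name and offset value are hypothetical:
import org.apache.spark.streaming.kafka010.ConsumerStrategies
import org.apache.kafka.common.TopicPartition
import java.util.regex.Pattern
val pattern = Pattern.compile("user-events-.*")
// Start an already-existing matching partition at a known offset;
// partitions without a supplied offset fall back to auto.offset.reset
val patternOffsets = Map[TopicPartition, Long](
  new TopicPartition("user-events-clicks", 0) -> 100L
)
val strategy = ConsumerStrategies.SubscribePattern[String, String](
  pattern,
  kafkaParams,
  patternOffsets
)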
Assign a fixed collection of specific TopicPartitions. This gives you complete control over which partitions are consumed, useful for advanced use cases requiring precise partition assignment.
// Scala versions
def Assign[K, V](
topicPartitions: Iterable[TopicPartition],
kafkaParams: collection.Map[String, Object]
): ConsumerStrategy[K, V]
def Assign[K, V](
topicPartitions: Iterable[TopicPartition],
kafkaParams: collection.Map[String, Object],
offsets: collection.Map[TopicPartition, Long]
): ConsumerStrategy[K, V]
// Java versions
def Assign[K, V](
topicPartitions: java.util.Collection[TopicPartition],
kafkaParams: java.util.Map[String, Object]
): ConsumerStrategy[K, V]
def Assign[K, V](
topicPartitions: java.util.Collection[TopicPartition],
kafkaParams: java.util.Map[String, Object],
offsets: java.util.Map[TopicPartition, java.lang.Long]
): ConsumerStrategy[K, V]
Parameters:
- topicPartitions: Specific collection of TopicPartitions to consume from
- kafkaParams: Kafka configuration parameters
- offsets: Optional starting offsets for the assigned partitions
Usage Example (Scala):
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.kafka.common.TopicPartition
// Assign specific partitions for precise control
val assignedPartitions = Array(
new TopicPartition("high-priority", 0),
new TopicPartition("high-priority", 1),
new TopicPartition("medium-priority", 0),
new TopicPartition("low-priority", 2)
)
val strategy = ConsumerStrategies.Assign[String, String](
assignedPartitions,
kafkaParams
)
// With specific starting offsets
val partitionOffsets = Map[TopicPartition, Long](
new TopicPartition("high-priority", 0) -> 10000L,
new TopicPartition("high-priority", 1) -> 15000L,
new TopicPartition("medium-priority", 0) -> 5000L,
new TopicPartition("low-priority", 2) -> 1000L
)
val strategyWithOffsets = ConsumerStrategies.Assign[String, String](
assignedPartitions,
kafkaParams,
partitionOffsets
)
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
strategyWithOffsets
)
Usage Example (Java):
import org.apache.kafka.common.TopicPartition;
import java.util.*;
Collection<TopicPartition> partitions = Arrays.asList(
new TopicPartition("transactions", 0),
new TopicPartition("transactions", 1),
new TopicPartition("transactions", 2)
);
Map<TopicPartition, Long> offsets = new HashMap<>();
offsets.put(new TopicPartition("transactions", 0), 50000L);
offsets.put(new TopicPartition("transactions", 1), 75000L);
offsets.put(new TopicPartition("transactions", 2), 60000L);
ConsumerStrategy<String, String> strategy =
ConsumerStrategies.Assign(partitions, kafkaParams, offsets);
Consumer strategies automatically handle security configuration updates:
// Security parameters are automatically processed
val secureKafkaParams = Map[String, Object](
"bootstrap.servers" -> "secure-broker:9093",
"security.protocol" -> "SASL_SSL",
"sasl.mechanism" -> "PLAIN",
"sasl.jaas.config" -> "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";",
// ... other parameters
)
val strategy = ConsumerStrategies.Subscribe[String, String](topics, secureKafkaParams)
Custom key and value deserializers are configured through the standard Kafka deserializer parameters:
import org.apache.kafka.common.serialization.StringDeserializer
import com.example.MyCustomDeserializer
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[MyCustomDeserializer],
"group.id" -> "custom-deserializer-group"
)
val strategy = ConsumerStrategies.Subscribe[String, MyCustomObject](topics, kafkaParams)
The auto.offset.reset parameter controls where consumption begins when no committed offset exists:
// Start from earliest available offset
val earliestParams = kafkaParams + ("auto.offset.reset" -> "earliest")
// Start from latest offset (default)
val latestParams = kafkaParams + ("auto.offset.reset" -> "latest")
// Fail if no committed offset exists
val noneParams = kafkaParams + ("auto.offset.reset" -> "none")
val strategy = ConsumerStrategies.Subscribe[String, String](topics, earliestParams)
Consumer strategies automatically handle the KAFKA-3370 issue when auto.offset.reset=none:
val strictParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "strict-group",
"auto.offset.reset" -> "none" // This triggers the workaround
)
// The strategy will handle NoOffsetForPartitionException internally
val strategy = ConsumerStrategies.Subscribe[String, String](topics, strictParams)
Consumer strategies ensure parameters are properly configured for Spark executors:
- enable.auto.commit is automatically set to false
- auto.offset.reset is set to "none" for executors
- group.id is modified for executors to avoid conflicts
- receive.buffer.bytes is increased to 65536 as a KAFKA-3135 workaround
For SubscribePattern, the strategy likewise applies any supplied starting offsets once matching partitions are assigned.
Best Practices:
Use Subscribe for known topics: When you know the exact topic names, prefer Subscribe over pattern matching.
Use SubscribePattern for dynamic topics: When topics are created dynamically with consistent naming patterns.
Use Assign for advanced control: When you need precise control over partition assignment or want to implement custom load balancing.
Always specify group.id: Required for offset management and consumer coordination.
Disable auto-commit: Set enable.auto.commit=false and manage offsets manually for exactly-once semantics; see the sketch after this list.
Handle starting offsets: Specify starting offsets when resuming from checkpoints or specific points in time.
Security first: Include all necessary security parameters for production deployments.
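For the auto-commit item above, a minimal sketch of manual offset handling, assuming the stream created in the earlier Subscribe example (processing logic elided):
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}
stream.foreachRDD { rdd =>
  // Capture the offset ranges backing this batch; this must be done on the
  // RDD produced directly by the stream, before any transformation
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch, e.g. write results to an external store ...
  // Commit to Kafka only after processing succeeds (at-least-once delivery)
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}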
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-streaming-kafka-0-10-assembly