The stream creation capability provides methods for creating direct Kafka DStreams for high-throughput, real-time data processing. Because each batch maps to explicit Kafka offset ranges, these DStreams can support exactly-once semantics when outputs and offset commits are handled idempotently or transactionally. They automatically manage the consumer lifecycle, track offsets for each batch, and integrate with Spark's rate limiting and backpressure mechanisms.
Creates a DStream where each Kafka topic/partition corresponds to an RDD partition with default rate limiting configuration.
def createDirectStream[K, V](
ssc: StreamingContext,
locationStrategy: LocationStrategy,
consumerStrategy: ConsumerStrategy[K, V]
): InputDStream[ConsumerRecord[K, V]]

Parameters:
ssc: StreamingContext - The Spark Streaming context
locationStrategy: LocationStrategy - Consumer placement strategy (use LocationStrategies.PreferConsistent in most cases)
consumerStrategy: ConsumerStrategy[K, V] - Consumer configuration strategy (use ConsumerStrategies.Subscribe in most cases)

Returns: InputDStream[ConsumerRecord[K, V]] - Direct Kafka input stream
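PreferConsistent and Subscribe cover most deployments, but the API also exposes alternative strategies: LocationStrategies.PreferFixed pins partitions to specific executor hosts, and ConsumerStrategies.Assign consumes an explicit set of partitions rather than whole topics. The sketch below is illustrative only; the topic name, partition layout, hostnames, and the kafkaParams map (as defined in the examples further down) are assumptions.

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, LocationStrategies}

// Pin specific partitions to specific executor hosts (hypothetical hostnames)
val hostMap = Map(
  new TopicPartition("orders", 0) -> "executor-host-1",
  new TopicPartition("orders", 1) -> "executor-host-2"
)
val pinnedLocations = LocationStrategies.PreferFixed(hostMap)

// Consume exactly these partitions instead of subscribing to whole topics
val assignedConsumer = ConsumerStrategies.Assign[String, String](hostMap.keys.toList, kafkaParams)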
Creates a DStream with custom per-partition configuration for advanced rate limiting and performance tuning.
def createDirectStream[K, V](
ssc: StreamingContext,
locationStrategy: LocationStrategy,
consumerStrategy: ConsumerStrategy[K, V],
perPartitionConfig: PerPartitionConfig
): InputDStream[ConsumerRecord[K, V]]

Parameters:
ssc: StreamingContext - The Spark Streaming context
locationStrategy: LocationStrategy - Consumer placement strategy
consumerStrategy: ConsumerStrategy[K, V] - Consumer configuration strategy
perPartitionConfig: PerPartitionConfig - Custom per-partition configuration

Returns: InputDStream[ConsumerRecord[K, V]] - Direct Kafka input stream with custom configuration
Java version of the basic stream creation method.
public static <K, V> JavaInputDStream<ConsumerRecord<K, V>> createDirectStream(
JavaStreamingContext jssc,
LocationStrategy locationStrategy,
ConsumerStrategy<K, V> consumerStrategy
)

Java version with custom per-partition configuration.
public static <K, V> JavaInputDStream<ConsumerRecord<K, V>> createDirectStream(
JavaStreamingContext jssc,
LocationStrategy locationStrategy,
ConsumerStrategy<K, V> consumerStrategy,
PerPartitionConfig perPartitionConfig
)

Scala example: creating and processing a direct stream

import org.apache.spark.streaming.kafka010._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "spark-streaming-group",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("orders", "payments", "users")
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
// Process the stream
stream.foreachRDD { rdd =>
if (!rdd.isEmpty()) {
rdd.foreach { record =>
println(s"Topic: ${record.topic}, Key: ${record.key}, Value: ${record.value}")
}
}
}

Scala example: custom per-partition rate limiting

import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.TopicPartition
// Custom per-partition configuration
class CustomPerPartitionConfig extends PerPartitionConfig {
def maxRatePerPartition(topicPartition: TopicPartition): Long = {
topicPartition.topic() match {
case "high-volume-topic" => 1000 // Higher rate for high-volume topic
case _ => 500 // Default rate
}
}
}
val customConfig = new CustomPerPartitionConfig()
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams),
customConfig
)

Java example: creating and processing a direct stream

import org.apache.spark.streaming.kafka010.*;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "spark-streaming-group");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("topic1", "topic2");
JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
javaStreamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
stream.foreachRDD(rdd -> {
rdd.foreach(record -> {
System.out.println("Key: " + record.key() + ", Value: " + record.value());
});
});

Key Kafka consumer parameters:

bootstrap.servers: Kafka broker addresses (required)
key.deserializer: Key deserializer class (required)
value.deserializer: Value deserializer class (required)
group.id: Consumer group ID (recommended)
enable.auto.commit: Set to false so that offsets are managed manually
auto.offset.reset: Set to "latest" or "earliest" based on requirements

The configuration spark.streaming.kafka.maxRatePerPartition controls the maximum number of records per second read from each Kafka partition. Set it to 0 for an unlimited rate.
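Rate limits are typically set on the SparkConf before the StreamingContext is created. A minimal sketch, assuming a hypothetical application name, a 5-second batch interval, and illustrative rate values:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("kafka-direct-stream") // hypothetical application name
  // Cap reads at 1000 records per second from each Kafka partition
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
  // Allow Spark to lower the effective rate dynamically when batches fall behind
  .set("spark.streaming.backpressure.enabled", "true")
val streamingContext = new StreamingContext(conf, Seconds(5))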
Annotations: @Experimental in Spark 2.4.8

See also: HasOffsetRanges and CanCommitOffsets interfaces
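Because enable.auto.commit is set to false in the examples above, offsets are not committed automatically. A common pattern is to commit them back to Kafka after each batch succeeds, using the HasOffsetRanges and CanCommitOffsets interfaces. A minimal sketch, assuming stream was created as in the earlier examples:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  // RDDs produced by a direct stream expose the Kafka offset ranges they were read from
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch here ...
  // Commit the offsets back to Kafka asynchronously once processing has succeeded
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}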