Stream Creation

The stream creation capability provides methods for creating direct Kafka DStreams that support exactly-once semantics and high-throughput real-time processing. These DStreams manage the consumer lifecycle, track per-partition offsets, and integrate with Spark's rate limiting mechanisms.

Core Functions

createDirectStream (Basic)

Creates a DStream in which each Kafka topic/partition corresponds to an RDD partition, using the default rate limiting configuration.

def createDirectStream[K, V](
  ssc: StreamingContext,
  locationStrategy: LocationStrategy,
  consumerStrategy: ConsumerStrategy[K, V]
): InputDStream[ConsumerRecord[K, V]]

Parameters:

  • ssc: StreamingContext - The Spark Streaming context
  • locationStrategy: LocationStrategy - Consumer placement strategy (use LocationStrategies.PreferConsistent in most cases)
  • consumerStrategy: ConsumerStrategy[K, V] - Consumer configuration strategy (use ConsumerStrategies.Subscribe in most cases)

Returns: InputDStream[ConsumerRecord[K, V]] - Direct Kafka input stream

createDirectStream (With Configuration)

Creates a DStream with custom per-partition configuration for advanced rate limiting and performance tuning.

def createDirectStream[K, V](
  ssc: StreamingContext,
  locationStrategy: LocationStrategy,
  consumerStrategy: ConsumerStrategy[K, V],
  perPartitionConfig: PerPartitionConfig
): InputDStream[ConsumerRecord[K, V]]

Parameters:

  • ssc: StreamingContext - The Spark Streaming context
  • locationStrategy: LocationStrategy - Consumer placement strategy
  • consumerStrategy: ConsumerStrategy[K, V] - Consumer configuration strategy
  • perPartitionConfig: PerPartitionConfig - Custom per-partition configuration

Returns: InputDStream[ConsumerRecord[K, V]] - Direct Kafka input stream with custom configuration

Java API

createDirectStream (Java - Basic)

Java version of the basic stream creation method.

public static <K, V> JavaInputDStream<ConsumerRecord<K, V>> createDirectStream(
  JavaStreamingContext jssc,
  LocationStrategy locationStrategy,
  ConsumerStrategy<K, V> consumerStrategy
)

createDirectStream (Java - With Configuration)

Java version with custom per-partition configuration.

public static <K, V> JavaInputDStream<ConsumerRecord<K, V>> createDirectStream(
  JavaStreamingContext jssc,
  LocationStrategy locationStrategy,
  ConsumerStrategy<K, V> consumerStrategy,
  PerPartitionConfig perPartitionConfig
)

Usage Examples

Basic Stream Creation

import org.apache.spark.streaming.kafka010._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "spark-streaming-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("orders", "payments", "users")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

// Process the stream (note: println runs on the executors, so output appears in executor logs, not the driver console)
stream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    rdd.foreach { record =>
      println(s"Topic: ${record.topic}, Key: ${record.key}, Value: ${record.value}")
    }
  }
}

Stream with Custom Rate Limiting

import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.TopicPartition

// Custom per-partition configuration
class CustomPerPartitionConfig extends PerPartitionConfig {
  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    topicPartition.topic() match {
      case "high-volume-topic" => 1000 // Higher rate for high-volume topic
      case _ => 500 // Default rate
    }
  }
}

val customConfig = new CustomPerPartitionConfig()

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams),
  customConfig
)

Java Stream Creation

import org.apache.spark.streaming.kafka010.*;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "spark-streaming-group");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);

Collection<String> topics = Arrays.asList("topic1", "topic2");
JavaInputDStream<ConsumerRecord<String, String>> stream = 
  KafkaUtils.createDirectStream(
    javaStreamingContext,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
  );

stream.foreachRDD(rdd -> {
  rdd.foreach(record -> {
    System.out.println("Key: " + record.key() + ", Value: " + record.value());
  });
});

Configuration Notes

Required Kafka Parameters

  • bootstrap.servers: Kafka broker addresses (required)
  • key.deserializer: Key deserializer class (required)
  • value.deserializer: Value deserializer class (required)
  • group.id: Consumer group ID (required when subscribing to topics)

Recommended Kafka Parameters

  • enable.auto.commit: Set to false for manual offset management
  • auto.offset.reset: Set to "latest" or "earliest" based on requirements

Rate Limiting

The spark.streaming.kafka.maxRatePerPartition configuration controls the maximum rate, in messages per second, at which each Kafka partition is read. Set it to 0 (the default) for an unlimited rate.
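
A minimal sketch of setting this limit when constructing the streaming context; the application name, rate value, and 5-second batch interval are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Cap reads at 1,000 messages per second for each Kafka partition
val conf = new SparkConf()
  .setAppName("kafka-direct-stream")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")

val streamingContext = new StreamingContext(conf, Seconds(5))

With a 5-second batch interval, each partition then contributes at most 5,000 records per batch.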

Important Notes

  • All stream creation methods are marked as @Experimental in Spark 2.4.8
  • The direct approach provides exactly-once semantics when combined with proper offset management and idempotent or transactional output operations
  • Consumer instances are automatically managed and cached for performance
  • Supports both Scala and Java APIs with appropriate type conversions
  • DStreams created with these methods implement the CanCommitOffsets interface, and the RDDs they produce implement HasOffsetRanges, so offset ranges can be read per batch and committed back to Kafka (see the sketch below)
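
A minimal sketch of that pattern, following the Spark documentation and reusing the stream from the basic example above; the processing step is a placeholder:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  // Each RDD produced by the direct stream carries its Kafka offset ranges
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the batch here ...

  // Commit offsets back to Kafka asynchronously once processing succeeds
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}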