tessl/maven-org-apache-spark--spark-streaming-kafka-assembly-2-10

A shaded JAR assembly containing all dependencies needed for Kafka integration with Spark Streaming

Receiver-based Streaming

Receiver-based streaming creates input streams that use long-running receivers to pull messages from Kafka brokers. This is the legacy approach that relies on Zookeeper for coordination and consumer group management.

Capabilities

Basic Receiver Stream Creation

Creates a receiver-based stream with String key/value types.

/**
 * Create an input stream that pulls messages from Kafka Brokers using receivers.
 *
 * @param ssc StreamingContext object
 * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
 * @param groupId The group id for this consumer
 * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
 *               in its own thread
 * @param storageLevel Storage level to use for storing the received objects
 *                     (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 * @return DStream of (Kafka message key, Kafka message value)
 */
def createStream(
  ssc: StreamingContext,
  zkQuorum: String,
  groupId: String,
  topics: Map[String, Int],
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[(String, String)]

Usage Example:

import org.apache.spark.streaming.kafka._
import org.apache.spark.storage.StorageLevel

val zkQuorum = "localhost:2181"
val groupId = "my-consumer-group"
val topics = Map("user-events" -> 1, "purchase-events" -> 2)

val stream = KafkaUtils.createStream(
  streamingContext, zkQuorum, groupId, topics, StorageLevel.MEMORY_AND_DISK_SER_2
)

stream.foreachRDD { rdd =>
  rdd.foreach { case (key, value) =>
    println(s"Received: $key -> $value")
  }
}

Generic Receiver Stream Creation

Creates a receiver-based stream with custom key/value types and decoders.

/**
 * Create an input stream that pulls messages from Kafka Brokers using receivers
 * with custom key/value types.
 *
 * @param ssc StreamingContext object
 * @param kafkaParams Map of kafka configuration parameters
 * @param topics Map of (topic_name -> numPartitions) to consume
 * @param storageLevel Storage level to use for storing the received objects
 * @tparam K type of Kafka message key
 * @tparam V type of Kafka message value
 * @tparam U type of Kafka message key decoder
 * @tparam T type of Kafka message value decoder
 * @return DStream of (Kafka message key, Kafka message value)
 */
def createStream[K: ClassTag, V: ClassTag, U <: Decoder[K]: ClassTag, T <: Decoder[V]: ClassTag](
  ssc: StreamingContext,
  kafkaParams: Map[String, String],
  topics: Map[String, Int],
  storageLevel: StorageLevel
): ReceiverInputDStream[(K, V)]

Usage Example:

import kafka.serializer.{StringDecoder, DefaultDecoder}

val kafkaParams = Map[String, String](
  "zookeeper.connect" -> "localhost:2181",
  "group.id" -> "my-consumer-group",
  "zookeeper.connection.timeout.ms" -> "10000"
)

val topics = Map("binary-data" -> 1)

val binaryStream = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
  streamingContext, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER
)

binaryStream.foreachRDD { rdd =>
  rdd.foreach { case (keyBytes, valueBytes) =>
    println(s"Key length: ${keyBytes.length}, Value length: ${valueBytes.length}")
  }
}

Java Receiver Stream API

Java-friendly API for receiver-based stream creation.

/**
 * Create an input stream that pulls messages from Kafka Brokers using receivers (Java API).
 *
 * @param jssc JavaStreamingContext object
 * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
 * @param groupId The group id for this consumer
 * @param topics Map of (topic_name -> numPartitions) to consume
 * @return DStream of (Kafka message key, Kafka message value)
 */
public static JavaPairReceiverInputDStream<String, String> createStream(
  JavaStreamingContext jssc,
  String zkQuorum,
  String groupId,
  Map<String, Integer> topics
)

/**
 * Create an input stream with custom storage level (Java API).
 */
public static JavaPairReceiverInputDStream<String, String> createStream(
  JavaStreamingContext jssc,
  String zkQuorum,
  String groupId,
  Map<String, Integer> topics,
  StorageLevel storageLevel
)

Java Usage Example:

import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.storage.StorageLevel;

Map<String, Integer> topics = new HashMap<>();
topics.put("my-topic", 1);

JavaPairReceiverInputDStream<String, String> stream = KafkaUtils.createStream(
    jssc,
    "localhost:2181",
    "my-consumer-group", 
    topics,
    StorageLevel.MEMORY_AND_DISK_SER_2()
);

stream.foreachRDD(rdd -> {
    rdd.foreach(record -> {
        System.out.println("Key: " + record._1 + ", Value: " + record._2);
    });
});

Advanced Java Receiver Stream with Custom Types

Java API with custom key/value types and decoders.

/**
 * Create an input stream with custom types and decoders (Java API).
 *
 * @param jssc JavaStreamingContext object
 * @param keyTypeClass Key type of DStream
 * @param valueTypeClass Value type of DStream
 * @param keyDecoderClass Type of kafka key decoder
 * @param valueDecoderClass Type of kafka value decoder
 * @param kafkaParams Map of kafka configuration parameters
 * @param topics Map of (topic_name -> numPartitions) to consume
 * @param storageLevel RDD storage level
 * @tparam K type of Kafka message key
 * @tparam V type of Kafka message value
 * @tparam U type of Kafka message key decoder
 * @tparam T type of Kafka message value decoder
 * @return DStream of (Kafka message key, Kafka message value)
 */
public static <K, V, U extends Decoder<K>, T extends Decoder<V>> 
JavaPairReceiverInputDStream<K, V> createStream(
  JavaStreamingContext jssc,
  Class<K> keyTypeClass,
  Class<V> valueTypeClass,
  Class<U> keyDecoderClass,
  Class<T> valueDecoderClass,
  Map<String, String> kafkaParams,
  Map<String, Integer> topics,
  StorageLevel storageLevel
)
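Java Usage Example:

The other creation methods above are shown with examples, so here is a hedged sketch of the custom-decoder overload as well. It assumes a `jssc` (`JavaStreamingContext`) is already in scope; the topic name is illustrative. `DefaultDecoder` is Kafka's pass-through decoder that leaves keys and values as raw bytes.

```java
import java.util.HashMap;
import java.util.Map;

import kafka.serializer.DefaultDecoder;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("zookeeper.connect", "localhost:2181");
kafkaParams.put("group.id", "my-consumer-group");

Map<String, Integer> topics = new HashMap<>();
topics.put("binary-data", 1);

// Raw byte[] keys and values, decoded with Kafka's pass-through DefaultDecoder
JavaPairReceiverInputDStream<byte[], byte[]> stream = KafkaUtils.createStream(
    jssc,
    byte[].class,
    byte[].class,
    DefaultDecoder.class,
    DefaultDecoder.class,
    kafkaParams,
    topics,
    StorageLevel.MEMORY_AND_DISK_SER()
);
```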

Key Features

Write-Ahead Logs (WAL)

  • Enabled by setting spark.streaming.receiver.writeAheadLog.enable=true (requires a checkpoint directory, since the log is written there)
  • Ensures data recovery in case of driver failures
  • Trades performance for reliability
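Putting the points above together, a minimal sketch of a WAL-enabled receiver stream (hostnames, paths, and topic names are illustrative). Because the WAL already persists every received block, a non-replicated storage level avoids storing the data twice:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf()
  .setAppName("wal-receiver-example")
  // Write every received block to the write-ahead log before acknowledging it
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(10))
// The WAL lives under the checkpoint directory, so one must be configured
ssc.checkpoint("hdfs:///checkpoints/wal-example")

// With the WAL providing durability, drop in-memory replication
val stream = KafkaUtils.createStream(
  ssc, "localhost:2181", "my-consumer-group", Map("user-events" -> 1),
  StorageLevel.MEMORY_AND_DISK_SER
)
```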

Consumer Group Management

  • Automatic consumer group coordination through Zookeeper
  • Offset management handled by Kafka consumer group
  • Multiple instances can share the same consumer group for load balancing
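A common way to exploit this, sketched below under the assumption that an `ssc` (`StreamingContext`) is in scope: create several receiver streams in the same consumer group so the topic's partitions are balanced across them, then union them into one DStream.

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// Several receivers sharing one consumer group; Zookeeper-based coordination
// distributes the topic's partitions across them
val numReceivers = 3
val streams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(
    ssc, "localhost:2181", "my-consumer-group", Map("user-events" -> 1),
    StorageLevel.MEMORY_AND_DISK_SER_2
  )
}

// Union into a single DStream for downstream processing
val unified = ssc.union(streams)
unified.count().print()
```

Note that each receiver occupies an executor core, so this pattern only helps when the application has cores to spare.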

Storage Levels

Common storage levels for different reliability/performance trade-offs:

StorageLevel.MEMORY_AND_DISK_SER_2    // Default: Memory + disk, serialized, replicated
StorageLevel.MEMORY_AND_DISK_SER      // Memory + disk, serialized, not replicated  
StorageLevel.MEMORY_ONLY_SER_2        // Memory only, serialized, replicated
StorageLevel.DISK_ONLY_2              // Disk only, replicated

Configuration Parameters

Kafka Parameters

Map(
  "zookeeper.connect" -> "localhost:2181",
  "group.id" -> "my-consumer-group",
  "zookeeper.connection.timeout.ms" -> "10000",
  "zookeeper.session.timeout.ms" -> "10000",
  "zookeeper.sync.time.ms" -> "2000",
  "auto.commit.interval.ms" -> "1000"
)

Topic Partition Mapping

// Map of topic name to number of threads/partitions to consume
Map(
  "topic1" -> 1,      // 1 thread for topic1
  "topic2" -> 2,      // 2 threads for topic2  
  "topic3" -> 4       // 4 threads for topic3
)

Limitations

At-Least-Once Semantics

  • Receiver-based approach provides at-least-once delivery guarantees
  • Duplicate messages possible during failures
  • Use direct streaming for exactly-once semantics

Scalability Constraints

  • Each receiver occupies one executor core for its lifetime, so the number of receivers is bounded by the cores available to the application
  • The per-topic counts in the topics map control consumer threads within a single receiver, not additional receivers
  • Receivers run on executors and consume cluster resources even when no data is arriving

Zookeeper Dependency

  • Requires Zookeeper for consumer coordination
  • Additional operational complexity
  • Single point of failure for consumer group management

Migration to Direct Streaming

For new applications, consider using direct streaming instead:

// Old receiver-based approach
val receiverStream = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)

// New direct streaming approach. Note: the kafkaParams for a direct stream
// must include "metadata.broker.list" (the Kafka brokers themselves) rather
// than "zookeeper.connect".
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics.keySet
)

Benefits of migration:

  • Exactly-once semantics
  • Better performance and throughput
  • No Zookeeper dependency for streaming
  • More control over offset management
