A shaded JAR assembly containing all dependencies needed for Kafka integration with Spark Streaming
—
Complete Java API for Apache Spark Streaming Kafka integration, providing type-safe wrappers for all Scala functionality with familiar Java programming patterns.
Java-friendly API for direct stream creation without receivers.
/**
* Create an input stream that directly pulls messages from Kafka Brokers (Java API).
*
* @param jssc JavaStreamingContext object
* @param keyClass Class of the keys in the Kafka records
* @param valueClass Class of the values in the Kafka records
* @param keyDecoderClass Class of the key decoder
* @param valueDecoderClass Class of the value decoder
* @param kafkaParams Kafka configuration parameters
* @param topics Names of the topics to consume
* @param <K> type of Kafka message key
* @param <V> type of Kafka message value
* @param <KD> type of Kafka message key decoder
* @param <VD> type of Kafka message value decoder
* @return DStream of (Kafka message key, Kafka message value)
*/
public static <K, V, KD extends Decoder<K>, VD extends Decoder<V>>
JavaPairInputDStream<K, V> createDirectStream(
JavaStreamingContext jssc,
Class<K> keyClass,
Class<V> valueClass,
Class<KD> keyDecoderClass,
Class<VD> valueDecoderClass,
Map<String, String> kafkaParams,
Set<String> topics
)
Usage Example:
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.api.java.*;
import kafka.serializer.StringDecoder;
import java.util.*;
// Setup Kafka parameters
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
kafkaParams.put("auto.offset.reset", "largest");
Set<String> topics = Collections.singleton("user-events");
// Create direct stream
JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topics
);
// Process messages
stream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
rdd.foreach(record -> {
System.out.println("Key: " + record._1 + ", Value: " + record._2);
});
// Print offset information
for (OffsetRange o : offsetRanges) {
System.out.println(o.topic() + " " + o.partition() +
" " + o.fromOffset() + " " + o.untilOffset());
}
});
Advanced Java API with custom message transformation and explicit offset control.
/**
* Create an input stream with custom message handler (Java API).
*
* @param jssc JavaStreamingContext object
* @param keyClass Class of the keys in the Kafka records
* @param valueClass Class of the values in the Kafka records
* @param keyDecoderClass Class of the key decoder
* @param valueDecoderClass Class of the value decoder
* @param recordClass Class of the records in DStream
* @param kafkaParams Kafka configuration parameters
* @param fromOffsets Per-topic/partition Kafka offsets defining starting point
* @param messageHandler Function for translating each message and metadata
* @param <K> type of Kafka message key
* @param <V> type of Kafka message value
* @param <KD> type of Kafka message key decoder
* @param <VD> type of Kafka message value decoder
* @param <R> type returned by messageHandler
* @return DStream of R
*/
public static <K, V, KD extends Decoder<K>, VD extends Decoder<V>, R>
JavaInputDStream<R> createDirectStream(
JavaStreamingContext jssc,
Class<K> keyClass,
Class<V> valueClass,
Class<KD> keyDecoderClass,
Class<VD> valueDecoderClass,
Class<R> recordClass,
Map<String, String> kafkaParams,
Map<TopicAndPartition, Long> fromOffsets,
Function<MessageAndMetadata<K, V>, R> messageHandler
)
Usage Example:
import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import org.apache.spark.api.java.function.Function;
// Define custom message handler
Function<MessageAndMetadata<String, String>, String> messageHandler =
new Function<MessageAndMetadata<String, String>, String>() {
@Override
public String call(MessageAndMetadata<String, String> mmd) {
return String.format("%s:%d:%d -> %s:%s",
mmd.topic(), mmd.partition(), mmd.offset(),
mmd.key(), mmd.message());
}
};
// Or using lambda (Java 8+)
Function<MessageAndMetadata<String, String>, String> lambdaHandler =
mmd -> String.format("%s:%d:%d -> %s:%s",
mmd.topic(), mmd.partition(), mmd.offset(),
mmd.key(), mmd.message());
// Define starting offsets
Map<TopicAndPartition, Long> fromOffsets = new HashMap<>();
fromOffsets.put(new TopicAndPartition("events", 0), 1000L);
fromOffsets.put(new TopicAndPartition("events", 1), 2000L);
// Create stream with custom handler
JavaInputDStream<String> customStream = KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
String.class,
kafkaParams,
fromOffsets,
lambdaHandler
);
customStream.print();
Java API for traditional receiver-based streaming.
/**
* Create an input stream that pulls messages from Kafka Brokers using receivers (Java API).
*
* @param jssc JavaStreamingContext object
* @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
* @param groupId The group id for this consumer
* @param topics Map of (topic_name -> numPartitions) to consume
* @return DStream of (Kafka message key, Kafka message value)
*/
public static JavaPairReceiverInputDStream<String, String> createStream(
JavaStreamingContext jssc,
String zkQuorum,
String groupId,
Map<String, Integer> topics
)
/**
* Create receiver-based stream with custom storage level.
*/
public static JavaPairReceiverInputDStream<String, String> createStream(
JavaStreamingContext jssc,
String zkQuorum,
String groupId,
Map<String, Integer> topics,
StorageLevel storageLevel
)
Usage Example:
import org.apache.spark.storage.StorageLevel;
Map<String, Integer> topics = new HashMap<>();
topics.put("user-events", 1);
topics.put("system-logs", 2);
JavaPairReceiverInputDStream<String, String> receiverStream = KafkaUtils.createStream(
jssc,
"localhost:2181",
"my-consumer-group",
topics,
StorageLevel.MEMORY_AND_DISK_SER_2()
);
receiverStream.foreachRDD(rdd -> {
System.out.println("Batch size: " + rdd.count());
rdd.foreach(record -> {
System.out.println("Received: " + record._1 + " -> " + record._2);
});
});
Java API for creating RDDs from Kafka with precise offset control.
/**
* Create a RDD from Kafka using offset ranges (Java API).
*
* @param jsc JavaSparkContext object
* @param keyClass Class of the keys in the Kafka records
* @param valueClass Class of the values in the Kafka records
* @param keyDecoderClass Class of the key decoder
* @param valueDecoderClass Class of the value decoder
* @param kafkaParams Kafka configuration parameters
* @param offsetRanges Each OffsetRange corresponds to a range of offsets
* @param <K> type of Kafka message key
* @param <V> type of Kafka message value
* @param <KD> type of Kafka message key decoder
* @param <VD> type of Kafka message value decoder
* @return RDD of (Kafka message key, Kafka message value)
*/
public static <K, V, KD extends Decoder<K>, VD extends Decoder<V>>
JavaPairRDD<K, V> createRDD(
JavaSparkContext jsc,
Class<K> keyClass,
Class<V> valueClass,
Class<KD> keyDecoderClass,
Class<VD> valueDecoderClass,
Map<String, String> kafkaParams,
OffsetRange[] offsetRanges
)
Usage Example:
import org.apache.spark.streaming.kafka.OffsetRange;
// Create offset ranges
OffsetRange[] offsetRanges = {
OffsetRange.create("events", 0, 1000, 2000),
OffsetRange.create("events", 1, 500, 1500),
OffsetRange.create("logs", 0, 100, 200)
};
// Create RDD
JavaPairRDD<String, String> rdd = KafkaUtils.createRDD(
jsc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
offsetRanges
);
// Process data
System.out.println("Total messages: " + rdd.count());
rdd.foreach(record -> {
System.out.println("Key: " + record._1 + ", Value: " + record._2);
});
The Java API fully supports generic types with proper type safety:
// Custom key/value types
/**
 * Example value type decoded from Kafka messages and carried through a stream.
 *
 * <p>Implements {@link java.io.Serializable} so that instances can be moved between
 * Spark tasks (shuffles, serialized caching, collecting to the driver) when the
 * default Java serializer is in use.
 */
public class UserEvent implements java.io.Serializable {
    private static final long serialVersionUID = 1L;

    public String userId;
    public String eventType;
    public long timestamp;
}
public class CustomDecoder implements Decoder<UserEvent> {
@Override
public UserEvent fromBytes(byte[] bytes) {
// Custom deserialization logic
return parseUserEvent(bytes);
}
}
// Type-safe stream creation
JavaPairInputDStream<String, UserEvent> typedStream = KafkaUtils.createDirectStream(
jssc,
String.class,
UserEvent.class,
StringDecoder.class,
CustomDecoder.class,
kafkaParams,
topics
);
Accessing offset information from Java RDDs:
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;
stream.foreachRDD(rdd -> {
// Cast to HasOffsetRanges to access offset information
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
for (OffsetRange offsetRange : offsetRanges) {
System.out.printf("Topic: %s, Partition: %d, Range: [%d, %d)%n",
offsetRange.topic(),
offsetRange.partition(),
offsetRange.fromOffset(),
offsetRange.untilOffset()
);
}
// Process the data
rdd.foreach(record -> processRecord(record));
});
Working with Java collections for configuration:
import java.util.*;
// Kafka parameters using Java Maps
Map<String, String> kafkaParams = new HashMap<String, String>() {{
put("metadata.broker.list", "localhost:9092");
put("auto.offset.reset", "largest");
put("group.id", "my-consumer-group");
}};
// Topics using Java Sets
Set<String> topics = new HashSet<String>() {{
add("user-events");
add("system-logs");
}};
// Topic partitions using Java Maps
Map<String, Integer> topicPartitions = new HashMap<String, Integer>() {{
put("user-events", 2);
put("system-logs", 1);
}};
// Direct stream with lambda processing
JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(/*...*/);
stream
.filter(record -> record._1 != null && !record._1.isEmpty())
.map(record -> record._1.toUpperCase() + ":" + record._2)
.foreachRDD(rdd -> {
rdd.foreach(System.out::println);
});
// Lambda message handler
Function<MessageAndMetadata<String, String>, ProcessedMessage> handler =
mmd -> new ProcessedMessage(
mmd.topic(),
mmd.partition(),
mmd.offset(),
mmd.key(),
mmd.message(),
System.currentTimeMillis()
);
JavaInputDStream<ProcessedMessage> processedStream = KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
ProcessedMessage.class,
kafkaParams,
fromOffsets,
handler
);
try {
JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
jssc, String.class, String.class,
StringDecoder.class, StringDecoder.class,
kafkaParams, topics
);
stream.foreachRDD(rdd -> {
try {
rdd.foreach(record -> {
processRecord(record._1, record._2);
});
} catch (Exception e) {
System.err.println("Error processing RDD: " + e.getMessage());
e.printStackTrace();
}
});
} catch (Exception e) {
System.err.println("Error creating Kafka stream: " + e.getMessage());
throw new RuntimeException("Failed to initialize Kafka streaming", e);
}
// Custom error-handling decoder
public class SafeStringDecoder implements Decoder<String> {
private final StringDecoder delegate = new StringDecoder();
@Override
public String fromBytes(byte[] bytes) {
try {
return delegate.fromBytes(bytes);
} catch (Exception e) {
System.err.println("Failed to decode message: " + e.getMessage());
return "<DECODE_ERROR>";
}
}
}
@Component
public class KafkaStreamingService {
@Autowired
private JavaStreamingContext streamingContext;
@Value("${kafka.brokers}")
private String brokers;
/**
 * Creates the direct Kafka stream for the {@code "events"} topic once the bean's
 * dependencies are injected, and registers {@code processRDD} as the per-batch handler.
 */
@PostConstruct
public void initializeStreams() {
    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("metadata.broker.list", brokers);
    // Collections.singleton rather than Set.of: Set.of requires Java 9+, which the
    // Scala 2.10 / Spark 1.x runtime this artifact targets does not support.
    Set<String> topics = Collections.singleton("events");
    JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
        streamingContext,
        String.class, String.class,
        StringDecoder.class, StringDecoder.class,
        kafkaParams, topics
    );
    stream.foreachRDD(this::processRDD);
}
/**
 * Passes every (key, value) pair of one batch to {@code handleMessage}.
 *
 * @param rdd one batch of Kafka records as key/value tuples
 */
private void processRDD(JavaPairRDD<String, String> rdd) {
    // Business logic here
    // NOTE(review): handleMessage is invoked as an instance method, so this lambda
    // presumably captures `this` — confirm the enclosing service can be serialized by Spark.
    rdd.foreach(pair -> handleMessage(pair._1, pair._2));
}
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-streaming-kafka-assembly-2-10