tessl/maven-org-apache-spark--spark-streaming-kafka-assembly-2-10

A shaded JAR assembly containing all dependencies needed for Kafka integration with Spark Streaming

Java API

Complete Java API for Apache Spark Streaming Kafka integration, providing type-safe wrappers around the underlying Scala functionality with familiar Java programming patterns.
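
The streaming examples below assume an existing JavaStreamingContext named jssc; the batch examples assume a JavaSparkContext named jsc. A minimal local setup, with an illustrative application name and batch interval, might look like this:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

SparkConf conf = new SparkConf()
    .setAppName("KafkaStreamingExample") // illustrative app name
    .setMaster("local[2]");              // use at least two local threads so processing is not starved

JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
JavaSparkContext jsc = jssc.sparkContext(); // used by the batch RDD examples

// ... create streams as shown below, then:
jssc.start();
jssc.awaitTermination();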

Capabilities

Java Direct Streaming

Java-friendly API for direct stream creation without receivers.

/**
 * Create an input stream that directly pulls messages from Kafka Brokers (Java API).
 *
 * @param jssc JavaStreamingContext object
 * @param keyClass Class of the keys in the Kafka records
 * @param valueClass Class of the values in the Kafka records
 * @param keyDecoderClass Class of the key decoder
 * @param valueDecoderClass Class of the value decoder
 * @param kafkaParams Kafka configuration parameters
 * @param topics Names of the topics to consume
 * @param <K> type of Kafka message key
 * @param <V> type of Kafka message value
 * @param <KD> type of Kafka message key decoder
 * @param <VD> type of Kafka message value decoder
 * @return DStream of (Kafka message key, Kafka message value)
 */
public static <K, V, KD extends Decoder<K>, VD extends Decoder<V>> 
JavaPairInputDStream<K, V> createDirectStream(
  JavaStreamingContext jssc,
  Class<K> keyClass,
  Class<V> valueClass,
  Class<KD> keyDecoderClass,
  Class<VD> valueDecoderClass,
  Map<String, String> kafkaParams,
  Set<String> topics
)

Usage Example:

import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;
import org.apache.spark.streaming.api.java.*;
import kafka.serializer.StringDecoder;
import java.util.*;

// Setup Kafka parameters
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
kafkaParams.put("auto.offset.reset", "largest");

Set<String> topics = Collections.singleton("user-events");

// Create direct stream
JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
    jssc,
    String.class,
    String.class,
    StringDecoder.class,
    StringDecoder.class,
    kafkaParams,
    topics
);

// Process messages
stream.foreachRDD(rdd -> {
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    
    rdd.foreach(record -> {
        System.out.println("Key: " + record._1 + ", Value: " + record._2);
    });
    
    // Print offset information
    for (OffsetRange o : offsetRanges) {
        System.out.println(o.topic() + " " + o.partition() + 
                          " " + o.fromOffset() + " " + o.untilOffset());
    }
});
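
Because the direct stream creates one RDD partition per Kafka partition, the offsetRanges array is index-aligned with the RDD's partitions. A sketch of per-partition processing that relies on that alignment (the processing logic is illustrative):

import org.apache.spark.TaskContext;

stream.foreachRDD(rdd -> {
    OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    rdd.foreachPartition(records -> {
        // Partition i of the RDD corresponds to ranges[i]
        OffsetRange range = ranges[TaskContext.get().partitionId()];
        System.out.println("Processing " + range.topic() + " partition " + range.partition());
        records.forEachRemaining(record ->
            System.out.println(record._1() + " -> " + record._2()));
    });
});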

Java Direct Streaming with Custom Message Handler

Advanced Java API with custom message transformation and explicit offset control.

/**
 * Create an input stream with custom message handler (Java API).
 *
 * @param jssc JavaStreamingContext object
 * @param keyClass Class of the keys in the Kafka records
 * @param valueClass Class of the values in the Kafka records
 * @param keyDecoderClass Class of the key decoder
 * @param valueDecoderClass Class of the value decoder
 * @param recordClass Class of the records in DStream
 * @param kafkaParams Kafka configuration parameters
 * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream
 * @param messageHandler Function for translating each message and metadata into the desired type
 * @param <K> type of Kafka message key
 * @param <V> type of Kafka message value
 * @param <KD> type of Kafka message key decoder
 * @param <VD> type of Kafka message value decoder
 * @param <R> type returned by messageHandler
 * @return DStream of R
 */
public static <K, V, KD extends Decoder<K>, VD extends Decoder<V>, R> 
JavaInputDStream<R> createDirectStream(
  JavaStreamingContext jssc,
  Class<K> keyClass,
  Class<V> valueClass,
  Class<KD> keyDecoderClass,
  Class<VD> valueDecoderClass,
  Class<R> recordClass,
  Map<String, String> kafkaParams,
  Map<TopicAndPartition, Long> fromOffsets,
  Function<MessageAndMetadata<K, V>, R> messageHandler
)

Usage Example:

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import org.apache.spark.api.java.function.Function;

// Define custom message handler
Function<MessageAndMetadata<String, String>, String> messageHandler = 
    new Function<MessageAndMetadata<String, String>, String>() {
        @Override
        public String call(MessageAndMetadata<String, String> mmd) {
            return String.format("%s:%d:%d -> %s:%s", 
                mmd.topic(), mmd.partition(), mmd.offset(), 
                mmd.key(), mmd.message());
        }
    };

// Or using lambda (Java 8+)
Function<MessageAndMetadata<String, String>, String> lambdaHandler = 
    mmd -> String.format("%s:%d:%d -> %s:%s",
        mmd.topic(), mmd.partition(), mmd.offset(),
        mmd.key(), mmd.message());

// Define starting offsets
Map<TopicAndPartition, Long> fromOffsets = new HashMap<>();
fromOffsets.put(new TopicAndPartition("events", 0), 1000L);
fromOffsets.put(new TopicAndPartition("events", 1), 2000L);

// Create stream with custom handler
JavaInputDStream<String> customStream = KafkaUtils.createDirectStream(
    jssc,
    String.class,
    String.class,
    StringDecoder.class,
    StringDecoder.class,
    String.class,
    kafkaParams,
    fromOffsets,
    lambdaHandler
);

customStream.print();
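
In practice, fromOffsets is usually rebuilt from offsets persisted by a previous run so that a restarted job resumes where it left off. A sketch, where loadStoredOffset is a hypothetical lookup against your own offset store and the topic name and partition count are illustrative:

int numPartitions = 2; // known partition count for the topic
Map<TopicAndPartition, Long> resumeOffsets = new HashMap<>();
for (int p = 0; p < numPartitions; p++) {
    TopicAndPartition tp = new TopicAndPartition("events", p);
    resumeOffsets.put(tp, loadStoredOffset(tp)); // hypothetical: returns the last saved untilOffset
}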

Java Receiver-based Streaming

Java API for traditional receiver-based streaming.

/**
 * Create an input stream that pulls messages from Kafka Brokers using receivers (Java API).
 *
 * @param jssc JavaStreamingContext object
 * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
 * @param groupId The group id for this consumer
 * @param topics Map of (topic_name -> numPartitions) to consume
 * @return DStream of (Kafka message key, Kafka message value)
 */
public static JavaPairReceiverInputDStream<String, String> createStream(
  JavaStreamingContext jssc,
  String zkQuorum,
  String groupId,
  Map<String, Integer> topics
)

/**
 * Create receiver-based stream with custom storage level.
 */
public static JavaPairReceiverInputDStream<String, String> createStream(
  JavaStreamingContext jssc,
  String zkQuorum,
  String groupId,
  Map<String, Integer> topics,
  StorageLevel storageLevel
)

Usage Example:

import org.apache.spark.storage.StorageLevel;

Map<String, Integer> topics = new HashMap<>();
topics.put("user-events", 1);
topics.put("system-logs", 2);

JavaPairReceiverInputDStream<String, String> receiverStream = KafkaUtils.createStream(
    jssc,
    "localhost:2181",
    "my-consumer-group",
    topics,
    StorageLevel.MEMORY_AND_DISK_SER_2()
);

receiverStream.foreachRDD(rdd -> {
    System.out.println("Batch size: " + rdd.count());
    rdd.foreach(record -> {
        System.out.println("Received: " + record._1() + " -> " + record._2());
    });
});
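
Receiver-based streams buffer incoming data in executor memory, so messages that were received but not yet processed can be lost if an executor fails. Enabling the write-ahead log (available since Spark 1.2) together with checkpointing mitigates this; a minimal sketch, with an illustrative checkpoint path:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

SparkConf conf = new SparkConf()
    .setAppName("ReliableKafkaReceiver")
    // Persist received blocks to stable storage before acknowledging them
    .set("spark.streaming.receiver.writeAheadLog.enable", "true");

JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
jssc.checkpoint("hdfs:///checkpoints/kafka-receiver"); // hypothetical path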

Java Batch RDD Creation

Java API for creating RDDs from Kafka with precise offset control.

/**
 * Create an RDD from Kafka using offset ranges (Java API).
 *
 * @param jsc JavaSparkContext object
 * @param keyClass Class of the keys in the Kafka records
 * @param valueClass Class of the values in the Kafka records
 * @param keyDecoderClass Class of the key decoder
 * @param valueDecoderClass Class of the value decoder
 * @param kafkaParams Kafka configuration parameters
 * @param offsetRanges Each OffsetRange corresponds to a range of offsets for a given Kafka topic/partition
 * @param <K> type of Kafka message key
 * @param <V> type of Kafka message value
 * @param <KD> type of Kafka message key decoder
 * @param <VD> type of Kafka message value decoder
 * @return RDD of (Kafka message key, Kafka message value)
 */
public static <K, V, KD extends Decoder<K>, VD extends Decoder<V>> 
JavaPairRDD<K, V> createRDD(
  JavaSparkContext jsc,
  Class<K> keyClass,
  Class<V> valueClass,
  Class<KD> keyDecoderClass,
  Class<VD> valueDecoderClass,
  Map<String, String> kafkaParams,
  OffsetRange[] offsetRanges
)

Usage Example:

import org.apache.spark.streaming.kafka.OffsetRange;

// Create offset ranges
OffsetRange[] offsetRanges = {
    OffsetRange.create("events", 0, 1000, 2000),
    OffsetRange.create("events", 1, 500, 1500),
    OffsetRange.create("logs", 0, 100, 200)
};

// Create RDD
JavaPairRDD<String, String> rdd = KafkaUtils.createRDD(
    jsc,
    String.class,
    String.class,
    StringDecoder.class,
    StringDecoder.class,
    kafkaParams,
    offsetRanges
);

// Process data
System.out.println("Total messages: " + rdd.count());
rdd.foreach(record -> {
    System.out.println("Key: " + record._1() + ", Value: " + record._2());
});
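
Because createRDD reads a fixed, replayable slice of each partition, it is convenient for backfills. For example, dumping the range to text files (the output path is illustrative):

rdd.map(record -> record._1() + "\t" + record._2())
   .saveAsTextFile("hdfs:///backfill/events"); // hypothetical output location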

Java Type System Integration

Generic Type Support

The Java API fully supports generic types with proper type safety:

// Custom key/value types (must be serializable to travel through Spark)
public class UserEvent implements java.io.Serializable {
    public String userId;
    public String eventType;
    public long timestamp;
}

// Spark instantiates decoder classes reflectively through a constructor that
// takes kafka.utils.VerifiableProperties, so a custom decoder must provide one
public class CustomDecoder implements Decoder<UserEvent> {
    public CustomDecoder(kafka.utils.VerifiableProperties props) { }

    @Override
    public UserEvent fromBytes(byte[] bytes) {
        // Custom deserialization logic (parseUserEvent is a placeholder)
        return parseUserEvent(bytes);
    }
}

// Type-safe stream creation
JavaPairInputDStream<String, UserEvent> typedStream = KafkaUtils.createDirectStream(
    jssc,
    String.class,
    UserEvent.class,
    StringDecoder.class,
    CustomDecoder.class,
    kafkaParams,
    topics
);

Working with HasOffsetRanges

Accessing offset information from Java RDDs. Note that the cast to HasOffsetRanges succeeds only on RDDs created directly by the Kafka stream, so read the ranges in the first operation applied to each RDD, before any shuffle or repartitioning:

import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;

stream.foreachRDD(rdd -> {
    // Cast to HasOffsetRanges to access offset information
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    
    for (OffsetRange offsetRange : offsetRanges) {
        System.out.printf("Topic: %s, Partition: %d, Range: [%d, %d)%n",
            offsetRange.topic(),
            offsetRange.partition(),
            offsetRange.fromOffset(),
            offsetRange.untilOffset()
        );
    }
    
    // Process the data
    rdd.foreach(record -> processRecord(record));
});
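
A common companion pattern is to persist the ending offsets once a batch has been processed, so a restarted job can resume via the fromOffsets overload shown earlier. A sketch, where saveOffset is a hypothetical helper backed by ZooKeeper, a database, or similar:

stream.foreachRDD(rdd -> {
    OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

    // Process the batch first...
    rdd.foreach(record -> processRecord(record));

    // ...then record where it ended; saving after processing yields at-least-once semantics
    for (OffsetRange r : ranges) {
        saveOffset(r.topic(), r.partition(), r.untilOffset()); // hypothetical helper
    }
});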

Java Collections Integration

Working with Java collections for configuration. The double-brace style below is compact, but it creates anonymous inner classes that retain a reference to the enclosing instance; prefer plain put/add calls (as in the earlier examples) for anything captured in a Spark closure:

import java.util.*;

// Kafka parameters using Java Maps
Map<String, String> kafkaParams = new HashMap<String, String>() {{
    put("metadata.broker.list", "localhost:9092");
    put("auto.offset.reset", "largest");
    put("group.id", "my-consumer-group");
}};

// Topics using Java Sets
Set<String> topics = new HashSet<String>() {{
    add("user-events");
    add("system-logs");
}};

// Topic partitions using Java Maps
Map<String, Integer> topicPartitions = new HashMap<String, Integer>() {{
    put("user-events", 2);
    put("system-logs", 1);
}};

Lambda Expression Support (Java 8+)

Stream Processing with Lambdas

// Direct stream with lambda processing
JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(/*...*/);

stream
    .filter(record -> record._1() != null && !record._1().isEmpty())
    .map(record -> record._1().toUpperCase() + ":" + record._2())
    .foreachRDD(rdd -> {
        rdd.foreach(System.out::println);
    });
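
A slightly longer lambda pipeline, counting messages per key within each batch using standard DStream operations (scala.Tuple2 is the pair type used throughout the Java API):

import scala.Tuple2;

stream
    .mapToPair(record -> new Tuple2<>(record._1(), 1))
    .reduceByKey((a, b) -> a + b)
    .print();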

Custom Message Handlers with Lambdas

// Lambda message handler
Function<MessageAndMetadata<String, String>, ProcessedMessage> handler = 
    mmd -> new ProcessedMessage(
        mmd.topic(),
        mmd.partition(),
        mmd.offset(),
        mmd.key(),
        mmd.message(),
        System.currentTimeMillis()
    );

JavaInputDStream<ProcessedMessage> processedStream = KafkaUtils.createDirectStream(
    jssc,
    String.class,
    String.class,
    StringDecoder.class,
    StringDecoder.class,
    ProcessedMessage.class,
    kafkaParams,
    fromOffsets,
    handler
);

Exception Handling

Java Exception Patterns

try {
    JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
        jssc, String.class, String.class,
        StringDecoder.class, StringDecoder.class,
        kafkaParams, topics
    );
    
    stream.foreachRDD(rdd -> {
        try {
            rdd.foreach(record -> {
                processRecord(record._1(), record._2());
            });
        } catch (Exception e) {
            System.err.println("Error processing RDD: " + e.getMessage());
            e.printStackTrace();
        }
    });
    
} catch (Exception e) {
    System.err.println("Error creating Kafka stream: " + e.getMessage());
    throw new RuntimeException("Failed to initialize Kafka streaming", e);
}

Handling Serialization Issues

// Custom error-handling decoder; note the VerifiableProperties constructor,
// which Spark needs in order to instantiate decoders reflectively
public class SafeStringDecoder implements Decoder<String> {
    private final StringDecoder delegate;

    public SafeStringDecoder(kafka.utils.VerifiableProperties props) {
        this.delegate = new StringDecoder(props);
    }

    @Override
    public String fromBytes(byte[] bytes) {
        try {
            return delegate.fromBytes(bytes);
        } catch (Exception e) {
            System.err.println("Failed to decode message: " + e.getMessage());
            return "<DECODE_ERROR>";
        }
    }
}
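
To use it, pass SafeStringDecoder.class in place of StringDecoder.class when creating the stream, then filter out the sentinel value downstream:

JavaPairDStream<String, String> decoded = KafkaUtils.createDirectStream(
        jssc, String.class, String.class,
        SafeStringDecoder.class, SafeStringDecoder.class,
        kafkaParams, topics)
    .filter(record -> !"<DECODE_ERROR>".equals(record._2()));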

Integration Patterns

Spring Framework Integration

@Component
public class KafkaStreamingService {
    
    @Autowired
    private JavaStreamingContext streamingContext;
    
    @Value("${kafka.brokers}")
    private String brokers;
    
    @PostConstruct
    public void initializeStreams() {
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", brokers);
        
        Set<String> topics = Collections.singleton("events");
        
        JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
            streamingContext,
            String.class, String.class,
            StringDecoder.class, StringDecoder.class,
            kafkaParams, topics
        );
        
        stream.foreachRDD(this::processRDD);
    }
    
    private void processRDD(JavaPairRDD<String, String> rdd) {
        rdd.foreach(record -> {
            // Business logic here
            handleMessage(record._1(), record._2());
        });
    }
}
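
Two caveats apply to this pattern. The method reference this::processRDD captures the whole bean inside the Spark closure, so the service (or the extracted processing logic) must be serializable. And the context must be started once, after all streams are registered, e.g. by calling streamingContext.start() at the end of initializeStreams(); a matching shutdown hook might look like:

@PreDestroy
public void shutdown() {
    // Stop gracefully, also tearing down the underlying SparkContext
    streamingContext.stop(true, true);
}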

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-streaming-kafka-assembly-2-10
