Spark Streaming integration with Amazon Kinesis for real-time data processing using the Kinesis Client Library
—
Complete Java API support with type-safe bindings, function interfaces, and seamless integration with Java Streaming contexts and data processing pipelines.
Creates a Kinesis input stream using default AWS credential discovery and byte array message handler.
/**
* Create an input stream that pulls messages from a Kinesis stream using the KCL.
* Uses DefaultAWSCredentialsProviderChain for AWS authentication.
*
* @param jssc Java StreamingContext object
* @param kinesisAppName Kinesis application name used by the KCL to update DynamoDB
* @param streamName Kinesis stream name
* @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
* @param regionName Name of region used by the KCL for DynamoDB and CloudWatch
* @param initialPositionInStream Starting position in stream (TRIM_HORIZON or LATEST)
* @param checkpointInterval Checkpoint interval for Kinesis checkpointing
* @param storageLevel Storage level for received objects (MEMORY_AND_DISK_2 recommended)
* @return JavaReceiverInputDStream<byte[]> containing raw message data
*/
public static JavaReceiverInputDStream<byte[]> createStream(
JavaStreamingContext jssc,
String kinesisAppName,
String streamName,
String endpointUrl,
String regionName,
InitialPositionInStream initialPositionInStream,
Duration checkpointInterval,
StorageLevel storageLevel
);

Usage Example:
import org.apache.spark.streaming.kinesis.KinesisUtils;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
JavaReceiverInputDStream<byte[]> stream = KinesisUtils.createStream(
jssc,
"MySparkKinesisApp",
"my-kinesis-stream",
"https://kinesis.us-east-1.amazonaws.com",
"us-east-1",
InitialPositionInStream.LATEST,
new Duration(2000),
StorageLevel.MEMORY_AND_DISK_2()
);

Creates a Kinesis input stream with explicitly provided AWS credentials.
/**
* Create an input stream with explicit AWS credentials.
* Note: Credentials will be saved in DStream checkpoints if checkpointing is enabled.
*
* @param jssc Java StreamingContext object
* @param kinesisAppName Kinesis application name used by the KCL to update DynamoDB
* @param streamName Kinesis stream name
* @param endpointUrl Url of Kinesis service
* @param regionName Name of region used by the KCL for DynamoDB and CloudWatch
* @param initialPositionInStream Starting position in stream
* @param checkpointInterval Checkpoint interval for Kinesis checkpointing
* @param storageLevel Storage level for received objects
* @param awsAccessKeyId AWS AccessKeyId (if null, uses DefaultAWSCredentialsProviderChain)
* @param awsSecretKey AWS SecretKey (if null, uses DefaultAWSCredentialsProviderChain)
* @return JavaReceiverInputDStream<byte[]> containing raw message data
*/
public static JavaReceiverInputDStream<byte[]> createStream(
JavaStreamingContext jssc,
String kinesisAppName,
String streamName,
String endpointUrl,
String regionName,
InitialPositionInStream initialPositionInStream,
Duration checkpointInterval,
StorageLevel storageLevel,
String awsAccessKeyId,
String awsSecretKey
);

Usage Example:
JavaReceiverInputDStream<byte[]> stream = KinesisUtils.createStream(
jssc,
"MySparkKinesisApp",
"my-kinesis-stream",
"https://kinesis.us-east-1.amazonaws.com",
"us-east-1",
InitialPositionInStream.LATEST,
new Duration(2000),
StorageLevel.MEMORY_AND_DISK_2(),
"AKIAIOSFODNN7EXAMPLE",
"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
);

Creates a typed Kinesis input stream with a custom message handler function.
/**
* Create an input stream with a custom message handler for type-safe data processing.
*
* @param jssc Java StreamingContext object
* @param kinesisAppName Kinesis application name used by the KCL to update DynamoDB
* @param streamName Kinesis stream name
* @param endpointUrl Url of Kinesis service
* @param regionName Name of region used by the KCL for DynamoDB and CloudWatch
* @param initialPositionInStream Starting position in stream
* @param checkpointInterval Checkpoint interval for Kinesis checkpointing
* @param storageLevel Storage level for received objects
* @param messageHandler Custom function to process Kinesis Records into type T
* @param recordClass Class object for type T (required for Java type erasure)
* @return JavaReceiverInputDStream<T> containing processed data
*/
public static <T> JavaReceiverInputDStream<T> createStream(
JavaStreamingContext jssc,
String kinesisAppName,
String streamName,
String endpointUrl,
String regionName,
InitialPositionInStream initialPositionInStream,
Duration checkpointInterval,
StorageLevel storageLevel,
Function<Record, T> messageHandler,
Class<T> recordClass
);

Usage Example:
import org.apache.spark.api.java.function.Function;
import com.amazonaws.services.kinesis.model.Record;
import com.fasterxml.jackson.databind.ObjectMapper;
// Define data class
/**
 * Example event bean for the typed {@code createStream} overload.
 * Implements {@link Serializable} so Spark can ship instances between
 * the driver and executors.
 */
public class MyEvent implements Serializable {

    private String id;
    private long timestamp;
    private String data;

    /** No-arg constructor, required for Jackson deserialization. */
    public MyEvent() {
    }

    /**
     * @param id        event identifier
     * @param timestamp event timestamp
     * @param data      event payload
     */
    public MyEvent(String id, long timestamp, String data) {
        this.id = id;
        this.timestamp = timestamp;
        this.data = data;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data;
    }
}
// Custom message handler
Function<Record, MyEvent> parseMyEvent = new Function<Record, MyEvent>() {
private final ObjectMapper mapper = new ObjectMapper();
@Override
public MyEvent call(Record record) throws Exception {
byte[] data = new byte[record.getData().remaining()];
record.getData().get(data);
String json = new String(data, "UTF-8");
return mapper.readValue(json, MyEvent.class);
}
};
JavaReceiverInputDStream<MyEvent> stream = KinesisUtils.createStream(
jssc,
"MySparkKinesisApp",
"my-events-stream",
"https://kinesis.us-east-1.amazonaws.com",
"us-east-1",
InitialPositionInStream.LATEST,
new Duration(2000),
StorageLevel.MEMORY_AND_DISK_2(),
parseMyEvent,
MyEvent.class
);

Creates a typed Kinesis input stream with both custom message handler and explicit AWS credentials.
/**
* Create an input stream with custom message handler and explicit AWS credentials.
*
* @param jssc Java StreamingContext object
* @param kinesisAppName Kinesis application name used by the KCL to update DynamoDB
* @param streamName Kinesis stream name
* @param endpointUrl Url of Kinesis service
* @param regionName Name of region used by the KCL for DynamoDB and CloudWatch
* @param initialPositionInStream Starting position in stream
* @param checkpointInterval Checkpoint interval for Kinesis checkpointing
* @param storageLevel Storage level for received objects
* @param messageHandler Custom function to process Kinesis Records into type T
* @param recordClass Class object for type T
* @param awsAccessKeyId AWS AccessKeyId
* @param awsSecretKey AWS SecretKey
* @return JavaReceiverInputDStream<T> containing processed data
*/
public static <T> JavaReceiverInputDStream<T> createStream(
JavaStreamingContext jssc,
String kinesisAppName,
String streamName,
String endpointUrl,
String regionName,
InitialPositionInStream initialPositionInStream,
Duration checkpointInterval,
StorageLevel storageLevel,
Function<Record, T> messageHandler,
Class<T> recordClass,
String awsAccessKeyId,
String awsSecretKey
);

Simplified stream creation method (deprecated since version 1.4.0).
/**
* Create an input stream using app name from SparkConf and region from endpoint.
* @deprecated use other forms of createStream
*
* @param jssc Java StreamingContext object
* @param streamName Kinesis stream name
* @param endpointUrl Endpoint url of Kinesis service
* @param checkpointInterval Checkpoint interval for Kinesis checkpointing
* @param initialPositionInStream Starting position in stream
* @param storageLevel Storage level for received objects
* @return JavaReceiverInputDStream<byte[]> containing raw message data
*/
@Deprecated
public static JavaReceiverInputDStream<byte[]> createStream(
JavaStreamingContext jssc,
String streamName,
String endpointUrl,
Duration checkpointInterval,
InitialPositionInStream initialPositionInStream,
StorageLevel storageLevel
);

Usage Example:
import org.apache.spark.streaming.kinesis.KinesisUtils;
import org.apache.spark.api.java.function.Function;
import com.amazonaws.services.kinesis.model.Record;
// Convert bytes to string
Function<Record, String> textConverter = new Function<Record, String>() {
@Override
public String call(Record record) throws Exception {
byte[] data = new byte[record.getData().remaining()];
record.getData().get(data);
return new String(data, "UTF-8");
}
};
JavaReceiverInputDStream<String> textStream = KinesisUtils.createStream(
jssc,
"TextProcessor",
"text-stream",
"https://kinesis.us-east-1.amazonaws.com",
"us-east-1",
InitialPositionInStream.LATEST,
new Duration(2000),
StorageLevel.MEMORY_AND_DISK_2(),
textConverter,
String.class
);
// Process text messages
textStream.foreachRDD(rdd -> {
rdd.foreach(text -> {
System.out.println("Received: " + text);
});
return null;
});

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.Optional;
// JSON parser with error handling
Function<Record, Optional<JsonNode>> jsonParser = new Function<Record, Optional<JsonNode>>() {
private final ObjectMapper mapper = new ObjectMapper();
@Override
public Optional<JsonNode> call(Record record) throws Exception {
try {
byte[] data = new byte[record.getData().remaining()];
record.getData().get(data);
String json = new String(data, "UTF-8");
JsonNode node = mapper.readTree(json);
return Optional.of(node);
} catch (Exception e) {
System.err.println("Failed to parse JSON: " + e.getMessage());
return Optional.empty();
}
}
};
JavaReceiverInputDStream<Optional<JsonNode>> jsonStream = KinesisUtils.createStream(
jssc,
"JsonProcessor",
"json-stream",
"https://kinesis.us-east-1.amazonaws.com",
"us-east-1",
InitialPositionInStream.LATEST,
new Duration(2000),
StorageLevel.MEMORY_AND_DISK_2(),
jsonParser,
Optional.class
);
// Filter and process valid JSON
JavaDStream<JsonNode> validJson = jsonStream.flatMap(opt -> {
return opt.isPresent() ?
Arrays.asList(opt.get()).iterator() :
Collections.emptyList().iterator();
});

// Data class that includes metadata
/**
 * Immutable example value object pairing a decoded Kinesis payload with
 * the record metadata captured when it was received.
 */
public class EnrichedMessage implements Serializable {

    private final String data;
    private final String partitionKey;
    private final String sequenceNumber;
    private final long arrivalTime;

    /**
     * @param data           decoded record payload
     * @param partitionKey   Kinesis partition key of the record
     * @param sequenceNumber Kinesis sequence number of the record
     * @param arrivalTime    approximate arrival time, epoch milliseconds
     */
    public EnrichedMessage(String data, String partitionKey,
                           String sequenceNumber, long arrivalTime) {
        this.data = data;
        this.partitionKey = partitionKey;
        this.sequenceNumber = sequenceNumber;
        this.arrivalTime = arrivalTime;
    }

    public String getData() {
        return data;
    }

    public String getPartitionKey() {
        return partitionKey;
    }

    public String getSequenceNumber() {
        return sequenceNumber;
    }

    public long getArrivalTime() {
        return arrivalTime;
    }
}
// Message handler that captures metadata
Function<Record, EnrichedMessage> enrichedHandler = new Function<Record, EnrichedMessage>() {
@Override
public EnrichedMessage call(Record record) throws Exception {
byte[] bytes = new byte[record.getData().remaining()];
record.getData().get(bytes);
String data = new String(bytes, "UTF-8");
return new EnrichedMessage(
data,
record.getPartitionKey(),
record.getSequenceNumber(),
record.getApproximateArrivalTimestamp().getTime()
);
}
};
JavaReceiverInputDStream<EnrichedMessage> enrichedStream =
KinesisUtils.createStream(
jssc,
"EnrichedProcessor",
"enriched-stream",
"https://kinesis.us-east-1.amazonaws.com",
"us-east-1",
InitialPositionInStream.LATEST,
new Duration(2000),
StorageLevel.MEMORY_AND_DISK_2(),
enrichedHandler,
EnrichedMessage.class
);

Complete example demonstrating Java API usage with word counting:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.util.Arrays;
import java.util.regex.Pattern;
public class JavaKinesisWordCount {
private static final Pattern WORD_SEPARATOR = Pattern.compile(" ");
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("JavaKinesisWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
// Create Kinesis stream
JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream(
jssc,
"JavaWordCount",
"word-stream",
"https://kinesis.us-east-1.amazonaws.com",
"us-east-1",
InitialPositionInStream.LATEST,
new Duration(2000),
StorageLevel.MEMORY_AND_DISK_2()
);
// Convert bytes to strings and split into words
JavaDStream<String> words = kinesisStream.flatMap(
new FlatMapFunction<byte[], String>() {
@Override
public Iterator<String> call(byte[] line) {
String text = new String(line);
return Arrays.asList(WORD_SEPARATOR.split(text)).iterator();
}
}
);
// Count words
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<>(s, 1);
}
}
).reduceByKey(
new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
}
);
// Print results
wordCounts.print();
jssc.start();
jssc.awaitTermination();
}
}

The Java API uses Spark's Function interfaces for message handlers:
// Main function interface for message handlers
org.apache.spark.api.java.function.Function<Record, T>
// For operations that may throw exceptions
org.apache.spark.api.java.function.Function<Record, T> {
T call(Record record) throws Exception;
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-2-10