Apache Spark Streaming integration library for consuming data from Amazon Kinesis streams with fault-tolerant checkpointing and automatic shard management
—
Complete Java API support for Spark Streaming Kinesis integration, providing Java-friendly method signatures, functional interfaces, and seamless integration with Java applications and frameworks.
Create streams with custom type transformation using Java Function interfaces.
public static <T> JavaReceiverInputDStream<T> createStream(
JavaStreamingContext jssc,
String kinesisAppName,
String streamName,
String endpointUrl,
String regionName,
InitialPositionInStream initialPositionInStream,
Duration checkpointInterval,
StorageLevel storageLevel,
Function<Record, T> messageHandler,
Class<T> recordClass
);

Parameters:
- jssc — JavaStreamingContext object
- kinesisAppName — Kinesis application name for KCL coordination
- streamName — Kinesis stream name
- endpointUrl — Kinesis service endpoint URL
- regionName — AWS region name
- initialPositionInStream — Starting position (LATEST or TRIM_HORIZON)
- checkpointInterval — Checkpoint frequency using Duration
- storageLevel — Spark storage level for received data
- messageHandler — Function interface for Record transformation
- recordClass — Class object for type T

Usage Example:
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import com.amazonaws.services.kinesis.model.Record;
import org.apache.spark.api.java.function.Function;
import org.json.JSONObject;
// Define message handler for JSON processing
Function<Record, JSONObject> jsonHandler = new Function<Record, JSONObject>() {
    /**
     * Parses the record payload as a UTF-8 JSON document.
     *
     * @param record the Kinesis record to transform
     * @return the parsed JSON object
     * @throws Exception if the payload is not valid JSON
     */
    @Override
    public JSONObject call(Record record) throws Exception {
        // Copy the remaining bytes instead of calling array(): the ByteBuffer
        // from getData() may be read-only or carry a nonzero arrayOffset.
        ByteBuffer buffer = record.getData().duplicate();
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        // Decode explicitly as UTF-8 — new String(byte[]) would use the
        // platform default charset, which varies between JVMs.
        return new JSONObject(new String(bytes, StandardCharsets.UTF_8));
    }
};
// Create stream
JavaReceiverInputDStream<JSONObject> jsonStream = KinesisUtils.createStream(
jssc,
"java-json-processor",
"json-events-stream",
"https://kinesis.us-east-1.amazonaws.com",
"us-east-1",
InitialPositionInStream.LATEST,
Durations.seconds(30),
StorageLevel.MEMORY_AND_DISK_2(),
jsonHandler,
JSONObject.class
);
// Process JSON stream
jsonStream.foreachRDD(rdd -> {
rdd.foreach(json -> {
System.out.println("Event ID: " + json.getString("eventId"));
System.out.println("Timestamp: " + json.getLong("timestamp"));
});
});public static <T> JavaReceiverInputDStream<T> createStream(
JavaStreamingContext jssc,
String kinesisAppName,
String streamName,
String endpointUrl,
String regionName,
InitialPositionInStream initialPositionInStream,
Duration checkpointInterval,
StorageLevel storageLevel,
Function<Record, T> messageHandler,
Class<T> recordClass,
String awsAccessKeyId,
String awsSecretKey
);

Usage Example:
// Custom message handler with error handling
Function<Record, String> safeStringHandler = new Function<Record, String>() {
    /**
     * Decodes the record payload as UTF-8 text, surfacing failures as data
     * rather than letting them kill the receiver.
     *
     * @param record the Kinesis record to decode
     * @return the decoded text, or an "ERROR: ..." marker string on failure
     */
    @Override
    public String call(Record record) throws Exception {
        try {
            // Copy remaining bytes; array() may throw or include offset slack.
            ByteBuffer buffer = record.getData().duplicate();
            byte[] bytes = new byte[buffer.remaining()];
            buffer.get(bytes);
            // StandardCharsets.UTF_8 avoids both the platform-default charset
            // and the checked UnsupportedEncodingException of the String ctor.
            return new String(bytes, StandardCharsets.UTF_8);
        } catch (Exception e) {
            // Preserve original contract: report the failure in-band.
            return "ERROR: " + e.getMessage();
        }
    }
};
JavaReceiverInputDStream<String> secureStream = KinesisUtils.createStream(
jssc,
"secure-java-app",
"secure-text-stream",
"https://kinesis.us-west-2.amazonaws.com",
"us-west-2",
InitialPositionInStream.TRIM_HORIZON,
Durations.seconds(45),
StorageLevel.MEMORY_AND_DISK_2(),
safeStringHandler,
String.class,
System.getenv("AWS_ACCESS_KEY_ID"),
System.getenv("AWS_SECRET_ACCESS_KEY")
);

Create streams returning raw byte arrays using default message handling.
public static JavaReceiverInputDStream<byte[]> createStream(
JavaStreamingContext jssc,
String kinesisAppName,
String streamName,
String endpointUrl,
String regionName,
InitialPositionInStream initialPositionInStream,
Duration checkpointInterval,
StorageLevel storageLevel
);Usage Example:
// Create byte array stream
JavaReceiverInputDStream<byte[]> byteStream = KinesisUtils.createStream(
jssc,
"java-binary-processor",
"binary-data-stream",
"https://kinesis.ap-southeast-1.amazonaws.com",
"ap-southeast-1",
InitialPositionInStream.LATEST,
Durations.seconds(60),
StorageLevel.MEMORY_AND_DISK_2()
);
// Convert bytes to strings and process
JavaDStream<String> stringStream = byteStream.map(
bytes -> new String(bytes, "UTF-8")
);
// Filter and transform
JavaDStream<String> processedStream = stringStream
.filter(text -> text.length() > 10)
.map(text -> text.toUpperCase());
processedStream.print();public static JavaReceiverInputDStream<byte[]> createStream(
JavaStreamingContext jssc,
String kinesisAppName,
String streamName,
String endpointUrl,
String regionName,
InitialPositionInStream initialPositionInStream,
Duration checkpointInterval,
StorageLevel storageLevel,
String awsAccessKeyId,
String awsSecretKey
);@Deprecated
public static JavaReceiverInputDStream<byte[]> createStream(
JavaStreamingContext jssc,
String streamName,
String endpointUrl,
Duration checkpointInterval,
InitialPositionInStream initialPositionInStream,
StorageLevel storageLevel
);Modern Java applications can use lambda expressions for cleaner message handling:
// Lambda expression for message handling
JavaReceiverInputDStream<String> lambdaStream = KinesisUtils.createStream(
jssc,
"lambda-app",
"text-stream",
"https://kinesis.eu-west-1.amazonaws.com",
"eu-west-1",
InitialPositionInStream.LATEST,
Durations.seconds(30),
StorageLevel.MEMORY_AND_DISK_2(),
record -> new String(record.getData().array()), // Lambda expression
String.class
);
// Process with lambda expressions
lambdaStream
.filter(text -> !text.isEmpty())
.map(String::trim)
.foreachRDD(rdd -> {
rdd.collect().forEach(System.out::println);
});// Using method references for common transformations
JavaReceiverInputDStream<String> stream = KinesisUtils.createStream(
jssc, "app", "stream", endpoint, region,
InitialPositionInStream.LATEST, Durations.seconds(30),
StorageLevel.MEMORY_AND_DISK_2(),
this::parseRecord, // Method reference
String.class
);
private String parseRecord(Record record) {
return new String(record.getData().array());
}// Robust error handling in message handlers
Function<Record, String> robustHandler = new Function<Record, String>() {
    /**
     * Robust payload decoder: trims text, maps empty payloads to null so a
     * downstream filter can drop them, and never propagates an exception.
     *
     * @param record the Kinesis record to decode
     * @return trimmed UTF-8 text, null for empty payloads, or "ERROR" on failure
     */
    @Override
    public String call(Record record) throws Exception {
        try {
            // Copy remaining bytes; array() may fail on read-only buffers or
            // include arrayOffset slack bytes.
            ByteBuffer buffer = record.getData().duplicate();
            byte[] data = new byte[buffer.remaining()];
            buffer.get(data);
            // Decode explicitly as UTF-8 instead of the platform default charset.
            String text = new String(data, StandardCharsets.UTF_8);
            // Validate data
            if (text.trim().isEmpty()) {
                return null; // Filter out empty records
            }
            return text.trim();
        } catch (Exception e) {
            // Log error and return indicator
            System.err.println("Error processing record: " + e.getMessage());
            return "ERROR";
        }
    }
};import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
// Spring-managed component that wires externalized Kinesis settings into a
// Spark Streaming DStream and dispatches each micro-batch to business logic.
@Component
public class KinesisStreamProcessor {
// Injected from application properties (e.g. application.yml / .properties).
@Value("${kinesis.app.name}")
private String kinesisAppName;
@Value("${kinesis.stream.name}")
private String streamName;
@Value("${aws.kinesis.endpoint}")
private String endpointUrl;
@Value("${aws.region}")
private String region;
// Builds the Kinesis-backed stream and registers the per-batch callback.
// NOTE(review): uses the credential-free createStream overload, so auth
// presumably comes from the default AWS credential chain — confirm.
public void startProcessing(JavaStreamingContext jssc) {
JavaReceiverInputDStream<String> stream = KinesisUtils.createStream(
jssc,
kinesisAppName,
streamName,
endpointUrl,
region,
InitialPositionInStream.LATEST,
Durations.seconds(30),
StorageLevel.MEMORY_AND_DISK_2(),
this::processRecord,
String.class
);
stream.foreachRDD(this::handleBatch);
}
// Decodes a record payload to text. As written this uses the platform
// default charset and ByteBuffer.array(); see the UTF-8 handlers above for
// the safer decode pattern.
private String processRecord(Record record) {
return new String(record.getData().array());
}
// Invoked once per micro-batch RDD by foreachRDD.
private void handleBatch(JavaRDD<String> rdd) {
// Process batch with Spring services
// collect() pulls the entire batch onto the driver — only suitable for
// small batches.
rdd.collect().forEach(this::processMessage);
}
// Per-message hook for application business logic.
private void processMessage(String message) {
// Business logic here
System.out.println("Processing: " + message);
}
}When using custom objects, ensure they are serializable:
import java.io.Serializable;
/**
 * Serializable event payload safe to ship between Spark driver and executors.
 * Implements Serializable because Spark serializes objects that cross JVM
 * boundaries (closure capture, shuffles, MEMORY_AND_DISK storage).
 */
public class EventData implements Serializable {
    private static final long serialVersionUID = 1L;

    private String eventId;
    private long timestamp;
    private String payload;

    /**
     * @param eventId   unique identifier of the event
     * @param timestamp event time as epoch milliseconds (assumed — confirm producer contract)
     * @param payload   opaque event body
     */
    public EventData(String eventId, long timestamp, String payload) {
        this.eventId = eventId;
        this.timestamp = timestamp;
        this.payload = payload;
    }

    // Concrete accessors replacing the "Getters and setters..." placeholder.
    public String getEventId() {
        return eventId;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public String getPayload() {
        return payload;
    }
}
// Message handler creating serializable objects
// Message handler creating serializable objects from a UTF-8 JSON payload.
Function<Record, EventData> eventHandler = record -> {
    // Copy remaining bytes rather than array(): getData()'s ByteBuffer may be
    // read-only or carry a nonzero arrayOffset.
    ByteBuffer buffer = record.getData().duplicate();
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
    // Decode explicitly as UTF-8; the platform default charset is JVM-dependent.
    JSONObject json = new JSONObject(new String(bytes, StandardCharsets.UTF_8));
    return new EventData(
        json.getString("eventId"),
        json.getLong("timestamp"),
        json.getString("payload")
    );
};// Create custom parameterized types
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
/**
 * Super-type token: captures a full generic type (e.g. {@code List<String>})
 * at construction time via an anonymous subclass, sidestepping erasure.
 *
 * <p>Usage: {@code new TypeReference<List<String>>() {}}.getType()}
 */
public class TypeReference<T> {
    private final Type type;

    protected TypeReference() {
        // getGenericSuperclass() on the anonymous subclass yields
        // TypeReference<T> with T preserved as a reflective Type.
        ParameterizedType parameterized =
                (ParameterizedType) getClass().getGenericSuperclass();
        this.type = parameterized.getActualTypeArguments()[0];
    }

    /** @return the captured generic type argument */
    public Type getType() {
        return type;
    }
}
// Usage with complex types
TypeReference<List<String>> typeRef = new TypeReference<List<String>>() {};import java.util.Optional;
// Message handler with Optional return
// Message handler with Optional return: empty for blank or undecodable payloads.
Function<Record, Optional<String>> optionalHandler = record -> {
    try {
        // Copy remaining bytes; array() is unsafe for read-only/offset buffers.
        ByteBuffer buffer = record.getData().duplicate();
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        // Decode explicitly as UTF-8 instead of the platform default charset.
        String data = new String(bytes, StandardCharsets.UTF_8);
        return data.trim().isEmpty() ? Optional.empty() : Optional.of(data);
    } catch (Exception e) {
        // Any failure maps to an absent value so downstream filtering drops it.
        return Optional.empty();
    }
};
// Filter out empty optionals
JavaReceiverInputDStream<Optional<String>> optionalStream = KinesisUtils.createStream(
jssc, "app", "stream", endpoint, region,
InitialPositionInStream.LATEST, Durations.seconds(30),
StorageLevel.MEMORY_AND_DISK_2(),
optionalHandler,
Optional.class
);
JavaDStream<String> filteredStream = optionalStream
.filter(Optional::isPresent)
.map(Optional::get);

Install with Tessl CLI:

npx tessl i tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-assembly