Apache Spark Streaming integration library for consuming data from Amazon Kinesis streams with fault-tolerant checkpointing and automatic shard management
npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-assembly@1.6.0

Apache Spark Streaming integration library for consuming data from Amazon Kinesis streams. Provides fault-tolerant, scalable stream processing with automatic checkpointing, shard management, and configurable parallelism through the Kinesis Client Library (KCL).
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kinesis-asl-assembly_2.11</artifactId>
  <version>1.6.2</version>
</dependency>
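For sbt builds, the equivalent dependency line would be as follows (assuming the same artifact coordinates and version as the Maven snippet above):

libraryDependencies += "org.apache.spark" % "spark-streaming-kinesis-asl-assembly_2.11" % "1.6.2"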
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Duration
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.model.Record

import org.apache.spark.streaming.kinesis.KinesisUtils;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.storage.StorageLevel
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
val conf = new SparkConf().setAppName("KinesisExample")
val ssc = new StreamingContext(conf, Seconds(10))
// Create Kinesis stream
val kinesisStream = KinesisUtils.createStream(
  ssc,
  "myKinesisApp",                             // KCL application name (also names the DynamoDB checkpoint table)
  "myStreamName",                             // Kinesis stream name
  "https://kinesis.us-east-1.amazonaws.com",  // Kinesis endpoint URL
  "us-east-1",                                // region name
  InitialPositionInStream.LATEST,             // where to start reading the stream
  Seconds(30),                                // checkpoint interval
  StorageLevel.MEMORY_AND_DISK_2              // replicated storage for received blocks
)
// Process the stream
kinesisStream.map(new String(_)).print()
ssc.start()
ssc.awaitTermination()

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.kinesis.KinesisUtils;
import org.apache.spark.storage.StorageLevel;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
SparkConf conf = new SparkConf().setAppName("JavaKinesisExample");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
// Create Kinesis stream
JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream(
    jssc,
    "myKinesisApp",                             // KCL application name
    "myStreamName",                             // Kinesis stream name
    "https://kinesis.us-east-1.amazonaws.com",  // Kinesis endpoint URL
    "us-east-1",                                // region name
    InitialPositionInStream.LATEST,             // where to start reading the stream
    Durations.seconds(30),                      // checkpoint interval
    StorageLevel.MEMORY_AND_DISK_2()            // replicated storage for received blocks
);
// Process the stream
kinesisStream.map(bytes -> new String(bytes)).print();
jssc.start();
jssc.awaitTermination();

The Spark Streaming Kinesis integration is built around several key components:
Core functionality for creating Kinesis input streams, with configuration options covering custom message handlers, explicit credentials, and both Scala and Java APIs; a usage sketch follows the signatures below.
object KinesisUtils {
  // Generic stream creation with custom message handler
  def createStream[T: ClassTag](
      ssc: StreamingContext,
      kinesisAppName: String,
      streamName: String,
      endpointUrl: String,
      regionName: String,
      initialPositionInStream: InitialPositionInStream,
      checkpointInterval: Duration,
      storageLevel: StorageLevel,
      messageHandler: Record => T
  ): ReceiverInputDStream[T]

  // Default byte array stream creation
  def createStream(
      ssc: StreamingContext,
      kinesisAppName: String,
      streamName: String,
      endpointUrl: String,
      regionName: String,
      initialPositionInStream: InitialPositionInStream,
      checkpointInterval: Duration,
      storageLevel: StorageLevel
  ): ReceiverInputDStream[Array[Byte]]
}
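As a sketch of the message-handler variant, the handler below pairs each record's partition key with its payload decoded as a string. It reuses ssc from the quick start; the pairing and the assumption that each record's ByteBuffer is array-backed are illustrative, not part of the library's API:

// The handler runs inside the receiver, so keep it cheap and serializable
val keyedStream = KinesisUtils.createStream(
  ssc,
  "myKinesisApp",
  "myStreamName",
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.TRIM_HORIZON,
  Seconds(30),
  StorageLevel.MEMORY_AND_DISK_2,
  (record: Record) => (record.getPartitionKey, new String(record.getData.array()))
)
keyedStream.print()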
Authentication and credential handling for AWS Kinesis access, supporting both the default AWS credential provider chain and explicitly supplied keys; an illustrative call follows the signature below.

// Stream creation with explicit AWS credentials
def createStream[T: ClassTag](
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel,
    messageHandler: Record => T,
    awsAccessKeyId: String,
    awsSecretKey: String
): ReceiverInputDStream[T]
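A minimal sketch of the explicit-credentials variant, reading keys from environment variables rather than hardcoding them (the variable names are illustrative; prefer the default provider chain or IAM roles where possible):

val credentialedStream = KinesisUtils.createStream(
  ssc,
  "myKinesisApp",
  "myStreamName",
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.LATEST,
  Seconds(30),
  StorageLevel.MEMORY_AND_DISK_2,
  (record: Record) => record.getData.array(),  // pass raw bytes through
  sys.env("AWS_ACCESS_KEY_ID"),                // illustrative env var names
  sys.env("AWS_SECRET_ACCESS_KEY")
)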
Complete Java API compatibility, with functional interfaces and Java-friendly method signatures for integration with Java applications; a usage sketch follows the signature below.

// Java API for generic stream creation
public static <T> JavaReceiverInputDStream<T> createStream(
    JavaStreamingContext jssc,
    String kinesisAppName,
    String streamName,
    String endpointUrl,
    String regionName,
    InitialPositionInStream initialPositionInStream,
    Duration checkpointInterval,
    StorageLevel storageLevel,
    Function<Record, T> messageHandler,
    Class<T> recordClass
);
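A minimal Java sketch of the generic variant, extracting each record's partition key; it reuses jssc from the Java quick start, and the stream and application names are placeholders:

import com.amazonaws.services.kinesis.model.Record;

JavaReceiverInputDStream<String> partitionKeys = KinesisUtils.createStream(
    jssc,
    "myKinesisApp",
    "myStreamName",
    "https://kinesis.us-east-1.amazonaws.com",
    "us-east-1",
    InitialPositionInStream.LATEST,
    Durations.seconds(30),
    StorageLevel.MEMORY_AND_DISK_2(),
    (Record record) -> record.getPartitionKey(),  // message handler
    String.class
);
partitionKeys.print();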
Built-in fault tolerance mechanisms that track Kinesis sequence numbers per shard, so processed ranges can be identified and recovered after failures.

// Sequence number range for fault tolerance
case class SequenceNumberRange(
    streamName: String,
    shardId: String,
    fromSeqNumber: String,
    toSeqNumber: String
)

case class SequenceNumberRanges(ranges: Seq[SequenceNumberRange]) {
  def isEmpty(): Boolean
  def nonEmpty(): Boolean
}
// AWS credential wrapper for serialization
case class SerializableAWSCredentials(
    accessKeyId: String,
    secretKey: String
) extends AWSCredentials {
  def getAWSAccessKeyId: String
  def getAWSSecretKey: String
}

// Message handler function type
type MessageHandler[T] = Record => T

// Java function interface for message handling
import org.apache.spark.api.java.function.{Function => JFunction}
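As a small sketch of the MessageHandler type, here is a standalone decoder that could be passed as the messageHandler argument to the generic createStream; the UTF-8 decoding and array-backed buffer are assumptions about the producer:

import java.nio.charset.StandardCharsets
import com.amazonaws.services.kinesis.model.Record

// A reusable handler matching MessageHandler[String], i.e. Record => String
val utf8Handler: Record => String =
  record => new String(record.getData.array(), StandardCharsets.UTF_8)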