Spark Streaming integration with Amazon Kinesis for real-time data processing using the Kinesis Client Library
npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-2-10@1.6.0

Spark Streaming integration with Amazon Kinesis for real-time data processing using the Kinesis Client Library (KCL). This library enables Spark Streaming applications to consume data from Amazon Kinesis streams with automatic load-balancing, fault-tolerance, and checkpointing capabilities.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kinesis-asl_2.10</artifactId>
<version>1.6.3</version>
</dependency>

libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.6.3"

import org.apache.spark.streaming.kinesis._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Duration, StreamingContext}

import org.apache.spark.streaming.kinesis.KinesisUtils;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import org.apache.spark.streaming.kinesis._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Duration, StreamingContext}
// Create Spark Streaming context
val ssc = new StreamingContext(sparkContext, new Duration(2000))
// Create Kinesis input stream
val kinesisStream = KinesisUtils.createStream(
ssc,
"MyKinesisApp", // KCL application name
"MyKinesisStream", // Kinesis stream name
"https://kinesis.us-east-1.amazonaws.com", // Endpoint URL
"us-east-1", // Region name
InitialPositionInStream.LATEST, // Starting position
new Duration(2000), // Checkpoint interval
StorageLevel.MEMORY_AND_DISK_2 // Storage level
)
// Process the stream
kinesisStream.foreachRDD { rdd =>
rdd.foreach { byteArray =>
println(new String(byteArray))
}
}
ssc.start()
ssc.awaitTermination()

import org.apache.spark.streaming.kinesis.KinesisUtils;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
// Create Java Streaming context
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
// Create Kinesis input stream
JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream(
jssc,
"MyKinesisApp", // KCL application name
"MyKinesisStream", // Kinesis stream name
"https://kinesis.us-east-1.amazonaws.com", // Endpoint URL
"us-east-1", // Region name
InitialPositionInStream.LATEST, // Starting position
new Duration(2000), // Checkpoint interval
StorageLevel.MEMORY_AND_DISK_2() // Storage level
);
// Process the stream
kinesisStream.foreachRDD(rdd -> {
rdd.foreach(byteArray -> {
System.out.println(new String(byteArray));
});
});
jssc.start();
jssc.awaitTermination();

The Spark Streaming Kinesis ASL integration is built around several key components:
Primary factory methods for creating Kinesis input streams with various configuration options including custom message handlers, explicit AWS credentials, and different storage levels.
// Basic stream creation with default byte array handler
def createStream(
ssc: StreamingContext,
kinesisAppName: String,
streamName: String,
endpointUrl: String,
regionName: String,
initialPositionInStream: InitialPositionInStream,
checkpointInterval: Duration,
storageLevel: StorageLevel
): ReceiverInputDStream[Array[Byte]]
// Stream creation with custom message handler
def createStream[T: ClassTag](
ssc: StreamingContext,
kinesisAppName: String,
streamName: String,
endpointUrl: String,
regionName: String,
initialPositionInStream: InitialPositionInStream,
checkpointInterval: Duration,
storageLevel: StorageLevel,
messageHandler: Record => T
): ReceiverInputDStream[T]

Advanced message processing capabilities including custom message handlers, type-safe transformations, and integration with Kinesis Record metadata such as partition keys and sequence numbers.
// Custom message handler function type
type MessageHandler[T] = com.amazonaws.services.kinesis.model.Record => T
// Default message handler for byte arrays
def defaultMessageHandler(record: Record): Array[Byte]

Complete Java API support with type-safe bindings, function interfaces, and seamless integration with Java Streaming contexts and data processing pipelines.
// Java API stream creation
public static JavaReceiverInputDStream<byte[]> createStream(
JavaStreamingContext jssc,
String kinesisAppName,
String streamName,
String endpointUrl,
String regionName,
InitialPositionInStream initialPositionInStream,
Duration checkpointInterval,
StorageLevel storageLevel
);
// Java API with custom message handler
public static <T> JavaReceiverInputDStream<T> createStream(
JavaStreamingContext jssc,
String kinesisAppName,
String streamName,
String endpointUrl,
String regionName,
InitialPositionInStream initialPositionInStream,
Duration checkpointInterval,
StorageLevel storageLevel,
Function<Record, T> messageHandler,
Class<T> recordClass
);

Flexible AWS authentication options including default credential provider chains, explicit credential specification, and IAM role integration for secure access to Kinesis streams.
// Stream creation with explicit AWS credentials
def createStream[T: ClassTag](
ssc: StreamingContext,
kinesisAppName: String,
streamName: String,
endpointUrl: String,
regionName: String,
initialPositionInStream: InitialPositionInStream,
checkpointInterval: Duration,
storageLevel: StorageLevel,
messageHandler: Record => T,
awsAccessKeyId: String,
awsSecretKey: String
): ReceiverInputDStream[T]

// AWS Credentials wrapper for serialization
case class SerializableAWSCredentials(
accessKeyId: String,
secretKey: String
) extends AWSCredentials
// Sequence number range for fault tolerance
case class SequenceNumberRange(
streamName: String,
shardId: String,
fromSeqNumber: String,
toSeqNumber: String
)
// Collection of sequence number ranges
case class SequenceNumberRanges(ranges: Seq[SequenceNumberRange]) {
def isEmpty(): Boolean
def nonEmpty(): Boolean
}
// External AWS KCL types (from com.amazonaws.services.kinesis.clientlibrary.lib.worker)
// InitialPositionInStream enum values:
// - InitialPositionInStream.LATEST: Start from most recent records
// - InitialPositionInStream.TRIM_HORIZON: Start from oldest available records (up to 24 hours)