Apache Spark Kinesis Integration providing real-time stream processing of data from Amazon Kinesis Data Streams
npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-kinesis-asl_2-13@4.0.0

Apache Spark Kinesis Integration enables real-time stream processing of data from Amazon Kinesis Data Streams using Spark Streaming. It provides a receiver-based approach that uses the Amazon Kinesis Client Library (KCL) to consume data from Kinesis streams and expose it as Spark DStreams for distributed processing.

For Maven:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kinesis-asl_2.13</artifactId>
  <version>4.0.0</version>
</dependency>

For SBT:
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "4.0.0"import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.{SparkAWSCredentials, DefaultCredentials, BasicCredentials, STSCredentials}
import org.apache.spark.streaming.kinesis.KinesisInitialPositions
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.storage.StorageLevel
import com.amazonaws.services.kinesis.model.Record

Basic usage:

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions}
import org.apache.spark.streaming.Seconds
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import com.amazonaws.services.kinesis.model.Record
import java.nio.charset.StandardCharsets
val conf = new SparkConf().setAppName("KinesisExample")
val ssc = new StreamingContext(conf, Seconds(10))
// Create Kinesis DStream using the builder pattern
val kinesisStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .checkpointAppName("my-app")
  .regionName("us-east-1")
  .initialPosition(new KinesisInitialPositions.Latest())
  .build()
// Process the stream
kinesisStream.foreachRDD { rdd =>
  rdd.foreach { bytes =>
    val data = new String(bytes, StandardCharsets.UTF_8)
    println(s"Received: $data")
  }
}
ssc.start()
ssc.awaitTermination()

The Spark Kinesis Integration is built around several key components:
KinesisInputDStream provides the core functionality for creating Kinesis DStreams, with configuration options covering authentication, checkpointing, and performance tuning.
object KinesisInputDStream {
  def builder: KinesisInputDStream.Builder
}

class Builder {
  def streamingContext(ssc: StreamingContext): Builder
  def streamName(streamName: String): Builder
  def checkpointAppName(appName: String): Builder
  def build(): KinesisInputDStream[Array[Byte]]
  def buildWithMessageHandler[T: ClassTag](handler: Record => T): KinesisInputDStream[T]
}

SparkAWSCredentials is the authentication system, supporting multiple credential providers: the default AWS provider chain, basic access keys, and STS assume-role credentials for cross-account access.
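For example, credentials built with SparkAWSCredentials.builder (listed below) can be handed to the stream builder. This is a minimal sketch that reuses ssc from the quick start; the role ARN and session name are placeholders, and the kinesisCredentials method is part of the full Builder API rather than the abridged listing above, so treat its exact name as an assumption.

// Sketch: read the stream under an assumed IAM role (placeholder ARN and session name)
val roleCredentials = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/example-role", "spark-kinesis-session")
  .build()

val authenticatedStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .checkpointAppName("my-app")
  .regionName("us-east-1")
  .initialPosition(new KinesisInitialPositions.Latest())
  .kinesisCredentials(roleCredentials) // assumed Builder method for supplying SparkAWSCredentials
  .build()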
sealed trait SparkAWSCredentials {
  def provider: AWSCredentialsProvider
}

object SparkAWSCredentials {
  def builder: SparkAWSCredentials.Builder
}

case object DefaultCredentials extends SparkAWSCredentials

case class BasicCredentials(awsAccessKeyId: String, awsSecretKey: String) extends SparkAWSCredentials
case class STSCredentials(
  stsRoleArn: String,
  stsSessionName: String,
  stsExternalId: Option[String],
  longLivedCreds: SparkAWSCredentials
) extends SparkAWSCredentials

KinesisInitialPositions configures where to start reading a Kinesis stream: the latest records, the oldest available records (trim horizon), or records from a specific timestamp.
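For example, to replay records from a point in time, pass an AtTimestamp position (listed below) to the builder. A minimal sketch that reuses ssc from the quick start; the timestamp is a placeholder:

import java.util.Date
import java.time.Instant

// Sketch: start reading at records added on or after a given (placeholder) timestamp
val replayStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .checkpointAppName("my-app")
  .regionName("us-east-1")
  .initialPosition(new KinesisInitialPositions.AtTimestamp(
    Date.from(Instant.parse("2024-01-01T00:00:00Z"))))
  .build()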
interface KinesisInitialPosition {
  def getPosition(): InitialPositionInStream
}

object KinesisInitialPositions {
  class Latest extends KinesisInitialPosition
  class TrimHorizon extends KinesisInitialPosition
  class AtTimestamp(timestamp: Date) extends KinesisInitialPosition
}

KinesisReadConfigurations and the related builder options provide advanced configuration for retry logic, timeouts, storage levels, metrics, and checkpoint intervals to tune performance and reliability.
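KinesisReadConfigurations(ssc) reads its values from SparkConf, so retries can be tuned when the streaming context is created. A minimal sketch, assuming the spark.streaming.kinesis.retry.* configuration keys used by recent Spark releases:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch: tune how record re-reads from Kinesis are retried (assumed configuration keys)
val tunedConf = new SparkConf()
  .setAppName("KinesisTunedExample")
  .set("spark.streaming.kinesis.retry.waitTime", "500ms") // wait between retry attempts
  .set("spark.streaming.kinesis.retry.maxAttempts", "5")  // attempts before failing the read

val tunedSsc = new StreamingContext(tunedConf, Seconds(10))
// The retry timeout itself defaults to the batch duration (10 seconds here).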
case class KinesisReadConfigurations(
  maxRetries: Int,
  retryWaitTimeMs: Long,
  retryTimeoutMs: Long
)

object KinesisReadConfigurations {
  def apply(): KinesisReadConfigurations
  def apply(ssc: StreamingContext): KinesisReadConfigurations
}

KinesisUtilsPythonHelper contains internal utilities for PySpark integration, providing a Python-friendly interface to the Scala/Java APIs.
private class KinesisUtilsPythonHelper {
  def createStream(
    jssc: JavaStreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: Int,
    checkpointInterval: Duration,
    metricsLevel: Int,
    storageLevel: StorageLevel,
    awsAccessKeyId: String,
    awsSecretKey: String,
    stsAssumeRoleArn: String,
    stsSessionName: String,
    stsExternalId: String
  ): JavaReceiverInputDStream[Array[Byte]]
}

// Core DStream type
class KinesisInputDStream[T: ClassTag] extends ReceiverInputDStream[T]
// Sequence number tracking for fault tolerance
case class SequenceNumberRange(
  streamName: String,
  shardId: String,
  fromSeqNumber: String,
  toSeqNumber: String,
  recordCount: Int
)

case class SequenceNumberRanges(ranges: Seq[SequenceNumberRange]) {
  def isEmpty: Boolean
  def nonEmpty: Boolean
}

// Fault-tolerant RDD implementation
class KinesisBackedBlockRDD[T: ClassTag](
  sc: SparkContext,
  regionName: String,
  endpointUrl: String,
  blockIds: Array[BlockId],
  seqNumRanges: Array[SequenceNumberRanges],
  isBlockIdValid: Array[Boolean],
  messageHandler: Record => T,
  kinesisCreds: SparkAWSCredentials,
  kinesisReadConfigs: KinesisReadConfigurations
) extends BlockRDD[T]
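Putting these together, buildWithMessageHandler yields a typed KinesisInputDStream instead of raw byte arrays. A minimal sketch (reusing ssc from the quick start) that pairs each record's partition key with its UTF-8 payload:

import java.nio.charset.StandardCharsets
import com.amazonaws.services.kinesis.model.Record

// Sketch: keep the partition key alongside the decoded payload
val keyedStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .checkpointAppName("my-app")
  .regionName("us-east-1")
  .initialPosition(new KinesisInitialPositions.Latest())
  .buildWithMessageHandler { (record: Record) =>
    // Copy the record's ByteBuffer into a byte array before decoding
    val buffer = record.getData()
    val bytes = new Array[Byte](buffer.remaining())
    buffer.get(bytes)
    (record.getPartitionKey(), new String(bytes, StandardCharsets.UTF_8))
  }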