Apache Spark Streaming integration with Amazon Kinesis for real-time data processing
npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-2-12@2.4.0

Apache Spark Streaming integration with Amazon Kinesis Data Streams for real-time processing of streaming data at massive scale. It provides a receiver-based approach using the Amazon Kinesis Client Library (KCL) to create input DStreams with built-in load balancing, fault tolerance, and checkpointing capabilities.
Add the dependency to your pom.xml or build.sbt. Note that this module is distributed under the Amazon Software License terms.

Maven dependency:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kinesis-asl_2.12</artifactId>
  <version>2.4.8</version>
</dependency>

SBT dependency:
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.4.8"

Core imports:

import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions._
import org.apache.spark.streaming.kinesis.SparkAWSCredentials

Legacy imports (deprecated):
import org.apache.spark.streaming.kinesis.KinesisUtils

Quick start example:

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions._
import org.apache.spark.streaming.{Seconds, Duration}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.SparkConf

// Create the Spark configuration and a streaming context with a 10-second batch interval
val sparkConf = new SparkConf().setAppName("KinesisQuickStart")
val ssc = new StreamingContext(sparkConf, Seconds(10))
// Create Kinesis input stream using Builder pattern (recommended)
val kinesisStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .endpointUrl("https://kinesis.us-west-2.amazonaws.com")
  .regionName("us-west-2")
  .initialPosition(new Latest())
  .checkpointAppName("my-spark-app")
  .checkpointInterval(Duration(30000))
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
  .build()
// Process the stream
kinesisStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // Each record is an Array[Byte]; collect() brings all records to the driver,
    // which is fine for a demo but should be avoided at production volumes
    rdd.collect().foreach { record =>
      val data = new String(record, "UTF-8")
      println(s"Received: $data")
    }
  }
}
// Start the streaming context
ssc.start()
ssc.awaitTermination()

The Spark Kinesis ASL integration consists of several key components:
KinesisInputDStream: the primary interface for creating Kinesis input streams with full configuration options via the modern Builder pattern.
object KinesisInputDStream {
  def builder: Builder

  class Builder {
    def streamingContext(ssc: StreamingContext): Builder
    def streamName(streamName: String): Builder
    def checkpointAppName(appName: String): Builder
    def endpointUrl(url: String): Builder
    def regionName(regionName: String): Builder
    def initialPosition(initialPosition: KinesisInitialPosition): Builder
    def checkpointInterval(interval: Duration): Builder
    def storageLevel(storageLevel: StorageLevel): Builder
    def build(): KinesisInputDStream[Array[Byte]]
    def buildWithMessageHandler[T: ClassTag](handler: Record => T): KinesisInputDStream[T]
  }
}
}
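Records can also be transformed at receive time via buildWithMessageHandler. A minimal sketch, reusing the ssc from the quick start (the stream name and endpoint are illustrative), that decodes each Kinesis Record into a UTF-8 string:

import com.amazonaws.services.kinesis.model.Record // the KCL record type passed to the handler
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest

// Build a DStream[String] by decoding each record's payload as it is received
val stringStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .endpointUrl("https://kinesis.us-west-2.amazonaws.com")
  .regionName("us-west-2")
  .initialPosition(new Latest())
  .checkpointAppName("my-spark-app")
  .buildWithMessageHandler(record => new String(record.getData.array(), "UTF-8"))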
SparkAWSCredentials: flexible credential providers supporting default AWS credentials, basic key/secret authentication, and STS role assumption.

sealed trait SparkAWSCredentials {
  def provider: AWSCredentialsProvider
}

object SparkAWSCredentials {
  def builder: Builder

  class Builder {
    def basicCredentials(accessKeyId: String, secretKey: String): Builder
    def stsCredentials(roleArn: String, sessionName: String): Builder
    def build(): SparkAWSCredentials
  }
}
case object DefaultCredentials extends SparkAWSCredentials
case class BasicCredentials(awsAccessKeyId: String, awsSecretKey: String) extends SparkAWSCredentials
case class STSCredentials(stsRoleArn: String, stsSessionName: String, stsExternalId: Option[String]) extends SparkAWSCredentials
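A sketch of building STS role-assumption credentials (the role ARN and session name are placeholders); the resulting object is then handed to the stream builder, which in this release accepts per-service credentials through methods such as kinesisCredentials (an assumption based on this version's Builder, not shown in the signatures above):

import org.apache.spark.streaming.kinesis.SparkAWSCredentials

// Assume an IAM role via STS; both argument values are placeholders
val roleCredentials = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/kinesis-reader", "spark-kinesis-session")
  .build()

When no builder methods are called, build() falls back to the default AWS credential provider chain (DefaultCredentials).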
KinesisInitialPositions: configuration for specifying where to start reading from Kinesis streams, supporting latest, trim horizon, and timestamp-based positioning.

// Java classes for initial position specification
class Latest extends KinesisInitialPosition
class TrimHorizon extends KinesisInitialPosition
class AtTimestamp(timestamp: java.util.Date) extends KinesisInitialPosition {
  def getTimestamp(): java.util.Date
}
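To replay from a point in time, an AtTimestamp position can be passed to the builder; a sketch (the timestamp value is illustrative):

import java.util.Date
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.AtTimestamp

// Start reading at records added on or after one hour ago
val oneHourAgo = new Date(System.currentTimeMillis() - 3600 * 1000L)
val fromTimestamp = new AtTimestamp(oneHourAgo)
// ...then pass it to the stream builder: .initialPosition(fromTimestamp)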
KinesisReadConfigurations: advanced configuration options for controlling retry behavior, timeouts, and performance tuning.

case class KinesisReadConfigurations(
  maxRetries: Int,
  retryWaitTimeMs: Long,
  retryTimeoutMs: Long
)

object KinesisReadConfigurations {
  def apply(): KinesisReadConfigurations
  def apply(ssc: StreamingContext): KinesisReadConfigurations

  val RETRY_MAX_ATTEMPTS_KEY: String
  val RETRY_WAIT_TIME_KEY: String
  val DEFAULT_MAX_RETRIES: Int
  val DEFAULT_RETRY_WAIT_TIME: String
  val DEFAULT_RETRY_TIMEOUT: Long
}
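The apply(ssc) overload reads its settings from the StreamingContext's SparkConf, so retry behavior can be tuned through the configuration keys named by the constants above. A sketch, assuming these members are accessible from application code as documented here:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisReadConfigurations

// Resolve key names through the documented constants rather than hard-coding strings
val tunedConf = new SparkConf()
  .setAppName("kinesis-retry-tuning")
  .set(KinesisReadConfigurations.RETRY_MAX_ATTEMPTS_KEY, "5")
  .set(KinesisReadConfigurations.RETRY_WAIT_TIME_KEY, "200ms") // a duration string, matching DEFAULT_RETRY_WAIT_TIME's type
val tunedSsc = new StreamingContext(tunedConf, Seconds(10))
val readConfigs = KinesisReadConfigurations(tunedSsc)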
KinesisTestUtils: utilities for testing Kinesis integration including stream creation, data generation, and cleanup operations.

class KinesisTestUtils(streamShardCount: Int = 2) {
  def streamName: String
  def createStream(): Unit
  def pushData(testData: Seq[Int], aggregate: Boolean): Map[String, Seq[(Int, String)]]
  def deleteStream(): Unit
}

object KinesisTestUtils {
  val shouldRunTests: Boolean
  val endpointUrl: String
  def isAWSCredentialsPresent: Boolean
}
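A sketch of a test lifecycle with these utilities, guarded by shouldRunTests since they operate against a real Kinesis endpoint and require AWS credentials (and assuming the test utilities are on the classpath):

import org.apache.spark.streaming.kinesis.KinesisTestUtils

if (KinesisTestUtils.shouldRunTests) {
  val testUtils = new KinesisTestUtils() // defaults to 2 shards
  try {
    testUtils.createStream()
    // Push ten integers; the returned map groups the pushed data by shard
    val dataByShard = testUtils.pushData(1 to 10, aggregate = false)
    println(s"Pushed to ${testUtils.streamName}: $dataByShard")
  } finally {
    testUtils.deleteStream() // always clean up the test stream
  }
}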
The library can throw the following exceptions:

IllegalArgumentException - for invalid configuration parameters (region names, credentials, etc.)
UnsupportedOperationException - for unsupported initial position types in legacy APIs

KinesisUtils: legacy factory methods for creating Kinesis streams (deprecated since Spark 2.2.0, still functional).
@deprecated("Use KinesisInputDStream.builder instead", "2.2.0")
object KinesisUtils {
  def createStream[T: ClassTag](
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel,
    messageHandler: Record => T
  ): ReceiverInputDStream[T]

  def createStream(
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel
  ): ReceiverInputDStream[Array[Byte]]
}

The legacy KinesisUtils factory methods are deprecated since Spark 2.2.0. Use KinesisInputDStream.builder for new development:
// Deprecated (still functional)
KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName, ...)

// Recommended
KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName(streamName)
  .checkpointAppName(appName)
  .endpointUrl(endpointUrl)
  .regionName(regionName)
  .build()