Apache Spark Streaming integration with Amazon Kinesis for real-time processing of streaming data
npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-kinesis-asl_2-12@3.5.0

Apache Spark Streaming integration with Amazon Kinesis for real-time processing of streaming data at massive scale. This connector provides a Kinesis receiver that creates input DStreams using the Amazon Kinesis Client Library (KCL), enabling load-balancing, fault-tolerance, and checkpointing capabilities through Workers, Checkpoints, and Shard Leases.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kinesis-asl_2.12</artifactId>
<version>3.5.0</version>
</dependency>

For Scala:

import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions
import org.apache.spark.streaming.kinesis.SparkAWSCredentials
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.JavaStreamingContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Duration
import com.amazonaws.services.kinesis.model.Record
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import scala.reflect.ClassTag

For Java:
import org.apache.spark.streaming.kinesis.KinesisInputDStream;
import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
import org.apache.spark.streaming.kinesis.SparkAWSCredentials;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;

For Python:
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

Quick-start example (Scala):

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
val ssc = new StreamingContext(sparkConf, Seconds(2))
// Create Kinesis DStream using builder pattern
val kinesisStream = KinesisInputDStream.builder
.streamingContext(ssc)
.streamName("myKinesisStream")
.checkpointAppName("myKinesisApp")
.regionName("us-east-1")
.initialPosition(new Latest())
.build()
// Process the stream
kinesisStream.map(new String(_))
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.print()
ssc.start()
ssc.awaitTermination()

The Spark Kinesis connector uses the following key components:
Create Kinesis input streams with comprehensive configuration options including credentials, initial positions, and metrics.
object KinesisInputDStream {
def builder: Builder
def defaultMessageHandler: Record => Array[Byte]
}
class Builder {
def streamingContext(ssc: StreamingContext): Builder
def streamingContext(jssc: JavaStreamingContext): Builder
def streamName(streamName: String): Builder
def checkpointAppName(appName: String): Builder
def endpointUrl(url: String): Builder
def regionName(regionName: String): Builder
def initialPosition(initialPosition: KinesisInitialPosition): Builder
def checkpointInterval(interval: Duration): Builder
def storageLevel(storageLevel: StorageLevel): Builder
def kinesisCredentials(credentials: SparkAWSCredentials): Builder
def dynamoDBCredentials(credentials: SparkAWSCredentials): Builder
def cloudWatchCredentials(credentials: SparkAWSCredentials): Builder
def metricsLevel(metricsLevel: MetricsLevel): Builder
def metricsEnabledDimensions(metricsEnabledDimensions: Set[String]): Builder
def build(): KinesisInputDStream[Array[Byte]]
def buildWithMessageHandler[T: ClassTag](handler: Record => T): KinesisInputDStream[T]
}

Stream Creation and Configuration
Configure where to start reading from Kinesis streams with support for latest, earliest, and timestamp-based positioning.
object KinesisInitialPositions {
class Latest() extends KinesisInitialPosition
class TrimHorizon() extends KinesisInitialPosition
class AtTimestamp(timestamp: Date) extends KinesisInitialPosition
}

Initial Position Configuration
Flexible credential management supporting default provider chains, basic credentials, and STS assume role patterns.
object SparkAWSCredentials {
def builder: Builder
}
class Builder {
def basicCredentials(accessKeyId: String, secretKey: String): Builder
def stsCredentials(roleArn: String, sessionName: String): Builder
def build(): SparkAWSCredentials
}

Python interface for creating Kinesis streams with simplified parameter handling.
class KinesisUtils:
@staticmethod
def createStream(
ssc: StreamingContext,
kinesisAppName: str,
streamName: str,
endpointUrl: str,
regionName: str,
initialPositionInStream: int,
checkpointInterval: int,
**kwargs
) -> DStream

// Core interface types
trait KinesisInitialPosition {
def getPosition(): InitialPositionInStream
}
sealed trait SparkAWSCredentials extends Serializable {
def provider: AWSCredentialsProvider
}
// Concrete credential implementations
case object DefaultCredentials extends SparkAWSCredentials
case class BasicCredentials(
awsAccessKeyId: String,
awsSecretKey: String
) extends SparkAWSCredentials
case class STSCredentials(
stsRoleArn: String,
stsSessionName: String,
stsExternalId: Option[String] = None,
longLivedCreds: SparkAWSCredentials = DefaultCredentials
) extends SparkAWSCredentials