
Files

docs/
  aws-credentials.md
  configuration.md
  index.md
  stream-creation.md
  stream-positioning.md
  testing.md
tile.json

tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-2-12

Apache Spark Streaming integration with Amazon Kinesis for real-time data processing

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.spark/spark-streaming-kinesis-asl_2.12@2.4.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-2-12@2.4.0


Spark Streaming Kinesis ASL

Apache Spark Streaming integration with Amazon Kinesis Data Streams for real-time processing of streaming data at massive scale. It provides a receiver-based approach using the Amazon Kinesis Client Library (KCL) to create input DStreams with built-in load balancing, fault tolerance, and checkpointing capabilities.

Package Information

  • Package Name: spark-streaming-kinesis-asl_2.12
  • Package Type: maven
  • Language: Scala (with Java support)
  • Group ID: org.apache.spark
  • Installation: Add to pom.xml or build.sbt; note that this module is distributed under the Amazon Software License (ASL) rather than Apache 2.0, which is why it ships as a separate artifact

Maven dependency:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kinesis-asl_2.12</artifactId>
  <version>2.4.8</version>
</dependency>

SBT dependency:

libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.4.8"

Core Imports

import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions._
import org.apache.spark.streaming.kinesis.SparkAWSCredentials

Legacy imports (deprecated):

import org.apache.spark.streaming.kinesis.KinesisUtils

Basic Usage

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions._

// Create the Spark configuration and a streaming context with a 10-second batch interval
val sparkConf = new SparkConf().setAppName("KinesisExample")
val ssc = new StreamingContext(sparkConf, Seconds(10))

// Create Kinesis input stream using Builder pattern (recommended)
val kinesisStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .endpointUrl("https://kinesis.us-west-2.amazonaws.com")
  .regionName("us-west-2")
  .initialPosition(new Latest())
  .checkpointAppName("my-spark-app")
  .checkpointInterval(Duration(30000))
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
  .build()

// Process the stream
kinesisStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // Each record arrives as Array[Byte]; collect() pulls everything to the
    // driver, which is fine for a demo but not for production workloads
    rdd.collect().foreach { record =>
      val data = new String(record, "UTF-8")
      println(s"Received: $data")
    }
  }
}

// Start the streaming context
ssc.start()
ssc.awaitTermination()

Architecture

The Spark Kinesis ASL integration consists of several key components:

  • KinesisInputDStream: Core DStream implementation that creates Kinesis-backed RDDs
  • KinesisReceiver: Receiver implementation using Amazon KCL for consuming data
  • Builder Pattern: Modern configuration API for flexible stream creation
  • AWS Credentials: Pluggable credential providers supporting various authentication methods
  • Checkpointing: Automatic state management through DynamoDB for fault tolerance
  • Recovery: Sequence number-based recovery from DynamoDB checkpoints; note that the receiver-based approach provides at-least-once (not exactly-once) processing semantics

Capabilities

Stream Creation

Primary interface for creating Kinesis input streams with full configuration options using the modern Builder pattern.

object KinesisInputDStream {
  def builder: Builder

  class Builder {
    // Required
    def streamingContext(ssc: StreamingContext): Builder
    def streamName(streamName: String): Builder
    def checkpointAppName(appName: String): Builder
    // Optional (defaults applied when omitted)
    def endpointUrl(url: String): Builder
    def regionName(regionName: String): Builder
    def initialPosition(initialPosition: KinesisInitialPosition): Builder
    def checkpointInterval(interval: Duration): Builder
    def storageLevel(storageLevel: StorageLevel): Builder

    def build(): KinesisInputDStream[Array[Byte]]
    def buildWithMessageHandler[T: ClassTag](handler: Record => T): KinesisInputDStream[T]
  }
}
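
Records arrive as Array[Byte] by default; buildWithMessageHandler lets you transform each KCL Record (including its metadata) on the receiver before it is stored. A minimal sketch, reusing the ssc and the placeholder names from Basic Usage:

import com.amazonaws.services.kinesis.model.Record

// Map each record to a (partitionKey, payload) pair at receive time
val typedStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .checkpointAppName("my-spark-app")
  .regionName("us-west-2")
  .buildWithMessageHandler { record: Record =>
    val bytes = new Array[Byte](record.getData.remaining())
    record.getData.get(bytes) // copy the payload out of the ByteBuffer
    (record.getPartitionKey, new String(bytes, "UTF-8"))
  }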

See docs/stream-creation.md.

AWS Credentials Management

Flexible credential providers supporting default AWS credentials, basic key/secret authentication, and STS role assumption.

sealed trait SparkAWSCredentials {
  def provider: AWSCredentialsProvider
}

object SparkAWSCredentials {
  def builder: Builder
  
  class Builder {
    def basicCredentials(accessKeyId: String, secretKey: String): Builder
    def stsCredentials(roleArn: String, sessionName: String): Builder
    def build(): SparkAWSCredentials
  }
}

case object DefaultCredentials extends SparkAWSCredentials
case class BasicCredentials(awsAccessKeyId: String, awsSecretKey: String) extends SparkAWSCredentials
case class STSCredentials(stsRoleArn: String, stsSessionName: String, stsExternalId: Option[String]) extends SparkAWSCredentials
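
These providers plug into the stream builder; in Spark 2.4 the builder exposes kinesisCredentials (plus analogous dynamoDBCredentials and cloudWatchCredentials for the KCL's auxiliary services). A sketch with a placeholder role ARN:

// Assume an IAM role via STS for the Kinesis reads
val roleCredentials = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/my-kinesis-role", "my-spark-session")
  .build()

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .checkpointAppName("my-spark-app")
  .regionName("us-west-2")
  .kinesisCredentials(roleCredentials)
  .build()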

See docs/aws-credentials.md.

Initial Stream Positioning

Configuration for specifying where to start reading from Kinesis streams, supporting latest, trim horizon, and timestamp-based positioning.

// Java classes implementing the KinesisInitialPosition interface
class Latest extends KinesisInitialPosition
class TrimHorizon extends KinesisInitialPosition

class AtTimestamp(timestamp: java.util.Date) extends KinesisInitialPosition {
  def getTimestamp(): java.util.Date
}
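
For example, to replay records added during the last hour (a minimal sketch; the remaining builder calls follow Basic Usage):

import java.util.Date
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.AtTimestamp

// Start reading from a point in time one hour in the past
val oneHourAgo = new Date(System.currentTimeMillis() - 60 * 60 * 1000)

val replayStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .checkpointAppName("my-spark-app")
  .regionName("us-west-2")
  .initialPosition(new AtTimestamp(oneHourAgo))
  .build()

Note that the initial position only applies to a fresh application: once checkpoints exist in DynamoDB, the KCL resumes from the saved sequence numbers instead.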

See docs/stream-positioning.md.

Configuration Management

Advanced configuration options for controlling retry behavior, timeouts, and performance tuning.

case class KinesisReadConfigurations(
  maxRetries: Int,
  retryWaitTimeMs: Long, 
  retryTimeoutMs: Long
)

object KinesisReadConfigurations {
  def apply(): KinesisReadConfigurations
  def apply(ssc: StreamingContext): KinesisReadConfigurations
  
  val RETRY_MAX_ATTEMPTS_KEY: String
  val RETRY_WAIT_TIME_KEY: String
  val DEFAULT_MAX_RETRIES: Int
  val DEFAULT_RETRY_WAIT_TIME: String
  val DEFAULT_RETRY_TIMEOUT: Long
}
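
KinesisReadConfigurations(ssc) reads the retry settings from the SparkConf. A sketch of tuning them; the key strings below are assumed from the Spark 2.4 sources and should be verified against your version:

import org.apache.spark.SparkConf

// Tune Kinesis read retries (key names assumed; see the RETRY_* constants above)
val conf = new SparkConf()
  .setAppName("KinesisExample")
  .set("spark.streaming.kinesis.retry.maxAttempts", "5")   // RETRY_MAX_ATTEMPTS_KEY
  .set("spark.streaming.kinesis.retry.waitTime", "200ms")  // RETRY_WAIT_TIME_KEY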

See docs/configuration.md.

Testing Utilities

Utilities for testing Kinesis integration including stream creation, data generation, and cleanup operations.

class KinesisTestUtils(streamShardCount: Int = 2) {
  def streamName: String
  def createStream(): Unit
  def pushData(testData: Seq[Int], aggregate: Boolean): Map[String, Seq[(Int, String)]]
  def deleteStream(): Unit
}

object KinesisTestUtils {
  val shouldRunTests: Boolean
  val endpointUrl: String
  def isAWSCredentialsPresent: Boolean
}
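
A typical test creates a short-lived stream, pushes known data, and always cleans up. A sketch assuming AWS credentials are available in the environment (these utilities talk to a real Kinesis endpoint):

// Skip entirely when no AWS credentials are configured
if (KinesisTestUtils.shouldRunTests) {
  val testUtils = new KinesisTestUtils() // 2 shards by default
  testUtils.createStream()
  try {
    // Push integers 1..10 as individual (non-aggregated) records
    val shardIdToData = testUtils.pushData(1 to 10, aggregate = false)
    // ... consume testUtils.streamName with a KinesisInputDStream and assert ...
  } finally {
    testUtils.deleteStream()
  }
}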

See docs/testing.md.

Error Handling

The library can throw the following exceptions:

  • IllegalArgumentException - For invalid configuration parameters (region names, credentials, etc.)
  • UnsupportedOperationException - For unsupported initial position types in legacy APIs
  • AWS SDK exceptions - For authentication failures, network issues, and service errors
  • KCL exceptions - For Kinesis-specific operational errors during stream processing
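
For instance, an invalid region name surfaces as an IllegalArgumentException (a minimal sketch; depending on the Spark version the check may fire at build() or when the receiver starts):

try {
  KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName("my-kinesis-stream")
    .checkpointAppName("my-spark-app")
    .regionName("not-a-real-region") // rejected during validation
    .build()
} catch {
  case e: IllegalArgumentException =>
    println(s"Invalid Kinesis configuration: ${e.getMessage}")
}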

Legacy KinesisUtils (Deprecated)

Legacy factory methods for creating Kinesis streams (deprecated since Spark 2.2.0, still functional).

@deprecated("Use KinesisInputDStream.builder instead", "2.2.0")
object KinesisUtils {
  def createStream[T: ClassTag](
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String, 
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel,
    messageHandler: Record => T
  ): ReceiverInputDStream[T]
  
  def createStream[T: ClassTag](
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String, 
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel
  ): ReceiverInputDStream[Array[Byte]]
}

Migration Notes

The legacy KinesisUtils factory methods are deprecated since Spark 2.2.0. Use KinesisInputDStream.builder for new development:

// Deprecated (still functional)
KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName, ...)

// Recommended
KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName(streamName)
  .checkpointAppName(appName)
  .endpointUrl(endpointUrl)
  .regionName(regionName)
  .build()
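
When migrating, the legacy InitialPositionInStream enum values map onto the new position classes; AT_TIMESTAMP was never accepted by the legacy createStream (see the UnsupportedOperationException under Error Handling) and requires the builder:

// InitialPositionInStream.LATEST       -> new Latest()
// InitialPositionInStream.TRIM_HORIZON -> new TrimHorizon()
// InitialPositionInStream.AT_TIMESTAMP -> not supported by KinesisUtils;
//   use .initialPosition(new AtTimestamp(date)) on the builder instead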