Files

docs/
  aws-credentials.md
  configuration.md
  index.md
  initial-position.md
  python-integration.md
  stream-creation.md
tile.json

tessl/maven-org-apache-spark--spark-streaming-kinesis-asl_2-13

Apache Spark Kinesis Integration providing real-time stream processing of data from Amazon Kinesis Data Streams

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.spark/spark-streaming-kinesis-asl_2.13@4.0.x (maven)

To install, run:

npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-kinesis-asl_2-13@4.0.0


Apache Spark Kinesis Integration

Apache Spark Kinesis Integration enables real-time stream processing of data from Amazon Kinesis Data Streams using Spark Streaming. It provides a receiver-based approach using the Amazon Kinesis Client Library (KCL) to consume data from Kinesis streams and transform it into Spark DStreams for distributed processing.

Package Information

  • Package Name: spark-streaming-kinesis-asl_2.13
  • Package Type: maven
  • Group ID: org.apache.spark
  • Language: Scala
  • Installation: Add Maven dependency:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kinesis-asl_2.13</artifactId>
  <version>4.0.0</version>
</dependency>

For SBT:

libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "4.0.0"

Core Imports

import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.{SparkAWSCredentials, DefaultCredentials, BasicCredentials, STSCredentials}
import org.apache.spark.streaming.kinesis.KinesisInitialPositions
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.storage.StorageLevel
import com.amazonaws.services.kinesis.model.Record

Basic Usage

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}

import java.nio.charset.StandardCharsets

val conf = new SparkConf().setAppName("KinesisExample")
val ssc = new StreamingContext(conf, Seconds(10))

// Create a Kinesis DStream using the builder pattern
val kinesisStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .checkpointAppName("my-app")
  .regionName("us-east-1")
  .initialPosition(new KinesisInitialPositions.Latest())
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
  .build()

// Process the stream; each element is the raw record payload as Array[Byte]
kinesisStream.foreachRDD { rdd =>
  rdd.foreach { bytes =>
    val data = new String(bytes, StandardCharsets.UTF_8)
    println(s"Received: $data")
  }
}

ssc.start()
ssc.awaitTermination()
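
Note that the kinesis-asl module is licensed under the Amazon Software License and is therefore not bundled with the Spark distribution; when submitting an application, link it explicitly, for example with spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.13:4.0.0.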

Architecture

The Spark Kinesis Integration is built around several key components; the sketch after this list shows how they fit together:

  • KinesisInputDStream: The main DStream implementation that creates Kinesis receivers
  • KinesisReceiver: Manages KCL workers for consuming data from Kinesis shards
  • Builder Pattern: Fluent API for configuring Kinesis streams with required and optional parameters
  • Credentials Management: Flexible authentication supporting AWS default chain, basic credentials, and STS assume role
  • Fault Tolerance: Uses KinesisBackedBlockRDD for recovery from Kinesis data when local blocks are lost
  • Checkpointing: Automatic state management through DynamoDB for tracking consumption progress
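
As a minimal sketch of how these components combine, the builder below sets checkpointing, fault-tolerant storage, and the starting position in one place. Stream name, app name, and region are placeholders, and ssc is a StreamingContext as in the Basic Usage example.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}

// Checkpointing (DynamoDB via the KCL), replicated block storage, and the
// starting position are all configured through the same fluent builder.
val configuredStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .checkpointAppName("my-app")                    // names the DynamoDB checkpoint table
  .regionName("us-east-1")
  .initialPosition(new KinesisInitialPositions.TrimHorizon())
  .checkpointInterval(Seconds(60))                // how often the KCL checkpoints progress
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2)   // replicated blocks for receiver recovery
  .build()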

Capabilities

Stream Creation

Core functionality for creating Kinesis DStreams with comprehensive configuration options including authentication, checkpointing, and performance tuning.

object KinesisInputDStream {
  def builder: KinesisInputDStream.Builder
}

class Builder {
  def streamingContext(ssc: StreamingContext): Builder
  def streamName(streamName: String): Builder  
  def checkpointAppName(appName: String): Builder
  def build(): KinesisInputDStream[Array[Byte]]
  def buildWithMessageHandler[T: ClassTag](handler: Record => T): KinesisInputDStream[T]
}
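
As a sketch of the message-handler variant (stream details are illustrative, ssc as above), the handler below decodes each Kinesis Record into its partition key and UTF-8 payload:

import com.amazonaws.services.kinesis.model.Record
import java.nio.charset.StandardCharsets

// Produce (partitionKey, payload) pairs instead of raw byte arrays
val typedStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .checkpointAppName("my-app")
  .regionName("us-east-1")
  .buildWithMessageHandler { record: Record =>
    val buf = record.getData
    val bytes = new Array[Byte](buf.remaining())
    buf.get(bytes)   // copy the payload out of the ByteBuffer
    (record.getPartitionKey, new String(bytes, StandardCharsets.UTF_8))
  }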

See docs/stream-creation.md for details.

AWS Credentials Management

Authentication system supporting multiple credential providers including default AWS chain, basic access keys, and STS assume role for cross-account access.

sealed trait SparkAWSCredentials {
  def provider: AWSCredentialsProvider
}

object SparkAWSCredentials {
  def builder: SparkAWSCredentials.Builder
}

case object DefaultCredentials extends SparkAWSCredentials
case class BasicCredentials(awsAccessKeyId: String, awsSecretKey: String) extends SparkAWSCredentials
case class STSCredentials(stsRoleArn: String, stsSessionName: String, stsExternalId: Option[String], longLivedCreds: SparkAWSCredentials) extends SparkAWSCredentials
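
A sketch of wiring credentials into the stream builder (the role ARN and names are placeholders, ssc as above); the same builder also exposes dynamoDBCredentials and cloudWatchCredentials setters for the checkpointing and metrics clients:

import org.apache.spark.streaming.kinesis.{KinesisInputDStream, SparkAWSCredentials}

// STS assume-role credentials layered on top of the default provider chain
val stsCreds = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/kinesis-reader", "spark-kinesis-session")
  .build()

val authedStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .checkpointAppName("my-app")
  .regionName("us-east-1")
  .kinesisCredentials(stsCreds)   // credentials used by the Kinesis client
  .build()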

See docs/aws-credentials.md for details.

Initial Position Configuration

Configuration system for specifying where to start reading from Kinesis streams, supporting latest records, oldest records, or specific timestamps.

// KinesisInitialPosition is a Java interface in the underlying API,
// shown here in Scala notation
trait KinesisInitialPosition {
  def getPosition(): InitialPositionInStream
}

object KinesisInitialPositions {
  class Latest extends KinesisInitialPosition
  class TrimHorizon extends KinesisInitialPosition  
  class AtTimestamp(timestamp: Date) extends KinesisInitialPosition
}
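
For example, to replay records from a specific point in time (timestamp and stream details are illustrative, ssc as above):

import java.text.SimpleDateFormat
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}

// Start reading at records added on or after this timestamp
val startAt = new SimpleDateFormat("yyyy-MM-dd").parse("2024-01-01")

val replayStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .checkpointAppName("my-app")
  .regionName("us-east-1")
  .initialPosition(new KinesisInitialPositions.AtTimestamp(startAt))
  .build()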

See docs/initial-position.md for details.

Configuration and Performance Tuning

Advanced configuration options for retry logic, timeouts, storage levels, metrics, and checkpointing intervals to optimize performance and reliability.

case class KinesisReadConfigurations(
  maxRetries: Int,
  retryWaitTimeMs: Long, 
  retryTimeoutMs: Long
)

object KinesisReadConfigurations {
  def apply(): KinesisReadConfigurations
  def apply(ssc: StreamingContext): KinesisReadConfigurations
}
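
The retry values are picked up from the Spark configuration rather than set directly on the case class. A sketch assuming the configuration keys spark.streaming.kinesis.retry.waitTime and spark.streaming.kinesis.retry.maxAttempts; verify these against your Spark version:

import org.apache.spark.SparkConf

// Assumed configuration keys for Kinesis read retries
val tunedConf = new SparkConf()
  .setAppName("KinesisExample")
  .set("spark.streaming.kinesis.retry.waitTime", "500ms")  // wait between retry attempts
  .set("spark.streaming.kinesis.retry.maxAttempts", "5")   // maximum retry attempts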

See docs/configuration.md for details.

Python Integration

Internal utilities for PySpark integration providing Python-friendly interfaces to the Scala/Java APIs.

private class KinesisUtilsPythonHelper {
  def createStream(
    jssc: JavaStreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: Int,
    checkpointInterval: Duration,
    metricsLevel: Int,
    storageLevel: StorageLevel,
    awsAccessKeyId: String,
    awsSecretKey: String,
    stsAssumeRoleArn: String,
    stsSessionName: String,
    stsExternalId: String
  ): JavaReceiverInputDStream[Array[Byte]]
}

See docs/python-integration.md for details.

Types

// Core DStream type
class KinesisInputDStream[T: ClassTag] extends ReceiverInputDStream[T]

// Sequence number tracking for fault tolerance
case class SequenceNumberRange(
  streamName: String,
  shardId: String, 
  fromSeqNumber: String,
  toSeqNumber: String,
  recordCount: Int
)

case class SequenceNumberRanges(ranges: Seq[SequenceNumberRange]) {
  def isEmpty: Boolean
  def nonEmpty: Boolean
}

// Fault tolerant RDD implementation
class KinesisBackedBlockRDD[T: ClassTag](
  sc: SparkContext,
  regionName: String,
  endpointUrl: String,
  blockIds: Array[BlockId],
  seqNumRanges: Array[SequenceNumberRanges],
  isBlockIdValid: Array[Boolean],
  messageHandler: Record => T,
  kinesisCreds: SparkAWSCredentials,
  kinesisReadConfigs: KinesisReadConfigurations
) extends BlockRDD[T]