Apache Spark Kinesis Integration

Apache Spark Kinesis Integration enables real-time stream processing of data from Amazon Kinesis Data Streams using Spark Streaming. It provides a receiver-based approach using the Amazon Kinesis Client Library (KCL) to consume data from Kinesis streams and transform it into Spark DStreams for distributed processing.

Package Information

  • Package Name: spark-streaming-kinesis-asl_2.13
  • Package Type: maven
  • Group ID: org.apache.spark
  • Language: Scala
  • Installation: Add Maven dependency:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kinesis-asl_2.13</artifactId>
  <version>4.0.0</version>
</dependency>

For SBT:

libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "4.0.0"

Core Imports

import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.{SparkAWSCredentials, DefaultCredentials, BasicCredentials, STSCredentials}
import org.apache.spark.streaming.kinesis.KinesisInitialPositions
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.storage.StorageLevel
import com.amazonaws.services.kinesis.model.Record

Basic Usage

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions}
import org.apache.spark.streaming.Seconds
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import com.amazonaws.services.kinesis.model.Record

val conf = new SparkConf().setAppName("KinesisExample")
val ssc = new StreamingContext(conf, Seconds(10))

// Create Kinesis DStream using the builder pattern
val kinesisStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .checkpointAppName("my-app")
  .regionName("us-east-1")
  .initialPosition(new KinesisInitialPositions.Latest())
  .build()

// Process the stream: each element is the raw record payload as Array[Byte].
// Note that println runs on the executors, not the driver.
kinesisStream.foreachRDD { rdd =>
  rdd.foreach { bytes =>
    val data = new String(bytes, java.nio.charset.StandardCharsets.UTF_8)
    println(s"Received: $data")
  }
}

ssc.start()
ssc.awaitTermination()

Architecture

The Spark Kinesis Integration is built around several key components:

  • KinesisInputDStream: The main DStream implementation that creates Kinesis receivers
  • KinesisReceiver: Manages KCL workers for consuming data from Kinesis shards
  • Builder Pattern: Fluent API for configuring Kinesis streams with required and optional parameters
  • Credentials Management: Flexible authentication supporting AWS default chain, basic credentials, and STS assume role
  • Fault Tolerance: Uses KinesisBackedBlockRDD for recovery from Kinesis data when local blocks are lost
  • Checkpointing: Automatic state management through DynamoDB for tracking consumption progress

Capabilities

Stream Creation

Core functionality for creating Kinesis DStreams with comprehensive configuration options including authentication, checkpointing, and performance tuning.

object KinesisInputDStream {
  def builder: KinesisInputDStream.Builder
}

class Builder {
  // Required setters shown here; the builder also exposes optional setters
  // such as regionName, endpointUrl, initialPosition, checkpointInterval,
  // storageLevel, metricsLevel, and the credential setters
  def streamingContext(ssc: StreamingContext): Builder
  def streamName(streamName: String): Builder
  def checkpointAppName(appName: String): Builder
  def build(): KinesisInputDStream[Array[Byte]]
  def buildWithMessageHandler[T: ClassTag](handler: Record => T): KinesisInputDStream[T]
}
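Where build() yields raw Array[Byte] payloads, buildWithMessageHandler deserializes each record at the receiver so the resulting DStream carries your own type. A minimal sketch, assuming an existing StreamingContext `ssc`; the stream name, app name, and region are placeholders:

```scala
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions}
import com.amazonaws.services.kinesis.model.Record

// Decode each Kinesis record to a UTF-8 string at the receiver,
// producing a DStream[String] instead of a DStream[Array[Byte]].
val stringStream = KinesisInputDStream.builder
  .streamingContext(ssc)                        // existing StreamingContext
  .streamName("my-kinesis-stream")              // placeholder stream name
  .checkpointAppName("my-app")                  // KCL app name / DynamoDB table
  .regionName("us-east-1")
  .initialPosition(new KinesisInitialPositions.TrimHorizon())
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
  .buildWithMessageHandler { record: Record =>
    // record.getData is a ByteBuffer; copy it out before decoding
    val buf = record.getData
    val bytes = new Array[Byte](buf.remaining())
    buf.get(bytes)
    new String(bytes, StandardCharsets.UTF_8)
  }
```

The handler runs inside the receiver, so it should be cheap and serializable; heavier parsing is usually better done downstream in a map over the DStream.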


AWS Credentials Management

Authentication system supporting multiple credential providers including default AWS chain, basic access keys, and STS assume role for cross-account access.

sealed trait SparkAWSCredentials {
  def provider: AWSCredentialsProvider
}

object SparkAWSCredentials {
  def builder: SparkAWSCredentials.Builder
}

case object DefaultCredentials extends SparkAWSCredentials
case class BasicCredentials(awsAccessKeyId: String, awsSecretKey: String) extends SparkAWSCredentials
case class STSCredentials(stsRoleArn: String, stsSessionName: String, stsExternalId: Option[String], longLivedCreds: SparkAWSCredentials) extends SparkAWSCredentials
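Credentials are normally assembled through SparkAWSCredentials.builder and passed to the stream builder. A sketch of cross-account access via STS assume role, assuming an existing StreamingContext `ssc`; the role ARN, session name, and stream name are placeholders:

```scala
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions, SparkAWSCredentials}

// Assume a role in another account via STS. The long-lived credentials used
// for the AssumeRole call come from the default AWS provider chain unless
// basicCredentials(...) is also set on the builder.
val crossAccountCreds = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/kinesis-reader", "spark-kinesis-session")
  .build()

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("shared-stream")
  .checkpointAppName("cross-account-app")
  .regionName("us-east-1")
  .initialPosition(new KinesisInitialPositions.Latest())
  .kinesisCredentials(crossAccountCreds)   // used for Kinesis API calls
  .build()
```

Separate dynamoDBCredentials and cloudWatchCredentials setters exist on the stream builder for the checkpoint table and metrics, which is useful when those live in the consuming account rather than the stream's account.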


Initial Position Configuration

Configuration system for specifying where to start reading from Kinesis streams, supporting latest records, oldest records, or specific timestamps.

trait KinesisInitialPosition {
  def getPosition(): InitialPositionInStream
}

object KinesisInitialPositions {
  class Latest extends KinesisInitialPosition
  class TrimHorizon extends KinesisInitialPosition  
  class AtTimestamp(timestamp: Date) extends KinesisInitialPosition
}
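Choosing a position is just a matter of passing one of these to the builder's initialPosition setter. A short sketch of the three options; the one-hour offset is illustrative:

```scala
import java.util.Date
import org.apache.spark.streaming.kinesis.KinesisInitialPositions

// Only records arriving after the application starts:
val fromLatest = new KinesisInitialPositions.Latest()

// The oldest records still retained by the stream:
val fromStart = new KinesisInitialPositions.TrimHorizon()

// Replay from a specific point in time, e.g. roughly one hour ago:
val oneHourAgo = new Date(System.currentTimeMillis() - 60 * 60 * 1000)
val fromTimestamp = new KinesisInitialPositions.AtTimestamp(oneHourAgo)
```

Note that the initial position only applies when no checkpoint exists yet for the checkpointAppName; once the KCL has checkpointed to DynamoDB, consumption resumes from the checkpoint instead.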


Configuration and Performance Tuning

Advanced configuration options for retry logic, timeouts, storage levels, metrics, and checkpointing intervals to optimize performance and reliability.

case class KinesisReadConfigurations(
  maxRetries: Int,
  retryWaitTimeMs: Long, 
  retryTimeoutMs: Long
)

object KinesisReadConfigurations {
  def apply(): KinesisReadConfigurations
  def apply(ssc: StreamingContext): KinesisReadConfigurations
}
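When built from a StreamingContext via apply(ssc), the retry settings are read from the SparkConf. A sketch assuming the spark.streaming.kinesis.retry.* keys defined in Spark's KinesisReadConfigurations (verify the key names against your Spark version):

```scala
import org.apache.spark.SparkConf

// Tune how aggressively lost blocks are re-read from Kinesis during recovery.
// Assumed defaults: 3 attempts, 100ms wait; the retry timeout defaults to the
// batch duration.
val conf = new SparkConf()
  .setAppName("KinesisExample")
  .set("spark.streaming.kinesis.retry.maxAttempts", "5")
  .set("spark.streaming.kinesis.retry.waitTime", "500ms")
```

These settings only matter on the recovery path (KinesisBackedBlockRDD re-fetching data by sequence number); normal consumption goes through the KCL workers.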


Python Integration

Internal utilities for PySpark integration providing Python-friendly interfaces to the Scala/Java APIs.

private class KinesisUtilsPythonHelper {
  def createStream(
    jssc: JavaStreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: Int,
    checkpointInterval: Duration,
    metricsLevel: Int,
    storageLevel: StorageLevel,
    awsAccessKeyId: String,
    awsSecretKey: String,
    stsAssumeRoleArn: String,
    stsSessionName: String,
    stsExternalId: String
  ): JavaReceiverInputDStream[Array[Byte]]
}


Types

// Core DStream type
class KinesisInputDStream[T: ClassTag] extends ReceiverInputDStream[T]

// Sequence number tracking for fault tolerance
case class SequenceNumberRange(
  streamName: String,
  shardId: String, 
  fromSeqNumber: String,
  toSeqNumber: String,
  recordCount: Int
)

case class SequenceNumberRanges(ranges: Seq[SequenceNumberRange]) {
  def isEmpty: Boolean
  def nonEmpty: Boolean
}

// Fault tolerant RDD implementation
class KinesisBackedBlockRDD[T: ClassTag](
  sc: SparkContext,
  regionName: String,
  endpointUrl: String,
  blockIds: Array[BlockId],
  seqNumRanges: Array[SequenceNumberRanges],
  isBlockIdValid: Array[Boolean],
  messageHandler: Record => T,
  kinesisCreds: SparkAWSCredentials,
  kinesisReadConfigs: KinesisReadConfigurations
) extends BlockRDD[T]