tessl/maven-org-apache-spark--spark-streaming-kinesis-asl_2-12

Apache Spark Streaming integration with Amazon Kinesis for real-time processing of streaming data

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.spark/spark-streaming-kinesis-asl_2.12@3.5.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-kinesis-asl_2-12@3.5.0


Apache Spark Streaming Kinesis Connector

Apache Spark Streaming integration with Amazon Kinesis for real-time processing of streaming data at massive scale. This connector provides a Kinesis receiver that creates input DStreams using the Amazon Kinesis Client Library (KCL), enabling load-balancing, fault-tolerance, and checkpointing capabilities through Workers, Checkpoints, and Shard Leases.

Package Information

  • Package Name: spark-streaming-kinesis-asl_2.12
  • Package Type: maven
  • Language: Scala
  • Installation:
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kinesis-asl_2.12</artifactId>
      <version>3.5.6</version>
    </dependency>
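For sbt-based projects, the equivalent coordinate (same artifact and version as the Maven snippet above) would be:

```
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "3.5.6"
```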

Core Imports

import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions
import org.apache.spark.streaming.kinesis.SparkAWSCredentials
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.JavaStreamingContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Duration
import com.amazonaws.services.kinesis.model.Record
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import scala.reflect.ClassTag

For Java:

import org.apache.spark.streaming.kinesis.KinesisInputDStream;
import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
import org.apache.spark.streaming.kinesis.SparkAWSCredentials;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;

For Python:

from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

Basic Usage

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest

val sparkConf = new SparkConf().setAppName("KinesisWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))

// Create a Kinesis DStream using the builder pattern
val kinesisStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("myKinesisStream")
  .checkpointAppName("myKinesisApp")
  .regionName("us-east-1")
  .initialPosition(new Latest())
  .build()

// Decode each record's byte payload, then run a word count
kinesisStream.map(byteArray => new String(byteArray))
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .print()

ssc.start()
ssc.awaitTermination()

Architecture

The Spark Kinesis connector uses the following key components:

  • KinesisInputDStream: The main DStream implementation that creates Kinesis receivers
  • KinesisReceiver: Receives data from Kinesis shards using the KCL
  • KinesisBackedBlockRDD: Enables fault-tolerant recovery by re-reading data from Kinesis
  • SparkAWSCredentials: Provides flexible AWS credential management
  • Checkpointing: Uses DynamoDB for KCL state and Spark checkpointing for stream processing state

Capabilities

Stream Creation

Create Kinesis input streams with comprehensive configuration options including credentials, initial positions, and metrics.

object KinesisInputDStream {
  def builder: Builder
  def defaultMessageHandler: Record => Array[Byte]
}

class Builder {
  def streamingContext(ssc: StreamingContext): Builder
  def streamingContext(jssc: JavaStreamingContext): Builder
  def streamName(streamName: String): Builder
  def checkpointAppName(appName: String): Builder
  def endpointUrl(url: String): Builder
  def regionName(regionName: String): Builder
  def initialPosition(initialPosition: KinesisInitialPosition): Builder
  def checkpointInterval(interval: Duration): Builder
  def storageLevel(storageLevel: StorageLevel): Builder
  def kinesisCredentials(credentials: SparkAWSCredentials): Builder
  def dynamoDBCredentials(credentials: SparkAWSCredentials): Builder
  def cloudWatchCredentials(credentials: SparkAWSCredentials): Builder
  def metricsLevel(metricsLevel: MetricsLevel): Builder
  def metricsEnabledDimensions(metricsEnabledDimensions: Set[String]): Builder
  def build(): KinesisInputDStream[Array[Byte]]
  def buildWithMessageHandler[T: ClassTag](handler: Record => T): KinesisInputDStream[T]
}
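As a sketch of `buildWithMessageHandler`, the following (untested, and assuming a `StreamingContext` named `ssc` is already in scope, with placeholder stream and app names) decodes each Kinesis `Record` payload into a `String` at receive time, instead of the default `Array[Byte]`:

```
import java.nio.charset.StandardCharsets
import com.amazonaws.services.kinesis.model.Record
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.TrimHorizon

// Custom message handler: decode the record's ByteBuffer payload as UTF-8
def recordToString(record: Record): String =
  StandardCharsets.UTF_8.decode(record.getData).toString

val stringStream = KinesisInputDStream.builder
  .streamingContext(ssc)                    // assumed existing StreamingContext
  .streamName("myKinesisStream")
  .checkpointAppName("myKinesisApp")
  .regionName("us-east-1")
  .initialPosition(new TrimHorizon())
  .buildWithMessageHandler(recordToString)  // yields a DStream[String]
```

The handler runs inside the receiver, so downstream transformations see the already-decoded type rather than raw byte arrays.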

See docs/stream-creation.md for detailed stream creation and configuration options.

Initial Position Management

Configure where to start reading from Kinesis streams with support for latest, earliest, and timestamp-based positioning.

object KinesisInitialPositions {
  class Latest() extends KinesisInitialPosition
  class TrimHorizon() extends KinesisInitialPosition
  class AtTimestamp(timestamp: Date) extends KinesisInitialPosition
}
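As a brief sketch (the timestamp value below is an arbitrary example), each position is constructed as follows; note that `AtTimestamp` takes a `java.util.Date`:

```
import java.util.Date
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.{Latest, TrimHorizon, AtTimestamp}

val fromLatest  = new Latest()       // only records arriving after the application starts
val fromOldest  = new TrimHorizon()  // from the oldest record still retained by the stream
val fromInstant = new AtTimestamp(new Date(1700000000000L))  // from a specific point in time
```

Any of these can be passed to the builder's `.initialPosition(...)` method.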

See docs/initial-positions.md for initial position configuration details.

AWS Credentials Management

Flexible credential management supporting default provider chains, basic credentials, and STS assume role patterns.

object SparkAWSCredentials {
  def builder: Builder
}

class Builder {
  def basicCredentials(accessKeyId: String, secretKey: String): Builder
  def stsCredentials(roleArn: String, sessionName: String): Builder
  def build(): SparkAWSCredentials
}
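A minimal sketch of the two common patterns (the role ARN and session name below are placeholders): calling `build()` with no builder methods yields the default AWS credential provider chain, while `stsCredentials` configures role assumption via STS.

```
import org.apache.spark.streaming.kinesis.SparkAWSCredentials

// Default provider chain: environment variables, system properties,
// profile files, instance metadata, etc.
val defaultCreds = SparkAWSCredentials.builder.build()

// Assume an IAM role via STS (placeholder ARN and session name)
val stsCreds = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/KinesisReadRole", "my-spark-session")
  .build()
```

The resulting object is passed to the stream builder via `.kinesisCredentials(...)`, `.dynamoDBCredentials(...)`, or `.cloudWatchCredentials(...)`.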

See docs/aws-credentials.md for AWS credentials configuration details.

Python API

Python interface for creating Kinesis streams with simplified parameter handling.

class KinesisUtils:
    @staticmethod
    def createStream(
        ssc: StreamingContext,
        kinesisAppName: str,
        streamName: str,
        endpointUrl: str,
        regionName: str,
        initialPositionInStream: int,
        checkpointInterval: int,
        **kwargs
    ) -> DStream
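An untested end-to-end sketch (stream name, app name, and endpoint are placeholders; requires a Spark deployment with the Kinesis assembly on the classpath):

```
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

sc = SparkContext(appName="PythonKinesisWordCount")
ssc = StreamingContext(sc, 2)  # 2-second batch interval

# checkpointInterval is in seconds
lines = KinesisUtils.createStream(
    ssc, "myKinesisApp", "myKinesisStream",
    "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
    InitialPositionInStream.LATEST, 2)

lines.flatMap(lambda line: line.split(" ")) \
     .map(lambda word: (word, 1)) \
     .reduceByKey(lambda a, b: a + b) \
     .pprint()

ssc.start()
ssc.awaitTermination()
```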

See docs/python-api.md for Python API usage details.

Types

// Core interface types
trait KinesisInitialPosition {
  def getPosition(): InitialPositionInStream
}

sealed trait SparkAWSCredentials extends Serializable {
  def provider: AWSCredentialsProvider
}

// Concrete credential implementations
case object DefaultCredentials extends SparkAWSCredentials

case class BasicCredentials(
  awsAccessKeyId: String,
  awsSecretKey: String
) extends SparkAWSCredentials

case class STSCredentials(
  stsRoleArn: String,
  stsSessionName: String,
  stsExternalId: Option[String] = None,
  longLivedCreds: SparkAWSCredentials = DefaultCredentials
) extends SparkAWSCredentials