Spark Streaming Kinesis ASL Assembly

Apache Spark Streaming integration library for consuming data from Amazon Kinesis streams. Provides fault-tolerant, scalable stream processing with automatic checkpointing, shard management, and configurable parallelism through the Kinesis Client Library (KCL).

Package Information

  • Package Name: spark-streaming-kinesis-asl-assembly_2.11
  • Package Type: maven
  • Language: Scala/Java
  • Installation:
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kinesis-asl-assembly_2.11</artifactId>
      <version>1.6.2</version>
    </dependency>
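
For sbt builds, the equivalent coordinate would be the following (using % rather than %%, since the artifact name already carries the _2.11 Scala suffix):

libraryDependencies += "org.apache.spark" % "spark-streaming-kinesis-asl-assembly_2.11" % "1.6.2"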

Core Imports

Scala API

import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Duration
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.model.Record

Java API

import org.apache.spark.streaming.kinesis.KinesisUtils;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;

Basic Usage

Scala Example

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.storage.StorageLevel
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

val conf = new SparkConf().setAppName("KinesisExample")
val ssc = new StreamingContext(conf, Seconds(10))

// Create Kinesis stream
val kinesisStream = KinesisUtils.createStream(
  ssc,
  "myKinesisApp",
  "myStreamName", 
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.LATEST,
  Seconds(30),
  StorageLevel.MEMORY_AND_DISK_2
)

// Process the stream: decode each record's byte payload as a string
kinesisStream.map(byteArray => new String(byteArray)).print()

ssc.start()
ssc.awaitTermination()

Java Example

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.kinesis.KinesisUtils;
import org.apache.spark.storage.StorageLevel;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;

SparkConf conf = new SparkConf().setAppName("JavaKinesisExample");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

// Create Kinesis stream
JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream(
    jssc,
    "myKinesisApp",
    "myStreamName",
    "https://kinesis.us-east-1.amazonaws.com", 
    "us-east-1",
    InitialPositionInStream.LATEST,
    Durations.seconds(30),
    StorageLevel.MEMORY_AND_DISK_2()
);

// Process the stream
kinesisStream.map(bytes -> new String(bytes)).print();

jssc.start();
jssc.awaitTermination();

Architecture

The Spark Streaming Kinesis integration is built around several key components:

  • KinesisUtils: Main entry point providing factory methods for creating Kinesis input streams
  • Kinesis Client Library (KCL) Integration: Uses AWS KCL for reliable stream consumption and checkpointing
  • Fault Tolerance: Sequence number-based recovery allowing streams to recover from failures using stored metadata
  • Automatic Checkpointing: DynamoDB-based checkpoint coordination for tracking stream progress
  • Multi-Shard Support: Automatic parallelization across Kinesis shards; receiver parallelism is configurable by creating multiple streams (see the union sketch after this list)
  • Credential Management: Support for both default AWS credential providers and explicit credential specification
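
A common way to scale consumption is to create one stream per shard and union them. A sketch, reusing the ssc and imports from the basic example above (the shard count is illustrative):

// One receiver per shard, then union into a single DStream
val numShards = 2
val kinesisStreams = (0 until numShards).map { _ =>
  KinesisUtils.createStream(
    ssc,
    "myKinesisApp",
    "myStreamName",
    "https://kinesis.us-east-1.amazonaws.com",
    "us-east-1",
    InitialPositionInStream.LATEST,
    Seconds(30),
    StorageLevel.MEMORY_AND_DISK_2
  )
}
val unionStream = ssc.union(kinesisStreams)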

Capabilities

Stream Creation

Core functionality for creating Kinesis input streams, with configuration options including custom message handlers and explicit credentials, exposed through both Scala and Java APIs.

object KinesisUtils {
  // Generic stream creation with custom message handler
  def createStream[T: ClassTag](
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel,
    messageHandler: Record => T
  ): ReceiverInputDStream[T]

  // Default byte array stream creation
  def createStream(
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel
  ): ReceiverInputDStream[Array[Byte]]
}
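
As a usage sketch, the generic overload can keep each record's partition key alongside its decoded payload (the handler logic is illustrative, and assumes the payload ByteBuffer is heap-backed):

val keyedStream = KinesisUtils.createStream(
  ssc,
  "myKinesisApp",
  "myStreamName",
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.LATEST,
  Seconds(30),
  StorageLevel.MEMORY_AND_DISK_2,
  // messageHandler: pair the partition key with the payload as a string
  (record: Record) => (record.getPartitionKey, new String(record.getData.array()))
)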

See docs/stream-creation.md for the full stream creation reference.

Credential Management

Authentication and credential handling for AWS Kinesis access, supporting both default credential providers and explicit credential specification.

// Stream creation with explicit AWS credentials
def createStream[T: ClassTag](
  ssc: StreamingContext,
  kinesisAppName: String,
  streamName: String,
  endpointUrl: String,
  regionName: String,
  initialPositionInStream: InitialPositionInStream,
  checkpointInterval: Duration,
  storageLevel: StorageLevel,
  messageHandler: Record => T,
  awsAccessKeyId: String,
  awsSecretKey: String
): ReceiverInputDStream[T]
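
A minimal sketch of this overload; sourcing the keys from environment variables is an assumption for illustration, and the default credential provider chain is generally preferable:

val authedStream = KinesisUtils.createStream(
  ssc,
  "myKinesisApp",
  "myStreamName",
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.LATEST,
  Seconds(30),
  StorageLevel.MEMORY_AND_DISK_2,
  (record: Record) => new String(record.getData.array()),
  sys.env("AWS_ACCESS_KEY_ID"), // awsAccessKeyId (illustrative source)
  sys.env("AWS_SECRET_KEY")     // awsSecretKey (illustrative source)
)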

See docs/credential-management.md for the full credential management reference.

Java API Support

Complete Java API compatibility with functional interfaces and Java-friendly method signatures for integration with Java applications.

// Java API for generic stream creation
public static <T> JavaReceiverInputDStream<T> createStream(
  JavaStreamingContext jssc,
  String kinesisAppName,
  String streamName,
  String endpointUrl,
  String regionName,
  InitialPositionInStream initialPositionInStream,
  Duration checkpointInterval,
  StorageLevel storageLevel,
  Function<Record, T> messageHandler,
  Class<T> recordClass
);

See docs/java-api.md for the full Java API reference.

Fault Tolerance & Recovery

Built-in fault tolerance mechanisms using Kinesis sequence numbers for reliable stream processing and recovery from failures.

// Sequence number range for fault tolerance
case class SequenceNumberRange(
  streamName: String,
  shardId: String, 
  fromSeqNumber: String,
  toSeqNumber: String
)

case class SequenceNumberRanges(ranges: Seq[SequenceNumberRange]) {
  def isEmpty(): Boolean
  def nonEmpty(): Boolean
}
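
On the driver side, these mechanisms are typically paired with a checkpointed StreamingContext using the standard Spark Streaming recovery pattern (the checkpoint path and stream wiring here are illustrative):

val checkpointDir = "hdfs:///checkpoints/kinesis-example"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("KinesisExample")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // ... create Kinesis streams and define processing here ...
  ssc
}

// Recover from an existing checkpoint if present, otherwise build fresh
val context = StreamingContext.getOrCreate(checkpointDir, createContext _)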

See docs/fault-tolerance.md for the full fault tolerance reference.

Core Types

// AWS credential wrapper for serialization
case class SerializableAWSCredentials(
  accessKeyId: String,
  secretKey: String
) extends AWSCredentials {
  def getAWSAccessKeyId: String
  def getAWSSecretKey: String
}

// Message handler function type
type MessageHandler[T] = Record => T

// Java function interface for message handling
import org.apache.spark.api.java.function.{Function => JFunction}
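
For example, a handler matching the MessageHandler[T] alias above that decodes each payload as UTF-8; it copies the bytes out of the ByteBuffer rather than assuming a backing array (the charset choice is an assumption):

import java.nio.charset.StandardCharsets

val utf8Handler: Record => String = (record: Record) => {
  val buf = record.getData
  val bytes = new Array[Byte](buf.remaining())
  buf.get(bytes)
  new String(bytes, StandardCharsets.UTF_8)
}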

Key Parameters

  • kinesisAppName: Unique identifier for the Kinesis application used by KCL for DynamoDB coordination
  • streamName: Name of the Kinesis stream to consume from
  • endpointUrl: AWS Kinesis service endpoint (e.g., "https://kinesis.us-east-1.amazonaws.com"); see the note after this list
  • regionName: AWS region name for DynamoDB and CloudWatch operations
  • initialPositionInStream: Starting position when no checkpoint exists (LATEST or TRIM_HORIZON)
  • checkpointInterval: Frequency of checkpointing to DynamoDB
  • storageLevel: Spark storage level for received data (recommended: MEMORY_AND_DISK_2)
  • messageHandler: Function to transform Kinesis Record objects to desired output type
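
As noted above, endpointUrl and regionName should refer to the same region; the endpoint can simply be derived from the region name:

val regionName = "us-east-1"
val endpointUrl = s"https://kinesis.$regionName.amazonaws.com"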