tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-2-10

Spark Streaming integration with Amazon Kinesis for real-time data processing using the Kinesis Client Library

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.spark/spark-streaming-kinesis-asl_2.10@1.6.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-2-10@1.6.0


Spark Streaming Kinesis ASL

Spark Streaming integration with Amazon Kinesis for real-time data processing using the Kinesis Client Library (KCL). This library enables Spark Streaming applications to consume data from Amazon Kinesis streams with automatic load-balancing, fault-tolerance, and checkpointing capabilities.

Package Information

  • Package Name: spark-streaming-kinesis-asl_2.10
  • Package Type: maven
  • Language: Scala/Java
  • Installation: Add dependency to your Maven pom.xml or SBT build.sbt

Maven Dependency

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kinesis-asl_2.10</artifactId>
  <version>1.6.3</version>
</dependency>

SBT Dependency

libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.6.3"
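With SBT, `%%` appends the project's Scala binary version to the artifact name, so the build must pin Scala 2.10.x for this `_2.10` artifact to resolve. A minimal build.sbt sketch (the `provided` scope on spark-streaming assumes the application is deployed on a Spark cluster that supplies the core jars):

```scala
scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  // Core streaming API; "provided" because the Spark runtime supplies it
  "org.apache.spark" %% "spark-streaming" % "1.6.3" % "provided",
  // The Kinesis connector is not bundled with Spark, so ship it with the app
  "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.6.3"
)
```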

Core Imports

Scala

import org.apache.spark.streaming.kinesis._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Duration, Milliseconds, StreamingContext}

Java

import org.apache.spark.streaming.kinesis.KinesisUtils;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

Basic Usage

Scala Example

import org.apache.spark.streaming.kinesis._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

// Create Spark Streaming context with a 2-second batch interval
val ssc = new StreamingContext(sparkContext, Milliseconds(2000))

// Create Kinesis input stream
val kinesisStream = KinesisUtils.createStream(
  ssc,
  "MyKinesisApp",        // KCL application name (also names the DynamoDB checkpoint table)
  "MyKinesisStream",     // Kinesis stream name
  "https://kinesis.us-east-1.amazonaws.com", // Endpoint URL
  "us-east-1",           // Region name
  InitialPositionInStream.LATEST, // Starting position
  Milliseconds(2000),    // Checkpoint interval
  StorageLevel.MEMORY_AND_DISK_2  // Storage level
)

// Process the stream
kinesisStream.foreachRDD { rdd =>
  rdd.foreach { byteArray =>
    println(new String(byteArray))
  }
}

ssc.start()
ssc.awaitTermination()

Java Example

import org.apache.spark.streaming.kinesis.KinesisUtils;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;

// Create Java Streaming context
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

// Create Kinesis input stream
JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream(
  jssc,
  "MyKinesisApp",        // KCL application name
  "MyKinesisStream",     // Kinesis stream name
  "https://kinesis.us-east-1.amazonaws.com", // Endpoint URL
  "us-east-1",           // Region name
  InitialPositionInStream.LATEST, // Starting position
  new Duration(2000),    // Checkpoint interval
  StorageLevel.MEMORY_AND_DISK_2() // Storage level
);

// Process the stream
// Process the stream
kinesisStream.foreachRDD(rdd -> {
  rdd.foreach(byteArray -> System.out.println(new String(byteArray)));
});

jssc.start();
jssc.awaitTermination();

Architecture

The Spark Streaming Kinesis ASL integration is built around several key components:

  • KinesisUtils: Factory object providing static methods to create Kinesis input streams
  • KinesisInputDStream: Kinesis-specific implementation of ReceiverInputDStream with enhanced fault tolerance
  • KinesisReceiver: Custom Spark Streaming Receiver that uses the Kinesis Client Library (KCL) Worker
  • KinesisBackedBlockRDD: Specialized RDD that can re-read data from Kinesis using sequence number ranges
  • Checkpointing System: Automatic checkpointing of processed sequence numbers to DynamoDB, providing fault tolerance with at-least-once processing semantics
  • Multi-shard Support: Automatic distribution and load balancing across multiple Kinesis shards
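In this receiver-based model, read throughput scales by running one receiver per shard. A common pattern, sketched here as an assumption using `ssc` and the application/stream names from the Basic Usage example (and an illustrative shard count of 2), is to create one stream per shard and union them:

```scala
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Milliseconds
import org.apache.spark.streaming.kinesis.KinesisUtils

// One receiver index per shard; a tiny helper so the mapping is explicit
def receiverIndices(numShards: Int): Range = 0 until numShards

val numShards = 2 // assumed shard count of MyKinesisStream

val streams = receiverIndices(numShards).map { _ =>
  KinesisUtils.createStream(
    ssc, "MyKinesisApp", "MyKinesisStream",
    "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
    InitialPositionInStream.LATEST, Milliseconds(2000),
    StorageLevel.MEMORY_AND_DISK_2)
}

// Single logical DStream covering all shards
val unifiedStream = ssc.union(streams)
```

The KCL Worker behind each receiver rebalances shard leases automatically, so this pattern still works if shards are split or merged later.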

Capabilities

Stream Creation

Primary factory methods for creating Kinesis input streams with various configuration options including custom message handlers, explicit AWS credentials, and different storage levels.

// Basic stream creation with default byte array handler
def createStream(
  ssc: StreamingContext,
  kinesisAppName: String,
  streamName: String,
  endpointUrl: String,
  regionName: String,
  initialPositionInStream: InitialPositionInStream,
  checkpointInterval: Duration,
  storageLevel: StorageLevel
): ReceiverInputDStream[Array[Byte]]

// Stream creation with custom message handler
def createStream[T: ClassTag](
  ssc: StreamingContext,
  kinesisAppName: String,
  streamName: String,
  endpointUrl: String,
  regionName: String,
  initialPositionInStream: InitialPositionInStream,
  checkpointInterval: Duration,
  storageLevel: StorageLevel,
  messageHandler: Record => T
): ReceiverInputDStream[T]
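As one hedged sketch of the typed overload above, the hypothetical names `KinesisEvent` and `decodePayload` (not part of the library) turn each Record into a small case class, reusing `ssc` and the names from the Basic Usage example:

```scala
import java.nio.charset.StandardCharsets
import com.amazonaws.services.kinesis.model.Record
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Milliseconds
import org.apache.spark.streaming.kinesis.KinesisUtils

// Illustrative typed event carrying Kinesis metadata alongside the payload
case class KinesisEvent(partitionKey: String, sequenceNumber: String, payload: String)

// Decode a Record's ByteBuffer as UTF-8 without disturbing its read position
def decodePayload(data: java.nio.ByteBuffer): String = {
  val dup = data.duplicate()
  val bytes = new Array[Byte](dup.remaining())
  dup.get(bytes)
  new String(bytes, StandardCharsets.UTF_8)
}

val eventStream = KinesisUtils.createStream(
  ssc, "MyKinesisApp", "MyKinesisStream",
  "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
  InitialPositionInStream.LATEST, Milliseconds(2000),
  StorageLevel.MEMORY_AND_DISK_2,
  (record: Record) =>
    KinesisEvent(record.getPartitionKey, record.getSequenceNumber, decodePayload(record.getData))
)
```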

See docs/stream-creation.md for the full stream creation reference.

Data Processing and Message Handling

Advanced message processing capabilities including custom message handlers, type-safe transformations, and integration with Kinesis Record metadata such as partition keys and sequence numbers.

// Custom message handler function type
type MessageHandler[T] = com.amazonaws.services.kinesis.model.Record => T

// Default message handler for byte arrays
def defaultMessageHandler(record: Record): Array[Byte]
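The default handler's behavior can be sketched at the ByteBuffer level. This is an assumption inferred from the signatures above, not the library's actual source:

```scala
import java.nio.ByteBuffer

// Copy a Record's payload buffer into a fresh byte array; duplicate() keeps
// the source buffer's read position untouched so the copy is side-effect free
def copyBytes(data: ByteBuffer): Array[Byte] = {
  val dup = data.duplicate()
  val bytes = new Array[Byte](dup.remaining())
  dup.get(bytes)
  bytes
}

// A Record-based handler would then be: (record: Record) => copyBytes(record.getData)
```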

See docs/data-processing.md for the full data processing reference.

Java API Integration

Complete Java API support with type-safe bindings, function interfaces, and seamless integration with Java Streaming contexts and data processing pipelines.

// Java API stream creation
public static JavaReceiverInputDStream<byte[]> createStream(
  JavaStreamingContext jssc,
  String kinesisAppName,
  String streamName,
  String endpointUrl,
  String regionName,
  InitialPositionInStream initialPositionInStream,
  Duration checkpointInterval,
  StorageLevel storageLevel
);

// Java API with custom message handler
public static <T> JavaReceiverInputDStream<T> createStream(
  JavaStreamingContext jssc,
  String kinesisAppName,
  String streamName,
  String endpointUrl, 
  String regionName,
  InitialPositionInStream initialPositionInStream,
  Duration checkpointInterval,
  StorageLevel storageLevel,
  Function<Record, T> messageHandler,
  Class<T> recordClass
);

See docs/java-api.md for the full Java API reference.

AWS Authentication and Configuration

Flexible AWS authentication options including default credential provider chains, explicit credential specification, and IAM role integration for secure access to Kinesis streams.

// Stream creation with explicit AWS credentials
def createStream[T: ClassTag](
  ssc: StreamingContext,
  kinesisAppName: String,
  streamName: String,
  endpointUrl: String,
  regionName: String,
  initialPositionInStream: InitialPositionInStream,
  checkpointInterval: Duration,
  storageLevel: StorageLevel,
  messageHandler: Record => T,
  awsAccessKeyId: String,
  awsSecretKey: String
): ReceiverInputDStream[T]
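A sketch of the explicit-credentials overload, reusing `ssc` and the names from the Basic Usage example. The environment variable names are an assumption, and for production use IAM roles or the default credential provider chain are generally preferable to passing keys explicitly:

```scala
import java.nio.ByteBuffer
import com.amazonaws.services.kinesis.model.Record
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Milliseconds
import org.apache.spark.streaming.kinesis.KinesisUtils

// Pass the raw payload through as a byte array
def rawBytes(data: ByteBuffer): Array[Byte] = {
  val dup = data.duplicate()
  val bytes = new Array[Byte](dup.remaining())
  dup.get(bytes)
  bytes
}

// Keys read from the environment rather than hardcoded in source
val stream = KinesisUtils.createStream(
  ssc, "MyKinesisApp", "MyKinesisStream",
  "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
  InitialPositionInStream.LATEST, Milliseconds(2000),
  StorageLevel.MEMORY_AND_DISK_2,
  (record: Record) => rawBytes(record.getData),
  sys.env("AWS_ACCESS_KEY_ID"),
  sys.env("AWS_SECRET_ACCESS_KEY")
)
```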

See docs/aws-configuration.md for the full AWS configuration reference.

Types

// AWS Credentials wrapper for serialization
case class SerializableAWSCredentials(
  accessKeyId: String, 
  secretKey: String
) extends AWSCredentials

// Sequence number range for fault tolerance
case class SequenceNumberRange(
  streamName: String,
  shardId: String, 
  fromSeqNumber: String,
  toSeqNumber: String
)

// Collection of sequence number ranges
case class SequenceNumberRanges(ranges: Seq[SequenceNumberRange]) {
  def isEmpty(): Boolean
  def nonEmpty(): Boolean
}

// External AWS KCL types (from com.amazonaws.services.kinesis.clientlibrary.lib.worker)
// InitialPositionInStream enum values:
// - InitialPositionInStream.LATEST: Start from most recent records
// - InitialPositionInStream.TRIM_HORIZON: Start from the oldest available records (limited by the stream's retention period, 24 hours by default)