
tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-assembly

Apache Spark Streaming integration library for consuming data from Amazon Kinesis streams with fault-tolerant checkpointing and automatic shard management


docs/stream-creation.md

Stream Creation

KinesisUtils is the primary entry point for creating Kinesis input streams in Spark Streaming applications. It provides multiple overloaded createStream methods to accommodate different use cases and type requirements.

Scala API

Generic Stream Creation with Custom Message Handler

Create a stream that transforms Kinesis Records to a custom type using a message handler function.

def createStream[T: ClassTag](
  ssc: StreamingContext,
  kinesisAppName: String,
  streamName: String,
  endpointUrl: String,
  regionName: String,
  initialPositionInStream: InitialPositionInStream,
  checkpointInterval: Duration,
  storageLevel: StorageLevel,
  messageHandler: Record => T
): ReceiverInputDStream[T]

Parameters:

  • ssc - StreamingContext object
  • kinesisAppName - Kinesis application name used by KCL for DynamoDB coordination
  • streamName - Kinesis stream name
  • endpointUrl - Kinesis service URL (e.g., "https://kinesis.us-east-1.amazonaws.com")
  • regionName - AWS region name for DynamoDB and CloudWatch
  • initialPositionInStream - Starting position: TRIM_HORIZON or LATEST
  • checkpointInterval - Checkpoint frequency for fault tolerance
  • storageLevel - Storage level for received objects (recommended: MEMORY_AND_DISK_2)
  • messageHandler - Function to transform Record to type T

Usage Example:

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.model.Record
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.json4s._
import org.json4s.jackson.JsonMethods._

// Custom message handler that decodes the record payload as UTF-8 JSON
def jsonMessageHandler(record: Record): JValue = {
  val data = new String(record.getData.array(), java.nio.charset.StandardCharsets.UTF_8)
  parse(data)
}

val jsonStream = KinesisUtils.createStream[JValue](
  ssc,
  "json-processor-app",
  "json-events",
  "https://kinesis.us-west-2.amazonaws.com",
  "us-west-2", 
  InitialPositionInStream.LATEST,
  Seconds(30),
  StorageLevel.MEMORY_AND_DISK_2,
  jsonMessageHandler
)

Default Byte Array Stream Creation

Create a stream that returns raw byte arrays using the default message handler.

def createStream(
  ssc: StreamingContext,
  kinesisAppName: String,
  streamName: String,
  endpointUrl: String,
  regionName: String,  
  initialPositionInStream: InitialPositionInStream,
  checkpointInterval: Duration,
  storageLevel: StorageLevel
): ReceiverInputDStream[Array[Byte]]

Usage Example:

val byteStream = KinesisUtils.createStream(
  ssc,
  "data-processor",
  "raw-data-stream",
  "https://kinesis.eu-west-1.amazonaws.com", 
  "eu-west-1",
  InitialPositionInStream.TRIM_HORIZON,
  Seconds(60),
  StorageLevel.MEMORY_AND_DISK_2
)

// Convert bytes to strings (decode explicitly as UTF-8 rather than
// relying on the platform default charset)
val stringStream = byteStream.map(bytes => new String(bytes, java.nio.charset.StandardCharsets.UTF_8))

Deprecated Stream Creation (Legacy)

@deprecated("use other forms of createStream", "1.4.0")
def createStream(
  ssc: StreamingContext,
  streamName: String,
  endpointUrl: String,
  checkpointInterval: Duration,
  initialPositionInStream: InitialPositionInStream,
  storageLevel: StorageLevel
): ReceiverInputDStream[Array[Byte]]

This method uses the SparkConf app name as the Kinesis application name and extracts the region from the endpoint URL.
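The region extraction mentioned above can be sketched in plain Java as follows. This is a simplified, hypothetical parse for illustration only; the library's actual implementation resolves the region through the AWS SDK rather than by string splitting.

```java
public class EndpointRegion {
    // Hypothetical sketch: derive a region name from an endpoint URL such as
    // "https://kinesis.us-west-2.amazonaws.com". Illustrative only; the real
    // library delegates region resolution to the AWS SDK.
    static String regionFromEndpoint(String url) {
        String host = url.replaceFirst("^https?://", "");
        String[] parts = host.split("\\.");
        if (parts.length >= 2 && parts[0].equals("kinesis")) {
            return parts[1];
        }
        throw new IllegalArgumentException("Cannot parse region from " + url);
    }
}
```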

Java API

Generic Stream Creation with Function Interface

public static <T> JavaReceiverInputDStream<T> createStream(
  JavaStreamingContext jssc,
  String kinesisAppName,
  String streamName,
  String endpointUrl,
  String regionName,
  InitialPositionInStream initialPositionInStream,
  Duration checkpointInterval,
  StorageLevel storageLevel,
  Function<Record, T> messageHandler,
  Class<T> recordClass
);

Usage Example:

import org.apache.spark.api.java.function.Function;
import com.amazonaws.services.kinesis.model.Record;

// Define message handler
Function<Record, String> messageHandler = new Function<Record, String>() {
    @Override
    public String call(Record record) throws Exception {
        return new String(record.getData().array());
    }
};

// Create stream
JavaReceiverInputDStream<String> stringStream = KinesisUtils.createStream(
    jssc,
    "java-kinesis-app",
    "text-stream", 
    "https://kinesis.us-east-1.amazonaws.com",
    "us-east-1",
    InitialPositionInStream.LATEST,
    Durations.seconds(30),
    StorageLevel.MEMORY_AND_DISK_2(),
    messageHandler,
    String.class
);
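One caveat with the handler above: record.getData() returns a ByteBuffer, and calling .array() assumes the buffer's backing array starts at offset 0, which is not guaranteed. A safer decode respects the buffer's position and limit. The sketch below has no Kinesis dependency; the ByteBuffer stands in for record.getData().

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class PayloadDecode {
    // Decode using the buffer's position/limit via a duplicate (so the
    // original buffer's position is untouched), rather than assuming the
    // backing array begins at offset 0.
    static String decode(ByteBuffer data) {
        return StandardCharsets.UTF_8.decode(data.duplicate()).toString();
    }
}
```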

Default Byte Array Stream Creation (Java)

public static JavaReceiverInputDStream<byte[]> createStream(
  JavaStreamingContext jssc,
  String kinesisAppName,
  String streamName,
  String endpointUrl,
  String regionName,
  InitialPositionInStream initialPositionInStream,
  Duration checkpointInterval,
  StorageLevel storageLevel
);

Usage Example:

JavaReceiverInputDStream<byte[]> byteStream = KinesisUtils.createStream(
    jssc,
    "java-byte-processor",
    "binary-data-stream",
    "https://kinesis.ap-southeast-1.amazonaws.com",
    "ap-southeast-1",
    InitialPositionInStream.TRIM_HORIZON,
    Durations.seconds(45),
    StorageLevel.MEMORY_AND_DISK_2()
);

// Convert to strings (decode explicitly as UTF-8)
JavaDStream<String> stringStream = byteStream.map(
    bytes -> new String(bytes, java.nio.charset.StandardCharsets.UTF_8)
);

Configuration Options

Initial Position in Stream

// From the Kinesis Client Library (KCL)
enum InitialPositionInStream {
  LATEST,      // Start from the most recent record
  TRIM_HORIZON // Start from the oldest record still retained by the stream
}
  • LATEST: Start processing from the most recent records in the stream
  • TRIM_HORIZON: Start from the oldest available records (Kinesis retains data for 24 hours by default; retention can be extended)

Storage Level Recommendations

import org.apache.spark.storage.StorageLevel

// Recommended storage levels
StorageLevel.MEMORY_AND_DISK_2    // Replicated in memory and disk (recommended)
StorageLevel.MEMORY_AND_DISK      // Memory and disk fallback
StorageLevel.MEMORY_ONLY_2        // Memory only with replication

MEMORY_AND_DISK_2 is recommended for fault tolerance as it provides both memory performance and disk persistence with replication.

Checkpoint Intervals

Choose checkpoint intervals based on your application requirements:

  • Short intervals (10-30 seconds): Lower data loss risk, higher DynamoDB costs
  • Medium intervals (30-120 seconds): Balanced approach for most applications
  • Long intervals (2-5 minutes): Lower costs, higher potential data loss on failure
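The cost side of this trade-off can be estimated with a back-of-the-envelope model. The sketch below assumes, hypothetically, one DynamoDB checkpoint write per shard per interval; actual write volume depends on KCL behavior and worker count.

```java
public class CheckpointCost {
    // Rough model (an assumption, not the library's accounting):
    // one DynamoDB checkpoint write per shard per checkpoint interval.
    static long checkpointWritesPerDay(int shards, int intervalSeconds) {
        return (long) shards * (86400L / intervalSeconds);
    }
}
```

For example, 4 shards checkpointing every 30 seconds produce on the order of 11,520 writes per day under this model, versus 1,152 at a 5-minute interval.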

Error Handling

Common errors and their solutions:

IllegalArgumentException: Invalid region name

// The region name must be a valid AWS region identifier that matches the endpoint
// (the list below is illustrative, not exhaustive)
val validRegions = Seq("us-east-1", "us-west-2", "eu-west-1", "ap-southeast-1")
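A fail-fast check before building the stream can surface a bad region earlier than the receiver startup does. This is a hypothetical helper against an illustrative subset of regions; in practice the authoritative list comes from the AWS SDK's Regions enum.

```java
import java.util.Set;

public class RegionCheck {
    // Illustrative subset only; use the AWS SDK's Regions enum for the full list.
    static final Set<String> VALID =
        Set.of("us-east-1", "us-west-2", "eu-west-1", "ap-southeast-1");

    // Throw early with a clear message instead of failing inside the receiver.
    static void requireValidRegion(String regionName) {
        if (!VALID.contains(regionName)) {
            throw new IllegalArgumentException("Invalid region name: " + regionName);
        }
    }
}
```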

AWS Authentication Errors: Ensure proper AWS credentials are configured

  • Use DefaultAWSCredentialsProviderChain for automatic credential discovery
  • Or provide explicit credentials using the credential management methods

DynamoDB Access Errors: Ensure the application has proper permissions for DynamoDB table creation and access for checkpointing.
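As an illustration of the permissions involved, a minimal IAM policy fragment for the KCL's checkpoint table might look like the following. This is a hedged sketch, not an official policy: the table name equals the kinesisAppName, and the exact action list you need may vary by KCL version.

```
{
  "Effect": "Allow",
  "Action": [
    "dynamodb:CreateTable",
    "dynamodb:DescribeTable",
    "dynamodb:GetItem",
    "dynamodb:PutItem",
    "dynamodb:UpdateItem",
    "dynamodb:DeleteItem",
    "dynamodb:Scan"
  ],
  "Resource": "arn:aws:dynamodb:us-west-2:ACCOUNT_ID:table/json-processor-app"
}
```

The KCL also publishes metrics, so cloudwatch:PutMetricData is typically granted alongside the DynamoDB actions.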

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-assembly
