tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-2-10

Spark Streaming integration with Amazon Kinesis for real-time data processing using the Kinesis Client Library


Stream Creation

Primary factory methods for creating Kinesis input streams with various configuration options including custom message handlers, explicit AWS credentials, and different storage levels.

Capabilities

Basic Stream Creation (Scala)

Creates a Kinesis input stream using default AWS credential discovery and byte array message handler.

/**
 * Create an input stream that pulls messages from a Kinesis stream using the KCL.
 * Uses DefaultAWSCredentialsProviderChain for AWS authentication.
 *
 * @param ssc StreamingContext object
 * @param kinesisAppName Kinesis application name used by the KCL to update DynamoDB
 * @param streamName Kinesis stream name
 * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
 * @param regionName Name of region used by the KCL for DynamoDB and CloudWatch
 * @param initialPositionInStream Starting position in stream (TRIM_HORIZON or LATEST)
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param storageLevel Storage level for received objects (MEMORY_AND_DISK_2 recommended)
 * @return ReceiverInputDStream[Array[Byte]] containing raw message data
 */
def createStream(
  ssc: StreamingContext,
  kinesisAppName: String,
  streamName: String,
  endpointUrl: String,
  regionName: String,
  initialPositionInStream: InitialPositionInStream,
  checkpointInterval: Duration,
  storageLevel: StorageLevel
): ReceiverInputDStream[Array[Byte]]

Usage Example:

import org.apache.spark.streaming.kinesis._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Duration

val stream = KinesisUtils.createStream(
  ssc,
  "MySparkKinesisApp",
  "my-kinesis-stream",
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.LATEST,
  new Duration(2000),
  StorageLevel.MEMORY_AND_DISK_2
)
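
The returned DStream carries raw Array[Byte] payloads, so the first transformation is usually a decode step. A minimal sketch (assuming the stream created above and UTF-8 encoded records; the word count is an illustrative downstream step):

```scala
import java.nio.charset.StandardCharsets

// Decode each Kinesis payload from bytes to a UTF-8 string,
// then count words per batch as a simple downstream step.
val lines = stream.map(bytes => new String(bytes, StandardCharsets.UTF_8))
val wordCounts = lines
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1L))
  .reduceByKey(_ + _)

wordCounts.print()
ssc.start()
ssc.awaitTermination()
```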

Stream Creation with Explicit Credentials (Scala)

Creates a Kinesis input stream with explicitly provided AWS credentials.

/**
 * Create an input stream with explicit AWS credentials.
 * Note: Credentials will be saved in DStream checkpoints if checkpointing is enabled.
 *
 * @param ssc StreamingContext object
 * @param kinesisAppName Kinesis application name used by the KCL to update DynamoDB
 * @param streamName Kinesis stream name
 * @param endpointUrl Url of Kinesis service
 * @param regionName Name of region used by the KCL for DynamoDB and CloudWatch
 * @param initialPositionInStream Starting position in stream
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param storageLevel Storage level for received objects
 * @param awsAccessKeyId AWS AccessKeyId (if null, uses DefaultAWSCredentialsProviderChain)
 * @param awsSecretKey AWS SecretKey (if null, uses DefaultAWSCredentialsProviderChain)
 * @return ReceiverInputDStream[Array[Byte]] containing raw message data
 */
def createStream(
  ssc: StreamingContext,
  kinesisAppName: String,
  streamName: String,
  endpointUrl: String,
  regionName: String,
  initialPositionInStream: InitialPositionInStream,
  checkpointInterval: Duration,
  storageLevel: StorageLevel,
  awsAccessKeyId: String,
  awsSecretKey: String
): ReceiverInputDStream[Array[Byte]]

Usage Example:

val stream = KinesisUtils.createStream(
  ssc,
  "MySparkKinesisApp",
  "my-kinesis-stream",
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.LATEST,
  new Duration(2000),
  StorageLevel.MEMORY_AND_DISK_2,
  "AKIAIOSFODNN7EXAMPLE",
  "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
)

Stream Creation with Custom Message Handler (Scala)

Creates a typed Kinesis input stream with a custom message handler function.

/**
 * Create an input stream with a custom message handler for type-safe data processing.
 *
 * @param ssc StreamingContext object
 * @param kinesisAppName Kinesis application name used by the KCL to update DynamoDB
 * @param streamName Kinesis stream name
 * @param endpointUrl Url of Kinesis service
 * @param regionName Name of region used by the KCL for DynamoDB and CloudWatch
 * @param initialPositionInStream Starting position in stream
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param storageLevel Storage level for received objects
 * @param messageHandler Custom function to process Kinesis Records into type T
 * @return ReceiverInputDStream[T] containing processed data
 */
def createStream[T: ClassTag](
  ssc: StreamingContext,
  kinesisAppName: String,
  streamName: String,
  endpointUrl: String,
  regionName: String,
  initialPositionInStream: InitialPositionInStream,
  checkpointInterval: Duration,
  storageLevel: StorageLevel,
  messageHandler: Record => T
): ReceiverInputDStream[T]

Usage Example:

import com.amazonaws.services.kinesis.model.Record
import java.nio.charset.StandardCharsets
import spray.json._
import spray.json.DefaultJsonProtocol._

case class MyEvent(id: String, timestamp: Long, data: String)

// JSON format required by spray-json's convertTo[MyEvent]
implicit val myEventFormat: RootJsonFormat[MyEvent] = jsonFormat3(MyEvent)

// Custom message handler to parse JSON events
def parseMyEvent(record: Record): MyEvent = {
  val data = new String(record.getData.array(), StandardCharsets.UTF_8)
  data.parseJson.convertTo[MyEvent]
}

val stream = KinesisUtils.createStream[MyEvent](
  ssc,
  "MySparkKinesisApp", 
  "my-events-stream",
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.LATEST,
  new Duration(2000),
  StorageLevel.MEMORY_AND_DISK_2,
  parseMyEvent
)

Stream Creation with Custom Handler and Credentials (Scala)

Creates a typed Kinesis input stream with both custom message handler and explicit AWS credentials.

/**
 * Create an input stream with custom message handler and explicit AWS credentials.
 *
 * @param ssc StreamingContext object
 * @param kinesisAppName Kinesis application name used by the KCL to update DynamoDB
 * @param streamName Kinesis stream name
 * @param endpointUrl Url of Kinesis service
 * @param regionName Name of region used by the KCL for DynamoDB and CloudWatch
 * @param initialPositionInStream Starting position in stream
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param storageLevel Storage level for received objects
 * @param messageHandler Custom function to process Kinesis Records into type T
 * @param awsAccessKeyId AWS AccessKeyId
 * @param awsSecretKey AWS SecretKey
 * @return ReceiverInputDStream[T] containing processed data
 */
def createStream[T: ClassTag](
  ssc: StreamingContext,
  kinesisAppName: String,
  streamName: String,
  endpointUrl: String,
  regionName: String,
  initialPositionInStream: InitialPositionInStream,
  checkpointInterval: Duration,
  storageLevel: StorageLevel,
  messageHandler: Record => T,
  awsAccessKeyId: String,
  awsSecretKey: String
): ReceiverInputDStream[T]
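
Usage Example:

A sketch combining the custom handler with explicit credentials (it reuses parseMyEvent from the previous example; the key values are the standard placeholder credentials from AWS documentation):

```scala
val stream = KinesisUtils.createStream[MyEvent](
  ssc,
  "MySparkKinesisApp",
  "my-events-stream",
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.LATEST,
  new Duration(2000),
  StorageLevel.MEMORY_AND_DISK_2,
  parseMyEvent,
  "AKIAIOSFODNN7EXAMPLE",
  "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
)
```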

Deprecated Stream Creation (Scala)

Simplified stream creation method (deprecated since version 1.4.0).

/**
 * Create an input stream using app name from SparkConf and region from endpoint.
 * @deprecated use other forms of createStream
 *
 * @param ssc StreamingContext object
 * @param streamName Kinesis stream name
 * @param endpointUrl Endpoint url of Kinesis service
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param initialPositionInStream Starting position in stream
 * @param storageLevel Storage level for received objects
 * @return ReceiverInputDStream[Array[Byte]] containing raw message data
 */
@deprecated("use other forms of createStream", "1.4.0")
def createStream(
  ssc: StreamingContext,
  streamName: String,
  endpointUrl: String,
  checkpointInterval: Duration,
  initialPositionInStream: InitialPositionInStream,
  storageLevel: StorageLevel
): ReceiverInputDStream[Array[Byte]]
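
Migrating off this overload means supplying the application name and region explicitly; note that the current overloads also swap the order of checkpointInterval and initialPositionInStream. A sketch of the equivalent call (stream and app names are illustrative):

```scala
// Equivalent non-deprecated call: the app name (previously taken
// from SparkConf) and the region (previously derived from the
// endpoint URL) are now passed explicitly, and the initial position
// now precedes the checkpoint interval in the argument list.
val stream = KinesisUtils.createStream(
  ssc,
  "MySparkKinesisApp",
  "my-kinesis-stream",
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.LATEST,
  new Duration(2000),
  StorageLevel.MEMORY_AND_DISK_2
)
```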

Configuration Guidelines

Application Name (kinesisAppName)

  • Must be unique per Kinesis stream
  • Used by KCL for DynamoDB coordination table naming
  • Changing this requires deleting the associated DynamoDB table

Checkpoint Interval

  • Should typically match or be a multiple of the batch interval
  • Shorter intervals provide better fault tolerance but increase DynamoDB usage
  • Must be >= 1 second
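
One way to keep the two aligned is to derive the checkpoint interval from the batch interval, for example (a sketch; sparkConf and the 10-second batch are illustrative):

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}

val batchInterval = Seconds(10)
val ssc = new StreamingContext(sparkConf, batchInterval)

// Checkpoint at the batch interval (or a multiple of it) so the
// sequence-number checkpoints written to DynamoDB line up with
// batch boundaries and DynamoDB write load stays predictable.
val checkpointInterval = batchInterval
```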

Storage Level

  • StorageLevel.MEMORY_AND_DISK_2 is recommended for fault tolerance
  • Higher replication levels provide better fault tolerance
  • Consider memory constraints when choosing storage levels

Initial Position

  • InitialPositionInStream.LATEST: Start from the most recent records
  • InitialPositionInStream.TRIM_HORIZON: Start from the oldest available records (up to 24 hours)
  • Only applies when no checkpoint exists in DynamoDB

Error Handling

Common error scenarios and handling:

  • Invalid region names: Throws IllegalArgumentException
  • Authentication failures: Runtime exceptions during stream processing
  • Network connectivity: Automatic retries via KCL with exponential backoff
  • DynamoDB access: Requires proper IAM permissions for lease coordination
  • CloudWatch access: Optional but recommended for monitoring metrics

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-streaming-kinesis-asl-2-10
