Stream Creation and Configuration

The KinesisInputDStream.Builder provides a fluent API for creating and configuring Kinesis input streams with comprehensive options for performance, reliability, and monitoring.

Builder Pattern API

object KinesisInputDStream {
  def builder: Builder
}

class Builder {
  // Required parameters
  def streamingContext(ssc: StreamingContext): Builder
  def streamingContext(jssc: JavaStreamingContext): Builder
  def streamName(streamName: String): Builder
  def checkpointAppName(appName: String): Builder
  
  // Optional configuration
  def endpointUrl(url: String): Builder
  def regionName(regionName: String): Builder
  def initialPosition(initialPosition: KinesisInitialPosition): Builder
  def checkpointInterval(interval: Duration): Builder
  def storageLevel(storageLevel: StorageLevel): Builder
  
  // Credentials configuration
  def kinesisCredentials(credentials: SparkAWSCredentials): Builder
  def dynamoDBCredentials(credentials: SparkAWSCredentials): Builder
  def cloudWatchCredentials(credentials: SparkAWSCredentials): Builder
  
  // Metrics configuration
  def metricsLevel(metricsLevel: MetricsLevel): Builder
  def metricsEnabledDimensions(metricsEnabledDimensions: Set[String]): Builder
  
  // Build methods
  def build(): KinesisInputDStream[Array[Byte]]
  def buildWithMessageHandler[T: ClassTag](handler: Record => T): KinesisInputDStream[T]
}
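
The per-service credential setters above are the one group of options without a dedicated example on this page. A minimal sketch, reusing a single SparkAWSCredentials for all three AWS clients; the role ARN is hypothetical and for illustration only:

import org.apache.spark.streaming.kinesis.SparkAWSCredentials

// Hypothetical role ARN for illustration only
val creds = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/example-role", "spark-kinesis-session")
  .build()

val builder = KinesisInputDStream.builder
  .kinesisCredentials(creds)     // credentials for reading the stream
  .dynamoDBCredentials(creds)    // credentials for the KCL lease table
  .cloudWatchCredentials(creds)  // credentials for publishing metrics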

Required Parameters

StreamingContext

Sets the Spark StreamingContext that will be used to construct the Kinesis DStream.

val builder = KinesisInputDStream.builder
  .streamingContext(ssc)

Stream Name

The name of the Kinesis stream to read from.

val builder = KinesisInputDStream.builder
  .streamName("my-kinesis-stream")

Checkpoint Application Name

The Kinesis Client Library (KCL) application name. The KCL stores its lease and checkpoint state in a DynamoDB table with this name, and the same name is used when publishing CloudWatch metrics.

val builder = KinesisInputDStream.builder
  .checkpointAppName("my-spark-kinesis-app")

Optional Configuration

AWS Region and Endpoint

Configure the AWS region and Kinesis endpoint URL.

val builder = KinesisInputDStream.builder
  .regionName("us-west-2")
  .endpointUrl("https://kinesis.us-west-2.amazonaws.com")

Defaults:

  • Region: "us-east-1"
  • Endpoint: "https://kinesis.us-east-1.amazonaws.com"

Storage Level

Configure how received data blocks are stored in memory/disk.

import org.apache.spark.storage.StorageLevel

val builder = KinesisInputDStream.builder
  .storageLevel(StorageLevel.MEMORY_AND_DISK_SER_2)

Default: StorageLevel.MEMORY_AND_DISK_2

Checkpoint Interval

How often the KCL application state is checkpointed to DynamoDB.

import org.apache.spark.streaming.Seconds

val builder = KinesisInputDStream.builder
  .checkpointInterval(Seconds(30))

Default: Uses the Spark Streaming batch interval
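
Initial Position

Where in the stream to start reading when no checkpoint exists for the application, set via the initialPosition method shown in the builder listing above. The positions shipped with the connector are Latest, TrimHorizon, and AtTimestamp; a brief sketch using TrimHorizon:

import org.apache.spark.streaming.kinesis.KinesisInitialPositions.TrimHorizon

val builder = KinesisInputDStream.builder
  .initialPosition(new TrimHorizon())

Default: Latest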

Metrics Configuration

Metrics Level

Configure CloudWatch metrics detail level.

import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel

val builder = KinesisInputDStream.builder
  .metricsLevel(MetricsLevel.SUMMARY)

Options:

  • MetricsLevel.DETAILED - All available metrics (default)
  • MetricsLevel.SUMMARY - Summary metrics only
  • MetricsLevel.NONE - No metrics

Metrics Dimensions

Specify which CloudWatch metrics dimensions should be enabled.

val builder = KinesisInputDStream.builder
  .metricsEnabledDimensions(Set("Operation", "ShardId"))

Message Handling

Default Handler

Creates a stream of byte arrays using the default message handler.

val stream: KinesisInputDStream[Array[Byte]] = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .build()
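
Downstream, records arrive as raw Array[Byte]. A minimal sketch of consuming them, assuming UTF-8-encoded payloads:

import java.nio.charset.StandardCharsets

stream
  .map(bytes => new String(bytes, StandardCharsets.UTF_8))
  .foreachRDD { rdd =>
    rdd.take(10).foreach(println)  // inspect a sample of each batch
  }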

Custom Handler

Process Kinesis records with a custom message handler function.

import com.amazonaws.services.kinesis.model.Record

// Custom handler to extract string data
val customHandler = (record: Record) => {
  val data = new Array[Byte](record.getData.remaining())
  record.getData.get(data)
  new String(data, "UTF-8")
}

val stream: KinesisInputDStream[String] = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .buildWithMessageHandler(customHandler)
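
Because the handler receives the full Kinesis Record, it can also carry record metadata through into the stream. A sketch, where Event is a hypothetical case class introduced here for illustration:

import com.amazonaws.services.kinesis.model.Record

// Hypothetical payload type for illustration
case class Event(partitionKey: String, sequenceNumber: String, body: String)

val metadataHandler = (record: Record) => {
  val bytes = new Array[Byte](record.getData.remaining())
  record.getData.get(bytes)
  Event(record.getPartitionKey, record.getSequenceNumber, new String(bytes, "UTF-8"))
}
// Pass to buildWithMessageHandler as above to get a KinesisInputDStream[Event]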

Java API Usage

import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kinesis.KinesisInputDStream;

// Assumes a SparkConf named sparkConf is already defined
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

KinesisInputDStream<byte[]> stream = KinesisInputDStream.builder()
    .streamingContext(jssc)
    .streamName("my-kinesis-stream")
    .checkpointAppName("my-app")
    .regionName("us-west-2")
    .build();

Complete Configuration Example

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, SparkAWSCredentials}
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.storage.StorageLevel
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel

val credentials = SparkAWSCredentials.builder
  .basicCredentials("access-key", "secret-key")
  .build()

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("production-events")
  .checkpointAppName("event-processor")
  .regionName("us-west-2")
  .endpointUrl("https://kinesis.us-west-2.amazonaws.com")
  .initialPosition(new Latest())
  .checkpointInterval(Seconds(30))
  .storageLevel(StorageLevel.MEMORY_AND_DISK_SER_2)
  .kinesisCredentials(credentials)
  .metricsLevel(MetricsLevel.SUMMARY)
  .metricsEnabledDimensions(Set("Operation", "ShardId"))
  .buildWithMessageHandler { record =>
    val data = new Array[Byte](record.getData.remaining())
    record.getData.get(data)
    new String(data, "UTF-8")
  }
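
Once built, the stream participates in the normal Spark Streaming lifecycle. A minimal sketch of wiring it up and starting the context:

stream.foreachRDD { rdd =>
  println(s"Batch size: ${rdd.count()} records")
}

ssc.start()
ssc.awaitTermination()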