Stream Creation

This page covers the core functionality for creating Kinesis DStreams with the builder pattern. The builder provides a fluent API for configuring all aspects of Kinesis stream consumption, including required parameters, optional settings, and message handling.
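
At its simplest, only the three required parameters need to be set; a minimal sketch (the stream and application names are placeholders):

val kinesisStream = KinesisInputDStream.builder
  .streamingContext(ssc)              // an existing StreamingContext
  .streamName("example-stream")       // placeholder stream name
  .checkpointAppName("example-app")   // placeholder KCL application name
  .build()                            // KinesisInputDStream[Array[Byte]]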

Core API

KinesisInputDStream.Builder

The builder provides a fluent interface for configuring Kinesis streams with comprehensive parameter validation and sensible defaults.

object KinesisInputDStream {
  def builder: KinesisInputDStream.Builder
  
  // Default message handler that extracts byte arrays from Kinesis records
  private[kinesis] def defaultMessageHandler(record: Record): Array[Byte]
}

class Builder {
  // Required configuration methods
  def streamingContext(ssc: StreamingContext): Builder
  def streamingContext(jssc: JavaStreamingContext): Builder  
  def streamName(streamName: String): Builder
  def checkpointAppName(appName: String): Builder
  
  // Optional configuration methods
  def endpointUrl(url: String): Builder
  def regionName(regionName: String): Builder
  def initialPosition(initialPosition: KinesisInitialPosition): Builder
  def initialPositionInStream(initialPosition: InitialPositionInStream): Builder // Deprecated in 2.3.0
  def checkpointInterval(interval: Duration): Builder
  def storageLevel(storageLevel: StorageLevel): Builder
  def kinesisCredentials(credentials: SparkAWSCredentials): Builder
  def dynamoDBCredentials(credentials: SparkAWSCredentials): Builder
  def cloudWatchCredentials(credentials: SparkAWSCredentials): Builder
  def metricsLevel(metricsLevel: MetricsLevel): Builder
  def metricsEnabledDimensions(dimensions: Set[String]): Builder
  
  // Build methods
  def build(): KinesisInputDStream[Array[Byte]]
  def buildWithMessageHandler[T: ClassTag](handler: Record => T): KinesisInputDStream[T]
}

Required Parameters

StreamingContext

The Spark StreamingContext that will manage the DStream lifecycle.

val builder = KinesisInputDStream.builder
  .streamingContext(ssc)  // Required

For Java API:

import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kinesis.KinesisInputDStream;

KinesisInputDStream.Builder builder = KinesisInputDStream.builder()
  .streamingContext(jssc);  // JavaStreamingContext

Stream Name

The name of the Kinesis stream to consume from.

val builder = KinesisInputDStream.builder
  .streamName("my-kinesis-stream")  // Required

Checkpoint Application Name

The KCL application name used for DynamoDB checkpointing. This must be unique per stream and consumer application.

val builder = KinesisInputDStream.builder
  .checkpointAppName("my-unique-app-name")  // Required

Optional Configuration

AWS Region and Endpoint

Configure the AWS region and Kinesis endpoint URL. Defaults to us-east-1 and the standard Kinesis endpoint.

val builder = KinesisInputDStream.builder
  .regionName("us-west-2")
  .endpointUrl("https://kinesis.us-west-2.amazonaws.com")

Initial Position

Specify where to start reading from the stream. See Initial Position for details.

val builder = KinesisInputDStream.builder
  .initialPosition(new KinesisInitialPositions.Latest())
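
The other positions follow the same pattern; a short sketch (the timestamp value is illustrative):

import java.util.Date

// Start from the oldest record available in each shard
val fromTrimHorizon = KinesisInputDStream.builder
  .initialPosition(new KinesisInitialPositions.TrimHorizon())

// Start from records added at or after a specific point in time
val fromTimestamp = KinesisInputDStream.builder
  .initialPosition(new KinesisInitialPositions.AtTimestamp(new Date(1700000000000L)))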

Checkpoint Interval

How frequently to checkpoint progress to DynamoDB. Defaults to the streaming batch duration.

import org.apache.spark.streaming.Seconds

val builder = KinesisInputDStream.builder
  .checkpointInterval(Seconds(30))

Storage Level

Storage level for cached blocks. Defaults to MEMORY_AND_DISK_2.

import org.apache.spark.storage.StorageLevel

val builder = KinesisInputDStream.builder
  .storageLevel(StorageLevel.MEMORY_ONLY_2)

AWS Credentials

Configure authentication for Kinesis, DynamoDB, and CloudWatch. See AWS Credentials for details.

val credentials = SparkAWSCredentials.builder
  .basicCredentials("access-key", "secret-key")
  .build()

val builder = KinesisInputDStream.builder
  .kinesisCredentials(credentials)
  .dynamoDBCredentials(credentials)  // Optional, defaults to Kinesis credentials
  .cloudWatchCredentials(credentials)  // Optional, defaults to Kinesis credentials

CloudWatch Metrics

Configure CloudWatch metrics collection level and dimensions.

import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel

val builder = KinesisInputDStream.builder
  .metricsLevel(MetricsLevel.SUMMARY)
  .metricsEnabledDimensions(Set("Operation", "ShardId"))

Building the DStream

Default Message Handler

Creates a DStream of byte arrays using the default message handler:

val kinesisStream: KinesisInputDStream[Array[Byte]] = builder.build()
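
The resulting byte arrays can then be decoded downstream as needed; a minimal sketch assuming UTF-8 encoded payloads:

import java.nio.charset.StandardCharsets

val lines = kinesisStream.map(bytes => new String(bytes, StandardCharsets.UTF_8))
lines.print()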

Custom Message Handler

Creates a DStream with a custom message transformation function:

import com.amazonaws.services.kinesis.model.Record

// Custom handler that extracts JSON strings
val jsonHandler: Record => String = record => {
  val bytes = new Array[Byte](record.getData().remaining())
  record.getData().get(bytes)
  new String(bytes, "UTF-8")
}

val kinesisStream: KinesisInputDStream[String] = builder
  .buildWithMessageHandler(jsonHandler)

Complete Example

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream, SparkAWSCredentials}
import com.amazonaws.services.kinesis.model.Record

val sparkConf = new SparkConf().setAppName("KinesisExample")
val ssc = new StreamingContext(sparkConf, Seconds(10))

// Configure credentials
val credentials = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/KinesisRole", "MySession")
  .build()

// Create the stream
val kinesisStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-data-stream")
  .checkpointAppName("spark-kinesis-consumer")
  .regionName("us-west-2")
  .initialPosition(new KinesisInitialPositions.TrimHorizon())
  .checkpointInterval(Seconds(30))
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
  .kinesisCredentials(credentials)
  .buildWithMessageHandler { record: Record =>
    val data = new Array[Byte](record.getData().remaining())
    record.getData().get(data)
    new String(data, "UTF-8")
  }

// Process the stream
kinesisStream.foreachRDD { rdd =>
  rdd.collect().foreach(println)
}

ssc.start()
ssc.awaitTermination()

Java API Example

Complete example using the Java API:

import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
import org.apache.spark.streaming.kinesis.KinesisInputDStream;
import org.apache.spark.streaming.kinesis.SparkAWSCredentials;
import scala.reflect.ClassTag$;

public class JavaKinesisExample {
  public static void main(String[] args) throws InterruptedException {
    SparkConf conf = new SparkConf().setAppName("JavaKinesisExample");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
    
    // Configure credentials
    SparkAWSCredentials credentials = SparkAWSCredentials.builder()
      .basicCredentials("access-key", "secret-key")
      .build();
    
    // Create the stream and wrap the underlying Scala DStream for use from the Java API
    JavaDStream<byte[]> kinesisStream = JavaDStream.fromDStream(
      KinesisInputDStream.builder()
        .streamingContext(jssc)
        .streamName("my-data-stream")
        .checkpointAppName("java-kinesis-consumer")
        .regionName("us-west-2")
        .initialPosition(new KinesisInitialPositions.Latest())
        .checkpointInterval(Durations.seconds(30))
        .storageLevel(StorageLevel.MEMORY_AND_DISK_2())
        .kinesisCredentials(credentials)
        .build(),
      ClassTag$.MODULE$.apply(byte[].class));
    
    // Process the stream
    kinesisStream.foreachRDD(rdd -> {
      rdd.foreach(bytes -> {
        String data = new String(bytes);
        System.out.println("Received: " + data);
      });
    });
    
    jssc.start();
    jssc.awaitTermination();
  }
}

Error Handling

The builder validates its configuration at build time and throws IllegalArgumentException when a required value is missing:

// This will throw IllegalArgumentException at build time
val invalidStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  // Missing streamName and checkpointAppName
  .build()  // Throws exception

Common validation errors:

  • Missing required parameters (streamingContext, streamName, checkpointAppName)
  • Invalid AWS credentials configuration
  • Invalid checkpoint interval or storage level settings
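
If a misconfigured builder should surface as a handled error rather than crash the driver, the build call can be wrapped; a minimal sketch using scala.util.Try (builder refers to any partially configured Builder):

import scala.util.{Failure, Success, Try}

Try(builder.build()) match {
  case Success(stream) =>
    stream.print()  // proceed with the configured DStream
  case Failure(e: IllegalArgumentException) =>
    println(s"Invalid Kinesis configuration: ${e.getMessage}")
  case Failure(e) =>
    throw e
}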

Default Values

The builder provides sensible defaults for optional parameters:

  • endpointUrl: "https://kinesis.us-east-1.amazonaws.com"
  • regionName: "us-east-1"
  • initialPosition: new KinesisInitialPositions.Latest()
  • checkpointInterval: Streaming batch duration (ssc.graph.batchDuration)
  • storageLevel: StorageLevel.MEMORY_AND_DISK_2
  • kinesisCredentials: DefaultCredentials (AWS default provider chain)
  • dynamoDBCredentials: Uses same as kinesisCredentials if not specified
  • cloudWatchCredentials: Uses same as kinesisCredentials if not specified
  • metricsLevel: KinesisClientLibConfiguration.DEFAULT_METRICS_LEVEL
  • metricsEnabledDimensions: KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS
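
For illustration, a builder that sets only the required parameters behaves as if the defaults above were spelled out explicitly; a hedged sketch of the equivalent configuration (checkpoint interval omitted because it tracks the batch duration):

import org.apache.spark.storage.StorageLevel

val explicitDefaults = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .checkpointAppName("my-unique-app-name")
  .endpointUrl("https://kinesis.us-east-1.amazonaws.com")
  .regionName("us-east-1")
  .initialPosition(new KinesisInitialPositions.Latest())
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
  .build()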