Configuration and Performance Tuning

The configuration system provides advanced options for optimizing Kinesis stream consumption, including retry logic, timeouts, storage levels, CloudWatch metrics, and checkpointing intervals. These settings are crucial for production deployments that require high performance and reliability.

Core Configuration API

KinesisReadConfigurations

Controls retry behavior and timeouts for Kinesis API calls, particularly important for handling transient network issues and rate limiting.

case class KinesisReadConfigurations(
  maxRetries: Int,           // Maximum number of retry attempts for Kinesis API calls
  retryWaitTimeMs: Long,     // Wait time between retry attempts in milliseconds  
  retryTimeoutMs: Long       // Total timeout for Kinesis operations in milliseconds
)

object KinesisReadConfigurations {
  // Create with default values
  def apply(): KinesisReadConfigurations
  
  // Create with values read from the StreamingContext's SparkConf
  // (the retry timeout follows the streaming batch duration)
  def apply(ssc: StreamingContext): KinesisReadConfigurations
  
  // Configuration keys for SparkConf
  val RETRY_MAX_ATTEMPTS_KEY: String = "spark.streaming.kinesis.retry.maxAttempts"
  val RETRY_WAIT_TIME_KEY: String = "spark.streaming.kinesis.retry.waitTime"
  
  // Default values
  val DEFAULT_MAX_RETRIES: Int = 3
  val DEFAULT_RETRY_WAIT_TIME: String = "100ms"
  val DEFAULT_RETRY_TIMEOUT: Long = 10000  // 10 seconds
}

Builder Configuration Methods

Additional configuration options available through the KinesisInputDStream.Builder:

class Builder {
  // Performance and reliability
  def checkpointInterval(interval: Duration): Builder
  def storageLevel(storageLevel: StorageLevel): Builder
  
  // CloudWatch metrics
  def metricsLevel(metricsLevel: MetricsLevel): Builder  
  def metricsEnabledDimensions(dimensions: Set[String]): Builder
  
  // AWS service endpoints and regions
  def endpointUrl(url: String): Builder
  def regionName(regionName: String): Builder
}

Retry Configuration

Using Default Configuration

import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisReadConfigurations}

// Uses default values: 3 retries, 100ms wait, 10s timeout
val defaultConfig = KinesisReadConfigurations()

// The builder applies the read configuration internally; defaultConfig above is informational
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app") 
  .build()

Using StreamingContext Configuration

Configure through SparkConf and let the system read from StreamingContext:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisReadConfigurations}

val conf = new SparkConf()
  .setAppName("KinesisApp")
  .set("spark.streaming.kinesis.retry.maxAttempts", "5")
  .set("spark.streaming.kinesis.retry.waitTime", "200ms")

val ssc = new StreamingContext(conf, Seconds(10))

// This will use the SparkConf values above
val configFromContext = KinesisReadConfigurations(ssc)

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .build()

Custom Retry Configuration

import org.apache.spark.streaming.kinesis.KinesisReadConfigurations

val customConfig = KinesisReadConfigurations(
  maxRetries = 5,           // Retry up to 5 times
  retryWaitTimeMs = 500L,   // Wait 500ms between retries
  retryTimeoutMs = 30000L   // 30 second total timeout
)

// Note: KinesisReadConfigurations is used internally by KinesisBackedBlockRDD
// The main builder uses default configurations, but you can influence this
// through SparkConf settings
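
Concretely, the practical way to get the equivalent of the custom values above is to express them through SparkConf, which the stream picks up via the StreamingContext. A minimal sketch (note that the overall retry timeout follows the streaming batch duration rather than a dedicated key):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Retry tuning equivalent to the custom configuration above
val retryConf = new SparkConf()
  .setAppName("KinesisRetryTuning")
  .set("spark.streaming.kinesis.retry.maxAttempts", "5")   // maxRetries = 5
  .set("spark.streaming.kinesis.retry.waitTime", "500ms")  // retryWaitTimeMs = 500

// The retry timeout tracks the batch duration chosen here
val retrySsc = new StreamingContext(retryConf, Seconds(30))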

Storage Level Configuration

Configure how DStream blocks are stored in memory and disk for fault tolerance and performance.

import org.apache.spark.storage.StorageLevel

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .storageLevel(StorageLevel.MEMORY_ONLY_2)  // Store in memory with 2x replication
  .build()

Storage Level Options

// Memory only (fastest, but not fault tolerant to node failures)
.storageLevel(StorageLevel.MEMORY_ONLY)
.storageLevel(StorageLevel.MEMORY_ONLY_2)  // With replication

// Memory and disk (balanced performance and fault tolerance)
.storageLevel(StorageLevel.MEMORY_AND_DISK)
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)  // Default, with replication

// Memory with serialization (more memory efficient)
.storageLevel(StorageLevel.MEMORY_ONLY_SER)
.storageLevel(StorageLevel.MEMORY_ONLY_SER_2)

// Disk only (most fault tolerant, slowest)
.storageLevel(StorageLevel.DISK_ONLY)
.storageLevel(StorageLevel.DISK_ONLY_2)

Checkpoint Interval Configuration

Controls how frequently the KCL checkpoints progress to DynamoDB. This affects both fault tolerance and performance.

import org.apache.spark.streaming.Seconds

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .checkpointInterval(Seconds(30))  // Checkpoint every 30 seconds
  .build()

Checkpoint Interval Guidelines

  • Shorter intervals (5-10 seconds): Better fault tolerance, less data loss on failure, but higher DynamoDB costs
  • Longer intervals (30-60 seconds): Lower DynamoDB costs, but more potential data loss on failure
  • Default: Uses the streaming batch duration (recommended starting point; see the sketch below)
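
One simple pattern, sketched below, is to derive the checkpoint interval from the batch duration so the two stay in step when the batch size changes; a multiplier of 1 reproduces the default behavior.

import org.apache.spark.streaming.{Seconds, StreamingContext}

val batchDuration = Seconds(10)
val ssc = new StreamingContext(conf, batchDuration)   // conf as defined earlier

// Checkpoint every third batch; a multiplier of 1 matches the default
val checkpointEvery = batchDuration * 3

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .checkpointInterval(checkpointEvery)
  .build()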

CloudWatch Metrics Configuration

Configure collection and reporting of metrics to AWS CloudWatch for monitoring and alerting.

import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .metricsLevel(MetricsLevel.SUMMARY)
  .metricsEnabledDimensions(Set("Operation", "ShardId", "WorkerId"))
  .build()

Metrics Levels

// No metrics (best performance)
.metricsLevel(MetricsLevel.NONE)

// Summary metrics only (recommended for production)
.metricsLevel(MetricsLevel.SUMMARY)

// Detailed metrics (useful for debugging, higher overhead)
.metricsLevel(MetricsLevel.DETAILED)

Common Metrics Dimensions

val productionDimensions = Set(
  "Operation",    // Type of operation (ProcessRecords, Checkpoint, etc.)
  "ShardId",      // Kinesis shard identifier
  "WorkerId"      // KCL worker identifier
)

val debugDimensions = Set(
  "Operation",
  "ShardId", 
  "WorkerId",
  "StreamName"    // Additional dimension for debugging
)
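
These sets plug straight into the builder, for example:

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .metricsLevel(MetricsLevel.SUMMARY)
  .metricsEnabledDimensions(productionDimensions)
  .build()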

Regional and Endpoint Configuration

Configure AWS service endpoints for different regions or custom endpoints.

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .regionName("us-west-2")
  .endpointUrl("https://kinesis.us-west-2.amazonaws.com")
  .build()

Region Configuration Best Practices

  • Match your compute region: Use the same region as your Spark cluster to minimize latency
  • Consider data locality: Place processing near your data sources and sinks
  • Compliance requirements: Some regions may be required for regulatory compliance

Complete Production Configuration Example

import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions, SparkAWSCredentials}
import org.apache.spark.storage.StorageLevel
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
import org.apache.spark.SparkConf

// Configure Spark with Kinesis-specific settings
val conf = new SparkConf()
  .setAppName("ProductionKinesisApp")
  .set("spark.streaming.kinesis.retry.maxAttempts", "5")
  .set("spark.streaming.kinesis.retry.waitTime", "250ms")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val ssc = new StreamingContext(conf, Seconds(10))

// Configure credentials with assume role
val credentials = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/ProductionKinesisRole", "prod-session")
  .build()

// Create production-ready stream
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("production-data-stream")
  .checkpointAppName("production-consumer-v2")
  .regionName("us-west-2")
  .initialPosition(new KinesisInitialPositions.Latest())
  .checkpointInterval(Seconds(30))
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
  .kinesisCredentials(credentials)
  .dynamoDBCredentials(credentials)
  .cloudWatchCredentials(credentials)
  .metricsLevel(MetricsLevel.SUMMARY)
  .metricsEnabledDimensions(Set("Operation", "ShardId"))
  .build()

Performance Tuning Guidelines

For High Throughput

val highThroughputStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("high-volume-stream")
  .checkpointAppName("high-throughput-consumer")
  .storageLevel(StorageLevel.MEMORY_ONLY_2)           // Fastest storage
  .checkpointInterval(Seconds(60))                    // Less frequent checkpointing
  .metricsLevel(MetricsLevel.NONE)                    // Disable metrics overhead
  .build()

For Maximum Reliability

val reliableStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("critical-data-stream")
  .checkpointAppName("reliable-consumer")
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2)       // Fault tolerant storage
  .checkpointInterval(Seconds(10))                    // Frequent checkpointing
  .metricsLevel(MetricsLevel.DETAILED)                // Full monitoring
  .build()

For Cost Optimization

val costOptimizedStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("batch-processing-stream")
  .checkpointAppName("cost-optimized-consumer")
  .storageLevel(StorageLevel.DISK_ONLY)               // Cheapest storage
  .checkpointInterval(Seconds(300))                   // Minimize DynamoDB writes
  .metricsLevel(MetricsLevel.SUMMARY)                 // Basic monitoring only
  .build()

Monitoring and Troubleshooting

Key SparkConf Settings for Debugging

val debugConf = new SparkConf()
  .set("spark.streaming.kinesis.retry.maxAttempts", "10")  // retry aggressively while diagnosing
  .set("spark.streaming.kinesis.retry.waitTime", "1s")     // back off longer between attempts
  // The adaptive-execution settings below only matter if the job also runs Spark SQL stages on the received data
  .set("spark.sql.adaptive.enabled", "false")
  .set("spark.sql.adaptive.coalescePartitions.enabled", "false")

CloudWatch Metrics to Monitor

  • IncomingRecords: Records successfully put into the stream (producer-side volume)
  • ProcessedRecords: Records successfully processed by the consumer
  • MillisBehindLatest: How far behind the stream tip the consumer is (queried in the sketch below)
  • Success/Failure counts: For each operation type
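
A minimal sketch of pulling one of these metrics programmatically with the AWS SDK, assuming the KCL publishes under a namespace equal to the checkpoint application name (the name production-consumer-v2 below is a placeholder):

import java.util.Date
import scala.collection.JavaConverters._
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder
import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsRequest

val cloudWatch = AmazonCloudWatchClientBuilder.standard().withRegion("us-west-2").build()

// Average MillisBehindLatest over the last hour, in one-minute buckets
val request = new GetMetricStatisticsRequest()
  .withNamespace("production-consumer-v2")   // assumed: KCL namespace = checkpoint app name
  .withMetricName("MillisBehindLatest")
  .withStartTime(new Date(System.currentTimeMillis() - 3600 * 1000))
  .withEndTime(new Date())
  .withPeriod(60)
  .withStatistics("Average")

cloudWatch.getMetricStatistics(request).getDatapoints.asScala.foreach { dp =>
  println(s"${dp.getTimestamp}: ${dp.getAverage} ms behind")
}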

Common Configuration Issues

  1. Insufficient retries: Increase maxRetries for unreliable networks
  2. Too frequent checkpointing: Causes DynamoDB throttling and high costs
  3. Wrong storage level: MEMORY_ONLY can cause job failures if nodes fail
  4. Incorrect region settings: Causes high latency and potential failures
  5. Missing IAM permissions: For DynamoDB, CloudWatch, or Kinesis access (see the quick check below)
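
For the last issue, a quick smoke test of the three services with the AWS SDK can surface permission problems before the streaming job starts. A sketch, assuming the stream and application names used elsewhere in this document (the KCL lease table in DynamoDB is named after the checkpoint application):

import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder
import com.amazonaws.services.cloudwatch.model.ListMetricsRequest

val region = "us-west-2"

// Kinesis: requires kinesis:DescribeStream on the source stream
AmazonKinesisClientBuilder.standard().withRegion(region).build()
  .describeStream("production-data-stream")

// DynamoDB: the KCL lease table is named after the checkpoint application
AmazonDynamoDBClientBuilder.standard().withRegion(region).build()
  .describeTable("production-consumer-v2")

// CloudWatch: KCL needs cloudwatch:PutMetricData; ListMetrics is enough for this check
AmazonCloudWatchClientBuilder.standard().withRegion(region).build()
  .listMetrics(new ListMetricsRequest().withNamespace("production-consumer-v2"))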