The configuration system provides advanced options for optimizing Kinesis stream consumption including retry logic, timeouts, storage levels, CloudWatch metrics, and checkpointing intervals. These settings are crucial for production deployments requiring high performance and reliability.
KinesisReadConfigurations controls retry behavior and timeouts for Kinesis API calls, which is particularly important for handling transient network issues and rate limiting.
case class KinesisReadConfigurations(
maxRetries: Int, // Maximum number of retry attempts for Kinesis API calls
retryWaitTimeMs: Long, // Wait time between retry attempts in milliseconds
retryTimeoutMs: Long // Total timeout for Kinesis operations in milliseconds
)
object KinesisReadConfigurations {
// Create with default values
def apply(): KinesisReadConfigurations
// Create with values from StreamingContext configuration
def apply(ssc: StreamingContext): KinesisReadConfigurations
// Configuration keys for SparkConf
val RETRY_MAX_ATTEMPTS_KEY: String = "spark.streaming.kinesis.retry.maxAttempts"
val RETRY_WAIT_TIME_KEY: String = "spark.streaming.kinesis.retry.waitTime"
// Default values
val DEFAULT_MAX_RETRIES: Int = 3
val DEFAULT_RETRY_WAIT_TIME: String = "100ms"
val DEFAULT_RETRY_TIMEOUT: Long = 10000 // 10 seconds
}

Additional configuration options are available through the KinesisInputDStream.Builder:
class Builder {
// Performance and reliability
def checkpointInterval(interval: Duration): Builder
def storageLevel(storageLevel: StorageLevel): Builder
// CloudWatch metrics
def metricsLevel(metricsLevel: MetricsLevel): Builder
def metricsEnabledDimensions(dimensions: Set[String]): Builder
// AWS service endpoints and regions
def endpointUrl(url: String): Builder
def regionName(regionName: String): Builder
}

import org.apache.spark.streaming.kinesis.KinesisReadConfigurations
// Uses default values: 3 retries, 100ms wait, 10s timeout
val defaultConfig = KinesisReadConfigurations()
val stream = KinesisInputDStream.builder
.streamingContext(ssc)
.streamName("my-stream")
.checkpointAppName("my-app")
.build()

Alternatively, configure the values through SparkConf and let the system read them from the StreamingContext:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{StreamingContext, Seconds}
val conf = new SparkConf()
.setAppName("KinesisApp")
.set("spark.streaming.kinesis.retry.maxAttempts", "5")
.set("spark.streaming.kinesis.retry.waitTime", "200ms")
val ssc = new StreamingContext(conf, Seconds(10))
// This will use the SparkConf values above
val configFromContext = KinesisReadConfigurations(ssc)
val stream = KinesisInputDStream.builder
.streamingContext(ssc)
.streamName("my-stream")
.checkpointAppName("my-app")
.build()

import org.apache.spark.streaming.kinesis.KinesisReadConfigurations
val customConfig = KinesisReadConfigurations(
maxRetries = 5, // Retry up to 5 times
retryWaitTimeMs = 500L, // Wait 500ms between retries
retryTimeoutMs = 30000L // 30 second total timeout
)
// Note: KinesisReadConfigurations is used internally by KinesisBackedBlockRDD
// The main builder uses default configurations, but you can influence this
// through SparkConf settings

Configure how DStream blocks are stored in memory and on disk for fault tolerance and performance.
import org.apache.spark.storage.StorageLevel
val stream = KinesisInputDStream.builder
.streamingContext(ssc)
.streamName("my-stream")
.checkpointAppName("my-app")
.storageLevel(StorageLevel.MEMORY_ONLY_2) // Store in memory with 2x replication
.build()

// Memory only (fastest, but not fault tolerant to node failures)
.storageLevel(StorageLevel.MEMORY_ONLY)
.storageLevel(StorageLevel.MEMORY_ONLY_2) // With replication
// Memory and disk (balanced performance and fault tolerance)
.storageLevel(StorageLevel.MEMORY_AND_DISK)
.storageLevel(StorageLevel.MEMORY_AND_DISK_2) // Default, with replication
// Memory with serialization (more memory efficient)
.storageLevel(StorageLevel.MEMORY_ONLY_SER)
.storageLevel(StorageLevel.MEMORY_ONLY_SER_2)
// Disk only (most fault tolerant, slowest)
.storageLevel(StorageLevel.DISK_ONLY)
.storageLevel(StorageLevel.DISK_ONLY_2)

The checkpoint interval controls how frequently the KCL checkpoints progress to DynamoDB. This affects both fault tolerance and performance.
import org.apache.spark.streaming.Seconds
val stream = KinesisInputDStream.builder
.streamingContext(ssc)
.streamName("my-stream")
.checkpointAppName("my-app")
.checkpointInterval(Seconds(30)) // Checkpoint every 30 seconds
.build()

Configure collection and reporting of metrics to AWS CloudWatch for monitoring and alerting.
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
val stream = KinesisInputDStream.builder
.streamingContext(ssc)
.streamName("my-stream")
.checkpointAppName("my-app")
.metricsLevel(MetricsLevel.SUMMARY)
.metricsEnabledDimensions(Set("Operation", "ShardId", "WorkerId"))
.build()

// No metrics (best performance)
.metricsLevel(MetricsLevel.NONE)
// Summary metrics only (recommended for production)
.metricsLevel(MetricsLevel.SUMMARY)
// Detailed metrics (useful for debugging, higher overhead)
.metricsLevel(MetricsLevel.DETAILED)

val productionDimensions = Set(
"Operation", // Type of operation (ProcessRecords, Checkpoint, etc.)
"ShardId", // Kinesis shard identifier
"WorkerId" // KCL worker identifier
)
val debugDimensions = Set(
"Operation",
"ShardId",
"WorkerId",
"StreamName" // Additional dimension for debugging
)

Configure AWS service endpoints for different regions or custom endpoints.
val stream = KinesisInputDStream.builder
.streamingContext(ssc)
.streamName("my-stream")
.checkpointAppName("my-app")
.regionName("us-west-2")
.endpointUrl("https://kinesis.us-west-2.amazonaws.com")
.build()

import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions, SparkAWSCredentials}
import org.apache.spark.storage.StorageLevel
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
import org.apache.spark.SparkConf
// Configure Spark with Kinesis-specific settings
val conf = new SparkConf()
.setAppName("ProductionKinesisApp")
.set("spark.streaming.kinesis.retry.maxAttempts", "5")
.set("spark.streaming.kinesis.retry.waitTime", "250ms")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val ssc = new StreamingContext(conf, Seconds(10))
// Configure credentials with assume role
val credentials = SparkAWSCredentials.builder
.stsCredentials("arn:aws:iam::123456789012:role/ProductionKinesisRole", "prod-session")
.build()
// Create production-ready stream
val stream = KinesisInputDStream.builder
.streamingContext(ssc)
.streamName("production-data-stream")
.checkpointAppName("production-consumer-v2")
.regionName("us-west-2")
.initialPosition(new KinesisInitialPositions.Latest())
.checkpointInterval(Seconds(30))
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.kinesisCredentials(credentials)
.dynamoDBCredentials(credentials)
.cloudWatchCredentials(credentials)
.metricsLevel(MetricsLevel.SUMMARY)
.metricsEnabledDimensions(Set("Operation", "ShardId"))
.build()

val highThroughputStream = KinesisInputDStream.builder
.streamingContext(ssc)
.streamName("high-volume-stream")
.checkpointAppName("high-throughput-consumer")
.storageLevel(StorageLevel.MEMORY_ONLY_2) // Fastest storage
.checkpointInterval(Seconds(60)) // Less frequent checkpointing
.metricsLevel(MetricsLevel.NONE) // Disable metrics overhead
.build()

val reliableStream = KinesisInputDStream.builder
.streamingContext(ssc)
.streamName("critical-data-stream")
.checkpointAppName("reliable-consumer")
.storageLevel(StorageLevel.MEMORY_AND_DISK_2) // Fault tolerant storage
.checkpointInterval(Seconds(10)) // Frequent checkpointing
.metricsLevel(MetricsLevel.DETAILED) // Full monitoring
.build()

val costOptimizedStream = KinesisInputDStream.builder
.streamingContext(ssc)
.streamName("batch-processing-stream")
.checkpointAppName("cost-optimized-consumer")
.storageLevel(StorageLevel.DISK_ONLY) // Cheapest storage
.checkpointInterval(Seconds(300)) // Minimize DynamoDB writes
.metricsLevel(MetricsLevel.SUMMARY) // Basic monitoring only
.build()

val debugConf = new SparkConf()
.set("spark.streaming.kinesis.retry.maxAttempts", "10")
.set("spark.streaming.kinesis.retry.waitTime", "1s")
.set("spark.sql.adaptive.enabled", "false")
.set("spark.sql.adaptive.coalescePartitions.enabled", "false")maxRetries for unreliable networksMEMORY_ONLY can cause job failures if nodes fail