The KinesisInputDStream.Builder provides a fluent API for creating and configuring Kinesis input streams with comprehensive options for performance, reliability, and monitoring.
object KinesisInputDStream {
def builder: Builder
}
class Builder {
// Required parameters
def streamingContext(ssc: StreamingContext): Builder
def streamingContext(jssc: JavaStreamingContext): Builder
def streamName(streamName: String): Builder
def checkpointAppName(appName: String): Builder
// Optional configuration
def endpointUrl(url: String): Builder
def regionName(regionName: String): Builder
def initialPosition(initialPosition: KinesisInitialPosition): Builder
def checkpointInterval(interval: Duration): Builder
def storageLevel(storageLevel: StorageLevel): Builder
// Credentials configuration
def kinesisCredentials(credentials: SparkAWSCredentials): Builder
def dynamoDBCredentials(credentials: SparkAWSCredentials): Builder
def cloudWatchCredentials(credentials: SparkAWSCredentials): Builder
// Metrics configuration
def metricsLevel(metricsLevel: MetricsLevel): Builder
def metricsEnabledDimensions(metricsEnabledDimensions: Set[String]): Builder
// Build methods
def build(): KinesisInputDStream[Array[Byte]]
def buildWithMessageHandler[T: ClassTag](handler: Record => T): KinesisInputDStream[T]
}

Sets the Spark StreamingContext that will be used to construct the Kinesis DStream.
val builder = KinesisInputDStream.builder
.streamingContext(ssc)

The name of the Kinesis stream to read from.
val builder = KinesisInputDStream.builder
.streamName("my-kinesis-stream")The KCL application name used for checkpointing state to DynamoDB and CloudWatch metrics.
val builder = KinesisInputDStream.builder
.checkpointAppName("my-spark-kinesis-app")Configure the AWS region and Kinesis endpoint URL.
val builder = KinesisInputDStream.builder
.regionName("us-west-2")
.endpointUrl("https://kinesis.us-west-2.amazonaws.com")Defaults:
Configure how received data blocks are stored in memory/disk.
import org.apache.spark.storage.StorageLevel
val builder = KinesisInputDStream.builder
.storageLevel(StorageLevel.MEMORY_AND_DISK_SER_2)

Default: StorageLevel.MEMORY_AND_DISK_2
How often the KCL application state is checkpointed to DynamoDB.
import org.apache.spark.streaming.Seconds
val builder = KinesisInputDStream.builder
.checkpointInterval(Seconds(30))

Default: Uses the Spark Streaming batch interval
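The kinesisCredentials, dynamoDBCredentials, and cloudWatchCredentials setters each take a SparkAWSCredentials instance; when they are omitted, the default AWS credentials provider chain is used. A sketch with placeholder keys and role ARN, assuming the SparkAWSCredentials builder also offers stsCredentials(roleArn, sessionName) for role assumption:

import org.apache.spark.streaming.kinesis.SparkAWSCredentials

// Static keys for reading the stream itself (placeholder values)
val kinesisCreds = SparkAWSCredentials.builder
.basicCredentials("access-key-id", "secret-key")
.build()

// Assume an IAM role for the DynamoDB checkpoint table (placeholder ARN)
val checkpointCreds = SparkAWSCredentials.builder
.stsCredentials("arn:aws:iam::123456789012:role/checkpoint-role", "kinesis-checkpointing")
.build()

val builder = KinesisInputDStream.builder
.kinesisCredentials(kinesisCreds)
.dynamoDBCredentials(checkpointCreds)
.cloudWatchCredentials(checkpointCreds)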
Configure CloudWatch metrics detail level.
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
val builder = KinesisInputDStream.builder
.metricsLevel(MetricsLevel.SUMMARY)

Options:
MetricsLevel.DETAILED - All available metrics (default)
MetricsLevel.SUMMARY - Summary metrics only
MetricsLevel.NONE - No metrics

Specify which CloudWatch metrics dimensions should be enabled.
val builder = KinesisInputDStream.builder
.metricsEnabledDimensions(Set("Operation", "ShardId"))

Creates a stream of byte arrays using the default message handler.
val stream: KinesisInputDStream[Array[Byte]] = KinesisInputDStream.builder
.streamingContext(ssc)
.streamName("my-stream")
.checkpointAppName("my-app")
.build()
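The default handler yields each record's payload as raw bytes, so a typical next step is to decode them; a minimal sketch, continuing from the stream built above and assuming UTF-8 text payloads:

import java.nio.charset.StandardCharsets

// Decode every payload into a String before further processing
val lines = stream.map(bytes => new String(bytes, StandardCharsets.UTF_8))
lines.print()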
Process Kinesis records with a custom message handler function.

import com.amazonaws.services.kinesis.model.Record
// Custom handler to extract string data
val customHandler = (record: Record) => {
val data = new Array[Byte](record.getData.remaining())
record.getData.get(data)
new String(data, "UTF-8")
}
val stream: KinesisInputDStream[String] = KinesisInputDStream.builder
.streamingContext(ssc)
.streamName("my-stream")
.checkpointAppName("my-app")
.buildWithMessageHandler(customHandler)
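Because the handler receives the full Kinesis Record, metadata such as the partition key can be kept alongside the decoded payload; a brief sketch producing (partitionKey, text) pairs:

// Keep the partition key together with the decoded payload
val keyedStream: KinesisInputDStream[(String, String)] = KinesisInputDStream.builder
.streamingContext(ssc)
.streamName("my-stream")
.checkpointAppName("my-app")
.buildWithMessageHandler { record =>
val data = new Array[Byte](record.getData.remaining())
record.getData.get(data)
(record.getPartitionKey, new String(data, "UTF-8"))
}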
The same builder is available from Java:

import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kinesis.KinesisInputDStream;
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
KinesisInputDStream<byte[]> stream = KinesisInputDStream.builder()
.streamingContext(jssc)
.streamName("my-kinesis-stream")
.checkpointAppName("my-app")
.regionName("us-west-2")
.build();

A complete Scala example combining these options:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, SparkAWSCredentials}
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.storage.StorageLevel
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
val credentials = SparkAWSCredentials.builder
.basicCredentials("access-key", "secret-key")
.build()
val stream = KinesisInputDStream.builder
.streamingContext(ssc)
.streamName("production-events")
.checkpointAppName("event-processor")
.regionName("us-west-2")
.endpointUrl("https://kinesis.us-west-2.amazonaws.com")
.initialPosition(new Latest())
.checkpointInterval(Seconds(30))
.storageLevel(StorageLevel.MEMORY_AND_DISK_SER_2)
.kinesisCredentials(credentials)
.metricsLevel(MetricsLevel.SUMMARY)
.metricsEnabledDimensions(Set("Operation", "ShardId"))
.buildWithMessageHandler { record =>
val data = new Array[Byte](record.getData.remaining())
record.getData.get(data)
new String(data, "UTF-8")
}
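Once built, the stream is used like any other DStream; a minimal sketch of the remaining driver code, continuing from the example above:

// Count events per batch, then start the streaming application
stream.count().print()
ssc.start()
ssc.awaitTermination()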