KinesisInputDStream provides the core functionality for creating Kinesis DStreams through a builder pattern. The builder exposes a fluent API for configuring every aspect of Kinesis stream consumption: required parameters, optional settings, and message handling. It validates required parameters at build time and supplies sensible defaults for the rest.
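A minimal sketch of the pattern, assuming an existing StreamingContext ssc and placeholder stream and application names; every other setting falls back to the defaults listed at the end of this section:
// Minimal configuration: only the three required parameters (names are placeholders)
val minimalStream = KinesisInputDStream.builder
  .streamingContext(ssc)            // required
  .streamName("example-stream")     // required
  .checkpointAppName("example-app") // required
  .build()                          // KinesisInputDStream[Array[Byte]]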
object KinesisInputDStream {
  def builder: KinesisInputDStream.Builder

  // Default message handler that extracts byte arrays from Kinesis records
  private[kinesis] def defaultMessageHandler(record: Record): Array[Byte]
}

class Builder {
  // Required configuration methods
  def streamingContext(ssc: StreamingContext): Builder
  def streamingContext(jssc: JavaStreamingContext): Builder
  def streamName(streamName: String): Builder
  def checkpointAppName(appName: String): Builder

  // Optional configuration methods
  def endpointUrl(url: String): Builder
  def regionName(regionName: String): Builder
  def initialPosition(initialPosition: KinesisInitialPosition): Builder
  def initialPositionInStream(initialPosition: InitialPositionInStream): Builder // Deprecated in 2.3.0
  def checkpointInterval(interval: Duration): Builder
  def storageLevel(storageLevel: StorageLevel): Builder
  def kinesisCredentials(credentials: SparkAWSCredentials): Builder
  def dynamoDBCredentials(credentials: SparkAWSCredentials): Builder
  def cloudWatchCredentials(credentials: SparkAWSCredentials): Builder
  def metricsLevel(metricsLevel: MetricsLevel): Builder
  def metricsEnabledDimensions(dimensions: Set[String]): Builder

  // Build methods
  def build(): KinesisInputDStream[Array[Byte]]
  def buildWithMessageHandler[T: ClassTag](handler: Record => T): KinesisInputDStream[T]
}
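The default message handler is internal, but its effect can be sketched from the signature above: it copies each record's ByteBuffer payload into a new byte array. A rough equivalent (copyRecordBytes is a hypothetical helper, not part of the API):
import com.amazonaws.services.kinesis.model.Record

// Hypothetical helper mirroring the default behavior: copy the record payload into a byte array
def copyRecordBytes(record: Record): Array[Byte] = {
  val buffer = record.getData()
  val bytes = new Array[Byte](buffer.remaining())
  buffer.get(bytes)
  bytes
}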
The Spark StreamingContext that will manage the DStream lifecycle.
val builder = KinesisInputDStream.builder
  .streamingContext(ssc) // Required
For the Java API:
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kinesis.KinesisInputDStream;
KinesisInputDStream.Builder builder = KinesisInputDStream.builder()
    .streamingContext(jssc); // JavaStreamingContext
The name of the Kinesis stream to consume from.
val builder = KinesisInputDStream.builder
.streamName("my-kinesis-stream") // RequiredThe KCL application name used for DynamoDB checkpointing. This must be unique per stream and consumer application.
val builder = KinesisInputDStream.builder
.checkpointAppName("my-unique-app-name") // RequiredConfigure the AWS region and Kinesis endpoint URL. Defaults to us-east-1 and the standard Kinesis endpoint.
val builder = KinesisInputDStream.builder
.regionName("us-west-2")
.endpointUrl("https://kinesis.us-west-2.amazonaws.com")Specify where to start reading from the stream. See Initial Position for details.
val builder = KinesisInputDStream.builder
  .initialPosition(new KinesisInitialPositions.Latest())
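The other positions follow the same pattern. For example, assuming Spark 2.3.0 or later where KinesisInitialPositions.TrimHorizon and KinesisInitialPositions.AtTimestamp are available, a sketch of starting from the oldest retained record or from a point in time:
// Start from the oldest record retained in each shard
val fromStart = KinesisInputDStream.builder
  .initialPosition(new KinesisInitialPositions.TrimHorizon())

// Start from records added at or after a given timestamp
val oneHourAgo = new java.util.Date(System.currentTimeMillis() - 60 * 60 * 1000)
val fromTimestamp = KinesisInputDStream.builder
  .initialPosition(new KinesisInitialPositions.AtTimestamp(oneHourAgo))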
How frequently to checkpoint progress to DynamoDB. Defaults to the streaming batch duration.
import org.apache.spark.streaming.Seconds
val builder = KinesisInputDStream.builder
  .checkpointInterval(Seconds(30))
Storage level for cached blocks. Defaults to MEMORY_AND_DISK_2.
import org.apache.spark.storage.StorageLevel
val builder = KinesisInputDStream.builder
  .storageLevel(StorageLevel.MEMORY_ONLY_2)
Configure authentication for Kinesis, DynamoDB, and CloudWatch. See AWS Credentials for details.
val credentials = SparkAWSCredentials.builder
.basicCredentials("access-key", "secret-key")
.build()
val builder = KinesisInputDStream.builder
.kinesisCredentials(credentials)
.dynamoDBCredentials(credentials) // Optional, defaults to Kinesis credentials
.cloudWatchCredentials(credentials) // Optional, defaults to Kinesis credentialsConfigure CloudWatch metrics collection level and dimensions.
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
val builder = KinesisInputDStream.builder
  .metricsLevel(MetricsLevel.SUMMARY)
  .metricsEnabledDimensions(Set("Operation", "ShardId"))
Creates a DStream of byte arrays using the default message handler:
val kinesisStream: KinesisInputDStream[Array[Byte]] = builder.build()
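The resulting byte arrays can be decoded downstream however the application needs; for example, a sketch that maps the stream to UTF-8 strings (assuming the record payloads are text):
import java.nio.charset.StandardCharsets

// Decode each record payload as a UTF-8 string
val lines = kinesisStream.map(bytes => new String(bytes, StandardCharsets.UTF_8))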
Creates a DStream with a custom message transformation function:
import com.amazonaws.services.kinesis.model.Record
// Custom handler that extracts JSON strings
val jsonHandler: Record => String = record => {
  val bytes = new Array[Byte](record.getData().remaining())
  record.getData().get(bytes)
  new String(bytes, "UTF-8")
}
val kinesisStream: KinesisInputDStream[String] = builder
  .buildWithMessageHandler(jsonHandler)
Complete example using the Scala API:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions, SparkAWSCredentials}
import org.apache.spark.streaming.Seconds
import org.apache.spark.storage.StorageLevel
import com.amazonaws.services.kinesis.model.Record
val sparkConf = new SparkConf().setAppName("KinesisExample") // example application name
val ssc = new StreamingContext(sparkConf, Seconds(10))
// Configure credentials
val credentials = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/KinesisRole", "MySession")
  .build()

// Create the stream
val kinesisStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-data-stream")
  .checkpointAppName("spark-kinesis-consumer")
  .regionName("us-west-2")
  .initialPosition(new KinesisInitialPositions.TrimHorizon())
  .checkpointInterval(Seconds(30))
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
  .kinesisCredentials(credentials)
  .buildWithMessageHandler { record: Record =>
    val data = new Array[Byte](record.getData().remaining())
    record.getData().get(data)
    new String(data, "UTF-8")
  }

// Process the stream
kinesisStream.foreachRDD { rdd =>
  rdd.collect().foreach(println)
}

ssc.start()
ssc.awaitTermination()
Complete example using the Java API:
import java.nio.charset.StandardCharsets;

import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
import org.apache.spark.streaming.kinesis.KinesisInputDStream;
import org.apache.spark.streaming.kinesis.SparkAWSCredentials;
public class JavaKinesisExample {
  public static void main(String[] args) throws InterruptedException {
    SparkConf conf = new SparkConf().setAppName("JavaKinesisExample");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

    // Configure credentials
    SparkAWSCredentials credentials = SparkAWSCredentials.builder()
        .basicCredentials("access-key", "secret-key")
        .build();

    // Create the stream; build() returns the Scala DStream, so wrap it for the Java API
    scala.reflect.ClassTag<byte[]> byteArrayTag = scala.reflect.ClassTag$.MODULE$.apply(byte[].class);
    JavaDStream<byte[]> kinesisStream = JavaDStream.fromDStream(
        KinesisInputDStream.builder()
            .streamingContext(jssc)
            .streamName("my-data-stream")
            .checkpointAppName("java-kinesis-consumer")
            .regionName("us-west-2")
            .initialPosition(new KinesisInitialPositions.Latest())
            .checkpointInterval(Durations.seconds(30))
            .storageLevel(StorageLevel.MEMORY_AND_DISK_2())
            .kinesisCredentials(credentials)
            .build(),
        byteArrayTag);

    // Process the stream
    kinesisStream.foreachRDD(rdd -> {
      rdd.foreach(bytes -> {
        String data = new String(bytes, StandardCharsets.UTF_8);
        System.out.println("Received: " + data);
      });
    });

    jssc.start();
    jssc.awaitTermination();
  }
}
The builder validates required parameters and throws IllegalArgumentException for missing required values:
// This will throw IllegalArgumentException at build time
val invalidStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  // Missing streamName and checkpointAppName
  .build() // Throws exception
Common validation errors are a missing StreamingContext, a missing stream name, and a missing checkpoint application name.
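Because validation happens when build() is called, configuration problems can be surfaced eagerly during application setup. A sketch using scala.util.Try, given a builder configured as above (this wrapper is not required by the API):
import scala.util.Try

// Surface configuration errors before the StreamingContext is started
val validated = Try(builder.build())
validated.failed.foreach(e => println(s"Invalid Kinesis configuration: ${e.getMessage}"))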
The builder provides sensible defaults for optional parameters:
"https://kinesis.us-east-1.amazonaws.com""us-east-1"new KinesisInitialPositions.Latest()ssc.graph.batchDuration)StorageLevel.MEMORY_AND_DISK_2DefaultCredentials (AWS default provider chain)kinesisCredentials if not specifiedkinesisCredentials if not specifiedKinesisClientLibConfiguration.DEFAULT_METRICS_LEVELKinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS