# Stream Creation and Configuration

The KinesisInputDStream.Builder provides a fluent API for creating and configuring Kinesis input streams, with options covering performance, reliability, and monitoring.

## Builder Pattern API

```scala { .api }
object KinesisInputDStream {
  def builder: Builder
}

class Builder {
  // Required parameters
  def streamingContext(ssc: StreamingContext): Builder
  def streamingContext(jssc: JavaStreamingContext): Builder
  def streamName(streamName: String): Builder
  def checkpointAppName(appName: String): Builder

  // Optional configuration
  def endpointUrl(url: String): Builder
  def regionName(regionName: String): Builder
  def initialPosition(initialPosition: KinesisInitialPosition): Builder
  def checkpointInterval(interval: Duration): Builder
  def storageLevel(storageLevel: StorageLevel): Builder

  // Credentials configuration
  def kinesisCredentials(credentials: SparkAWSCredentials): Builder
  def dynamoDBCredentials(credentials: SparkAWSCredentials): Builder
  def cloudWatchCredentials(credentials: SparkAWSCredentials): Builder

  // Metrics configuration
  def metricsLevel(metricsLevel: MetricsLevel): Builder
  def metricsEnabledDimensions(metricsEnabledDimensions: Set[String]): Builder

  // Build methods
  def build(): KinesisInputDStream[Array[Byte]]
  def buildWithMessageHandler[T: ClassTag](handler: Record => T): KinesisInputDStream[T]
}
```
## Required Parameters

### StreamingContext
Sets the Spark StreamingContext used to construct the Kinesis DStream.

```scala
val builder = KinesisInputDStream.builder
  .streamingContext(ssc)
```

### Stream Name
The name of the Kinesis stream to read from.

```scala
val builder = KinesisInputDStream.builder
  .streamName("my-kinesis-stream")
```

### Checkpoint Application Name
The KCL application name. It names the DynamoDB table where the KCL stores checkpoint state and identifies the application in CloudWatch metrics.

```scala
val builder = KinesisInputDStream.builder
  .checkpointAppName("my-spark-kinesis-app")
```
## Optional Configuration

### AWS Region and Endpoint
Configure the AWS region and the Kinesis endpoint URL.

```scala
val builder = KinesisInputDStream.builder
  .regionName("us-west-2")
  .endpointUrl("https://kinesis.us-west-2.amazonaws.com")
```

**Defaults:**
- Region: `"us-east-1"`
- Endpoint: `"https://kinesis.us-east-1.amazonaws.com"`

### Storage Level
Configure how received data blocks are stored in memory and on disk.

```scala
import org.apache.spark.storage.StorageLevel

val builder = KinesisInputDStream.builder
  .storageLevel(StorageLevel.MEMORY_AND_DISK_SER_2)
```

**Default:** `StorageLevel.MEMORY_AND_DISK_2`

### Checkpoint Interval
How often the KCL application state is checkpointed to DynamoDB.

```scala
import org.apache.spark.streaming.Seconds

val builder = KinesisInputDStream.builder
  .checkpointInterval(Seconds(30))
```

**Default:** The Spark Streaming batch interval
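The builder's `initialPosition` option (listed in the API above) controls where reading starts when no checkpoint exists for the application. A brief sketch of the `KinesisInitialPositions` variants; the default, if none is set, is `Latest`:

```scala
import java.util.Date
import org.apache.spark.streaming.kinesis.KinesisInitialPositions

// Start from the newest records (the default when unset)
val latest = new KinesisInitialPositions.Latest()

// Start from the oldest records still retained in each shard
val trimHorizon = new KinesisInitialPositions.TrimHorizon()

// Start from records at or after a given timestamp
val atTimestamp = new KinesisInitialPositions.AtTimestamp(new Date())

val builder = KinesisInputDStream.builder
  .initialPosition(trimHorizon)
```

Note that once a checkpoint exists in DynamoDB for the application name, the KCL resumes from the checkpoint and the initial position is ignored.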
## Metrics Configuration

### Metrics Level
Configure the level of detail for the CloudWatch metrics emitted by the KCL.

```scala
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel

val builder = KinesisInputDStream.builder
  .metricsLevel(MetricsLevel.SUMMARY)
```

**Options:**
- `MetricsLevel.DETAILED` - All available metrics (default)
- `MetricsLevel.SUMMARY` - Summary metrics only
- `MetricsLevel.NONE` - No metrics

### Metrics Dimensions
Specify which CloudWatch metric dimensions should be enabled.

```scala
val builder = KinesisInputDStream.builder
  .metricsEnabledDimensions(Set("Operation", "ShardId"))
```
## Message Handling

### Default Handler
`build()` creates a stream of byte arrays using the default message handler.

```scala
val stream: KinesisInputDStream[Array[Byte]] = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .build()
```

### Custom Handler
Process Kinesis records with a custom message handler function applied to each `Record`.

```scala
import com.amazonaws.services.kinesis.model.Record

// Custom handler to extract the record payload as a UTF-8 string
val customHandler = (record: Record) => {
  val data = new Array[Byte](record.getData.remaining())
  record.getData.get(data)
  new String(data, "UTF-8")
}

val stream: KinesisInputDStream[String] = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .buildWithMessageHandler(customHandler)
```
163
## Java API Usage
164
165
```java
166
import org.apache.spark.streaming.api.java.JavaStreamingContext;
167
import org.apache.spark.streaming.kinesis.KinesisInputDStream;
168
169
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Duration.seconds(2));
170
171
KinesisInputDStream<byte[]> stream = KinesisInputDStream.builder()
172
.streamingContext(jssc)
173
.streamName("my-kinesis-stream")
174
.checkpointAppName("my-app")
175
.regionName("us-west-2")
176
.build();
177
```
178
179
## Complete Configuration Example

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, SparkAWSCredentials}
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.storage.StorageLevel
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel

val credentials = SparkAWSCredentials.builder
  .basicCredentials("access-key", "secret-key")
  .build()

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("production-events")
  .checkpointAppName("event-processor")
  .regionName("us-west-2")
  .endpointUrl("https://kinesis.us-west-2.amazonaws.com")
  .initialPosition(new Latest())
  .checkpointInterval(Seconds(30))
  .storageLevel(StorageLevel.MEMORY_AND_DISK_SER_2)
  .kinesisCredentials(credentials)
  .metricsLevel(MetricsLevel.SUMMARY)
  .metricsEnabledDimensions(Set("Operation", "ShardId"))
  .buildWithMessageHandler { record =>
    val data = new Array[Byte](record.getData.remaining())
    record.getData.get(data)
    new String(data, "UTF-8")
  }
```
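A stream built this way receives nothing until the streaming context is started. A minimal sketch of the surrounding driver code (assuming `ssc` and `stream` are the context and stream from the example above):

```scala
// Sketch: drive the streaming job.
// Assumes `ssc` and `stream` come from the example above.
stream.map(_.length).print() // e.g. print payload lengths each batch

ssc.start()             // begin receiving from Kinesis
ssc.awaitTermination()  // block until the job is stopped or fails
```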