# Stream Creation

The core functionality for creating Kinesis DStreams using the builder pattern. This provides a fluent API for configuring all aspects of Kinesis stream consumption, including required parameters, optional configuration, and message handling.

## Core API

### KinesisInputDStream.Builder

The builder provides a fluent interface for configuring Kinesis streams with comprehensive parameter validation and sensible defaults.
```scala { .api }
object KinesisInputDStream {
  def builder: KinesisInputDStream.Builder

  // Default message handler that extracts byte arrays from Kinesis records
  private[kinesis] def defaultMessageHandler(record: Record): Array[Byte]
}

class Builder {
  // Required configuration methods
  def streamingContext(ssc: StreamingContext): Builder
  def streamingContext(jssc: JavaStreamingContext): Builder
  def streamName(streamName: String): Builder
  def checkpointAppName(appName: String): Builder

  // Optional configuration methods
  def endpointUrl(url: String): Builder
  def regionName(regionName: String): Builder
  def initialPosition(initialPosition: KinesisInitialPosition): Builder
  def initialPositionInStream(initialPosition: InitialPositionInStream): Builder // Deprecated in 2.3.0
  def checkpointInterval(interval: Duration): Builder
  def storageLevel(storageLevel: StorageLevel): Builder
  def kinesisCredentials(credentials: SparkAWSCredentials): Builder
  def dynamoDBCredentials(credentials: SparkAWSCredentials): Builder
  def cloudWatchCredentials(credentials: SparkAWSCredentials): Builder
  def metricsLevel(metricsLevel: MetricsLevel): Builder
  def metricsEnabledDimensions(dimensions: Set[String]): Builder

  // Build methods
  def build(): KinesisInputDStream[Array[Byte]]
  def buildWithMessageHandler[T: ClassTag](handler: Record => T): KinesisInputDStream[T]
}
```

## Required Parameters

### StreamingContext

The Spark StreamingContext that will manage the DStream lifecycle.

```scala
val builder = KinesisInputDStream.builder
  .streamingContext(ssc) // Required
```

For the Java API:

```java
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kinesis.KinesisInputDStream;

KinesisInputDStream.Builder builder = KinesisInputDStream.builder()
    .streamingContext(jssc); // JavaStreamingContext
```

### Stream Name

The name of the Kinesis stream to consume from.

```scala
val builder = KinesisInputDStream.builder
  .streamName("my-kinesis-stream") // Required
```

### Checkpoint Application Name

The KCL application name used for DynamoDB checkpointing. The KCL stores its checkpoints in a DynamoDB table with this name, so it must be unique per stream and consumer application.

```scala
val builder = KinesisInputDStream.builder
  .checkpointAppName("my-unique-app-name") // Required
```

## Optional Configuration

### AWS Region and Endpoint

Configure the AWS region and Kinesis endpoint URL. Defaults to `us-east-1` and the standard Kinesis endpoint.

```scala
val builder = KinesisInputDStream.builder
  .regionName("us-west-2")
  .endpointUrl("https://kinesis.us-west-2.amazonaws.com")
```

### Initial Position

Specify where to start reading from the stream. See [Initial Position](./initial-position.md) for details.

```scala
val builder = KinesisInputDStream.builder
  .initialPosition(new KinesisInitialPositions.Latest())
```

### Checkpoint Interval

How frequently to checkpoint progress to DynamoDB. Defaults to the streaming batch duration.

```scala
import org.apache.spark.streaming.Seconds

val builder = KinesisInputDStream.builder
  .checkpointInterval(Seconds(30))
```

### Storage Level

Storage level for cached blocks. Defaults to `MEMORY_AND_DISK_2`.

```scala
import org.apache.spark.storage.StorageLevel

val builder = KinesisInputDStream.builder
  .storageLevel(StorageLevel.MEMORY_ONLY_2)
```

### AWS Credentials

Configure authentication for Kinesis, DynamoDB, and CloudWatch. See [AWS Credentials](./aws-credentials.md) for details.

```scala
val credentials = SparkAWSCredentials.builder
  .basicCredentials("access-key", "secret-key")
  .build()

val builder = KinesisInputDStream.builder
  .kinesisCredentials(credentials)
  .dynamoDBCredentials(credentials)   // Optional, defaults to Kinesis credentials
  .cloudWatchCredentials(credentials) // Optional, defaults to Kinesis credentials
```

### CloudWatch Metrics

Configure CloudWatch metrics collection level and dimensions.

```scala
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel

val builder = KinesisInputDStream.builder
  .metricsLevel(MetricsLevel.SUMMARY)
  .metricsEnabledDimensions(Set("Operation", "ShardId"))
```

## Building the DStream

### Default Message Handler

Creates a DStream of byte arrays using the default message handler:

```scala
val kinesisStream: KinesisInputDStream[Array[Byte]] = builder.build()
```

### Custom Message Handler

Creates a DStream with a custom message transformation function:

```scala
import com.amazonaws.services.kinesis.model.Record

// Custom handler that extracts JSON strings from record payloads
val jsonHandler: Record => String = record => {
  val buffer = record.getData()
  val bytes = new Array[Byte](buffer.remaining())
  buffer.get(bytes)
  new String(bytes, "UTF-8")
}

val kinesisStream: KinesisInputDStream[String] = builder
  .buildWithMessageHandler(jsonHandler)
```
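
Because the handler's byte-copy logic is plain `java.nio` code, it can be sanity-checked without a live stream. The sketch below applies the same copy-then-decode pattern to a standalone `ByteBuffer`; the `extract` helper is illustrative, not part of the Spark or Kinesis API:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ByteBufferExtract {
    // Same pattern as the jsonHandler above, applied to a
    // standalone ByteBuffer instead of a Kinesis Record.
    static String extract(ByteBuffer data) {
        byte[] bytes = new byte[data.remaining()];
        data.get(bytes); // advances the buffer's position past the copied bytes
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap("{\"id\":1}".getBytes(StandardCharsets.UTF_8));
        System.out.println(extract(buf));
        System.out.println("remaining after read: " + buf.remaining());
    }
}
```

Note that `get` consumes the buffer: a second read of the same buffer would see zero remaining bytes, which is why the handler copies into a fresh array each time.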

## Complete Example

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream, SparkAWSCredentials}
import com.amazonaws.services.kinesis.model.Record

val sparkConf = new SparkConf().setAppName("KinesisExample")
val ssc = new StreamingContext(sparkConf, Seconds(10))

// Configure credentials
val credentials = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/KinesisRole", "MySession")
  .build()

// Create the stream
val kinesisStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-data-stream")
  .checkpointAppName("spark-kinesis-consumer")
  .regionName("us-west-2")
  .initialPosition(new KinesisInitialPositions.TrimHorizon())
  .checkpointInterval(Seconds(30))
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
  .kinesisCredentials(credentials)
  .buildWithMessageHandler { record: Record =>
    val buffer = record.getData()
    val data = new Array[Byte](buffer.remaining())
    buffer.get(data)
    new String(data, "UTF-8")
  }

// Process the stream (collect() is for demonstration only)
kinesisStream.foreachRDD { rdd =>
  rdd.collect().foreach(println)
}

ssc.start()
ssc.awaitTermination()
```

## Java API Example

Complete example using the Java API:

```java
import java.nio.charset.StandardCharsets;

import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
import org.apache.spark.streaming.kinesis.KinesisInputDStream;
import org.apache.spark.streaming.kinesis.SparkAWSCredentials;

public class JavaKinesisExample {
  public static void main(String[] args) throws InterruptedException {
    SparkConf conf = new SparkConf().setAppName("JavaKinesisExample");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

    // Configure credentials
    SparkAWSCredentials credentials = SparkAWSCredentials.builder()
        .basicCredentials("access-key", "secret-key")
        .build();

    // Create the stream; build() returns the Scala KinesisInputDStream,
    // so wrap it in a JavaDStream for Java-friendly processing
    KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
        .streamingContext(jssc)
        .streamName("my-data-stream")
        .checkpointAppName("java-kinesis-consumer")
        .regionName("us-west-2")
        .initialPosition(new KinesisInitialPositions.Latest())
        .checkpointInterval(Durations.seconds(30))
        .storageLevel(StorageLevel.MEMORY_AND_DISK_2())
        .kinesisCredentials(credentials)
        .build();
    JavaDStream<byte[]> stream = JavaDStream.fromDStream(
        kinesisStream, scala.reflect.ClassTag$.MODULE$.apply(byte[].class));

    // Process the stream
    stream.foreachRDD(rdd ->
        rdd.foreach(bytes ->
            System.out.println("Received: " + new String(bytes, StandardCharsets.UTF_8))));

    jssc.start();
    jssc.awaitTermination();
  }
}
```

## Error Handling

The builder validates required parameters and throws `IllegalArgumentException` for missing required values:

```scala
// This will throw IllegalArgumentException at build time
val invalidStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  // Missing streamName and checkpointAppName
  .build() // Throws exception
```

Common validation errors:

- Missing required parameters (`streamingContext`, `streamName`, `checkpointAppName`)
- Invalid AWS credentials configuration
- Invalid checkpoint interval or storage level settings
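
The fail-fast behavior behind this validation can be sketched with a hypothetical mini-builder (illustrative only, not Spark's actual implementation): each required field is checked at `build()` time and the first missing one raises `IllegalArgumentException`:

```java
public class BuilderValidation {
    // Hypothetical mini-builder illustrating build-time validation.
    static class StreamBuilder {
        private String streamName;
        private String checkpointAppName;

        StreamBuilder streamName(String name) { this.streamName = name; return this; }
        StreamBuilder checkpointAppName(String name) { this.checkpointAppName = name; return this; }

        String build() {
            if (streamName == null) throw new IllegalArgumentException("Stream name is required");
            if (checkpointAppName == null) throw new IllegalArgumentException("Checkpoint app name is required");
            return streamName + "/" + checkpointAppName;
        }
    }

    public static void main(String[] args) {
        try {
            new StreamBuilder().streamName("my-stream").build(); // missing checkpointAppName
        } catch (IllegalArgumentException e) {
            System.out.println("Validation failed: " + e.getMessage());
        }
    }
}
```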

## Default Values

The builder provides sensible defaults for optional parameters:

- **endpointUrl**: `"https://kinesis.us-east-1.amazonaws.com"`
- **regionName**: `"us-east-1"`
- **initialPosition**: `new KinesisInitialPositions.Latest()`
- **checkpointInterval**: the streaming batch duration (`ssc.graph.batchDuration`)
- **storageLevel**: `StorageLevel.MEMORY_AND_DISK_2`
- **kinesisCredentials**: `DefaultCredentials` (AWS default provider chain)
- **dynamoDBCredentials**: same as `kinesisCredentials` if not specified
- **cloudWatchCredentials**: same as `kinesisCredentials` if not specified
- **metricsLevel**: `KinesisClientLibConfiguration.DEFAULT_METRICS_LEVEL`
- **metricsEnabledDimensions**: `KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS`
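
The default `endpointUrl` and `regionName` above are consistent because standard regional Kinesis endpoints follow a predictable pattern. A small helper (illustrative only, not part of the Spark API) shows how an endpoint for a custom region, as passed to `endpointUrl`, lines up with the documented default:

```java
public class DefaultEndpoint {
    // Standard regional Kinesis endpoint pattern; the us-east-1 case
    // matches the builder's documented default endpointUrl.
    static String endpointFor(String region) {
        return "https://kinesis." + region + ".amazonaws.com";
    }

    public static void main(String[] args) {
        System.out.println(endpointFor("us-east-1"));
        System.out.println(endpointFor("us-west-2"));
    }
}
```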