# Stream Creation

Core functionality for creating Kinesis input streams using the modern Builder pattern with comprehensive configuration options.

## Capabilities

### KinesisInputDStream Builder

Creates a fluent builder for configuring and instantiating Kinesis input streams.

```scala { .api }
object KinesisInputDStream {
  /**
   * Creates a new Builder instance for constructing KinesisInputDStream instances
   * @return Builder instance for configuration
   */
  def builder: Builder
}
```

### Builder Configuration

Fluent builder class providing comprehensive configuration options for Kinesis streams.

```scala { .api }
class Builder {
  /** Sets the StreamingContext (required) */
  def streamingContext(ssc: StreamingContext): Builder

  /** Sets the Java StreamingContext (required) */
  def streamingContext(jssc: JavaStreamingContext): Builder

  /** Sets the Kinesis stream name (required) */
  def streamName(streamName: String): Builder

  /** Sets the KCL application name for checkpointing (required) */
  def checkpointAppName(appName: String): Builder

  /** Sets the AWS Kinesis endpoint URL (optional, defaults to the us-east-1 endpoint) */
  def endpointUrl(url: String): Builder

  /** Sets the AWS region name (optional, defaults to us-east-1) */
  def regionName(regionName: String): Builder

  /** Sets the initial stream position (optional, defaults to Latest) */
  def initialPosition(initialPosition: KinesisInitialPosition): Builder

  /** Sets the initial stream position using the legacy enum (deprecated since 2.3.0) */
  @deprecated("use initialPosition(initialPosition: KinesisInitialPosition)", "2.3.0")
  def initialPositionInStream(initialPosition: InitialPositionInStream): Builder

  /** Sets the checkpoint interval (optional, defaults to the batch duration) */
  def checkpointInterval(interval: Duration): Builder

  /** Sets the storage level for received data (optional, defaults to MEMORY_AND_DISK_2) */
  def storageLevel(storageLevel: StorageLevel): Builder

  /** Sets AWS credentials for Kinesis access (optional, defaults to DefaultCredentials) */
  def kinesisCredentials(credentials: SparkAWSCredentials): Builder

  /** Sets AWS credentials for DynamoDB access (optional, uses the Kinesis credentials if not set) */
  def dynamoDBCredentials(credentials: SparkAWSCredentials): Builder

  /** Sets AWS credentials for CloudWatch access (optional, uses the Kinesis credentials if not set) */
  def cloudWatchCredentials(credentials: SparkAWSCredentials): Builder
}
```
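
The per-service credential setters allow Kinesis, DynamoDB, and CloudWatch to be accessed with different identities. The sketch below shows one plausible wiring, assuming an existing `StreamingContext` named `ssc` and the `SparkAWSCredentials.builder` API from spark-streaming-kinesis-asl; treat it as illustrative rather than canonical.

```scala
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, SparkAWSCredentials}

// Sketch: explicit basic credentials for Kinesis, also passed for DynamoDB.
// `ssc` is assumed to be an existing StreamingContext; replace the key
// placeholders with real values or a credential provider in practice.
val credentials = SparkAWSCredentials.builder
  .basicCredentials("ACCESS_KEY_ID", "SECRET_KEY")
  .build()

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .checkpointAppName("my-spark-app")
  .kinesisCredentials(credentials)    // used for Kinesis reads
  .dynamoDBCredentials(credentials)   // explicit here; would default to the Kinesis credentials anyway
  .build()
```

If only `kinesisCredentials` is set, the same identity is reused for DynamoDB checkpointing and CloudWatch metrics, so the explicit setters are needed only when those services require a different role.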

### Stream Building

Methods to create the final KinesisInputDStream instances with configured parameters.

```scala { .api }
class Builder {
  /**
   * Builds KinesisInputDStream with default message handler returning Array[Byte]
   * @return Configured KinesisInputDStream instance
   */
  def build(): KinesisInputDStream[Array[Byte]]

  /**
   * Builds KinesisInputDStream with custom message handler
   * @param handler Function to transform Kinesis Record to desired type
   * @return Configured KinesisInputDStream instance with custom type
   */
  def buildWithMessageHandler[T: ClassTag](handler: Record => T): KinesisInputDStream[T]
}
```

**Usage Examples:**

```scala
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions._
import org.apache.spark.streaming.Seconds
import org.apache.spark.storage.StorageLevel
import com.amazonaws.services.kinesis.model.Record

// The examples below assume an existing StreamingContext `ssc`

// Basic stream creation with the default byte array handler
val basicStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .checkpointAppName("my-spark-app")
  .build()

// Advanced stream creation with custom configuration
val advancedStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .endpointUrl("https://kinesis.us-west-2.amazonaws.com")
  .regionName("us-west-2")
  .initialPosition(new TrimHorizon())
  .checkpointAppName("my-spark-app")
  .checkpointInterval(Seconds(30))
  .storageLevel(StorageLevel.MEMORY_AND_DISK_SER_2)
  .build()

// Stream with a custom message handler for JSON processing
case class MyEvent(id: String, timestamp: Long, data: String)

val jsonStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("json-events-stream")
  .checkpointAppName("json-processor")
  .buildWithMessageHandler { record: Record =>
    val json = new String(record.getData().array(), "UTF-8")
    // Parse JSON into the case class (parseJson is a placeholder for a JSON library call)
    parseJson[MyEvent](json)
  }
```

### Default Values

The builder uses these default values for optional parameters:

```scala { .api }
object KinesisInputDStream {
  private[kinesis] val DEFAULT_KINESIS_ENDPOINT_URL: String = "https://kinesis.us-east-1.amazonaws.com"
  private[kinesis] val DEFAULT_KINESIS_REGION_NAME: String = "us-east-1"
  private[kinesis] val DEFAULT_INITIAL_POSITION: KinesisInitialPosition = new Latest()
  private[kinesis] val DEFAULT_STORAGE_LEVEL: StorageLevel = StorageLevel.MEMORY_AND_DISK_2
}
```

### Message Handler Function

Custom message handlers transform Kinesis Record objects into desired types:

```scala { .api }
/**
 * Default message handler that extracts the byte array from a Kinesis Record
 * @param record Kinesis Record containing data and metadata
 * @return Byte array of record data
 */
def defaultMessageHandler(record: Record): Array[Byte]
```

**Custom Handler Examples:**

```scala
import com.amazonaws.services.kinesis.model.Record
import java.nio.charset.StandardCharsets

// String handler
val stringHandler: Record => String = { record =>
  new String(record.getData().array(), StandardCharsets.UTF_8)
}

// Handler with metadata extraction
case class KinesisEvent(data: String, partitionKey: String, sequenceNumber: String)

val eventHandler: Record => KinesisEvent = { record =>
  KinesisEvent(
    data = new String(record.getData().array(), StandardCharsets.UTF_8),
    partitionKey = record.getPartitionKey(),
    sequenceNumber = record.getSequenceNumber()
  )
}

// Error-handling wrapper
val safeHandler: Record => Option[String] = { record =>
  try {
    Some(new String(record.getData().array(), StandardCharsets.UTF_8))
  } catch {
    case _: Exception => None
  }
}
```
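
An Option-returning handler like `safeHandler` pairs naturally with `buildWithMessageHandler`, since undecodable records can then be dropped downstream with `flatMap`. The sketch below assumes an existing `StreamingContext` named `ssc` and the `safeHandler` defined above.

```scala
// Sketch: build a stream of Option[String] using the error-handling wrapper,
// then keep only the records that decoded successfully.
val decoded = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-kinesis-stream")
  .checkpointAppName("safe-decoder")
  .buildWithMessageHandler(safeHandler)

// Option is implicitly an Iterable here, so flatMap discards the Nones
val strings = decoded.flatMap(opt => opt)
```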

### Required Parameters

These parameters must be provided or the builder will throw `IllegalArgumentException`:

- `streamingContext` - Either a Scala or Java StreamingContext
- `streamName` - Name of the Kinesis stream to consume
- `checkpointAppName` - Application name for KCL checkpointing
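
The failure mode can be sketched as follows, assuming an existing `StreamingContext` named `ssc`; the builder should fail fast at `build()` time rather than when the stream starts.

```scala
import scala.util.Try

// Sketch: calling build() without the required streamName should fail
// with IllegalArgumentException before any stream is created.
val result = Try {
  KinesisInputDStream.builder
    .streamingContext(ssc)
    .checkpointAppName("my-spark-app")
    .build()   // streamName was never set
}
assert(result.failed.get.isInstanceOf[IllegalArgumentException])
```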

### Parameter Validation

The builder validates parameters during construction:

- Region names are validated against the AWS region list
- Endpoint URLs must be valid HTTP/HTTPS URLs
- Checkpoint intervals must be positive durations
- Storage levels must be valid Spark storage level constants