# Stream Creation

Primary factory methods for creating Kinesis input streams with various configuration options, including custom message handlers, explicit AWS credentials, and different storage levels.

## Capabilities

### Basic Stream Creation (Scala)

Creates a Kinesis input stream using default AWS credential discovery and a byte array message handler.

```scala { .api }
/**
 * Create an input stream that pulls messages from a Kinesis stream using the KCL.
 * Uses DefaultAWSCredentialsProviderChain for AWS authentication.
 *
 * @param ssc StreamingContext object
 * @param kinesisAppName Kinesis application name used by the KCL to update DynamoDB
 * @param streamName Kinesis stream name
 * @param endpointUrl URL of the Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
 * @param regionName Name of region used by the KCL for DynamoDB and CloudWatch
 * @param initialPositionInStream Starting position in stream (TRIM_HORIZON or LATEST)
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param storageLevel Storage level for received objects (MEMORY_AND_DISK_2 recommended)
 * @return ReceiverInputDStream[Array[Byte]] containing raw message data
 */
def createStream(
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel
): ReceiverInputDStream[Array[Byte]]
```

**Usage Example:**

```scala
import org.apache.spark.streaming.kinesis._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Milliseconds

val stream = KinesisUtils.createStream(
  ssc,
  "MySparkKinesisApp",
  "my-kinesis-stream",
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.LATEST,
  Milliseconds(2000),
  StorageLevel.MEMORY_AND_DISK_2
)
```

### Stream Creation with Explicit Credentials (Scala)

Creates a Kinesis input stream with explicitly provided AWS credentials.

```scala { .api }
/**
 * Create an input stream with explicit AWS credentials.
 * Note: Credentials will be saved in DStream checkpoints if checkpointing is enabled.
 *
 * @param ssc StreamingContext object
 * @param kinesisAppName Kinesis application name used by the KCL to update DynamoDB
 * @param streamName Kinesis stream name
 * @param endpointUrl URL of the Kinesis service
 * @param regionName Name of region used by the KCL for DynamoDB and CloudWatch
 * @param initialPositionInStream Starting position in stream
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param storageLevel Storage level for received objects
 * @param awsAccessKeyId AWS AccessKeyId (if null, uses DefaultAWSCredentialsProviderChain)
 * @param awsSecretKey AWS SecretKey (if null, uses DefaultAWSCredentialsProviderChain)
 * @return ReceiverInputDStream[Array[Byte]] containing raw message data
 */
def createStream(
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel,
    awsAccessKeyId: String,
    awsSecretKey: String
): ReceiverInputDStream[Array[Byte]]
```

**Usage Example:**

```scala
val stream = KinesisUtils.createStream(
  ssc,
  "MySparkKinesisApp",
  "my-kinesis-stream",
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.LATEST,
  Milliseconds(2000),
  StorageLevel.MEMORY_AND_DISK_2,
  "AKIAIOSFODNN7EXAMPLE",
  "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
)
```

### Stream Creation with Custom Message Handler (Scala)

Creates a typed Kinesis input stream with a custom message handler function.

```scala { .api }
/**
 * Create an input stream with a custom message handler for type-safe data processing.
 *
 * @param ssc StreamingContext object
 * @param kinesisAppName Kinesis application name used by the KCL to update DynamoDB
 * @param streamName Kinesis stream name
 * @param endpointUrl URL of the Kinesis service
 * @param regionName Name of region used by the KCL for DynamoDB and CloudWatch
 * @param initialPositionInStream Starting position in stream
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param storageLevel Storage level for received objects
 * @param messageHandler Custom function to process Kinesis Records into type T
 * @return ReceiverInputDStream[T] containing processed data
 */
def createStream[T: ClassTag](
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel,
    messageHandler: Record => T
): ReceiverInputDStream[T]
```

**Usage Example:**

```scala
import java.nio.charset.StandardCharsets
import com.amazonaws.services.kinesis.model.Record
import spray.json._
import spray.json.DefaultJsonProtocol._

case class MyEvent(id: String, timestamp: Long, data: String)

// spray-json needs an implicit format to deserialize MyEvent
implicit val myEventFormat: RootJsonFormat[MyEvent] = jsonFormat3(MyEvent)

// Custom message handler to parse JSON events
def parseMyEvent(record: Record): MyEvent = {
  val data = new String(record.getData.array(), StandardCharsets.UTF_8)
  data.parseJson.convertTo[MyEvent]
}

val stream = KinesisUtils.createStream[MyEvent](
  ssc,
  "MySparkKinesisApp",
  "my-events-stream",
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.LATEST,
  Milliseconds(2000),
  StorageLevel.MEMORY_AND_DISK_2,
  parseMyEvent
)
```

### Stream Creation with Custom Handler and Credentials (Scala)

Creates a typed Kinesis input stream with both a custom message handler and explicit AWS credentials.

```scala { .api }
/**
 * Create an input stream with custom message handler and explicit AWS credentials.
 *
 * @param ssc StreamingContext object
 * @param kinesisAppName Kinesis application name used by the KCL to update DynamoDB
 * @param streamName Kinesis stream name
 * @param endpointUrl URL of the Kinesis service
 * @param regionName Name of region used by the KCL for DynamoDB and CloudWatch
 * @param initialPositionInStream Starting position in stream
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param storageLevel Storage level for received objects
 * @param messageHandler Custom function to process Kinesis Records into type T
 * @param awsAccessKeyId AWS AccessKeyId
 * @param awsSecretKey AWS SecretKey
 * @return ReceiverInputDStream[T] containing processed data
 */
def createStream[T: ClassTag](
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel,
    messageHandler: Record => T,
    awsAccessKeyId: String,
    awsSecretKey: String
): ReceiverInputDStream[T]
```
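
**Usage Example:**

Calling this combined overload follows the earlier examples; the sketch below assumes the `parseMyEvent` handler from the previous section and uses the AWS documentation's example credentials:

```scala
val stream = KinesisUtils.createStream[MyEvent](
  ssc,
  "MySparkKinesisApp",
  "my-events-stream",
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.LATEST,
  Milliseconds(2000),
  StorageLevel.MEMORY_AND_DISK_2,
  parseMyEvent,
  "AKIAIOSFODNN7EXAMPLE",
  "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
)
```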

### Deprecated Stream Creation (Scala)

Simplified stream creation method (deprecated since version 1.4.0).

```scala { .api }
/**
 * Create an input stream using the app name from SparkConf and the region inferred from the endpoint.
 * @deprecated use other forms of createStream
 *
 * @param ssc StreamingContext object
 * @param streamName Kinesis stream name
 * @param endpointUrl Endpoint URL of the Kinesis service
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param initialPositionInStream Starting position in stream
 * @param storageLevel Storage level for received objects
 * @return ReceiverInputDStream[Array[Byte]] containing raw message data
 */
@deprecated("use other forms of createStream", "1.4.0")
def createStream(
    ssc: StreamingContext,
    streamName: String,
    endpointUrl: String,
    checkpointInterval: Duration,
    initialPositionInStream: InitialPositionInStream,
    storageLevel: StorageLevel
): ReceiverInputDStream[Array[Byte]]
```
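
For reference, here is a call to the deprecated overload next to a rough equivalent in the current form (values are illustrative; with the deprecated overload, the application name comes from SparkConf and the region is inferred from the endpoint):

```scala
// Deprecated: app name taken from SparkConf, region inferred from the endpoint URL.
val oldStream = KinesisUtils.createStream(
  ssc, "my-kinesis-stream", "https://kinesis.us-east-1.amazonaws.com",
  Milliseconds(2000), InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)

// Preferred: name the application and region explicitly.
val newStream = KinesisUtils.createStream(
  ssc, "MySparkKinesisApp", "my-kinesis-stream",
  "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
  InitialPositionInStream.LATEST, Milliseconds(2000), StorageLevel.MEMORY_AND_DISK_2)
```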

## Configuration Guidelines

### Application Name (kinesisAppName)

- Must be unique per Kinesis stream
- Used by the KCL to name its DynamoDB coordination table
- Changing this requires deleting the associated DynamoDB table

### Checkpoint Interval

- Should typically match, or be a multiple of, the batch interval
- Shorter intervals provide better fault tolerance but increase DynamoDB usage
- Must be >= 1 second
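
As a sketch (the application name is illustrative), the checkpoint interval is usually derived from the batch interval chosen when the StreamingContext is created:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

// The batch interval drives scheduling; checkpointing at the same cadence
// (or a multiple of it) bounds reprocessing after failure while limiting
// DynamoDB write traffic.
val batchInterval = Milliseconds(2000)
val ssc = new StreamingContext(
  new SparkConf().setAppName("MySparkKinesisApp"), batchInterval)
val checkpointInterval = batchInterval
```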

### Storage Level

- `StorageLevel.MEMORY_AND_DISK_2` is recommended for fault tolerance
- Higher replication levels provide better fault tolerance
- Consider memory constraints when choosing storage levels

### Initial Position

- `InitialPositionInStream.LATEST`: Start from the most recent records
- `InitialPositionInStream.TRIM_HORIZON`: Start from the oldest available records (up to 24 hours)
- Only applies when no checkpoint exists in DynamoDB

## Error Handling

Common error scenarios and handling:

- **Invalid region names**: Throws `IllegalArgumentException`
- **Authentication failures**: Surface as runtime exceptions during stream processing
- **Network connectivity**: Automatic retries via the KCL with exponential backoff
- **DynamoDB access**: Requires proper IAM permissions for lease coordination
- **CloudWatch access**: Optional but recommended for monitoring metrics
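
As a starting point, the worker's IAM policy needs roughly the permissions sketched below; the account ID and resource ARNs are placeholders, and the exact action list should be verified against the AWS KCL documentation:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["kinesis:DescribeStream", "kinesis:GetShardIterator", "kinesis:GetRecords"],
      "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/my-kinesis-stream"
    },
    {
      "Effect": "Allow",
      "Action": ["dynamodb:CreateTable", "dynamodb:DescribeTable", "dynamodb:GetItem",
                 "dynamodb:PutItem", "dynamodb:UpdateItem", "dynamodb:DeleteItem", "dynamodb:Scan"],
      "Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/MySparkKinesisApp"
    },
    {
      "Effect": "Allow",
      "Action": ["cloudwatch:PutMetricData"],
      "Resource": "*"
    }
  ]
}
```

The DynamoDB table name matches `kinesisAppName`, so scoping the DynamoDB statement to that table keeps the policy tight.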