# Stream Creation

KinesisUtils is the primary entry point for creating Kinesis input streams in Spark Streaming applications. It provides multiple overloaded `createStream` methods to accommodate different use cases and type requirements.
## Scala API

### Generic Stream Creation with Custom Message Handler

Create a stream that transforms Kinesis Records into a custom type using a message handler function.
```scala { .api }
def createStream[T: ClassTag](
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel,
    messageHandler: Record => T
): ReceiverInputDStream[T]
```
**Parameters:**
- `ssc` - StreamingContext object
- `kinesisAppName` - Kinesis application name used by the KCL for DynamoDB coordination
- `streamName` - Kinesis stream name
- `endpointUrl` - Kinesis service endpoint URL (e.g., "https://kinesis.us-east-1.amazonaws.com")
- `regionName` - AWS region name for DynamoDB and CloudWatch
- `initialPositionInStream` - Starting position: TRIM_HORIZON or LATEST
- `checkpointInterval` - Checkpoint frequency for fault tolerance
- `storageLevel` - Storage level for received objects (recommended: MEMORY_AND_DISK_2)
- `messageHandler` - Function to transform a Record into type T
**Usage Example:**
```scala
import java.nio.charset.StandardCharsets

import com.amazonaws.services.kinesis.model.Record
import org.json4s._
import org.json4s.jackson.JsonMethods._

// Custom message handler for JSON data; decode bytes as UTF-8 explicitly
def jsonMessageHandler(record: Record): JValue = {
  val data = new String(record.getData.array(), StandardCharsets.UTF_8)
  parse(data)
}

val jsonStream = KinesisUtils.createStream[JValue](
  ssc,
  "json-processor-app",
  "json-events",
  "https://kinesis.us-west-2.amazonaws.com",
  "us-west-2",
  InitialPositionInStream.LATEST,
  Seconds(30),
  StorageLevel.MEMORY_AND_DISK_2,
  jsonMessageHandler
)
```
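Note that `Record.getData` returns a `ByteBuffer`; calling `.array()` assumes the buffer is array-backed and starts at position 0. A more defensive handler can copy only the readable bytes, as in the sketch below (the `recordToString` helper is illustrative, not part of the KinesisUtils API):

```scala
import java.nio.charset.StandardCharsets
import com.amazonaws.services.kinesis.model.Record

// Hypothetical helper: copy the readable bytes out of the buffer
// without assuming it is array-backed or starts at position 0.
def recordToString(record: Record): String = {
  val buf = record.getData
  val bytes = new Array[Byte](buf.remaining())
  buf.get(bytes)
  new String(bytes, StandardCharsets.UTF_8)
}
```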
### Default Byte Array Stream Creation

Create a stream that returns raw byte arrays using the default message handler.
```scala { .api }
def createStream(
    ssc: StreamingContext,
    kinesisAppName: String,
    streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointInterval: Duration,
    storageLevel: StorageLevel
): ReceiverInputDStream[Array[Byte]]
```
**Usage Example:**
```scala
import java.nio.charset.StandardCharsets

val byteStream = KinesisUtils.createStream(
  ssc,
  "data-processor",
  "raw-data-stream",
  "https://kinesis.eu-west-1.amazonaws.com",
  "eu-west-1",
  InitialPositionInStream.TRIM_HORIZON,
  Seconds(60),
  StorageLevel.MEMORY_AND_DISK_2
)

// Convert bytes to strings, decoding as UTF-8 explicitly
val stringStream = byteStream.map(bytes => new String(bytes, StandardCharsets.UTF_8))
```
### Deprecated Stream Creation (Legacy)
```scala { .api }
@deprecated("use other forms of createStream", "1.4.0")
def createStream(
    ssc: StreamingContext,
    streamName: String,
    endpointUrl: String,
    checkpointInterval: Duration,
    initialPositionInStream: InitialPositionInStream,
    storageLevel: StorageLevel
): ReceiverInputDStream[Array[Byte]]
```
This method uses the SparkConf app name as the Kinesis application name and extracts the region from the endpoint URL.
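A call to this legacy overload might look like the following sketch (stream and endpoint names are illustrative; note that `checkpointInterval` precedes `initialPositionInStream` in this form):

```scala
// Legacy form: the Kinesis application name is taken from the SparkConf
// app name, and the region is inferred from the endpoint URL (us-east-1 here).
val legacyStream = KinesisUtils.createStream(
  ssc,
  "raw-data-stream",
  "https://kinesis.us-east-1.amazonaws.com",
  Seconds(60),
  InitialPositionInStream.TRIM_HORIZON,
  StorageLevel.MEMORY_AND_DISK_2
)
```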
## Java API

### Generic Stream Creation with Function Interface

```java { .api }
public static <T> JavaReceiverInputDStream<T> createStream(
    JavaStreamingContext jssc,
    String kinesisAppName,
    String streamName,
    String endpointUrl,
    String regionName,
    InitialPositionInStream initialPositionInStream,
    Duration checkpointInterval,
    StorageLevel storageLevel,
    Function<Record, T> messageHandler,
    Class<T> recordClass
);
```
**Usage Example:**
```java
import java.nio.charset.StandardCharsets;

import org.apache.spark.api.java.function.Function;
import com.amazonaws.services.kinesis.model.Record;

// Define message handler; decode bytes as UTF-8 explicitly
Function<Record, String> messageHandler = new Function<Record, String>() {
    @Override
    public String call(Record record) throws Exception {
        return new String(record.getData().array(), StandardCharsets.UTF_8);
    }
};

// Create stream
JavaReceiverInputDStream<String> stringStream = KinesisUtils.createStream(
    jssc,
    "java-kinesis-app",
    "text-stream",
    "https://kinesis.us-east-1.amazonaws.com",
    "us-east-1",
    InitialPositionInStream.LATEST,
    Durations.seconds(30),
    StorageLevel.MEMORY_AND_DISK_2(),
    messageHandler,
    String.class
);
```
### Default Byte Array Stream Creation (Java)
```java { .api }
public static JavaReceiverInputDStream<byte[]> createStream(
    JavaStreamingContext jssc,
    String kinesisAppName,
    String streamName,
    String endpointUrl,
    String regionName,
    InitialPositionInStream initialPositionInStream,
    Duration checkpointInterval,
    StorageLevel storageLevel
);
```
**Usage Example:**
```java
import java.nio.charset.StandardCharsets;

JavaReceiverInputDStream<byte[]> byteStream = KinesisUtils.createStream(
    jssc,
    "java-byte-processor",
    "binary-data-stream",
    "https://kinesis.ap-southeast-1.amazonaws.com",
    "ap-southeast-1",
    InitialPositionInStream.TRIM_HORIZON,
    Durations.seconds(45),
    StorageLevel.MEMORY_AND_DISK_2()
);

// Convert to strings, decoding as UTF-8 explicitly
JavaDStream<String> stringStream = byteStream.map(
    bytes -> new String(bytes, StandardCharsets.UTF_8)
);
```
## Configuration Options
### Initial Position in Stream

```java { .api }
// From the AWS Kinesis Client Library (KCL)
public enum InitialPositionInStream {
    LATEST,       // Start from the most recent record
    TRIM_HORIZON  // Start from the oldest available record
}
```

- **LATEST**: Start processing from the most recent records in the stream
- **TRIM_HORIZON**: Start from the oldest records still retained by the stream (24 hours by default; retention is configurable)
### Storage Level Recommendations
```scala { .api }
import org.apache.spark.storage.StorageLevel

// Recommended storage levels
StorageLevel.MEMORY_AND_DISK_2 // Replicated across memory and disk (recommended)
StorageLevel.MEMORY_AND_DISK   // Memory with spill to disk, no replication
StorageLevel.MEMORY_ONLY_2     // Memory only, with replication
```
**MEMORY_AND_DISK_2** is recommended for fault tolerance as it provides both memory performance and disk persistence with replication.
### Checkpoint Intervals

Choose checkpoint intervals based on your application's requirements:

- **Short intervals (10-30 seconds)**: Lower data-loss risk, higher DynamoDB costs
- **Medium intervals (30-120 seconds)**: Balanced approach for most applications
- **Long intervals (2-5 minutes)**: Lower costs, higher potential data loss on failure
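The interval is passed as the `checkpointInterval` argument; Spark's `Seconds` and `Minutes` helpers make the tradeoff explicit. A sketch (the values below are illustrative):

```scala
import org.apache.spark.streaming.{Minutes, Seconds}

val lowLatencyCheckpoint = Seconds(10) // lower data-loss risk, more DynamoDB writes
val balancedCheckpoint   = Seconds(60) // reasonable default for most applications
val lowCostCheckpoint    = Minutes(2)  // fewer DynamoDB writes, more re-processing on failure
```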
## Error Handling
Common errors and their solutions:

**IllegalArgumentException**: Invalid region name. The `regionName` argument must be a valid AWS region identifier and should match the region embedded in `endpointUrl`:

```scala
// regionName must match the region in the endpoint URL,
// e.g. "us-west-2" pairs with "https://kinesis.us-west-2.amazonaws.com"
val regionName  = "us-west-2"
val endpointUrl = "https://kinesis.us-west-2.amazonaws.com"
```

**AWS Authentication Errors**: Ensure proper AWS credentials are configured
- Use `DefaultAWSCredentialsProviderChain` for automatic credential discovery
- Or provide explicit credentials using the credential management methods

**DynamoDB Access Errors**: Ensure the application has permissions to create and access the DynamoDB table used for checkpointing (the KCL names the table after `kinesisAppName`).
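These permissions can be granted with an IAM policy scoped to the KCL lease table. The sketch below assumes the `kinesisAppName` from the earlier example ("json-processor-app") and a placeholder account ID; verify the exact action list against current AWS KCL documentation:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:CreateTable",
        "dynamodb:DescribeTable",
        "dynamodb:GetItem",
        "dynamodb:PutItem",
        "dynamodb:UpdateItem",
        "dynamodb:DeleteItem",
        "dynamodb:Scan"
      ],
      "Resource": "arn:aws:dynamodb:us-west-2:123456789012:table/json-processor-app"
    }
  ]
}
```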