# Java API

Java API for the Spark Streaming Kinesis integration: `KinesisUtils.createStream` overloads that take a `JavaStreamingContext`, a Spark Java `Function` as the message handler, and a `Class<T>` for the record type. They work with Java 8 lambdas and method references and with frameworks such as Spring.

## Core Java API Methods

### Generic Type Stream Creation

Create streams that convert each Kinesis `Record` to a custom type `T` with a message handler implementing Spark's Java `Function` interface (`org.apache.spark.api.java.function.Function`).

```java { .api }
public static <T> JavaReceiverInputDStream<T> createStream(
    JavaStreamingContext jssc,
    String kinesisAppName,
    String streamName,
    String endpointUrl,
    String regionName,
    InitialPositionInStream initialPositionInStream,
    Duration checkpointInterval,
    StorageLevel storageLevel,
    Function<Record, T> messageHandler,
    Class<T> recordClass
);
```
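
**Parameters:**

- `jssc` - `JavaStreamingContext` that owns the stream
- `kinesisAppName` - Kinesis application name used by the Kinesis Client Library (KCL) for lease coordination and checkpointing in DynamoDB
- `streamName` - Name of the Kinesis stream to read
- `endpointUrl` - Kinesis service endpoint URL (e.g. `https://kinesis.us-east-1.amazonaws.com`)
- `regionName` - AWS region name used by the KCL for its DynamoDB and CloudWatch clients
- `initialPositionInStream` - Where to start reading when no KCL checkpoint exists yet: `LATEST` or `TRIM_HORIZON`
- `checkpointInterval` - How often the KCL checkpoints processed sequence numbers, as a Spark `Duration`
- `storageLevel` - Spark storage level for the received data (`MEMORY_AND_DISK_2` is the commonly recommended choice)
- `messageHandler` - Spark Java `Function` that converts each `Record` to `T`
- `recordClass` - `Class` object for `T`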
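
Because `regionName` normally names the same region as `endpointUrl`, one option is to derive it from the endpoint. The sketch below assumes the standard `kinesis.<region>.amazonaws.com` host layout; `RegionNames.fromEndpoint` is a hypothetical helper, not part of `KinesisUtils`.

```java
import java.net.URI;

// Hypothetical helper (not part of KinesisUtils): derives the AWS region from a
// Kinesis endpoint URL, assuming the host layout "kinesis.<region>.amazonaws.com[.cn]"
final class RegionNames {
    private RegionNames() {}

    static String fromEndpoint(String endpointUrl) {
        String host = URI.create(endpointUrl).getHost();
        if (host == null) {
            throw new IllegalArgumentException("No host in endpoint URL: " + endpointUrl);
        }
        String[] labels = host.split("\\.");
        // Expect at least "kinesis", "<region>", "amazonaws", "com"
        if (labels.length < 4 || !labels[0].equals("kinesis")) {
            throw new IllegalArgumentException("Unexpected Kinesis host: " + host);
        }
        return labels[1];
    }
}
```

For example, `RegionNames.fromEndpoint("https://kinesis.us-east-1.amazonaws.com")` returns `"us-east-1"`, which can then be passed as `regionName`.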
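
**Usage Example:**

```java
import java.nio.charset.StandardCharsets;

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.model.Record;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.kinesis.KinesisUtils;
import org.json.JSONObject;

// Message handler that parses each record's payload as JSON.
// Note: org.json.JSONObject does not implement java.io.Serializable. With Spark's
// default Java serializer, a replicated storage level such as MEMORY_AND_DISK_2
// will typically fail with NotSerializableException; configure a serializer that
// can handle it or return a Serializable type instead.
Function<Record, JSONObject> jsonHandler = new Function<Record, JSONObject>() {
    @Override
    public JSONObject call(Record record) throws Exception {
        String data = new String(record.getData().array(), StandardCharsets.UTF_8);
        return new JSONObject(data);
    }
};

// Create the stream
JavaReceiverInputDStream<JSONObject> jsonStream = KinesisUtils.createStream(
    jssc,
    "java-json-processor",
    "json-events-stream",
    "https://kinesis.us-east-1.amazonaws.com",
    "us-east-1",
    InitialPositionInStream.LATEST,
    Durations.seconds(30),
    StorageLevel.MEMORY_AND_DISK_2(),
    jsonHandler,
    JSONObject.class
);

// Process the JSON stream. rdd.foreach runs on the executors, so this output
// appears in executor logs rather than on the driver console.
jsonStream.foreachRDD(rdd -> {
    rdd.foreach(json -> {
        System.out.println("Event ID: " + json.getString("eventId"));
        System.out.println("Timestamp: " + json.getLong("timestamp"));
    });
});
```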
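
The handlers in this document read the payload with `record.getData().array()`. `ByteBuffer.array()` returns the entire backing array regardless of the buffer's position and limit, and throws for read-only or non-array-backed buffers. A more defensive sketch copies only the remaining bytes; `RecordBytes` is a hypothetical helper that takes the `ByteBuffer` from `record.getData()`, so it needs only the JDK.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for message handlers: pass it record.getData()
final class RecordBytes {
    private RecordBytes() {}

    // Copies only the bytes between position and limit. duplicate() gives the copy
    // its own position, so the caller's buffer is left unchanged.
    static byte[] toBytes(ByteBuffer data) {
        if (data == null) {
            return null;
        }
        ByteBuffer view = data.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }

    // Decodes the payload as UTF-8 instead of the platform default charset
    static String toUtf8(ByteBuffer data) {
        byte[] bytes = toBytes(data);
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }
}
```

Inside a handler this would read `record -> RecordBytes.toUtf8(record.getData())`.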
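
### Generic Type Stream with Credentials

```java { .api }
public static <T> JavaReceiverInputDStream<T> createStream(
    JavaStreamingContext jssc,
    String kinesisAppName,
    String streamName,
    String endpointUrl,
    String regionName,
    InitialPositionInStream initialPositionInStream,
    Duration checkpointInterval,
    StorageLevel storageLevel,
    Function<Record, T> messageHandler,
    Class<T> recordClass,
    String awsAccessKeyId,
    String awsSecretKey
);
```

Same as the previous overload, except that the KCL authenticates with the given AWS access key ID and secret key; the overloads without keys discover credentials through the AWS `DefaultAWSCredentialsProviderChain`. If Spark Streaming checkpointing is enabled, these keys are saved in the DStream checkpoints, so keep the checkpoint directory secure.
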
**Usage Example:**

```java
import java.nio.charset.StandardCharsets;

// Message handler that never throws: decoding failures become "ERROR: ..." strings
Function<Record, String> safeStringHandler = new Function<Record, String>() {
    @Override
    public String call(Record record) throws Exception {
        try {
            return new String(record.getData().array(), StandardCharsets.UTF_8);
        } catch (Exception e) {
            return "ERROR: " + e.getMessage();
        }
    }
};

JavaReceiverInputDStream<String> secureStream = KinesisUtils.createStream(
    jssc,
    "secure-java-app",
    "secure-text-stream",
    "https://kinesis.us-west-2.amazonaws.com",
    "us-west-2",
    InitialPositionInStream.TRIM_HORIZON,
    Durations.seconds(45),
    StorageLevel.MEMORY_AND_DISK_2(),
    safeStringHandler,
    String.class,
    // Read keys from the environment instead of hard-coding them
    System.getenv("AWS_ACCESS_KEY_ID"),
    System.getenv("AWS_SECRET_ACCESS_KEY")
);
```
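
`System.getenv` returns `null` for an unset variable. A minimal sketch, assuming you want to fall back to the overload without explicit keys (and thus the default credentials provider chain) whenever either variable is missing; `ExplicitAwsKeys` is a hypothetical helper that takes the environment as a `Map` so it can be tested.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper: returns (access key ID, secret key) only when both are set,
// so the caller can fall back to the createStream overload that takes no keys
final class ExplicitAwsKeys {
    private ExplicitAwsKeys() {}

    static Optional<Map.Entry<String, String>> fromEnv(Map<String, String> env) {
        String id = env.get("AWS_ACCESS_KEY_ID");
        String secret = env.get("AWS_SECRET_ACCESS_KEY");
        if (id == null || id.isEmpty() || secret == null || secret.isEmpty()) {
            return Optional.empty();
        }
        Map.Entry<String, String> keys = new SimpleImmutableEntry<>(id, secret);
        return Optional.of(keys);
    }
}
```

Call it with `System.getenv()`; when it returns `Optional.empty()`, use the overload without keys.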

### Default Byte Array Streams

Create streams whose elements are the raw `byte[]` payloads of the Kinesis records, produced by Spark's built-in default message handler.

```java { .api }
public static JavaReceiverInputDStream<byte[]> createStream(
    JavaStreamingContext jssc,
    String kinesisAppName,
    String streamName,
    String endpointUrl,
    String regionName,
    InitialPositionInStream initialPositionInStream,
    Duration checkpointInterval,
    StorageLevel storageLevel
);
```

**Usage Example:**

```java
import java.nio.charset.StandardCharsets;

// Create a byte array stream
JavaReceiverInputDStream<byte[]> byteStream = KinesisUtils.createStream(
    jssc,
    "java-binary-processor",
    "binary-data-stream",
    "https://kinesis.ap-southeast-1.amazonaws.com",
    "ap-southeast-1",
    InitialPositionInStream.LATEST,
    Durations.seconds(60),
    StorageLevel.MEMORY_AND_DISK_2()
);

// Decode the bytes as UTF-8 text
JavaDStream<String> stringStream = byteStream.map(
    bytes -> new String(bytes, StandardCharsets.UTF_8)
);

// Keep records longer than 10 characters and upper-case them
JavaDStream<String> processedStream = stringStream
    .filter(text -> text.length() > 10)
    .map(text -> text.toUpperCase());

// Print the first elements of each batch on the driver
processedStream.print();
```

### Default Byte Array Stream with Credentials

```java { .api }
public static JavaReceiverInputDStream<byte[]> createStream(
    JavaStreamingContext jssc,
    String kinesisAppName,
    String streamName,
    String endpointUrl,
    String regionName,
    InitialPositionInStream initialPositionInStream,
    Duration checkpointInterval,
    StorageLevel storageLevel,
    String awsAccessKeyId,
    String awsSecretKey
);
```

### Deprecated Method (Legacy Support)
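
Deprecated since Spark 1.4.0; prefer the overloads above, which take an explicit `kinesisAppName` and `regionName`. This legacy form uses the Spark application name as the KCL application name and derives the region from `endpointUrl`. Note that `checkpointInterval` comes before `initialPositionInStream` here.

```java { .api }
@Deprecated
public static JavaReceiverInputDStream<byte[]> createStream(
    JavaStreamingContext jssc,
    String streamName,
    String endpointUrl,
    Duration checkpointInterval,
    InitialPositionInStream initialPositionInStream,
    StorageLevel storageLevel
);
```
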
## Java-Specific Patterns

### Using Lambda Expressions (Java 8+)

Spark's Java `Function` has a single abstract method (`call`), so on Java 8+ a lambda expression can serve as the message handler:

```java
import java.nio.charset.StandardCharsets;

// Lambda expression as the message handler
JavaReceiverInputDStream<String> lambdaStream = KinesisUtils.createStream(
    jssc,
    "lambda-app",
    "text-stream",
    "https://kinesis.eu-west-1.amazonaws.com",
    "eu-west-1",
    InitialPositionInStream.LATEST,
    Durations.seconds(30),
    StorageLevel.MEMORY_AND_DISK_2(),
    record -> new String(record.getData().array(), StandardCharsets.UTF_8),
    String.class
);

// Chain transformations with lambdas and method references
lambdaStream
    .filter(text -> !text.isEmpty())
    .map(String::trim)
    .foreachRDD(rdd -> {
        // collect() brings the whole batch to the driver; use only for small batches
        rdd.collect().forEach(System.out::println);
    });
```

### Method References

```java
import java.nio.charset.StandardCharsets;

// Method reference as the message handler. this::parseRecord captures the
// enclosing instance, so the enclosing class must implement java.io.Serializable:
// the handler is serialized and shipped to the receiver on an executor.
JavaReceiverInputDStream<String> stream = KinesisUtils.createStream(
    jssc, "app", "stream", endpoint, region,
    InitialPositionInStream.LATEST, Durations.seconds(30),
    StorageLevel.MEMORY_AND_DISK_2(),
    this::parseRecord, // Method reference
    String.class
);

private String parseRecord(Record record) {
    return new String(record.getData().array(), StandardCharsets.UTF_8);
}
```
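
Spark's Java `Function` extends `java.io.Serializable`, so a lambda or method reference passed as the message handler is serialized together with anything it captures. The JDK-only sketch below illustrates the difference with a hypothetical `SerFunction` interface of the same shape: a static method reference serializes, while a reference bound to a non-serializable object (as `this::parseRecord` is bound to `this`) fails.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in with the same shape as Spark's
// org.apache.spark.api.java.function.Function (a Serializable functional interface)
interface SerFunction<T, R> extends Serializable {
    R call(T t) throws Exception;
}

final class CaptureDemo {
    private CaptureDemo() {}

    // Deliberately not Serializable, like a typical service object or Spring bean
    static final class Parser {
        String parse(String s) {
            return s.trim();
        }
    }

    static String parseStatic(String s) {
        return s.trim();
    }

    // Static method reference: captures nothing, so it serializes on its own
    static SerFunction<String, String> staticRef() {
        return CaptureDemo::parseStatic;
    }

    // Bound method reference: captures the Parser, which would have to serialize too
    static SerFunction<String, String> boundRef() {
        Parser parser = new Parser();
        return parser::parse;
    }

    // Returns true if Java serialization of the object succeeds
    static boolean serializes(Object f) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(f);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For the example above, this means either making the enclosing class `Serializable` or pointing the handler at a static method.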

### Exception Handling in Message Handlers

```java
import java.nio.charset.StandardCharsets;

// Message handler that validates input and never throws
Function<Record, String> robustHandler = new Function<Record, String>() {
    @Override
    public String call(Record record) throws Exception {
        try {
            byte[] data = record.getData().array();
            String text = new String(data, StandardCharsets.UTF_8).trim();

            // Empty records map to null. Nulls are stored in the stream like any
            // other value, so remove them downstream, e.g. stream.filter(s -> s != null)
            if (text.isEmpty()) {
                return null;
            }

            return text;
        } catch (Exception e) {
            // Runs inside the receiver, so this goes to the executor's stderr
            System.err.println("Error processing record: " + e.getMessage());
            return "ERROR"; // Sentinel value; filter or count it downstream
        }
    }
};
```
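
Whatever the handler returns, including `null` or a sentinel such as `"ERROR"`, becomes a stream element; nothing is dropped automatically. The JDK-only sketch below separates the decode-and-validate step (a hypothetical `RecordValidation.decodeOrNull`, fed `record.getData()`) from the downstream null filter, shown on a plain `java.util.List` standing in for `stream.filter(Objects::nonNull)` on a `JavaDStream<String>`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical validation helper: returns the trimmed UTF-8 text, or null for
// missing or blank payloads. Callers must filter the nulls out downstream.
final class RecordValidation {
    private RecordValidation() {}

    static String decodeOrNull(ByteBuffer data) {
        if (data == null) {
            return null;
        }
        ByteBuffer view = data.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        String text = new String(bytes, StandardCharsets.UTF_8).trim();
        return text.isEmpty() ? null : text;
    }

    // Local stand-in for stream.filter(Objects::nonNull) on a JavaDStream<String>
    static List<String> dropNulls(List<String> decoded) {
        return decoded.stream().filter(Objects::nonNull).collect(Collectors.toList());
    }
}
```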

## Java Integration Examples

### Spring Framework Integration

```java
import java.nio.charset.StandardCharsets;

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.model.Record;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kinesis.KinesisUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class KinesisStreamProcessor {

    @Value("${kinesis.app.name}")
    private String kinesisAppName;

    @Value("${kinesis.stream.name}")
    private String streamName;

    @Value("${aws.kinesis.endpoint}")
    private String endpointUrl;

    @Value("${aws.region}")
    private String region;

    public void startProcessing(JavaStreamingContext jssc) {
        JavaReceiverInputDStream<String> stream = KinesisUtils.createStream(
            jssc,
            kinesisAppName,
            streamName,
            endpointUrl,
            region,
            InitialPositionInStream.LATEST,
            Durations.seconds(30),
            StorageLevel.MEMORY_AND_DISK_2(),
            // Static method reference: captures no Spring bean, so only the
            // handler itself is serialized and shipped to the receiver
            KinesisStreamProcessor::processRecord,
            String.class
        );

        // The foreachRDD function runs on the driver, where Spring services are
        // available (if DStream checkpointing is enabled, it is serialized too)
        stream.foreachRDD(this::handleBatch);
    }

    private static String processRecord(Record record) {
        return new String(record.getData().array(), StandardCharsets.UTF_8);
    }

    private void handleBatch(JavaRDD<String> rdd) {
        // collect() brings the batch to the driver; suitable only for small batches
        rdd.collect().forEach(this::processMessage);
    }

    private void processMessage(String message) {
        // Business logic here
        System.out.println("Processing: " + message);
    }
}
```

### Serialization Considerations

The message handler's return values are stored by the receiver and, with replicated storage levels such as `MEMORY_AND_DISK_2`, copied to another executor, so custom types should implement `java.io.Serializable`:

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

import com.amazonaws.services.kinesis.model.Record;
import org.apache.spark.api.java.function.Function;
import org.json.JSONObject;

public class EventData implements Serializable {
    private static final long serialVersionUID = 1L;

    private String eventId;
    private long timestamp;
    private String payload;

    public EventData(String eventId, long timestamp, String payload) {
        this.eventId = eventId;
        this.timestamp = timestamp;
        this.payload = payload;
    }

    public String getEventId() { return eventId; }
    public long getTimestamp() { return timestamp; }
    public String getPayload() { return payload; }
}

// Message handler producing serializable objects; the non-serializable
// JSONObject is used only inside the handler and never stored
Function<Record, EventData> eventHandler = record -> {
    String data = new String(record.getData().array(), StandardCharsets.UTF_8);
    JSONObject json = new JSONObject(data);
    return new EventData(
        json.getString("eventId"),
        json.getLong("timestamp"),
        json.getString("payload")
    );
};
```
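
With Spark's default Java serializer, a handler's output type must survive Java serialization whenever the receiver replicates or spills blocks. A quick JDK-only check, sketched here with a hypothetical `SerializationCheck` helper, is a round trip through `ObjectOutputStream` and `ObjectInputStream`. `java.util.Optional`, for instance, is not `Serializable` and fails it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class SerializationCheck {
    private SerializationCheck() {}

    // Serializes and deserializes the value; throws if the value (or anything
    // it references) is not Java-serializable
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    static boolean isJavaSerializable(Object value) {
        try {
            roundTrip(value);
            return true;
        } catch (IOException | ClassNotFoundException e) {
            return false;
        }
    }

    // Minimal stand-in for an EventData-style class
    static final class Event implements Serializable {
        private static final long serialVersionUID = 1L;
        final String eventId;
        final long timestamp;

        Event(String eventId, long timestamp) {
            this.eventId = eventId;
            this.timestamp = timestamp;
        }
    }
}
```

Running such a check in a unit test catches a non-serializable element type before the stream is deployed.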

## Java Type System Integration

### Working with Generic Types

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// "Super type token": an anonymous subclass records the full generic type
// argument, which is otherwise erased at runtime
public abstract class TypeReference<T> {
    private final Type type;

    protected TypeReference() {
        Type superClass = getClass().getGenericSuperclass();
        this.type = ((ParameterizedType) superClass).getActualTypeArguments()[0];
    }

    public Type getType() {
        return type;
    }
}

// Usage with complex types: typeRef.getType() represents List<String>
TypeReference<List<String>> typeRef = new TypeReference<List<String>>() {};
```
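
`createStream` itself takes a `Class<T>`, and a class literal such as `List.class` has the raw type `Class<List>`, which does not match a handler returning `List<String>`. One common workaround, sketched here with a hypothetical `ClassTokens` helper, is an unchecked cast; since generic type arguments are erased at runtime, the resulting token is still just `List.class`.

```java
import java.util.List;

final class ClassTokens {
    private ClassTokens() {}

    // Generic types are erased at runtime, so Class<List<String>> and Class<List>
    // refer to the same Class object; only the static type differs
    @SuppressWarnings("unchecked")
    static <T> Class<T> erasedClass(Class<?> raw) {
        return (Class<T>) raw;
    }

    static Class<List<String>> listOfStrings() {
        return erasedClass(List.class);
    }
}
```

`ClassTokens.listOfStrings()` could then be passed as `recordClass` alongside a `Function<Record, List<String>>` handler.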

### Null Safety and Optional Integration

```java
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;

// java.util.Optional does not implement Serializable, so with Spark's default Java
// serializer it cannot be the stream's element type under replicated storage levels
// such as MEMORY_AND_DISK_2 (and Optional.class would not match Optional<String>).
// Use Optional inside the handler and unwrap to a nullable String before returning.
Function<Record, String> nullableHandler = record -> {
    try {
        Optional<String> text = Optional.ofNullable(record.getData())
            .map(buf -> new String(buf.array(), StandardCharsets.UTF_8))
            .filter(s -> !s.trim().isEmpty());
        return text.orElse(null);
    } catch (Exception e) {
        return null;
    }
};

JavaReceiverInputDStream<String> nullableStream = KinesisUtils.createStream(
    jssc, "app", "stream", endpoint, region,
    InitialPositionInStream.LATEST, Durations.seconds(30),
    StorageLevel.MEMORY_AND_DISK_2(),
    nullableHandler,
    String.class
);

// Drop the records that the handler mapped to null
JavaDStream<String> filteredStream = nullableStream.filter(Objects::nonNull);
```