# Java API

Complete Java API support with type-safe bindings, function interfaces, and seamless integration with JavaStreamingContext-based applications and data processing pipelines.

## Capabilities

### Basic Stream Creation (Java)

Creates a Kinesis input stream using default AWS credential discovery and the default byte-array message handler.
```java { .api }
/**
 * Create an input stream that pulls messages from a Kinesis stream using the KCL.
 * Uses DefaultAWSCredentialsProviderChain for AWS authentication.
 *
 * @param jssc Java StreamingContext object
 * @param kinesisAppName Kinesis application name used by the KCL to update DynamoDB
 * @param streamName Kinesis stream name
 * @param endpointUrl URL of the Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
 * @param regionName Name of the region used by the KCL for DynamoDB and CloudWatch
 * @param initialPositionInStream Starting position in the stream (TRIM_HORIZON or LATEST)
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param storageLevel Storage level for received objects (MEMORY_AND_DISK_2 recommended)
 * @return JavaReceiverInputDStream<byte[]> containing raw message data
 */
public static JavaReceiverInputDStream<byte[]> createStream(
    JavaStreamingContext jssc,
    String kinesisAppName,
    String streamName,
    String endpointUrl,
    String regionName,
    InitialPositionInStream initialPositionInStream,
    Duration checkpointInterval,
    StorageLevel storageLevel
);
```

**Usage Example:**

```java
import org.apache.spark.streaming.kinesis.KinesisUtils;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;

JavaReceiverInputDStream<byte[]> stream = KinesisUtils.createStream(
    jssc,
    "MySparkKinesisApp",
    "my-kinesis-stream",
    "https://kinesis.us-east-1.amazonaws.com",
    "us-east-1",
    InitialPositionInStream.LATEST,
    new Duration(2000),
    StorageLevel.MEMORY_AND_DISK_2()
);
```

### Stream Creation with Explicit Credentials (Java)

Creates a Kinesis input stream with explicitly provided AWS credentials.

```java { .api }
/**
 * Create an input stream with explicit AWS credentials.
 * Note: Credentials will be saved in DStream checkpoints if checkpointing is enabled.
 *
 * @param jssc Java StreamingContext object
 * @param kinesisAppName Kinesis application name used by the KCL to update DynamoDB
 * @param streamName Kinesis stream name
 * @param endpointUrl URL of the Kinesis service
 * @param regionName Name of the region used by the KCL for DynamoDB and CloudWatch
 * @param initialPositionInStream Starting position in the stream
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param storageLevel Storage level for received objects
 * @param awsAccessKeyId AWS AccessKeyId (if null, uses DefaultAWSCredentialsProviderChain)
 * @param awsSecretKey AWS SecretKey (if null, uses DefaultAWSCredentialsProviderChain)
 * @return JavaReceiverInputDStream<byte[]> containing raw message data
 */
public static JavaReceiverInputDStream<byte[]> createStream(
    JavaStreamingContext jssc,
    String kinesisAppName,
    String streamName,
    String endpointUrl,
    String regionName,
    InitialPositionInStream initialPositionInStream,
    Duration checkpointInterval,
    StorageLevel storageLevel,
    String awsAccessKeyId,
    String awsSecretKey
);
```

**Usage Example:**

```java
JavaReceiverInputDStream<byte[]> stream = KinesisUtils.createStream(
    jssc,
    "MySparkKinesisApp",
    "my-kinesis-stream",
    "https://kinesis.us-east-1.amazonaws.com",
    "us-east-1",
    InitialPositionInStream.LATEST,
    new Duration(2000),
    StorageLevel.MEMORY_AND_DISK_2(),
    "AKIAIOSFODNN7EXAMPLE",
    "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
);
```

### Stream Creation with Custom Message Handler (Java)

Creates a typed Kinesis input stream with a custom message handler function.

```java { .api }
/**
 * Create an input stream with a custom message handler for type-safe data processing.
 *
 * @param jssc Java StreamingContext object
 * @param kinesisAppName Kinesis application name used by the KCL to update DynamoDB
 * @param streamName Kinesis stream name
 * @param endpointUrl URL of the Kinesis service
 * @param regionName Name of the region used by the KCL for DynamoDB and CloudWatch
 * @param initialPositionInStream Starting position in the stream
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param storageLevel Storage level for received objects
 * @param messageHandler Custom function to process Kinesis Records into type T
 * @param recordClass Class object for type T (required because of Java type erasure)
 * @return JavaReceiverInputDStream<T> containing processed data
 */
public static <T> JavaReceiverInputDStream<T> createStream(
    JavaStreamingContext jssc,
    String kinesisAppName,
    String streamName,
    String endpointUrl,
    String regionName,
    InitialPositionInStream initialPositionInStream,
    Duration checkpointInterval,
    StorageLevel storageLevel,
    Function<Record, T> messageHandler,
    Class<T> recordClass
);
```

**Usage Example:**

```java
import org.apache.spark.api.java.function.Function;
import com.amazonaws.services.kinesis.model.Record;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.Serializable;

// Define data class
public class MyEvent implements Serializable {
    private String id;
    private long timestamp;
    private String data;

    public MyEvent() {}
    public MyEvent(String id, long timestamp, String data) {
        this.id = id;
        this.timestamp = timestamp;
        this.data = data;
    }

    // Getters and setters
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    public String getData() { return data; }
    public void setData(String data) { this.data = data; }
}

// Custom message handler
Function<Record, MyEvent> parseMyEvent = new Function<Record, MyEvent>() {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public MyEvent call(Record record) throws Exception {
        byte[] data = new byte[record.getData().remaining()];
        record.getData().get(data);
        String json = new String(data, "UTF-8");
        return mapper.readValue(json, MyEvent.class);
    }
};

JavaReceiverInputDStream<MyEvent> stream = KinesisUtils.createStream(
    jssc,
    "MySparkKinesisApp",
    "my-events-stream",
    "https://kinesis.us-east-1.amazonaws.com",
    "us-east-1",
    InitialPositionInStream.LATEST,
    new Duration(2000),
    StorageLevel.MEMORY_AND_DISK_2(),
    parseMyEvent,
    MyEvent.class
);
```

### Stream Creation with Custom Handler and Credentials (Java)

Creates a typed Kinesis input stream with both a custom message handler and explicit AWS credentials.

```java { .api }
/**
 * Create an input stream with a custom message handler and explicit AWS credentials.
 *
 * @param jssc Java StreamingContext object
 * @param kinesisAppName Kinesis application name used by the KCL to update DynamoDB
 * @param streamName Kinesis stream name
 * @param endpointUrl URL of the Kinesis service
 * @param regionName Name of the region used by the KCL for DynamoDB and CloudWatch
 * @param initialPositionInStream Starting position in the stream
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param storageLevel Storage level for received objects
 * @param messageHandler Custom function to process Kinesis Records into type T
 * @param recordClass Class object for type T
 * @param awsAccessKeyId AWS AccessKeyId
 * @param awsSecretKey AWS SecretKey
 * @return JavaReceiverInputDStream<T> containing processed data
 */
public static <T> JavaReceiverInputDStream<T> createStream(
    JavaStreamingContext jssc,
    String kinesisAppName,
    String streamName,
    String endpointUrl,
    String regionName,
    InitialPositionInStream initialPositionInStream,
    Duration checkpointInterval,
    StorageLevel storageLevel,
    Function<Record, T> messageHandler,
    Class<T> recordClass,
    String awsAccessKeyId,
    String awsSecretKey
);
```
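
**Usage Example:**

A usage sketch, reusing the `parseMyEvent` handler defined in the previous section; the credential values are the standard AWS documentation placeholders, and the stream name is illustrative:

```java
JavaReceiverInputDStream<MyEvent> stream = KinesisUtils.createStream(
    jssc,
    "MySparkKinesisApp",
    "my-events-stream",
    "https://kinesis.us-east-1.amazonaws.com",
    "us-east-1",
    InitialPositionInStream.LATEST,
    new Duration(2000),
    StorageLevel.MEMORY_AND_DISK_2(),
    parseMyEvent,
    MyEvent.class,
    "AKIAIOSFODNN7EXAMPLE",
    "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
);
```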

### Deprecated Stream Creation (Java)

Simplified stream creation method (deprecated since version 1.4.0).

```java { .api }
/**
 * Create an input stream using the app name from SparkConf and the region inferred from the endpoint.
 * @deprecated use other forms of createStream
 *
 * @param jssc Java StreamingContext object
 * @param streamName Kinesis stream name
 * @param endpointUrl Endpoint URL of the Kinesis service
 * @param checkpointInterval Checkpoint interval for Kinesis checkpointing
 * @param initialPositionInStream Starting position in the stream
 * @param storageLevel Storage level for received objects
 * @return JavaReceiverInputDStream<byte[]> containing raw message data
 */
@Deprecated
public static JavaReceiverInputDStream<byte[]> createStream(
    JavaStreamingContext jssc,
    String streamName,
    String endpointUrl,
    Duration checkpointInterval,
    InitialPositionInStream initialPositionInStream,
    StorageLevel storageLevel
);
```

## Java Usage Examples

### Simple Text Processing

```java
import org.apache.spark.streaming.kinesis.KinesisUtils;
import org.apache.spark.api.java.function.Function;
import com.amazonaws.services.kinesis.model.Record;

// Convert bytes to string
Function<Record, String> textConverter = new Function<Record, String>() {
    @Override
    public String call(Record record) throws Exception {
        byte[] data = new byte[record.getData().remaining()];
        record.getData().get(data);
        return new String(data, "UTF-8");
    }
};

JavaReceiverInputDStream<String> textStream = KinesisUtils.createStream(
    jssc,
    "TextProcessor",
    "text-stream",
    "https://kinesis.us-east-1.amazonaws.com",
    "us-east-1",
    InitialPositionInStream.LATEST,
    new Duration(2000),
    StorageLevel.MEMORY_AND_DISK_2(),
    textConverter,
    String.class
);

// Process text messages (foreachRDD takes a VoidFunction, so no return value is needed)
textStream.foreachRDD(rdd -> {
    rdd.foreach(text -> System.out.println("Received: " + text));
});
```

### JSON Processing with Error Handling

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;

// JSON parser with error handling
Function<Record, Optional<JsonNode>> jsonParser = new Function<Record, Optional<JsonNode>>() {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public Optional<JsonNode> call(Record record) throws Exception {
        try {
            byte[] data = new byte[record.getData().remaining()];
            record.getData().get(data);
            String json = new String(data, "UTF-8");
            JsonNode node = mapper.readTree(json);
            return Optional.of(node);
        } catch (Exception e) {
            System.err.println("Failed to parse JSON: " + e.getMessage());
            return Optional.empty();
        }
    }
};

// Optional.class is a Class<Optional>, not a Class<Optional<JsonNode>>, so a cast is needed
@SuppressWarnings("unchecked")
Class<Optional<JsonNode>> optionalClass = (Class<Optional<JsonNode>>) (Class<?>) Optional.class;

JavaReceiverInputDStream<Optional<JsonNode>> jsonStream = KinesisUtils.createStream(
    jssc,
    "JsonProcessor",
    "json-stream",
    "https://kinesis.us-east-1.amazonaws.com",
    "us-east-1",
    InitialPositionInStream.LATEST,
    new Duration(2000),
    StorageLevel.MEMORY_AND_DISK_2(),
    jsonParser,
    optionalClass
);

// Filter and process valid JSON
JavaDStream<JsonNode> validJson = jsonStream.flatMap(opt ->
    opt.isPresent()
        ? Arrays.asList(opt.get()).iterator()
        : Collections.<JsonNode>emptyList().iterator()
);
```

### Message Handler with Metadata Access

```java
import java.io.Serializable;

// Data class that includes metadata
public class EnrichedMessage implements Serializable {
    private String data;
    private String partitionKey;
    private String sequenceNumber;
    private long arrivalTime;

    public EnrichedMessage(String data, String partitionKey,
                           String sequenceNumber, long arrivalTime) {
        this.data = data;
        this.partitionKey = partitionKey;
        this.sequenceNumber = sequenceNumber;
        this.arrivalTime = arrivalTime;
    }

    // Getters
    public String getData() { return data; }
    public String getPartitionKey() { return partitionKey; }
    public String getSequenceNumber() { return sequenceNumber; }
    public long getArrivalTime() { return arrivalTime; }
}

// Message handler that captures metadata
Function<Record, EnrichedMessage> enrichedHandler = new Function<Record, EnrichedMessage>() {
    @Override
    public EnrichedMessage call(Record record) throws Exception {
        byte[] bytes = new byte[record.getData().remaining()];
        record.getData().get(bytes);
        String data = new String(bytes, "UTF-8");

        return new EnrichedMessage(
            data,
            record.getPartitionKey(),
            record.getSequenceNumber(),
            record.getApproximateArrivalTimestamp().getTime()
        );
    }
};

JavaReceiverInputDStream<EnrichedMessage> enrichedStream =
    KinesisUtils.createStream(
        jssc,
        "EnrichedProcessor",
        "enriched-stream",
        "https://kinesis.us-east-1.amazonaws.com",
        "us-east-1",
        InitialPositionInStream.LATEST,
        new Duration(2000),
        StorageLevel.MEMORY_AND_DISK_2(),
        enrichedHandler,
        EnrichedMessage.class
    );
```

### Word Count Example

Complete example demonstrating Java API usage with word counting:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kinesis.KinesisUtils;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import scala.Tuple2;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;
import java.util.regex.Pattern;

public class JavaKinesisWordCount {
    private static final Pattern WORD_SEPARATOR = Pattern.compile(" ");

    public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf().setAppName("JavaKinesisWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        // Create Kinesis stream
        JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream(
            jssc,
            "JavaWordCount",
            "word-stream",
            "https://kinesis.us-east-1.amazonaws.com",
            "us-east-1",
            InitialPositionInStream.LATEST,
            new Duration(2000),
            StorageLevel.MEMORY_AND_DISK_2()
        );

        // Convert bytes to strings and split into words
        JavaDStream<String> words = kinesisStream.flatMap(
            new FlatMapFunction<byte[], String>() {
                @Override
                public Iterator<String> call(byte[] line) {
                    String text = new String(line, StandardCharsets.UTF_8);
                    return Arrays.asList(WORD_SEPARATOR.split(text)).iterator();
                }
            }
        );

        // Count words
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
            new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) {
                    return new Tuple2<>(s, 1);
                }
            }
        ).reduceByKey(
            new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer i1, Integer i2) {
                    return i1 + i2;
                }
            }
        );

        // Print results
        wordCounts.print();

        jssc.start();
        jssc.awaitTermination();
    }
}
```

## Java Function Interfaces

The Java API uses Spark's Function interface for message handlers. Its single method is declared to throw, so parsing code may propagate checked exceptions:

```java { .api }
// org.apache.spark.api.java.function.Function
public interface Function<T1, R> extends Serializable {
    R call(T1 v1) throws Exception;
}
```
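
Because `Function` has a single abstract method, handlers can also be written as lambdas on Java 8+. A sketch of the earlier byte-copying converter in lambda form:

```java
Function<Record, String> textConverter = record -> {
    byte[] data = new byte[record.getData().remaining()];
    record.getData().get(data);
    return new String(data, "UTF-8");
};
```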

## Best Practices for Java API

### Type Safety
- Always specify the record class parameter for type safety
- Use Optional<T> for operations that may fail
- Leverage Java generics for compile-time type checking

### Error Handling
- Wrap parsing operations in try-catch blocks
- Return Optional or use custom error types
- Log errors appropriately for monitoring
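
A minimal, Spark-free sketch of this pattern (plain Java; the `parseOrEmpty` helper name is illustrative): wrap the parse in try-catch, log the failure, and return `Optional.empty()` so one bad record does not fail the batch.

```java
import java.util.Optional;

class SafeParse {
    // Returns the parsed integer, or empty if the payload is malformed.
    static Optional<Integer> parseOrEmpty(String payload) {
        try {
            return Optional.of(Integer.parseInt(payload.trim()));
        } catch (NumberFormatException e) {
            // Log and skip the bad record instead of throwing
            System.err.println("Failed to parse: " + e.getMessage());
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parseOrEmpty("42"));   // Optional[42]
        System.out.println(parseOrEmpty("oops")); // Optional.empty
    }
}
```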

### Performance
- Reuse expensive objects like ObjectMapper in message handlers
- Avoid creating new objects in tight loops
- Consider using static methods for stateless operations
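
The reuse point can be shown without Spark: build the expensive object once (here a compiled `Pattern` stands in for an `ObjectMapper`) and share it across calls, rather than reconstructing it per record. Class and method names are illustrative.

```java
import java.util.regex.Pattern;

class HandlerState {
    // Compiled once, reused for every record processed by this handler
    private static final Pattern CSV_SPLIT = Pattern.compile("\\s*,\\s*");

    // Stateless helper: no per-call Pattern.compile in the hot path
    static String[] splitRecord(String line) {
        return CSV_SPLIT.split(line);
    }

    public static void main(String[] args) {
        String[] fields = splitRecord("a, b ,c");
        System.out.println(fields.length); // 3
    }
}
```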

### Memory Management
- Be careful with large message payloads
- Don't hold references to ByteBuffer objects
- Use streaming parsers for large JSON/XML documents
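
The ByteBuffer point is the pattern every handler above already follows: copy the readable bytes into a fresh array and keep only the copy, since the underlying buffer may be reused after the handler returns. A plain-Java sketch:

```java
import java.nio.ByteBuffer;

class BufferCopy {
    // Copy the readable bytes out of a ByteBuffer into a new array
    static byte[] copyOut(ByteBuffer buf) {
        byte[] data = new byte[buf.remaining()];
        buf.get(data); // advances the buffer's position; we retain only the copy
        return data;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap("hello".getBytes());
        byte[] copy = copyOut(buf);
        System.out.println(new String(copy)); // hello
    }
}
```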