0
# Java API
1
2
Java API for the Apache Spark Streaming Kafka integration (`org.apache.spark.streaming.kafka.KafkaUtils`, built on the Kafka 0.8 consumer API). It provides type-safe Java wrappers for the Scala functionality, using familiar Java programming patterns.
3
4
## Capabilities
5
6
### Java Direct Streaming
7
8
Java-friendly API for direct stream creation without receivers.
9
10
```java { .api }
11
/**
12
* Create an input stream that directly pulls messages from Kafka brokers (Java API),
* without using receivers.
13
*
14
* @param jssc JavaStreamingContext object
15
* @param keyClass Class of the keys in the Kafka records
16
* @param valueClass Class of the values in the Kafka records
17
* @param keyDecoderClass Class of the key decoder (must decode to K)
18
* @param valueDecoderClass Class of the value decoder (must decode to V)
19
* @param kafkaParams Kafka configuration parameters
20
* @param topics Names of the topics to consume
21
* @param <K> type of Kafka message key
22
* @param <V> type of Kafka message value
23
* @param <KD> type of Kafka message key decoder
24
* @param <VD> type of Kafka message value decoder
25
* @return DStream of (Kafka message key, Kafka message value)
26
*/
27
public static <K, V, KD extends Decoder<K>, VD extends Decoder<V>>
28
JavaPairInputDStream<K, V> createDirectStream(
29
JavaStreamingContext jssc,
30
Class<K> keyClass,
31
Class<V> valueClass,
32
Class<KD> keyDecoderClass,
33
Class<VD> valueDecoderClass,
34
Map<String, String> kafkaParams,
35
Set<String> topics
36
)
37
```
38
39
**Usage Example:**
40
41
```java
42
import org.apache.spark.streaming.kafka.KafkaUtils;
43
import org.apache.spark.streaming.api.java.*;
44
import kafka.serializer.StringDecoder;
45
import java.util.*;
46
47
// Setup Kafka parameters
48
Map<String, String> kafkaParams = new HashMap<>();
49
kafkaParams.put("metadata.broker.list", "localhost:9092");
50
kafkaParams.put("auto.offset.reset", "largest");
51
52
Set<String> topics = Collections.singleton("user-events");
53
54
// Create direct stream
55
JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
56
jssc,
57
String.class,
58
String.class,
59
StringDecoder.class,
60
StringDecoder.class,
61
kafkaParams,
62
topics
63
);
64
65
// Process messages
66
stream.foreachRDD(rdd -> {
67
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
68
69
rdd.foreach(record -> {
70
System.out.println("Key: " + record._1 + ", Value: " + record._2);
71
});
72
73
// Print offset information
74
for (OffsetRange o : offsetRanges) {
75
System.out.println(o.topic() + " " + o.partition() +
76
" " + o.fromOffset() + " " + o.untilOffset());
77
}
78
});
79
```
80
81
### Java Direct Streaming with Custom Message Handler
82
83
Advanced Java API with custom message transformation and explicit offset control.
84
85
```java { .api }
86
/**
87
* Create an input stream with custom message handler (Java API).
88
*
89
* @param jssc JavaStreamingContext object
90
* @param keyClass Class of the keys in the Kafka records
91
* @param valueClass Class of the values in the Kafka records
92
* @param keyDecoderClass Class of the key decoder (must decode to K)
93
* @param valueDecoderClass Class of the value decoder (must decode to V)
94
* @param recordClass Class of the records in the DStream; must match the type R returned by messageHandler
95
* @param kafkaParams Kafka configuration parameters
96
* @param fromOffsets Per-topic/partition Kafka offsets defining the starting point of the stream
97
* @param messageHandler Function for translating each message and its metadata into an R
98
* @param <K> type of Kafka message key
99
* @param <V> type of Kafka message value
100
* @param <KD> type of Kafka message key decoder
101
* @param <VD> type of Kafka message value decoder
102
* @param <R> type returned by messageHandler
103
* @return DStream of R
104
*/
105
public static <K, V, KD extends Decoder<K>, VD extends Decoder<V>, R>
106
JavaInputDStream<R> createDirectStream(
107
JavaStreamingContext jssc,
108
Class<K> keyClass,
109
Class<V> valueClass,
110
Class<KD> keyDecoderClass,
111
Class<VD> valueDecoderClass,
112
Class<R> recordClass,
113
Map<String, String> kafkaParams,
114
Map<TopicAndPartition, Long> fromOffsets,
115
Function<MessageAndMetadata<K, V>, R> messageHandler
116
)
117
```
118
119
**Usage Example:**
120
121
```java
122
import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import org.apache.spark.api.java.function.Function;

// A message handler turns each Kafka record, together with its metadata,
// into one element of the resulting stream -- here a single line of text.

// Anonymous-class form
Function<MessageAndMetadata<String, String>, String> messageHandler =
    new Function<MessageAndMetadata<String, String>, String>() {
        @Override
        public String call(MessageAndMetadata<String, String> mmd) {
            return String.format("%s:%d:%d -> %s:%s",
                mmd.topic(), mmd.partition(), mmd.offset(), mmd.key(), mmd.message());
        }
    };

// Equivalent lambda form (Java 8+)
Function<MessageAndMetadata<String, String>, String> lambdaHandler = mmd -> {
    return String.format("%s:%d:%d -> %s:%s",
        mmd.topic(), mmd.partition(), mmd.offset(), mmd.key(), mmd.message());
};

// Starting offset for each topic/partition the stream should read
Map<TopicAndPartition, Long> fromOffsets = new HashMap<>();
TopicAndPartition eventsPartition0 = new TopicAndPartition("events", 0);
TopicAndPartition eventsPartition1 = new TopicAndPartition("events", 1);
fromOffsets.put(eventsPartition0, 1000L);
fromOffsets.put(eventsPartition1, 2000L);

// Stream whose elements are whatever the handler returns (String.class here)
JavaInputDStream<String> customStream = KafkaUtils.createDirectStream(
    jssc,
    String.class, String.class,
    StringDecoder.class, StringDecoder.class,
    String.class,
    kafkaParams, fromOffsets, lambdaHandler
);

customStream.print();
162
```
163
164
### Java Receiver-based Streaming
165
166
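Java API for traditional receiver-based streaming. Long-running receivers consume the topics as a ZooKeeper-coordinated consumer group (see the `zkQuorum` and `groupId` parameters).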
167
168
```java { .api }
169
/**
170
* Create an input stream that pulls messages from Kafka Brokers using receivers (Java API).
171
*
172
* @param jssc JavaStreamingContext object
173
* @param zkQuorum ZooKeeper quorum (hostname:port,hostname:port,..)
174
* @param groupId The group id for this consumer
175
* @param topics Map of (topic_name -> numPartitions) to consume
176
* @return DStream of (Kafka message key, Kafka message value)
177
*/
178
public static JavaPairReceiverInputDStream<String, String> createStream(
179
JavaStreamingContext jssc,
180
String zkQuorum,
181
String groupId,
182
Map<String, Integer> topics
183
)
184
185
/**
186
* Create an input stream that pulls messages from Kafka Brokers using receivers (Java API),
* storing the received data with a caller-chosen storage level.
*
* @param jssc JavaStreamingContext object
* @param zkQuorum ZooKeeper quorum (hostname:port,hostname:port,..)
* @param groupId The group id for this consumer
* @param topics Map of (topic_name -> numPartitions) to consume
* @param storageLevel Storage level to use for storing the received objects
* @return DStream of (Kafka message key, Kafka message value)
187
*/
188
public static JavaPairReceiverInputDStream<String, String> createStream(
189
JavaStreamingContext jssc,
190
String zkQuorum,
191
String groupId,
192
Map<String, Integer> topics,
193
StorageLevel storageLevel
194
)
195
```
196
197
**Usage Example:**
198
199
```java
200
import org.apache.spark.storage.StorageLevel;
201
202
Map<String, Integer> topics = new HashMap<>();
203
topics.put("user-events", 1);
204
topics.put("system-logs", 2);
205
206
JavaPairReceiverInputDStream<String, String> receiverStream = KafkaUtils.createStream(
207
jssc,
208
"localhost:2181",
209
"my-consumer-group",
210
topics,
211
StorageLevel.MEMORY_AND_DISK_SER_2()
212
);
213
214
receiverStream.foreachRDD(rdd -> {
215
System.out.println("Batch size: " + rdd.count());
216
rdd.foreach(record -> {
217
System.out.println("Received: " + record._1 + " -> " + record._2);
218
});
219
});
220
```
221
222
### Java Batch RDD Creation
223
224
Java API for creating RDDs from Kafka with precise offset control.
225
226
```java { .api }
227
/**
228
* Create an RDD from Kafka using offset ranges (Java API).
229
*
230
* @param jsc JavaSparkContext object
231
* @param keyClass Class of the keys in the Kafka records
232
* @param valueClass Class of the values in the Kafka records
233
* @param keyDecoderClass Class of the key decoder (must decode to K)
234
* @param valueDecoderClass Class of the value decoder (must decode to V)
235
* @param kafkaParams Kafka configuration parameters
236
* @param offsetRanges Each OffsetRange corresponds to a range of offsets of one topic/partition
237
* @param <K> type of Kafka message key
238
* @param <V> type of Kafka message value
239
* @param <KD> type of Kafka message key decoder
240
* @param <VD> type of Kafka message value decoder
241
* @return RDD of (Kafka message key, Kafka message value)
242
*/
243
public static <K, V, KD extends Decoder<K>, VD extends Decoder<V>>
244
JavaPairRDD<K, V> createRDD(
245
JavaSparkContext jsc,
246
Class<K> keyClass,
247
Class<V> valueClass,
248
Class<KD> keyDecoderClass,
249
Class<VD> valueDecoderClass,
250
Map<String, String> kafkaParams,
251
OffsetRange[] offsetRanges
252
)
253
```
254
255
**Usage Example:**
256
257
```java
258
import org.apache.spark.streaming.kafka.OffsetRange;
259
260
// Create offset ranges
261
OffsetRange[] offsetRanges = {
262
OffsetRange.create("events", 0, 1000, 2000),
263
OffsetRange.create("events", 1, 500, 1500),
264
OffsetRange.create("logs", 0, 100, 200)
265
};
266
267
// Create RDD
268
JavaPairRDD<String, String> rdd = KafkaUtils.createRDD(
269
jsc,
270
String.class,
271
String.class,
272
StringDecoder.class,
273
StringDecoder.class,
274
kafkaParams,
275
offsetRanges
276
);
277
278
// Process data
279
System.out.println("Total messages: " + rdd.count());
280
rdd.foreach(record -> {
281
System.out.println("Key: " + record._1 + ", Value: " + record._2);
282
});
283
```
284
285
## Java Type System Integration
286
287
### Generic Type Support
288
289
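The Java API supports generic key and value types with compile-time type safety: each decoder class must produce the corresponding key or value type. Custom decoders must also provide a public constructor taking a `kafka.utils.VerifiableProperties` argument, because Spark instantiates decoder classes reflectively through that constructor: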
290
291
```java
292
// Custom key/value types
293
/**
 * Example value type decoded from Kafka messages.
 *
 * <p>Implements {@link java.io.Serializable} because RDD elements are serialized whenever
 * Spark shuffles, collects, or caches them in serialized form. The default Java
 * serializer rejects classes that do not implement it.
 */
public class UserEvent implements java.io.Serializable {
    private static final long serialVersionUID = 1L;

    public String userId;
    public String eventType;
    public long timestamp;
}
298
299
public class CustomDecoder implements Decoder<UserEvent> {
300
@Override
301
public UserEvent fromBytes(byte[] bytes) {
302
// Custom deserialization logic
303
return parseUserEvent(bytes);
304
}
305
}
306
307
// Type-safe stream creation
308
JavaPairInputDStream<String, UserEvent> typedStream = KafkaUtils.createDirectStream(
309
jssc,
310
String.class,
311
UserEvent.class,
312
StringDecoder.class,
313
CustomDecoder.class,
314
kafkaParams,
315
topics
316
);
317
```
318
319
### Working with HasOffsetRanges
320
321
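Accessing offset information from Java RDDs. The cast to `HasOffsetRanges` only succeeds on the RDD produced directly by the direct stream, so perform it in the first operation on the stream, before any transformation: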
322
323
```java
324
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;

stream.foreachRDD(rdd -> {
    // Only the RDD handed out by the direct stream itself implements HasOffsetRanges,
    // so cast before any transformation.
    OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

    for (int i = 0; i < ranges.length; i++) {
        OffsetRange range = ranges[i];
        System.out.printf("Topic: %s, Partition: %d, Range: [%d, %d)%n",
            range.topic(), range.partition(), range.fromOffset(), range.untilOffset());
    }

    // Process the data
    rdd.foreach(record -> processRecord(record));
});
343
```
344
345
### Java Collections Integration
346
347
Working with Java collections for configuration:
348
349
```java
350
import java.util.*;
351
352
// Kafka parameters using Java Maps
353
Map<String, String> kafkaParams = new HashMap<String, String>() {{
354
put("metadata.broker.list", "localhost:9092");
355
put("auto.offset.reset", "largest");
356
put("group.id", "my-consumer-group");
357
}};
358
359
// Topics using Java Sets
360
Set<String> topics = new HashSet<String>() {{
361
add("user-events");
362
add("system-logs");
363
}};
364
365
// Topic partitions using Java Maps
366
Map<String, Integer> topicPartitions = new HashMap<String, Integer>() {{
367
put("user-events", 2);
368
put("system-logs", 1);
369
}};
370
```
371
372
## Lambda Expression Support (Java 8+)
373
374
### Stream Processing with Lambdas
375
376
```java
377
// Direct stream with lambda processing
378
JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(/*...*/);
379
380
stream
381
.filter(record -> record._1 != null && !record._1.isEmpty())
382
.map(record -> record._1.toUpperCase() + ":" + record._2)
383
.foreachRDD(rdd -> {
384
rdd.foreach(System.out::println);
385
});
386
```
387
388
### Custom Message Handlers with Lambdas
389
390
```java
391
// Lambda message handler
392
Function<MessageAndMetadata<String, String>, ProcessedMessage> handler =
393
mmd -> new ProcessedMessage(
394
mmd.topic(),
395
mmd.partition(),
396
mmd.offset(),
397
mmd.key(),
398
mmd.message(),
399
System.currentTimeMillis()
400
);
401
402
JavaInputDStream<ProcessedMessage> processedStream = KafkaUtils.createDirectStream(
403
jssc,
404
String.class,
405
String.class,
406
StringDecoder.class,
407
StringDecoder.class,
408
ProcessedMessage.class,
409
kafkaParams,
410
fromOffsets,
411
handler
412
);
413
```
414
415
## Exception Handling
416
417
### Java Exception Patterns
418
419
```java
420
try {
    JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
        jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);

    // Per-batch guard: a failing batch is reported and swallowed, so later batches still run.
    stream.foreachRDD(rdd -> {
        try {
            rdd.foreach(record -> processRecord(record._1, record._2));
        } catch (Exception batchError) {
            System.err.println("Error processing RDD: " + batchError.getMessage());
            batchError.printStackTrace();
        }
    });
} catch (Exception setupError) {
    // Setup failures are fatal: report, then rethrow with the original cause attached.
    System.err.println("Error creating Kafka stream: " + setupError.getMessage());
    throw new RuntimeException("Failed to initialize Kafka streaming", setupError);
}
442
```
443
444
### Handling Serialization Issues
445
446
```java
447
// Custom error-handling decoder
448
public class SafeStringDecoder implements Decoder<String> {
449
private final StringDecoder delegate = new StringDecoder();
450
451
@Override
452
public String fromBytes(byte[] bytes) {
453
try {
454
return delegate.fromBytes(bytes);
455
} catch (Exception e) {
456
System.err.println("Failed to decode message: " + e.getMessage());
457
return "<DECODE_ERROR>";
458
}
459
}
460
}
461
```
462
463
## Integration Patterns
464
465
### Spring Framework Integration
466
467
```java
468
@Component
469
public class KafkaStreamingService {
470
471
@Autowired
472
private JavaStreamingContext streamingContext;
473
474
@Value("${kafka.brokers}")
475
private String brokers;
476
477
@PostConstruct
478
public void initializeStreams() {
479
Map<String, String> kafkaParams = new HashMap<>();
480
kafkaParams.put("metadata.broker.list", brokers);
481
482
Set<String> topics = Set.of("events");
483
484
JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
485
streamingContext,
486
String.class, String.class,
487
StringDecoder.class, StringDecoder.class,
488
kafkaParams, topics
489
);
490
491
stream.foreachRDD(this::processRDD);
492
}
493
494
private void processRDD(JavaPairRDD<String, String> rdd) {
495
rdd.foreach(record -> {
496
// Business logic here
497
handleMessage(record._1, record._2);
498
});
499
}
500
}
501
```