0
# Java API Integration
1
2
Java-friendly wrappers that expose Spark Streaming functionality to Java applications, with support for lambda expressions, Java collections, and idiomatic Java patterns.
3
4
## Capabilities
5
6
### JavaStreamingContext
7
8
Java-friendly version of `StreamingContext` that accepts and returns Java types (`JavaSparkContext`, `JavaDStream`, `java.util` collections) instead of their Scala counterparts.
9
10
```java { .api }
11
/**
12
* Java-friendly wrapper for StreamingContext
13
*/
14
public class JavaStreamingContext {
15
/** Create from JavaSparkContext and batch duration */
16
public JavaStreamingContext(JavaSparkContext sparkContext, Duration batchDuration);
17
18
/** Create from SparkConf and batch duration */
19
public JavaStreamingContext(SparkConf conf, Duration batchDuration);
20
21
/** Create with master URL, app name, and batch duration */
22
public JavaStreamingContext(String master, String appName, Duration batchDuration);
23
24
/** Restore from checkpoint */
25
public JavaStreamingContext(String path);
26
27
/** Get underlying StreamingContext */
28
public StreamingContext ssc();
29
30
/** Get underlying JavaSparkContext */
31
public JavaSparkContext sparkContext();
32
33
/** Start the streaming computation */
34
public void start();
35
36
/** Stop the streaming computation */
37
public void stop();
38
39
/** Stop with option to stop SparkContext */
40
public void stop(boolean stopSparkContext);
41
42
/** Stop with graceful shutdown options */
43
public void stop(boolean stopSparkContext, boolean stopGracefully);
44
45
/** Wait for termination */
46
public void awaitTermination();
47
48
/** Wait for termination or until timeout (in milliseconds) elapses; returns true if stopped, false if the wait timed out */
49
public boolean awaitTerminationOrTimeout(long timeout);
50
51
/** Set checkpoint directory */
52
public void checkpoint(String directory);
53
54
/** Set remember duration */
55
public void remember(Duration duration);
56
57
/** Add streaming listener */
58
public void addStreamingListener(StreamingListener streamingListener);
59
60
/** Remove streaming listener */
61
public void removeStreamingListener(StreamingListener streamingListener);
62
}
63
```
64
65
**Static Factory Methods:**
66
67
```java { .api }
68
public class JavaStreamingContext {
69
/** Get currently active JavaStreamingContext */
70
public static Optional<JavaStreamingContext> getActive();
71
72
/** Get active context or create new one */
73
public static JavaStreamingContext getActiveOrCreate(Function0<JavaStreamingContext> creatingFunc);
74
75
/** Create from checkpoint or use creating function */
76
public static JavaStreamingContext getOrCreate(
77
String checkpointPath,
78
Function0<JavaStreamingContext> creatingFunc
79
);
80
81
/** Create from checkpoint with Hadoop configuration */
82
public static JavaStreamingContext getOrCreate(
83
String checkpointPath,
84
Configuration hadoopConf,
85
Function0<JavaStreamingContext> creatingFunc
86
);
87
}
88
```
89
90
**Usage Examples:**
91
92
```java
93
import org.apache.spark.*;
94
import org.apache.spark.streaming.*;
95
import org.apache.spark.streaming.api.java.*;
96
97
// Create JavaStreamingContext
98
SparkConf conf = new SparkConf().setAppName("JavaStreamingApp");
99
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
100
101
// Alternatively, create with checkpoint recovery (use instead of the context above: only one StreamingContext can be active per JVM)
102
JavaStreamingContext jssc2 = JavaStreamingContext.getOrCreate("/path/to/checkpoint", () -> {
103
SparkConf recoveryConf = new SparkConf().setAppName("RecoverableApp");
104
return new JavaStreamingContext(recoveryConf, Durations.seconds(1));
105
});
106
107
// Configure and start
108
jssc.checkpoint("/path/to/checkpoint");
109
jssc.start();
110
jssc.awaitTermination();
111
```
112
113
### Input Stream Creation
114
115
Java-friendly methods for creating input streams from various sources.
116
117
```java { .api }
118
public class JavaStreamingContext {
119
/** Create text file stream */
120
public JavaDStream<String> textFileStream(String directory);
121
122
/** Create socket text stream */
123
public JavaReceiverInputDStream<String> socketTextStream(String hostname, int port);
124
125
/** Create socket text stream with storage level */
126
public JavaReceiverInputDStream<String> socketTextStream(
127
String hostname,
128
int port,
129
StorageLevel storageLevel
130
);
131
132
/** Create file stream with Hadoop InputFormat */
133
public <K, V, F extends NewInputFormat<K, V>> JavaPairInputDStream<K, V> fileStream(
134
String directory,
135
Class<K> keyClass,
136
Class<V> valueClass,
137
Class<F> inputFormatClass
138
);
139
140
/** Create file stream with configuration */
141
public <K, V, F extends NewInputFormat<K, V>> JavaPairInputDStream<K, V> fileStream(
142
String directory,
143
Class<K> keyClass,
144
Class<V> valueClass,
145
Class<F> inputFormatClass,
146
Function<Path, Boolean> filter,
147
boolean newFilesOnly,
148
Configuration conf
149
);
150
151
/** Create queue stream */
152
public <T> JavaDStream<T> queueStream(Queue<JavaRDD<T>> queue);
153
154
/** Create queue stream with processing options */
155
public <T> JavaDStream<T> queueStream(Queue<JavaRDD<T>> queue, boolean oneAtATime);
156
157
/** Create queue stream with default RDD */
158
public <T> JavaDStream<T> queueStream(
159
Queue<JavaRDD<T>> queue,
160
boolean oneAtATime,
161
JavaRDD<T> defaultRDD
162
);
163
164
/** Create receiver stream */
165
public <T> JavaReceiverInputDStream<T> receiverStream(Receiver<T> receiver);
166
167
/** Union multiple streams */
168
public <T> JavaDStream<T> union(JavaDStream<T> first, List<JavaDStream<T>> rest);
169
}
170
```
171
172
**Usage Examples:**
173
174
```java
175
// Text file stream
176
JavaDStream<String> lines = jssc.textFileStream("/path/to/files");
177
178
// Socket stream
179
JavaReceiverInputDStream<String> socketLines = jssc.socketTextStream("localhost", 9999);
180
181
// File stream with Hadoop InputFormat
182
JavaPairInputDStream<LongWritable, Text> fileStream = jssc.fileStream(
183
"/path/to/files",
184
LongWritable.class,
185
Text.class,
186
TextInputFormat.class
187
);
188
189
// Queue stream
190
Queue<JavaRDD<Integer>> rddQueue = new LinkedList<>();
191
JavaRDD<Integer> rdd1 = jssc.sparkContext().parallelize(Arrays.asList(1, 2, 3));
192
rddQueue.add(rdd1);
193
JavaDStream<Integer> queueStream = jssc.queueStream(rddQueue);
194
195
// Union streams
196
JavaDStream<String> stream1 = jssc.textFileStream("/path1");
197
JavaDStream<String> stream2 = jssc.textFileStream("/path2");
198
JavaDStream<String> combined = jssc.union(stream1, Arrays.asList(stream2));
199
```
200
201
### JavaDStream
202
203
Java wrapper for DStream with lambda expression support and Java-friendly transformations.
204
205
```java { .api }
206
/**
207
* Java-friendly wrapper for DStream
208
*/
209
public class JavaDStream<T> {
210
/** Get underlying Scala DStream */
211
public DStream<T> dstream();
212
213
/** Cache RDDs in memory */
214
public JavaDStream<T> cache();
215
216
/** Persist with default storage level */
217
public JavaDStream<T> persist();
218
219
/** Persist with specific storage level */
220
public JavaDStream<T> persist(StorageLevel storageLevel);
221
222
/** Enable checkpointing */
223
public JavaDStream<T> checkpoint(Duration interval);
224
225
/** Get associated JavaStreamingContext */
226
public JavaStreamingContext context();
227
228
/** Print the first 10 elements of each batch (RDD) */
229
public void print();
230
231
/** Print the first num elements of each batch (RDD) */
232
public void print(int num);
233
}
234
```
235
236
**Basic Transformations:**
237
238
```java { .api }
239
public class JavaDStream<T> {
240
/** Map transformation with Java function */
241
public <U> JavaDStream<U> map(Function<T, U> f);
242
243
/** FlatMap transformation */
244
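public <U> JavaDStream<U> flatMap(FlatMapFunction<T, U> f);

/** Map each element to a key-value pair */
public <K2, V2> JavaPairDStream<K2, V2> mapToPair(PairFunction<T, K2, V2> f);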
245
246
/** Filter transformation */
247
public JavaDStream<T> filter(Function<T, Boolean> f);
248
249
/** Map partitions */
250
public <U> JavaDStream<U> mapPartitions(FlatMapFunction<Iterator<T>, U> f);
251
252
/** Group elements by partition */
253
public JavaDStream<List<T>> glom();
254
255
/** Repartition RDDs */
256
public JavaDStream<T> repartition(int numPartitions);
257
258
/** Union with another stream */
259
public JavaDStream<T> union(JavaDStream<T> other);
260
261
/** Count elements */
262
public JavaDStream<Long> count();
263
264
/** Count occurrences of each value */
265
public JavaPairDStream<T, Long> countByValue();
266
267
/** Reduce elements */
268
public JavaDStream<T> reduce(Function2<T, T, T> f);
269
}
270
```
271
272
**Advanced Transformations:**
273
274
```java { .api }
275
public class JavaDStream<T> {
276
/** Transform using RDD operations */
277
public <U> JavaDStream<U> transform(Function<JavaRDD<T>, JavaRDD<U>> transformFunc);
278
279
/** Transform with time access */
280
public <U> JavaDStream<U> transform(Function2<JavaRDD<T>, Time, JavaRDD<U>> transformFunc);
281
282
/** Transform with another DStream */
283
public <U, V> JavaDStream<V> transformWith(
284
JavaDStream<U> other,
285
Function2<JavaRDD<T>, JavaRDD<U>, JavaRDD<V>> transformFunc
286
);
287
288
/** Transform with another DStream and time access */
289
public <U, V> JavaDStream<V> transformWith(
290
JavaDStream<U> other,
291
Function3<JavaRDD<T>, JavaRDD<U>, Time, JavaRDD<V>> transformFunc
292
);
293
}
294
```
295
296
**Window Operations:**
297
298
```java { .api }
299
public class JavaDStream<T> {
300
/** Create windowed stream */
301
public JavaDStream<T> window(Duration windowDuration);
302
303
/** Create windowed stream with slide duration */
304
public JavaDStream<T> window(Duration windowDuration, Duration slideDuration);
305
306
/** Reduce over window */
307
public JavaDStream<T> reduceByWindow(
308
Function2<T, T, T> reduceFunc,
309
Duration windowDuration,
310
Duration slideDuration
311
);
312
313
/** Incremental reduce over window */
314
public JavaDStream<T> reduceByWindow(
315
Function2<T, T, T> reduceFunc,
316
Function2<T, T, T> invReduceFunc,
317
Duration windowDuration,
318
Duration slideDuration
319
);
320
321
/** Count elements in window */
322
public JavaDStream<Long> countByWindow(Duration windowDuration, Duration slideDuration);
323
324
/** Count values in window */
325
public JavaPairDStream<T, Long> countByValueAndWindow(
326
Duration windowDuration,
327
Duration slideDuration
328
);
329
}
330
```
331
332
**Output Operations:**
333
334
```java { .api }
335
public class JavaDStream<T> {
336
/** Apply function to each RDD */
337
public void foreachRDD(VoidFunction<JavaRDD<T>> foreachFunc);
338
339
/** Apply function to each RDD with time */
340
public void foreachRDD(VoidFunction2<JavaRDD<T>, Time> foreachFunc);
341
342
/** Save as object files */
343
public void saveAsObjectFiles(String prefix, String suffix);
344
345
/** Save as text files */
346
public void saveAsTextFiles(String prefix, String suffix);
347
}
348
```
349
350
**Usage Examples:**
351
352
```java
353
import org.apache.spark.api.java.function.*;
354
355
JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
356
357
// Basic transformations with lambda expressions
358
JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
359
JavaDStream<String> filteredWords = words.filter(word -> word.length() > 2);
360
JavaDStream<String> upperWords = words.map(String::toUpperCase);
361
362
// Window operations
363
JavaDStream<String> windowedWords = words.window(Durations.seconds(30), Durations.seconds(10));
364
JavaDStream<String> reducedWindow = words.reduceByWindow(
365
(s1, s2) -> s1 + " " + s2,
366
Durations.seconds(30),
367
Durations.seconds(10)
368
);
369
370
// Output operations
371
words.foreachRDD(rdd -> {
372
System.out.println("Batch size: " + rdd.count());
373
rdd.take(10).forEach(System.out::println);
374
});
375
376
// Custom transformations
377
JavaDStream<Integer> wordLengths = words.transform(rdd -> {
378
return rdd.map(String::length).filter(len -> len > 0);
379
});
380
```
381
382
### JavaPairDStream
383
384
Java wrapper for pair DStreams with key-value operations.
385
386
```java { .api }
387
/**
388
* Java-friendly wrapper for pair DStreams
389
*/
390
public class JavaPairDStream<K, V> {
391
/** Get underlying Scala DStream */
392
public DStream<Tuple2<K, V>> dstream();
393
394
/** Cache RDDs */
395
public JavaPairDStream<K, V> cache();
396
397
/** Persist with storage level */
398
public JavaPairDStream<K, V> persist(StorageLevel storageLevel);
399
400
/** Enable checkpointing */
401
public JavaPairDStream<K, V> checkpoint(Duration interval);
402
403
/** Convert to regular JavaDStream */
404
public JavaDStream<Tuple2<K, V>> toJavaDStream();
405
}
406
```
407
408
**Key-Value Transformations:**
409
410
```java { .api }
411
public class JavaPairDStream<K, V> {
412
/** Group values by key */
413
public JavaPairDStream<K, Iterable<V>> groupByKey();
414
415
/** Group by key with partitions */
416
public JavaPairDStream<K, Iterable<V>> groupByKey(int numPartitions);
417
418
/** Group by key with partitioner */
419
public JavaPairDStream<K, Iterable<V>> groupByKey(Partitioner partitioner);
420
421
/** Reduce values by key */
422
public JavaPairDStream<K, V> reduceByKey(Function2<V, V, V> func);
423
424
/** Reduce by key with partitions */
425
public JavaPairDStream<K, V> reduceByKey(Function2<V, V, V> func, int numPartitions);
426
427
/** Reduce by key with partitioner */
428
public JavaPairDStream<K, V> reduceByKey(Function2<V, V, V> func, Partitioner partitioner);
429
430
/** Combine by key */
431
public <C> JavaPairDStream<K, C> combineByKey(
432
Function<V, C> createCombiner,
433
Function2<C, V, C> mergeValue,
434
Function2<C, C, C> mergeCombiner
435
);
436
437
/** Map values only */
438
public <U> JavaPairDStream<K, U> mapValues(Function<V, U> f);
439
440
/** FlatMap values only */
441
public <U> JavaPairDStream<K, U> flatMapValues(FlatMapFunction<V, U> f);
442
}
443
```
444
445
**Join Operations:**
446
447
```java { .api }
448
public class JavaPairDStream<K, V> {
449
/** Inner join */
450
public <W> JavaPairDStream<K, Tuple2<V, W>> join(JavaPairDStream<K, W> other);
451
452
/** Left outer join */
453
public <W> JavaPairDStream<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairDStream<K, W> other);
454
455
/** Right outer join */
456
public <W> JavaPairDStream<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairDStream<K, W> other);
457
458
/** Full outer join */
459
public <W> JavaPairDStream<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairDStream<K, W> other);
460
461
/** Cogroup operation */
462
public <W> JavaPairDStream<K, Tuple2<Iterable<V>, Iterable<W>>> cogroup(JavaPairDStream<K, W> other);
463
}
464
```
465
466
**Windowed Operations:**
467
468
```java { .api }
469
public class JavaPairDStream<K, V> {
470
/** Group by key over window */
471
public JavaPairDStream<K, Iterable<V>> groupByKeyAndWindow(Duration windowDuration);
472
473
/** Group by key with slide duration */
474
public JavaPairDStream<K, Iterable<V>> groupByKeyAndWindow(
475
Duration windowDuration,
476
Duration slideDuration
477
);
478
479
/** Reduce by key over window */
480
public JavaPairDStream<K, V> reduceByKeyAndWindow(
481
Function2<V, V, V> func,
482
Duration windowDuration,
483
Duration slideDuration
484
);
485
486
/** Incremental reduce by key over window */
487
public JavaPairDStream<K, V> reduceByKeyAndWindow(
488
Function2<V, V, V> reduceFunc,
489
Function2<V, V, V> invReduceFunc,
490
Duration windowDuration,
491
Duration slideDuration
492
);
493
}
494
```
495
496
**Stateful Operations:**
497
498
```java { .api }
499
public class JavaPairDStream<K, V> {
500
/** Update state by key */
501
public <S> JavaPairDStream<K, S> updateStateByKey(
502
Function2<List<V>, Optional<S>, Optional<S>> updateFunc
503
);
504
505
/** Update state with partitioner */
506
public <S> JavaPairDStream<K, S> updateStateByKey(
507
Function2<List<V>, Optional<S>, Optional<S>> updateFunc,
508
Partitioner partitioner
509
);
510
511
/** Update state with partition count */
512
public <S> JavaPairDStream<K, S> updateStateByKey(
513
Function2<List<V>, Optional<S>, Optional<S>> updateFunc,
514
int numPartitions
515
);
516
517
/** Map with state */
518
public <StateType, MappedType> JavaMapWithStateDStream<K, V, StateType, MappedType> mapWithState(
519
StateSpec<K, V, StateType, MappedType> spec
520
);
521
}
522
```
523
524
**Usage Examples:**
525
526
```java
527
// Create pair stream from words
528
JavaPairDStream<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
529
530
// Word count
531
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(Integer::sum);
532
533
// Group and count
534
JavaPairDStream<String, Iterable<Integer>> grouped = pairs.groupByKey();
535
536
// Windowed word count
537
JavaPairDStream<String, Integer> windowedCounts = pairs.reduceByKeyAndWindow(
538
Integer::sum,
539
Durations.minutes(1),
540
Durations.seconds(10)
541
);
542
543
// Stateful word count
544
JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(
545
(values, state) -> {
546
int sum = values.stream().mapToInt(Integer::intValue).sum();
547
return Optional.of(sum + state.orElse(0));
548
}
549
);
550
551
// Join operations
552
JavaPairDStream<String, Double> scores = ...; // placeholder: another pair stream keyed by word
553
JavaPairDStream<String, Tuple2<Integer, Double>> joined = wordCounts.join(scores);
554
555
// Output
556
wordCounts.foreachRDD(rdd -> {
557
Map<String, Integer> wordCountMap = rdd.collectAsMap();
558
wordCountMap.forEach((word, count) -> {
559
System.out.println(word + ": " + count);
560
});
561
});
562
```
563
564
### Java Function Interfaces
565
566
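Functional interfaces from `org.apache.spark.api.java.function` that serve as targets for lambda expressions and method references. Each extends `Serializable`, so the functions can be shipped to executors.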
567
568
```java { .api }
569
// Basic function interfaces
570
@FunctionalInterface
571
public interface Function<T, R> extends Serializable {
572
R call(T t) throws Exception;
573
}
574
575
@FunctionalInterface
576
public interface Function2<T1, T2, R> extends Serializable {
577
R call(T1 t1, T2 t2) throws Exception;
578
}
579
580
@FunctionalInterface
581
public interface VoidFunction<T> extends Serializable {
582
void call(T t) throws Exception;
583
}
584
585
@FunctionalInterface
586
public interface VoidFunction2<T1, T2> extends Serializable {
587
void call(T1 t1, T2 t2) throws Exception;
588
}
589
590
@FunctionalInterface
591
public interface FlatMapFunction<T, R> extends Serializable {
592
Iterator<R> call(T t) throws Exception;
593
}
594
595
@FunctionalInterface
596
public interface PairFunction<T, K, V> extends Serializable {
597
Tuple2<K, V> call(T t) throws Exception;
598
}
599
600
@FunctionalInterface
601
public interface Function3<T1, T2, T3, R> extends Serializable {
602
R call(T1 t1, T2 t2, T3 t3) throws Exception;
603
}
604
605
@FunctionalInterface
606
public interface FlatMapFunction2<T1, T2, R> extends Serializable {
607
Iterator<R> call(T1 t1, T2 t2) throws Exception;
608
}
609
610
@FunctionalInterface
611
public interface PairFlatMapFunction<T, K, V> extends Serializable {
612
Iterator<Tuple2<K, V>> call(T t) throws Exception;
613
}
614
615
@FunctionalInterface
616
public interface DoubleFlatMapFunction<T> extends Serializable {
617
Iterator<Double> call(T t) throws Exception;
618
}
619
620
@FunctionalInterface
621
public interface DoubleFunction<T> extends Serializable {
622
double call(T t) throws Exception;
623
}
624
```
625
626
**Usage Examples:**
627
628
```java
629
// Lambda expressions
630
JavaDStream<String> filtered = lines.filter(line -> line.length() > 0);
631
JavaDStream<Integer> lengths = lines.map(String::length);
632
JavaPairDStream<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
633
634
// Method references
635
JavaDStream<String> upper = lines.map(String::toUpperCase);
636
JavaDStream<Integer> wordCounts = pairs.map(Tuple2::_2).reduce(Integer::sum);
637
638
// Anonymous functions
639
JavaDStream<String[]> splitLines = lines.map(new Function<String, String[]>() {
640
@Override
641
public String[] call(String line) {
642
return line.split("\\s+");
643
}
644
});
645
```