# Java API

Java-friendly wrappers around the Spark Streaming (DStream) API. They mirror the Scala API, but every operation takes a Java functional interface from `org.apache.spark.api.java.function` (`Function`, `Function2`, `PairFunction`, `VoidFunction`, ...) instead of a Scala closure. As a result, they can be called with Java 8 lambdas and method references.

## JavaStreamingContext

### Creation

Create a JavaStreamingContext from a configuration:

```java { .api }
public JavaStreamingContext(SparkConf conf, Duration batchDuration)
public JavaStreamingContext(String master, String appName, Duration batchDuration)
public JavaStreamingContext(String master, String appName, Duration batchDuration, String sparkHome, String jarFile)
public JavaStreamingContext(String master, String appName, Duration batchDuration, String sparkHome, String[] jars)
public JavaStreamingContext(String master, String appName, Duration batchDuration, String sparkHome, String[] jars, Map<String, String> environment)
```

Create from an existing JavaSparkContext:

```java { .api }
public JavaStreamingContext(JavaSparkContext sparkContext, Duration batchDuration)
```

Recreate a context from checkpoint data:

```java { .api }
public JavaStreamingContext(String path)
```

Example context creation:

```java
SparkConf conf = new SparkConf()
    .setAppName("JavaWordCount")
    .setMaster("local[2]"); // at least 2 threads: one runs the receiver, one processes batches

JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
```

### Lifecycle Management

Start and stop operations:

```java { .api }
public void start()
public void stop()
public void stop(boolean stopSparkContext)
public void stop(boolean stopSparkContext, boolean stopGracefully) // stopGracefully: finish processing received data first
public void awaitTermination() throws InterruptedException
public boolean awaitTerminationOrTimeout(long timeout) throws InterruptedException // timeout in ms; false if it elapsed first
```

Context state and configuration:

```java { .api }
public StreamingContextState getState()
public JavaSparkContext sparkContext()
public void checkpoint(String directory)
public void remember(Duration duration)
```
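A context moves through the three `StreamingContextState` values `INITIALIZED`, `ACTIVE` and `STOPPED`. Once stopped, it cannot be restarted, so a new context has to be created. The plain-Java model below has no Spark dependency, and `ContextLifecycleModel` is a hypothetical name. It sketches those transitions under the assumption that a second `start()` on an active context is only warned about, while `start()` after `stop()` is rejected:

```java
import java.util.ArrayList;
import java.util.List;

public class ContextLifecycleModel {
    // Same constant names as org.apache.spark.streaming.StreamingContextState.
    public enum State { INITIALIZED, ACTIVE, STOPPED }

    private State state = State.INITIALIZED;
    private final List<String> warnings = new ArrayList<>();

    public State getState() {
        return state;
    }

    public List<String> getWarnings() {
        return warnings;
    }

    public void start() {
        switch (state) {
            case INITIALIZED:
                state = State.ACTIVE;
                break;
            case ACTIVE:
                // Assumed behaviour: starting twice is tolerated and only warned about.
                warnings.add("already started");
                break;
            case STOPPED:
                // A stopped context can never be restarted.
                throw new IllegalStateException("context has already been stopped");
        }
    }

    public void stop() {
        // Stopping is idempotent and always ends in STOPPED.
        state = State.STOPPED;
    }

    public static void main(String[] args) {
        ContextLifecycleModel ctx = new ContextLifecycleModel();
        System.out.println(ctx.getState()); // INITIALIZED
        ctx.start();
        ctx.start();
        System.out.println(ctx.getState()); // ACTIVE
        ctx.stop();
        try {
            ctx.start();
        } catch (IllegalStateException e) {
            System.out.println("restart rejected: " + e.getMessage());
        }
    }
}
```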

## Input Sources

### Socket Streams

Text socket stream. The received bytes are decoded as UTF-8 and split into one record per line:

```java { .api }
public JavaReceiverInputDStream<String> socketTextStream(String hostname, int port)
public JavaReceiverInputDStream<String> socketTextStream(String hostname, int port, StorageLevel storageLevel)
```

Custom socket stream, where `converter` turns the socket's `InputStream` into an `Iterable` of records:

```java { .api }
public <T> JavaReceiverInputDStream<T> socketStream(
    String hostname,
    int port,
    Function<InputStream, Iterable<T>> converter,
    StorageLevel storageLevel
)
```

Example socket streams:

```java
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

// Custom converter. The receiver pulls records from the returned Iterable while the
// connection is open, so a converter that fills a list until end-of-stream delivers
// nothing before the connection closes; an Iterable that reads lazily avoids this.
JavaReceiverInputDStream<Integer> numbers = jssc.socketStream(
    "localhost", 8080,
    inputStream -> {
        List<Integer> result = new ArrayList<>();
        // Custom parsing logic
        return result;
    },
    StorageLevel.MEMORY_AND_DISK_SER()
);
```
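A converter that reads lazily lets records flow to the receiver as they arrive instead of after the connection closes. Below is a plain-Java sketch with no Spark dependency. `LineIntegerConverter` and its format of one integer per line are assumptions made for illustration:

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class LineIntegerConverter {
    // Lazily parses one integer per non-blank line. Lines are read only as the caller
    // iterates (one line ahead), so records can be handed on as they arrive.
    // The Iterable wraps a single reader and is meant to be iterated once.
    public static Iterable<Integer> parseLines(InputStream in) {
        BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        return () -> new Iterator<Integer>() {
            private String pending = readNonBlank();

            private String readNonBlank() {
                try {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        if (!line.trim().isEmpty()) {
                            return line.trim();
                        }
                    }
                    return null; // end of stream
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }

            @Override
            public boolean hasNext() {
                return pending != null;
            }

            @Override
            public Integer next() {
                if (pending == null) {
                    throw new NoSuchElementException();
                }
                Integer value = Integer.valueOf(pending);
                pending = readNonBlank();
                return value;
            }
        };
    }

    public static void main(String[] args) {
        byte[] data = "1\n2\n\n40\n".getBytes(StandardCharsets.UTF_8);
        List<Integer> values = new ArrayList<>();
        for (int v : parseLines(new ByteArrayInputStream(data))) {
            values.add(v);
        }
        System.out.println(values); // [1, 2, 40]
    }
}
```

The method has the shape `Iterable<Integer> parseLines(InputStream)`, so with Spark on the classpath, a reference such as `LineIntegerConverter::parseLines` should fit the `converter` parameter of `socketStream`.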

### File Streams

Text file stream. It monitors `directory` and reads each new file as lines of text:

```java { .api }
public JavaDStream<String> textFileStream(String directory)
```

Generic file stream using a Hadoop `InputFormat` from the new `org.apache.hadoop.mapreduce` API (`NewInputFormat` below). The second overload adds a filter on each file's Hadoop `Path` and `newFilesOnly`, which controls whether files already present in the directory when the stream starts are ignored:

```java { .api }
public <K, V, F extends NewInputFormat<K, V>> JavaPairInputDStream<K, V> fileStream(
    String directory,
    Class<K> kClass,
    Class<V> vClass,
    Class<F> fClass
)

public <K, V, F extends NewInputFormat<K, V>> JavaPairInputDStream<K, V> fileStream(
    String directory,
    Class<K> kClass,
    Class<V> vClass,
    Class<F> fClass,
    Function<Path, Boolean> filter,
    boolean newFilesOnly
)
```

Binary records stream, for files made of fixed-length records of `recordLength` bytes:

```java { .api }
public JavaDStream<byte[]> binaryRecordsStream(String directory, int recordLength)
```

Example file streams:

```java
JavaDStream<String> fileStream = jssc.textFileStream("/data/input");

// Hadoop file stream: LongWritable and Text from org.apache.hadoop.io,
// TextInputFormat from org.apache.hadoop.mapreduce.lib.input (new API)
JavaPairInputDStream<LongWritable, Text> hadoopStream = jssc.fileStream(
    "/data/input",
    LongWritable.class,
    Text.class,
    TextInputFormat.class
);
```

### Queue and Receiver Streams

Queue stream, mainly useful for testing. Each batch consumes either one RDD from the queue (`oneAtATime`, `true` by default) or all queued RDDs, and falls back to `defaultRDD` when the queue is empty. The queue's contents are copied when the stream is created, so RDDs added afterwards are not seen, and queue streams do not support checkpointing:

```java { .api }
public <T> JavaDStream<T> queueStream(Queue<JavaRDD<T>> queue)
public <T> JavaInputDStream<T> queueStream(Queue<JavaRDD<T>> queue, boolean oneAtATime)
public <T> JavaInputDStream<T> queueStream(Queue<JavaRDD<T>> queue, boolean oneAtATime, JavaRDD<T> defaultRDD)
```

Receiver stream, for a custom `Receiver<T>` implementation:

```java { .api }
public <T> JavaReceiverInputDStream<T> receiverStream(Receiver<T> receiver)
```

Example queue stream:

```java
Queue<JavaRDD<String>> rddQueue = new LinkedList<>();

// Fill the queue before creating the stream: its contents are copied at creation time
rddQueue.add(jssc.sparkContext().parallelize(Arrays.asList("hello", "world")));

JavaDStream<String> queueStream = jssc.queueStream(rddQueue);
```
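To make `oneAtATime` and `defaultRDD` concrete, here is a plain-Java model in which each queued "RDD" is just a `List`. It has no Spark dependency, and `QueueStreamModel` is a hypothetical name. The consumption rule is an assumption based on the behaviour described above:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class QueueStreamModel {
    // Data seen by one batch. With oneAtATime, a single queued "RDD" is taken;
    // otherwise every queued element is taken and concatenated (a union). If nothing
    // was queued, the default batch is used, or an empty batch when there is none.
    public static <T> List<T> nextBatch(Deque<List<T>> queue, boolean oneAtATime, List<T> defaultBatch) {
        List<List<T>> taken = new ArrayList<>();
        if (oneAtATime && !queue.isEmpty()) {
            taken.add(queue.poll());
        } else {
            while (!queue.isEmpty()) {
                taken.add(queue.poll());
            }
        }
        if (taken.isEmpty() && defaultBatch != null) {
            return new ArrayList<>(defaultBatch);
        }
        List<T> batch = new ArrayList<>();
        for (List<T> rdd : taken) {
            batch.addAll(rdd);
        }
        return batch;
    }

    public static void main(String[] args) {
        Deque<List<String>> q = new ArrayDeque<>();
        q.add(Arrays.asList("a", "b"));
        q.add(Arrays.asList("c"));
        System.out.println(nextBatch(q, true, null)); // [a, b]
        System.out.println(nextBatch(q, true, null)); // [c]
        System.out.println(nextBatch(q, true, null)); // []

        q.add(Arrays.asList("a", "b"));
        q.add(Arrays.asList("c"));
        List<String> idle = Arrays.asList("idle");
        System.out.println(nextBatch(q, false, idle)); // [a, b, c]
        System.out.println(nextBatch(q, false, idle)); // [idle]
    }
}
```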

## JavaDStream Transformations

### Basic Transformations

Map operations:

```java { .api }
public <R> JavaDStream<R> map(Function<T, R> f)
public <R> JavaDStream<R> mapPartitions(FlatMapFunction<Iterator<T>, R> f)
public <R> JavaDStream<R> flatMap(FlatMapFunction<T, R> f)
```

Filter and utility operations:

```java { .api }
public JavaDStream<T> filter(Function<T, Boolean> f)
public JavaDStream<List<T>> glom() // one java.util.List per partition
public JavaDStream<T> cache()
public JavaDStream<T> persist(StorageLevel level)
public JavaDStream<T> repartition(int numPartitions)
```

Example basic transformations:

```java
JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);

JavaDStream<Integer> lengths = lines.map(String::length);
// flatMap returns an Iterator over the output records for each input record
JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
JavaDStream<String> nonEmpty = lines.filter(line -> !line.isEmpty());
```

### Aggregation Operations

Reduce and count operations; each result is computed separately for every batch:

```java { .api }
public JavaDStream<T> reduce(Function2<T, T, T> f)
public JavaDStream<Long> count()
public JavaPairDStream<T, Long> countByValue()
public JavaPairDStream<T, Long> countByValue(int numPartitions)
```

Example aggregations:

```java
JavaDStream<Integer> numbers = lines.map(Integer::parseInt);

JavaDStream<Integer> sum = numbers.reduce(Integer::sum);
JavaDStream<Long> count = numbers.count();
JavaPairDStream<Integer, Long> histogram = numbers.countByValue();
```

### Window Operations

Window and slide durations must be multiples of the batch interval. When the slide duration is omitted, it defaults to the batch interval.

Basic windowing:

```java { .api }
public JavaDStream<T> window(Duration windowDuration)
public JavaDStream<T> window(Duration windowDuration, Duration slideDuration)
```

Windowed reductions. The overload with `invReduceFunc` updates each window incrementally: it adds the batches that enter and "subtracts" the batches that leave. It requires a checkpoint directory, and so does `countByWindow`, which is built on it:

```java { .api }
public JavaDStream<T> reduceByWindow(Function2<T, T, T> reduceFunc, Duration windowDuration, Duration slideDuration)
public JavaDStream<T> reduceByWindow(Function2<T, T, T> reduceFunc, Function2<T, T, T> invReduceFunc, Duration windowDuration, Duration slideDuration)
public JavaDStream<Long> countByWindow(Duration windowDuration, Duration slideDuration)
```

Example windowing:

```java
JavaDStream<String> windowedLines = lines.window(Durations.seconds(30), Durations.seconds(10));
JavaDStream<Integer> windowSum = numbers.reduceByWindow(
    Integer::sum,
    Durations.minutes(1),
    Durations.seconds(10)
);
```
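The window that closes at time `t` covers the batches whose times fall in `(t - windowDuration, t]`, and a new window is emitted once per slide. The plain-Java sketch below enumerates those batch times. It has no Spark dependency, and `WindowMembership` is hypothetical. It also assumes that the stream starts at time 0 and that the first batch ends one batch interval later:

```java
import java.util.ArrayList;
import java.util.List;

public class WindowMembership {
    // Batch times (ms) covered by the window ending at windowEndMs:
    // every batch time b with windowEndMs - windowMs < b <= windowEndMs.
    public static List<Long> batchesInWindow(long windowEndMs, long windowMs, long batchMs) {
        if (windowMs % batchMs != 0) {
            throw new IllegalArgumentException("window must be a multiple of the batch interval");
        }
        List<Long> batches = new ArrayList<>();
        for (long b = windowEndMs - windowMs + batchMs; b <= windowEndMs; b += batchMs) {
            if (b > 0) { // no batches exist before the stream started
                batches.add(b);
            }
        }
        return batches;
    }

    public static void main(String[] args) {
        long batch = 10_000, window = 30_000, slide = 10_000;
        // One window per slide interval; the first windows are only partially filled.
        for (long end = slide; end <= 50_000; end += slide) {
            System.out.println(end + " -> " + batchesInWindow(end, window, batch));
        }
        // 10000 -> [10000]
        // 20000 -> [10000, 20000]
        // 30000 -> [10000, 20000, 30000]
        // 40000 -> [20000, 30000, 40000]
        // 50000 -> [30000, 40000, 50000]
    }
}
```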

### Transform Operations

RDD-level transformations. The function runs on the driver once per batch and can apply any RDD operation:

```java { .api }
public <R> JavaDStream<R> transform(Function<JavaRDD<T>, JavaRDD<R>> transformFunc)
public <R> JavaDStream<R> transform(Function2<JavaRDD<T>, Time, JavaRDD<R>> transformFunc)
public <U, R> JavaDStream<R> transformWith(JavaDStream<U> other, Function3<JavaRDD<T>, JavaRDD<U>, Time, JavaRDD<R>> transformFunc)
```

Example transforms:

```java
JavaDStream<String> processed = lines.transform(rdd -> {
    return rdd.filter(line -> !line.isEmpty())
        .map(String::toUpperCase);
});

// The two-argument form also receives the batch time
JavaDStream<String> timestamped = lines.transform((rdd, time) -> {
    return rdd.map(line -> time.milliseconds() + ": " + line);
});
```

## JavaPairDStream Operations

### Pair Creation

Create a pair DStream from `scala.Tuple2` key-value pairs:

```java { .api }
public <K2, V2> JavaPairDStream<K2, V2> mapToPair(PairFunction<T, K2, V2> f)
public <K2, V2> JavaPairDStream<K2, V2> flatMapToPair(PairFlatMapFunction<T, K2, V2> f)
```

Example pair creation:

```java
JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
JavaPairDStream<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
```

### Key-Value Transformations

Value transformations:

```java { .api }
public <W> JavaPairDStream<K, W> mapValues(Function<V, W> f) // On JavaPairDStream<K, V>
public <W> JavaPairDStream<K, W> flatMapValues(Function<V, Iterable<W>> f)
```

Grouping and reduction:

```java { .api }
public JavaPairDStream<K, Iterable<V>> groupByKey()
public JavaPairDStream<K, Iterable<V>> groupByKey(int numPartitions)
public JavaPairDStream<K, Iterable<V>> groupByKey(Partitioner partitioner)
public JavaPairDStream<K, V> reduceByKey(Function2<V, V, V> func)
public JavaPairDStream<K, V> reduceByKey(Function2<V, V, V> func, int numPartitions)
```

Example key-value operations:

```java
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(Integer::sum);
JavaPairDStream<String, String> labeled = pairs.mapValues(count -> "count=" + count);
```
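Within a single batch, `flatMap` followed by `mapToPair` and `reduceByKey(Integer::sum)` is an ordinary word count. The plain-Java sketch below reproduces that per-batch result with collections only, which can help when checking the lambdas without a cluster. There is no Spark dependency, and `BatchWordCount` is a hypothetical name:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class BatchWordCount {
    // What lines.flatMap(split).mapToPair(w -> (w, 1)).reduceByKey(Integer::sum)
    // yields for ONE batch, computed locally.
    public static Map<String, Integer> countWords(List<String> batchLines) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted only for stable printing
        for (String line : batchLines) {
            for (String word : line.split(" ")) {    // flatMap
                counts.merge(word, 1, Integer::sum); // mapToPair + reduceByKey
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countWords(Arrays.asList("to be or", "not to be")));
        // {be=2, not=1, or=1, to=2}
    }
}
```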

### Windowed Key-Value Operations

Windowed grouping and reduction. As with `reduceByWindow`, the overload with `invReduceFunc` computes each window incrementally and requires checkpointing:

```java { .api }
public JavaPairDStream<K, Iterable<V>> groupByKeyAndWindow(Duration windowDuration)
public JavaPairDStream<K, Iterable<V>> groupByKeyAndWindow(Duration windowDuration, Duration slideDuration)
public JavaPairDStream<K, V> reduceByKeyAndWindow(Function2<V, V, V> func, Duration windowDuration)
public JavaPairDStream<K, V> reduceByKeyAndWindow(Function2<V, V, V> func, Duration windowDuration, Duration slideDuration)
public JavaPairDStream<K, V> reduceByKeyAndWindow(Function2<V, V, V> reduceFunc, Function2<V, V, V> invReduceFunc, Duration windowDuration, Duration slideDuration)
```

Example windowed operations (word counts over the last 5 minutes, updated every 30 seconds):

```java
JavaPairDStream<String, Integer> windowedCounts = pairs.reduceByKeyAndWindow(
    Integer::sum,
    Durations.minutes(5),
    Durations.seconds(30)
);
```
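For each key, the `invReduceFunc` overload can be pictured as "previous window + entering batch - leaving batch", so the whole window never has to be summed again. Below is a plain-Java sketch of one slide step. It has no Spark dependency, and `IncrementalWindowCounts` is hypothetical. Dropping keys whose count falls to zero is this sketch's own choice. Spark keeps such keys in its state unless it is given a filter function.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class IncrementalWindowCounts {
    // One slide of a per-key windowed sum: start from the previous window's result,
    // add the counts of the batch entering the window and subtract (the inverse of +)
    // the counts of the batch leaving it.
    public static Map<String, Integer> slide(Map<String, Integer> previousWindow,
                                             Map<String, Integer> entering,
                                             Map<String, Integer> leaving) {
        Map<String, Integer> next = new HashMap<>(previousWindow);
        entering.forEach((key, count) -> next.merge(key, count, Integer::sum));
        leaving.forEach((key, count) -> next.merge(key, -count, Integer::sum));
        // This sketch drops keys whose windowed count reached zero.
        next.values().removeIf(count -> count == 0);
        return next;
    }

    public static void main(String[] args) {
        Map<String, Integer> window = new HashMap<>();
        window.put("a", 3);
        window.put("b", 1);
        Map<String, Integer> entering = new HashMap<>();
        entering.put("a", 1);
        entering.put("c", 2);
        Map<String, Integer> leaving = new HashMap<>();
        leaving.put("a", 2);
        leaving.put("b", 1);
        // Sorted copy only for readable output
        System.out.println(new TreeMap<>(slide(window, entering, leaving))); // {a=2, c=2}
    }
}
```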

### Join Operations

Joins are computed batch by batch: in each interval, the RDD from this stream is joined with the RDD from `other`. `Optional` is `org.apache.spark.api.java.Optional`:

```java { .api }
public <W> JavaPairDStream<K, Tuple2<V, W>> join(JavaPairDStream<K, W> other)
public <W> JavaPairDStream<K, Tuple2<V, W>> join(JavaPairDStream<K, W> other, int numPartitions)
public <W> JavaPairDStream<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairDStream<K, W> other)
public <W> JavaPairDStream<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairDStream<K, W> other)
public <W> JavaPairDStream<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairDStream<K, W> other)
```

Example joins:

```java
// lines1 and lines2 are JavaDStream<String> inputs carrying "key,value" lines
JavaPairDStream<String, String> stream1 = lines1.mapToPair(line -> new Tuple2<>(line.split(",")[0], line.split(",")[1]));
JavaPairDStream<String, String> stream2 = lines2.mapToPair(line -> new Tuple2<>(line.split(",")[0], line.split(",")[1]));

JavaPairDStream<String, Tuple2<String, String>> joined = stream1.join(stream2);
```
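Since joins are computed per interval, a key only finds a partner if both streams produced it in the same batch. Below is a plain-Java model of one interval's `join` and `leftOuterJoin`. It has no Spark dependency, and `BatchJoinModel` and its string output format are hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class BatchJoinModel {
    // Joins the pairs two streams produced in the SAME batch interval. Each side maps a
    // key to all values seen for it in that batch. keepUnmatchedLeft = false behaves
    // like join, true like leftOuterJoin.
    public static List<String> join(Map<String, List<String>> left,
                                    Map<String, List<String>> right,
                                    boolean keepUnmatchedLeft) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : left.entrySet()) {
            List<String> matches = right.get(entry.getKey());
            for (String v : entry.getValue()) {
                if (matches == null) {
                    // No partner in this batch: join drops the record, leftOuterJoin keeps it
                    // (Spark would put Optional.empty() on the right side).
                    if (keepUnmatchedLeft) {
                        out.add(entry.getKey() + "=(" + v + ", absent)");
                    }
                } else {
                    // Every combination of left and right values for the key.
                    for (String w : matches) {
                        out.add(entry.getKey() + "=(" + v + ", " + w + ")");
                    }
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<String>> batch1 = new TreeMap<>();
        batch1.put("u1", Arrays.asList("click"));
        batch1.put("u2", Arrays.asList("view"));
        Map<String, List<String>> batch2 = new TreeMap<>();
        batch2.put("u1", Arrays.asList("US", "CA"));
        System.out.println(join(batch1, batch2, false)); // [u1=(click, US), u1=(click, CA)]
        System.out.println(join(batch1, batch2, true));  // [u1=(click, US), u1=(click, CA), u2=(view, absent)]
    }
}
```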

## Stateful Operations

### UpdateStateByKey

Update state by key. In every batch, the update function is called for each key that has new values or existing state. It receives that batch's values (possibly an empty list) and the previous state, and returning an empty `Optional` removes the key. `Optional` is `org.apache.spark.api.java.Optional`, not `java.util.Optional`, and a checkpoint directory must be set:

```java { .api }
public <S> JavaPairDStream<K, S> updateStateByKey(Function2<List<V>, Optional<S>, Optional<S>> updateFunc) // On JavaPairDStream<K, V>
public <S> JavaPairDStream<K, S> updateStateByKey(Function2<List<V>, Optional<S>, Optional<S>> updateFunc, int numPartitions)
public <S> JavaPairDStream<K, S> updateStateByKey(Function2<List<V>, Optional<S>, Optional<S>> updateFunc, Partitioner partitioner, JavaPairRDD<K, S> initialRDD)
```

Example updateStateByKey:

```java
// Running count per word; requires jssc.checkpoint(...)
JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(
    (values, state) -> {
        int currentCount = values.stream().mapToInt(Integer::intValue).sum();
        int newCount = state.orElse(0) + currentCount;
        return Optional.of(newCount);
    }
);
```
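Below is a plain-Java model of one `updateStateByKey` step. It has no Spark dependency, and `UpdateStateModel` is hypothetical. It uses `java.util.Optional` and `BiFunction` as stand-ins for Spark's `Optional` and `Function2` only so that it runs without Spark. The model shows that keys with state but no new values are still passed to the update function, and that an empty result deletes the key. Its update function deliberately differs from the running count above by forgetting idle words:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.BiFunction;

public class UpdateStateModel {
    // Applies one batch: the update function runs for every key that has new values
    // OR existing state; an empty result removes the key from the state.
    public static <K, V, S> Map<K, S> applyBatch(Map<K, S> state,
                                                 Map<K, List<V>> batch,
                                                 BiFunction<List<V>, Optional<S>, Optional<S>> updateFunc) {
        Set<K> keys = new HashSet<>(state.keySet());
        keys.addAll(batch.keySet());
        Map<K, S> next = new HashMap<>();
        for (K key : keys) {
            List<V> values = batch.getOrDefault(key, Collections.emptyList());
            Optional<S> updated = updateFunc.apply(values, Optional.ofNullable(state.get(key)));
            updated.ifPresent(s -> next.put(key, s));
        }
        return next;
    }

    public static void main(String[] args) {
        BiFunction<List<Integer>, Optional<Integer>, Optional<Integer>> countOrForget = (values, current) -> {
            if (values.isEmpty()) {
                return Optional.empty(); // no new occurrences: remove the key
            }
            int sum = values.stream().mapToInt(Integer::intValue).sum();
            return Optional.of(current.orElse(0) + sum);
        };

        Map<String, Integer> state = new HashMap<>();
        Map<String, List<Integer>> batch1 = new HashMap<>();
        batch1.put("a", Arrays.asList(1, 1));
        batch1.put("b", Arrays.asList(1));
        state = applyBatch(state, batch1, countOrForget);
        System.out.println(new TreeMap<>(state)); // {a=2, b=1}

        Map<String, List<Integer>> batch2 = new HashMap<>();
        batch2.put("a", Arrays.asList(1));
        state = applyBatch(state, batch2, countOrForget);
        System.out.println(new TreeMap<>(state)); // {a=3}
    }
}
```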

### MapWithState

State specification and mapping. `Function3` and `Function4` are the interfaces from `org.apache.spark.api.java.function`, and `Optional` is `org.apache.spark.api.java.Optional`:

```java { .api }
// StateSpec factory methods for Java
public static <K, V, S, T> StateSpec<K, V, S, T> function(Function3<K, Optional<V>, State<S>, T> mappingFunction)
public static <K, V, S, T> StateSpec<K, V, S, T> function(Function4<Time, K, Optional<V>, State<S>, Optional<T>> mappingFunction)
```

StateSpec configuration:

```java { .api }
public StateSpec<K, V, S, T> initialState(JavaPairRDD<K, S> rdd)
public StateSpec<K, V, S, T> numPartitions(int numPartitions)
public StateSpec<K, V, S, T> partitioner(Partitioner partitioner)
public StateSpec<K, V, S, T> timeout(Duration idleDuration)
```

Example mapWithState:

```java
// Declaring the mapping function with its Function3 type lets StateSpec.function
// resolve to the Java overload
Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
    (word, one, state) -> {
        // State.getOption() returns a scala.Option, so use exists()/get() from Java
        int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
        Tuple2<String, Integer> output = new Tuple2<>(word, sum);
        state.update(sum);
        return output;
    };

StateSpec<String, Integer, Integer, Tuple2<String, Integer>> stateSpec = StateSpec.function(mappingFunc);

JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDStream =
    pairs.mapWithState(stateSpec);
```
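Unlike `updateStateByKey`, `mapWithState` calls the mapping function once per record in the batch and emits one mapped value per call. Keys that have no records in the batch are left alone unless a `timeout` is configured. Below is a plain-Java sketch of those semantics with no Spark dependency. `SimpleState` and `MappingFunction` are simplified, hypothetical stand-ins for Spark's `State` and `Function3`, and timeouts are not modelled:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

public class MapWithStateModel {
    // Simplified stand-in for Spark's State<S>: exists(), get() and update() only.
    public static final class SimpleState<S> {
        private S value;

        public boolean exists() {
            return value != null;
        }

        public S get() {
            if (value == null) {
                throw new NoSuchElementException("state not set");
            }
            return value;
        }

        public void update(S newValue) {
            value = newValue;
        }
    }

    // Stand-in for the mapping function; the value is never absent because
    // timeouts are not modelled.
    @FunctionalInterface
    public interface MappingFunction<K, V, S, T> {
        T call(K key, V value, SimpleState<S> state);
    }

    // One batch: the function runs once per record, in order; keys that have state
    // but no records in this batch are not visited.
    public static <K, V, S, T> List<T> processBatch(List<Map.Entry<K, V>> records,
                                                    Map<K, SimpleState<S>> store,
                                                    MappingFunction<K, V, S, T> func) {
        List<T> emitted = new ArrayList<>();
        for (Map.Entry<K, V> record : records) {
            SimpleState<S> state = store.computeIfAbsent(record.getKey(), k -> new SimpleState<>());
            emitted.add(func.call(record.getKey(), record.getValue(), state));
        }
        return emitted;
    }

    private static Map.Entry<String, Integer> rec(String key, int value) {
        return new SimpleEntry<>(key, value);
    }

    public static void main(String[] args) {
        // Same running-sum logic as the mapping function in the example above.
        MappingFunction<String, Integer, Integer, String> runningSum = (word, one, state) -> {
            int sum = one + (state.exists() ? state.get() : 0);
            state.update(sum);
            return word + "=" + sum;
        };
        Map<String, SimpleState<Integer>> store = new HashMap<>();
        System.out.println(processBatch(Arrays.asList(rec("a", 1), rec("b", 1), rec("a", 1)), store, runningSum));
        // [a=1, b=1, a=2]
        System.out.println(processBatch(Arrays.asList(rec("b", 1)), store, runningSum));
        // [b=2]
    }
}
```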

## Output Operations

### Basic Output

Print and foreachRDD operations:

```java { .api }
public void print()          // first 10 elements of each batch
public void print(int num)
public void foreachRDD(VoidFunction<JavaRDD<T>> foreachFunc)
public void foreachRDD(VoidFunction2<JavaRDD<T>, Time> foreachFunc)
```

### File Output

Save operations. `saveAsTextFiles` and `saveAsObjectFiles` are defined on the underlying Scala `DStream`, which a `JavaDStream` or `JavaPairDStream` exposes through `dstream()`. The Hadoop variants are available directly on `JavaPairDStream`. Each batch is written to its own directory named `prefix-TIME_IN_MS[.suffix]`:

```java { .api }
public void saveAsTextFiles(String prefix, String suffix)   // On DStream: javaStream.dstream().saveAsTextFiles(...)
public void saveAsObjectFiles(String prefix, String suffix) // On DStream
public void saveAsHadoopFiles(String prefix, String suffix, Class<?> keyClass, Class<?> valueClass, Class<? extends OutputFormat> outputFormatClass) // On JavaPairDStream
public void saveAsNewAPIHadoopFiles(String prefix, String suffix, Class<?> keyClass, Class<?> valueClass, Class<? extends NewOutputFormat> outputFormatClass) // On JavaPairDStream
```

Example output operations:

```java
wordCounts.print();

wordCounts.foreachRDD(rdd -> {
    System.out.println("Batch size: " + rdd.count());
    // collect() brings the whole batch to the driver; only suitable for small batches
    rdd.collect().forEach(System.out::println);
});

// saveAsTextFiles is reached through the underlying DStream
lines.dstream().saveAsTextFiles("output", "txt");
```
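Every batch writes its own output directory, so a downstream job may need to predict the names. The small plain-Java helper below assumes that names follow the `prefix-TIME_IN_MS[.suffix]` pattern and that an empty prefix or suffix is simply left out. `BatchOutputNames` is a hypothetical name, not part of Spark:

```java
public class BatchOutputNames {
    // Directory name for one batch: prefix-TIME_IN_MS[.suffix].
    // A null or empty prefix/suffix is omitted together with its separator.
    public static String outputPath(String prefix, String suffix, long batchTimeMs) {
        String result = Long.toString(batchTimeMs);
        if (prefix != null && !prefix.isEmpty()) {
            result = prefix + "-" + result;
        }
        if (suffix != null && !suffix.isEmpty()) {
            result = result + "." + suffix;
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(outputPath("output", "txt", 1700000000000L)); // output-1700000000000.txt
        System.out.println(outputPath("output", "", 1700000000000L));    // output-1700000000000
    }
}
```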

## Event Listeners

### StreamingListener

Listeners are registered with `addStreamingListener`, which takes an `org.apache.spark.streaming.scheduler.StreamingListener`. On Scala 2.12+ builds of Spark, that trait appears to Java as an interface whose callbacks all have empty default implementations, so a listener overrides only the events it needs:

```java { .api }
public interface StreamingListener {
    default void onStreamingStarted(StreamingListenerStreamingStarted streamingStarted) {}
    default void onBatchSubmitted(StreamingListenerBatchSubmitted batchSubmitted) {}
    default void onBatchStarted(StreamingListenerBatchStarted batchStarted) {}
    default void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {}
    default void onOutputOperationStarted(StreamingListenerOutputOperationStarted outputOperationStarted) {}
    default void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted outputOperationCompleted) {}
    default void onReceiverStarted(StreamingListenerReceiverStarted receiverStarted) {}
    default void onReceiverError(StreamingListenerReceiverError receiverError) {}
    default void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {}
}
```

Add/remove listeners:

```java { .api }
public void addStreamingListener(StreamingListener streamingListener)
public void removeStreamingListener(StreamingListener streamingListener)
```

Example listener:

```java
jssc.addStreamingListener(new StreamingListener() {
    @Override
    public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
        BatchInfo info = batchCompleted.batchInfo();
        // processingDelay() returns a scala.Option holding the delay in ms
        System.out.println("Batch completed: " + info.batchTime() +
            " Processing delay: " + info.processingDelay());
    }
});
```
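From Java, `BatchInfo`'s delays arrive as `scala.Option` values, so it can help to see how they relate. The plain-Java sketch below has no Spark dependency, and `BatchDelayModel` is hypothetical. It assumes the usual definitions: scheduling delay is processing start minus submission, processing delay is processing end minus processing start, and total delay is their sum. It also applies the rule of thumb that processing must stay below the batch interval for the application to keep up:

```java
public class BatchDelayModel {
    // Delays derived from the timestamps of a completed batch (all in ms).
    public static long schedulingDelay(long submissionTime, long processingStartTime) {
        return processingStartTime - submissionTime; // time spent waiting to be processed
    }

    public static long processingDelay(long processingStartTime, long processingEndTime) {
        return processingEndTime - processingStartTime; // time spent running the batch's jobs
    }

    public static long totalDelay(long submissionTime, long processingEndTime) {
        return processingEndTime - submissionTime; // scheduling + processing
    }

    // If a batch takes longer to process than the batch interval, batches queue up
    // and the scheduling delay keeps growing.
    public static boolean keepsUp(long processingDelayMs, long batchIntervalMs) {
        return processingDelayMs < batchIntervalMs;
    }

    public static void main(String[] args) {
        long submitted = 1_000, started = 1_300, finished = 2_100, batchInterval = 1_000;
        System.out.println("scheduling delay: " + schedulingDelay(submitted, started) + " ms"); // 300 ms
        System.out.println("processing delay: " + processingDelay(started, finished) + " ms"); // 800 ms
        System.out.println("total delay: " + totalDelay(submitted, finished) + " ms");         // 1100 ms
        System.out.println("keeps up: " + keepsUp(processingDelay(started, finished), batchInterval)); // true
    }
}
```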

## Duration Utilities

Duration creation:

```java { .api }
public class Durations {
    public static Duration milliseconds(long milliseconds)
    public static Duration seconds(long seconds)
    public static Duration minutes(long minutes)
}
```

Example duration usage:

```java
Duration batchInterval = Durations.seconds(5);
Duration windowSize = Durations.minutes(10);
Duration slideInterval = Durations.seconds(30);
```
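Window and slide durations have to be whole multiples of the batch interval. With the values above, a quick check shows that each window spans 120 batches and slides by 6. The plain-Java sketch below has no Spark dependency, and `DurationChecks` is hypothetical. It works on millisecond values like those returned by `Duration.milliseconds()`:

```java
public class DurationChecks {
    // Number of batch intervals in a window or slide duration; anything that is not a
    // positive whole multiple of the batch interval is rejected.
    public static long batchesPer(long durationMs, long batchIntervalMs) {
        if (durationMs <= 0 || durationMs % batchIntervalMs != 0) {
            throw new IllegalArgumentException(
                durationMs + " ms is not a positive multiple of " + batchIntervalMs + " ms");
        }
        return durationMs / batchIntervalMs;
    }

    public static void main(String[] args) {
        long batchInterval = 5_000;  // Durations.seconds(5)
        long windowSize = 600_000;   // Durations.minutes(10)
        long slideInterval = 30_000; // Durations.seconds(30)
        System.out.println(batchesPer(windowSize, batchInterval));    // 120
        System.out.println(batchesPer(slideInterval, batchInterval)); // 6
    }
}
```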

## Complete Java Example

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.Optional;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

import java.util.Arrays;

public class JavaWordCount {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf()
            .setAppName("JavaWordCount")
            .setMaster("local[2]");

        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
        jssc.checkpoint("checkpoint"); // required by updateStateByKey

        // Create input stream
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        // Transform and count words
        JavaDStream<String> words = lines.flatMap(line ->
            Arrays.asList(line.split(" ")).iterator());

        JavaPairDStream<String, Integer> pairs = words.mapToPair(word ->
            new Tuple2<>(word, 1));

        // Per-batch counts
        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(Integer::sum);

        // Running count across batches (Spark's Optional, not java.util.Optional)
        JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(
            (values, state) -> {
                int currentCount = values.stream().mapToInt(Integer::intValue).sum();
                int newCount = state.orElse(0) + currentCount;
                return Optional.of(newCount);
            }
        );

        wordCounts.print();
        runningCounts.print();

        jssc.start();
        jssc.awaitTermination();
    }
}
```