# Java API

The Java API for Spark Streaming provides Java-friendly wrappers around the Scala implementation, using Java 8 functional interfaces and familiar Java patterns. All core Spark Streaming functionality is available through the Java API with appropriate type safety and lambda support.

## Capabilities

### Java Streaming Context

Entry point for Java-based Spark Streaming applications.

```java { .api }
/**
 * Create JavaStreamingContext from SparkConf
 * @param conf - Spark configuration
 * @param batchDuration - Time interval for batching streaming data
 */
public JavaStreamingContext(SparkConf conf, Duration batchDuration);

/**
 * Create JavaStreamingContext from JavaSparkContext
 * @param sparkContext - Existing JavaSparkContext instance
 * @param batchDuration - Time interval for batching streaming data
 */
public JavaStreamingContext(JavaSparkContext sparkContext, Duration batchDuration);

/**
 * Create JavaStreamingContext with master and app name
 * @param master - Cluster URL to connect to
 * @param appName - Name for your application
 * @param batchDuration - Time interval for batching streaming data
 */
public JavaStreamingContext(String master, String appName, Duration batchDuration);

// Lifecycle management
public void start();
public void stop();
public void stop(boolean stopSparkContext);
public void awaitTermination() throws InterruptedException;
public boolean awaitTerminationOrTimeout(long timeout) throws InterruptedException;

// Configuration
public void checkpoint(String directory);
public void remember(Duration duration);
```

**Usage Examples:**

```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.Durations;

// Create streaming context
SparkConf conf = new SparkConf().setAppName("JavaStreamingApp").setMaster("local[2]");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

// Set checkpoint
jssc.checkpoint("hdfs://checkpoint");

// Start processing
jssc.start();
jssc.awaitTermination();
```

### Java Input Streams

Java-friendly methods for creating input streams from various sources.

```java { .api }
/**
 * Create text input stream from TCP socket
 * @param hostname - Hostname to connect to
 * @param port - Port number to connect to
 * @returns JavaReceiverInputDStream of strings
 */
public JavaReceiverInputDStream<String> socketTextStream(String hostname, int port);

/**
 * Create text input stream with storage level
 * @param hostname - Hostname to connect to
 * @param port - Port number to connect to
 * @param storageLevel - Storage level for received data
 * @returns JavaReceiverInputDStream of strings
 */
public JavaReceiverInputDStream<String> socketTextStream(
    String hostname,
    int port,
    StorageLevel storageLevel
);

/**
 * Create input stream from text files in directory
 * @param directory - Directory path to monitor
 * @returns JavaDStream of strings
 */
public JavaDStream<String> textFileStream(String directory);

/**
 * Create input stream from queue of JavaRDDs
 * @param queue - Queue containing JavaRDDs to process
 * @returns JavaInputDStream from queue
 */
public <T> JavaInputDStream<T> queueStream(Queue<JavaRDD<T>> queue);

/**
 * Create input stream from custom receiver
 * @param receiver - Custom receiver implementation
 * @returns JavaReceiverInputDStream from receiver
 */
public <T> JavaReceiverInputDStream<T> receiverStream(Receiver<T> receiver);
```

### JavaDStream Operations

Core DStream operations with Java 8 functional interfaces.

```java { .api }
/**
 * Transform each element using a function
 * @param f - Function to apply to each element
 * @returns New JavaDStream with transformed elements
 */
public <R> JavaDStream<R> map(Function<T, R> f);

/**
 * Transform each element to multiple elements
 * @param f - Function returning an Iterator for each element
 * @returns New JavaDStream with flattened results
 */
public <R> JavaDStream<R> flatMap(FlatMapFunction<T, R> f);

/**
 * Filter elements based on predicate
 * @param f - Predicate function returning boolean
 * @returns New JavaDStream with filtered elements
 */
public JavaDStream<T> filter(Function<T, Boolean> f);

/**
 * Union with another JavaDStream
 * @param other - JavaDStream to union with
 * @returns Combined JavaDStream
 */
public JavaDStream<T> union(JavaDStream<T> other);

/**
 * Repartition the stream
 * @param numPartitions - Number of partitions for output
 * @returns Repartitioned JavaDStream
 */
public JavaDStream<T> repartition(int numPartitions);

/**
 * Transform each RDD using custom function
 * @param f - Function to transform JavaRDD
 * @returns New JavaDStream with transformed RDDs
 */
public <R> JavaDStream<R> transform(Function<JavaRDD<T>, JavaRDD<R>> f);

/**
 * Transform each RDD with time information
 * @param f - Function receiving JavaRDD and Time
 * @returns New JavaDStream with transformed RDDs
 */
public <R> JavaDStream<R> transform(Function2<JavaRDD<T>, Time, JavaRDD<R>> f);
```
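
The operations above are applied batch by batch: each batch interval yields one RDD, and the function runs over that batch's elements. As a rough illustration only, the sketch below models a stream as a list of batches in plain Java with no Spark dependency. The class `BatchOpsSketch` and the method `flatMapBatches` are hypothetical names introduced here, and `java.util.function.Function` returning an `Iterator` stands in for Spark's `FlatMapFunction`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of per-batch flatMap semantics (not Spark code).
class BatchOpsSketch {

    // Applies f to every element of every batch and flattens the
    // returned iterators, keeping batch boundaries intact.
    static <T, R> List<List<R>> flatMapBatches(
            List<List<T>> batches,
            Function<T, Iterator<R>> f) {
        List<List<R>> out = new ArrayList<>();
        for (List<T> batch : batches) {
            List<R> flattened = new ArrayList<>();
            for (T element : batch) {
                f.apply(element).forEachRemaining(flattened::add);
            }
            out.add(flattened);
        }
        return out;
    }

    public static void main(String[] args) {
        // Two batches of input lines
        List<List<String>> batches = Arrays.asList(
            Arrays.asList("a b", "c"),
            Arrays.asList("d e f"));

        // Same lambda shape as the flatMap usage in this document
        List<List<String>> words = flatMapBatches(
            batches, line -> Arrays.asList(line.split(" ")).iterator());

        System.out.println(words); // prints [[a, b, c], [d, e, f]]
    }
}
```

Elements never cross batch boundaries in this model, which is why operations that span several batches need the window or state APIs instead.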

### Window Operations in Java

Java-friendly window operations with Duration objects.

```java { .api }
/**
 * Create windowed stream
 * @param windowDuration - Width of the window
 * @returns JavaDStream containing windowed data
 */
public JavaDStream<T> window(Duration windowDuration);

/**
 * Create windowed stream with slide duration
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval
 * @returns JavaDStream containing windowed data
 */
public JavaDStream<T> window(Duration windowDuration, Duration slideDuration);

/**
 * Count elements over sliding window
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval
 * @returns JavaDStream of counts
 */
public JavaDStream<Long> countByWindow(Duration windowDuration, Duration slideDuration);

/**
 * Reduce elements over sliding window
 * @param reduceFunc - Function to combine elements
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval
 * @returns JavaDStream with reduced results
 */
public JavaDStream<T> reduceByWindow(
    Function2<T, T, T> reduceFunc,
    Duration windowDuration,
    Duration slideDuration
);
```
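
To make the relationship between `windowDuration` and `slideDuration` concrete, here is a simplified plain-Java model with no Spark dependency. It assumes both durations are expressed as whole numbers of batches and that a result is emitted every `slideBatches` batches covering the most recent `windowBatches` batches. The names `WindowSketch` and `windowSums` are hypothetical, and early windows that reach before the first batch simply cover fewer batches in this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of sliding-window selection (not Spark code).
class WindowSketch {

    // batchCounts.get(i) is the value produced by batch i.
    // Every slideBatches batches, sum the last windowBatches batches.
    static List<Integer> windowSums(
            List<Integer> batchCounts, int windowBatches, int slideBatches) {
        List<Integer> out = new ArrayList<>();
        for (int end = slideBatches; end <= batchCounts.size(); end += slideBatches) {
            // Clamp the start so early windows cover only existing batches
            int start = Math.max(0, end - windowBatches);
            int sum = 0;
            for (int i = start; i < end; i++) {
                sum += batchCounts.get(i);
            }
            out.add(sum);
        }
        return out;
    }

    public static void main(String[] args) {
        // Window of 3 batches, sliding every 2 batches
        List<Integer> sums = windowSums(Arrays.asList(1, 2, 3, 4, 5, 6), 3, 2);
        System.out.println(sums); // prints [3, 9, 15]
    }
}
```

Because the window is wider than the slide here, consecutive windows overlap: batch index 3 (value 4) contributes to both the second and third results.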

### Output Operations in Java

Java-friendly output operations for processing and saving data.

```java { .api }
/**
 * Apply function to each RDD
 * @param f - Function to apply to each JavaRDD
 */
public void foreachRDD(VoidFunction<JavaRDD<T>> f);

/**
 * Apply function to each RDD with time information
 * @param f - Function receiving JavaRDD and Time
 */
public void foreachRDD(VoidFunction2<JavaRDD<T>, Time> f);

/**
 * Print first 10 elements of each RDD
 */
public void print();

/**
 * Print first num elements of each RDD
 * @param num - Number of elements to print
 */
public void print(int num);

/**
 * Save as text files with prefix
 * @param prefix - Prefix for output file names
 */
public void saveAsTextFiles(String prefix);

/**
 * Save as text files with prefix and suffix
 * @param prefix - Prefix for output file names
 * @param suffix - Suffix for output file names
 */
public void saveAsTextFiles(String prefix, String suffix);
```

**Usage Examples:**

```java
import java.util.Arrays;

import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);

// Basic transformations
JavaDStream<Integer> lengths = lines.map(String::length);
JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
JavaDStream<String> filtered = lines.filter(line -> line.length() > 0);

// Window operations
JavaDStream<String> windowed = lines.window(Durations.seconds(30), Durations.seconds(10));
JavaDStream<Long> counts = lines.countByWindow(Durations.minutes(1), Durations.seconds(30));

// Output operations
words.foreachRDD(rdd -> {
    long count = rdd.count();
    System.out.println("Words in this batch: " + count);
});

lines.print(20);
```

### JavaPairDStream Operations

Operations for key-value pair streams in Java.

```java { .api }
/**
 * Group values by key
 * @returns JavaPairDStream of (key, iterable of values)
 */
public JavaPairDStream<K, Iterable<V>> groupByKey();

/**
 * Reduce values by key
 * @param func - Function to combine values
 * @returns JavaPairDStream of (key, reduced value)
 */
public JavaPairDStream<K, V> reduceByKey(Function2<V, V, V> func);

/**
 * Combine values by key using combiner functions
 * @param createCombiner - Function to create initial combiner
 * @param mergeValue - Function to merge value into combiner
 * @param mergeCombiner - Function to merge combiners
 * @returns JavaPairDStream of (key, combined value)
 */
public <C> JavaPairDStream<K, C> combineByKey(
    Function<V, C> createCombiner,
    Function2<C, V, C> mergeValue,
    Function2<C, C, C> mergeCombiner
);

/**
 * Transform values while keeping keys
 * @param f - Function to transform values
 * @returns JavaPairDStream with transformed values
 */
public <U> JavaPairDStream<K, U> mapValues(Function<V, U> f);

/**
 * Join with another JavaPairDStream
 * @param other - JavaPairDStream to join with
 * @returns JavaPairDStream of (key, (leftValue, rightValue))
 */
public <W> JavaPairDStream<K, Tuple2<V, W>> join(JavaPairDStream<K, W> other);

/**
 * Left outer join with another JavaPairDStream
 * @param other - JavaPairDStream to join with
 * @returns JavaPairDStream of (key, (leftValue, Optional[rightValue]))
 */
public <W> JavaPairDStream<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairDStream<K, W> other);
```

### Java State Management

Stateful operations using Java functional interfaces.

```java { .api }
/**
 * Update state by key using Java function
 * @param updateFunc - Function to update state
 * @returns JavaPairDStream of (key, state)
 */
public <S> JavaPairDStream<K, S> updateStateByKey(
    Function2<List<V>, Optional<S>, Optional<S>> updateFunc
);

/**
 * Update state by key with custom partitioner
 * @param updateFunc - Function to update state
 * @param partitioner - Custom partitioner
 * @returns JavaPairDStream of (key, state)
 */
public <S> JavaPairDStream<K, S> updateStateByKey(
    Function2<List<V>, Optional<S>, Optional<S>> updateFunc,
    Partitioner partitioner
);

/**
 * Map with state using StateSpec (experimental)
 * @param spec - StateSpec configuration
 * @returns JavaMapWithStateDStream
 */
public <StateType, MappedType> JavaMapWithStateDStream<K, V, StateType, MappedType> mapWithState(
    StateSpec<K, V, StateType, MappedType> spec
);
```
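
The contract of the `updateStateByKey` update function can be sketched in plain Java without Spark. In this simplified model, for each batch the function receives the new values for a key plus the previous state, it is also called with an empty value list for keys that already have state, and returning an empty optional drops the key. The names `UpdateStateSketch`, `applyBatch`, `pair`, and `addCounts` are hypothetical, and `java.util.Optional` stands in for `org.apache.spark.api.java.Optional`.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;

// Plain-Java model of per-key state updates (not Spark code).
class UpdateStateSketch {

    // Hypothetical helper that builds one (key, value) record.
    static <K, V> Map.Entry<K, V> pair(K key, V value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Applies one batch of records to the current state and returns the new state.
    static <K, V, S> Map<K, S> applyBatch(
            Map<K, S> state,
            List<Map.Entry<K, V>> batch,
            BiFunction<List<V>, Optional<S>, Optional<S>> updateFunc) {
        // Keys with existing state are visited even if the batch has no values for them
        Map<K, List<V>> valuesByKey = new HashMap<>();
        for (K key : state.keySet()) {
            valuesByKey.put(key, new ArrayList<>());
        }
        for (Map.Entry<K, V> record : batch) {
            valuesByKey.computeIfAbsent(record.getKey(), k -> new ArrayList<>())
                       .add(record.getValue());
        }
        Map<K, S> next = new HashMap<>();
        for (Map.Entry<K, List<V>> entry : valuesByKey.entrySet()) {
            Optional<S> previous = Optional.ofNullable(state.get(entry.getKey()));
            Optional<S> updated = updateFunc.apply(entry.getValue(), previous);
            // An empty result removes the key from the state
            updated.ifPresent(value -> next.put(entry.getKey(), value));
        }
        return next;
    }

    // Running-count update function: previous count plus this batch's values.
    static Optional<Integer> addCounts(List<Integer> values, Optional<Integer> state) {
        int sum = values.stream().mapToInt(Integer::intValue).sum();
        return Optional.of(state.orElse(0) + sum);
    }

    public static void main(String[] args) {
        Map<String, Integer> state = new HashMap<>();
        List<Map.Entry<String, Integer>> batch1 =
            Arrays.asList(pair("a", 1), pair("b", 1), pair("a", 1));
        List<Map.Entry<String, Integer>> batch2 = Arrays.asList(pair("b", 1));

        state = applyBatch(state, batch1, UpdateStateSketch::addCounts);
        System.out.println(state.get("a") + " " + state.get("b")); // prints 2 1

        state = applyBatch(state, batch2, UpdateStateSketch::addCounts);
        System.out.println(state.get("a") + " " + state.get("b")); // prints 2 2
    }
}
```

Because the function runs for every key that has state, an update function that never returns an empty optional lets the state grow without bound as new keys arrive.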

**Usage Examples:**

```java
import java.util.Arrays;

import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;

// Create pair stream
JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
JavaPairDStream<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));

// Aggregations
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((a, b) -> a + b);
JavaPairDStream<String, Iterable<Integer>> grouped = pairs.groupByKey();

// State management (requires a checkpoint directory to be set on the context)
JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey((values, state) -> {
    int currentCount = state.or(0);
    int newCount = currentCount + values.stream().mapToInt(Integer::intValue).sum();
    // Returning an empty Optional removes the key from the state
    return newCount == 0 ? Optional.empty() : Optional.of(newCount);
});

// Window operations on pairs
JavaPairDStream<String, Integer> windowedCounts = pairs.reduceByKeyAndWindow(
    (a, b) -> a + b,
    Durations.seconds(30),
    Durations.seconds(10)
);
```

### Java Listeners

Java-friendly streaming listeners for monitoring applications.

```java { .api }
/**
 * Abstract base class for Java streaming listeners
 */
public abstract class JavaStreamingListener {
    // Override methods you need to handle
    public void onStreamingStarted(StreamingListenerStreamingStarted streamingStarted) {}
    public void onBatchSubmitted(StreamingListenerBatchSubmitted batchSubmitted) {}
    public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {}
    public void onReceiverStarted(StreamingListenerReceiverStarted receiverStarted) {}
    public void onReceiverError(StreamingListenerReceiverError receiverError) {}
    public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {}
}

// Add listener to streaming context
jssc.addStreamingListener(new MyJavaStreamingListener());
```

### Duration Helper Class

Java-friendly duration creation utilities.

```java { .api }
/**
 * Utility class for creating Duration objects
 */
public class Durations {
    public static Duration milliseconds(long milliseconds);
    public static Duration seconds(long seconds);
    public static Duration minutes(long minutes);
}
```

## Complete Java Example

```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.Durations;
import org.apache.spark.api.java.function.*;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class JavaWordCount {
    public static void main(String[] args) throws InterruptedException {
        // Create streaming context
        SparkConf conf = new SparkConf()
            .setAppName("JavaWordCount")
            .setMaster("local[2]");

        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
        // Checkpointing is required by updateStateByKey and by the
        // inverse-function form of reduceByKeyAndWindow used below
        jssc.checkpoint("checkpoint");

        // Create input stream
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        // Process data
        JavaDStream<String> words = lines.flatMap(line ->
            Arrays.asList(line.split(" ")).iterator()
        );

        JavaPairDStream<String, Integer> pairs = words.mapToPair(word ->
            new Tuple2<>(word, 1)
        );

        // Running word count with state
        JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(
            (values, state) -> {
                int sum = values.stream().mapToInt(Integer::intValue).sum();
                return Optional.of(state.or(0) + sum);
            }
        );

        // Windowed word count
        JavaPairDStream<String, Integer> windowedCounts = pairs.reduceByKeyAndWindow(
            (a, b) -> a + b,        // Reduce function
            (a, b) -> a - b,        // Inverse reduce function
            Durations.seconds(30),  // Window duration
            Durations.seconds(10)   // Slide duration
        );

        // Output results
        runningCounts.print();
        windowedCounts.print();

        // Advanced output with custom processing
        runningCounts.foreachRDD(rdd -> {
            // The comparator is shipped to executors, so it must be Serializable;
            // a plain lambda Comparator is not, hence the intersection cast
            List<Tuple2<String, Integer>> topWords = rdd.top(10,
                (Comparator<Tuple2<String, Integer>> & Serializable)
                    (tuple1, tuple2) -> tuple1._2().compareTo(tuple2._2())
            );

            System.out.println("Top 10 words:");
            topWords.forEach(tuple ->
                System.out.println(tuple._1() + ": " + tuple._2())
            );
        });

        // Start processing
        jssc.start();
        jssc.awaitTermination();
    }
}
```
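
The windowed count in the example passes both a reduce function and an inverse reduce function. The idea, sketched below in plain Java with no Spark dependency, is that the previous window's result can be updated incrementally: combine in the batch that enters the window and "subtract" the batch that leaves it, rather than re-reducing every batch in the window. This simplified model assumes the window slides by exactly one batch and that the reduce function is invertible. The names `InverseReduceSketch` and `slidingReduce` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java model of incremental window reduction (not Spark code).
class InverseReduceSketch {

    // Emits one reduced value per batch, covering the last windowBatches batches.
    static List<Integer> slidingReduce(
            List<Integer> batchValues,
            int windowBatches,
            BinaryOperator<Integer> reduceFunc,
            BinaryOperator<Integer> invReduceFunc) {
        List<Integer> results = new ArrayList<>();
        Integer window = null;
        for (int i = 0; i < batchValues.size(); i++) {
            Integer entering = batchValues.get(i);
            // Fold the entering batch into the previous window result
            window = (window == null) ? entering : reduceFunc.apply(window, entering);
            if (i >= windowBatches) {
                // Remove the batch that just slid out of the window
                window = invReduceFunc.apply(window, batchValues.get(i - windowBatches));
            }
            results.add(window);
        }
        return results;
    }

    public static void main(String[] args) {
        List<Integer> sums = slidingReduce(
            Arrays.asList(1, 2, 3, 4, 5), 3,
            (a, b) -> a + b,   // reduce function
            (a, b) -> a - b);  // inverse reduce function
        System.out.println(sums); // prints [1, 3, 6, 9, 12]
    }
}
```

Each step does a constant amount of work regardless of window width, which is the motivation for supplying the inverse function when the reduction allows it.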

## Java API Type Conversions

Converting between Java and Scala types when needed:

```java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.dstream.DStream;
import scala.Option;

// Convert JavaRDD to RDD when calling Scala APIs
JavaRDD<String> javaRDD = /* ... */;
RDD<String> scalaRDD = javaRDD.rdd();

// Convert JavaDStream to DStream
JavaDStream<String> javaDStream = /* ... */;
DStream<String> scalaDStream = javaDStream.dstream();

// Working with Options: convert explicitly to scala.Option
// (Option.apply(null) yields None, so an empty Optional maps to None)
Optional<Integer> javaOptional = Optional.of(42);
Option<Integer> scalaOption = Option.apply(javaOptional.orNull());
```