0
# Java API
1
2
Java-friendly wrappers for all streaming functionality, providing familiar Java interfaces and method signatures for seamless integration with Java applications.
3
4
**⚠️ Deprecation Notice**: The Java API is deprecated along with the rest of Spark Streaming (DStreams). Use Structured Streaming's Dataset API for new Java applications.
5
6
## Capabilities
7
8
### JavaStreamingContext
9
10
Java-friendly version of StreamingContext providing the main entry point for Java streaming applications.
11
12
```java { .api }
13
/**
14
* Java API for StreamingContext
15
*/
16
public class JavaStreamingContext {
17
18
// Constructors
19
/** Create from JavaSparkContext and Duration */
20
public JavaStreamingContext(JavaSparkContext sparkContext, Duration batchDuration);
21
22
/** Create from SparkConf and Duration */
23
public JavaStreamingContext(SparkConf conf, Duration batchDuration);
24
25
// Lifecycle management
26
/** Start the streaming context */
27
public void start();
28
29
/** Stop the streaming context */
30
public void stop();
31
32
/** Stop with option to stop SparkContext */
33
public void stop(boolean stopSparkContext);
34
35
/** Wait for termination */
36
public void awaitTermination();
37
38
/** Wait for termination or until the timeout (in milliseconds) elapses; returns true if the context has stopped */
39
public boolean awaitTerminationOrTimeout(long timeout);
40
41
// Configuration
42
/** Set checkpoint directory */
43
public void checkpoint(String directory);
44
45
/** Set how long each DStream in this context remembers the RDDs it has generated */
46
public void remember(Duration duration);
47
48
// Properties
49
/** Get underlying Spark context */
50
public JavaSparkContext sparkContext();
51
52
/** Get streaming context state */
53
public StreamingContextState getState();
54
55
// Input stream creation
56
/** Create socket text stream */
57
public JavaDStream<String> socketTextStream(String hostname, int port);
58
59
/** Create socket text stream with storage level */
60
public JavaDStream<String> socketTextStream(
61
String hostname,
62
int port,
63
StorageLevel storageLevel
64
);
65
66
/** Create text file stream */
67
public JavaDStream<String> textFileStream(String directory);
68
69
/** Create queue stream */
70
public <T> JavaDStream<T> queueStream(Queue<JavaRDD<T>> queue);
71
72
/** Create queue stream, choosing whether one RDD (oneAtATime = true) or all queued RDDs are consumed per batch interval */
73
public <T> JavaDStream<T> queueStream(
74
Queue<JavaRDD<T>> queue,
75
boolean oneAtATime
76
);
77
78
/** Create receiver stream */
79
public <T> JavaReceiverInputDStream<T> receiverStream(Receiver<T> receiver);
80
81
// Union operations
82
/** Union multiple DStreams */
83
public <T> JavaDStream<T> union(JavaDStream<T> first, List<JavaDStream<T>> rest);
84
85
// Listeners
86
/** Add streaming listener */
87
public void addStreamingListener(JavaStreamingListener listener);
88
89
/** Remove streaming listener */
90
public void removeStreamingListener(JavaStreamingListener listener);
91
}
92
```
93
94
**Usage Examples:**
95
96
```java
97
import org.apache.spark.SparkConf;
98
import org.apache.spark.api.java.JavaSparkContext;
99
import org.apache.spark.streaming.Duration;
100
import org.apache.spark.streaming.Durations;
101
import org.apache.spark.streaming.api.java.*;
102
103
// Create streaming context
104
SparkConf conf = new SparkConf().setAppName("JavaStreamingApp").setMaster("local[2]");
105
JavaSparkContext sc = new JavaSparkContext(conf);
106
JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(2));
107
108
// Enable checkpointing
109
jssc.checkpoint("hdfs://namenode/checkpoints");
110
111
// Create input stream
112
JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
113
114
// Start processing
115
jssc.start();
116
jssc.awaitTermination();
117
```
118
119
### JavaDStream
120
121
Java wrapper for DStream providing functional programming interfaces compatible with Java 8+ lambda expressions.
122
123
```java { .api }
124
/**
125
* Java API for DStream operations
126
* @param <T> - Type of elements in the stream
127
*/
128
public class JavaDStream<T> {
129
130
// Core properties
131
/** Get streaming context */
132
public JavaStreamingContext context();
133
134
/** Get slide duration */
135
public Duration slideDuration();
136
137
// Transformations
138
/** Transform each element using Java Function */
139
public <R> JavaDStream<R> map(Function<T, R> f);
140
141
/** Filter elements using predicate */
142
public JavaDStream<T> filter(Function<T, Boolean> f);
143
144
/** FlatMap transformation */
145
public <R> JavaDStream<R> flatMap(FlatMapFunction<T, R> f);
146
147
/** Transform to pair DStream */
148
public <K, V> JavaPairDStream<K, V> mapToPair(PairFunction<T, K, V> f);
149
150
/** Coalesce the elements of each partition into a single list */
151
public JavaDStream<List<T>> glom();
152
153
/** Repartition the stream */
154
public JavaDStream<T> repartition(int numPartitions);
155
156
/** Union with another stream */
157
public JavaDStream<T> union(JavaDStream<T> other);
158
159
/** Cache the stream */
160
public JavaDStream<T> cache();
161
162
/** Persist with storage level */
163
public JavaDStream<T> persist(StorageLevel storageLevel);
164
165
// Window operations
166
/** Create windowed stream */
167
public JavaDStream<T> window(Duration windowDuration, Duration slideDuration);
168
169
/** Reduce in sliding window */
170
public JavaDStream<T> reduceByWindow(
171
Function2<T, T, T> reduceFunc,
172
Duration windowDuration,
173
Duration slideDuration
174
);
175
176
/** Count in sliding window */
177
public JavaDStream<Long> countByWindow(Duration windowDuration, Duration slideDuration);
178
179
// Advanced transformations
180
/** Transform using RDD-to-RDD function */
181
public <R> JavaDStream<R> transform(Function<JavaRDD<T>, JavaRDD<R>> transformFunc);
182
183
/** Transform with time */
184
public <R> JavaDStream<R> transform(
185
Function2<JavaRDD<T>, Time, JavaRDD<R>> transformFunc
186
);
187
188
// Actions
189
/** Print first 10 elements of each batch */
190
public void print();
191
192
/** Print first num elements */
193
public void print(int num);
194
195
/** Apply function to each RDD */
196
public void foreachRDD(VoidFunction<JavaRDD<T>> foreachFunc);
197
198
/** Apply function to each RDD with time */
199
public void foreachRDD(VoidFunction2<JavaRDD<T>, Time> foreachFunc);
200
201
// Conversion
202
/** Convert to Scala DStream */
203
public DStream<T> dstream();
204
}
205
```
206
207
**Usage Examples:**
208
209
```java
210
// Basic transformations
211
JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
212
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
213
JavaDStream<String> filtered = words.filter(word -> word.length() > 3);
214
215
// Map to pairs for aggregation
216
JavaPairDStream<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
217
218
// Window operations
219
JavaDStream<String> windowed = words.window(
220
Durations.seconds(10),
221
Durations.seconds(2)
222
);
223
224
// Actions
225
words.print(20);
226
words.foreachRDD(rdd -> {
227
if (!rdd.isEmpty()) {
228
System.out.println("Batch size: " + rdd.count());
229
}
230
});
231
```
232
233
### JavaPairDStream
234
235
Java wrapper for pair DStreams providing key-value operations like joins and aggregations.
236
237
```java { .api }
238
/**
239
* Java API for pair DStream operations
240
* @param <K> - Key type
241
* @param <V> - Value type
242
*/
243
public class JavaPairDStream<K, V> {
244
245
// Key-based aggregations
246
/** Group values by key */
247
public JavaPairDStream<K, Iterable<V>> groupByKey();
248
249
/** Group by key with partitioner */
250
public JavaPairDStream<K, Iterable<V>> groupByKey(Partitioner partitioner);
251
252
/** Reduce values by key */
253
public JavaPairDStream<K, V> reduceByKey(Function2<V, V, V> func);
254
255
/** Reduce by key with partitioner */
256
public JavaPairDStream<K, V> reduceByKey(
257
Function2<V, V, V> func,
258
Partitioner partitioner
259
);
260
261
/** Combine by key */
262
public <C> JavaPairDStream<K, C> combineByKey(
263
Function<V, C> createCombiner,
264
Function2<C, V, C> mergeValue,
265
Function2<C, C, C> mergeCombiners
266
);
267
268
/** Count by key */
269
public JavaPairDStream<K, Long> countByKey();
270
271
// Join operations
272
/** Inner join */
273
public <W> JavaPairDStream<K, Tuple2<V, W>> join(JavaPairDStream<K, W> other);
274
275
/** Left outer join */
276
public <W> JavaPairDStream<K, Tuple2<V, Optional<W>>> leftOuterJoin(
277
JavaPairDStream<K, W> other
278
);
279
280
/** Right outer join */
281
public <W> JavaPairDStream<K, Tuple2<Optional<V>, W>> rightOuterJoin(
282
JavaPairDStream<K, W> other
283
);
284
285
/** Full outer join */
286
public <W> JavaPairDStream<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(
287
JavaPairDStream<K, W> other
288
);
289
290
// Windowed operations
291
/** Group by key in window */
292
public JavaPairDStream<K, Iterable<V>> groupByKeyAndWindow(
293
Duration windowDuration,
294
Duration slideDuration
295
);
296
297
/** Reduce by key in window */
298
public JavaPairDStream<K, V> reduceByKeyAndWindow(
299
Function2<V, V, V> func,
300
Duration windowDuration,
301
Duration slideDuration
302
);
303
304
// State operations
305
/** Update state by key */
306
public <S> JavaPairDStream<K, S> updateStateByKey(
307
Function2<List<V>, Optional<S>, Optional<S>> updateFunc
308
);
309
310
/** Map with state */
311
public <StateType, MappedType> JavaMapWithStateDStream<K, V, StateType, MappedType> mapWithState(
312
StateSpec<K, V, StateType, MappedType> spec
313
);
314
315
// Value operations
316
/** Map values only */
317
public <U> JavaPairDStream<K, U> mapValues(Function<V, U> f);
318
319
/** FlatMap values */
320
public <U> JavaPairDStream<K, U> flatMapValues(FlatMapFunction<V, U> f);
321
322
// Conversion
323
/** Convert to regular DStream of pairs */
324
public JavaDStream<Tuple2<K, V>> toJavaDStream();
325
326
/** Get the underlying Scala DStream of key-value tuples */
327
public DStream<Tuple2<K, V>> dstream();
328
}
329
```
330
331
**Usage Examples:**
332
333
```java
334
// Word count example
335
JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
336
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
337
JavaPairDStream<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
338
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((a, b) -> a + b);
339
340
// Windowed word count
341
JavaPairDStream<String, Integer> windowedCounts = pairs.reduceByKeyAndWindow(
342
(a, b) -> a + b,
343
Durations.seconds(30),
344
Durations.seconds(10)
345
);
346
347
// Join streams
348
JavaPairDStream<String, Integer> counts1 = getCountStream1();
349
JavaPairDStream<String, Double> rates = getRateStream();
350
JavaPairDStream<String, Tuple2<Integer, Double>> joined = counts1.join(rates);
351
```
352
353
### Java Input Streams
354
355
Java wrappers for various input stream types.
356
357
```java { .api }
358
/**
359
* Java wrapper for input streams
360
*/
361
public class JavaInputDStream<T> extends JavaDStream<T> {
362
// Inherits all JavaDStream methods
363
}
364
365
/**
366
* Java wrapper for pair input streams
367
*/
368
public class JavaPairInputDStream<K, V> extends JavaPairDStream<K, V> {
369
// Inherits all JavaPairDStream methods
370
}
371
372
/**
373
* Java wrapper for receiver input streams
374
*/
375
public class JavaReceiverInputDStream<T> extends JavaInputDStream<T> {
376
// Additional receiver-specific methods if any
377
}
378
379
/**
380
* Java wrapper for pair receiver input streams
381
*/
382
public class JavaPairReceiverInputDStream<K, V> extends JavaPairInputDStream<K, V> {
383
// Additional receiver-specific methods if any
384
}
385
386
/**
387
* Java wrapper for mapWithState result
388
*/
389
public class JavaMapWithStateDStream<K, V, S, T> extends JavaDStream<T> {
390
/** Get state snapshots */
391
public JavaPairDStream<K, S> stateSnapshots();
392
}
393
```
394
395
### Java Listeners
396
397
Java-friendly interfaces for streaming event listeners.
398
399
```java { .api }
400
/**
401
* Java interface for streaming listeners
402
*/
403
public interface JavaStreamingListener {
404
405
/** Called when streaming starts */
406
void onStreamingStarted(JavaStreamingListenerStreamingStarted streamingStarted);
407
408
/** Called when receiver starts */
409
void onReceiverStarted(JavaStreamingListenerReceiverStarted receiverStarted);
410
411
/** Called when receiver encounters error */
412
void onReceiverError(JavaStreamingListenerReceiverError receiverError);
413
414
/** Called when receiver stops */
415
void onReceiverStopped(JavaStreamingListenerReceiverStopped receiverStopped);
416
417
/** Called when batch is submitted */
418
void onBatchSubmitted(JavaStreamingListenerBatchSubmitted batchSubmitted);
419
420
/** Called when batch processing starts */
421
void onBatchStarted(JavaStreamingListenerBatchStarted batchStarted);
422
423
/** Called when batch completes */
424
void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted);
425
426
/** Called when output operation starts */
427
void onOutputOperationStarted(JavaStreamingListenerOutputOperationStarted outputOperationStarted);
428
429
/** Called when output operation completes */
430
void onOutputOperationCompleted(JavaStreamingListenerOutputOperationCompleted outputOperationCompleted);
431
}
432
433
/**
434
* Wrapper that converts Java listener to Scala listener
435
*/
436
public class JavaStreamingListenerWrapper implements StreamingListener {
437
public JavaStreamingListenerWrapper(JavaStreamingListener javaListener);
438
}
439
```
440
441
**Usage Examples:**
442
443
```java
444
// Custom Java listener
445
public class MyStreamingListener implements JavaStreamingListener {
446
@Override
447
public void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted) {
448
System.out.println("Batch completed: " + batchCompleted.batchInfo().batchTime());
449
}
450
451
// Implement other methods...
452
}
453
454
// Add listener to context
455
jssc.addStreamingListener(new MyStreamingListener());
456
```
457
458
## Java 8+ Lambda Support
459
460
The Java API fully supports Java 8+ lambda expressions for concise functional programming:
461
462
```java
463
// Functional style with lambdas
464
JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
465
466
JavaPairDStream<String, Integer> wordCounts = lines
467
.flatMap(line -> Arrays.asList(line.split(" ")).iterator())
468
.filter(word -> !word.isEmpty())
469
.mapToPair(word -> new Tuple2<>(word, 1))
470
.reduceByKey((a, b) -> a + b);
471
472
wordCounts.foreachRDD((rdd, time) -> {
473
System.out.println("=== Results at " + time + " ===");
474
rdd.collect().stream()
475
.sorted((t1, t2) -> t2._2.compareTo(t1._2))
476
.limit(10)
477
.forEach(tuple -> System.out.println(tuple._1 + ": " + tuple._2));
478
});
479
```