Java-friendly wrappers for all streaming functionality, providing familiar Java interfaces and method signatures for seamless integration with Java applications.
⚠️ Deprecation Notice: The Java API is deprecated along with the rest of Spark Streaming (DStreams). Use Structured Streaming's Dataset API for new Java applications.
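For comparison, the equivalent of the socket word count shown later on this page, written against the Dataset API, might look roughly as follows. This is a minimal sketch; the host, port, and output settings are illustrative.
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import java.util.Arrays;
// Read lines from a socket, split them into words, and keep running counts
SparkSession spark = SparkSession.builder().appName("JavaStructuredStreamingApp").getOrCreate();
Dataset<String> lines = spark.readStream()
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
    .as(Encoders.STRING());
Dataset<Row> wordCounts = lines
    .flatMap((FlatMapFunction<String, String>) line -> Arrays.asList(line.split(" ")).iterator(), Encoders.STRING())
    .groupBy("value")
    .count();
StreamingQuery query = wordCounts.writeStream()
    .outputMode("complete")
    .format("console")
    .start();
query.awaitTermination();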
Java-friendly version of StreamingContext providing the main entry point for Java streaming applications.
/**
* Java API for StreamingContext
*/
public class JavaStreamingContext {
// Constructors
/** Create from JavaSparkContext and Duration */
public JavaStreamingContext(JavaSparkContext sparkContext, Duration batchDuration);
/** Create from SparkConf and Duration */
public JavaStreamingContext(SparkConf conf, Duration batchDuration);
// Lifecycle management
/** Start the streaming context */
public void start();
/** Stop the streaming context */
public void stop();
/** Stop with option to stop SparkContext */
public void stop(boolean stopSparkContext);
/** Wait for termination */
public void awaitTermination();
/** Wait for termination, with a timeout in milliseconds */
public boolean awaitTerminationOrTimeout(long timeout);
// Configuration
/** Set checkpoint directory */
public void checkpoint(String directory);
/** Set remember duration */
public void remember(Duration duration);
// Properties
/** Get underlying Spark context */
public JavaSparkContext sparkContext();
/** Get streaming context state */
public StreamingContextState getState();
// Input stream creation
/** Create socket text stream */
public JavaReceiverInputDStream<String> socketTextStream(String hostname, int port);
/** Create socket text stream with storage level */
public JavaReceiverInputDStream<String> socketTextStream(
String hostname,
int port,
StorageLevel storageLevel
);
/** Create text file stream */
public JavaDStream<String> textFileStream(String directory);
/** Create queue stream */
public <T> JavaDStream<T> queueStream(Queue<JavaRDD<T>> queue);
/** Create queue stream with options */
public <T> JavaDStream<T> queueStream(
Queue<JavaRDD<T>> queue,
boolean oneAtATime
);
/** Create receiver stream */
public <T> JavaReceiverInputDStream<T> receiverStream(Receiver<T> receiver);
// Union operations
/** Union multiple DStreams */
public <T> JavaDStream<T> union(JavaDStream<T> first, List<JavaDStream<T>> rest);
// Listeners
/** Add streaming listener */
public void addStreamingListener(JavaStreamingListener listener);
/** Remove streaming listener */
public void removeStreamingListener(JavaStreamingListener listener);
}
Usage Examples:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
// Create streaming context
SparkConf conf = new SparkConf().setAppName("JavaStreamingApp").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(2));
// Enable checkpointing
jssc.checkpoint("hdfs://namenode/checkpoints");
// Create input stream
JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
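For local testing without a network source, queueStream accepts a queue of RDDs. A sketch; the queue contents are illustrative, and java.util.Queue, java.util.LinkedList, java.util.Arrays, and JavaRDD imports are needed:
// Each queued RDD is consumed as one batch of input
Queue<JavaRDD<String>> rddQueue = new LinkedList<>();
rddQueue.add(sc.parallelize(Arrays.asList("spark", "streaming", "test")));
JavaDStream<String> testLines = jssc.queueStream(rddQueue);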
// Start processing
jssc.start();
jssc.awaitTermination();
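Instead of blocking indefinitely, awaitTerminationOrTimeout and stop(boolean) allow a bounded wait with an explicit shutdown. A brief sketch, where shouldStop() is a hypothetical application-specific flag:
// Re-check an external stop condition every 10 seconds;
// stop(true) also stops the underlying SparkContext.
while (!jssc.awaitTerminationOrTimeout(10000)) {
    if (shouldStop()) {
        jssc.stop(true);
        break;
    }
}
Java wrapper for DStream providing functional programming interfaces compatible with Java 8+ lambda expressions.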
/**
* Java API for DStream operations
* @param <T> - Type of elements in the stream
*/
public class JavaDStream<T> {
// Core properties
/** Get streaming context */
public JavaStreamingContext context();
/** Get slide duration */
public Duration slideDuration();
// Transformations
/** Transform each element using Java Function */
public <R> JavaDStream<R> map(Function<T, R> f);
/** Filter elements using predicate */
public JavaDStream<T> filter(Function<T, Boolean> f);
/** FlatMap transformation */
public <R> JavaDStream<R> flatMap(FlatMapFunction<T, R> f);
/** Transform to pair DStream */
public <K, V> JavaPairDStream<K, V> mapToPair(PairFunction<T, K, V> f);
/** Group elements into arrays */
public JavaDStream<List<T>> glom();
/** Repartition the stream */
public JavaDStream<T> repartition(int numPartitions);
/** Union with another stream */
public JavaDStream<T> union(JavaDStream<T> other);
/** Cache the stream */
public JavaDStream<T> cache();
/** Persist with storage level */
public JavaDStream<T> persist(StorageLevel storageLevel);
// Window operations
/** Create windowed stream */
public JavaDStream<T> window(Duration windowDuration, Duration slideDuration);
/** Reduce in sliding window */
public JavaDStream<T> reduceByWindow(
Function2<T, T, T> reduceFunc,
Duration windowDuration,
Duration slideDuration
);
/** Count in sliding window */
public JavaDStream<Long> countByWindow(Duration windowDuration, Duration slideDuration);
// Advanced transformations
/** Transform using RDD-to-RDD function */
public <R> JavaDStream<R> transform(Function<JavaRDD<T>, JavaRDD<R>> transformFunc);
/** Transform with time */
public <R> JavaDStream<R> transform(
Function2<JavaRDD<T>, Time, JavaRDD<R>> transformFunc
);
// Actions
/** Print first 10 elements of each batch */
public void print();
/** Print first num elements */
public void print(int num);
/** Apply function to each RDD */
public void foreachRDD(VoidFunction<JavaRDD<T>> foreachFunc);
/** Apply function to each RDD with time */
public void foreachRDD(VoidFunction2<JavaRDD<T>, Time> foreachFunc);
// Conversion
/** Convert to Scala DStream */
public DStream<T> dstream();
}
Usage Examples:
// Basic transformations
JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
JavaDStream<String> filtered = words.filter(word -> word.length() > 3);
// Map to pairs for aggregation
JavaPairDStream<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
// Window operations
JavaDStream<String> windowed = words.window(
Durations.seconds(10),
Durations.seconds(2)
);
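transform works at the RDD level, which is handy for per-batch operations such as filtering against static reference data. A sketch using the JavaSparkContext sc from the earlier example; the stop-word list is illustrative:
// Per batch, remove words that appear in a static reference RDD
JavaRDD<String> stopWords = sc.parallelize(Arrays.asList("the", "a", "an"));
JavaDStream<String> cleaned = words.transform(rdd -> rdd.subtract(stopWords));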
// Actions
words.print(20);
words.foreachRDD(rdd -> {
if (!rdd.isEmpty()) {
System.out.println("Batch size: " + rdd.count());
}
});
Java wrapper for pair DStreams providing key-value operations like joins and aggregations.
/**
* Java API for pair DStream operations
* @param <K> - Key type
* @param <V> - Value type
*/
public class JavaPairDStream<K, V> {
// Key-based aggregations
/** Group values by key */
public JavaPairDStream<K, Iterable<V>> groupByKey();
/** Group by key with partitioner */
public JavaPairDStream<K, Iterable<V>> groupByKey(Partitioner partitioner);
/** Reduce values by key */
public JavaPairDStream<K, V> reduceByKey(Function2<V, V, V> func);
/** Reduce by key with partitioner */
public JavaPairDStream<K, V> reduceByKey(
Function2<V, V, V> func,
Partitioner partitioner
);
/** Combine by key */
public <C> JavaPairDStream<K, C> combineByKey(
Function<V, C> createCombiner,
Function2<C, V, C> mergeValue,
Function2<C, C, C> mergeCombiners
);
/** Count by key */
public JavaPairDStream<K, Long> countByKey();
// Join operations
/** Inner join */
public <W> JavaPairDStream<K, Tuple2<V, W>> join(JavaPairDStream<K, W> other);
/** Left outer join */
public <W> JavaPairDStream<K, Tuple2<V, Optional<W>>> leftOuterJoin(
JavaPairDStream<K, W> other
);
/** Right outer join */
public <W> JavaPairDStream<K, Tuple2<Optional<V>, W>> rightOuterJoin(
JavaPairDStream<K, W> other
);
/** Full outer join */
public <W> JavaPairDStream<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(
JavaPairDStream<K, W> other
);
// Windowed operations
/** Group by key in window */
public JavaPairDStream<K, Iterable<V>> groupByKeyAndWindow(
Duration windowDuration,
Duration slideDuration
);
/** Reduce by key in window */
public JavaPairDStream<K, V> reduceByKeyAndWindow(
Function2<V, V, V> func,
Duration windowDuration,
Duration slideDuration
);
// State operations
/** Update state by key */
public <S> JavaPairDStream<K, S> updateStateByKey(
Function2<List<V>, Optional<S>, Optional<S>> updateFunc
);
/** Map with state */
public <StateType, MappedType> JavaMapWithStateDStream<K, V, StateType, MappedType> mapWithState(
StateSpec<K, V, StateType, MappedType> spec
);
// Value operations
/** Map values only */
public <U> JavaPairDStream<K, U> mapValues(Function<V, U> f);
/** FlatMap values */
public <U> JavaPairDStream<K, U> flatMapValues(FlatMapFunction<V, U> f);
// Conversion
/** Convert to regular DStream of pairs */
public JavaDStream<Tuple2<K, V>> toJavaDStream();
/** Convert to Scala PairDStream */
public DStream<Tuple2<K, V>> dstream();
}
Usage Examples:
// Word count example
JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
JavaPairDStream<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((a, b) -> a + b);
// Windowed word count
JavaPairDStream<String, Integer> windowedCounts = pairs.reduceByKeyAndWindow(
(a, b) -> a + b,
Durations.seconds(30),
Durations.seconds(10)
);
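With a checkpoint directory set (see jssc.checkpoint in the earlier JavaStreamingContext example), updateStateByKey maintains a running total per key across batches. A sketch; Optional here is Spark's org.apache.spark.api.java.Optional:
// Stateful word count: add each batch's counts to the accumulated total
Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
    (values, state) -> {
        int sum = state.orElse(0);
        for (Integer v : values) {
            sum += v;
        }
        return Optional.of(sum);
    };
JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);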
// Join streams
JavaPairDStream<String, Integer> counts1 = getCountStream1();
JavaPairDStream<String, Double> rates = getRateStream();
JavaPairDStream<String, Tuple2<Integer, Double>> joined = counts1.join(rates);
Java wrappers for various input stream types.
/**
* Java wrapper for input streams
*/
public class JavaInputDStream<T> extends JavaDStream<T> {
// Inherits all JavaDStream methods
}
/**
* Java wrapper for pair input streams
*/
public class JavaPairInputDStream<K, V> extends JavaPairDStream<K, V> {
// Inherits all JavaPairDStream methods
}
/**
* Java wrapper for receiver input streams
*/
public class JavaReceiverInputDStream<T> extends JavaInputDStream<T> {
// Inherits all JavaInputDStream methods
}
/**
* Java wrapper for pair receiver input streams
*/
public class JavaPairReceiverInputDStream<K, V> extends JavaPairInputDStream<K, V> {
// Inherits all JavaPairInputDStream methods
}
/**
* Java wrapper for mapWithState result
*/
public class JavaMapWithStateDStream<K, V, S, T> extends JavaDStream<T> {
/** Get state snapshots */
public JavaPairDStream<K, S> stateSnapshots();
}
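mapWithState (listed on JavaPairDStream above) returns a JavaMapWithStateDStream, and stateSnapshots() exposes the latest state for every key. A sketch reusing the (word, 1) pairs from the earlier examples; checkpointing must be enabled, and State, StateSpec, and Function3 come from org.apache.spark.streaming and org.apache.spark.api.java.function:
// Add each incoming count to the stored total, emit (word, newTotal),
// and expose the full key-to-total state via stateSnapshots()
Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
    (word, one, state) -> {
        int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
        state.update(sum);
        return new Tuple2<>(word, sum);
    };
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateStream =
    pairs.mapWithState(StateSpec.function(mappingFunc));
JavaPairDStream<String, Integer> latestTotals = stateStream.stateSnapshots();
Java-friendly interfaces for streaming event listeners.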
/**
* Java interface for streaming listeners
*/
public interface JavaStreamingListener {
/** Called when streaming starts */
void onStreamingStarted(JavaStreamingListenerStreamingStarted streamingStarted);
/** Called when receiver starts */
void onReceiverStarted(JavaStreamingListenerReceiverStarted receiverStarted);
/** Called when receiver encounters error */
void onReceiverError(JavaStreamingListenerReceiverError receiverError);
/** Called when receiver stops */
void onReceiverStopped(JavaStreamingListenerReceiverStopped receiverStopped);
/** Called when batch is submitted */
void onBatchSubmitted(JavaStreamingListenerBatchSubmitted batchSubmitted);
/** Called when batch processing starts */
void onBatchStarted(JavaStreamingListenerBatchStarted batchStarted);
/** Called when batch completes */
void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted);
/** Called when output operation starts */
void onOutputOperationStarted(JavaStreamingListenerOutputOperationStarted outputOperationStarted);
/** Called when output operation completes */
void onOutputOperationCompleted(JavaStreamingListenerOutputOperationCompleted outputOperationCompleted);
}
/**
* Wrapper that converts Java listener to Scala listener
*/
public class JavaStreamingListenerWrapper implements StreamingListener {
public JavaStreamingListenerWrapper(JavaStreamingListener javaListener);
}
Usage Examples:
// Custom Java listener
public class MyStreamingListener implements JavaStreamingListener {
@Override
public void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted) {
System.out.println("Batch completed: " + batchCompleted.batchInfo().batchTime());
}
// Implement other methods...
}
// Add listener to context
jssc.addStreamingListener(new MyStreamingListener());
The Java API fully supports Java 8+ lambda expressions for concise functional programming:
// Functional style with lambdas
JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
JavaPairDStream<String, Integer> wordCounts = lines
.flatMap(line -> Arrays.asList(line.split(" ")).iterator())
.filter(word -> !word.isEmpty())
.mapToPair(word -> new Tuple2<>(word, 1))
.reduceByKey((a, b) -> a + b);
wordCounts.foreachRDD((rdd, time) -> {
System.out.println("=== Results at " + time + " ===");
rdd.collect().stream()
.sorted((t1, t2) -> t2._2.compareTo(t1._2))
.limit(10)
.forEach(tuple -> System.out.println(tuple._1 + ": " + tuple._2));
});