The Java API for Spark Streaming provides Java-friendly wrappers around the Scala implementation, built on Spark's Java functional interfaces (org.apache.spark.api.java.function) that work naturally with Java 8 lambdas and familiar Java patterns. All core Spark Streaming functionality is available through the Java API with appropriate type safety and lambda support.
Entry point for Java-based Spark Streaming applications.
/**
* Create JavaStreamingContext from SparkConf
* @param conf - Spark configuration
* @param batchDuration - Time interval for batching streaming data
*/
public JavaStreamingContext(SparkConf conf, Duration batchDuration);
/**
* Create JavaStreamingContext from JavaSparkContext
* @param sparkContext - Existing JavaSparkContext instance
* @param batchDuration - Time interval for batching streaming data
*/
public JavaStreamingContext(JavaSparkContext sparkContext, Duration batchDuration);
/**
* Create JavaStreamingContext with master and app name
* @param master - Cluster URL to connect to
* @param appName - Name for your application
* @param batchDuration - Time interval for batching streaming data
*/
public JavaStreamingContext(String master, String appName, Duration batchDuration);
// Lifecycle management
public void start();
public void stop();
public void stop(boolean stopSparkContext);
public void awaitTermination() throws InterruptedException;
public boolean awaitTerminationOrTimeout(long timeout) throws InterruptedException;
// Configuration
public void checkpoint(String directory);
public void remember(Duration duration);
Usage Examples:
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.Durations;
// Create streaming context
SparkConf conf = new SparkConf().setAppName("JavaStreamingApp").setMaster("local[2]");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
// Set checkpoint
jssc.checkpoint("hdfs:///checkpoint");
// Start processing
jssc.start();
jssc.awaitTermination();
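For a bounded wait with a controlled shutdown, a sketch using awaitTerminationOrTimeout (the 60-second timeout is illustrative):
// Instead of awaitTermination(): wait up to 60s, then stop streaming but keep the SparkContext
if (!jssc.awaitTerminationOrTimeout(60000)) {
    jssc.stop(false);
}
Java-friendly methods for creating input streams from various sources.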
/**
* Create text input stream from TCP socket
* @param hostname - Hostname to connect to
* @param port - Port number to connect to
* @return JavaReceiverInputDStream of strings
*/
public JavaReceiverInputDStream<String> socketTextStream(String hostname, int port);
/**
* Create text input stream with storage level
* @param hostname - Hostname to connect to
* @param port - Port number to connect to
* @param storageLevel - Storage level for received data
* @return JavaReceiverInputDStream of strings
*/
public JavaReceiverInputDStream<String> socketTextStream(
String hostname,
int port,
StorageLevel storageLevel
);
/**
* Create input stream from text files in directory
* @param directory - Directory path to monitor
* @return JavaDStream of strings
*/
public JavaDStream<String> textFileStream(String directory);
/**
* Create input stream from queue of JavaRDDs
* @param queue - Queue containing JavaRDDs to process
* @return JavaInputDStream from queue
*/
public <T> JavaInputDStream<T> queueStream(Queue<JavaRDD<T>> queue);
/**
* Create input stream from custom receiver
* @param receiver - Custom receiver implementation
* @return JavaReceiverInputDStream from receiver
*/
public <T> JavaReceiverInputDStream<T> receiverStream(Receiver<T> receiver);
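Usage Examples:
The file- and queue-based sources need no receiver and are handy for testing; the directory path and queue contents below are illustrative:
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;
import org.apache.spark.api.java.JavaRDD;
// Read new text files as they appear in the monitored directory
JavaDStream<String> fileLines = jssc.textFileStream("hdfs:///logs/incoming");
// Feed pre-built RDDs through a queue, one RDD per batch
Queue<JavaRDD<Integer>> rddQueue = new LinkedList<>();
rddQueue.add(jssc.sparkContext().parallelize(Arrays.asList(1, 2, 3)));
JavaInputDStream<Integer> numbers = jssc.queueStream(rddQueue);
Core DStream operations with Java 8 functional interfaces.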
/**
* Transform each element using a function
* @param f - Function to apply to each element
* @return New JavaDStream with transformed elements
*/
public <R> JavaDStream<R> map(Function<T, R> f);
/**
* Transform each element to multiple elements
* @param f - Function returning an Iterator of results for each element
* @return New JavaDStream with flattened results
*/
public <R> JavaDStream<R> flatMap(FlatMapFunction<T, R> f);
/**
* Filter elements based on predicate
* @param f - Predicate function returning boolean
* @return New JavaDStream with filtered elements
*/
public JavaDStream<T> filter(Function<T, Boolean> f);
/**
* Union with another JavaDStream
* @param other - JavaDStream to union with
* @return Combined JavaDStream
*/
public JavaDStream<T> union(JavaDStream<T> other);
/**
* Repartition the stream
* @param numPartitions - Number of partitions for output
* @return Repartitioned JavaDStream
*/
public JavaDStream<T> repartition(int numPartitions);
/**
* Transform each RDD using custom function
* @param f - Function to transform JavaRDD
* @return New JavaDStream with transformed RDDs
*/
public <R> JavaDStream<R> transform(Function<JavaRDD<T>, JavaRDD<R>> f);
/**
* Transform each RDD with time information
* @param f - Function receiving JavaRDD and Time
* @return New JavaDStream with transformed RDDs
*/
public <R> JavaDStream<R> transform(Function2<JavaRDD<T>, Time, JavaRDD<R>> f);
Java-friendly window operations with Duration objects.
/**
* Create windowed stream
* @param windowDuration - Width of the window
* @return JavaDStream containing windowed data
*/
public JavaDStream<T> window(Duration windowDuration);
/**
* Create windowed stream with slide duration
* @param windowDuration - Width of the window
* @param slideDuration - Sliding interval
* @return JavaDStream containing windowed data
*/
public JavaDStream<T> window(Duration windowDuration, Duration slideDuration);
/**
* Count elements over sliding window
* @param windowDuration - Width of the window
* @param slideDuration - Sliding interval
* @return JavaDStream of counts
*/
public JavaDStream<Long> countByWindow(Duration windowDuration, Duration slideDuration);
/**
* Reduce elements over sliding window
* @param reduceFunc - Function to combine elements
* @param windowDuration - Width of the window
* @param slideDuration - Sliding interval
* @return JavaDStream with reduced results
*/
public JavaDStream<T> reduceByWindow(
Function2<T, T, T> reduceFunc,
Duration windowDuration,
Duration slideDuration
);
Java-friendly output operations for processing and saving data.
/**
* Apply function to each RDD
* @param f - Function to apply to each JavaRDD
*/
public void foreachRDD(VoidFunction<JavaRDD<T>> f);
/**
* Apply function to each RDD with time information
* @param f - Function receiving JavaRDD and Time
*/
public void foreachRDD(VoidFunction2<JavaRDD<T>, Time> f);
/**
* Print first 10 elements of each RDD
*/
public void print();
/**
* Print first num elements of each RDD
* @param num - Number of elements to print
*/
public void print(int num);
/**
* Save as text files with prefix
* @param prefix - Prefix for output file names
*/
public void saveAsTextFiles(String prefix);
/**
* Save as text files with prefix and suffix
* @param prefix - Prefix for output file names
* @param suffix - Suffix for output file names
*/
public void saveAsTextFiles(String prefix, String suffix);
Usage Examples:
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.Durations;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import java.util.Arrays;
JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
// Basic transformations
JavaDStream<Integer> lengths = lines.map(String::length);
JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
JavaDStream<String> filtered = lines.filter(line -> line.length() > 0);
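// transform applies an arbitrary RDD-to-RDD function once per batch
// (sorting each batch alphabetically here is purely illustrative)
JavaDStream<String> sorted = lines.transform(rdd ->
    rdd.sortBy(s -> s, true, rdd.getNumPartitions())
);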
// Window operations
JavaDStream<String> windowed = lines.window(Durations.seconds(30), Durations.seconds(10));
JavaDStream<Long> counts = lines.countByWindow(Durations.minutes(1), Durations.seconds(30));
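// reduceByWindow folds every element in the window with one associative function
// (here: the longest line over the last 30 seconds, computed every 10 seconds)
JavaDStream<String> longest = lines.reduceByWindow(
    (a, b) -> a.length() >= b.length() ? a : b,
    Durations.seconds(30),
    Durations.seconds(10)
);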
// Output operations
words.foreachRDD(rdd -> {
long count = rdd.count();
System.out.println("Words in this batch: " + count);
});
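// saveAsTextFiles writes each batch as text files named <prefix>-<batchTimeMs>.<suffix>
// (the output path is illustrative)
words.saveAsTextFiles("hdfs:///output/words", "txt");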
lines.print(20);
Operations for key-value pair streams in Java.
/**
* Group values by key
* @return JavaPairDStream of (key, iterable of values)
*/
public JavaPairDStream<K, Iterable<V>> groupByKey();
/**
* Reduce values by key
* @param func - Function to combine values
* @return JavaPairDStream of (key, reduced value)
*/
public JavaPairDStream<K, V> reduceByKey(Function2<V, V, V> func);
/**
* Combine values by key using combiner functions
* @param createCombiner - Function to create initial combiner from a value
* @param mergeValue - Function to merge a value into a combiner
* @param mergeCombiner - Function to merge two combiners
* @param partitioner - Partitioner that controls partitioning of the result
* @return JavaPairDStream of (key, combined value)
*/
public <C> JavaPairDStream<K, C> combineByKey(
Function<V, C> createCombiner,
Function2<C, V, C> mergeValue,
Function2<C, C, C> mergeCombiner,
Partitioner partitioner
);
/**
* Transform values while keeping keys
* @param f - Function to transform values
* @return JavaPairDStream with transformed values
*/
public <U> JavaPairDStream<K, U> mapValues(Function<V, U> f);
/**
* Join with another JavaPairDStream
* @param other - JavaPairDStream to join with
* @return JavaPairDStream of (key, (leftValue, rightValue))
*/
public <W> JavaPairDStream<K, Tuple2<V, W>> join(JavaPairDStream<K, W> other);
/**
* Left outer join with another JavaPairDStream
* @param other - JavaPairDStream to join with
* @return JavaPairDStream of (key, (leftValue, Optional[rightValue]))
*/
public <W> JavaPairDStream<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairDStream<K, W> other);
Stateful operations using Java functional interfaces.
/**
* Update state by key using Java function
* @param updateFunc - Function to update state
* @return JavaPairDStream of (key, state)
*/
public <S> JavaPairDStream<K, S> updateStateByKey(
Function2<List<V>, Optional<S>, Optional<S>> updateFunc
);
/**
* Update state by key with custom partitioner
* @param updateFunc - Function to update state
* @param partitioner - Custom partitioner
* @return JavaPairDStream of (key, state)
*/
public <S> JavaPairDStream<K, S> updateStateByKey(
Function2<List<V>, Optional<S>, Optional<S>> updateFunc,
Partitioner partitioner
);
/**
* Map with state using StateSpec (experimental)
* @param spec - StateSpec configuration
* @return JavaMapWithStateDStream
*/
public <StateType, MappedType> JavaMapWithStateDStream<K, V, StateType, MappedType> mapWithState(
StateSpec<K, V, StateType, MappedType> spec
);
Usage Examples:
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.Durations;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;
import java.util.Arrays;
// Create pair stream
JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
JavaPairDStream<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
// Aggregations
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((a, b) -> a + b);
JavaPairDStream<String, Iterable<Integer>> grouped = pairs.groupByKey();
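// mapValues keeps keys intact; join and leftOuterJoin match keys across two streams
// (the second stream here is derived from the first purely for illustration)
JavaPairDStream<String, Integer> doubled = pairs.mapValues(v -> v * 2);
JavaPairDStream<String, Tuple2<Integer, Integer>> joined = pairs.join(doubled);
JavaPairDStream<String, Tuple2<Integer, Optional<Integer>>> leftJoined = pairs.leftOuterJoin(doubled);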
// State management
JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey((values, state) -> {
int currentCount = state.or(0);
int newCount = currentCount + values.stream().mapToInt(Integer::intValue).sum();
// Returning Optional.empty() drops the key's state entirely
return newCount == 0 ? Optional.empty() : Optional.of(newCount);
});
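// mapWithState (Spark 1.6+) keeps per-key State that is updated in place; Function3 comes
// from org.apache.spark.api.java.function, State and StateSpec from org.apache.spark.streaming
Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
    (word, one, state) -> {
        int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
        state.update(sum);
        return new Tuple2<>(word, sum);
    };
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateCounts =
    pairs.mapWithState(StateSpec.function(mappingFunc));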
// Window operations on pairs
JavaPairDStream<String, Integer> windowedCounts = pairs.reduceByKeyAndWindow(
(a, b) -> a + b,
Durations.seconds(30),
Durations.seconds(10)
);
Java-friendly streaming listeners for monitoring applications.
/**
* Abstract base class for Java streaming listeners
*/
public abstract class JavaStreamingListener {
// Override methods you need to handle
public void onStreamingStarted(StreamingListenerStreamingStarted streamingStarted) {}
public void onBatchSubmitted(StreamingListenerBatchSubmitted batchSubmitted) {}
public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {}
public void onReceiverStarted(StreamingListenerReceiverStarted receiverStarted) {}
public void onReceiverError(StreamingListenerReceiverError receiverError) {}
public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {}
}
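A minimal concrete listener using the callbacks listed above (MyJavaStreamingListener is the illustrative name registered below):
public class MyJavaStreamingListener extends JavaStreamingListener {
    @Override
    public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
        // Invoked once per finished batch; the event carries batch timing details
        System.out.println("Batch completed: " + batchCompleted);
    }
}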
// Add listener to streaming context
jssc.addStreamingListener(new MyJavaStreamingListener());
Java-friendly duration creation utilities.
/**
* Utility class for creating Duration objects
*/
public class Durations {
public static Duration milliseconds(long milliseconds);
public static Duration seconds(long seconds);
public static Duration minutes(long minutes);
public static Duration hours(long hours);
}
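For example (comments show the equivalent milliseconds):
Duration batchInterval = Durations.seconds(5);  // 5000 ms
Duration windowLength = Durations.minutes(2);   // 120000 ms
Complete Example:
import org.apache.spark.SparkConf;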
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.Durations;
import org.apache.spark.api.java.function.*;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.io.Serializable;
public class JavaWordCount {
public static void main(String[] args) throws InterruptedException {
// Create streaming context
SparkConf conf = new SparkConf()
.setAppName("JavaWordCount")
.setMaster("local[2]");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
jssc.checkpoint("checkpoint");
// Create input stream
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
// Process data
JavaDStream<String> words = lines.flatMap(line ->
Arrays.asList(line.split(" ")).iterator()
);
JavaPairDStream<String, Integer> pairs = words.mapToPair(word ->
new Tuple2<>(word, 1)
);
// Running word count with state
JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(
(values, state) -> {
int sum = values.stream().mapToInt(Integer::intValue).sum();
return Optional.of(state.or(0) + sum);
}
);
// Windowed word count
JavaPairDStream<String, Integer> windowedCounts = pairs.reduceByKeyAndWindow(
(a, b) -> a + b, // Reduce function
(a, b) -> a - b, // Inverse reduce function
Durations.seconds(30), // Window duration
Durations.seconds(10) // Slide duration
);
// Output results
runningCounts.print();
windowedCounts.print();
// Advanced output with custom processing
runningCounts.foreachRDD(rdd -> {
List<Tuple2<String, Integer>> topWords = rdd.top(10,
// The comparator is shipped to executors, so the lambda must also be Serializable
(Comparator<Tuple2<String, Integer>> & Serializable)
(tuple1, tuple2) -> tuple1._2().compareTo(tuple2._2())
);
System.out.println("Top 10 words:");
topWords.forEach(tuple ->
System.out.println(tuple._1() + ": " + tuple._2())
);
});
// Start processing
jssc.start();
jssc.awaitTermination();
}
}
Converting between Java and Scala types when needed:
// Convert JavaRDD to RDD when calling Scala APIs
JavaRDD<String> javaRDD = /* ... */;
RDD<String> scalaRDD = javaRDD.rdd();
// Convert JavaDStream to DStream
JavaDStream<String> javaDStream = /* ... */;
DStream<String> scalaDStream = javaDStream.dstream();
// Working with Options
import org.apache.spark.api.java.Optional;
import scala.Option;
Optional<Integer> javaOptional = Optional.of(42);
// One way to bridge the types: Option.apply treats null as None
Option<Integer> scalaOption = Option.apply(javaOptional.orNull());