Java-friendly wrappers that expose Spark Streaming to Java applications, with support for lambda expressions, Java collections, and idiomatic Java patterns.
Java-friendly version of StreamingContext with native Java types and collections.
/**
* Java-friendly wrapper for StreamingContext
*/
public class JavaStreamingContext {
/** Create from JavaSparkContext and batch duration */
public JavaStreamingContext(JavaSparkContext sparkContext, Duration batchDuration);
/** Create from SparkConf and batch duration */
public JavaStreamingContext(SparkConf conf, Duration batchDuration);
/** Create with master URL, app name, and batch duration */
public JavaStreamingContext(String master, String appName, Duration batchDuration);
/** Restore from checkpoint */
public JavaStreamingContext(String path);
/** Get underlying StreamingContext */
public StreamingContext ssc();
/** Get underlying JavaSparkContext */
public JavaSparkContext sparkContext();
/** Start the streaming computation */
public void start();
/** Stop the streaming computation */
public void stop();
/** Stop with option to stop SparkContext */
public void stop(boolean stopSparkContext);
/** Stop with graceful shutdown options */
public void stop(boolean stopSparkContext, boolean stopGracefully);
/** Wait for termination */
public void awaitTermination();
/** Wait for termination with timeout in milliseconds; returns true if the context stopped */
public boolean awaitTerminationOrTimeout(long timeout);
/** Set checkpoint directory */
public void checkpoint(String directory);
/** Set remember duration */
public void remember(Duration duration);
/** Add streaming listener */
public void addStreamingListener(StreamingListener streamingListener);
/** Remove streaming listener */
public void removeStreamingListener(StreamingListener streamingListener);
}
Static Factory Methods:
public class JavaStreamingContext {
/** Get currently active JavaStreamingContext */
public static Optional<JavaStreamingContext> getActive();
/** Get active context or create new one */
public static JavaStreamingContext getActiveOrCreate(Function0<JavaStreamingContext> creatingFunc);
/** Create from checkpoint or use creating function */
public static JavaStreamingContext getOrCreate(
String checkpointPath,
Function0<JavaStreamingContext> creatingFunc
);
/** Create from checkpoint with Hadoop configuration */
public static JavaStreamingContext getOrCreate(
String checkpointPath,
Function0<JavaStreamingContext> creatingFunc,
Configuration hadoopConf
);
}
Usage Examples:
import org.apache.spark.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
// Create JavaStreamingContext
SparkConf conf = new SparkConf().setAppName("JavaStreamingApp");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
// Create with checkpoint recovery
JavaStreamingContext jssc2 = JavaStreamingContext.getOrCreate("/path/to/checkpoint", () -> {
SparkConf conf = new SparkConf().setAppName("RecoverableApp");
return new JavaStreamingContext(conf, Durations.seconds(1));
});
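// Reuse the currently active context if one exists, otherwise create a new one
// (a sketch using the getActiveOrCreate factory listed above; the app name
// "SharedApp" is illustrative)
JavaStreamingContext jssc3 = JavaStreamingContext.getActiveOrCreate(() -> {
    SparkConf sharedConf = new SparkConf().setAppName("SharedApp");
    return new JavaStreamingContext(sharedConf, Durations.seconds(1));
});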
// Configure and start
jssc.checkpoint("/path/to/checkpoint");
jssc.start();
jssc.awaitTermination();
Java-friendly methods for creating input streams from various sources.
public class JavaStreamingContext {
/** Create text file stream */
public JavaDStream<String> textFileStream(String directory);
/** Create socket text stream */
public JavaReceiverInputDStream<String> socketTextStream(String hostname, int port);
/** Create socket text stream with storage level */
public JavaReceiverInputDStream<String> socketTextStream(
String hostname,
int port,
StorageLevel storageLevel
);
/** Create file stream with Hadoop InputFormat */
public <K, V, F extends NewInputFormat<K, V>> JavaPairInputDStream<K, V> fileStream(
String directory,
Class<K> keyClass,
Class<V> valueClass,
Class<F> inputFormatClass
);
/** Create file stream with configuration */
public <K, V, F extends NewInputFormat<K, V>> JavaPairInputDStream<K, V> fileStream(
String directory,
Class<K> keyClass,
Class<V> valueClass,
Class<F> inputFormatClass,
Function<Path, Boolean> filter,
boolean newFilesOnly,
Configuration conf
);
/** Create queue stream */
public <T> JavaDStream<T> queueStream(Queue<JavaRDD<T>> queue);
/** Create queue stream with processing options */
public <T> JavaDStream<T> queueStream(Queue<JavaRDD<T>> queue, boolean oneAtATime);
/** Create queue stream with default RDD */
public <T> JavaDStream<T> queueStream(
Queue<JavaRDD<T>> queue,
boolean oneAtATime,
JavaRDD<T> defaultRDD
);
/** Create receiver stream */
public <T> JavaReceiverInputDStream<T> receiverStream(Receiver<T> receiver);
/** Union multiple streams */
public <T> JavaDStream<T> union(JavaDStream<T> first, List<JavaDStream<T>> rest);
}
Usage Examples:
// Text file stream
JavaDStream<String> lines = jssc.textFileStream("/path/to/files");
// Socket stream
JavaReceiverInputDStream<String> socketLines = jssc.socketTextStream("localhost", 9999);
// File stream with Hadoop InputFormat
JavaPairInputDStream<LongWritable, Text> fileStream = jssc.fileStream(
"/path/to/files",
LongWritable.class,
Text.class,
TextInputFormat.class
);
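// File stream with a path filter, newFilesOnly flag, and Hadoop configuration
// (a sketch; the dot-file filter below is illustrative)
Configuration hadoopConf = new Configuration();
JavaPairInputDStream<LongWritable, Text> filteredFileStream = jssc.fileStream(
    "/path/to/files",
    LongWritable.class,
    Text.class,
    TextInputFormat.class,
    path -> !path.getName().startsWith("."),  // skip hidden files
    true,                                     // only process files added after start
    hadoopConf
);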
// Queue stream
Queue<JavaRDD<Integer>> rddQueue = new LinkedList<>();
JavaRDD<Integer> rdd1 = jssc.sparkContext().parallelize(Arrays.asList(1, 2, 3));
rddQueue.add(rdd1);
JavaDStream<Integer> queueStream = jssc.queueStream(rddQueue);
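// Receiver stream (a sketch): subclass Receiver and push data to Spark with
// store(); this hypothetical receiver emits a timestamp string every second
class TickReceiver extends Receiver<String> {
    public TickReceiver() { super(StorageLevel.MEMORY_ONLY()); }
    @Override public void onStart() {
        new Thread(() -> {
            while (!isStopped()) {
                store("tick " + System.currentTimeMillis());
                try { Thread.sleep(1000); } catch (InterruptedException e) { return; }
            }
        }).start();
    }
    @Override public void onStop() { } // the loop above exits once isStopped() is true
}
JavaReceiverInputDStream<String> ticks = jssc.receiverStream(new TickReceiver());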
// Union streams
JavaDStream<String> stream1 = jssc.textFileStream("/path1");
JavaDStream<String> stream2 = jssc.textFileStream("/path2");
JavaDStream<String> combined = jssc.union(stream1, Arrays.asList(stream2));
Java wrapper for DStream with lambda expression support and Java-friendly transformations.
/**
* Java-friendly wrapper for DStream
*/
public class JavaDStream<T> {
/** Get underlying Scala DStream */
public DStream<T> dstream();
/** Cache RDDs in memory */
public JavaDStream<T> cache();
/** Persist with default storage level */
public JavaDStream<T> persist();
/** Persist with specific storage level */
public JavaDStream<T> persist(StorageLevel storageLevel);
/** Enable checkpointing */
public JavaDStream<T> checkpoint(Duration interval);
/** Get associated JavaStreamingContext */
public JavaStreamingContext context();
/** Print first 10 elements */
public void print();
/** Print first num elements */
public void print(int num);
}
Basic Transformations:
public class JavaDStream<T> {
/** Map transformation with Java function */
public <U> JavaDStream<U> map(Function<T, U> f);
/** FlatMap transformation */
public <U> JavaDStream<U> flatMap(FlatMapFunction<T, U> f);
/** Filter transformation */
public JavaDStream<T> filter(Function<T, Boolean> f);
/** Map partitions */
public <U> JavaDStream<U> mapPartitions(FlatMapFunction<Iterator<T>, U> f);
/** Group elements by partition */
public JavaDStream<List<T>> glom();
/** Repartition RDDs */
public JavaDStream<T> repartition(int numPartitions);
/** Union with another stream */
public JavaDStream<T> union(JavaDStream<T> other);
/** Count elements */
public JavaDStream<Long> count();
/** Count occurrences of each value */
public JavaPairDStream<T, Long> countByValue();
/** Reduce elements */
public JavaDStream<T> reduce(Function2<T, T, T> f);
}
Advanced Transformations:
public class JavaDStream<T> {
/** Transform using RDD operations */
public <U> JavaDStream<U> transform(Function<JavaRDD<T>, JavaRDD<U>> transformFunc);
/** Transform with time access */
public <U> JavaDStream<U> transform(Function2<JavaRDD<T>, Time, JavaRDD<U>> transformFunc);
/** Transform with another DStream */
public <U, V> JavaDStream<V> transformWith(
JavaDStream<U> other,
Function2<JavaRDD<T>, JavaRDD<U>, JavaRDD<V>> transformFunc
);
/** Transform with another DStream and time access */
public <U, V> JavaDStream<V> transformWith(
JavaDStream<U> other,
Function3<JavaRDD<T>, JavaRDD<U>, Time, JavaRDD<V>> transformFunc
);
}
Window Operations:
public class JavaDStream<T> {
/** Create windowed stream */
public JavaDStream<T> window(Duration windowDuration);
/** Create windowed stream with slide duration */
public JavaDStream<T> window(Duration windowDuration, Duration slideDuration);
/** Reduce over window */
public JavaDStream<T> reduceByWindow(
Function2<T, T, T> reduceFunc,
Duration windowDuration,
Duration slideDuration
);
/** Incremental reduce over window */
public JavaDStream<T> reduceByWindow(
Function2<T, T, T> reduceFunc,
Function2<T, T, T> invReduceFunc,
Duration windowDuration,
Duration slideDuration
);
/** Count elements in window */
public JavaDStream<Long> countByWindow(Duration windowDuration, Duration slideDuration);
/** Count values in window */
public JavaPairDStream<T, Long> countByValueAndWindow(
Duration windowDuration,
Duration slideDuration
);
}
Output Operations:
public class JavaDStream<T> {
/** Apply function to each RDD */
public void foreachRDD(VoidFunction<JavaRDD<T>> foreachFunc);
/** Apply function to each RDD with time */
public void foreachRDD(VoidFunction2<JavaRDD<T>, Time> foreachFunc);
/** Save as object files */
public void saveAsObjectFiles(String prefix, String suffix);
/** Save as text files */
public void saveAsTextFiles(String prefix, String suffix);
}
Usage Examples:
import org.apache.spark.api.java.function.*;
JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
// Basic transformations with lambda expressions
JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
JavaDStream<String> filteredWords = words.filter(word -> word.length() > 2);
JavaDStream<String> upperWords = words.map(String::toUpperCase);
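// mapPartitions: one iterator in, one iterator out per partition; here, count
// the words in each partition (needs java.util.Collections)
JavaDStream<Integer> perPartitionCounts = words.mapPartitions(iter -> {
    int n = 0;
    while (iter.hasNext()) { iter.next(); n++; }
    return Collections.singletonList(n).iterator();
});
// countByValue: per-batch frequency of each distinct word
JavaPairDStream<String, Long> frequencies = words.countByValue();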
// Window operations
JavaDStream<String> windowedWords = words.window(Durations.seconds(30), Durations.seconds(10));
JavaDStream<String> reducedWindow = words.reduceByWindow(
(s1, s2) -> s1 + " " + s2,
Durations.seconds(30),
Durations.seconds(10)
);
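// Window counts; both use an incremental reduce internally, so a checkpoint
// directory must be set first
JavaDStream<Long> windowCount = words.countByWindow(Durations.seconds(30), Durations.seconds(10));
JavaPairDStream<String, Long> windowFreq =
    words.countByValueAndWindow(Durations.seconds(30), Durations.seconds(10));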
// Output operations
words.foreachRDD(rdd -> {
System.out.println("Batch size: " + rdd.count());
rdd.take(10).forEach(System.out::println);
});
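// transformWith: combine two streams at the RDD level each batch; this sketch
// removes words that appear in a hypothetical blocklist stream
JavaDStream<String> blocked = jssc.textFileStream("/path/to/blocklist");
JavaDStream<String> allowed = words.transformWith(blocked,
    (wordsRdd, blockedRdd, time) -> wordsRdd.subtract(blockedRdd));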
// Custom transformations
JavaDStream<Integer> wordLengths = words.transform(rdd -> {
return rdd.map(String::length).filter(len -> len > 0);
});
Java wrapper for pair DStreams with key-value operations.
/**
* Java-friendly wrapper for pair DStreams
*/
public class JavaPairDStream<K, V> {
/** Get underlying Scala DStream */
public DStream<Tuple2<K, V>> dstream();
/** Cache RDDs */
public JavaPairDStream<K, V> cache();
/** Persist with storage level */
public JavaPairDStream<K, V> persist(StorageLevel storageLevel);
/** Enable checkpointing */
public JavaPairDStream<K, V> checkpoint(Duration interval);
/** Convert to regular JavaDStream */
public JavaDStream<Tuple2<K, V>> toJavaDStream();
}
Key-Value Transformations:
public class JavaPairDStream<K, V> {
/** Group values by key */
public JavaPairDStream<K, Iterable<V>> groupByKey();
/** Group by key with partitions */
public JavaPairDStream<K, Iterable<V>> groupByKey(int numPartitions);
/** Group by key with partitioner */
public JavaPairDStream<K, Iterable<V>> groupByKey(Partitioner partitioner);
/** Reduce values by key */
public JavaPairDStream<K, V> reduceByKey(Function2<V, V, V> func);
/** Reduce by key with partitions */
public JavaPairDStream<K, V> reduceByKey(Function2<V, V, V> func, int numPartitions);
/** Reduce by key with partitioner */
public JavaPairDStream<K, V> reduceByKey(Function2<V, V, V> func, Partitioner partitioner);
/** Combine by key */
public <C> JavaPairDStream<K, C> combineByKey(
Function<V, C> createCombiner,
Function2<C, V, C> mergeValue,
Function2<C, C, C> mergeCombiner,
Partitioner partitioner
);
/** Map values only */
public <U> JavaPairDStream<K, U> mapValues(Function<V, U> f);
/** FlatMap values only */
public <U> JavaPairDStream<K, U> flatMapValues(FlatMapFunction<V, U> f);
}
Join Operations:
public class JavaPairDStream<K, V> {
/** Inner join */
public <W> JavaPairDStream<K, Tuple2<V, W>> join(JavaPairDStream<K, W> other);
/** Left outer join */
public <W> JavaPairDStream<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairDStream<K, W> other);
/** Right outer join */
public <W> JavaPairDStream<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairDStream<K, W> other);
/** Full outer join */
public <W> JavaPairDStream<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairDStream<K, W> other);
/** Cogroup operation */
public <W> JavaPairDStream<K, Tuple2<Iterable<V>, Iterable<W>>> cogroup(JavaPairDStream<K, W> other);
}
Windowed Operations:
public class JavaPairDStream<K, V> {
/** Group by key over window */
public JavaPairDStream<K, Iterable<V>> groupByKeyAndWindow(Duration windowDuration);
/** Group by key with slide duration */
public JavaPairDStream<K, Iterable<V>> groupByKeyAndWindow(
Duration windowDuration,
Duration slideDuration
);
/** Reduce by key over window */
public JavaPairDStream<K, V> reduceByKeyAndWindow(
Function2<V, V, V> func,
Duration windowDuration,
Duration slideDuration
);
/** Incremental reduce by key over window */
public JavaPairDStream<K, V> reduceByKeyAndWindow(
Function2<V, V, V> reduceFunc,
Function2<V, V, V> invReduceFunc,
Duration windowDuration,
Duration slideDuration
);
}
Stateful Operations:
public class JavaPairDStream<K, V> {
/** Update state by key */
public <S> JavaPairDStream<K, S> updateStateByKey(
Function2<List<V>, Optional<S>, Optional<S>> updateFunc
);
/** Update state with partitioner */
public <S> JavaPairDStream<K, S> updateStateByKey(
Function2<List<V>, Optional<S>, Optional<S>> updateFunc,
Partitioner partitioner
);
/** Update state with partition count */
public <S> JavaPairDStream<K, S> updateStateByKey(
Function2<List<V>, Optional<S>, Optional<S>> updateFunc,
int numPartitions
);
/** Map with state */
public <StateType, MappedType> JavaMapWithStateDStream<K, V, StateType, MappedType> mapWithState(
StateSpec<K, V, StateType, MappedType> spec
);
}
Usage Examples:
// Create pair stream from words
JavaPairDStream<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
// Word count
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(Integer::sum);
// Group and count
JavaPairDStream<String, Iterable<Integer>> grouped = pairs.groupByKey();
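// combineByKey (a sketch): per-batch (sum, count) per key, e.g. for averages;
// explicit function types help Java's inference, and HashPartitioner(4) is arbitrary
Function<Integer, Tuple2<Integer, Integer>> createCombiner = v -> new Tuple2<>(v, 1);
Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>> mergeValue =
    (acc, v) -> new Tuple2<>(acc._1() + v, acc._2() + 1);
Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> mergeCombiner =
    (a, b) -> new Tuple2<>(a._1() + b._1(), a._2() + b._2());
JavaPairDStream<String, Tuple2<Integer, Integer>> sumAndCount =
    pairs.combineByKey(createCombiner, mergeValue, mergeCombiner, new HashPartitioner(4));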
// Windowed word count
JavaPairDStream<String, Integer> windowedCounts = pairs.reduceByKeyAndWindow(
Integer::sum,
Durations.minutes(1),
Durations.seconds(10)
);
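// Incremental variant: add counts entering the window and subtract counts
// leaving it (requires checkpointing; efficient for large windows with small slides)
JavaPairDStream<String, Integer> incrementalCounts = pairs.reduceByKeyAndWindow(
    (a, b) -> a + b,
    (a, b) -> a - b,
    Durations.minutes(1),
    Durations.seconds(10)
);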
// Stateful word count
JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(
(values, state) -> {
int sum = values.stream().mapToInt(Integer::intValue).sum();
return Optional.of(sum + state.orElse(0));
}
);
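// mapWithState (a sketch): per-key running count with in-place State updates;
// needs org.apache.spark.streaming.State/StateSpec and a checkpoint directory
Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
    (word, one, state) -> {
        int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
        state.update(sum);
        return new Tuple2<>(word, sum);
    };
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateCounts =
    pairs.mapWithState(StateSpec.function(mappingFunc));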
// Join operations
JavaPairDStream<String, Double> scores = ...; // another pair stream (elided)
JavaPairDStream<String, Tuple2<Integer, Double>> joined = wordCounts.join(scores);
// Output
wordCounts.foreachRDD(rdd -> {
Map<String, Integer> wordCountMap = rdd.collectAsMap();
wordCountMap.forEach((word, count) -> {
System.out.println(word + ": " + count);
});
});
Functional interfaces for lambda expressions and method references.
// Basic function interfaces
@FunctionalInterface
public interface Function<T, R> extends Serializable {
R call(T t) throws Exception;
}
@FunctionalInterface
public interface Function2<T1, T2, R> extends Serializable {
R call(T1 t1, T2 t2) throws Exception;
}
@FunctionalInterface
public interface VoidFunction<T> extends Serializable {
void call(T t) throws Exception;
}
@FunctionalInterface
public interface VoidFunction2<T1, T2> extends Serializable {
void call(T1 t1, T2 t2) throws Exception;
}
@FunctionalInterface
public interface FlatMapFunction<T, R> extends Serializable {
Iterator<R> call(T t) throws Exception;
}
@FunctionalInterface
public interface PairFunction<T, K, V> extends Serializable {
Tuple2<K, V> call(T t) throws Exception;
}
@FunctionalInterface
public interface Function3<T1, T2, T3, R> extends Serializable {
R call(T1 t1, T2 t2, T3 t3) throws Exception;
}
@FunctionalInterface
public interface FlatMapFunction2<T1, T2, R> extends Serializable {
Iterator<R> call(T1 t1, T2 t2) throws Exception;
}
@FunctionalInterface
public interface PairFlatMapFunction<T, K, V> extends Serializable {
Iterator<Tuple2<K, V>> call(T t) throws Exception;
}
@FunctionalInterface
public interface DoubleFlatMapFunction<T> extends Serializable {
Iterator<Double> call(T t) throws Exception;
}
@FunctionalInterface
public interface DoubleFunction<T> extends Serializable {
double call(T t) throws Exception;
}
Usage Examples:
// Lambda expressions
JavaDStream<String> filtered = lines.filter(line -> line.length() > 0);
JavaDStream<Integer> lengths = lines.map(String::length);
JavaPairDStream<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
// Method references
JavaDStream<String> upper = lines.map(String::toUpperCase);
JavaDStream<Integer> totalCount = pairs.map(Tuple2::_2).reduce(Integer::sum);
// Anonymous inner classes
JavaDStream<String[]> splitLines = lines.map(new Function<String, String[]>() {
@Override
public String[] call(String line) {
return line.split("\\s+");
}
});
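// PairFlatMapFunction via flatMapToPair (a sketch): emit one (word, 1) pair
// per whitespace-separated token (needs java.util.List/ArrayList)
JavaPairDStream<String, Integer> tokenPairs = lines.flatMapToPair(line -> {
    List<Tuple2<String, Integer>> out = new ArrayList<>();
    for (String w : line.split("\\s+")) {
        out.add(new Tuple2<>(w, 1));
    }
    return out.iterator();
});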