tessl install tessl/maven-org-apache-spark--spark-streaming_2-11@1.6.0

Apache Spark Streaming library for processing live data streams with fault tolerance and high throughput.
Spark Streaming provides a complete Java API with type-safe wrappers over the Scala implementation, so Java developers can use the full feature set. The API follows Java conventions and exposes lambda-friendly functional interfaces.
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;

// From SparkConf
JavaStreamingContext(SparkConf conf, Duration batchDuration)
// From JavaSparkContext
JavaStreamingContext(JavaSparkContext sparkContext, Duration batchDuration)
// From master and app name
JavaStreamingContext(String master, String appName, Duration batchDuration)
JavaStreamingContext(String master, String appName, Duration batchDuration,
String sparkHome, String[] jars, Map<String, String> environment)
// From checkpoint
JavaStreamingContext(String checkpointPath)
JavaStreamingContext(String checkpointPath, Configuration hadoopConf)

void start()
void stop()
void stop(boolean stopSparkContext)
void stop(boolean stopSparkContext, boolean stopGracefully)
void awaitTermination()
void awaitTermination(long timeout)
boolean awaitTerminationOrTimeout(long timeout)
void checkpoint(String directory)
void remember(Duration duration)
StreamingContextState getState()
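
A minimal lifecycle sketch; the checkpoint path and the 60-second timeout are illustrative values, and the context needs at least one output operation before start():

SparkConf conf = new SparkConf().setAppName("Lifecycle").setMaster("local[2]");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
jssc.checkpoint("/tmp/checkpoint");  // assumed path; enables metadata checkpointing
// ... define input streams and output operations here ...
jssc.start();
// Run for up to 60 seconds, then shut down gracefully, stopping the SparkContext too
if (!jssc.awaitTerminationOrTimeout(60000)) {
    jssc.stop(true, true);
}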

// Socket streams
JavaReceiverInputDStream<String> socketTextStream(String hostname, int port)
JavaReceiverInputDStream<String> socketTextStream(String hostname, int port, StorageLevel storageLevel)
<T> JavaReceiverInputDStream<T> socketStream(String hostname, int port,
Function<InputStream, Iterable<T>> converter, StorageLevel storageLevel)
<T> JavaReceiverInputDStream<T> rawSocketStream(String hostname, int port)
// File streams
JavaDStream<String> textFileStream(String directory)
JavaDStream<byte[]> binaryRecordsStream(String directory, int recordLength)
<K, V, F extends NewInputFormat<K, V>> JavaPairInputDStream<K, V> fileStream(
    String directory, Class<K> kClass, Class<V> vClass, Class<F> fClass)
<K, V, F extends NewInputFormat<K, V>> JavaPairInputDStream<K, V> fileStream(
    String directory, Class<K> kClass, Class<V> vClass, Class<F> fClass,
    Function<Path, Boolean> filter, boolean newFilesOnly)
// Queue and receiver streams
<T> JavaInputDStream<T> queueStream(Queue<JavaRDD<T>> queue)
<T> JavaInputDStream<T> queueStream(Queue<JavaRDD<T>> queue, boolean oneAtATime)
<T> JavaInputDStream<T> queueStream(Queue<JavaRDD<T>> queue, boolean oneAtATime, JavaRDD<T> defaultRDD)
<T> JavaReceiverInputDStream<T> receiverStream(Receiver<T> receiver)
<T> JavaReceiverInputDStream<T> actorStream(Props props, String name)
<T> JavaReceiverInputDStream<T> actorStream(Props props, String name, StorageLevel storageLevel)
// Utility
<T> JavaDStream<T> union(JavaDStream<T> first, List<JavaDStream<T>> rest)
void addStreamingListener(StreamingListener streamingListener)
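
For local testing, a queue-backed stream lets you feed pre-built RDDs through the pipeline. A sketch, assuming `jssc` is an existing JavaStreamingContext and the queue contents are arbitrary sample data:

Queue<JavaRDD<Integer>> rddQueue = new LinkedList<>();
for (int i = 0; i < 3; i++) {
    // Each queued RDD is consumed as one batch (one per interval by default)
    rddQueue.add(jssc.sparkContext().parallelize(Arrays.asList(1, 2, 3, 4, 5)));
}
JavaInputDStream<Integer> testStream = jssc.queueStream(rddQueue);
testStream.print();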

static JavaStreamingContext getOrCreate(String checkpointPath,
    Function0<JavaStreamingContext> creatingFunc)
static JavaStreamingContext getOrCreate(String checkpointPath,
Function0<JavaStreamingContext> creatingFunc, Configuration hadoopConf)
static JavaStreamingContext getOrCreate(String checkpointPath,
    Function0<JavaStreamingContext> creatingFunc, Configuration hadoopConf, boolean createOnError)
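
A driver-recovery sketch, assuming `conf` is an existing SparkConf and the checkpoint path is illustrative; the factory function runs only when no checkpoint exists, otherwise the context is rebuilt from the checkpoint:

JavaStreamingContext context = JavaStreamingContext.getOrCreate("/tmp/checkpoint", () -> {
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
    jssc.checkpoint("/tmp/checkpoint");
    // ... define streams and output operations here ...
    return jssc;
});
context.start();
context.awaitTermination();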

<R> JavaDStream<R> map(Function<T, R> f)
<R> JavaDStream<R> flatMap(FlatMapFunction<T, R> f)
JavaDStream<T> filter(Function<T, Boolean> f)
JavaDStream<List<T>> glom()
JavaDStream<T> repartition(int numPartitions)
<R> JavaDStream<R> mapPartitions(FlatMapFunction<Iterator<T>, R> f)
JavaDStream<T> union(JavaDStream<T> other)
<R> JavaDStream<R> transform(Function<JavaRDD<T>, JavaRDD<R>> transformFunc)
<R> JavaDStream<R> transform(Function2<JavaRDD<T>, Time, JavaRDD<R>> transformFunc)
<U, R> JavaDStream<R> transformWith(JavaDStream<U> other,
    Function2<JavaRDD<T>, JavaRDD<U>, JavaRDD<R>> transformFunc)
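
transform drops down to arbitrary RDD operations once per batch. A sketch, assuming `lines` is an existing JavaDStream<String>:

// Apply RDD-level operations (filter + distinct) to every batch
JavaDStream<String> cleaned = lines.transform(rdd ->
    rdd.filter(line -> !line.isEmpty()).distinct());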

JavaDStream<T> reduce(Function2<T, T, T> f)
JavaDStream<Long> count()
JavaPairDStream<T, Long> countByValue()

JavaDStream<T> window(Duration windowDuration)
JavaDStream<T> window(Duration windowDuration, Duration slideDuration)
JavaDStream<T> reduceByWindow(Function2<T, T, T> reduceFunc,
Duration windowDuration, Duration slideDuration)
JavaDStream<Long> countByWindow(Duration windowDuration, Duration slideDuration)
JavaPairDStream<T, Long> countByValueAndWindow(Duration windowDuration, Duration slideDuration)
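
A windowed-count sketch, assuming `lines` is an existing JavaDStream<String>; countByWindow is computed incrementally and therefore requires checkpointing to be enabled:

// Event count over the last minute, refreshed every 10 seconds
JavaDStream<Long> perMinute = lines.countByWindow(Durations.minutes(1), Durations.seconds(10));
perMinute.print();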

void print()
void print(int num)
void saveAsTextFiles(String prefix, String suffix)
void saveAsObjectFiles(String prefix, String suffix)
void foreachRDD(VoidFunction<JavaRDD<T>> foreachFunc)
void foreachRDD(VoidFunction2<JavaRDD<T>, Time> foreachFunc)

JavaDStream<T> cache()
JavaDStream<T> persist()
JavaDStream<T> persist(StorageLevel storageLevel)
JavaDStream<T> checkpoint(Duration interval)
<K, V> JavaPairDStream<K, V> mapToPair(PairFunction<T, K, V> f)
List<JavaRDD<T>> slice(Time fromTime, Time toTime)

JavaPairDStream<K, Iterable<V>> groupByKey()
JavaPairDStream<K, Iterable<V>> groupByKey(int numPartitions)
JavaPairDStream<K, Iterable<V>> groupByKey(Partitioner partitioner)
JavaPairDStream<K, V> reduceByKey(Function2<V, V, V> func)
JavaPairDStream<K, V> reduceByKey(Function2<V, V, V> func, int numPartitions)
JavaPairDStream<K, V> reduceByKey(Function2<V, V, V> func, Partitioner partitioner)
<C> JavaPairDStream<K, C> combineByKey(Function<V, C> createCombiner,
    Function2<C, V, C> mergeValue, Function2<C, C, C> mergeCombiners, Partitioner partitioner)
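
A per-key average sketch with combineByKey, assuming `pairs` is an existing JavaPairDStream<String, Integer>; the combiner is a hypothetical (sum, count) tuple:

JavaPairDStream<String, Tuple2<Integer, Integer>> sumCounts = pairs.combineByKey(
    v -> new Tuple2<>(v, 1),                                  // createCombiner
    (acc, v) -> new Tuple2<>(acc._1() + v, acc._2() + 1),     // mergeValue
    (a, b) -> new Tuple2<>(a._1() + b._1(), a._2() + b._2()), // mergeCombiners
    new HashPartitioner(2));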

<W> JavaPairDStream<K, Tuple2<V, W>> join(JavaPairDStream<K, W> other)
<W> JavaPairDStream<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairDStream<K, W> other)
<W> JavaPairDStream<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairDStream<K, W> other)
<W> JavaPairDStream<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairDStream<K, W> other)
<W> JavaPairDStream<K, Tuple2<V, W>> join(JavaPairDStream<K, W> other, int numPartitions)
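
Joins pair up the RDDs of the two streams batch-by-batch. A sketch, with hypothetical `clickStream` and `impressionStream` assumed to be JavaPairDStream<String, Integer> keyed the same way:

JavaPairDStream<String, Tuple2<Integer, Integer>> joined = clickStream.join(impressionStream);
joined.print();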

JavaPairDStream<K, Iterable<V>> groupByKeyAndWindow(Duration windowDuration)
JavaPairDStream<K, Iterable<V>> groupByKeyAndWindow(Duration windowDuration, Duration slideDuration)
JavaPairDStream<K, V> reduceByKeyAndWindow(Function2<V, V, V> func,
Duration windowDuration, Duration slideDuration)
JavaPairDStream<K, V> reduceByKeyAndWindow(Function2<V, V, V> reduceFunc,
Function2<V, V, V> invReduceFunc, Duration windowDuration, Duration slideDuration)
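
The overload with an inverse function updates each window incrementally from the previous one instead of recomputing it, and requires checkpointing. A sketch, assuming `jssc` and `wordPairs` as in the examples below:

jssc.checkpoint("/tmp/checkpoint");  // required by the inverse-function variant
JavaPairDStream<String, Integer> counts = wordPairs.reduceByKeyAndWindow(
    (a, b) -> a + b,  // add values entering the window
    (a, b) -> a - b,  // subtract values leaving the window
    Durations.seconds(30), Durations.seconds(10));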

<S> JavaPairDStream<K, S> updateStateByKey(Function2<List<V>, Optional<S>, Optional<S>> updateFunc)
<S> JavaPairDStream<K, S> updateStateByKey(Function2<List<V>, Optional<S>, Optional<S>> updateFunc,
Partitioner partitioner)
<StateType, MappedType> JavaMapWithStateDStream<K, V, StateType, MappedType> mapWithState(
    StateSpec<K, V, StateType, MappedType> spec)
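
mapWithState, new in 1.6, is the more efficient successor to updateStateByKey. A running-count sketch, assuming `wordPairs` is an existing JavaPairDStream<String, Integer>; note that Optional here is Guava's in Spark 1.6:

// Mapping function: (word, new value, state) -> emitted record
Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
    (word, one, state) -> {
        int sum = one.or(0) + (state.exists() ? state.get() : 0);
        Tuple2<String, Integer> output = new Tuple2<>(word, sum);
        state.update(sum);
        return output;
    };
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
    wordPairs.mapWithState(StateSpec.function(mappingFunc));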

void saveAsHadoopFiles(String prefix, String suffix, Class<?> keyClass, Class<?> valueClass,
    Class<? extends OutputFormat> outputFormatClass)
<U> JavaPairDStream<K, U> mapValues(Function<V, U> f)
<U> JavaPairDStream<K, U> flatMapValues(Function<V, Iterable<U>> f)
void print()
void print(int num)

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.util.Arrays;
SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local[2]");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
wordCounts.print();
jssc.start();
jssc.awaitTermination();

JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
jssc.checkpoint("checkpoint");
JavaDStream<String> lines = jssc.textFileStream("hdfs://data/input");
JavaPairDStream<String, Integer> wordPairs = lines
.flatMap(line -> Arrays.asList(line.split(" ")).iterator())
.mapToPair(word -> new Tuple2<>(word, 1));
// Maintain running totals
JavaPairDStream<String, Integer> runningCounts = wordPairs.updateStateByKey(
(values, state) -> {
int sum = values.stream().mapToInt(Integer::intValue).sum();
        return Optional.of(state.or(0) + sum); // Guava Optional in Spark 1.6: or(), not orElse()
}
);
runningCounts.print();
jssc.start();
jssc.awaitTermination();

JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
JavaPairDStream<String, Integer> wordPairs = lines
.flatMap(line -> Arrays.asList(line.split(" ")).iterator())
.mapToPair(word -> new Tuple2<>(word, 1));
// Count words over last 30 seconds, updated every 10 seconds
JavaPairDStream<String, Integer> windowedWordCounts = wordPairs
.reduceByKeyAndWindow((i1, i2) -> i1 + i2, Durations.seconds(30), Durations.seconds(10));
windowedWordCounts.print();

wordCounts.foreachRDD((rdd, time) -> {
    System.out.println("Processing batch at time: " + time);
    rdd.foreachPartition(partition -> {
        // One connection and one prepared statement per partition, not per record
        Connection connection = DriverManager.getConnection("jdbc:mysql://localhost/db");
        PreparedStatement stmt = connection.prepareStatement(
            "INSERT INTO word_counts (word, count, timestamp) VALUES (?, ?, ?)");
        while (partition.hasNext()) {
            Tuple2<String, Integer> record = partition.next();
            stmt.setString(1, record._1());
            stmt.setInt(2, record._2());
            stmt.setTimestamp(3, new Timestamp(time.milliseconds()));
            stmt.executeUpdate();
        }
        stmt.close();
        connection.close();
    });
});
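
Opening the connection inside foreachPartition rather than outside is deliberate: the function passed to foreachRDD runs on the driver, while the code inside foreachPartition runs on the executors, so a connection created at the driver would have to be serialized and shipped (and typically is not serializable). One connection per partition amortizes setup cost across all records in that partition; a static connection pool on each executor would reduce it further.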