Spark Streaming module that enables scalable, high-throughput, fault-tolerant stream processing of live data streams; this page documents the Java-friendly API
Complete Java-friendly wrappers for Apache Spark Streaming, providing full feature parity with the Scala APIs through Java Function interfaces.
Create JavaStreamingContext with configuration:
public JavaStreamingContext(SparkConf conf, Duration batchDuration)
public JavaStreamingContext(String master, String appName, Duration batchDuration)
public JavaStreamingContext(String master, String appName, Duration batchDuration, String sparkHome, String jarFile)
public JavaStreamingContext(String master, String appName, Duration batchDuration, String sparkHome, String[] jars)
public JavaStreamingContext(String master, String appName, Duration batchDuration, String sparkHome, String[] jars, Map<String, String> environment)

Create with existing JavaSparkContext:
public JavaStreamingContext(JavaSparkContext sparkContext, Duration batchDuration)

Create from checkpoint:
public JavaStreamingContext(String path)

Example context creation:
SparkConf conf = new SparkConf()
.setAppName("JavaWordCount")
.setMaster("local[2]");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

Start and stop operations:
public void start()
public void stop()
public void stop(boolean stopSparkContext)
public void stop(boolean stopSparkContext, boolean stopGracefully)
public void awaitTermination() throws InterruptedException
public boolean awaitTerminationOrTimeout(long timeout) throws InterruptedException
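
Example lifecycle management (a minimal sketch; the 30-second timeout is illustrative):
jssc.start();
// Block until the job terminates or 30 seconds elapse, whichever comes first
boolean terminated = jssc.awaitTerminationOrTimeout(30000);
if (!terminated) {
    // Stop the streaming context and the underlying SparkContext,
    // letting in-flight batches finish first
    jssc.stop(true, true);
}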

Context state and configuration:
public StreamingContextState getState()
public JavaSparkContext sparkContext()
public void checkpoint(String directory)
public void remember(Duration duration)
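
Example state and configuration (a sketch; the checkpoint directory is hypothetical):
jssc.checkpoint("hdfs:///checkpoints/app"); // required for stateful operations
jssc.remember(Durations.minutes(5));        // retain generated RDDs for 5 minutes
if (jssc.getState() == StreamingContextState.INITIALIZED) {
    jssc.start();
}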

Text socket stream:
public JavaReceiverInputDStream<String> socketTextStream(String hostname, int port)
public JavaReceiverInputDStream<String> socketTextStream(String hostname, int port, StorageLevel storageLevel)

Custom socket stream:
public <T> JavaReceiverInputDStream<T> socketStream(
String hostname,
int port,
Function<InputStream, Iterable<T>> converter,
StorageLevel storageLevel
)

Example socket streams:
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
// Custom converter
JavaReceiverInputDStream<Integer> numbers = jssc.socketStream(
"localhost", 8080,
inputStream -> {
List<Integer> result = new ArrayList<>();
// Custom parsing logic
return result;
},
StorageLevel.MEMORY_AND_DISK_SER()
);

Text file stream:
public JavaDStream<String> textFileStream(String directory)

Generic file stream:
public <K, V, F extends NewInputFormat<K, V>> JavaPairInputDStream<K, V> fileStream(
String directory,
Class<K> kClass,
Class<V> vClass,
Class<F> fClass
)
public <K, V, F extends NewInputFormat<K, V>> JavaPairInputDStream<K, V> fileStream(
String directory,
Class<K> kClass,
Class<V> vClass,
Class<F> fClass,
Function<Path, Boolean> filter,
boolean newFilesOnly
)

Binary records stream:
public JavaDStream<byte[]> binaryRecordsStream(String directory, int recordLength)

Example file streams:
JavaDStream<String> fileStream = jssc.textFileStream("/data/input");
// Hadoop file stream
JavaPairInputDStream<LongWritable, Text> hadoopStream = jssc.fileStream(
"/data/input",
LongWritable.class,
Text.class,
TextInputFormat.class
);
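
The filter overload and binaryRecordsStream follow the same pattern (a sketch; the paths and record length are hypothetical):
// Only process new, non-hidden files
JavaPairInputDStream<LongWritable, Text> filtered = jssc.fileStream(
    "/data/input",
    LongWritable.class,
    Text.class,
    TextInputFormat.class,
    path -> !path.getName().startsWith("."),
    true
);
// Fixed-length binary records, 64 bytes each
JavaDStream<byte[]> records = jssc.binaryRecordsStream("/data/binary", 64);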

Queue stream:
public <T> JavaDStream<T> queueStream(Queue<JavaRDD<T>> queue)
public <T> JavaDStream<T> queueStream(Queue<JavaRDD<T>> queue, boolean oneAtATime)
public <T> JavaDStream<T> queueStream(Queue<JavaRDD<T>> queue, boolean oneAtATime, JavaRDD<T> defaultRDD)

Receiver stream:
public <T> JavaReceiverInputDStream<T> receiverStream(Receiver<T> receiver)

Example queue stream:
Queue<JavaRDD<String>> rddQueue = new LinkedList<>();
JavaDStream<String> queueStream = jssc.queueStream(rddQueue);
// Add RDDs to queue
rddQueue.add(jssc.sparkContext().parallelize(Arrays.asList("hello", "world")));
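
Example receiver stream (a minimal sketch of a custom Receiver subclass; the generated data is simulated):
class TimestampReceiver extends Receiver<String> {
    TimestampReceiver() { super(StorageLevel.MEMORY_ONLY()); }

    @Override
    public void onStart() {
        // Push data into Spark on a background thread until the receiver is stopped
        new Thread(() -> {
            while (!isStopped()) {
                store("tick-" + System.currentTimeMillis());
            }
        }).start();
    }

    @Override
    public void onStop() { /* the loop above exits once isStopped() is true */ }
}
JavaReceiverInputDStream<String> custom = jssc.receiverStream(new TimestampReceiver());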

Map operations:
public <R> JavaDStream<R> map(Function<T, R> f)
public <R> JavaDStream<R> mapPartitions(FlatMapFunction<Iterator<T>, R> f)
public <R> JavaDStream<R> flatMap(FlatMapFunction<T, R> f)

Filter and utility operations:
public JavaDStream<T> filter(Function<T, Boolean> f)
public JavaDStream<List<T>> glom()
public JavaDStream<T> cache()
public JavaDStream<T> persist(StorageLevel level)
public JavaDStream<T> repartition(int numPartitions)

Example basic transformations:
JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
JavaDStream<Integer> lengths = lines.map(String::length);
JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
JavaDStream<String> nonEmpty = lines.filter(line -> !line.isEmpty());

Reduce and count operations:
public JavaDStream<T> reduce(Function2<T, T, T> f)
public JavaDStream<Long> count()
public JavaPairDStream<T, Long> countByValue()
public JavaPairDStream<T, Long> countByValue(int numPartitions)

Example aggregations:
JavaDStream<Integer> numbers = lines.map(Integer::parseInt);
JavaDStream<Integer> sum = numbers.reduce(Integer::sum);
JavaDStream<Long> count = numbers.count();
JavaPairDStream<Integer, Long> histogram = numbers.countByValue();

Basic windowing:
public JavaDStream<T> window(Duration windowDuration)
public JavaDStream<T> window(Duration windowDuration, Duration slideDuration)

Windowed reductions:
public JavaDStream<T> reduceByWindow(Function2<T, T, T> reduceFunc, Duration windowDuration, Duration slideDuration)
public JavaDStream<T> reduceByWindow(Function2<T, T, T> reduceFunc, Function2<T, T, T> invReduceFunc, Duration windowDuration, Duration slideDuration)
public JavaDStream<Long> countByWindow(Duration windowDuration, Duration slideDuration)

Example windowing:
JavaDStream<String> windowedLines = lines.window(Durations.seconds(30), Durations.seconds(10));
JavaDStream<Integer> windowSum = numbers.reduceByWindow(
Integer::sum,
Durations.minutes(1),
Durations.seconds(10)
);
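
countByWindow and the inverse-reduce overload compute each window incrementally and require checkpointing to be enabled (a sketch; the intervals are illustrative):
jssc.checkpoint("checkpoint");
JavaDStream<Long> lineCounts = lines.countByWindow(Durations.seconds(30), Durations.seconds(10));
JavaDStream<Integer> incrementalSum = numbers.reduceByWindow(
    Integer::sum,    // add values entering the window
    (a, b) -> a - b, // subtract values leaving the window
    Durations.minutes(1),
    Durations.seconds(10)
);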

RDD-level transformations:
public <R> JavaDStream<R> transform(Function<JavaRDD<T>, JavaRDD<R>> transformFunc)
public <R> JavaDStream<R> transform(Function2<JavaRDD<T>, Time, JavaRDD<R>> transformFunc)
public <U, R> JavaDStream<R> transformWith(JavaDStream<U> other, Function3<JavaRDD<T>, JavaRDD<U>, Time, JavaRDD<R>> transformFunc)

Example transforms:
JavaDStream<String> processed = lines.transform(rdd -> {
return rdd.filter(line -> !line.isEmpty())
.map(String::toUpperCase);
});
JavaDStream<String> timestamped = lines.transform((rdd, time) -> {
return rdd.map(line -> time.milliseconds() + ": " + line);
});
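
transformWith combines each batch with the corresponding batch of another stream (a sketch assuming a second stream otherLines with the same element type):
JavaDStream<String> merged = lines.transformWith(
    otherLines,
    (rdd1, rdd2, time) -> rdd1.union(rdd2)
);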

Create pair DStream:
public <K2, V2> JavaPairDStream<K2, V2> mapToPair(PairFunction<T, K2, V2> f)
public <K2, V2> JavaPairDStream<K2, V2> flatMapToPair(PairFlatMapFunction<T, K2, V2> f)

Example pair creation:
JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
JavaPairDStream<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));

Value transformations:
public <W> JavaPairDStream<K, W> mapValues(Function<V, W> f) // On JavaPairDStream<K, V>
public <W> JavaPairDStream<K, W> flatMapValues(Function<V, Iterable<W>> f)

Grouping and reduction:
public JavaPairDStream<K, Iterable<V>> groupByKey()
public JavaPairDStream<K, Iterable<V>> groupByKey(int numPartitions)
public JavaPairDStream<K, Iterable<V>> groupByKey(Partitioner partitioner)
public JavaPairDStream<K, V> reduceByKey(Function2<V, V, V> func)
public JavaPairDStream<K, V> reduceByKey(Function2<V, V, V> func, int numPartitions)

Example key-value operations:
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(Integer::sum);
JavaPairDStream<String, String> upperValues = pairs.mapValues(String::valueOf).mapValues(String::toUpperCase);

Windowed grouping and reduction:
public JavaPairDStream<K, Iterable<V>> groupByKeyAndWindow(Duration windowDuration)
public JavaPairDStream<K, Iterable<V>> groupByKeyAndWindow(Duration windowDuration, Duration slideDuration)
public JavaPairDStream<K, V> reduceByKeyAndWindow(Function2<V, V, V> func, Duration windowDuration)
public JavaPairDStream<K, V> reduceByKeyAndWindow(Function2<V, V, V> func, Duration windowDuration, Duration slideDuration)
public JavaPairDStream<K, V> reduceByKeyAndWindow(Function2<V, V, V> reduceFunc, Function2<V, V, V> invReduceFunc, Duration windowDuration, Duration slideDuration)

Example windowed operations:
JavaPairDStream<String, Integer> windowedCounts = pairs.reduceByKeyAndWindow(
Integer::sum,
Durations.minutes(5),
Durations.seconds(30)
);
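
The inverse-function overload updates each window incrementally and likewise requires checkpointing (a sketch; the intervals are illustrative):
jssc.checkpoint("checkpoint");
JavaPairDStream<String, Integer> incrementalCounts = pairs.reduceByKeyAndWindow(
    Integer::sum,    // add counts entering the window
    (a, b) -> a - b, // subtract counts leaving the window
    Durations.minutes(5),
    Durations.seconds(30)
);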

Join DStreams:
public <W> JavaPairDStream<K, Tuple2<V, W>> join(JavaPairDStream<K, W> other)
public <W> JavaPairDStream<K, Tuple2<V, W>> join(JavaPairDStream<K, W> other, int numPartitions)
public <W> JavaPairDStream<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairDStream<K, W> other)
public <W> JavaPairDStream<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairDStream<K, W> other)
public <W> JavaPairDStream<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairDStream<K, W> other)

Example joins:
JavaPairDStream<String, String> stream1 = lines1.mapToPair(line -> new Tuple2<>(line.split(",")[0], line.split(",")[1]));
JavaPairDStream<String, String> stream2 = lines2.mapToPair(line -> new Tuple2<>(line.split(",")[0], line.split(",")[1]));
JavaPairDStream<String, Tuple2<String, String>> joined = stream1.join(stream2);

Update state by key:
public <S> JavaPairDStream<K, S> updateStateByKey(Function2<List<V>, Optional<S>, Optional<S>> updateFunc) // On JavaPairDStream<K, V>
public <S> JavaPairDStream<K, S> updateStateByKey(Function2<List<V>, Optional<S>, Optional<S>> updateFunc, int numPartitions)
public <S> JavaPairDStream<K, S> updateStateByKey(Function2<List<V>, Optional<S>, Optional<S>> updateFunc, Partitioner partitioner, JavaPairRDD<K, S> initialRDD)

Example updateStateByKey:
JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(
(values, state) -> {
int currentCount = values.stream().mapToInt(Integer::intValue).sum();
int newCount = state.orElse(0) + currentCount;
return Optional.of(newCount);
}
);

State specification and mapping:
// StateSpec factory methods for Java
public static <K, V, S, T> StateSpec<K, V, S, T> function(Function3<K, Optional<V>, State<S>, T> mappingFunction)
public static <K, V, S, T> StateSpec<K, V, S, T> function(Function4<Time, K, Optional<V>, State<S>, Optional<T>> mappingFunction)

StateSpec configuration:
public StateSpec<K, V, S, T> initialState(JavaPairRDD<K, S> rdd)
public StateSpec<K, V, S, T> numPartitions(int numPartitions)
public StateSpec<K, V, S, T> partitioner(Partitioner partitioner)
public StateSpec<K, V, S, T> timeout(Duration idleDuration)

Example mapWithState:
StateSpec<String, Integer, Integer, Tuple2<String, Integer>> stateSpec =
StateSpec.function((word, one, state) -> {
int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
Tuple2<String, Integer> output = new Tuple2<>(word, sum);
state.update(sum);
return output;
});
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDStream =
pairs.mapWithState(stateSpec);
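
The configuration methods chain onto the spec (a sketch; the seed contents, partition count, and timeout are illustrative):
JavaPairRDD<String, Integer> initialState =
    jssc.sparkContext().parallelizePairs(Arrays.asList(new Tuple2<>("seed", 0)));
StateSpec<String, Integer, Integer, Tuple2<String, Integer>> configuredSpec =
    StateSpec.<String, Integer, Integer, Tuple2<String, Integer>>function(
        (word, one, state) -> {
            int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
            state.update(sum);
            return new Tuple2<>(word, sum);
        })
        .initialState(initialState)
        .numPartitions(4)
        .timeout(Durations.minutes(30)); // evict keys idle longer than 30 minutes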

Print and forEach operations:
public void print()
public void print(int num)
public void foreachRDD(VoidFunction<JavaRDD<T>> foreachFunc)
public void foreachRDD(VoidFunction2<JavaRDD<T>, Time> foreachFunc)

Save operations:
public void saveAsTextFiles(String prefix, String suffix) // On JavaDStream
public void saveAsObjectFiles(String prefix, String suffix)
public void saveAsHadoopFiles(String prefix, String suffix, Class<?> keyClass, Class<?> valueClass, Class<? extends OutputFormat> outputFormatClass) // On JavaPairDStream
public void saveAsNewAPIHadoopFiles(String prefix, String suffix, Class<?> keyClass, Class<?> valueClass, Class<? extends NewOutputFormat> outputFormatClass)

Example output operations:
wordCounts.print();
wordCounts.foreachRDD(rdd -> {
System.out.println("Batch size: " + rdd.count());
rdd.collect().forEach(System.out::println);
});
lines.saveAsTextFiles("output", "txt");
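
Hadoop-format saves work analogously on pair streams (a sketch using the new-API TextOutputFormat; the output path is hypothetical):
wordCounts.saveAsNewAPIHadoopFiles(
    "hdfs:///output/counts", // one directory per batch, named prefix-TIME.suffix
    "txt",
    String.class,
    Integer.class,
    org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.class
);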

Java streaming listener interface:
public abstract class JavaStreamingListener {
public void onStreamingStarted(StreamingListenerStreamingStarted streamingStarted) {}
public void onBatchSubmitted(StreamingListenerBatchSubmitted batchSubmitted) {}
public void onBatchStarted(StreamingListenerBatchStarted batchStarted) {}
public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {}
public void onOutputOperationStarted(StreamingListenerOutputOperationStarted outputOperationStarted) {}
public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted outputOperationCompleted) {}
public void onReceiverStarted(StreamingListenerReceiverStarted receiverStarted) {}
public void onReceiverError(StreamingListenerReceiverError receiverError) {}
public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {}
}

Add/remove listeners:
public void addStreamingListener(StreamingListener streamingListener)
public void removeStreamingListener(StreamingListener streamingListener)

Example listener:
jssc.addStreamingListener(new JavaStreamingListener() {
@Override
public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
BatchInfo info = batchCompleted.batchInfo();
System.out.println("Batch completed: " + info.batchTime() +
" Processing time: " + info.processingDelay());
}
});

Duration creation:
public class Durations {
public static Duration milliseconds(long milliseconds)
public static Duration seconds(long seconds)
public static Duration minutes(long minutes)
}

Example duration usage:
Duration batchInterval = Durations.seconds(5);
Duration windowSize = Durations.minutes(10);
Duration slideInterval = Durations.seconds(30);

Complete example:
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.Durations;
import scala.Tuple2;
import java.util.Arrays;
import org.apache.spark.api.java.Optional;
public class JavaWordCount {
public static void main(String[] args) throws InterruptedException {
SparkConf conf = new SparkConf()
.setAppName("JavaWordCount")
.setMaster("local[2]");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
jssc.checkpoint("checkpoint");
// Create input stream
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
// Transform and count words
JavaDStream<String> words = lines.flatMap(line ->
Arrays.asList(line.split(" ")).iterator());
JavaPairDStream<String, Integer> pairs = words.mapToPair(word ->
new Tuple2<>(word, 1));
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(Integer::sum);
// Running count across batches
JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(
(values, state) -> {
int currentCount = values.stream().mapToInt(Integer::intValue).sum();
int newCount = state.orElse(0) + currentCount;
return Optional.of(newCount);
}
);
wordCounts.print();
runningCounts.print();
jssc.start();
jssc.awaitTermination();
}
}

Install with Tessl CLI:
npx tessl i tessl/pypi-pyspark-streaming@2.4.3