tessl/pypi-pyspark-streaming

PySpark Streaming module that enables scalable, high-throughput, fault-tolerant stream processing of live data streams in Python

Java API

Java-friendly wrappers for Apache Spark Streaming that mirror the Scala API, using Java functional interfaces (Function, Function2, PairFunction, and so on) in place of Scala closures.

JavaStreamingContext

Creation

Create JavaStreamingContext with configuration:

public JavaStreamingContext(SparkConf conf, Duration batchDuration)
public JavaStreamingContext(String master, String appName, Duration batchDuration)
public JavaStreamingContext(String master, String appName, Duration batchDuration, String sparkHome, String jarFile)
public JavaStreamingContext(String master, String appName, Duration batchDuration, String sparkHome, String[] jars)
public JavaStreamingContext(String master, String appName, Duration batchDuration, String sparkHome, String[] jars, Map<String, String> environment)

Create with existing JavaSparkContext:

public JavaStreamingContext(JavaSparkContext sparkContext, Duration batchDuration)

Create from checkpoint:

public JavaStreamingContext(String path)

Example context creation:

SparkConf conf = new SparkConf()
    .setAppName("JavaWordCount")
    .setMaster("local[2]");

JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

Lifecycle Management

Start and stop operations:

public void start()
public void stop()
public void stop(boolean stopSparkContext)
public void stop(boolean stopSparkContext, boolean stopGracefully)
public void awaitTermination() throws InterruptedException
public boolean awaitTerminationOrTimeout(long timeout) throws InterruptedException

Context state and configuration:

public StreamingContextState getState()
public JavaSparkContext sparkContext()
public void checkpoint(String directory)
public void remember(Duration duration)

Input Sources

Socket Streams

Text socket stream:

public JavaReceiverInputDStream<String> socketTextStream(String hostname, int port)
public JavaReceiverInputDStream<String> socketTextStream(String hostname, int port, StorageLevel storageLevel)

Custom socket stream:

public <T> JavaReceiverInputDStream<T> socketStream(
    String hostname,
    int port,
    Function<InputStream, Iterable<T>> converter,
    StorageLevel storageLevel
)

Example socket streams:

JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

// Custom converter
JavaReceiverInputDStream<Integer> numbers = jssc.socketStream(
    "localhost", 8080,
    inputStream -> {
        List<Integer> result = new ArrayList<>();
        // Custom parsing logic
        return result;
    },
    StorageLevel.MEMORY_AND_DISK_SER()
);

File Streams

Text file stream:

public JavaDStream<String> textFileStream(String directory)

Generic file stream:

public <K, V, F extends NewInputFormat<K, V>> JavaPairInputDStream<K, V> fileStream(
    String directory,
    Class<K> kClass,
    Class<V> vClass,
    Class<F> fClass
)

public <K, V, F extends NewInputFormat<K, V>> JavaPairInputDStream<K, V> fileStream(
    String directory,
    Class<K> kClass,
    Class<V> vClass,
    Class<F> fClass,
    Function<Path, Boolean> filter,
    boolean newFilesOnly
)

Binary records stream:

public JavaDStream<byte[]> binaryRecordsStream(String directory, int recordLength)

Example file streams:

JavaDStream<String> fileStream = jssc.textFileStream("/data/input");

// Hadoop file stream
JavaPairInputDStream<LongWritable, Text> hadoopStream = jssc.fileStream(
    "/data/input",
    LongWritable.class,
    Text.class,
    TextInputFormat.class
);

Queue and Receiver Streams

Queue stream:

public <T> JavaDStream<T> queueStream(Queue<JavaRDD<T>> queue)
public <T> JavaDStream<T> queueStream(Queue<JavaRDD<T>> queue, boolean oneAtATime)
public <T> JavaDStream<T> queueStream(Queue<JavaRDD<T>> queue, boolean oneAtATime, JavaRDD<T> defaultRDD)

Receiver stream:

public <T> JavaReceiverInputDStream<T> receiverStream(Receiver<T> receiver)

Example queue stream:

Queue<JavaRDD<String>> rddQueue = new LinkedList<>();
JavaDStream<String> queueStream = jssc.queueStream(rddQueue);

// Add RDDs to queue
rddQueue.add(jssc.sparkContext().parallelize(Arrays.asList("hello", "world")));

JavaDStream Transformations

Basic Transformations

Map operations:

public <R> JavaDStream<R> map(Function<T, R> f)
public <R> JavaDStream<R> mapPartitions(FlatMapFunction<Iterator<T>, R> f)
public <R> JavaDStream<R> flatMap(FlatMapFunction<T, R> f)

Filter and utility operations:

public JavaDStream<T> filter(Function<T, Boolean> f)
public JavaDStream<List<T>> glom()
public JavaDStream<T> cache()
public JavaDStream<T> persist(StorageLevel level)
public JavaDStream<T> repartition(int numPartitions)

Example basic transformations:

JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);

JavaDStream<Integer> lengths = lines.map(String::length);
JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
JavaDStream<String> nonEmpty = lines.filter(line -> !line.isEmpty());

Aggregation Operations

Reduce and count operations:

public JavaDStream<T> reduce(Function2<T, T, T> f)
public JavaDStream<Long> count()
public JavaPairDStream<T, Long> countByValue()
public JavaPairDStream<T, Long> countByValue(int numPartitions)

Example aggregations:

JavaDStream<Integer> numbers = lines.map(Integer::parseInt);

JavaDStream<Integer> sum = numbers.reduce(Integer::sum);
JavaDStream<Long> count = numbers.count();
JavaPairDStream<Integer, Long> histogram = numbers.countByValue();

Window Operations

Basic windowing:

public JavaDStream<T> window(Duration windowDuration)
public JavaDStream<T> window(Duration windowDuration, Duration slideDuration)

Windowed reductions:

public JavaDStream<T> reduceByWindow(Function2<T, T, T> reduceFunc, Duration windowDuration, Duration slideDuration)
public JavaDStream<T> reduceByWindow(Function2<T, T, T> reduceFunc, Function2<T, T, T> invReduceFunc, Duration windowDuration, Duration slideDuration)
public JavaDStream<Long> countByWindow(Duration windowDuration, Duration slideDuration)

Example windowing:

JavaDStream<String> windowedLines = lines.window(Durations.seconds(30), Durations.seconds(10));
JavaDStream<Integer> windowSum = numbers.reduceByWindow(
    Integer::sum,
    Durations.minutes(1),
    Durations.seconds(10)
);
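
The four-argument `reduceByWindow` overload listed above takes an inverse reduce function. The idea behind such an overload is that a window's value can be derived from the previous window by undoing the batches that slid out and adding the batches that slid in, instead of re-reducing the whole window. The following is a plain-Java sketch of that arithmetic only, with no Spark dependency. The class `SlidingWindowSketch`, its `windowed` method, and the choice to emit only full windows are assumptions made for illustration and are not Spark's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

/** Hypothetical plain-Java model of an incrementally maintained windowed reduction. */
final class SlidingWindowSketch {

    /**
     * batchValues holds one already-reduced value per batch. Returns one value per
     * full window of windowLen batches, advancing slideLen batches at a time.
     */
    static List<Integer> windowed(List<Integer> batchValues, int windowLen, int slideLen,
                                  BinaryOperator<Integer> reduce,
                                  BinaryOperator<Integer> invReduce) {
        if (slideLen < 1 || slideLen > windowLen) {
            throw new IllegalArgumentException("this sketch requires 1 <= slideLen <= windowLen");
        }
        List<Integer> out = new ArrayList<>();
        if (batchValues.size() < windowLen) {
            return out; // not enough batches for a single full window
        }
        // First window: reduce every batch it contains.
        Integer current = batchValues.get(0);
        for (int i = 1; i < windowLen; i++) {
            current = reduce.apply(current, batchValues.get(i));
        }
        out.add(current);
        // Later windows: start from the previous result and adjust it.
        for (int start = slideLen; start + windowLen <= batchValues.size(); start += slideLen) {
            // Undo the batches that left the window.
            for (int i = start - slideLen; i < start; i++) {
                current = invReduce.apply(current, batchValues.get(i));
            }
            // Add the batches that entered the window.
            for (int i = start + windowLen - slideLen; i < start + windowLen; i++) {
                current = reduce.apply(current, batchValues.get(i));
            }
            out.add(current);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> perBatch = Arrays.asList(1, 2, 3, 4, 5, 6);
        // Window of 3 batches sliding by 1 batch; prints [6, 9, 12, 15]
        System.out.println(windowed(perBatch, 3, 1, Integer::sum, (a, b) -> a - b));
    }
}
```

An inverse function only exists for reductions that can be undone, such as addition. A reduction like `Math::max` has no inverse, so it would have to use the single-function overload.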

Transform Operations

RDD-level transformations:

public <R> JavaDStream<R> transform(Function<JavaRDD<T>, JavaRDD<R>> transformFunc)
public <R> JavaDStream<R> transform(Function2<JavaRDD<T>, Time, JavaRDD<R>> transformFunc)
public <U, R> JavaDStream<R> transformWith(JavaDStream<U> other, Function3<JavaRDD<T>, JavaRDD<U>, Time, JavaRDD<R>> transformFunc)

Example transforms:

JavaDStream<String> processed = lines.transform(rdd -> {
    return rdd.filter(line -> !line.isEmpty())
              .map(String::toUpperCase);
});

JavaDStream<String> timestamped = lines.transform((rdd, time) -> {
    return rdd.map(line -> time.milliseconds() + ": " + line);
});

JavaPairDStream Operations

Pair Creation

Create pair DStream:

public <K2, V2> JavaPairDStream<K2, V2> mapToPair(PairFunction<T, K2, V2> f)
public <K2, V2> JavaPairDStream<K2, V2> flatMapToPair(PairFlatMapFunction<T, K2, V2> f)

Example pair creation:

JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
JavaPairDStream<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));

Key-Value Transformations

Value transformations:

public <W> JavaPairDStream<K, W> mapValues(Function<V, W> f)  // On JavaPairDStream<K, V>
public <W> JavaPairDStream<K, W> flatMapValues(Function<V, Iterable<W>> f)

Grouping and reduction:

public JavaPairDStream<K, Iterable<V>> groupByKey()
public JavaPairDStream<K, Iterable<V>> groupByKey(int numPartitions)
public JavaPairDStream<K, Iterable<V>> groupByKey(Partitioner partitioner)
public JavaPairDStream<K, V> reduceByKey(Function2<V, V, V> func)
public JavaPairDStream<K, V> reduceByKey(Function2<V, V, V> func, int numPartitions)

Example key-value operations:

JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(Integer::sum);
JavaPairDStream<String, String> upperValues = pairs.mapValues(String::valueOf).mapValues(String::toUpperCase);
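
Because `reduceByKey` works on one batch at a time, the counts it produces start from zero in every batch. As a rough illustration of what the `flatMap`, `mapToPair` and `reduceByKey` chain computes for a single batch, here is a plain-Java sketch with no Spark dependency. The class `BatchWordCountSketch` and its method are hypothetical names introduced for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical plain-Java model of a per-batch word count. */
final class BatchWordCountSketch {

    /** Counts words in the lines of a single batch. */
    static Map<String, Integer> countWords(List<String> batchLines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : batchLines) {
            // Corresponds to flatMap: one line becomes many words.
            for (String word : line.split(" ")) {
                // Corresponds to mapToPair (word, 1) followed by reduceByKey(Integer::sum).
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Prints {a=2, b=2, c=1}
        System.out.println(countWords(Arrays.asList("a b a", "b c")));
    }
}
```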

Windowed Key-Value Operations

Windowed grouping and reduction:

public JavaPairDStream<K, Iterable<V>> groupByKeyAndWindow(Duration windowDuration)
public JavaPairDStream<K, Iterable<V>> groupByKeyAndWindow(Duration windowDuration, Duration slideDuration)
public JavaPairDStream<K, V> reduceByKeyAndWindow(Function2<V, V, V> func, Duration windowDuration)
public JavaPairDStream<K, V> reduceByKeyAndWindow(Function2<V, V, V> func, Duration windowDuration, Duration slideDuration)
public JavaPairDStream<K, V> reduceByKeyAndWindow(Function2<V, V, V> reduceFunc, Function2<V, V, V> invReduceFunc, Duration windowDuration, Duration slideDuration)

Example windowed operations:

JavaPairDStream<String, Integer> windowedCounts = pairs.reduceByKeyAndWindow(
    Integer::sum,
    Durations.minutes(5),
    Durations.seconds(30)
);

Join Operations

Join DStreams:

public <W> JavaPairDStream<K, Tuple2<V, W>> join(JavaPairDStream<K, W> other)
public <W> JavaPairDStream<K, Tuple2<V, W>> join(JavaPairDStream<K, W> other, int numPartitions)
public <W> JavaPairDStream<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairDStream<K, W> other)
public <W> JavaPairDStream<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairDStream<K, W> other)
public <W> JavaPairDStream<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairDStream<K, W> other)

Example joins:

// lines1 and lines2 are assumed to be JavaDStream<String> streams of "key,value" records
JavaPairDStream<String, String> stream1 = lines1.mapToPair(line -> new Tuple2<>(line.split(",")[0], line.split(",")[1]));
JavaPairDStream<String, String> stream2 = lines2.mapToPair(line -> new Tuple2<>(line.split(",")[0], line.split(",")[1]));

JavaPairDStream<String, Tuple2<String, String>> joined = stream1.join(stream2);
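
The difference between `join` and `leftOuterJoin` shows up in which keys survive and in the `Optional` on the right-hand side. The following plain-Java sketch models one batch from each stream as a map, with no Spark dependency. It is a simplification: it assumes at most one value per key per batch, whereas real pair streams may hold duplicate keys. The class `BatchJoinSketch` is hypothetical, and `java.util.Optional` stands in for the Optional type used in the signatures above.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Hypothetical plain-Java model of joining one batch from each of two keyed streams. */
final class BatchJoinSketch {

    /** Inner join: keeps only keys present on both sides. */
    static Map<String, SimpleImmutableEntry<String, String>> innerJoin(
            Map<String, String> left, Map<String, String> right) {
        Map<String, SimpleImmutableEntry<String, String>> out = new TreeMap<>();
        for (Map.Entry<String, String> e : left.entrySet()) {
            if (right.containsKey(e.getKey())) {
                out.put(e.getKey(),
                        new SimpleImmutableEntry<>(e.getValue(), right.get(e.getKey())));
            }
        }
        return out;
    }

    /** Left outer join: keeps every left key; the right value may be absent. */
    static Map<String, SimpleImmutableEntry<String, Optional<String>>> leftOuterJoin(
            Map<String, String> left, Map<String, String> right) {
        Map<String, SimpleImmutableEntry<String, Optional<String>>> out = new TreeMap<>();
        for (Map.Entry<String, String> e : left.entrySet()) {
            out.put(e.getKey(),
                    new SimpleImmutableEntry<>(e.getValue(),
                            Optional.ofNullable(right.get(e.getKey()))));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> left = new HashMap<>();
        left.put("a", "1");
        left.put("b", "2");
        Map<String, String> right = new HashMap<>();
        right.put("a", "x");
        right.put("c", "y");

        System.out.println(innerJoin(left, right).keySet());      // [a]
        System.out.println(leftOuterJoin(left, right).keySet());  // [a, b]
    }
}
```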

Stateful Operations

UpdateStateByKey

Update state by key:

public <S> JavaPairDStream<K, S> updateStateByKey(Function2<List<V>, Optional<S>, Optional<S>> updateFunc)  // On JavaPairDStream<K, V>
public <S> JavaPairDStream<K, S> updateStateByKey(Function2<List<V>, Optional<S>, Optional<S>> updateFunc, int numPartitions)
public <S> JavaPairDStream<K, S> updateStateByKey(Function2<List<V>, Optional<S>, Optional<S>> updateFunc, Partitioner partitioner, JavaRDD<Tuple2<K, S>> initialRDD)

Example updateStateByKey:

JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(
    (values, state) -> {
        int currentCount = values.stream().mapToInt(Integer::intValue).sum();
        int newCount = state.orElse(0) + currentCount;
        return Optional.of(newCount);
    }
);
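
To make the role of the update function concrete, the following plain-Java sketch replays it over two batches without any Spark dependency. It assumes, as the Spark Streaming guide describes, that the function runs for every key that has either existing state or new values, and that returning an empty Optional drops the key. The class `RunningCountSketch` and the method `applyBatch` are hypothetical, and `java.util.Optional` is used here only as a stand-in: in Spark 2.0 and later the real signatures use `org.apache.spark.api.java.Optional`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.BiFunction;

/** Hypothetical plain-Java model of per-key state updates across batches. */
final class RunningCountSketch {

    /** Same shape as the update function above: new values plus old state give new state. */
    static final BiFunction<List<Integer>, Optional<Integer>, Optional<Integer>> UPDATE =
        (values, state) -> {
            int currentCount = values.stream().mapToInt(Integer::intValue).sum();
            return Optional.of(state.orElse(0) + currentCount);
        };

    /** Applies UPDATE for one batch, mutating the state map in place. */
    static void applyBatch(Map<String, Integer> state, Map<String, List<Integer>> batch) {
        // Visit keys with existing state as well as keys that only appear in this batch.
        Set<String> keys = new HashSet<>(state.keySet());
        keys.addAll(batch.keySet());
        for (String key : keys) {
            List<Integer> values = batch.getOrDefault(key, Collections.emptyList());
            Optional<Integer> next = UPDATE.apply(values, Optional.ofNullable(state.get(key)));
            if (next.isPresent()) {
                state.put(key, next.get());
            } else {
                state.remove(key); // an empty result removes the key's state
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> state = new TreeMap<>();

        Map<String, List<Integer>> batch1 = new HashMap<>();
        batch1.put("hello", Arrays.asList(1, 1));
        batch1.put("world", Arrays.asList(1));
        applyBatch(state, batch1);

        Map<String, List<Integer>> batch2 = new HashMap<>();
        batch2.put("hello", Arrays.asList(1));
        applyBatch(state, batch2);

        System.out.println(state); // {hello=3, world=1}
    }
}
```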

MapWithState

State specification and mapping:

// StateSpec factory methods for Java
public static <K, V, S, T> StateSpec<K, V, S, T> function(Function3<K, Optional<V>, State<S>, T> mappingFunction)
public static <K, V, S, T> StateSpec<K, V, S, T> function(Function4<Time, K, Optional<V>, State<S>, Optional<T>> mappingFunction)

StateSpec configuration:

public StateSpec<K, V, S, T> initialState(JavaPairRDD<K, S> rdd)
public StateSpec<K, V, S, T> numPartitions(int numPartitions)
public StateSpec<K, V, S, T> partitioner(Partitioner partitioner)
public StateSpec<K, V, S, T> timeout(Duration idleDuration)

Example mapWithState:

StateSpec<String, Integer, Integer, Tuple2<String, Integer>> stateSpec = 
    StateSpec.function((word, one, state) -> {
        // state.getOption() returns a Scala Option, so check exists() before reading the state
        int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
        Tuple2<String, Integer> output = new Tuple2<>(word, sum);
        state.update(sum);
        return output;
    });

JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDStream = 
    pairs.mapWithState(stateSpec);

Output Operations

Basic Output

Print and foreachRDD operations:

public void print()
public void print(int num)
public void foreachRDD(VoidFunction<JavaRDD<T>> foreachFunc)
public void foreachRDD(VoidFunction2<JavaRDD<T>, Time> foreachFunc)

File Output

Save operations:

public void saveAsTextFiles(String prefix, String suffix)  // On the underlying Scala DStream, reached via JavaDStream.dstream()
public void saveAsObjectFiles(String prefix, String suffix)  // Also on the underlying Scala DStream
public void saveAsHadoopFiles(String prefix, String suffix, Class<?> keyClass, Class<?> valueClass, Class<? extends OutputFormat> outputFormatClass)  // On JavaPairDStream
public void saveAsNewAPIHadoopFiles(String prefix, String suffix, Class<?> keyClass, Class<?> valueClass, Class<? extends NewOutputFormat> outputFormatClass)

Example output operations:

wordCounts.print();

wordCounts.foreachRDD(rdd -> {
    System.out.println("Batch size: " + rdd.count());
    rdd.collect().forEach(System.out::println);
});

// JavaDStream does not expose saveAsTextFiles directly; call it on the wrapped DStream
lines.dstream().saveAsTextFiles("output", "txt");

Event Listeners

JavaStreamingListener

Java streaming listener interface:

public abstract class JavaStreamingListener {
    public void onStreamingStarted(StreamingListenerStreamingStarted streamingStarted) {}
    public void onBatchSubmitted(StreamingListenerBatchSubmitted batchSubmitted) {}
    public void onBatchStarted(StreamingListenerBatchStarted batchStarted) {}
    public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {}
    public void onOutputOperationStarted(StreamingListenerOutputOperationStarted outputOperationStarted) {}
    public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted outputOperationCompleted) {}
    public void onReceiverStarted(StreamingListenerReceiverStarted receiverStarted) {}
    public void onReceiverError(StreamingListenerReceiverError receiverError) {}
    public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {}
}

Add/remove listeners:

public void addStreamingListener(StreamingListener streamingListener)
public void removeStreamingListener(StreamingListener streamingListener)

Example listener:

jssc.addStreamingListener(new JavaStreamingListener() {
    @Override
    public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
        BatchInfo info = batchCompleted.batchInfo();
        System.out.println("Batch completed: " + info.batchTime() + 
                         " Processing time: " + info.processingDelay());
    }
});

Duration Utilities

Duration creation:

public class Durations {
    public static Duration milliseconds(long milliseconds)
    public static Duration seconds(long seconds)
    public static Duration minutes(long minutes)
}

Example duration usage:

Duration batchInterval = Durations.seconds(5);
Duration windowSize = Durations.minutes(10);
Duration slideInterval = Durations.seconds(30);
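
The Spark Streaming guide states that a window's length and slide interval must both be multiples of the batch interval of the source stream. The following plain-Java sketch checks that arithmetic using raw millisecond values, with no Spark dependency. The class `WindowCheckSketch` and the method `isValidWindow` are hypothetical helpers introduced for this example and are not part of the Spark API.

```java
/** Hypothetical helper that checks window settings against a batch interval. */
final class WindowCheckSketch {

    /** True when both window length and slide interval are positive multiples of the batch interval. */
    static boolean isValidWindow(long batchMs, long windowMs, long slideMs) {
        return batchMs > 0
            && windowMs > 0
            && slideMs > 0
            && windowMs % batchMs == 0
            && slideMs % batchMs == 0;
    }

    public static void main(String[] args) {
        long batchMs = 5_000L;     // Durations.seconds(5)
        long windowMs = 600_000L;  // Durations.minutes(10)
        long slideMs = 30_000L;    // Durations.seconds(30)

        System.out.println(isValidWindow(batchMs, windowMs, slideMs)); // true
        System.out.println(isValidWindow(batchMs, 12_000L, slideMs));  // false: 12 s is not a multiple of 5 s
    }
}
```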

Complete Java Example

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.Durations;
import scala.Tuple2;
import java.util.Arrays;
import org.apache.spark.api.java.Optional;  // Spark's Optional, which updateStateByKey expects (not java.util.Optional)

public class JavaWordCount {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf()
            .setAppName("JavaWordCount")
            .setMaster("local[2]");

        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
        jssc.checkpoint("checkpoint");

        // Create input stream
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        // Transform and count words
        JavaDStream<String> words = lines.flatMap(line -> 
            Arrays.asList(line.split(" ")).iterator());

        JavaPairDStream<String, Integer> pairs = words.mapToPair(word -> 
            new Tuple2<>(word, 1));

        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(Integer::sum);

        // Running count across batches
        JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(
            (values, state) -> {
                int currentCount = values.stream().mapToInt(Integer::intValue).sum();
                int newCount = state.orElse(0) + currentCount;
                return Optional.of(newCount);
            }
        );

        wordCounts.print();
        runningCounts.print();

        jssc.start();
        jssc.awaitTermination();
    }
}

Install with Tessl CLI

npx tessl i tessl/pypi-pyspark-streaming
