CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-apache-spark

Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R

Pending
Overview
Eval results
Files

java-api.mddocs/

Java API

Spark provides comprehensive Java APIs that mirror the Scala functionality while providing Java-friendly interfaces. The Java API includes JavaRDD, JavaPairRDD, and JavaDoubleRDD classes that offer type-safe operations for Java developers.

JavaSparkContext

The Java-friendly version of SparkContext.

JavaSparkContext Class

public class JavaSparkContext {
    // Constructors
    public JavaSparkContext()
    public JavaSparkContext(SparkConf conf)
    public JavaSparkContext(String master, String appName)
    public JavaSparkContext(String master, String appName, SparkConf conf)
    public JavaSparkContext(String master, String appName, String sparkHome, String jarFile)
    public JavaSparkContext(String master, String appName, String sparkHome, String[] jars)
    public JavaSparkContext(String master, String appName, String sparkHome, String[] jars, Map<String, String> environment)
    
    // Core properties
    public SparkContext sc()
    public String master()
    public String appName()  
    public Boolean isLocal()
    public Integer defaultParallelism()
    public Integer defaultMinPartitions()
}

Creating JavaSparkContext

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Basic creation with SparkConf
SparkConf conf = new SparkConf()
    .setAppName("Java Spark App")
    .setMaster("local[*]");

JavaSparkContext jsc = new JavaSparkContext(conf);

// Alternative constructors
JavaSparkContext jsc2 = new JavaSparkContext("local[*]", "My Java App");

// With all parameters
String[] jars = {"myapp.jar", "dependencies.jar"};
Map<String, String> env = new HashMap<>();
env.put("SPARK_ENV", "production");

JavaSparkContext jsc3 = new JavaSparkContext(
    "local[*]",           // master
    "My Java App",        // app name
    "/path/to/spark",     // spark home
    jars,                 // jar files
    env                   // environment
);

JavaRDD

Java-friendly wrapper for RDD operations.

JavaRDD Class

public class JavaRDD<T> extends AbstractJavaRDDLike<T, JavaRDD<T>> {
    // Transformations
    public <U> JavaRDD<U> map(Function<T, U> f)
    public <U> JavaRDD<U> flatMap(FlatMapFunction<T, U> f)
    public JavaRDD<T> filter(Function<T, Boolean> f)
    public JavaRDD<T> distinct()
    public JavaRDD<T> distinct(int numPartitions)
    public JavaRDD<T> sample(boolean withReplacement, double fraction)
    public JavaRDD<T> sample(boolean withReplacement, double fraction, long seed)
    public JavaRDD<T> union(JavaRDD<T> other)
    public JavaRDD<T> intersection(JavaRDD<T> other)
    public JavaRDD<T> subtract(JavaRDD<T> other)
    public <U> JavaPairRDD<T, U> cartesian(JavaRDD<U> other)
    
    // Partition operations
    public <U> JavaRDD<U> mapPartitions(FlatMapFunction<Iterator<T>, U> f)
    public <U> JavaRDD<U> mapPartitionsWithIndex(Function2<Integer, Iterator<T>, Iterator<U>> f, boolean preservesPartitioning)
    public JavaRDD<T> coalesce(int numPartitions)
    public JavaRDD<T> coalesce(int numPartitions, boolean shuffle)
    public JavaRDD<T> repartition(int numPartitions)
    
    // Actions
    public List<T> collect()
    public long count()
    public T first()
    public List<T> take(int num)
    public List<T> takeSample(boolean withReplacement, int num)
    public List<T> takeSample(boolean withReplacement, int num, long seed)
    public T reduce(Function2<T, T, T> f)
    public T fold(T zeroValue, Function2<T, T, T> func)
    public <U> U aggregate(U zeroValue, Function2<U, T, U> seqFunc, Function2<U, U, U> combFunc)
    public void foreach(VoidFunction<T> f)
    public void foreachPartition(VoidFunction<Iterator<T>> f)
    
    // Persistence
    public JavaRDD<T> cache()
    public JavaRDD<T> persist(StorageLevel newLevel)
    public JavaRDD<T> unpersist()
    public JavaRDD<T> unpersist(boolean blocking)
}

Creating and Using JavaRDD

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.FlatMapFunction;
import java.util.Arrays;
import java.util.List;

// Create JavaRDD from collection
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> javaRDD = jsc.parallelize(data);

// Map transformation
JavaRDD<Integer> doubled = javaRDD.map(new Function<Integer, Integer>() {
    public Integer call(Integer x) {
        return x * 2;
    }
});

// Using lambda expressions (Java 8+)
JavaRDD<Integer> doubled2 = javaRDD.map(x -> x * 2);

// FlatMap transformation
JavaRDD<String> lines = jsc.textFile("input.txt");
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String line) {
        return Arrays.asList(line.split(" "));
    }
});

// With lambda
JavaRDD<String> words2 = lines.flatMap(line -> Arrays.asList(line.split(" ")));

// Filter transformation
JavaRDD<Integer> evens = javaRDD.filter(new Function<Integer, Boolean>() {
    public Boolean call(Integer x) {
        return x % 2 == 0;
    }
});

// With lambda
JavaRDD<Integer> evens2 = javaRDD.filter(x -> x % 2 == 0);

Actions on JavaRDD

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> rdd = jsc.parallelize(data);

// Collect all elements
List<Integer> result = rdd.collect();

// Count elements
long count = rdd.count();

// Get first element
Integer first = rdd.first();

// Take first n elements
List<Integer> firstThree = rdd.take(3);

// Reduce with function
Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer a, Integer b) {
        return a + b;
    }
});

// With lambda
Integer sum2 = rdd.reduce((a, b) -> a + b);

// Fold with zero value
Integer foldResult = rdd.fold(0, (a, b) -> a + b);

// Aggregate with different types
class Stats implements Serializable {
    public int sum;
    public int count;
    
    public Stats(int sum, int count) {
        this.sum = sum;
        this.count = count;
    }
}

Stats stats = rdd.aggregate(
    new Stats(0, 0),  // Zero value
    new Function2<Stats, Integer, Stats>() {  // Seq function
        public Stats call(Stats s, Integer x) {
            return new Stats(s.sum + x, s.count + 1);
        }
    },
    new Function2<Stats, Stats, Stats>() {  // Combine function
        public Stats call(Stats s1, Stats s2) {
            return new Stats(s1.sum + s2.sum, s1.count + s2.count);
        }
    }
);

JavaPairRDD

Java wrapper for key-value pair RDDs.

JavaPairRDD Class

public class JavaPairRDD<K, V> extends AbstractJavaRDDLike<Tuple2<K, V>, JavaPairRDD<K, V>> {
    // Key-Value operations
    public JavaRDD<K> keys()
    public JavaRDD<V> values()
    public <U> JavaPairRDD<K, U> mapValues(Function<V, U> f)
    public <U> JavaPairRDD<K, U> flatMapValues(FlatMapFunction<V, U> f)
    
    // Aggregations
    public JavaPairRDD<K, V> reduceByKey(Function2<V, V, V> func)
    public JavaPairRDD<K, V> reduceByKey(Partitioner partitioner, Function2<V, V, V> func)
    public JavaPairRDD<K, Iterable<V>> groupByKey()
    public JavaPairRDD<K, Iterable<V>> groupByKey(Partitioner partitioner)
    public <C> JavaPairRDD<K, C> combineByKey(Function<V, C> createCombiner, Function2<C, V, C> mergeValue, Function2<C, C, C> mergeCombiners, Partitioner partitioner)
    public <U> JavaPairRDD<K, U> aggregateByKey(U zeroValue, Partitioner partitioner, Function2<U, V, U> seqFunc, Function2<U, U, U> combFunc)
    public JavaPairRDD<K, V> foldByKey(V zeroValue, Function2<V, V, V> func)
    
    // Joins
    public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other)
    public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other, Partitioner partitioner)
    public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairRDD<K, W> other)
    public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairRDD<K, W> other)
    public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairRDD<K, W> other)
    public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>> cogroup(JavaPairRDD<K, W> other)
    
    // Actions
    public Map<K, V> collectAsMap()
    public Map<K, Long> countByKey()
    public List<V> lookup(K key)
    
    // Save operations
    public void saveAsHadoopFile(String path, Class<?> keyClass, Class<?> valueClass, Class<? extends OutputFormat> outputFormatClass)
    public void saveAsNewAPIHadoopFile(String path, Class<?> keyClass, Class<?> valueClass, Class<? extends org.apache.hadoop.mapreduce.OutputFormat> outputFormatClass)
}

Creating and Using JavaPairRDD

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;

// Create JavaPairRDD from tuples
List<Tuple2<String, Integer>> pairs = Arrays.asList(
    new Tuple2<>("apple", 5),
    new Tuple2<>("banana", 3),
    new Tuple2<>("apple", 2),
    new Tuple2<>("orange", 1)
);

JavaPairRDD<String, Integer> pairRDD = jsc.parallelizePairs(pairs);

// Create from JavaRDD using mapToPair
JavaRDD<String> lines = jsc.textFile("input.txt");
JavaPairRDD<String, Integer> wordCounts = lines
    .flatMap(line -> Arrays.asList(line.split(" ")))
    .mapToPair(new PairFunction<String, String, Integer>() {
        public Tuple2<String, Integer> call(String word) {
            return new Tuple2<>(word, 1);
        }
    });

// With lambda
JavaPairRDD<String, Integer> wordCounts2 = lines
    .flatMap(line -> Arrays.asList(line.split(" ")))
    .mapToPair(word -> new Tuple2<>(word, 1));

Key-Value Transformations

JavaPairRDD<String, Integer> pairs = jsc.parallelizePairs(Arrays.asList(
    new Tuple2<>("a", 1),
    new Tuple2<>("b", 2),
    new Tuple2<>("a", 3)
));

// Get keys and values
JavaRDD<String> keys = pairs.keys();
JavaRDD<Integer> values = pairs.values();

// Transform values while preserving keys
JavaPairRDD<String, Integer> doubled = pairs.mapValues(x -> x * 2);

// FlatMap values
JavaPairRDD<String, Character> chars = pairs.flatMapValues(
    value -> Arrays.asList(value.toString().toCharArray())
);

// Reduce by key
JavaPairRDD<String, Integer> sums = pairs.reduceByKey((a, b) -> a + b);

// Group by key
JavaPairRDD<String, Iterable<Integer>> grouped = pairs.groupByKey();

// Aggregate by key
JavaPairRDD<String, Integer> aggregated = pairs.aggregateByKey(
    0,                           // Zero value
    (acc, value) -> acc + value, // Seq function
    (acc1, acc2) -> acc1 + acc2  // Combine function
);

Join Operations

JavaPairRDD<String, String> names = jsc.parallelizePairs(Arrays.asList(
    new Tuple2<>("1", "Alice"),
    new Tuple2<>("2", "Bob"),
    new Tuple2<>("3", "Charlie")
));

JavaPairRDD<String, Integer> ages = jsc.parallelizePairs(Arrays.asList(
    new Tuple2<>("1", 25),
    new Tuple2<>("2", 30),
    new Tuple2<>("4", 35)
));

// Inner join
JavaPairRDD<String, Tuple2<String, Integer>> joined = names.join(ages);
// Result: [("1", ("Alice", 25)), ("2", ("Bob", 30))]

// Left outer join
JavaPairRDD<String, Tuple2<String, Optional<Integer>>> leftJoined = names.leftOuterJoin(ages);
// Result: [("1", ("Alice", Some(25))), ("2", ("Bob", Some(30))), ("3", ("Charlie", None))]

// Full outer join
JavaPairRDD<String, Tuple2<Optional<String>, Optional<Integer>>> fullJoined = names.fullOuterJoin(ages);

// Cogroup
JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<Integer>>> cogrouped = names.cogroup(ages);

Actions on JavaPairRDD

JavaPairRDD<String, Integer> pairs = jsc.parallelizePairs(Arrays.asList(
    new Tuple2<>("apple", 5),
    new Tuple2<>("banana", 3),
    new Tuple2<>("apple", 2)
));

// Collect as Map (assumes unique keys)
Map<String, Integer> map = pairs.collectAsMap();

// Count by key
Map<String, Long> counts = pairs.countByKey();

// Lookup values for a key
List<Integer> appleValues = pairs.lookup("apple");  // [5, 2]

// Count all elements
long totalCount = pairs.count();

JavaDoubleRDD

Specialized RDD for double values with statistical operations.

JavaDoubleRDD Class

public class JavaDoubleRDD extends AbstractJavaRDDLike<Double, JavaDoubleRDD> {
    // Statistical operations
    public double mean()
    public double sum()
    public StatCounter stats()
    public double variance()
    public double sampleVariance() 
    public double stdev()
    public double sampleStdev()
    public long[] histogram(double[] buckets)
    public Tuple2<double[], long[]> histogram(int bucketCount)
}

Using JavaDoubleRDD

import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.util.StatCounter;

// Create JavaDoubleRDD
List<Double> numbers = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0);
JavaDoubleRDD doubleRDD = jsc.parallelizeDoubles(numbers);

// Convert from JavaRDD<Double>
JavaRDD<Double> rdd = jsc.parallelize(numbers);
JavaDoubleRDD doubleRDD2 = rdd.mapToDouble(x -> x);

// Statistical operations
double mean = doubleRDD.mean();
double sum = doubleRDD.sum();
double variance = doubleRDD.variance();
double stdev = doubleRDD.stdev();

// Get detailed statistics
StatCounter stats = doubleRDD.stats();
System.out.println("Count: " + stats.count());
System.out.println("Mean: " + stats.mean());
System.out.println("Stdev: " + stats.stdev());
System.out.println("Max: " + stats.max());
System.out.println("Min: " + stats.min());

// Histogram
double[] buckets = {0.0, 2.0, 4.0, 6.0};
long[] histogram = doubleRDD.histogram(buckets);

// Or with automatic bucketing
Tuple2<double[], long[]> autoHistogram = doubleRDD.histogram(4);

Function Interfaces

Java API uses function interfaces for type-safe transformations.

Function Interfaces

// Single argument function
public interface Function<T, R> extends Serializable {
    R call(T t) throws Exception;
}

// Two argument function
public interface Function2<T1, T2, R> extends Serializable {
    R call(T1 t1, T2 t2) throws Exception;
}

// Void function (for actions)
public interface VoidFunction<T> extends Serializable {
    void call(T t) throws Exception;
}

// FlatMap function
public interface FlatMapFunction<T, R> extends Serializable {
    Iterable<R> call(T t) throws Exception;
}

// Pair function (for creating key-value pairs)
public interface PairFunction<T, K, V> extends Serializable {
    Tuple2<K, V> call(T t) throws Exception;
}

// PairFlatMap function
public interface PairFlatMapFunction<T, K, V> extends Serializable {
    Iterable<Tuple2<K, V>> call(T t) throws Exception;
}

Function Usage Examples

import org.apache.spark.api.java.function.*;

// Anonymous inner class
JavaRDD<Integer> doubled = rdd.map(new Function<Integer, Integer>() {
    public Integer call(Integer x) {
        return x * 2;
    }
});

// Lambda expression (Java 8+)
JavaRDD<Integer> doubled2 = rdd.map(x -> x * 2);

// Method reference (Java 8+)
JavaRDD<String> strings = rdd.map(Object::toString);

// Complex transformation with PairFunction
JavaPairRDD<String, Integer> pairs = words.mapToPair(
    new PairFunction<String, String, Integer>() {
        public Tuple2<String, Integer> call(String word) {
            return new Tuple2<>(word.toLowerCase(), word.length());
        }
    }
);

// FlatMap example
JavaRDD<String> words = lines.flatMap(
    new FlatMapFunction<String, String>() {
        public Iterable<String> call(String line) {
            return Arrays.asList(line.split("\\s+"));
        }
    }
);

// Void function for actions
rdd.foreach(new VoidFunction<Integer>() {
    public void call(Integer x) {
        System.out.println(x);
    }
});

Shared Variables in Java

Broadcast Variables

import org.apache.spark.broadcast.Broadcast;
import java.util.Map;
import java.util.HashMap;

// Create broadcast variable
Map<String, Integer> lookupTable = new HashMap<>();
lookupTable.put("apple", 1);
lookupTable.put("banana", 2);
lookupTable.put("orange", 3);

Broadcast<Map<String, Integer>> broadcastTable = jsc.broadcast(lookupTable);

// Use in transformations
JavaRDD<String> fruits = jsc.parallelize(Arrays.asList("apple", "banana", "apple"));
JavaRDD<Integer> codes = fruits.map(fruit -> 
    broadcastTable.value().getOrDefault(fruit, 0)
);

// Clean up
broadcastTable.unpersist();

Accumulators

import org.apache.spark.Accumulator;

// Create accumulator
Accumulator<Integer> errorCount = jsc.accumulator(0);

// Use in transformations
JavaRDD<String> lines = jsc.textFile("input.txt");
JavaRDD<String> validLines = lines.filter(line -> {
    if (line.trim().isEmpty()) {
        errorCount.add(1);
        return false;
    }
    return true;
});

// Trigger action to update accumulator
validLines.count();

// Get accumulator value
System.out.println("Error count: " + errorCount.value());

// Custom accumulator types
Accumulator<Double> doubleAcc = jsc.accumulator(0.0);

Complete Example

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.*;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Map;

public class SparkWordCount {
    public static void main(String[] args) {
        // Create Spark context
        SparkConf conf = new SparkConf()
            .setAppName("Java Word Count")
            .setMaster("local[*]");
        
        JavaSparkContext jsc = new JavaSparkContext(conf);
        
        try {
            // Read input file
            JavaRDD<String> lines = jsc.textFile("input.txt");
            
            // Split lines into words
            JavaRDD<String> words = lines.flatMap(line -> 
                Arrays.asList(line.toLowerCase().split("\\s+"))
            );
            
            // Filter out empty words
            JavaRDD<String> validWords = words.filter(word -> !word.trim().isEmpty());
            
            // Create word-count pairs
            JavaPairRDD<String, Integer> wordPairs = validWords.mapToPair(
                word -> new Tuple2<>(word, 1)
            );
            
            // Sum counts by key
            JavaPairRDD<String, Integer> wordCounts = wordPairs.reduceByKey(
                (a, b) -> a + b
            );
            
            // Sort by count descending
            JavaPairRDD<String, Integer> sortedCounts = wordCounts.mapToPair(
                pair -> new Tuple2<>(pair._2, pair._1)  // Swap to (count, word)
            ).sortByKey(false).mapToPair(
                pair -> new Tuple2<>(pair._2, pair._1)  // Swap back to (word, count)
            );
            
            // Collect and print results
            Map<String, Integer> results = sortedCounts.collectAsMap();
            
            System.out.println("Word Count Results:");
            results.entrySet().stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                .limit(10)
                .forEach(entry -> 
                    System.out.println(entry.getKey() + ": " + entry.getValue())
                );
            
            // Save results
            sortedCounts.saveAsTextFile("output");
            
        } finally {
            // Stop Spark context
            jsc.stop();
        }
    }
}

Maven Dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.0.0</version>
    </dependency>
</dependencies>

This comprehensive guide covers the complete Java API for Apache Spark, enabling Java developers to build scalable data processing applications with type safety and familiar Java patterns.

Install with Tessl CLI

npx tessl i tessl/maven-apache-spark

docs

caching-persistence.md

core-rdd.md

data-sources.md

graphx.md

index.md

java-api.md

key-value-operations.md

mllib.md

python-api.md

spark-context.md

sql.md

streaming.md

tile.json