Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R
—
Spark provides comprehensive Java APIs that mirror the Scala functionality while providing Java-friendly interfaces. The Java API includes JavaRDD, JavaPairRDD, and JavaDoubleRDD classes that offer type-safe operations for Java developers.
The Java-friendly version of SparkContext.
public class JavaSparkContext {
// Constructors
public JavaSparkContext()
public JavaSparkContext(SparkConf conf)
public JavaSparkContext(String master, String appName)
public JavaSparkContext(String master, String appName, SparkConf conf)
public JavaSparkContext(String master, String appName, String sparkHome, String jarFile)
public JavaSparkContext(String master, String appName, String sparkHome, String[] jars)
public JavaSparkContext(String master, String appName, String sparkHome, String[] jars, Map<String, String> environment)
// Core properties
public SparkContext sc()
public String master()
public String appName()
public Boolean isLocal()
public Integer defaultParallelism()
public Integer defaultMinPartitions()
}
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
// Basic creation with SparkConf
SparkConf conf = new SparkConf()
.setAppName("Java Spark App")
.setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(conf);
// Alternative constructors
JavaSparkContext jsc2 = new JavaSparkContext("local[*]", "My Java App");
// With all parameters
String[] jars = {"myapp.jar", "dependencies.jar"};
Map<String, String> env = new HashMap<>();
env.put("SPARK_ENV", "production");
JavaSparkContext jsc3 = new JavaSparkContext(
"local[*]", // master
"My Java App", // app name
"/path/to/spark", // spark home
jars, // jar files
env // environment
);
Java-friendly wrapper for RDD operations.
public class JavaRDD<T> extends AbstractJavaRDDLike<T, JavaRDD<T>> {
// Transformations
public <U> JavaRDD<U> map(Function<T, U> f)
public <U> JavaRDD<U> flatMap(FlatMapFunction<T, U> f)
public JavaRDD<T> filter(Function<T, Boolean> f)
public JavaRDD<T> distinct()
public JavaRDD<T> distinct(int numPartitions)
public JavaRDD<T> sample(boolean withReplacement, double fraction)
public JavaRDD<T> sample(boolean withReplacement, double fraction, long seed)
public JavaRDD<T> union(JavaRDD<T> other)
public JavaRDD<T> intersection(JavaRDD<T> other)
public JavaRDD<T> subtract(JavaRDD<T> other)
public <U> JavaPairRDD<T, U> cartesian(JavaRDD<U> other)
// Partition operations
public <U> JavaRDD<U> mapPartitions(FlatMapFunction<Iterator<T>, U> f)
public <U> JavaRDD<U> mapPartitionsWithIndex(Function2<Integer, Iterator<T>, Iterator<U>> f, boolean preservesPartitioning)
public JavaRDD<T> coalesce(int numPartitions)
public JavaRDD<T> coalesce(int numPartitions, boolean shuffle)
public JavaRDD<T> repartition(int numPartitions)
// Actions
public List<T> collect()
public long count()
public T first()
public List<T> take(int num)
public List<T> takeSample(boolean withReplacement, int num)
public List<T> takeSample(boolean withReplacement, int num, long seed)
public T reduce(Function2<T, T, T> f)
public T fold(T zeroValue, Function2<T, T, T> func)
public <U> U aggregate(U zeroValue, Function2<U, T, U> seqFunc, Function2<U, U, U> combFunc)
public void foreach(VoidFunction<T> f)
public void foreachPartition(VoidFunction<Iterator<T>> f)
// Persistence
public JavaRDD<T> cache()
public JavaRDD<T> persist(StorageLevel newLevel)
public JavaRDD<T> unpersist()
public JavaRDD<T> unpersist(boolean blocking)
}
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.FlatMapFunction;
import java.util.Arrays;
import java.util.List;
// Create JavaRDD from collection
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> javaRDD = jsc.parallelize(data);
// Map transformation
JavaRDD<Integer> doubled = javaRDD.map(new Function<Integer, Integer>() {
public Integer call(Integer x) {
return x * 2;
}
});
// Using lambda expressions (Java 8+)
JavaRDD<Integer> doubled2 = javaRDD.map(x -> x * 2);
// FlatMap transformation
JavaRDD<String> lines = jsc.textFile("input.txt");
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
public Iterable<String> call(String line) {
return Arrays.asList(line.split(" "));
}
});
// With lambda
JavaRDD<String> words2 = lines.flatMap(line -> Arrays.asList(line.split(" ")));
// Filter transformation
JavaRDD<Integer> evens = javaRDD.filter(new Function<Integer, Boolean>() {
public Boolean call(Integer x) {
return x % 2 == 0;
}
});
// With lambda
JavaRDD<Integer> evens2 = javaRDD.filter(x -> x % 2 == 0);
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> rdd = jsc.parallelize(data);
// Collect all elements
List<Integer> result = rdd.collect();
// Count elements
long count = rdd.count();
// Get first element
Integer first = rdd.first();
// Take first n elements
List<Integer> firstThree = rdd.take(3);
// Reduce with function
Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer a, Integer b) {
return a + b;
}
});
// With lambda
Integer sum2 = rdd.reduce((a, b) -> a + b);
// Fold with zero value
Integer foldResult = rdd.fold(0, (a, b) -> a + b);
// Aggregate with different types
// Serializable accumulator pairing a running sum with an element count.
// Used as the zero/merge type in the aggregate() call that follows, so it
// must be Serializable to be shipped to executors.
class Stats implements Serializable {
public int sum;
public int count;
public Stats(int sum, int count) {
this.sum = sum;
this.count = count;
}
}
Stats stats = rdd.aggregate(
new Stats(0, 0), // Zero value
new Function2<Stats, Integer, Stats>() { // Seq function
public Stats call(Stats s, Integer x) {
return new Stats(s.sum + x, s.count + 1);
}
},
new Function2<Stats, Stats, Stats>() { // Combine function
public Stats call(Stats s1, Stats s2) {
return new Stats(s1.sum + s2.sum, s1.count + s2.count);
}
}
);
Java wrapper for key-value pair RDDs.
public class JavaPairRDD<K, V> extends AbstractJavaRDDLike<Tuple2<K, V>, JavaPairRDD<K, V>> {
// Key-Value operations
public JavaRDD<K> keys()
public JavaRDD<V> values()
public <U> JavaPairRDD<K, U> mapValues(Function<V, U> f)
public <U> JavaPairRDD<K, U> flatMapValues(Function<V, Iterable<U>> f)
// Aggregations
public JavaPairRDD<K, V> reduceByKey(Function2<V, V, V> func)
public JavaPairRDD<K, V> reduceByKey(Partitioner partitioner, Function2<V, V, V> func)
public JavaPairRDD<K, Iterable<V>> groupByKey()
public JavaPairRDD<K, Iterable<V>> groupByKey(Partitioner partitioner)
public <C> JavaPairRDD<K, C> combineByKey(Function<V, C> createCombiner, Function2<C, V, C> mergeValue, Function2<C, C, C> mergeCombiners, Partitioner partitioner)
public <U> JavaPairRDD<K, U> aggregateByKey(U zeroValue, Partitioner partitioner, Function2<U, V, U> seqFunc, Function2<U, U, U> combFunc)
public JavaPairRDD<K, V> foldByKey(V zeroValue, Function2<V, V, V> func)
// Joins
public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other)
public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other, Partitioner partitioner)
public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairRDD<K, W> other)
public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairRDD<K, W> other)
public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairRDD<K, W> other)
public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>> cogroup(JavaPairRDD<K, W> other)
// Actions
public Map<K, V> collectAsMap()
public Map<K, Long> countByKey()
public List<V> lookup(K key)
// Save operations
public void saveAsHadoopFile(String path, Class<?> keyClass, Class<?> valueClass, Class<? extends OutputFormat> outputFormatClass)
public void saveAsNewAPIHadoopFile(String path, Class<?> keyClass, Class<?> valueClass, Class<? extends org.apache.hadoop.mapreduce.OutputFormat> outputFormatClass)
}
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
// Create JavaPairRDD from tuples
List<Tuple2<String, Integer>> pairs = Arrays.asList(
new Tuple2<>("apple", 5),
new Tuple2<>("banana", 3),
new Tuple2<>("apple", 2),
new Tuple2<>("orange", 1)
);
JavaPairRDD<String, Integer> pairRDD = jsc.parallelizePairs(pairs);
// Create from JavaRDD using mapToPair
JavaRDD<String> lines = jsc.textFile("input.txt");
JavaPairRDD<String, Integer> wordCounts = lines
.flatMap(line -> Arrays.asList(line.split(" ")))
.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String word) {
return new Tuple2<>(word, 1);
}
});
// With lambda
JavaPairRDD<String, Integer> wordCounts2 = lines
.flatMap(line -> Arrays.asList(line.split(" ")))
.mapToPair(word -> new Tuple2<>(word, 1));
JavaPairRDD<String, Integer> pairs = jsc.parallelizePairs(Arrays.asList(
new Tuple2<>("a", 1),
new Tuple2<>("b", 2),
new Tuple2<>("a", 3)
));
// Get keys and values
JavaRDD<String> keys = pairs.keys();
JavaRDD<Integer> values = pairs.values();
// Transform values while preserving keys
JavaPairRDD<String, Integer> doubled = pairs.mapValues(x -> x * 2);
// FlatMap values: expand each value into its decimal digit characters while
// keeping the key. NOTE: Arrays.asList(char[]) would produce a one-element
// List<char[]>, not List<Character> — box each char explicitly instead.
JavaPairRDD<String, Character> chars = pairs.flatMapValues(value -> {
    String digits = value.toString();
    Character[] boxed = new Character[digits.length()];
    for (int i = 0; i < digits.length(); i++) {
        boxed[i] = digits.charAt(i);
    }
    return Arrays.asList(boxed);
});
// Reduce by key
JavaPairRDD<String, Integer> sums = pairs.reduceByKey((a, b) -> a + b);
// Group by key
JavaPairRDD<String, Iterable<Integer>> grouped = pairs.groupByKey();
// Aggregate by key
JavaPairRDD<String, Integer> aggregated = pairs.aggregateByKey(
0, // Zero value
(acc, value) -> acc + value, // Seq function
(acc1, acc2) -> acc1 + acc2 // Combine function
);
JavaPairRDD<String, String> names = jsc.parallelizePairs(Arrays.asList(
new Tuple2<>("1", "Alice"),
new Tuple2<>("2", "Bob"),
new Tuple2<>("3", "Charlie")
));
JavaPairRDD<String, Integer> ages = jsc.parallelizePairs(Arrays.asList(
new Tuple2<>("1", 25),
new Tuple2<>("2", 30),
new Tuple2<>("4", 35)
));
// Inner join
JavaPairRDD<String, Tuple2<String, Integer>> joined = names.join(ages);
// Result: [("1", ("Alice", 25)), ("2", ("Bob", 30))]
// Left outer join
JavaPairRDD<String, Tuple2<String, Optional<Integer>>> leftJoined = names.leftOuterJoin(ages);
// Result: [("1", ("Alice", Optional.of(25))), ("2", ("Bob", Optional.of(30))), ("3", ("Charlie", Optional.absent()))]
// Full outer join
JavaPairRDD<String, Tuple2<Optional<String>, Optional<Integer>>> fullJoined = names.fullOuterJoin(ages);
// Cogroup
JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<Integer>>> cogrouped = names.cogroup(ages);
JavaPairRDD<String, Integer> pairs = jsc.parallelizePairs(Arrays.asList(
new Tuple2<>("apple", 5),
new Tuple2<>("banana", 3),
new Tuple2<>("apple", 2)
));
// Collect as Map (assumes unique keys)
Map<String, Integer> map = pairs.collectAsMap();
// Count by key
Map<String, Long> counts = pairs.countByKey();
// Lookup values for a key
List<Integer> appleValues = pairs.lookup("apple"); // [5, 2]
// Count all elements
long totalCount = pairs.count();
Specialized RDD for double values with statistical operations.
public class JavaDoubleRDD extends AbstractJavaRDDLike<Double, JavaDoubleRDD> {
// Statistical operations
public double mean()
public double sum()
public StatCounter stats()
public double variance()
public double sampleVariance()
public double stdev()
public double sampleStdev()
public long[] histogram(double[] buckets)
public Tuple2<double[], long[]> histogram(int bucketCount)
}
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.util.StatCounter;
// Create JavaDoubleRDD
List<Double> numbers = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0);
JavaDoubleRDD doubleRDD = jsc.parallelizeDoubles(numbers);
// Convert from JavaRDD<Double>
JavaRDD<Double> rdd = jsc.parallelize(numbers);
JavaDoubleRDD doubleRDD2 = rdd.mapToDouble(x -> x);
// Statistical operations
double mean = doubleRDD.mean();
double sum = doubleRDD.sum();
double variance = doubleRDD.variance();
double stdev = doubleRDD.stdev();
// Get detailed statistics
StatCounter stats = doubleRDD.stats();
System.out.println("Count: " + stats.count());
System.out.println("Mean: " + stats.mean());
System.out.println("Stdev: " + stats.stdev());
System.out.println("Max: " + stats.max());
System.out.println("Min: " + stats.min());
// Histogram
double[] buckets = {0.0, 2.0, 4.0, 6.0};
long[] histogram = doubleRDD.histogram(buckets);
// Or with automatic bucketing
Tuple2<double[], long[]> autoHistogram = doubleRDD.histogram(4);
Java API uses function interfaces for type-safe transformations.
// Single argument function
public interface Function<T, R> extends Serializable {
R call(T t) throws Exception;
}
// Two argument function
public interface Function2<T1, T2, R> extends Serializable {
R call(T1 t1, T2 t2) throws Exception;
}
// Void function (for actions)
public interface VoidFunction<T> extends Serializable {
void call(T t) throws Exception;
}
// FlatMap function
public interface FlatMapFunction<T, R> extends Serializable {
Iterable<R> call(T t) throws Exception;
}
// Pair function (for creating key-value pairs)
public interface PairFunction<T, K, V> extends Serializable {
Tuple2<K, V> call(T t) throws Exception;
}
// PairFlatMap function
public interface PairFlatMapFunction<T, K, V> extends Serializable {
Iterable<Tuple2<K, V>> call(T t) throws Exception;
}
import org.apache.spark.api.java.function.*;
// Anonymous inner class
JavaRDD<Integer> doubled = rdd.map(new Function<Integer, Integer>() {
public Integer call(Integer x) {
return x * 2;
}
});
// Lambda expression (Java 8+)
JavaRDD<Integer> doubled2 = rdd.map(x -> x * 2);
// Method reference (Java 8+)
JavaRDD<String> strings = rdd.map(Object::toString);
// Complex transformation with PairFunction
JavaPairRDD<String, Integer> pairs = words.mapToPair(
new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String word) {
return new Tuple2<>(word.toLowerCase(), word.length());
}
}
);
// FlatMap example
JavaRDD<String> words = lines.flatMap(
new FlatMapFunction<String, String>() {
public Iterable<String> call(String line) {
return Arrays.asList(line.split("\\s+"));
}
}
);
// Void function for actions
rdd.foreach(new VoidFunction<Integer>() {
public void call(Integer x) {
System.out.println(x);
}
});
import org.apache.spark.broadcast.Broadcast;
import java.util.Map;
import java.util.HashMap;
// Create broadcast variable
Map<String, Integer> lookupTable = new HashMap<>();
lookupTable.put("apple", 1);
lookupTable.put("banana", 2);
lookupTable.put("orange", 3);
Broadcast<Map<String, Integer>> broadcastTable = jsc.broadcast(lookupTable);
// Use in transformations
JavaRDD<String> fruits = jsc.parallelize(Arrays.asList("apple", "banana", "apple"));
JavaRDD<Integer> codes = fruits.map(fruit ->
broadcastTable.value().getOrDefault(fruit, 0)
);
// Clean up
broadcastTable.unpersist();
import org.apache.spark.Accumulator;
// Create accumulator
Accumulator<Integer> errorCount = jsc.accumulator(0);
// Use in transformations
JavaRDD<String> lines = jsc.textFile("input.txt");
JavaRDD<String> validLines = lines.filter(line -> {
if (line.trim().isEmpty()) {
errorCount.add(1);
return false;
}
return true;
});
// Trigger action to update accumulator
validLines.count();
// Get accumulator value
System.out.println("Error count: " + errorCount.value());
// Custom accumulator types
Accumulator<Double> doubleAcc = jsc.accumulator(0.0);
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.*;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Map;
/**
 * Complete word-count example: reads input.txt, counts word frequencies,
 * prints the ten most frequent words, and saves the sorted result.
 */
public class SparkWordCount {
    public static void main(String[] args) {
        // Configure and create the Spark context for a local run.
        SparkConf conf = new SparkConf()
            .setAppName("Java Word Count")
            .setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        try {
            // Read input file as an RDD of lines.
            JavaRDD<String> lines = jsc.textFile("input.txt");
            // Split lines into lowercase words.
            JavaRDD<String> words = lines.flatMap(line ->
                Arrays.asList(line.toLowerCase().split("\\s+"))
            );
            // Drop empty tokens produced by leading/trailing whitespace.
            JavaRDD<String> validWords = words.filter(word -> !word.trim().isEmpty());
            // Pair each word with an initial count of 1.
            JavaPairRDD<String, Integer> wordPairs = validWords.mapToPair(
                word -> new Tuple2<>(word, 1)
            );
            // Sum counts per word.
            JavaPairRDD<String, Integer> wordCounts = wordPairs.reduceByKey(
                (a, b) -> a + b
            );
            // Sort by count descending: swap to (count, word), sort, swap back.
            JavaPairRDD<String, Integer> sortedCounts = wordCounts.mapToPair(
                pair -> new Tuple2<>(pair._2, pair._1)
            ).sortByKey(false).mapToPair(
                pair -> new Tuple2<>(pair._2, pair._1)
            );
            // Print the ten most frequent words. take(10) preserves the RDD's
            // sort order — unlike collectAsMap(), which would discard the
            // ordering (and silently drop duplicate keys), forcing a redundant
            // re-sort on the driver.
            System.out.println("Word Count Results:");
            for (Tuple2<String, Integer> entry : sortedCounts.take(10)) {
                System.out.println(entry._1 + ": " + entry._2);
            }
            // Save the full sorted result.
            sortedCounts.saveAsTextFile("output");
        } finally {
            // Always release cluster resources, even on failure.
            jsc.stop();
        }
    }
}
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>
This comprehensive guide covers the complete Java API for Apache Spark, enabling Java developers to build scalable data processing applications with type safety and familiar Java patterns.
Install with Tessl CLI
npx tessl i tessl/maven-apache-spark