Spec Registry
Help your agents use open-source better. Learn more.
Find usage specs for your project’s dependencies
- Author
- tessl
- Last updated
- Spec files
maven-apache-spark
Describes: maven/org.apache.spark/spark-parent
- Description
- Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R
- Author
- tessl
- Last updated
java-api.md docs/
1# Java API23Spark provides comprehensive Java APIs that mirror the Scala functionality while providing Java-friendly interfaces. The Java API includes JavaRDD, JavaPairRDD, and JavaDoubleRDD classes that offer type-safe operations for Java developers.45## JavaSparkContext67The Java-friendly version of SparkContext.89### JavaSparkContext Class1011```java { .api }12public class JavaSparkContext {13// Constructors14public JavaSparkContext()15public JavaSparkContext(SparkConf conf)16public JavaSparkContext(String master, String appName)17public JavaSparkContext(String master, String appName, SparkConf conf)18public JavaSparkContext(String master, String appName, String sparkHome, String jarFile)19public JavaSparkContext(String master, String appName, String sparkHome, String[] jars)20public JavaSparkContext(String master, String appName, String sparkHome, String[] jars, Map<String, String> environment)2122// Core properties23public SparkContext sc()24public String master()25public String appName()26public Boolean isLocal()27public Integer defaultParallelism()28public Integer defaultMinPartitions()29}30```3132### Creating JavaSparkContext3334```java35import org.apache.spark.SparkConf;36import org.apache.spark.api.java.JavaSparkContext;3738// Basic creation with SparkConf39SparkConf conf = new SparkConf()40.setAppName("Java Spark App")41.setMaster("local[*]");4243JavaSparkContext jsc = new JavaSparkContext(conf);4445// Alternative constructors46JavaSparkContext jsc2 = new JavaSparkContext("local[*]", "My Java App");4748// With all parameters49String[] jars = {"myapp.jar", "dependencies.jar"};50Map<String, String> env = new HashMap<>();51env.put("SPARK_ENV", "production");5253JavaSparkContext jsc3 = new JavaSparkContext(54"local[*]", // master55"My Java App", // app name56"/path/to/spark", // spark home57jars, // jar files58env // environment59);60```6162## JavaRDD6364Java-friendly wrapper for RDD operations.6566### JavaRDD Class6768```java { .api }69public class JavaRDD<T> extends AbstractJavaRDDLike<T, JavaRDD<T>> {70// Transformations71public <U> JavaRDD<U> map(Function<T, U> f)72public <U> JavaRDD<U> flatMap(FlatMapFunction<T, U> f)73public JavaRDD<T> filter(Function<T, Boolean> f)74public JavaRDD<T> distinct()75public JavaRDD<T> distinct(int numPartitions)76public JavaRDD<T> sample(boolean withReplacement, double fraction)77public JavaRDD<T> sample(boolean withReplacement, double fraction, long seed)78public JavaRDD<T> union(JavaRDD<T> other)79public JavaRDD<T> intersection(JavaRDD<T> other)80public JavaRDD<T> subtract(JavaRDD<T> other)81public <U> JavaPairRDD<T, U> cartesian(JavaRDD<U> other)8283// Partition operations84public <U> JavaRDD<U> mapPartitions(FlatMapFunction<Iterator<T>, U> f)85public <U> JavaRDD<U> mapPartitionsWithIndex(Function2<Integer, Iterator<T>, Iterator<U>> f, boolean preservesPartitioning)86public JavaRDD<T> coalesce(int numPartitions)87public JavaRDD<T> coalesce(int numPartitions, boolean shuffle)88public JavaRDD<T> repartition(int numPartitions)8990// Actions91public List<T> collect()92public long count()93public T first()94public List<T> take(int num)95public List<T> takeSample(boolean withReplacement, int num)96public List<T> takeSample(boolean withReplacement, int num, long seed)97public T reduce(Function2<T, T, T> f)98public T fold(T zeroValue, Function2<T, T, T> func)99public <U> U aggregate(U zeroValue, Function2<U, T, U> seqFunc, Function2<U, U, U> combFunc)100public void foreach(VoidFunction<T> f)101public void foreachPartition(VoidFunction<Iterator<T>> f)102103// Persistence104public JavaRDD<T> cache()105public JavaRDD<T> persist(StorageLevel newLevel)106public JavaRDD<T> unpersist()107public JavaRDD<T> unpersist(boolean blocking)108}109```110111### Creating and Using JavaRDD112113```java114import org.apache.spark.api.java.JavaRDD;115import org.apache.spark.api.java.function.Function;116import org.apache.spark.api.java.function.Function2;117import org.apache.spark.api.java.function.FlatMapFunction;118import java.util.Arrays;119import java.util.List;120121// Create JavaRDD from collection122List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);123JavaRDD<Integer> javaRDD = jsc.parallelize(data);124125// Map transformation126JavaRDD<Integer> doubled = javaRDD.map(new Function<Integer, Integer>() {127public Integer call(Integer x) {128return x * 2;129}130});131132// Using lambda expressions (Java 8+)133JavaRDD<Integer> doubled2 = javaRDD.map(x -> x * 2);134135// FlatMap transformation136JavaRDD<String> lines = jsc.textFile("input.txt");137JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {138public Iterable<String> call(String line) {139return Arrays.asList(line.split(" "));140}141});142143// With lambda144JavaRDD<String> words2 = lines.flatMap(line -> Arrays.asList(line.split(" ")));145146// Filter transformation147JavaRDD<Integer> evens = javaRDD.filter(new Function<Integer, Boolean>() {148public Boolean call(Integer x) {149return x % 2 == 0;150}151});152153// With lambda154JavaRDD<Integer> evens2 = javaRDD.filter(x -> x % 2 == 0);155```156157### Actions on JavaRDD158159```java160List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);161JavaRDD<Integer> rdd = jsc.parallelize(data);162163// Collect all elements164List<Integer> result = rdd.collect();165166// Count elements167long count = rdd.count();168169// Get first element170Integer first = rdd.first();171172// Take first n elements173List<Integer> firstThree = rdd.take(3);174175// Reduce with function176Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {177public Integer call(Integer a, Integer b) {178return a + b;179}180});181182// With lambda183Integer sum2 = rdd.reduce((a, b) -> a + b);184185// Fold with zero value186Integer foldResult = rdd.fold(0, (a, b) -> a + b);187188// Aggregate with different types189class Stats implements Serializable {190public int sum;191public int count;192193public Stats(int sum, int count) {194this.sum = sum;195this.count = count;196}197}198199Stats stats = rdd.aggregate(200new Stats(0, 0), // Zero value201new Function2<Stats, Integer, Stats>() { // Seq function202public Stats call(Stats s, Integer x) {203return new Stats(s.sum + x, s.count + 1);204}205},206new Function2<Stats, Stats, Stats>() { // Combine function207public Stats call(Stats s1, Stats s2) {208return new Stats(s1.sum + s2.sum, s1.count + s2.count);209}210}211);212```213214## JavaPairRDD215216Java wrapper for key-value pair RDDs.217218### JavaPairRDD Class219220```java { .api }221public class JavaPairRDD<K, V> extends AbstractJavaRDDLike<Tuple2<K, V>, JavaPairRDD<K, V>> {222// Key-Value operations223public JavaRDD<K> keys()224public JavaRDD<V> values()225public <U> JavaPairRDD<K, U> mapValues(Function<V, U> f)226public <U> JavaPairRDD<K, U> flatMapValues(FlatMapFunction<V, U> f)227228// Aggregations229public JavaPairRDD<K, V> reduceByKey(Function2<V, V, V> func)230public JavaPairRDD<K, V> reduceByKey(Partitioner partitioner, Function2<V, V, V> func)231public JavaPairRDD<K, Iterable<V>> groupByKey()232public JavaPairRDD<K, Iterable<V>> groupByKey(Partitioner partitioner)233public <C> JavaPairRDD<K, C> combineByKey(Function<V, C> createCombiner, Function2<C, V, C> mergeValue, Function2<C, C, C> mergeCombiners, Partitioner partitioner)234public <U> JavaPairRDD<K, U> aggregateByKey(U zeroValue, Partitioner partitioner, Function2<U, V, U> seqFunc, Function2<U, U, U> combFunc)235public JavaPairRDD<K, V> foldByKey(V zeroValue, Function2<V, V, V> func)236237// Joins238public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other)239public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other, Partitioner partitioner)240public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairRDD<K, W> other)241public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairRDD<K, W> other)242public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairRDD<K, W> other)243public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>> cogroup(JavaPairRDD<K, W> other)244245// Actions246public Map<K, V> collectAsMap()247public Map<K, Long> countByKey()248public List<V> lookup(K key)249250// Save operations251public void saveAsHadoopFile(String path, Class<?> keyClass, Class<?> valueClass, Class<? extends OutputFormat> outputFormatClass)252public void saveAsNewAPIHadoopFile(String path, Class<?> keyClass, Class<?> valueClass, Class<? extends org.apache.hadoop.mapreduce.OutputFormat> outputFormatClass)253}254```255256### Creating and Using JavaPairRDD257258```java259import org.apache.spark.api.java.JavaPairRDD;260import org.apache.spark.api.java.function.PairFunction;261import scala.Tuple2;262import java.util.Arrays;263import java.util.List;264265// Create JavaPairRDD from tuples266List<Tuple2<String, Integer>> pairs = Arrays.asList(267new Tuple2<>("apple", 5),268new Tuple2<>("banana", 3),269new Tuple2<>("apple", 2),270new Tuple2<>("orange", 1)271);272273JavaPairRDD<String, Integer> pairRDD = jsc.parallelizePairs(pairs);274275// Create from JavaRDD using mapToPair276JavaRDD<String> lines = jsc.textFile("input.txt");277JavaPairRDD<String, Integer> wordCounts = lines278.flatMap(line -> Arrays.asList(line.split(" ")))279.mapToPair(new PairFunction<String, String, Integer>() {280public Tuple2<String, Integer> call(String word) {281return new Tuple2<>(word, 1);282}283});284285// With lambda286JavaPairRDD<String, Integer> wordCounts2 = lines287.flatMap(line -> Arrays.asList(line.split(" ")))288.mapToPair(word -> new Tuple2<>(word, 1));289```290291### Key-Value Transformations292293```java294JavaPairRDD<String, Integer> pairs = jsc.parallelizePairs(Arrays.asList(295new Tuple2<>("a", 1),296new Tuple2<>("b", 2),297new Tuple2<>("a", 3)298));299300// Get keys and values301JavaRDD<String> keys = pairs.keys();302JavaRDD<Integer> values = pairs.values();303304// Transform values while preserving keys305JavaPairRDD<String, Integer> doubled = pairs.mapValues(x -> x * 2);306307// FlatMap values308JavaPairRDD<String, Character> chars = pairs.flatMapValues(309value -> Arrays.asList(value.toString().toCharArray())310);311312// Reduce by key313JavaPairRDD<String, Integer> sums = pairs.reduceByKey((a, b) -> a + b);314315// Group by key316JavaPairRDD<String, Iterable<Integer>> grouped = pairs.groupByKey();317318// Aggregate by key319JavaPairRDD<String, Integer> aggregated = pairs.aggregateByKey(3200, // Zero value321(acc, value) -> acc + value, // Seq function322(acc1, acc2) -> acc1 + acc2 // Combine function323);324```325326### Join Operations327328```java329JavaPairRDD<String, String> names = jsc.parallelizePairs(Arrays.asList(330new Tuple2<>("1", "Alice"),331new Tuple2<>("2", "Bob"),332new Tuple2<>("3", "Charlie")333));334335JavaPairRDD<String, Integer> ages = jsc.parallelizePairs(Arrays.asList(336new Tuple2<>("1", 25),337new Tuple2<>("2", 30),338new Tuple2<>("4", 35)339));340341// Inner join342JavaPairRDD<String, Tuple2<String, Integer>> joined = names.join(ages);343// Result: [("1", ("Alice", 25)), ("2", ("Bob", 30))]344345// Left outer join346JavaPairRDD<String, Tuple2<String, Optional<Integer>>> leftJoined = names.leftOuterJoin(ages);347// Result: [("1", ("Alice", Some(25))), ("2", ("Bob", Some(30))), ("3", ("Charlie", None))]348349// Full outer join350JavaPairRDD<String, Tuple2<Optional<String>, Optional<Integer>>> fullJoined = names.fullOuterJoin(ages);351352// Cogroup353JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<Integer>>> cogrouped = names.cogroup(ages);354```355356### Actions on JavaPairRDD357358```java359JavaPairRDD<String, Integer> pairs = jsc.parallelizePairs(Arrays.asList(360new Tuple2<>("apple", 5),361new Tuple2<>("banana", 3),362new Tuple2<>("apple", 2)363));364365// Collect as Map (assumes unique keys)366Map<String, Integer> map = pairs.collectAsMap();367368// Count by key369Map<String, Long> counts = pairs.countByKey();370371// Lookup values for a key372List<Integer> appleValues = pairs.lookup("apple"); // [5, 2]373374// Count all elements375long totalCount = pairs.count();376```377378## JavaDoubleRDD379380Specialized RDD for double values with statistical operations.381382### JavaDoubleRDD Class383384```java { .api }385public class JavaDoubleRDD extends AbstractJavaRDDLike<Double, JavaDoubleRDD> {386// Statistical operations387public double mean()388public double sum()389public StatCounter stats()390public double variance()391public double sampleVariance()392public double stdev()393public double sampleStdev()394public long[] histogram(double[] buckets)395public Tuple2<double[], long[]> histogram(int bucketCount)396}397```398399### Using JavaDoubleRDD400401```java402import org.apache.spark.api.java.JavaDoubleRDD;403import org.apache.spark.util.StatCounter;404405// Create JavaDoubleRDD406List<Double> numbers = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0);407JavaDoubleRDD doubleRDD = jsc.parallelizeDoubles(numbers);408409// Convert from JavaRDD<Double>410JavaRDD<Double> rdd = jsc.parallelize(numbers);411JavaDoubleRDD doubleRDD2 = rdd.mapToDouble(x -> x);412413// Statistical operations414double mean = doubleRDD.mean();415double sum = doubleRDD.sum();416double variance = doubleRDD.variance();417double stdev = doubleRDD.stdev();418419// Get detailed statistics420StatCounter stats = doubleRDD.stats();421System.out.println("Count: " + stats.count());422System.out.println("Mean: " + stats.mean());423System.out.println("Stdev: " + stats.stdev());424System.out.println("Max: " + stats.max());425System.out.println("Min: " + stats.min());426427// Histogram428double[] buckets = {0.0, 2.0, 4.0, 6.0};429long[] histogram = doubleRDD.histogram(buckets);430431// Or with automatic bucketing432Tuple2<double[], long[]> autoHistogram = doubleRDD.histogram(4);433```434435## Function Interfaces436437Java API uses function interfaces for type-safe transformations.438439### Function Interfaces440441```java { .api }442// Single argument function443public interface Function<T, R> extends Serializable {444R call(T t) throws Exception;445}446447// Two argument function448public interface Function2<T1, T2, R> extends Serializable {449R call(T1 t1, T2 t2) throws Exception;450}451452// Void function (for actions)453public interface VoidFunction<T> extends Serializable {454void call(T t) throws Exception;455}456457// FlatMap function458public interface FlatMapFunction<T, R> extends Serializable {459Iterable<R> call(T t) throws Exception;460}461462// Pair function (for creating key-value pairs)463public interface PairFunction<T, K, V> extends Serializable {464Tuple2<K, V> call(T t) throws Exception;465}466467// PairFlatMap function468public interface PairFlatMapFunction<T, K, V> extends Serializable {469Iterable<Tuple2<K, V>> call(T t) throws Exception;470}471```472473### Function Usage Examples474475```java476import org.apache.spark.api.java.function.*;477478// Anonymous inner class479JavaRDD<Integer> doubled = rdd.map(new Function<Integer, Integer>() {480public Integer call(Integer x) {481return x * 2;482}483});484485// Lambda expression (Java 8+)486JavaRDD<Integer> doubled2 = rdd.map(x -> x * 2);487488// Method reference (Java 8+)489JavaRDD<String> strings = rdd.map(Object::toString);490491// Complex transformation with PairFunction492JavaPairRDD<String, Integer> pairs = words.mapToPair(493new PairFunction<String, String, Integer>() {494public Tuple2<String, Integer> call(String word) {495return new Tuple2<>(word.toLowerCase(), word.length());496}497}498);499500// FlatMap example501JavaRDD<String> words = lines.flatMap(502new FlatMapFunction<String, String>() {503public Iterable<String> call(String line) {504return Arrays.asList(line.split("\\s+"));505}506}507);508509// Void function for actions510rdd.foreach(new VoidFunction<Integer>() {511public void call(Integer x) {512System.out.println(x);513}514});515```516517## Shared Variables in Java518519### Broadcast Variables520521```java522import org.apache.spark.broadcast.Broadcast;523import java.util.Map;524import java.util.HashMap;525526// Create broadcast variable527Map<String, Integer> lookupTable = new HashMap<>();528lookupTable.put("apple", 1);529lookupTable.put("banana", 2);530lookupTable.put("orange", 3);531532Broadcast<Map<String, Integer>> broadcastTable = jsc.broadcast(lookupTable);533534// Use in transformations535JavaRDD<String> fruits = jsc.parallelize(Arrays.asList("apple", "banana", "apple"));536JavaRDD<Integer> codes = fruits.map(fruit ->537broadcastTable.value().getOrDefault(fruit, 0)538);539540// Clean up541broadcastTable.unpersist();542```543544### Accumulators545546```java547import org.apache.spark.Accumulator;548549// Create accumulator550Accumulator<Integer> errorCount = jsc.accumulator(0);551552// Use in transformations553JavaRDD<String> lines = jsc.textFile("input.txt");554JavaRDD<String> validLines = lines.filter(line -> {555if (line.trim().isEmpty()) {556errorCount.add(1);557return false;558}559return true;560});561562// Trigger action to update accumulator563validLines.count();564565// Get accumulator value566System.out.println("Error count: " + errorCount.value());567568// Custom accumulator types569Accumulator<Double> doubleAcc = jsc.accumulator(0.0);570```571572## Complete Example573574```java575import org.apache.spark.SparkConf;576import org.apache.spark.api.java.JavaSparkContext;577import org.apache.spark.api.java.JavaRDD;578import org.apache.spark.api.java.JavaPairRDD;579import org.apache.spark.api.java.function.*;580import scala.Tuple2;581import java.util.Arrays;582import java.util.Map;583584public class SparkWordCount {585public static void main(String[] args) {586// Create Spark context587SparkConf conf = new SparkConf()588.setAppName("Java Word Count")589.setMaster("local[*]");590591JavaSparkContext jsc = new JavaSparkContext(conf);592593try {594// Read input file595JavaRDD<String> lines = jsc.textFile("input.txt");596597// Split lines into words598JavaRDD<String> words = lines.flatMap(line ->599Arrays.asList(line.toLowerCase().split("\\s+"))600);601602// Filter out empty words603JavaRDD<String> validWords = words.filter(word -> !word.trim().isEmpty());604605// Create word-count pairs606JavaPairRDD<String, Integer> wordPairs = validWords.mapToPair(607word -> new Tuple2<>(word, 1)608);609610// Sum counts by key611JavaPairRDD<String, Integer> wordCounts = wordPairs.reduceByKey(612(a, b) -> a + b613);614615// Sort by count descending616JavaPairRDD<String, Integer> sortedCounts = wordCounts.mapToPair(617pair -> new Tuple2<>(pair._2, pair._1) // Swap to (count, word)618).sortByKey(false).mapToPair(619pair -> new Tuple2<>(pair._2, pair._1) // Swap back to (word, count)620);621622// Collect and print results623Map<String, Integer> results = sortedCounts.collectAsMap();624625System.out.println("Word Count Results:");626results.entrySet().stream()627.sorted(Map.Entry.<String, Integer>comparingByValue().reversed())628.limit(10)629.forEach(entry ->630System.out.println(entry.getKey() + ": " + entry.getValue())631);632633// Save results634sortedCounts.saveAsTextFile("output");635636} finally {637// Stop Spark context638jsc.stop();639}640}641}642```643644## Maven Dependencies645646```xml647<dependencies>648<dependency>649<groupId>org.apache.spark</groupId>650<artifactId>spark-core_2.10</artifactId>651<version>1.0.0</version>652</dependency>653</dependencies>654```655656This comprehensive guide covers the complete Java API for Apache Spark, enabling Java developers to build scalable data processing applications with type safety and familiar Java patterns.