CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-spark--spark-core-2-11

Apache Spark Core - The foundational component of Apache Spark providing distributed computing capabilities including RDDs, transformations, actions, and cluster management.

Pending
Overview
Eval results
Files

java-api.mddocs/

Java API

Spark's Java API provides Java-friendly wrappers around the core Scala RDD API. It uses Java collections and functional interfaces to integrate seamlessly with Java applications while maintaining type safety and performance.

Core Java Classes

JavaSparkContext

The main entry point for Java applications using Spark.

public class JavaSparkContext {
    // Constructors
    public JavaSparkContext();
    public JavaSparkContext(String master, String appName);
    public JavaSparkContext(String master, String appName, String sparkHome, String[] jars);
    public JavaSparkContext(SparkConf conf);
    public JavaSparkContext(SparkContext sc);
    
    // RDD Creation
    public <T> JavaRDD<T> parallelize(List<T> list);
    public <T> JavaRDD<T> parallelize(List<T> list, int numSlices);
    public <K, V> JavaPairRDD<K, V> parallelizePairs(List<Tuple2<K, V>> list);
    public <K, V> JavaPairRDD<K, V> parallelizePairs(List<Tuple2<K, V>> list, int numSlices);
    public JavaRDD<Long> range(long start, long end);
    public JavaRDD<Long> range(long start, long end, long step);
    public JavaRDD<Long> range(long start, long end, long step, int numSlices);
    
    // File Input
    public JavaRDD<String> textFile(String path);
    public JavaRDD<String> textFile(String path, int minPartitions);
    public JavaPairRDD<String, String> wholeTextFiles(String path);
    public JavaPairRDD<String, String> wholeTextFiles(String path, int minPartitions);
    public JavaPairRDD<String, PortableDataStream> binaryFiles(String path);
    public JavaPairRDD<String, PortableDataStream> binaryFiles(String path, int minPartitions);
    
    // Hadoop Integration
    public <K, V, F extends InputFormat<K, V>> JavaPairRDD<K, V> hadoopRDD(
        JobConf conf, Class<F> inputFormatClass, Class<K> keyClass, Class<V> valueClass);
    public <K, V, F extends InputFormat<K, V>> JavaPairRDD<K, V> hadoopRDD(
        JobConf conf, Class<F> inputFormatClass, Class<K> keyClass, Class<V> valueClass, int minPartitions);
    public <K, V, F extends NewInputFormat<K, V>> JavaPairRDD<K, V> newAPIHadoopRDD(
        Configuration conf, Class<F> fClass, Class<K> kClass, Class<V> vClass);
    
    // Broadcast and Accumulators
    public <T> Broadcast<T> broadcast(T value);
    public LongAccumulator longAccumulator();
    public LongAccumulator longAccumulator(String name);
    public DoubleAccumulator doubleAccumulator();
    public DoubleAccumulator doubleAccumulator(String name);
    public <T> CollectionAccumulator<T> collectionAccumulator();
    public <T> CollectionAccumulator<T> collectionAccumulator(String name);
    
    // Application Control
    public void stop();
    public void addFile(String path);
    public void addJar(String path);
    public void setLogLevel(String logLevel);
    public void setJobGroup(String groupId, String description);
    public void setJobGroup(String groupId, String description, boolean interruptOnCancel);
    public void clearJobGroup();
    public void setLocalProperty(String key, String value);
    public String getLocalProperty(String key);
    
    // Properties
    public SparkConf getConf();
    public String master();
    public String appName();
    public List<String> jars();
    public long startTime();
    public String version();
    public int defaultParallelism();
    public SparkStatusTracker statusTracker();
}

JavaRDD

Java wrapper for RDD providing type-safe operations.

public class JavaRDD<T> {
    // Transformations
    public <R> JavaRDD<R> map(Function<T, R> f);
    public <U> JavaRDD<U> flatMap(FlatMapFunction<T, U> f);
    public JavaRDD<T> filter(Function<T, Boolean> f);
    public JavaRDD<T> distinct();
    public JavaRDD<T> distinct(int numPartitions);
    public JavaRDD<T> union(JavaRDD<T> other);
    public JavaRDD<T> intersection(JavaRDD<T> other);
    public JavaRDD<T> subtract(JavaRDD<T> other);
    public <U> JavaPairRDD<T, U> cartesian(JavaRDD<U> other);
    public <U> JavaPairRDD<T, U> zip(JavaRDD<U> other);
    public JavaPairRDD<T, Long> zipWithIndex();
    public JavaPairRDD<T, Long> zipWithUniqueId();
    public JavaRDD<T> sample(boolean withReplacement, double fraction);
    public JavaRDD<T> sample(boolean withReplacement, double fraction, long seed);
    public JavaRDD<T>[] randomSplit(double[] weights);
    public JavaRDD<T>[] randomSplit(double[] weights, long seed);
    public JavaRDD<T> repartition(int numPartitions);
    public JavaRDD<T> coalesce(int numPartitions);
    public JavaRDD<T> coalesce(int numPartitions, boolean shuffle);
    public <S> JavaRDD<T> sortBy(Function<T, S> f, boolean ascending, int numPartitions);
    public JavaRDD<T[]> glom();
    public <U> JavaRDD<U> mapPartitions(FlatMapFunction<Iterator<T>, U> f);
    public <U> JavaRDD<U> mapPartitions(FlatMapFunction<Iterator<T>, U> f, boolean preservesPartitioning);
    public <U> JavaRDD<U> mapPartitionsWithIndex(Function2<Integer, Iterator<T>, Iterator<U>> f, boolean preservesPartitioning);
    public JavaRDD<String> pipe(String command);
    public JavaRDD<String> pipe(List<String> command);
    
    // Actions
    public List<T> collect();
    public Iterator<T> toLocalIterator();
    public long count();
    public PartialResult<BoundedDouble> countApprox(long timeout);
    public PartialResult<BoundedDouble> countApprox(long timeout, double confidence);
    public long countApproxDistinct();
    public long countApproxDistinct(double relativeSD);
    public Map<T, Long> countByValue();
    public T first();
    public boolean isEmpty();
    public List<T> take(int num);
    public List<T> takeOrdered(int num);
    public List<T> takeOrdered(int num, Comparator<T> comp);
    public List<T> takeSample(boolean withReplacement, int num);
    public List<T> takeSample(boolean withReplacement, int num, long seed);
    public List<T> top(int num);
    public List<T> top(int num, Comparator<T> comp);
    public T reduce(Function2<T, T, T> f);
    public T fold(T zeroValue, Function2<T, T, T> f);
    public <U> U aggregate(U zeroValue, Function2<U, T, U> seqOp, Function2<U, U, U> combOp);
    public <U> U treeAggregate(U zeroValue, Function2<U, T, U> seqOp, Function2<U, U, U> combOp);
    public <U> U treeAggregate(U zeroValue, Function2<U, T, U> seqOp, Function2<U, U, U> combOp, int depth);
    public void foreach(VoidFunction<T> f);
    public void foreachPartition(VoidFunction<Iterator<T>> f);
    
    // Persistence
    public JavaRDD<T> persist(StorageLevel newLevel);
    public JavaRDD<T> cache();
    public JavaRDD<T> unpersist();
    public JavaRDD<T> unpersist(boolean blocking);
    public void checkpoint();
    
    // Conversion
    public <K, V> JavaPairRDD<K, V> mapToPair(PairFunction<T, K, V> f);
    public JavaDoubleRDD mapToDouble(DoubleFunction<T> f);
    public JavaDoubleRDD flatMapToDouble(DoubleFlatMapFunction<T> f);
    public <K, V> JavaPairRDD<K, V> flatMapToPair(PairFlatMapFunction<T, K, V> f);
    
    // Information
    public int getNumPartitions();
    public StorageLevel getStorageLevel();
    public boolean isCheckpointed();
    public String name();
    public JavaRDD<T> setName(String name);
    public String toDebugString();
    public SparkContext context();
}

JavaPairRDD

Java wrapper for key-value RDDs.

public class JavaPairRDD<K, V> {
    // Basic Operations
    public JavaRDD<K> keys();
    public JavaRDD<V> values();
    public <U> JavaPairRDD<K, U> mapValues(Function<V, U> f);
    public <U> JavaPairRDD<K, U> flatMapValues(FlatMapFunction<V, U> f);
    public JavaPairRDD<V, K> swap();
    
    // Grouping
    public JavaPairRDD<K, Iterable<V>> groupByKey();
    public JavaPairRDD<K, Iterable<V>> groupByKey(int numPartitions);
    public JavaPairRDD<K, Iterable<V>> groupByKey(Partitioner partitioner);
    
    // Reduction
    public JavaPairRDD<K, V> reduceByKey(Function2<V, V, V> func);
    public JavaPairRDD<K, V> reduceByKey(Function2<V, V, V> func, int numPartitions);
    public JavaPairRDD<K, V> reduceByKey(Partitioner partitioner, Function2<V, V, V> func);
    public Map<K, V> reduceByKeyLocally(Function2<V, V, V> func);
    
    public JavaPairRDD<K, V> foldByKey(V zeroValue, Function2<V, V, V> func);
    public JavaPairRDD<K, V> foldByKey(V zeroValue, int numPartitions, Function2<V, V, V> func);
    public JavaPairRDD<K, V> foldByKey(V zeroValue, Partitioner partitioner, Function2<V, V, V> func);
    
    public <U> JavaPairRDD<K, U> aggregateByKey(U zeroValue, Function2<U, V, U> seqFunc, Function2<U, U, U> combFunc);
    public <U> JavaPairRDD<K, U> aggregateByKey(U zeroValue, int numPartitions, Function2<U, V, U> seqFunc, Function2<U, U, U> combFunc);
    public <U> JavaPairRDD<K, U> aggregateByKey(U zeroValue, Partitioner partitioner, Function2<U, V, U> seqFunc, Function2<U, U, U> combFunc);
    
    public <C> JavaPairRDD<K, C> combineByKey(Function<V, C> createCombiner, Function2<C, V, C> mergeValue, Function2<C, C, C> mergeCombiners);
    public <C> JavaPairRDD<K, C> combineByKey(Function<V, C> createCombiner, Function2<C, V, C> mergeValue, Function2<C, C, C> mergeCombiners, int numPartitions);
    public <C> JavaPairRDD<K, C> combineByKey(Function<V, C> createCombiner, Function2<C, V, C> mergeValue, Function2<C, C, C> mergeCombiners, Partitioner partitioner);
    
    // Joins
    public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other);
    public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other, int numPartitions);
    public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other, Partitioner partitioner);
    
    public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairRDD<K, W> other);
    public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairRDD<K, W> other, int numPartitions);
    public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairRDD<K, W> other, Partitioner partitioner);
    
    public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairRDD<K, W> other);
    public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairRDD<K, W> other, int numPartitions);
    public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairRDD<K, W> other, Partitioner partitioner);
    
    public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairRDD<K, W> other);
    public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairRDD<K, W> other, int numPartitions);
    public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairRDD<K, W> other, Partitioner partitioner);
    
    // Cogroup
    public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>> cogroup(JavaPairRDD<K, W> other);
    public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>> cogroup(JavaPairRDD<K, W> other, int numPartitions);
    public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>> cogroup(JavaPairRDD<K, W> other, Partitioner partitioner);
    public <W1, W2> JavaPairRDD<K, Tuple3<Iterable<V>, Iterable<W1>, Iterable<W2>>> cogroup(JavaPairRDD<K, W1> other1, JavaPairRDD<K, W2> other2);
    public <W1, W2> JavaPairRDD<K, Tuple3<Iterable<V>, Iterable<W1>, Iterable<W2>>> cogroup(JavaPairRDD<K, W1> other1, JavaPairRDD<K, W2> other2, Partitioner partitioner);
    public <W1, W2, W3> JavaPairRDD<K, Tuple4<Iterable<V>, Iterable<W1>, Iterable<W2>, Iterable<W3>>> cogroup(JavaPairRDD<K, W1> other1, JavaPairRDD<K, W2> other2, JavaPairRDD<K, W3> other3);
    public <W1, W2, W3> JavaPairRDD<K, Tuple4<Iterable<V>, Iterable<W1>, Iterable<W2>, Iterable<W3>>> cogroup(JavaPairRDD<K, W1> other1, JavaPairRDD<K, W2> other2, JavaPairRDD<K, W3> other3, Partitioner partitioner);
    
    // Sorting and Partitioning
    public JavaPairRDD<K, V> sortByKey();
    public JavaPairRDD<K, V> sortByKey(boolean ascending);
    public JavaPairRDD<K, V> sortByKey(boolean ascending, int numPartitions);
    public JavaPairRDD<K, V> repartitionAndSortWithinPartitions(Partitioner partitioner);
    public JavaPairRDD<K, V> partitionBy(Partitioner partitioner);
    
    // Collection Operations
    public Map<K, V> collectAsMap();
    public Map<K, Long> countByKey();
    public PartialResult<Map<K, BoundedDouble>> countByKeyApprox(long timeout);
    public PartialResult<Map<K, BoundedDouble>> countByKeyApprox(long timeout, double confidence);
    public List<V> lookup(K key);
    
    // Subtraction
    public <W> JavaPairRDD<K, V> subtractByKey(JavaPairRDD<K, W> other);
    public <W> JavaPairRDD<K, V> subtractByKey(JavaPairRDD<K, W> other, int numPartitions);
    public <W> JavaPairRDD<K, V> subtractByKey(JavaPairRDD<K, W> other, Partitioner partitioner);
    
    // Conversion and Utility
    public JavaRDD<Tuple2<K, V>> rdd();
    public JavaDoubleRDD values();  // When V extends Number
    
    // Standard RDD operations (inherited)
    public List<Tuple2<K, V>> collect();
    public long count();
    public JavaPairRDD<K, V> persist(StorageLevel newLevel);
    public JavaPairRDD<K, V> cache();
    public void foreach(VoidFunction<Tuple2<K, V>> f);
    // ... other inherited operations
}

JavaDoubleRDD

Specialized RDD for double values with statistical operations.

public class JavaDoubleRDD {
    // Statistical Operations
    public StatCounter stats();
    public double mean();
    public double sum();
    public double variance();
    public double sampleVariance();
    public double stdev();
    public double sampleStdev();
    public Tuple2<double[], long[]> histogram(double[] buckets);
    public Tuple2<double[], long[]> histogram(int bucketCount);
    
    // Standard RDD Operations
    public JavaDoubleRDD map(DoubleFunction<Double> f);
    public JavaDoubleRDD filter(Function<Double, Boolean> f);
    public JavaDoubleRDD union(JavaDoubleRDD other);
    public JavaDoubleRDD distinct();
    public JavaDoubleRDD sample(boolean withReplacement, double fraction);
    public JavaDoubleRDD cache();
    public JavaDoubleRDD persist(StorageLevel newLevel);
    
    // Collection Operations
    public List<Double> collect();
    public double[] collectArray();
    public long count();
    public double first();
    public List<Double> take(int num);
    public void foreach(VoidFunction<Double> f);
}

Function Interfaces

All function interfaces extend Serializable and are marked with @FunctionalInterface.

@FunctionalInterface
public interface Function<T1, R> extends Serializable {
    R call(T1 v1) throws Exception;
}

@FunctionalInterface
public interface Function2<T1, T2, R> extends Serializable {
    R call(T1 v1, T2 v2) throws Exception;
}

@FunctionalInterface
public interface Function3<T1, T2, T3, R> extends Serializable {
    R call(T1 v1, T2 v2, T3 v3) throws Exception;
}

@FunctionalInterface
public interface Function4<T1, T2, T3, T4, R> extends Serializable {
    R call(T1 v1, T2 v2, T3 v3, T4 v4) throws Exception;
}

@FunctionalInterface
public interface VoidFunction<T> extends Serializable {
    void call(T t) throws Exception;
}

@FunctionalInterface
public interface VoidFunction2<T1, T2> extends Serializable {
    void call(T1 v1, T2 v2) throws Exception;
}

@FunctionalInterface
public interface PairFunction<T, K, V> extends Serializable {
    Tuple2<K, V> call(T t) throws Exception;
}

@FunctionalInterface
public interface FlatMapFunction<T, R> extends Serializable {
    Iterator<R> call(T t) throws Exception;
}

@FunctionalInterface
public interface PairFlatMapFunction<T, K, V> extends Serializable {
    Iterator<Tuple2<K, V>> call(T t) throws Exception;
}

@FunctionalInterface
public interface DoubleFunction<T> extends Serializable {
    double call(T t) throws Exception;
}

@FunctionalInterface
public interface DoubleFlatMapFunction<T> extends Serializable {
    Iterator<Double> call(T t) throws Exception;
}

@FunctionalInterface
public interface FilterFunction<T> extends Serializable {
    boolean call(T t) throws Exception;
}

@FunctionalInterface
public interface ForeachFunction<T> extends Serializable {
    void call(T t) throws Exception;
}

@FunctionalInterface
public interface ForeachPartitionFunction<T> extends Serializable {
    void call(Iterator<T> t) throws Exception;
}

@FunctionalInterface
public interface ReduceFunction<T> extends Serializable {
    T call(T v1, T v2) throws Exception;
}

@FunctionalInterface
public interface MapFunction<T, R> extends Serializable {
    R call(T value) throws Exception;
}

@FunctionalInterface
public interface MapPartitionsFunction<T, R> extends Serializable {
    Iterator<R> call(Iterator<T> input) throws Exception;
}

@FunctionalInterface
public interface CoGroupFunction<K, V, W, R> extends Serializable {
    Iterator<R> call(Tuple2<K, Tuple2<Iterable<V>, Iterable<W>>> t) throws Exception;
}

@FunctionalInterface
public interface FlatMapFunction2<T1, T2, R> extends Serializable {
    Iterator<R> call(T1 t1, T2 t2) throws Exception;
}

@FunctionalInterface
public interface MapGroupsFunction<K, V, R> extends Serializable {
    R call(K key, Iterator<V> values) throws Exception;
}

@FunctionalInterface
public interface FlatMapGroupsFunction<K, V, R> extends Serializable {
    Iterator<R> call(K key, Iterator<V> values) throws Exception;
}

Usage Examples

Basic Application Structure

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import scala.Tuple2;
import java.util.*;

public class SparkJavaExample {
    public static void main(String[] args) {
        // Configure Spark
        SparkConf conf = new SparkConf()
            .setAppName("Java Spark Example")
            .setMaster("local[*]");
        
        // Create Spark context
        JavaSparkContext jsc = new JavaSparkContext(conf);
        
        try {
            // Your Spark code here
            processData(jsc);
        } finally {
            // Always stop the context
            jsc.stop();
        }
    }
    
    private static void processData(JavaSparkContext jsc) {
        // Create RDD from collection
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        JavaRDD<Integer> numbersRDD = jsc.parallelize(numbers);
        
        // Transform and collect
        List<Integer> evenNumbers = numbersRDD
            .filter(n -> n % 2 == 0)
            .collect();
        
        System.out.println("Even numbers: " + evenNumbers);
    }
}

Word Count Example

public class WordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Word Count").setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        
        try {
            // Read text file
            JavaRDD<String> lines = jsc.textFile("input.txt");
            
            // Split lines into words and count
            JavaPairRDD<String, Integer> wordCounts = lines
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                .filter(word -> !word.isEmpty())
                .mapToPair(word -> new Tuple2<>(word.toLowerCase(), 1))
                .reduceByKey((a, b) -> a + b);
            
            // Sort by count descending
            JavaPairRDD<String, Integer> sortedCounts = wordCounts
                .mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1))  // Swap key-value
                .sortByKey(false)  // Sort descending
                .mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1)); // Swap back
            
            // Collect and print results
            List<Tuple2<String, Integer>> results = sortedCounts.take(10);
            for (Tuple2<String, Integer> result : results) {
                System.out.println(result._1 + ": " + result._2);
            }
            
        } finally {
            jsc.stop();
        }
    }
}

Complex Aggregation Example

public class SalesAnalysis {
    public static class SaleRecord implements Serializable {
        public final String product;
        public final String region;
        public final double amount;
        public final int quantity;
        
        public SaleRecord(String product, String region, double amount, int quantity) {
            this.product = product;
            this.region = region;
            this.amount = amount;
            this.quantity = quantity;
        }
    }
    
    public static class SalesStats implements Serializable {
        public final int count;
        public final double totalAmount;
        public final int totalQuantity;
        
        public SalesStats(int count, double totalAmount, int totalQuantity) {
            this.count = count;
            this.totalAmount = totalAmount;
            this.totalQuantity = totalQuantity;
        }
        
        public SalesStats combine(SalesStats other) {
            return new SalesStats(
                this.count + other.count,
                this.totalAmount + other.totalAmount,
                this.totalQuantity + other.totalQuantity
            );
        }
        
        public double averageAmount() {
            return totalAmount / count;
        }
    }
    
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Sales Analysis").setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        
        try {
            // Create sample sales data
            List<SaleRecord> salesData = Arrays.asList(
                new SaleRecord("Laptop", "North", 999.99, 1),
                new SaleRecord("Mouse", "North", 29.99, 3),
                new SaleRecord("Laptop", "South", 899.99, 2),
                new SaleRecord("Keyboard", "North", 79.99, 1),
                new SaleRecord("Mouse", "South", 24.99, 5)
            );
            
            JavaRDD<SaleRecord> salesRDD = jsc.parallelize(salesData);
            
            // Aggregate sales by product
            JavaPairRDD<String, SalesStats> productStats = salesRDD
                .mapToPair(sale -> new Tuple2<>(sale.product, 
                    new SalesStats(1, sale.amount * sale.quantity, sale.quantity)))
                .reduceByKey((stats1, stats2) -> stats1.combine(stats2));
            
            // Collect and display results
            Map<String, SalesStats> results = productStats.collectAsMap();
            results.forEach((product, stats) -> {
                System.out.printf("%s: Total Sales=%.2f, Average=%.2f, Units=%d%n",
                    product, stats.totalAmount, stats.averageAmount(), stats.totalQuantity);
            });
            
        } finally {
            jsc.stop();
        }
    }
}

Join Operations Example

public class CustomerOrderAnalysis {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Customer Order Analysis").setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        
        try {
            // Customer data
            List<Tuple2<Integer, String>> customerData = Arrays.asList(
                new Tuple2<>(1, "Alice Johnson"),
                new Tuple2<>(2, "Bob Smith"),
                new Tuple2<>(3, "Charlie Brown"),
                new Tuple2<>(4, "Diana Prince")
            );
            JavaPairRDD<Integer, String> customers = jsc.parallelizePairs(customerData);
            
            // Order data
            List<Tuple2<Integer, Double>> orderData = Arrays.asList(
                new Tuple2<>(1, 100.50),
                new Tuple2<>(1, 75.25),
                new Tuple2<>(2, 200.00),
                new Tuple2<>(3, 50.75),
                new Tuple2<>(1, 125.00),
                new Tuple2<>(5, 300.00)  // Customer 5 doesn't exist
            );
            JavaPairRDD<Integer, Double> orders = jsc.parallelizePairs(orderData);
            
            // Inner join - customers with orders
            JavaPairRDD<Integer, Tuple2<String, Double>> customerOrders = customers.join(orders);
            System.out.println("Customer Orders (Inner Join):");
            customerOrders.collect().forEach(entry -> {
                int customerId = entry._1;
                String customerName = entry._2._1;
                double orderAmount = entry._2._2;
                System.out.printf("Customer %d (%s): $%.2f%n", customerId, customerName, orderAmount);
            });
            
            // Left outer join - all customers, orders if they exist
            JavaPairRDD<Integer, Tuple2<String, Optional<Double>>> allCustomers = 
                customers.leftOuterJoin(orders);
            
            // Aggregate total orders per customer
            JavaPairRDD<Integer, Tuple2<String, Double>> customerTotals = allCustomers
                .mapValues(tuple -> {
                    String name = tuple._1;
                    double total = tuple._2.isPresent() ? tuple._2.get() : 0.0;
                    return new Tuple2<>(name, total);
                })
                .reduceByKey((tuple1, tuple2) -> new Tuple2<>(tuple1._1, tuple1._2 + tuple2._2));
            
            System.out.println("\nCustomer Totals:");
            customerTotals.collect().forEach(entry -> {
                int customerId = entry._1;
                String customerName = entry._2._1;
                double totalAmount = entry._2._2;
                System.out.printf("Customer %d (%s): Total $%.2f%n", customerId, customerName, totalAmount);
            });
            
        } finally {
            jsc.stop();
        }
    }
}

Using Broadcast Variables and Accumulators

public class BroadcastAccumulatorExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Broadcast Accumulator Example").setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        
        try {
            // Create lookup table to broadcast
            Map<String, String> categoryLookup = new HashMap<>();
            categoryLookup.put("TECH", "Technology");
            categoryLookup.put("BOOK", "Books");
            categoryLookup.put("HOME", "Home & Garden");
            
            org.apache.spark.broadcast.Broadcast<Map<String, String>> broadcastLookup = 
                jsc.broadcast(categoryLookup);
            
            // Create accumulators for metrics
            org.apache.spark.util.LongAccumulator processedCount = jsc.sc().longAccumulator("Processed Items");
            org.apache.spark.util.LongAccumulator errorCount = jsc.sc().longAccumulator("Error Count");
            
            // Sample data
            List<String> products = Arrays.asList(
                "TECH:Laptop:999.99",
                "BOOK:Java Programming:49.99",
                "HOME:Garden Hose:29.99",
                "INVALID:Bad Data",
                "TECH:Smartphone:699.99"
            );
            
            JavaRDD<String> productsRDD = jsc.parallelize(products);
            
            // Process data using broadcast and accumulators
            JavaRDD<String> processedProducts = productsRDD.map(product -> {
                try {
                    String[] parts = product.split(":");
                    if (parts.length >= 3) {
                        String categoryCode = parts[0];
                        String productName = parts[1];
                        String price = parts[2];
                        
                        String categoryName = broadcastLookup.value().getOrDefault(categoryCode, "Unknown");
                        processedCount.add(1);
                        
                        return String.format("%s - %s: %s", categoryName, productName, price);
                    } else {
                        errorCount.add(1);
                        return "ERROR: Invalid product format - " + product;
                    }
                } catch (Exception e) {
                    errorCount.add(1);
                    return "ERROR: Processing failed - " + product;
                }
            });
            
            // Trigger computation
            List<String> results = processedProducts.collect();
            
            // Display results
            System.out.println("Processed Products:");
            results.forEach(System.out::println);
            
            // Display metrics
            System.out.println("\nMetrics:");
            System.out.println("Processed items: " + processedCount.value());
            System.out.println("Error count: " + errorCount.value());
            
        } finally {
            jsc.stop();
        }
    }
}

Best Practices for Java API

Lambda Expressions vs Anonymous Classes

// Prefer lambda expressions (Java 8+)
JavaRDD<String> upperCase = textRDD.map(s -> s.toUpperCase());

// Instead of anonymous classes
JavaRDD<String> upperCaseOld = textRDD.map(new Function<String, String>() {
    @Override
    public String call(String s) {
        return s.toUpperCase();
    }
});

Serialization Considerations

// Ensure all objects used in transformations are Serializable
public class ProcessingUtils implements Serializable {
    private final String prefix;
    
    public ProcessingUtils(String prefix) {
        this.prefix = prefix;
    }
    
    public String process(String input) {
        return prefix + ": " + input.toUpperCase();
    }
}

// Use in transformations
ProcessingUtils utils = new ProcessingUtils("PROCESSED");
JavaRDD<String> processed = inputRDD.map(utils::process);

Memory Management

// Persist expensive computations
JavaRDD<ComplexObject> expensiveRDD = inputRDD
    .map(this::expensiveTransformation)
    .filter(obj -> obj.isValid())
    .persist(StorageLevel.MEMORY_AND_DISK_SER());

// Use the persisted RDD multiple times
long count = expensiveRDD.count();
List<ComplexObject> sample = expensiveRDD.take(10);

// Clean up when done
expensiveRDD.unpersist();

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-core-2-11

docs

broadcast-accumulators.md

context-configuration.md

index.md

java-api.md

key-value-operations.md

rdd-operations.md

status-monitoring.md

storage-persistence.md

task-context.md

tile.json