Apache Spark Core - The foundational component of Apache Spark providing distributed computing capabilities including RDDs, transformations, actions, and cluster management.
—
Spark's Java API provides Java-friendly wrappers around the core Scala RDD API. It uses Java collections and functional interfaces to integrate seamlessly with Java applications while maintaining type safety and performance.
The main entry point for Java applications using Spark.
public class JavaSparkContext {
// Constructors
public JavaSparkContext();
public JavaSparkContext(String master, String appName);
public JavaSparkContext(String master, String appName, String sparkHome, String[] jars);
public JavaSparkContext(SparkConf conf);
public JavaSparkContext(SparkContext sc);
// RDD Creation
public <T> JavaRDD<T> parallelize(List<T> list);
public <T> JavaRDD<T> parallelize(List<T> list, int numSlices);
public <K, V> JavaPairRDD<K, V> parallelizePairs(List<Tuple2<K, V>> list);
public <K, V> JavaPairRDD<K, V> parallelizePairs(List<Tuple2<K, V>> list, int numSlices);
public JavaRDD<Long> range(long start, long end);
public JavaRDD<Long> range(long start, long end, long step);
public JavaRDD<Long> range(long start, long end, long step, int numSlices);
// File Input
public JavaRDD<String> textFile(String path);
public JavaRDD<String> textFile(String path, int minPartitions);
public JavaPairRDD<String, String> wholeTextFiles(String path);
public JavaPairRDD<String, String> wholeTextFiles(String path, int minPartitions);
public JavaPairRDD<String, PortableDataStream> binaryFiles(String path);
public JavaPairRDD<String, PortableDataStream> binaryFiles(String path, int minPartitions);
// Hadoop Integration
public <K, V, F extends InputFormat<K, V>> JavaPairRDD<K, V> hadoopRDD(
JobConf conf, Class<F> inputFormatClass, Class<K> keyClass, Class<V> valueClass);
public <K, V, F extends InputFormat<K, V>> JavaPairRDD<K, V> hadoopRDD(
JobConf conf, Class<F> inputFormatClass, Class<K> keyClass, Class<V> valueClass, int minPartitions);
public <K, V, F extends NewInputFormat<K, V>> JavaPairRDD<K, V> newAPIHadoopRDD(
Configuration conf, Class<F> fClass, Class<K> kClass, Class<V> vClass);
// Broadcast and Accumulators
public <T> Broadcast<T> broadcast(T value);
public LongAccumulator longAccumulator();
public LongAccumulator longAccumulator(String name);
public DoubleAccumulator doubleAccumulator();
public DoubleAccumulator doubleAccumulator(String name);
public <T> CollectionAccumulator<T> collectionAccumulator();
public <T> CollectionAccumulator<T> collectionAccumulator(String name);
// Application Control
public void stop();
public void addFile(String path);
public void addJar(String path);
public void setLogLevel(String logLevel);
public void setJobGroup(String groupId, String description);
public void setJobGroup(String groupId, String description, boolean interruptOnCancel);
public void clearJobGroup();
public void setLocalProperty(String key, String value);
public String getLocalProperty(String key);
// Properties
public SparkConf getConf();
public String master();
public String appName();
public List<String> jars();
public long startTime();
public String version();
public int defaultParallelism();
public SparkStatusTracker statusTracker();
}
Java wrapper for RDD providing type-safe operations.
public class JavaRDD<T> {
// Transformations
public <R> JavaRDD<R> map(Function<T, R> f);
public <U> JavaRDD<U> flatMap(FlatMapFunction<T, U> f);
public JavaRDD<T> filter(Function<T, Boolean> f);
public JavaRDD<T> distinct();
public JavaRDD<T> distinct(int numPartitions);
public JavaRDD<T> union(JavaRDD<T> other);
public JavaRDD<T> intersection(JavaRDD<T> other);
public JavaRDD<T> subtract(JavaRDD<T> other);
public <U> JavaPairRDD<T, U> cartesian(JavaRDD<U> other);
public <U> JavaPairRDD<T, U> zip(JavaRDD<U> other);
public JavaPairRDD<T, Long> zipWithIndex();
public JavaPairRDD<T, Long> zipWithUniqueId();
public JavaRDD<T> sample(boolean withReplacement, double fraction);
public JavaRDD<T> sample(boolean withReplacement, double fraction, long seed);
public JavaRDD<T>[] randomSplit(double[] weights);
public JavaRDD<T>[] randomSplit(double[] weights, long seed);
public JavaRDD<T> repartition(int numPartitions);
public JavaRDD<T> coalesce(int numPartitions);
public JavaRDD<T> coalesce(int numPartitions, boolean shuffle);
public <S> JavaRDD<T> sortBy(Function<T, S> f, boolean ascending, int numPartitions);
public JavaRDD<T[]> glom();
public <U> JavaRDD<U> mapPartitions(FlatMapFunction<Iterator<T>, U> f);
public <U> JavaRDD<U> mapPartitions(FlatMapFunction<Iterator<T>, U> f, boolean preservesPartitioning);
public <U> JavaRDD<U> mapPartitionsWithIndex(Function2<Integer, Iterator<T>, Iterator<U>> f, boolean preservesPartitioning);
public JavaRDD<String> pipe(String command);
public JavaRDD<String> pipe(List<String> command);
// Actions
public List<T> collect();
public Iterator<T> toLocalIterator();
public long count();
public PartialResult<BoundedDouble> countApprox(long timeout);
public PartialResult<BoundedDouble> countApprox(long timeout, double confidence);
public long countApproxDistinct();
public long countApproxDistinct(double relativeSD);
public Map<T, Long> countByValue();
public T first();
public boolean isEmpty();
public List<T> take(int num);
public List<T> takeOrdered(int num);
public List<T> takeOrdered(int num, Comparator<T> comp);
public List<T> takeSample(boolean withReplacement, int num);
public List<T> takeSample(boolean withReplacement, int num, long seed);
public List<T> top(int num);
public List<T> top(int num, Comparator<T> comp);
public T reduce(Function2<T, T, T> f);
public T fold(T zeroValue, Function2<T, T, T> f);
public <U> U aggregate(U zeroValue, Function2<U, T, U> seqOp, Function2<U, U, U> combOp);
public <U> U treeAggregate(U zeroValue, Function2<U, T, U> seqOp, Function2<U, U, U> combOp);
public <U> U treeAggregate(U zeroValue, Function2<U, T, U> seqOp, Function2<U, U, U> combOp, int depth);
public void foreach(VoidFunction<T> f);
public void foreachPartition(VoidFunction<Iterator<T>> f);
// Persistence
public JavaRDD<T> persist(StorageLevel newLevel);
public JavaRDD<T> cache();
public JavaRDD<T> unpersist();
public JavaRDD<T> unpersist(boolean blocking);
public void checkpoint();
// Conversion
public <K, V> JavaPairRDD<K, V> mapToPair(PairFunction<T, K, V> f);
public JavaDoubleRDD mapToDouble(DoubleFunction<T> f);
public JavaDoubleRDD flatMapToDouble(DoubleFlatMapFunction<T> f);
public <K, V> JavaPairRDD<K, V> flatMapToPair(PairFlatMapFunction<T, K, V> f);
// Information
public int getNumPartitions();
public StorageLevel getStorageLevel();
public boolean isCheckpointed();
public String name();
public JavaRDD<T> setName(String name);
public String toDebugString();
public SparkContext context();
}
Java wrapper for key-value RDDs.
public class JavaPairRDD<K, V> {
// Basic Operations
public JavaRDD<K> keys();
public JavaRDD<V> values();
public <U> JavaPairRDD<K, U> mapValues(Function<V, U> f);
public <U> JavaPairRDD<K, U> flatMapValues(FlatMapFunction<V, U> f);
public JavaPairRDD<V, K> swap();
// Grouping
public JavaPairRDD<K, Iterable<V>> groupByKey();
public JavaPairRDD<K, Iterable<V>> groupByKey(int numPartitions);
public JavaPairRDD<K, Iterable<V>> groupByKey(Partitioner partitioner);
// Reduction
public JavaPairRDD<K, V> reduceByKey(Function2<V, V, V> func);
public JavaPairRDD<K, V> reduceByKey(Function2<V, V, V> func, int numPartitions);
public JavaPairRDD<K, V> reduceByKey(Partitioner partitioner, Function2<V, V, V> func);
public Map<K, V> reduceByKeyLocally(Function2<V, V, V> func);
public JavaPairRDD<K, V> foldByKey(V zeroValue, Function2<V, V, V> func);
public JavaPairRDD<K, V> foldByKey(V zeroValue, int numPartitions, Function2<V, V, V> func);
public JavaPairRDD<K, V> foldByKey(V zeroValue, Partitioner partitioner, Function2<V, V, V> func);
public <U> JavaPairRDD<K, U> aggregateByKey(U zeroValue, Function2<U, V, U> seqFunc, Function2<U, U, U> combFunc);
public <U> JavaPairRDD<K, U> aggregateByKey(U zeroValue, int numPartitions, Function2<U, V, U> seqFunc, Function2<U, U, U> combFunc);
public <U> JavaPairRDD<K, U> aggregateByKey(U zeroValue, Partitioner partitioner, Function2<U, V, U> seqFunc, Function2<U, U, U> combFunc);
public <C> JavaPairRDD<K, C> combineByKey(Function<V, C> createCombiner, Function2<C, V, C> mergeValue, Function2<C, C, C> mergeCombiners);
public <C> JavaPairRDD<K, C> combineByKey(Function<V, C> createCombiner, Function2<C, V, C> mergeValue, Function2<C, C, C> mergeCombiners, int numPartitions);
public <C> JavaPairRDD<K, C> combineByKey(Function<V, C> createCombiner, Function2<C, V, C> mergeValue, Function2<C, C, C> mergeCombiners, Partitioner partitioner);
// Joins
public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other);
public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other, int numPartitions);
public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other, Partitioner partitioner);
public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairRDD<K, W> other);
public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairRDD<K, W> other, int numPartitions);
public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairRDD<K, W> other, Partitioner partitioner);
public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairRDD<K, W> other);
public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairRDD<K, W> other, int numPartitions);
public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairRDD<K, W> other, Partitioner partitioner);
public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairRDD<K, W> other);
public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairRDD<K, W> other, int numPartitions);
public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairRDD<K, W> other, Partitioner partitioner);
// Cogroup
public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>> cogroup(JavaPairRDD<K, W> other);
public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>> cogroup(JavaPairRDD<K, W> other, int numPartitions);
public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>> cogroup(JavaPairRDD<K, W> other, Partitioner partitioner);
public <W1, W2> JavaPairRDD<K, Tuple3<Iterable<V>, Iterable<W1>, Iterable<W2>>> cogroup(JavaPairRDD<K, W1> other1, JavaPairRDD<K, W2> other2);
public <W1, W2> JavaPairRDD<K, Tuple3<Iterable<V>, Iterable<W1>, Iterable<W2>>> cogroup(JavaPairRDD<K, W1> other1, JavaPairRDD<K, W2> other2, Partitioner partitioner);
public <W1, W2, W3> JavaPairRDD<K, Tuple4<Iterable<V>, Iterable<W1>, Iterable<W2>, Iterable<W3>>> cogroup(JavaPairRDD<K, W1> other1, JavaPairRDD<K, W2> other2, JavaPairRDD<K, W3> other3);
public <W1, W2, W3> JavaPairRDD<K, Tuple4<Iterable<V>, Iterable<W1>, Iterable<W2>, Iterable<W3>>> cogroup(JavaPairRDD<K, W1> other1, JavaPairRDD<K, W2> other2, JavaPairRDD<K, W3> other3, Partitioner partitioner);
// Sorting and Partitioning
public JavaPairRDD<K, V> sortByKey();
public JavaPairRDD<K, V> sortByKey(boolean ascending);
public JavaPairRDD<K, V> sortByKey(boolean ascending, int numPartitions);
public JavaPairRDD<K, V> repartitionAndSortWithinPartitions(Partitioner partitioner);
public JavaPairRDD<K, V> partitionBy(Partitioner partitioner);
// Collection Operations
public Map<K, V> collectAsMap();
public Map<K, Long> countByKey();
public PartialResult<Map<K, BoundedDouble>> countByKeyApprox(long timeout);
public PartialResult<Map<K, BoundedDouble>> countByKeyApprox(long timeout, double confidence);
public List<V> lookup(K key);
// Subtraction
public <W> JavaPairRDD<K, V> subtractByKey(JavaPairRDD<K, W> other);
public <W> JavaPairRDD<K, V> subtractByKey(JavaPairRDD<K, W> other, int numPartitions);
public <W> JavaPairRDD<K, V> subtractByKey(JavaPairRDD<K, W> other, Partitioner partitioner);
// Conversion and Utility
public JavaRDD<Tuple2<K, V>> rdd();
public JavaDoubleRDD values(); // When V extends Number. Note: illustrative only — Java cannot overload values() by return type alone (JavaRDD&lt;V&gt; values() is declared above); in practice convert via mapToDouble.
// Standard RDD operations (inherited)
public List<Tuple2<K, V>> collect();
public long count();
public JavaPairRDD<K, V> persist(StorageLevel newLevel);
public JavaPairRDD<K, V> cache();
public void foreach(VoidFunction<Tuple2<K, V>> f);
// ... other inherited operations
}
Specialized RDD for double values with statistical operations.
public class JavaDoubleRDD {
// Statistical Operations
public StatCounter stats();
public double mean();
public double sum();
public double variance();
public double sampleVariance();
public double stdev();
public double sampleStdev();
public Tuple2<double[], long[]> histogram(double[] buckets);
public Tuple2<double[], long[]> histogram(int bucketCount);
// Standard RDD Operations
public JavaDoubleRDD map(DoubleFunction<Double> f);
public JavaDoubleRDD filter(Function<Double, Boolean> f);
public JavaDoubleRDD union(JavaDoubleRDD other);
public JavaDoubleRDD distinct();
public JavaDoubleRDD sample(boolean withReplacement, double fraction);
public JavaDoubleRDD cache();
public JavaDoubleRDD persist(StorageLevel newLevel);
// Collection Operations
public List<Double> collect();
public double[] collectArray();
public long count();
public double first();
public List<Double> take(int num);
public void foreach(VoidFunction<Double> f);
}
All function interfaces extend Serializable and are marked with @FunctionalInterface.
@FunctionalInterface
public interface Function<T1, R> extends Serializable {
R call(T1 v1) throws Exception;
}
@FunctionalInterface
public interface Function2<T1, T2, R> extends Serializable {
R call(T1 v1, T2 v2) throws Exception;
}
@FunctionalInterface
public interface Function3<T1, T2, T3, R> extends Serializable {
R call(T1 v1, T2 v2, T3 v3) throws Exception;
}
@FunctionalInterface
public interface Function4<T1, T2, T3, T4, R> extends Serializable {
R call(T1 v1, T2 v2, T3 v3, T4 v4) throws Exception;
}
@FunctionalInterface
public interface VoidFunction<T> extends Serializable {
void call(T t) throws Exception;
}
@FunctionalInterface
public interface VoidFunction2<T1, T2> extends Serializable {
void call(T1 v1, T2 v2) throws Exception;
}
@FunctionalInterface
public interface PairFunction<T, K, V> extends Serializable {
Tuple2<K, V> call(T t) throws Exception;
}
@FunctionalInterface
public interface FlatMapFunction<T, R> extends Serializable {
Iterator<R> call(T t) throws Exception;
}
@FunctionalInterface
public interface PairFlatMapFunction<T, K, V> extends Serializable {
Iterator<Tuple2<K, V>> call(T t) throws Exception;
}
@FunctionalInterface
public interface DoubleFunction<T> extends Serializable {
double call(T t) throws Exception;
}
@FunctionalInterface
public interface DoubleFlatMapFunction<T> extends Serializable {
Iterator<Double> call(T t) throws Exception;
}
@FunctionalInterface
public interface FilterFunction<T> extends Serializable {
boolean call(T t) throws Exception;
}
@FunctionalInterface
public interface ForeachFunction<T> extends Serializable {
void call(T t) throws Exception;
}
@FunctionalInterface
public interface ForeachPartitionFunction<T> extends Serializable {
void call(Iterator<T> t) throws Exception;
}
@FunctionalInterface
public interface ReduceFunction<T> extends Serializable {
T call(T v1, T v2) throws Exception;
}
@FunctionalInterface
public interface MapFunction<T, R> extends Serializable {
R call(T value) throws Exception;
}
@FunctionalInterface
public interface MapPartitionsFunction<T, R> extends Serializable {
Iterator<R> call(Iterator<T> input) throws Exception;
}
@FunctionalInterface
public interface CoGroupFunction<K, V, W, R> extends Serializable {
Iterator<R> call(Tuple2<K, Tuple2<Iterable<V>, Iterable<W>>> t) throws Exception;
}
@FunctionalInterface
public interface FlatMapFunction2<T1, T2, R> extends Serializable {
Iterator<R> call(T1 t1, T2 t2) throws Exception;
}
@FunctionalInterface
public interface MapGroupsFunction<K, V, R> extends Serializable {
R call(K key, Iterator<V> values) throws Exception;
}
@FunctionalInterface
public interface FlatMapGroupsFunction<K, V, R> extends Serializable {
Iterator<R> call(K key, Iterator<V> values) throws Exception;
}
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import scala.Tuple2;
import java.util.*;
public class SparkJavaExample {
public static void main(String[] args) {
    // Build the Spark configuration: application name plus a local master
    // that uses every available core on this machine.
    final SparkConf sparkConf = new SparkConf()
            .setAppName("Java Spark Example")
            .setMaster("local[*]");

    // The JavaSparkContext is the entry point for all RDD operations.
    final JavaSparkContext context = new JavaSparkContext(sparkConf);
    try {
        processData(context); // Your Spark code here
    } finally {
        // Always stop the context so cluster resources are released,
        // even if processing throws.
        context.stop();
    }
}
// Demonstrates the basic create -> transform -> action cycle on an RDD.
private static void processData(JavaSparkContext jsc) {
    // Distribute a small in-memory collection across the cluster as an RDD.
    final List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    final JavaRDD<Integer> distributed = jsc.parallelize(input);

    // filter() is a lazy transformation; collect() is the action that
    // actually runs the job and brings the results back to the driver.
    final List<Integer> evens = distributed
            .filter(value -> value % 2 == 0)
            .collect();
    System.out.println("Even numbers: " + evens);
}
}
public class WordCount {
// Entry point: counts word frequencies in input.txt and prints the top 10.
public static void main(String[] args) {
// Run locally using all available cores.
SparkConf conf = new SparkConf().setAppName("Word Count").setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(conf);
try {
// Read text file
// Each RDD element is one line of the file; nothing is read until an action runs.
JavaRDD<String> lines = jsc.textFile("input.txt");
// Split lines into words and count
// FlatMapFunction returns an Iterator (see the interface listing above), hence .iterator().
// NOTE(review): splitting on a single space keeps tab-separated tokens intact and
// the isEmpty() filter only drops fully empty strings, not whitespace tokens.
// NOTE(review): toLowerCase() uses the default locale; Locale.ROOT would be
// locale-independent — confirm intent.
JavaPairRDD<String, Integer> wordCounts = lines
.flatMap(line -> Arrays.asList(line.split(" ")).iterator())
.filter(word -> !word.isEmpty())
.mapToPair(word -> new Tuple2<>(word.toLowerCase(), 1))
.reduceByKey((a, b) -> a + b);
// Sort by count descending
// sortByKey only orders by the key, so swap (word, count) -> (count, word) first.
JavaPairRDD<String, Integer> sortedCounts = wordCounts
.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1)) // Swap key-value
.sortByKey(false) // Sort descending
.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1)); // Swap back
// Collect and print results
// take(10) transfers only the first 10 elements to the driver.
List<Tuple2<String, Integer>> results = sortedCounts.take(10);
for (Tuple2<String, Integer> result : results) {
System.out.println(result._1 + ": " + result._2);
}
} finally {
// Release the context's resources whether or not the job succeeded.
jsc.stop();
}
}
}
public class SalesAnalysis {
// Immutable value object for a single sale. Serializable so Spark can ship
// instances between the driver and executors.
public static class SaleRecord implements Serializable {
public final String product; // product name, e.g. "Laptop"
public final String region; // sales region, e.g. "North"
public final double amount; // per-unit price (multiplied by quantity downstream)
public final int quantity; // units sold in this transaction
// All fields are final; instances cannot change after construction.
public SaleRecord(String product, String region, double amount, int quantity) {
this.product = product;
this.region = region;
this.amount = amount;
this.quantity = quantity;
}
}
// Aggregated sales metrics for one key. Serializable for use inside Spark closures.
public static class SalesStats implements Serializable {
public final int count; // number of sale records folded into this aggregate
public final double totalAmount; // sum of amount * quantity over those records
public final int totalQuantity; // sum of quantities over those records
public SalesStats(int count, double totalAmount, int totalQuantity) {
this.count = count;
this.totalAmount = totalAmount;
this.totalQuantity = totalQuantity;
}
// Field-wise merge of two aggregates. Associative and commutative, which is
// what reduceByKey requires for correct partial aggregation.
public SalesStats combine(SalesStats other) {
return new SalesStats(
this.count + other.count,
this.totalAmount + other.totalAmount,
this.totalQuantity + other.totalQuantity
);
}
// Mean sale amount per record.
// NOTE(review): yields NaN or Infinity when count == 0; callers in this file
// always construct stats with count >= 1, so this is unreachable here.
public double averageAmount() {
return totalAmount / count;
}
}
// Aggregates per-product sales totals from an in-memory sample data set.
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("Sales Analysis").setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(conf);
try {
// Create sample sales data
List<SaleRecord> salesData = Arrays.asList(
new SaleRecord("Laptop", "North", 999.99, 1),
new SaleRecord("Mouse", "North", 29.99, 3),
new SaleRecord("Laptop", "South", 899.99, 2),
new SaleRecord("Keyboard", "North", 79.99, 1),
new SaleRecord("Mouse", "South", 24.99, 5)
);
JavaRDD<SaleRecord> salesRDD = jsc.parallelize(salesData);
// Aggregate sales by product
// Map each record to (product, single-record stats), then merge per key.
// SalesStats.combine is associative/commutative, as reduceByKey requires.
JavaPairRDD<String, SalesStats> productStats = salesRDD
.mapToPair(sale -> new Tuple2<>(sale.product,
new SalesStats(1, sale.amount * sale.quantity, sale.quantity)))
.reduceByKey((stats1, stats2) -> stats1.combine(stats2));
// Collect and display results
// collectAsMap() pulls the entire result to the driver — appropriate only
// because the keyed result here is tiny.
Map<String, SalesStats> results = productStats.collectAsMap();
results.forEach((product, stats) -> {
System.out.printf("%s: Total Sales=%.2f, Average=%.2f, Units=%d%n",
product, stats.totalAmount, stats.averageAmount(), stats.totalQuantity);
});
} finally {
jsc.stop();
}
}
}
public class CustomerOrderAnalysis {
// Demonstrates inner and left-outer joins between a customer table and an
// order table keyed by customer id, then aggregates order totals per customer.
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("Customer Order Analysis").setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(conf);
try {
// Customer data
// (customerId, customerName) — customer 4 intentionally has no orders.
List<Tuple2<Integer, String>> customerData = Arrays.asList(
new Tuple2<>(1, "Alice Johnson"),
new Tuple2<>(2, "Bob Smith"),
new Tuple2<>(3, "Charlie Brown"),
new Tuple2<>(4, "Diana Prince")
);
JavaPairRDD<Integer, String> customers = jsc.parallelizePairs(customerData);
// Order data
// (customerId, orderAmount) — multiple orders per customer are allowed.
List<Tuple2<Integer, Double>> orderData = Arrays.asList(
new Tuple2<>(1, 100.50),
new Tuple2<>(1, 75.25),
new Tuple2<>(2, 200.00),
new Tuple2<>(3, 50.75),
new Tuple2<>(1, 125.00),
new Tuple2<>(5, 300.00) // Customer 5 doesn't exist
);
JavaPairRDD<Integer, Double> orders = jsc.parallelizePairs(orderData);
// Inner join - customers with orders
// Produces one pair per (customer, order) match; customer 4 and the order
// for unknown customer 5 are both dropped.
JavaPairRDD<Integer, Tuple2<String, Double>> customerOrders = customers.join(orders);
System.out.println("Customer Orders (Inner Join):");
customerOrders.collect().forEach(entry -> {
int customerId = entry._1;
String customerName = entry._2._1;
double orderAmount = entry._2._2;
System.out.printf("Customer %d (%s): $%.2f%n", customerId, customerName, orderAmount);
});
// Left outer join - all customers, orders if they exist
// One entry per matched order, plus one entry with an absent Optional for
// customers without orders (presumably org.apache.spark.api.java.Optional,
// per the API listing above — confirm import).
JavaPairRDD<Integer, Tuple2<String, Optional<Double>>> allCustomers =
customers.leftOuterJoin(orders);
// Aggregate total orders per customer
// mapValues turns an absent order into 0.0; reduceByKey then sums the
// per-order totals. The name from the first tuple is kept on merge —
// correct because every entry for a key carries the same name.
JavaPairRDD<Integer, Tuple2<String, Double>> customerTotals = allCustomers
.mapValues(tuple -> {
String name = tuple._1;
double total = tuple._2.isPresent() ? tuple._2.get() : 0.0;
return new Tuple2<>(name, total);
})
.reduceByKey((tuple1, tuple2) -> new Tuple2<>(tuple1._1, tuple1._2 + tuple2._2));
System.out.println("\nCustomer Totals:");
customerTotals.collect().forEach(entry -> {
int customerId = entry._1;
String customerName = entry._2._1;
double totalAmount = entry._2._2;
System.out.printf("Customer %d (%s): Total $%.2f%n", customerId, customerName, totalAmount);
});
} finally {
jsc.stop();
}
}
}
public class BroadcastAccumulatorExample {
// Demonstrates a broadcast lookup table plus accumulators for job-level metrics.
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("Broadcast Accumulator Example").setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(conf);
try {
// Create lookup table to broadcast
// Broadcast variables ship one read-only copy of this map to each executor
// instead of serializing it into every task closure.
Map<String, String> categoryLookup = new HashMap<>();
categoryLookup.put("TECH", "Technology");
categoryLookup.put("BOOK", "Books");
categoryLookup.put("HOME", "Home & Garden");
org.apache.spark.broadcast.Broadcast<Map<String, String>> broadcastLookup =
jsc.broadcast(categoryLookup);
// Create accumulators for metrics
// Registered via the underlying SparkContext; named accumulators show up in the UI.
org.apache.spark.util.LongAccumulator processedCount = jsc.sc().longAccumulator("Processed Items");
org.apache.spark.util.LongAccumulator errorCount = jsc.sc().longAccumulator("Error Count");
// Sample data
// Format: CATEGORY:name:price — the fourth entry is deliberately malformed.
List<String> products = Arrays.asList(
"TECH:Laptop:999.99",
"BOOK:Java Programming:49.99",
"HOME:Garden Hose:29.99",
"INVALID:Bad Data",
"TECH:Smartphone:699.99"
);
JavaRDD<String> productsRDD = jsc.parallelize(products);
// Process data using broadcast and accumulators
// NOTE(review): these accumulators are updated inside a transformation (map).
// Per the Spark programming guide, exactly-once accumulator semantics are
// guaranteed only for updates made in actions; a retried or re-executed task
// can double-count here. Acceptable for a demo — confirm for production use.
JavaRDD<String> processedProducts = productsRDD.map(product -> {
try {
String[] parts = product.split(":");
if (parts.length >= 3) {
String categoryCode = parts[0];
String productName = parts[1];
String price = parts[2];
// Read the broadcast value on the executor; unknown codes fall back to "Unknown".
String categoryName = broadcastLookup.value().getOrDefault(categoryCode, "Unknown");
processedCount.add(1);
return String.format("%s - %s: %s", categoryName, productName, price);
} else {
errorCount.add(1);
return "ERROR: Invalid product format - " + product;
}
} catch (Exception e) {
errorCount.add(1);
return "ERROR: Processing failed - " + product;
}
});
// Trigger computation
// map() is lazy; collect() runs the job, which is what populates the accumulators.
List<String> results = processedProducts.collect();
// Display results
System.out.println("Processed Products:");
results.forEach(System.out::println);
// Display metrics
// Accumulator values are only reliable on the driver after the action completes.
System.out.println("\nMetrics:");
System.out.println("Processed items: " + processedCount.value());
System.out.println("Error count: " + errorCount.value());
} finally {
jsc.stop();
}
}
}
// Prefer lambda expressions (Java 8+)
JavaRDD<String> upperCase = textRDD.map(s -> s.toUpperCase());
// Instead of anonymous classes
JavaRDD<String> upperCaseOld = textRDD.map(new Function<String, String>() {
@Override
public String call(String s) {
return s.toUpperCase();
}
});
// Ensure all objects used in transformations are Serializable
// Serializable helper so instances can be captured in Spark closures and
// shipped to executors.
public class ProcessingUtils implements Serializable {
    private final String tag;

    public ProcessingUtils(String prefix) {
        this.tag = prefix;
    }

    // Upper-cases the input and prepends the configured prefix, e.g. "PROCESSED: ABC".
    public String process(String input) {
        return String.format("%s: %s", tag, input.toUpperCase());
    }
}
// Use in transformations
ProcessingUtils utils = new ProcessingUtils("PROCESSED");
JavaRDD<String> processed = inputRDD.map(utils::process);
// Persist expensive computations
JavaRDD<ComplexObject> expensiveRDD = inputRDD
.map(this::expensiveTransformation)
.filter(obj -> obj.isValid())
.persist(StorageLevel.MEMORY_AND_DISK_SER());
// Use the persisted RDD multiple times
long count = expensiveRDD.count();
List<ComplexObject> sample = expensiveRDD.take(10);
// Clean up when done
expensiveRDD.unpersist();

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-core-2-11