# Java API

Spark's Java API provides Java-friendly wrappers around the core Scala RDD API. It uses Java collections and functional interfaces to integrate seamlessly with Java applications while maintaining type safety and performance.

## Core Java Classes

### JavaSparkContext

The main entry point for Java applications using Spark.
```java { .api }
public class JavaSparkContext {
    // Constructors
    public JavaSparkContext();
    public JavaSparkContext(String master, String appName);
    public JavaSparkContext(String master, String appName, String sparkHome, String[] jars);
    public JavaSparkContext(SparkConf conf);
    public JavaSparkContext(SparkContext sc);

    // RDD Creation
    public <T> JavaRDD<T> parallelize(List<T> list);
    public <T> JavaRDD<T> parallelize(List<T> list, int numSlices);
    public <K, V> JavaPairRDD<K, V> parallelizePairs(List<Tuple2<K, V>> list);
    public <K, V> JavaPairRDD<K, V> parallelizePairs(List<Tuple2<K, V>> list, int numSlices);
    public JavaRDD<Long> range(long start, long end);
    public JavaRDD<Long> range(long start, long end, long step);
    public JavaRDD<Long> range(long start, long end, long step, int numSlices);

    // File Input
    public JavaRDD<String> textFile(String path);
    public JavaRDD<String> textFile(String path, int minPartitions);
    public JavaPairRDD<String, String> wholeTextFiles(String path);
    public JavaPairRDD<String, String> wholeTextFiles(String path, int minPartitions);
    public JavaPairRDD<String, PortableDataStream> binaryFiles(String path);
    public JavaPairRDD<String, PortableDataStream> binaryFiles(String path, int minPartitions);

    // Hadoop Integration
    public <K, V, F extends InputFormat<K, V>> JavaPairRDD<K, V> hadoopRDD(
        JobConf conf, Class<F> inputFormatClass, Class<K> keyClass, Class<V> valueClass);
    public <K, V, F extends InputFormat<K, V>> JavaPairRDD<K, V> hadoopRDD(
        JobConf conf, Class<F> inputFormatClass, Class<K> keyClass, Class<V> valueClass, int minPartitions);
    public <K, V, F extends NewInputFormat<K, V>> JavaPairRDD<K, V> newAPIHadoopRDD(
        Configuration conf, Class<F> fClass, Class<K> kClass, Class<V> vClass);

    // Broadcast and Accumulators
    public <T> Broadcast<T> broadcast(T value);
    public LongAccumulator longAccumulator();
    public LongAccumulator longAccumulator(String name);
    public DoubleAccumulator doubleAccumulator();
    public DoubleAccumulator doubleAccumulator(String name);
    public <T> CollectionAccumulator<T> collectionAccumulator();
    public <T> CollectionAccumulator<T> collectionAccumulator(String name);

    // Application Control
    public void stop();
    public void addFile(String path);
    public void addJar(String path);
    public void setLogLevel(String logLevel);
    public void setJobGroup(String groupId, String description);
    public void setJobGroup(String groupId, String description, boolean interruptOnCancel);
    public void clearJobGroup();
    public void setLocalProperty(String key, String value);
    public String getLocalProperty(String key);

    // Properties
    public SparkConf getConf();
    public String master();
    public String appName();
    public List<String> jars();
    public long startTime();
    public String version();
    public int defaultParallelism();
    public SparkStatusTracker statusTracker();
}
```
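A brief sketch of the creation methods above, assuming a local master and the imports shown in the Usage Examples section; the `data/` paths are placeholders:

```java
SparkConf conf = new SparkConf().setAppName("Context Sketch").setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(conf);

// From a collection, with an explicit slice count
JavaRDD<Integer> nums = jsc.parallelize(Arrays.asList(1, 2, 3, 4), 2);

// Key-value pairs go straight to a JavaPairRDD
JavaPairRDD<String, Integer> pairs = jsc.parallelizePairs(
    Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("b", 2)));

// One record per line vs. one (path, content) pair per file
JavaRDD<String> lines = jsc.textFile("data/input.txt");
JavaPairRDD<String, String> files = jsc.wholeTextFiles("data/");

jsc.stop();
```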
### JavaRDD

Java wrapper for RDD providing type-safe operations.
```java { .api }
public class JavaRDD<T> {
    // Transformations
    public <R> JavaRDD<R> map(Function<T, R> f);
    public <U> JavaRDD<U> flatMap(FlatMapFunction<T, U> f);
    public JavaRDD<T> filter(Function<T, Boolean> f);
    public JavaRDD<T> distinct();
    public JavaRDD<T> distinct(int numPartitions);
    public JavaRDD<T> union(JavaRDD<T> other);
    public JavaRDD<T> intersection(JavaRDD<T> other);
    public JavaRDD<T> subtract(JavaRDD<T> other);
    public <U> JavaPairRDD<T, U> cartesian(JavaRDD<U> other);
    public <U> JavaPairRDD<T, U> zip(JavaRDD<U> other);
    public JavaPairRDD<T, Long> zipWithIndex();
    public JavaPairRDD<T, Long> zipWithUniqueId();
    public JavaRDD<T> sample(boolean withReplacement, double fraction);
    public JavaRDD<T> sample(boolean withReplacement, double fraction, long seed);
    public JavaRDD<T>[] randomSplit(double[] weights);
    public JavaRDD<T>[] randomSplit(double[] weights, long seed);
    public JavaRDD<T> repartition(int numPartitions);
    public JavaRDD<T> coalesce(int numPartitions);
    public JavaRDD<T> coalesce(int numPartitions, boolean shuffle);
    public <S> JavaRDD<T> sortBy(Function<T, S> f, boolean ascending, int numPartitions);
    public JavaRDD<List<T>> glom();
    public <U> JavaRDD<U> mapPartitions(FlatMapFunction<Iterator<T>, U> f);
    public <U> JavaRDD<U> mapPartitions(FlatMapFunction<Iterator<T>, U> f, boolean preservesPartitioning);
    public <U> JavaRDD<U> mapPartitionsWithIndex(Function2<Integer, Iterator<T>, Iterator<U>> f, boolean preservesPartitioning);
    public JavaRDD<String> pipe(String command);
    public JavaRDD<String> pipe(List<String> command);

    // Actions
    public List<T> collect();
    public Iterator<T> toLocalIterator();
    public long count();
    public PartialResult<BoundedDouble> countApprox(long timeout);
    public PartialResult<BoundedDouble> countApprox(long timeout, double confidence);
    public long countApproxDistinct();
    public long countApproxDistinct(double relativeSD);
    public Map<T, Long> countByValue();
    public T first();
    public boolean isEmpty();
    public List<T> take(int num);
    public List<T> takeOrdered(int num);
    public List<T> takeOrdered(int num, Comparator<T> comp);
    public List<T> takeSample(boolean withReplacement, int num);
    public List<T> takeSample(boolean withReplacement, int num, long seed);
    public List<T> top(int num);
    public List<T> top(int num, Comparator<T> comp);
    public T reduce(Function2<T, T, T> f);
    public T fold(T zeroValue, Function2<T, T, T> f);
    public <U> U aggregate(U zeroValue, Function2<U, T, U> seqOp, Function2<U, U, U> combOp);
    public <U> U treeAggregate(U zeroValue, Function2<U, T, U> seqOp, Function2<U, U, U> combOp);
    public <U> U treeAggregate(U zeroValue, Function2<U, T, U> seqOp, Function2<U, U, U> combOp, int depth);
    public void foreach(VoidFunction<T> f);
    public void foreachPartition(VoidFunction<Iterator<T>> f);

    // Persistence
    public JavaRDD<T> persist(StorageLevel newLevel);
    public JavaRDD<T> cache();
    public JavaRDD<T> unpersist();
    public JavaRDD<T> unpersist(boolean blocking);
    public void checkpoint();

    // Conversion
    public <K, V> JavaPairRDD<K, V> mapToPair(PairFunction<T, K, V> f);
    public JavaDoubleRDD mapToDouble(DoubleFunction<T> f);
    public JavaDoubleRDD flatMapToDouble(DoubleFlatMapFunction<T> f);
    public <K, V> JavaPairRDD<K, V> flatMapToPair(PairFlatMapFunction<T, K, V> f);

    // Information
    public int getNumPartitions();
    public StorageLevel getStorageLevel();
    public boolean isCheckpointed();
    public String name();
    public JavaRDD<T> setName(String name);
    public String toDebugString();
    public SparkContext context();
}
```
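A short sketch of a few of these operations, assuming an existing `jsc` and the imports from the Usage Examples section:

```java
JavaRDD<String> words = jsc.parallelize(Arrays.asList("spark", "java", "api", "rdd"), 2);

// mapPartitions: one Iterator in, one Iterator out per partition
JavaRDD<Integer> lengths = words.mapPartitions(it -> {
    List<Integer> out = new ArrayList<>();
    while (it.hasNext()) {
        out.add(it.next().length());
    }
    return out.iterator();
});

// zipWithIndex pairs each element with its position in the RDD
JavaPairRDD<String, Long> indexed = words.zipWithIndex();

// aggregate: seqOp folds elements within a partition, combOp merges partitions
int totalChars = lengths.aggregate(0, (acc, n) -> acc + n, (a, b) -> a + b);
System.out.println(totalChars + " characters across " + words.count() + " words");
```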
### JavaPairRDD

Java wrapper for key-value RDDs.
```java { .api }
public class JavaPairRDD<K, V> {
    // Basic Operations
    public JavaRDD<K> keys();
    public JavaRDD<V> values();
    public <U> JavaPairRDD<K, U> mapValues(Function<V, U> f);
    public <U> JavaPairRDD<K, U> flatMapValues(FlatMapFunction<V, U> f);
    public JavaPairRDD<V, K> swap();

    // Grouping
    public JavaPairRDD<K, Iterable<V>> groupByKey();
    public JavaPairRDD<K, Iterable<V>> groupByKey(int numPartitions);
    public JavaPairRDD<K, Iterable<V>> groupByKey(Partitioner partitioner);

    // Reduction
    public JavaPairRDD<K, V> reduceByKey(Function2<V, V, V> func);
    public JavaPairRDD<K, V> reduceByKey(Function2<V, V, V> func, int numPartitions);
    public JavaPairRDD<K, V> reduceByKey(Partitioner partitioner, Function2<V, V, V> func);
    public Map<K, V> reduceByKeyLocally(Function2<V, V, V> func);

    public JavaPairRDD<K, V> foldByKey(V zeroValue, Function2<V, V, V> func);
    public JavaPairRDD<K, V> foldByKey(V zeroValue, int numPartitions, Function2<V, V, V> func);
    public JavaPairRDD<K, V> foldByKey(V zeroValue, Partitioner partitioner, Function2<V, V, V> func);

    public <U> JavaPairRDD<K, U> aggregateByKey(U zeroValue, Function2<U, V, U> seqFunc, Function2<U, U, U> combFunc);
    public <U> JavaPairRDD<K, U> aggregateByKey(U zeroValue, int numPartitions, Function2<U, V, U> seqFunc, Function2<U, U, U> combFunc);
    public <U> JavaPairRDD<K, U> aggregateByKey(U zeroValue, Partitioner partitioner, Function2<U, V, U> seqFunc, Function2<U, U, U> combFunc);

    public <C> JavaPairRDD<K, C> combineByKey(Function<V, C> createCombiner, Function2<C, V, C> mergeValue, Function2<C, C, C> mergeCombiners);
    public <C> JavaPairRDD<K, C> combineByKey(Function<V, C> createCombiner, Function2<C, V, C> mergeValue, Function2<C, C, C> mergeCombiners, int numPartitions);
    public <C> JavaPairRDD<K, C> combineByKey(Function<V, C> createCombiner, Function2<C, V, C> mergeValue, Function2<C, C, C> mergeCombiners, Partitioner partitioner);

    // Joins
    public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other);
    public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other, int numPartitions);
    public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other, Partitioner partitioner);

    public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairRDD<K, W> other);
    public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairRDD<K, W> other, int numPartitions);
    public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairRDD<K, W> other, Partitioner partitioner);

    public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairRDD<K, W> other);
    public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairRDD<K, W> other, int numPartitions);
    public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairRDD<K, W> other, Partitioner partitioner);

    public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairRDD<K, W> other);
    public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairRDD<K, W> other, int numPartitions);
    public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairRDD<K, W> other, Partitioner partitioner);

    // Cogroup
    public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>> cogroup(JavaPairRDD<K, W> other);
    public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>> cogroup(JavaPairRDD<K, W> other, int numPartitions);
    public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>> cogroup(JavaPairRDD<K, W> other, Partitioner partitioner);
    public <W1, W2> JavaPairRDD<K, Tuple3<Iterable<V>, Iterable<W1>, Iterable<W2>>> cogroup(JavaPairRDD<K, W1> other1, JavaPairRDD<K, W2> other2);
    public <W1, W2> JavaPairRDD<K, Tuple3<Iterable<V>, Iterable<W1>, Iterable<W2>>> cogroup(JavaPairRDD<K, W1> other1, JavaPairRDD<K, W2> other2, Partitioner partitioner);
    public <W1, W2, W3> JavaPairRDD<K, Tuple4<Iterable<V>, Iterable<W1>, Iterable<W2>, Iterable<W3>>> cogroup(JavaPairRDD<K, W1> other1, JavaPairRDD<K, W2> other2, JavaPairRDD<K, W3> other3);
    public <W1, W2, W3> JavaPairRDD<K, Tuple4<Iterable<V>, Iterable<W1>, Iterable<W2>, Iterable<W3>>> cogroup(JavaPairRDD<K, W1> other1, JavaPairRDD<K, W2> other2, JavaPairRDD<K, W3> other3, Partitioner partitioner);

    // Sorting and Partitioning
    public JavaPairRDD<K, V> sortByKey();
    public JavaPairRDD<K, V> sortByKey(boolean ascending);
    public JavaPairRDD<K, V> sortByKey(boolean ascending, int numPartitions);
    public JavaPairRDD<K, V> repartitionAndSortWithinPartitions(Partitioner partitioner);
    public JavaPairRDD<K, V> partitionBy(Partitioner partitioner);

    // Collection Operations
    public Map<K, V> collectAsMap();
    public Map<K, Long> countByKey();
    public PartialResult<Map<K, BoundedDouble>> countByKeyApprox(long timeout);
    public PartialResult<Map<K, BoundedDouble>> countByKeyApprox(long timeout, double confidence);
    public List<V> lookup(K key);

    // Subtraction
    public <W> JavaPairRDD<K, V> subtractByKey(JavaPairRDD<K, W> other);
    public <W> JavaPairRDD<K, V> subtractByKey(JavaPairRDD<K, W> other, int numPartitions);
    public <W> JavaPairRDD<K, V> subtractByKey(JavaPairRDD<K, W> other, Partitioner partitioner);

    // Conversion and Utility
    public RDD<Tuple2<K, V>> rdd(); // underlying Scala RDD
    // For numeric values, use mapToDouble(t -> t._2) to obtain a JavaDoubleRDD

    // Standard RDD operations (inherited)
    public List<Tuple2<K, V>> collect();
    public long count();
    public JavaPairRDD<K, V> persist(StorageLevel newLevel);
    public JavaPairRDD<K, V> cache();
    public void foreach(VoidFunction<Tuple2<K, V>> f);
    // ... other inherited operations
}
```
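Of the per-key aggregations above, `combineByKey` is the most general: the combiner type can differ from the value type. A sketch computing a per-key average, assuming an existing `jsc`:

```java
JavaPairRDD<String, Integer> scores = jsc.parallelizePairs(Arrays.asList(
    new Tuple2<>("a", 3), new Tuple2<>("a", 5), new Tuple2<>("b", 4)));

// The combiner is a (sum, count) pair: createCombiner seeds it from one value,
// mergeValue folds another value in, and mergeCombiners merges partial results
// from different partitions.
JavaPairRDD<String, Tuple2<Integer, Integer>> sumCounts = scores.combineByKey(
    v -> new Tuple2<>(v, 1),
    (acc, v) -> new Tuple2<>(acc._1 + v, acc._2 + 1),
    (a, b) -> new Tuple2<>(a._1 + b._1, a._2 + b._2));

JavaPairRDD<String, Double> averages = sumCounts.mapValues(p -> (double) p._1 / p._2);
// averages.collectAsMap() => {a=4.0, b=4.0}
```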
### JavaDoubleRDD

Specialized RDD for double values with statistical operations.
```java { .api }
public class JavaDoubleRDD {
    // Statistical Operations
    public StatCounter stats();
    public double mean();
    public double sum();
    public double variance();
    public double sampleVariance();
    public double stdev();
    public double sampleStdev();
    public Tuple2<double[], long[]> histogram(double[] buckets);
    public Tuple2<double[], long[]> histogram(int bucketCount);

    // Standard RDD Operations
    public JavaDoubleRDD map(DoubleFunction<Double> f);
    public JavaDoubleRDD filter(Function<Double, Boolean> f);
    public JavaDoubleRDD union(JavaDoubleRDD other);
    public JavaDoubleRDD distinct();
    public JavaDoubleRDD sample(boolean withReplacement, double fraction);
    public JavaDoubleRDD cache();
    public JavaDoubleRDD persist(StorageLevel newLevel);

    // Collection Operations
    public List<Double> collect();
    public double[] collectArray();
    public long count();
    public double first();
    public List<Double> take(int num);
    public void foreach(VoidFunction<Double> f);
}
```
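A `JavaDoubleRDD` is usually reached from a `JavaRDD` via `mapToDouble`. A brief sketch of the statistics above, assuming an existing `jsc` (`StatCounter` lives in `org.apache.spark.util`):

```java
JavaDoubleRDD amounts = jsc.parallelize(Arrays.asList("10.5", "3.2", "7.9"))
    .mapToDouble(Double::parseDouble);

// stats() computes count, mean, variance, min, and max in a single pass
StatCounter stats = amounts.stats();
System.out.printf("n=%d mean=%.2f stdev=%.2f%n", stats.count(), stats.mean(), stats.stdev());

// histogram(2) returns (bucket boundaries, per-bucket counts)
Tuple2<double[], long[]> hist = amounts.histogram(2);
```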
## Function Interfaces

All function interfaces extend `Serializable` and are marked with `@FunctionalInterface`.
```java { .api }
@FunctionalInterface
public interface Function<T1, R> extends Serializable {
    R call(T1 v1) throws Exception;
}

@FunctionalInterface
public interface Function2<T1, T2, R> extends Serializable {
    R call(T1 v1, T2 v2) throws Exception;
}

@FunctionalInterface
public interface Function3<T1, T2, T3, R> extends Serializable {
    R call(T1 v1, T2 v2, T3 v3) throws Exception;
}

@FunctionalInterface
public interface Function4<T1, T2, T3, T4, R> extends Serializable {
    R call(T1 v1, T2 v2, T3 v3, T4 v4) throws Exception;
}

@FunctionalInterface
public interface VoidFunction<T> extends Serializable {
    void call(T t) throws Exception;
}

@FunctionalInterface
public interface VoidFunction2<T1, T2> extends Serializable {
    void call(T1 v1, T2 v2) throws Exception;
}

@FunctionalInterface
public interface PairFunction<T, K, V> extends Serializable {
    Tuple2<K, V> call(T t) throws Exception;
}

@FunctionalInterface
public interface FlatMapFunction<T, R> extends Serializable {
    Iterator<R> call(T t) throws Exception;
}

@FunctionalInterface
public interface PairFlatMapFunction<T, K, V> extends Serializable {
    Iterator<Tuple2<K, V>> call(T t) throws Exception;
}

@FunctionalInterface
public interface DoubleFunction<T> extends Serializable {
    double call(T t) throws Exception;
}

@FunctionalInterface
public interface DoubleFlatMapFunction<T> extends Serializable {
    Iterator<Double> call(T t) throws Exception;
}

@FunctionalInterface
public interface FilterFunction<T> extends Serializable {
    boolean call(T t) throws Exception;
}

@FunctionalInterface
public interface ForeachFunction<T> extends Serializable {
    void call(T t) throws Exception;
}

@FunctionalInterface
public interface ForeachPartitionFunction<T> extends Serializable {
    void call(Iterator<T> t) throws Exception;
}

@FunctionalInterface
public interface ReduceFunction<T> extends Serializable {
    T call(T v1, T v2) throws Exception;
}

@FunctionalInterface
public interface MapFunction<T, R> extends Serializable {
    R call(T value) throws Exception;
}

@FunctionalInterface
public interface MapPartitionsFunction<T, R> extends Serializable {
    Iterator<R> call(Iterator<T> input) throws Exception;
}

@FunctionalInterface
public interface CoGroupFunction<K, V, W, R> extends Serializable {
    Iterator<R> call(Tuple2<K, Tuple2<Iterable<V>, Iterable<W>>> t) throws Exception;
}

@FunctionalInterface
public interface FlatMapFunction2<T1, T2, R> extends Serializable {
    Iterator<R> call(T1 t1, T2 t2) throws Exception;
}

@FunctionalInterface
public interface MapGroupsFunction<K, V, R> extends Serializable {
    R call(K key, Iterator<V> values) throws Exception;
}

@FunctionalInterface
public interface FlatMapGroupsFunction<K, V, R> extends Serializable {
    Iterator<R> call(K key, Iterator<V> values) throws Exception;
}
```
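Because each interface has a single `call` method, lambdas, method references, and named classes are interchangeable. A named implementation is worth the extra ceremony when the same function is reused across jobs or unit-tested in isolation. A sketch, where the class name and the `lines` RDD are illustrative:

```java
// A reusable, testable PairFunction: parses "key=value" lines.
public class KeyValueParser implements PairFunction<String, String, String> {
    @Override
    public Tuple2<String, String> call(String line) throws Exception {
        String[] parts = line.split("=", 2);
        return new Tuple2<>(parts[0], parts.length > 1 ? parts[1] : "");
    }
}

// Given some JavaRDD<String> lines, both forms are equivalent:
JavaPairRDD<String, String> parsed = lines.mapToPair(new KeyValueParser());
JavaPairRDD<String, String> parsedInline = lines.mapToPair(line -> {
    String[] parts = line.split("=", 2);
    return new Tuple2<>(parts[0], parts.length > 1 ? parts[1] : "");
});
```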
## Usage Examples

### Basic Application Structure
```java
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import scala.Tuple2;
import java.util.*;

public class SparkJavaExample {
    public static void main(String[] args) {
        // Configure Spark
        SparkConf conf = new SparkConf()
            .setAppName("Java Spark Example")
            .setMaster("local[*]");

        // Create Spark context
        JavaSparkContext jsc = new JavaSparkContext(conf);

        try {
            // Your Spark code here
            processData(jsc);
        } finally {
            // Always stop the context
            jsc.stop();
        }
    }

    private static void processData(JavaSparkContext jsc) {
        // Create RDD from collection
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        JavaRDD<Integer> numbersRDD = jsc.parallelize(numbers);

        // Transform and collect
        List<Integer> evenNumbers = numbersRDD
            .filter(n -> n % 2 == 0)
            .collect();

        System.out.println("Even numbers: " + evenNumbers);
    }
}
```

The examples that follow omit the imports for brevity; they assume the same ones, plus `java.io.Serializable` where helper classes are defined.
### Word Count Example

```java
public class WordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Word Count").setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        try {
            // Read text file
            JavaRDD<String> lines = jsc.textFile("input.txt");

            // Split lines into words and count
            JavaPairRDD<String, Integer> wordCounts = lines
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                .filter(word -> !word.isEmpty())
                .mapToPair(word -> new Tuple2<>(word.toLowerCase(), 1))
                .reduceByKey((a, b) -> a + b);

            // Sort by count descending
            JavaPairRDD<String, Integer> sortedCounts = wordCounts
                .mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1)) // Swap key-value
                .sortByKey(false) // Sort descending
                .mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1)); // Swap back

            // Collect and print results
            List<Tuple2<String, Integer>> results = sortedCounts.take(10);
            for (Tuple2<String, Integer> result : results) {
                System.out.println(result._1 + ": " + result._2);
            }

        } finally {
            jsc.stop();
        }
    }
}
```
### Complex Aggregation Example

```java
public class SalesAnalysis {
    public static class SaleRecord implements Serializable {
        public final String product;
        public final String region;
        public final double amount;
        public final int quantity;

        public SaleRecord(String product, String region, double amount, int quantity) {
            this.product = product;
            this.region = region;
            this.amount = amount;
            this.quantity = quantity;
        }
    }

    public static class SalesStats implements Serializable {
        public final int count;
        public final double totalAmount;
        public final int totalQuantity;

        public SalesStats(int count, double totalAmount, int totalQuantity) {
            this.count = count;
            this.totalAmount = totalAmount;
            this.totalQuantity = totalQuantity;
        }

        public SalesStats combine(SalesStats other) {
            return new SalesStats(
                this.count + other.count,
                this.totalAmount + other.totalAmount,
                this.totalQuantity + other.totalQuantity
            );
        }

        public double averageAmount() {
            return totalAmount / count;
        }
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Sales Analysis").setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        try {
            // Create sample sales data
            List<SaleRecord> salesData = Arrays.asList(
                new SaleRecord("Laptop", "North", 999.99, 1),
                new SaleRecord("Mouse", "North", 29.99, 3),
                new SaleRecord("Laptop", "South", 899.99, 2),
                new SaleRecord("Keyboard", "North", 79.99, 1),
                new SaleRecord("Mouse", "South", 24.99, 5)
            );

            JavaRDD<SaleRecord> salesRDD = jsc.parallelize(salesData);

            // Aggregate sales by product
            JavaPairRDD<String, SalesStats> productStats = salesRDD
                .mapToPair(sale -> new Tuple2<>(sale.product,
                    new SalesStats(1, sale.amount * sale.quantity, sale.quantity)))
                .reduceByKey((stats1, stats2) -> stats1.combine(stats2));

            // Collect and display results
            Map<String, SalesStats> results = productStats.collectAsMap();
            results.forEach((product, stats) -> {
                System.out.printf("%s: Total Sales=%.2f, Average=%.2f, Units=%d%n",
                    product, stats.totalAmount, stats.averageAmount(), stats.totalQuantity);
            });

        } finally {
            jsc.stop();
        }
    }
}
```
### Join Operations Example

```java
public class CustomerOrderAnalysis {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Customer Order Analysis").setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        try {
            // Customer data
            List<Tuple2<Integer, String>> customerData = Arrays.asList(
                new Tuple2<>(1, "Alice Johnson"),
                new Tuple2<>(2, "Bob Smith"),
                new Tuple2<>(3, "Charlie Brown"),
                new Tuple2<>(4, "Diana Prince")
            );
            JavaPairRDD<Integer, String> customers = jsc.parallelizePairs(customerData);

            // Order data
            List<Tuple2<Integer, Double>> orderData = Arrays.asList(
                new Tuple2<>(1, 100.50),
                new Tuple2<>(1, 75.25),
                new Tuple2<>(2, 200.00),
                new Tuple2<>(3, 50.75),
                new Tuple2<>(1, 125.00),
                new Tuple2<>(5, 300.00) // Customer 5 doesn't exist
            );
            JavaPairRDD<Integer, Double> orders = jsc.parallelizePairs(orderData);

            // Inner join - customers with orders
            JavaPairRDD<Integer, Tuple2<String, Double>> customerOrders = customers.join(orders);
            System.out.println("Customer Orders (Inner Join):");
            customerOrders.collect().forEach(entry -> {
                int customerId = entry._1;
                String customerName = entry._2._1;
                double orderAmount = entry._2._2;
                System.out.printf("Customer %d (%s): $%.2f%n", customerId, customerName, orderAmount);
            });

            // Left outer join - all customers, orders if they exist
            // (Optional here is Spark's org.apache.spark.api.java.Optional)
            JavaPairRDD<Integer, Tuple2<String, Optional<Double>>> allCustomers =
                customers.leftOuterJoin(orders);

            // Aggregate total orders per customer
            JavaPairRDD<Integer, Tuple2<String, Double>> customerTotals = allCustomers
                .mapValues(tuple -> {
                    String name = tuple._1;
                    double total = tuple._2.isPresent() ? tuple._2.get() : 0.0;
                    return new Tuple2<>(name, total);
                })
                .reduceByKey((tuple1, tuple2) -> new Tuple2<>(tuple1._1, tuple1._2 + tuple2._2));

            System.out.println("\nCustomer Totals:");
            customerTotals.collect().forEach(entry -> {
                int customerId = entry._1;
                String customerName = entry._2._1;
                double totalAmount = entry._2._2;
                System.out.printf("Customer %d (%s): Total $%.2f%n", customerId, customerName, totalAmount);
            });

        } finally {
            jsc.stop();
        }
    }
}
```
### Using Broadcast Variables and Accumulators

```java
public class BroadcastAccumulatorExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Broadcast Accumulator Example").setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        try {
            // Create lookup table to broadcast
            Map<String, String> categoryLookup = new HashMap<>();
            categoryLookup.put("TECH", "Technology");
            categoryLookup.put("BOOK", "Books");
            categoryLookup.put("HOME", "Home & Garden");

            org.apache.spark.broadcast.Broadcast<Map<String, String>> broadcastLookup =
                jsc.broadcast(categoryLookup);

            // Create accumulators for metrics
            org.apache.spark.util.LongAccumulator processedCount = jsc.sc().longAccumulator("Processed Items");
            org.apache.spark.util.LongAccumulator errorCount = jsc.sc().longAccumulator("Error Count");

            // Sample data
            List<String> products = Arrays.asList(
                "TECH:Laptop:999.99",
                "BOOK:Java Programming:49.99",
                "HOME:Garden Hose:29.99",
                "INVALID:Bad Data",
                "TECH:Smartphone:699.99"
            );

            JavaRDD<String> productsRDD = jsc.parallelize(products);

            // Process data using broadcast and accumulators.
            // Note: accumulator updates made inside a transformation may be
            // re-applied if a task is retried; update them in an action such
            // as foreach when an exact count matters.
            JavaRDD<String> processedProducts = productsRDD.map(product -> {
                try {
                    String[] parts = product.split(":");
                    if (parts.length >= 3) {
                        String categoryCode = parts[0];
                        String productName = parts[1];
                        String price = parts[2];

                        String categoryName = broadcastLookup.value().getOrDefault(categoryCode, "Unknown");
                        processedCount.add(1);

                        return String.format("%s - %s: %s", categoryName, productName, price);
                    } else {
                        errorCount.add(1);
                        return "ERROR: Invalid product format - " + product;
                    }
                } catch (Exception e) {
                    errorCount.add(1);
                    return "ERROR: Processing failed - " + product;
                }
            });

            // Trigger computation
            List<String> results = processedProducts.collect();

            // Display results
            System.out.println("Processed Products:");
            results.forEach(System.out::println);

            // Display metrics
            System.out.println("\nMetrics:");
            System.out.println("Processed items: " + processedCount.value());
            System.out.println("Error count: " + errorCount.value());

        } finally {
            jsc.stop();
        }
    }
}
```
## Best Practices for Java API

### Lambda Expressions vs Anonymous Classes
```java
// Prefer lambda expressions (Java 8+)
JavaRDD<String> upperCase = textRDD.map(s -> s.toUpperCase());

// Instead of anonymous classes
JavaRDD<String> upperCaseOld = textRDD.map(new Function<String, String>() {
    @Override
    public String call(String s) {
        return s.toUpperCase();
    }
});
```
### Serialization Considerations
```java
// Ensure all objects used in transformations are Serializable
public class ProcessingUtils implements Serializable {
    private final String prefix;

    public ProcessingUtils(String prefix) {
        this.prefix = prefix;
    }

    public String process(String input) {
        return prefix + ": " + input.toUpperCase();
    }
}

// Use in transformations
ProcessingUtils utils = new ProcessingUtils("PROCESSED");
JavaRDD<String> processed = inputRDD.map(utils::process);
```
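The most common serialization failure is the reverse case: a lambda defined inside a non-serializable class that references an instance field implicitly captures `this`, so Spark tries to serialize the whole enclosing object and fails with `NotSerializableException`. A sketch of the pitfall and the usual fix (class names are illustrative):

```java
public class ReportJob { // not Serializable
    private final String prefix = "REPORT";

    public JavaRDD<String> broken(JavaRDD<String> input) {
        // Referencing the field `prefix` captures `this`, so Spark attempts
        // to serialize the entire non-serializable ReportJob instance.
        return input.map(s -> prefix + ": " + s);
    }

    public JavaRDD<String> fixed(JavaRDD<String> input) {
        // Copy the field into a local variable; only the String is captured.
        final String localPrefix = prefix;
        return input.map(s -> localPrefix + ": " + s);
    }
}
```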
### Memory Management
```java
// Persist expensive computations
JavaRDD<ComplexObject> expensiveRDD = inputRDD
    .map(this::expensiveTransformation)
    .filter(obj -> obj.isValid())
    .persist(StorageLevel.MEMORY_AND_DISK_SER());

// Use the persisted RDD multiple times
long count = expensiveRDD.count();
List<ComplexObject> sample = expensiveRDD.take(10);

// Clean up when done
expensiveRDD.unpersist();
```
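For jobs that build very long lineages (iterative algorithms especially), checkpointing complements persistence by truncating the dependency chain. A minimal sketch, assuming an existing `jsc` and the `expensiveRDD` above; the checkpoint directory and `refine()` are placeholders:

```java
// Write checkpoints to reliable storage; the path is a placeholder
jsc.setCheckpointDir("/tmp/spark-checkpoints");

JavaRDD<ComplexObject> current = expensiveRDD;
for (int i = 1; i <= 100; i++) {
    current = current.map(obj -> obj.refine()); // refine() is hypothetical
    if (i % 10 == 0) {
        current.cache();      // cache first so the checkpoint write doesn't recompute
        current.checkpoint(); // marks the RDD; materialized by the next action
        current.count();      // action that actually writes the checkpoint
    }
}
```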