0
# Java API
1
2
The Spark Java API provides Java-friendly wrappers around Spark's core functionality, using Java collections and functional interfaces to enable seamless integration with Java applications.
3
4
## JavaSparkContext
5
6
The Java-friendly version of SparkContext that returns JavaRDDs and works with Java collections.
7
8
### Constructors
9
10
```java { .api }
11
public class JavaSparkContext implements Closeable {
12
public JavaSparkContext()
13
public JavaSparkContext(SparkConf conf)
14
public JavaSparkContext(String master, String appName)
15
public JavaSparkContext(String master, String appName, SparkConf conf)
16
public JavaSparkContext(String master, String appName, String sparkHome, String jarFile)
17
public JavaSparkContext(String master, String appName, String sparkHome, String[] jars)
18
}
19
```
20
21
### Context Access
22
23
```java { .api }
24
public class JavaSparkContext {
25
public SparkContext sc()
26
public void close()
27
}
28
```
29
30
### Java RDD Creation
31
32
```java { .api }
33
public class JavaSparkContext {
34
public <T> JavaRDD<T> parallelize(List<T> list)
35
public <T> JavaRDD<T> parallelize(List<T> list, int numSlices)
36
37
public <K, V> JavaPairRDD<K, V> parallelizePairs(List<Tuple2<K, V>> list)
38
public <K, V> JavaPairRDD<K, V> parallelizePairs(List<Tuple2<K, V>> list, int numSlices)
39
40
public JavaDoubleRDD parallelizeDoubles(List<Double> list)
41
public JavaDoubleRDD parallelizeDoubles(List<Double> list, int numSlices)
42
43
public <T> JavaRDD<T> emptyRDD()
44
public <T> JavaRDD<T> union(JavaRDD<T> first, JavaRDD<T>... rest)
45
public <T> JavaRDD<T> union(List<JavaRDD<T>> rdds)
46
}
47
```
48
49
### File I/O (Java-friendly)
50
51
```java { .api }
52
public class JavaSparkContext {
53
public JavaRDD<String> textFile(String path)
54
public JavaRDD<String> textFile(String path, int minPartitions)
55
56
public JavaPairRDD<String, String> wholeTextFiles(String path)
57
public JavaPairRDD<String, String> wholeTextFiles(String path, int minPartitions)
58
59
public JavaPairRDD<String, PortableDataStream> binaryFiles(String path)
60
public JavaPairRDD<String, PortableDataStream> binaryFiles(String path, int minPartitions)
61
62
public JavaRDD<byte[]> binaryRecords(String path, int recordLength)
63
64
public <T> JavaRDD<T> objectFile(String path)
65
public <T> JavaRDD<T> objectFile(String path, int minPartitions)
66
}
67
```
68
69
### Hadoop Integration (Java)
70
71
```java { .api }
72
public class JavaSparkContext {
73
public <K, V> JavaPairRDD<K, V> hadoopRDD(
74
JobConf conf,
75
Class<? extends InputFormat<K, V>> inputFormatClass,
76
Class<K> keyClass,
77
Class<V> valueClass
78
)
79
80
public <K, V> JavaPairRDD<K, V> hadoopRDD(
81
JobConf conf,
82
Class<? extends InputFormat<K, V>> inputFormatClass,
83
Class<K> keyClass,
84
Class<V> valueClass,
85
int minPartitions
86
)
87
88
public <K, V> JavaPairRDD<K, V> hadoopFile(
89
String path,
90
Class<? extends InputFormat<K, V>> inputFormatClass,
91
Class<K> keyClass,
92
Class<V> valueClass
93
)
94
95
public <K, V> JavaPairRDD<K, V> hadoopFile(
96
String path,
97
Class<? extends InputFormat<K, V>> inputFormatClass,
98
Class<K> keyClass,
99
Class<V> valueClass,
100
int minPartitions
101
)
102
103
public <K, V, F extends NewInputFormat<K, V>> JavaPairRDD<K, V> newAPIHadoopFile(
104
String path,
105
Class<F> fClass,
106
Class<K> kClass,
107
Class<V> vClass
108
)
109
110
public <K, V, F extends NewInputFormat<K, V>> JavaPairRDD<K, V> newAPIHadoopRDD(
111
Configuration conf,
112
Class<F> fClass,
113
Class<K> kClass,
114
Class<V> vClass
115
)
116
117
public <K, V> JavaPairRDD<K, V> sequenceFile(
118
String path,
119
Class<K> keyClass,
120
Class<V> valueClass
121
)
122
123
public <K, V> JavaPairRDD<K, V> sequenceFile(
124
String path,
125
Class<K> keyClass,
126
Class<V> valueClass,
127
int minPartitions
128
)
129
}
130
```
131
132
### Shared Variables (Java)
133
134
```java { .api }
135
public class JavaSparkContext {
136
public <T> Broadcast<T> broadcast(T value)
137
138
public LongAccumulator longAccumulator()
139
public LongAccumulator longAccumulator(String name)
140
141
public DoubleAccumulator doubleAccumulator()
142
public DoubleAccumulator doubleAccumulator(String name)
143
144
public <T> CollectionAccumulator<T> collectionAccumulator()
145
}
146
```
147
148
## JavaRDD
149
150
Java-friendly wrapper around RDD that provides Java-compatible method signatures.
151
152
### Transformations (Java-friendly)
153
154
```java { .api }
155
public class JavaRDD<T> {
156
public <R> JavaRDD<R> map(Function<T, R> f)
157
public <U> JavaRDD<U> flatMap(FlatMapFunction<T, U> f)
158
public JavaRDD<T> filter(Function<T, Boolean> f)
159
160
public JavaRDD<T> distinct()
161
public JavaRDD<T> distinct(int numPartitions)
162
163
public JavaRDD<T> sample(boolean withReplacement, double fraction)
164
public JavaRDD<T> sample(boolean withReplacement, double fraction, long seed)
165
166
public JavaRDD<T>[] randomSplit(double[] weights)
167
public JavaRDD<T>[] randomSplit(double[] weights, long seed)
168
169
public JavaRDD<T> union(JavaRDD<T> other)
170
public JavaRDD<T> intersection(JavaRDD<T> other)
171
public JavaRDD<T> subtract(JavaRDD<T> other)
172
173
public <U> JavaPairRDD<T, U> cartesian(JavaRDD<U> other)
174
}
175
```
176
177
### Partition Operations (Java)
178
179
```java { .api }
180
public class JavaRDD<T> {
181
public <U> JavaRDD<U> mapPartitions(FlatMapFunction<Iterator<T>, U> f)
182
public <U> JavaRDD<U> mapPartitions(FlatMapFunction<Iterator<T>, U> f, boolean preservesPartitioning)
183
184
public <U> JavaRDD<U> mapPartitionsWithIndex(
185
Function2<Integer, Iterator<T>, Iterator<U>> f,
186
boolean preservesPartitioning
187
)
188
189
public void foreachPartition(VoidFunction<Iterator<T>> f)
190
public JavaRDD<List<T>> glom()
191
}
192
```
193
194
### Grouping & Sorting (Java)
195
196
```java { .api }
197
public class JavaRDD<T> {
198
public <K> JavaPairRDD<K, Iterable<T>> groupBy(Function<T, K> f)
199
public <K> JavaPairRDD<K, Iterable<T>> groupBy(Function<T, K> f, int numPartitions)
200
201
public <S> JavaRDD<T> sortBy(Function<T, S> f, boolean ascending, int numPartitions)
202
}
203
```
204
205
### Pairing Operations (Java)
206
207
```java { .api }
208
public class JavaRDD<T> {
209
public <U> JavaPairRDD<T, U> zip(JavaRDD<U> other)
210
public JavaPairRDD<T, Long> zipWithIndex()
211
public JavaPairRDD<T, Long> zipWithUniqueId()
212
public <K> JavaPairRDD<K, T> keyBy(Function<T, K> f)
213
}
214
```
215
216
### Actions (Java)
217
218
```java { .api }
219
public class JavaRDD<T> {
220
public List<T> collect()
221
public <U> List<U> collect(Function<T, U> f)
222
223
public T reduce(Function2<T, T, T> f)
224
public T fold(T zeroValue, Function2<T, T, T> f)
225
public <U> U aggregate(U zeroValue, Function2<U, T, U> seqFunc, Function2<U, U, U> combFunc)
226
227
public long count()
228
public Map<T, Long> countByValue()
229
230
public List<T> take(int num)
231
public List<T> takeSample(boolean withReplacement, int num)
232
public List<T> takeSample(boolean withReplacement, int num, long seed)
233
234
public List<T> takeOrdered(int num)
235
public List<T> takeOrdered(int num, Comparator<T> comp)
236
237
public List<T> top(int num)
238
public List<T> top(int num, Comparator<T> comp)
239
240
public T first()
241
public boolean isEmpty()
242
243
public void foreach(VoidFunction<T> f)
244
}
245
```
246
247
### Persistence (Java)
248
249
```java { .api }
250
public class JavaRDD<T> {
251
public JavaRDD<T> cache()
252
public JavaRDD<T> persist(StorageLevel newLevel)
253
public JavaRDD<T> unpersist()
254
public JavaRDD<T> unpersist(boolean blocking)
255
}
256
```
257
258
### Repartitioning (Java)
259
260
```java { .api }
261
public class JavaRDD<T> {
262
public JavaRDD<T> repartition(int numPartitions)
263
public JavaRDD<T> coalesce(int numPartitions)
264
public JavaRDD<T> coalesce(int numPartitions, boolean shuffle)
265
}
266
```
267
268
### Output (Java)
269
270
```java { .api }
271
public class JavaRDD<T> {
272
public void saveAsTextFile(String path)
273
public void saveAsObjectFile(String path)
274
}
275
```
276
277
### Conversion & Access
278
279
```java { .api }
280
public class JavaRDD<T> {
281
public RDD<T> rdd()
282
public Object[] toArray()
283
}
284
```
285
286
## JavaPairRDD
287
288
Java-friendly version of key-value pair RDD with operations specific to (K, V) pairs.
289
290
### Key-Value Transformations (Java)
291
292
```java { .api }
293
public class JavaPairRDD<K, V> {
294
public <U> JavaPairRDD<K, U> mapValues(Function<V, U> f)
295
public <U> JavaPairRDD<K, U> flatMapValues(Function<V, Iterable<U>> f)
296
public <K2, V2> JavaPairRDD<K2, V2> mapToPair(PairFunction<Tuple2<K, V>, K2, V2> f)
297
298
public JavaRDD<K> keys()
299
public JavaRDD<V> values()
300
}
301
```
302
303
### Grouping Operations (Java)
304
305
```java { .api }
306
public class JavaPairRDD<K, V> {
307
public JavaPairRDD<K, Iterable<V>> groupByKey()
308
public JavaPairRDD<K, Iterable<V>> groupByKey(int numPartitions)
309
public JavaPairRDD<K, Iterable<V>> groupByKey(Partitioner partitioner)
310
311
public JavaPairRDD<K, V> reduceByKey(Function2<V, V, V> func)
312
public JavaPairRDD<K, V> reduceByKey(Function2<V, V, V> func, int numPartitions)
313
public JavaPairRDD<K, V> reduceByKey(Partitioner partitioner, Function2<V, V, V> func)
314
315
public Map<K, V> reduceByKeyLocally(Function2<V, V, V> func)
316
public Map<K, Long> countByKey()
317
318
public <U> JavaPairRDD<K, U> aggregateByKey(
319
U zeroValue,
320
Function2<U, V, U> seqFunc,
321
Function2<U, U, U> combFunc
322
)
323
public <U> JavaPairRDD<K, U> aggregateByKey(
324
U zeroValue,
325
Partitioner partitioner,
326
Function2<U, V, U> seqFunc,
327
Function2<U, U, U> combFunc
328
)
329
public <U> JavaPairRDD<K, U> aggregateByKey(
330
U zeroValue,
331
int numPartitions,
332
Function2<U, V, U> seqFunc,
333
Function2<U, U, U> combFunc
334
)
335
336
public JavaPairRDD<K, V> foldByKey(V zeroValue, Function2<V, V, V> func)
337
public JavaPairRDD<K, V> foldByKey(V zeroValue, int numPartitions, Function2<V, V, V> func)
338
public JavaPairRDD<K, V> foldByKey(V zeroValue, Partitioner partitioner, Function2<V, V, V> func)
339
340
public <C> JavaPairRDD<K, C> combineByKey(
341
Function<V, C> createCombiner,
342
Function2<C, V, C> mergeValue,
343
Function2<C, C, C> mergeCombiners
344
)
345
public <C> JavaPairRDD<K, C> combineByKey(
346
Function<V, C> createCombiner,
347
Function2<C, V, C> mergeValue,
348
Function2<C, C, C> mergeCombiners,
349
int numPartitions
350
)
351
public <C> JavaPairRDD<K, C> combineByKey(
352
Function<V, C> createCombiner,
353
Function2<C, V, C> mergeValue,
354
Function2<C, C, C> mergeCombiners,
355
Partitioner partitioner
356
)
357
}
358
```
359
360
### Join Operations (Java)
361
362
```java { .api }
363
public class JavaPairRDD<K, V> {
364
public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other)
365
public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other, int numPartitions)
366
public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other, Partitioner partitioner)
367
368
public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairRDD<K, W> other)
369
public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairRDD<K, W> other, int numPartitions)
370
public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairRDD<K, W> other, Partitioner partitioner)
371
372
public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairRDD<K, W> other)
373
public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairRDD<K, W> other, int numPartitions)
374
public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairRDD<K, W> other, Partitioner partitioner)
375
376
public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairRDD<K, W> other)
377
public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairRDD<K, W> other, int numPartitions)
378
public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairRDD<K, W> other, Partitioner partitioner)
379
380
public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>> cogroup(JavaPairRDD<K, W> other)
381
public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>> cogroup(JavaPairRDD<K, W> other, int numPartitions)
382
public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>> cogroup(JavaPairRDD<K, W> other, Partitioner partitioner)
383
384
public <W1, W2> JavaPairRDD<K, Tuple3<Iterable<V>, Iterable<W1>, Iterable<W2>>> cogroup(
385
JavaPairRDD<K, W1> other1,
386
JavaPairRDD<K, W2> other2
387
)
388
389
public <W1, W2, W3> JavaPairRDD<K, Tuple4<Iterable<V>, Iterable<W1>, Iterable<W2>, Iterable<W3>>> cogroup(
390
JavaPairRDD<K, W1> other1,
391
JavaPairRDD<K, W2> other2,
392
JavaPairRDD<K, W3> other3
393
)
394
}
395
```
396
397
### Set Operations (Java)
398
399
```java { .api }
400
public class JavaPairRDD<K, V> {
401
public <W> JavaPairRDD<K, V> subtractByKey(JavaPairRDD<K, W> other)
402
public <W> JavaPairRDD<K, V> subtractByKey(JavaPairRDD<K, W> other, int numPartitions)
403
public <W> JavaPairRDD<K, V> subtractByKey(JavaPairRDD<K, W> other, Partitioner partitioner)
404
}
405
```
406
407
### Sorting Operations (Java)
408
409
```java { .api }
410
public class JavaPairRDD<K, V> {
411
public JavaPairRDD<K, V> sortByKey()
412
public JavaPairRDD<K, V> sortByKey(boolean ascending)
413
public JavaPairRDD<K, V> sortByKey(boolean ascending, int numPartitions)
414
415
public JavaPairRDD<K, V> sortByKey(Comparator<K> comp)
416
public JavaPairRDD<K, V> sortByKey(Comparator<K> comp, boolean ascending)
417
public JavaPairRDD<K, V> sortByKey(Comparator<K> comp, boolean ascending, int numPartitions)
418
}
419
```
420
421
### Partitioning Operations (Java)
422
423
```java { .api }
424
public class JavaPairRDD<K, V> {
425
public JavaPairRDD<K, V> partitionBy(Partitioner partitioner)
426
public JavaPairRDD<K, V> repartitionAndSortWithinPartitions(Partitioner partitioner)
427
public JavaPairRDD<K, V> repartitionAndSortWithinPartitions(
428
Partitioner partitioner,
429
Comparator<K> comp
430
)
431
}
432
```
433
434
### Lookup Operations (Java)
435
436
```java { .api }
437
public class JavaPairRDD<K, V> {
438
public List<V> lookup(K key)
439
public Map<K, V> collectAsMap()
440
}
441
```
442
443
### Output Operations (Java)
444
445
```java { .api }
446
public class JavaPairRDD<K, V> {
447
public <F extends OutputFormat<?, ?>> void saveAsHadoopFile(
448
String path,
449
Class<?> keyClass,
450
Class<?> valueClass,
451
Class<F> outputFormatClass
452
)
453
454
public <F extends OutputFormat<?, ?>> void saveAsHadoopFile(
455
String path,
456
Class<?> keyClass,
457
Class<?> valueClass,
458
Class<F> outputFormatClass,
459
JobConf conf
460
)
461
462
public void saveAsHadoopDataset(JobConf conf)
463
464
public <F extends NewOutputFormat<?, ?>> void saveAsNewAPIHadoopFile(
465
String path,
466
Class<?> keyClass,
467
Class<?> valueClass,
468
Class<F> outputFormatClass
469
)
470
471
public <F extends NewOutputFormat<?, ?>> void saveAsNewAPIHadoopFile(
472
String path,
473
Class<?> keyClass,
474
Class<?> valueClass,
475
Class<F> outputFormatClass,
476
Configuration conf
477
)
478
479
public void saveAsNewAPIHadoopDataset(Configuration conf)
480
}
481
```
482
483
### Conversion
484
485
```java { .api }
486
public class JavaPairRDD<K, V> {
487
public RDD<Tuple2<K, V>> rdd()
488
public JavaRDD<Tuple2<K, V>> toJavaRDD()
489
}
490
```
491
492
## JavaDoubleRDD
493
494
Specialized RDD for double values with numeric operations.
495
496
```java { .api }
497
public class JavaDoubleRDD {
498
public double sum()
499
public StatCounter stats()
500
public double mean()
501
public double variance()
502
public double stdev()
503
public double sampleStdev()
504
public double sampleVariance()
505
506
public long[] histogram(double[] buckets)
507
public Tuple2<double[], long[]> histogram(int buckets)
508
}
509
```
510
511
## Function Interfaces
512
513
The Java API uses functional interfaces from the `org.apache.spark.api.java.function` package.
514
515
### Basic Function Types
516
517
```java { .api }
518
// Single argument function
519
@FunctionalInterface
520
public interface Function<T, R> extends Serializable {
521
R call(T t) throws Exception;
522
}
523
524
// Two argument function
525
@FunctionalInterface
526
public interface Function2<T1, T2, R> extends Serializable {
527
R call(T1 t1, T2 t2) throws Exception;
528
}
529
530
// Void function (for actions)
531
@FunctionalInterface
532
public interface VoidFunction<T> extends Serializable {
533
void call(T t) throws Exception;
534
}
535
536
// Flat map function (returns an Iterator over zero or more results)
537
@FunctionalInterface
538
public interface FlatMapFunction<T, R> extends Serializable {
539
Iterator<R> call(T t) throws Exception;
540
}
541
542
// Pair function (returns Tuple2)
543
@FunctionalInterface
544
public interface PairFunction<T, K, V> extends Serializable {
545
Tuple2<K, V> call(T t) throws Exception;
546
}
547
```
548
549
### Specialized Function Types
550
551
```java { .api }
552
// Double function
553
@FunctionalInterface
554
public interface DoubleFunction<T> extends Serializable {
555
double call(T t) throws Exception;
556
}
557
558
// Pair flat map function
559
@FunctionalInterface
560
public interface PairFlatMapFunction<T, K, V> extends Serializable {
561
Iterator<Tuple2<K, V>> call(T t) throws Exception;
562
}
563
564
// Double flat map function
565
@FunctionalInterface
566
public interface DoubleFlatMapFunction<T> extends Serializable {
567
Iterator<Double> call(T t) throws Exception;
568
}
569
```
570
571
## Usage Examples
572
573
### Basic Java API Usage
574
575
```java
576
import org.apache.spark.SparkConf;
577
import org.apache.spark.api.java.JavaSparkContext;
578
import org.apache.spark.api.java.JavaRDD;
579
import java.util.Arrays;
580
import java.util.List;
581
582
public class SparkJavaExample {
583
public static void main(String[] args) {
584
SparkConf conf = new SparkConf()
585
.setAppName("Java Spark Example")
586
.setMaster("local[*]");
587
588
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
589
// Create RDD from Java collection
590
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
591
JavaRDD<Integer> rdd = sc.parallelize(data);
592
593
// Transform and collect
594
JavaRDD<Integer> filtered = rdd.filter(x -> x % 2 == 0);
595
JavaRDD<Integer> doubled = filtered.map(x -> x * 2);
596
597
List<Integer> result = doubled.collect();
598
System.out.println(result); // [4, 8]
599
}
600
}
601
}
602
```
603
604
### Working with Key-Value Pairs
605
606
```java
607
import org.apache.spark.api.java.JavaPairRDD;
608
import scala.Tuple2;
609
import java.util.Arrays;
610
import java.util.List;
611
import java.util.Map;
612
613
// Create pair RDD from tuples
614
List<Tuple2<String, Integer>> pairs = Arrays.asList(
615
new Tuple2<>("apple", 1),
616
new Tuple2<>("banana", 2),
617
new Tuple2<>("apple", 3)
618
);
619
620
JavaPairRDD<String, Integer> pairRDD = sc.parallelizePairs(pairs);
621
622
// Group by key
623
JavaPairRDD<String, Iterable<Integer>> grouped = pairRDD.groupByKey();
624
625
// Reduce by key
626
JavaPairRDD<String, Integer> totals = pairRDD.reduceByKey(Integer::sum);
627
628
// Collect as map
629
Map<String, Integer> resultMap = totals.collectAsMap();
630
```
631
632
### Lambda Expressions (Java 8+)
633
634
```java
635
// Using lambda expressions for cleaner code
636
JavaRDD<String> lines = sc.textFile("input.txt");
637
638
JavaRDD<String> words = lines.flatMap(line ->
639
Arrays.asList(line.split(" ")).iterator()
640
);
641
642
JavaPairRDD<String, Integer> wordCounts = words
643
.mapToPair(word -> new Tuple2<>(word, 1))
644
.reduceByKey(Integer::sum);
645
646
// Sort by count (descending)
647
JavaPairRDD<String, Integer> sorted = wordCounts
648
.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1))
649
.sortByKey(false)
650
.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1));
651
652
List<Tuple2<String, Integer>> top10 = sorted.take(10);
653
```
654
655
### Advanced Aggregations
656
657
```java
658
import org.apache.spark.api.java.function.Function2;
659
660
// Custom aggregation with aggregateByKey
661
JavaPairRDD<String, Integer> sales = sc.parallelizePairs(Arrays.asList(
662
new Tuple2<>("store1", 100),
663
new Tuple2<>("store2", 200),
664
new Tuple2<>("store1", 150)
665
));
666
667
// Calculate sum and count for each store
668
class SalesSummary implements Serializable {
669
public int sum;
670
public int count;
671
672
public SalesSummary(int sum, int count) {
673
this.sum = sum;
674
this.count = count;
675
}
676
677
public double average() {
678
return (double) sum / count;
679
}
680
}
681
682
JavaPairRDD<String, SalesSummary> summary = sales.aggregateByKey(
683
new SalesSummary(0, 0),
684
685
// Sequence function
686
(acc, sale) -> new SalesSummary(
687
acc.sum + sale,
688
acc.count + 1
689
),
690
691
// Combiner function
692
(sum1, sum2) -> new SalesSummary(
693
sum1.sum + sum2.sum,
694
sum1.count + sum2.count
695
)
696
);
697
```
698
699
### Working with Files
700
701
```java
702
// Read text file
703
JavaRDD<String> textFile = sc.textFile("hdfs://path/to/file");
704
705
// Read whole text files (returns filename and content)
706
JavaPairRDD<String, String> wholeFiles = sc.wholeTextFiles("hdfs://path/to/dir");
707
708
// Process each file
709
JavaPairRDD<String, Integer> lineCounts = wholeFiles.mapToPair(file ->
710
new Tuple2<>(file._1, file._2.split("\n").length)
711
);
712
713
// Save results
714
lineCounts.saveAsTextFile("hdfs://path/to/output");
715
```
716
717
### Exception Handling
718
719
```java
720
// Functions can throw exceptions - they're automatically wrapped
721
JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 0, 4, 5));
722
723
JavaRDD<Double> inverses = numbers.map(x -> {
724
if (x == 0) {
725
throw new IllegalArgumentException("Cannot divide by zero");
726
}
727
return 1.0 / x;
728
});
729
730
// Filter out problematic values instead
731
JavaRDD<Double> safeInverses = numbers
732
.filter(x -> x != 0)
733
.map(x -> 1.0 / x);
734
```
735
736
## Important Notes
737
738
- **All Function interfaces are from the `org.apache.spark.api.java.function` package**
739
- **Functions must be Serializable** - avoid capturing non-serializable objects
740
- **Use `Tuple2`, `Tuple3`, etc. from Scala** for pair operations
741
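- **Use `Optional` (`org.apache.spark.api.java.Optional`, not `java.util.Optional`) for possibly-absent values** in outer joins (`leftOuterJoin`, `rightOuterJoin`, `fullOuterJoin`)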
742
- **JavaSparkContext implements Closeable** - use try-with-resources for automatic cleanup
743
- **Java collections are used throughout** - List, Map, Iterator instead of Scala collections
744
- **Lambda expressions (Java 8+) provide cleaner syntax** than anonymous inner classes
745
- **Method references can be used** where appropriate (e.g., `Integer::sum`)
746
- **All operations maintain the same semantics** as the Scala API but with Java-friendly types