# Java API
1
2
Spark provides comprehensive Java APIs that mirror the Scala functionality while providing Java-friendly interfaces. The Java API includes JavaRDD, JavaPairRDD, and JavaDoubleRDD classes that offer type-safe operations for Java developers.
3
4
## JavaSparkContext
5
6
The Java-friendly version of SparkContext.
7
8
### JavaSparkContext Class
9
10
```java { .api }
11
public class JavaSparkContext {
12
// Constructors
13
public JavaSparkContext()
14
public JavaSparkContext(SparkConf conf)
15
public JavaSparkContext(String master, String appName)
16
public JavaSparkContext(String master, String appName, SparkConf conf)
17
public JavaSparkContext(String master, String appName, String sparkHome, String jarFile)
18
public JavaSparkContext(String master, String appName, String sparkHome, String[] jars)
19
public JavaSparkContext(String master, String appName, String sparkHome, String[] jars, Map<String, String> environment)
20
21
// Core properties
22
public SparkContext sc()
23
public String master()
24
public String appName()
25
public Boolean isLocal()
26
public Integer defaultParallelism()
27
public Integer defaultMinPartitions()
28
}
29
```
30
31
### Creating JavaSparkContext
32
33
```java
34
import org.apache.spark.SparkConf;
35
import org.apache.spark.api.java.JavaSparkContext;
36
37
// Basic creation with SparkConf
38
SparkConf conf = new SparkConf()
39
.setAppName("Java Spark App")
40
.setMaster("local[*]");
41
42
JavaSparkContext jsc = new JavaSparkContext(conf);
43
44
// Alternative constructors
45
JavaSparkContext jsc2 = new JavaSparkContext("local[*]", "My Java App");
46
47
// With all parameters
48
String[] jars = {"myapp.jar", "dependencies.jar"};
49
Map<String, String> env = new HashMap<>();
50
env.put("SPARK_ENV", "production");
51
52
JavaSparkContext jsc3 = new JavaSparkContext(
53
"local[*]", // master
54
"My Java App", // app name
55
"/path/to/spark", // spark home
56
jars, // jar files
57
env // environment
58
);
59
```
60
61
## JavaRDD
62
63
Java-friendly wrapper for RDD operations.
64
65
### JavaRDD Class
66
67
```java { .api }
68
public class JavaRDD<T> extends AbstractJavaRDDLike<T, JavaRDD<T>> {
69
// Transformations
70
public <U> JavaRDD<U> map(Function<T, U> f)
71
public <U> JavaRDD<U> flatMap(FlatMapFunction<T, U> f)
72
public JavaRDD<T> filter(Function<T, Boolean> f)
73
public JavaRDD<T> distinct()
74
public JavaRDD<T> distinct(int numPartitions)
75
public JavaRDD<T> sample(boolean withReplacement, double fraction)
76
public JavaRDD<T> sample(boolean withReplacement, double fraction, long seed)
77
public JavaRDD<T> union(JavaRDD<T> other)
78
public JavaRDD<T> intersection(JavaRDD<T> other)
79
public JavaRDD<T> subtract(JavaRDD<T> other)
80
public <U> JavaPairRDD<T, U> cartesian(JavaRDD<U> other)
81
82
// Partition operations
83
public <U> JavaRDD<U> mapPartitions(FlatMapFunction<Iterator<T>, U> f)
84
public <U> JavaRDD<U> mapPartitionsWithIndex(Function2<Integer, Iterator<T>, Iterator<U>> f, boolean preservesPartitioning)
85
public JavaRDD<T> coalesce(int numPartitions)
86
public JavaRDD<T> coalesce(int numPartitions, boolean shuffle)
87
public JavaRDD<T> repartition(int numPartitions)
88
89
// Actions
90
public List<T> collect()
91
public long count()
92
public T first()
93
public List<T> take(int num)
94
public List<T> takeSample(boolean withReplacement, int num)
95
public List<T> takeSample(boolean withReplacement, int num, long seed)
96
public T reduce(Function2<T, T, T> f)
97
public T fold(T zeroValue, Function2<T, T, T> func)
98
public <U> U aggregate(U zeroValue, Function2<U, T, U> seqFunc, Function2<U, U, U> combFunc)
99
public void foreach(VoidFunction<T> f)
100
public void foreachPartition(VoidFunction<Iterator<T>> f)
101
102
// Persistence
103
public JavaRDD<T> cache()
104
public JavaRDD<T> persist(StorageLevel newLevel)
105
public JavaRDD<T> unpersist()
106
public JavaRDD<T> unpersist(boolean blocking)
107
}
108
```
109
110
### Creating and Using JavaRDD
111
112
```java
113
import org.apache.spark.api.java.JavaRDD;
114
import org.apache.spark.api.java.function.Function;
115
import org.apache.spark.api.java.function.Function2;
116
import org.apache.spark.api.java.function.FlatMapFunction;
117
import java.util.Arrays;
118
import java.util.List;
119
120
// Create JavaRDD from collection
121
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
122
JavaRDD<Integer> javaRDD = jsc.parallelize(data);
123
124
// Map transformation
125
JavaRDD<Integer> doubled = javaRDD.map(new Function<Integer, Integer>() {
126
public Integer call(Integer x) {
127
return x * 2;
128
}
129
});
130
131
// Using lambda expressions (Java 8+)
132
JavaRDD<Integer> doubled2 = javaRDD.map(x -> x * 2);
133
134
// FlatMap transformation
135
JavaRDD<String> lines = jsc.textFile("input.txt");
136
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
137
public Iterable<String> call(String line) {
138
return Arrays.asList(line.split(" "));
139
}
140
});
141
142
// With lambda
143
JavaRDD<String> words2 = lines.flatMap(line -> Arrays.asList(line.split(" ")));
144
145
// Filter transformation
146
JavaRDD<Integer> evens = javaRDD.filter(new Function<Integer, Boolean>() {
147
public Boolean call(Integer x) {
148
return x % 2 == 0;
149
}
150
});
151
152
// With lambda
153
JavaRDD<Integer> evens2 = javaRDD.filter(x -> x % 2 == 0);
154
```
155
156
### Actions on JavaRDD
157
158
```java
159
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
160
JavaRDD<Integer> rdd = jsc.parallelize(data);
161
162
// Collect all elements
163
List<Integer> result = rdd.collect();
164
165
// Count elements
166
long count = rdd.count();
167
168
// Get first element
169
Integer first = rdd.first();
170
171
// Take first n elements
172
List<Integer> firstThree = rdd.take(3);
173
174
// Reduce with function
175
Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {
176
public Integer call(Integer a, Integer b) {
177
return a + b;
178
}
179
});
180
181
// With lambda
182
Integer sum2 = rdd.reduce((a, b) -> a + b);
183
184
// Fold with zero value
185
Integer foldResult = rdd.fold(0, (a, b) -> a + b);
186
187
// Aggregate with different types
188
class Stats implements Serializable {
189
public int sum;
190
public int count;
191
192
public Stats(int sum, int count) {
193
this.sum = sum;
194
this.count = count;
195
}
196
}
197
198
Stats stats = rdd.aggregate(
199
new Stats(0, 0), // Zero value
200
new Function2<Stats, Integer, Stats>() { // Seq function
201
public Stats call(Stats s, Integer x) {
202
return new Stats(s.sum + x, s.count + 1);
203
}
204
},
205
new Function2<Stats, Stats, Stats>() { // Combine function
206
public Stats call(Stats s1, Stats s2) {
207
return new Stats(s1.sum + s2.sum, s1.count + s2.count);
208
}
209
}
210
);
211
```
212
213
## JavaPairRDD
214
215
Java wrapper for key-value pair RDDs.
216
217
### JavaPairRDD Class
218
219
```java { .api }
220
public class JavaPairRDD<K, V> extends AbstractJavaRDDLike<Tuple2<K, V>, JavaPairRDD<K, V>> {
221
// Key-Value operations
222
public JavaRDD<K> keys()
223
public JavaRDD<V> values()
224
public <U> JavaPairRDD<K, U> mapValues(Function<V, U> f)
225
public <U> JavaPairRDD<K, U> flatMapValues(Function<V, Iterable<U>> f)
226
227
// Aggregations
228
public JavaPairRDD<K, V> reduceByKey(Function2<V, V, V> func)
229
public JavaPairRDD<K, V> reduceByKey(Partitioner partitioner, Function2<V, V, V> func)
230
public JavaPairRDD<K, Iterable<V>> groupByKey()
231
public JavaPairRDD<K, Iterable<V>> groupByKey(Partitioner partitioner)
232
public <C> JavaPairRDD<K, C> combineByKey(Function<V, C> createCombiner, Function2<C, V, C> mergeValue, Function2<C, C, C> mergeCombiners, Partitioner partitioner)
233
public <U> JavaPairRDD<K, U> aggregateByKey(U zeroValue, Partitioner partitioner, Function2<U, V, U> seqFunc, Function2<U, U, U> combFunc)
234
public JavaPairRDD<K, V> foldByKey(V zeroValue, Function2<V, V, V> func)
235
236
// Joins
237
public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other)
238
public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other, Partitioner partitioner)
239
public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairRDD<K, W> other)
240
public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairRDD<K, W> other)
241
public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairRDD<K, W> other)
242
public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>> cogroup(JavaPairRDD<K, W> other)
243
244
// Actions
245
public Map<K, V> collectAsMap()
246
public Map<K, Long> countByKey()
247
public List<V> lookup(K key)
248
249
// Save operations
250
public void saveAsHadoopFile(String path, Class<?> keyClass, Class<?> valueClass, Class<? extends OutputFormat> outputFormatClass)
251
public void saveAsNewAPIHadoopFile(String path, Class<?> keyClass, Class<?> valueClass, Class<? extends org.apache.hadoop.mapreduce.OutputFormat> outputFormatClass)
252
}
253
```
254
255
### Creating and Using JavaPairRDD
256
257
```java
258
import org.apache.spark.api.java.JavaPairRDD;
259
import org.apache.spark.api.java.function.PairFunction;
260
import scala.Tuple2;
261
import java.util.Arrays;
262
import java.util.List;
263
264
// Create JavaPairRDD from tuples
265
List<Tuple2<String, Integer>> pairs = Arrays.asList(
266
new Tuple2<>("apple", 5),
267
new Tuple2<>("banana", 3),
268
new Tuple2<>("apple", 2),
269
new Tuple2<>("orange", 1)
270
);
271
272
JavaPairRDD<String, Integer> pairRDD = jsc.parallelizePairs(pairs);
273
274
// Create from JavaRDD using mapToPair
275
JavaRDD<String> lines = jsc.textFile("input.txt");
276
JavaPairRDD<String, Integer> wordCounts = lines
277
.flatMap(line -> Arrays.asList(line.split(" ")))
278
.mapToPair(new PairFunction<String, String, Integer>() {
279
public Tuple2<String, Integer> call(String word) {
280
return new Tuple2<>(word, 1);
281
}
282
});
283
284
// With lambda
285
JavaPairRDD<String, Integer> wordCounts2 = lines
286
.flatMap(line -> Arrays.asList(line.split(" ")))
287
.mapToPair(word -> new Tuple2<>(word, 1));
288
```
289
290
### Key-Value Transformations
291
292
```java
293
JavaPairRDD<String, Integer> pairs = jsc.parallelizePairs(Arrays.asList(
294
new Tuple2<>("a", 1),
295
new Tuple2<>("b", 2),
296
new Tuple2<>("a", 3)
297
));
298
299
// Get keys and values
300
JavaRDD<String> keys = pairs.keys();
301
JavaRDD<Integer> values = pairs.values();
302
303
// Transform values while preserving keys
304
JavaPairRDD<String, Integer> doubled = pairs.mapValues(x -> x * 2);
305
306
// FlatMap values
307
JavaPairRDD<String, Character> chars = pairs.flatMapValues(value -> {
    List<Character> letters = new ArrayList<>();
    for (char c : value.toString().toCharArray()) {
        letters.add(c);
    }
    return letters;
});
310
311
// Reduce by key
312
JavaPairRDD<String, Integer> sums = pairs.reduceByKey((a, b) -> a + b);
313
314
// Group by key
315
JavaPairRDD<String, Iterable<Integer>> grouped = pairs.groupByKey();
316
317
// Aggregate by key
318
JavaPairRDD<String, Integer> aggregated = pairs.aggregateByKey(
319
0, // Zero value
320
(acc, value) -> acc + value, // Seq function
321
(acc1, acc2) -> acc1 + acc2 // Combine function
322
);
323
```
324
325
### Join Operations
326
327
```java
328
JavaPairRDD<String, String> names = jsc.parallelizePairs(Arrays.asList(
329
new Tuple2<>("1", "Alice"),
330
new Tuple2<>("2", "Bob"),
331
new Tuple2<>("3", "Charlie")
332
));
333
334
JavaPairRDD<String, Integer> ages = jsc.parallelizePairs(Arrays.asList(
335
new Tuple2<>("1", 25),
336
new Tuple2<>("2", 30),
337
new Tuple2<>("4", 35)
338
));
339
340
// Inner join
341
JavaPairRDD<String, Tuple2<String, Integer>> joined = names.join(ages);
342
// Result: [("1", ("Alice", 25)), ("2", ("Bob", 30))]
343
344
// Left outer join
345
JavaPairRDD<String, Tuple2<String, Optional<Integer>>> leftJoined = names.leftOuterJoin(ages);
// Result: [("1", ("Alice", Optional.of(25))), ("2", ("Bob", Optional.of(30))), ("3", ("Charlie", Optional.absent()))]
347
348
// Full outer join
349
JavaPairRDD<String, Tuple2<Optional<String>, Optional<Integer>>> fullJoined = names.fullOuterJoin(ages);
350
351
// Cogroup
352
JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<Integer>>> cogrouped = names.cogroup(ages);
353
```
354
355
### Actions on JavaPairRDD
356
357
```java
358
JavaPairRDD<String, Integer> pairs = jsc.parallelizePairs(Arrays.asList(
359
new Tuple2<>("apple", 5),
360
new Tuple2<>("banana", 3),
361
new Tuple2<>("apple", 2)
362
));
363
364
// Collect as Map (assumes unique keys)
365
Map<String, Integer> map = pairs.collectAsMap();
366
367
// Count by key
368
Map<String, Long> counts = pairs.countByKey();
369
370
// Lookup values for a key
371
List<Integer> appleValues = pairs.lookup("apple"); // [5, 2]
372
373
// Count all elements
374
long totalCount = pairs.count();
375
```
376
377
## JavaDoubleRDD
378
379
Specialized RDD for double values with statistical operations.
380
381
### JavaDoubleRDD Class
382
383
```java { .api }
384
public class JavaDoubleRDD extends AbstractJavaRDDLike<Double, JavaDoubleRDD> {
385
// Statistical operations
386
public double mean()
387
public double sum()
388
public StatCounter stats()
389
public double variance()
390
public double sampleVariance()
391
public double stdev()
392
public double sampleStdev()
393
public long[] histogram(double[] buckets)
394
public Tuple2<double[], long[]> histogram(int bucketCount)
395
}
396
```
397
398
### Using JavaDoubleRDD
399
400
```java
401
import org.apache.spark.api.java.JavaDoubleRDD;
402
import org.apache.spark.util.StatCounter;
403
404
// Create JavaDoubleRDD
405
List<Double> numbers = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0);
406
JavaDoubleRDD doubleRDD = jsc.parallelizeDoubles(numbers);
407
408
// Convert from JavaRDD<Double>
409
JavaRDD<Double> rdd = jsc.parallelize(numbers);
410
JavaDoubleRDD doubleRDD2 = rdd.mapToDouble(x -> x);
411
412
// Statistical operations
413
double mean = doubleRDD.mean();
414
double sum = doubleRDD.sum();
415
double variance = doubleRDD.variance();
416
double stdev = doubleRDD.stdev();
417
418
// Get detailed statistics
419
StatCounter stats = doubleRDD.stats();
420
System.out.println("Count: " + stats.count());
421
System.out.println("Mean: " + stats.mean());
422
System.out.println("Stdev: " + stats.stdev());
423
System.out.println("Max: " + stats.max());
424
System.out.println("Min: " + stats.min());
425
426
// Histogram
427
double[] buckets = {0.0, 2.0, 4.0, 6.0};
428
long[] histogram = doubleRDD.histogram(buckets);
429
430
// Or with automatic bucketing
431
Tuple2<double[], long[]> autoHistogram = doubleRDD.histogram(4);
432
```
433
434
## Function Interfaces
435
436
Java API uses function interfaces for type-safe transformations.
437
438
### Function Interfaces
439
440
```java { .api }
441
// Single argument function
442
public interface Function<T, R> extends Serializable {
443
R call(T t) throws Exception;
444
}
445
446
// Two argument function
447
public interface Function2<T1, T2, R> extends Serializable {
448
R call(T1 t1, T2 t2) throws Exception;
449
}
450
451
// Void function (for actions)
452
public interface VoidFunction<T> extends Serializable {
453
void call(T t) throws Exception;
454
}
455
456
// FlatMap function
457
public interface FlatMapFunction<T, R> extends Serializable {
458
Iterable<R> call(T t) throws Exception;
459
}
460
461
// Pair function (for creating key-value pairs)
462
public interface PairFunction<T, K, V> extends Serializable {
463
Tuple2<K, V> call(T t) throws Exception;
464
}
465
466
// PairFlatMap function
467
public interface PairFlatMapFunction<T, K, V> extends Serializable {
468
Iterable<Tuple2<K, V>> call(T t) throws Exception;
469
}
470
```
471
472
### Function Usage Examples
473
474
```java
475
import org.apache.spark.api.java.function.*;
476
477
// Anonymous inner class
478
JavaRDD<Integer> doubled = rdd.map(new Function<Integer, Integer>() {
479
public Integer call(Integer x) {
480
return x * 2;
481
}
482
});
483
484
// Lambda expression (Java 8+)
485
JavaRDD<Integer> doubled2 = rdd.map(x -> x * 2);
486
487
// Method reference (Java 8+)
488
JavaRDD<String> strings = rdd.map(Object::toString);
489
490
// Complex transformation with PairFunction
491
JavaPairRDD<String, Integer> pairs = words.mapToPair(
492
new PairFunction<String, String, Integer>() {
493
public Tuple2<String, Integer> call(String word) {
494
return new Tuple2<>(word.toLowerCase(), word.length());
495
}
496
}
497
);
498
499
// FlatMap example
500
JavaRDD<String> words = lines.flatMap(
501
new FlatMapFunction<String, String>() {
502
public Iterable<String> call(String line) {
503
return Arrays.asList(line.split("\\s+"));
504
}
505
}
506
);
507
508
// Void function for actions
509
rdd.foreach(new VoidFunction<Integer>() {
510
public void call(Integer x) {
511
System.out.println(x);
512
}
513
});
514
```
515
516
## Shared Variables in Java
517
518
### Broadcast Variables
519
520
```java
521
import org.apache.spark.broadcast.Broadcast;
522
import java.util.Map;
523
import java.util.HashMap;
524
525
// Create broadcast variable
526
Map<String, Integer> lookupTable = new HashMap<>();
527
lookupTable.put("apple", 1);
528
lookupTable.put("banana", 2);
529
lookupTable.put("orange", 3);
530
531
Broadcast<Map<String, Integer>> broadcastTable = jsc.broadcast(lookupTable);
532
533
// Use in transformations
534
JavaRDD<String> fruits = jsc.parallelize(Arrays.asList("apple", "banana", "apple"));
535
JavaRDD<Integer> codes = fruits.map(fruit ->
536
broadcastTable.value().getOrDefault(fruit, 0)
537
);
538
539
// Clean up
540
broadcastTable.unpersist();
541
```
542
543
### Accumulators
544
545
```java
546
import org.apache.spark.Accumulator;
547
548
// Create accumulator
549
Accumulator<Integer> errorCount = jsc.accumulator(0);
550
551
// Use in transformations
552
JavaRDD<String> lines = jsc.textFile("input.txt");
553
JavaRDD<String> validLines = lines.filter(line -> {
554
if (line.trim().isEmpty()) {
555
errorCount.add(1);
556
return false;
557
}
558
return true;
559
});
560
561
// Trigger action to update accumulator
562
validLines.count();
563
564
// Get accumulator value
565
System.out.println("Error count: " + errorCount.value());
566
567
// Accumulators for other built-in numeric types
Accumulator<Double> doubleAcc = jsc.accumulator(0.0);
569
```
570
571
## Complete Example
572
573
```java
574
import org.apache.spark.SparkConf;
575
import org.apache.spark.api.java.JavaSparkContext;
576
import org.apache.spark.api.java.JavaRDD;
577
import org.apache.spark.api.java.JavaPairRDD;
578
import org.apache.spark.api.java.function.*;
579
import scala.Tuple2;
580
import java.util.Arrays;
581
import java.util.Map;
582
583
public class SparkWordCount {
584
public static void main(String[] args) {
585
// Create Spark context
586
SparkConf conf = new SparkConf()
587
.setAppName("Java Word Count")
588
.setMaster("local[*]");
589
590
JavaSparkContext jsc = new JavaSparkContext(conf);
591
592
try {
593
// Read input file
594
JavaRDD<String> lines = jsc.textFile("input.txt");
595
596
// Split lines into words
597
JavaRDD<String> words = lines.flatMap(line ->
598
Arrays.asList(line.toLowerCase().split("\\s+"))
599
);
600
601
// Filter out empty words
602
JavaRDD<String> validWords = words.filter(word -> !word.trim().isEmpty());
603
604
// Create word-count pairs
605
JavaPairRDD<String, Integer> wordPairs = validWords.mapToPair(
606
word -> new Tuple2<>(word, 1)
607
);
608
609
// Sum counts by key
610
JavaPairRDD<String, Integer> wordCounts = wordPairs.reduceByKey(
611
(a, b) -> a + b
612
);
613
614
// Sort by count descending
615
JavaPairRDD<String, Integer> sortedCounts = wordCounts.mapToPair(
616
pair -> new Tuple2<>(pair._2, pair._1) // Swap to (count, word)
617
).sortByKey(false).mapToPair(
618
pair -> new Tuple2<>(pair._2, pair._1) // Swap back to (word, count)
619
);
620
621
// Collect and print results. Note: collectAsMap() returns an unordered Map,
// so the ordering produced by sortByKey is lost here; the stream below
// re-sorts the entries before printing.
Map<String, Integer> results = sortedCounts.collectAsMap();
623
624
System.out.println("Word Count Results:");
625
results.entrySet().stream()
626
.sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
627
.limit(10)
628
.forEach(entry ->
629
System.out.println(entry.getKey() + ": " + entry.getValue())
630
);
631
632
// Save results
633
sortedCounts.saveAsTextFile("output");
634
635
} finally {
636
// Stop Spark context
637
jsc.stop();
638
}
639
}
640
}
641
```
642
643
## Maven Dependencies
644
645
```xml
646
<dependencies>
647
<dependency>
648
<groupId>org.apache.spark</groupId>
649
<artifactId>spark-core_2.10</artifactId>
650
<version>1.0.0</version>
651
</dependency>
652
</dependencies>
653
```
654
655
This comprehensive guide covers the complete Java API for Apache Spark, enabling Java developers to build scalable data processing applications with type safety and familiar Java patterns.