0
# Java API
1
2
Java-friendly wrappers that expose Spark's core functionality through Java-compatible interfaces, lambda support, and familiar Java collection types.
3
4
## Capabilities
5
6
### JavaSparkContext
7
8
Java-friendly wrapper for SparkContext providing all core Spark functionality with Java-compatible method signatures.
9
10
```java { .api }
11
/**
12
* A Java-friendly version of SparkContext that returns JavaRDDs and works with Java collections.
13
* In addition to the methods defined in JavaSparkContextVarargsWorkaround, this class also
14
* provides several convenience methods.
15
*/
16
public class JavaSparkContext implements Closeable {
17
18
// CONSTRUCTORS
19
20
/** Create a JavaSparkContext with a SparkConf object */
21
public JavaSparkContext(SparkConf conf)
22
23
/** Create a JavaSparkContext with explicit master and application name */
24
public JavaSparkContext(String master, String appName)
25
26
/** Create a JavaSparkContext with explicit parameters */
27
public JavaSparkContext(String master, String appName, String sparkHome, String[] jars)
28
29
// RDD CREATION
30
31
/** Distribute a local Java collection to form an RDD */
32
public <T> JavaRDD<T> parallelize(List<T> list)
33
34
/** Distribute a local Java collection with specific number of partitions */
35
public <T> JavaRDD<T> parallelize(List<T> list, int numSlices)
36
37
/** Distribute a local collection of key-value Tuple2s to form a JavaPairRDD */
38
public <K, V> JavaPairRDD<K, V> parallelizePairs(List<Tuple2<K, V>> list)
39
40
/** Distribute a local collection of key-value Tuple2s into a JavaPairRDD with a specific number of partitions */
41
public <K, V> JavaPairRDD<K, V> parallelizePairs(List<Tuple2<K, V>> list, int numSlices)
42
43
// FILE INPUT
44
45
/** Read a text file from HDFS, local file system, or any Hadoop-supported file system URI */
46
public JavaRDD<String> textFile(String path)
47
48
/** Read a text file with minimum number of partitions */
49
public JavaRDD<String> textFile(String path, int minPartitions)
50
51
/** Read all text files in a directory as (file path, file content) pairs */
52
public JavaPairRDD<String, String> wholeTextFiles(String path)
53
54
/** Read whole text files with minimum partitions */
55
public JavaPairRDD<String, String> wholeTextFiles(String path, int minPartitions)
56
57
/** Read each binary file as a (file path, PortableDataStream) pair */
58
public JavaPairRDD<String, PortableDataStream> binaryFiles(String path)
59
60
/** Read binary files with minimum partitions */
61
public JavaPairRDD<String, PortableDataStream> binaryFiles(String path, int minPartitions)
62
63
/** Load an RDD saved with saveAsObjectFile (a SequenceFile of serialized objects) */
64
public <T> JavaRDD<T> objectFile(String path)
65
66
/** Read objects with minimum partitions */
67
public <T> JavaRDD<T> objectFile(String path, int minPartitions)
68
69
/** Read Hadoop sequence files */
70
public <K, V> JavaPairRDD<K, V> sequenceFile(String path, Class<K> keyClass, Class<V> valueClass)
71
72
/** Read Hadoop sequence files with minimum partitions */
73
public <K, V> JavaPairRDD<K, V> sequenceFile(String path, Class<K> keyClass, Class<V> valueClass, int minPartitions)
74
75
/** Read Hadoop files using old MapReduce API */
76
public <K, V> JavaPairRDD<K, V> hadoopFile(String path,
77
Class<? extends InputFormat<K, V>> inputFormatClass,
78
Class<K> keyClass,
79
Class<V> valueClass)
80
81
/** Read Hadoop files using new MapReduce API */
82
public <K, V> JavaPairRDD<K, V> newAPIHadoopFile(String path,
83
Class<? extends NewInputFormat<K, V>> inputFormatClass,
84
Class<K> keyClass,
85
Class<V> valueClass)
86
87
// BROADCAST AND ACCUMULATORS
88
89
/** Broadcast a read-only variable to all worker nodes */
90
public <T> Broadcast<T> broadcast(T value)
91
92
/** Create an integer accumulator (legacy v1 API: deprecated since Spark 2.0, removed in Spark 3.0) */
93
public Accumulator<Integer> accumulator(Integer initialValue)
94
95
/** Create an accumulator with custom AccumulatorParam */
96
public <T> Accumulator<T> accumulator(T initialValue, AccumulatorParam<T> param)
97
98
/** Create a named accumulator */
99
public <T> Accumulator<T> accumulator(T initialValue, String name, AccumulatorParam<T> param)
100
101
// CONFIGURATION AND MANAGEMENT
102
103
/** Get the SparkConf for this context */
104
public SparkConf getConf()
105
106
/** Default level of parallelism */
107
public int defaultParallelism()
108
109
/** Return the Spark version */
110
public String version()
111
112
/** Stop the SparkContext */
113
public void stop()
114
115
/** Close the SparkContext (implements Closeable) */
116
public void close() throws IOException
117
118
/** Add a file to be downloaded with this Spark job */
119
public void addFile(String path)
120
121
/** Add a JAR dependency */
122
public void addJar(String path)
123
124
/** Set job group for all jobs started by this thread */
125
public void setJobGroup(String groupId, String description)
126
127
/** Set job group with interrupt flag */
128
public void setJobGroup(String groupId, String description, boolean interruptOnCancel)
129
130
/** Clear the job group */
131
public void clearJobGroup()
132
133
/** Set checkpoint directory */
134
public void setCheckpointDir(String dir)
135
136
/** Get checkpoint directory */
137
public String getCheckpointDir()
138
139
/** Set log level */
140
public void setLogLevel(String logLevel)
141
142
/** Get status tracker */
143
public JavaSparkStatusTracker statusTracker()
144
}
145
```
146
147
**Usage Examples:**
148
149
```java
150
import org.apache.spark.api.java.JavaSparkContext;
151
import org.apache.spark.api.java.JavaRDD;
152
import org.apache.spark.SparkConf;
153
import java.util.Arrays;
154
import java.util.List;
155
156
// Create Spark context
157
SparkConf conf = new SparkConf()
158
.setAppName("Java Spark Example")
159
.setMaster("local[*]");
160
JavaSparkContext sc = new JavaSparkContext(conf);
161
162
// Create RDD from collection
163
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
164
JavaRDD<Integer> rdd = sc.parallelize(data);
165
166
// Read text file
167
JavaRDD<String> textRDD = sc.textFile("hdfs://path/to/file.txt");
168
169
// Create pair RDD
170
List<Tuple2<String, Integer>> pairs = Arrays.asList(
171
new Tuple2<>("apple", 5),
172
new Tuple2<>("banana", 3)
173
);
174
JavaPairRDD<String, Integer> pairRDD = sc.parallelizePairs(pairs);
175
176
// Broadcast variable
177
Map<String, String> lookup = new HashMap<>();
178
lookup.put("key1", "value1");
179
Broadcast<Map<String, String>> broadcastVar = sc.broadcast(lookup);
180
181
// Accumulator (legacy v1 API: deprecated since Spark 2.0 and removed in 3.0; prefer sc.sc().longAccumulator())
182
Accumulator<Integer> sum = sc.accumulator(0);
183
184
// Always close
185
sc.close();
186
```
187
188
### JavaRDD
189
190
Java-friendly wrapper for RDD providing functional programming operations with Java 8 lambda support.
191
192
```java { .api }
193
/**
194
* A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
195
* Represents an immutable, partitioned collection of elements that can be operated on in parallel.
196
*/
197
public class JavaRDD<T> extends AbstractJavaRDDLike<T, JavaRDD<T>> {
198
199
// TRANSFORMATIONS
200
201
/** Apply a function to each element */
202
public <R> JavaRDD<R> map(Function<T, R> f)
203
204
/** Apply a function to each element and flatten results */
205
public <R> JavaRDD<R> flatMap(FlatMapFunction<T, R> f)
206
207
/** Filter elements using a predicate */
208
public JavaRDD<T> filter(Function<T, Boolean> f)
209
210
/** Remove duplicate elements */
211
public JavaRDD<T> distinct()
212
213
/** Remove duplicates with custom number of partitions */
214
public JavaRDD<T> distinct(int numPartitions)
215
216
/** Union with another JavaRDD */
217
public JavaRDD<T> union(JavaRDD<T> other)
218
219
/** Intersection with another JavaRDD */
220
public JavaRDD<T> intersection(JavaRDD<T> other)
221
222
/** Subtract elements found in another JavaRDD */
223
public JavaRDD<T> subtract(JavaRDD<T> other)
224
225
/** Cartesian product with another JavaRDD */
226
public <U> JavaPairRDD<T, U> cartesian(JavaRDD<U> other)
227
228
/** Sample elements */
229
public JavaRDD<T> sample(boolean withReplacement, double fraction)
230
231
/** Sample with seed */
232
public JavaRDD<T> sample(boolean withReplacement, double fraction, long seed)
233
234
/** Group elements by partition */
235
public JavaRDD<List<T>> glom()
236
237
/** Apply function to each partition */
238
public <R> JavaRDD<R> mapPartitions(FlatMapFunction<Iterator<T>, R> f)
239
240
/** Apply function to each partition with partition index */
241
public <R> JavaRDD<R> mapPartitionsWithIndex(Function2<Integer, Iterator<T>, Iterator<R>> f,
242
boolean preservesPartitioning)
243
244
/** Zip with another JavaRDD */
245
public <U> JavaPairRDD<T, U> zip(JavaRDD<U> other)
246
247
/** Zip with indices */
248
public JavaPairRDD<T, Long> zipWithIndex()
249
250
/** Zip with unique IDs */
251
public JavaPairRDD<T, Long> zipWithUniqueId()
252
253
/** Pipe through external command */
254
public JavaRDD<String> pipe(String command)
255
256
/** Coalesce to fewer partitions */
257
public JavaRDD<T> coalesce(int numPartitions)
258
259
/** Coalesce with shuffle option */
260
public JavaRDD<T> coalesce(int numPartitions, boolean shuffle)
261
262
/** Repartition to different number of partitions */
263
public JavaRDD<T> repartition(int numPartitions)
264
265
/** Sort elements */
266
public JavaRDD<T> sortBy(Function<T, ?> f, boolean ascending, int numPartitions)
267
268
// ACTIONS
269
270
/** Return all elements as a list */
271
public List<T> collect()
272
273
/** Return number of elements */
274
public long count()
275
276
/** Return first element */
277
public T first()
278
279
/** Return first num elements */
280
public List<T> take(int num)
281
282
/** Return top num elements */
283
public List<T> top(int num)
284
285
/** Return smallest num elements */
286
public List<T> takeOrdered(int num)
287
288
/** Take a random sample */
289
public List<T> takeSample(boolean withReplacement, int num)
290
291
/** Take sample with seed */
292
public List<T> takeSample(boolean withReplacement, int num, long seed)
293
294
/** Apply function to each element */
295
public void foreach(VoidFunction<T> f)
296
297
/** Apply function to each partition */
298
public void foreachPartition(VoidFunction<Iterator<T>> f)
299
300
/** Reduce elements using function */
301
public T reduce(Function2<T, T, T> f)
302
303
/** Fold elements with zero value */
304
public T fold(T zeroValue, Function2<T, T, T> op)
305
306
/** Aggregate with different types */
307
public <U> U aggregate(U zeroValue, Function2<U, T, U> seqOp, Function2<U, U, U> combOp)
308
309
/** Count elements by value */
310
public Map<T, Long> countByValue()
311
312
/** Check if RDD is empty */
313
public boolean isEmpty()
314
315
/** Save as text file */
316
public void saveAsTextFile(String path)
317
318
/** Save as object file */
319
public void saveAsObjectFile(String path)
320
321
// PERSISTENCE
322
323
/** Cache in memory */
324
public JavaRDD<T> cache()
325
326
/** Persist with storage level */
327
public JavaRDD<T> persist(StorageLevel newLevel)
328
329
/** Remove from cache */
330
public JavaRDD<T> unpersist()
331
332
/** Remove from cache with blocking option */
333
public JavaRDD<T> unpersist(boolean blocking)
334
335
/** Mark for checkpointing */
336
public void checkpoint()
337
338
// INFORMATION
339
340
/** Get number of partitions */
341
public int getNumPartitions()
342
343
/** Get storage level */
344
public StorageLevel getStorageLevel()
345
346
/** Check if checkpointed */
347
public boolean isCheckpointed()
348
349
/** Get RDD ID */
350
public int id()
351
352
/** Set name */
353
public JavaRDD<T> setName(String name)
354
355
/** Get name */
356
public String name()
357
358
// CONVERSION
359
360
/** Convert to JavaPairRDD using PairFunction */
361
public <K, V> JavaPairRDD<K, V> mapToPair(PairFunction<T, K, V> f)
362
363
/** Convert to JavaDoubleRDD using DoubleFunction */
364
public JavaDoubleRDD mapToDouble(DoubleFunction<T> f)
365
}
366
```
367
368
**Usage Examples:**
369
370
```java
371
import org.apache.spark.api.java.function.*;
372
import java.util.Arrays;
373
374
JavaRDD<String> lines = sc.textFile("data.txt");
375
376
// Transform data using lambda expressions (Java 8+)
377
JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
378
JavaRDD<String> filtered = words.filter(word -> !word.isEmpty());
379
JavaRDD<Integer> lengths = words.map(String::length);
380
381
// Using anonymous classes (pre-Java 8)
382
JavaRDD<String> upperCase = words.map(new Function<String, String>() {
383
public String call(String word) {
384
return word.toUpperCase();
385
}
386
});
387
388
// Actions
389
List<String> collected = words.collect();
390
long count = words.count();
391
String firstWord = words.first();
392
List<String> sample = words.take(10);
393
394
// Reduce operations
395
int totalLength = lengths.reduce((a, b) -> a + b);
396
int maxLength = lengths.reduce(Integer::max);
397
398
// Aggregation
399
Tuple2<Integer, Integer> stats = lengths.aggregate(
400
new Tuple2<>(0, 0), // (sum, count)
401
(acc, len) -> new Tuple2<>(acc._1() + len, acc._2() + 1),
402
(acc1, acc2) -> new Tuple2<>(acc1._1() + acc2._1(), acc1._2() + acc2._2())
403
);
404
double averageLength = (double) stats._1() / stats._2();
405
```
406
407
### JavaPairRDD
408
409
Java-friendly wrapper for pair RDDs providing key-value operations.
410
411
```java { .api }
412
/**
413
* A Resilient Distributed Dataset of key-value pairs.
414
*/
415
public class JavaPairRDD<K, V> extends AbstractJavaRDDLike<Tuple2<K, V>, JavaPairRDD<K, V>> {
416
417
// GROUPING OPERATIONS
418
419
/** Group values by key */
420
public JavaPairRDD<K, Iterable<V>> groupByKey()
421
422
/** Group values by key with custom partitions */
423
public JavaPairRDD<K, Iterable<V>> groupByKey(int numPartitions)
424
425
/** Group values by key with partitioner */
426
public JavaPairRDD<K, Iterable<V>> groupByKey(Partitioner partitioner)
427
428
// REDUCTION OPERATIONS
429
430
/** Combine values with same key using function */
431
public JavaPairRDD<K, V> reduceByKey(Function2<V, V, V> func)
432
433
/** Reduce by key with custom partitions */
434
public JavaPairRDD<K, V> reduceByKey(Function2<V, V, V> func, int numPartitions)
435
436
/** Reduce by key with partitioner */
437
public JavaPairRDD<K, V> reduceByKey(Partitioner partitioner, Function2<V, V, V> func)
438
439
/** Fold values by key */
440
public JavaPairRDD<K, V> foldByKey(V zeroValue, Function2<V, V, V> func)
441
442
/** Fold by key with partitioner */
443
public JavaPairRDD<K, V> foldByKey(V zeroValue, Partitioner partitioner, Function2<V, V, V> func)
444
445
/** Aggregate values by key with different types */
446
public <U> JavaPairRDD<K, U> aggregateByKey(U zeroValue,
447
Function2<U, V, U> seqFunc,
448
Function2<U, U, U> combFunc)
449
450
/** Aggregate by key with partitioner */
451
public <U> JavaPairRDD<K, U> aggregateByKey(U zeroValue,
452
Partitioner partitioner,
453
Function2<U, V, U> seqFunc,
454
Function2<U, U, U> combFunc)
455
456
/** Generic combine by key */
457
public <C> JavaPairRDD<K, C> combineByKey(Function<V, C> createCombiner,
458
Function2<C, V, C> mergeValue,
459
Function2<C, C, C> mergeCombiners)
460
461
// JOIN OPERATIONS
462
463
/** Inner join */
464
public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other)
465
466
/** Inner join with partitioner */
467
public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other, Partitioner partitioner)
468
469
/** Left outer join */
470
public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairRDD<K, W> other)
471
472
/** Right outer join */
473
public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairRDD<K, W> other)
474
475
/** Full outer join */
476
public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairRDD<K, W> other)
477
478
/** Cogroup with another RDD */
479
public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>> cogroup(JavaPairRDD<K, W> other)
480
481
// SET OPERATIONS
482
483
/** Subtract by key */
484
public <W> JavaPairRDD<K, V> subtractByKey(JavaPairRDD<K, W> other)
485
486
// SORTING AND PARTITIONING
487
488
/** Sort by key */
489
public JavaPairRDD<K, V> sortByKey()
490
491
/** Sort by key with ascending flag */
492
public JavaPairRDD<K, V> sortByKey(boolean ascending)
493
494
/** Sort by key with partitions */
495
public JavaPairRDD<K, V> sortByKey(boolean ascending, int numPartitions)
496
497
/** Partition by partitioner */
498
public JavaPairRDD<K, V> partitionBy(Partitioner partitioner)
499
500
// EXTRACTION
501
502
/** Get keys as JavaRDD */
503
public JavaRDD<K> keys()
504
505
/** Get values as JavaRDD */
506
public JavaRDD<V> values()
507
508
/** Transform values only */
509
public <U> JavaPairRDD<K, U> mapValues(Function<V, U> f)
510
511
/** FlatMap values only */
512
public <U> JavaPairRDD<K, U> flatMapValues(Function<V, Iterable<U>> f)
513
514
// ACTIONS
515
516
/** Count by key */
517
public Map<K, Long> countByKey()
518
519
/** Collect as map */
520
public Map<K, V> collectAsMap()
521
522
/** Lookup values for key */
523
public List<V> lookup(K key)
524
525
// HADOOP OUTPUT
526
527
/** Save as Hadoop file */
528
public void saveAsHadoopFile(String path,
529
Class<?> keyClass,
530
Class<?> valueClass,
531
Class<? extends OutputFormat> outputFormatClass)
532
533
/** Save as new API Hadoop file */
534
public void saveAsNewAPIHadoopFile(String path,
535
Class<?> keyClass,
536
Class<?> valueClass,
537
Class<? extends NewOutputFormat> outputFormatClass)
538
}
539
```
540
541
**Usage Examples:**
542
543
```java
544
import scala.Tuple2;
545
import java.util.Arrays;
546
import java.util.Map;
547
import org.apache.spark.api.java.Optional;
548
549
// Create pair RDD
550
JavaRDD<String> lines = sc.textFile("data.txt");
551
JavaPairRDD<String, Integer> wordCounts = lines
552
.flatMap(line -> Arrays.asList(line.split(" ")).iterator())
553
.mapToPair(word -> new Tuple2<>(word, 1))
554
.reduceByKey(Integer::sum);
555
556
// Grouping operations
557
JavaPairRDD<String, Iterable<Integer>> grouped = wordCounts.groupByKey();
558
559
// Join operations
560
JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
561
new Tuple2<>("apple", "fruit"),
562
new Tuple2<>("carrot", "vegetable")
563
));
564
565
JavaPairRDD<String, Tuple2<Integer, String>> joined = wordCounts.join(categories);
566
567
// Left outer join
568
JavaPairRDD<String, Tuple2<Integer, Optional<String>>> leftJoined =
569
wordCounts.leftOuterJoin(categories);
570
571
// Actions
572
Map<String, Long> counts = wordCounts.countByKey();
573
Map<String, Integer> asMap = wordCounts.collectAsMap();
574
List<Integer> appleCounts = wordCounts.lookup("apple");
575
576
// Extract keys and values
577
JavaRDD<String> words = wordCounts.keys();
578
JavaRDD<Integer> counts2 = wordCounts.values();
579
580
// Transform values
581
JavaPairRDD<String, String> formatted = wordCounts.mapValues(count -> "Count: " + count);
582
```
583
584
### JavaDoubleRDD
585
586
Java-friendly wrapper for RDDs of Double values with statistical operations.
587
588
```java { .api }
589
/**
590
* A Resilient Distributed Dataset of Double values, with statistical operations.
591
*/
592
public class JavaDoubleRDD extends AbstractJavaRDDLike<Double, JavaDoubleRDD> {
593
594
// STATISTICAL OPERATIONS
595
596
/** Compute mean */
597
public double mean()
598
599
/** Compute variance */
600
public double variance()
601
602
/** Compute standard deviation */
603
public double stdev()
604
605
/** Compute sum */
606
public double sum()
607
608
/** Compute count, mean, variance, max and min in a single pass, returned as a StatCounter */
609
public StatCounter stats()
610
611
/** Compute a histogram using the given number of evenly spaced buckets between min and max; returns (bucket boundaries, counts) */
612
public Tuple2<double[], long[]> histogram(int buckets)
613
614
/** Compute a histogram using caller-supplied bucket boundaries (sorted, no duplicates, at least two elements); returns one count per bucket */
615
public long[] histogram(double[] buckets)
616
617
/** Find maximum value */
618
public double max()
619
620
/** Find minimum value */
621
public double min()
622
}
623
```
624
625
**Usage Examples:**
626
627
```java
628
JavaRDD<String> textRDD = sc.textFile("numbers.txt");
629
JavaDoubleRDD doubleRDD = textRDD.mapToDouble(Double::parseDouble);
630
631
// Statistics
632
double mean = doubleRDD.mean();
633
double variance = doubleRDD.variance();
634
double stdev = doubleRDD.stdev();
635
double sum = doubleRDD.sum();
636
637
// Complete statistics
638
StatCounter stats = doubleRDD.stats();
639
System.out.println("Count: " + stats.count());
640
System.out.println("Mean: " + stats.mean());
641
System.out.println("StdDev: " + stats.stdev());
642
643
// Histogram
644
Tuple2<double[], long[]> histogram = doubleRDD.histogram(10);
645
double[] buckets = histogram._1();
646
long[] counts = histogram._2();
647
```
648
649
## Functional Interfaces
650
651
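Functional interfaces (package `org.apache.spark.api.java.function`) used by the Spark Java API. Each declares a single abstract `call` method, so it can be implemented with a Java 8 lambda or method reference, and each extends `Serializable` so the function can be shipped to executors: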
652
653
```java { .api }
654
// Basic function interfaces
655
interface Function<T, R> extends Serializable {
656
R call(T t) throws Exception;
657
}
658
659
interface Function2<T1, T2, R> extends Serializable {
660
R call(T1 t1, T2 t2) throws Exception;
661
}
662
663
interface VoidFunction<T> extends Serializable {
664
void call(T t) throws Exception;
665
}
666
667
// Specialized interfaces
668
interface PairFunction<T, K, V> extends Serializable {
669
Tuple2<K, V> call(T t) throws Exception;
670
}
671
672
interface FlatMapFunction<T, R> extends Serializable {
673
Iterator<R> call(T t) throws Exception;
674
}
675
676
interface DoubleFunction<T> extends Serializable {
677
double call(T t) throws Exception;
678
}
679
```
680
681
## Performance Tips for Java API
682
683
### Lambda vs Anonymous Classes
684
```java
685
// Preferred: Lambda expressions (Java 8+)
686
JavaRDD<String> mapped = rdd.map(s -> s.toUpperCase());
687
688
// Alternative: Method references
689
JavaRDD<Integer> lengths = rdd.map(String::length);
690
691
// Legacy: Anonymous classes (verbose but compatible)
692
JavaRDD<String> mapped2 = rdd.map(new Function<String, String>() {
693
public String call(String s) { return s.toUpperCase(); }
694
});
695
```
696
697
### Avoid Serialization Issues
698
```java
699
// Bad: Capturing non-serializable objects
700
MyClass instance = new MyClass(); // Not serializable
701
JavaRDD<String> bad = rdd.map(s -> instance.process(s)); // Fails with SparkException: Task not serializable
702
703
// Good: Use serializable objects or broadcast variables
704
Broadcast<MySerializableClass> broadcast = sc.broadcast(new MySerializableClass());
705
JavaRDD<String> good = rdd.map(s -> broadcast.value().process(s));
706
```
707
708
### Efficient Transformations
709
```java
710
// Efficient: Chain transformations before actions
711
List<String> result = rdd
712
.filter(s -> !s.isEmpty())
713
.map(String::toUpperCase)
714
.filter(s -> s.startsWith("A"))
715
.collect();
716
717
// Less efficient: Multiple actions
718
rdd.filter(s -> !s.isEmpty()).collect(); // First action
719
rdd.map(String::toUpperCase).collect(); // Second action (recomputation)
720
```