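# Java Functional Interfaces

A set of functional interfaces, defined in `org.apache.spark.api.java.function`, that let Java code plug into Spark's functional programming model through lambda expressions, method references, and anonymous classes. Each interface is a `@FunctionalInterface` that extends `java.io.Serializable`, so Spark can ship implementations from the driver to executors. Each `call` method declares `throws Exception`, so implementations may throw checked exceptions directly.

## Capabilities

### Core Function Interfaces

Base functional interfaces for transforming and processing data, one per arity (zero to four arguments).
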
```java { .api }
/**
 * Base interface for functions whose return types do not create special RDDs
 * (PairFunction and DoubleFunction are separate so that mapping can produce
 * PairRDDs and DoubleRDDs). Supports lambda expressions and method references.
 */
@FunctionalInterface
public interface Function<T1, R> extends Serializable {
    /**
     * Apply function to input value
     * @param v1 Input value
     * @return Transformed result
     * @throws Exception If transformation fails
     */
    R call(T1 v1) throws Exception;
}

/**
 * Function with no arguments
 */
@FunctionalInterface
public interface Function0<R> extends Serializable {
    R call() throws Exception;
}

/**
 * Function with two arguments
 */
@FunctionalInterface
public interface Function2<T1, T2, R> extends Serializable {
    R call(T1 v1, T2 v2) throws Exception;
}

/**
 * Function with three arguments
 */
@FunctionalInterface
public interface Function3<T1, T2, T3, R> extends Serializable {
    R call(T1 v1, T2 v2, T3 v3) throws Exception;
}

/**
 * Function with four arguments
 */
@FunctionalInterface
public interface Function4<T1, T2, T3, T4, R> extends Serializable {
    R call(T1 v1, T2 v2, T3 v3, T4 v4) throws Exception;
}
```

**Usage Examples:**

```java
import org.apache.spark.api.java.function.*;

// Lambda expressions
Function<String, Integer> stringLength = s -> s.length();
Function<Integer, String> intToString = i -> i.toString();

// Method references
Function<String, String> toUpperCase = String::toUpperCase;
Function0<Long> currentTime = System::currentTimeMillis;

// Multi-argument functions
Function2<Integer, Integer, Integer> add = (a, b) -> a + b;
Function3<String, String, String, String> concat3 = (a, b, c) -> a + b + c;

// Anonymous classes (Employee is a hypothetical application class).
// Declared inside an instance method, an anonymous class also holds a
// reference to the enclosing object, which then must be serializable too.
Function<Employee, String> getName = new Function<Employee, String>() {
    @Override
    public String call(Employee emp) throws Exception {
        return emp.getName();
    }
};
```
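
Because every interface here extends `Serializable`, Spark can serialize a function on the driver and deserialize it on an executor, but only if everything the function captures is serializable too. The sketch below illustrates this with plain Java serialization rather than Spark itself. It redeclares `Function` and `VoidFunction` locally with the same shape as Spark's so it runs without Spark on the classpath, and `SerializationCheck` is a hypothetical helper. A capture-free lambda survives the round trip, while the bound method reference `System.out::println` captures the `java.io.PrintStream` behind `System.out` and fails.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Local stand-ins with the same shape as Spark's Function and VoidFunction,
// declared here only so the sketch compiles without Spark on the classpath.
@FunctionalInterface
interface Function<T1, R> extends Serializable {
    R call(T1 v1) throws Exception;
}

@FunctionalInterface
interface VoidFunction<T> extends Serializable {
    void call(T t) throws Exception;
}

class SerializationCheck {
    // Round-trips an object through Java serialization, roughly what happens
    // when a function is shipped from the driver to an executor.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    // True if the object (including everything it captures) serializes.
    static boolean isSerializable(Object obj) {
        try {
            roundTrip(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        Function<String, Integer> length = s -> s.length();
        Function<String, Integer> copy = roundTrip(length);
        System.out.println(copy.call("spark"));              // prints 5

        VoidFunction<String> lambdaPrint = s -> System.out.println(s);
        VoidFunction<String> boundRefPrint = System.out::println;
        System.out.println(isSerializable(lambdaPrint));     // prints true
        System.out.println(isSerializable(boundRefPrint));   // prints false
    }
}
```

In a Spark project you would delete the local interfaces and import them from `org.apache.spark.api.java.function`; the capture rule is the same for every interface in this document.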

### Dataset and RDD Transformation Interfaces

Specialized interfaces for common element-wise and partition-wise operations. `MapFunction`, `FilterFunction`, and `MapPartitionsFunction` are used by the typed `Dataset` API (`map`, `filter`, `mapPartitions`), while `FlatMapFunction` is accepted by both `Dataset.flatMap` and `JavaRDD.flatMap`.

```java { .api }
/**
 * Base interface for map functions used in Dataset operations
 * Transforms each element individually
 */
@FunctionalInterface
public interface MapFunction<T, U> extends Serializable {
    /**
     * Transform input value to output value
     * @param value Input value
     * @return Transformed output value
     * @throws Exception If transformation fails
     */
    U call(T value) throws Exception;
}

/**
 * Interface for flat map operations that produce zero or more output elements
 */
@FunctionalInterface
public interface FlatMapFunction<T, R> extends Serializable {
    /**
     * Transform single input into zero or more outputs
     * @param t Input value
     * @return Iterator over output values
     * @throws Exception If transformation fails
     */
    Iterator<R> call(T t) throws Exception;
}

/**
 * Interface for filtering operations
 */
@FunctionalInterface
public interface FilterFunction<T> extends Serializable {
    /**
     * Test whether element should be included in result
     * @param value Input value to test
     * @return true if element should be included
     * @throws Exception If predicate evaluation fails
     */
    boolean call(T value) throws Exception;
}

/**
 * Interface for map partitions operations
 * Transforms entire partitions rather than individual elements
 */
@FunctionalInterface
public interface MapPartitionsFunction<T, U> extends Serializable {
    /**
     * Transform partition of elements
     * @param input Iterator over partition elements
     * @return Iterator over transformed elements
     * @throws Exception If transformation fails
     */
    Iterator<U> call(Iterator<T> input) throws Exception;
}
```

**Usage Examples:**

```java
import org.apache.spark.api.java.function.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Map transformations (Person is a hypothetical application class)
MapFunction<String, Integer> parseInteger = Integer::parseInt;
MapFunction<Person, String> extractName = person -> person.getName();

// Flat map transformations
FlatMapFunction<String, String> splitWords = line -> Arrays.asList(line.split(" ")).iterator();
FlatMapFunction<List<Integer>, Integer> flattenList = list -> list.iterator();

// Filter operations
FilterFunction<Integer> isPositive = x -> x > 0;
FilterFunction<String> isNotEmpty = s -> !s.isEmpty();

// Partition-level operations: called once per partition, so shared setup
// (e.g. opening a connection) is paid once rather than once per element.
// This version buffers the whole partition's output in memory.
MapPartitionsFunction<String, String> processPartition = partition -> {
    List<String> results = new ArrayList<>();
    while (partition.hasNext()) {
        String line = partition.next();
        results.add(line.toUpperCase());
    }
    return results.iterator();
};
```
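
The `processPartition` example collects the entire partition into a list before returning. When the per-element work is independent, a common alternative is to wrap the input iterator so each element is transformed only as it is consumed. This is a minimal sketch under that assumption: `MapPartitionsFunction` is a local stand-in with Spark's shape (so the code runs without Spark), and `LazyPartitions` and its `run` helper are hypothetical.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;

// Local stand-in with the same shape as Spark's MapPartitionsFunction,
// declared here only so the sketch compiles without Spark on the classpath.
@FunctionalInterface
interface MapPartitionsFunction<T, U> extends Serializable {
    Iterator<U> call(Iterator<T> input) throws Exception;
}

class LazyPartitions {
    // Wraps the input iterator: each element is upper-cased only when the
    // consumer calls next(), so the partition is never buffered in a list.
    static final MapPartitionsFunction<String, String> upperCaseLazily = input -> {
        // One-time, per-partition setup would go here.
        Locale locale = Locale.ROOT;
        return new Iterator<String>() {
            @Override
            public boolean hasNext() {
                return input.hasNext();
            }

            @Override
            public String next() {
                return input.next().toUpperCase(locale);
            }
        };
    };

    // Runs the function over an in-memory "partition" and collects the output.
    static List<String> run(List<String> partition) {
        List<String> out = new ArrayList<>();
        try {
            upperCaseLazily.call(partition.iterator()).forEachRemaining(out::add);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "b", "c"))); // prints [A, B, C]
    }
}
```

The trade-off is resource cleanup: anything opened in the setup step cannot simply be closed in a `finally` block inside `call`, because the returned iterator is consumed after `call` has returned.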

### Key-Value Pair Operations

Interfaces for operations that produce key-value pairs (`scala.Tuple2`) and therefore create `JavaPairRDD`s, for example through `mapToPair` and `flatMapToPair`.

```java { .api }
/**
 * Function that returns key-value pairs for constructing PairRDDs
 */
@FunctionalInterface
public interface PairFunction<T, K, V> extends Serializable {
    /**
     * Extract key-value pair from input element
     * @param t Input element
     * @return Tuple2 containing key and value
     * @throws Exception If extraction fails
     */
    Tuple2<K, V> call(T t) throws Exception;
}

/**
 * Flat map function that produces key-value pairs
 */
@FunctionalInterface
public interface PairFlatMapFunction<T, K, V> extends Serializable {
    /**
     * Transform single input into zero or more key-value pairs
     * @param t Input element
     * @return Iterator over key-value tuples
     * @throws Exception If transformation fails
     */
    Iterator<Tuple2<K, V>> call(T t) throws Exception;
}

/**
 * Flat map function with two inputs that returns zero or more outputs
 * (used by JavaRDD.zipPartitions, where both inputs are partition iterators)
 */
@FunctionalInterface
public interface FlatMapFunction2<T1, T2, R> extends Serializable {
    /**
     * Transform two inputs into zero or more outputs
     * @param t1 First input
     * @param t2 Second input
     * @return Iterator over output values
     * @throws Exception If transformation fails
     */
    Iterator<R> call(T1 t1, T2 t2) throws Exception;
}
```

**Usage Examples:**

```java
import org.apache.spark.api.java.function.*;
import java.util.ArrayList;
import java.util.List;
import scala.Tuple2;

// Creating key-value pairs (Employee is a hypothetical application class)
PairFunction<String, String, Integer> wordCount =
    word -> new Tuple2<>(word, 1);

PairFunction<Employee, String, Employee> indexByName =
    emp -> new Tuple2<>(emp.getName(), emp);

// Flat map to pairs
PairFlatMapFunction<String, String, Integer> wordFrequency = line -> {
    List<Tuple2<String, Integer>> pairs = new ArrayList<>();
    for (String word : line.split(" ")) {
        pairs.add(new Tuple2<>(word, 1));
    }
    return pairs.iterator();
};

// Two-input flat map: emits str repeated count times
FlatMapFunction2<String, Integer, String> replicate = (str, count) -> {
    List<String> result = new ArrayList<>();
    for (int i = 0; i < count; i++) {
        result.add(str);
    }
    return result.iterator();
};
```
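
To show how a `PairFunction` and a `Function2` combiner fit together, the sketch below emulates `mapToPair(...)` followed by `reduceByKey(...)` on an in-memory list. It is a single-threaded approximation for illustration only: `PairFunction` and `Function2` are redeclared locally with Spark's shapes, a minimal `Tuple2` class stands in for `scala.Tuple2`, and the `reduceByKey` helper here is hypothetical, not Spark's implementation.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Minimal stand-in for scala.Tuple2, with the same _1()/_2() accessors.
final class Tuple2<K, V> implements Serializable {
    private final K first;
    private final V second;

    Tuple2(K first, V second) {
        this.first = first;
        this.second = second;
    }

    K _1() { return first; }
    V _2() { return second; }
}

// Local stand-ins with the same shapes as Spark's PairFunction and Function2.
@FunctionalInterface
interface PairFunction<T, K, V> extends Serializable {
    Tuple2<K, V> call(T t) throws Exception;
}

@FunctionalInterface
interface Function2<T1, T2, R> extends Serializable {
    R call(T1 v1, T2 v2) throws Exception;
}

class LocalWordCount {
    // Hypothetical helper: maps each element to a pair, then merges values
    // that share a key with the combiner.
    static <T, K, V> Map<K, V> reduceByKey(List<T> input,
                                           PairFunction<T, K, V> toPair,
                                           Function2<V, V, V> combine) throws Exception {
        Map<K, V> result = new LinkedHashMap<>();
        for (T element : input) {
            Tuple2<K, V> pair = toPair.call(element);
            V existing = result.get(pair._1());
            result.put(pair._1(), existing == null ? pair._2() : combine.call(existing, pair._2()));
        }
        return result;
    }

    static Map<String, Integer> countWords(String text) {
        try {
            List<String> words = Arrays.asList(text.split(" "));
            return LocalWordCount.<String, String, Integer>reduceByKey(
                words, word -> new Tuple2<>(word, 1), (a, b) -> a + b);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(countWords("to be or not to be")); // prints {to=2, be=2, or=1, not=1}
    }
}
```

Spark's real `reduceByKey` also merges values within each partition before the shuffle, which is why its combiner is expected to be associative and commutative.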

### Aggregate and Reduce Operations

Interfaces for aggregation, reduction, and grouping operations. `ReduceFunction` is used by `Dataset.reduce` and `KeyValueGroupedDataset.reduceGroups`; `MapGroupsFunction`, `FlatMapGroupsFunction`, and `CoGroupFunction` are used by `KeyValueGroupedDataset`, the result of `Dataset.groupByKey`.

```java { .api }
/**
 * Function for reduce operations
 */
@FunctionalInterface
public interface ReduceFunction<T> extends Serializable {
    /**
     * Combine two values into a single value
     * @param v1 First value
     * @param v2 Second value
     * @return Combined result
     * @throws Exception If combination fails
     */
    T call(T v1, T v2) throws Exception;
}

/**
 * Function for cogroup operations: receives one grouping key together with
 * that key's values from two grouped Datasets
 */
@FunctionalInterface
public interface CoGroupFunction<K, V1, V2, R> extends Serializable {
    /**
     * Process the values from both Datasets for one key
     * @param key The grouping key
     * @param left Values from the first Dataset for this key
     * @param right Values from the second Dataset for this key
     * @return Iterator over output values
     * @throws Exception If processing fails
     */
    Iterator<R> call(K key, Iterator<V1> left, Iterator<V2> right) throws Exception;
}

/**
 * Function for map groups operations on grouped data
 */
@FunctionalInterface
public interface MapGroupsFunction<K, V, R> extends Serializable {
    /**
     * Process all values for a given key
     * @param key The grouping key
     * @param values Iterator over all values for this key
     * @return Single result for this group
     * @throws Exception If processing fails
     */
    R call(K key, Iterator<V> values) throws Exception;
}

/**
 * Function for flat map groups operations on grouped data
 */
@FunctionalInterface
public interface FlatMapGroupsFunction<K, V, R> extends Serializable {
    /**
     * Process all values for a given key, producing zero or more outputs
     * @param key The grouping key
     * @param values Iterator over all values for this key
     * @return Iterator over results for this group
     * @throws Exception If processing fails
     */
    Iterator<R> call(K key, Iterator<V> values) throws Exception;
}
```
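
To make the per-key contract of `MapGroupsFunction` concrete, the sketch below emulates on an in-memory list what a grouped `mapGroups` call does conceptually: group the rows by key, then invoke the function exactly once per key with an iterator over that key's values. `MapGroupsFunction` is a local stand-in with Spark's shape, and `LocalMapGroups.mapGroups` is a hypothetical helper, not Spark's implementation (which groups via a shuffle across executors).

```java
import java.io.Serializable;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Local stand-in with the same shape as Spark's MapGroupsFunction.
@FunctionalInterface
interface MapGroupsFunction<K, V, R> extends Serializable {
    R call(K key, Iterator<V> values) throws Exception;
}

class LocalMapGroups {
    // Hypothetical helper: groups (key, value) rows, then calls f once per key.
    static <K, V, R> List<R> mapGroups(List<Map.Entry<K, V>> rows,
                                       MapGroupsFunction<K, V, R> f) throws Exception {
        Map<K, List<V>> groups = new LinkedHashMap<>();
        for (Map.Entry<K, V> row : rows) {
            groups.computeIfAbsent(row.getKey(), k -> new ArrayList<>()).add(row.getValue());
        }
        List<R> results = new ArrayList<>();
        for (Map.Entry<K, List<V>> group : groups.entrySet()) {
            results.add(f.call(group.getKey(), group.getValue().iterator()));
        }
        return results;
    }

    // Same summing logic as sumByKey, formatted as "key=sum".
    static List<String> sumsByKey(List<Map.Entry<String, Integer>> rows) {
        try {
            return LocalMapGroups.<String, Integer, String>mapGroups(rows, (key, values) -> {
                int total = 0;
                while (values.hasNext()) {
                    total += values.next();
                }
                return key + "=" + total;
            });
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> rows = Arrays.asList(
            new AbstractMap.SimpleEntry<String, Integer>("a", 1),
            new AbstractMap.SimpleEntry<String, Integer>("b", 5),
            new AbstractMap.SimpleEntry<String, Integer>("a", 2));
        System.out.println(sumsByKey(rows)); // prints [a=3, b=5]
    }
}
```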

**Usage Examples:**

```java
import org.apache.spark.api.java.function.*;
import java.util.ArrayList;
import java.util.List;

// Reduce operations: partial results are merged in no fixed order, so the
// function should be associative and commutative
ReduceFunction<Integer> sum = (a, b) -> a + b;
// Associative but not commutative: the order of the result may vary between runs
ReduceFunction<String> concatenate = (s1, s2) -> s1 + s2;

// Group processing
MapGroupsFunction<String, Integer, Integer> sumByKey = (key, values) -> {
    int total = 0;
    while (values.hasNext()) {
        total += values.next();
    }
    return total;
};

// Person is a hypothetical application class
FlatMapGroupsFunction<String, Person, String> extractEmails = (dept, people) -> {
    List<String> emails = new ArrayList<>();
    while (people.hasNext()) {
        Person person = people.next();
        if (person.getEmail() != null) {
            emails.add(person.getEmail());
        }
    }
    return emails.iterator();
};

// Cogroup operations: pair every number with every string for the same key.
// Each Iterator can be traversed only once, so one side is buffered first.
CoGroupFunction<String, Integer, String, String> joinData = (key, numbers, strings) -> {
    List<String> bufferedStrings = new ArrayList<>();
    strings.forEachRemaining(bufferedStrings::add);
    List<String> results = new ArrayList<>();
    while (numbers.hasNext()) {
        Integer num = numbers.next();
        for (String str : bufferedStrings) {
            results.add(str + ":" + num);
        }
    }
    return results.iterator();
};
```
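
Why the associativity requirement matters can be seen by comparing a sequential fold with a two-level reduction that first reduces each partition and then merges the partial results. The sketch below assumes a hypothetical split of four values into two partitions and uses a local stand-in for `ReduceFunction`; it is an illustration of the contract, not Spark's scheduler. Addition gives the same answer both ways, subtraction does not.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

// Local stand-in with the same shape as Spark's ReduceFunction.
@FunctionalInterface
interface ReduceFunction<T> extends Serializable {
    T call(T v1, T v2) throws Exception;
}

class ReduceContract {
    // Sequential left fold over a non-empty list.
    static <T> T fold(List<T> values, ReduceFunction<T> f) throws Exception {
        T acc = values.get(0);
        for (int i = 1; i < values.size(); i++) {
            acc = f.call(acc, values.get(i));
        }
        return acc;
    }

    // Reduces two hypothetical "partitions" separately, then merges the
    // partial results, the way a distributed reduce combines them.
    static <T> T foldInTwoPartitions(List<T> values, int split, ReduceFunction<T> f) throws Exception {
        T left = fold(values.subList(0, split), f);
        T right = fold(values.subList(split, values.size()), f);
        return f.call(left, right);
    }

    // Returns {sequential result, two-partition result} for fixed sample data.
    static int[] compare(ReduceFunction<Integer> f) {
        List<Integer> data = Arrays.asList(10, 3, 2, 1);
        try {
            return new int[] {fold(data, f), foldInTwoPartitions(data, 2, f)};
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(compare((a, b) -> a + b))); // prints [16, 16]
        System.out.println(Arrays.toString(compare((a, b) -> a - b))); // prints [4, 6]
    }
}
```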

### Action and Side-Effect Interfaces

Interfaces for actions that produce side effects rather than transformations. `VoidFunction` is used by `JavaRDD.foreach`, while `ForeachFunction` and `ForeachPartitionFunction` are used by `Dataset.foreach` and `Dataset.foreachPartition`.

```java { .api }
/**
 * Function with void return type for foreach operations
 */
@FunctionalInterface
public interface VoidFunction<T> extends Serializable {
    /**
     * Process element with side effects (no return value)
     * @param t Input element
     * @throws Exception If processing fails
     */
    void call(T t) throws Exception;
}

/**
 * Function with void return type and two arguments
 */
@FunctionalInterface
public interface VoidFunction2<T1, T2> extends Serializable {
    /**
     * Process two elements with side effects
     * @param v1 First input
     * @param v2 Second input
     * @throws Exception If processing fails
     */
    void call(T1 v1, T2 v2) throws Exception;
}

/**
 * Function for foreach operations on individual elements
 */
@FunctionalInterface
public interface ForeachFunction<T> extends Serializable {
    /**
     * Process single element with side effects
     * @param t Input element
     * @throws Exception If processing fails
     */
    void call(T t) throws Exception;
}

/**
 * Function for foreach operations on partitions
 */
@FunctionalInterface
public interface ForeachPartitionFunction<T> extends Serializable {
    /**
     * Process entire partition with side effects
     * @param t Iterator over partition elements
     * @throws Exception If processing fails
     */
    void call(Iterator<T> t) throws Exception;
}
```

**Usage Examples:**

```java
import org.apache.spark.api.java.function.*;
import java.util.ArrayList;
import java.util.List;

// Element-wise side effects. Use a lambda rather than System.out::println:
// the bound method reference captures the non-serializable System.out stream.
// On a cluster the output goes to executor stdout, not the driver console.
VoidFunction<String> printLine = s -> System.out.println(s);

// database, logger, Employee, and Record are hypothetical; anything a
// lambda captures must itself be serializable
ForeachFunction<Employee> saveEmployee = emp -> database.save(emp);

// Two-argument side effects
VoidFunction2<String, Integer> logWithCount = (msg, count) ->
    logger.info("Message: {} (count: {})", msg, count);

// Partition-level side effects: write in batches of 1000
ForeachPartitionFunction<Record> batchSave = partition -> {
    List<Record> batch = new ArrayList<>();
    while (partition.hasNext()) {
        batch.add(partition.next());
        if (batch.size() >= 1000) {
            database.saveBatch(batch);
            batch.clear();
        }
    }
    if (!batch.isEmpty()) {
        database.saveBatch(batch);
    }
};
```
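
The batching loop in `batchSave` can be factored into a reusable helper. The sketch below assumes a hypothetical `BatchSink` interface standing in for `database.saveBatch`, and a local stand-in for `ForeachPartitionFunction` so it runs without Spark. It hands each full batch to the sink as a fresh list instead of calling `clear()`, which avoids a subtle bug if the sink keeps a reference to the list it was given.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Local stand-in with the same shape as Spark's ForeachPartitionFunction.
@FunctionalInterface
interface ForeachPartitionFunction<T> extends Serializable {
    void call(Iterator<T> t) throws Exception;
}

// Hypothetical sink, e.g. a wrapper around a JDBC batch insert.
@FunctionalInterface
interface BatchSink<T> extends Serializable {
    void save(List<T> batch) throws Exception;
}

class Batching {
    // Builds a partition function that flushes every batchSize elements and
    // once more for any remainder at the end of the partition.
    static <T> ForeachPartitionFunction<T> batched(int batchSize, BatchSink<T> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        return partition -> {
            List<T> batch = new ArrayList<>(batchSize);
            while (partition.hasNext()) {
                batch.add(partition.next());
                if (batch.size() == batchSize) {
                    sink.save(batch);
                    batch = new ArrayList<>(batchSize); // fresh list; the sink may keep the old one
                }
            }
            if (!batch.isEmpty()) {
                sink.save(batch);
            }
        };
    }

    // Runs the function over 0..n-1 and records the size of each flushed batch.
    static List<Integer> batchSizes(int n, int batchSize) {
        List<Integer> sizes = new ArrayList<>();
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            data.add(i);
        }
        try {
            Batching.<Integer>batched(batchSize, batch -> sizes.add(batch.size())).call(data.iterator());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(batchSizes(2500, 1000)); // prints [1000, 1000, 500]
    }
}
```

In a real job the sink is captured by the partition function, so it must be serializable, or it should be created inside the function once per partition.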

### Specialized Type Functions

Functions for creating specialized RDD types such as `JavaDoubleRDD` (via `mapToDouble` and `flatMapToDouble`).

```java { .api }
/**
 * Function that returns double values for creating DoubleRDDs
 */
@FunctionalInterface
public interface DoubleFunction<T> extends Serializable {
    /**
     * Extract double value from input element
     * @param t Input element
     * @return Double value
     * @throws Exception If extraction fails
     */
    double call(T t) throws Exception;
}

/**
 * Flat map function that produces double values
 */
@FunctionalInterface
public interface DoubleFlatMapFunction<T> extends Serializable {
    /**
     * Transform single input into zero or more double values
     * @param t Input element
     * @return Iterator over double values
     * @throws Exception If transformation fails
     */
    Iterator<Double> call(T t) throws Exception;
}
```

**Usage Examples:**

```java
import org.apache.spark.api.java.function.*;
import java.util.ArrayList;
import java.util.List;

// Extract numeric values (Employee is a hypothetical class with a numeric getSalary())
DoubleFunction<String> parseDouble = Double::parseDouble;
DoubleFunction<Employee> getSalary = emp -> emp.getSalary();

// Generate multiple numeric values
DoubleFlatMapFunction<String> parseNumbers = line -> {
    List<Double> numbers = new ArrayList<>();
    for (String token : line.split(",")) {
        try {
            numbers.add(Double.parseDouble(token.trim()));
        } catch (NumberFormatException e) {
            // Skip tokens that are not valid numbers
        }
    }
    return numbers.iterator();
};
```
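
In Spark, `flatMapToDouble` with a `DoubleFlatMapFunction` yields a `JavaDoubleRDD`, which adds numeric helpers such as `sum()`, `mean()`, and `stats()`. The sketch below approximates that idea locally: it applies the `parseNumbers` logic to each line and summarizes the values with `java.util.DoubleSummaryStatistics`. The interface is a local stand-in with Spark's shape, and the `summarize` helper is hypothetical.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.DoubleSummaryStatistics;
import java.util.Iterator;
import java.util.List;

// Local stand-in with the same shape as Spark's DoubleFlatMapFunction.
@FunctionalInterface
interface DoubleFlatMapFunction<T> extends Serializable {
    Iterator<Double> call(T t) throws Exception;
}

class LocalDoubleStats {
    // Same logic as parseNumbers: comma-separated tokens, invalid ones skipped.
    static final DoubleFlatMapFunction<String> parseNumbers = line -> {
        List<Double> numbers = new ArrayList<>();
        for (String token : line.split(",")) {
            try {
                numbers.add(Double.parseDouble(token.trim()));
            } catch (NumberFormatException e) {
                // Skip tokens that are not valid numbers
            }
        }
        return numbers.iterator();
    };

    // Hypothetical helper: flat-maps every line to doubles and accumulates
    // count, sum, min, max, and average over all of them.
    static DoubleSummaryStatistics summarize(List<String> lines) {
        DoubleSummaryStatistics stats = new DoubleSummaryStatistics();
        try {
            for (String line : lines) {
                parseNumbers.call(line).forEachRemaining(stats::accept);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return stats;
    }

    public static void main(String[] args) {
        System.out.println(summarize(Arrays.asList("1, 2, x", "3")).getAverage()); // prints 2.0
    }
}
```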

## Function Composition and Patterns

### Lambda Expression Patterns

```java
import org.apache.spark.api.java.function.*;

// Simple transformations
Function<String, String> trim = String::trim;
Function<String, Integer> length = String::length;

// Chained method calls inside a lambda
Function<String, String> process = s -> s.trim().toLowerCase();

// Conditional logic
FilterFunction<Integer> isEven = x -> x % 2 == 0;
Function<Integer, String> classify = x -> x > 0 ? "positive" : "non-positive";

// Complex business logic (Order and OrderSummary are hypothetical application classes)
MapFunction<Order, OrderSummary> summarizeOrder = order -> {
    double total = order.getItems().stream()
        .mapToDouble(item -> item.getPrice() * item.getQuantity())
        .sum();
    return new OrderSummary(order.getId(), total, order.getCustomerId());
};
```
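
These interfaces declare only `call`, so composing two functions is done by hand. A minimal sketch, assuming a hypothetical `Compose.andThen` helper and a local stand-in for `Function` with Spark's shape: the returned lambda captures only the two input functions, which are themselves `Serializable`, so the composite can still be shipped to executors whenever both parts can.

```java
import java.io.Serializable;
import java.util.Locale;

// Local stand-in with the same shape as Spark's Function.
@FunctionalInterface
interface Function<T1, R> extends Serializable {
    R call(T1 v1) throws Exception;
}

class Compose {
    // Hypothetical helper: returns a function computing second(first(a)).
    static <A, B, C> Function<A, C> andThen(Function<A, B> first, Function<B, C> second) {
        return a -> second.call(first.call(a));
    }

    static final Function<String, String> trim = String::trim;
    static final Function<String, String> lower = s -> s.toLowerCase(Locale.ROOT);
    static final Function<String, Integer> length = String::length;

    // trim, then lower-case, then take the length
    static final Function<String, Integer> normalizedLength =
        andThen(andThen(trim, lower), length);

    // Calls f without forcing the caller to handle the checked exception.
    static <A, B> B applyUnchecked(Function<A, B> f, A a) {
        try {
            return f.call(a);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(applyUnchecked(normalizedLength, "  Spark  ")); // prints 5
    }
}
```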

### Error Handling in Functions

```java
import org.apache.spark.api.java.function.*;

// Exception handling within functions
Function<String, Integer> safeParseInt = s -> {
    try {
        return Integer.parseInt(s);
    } catch (NumberFormatException e) {
        return 0; // Default value; indistinguishable from a genuine "0"
    }
};

// Filtering with exception handling
FilterFunction<String> isValidNumber = s -> {
    try {
        Double.parseDouble(s);
        return true;
    } catch (NumberFormatException e) {
        return false;
    }
};
```
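
`safeParseInt` maps unparsable input to `0`, so bad records cannot be told apart from genuine zeros. An alternative is to emit zero or one output per input from a `FlatMapFunction`, so invalid records simply drop out of the result (in Spark the function would be passed to `JavaRDD.flatMap` or `Dataset.flatMap`). This sketch redeclares `FlatMapFunction` locally with Spark's shape; `SkipInvalid`, `parseOrSkip`, and `parseAll` are hypothetical names.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Local stand-in with the same shape as Spark's FlatMapFunction.
@FunctionalInterface
interface FlatMapFunction<T, R> extends Serializable {
    Iterator<R> call(T t) throws Exception;
}

class SkipInvalid {
    // Emits exactly one Integer for valid input and nothing otherwise.
    static final FlatMapFunction<String, Integer> parseOrSkip = s -> {
        try {
            return Collections.singletonList(Integer.parseInt(s.trim())).iterator();
        } catch (NumberFormatException e) {
            return Collections.emptyIterator();
        }
    };

    // Applies parseOrSkip to every element and concatenates the outputs,
    // which is what a flatMap does.
    static List<Integer> parseAll(List<String> inputs) {
        List<Integer> out = new ArrayList<>();
        try {
            for (String s : inputs) {
                parseOrSkip.call(s).forEachRemaining(out::add);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(parseAll(Arrays.asList("1", "oops", "0", " 42 "))); // prints [1, 0, 42]
    }
}
```

Dropping records silently can hide data problems; counting them with a Spark accumulator is one option if the number of rejects matters.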