Core utility classes and functions for Apache Spark including exception handling, logging, storage configuration, and Java API integration
---
Comprehensive functional interfaces for Spark's Java API, enabling type-safe lambda expressions and functional programming patterns for Java developers using Spark operations.
Base functional interfaces for common transformation and action operations.
/**
 * Base single-argument function interface, analogous to {@code java.util.function.Function}
 * but extending {@code Serializable} (so instances can be serialized) and permitted
 * to throw checked exceptions.
 * @param <T1> the input type
 * @param <R> the return type
 */
@FunctionalInterface
public interface Function<T1, R> extends Serializable {
/**
 * Applies this function to the given input value.
 * @param v1 the input value
 * @return the transformed result
 * @throws Exception if any error occurs during function execution
 */
R call(T1 v1) throws Exception;
}
/**
 * No-argument function interface (a serializable supplier).
 * @param <R> the return type
 */
@FunctionalInterface
public interface Function0<R> extends Serializable {
/**
 * Invokes this function with no arguments.
 * @return the function result
 * @throws Exception if any error occurs during function execution
 */
R call() throws Exception;
}
/**
 * Two-argument function interface.
 * @param <T1> the first input type
 * @param <T2> the second input type
 * @param <R> the return type
 */
@FunctionalInterface
public interface Function2<T1, T2, R> extends Serializable {
/**
 * Applies this function to two input values.
 * @param v1 the first input value
 * @param v2 the second input value
 * @return the transformed result
 * @throws Exception if any error occurs during function execution
 */
R call(T1 v1, T2 v2) throws Exception;
}
/**
 * Three-argument function interface.
 * @param <T1> the first input type
 * @param <T2> the second input type
 * @param <T3> the third input type
 * @param <R> the return type
 */
@FunctionalInterface
public interface Function3<T1, T2, T3, R> extends Serializable {
/**
 * Applies this function to three input values.
 * @param v1 the first input value
 * @param v2 the second input value
 * @param v3 the third input value
 * @return the transformed result
 * @throws Exception if any error occurs during function execution
 */
R call(T1 v1, T2 v2, T3 v3) throws Exception;
}
/**
 * Four-argument function interface.
 * @param <T1> the first input type
 * @param <T2> the second input type
 * @param <T3> the third input type
 * @param <T4> the fourth input type
 * @param <R> the return type
 */
@FunctionalInterface
public interface Function4<T1, T2, T3, T4, R> extends Serializable {
/**
 * Applies this function to four input values.
 * @param v1 the first input value
 * @param v2 the second input value
 * @param v3 the third input value
 * @param v4 the fourth input value
 * @return the transformed result
 * @throws Exception if any error occurs during function execution
 */
R call(T1 v1, T2 v2, T3 v3, T4 v4) throws Exception;
}
Advanced functional interfaces for specific Spark operations like pair RDD creation, flatMap operations, and void functions.
/**
 * Function that returns key-value pairs, used to create PairRDDs.
 * @param <T> the input type
 * @param <K> the key type
 * @param <V> the value type
 */
@FunctionalInterface
public interface PairFunction<T, K, V> extends Serializable {
/**
 * Returns a key-value pair derived from the input.
 * @param t the input value
 * @return a Tuple2 containing the key and value
 * @throws Exception if any error occurs during function execution
 */
Tuple2<K, V> call(T t) throws Exception;
}
/**
 * Function that returns zero or more output records from each input record
 * (the one-to-many analogue of {@code Function}).
 * @param <T> the input type
 * @param <R> the output type
 */
@FunctionalInterface
public interface FlatMapFunction<T, R> extends Serializable {
/**
 * Returns an iterator over the output records for one input.
 * @param t the input value
 * @return an iterator over output values
 * @throws Exception if any error occurs during function execution
 */
Iterator<R> call(T t) throws Exception;
}
/**
 * Function with no return value, for actions executed purely for side effects.
 * @param <T> the input type
 */
@FunctionalInterface
public interface VoidFunction<T> extends Serializable {
/**
 * Executes an action on the input value.
 * @param t the input value
 * @throws Exception if any error occurs during function execution
 */
void call(T t) throws Exception;
}
/**
 * Two-argument void function for actions.
 * @param <T1> the first input type
 * @param <T2> the second input type
 */
@FunctionalInterface
public interface VoidFunction2<T1, T2> extends Serializable {
/**
 * Executes an action on two input values.
 * @param v1 the first input value
 * @param v2 the second input value
 * @throws Exception if any error occurs during function execution
 */
void call(T1 v1, T2 v2) throws Exception;
}
/**
 * Predicate-style function for filtering operations.
 * @param <T> the input type
 */
@FunctionalInterface
public interface FilterFunction<T> extends Serializable {
/**
 * Tests whether the input should be included.
 * @param value the input value to test
 * @return true if the value should be included
 * @throws Exception if any error occurs during function execution
 */
boolean call(T value) throws Exception;
}
/**
 * Binary operator for reducing operations; input and output share one type.
 * @param <T> the input and output type
 */
@FunctionalInterface
public interface ReduceFunction<T> extends Serializable {
/**
 * Combines two values into one.
 * @param v1 the first value
 * @param v2 the second value
 * @return the combined result
 * @throws Exception if any error occurs during function execution
 */
T call(T v1, T v2) throws Exception;
}
/**
 * Function for side-effecting operations applied once per partition.
 * @param <T> the input type
 */
@FunctionalInterface
public interface ForeachPartitionFunction<T> extends Serializable {
/**
 * Executes an action over a partition's iterator of elements.
 * @param t an iterator over the partition's elements
 * @throws Exception if any error occurs during function execution
 */
void call(Iterator<T> t) throws Exception;
}
Usage Examples:
import org.apache.spark.api.java.function.*;
import org.apache.spark.api.java.JavaRDD;
// NOTE: snippets are illustrative fragments; stringRDD and numberRDD are
// assumed to be pre-existing RDDs in scope.
// Single-argument function for map operations
Function<String, Integer> stringLength = s -> s.length();
JavaRDD<Integer> lengths = stringRDD.map(stringLength);
// Two-argument function for reduce operations
Function2<Integer, Integer, Integer> sum = (a, b) -> a + b;
int total = numberRDD.reduce(sum);
// No-argument function for suppliers
Function0<String> currentTime = () -> java.time.Instant.now().toString();
// Multi-argument functions for complex operations
Function3<String, Integer, Boolean, String> formatter =
(str, num, flag) -> flag ? str.toUpperCase() + num : str + num;
Functions that perform side effects without returning values, commonly used for actions like foreach.
/**
 * Function with no return value (void function), used for actions executed
 * purely for their side effects.
 * @param <T> the input type
 */
@FunctionalInterface
public interface VoidFunction<T> extends Serializable {
/**
 * Applies this function to the input value; no result is returned.
 * @param t the input value
 * @throws Exception if any error occurs during function execution
 */
void call(T t) throws Exception;
}
/**
 * Two-argument function with no return value.
 * @param <T1> the first input type
 * @param <T2> the second input type
 */
@FunctionalInterface
public interface VoidFunction2<T1, T2> extends Serializable {
/**
 * Applies this function to two input values; no result is returned.
 * @param t1 the first input value
 * @param t2 the second input value
 * @throws Exception if any error occurs during function execution
 */
void call(T1 t1, T2 t2) throws Exception;
}
Usage Examples:
import org.apache.spark.api.java.function.*;
// NOTE: illustrative fragments; stringRDD, numberRDD, counter and logger are
// assumed to exist in the enclosing scope.
// Void function for foreach operations
VoidFunction<String> printString = s -> System.out.println(s);
stringRDD.foreach(printString);
// Void function for side effects
VoidFunction<Integer> incrementCounter = i -> {
counter.addAndGet(i);
logger.info("Processed: " + i);
};
numberRDD.foreach(incrementCounter);
// Two-argument void function
VoidFunction2<String, Integer> logPair = (key, value) -> {
System.out.println("Key: " + key + ", Value: " + value);
};
Functions designed for specific Spark operations like mapping, filtering, and flat mapping.
/**
 * Function for one-to-one mapping transformations.
 * @param <T> the input type
 * @param <R> the output type
 */
@FunctionalInterface
public interface MapFunction<T, R> extends Serializable {
/**
 * Transforms an input value into an output value.
 * @param value the input value
 * @return the transformed value
 * @throws Exception if any error occurs during transformation
 */
R call(T value) throws Exception;
}
/**
 * Predicate-style function for filtering operations.
 * @param <T> the input type
 */
@FunctionalInterface
public interface FilterFunction<T> extends Serializable {
/**
 * Tests whether a value should be included in the result.
 * @param value the input value to test
 * @return true if the value should be included, false otherwise
 * @throws Exception if any error occurs during filtering
 */
boolean call(T value) throws Exception;
}
/**
 * Function for flat-mapping operations (one-to-many).
 * @param <T> the input type
 * @param <R> the output element type
 */
@FunctionalInterface
public interface FlatMapFunction<T, R> extends Serializable {
/**
 * Transforms a single input into an iterator of outputs.
 * @param t the input value
 * @return an iterator of output values
 * @throws Exception if any error occurs during transformation
 */
Iterator<R> call(T t) throws Exception;
}
/**
 * Function for flat-mapping operations taking two arguments.
 * @param <A> the first input type
 * @param <B> the second input type
 * @param <R> the output element type
 */
@FunctionalInterface
public interface FlatMapFunction2<A, B, R> extends Serializable {
/**
 * Transforms two inputs into an iterator of outputs.
 * @param a the first input value
 * @param b the second input value
 * @return an iterator of output values
 * @throws Exception if any error occurs during transformation
 */
Iterator<R> call(A a, B b) throws Exception;
}
Usage Examples:
import org.apache.spark.api.java.function.*;
import java.util.*;
// NOTE: illustrative fragments; personDataset, numberDataset and linesRDD are
// assumed to exist, and Person is an application-defined type.
// Map function for Dataset operations
MapFunction<Person, String> getName = person -> person.getName();
Dataset<String> names = personDataset.map(getName, Encoders.STRING());
// Filter function
FilterFunction<Integer> isEven = i -> i % 2 == 0;
Dataset<Integer> evenNumbers = numberDataset.filter(isEven);
// Flat map function - split strings into words
FlatMapFunction<String, String> splitWords = line -> {
return Arrays.asList(line.split(" ")).iterator();
};
JavaRDD<String> words = linesRDD.flatMap(splitWords);
// Two-argument flat map
FlatMapFunction2<String, String, String> combineAndSplit = (s1, s2) -> {
String combined = s1 + " " + s2;
return Arrays.asList(combined.split(" ")).iterator();
};
Functions that work with key-value pairs, essential for operations like groupByKey and reduceByKey.
/**
 * Function that produces a key-value pair ({@code Tuple2}) from each input.
 * @param <T> the input type
 * @param <K> the key type
 * @param <V> the value type
 */
@FunctionalInterface
public interface PairFunction<T, K, V> extends Serializable {
/**
 * Transforms an input into a key-value pair.
 * @param t the input value
 * @return a Tuple2 containing the key and value
 * @throws Exception if any error occurs during transformation
 */
Tuple2<K, V> call(T t) throws Exception;
}
/**
 * Flat-map function that produces zero or more key-value pairs per input.
 * @param <T> the input type
 * @param <K> the key type
 * @param <V> the value type
 */
@FunctionalInterface
public interface PairFlatMapFunction<T, K, V> extends Serializable {
/**
 * Transforms an input into an iterator of key-value pairs.
 * @param t the input value
 * @return an iterator of Tuple2 instances containing keys and values
 * @throws Exception if any error occurs during transformation
 */
Iterator<Tuple2<K, V>> call(T t) throws Exception;
}
Usage Examples:
import org.apache.spark.api.java.function.*;
import scala.Tuple2;
// NOTE: illustrative fragments; wordsRDD and linesRDD are assumed to exist.
// Pair function for creating key-value pairs
PairFunction<String, String, Integer> wordToPair =
word -> new Tuple2<>(word, 1);
JavaPairRDD<String, Integer> wordCounts = wordsRDD.mapToPair(wordToPair);
// Pair flat map function
PairFlatMapFunction<String, String, Integer> lineToPairs = line -> {
List<Tuple2<String, Integer>> pairs = new ArrayList<>();
for (String word : line.split(" ")) {
pairs.add(new Tuple2<>(word, 1));
}
return pairs.iterator();
};
JavaPairRDD<String, Integer> wordPairs = linesRDD.flatMapToPair(lineToPairs);
Functions for aggregation and reduction operations.
/**
 * Binary operator for reduction operations; operands and result share one type.
 * @param <T> the type being reduced
 */
@FunctionalInterface
public interface ReduceFunction<T> extends Serializable {
/**
 * Combines two values into one.
 * @param v1 the first value
 * @param v2 the second value
 * @return the combined result
 * @throws Exception if any error occurs during reduction
 */
T call(T v1, T v2) throws Exception;
}
Usage Examples:
import org.apache.spark.api.java.function.*;
// NOTE: illustrative fragments; numberRDD, doubleRDD and stringRDD are
// assumed to exist.
// Reduction function for summing integers
ReduceFunction<Integer> sum = (a, b) -> a + b;
int total = numberRDD.reduce(sum);
// Reduction function for finding maximum
ReduceFunction<Double> max = (a, b) -> Math.max(a, b);
double maximum = doubleRDD.reduce(max);
// Reduction function for string concatenation
ReduceFunction<String> concat = (s1, s2) -> s1 + " " + s2;
String combined = stringRDD.reduce(concat);
Functions specifically designed for numeric operations.
/**
 * Function that maps each input to a primitive {@code double}.
 * @param <T> the input type
 */
@FunctionalInterface
public interface DoubleFunction<T> extends Serializable {
/**
 * Transforms an input into a double value.
 * @param t the input value
 * @return the double result
 * @throws Exception if any error occurs during transformation
 */
double call(T t) throws Exception;
}
/**
 * Flat-map function that maps each input to zero or more double values.
 * @param <T> the input type
 */
@FunctionalInterface
public interface DoubleFlatMapFunction<T> extends Serializable {
/**
 * Transforms an input into an iterator of double values.
 * @param t the input value
 * @return an iterator of double values
 * @throws Exception if any error occurs during transformation
 */
Iterator<Double> call(T t) throws Exception;
}
Usage Examples:
import org.apache.spark.api.java.function.*;
// NOTE: illustrative fragment; stringRDD is assumed to exist.
// Double function for numeric extraction
DoubleFunction<String> parseDouble = s -> Double.parseDouble(s);
JavaDoubleRDD doubleRDD = stringRDD.mapToDouble(parseDouble);
// Double flat map function
DoubleFlatMapFunction<String> extractNumbers = line -> {
List<Double> numbers = new ArrayList<>();
for (String token : line.split(" ")) {
try {
numbers.add(Double.parseDouble(token));
} catch (NumberFormatException e) {
// Skip non-numeric tokens
}
}
return numbers.iterator();
};
Functions for advanced grouping and co-grouping operations.
/**
 * Function for co-grouping operations over two groups of values.
 * NOTE(review): Spark's actual {@code CoGroupFunction.call} also receives the
 * grouping key as its first argument — verify this signature against the real
 * API before relying on it.
 * @param <V1> the first value type
 * @param <V2> the second value type
 * @param <R> the result type
 */
@FunctionalInterface
public interface CoGroupFunction<V1, V2, R> extends Serializable {
/**
 * Processes two co-grouped sets of values.
 * @param v1 an iterator over the first group's values
 * @param v2 an iterator over the second group's values
 * @return the processing result
 * @throws Exception if any error occurs during processing
 */
R call(Iterator<V1> v1, Iterator<V2> v2) throws Exception;
}
/**
 * Function for mapping each group of values to a single result.
 * @param <K> the key type
 * @param <V> the value type
 * @param <R> the result type
 */
@FunctionalInterface
public interface MapGroupsFunction<K, V, R> extends Serializable {
/**
 * Processes all values grouped under one key.
 * @param key the group key
 * @param values an iterator over the values for the key
 * @return the processing result
 * @throws Exception if any error occurs during processing
 */
R call(K key, Iterator<V> values) throws Exception;
}
/**
 * Function for flat-mapping each group of values to zero or more results.
 * @param <K> the key type
 * @param <V> the value type
 * @param <R> the result element type
 */
@FunctionalInterface
public interface FlatMapGroupsFunction<K, V, R> extends Serializable {
/**
 * Processes grouped values and returns an iterator of results.
 * @param key the group key
 * @param values an iterator over the values for the key
 * @return an iterator of processing results
 * @throws Exception if any error occurs during processing
 */
Iterator<R> call(K key, Iterator<V> values) throws Exception;
}
Usage Examples:
import org.apache.spark.api.java.function.*;
// NOTE: illustrative fragments; Person is an application-defined type.
// Map groups function for aggregation
MapGroupsFunction<String, Integer, Double> computeAverage = (key, values) -> {
int sum = 0;
int count = 0;
while (values.hasNext()) {
sum += values.next();
count++;
}
return count > 0 ? (double) sum / count : 0.0;
};
// Flat map groups function for expansion
FlatMapGroupsFunction<String, Person, String> extractEmails = (department, people) -> {
List<String> emails = new ArrayList<>();
while (people.hasNext()) {
Person person = people.next();
if (person.getEmail() != null) {
emails.add(person.getEmail());
}
}
return emails.iterator();
};
Functions for partition-wise operations and actions.
/**
 * Function that transforms an entire partition in one call, useful for
 * amortizing per-partition setup cost across all elements.
 * @param <T> the input element type
 * @param <R> the output element type
 */
@FunctionalInterface
public interface MapPartitionsFunction<T, R> extends Serializable {
/**
 * Processes an entire partition.
 * @param input an iterator over the partition's elements
 * @return an iterator over the results
 * @throws Exception if any error occurs during processing
 */
Iterator<R> call(Iterator<T> input) throws Exception;
}
/**
 * Function for per-element foreach actions (side effects only).
 * @param <T> the input type
 */
@FunctionalInterface
public interface ForeachFunction<T> extends Serializable {
/**
 * Processes a single element for its side effect.
 * @param t the input element
 * @throws Exception if any error occurs during processing
 */
void call(T t) throws Exception;
}
/**
 * Function for per-partition foreach actions (side effects only).
 * @param <T> the input element type
 */
@FunctionalInterface
public interface ForeachPartitionFunction<T> extends Serializable {
/**
 * Processes an entire partition for its side effects.
 * @param t an iterator over the partition's elements
 * @throws Exception if any error occurs during processing
 */
void call(Iterator<T> t) throws Exception;
}
Usage Examples:
import org.apache.spark.api.java.function.*;
// NOTE: illustrative fragments; BatchProcessor, fileWriter, Record,
// DatabaseConnection and getConnection() are assumed application-defined.
// Map partitions function for batch processing
// NOTE(review): processor.close() should be in a finally block (or the
// processor made AutoCloseable with try-with-resources) so a failing
// process() call does not leak the processor.
MapPartitionsFunction<String, String> processPartition = partition -> {
List<String> results = new ArrayList<>();
BatchProcessor processor = new BatchProcessor();
while (partition.hasNext()) {
results.add(processor.process(partition.next()));
}
processor.close();
return results.iterator();
};
// Foreach function for side effects
// NOTE(review): capturing fileWriter requires it to be serializable — this is
// usually a per-partition resource instead; verify against the caller.
ForeachFunction<String> writeToFile = line -> {
fileWriter.write(line + "\n");
};
// Foreach partition function for batch side effects
// NOTE(review): conn and stmt should be managed with try-with-resources;
// stmt is never closed and conn leaks if executeBatch() throws.
ForeachPartitionFunction<Record> batchInsert = records -> {
DatabaseConnection conn = getConnection();
PreparedStatement stmt = conn.prepareStatement("INSERT INTO table VALUES (?)");
while (records.hasNext()) {
stmt.setString(1, records.next().getValue());
stmt.addBatch();
}
stmt.executeBatch();
conn.close();
};
// Core function interfaces
// Condensed, package-private recap of the interfaces documented above.
@FunctionalInterface
interface Function<T1, R> extends Serializable {
R call(T1 v1) throws Exception;
}
@FunctionalInterface
interface Function2<T1, T2, R> extends Serializable {
R call(T1 v1, T2 v2) throws Exception;
}
@FunctionalInterface
interface VoidFunction<T> extends Serializable {
void call(T t) throws Exception;
}
// Specialized transformation interfaces
@FunctionalInterface
interface MapFunction<T, R> extends Serializable {
R call(T value) throws Exception;
}
@FunctionalInterface
interface FilterFunction<T> extends Serializable {
boolean call(T value) throws Exception;
}
@FunctionalInterface
interface FlatMapFunction<T, R> extends Serializable {
Iterator<R> call(T t) throws Exception;
}
// Pair operation interfaces
@FunctionalInterface
interface PairFunction<T, K, V> extends Serializable {
Tuple2<K, V> call(T t) throws Exception;
}
// Reduction and aggregation interfaces
@FunctionalInterface
interface ReduceFunction<T> extends Serializable {
T call(T v1, T v2) throws Exception;
}
// Numeric operation interfaces
@FunctionalInterface
interface DoubleFunction<T> extends Serializable {
double call(T t) throws Exception;
}
// Grouping operation interfaces
@FunctionalInterface
interface MapGroupsFunction<K, V, R> extends Serializable {
R call(K key, Iterator<V> values) throws Exception;
}
Install with Tessl CLI
npx tessl i tessl/maven-spark-common-utils