Java-friendly wrappers for Spark's core functionality, providing type-safe distributed operations and straightforward integration with Java applications.
Java-friendly version of SparkContext providing the main entry point for Java Spark applications.
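For example, a minimal sketch of bootstrapping a context, broadcasting a small lookup table, and counting misses with a named accumulator. The app name, table contents, and accumulator name are illustrative only; the accumulator is obtained through the underlying SparkContext via sc(). The full API is listed below.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.util.LongAccumulator;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
SparkConf conf = new SparkConf().setAppName("ContextSketch").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
// Broadcast a small read-only lookup table to every executor
Map<String, Integer> table = new HashMap<>();
table.put("a", 1);
table.put("b", 2);
Broadcast<Map<String, Integer>> lookup = sc.broadcast(table);
// Named accumulator from the underlying SparkContext (shows up under this name in the web UI)
LongAccumulator misses = sc.sc().longAccumulator("lookup-misses");
JavaRDD<Integer> values = sc.parallelize(Arrays.asList("a", "b", "c"))
    .map(k -> {
        Integer v = lookup.value().get(k);
        if (v == null) { misses.add(1); return 0; }
        return v;
    });
values.collect();                    // an action must run before the accumulator is populated
System.out.println(misses.value()); // 1 ("c" is not in the table)
sc.stop();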
/**
* Java-friendly wrapper for SparkContext
*/
public class JavaSparkContext {
/** Create JavaSparkContext from SparkContext */
public JavaSparkContext(SparkContext sc)
/** Create JavaSparkContext from SparkConf */
public JavaSparkContext(SparkConf conf)
/** Create JavaSparkContext with app name and master URL */
public JavaSparkContext(String master, String appName)
/** Create RDD from Java collection */
public <T> JavaRDD<T> parallelize(java.util.List<T> list)
public <T> JavaRDD<T> parallelize(java.util.List<T> list, int numSlices)
/** Create pair RDD from Java collection */
public <K, V> JavaPairRDD<K, V> parallelizePairs(java.util.List<scala.Tuple2<K, V>> list)
public <K, V> JavaPairRDD<K, V> parallelizePairs(java.util.List<scala.Tuple2<K, V>> list, int numSlices)
/** Read text file */
public JavaRDD<String> textFile(String path)
public JavaRDD<String> textFile(String path, int minPartitions)
/** Read whole text files */
public JavaPairRDD<String, String> wholeTextFiles(String path)
public JavaPairRDD<String, String> wholeTextFiles(String path, int minPartitions)
/** Create RDD from Hadoop InputFormat */
public <K, V> JavaPairRDD<K, V> hadoopRDD(
JobConf conf,
Class<? extends InputFormat<K, V>> inputFormatClass,
Class<K> keyClass,
Class<V> valueClass
)
/** Create RDD from new Hadoop InputFormat */
public <K, V> JavaPairRDD<K, V> newAPIHadoopRDD(
Configuration conf,
Class<? extends NewInputFormat<K, V>> fClass,
Class<K> kClass,
Class<V> vClass
)
/** Create broadcast variable */
public <T> Broadcast<T> broadcast(T value)
/** Create accumulator */
public LongAccumulator longAccumulator()
public LongAccumulator longAccumulator(String name)
public DoubleAccumulator doubleAccumulator()
public DoubleAccumulator doubleAccumulator(String name)
public <T> CollectionAccumulator<T> collectionAccumulator()
public <T> CollectionAccumulator<T> collectionAccumulator(String name)
/** Add file to Spark job */
public void addFile(String path)
public void addFile(String path, boolean recursive)
/** Add JAR file */
public void addJar(String path)
/** Set checkpoint directory */
public void setCheckpointDir(String dir)
/** Get underlying SparkContext */
public SparkContext sc()
/** Get status tracker */
public JavaSparkStatusTracker statusTracker()
/** Stop JavaSparkContext */
public void stop()
/** Close JavaSparkContext (same as stop) */
public void close()
}
Java-friendly wrapper for RDD providing type-safe distributed operations.
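For instance, a minimal sketch of the aggregate and takeOrdered operations listed below, assuming a running JavaSparkContext sc and the imports shown in the Usage Examples section at the end:
JavaRDD<Integer> nums = sc.parallelize(Arrays.asList(5, 3, 8, 1, 9, 2));
// aggregate: build a (sum, count) pair within each partition, then merge the partial pairs
Tuple2<Integer, Integer> sumCount = nums.aggregate(
    new Tuple2<>(0, 0),
    (acc, x) -> new Tuple2<>(acc._1() + x, acc._2() + 1),       // seqOp: fold one element in
    (a, b) -> new Tuple2<>(a._1() + b._1(), a._2() + b._2())    // combOp: merge two partials
);
double mean = (double) sumCount._1() / sumCount._2();            // 28 / 6
// takeOrdered: the three smallest elements in natural order
List<Integer> smallest = nums.takeOrdered(3);                    // [1, 2, 3]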
/**
* Java-friendly wrapper for RDD
*/
public class JavaRDD<T> {
// Transformations
/** Transform each element */
public <R> JavaRDD<R> map(org.apache.spark.api.java.function.Function<T, R> f)
/** Transform and flatten */
public <R> JavaRDD<R> flatMap(org.apache.spark.api.java.function.FlatMapFunction<T, R> f)
/** Filter elements */
public JavaRDD<T> filter(org.apache.spark.api.java.function.Function<T, Boolean> f)
/** Map with partition index */
public <R> JavaRDD<R> mapPartitionsWithIndex(
org.apache.spark.api.java.function.Function2<Integer, java.util.Iterator<T>, java.util.Iterator<R>> f,
boolean preservesPartitioning
)
/** Sample elements */
public JavaRDD<T> sample(boolean withReplacement, double fraction)
public JavaRDD<T> sample(boolean withReplacement, double fraction, long seed)
/** Union with another RDD */
public JavaRDD<T> union(JavaRDD<T> other)
/** Intersection with another RDD */
public JavaRDD<T> intersection(JavaRDD<T> other)
/** Get distinct elements */
public JavaRDD<T> distinct()
public JavaRDD<T> distinct(int numPartitions)
/** Group by key function */
public <K> JavaPairRDD<K, Iterable<T>> groupBy(Function<T, K> f)
/** Coalesce partitions */
public JavaRDD<T> coalesce(int numPartitions)
public JavaRDD<T> coalesce(int numPartitions, boolean shuffle)
/** Repartition */
public JavaRDD<T> repartition(int numPartitions)
/** Sort by key function */
public <S> JavaRDD<T> sortBy(Function<T, S> f, boolean ascending, int numPartitions)
/** Zip with another RDD */
public <U> JavaPairRDD<T, U> zip(JavaRDD<U> other)
/** Zip with indices */
public JavaPairRDD<T, Long> zipWithIndex()
/** Zip with unique IDs */
public JavaPairRDD<T, Long> zipWithUniqueId()
/** Map to pair RDD */
public <K, V> JavaPairRDD<K, V> mapToPair(PairFunction<T, K, V> f)
// Actions
/** Collect all elements */
public List<T> collect()
/** Count elements */
public long count()
/** Get first element */
public T first()
/** Take first n elements */
public List<T> take(int num)
/** Take ordered elements */
public List<T> takeOrdered(int num)
public List<T> takeOrdered(int num, Comparator<T> comp)
/** Take random sample */
public List<T> takeSample(boolean withReplacement, int num)
public List<T> takeSample(boolean withReplacement, int num, long seed)
/** Reduce elements */
public T reduce(org.apache.spark.api.java.function.Function2<T, T, T> f)
/** Fold with zero value */
public T fold(T zeroValue, org.apache.spark.api.java.function.Function2<T, T, T> op)
/** Aggregate with different types */
public <U> U aggregate(U zeroValue, org.apache.spark.api.java.function.Function2<U, T, U> seqOp, org.apache.spark.api.java.function.Function2<U, U, U> combOp)
/** Tree reduce */
public T treeReduce(org.apache.spark.api.java.function.Function2<T, T, T> f)
/** Tree aggregate */
public <U> U treeAggregate(
U zeroValue,
org.apache.spark.api.java.function.Function2<U, T, U> seqOp,
org.apache.spark.api.java.function.Function2<U, U, U> combOp,
int depth
)
/** Apply function to each element */
public void foreach(VoidFunction<T> f)
/** Apply function to each partition */
public void foreachPartition(VoidFunction<Iterator<T>> f)
/** Count by value */
public Map<T, Long> countByValue()
/** Save as text file */
public void saveAsTextFile(String path)
public void saveAsTextFile(String path, Class<? extends CompressionCodec> codec)
// Persistence
/** Persist with storage level */
public JavaRDD<T> persist(StorageLevel newLevel)
/** Cache in memory */
public JavaRDD<T> cache()
/** Unpersist */
public JavaRDD<T> unpersist()
public JavaRDD<T> unpersist(boolean blocking)
/** Checkpoint */
public void checkpoint()
/** Check if empty */
public boolean isEmpty()
// Metadata
/** Get partitions */
public List<Partition> partitions()
/** Get storage level */
public StorageLevel getStorageLevel()
/** Convert to Scala RDD */
public RDD<T> rdd()
}
Java-friendly wrapper for pair RDDs providing key-value operations.
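For instance, a minimal sketch of reduceByKey, join, and mapValues, assuming a running JavaSparkContext sc and the imports from the Usage Examples section at the end; the fruit and price data are made up for illustration:
List<Tuple2<String, Integer>> salesData = Arrays.asList(
    new Tuple2<>("apples", 3), new Tuple2<>("pears", 2), new Tuple2<>("apples", 5));
List<Tuple2<String, Double>> priceData = Arrays.asList(
    new Tuple2<>("apples", 0.5), new Tuple2<>("pears", 0.8));
JavaPairRDD<String, Integer> sales = sc.parallelizePairs(salesData);
JavaPairRDD<String, Double> prices = sc.parallelizePairs(priceData);
// Sum quantities per key
JavaPairRDD<String, Integer> totals = sales.reduceByKey((a, b) -> a + b);
// Inner join on key: ("apples", (8, 0.5)), ("pears", (2, 0.8))
JavaPairRDD<String, Tuple2<Integer, Double>> joined = totals.join(prices);
// Quantity * price per key, keeping the key unchanged
JavaPairRDD<String, Double> revenue = joined.mapValues(t -> t._1() * t._2());
List<Tuple2<String, Double>> rows = revenue.collect(); // apples -> 4.0, pears -> 1.6 (order not guaranteed)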
/**
* Java-friendly wrapper for pair RDD
*/
public class JavaPairRDD<K, V> {
// Transformations
/** Map values */
public <W> JavaPairRDD<K, W> mapValues(Function<V, W> f)
/** Flat map values */
public <W> JavaPairRDD<K, W> flatMapValues(Function<V, Iterable<W>> f)
/** Map to different key-value pairs */
public <K2, V2> JavaPairRDD<K2, V2> mapToPair(PairFunction<Tuple2<K, V>, K2, V2> f)
/** Group by key */
public JavaPairRDD<K, Iterable<V>> groupByKey()
public JavaPairRDD<K, Iterable<V>> groupByKey(int numPartitions)
public JavaPairRDD<K, Iterable<V>> groupByKey(Partitioner partitioner)
/** Reduce by key */
public JavaPairRDD<K, V> reduceByKey(org.apache.spark.api.java.function.Function2<V, V, V> func)
public JavaPairRDD<K, V> reduceByKey(org.apache.spark.api.java.function.Function2<V, V, V> func, int numPartitions)
public JavaPairRDD<K, V> reduceByKey(Partitioner partitioner, org.apache.spark.api.java.function.Function2<V, V, V> func)
/** Aggregate by key */
public <U> JavaPairRDD<K, U> aggregateByKey(
U zeroValue,
org.apache.spark.api.java.function.Function2<U, V, U> seqFunc,
org.apache.spark.api.java.function.Function2<U, U, U> combFunc
)
public <U> JavaPairRDD<K, U> aggregateByKey(
U zeroValue,
int numPartitions,
org.apache.spark.api.java.function.Function2<U, V, U> seqFunc,
org.apache.spark.api.java.function.Function2<U, U, U> combFunc
)
/** Fold by key */
public JavaPairRDD<K, V> foldByKey(V zeroValue, org.apache.spark.api.java.function.Function2<V, V, V> func)
public JavaPairRDD<K, V> foldByKey(V zeroValue, int numPartitions, org.apache.spark.api.java.function.Function2<V, V, V> func)
/** Combine by key */
public <C> JavaPairRDD<K, C> combineByKey(
org.apache.spark.api.java.function.Function<V, C> createCombiner,
org.apache.spark.api.java.function.Function2<C, V, C> mergeValue,
org.apache.spark.api.java.function.Function2<C, C, C> mergeCombiners
)
/** Join operations */
public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other)
public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other, int numPartitions)
public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairRDD<K, W> other)
public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairRDD<K, W> other)
public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairRDD<K, W> other)
/** Cogroup operations */
public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>> cogroup(JavaPairRDD<K, W> other)
/** Sort by key */
public JavaPairRDD<K, V> sortByKey()
public JavaPairRDD<K, V> sortByKey(boolean ascending)
public JavaPairRDD<K, V> sortByKey(boolean ascending, int numPartitions)
/** Get keys */
public JavaRDD<K> keys()
/** Get values */
public JavaRDD<V> values()
/** Subtract by key */
public <W> JavaPairRDD<K, V> subtractByKey(JavaPairRDD<K, W> other)
// Actions
/** Lookup values for key */
public List<V> lookup(K key)
/** Collect as map */
public Map<K, V> collectAsMap()
/** Count by key */
public Map<K, Long> countByKey()
/** Count by key approximately */
public PartialResult<Map<K, BoundedDouble>> countByKeyApprox(long timeout)
/** Save as Hadoop file */
public void saveAsHadoopFile(
String path,
Class<?> keyClass,
Class<?> valueClass,
Class<? extends OutputFormat> outputFormatClass
)
/** Save as new API Hadoop file */
public void saveAsNewAPIHadoopFile(
String path,
Class<?> keyClass,
Class<?> valueClass,
Class<? extends NewOutputFormat> outputFormatClass
)
}
Java-friendly wrapper for RDDs of doubles providing statistical operations.
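For instance, a minimal sketch assuming a running JavaSparkContext sc; mapToDouble (a JavaRDD method not shown in the listing above) is one common way to obtain a JavaDoubleRDD, and StatCounter comes from org.apache.spark.util:
JavaDoubleRDD measurements = sc.parallelize(Arrays.asList(1.0, 2.0, 2.0, 3.0, 10.0))
    .mapToDouble(x -> x);
double mean = measurements.mean();        // 3.6
double stdev = measurements.stdev();      // population standard deviation
StatCounter stats = measurements.stats(); // count, mean, min, max, variance in one pass
// Two equal-width buckets over [1.0, 10.0]: boundaries {1.0, 5.5, 10.0}, counts {4, 1}
Tuple2<double[], long[]> hist = measurements.histogram(2);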
/**
* Java-friendly wrapper for RDD of doubles
*/
public class JavaDoubleRDD {
/** Compute mean */
public double mean()
/** Compute variance */
public double variance()
/** Compute standard deviation */
public double stdev()
/** Compute sum */
public double sum()
/** Compute statistics */
public StatCounter stats()
/** Compute histogram */
public Tuple2<double[], long[]> histogram(int buckets)
public long[] histogram(double[] buckets)
/** Sum approximately */
public PartialResult<BoundedDouble> sumApprox(long timeout)
/** Mean approximately */
public PartialResult<BoundedDouble> meanApprox(long timeout)
}
Java 8-compatible function interfaces used by the transformations and actions above.
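Because each interface listed below has a single abstract call method, either a lambda or an anonymous class can be passed wherever one is expected. A minimal sketch, assuming an existing JavaRDD<Integer> rdd and an import of org.apache.spark.api.java.function.Function:
// Lambda form (Java 8+): the compiler infers Function<Integer, Integer>
JavaRDD<Integer> doubled = rdd.map(x -> x * 2);
// Equivalent explicit form, handy when the function is reused or unit-tested separately
Function<Integer, Integer> doubler = new Function<Integer, Integer>() {
    @Override
    public Integer call(Integer x) throws Exception {
        return x * 2;
    }
};
JavaRDD<Integer> alsoDoubled = rdd.map(doubler);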
/** Function interface for map operations */
@FunctionalInterface
public interface Function<T, R> extends Serializable {
R call(T t) throws Exception;
}
/** Function interface for pair transformations */
@FunctionalInterface
public interface PairFunction<T, K, V> extends Serializable {
Tuple2<K, V> call(T t) throws Exception;
}
/** Function interface for flat map operations */
@FunctionalInterface
public interface FlatMapFunction<T, R> extends Serializable {
Iterator<R> call(T t) throws Exception;
}
/** Function interface for two-argument operations */
@FunctionalInterface
public interface Function2<T1, T2, R> extends Serializable {
R call(T1 t1, T2 t2) throws Exception;
}
/** Function interface for void operations */
@FunctionalInterface
public interface VoidFunction<T> extends Serializable {
void call(T t) throws Exception;
}
/**
* Storage level constants for Java API
*/
public class StorageLevels {
public static final StorageLevel MEMORY_ONLY = StorageLevel.MEMORY_ONLY();
public static final StorageLevel MEMORY_AND_DISK = StorageLevel.MEMORY_AND_DISK();
public static final StorageLevel MEMORY_ONLY_SER = StorageLevel.MEMORY_ONLY_SER();
public static final StorageLevel MEMORY_AND_DISK_SER = StorageLevel.MEMORY_AND_DISK_SER();
public static final StorageLevel DISK_ONLY = StorageLevel.DISK_ONLY();
public static final StorageLevel MEMORY_ONLY_2 = StorageLevel.MEMORY_ONLY_2();
public static final StorageLevel MEMORY_AND_DISK_2 = StorageLevel.MEMORY_AND_DISK_2();
}
Usage Examples:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
// Setup
SparkConf conf = new SparkConf()
.setAppName("Java Spark Example")
.setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
// Create RDD
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> rdd = sc.parallelize(data);
// Transformations
JavaRDD<Integer> squares = rdd.map(x -> x * x);
JavaRDD<Integer> evens = rdd.filter(x -> x % 2 == 0);
// Pair operations
JavaPairRDD<String, Integer> pairs = rdd.mapToPair(x -> new Tuple2<>("num", x));
JavaPairRDD<String, Integer> sums = pairs.reduceByKey((a, b) -> a + b);
// Actions
List<Integer> result = squares.collect();
long count = rdd.count();
int sum = rdd.reduce((a, b) -> a + b);
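// Persistence (sketch): keep the squares RDD around for reuse across actions.
// Assumes: import org.apache.spark.api.java.StorageLevels;
JavaRDD<Integer> cached = squares.persist(StorageLevels.MEMORY_AND_DISK);
long cachedCount = cached.count();   // first action materializes the cached partitions
cached.unpersist();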
// Cleanup
sc.close();