DeepLearning4j Spark integration component providing DataVec transformations and DataSet operations for distributed deep learning in Apache Spark environments
npx @tessl/cli install tessl/maven-org-deeplearning4j--dl4j-spark-2-11@0.9.0

DeepLearning4j Spark Integration (dl4j-spark) provides data preprocessing and transformation operations specifically designed for Apache Spark environments within the DeepLearning4j ecosystem. This component transforms various data formats (collections, sequences, bytes, strings) into DataSet objects suitable for deep learning training and inference in distributed Spark applications.
<dependency>
<groupId>org.deeplearning4j</groupId>
<artifactId>dl4j-spark_2.11</artifactId>
<version>0.9.1_spark_1</version>
</dependency>

import org.deeplearning4j.spark.datavec.*;
import org.deeplearning4j.spark.datavec.export.*;
import org.deeplearning4j.datasets.datavec.*;
import org.apache.spark.api.java.function.*;
import org.datavec.api.writable.Writable;
import org.nd4j.linalg.dataset.DataSet;

import org.deeplearning4j.spark.datavec.DataVecDataSetFunction;
import org.apache.spark.api.java.JavaRDD;
import org.datavec.api.writable.Writable;
import org.nd4j.linalg.dataset.DataSet;
import java.util.List;
// Create a function to transform DataVec records to DataSet
DataVecDataSetFunction transformer = new DataVecDataSetFunction(
4, // labelIndex - column index for labels
10, // numPossibleLabels - number of classes for classification
false // regression - false for classification, true for regression
);
// Apply transformation to RDD of Writable collections
JavaRDD<List<Writable>> records = // ... your DataVec records RDD
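JavaRDD<DataSet> datasets = records.map(transformer);

DataVec Spark Inference Model follows a functional transformation pattern where different Function implementations convert various input formats into standardized DataSet objects. The architecture consists of:
- Record functions that convert DataVec Writable collections into DataSet objects
- Sequence functions that convert time series and sequential data
- Byte and string functions that process binary data and string records using RecordReader implementations
- Mini-batch creation and dataset export utilities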
Converts DataVec Writable collections into DataSet objects for both classification and regression tasks.
public class DataVecDataSetFunction implements Function<List<Writable>, DataSet> {
public DataVecDataSetFunction(int labelIndex, int numPossibleLabels, boolean regression);
public DataVecDataSetFunction(int labelIndex, int numPossibleLabels, boolean regression,
DataSetPreProcessor preProcessor, WritableConverter converter);
public DataVecDataSetFunction(int labelIndexFrom, int labelIndexTo, int numPossibleLabels,
boolean regression, DataSetPreProcessor preProcessor,
WritableConverter converter);
public DataSet call(List<Writable> currList) throws Exception;
}

Handles time series and sequential data conversion with support for variable-length sequences and alignment modes.
public class DataVecSequenceDataSetFunction implements Function<List<List<Writable>>, DataSet> {
public DataVecSequenceDataSetFunction(int labelIndex, int numPossibleLabels, boolean regression);
public DataVecSequenceDataSetFunction(int labelIndex, int numPossibleLabels, boolean regression,
DataSetPreProcessor preProcessor, WritableConverter converter);
public DataSet call(List<List<Writable>> input) throws Exception;
}

public class DataVecSequencePairDataSetFunction
implements Function<Tuple2<List<List<Writable>>, List<List<Writable>>>, DataSet> {
public enum AlignmentMode { EQUAL_LENGTH, ALIGN_START, ALIGN_END }
public DataVecSequencePairDataSetFunction();
public DataVecSequencePairDataSetFunction(int numPossibleLabels, boolean regression);
public DataVecSequencePairDataSetFunction(int numPossibleLabels, boolean regression,
AlignmentMode alignmentMode);
public DataVecSequencePairDataSetFunction(int numPossibleLabels, boolean regression,
AlignmentMode alignmentMode,
DataSetPreProcessor preProcessor,
WritableConverter converter);
public DataSet call(Tuple2<List<List<Writable>>, List<List<Writable>>> input) throws Exception;
}

Processes binary data and string records using RecordReader implementations.
public class DataVecByteDataSetFunction implements PairFunction<Tuple2<Text, BytesWritable>, Double, DataSet> {
public DataVecByteDataSetFunction(int labelIndex, int numPossibleLabels, int batchSize, int byteFileLen);
public DataVecByteDataSetFunction(int labelIndex, int numPossibleLabels, int batchSize,
int byteFileLen, boolean regression);
public DataVecByteDataSetFunction(int labelIndex, int numPossibleLabels, int batchSize,
int byteFileLen, boolean regression,
DataSetPreProcessor preProcessor);
public Tuple2<Double, DataSet> call(Tuple2<Text, BytesWritable> inputTuple) throws Exception;
}

public class RecordReaderFunction implements Function<String, DataSet> {
public RecordReaderFunction(RecordReader recordReader, int labelIndex, int numPossibleLabels,
WritableConverter converter);
public RecordReaderFunction(RecordReader recordReader, int labelIndex, int numPossibleLabels);
public DataSet call(String v1) throws Exception;
}

Handles mini-batch creation and dataset export functionality for production workflows.
public class RDDMiniBatches implements Serializable {
public RDDMiniBatches(int miniBatches, JavaRDD<DataSet> toSplit);
public JavaRDD<DataSet> miniBatchesJava();
}

public class StringToDataSetExportFunction implements VoidFunction<Iterator<String>> {
public StringToDataSetExportFunction(URI outputDir, RecordReader recordReader, int batchSize,
boolean regression, int labelIndex, int numPossibleLabels);
public void call(Iterator<String> stringIterator) throws Exception;
}

// From DataVec API
interface Writable {
double toDouble();
int toInt();
String toString();
}
class NDArrayWritable implements Writable {
public NDArrayWritable(INDArray array);
public INDArray get();
}
interface WritableConverter {
Writable convert(Writable writable) throws WritableConverterException;
}
// From ND4J
interface DataSetPreProcessor {
void preProcess(DataSet dataSet);
}
class DataSet {
public DataSet(INDArray features, INDArray labels);
public INDArray getFeatureMatrix();
public INDArray getLabels();
public static DataSet merge(List<DataSet> dataSets);
}
// From DataVec Records API
interface RecordReader {
void initialize(InputSplit split) throws IOException, InterruptedException;
List<Writable> next();
boolean hasNext();
}
// From Hadoop
class Text implements WritableComparable<Text> {
public Text(String string);
public String toString();
}
class BytesWritable implements WritableComparable<BytesWritable> {
public byte[] getBytes();
public int getLength();
}
// From Apache Spark
class Tuple2<T1, T2> {
public T1 _1();
public T2 _2();
}

class ZeroLengthSequenceException extends RuntimeException {
public ZeroLengthSequenceException(String message);
}
class WritableConverterException extends Exception {
public WritableConverterException(String message);
public WritableConverterException(String message, Throwable cause);
}