DataVec integration library providing data loading, transformation, and Spark processing capabilities for DeepLearning4j
—
Distributed data processing functions for Apache Spark, enabling large-scale data processing and training across clusters. These functions provide seamless integration between DataVec record processing and DeepLearning4j's distributed training capabilities.
Spark function for converting Collections of Writables to DataSet objects in distributed environments.
public class DataVecDataSetFunction implements Function<List<Writable>, DataSet>, Serializable {
// Main constructors
public DataVecDataSetFunction(int labelIndex, int numPossibleLabels, boolean regression);
public DataVecDataSetFunction(int labelIndex, int numPossibleLabels, boolean regression,
DataSetPreProcessor preProcessor, WritableConverter converter);
public DataVecDataSetFunction(int labelIndexFrom, int labelIndexTo, int numPossibleLabels,
boolean regression, DataSetPreProcessor preProcessor,
WritableConverter converter);
// Spark function method
public DataSet call(List<Writable> currList) throws Exception;
}

Spark function for converting sequence data (Collections of Collections of Writables) to DataSet objects.
public class DataVecSequenceDataSetFunction implements Function<List<List<Writable>>, DataSet>, Serializable {
public DataVecSequenceDataSetFunction(int labelIndex, int numPossibleLabels, boolean regression);
public DataVecSequenceDataSetFunction(int labelIndex, int numPossibleLabels, boolean regression,
DataSetPreProcessor preProcessor, WritableConverter converter);
public DataSet call(List<List<Writable>> input) throws Exception;
}

Spark function for converting pairs of sequence data to DataSet objects (separate feature and label sequences).
public class DataVecSequencePairDataSetFunction
implements Function<Tuple2<List<List<Writable>>, List<List<Writable>>>, DataSet>, Serializable {
public DataVecSequencePairDataSetFunction();
public DataVecSequencePairDataSetFunction(int numPossibleLabels, boolean regression);
public DataVecSequencePairDataSetFunction(int numPossibleLabels, boolean regression,
AlignmentMode alignmentMode);
public DataVecSequencePairDataSetFunction(int numPossibleLabels, boolean regression,
AlignmentMode alignmentMode,
DataSetPreProcessor preProcessor,
WritableConverter converter);
public DataSet call(Tuple2<List<List<Writable>>, List<List<Writable>>> input) throws Exception;
}

Spark function for converting byte data to DataSet objects, useful for image and binary data processing.
public class DataVecByteDataSetFunction
implements PairFunction<Tuple2<Text, BytesWritable>, Double, DataSet> {
public DataVecByteDataSetFunction(int labelIndex, int numPossibleLabels,
int batchSize, int byteFileLen);
public DataVecByteDataSetFunction(int labelIndex, int numPossibleLabels,
int batchSize, int byteFileLen, boolean regression);
public DataVecByteDataSetFunction(int labelIndex, int numPossibleLabels,
int batchSize, int byteFileLen, boolean regression,
DataSetPreProcessor preProcessor);
public Tuple2<Double, DataSet> call(Tuple2<Text, BytesWritable> inputTuple) throws Exception;
}

Spark function for converting strings to DataSet objects using a RecordReader.
public class RecordReaderFunction implements Function<String, DataSet> {
public RecordReaderFunction(RecordReader recordReader, int labelIndex,
int numPossibleLabels, WritableConverter converter);
public RecordReaderFunction(RecordReader recordReader, int labelIndex, int numPossibleLabels);
public DataSet call(String v1) throws Exception;
}

Utility for creating mini-batches from RDD<DataSet> for distributed training.
public class RDDMiniBatches implements Serializable {
public RDDMiniBatches(int miniBatches, JavaRDD<DataSet> toSplit);
public JavaRDD<DataSet> miniBatchesJava();
// Inner classes for batch processing
public static class MiniBatchFunction extends BaseFlatMapFunctionAdaptee<Iterator<DataSet>, DataSet> {
public MiniBatchFunction(int batchSize);
}
public static class MiniBatchFunctionAdapter implements FlatMapFunctionAdapter<Iterator<DataSet>, DataSet> {
public MiniBatchFunctionAdapter(int batchSize);
public Iterable<DataSet> call(Iterator<DataSet> dataSetIterator) throws Exception;
}
}

Export function for converting strings to DataSet objects with file output.
public class StringToDataSetExportFunction implements VoidFunction<Iterator<String>> {
public StringToDataSetExportFunction(URI outputDir, RecordReader recordReader,
int batchSize, boolean regression,
int labelIndex, int numPossibleLabels);
public void call(Iterator<String> stringIterator) throws Exception;
}

Example: converting an RDD of Writables to DataSet objects.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.datavec.api.writable.Writable;
import org.deeplearning4j.spark.datavec.DataVecDataSetFunction;
// Setup Spark context
JavaSparkContext sc = new JavaSparkContext();
// Load data as RDD<List<Writable>>
JavaRDD<List<Writable>> rawData = sc.textFile("hdfs://data.csv")
.map(line -> {
// Parse CSV line to List<Writable>
String[] values = line.split(",");
List<Writable> writables = new ArrayList<>();
for (String value : values) {
writables.add(new DoubleWritable(Double.parseDouble(value)));
}
return writables;
});
// Convert to DataSet RDD
DataVecDataSetFunction dataSetFunction = new DataVecDataSetFunction(
4, // labelIndex (column 4)
3, // numPossibleLabels (3 classes)
false // regression = false (classification)
);
JavaRDD<DataSet> dataSetRDD = rawData.map(dataSetFunction);
// Use for distributed training
dataSetRDD.foreach(dataSet -> {
// Train model with dataSet
System.out.println("Processing batch with " + dataSet.numExamples() + " examples");
});

Example: converting sequence data to DataSet objects.

import org.deeplearning4j.spark.datavec.DataVecSequenceDataSetFunction;
// RDD of sequence data (each element is a List<List<Writable>>)
JavaRDD<List<List<Writable>>> sequenceData = sc.textFile("hdfs://sequences/")
.map(sequenceFile -> {
// Parse sequence file into List<List<Writable>>
// Each line represents one time step
List<List<Writable>> sequence = new ArrayList<>();
String[] lines = sequenceFile.split("\n");
for (String line : lines) {
List<Writable> timeStep = parseTimeStep(line);
sequence.add(timeStep);
}
return sequence;
});
// Convert to sequence DataSet RDD
DataVecSequenceDataSetFunction sequenceFunction =
new DataVecSequenceDataSetFunction(
5, // labelIndex (column 5 in each time step)
10, // numPossibleLabels (10 classes)
false // regression = false
);
JavaRDD<DataSet> sequenceDataSetRDD = sequenceData.map(sequenceFunction);

Example: converting paired feature/label sequences to DataSet objects.

import org.deeplearning4j.spark.datavec.DataVecSequencePairDataSetFunction;
import scala.Tuple2;
// RDD of paired sequences (features, labels)
JavaRDD<Tuple2<List<List<Writable>>, List<List<Writable>>>> pairedSequences =
featureSequenceRDD.zip(labelSequenceRDD);
// Convert paired sequences to DataSet
DataVecSequencePairDataSetFunction pairFunction =
new DataVecSequencePairDataSetFunction(
5, // numPossibleLabels
false, // regression = false
AlignmentMode.ALIGN_START // alignment mode
);
JavaRDD<DataSet> pairedDataSetRDD = pairedSequences.map(pairFunction);

Example: converting binary (byte) data to DataSet objects.

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.deeplearning4j.spark.datavec.DataVecByteDataSetFunction;
// RDD of binary files (e.g., images with labels in filename)
JavaPairRDD<Text, BytesWritable> binaryFiles = sc.sequenceFile("hdfs://images/",
Text.class,
BytesWritable.class);
// Convert binary data to DataSet
DataVecByteDataSetFunction byteFunction = new DataVecByteDataSetFunction(
0, // labelIndex (extract from filename)
10, // numPossibleLabels (10 image classes)
32, // batchSize
28*28*3 // byteFileLen (image size)
);
JavaPairRDD<Double, DataSet> imageDataSets = binaryFiles.mapToPair(byteFunction);

Example: batching single-example DataSets into mini-batches.

import org.deeplearning4j.spark.datavec.RDDMiniBatches;
// Create mini-batches from individual DataSet objects
JavaRDD<DataSet> individualDataSets = // ... RDD of single-example DataSets
RDDMiniBatches miniBatcher = new RDDMiniBatches(64, individualDataSets); // 64 examples per batch
JavaRDD<DataSet> miniBatchRDD = miniBatcher.miniBatchesJava();
// Each element in miniBatchRDD now contains up to 64 examples
miniBatchRDD.foreach(batch -> {
System.out.println("Mini-batch size: " + batch.numExamples());
});

Example: processing strings with a RecordReader.

import org.datavec.api.records.reader.impl.csv.CSVRecordReader;
import org.deeplearning4j.spark.datavec.RecordReaderFunction;
// Setup RecordReader (must be serializable)
RecordReader csvReader = new CSVRecordReader();
// Create function to process strings with RecordReader
RecordReaderFunction readerFunction = new RecordReaderFunction(
csvReader, // recordReader
4, // labelIndex
3 // numPossibleLabels
);
// Apply to RDD of strings (e.g., CSV lines)
JavaRDD<String> csvLines = sc.textFile("hdfs://data.csv");
JavaRDD<DataSet> processedData = csvLines.map(readerFunction);

Example: exporting processed data to files.

import org.deeplearning4j.spark.datavec.export.StringToDataSetExportFunction;
import java.net.URI;
// Export processed data to files
URI outputPath = new URI("hdfs://output/datasets/");
RecordReader exportReader = new CSVRecordReader();
StringToDataSetExportFunction exportFunction = new StringToDataSetExportFunction(
outputPath, // outputDir
exportReader, // recordReader
100, // batchSize
false, // regression
4, // labelIndex
3 // numPossibleLabels
);
// Apply export function to partitions
csvLines.foreachPartition(exportFunction);

All Spark functions are Serializable and can be distributed across cluster nodes. Use RDDMiniBatches to create properly sized mini-batches.

// Cache frequently accessed RDDs
JavaRDD<DataSet> dataSetRDD = rawData.map(dataSetFunction);
dataSetRDD.cache(); // Cache in memory for repeated access
// Use for multiple training epochs
for (int epoch = 0; epoch < numEpochs; epoch++) {
dataSetRDD.foreach(batch -> trainModel(batch));
}

Install with Tessl CLI
npx tessl i tessl/maven-org-datavec--datavec-local