
tessl/maven-org-datavec--datavec-local

DataVec integration library providing data loading, transformation, and Spark processing capabilities for DeepLearning4j

Spark Integration

Distributed processing functions for Apache Spark, enabling large-scale data transformation and training across clusters. These functions bridge DataVec record processing and DeepLearning4j's distributed training capabilities.

Capabilities

DataVecDataSetFunction

Spark function for converting lists of Writables (one record per list) to DataSet objects in distributed environments.

public class DataVecDataSetFunction implements Function<List<Writable>, DataSet>, Serializable {
    // Main constructors
    public DataVecDataSetFunction(int labelIndex, int numPossibleLabels, boolean regression);
    public DataVecDataSetFunction(int labelIndex, int numPossibleLabels, boolean regression, 
                                 DataSetPreProcessor preProcessor, WritableConverter converter);
    public DataVecDataSetFunction(int labelIndexFrom, int labelIndexTo, int numPossibleLabels, 
                                 boolean regression, DataSetPreProcessor preProcessor, 
                                 WritableConverter converter);
    
    // Spark function method
    public DataSet call(List<Writable> currList) throws Exception;
}
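
For the overloads that take a DataSetPreProcessor and WritableConverter, here is a minimal sketch, assuming nd4j's single-method DataSetPreProcessor interface and DataVec's pass-through SelfWritableConverter; the divide-by-255 scaling is illustrative only, not part of this API:

import java.io.Serializable;

import org.datavec.api.io.converters.SelfWritableConverter;
import org.nd4j.linalg.dataset.api.DataSetPreProcessor;

// The (DataSetPreProcessor & Serializable) cast keeps the lambda
// serializable so Spark can ship it to executors.
DataVecDataSetFunction fnWithPreProc = new DataVecDataSetFunction(
    4, 3, false,
    (DataSetPreProcessor & Serializable) ds -> ds.getFeatures().divi(255.0),
    new SelfWritableConverter());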

DataVecSequenceDataSetFunction

Spark function for converting sequence data (lists of lists of Writables, one inner list per time step) to DataSet objects.

public class DataVecSequenceDataSetFunction implements Function<List<List<Writable>>, DataSet>, Serializable {
    public DataVecSequenceDataSetFunction(int labelIndex, int numPossibleLabels, boolean regression);
    public DataVecSequenceDataSetFunction(int labelIndex, int numPossibleLabels, boolean regression, 
                                         DataSetPreProcessor preProcessor, WritableConverter converter);
    
    public DataSet call(List<List<Writable>> input) throws Exception;
}

DataVecSequencePairDataSetFunction

Spark function for converting pairs of sequence data to DataSet objects (separate feature and label sequences).

public class DataVecSequencePairDataSetFunction 
    implements Function<Tuple2<List<List<Writable>>, List<List<Writable>>>, DataSet>, Serializable {
    
    public DataVecSequencePairDataSetFunction();
    public DataVecSequencePairDataSetFunction(int numPossibleLabels, boolean regression);
    public DataVecSequencePairDataSetFunction(int numPossibleLabels, boolean regression, 
                                             AlignmentMode alignmentMode);
    public DataVecSequencePairDataSetFunction(int numPossibleLabels, boolean regression, 
                                             AlignmentMode alignmentMode, 
                                             DataSetPreProcessor preProcessor, 
                                             WritableConverter converter);
    
    public DataSet call(Tuple2<List<List<Writable>>, List<List<Writable>>> input) throws Exception;
}

DataVecByteDataSetFunction

Spark function for converting byte data to DataSet objects, useful for image and binary data processing.

public class DataVecByteDataSetFunction 
    implements PairFunction<Tuple2<Text, BytesWritable>, Double, DataSet> {
    
    public DataVecByteDataSetFunction(int labelIndex, int numPossibleLabels, 
                                     int batchSize, int byteFileLen);
    public DataVecByteDataSetFunction(int labelIndex, int numPossibleLabels, 
                                     int batchSize, int byteFileLen, boolean regression);
    public DataVecByteDataSetFunction(int labelIndex, int numPossibleLabels, 
                                     int batchSize, int byteFileLen, boolean regression, 
                                     DataSetPreProcessor preProcessor);
    
    public Tuple2<Double, DataSet> call(Tuple2<Text, BytesWritable> inputTuple) throws Exception;
}

RecordReaderFunction

Spark function for converting strings to DataSet objects using a RecordReader.

public class RecordReaderFunction implements Function<String, DataSet> {
    public RecordReaderFunction(RecordReader recordReader, int labelIndex, 
                               int numPossibleLabels, WritableConverter converter);
    public RecordReaderFunction(RecordReader recordReader, int labelIndex, int numPossibleLabels);
    
    public DataSet call(String v1) throws Exception;
}

RDDMiniBatches

Utility for creating mini-batches from an RDD<DataSet> for distributed training.

public class RDDMiniBatches implements Serializable {
    public RDDMiniBatches(int miniBatches, JavaRDD<DataSet> toSplit);
    
    public JavaRDD<DataSet> miniBatchesJava();
    
    // Inner classes for batch processing
    public static class MiniBatchFunction extends BaseFlatMapFunctionAdaptee<Iterator<DataSet>, DataSet> {
        public MiniBatchFunction(int batchSize);
    }
    
    public static class MiniBatchFunctionAdapter implements FlatMapFunctionAdapter<Iterator<DataSet>, DataSet> {
        public MiniBatchFunctionAdapter(int batchSize);
        public Iterable<DataSet> call(Iterator<DataSet> dataSetIterator) throws Exception;
    }
}

StringToDataSetExportFunction

Export function that converts strings to DataSet objects and writes the resulting datasets to files under an output directory.

public class StringToDataSetExportFunction implements VoidFunction<Iterator<String>> {
    public StringToDataSetExportFunction(URI outputDir, RecordReader recordReader, 
                                        int batchSize, boolean regression, 
                                        int labelIndex, int numPossibleLabels);
    
    public void call(Iterator<String> stringIterator) throws Exception;
}

Usage Examples

Basic Spark DataSet Processing

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.datavec.api.writable.DoubleWritable;
import org.datavec.api.writable.Writable;
import org.deeplearning4j.spark.datavec.DataVecDataSetFunction;
import org.nd4j.linalg.dataset.DataSet;

// Set up the Spark context
JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("datavec-spark-example"));

// Load data as RDD<List<Writable>>
JavaRDD<List<Writable>> rawData = sc.textFile("hdfs://data.csv")
    .map(line -> {
        // Parse each CSV line into a List<Writable>
        String[] values = line.split(",");
        List<Writable> writables = new ArrayList<>();
        for (String value : values) {
            writables.add(new DoubleWritable(Double.parseDouble(value)));
        }
        return writables;
    });

// Convert to DataSet RDD
DataVecDataSetFunction dataSetFunction = new DataVecDataSetFunction(
    4,      // labelIndex (zero-based; the fifth column holds the label)
    3,      // numPossibleLabels (3 classes)
    false   // regression = false (classification)
);

JavaRDD<DataSet> dataSetRDD = rawData.map(dataSetFunction);

// Use for distributed training
dataSetRDD.foreach(dataSet -> {
    // Train the model with this dataSet
    System.out.println("Processing batch with " + dataSet.numExamples() + " examples");
});

Sequence Processing with Spark

import org.deeplearning4j.spark.datavec.DataVecSequenceDataSetFunction;

// RDD of sequence data (each element is a List<List<Writable>>).
// Use wholeTextFiles rather than textFile here: textFile splits input
// on newlines, so a multi-line sequence file would never arrive intact.
JavaRDD<List<List<Writable>>> sequenceData = sc.wholeTextFiles("hdfs://sequences/")
    .map(pathAndContent -> {
        // Parse the file content; each line represents one time step
        List<List<Writable>> sequence = new ArrayList<>();
        for (String line : pathAndContent._2().split("\n")) {
            sequence.add(parseTimeStep(line)); // parseTimeStep: user-supplied line parser
        }
        return sequence;
    });

// Convert to sequence DataSet RDD
DataVecSequenceDataSetFunction sequenceFunction =
    new DataVecSequenceDataSetFunction(
        5,      // labelIndex (column 5 in each time step)
        10,     // numPossibleLabels (10 classes)
        false   // regression = false
    );

JavaRDD<DataSet> sequenceDataSetRDD = sequenceData.map(sequenceFunction);

Separate Feature and Label Sequences

import org.apache.spark.api.java.JavaPairRDD;
import org.deeplearning4j.spark.datavec.DataVecSequencePairDataSetFunction;
import org.deeplearning4j.spark.datavec.DataVecSequencePairDataSetFunction.AlignmentMode;

// Pair up feature and label sequences; zip returns a JavaPairRDD.
// featureSequenceRDD and labelSequenceRDD are JavaRDD<List<List<Writable>>>
// instances built as in the previous example.
JavaPairRDD<List<List<Writable>>, List<List<Writable>>> pairedSequences =
    featureSequenceRDD.zip(labelSequenceRDD);

// Convert paired sequences to DataSet
DataVecSequencePairDataSetFunction pairFunction =
    new DataVecSequencePairDataSetFunction(
        5,                              // numPossibleLabels
        false,                          // regression = false
        AlignmentMode.ALIGN_START       // align sequences at the first time step
    );

JavaRDD<DataSet> pairedDataSetRDD = pairedSequences.map(pairFunction);

Binary/Image Data Processing

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaPairRDD;
import org.deeplearning4j.spark.datavec.DataVecByteDataSetFunction;

// RDD of binary files (e.g., images with labels encoded in the filename)
JavaPairRDD<Text, BytesWritable> binaryFiles = sc.sequenceFile("hdfs://images/",
                                                               Text.class,
                                                               BytesWritable.class);

// Convert binary data to DataSet
DataVecByteDataSetFunction byteFunction = new DataVecByteDataSetFunction(
    0,          // labelIndex (extracted from the filename)
    10,         // numPossibleLabels (10 image classes)
    32,         // batchSize
    28*28*3     // byteFileLen (bytes per image)
);

// byteFunction is a PairFunction, so use mapToPair rather than map
JavaPairRDD<Double, DataSet> imageDataSets = binaryFiles.mapToPair(byteFunction);

Mini-Batch Creation

import org.deeplearning4j.spark.datavec.RDDMiniBatches;

// Create mini-batches from individual DataSet objects
JavaRDD<DataSet> individualDataSets = // ... RDD of single-example DataSets

RDDMiniBatches miniBatcher = new RDDMiniBatches(64, individualDataSets); // 64 examples per batch
JavaRDD<DataSet> miniBatchRDD = miniBatcher.miniBatchesJava();

// Each element in miniBatchRDD now contains up to 64 examples
miniBatchRDD.foreach(batch -> {
    System.out.println("Mini-batch size: " + batch.numExamples());
});

RecordReader with Spark

import org.datavec.api.records.reader.RecordReader;
import org.datavec.api.records.reader.impl.csv.CSVRecordReader;
import org.deeplearning4j.spark.datavec.RecordReaderFunction;

// Set up the RecordReader (must be serializable)
RecordReader csvReader = new CSVRecordReader();

// Create function to process strings with RecordReader
RecordReaderFunction readerFunction = new RecordReaderFunction(
    csvReader,  // recordReader
    4,          // labelIndex
    3           // numPossibleLabels
);

// Apply to RDD of strings (e.g., CSV lines)
JavaRDD<String> csvLines = sc.textFile("hdfs://data.csv");
JavaRDD<DataSet> processedData = csvLines.map(readerFunction);

Data Export

import org.deeplearning4j.spark.datavec.export.StringToDataSetExportFunction;
import java.net.URI;

// Export processed data to files; URI.create avoids the checked URISyntaxException
URI outputPath = URI.create("hdfs://output/datasets/");
RecordReader exportReader = new CSVRecordReader();

StringToDataSetExportFunction exportFunction = new StringToDataSetExportFunction(
    outputPath,     // outputDir
    exportReader,   // recordReader
    100,            // batchSize
    false,          // regression
    4,              // labelIndex
    3               // numPossibleLabels
);

// Apply export function to partitions
csvLines.foreachPartition(exportFunction);

Performance Considerations

Serialization

All Spark functions are Serializable and can be distributed across cluster nodes; a sketch of the pattern follows the list:

  • RecordReaders must be serializable
  • DataSetPreProcessors must be serializable
  • Custom WritableConverters must be serializable
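
A minimal sketch of the pattern, assuming a custom preprocessor whose only state is a primitive; the class name and scaling logic are illustrative, not part of the library:

import java.io.Serializable;

import org.nd4j.linalg.dataset.api.DataSet;
import org.nd4j.linalg.dataset.api.DataSetPreProcessor;

// Illustrative custom preprocessor: Spark can serialize it because it
// implements Serializable and holds only serializable state.
public class ScaleFeaturesPreProcessor implements DataSetPreProcessor, Serializable {
    private final double scale;

    public ScaleFeaturesPreProcessor(double scale) {
        this.scale = scale;
    }

    @Override
    public void preProcess(DataSet dataSet) {
        dataSet.getFeatures().muli(scale); // scale features in place
    }
}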

Memory Management

  • Configure appropriate batch sizes to avoid OOM errors
  • Use RDDMiniBatches to create properly sized mini-batches
  • Consider the data partitioning strategy for optimal cluster utilization (see the sketch after this list)
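
A minimal sketch combining these points, assuming the dataSetRDD from the earlier examples; the 3x parallelism multiplier is a common heuristic, not a library requirement:

// Repartition so each executor core gets several partitions, then batch
int numPartitions = sc.defaultParallelism() * 3; // heuristic; tune per workload
JavaRDD<DataSet> repartitioned = dataSetRDD.repartition(numPartitions);
JavaRDD<DataSet> batched = new RDDMiniBatches(64, repartitioned).miniBatchesJava();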

Caching

// Cache frequently accessed RDDs
JavaRDD<DataSet> dataSetRDD = rawData.map(dataSetFunction);
dataSetRDD.cache(); // Cache in memory for repeated access

// Use for multiple training epochs
for (int epoch = 0; epoch < numEpochs; epoch++) {
    dataSetRDD.foreach(batch -> trainModel(batch));
}
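
If the cached datasets may not fit in executor memory, the standard Spark StorageLevel API lets the RDD spill to disk instead of being recomputed:

import org.apache.spark.storage.StorageLevel;

// Spill to disk rather than recompute when memory is tight
dataSetRDD.persist(StorageLevel.MEMORY_AND_DISK());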

Error Handling

Common Exceptions

  • SerializationException: Non-serializable objects in Spark functions
  • IllegalArgumentException: Invalid function parameters
  • IOException: Data reading/writing errors
  • SparkException: Spark cluster execution errors

Best Practices

  • Always test functions locally before running on cluster
  • Use appropriate error handling in map/foreach operations (see the sketch after this list)
  • Monitor Spark UI for task failures and performance bottlenecks
  • Validate data schema consistency across partitions
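
A hedged sketch of per-record error handling, reusing readerFunction and csvLines from the RecordReader example; in production, count or log dropped records rather than silently discarding them:

// Skip malformed lines instead of failing the whole job
JavaRDD<DataSet> robust = csvLines
    .map(line -> {
        try {
            return readerFunction.call(line);
        } catch (Exception e) {
            return null; // malformed record; log in real code
        }
    })
    .filter(ds -> ds != null);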

Install with Tessl CLI

npx tessl i tessl/maven-org-datavec--datavec-local
