DeepLearning4j Spark integration component providing DataVec transformations and DataSet operations for distributed deep learning in Apache Spark environments
—
Specialized input processing functions handle binary data and string records using RecordReader implementations. These functions are designed for specific data formats and input sources commonly encountered in production data pipelines.
Processes binary data stored as BytesWritable objects, converting them to DataSet objects with configurable batch sizes and file lengths.
public class DataVecByteDataSetFunction
implements PairFunction<Tuple2<Text, BytesWritable>, Double, DataSet> {
public DataVecByteDataSetFunction(int labelIndex, int numPossibleLabels, int batchSize, int byteFileLen);
public DataVecByteDataSetFunction(int labelIndex, int numPossibleLabels, int batchSize,
int byteFileLen, boolean regression);
public DataVecByteDataSetFunction(int labelIndex, int numPossibleLabels, int batchSize,
int byteFileLen, boolean regression,
DataSetPreProcessor preProcessor);
public Tuple2<Double, DataSet> call(Tuple2<Text, BytesWritable> inputTuple) throws Exception;
}

The regression flag is false for classification, true for regression.

// Process binary image data with labels
// Each record: 784 bytes (28x28 pixels) + 1 label byte = 785 bytes total
DataVecByteDataSetFunction transformer = new DataVecByteDataSetFunction(
0, // labelIndex: first byte contains label
10, // numPossibleLabels: 10 classes (digits 0-9)
32, // batchSize: process 32 images per DataSet
785 // byteFileLen: 785 bytes per record
);
JavaRDD<Tuple2<Text, BytesWritable>> byteData = // ... load binary data
JavaPairRDD<Double, DataSet> results = byteData.mapToPair(transformer);

import org.nd4j.linalg.dataset.api.preprocessor.NormalizerMinMaxScaler;
// Process sensor data with normalization
DataSetPreProcessor normalizer = new NormalizerMinMaxScaler();
DataVecByteDataSetFunction transformer = new DataVecByteDataSetFunction(
-1, // labelIndex: -1 for last byte as label
5, // numPossibleLabels: 5 sensor states
64, // batchSize: 64 samples per batch
100, // byteFileLen: 100 bytes per sensor reading
false, // classification mode
normalizer
);

// Binary feature extraction for regression
DataVecByteDataSetFunction transformer = new DataVecByteDataSetFunction(
255, // labelIndex: byte 255 contains regression target
-1, // numPossibleLabels: ignored for regression
16, // batchSize
256, // byteFileLen: 256 bytes per record
true // regression mode
);

When labelIndex is -1, the function automatically uses the last byte position as the label.
Processes multiple records into a single DataSet until reaching batchSize or end of input stream.
Converts string data to DataSet objects using DataVec RecordReader implementations for flexible data parsing.
public class RecordReaderFunction implements Function<String, DataSet> {
public RecordReaderFunction(RecordReader recordReader, int labelIndex, int numPossibleLabels,
WritableConverter converter);
public RecordReaderFunction(RecordReader recordReader, int labelIndex, int numPossibleLabels);
public DataSet call(String v1) throws Exception;
}

import org.datavec.api.records.reader.impl.csv.CSVRecordReader;
// Process CSV strings to DataSet
CSVRecordReader csvReader = new CSVRecordReader();
RecordReaderFunction transformer = new RecordReaderFunction(
csvReader,
4, // labelIndex: column 4 contains labels
3 // numPossibleLabels: 3 classes
);
JavaRDD<String> csvStrings = // ... RDD of CSV lines
JavaRDD<DataSet> datasets = csvStrings.map(transformer);

import org.datavec.api.records.reader.impl.jackson.JacksonRecordReader;
import org.datavec.api.io.converters.SelfWritableConverter;
// Process JSON strings with custom converter
JacksonRecordReader jsonReader = new JacksonRecordReader();
WritableConverter converter = new SelfWritableConverter();
RecordReaderFunction transformer = new RecordReaderFunction(
jsonReader,
0, // labelIndex
5, // numPossibleLabels
converter // custom conversion logic
);
JavaRDD<String> jsonStrings = // ... RDD of JSON strings
JavaRDD<DataSet> datasets = jsonStrings.map(transformer);

import org.datavec.api.records.reader.RecordReader;
import org.datavec.api.split.StringSplit;
// Implement custom string parsing logic
RecordReader customReader = new RecordReader() {
@Override
public void initialize(InputSplit split) throws IOException, InterruptedException {
// Custom initialization
}
@Override
public List<Writable> next() {
// Custom parsing logic
return parsedWritables;
}
@Override
public boolean hasNext() {
return true; // Single record processing
}
};
RecordReaderFunction transformer = new RecordReaderFunction(customReader, 2, 7);

Throws IllegalStateException if numPossibleLabels < 1.

Internally creates lists of DataSet objects and uses Nd4j.vstack() to combine them, though typically processes single records.
// Complete file processing workflow
JavaRDD<String> fileContents = sc.textFile("hdfs://data/*.csv");
CSVRecordReader csvReader = new CSVRecordReader();
RecordReaderFunction transformer = new RecordReaderFunction(csvReader, 0, 10);
JavaRDD<DataSet> processedData = fileContents.map(transformer);
processedData.cache(); // Cache for reuse

// Streaming data processing
JavaDStream<String> stream = // ... Spark Streaming source
JavaDStream<DataSet> processedStream = stream.map(transformer);
processedStream.foreachRDD(rdd -> {
// Process each batch
rdd.collect();
});

Both specialized input functions support the following error-handling pattern:
try {
DataSet result = transformer.call(input);
// Process successful result
} catch (IllegalStateException e) {
// Handle configuration errors
logger.error("Configuration error: " + e.getMessage());
} catch (IOException e) {
// Handle I/O errors during parsing
logger.error("I/O error: " + e.getMessage());
} catch (Exception e) {
// Handle other processing errors
logger.error("Processing error: " + e.getMessage());
}

Install with Tessl CLI
npx tessl i tessl/maven-org-deeplearning4j--dl4j-spark-2-11