DeepLearning4j Spark integration component providing DataVec transformations and DataSet operations for distributed deep learning in Apache Spark environments
—
Batch processing and export utilities handle mini-batch creation from RDDs and dataset export functionality for production workflows. These tools optimize data processing performance and enable persistent storage of processed datasets.
Handles mini-batch partitioning of DataSet RDDs for efficient processing in distributed training scenarios.
public class RDDMiniBatches implements Serializable {
public RDDMiniBatches(int miniBatches, JavaRDD<DataSet> toSplit);
public JavaRDD<DataSet> miniBatchesJava();
// Inner classes for batch processing
public static class MiniBatchFunction extends BaseFlatMapFunctionAdaptee<Iterator<DataSet>, DataSet> {
public MiniBatchFunction(int batchSize);
}
static class MiniBatchFunctionAdapter implements FlatMapFunctionAdapter<Iterator<DataSet>, DataSet> {
public MiniBatchFunctionAdapter(int batchSize);
public Iterable<DataSet> call(Iterator<DataSet> dataSetIterator) throws Exception;
}
}

import org.deeplearning4j.spark.datavec.RDDMiniBatches;
// Create mini-batches from individual DataSet objects
JavaRDD<DataSet> individualDatasets = // ... RDD of single-example DataSets
int batchSize = 32;
RDDMiniBatches batcher = new RDDMiniBatches(batchSize, individualDatasets);
JavaRDD<DataSet> miniBatches = batcher.miniBatchesJava();
// Each DataSet in miniBatches now contains up to 32 examples
long batchCount = miniBatches.count();

// Complete training pipeline with mini-batching
JavaRDD<List<Writable>> rawData = // ... load raw data
DataVecDataSetFunction transformer = new DataVecDataSetFunction(4, 10, false);
JavaRDD<DataSet> datasets = rawData.map(transformer);
// Create mini-batches for efficient training
RDDMiniBatches batcher = new RDDMiniBatches(64, datasets);
JavaRDD<DataSet> trainingBatches = batcher.miniBatchesJava();
// Cache for multiple epochs
trainingBatches.cache();
// Use in training loop
for (int epoch = 0; epoch < numEpochs; epoch++) {
trainingBatches.foreach(batch -> {
// Train model with batch
model.fit(batch);
});
}

// Adjust batch size based on partition size
JavaRDD<DataSet> data = // ... your dataset RDD
long totalExamples = data.count();
int numPartitions = data.getNumPartitions();
int optimalBatchSize = (int) Math.max(1, totalExamples / (numPartitions * 4));
RDDMiniBatches batcher = new RDDMiniBatches(optimalBatchSize, data);
JavaRDD<DataSet> optimizedBatches = batcher.miniBatchesJava();

Internally, examples are combined into mini-batches via DataSet.merge().

Exports DataSet objects to persistent storage using RecordReader conversion, designed for use with foreachPartition() operations.
public class StringToDataSetExportFunction implements VoidFunction<Iterator<String>> {
public StringToDataSetExportFunction(URI outputDir, RecordReader recordReader, int batchSize,
boolean regression, int labelIndex, int numPossibleLabels);
public void call(Iterator<String> stringIterator) throws Exception;
}

The regression parameter is false for classification, true for regression.

import org.datavec.api.records.reader.impl.csv.CSVRecordReader;
import java.net.URI;
// Export processed CSV data to HDFS
JavaRDD<String> csvData = sc.textFile("hdfs://input/data.csv");
URI outputPath = new URI("hdfs://output/datasets/");
CSVRecordReader csvReader = new CSVRecordReader();
StringToDataSetExportFunction exporter = new StringToDataSetExportFunction(
outputPath,
csvReader,
100, // batchSize: 100 records per file
false, // classification mode
4, // labelIndex
10 // numPossibleLabels
);
// Export in parallel across partitions
csvData.foreachPartition(exporter);

import org.datavec.api.records.reader.impl.jackson.JacksonRecordReader;
// Export JSON data with custom batch sizes
JavaRDD<String> jsonData = // ... JSON string RDD
URI outputDirectory = new URI("s3://bucket/exported-datasets/");
JacksonRecordReader jsonReader = new JacksonRecordReader();
StringToDataSetExportFunction exporter = new StringToDataSetExportFunction(
outputDirectory,
jsonReader,
50, // smaller batches for complex JSON
true, // regression mode
0, // labelIndex
-1 // ignored for regression
);
jsonData.foreachPartition(exporter);

import java.io.File;
// Export to local file system
JavaRDD<String> textData = // ... text data RDD
File localOutputDir = new File("/tmp/exported-datasets");
URI localPath = localOutputDir.toURI();
CSVRecordReader reader = new CSVRecordReader();
StringToDataSetExportFunction exporter = new StringToDataSetExportFunction(
localPath,
reader,
200, // batchSize
false, // classification
-1, // infer label column
5 // numPossibleLabels
);
// Process each partition separately
textData.foreachPartition(exporter);

Generated files follow the pattern: dataset_{threadId}_{counter}.bin
// Complete data processing and export pipeline
JavaRDD<String> rawData = sc.textFile("hdfs://input/*");
// Transform and export in single pipeline
CSVRecordReader reader = new CSVRecordReader();
StringToDataSetExportFunction exporter = new StringToDataSetExportFunction(
new URI("hdfs://output/processed-datasets/"),
reader,
128, // optimal batch size for HDFS
false, // classification
0, // label column
10 // classes
);
// Process and export
rawData.foreachPartition(exporter);
// Verify export completion
FileSystem hdfs = FileSystem.get(new Configuration());
FileStatus[] exportedFiles = hdfs.listStatus(new Path("hdfs://output/processed-datasets/"));
System.out.println("Exported " + exportedFiles.length + " dataset files");

// Streaming export for real-time processing
JavaDStream<String> streamingData = // ... Spark Streaming source
StringToDataSetExportFunction exporter = new StringToDataSetExportFunction(
new URI("hdfs://stream-output/"),
new CSVRecordReader(),
64, // smaller batches for streaming
false,
1,
5
);
streamingData.foreachRDD(rdd -> {
if (!rdd.isEmpty()) {
rdd.foreachPartition(exporter);
}
});

// Monitor memory usage during batch processing
JavaRDD<DataSet> data = // ... your data
data.cache(); // Cache if reused
// Process in smaller batches if memory constrained
int conservativeBatchSize = 16;
RDDMiniBatches batcher = new RDDMiniBatches(conservativeBatchSize, data);
// Unpersist when no longer needed
data.unpersist();

try {
StringToDataSetExportFunction exporter = new StringToDataSetExportFunction(
outputPath, reader, batchSize, regression, labelIndex, numLabels
);
dataRDD.foreachPartition(exporter);
} catch (IOException e) {
logger.error("Failed to write DataSet files: " + e.getMessage());
} catch (IllegalArgumentException e) {
logger.error("Invalid export configuration: " + e.getMessage());
} catch (Exception e) {
logger.error("Export processing failed: " + e.getMessage());
}

Install with Tessl CLI
npx tessl i tessl/maven-org-deeplearning4j--dl4j-spark-2-11