
tessl/maven-org-deeplearning4j--dl4j-spark-2-11

DeepLearning4j Spark integration component providing DataVec transformations and DataSet operations for distributed deep learning in Apache Spark environments


Batch Processing and Export

Batch processing and export utilities handle mini-batch creation from RDDs and dataset export functionality for production workflows. These tools optimize data processing performance and enable persistent storage of processed datasets.

RDDMiniBatches

Handles mini-batch partitioning of DataSet RDDs for efficient processing in distributed training scenarios.

public class RDDMiniBatches implements Serializable {
    public RDDMiniBatches(int miniBatches, JavaRDD<DataSet> toSplit);
    public JavaRDD<DataSet> miniBatchesJava();
    
    // Inner classes for batch processing
    public static class MiniBatchFunction extends BaseFlatMapFunctionAdaptee<Iterator<DataSet>, DataSet> {
        public MiniBatchFunction(int batchSize);
    }
    
    static class MiniBatchFunctionAdapter implements FlatMapFunctionAdapter<Iterator<DataSet>, DataSet> {
        public MiniBatchFunctionAdapter(int batchSize);
        public Iterable<DataSet> call(Iterator<DataSet> dataSetIterator) throws Exception;
    }
}

Parameters

  • miniBatches: Target number of examples per mini-batch
  • toSplit: Input JavaRDD<DataSet> to be partitioned into mini-batches

Usage Examples

Basic Mini-Batch Creation

import org.deeplearning4j.spark.datavec.RDDMiniBatches;

// Create mini-batches from individual DataSet objects
JavaRDD<DataSet> individualDatasets = // ... RDD of single-example DataSets
int batchSize = 32;

RDDMiniBatches batcher = new RDDMiniBatches(batchSize, individualDatasets);
JavaRDD<DataSet> miniBatches = batcher.miniBatchesJava();

// Each DataSet in miniBatches now contains up to 32 examples
long batchCount = miniBatches.count();

Training Pipeline Integration

// Complete training pipeline with mini-batching
JavaRDD<List<Writable>> rawData = // ... load raw data
// DataVecDataSetFunction(labelIndex, numPossibleLabels, regression)
DataVecDataSetFunction transformer = new DataVecDataSetFunction(4, 10, false);
JavaRDD<DataSet> datasets = rawData.map(transformer);

// Create mini-batches for efficient training
RDDMiniBatches batcher = new RDDMiniBatches(64, datasets);
JavaRDD<DataSet> trainingBatches = batcher.miniBatchesJava();

// Cache for multiple epochs
trainingBatches.cache();

// Use in a training loop (illustrative only: for actual distributed training,
// DL4J uses SparkDl4jMultiLayer.fit(JavaRDD<DataSet>) rather than calling
// model.fit() inside a foreach, which would train independent copies on executors)
for (int epoch = 0; epoch < numEpochs; epoch++) {
    trainingBatches.foreach(batch -> {
        model.fit(batch);
    });
}

Dynamic Batch Sizing

// Adjust batch size based on partition size
JavaRDD<DataSet> data = // ... your dataset RDD
long totalExamples = data.count();
int numPartitions = data.getNumPartitions();
int optimalBatchSize = (int) Math.max(1, totalExamples / (numPartitions * 4));

RDDMiniBatches batcher = new RDDMiniBatches(optimalBatchSize, data);
JavaRDD<DataSet> optimizedBatches = batcher.miniBatchesJava();

Behavior Details

Batch Merging Process

  1. Processes DataSets in groups according to partition boundaries
  2. Accumulates individual DataSets until reaching target batch size
  3. Merges accumulated DataSets using DataSet.merge()
  4. Handles edge cases where remaining examples are fewer than batch size
  5. Returns merged DataSets containing multiple examples
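The accumulate-and-merge loop above can be sketched in plain Java. This is a hypothetical stand-in: the real implementation merges `org.nd4j.linalg.dataset.DataSet` objects via `DataSet.merge()`, while here an "example" is just a generic value and a "merged batch" is a list.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class MiniBatchSketch {
    // Stand-in for the per-partition batching logic of RDDMiniBatches
    static <T> List<List<T>> batch(Iterator<T> examples, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        List<T> current = new ArrayList<>(batchSize);
        while (examples.hasNext()) {
            current.add(examples.next());          // accumulate until full
            if (current.size() == batchSize) {
                batches.add(current);              // "merge" the full batch
                current = new ArrayList<>(batchSize);
            }
        }
        if (!current.isEmpty()) {
            batches.add(current);                  // smaller final batch
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> data = List.of(1, 2, 3, 4, 5, 6, 7);
        List<List<Integer>> out = batch(data.iterator(), 3);
        System.out.println(out); // prints [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```

Because the loop only sees one partition's iterator, the final smaller batch can occur once per partition, not just once per RDD.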

Memory Considerations

  • Creates copies of DataSets during the merging process
  • Edge case: examples left over after the last full batch are merged into a smaller final batch
  • Runs as a flat-map over each partition's iterator, so batches never span partition boundaries

StringToDataSetExportFunction

Exports DataSet objects to persistent storage using RecordReader conversion, designed for use with forEachPartition() operations.

public class StringToDataSetExportFunction implements VoidFunction<Iterator<String>> {
    public StringToDataSetExportFunction(URI outputDir, RecordReader recordReader, int batchSize, 
                                         boolean regression, int labelIndex, int numPossibleLabels);
                                         
    public void call(Iterator<String> stringIterator) throws Exception;
}

Parameters

  • outputDir: URI destination directory for exported DataSet files
  • recordReader: RecordReader implementation for parsing input strings
  • batchSize: Number of records to include in each exported DataSet file
  • regression: false for classification, true for regression
  • labelIndex: Column index containing labels
  • numPossibleLabels: Number of classes for classification

Usage Examples

CSV Export to HDFS

import org.datavec.api.records.reader.impl.csv.CSVRecordReader;
import java.net.URI;

// Export processed CSV data to HDFS
JavaRDD<String> csvData = sc.textFile("hdfs://input/data.csv");
URI outputPath = new URI("hdfs://output/datasets/");

CSVRecordReader csvReader = new CSVRecordReader();
StringToDataSetExportFunction exporter = new StringToDataSetExportFunction(
    outputPath,
    csvReader,
    100,   // batchSize: 100 records per file
    false, // classification mode
    4,     // labelIndex
    10     // numPossibleLabels
);

// Export in parallel across partitions
csvData.foreachPartition(exporter);

Batch Export with Custom RecordReader

import org.datavec.api.records.reader.impl.jackson.FieldSelection;
import org.datavec.api.records.reader.impl.jackson.JacksonRecordReader;

// Export JSON data with custom batch sizes
JavaRDD<String> jsonData = // ... JSON string RDD
URI outputDirectory = new URI("s3://bucket/exported-datasets/");

// JacksonRecordReader has no no-arg constructor: it requires a FieldSelection
// describing which JSON fields to extract, plus a Jackson ObjectMapper
// (field names below are illustrative)
FieldSelection fields = new FieldSelection.Builder()
    .addField("target")
    .addField("feature1")
    .addField("feature2")
    .build();
JacksonRecordReader jsonReader = new JacksonRecordReader(fields, new ObjectMapper(new JsonFactory()));
StringToDataSetExportFunction exporter = new StringToDataSetExportFunction(
    outputDirectory,
    jsonReader,
    50,    // smaller batches for complex JSON
    true,  // regression mode
    0,     // labelIndex
    -1     // ignored for regression
);

jsonData.foreachPartition(exporter);

Local File System Export

import java.io.File;

// Export to local file system
JavaRDD<String> textData = // ... text data RDD
File localOutputDir = new File("/tmp/exported-datasets");
URI localPath = localOutputDir.toURI();

CSVRecordReader reader = new CSVRecordReader();
StringToDataSetExportFunction exporter = new StringToDataSetExportFunction(
    localPath,
    reader,
    200,   // batchSize
    false, // classification
    4,     // labelIndex (must be set explicitly; there is no auto-inference)
    5      // numPossibleLabels
);

// Process each partition separately
textData.foreachPartition(exporter);

Behavior Details

Export Process Flow

  1. Processes input strings in batches according to batchSize
  2. Uses RecordReader to parse each string into List<Writable>
  3. Accumulates parsed records until batch is full or iterator is exhausted
  4. Creates RecordReaderDataSetIterator for batch conversion
  5. Converts batch to DataSet using RecordReaderDataSetIterator
  6. Generates unique filename using thread ID and output counter
  7. Saves DataSet to persistent storage using Hadoop FileSystem API
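The per-partition loop above can be sketched with stdlib-only code. This is a hypothetical stand-in: the real function parses each line with the configured RecordReader and saves an ND4J DataSet through the Hadoop FileSystem API, while here a "DataSet" is simply the batch of raw lines written to a local file.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ExportSketch {
    // Stand-in for StringToDataSetExportFunction.call(): accumulate lines into
    // batches and write one file per batch, named dataset_{uid}_{counter}.bin
    // as in the documented naming convention. Returns the number of files written.
    static int export(Iterator<String> lines, Path outputDir, String uid, int batchSize)
            throws IOException {
        Files.createDirectories(outputDir);
        List<String> batch = new ArrayList<>(batchSize);
        int counter = 0;
        while (lines.hasNext()) {
            batch.add(lines.next());                         // steps 1-3: accumulate
            if (batch.size() == batchSize || !lines.hasNext()) {
                String name = "dataset_" + uid + "_" + (counter++) + ".bin";
                Files.write(outputDir.resolve(name), batch); // steps 6-7: name + save
                batch.clear();
            }
        }
        return counter;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("export-sketch");
        Iterator<String> lines = List.of("a,1", "b,2", "c,3", "d,4", "e,5").iterator();
        int files = export(lines, dir, "jvm0-thread1", 2);
        System.out.println(files); // prints 3: two full batches plus a final batch of one
    }
}
```

The counter resets per function instance, which is why the uid component (JVM UID plus thread ID in the real code) is needed to keep concurrent partitions from overwriting each other's files.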

File Naming Convention

Generated files follow the pattern: dataset_{threadId}_{counter}.bin

  • threadId: Unique identifier based on JVM UID and thread ID
  • counter: Sequential number for files created by same thread

Storage Compatibility

  • Supports HDFS, S3, local file systems via Hadoop FileSystem API
  • Uses DataSet.save() method for binary serialization
  • Creates FSDataOutputStream for efficient writing

Integration Patterns

Production Export Pipeline

// Complete data processing and export pipeline
JavaRDD<String> rawData = sc.textFile("hdfs://input/*");

// Transform and export in single pipeline
CSVRecordReader reader = new CSVRecordReader();
StringToDataSetExportFunction exporter = new StringToDataSetExportFunction(
    new URI("hdfs://output/processed-datasets/"),
    reader,
    128,   // optimal batch size for HDFS
    false, // classification
    0,     // label column
    10     // classes
);

// Process and export
rawData.foreachPartition(exporter);

// Verify export completion (obtain the FileSystem for the target URI,
// otherwise FileSystem.get(conf) returns the default filesystem)
FileSystem hdfs = FileSystem.get(new URI("hdfs://output/processed-datasets/"), new Configuration());
FileStatus[] exportedFiles = hdfs.listStatus(new Path("hdfs://output/processed-datasets/"));
System.out.println("Exported " + exportedFiles.length + " dataset files");

Stream Processing Export

// Streaming export for real-time processing
JavaDStream<String> streamingData = // ... Spark Streaming source

StringToDataSetExportFunction exporter = new StringToDataSetExportFunction(
    new URI("hdfs://stream-output/"),
    new CSVRecordReader(),
    64,    // smaller batches for streaming
    false,
    1,
    5
);

streamingData.foreachRDD(rdd -> {
    if (!rdd.isEmpty()) {
        rdd.foreachPartition(exporter);
    }
});

Performance Considerations

Mini-Batch Optimization

  • Batch Size: Balance between memory usage and computational efficiency
  • Partition Count: Consider Spark cluster resources when sizing batches
  • Caching: Cache mini-batched RDDs when used multiple times

Export Optimization

  • Batch Size: Larger batches reduce file count but increase memory usage
  • Partitioning: More partitions enable parallel export but create more files
  • Storage Format: Binary DataSet format optimized for fast loading
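The batch-size/file-count tradeoff above can be estimated with simple arithmetic. The helper below is hypothetical and assumes records are spread evenly across partitions; each partition writes roughly ceil(recordsPerPartition / batchSize) files.

```java
public class ExportEstimate {
    // Total files grow with partition count and shrink with batch size
    static long expectedFiles(long totalRecords, int numPartitions, int batchSize) {
        long perPartition = (totalRecords + numPartitions - 1) / numPartitions; // ceil
        long filesPerPartition = (perPartition + batchSize - 1) / batchSize;    // ceil
        return filesPerPartition * numPartitions;
    }

    public static void main(String[] args) {
        // 1,000,000 records over 8 partitions = 125,000 records per partition
        System.out.println(expectedFiles(1_000_000, 8, 128));  // prints 7816
        System.out.println(expectedFiles(1_000_000, 8, 1024)); // prints 984
    }
}
```

Increasing the batch size from 128 to 1024 cuts the file count roughly eightfold, at the cost of holding eight times as many records in memory per batch during export.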

Memory Management

// Monitor memory usage during batch processing
JavaRDD<DataSet> data = // ... your data
data.cache(); // Cache if reused

// Process in smaller batches if memory constrained
int conservativeBatchSize = 16;
RDDMiniBatches batcher = new RDDMiniBatches(conservativeBatchSize, data);

// Unpersist when no longer needed
data.unpersist();

Error Handling

try {
    StringToDataSetExportFunction exporter = new StringToDataSetExportFunction(
        outputPath, reader, batchSize, regression, labelIndex, numLabels
    );
    
    dataRDD.foreachPartition(exporter);
    
} catch (SparkException e) {
    // Exceptions thrown on executors (e.g. IOExceptions while writing files)
    // surface on the driver wrapped in a SparkException
    logger.error("Export processing failed: " + e.getMessage(), e);
} catch (IllegalArgumentException e) {
    logger.error("Invalid export configuration: " + e.getMessage(), e);
}

Install with Tessl CLI

npx tessl i tessl/maven-org-deeplearning4j--dl4j-spark-2-11
