# Batch Processing and Export

Batch processing and export utilities handle mini-batch creation from RDDs and dataset export functionality for production workflows. These tools optimize data processing performance and enable persistent storage of processed datasets.
## RDDMiniBatches

Handles mini-batch partitioning of DataSet RDDs for efficient processing in distributed training scenarios.
```java { .api }
public class RDDMiniBatches implements Serializable {
    public RDDMiniBatches(int miniBatches, JavaRDD<DataSet> toSplit);
    public JavaRDD<DataSet> miniBatchesJava();

    // Inner classes for batch processing
    public static class MiniBatchFunction extends BaseFlatMapFunctionAdaptee<Iterator<DataSet>, DataSet> {
        public MiniBatchFunction(int batchSize);
    }

    static class MiniBatchFunctionAdapter implements FlatMapFunctionAdapter<Iterator<DataSet>, DataSet> {
        public MiniBatchFunctionAdapter(int batchSize);
        public Iterable<DataSet> call(Iterator<DataSet> dataSetIterator) throws Exception;
    }
}
```
### Parameters

- **miniBatches**: Target number of examples per mini-batch (despite the name, this is a batch size, not a count of batches)
- **toSplit**: Input `JavaRDD<DataSet>` to be partitioned into mini-batches
### Usage Examples

#### Basic Mini-Batch Creation

```java
import org.deeplearning4j.spark.datavec.RDDMiniBatches;

// Create mini-batches from individual DataSet objects
JavaRDD<DataSet> individualDatasets = // ... RDD of single-example DataSets
int batchSize = 32;

RDDMiniBatches batcher = new RDDMiniBatches(batchSize, individualDatasets);
JavaRDD<DataSet> miniBatches = batcher.miniBatchesJava();

// Each DataSet in miniBatches now contains up to 32 examples
long batchCount = miniBatches.count();
```
#### Training Pipeline Integration

```java
// Complete training pipeline with mini-batching
JavaRDD<List<Writable>> rawData = // ... load raw data
DataVecDataSetFunction transformer = new DataVecDataSetFunction(4, 10, false);
JavaRDD<DataSet> datasets = rawData.map(transformer);

// Create mini-batches for efficient training
RDDMiniBatches batcher = new RDDMiniBatches(64, datasets);
JavaRDD<DataSet> trainingBatches = batcher.miniBatchesJava();

// Cache for multiple epochs
trainingBatches.cache();

// Use in training loop. Note: calling model.fit() inside foreach() would run on
// the executors and discard the parameter updates; use a distributed wrapper
// such as SparkDl4jMultiLayer, which fits from a JavaRDD<DataSet> directly.
for (int epoch = 0; epoch < numEpochs; epoch++) {
    sparkModel.fit(trainingBatches);
}
```
#### Dynamic Batch Sizing

```java
// Adjust batch size based on partition size
JavaRDD<DataSet> data = // ... your dataset RDD
long totalExamples = data.count();
int numPartitions = data.getNumPartitions();
int optimalBatchSize = (int) Math.max(1, totalExamples / (numPartitions * 4));

RDDMiniBatches batcher = new RDDMiniBatches(optimalBatchSize, data);
JavaRDD<DataSet> optimizedBatches = batcher.miniBatchesJava();
```
### Behavior Details

#### Batch Merging Process
1. Processes DataSets in groups according to partition boundaries
2. Accumulates individual DataSets until reaching target batch size
3. Merges accumulated DataSets using `DataSet.merge()`
4. Handles edge cases where remaining examples are fewer than batch size
5. Returns merged DataSets containing multiple examples
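The accumulate-and-merge steps above can be sketched without Spark, using plain iterators. This is a minimal sketch under simplifying assumptions: a "DataSet" is modeled as a `List<Integer>`, and the hypothetical `mergeExamples` helper stands in for `DataSet.merge()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class MiniBatchSketch {
    // Hypothetical stand-in for DataSet.merge(): concatenates single-example "DataSets".
    static List<Integer> mergeExamples(List<List<Integer>> examples) {
        List<Integer> merged = new ArrayList<>();
        for (List<Integer> e : examples) merged.addAll(e);
        return merged;
    }

    // Accumulate examples until batchSize is reached, then merge into one batch.
    static List<List<Integer>> miniBatches(Iterator<List<Integer>> it, int batchSize) {
        List<List<Integer>> out = new ArrayList<>();
        List<List<Integer>> buffer = new ArrayList<>();
        while (it.hasNext()) {
            buffer.add(it.next());
            if (buffer.size() == batchSize) {
                out.add(mergeExamples(buffer));
                buffer.clear();
            }
        }
        // Edge case: leftover examples form a final, smaller batch
        if (!buffer.isEmpty()) out.add(mergeExamples(buffer));
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> singles = new ArrayList<>();
        for (int i = 0; i < 10; i++) singles.add(Arrays.asList(i));
        List<List<Integer>> batches = miniBatches(singles.iterator(), 4);
        System.out.println(batches.size()); // 3 batches: 4 + 4 + 2 examples
        System.out.println(batches.get(2)); // [8, 9]
    }
}
```

In the real class this logic runs once per partition, which is why a partition's final batch may be smaller than the target size.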
#### Memory Considerations
- Creates copies of DataSets during the merging process
- Edge-case handling: if more than one example remains after the last full batch, they are merged into a final, smaller batch
- Batching runs independently within each partition (a map-partitions style operation), so no cross-partition coordination is required
## StringToDataSetExportFunction

Exports DataSet objects to persistent storage using RecordReader conversion, designed for use with `foreachPartition()` operations.

```java { .api }
public class StringToDataSetExportFunction implements VoidFunction<Iterator<String>> {
    public StringToDataSetExportFunction(URI outputDir, RecordReader recordReader, int batchSize,
                                         boolean regression, int labelIndex, int numPossibleLabels);

    public void call(Iterator<String> stringIterator) throws Exception;
}
```
### Parameters

- **outputDir**: URI destination directory for exported DataSet files
- **recordReader**: RecordReader implementation for parsing input strings
- **batchSize**: Number of records to include in each exported DataSet file
- **regression**: `false` for classification, `true` for regression
- **labelIndex**: Column index containing labels
- **numPossibleLabels**: Number of classes for classification (ignored for regression)
### Usage Examples

#### CSV Export to HDFS

```java
import org.datavec.api.records.reader.impl.csv.CSVRecordReader;
import java.net.URI;

// Export processed CSV data to HDFS
JavaRDD<String> csvData = sc.textFile("hdfs://input/data.csv");
URI outputPath = new URI("hdfs://output/datasets/");

CSVRecordReader csvReader = new CSVRecordReader();
StringToDataSetExportFunction exporter = new StringToDataSetExportFunction(
    outputPath,
    csvReader,
    100,   // batchSize: 100 records per file
    false, // classification mode
    4,     // labelIndex
    10     // numPossibleLabels
);

// Export in parallel across partitions
csvData.foreachPartition(exporter);
```
#### Batch Export with Custom RecordReader

```java
import org.datavec.api.records.reader.impl.jackson.JacksonRecordReader;

// Export JSON data with custom batch sizes
JavaRDD<String> jsonData = // ... JSON string RDD
URI outputDirectory = new URI("s3://bucket/exported-datasets/");

// JacksonRecordReader has no no-argument constructor; it must be configured
// with a field selection and JSON mapper matching your schema
JacksonRecordReader jsonReader = // ... configured JacksonRecordReader
StringToDataSetExportFunction exporter = new StringToDataSetExportFunction(
    outputDirectory,
    jsonReader,
    50,   // smaller batches for complex JSON
    true, // regression mode
    0,    // labelIndex
    -1    // ignored for regression
);

jsonData.foreachPartition(exporter);
```
#### Local File System Export

```java
import java.io.File;

// Export to local file system
JavaRDD<String> textData = // ... text data RDD
File localOutputDir = new File("/tmp/exported-datasets");
URI localPath = localOutputDir.toURI();

CSVRecordReader reader = new CSVRecordReader();
StringToDataSetExportFunction exporter = new StringToDataSetExportFunction(
    localPath,
    reader,
    200,   // batchSize
    false, // classification
    4,     // labelIndex (an explicit column index is required)
    5      // numPossibleLabels
);

// Process each partition separately
textData.foreachPartition(exporter);
```
### Behavior Details

#### Export Process Flow
1. Processes input strings in batches according to batchSize
2. Uses RecordReader to parse each string into List<Writable>
3. Accumulates parsed records until batch is full or iterator is exhausted
4. Creates RecordReaderDataSetIterator for batch conversion
5. Converts batch to DataSet using RecordReaderDataSetIterator
6. Generates unique filename using thread ID and output counter
7. Saves DataSet to persistent storage using Hadoop FileSystem API
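The flow above can be sketched in plain Java, without Spark or Hadoop. This is an illustrative sketch only: `parseRecord` is a hypothetical stand-in for a RecordReader, the comma-split CSV format is assumed, and batches are written as text files to a local directory rather than serialized DataSets on a Hadoop FileSystem.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;

public class ExportSketch {
    // Hypothetical stand-in for a RecordReader: CSV line -> list of fields
    static List<String> parseRecord(String line) {
        return Arrays.asList(line.split(","));
    }

    // Batch the input strings, "convert" each batch, and save one file per batch.
    // Returns the number of files written.
    static int export(Iterator<String> lines, int batchSize, Path outputDir) throws IOException {
        Files.createDirectories(outputDir);
        String threadId = UUID.randomUUID() + "_" + Thread.currentThread().getId();
        int counter = 0;
        List<List<String>> batch = new ArrayList<>();
        while (lines.hasNext()) {
            batch.add(parseRecord(lines.next()));
            boolean full = batch.size() == batchSize;
            if (full || !lines.hasNext()) { // batch full, or iterator exhausted
                Path file = outputDir.resolve("dataset_" + threadId + "_" + counter++ + ".bin");
                Files.write(file, batch.toString().getBytes()); // stand-in for DataSet.save()
                batch.clear();
            }
        }
        return counter;
    }

    public static void main(String[] args) throws IOException {
        List<String> lines = Arrays.asList("1,2,a", "3,4,b", "5,6,a", "7,8,b", "9,0,a");
        int files = export(lines.iterator(), 2, Paths.get("/tmp/export-sketch"));
        System.out.println(files); // 3 files: batches of 2 + 2 + 1 records
    }
}
```

The real function follows the same accumulate-then-flush shape, but converts each batch through a RecordReaderDataSetIterator and writes via the Hadoop FileSystem API.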
#### File Naming Convention
Generated files follow the pattern: `dataset_{threadId}_{counter}.bin`
- **threadId**: Unique identifier based on JVM UID and thread ID
- **counter**: Sequential number for files created by same thread
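The naming scheme can be illustrated with a small helper. The UID source and exact field order here are assumptions for illustration; the real implementation generates its own JVM-unique identifier.

```java
import java.util.UUID;

public class DatasetFileName {
    // Hypothetical assembly of the dataset_{threadId}_{counter}.bin pattern,
    // where {threadId} combines a per-JVM UID with the thread's ID
    static String fileName(String jvmUid, long threadId, int counter) {
        return String.format("dataset_%s_%d_%d.bin", jvmUid, threadId, counter);
    }

    public static void main(String[] args) {
        String uid = UUID.randomUUID().toString();
        System.out.println(fileName(uid, Thread.currentThread().getId(), 0));
    }
}
```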
#### Storage Compatibility
- Supports HDFS, S3, local file systems via Hadoop FileSystem API
- Uses DataSet.save() method for binary serialization
- Creates FSDataOutputStream for efficient writing
### Integration Patterns

#### Production Export Pipeline

```java
// Complete data processing and export pipeline
JavaRDD<String> rawData = sc.textFile("hdfs://input/*");

// Transform and export in single pipeline
CSVRecordReader reader = new CSVRecordReader();
StringToDataSetExportFunction exporter = new StringToDataSetExportFunction(
    new URI("hdfs://output/processed-datasets/"),
    reader,
    128,   // batch size
    false, // classification
    0,     // label column
    10     // classes
);

// Process and export
rawData.foreachPartition(exporter);

// Verify export completion (resolve the HDFS FileSystem explicitly rather
// than relying on the default FileSystem configuration)
FileSystem hdfs = FileSystem.get(new URI("hdfs://output/"), new Configuration());
FileStatus[] exportedFiles = hdfs.listStatus(new Path("hdfs://output/processed-datasets/"));
System.out.println("Exported " + exportedFiles.length + " dataset files");
```
#### Stream Processing Export

```java
// Streaming export for real-time processing
JavaDStream<String> streamingData = // ... Spark Streaming source

StringToDataSetExportFunction exporter = new StringToDataSetExportFunction(
    new URI("hdfs://stream-output/"),
    new CSVRecordReader(),
    64, // smaller batches for streaming
    false,
    1,
    5
);

streamingData.foreachRDD(rdd -> {
    if (!rdd.isEmpty()) {
        rdd.foreachPartition(exporter);
    }
});
```
## Performance Considerations

### Mini-Batch Optimization
- **Batch Size**: Balance between memory usage and computational efficiency
- **Partition Count**: Consider Spark cluster resources when sizing batches
- **Caching**: Cache mini-batched RDDs when used multiple times

### Export Optimization
- **Batch Size**: Larger batches reduce file count but increase memory usage
- **Partitioning**: More partitions enable parallel export but create more files
- **Storage Format**: Binary DataSet format optimized for fast loading
### Memory Management

```java
// Monitor memory usage during batch processing
JavaRDD<DataSet> data = // ... your data
data.cache(); // Cache if reused

// Process in smaller batches if memory constrained
int conservativeBatchSize = 16;
RDDMiniBatches batcher = new RDDMiniBatches(conservativeBatchSize, data);

// Unpersist when no longer needed
data.unpersist();
```
## Error Handling

```java
try {
    StringToDataSetExportFunction exporter = new StringToDataSetExportFunction(
        outputPath, reader, batchSize, regression, labelIndex, numLabels
    );

    dataRDD.foreachPartition(exporter);

} catch (IllegalArgumentException e) {
    logger.error("Invalid export configuration: " + e.getMessage(), e);
} catch (Exception e) {
    // Executor-side failures (including IO errors while writing) surface on
    // the driver wrapped in Spark's runtime exceptions, so catch broadly here;
    // catching IOException directly would not compile, since foreachPartition
    // does not declare it
    logger.error("Export processing failed: " + e.getMessage(), e);
}
```