# Spark Integration

Distributed data processing functions for Apache Spark, enabling large-scale data processing and training across clusters. These functions provide seamless integration between DataVec record processing and DeepLearning4j's distributed training capabilities.
## Capabilities

### DataVecDataSetFunction

Spark function for converting collections of Writables to DataSet objects in distributed environments.

```java { .api }
public class DataVecDataSetFunction implements Function<List<Writable>, DataSet>, Serializable {
    // Main constructors
    public DataVecDataSetFunction(int labelIndex, int numPossibleLabels, boolean regression);
    public DataVecDataSetFunction(int labelIndex, int numPossibleLabels, boolean regression,
                                  DataSetPreProcessor preProcessor, WritableConverter converter);
    public DataVecDataSetFunction(int labelIndexFrom, int labelIndexTo, int numPossibleLabels,
                                  boolean regression, DataSetPreProcessor preProcessor,
                                  WritableConverter converter);

    // Spark function method
    public DataSet call(List<Writable> currList) throws Exception;
}
```
### DataVecSequenceDataSetFunction

Spark function for converting sequence data (collections of collections of Writables) to DataSet objects.

```java { .api }
public class DataVecSequenceDataSetFunction implements Function<List<List<Writable>>, DataSet>, Serializable {
    public DataVecSequenceDataSetFunction(int labelIndex, int numPossibleLabels, boolean regression);
    public DataVecSequenceDataSetFunction(int labelIndex, int numPossibleLabels, boolean regression,
                                          DataSetPreProcessor preProcessor, WritableConverter converter);

    public DataSet call(List<List<Writable>> input) throws Exception;
}
```
### DataVecSequencePairDataSetFunction

Spark function for converting pairs of sequence data (separate feature and label sequences) to DataSet objects.

```java { .api }
public class DataVecSequencePairDataSetFunction
        implements Function<Tuple2<List<List<Writable>>, List<List<Writable>>>, DataSet>, Serializable {

    public DataVecSequencePairDataSetFunction();
    public DataVecSequencePairDataSetFunction(int numPossibleLabels, boolean regression);
    public DataVecSequencePairDataSetFunction(int numPossibleLabels, boolean regression,
                                              AlignmentMode alignmentMode);
    public DataVecSequencePairDataSetFunction(int numPossibleLabels, boolean regression,
                                              AlignmentMode alignmentMode,
                                              DataSetPreProcessor preProcessor,
                                              WritableConverter converter);

    public DataSet call(Tuple2<List<List<Writable>>, List<List<Writable>>> input) throws Exception;
}
```
### DataVecByteDataSetFunction

Spark function for converting byte data to DataSet objects, useful for image and binary data processing.

```java { .api }
public class DataVecByteDataSetFunction
        implements PairFunction<Tuple2<Text, BytesWritable>, Double, DataSet> {

    public DataVecByteDataSetFunction(int labelIndex, int numPossibleLabels,
                                      int batchSize, int byteFileLen);
    public DataVecByteDataSetFunction(int labelIndex, int numPossibleLabels,
                                      int batchSize, int byteFileLen, boolean regression);
    public DataVecByteDataSetFunction(int labelIndex, int numPossibleLabels,
                                      int batchSize, int byteFileLen, boolean regression,
                                      DataSetPreProcessor preProcessor);

    public Tuple2<Double, DataSet> call(Tuple2<Text, BytesWritable> inputTuple) throws Exception;
}
```
### RecordReaderFunction

Spark function for converting strings to DataSet objects using a RecordReader.

```java { .api }
public class RecordReaderFunction implements Function<String, DataSet> {
    public RecordReaderFunction(RecordReader recordReader, int labelIndex,
                                int numPossibleLabels, WritableConverter converter);
    public RecordReaderFunction(RecordReader recordReader, int labelIndex, int numPossibleLabels);

    public DataSet call(String v1) throws Exception;
}
```
### RDDMiniBatches

Utility for creating mini-batches from an `RDD<DataSet>` for distributed training.

```java { .api }
public class RDDMiniBatches implements Serializable {
    public RDDMiniBatches(int miniBatches, JavaRDD<DataSet> toSplit);

    public JavaRDD<DataSet> miniBatchesJava();

    // Inner classes for batch processing
    public static class MiniBatchFunction extends BaseFlatMapFunctionAdaptee<Iterator<DataSet>, DataSet> {
        public MiniBatchFunction(int batchSize);
    }

    public static class MiniBatchFunctionAdapter implements FlatMapFunctionAdapter<Iterator<DataSet>, DataSet> {
        public MiniBatchFunctionAdapter(int batchSize);
        public Iterable<DataSet> call(Iterator<DataSet> dataSetIterator) throws Exception;
    }
}
```
### StringToDataSetExportFunction

Export function for converting strings to DataSet objects and writing them to files.

```java { .api }
public class StringToDataSetExportFunction implements VoidFunction<Iterator<String>> {
    public StringToDataSetExportFunction(URI outputDir, RecordReader recordReader,
                                         int batchSize, boolean regression,
                                         int labelIndex, int numPossibleLabels);

    public void call(Iterator<String> stringIterator) throws Exception;
}
```
## Usage Examples

### Basic Spark DataSet Processing

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.datavec.api.writable.DoubleWritable;
import org.datavec.api.writable.Writable;
import org.deeplearning4j.spark.datavec.DataVecDataSetFunction;
import org.nd4j.linalg.dataset.DataSet;

// Set up the Spark context
JavaSparkContext sc = new JavaSparkContext();

// Load data as RDD<List<Writable>>
JavaRDD<List<Writable>> rawData = sc.textFile("hdfs:///data.csv")
    .map(line -> {
        // Parse a CSV line into a List<Writable>
        String[] values = line.split(",");
        List<Writable> writables = new ArrayList<>();
        for (String value : values) {
            writables.add(new DoubleWritable(Double.parseDouble(value)));
        }
        return writables;
    });

// Convert to an RDD of DataSet objects
DataVecDataSetFunction dataSetFunction = new DataVecDataSetFunction(
    4,     // labelIndex (column index 4)
    3,     // numPossibleLabels (3 classes)
    false  // regression = false (classification)
);

JavaRDD<DataSet> dataSetRDD = rawData.map(dataSetFunction);

// Use for distributed training; each DataSet here holds a single example
dataSetRDD.foreach(dataSet -> {
    System.out.println("Processing DataSet with " + dataSet.numExamples() + " example(s)");
});
```
### Sequence Processing with Spark

```java
import org.deeplearning4j.spark.datavec.DataVecSequenceDataSetFunction;

// RDD of sequence data (each element is a List<List<Writable>>).
// wholeTextFiles is used so each file arrives intact; textFile would split files into lines.
JavaRDD<List<List<Writable>>> sequenceData = sc.wholeTextFiles("hdfs:///sequences/")
    .map(pathAndContent -> {
        // Parse one sequence file into a List<List<Writable>>;
        // each line represents one time step
        List<List<Writable>> sequence = new ArrayList<>();
        for (String line : pathAndContent._2().split("\n")) {
            sequence.add(parseTimeStep(line)); // parseTimeStep: user-supplied parsing helper
        }
        return sequence;
    });

// Convert to an RDD of sequence DataSet objects
DataVecSequenceDataSetFunction sequenceFunction =
    new DataVecSequenceDataSetFunction(
        5,     // labelIndex (column index 5 in each time step)
        10,    // numPossibleLabels (10 classes)
        false  // regression = false
    );

JavaRDD<DataSet> sequenceDataSetRDD = sequenceData.map(sequenceFunction);
```
200
201
### Separate Feature and Label Sequences
202
203
```java
204
import org.deeplearning4j.spark.datavec.DataVecSequencePairDataSetFunction;
205
import scala.Tuple2;
206
207
// RDD of paired sequences (features, labels)
208
JavaRDD<Tuple2<List<List<Writable>>, List<List<Writable>>>> pairedSequences =
209
featureSequenceRDD.zip(labelSequenceRDD);
210
211
// Convert paired sequences to DataSet
212
DataVecSequencePairDataSetFunction pairFunction =
213
new DataVecSequencePairDataSetFunction(
214
5, // numPossibleLabels
215
false, // regression = false
216
AlignmentMode.ALIGN_START // alignment mode
217
);
218
219
JavaRDD<DataSet> pairedDataSetRDD = pairedSequences.map(pairFunction);
220
```
221
222
### Binary/Image Data Processing
223
224
```java
225
import org.apache.hadoop.io.BytesWritable;
226
import org.apache.hadoop.io.Text;
227
import org.deeplearning4j.spark.datavec.DataVecByteDataSetFunction;
228
229
// RDD of binary files (e.g., images with labels in filename)
230
JavaPairRDD<Text, BytesWritable> binaryFiles = sc.sequenceFile("hdfs://images/",
231
Text.class,
232
BytesWritable.class);
233
234
// Convert binary data to DataSet
235
DataVecByteDataSetFunction byteFunction = new DataVecByteDataSetFunction(
236
0, // labelIndex (extract from filename)
237
10, // numPossibleLabels (10 image classes)
238
32, // batchSize
239
28*28*3 // byteFileLen (image size)
240
);
241
242
JavaRDD<Tuple2<Double, DataSet>> imageDataSets = binaryFiles.map(byteFunction);
243
```
### Mini-Batch Creation

```java
import org.deeplearning4j.spark.datavec.RDDMiniBatches;

// Create mini-batches from individual DataSet objects
JavaRDD<DataSet> individualDataSets = // ... RDD of single-example DataSets

RDDMiniBatches miniBatcher = new RDDMiniBatches(64, individualDataSets); // 64 examples per batch
JavaRDD<DataSet> miniBatchRDD = miniBatcher.miniBatchesJava();

// Each element in miniBatchRDD now contains up to 64 examples
miniBatchRDD.foreach(batch -> {
    System.out.println("Mini-batch size: " + batch.numExamples());
});
```
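The per-partition batching that `RDDMiniBatches` performs can be illustrated with a plain-Java sketch, with no Spark dependency; here `DataSet` is replaced by a generic element type, and `batchIterator` is a hypothetical helper name, not part of the DL4J API:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class BatchSketch {
    // Group an iterator's elements into fixed-size batches (the last batch may be smaller)
    static <T> List<List<T>> batchIterator(Iterator<T> it, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        List<T> current = new ArrayList<>(batchSize);
        while (it.hasNext()) {
            current.add(it.next());
            if (current.size() == batchSize) {
                batches.add(current);
                current = new ArrayList<>(batchSize);
            }
        }
        if (!current.isEmpty()) {
            batches.add(current); // keep the short trailing batch
        }
        return batches;
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = batchIterator(List.of(1, 2, 3, 4, 5, 6, 7).iterator(), 3);
        System.out.println(batches.size());        // 3 batches: [1,2,3], [4,5,6], [7]
        System.out.println(batches.get(2).size()); // trailing batch holds 1 element
    }
}
```

In the real utility the same grouping happens inside each partition, which is why batch boundaries never cross partitions and trailing partial batches can occur per partition.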
### RecordReader with Spark

```java
import org.datavec.api.records.reader.RecordReader;
import org.datavec.api.records.reader.impl.csv.CSVRecordReader;
import org.deeplearning4j.spark.datavec.RecordReaderFunction;

// Set up the RecordReader (must be serializable)
RecordReader csvReader = new CSVRecordReader();

// Create a function that processes strings with the RecordReader
RecordReaderFunction readerFunction = new RecordReaderFunction(
    csvReader, // recordReader
    4,         // labelIndex
    3          // numPossibleLabels
);

// Apply to an RDD of strings (e.g., CSV lines)
JavaRDD<String> csvLines = sc.textFile("hdfs:///data.csv");
JavaRDD<DataSet> processedData = csvLines.map(readerFunction);
```
### Data Export

```java
import java.net.URI;

import org.deeplearning4j.spark.datavec.export.StringToDataSetExportFunction;

// Export processed data to files
URI outputPath = new URI("hdfs:///output/datasets/");
RecordReader exportReader = new CSVRecordReader();

StringToDataSetExportFunction exportFunction = new StringToDataSetExportFunction(
    outputPath,   // outputDir
    exportReader, // recordReader
    100,          // batchSize
    false,        // regression
    4,            // labelIndex
    3             // numPossibleLabels
);

// Apply the export function to each partition
csvLines.foreachPartition(exportFunction);
```
## Performance Considerations

### Serialization
All Spark functions here implement Serializable and can be shipped to cluster nodes. The objects they hold must be serializable too:
- RecordReaders must be serializable
- DataSetPreProcessors must be serializable
- Custom WritableConverters must be serializable
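Because anything captured by a Spark function travels through Java serialization, a quick local round-trip check (plain JDK, no Spark required) can surface `NotSerializableException` before a job is submitted. `roundTrip` is an illustrative helper, not part of the DL4J API; in real usage you would pass it a configured DataVec function instead of the stand-in string:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationCheck {
    // Serialize then deserialize an object, throwing if either step fails
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj); // fails here if any field is non-serializable
        }
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (T) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for e.g. a DataVecDataSetFunction or custom WritableConverter
        String probe = roundTrip("serializable-probe");
        System.out.println(probe); // prints "serializable-probe"
    }
}
```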
### Memory Management
- Configure appropriate batch sizes to avoid out-of-memory errors
- Use `RDDMiniBatches` to create properly sized mini-batches
- Choose a data partitioning strategy that keeps all executors busy
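When sizing batches, a rough estimate of the feature-array footprint per batch is a useful starting point. The sketch below assumes 4-byte float storage and ignores labels, JVM object overhead, and off-heap workspace memory, so treat the result as a lower bound; `featureBytes` is a hypothetical helper, not a DL4J API:

```java
public class BatchMemoryEstimate {
    // Rough feature-array footprint of one mini-batch, ignoring labels and overhead
    static long featureBytes(int batchSize, int featuresPerExample, int bytesPerElement) {
        return (long) batchSize * featuresPerExample * bytesPerElement;
    }

    public static void main(String[] args) {
        // 64 examples of 28x28x3 image data stored as 4-byte floats
        long bytes = featureBytes(64, 28 * 28 * 3, 4);
        System.out.println(bytes + " bytes per batch"); // 602112 bytes, roughly 0.57 MB
    }
}
```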
### Caching
```java
// Cache frequently accessed RDDs
JavaRDD<DataSet> dataSetRDD = rawData.map(dataSetFunction);
dataSetRDD.cache(); // Keep in memory for repeated access

// Reuse across multiple training epochs
for (int epoch = 0; epoch < numEpochs; epoch++) {
    dataSetRDD.foreach(batch -> trainModel(batch)); // trainModel: user-defined training step
}
```
## Error Handling

### Common Exceptions
- **NotSerializableException**: Non-serializable objects captured by Spark functions
- **IllegalArgumentException**: Invalid function parameters
- **IOException**: Data reading/writing errors
- **SparkException**: Spark cluster execution errors
### Best Practices
- Always test functions locally before running on a cluster
- Use appropriate error handling in map/foreach operations
- Monitor the Spark UI for task failures and performance bottlenecks
- Validate data schema consistency across partitions
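One way to apply error handling in map operations is to wrap the parse step so a malformed record is skipped instead of failing the whole task. The sketch below shows the pattern in plain Java with a stream standing in for an RDD; inside Spark you would `map` with the same wrapper and then `filter` out nulls (or use `flatMap` with empty results). `parseOrNull` is an illustrative helper, not a DL4J API:

```java
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public class SafeParse {
    // Return the parsed value, or null for malformed input instead of throwing
    static Double parseOrNull(String line) {
        try {
            return Double.parseDouble(line.trim());
        } catch (NumberFormatException e) {
            // In a real job, also log or count the bad record for later inspection
            return null;
        }
    }

    public static void main(String[] args) {
        List<String> lines = List.of("1.5", "oops", "2.5");
        List<Double> parsed = lines.stream()
                .map(SafeParse::parseOrNull)
                .filter(Objects::nonNull) // drop malformed records
                .collect(Collectors.toList());
        System.out.println(parsed); // [1.5, 2.5]
    }
}
```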