
# Spark Integration

Distributed data-processing functions for Apache Spark, enabling large-scale preprocessing and training across clusters. These functions bridge DataVec record processing and DeepLearning4j's distributed training capabilities.

## Capabilities


### DataVecDataSetFunction

Spark function for converting collections of Writables to DataSet objects in distributed environments.

```java { .api }
public class DataVecDataSetFunction implements Function<List<Writable>, DataSet>, Serializable {
    // Main constructors
    public DataVecDataSetFunction(int labelIndex, int numPossibleLabels, boolean regression);
    public DataVecDataSetFunction(int labelIndex, int numPossibleLabels, boolean regression,
                                  DataSetPreProcessor preProcessor, WritableConverter converter);
    public DataVecDataSetFunction(int labelIndexFrom, int labelIndexTo, int numPossibleLabels,
                                  boolean regression, DataSetPreProcessor preProcessor,
                                  WritableConverter converter);

    // Spark function method
    public DataSet call(List<Writable> currList) throws Exception;
}
```

### DataVecSequenceDataSetFunction

Spark function for converting sequence data (collections of collections of Writables) to DataSet objects.

```java { .api }
public class DataVecSequenceDataSetFunction implements Function<List<List<Writable>>, DataSet>, Serializable {
    public DataVecSequenceDataSetFunction(int labelIndex, int numPossibleLabels, boolean regression);
    public DataVecSequenceDataSetFunction(int labelIndex, int numPossibleLabels, boolean regression,
                                          DataSetPreProcessor preProcessor, WritableConverter converter);

    public DataSet call(List<List<Writable>> input) throws Exception;
}
```

### DataVecSequencePairDataSetFunction

Spark function for converting pairs of sequence data (separate feature and label sequences) to DataSet objects.

```java { .api }
public class DataVecSequencePairDataSetFunction
        implements Function<Tuple2<List<List<Writable>>, List<List<Writable>>>, DataSet>, Serializable {

    public DataVecSequencePairDataSetFunction();
    public DataVecSequencePairDataSetFunction(int numPossibleLabels, boolean regression);
    public DataVecSequencePairDataSetFunction(int numPossibleLabels, boolean regression,
                                              AlignmentMode alignmentMode);
    public DataVecSequencePairDataSetFunction(int numPossibleLabels, boolean regression,
                                              AlignmentMode alignmentMode,
                                              DataSetPreProcessor preProcessor,
                                              WritableConverter converter);

    public DataSet call(Tuple2<List<List<Writable>>, List<List<Writable>>> input) throws Exception;
}
```

### DataVecByteDataSetFunction

Spark function for converting byte data to DataSet objects, useful for image and binary data processing.

```java { .api }
public class DataVecByteDataSetFunction
        implements PairFunction<Tuple2<Text, BytesWritable>, Double, DataSet> {

    public DataVecByteDataSetFunction(int labelIndex, int numPossibleLabels,
                                      int batchSize, int byteFileLen);
    public DataVecByteDataSetFunction(int labelIndex, int numPossibleLabels,
                                      int batchSize, int byteFileLen, boolean regression);
    public DataVecByteDataSetFunction(int labelIndex, int numPossibleLabels,
                                      int batchSize, int byteFileLen, boolean regression,
                                      DataSetPreProcessor preProcessor);

    public Tuple2<Double, DataSet> call(Tuple2<Text, BytesWritable> inputTuple) throws Exception;
}
```

### RecordReaderFunction

Spark function for converting strings to DataSet objects using a RecordReader.

```java { .api }
public class RecordReaderFunction implements Function<String, DataSet> {
    public RecordReaderFunction(RecordReader recordReader, int labelIndex,
                                int numPossibleLabels, WritableConverter converter);
    public RecordReaderFunction(RecordReader recordReader, int labelIndex, int numPossibleLabels);

    public DataSet call(String v1) throws Exception;
}
```

### RDDMiniBatches

Utility for creating mini-batches from a `JavaRDD<DataSet>` for distributed training.

```java { .api }
public class RDDMiniBatches implements Serializable {
    public RDDMiniBatches(int miniBatches, JavaRDD<DataSet> toSplit);

    public JavaRDD<DataSet> miniBatchesJava();

    // Inner classes for batch processing
    public static class MiniBatchFunction extends BaseFlatMapFunctionAdaptee<Iterator<DataSet>, DataSet> {
        public MiniBatchFunction(int batchSize);
    }

    public static class MiniBatchFunctionAdapter implements FlatMapFunctionAdapter<Iterator<DataSet>, DataSet> {
        public MiniBatchFunctionAdapter(int batchSize);
        public Iterable<DataSet> call(Iterator<DataSet> dataSetIterator) throws Exception;
    }
}
```

### StringToDataSetExportFunction

Export function for converting strings to DataSet objects and writing them to files.

```java { .api }
public class StringToDataSetExportFunction implements VoidFunction<Iterator<String>> {
    public StringToDataSetExportFunction(URI outputDir, RecordReader recordReader,
                                         int batchSize, boolean regression,
                                         int labelIndex, int numPossibleLabels);

    public void call(Iterator<String> stringIterator) throws Exception;
}
```


## Usage Examples

### Basic Spark DataSet Processing

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.datavec.api.writable.DoubleWritable;
import org.datavec.api.writable.Writable;
import org.deeplearning4j.spark.datavec.DataVecDataSetFunction;
import org.nd4j.linalg.dataset.DataSet;

// Setup Spark context
JavaSparkContext sc = new JavaSparkContext();

// Load data as RDD<List<Writable>>
JavaRDD<List<Writable>> rawData = sc.textFile("hdfs://data.csv")
    .map(line -> {
        // Parse CSV line to List<Writable>
        String[] values = line.split(",");
        List<Writable> writables = new ArrayList<>();
        for (String value : values) {
            writables.add(new DoubleWritable(Double.parseDouble(value)));
        }
        return writables;
    });

// Convert to DataSet RDD
DataVecDataSetFunction dataSetFunction = new DataVecDataSetFunction(
    4,     // labelIndex (column 4)
    3,     // numPossibleLabels (3 classes)
    false  // regression = false (classification)
);

JavaRDD<DataSet> dataSetRDD = rawData.map(dataSetFunction);

// Use for distributed training
dataSetRDD.foreach(dataSet -> {
    // Train model with dataSet
    System.out.println("Processing batch with " + dataSet.numExamples() + " examples");
});
```

### Sequence Processing with Spark

```java
import org.deeplearning4j.spark.datavec.DataVecSequenceDataSetFunction;

// RDD of sequence data (each element is a List<List<Writable>>).
// Use wholeTextFiles so each RDD element is an entire file;
// textFile would yield individual lines instead.
JavaRDD<List<List<Writable>>> sequenceData = sc.wholeTextFiles("hdfs://sequences/")
    .map(pathAndContent -> {
        // Parse sequence file into List<List<Writable>>
        // Each line represents one time step
        List<List<Writable>> sequence = new ArrayList<>();
        String[] lines = pathAndContent._2().split("\n");
        for (String line : lines) {
            List<Writable> timeStep = parseTimeStep(line);
            sequence.add(timeStep);
        }
        return sequence;
    });

// Convert to sequence DataSet RDD
DataVecSequenceDataSetFunction sequenceFunction =
    new DataVecSequenceDataSetFunction(
        5,     // labelIndex (column 5 in each time step)
        10,    // numPossibleLabels (10 classes)
        false  // regression = false
    );

JavaRDD<DataSet> sequenceDataSetRDD = sequenceData.map(sequenceFunction);
```

### Separate Feature and Label Sequences

```java
import org.apache.spark.api.java.JavaPairRDD;
import org.deeplearning4j.spark.datavec.DataVecSequencePairDataSetFunction;
import scala.Tuple2;

// RDD of paired sequences (features, labels); zip returns a JavaPairRDD
JavaPairRDD<List<List<Writable>>, List<List<Writable>>> pairedSequences =
    featureSequenceRDD.zip(labelSequenceRDD);

// Convert paired sequences to DataSet
DataVecSequencePairDataSetFunction pairFunction =
    new DataVecSequencePairDataSetFunction(
        5,                        // numPossibleLabels
        false,                    // regression = false
        AlignmentMode.ALIGN_START // alignment mode
    );

JavaRDD<DataSet> pairedDataSetRDD = pairedSequences.map(pairFunction);
```

### Binary/Image Data Processing

```java
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaPairRDD;
import org.deeplearning4j.spark.datavec.DataVecByteDataSetFunction;

// RDD of binary files (e.g., images with labels in the filename)
JavaPairRDD<Text, BytesWritable> binaryFiles = sc.sequenceFile("hdfs://images/",
                                                               Text.class,
                                                               BytesWritable.class);

// Convert binary data to DataSet
DataVecByteDataSetFunction byteFunction = new DataVecByteDataSetFunction(
    0,          // labelIndex (extract from filename)
    10,         // numPossibleLabels (10 image classes)
    32,         // batchSize
    28 * 28 * 3 // byteFileLen (bytes per image file)
);

// byteFunction is a PairFunction, so use mapToPair rather than map
JavaPairRDD<Double, DataSet> imageDataSets = binaryFiles.mapToPair(byteFunction);
```

### Mini-Batch Creation

```java
import org.deeplearning4j.spark.datavec.RDDMiniBatches;

// Create mini-batches from individual DataSet objects
JavaRDD<DataSet> individualDataSets = // ... RDD of single-example DataSets

RDDMiniBatches miniBatcher = new RDDMiniBatches(64, individualDataSets); // 64 examples per batch
JavaRDD<DataSet> miniBatchRDD = miniBatcher.miniBatchesJava();

// Each element in miniBatchRDD now contains up to 64 examples
miniBatchRDD.foreach(batch -> {
    System.out.println("Mini-batch size: " + batch.numExamples());
});
```

### RecordReader with Spark

```java
import org.datavec.api.records.reader.RecordReader;
import org.datavec.api.records.reader.impl.csv.CSVRecordReader;
import org.deeplearning4j.spark.datavec.RecordReaderFunction;

// Setup RecordReader (must be serializable)
RecordReader csvReader = new CSVRecordReader();

// Create function to process strings with the RecordReader
RecordReaderFunction readerFunction = new RecordReaderFunction(
    csvReader, // recordReader
    4,         // labelIndex
    3          // numPossibleLabels
);

// Apply to RDD of strings (e.g., CSV lines)
JavaRDD<String> csvLines = sc.textFile("hdfs://data.csv");
JavaRDD<DataSet> processedData = csvLines.map(readerFunction);
```

### Data Export

```java
import java.net.URI;
import org.deeplearning4j.spark.datavec.export.StringToDataSetExportFunction;

// Export processed data to files
URI outputPath = new URI("hdfs://output/datasets/");
RecordReader exportReader = new CSVRecordReader();

StringToDataSetExportFunction exportFunction = new StringToDataSetExportFunction(
    outputPath,   // outputDir
    exportReader, // recordReader
    100,          // batchSize
    false,        // regression
    4,            // labelIndex
    3             // numPossibleLabels
);

// Apply export function to partitions
csvLines.foreachPartition(exportFunction);
```

## Performance Considerations

### Serialization

All Spark functions are Serializable and can be distributed across cluster nodes, and any objects they hold must be serializable too:

- RecordReaders must be serializable
- DataSetPreProcessors must be serializable
- Custom WritableConverters must be serializable
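One way to catch a non-serializable component before submitting a job is a local serialization round-trip, which fails fast if any transitively held field is not serializable. A minimal stdlib-only sketch (the `SerializationCheck` class and `roundTrip` helper are illustrative, not part of the DL4J API):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationCheck {
    // Serialize and deserialize an object locally; throws NotSerializableException
    // if the object (or anything it references) cannot be serialized.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // In practice, pass a custom preprocessor or converter here
        String restored = roundTrip("my-component");
        System.out.println(restored.equals("my-component") ? "serializable" : "mismatch");
    }
}
```

Running the same check on a custom `DataSetPreProcessor` or `WritableConverter` surfaces serialization problems locally instead of as task failures on the cluster.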

### Memory Management

- Configure appropriate batch sizes to avoid out-of-memory (OOM) errors
- Use `RDDMiniBatches` to create properly sized mini-batches
- Consider the data partitioning strategy for optimal cluster utilization
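When sizing batches, a rough lower bound helps: float32 features occupy 4 bytes per value, so one mini-batch needs at least batchSize × featuresPerExample × 4 bytes of feature memory, with labels and ND4J overhead on top. A small illustrative calculation (the figures are assumptions for the sketch, not DL4J requirements):

```java
public class BatchMemoryEstimate {
    // Rough lower bound on feature memory for one mini-batch of float32 data.
    static long featureBytes(int batchSize, int featuresPerExample) {
        return (long) batchSize * featuresPerExample * 4; // 4 bytes per float
    }

    public static void main(String[] args) {
        // e.g. a batch of 64 examples, each a flattened 28x28x3 image
        long bytes = featureBytes(64, 28 * 28 * 3);
        System.out.println(bytes / 1024 + " KiB per batch (features only)"); // prints 588 KiB ...
    }
}
```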

### Caching

```java
// Cache frequently accessed RDDs
JavaRDD<DataSet> dataSetRDD = rawData.map(dataSetFunction);
dataSetRDD.cache(); // Cache in memory for repeated access

// Use for multiple training epochs
for (int epoch = 0; epoch < numEpochs; epoch++) {
    dataSetRDD.foreach(batch -> trainModel(batch));
}
```

## Error Handling

### Common Exceptions

- **NotSerializableException**: non-serializable objects captured by a Spark function
- **IllegalArgumentException**: invalid function parameters
- **IOException**: data reading/writing errors
- **SparkException**: Spark cluster execution errors

### Best Practices

- Always test functions locally before running on a cluster
- Use appropriate error handling in map/foreach operations
- Monitor the Spark UI for task failures and performance bottlenecks
- Validate data schema consistency across partitions
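The error-handling advice above usually takes the shape of a map-then-filter: catch per-record failures inside the map function and drop them, so one malformed line does not fail the whole task. A minimal sketch of the pattern using plain Java streams standing in for an RDD (the `parseLine` helper and sample data are illustrative; the same map/filter shape applies to a `JavaRDD<String>`):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public class SafeParse {
    // Parse one CSV line; return null on failure instead of throwing,
    // so a single malformed record does not abort processing.
    static List<Double> parseLine(String line) {
        try {
            return Arrays.stream(line.split(","))
                         .map(Double::parseDouble)
                         .collect(Collectors.toList());
        } catch (NumberFormatException e) {
            return null; // in a real job, log the bad record before skipping it
        }
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("1.0,2.0", "bad,row", "3.0,4.0");
        // map then filter out failures -- same shape as rdd.map(...).filter(Objects::nonNull)
        List<List<Double>> parsed = lines.stream()
                                         .map(SafeParse::parseLine)
                                         .filter(Objects::nonNull)
                                         .collect(Collectors.toList());
        System.out.println(parsed.size()); // prints 2
    }
}
```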