# Batch Processing and Export

Batch processing and export utilities handle mini-batch creation from RDDs and dataset export functionality for production workflows. These tools optimize data processing performance and enable persistent storage of processed datasets.

## RDDMiniBatches

Handles mini-batch partitioning of DataSet RDDs for efficient processing in distributed training scenarios.

```java { .api }
public class RDDMiniBatches implements Serializable {
    public RDDMiniBatches(int miniBatches, JavaRDD<DataSet> toSplit);
    public JavaRDD<DataSet> miniBatchesJava();

    // Inner classes for batch processing
    public static class MiniBatchFunction extends BaseFlatMapFunctionAdaptee<Iterator<DataSet>, DataSet> {
        public MiniBatchFunction(int batchSize);
    }

    static class MiniBatchFunctionAdapter implements FlatMapFunctionAdapter<Iterator<DataSet>, DataSet> {
        public MiniBatchFunctionAdapter(int batchSize);
        public Iterable<DataSet> call(Iterator<DataSet> dataSetIterator) throws Exception;
    }
}
```

### Parameters

- **miniBatches**: Target number of examples per mini-batch (despite the name, this is the batch size, not the total number of batches)
- **toSplit**: Input `JavaRDD<DataSet>` to be partitioned into mini-batches

### Usage Examples

#### Basic Mini-Batch Creation

```java
import org.deeplearning4j.spark.datavec.RDDMiniBatches;

// Create mini-batches from individual DataSet objects
JavaRDD<DataSet> individualDatasets = // ... RDD of single-example DataSets
int batchSize = 32;

RDDMiniBatches batcher = new RDDMiniBatches(batchSize, individualDatasets);
JavaRDD<DataSet> miniBatches = batcher.miniBatchesJava();

// Each DataSet in miniBatches now contains up to 32 examples
long batchCount = miniBatches.count();
```

#### Training Pipeline Integration

```java
// Complete training pipeline with mini-batching
JavaRDD<List<Writable>> rawData = // ... load raw data
DataVecDataSetFunction transformer = new DataVecDataSetFunction(4, 10, false);
JavaRDD<DataSet> datasets = rawData.map(transformer);

// Create mini-batches for efficient training
RDDMiniBatches batcher = new RDDMiniBatches(64, datasets);
JavaRDD<DataSet> trainingBatches = batcher.miniBatchesJava();

// Cache for multiple epochs
trainingBatches.cache();

// Use in training loop. Note: calling fit() inside foreach() trains an
// independent model copy on each executor; for synchronized distributed
// training, use SparkDl4jMultiLayer instead.
for (int epoch = 0; epoch < numEpochs; epoch++) {
    trainingBatches.foreach(batch -> {
        // Train model with batch
        model.fit(batch);
    });
}
```

#### Dynamic Batch Sizing

```java
// Adjust batch size based on partition size
JavaRDD<DataSet> data = // ... your dataset RDD
long totalExamples = data.count();
int numPartitions = data.getNumPartitions();
int optimalBatchSize = (int) Math.max(1, totalExamples / (numPartitions * 4));

RDDMiniBatches batcher = new RDDMiniBatches(optimalBatchSize, data);
JavaRDD<DataSet> optimizedBatches = batcher.miniBatchesJava();
```

### Behavior Details

#### Batch Merging Process

1. Processes DataSets in groups according to partition boundaries
2. Accumulates individual DataSets until reaching the target batch size
3. Merges accumulated DataSets using `DataSet.merge()`
4. Handles edge cases where remaining examples are fewer than the batch size
5. Returns merged DataSets containing multiple examples

#### Memory Considerations

- Creates copies of DataSets during the merging process
- Edge case handling: if more than one example remains at the end of a partition, a final (smaller) batch is created
- Batching runs independently per partition via a map-partitions operation, so no cross-partition coordination is required
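
The accumulate-and-merge behavior described above can be sketched in plain Java, without Spark or ND4J (an illustrative simplification, not the actual `RDDMiniBatches` source; in the real class each emitted group is combined with `DataSet.merge()`):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified sketch of per-partition batching: accumulate items until the
// target batch size is reached, then emit the accumulated group.
public class MiniBatchSketch {
    public static <T> List<List<T>> batches(Iterator<T> items, int batchSize) {
        List<List<T>> out = new ArrayList<>();
        List<T> current = new ArrayList<>(batchSize);
        while (items.hasNext()) {
            current.add(items.next());
            if (current.size() == batchSize) { // batch full: emit and start a new one
                out.add(current);
                current = new ArrayList<>(batchSize);
            }
        }
        if (!current.isEmpty()) {              // trailing examples form a smaller final batch
            out.add(current);
        }
        return out;
    }
}
```

This mirrors why the final batch of each partition may contain fewer examples than the target batch size.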

## StringToDataSetExportFunction

Exports DataSet objects to persistent storage using RecordReader conversion, designed for use with `foreachPartition()` operations.

```java { .api }
public class StringToDataSetExportFunction implements VoidFunction<Iterator<String>> {
    public StringToDataSetExportFunction(URI outputDir, RecordReader recordReader, int batchSize,
                                         boolean regression, int labelIndex, int numPossibleLabels);

    public void call(Iterator<String> stringIterator) throws Exception;
}
```

### Parameters

- **outputDir**: URI of the destination directory for exported DataSet files
- **recordReader**: RecordReader implementation for parsing input strings
- **batchSize**: Number of records to include in each exported DataSet file
- **regression**: `false` for classification, `true` for regression
- **labelIndex**: Column index containing labels
- **numPossibleLabels**: Number of classes for classification (ignored for regression)

### Usage Examples

#### CSV Export to HDFS

```java
import org.datavec.api.records.reader.impl.csv.CSVRecordReader;
import java.net.URI;

// Export processed CSV data to HDFS
JavaRDD<String> csvData = sc.textFile("hdfs://input/data.csv");
URI outputPath = new URI("hdfs://output/datasets/");

CSVRecordReader csvReader = new CSVRecordReader();
StringToDataSetExportFunction exporter = new StringToDataSetExportFunction(
        outputPath,
        csvReader,
        100,   // batchSize: 100 records per file
        false, // classification mode
        4,     // labelIndex
        10     // numPossibleLabels
);

// Export in parallel across partitions
csvData.foreachPartition(exporter);
```

#### Batch Export with Custom RecordReader

```java
import org.datavec.api.records.reader.RecordReader;
import org.datavec.api.records.reader.impl.jackson.JacksonRecordReader;

// Export JSON data with custom batch sizes
JavaRDD<String> jsonData = // ... JSON string RDD
URI outputDirectory = new URI("s3://bucket/exported-datasets/");

// Note: JacksonRecordReader has no no-arg constructor; it must be configured
// with a FieldSelection and a Jackson ObjectMapper
RecordReader jsonReader = // ... configured JacksonRecordReader

StringToDataSetExportFunction exporter = new StringToDataSetExportFunction(
        outputDirectory,
        jsonReader,
        50,   // smaller batches for complex JSON
        true, // regression mode
        0,    // labelIndex
        -1    // ignored for regression
);

jsonData.foreachPartition(exporter);
```

#### Local File System Export

```java
import java.io.File;

// Export to local file system
JavaRDD<String> textData = // ... text data RDD
File localOutputDir = new File("/tmp/exported-datasets");
URI localPath = localOutputDir.toURI();

CSVRecordReader reader = new CSVRecordReader();
StringToDataSetExportFunction exporter = new StringToDataSetExportFunction(
        localPath,
        reader,
        200,   // batchSize
        false, // classification
        0,     // labelIndex
        5      // numPossibleLabels
);

// Process each partition separately
textData.foreachPartition(exporter);
```

### Behavior Details

#### Export Process Flow

1. Processes input strings in batches according to `batchSize`
2. Uses RecordReader to parse each string into a `List<Writable>`
3. Accumulates parsed records until the batch is full or the iterator is exhausted
4. Creates a `RecordReaderDataSetIterator` for batch conversion
5. Converts the batch to a DataSet using the `RecordReaderDataSetIterator`
6. Generates a unique filename using the thread ID and an output counter
7. Saves the DataSet to persistent storage using the Hadoop FileSystem API
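
Stripped of the Spark and DataVec machinery, the flow above reduces to a parse-accumulate-flush loop. The sketch below is a hedged illustration: `parse` stands in for the RecordReader, and the returned batch sizes stand in for the DataSet files that would be written:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Sketch of the export loop: parse each input line into fields, accumulate
// parsed records until the batch is full, then "flush" the batch as one
// output unit (the real function saves each flushed batch as a DataSet file).
public class ExportLoopSketch {

    // Stand-in for the RecordReader: split a CSV line into fields
    static List<String> parse(String line) {
        return Arrays.asList(line.split(","));
    }

    // Returns the size of each flushed batch, in flush order
    public static List<Integer> export(Iterator<String> lines, int batchSize) {
        List<Integer> flushedSizes = new ArrayList<>();
        List<List<String>> batch = new ArrayList<>(batchSize);
        while (lines.hasNext()) {
            batch.add(parse(lines.next()));
            if (batch.size() == batchSize) {
                flushedSizes.add(batch.size()); // full batch flushed
                batch = new ArrayList<>(batchSize);
            }
        }
        if (!batch.isEmpty()) {
            flushedSizes.add(batch.size());     // trailing partial batch
        }
        return flushedSizes;
    }
}
```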

#### File Naming Convention

Generated files follow the pattern: `dataset_{threadId}_{counter}.bin`

- **threadId**: Unique identifier based on the JVM UID and thread ID
- **counter**: Sequential number for files created by the same thread

#### Storage Compatibility

- Supports HDFS, S3, and local file systems via the Hadoop FileSystem API
- Uses the `DataSet.save()` method for binary serialization
- Creates an `FSDataOutputStream` for efficient writing
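
A generator following the naming convention above can be sketched as follows (illustrative only; the class and field names here are assumptions, not the library's internals):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the naming scheme: a unique per-writer prefix plus a
// monotonically increasing counter yields collision-free filenames.
public class DataSetFileNamer {
    private final String threadId;                        // e.g. JVM UID + thread ID
    private final AtomicInteger counter = new AtomicInteger(0);

    public DataSetFileNamer(String threadId) {
        this.threadId = threadId;
    }

    public String next() {
        return "dataset_" + threadId + "_" + counter.getAndIncrement() + ".bin";
    }
}
```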

### Integration Patterns

#### Production Export Pipeline

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.net.URI;

// Complete data processing and export pipeline
JavaRDD<String> rawData = sc.textFile("hdfs://input/*");

// Transform and export in single pipeline
CSVRecordReader reader = new CSVRecordReader();
StringToDataSetExportFunction exporter = new StringToDataSetExportFunction(
        new URI("hdfs://output/processed-datasets/"),
        reader,
        128,   // batch size
        false, // classification
        0,     // label column
        10     // classes
);

// Process and export
rawData.foreachPartition(exporter);

// Verify export completion
FileSystem hdfs = FileSystem.get(new URI("hdfs://output/processed-datasets/"), new Configuration());
FileStatus[] exportedFiles = hdfs.listStatus(new Path("hdfs://output/processed-datasets/"));
System.out.println("Exported " + exportedFiles.length + " dataset files");
```

#### Stream Processing Export

```java
// Streaming export for real-time processing
JavaDStream<String> streamingData = // ... Spark Streaming source

StringToDataSetExportFunction exporter = new StringToDataSetExportFunction(
        new URI("hdfs://stream-output/"),
        new CSVRecordReader(),
        64,    // smaller batches for streaming
        false, // classification
        1,     // labelIndex
        5      // numPossibleLabels
);

streamingData.foreachRDD(rdd -> {
    if (!rdd.isEmpty()) {
        rdd.foreachPartition(exporter);
    }
});
```

## Performance Considerations

### Mini-Batch Optimization

- **Batch Size**: Balance between memory usage and computational efficiency
- **Partition Count**: Consider Spark cluster resources when sizing batches
- **Caching**: Cache mini-batched RDDs when used multiple times

### Export Optimization

- **Batch Size**: Larger batches reduce file count but increase memory usage
- **Partitioning**: More partitions enable parallel export but create more files
- **Storage Format**: Binary DataSet format optimized for fast loading
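
The batch-size/file-count trade-off can be estimated with simple arithmetic, assuming records are evenly distributed across partitions (a rough planning aid, not an exact prediction):

```java
// Back-of-the-envelope estimate of how many files an export will produce:
// each partition writes ceil(recordsPerPartition / batchSize) files.
public class ExportEstimate {
    public static long estimateFileCount(long totalRecords, int numPartitions, int batchSize) {
        long recordsPerPartition = (totalRecords + numPartitions - 1) / numPartitions; // ceil
        long filesPerPartition = (recordsPerPartition + batchSize - 1) / batchSize;    // ceil
        return filesPerPartition * numPartitions;
    }
}
```

For example, 100,000 records across 8 partitions with a batch size of 128 yields 98 files per partition, 784 in total.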

### Memory Management

```java
// Keep memory usage manageable during batch processing
JavaRDD<DataSet> data = // ... your data
data.cache(); // Cache only if the RDD is reused

// Use smaller batches if memory is constrained
int conservativeBatchSize = 16;
RDDMiniBatches batcher = new RDDMiniBatches(conservativeBatchSize, data);
JavaRDD<DataSet> conservativeBatches = batcher.miniBatchesJava();

// Unpersist when no longer needed
data.unpersist();
```

## Error Handling

```java
try {
    StringToDataSetExportFunction exporter = new StringToDataSetExportFunction(
            outputPath, reader, batchSize, regression, labelIndex, numLabels
    );

    dataRDD.foreachPartition(exporter);

} catch (IOException e) {
    logger.error("Failed to write DataSet files: " + e.getMessage());
} catch (IllegalArgumentException e) {
    logger.error("Invalid export configuration: " + e.getMessage());
} catch (Exception e) {
    logger.error("Export processing failed: " + e.getMessage());
}
```