
# Specialized Input Processing

Specialized input processing functions handle binary data and string records using RecordReader implementations. These functions are designed for specific data formats and input sources commonly encountered in production data pipelines.

## DataVecByteDataSetFunction

Processes binary data stored as BytesWritable objects, converting them to DataSet objects with configurable batch sizes and file lengths.

```java { .api }
public class DataVecByteDataSetFunction
        implements PairFunction<Tuple2<Text, BytesWritable>, Double, DataSet> {

    public DataVecByteDataSetFunction(int labelIndex, int numPossibleLabels, int batchSize, int byteFileLen);

    public DataVecByteDataSetFunction(int labelIndex, int numPossibleLabels, int batchSize,
                                      int byteFileLen, boolean regression);

    public DataVecByteDataSetFunction(int labelIndex, int numPossibleLabels, int batchSize,
                                      int byteFileLen, boolean regression,
                                      DataSetPreProcessor preProcessor);

    public Tuple2<Double, DataSet> call(Tuple2<Text, BytesWritable> inputTuple) throws Exception;
}
```

### Parameters

- **labelIndex**: Byte position containing the label (0-based). Use -1 to treat the last byte as the label.
- **numPossibleLabels**: Number of classes for classification
- **batchSize**: Number of examples to include in each DataSet
- **byteFileLen**: Number of bytes per individual record/file
- **regression**: `false` for classification, `true` for regression
- **preProcessor**: Optional DataSetPreProcessor for data normalization

### Usage Examples

#### Image Classification from Bytes

```java
// Process binary image data with labels.
// Each record: 1 label byte + 784 bytes (28x28 pixels) = 785 bytes total
DataVecByteDataSetFunction transformer = new DataVecByteDataSetFunction(
        0,   // labelIndex: first byte contains the label
        10,  // numPossibleLabels: 10 classes (digits 0-9)
        32,  // batchSize: process 32 images per DataSet
        785  // byteFileLen: 785 bytes per record
);

JavaRDD<Tuple2<Text, BytesWritable>> byteData = // ... load binary data
JavaPairRDD<Double, DataSet> results = byteData.mapToPair(transformer);
```

#### Binary Sensor Data Processing

```java
import org.nd4j.linalg.dataset.api.preprocessor.NormalizerMinMaxScaler;

// Process sensor data with min-max normalization
DataSetPreProcessor normalizer = new NormalizerMinMaxScaler();
DataVecByteDataSetFunction transformer = new DataVecByteDataSetFunction(
        -1,         // labelIndex: -1 uses the last byte as the label
        5,          // numPossibleLabels: 5 sensor states
        64,         // batchSize: 64 samples per batch
        100,        // byteFileLen: 100 bytes per sensor reading
        false,      // regression: false -> classification mode
        normalizer  // preProcessor: applied to each DataSet
);
```

#### Regression from Binary Features

```java
// Binary feature extraction for regression
DataVecByteDataSetFunction transformer = new DataVecByteDataSetFunction(
        255, // labelIndex: byte 255 contains the regression target
        -1,  // numPossibleLabels: ignored for regression
        16,  // batchSize
        256, // byteFileLen: 256 bytes per record
        true // regression mode
);
```

### Behavior Details

#### Data Processing Flow

1. Reads the BytesWritable data as a byte array
2. Determines the feature vector length (byteFileLen - 1 for classification)
3. Processes bytes in batches according to the batchSize parameter
4. Extracts the label from the specified byte position
5. Creates feature vectors from the remaining bytes
6. Converts labels to one-hot vectors for classification
7. Returns a Tuple2<Double, DataSet> whose Double key reports the actual batch count
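Steps 4-6 can be sketched in plain Java. This is a simplified, hypothetical illustration of the record layout handling (class and method names are invented for the sketch), not the library's actual implementation:

```java
import java.util.Arrays;

// Simplified sketch of steps 4-6: split one fixed-length byte record into
// a feature vector and a one-hot label (illustrative names and layout only).
public class ByteRecordSketch {

    // Step 5: every byte except the label byte becomes a feature value
    static float[] features(byte[] record, int labelIndex) {
        int li = labelIndex < 0 ? record.length - 1 : labelIndex; // -1 = last byte
        float[] out = new float[record.length - 1];
        int j = 0;
        for (int i = 0; i < record.length; i++) {
            if (i == li) continue;       // step 4: skip the label byte
            out[j++] = record[i] & 0xFF; // unsigned byte -> float
        }
        return out;
    }

    // Step 6: the class id stored in the label byte becomes a one-hot vector
    static float[] oneHot(byte[] record, int labelIndex, int numPossibleLabels) {
        int li = labelIndex < 0 ? record.length - 1 : labelIndex;
        float[] label = new float[numPossibleLabels];
        label[record[li] & 0xFF] = 1.0f;
        return label;
    }

    public static void main(String[] args) {
        byte[] record = {3, 10, 20, 30}; // label byte first, then 3 feature bytes
        System.out.println(Arrays.toString(features(record, 0))); // [10.0, 20.0, 30.0]
        System.out.println(Arrays.toString(oneHot(record, 0, 5))); // [0.0, 0.0, 0.0, 1.0, 0.0]
    }
}
```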

#### Label Inference

When `labelIndex` is -1, the last byte of each record is automatically used as the label.

#### Batch Processing

Multiple records are accumulated into a single DataSet until `batchSize` is reached or the input stream ends.
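As a consequence, a stream of N records yields ceil(N / batchSize) DataSets, with the last one possibly smaller. The arithmetic, as an illustrative sketch rather than library code:

```java
// Number of DataSets produced for a given record count and batch size:
// all full batches, plus one partial batch for any remainder.
public class BatchCountSketch {
    static int numBatches(int totalRecords, int batchSize) {
        return (totalRecords + batchSize - 1) / batchSize; // ceiling division
    }

    public static void main(String[] args) {
        System.out.println(numBatches(100, 32)); // 4: three full batches + one of 4 records
        System.out.println(numBatches(64, 32));  // 2: exact multiple, no partial batch
    }
}
```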

## RecordReaderFunction

Converts string data to DataSet objects using DataVec RecordReader implementations for flexible data parsing.

```java { .api }
public class RecordReaderFunction implements Function<String, DataSet> {
    public RecordReaderFunction(RecordReader recordReader, int labelIndex, int numPossibleLabels,
                                WritableConverter converter);

    public RecordReaderFunction(RecordReader recordReader, int labelIndex, int numPossibleLabels);

    public DataSet call(String v1) throws Exception;
}
```

### Parameters

- **recordReader**: DataVec RecordReader implementation for parsing string data
- **labelIndex**: Column index containing labels
- **numPossibleLabels**: Number of classes for classification
- **converter**: Optional WritableConverter for data type conversion

### Usage Examples

#### CSV String Processing

```java
import org.datavec.api.records.reader.impl.csv.CSVRecordReader;

// Process CSV strings into DataSet objects
CSVRecordReader csvReader = new CSVRecordReader();
RecordReaderFunction transformer = new RecordReaderFunction(
        csvReader,
        4, // labelIndex: column 4 contains the labels
        3  // numPossibleLabels: 3 classes
);

JavaRDD<String> csvStrings = // ... RDD of CSV lines
JavaRDD<DataSet> datasets = csvStrings.map(transformer);
```

#### JSON String Processing

```java
import org.datavec.api.records.reader.impl.jackson.JacksonRecordReader;
import org.datavec.api.io.converters.SelfWritableConverter;

// Process JSON strings with a custom converter.
// Note: JacksonRecordReader is constructed with a FieldSelection (the JSON
// fields to extract) and a Jackson ObjectMapper; create these beforehand.
JacksonRecordReader jsonReader = new JacksonRecordReader(fieldSelection, objectMapper);
WritableConverter converter = new SelfWritableConverter();

RecordReaderFunction transformer = new RecordReaderFunction(
        jsonReader,
        0, // labelIndex
        5, // numPossibleLabels
        converter // custom conversion logic
);

JavaRDD<String> jsonStrings = // ... RDD of JSON strings
JavaRDD<DataSet> datasets = jsonStrings.map(transformer);
```

#### Custom RecordReader

```java
import org.datavec.api.records.reader.RecordReader;
import org.datavec.api.split.StringSplit;

// Implement custom string parsing logic. A real implementation must provide
// every RecordReader method; only the core ones are shown here.
RecordReader customReader = new RecordReader() {
    @Override
    public void initialize(InputSplit split) throws IOException, InterruptedException {
        // Custom initialization
    }

    @Override
    public List<Writable> next() {
        // Custom parsing logic
        return parsedWritables;
    }

    @Override
    public boolean hasNext() {
        return true; // Single record processing
    }

    // ... remaining RecordReader methods omitted for brevity
};

RecordReaderFunction transformer = new RecordReaderFunction(customReader, 2, 7);
```

### Behavior Details

#### String Processing Flow

1. Initializes the RecordReader with a StringSplit wrapping the input string
2. Parses the string via RecordReader.next() to obtain a List<Writable>
3. Separates features and labels based on labelIndex
4. Converts labels to one-hot vectors for classification
5. Creates feature vectors from the non-label columns
6. Returns a single-example DataSet
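The column split in steps 3-5 can be sketched in plain Java. This is a simplified, hypothetical illustration (names invented for the sketch), not the library's code:

```java
import java.util.Arrays;
import java.util.List;

// Simplified sketch of steps 3-5: one parsed row becomes a feature vector
// plus a one-hot label vector (illustrative only).
public class RowSplitSketch {

    // Step 5: every column except the label column becomes a feature
    static double[] features(List<Double> row, int labelIndex) {
        double[] out = new double[row.size() - 1];
        int j = 0;
        for (int i = 0; i < row.size(); i++) {
            if (i != labelIndex) out[j++] = row.get(i); // step 3: skip the label
        }
        return out;
    }

    // Step 4: the class id in the label column becomes a one-hot vector
    static double[] oneHot(List<Double> row, int labelIndex, int numPossibleLabels) {
        double[] label = new double[numPossibleLabels];
        label[row.get(labelIndex).intValue()] = 1.0;
        return label;
    }

    public static void main(String[] args) {
        List<Double> row = Arrays.asList(0.5, 1.5, 2.0); // column 2 holds class id 2
        System.out.println(Arrays.toString(features(row, 2))); // [0.5, 1.5]
        System.out.println(Arrays.toString(oneHot(row, 2, 3))); // [0.0, 0.0, 1.0]
    }
}
```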

#### Error Handling

- Throws `IllegalStateException` if `numPossibleLabels < 1`
- Converts data types through the WritableConverter, if one is provided
- Supports various RecordReader implementations for different string formats

#### Data Stacking

Internally, lists of DataSet objects are combined with `Nd4j.vstack()`, although a single record is typically processed per call.
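Conceptually, vertical stacking concatenates per-example rows along the first axis. A plain-Java sketch of what `Nd4j.vstack()` does with the row data (illustrative, not ND4J code):

```java
import java.util.Arrays;

// Conceptual sketch of vertical stacking: each input matrix's rows become
// consecutive rows of the combined matrix.
public class VstackSketch {
    static double[][] vstack(double[][]... matrices) {
        int total = 0;
        for (double[][] m : matrices) total += m.length;
        double[][] out = new double[total][];
        int i = 0;
        for (double[][] m : matrices)
            for (double[] row : m) out[i++] = row.clone();
        return out;
    }

    public static void main(String[] args) {
        double[][] a = {{1, 2}};
        double[][] b = {{3, 4}};
        System.out.println(Arrays.deepToString(vstack(a, b))); // [[1.0, 2.0], [3.0, 4.0]]
    }
}
```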

### Integration Patterns

#### File Processing Pipeline

```java
// Complete file processing workflow
JavaRDD<String> fileContents = sc.textFile("hdfs://data/*.csv");

CSVRecordReader csvReader = new CSVRecordReader();
RecordReaderFunction transformer = new RecordReaderFunction(csvReader, 0, 10);

JavaRDD<DataSet> processedData = fileContents.map(transformer);
processedData.cache(); // Cache for reuse
```

#### Stream Processing

```java
// Streaming data processing
JavaDStream<String> stream = // ... Spark Streaming source
JavaDStream<DataSet> processedStream = stream.map(transformer);

processedStream.foreachRDD(rdd -> {
    // Process each batch
    rdd.collect();
});
```

## Input Format Support

Both specialized input functions support:

- **Binary formats**: Raw bytes, images, sensor data
- **Text formats**: CSV, JSON, XML, custom delimited
- **Hadoop formats**: Text/BytesWritable pairs from Hadoop InputFormats
- **Custom parsing**: Extensible through RecordReader implementations

## Error Handling Patterns

```java
try {
    DataSet result = transformer.call(input);
    // Process successful result
} catch (IllegalStateException e) {
    // Handle configuration errors
    logger.error("Configuration error: " + e.getMessage());
} catch (IOException e) {
    // Handle I/O errors during parsing
    logger.error("I/O error: " + e.getMessage());
} catch (Exception e) {
    // Handle other processing errors
    logger.error("Processing error: " + e.getMessage());
}
```