# Specialized Input Processing

Specialized input processing functions handle binary data and string records using RecordReader implementations. These functions are designed for specific data formats and input sources commonly encountered in production data pipelines.
## DataVecByteDataSetFunction

Processes binary data stored as BytesWritable objects, converting it to DataSet objects with configurable batch sizes and record lengths.
```java { .api }
public class DataVecByteDataSetFunction
        implements PairFunction<Tuple2<Text, BytesWritable>, Double, DataSet> {

    public DataVecByteDataSetFunction(int labelIndex, int numPossibleLabels, int batchSize, int byteFileLen);

    public DataVecByteDataSetFunction(int labelIndex, int numPossibleLabels, int batchSize,
                                      int byteFileLen, boolean regression);

    public DataVecByteDataSetFunction(int labelIndex, int numPossibleLabels, int batchSize,
                                      int byteFileLen, boolean regression,
                                      DataSetPreProcessor preProcessor);

    public Tuple2<Double, DataSet> call(Tuple2<Text, BytesWritable> inputTuple) throws Exception;
}
```
### Parameters

- **labelIndex**: Byte position containing the label (0-based). Use -1 to treat the last byte as the label.
- **numPossibleLabels**: Number of classes for classification
- **batchSize**: Number of examples to include in each DataSet
- **byteFileLen**: Number of bytes per individual record/file
- **regression**: `false` for classification, `true` for regression
- **preProcessor**: Optional DataSetPreProcessor for data normalization
### Usage Examples

#### Image Classification from Bytes
```java
// Process binary image data with labels
// Each record: 784 bytes (28x28 pixels) + 1 label byte = 785 bytes total
DataVecByteDataSetFunction transformer = new DataVecByteDataSetFunction(
        0,   // labelIndex: first byte contains label
        10,  // numPossibleLabels: 10 classes (digits 0-9)
        32,  // batchSize: process 32 images per DataSet
        785  // byteFileLen: 785 bytes per record
);

JavaRDD<Tuple2<Text, BytesWritable>> byteData = // ... load binary data
// A PairFunction is applied with mapToPair, not map
JavaPairRDD<Double, DataSet> results = byteData.mapToPair(transformer);
```
#### Binary Sensor Data Processing
```java
import org.nd4j.linalg.dataset.api.preprocessor.NormalizerMinMaxScaler;

// Process sensor data with normalization
DataSetPreProcessor normalizer = new NormalizerMinMaxScaler();
DataVecByteDataSetFunction transformer = new DataVecByteDataSetFunction(
        -1,    // labelIndex: -1 for last byte as label
        5,     // numPossibleLabels: 5 sensor states
        64,    // batchSize: 64 samples per batch
        100,   // byteFileLen: 100 bytes per sensor reading
        false, // classification mode
        normalizer
);
```
#### Regression from Binary Features
```java
// Binary feature extraction for regression
DataVecByteDataSetFunction transformer = new DataVecByteDataSetFunction(
        255, // labelIndex: byte 255 contains the regression target
        -1,  // numPossibleLabels: ignored for regression
        16,  // batchSize
        256, // byteFileLen: 256 bytes per record
        true // regression mode
);
```
### Behavior Details

#### Data Processing Flow
1. Reads the BytesWritable data as a byte array
2. Determines the feature vector length (byteFileLen - 1 when one byte holds the label)
3. Processes bytes in batches according to the batchSize parameter
4. Extracts the label from the specified byte position
5. Creates feature vectors from the remaining bytes
6. Converts labels to one-hot vectors for classification
7. Returns Tuple2<Double, DataSet>, where the Double key carries the batch count
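The slicing in steps 2, 4, and 5 can be sketched in plain Java without ND4J. The `features` and `label` helpers below are hypothetical illustrations, not part of the DataVec API:

```java
public class ByteRecordSlicer {
    // Split one fixed-length byte record into features and a label,
    // mirroring the flow above (hypothetical helper, not the DataVec API).
    static float[] features(byte[] record, int labelIndex) {
        float[] out = new float[record.length - 1];
        int j = 0;
        for (int i = 0; i < record.length; i++) {
            if (i == labelIndex) continue;      // skip the label byte
            out[j++] = record[i] & 0xFF;        // unsigned byte -> float
        }
        return out;
    }

    static int label(byte[] record, int labelIndex) {
        // labelIndex == -1 means "use the last byte" (label inference)
        int idx = labelIndex < 0 ? record.length - 1 : labelIndex;
        return record[idx] & 0xFF;
    }

    public static void main(String[] args) {
        byte[] record = new byte[785]; // 784 pixels + 1 label byte
        record[0] = 7;                 // label stored in the first byte
        System.out.println(features(record, 0).length); // prints 784
        System.out.println(label(record, 0));           // prints 7
    }
}
```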
#### Label Inference
When `labelIndex` is -1, the last byte position is automatically used as the label.

#### Batch Processing
Multiple records are accumulated into a single DataSet until `batchSize` is reached or the input stream is exhausted.
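That accumulation rule can be sketched as a plain-Java chunking helper (illustrative only; the actual implementation streams records directly into a DataSet):

```java
import java.util.ArrayList;
import java.util.List;

public class RecordBatcher {
    // Group records into batches of at most batchSize, mirroring the
    // accumulation rule above (hypothetical helper, not the DataVec code).
    static <T> List<List<T>> batches(List<T> records, int batchSize) {
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < records.size(); i += batchSize) {
            int end = Math.min(i + batchSize, records.size());
            out.add(new ArrayList<>(records.subList(i, end)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> records = new ArrayList<>();
        for (int i = 0; i < 10; i++) records.add(i);
        List<List<Integer>> b = batches(records, 4);
        // 10 records with batchSize 4 -> batches of 4, 4, and 2
        System.out.println(b.size() + " batches, last has " + b.get(b.size() - 1).size()); // prints "3 batches, last has 2"
    }
}
```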
## RecordReaderFunction

Converts string data to DataSet objects using DataVec RecordReader implementations for flexible data parsing.
```java { .api }
public class RecordReaderFunction implements Function<String, DataSet> {

    public RecordReaderFunction(RecordReader recordReader, int labelIndex, int numPossibleLabels,
                                WritableConverter converter);

    public RecordReaderFunction(RecordReader recordReader, int labelIndex, int numPossibleLabels);

    public DataSet call(String v1) throws Exception;
}
```
### Parameters

- **recordReader**: DataVec RecordReader implementation for parsing string data
- **labelIndex**: Column index containing labels
- **numPossibleLabels**: Number of classes for classification
- **converter**: Optional WritableConverter for data type conversion
### Usage Examples

#### CSV String Processing
```java
import org.datavec.api.records.reader.impl.csv.CSVRecordReader;

// Process CSV strings into DataSet objects
CSVRecordReader csvReader = new CSVRecordReader();
RecordReaderFunction transformer = new RecordReaderFunction(
        csvReader,
        4, // labelIndex: column 4 contains labels
        3  // numPossibleLabels: 3 classes
);

JavaRDD<String> csvStrings = // ... RDD of CSV lines
JavaRDD<DataSet> datasets = csvStrings.map(transformer);
```
#### JSON String Processing
```java
import org.datavec.api.records.reader.impl.jackson.JacksonRecordReader;
import org.datavec.api.io.converters.SelfWritableConverter;

// Process JSON strings with a custom converter.
// JacksonRecordReader has no no-arg constructor; it is configured with a
// FieldSelection and an ObjectMapper, omitted here for brevity.
JacksonRecordReader jsonReader = // ... configure with FieldSelection and ObjectMapper
WritableConverter converter = new SelfWritableConverter();

RecordReaderFunction transformer = new RecordReaderFunction(
        jsonReader,
        0, // labelIndex
        5, // numPossibleLabels
        converter // custom conversion logic
);

JavaRDD<String> jsonStrings = // ... RDD of JSON strings
JavaRDD<DataSet> datasets = jsonStrings.map(transformer);
```
#### Custom RecordReader
```java
import org.datavec.api.records.reader.RecordReader;
import org.datavec.api.writable.Writable;

// Implement custom string parsing logic.
// The RecordReader interface declares further methods (reset, getLabels,
// close, ...); only the parsing-related ones are sketched here.
RecordReader customReader = new RecordReader() {
    @Override
    public void initialize(InputSplit split) throws IOException, InterruptedException {
        // Custom initialization
    }

    @Override
    public List<Writable> next() {
        // Custom parsing logic
        return parsedWritables;
    }

    @Override
    public boolean hasNext() {
        return true; // Single record processing
    }

    // ... remaining RecordReader methods omitted
};

RecordReaderFunction transformer = new RecordReaderFunction(customReader, 2, 7);
```
### Behavior Details

#### String Processing Flow
1. Initializes the RecordReader with a StringSplit wrapping the input string
2. Parses the string via RecordReader.next() to obtain a List<Writable>
3. Separates features and labels based on labelIndex
4. Converts labels to one-hot vectors for classification
5. Creates feature vectors from the non-label columns
6. Returns a single-example DataSet
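Step 4's one-hot conversion can be shown in isolation (an illustrative sketch; DataVec performs this internally):

```java
import java.util.Arrays;

public class OneHotLabel {
    // Convert a class index into a one-hot vector, as in step 4 above.
    // Mirrors the documented validation: class count must be at least 1.
    static float[] oneHot(int label, int numPossibleLabels) {
        if (numPossibleLabels < 1)
            throw new IllegalStateException("numPossibleLabels must be >= 1");
        float[] v = new float[numPossibleLabels]; // all zeros
        v[label] = 1.0f;                          // set the class position
        return v;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(oneHot(2, 4))); // prints [0.0, 0.0, 1.0, 0.0]
    }
}
```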
#### Error Handling
- Throws `IllegalStateException` if `numPossibleLabels < 1`
- Handles data type conversion through the WritableConverter, if provided
- Supports various RecordReader implementations for different string formats
#### Data Stacking
Internally builds a list of DataSet objects and combines them with `Nd4j.vstack()`, although a single record is typically processed per call.
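In plain-Java terms, `Nd4j.vstack()` concatenates equal-width rows along the first dimension; a minimal sketch of that semantics (the real method operates on INDArrays, not arrays):

```java
public class VStackSketch {
    // Stack 2-D blocks vertically, mimicking Nd4j.vstack() semantics
    // (illustrative only; not the ND4J implementation).
    static float[][] vstack(float[][]... blocks) {
        int rows = 0;
        for (float[][] b : blocks) rows += b.length;
        float[][] out = new float[rows][];
        int r = 0;
        for (float[][] b : blocks)
            for (float[] row : b) out[r++] = row.clone(); // copy each row in order
        return out;
    }

    public static void main(String[] args) {
        float[][] a = {{1f, 2f}};
        float[][] b = {{3f, 4f}, {5f, 6f}};
        float[][] s = vstack(a, b);
        System.out.println(s.length + "x" + s[0].length); // prints 3x2
    }
}
```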
### Integration Patterns

#### File Processing Pipeline
```java
// Complete file processing workflow
JavaRDD<String> fileContents = sc.textFile("hdfs://data/*.csv");

CSVRecordReader csvReader = new CSVRecordReader();
RecordReaderFunction transformer = new RecordReaderFunction(csvReader, 0, 10);

JavaRDD<DataSet> processedData = fileContents.map(transformer);
processedData.cache(); // Cache for reuse
```
#### Stream Processing
```java
// Streaming data processing
JavaDStream<String> stream = // ... Spark Streaming source
JavaDStream<DataSet> processedStream = stream.map(transformer);

processedStream.foreachRDD(rdd -> {
    // Process each batch
    rdd.collect();
});
```
## Input Format Support

Both specialized input functions support:

- **Binary formats**: Raw bytes, images, sensor data
- **Text formats**: CSV, JSON, XML, custom delimited
- **Hadoop formats**: Text/BytesWritable pairs from Hadoop InputFormats
- **Custom parsing**: Extensible through RecordReader implementations
## Error Handling Patterns

```java
try {
    DataSet result = transformer.call(input);
    // Process successful result
} catch (IllegalStateException e) {
    // Handle configuration errors
    logger.error("Configuration error: " + e.getMessage());
} catch (IOException e) {
    // Handle I/O errors during parsing
    logger.error("I/O error: " + e.getMessage());
} catch (Exception e) {
    // Handle other processing errors
    logger.error("Processing error: " + e.getMessage());
}
```