# Input Sources and Splits

DataVec provides flexible abstractions for specifying data sources through the InputSplit hierarchy. These enable reading from various sources, including local files, distributed file systems, streams, and custom data sources.

## Capabilities

### Core InputSplit Interface

The base interface for all data source specifications. It provides methods for getting source locations, calculating data size, and supporting distributed processing.

```java { .api }
public interface InputSplit {
    URI[] locations();
    long length();
    double getWeight();
    boolean canWriteToLocation(URI location);
    String addNewLocation();
    String addNewLocation(String location);
    void updateSplitLocations(boolean reset);
}
```

**Usage Example:**

```java
InputSplit split = new FileSplit(new File("data.csv"));
URI[] locations = split.locations(); // Array of file URIs
long dataSize = split.length();      // Size in bytes
double weight = split.getWeight();   // Weight for distributed processing
```

### File-Based Splits

#### Single File Split

Handles individual files or explicit lists of files for data input.

```java { .api }
public class FileSplit implements InputSplit {
    public FileSplit(File file);
    public FileSplit(File[] files);
    public FileSplit(String path);
    public FileSplit(URI uri);
    public FileSplit(Collection<URI> uris);
}
```

**Usage Examples:**

```java
// Single file
FileSplit singleFile = new FileSplit(new File("/path/to/data.csv"));

// Multiple files
File[] files = {
    new File("/path/to/file1.csv"),
    new File("/path/to/file2.csv")
};
FileSplit multipleFiles = new FileSplit(files);

// From string path
FileSplit fromPath = new FileSplit("/path/to/data.csv");

// From URI (URI.create avoids the checked URISyntaxException of new URI(...))
URI dataUri = URI.create("file:///path/to/data.csv");
FileSplit fromUri = new FileSplit(dataUri);
```

#### Numbered File Input Split

Handles sequences of numbered files, useful for processing time-series data or batched exports.

```java { .api }
public class NumberedFileInputSplit implements InputSplit {
    public NumberedFileInputSplit(String basePattern, int minIndex, int maxIndex);
    public NumberedFileInputSplit(String basePattern, int minIndex, int maxIndex, String numberFormat);
}
```

**Usage Examples:**

```java
// Files: data_0.csv, data_1.csv, ..., data_99.csv
NumberedFileInputSplit numberedSplit = new NumberedFileInputSplit(
    "/path/to/data_%d.csv", 0, 99
);

// Zero-padded format: data_000.csv, data_001.csv, ..., data_099.csv
NumberedFileInputSplit paddedSplit = new NumberedFileInputSplit(
    "/path/to/data_%03d.csv", 0, 99, "%03d"
);

// Use with a record reader
RecordReader reader = new CSVRecordReader();
reader.initialize(numberedSplit);
```
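
Conceptually, this kind of split just expands the printf-style pattern over the index range. A minimal self-contained sketch of that expansion (the class and method names here are hypothetical, not DataVec API):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: expand a printf-style pattern over an index range,
// the way a numbered split resolves its file paths.
public class NumberedPatternDemo {
    public static List<String> expand(String pattern, int minIdx, int maxIdx) {
        List<String> paths = new ArrayList<>();
        for (int i = minIdx; i <= maxIdx; i++) {
            paths.add(String.format(pattern, i)); // %d, or zero-padded %03d
        }
        return paths;
    }

    public static void main(String[] args) {
        // data_000.csv, data_001.csv, data_002.csv
        System.out.println(expand("data_%03d.csv", 0, 2));
    }
}
```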

### Stream-Based Splits

#### Input Stream Split

Enables reading from Java input streams, useful for network data or custom data sources.

```java { .api }
public class InputStreamInputSplit implements InputSplit {
    public InputStreamInputSplit(InputStream inputStream);
    public InputStreamInputSplit(InputStream inputStream, URI uri);
}
```

**Usage Example:**

```java
// Read from a network stream
URL url = new URL("http://example.com/data.csv");
InputStream networkStream = url.openStream();
InputStreamInputSplit streamSplit = new InputStreamInputSplit(networkStream);

// Use with a record reader
RecordReader reader = new CSVRecordReader();
reader.initialize(streamSplit);
```

#### String Split

Processes string data directly, commonly used in Spark integration and testing scenarios.

```java { .api }
public class StringSplit implements InputSplit {
    public StringSplit(String data);
    public StringSplit(String data, URI uri);
}
```

**Usage Example:**

```java
String csvData = "name,age,score\nAlice,25,85.5\nBob,30,92.0";
StringSplit stringSplit = new StringSplit(csvData);

RecordReader reader = new CSVRecordReader(1, ","); // Skip the header line
reader.initialize(stringSplit);

while (reader.hasNext()) {
    List<Writable> record = reader.next();
    // Process parsed CSV record
}
```
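
Under the hood, a reader consuming a string split walks the data line by line. A self-contained sketch of that parsing (hypothetical names; naive `split`-based parsing with no support for quoted fields):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: skip header lines, then split each remaining line
// on the delimiter. Real CSV readers also handle quoting, which this does not.
public class StringCsvDemo {
    public static List<List<String>> parse(String data, int skipLines, String delimiter) {
        List<List<String>> records = new ArrayList<>();
        String[] lines = data.split("\n");
        for (int i = skipLines; i < lines.length; i++) {
            records.add(Arrays.asList(lines[i].split(delimiter)));
        }
        return records;
    }

    public static void main(String[] args) {
        String csv = "name,age,score\nAlice,25,85.5\nBob,30,92.0";
        System.out.println(parse(csv, 1, ","));
    }
}
```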

### Collection-Based Splits

For in-memory data processing and testing scenarios.

```java { .api }
public class CollectionInputSplit implements InputSplit {
    public CollectionInputSplit(Collection<URI> uris);
}
```

**Usage Example:**

```java
// Create a collection of data URIs
List<URI> dataUris = Arrays.asList(
    new File("file1.csv").toURI(),
    new File("file2.csv").toURI(),
    new File("file3.csv").toURI()
);

CollectionInputSplit collectionSplit = new CollectionInputSplit(dataUris);
```

### Transform Splits

Enable data transformations and filtering during the split phase.

```java { .api }
public class TransformSplit implements InputSplit {
    public TransformSplit(InputSplit inputSplit, Transform transform);
}

public interface Transform {
    String transform(String input);
}
```

**Usage Example:**

```java
// Custom transform to convert to uppercase
// (Transform has a single abstract method, so a lambda works)
Transform upperCaseTransform = input -> input.toUpperCase();

InputSplit originalSplit = new FileSplit(new File("data.csv"));
TransformSplit transformedSplit = new TransformSplit(originalSplit, upperCaseTransform);
```
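
Conceptually, a transform split wraps another split and applies the transform to each location on access. A self-contained sketch of that wrapping (hypothetical names; plain strings stand in for URIs):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical sketch: wrap a list of source locations and apply a
// string transform to each one when the locations are requested.
public class TransformWrapperDemo {
    private final List<String> sourceLocations;
    private final UnaryOperator<String> transform;

    public TransformWrapperDemo(List<String> sourceLocations, UnaryOperator<String> transform) {
        this.sourceLocations = sourceLocations;
        this.transform = transform;
    }

    public List<String> locations() {
        List<String> out = new ArrayList<>();
        for (String loc : sourceLocations) {
            out.add(transform.apply(loc)); // applied on access, not up front
        }
        return out;
    }

    public static void main(String[] args) {
        TransformWrapperDemo split =
                new TransformWrapperDemo(List.of("data.csv", "more.csv"), String::toUpperCase);
        System.out.println(split.locations()); // [DATA.CSV, MORE.CSV]
    }
}
```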

## Split Utilities and Helpers

### Random Split

Randomly divides data into training/test sets or multiple partitions.

```java { .api }
public class RandomSplit {
    public static InputSplit[] split(InputSplit inputSplit, double... weights);
    public static InputSplit[] split(InputSplit inputSplit, Random random, double... weights);
}
```

**Usage Example:**

```java
FileSplit fullDataset = new FileSplit(new File("full_dataset.csv"));

// Split 80% training, 20% testing
InputSplit[] splits = RandomSplit.split(fullDataset, 0.8, 0.2);
InputSplit trainingSplit = splits[0];
InputSplit testingSplit = splits[1];

// Use the splits with separate readers
RecordReader trainingReader = new CSVRecordReader();
trainingReader.initialize(trainingSplit);

RecordReader testingReader = new CSVRecordReader();
testingReader.initialize(testingSplit);
```
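
The idea behind a weight-based random split can be sketched without DataVec: route each item to a partition chosen with probability proportional to that partition's weight. All names here are hypothetical, and the real implementation may partition differently (e.g. by shuffling and slicing):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical sketch: for each item, draw a uniform random number and
// pick the partition whose cumulative weight interval contains it.
public class WeightedSplitDemo {
    public static List<List<String>> split(List<String> items, Random rng, double... weights) {
        List<List<String>> parts = new ArrayList<>();
        for (int i = 0; i < weights.length; i++) {
            parts.add(new ArrayList<>());
        }
        for (String item : items) {
            double r = rng.nextDouble();
            double cumulative = 0.0;
            int chosen = weights.length - 1; // fall through to the last partition
            for (int i = 0; i < weights.length; i++) {
                cumulative += weights[i];
                if (r < cumulative) {
                    chosen = i;
                    break;
                }
            }
            parts.get(chosen).add(item);
        }
        return parts;
    }

    public static void main(String[] args) {
        List<String> files = List.of("a.csv", "b.csv", "c.csv", "d.csv", "e.csv");
        List<List<String>> parts = split(files, new Random(42), 0.8, 0.2);
        System.out.println(parts.get(0).size() + " train / " + parts.get(1).size() + " test");
    }
}
```

Passing a seeded `Random` makes the partitioning reproducible, which is why the two-argument overload above exists.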

### Directory Scanning

Utilities for automatically discovering files in directories.

```java { .api }
public class BaseInputSplit {
    public static InputSplit[] createFromDirectories(File[] directories, String[] allowedFormats);
    public static InputSplit createFromDirectory(File directory, String[] allowedFormats);
}
```

**Usage Example:**

```java
File dataDirectory = new File("/path/to/data");
String[] csvFormats = {"csv", "txt"};

// Create a split from all CSV and TXT files in the directory
InputSplit directorySplit = BaseInputSplit.createFromDirectory(dataDirectory, csvFormats);

// Multiple directories
File[] directories = {
    new File("/path/to/train"),
    new File("/path/to/test")
};
InputSplit[] multipleDirSplits = BaseInputSplit.createFromDirectories(directories, csvFormats);
```
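
Equivalent discovery can be sketched with `java.nio` alone. This is a hypothetical helper, not the DataVec implementation:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch: walk a directory tree and keep only regular files
// whose extension is in the allowed list.
public class DirectoryScanDemo {
    public static List<Path> scan(Path dir, String... allowedExtensions) {
        try (Stream<Path> walk = Files.walk(dir)) {
            return walk.filter(Files::isRegularFile)
                    .filter(p -> {
                        String name = p.getFileName().toString().toLowerCase();
                        for (String ext : allowedExtensions) {
                            if (name.endsWith("." + ext)) {
                                return true;
                            }
                        }
                        return false;
                    })
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempDirectory("scan-demo");
        Files.createFile(tmp.resolve("a.csv"));
        Files.createFile(tmp.resolve("b.txt"));
        Files.createFile(tmp.resolve("c.bin"));
        System.out.println(scan(tmp, "csv", "txt").size()); // prints 2
    }
}
```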

## Advanced Usage Patterns

### Distributed Processing

InputSplits support distributed processing by exposing weight information that frameworks can use for load balancing:

```java
InputSplit split = new FileSplit(new File("large_dataset.csv"));
double weight = split.getWeight(); // Used by distributed frameworks

// In a Spark context
JavaRDD<String> rdd = sparkContext.textFile("hdfs://path/to/data");
// DataVec integrates with Spark through specialized splits
```
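
As a sketch of how a scheduler might consume those weights (hypothetical; not how Spark or DataVec actually schedules work), one simple strategy is to greedily hand each split to the currently least-loaded worker:

```java
import java.util.Arrays;

// Hypothetical sketch: greedy load balancing. Each split's weight is added
// to whichever worker currently carries the least total weight.
public class LoadBalanceDemo {
    public static double[] assign(double[] splitWeights, int numWorkers) {
        double[] load = new double[numWorkers];
        for (double w : splitWeights) {
            int lightest = 0;
            for (int i = 1; i < numWorkers; i++) {
                if (load[i] < load[lightest]) {
                    lightest = i;
                }
            }
            load[lightest] += w;
        }
        return load;
    }

    public static void main(String[] args) {
        // Four splits of uneven weight spread across two workers
        System.out.println(Arrays.toString(assign(new double[]{5, 3, 2, 2}, 2))); // [7.0, 5.0]
    }
}
```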

### Custom InputSplit Implementation

For specialized data sources, implement the InputSplit interface:

```java
public class DatabaseInputSplit implements InputSplit {
    private final String connectionString;
    private final String query;

    public DatabaseInputSplit(String connectionString, String query) {
        this.connectionString = connectionString;
        this.query = query;
    }

    @Override
    public URI[] locations() {
        try {
            return new URI[]{new URI("jdbc:" + connectionString)};
        } catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public long length() {
        // The size of a query result is not known up front
        return -1; // Unknown length
    }

    @Override
    public double getWeight() {
        return 1.0; // Default weight
    }

    // Implement the remaining interface methods...
}
```
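
A complete, compilable miniature of the same idea, using a locally defined stand-in interface instead of the DataVec one so it runs on its own (all names here are hypothetical):

```java
import java.net.URI;

// Stand-in for the subset of the interface used above; not the DataVec type.
interface SimpleSplit {
    URI[] locations();
    long length();
    double getWeight();
}

// Minimal in-memory implementation: fixed URIs and a known byte count.
public class InMemorySplitDemo implements SimpleSplit {
    private final URI[] uris;
    private final long totalBytes;

    public InMemorySplitDemo(URI[] uris, long totalBytes) {
        this.uris = uris;
        this.totalBytes = totalBytes;
    }

    @Override public URI[] locations() { return uris.clone(); }
    @Override public long length() { return totalBytes; }
    // Weight proportional to size; fall back to 1.0 when size is unknown
    @Override public double getWeight() { return totalBytes > 0 ? totalBytes : 1.0; }

    public static void main(String[] args) {
        SimpleSplit split = new InMemorySplitDemo(
                new URI[]{URI.create("file:///tmp/data.csv")}, 1024);
        System.out.println(split.locations().length + " location(s), weight " + split.getWeight());
    }
}
```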

### Error Handling

Common exceptions when working with InputSplits (note that `RecordReader.initialize` declares both `IOException` and `InterruptedException`):

```java
try {
    FileSplit split = new FileSplit(new File("nonexistent.csv"));
    RecordReader reader = new CSVRecordReader();
    reader.initialize(split);
} catch (IOException e) {
    // Handle file-not-found or read errors
    System.err.println("Error reading file: " + e.getMessage());
} catch (InterruptedException e) {
    // Restore the interrupt flag if initialization was interrupted
    Thread.currentThread().interrupt();
} catch (IllegalArgumentException e) {
    // Handle invalid split configuration
    System.err.println("Invalid split configuration: " + e.getMessage());
}
```

### Resource Management

Stream-based splits should close their underlying streams when processing is done; try-with-resources handles the cleanup automatically:

```java
try (InputStream stream = new FileInputStream("data.csv")) {
    InputStreamInputSplit split = new InputStreamInputSplit(stream);
    RecordReader reader = new CSVRecordReader();
    reader.initialize(split);

    // Process data
    while (reader.hasNext()) {
        List<Writable> record = reader.next();
        // Process record
    }
} catch (IOException e) {
    // Handle read or cleanup errors
} catch (InterruptedException e) {
    // initialize(...) also declares InterruptedException
    Thread.currentThread().interrupt();
}
```

## Types

### Core Interfaces

```java { .api }
public interface InputSplit {
    URI[] locations();
    long length();
    double getWeight();
    boolean canWriteToLocation(URI location);
    String addNewLocation();
    String addNewLocation(String location);
    void updateSplitLocations(boolean reset);
}

public interface Transform {
    String transform(String input);
}
```

### InputSplit Implementations

```java { .api }
// File-based splits
public class FileSplit implements InputSplit;
public class NumberedFileInputSplit implements InputSplit;

// Stream-based splits
public class InputStreamInputSplit implements InputSplit;
public class StringSplit implements InputSplit;

// Collection-based splits
public class CollectionInputSplit implements InputSplit;

// Transform splits
public class TransformSplit implements InputSplit;
```

### Utility Classes

```java { .api }
public class RandomSplit {
    public static InputSplit[] split(InputSplit inputSplit, double... weights);
    public static InputSplit[] split(InputSplit inputSplit, Random random, double... weights);
}

public class BaseInputSplit {
    public static InputSplit[] createFromDirectories(File[] directories, String[] allowedFormats);
    public static InputSplit createFromDirectory(File directory, String[] allowedFormats);
}
```