ETL library for machine learning data preprocessing across diverse formats including HDFS, Spark, Images, Video, Audio, CSV, and Excel
—
DataVec provides flexible abstractions for specifying data sources through the InputSplit hierarchy. These enable reading from various sources including local files, distributed file systems, streams, and custom data sources.
The base interface for all data source specifications. Provides methods for getting source locations, calculating data size, and supporting distributed processing.
public interface InputSplit {
URI[] locations();
long length();
double getWeight();
boolean canWriteToLocation(URI location);
String addNewLocation();
String addNewLocation(String location);
void updateSplitLocations(boolean reset);
}

Usage Example:
InputSplit split = new FileSplit(new File("data.csv"));
URI[] locations = split.locations(); // Array of file URIs
long dataSize = split.length(); // Size in bytes
double weight = split.getWeight(); // Weight for distributed processing

Handles individual files for data input.
public class FileSplit implements InputSplit {
public FileSplit(File file);
public FileSplit(File[] files);
public FileSplit(String path);
public FileSplit(URI uri);
public FileSplit(Collection<URI> uris);
}

Usage Examples:
// Single file
FileSplit singleFile = new FileSplit(new File("/path/to/data.csv"));
// Multiple files
File[] files = {
new File("/path/to/file1.csv"),
new File("/path/to/file2.csv")
};
FileSplit multipleFiles = new FileSplit(files);
// From string path
FileSplit fromPath = new FileSplit("/path/to/data.csv");
// From URI
URI dataUri = new URI("file:///path/to/data.csv");
FileSplit fromUri = new FileSplit(dataUri);

Handles sequences of numbered files, useful for processing time-series data or batched exports.
public class NumberedFileInputSplit implements InputSplit {
public NumberedFileInputSplit(String basePattern, int minIndex, int maxIndex);
public NumberedFileInputSplit(String basePattern, int minIndex, int maxIndex, String numberFormat);
}

Usage Examples:
// Files: data_0.csv, data_1.csv, ..., data_99.csv
NumberedFileInputSplit numberedSplit = new NumberedFileInputSplit(
"/path/to/data_%d.csv", 0, 99
);
// Custom number format: data_000.csv, data_001.csv, ..., data_099.csv
NumberedFileInputSplit paddedSplit = new NumberedFileInputSplit(
"/path/to/data_%03d.csv", 0, 99, "%03d"
);
// Use with record reader
RecordReader reader = new CSVRecordReader();
reader.initialize(numberedSplit);

Enables reading from Java input streams, useful for network data or custom data sources.
public class InputStreamInputSplit implements InputSplit {
public InputStreamInputSplit(InputStream inputStream);
public InputStreamInputSplit(InputStream inputStream, URI uri);
}

Usage Example:
// Read from network stream
URL url = new URL("http://example.com/data.csv");
InputStream networkStream = url.openStream();
InputStreamInputSplit streamSplit = new InputStreamInputSplit(networkStream);
// Use with record reader
RecordReader reader = new CSVRecordReader();
reader.initialize(streamSplit);

Processes string data directly, commonly used in Spark integration and testing scenarios.
public class StringSplit implements InputSplit {
public StringSplit(String data);
public StringSplit(String data, URI uri);
}

Usage Example:
String csvData = "name,age,score\nAlice,25,85.5\nBob,30,92.0";
StringSplit stringSplit = new StringSplit(csvData);
RecordReader reader = new CSVRecordReader(1, ","); // Skip header
reader.initialize(stringSplit);
while (reader.hasNext()) {
List<Writable> record = reader.next();
// Process parsed CSV record
}

For in-memory data processing and testing scenarios.
public class CollectionInputSplit implements InputSplit {
public CollectionInputSplit(Collection<URI> uris);
}

Usage Example:
// Create collection of data URIs
List<URI> dataUris = Arrays.asList(
new File("file1.csv").toURI(),
new File("file2.csv").toURI(),
new File("file3.csv").toURI()
);
CollectionInputSplit collectionSplit = new CollectionInputSplit(dataUris);

Enables data transformations and filtering during the split phase.
public class TransformSplit implements InputSplit {
public TransformSplit(InputSplit inputSplit, Transform transform);
}
public interface Transform {
String transform(String input);
}

Usage Example:
// Custom transform to convert to uppercase
Transform upperCaseTransform = new Transform() {
@Override
public String transform(String input) {
return input.toUpperCase();
}
};
InputSplit originalSplit = new FileSplit(new File("data.csv"));
TransformSplit transformedSplit = new TransformSplit(originalSplit, upperCaseTransform);

Randomly divides data into training/test sets or multiple partitions.
public class RandomSplit {
public static InputSplit[] split(InputSplit inputSplit, double... weights);
public static InputSplit[] split(InputSplit inputSplit, Random random, double... weights);
}

Usage Example:
FileSplit fullDataset = new FileSplit(new File("full_dataset.csv"));
// Split 80% training, 20% testing
InputSplit[] splits = RandomSplit.split(fullDataset, 0.8, 0.2);
InputSplit trainingSplit = splits[0];
InputSplit testingSplit = splits[1];
// Use splits with different readers
RecordReader trainingReader = new CSVRecordReader();
trainingReader.initialize(trainingSplit);
RecordReader testingReader = new CSVRecordReader();
testingReader.initialize(testingSplit);

Utilities for automatically discovering files in directories.
public class BaseInputSplit {
public static InputSplit[] createFromDirectories(File[] directories, String[] allowedFormats);
public static InputSplit createFromDirectory(File directory, String[] allowedFormats);
}

Usage Example:
File dataDirectory = new File("/path/to/data");
String[] csvFormats = {"csv", "txt"};
// Create split from all CSV files in directory
InputSplit directorySplit = BaseInputSplit.createFromDirectory(dataDirectory, csvFormats);
// Multiple directories
File[] directories = {
new File("/path/to/train"),
new File("/path/to/test")
};
InputSplit[] multipleDirSplits = BaseInputSplit.createFromDirectories(directories, csvFormats);

InputSplits support distributed processing by providing weight information for load balancing:
InputSplit split = new FileSplit(new File("large_dataset.csv"));
double weight = split.getWeight(); // Used by distributed frameworks
// In Spark context
JavaRDD<String> rdd = sparkContext.textFile("hdfs://path/to/data");
// DataVec integrates with Spark through specialized splits

For specialized data sources, implement the InputSplit interface:
public class DatabaseInputSplit implements InputSplit {
private final String connectionString;
private final String query;
public DatabaseInputSplit(String connectionString, String query) {
this.connectionString = connectionString;
this.query = query;
}
@Override
public URI[] locations() {
try {
return new URI[]{new URI("jdbc:" + connectionString)};
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
@Override
public long length() {
// Return estimated size
return -1; // Unknown length
}
@Override
public double getWeight() {
return 1.0; // Default weight
}
// Implement other required methods...
}

Common exceptions when working with InputSplits:
try {
FileSplit split = new FileSplit(new File("nonexistent.csv"));
RecordReader reader = new CSVRecordReader();
reader.initialize(split);
} catch (IOException e) {
// Handle file not found or read errors
System.err.println("Error reading file: " + e.getMessage());
} catch (IllegalArgumentException e) {
// Handle invalid split configuration
System.err.println("Invalid split configuration: " + e.getMessage());
}

Proper resource cleanup for stream-based splits:
InputStream stream = null;
try {
stream = new FileInputStream("data.csv");
InputStreamInputSplit split = new InputStreamInputSplit(stream);
RecordReader reader = new CSVRecordReader();
reader.initialize(split);
// Process data
while (reader.hasNext()) {
List<Writable> record = reader.next();
// Process record
}
} catch (IOException e) {
// Handle errors
} finally {
if (stream != null) {
try {
stream.close();
} catch (IOException e) {
// Handle cleanup errors
}
}
}

public interface InputSplit {
URI[] locations();
long length();
double getWeight();
boolean canWriteToLocation(URI location);
String addNewLocation();
String addNewLocation(String location);
void updateSplitLocations(boolean reset);
}
public interface Transform {
String transform(String input);
}

// File-based splits
public class FileSplit implements InputSplit;
public class NumberedFileInputSplit implements InputSplit;
// Stream-based splits
public class InputStreamInputSplit implements InputSplit;
public class StringSplit implements InputSplit;
// Collection-based splits
public class CollectionInputSplit implements InputSplit;
// Transform splits
public class TransformSplit implements InputSplit;

public class RandomSplit {
public static InputSplit[] split(InputSplit inputSplit, double... weights);
public static InputSplit[] split(InputSplit inputSplit, Random random, double... weights);
}
public class BaseInputSplit {
public static InputSplit[] createFromDirectories(File[] directories, String[] allowedFormats);
public static InputSplit createFromDirectory(File directory, String[] allowedFormats);
}

Install with Tessl CLI
npx tessl i tessl/maven-org-datavec--datavec-api