Core API classes and utilities for CDAP application development, providing common data schema definitions, data format abstractions, stream event handling, and byte manipulation utilities.
—
Pluggable format system for converting data between different representations, with built-in support for common formats and extensible architecture for custom formats. The format system provides schema-aware data transformation capabilities essential for data ingestion, processing, and output in various formats.
FormatSpecification defines a format's name, optional schema, and custom settings used for data transformation.
/**
* Specification for a record format: the format name, an optional schema, and
* format-specific settings.
*
* <p>This is the value passed to {@code RecordFormat.initialize(FormatSpecification)}.
*/
public final class FormatSpecification {
/**
* Create a format specification with a schema and settings.
* @param name Format name (for example one of the {@code Formats} constants)
* @param schema Data schema (nullable)
* @param settings Format-specific settings (nullable, defaults to empty map)
*/
public FormatSpecification(String name, Schema schema, Map<String, String> settings);
/**
* Create a format specification with a schema and no format-specific settings.
* @param name Format name
* @param schema Data schema (nullable)
*/
public FormatSpecification(String name, Schema schema);
/**
* Get the format name.
* @return Format name
*/
public String getName();
/**
* Get the format schema.
* @return Schema, or null if none was specified
*/
public Schema getSchema();
/**
* Get the format settings.
* @return Immutable map of settings (never null; empty when no settings were given)
*/
public Map<String, String> getSettings();
}
Usage Examples:
// Basic format specification
Schema userSchema = Schema.recordOf("User",
Schema.Field.of("id", Schema.of(Schema.Type.LONG)),
Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
Schema.Field.of("email", Schema.of(Schema.Type.STRING))
);
FormatSpecification jsonFormat = new FormatSpecification("json", userSchema);
// Format with custom settings
Map<String, String> csvSettings = new HashMap<>();
csvSettings.put("delimiter", ",");
csvSettings.put("header", "true");
csvSettings.put("escape", "\"");
FormatSpecification csvFormat = new FormatSpecification("csv", userSchema, csvSettings);
// Access specification properties
String formatName = csvFormat.getName(); // "csv"
Schema formatSchema = csvFormat.getSchema(); // userSchema
String delimiter = csvFormat.getSettings().get("delimiter"); // ","Base class for implementing custom data format converters with schema validation and configuration.
/**
* Abstract base class for converting input data of type FROM into records of type TO.
*
* <p>Lifecycle: call {@code initialize} once with a specification (schema validation and
* {@code configure} happen there), then call {@code read} for each input.
* @param <FROM> Input data type
* @param <TO> Output data type
*/
public abstract class RecordFormat<FROM, TO> {
/**
* Read input data and convert it to the output type.
* @param input Input data to transform
* @return Transformed data
* @throws UnexpectedFormatException if input cannot be processed (an unchecked exception)
*/
public abstract TO read(FROM input) throws UnexpectedFormatException;
/**
* Get the default schema for this format, if any.
* NOTE(review): presumably used when the specification supplies no schema — confirm.
* @return Default schema, or null if a schema must be provided
*/
protected abstract Schema getDefaultSchema();
/**
* Validate that a schema is compatible with this format.
* @param schema Schema to validate (guaranteed to be a non-null record with at least one field)
* @throws UnsupportedTypeException if the schema is not supported (a checked exception)
*/
protected abstract void validateSchema(Schema schema) throws UnsupportedTypeException;
/**
* Initialize the format from a specification; schema validation and configuration occur here.
* @param formatSpecification Format configuration (nullable)
* @throws UnsupportedTypeException if the specification is not supported
*/
public void initialize(FormatSpecification formatSpecification) throws UnsupportedTypeException;
/**
* Configure the format with settings (called after the schema is set).
* Not abstract, so subclasses override it only when they read settings.
* @param settings Configuration settings
*/
protected void configure(Map<String, String> settings);
/**
* Get the current format schema.
* @return Current schema (available after initialization)
*/
public Schema getSchema();
}
Usage Examples:
// Example CSV format implementation
/**
 * Example {@link RecordFormat} that turns one delimited text line into a {@link StructuredRecord},
 * assigning the i-th column to the i-th schema field via {@code convertAndSet}.
 *
 * <p>Settings read by {@code configure}: {@code delimiter} (matched literally, default
 * {@code ","}, must not be empty) and {@code header} (parsed, but this class does not skip a
 * header line). Quoted fields are not supported. Only simple field types, optionally nullable,
 * are accepted.
 */
public class CsvRecordFormat extends RecordFormat<String, StructuredRecord> {
  private String delimiter = ",";
  // Parsed from the "header" setting. read() sees one line at a time and does not consult it,
  // so skipping a header line is the caller's job.
  private boolean hasHeader = false;

  /**
   * Splits {@code csvLine} on the delimiter and converts each column to its field's type.
   *
   * @param csvLine a single CSV line
   * @return the populated record
   * @throws UnexpectedFormatException if the line is null or blank, or the number of columns
   *     differs from the schema's field count
   */
  @Override
  public StructuredRecord read(String csvLine) throws UnexpectedFormatException {
    if (csvLine == null || csvLine.trim().isEmpty()) {
      throw new UnexpectedFormatException("Empty CSV line");
    }
    // String.split takes a regex: quote the delimiter so "|" or "." is matched literally instead
    // of splitting between every character. Limit -1 keeps trailing empty columns, so "1,a,"
    // yields three values rather than two.
    String[] values = csvLine.split(java.util.regex.Pattern.quote(delimiter), -1);
    Schema schema = getSchema();
    List<Schema.Field> fields = schema.getFields();
    if (values.length != fields.size()) {
      throw new UnexpectedFormatException("CSV field count mismatch");
    }
    StructuredRecord.Builder builder = StructuredRecord.builder(schema);
    for (int i = 0; i < fields.size(); i++) {
      builder.convertAndSet(fields.get(i).getName(), values[i]);
    }
    return builder.build();
  }

  @Override
  protected Schema getDefaultSchema() {
    // CSV has no default schema - must be provided
    return null;
  }

  /**
   * Accepts only schemas whose fields are simple types (nullable unions are unwrapped first).
   *
   * @throws UnsupportedTypeException if any field has a non-simple type
   */
  @Override
  protected void validateSchema(Schema schema) throws UnsupportedTypeException {
    for (Schema.Field field : schema.getFields()) {
      Schema fieldSchema = field.getSchema();
      if (fieldSchema.isNullable()) {
        fieldSchema = fieldSchema.getNonNullable();
      }
      if (!fieldSchema.getType().isSimpleType()) {
        throw new UnsupportedTypeException("CSV format only supports simple types");
      }
    }
  }

  /**
   * Reads the {@code delimiter} and {@code header} settings.
   *
   * @throws IllegalArgumentException if the configured delimiter is empty
   */
  @Override
  protected void configure(Map<String, String> settings) {
    String configuredDelimiter = settings.get("delimiter");
    delimiter = configuredDelimiter == null ? "," : configuredDelimiter;
    if (delimiter.isEmpty()) {
      // An empty pattern would split between every character of the line.
      throw new IllegalArgumentException("CSV delimiter must not be empty");
    }
    hasHeader = Boolean.parseBoolean(settings.getOrDefault("header", "false"));
  }
}
// Usage of custom format: parse one CSV line into a StructuredRecord
Schema productSchema = Schema.recordOf("Product",
    Schema.Field.of("id", Schema.of(Schema.Type.LONG)),
    Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
    Schema.Field.of("price", Schema.of(Schema.Type.DOUBLE)));
Map<String, String> csvSettings = new HashMap<>();
csvSettings.put("delimiter", ",");
csvSettings.put("header", "true");
CsvRecordFormat csvFormat = new CsvRecordFormat();
csvFormat.initialize(new FormatSpecification("csv", productSchema, csvSettings));
// Process CSV data
StructuredRecord product = csvFormat.read("12345,Widget,29.99");
Long id = product.get("id");          // 12345L
String name = product.get("name");    // "Widget"
Double price = product.get("price");  // 29.99
Pre-defined format names for common data formats supported by the platform.
/**
 * Names of the record formats built into the platform, usable as the name of a
 * {@code FormatSpecification}.
 */
public final class Formats {
  /** Avro records. */
  public static final String AVRO = "avro";
  /** Comma-separated values. */
  public static final String CSV = "csv";
  /** Tab-separated values. */
  public static final String TSV = "tsv";
  /** Plain text. */
  public static final String TEXT = "text";
  /** Combined Log Format. */
  public static final String COMBINED_LOG_FORMAT = "clf";
  /** Grok patterns. */
  public static final String GROK = "grok";
  /** Syslog messages. */
  public static final String SYSLOG = "syslog";

  /**
   * Every built-in format name, in declaration order.
   *
   * <p>Java arrays are always mutable and this one is shared, so callers must treat it as
   * read-only (copy it before modifying).
   */
  public static final String[] ALL = new String[] {
      AVRO,
      CSV,
      TSV,
      TEXT,
      COMBINED_LOG_FORMAT,
      GROK,
      SYSLOG,
  };
}
// Usage Examples:
// Using built-in format constants
FormatSpecification avroSpec = new FormatSpecification(Formats.AVRO, schema);
FormatSpecification csvSpec = new FormatSpecification(Formats.CSV, schema);
// Check if format is supported ("json" is not one of the built-in names, so this is false)
String formatName = "json";
boolean isBuiltIn = Arrays.stream(Formats.ALL).anyMatch(formatName::equals);
// Iterate through all supported formats
for (int i = 0; i < Formats.ALL.length; i++) {
  System.out.println("Supported format: " + Formats.ALL[i]);
}
Common pattern for managing multiple format implementations.
/**
 * Maps format names to {@link RecordFormat} implementation classes and instantiates them on
 * demand. The CSV, TSV and text formats are registered at construction.
 *
 * <p>Backed by a plain {@link HashMap}, so concurrent registration and lookup need external
 * synchronization.
 */
public class FormatFactory {
  private final Map<String, Class<? extends RecordFormat<?, ?>>> formatRegistry;

  /** Creates a factory with the built-in formats already registered. */
  public FormatFactory() {
    formatRegistry = new HashMap<>();
    registerBuiltInFormats();
  }

  // Seeds the registry; private (not overridable), so it is safe to call from the constructor.
  private void registerBuiltInFormats() {
    formatRegistry.put(Formats.CSV, CsvRecordFormat.class);
    formatRegistry.put(Formats.TSV, TsvRecordFormat.class);
    formatRegistry.put(Formats.TEXT, TextRecordFormat.class);
    // Register other built-in formats
  }

  /**
   * Creates a new, uninitialized instance of the format registered under {@code formatName}.
   *
   * @param formatName registered format name
   * @param <FROM> input type the caller expects (not checked at runtime)
   * @param <TO> output type the caller expects (not checked at runtime)
   * @return a fresh format instance created through its no-arg constructor
   * @throws IllegalArgumentException if no class is registered under {@code formatName}
   * @throws InstantiationException if the class is abstract, has no no-arg constructor, or its
   *     constructor throws a checked exception (attached as the cause)
   * @throws IllegalAccessException if the no-arg constructor is not accessible
   */
  public <FROM, TO> RecordFormat<FROM, TO> createFormat(String formatName)
      throws InstantiationException, IllegalAccessException {
    Class<? extends RecordFormat<?, ?>> formatClass = formatRegistry.get(formatName);
    if (formatClass == null) {
      throw new IllegalArgumentException("Unknown format: " + formatName);
    }
    Object instance;
    try {
      // Class.newInstance() is deprecated because it rethrows checked constructor exceptions
      // without declaring them; the Constructor API reports them as InvocationTargetException.
      instance = formatClass.getDeclaredConstructor().newInstance();
    } catch (NoSuchMethodException e) {
      throw instantiationFailure(formatName, formatClass, e);
    } catch (java.lang.reflect.InvocationTargetException e) {
      Throwable cause = e.getCause();
      // Unchecked constructor failures propagate unchanged, as they did with Class.newInstance().
      if (cause instanceof RuntimeException) {
        throw (RuntimeException) cause;
      }
      if (cause instanceof Error) {
        throw (Error) cause;
      }
      throw instantiationFailure(formatName, formatClass, cause);
    }
    // Every registered class extends RecordFormat; the type arguments are the caller's claim.
    @SuppressWarnings("unchecked")
    RecordFormat<FROM, TO> format = (RecordFormat<FROM, TO>) instance;
    return format;
  }

  // Builds an InstantiationException that carries the underlying failure as its cause.
  private static InstantiationException instantiationFailure(
      String formatName, Class<?> formatClass, Throwable cause) {
    InstantiationException failure = new InstantiationException(
        "Cannot instantiate format '" + formatName + "' (" + formatClass.getName() + ")");
    failure.initCause(cause);
    return failure;
  }

  /**
   * Registers (or replaces) the implementation class for a format name.
   *
   * @throws NullPointerException if {@code name} or {@code formatClass} is null
   */
  public void registerFormat(String name, Class<? extends RecordFormat<?, ?>> formatClass) {
    java.util.Objects.requireNonNull(name, "name");
    java.util.Objects.requireNonNull(formatClass, "formatClass");
    formatRegistry.put(name, formatClass);
  }

  /**
   * Returns the registered format names as a read-only live view; callers cannot remove
   * registrations through it.
   */
  public Set<String> getSupportedFormats() {
    return java.util.Collections.unmodifiableSet(formatRegistry.keySet());
  }
}
// Usage: look up the CSV implementation through the factory
FormatFactory formatFactory = new FormatFactory();
RecordFormat<String, StructuredRecord> csvFormat =
    formatFactory.createFormat(Formats.CSV);
Example implementations for common data processing scenarios.
JSON Format Implementation:
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
/**
 * Example {@link RecordFormat} that parses a JSON object string into a {@link StructuredRecord}.
 *
 * <p>Each schema field is looked up by name in the top-level JSON object; fields absent from the
 * JSON are left unset. Supported field types (optionally nullable): STRING, INT, LONG, FLOAT,
 * DOUBLE, BOOLEAN. There is no default schema, so one must be supplied by the specification.
 */
public class JsonRecordFormat extends RecordFormat<String, StructuredRecord> {
  private final JsonParser parser = new JsonParser();

  /**
   * Parses {@code jsonString} and copies the values of the schema's fields into a record.
   *
   * @param jsonString JSON text whose top-level value must be an object
   * @return the populated record
   * @throws UnexpectedFormatException wrapping any failure to parse, convert or build; the
   *     original exception is attached as the cause
   */
  @Override
  public StructuredRecord read(String jsonString) throws UnexpectedFormatException {
    try {
      JsonObject jsonObject = parser.parse(jsonString).getAsJsonObject();
      Schema schema = getSchema();
      StructuredRecord.Builder builder = StructuredRecord.builder(schema);
      for (Schema.Field field : schema.getFields()) {
        String fieldName = field.getName();
        if (jsonObject.has(fieldName)) {
          Object value = extractJsonValue(jsonObject.get(fieldName), field.getSchema());
          builder.set(fieldName, value);
        }
      }
      return builder.build();
    } catch (Exception e) {
      throw new UnexpectedFormatException("Invalid JSON: " + jsonString, e);
    }
  }

  // Converts one JSON element to the Java value for the field's (non-null) type; JSON null -> null.
  private Object extractJsonValue(com.google.gson.JsonElement element, Schema schema) {
    if (element.isJsonNull()) {
      return null;
    }
    Schema.Type type = nonNullableType(schema);
    switch (type) {
      case STRING:
        return element.getAsString();
      case INT:
        return element.getAsInt();
      case LONG:
        return element.getAsLong();
      case FLOAT:
        return element.getAsFloat();
      case DOUBLE:
        return element.getAsDouble();
      case BOOLEAN:
        return element.getAsBoolean();
      default:
        throw new UnsupportedOperationException("JSON format doesn't support type: " + type);
    }
  }

  @Override
  protected Schema getDefaultSchema() {
    return null; // JSON requires explicit schema
  }

  /**
   * Rejects schemas containing a field type that {@code extractJsonValue} cannot produce.
   *
   * @throws UnsupportedTypeException naming the first unsupported type found
   */
  @Override
  protected void validateSchema(Schema schema) throws UnsupportedTypeException {
    for (Schema.Field field : schema.getFields()) {
      validateJsonCompatibleType(field.getSchema());
    }
  }

  // Must accept exactly the types handled by extractJsonValue.
  private void validateJsonCompatibleType(Schema schema) throws UnsupportedTypeException {
    Schema.Type type = nonNullableType(schema);
    switch (type) {
      case STRING:
      case INT:
      case LONG:
      case FLOAT:
      case DOUBLE:
      case BOOLEAN:
        break; // Supported
      default:
        throw new UnsupportedTypeException("JSON format doesn't support type: " + type);
    }
  }

  // Unwraps a nullable union so type dispatch sees the underlying non-null type.
  private static Schema.Type nonNullableType(Schema schema) {
    return schema.isNullable() ? schema.getNonNullable().getType() : schema.getType();
  }
}
Binary Format Implementation:
import java.nio.ByteBuffer;
/**
 * Example {@link RecordFormat} that decodes schema fields, in declaration order, from a
 * {@link ByteBuffer} (multi-byte values use the buffer's current byte order).
 *
 * <p>Encoding per field:
 * <ul>
 *   <li>Nullable fields are preceded by a one-byte marker: 0 means null (nothing follows); any
 *       other value means the value follows.</li>
 *   <li>INT, LONG, FLOAT and DOUBLE use their fixed-width encodings. BOOLEAN is one byte
 *       (non-zero is true).</li>
 *   <li>STRING is a 4-byte length followed by that many UTF-8 bytes.</li>
 * </ul>
 * The buffer's position advances past everything read.
 */
public class BinaryRecordFormat extends RecordFormat<ByteBuffer, StructuredRecord> {
  /**
   * Reads one record from {@code input}.
   *
   * @throws UnexpectedFormatException wrapping any decoding failure (underflow, bad length,
   *     unsupported type); the original exception is attached as the cause
   */
  @Override
  public StructuredRecord read(ByteBuffer input) throws UnexpectedFormatException {
    try {
      Schema schema = getSchema();
      StructuredRecord.Builder builder = StructuredRecord.builder(schema);
      for (Schema.Field field : schema.getFields()) {
        Object value = readFieldValue(input, field.getSchema());
        builder.set(field.getName(), value);
      }
      return builder.build();
    } catch (Exception e) {
      throw new UnexpectedFormatException("Failed to parse binary data", e);
    }
  }

  // Decodes a single field value; consumes the null marker first for nullable schemas.
  private Object readFieldValue(ByteBuffer buffer, Schema schema) {
    Schema.Type type = schema.getType();
    if (schema.isNullable()) {
      boolean isNull = buffer.get() == 0;
      if (isNull) {
        return null;
      }
      type = schema.getNonNullable().getType();
    }
    switch (type) {
      case INT:
        return buffer.getInt();
      case LONG:
        return buffer.getLong();
      case FLOAT:
        return buffer.getFloat();
      case DOUBLE:
        return buffer.getDouble();
      case BOOLEAN:
        return buffer.get() != 0;
      case STRING:
        return readString(buffer);
      default:
        throw new UnsupportedOperationException("Binary format doesn't support type: " + type);
    }
  }

  // Reads a 4-byte length prefix followed by that many bytes, decoded as UTF-8.
  private static String readString(ByteBuffer buffer) {
    int length = buffer.getInt();
    // Validate before allocating: a corrupt or hostile prefix must not trigger a huge allocation
    // (an OutOfMemoryError is an Error and would escape read()'s catch of Exception).
    if (length < 0 || length > buffer.remaining()) {
      throw new UnexpectedFormatException(
          "Invalid string length " + length + " with " + buffer.remaining() + " bytes remaining");
    }
    byte[] stringBytes = new byte[length];
    buffer.get(stringBytes);
    // Fixed charset so decoding does not depend on the JVM's platform default.
    return new String(stringBytes, java.nio.charset.StandardCharsets.UTF_8);
  }

  @Override
  protected Schema getDefaultSchema() {
    return null; // Binary requires explicit schema
  }

  /**
   * Rejects schemas containing a field type that {@code readFieldValue} cannot decode.
   *
   * @throws UnsupportedTypeException naming the first unsupported type found
   */
  @Override
  protected void validateSchema(Schema schema) throws UnsupportedTypeException {
    for (Schema.Field field : schema.getFields()) {
      validateBinaryCompatibleType(field.getSchema());
    }
  }

  // Lists exactly the types readFieldValue decodes, so a schema that validates can actually be read.
  private void validateBinaryCompatibleType(Schema schema) throws UnsupportedTypeException {
    Schema.Type type = schema.isNullable() ? schema.getNonNullable().getType() : schema.getType();
    switch (type) {
      case BOOLEAN:
      case INT:
      case LONG:
      case FLOAT:
      case DOUBLE:
      case STRING:
        return;
      default:
        throw new UnsupportedTypeException("Binary format doesn't support type: " + type);
    }
  }
}
// Manual initialization process
// Explicit lifecycle: construct, build a FormatSpecification, initialize, then read.
RecordFormat<String, StructuredRecord> format = new CsvRecordFormat();
// Create specification
Schema schema = Schema.recordOf("Data",
Schema.Field.of("id", Schema.of(Schema.Type.INT)),
Schema.Field.of("name", Schema.of(Schema.Type.STRING))
);
Map<String, String> settings = new HashMap<>();
// NOTE(review): "|" is a regex metacharacter; a format that hands the delimiter straight to
// String.split must quote it (Pattern.quote) for this example to split into two columns.
settings.put("delimiter", "|");
// The CsvRecordFormat example's configure() reads only "delimiter" and "header", so "quote"
// has no effect there.
settings.put("quote", "\"");
FormatSpecification spec = new FormatSpecification("csv", schema, settings);
// Initialize (schema validation and configuration occur here)
format.initialize(spec);
// Now ready to use
StructuredRecord record = format.read("123|John Doe");
// Separate snippet: handling initialization and read failures.
// NOTE(review): `specification` and `inputData` are placeholders not defined in this snippet,
// and its `record` would clash with the one above if both snippets shared a scope.
try {
format.initialize(specification);
} catch (UnsupportedTypeException e) {
// Handle schema validation failure
System.err.println("Schema not supported: " + e.getMessage());
} catch (Exception e) {
// Handle other initialization errors
System.err.println("Initialization failed: " + e.getMessage());
}
// UnexpectedFormatException is unchecked (see below), so this try/catch is optional.
try {
StructuredRecord record = format.read(inputData);
} catch (UnexpectedFormatException e) {
// Handle data parsing failure
System.err.println("Failed to parse data: " + e.getMessage());
}
/**
* Exception indicating data is in an unexpected format.
* Unchecked (extends RuntimeException): thrown by RecordFormat.read.
*/
public class UnexpectedFormatException extends RuntimeException {
public UnexpectedFormatException(String message);
public UnexpectedFormatException(String message, Throwable cause);
public UnexpectedFormatException(Throwable cause);
}
/**
* Exception indicating an unsupported schema type.
* Checked (extends Exception): thrown by validateSchema and initialize.
*/
public class UnsupportedTypeException extends Exception {
public UnsupportedTypeException(String message);
public UnsupportedTypeException(String message, Throwable cause);
public UnsupportedTypeException(Throwable cause);
}
Install with Tessl CLI
npx tessl i tessl/maven-co-cask-cdap--cdap-api-common