
tessl/maven-co-cask-cdap--cdap-api-common

Core API classes and utilities for CDAP application development, providing common data schema definitions, data format abstractions, stream event handling, and byte manipulation utilities

Data Format System

A pluggable system for converting data between representations, most commonly from raw input (a text line, a byte buffer) into a StructuredRecord that conforms to a Schema. Built-in formats are referenced by name through the Formats constants, and custom formats are added by extending RecordFormat. Because every format is initialized with a schema, it can check up front that it supports that schema and then convert each input to match it.

Capabilities

Format Specification

Define format configurations including name, schema, and custom settings for data transformation.

/**
 * Specification for record format including schema and settings
 */
public final class FormatSpecification {
    /**
     * Create format specification with schema and settings
     * @param name Format name
     * @param schema Data schema (nullable)
     * @param settings Format-specific settings (nullable, defaults to empty map)
     */
    public FormatSpecification(String name, Schema schema, Map<String, String> settings);
    
    /**
     * Create format specification with schema only
     * @param name Format name
     * @param schema Data schema (nullable)
     */
    public FormatSpecification(String name, Schema schema);
    
    /**
     * Get format name
     * @return Format name
     */
    public String getName();
    
    /**
     * Get format schema
     * @return Schema or null if not specified
     */
    public Schema getSchema();
    
    /**
     * Get format settings
     * @return Immutable map of settings (never null)
     */
    public Map<String, String> getSettings();
}

Usage Examples:

// Basic format specification
Schema userSchema = Schema.recordOf("User",
    Schema.Field.of("id", Schema.of(Schema.Type.LONG)),
    Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
    Schema.Field.of("email", Schema.of(Schema.Type.STRING))
);

FormatSpecification jsonFormat = new FormatSpecification("json", userSchema);

// Format with custom settings
Map<String, String> csvSettings = new HashMap<>();
csvSettings.put("delimiter", ",");
csvSettings.put("header", "true");
csvSettings.put("escape", "\"");

FormatSpecification csvFormat = new FormatSpecification("csv", userSchema, csvSettings);

// Access specification properties
String formatName = csvFormat.getName();           // "csv"
Schema formatSchema = csvFormat.getSchema();       // userSchema
String delimiter = csvFormat.getSettings().get("delimiter"); // ","
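Because settings arrive as a flat `Map<String, String>`, each format has to parse and validate its own values. A minimal sketch of a helper that centralizes this, assuming strict parsing is wanted (`Boolean.parseBoolean` silently maps any unrecognized value, such as `"yes"`, to `false`). `SettingsReader` and its methods are hypothetical and not part of the CDAP API.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical helper (not part of CDAP): typed, validated access to format settings.
final class SettingsReader {
    private final Map<String, String> settings;

    SettingsReader(Map<String, String> settings) {
        // Treat a null map like an empty one, mirroring FormatSpecification's default.
        this.settings = settings == null ? Collections.<String, String>emptyMap() : settings;
    }

    String getString(String key, String defaultValue) {
        String value = settings.get(key);
        return value == null ? defaultValue : value;
    }

    // Strict parse: only "true" or "false" (any case) are accepted.
    boolean getBoolean(String key, boolean defaultValue) {
        String value = settings.get(key);
        if (value == null) {
            return defaultValue;
        }
        if (value.equalsIgnoreCase("true")) {
            return true;
        }
        if (value.equalsIgnoreCase("false")) {
            return false;
        }
        throw new IllegalArgumentException("Setting '" + key + "' must be true or false, got: " + value);
    }

    // Requires exactly one character, e.g. for a delimiter or escape setting.
    char getChar(String key, char defaultValue) {
        String value = settings.get(key);
        if (value == null) {
            return defaultValue;
        }
        if (value.length() != 1) {
            throw new IllegalArgumentException("Setting '" + key + "' must be one character, got: " + value);
        }
        return value.charAt(0);
    }
}
```

Calling such a helper from `configure()` makes a bad setting fail during initialization rather than on the first `read()`.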

Abstract Record Format

Base class for implementing custom data format converters with schema validation and configuration.

/**
 * Abstract base class for data format conversion
 * @param <FROM> Input data type
 * @param <TO> Output data type
 */
public abstract class RecordFormat<FROM, TO> {
    /**
     * Read input data and convert to output format
     * @param input Input data to transform
     * @return Transformed data
     * @throws UnexpectedFormatException if input cannot be processed
     */
    public abstract TO read(FROM input) throws UnexpectedFormatException;
    
    /**
     * Get default schema for this format (if any)
     * @return Default schema or null if schema must be provided
     */
    protected abstract Schema getDefaultSchema();
    
    /**
     * Validate schema compatibility with this format
     * @param schema Schema to validate (guaranteed non-null record with ≥1 field)
     * @throws UnsupportedTypeException if schema not supported
     */
    protected abstract void validateSchema(Schema schema) throws UnsupportedTypeException;
    
    /**
     * Initialize format with specification
     * @param formatSpecification Format configuration (nullable)
     * @throws UnsupportedTypeException if specification not supported
     */
    public void initialize(FormatSpecification formatSpecification) throws UnsupportedTypeException;
    
    /**
     * Configure format with settings (called after schema is set)
     * @param settings Configuration settings
     */
    protected void configure(Map<String, String> settings);
    
    /**
     * Get current format schema
     * @return Current schema (available after initialization)
     */
    public Schema getSchema();
}

Usage Examples:

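// Example CSV format implementation (CDAP imports assume the co.cask.cdap.api packages)
import co.cask.cdap.api.data.format.RecordFormat;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.format.UnexpectedFormatException;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.data.schema.UnsupportedTypeException;

import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

public class CsvRecordFormat extends RecordFormat<String, StructuredRecord> {
    private String delimiter = ",";
    // Parsed in configure() but not used by read(), which handles a single line;
    // skipping a header line is left to the caller
    private boolean hasHeader = false;
    
    @Override
    public StructuredRecord read(String csvLine) throws UnexpectedFormatException {
        if (csvLine == null || csvLine.trim().isEmpty()) {
            throw new UnexpectedFormatException("Empty CSV line");
        }
        
        // String.split takes a regex, so quote the delimiter ("|" would otherwise match everywhere);
        // the -1 limit keeps trailing empty fields. Quoted values are not supported.
        String[] values = csvLine.split(Pattern.quote(delimiter), -1);
        Schema schema = getSchema();
        List<Schema.Field> fields = schema.getFields();
        
        if (values.length != fields.size()) {
            throw new UnexpectedFormatException("CSV field count mismatch: expected "
                + fields.size() + ", got " + values.length);
        }
        
        StructuredRecord.Builder builder = StructuredRecord.builder(schema);
        for (int i = 0; i < fields.size(); i++) {
            Schema.Field field = fields.get(i);
            // Converts the string to the field's schema type and sets it
            builder.convertAndSet(field.getName(), values[i]);
        }
        
        return builder.build();
    }
    
    @Override
    protected Schema getDefaultSchema() {
        // CSV has no default schema - must be provided
        return null;
    }
    
    @Override
    protected void validateSchema(Schema schema) throws UnsupportedTypeException {
        // CSV only supports simple types
        for (Schema.Field field : schema.getFields()) {
            Schema fieldSchema = field.getSchema();
            if (fieldSchema.isNullable()) {
                fieldSchema = fieldSchema.getNonNullable();
            }
            if (!fieldSchema.getType().isSimpleType()) {
                throw new UnsupportedTypeException("CSV format only supports simple types");
            }
        }
    }
    
    @Override
    protected void configure(Map<String, String> settings) {
        delimiter = settings.getOrDefault("delimiter", ",");
        hasHeader = Boolean.parseBoolean(settings.getOrDefault("header", "false"));
    }
}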

// Usage of custom format
CsvRecordFormat csvFormat = new CsvRecordFormat();

Schema productSchema = Schema.recordOf("Product",
    Schema.Field.of("id", Schema.of(Schema.Type.LONG)),
    Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
    Schema.Field.of("price", Schema.of(Schema.Type.DOUBLE))
);

Map<String, String> csvSettings = new HashMap<>();
csvSettings.put("delimiter", ",");
// "header" is parsed by configure() but not acted on: read() treats its input as a data line
csvSettings.put("header", "false");

FormatSpecification spec = new FormatSpecification("csv", productSchema, csvSettings);
csvFormat.initialize(spec);

// Process CSV data
String csvLine = "12345,Widget,29.99";
StructuredRecord product = csvFormat.read(csvLine);

Long id = product.get("id");          // 12345L
String name = product.get("name");    // "Widget"
Double price = product.get("price");  // 29.99
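`String.split` interprets its argument as a regular expression and drops trailing empty strings, which matters for any delimiter-based format such as the CSV example above. A minimal sketch of a reusable splitter that compiles the quoted delimiter once (for example in `configure()`) instead of on every `read()` call; `LiteralSplitter` is a hypothetical helper, and like the example above it does not handle quoted fields.

```java
import java.util.regex.Pattern;

// Hypothetical helper: splits on a literal delimiter; the pattern is compiled once and reused.
final class LiteralSplitter {
    private final Pattern pattern;

    LiteralSplitter(String delimiter) {
        // Pattern.quote makes regex metacharacters such as '|' or '.' match literally.
        this.pattern = Pattern.compile(Pattern.quote(delimiter));
    }

    String[] split(String line) {
        // A negative limit keeps trailing empty fields, so "a,b," yields three values.
        return pattern.split(line, -1);
    }
}
```

For comparison, `"123|John Doe".split("|")` splits between every character, while `new LiteralSplitter("|").split("123|John Doe")` returns `["123", "John Doe"]`.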

Built-in Format Constants

Pre-defined format names for common data formats supported by the platform.

/**
 * Constants for built-in record format names
 */
public final class Formats {
    public static final String AVRO = "avro";
    public static final String CSV = "csv";
    public static final String TSV = "tsv";
    public static final String TEXT = "text";
    public static final String COMBINED_LOG_FORMAT = "clf";
    public static final String GROK = "grok";
    public static final String SYSLOG = "syslog";
    
    /**
     * Array of all built-in format names
     */
    public static final String[] ALL = {
        AVRO, CSV, TSV, TEXT, COMBINED_LOG_FORMAT, GROK, SYSLOG
    };
}

Usage Examples:

// Using built-in format constants (schema is any record Schema, e.g. userSchema above)
FormatSpecification avroSpec = new FormatSpecification(Formats.AVRO, schema);
FormatSpecification csvSpec = new FormatSpecification(Formats.CSV, schema);

// Check whether a name is a built-in format (uses java.util.Arrays)
String formatName = "json";
boolean isBuiltIn = Arrays.asList(Formats.ALL).contains(formatName); // false: "json" is not in Formats.ALL

// Iterate through all built-in format names
for (String format : Formats.ALL) {
    System.out.println("Built-in format: " + format);
}
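`Arrays.asList(Formats.ALL).contains(...)` builds a list view and does a case-sensitive linear scan on every call. A minimal sketch of a reusable lookup, assuming that format names should match case-insensitively and ignoring surrounding whitespace (both assumptions; the constants themselves are lowercase). The names are copied from the `Formats` listing above so the snippet compiles without CDAP on the classpath; in real code the array would be `Formats.ALL`, and `BuiltInFormats` is hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical helper; the array mirrors Formats.ALL from the listing above.
final class BuiltInFormats {
    private static final String[] ALL = {"avro", "csv", "tsv", "text", "clf", "grok", "syslog"};

    // Built once; unmodifiable so callers cannot alter it.
    private static final Set<String> NAMES =
        Collections.unmodifiableSet(new HashSet<>(Arrays.asList(ALL)));

    private BuiltInFormats() {}

    static boolean isBuiltIn(String formatName) {
        // Locale.ROOT keeps the lowercase conversion independent of the JVM's default locale.
        return formatName != null && NAMES.contains(formatName.trim().toLowerCase(Locale.ROOT));
    }
}
```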

Format Factory Pattern

A common application-side pattern (not part of the CDAP API) for looking up format implementations by name.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class FormatFactory {
    private final Map<String, Class<? extends RecordFormat<?, ?>>> formatRegistry;
    
    public FormatFactory() {
        formatRegistry = new HashMap<>();
        registerBuiltInFormats();
    }
    
    private void registerBuiltInFormats() {
        formatRegistry.put(Formats.CSV, CsvRecordFormat.class);
        // TsvRecordFormat and TextRecordFormat are hypothetical classes written like CsvRecordFormat
        formatRegistry.put(Formats.TSV, TsvRecordFormat.class);
        formatRegistry.put(Formats.TEXT, TextRecordFormat.class);
        // Register other format implementations here
    }
    
    // Returns a new, uninitialized instance; call initialize() before read()
    public <FROM, TO> RecordFormat<FROM, TO> createFormat(String formatName)
            throws ReflectiveOperationException {
        Class<? extends RecordFormat<?, ?>> formatClass = formatRegistry.get(formatName);
        if (formatClass == null) {
            throw new IllegalArgumentException("Unknown format: " + formatName);
        }
        
        // Class.newInstance() is deprecated since Java 9; this needs an accessible no-arg constructor.
        // The cast is unchecked: FROM/TO must match the type arguments of the registered class.
        @SuppressWarnings("unchecked")
        RecordFormat<FROM, TO> format =
            (RecordFormat<FROM, TO>) formatClass.getDeclaredConstructor().newInstance();
        return format;
    }
    
    public void registerFormat(String name, Class<? extends RecordFormat<?, ?>> formatClass) {
        formatRegistry.put(name, formatClass);
    }
    
    public Set<String> getSupportedFormats() {
        // Read-only view so callers cannot modify the registry
        return Collections.unmodifiableSet(formatRegistry.keySet());
    }
}

// Usage
FormatFactory factory = new FormatFactory();
RecordFormat<String, StructuredRecord> csvFormat = factory.createFormat(Formats.CSV);
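A reflective factory needs an accessible no-argument constructor on every registered class and reports a missing one only at runtime. A minimal alternative sketch registers factories (`java.util.function.Supplier`) instead of classes, so the compiler checks that each entry can actually be constructed. `SimpleFormat`, `UpperCaseFormat`, and `SupplierFormatRegistry` are hypothetical stand-ins so the snippet compiles without CDAP; real code would register `RecordFormat<FROM, TO>` implementations.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

// Stand-in for RecordFormat<FROM, TO>, reduced to the read step.
interface SimpleFormat<FROM, TO> {
    TO read(FROM input);
}

// Hypothetical example format.
final class UpperCaseFormat implements SimpleFormat<String, String> {
    @Override
    public String read(String input) {
        return input.toUpperCase(Locale.ROOT);
    }
}

final class SupplierFormatRegistry {
    private final Map<String, Supplier<? extends SimpleFormat<?, ?>>> factories = new HashMap<>();

    void register(String name, Supplier<? extends SimpleFormat<?, ?>> factory) {
        factories.put(name, factory);
    }

    // Returns a fresh instance per call, since formats hold per-use state (schema, settings).
    SimpleFormat<?, ?> create(String name) {
        Supplier<? extends SimpleFormat<?, ?>> factory = factories.get(name);
        if (factory == null) {
            throw new IllegalArgumentException("Unknown format: " + name);
        }
        return factory.get();
    }

    Set<String> names() {
        return Collections.unmodifiableSet(factories.keySet());
    }
}
```

Registration then reads `registry.register("upper", UpperCaseFormat::new)`. The cast to concrete type arguments at the call site remains unchecked, as in the reflective version.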

Advanced Format Implementations

Example implementations for common data processing scenarios.

JSON Format Implementation:

import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

public class JsonRecordFormat extends RecordFormat<String, StructuredRecord> {
    // No parser field needed: JsonParser.parseString is static (Gson 2.8.6+),
    // and the instance method JsonParser.parse is deprecated
    
    @Override
    public StructuredRecord read(String jsonString) throws UnexpectedFormatException {
        try {
            JsonObject jsonObject = JsonParser.parseString(jsonString).getAsJsonObject();
            Schema schema = getSchema();
            StructuredRecord.Builder builder = StructuredRecord.builder(schema);
            
            for (Schema.Field field : schema.getFields()) {
                String fieldName = field.getName();
                // Fields absent from the JSON stay unset; build() is expected to reject missing non-nullable fields
                if (jsonObject.has(fieldName)) {
                    Object value = extractJsonValue(jsonObject.get(fieldName), field.getSchema());
                    builder.set(fieldName, value);
                }
            }
            
            return builder.build();
        } catch (Exception e) {
            // Covers malformed JSON, a non-object top-level value, and type mismatches
            throw new UnexpectedFormatException("Could not read JSON record: " + jsonString, e);
        }
    }
    
    private Object extractJsonValue(JsonElement element, Schema schema) {
        if (element.isJsonNull()) {
            return null;
        }
        
        Schema.Type type = schema.getType();
        if (schema.isNullable()) {
            type = schema.getNonNullable().getType();
        }
        
        switch (type) {
            case STRING:
                return element.getAsString();
            case INT:
                return element.getAsInt();
            case LONG:
                return element.getAsLong();
            case DOUBLE:
                return element.getAsDouble();
            case BOOLEAN:
                return element.getAsBoolean();
            default:
                throw new UnsupportedOperationException("JSON format doesn't support type: " + type);
        }
    }
    
    @Override
    protected Schema getDefaultSchema() {
        return null; // JSON requires explicit schema
    }
    
    @Override
    protected void validateSchema(Schema schema) throws UnsupportedTypeException {
        // Validate JSON-compatible schema
        for (Schema.Field field : schema.getFields()) {
            validateJsonCompatibleType(field.getSchema());
        }
    }
    
    private void validateJsonCompatibleType(Schema schema) throws UnsupportedTypeException {
        Schema.Type type = schema.getType();
        if (schema.isNullable()) {
            type = schema.getNonNullable().getType();
        }
        
        switch (type) {
            case STRING:
            case INT:
            case LONG:
            case DOUBLE:
            case BOOLEAN:
                break; // Supported
            default:
                throw new UnsupportedTypeException("JSON format doesn't support type: " + type);
        }
    }
}
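`extractJsonValue` relies on Gson's `getAsInt()` and `getAsLong()`, which, as far as we know, convert through `Number.intValue()`/`longValue()` and can therefore silently truncate a value such as `3.7` to `3` or wrap one that overflows. If a format should reject such input instead, one option is `BigDecimal`'s exact conversions. A minimal sketch working on the number's text (for example `element.getAsString()`); `StrictNumbers` is a hypothetical helper.

```java
import java.math.BigDecimal;

// Hypothetical helper: exact numeric conversion that rejects fractions and overflow.
final class StrictNumbers {
    private StrictNumbers() {}

    static int toInt(String text) {
        try {
            // intValueExact throws ArithmeticException on a nonzero fraction or an out-of-range value.
            return new BigDecimal(text).intValueExact();
        } catch (ArithmeticException | NumberFormatException e) {
            throw new IllegalArgumentException("Not an exact int: " + text, e);
        }
    }

    static long toLong(String text) {
        try {
            return new BigDecimal(text).longValueExact();
        } catch (ArithmeticException | NumberFormatException e) {
            throw new IllegalArgumentException("Not an exact long: " + text, e);
        }
    }
}
```

In `extractJsonValue`, the `INT` case could then return `StrictNumbers.toInt(element.getAsString())`; the resulting exception would be wrapped in `UnexpectedFormatException` by `read()`. Note that `"3.0"` and `"1e2"` are still accepted, because they denote whole numbers.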

Binary Format Implementation:

import java.nio.ByteBuffer;

public class BinaryRecordFormat extends RecordFormat<ByteBuffer, StructuredRecord> {
    
    @Override
    public StructuredRecord read(ByteBuffer input) throws UnexpectedFormatException {
        try {
            Schema schema = getSchema();
            StructuredRecord.Builder builder = StructuredRecord.builder(schema);
            
            for (Schema.Field field : schema.getFields()) {
                Object value = readFieldValue(input, field.getSchema());
                builder.set(field.getName(), value);
            }
            
            return builder.build();
        } catch (Exception e) {
            throw new UnexpectedFormatException("Failed to parse binary data", e);
        }
    }
    
    private Object readFieldValue(ByteBuffer buffer, Schema schema) {
        Schema.Type type = schema.getType();
        if (schema.isNullable()) {
            // Assumed layout: a one-byte flag precedes each nullable field; 0 means null
            boolean isNull = buffer.get() == 0;
            if (isNull) {
                return null;
            }
            type = schema.getNonNullable().getType();
        }
        
        // Numbers use the buffer's byte order (big-endian unless the caller changed it)
        switch (type) {
            case INT:
                return buffer.getInt();
            case LONG:
                return buffer.getLong();
            case FLOAT:
                return buffer.getFloat();
            case DOUBLE:
                return buffer.getDouble();
            case BOOLEAN:
                return buffer.get() != 0;
            case STRING:
                // Assumed layout: 4-byte length prefix, then the UTF-8 bytes
                int length = buffer.getInt();
                byte[] stringBytes = new byte[length];
                buffer.get(stringBytes);
                return new String(stringBytes, java.nio.charset.StandardCharsets.UTF_8);
            default:
                throw new UnsupportedOperationException("Binary format doesn't support type: " + type);
        }
    }
    
    @Override
    protected Schema getDefaultSchema() {
        return null; // Binary requires explicit schema
    }
    
    @Override
    protected void validateSchema(Schema schema) throws UnsupportedTypeException {
        // Accept only the types readFieldValue() can decode
        for (Schema.Field field : schema.getFields()) {
            validateBinaryCompatibleType(field.getSchema());
        }
    }
    
    private void validateBinaryCompatibleType(Schema schema) throws UnsupportedTypeException {
        Schema.Type type = schema.getType();
        if (schema.isNullable()) {
            type = schema.getNonNullable().getType();
        }
        
        // isSimpleType() would also admit NULL and BYTES, which readFieldValue() does not handle
        switch (type) {
            case INT:
            case LONG:
            case FLOAT:
            case DOUBLE:
            case BOOLEAN:
            case STRING:
                break; // Supported
            default:
                throw new UnsupportedTypeException("Binary format doesn't support type: " + type);
        }
    }
}
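`BinaryRecordFormat` implies a specific byte layout: fields in schema order, a one-byte presence flag before each nullable field (`0` meaning null), big-endian numbers (the `ByteBuffer` default), and strings as a 4-byte length followed by the bytes. This layout is an assumption of the example, not a CDAP wire format. A minimal sketch of a matching writer, useful for producing test input; `BinaryFieldWriter` is hypothetical and encodes strings as UTF-8, so the reader should decode them with UTF-8 as well.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical writer for the layout BinaryRecordFormat reads.
final class BinaryFieldWriter {
    private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();

    private BinaryFieldWriter append(ByteBuffer chunk) {
        // Heap buffers expose their backing array; copy only the bytes written so far.
        bytes.write(chunk.array(), 0, chunk.position());
        return this;
    }

    // Presence flag for a nullable field: 0 = null, 1 = a value follows.
    BinaryFieldWriter writeNullFlag(boolean isNull) {
        bytes.write(isNull ? 0 : 1);
        return this;
    }

    BinaryFieldWriter writeLong(long value) {
        return append(ByteBuffer.allocate(Long.BYTES).putLong(value));
    }

    BinaryFieldWriter writeDouble(double value) {
        return append(ByteBuffer.allocate(Double.BYTES).putDouble(value));
    }

    // 4-byte length prefix followed by the UTF-8 bytes.
    BinaryFieldWriter writeString(String value) {
        byte[] encoded = value.getBytes(StandardCharsets.UTF_8);
        return append(ByteBuffer.allocate(Integer.BYTES + encoded.length).putInt(encoded.length).put(encoded));
    }

    ByteBuffer toByteBuffer() {
        return ByteBuffer.wrap(bytes.toByteArray());
    }
}
```

For a schema of `id: LONG`, `name: STRING`, `price: nullable DOUBLE`, the call sequence would be `writeLong(...)`, `writeString(...)`, then either `writeNullFlag(true)` or `writeNullFlag(false).writeDouble(...)`.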

Format Initialization Process

Initialization Flow

  1. Format Creation: Instantiate the format class.
  2. Schema Resolution: Use the schema from the specification, or fall back to getDefaultSchema(); if both are null, initialization fails.
  3. Schema Validation: Check that the schema is a record with at least one field, then call the format's validateSchema().
  4. Configuration: Pass the specification's settings to configure().
  5. Ready for Use: read() can now be called.

// Manual initialization process
RecordFormat<String, StructuredRecord> format = new CsvRecordFormat();

// Create specification
Schema schema = Schema.recordOf("Data",
    Schema.Field.of("id", Schema.of(Schema.Type.INT)),
    Schema.Field.of("name", Schema.of(Schema.Type.STRING))
);

Map<String, String> settings = new HashMap<>();
settings.put("delimiter", "|");
settings.put("quote", "\"");  // not read by the CsvRecordFormat example above

FormatSpecification spec = new FormatSpecification("csv", schema, settings);

// Initialize (schema validation and configuration occur here)
format.initialize(spec);

// Now ready to use
StructuredRecord record = format.read("123|John Doe");
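The initialization flow can be mirrored in a small stand-alone sketch to make the order of steps concrete. This is a simplified illustration, not CDAP's implementation: `MiniSchema` stands in for `Schema` (a record is reduced to its field names), `MiniRecordFormat` stands in for `RecordFormat`, and failures are reported as `IllegalArgumentException` rather than `UnsupportedTypeException`.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Simplified stand-in for Schema: a record reduced to its field names.
final class MiniSchema {
    final List<String> fieldNames;

    MiniSchema(List<String> fieldNames) {
        this.fieldNames = fieldNames;
    }
}

// Simplified stand-in for RecordFormat, showing only the initialization order.
abstract class MiniRecordFormat {
    private MiniSchema schema;

    protected abstract MiniSchema getDefaultSchema();

    protected abstract void validateSchema(MiniSchema schema);

    protected void configure(Map<String, String> settings) {
        // No settings by default
    }

    final void initialize(MiniSchema specSchema, Map<String, String> specSettings) {
        // Step 2: the specification's schema wins, otherwise fall back to the default.
        MiniSchema resolved = specSchema != null ? specSchema : getDefaultSchema();
        if (resolved == null) {
            throw new IllegalArgumentException("A schema must be provided to the format");
        }
        // Step 3: generic check first, then the format-specific validateSchema().
        if (resolved.fieldNames.isEmpty()) {
            throw new IllegalArgumentException("Schema must be a record with at least one field");
        }
        validateSchema(resolved);
        this.schema = resolved;
        // Step 4: configure() runs last, after the schema is set; null settings become empty.
        configure(specSettings == null ? Collections.<String, String>emptyMap() : specSettings);
    }

    MiniSchema getSchema() {
        return schema;
    }
}
```

Because `configure()` runs after the schema is set, an implementation may consult `getSchema()` while applying its settings.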

Error Handling

try {
    format.initialize(specification);
} catch (UnsupportedTypeException e) {
    // Handle schema validation failure
    System.err.println("Schema not supported: " + e.getMessage());
} catch (Exception e) {
    // Handle other initialization errors
    System.err.println("Initialization failed: " + e.getMessage());
}

try {
    StructuredRecord record = format.read(inputData);
} catch (UnexpectedFormatException e) {
    // Handle data parsing failure
    System.err.println("Failed to parse data: " + e.getMessage());
}
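Because `UnexpectedFormatException` is unchecked, one malformed record aborts a loop over many inputs unless it is caught per record. A minimal sketch of a batch reader that keeps the good records and collects failures for later inspection (a dead-letter pattern). `BatchReader` is hypothetical; `Function` stands in for `format::read`, and catching `RuntimeException` stands in for catching `UnexpectedFormatException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper: reads every input, collecting failures instead of stopping at the first one.
final class BatchReader {
    private BatchReader() {}

    static <FROM, TO> List<TO> readAll(List<FROM> inputs, Function<FROM, TO> read, List<String> failures) {
        List<TO> records = new ArrayList<>();
        for (int i = 0; i < inputs.size(); i++) {
            FROM input = inputs.get(i);
            try {
                records.add(read.apply(input));
            } catch (RuntimeException e) {
                // With a real format, catch UnexpectedFormatException here.
                // Keep position, input, and cause so bad rows can be inspected or replayed.
                failures.add("input #" + i + " (" + input + "): " + e.getMessage());
            }
        }
        return records;
    }
}
```

With a real format this would be called as `BatchReader.readAll(lines, format::read, failures)`.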

Exception Types

Format-Related Exceptions

/**
 * Exception indicating data is in unexpected format
 */
public class UnexpectedFormatException extends RuntimeException {
    public UnexpectedFormatException(String message);
    public UnexpectedFormatException(String message, Throwable cause);
    public UnexpectedFormatException(Throwable cause);
}

/**
 * Exception indicating unsupported schema type
 */
public class UnsupportedTypeException extends Exception {
    public UnsupportedTypeException(String message);
    public UnsupportedTypeException(String message, Throwable cause);
    public UnsupportedTypeException(Throwable cause);
}

Best Practices

Schema Design

  • Design schemas with format limitations in mind
  • Use simple types for maximum format compatibility
  • Consider nullable fields for optional data
  • Validate schemas during format initialization

Performance Optimization

  • Reuse format instances when processing multiple records
  • Cache compiled patterns/parsers in format implementations
  • Use efficient parsing libraries (e.g., Jackson for JSON)
  • Minimize object allocation in read() methods

Error Handling

  • Provide detailed error messages in exceptions
  • Include input data context in error messages
  • Handle edge cases (null/empty inputs, malformed data)
  • Validate configuration settings during initialization
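Including input context in error messages can backfire when records are large, producing huge log lines or exposing more data than intended. A minimal sketch of a hypothetical helper that includes a bounded excerpt of the input; the 80-character limit is an arbitrary choice.

```java
// Hypothetical helper: builds an error message with a bounded excerpt of the offending input.
final class ErrorMessages {
    private static final int MAX_EXCERPT = 80;

    private ErrorMessages() {}

    static String withInput(String problem, String input) {
        if (input == null) {
            return problem + " (input: null)";
        }
        // Long inputs are cut off and annotated with their full length.
        String excerpt = input.length() <= MAX_EXCERPT
            ? input
            : input.substring(0, MAX_EXCERPT) + "... [" + input.length() + " chars total]";
        return problem + " (input: \"" + excerpt + "\")";
    }
}
```

A format would then throw, for example, `new UnexpectedFormatException(ErrorMessages.withInput("CSV field count mismatch", csvLine))`.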

Custom Format Implementation

  • Extend RecordFormat with appropriate generic type parameters
  • Implement all abstract methods with proper error handling
  • Validate schema compatibility in validateSchema()
  • Use configure() method for format-specific settings
  • Document supported schema types and configuration options

Install with Tessl CLI

npx tessl i tessl/maven-co-cask-cdap--cdap-api-common
