CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-csv

Apache Flink CSV format support for reading and writing CSV data in stream and batch processing

Pending
Overview
Eval results
Files

schema-conversion.mddocs/

Schema Conversion

Utility functions for converting between Flink type information and Jackson CSV schemas using the CsvRowSchemaConverter class, enabling seamless integration between Flink's type system and CSV processing libraries.

Capabilities

CsvRowSchemaConverter Class

Static utility class that converts Flink type information to Jackson CsvSchema objects for CSV processing.

/**
 * Static utility for converting Flink row types to Jackson {@code CsvSchema} objects.
 *
 * <p>Both the legacy DataSet-era type system ({@code RowTypeInfo}) and the modern
 * Table API logical type system ({@code RowType}) are supported via overloads of
 * {@code convert}. All methods are static; the class is not meant to be instantiated.
 */
public class CsvRowSchemaConverter {
    
    /**
     * Converts a legacy {@code RowTypeInfo} to a Jackson {@code CsvSchema}.
     *
     * <p>Intended for DataSet API / legacy type-system integration. One CSV column
     * is produced per row field, in field order.
     *
     * @param rowType row type info carrying the field names and field types
     * @return a {@code CsvSchema} describing the row structure
     */
    public static CsvSchema convert(RowTypeInfo rowType);
    
    /**
     * Converts a modern {@code RowType} to a Jackson {@code CsvSchema}.
     *
     * <p>Intended for Table API / modern type-system integration. One CSV column
     * is produced per row field, in field order.
     *
     * @param rowType logical row type carrying the field names and logical types
     * @return a {@code CsvSchema} describing the row structure
     */
    public static CsvSchema convert(RowType rowType);
}

Usage Examples

Converting RowTypeInfo (Legacy Type System)

import org.apache.flink.formats.csv.CsvRowSchemaConverter;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;

// Create legacy row type info
RowTypeInfo rowTypeInfo = new RowTypeInfo(
    BasicTypeInfo.STRING_TYPE_INFO,                  // name
    BasicTypeInfo.INT_TYPE_INFO,                     // age
    BasicTypeInfo.BOOLEAN_TYPE_INFO,                 // active
    BasicTypeInfo.DOUBLE_TYPE_INFO,                  // salary
    PrimitiveArrayTypeInfo.STRING_ARRAY_TYPE_INFO    // tags
);

// Convert to Jackson CSV schema
CsvSchema csvSchema = CsvRowSchemaConverter.convert(rowTypeInfo);

// The resulting schema can be used with Jackson CsvMapper
CsvMapper mapper = new CsvMapper();
ObjectReader reader = mapper.readerFor(Row.class).with(csvSchema);

Converting RowType (Modern Type System)

import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.ArrayType;

// Create modern row type. RowType.of takes parallel arrays of logical types
// and field names (there is no RowType.of(RowField...) overload).
RowType rowType = RowType.of(
    new LogicalType[] {
        new VarCharType(255),                 // name
        new IntType(),                        // age
        new BooleanType(),                    // active
        new DoubleType(),                     // salary
        new ArrayType(new VarCharType(100))   // tags
    },
    new String[] {"name", "age", "active", "salary", "tags"});

// Convert to Jackson CSV schema
CsvSchema csvSchema = CsvRowSchemaConverter.convert(rowType);

// Use with CsvReaderFormat
CsvReaderFormat<Row> readerFormat = CsvReaderFormat.forSchema(
    csvSchema,
    TypeInformation.of(Row.class)
);

Schema Customization

// Convert and customize the schema
CsvSchema baseSchema = CsvRowSchemaConverter.convert(rowType);

// Customize schema properties
CsvSchema customSchema = baseSchema.rebuild()
    .setUseHeader(true)           // Use first row as header
    .setColumnSeparator('|')      // Change delimiter to pipe
    .setQuoteChar('\'')           // Use single quotes
    .setEscapeChar('\\')          // Set escape character
    .build();

// Use customized schema
CsvMapper mapper = new CsvMapper();
ObjectReader reader = mapper.readerFor(Row.class).with(customSchema);

Integration Patterns

With CsvReaderFormat

// Complete integration with streaming reader
RowType rowType = RowType.of(
    new LogicalType[] {new IntType(), new VarCharType(255), new TimestampType(3)},
    new String[] {"id", "name", "timestamp"});

// Convert to schema
CsvSchema schema = CsvRowSchemaConverter.convert(rowType);

// Create reader format
CsvReaderFormat<Row> readerFormat = CsvReaderFormat.forSchema(
    schema,
    TypeInformation.of(Row.class)
);

// Use with file source
FileSource<Row> source = FileSource
    .forRecordStreamFormat(readerFormat, new Path("data.csv"))
    .build();

With Custom Mapper Factory

// Create custom mapper factory using converted schema
SerializableSupplier<CsvMapper> mapperFactory = () -> {
    CsvMapper mapper = new CsvMapper();
    
    // Configure mapper for specific requirements
    mapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    
    return mapper;
};

// Schema generator using converter
SerializableFunction<CsvMapper, CsvSchema> schemaGenerator = mapper -> {
    CsvSchema baseSchema = CsvRowSchemaConverter.convert(rowType);
    return baseSchema.withHeader();
};

// Create reader format with custom configuration
CsvReaderFormat<Row> readerFormat = CsvReaderFormat.forSchema(
    mapperFactory,
    schemaGenerator,
    TypeInformation.of(Row.class)
);

Table API Integration

// Use in Table API descriptor
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.Schema;

// Define table schema
Schema tableSchema = Schema.newBuilder()
    .column("name", DataTypes.STRING())
    .column("age", DataTypes.INT())
    .column("active", DataTypes.BOOLEAN())
    .build();

// Convert to RowType
RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType();

// Convert to CSV schema for external processing
CsvSchema csvSchema = CsvRowSchemaConverter.convert(rowType);

// The csvSchema can now be used with external Jackson-based CSV processing

Type Mapping

The converter handles mapping between Flink types and Jackson CSV column types:

Primitive Types

// Flink Type -> Jackson CSV Column Type
BasicTypeInfo.STRING_TYPE_INFO -> CsvSchema.ColumnType.STRING
BasicTypeInfo.INT_TYPE_INFO -> CsvSchema.ColumnType.NUMBER  
BasicTypeInfo.LONG_TYPE_INFO -> CsvSchema.ColumnType.NUMBER
BasicTypeInfo.FLOAT_TYPE_INFO -> CsvSchema.ColumnType.NUMBER
BasicTypeInfo.DOUBLE_TYPE_INFO -> CsvSchema.ColumnType.NUMBER
BasicTypeInfo.BOOLEAN_TYPE_INFO -> CsvSchema.ColumnType.BOOLEAN

Temporal Types

// Date and time types
new DateType() -> CsvSchema.ColumnType.STRING  // ISO date format
new TimeType() -> CsvSchema.ColumnType.STRING  // ISO time format  
new TimestampType() -> CsvSchema.ColumnType.STRING  // ISO timestamp format

// Example conversion
RowType temporalType = RowType.of(
    new LogicalType[] {new DateType(), new TimeType(), new TimestampType(3)},
    new String[] {"date", "time", "timestamp"});

CsvSchema schema = CsvRowSchemaConverter.convert(temporalType);
// Results in string columns that accept ISO formatted temporal values

Complex Types

// Array types are flattened or serialized as strings
new ArrayType(new VarCharType()) -> CsvSchema.ColumnType.STRING

// Map types are serialized as strings
new MapType(new VarCharType(), new IntType()) -> CsvSchema.ColumnType.STRING

// Nested row types are flattened
RowType nestedType = RowType.of(
    new LogicalType[] {
        RowType.of(
            new LogicalType[] {new VarCharType(255), new VarCharType(100)},
            new String[] {"street", "city"})
    },
    new String[] {"address"});

CsvSchema schema = CsvRowSchemaConverter.convert(nestedType);
// Results in flattened columns: address.street, address.city

Decimal and Numeric Types

// Precise numeric types
new DecimalType(10, 2) -> CsvSchema.ColumnType.NUMBER
new BigIntType() -> CsvSchema.ColumnType.NUMBER

// Example with decimal precision
RowType financialType = RowType.of(
    new LogicalType[] {
        new DecimalType(15, 4),
        new DecimalType(10, 6)  // precision must be >= scale; DecimalType(5, 6) would be rejected
    },
    new String[] {"amount", "rate"});

CsvSchema schema = CsvRowSchemaConverter.convert(financialType);
// Maintains precision information in the schema

Advanced Usage

Schema Validation

// Validate converted schema
/**
 * Validates that the schema produced by {@code CsvRowSchemaConverter.convert}
 * structurally matches the given row type.
 *
 * <p>Checks that the column count equals the field count and that column names
 * match the field names positionally. Column <em>types</em> are not validated
 * here — the Jackson column type chosen for each Flink type is an internal
 * detail of the converter.
 *
 * @param rowType the row type the schema was converted from
 * @throws IllegalStateException if the column count or any column name differs
 */
public void validateConvertedSchema(RowType rowType) {
    CsvSchema schema = CsvRowSchemaConverter.convert(rowType);
    
    // The converted schema must expose exactly one column per row field.
    int expectedColumns = rowType.getFieldCount();
    int actualColumns = schema.size();
    
    if (expectedColumns != actualColumns) {
        throw new IllegalStateException(
            String.format("Column count mismatch: expected %d, got %d", 
                expectedColumns, actualColumns)
        );
    }
    
    // Fetch the field-name list once rather than on every iteration;
    // getFieldNames() may build a fresh list per call.
    List<String> fieldNames = rowType.getFieldNames();
    
    // Validate column names positionally.
    for (int i = 0; i < expectedColumns; i++) {
        String expectedName = fieldNames.get(i);
        String actualName = schema.columnName(i);
        
        if (!expectedName.equals(actualName)) {
            throw new IllegalStateException(
                String.format("Column name mismatch at index %d: expected '%s', got '%s'",
                    i, expectedName, actualName)
            );
        }
    }
}

Schema Caching

// Cache converted schemas for performance
import java.util.concurrent.ConcurrentHashMap;

/**
 * Thread-safe cache of converted CSV schemas, keyed by row type.
 *
 * <p>Each distinct {@code RowType} is converted at most once; subsequent lookups
 * return the cached {@code CsvSchema}.
 */
public class SchemaCache {
    private final ConcurrentHashMap<RowType, CsvSchema> schemasByType = new ConcurrentHashMap<>();
    
    /**
     * Returns the schema for the given row type, converting and caching it on first use.
     */
    public CsvSchema getSchema(RowType rowType) {
        CsvSchema cached = schemasByType.get(rowType);
        if (cached != null) {
            return cached;
        }
        return schemasByType.computeIfAbsent(rowType, type -> CsvRowSchemaConverter.convert(type));
    }
    
    /** Discards every cached schema. */
    public void clearCache() {
        schemasByType.clear();
    }
}

// Usage
SchemaCache schemaCache = new SchemaCache();
CsvSchema schema = schemaCache.getSchema(rowType);

Dynamic Schema Generation

// Generate schema from table metadata
/**
 * Builds a Jackson CSV schema from legacy table metadata.
 *
 * <p>Converts the table schema's row type via {@code CsvRowSchemaConverter},
 * then applies fixed formatting defaults (comma separator, double-quote char)
 * and enables the header row whenever the table declares at least one field.
 *
 * @param tableSchema legacy table schema describing field names and data types
 * @return the customized CSV schema
 */
public CsvSchema createSchemaFromMetadata(TableSchema tableSchema) {
    // Table schema -> logical row type -> base CSV schema, then rebuild to customize.
    RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType();
    CsvSchema.Builder builder = CsvRowSchemaConverter.convert(rowType).rebuild();
    
    // Emit a header row only when the table actually declares fields.
    boolean hasNamedFields = tableSchema.getFieldNames().length > 0;
    if (hasNamedFields) {
        builder.setUseHeader(true);
    }
    
    // Fixed formatting defaults.
    builder.setColumnSeparator(',');
    builder.setQuoteChar('"');
    
    return builder.build();
}

Error Handling

The schema converter handles various error conditions gracefully:

  • Unsupported types: Unknown types are mapped to STRING columns with warnings
  • Nested complexity: Deep nesting is flattened with generated column names
  • Name conflicts: Conflicting field names are resolved with numeric suffixes
  • Null handling: Nullable types are properly represented in the schema
  • Type precision: Maintains precision information where possible in Jackson schema

The converter ensures that any valid Flink RowType or RowTypeInfo can be converted to a usable Jackson CsvSchema, enabling seamless integration between Flink's type system and CSV processing workflows.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-csv

docs

batch-processing.md

configuration.md

index.md

schema-conversion.md

serialization.md

stream-processing.md

tile.json