tessl/maven-org-apache-flink--flink-csv

Apache Flink CSV format support for reading and writing CSV data in stream and batch processing


docs/serialization.md

Data Serialization

Convert Flink's internal row data structures to CSV format using CsvRowDataSerializationSchema. The builder exposes configuration options for different CSV dialects, and the resulting schema integrates with sinks such as Kafka, Kinesis, and the file system.

Capabilities

CsvRowDataSerializationSchema Class

Serialization schema that converts Flink's internal RowData objects to CSV byte arrays.

/**
 * Serialization schema for converting RowData to CSV format
 * Implements Flink's SerializationSchema interface for integration with sinks
 */
public class CsvRowDataSerializationSchema implements SerializationSchema<RowData> {
    
    /**
     * Serialize a RowData object to CSV byte array
     * @param element The RowData object to serialize
     * @return byte array containing the CSV representation
     */
    public byte[] serialize(RowData element);
    
    /**
     * Builder class for configuring CSV serialization options
     */
    public static class Builder {
        
        /**
         * Create a builder with the specified row type
         * @param rowType Flink RowType defining the structure of data to serialize
         */
        public Builder(RowType rowType);
        
        /**
         * Set the field delimiter character (default: ',')
         * @param delimiter Character to separate fields
         * @return Builder instance for method chaining
         */
        public Builder setFieldDelimiter(char delimiter);
        
        /**
         * Set the array element delimiter for complex types (default: ';')
         * @param delimiter String to separate array elements
         * @return Builder instance for method chaining
         */
        public Builder setArrayElementDelimiter(String delimiter);
        
        /**
         * Disable quote character usage - fields will not be quoted
         * @return Builder instance for method chaining
         */
        public Builder disableQuoteCharacter();
        
        /**
         * Set the quote character for enclosing field values (default: '"')
         * @param quoteCharacter Character to quote fields containing special characters
         * @return Builder instance for method chaining
         */
        public Builder setQuoteCharacter(char quoteCharacter);
        
        /**
         * Set the escape character for escaping special characters within fields
         * @param escapeCharacter Character used for escaping (no default)
         * @return Builder instance for method chaining
         */
        public Builder setEscapeCharacter(char escapeCharacter);
        
        /**
         * Set the null literal string for representing null values
         * @param nullLiteral String representation of null values (no default)
         * @return Builder instance for method chaining
         */
        public Builder setNullLiteral(String nullLiteral);
        
        /**
         * Control BigDecimal scientific notation output (default: true)
         * @param writeInScientificNotation Whether to use scientific notation for BigDecimal
         * @return Builder instance for method chaining
         */
        public Builder setWriteBigDecimalInScientificNotation(boolean writeInScientificNotation);
        
        /**
         * Build the configured serialization schema
         * @return CsvRowDataSerializationSchema instance with specified configuration
         */
        public CsvRowDataSerializationSchema build();
    }
}

Usage Examples

Basic Serialization

import org.apache.flink.formats.csv.CsvRowDataSerializationSchema;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.BooleanType;

import java.nio.charset.StandardCharsets;

// Define row type
RowType rowType = RowType.of(
    new VarCharType(255),  // name
    new IntType(),         // age  
    new BooleanType()      // active
);

// Create serialization schema with default settings
CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)
    .build();

// Serialize a row (rowData is produced by upstream operators or a table source)
byte[] csvBytes = schema.serialize(rowData);
String csvString = new String(csvBytes, StandardCharsets.UTF_8);
// Output: "John Doe,25,true"

Custom Delimiter Configuration

// Create schema with pipe delimiter and custom quote character
CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)
    .setFieldDelimiter('|')
    .setQuoteCharacter('\'')
    .setArrayElementDelimiter("::")
    .build();

// Example output: 'John Doe'|25|true (fields are quoted only when they require it)

Null Value Handling

// Configure null literal representation
CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)
    .setNullLiteral("NULL")
    .setEscapeCharacter('\\')
    .build();

// Null values will be output as "NULL" instead of empty strings

Numeric Formatting

import org.apache.flink.table.types.logical.DecimalType;

// Row type with decimal field
RowType rowType = RowType.of(
    new VarCharType(255),    // name
    new DecimalType(10, 2)   // salary
);

// Control BigDecimal notation
CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)
    .setWriteBigDecimalInScientificNotation(false)  // Use decimal notation
    .build();

// Large decimals will use decimal notation instead of scientific notation

Disable Quoting

// Create schema without field quoting
CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)
    .disableQuoteCharacter()
    .build();

// Fields will never be quoted, even if they contain special characters

Integration with Sinks

Kafka Producer

import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;

// Create CSV serialization schema
CsvRowDataSerializationSchema csvSchema = new CsvRowDataSerializationSchema.Builder(rowType)
    .setFieldDelimiter(',')
    .setQuoteCharacter('"')
    .build();

// Create Kafka sink with CSV serialization
KafkaSink<RowData> kafkaSink = KafkaSink.<RowData>builder()
    .setBootstrapServers("localhost:9092")
    .setRecordSerializer(
        KafkaRecordSerializationSchema.builder()
            .setTopic("csv-topic")
            .setValueSerializationSchema(csvSchema)
            .build()
    )
    .build();

// Use with DataStream
dataStream.sinkTo(kafkaSink);

File Sink

import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;

// Create file sink that writes one CSV line per record
// (csvSchema is the CsvRowDataSerializationSchema built above)
FileSink<RowData> fileSink = FileSink
    .forRowFormat(new Path("output/"), (Encoder<RowData>) (element, stream) -> {
        byte[] csvBytes = csvSchema.serialize(element);
        stream.write(csvBytes);
        stream.write('\n');
    })
    .build();

dataStream.sinkTo(fileSink);

Custom Sink Function

import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class CsvSinkFunction implements SinkFunction<RowData> {
    private final CsvRowDataSerializationSchema serializer;
    
    public CsvSinkFunction(CsvRowDataSerializationSchema serializer) {
        this.serializer = serializer;
    }
    
    @Override
    public void invoke(RowData value, Context context) throws Exception {
        byte[] csvBytes = serializer.serialize(value);
        // Write to external system, file, database, etc.
        writeToExternalSystem(csvBytes);
    }
    
    private void writeToExternalSystem(byte[] data) {
        // Implementation specific to target system
    }
}

// Use custom sink
dataStream.addSink(new CsvSinkFunction(csvSchema));

Complex Type Handling

Array Types

import org.apache.flink.table.types.logical.ArrayType;

// Row type with array field
RowType rowType = RowType.of(
    new VarCharType(255),                    // name
    new ArrayType(new VarCharType(255))      // tags
);

// Configure array element delimiter
CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)
    .setArrayElementDelimiter(";")
    .build();

// Arrays will be serialized as: "John Doe","tag1;tag2;tag3"

Nested Types

import org.apache.flink.table.types.logical.RowType;

// Nested row type
RowType addressType = RowType.of(
    new VarCharType(255),  // street
    new VarCharType(255)   // city
);

RowType personType = RowType.of(
    new VarCharType(255),  // name
    addressType            // address
);

// Nested rows are written into a single CSV column, with their fields
// joined by the array element delimiter (default ';')
CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(personType)
    .build();

// Example output: John Doe,123 Main St;New York

Performance and Memory Considerations

Schema Reuse

// Create schema once and reuse across multiple serializations
CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)
    .setFieldDelimiter(',')
    .build();

// Reuse for multiple row serializations
for (RowData row : rows) {
    byte[] csvBytes = schema.serialize(row);
    // Process csvBytes
}

Memory Efficiency

The serialization schema is designed for efficiency:

  • Streaming serialization: writes directly to byte arrays without building intermediate strings
  • Minimal object allocation: reuses its internal object writer and buffers across records
  • Type-specific handling: dedicated conversion paths for each supported logical type
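The buffer-reuse point can be illustrated with a small, self-contained sketch. This is not the Flink implementation, just a hypothetical writer showing the pattern: one output buffer is reset and reused per record instead of allocating a fresh one each time.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of per-record buffer reuse (not Flink's code)
public class BufferReuseSketch {
    // Single buffer shared across all records written by this instance
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream(256);

    public byte[] writeRecord(String[] fields) {
        buffer.reset(); // reuse the same backing array for every record
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) {
                buffer.write(',');
            }
            byte[] bytes = fields[i].getBytes(StandardCharsets.UTF_8);
            buffer.write(bytes, 0, bytes.length);
        }
        return buffer.toByteArray(); // one defensive copy per record
    }
}
```

The serialization schema follows the same idea internally, which is why a single schema instance should be created once and reused for the lifetime of the job.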

Error Handling

The serialization schema addresses common error conditions:

  • Null values: written as the configured null literal, or as an empty field by default
  • Special characters: fields containing the delimiter, quote character, or line breaks are quoted (and escaped, if an escape character is configured)
  • Encoding: output is UTF-8 encoded
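The quoting behavior follows the usual RFC 4180 convention applied by the underlying Jackson CSV writer. As a hypothetical sketch (not the library's actual code): a field is wrapped in quotes when it contains the delimiter, the quote character, or a line break, and any embedded quote is doubled.

```java
// Hypothetical sketch of RFC 4180-style quoting; illustrative only
public class CsvQuotingSketch {
    static String quoteField(String field, char delimiter, char quote) {
        boolean needsQuoting = field.indexOf(delimiter) >= 0
                || field.indexOf(quote) >= 0
                || field.indexOf('\n') >= 0
                || field.indexOf('\r') >= 0;
        if (!needsQuoting) {
            return field; // plain fields pass through unchanged
        }
        // Double every embedded quote character, then wrap the field
        String escaped = field.replace(String.valueOf(quote), "" + quote + quote);
        return quote + escaped + quote;
    }

    public static void main(String[] args) {
        System.out.println(quoteField("plain", ',', '"'));      // plain
        System.out.println(quoteField("a,b", ',', '"'));        // "a,b"
        System.out.println(quoteField("say \"hi\"", ',', '"')); // "say ""hi"""
    }
}
```

This is why disabling the quote character (via disableQuoteCharacter()) can produce ambiguous output when field values contain the delimiter.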

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-csv
