Apache Flink CSV format support for reading and writing CSV data in stream and batch processing
—
Convert Flink's internal row data structures to CSV format using CsvRowDataSerializationSchema with extensive configuration options for different CSV dialects and integration with Kafka, Kinesis, and file sinks.
Serialization schema that converts Flink's internal RowData objects to CSV byte arrays.
/**
* Serialization schema for converting RowData to CSV format
* Implements Flink's SerializationSchema interface for integration with sinks
*/
public class CsvRowDataSerializationSchema implements SerializationSchema<RowData> {
/**
* Serialize a RowData object to CSV byte array
* @param element The RowData object to serialize
* @return byte array containing the CSV representation
*/
public byte[] serialize(RowData element);
/**
* Builder class for configuring CSV serialization options
*/
public static class Builder {
/**
* Create a builder with the specified row type
* @param rowType Flink RowType defining the structure of data to serialize
*/
public Builder(RowType rowType);
/**
* Set the field delimiter character (default: ',')
* @param delimiter Character to separate fields
* @return Builder instance for method chaining
*/
public Builder setFieldDelimiter(char delimiter);
/**
* Set the array element delimiter for complex types (default: ';')
* @param delimiter String to separate array elements
* @return Builder instance for method chaining
*/
public Builder setArrayElementDelimiter(String delimiter);
/**
* Disable quote character usage - fields will not be quoted
* @return Builder instance for method chaining
*/
public Builder disableQuoteCharacter();
/**
* Set the quote character for enclosing field values (default: '"')
* @param quoteCharacter Character to quote fields containing special characters
* @return Builder instance for method chaining
*/
public Builder setQuoteCharacter(char quoteCharacter);
/**
* Set the escape character for escaping special characters within fields
* @param escapeCharacter Character used for escaping (no default)
* @return Builder instance for method chaining
*/
public Builder setEscapeCharacter(char escapeCharacter);
/**
* Set the null literal string for representing null values
* @param nullLiteral String representation of null values (no default)
* @return Builder instance for method chaining
*/
public Builder setNullLiteral(String nullLiteral);
/**
* Control BigDecimal scientific notation output (default: true)
* @param writeInScientificNotation Whether to use scientific notation for BigDecimal
* @return Builder instance for method chaining
*/
public Builder setWriteBigDecimalInScientificNotation(boolean writeInScientificNotation);
/**
* Build the configured serialization schema
* @return CsvRowDataSerializationSchema instance with specified configuration
*/
public CsvRowDataSerializationSchema build();
}
}

import org.apache.flink.formats.csv.CsvRowDataSerializationSchema;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.BooleanType;
// Define row type
RowType rowType = RowType.of(
new VarCharType(255), // name
new IntType(), // age
new BooleanType() // active
);
// Create serialization schema with default settings
CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)
.build();
// Serialize row data
byte[] csvBytes = schema.serialize(rowData);
String csvString = new String(csvBytes, StandardCharsets.UTF_8);
// Output: "John Doe,25,true"

// Create schema with pipe delimiter and custom quote character
CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)
.setFieldDelimiter('|')
.setQuoteCharacter('\'')
.setArrayElementDelimiter("::")
.build();
// Output: 'John Doe'|25|true

// Configure null literal representation
CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)
.setNullLiteral("NULL")
.setEscapeCharacter('\\')
.build();
// Null values will be output as "NULL" instead of empty strings

import org.apache.flink.table.types.logical.DecimalType;
// Row type with decimal field
RowType rowType = RowType.of(
new VarCharType(255), // name
new DecimalType(10, 2) // salary
);
// Control BigDecimal notation
CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)
.setWriteBigDecimalInScientificNotation(false) // Use decimal notation
.build();
// Large decimals will use decimal notation instead of scientific notation

// Create schema without field quoting
CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)
.disableQuoteCharacter()
.build();
// Fields will never be quoted, even if they contain special characters

import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
// Create CSV serialization schema
CsvRowDataSerializationSchema csvSchema = new CsvRowDataSerializationSchema.Builder(rowType)
.setFieldDelimiter(',')
.setQuoteCharacter('"')
.build();
// Create Kafka sink with CSV serialization
KafkaSink<RowData> kafkaSink = KafkaSink.<RowData>builder()
.setBootstrapServers("localhost:9092")
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic("csv-topic")
.setValueSerializationSchema(csvSchema)
.build()
)
.build();
// Use with DataStream
dataStream.sinkTo(kafkaSink);

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
// Create file sink with CSV serialization
FileSink<RowData> fileSink = FileSink
.forRowFormat(new Path("output/"), new SimpleStringEncoder<RowData>() {
@Override
public void encode(RowData element, OutputStream stream) throws IOException {
byte[] csvBytes = csvSchema.serialize(element);
stream.write(csvBytes);
stream.write('\n');
}
})
.build();
dataStream.sinkTo(fileSink);

import org.apache.flink.streaming.api.functions.sink.SinkFunction;
/**
 * Custom sink that serializes each incoming RowData record to CSV bytes
 * and forwards the encoded payload to an external target system.
 */
public class CsvSinkFunction implements SinkFunction<RowData> {

    /** Schema used to encode each record as a CSV byte array. */
    private final CsvRowDataSerializationSchema serializer;

    /**
     * @param serializer pre-configured CSV serialization schema
     */
    public CsvSinkFunction(CsvRowDataSerializationSchema serializer) {
        this.serializer = serializer;
    }

    @Override
    public void invoke(RowData value, Context context) throws Exception {
        // Encode the record, then hand the bytes off to the target system
        // (file, database, message broker, ...).
        final byte[] encoded = serializer.serialize(value);
        writeToExternalSystem(encoded);
    }

    /**
     * Delivery logic specific to the target system — left unimplemented
     * in this example.
     */
    private void writeToExternalSystem(byte[] data) {
        // Implementation specific to target system
    }
}
// Use custom sink
dataStream.addSink(new CsvSinkFunction(csvSchema));

import org.apache.flink.table.types.logical.ArrayType;
// Row type with array field
RowType rowType = RowType.of(
new VarCharType(255), // name
new ArrayType(new VarCharType(255)) // tags
);
// Configure array element delimiter
CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)
.setArrayElementDelimiter(";")
.build();
// Arrays will be serialized as: "John Doe","tag1;tag2;tag3"

import org.apache.flink.table.types.logical.RowType;
// Nested row type
RowType addressType = RowType.of(
new VarCharType(255), // street
new VarCharType(255) // city
);
RowType personType = RowType.of(
new VarCharType(255), // name
addressType // address
);
// Nested objects are flattened in CSV output
CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(personType)
.build();
// Output: "John Doe","123 Main St","New York"

// Create schema once and reuse across multiple serializations
CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)
.setFieldDelimiter(',')
.build();
// Reuse for multiple row serializations
for (RowData row : rows) {
byte[] csvBytes = schema.serialize(row);
// Process csvBytes
}

The serialization schema is designed to operate efficiently and to handle various error conditions during serialization.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-csv