# Data Serialization

Convert Flink's internal row data structures to CSV format using `CsvRowDataSerializationSchema`. The builder exposes configuration options for different CSV dialects, and the resulting schema integrates with Kafka, Kinesis, and file sinks.

## Capabilities

### CsvRowDataSerializationSchema Class

Serialization schema that converts Flink's internal `RowData` objects to CSV byte arrays.
```java { .api }
/**
 * Serialization schema for converting RowData to CSV format.
 * Implements Flink's SerializationSchema interface for integration with sinks.
 */
public class CsvRowDataSerializationSchema implements SerializationSchema<RowData> {

    /**
     * Serialize a RowData object to a CSV byte array.
     * @param element The RowData object to serialize
     * @return byte array containing the CSV representation
     */
    public byte[] serialize(RowData element);

    /**
     * Builder class for configuring CSV serialization options.
     */
    public static class Builder {

        /**
         * Create a builder for the specified row type.
         * @param rowType Flink RowType defining the structure of the data to serialize
         */
        public Builder(RowType rowType);

        /**
         * Set the field delimiter character (default: ',').
         * @param delimiter Character separating fields
         * @return Builder instance for method chaining
         */
        public Builder setFieldDelimiter(char delimiter);

        /**
         * Set the array element delimiter for complex types (default: ";").
         * @param delimiter String separating array elements
         * @return Builder instance for method chaining
         */
        public Builder setArrayElementDelimiter(String delimiter);

        /**
         * Disable the quote character: fields will not be quoted.
         * @return Builder instance for method chaining
         */
        public Builder disableQuoteCharacter();

        /**
         * Set the quote character for enclosing field values (default: '"').
         * @param quoteCharacter Character used to quote fields containing special characters
         * @return Builder instance for method chaining
         */
        public Builder setQuoteCharacter(char quoteCharacter);

        /**
         * Set the escape character for escaping special characters within fields.
         * @param escapeCharacter Character used for escaping (no default)
         * @return Builder instance for method chaining
         */
        public Builder setEscapeCharacter(char escapeCharacter);

        /**
         * Set the null literal string representing null values.
         * @param nullLiteral String representation of null values (no default)
         * @return Builder instance for method chaining
         */
        public Builder setNullLiteral(String nullLiteral);

        /**
         * Control BigDecimal scientific notation output (default: true).
         * @param writeInScientificNotation Whether to use scientific notation for BigDecimal values
         * @return Builder instance for method chaining
         */
        public Builder setWriteBigDecimalInScientificNotation(boolean writeInScientificNotation);

        /**
         * Build the configured serialization schema.
         * @return CsvRowDataSerializationSchema instance with the specified configuration
         */
        public CsvRowDataSerializationSchema build();
    }
}
```

## Usage Examples

### Basic Serialization

```java
import org.apache.flink.formats.csv.CsvRowDataSerializationSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

import java.nio.charset.StandardCharsets;

// Define the row type
RowType rowType = RowType.of(
    new VarCharType(255),  // name
    new IntType(),         // age
    new BooleanType()      // active
);

// Create a serialization schema with default settings
CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)
    .build();

// Build a row and serialize it
RowData rowData = GenericRowData.of(StringData.fromString("John Doe"), 25, true);
byte[] csvBytes = schema.serialize(rowData);
String csvString = new String(csvBytes, StandardCharsets.UTF_8);
// Output: John Doe,25,true
```

### Custom Delimiter Configuration

```java
// Create a schema with a pipe field delimiter, a single-quote quote character,
// and a custom array element delimiter
CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)
    .setFieldDelimiter('|')
    .setQuoteCharacter('\'')
    .setArrayElementDelimiter("::")
    .build();

// Example output: John Doe|25|true
// (fields are quoted with ' only when they contain special characters)
```

### Null Value Handling

```java
// Configure the null literal and an escape character
CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)
    .setNullLiteral("NULL")
    .setEscapeCharacter('\\')
    .build();

// Null fields are written as "NULL" instead of empty strings
```
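For intuition, the null-literal substitution can be sketched in plain Java; `toCsvLine` below is a hypothetical helper for illustration, not part of Flink's API:

```java
import java.util.StringJoiner;

public class NullLiteralDemo {

    // Illustrative sketch of null-literal substitution (not Flink's internals):
    // each null field is replaced by the configured literal before joining.
    static String toCsvLine(Object[] fields, String nullLiteral, char delimiter) {
        StringJoiner joiner = new StringJoiner(String.valueOf(delimiter));
        for (Object field : fields) {
            joiner.add(field == null ? nullLiteral : field.toString());
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // The null middle field becomes the literal "NULL"
        System.out.println(toCsvLine(new Object[] {"John Doe", null, true}, "NULL", ','));
        // John Doe,NULL,true
    }
}
```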

### Numeric Formatting

```java
import org.apache.flink.table.types.logical.DecimalType;

// Row type with a decimal field
RowType rowType = RowType.of(
    new VarCharType(255),   // name
    new DecimalType(10, 2)  // salary
);

// Control BigDecimal notation
CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)
    .setWriteBigDecimalInScientificNotation(false)  // use plain decimal notation
    .build();

// Large decimals are written in plain decimal notation instead of scientific notation
```

### Disable Quoting

```java
// Create a schema without field quoting
CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)
    .disableQuoteCharacter()
    .build();

// Fields are never quoted, even if they contain special characters
```
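Disabling quoting puts correctness in your hands: if a field value contains the field delimiter, the output line becomes ambiguous. A small plain-Java illustration, independent of Flink (`joinUnquoted` is a hypothetical helper):

```java
import java.util.List;

public class UnquotedCsvDemo {

    // Naive unquoted join: mirrors what disabling the quote character implies
    static String joinUnquoted(List<String> fields, char delimiter) {
        return String.join(String.valueOf(delimiter), fields);
    }

    public static void main(String[] args) {
        // The department field itself contains the delimiter
        List<String> fields = List.of("John Doe", "Sales, EMEA", "true");
        String line = joinUnquoted(fields, ',');
        System.out.println(line);                        // John Doe,Sales, EMEA,true
        // A downstream parser now sees four columns instead of three
        System.out.println(line.split(",", -1).length);  // 4
    }
}
```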

## Integration with Sinks

### Kafka Producer

```java
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

// Create the CSV serialization schema
CsvRowDataSerializationSchema csvSchema = new CsvRowDataSerializationSchema.Builder(rowType)
    .setFieldDelimiter(',')
    .setQuoteCharacter('"')
    .build();

// Create a Kafka sink that writes CSV-encoded records
KafkaSink<RowData> kafkaSink = KafkaSink.<RowData>builder()
    .setBootstrapServers("localhost:9092")
    .setRecordSerializer(
        KafkaRecordSerializationSchema.builder()
            .setTopic("csv-topic")
            .setValueSerializationSchema(csvSchema)
            .build()
    )
    .build();

// Use with a DataStream
dataStream.sinkTo(kafkaSink);
```

### File Sink

```java
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;

// Create a file sink that writes one CSV record per line.
// Encoder is a functional interface, so a lambda can delegate to the schema.
FileSink<RowData> fileSink = FileSink
    .forRowFormat(new Path("output/"), (Encoder<RowData>) (element, stream) -> {
        byte[] csvBytes = csvSchema.serialize(element);
        stream.write(csvBytes);
        stream.write('\n');
    })
    .build();

dataStream.sinkTo(fileSink);
```

### Custom Sink Function

```java
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class CsvSinkFunction implements SinkFunction<RowData> {

    private final CsvRowDataSerializationSchema serializer;

    public CsvSinkFunction(CsvRowDataSerializationSchema serializer) {
        this.serializer = serializer;
    }

    @Override
    public void invoke(RowData value, Context context) throws Exception {
        byte[] csvBytes = serializer.serialize(value);
        // Write to an external system, file, database, etc.
        writeToExternalSystem(csvBytes);
    }

    private void writeToExternalSystem(byte[] data) {
        // Implementation specific to the target system
    }
}

// Use the custom sink
dataStream.addSink(new CsvSinkFunction(csvSchema));
```

## Complex Type Handling

### Array Types

```java
import org.apache.flink.table.types.logical.ArrayType;

// Row type with an array field
RowType rowType = RowType.of(
    new VarCharType(255),                // name
    new ArrayType(new VarCharType(255))  // tags
);

// Configure the array element delimiter
CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)
    .setArrayElementDelimiter(";")
    .build();

// Array elements are joined by the delimiter within a single column:
// John Doe,tag1;tag2;tag3
```

### Nested Types

```java
// Nested row type
RowType addressType = RowType.of(
    new VarCharType(255),  // street
    new VarCharType(255)   // city
);

RowType personType = RowType.of(
    new VarCharType(255),  // name
    addressType            // address
);

CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(personType)
    .build();

// Nested row fields are written within a single column, joined by the
// array element delimiter (default ';'), e.g.: John Doe,123 Main St;New York
```

## Performance and Memory Considerations

### Schema Reuse

```java
// Create the schema once and reuse it across serializations
CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)
    .setFieldDelimiter(',')
    .build();

// Reuse for multiple rows
for (RowData row : rows) {
    byte[] csvBytes = schema.serialize(row);
    // Process csvBytes
}
```

### Memory Efficiency

The serialization schema keeps per-record overhead low:

- **Direct byte output**: Rows are written straight to byte arrays without building intermediate per-field strings
- **Minimal object allocation**: The underlying writer and field converters are created once and reused across records
- **Type-specific handling**: Each field uses a converter selected for its logical type, avoiding per-record type dispatch

## Error Handling

The serialization schema handles the common CSV edge cases:

- **Null values**: Written as the configured null literal, or as empty fields when none is set
- **Special characters**: Fields containing the delimiter, quote character, or line breaks are quoted and escaped according to the configured quote and escape characters
- **Decimal formatting**: `setWriteBigDecimalInScientificNotation` controls whether large `BigDecimal` values use scientific or plain notation
- **Encoding**: Output bytes are UTF-8 encoded
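
The quoting behavior described above follows the usual RFC 4180 convention. A simplified plain-Java sketch of the quoting decision (illustrative only; Flink's actual implementation delegates to Jackson's CSV writer):

```java
public class CsvQuoting {

    // Illustrative RFC 4180-style quoting (not Flink's implementation):
    // quote a field only when it contains the delimiter, the quote character,
    // or a line break; double any embedded quote characters.
    static String quoteIfNeeded(String field, char delimiter, char quote) {
        boolean needsQuoting = field.indexOf(delimiter) >= 0
                || field.indexOf(quote) >= 0
                || field.indexOf('\n') >= 0
                || field.indexOf('\r') >= 0;
        if (!needsQuoting) {
            return field;
        }
        String q = String.valueOf(quote);
        return q + field.replace(q, q + q) + q;
    }

    public static void main(String[] args) {
        System.out.println(quoteIfNeeded("plain", ',', '"'));       // plain
        System.out.println(quoteIfNeeded("a,b", ',', '"'));         // "a,b"
        System.out.println(quoteIfNeeded("say \"hi\"", ',', '"'));  // "say ""hi"""
    }
}
```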