# Schema Conversion

Utility functions for converting between Flink type information and Jackson CSV schemas using the `CsvRowSchemaConverter` class, enabling integration between Flink's type system and Jackson-based CSV processing.

## Capabilities

### CsvRowSchemaConverter Class

Static utility class that converts Flink type information to Jackson `CsvSchema` objects for CSV processing.

```java { .api }
/**
 * Utility class for converting Flink type information to Jackson CSV schemas.
 * Provides static methods for schema conversion supporting both the legacy and
 * the modern Flink type system.
 */
public class CsvRowSchemaConverter {

    /**
     * Converts a legacy RowTypeInfo to a Jackson CsvSchema.
     * Supports DataSet API and legacy type system integration.
     *
     * @param rowType RowTypeInfo containing field names and types
     * @return CsvSchema configured for the specified row structure
     */
    public static CsvSchema convert(RowTypeInfo rowType);

    /**
     * Converts a modern RowType to a Jackson CsvSchema.
     * Supports Table API and modern type system integration.
     *
     * @param rowType RowType containing logical field types and names
     * @return CsvSchema configured for the specified row structure
     */
    public static CsvSchema convert(RowType rowType);
}
```

## Usage Examples

### Converting RowTypeInfo (Legacy Type System)

```java
import org.apache.flink.formats.csv.CsvRowSchemaConverter;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.types.Row;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;

// Create legacy row type info
RowTypeInfo rowTypeInfo = new RowTypeInfo(
    BasicTypeInfo.STRING_TYPE_INFO,            // name
    BasicTypeInfo.INT_TYPE_INFO,               // age
    BasicTypeInfo.BOOLEAN_TYPE_INFO,           // active
    BasicTypeInfo.DOUBLE_TYPE_INFO,            // salary
    BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO  // tags (String is not a primitive,
                                               // so BasicArrayTypeInfo is used here,
                                               // not PrimitiveArrayTypeInfo)
);

// Convert to Jackson CSV schema
CsvSchema csvSchema = CsvRowSchemaConverter.convert(rowTypeInfo);

// The resulting schema can be used with a Jackson CsvMapper
CsvMapper mapper = new CsvMapper();
ObjectReader reader = mapper.readerFor(Row.class).with(csvSchema);
```

### Converting RowType (Modern Type System)

```java
import java.util.Arrays;

import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.ArrayType;

// Create modern row type; RowType.of has no RowField overload,
// so use the constructor that takes a list of fields
RowType rowType = new RowType(Arrays.asList(
    new RowType.RowField("name", new VarCharType(255)),
    new RowType.RowField("age", new IntType()),
    new RowType.RowField("active", new BooleanType()),
    new RowType.RowField("salary", new DoubleType()),
    new RowType.RowField("tags", new ArrayType(new VarCharType(100)))
));

// Convert to Jackson CSV schema
CsvSchema csvSchema = CsvRowSchemaConverter.convert(rowType);

// Use with CsvReaderFormat
CsvReaderFormat<Row> readerFormat = CsvReaderFormat.forSchema(
    csvSchema,
    TypeInformation.of(Row.class)
);
```

### Schema Customization

```java
// Convert and customize the schema
CsvSchema baseSchema = CsvRowSchemaConverter.convert(rowType);

// Customize schema properties
CsvSchema customSchema = baseSchema.rebuild()
    .setUseHeader(true)       // Use first row as header
    .setColumnSeparator('|')  // Change delimiter to pipe
    .setQuoteChar('\'')       // Use single quotes
    .setEscapeChar('\\')      // Set escape character
    .build();

// Use customized schema
CsvMapper mapper = new CsvMapper();
ObjectReader reader = mapper.readerFor(Row.class).with(customSchema);
```

## Integration Patterns

### With CsvReaderFormat

```java
// Complete integration with a streaming file source
RowType rowType = new RowType(Arrays.asList(
    new RowType.RowField("id", new IntType()),
    new RowType.RowField("name", new VarCharType(255)),
    new RowType.RowField("timestamp", new TimestampType(3))
));

// Convert to schema
CsvSchema schema = CsvRowSchemaConverter.convert(rowType);

// Create reader format
CsvReaderFormat<Row> readerFormat = CsvReaderFormat.forSchema(
    schema,
    TypeInformation.of(Row.class)
);

// Use with file source
FileSource<Row> source = FileSource
    .forRecordStreamFormat(readerFormat, new Path("data.csv"))
    .build();
```

### With Custom Mapper Factory

```java
// Create custom mapper factory using the converted schema
SerializableSupplier<CsvMapper> mapperFactory = () -> {
    CsvMapper mapper = new CsvMapper();

    // Configure mapper for specific requirements
    mapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    return mapper;
};

// Schema generator using the converter (rowType defined as above)
SerializableFunction<CsvMapper, CsvSchema> schemaGenerator = mapper -> {
    CsvSchema baseSchema = CsvRowSchemaConverter.convert(rowType);
    return baseSchema.withHeader();
};

// Create reader format with custom configuration
CsvReaderFormat<Row> readerFormat = CsvReaderFormat.forSchema(
    mapperFactory,
    schemaGenerator,
    TypeInformation.of(Row.class)
);
```

### Table API Integration

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

// Define the row structure with the DataTypes factory.
// (An unresolved Table API Schema cannot be converted directly;
// a resolved DataType is needed to obtain the logical RowType.)
DataType rowDataType = DataTypes.ROW(
    DataTypes.FIELD("name", DataTypes.STRING()),
    DataTypes.FIELD("age", DataTypes.INT()),
    DataTypes.FIELD("active", DataTypes.BOOLEAN())
);

// Extract the logical RowType
RowType rowType = (RowType) rowDataType.getLogicalType();

// Convert to CSV schema for external processing
CsvSchema csvSchema = CsvRowSchemaConverter.convert(rowType);

// The csvSchema can now be used with external Jackson-based CSV processing
```

## Type Mapping

The converter maps Flink types to Jackson CSV column types:

### Primitive Types

```java
// Flink Type -> Jackson CSV Column Type
BasicTypeInfo.STRING_TYPE_INFO  -> CsvSchema.ColumnType.STRING
BasicTypeInfo.INT_TYPE_INFO     -> CsvSchema.ColumnType.NUMBER
BasicTypeInfo.LONG_TYPE_INFO    -> CsvSchema.ColumnType.NUMBER
BasicTypeInfo.FLOAT_TYPE_INFO   -> CsvSchema.ColumnType.NUMBER
BasicTypeInfo.DOUBLE_TYPE_INFO  -> CsvSchema.ColumnType.NUMBER
BasicTypeInfo.BOOLEAN_TYPE_INFO -> CsvSchema.ColumnType.BOOLEAN
```

### Temporal Types

```java
// Date and time types
new DateType()      -> CsvSchema.ColumnType.STRING  // ISO date format
new TimeType()      -> CsvSchema.ColumnType.STRING  // ISO time format
new TimestampType() -> CsvSchema.ColumnType.STRING  // ISO timestamp format

// Example conversion
RowType temporalType = new RowType(Arrays.asList(
    new RowType.RowField("date", new DateType()),
    new RowType.RowField("time", new TimeType()),
    new RowType.RowField("timestamp", new TimestampType(3))
));

CsvSchema schema = CsvRowSchemaConverter.convert(temporalType);
// Results in string columns that accept ISO-formatted temporal values
```

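Since temporal columns are carried as strings, the CSV values are expected in ISO form. The snippet below is a standalone `java.time` sketch of producing such values; it illustrates the expected wire format and is not part of the converter API:

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;

public class IsoTemporalValues {
    public static void main(String[] args) {
        // ISO renderings matching the STRING columns produced
        // for DATE, TIME, and TIMESTAMP(3) fields
        String date = LocalDate.of(2024, 3, 15)
                .format(DateTimeFormatter.ISO_LOCAL_DATE);
        String time = LocalTime.of(9, 30, 45)
                .format(DateTimeFormatter.ISO_LOCAL_TIME);
        String timestamp = LocalDateTime.of(2024, 3, 15, 9, 30, 45, 123_000_000)
                .format(DateTimeFormatter.ISO_LOCAL_DATE_TIME);

        System.out.println(date);      // 2024-03-15
        System.out.println(time);      // 09:30:45
        System.out.println(timestamp); // 2024-03-15T09:30:45.123
    }
}
```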
### Complex Types

```java
// Array types are flattened or serialized as strings
new ArrayType(new VarCharType()) -> CsvSchema.ColumnType.STRING

// Map types are serialized as strings
new MapType(new VarCharType(), new IntType()) -> CsvSchema.ColumnType.STRING

// Nested row types are flattened
RowType nestedType = new RowType(Arrays.asList(
    new RowType.RowField("address", new RowType(Arrays.asList(
        new RowType.RowField("street", new VarCharType(255)),
        new RowType.RowField("city", new VarCharType(100))
    )))
));

CsvSchema schema = CsvRowSchemaConverter.convert(nestedType);
// Results in flattened columns: address.street, address.city
```

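The dotted-name flattening described above can be sketched independently of Flink's types. The recursive helper below is hypothetical (it mirrors the naming strategy, not the converter's actual implementation) and operates on a plain map-based field tree:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FlattenNames {
    // Recursively expand nested field names into dotted column names,
    // e.g. {address -> {street, city}} becomes [address.street, address.city].
    static List<String> flatten(String prefix, Map<String, Object> fields) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Object> e : fields.entrySet()) {
            String name = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            if (e.getValue() instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> nested = (Map<String, Object>) e.getValue();
                out.addAll(flatten(name, nested));
            } else {
                out.add(name);  // leaf field: emit the accumulated dotted name
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> address = new LinkedHashMap<>();
        address.put("street", "VARCHAR(255)");
        address.put("city", "VARCHAR(100)");
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("address", address);
        System.out.println(flatten("", row)); // [address.street, address.city]
    }
}
```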
### Decimal and Numeric Types

```java
// Precise numeric types
new DecimalType(10, 2) -> CsvSchema.ColumnType.NUMBER
new BigIntType()       -> CsvSchema.ColumnType.NUMBER

// Example with decimal precision (scale must not exceed precision)
RowType financialType = new RowType(Arrays.asList(
    new RowType.RowField("amount", new DecimalType(15, 4)),
    new RowType.RowField("rate", new DecimalType(7, 6))
));

CsvSchema schema = CsvRowSchemaConverter.convert(financialType);
// Precision and scale are enforced by the Flink type;
// the resulting CSV columns are plain NUMBER columns
```

## Advanced Usage

### Schema Validation

```java
// Validate a converted schema
public void validateConvertedSchema(RowType rowType) {
    CsvSchema schema = CsvRowSchemaConverter.convert(rowType);

    // Check that the column count matches
    int expectedColumns = rowType.getFieldCount();
    int actualColumns = schema.size();

    if (expectedColumns != actualColumns) {
        throw new IllegalStateException(
            String.format("Column count mismatch: expected %d, got %d",
                expectedColumns, actualColumns)
        );
    }

    // Validate column names
    for (int i = 0; i < expectedColumns; i++) {
        String expectedName = rowType.getFieldNames().get(i);
        String actualName = schema.columnName(i);

        if (!expectedName.equals(actualName)) {
            throw new IllegalStateException(
                String.format("Column name mismatch at index %d: expected '%s', got '%s'",
                    i, expectedName, actualName)
            );
        }
    }
}
```

### Schema Caching

```java
// Cache converted schemas for performance
import java.util.concurrent.ConcurrentHashMap;

public class SchemaCache {
    private final ConcurrentHashMap<RowType, CsvSchema> cache = new ConcurrentHashMap<>();

    public CsvSchema getSchema(RowType rowType) {
        return cache.computeIfAbsent(rowType, CsvRowSchemaConverter::convert);
    }

    public void clearCache() {
        cache.clear();
    }
}

// Usage
SchemaCache schemaCache = new SchemaCache();
CsvSchema schema = schemaCache.getSchema(rowType);
```

### Dynamic Schema Generation

```java
// Generate a schema from table metadata
// (TableSchema is deprecated in newer Flink versions but still exposes toRowDataType())
public CsvSchema createSchemaFromMetadata(TableSchema tableSchema) {
    // Convert table schema to row type
    RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType();

    // Convert to CSV schema
    CsvSchema baseSchema = CsvRowSchemaConverter.convert(rowType);

    // Apply dynamic customizations based on metadata
    CsvSchema.Builder builder = baseSchema.rebuild();

    // Add a header if the table has field names
    if (tableSchema.getFieldNames().length > 0) {
        builder.setUseHeader(true);
    }

    // Configure based on table properties
    builder.setColumnSeparator(',');
    builder.setQuoteChar('"');

    return builder.build();
}
```

## Error Handling

The schema converter handles various error conditions gracefully:

- **Unsupported types**: Unknown types are mapped to STRING columns with warnings
- **Nested complexity**: Deep nesting is flattened with generated column names
- **Name conflicts**: Conflicting field names are resolved with numeric suffixes
- **Null handling**: Nullable types are properly represented in the schema
- **Type precision**: Precision information is maintained in the Jackson schema where possible
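The numeric-suffix strategy for name conflicts can be sketched in plain Java; the helper below is hypothetical and only illustrates the idea, not the converter's actual code:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UniqueColumnNames {
    // Deduplicate column names by appending _1, _2, ... to repeats.
    static List<String> dedupe(List<String> names) {
        Map<String, Integer> seen = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String name : names) {
            int count = seen.merge(name, 1, Integer::sum);
            // First occurrence keeps its name; later ones get a numeric suffix
            out.add(count == 1 ? name : name + "_" + (count - 1));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(dedupe(List.of("id", "name", "id", "id")));
        // [id, name, id_1, id_2]
    }
}
```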
The converter ensures that any valid Flink `RowType` or `RowTypeInfo` can be converted to a usable Jackson `CsvSchema`, enabling seamless integration between Flink's type system and CSV processing workflows.