or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

batch-processing.md · configuration.md · index.md · schema-conversion.md · serialization.md · stream-processing.md

docs/schema-conversion.md

0

# Schema Conversion

1

2

Utility functions for converting between Flink type information and Jackson CSV schemas using the `CsvRowSchemaConverter` class, enabling seamless integration between Flink's type system and CSV processing libraries.

3

4

## Capabilities

5

6

### CsvRowSchemaConverter Class

7

8

Static utility class that converts Flink type information to Jackson `CsvSchema` objects for CSV processing.

9

10

```java { .api }

11

/**

12

* Utility class for converting Flink type information to Jackson CSV schemas

13

* Provides static methods for schema conversion supporting both legacy and modern Flink types

14

*/

15

public class CsvRowSchemaConverter {

16

17

/**

18

* Convert legacy RowTypeInfo to Jackson CsvSchema

19

* Supports DataSet API and legacy type system integration

20

* @param rowType RowTypeInfo containing field names and types

21

* @return CsvSchema configured for the specified row structure

22

*/

23

public static CsvSchema convert(RowTypeInfo rowType);

24

25

/**

26

* Convert modern RowType to Jackson CsvSchema

27

* Supports Table API and modern type system integration

28

* @param rowType RowType containing logical field types and names

29

* @return CsvSchema configured for the specified row structure

30

*/

31

public static CsvSchema convert(RowType rowType);

32

}

33

```

34

35

## Usage Examples

36

37

### Converting RowTypeInfo (Legacy Type System)

38

39

```java

40

import org.apache.flink.formats.csv.CsvRowSchemaConverter;

41

import org.apache.flink.api.java.typeutils.RowTypeInfo;

42

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;

43

import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;

44

import com.fasterxml.jackson.dataformat.csv.CsvSchema;

45

46

// Create legacy row type info

47

RowTypeInfo rowTypeInfo = new RowTypeInfo(

48

BasicTypeInfo.STRING_TYPE_INFO, // name

49

BasicTypeInfo.INT_TYPE_INFO, // age

50

BasicTypeInfo.BOOLEAN_TYPE_INFO, // active

51

BasicTypeInfo.DOUBLE_TYPE_INFO, // salary

52

PrimitiveArrayTypeInfo.STRING_ARRAY_TYPE_INFO // tags

53

);

54

55

// Convert to Jackson CSV schema

56

CsvSchema csvSchema = CsvRowSchemaConverter.convert(rowTypeInfo);

57

58

// The resulting schema can be used with Jackson CsvMapper

59

CsvMapper mapper = new CsvMapper();

60

ObjectReader reader = mapper.readerFor(Row.class).with(csvSchema);

61

```

62

63

### Converting RowType (Modern Type System)

64

65

```java

66

import org.apache.flink.table.types.logical.RowType;

67

import org.apache.flink.table.types.logical.VarCharType;

68

import org.apache.flink.table.types.logical.IntType;

69

import org.apache.flink.table.types.logical.BooleanType;

70

import org.apache.flink.table.types.logical.DoubleType;

71

import org.apache.flink.table.types.logical.ArrayType;
import java.util.Arrays;

72

73

// Create modern row type
// Note: RowType.of(...) accepts LogicalType arguments (field names default to f0, f1, ...);
// to supply explicit field names, use the RowType constructor with a list of RowField.
RowType rowType = new RowType(Arrays.asList(
    new RowType.RowField("name", new VarCharType(255)),
    new RowType.RowField("age", new IntType()),
    new RowType.RowField("active", new BooleanType()),
    new RowType.RowField("salary", new DoubleType()),
    new RowType.RowField("tags", new ArrayType(new VarCharType(100)))
));

81

82

// Convert to Jackson CSV schema

83

CsvSchema csvSchema = CsvRowSchemaConverter.convert(rowType);

84

85

// Use with CsvReaderFormat

86

CsvReaderFormat<Row> readerFormat = CsvReaderFormat.forSchema(

87

csvSchema,

88

TypeInformation.of(Row.class)

89

);

90

```

91

92

### Schema Customization

93

94

```java

95

// Convert and customize the schema

96

CsvSchema baseSchema = CsvRowSchemaConverter.convert(rowType);

97

98

// Customize schema properties

99

CsvSchema customSchema = baseSchema.rebuild()

100

.setUseHeader(true) // Use first row as header

101

.setColumnSeparator('|') // Change delimiter to pipe

102

.setQuoteChar('\'') // Use single quotes

103

.setEscapeChar('\\') // Set escape character

104

.build();

105

106

// Use customized schema

107

CsvMapper mapper = new CsvMapper();

108

ObjectReader reader = mapper.readerFor(Row.class).with(customSchema);

109

```

110

111

## Integration Patterns

112

113

### With CsvReaderFormat

114

115

```java

116

// Complete integration with streaming reader

117

RowType rowType = new RowType(Arrays.asList(
    new RowType.RowField("id", new IntType()),
    new RowType.RowField("name", new VarCharType(255)),
    new RowType.RowField("timestamp", new TimestampType(3))
));

122

123

// Convert to schema

124

CsvSchema schema = CsvRowSchemaConverter.convert(rowType);

125

126

// Create reader format

127

CsvReaderFormat<Row> readerFormat = CsvReaderFormat.forSchema(

128

schema,

129

TypeInformation.of(Row.class)

130

);

131

132

// Use with file source

133

FileSource<Row> source = FileSource

134

.forRecordStreamFormat(readerFormat, new Path("data.csv"))

135

.build();

136

```

137

138

### With Custom Mapper Factory

139

140

```java

141

// Create custom mapper factory using converted schema

142

SerializableSupplier<CsvMapper> mapperFactory = () -> {

143

CsvMapper mapper = new CsvMapper();

144

145

// Configure mapper for specific requirements

146

mapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true);

147

mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

148

149

return mapper;

150

};

151

152

// Schema generator using converter

153

SerializableFunction<CsvMapper, CsvSchema> schemaGenerator = mapper -> {

154

CsvSchema baseSchema = CsvRowSchemaConverter.convert(rowType);

155

return baseSchema.withHeader();

156

};

157

158

// Create reader format with custom configuration

159

CsvReaderFormat<Row> readerFormat = CsvReaderFormat.forSchema(

160

mapperFactory,

161

schemaGenerator,

162

TypeInformation.of(Row.class)

163

);

164

```

165

166

### Table API Integration

167

168

```java

169

// Use in Table API descriptor

170

import org.apache.flink.table.api.TableDescriptor;

171

import org.apache.flink.table.api.Schema;

172

173

// Define table schema

174

// Build the row data type directly with DataTypes
// (org.apache.flink.table.api.Schema has no toRowDataType() method;
// that API belongs to the legacy TableSchema class)
DataType rowDataType = DataTypes.ROW(
    DataTypes.FIELD("name", DataTypes.STRING()),
    DataTypes.FIELD("age", DataTypes.INT()),
    DataTypes.FIELD("active", DataTypes.BOOLEAN())
);

// Extract the logical RowType
RowType rowType = (RowType) rowDataType.getLogicalType();

182

183

// Convert to CSV schema for external processing

184

CsvSchema csvSchema = CsvRowSchemaConverter.convert(rowType);

185

186

// The csvSchema can now be used with external Jackson-based CSV processing

187

```

188

189

## Type Mapping

190

191

The converter handles mapping between Flink types and Jackson CSV column types:

192

193

### Primitive Types

194

195

```java

196

// Flink Type -> Jackson CSV Column Type

197

BasicTypeInfo.STRING_TYPE_INFO -> CsvSchema.ColumnType.STRING

198

BasicTypeInfo.INT_TYPE_INFO -> CsvSchema.ColumnType.NUMBER

199

BasicTypeInfo.LONG_TYPE_INFO -> CsvSchema.ColumnType.NUMBER

200

BasicTypeInfo.FLOAT_TYPE_INFO -> CsvSchema.ColumnType.NUMBER

201

BasicTypeInfo.DOUBLE_TYPE_INFO -> CsvSchema.ColumnType.NUMBER

202

BasicTypeInfo.BOOLEAN_TYPE_INFO -> CsvSchema.ColumnType.BOOLEAN

203

```

204

205

### Temporal Types

206

207

```java

208

// Date and time types

209

new DateType() -> CsvSchema.ColumnType.STRING // ISO date format

210

new TimeType() -> CsvSchema.ColumnType.STRING // ISO time format

211

new TimestampType() -> CsvSchema.ColumnType.STRING // ISO timestamp format

212

213

// Example conversion

214

RowType temporalType = new RowType(Arrays.asList(
    new RowType.RowField("date", new DateType()),
    new RowType.RowField("time", new TimeType()),
    new RowType.RowField("timestamp", new TimestampType(3))
));

219

220

CsvSchema schema = CsvRowSchemaConverter.convert(temporalType);

221

// Results in string columns that accept ISO formatted temporal values

222

```

223

224

### Complex Types

225

226

```java

227

// Array types are flattened or serialized as strings

228

new ArrayType(new VarCharType()) -> CsvSchema.ColumnType.STRING

229

230

// Map types are serialized as strings

231

new MapType(new VarCharType(), new IntType()) -> CsvSchema.ColumnType.STRING

232

233

// Nested row types are flattened

234

RowType nestedType = new RowType(Arrays.asList(
    new RowType.RowField("address", new RowType(Arrays.asList(
        new RowType.RowField("street", new VarCharType(255)),
        new RowType.RowField("city", new VarCharType(100))
    )))
));

240

241

CsvSchema schema = CsvRowSchemaConverter.convert(nestedType);

242

// Results in flattened columns: address.street, address.city

243

```

244

245

### Decimal and Numeric Types

246

247

```java

248

// Precise numeric types

249

new DecimalType(10, 2) -> CsvSchema.ColumnType.NUMBER

250

new BigIntType() -> CsvSchema.ColumnType.NUMBER

251

252

// Example with decimal precision

253

RowType financialType = new RowType(Arrays.asList(
    new RowType.RowField("amount", new DecimalType(15, 4)),
    // DecimalType requires scale <= precision; (5, 6) would be rejected
    new RowType.RowField("rate", new DecimalType(8, 6))
));

257

258

CsvSchema schema = CsvRowSchemaConverter.convert(financialType);

259

// Maintains precision information in the schema

260

```

261

262

## Advanced Usage

263

264

### Schema Validation

265

266

```java

267

// Validate converted schema

268

public void validateConvertedSchema(RowType rowType) {

269

CsvSchema schema = CsvRowSchemaConverter.convert(rowType);

270

271

// Check column count matches

272

int expectedColumns = rowType.getFieldCount();

273

int actualColumns = schema.size();

274

275

if (expectedColumns != actualColumns) {

276

throw new IllegalStateException(

277

String.format("Column count mismatch: expected %d, got %d",

278

expectedColumns, actualColumns)

279

);

280

}

281

282

// Validate column names and types

283

for (int i = 0; i < expectedColumns; i++) {

284

String expectedName = rowType.getFieldNames().get(i);

285

String actualName = schema.columnName(i);

286

287

if (!expectedName.equals(actualName)) {

288

throw new IllegalStateException(

289

String.format("Column name mismatch at index %d: expected '%s', got '%s'",

290

i, expectedName, actualName)

291

);

292

}

293

}

294

}

295

```

296

297

### Schema Caching

298

299

```java

300

// Cache converted schemas for performance

301

import java.util.concurrent.ConcurrentHashMap;

302

303

public class SchemaCache {

304

private final ConcurrentHashMap<RowType, CsvSchema> cache = new ConcurrentHashMap<>();

305

306

public CsvSchema getSchema(RowType rowType) {

307

return cache.computeIfAbsent(rowType, CsvRowSchemaConverter::convert);

308

}

309

310

public void clearCache() {

311

cache.clear();

312

}

313

}

314

315

// Usage

316

SchemaCache schemaCache = new SchemaCache();

317

CsvSchema schema = schemaCache.getSchema(rowType);

318

```

319

320

### Dynamic Schema Generation

321

322

```java

323

// Generate schema from table metadata

324

public CsvSchema createSchemaFromMetadata(TableSchema tableSchema) {

325

// Convert table schema to row type

326

RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType();

327

328

// Convert to CSV schema

329

CsvSchema baseSchema = CsvRowSchemaConverter.convert(rowType);

330

331

// Apply dynamic customizations based on metadata

332

CsvSchema.Builder builder = baseSchema.rebuild();

333

334

// Add header if table has field names

335

if (tableSchema.getFieldNames().length > 0) {

336

builder.setUseHeader(true);

337

}

338

339

// Configure based on table properties

340

builder.setColumnSeparator(',');

341

builder.setQuoteChar('"');

342

343

return builder.build();

344

}

345

```

346

347

## Error Handling

348

349

The schema converter handles various error conditions gracefully:

350

351

- **Unsupported types**: Unknown types are mapped to STRING columns with warnings

352

- **Nested complexity**: Deep nesting is flattened with generated column names

353

- **Name conflicts**: Conflicting field names are resolved with numeric suffixes

354

- **Null handling**: Nullable types are properly represented in the schema

355

- **Type precision**: Maintains precision information where possible in Jackson schema

356

357

The converter ensures that any valid Flink `RowType` or `RowTypeInfo` can be converted to a usable Jackson `CsvSchema`, enabling seamless integration between Flink's type system and CSV processing workflows.