or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

batch-processing.md, configuration.md, index.md, schema-conversion.md, serialization.md, stream-processing.md

docs/serialization.md

0

# Data Serialization

1

2

Convert Flink's internal row data structures to CSV format using `CsvRowDataSerializationSchema` with extensive configuration options for different CSV dialects and integration with Kafka, Kinesis, and file sinks.

3

4

## Capabilities

5

6

### CsvRowDataSerializationSchema Class

7

8

Serialization schema that converts Flink's internal `RowData` objects to CSV byte arrays.

9

10

```java { .api }

11

/**

12

* Serialization schema for converting RowData to CSV format

13

* Implements Flink's SerializationSchema interface for integration with sinks

14

*/

15

public class CsvRowDataSerializationSchema implements SerializationSchema<RowData> {

16

17

/**

18

* Serialize a RowData object to CSV byte array

19

* @param element The RowData object to serialize

20

* @return byte array containing the CSV representation

21

*/

22

public byte[] serialize(RowData element);

23

24

/**

25

* Builder class for configuring CSV serialization options

26

*/

27

public static class Builder {

28

29

/**

30

* Create a builder with the specified row type

31

* @param rowType Flink RowType defining the structure of data to serialize

32

*/

33

public Builder(RowType rowType);

34

35

/**

36

* Set the field delimiter character (default: ',')

37

* @param delimiter Character to separate fields

38

* @return Builder instance for method chaining

39

*/

40

public Builder setFieldDelimiter(char delimiter);

41

42

/**

43

* Set the array element delimiter for complex types (default: ';')

44

* @param delimiter String to separate array elements

45

* @return Builder instance for method chaining

46

*/

47

public Builder setArrayElementDelimiter(String delimiter);

48

49

/**

50

* Disable quote character usage - fields will not be quoted

51

* @return Builder instance for method chaining

52

*/

53

public Builder disableQuoteCharacter();

54

55

/**

56

* Set the quote character for enclosing field values (default: '"')

57

* @param quoteCharacter Character to quote fields containing special characters

58

* @return Builder instance for method chaining

59

*/

60

public Builder setQuoteCharacter(char quoteCharacter);

61

62

/**

63

* Set the escape character for escaping special characters within fields

64

* @param escapeCharacter Character used for escaping (no default)

65

* @return Builder instance for method chaining

66

*/

67

public Builder setEscapeCharacter(char escapeCharacter);

68

69

/**

70

* Set the null literal string for representing null values

71

* @param nullLiteral String representation of null values (no default)

72

* @return Builder instance for method chaining

73

*/

74

public Builder setNullLiteral(String nullLiteral);

75

76

/**

77

* Control BigDecimal scientific notation output (default: true)

78

* @param writeInScientificNotation Whether to use scientific notation for BigDecimal

79

* @return Builder instance for method chaining

80

*/

81

public Builder setWriteBigDecimalInScientificNotation(boolean writeInScientificNotation);

82

83

/**

84

* Build the configured serialization schema

85

* @return CsvRowDataSerializationSchema instance with specified configuration

86

*/

87

public CsvRowDataSerializationSchema build();

88

}

89

}

90

```

91

92

## Usage Examples

93

94

### Basic Serialization

95

96

```java

97

import org.apache.flink.formats.csv.CsvRowDataSerializationSchema;

98

import org.apache.flink.table.types.logical.RowType;

99

import org.apache.flink.table.types.logical.VarCharType;

100

import org.apache.flink.table.types.logical.IntType;

101

import org.apache.flink.table.types.logical.BooleanType;

102

103

// Define row type

104

RowType rowType = RowType.of(

105

new VarCharType(255), // name

106

new IntType(), // age

107

new BooleanType() // active

108

);

109

110

// Create serialization schema with default settings

111

CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)

112

.build();

113

114

// Serialize row data

115

byte[] csvBytes = schema.serialize(rowData);

116

String csvString = new String(csvBytes, StandardCharsets.UTF_8);

117

// Output: "John Doe,25,true"

118

```

119

120

### Custom Delimiter Configuration

121

122

```java

123

// Create schema with pipe delimiter and custom quote character

124

CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)

125

.setFieldDelimiter('|')

126

.setQuoteCharacter('\'')

127

.setArrayElementDelimiter("::")

128

.build();

129

130

// Output: 'John Doe'|25|true

131

```

132

133

### Null Value Handling

134

135

```java

136

// Configure null literal representation

137

CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)

138

.setNullLiteral("NULL")

139

.setEscapeCharacter('\\')

140

.build();

141

142

// Null values will be output as "NULL" instead of empty strings

143

```

144

145

### Numeric Formatting

146

147

```java

148

import org.apache.flink.table.types.logical.DecimalType;

149

150

// Row type with decimal field

151

RowType rowType = RowType.of(

152

new VarCharType(255), // name

153

new DecimalType(10, 2) // salary

154

);

155

156

// Control BigDecimal notation

157

CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)

158

.setWriteBigDecimalInScientificNotation(false) // Use decimal notation

159

.build();

160

161

// Large decimals will use decimal notation instead of scientific notation

162

```

163

164

### Disable Quoting

165

166

```java

167

// Create schema without field quoting

168

CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)

169

.disableQuoteCharacter()

170

.build();

171

172

// Fields will never be quoted, even if they contain special characters

173

```

174

175

## Integration with Sinks

176

177

### Kafka Producer

178

179

```java

180

import org.apache.flink.connector.kafka.sink.KafkaSink;

181

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;

182

183

// Create CSV serialization schema

184

CsvRowDataSerializationSchema csvSchema = new CsvRowDataSerializationSchema.Builder(rowType)

185

.setFieldDelimiter(',')

186

.setQuoteCharacter('"')

187

.build();

188

189

// Create Kafka sink with CSV serialization

190

KafkaSink<RowData> kafkaSink = KafkaSink.<RowData>builder()

191

.setBootstrapServers("localhost:9092")

192

.setRecordSerializer(

193

KafkaRecordSerializationSchema.builder()

194

.setTopic("csv-topic")

195

.setValueSerializationSchema(csvSchema)

196

.build()

197

)

198

.build();

199

200

// Use with DataStream

201

dataStream.sinkTo(kafkaSink);

202

```

203

204

### File Sink

205

206

```java

207

import org.apache.flink.connector.file.sink.FileSink;

208

import org.apache.flink.core.io.SimpleVersionedSerializer;

209

import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

210

211

// Create file sink with CSV serialization

212

FileSink<RowData> fileSink = FileSink

213

.forRowFormat(new Path("output/"), new SimpleStringEncoder<RowData>() {

214

@Override

215

public void encode(RowData element, OutputStream stream) throws IOException {

216

byte[] csvBytes = csvSchema.serialize(element);

217

stream.write(csvBytes);

218

stream.write('\n');

219

}

220

})

221

.build();

222

223

dataStream.sinkTo(fileSink);

224

```

225

226

### Custom Sink Function

227

228

```java

229

import org.apache.flink.streaming.api.functions.sink.SinkFunction;

230

231

public class CsvSinkFunction implements SinkFunction<RowData> {

232

private final CsvRowDataSerializationSchema serializer;

233

234

public CsvSinkFunction(CsvRowDataSerializationSchema serializer) {

235

this.serializer = serializer;

236

}

237

238

@Override

239

public void invoke(RowData value, Context context) throws Exception {

240

byte[] csvBytes = serializer.serialize(value);

241

// Write to external system, file, database, etc.

242

writeToExternalSystem(csvBytes);

243

}

244

245

private void writeToExternalSystem(byte[] data) {

246

// Implementation specific to target system

247

}

248

}

249

250

// Use custom sink

251

dataStream.addSink(new CsvSinkFunction(csvSchema));

252

```

253

254

## Complex Type Handling

255

256

### Array Types

257

258

```java

259

import org.apache.flink.table.types.logical.ArrayType;

260

261

// Row type with array field

262

RowType rowType = RowType.of(

263

new VarCharType(255), // name

264

new ArrayType(new VarCharType(255)) // tags

265

);

266

267

// Configure array element delimiter

268

CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)

269

.setArrayElementDelimiter(";")

270

.build();

271

272

// Arrays will be serialized as: "John Doe","tag1;tag2;tag3"

273

```

274

275

### Nested Types

276

277

```java

278

import org.apache.flink.table.types.logical.RowType;

279

280

// Nested row type

281

RowType addressType = RowType.of(

282

new VarCharType(255), // street

283

new VarCharType(255) // city

284

);

285

286

RowType personType = RowType.of(

287

new VarCharType(255), // name

288

addressType // address

289

);

290

291

// Nested objects are flattened in CSV output

292

CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(personType)

293

.build();

294

295

// Output: "John Doe","123 Main St","New York"

296

```

297

298

## Performance and Memory Considerations

299

300

### Schema Reuse

301

302

```java

303

// Create schema once and reuse across multiple serializations

304

CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)

305

.setFieldDelimiter(',')

306

.build();

307

308

// Reuse for multiple row serializations

309

for (RowData row : rows) {

310

byte[] csvBytes = schema.serialize(row);

311

// Process csvBytes

312

}

313

```

314

315

### Memory Efficiency

316

317

The serialization schema operates efficiently by:

318

- **Streaming serialization**: Writes directly to byte arrays without intermediate string creation

319

- **Minimal object allocation**: Reuses internal buffers and writers

320

- **Type-specific handling**: Optimized serialization paths for different data types

321

- **Lazy evaluation**: Only processes fields that contain actual data

322

323

## Error Handling

324

325

The serialization schema handles various error conditions:

326

327

- **Null values**: Configurable null literal representation or empty strings

328

- **Type conversion errors**: Automatic conversion between compatible types

329

- **Special characters**: Proper escaping and quoting of special characters

330

- **Overflow conditions**: Graceful handling of numeric overflow with configurable precision

331

- **Encoding issues**: UTF-8 encoding with proper character handling