
# Apache Flink CSV Format

The Apache Flink CSV format module provides comprehensive CSV support for Apache Flink, covering both stream and batch processing. It includes serialization and deserialization schemas for converting between CSV and Flink's internal row data structures, bulk writers for efficient CSV output, input formats for reading CSV files, and configurable options for handling different CSV dialects.

## Package Information

- **Package Name**: flink-csv
- **Package Type**: Maven
- **Language**: Java
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-csv
- **Version**: 2.1.0
- **Installation**: Add to Maven dependencies:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-csv</artifactId>
  <version>2.1.0</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.formats.csv.CsvReaderFormat;
import org.apache.flink.formats.csv.CsvRowDataSerializationSchema;
import org.apache.flink.formats.csv.RowCsvInputFormat;
import org.apache.flink.formats.csv.CsvFormatOptions;
import org.apache.flink.formats.csv.CsvRowSchemaConverter;
```

## Basic Usage

### Reading CSV files with DataStream API

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.csv.CsvReaderFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Create a CSV reader format for a POJO class
CsvReaderFormat<MyPojo> csvFormat = CsvReaderFormat.forPojo(MyPojo.class);

// Create a file source over the CSV files at the given path
Path path = new Path("path/to/input");
FileSource<MyPojo> source = FileSource
    .forRecordStreamFormat(csvFormat, path)
    .build();

// Use the source in a streaming job
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyPojo> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "csv-source");
```

### Writing CSV data with serialization schema

```java
import org.apache.flink.formats.csv.CsvRowDataSerializationSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

// Create a serialization schema for the given row type
CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)
    .setFieldDelimiter('|')
    .setQuoteCharacter('"')
    .build();

// Serialize a row to CSV bytes
byte[] csvBytes = schema.serialize(rowData);
```

### Batch processing with input format

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.csv.RowCsvInputFormat;
import org.apache.flink.types.Row;

// Create a CSV input format for the given row type information
RowCsvInputFormat inputFormat = RowCsvInputFormat
    .builder(typeInfo, new Path("path/to/file.csv"))
    .setFieldDelimiter(',')
    .setIgnoreParseErrors(true)
    .build();

// Use with the DataSet API
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Row> csvData = env.createInput(inputFormat);
```

## Architecture

The Apache Flink CSV format module is organized around several key components:

- **Reader Formats**: `CsvReaderFormat` for streaming file sources with type-safe POJO and schema-based reading
- **Serialization Schemas**: `CsvRowDataSerializationSchema` for converting internal row data to CSV bytes
- **Input Formats**: `RowCsvInputFormat` for batch processing of CSV files
- **Configuration Options**: `CsvFormatOptions` defining all configurable CSV dialect options
- **Schema Conversion**: `CsvRowSchemaConverter` for converting Flink type information to Jackson CSV schemas
- **Format Factories**: Internal factories for Table API/SQL integration with automatic format discovery

## Capabilities

### Stream Processing (Reader Formats)

Streaming CSV file processing with type-safe deserialization, automatic schema detection, and configurable error handling.

```java { .api }
public class CsvReaderFormat<T> extends SimpleStreamFormat<T> {
    public static <T> CsvReaderFormat<T> forPojo(Class<T> pojoType);
    public static <T> CsvReaderFormat<T> forSchema(CsvSchema schema, TypeInformation<T> typeInformation);
    public CsvReaderFormat<T> withIgnoreParseErrors();
}
```

[Stream Processing](./stream-processing.md)
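Beyond `forPojo`, `forSchema` pairs a hand-built Jackson `CsvSchema` with matching Flink type information. A minimal sketch; the column names, separator, and row type here are illustrative, not from the module:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.csv.CsvReaderFormat;
import org.apache.flink.types.Row;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;

// Hand-built Jackson schema describing the file layout (illustrative columns)
CsvSchema schema = CsvSchema.builder()
    .addColumn("id", CsvSchema.ColumnType.NUMBER)
    .addColumn("name", CsvSchema.ColumnType.STRING)
    .setColumnSeparator(';')
    .build();

// Flink type information matching the schema's columns
TypeInformation<Row> typeInfo = Types.ROW(Types.LONG, Types.STRING);

// Combine both, skipping malformed lines instead of failing the job
CsvReaderFormat<Row> format = CsvReaderFormat
    .forSchema(schema, typeInfo)
    .withIgnoreParseErrors();
```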

### Data Serialization

Convert Flink's internal row data structures to CSV format with extensive configuration options for different CSV dialects.

```java { .api }
public class CsvRowDataSerializationSchema implements SerializationSchema<RowData> {
    public byte[] serialize(RowData element);

    public static class Builder {
        public Builder(RowType rowType);
        public Builder setFieldDelimiter(char delimiter);
        public Builder setQuoteCharacter(char quoteCharacter);
        public Builder disableQuoteCharacter();
        public Builder setEscapeCharacter(char escapeCharacter);
        public CsvRowDataSerializationSchema build();
    }
}
```

[Data Serialization](./serialization.md)
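The Builder options above can be combined to target an unquoted dialect that escapes the delimiter instead of quoting fields. A sketch; the row type's field names are illustrative:

```java
import org.apache.flink.formats.csv.CsvRowDataSerializationSchema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.RowType;

// Logical row type for the records being written (illustrative fields)
RowType rowType = (RowType) DataTypes.ROW(
        DataTypes.FIELD("id", DataTypes.BIGINT()),
        DataTypes.FIELD("name", DataTypes.STRING()))
    .getLogicalType();

// Unquoted dialect: disable the quote character and escape instead
CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)
    .setFieldDelimiter(';')
    .disableQuoteCharacter()
    .setEscapeCharacter('\\')
    .build();
```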

### Batch Processing (Input Formats)

Traditional batch processing of CSV files with DataSet API integration and configurable field selection.

```java { .api }
public class RowCsvInputFormat extends AbstractCsvInputFormat<Row> {
    public static Builder builder(TypeInformation<Row> typeInfo, Path... filePaths);

    public static class Builder {
        public Builder setFieldDelimiter(char delimiter);
        public Builder setQuoteCharacter(char quoteCharacter);
        public Builder setIgnoreParseErrors(boolean ignoreParseErrors);
        public Builder setSelectedFields(int[] selectedFields);
        public RowCsvInputFormat build();
    }
}
```

[Batch Processing](./batch-processing.md)
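`setSelectedFields` projects a subset of columns at read time. A sketch assuming a three-column file from which only the first and third columns are wanted; the path and field types are illustrative:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.csv.RowCsvInputFormat;
import org.apache.flink.types.Row;

// Type information describes only the projected fields (columns 0 and 2)
TypeInformation<Row> typeInfo = Types.ROW(Types.STRING, Types.DOUBLE);

RowCsvInputFormat inputFormat = RowCsvInputFormat
    .builder(typeInfo, new Path("path/to/file.csv"))
    .setSelectedFields(new int[] {0, 2})  // skip the middle column
    .setIgnoreParseErrors(true)
    .build();
```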

### Configuration Options

Comprehensive configuration system for handling various CSV dialects and processing requirements.

```java { .api }
public class CsvFormatOptions {
    public static final ConfigOption<String> FIELD_DELIMITER;
    public static final ConfigOption<String> QUOTE_CHARACTER;
    public static final ConfigOption<Boolean> DISABLE_QUOTE_CHARACTER;
    public static final ConfigOption<Boolean> ALLOW_COMMENTS;
    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;
    public static final ConfigOption<String> ARRAY_ELEMENT_DELIMITER;
    public static final ConfigOption<String> ESCAPE_CHARACTER;
    public static final ConfigOption<String> NULL_LITERAL;
    public static final ConfigOption<Boolean> WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION;
}
```

[Configuration Options](./configuration.md)
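In Table API/SQL these options surface as `csv.*` keys in a table's `WITH` clause. A sketch of a DDL using a few of them; the table name, columns, and path are illustrative:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// FIELD_DELIMITER, IGNORE_PARSE_ERRORS, and NULL_LITERAL map to 'csv.*' keys
tEnv.executeSql(
    "CREATE TABLE csv_input (" +
    "  id BIGINT," +
    "  name STRING" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = 'path/to/file.csv'," +
    "  'format' = 'csv'," +
    "  'csv.field-delimiter' = ';'," +
    "  'csv.ignore-parse-errors' = 'true'," +
    "  'csv.null-literal' = 'n/a'" +
    ")");
```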

### Schema Conversion

Utility functions for converting between Flink type information and Jackson CSV schemas.

```java { .api }
public class CsvRowSchemaConverter {
    public static CsvSchema convert(RowTypeInfo rowType);
    public static CsvSchema convert(RowType rowType);
}
```

[Schema Conversion](./schema-conversion.md)
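A sketch of the `RowType` overload: build a logical row type and derive the Jackson `CsvSchema` from it. The field names are illustrative:

```java
import org.apache.flink.formats.csv.CsvRowSchemaConverter;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.RowType;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;

// Build a logical row type (illustrative fields)
RowType rowType = (RowType) DataTypes.ROW(
        DataTypes.FIELD("id", DataTypes.BIGINT()),
        DataTypes.FIELD("name", DataTypes.STRING()))
    .getLogicalType();

// Derive the Jackson CSV schema Flink uses internally
CsvSchema csvSchema = CsvRowSchemaConverter.convert(rowType);
```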

## Types

```java { .api }
// Core Flink types used throughout the API
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.apache.flink.core.fs.Path;
import org.apache.flink.configuration.ConfigOption;

// Jackson CSV types for schema definition
import com.fasterxml.jackson.dataformat.csv.CsvSchema;

// Serialization interfaces
import org.apache.flink.api.common.serialization.SerializationSchema;

// File processing interfaces
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.api.common.io.FileInputFormat;
```