or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

batch-processing.md · configuration.md · index.md · schema-conversion.md · serialization.md · stream-processing.md

docs/stream-processing.md

# Stream Processing

Streaming CSV file processing with the `CsvReaderFormat` class, providing type-safe deserialization, automatic schema detection, and configurable error handling for DataStream API integration.

## Capabilities

### CsvReaderFormat Class

Main class for reading CSV files in streaming applications, extending Flink's `SimpleStreamFormat`.

```java { .api }
/**
 * StreamFormat for reading CSV files with type safety and error handling
 * @param <T> The type of objects to deserialize CSV records into
 */
public class CsvReaderFormat<T> extends SimpleStreamFormat<T> {

    /**
     * Create a CSV reader format for POJO classes with automatic schema detection
     * @param pojoType The POJO class to deserialize into
     * @return CsvReaderFormat instance configured for the POJO type
     */
    public static <T> CsvReaderFormat<T> forPojo(Class<T> pojoType);

    /**
     * Create a CSV reader format with predefined schema and type information
     * @param schema Jackson CsvSchema defining the CSV structure
     * @param typeInformation Flink TypeInformation for the target type
     * @return CsvReaderFormat instance configured with the provided schema
     */
    public static <T> CsvReaderFormat<T> forSchema(CsvSchema schema, TypeInformation<T> typeInformation);

    /**
     * Create a CSV reader format with custom mapper factory and schema generator
     * @param mapperFactory Supplier for creating Jackson CsvMapper instances
     * @param schemaGenerator Function to generate CsvSchema from CsvMapper
     * @param typeInformation Flink TypeInformation for the target type
     * @return CsvReaderFormat instance with custom configuration
     */
    public static <T> CsvReaderFormat<T> forSchema(
        SerializableSupplier<CsvMapper> mapperFactory,
        SerializableFunction<CsvMapper, CsvSchema> schemaGenerator,
        TypeInformation<T> typeInformation
    );

    /**
     * Configure the reader to ignore parsing errors and skip malformed records
     * @return New CsvReaderFormat instance with error ignoring enabled
     */
    public CsvReaderFormat<T> withIgnoreParseErrors();

    /**
     * Create a reader instance for processing a specific input stream
     * @param config Flink configuration
     * @param stream Input stream to read from
     * @return StreamFormat.Reader instance for reading records
     */
    public StreamFormat.Reader<T> createReader(Configuration config, FSDataInputStream stream);

    /**
     * Get the type information for objects produced by this format
     * @return TypeInformation describing the output type
     */
    public TypeInformation<T> getProducedType();
}
```

## Usage Examples

### Basic POJO Reading

```java
import org.apache.flink.formats.csv.CsvReaderFormat;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Define POJO class
public class Person {
    public String name;
    public int age;
    public boolean active;

    public Person() {} // Default constructor required
}

// Create CSV reader format
CsvReaderFormat<Person> csvFormat = CsvReaderFormat.forPojo(Person.class);

// Create file source
FileSource<Person> source = FileSource
    .forRecordStreamFormat(csvFormat, new Path("persons.csv"))
    .build();

// Use in streaming job
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Person> persons = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "csv-source"
);

persons.print();
env.execute("CSV Reading Job");
```

### Schema-Based Reading

```java
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Define custom schema
CsvSchema schema = CsvSchema.builder()
    .addColumn("name", CsvSchema.ColumnType.STRING)
    .addColumn("age", CsvSchema.ColumnType.NUMBER)
    .addColumn("salary", CsvSchema.ColumnType.NUMBER)
    .setUseHeader(true)
    .setColumnSeparator('|')
    .build();

// Create reader format with schema
CsvReaderFormat<Row> csvFormat = CsvReaderFormat.forSchema(
    schema,
    TypeInformation.of(Row.class)
);

// Use with file source
FileSource<Row> source = FileSource
    .forRecordStreamFormat(csvFormat, new Path("employees.csv"))
    .build();
```

### Error Handling

```java
// Create reader format that ignores parsing errors
CsvReaderFormat<Person> csvFormat = CsvReaderFormat
    .forPojo(Person.class)
    .withIgnoreParseErrors();

// Malformed records will be skipped instead of causing job failure
FileSource<Person> source = FileSource
    .forRecordStreamFormat(csvFormat, new Path("dirty-data.csv"))
    .build();

DataStream<Person> cleanData = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "fault-tolerant-csv-source"
);
```

### Custom Mapper Configuration

```java
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.databind.DeserializationFeature;

// Create custom mapper factory
SerializableSupplier<CsvMapper> mapperFactory = () -> {
    CsvMapper mapper = new CsvMapper();
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, true);
    return mapper;
};

// Create schema generator
SerializableFunction<CsvMapper, CsvSchema> schemaGenerator = mapper ->
    mapper.schemaFor(Person.class).withHeader();

// Create reader format with custom configuration
CsvReaderFormat<Person> csvFormat = CsvReaderFormat.forSchema(
    mapperFactory,
    schemaGenerator,
    TypeInformation.of(Person.class)
);
```

## Integration with File Sources

### Monitoring Directory

```java
import org.apache.flink.connector.file.src.enumerate.ContinuousFileEnumerator;

// Monitor directory for new CSV files
FileSource<Person> source = FileSource
    .forRecordStreamFormat(csvFormat, new Path("input-directory/"))
    .monitorContinuously(Duration.ofSeconds(10))
    .build();

DataStream<Person> stream = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "monitoring-csv-source"
);
```

### Processing Multiple Files

```java
// Process multiple CSV files with same format
FileSource<Person> source = FileSource
    .forRecordStreamFormat(
        csvFormat,
        new Path("file1.csv"),
        new Path("file2.csv"),
        new Path("file3.csv")
    )
    .build();
```

## Type Safety and Validation

The `CsvReaderFormat` provides compile-time type safety through generic type parameters and runtime validation through Jackson's deserialization:

- **Compile-time safety**: Generic type parameter `<T>` ensures type consistency
- **Runtime validation**: Jackson validates field types and structure during deserialization
- **Error recovery**: `withIgnoreParseErrors()` allows graceful handling of malformed records
- **Schema validation**: Custom schemas can enforce strict field requirements

## Performance Considerations

- **Schema reuse**: Create `CsvReaderFormat` instances once and reuse them across multiple sources
- **POJO optimization**: Simple POJOs with public fields provide better performance than complex beans
- **Error handling**: Enable `withIgnoreParseErrors()` for better throughput when data quality issues are expected
- **File splitting**: Large CSV files are automatically split for parallel processing by Flink's file source