
# Stream Formats

Stream formats provide record-by-record reading interfaces with automatic compression support for various file formats.

## Capabilities

### StreamFormat Interface

Core interface for implementing record-wise file reading with compression support.

```java { .api }
/**
 * A reader format that reads individual records from a stream.
 *
 * The outer class StreamFormat acts mainly as a configuration holder and factory for the
 * reader. The actual reading is done by the Reader, which is created based on an input
 * stream in the createReader method and restored (from checkpointed positions) in the
 * restoreReader method.
 *
 * Compared to the BulkFormat, the stream format handles a few things out-of-the-box,
 * like deciding how to batch records or dealing with compression.
 */
@PublicEvolving
public interface StreamFormat<T> extends Serializable, ResultTypeQueryable<T> {

    /**
     * Creates a new reader to read in this format. This method is called when a fresh reader
     * is created for a split that was assigned from the enumerator. This method may also be
     * called on recovery from a checkpoint, if the reader never stored an offset in the
     * checkpoint.
     *
     * If the format is splittable, then the stream is positioned to the beginning of the
     * file split, otherwise it will be at position zero.
     *
     * The fileLen is the length of the entire file, while splitEnd is the offset of the
     * first byte after the split end boundary (exclusive end boundary). For non-splittable
     * formats, both values are identical.
     */
    Reader<T> createReader(
            Configuration config, FSDataInputStream stream, long fileLen, long splitEnd)
            throws IOException;

    /**
     * Restores a reader from a checkpointed position. This method is called when the reader
     * is recovered from a checkpoint and the reader has previously stored an offset into the
     * checkpoint, by returning from Reader.getCheckpointedPosition() a value with a
     * non-negative offset. That value is supplied as the restoredOffset.
     *
     * If the format is splittable, then the stream is positioned to the beginning of the
     * file split, otherwise it will be at position zero. The stream is NOT positioned to the
     * checkpointed offset, because the format is free to interpret this offset in a
     * different way than the byte offset in the file.
     */
    Reader<T> restoreReader(
            Configuration config,
            FSDataInputStream stream,
            long restoredOffset,
            long fileLen,
            long splitEnd)
            throws IOException;

    /**
     * Checks whether this format is splittable. Splittable formats allow Flink to create
     * multiple splits per file, so that Flink can read multiple regions of the file
     * concurrently.
     */
    boolean isSplittable();

    /**
     * Gets the type produced by this format. This type will be the type produced by the
     * file source as a whole.
     */
    @Override
    TypeInformation<T> getProducedType();

    /**
     * The config option defining how many bytes the I/O thread reads in one fetch
     * operation.
     */
    ConfigOption<MemorySize> FETCH_IO_SIZE =
            ConfigOptions.key("source.file.stream.io-fetch-size")
                    .memoryType()
                    .defaultValue(MemorySize.ofMebiBytes(1L))
                    .withDescription(
                            "The approximate number of bytes per fetch that is passed from the I/O thread to the file reader.");
}
```
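
As an illustration, the fetch size above can be raised where the job's configuration is assembled. This is a sketch, not part of the documented API surface: it assumes the environment's configuration is forwarded to `createReader`, which depends on the deployment.

```java
// Sketch: raising the fetch size from the 1 MiB default to 4 MiB.
// Assumption: the environment configuration reaches StreamFormat.createReader;
// how configuration is forwarded depends on the deployment.
Configuration conf = new Configuration();
conf.set(StreamFormat.FETCH_IO_SIZE, MemorySize.ofMebiBytes(4L));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
```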

### StreamFormat.Reader Interface

Nested interface for reading individual records from a stream.

```java { .api }
/**
 * The actual reader that reads the records.
 */
@PublicEvolving
public interface Reader<T> extends Closeable {

    /**
     * Reads the next record. Returns null when the input has reached its end.
     */
    @Nullable
    T read() throws IOException;

    /**
     * Closes the reader to release all resources.
     */
    @Override
    void close() throws IOException;

    /**
     * Optionally returns the current position of the reader. This can be implemented by
     * readers that want to speed up recovery from a checkpoint.
     *
     * The current position of the reader is the position of the next record that will be
     * returned in a call to read().
     */
    @Nullable
    default CheckpointedPosition getCheckpointedPosition() {
        return null;
    }
}
```
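
The contract above (one record per `read()` call, `null` at end of input, an optional checkpointable position) can be exercised without Flink. A minimal JDK-only sketch, where `LineReaderSketch` and `checkpointedRecords` are illustrative names, not part of the Flink API:

```java
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;

// Illustrative stand-in for StreamFormat.Reader<String>: reads lines, returns
// null at end of input, and tracks a count that could back a CheckpointedPosition.
class LineReaderSketch implements Closeable {
    private final BufferedReader in;
    private long recordsReturned = 0; // candidate "records to skip" on recovery

    LineReaderSketch(java.io.Reader source) {
        this.in = new BufferedReader(source);
    }

    /** Reads the next record; returns null when the input has reached its end. */
    public String read() {
        try {
            String line = in.readLine();
            if (line != null) {
                recordsReturned++;
            }
            return line;
        } catch (IOException e) {
            // Checked exception wrapped to keep the sketch compact; the real
            // interface declares "throws IOException" instead.
            throw new UncheckedIOException(e);
        }
    }

    public long checkpointedRecords() {
        return recordsReturned;
    }

    @Override
    public void close() throws IOException {
        in.close();
    }
}
```

A caller drains it with `while ((record = reader.read()) != null)`, exactly as with the real interface.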

### SimpleStreamFormat

Abstract base class for non-splittable stream formats.

```java { .api }
/**
 * Simplified stream format for non-splittable files.
 */
public abstract class SimpleStreamFormat<T> implements StreamFormat<T> {

    /**
     * Creates a reader for the entire file stream (simplified interface).
     *
     * @param config configuration for the reader
     * @param stream input stream to read from
     * @return reader instance for reading records
     * @throws IOException if reader creation fails
     */
    public abstract Reader<T> createReader(Configuration config, FSDataInputStream stream)
            throws IOException;

    /**
     * Always returns false: simple formats are not splittable.
     */
    @Override
    public final boolean isSplittable() {
        return false;
    }
}
```

### TextLineInputFormat

Built-in implementation for reading text files line by line.

```java { .api }
/**
 * Stream format for reading text files line by line with charset support.
 */
public class TextLineInputFormat extends SimpleStreamFormat<String> {

    /**
     * Creates a TextLineInputFormat with UTF-8 encoding.
     */
    public TextLineInputFormat();

    /**
     * Creates a TextLineInputFormat with the specified charset.
     *
     * @param charsetName name of the charset to use for decoding
     */
    public TextLineInputFormat(String charsetName);

    /**
     * Creates a reader for reading text lines.
     *
     * @param config configuration for the reader
     * @param stream input stream to read from
     * @return reader that returns String lines
     * @throws IOException if reader creation fails
     */
    public Reader<String> createReader(Configuration config, FSDataInputStream stream)
            throws IOException;

    /**
     * Returns the TypeInformation for the String output.
     */
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}
```

**Usage Examples:**

```java
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;

// Reading text files with UTF-8 encoding
FileSource<String> textSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data/logs"))
        .build();

// Reading text files with custom encoding
FileSource<String> customEncodingSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat("ISO-8859-1"), new Path("/data/legacy"))
        .build();

// Using the source in a Flink job
DataStream<String> lines =
        env.fromSource(textSource, WatermarkStrategy.noWatermarks(), "text-source");
```

### Custom StreamFormat Implementation

Example of implementing a custom stream format.

```java { .api }
/**
 * Example custom stream format for reading CSV records.
 */
public class CsvStreamFormat implements StreamFormat<String[]> {
    private final String delimiter;

    public CsvStreamFormat(String delimiter) {
        this.delimiter = delimiter;
    }

    @Override
    public Reader<String[]> createReader(
            Configuration config,
            FSDataInputStream stream,
            long fileLen,
            long splitEnd) throws IOException {
        return new CsvReader(stream, splitEnd, delimiter);
    }

    @Override
    public Reader<String[]> restoreReader(
            Configuration config,
            FSDataInputStream stream,
            long restoredOffset,
            long fileLen,
            long splitEnd) throws IOException {
        // This format interprets the restored offset as a plain byte offset.
        stream.seek(restoredOffset);
        return new CsvReader(stream, splitEnd, delimiter);
    }

    @Override
    public boolean isSplittable() {
        return true; // CSV can be split at line boundaries
    }

    @Override
    public TypeInformation<String[]> getProducedType() {
        return Types.OBJECT_ARRAY(Types.STRING);
    }

    private static class CsvReader implements StreamFormat.Reader<String[]> {
        private final BufferedReader reader;
        private final long splitEnd;
        private final String delimiter;
        private long bytesRead = 0; // position relative to the start of the stream

        CsvReader(FSDataInputStream stream, long splitEnd, String delimiter) {
            this.reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
            this.splitEnd = splitEnd;
            this.delimiter = delimiter;
        }

        @Override
        public String[] read() throws IOException {
            if (bytesRead >= splitEnd) {
                return null;
            }

            String line = reader.readLine();
            if (line == null) {
                return null;
            }

            bytesRead += line.getBytes(StandardCharsets.UTF_8).length + 1; // +1 for the newline
            return line.split(delimiter);
        }

        @Override
        public void close() throws IOException {
            reader.close();
        }
    }
}
```
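
One subtlety the sketch above glosses over: its `bytesRead` bookkeeping only lines up for a split that starts at byte 0. For a splittable text format, the usual convention is that a record belongs to the split containing its first byte, so a reader whose split starts mid-file must first skip ahead to the next line boundary, and may read its last record past `splitEnd`. A JDK-only illustration of that ownership rule (`linesForSplit` is a hypothetical helper, not part of the connector):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class SplitLines {
    /** Returns the lines whose first byte falls inside [splitStart, splitEnd). */
    static List<String> linesForSplit(byte[] file, long splitStart, long splitEnd) {
        List<String> out = new ArrayList<>();
        int pos = 0;
        if (splitStart > 0) {
            // Skip the partial first line; the previous split is responsible for it.
            pos = (int) splitStart;
            while (pos < file.length && file[pos - 1] != '\n') {
                pos++;
            }
        }
        while (pos < file.length && pos < splitEnd) {
            int start = pos;
            while (pos < file.length && file[pos] != '\n') {
                pos++; // a line may extend past splitEnd; this split still owns it
            }
            out.add(new String(file, start, pos - start, StandardCharsets.UTF_8));
            pos++; // step over the newline
        }
        return out;
    }
}
```

Applied to adjacent splits of the same file, every line is produced exactly once.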

### Compression Support Integration

Stream formats automatically support compression through the compression detection system.

```java { .api }
/**
 * Stream formats automatically detect and handle compressed files.
 * Supported extensions: .gz, .gzip, .bz2, .xz, .deflate
 */

// Reading compressed text files - compression is handled automatically
FileSource<String> compressedSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(),
                new Path("/data/logs.gz"),
                new Path("/data/archive.bz2"))
        .build();

// Custom format with compression support
FileSource<String[]> compressedCsvSource = FileSource
        .forRecordStreamFormat(new CsvStreamFormat(","), new Path("/data/data.csv.gz"))
        .build();
```

## Error Handling

Stream formats handle various error conditions during reading:

- **IOException**: file system read errors, stream corruption
- **UnsupportedEncodingException**: invalid charset specifications (raised when the reader is created)
- **EOFException**: unexpected end of file during reading
- **RuntimeException**: format-specific parsing errors

```java
// An invalid charset does not fail at construction; it surfaces when the reader is created
StreamFormat<String> format = new TextLineInputFormat("INVALID-CHARSET");

// Reader error handling - try-with-resources closes the reader even when reads fail
try (StreamFormat.Reader<String> reader =
        format.createReader(config, stream, fileLen, splitEnd)) {
    String record;
    while ((record = reader.read()) != null) {
        // Process record
    }
} catch (UnsupportedEncodingException e) {
    // Handle invalid charset
} catch (IOException e) {
    // Handle read errors
}
```

## Performance Considerations

- Implement `isSplittable()` correctly - splittable formats can be processed in parallel
- Use appropriate buffer sizes in custom readers for optimal I/O performance
- Consider memory usage when reading large records or implementing custom formats
- Compression detection adds minimal overhead and improves storage efficiency
- For high-throughput scenarios, consider BulkFormat instead of StreamFormat
- Implement proper checkpointing support for exactly-once processing guarantees