# Apache Flink File Connector

Apache Flink File Connector provides a unified source and sink for reading and writing files in both batch and streaming modes. It supports a range of file formats through the `StreamFormat` and `BulkFormat` interfaces, and adds continuous directory monitoring, file splitting, compression support, and distributed processing across different file systems and object stores.

## Package Information

- **Package Name**: flink-connector-files
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.flink
- **Installation**: `<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>2.1.0</version></dependency>`

## Core Imports

```java
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
```

## Basic Usage

### Reading Files

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create a file source that reads text files line by line and checks
// the input path for new files every 10 seconds (streaming mode)
FileSource<String> source = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("/path/to/input/files"))
    .monitorContinuously(Duration.ofSeconds(10))
    .build();

// Use in the DataStream API
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```

### Writing Files

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

// Create a file sink that writes records as text lines, rolling part
// files once they reach 128 MiB or every 15 minutes
FileSink<String> sink = FileSink
    .forRowFormat(new Path("/path/to/output"), new SimpleStringEncoder<String>("UTF-8"))
    .withRollingPolicy(DefaultRollingPolicy.builder()
        .withMaxPartSize(MemorySize.ofMebiBytes(128))
        .withRolloverInterval(Duration.ofMinutes(15))
        .build())
    .build();

// Use in the DataStream API
stream.sinkTo(sink);
```

## Architecture

The Flink File Connector is built around several key components:

- **File Sources**: `FileSource` provides unified reading with support for both streaming and batch modes
- **File Sinks**: `FileSink` handles writing with exactly-once semantics and file rolling
- **Format Interfaces**: `StreamFormat` for record-by-record reading, `BulkFormat` for batch-oriented reading
- **File Discovery**: `FileEnumerator` implementations discover files and create splits for parallel processing
- **Split Assignment**: `FileSplitAssigner` manages distribution of file splits to reader nodes with locality awareness
- **Compression Support**: Automatic decompression for common formats (`.gz`, `.bz2`, `.xz`, `.deflate`)
- **File Compaction**: Optional compaction system to merge small files for better performance
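
The extension-based decompression mentioned above can be illustrated with stdlib codecs. This is a simplified plain-Java stand-in for the connector's internal codec lookup, not the Flink API; the `ExtensionDecompression` class and its `open` method are hypothetical names:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import java.util.zip.InflaterInputStream;

public class ExtensionDecompression {

    // Pick a decompressing wrapper based on the file extension, mirroring
    // how the connector maps suffixes like .gz and .deflate to codecs
    public static InputStream open(String fileName, InputStream raw) throws IOException {
        if (fileName.endsWith(".gz")) {
            return new GZIPInputStream(raw);
        } else if (fileName.endsWith(".deflate")) {
            return new InflaterInputStream(raw);
        }
        return raw; // no known compression suffix: read bytes as-is
    }

    public static void main(String[] args) throws IOException {
        // Gzip a small payload in memory to stand in for a .gz file on disk
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(buf)) {
            gz.write("hello,world".getBytes(StandardCharsets.UTF_8));
        }
        InputStream in = open("data.csv.gz", new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8)); // prints hello,world
    }
}
```

Because decompressed streams are not seekable, the connector treats compressed files as non-splittable; the same caller-side dispatch shown here is why readers only need to handle plain byte streams.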

## Capabilities

### File Source Operations

Unified file reading with support for various formats, continuous monitoring, and distributed processing.

```java { .api }
public static <T> FileSourceBuilder<T> forRecordStreamFormat(
        final StreamFormat<T> streamFormat, final Path... paths);

public static <T> FileSourceBuilder<T> forBulkFileFormat(
        final BulkFormat<T, FileSourceSplit> bulkFormat, final Path... paths);
```

[File Sources](./file-sources.md)

### File Sink Operations

Unified file writing with exactly-once semantics, bucketing, rolling policies, and optional compaction.

```java { .api }
public static <IN> DefaultRowFormatBuilder<IN> forRowFormat(
        final Path basePath, final Encoder<IN> encoder);

public static <IN> DefaultBulkFormatBuilder<IN> forBulkFormat(
        final Path basePath, final BulkWriter.Factory<IN> bulkWriterFactory);
```

[File Sinks](./file-sinks.md)

### Stream Format Interface

Interface for record-by-record file reading with automatic compression support.

```java { .api }
@PublicEvolving
public interface StreamFormat<T> extends Serializable, ResultTypeQueryable<T> {

    Reader<T> createReader(
            Configuration config, FSDataInputStream stream, long fileLen, long splitEnd)
            throws IOException;

    Reader<T> restoreReader(
            Configuration config,
            FSDataInputStream stream,
            long restoredOffset,
            long fileLen,
            long splitEnd)
            throws IOException;

    boolean isSplittable();

    @Override
    TypeInformation<T> getProducedType();

    @PublicEvolving
    interface Reader<T> extends Closeable {
        @Nullable
        T read() throws IOException;

        @Override
        void close() throws IOException;

        @Nullable
        default CheckpointedPosition getCheckpointedPosition() {
            return null;
        }
    }
}
```

[Stream Formats](./stream-formats.md)
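
To make the reader contract concrete, here is a minimal plain-Java sketch that follows the same conventions: `read()` returns the next record or `null` at end of input, and the reader exposes enough state to resume after a restart. `LineStreamReader` is a hypothetical stand-in, not a Flink class:

```java
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.Reader;

public class LineStreamReader implements Closeable {

    private final BufferedReader in;
    private long recordsEmitted; // simplified checkpoint state

    public LineStreamReader(Reader source) {
        this.in = new BufferedReader(source);
    }

    // Mirrors StreamFormat.Reader#read(): next record, or null at end of input
    public String read() throws IOException {
        String line = in.readLine();
        if (line != null) {
            recordsEmitted++;
        }
        return line;
    }

    // Mirrors getCheckpointedPosition(): enough state to skip already-emitted
    // records when the reader is restored after a failure
    public long recordsEmitted() {
        return recordsEmitted;
    }

    @Override
    public void close() throws IOException {
        in.close();
    }
}
```

A restored reader would replay the file and discard the first `recordsEmitted()` records before emitting new ones, which is exactly the role `restoreReader` plays in the real interface.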

### Bulk Format Interface

Interface for batch-oriented reading optimized for formats like ORC and Parquet.

```java { .api }
@PublicEvolving
public interface BulkFormat<T, SplitT extends FileSourceSplit>
        extends Serializable, ResultTypeQueryable<T> {

    BulkFormat.Reader<T> createReader(Configuration config, SplitT split) throws IOException;

    BulkFormat.Reader<T> restoreReader(Configuration config, SplitT split) throws IOException;

    boolean isSplittable();

    @Override
    TypeInformation<T> getProducedType();

    interface Reader<T> extends Closeable {
        @Nullable
        RecordIterator<T> readBatch() throws IOException;

        @Override
        void close() throws IOException;
    }

    interface RecordIterator<T> {
        @Nullable
        RecordAndPosition<T> next();

        void releaseBatch();
    }
}
```

[Bulk Formats](./bulk-formats.md)
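
The batch-oriented contract can be sketched in plain Java: `readBatch()` hands out a batch or `null` at end of input, and each record carries the position state needed to resume after it, analogous to `RecordAndPosition` (the offset of the batch plus the count of records to skip within it). `BatchedReader` and `Positioned` are hypothetical stand-ins, not Flink classes:

```java
import java.util.ArrayList;
import java.util.List;

public class BatchedReader {

    // Simplified stand-in for RecordAndPosition: a record plus the state
    // needed to resume right after it (batch offset + records to skip)
    public record Positioned<T>(T record, long offset, long recordsToSkip) {}

    private final List<String> records;
    private final int batchSize;
    private int cursor;

    public BatchedReader(List<String> records, int batchSize) {
        this.records = records;
        this.batchSize = batchSize;
    }

    // Mirrors BulkFormat.Reader#readBatch(): a batch of records, or null at end
    public List<Positioned<String>> readBatch() {
        if (cursor >= records.size()) {
            return null;
        }
        List<Positioned<String>> batch = new ArrayList<>();
        long batchOffset = cursor;
        for (int i = 0; i < batchSize && cursor < records.size(); i++, cursor++) {
            batch.add(new Positioned<>(records.get(cursor), batchOffset, i + 1));
        }
        return batch;
    }
}
```

Handing out whole batches lets columnar readers amortize decoding cost, while the per-record position keeps exactly-once recovery possible: on restore, seek to `offset` and skip `recordsToSkip` records.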

### File Discovery and Enumeration

File discovery mechanisms for finding and splitting files across distributed storage systems.

```java { .api }
public interface FileEnumerator {
    Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits) throws IOException;

    interface Provider extends Serializable {
        FileEnumerator create();
    }
}
```

[File Enumeration](./file-enumeration.md)
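
For splittable formats, an enumerator cuts each file into byte ranges that can be read in parallel. A minimal sketch of that range computation, using a hypothetical `Split` record rather than the real `FileSourceSplit`:

```java
import java.util.ArrayList;
import java.util.List;

public class SimpleSplitter {

    // Simplified stand-in for FileSourceSplit: a byte range within one file
    public record Split(String path, long offset, long length) {}

    // Cut a file of the given size into ranges of at most splitSize bytes,
    // similar in spirit to how an enumerator produces splits for parallel readers
    public static List<Split> splitFile(String path, long fileSize, long splitSize) {
        List<Split> splits = new ArrayList<>();
        for (long offset = 0; offset < fileSize; offset += splitSize) {
            splits.add(new Split(path, offset, Math.min(splitSize, fileSize - offset)));
        }
        return splits;
    }
}
```

For example, a 10-byte file with a 4-byte split size yields ranges `(0,4)`, `(4,4)`, `(8,2)`; non-splittable files (e.g. compressed ones) would instead produce a single split covering the whole file.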

### Split Assignment

Split assignment strategies for distributing file processing across nodes with locality awareness.

```java { .api }
public interface FileSplitAssigner {
    Optional<FileSourceSplit> getNext(String hostname);

    void addSplits(Collection<FileSourceSplit> splits);

    Collection<FileSourceSplit> remainingSplits();

    interface Provider extends Serializable {
        FileSplitAssigner create(Collection<FileSourceSplit> splits);
    }
}
```

[Split Assignment](./split-assignment.md)
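
A minimal sketch of locality-aware assignment in the spirit of `getNext(String hostname)`: prefer a split stored on the requesting host, and fall back to any remaining split. `LocalityAwareAssigner` and its `Split` record are hypothetical stand-ins, not the connector's implementation:

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class LocalityAwareAssigner {

    // Simplified stand-in for FileSourceSplit: an id plus preferred hosts
    public record Split(String id, List<String> hosts) {}

    private final Map<String, Deque<Split>> byHost = new HashMap<>();
    private final Deque<Split> remaining = new ArrayDeque<>();
    private final Set<String> assigned = new HashSet<>();

    public void addSplits(Collection<Split> splits) {
        for (Split s : splits) {
            remaining.add(s);
            // Index each split under every host that stores a replica of it
            for (String h : s.hosts()) {
                byHost.computeIfAbsent(h, k -> new ArrayDeque<>()).add(s);
            }
        }
    }

    // Prefer a split local to the requesting host; fall back to any remaining split
    public Optional<Split> getNext(String hostname) {
        Deque<Split> local = byHost.getOrDefault(hostname, new ArrayDeque<>());
        while (!local.isEmpty()) {
            Split s = local.poll();
            if (assigned.add(s.id())) {
                return Optional.of(s);
            }
        }
        while (!remaining.isEmpty()) {
            Split s = remaining.poll();
            if (assigned.add(s.id())) {
                return Optional.of(s);
            }
        }
        return Optional.empty();
    }
}
```

The `assigned` set handles splits indexed under multiple hosts: a split handed out locally on one host is silently skipped when another host's queue reaches it.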

### File Compaction

File compaction system for merging small files to improve performance and reduce metadata overhead.

```java { .api }
public interface FileCompactor {
    void compact(List<Path> inputFiles, Path outputFile) throws Exception;
}

public interface FileCompactStrategy {
    long getSizeThreshold();

    int getNumCheckpointsBeforeCompaction();

    int getNumCompactThreads();
}
```

[File Compaction](./file-compaction.md)
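
For row-oriented formats such as plain text, compaction can be as simple as byte concatenation: the `compact(inputFiles, outputFile)` contract merges many small part files into one. A plain-Java sketch of that idea using `java.nio.file` (`ConcatCompactor` is a hypothetical stand-in, not the connector's compactor):

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class ConcatCompactor {

    // Merge small files by straight byte concatenation, which is valid for
    // row-oriented formats like plain text where records are self-delimiting
    public static void compact(List<Path> inputFiles, Path outputFile) throws IOException {
        try (OutputStream out = Files.newOutputStream(outputFile)) {
            for (Path in : inputFiles) {
                Files.copy(in, out);
            }
        }
    }
}
```

Columnar formats (Parquet, ORC) carry footers and block metadata, so they cannot be concatenated byte-for-byte; compacting them requires record-wise rewriting, which is why the connector exposes `FileCompactor` as an interface rather than a single strategy.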

## Types

### Core Split and Position Types

```java { .api }
public class FileSourceSplit {
    public FileSourceSplit(String id, Path path, long offset, long length,
            long modificationTime, long fileSize, String... hostnames);

    public Path path();
    public long offset();
    public long length();
    public long fileModificationTime();
    public String[] hostnames();
    public CheckpointedPosition getReaderPosition();
    public FileSourceSplit updateWithCheckpointedPosition(CheckpointedPosition position);
}

public class CheckpointedPosition {
    public static final long NO_OFFSET = -1L;

    public CheckpointedPosition(long offset, long recordsAfterOffset);

    public long getOffset();
    public long getRecordsAfterOffset();
}
```

### Configuration Types

```java { .api }
public class ContinuousEnumerationSettings {
    public ContinuousEnumerationSettings(Duration discoveryInterval);

    public Duration getDiscoveryInterval();
}
```