Apache Flink file connector library for unified file processing in both batch and streaming modes with support for various formats, compression, and distributed processing capabilities.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-files@2.1.0
# Apache Flink File Connector

Apache Flink File Connector provides a unified data source and sink for reading and writing files in both batch and streaming modes. It supports a range of file formats through the `StreamFormat` and `BulkFormat` interfaces, along with continuous directory monitoring, file splitting, compression support, and distributed processing across different file systems and object stores.

## Package Information

- **Package Name**: flink-connector-files
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.flink
- **Installation**: `<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>2.1.0</version></dependency>`
## Core Imports

```java
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
```
## Basic Usage

### Reading Files

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create a file source for reading text files, checking for new files every 10 seconds.
// Omit monitorContinuously() to read the existing files once as a bounded (batch) source.
FileSource<String> source = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("/path/to/input/files"))
    .monitorContinuously(Duration.ofSeconds(10))
    .build();

// Use in the DataStream API
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
### Writing Files

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

// Create a file sink for writing text files, rolling part files at 128 MiB or every 15 minutes
FileSink<String> sink = FileSink
    .forRowFormat(new Path("/path/to/output"), new SimpleStringEncoder<String>("UTF-8"))
    .withRollingPolicy(DefaultRollingPolicy.builder()
        .withMaxPartSize(MemorySize.ofMebiBytes(128))
        .withRolloverInterval(Duration.ofMinutes(15))
        .build())
    .build();

// Use in the DataStream API
stream.sinkTo(sink);
```
## Architecture

The Flink File Connector is built around several key components:

- **File Sources**: `FileSource` provides unified reading with support for both streaming and batch modes
- **File Sinks**: `FileSink` handles writing with exactly-once semantics and file rolling
- **Format Interfaces**: `StreamFormat` for record-by-record reading, `BulkFormat` for batch-oriented reading
- **File Discovery**: `FileEnumerator` implementations discover files and create splits for parallel processing
- **Split Assignment**: `FileSplitAssigner` manages distribution of file splits to reader nodes with locality awareness
- **Compression Support**: Automatic decompression for common formats (.gz, .bz2, .xz, .deflate)
- **File Compaction**: Optional compaction system to merge small files for better performance
## Capabilities

### File Source Operations

Unified file reading with support for various formats, continuous monitoring, and distributed processing.

```java { .api }
public static <T> FileSourceBuilder<T> forRecordStreamFormat(
        final StreamFormat<T> streamFormat, final Path... paths);

public static <T> FileSourceBuilder<T> forBulkFileFormat(
        final BulkFormat<T, FileSourceSplit> bulkFormat, final Path... paths);
```

[File Sources](./file-sources.md)
### File Sink Operations

Unified file writing with exactly-once semantics, bucketing, rolling policies, and optional compaction.

```java { .api }
public static <IN> DefaultRowFormatBuilder<IN> forRowFormat(
        final Path basePath, final Encoder<IN> encoder);

public static <IN> DefaultBulkFormatBuilder<IN> forBulkFormat(
        final Path basePath, final BulkWriter.Factory<IN> bulkWriterFactory);
```

[File Sinks](./file-sinks.md)
### Stream Format Interface

Interface for record-by-record file reading with automatic compression support.

```java { .api }
@PublicEvolving
public interface StreamFormat<T> extends Serializable, ResultTypeQueryable<T> {

    Reader<T> createReader(
            Configuration config, FSDataInputStream stream, long fileLen, long splitEnd)
            throws IOException;

    Reader<T> restoreReader(
            Configuration config,
            FSDataInputStream stream,
            long restoredOffset,
            long fileLen,
            long splitEnd)
            throws IOException;

    boolean isSplittable();

    @Override
    TypeInformation<T> getProducedType();

    @PublicEvolving
    interface Reader<T> extends Closeable {

        @Nullable
        T read() throws IOException;

        @Override
        void close() throws IOException;

        @Nullable
        default CheckpointedPosition getCheckpointedPosition() {
            return null;
        }
    }
}
```

[Stream Formats](./stream-formats.md)
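The `Reader` contract above can be modeled without Flink at all. The sketch below is illustrative only, built on plain `java.io` (the class and method names here are hypothetical, not part of the connector): `read()` returns one record at a time and `null` at end of input, while a counter tracks how many records a restored reader would need to skip.

```java
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;

// Illustrative sketch of the StreamFormat.Reader contract (hypothetical names,
// no Flink classes): one record per read(), null when exhausted, and a
// checkpoint-style count of records emitted so far.
class LineRecordReader implements Closeable {
    private final BufferedReader in;
    private long recordsRead = 0;

    LineRecordReader(Reader source) {
        this.in = new BufferedReader(source);
    }

    // Mirrors StreamFormat.Reader#read(): next record, or null at end of input.
    String read() throws IOException {
        String line = in.readLine();
        if (line != null) {
            recordsRead++;
        }
        return line;
    }

    // Mirrors getCheckpointedPosition(): how many already-emitted records a
    // restored reader would skip before resuming.
    long recordsAfterStart() {
        return recordsRead;
    }

    @Override
    public void close() throws IOException {
        in.close();
    }
}
```

A real `StreamFormat` additionally receives the `Configuration`, the file stream, and split boundaries, and may support splitting; the null-at-end and checkpoint-counting behavior is the part carried over here.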
### Bulk Format Interface

Interface for batch-oriented reading optimized for formats like ORC and Parquet.

```java { .api }
@PublicEvolving
public interface BulkFormat<T, SplitT extends FileSourceSplit>
        extends Serializable, ResultTypeQueryable<T> {

    BulkFormat.Reader<T> createReader(Configuration config, SplitT split) throws IOException;

    BulkFormat.Reader<T> restoreReader(Configuration config, SplitT split) throws IOException;

    boolean isSplittable();

    @Override
    TypeInformation<T> getProducedType();

    interface Reader<T> extends Closeable {

        @Nullable
        RecordIterator<T> readBatch() throws IOException;

        @Override
        void close() throws IOException;
    }

    interface RecordIterator<T> {

        @Nullable
        RecordAndPosition<T> next();

        void releaseBatch();
    }
}
```

[Bulk Formats](./bulk-formats.md)
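The key difference from `StreamFormat` is the reading loop: `readBatch()` hands the caller a whole batch of records, which is drained before the next batch is fetched. The following self-contained sketch (hypothetical names, no Flink classes) models that loop shape.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Illustrative sketch of the BulkFormat reading loop (hypothetical names):
// the reader returns whole batches, null when no batches remain.
class BatchedReader<T> {
    private final Iterator<List<T>> batches;

    BatchedReader(List<List<T>> allBatches) {
        this.batches = allBatches.iterator();
    }

    // Mirrors BulkFormat.Reader#readBatch(): next batch, or null when done.
    List<T> readBatch() {
        return batches.hasNext() ? batches.next() : null;
    }

    // Drains all batches, as a reader task would, counting records seen.
    static <T> int drain(BatchedReader<T> reader) {
        int count = 0;
        List<T> batch;
        while ((batch = reader.readBatch()) != null) {
            count += batch.size();
        }
        return count;
    }
}
```

Batch-at-a-time hand-off is what makes this shape a good fit for columnar formats like ORC and Parquet, which decode many records per I/O operation; `releaseBatch()` in the real interface additionally lets the format recycle batch memory.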
### File Discovery and Enumeration

File discovery mechanisms for finding and splitting files across distributed storage systems.

```java { .api }
public interface FileEnumerator {

    Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits) throws IOException;

    interface Provider extends Serializable {
        FileEnumerator create();
    }
}
```

[File Enumeration](./file-enumeration.md)
### Split Assignment

Split assignment strategies for distributing file processing across nodes with locality awareness.

```java { .api }
public interface FileSplitAssigner {

    Optional<FileSourceSplit> getNext(String hostname);

    void addSplits(Collection<FileSourceSplit> splits);

    Collection<FileSourceSplit> remainingSplits();

    interface Provider extends Serializable {
        FileSplitAssigner create(Collection<FileSourceSplit> splits);
    }
}
```

[Split Assignment](./split-assignment.md)
### File Compaction

File compaction system for merging small files to improve performance and reduce metadata overhead.

```java { .api }
public interface FileCompactor {
    void compact(List<Path> inputFiles, Path outputFile) throws Exception;
}

public interface FileCompactStrategy {
    long getSizeThreshold();
    int getNumCheckpointsBeforeCompaction();
    int getNumCompactThreads();
}
```

[File Compaction](./file-compaction.md)
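The simplest compaction strategy is byte-wise concatenation of small part files into one output file, which is valid for formats whose records are self-delimiting (e.g. line-based text). The standalone sketch below (hypothetical name, `java.nio.file` rather than Flink's `Path`) illustrates that; columnar formats like Parquet instead need a format-aware rewrite.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Illustrative sketch of concatenation-style compaction: append each small
// input file's bytes to a single output file in order.
class ConcatCompactor {
    static void compact(List<Path> inputFiles, Path outputFile) throws IOException {
        for (Path input : inputFiles) {
            Files.write(outputFile, Files.readAllBytes(input),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        }
    }
}
```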
## Types

### Core Split and Position Types

```java { .api }
public class FileSourceSplit {
    public FileSourceSplit(String id, Path path, long offset, long length,
            long modificationTime, long fileSize, String... hostnames);
    public Path path();
    public long offset();
    public long length();
    public long fileModificationTime();
    public String[] hostnames();
    public Optional<CheckpointedPosition> getReaderPosition();
    public FileSourceSplit updateWithCheckpointedPosition(CheckpointedPosition position);
}

public class CheckpointedPosition {
    public static final long NO_OFFSET = -1L;

    public CheckpointedPosition(long offset, long recordsAfterOffset);
    public long getOffset();
    public long getRecordsAfterOffset();
}
```
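A `CheckpointedPosition` combines a byte offset with a record count: on recovery, the reader seeks to the offset (or the split start when the offset is `NO_OFFSET`) and then skips `recordsAfterOffset` already-emitted records. The standalone sketch below (hypothetical names) models just the record-skipping half of that restore logic.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative sketch of checkpointed-position recovery: after seeking to the
// checkpointed offset, skip the records that were already emitted before the
// checkpoint so no record is duplicated downstream.
class PositionRestore {
    static <T> List<T> resume(List<T> recordsFromOffset, long recordsAfterOffset) {
        int skip = (int) Math.min(recordsAfterOffset, recordsFromOffset.size());
        return recordsFromOffset.subList(skip, recordsFromOffset.size());
    }
}
```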
### Configuration Types

```java { .api }
public class ContinuousEnumerationSettings {
    public ContinuousEnumerationSettings(Duration discoveryInterval);
    public Duration getDiscoveryInterval();
}
```