# File Sinks

File sinks write records to files in both BATCH and STREAMING execution modes, with exactly-once semantics, bucketing, rolling policies, and optional file compaction to reduce the number of small files.

## Capabilities

### FileSink Class

Main entry point for creating file sinks that write to distributed file systems with exactly-once guarantees.

```java { .api }
/**
 * A unified sink that emits its input elements to FileSystem files within buckets. This
 * sink achieves exactly-once semantics for both BATCH and STREAMING.
 *
 * When creating the sink a basePath must be specified. The base directory contains one
 * directory for every bucket. The bucket directories themselves contain several part files, with at
 * least one for each parallel subtask of the sink which is writing data to that bucket. These part
 * files contain the actual output data.
 *
 * The sink uses a BucketAssigner to determine in which bucket directory each element
 * should be written to inside the base directory. The BucketAssigner can, for example, roll
 * on every checkpoint or use time or a property of the element to determine the bucket directory.
 * The default BucketAssigner is a DateTimeBucketAssigner which will create one new
 * bucket every hour.
 */
@Experimental
public class FileSink<IN>
        implements Sink<IN>,
                SupportsWriterState<IN, FileWriterBucketState>,
                SupportsCommitter<FileSinkCommittable>,
                SupportsWriterState.WithCompatibleState,
                SupportsPreCommitTopology<FileSinkCommittable, FileSinkCommittable>,
                SupportsConcurrentExecutionAttempts {

    /**
     * Creates a FileSink for row-wise writing using encoders.
     * The created sink will write each record in a separate line separated by line delimiters.
     */
    public static <IN> DefaultRowFormatBuilder<IN> forRowFormat(
            final Path basePath, final Encoder<IN> encoder);

    /**
     * Creates a FileSink for bulk writing using BulkWriter factories.
     * This is suitable for formats such as Parquet or ORC.
     */
    public static <IN> DefaultBulkFormatBuilder<IN> forBulkFormat(
            final Path basePath, final BulkWriter.Factory<IN> bulkWriterFactory);
}
```
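
To make the hourly bucket layout described in the Javadoc concrete, here is a minimal sketch of how a date-time pattern can turn a timestamp into a bucket directory name. It uses only `java.time` and is not Flink's implementation: the class and method names (`BucketPathSketch`, `bucketId`) are hypothetical, and the pattern `yyyy-MM-dd--HH` is assumed here as an example of an hourly pattern.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Illustrative only: maps a timestamp to an hourly bucket directory name. */
final class BucketPathSketch {

    /** Formats the given epoch milliseconds with the pattern, interpreted in the given zone. */
    static String bucketId(long epochMillis, String pattern, ZoneId zone) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern).withZone(zone);
        return formatter.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        String pattern = "yyyy-MM-dd--HH"; // assumed hourly pattern
        long t0 = 0L;                      // 1970-01-01T00:00:00Z
        long t1 = 59 * 60 * 1000L;         // 00:59, still in the first hour
        long t2 = 60 * 60 * 1000L;         // 01:00, start of the next hour

        // t0 and t1 share a bucket; t2 falls into the next one.
        System.out.println(bucketId(t0, pattern, ZoneOffset.UTC));
        System.out.println(bucketId(t1, pattern, ZoneOffset.UTC));
        System.out.println(bucketId(t2, pattern, ZoneOffset.UTC));
    }
}
```

Elements whose timestamps fall in the same hour map to the same directory name, which is why an hourly pattern yields one new bucket per hour. A pattern containing `/` would produce nested directories instead.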

### FileSink.DefaultRowFormatBuilder

Builder for configuring row-format file sinks with encoders.

```java { .api }
/**
 * Builder for row-format file sinks
 */
public static class DefaultRowFormatBuilder<IN> {
    /**
     * Sets custom bucketing strategy for organizing output files
     * @param bucketAssigner Strategy for assigning records to buckets
     * @return Builder instance for chaining
     */
    public DefaultRowFormatBuilder<IN> withBucketAssigner(
        BucketAssigner<IN, String> bucketAssigner);

    /**
     * Sets rolling policy for when to create new files
     * @param rollingPolicy Policy controlling file rolling behavior
     * @return Builder instance for chaining
     */
    public DefaultRowFormatBuilder<IN> withRollingPolicy(
        RollingPolicy<IN, String> rollingPolicy);

    /**
     * Sets output file configuration for naming and format
     * @param outputFileConfig Configuration for output file properties
     * @return Builder instance for chaining
     */
    public DefaultRowFormatBuilder<IN> withOutputFileConfig(
        OutputFileConfig outputFileConfig);

    /**
     * Enables file compaction to merge small files
     * @param compactStrategy Strategy for triggering compaction
     * @param compactor Implementation for compacting files
     * @return Builder instance for chaining
     */
    public DefaultRowFormatBuilder<IN> enableCompact(
        FileCompactStrategy compactStrategy, FileCompactor compactor);

    /**
     * Builds the final FileSink instance
     * @return Configured FileSink
     */
    public FileSink<IN> build();
}
```

### FileSink.DefaultBulkFormatBuilder

Builder for configuring bulk-format file sinks.

```java { .api }
/**
 * Builder for bulk-format file sinks
 */
public static class DefaultBulkFormatBuilder<IN> {
    /**
     * Sets custom bucketing strategy for organizing output files
     * @param bucketAssigner Strategy for assigning records to buckets
     * @return Builder instance for chaining
     */
    public DefaultBulkFormatBuilder<IN> withBucketAssigner(
        BucketAssigner<IN, String> bucketAssigner);

    /**
     * Sets rolling policy for when to create new files.
     * Bulk formats roll on checkpoints, so the policy must be a CheckpointRollingPolicy.
     * @param rollingPolicy Policy controlling file rolling behavior
     * @return Builder instance for chaining
     */
    public DefaultBulkFormatBuilder<IN> withRollingPolicy(
        CheckpointRollingPolicy<IN, String> rollingPolicy);

    /**
     * Sets output file configuration for naming and format
     * @param outputFileConfig Configuration for output file properties
     * @return Builder instance for chaining
     */
    public DefaultBulkFormatBuilder<IN> withOutputFileConfig(
        OutputFileConfig outputFileConfig);

    /**
     * Enables file compaction to merge small files
     * @param compactStrategy Strategy for triggering compaction
     * @param compactor Implementation for compacting files
     * @return Builder instance for chaining
     */
    public DefaultBulkFormatBuilder<IN> enableCompact(
        FileCompactStrategy compactStrategy, FileCompactor compactor);

    /**
     * Disables writing to local file system for HDFS compatibility
     * @return Builder instance for chaining
     */
    public DefaultBulkFormatBuilder<IN> disableLocalWriting();

    /**
     * Builds the final FileSink instance
     * @return Configured FileSink
     */
    public FileSink<IN> build();
}
```

**Usage Examples:**

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.compactor.ConcatFileCompactor;
import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.time.Duration;

// Basic file sink for text output
FileSink<String> basicSink = FileSink
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
    .build();

// Sink with rolling policy and bucketing
FileSink<String> advancedSink = FileSink
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
    .withRollingPolicy(DefaultRollingPolicy.builder()
        .withMaxPartSize(MemorySize.ofMebiBytes(128))   // roll when a part file reaches 128 MiB
        .withRolloverInterval(Duration.ofMinutes(15))   // roll when a part file is 15 minutes old
        .withInactivityInterval(Duration.ofMinutes(5))  // roll after 5 minutes without writes
        .build())
    .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd/HH"))
    .build();

// Sink with file compaction
FileSink<String> compactingSink = FileSink
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
    .enableCompact(
        FileCompactStrategy.Builder.newBuilder()
            .setSizeThreshold(MemorySize.ofMebiBytes(64).getBytes())
            .enableCompactionOnCheckpoint(3)
            .build(),
        new ConcatFileCompactor())
    .build();

// Use with DataStream API (`stream` is an existing DataStream<String>)
stream.sinkTo(advancedSink);
```
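
The three thresholds configured on the rolling policy above can be read as independent triggers: a part file is closed once any one of them is reached. The following is a simplified model of that decision in plain Java, assuming this "any threshold" reading. It is a sketch rather than Flink's `DefaultRollingPolicy`, and the class and method names are hypothetical.

```java
import java.time.Duration;

/** Illustrative model of a size- and time-based rolling decision; not Flink's implementation. */
final class RollingDecisionSketch {
    private final long maxPartSizeBytes;
    private final Duration rolloverInterval;
    private final Duration inactivityInterval;

    RollingDecisionSketch(long maxPartSizeBytes, Duration rolloverInterval, Duration inactivityInterval) {
        this.maxPartSizeBytes = maxPartSizeBytes;
        this.rolloverInterval = rolloverInterval;
        this.inactivityInterval = inactivityInterval;
    }

    /** Returns true if the in-progress part file should be closed and a new one started. */
    boolean shouldRoll(long partSizeBytes, long createdAtMillis, long lastWriteMillis, long nowMillis) {
        boolean tooLarge = partSizeBytes >= maxPartSizeBytes;
        boolean tooOld = nowMillis - createdAtMillis >= rolloverInterval.toMillis();
        boolean idle = nowMillis - lastWriteMillis >= inactivityInterval.toMillis();
        return tooLarge || tooOld || idle;
    }

    public static void main(String[] args) {
        // Same thresholds as the example: 128 MiB, 15 minutes, 5 minutes.
        RollingDecisionSketch policy = new RollingDecisionSketch(
                128L * 1024 * 1024, Duration.ofMinutes(15), Duration.ofMinutes(5));

        long minute = 60_000L;
        System.out.println(policy.shouldRoll(1024, 0, 0, minute));                   // small, young, active
        System.out.println(policy.shouldRoll(1024, 0, 14 * minute, 15 * minute));    // reached rollover age
        System.out.println(policy.shouldRoll(1024, 0, 0, 5 * minute));               // idle for 5 minutes
    }
}
```

Smaller thresholds make data visible sooner but produce more, smaller files; larger thresholds do the opposite.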

### File Writers and Buckets

Internal components for managing file writing operations.

```java { .api }
/**
 * Writer that manages file buckets and handles the writing process
 */
public class FileWriter<IN> implements SinkWriter<IN> {
    // Internal implementation - not directly used by applications
}

/**
 * Factory for creating FileWriterBucket instances
 */
public interface FileWriterBucketFactory<IN> {
    FileWriterBucket<IN> getWriterBucket(String bucketId) throws IOException;
}

/**
 * Default implementation of FileWriterBucketFactory
 */
public class DefaultFileWriterBucketFactory<IN> implements FileWriterBucketFactory<IN> {
    public DefaultFileWriterBucketFactory(
        Path basePath,
        Encoder<IN> encoder,
        RollingPolicy<IN, String> rollingPolicy,
        OutputFileConfig outputFileConfig);
}
```

### Committable Types

Types representing committable file operations for exactly-once semantics.

```java { .api }
/**
 * Represents a committable file operation
 */
public class FileSinkCommittable {
    public FileSinkCommittable(String bucketId, Path path, long creationTime);
    public String getBucketId();
    public Path getPath();
    public long getCreationTime();
}

/**
 * Serializer for FileSinkCommittable instances
 */
public class FileSinkCommittableSerializer implements SimpleVersionedSerializer<FileSinkCommittable> {
    public int getVersion();
    public byte[] serialize(FileSinkCommittable committable) throws IOException;
    public FileSinkCommittable deserialize(int version, byte[] serialized) throws IOException;
}
```
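
Exactly-once file output generally depends on separating writing from publishing: data goes to a file that readers ignore, and a later commit step makes it visible. The sketch below illustrates that general write-then-commit idea on a local file system with `java.nio.file`. It does not show how `FileSinkCommittable` is implemented. All names (`PendingFileCommitSketch`, `writePending`, `commit`, the `.inprogress` suffix) are hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Illustrative write-then-commit pattern on a local file system; not Flink's implementation. */
final class PendingFileCommitSketch {

    /** Writes data to a hidden in-progress file and returns its path, which acts as the "committable". */
    static Path writePending(Path bucketDir, String partName, String data) throws IOException {
        Files.createDirectories(bucketDir);
        Path pending = bucketDir.resolve("." + partName + ".inprogress");
        Files.write(pending, data.getBytes(StandardCharsets.UTF_8));
        return pending;
    }

    /** Publishes the pending file under its final name; calling it again after success is a no-op. */
    static Path commit(Path pending, String partName) throws IOException {
        Path target = pending.resolveSibling(partName);
        if (Files.exists(pending)) {
            Files.move(pending, target, StandardCopyOption.ATOMIC_MOVE);
        }
        return target;
    }

    public static void main(String[] args) throws IOException {
        Path bucketDir = Files.createTempDirectory("sink-sketch").resolve("bucket-0");
        Path pending = writePending(bucketDir, "part-0", "hello\n");
        Path committed = commit(pending, "part-0");
        commit(pending, "part-0"); // retrying the commit does not fail or duplicate data
        System.out.println(Files.exists(committed));
    }
}
```

Making the commit step safe to repeat matters because a commit may be retried after a failure and must not publish the same data twice.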

**Advanced Configuration Examples:**

```java
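import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// Custom output file configuration
OutputFileConfig fileConfig = OutputFileConfig.builder()
    .withPartPrefix("data-")
    .withPartSuffix(".txt")
    .build();

// Custom bucket assigner for organizing by key.
// MyRecord and MyRecordEncoder are user-defined types.
BucketAssigner<MyRecord, String> keyBucketAssigner = new BucketAssigner<MyRecord, String>() {
    @Override
    public String getBucketId(MyRecord record, Context context) {
        // floorMod keeps the index in [0, 10) even when hashCode() is negative
        return "bucket-" + Math.floorMod(record.getKey().hashCode(), 10);
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
};

FileSink<MyRecord> customSink = FileSink
    .forRowFormat(new Path("/output"), new MyRecordEncoder())
    .withBucketAssigner(keyBucketAssigner)
    .withOutputFileConfig(fileConfig)
    .build();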
```
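
When a bucket ID is derived from a hash code, note that Java's `%` operator keeps the sign of its left operand, so a negative hash code yields a negative remainder and a bucket name such as `bucket--7`. The short, self-contained sketch below contrasts `%` with `Math.floorMod`; the class and method names are hypothetical.

```java
/** Illustrative: deriving a non-negative bucket index from a hash code. */
final class HashBucketSketch {

    /** Maps any int hash to an index in [0, numBuckets). */
    static int bucketIndex(int hash, int numBuckets) {
        return Math.floorMod(hash, numBuckets);
    }

    /** Builds a bucket directory name from a key's hash code. */
    static String bucketId(String key, int numBuckets) {
        return "bucket-" + bucketIndex(key.hashCode(), numBuckets);
    }

    public static void main(String[] args) {
        int negativeHash = -7;
        System.out.println(negativeHash % 10);              // remainder keeps the sign: -7
        System.out.println(bucketIndex(negativeHash, 10));  // floorMod wraps around: 3
        System.out.println(bucketId("customer-42", 10));
    }
}
```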

## Error Handling

File sinks can surface the following error conditions during configuration and writing:

- **IOException**: File system write errors, disk full conditions
- **IllegalArgumentException**: Invalid configuration or paths
- **RuntimeException**: Encoding errors, compaction failures

```java
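try {
    FileSink<String> sink = FileSink
        .forRowFormat(new Path("/invalid/output"), new SimpleStringEncoder<>("UTF-8"))
        .build();
    stream.sinkTo(sink);

    // Write failures surface only once the job runs. `env` is assumed to be
    // the StreamExecutionEnvironment that `stream` was created from.
    env.execute("file-sink-job");
} catch (Exception e) {
    // Handle sink configuration or runtime errors
}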
```

## Performance Considerations

- Use appropriate rolling policies to balance file size and number of files
- Configure bucketing to distribute load and improve query performance
- Enable compaction for workloads that produce many small files
- Consider bulk formats for high-throughput scenarios
- Monitor checkpoint intervals to balance consistency and performance
- Use `disableLocalWriting()` for HDFS deployments to avoid local filesystem usage
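
To reason about the trade-off between file size and file count, a rough lower bound on the number of part files can help. The sketch below assumes that every parallel subtask writes continuously to every active bucket and that files roll only on the time interval; size-based or inactivity-based rolling would add more files. The class and method names are hypothetical.

```java
import java.time.Duration;

/** Illustrative estimate of how many part files a sink produces per hour. */
final class FileCountEstimateSketch {

    /** Lower bound on part files per hour under the assumptions stated above. */
    static long minFilesPerHour(int parallelism, int activeBuckets, Duration rolloverInterval) {
        long rollMillis = rolloverInterval.toMillis();
        if (rollMillis <= 0) {
            throw new IllegalArgumentException("rolloverInterval must be positive");
        }
        long hourMillis = Duration.ofHours(1).toMillis();
        // Ceiling division: a file that is still open at the end of the hour counts as one file.
        long rollsPerHour = (hourMillis + rollMillis - 1) / rollMillis;
        return (long) parallelism * activeBuckets * rollsPerHour;
    }

    public static void main(String[] args) {
        // 8 subtasks, 1 active bucket, 15-minute rollover
        System.out.println(minFilesPerHour(8, 1, Duration.ofMinutes(15)));
        // Same job with a 60-minute rollover
        System.out.println(minFilesPerHour(8, 1, Duration.ofMinutes(60)));
    }
}
```

If the estimate is far higher than downstream readers handle well, lengthening the rollover interval, reducing sink parallelism, or enabling compaction are the usual levers.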