# File Writers

File writers handle the actual writing of data to files, supporting both row-wise and bulk writing patterns. They provide abstractions for file recovery, commit operations, and different serialization strategies.

## Capabilities

### BucketWriter Interface

Factory interface for creating different types of file writers.

```java { .api }
/**
 * Interface for factories that create different InProgressFileWriter writers
 * @param <IN> The type of input elements
 * @param <BucketID> The type of bucket identifier
 */
public interface BucketWriter<IN, BucketID> {
    /**
     * Creates a new InProgressFileWriter
     * @param bucketID the id of the bucket this writer is writing to
     * @param path the path this writer will write to
     * @param creationTime the creation time of the file
     * @return the new InProgressFileWriter
     * @throws IOException if creating a writer fails
     */
    InProgressFileWriter<IN, BucketID> openNewInProgressFile(
        BucketID bucketID, Path path, long creationTime) throws IOException;

    /**
     * Creates a new CompactingFileWriter of the requested type
     * @param type the type of writer (RECORD_WISE or OUTPUT_STREAM)
     * @param bucketID the id of the bucket this writer is writing to
     * @param path the path this writer will write to
     * @param creationTime the creation time of the file
     * @return the new CompactingFileWriter
     * @throws IOException if creating a writer fails
     * @throws UnsupportedOperationException if the type is not supported
     */
    default CompactingFileWriter openNewCompactingFile(
        CompactingFileWriter.Type type,
        BucketID bucketID,
        Path path,
        long creationTime) throws IOException;

    /**
     * Resumes an InProgressFileWriter from a recoverable state
     * @param bucketID the id of the bucket this writer is writing to
     * @param inProgressFileSnapshot the state of the part file
     * @param creationTime the creation time of the file
     * @return the resumed InProgressFileWriter
     * @throws IOException if resuming a writer fails
     */
    InProgressFileWriter<IN, BucketID> resumeInProgressFileFrom(
        BucketID bucketID,
        InProgressFileWriter.InProgressFileRecoverable inProgressFileSnapshot,
        long creationTime) throws IOException;

    /**
     * @return the properties of the BucketWriter
     */
    WriterProperties getProperties();

    /**
     * Recovers a pending file for finalizing and committing
     * @param pendingFileRecoverable The handle with the recovery information
     * @return A pending file
     * @throws IOException if recovering a pending file fails
     */
    PendingFile recoverPendingFile(
        InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable) throws IOException;

    /**
     * Cleans up resources for a recoverable state
     * @param inProgressFileRecoverable the recoverable state to clean up
     * @return true if the resources were successfully freed, false otherwise
     * @throws IOException if an I/O error occurs
     */
    boolean cleanupInProgressFileRecoverable(
        InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable) throws IOException;
}
```

### BucketWriter.PendingFile Interface

Represents a file that is ready to be committed.

```java { .api }
/**
 * Represents a file that can no longer be written to but can be committed
 */
public interface PendingFile {
    /**
     * Commits the pending file, making it visible.
     * The file will contain exactly the data it held when the pending file was created.
     * @throws IOException if committing fails
     */
    void commit() throws IOException;

    /**
     * Commits the pending file, making it visible.
     * This method tolerates situations where the file was already committed,
     * which is important for idempotent commit retries after recovery.
     * @throws IOException if committing fails
     */
    void commitAfterRecovery() throws IOException;
}
```
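
The difference between `commit()` and `commitAfterRecovery()` is easiest to see on a local file system. The following is a minimal sketch only, not the library's implementation: `LocalPendingFile` is a hypothetical stand-in class, it uses `java.nio.file.Path` rather than Flink's `Path`, and it assumes that committing means atomically renaming a temporary file to its final name.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical stand-in for a pending file, backed by the local file system. */
final class LocalPendingFile {
    private final Path tempFile;   // in-progress file, not yet visible to readers
    private final Path targetFile; // final, visible file

    LocalPendingFile(Path tempFile, Path targetFile) {
        this.tempFile = tempFile;
        this.targetFile = targetFile;
    }

    /** Strict commit: throws if the temporary file no longer exists. */
    void commit() throws IOException {
        Files.move(tempFile, targetFile, StandardCopyOption.ATOMIC_MOVE);
    }

    /** Tolerant commit: succeeds if an earlier attempt already published the file. */
    void commitAfterRecovery() throws IOException {
        if (Files.exists(tempFile)) {
            commit();
        } else if (!Files.exists(targetFile)) {
            throw new IOException(
                "Neither " + tempFile + " nor " + targetFile + " exists");
        }
        // Otherwise the file was already committed; nothing left to do.
    }
}
```

Calling `commitAfterRecovery()` twice leaves the target file unchanged, whereas a second `commit()` fails because the temporary file is gone. That is the property a retry after restore relies on.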

### InProgressFileWriter Interface

Interface for writing elements to part files with recovery support.

```java { .api }
/**
 * The Bucket uses the InProgressFileWriter to write elements to a part file
 * @param <IN> The type of input elements
 * @param <BucketID> The type of bucket identifier
 */
public interface InProgressFileWriter<IN, BucketID>
    extends PartFileInfo<BucketID>, RecordWiseCompactingFileWriter<IN> {

    /**
     * Write an element to the part file
     * @param element the element to be written
     * @param currentTime the writing time
     * @throws IOException if writing the element fails
     */
    void write(IN element, long currentTime) throws IOException;

    /**
     * @return The state of the current part file for recovery
     * @throws IOException if persisting the part file fails
     */
    InProgressFileRecoverable persist() throws IOException;

    /**
     * @return The state of the pending part file for commit
     * @throws IOException if an I/O error occurs
     */
    PendingFileRecoverable closeForCommit() throws IOException;

    /** Dispose the part file and clean up resources */
    void dispose();

    /**
     * Default write method using current system time
     * @param element the element to write
     */
    default void write(IN element) throws IOException;
}
```

### InProgressFileWriter Recovery Interfaces

Support for fault-tolerant recovery of in-progress files.

```java { .api }
/**
 * Handle that can be used to recover in-progress files
 */
public interface InProgressFileRecoverable extends PendingFileRecoverable {}

/**
 * Handle that can be used to recover pending files
 */
public interface PendingFileRecoverable {
    /**
     * @return The target path of the pending file, null if unavailable
     */
    Path getPath();

    /**
     * @return The size of the pending file, -1 if unavailable
     */
    long getSize();
}
```
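
Because `getPath()` may return `null` and `getSize()` may return `-1`, callers need to check both sentinel values before using them. The sketch below illustrates one way to do that. `RecoverableInfoSketch` and `RecoverableDescriber` are hypothetical names introduced for illustration, and the path is modeled as a plain `String` so the example has no Flink dependency.

```java
/** Hypothetical stand-in mirroring the two accessors of PendingFileRecoverable. */
interface RecoverableInfoSketch {
    String getPath(); // null if unavailable
    long getSize();   // -1 if unavailable
}

final class RecoverableDescriber {
    /** Builds a log-friendly description that tolerates missing path and size. */
    static String describe(RecoverableInfoSketch recoverable) {
        String path = recoverable.getPath() == null
            ? "<unknown path>"
            : recoverable.getPath();
        String size = recoverable.getSize() < 0
            ? "unknown size"
            : recoverable.getSize() + " bytes";
        return path + " (" + size + ")";
    }
}
```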

### CompactingFileWriter Interface

Base interface for compacting file writers.

```java { .api }
/**
 * File sink compactors use CompactingFileWriter to write a compacting file
 * Classes should implement RecordWiseCompactingFileWriter or OutputStreamBasedCompactingFileWriter
 */
public interface CompactingFileWriter {
    /**
     * Closes the writer and gets the PendingFileRecoverable of the written compacting file
     * @return The state of the pending part file for commit
     * @throws IOException if an I/O error occurs
     */
    PendingFileRecoverable closeForCommit() throws IOException;

    /** Enum defining the types of CompactingFileWriter */
    enum Type {
        RECORD_WISE,
        OUTPUT_STREAM
    }
}
```

### RecordWiseCompactingFileWriter Interface

Interface for record-wise compacting file writers.

```java { .api }
/**
 * Compactors use RecordWiseCompactingFileWriter to write elements to a compacting file
 * @param <IN> The type of input elements
 */
public interface RecordWiseCompactingFileWriter<IN> extends CompactingFileWriter {
    /**
     * Write an element to the compacting file
     * @param element the element to be written
     * @throws IOException if writing the element fails
     */
    void write(IN element) throws IOException;
}
```

### OutputStreamBasedCompactingFileWriter Interface

Interface for output stream based compacting file writers.

```java { .api }
/**
 * Compactors use OutputStreamBasedCompactingFileWriter to directly write
 * a compacting file as an OutputStream
 */
public interface OutputStreamBasedCompactingFileWriter extends CompactingFileWriter {
    /**
     * Gets the output stream underlying the writer
     * The close method of the returned stream should never be called
     * @return The output stream to write the compacting file
     * @throws IOException if acquiring the stream fails
     */
    OutputStream asOutputStream() throws IOException;
}
```
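
The rule that the returned stream must never be closed affects how a compactor copies data. The sketch below shows one way to honor it, under stated assumptions: `StreamWriterSketch` is a hypothetical stand-in that exposes only `asOutputStream()`, and `ConcatCompactorSketch` is an illustrative helper, not a class from the library.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;

/** Hypothetical stand-in exposing only the method used in this sketch. */
interface StreamWriterSketch {
    OutputStream asOutputStream() throws IOException;
}

final class ConcatCompactorSketch {
    /** Appends every input to the writer's stream and returns the bytes copied. */
    static long concat(List<InputStream> inputs, StreamWriterSketch writer)
            throws IOException {
        // The stream belongs to the writer, so it is deliberately kept out of
        // try-with-resources and never closed here.
        OutputStream out = writer.asOutputStream();
        long copied = 0;
        byte[] buffer = new byte[8192];
        for (InputStream input : inputs) {
            try (InputStream in = input) { // the inputs are ours to close
                int n;
                while ((n = in.read(buffer)) != -1) {
                    out.write(buffer, 0, n);
                    copied += n;
                }
            }
        }
        out.flush();
        return copied;
    }
}
```

After copying, the caller would finish the file through the writer's `closeForCommit()` rather than through the stream.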

## Concrete Implementations

### RowWiseBucketWriter

Factory for creating row-wise part writers using encoders.

```java { .api }
/**
 * Factory that creates RowWisePartWriter instances
 * @param <IN> The type of input elements
 * @param <BucketID> The type of bucket identifier
 */
public class RowWiseBucketWriter<IN, BucketID>
    extends OutputStreamBasedPartFileWriter.OutputStreamBasedBucketWriter<IN, BucketID> {

    /**
     * Creates a RowWiseBucketWriter
     * @param recoverableWriter the recoverable writer for the file system
     * @param encoder the encoder for serializing elements
     */
    public RowWiseBucketWriter(RecoverableWriter recoverableWriter, Encoder<IN> encoder);

    @Override
    public InProgressFileWriter<IN, BucketID> resumeFrom(
        BucketID bucketId,
        RecoverableFsDataOutputStream stream,
        Path path,
        RecoverableWriter.ResumeRecoverable resumable,
        long creationTime);

    @Override
    public InProgressFileWriter<IN, BucketID> openNew(
        BucketID bucketId,
        RecoverableFsDataOutputStream stream,
        Path path,
        long creationTime);
}
```

### BulkBucketWriter

Factory for creating bulk part writers using bulk writers.

```java { .api }
/**
 * Factory that creates BulkPartWriter instances
 * @param <IN> The type of input elements
 * @param <BucketID> The type of bucket identifier
 */
public class BulkBucketWriter<IN, BucketID>
    extends OutputStreamBasedPartFileWriter.OutputStreamBasedBucketWriter<IN, BucketID> {

    /**
     * Creates a BulkBucketWriter
     * @param recoverableWriter the recoverable writer for the file system
     * @param writerFactory the factory for creating bulk writers
     */
    public BulkBucketWriter(RecoverableWriter recoverableWriter, BulkWriter.Factory<IN> writerFactory) throws IOException;

    @Override
    public InProgressFileWriter<IN, BucketID> resumeFrom(
        BucketID bucketId,
        RecoverableFsDataOutputStream stream,
        Path path,
        RecoverableWriter.ResumeRecoverable resumable,
        long creationTime) throws IOException;

    @Override
    public InProgressFileWriter<IN, BucketID> openNew(
        BucketID bucketId,
        RecoverableFsDataOutputStream stream,
        Path path,
        long creationTime) throws IOException;
}
```

## Usage Examples

### Row-wise Writing with Encoder

```java
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;

// recoverableWriter is assumed to have been obtained elsewhere,
// for example from the target file system.

// Create encoder for string data
Encoder<String> encoder = new SimpleStringEncoder<>("UTF-8");

// Create row-wise bucket writer
BucketWriter<String, String> bucketWriter =
    new RowWiseBucketWriter<>(recoverableWriter, encoder);

// Open new file writer
InProgressFileWriter<String, String> writer = bucketWriter.openNewInProgressFile(
    "bucket-1",
    new Path("/output/bucket-1/part-0"),
    System.currentTimeMillis()
);

// Write elements
writer.write("Hello", System.currentTimeMillis());
writer.write("World", System.currentTimeMillis());

// Close for commit; PendingFileRecoverable is nested in InProgressFileWriter
InProgressFileWriter.PendingFileRecoverable pendingFile = writer.closeForCommit();
```

### Bulk Writing

```java
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;

// MyRecord is a placeholder record type; recoverableWriter is assumed to exist.

// Create bulk writer factory (example for Parquet)
BulkWriter.Factory<MyRecord> bulkWriterFactory = // ... implementation specific

// Create bulk bucket writer
BucketWriter<MyRecord, String> bucketWriter =
    new BulkBucketWriter<>(recoverableWriter, bulkWriterFactory);

// Usage similar to row-wise, but optimized for bulk operations
InProgressFileWriter<MyRecord, String> writer = bucketWriter.openNewInProgressFile(
    "bucket-1",
    new Path("/output/bucket-1/part-0"),
    System.currentTimeMillis()
);
```

## Error Handling

- Write operations report failures by throwing `IOException`, which callers should handle or propagate
- Failed writes will typically cause job failures and require restart
- `dispose()` should clean up resources without throwing exceptions
- Recovery operations may fail if the underlying file system state is corrupted
- Commit operations should be idempotent to preserve exactly-once processing guarantees; `commitAfterRecovery()` exists for retries after recovery
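
A minimal sketch of the dispose-on-failure pattern implied by the list above. `DisposableWriterSketch` is a hypothetical stand-in that mirrors only `write(element, currentTime)` and `dispose()`, and `WriteOrDispose` is an illustrative helper, not part of the library.

```java
import java.io.IOException;
import java.util.List;

/** Hypothetical stand-in mirroring two methods of InProgressFileWriter. */
interface DisposableWriterSketch<T> {
    void write(T element, long currentTime) throws IOException;

    void dispose(); // expected not to throw
}

final class WriteOrDispose {
    /** Writes all elements; on failure, releases the writer and rethrows. */
    static <T> void writeAll(DisposableWriterSketch<T> writer, List<T> elements)
            throws IOException {
        try {
            for (T element : elements) {
                writer.write(element, System.currentTimeMillis());
            }
        } catch (IOException e) {
            // Clean up the partial file, then let the failure propagate so the
            // surrounding job can fail and restart from its last checkpoint.
            writer.dispose();
            throw e;
        }
    }
}
```

Rethrowing rather than swallowing the exception keeps the failure visible to the caller, which matches the expectation that failed writes lead to a restart.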