# Fault-Tolerant Writing

The Hadoop FileSystem package provides recoverable writers that enable exactly-once processing guarantees through persistent state management and checkpoint/recovery mechanisms. These writers are essential for fault-tolerant streaming applications that require durability and consistency guarantees.

## Capabilities

### HadoopRecoverableWriter

The main recoverable writer implementation, providing fault-tolerant writing for Hadoop file systems.

```java { .api }
/**
 * An implementation of the RecoverableWriter for Hadoop's file system abstraction.
 * Supports fault-tolerant writing with exactly-once processing guarantees.
 */
@Internal
public class HadoopRecoverableWriter implements RecoverableWriter {
    /**
     * Creates a recoverable writer using the specified Hadoop FileSystem.
     * @param fs Hadoop file system to write to
     * @throws IOException if writer creation fails due to an unsupported file system
     */
    public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) throws IOException;

    /**
     * Creates a recoverable writer with the no-local-write option.
     * @param fs Hadoop file system to write to
     * @param noLocalWrite if true, disables local write optimizations
     * @throws IOException if writer creation fails
     */
    public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs, boolean noLocalWrite) throws IOException;
}
```

### Opening and Creating Streams

Methods for creating new recoverable output streams.

```java { .api }
/**
 * Opens a new recoverable output stream for the given file path.
 * @param filePath target file path for writing
 * @return RecoverableFsDataOutputStream for writing data
 * @throws IOException if stream creation fails
 */
public RecoverableFsDataOutputStream open(Path filePath) throws IOException;

/**
 * Indicates whether this writer supports resuming from a recoverable state.
 * @return true (the Hadoop recoverable writer supports resume)
 */
public boolean supportsResume();
```

**Usage Examples:**

```java
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
import org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter;

// Create recoverable writer from Hadoop FileSystem
org.apache.hadoop.fs.FileSystem hadoopFs = // ... obtain Hadoop FS
HadoopRecoverableWriter writer = new HadoopRecoverableWriter(hadoopFs);

// Open recoverable stream for writing
Path outputPath = new Path("hdfs://namenode:9000/output/part-1.txt");
RecoverableFsDataOutputStream stream = writer.open(outputPath);

// Write data
stream.write("First batch of data\n".getBytes());
stream.write("Second batch of data\n".getBytes());

// Persist current state for recovery
HadoopFsRecoverable recoverable = (HadoopFsRecoverable) stream.persist();

// Continue writing
stream.write("Third batch of data\n".getBytes());

// Close and commit
stream.closeForCommit().commit();
```

### Recovery Operations

Methods for recovering from persisted state after failures.

```java { .api }
/**
 * Recovers a stream from a resumable recoverable state.
 * @param recoverable the resumable state to recover from
 * @return RecoverableFsDataOutputStream positioned at the recovered state
 * @throws IOException if recovery fails
 */
public RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException;

/**
 * Recovers a committer from a commit recoverable state.
 * @param recoverable the commit state to recover from
 * @return Committer that can complete the commit operation
 * @throws IOException if recovery fails
 */
public Committer recoverForCommit(CommitRecoverable recoverable) throws IOException;

/**
 * Indicates whether this writer requires cleanup of recoverable state.
 * @return false (the Hadoop writer doesn't require cleanup)
 */
public boolean requiresCleanupOfRecoverableState();

/**
 * Cleans up recoverable state (a no-op for the Hadoop writer).
 * @param resumable the resumable state to clean up
 * @return false (no cleanup performed)
 * @throws IOException if cleanup fails
 */
public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException;
```

**Usage Examples:**

```java
// Recover from a previous session
HadoopFsRecoverable savedState = // ... load from checkpoint
RecoverableFsDataOutputStream recoveredStream = writer.recover(savedState);

// Continue writing from where we left off
recoveredStream.write("Continuing after recovery\n".getBytes());

// Or recover for commit only
Committer committer = writer.recoverForCommit(commitRecoverable);
committer.commit(); // Complete the commit operation
```

### Serialization Support

Methods for serializing and deserializing recoverable state.

```java { .api }
/**
 * Gets the serializer for commit recoverable state.
 * @return SimpleVersionedSerializer for CommitRecoverable objects
 */
public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer();

/**
 * Gets the serializer for resume recoverable state.
 * @return SimpleVersionedSerializer for ResumeRecoverable objects
 */
public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer();
```
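
As a rough illustration of the versioned-serializer contract these methods follow, here is a self-contained sketch with no Flink dependency. The `State` record, field layout, and version constant are illustrative stand-ins, not Flink's actual wire format:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Illustrative sketch of a versioned serializer; not Flink's implementation.
public class VersionedSerializerSketch {

    // Stand-in for a recoverable state: target file, temp file, durable offset.
    record State(String targetFile, String tempFile, long offset) {}

    static final int VERSION = 1;

    static byte[] serialize(State s) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeUTF(s.targetFile());
            out.writeUTF(s.tempFile());
            out.writeLong(s.offset());
        }
        return bos.toByteArray();
    }

    static State deserialize(int version, byte[] serialized) throws IOException {
        // A versioned serializer must reject (or migrate) unknown versions.
        if (version != VERSION) {
            throw new IOException("Unknown version: " + version);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            return new State(in.readUTF(), in.readUTF(), in.readLong());
        }
    }

    public static void main(String[] args) throws IOException {
        State original = new State("/out/part-1", "/out/.part-1.inprogress", 4096L);
        State restored = deserialize(VERSION, serialize(original));
        System.out.println(restored.equals(original)); // prints: true
    }
}
```

The version number travels alongside the payload (in Flink, the checkpoint stores it next to the serialized bytes), which is what lets a newer serializer read state written by an older one.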

### HadoopFsRecoverable

State object that contains all information needed to recover or commit a write operation.

```java { .api }
/**
 * Implementation of resume and commit descriptor objects for Hadoop's file system abstraction.
 * Contains the state needed to recover a write operation.
 */
@Internal
public class HadoopFsRecoverable implements CommitRecoverable, ResumeRecoverable {
    /**
     * Creates a recoverable state descriptor.
     * @param targetFile final target file path
     * @param tempFile temporary file being written to
     * @param offset current write position
     */
    public HadoopFsRecoverable(Path targetFile, Path tempFile, long offset);

    /**
     * Gets the target file path.
     * @return target file path where data will be committed
     */
    public Path targetFile();

    /**
     * Gets the temporary file path.
     * @return temporary file path where data is being written
     */
    public Path tempFile();

    /**
     * Gets the current write offset.
     * @return byte offset in the file
     */
    public long offset();

    /**
     * String representation of the recoverable state.
     * @return string describing the recoverable state
     */
    public String toString();
}
```

**Usage Examples:**

```java
// Access recoverable state information
HadoopFsRecoverable recoverable = (HadoopFsRecoverable) stream.persist();

System.out.println("Target file: " + recoverable.targetFile());
System.out.println("Temp file: " + recoverable.tempFile());
System.out.println("Current offset: " + recoverable.offset());

// Serialize for checkpointing
SimpleVersionedSerializer<ResumeRecoverable> serializer = writer.getResumeRecoverableSerializer();
byte[] serializedState = serializer.serialize(recoverable);

// Later: deserialize and recover
HadoopFsRecoverable restored = (HadoopFsRecoverable) serializer.deserialize(
        serializer.getVersion(), serializedState);
RecoverableFsDataOutputStream recoveredStream = writer.recover(restored);
```

### Recoverable Output Stream Operations

The recoverable output stream provides standard writing operations plus persistence capabilities.

```java { .api }
/**
 * Base class for HDFS and ABFS recoverable streams.
 */
@Internal
public abstract class BaseHadoopFsRecoverableFsDataOutputStream
        extends CommitterFromPersistRecoverableFsDataOutputStream<HadoopFsRecoverable> {

    /**
     * Gets the current position in the stream.
     * @return current byte position
     * @throws IOException if the operation fails
     */
    public long getPos() throws IOException;

    /**
     * Writes a single byte.
     * @param b byte to write
     * @throws IOException if the write fails
     */
    public void write(int b) throws IOException;

    /**
     * Writes data from a byte array.
     * @param b byte array containing data
     * @param off starting offset in the array
     * @param len number of bytes to write
     * @throws IOException if the write fails
     */
    public void write(byte[] b, int off, int len) throws IOException;

    /**
     * Flushes buffered data.
     * @throws IOException if the flush fails
     */
    public void flush() throws IOException;

    /**
     * Synchronizes data to storage.
     * @throws IOException if the sync fails
     */
    public void sync() throws IOException;

    /**
     * Persists the current state for recovery.
     * @return HadoopFsRecoverable representing the current state
     * @throws IOException if the persist operation fails
     */
    public HadoopFsRecoverable persist() throws IOException;

    /**
     * Closes the stream and returns a committer.
     * @return Committer that can complete the write operation
     * @throws IOException if the close fails
     */
    public Committer closeForCommit() throws IOException;

    /**
     * Closes the stream without committing.
     * @throws IOException if the close fails
     */
    public void close() throws IOException;
}
```

### Complete Fault-Tolerant Writing Example

```java
import java.io.IOException;
import java.util.List;

import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
import org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter;

public class FaultTolerantWriter {
    private HadoopRecoverableWriter writer;
    private SimpleVersionedSerializer<ResumeRecoverable> serializer;

    public void initializeWriter(org.apache.hadoop.fs.FileSystem hadoopFs) throws IOException {
        writer = new HadoopRecoverableWriter(hadoopFs);
        serializer = writer.getResumeRecoverableSerializer();
    }

    public byte[] writeWithCheckpoint(Path outputPath, List<String> data) throws IOException {
        RecoverableFsDataOutputStream stream = writer.open(outputPath);
        HadoopFsRecoverable checkpoint = null;

        try {
            // Write data in batches with periodic checkpoints
            for (int i = 0; i < data.size(); i++) {
                stream.write(data.get(i).getBytes());
                stream.write("\n".getBytes());

                // Checkpoint every 100 records
                if (i % 100 == 0) {
                    checkpoint = (HadoopFsRecoverable) stream.persist();
                    System.out.println("Checkpoint at record " + i
                            + ", offset: " + checkpoint.offset());
                }
            }

            // Commit the file
            stream.closeForCommit().commit();

            return checkpoint != null ? serializer.serialize(checkpoint) : null;

        } catch (IOException e) {
            // On failure, return checkpoint data for recovery
            if (checkpoint != null) {
                return serializer.serialize(checkpoint);
            }
            throw e;
        }
    }

    public void recoverAndContinue(byte[] checkpointData, List<String> remainingData) throws IOException {
        // Deserialize checkpoint
        HadoopFsRecoverable recoverable = (HadoopFsRecoverable) serializer.deserialize(
                serializer.getVersion(), checkpointData);

        // Recover stream
        RecoverableFsDataOutputStream stream = writer.recover(recoverable);

        System.out.println("Recovered at offset: " + recoverable.offset());
        System.out.println("Temp file: " + recoverable.tempFile());

        // Continue writing
        for (String line : remainingData) {
            stream.write(line.getBytes());
            stream.write("\n".getBytes());
        }

        // Commit
        stream.closeForCommit().commit();

        System.out.println("Recovery completed, data committed to: " + recoverable.targetFile());
    }
}
```

### HadoopRecoverableSerializer

Serializer for persisting and restoring recoverable state.

```java { .api }
/**
 * Simple serializer for HadoopFsRecoverable objects.
 */
@Internal
public class HadoopRecoverableSerializer implements SimpleVersionedSerializer<HadoopFsRecoverable> {
    /**
     * Singleton instance of the serializer.
     */
    public static final HadoopRecoverableSerializer INSTANCE;

    /**
     * Gets the version of the serialization format.
     * @return version number (1)
     */
    public int getVersion();

    /**
     * Serializes a HadoopFsRecoverable object.
     * @param obj the recoverable object to serialize
     * @return byte array containing serialized data
     * @throws IOException if serialization fails
     */
    public byte[] serialize(HadoopFsRecoverable obj) throws IOException;

    /**
     * Deserializes a HadoopFsRecoverable object.
     * @param version version of the serialized data
     * @param serialized byte array containing serialized data
     * @return deserialized HadoopFsRecoverable object
     * @throws IOException if deserialization fails
     */
    public HadoopFsRecoverable deserialize(int version, byte[] serialized) throws IOException;
}
```

## Supported File Systems

Recoverable writers support only the subset of Hadoop file systems that provide the features needed for fault tolerance (notably atomic rename and truncate):

- **HDFS**: Full support; recovery relies on the truncate operation, which requires Hadoop 2.7 or later
- **S3**: Limited support depending on the S3 implementation (S3A with certain configurations)
- **Azure**: Support for Azure Data Lake Storage (ABFS)
- **Local FS**: Full support for testing and development

## Error Handling

```java
try {
    RecoverableFsDataOutputStream stream = writer.open(outputPath);
    // ... write operations
} catch (UnsupportedOperationException e) {
    System.err.println("File system doesn't support recoverable writes: " + e.getMessage());
} catch (IOException e) {
    System.err.println("I/O error during recoverable write: " + e.getMessage());
}
```

## Types

```java { .api }
// Core recoverable writer interfaces
public interface RecoverableWriter {
    RecoverableFsDataOutputStream open(Path filePath) throws IOException;
    RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException;
    Committer recoverForCommit(CommitRecoverable recoverable) throws IOException;
    boolean supportsResume();
}

// Recoverable state interfaces
public interface ResumeRecoverable extends Serializable {}
public interface CommitRecoverable extends Serializable {}

// Committer interface
public interface Committer {
    void commit() throws IOException;
}

// Serialization interface
public interface SimpleVersionedSerializer<T> {
    int getVersion();
    byte[] serialize(T obj) throws IOException;
    T deserialize(int version, byte[] serialized) throws IOException;
}
```
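
To make the interplay of these interfaces concrete, the following self-contained sketch walks through the open → write → persist → recover → commit lifecycle on plain local files. It is a simplified stand-in, not Flink's implementation; the `Recoverable` record and file names are illustrative:

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Illustrative sketch of the recoverable-writer lifecycle on local files.
public class RecoverableLifecycleSketch {

    // Persisted state: target file, temp file, and the durable byte offset.
    record Recoverable(Path targetFile, Path tempFile, long offset) {}

    static String run() throws IOException {
        Path dir = Files.createTempDirectory("recoverable-demo");
        Path target = dir.resolve("part-1.txt");
        Path temp = dir.resolve(".part-1.txt.inprogress");

        // "open" + write the first batch, then "persist" the durable offset
        Files.write(temp, "first batch\n".getBytes());
        Recoverable checkpoint = new Recoverable(target, temp, Files.size(temp));

        // Simulate a crash: bytes written after the checkpoint are not covered by it
        Files.write(temp, "uncheckpointed bytes".getBytes(), StandardOpenOption.APPEND);

        // "recover": truncate back to the persisted offset, then resume writing
        try (RandomAccessFile out = new RandomAccessFile(checkpoint.tempFile().toFile(), "rw")) {
            out.setLength(checkpoint.offset());
            out.seek(checkpoint.offset());
            out.write("second batch\n".getBytes());
        }

        // "commit": atomically rename the temp file to its final name
        Files.move(checkpoint.tempFile(), checkpoint.targetFile(), StandardCopyOption.ATOMIC_MOVE);
        return Files.readString(checkpoint.targetFile());
    }

    public static void main(String[] args) throws IOException {
        System.out.print(run()); // prints "first batch" and "second batch" on two lines
    }
}
```

The truncate-then-resume step is the reason HDFS recovery depends on the file system supporting truncation, and the final atomic rename is what makes the commit all-or-nothing.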