# Recoverable Writer System

The Flink GS FileSystem plugin provides a fault-tolerant streaming write path that gives Flink streaming applications exactly-once guarantees when writing to Google Cloud Storage. The recoverable writer handles interrupted writes, resumes from the last persisted state after a failure, and ensures data consistency through a multi-phase protocol: write to temporary objects, persist state at checkpoints, then commit.

## Capabilities

### GSRecoverableWriter

Main recoverable writer implementation, providing fault-tolerant streaming writes with exactly-once semantics.

```java { .api }
/**
 * The recoverable writer implementation for Google storage
 * Provides fault-tolerant streaming writes with exactly-once guarantees
 */
public class GSRecoverableWriter implements RecoverableWriter {

    /**
     * Construct a GS recoverable writer
     * @param storage The underlying blob storage instance
     * @param options The GS file system options
     */
    public GSRecoverableWriter(GSBlobStorage storage, GSFileSystemOptions options);

    /**
     * Whether this writer requires cleanup of recoverable state before commit
     * @return false - no cleanup required before commit for safety
     */
    public boolean requiresCleanupOfRecoverableState();

    /**
     * Whether this writer supports resuming interrupted writes
     * @return true - supports resuming from ResumeRecoverable state
     */
    public boolean supportsResume();

    /**
     * Open a new recoverable output stream
     * @param path The target path for the final file
     * @return GSRecoverableFsDataOutputStream for writing data
     * @throws IOException If stream creation fails
     */
    public RecoverableFsDataOutputStream open(Path path) throws IOException;

    /**
     * Recover an existing stream from resumable state
     * @param resumable The resumable state from previous stream
     * @return GSRecoverableFsDataOutputStream for continuing writes
     */
    public RecoverableFsDataOutputStream recover(ResumeRecoverable resumable);

    /**
     * Clean up recoverable state (no-op for safety)
     * @param resumable The resumable state to clean up
     * @return true - always succeeds (no actual cleanup performed)
     */
    public boolean cleanupRecoverableState(ResumeRecoverable resumable);

    /**
     * Recover a committer for completing writes
     * @param resumable The commit recoverable state
     * @return GSRecoverableWriterCommitter for completing the write
     */
    public RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverable resumable);

    /**
     * Get serializer for commit recoverable state
     * @return GSCommitRecoverableSerializer instance
     */
    public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer();

    /**
     * Get serializer for resume recoverable state
     * @return GSResumeRecoverableSerializer instance
     */
    public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer();
}
```

**Usage Example:**

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;

// Get the recoverable writer from the file system that owns the target path
// (the calls below may throw IOException)
Path outputPath = new Path("gs://my-bucket/output/part-1");
FileSystem fs = outputPath.getFileSystem();
RecoverableWriter writer = fs.createRecoverableWriter();

// Open stream for writing
RecoverableFsDataOutputStream stream = writer.open(outputPath);

// Write data
stream.write("Hello World".getBytes(StandardCharsets.UTF_8));
stream.flush();

// Checkpoint: persist the stream and keep the resumable state
RecoverableWriter.ResumeRecoverable resumable = stream.persist();

// Later, after a failure: recover from the resumable state and continue writing
RecoverableFsDataOutputStream recoveredStream = writer.recover(resumable);
recoveredStream.write(" More data".getBytes(StandardCharsets.UTF_8));

// Close and commit
RecoverableFsDataOutputStream.Committer committer = recoveredStream.closeForCommit();
committer.commit();
```

### GSRecoverableFsDataOutputStream

Data output stream implementation for recoverable writes, providing buffering and state management.

```java { .api }
/**
 * Main data output stream implementation for the GS recoverable writer
 * Package-private - accessed through RecoverableWriter interface
 */
class GSRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {

    /**
     * Write data to the stream
     * @param b byte array containing data
     * @param off starting offset in the array
     * @param len number of bytes to write
     * @throws IOException if write operation fails
     */
    public void write(byte[] b, int off, int len) throws IOException;

    /**
     * Flush buffered data to storage
     * @throws IOException if flush operation fails
     */
    public void flush() throws IOException;

    /**
     * Force data persistence and sync to storage
     * @throws IOException if sync operation fails
     */
    public void sync() throws IOException;

    /**
     * Create resumable state for recovery
     * @return GSResumeRecoverable containing current stream state
     * @throws IOException if state creation fails
     */
    public ResumeRecoverable persist() throws IOException;

    /**
     * Close stream and return committer for final commit
     * @return GSRecoverableWriterCommitter for completing the write
     * @throws IOException if close operation fails
     */
    public Committer closeForCommit() throws IOException;

    /**
     * Close the stream without committing
     * @throws IOException if close operation fails
     */
    public void close() throws IOException;
}
```

### GSRecoverableWriterCommitter

Handles the commit phase of recoverable writer operations with atomic completion.

```java { .api }
/**
 * Handles the commit phase of recoverable writer operations
 * Package-private - obtained through closeForCommit()
 */
class GSRecoverableWriterCommitter implements RecoverableFsDataOutputStream.Committer {

    /**
     * Commit the write operation atomically
     * @throws IOException if commit fails
     */
    public void commit() throws IOException;

    /**
     * Commit after recovery from failure
     * @throws IOException if commit fails
     */
    public void commitAfterRecovery() throws IOException;

    /**
     * Get the recoverable state for this committer
     * @return GSCommitRecoverable containing commit information
     */
    public CommitRecoverable getRecoverable();
}
```

**Usage Example:**

```java
import java.io.IOException;

import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;

// After writing data to the recoverable stream
// (stream and fs are assumed to come from the previous example)
RecoverableFsDataOutputStream.Committer committer = stream.closeForCommit();

// Keep the commit recoverable for recovery scenarios
RecoverableWriter.CommitRecoverable commitState = committer.getRecoverable();

// Commit the write
try {
    committer.commit();
} catch (IOException e) {
    // Recovery scenario: create a new committer from the saved state and retry
    RecoverableWriter writer = fs.createRecoverableWriter();
    RecoverableFsDataOutputStream.Committer recoveredCommitter =
        writer.recoverForCommit(commitState);
    recoveredCommitter.commitAfterRecovery();
}
```

## State Management

### GSCommitRecoverable

Represents the state needed to commit a recoverable write operation.

```java { .api }
/**
 * Represents committable state for a recoverable output stream
 * Package-private - managed internally by the writer system
 */
class GSCommitRecoverable implements RecoverableWriter.CommitRecoverable {
    /** The target blob identifier for the final committed file */
    public final GSBlobIdentifier finalBlobIdentifier;

    /** List of temporary object UUIDs that need to be composed into final blob */
    public final List<UUID> componentObjectIds;

    /**
     * Package-private constructor
     * @param finalBlobIdentifier The final blob identifier
     * @param componentObjectIds List of component object UUIDs
     */
    GSCommitRecoverable(GSBlobIdentifier finalBlobIdentifier, List<UUID> componentObjectIds);

    /**
     * Get component blob identifiers for composition
     * @param options File system options for temporary bucket resolution
     * @return List of GSBlobIdentifier for temporary objects
     */
    List<GSBlobIdentifier> getComponentBlobIds(GSFileSystemOptions options);
}
```

### GSResumeRecoverable

Represents the state needed to resume an interrupted write operation.

```java { .api }
/**
 * Represents resumable state for a recoverable output stream
 * Extends GSCommitRecoverable with additional resume information
 * Package-private - managed internally by the writer system
 */
class GSResumeRecoverable extends GSCommitRecoverable
        implements RecoverableWriter.ResumeRecoverable {

    /** Current write position in bytes */
    public final long position;

    /** Whether the stream is closed for writing */
    public final boolean closed;

    /**
     * Package-private constructor
     * @param finalBlobIdentifier The final blob identifier
     * @param componentObjectIds List of component object UUIDs
     * @param position The current write position
     * @param closed Whether the stream is closed
     */
    GSResumeRecoverable(GSBlobIdentifier finalBlobIdentifier, List<UUID> componentObjectIds, long position, boolean closed);
}
```

## Serializers

### GSCommitRecoverableSerializer

Serializer for GSCommitRecoverable objects, enabling state persistence across failures.

```java { .api }
/**
 * Serializer for GSCommitRecoverable objects
 * Package-private - used internally for state persistence
 */
class GSCommitRecoverableSerializer implements SimpleVersionedSerializer<GSCommitRecoverable> {
    /** Singleton instance */
    public static final GSCommitRecoverableSerializer INSTANCE;

    /**
     * Get serializer version
     * @return Current serializer version
     */
    public int getVersion();

    /**
     * Serialize commit recoverable to byte array
     * @param obj The GSCommitRecoverable to serialize
     * @return Serialized byte array
     * @throws IOException if serialization fails
     */
    public byte[] serialize(GSCommitRecoverable obj) throws IOException;

    /**
     * Deserialize commit recoverable from byte array
     * @param version Serializer version used for serialization
     * @param serialized The serialized byte array
     * @return Deserialized GSCommitRecoverable
     * @throws IOException if deserialization fails
     */
    public GSCommitRecoverable deserialize(int version, byte[] serialized) throws IOException;
}
```

### GSResumeRecoverableSerializer

Serializer for GSResumeRecoverable objects, enabling resume state persistence.

```java { .api }
/**
 * Serializer for GSResumeRecoverable objects
 * Package-private - used internally for state persistence
 */
class GSResumeRecoverableSerializer implements SimpleVersionedSerializer<GSResumeRecoverable> {
    /** Singleton instance */
    public static final GSResumeRecoverableSerializer INSTANCE;

    /**
     * Get serializer version
     * @return Current serializer version
     */
    public int getVersion();

    /**
     * Serialize resume recoverable to byte array
     * @param obj The GSResumeRecoverable to serialize
     * @return Serialized byte array
     * @throws IOException if serialization fails
     */
    public byte[] serialize(GSResumeRecoverable obj) throws IOException;

    /**
     * Deserialize resume recoverable from byte array
     * @param version Serializer version used for serialization
     * @param serialized The serialized byte array
     * @return Deserialized GSResumeRecoverable
     * @throws IOException if deserialization fails
     */
    public GSResumeRecoverable deserialize(int version, byte[] serialized) throws IOException;
}
```
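
To make the serializer contract more concrete, the following is a minimal, standard-library-only sketch of a versioned serializer in the same style. It is not the plugin's code: `ResumeStateSketch`, `ResumeStateSerializerSketch`, the flattening of the blob identifier into two strings, and the byte layout are all assumptions made for illustration. As in the signatures above, the version number is passed to `deserialize` by the caller rather than read from the byte array.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Hypothetical stand-in for resume state; not the plugin's GSResumeRecoverable. */
final class ResumeStateSketch {
    final String bucketName;
    final String objectName;
    final List<UUID> componentObjectIds;
    final long position;
    final boolean closed;

    ResumeStateSketch(String bucketName, String objectName,
                      List<UUID> componentObjectIds, long position, boolean closed) {
        this.bucketName = bucketName;
        this.objectName = objectName;
        this.componentObjectIds = componentObjectIds;
        this.position = position;
        this.closed = closed;
    }
}

/** Versioned serializer sketch; the byte layout is an assumption of this example. */
final class ResumeStateSerializerSketch {
    static final int VERSION = 1;

    static byte[] serialize(ResumeStateSketch state) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(state.bucketName);
            out.writeUTF(state.objectName);
            // Length-prefixed list of UUIDs, each written as two longs
            out.writeInt(state.componentObjectIds.size());
            for (UUID id : state.componentObjectIds) {
                out.writeLong(id.getMostSignificantBits());
                out.writeLong(id.getLeastSignificantBits());
            }
            out.writeLong(state.position);
            out.writeBoolean(state.closed);
        }
        return bytes.toByteArray();
    }

    static ResumeStateSketch deserialize(int version, byte[] serialized) throws IOException {
        // Reject bytes written by a version this sketch does not understand
        if (version != VERSION) {
            throw new IOException("Unsupported serializer version: " + version);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            String bucketName = in.readUTF();
            String objectName = in.readUTF();
            int count = in.readInt();
            List<UUID> ids = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                long most = in.readLong();
                long least = in.readLong();
                ids.add(new UUID(most, least));
            }
            long position = in.readLong();
            boolean closed = in.readBoolean();
            return new ResumeStateSketch(bucketName, objectName, ids, position, closed);
        }
    }
}
```

Passing the version explicitly lets a newer serializer keep reading state written by an older one, which matters because checkpointed state can outlive the job version that wrote it.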

## Write Process Flow

### Normal Write Flow

1. **Open Stream**: `writer.open(path)` creates a `GSRecoverableFsDataOutputStream`
2. **Write Data**: Repeated `write()` calls send data to temporary objects
3. **Periodic Persistence**: `persist()` returns a `GSResumeRecoverable` that captures the stream state for checkpointing
4. **Close for Commit**: `closeForCommit()` returns a `GSRecoverableWriterCommitter`
5. **Commit**: `commit()` composes the temporary objects into the final blob atomically
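
The five steps above can be modeled with a small in-memory sketch. This is only an illustration of the flow under simplifying assumptions, not the plugin's implementation: `WriteFlowSketch`, its `store` map (standing in for a bucket), and the simplified temporary name are hypothetical, and a real stream uploads data in chunks rather than holding it in memory.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** In-memory model of the normal write flow; all names are hypothetical. */
final class WriteFlowSketch {
    /** Stands in for the bucket: object name -> content. */
    final Map<String, byte[]> store = new LinkedHashMap<>();

    private final String finalName;
    private final List<UUID> componentIds = new ArrayList<>();
    private ByteArrayOutputStream current = new ByteArrayOutputStream();
    private long position = 0;

    /** Step 1: "open" a stream for the given final object name. */
    WriteFlowSketch(String finalName) {
        this.finalName = finalName;
    }

    /** Step 2: collect written bytes for the current temporary object. */
    void write(byte[] data) {
        current.write(data, 0, data.length);
        position += data.length;
    }

    /** Step 3: seal the current bytes as a temporary object and report the position. */
    long persist() {
        if (current.size() > 0) {
            UUID id = UUID.randomUUID();
            store.put(tempName(id), current.toByteArray());
            componentIds.add(id);
            current = new ByteArrayOutputStream();
        }
        return position;
    }

    /** Steps 4 and 5: seal remaining data, compose the parts, then clean up. */
    void closeAndCommit() {
        persist();
        ByteArrayOutputStream composed = new ByteArrayOutputStream();
        for (UUID id : componentIds) {
            byte[] part = store.remove(tempName(id));
            composed.write(part, 0, part.length);
        }
        store.put(finalName, composed.toByteArray());
    }

    private String tempName(UUID id) {
        return ".inprogress/" + finalName + "/" + id;
    }
}
```

In this model the final name only appears in `store` once `closeAndCommit()` runs, which mirrors the idea that readers never observe a partially written file.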

### Recovery Flow

1. **Resume from Checkpoint**: `writer.recover(resumeRecoverable)` recreates the stream at the persisted position
2. **Continue Writing**: Additional `write()` calls continue from that position and add to the stream's temporary objects
3. **Close and Commit**: Same as the normal flow
4. **Commit Recovery**: If the failure happens after `closeForCommit()`, `writer.recoverForCommit(commitRecoverable)` recreates the committer so the commit can be completed with `commitAfterRecovery()`
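
The resume path can be illustrated with another in-memory sketch. It is hypothetical throughout (`ResumableStreamSketch` and `Snapshot` are invented names, and strings stand in for bytes, so the position counts characters). The point it shows is that data written after the last `persist()` is not part of the recovered stream.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** In-memory model of resuming a write; names are hypothetical. */
final class ResumableStreamSketch {

    /** What persist() hands to the checkpoint: sealed components and the position. */
    static final class Snapshot {
        final List<String> components;
        final long position;

        Snapshot(List<String> components, long position) {
            // Copy so later writes cannot change an already taken snapshot
            this.components = Collections.unmodifiableList(new ArrayList<>(components));
            this.position = position;
        }
    }

    private final List<String> components;
    private final StringBuilder unsealed = new StringBuilder();
    private long position;

    /** Starts a fresh stream. */
    ResumableStreamSketch() {
        this(new Snapshot(new ArrayList<>(), 0L));
    }

    /** Recreates a stream from a snapshot, in the spirit of recover(resumable). */
    ResumableStreamSketch(Snapshot snapshot) {
        this.components = new ArrayList<>(snapshot.components);
        this.position = snapshot.position;
    }

    void write(String data) {
        unsealed.append(data);
        position += data.length();
    }

    /** Seals unsealed data as a new component and returns the resumable state. */
    Snapshot persist() {
        if (unsealed.length() > 0) {
            components.add(unsealed.toString());
            unsealed.setLength(0);
        }
        return new Snapshot(components, position);
    }

    /** Seals remaining data and returns the composed final content. */
    String closeAndCommit() {
        persist();
        return String.join("", components);
    }
}
```

For example, if a stream writes `"Hello"`, persists, writes more data, and then fails, a stream rebuilt from the snapshot starts at position 5 and the data written after the snapshot has to be replayed by the caller.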

### Temporary Object Management

- **Naming**: Temporary objects follow the pattern `.inprogress/<bucket>/<object>/<uuid>`
- **Composition**: The final commit uses the GCS compose operation, which merges up to 32 objects per request
- **Cleanup**: Temporary objects are deleted after a successful commit
- **Entropy Injection**: An optional entropy prefix reduces hotspotting in high-throughput scenarios
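
As a small illustration of the naming pattern, the helper below builds a temporary object name from the final bucket, the final object name, and a component UUID. `TemporaryObjectNames` and its method are hypothetical, and the sketch leaves out entropy injection because the exact prefix format is not described here.

```java
import java.util.UUID;

/** Builds names following the .inprogress/<bucket>/<object>/<uuid> pattern; hypothetical helper. */
final class TemporaryObjectNames {
    static final String PREFIX = ".inprogress";

    static String temporaryObjectName(String finalBucket, String finalObject, UUID componentId) {
        return PREFIX + "/" + finalBucket + "/" + finalObject + "/" + componentId;
    }
}
```

Because the final bucket and object name are embedded in the temporary name, temporary objects for different target files do not collide even when they share one temporary bucket.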

## Error Handling and Recovery

### Failure Scenarios

- **Writer Failure**: Resume the stream from the `GSResumeRecoverable` state
- **Commit Failure**: Retry the commit using the `GSCommitRecoverable` state
- **Network Issues**: Configurable retry policies handle transient failures
- **Storage Errors**: Exceptions are propagated with context information
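
The idea of retrying transient failures can be sketched generically. The helper below is not how the plugin configures its retry policies; `RetrySketch`, its parameters, and the choice of exponential backoff are assumptions used only to show the general technique of retrying an I/O action a bounded number of times.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

/** Generic retry-with-backoff helper; a sketch, not the plugin's retry policy. */
final class RetrySketch {

    static <T> T withRetries(Callable<T> action, int maxAttempts, long initialDelayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delay = initialDelayMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return action.call();
            } catch (IOException e) {
                // Only I/O errors are treated as transient; give up after the last attempt
                if (attempt >= maxAttempts) {
                    throw e;
                }
                Thread.sleep(delay);
                delay *= 2; // exponential backoff between attempts
            }
        }
    }
}
```

Exceptions other than `IOException` are not retried and propagate immediately, so programming errors are not hidden behind retries.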

### Safety Guarantees

- **Exactly-Once**: Each successful commit produces exactly one final file
- **No Data Loss**: All persisted data remains recoverable until the commit completes
- **Atomic Commits**: The final file appears atomically or not at all
- **Idempotent Recovery**: Repeated recovery attempts produce the same result
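
One common way to make commit recovery idempotent is to check whether the final object already exists before composing again. The sketch below models that idea in memory. It is an assumption about the general technique, not a description of what `commitAfterRecovery()` does internally, and `IdempotentCommitSketch` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of an idempotent commit; names and logic are illustrative only. */
final class IdempotentCommitSketch {
    /** Stands in for the bucket: object name -> content. */
    final Map<String, String> store = new HashMap<>();

    /** Composes the components into the final object, then removes them. */
    void commit(String finalName, List<String> componentNames) {
        StringBuilder composed = new StringBuilder();
        for (String name : componentNames) {
            String part = store.get(name);
            if (part == null) {
                throw new IllegalStateException("Missing component: " + name);
            }
            composed.append(part);
        }
        store.put(finalName, composed.toString()); // final object becomes visible in one step
        componentNames.forEach(store::remove);     // cleanup after success
    }

    /** Safe to call repeatedly: skips composition if the final object already exists. */
    void commitAfterRecovery(String finalName, List<String> componentNames) {
        if (store.containsKey(finalName)) {
            componentNames.forEach(store::remove); // finish any interrupted cleanup
            return;
        }
        commit(finalName, componentNames);
    }
}
```

Calling `commitAfterRecovery` any number of times after a successful commit leaves exactly one final object with the same content, which is the property the guarantees above describe.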

### Performance Considerations

- **Chunk Size**: Set `gs.writer.chunk.size` to tune upload performance
- **Temporary Bucket**: Use a separate bucket for temporary objects to avoid hotspots
- **Entropy Injection**: Enable `gs.filesink.entropy.enabled` for high-throughput scenarios
- **Composition Limits**: The GCS limit of 32 objects per compose request is handled automatically
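
To show how a 32-object limit per compose request can be handled, the sketch below composes an arbitrary number of components in several passes, each pass merging at most 32 sources into an intermediate result. It is a standard-library model only: `ComposeSketch` is hypothetical, strings stand in for objects, and the plugin's actual strategy may differ.

```java
import java.util.ArrayList;
import java.util.List;

/** Multi-pass composition under a 32-source limit; an illustrative model only. */
final class ComposeSketch {
    static final int MAX_COMPOSE_SOURCES = 32;

    /** Number of simulated compose requests issued so far. */
    int composeCalls = 0;

    /** Stand-in for one compose request: concatenates at most 32 sources in order. */
    private String composeOnce(List<String> sources) {
        if (sources.size() > MAX_COMPOSE_SOURCES) {
            throw new IllegalArgumentException("Too many sources: " + sources.size());
        }
        composeCalls++;
        return String.join("", sources);
    }

    /** Composes any number of components while preserving their order. */
    String composeAll(List<String> components) {
        List<String> remaining = new ArrayList<>(components);
        while (remaining.size() > MAX_COMPOSE_SOURCES) {
            // Merge the first 32 into an intermediate and put it back at the front
            String intermediate = composeOnce(remaining.subList(0, MAX_COMPOSE_SOURCES));
            List<String> next = new ArrayList<>();
            next.add(intermediate);
            next.addAll(remaining.subList(MAX_COMPOSE_SOURCES, remaining.size()));
            remaining = next;
        }
        return composeOnce(remaining);
    }
}
```

Each pass reduces the number of remaining objects by 31, so 100 components need four compose requests in this model (100, 69, 38, then 7 remaining objects).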