
# Recoverable Writer System

The Flink GS FileSystem plugin provides a fault-tolerant streaming write system that enables exactly-once guarantees for Flink streaming applications. The recoverable writer system handles interrupted writes, supports resumption from the last persisted state, and ensures data consistency through a multi-phase commit protocol.

## Capabilities

### GSRecoverableWriter

Main recoverable writer implementation providing fault-tolerant streaming writes with exactly-once semantics.

```java { .api }
/**
 * The recoverable writer implementation for Google storage
 * Provides fault-tolerant streaming writes with exactly-once guarantees
 */
public class GSRecoverableWriter implements RecoverableWriter {

    /**
     * Construct a GS recoverable writer
     * @param storage The underlying blob storage instance
     * @param options The GS file system options
     */
    public GSRecoverableWriter(GSBlobStorage storage, GSFileSystemOptions options);

    /**
     * Whether this writer requires cleanup of recoverable state before commit
     * @return false - no cleanup required before commit for safety
     */
    public boolean requiresCleanupOfRecoverableState();

    /**
     * Whether this writer supports resuming interrupted writes
     * @return true - supports resuming from ResumeRecoverable state
     */
    public boolean supportsResume();

    /**
     * Open a new recoverable output stream
     * @param path The target path for the final file
     * @return GSRecoverableFsDataOutputStream for writing data
     * @throws IOException If stream creation fails
     */
    public RecoverableFsDataOutputStream open(Path path) throws IOException;

    /**
     * Recover an existing stream from resumable state
     * @param resumable The resumable state from previous stream
     * @return GSRecoverableFsDataOutputStream for continuing writes
     */
    public RecoverableFsDataOutputStream recover(ResumeRecoverable resumable);

    /**
     * Clean up recoverable state (no-op for safety)
     * @param resumable The resumable state to clean up
     * @return true - always succeeds (no actual cleanup performed)
     */
    public boolean cleanupRecoverableState(ResumeRecoverable resumable);

    /**
     * Recover a committer for completing writes
     * @param resumable The commit recoverable state
     * @return GSRecoverableWriterCommitter for completing the write
     */
    public RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverable resumable);

    /**
     * Get serializer for commit recoverable state
     * @return GSCommitRecoverableSerializer instance
     */
    public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer();

    /**
     * Get serializer for resume recoverable state
     * @return GSResumeRecoverableSerializer instance
     */
    public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer();
}
```

**Usage Example:**

```java
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;

import java.nio.charset.StandardCharsets;

// Get recoverable writer from filesystem
FileSystem fs = new Path("gs://my-bucket/").getFileSystem();
RecoverableWriter writer = fs.createRecoverableWriter();

// Open stream for writing
Path outputPath = new Path("gs://my-bucket/output/part-1");
RecoverableFsDataOutputStream stream = writer.open(outputPath);

// Write data (explicit charset avoids platform-dependent encoding)
stream.write("Hello World".getBytes(StandardCharsets.UTF_8));
stream.flush();

// Create checkpoint - get resumable state
RecoverableWriter.ResumeRecoverable resumable = stream.persist();

// Later: recover and continue writing
RecoverableFsDataOutputStream recoveredStream = writer.recover(resumable);
recoveredStream.write(" More data".getBytes(StandardCharsets.UTF_8));

// Close and commit
RecoverableFsDataOutputStream.Committer committer = recoveredStream.closeForCommit();
committer.commit();
```

### GSRecoverableFsDataOutputStream

Data output stream implementation for recoverable writes providing buffering and state management.

```java { .api }
/**
 * Main data output stream implementation for the GS recoverable writer
 * Package-private - accessed through RecoverableWriter interface
 */
class GSRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {

    /**
     * Write data to the stream
     * @param b byte array containing data
     * @param off starting offset in the array
     * @param len number of bytes to write
     * @throws IOException if write operation fails
     */
    public void write(byte[] b, int off, int len) throws IOException;

    /**
     * Flush buffered data to storage
     * @throws IOException if flush operation fails
     */
    public void flush() throws IOException;

    /**
     * Force data persistence and sync to storage
     * @throws IOException if sync operation fails
     */
    public void sync() throws IOException;

    /**
     * Create resumable state for recovery
     * @return GSResumeRecoverable containing current stream state
     * @throws IOException if state creation fails
     */
    public ResumeRecoverable persist() throws IOException;

    /**
     * Close stream and return committer for final commit
     * @return GSRecoverableWriterCommitter for completing the write
     * @throws IOException if close operation fails
     */
    public Committer closeForCommit() throws IOException;

    /**
     * Close the stream without committing
     * @throws IOException if close operation fails
     */
    public void close() throws IOException;
}
```

### GSRecoverableWriterCommitter

Handles the commit phase of recoverable writer operations with atomic completion.

```java { .api }
/**
 * Handles the commit phase of recoverable writer operations
 * Package-private - obtained through closeForCommit()
 */
class GSRecoverableWriterCommitter implements RecoverableFsDataOutputStream.Committer {

    /**
     * Commit the write operation atomically
     * @throws IOException if commit fails
     */
    public void commit() throws IOException;

    /**
     * Commit after recovery from failure
     * @throws IOException if commit fails
     */
    public void commitAfterRecovery() throws IOException;

    /**
     * Get the recoverable state for this committer
     * @return GSCommitRecoverable containing commit information
     */
    public CommitRecoverable getRecoverable();
}
```

**Usage Example:**

```java
// `stream` is an open RecoverableFsDataOutputStream and `fs` is the GS FileSystem.
// After writing data to recoverable stream
RecoverableFsDataOutputStream.Committer committer = stream.closeForCommit();

// Store commit recoverable for recovery scenarios
RecoverableWriter.CommitRecoverable commitState = committer.getRecoverable();

// Commit the write
try {
    committer.commit();
} catch (IOException e) {
    // Recovery scenario: create new committer and retry
    RecoverableWriter writer = fs.createRecoverableWriter();
    RecoverableFsDataOutputStream.Committer recoveredCommitter =
            writer.recoverForCommit(commitState);
    recoveredCommitter.commitAfterRecovery();
}
```

## State Management

### GSCommitRecoverable

Represents the state needed to commit a recoverable write operation.

```java { .api }
/**
 * Represents committable state for a recoverable output stream
 * Package-private - managed internally by the writer system
 */
class GSCommitRecoverable implements RecoverableWriter.CommitRecoverable {
    /** The target blob identifier for the final committed file */
    public final GSBlobIdentifier finalBlobIdentifier;

    /** List of temporary object UUIDs that need to be composed into final blob */
    public final List<UUID> componentObjectIds;

    /**
     * Package-private constructor
     * @param finalBlobIdentifier The final blob identifier
     * @param componentObjectIds List of component object UUIDs
     */
    GSCommitRecoverable(GSBlobIdentifier finalBlobIdentifier, List<UUID> componentObjectIds);

    /**
     * Get component blob identifiers for composition
     * @param options File system options for temporary bucket resolution
     * @return List of GSBlobIdentifier for temporary objects
     */
    List<GSBlobIdentifier> getComponentBlobIds(GSFileSystemOptions options);
}
```

### GSResumeRecoverable

Represents the state needed to resume an interrupted write operation.

```java { .api }
/**
 * Represents resumable state for a recoverable output stream
 * Extends GSCommitRecoverable with additional resume information
 * Package-private - managed internally by the writer system
 */
class GSResumeRecoverable extends GSCommitRecoverable
        implements RecoverableWriter.ResumeRecoverable {

    /** Current write position in bytes */
    public final long position;

    /** Whether the stream is closed for writing */
    public final boolean closed;

    /**
     * Package-private constructor
     * @param finalBlobIdentifier The final blob identifier
     * @param componentObjectIds List of component object UUIDs
     * @param position The current write position
     * @param closed Whether the stream is closed
     */
    GSResumeRecoverable(GSBlobIdentifier finalBlobIdentifier, List<UUID> componentObjectIds, long position, boolean closed);
}
```

## Serializers

### GSCommitRecoverableSerializer

Serializer for GSCommitRecoverable objects enabling state persistence across failures.

```java { .api }
/**
 * Serializer for GSCommitRecoverable objects
 * Package-private - used internally for state persistence
 */
class GSCommitRecoverableSerializer implements SimpleVersionedSerializer<GSCommitRecoverable> {
    /** Singleton instance */
    public static final GSCommitRecoverableSerializer INSTANCE;

    /**
     * Get serializer version
     * @return Current serializer version
     */
    public int getVersion();

    /**
     * Serialize commit recoverable to byte array
     * @param obj The GSCommitRecoverable to serialize
     * @return Serialized byte array
     * @throws IOException if serialization fails
     */
    public byte[] serialize(GSCommitRecoverable obj) throws IOException;

    /**
     * Deserialize commit recoverable from byte array
     * @param version Serializer version used for serialization
     * @param serialized The serialized byte array
     * @return Deserialized GSCommitRecoverable
     * @throws IOException if deserialization fails
     */
    public GSCommitRecoverable deserialize(int version, byte[] serialized) throws IOException;
}
```

### GSResumeRecoverableSerializer

Serializer for GSResumeRecoverable objects enabling resume state persistence.

```java { .api }
/**
 * Serializer for GSResumeRecoverable objects
 * Package-private - used internally for state persistence
 */
class GSResumeRecoverableSerializer implements SimpleVersionedSerializer<GSResumeRecoverable> {
    /** Singleton instance */
    public static final GSResumeRecoverableSerializer INSTANCE;

    /**
     * Get serializer version
     * @return Current serializer version
     */
    public int getVersion();

    /**
     * Serialize resume recoverable to byte array
     * @param obj The GSResumeRecoverable to serialize
     * @return Serialized byte array
     * @throws IOException if serialization fails
     */
    public byte[] serialize(GSResumeRecoverable obj) throws IOException;

    /**
     * Deserialize resume recoverable from byte array
     * @param version Serializer version used for serialization
     * @param serialized The serialized byte array
     * @return Deserialized GSResumeRecoverable
     * @throws IOException if deserialization fails
     */
    public GSResumeRecoverable deserialize(int version, byte[] serialized) throws IOException;
}
```
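Both serializers follow the versioned pattern of `SimpleVersionedSerializer`: `serialize` produces bytes, and `deserialize` is told which version wrote them. The sketch below illustrates that pattern with JDK classes only. `ResumeState`, `ResumeStateSerializer`, the field order, and the byte layout are assumptions made for illustration; they are not the plugin's classes or its actual wire format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Hypothetical stand-in for the resume state; not the plugin's class. */
final class ResumeState {
    final String bucket;
    final String object;
    final List<UUID> componentIds;
    final long position;
    final boolean closed;

    ResumeState(String bucket, String object, List<UUID> componentIds, long position, boolean closed) {
        this.bucket = bucket;
        this.object = object;
        this.componentIds = componentIds;
        this.position = position;
        this.closed = closed;
    }
}

/** Illustrative versioned serializer; the byte layout is an assumption. */
final class ResumeStateSerializer {
    static final int VERSION = 1;

    static byte[] serialize(ResumeState state) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(state.bucket);
            out.writeUTF(state.object);
            out.writeInt(state.componentIds.size());
            for (UUID id : state.componentIds) {
                out.writeLong(id.getMostSignificantBits());
                out.writeLong(id.getLeastSignificantBits());
            }
            out.writeLong(state.position);
            out.writeBoolean(state.closed);
        }
        return bytes.toByteArray();
    }

    static ResumeState deserialize(int version, byte[] serialized) throws IOException {
        // Reject data written with a layout this code does not know about.
        if (version != VERSION) {
            throw new IOException("Unsupported serializer version: " + version);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            String bucket = in.readUTF();
            String object = in.readUTF();
            int count = in.readInt();
            List<UUID> ids = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                long most = in.readLong();
                long least = in.readLong();
                ids.add(new UUID(most, least));
            }
            long position = in.readLong();
            boolean closed = in.readBoolean();
            return new ResumeState(bucket, object, ids, position, closed);
        }
    }
}
```

Passing the version to `deserialize` lets a newer reader branch on older layouts, so state written before an upgrade can still be restored.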

## Write Process Flow

### Normal Write Flow

1. **Open Stream**: `writer.open(path)` creates a `GSRecoverableFsDataOutputStream`
2. **Write Data**: Repeated `write()` calls buffer data into temporary objects
3. **Periodic Persistence**: `persist()` returns a `GSResumeRecoverable` that captures the stream state for checkpointing
4. **Close for Commit**: `closeForCommit()` returns a `GSRecoverableWriterCommitter`
5. **Commit**: `commit()` composes the temporary objects into the final blob atomically

### Recovery Flow

1. **Resume from Checkpoint**: `writer.recover(resumeRecoverable)` recreates the stream from the persisted state
2. **Continue Writing**: Additional `write()` calls continue from the persisted position
3. **Close and Commit**: Same as the normal flow
4. **Commit Recovery**: `writer.recoverForCommit(commitRecoverable)` returns a committer that retries a failed commit through `commitAfterRecovery()`
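The two flows above can be modelled in memory to show why data written after the last `persist()` does not reach the final file once a stream is recovered. This is a simplified sketch, not the plugin's implementation: `WriteFlowModel`, its nested `Stream` and `State` classes, and the two maps standing in for buckets are all hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** In-memory model of the write flow; all names here are hypothetical. */
final class WriteFlowModel {
    /** Stand-in for the temporary bucket: component id -> bytes. */
    final Map<UUID, byte[]> tempObjects = new HashMap<>();
    /** Stand-in for the final bucket: object name -> bytes. */
    final Map<String, byte[]> finalObjects = new HashMap<>();

    /** Snapshot taken by persist(): the components stored so far. */
    static final class State {
        final String target;
        final List<UUID> componentIds;

        State(String target, List<UUID> componentIds) {
            this.target = target;
            this.componentIds = List.copyOf(componentIds);
        }
    }

    final class Stream {
        private final String target;
        private final List<UUID> componentIds;
        private ByteArrayOutputStream buffer = new ByteArrayOutputStream();

        Stream(String target, List<UUID> componentIds) {
            this.target = target;
            this.componentIds = new ArrayList<>(componentIds);
        }

        void write(String data) {
            byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
            buffer.write(bytes, 0, bytes.length);
        }

        /** Stores buffered data as a new component and returns a snapshot. */
        State persist() {
            if (buffer.size() > 0) {
                UUID id = UUID.randomUUID();
                tempObjects.put(id, buffer.toByteArray());
                componentIds.add(id);
                buffer = new ByteArrayOutputStream();
            }
            return new State(target, componentIds);
        }

        /** Persists remaining data; the returned state is what commit() needs. */
        State closeForCommit() {
            return persist();
        }
    }

    Stream open(String target) {
        return new Stream(target, List.of());
    }

    Stream recover(State state) {
        return new Stream(state.target, state.componentIds);
    }

    /** Concatenates the components, in order, into the final object. */
    void commit(State state) {
        ByteArrayOutputStream composed = new ByteArrayOutputStream();
        for (UUID id : state.componentIds) {
            byte[] part = tempObjects.get(id);
            composed.write(part, 0, part.length);
        }
        finalObjects.put(state.target, composed.toByteArray());
    }

    public static void main(String[] args) {
        WriteFlowModel fs = new WriteFlowModel();
        Stream stream = fs.open("output/part-1");
        stream.write("Hello World");
        State checkpoint = stream.persist();
        stream.write(" lost on failure");           // buffered, never persisted

        Stream recovered = fs.recover(checkpoint);   // resume from the checkpoint
        recovered.write(" More data");
        fs.commit(recovered.closeForCommit());

        // prints "Hello World More data"
        System.out.println(new String(fs.finalObjects.get("output/part-1"), StandardCharsets.UTF_8));
    }
}
```

The snapshot carries only component identifiers, so anything still in the buffer at failure time is outside the recoverable state by construction.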

### Temporary Object Management

- **Naming**: Temporary objects use the `.inprogress/<bucket>/<object>/<uuid>` pattern
- **Composition**: The final commit uses the GCS compose operation, which merges up to 32 objects per request
- **Cleanup**: Temporary objects are deleted after a successful commit
- **Entropy Injection**: An optional entropy prefix reduces hotspotting in high-throughput scenarios
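As a small illustration of the naming pattern listed above, the following sketch builds a temporary object name from the final bucket, the final object name, and a component UUID. `TemporaryObjectNames` and `temporaryObjectName` are hypothetical helpers, and entropy injection is not modelled here.

```java
import java.util.UUID;

/** Hypothetical helper that mirrors the documented naming pattern. */
final class TemporaryObjectNames {
    static final String PREFIX = ".inprogress";

    /** Builds ".inprogress/<bucket>/<object>/<uuid>" for one component. */
    static String temporaryObjectName(String finalBucket, String finalObject, UUID componentId) {
        return String.join("/", PREFIX, finalBucket, finalObject, componentId.toString());
    }

    public static void main(String[] args) {
        UUID id = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");
        // prints ".inprogress/my-bucket/output/part-1/123e4567-e89b-12d3-a456-426614174000"
        System.out.println(temporaryObjectName("my-bucket", "output/part-1", id));
    }
}
```

Because the final bucket and object name are part of the temporary name, components belonging to different target files cannot collide even when they share one temporary bucket.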

## Error Handling and Recovery

### Failure Scenarios

- **Writer Failure**: Resume from `GSResumeRecoverable` state
- **Commit Failure**: Retry commit using `GSCommitRecoverable` state
- **Network Issues**: Configurable retry policies handle transient failures
- **Storage Errors**: Exceptions are propagated with context information

### Safety Guarantees

- **Exactly-Once**: Each successful commit produces exactly one final file
- **No Data Loss**: All written data is recoverable until commit completion
- **Atomic Commits**: Final file appears atomically or not at all
- **Idempotent Recovery**: Multiple recovery attempts produce same result
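One way to picture the atomic-commit and idempotent-recovery guarantees is the in-memory model below. It is a sketch under stated assumptions rather than a description of how the plugin achieves them: `IdempotentCommitModel` and its maps are hypothetical, and the check for an already-present final object is an assumed strategy.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of an idempotent commit; names and strategy are hypothetical. */
final class IdempotentCommitModel {
    final Map<String, String> tempObjects = new HashMap<>();
    final Map<String, String> finalObjects = new HashMap<>();

    /** Composes the components into the target, then deletes the components. */
    void commit(String target, List<String> componentNames) {
        StringBuilder composed = new StringBuilder();
        for (String name : componentNames) {
            String part = tempObjects.get(name);
            if (part == null) {
                throw new IllegalStateException("Missing component: " + name);
            }
            composed.append(part);
        }
        // A single put makes the final object visible all at once.
        finalObjects.put(target, composed.toString());
        componentNames.forEach(tempObjects::remove);
    }

    /** Safe to call repeatedly: an already committed target is left untouched. */
    void commitAfterRecovery(String target, List<String> componentNames) {
        if (finalObjects.containsKey(target)) {
            // The earlier commit succeeded; only finish any interrupted cleanup.
            componentNames.forEach(tempObjects::remove);
            return;
        }
        commit(target, componentNames);
    }
}
```

Calling `commitAfterRecovery` twice with the same arguments leaves the same final object in place, which is the property the list above calls idempotent recovery.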

### Performance Considerations

- **Chunk Size**: Set `gs.writer.chunk.size` to tune upload performance
- **Temporary Bucket**: Use a separate bucket for temporary objects to avoid hotspots
- **Entropy Injection**: Enable `gs.filesink.entropy.enabled` for high-throughput scenarios
- **Composition Limits**: The GCS limit of 32 objects per compose operation is handled automatically
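To make the composition limit concrete, the sketch below plans compose requests for more than 32 components by carrying an intermediate result forward as the first source of each later request. `ComposePlanner`, `plan`, and the `intermediate` object name are hypothetical; this shows one common batching approach, not necessarily the plugin's exact strategy.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical planner for compose requests under the 32-source limit. */
final class ComposePlanner {
    static final int COMPOSE_MAX_SOURCES = 32;

    /**
     * Splits component names into compose requests of at most 32 sources each.
     * After the first request, the intermediate result is the first source.
     */
    static List<List<String>> plan(List<String> components, String intermediateName) {
        List<List<String>> requests = new ArrayList<>();
        int next = 0;
        while (next < components.size()) {
            List<String> sources = new ArrayList<>();
            if (!requests.isEmpty()) {
                sources.add(intermediateName); // carry forward what was composed so far
            }
            while (sources.size() < COMPOSE_MAX_SOURCES && next < components.size()) {
                sources.add(components.get(next++));
            }
            requests.add(sources);
        }
        return requests;
    }

    public static void main(String[] args) {
        List<String> components = new ArrayList<>();
        for (int i = 0; i < 70; i++) {
            components.add("part-" + i);
        }
        // Prints three lines: 32 sources, then 32 (intermediate + 31), then 8 (intermediate + 7).
        for (List<String> request : plan(components, "intermediate")) {
            System.out.println(request.size() + " sources, first = " + request.get(0));
        }
    }
}
```

With this scheme every component is composed exactly once and in its original order, at the cost of one extra request for roughly every 31 additional components.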