# Fault-Tolerant Writing

The Hadoop FileSystem package provides recoverable writers that enable exactly-once processing guarantees through persistent state management and checkpoint/recovery mechanisms. These writers are essential for fault-tolerant streaming applications that require durability and consistency.

## Capabilities

### HadoopRecoverableWriter

Main recoverable writer implementation that provides fault-tolerant writing capabilities for Hadoop file systems.

```java { .api }
/**
 * An implementation of the RecoverableWriter for Hadoop's file system abstraction.
 * Supports fault-tolerant writing with exactly-once processing guarantees.
 */
@Internal
public class HadoopRecoverableWriter implements RecoverableWriter {

    /**
     * Creates a recoverable writer using the specified Hadoop FileSystem.
     * @param fs Hadoop file system to write to
     * @throws IOException if writer creation fails due to an unsupported file system
     */
    public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) throws IOException;

    /**
     * Creates a recoverable writer with the no-local-write option.
     * @param fs Hadoop file system to write to
     * @param noLocalWrite if true, disables local write optimizations
     * @throws IOException if writer creation fails
     */
    public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs, boolean noLocalWrite) throws IOException;
}
```

### Opening and Creating Streams

Methods for creating new recoverable output streams.

```java { .api }
/**
 * Opens a new recoverable output stream for the given file path.
 * @param filePath target file path for writing
 * @return RecoverableFsDataOutputStream for writing data
 * @throws IOException if stream creation fails
 */
public RecoverableFsDataOutputStream open(Path filePath) throws IOException;

/**
 * Indicates whether this writer supports resuming from a recoverable state.
 * @return true (the Hadoop recoverable writer supports resume)
 */
public boolean supportsResume();
```

**Usage Examples:**

```java
import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
import org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;

// Create a recoverable writer from a Hadoop FileSystem
org.apache.hadoop.fs.FileSystem hadoopFs = ...; // obtain a configured Hadoop FS
HadoopRecoverableWriter writer = new HadoopRecoverableWriter(hadoopFs);

// Open a recoverable stream for writing
Path outputPath = new Path("hdfs://namenode:9000/output/part-1.txt");
RecoverableFsDataOutputStream stream = writer.open(outputPath);

// Write data
stream.write("First batch of data\n".getBytes());
stream.write("Second batch of data\n".getBytes());

// Persist the current state for recovery
HadoopFsRecoverable recoverable = (HadoopFsRecoverable) stream.persist();

// Continue writing
stream.write("Third batch of data\n".getBytes());

// Close and commit
stream.closeForCommit().commit();
```

### Recovery Operations

Methods for recovering from persisted state after failures.

```java { .api }
/**
 * Recovers a stream from a resumable recoverable state.
 * @param recoverable the resumable state to recover from
 * @return RecoverableFsDataOutputStream positioned at the recovered state
 * @throws IOException if recovery fails
 */
public RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException;

/**
 * Recovers a committer from a commit recoverable state.
 * @param recoverable the commit state to recover from
 * @return Committer that can complete the commit operation
 * @throws IOException if recovery fails
 */
public Committer recoverForCommit(CommitRecoverable recoverable) throws IOException;

/**
 * Indicates whether this writer requires cleanup of recoverable state.
 * @return false (the Hadoop writer does not require cleanup)
 */
public boolean requiresCleanupOfRecoverableState();

/**
 * Cleans up recoverable state (a no-op for the Hadoop writer).
 * @param resumable the resumable state to clean up
 * @return false (no cleanup performed)
 * @throws IOException if cleanup fails
 */
public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException;
```

**Usage Examples:**

```java
// Recover from a previous session
HadoopFsRecoverable savedState = ...; // load from checkpoint
RecoverableFsDataOutputStream recoveredStream = writer.recover(savedState);

// Continue writing from where we left off
recoveredStream.write("Continuing after recovery\n".getBytes());

// Or recover for commit only
Committer committer = writer.recoverForCommit(commitRecoverable);
committer.commit(); // complete the commit operation
```

### Serialization Support

Methods for serializing and deserializing recoverable state.

```java { .api }
/**
 * Gets the serializer for commit recoverable state.
 * @return SimpleVersionedSerializer for CommitRecoverable objects
 */
public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer();

/**
 * Gets the serializer for resume recoverable state.
 * @return SimpleVersionedSerializer for ResumeRecoverable objects
 */
public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer();
```

### HadoopFsRecoverable

State object that contains all information needed to recover or commit a write operation.

```java { .api }
/**
 * Implementation of the resume and commit descriptor objects for Hadoop's file system abstraction.
 * Contains the state needed to recover a write operation.
 */
@Internal
public class HadoopFsRecoverable implements CommitRecoverable, ResumeRecoverable {

    /**
     * Creates a recoverable state descriptor.
     * @param targetFile final target file path
     * @param tempFile temporary file being written to
     * @param offset current write position
     */
    public HadoopFsRecoverable(Path targetFile, Path tempFile, long offset);

    /**
     * Gets the target file path.
     * @return target file path where data will be committed
     */
    public Path targetFile();

    /**
     * Gets the temporary file path.
     * @return temporary file path where data is being written
     */
    public Path tempFile();

    /**
     * Gets the current write offset.
     * @return byte offset in the file
     */
    public long offset();

    /**
     * String representation of the recoverable state.
     * @return string describing the recoverable state
     */
    public String toString();
}
```

**Usage Examples:**

```java
// Access recoverable state information
HadoopFsRecoverable recoverable = (HadoopFsRecoverable) stream.persist();

System.out.println("Target file: " + recoverable.targetFile());
System.out.println("Temp file: " + recoverable.tempFile());
System.out.println("Current offset: " + recoverable.offset());

// Serialize for checkpointing
SimpleVersionedSerializer<ResumeRecoverable> serializer = writer.getResumeRecoverableSerializer();
byte[] serializedState = serializer.serialize(recoverable);

// Later: deserialize and recover
HadoopFsRecoverable restored = (HadoopFsRecoverable) serializer.deserialize(
        serializer.getVersion(), serializedState);
RecoverableFsDataOutputStream recoveredStream = writer.recover(restored);
```

### Recoverable Output Stream Operations

The recoverable output stream provides standard writing operations plus persistence capabilities.

```java { .api }
/**
 * Base class for HDFS and ABFS recoverable streams.
 */
@Internal
public abstract class BaseHadoopFsRecoverableFsDataOutputStream
        extends CommitterFromPersistRecoverableFsDataOutputStream<HadoopFsRecoverable> {

    /**
     * Gets the current position in the stream.
     * @return current byte position
     * @throws IOException if the operation fails
     */
    public long getPos() throws IOException;

    /**
     * Writes a single byte.
     * @param b byte to write
     * @throws IOException if the write fails
     */
    public void write(int b) throws IOException;

    /**
     * Writes data from a byte array.
     * @param b byte array containing data
     * @param off starting offset in the array
     * @param len number of bytes to write
     * @throws IOException if the write fails
     */
    public void write(byte[] b, int off, int len) throws IOException;

    /**
     * Flushes buffered data.
     * @throws IOException if the flush fails
     */
    public void flush() throws IOException;

    /**
     * Synchronizes data to storage.
     * @throws IOException if the sync fails
     */
    public void sync() throws IOException;

    /**
     * Persists the current state for recovery.
     * @return HadoopFsRecoverable representing the current state
     * @throws IOException if the persist operation fails
     */
    public HadoopFsRecoverable persist() throws IOException;

    /**
     * Closes the stream and returns a committer.
     * @return Committer that can complete the write operation
     * @throws IOException if the close fails
     */
    public Committer closeForCommit() throws IOException;

    /**
     * Closes the stream without committing.
     * @throws IOException if the close fails
     */
    public void close() throws IOException;
}
```
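Recovery from a persisted state amounts to discarding any bytes written after the offset captured by `persist()` and resuming from there. The idea can be sketched with plain `java.nio` on a local file; the file names and the simulated crash below are illustrative, not Flink code:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class TruncateRecoverySketch {
    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("recover-demo", ".tmp");

        // Write two batches, then "persist" by remembering the durable offset
        Files.write(file, "first\nsecond\n".getBytes());
        long persistedOffset = Files.size(file);

        // A crash mid-write leaves uncommitted bytes past the persisted offset
        Files.write(file, "half-writ".getBytes(), StandardOpenOption.APPEND);

        // Recovery: truncate back to the last persisted offset and resume writing
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.WRITE)) {
            ch.truncate(persistedOffset);
        }
        Files.write(file, "third\n".getBytes(), StandardOpenOption.APPEND);

        // The partially written batch is gone; only persisted + resumed data remains
        System.out.print(new String(Files.readAllBytes(file)));
    }
}
```

This is why only file systems with truncate/append support can offer resumable recovery.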

### Complete Fault-Tolerant Writing Example

```java
import java.io.IOException;
import java.util.List;

import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
import org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter;

public class FaultTolerantWriter {

    private HadoopRecoverableWriter writer;
    private SimpleVersionedSerializer<ResumeRecoverable> serializer;

    public void initializeWriter(org.apache.hadoop.fs.FileSystem hadoopFs) throws IOException {
        writer = new HadoopRecoverableWriter(hadoopFs);
        serializer = writer.getResumeRecoverableSerializer();
    }

    public byte[] writeWithCheckpoint(Path outputPath, List<String> data) throws IOException {
        RecoverableFsDataOutputStream stream = writer.open(outputPath);
        HadoopFsRecoverable checkpoint = null;

        try {
            // Write data in batches with periodic checkpoints
            for (int i = 0; i < data.size(); i++) {
                stream.write(data.get(i).getBytes());
                stream.write("\n".getBytes());

                // Checkpoint every 100 records
                if (i % 100 == 0) {
                    checkpoint = (HadoopFsRecoverable) stream.persist();
                    System.out.println("Checkpoint at record " + i
                            + ", offset: " + checkpoint.offset());
                }
            }

            // Commit the file
            stream.closeForCommit().commit();

            return checkpoint != null ? serializer.serialize(checkpoint) : null;

        } catch (IOException e) {
            // On failure, return checkpoint data for recovery
            if (checkpoint != null) {
                return serializer.serialize(checkpoint);
            }
            throw e;
        }
    }

    public void recoverAndContinue(byte[] checkpointData, List<String> remainingData) throws IOException {
        // Deserialize the checkpoint
        HadoopFsRecoverable recoverable = (HadoopFsRecoverable) serializer.deserialize(
                serializer.getVersion(), checkpointData);

        // Recover the stream
        RecoverableFsDataOutputStream stream = writer.recover(recoverable);

        System.out.println("Recovered at offset: " + recoverable.offset());
        System.out.println("Temp file: " + recoverable.tempFile());

        // Continue writing
        for (String line : remainingData) {
            stream.write(line.getBytes());
            stream.write("\n".getBytes());
        }

        // Commit
        stream.closeForCommit().commit();

        System.out.println("Recovery completed, data committed to: " + recoverable.targetFile());
    }
}
```

### HadoopRecoverableSerializer

Serializer for persisting and restoring recoverable state.

```java { .api }
/**
 * Simple serializer for HadoopFsRecoverable objects.
 */
@Internal
public class HadoopRecoverableSerializer implements SimpleVersionedSerializer<HadoopFsRecoverable> {

    /**
     * Singleton instance of the serializer.
     */
    public static final HadoopRecoverableSerializer INSTANCE;

    /**
     * Gets the version of the serialization format.
     * @return version number (1)
     */
    public int getVersion();

    /**
     * Serializes a HadoopFsRecoverable object.
     * @param obj the recoverable object to serialize
     * @return byte array containing the serialized data
     * @throws IOException if serialization fails
     */
    public byte[] serialize(HadoopFsRecoverable obj) throws IOException;

    /**
     * Deserializes a HadoopFsRecoverable object.
     * @param version version of the serialized data
     * @param serialized byte array containing the serialized data
     * @return deserialized HadoopFsRecoverable object
     * @throws IOException if deserialization fails
     */
    public HadoopFsRecoverable deserialize(int version, byte[] serialized) throws IOException;
}
```
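The version-plus-payload contract of `SimpleVersionedSerializer` can be illustrated with a self-contained JDK sketch; the `SimpleRecoverable` record and its field layout are hypothetical stand-ins and do not reproduce Flink's actual wire format:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a recoverable state descriptor (targetFile, tempFile, offset)
record SimpleRecoverable(String targetFile, String tempFile, long offset) {}

public class VersionedSerializerSketch {
    static final int VERSION = 1;

    // Encode the fields as length-prefixed strings plus a long offset
    static byte[] serialize(SimpleRecoverable r) {
        byte[] target = r.targetFile().getBytes(StandardCharsets.UTF_8);
        byte[] temp = r.tempFile().getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + target.length + 4 + temp.length + 8);
        buf.putInt(target.length).put(target);
        buf.putInt(temp.length).put(temp);
        buf.putLong(r.offset());
        return buf.array();
    }

    // Dispatch on the version recorded alongside the payload, as the contract requires
    static SimpleRecoverable deserialize(int version, byte[] data) throws IOException {
        if (version != VERSION) {
            throw new IOException("Unknown version: " + version);
        }
        ByteBuffer buf = ByteBuffer.wrap(data);
        byte[] target = new byte[buf.getInt()];
        buf.get(target);
        byte[] temp = new byte[buf.getInt()];
        buf.get(temp);
        long offset = buf.getLong();
        return new SimpleRecoverable(
                new String(target, StandardCharsets.UTF_8),
                new String(temp, StandardCharsets.UTF_8),
                offset);
    }

    public static void main(String[] args) throws IOException {
        SimpleRecoverable state = new SimpleRecoverable("/out/part-1", "/out/.part-1.inprogress", 4096L);
        byte[] bytes = serialize(state);
        SimpleRecoverable restored = deserialize(VERSION, bytes);
        System.out.println(restored.equals(state)); // prints "true": the round trip is lossless
    }
}
```

Storing the version separately from the payload lets a newer deserializer keep reading checkpoints written by an older format after an upgrade.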

## Supported File Systems

Recoverable writers support the subset of Hadoop file systems that provide the features necessary for fault tolerance:

- **HDFS**: full support, with atomic rename and append operations
- **S3**: limited support, depending on the S3 implementation (S3A with certain configurations)
- **Azure**: support for Azure Data Lake Storage (ABFS)
- **Local FS**: full support, for testing and development
407

408

## Error Handling

409

410

```java

411

try {

412

RecoverableFsDataOutputStream stream = writer.open(outputPath);

413

// ... write operations

414

} catch (UnsupportedOperationException e) {

415

System.err.println("File system doesn't support recoverable writes: " + e.getMessage());

416

} catch (IOException e) {

417

System.err.println("I/O error during recoverable write: " + e.getMessage());

418

}

419

```

## Types

```java { .api }
// Core recoverable writer interface
public interface RecoverableWriter {
    RecoverableFsDataOutputStream open(Path filePath) throws IOException;
    RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException;
    Committer recoverForCommit(CommitRecoverable recoverable) throws IOException;
    boolean supportsResume();
}

// Recoverable state interfaces
public interface ResumeRecoverable extends Serializable {}
public interface CommitRecoverable extends Serializable {}

// Committer interface
public interface Committer {
    void commit() throws IOException;
}

// Serialization interface
public interface SimpleVersionedSerializer<T> {
    int getVersion();
    byte[] serialize(T obj) throws IOException;
    T deserialize(int version, byte[] serialized) throws IOException;
}
```