or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

bucket-assignment.md configuration.md file-writers.md index.md rolling-policies.md

docs/file-writers.md

0

# File Writers

1

2

File writers handle the actual writing of data to files, supporting both row-wise and bulk writing patterns. They provide abstractions for file recovery, commit operations, and different serialization strategies.

3

4

## Capabilities

5

6

### BucketWriter Interface

7

8

Factory interface for creating different types of file writers.

9

10

```java { .api }

11

/**

12

* Interface for factories that create different InProgressFileWriter writers

13

* @param <IN> The type of input elements

14

* @param <BucketID> The type of bucket identifier

15

*/

16

public interface BucketWriter<IN, BucketID> {

17

/**

18

* Creates a new InProgressFileWriter

19

* @param bucketID the id of the bucket this writer is writing to

20

* @param path the path this writer will write to

21

* @param creationTime the creation time of the file

22

* @return the new InProgressFileWriter

23

* @throws IOException if creating a writer fails

24

*/

25

InProgressFileWriter<IN, BucketID> openNewInProgressFile(

26

BucketID bucketID, Path path, long creationTime) throws IOException;

27

28

/**

29

* Creates a new CompactingFileWriter of the requested type

30

* @param type the type of writer (RECORD_WISE or OUTPUT_STREAM)

31

* @param bucketID the id of the bucket this writer is writing to

32

* @param path the path this writer will write to

33

* @param creationTime the creation time of the file

34

* @return the new CompactingFileWriter

35

* @throws IOException if creating a writer fails

36

* @throws UnsupportedOperationException if the type is not supported

37

*/

38

default CompactingFileWriter openNewCompactingFile(

39

CompactingFileWriter.Type type,

40

BucketID bucketID,

41

Path path,

42

long creationTime) throws IOException;

43

44

/**

45

* Resumes an InProgressFileWriter from a recoverable state

46

* @param bucketID the id of the bucket this writer is writing to

47

* @param inProgressFileSnapshot the state of the part file

48

* @param creationTime the creation time of the file

49

* @return the resumed InProgressFileWriter

50

* @throws IOException if resuming a writer fails

51

*/

52

InProgressFileWriter<IN, BucketID> resumeInProgressFileFrom(

53

BucketID bucketID,

54

InProgressFileWriter.InProgressFileRecoverable inProgressFileSnapshot,

55

long creationTime) throws IOException;

56

57

/**

58

* @return the properties of the BucketWriter

59

*/

60

WriterProperties getProperties();

61

62

/**

63

* Recovers a pending file for finalizing and committing

64

* @param pendingFileRecoverable The handle with the recovery information

65

* @return A pending file

66

* @throws IOException if recovering a pending file fails

67

*/

68

PendingFile recoverPendingFile(

69

InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable) throws IOException;

70

71

/**

72

* Cleans up resources for a recoverable state

73

* @param inProgressFileRecoverable the recoverable state to clean up

74

* @return true if the resources were successfully freed, false otherwise

75

* @throws IOException if an I/O error occurs

76

*/

77

boolean cleanupInProgressFileRecoverable(

78

InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable) throws IOException;

79

}

80

```

81

82

### BucketWriter.PendingFile Interface

83

84

Represents a file that is ready to be committed.

85

86

```java { .api }

87

/**

88

* Represents a file that can no longer be written to but can be committed

89

*/

90

public interface PendingFile {

91

/**

92

* Commits the pending file, making it visible

93

* The file will contain the exact data as when the pending file was created

94

* @throws IOException if committing fails

95

*/

96

void commit() throws IOException;

97

98

/**

99

* Commits the pending file, making it visible

100

* This method tolerates situations where the file was already committed

101

* Important for idempotent commit retries after recovery

102

* @throws IOException if committing fails

103

*/

104

void commitAfterRecovery() throws IOException;

105

}

106

```

107

108

### InProgressFileWriter Interface

109

110

Interface for writing elements to part files with recovery support.

111

112

```java { .api }

113

/**

114

* The Bucket uses the InProgressFileWriter to write elements to a part file

115

* @param <IN> The type of input elements

116

* @param <BucketID> The type of bucket identifier

117

*/

118

public interface InProgressFileWriter<IN, BucketID>

119

extends PartFileInfo<BucketID>, RecordWiseCompactingFileWriter<IN> {

120

121

/**

122

* Write an element to the part file

123

* @param element the element to be written

124

* @param currentTime the writing time

125

* @throws IOException if writing the element fails

126

*/

127

void write(IN element, long currentTime) throws IOException;

128

129

/**

130

* @return The state of the current part file for recovery

131

* @throws IOException if persisting the part file fails

132

*/

133

InProgressFileRecoverable persist() throws IOException;

134

135

/**

136

* @return The state of the pending part file for commit

137

* @throws IOException if an I/O error occurs

138

*/

139

PendingFileRecoverable closeForCommit() throws IOException;

140

141

/** Dispose the part file and clean up resources */

142

void dispose();

143

144

/**

145

* Default write method using current system time

146

* @param element the element to write

147

*/

148

default void write(IN element) throws IOException;

149

}

150

```

151

152

### InProgressFileWriter Recovery Interfaces

153

154

Support for fault-tolerant recovery of in-progress files.

155

156

```java { .api }

157

/**

158

* Handle that can be used to recover in-progress files

159

*/

160

public interface InProgressFileRecoverable extends PendingFileRecoverable {}

161

162

/**

163

* Handle that can be used to recover pending files

164

*/

165

public interface PendingFileRecoverable {

166

/**

167

* @return The target path of the pending file, null if unavailable

168

*/

169

Path getPath();

170

171

/**

172

* @return The size of the pending file, -1 if unavailable

173

*/

174

long getSize();

175

}

176

```

177

178

### CompactingFileWriter Interface

179

180

Base interface for compacting file writers.

181

182

```java { .api }

183

/**

184

* File sink compactors use CompactingFileWriter to write a compacting file

185

* Classes should implement RecordWiseCompactingFileWriter or OutputStreamBasedCompactingFileWriter

186

*/

187

public interface CompactingFileWriter {

188

/**

189

* Closes the writer and gets the PendingFileRecoverable of the written compacting file

190

* @return The state of the pending part file for commit

191

* @throws IOException if an I/O error occurs

192

*/

193

PendingFileRecoverable closeForCommit() throws IOException;

194

195

/** Enum defining the types of CompactingFileWriter */

196

enum Type {

197

RECORD_WISE,

198

OUTPUT_STREAM

199

}

200

}

201

```

202

203

### RecordWiseCompactingFileWriter Interface

204

205

Interface for record-wise compacting file writers.

206

207

```java { .api }

208

/**

209

* Compactors use RecordWiseCompactingFileWriter to write elements to a compacting file

210

* @param <IN> The type of input elements

211

*/

212

public interface RecordWiseCompactingFileWriter<IN> extends CompactingFileWriter {

213

/**

214

* Write an element to the compacting file

215

* @param element the element to be written

216

* @throws IOException if writing the element fails

217

*/

218

void write(IN element) throws IOException;

219

}

220

```

221

222

### OutputStreamBasedCompactingFileWriter Interface

223

224

Interface for output stream based compacting file writers.

225

226

```java { .api }

227

/**

228

* Compactors use OutputStreamBasedCompactingFileWriter to directly write

229

* a compacting file as an OutputStream

230

*/

231

public interface OutputStreamBasedCompactingFileWriter extends CompactingFileWriter {

232

/**

233

* Gets the output stream underlying the writer

234

* The close method of the returned stream should never be called

235

* @return The output stream to write the compacting file

236

* @throws IOException if acquiring the stream fails

237

*/

238

OutputStream asOutputStream() throws IOException;

239

}

240

```

241

242

## Concrete Implementations

243

244

### RowWiseBucketWriter

245

246

Factory for creating row-wise part writers using encoders.

247

248

```java { .api }

249

/**

250

* Factory that creates RowWisePartWriter instances

251

* @param <IN> The type of input elements

252

* @param <BucketID> The type of bucket identifier

253

*/

254

public class RowWiseBucketWriter<IN, BucketID>

255

extends OutputStreamBasedPartFileWriter.OutputStreamBasedBucketWriter<IN, BucketID> {

256

257

/**

258

* Creates a RowWiseBucketWriter

259

* @param recoverableWriter the recoverable writer for the file system

260

* @param encoder the encoder for serializing elements

261

*/

262

public RowWiseBucketWriter(RecoverableWriter recoverableWriter, Encoder<IN> encoder);

263

264

@Override

265

public InProgressFileWriter<IN, BucketID> resumeFrom(

266

BucketID bucketId,

267

RecoverableFsDataOutputStream stream,

268

Path path,

269

RecoverableWriter.ResumeRecoverable resumable,

270

long creationTime);

271

272

@Override

273

public InProgressFileWriter<IN, BucketID> openNew(

274

BucketID bucketId,

275

RecoverableFsDataOutputStream stream,

276

Path path,

277

long creationTime);

278

}

279

```

280

281

### BulkBucketWriter

282

283

Factory for creating bulk part writers using bulk writers.

284

285

```java { .api }

286

/**

287

* Factory that creates BulkPartWriter instances

288

* @param <IN> The type of input elements

289

* @param <BucketID> The type of bucket identifier

290

*/

291

public class BulkBucketWriter<IN, BucketID>

292

extends OutputStreamBasedPartFileWriter.OutputStreamBasedBucketWriter<IN, BucketID> {

293

294

/**

295

* Creates a BulkBucketWriter

296

* @param recoverableWriter the recoverable writer for the file system

297

* @param writerFactory the factory for creating bulk writers

298

*/

299

public BulkBucketWriter(RecoverableWriter recoverableWriter, BulkWriter.Factory<IN> writerFactory) throws IOException;

300

301

@Override

302

public InProgressFileWriter<IN, BucketID> resumeFrom(

303

BucketID bucketId,

304

RecoverableFsDataOutputStream stream,

305

Path path,

306

RecoverableWriter.ResumeRecoverable resumable,

307

long creationTime) throws IOException;

308

309

@Override

310

public InProgressFileWriter<IN, BucketID> openNew(

311

BucketID bucketId,

312

RecoverableFsDataOutputStream stream,

313

Path path,

314

long creationTime) throws IOException;

315

}

316

```

317

318

## Usage Examples

319

320

### Row-wise Writing with Encoder

321

322

```java

323

import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;

324

import org.apache.flink.api.common.serialization.SimpleStringEncoder;

325

import org.apache.flink.core.fs.RecoverableWriter;

326

327

// Create encoder for string data

328

Encoder<String> encoder = new SimpleStringEncoder<>("UTF-8");

329

330

// Create row-wise bucket writer

331

BucketWriter<String, String> bucketWriter =

332

new RowWiseBucketWriter<>(recoverableWriter, encoder);

333

334

// Open new file writer

335

InProgressFileWriter<String, String> writer = bucketWriter.openNewInProgressFile(

336

"bucket-1",

337

new Path("/output/bucket-1/part-0"),

338

System.currentTimeMillis()

339

);

340

341

// Write elements

342

writer.write("Hello", System.currentTimeMillis());

343

writer.write("World", System.currentTimeMillis());

344

345

// Close for commit

346

PendingFileRecoverable pendingFile = writer.closeForCommit();

347

```

348

349

### Bulk Writing

350

351

```java

352

import org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter;

353

import org.apache.flink.api.common.serialization.BulkWriter;

354

355

// Create bulk writer factory (example for Parquet)

356

BulkWriter.Factory<MyRecord> bulkWriterFactory = // ... implementation specific

357

358

// Create bulk bucket writer

359

BucketWriter<MyRecord, String> bucketWriter =

360

new BulkBucketWriter<>(recoverableWriter, bulkWriterFactory);

361

362

// Usage similar to row-wise, but optimized for bulk operations

363

InProgressFileWriter<MyRecord, String> writer = bucketWriter.openNewInProgressFile(

364

"bucket-1",

365

new Path("/output/bucket-1/part-0"),

366

System.currentTimeMillis()

367

);

368

```

369

370

## Error Handling

371

372

- Writers should handle `IOException` during write operations

373

- Failed writes will typically cause job failures and require restart

374

- Dispose methods should clean up resources without throwing exceptions

375

- Recovery operations may fail if the underlying file system state is corrupted

376

- Commit operations should be idempotent for exactly-once processing guarantees