or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

bulk-writers.md · file-io-operations.md · index.md · schema-registry-integration.md · serialization-deserialization.md · table-api-integration.md · type-system-integration.md

docs/bulk-writers.md

0

# Bulk Writers

1

2

Factory and writer classes for efficient bulk writing of Avro files with support for various record types, compression options, and streaming file operations. Designed for high-throughput scenarios with automatic file rolling and management.

3

4

## AvroWriters

5

6

Utility class providing static factory methods for creating Avro writer factories for different record types.

7

8

```java { .api }

9

public class AvroWriters {

10

// For specific records (generated from Avro schema)

11

public static <T extends SpecificRecordBase> AvroWriterFactory<T> forSpecificRecord(Class<T> type);

12

13

// For generic records with explicit schema

14

public static AvroWriterFactory<GenericRecord> forGenericRecord(Schema schema);

15

16

// For POJO records using reflection

17

public static <T> AvroWriterFactory<T> forReflectRecord(Class<T> type);

18

}

19

```

20

21

### Usage Examples

22

23

**Writing Specific Records:**

24

25

```java

26

import org.apache.flink.formats.avro.AvroWriters;

27

import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

28

29

// Create writer factory for specific record type

30

AvroWriterFactory<User> writerFactory = AvroWriters.forSpecificRecord(User.class);

31

32

// Create streaming file sink

33

StreamingFileSink<User> sink = StreamingFileSink

34

.forBulkFormat(new Path("output/path"), writerFactory)

35

.withRollingPolicy(DefaultRollingPolicy.builder()

36

.withRolloverInterval(Duration.ofMinutes(15))

37

.withInactivityInterval(Duration.ofMinutes(5))

38

.withMaxPartSize(MemorySize.ofMebiBytes(128))

39

.build())

40

.build();

41

42

// Use in streaming job

43

DataStream<User> userStream = ...;

44

userStream.addSink(sink);

45

```

46

47

**Writing Generic Records:**

48

49

```java

50

import org.apache.avro.Schema;

51

import org.apache.avro.generic.GenericRecord;

52

53

// Define schema

54

Schema schema = new Schema.Parser().parse(schemaString);

55

56

// Create writer factory for generic records

57

AvroWriterFactory<GenericRecord> genericWriterFactory = AvroWriters.forGenericRecord(schema);

58

59

// Create file sink

60

StreamingFileSink<GenericRecord> genericSink = StreamingFileSink

61

.forBulkFormat(new Path("output/generic"), genericWriterFactory)

62

.build();

63

64

// Use with generic records

65

DataStream<GenericRecord> recordStream = ...;

66

recordStream.addSink(genericSink);

67

```

68

69

**Writing Reflection-based Records:**

70

71

```java

72

// For POJOs without generated Avro classes

73

AvroWriterFactory<Person> reflectWriterFactory = AvroWriters.forReflectRecord(Person.class);

74

75

// Create sink for POJO records

76

StreamingFileSink<Person> pojoSink = StreamingFileSink

77

.forBulkFormat(new Path("output/pojos"), reflectWriterFactory)

78

.build();

79

```

80

81

## AvroWriterFactory

82

83

Factory class that implements Flink's `BulkWriter.Factory` interface; it produces a new Avro bulk writer for each part file the sink opens.

84

85

```java { .api }

86

public class AvroWriterFactory<T> implements BulkWriter.Factory<T> {

87

public AvroWriterFactory(AvroBuilder<T> builder);

88

89

// Factory interface methods

90

public BulkWriter<T> create(FSDataOutputStream out) throws IOException;

91

}

92

```

93

94

### Advanced Configuration

95

96

**Custom Builder Usage:**

97

98

```java

99

import org.apache.flink.formats.avro.AvroBuilder;

100

101

// Create custom builder with specific configuration

102

AvroBuilder<User> customBuilder = (outputStream) -> {

103

Schema schema = User.getClassSchema();

104

DatumWriter<User> datumWriter = new SpecificDatumWriter<>(schema);

105

DataFileWriter<User> dataFileWriter = new DataFileWriter<>(datumWriter);

106

107

// Configure compression

108

dataFileWriter.setCodec(CodecFactory.snappyCodec());

109

110

// Set metadata

111

dataFileWriter.setMeta("created_by", "flink-job");

112

dataFileWriter.setMeta("version", "1.0");

113

114

return dataFileWriter.create(schema, outputStream);

115

};

116

117

// Create factory with custom builder

118

AvroWriterFactory<User> customFactory = new AvroWriterFactory<>(customBuilder);

119

```

120

121

## AvroBuilder

122

123

Serializable functional interface for creating Avro `DataFileWriter` instances with custom configuration (codec, metadata, sync interval). Because instances are shipped to task managers, any lambda or class implementing it must be serializable.

124

125

```java { .api }

126

public interface AvroBuilder<T> extends Serializable {

127

DataFileWriter<T> createWriter(OutputStream outputStream) throws IOException;

128

}

129

```

130

131

### Custom Builder Implementation

132

133

```java

134

// Lambda implementation for specific records

135

AvroBuilder<User> specificBuilder = (out) -> {

136

String schemaString = SpecificData.get().getSchema(User.class).toString();

137

Schema schema = new Schema.Parser().parse(schemaString);

138

SpecificDatumWriter<User> datumWriter = new SpecificDatumWriter<>(schema);

139

DataFileWriter<User> dataFileWriter = new DataFileWriter<>(datumWriter);

140

return dataFileWriter.create(schema, out);

141

};

142

143

// Anonymous class implementation for generic records

144

AvroBuilder<GenericRecord> genericBuilder = new AvroBuilder<GenericRecord>() {

145

@Override

146

public DataFileWriter<GenericRecord> createWriter(OutputStream outputStream) throws IOException {

147

GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);

148

DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);

149

return dataFileWriter.create(schema, outputStream);

150

}

151

};

152

```

153

154

## AvroBulkWriter

155

156

The actual bulk writer implementation that handles writing records to Avro files.

157

158

```java { .api }

159

public class AvroBulkWriter<T> implements BulkWriter<T> {

160

public void addElement(T element) throws IOException;

161

public void flush() throws IOException;

162

public void finish() throws IOException;

163

}

164

```

165

166

### Writer Lifecycle

167

168

1. **Creation**: Writer is created by factory when new file is started

169

2. **Writing**: Elements are added via `addElement()`

170

3. **Flushing**: Periodic flush calls ensure data is written to disk

171

4. **Finishing**: Writer is finished when file rolling occurs

172

173

## File Rolling and Management

174

175

### Rolling Policies

176

177

**Size-based Rolling:**

178

```java

179

StreamingFileSink<User> sink = StreamingFileSink

180

.forBulkFormat(outputPath, writerFactory)

181

.withRollingPolicy(DefaultRollingPolicy.builder()

182

.withMaxPartSize(MemorySize.ofMebiBytes(256)) // Roll at 256MB

183

.build())

184

.build();

185

```

186

187

**Time-based Rolling:**

188

```java

189

StreamingFileSink<User> sink = StreamingFileSink

190

.forBulkFormat(outputPath, writerFactory)

191

.withRollingPolicy(DefaultRollingPolicy.builder()

192

.withRolloverInterval(Duration.ofMinutes(15)) // Roll every 15 minutes

193

.withInactivityInterval(Duration.ofMinutes(5)) // Roll after 5 minutes of inactivity

194

.build())

195

.build();

196

```

197

198

**Combined Rolling Policy:**

199

```java

200

RollingPolicy<User, String> policy = DefaultRollingPolicy.builder()

201

.withRolloverInterval(Duration.ofHours(1)) // Max 1 hour per file

202

.withInactivityInterval(Duration.ofMinutes(15)) // Roll after 15 min inactivity

203

.withMaxPartSize(MemorySize.of(512, MemoryUnit.MEGA_BYTES)) // Max 512MB per file

204

.build();

205

```

206

207

### Bucket Assignment

208

209

**Default Bucketing:**

210

```java

211

// Files organized by processing time

212

StreamingFileSink<User> sink = StreamingFileSink

213

.forBulkFormat(outputPath, writerFactory)

214

.build();

215

// Results in: output/2023-12-01--18/part-0-0.avro

216

```

217

218

**Custom Bucketing:**

219

```java

220

// Custom bucket assignment based on record fields

221

BucketAssigner<User, String> bucketAssigner = new BucketAssigner<User, String>() {

222

@Override

223

public String getBucketId(User user, Context context) {

224

return "department=" + user.getDepartment() + "/year=" + user.getCreatedYear();

225

}

226

227

@Override

228

public SimpleVersionedSerializer<String> getSerializer() {

229

return SimpleVersionedStringSerializer.INSTANCE;

230

}

231

};

232

233

StreamingFileSink<User> partitionedSink = StreamingFileSink

234

.forBulkFormat(outputPath, writerFactory)

235

.withBucketAssigner(bucketAssigner)

236

.build();

237

// Results in: output/department=engineering/year=2023/part-0-0.avro

238

```

239

240

## Performance Optimization

241

242

### Writer Configuration

243

244

**Buffer Size Tuning:**

245

```java

246

// Configure larger buffers for better I/O performance

247

AvroBuilder<User> optimizedBuilder = (out) -> {

248

DataFileWriter<User> writer = createBasicWriter(out);

249

writer.setSyncInterval(16 * 1024); // 16KB sync intervals

250

return writer;

251

};

252

```

253

254

**Compression Selection:**

255

```java

256

// Choose compression based on use case

257

AvroBuilder<User> compressedBuilder = (out) -> {

258

DataFileWriter<User> writer = createBasicWriter(out);

259

writer.setCodec(CodecFactory.snappyCodec()); // Fast compression

260

// writer.setCodec(CodecFactory.deflateCodec(6)); // Better compression

261

return writer;

262

};

263

```

264

265

### Sink Configuration

266

267

**Parallelism Tuning:**

268

```java

269

// Adjust parallelism based on throughput requirements

270

userStream.addSink(sink).setParallelism(4);

271

```

272

273

**Checkpointing Configuration:**

274

```java

275

// Configure checkpointing for exactly-once guarantees

276

env.enableCheckpointing(30000); // 30 second intervals

277

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

278

```

279

280

## Error Handling and Monitoring

281

282

### Error Recovery

283

284

**Writer Failure Handling:**

285

```java

286

// Writers automatically handle I/O failures through Flink's fault tolerance

287

// Failed writers are recreated on recovery

288

StreamingFileSink<User> resilientSink = StreamingFileSink

289

.forBulkFormat(outputPath, writerFactory)

290

.withRollingPolicy(policy)

291

.build();

292

```

293

294

**Schema Evolution Support:**

295

```java

296

// Handle schema evolution in writers

297

AvroBuilder<GenericRecord> evolvingBuilder = (out) -> {

298

// Use writer schema that's compatible with multiple reader schemas

299

Schema writerSchema = SchemaUtils.getLatestSchema();

300

GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(writerSchema);

301

DataFileWriter<GenericRecord> writer = new DataFileWriter<>(datumWriter);

302

return writer.create(writerSchema, out);

303

};

304

```

305

306

### Monitoring

307

308

**File Output Monitoring:**

309

```java

310

// Monitor file creation and sizes

311

StreamingFileSink<User> monitoredSink = StreamingFileSink

312

.forBulkFormat(outputPath, writerFactory)

313

.withBucketCheckInterval(60000) // Check every minute

314

.build();

315

```

316

317

**Metrics Integration:**

318

```java

319

// Custom metrics for monitoring throughput

320

public class MetricsAvroBuilder<T> implements AvroBuilder<T> {

321

private final Counter recordsWritten;

322

323

@Override

324

public DataFileWriter<T> createWriter(OutputStream out) throws IOException {

325

// Wrap writer with metrics collection

326

return new MetricsDataFileWriter<>(createBaseWriter(out), recordsWritten);

327

}

328

}

329

```