# Bulk Writers

Factory and writer classes for efficient bulk writing of Avro files with support for various record types, compression options, and streaming file operations. Designed for high-throughput scenarios with automatic file rolling and management.

## AvroWriters

Utility class providing static factory methods for creating Avro writer factories for different record types.

```java { .api }
public class AvroWriters {
    // For specific records (generated from an Avro schema)
    public static <T extends SpecificRecordBase> AvroWriterFactory<T> forSpecificRecord(Class<T> type);

    // For generic records with an explicit schema
    public static AvroWriterFactory<GenericRecord> forGenericRecord(Schema schema);

    // For POJO records using reflection
    public static <T> AvroWriterFactory<T> forReflectRecord(Class<T> type);
}
```

### Usage Examples

**Writing Specific Records:**

```java
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroWriterFactory;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

// Create a writer factory for the specific record type
AvroWriterFactory<User> writerFactory = AvroWriters.forSpecificRecord(User.class);

// Create the streaming file sink. Bulk formats can only roll part files on
// checkpoints, so forBulkFormat accepts only CheckpointRollingPolicy
// implementations such as OnCheckpointRollingPolicy.
StreamingFileSink<User> sink = StreamingFileSink
    .forBulkFormat(new Path("output/path"), writerFactory)
    .withRollingPolicy(OnCheckpointRollingPolicy.build())
    .build();

// Use in a streaming job
DataStream<User> userStream = ...;
userStream.addSink(sink);
```

**Writing Generic Records:**

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

// Define the schema
Schema schema = new Schema.Parser().parse(schemaString);

// Create a writer factory for generic records
AvroWriterFactory<GenericRecord> genericWriterFactory = AvroWriters.forGenericRecord(schema);

// Create the file sink (bulk formats roll on every checkpoint by default)
StreamingFileSink<GenericRecord> genericSink = StreamingFileSink
    .forBulkFormat(new Path("output/generic"), genericWriterFactory)
    .build();

// Use with generic records
DataStream<GenericRecord> recordStream = ...;
recordStream.addSink(genericSink);
```

**Writing Reflection-based Records:**

```java
// For POJOs without generated Avro classes
AvroWriterFactory<Person> reflectWriterFactory = AvroWriters.forReflectRecord(Person.class);

// Create a sink for POJO records
StreamingFileSink<Person> pojoSink = StreamingFileSink
    .forBulkFormat(new Path("output/pojos"), reflectWriterFactory)
    .build();
```

## AvroWriterFactory

Factory class implementing Flink's `BulkWriter.Factory` interface for creating Avro bulk writers.

```java { .api }
public class AvroWriterFactory<T> implements BulkWriter.Factory<T> {
    public AvroWriterFactory(AvroBuilder<T> builder);

    // Factory interface method
    public BulkWriter<T> create(FSDataOutputStream out) throws IOException;
}
```

### Advanced Configuration

**Custom Builder Usage:**

```java
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.flink.formats.avro.AvroBuilder;

// Create a custom builder with specific configuration
AvroBuilder<User> customBuilder = (outputStream) -> {
    Schema schema = User.getClassSchema();
    DatumWriter<User> datumWriter = new SpecificDatumWriter<>(schema);
    DataFileWriter<User> dataFileWriter = new DataFileWriter<>(datumWriter);

    // Configure compression (must happen before create() opens the writer)
    dataFileWriter.setCodec(CodecFactory.snappyCodec());

    // Set file metadata
    dataFileWriter.setMeta("created_by", "flink-job");
    dataFileWriter.setMeta("version", "1.0");

    return dataFileWriter.create(schema, outputStream);
};

// Create a factory with the custom builder
AvroWriterFactory<User> customFactory = new AvroWriterFactory<>(customBuilder);
```

## AvroBuilder

Functional interface for creating `DataFileWriter` instances with custom configuration.

```java { .api }
public interface AvroBuilder<T> extends Serializable {
    DataFileWriter<T> createWriter(OutputStream outputStream) throws IOException;
}
```

### Custom Builder Implementation

```java
// Lambda implementation for specific records
AvroBuilder<User> specificBuilder = (out) -> {
    Schema schema = SpecificData.get().getSchema(User.class);
    SpecificDatumWriter<User> datumWriter = new SpecificDatumWriter<>(schema);
    DataFileWriter<User> dataFileWriter = new DataFileWriter<>(datumWriter);
    return dataFileWriter.create(schema, out);
};

// Anonymous class implementation for generic records;
// 'schema' is an effectively final Schema from the enclosing scope
AvroBuilder<GenericRecord> genericBuilder = new AvroBuilder<GenericRecord>() {
    @Override
    public DataFileWriter<GenericRecord> createWriter(OutputStream outputStream) throws IOException {
        GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
        return dataFileWriter.create(schema, outputStream);
    }
};
```

## AvroBulkWriter

The bulk writer implementation that writes records to Avro files.

```java { .api }
public class AvroBulkWriter<T> implements BulkWriter<T> {
    public void addElement(T element) throws IOException;
    public void flush() throws IOException;
    public void finish() throws IOException;
}
```

### Writer Lifecycle

1. **Creation**: The writer is created by the factory when a new part file is started
2. **Writing**: Elements are added via `addElement()`
3. **Flushing**: Periodic `flush()` calls push buffered data to the output stream
4. **Finishing**: `finish()` is called when the part file is rolled
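
The sequence above can be illustrated with a plain-Java stand-in (no Flink dependency; `LineBulkWriter` is a simplified sketch that writes one line per element, not the real `AvroBulkWriter`):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Simplified stand-in for a BulkWriter, showing the call sequence the
// framework drives: create -> addElement* -> flush -> finish
class LineBulkWriter {
    private final OutputStream out;

    LineBulkWriter(OutputStream out) {
        this.out = out;
    }

    void addElement(String element) throws IOException {
        out.write((element + "\n").getBytes(StandardCharsets.UTF_8));
    }

    void flush() throws IOException {
        out.flush();
    }

    void finish() throws IOException {
        // Seal the current part; the framework closes the underlying stream
        flush();
    }
}

public class WriterLifecycleDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        LineBulkWriter writer = new LineBulkWriter(buffer); // 1. creation
        writer.addElement("alice");                         // 2. writing
        writer.addElement("bob");
        writer.flush();                                     // 3. flushing
        writer.finish();                                    // 4. finishing
        System.out.print(buffer.toString(StandardCharsets.UTF_8));
    }
}
```

In a real job, `StreamingFileSink` issues these calls itself; user code never drives the writer directly.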

## File Rolling and Management

### Rolling Policies

Bulk formats such as Avro can only roll part files when a checkpoint completes, so `forBulkFormat` accepts only `CheckpointRollingPolicy` implementations:

```java
StreamingFileSink<User> sink = StreamingFileSink
    .forBulkFormat(outputPath, writerFactory)
    .withRollingPolicy(OnCheckpointRollingPolicy.build()) // roll on every checkpoint
    .build();
```

Size- and time-based rolling with `DefaultRollingPolicy` is available for row-encoded formats (`forRowFormat`):

```java
StreamingFileSink<String> rowSink = StreamingFileSink
    .forRowFormat(outputPath, new SimpleStringEncoder<String>())
    .withRollingPolicy(DefaultRollingPolicy.builder()
        .withMaxPartSize(MemorySize.ofMebiBytes(256))   // roll at 256 MB
        .withRolloverInterval(Duration.ofMinutes(15))   // roll every 15 minutes
        .withInactivityInterval(Duration.ofMinutes(5))  // roll after 5 minutes of inactivity
        .build())
    .build();
```

**Combined Rolling Policy** (row-encoded formats only):

```java
RollingPolicy<User, String> policy = DefaultRollingPolicy.builder()
    .withRolloverInterval(Duration.ofHours(1))       // max 1 hour per file
    .withInactivityInterval(Duration.ofMinutes(15))  // roll after 15 min of inactivity
    .withMaxPartSize(MemorySize.ofMebiBytes(512))    // max 512 MB per file
    .build();
```

### Bucket Assignment

**Default Bucketing:**

```java
// Files are organized by processing time (default format "yyyy-MM-dd--HH")
StreamingFileSink<User> sink = StreamingFileSink
    .forBulkFormat(outputPath, writerFactory)
    .build();
// Results in: output/2023-12-01--18/part-0-0.avro
```

**Custom Bucketing:**

```java
// Custom bucket assignment based on record fields
BucketAssigner<User, String> bucketAssigner = new BucketAssigner<User, String>() {
    @Override
    public String getBucketId(User user, Context context) {
        return "department=" + user.getDepartment() + "/year=" + user.getCreatedYear();
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
};

StreamingFileSink<User> partitionedSink = StreamingFileSink
    .forBulkFormat(outputPath, writerFactory)
    .withBucketAssigner(bucketAssigner)
    .build();
// Results in: output/department=engineering/year=2023/part-0-0.avro
```

## Performance Optimization

### Writer Configuration

**Sync Interval Tuning:**

```java
// Tune the sync interval: the approximate number of bytes written between
// Avro block sync markers. All writer settings must be applied before
// create() opens the file.
AvroBuilder<User> optimizedBuilder = (out) -> {
    DataFileWriter<User> writer =
        new DataFileWriter<>(new SpecificDatumWriter<>(User.class));
    writer.setSyncInterval(16 * 1024); // 16 KB between sync markers
    return writer.create(User.getClassSchema(), out);
};
```

**Compression Selection:**

```java
// Choose a codec based on the use case; codecs must be set before create()
AvroBuilder<User> compressedBuilder = (out) -> {
    DataFileWriter<User> writer =
        new DataFileWriter<>(new SpecificDatumWriter<>(User.class));
    writer.setCodec(CodecFactory.snappyCodec());     // fast compression
    // writer.setCodec(CodecFactory.deflateCodec(6)); // better ratio, more CPU
    return writer.create(User.getClassSchema(), out);
};
```
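
The commented-out `deflateCodec(6)` line hints at a speed/size trade-off that can be sketched with plain `java.util.zip` (no Avro required; the class name and payload here are illustrative): higher deflate levels spend more CPU for smaller output, the same dial `CodecFactory.deflateCodec(level)` exposes.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;

public class DeflateLevelDemo {

    // Compress the input at the given deflate level and return the output size
    static int compressedSize(byte[] input, int level) {
        Deflater deflater = new Deflater(level);
        deflater.setInput(input);
        deflater.finish();
        byte[] chunk = new byte[4096];
        int total = 0;
        while (!deflater.finished()) {
            total += deflater.deflate(chunk);
        }
        deflater.end();
        return total;
    }

    public static void main(String[] args) {
        byte[] data = "sample avro record payload ".repeat(2000)
                .getBytes(StandardCharsets.UTF_8);
        int fastest = compressedSize(data, Deflater.BEST_SPEED);        // level 1
        int smallest = compressedSize(data, Deflater.BEST_COMPRESSION); // level 9
        System.out.println("input: " + data.length + " bytes"
                + ", level 1: " + fastest + " bytes"
                + ", level 9: " + smallest + " bytes");
    }
}
```

Snappy sits on the fast end of this spectrum, which is why it is a common default for high-throughput sinks.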

### Sink Configuration

**Parallelism Tuning:**

```java
// Adjust sink parallelism based on throughput requirements
userStream.addSink(sink).setParallelism(4);
```

**Checkpointing Configuration:**

```java
// StreamingFileSink requires checkpointing: in-progress part files are only
// finalized when a checkpoint completes
env.enableCheckpointing(30000); // 30-second intervals
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
```

## Error Handling and Monitoring

### Error Recovery

**Writer Failure Handling:**

```java
// Writers automatically handle I/O failures through Flink's fault tolerance:
// failed writers are recreated on recovery from the last checkpoint
StreamingFileSink<User> resilientSink = StreamingFileSink
    .forBulkFormat(outputPath, writerFactory)
    .withRollingPolicy(OnCheckpointRollingPolicy.build())
    .build();
```

**Schema Evolution Support:**

```java
// Handle schema evolution in writers. SchemaUtils.getLatestSchema() stands in
// for an application-specific schema lookup (e.g. a schema registry client).
AvroBuilder<GenericRecord> evolvingBuilder = (out) -> {
    // Use a writer schema that is compatible with multiple reader schemas
    Schema writerSchema = SchemaUtils.getLatestSchema();
    GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(writerSchema);
    DataFileWriter<GenericRecord> writer = new DataFileWriter<>(datumWriter);
    return writer.create(writerSchema, out);
};
```

### Monitoring

**File Output Monitoring:**

```java
// Control how often buckets are checked for rolling (milliseconds)
StreamingFileSink<User> monitoredSink = StreamingFileSink
    .forBulkFormat(outputPath, writerFactory)
    .withBucketCheckInterval(60000) // check every minute
    .build();
```

**Metrics Integration:**

```java
// Sketch of custom throughput metrics. MetricsDataFileWriter and
// createBaseWriter() are application-specific helpers, not part of Flink or Avro.
public class MetricsAvroBuilder<T> implements AvroBuilder<T> {
    private final Counter recordsWritten;

    public MetricsAvroBuilder(Counter recordsWritten) {
        this.recordsWritten = recordsWritten;
    }

    @Override
    public DataFileWriter<T> createWriter(OutputStream out) throws IOException {
        // Wrap the writer with metrics collection
        return new MetricsDataFileWriter<>(createBaseWriter(out), recordsWritten);
    }
}
```