# Writing Support

Factory-based writers for creating Parquet files from various data formats, with support for custom ParquetWriter configurations and bulk writing operations.

## Capabilities

### ParquetWriterFactory

Generic factory class for creating BulkWriter instances that wrap ParquetWriter functionality.

```java { .api }
/**
 * Factory for creating Parquet BulkWriter instances
 * @param <T> Type of records to write
 */
@PublicEvolving
public class ParquetWriterFactory<T> implements BulkWriter.Factory<T> {

    /**
     * Creates a new ParquetWriterFactory
     * @param writerBuilder ParquetBuilder that creates configured ParquetWriter instances
     */
    public ParquetWriterFactory(ParquetBuilder<T> writerBuilder);

    /**
     * Creates a BulkWriter instance for the given output stream
     * @param out FSDataOutputStream to write to
     * @return BulkWriter instance wrapping a ParquetWriter
     * @throws IOException if writer creation fails
     */
    public BulkWriter<T> create(FSDataOutputStream out) throws IOException;
}
```

### ParquetBuilder Interface

Functional interface for creating configured ParquetWriter instances with custom settings.

```java { .api }
/**
 * Functional interface for creating ParquetWriter instances
 * @param <T> Type of records to write
 */
@FunctionalInterface
public interface ParquetBuilder<T> {

    /**
     * Creates and configures a ParquetWriter for the given output file
     * @param out OutputFile to write to
     * @return Configured ParquetWriter instance
     * @throws IOException if writer creation fails
     */
    ParquetWriter<T> createWriter(OutputFile out) throws IOException;
}
```

### ParquetBulkWriter

BulkWriter implementation that wraps ParquetWriter with Flink's bulk writing interface.

```java { .api }
/**
 * BulkWriter implementation wrapping ParquetWriter
 * @param <T> Type of records to write
 */
@PublicEvolving
public class ParquetBulkWriter<T> implements BulkWriter<T> {

    /**
     * Creates a new ParquetBulkWriter
     * @param writer ParquetWriter instance to wrap
     */
    public ParquetBulkWriter(ParquetWriter<T> writer);

    /**
     * Writes an element to the Parquet file
     * @param element Element to write
     * @throws IOException if writing fails
     */
    public void addElement(T element) throws IOException;

    /**
     * Flushes any buffered data (no-op for Parquet)
     * @throws IOException if flush fails
     */
    public void flush() throws IOException;

    /**
     * Finishes writing and closes the ParquetWriter
     * @throws IOException if finishing fails
     */
    public void finish() throws IOException;
}
```

### StreamOutputFile

Internal OutputFile implementation for Flink's streaming file system abstraction.

```java { .api }
/**
 * OutputFile implementation for streaming file systems
 */
@Internal
public class StreamOutputFile implements OutputFile {

    /**
     * Creates a new StreamOutputFile
     * @param out FSDataOutputStream to write to
     */
    public StreamOutputFile(FSDataOutputStream out);

    /**
     * Creates a position output stream for writing
     * @return PositionOutputStream for writing data
     * @throws IOException if stream creation fails
     */
    public PositionOutputStream create() throws IOException;

    /**
     * Creates a position output stream in overwrite mode
     * @return PositionOutputStream for writing data
     * @throws IOException if stream creation fails
     */
    public PositionOutputStream createOrOverwrite() throws IOException;

    /**
     * Checks if the output file supports block size setting
     * @return false (not supported for streaming)
     */
    public boolean supportsBlockSize();

    /**
     * Gets the default block size (not applicable)
     * @return 0
     */
    public long defaultBlockSize();
}
```

## Usage Examples

### Basic Writer Factory

```java
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.ParquetBuilder;
import org.apache.parquet.hadoop.ParquetWriter;

// Create a custom ParquetBuilder
ParquetBuilder<MyRecord> builder = (OutputFile out) -> {
    return MyRecordParquetWriter.builder(out)
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .withPageSize(1024 * 1024)
        .withRowGroupSize(128 * 1024 * 1024)
        .build();
};

// Create factory
ParquetWriterFactory<MyRecord> factory = new ParquetWriterFactory<>(builder);
```

### File Sink Integration

```java
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;

// Create FileSink with Parquet writer factory
FileSink<MyRecord> sink = FileSink
    .forBulkFormat(new Path("/output/path"), writerFactory)
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(Duration.ofMinutes(15))
            .withInactivityInterval(Duration.ofMinutes(5))
            .withMaxPartSize(MemorySize.ofMebiBytes(128))
            .build()
    )
    .build();

// Use in DataStream
dataStream.sinkTo(sink);
```

### Custom Configuration Example

```java
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.ParquetWriter;

// Builder with custom Parquet settings
ParquetBuilder<MyRecord> customBuilder = (OutputFile out) -> {
    return MyRecordParquetWriter.builder(out)
        .withCompressionCodec(CompressionCodecName.GZIP)
        .withDictionaryEncoding(true)
        .withValidation(false)
        .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
        .withPageSize(2 * 1024 * 1024) // 2MB pages
        .withRowGroupSize(256 * 1024 * 1024) // 256MB row groups
        .build();
};

ParquetWriterFactory<MyRecord> factory = new ParquetWriterFactory<>(customBuilder);
```

### Error Handling

```java
import org.apache.flink.api.common.functions.MapFunction;

// Handle writing errors gracefully
dataStream
    .map(new MapFunction<InputType, MyRecord>() {
        @Override
        public MyRecord map(InputType input) throws Exception {
            try {
                return convertToMyRecord(input);
            } catch (Exception e) {
                // Log error and handle appropriately
                LOG.warn("Failed to convert record: " + input, e);
                return null; // or default value
            }
        }
    })
    .filter(Objects::nonNull) // Remove failed conversions
    .sinkTo(sink);
```

### Batch Writing with Checkpointing

```java
import org.apache.flink.streaming.api.CheckpointingMode;

// Configure checkpointing for reliable writing
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30000); // Checkpoint every 30 seconds
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(60000);

// FileSink automatically handles checkpointing
dataStream.sinkTo(parquetSink);
```

## Performance Considerations

### Row Group Sizing

```java
// Optimize row group size based on your data
ParquetBuilder<T> builder = (out) ->
    writerBuilder(out)
        .withRowGroupSize(128 * 1024 * 1024) // 128MB - good for analytics
        .build();
```

### Compression Selection

```java
// Choose compression based on use case
CompressionCodecName compression;

// For write-heavy workloads (faster compression)
compression = CompressionCodecName.SNAPPY;

// For read-heavy workloads (better compression ratio)
compression = CompressionCodecName.GZIP;

// For balanced performance
compression = CompressionCodecName.LZ4;
```

### Memory Management

```java
// Configure page size for memory efficiency
ParquetBuilder<T> builder = (out) ->
    writerBuilder(out)
        .withPageSize(1024 * 1024) // 1MB pages
        .withDictionaryPageSize(512 * 1024) // 512KB dictionary pages
        .build();
```

The writing support provides flexible factory patterns that integrate seamlessly with Flink's bulk writing infrastructure while giving full control over Parquet-specific configurations.