0
# RowData Writers
1
2
High-performance writers for Flink's internal RowData format, providing comprehensive schema mapping and configuration support for writing Parquet files.
3
4
## Capabilities
5
6
### ParquetRowDataBuilder
7
8
Builder class for creating Parquet writers specifically designed for Flink's RowData format with full schema conversion and configuration support.
9
10
```java { .api }
11
/**
12
* Builder for creating Parquet writers that handle Flink RowData
13
* Extends ParquetWriter.Builder with RowData-specific functionality
14
*/
15
public class ParquetRowDataBuilder extends ParquetWriter.Builder<RowData, ParquetRowDataBuilder> {
16
17
/**
18
* Creates a new ParquetRowDataBuilder instance
19
* @param path Output file path for the Parquet writer
20
* @param rowType Flink logical row type defining the schema
21
* @param utcTimestamp Whether to use UTC timezone for timestamp conversion
22
*/
23
public ParquetRowDataBuilder(OutputFile path, RowType rowType, boolean utcTimestamp);
24
25
/**
26
* Creates a complete ParquetWriterFactory for RowData
27
* @param rowType Flink logical row type defining the data schema
28
* @param conf Hadoop configuration for Parquet settings
29
* @param utcTimestamp Whether to use UTC timezone for timestamp conversion
30
* @return ParquetWriterFactory configured for RowData writing
31
*/
32
public static ParquetWriterFactory<RowData> createWriterFactory(
33
RowType rowType,
34
Configuration conf,
35
boolean utcTimestamp
36
);
37
}
38
```
39
40
### ParquetWriterFactory
41
42
Generic factory for creating Parquet bulk writers using user-provided builder configurations.
43
44
```java { .api }
45
/**
46
* Factory that creates Parquet BulkWriter instances
47
* Uses user-supplied ParquetBuilder to configure the underlying ParquetWriter
48
*/
49
public class ParquetWriterFactory<T> implements BulkWriter.Factory<T> {
50
51
/**
52
* Creates a new ParquetWriterFactory
53
* @param writerBuilder Builder to construct the ParquetWriter
54
*/
55
public ParquetWriterFactory(ParquetBuilder<T> writerBuilder);
56
57
/**
58
* Creates a BulkWriter for the given output stream
59
* @param stream FSDataOutputStream to write Parquet data to
60
* @return BulkWriter instance wrapping a ParquetWriter
61
* @throws IOException if writer creation fails
62
*/
63
public BulkWriter<T> create(FSDataOutputStream stream) throws IOException;
64
}
65
```
66
67
### ParquetBulkWriter
68
69
BulkWriter implementation that wraps Parquet writers for integration with Flink's streaming sinks.
70
71
```java { .api }
72
/**
73
* BulkWriter implementation wrapping a ParquetWriter
74
* Provides the interface between Flink's streaming framework and Parquet writing
75
*/
76
public class ParquetBulkWriter<T> implements BulkWriter<T> {
77
78
/**
79
* Creates a new ParquetBulkWriter
80
* @param parquetWriter The underlying ParquetWriter to wrap
81
*/
82
public ParquetBulkWriter(ParquetWriter<T> parquetWriter);
83
84
/**
85
* Adds an element to the Parquet file
86
* @param datum The data element to write
87
* @throws IOException if writing fails
88
*/
89
public void addElement(T datum) throws IOException;
90
91
/**
92
* Flushes any buffered data (no-op for Parquet)
93
*/
94
public void flush();
95
96
/**
97
* Finishes writing and closes the Parquet file
98
* @throws IOException if closing fails
99
*/
100
public void finish() throws IOException;
101
}
102
```
103
104
### FlinkParquetBuilder
105
106
Internal builder implementation for creating RowData-specific Parquet writers with proper configuration.
107
108
```java { .api }
109
/**
110
* Internal ParquetBuilder implementation for RowData
111
* Handles Hadoop configuration and Parquet writer setup
112
*/
113
public static class FlinkParquetBuilder implements ParquetBuilder<RowData> {
114
115
/**
116
* Creates a new FlinkParquetBuilder
117
* @param rowType Flink logical row type
118
* @param conf Hadoop configuration
119
* @param utcTimestamp UTC timezone flag
120
*/
121
public FlinkParquetBuilder(RowType rowType, Configuration conf, boolean utcTimestamp);
122
123
/**
124
* Creates a configured ParquetWriter for the given output file
125
* @param out OutputFile to write to
126
* @return Configured ParquetWriter instance
127
* @throws IOException if writer creation fails
128
*/
129
public ParquetWriter<RowData> createWriter(OutputFile out) throws IOException;
130
}
131
```
132
133
## Usage Examples
134
135
### Basic RowData Writer
136
137
```java
138
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
139
import org.apache.flink.table.types.logical.RowType;
140
import org.apache.flink.table.types.logical.LogicalTypeRoot;
141
import org.apache.hadoop.conf.Configuration;
142
143
// Define schema
144
RowType rowType = RowType.of(
145
new RowType.RowField("id", LogicalType.of(LogicalTypeRoot.BIGINT)),
146
new RowType.RowField("name", LogicalType.of(LogicalTypeRoot.VARCHAR, 255)),
147
new RowType.RowField("timestamp", LogicalType.of(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE))
148
);
149
150
// Create writer factory
151
Configuration hadoopConfig = new Configuration();
152
boolean useUtcTimezone = true;
153
154
ParquetWriterFactory<RowData> writerFactory =
155
ParquetRowDataBuilder.createWriterFactory(rowType, hadoopConfig, useUtcTimezone);
156
```
157
158
### Integration with File Sink
159
160
```java
161
import org.apache.flink.connector.file.sink.FileSink;
162
import org.apache.flink.core.fs.Path;
163
import org.apache.flink.streaming.api.datastream.DataStream;
164
165
DataStream<RowData> dataStream = // ... your data stream
166
167
// Create file sink with Parquet writer
168
FileSink<RowData> parquetSink = FileSink
169
.forBulkFormat(new Path("/output/path"), writerFactory)
170
.withRollingPolicy(DefaultRollingPolicy.builder()
171
.withRolloverInterval(Duration.ofMinutes(15))
172
.withInactivityInterval(Duration.ofMinutes(5))
173
.build())
174
.build();
175
176
dataStream.sinkTo(parquetSink);
177
```
178
179
### Custom Configuration
180
181
```java
182
import org.apache.hadoop.conf.Configuration;
183
import static org.apache.parquet.hadoop.ParquetOutputFormat.*;
184
185
// Configure Hadoop settings for Parquet
186
Configuration conf = new Configuration();
187
conf.set("parquet.compression", "SNAPPY");
188
conf.setInt("parquet.block.size", 134217728); // 128MB
189
conf.setInt("parquet.page.size", 1048576); // 1MB
190
conf.setBoolean("parquet.enable.dictionary", true);
191
192
// Create writer with custom configuration
193
ParquetWriterFactory<RowData> customWriterFactory =
194
ParquetRowDataBuilder.createWriterFactory(rowType, conf, false);
195
```
196
197
### Manual Writer Creation
198
199
```java
200
import org.apache.flink.formats.parquet.StreamOutputFile;
201
import org.apache.flink.core.fs.FSDataOutputStream;
202
import org.apache.parquet.hadoop.ParquetWriter;
203
204
// Create output stream
205
FSDataOutputStream outputStream = // ... create output stream
206
StreamOutputFile outputFile = new StreamOutputFile(outputStream);
207
208
// Build writer manually
209
ParquetWriter<RowData> writer = new ParquetRowDataBuilder(outputFile, rowType, true)
210
.withCompressionCodec(CompressionCodecName.SNAPPY)
211
.withRowGroupSize(134217728)
212
.withPageSize(1048576)
213
.withDictionaryEncoding(true)
214
.build();
215
216
// Write data
217
RowData rowData = // ... create row data
218
writer.write(rowData);
219
writer.close();
220
```
221
222
## Configuration Options
223
224
### Compression Settings
225
226
```java
227
// Available compression codecs
228
.withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
229
.withCompressionCodec(CompressionCodecName.SNAPPY) // Default, good balance
230
.withCompressionCodec(CompressionCodecName.GZIP) // High compression
231
.withCompressionCodec(CompressionCodecName.LZO)
232
.withCompressionCodec(CompressionCodecName.BROTLI) // Best compression
233
.withCompressionCodec(CompressionCodecName.LZ4) // Fast compression
234
.withCompressionCodec(CompressionCodecName.ZSTD) // Good compression + speed
235
```
236
237
### Performance Tuning
238
239
```java
240
// Row group size (default: 134MB)
241
.withRowGroupSize(268435456) // 256MB for better compression
242
243
// Page size (default: 1MB)
244
.withPageSize(2097152) // 2MB for reduced metadata overhead
245
246
// Dictionary encoding (default: true)
247
.withDictionaryEncoding(false) // Disable for high-cardinality data
248
249
// Dictionary page size (default: 1MB)
250
.withDictionaryPageSize(2097152)
251
```
252
253
## Error Handling
254
255
Common exceptions and error scenarios:
256
257
```java
258
try {
259
ParquetWriterFactory<RowData> factory =
260
ParquetRowDataBuilder.createWriterFactory(rowType, conf, true);
261
} catch (IllegalArgumentException e) {
262
// Invalid row type or unsupported data type
263
} catch (RuntimeException e) {
264
// Configuration errors or schema conversion failures
265
}
266
267
try {
268
writer.write(rowData);
269
} catch (IOException e) {
270
// File system errors, disk full, permission issues
271
} catch (RuntimeException e) {
272
// Data conversion errors, schema mismatches
273
}
274
```