# RowData Integration

Native Flink RowData support for optimal performance in Table API and SQL operations, with automatic schema conversion and type mapping.

## Capabilities

### ParquetRowDataBuilder

Builder class for creating RowData-based Parquet writers with proper schema conversion and configuration.

```java { .api }
/**
 * Builder for creating RowData ParquetWriters with schema conversion
 */
public class ParquetRowDataBuilder {

    /**
     * Creates a new ParquetRowDataBuilder
     * @param outputFile OutputFile to write to
     * @param rowType Flink RowType defining the schema
     * @param utcTimestamp Whether to use UTC timezone for timestamps
     */
    public ParquetRowDataBuilder(OutputFile outputFile, RowType rowType, boolean utcTimestamp);

    /**
     * Creates a ParquetWriterFactory for RowData with automatic schema conversion
     * @param rowType Flink RowType schema
     * @param conf Hadoop configuration for Parquet settings
     * @param utcTimestamp Whether to use UTC timezone for timestamps
     * @return ParquetWriterFactory for writing RowData records
     */
    public static ParquetWriterFactory<RowData> createWriterFactory(
            RowType rowType,
            Configuration conf,
            boolean utcTimestamp
    );
}
```
### ParquetRowDataWriter

Writer implementation that converts Flink RowData to Parquet columnar format with full type support.

```java { .api }
/**
 * Writes RowData records to Parquet columnar format
 */
public class ParquetRowDataWriter {

    /**
     * Creates a new ParquetRowDataWriter
     * @param recordConsumer Parquet RecordConsumer for writing
     * @param rowType Flink RowType schema
     * @param schema Parquet MessageType schema
     * @param utcTimestamp Whether to use UTC timezone for timestamps
     * @param conf Hadoop configuration
     */
    public ParquetRowDataWriter(
            RecordConsumer recordConsumer,
            RowType rowType,
            MessageType schema,
            boolean utcTimestamp,
            Configuration conf
    );

    /**
     * Writes a RowData record to Parquet
     * @param record RowData record to write
     */
    public void write(RowData record);
}
```
### ParquetColumnarRowInputFormat

Vectorized input format specifically designed for reading Parquet files as RowData, with partition support and statistics reporting.

```java { .api }
/**
 * Vectorized input format for reading Parquet files as RowData
 * @param <SplitT> Type of file split
 */
public class ParquetColumnarRowInputFormat<SplitT> extends ParquetVectorizedInputFormat<RowData, SplitT>
        implements FileBasedStatisticsReportableInputFormat {

    /**
     * Creates a new ParquetColumnarRowInputFormat
     * @param hadoopConfig Hadoop configuration
     * @param projectedType Flink RowType for the projected output schema
     * @param producedTypeInfo TypeInformation for RowData
     * @param batchSize Batch size for vectorized reading
     * @param isUtcTimestamp Whether to use UTC timezone for timestamps
     * @param isCaseSensitive Whether field names are case sensitive
     */
    public ParquetColumnarRowInputFormat(
            Configuration hadoopConfig,
            RowType projectedType,
            TypeInformation<RowData> producedTypeInfo,
            int batchSize,
            boolean isUtcTimestamp,
            boolean isCaseSensitive
    );

    /**
     * Creates a partitioned format with partition field support
     * @param <SplitT> Type of file split extending FileSourceSplit
     * @param hadoopConfig Hadoop configuration
     * @param producedRowType Output RowType schema
     * @param producedTypeInfo TypeInformation for RowData
     * @param partitionKeys List of partition field names
     * @param extractor Partition field extractor for the split type
     * @param batchSize Batch size for vectorized reading
     * @param isUtcTimestamp Whether to use UTC timezone for timestamps
     * @param isCaseSensitive Whether field names are case sensitive
     * @return ParquetColumnarRowInputFormat with partition support
     */
    public static <SplitT extends FileSourceSplit> ParquetColumnarRowInputFormat<SplitT> createPartitionedFormat(
            Configuration hadoopConfig,
            RowType producedRowType,
            TypeInformation<RowData> producedTypeInfo,
            List<String> partitionKeys,
            PartitionFieldExtractor<SplitT> extractor,
            int batchSize,
            boolean isUtcTimestamp,
            boolean isCaseSensitive
    );

    /**
     * Returns the produced type information
     * @return TypeInformation for RowData
     */
    public TypeInformation<RowData> getProducedType();

    /**
     * Reports statistics from Parquet file metadata
     * @param files List of files to analyze
     * @param producedDataType Expected output data type
     * @return TableStats with row counts and column statistics
     */
    public TableStats reportStatistics(List<Path> files, DataType producedDataType);
}
```
## Usage Examples

### Basic RowData Writing

```java
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

// Define the schema using Flink types
RowType rowType = RowType.of(
    new LogicalType[] {
        DataTypes.BIGINT().getLogicalType(),     // id
        DataTypes.STRING().getLogicalType(),     // name
        DataTypes.DOUBLE().getLogicalType(),     // price
        DataTypes.TIMESTAMP(3).getLogicalType()  // created_at
    },
    new String[] {"id", "name", "price", "created_at"}
);

// Create a writer factory
Configuration conf = new Configuration();
ParquetWriterFactory<RowData> writerFactory =
    ParquetRowDataBuilder.createWriterFactory(rowType, conf, true);

// Use with FileSink
FileSink<RowData> sink = FileSink
    .forBulkFormat(new Path("/output/products"), writerFactory)
    .build();
```
### Reading with Vectorized Format

```java
import java.util.Arrays;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.table.PartitionFieldExtractor;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;

// Create a vectorized input format; note that createPartitionedFormat takes a
// PartitionFieldExtractor, built here from a default partition name
ParquetColumnarRowInputFormat<FileSourceSplit> inputFormat =
    ParquetColumnarRowInputFormat.createPartitionedFormat(
        new Configuration(),                     // Hadoop config
        rowType,                                 // Output schema
        TypeInformation.of(RowData.class),       // Type info
        Arrays.asList("date"),                   // Partition keys
        PartitionFieldExtractor.forFileSystem("__DEFAULT_PARTITION__"), // Partition field extractor
        2048,                                    // Batch size
        true,                                    // UTC timezone
        true                                     // Case sensitive
    );

// Create a file source with vectorized reading
FileSource<RowData> source = FileSource
    .forBulkFormat(inputFormat, new Path("/data/partitioned"))
    .build();

DataStream<RowData> rowDataStream = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "parquet-rowdata-source"
);
```
### Complex Type Support

```java
import org.apache.flink.table.types.logical.*;

// Define a complex schema with nested types; ArrayType and MapType are
// constructed directly from their element/key/value types
RowType nestedRowType = RowType.of(
    new LogicalType[] {
        DataTypes.BIGINT().getLogicalType(),  // order_id
        RowType.of(                           // customer (nested row)
            new LogicalType[] {
                DataTypes.BIGINT().getLogicalType(),  // customer.id
                DataTypes.STRING().getLogicalType()   // customer.name
            },
            new String[] {"id", "name"}
        ),
        new ArrayType(                        // items (array of rows)
            RowType.of(
                new LogicalType[] {
                    DataTypes.STRING().getLogicalType(),       // item.product_id
                    DataTypes.INT().getLogicalType(),          // item.quantity
                    DataTypes.DECIMAL(10, 2).getLogicalType()  // item.price
                },
                new String[] {"product_id", "quantity", "price"}
            )
        ),
        new MapType(                          // metadata (string-to-string map)
            DataTypes.STRING().getLogicalType(),
            DataTypes.STRING().getLogicalType()
        )
    },
    new String[] {"order_id", "customer", "items", "metadata"}
);

// Create a writer factory for the complex schema
ParquetWriterFactory<RowData> complexWriterFactory =
    ParquetRowDataBuilder.createWriterFactory(nestedRowType, conf, true);
```
### Partition Field Handling

```java
// Reading partitioned data with automatic partition field injection
List<String> partitionKeys = Arrays.asList("year", "month", "day");

ParquetColumnarRowInputFormat<FileSourceSplit> partitionedFormat =
    ParquetColumnarRowInputFormat.createPartitionedFormat(
        conf,
        producedRowType,                      // Schema including partition fields
        typeInfo,
        partitionKeys,                        // Partition field names
        PartitionFieldExtractor.forFileSystem("UNKNOWN"),  // Default name for null partitions
        4096,                                 // Larger batch for partitioned data
        true,                                 // UTC timestamps
        false                                 // Case-insensitive field names
    );

// File structure: /data/year=2023/month=01/day=15/file.parquet
// Partition fields are automatically added to RowData
```
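The partition values themselves come from Hive-style `key=value` segments of the file path. As a rough illustration only (not Flink's internal implementation), a hypothetical parser could recover them like this:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: recover Hive-style partition values from a path
// such as /data/year=2023/month=01/day=15/file.parquet. Flink performs
// this extraction internally via the PartitionFieldExtractor.
public final class PartitionPathParser {

    // Collects every "key=value" path segment into an ordered map.
    public static Map<String, String> parse(String path) {
        Map<String, String> values = new LinkedHashMap<>();
        for (String segment : path.split("/")) {
            int eq = segment.indexOf('=');
            if (eq > 0) {
                values.put(segment.substring(0, eq), segment.substring(eq + 1));
            }
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println(parse("/data/year=2023/month=01/day=15/file.parquet"));
        // → {year=2023, month=01, day=15}
    }
}
```

Each recovered value is converted to the partition column's declared type and injected into every emitted RowData record, so the Parquet files themselves never need to store the partition columns.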
### Column Projection

```java
// Read only specific columns for better performance; projection is
// expressed through the projected RowType passed to the constructor
RowType projectedType = RowType.of(
    new LogicalType[] {
        DataTypes.BIGINT().getLogicalType(),  // id
        DataTypes.STRING().getLogicalType()   // name
    },
    new String[] {"id", "name"}
);

ParquetColumnarRowInputFormat<FileSourceSplit> projectedFormat =
    new ParquetColumnarRowInputFormat<>(
        conf,
        projectedType,                        // Only the projected fields
        TypeInformation.of(RowData.class),
        2048,                                 // Batch size
        true,                                 // UTC timezone
        true                                  // Case sensitive
    );
```
### Integration with Table API

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;

// Create a table descriptor for RowData integration
TableDescriptor descriptor = TableDescriptor.forConnector("filesystem")
    .schema(Schema.newBuilder()
        .column("order_id", DataTypes.BIGINT())
        .column("customer_name", DataTypes.STRING())
        .column("amount", DataTypes.DECIMAL(10, 2))
        .column("order_time", DataTypes.TIMESTAMP(3))
        .watermark("order_time", "order_time - INTERVAL '5' SECOND")
        .build())
    .option("path", "/data/orders")
    .option("format", "parquet")
    .option("parquet.utc-timezone", "true")
    .build();

Table ordersTable = tableEnv.from(descriptor);
```
### Type Conversion Examples

```
// Supported Flink to Parquet type mappings:

// Primitive types
DataTypes.BOOLEAN()     → BOOLEAN
DataTypes.TINYINT()     → INT32 (INT_8)
DataTypes.SMALLINT()    → INT32 (INT_16)
DataTypes.INT()         → INT32
DataTypes.BIGINT()      → INT64
DataTypes.FLOAT()       → FLOAT
DataTypes.DOUBLE()      → DOUBLE
DataTypes.STRING()      → BINARY (UTF8)
DataTypes.BYTES()       → BINARY

// Temporal types
DataTypes.DATE()        → INT32 (DATE)
DataTypes.TIME()        → INT32 (TIME_MILLIS)
DataTypes.TIMESTAMP(3)  → INT64 (TIMESTAMP_MILLIS)
DataTypes.TIMESTAMP(6)  → INT64 (TIMESTAMP_MICROS)

// Decimal types
DataTypes.DECIMAL(p, s) → INT32/INT64 or FIXED_LEN_BYTE_ARRAY (DECIMAL)

// Complex types
DataTypes.ARRAY(T)      → LIST with element type conversion
DataTypes.MAP(K, V)     → MAP with key/value type conversion
DataTypes.ROW(...)      → GROUP with nested field conversions
```
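The DECIMAL mapping depends on precision: the Parquet format specification allows small-precision decimals to be packed into plain integers. A sketch of that standard rule (the actual choice made by Flink's writer may be a different, still spec-compliant one):

```java
// Illustrative sketch of the Parquet format specification's rule for
// choosing a physical type for DECIMAL(p, s). Not taken from Flink's
// writer, which may legitimately choose differently.
public final class DecimalPhysicalType {

    public static String physicalTypeFor(int precision) {
        if (precision <= 9) {
            return "INT32";                // unscaled value fits in 32 bits
        } else if (precision <= 18) {
            return "INT64";                // unscaled value fits in 64 bits
        } else {
            return "FIXED_LEN_BYTE_ARRAY"; // arbitrary precision
        }
    }

    public static void main(String[] args) {
        System.out.println(physicalTypeFor(8));   // INT32
        System.out.println(physicalTypeFor(10));  // INT64
        System.out.println(physicalTypeFor(38));  // FIXED_LEN_BYTE_ARRAY
    }
}
```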
## Performance Optimization

### Batch Size Tuning

```java
// Adjust the batch size based on memory and performance requirements
int batchSize = calculateOptimalBatchSize(
    availableMemory,
    numberOfColumns,
    avgRowSize
);

ParquetColumnarRowInputFormat<FileSourceSplit> optimizedFormat =
    ParquetColumnarRowInputFormat.createPartitionedFormat(
        conf, rowType, typeInfo, partitionKeys, extractor,
        batchSize,       // Tuned batch size
        utcTimestamp, caseSensitive
    );
```
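`calculateOptimalBatchSize` above is left undefined; one hypothetical heuristic (every constant here is an illustrative assumption, not a tuned value) is to budget a fraction of available memory against an estimated per-row footprint and clamp the result to a sensible range:

```java
// Hypothetical batch-size heuristic. The 16-byte per-column overhead,
// 4x safety factor, and 512-8192 clamp range are assumptions for
// illustration only; measure with your own workload before tuning.
public final class BatchSizeHeuristic {

    public static int calculateOptimalBatchSize(
            long availableMemoryBytes, int numberOfColumns, int avgRowSizeBytes) {
        // Estimated bytes held per buffered row across all column vectors
        long perRowEstimate = avgRowSizeBytes + 16L * numberOfColumns;
        // Use only a quarter of the budget, leaving headroom for the reader
        long rows = availableMemoryBytes / (perRowEstimate * 4);
        // Clamp to a range typical for vectorized reads
        return (int) Math.max(512, Math.min(rows, 8192));
    }

    public static void main(String[] args) {
        System.out.println(calculateOptimalBatchSize(256L * 1024 * 1024, 10, 200)); // 8192
        System.out.println(calculateOptimalBatchSize(1024L * 1024, 10, 200));       // 728
    }
}
```

Larger batches amortize per-batch overhead but hold more memory per split; with many concurrent splits the per-split budget shrinks accordingly.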
### Memory Management

```java
// Configure Parquet memory settings
Configuration conf = new Configuration();
conf.set("parquet.memory.min.chunk.size", "1048576");  // 1 MB minimum chunk
conf.set("parquet.memory.pool.ratio", "0.95");         // 95% of available memory
conf.set("parquet.page.size", "1048576");              // 1 MB pages
conf.set("parquet.block.size", "134217728");           // 128 MB blocks (row groups)
```

The RowData integration provides the most efficient path for Table API operations by directly using Flink's internal row representation without additional serialization overhead.