or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

avro-integration.md · index.md · protobuf-integration.md · rowdata-integration.md · table-integration.md · utilities.md · vectorized-reading.md · writing-support.md

docs/rowdata-integration.md

0

# RowData Integration

1

2

Native Flink RowData support for optimal performance in Table API and SQL operations, with automatic schema conversion and type mapping.

3

4

## Capabilities

5

6

### ParquetRowDataBuilder

7

8

Builder class for creating RowData-based Parquet writers with proper schema conversion and configuration.

9

10

```java { .api }

11

/**

12

* Builder for creating RowData ParquetWriters with schema conversion

13

*/

14

public class ParquetRowDataBuilder {

15

16

/**

17

* Creates a new ParquetRowDataBuilder

18

* @param outputFile OutputFile to write to

19

* @param rowType Flink RowType defining the schema

20

* @param utcTimestamp Whether to use UTC timezone for timestamps

21

*/

22

public ParquetRowDataBuilder(OutputFile outputFile, RowType rowType, boolean utcTimestamp);

23

24

/**

25

* Creates a ParquetWriterFactory for RowData with automatic schema conversion

26

* @param rowType Flink RowType schema

27

* @param conf Hadoop configuration for Parquet settings

28

* @param utcTimestamp Whether to use UTC timezone for timestamps

29

* @return ParquetWriterFactory for writing RowData records

30

*/

31

public static ParquetWriterFactory<RowData> createWriterFactory(

32

RowType rowType,

33

Configuration conf,

34

boolean utcTimestamp

35

);

36

}

37

```

38

39

### ParquetRowDataWriter

40

41

Writer implementation that converts Flink RowData to Parquet columnar format with full type support.

42

43

```java { .api }

44

/**

45

* Writes RowData records to Parquet columnar format

46

*/

47

public class ParquetRowDataWriter {

48

49

/**

50

* Creates a new ParquetRowDataWriter

51

* @param recordConsumer Parquet RecordConsumer for writing

52

* @param rowType Flink RowType schema

53

* @param schema Parquet MessageType schema

54

* @param utcTimestamp Whether to use UTC timezone for timestamps

55

* @param conf Hadoop configuration

56

*/

57

public ParquetRowDataWriter(

58

RecordConsumer recordConsumer,

59

RowType rowType,

60

MessageType schema,

61

boolean utcTimestamp,

62

Configuration conf

63

);

64

65

/**

66

* Writes a RowData record to Parquet

67

* @param record RowData record to write

68

*/

69

public void write(RowData record);

70

}

71

```

72

73

### ParquetColumnarRowInputFormat

74

75

Vectorized input format specifically designed for reading Parquet files as RowData with partition support and statistics reporting.

76

77

```java { .api }

78

/**

79

* Vectorized input format for reading Parquet files as RowData

80

* @param <SplitT> Type of file split

81

*/

82

public class ParquetColumnarRowInputFormat<SplitT> extends ParquetVectorizedInputFormat<RowData, SplitT>

83

implements FileBasedStatisticsReportableInputFormat {

84

85

/**

86

* Creates a new ParquetColumnarRowInputFormat

87

* @param hadoopConfig Hadoop configuration

88

* @param projectedType Flink RowType for the projected output schema

89

* @param producedTypeInfo TypeInformation for RowData

90

* @param batchSize Batch size for vectorized reading

91

* @param isUtcTimestamp Whether to use UTC timezone for timestamps

92

* @param isCaseSensitive Whether field names are case sensitive

93

*/

94

public ParquetColumnarRowInputFormat(

95

Configuration hadoopConfig,

96

RowType projectedType,

97

TypeInformation<RowData> producedTypeInfo,

98

int batchSize,

99

boolean isUtcTimestamp,

100

boolean isCaseSensitive

101

);

102

103

/**

104

* Creates a partitioned format with partition field support

105

* @param <SplitT> Type of file split extending FileSourceSplit

106

* @param hadoopConfig Hadoop configuration

107

* @param producedRowType Output RowType schema

108

* @param producedTypeInfo TypeInformation for RowData

109

* @param partitionKeys List of partition field names

110

* @param extractor Partition field extractor for the split type

111

* @param batchSize Batch size for vectorized reading

112

* @param isUtcTimestamp Whether to use UTC timezone for timestamps

113

* @param isCaseSensitive Whether field names are case sensitive

114

* @return ParquetColumnarRowInputFormat with partition support

115

*/

116

public static <SplitT extends FileSourceSplit> ParquetColumnarRowInputFormat<SplitT> createPartitionedFormat(

117

Configuration hadoopConfig,

118

RowType producedRowType,

119

TypeInformation<RowData> producedTypeInfo,

120

List<String> partitionKeys,

121

PartitionFieldExtractor<SplitT> extractor,

122

int batchSize,

123

boolean isUtcTimestamp,

124

boolean isCaseSensitive

125

);

126

127

/**

128

* Returns the produced type information

129

* @return TypeInformation for RowData

130

*/

131

public TypeInformation<RowData> getProducedType();

132

133

/**

134

* Reports statistics from Parquet file metadata

135

* @param files List of files to analyze

136

* @param producedDataType Expected output data type

137

* @return TableStats with row counts and column statistics

138

*/

139

public TableStats reportStatistics(List<Path> files, DataType producedDataType);

140

}

141

```

142

143

## Usage Examples

144

145

### Basic RowData Writing

146

147

```java

148

import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;

149

import org.apache.flink.table.types.logical.RowType;

150

import org.apache.flink.table.types.logical.LogicalType;

151

import org.apache.flink.table.data.RowData;

152

153

// Define schema using Flink types

154

RowType rowType = RowType.of(

155

new LogicalType[] {

156

DataTypes.BIGINT().getLogicalType(), // id

157

DataTypes.STRING().getLogicalType(), // name

158

DataTypes.DOUBLE().getLogicalType(), // price

159

DataTypes.TIMESTAMP(3).getLogicalType() // created_at

160

},

161

new String[] {"id", "name", "price", "created_at"}

162

);

163

164

// Create writer factory

165

Configuration conf = new Configuration();

166

ParquetWriterFactory<RowData> writerFactory =

167

ParquetRowDataBuilder.createWriterFactory(rowType, conf, true);

168

169

// Use with FileSink

170

FileSink<RowData> sink = FileSink

171

.forBulkFormat(new Path("/output/products"), writerFactory)

172

.build();

173

```

174

175

### Reading with Vectorized Format

176

177

```java

178

import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;

179

import org.apache.flink.connector.file.src.FileSource;

180

181

// Create vectorized input format

182

ParquetColumnarRowInputFormat<FileSourceSplit> inputFormat =

183

ParquetColumnarRowInputFormat.createPartitionedFormat(

184

new Configuration(), // Hadoop config

185

rowType, // Output schema

186

TypeInformation.of(RowData.class), // Type info

187

Arrays.asList("date"), // Partition keys

188

PartitionFieldExtractor.forFileSystem("__DEFAULT_PARTITION__"), // Partition field extractor (default partition name)

189

2048, // Batch size

190

true, // UTC timezone

191

true // Case sensitive

192

);

193

194

// Create file source with vectorized reading

195

FileSource<RowData> source = FileSource

196

.forBulkFormat(inputFormat, new Path("/data/partitioned"))

197

.build();

198

199

DataStream<RowData> rowDataStream = env.fromSource(

200

source,

201

WatermarkStrategy.noWatermarks(),

202

"parquet-rowdata-source"

203

);

204

```

205

206

### Complex Type Support

207

208

```java

209

import org.apache.flink.table.types.logical.*;

210

211

// Define complex schema with nested types

212

RowType nestedRowType = RowType.of(

213

new LogicalType[] {

214

DataTypes.BIGINT().getLogicalType(), // order_id

215

RowType.of( // customer (nested)

216

new LogicalType[] {

217

DataTypes.BIGINT().getLogicalType(), // customer.id

218

DataTypes.STRING().getLogicalType() // customer.name

219

},

220

new String[] {"id", "name"}

221

),

222

new ArrayType( // items (array)

RowType.of(

new LogicalType[] {

DataTypes.STRING().getLogicalType(), // item.product_id

DataTypes.INT().getLogicalType(), // item.quantity

DataTypes.DECIMAL(10, 2).getLogicalType() // item.price

},

new String[] {"product_id", "quantity", "price"}

)

),

232

new MapType( // metadata (map)

DataTypes.STRING().getLogicalType(),

DataTypes.STRING().getLogicalType()

)

236

},

237

new String[] {"order_id", "customer", "items", "metadata"}

238

);

239

240

// Create writer for complex types

241

ParquetWriterFactory<RowData> complexWriterFactory =

242

ParquetRowDataBuilder.createWriterFactory(nestedRowType, conf, true);

243

```

244

245

### Partition Field Handling

246

247

```java

248

// Reading partitioned data with automatic partition field injection

249

List<String> partitionKeys = Arrays.asList("year", "month", "day");

250

251

ParquetColumnarRowInputFormat<FileSourceSplit> partitionedFormat =

252

ParquetColumnarRowInputFormat.createPartitionedFormat(

253

conf,

254

producedRowType, // Schema including partition fields

255

typeInfo,

256

partitionKeys, // Partition field names

257

PartitionFieldExtractor.forFileSystem("UNKNOWN"), // Extractor; "UNKNOWN" used for null partitions

258

4096, // Larger batch for partitioned data

259

true, // UTC timestamps

260

false // Case insensitive partition names

261

);

262

263

// File structure: /data/year=2023/month=01/day=15/file.parquet

264

// Partition fields are automatically added to RowData

265

```

266

267

### Column Projection

268

269

```java

270

// Only read specific columns for better performance

271

RowType projectedType = RowType.of(

272

new LogicalType[] {

273

DataTypes.BIGINT().getLogicalType(), // id

274

DataTypes.STRING().getLogicalType() // name

275

},

276

new String[] {"id", "name"}

277

);

278

279

ParquetColumnarRowInputFormat<FileSourceSplit> projectedFormat =

280

new ParquetColumnarRowInputFormat<>(

281

conf,

282

projectedType, // Only projected fields

283

TypeInformation.of(RowData.class),

2048, // Batch size

287

true, // UTC timezone

288

true // Case sensitive

289

);

290

```

291

292

### Integration with Table API

293

294

```java

295

import org.apache.flink.table.api.DataTypes;

296

import org.apache.flink.table.api.Schema;

297

import org.apache.flink.table.api.TableDescriptor;

298

299

// Create table descriptor for RowData integration

300

TableDescriptor descriptor = TableDescriptor.forConnector("filesystem")

301

.schema(Schema.newBuilder()

302

.column("order_id", DataTypes.BIGINT())

303

.column("customer_name", DataTypes.STRING())

304

.column("amount", DataTypes.DECIMAL(10, 2))

305

.column("order_time", DataTypes.TIMESTAMP(3))

306

.watermark("order_time", "order_time - INTERVAL '5' SECOND")

307

.build())

308

.option("path", "/data/orders")

309

.option("format", "parquet")

310

.option("parquet.batch-size", "4096")

311

.option("parquet.utc-timezone", "true")

312

.build();

313

314

Table ordersTable = tableEnv.from(descriptor);

315

```

316

317

### Type Conversion Examples

318

319

```java

320

// Supported Flink to Parquet type mappings:

321

322

// Primitive types

323

DataTypes.BOOLEAN() → BOOLEAN

324

DataTypes.TINYINT() → INT32 (INT_8)

325

DataTypes.SMALLINT() → INT32 (INT_16)

326

DataTypes.INT() → INT32

327

DataTypes.BIGINT() → INT64

328

DataTypes.FLOAT() → FLOAT

329

DataTypes.DOUBLE() → DOUBLE

330

DataTypes.STRING() → BINARY (UTF8)

331

DataTypes.BYTES() → BINARY

332

333

// Temporal types

334

DataTypes.DATE() → INT32 (DATE)

335

DataTypes.TIME() → INT32 (TIME_MILLIS)

336

DataTypes.TIMESTAMP(3) → INT64 (TIMESTAMP_MILLIS)

337

DataTypes.TIMESTAMP(6) → INT64 (TIMESTAMP_MICROS)

338

339

// Decimal types

340

DataTypes.DECIMAL(p,s) → FIXED_LEN_BYTE_ARRAY or INT32/INT64 (DECIMAL)

341

342

// Complex types

343

DataTypes.ARRAY(T) → LIST with element type conversion

344

DataTypes.MAP(K,V) → MAP with key/value type conversion

345

DataTypes.ROW(...) → GROUP with nested field conversions

346

```

347

348

## Performance Optimization

349

350

### Batch Size Tuning

351

352

```java

353

// Adjust batch size based on memory and performance requirements

354

int batchSize = calculateOptimalBatchSize(

355

availableMemory,

356

numberOfColumns,

357

avgRowSize

358

);

359

360

ParquetColumnarRowInputFormat<FileSourceSplit> optimizedFormat =

361

ParquetColumnarRowInputFormat.createPartitionedFormat(

362

conf, rowType, typeInfo, partitions, extractor,

363

batchSize, // Tuned batch size

364

utcTime, caseSensitive

365

);

366

```

367

368

### Memory Management

369

370

```java

371

// Configure Parquet memory settings

372

Configuration conf = new Configuration();

373

conf.set("parquet.memory.min.chunk.size", "1048576"); // 1MB

374

conf.set("parquet.memory.pool.ratio", "0.95"); // 95% of available memory

375

conf.set("parquet.page.size", "1048576"); // 1MB pages

376

conf.set("parquet.block.size", "134217728"); // 128MB blocks

377

```

378

379

The RowData integration provides the most efficient path for Table API operations by directly using Flink's internal row representation without additional serialization overhead.