# Apache Flink Parquet Format


Apache Flink Parquet format support providing high-performance columnar file reading and writing capabilities for both batch and streaming applications. This library enables efficient processing of Apache Parquet files within the Flink ecosystem, offering vectorized reading, multiple serialization format support, and seamless integration with Flink's Table API and DataStream API.


## Package Information

- **Package Name**: flink-parquet
- **Package Type**: Maven
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-parquet
- **Language**: Java
- **Installation**: Add to Maven pom.xml:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet</artifactId>
    <version>2.1.0</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.formats.parquet.ParquetFileFormatFactory;
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.ParquetBuilder;
```

For Avro integration:

```java
import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
```

For RowData integration:

```java
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
```

For Protocol Buffers integration:

```java
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters;
```

## Basic Usage

### Reading Parquet Files

```java
import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;

// Read Avro records from Parquet
FileSource<MyAvroRecord> source = FileSource
    .forRecordStreamFormat(
        AvroParquetReaders.forSpecificRecord(MyAvroRecord.class),
        new Path("path/to/parquet/files")
    )
    .build();

DataStream<MyAvroRecord> stream = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "parquet-source"
);
```

### Writing Parquet Files

```java
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
import org.apache.flink.connector.file.sink.FileSink;

// Write Avro records to Parquet
FileSink<MyAvroRecord> sink = FileSink
    .forBulkFormat(
        new Path("path/to/output"),
        AvroParquetWriters.forSpecificRecord(MyAvroRecord.class)
    )
    .build();

stream.sinkTo(sink);
```

### Table API Integration

```java
import org.apache.flink.table.api.TableEnvironment;

// Create Parquet table
tableEnv.executeSql("""
    CREATE TABLE parquet_table (
        id BIGINT,
        name STRING,
        timestamp_col TIMESTAMP(3)
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'path/to/parquet/files',
        'format' = 'parquet'
    )
    """);
```

## Architecture

The Flink Parquet module is built around several key architectural components:

- **Format Factory**: `ParquetFileFormatFactory` provides the main entry point for Table API integration, creating bulk reading and writing formats
- **Vectorized Reading**: High-performance columnar readers that process multiple rows per batch for improved throughput
- **Multi-Format Support**: Seamless integration with Avro, Protocol Buffers, and Flink's native RowData serialization
- **Schema Conversion**: Automatic conversion between Flink types and Parquet schemas, with support for nested data structures
- **Statistics Integration**: Built-in support for collecting and reporting file-level statistics for query optimization

The design enables efficient large-scale data processing by leveraging Parquet's columnar storage format while maintaining full compatibility with Flink's streaming and batch processing capabilities.

## Capabilities

### Table API Format Factory

Primary integration point for Flink's Table API, providing format factories for reading and writing Parquet files with comprehensive configuration options.

```java { .api }
public class ParquetFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {
    public BulkDecodingFormat<RowData> createDecodingFormat(
        DynamicTableFactory.Context context,
        ReadableConfig formatOptions
    );

    public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(
        DynamicTableFactory.Context context,
        ReadableConfig formatOptions
    );
}
```

[Table Integration](./table-integration.md)

### Writing Support

Factory-based writers for creating Parquet files from various data formats, with support for custom ParquetWriter configurations and bulk writing operations.

```java { .api }
public class ParquetWriterFactory<T> implements BulkWriter.Factory<T> {
    public ParquetWriterFactory(ParquetBuilder<T> writerBuilder);
    public BulkWriter<T> create(FSDataOutputStream out) throws IOException;
}

@FunctionalInterface
public interface ParquetBuilder<T> {
    ParquetWriter<T> createWriter(OutputFile out) throws IOException;
}
```

[Writing Support](./writing-support.md)
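Because `ParquetBuilder` is a functional interface, a lambda is enough to plug a fully customized `ParquetWriter` into the factory. A minimal sketch using the Avro writer from parquet-avro; the schema literal and the SNAPPY compression choice are illustrative assumptions, not defaults of this module:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.parquet.ParquetBuilder;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.OutputFile;

// An illustrative Avro schema; in practice this usually comes from an .avsc file.
Schema schema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"long\"},"
        + "{\"name\":\"name\",\"type\":\"string\"}]}");

// The lambda receives the Parquet OutputFile and returns a configured writer.
ParquetBuilder<GenericRecord> builder = (OutputFile out) ->
    AvroParquetWriter.<GenericRecord>builder(out)
        .withSchema(schema)
        .withCompressionCodec(CompressionCodecName.SNAPPY) // illustrative choice
        .build();

ParquetWriterFactory<GenericRecord> factory = new ParquetWriterFactory<>(builder);
```

The factory is serializable and can be handed to `FileSink.forBulkFormat` like any other `BulkWriter.Factory`.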

### Avro Integration

Specialized readers and writers for Apache Avro records stored in Parquet format, supporting SpecificRecord, GenericRecord, and reflection-based serialization.

```java { .api }
public class AvroParquetReaders {
    public static <T extends SpecificRecordBase> StreamFormat<T> forSpecificRecord(Class<T> typeClass);
    public static StreamFormat<GenericRecord> forGenericRecord(Schema schema);
    public static <T> StreamFormat<T> forReflectRecord(Class<T> typeClass);
}

public class AvroParquetWriters {
    public static <T extends SpecificRecordBase> ParquetWriterFactory<T> forSpecificRecord(Class<T> type);
    public static ParquetWriterFactory<GenericRecord> forGenericRecord(Schema schema);
    public static <T> ParquetWriterFactory<T> forReflectRecord(Class<T> type);
}
```

[Avro Integration](./avro-integration.md)
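Unlike the SpecificRecord path shown under Basic Usage, `forGenericRecord` needs no generated classes, only a schema. A hedged sketch; the schema literal here is illustrative:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetReaders;

// Illustrative schema; typically loaded from a registry or .avsc file.
Schema schema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"long\"},"
        + "{\"name\":\"name\",\"type\":\"string\"}]}");

// Read untyped GenericRecord instances from Parquet files.
FileSource<GenericRecord> source = FileSource
    .forRecordStreamFormat(
        AvroParquetReaders.forGenericRecord(schema),
        new Path("path/to/parquet/files"))
    .build();
```

Field values are then accessed by name, e.g. `record.get("id")`, at the cost of compile-time type safety.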

### RowData Integration

Native Flink RowData support for optimal performance in Table API and SQL operations, with automatic schema conversion and type mapping.

```java { .api }
public class ParquetRowDataBuilder {
    public static ParquetWriterFactory<RowData> createWriterFactory(
        RowType rowType,
        Configuration conf,
        boolean utcTimestamp
    );
}

public class ParquetColumnarRowInputFormat<SplitT> extends ParquetVectorizedInputFormat<RowData, SplitT>
        implements FileBasedStatisticsReportableInputFormat {
    public static ParquetColumnarRowInputFormat<FileSourceSplit> createPartitionedFormat(
        Configuration conf,
        RowType producedRowType,
        TypeInformation<RowData> producedTypeInfo,
        List<String> partitionKeys,
        String defaultPartName,
        int batchSize,
        boolean utcTimestamp,
        boolean caseSensitive
    );
}
```

[RowData Integration](./rowdata-integration.md)
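A sketch of wiring `createWriterFactory` into a `FileSink` for RowData; the two-column row schema is an illustrative assumption:

```java
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.conf.Configuration;

// Illustrative row schema: (id BIGINT, name STRING).
RowType rowType = RowType.of(
    new LogicalType[] {new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH)},
    new String[] {"id", "name"});

ParquetWriterFactory<RowData> factory = ParquetRowDataBuilder.createWriterFactory(
    rowType,
    new Configuration(), // Hadoop configuration carrying Parquet writer options
    true);               // utcTimestamp: write timestamps in UTC

FileSink<RowData> sink = FileSink
    .forBulkFormat(new Path("path/to/output"), factory)
    .build();
```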

### Protocol Buffers Integration

Native support for Google Protocol Buffers messages stored in Parquet format, enabling efficient serialization of strongly-typed protobuf data.

```java { .api }
public class ParquetProtoWriters {
    public static <T extends Message> ParquetWriterFactory<T> forType(Class<T> type);
}
```

[Protocol Buffers Integration](./protobuf-integration.md)
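`forType` can be dropped straight into a `FileSink`, mirroring the Avro writing example above. A sketch in which `MyProtoMessage` is a hypothetical class generated by `protoc`:

```java
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters;

// MyProtoMessage is a placeholder for a protoc-generated Message subclass.
FileSink<MyProtoMessage> sink = FileSink
    .forBulkFormat(
        new Path("path/to/output"),
        ParquetProtoWriters.forType(MyProtoMessage.class))
    .build();

stream.sinkTo(sink);
```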

### Vectorized Reading

High-performance vectorized readers that process data in columnar batches, supporting various column types and nested data structures for optimal throughput.

```java { .api }
public abstract class ParquetVectorizedInputFormat<T, SplitT> implements FileInputFormat<T, SplitT> {
    public RecordReaderIterator<T> createReader(Configuration config, SplitT split) throws IOException;
    public boolean isSplittable();
}

@FunctionalInterface
public interface ColumnBatchFactory<SplitT> {
    VectorizedColumnBatch create(SplitT split, ColumnVector[] vectors);
    static ColumnBatchFactory<FileSourceSplit> withoutExtraFields();
}
```

[Vectorized Reading](./vectorized-reading.md)

### Utilities and Schema Conversion

Utility classes for schema conversion, configuration management, and statistics reporting, enabling seamless integration between Flink and Parquet type systems.

```java { .api }
public class ParquetSchemaConverter {
    public static MessageType convertToParquetMessageType(
        String name,
        RowType rowType,
        Configuration conf
    );

    public static Type convertToParquetType(
        String name,
        LogicalType logicalType,
        Configuration conf
    );
}

public class SerializableConfiguration implements Serializable {
    public SerializableConfiguration(Configuration configuration);
    public Configuration conf();
}
```

[Utilities](./utilities.md)

## Configuration Options

```java { .api }
// Format factory configuration constants
public static final ConfigOption<Boolean> UTC_TIMEZONE;          // default: false
public static final ConfigOption<String> TIMESTAMP_TIME_UNIT;    // default: "micros"
public static final ConfigOption<Boolean> WRITE_INT64_TIMESTAMP; // default: false
public static final ConfigOption<Integer> BATCH_SIZE;            // default: 2048
```
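In Table API DDL these options are passed through the `WITH` clause using a `parquet.` prefix. A sketch; the option key spelling (`parquet.utc-timezone`) should be verified against the documentation of your Flink version:

```sql
CREATE TABLE parquet_table (
    id BIGINT,
    name STRING
) WITH (
    'connector' = 'filesystem',
    'path' = 'path/to/parquet/files',
    'format' = 'parquet',
    'parquet.utc-timezone' = 'true'
);
```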

## Common Types

### BulkWriter Factory

```java { .api }
public interface BulkWriter<T> {
    void addElement(T element) throws IOException;
    void flush() throws IOException;
    void finish() throws IOException;

    interface Factory<T> extends Serializable {
        BulkWriter<T> create(FSDataOutputStream out) throws IOException;
    }
}
```

### Stream Format

```java { .api }
public interface StreamFormat<T> extends Serializable {
    Reader<T> createReader(Configuration config, FileSourceSplit split) throws IOException;
    Reader<T> restoreReader(Configuration config, FileSourceSplit split) throws IOException;
    boolean isSplittable();
    TypeInformation<T> getProducedType();
}
```