
# Flink SQL Parquet

Apache Flink SQL Parquet provides comprehensive support for reading and writing Parquet files in Flink SQL applications. This package bundles Parquet format libraries with proper shading configuration to avoid dependency conflicts in SQL client environments, supporting both batch and streaming data processing workflows.

## Package Information

- **Package Name**: flink-sql-parquet_2.12
- **Package Type**: maven
- **Language**: Java
- **Installation**: Maven dependency `org.apache.flink:flink-sql-parquet_2.12:1.14.6`

## Core Imports

```java
import org.apache.flink.formats.parquet.ParquetFileFormatFactory;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters;
```

## Basic Usage

```java
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.data.RowData;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.hadoop.conf.Configuration;

// Create a Parquet writer factory for RowData
RowType rowType = RowType.of(/* field types */);
Configuration hadoopConfig = new Configuration();
boolean utcTimezone = false;

ParquetWriterFactory<RowData> writerFactory =
    ParquetRowDataBuilder.createWriterFactory(rowType, hadoopConfig, utcTimezone);

// Use with Flink's file sink
Path outputPath = new Path("/output/path");
FileSink<RowData> sink = FileSink
    .forBulkFormat(outputPath, writerFactory)
    .build();
```

## Architecture

Flink SQL Parquet is organized around several key components:

- **Format Factory**: SQL table factory integration for automatic format detection
- **Writer APIs**: Multiple writer implementations for different data types (RowData, Avro, Protobuf)
- **Input Formats**: Vectorized and columnar input formats for high-performance reading
- **Schema Conversion**: Utilities for converting between Flink and Parquet schemas
- **Vectorized Processing**: Column-oriented data processing for improved performance

## Capabilities

### Format Factory Integration

SQL table factory that automatically integrates Parquet format with Flink's table ecosystem. Provides transparent support for CREATE TABLE statements with Parquet format.

```java { .api }
public class ParquetFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {
    public BulkDecodingFormat<RowData> createDecodingFormat(
        DynamicTableFactory.Context context,
        ReadableConfig formatOptions
    );
    public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(
        DynamicTableFactory.Context context,
        ReadableConfig formatOptions
    );
    public String factoryIdentifier();
}
```

[Format Factory Integration](./format-factory.md)

### RowData Writer APIs

High-performance writers for Flink's internal RowData format with comprehensive schema mapping and configuration support.

```java { .api }
public class ParquetRowDataBuilder extends ParquetWriter.Builder<RowData, ParquetRowDataBuilder> {
    public static ParquetWriterFactory<RowData> createWriterFactory(
        RowType rowType,
        Configuration conf,
        boolean utcTimestamp
    );
}

public class ParquetWriterFactory<T> implements BulkWriter.Factory<T> {
    public ParquetWriterFactory(ParquetBuilder<T> writerBuilder);
    public BulkWriter<T> create(FSDataOutputStream stream) throws IOException;
}
```

[RowData Writers](./rowdata-writers.md)

### Avro Integration

Complete Avro integration supporting specific records, generic records, and reflection-based serialization with full schema compatibility.

```java { .api }
public class ParquetAvroWriters {
    public static <T extends SpecificRecordBase> ParquetWriterFactory<T> forSpecificRecord(Class<T> type);
    public static ParquetWriterFactory<GenericRecord> forGenericRecord(Schema schema);
    public static <T> ParquetWriterFactory<T> forReflectRecord(Class<T> type);
}
```

[Avro Integration](./avro-integration.md)
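For context, a minimal sketch of wiring one of these writers into a streaming file sink; the `SensorReading` POJO and output path are illustrative, not part of this package:

```java
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;

public class AvroParquetSinkExample {

    // Hypothetical POJO; forReflectRecord derives its Avro schema via reflection
    public static class SensorReading {
        public long sensorId;
        public double temperature;
    }

    public static ParquetWriterFactory<SensorReading> createFactory() {
        // No generated Avro classes needed for reflection-based records
        return ParquetAvroWriters.forReflectRecord(SensorReading.class);
    }

    public static FileSink<SensorReading> createSink() {
        // Bulk-encoded sinks roll part files on checkpoints
        return FileSink
                .forBulkFormat(new Path("/output/avro-parquet"), createFactory())
                .build();
    }
}
```

`forSpecificRecord` and `forGenericRecord` slot into the same `forBulkFormat` call when generated classes or an explicit `Schema` are available.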

### Protobuf Integration

Seamless Protocol Buffers integration for writing Protobuf messages to Parquet format with automatic schema generation.

```java { .api }
public class ParquetProtoWriters {
    public static <T extends Message> ParquetWriterFactory<T> forType(Class<T> type);
}
```

[Protobuf Integration](./protobuf-integration.md)

### Vectorized Input Formats

High-performance columnar input formats optimized for analytical workloads with vectorized processing and partition support.

```java { .api }
public class ParquetColumnarRowInputFormat<SplitT extends FileSourceSplit>
        extends ParquetVectorizedInputFormat<RowData, SplitT> {

    public static <SplitT extends FileSourceSplit> ParquetColumnarRowInputFormat<SplitT> createPartitionedFormat(
        Configuration hadoopConfig,
        RowType producedRowType,
        List<String> partitionKeys,
        PartitionFieldExtractor<SplitT> extractor,
        int batchSize,
        boolean isUtcTimestamp,
        boolean isCaseSensitive
    );
}
```

[Vectorized Input](./vectorized-input.md)
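A sketch of plugging the columnar format into Flink's `FileSource`; the field names, batch size, and the no-op extractor lambda are illustrative assumptions, and exact package locations may differ between Flink versions:

```java
import java.util.Collections;

import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.conf.Configuration;

public class ParquetSourceExample {

    public static FileSource<RowData> createSource() {
        RowType producedType = RowType.of(
                new LogicalType[] {new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH)},
                new String[] {"id", "name"});

        ParquetColumnarRowInputFormat<FileSourceSplit> format =
                ParquetColumnarRowInputFormat.createPartitionedFormat(
                        new Configuration(),                    // Hadoop configuration
                        producedType,                           // columns to read
                        Collections.emptyList(),                // no partition keys in this sketch
                        (split, fieldName, fieldType) -> null,  // extractor unused without partition keys
                        500,                                    // rows per vectorized batch
                        true,                                   // read timestamps as UTC
                        true);                                  // case-sensitive column matching

        return FileSource
                .forBulkFileFormat(format, new Path("/input/parquet"))
                .build();
    }
}
```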

### Schema Utilities

Utilities for converting between Flink logical types and Parquet schema definitions with full type mapping support.

```java { .api }
public class ParquetSchemaConverter {
    public static MessageType convertToParquetMessageType(String name, RowType rowType);
}

public class SerializableConfiguration {
    public SerializableConfiguration(Configuration conf);
    public Configuration conf();
}
```

[Schema Utilities](./schema-utilities.md)
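A small sketch of the converter in isolation; the schema name and field layout are illustrative, and the class is assumed to live under `org.apache.flink.formats.parquet.utils`:

```java
import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.parquet.schema.MessageType;

public class SchemaConversionExample {

    public static MessageType convert() {
        RowType rowType = RowType.of(
                new LogicalType[] {new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH)},
                new String[] {"id", "name"});

        // Map the Flink row schema onto an equivalent Parquet message type
        return ParquetSchemaConverter.convertToParquetMessageType("flink_schema", rowType);
    }
}
```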

## Configuration Options

```java { .api }
public static final ConfigOption<Boolean> UTC_TIMEZONE =
    key("utc-timezone")
        .booleanType()
        .defaultValue(false)
        .withDescription("Use UTC timezone or local timezone to the conversion between epoch" +
            " time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x" +
            " use UTC timezone");
```

## Common Patterns

### Creating a Table with Parquet Format

```sql
CREATE TABLE my_parquet_table (
    id BIGINT,
    name STRING,
    timestamp_col TIMESTAMP(3)
) WITH (
    'connector' = 'filesystem',
    'path' = '/path/to/parquet/files',
    'format' = 'parquet',
    'parquet.utc-timezone' = 'true'
);
```

### Programmatic Writer Creation

```java
// For RowData
ParquetWriterFactory<RowData> factory = ParquetRowDataBuilder.createWriterFactory(
    rowType, hadoopConfig, true
);

// For Avro specific records
ParquetWriterFactory<MyAvroRecord> avroFactory =
    ParquetAvroWriters.forSpecificRecord(MyAvroRecord.class);

// For Protobuf messages
ParquetWriterFactory<MyProtoMessage> protoFactory =
    ParquetProtoWriters.forType(MyProtoMessage.class);
```

## Types

```java { .api }
@FunctionalInterface
public interface ParquetBuilder<T> extends Serializable {
    ParquetWriter<T> createWriter(OutputFile out) throws IOException;
}

public class ParquetBulkWriter<T> implements BulkWriter<T> {
    public ParquetBulkWriter(ParquetWriter<T> parquetWriter);
    public void addElement(T datum) throws IOException;
    public void flush();
    public void finish() throws IOException;
}

public abstract class ParquetVectorizedInputFormat<T, SplitT extends FileSourceSplit>
        implements BulkFormat<T, SplitT> {
    // Abstract base class for vectorized input formats
}
```
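Because `ParquetBuilder` is a functional interface, a `ParquetWriterFactory` can be assembled from a lambda around any Parquet writer. A sketch using Parquet's `AvroParquetWriter`; the schema and codec choice are illustrative:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.parquet.ParquetBuilder;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class CustomBuilderExample {

    public static ParquetWriterFactory<GenericRecord> createFactory() {
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Event\","
                        + "\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}");

        // ParquetBuilder#createWriter(OutputFile) implemented as a lambda
        ParquetBuilder<GenericRecord> builder = out -> AvroParquetWriter
                .<GenericRecord>builder(out)
                .withSchema(schema)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .build();

        return new ParquetWriterFactory<>(builder);
    }
}
```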