or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

avro-integration.md, index.md, protobuf-integration.md, rowdata-integration.md, table-integration.md, utilities.md, vectorized-reading.md, writing-support.md

docs/table-integration.md

0

# Table Integration

1

2

Primary integration point for Flink's Table API, providing format factories for reading and writing Parquet files with comprehensive configuration options and statistics support.

3

4

## Capabilities

5

6

### ParquetFileFormatFactory

7

8

Main factory class implementing both bulk reader and writer format factories for seamless Table API integration.

9

10

```java { .api }

11

/**

12

* Parquet format factory for file system connectors

13

*/

14

public class ParquetFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {

15

16

/**

17

* Creates a bulk decoding format for reading Parquet files

18

* @param context Dynamic table factory context

19

* @param formatOptions Configuration options

20

* @return BulkDecodingFormat for reading RowData from Parquet files

21

*/

22

public BulkDecodingFormat<RowData> createDecodingFormat(

23

DynamicTableFactory.Context context,

24

ReadableConfig formatOptions

25

);

26

27

/**

28

* Creates an encoding format for writing Parquet files

29

* @param context Dynamic table factory context

30

* @param formatOptions Configuration options

31

* @return EncodingFormat for writing RowData to Parquet files

32

*/

33

public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(

34

DynamicTableFactory.Context context,

35

ReadableConfig formatOptions

36

);

37

38

/**

39

* Returns the format identifier for this factory

40

* @return "parquet"

41

*/

42

public String factoryIdentifier();

43

44

/**

45

* Required configuration options (none for Parquet)

46

* @return Empty set

47

*/

48

public Set<ConfigOption<?>> requiredOptions();

49

50

/**

51

* Optional configuration options

52

* @return Set of supported configuration options

53

*/

54

public Set<ConfigOption<?>> optionalOptions();

55

}

56

```

57

58

### Configuration Options

59

60

Essential configuration options for controlling Parquet reading and writing behavior.

61

62

```java { .api }

63

/**

64

* Format identifier constant

65

*/

66

public static final String IDENTIFIER = "parquet";

67

68

/**

69

* Use UTC timezone or local timezone for timestamp conversion

70

* Default: false (use local timezone)

71

*/

72

public static final ConfigOption<Boolean> UTC_TIMEZONE =

73

key("utc-timezone")

74

.booleanType()

75

.defaultValue(false);

76

77

/**

78

* Time unit for storing Parquet timestamps

79

* Values: "nanos", "micros", "millis"

80

* Default: "micros"

81

*/

82

public static final ConfigOption<String> TIMESTAMP_TIME_UNIT =

83

key("timestamp.time.unit")

84

.stringType()

85

.defaultValue("micros");

86

87

/**

88

* Write timestamps as int64/LogicalTypes instead of int96/OriginalTypes

89

* Default: false (use int96 format)

90

*/

91

public static final ConfigOption<Boolean> WRITE_INT64_TIMESTAMP =

92

key("write.int64.timestamp")

93

.booleanType()

94

.defaultValue(false);

95

96

/**

97

* Batch size for vectorized reading

98

* Default: 2048 rows per batch

99

*/

100

public static final ConfigOption<Integer> BATCH_SIZE =

101

key("batch-size")

102

.intType()

103

.defaultValue(2048);

104

```

105

106

### ParquetBulkDecodingFormat

107

108

Bulk decoding format implementation with statistics reporting and projection support.

109

110

```java { .api }

111

/**

112

* Bulk decoding format for reading Parquet files with statistics support

113

*/

114

public static class ParquetBulkDecodingFormat

115

implements ProjectableDecodingFormat<BulkFormat<RowData, FileSourceSplit>>,

116

BulkDecodingFormat<RowData>,

117

FileBasedStatisticsReportableInputFormat {

118

119

/**

120

* Creates runtime decoder with projection support

121

* @param sourceContext Table source context

122

* @param producedDataType Output data type

123

* @param projections Column projections

124

* @return BulkFormat for reading projected RowData

125

*/

126

public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(

127

DynamicTableSource.Context sourceContext,

128

DataType producedDataType,

129

int[][] projections

130

);

131

132

/**

133

* Returns supported changelog mode (insert-only)

134

* @return ChangelogMode.insertOnly()

135

*/

136

public ChangelogMode getChangelogMode();

137

138

/**

139

* Reports table statistics from Parquet file metadata

140

* @param files List of Parquet files to analyze

141

* @param producedDataType Expected output data type

142

* @return TableStats with row counts and column statistics

143

*/

144

public TableStats reportStatistics(List<Path> files, DataType producedDataType);

145

}

146

```

147

148

## Usage Examples

149

150

### Basic Table Creation

151

152

```java

153

import org.apache.flink.table.api.TableEnvironment;

154

155

TableEnvironment tableEnv = TableEnvironment.create(settings);

156

157

// Create table with Parquet format

158

tableEnv.executeSql("""

159

CREATE TABLE orders (

160

order_id BIGINT,

161

customer_id STRING,

162

product_name STRING,

163

quantity INT,

164

price DECIMAL(10,2),

165

order_date TIMESTAMP(3)

166

) WITH (

167

'connector' = 'filesystem',

168

'path' = '/data/orders',

169

'format' = 'parquet'

170

)

171

""");

172

```

173

174

### Advanced Configuration

175

176

```java

177

// Table with custom Parquet settings

178

tableEnv.executeSql("""

179

CREATE TABLE events (

180

event_id BIGINT,

181

timestamp_col TIMESTAMP(3),

182

payload STRING

183

) WITH (

184

'connector' = 'filesystem',

185

'path' = '/data/events',

186

'format' = 'parquet',

187

'parquet.utc-timezone' = 'true',

188

'parquet.timestamp.time.unit' = 'nanos',

189

'parquet.write.int64.timestamp' = 'true',

190

'parquet.batch-size' = '4096'

191

)

192

""");

193

```

194

195

### Reading with Projections

196

197

```java

198

// Only read specific columns for better performance

199

Table result = tableEnv.sqlQuery("""

200

SELECT order_id, customer_id, price

201

FROM orders

202

WHERE order_date >= TIMESTAMP '2023-01-01 00:00:00'

203

""");

204

```

205

206

### Partition Support

207

208

```java

209

// Partitioned Parquet table

210

tableEnv.executeSql("""

211

CREATE TABLE sales_partitioned (

212

transaction_id BIGINT,

213

amount DECIMAL(10,2),

214

product_category STRING,

215

sale_date DATE

216

) PARTITIONED BY (sale_date) WITH (

217

'connector' = 'filesystem',

218

'path' = '/data/sales',

219

'format' = 'parquet',

220

'sink.partition-commit.policy.kind' = 'success-file'

221

)

222

""");

223

```

224

225

## Statistics and Performance

226

227

The Parquet format factory automatically extracts statistics from Parquet file metadata:

228

229

- **Row counts**: Exact counts from file metadata

230

- **Column statistics**: Min/max values, null counts where available

231

- **File-level metrics**: Used for query planning and optimization

232

- **Projection pushdown**: Only reads required columns from storage

233

- **Predicate pushdown**: Filters applied at file level when possible

234

235

This enables Flink's cost-based optimizer to make intelligent decisions about query execution plans.