# Format Factory Integration

The Parquet format factory integrates with Flink's table ecosystem, enabling automatic format discovery and configuration for SQL table definitions.

## Capabilities

### ParquetFileFormatFactory

Main factory class that implements both bulk reader and writer format factories, providing complete integration with Flink's dynamic table system.

```java { .api }
/**
 * Parquet format factory for file system connector integration.
 * Implements both reading and writing capabilities for SQL tables.
 */
public class ParquetFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {

    /**
     * Creates a bulk decoding format for reading Parquet files.
     *
     * @param context Dynamic table factory context with catalog information
     * @param formatOptions Configuration options for the format
     * @return BulkDecodingFormat for RowData processing
     */
    public BulkDecodingFormat<RowData> createDecodingFormat(
            DynamicTableFactory.Context context,
            ReadableConfig formatOptions);

    /**
     * Creates an encoding format for writing Parquet files.
     *
     * @param context Dynamic table factory context with catalog information
     * @param formatOptions Configuration options for the format
     * @return EncodingFormat for BulkWriter factory creation
     */
    public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(
            DynamicTableFactory.Context context,
            ReadableConfig formatOptions);

    /**
     * Returns the unique identifier for this format factory.
     *
     * @return "parquet" identifier string
     */
    public String factoryIdentifier();

    /**
     * Returns the set of required configuration options.
     *
     * @return empty set (no required options)
     */
    public Set<ConfigOption<?>> requiredOptions();

    /**
     * Returns the set of optional configuration options.
     *
     * @return set containing UTC_TIMEZONE and other optional configs
     */
    public Set<ConfigOption<?>> optionalOptions();
}
```

### Configuration Options

```java { .api }
/**
 * Controls timezone handling for timestamp conversion.
 * Default: false (use local timezone).
 * When true: use UTC timezone for epoch time to LocalDateTime conversion.
 */
public static final ConfigOption<Boolean> UTC_TIMEZONE =
        key("utc-timezone")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        "Use UTC timezone or local timezone to the conversion between epoch time and LocalDateTime. "
                                + "Hive 0.x/1.x/2.x use local timezone. But Hive 3.x use UTC timezone");
```
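
The effect of this flag can be illustrated with plain `java.time` conversions. The sketch below shows the conversion the option controls; the class and method names are invented for illustration, not the factory's internal code:

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class TimestampConversionSketch {

    // Hypothetical helper mirroring what utc-timezone controls: which zone is
    // used when turning an epoch timestamp into a LocalDateTime.
    static LocalDateTime toLocalDateTime(long epochMillis, boolean utcTimezone) {
        ZoneId zone = utcTimezone ? ZoneOffset.UTC : ZoneId.systemDefault();
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), zone);
    }

    public static void main(String[] args) {
        long epochMillis = 1672531200000L; // 2023-01-01T00:00:00Z

        // utc-timezone = 'true': the wall-clock value is taken in UTC
        System.out.println(toLocalDateTime(epochMillis, true)); // 2023-01-01T00:00

        // utc-timezone = 'false' (default): shifted by the local UTC offset
        System.out.println(toLocalDateTime(epochMillis, false));
    }
}
```

With the default `false`, the same Parquet file can read back different wall-clock values on machines in different timezones, which is why mixed Hive-version deployments need this option set consistently.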

## Usage Examples

### SQL Table Definition

```sql
-- Create a Parquet table with UTC timezone handling
CREATE TABLE orders (
    order_id BIGINT,
    customer_name STRING,
    order_date TIMESTAMP(3),
    amount DECIMAL(10, 2)
) WITH (
    'connector' = 'filesystem',
    'path' = '/data/orders',
    'format' = 'parquet',
    'parquet.utc-timezone' = 'true'
);

-- Query the table (the format factory handles Parquet reading automatically)
SELECT customer_name, SUM(amount)
FROM orders
WHERE order_date >= TIMESTAMP '2023-01-01 00:00:00'
GROUP BY customer_name;
```

### Programmatic Table Creation

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tableEnv =
        TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// The format factory is automatically discovered via the service loader
tableEnv.executeSql("""
        CREATE TABLE parquet_table (
            id BIGINT,
            name STRING,
            created_at TIMESTAMP(3)
        ) WITH (
            'connector' = 'filesystem',
            'path' = '/path/to/data',
            'format' = 'parquet',
            'parquet.utc-timezone' = 'false'
        )
        """);
```

### Service Registration

The format factory is automatically registered via Java's ServiceLoader mechanism:

```
# File: META-INF/services/org.apache.flink.table.factories.Factory
org.apache.flink.formats.parquet.ParquetFileFormatFactory
```
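
At lookup time, the `'format' = 'parquet'` table option is matched against each discovered factory's `factoryIdentifier()`. The following is a simplified, self-contained sketch of that selection step, with a hypothetical minimal `Factory` interface standing in for Flink's; the real lookup uses `ServiceLoader` and additional validation:

```java
import java.util.List;

public class FactoryLookupSketch {

    // Hypothetical stand-in for Flink's Factory interface
    interface Factory {
        String factoryIdentifier();
    }

    // In Flink, this list would come from ServiceLoader discovery of all
    // factories registered under META-INF/services
    static final List<Factory> DISCOVERED = List.of(
            () -> "csv",
            () -> "parquet");

    // Select the factory whose identifier matches the 'format' option
    static String lookup(String format) {
        return DISCOVERED.stream()
                .map(Factory::factoryIdentifier)
                .filter(id -> id.equals(format))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException(
                        "No factory found for format: " + format));
    }

    public static void main(String[] args) {
        System.out.println(lookup("parquet")); // parquet
    }
}
```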

## Internal Implementation Details

The factory creates different decoders and encoders based on the table context:

- **Decoding**: Creates `ParquetColumnarRowInputFormat` with vectorized reading
- **Encoding**: Creates `ParquetRowDataBuilder` with RowData writing support
- **Partitioning**: Automatically handles partitioned tables with proper field extraction
- **Schema Conversion**: Converts Flink logical types to Parquet schema format
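
As a rough illustration of the schema-conversion step, each Flink logical type maps onto a Parquet primitive type, sometimes with a logical-type annotation. The mapping below is a simplified, hypothetical subset for illustration only; the real converter also handles precision, nullability, and nested types:

```java
import java.util.Map;

public class SchemaMappingSketch {

    // Hypothetical simplified mapping; Flink's actual converter covers many
    // more types and attaches logical-type annotations and repetition levels
    static final Map<String, String> FLINK_TO_PARQUET = Map.of(
            "BOOLEAN", "boolean",
            "INT", "int32",
            "BIGINT", "int64",
            "FLOAT", "float",
            "DOUBLE", "double",
            "STRING", "binary (UTF8)");

    public static void main(String[] args) {
        System.out.println(FLINK_TO_PARQUET.get("BIGINT")); // int64
    }
}
```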

## Error Handling

The format factory handles common configuration errors:

- Invalid timezone configuration values
- Incompatible data types for the Parquet format
- Missing required Hadoop configuration
- Schema conversion failures between Flink and Parquet types
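
For example, a non-boolean value for `utc-timezone` is rejected when the option is parsed. The helper below is a hypothetical sketch of that kind of strict check, not Flink's actual validation code:

```java
public class OptionValidationSketch {

    // Hypothetical strict boolean parsing, similar in spirit to Flink's
    // configuration validation: anything other than "true"/"false" fails
    static boolean parseBooleanOption(String key, String value) {
        if ("true".equalsIgnoreCase(value)) {
            return true;
        }
        if ("false".equalsIgnoreCase(value)) {
            return false;
        }
        throw new IllegalArgumentException(
                "Invalid value '" + value + "' for option '" + key + "'");
    }

    public static void main(String[] args) {
        System.out.println(parseBooleanOption("utc-timezone", "true")); // true
        try {
            parseBooleanOption("utc-timezone", "yes");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```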