
# Table API Integration

The ORC format integrates with Flink's Table API through the `OrcFileFormatFactory`, providing both reading and writing capabilities with the format identifier `"orc"`.

## Format Factory

```java { .api }
public class OrcFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {

    public static final String IDENTIFIER = "orc";

    public String factoryIdentifier();
    public Set<ConfigOption<?>> requiredOptions();
    public Set<ConfigOption<?>> optionalOptions();

    public BulkDecodingFormat<RowData> createDecodingFormat(
        DynamicTableFactory.Context context,
        ReadableConfig formatOptions
    );

    public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(
        DynamicTableFactory.Context context,
        ReadableConfig formatOptions
    );
}
```

## Usage Examples

### Creating ORC Tables

```java
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

// Simple ORC table
tableEnv.executeSql(
    "CREATE TABLE orders (" +
    " order_id BIGINT," +
    " customer_id INT," +
    " product_name STRING," +
    " quantity INT," +
    " price DECIMAL(10,2)," +
    " order_date DATE," +
    " created_at TIMESTAMP(3)" +
    ") WITH (" +
    " 'connector' = 'filesystem'," +
    " 'path' = '/data/orders'," +
    " 'format' = 'orc'" +
    ")"
);
```

### Partitioned ORC Tables

```java
// Partitioned table
tableEnv.executeSql(
    "CREATE TABLE partitioned_orders (" +
    " order_id BIGINT," +
    " customer_id INT," +
    " product_name STRING," +
    " quantity INT," +
    " price DECIMAL(10,2)," +
    " order_date DATE," +
    " created_at TIMESTAMP(3)," +
    " year INT," +
    " month INT" +
    ") PARTITIONED BY (year, month) " +
    "WITH (" +
    " 'connector' = 'filesystem'," +
    " 'path' = '/data/partitioned_orders'," +
    " 'format' = 'orc'" +
    ")"
);
```

### Reading and Writing

```java
// Requires: import static org.apache.flink.table.api.Expressions.$;

// Read from the ORC table
Table orders = tableEnv.from("orders");
Table highValueOrders = orders
    .filter($("price").isGreater(100.0))
    .select($("order_id"), $("customer_id"), $("price"));

// Write to an ORC table
tableEnv.executeSql(
    "INSERT INTO high_value_orders " +
    "SELECT order_id, customer_id, price " +
    "FROM orders " +
    "WHERE price > 100.0"
);
```

## Format Options

### Configuration Properties

The ORC format supports various configuration options through Hadoop Configuration properties:

```java
// Table with ORC-specific options
tableEnv.executeSql(
    "CREATE TABLE configured_orders (" +
    " order_id BIGINT," +
    " data STRING" +
    ") WITH (" +
    " 'connector' = 'filesystem'," +
    " 'path' = '/data/configured_orders'," +
    " 'format' = 'orc'," +
    " 'orc.compress' = 'SNAPPY'," +
    " 'orc.stripe.size' = '67108864'," +
    " 'orc.row.index.stride' = '10000'" +
    ")"
);
```
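Any `'orc.'`-prefixed table option is forwarded to the ORC writer as a Hadoop configuration entry. A minimal sketch of that key-forwarding step, using plain `java.util.Properties` (the class and method names here are hypothetical, for illustration only, not Flink API):

```java
import java.util.Map;
import java.util.Properties;

public class OrcOptionForwarding {
    // Copy all "orc."-prefixed table options into a Properties object,
    // mirroring how format options reach the ORC writer's configuration.
    // Non-ORC options (connector, path, ...) are left behind.
    static Properties extractOrcProperties(Map<String, String> tableOptions) {
        Properties props = new Properties();
        for (Map.Entry<String, String> e : tableOptions.entrySet()) {
            if (e.getKey().startsWith("orc.")) {
                props.setProperty(e.getKey(), e.getValue());
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> options = Map.of(
            "connector", "filesystem",
            "format", "orc",
            "orc.compress", "SNAPPY");
        Properties props = extractOrcProperties(options);
        System.out.println(props.getProperty("orc.compress")); // SNAPPY
        System.out.println(props.containsKey("connector"));    // false
    }
}
```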

### Supported ORC Properties

Common ORC configuration properties that can be used with the `orc.` prefix:

- `orc.compress`: Compression codec (`NONE`, `ZLIB`, `SNAPPY`, `LZO`, `LZ4`, `ZSTD`)
- `orc.stripe.size`: Target stripe size in bytes (default: 67108864, i.e. 64 MB)
- `orc.row.index.stride`: Number of rows between row index entries (default: 10000)
- `orc.create.index`: Whether to create row group indexes (default: true)
- `orc.bloom.filter.columns`: Comma-separated list of columns for which to create bloom filters
- `orc.bloom.filter.fpp`: Bloom filter false positive probability
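For example, bloom filters for point lookups on `customer_id` could be enabled like this (a sketch; the table name and path are illustrative, and the exact set of honored properties depends on the ORC writer version bundled with your Flink distribution):

```sql
CREATE TABLE bloom_orders (
    order_id BIGINT,
    customer_id INT
) WITH (
    'connector' = 'filesystem',
    'path' = '/data/bloom_orders',
    'format' = 'orc',
    'orc.bloom.filter.columns' = 'customer_id',
    'orc.bloom.filter.fpp' = '0.05'
);
```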

## Bulk Decoding Format

```java { .api }
@VisibleForTesting
public static class OrcBulkDecodingFormat
        implements BulkDecodingFormat<RowData>,
                   ProjectableDecodingFormat<BulkFormat<RowData, FileSourceSplit>>,
                   FileBasedStatisticsReportableInputFormat {

    public OrcBulkDecodingFormat(ReadableConfig formatOptions);

    public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
        DynamicTableSource.Context sourceContext,
        DataType producedDataType,
        int[][] projections
    );

    public ChangelogMode getChangelogMode();
    public void applyFilters(List<ResolvedExpression> filters);
    public TableStats reportStatistics(List<Path> files, DataType producedDataType);
}
```

## Features

### Projection Pushdown

The format automatically supports column projection, reading only the required columns:

```java
// Only the columns referenced in the query are read from the ORC files
Table result = tableEnv.sqlQuery(
    "SELECT order_id, price FROM orders WHERE customer_id = 12345"
);
```
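In `createRuntimeDecoder`, the `int[][] projections` argument encodes the selected columns as paths of field indices into the produced row type (for flat schemas, each path is a single top-level index). A rough, self-contained illustration of how such index paths map back to column names (plain Java, not Flink API; the `project` helper is hypothetical):

```java
public class ProjectionDemo {
    // Map projection index paths back to top-level column names.
    // Each inner array is a field path; for flat schemas only the
    // first index of each path is used.
    static String[] project(String[] fieldNames, int[][] projections) {
        String[] selected = new String[projections.length];
        for (int i = 0; i < projections.length; i++) {
            selected[i] = fieldNames[projections[i][0]];
        }
        return selected;
    }

    public static void main(String[] args) {
        String[] schema = {"order_id", "customer_id", "product_name", "quantity", "price"};
        // "SELECT order_id, price" would arrive as projections {{0}, {4}}
        String[] result = project(schema, new int[][] {{0}, {4}});
        System.out.println(String.join(", ", result)); // order_id, price
    }
}
```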

### Filter Pushdown

Supported filter predicates are automatically pushed down to the ORC reader:

```java
// Filters are pushed down to the ORC level
Table result = tableEnv.sqlQuery(
    "SELECT * FROM orders " +
    "WHERE order_date >= DATE '2023-01-01' " +
    "AND price BETWEEN 50.0 AND 500.0 " +
    "AND product_name IS NOT NULL"
);
```

### Statistics Reporting

The format can report table statistics from ORC file metadata:

```java
// Statistics are automatically extracted from ORC files
TableStats stats = bulkDecodingFormat.reportStatistics(files, producedDataType);
```

## Integration with Catalogs

```java
// Using with a Hive catalog
HiveCatalog catalog = new HiveCatalog("hive", "default", "/path/to/hive-conf");
tableEnv.registerCatalog("hive", catalog);
tableEnv.useCatalog("hive");

// Create an ORC table in the Hive catalog
// Note: STORED AS ORC is Hive DDL syntax; it requires the Hive dialect
// (tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE))
tableEnv.executeSql(
    "CREATE TABLE hive.default.orc_table (" +
    " id BIGINT," +
    " name STRING" +
    ") STORED AS ORC " +
    "LOCATION '/warehouse/orc_table'"
);
```