or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md · datastream-api.md · index.md · table-api.md

docs/table-api.md

0

# Table API Integration

1

2

Complete integration with Flink's SQL engine and Table API, providing declarative ORC file access through DDL statements and programmatic table definitions with full support for partitioned tables, filter pushdown, and vectorized processing.

3

4

## Capabilities

5

6

### Format Factory

7

8

The main entry point for integrating ORC format with Flink's Table API and SQL engine.

9

10

```java { .api }

11

/**

12

* Factory for creating ORC format in Flink SQL/Table API contexts.

13

* Provides both reading and writing capabilities with comprehensive configuration options.

14

*/

15

public class OrcFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {

16

/** Format identifier for SQL DDL statements */

17

public static final String IDENTIFIER = "orc";

18

19

/** Returns the unique identifier for this format */

20

public String factoryIdentifier();

21

22

/** Returns the set of required configuration options */

23

public Set<ConfigOption<?>> requiredOptions();

24

25

/** Returns the set of optional configuration options */

26

public Set<ConfigOption<?>> optionalOptions();

27

28

/** Creates a decoding format for reading ORC files */

29

public BulkDecodingFormat<RowData> createDecodingFormat(

30

DynamicTableFactory.Context context,

31

ReadableConfig formatOptions

32

);

33

34

/** Creates an encoding format for writing ORC files */

35

public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(

36

DynamicTableFactory.Context context,

37

ReadableConfig formatOptions

38

);

39

}

40

```

41

42

**Usage Example:**

43

44

```java

45

// SQL DDL for creating ORC table

46

tEnv.executeSql(

47

"CREATE TABLE sales_data (" +

48

" transaction_id BIGINT," +

49

" user_id BIGINT," +

50

" product_id BIGINT," +

51

" amount DECIMAL(10,2)," +

52

" transaction_time TIMESTAMP(3)," +

53

" region STRING" +

54

") PARTITIONED BY (region) " +

55

"WITH (" +

56

" 'connector' = 'filesystem'," +

57

" 'path' = 'hdfs://namenode:port/path/to/sales'," +

58

" 'format' = 'orc'," +

59

" 'orc.compress' = 'snappy'," +

60

" 'orc.stripe.size' = '64MB'" +

61

")"

62

);

63

```

64

65

### Input Format for Bulk Reading

66

67

Specialized input format for reading ORC files with vectorized processing and partition support.

68

69

```java { .api }

70

/**

71

* Abstract base class for ORC input formats providing vectorized reading capabilities

72

* @param <T> The type of records produced by the format

73

* @param <BatchT> The type of batch used internally (e.g., VectorizedRowBatch)

74

* @param <SplitT> The type of input split

75

*/

76

public abstract class AbstractOrcFileInputFormat<T, BatchT, SplitT> implements BulkFormat<T, SplitT> {

77

/**

78

* Constructor for ORC input format

79

* @param filePaths Array of file paths to read

80

* @param schema ORC schema description

81

* @param selectedFields Indices of fields to read (for projection)

82

* @param conjunctPredicates List of predicates for pushdown filtering

83

* @param batchSize Size of vectorized batches

84

* @param orcConfig ORC-specific configuration

85

* @param hadoopConfigWrapper Serializable Hadoop configuration

86

*/

87

public AbstractOrcFileInputFormat(

88

Path[] filePaths,

89

TypeDescription schema,

90

int[] selectedFields,

91

List<Predicate> conjunctPredicates,

92

int batchSize,

93

Configuration orcConfig,

94

SerializableHadoopConfigWrapper hadoopConfigWrapper

95

);

96

97

/** Returns true as ORC files support splitting */

98

public boolean isSplittable();

99

100

/** Abstract method to return the produced type information */

101

public abstract TypeInformation<T> getProducedType();

102

}

103

104

/**

105

* Concrete ORC input format that produces RowData with columnar processing

106

*/

107

public class OrcColumnarRowFileInputFormat<BatchT, SplitT>

108

extends AbstractOrcFileInputFormat<RowData, BatchT, SplitT> {

109

110

/** Constructor for non-partitioned tables */

111

public OrcColumnarRowFileInputFormat(

112

Path[] filePaths,

113

String[] fieldNames,

114

LogicalType[] fieldTypes,

115

int[] selectedFields,

116

List<Predicate> conjunctPredicates,

117

int batchSize,

118

Configuration orcConfig,

119

SerializableHadoopConfigWrapper hadoopConfigWrapper

120

);

121

122

/**

123

* Factory method for creating partitioned table format

124

* @param orcConfig ORC configuration

125

* @param tableType Row type of the table schema

126

* @param hadoopConfigWrapper Hadoop configuration

127

* @param partitionKeys List of partition column names

128

* @param extractor Partition field extractor

129

* @param conjunctPredicates Filter predicates

130

* @param batchSize Vectorized batch size

131

* @param caseSensitive Whether names are case sensitive

132

* @return Configured input format for partitioned tables

133

*/

134

public static <SplitT extends FileSourceSplit> OrcColumnarRowFileInputFormat<VectorizedRowBatch, SplitT>

135

createPartitionedFormat(

136

Configuration orcConfig,

137

RowType tableType,

138

SerializableHadoopConfigWrapper hadoopConfigWrapper,

139

List<String> partitionKeys,

140

PartitionFieldExtractor<SplitT> extractor,

141

List<Predicate> conjunctPredicates,

142

int batchSize,

143

boolean caseSensitive

144

);

145

146

/** Returns RowData type information */

147

public TypeInformation<RowData> getProducedType();

148

}

149

```

150

151

**Usage Example:**

152

153

```java

154

// Programmatic table creation with ORC format

155

TableDescriptor descriptor = TableDescriptor.forConnector("filesystem")
    .schema(Schema.newBuilder()
        .column("user_id", DataTypes.BIGINT())
        .column("product_id", DataTypes.BIGINT())
        .column("purchase_time", DataTypes.TIMESTAMP_LTZ(3))
        .column("amount", DataTypes.DECIMAL(10, 2))
        .column("region", DataTypes.STRING()) // partition column must be declared in the schema
        .build())
    .partitionedBy("region")

163

.format("orc")

164

.option("path", "/path/to/orc/data")

165

.option("orc.compress", "zstd")

166

.build();

167

168

tEnv.createTable("purchases", descriptor);

169

```

170

171

### Split Readers

172

173

Internal components for reading ORC file splits with vectorized processing.

174

175

```java { .api }

176

/**

177

* Split reader for ORC files that produces RowData from vectorized batches

178

*/

179

public class OrcColumnarRowSplitReader<BATCH> extends OrcSplitReader<RowData, BATCH> {

180

/**

181

* Constructor for columnar row split reader

182

* @param orcShim Version compatibility shim

183

* @param orcConfig ORC configuration

184

* @param fieldNames Array of field names

185

* @param fieldTypes Array of logical field types

186

* @param selectedFields Indices of selected fields for projection

187

* @param conjunctPredicates Filter predicates for pushdown

188

* @param batchSize Size of vectorized batches

189

* @param split File split to read

190

* @param generator Batch generator for creating column batches

191

*/

192

public OrcColumnarRowSplitReader(

193

OrcShim<BATCH> orcShim,

194

Configuration orcConfig,

195

String[] fieldNames,

196

LogicalType[] fieldTypes,

197

int[] selectedFields,

198

List<Predicate> conjunctPredicates,

199

int batchSize,

200

FileSourceSplit split,

201

ColumnBatchGenerator<BATCH> generator

202

);

203

204

/** Reads the next record as RowData */

205

public RowData nextRecord(RowData reuse) throws IOException;

206

207

/**

208

* Interface for generating column batches from different batch types

209

*/

210

public interface ColumnBatchGenerator<BATCH> extends Serializable {

211

VectorizedColumnBatch generate(BATCH batch);

212

}

213

}

214

215

/**

216

* Abstract base class for ORC split readers with batch processing capabilities

217

*/

218

public abstract class OrcSplitReader<T, BATCH> implements Closeable {

219

/**

220

* Constructor for ORC split reader

221

* @param orcShim Version compatibility shim

222

* @param orcConfig ORC configuration

223

* @param split File split to read

224

* @param batchSize Size of vectorized batches

225

*/

226

public OrcSplitReader(

227

OrcShim<BATCH> orcShim,

228

Configuration orcConfig,

229

FileSourceSplit split,

230

int batchSize

231

);

232

233

/** Seek to a specific row number */

234

public void seekToRow(long rowNumber) throws IOException;

235

236

/** Check if the end of input has been reached */

237

public boolean reachedEnd();

238

239

/** Abstract method for reading the next record */

240

public abstract T nextRecord(T reuse) throws IOException;

241

242

/** Close the reader and release resources */

243

public void close() throws IOException;

244

}

245

```

246

247

### Utility Classes

248

249

Helper classes for ORC reader creation and type conversions in Table API contexts.

250

251

```java { .api }

252

/**

253

* Utility class for creating ORC readers and performing type conversions

254

*/

255

public class OrcSplitReaderUtil {

256

/**

257

* Generate a partitioned columnar row reader

258

* @param orcShim Version compatibility shim

259

* @param orcConfig ORC configuration

260

* @param fieldNames Field names in the schema

261

* @param fieldTypes Logical types of the fields

262

* @param selectedFields Selected field indices for projection

263

* @param conjunctPredicates Filter predicates

264

* @param batchSize Vectorized batch size

265

* @param split File split to read

266

* @param partitionKeys Partition column names

267

* @param defaultPartName Default partition name for null values

268

* @param extractor Partition field extractor

269

* @return Configured partitioned reader

270

*/

271

public static <SplitT extends FileSourceSplit> OrcColumnarRowSplitReader<VectorizedRowBatch>

272

genPartColumnarRowReader(

273

OrcShim<VectorizedRowBatch> orcShim,

274

Configuration orcConfig,

275

String[] fieldNames,

276

LogicalType[] fieldTypes,

277

int[] selectedFields,

278

List<Predicate> conjunctPredicates,

279

int batchSize,

280

SplitT split,

281

List<String> partitionKeys,

282

String defaultPartName,

283

PartitionFieldExtractor<SplitT> extractor

284

);

285

286

/** Get selected field indices from ORC schema and projection */

287

public static int[] getSelectedOrcFields(

288

RowType tableType,

289

int[] selectedFields,

290

List<String> partitionKeys

291

);

292

293

/** Filter out partition column names from schema */

294

public static List<String> getNonPartNames(List<String> fieldNames, List<String> partitionKeys);

295

296

/** Convert Flink row type to ORC type with partition support */

297

public static TypeDescription convertToOrcTypeWithPart(RowType type, List<String> partitionKeys);

298

299

/** Convert Flink logical type to ORC TypeDescription */

300

public static TypeDescription logicalTypeToOrcType(LogicalType type);

301

}

302

```

303

304

**Usage Example:**

305

306

```java

307

// Query with partition pruning and filter pushdown

308

tEnv.executeSql(

309

"SELECT user_id, SUM(amount) as total_spent " +

310

"FROM sales_data " +

311

"WHERE region = 'US' AND transaction_time > TIMESTAMP '2023-01-01 00:00:00' " +

312

"GROUP BY user_id " +

313

"HAVING SUM(amount) > 1000"

314

).print();

315

```