or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md, datastream-api.md, index.md, table-api.md

docs/index.md

0

# Apache Flink SQL ORC Format Connector

1

2

The Apache Flink SQL ORC format connector provides support for reading and writing ORC (Optimized Row Columnar) files from Flink's Table API and SQL. This package is a shaded JAR that bundles the core flink-orc functionality together with its dependencies, so ORC support can be added to a Flink deployment (for example, the SQL Client) as a single JAR without extra dependency management.

3

4

## Package Information

5

6

- **Package Name**: flink-sql-orc_2.12

7

- **Package Type**: maven

8

- **Language**: Java

9

- **Installation**: Add to Maven dependencies: `org.apache.flink:flink-sql-orc_2.12:1.14.6`

10

11

## Core Imports

12

13

```java

14

import org.apache.flink.orc.OrcFileFormatFactory;

15

import org.apache.flink.orc.writer.OrcBulkWriterFactory;

16

import org.apache.flink.orc.vector.RowDataVectorizer;

17

```

18

19

Imports grouped by use case:

20

21

```java

22

// SQL/Table API format factory

23

import org.apache.flink.orc.OrcFileFormatFactory;

24

25

// DataStream API writing

26

import org.apache.flink.orc.writer.OrcBulkWriterFactory;

27

import org.apache.flink.orc.vector.RowDataVectorizer;

28

29

// Filter pushdown

30

import org.apache.flink.orc.OrcFilters;

31

32

// Input formats and readers

33

import org.apache.flink.orc.OrcColumnarRowFileInputFormat;

34

import org.apache.flink.orc.AbstractOrcFileInputFormat;

35

```

36

37

## Basic Usage

38

39

### SQL/Table API Integration

40

41

```java

42

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

43

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

44

45

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

46

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

47

48

// Create table with ORC format

49

tEnv.executeSql(

50

"CREATE TABLE orc_table (" +

51

" user_id BIGINT," +

52

" item_id BIGINT," +

53

" category_id BIGINT," +

54

" behavior STRING," +

55

" ts TIMESTAMP(3)" +

56

") WITH (" +

57

" 'connector' = 'filesystem'," +

58

" 'path' = 'file:///path/to/orc/files'," +

59

" 'format' = 'orc'" +

60

")"

61

);

62

63

// Query the ORC table

64

tEnv.executeSql("SELECT user_id, COUNT(*) FROM orc_table GROUP BY user_id").print();

65

```

66

67

### DataStream API Writing

68

69

```java

70

import org.apache.flink.api.common.typeinfo.Types;

71

import org.apache.flink.core.fs.Path;

72

import org.apache.flink.orc.vector.RowDataVectorizer;

73

import org.apache.flink.orc.writer.OrcBulkWriterFactory;

74

import org.apache.flink.streaming.api.datastream.DataStream;

75

import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

76

import org.apache.flink.table.data.RowData;

77

import org.apache.flink.table.types.logical.BigIntType;

78

import org.apache.flink.table.types.logical.LogicalType;

import org.apache.flink.table.types.logical.VarCharType;

79

80

// Create vectorizer for schema

81

String schema = "struct<user_id:bigint,item_id:bigint,behavior:varchar(50)>";

82

RowDataVectorizer vectorizer = new RowDataVectorizer(schema, new LogicalType[]{

83

new BigIntType(), new BigIntType(), new VarCharType(50)

84

});

85

86

// Create ORC writer factory

87

OrcBulkWriterFactory<RowData> writerFactory = new OrcBulkWriterFactory<>(vectorizer);

88

89

// Create file sink (bulk formats such as ORC roll part files only on checkpoints, so checkpointing must be enabled)

90

StreamingFileSink<RowData> sink = StreamingFileSink

91

.forBulkFormat(new Path("file:///path/to/output"), writerFactory)

92

.build();

93

94

// Write data stream to ORC files

95

DataStream<RowData> dataStream = ...; // obtain your RowData stream here

96

dataStream.addSink(sink);

97

```

98

99

## Architecture

100

101

The ORC format connector is built around several key architectural components:

102

103

- **Format Integration Layer**: `OrcFileFormatFactory` provides seamless integration with Flink's SQL/Table API format system

104

- **Vectorized Processing**: High-performance columnar processing through `VectorizedRowBatch` and column vector abstractions

105

- **Version Compatibility**: A shim layer (`OrcShim`) adapts to the ORC libraries shipped with different Hive versions (2.0.x, 2.1.x, 2.3+); the shim is selected by Hive dependency version

106

- **Filter Pushdown**: Filter predicates are pushed down to the ORC reader (via `OrcFilters`), so the reader can skip data that cannot match the query

107

- **Partition Support**: Native support for partitioned tables and partition pruning

108

- **Bulk Processing**: Efficient batch reading and writing optimized for large-scale data processing

109

110

## Capabilities

111

112

### SQL and Table API Integration

113

114

Complete integration with Flink's SQL engine and Table API, providing declarative ORC file access through DDL statements and programmatic table definitions.

115

116

```java { .api }

117

public class OrcFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {

118

public static final String IDENTIFIER = "orc";

119

120

public String factoryIdentifier();

121

public Set<ConfigOption<?>> requiredOptions();

122

public Set<ConfigOption<?>> optionalOptions();

123

public BulkDecodingFormat<RowData> createDecodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);

124

public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);

125

}

126

```

127

128

[Table API Integration](./table-api.md)

129

130

### DataStream API Integration

131

132

Direct integration with Flink's DataStream API for programmatic ORC reading and writing, supporting custom data processing pipelines and streaming applications.

133

134

```java { .api }

135

public class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> {

136

public OrcBulkWriterFactory(Vectorizer<T> vectorizer);

137

public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Configuration writerConfiguration);

138

public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Properties writerProperties, Configuration hadoopConfiguration);

139

140

public BulkWriter<T> create(FSDataOutputStream out) throws IOException;

141

}

142

143

public abstract class Vectorizer<T> {

144

public Vectorizer(String schema);

145

public TypeDescription getSchema();

146

public void addUserMetadata(String key, ByteBuffer value);

147

public abstract void vectorize(T element, VectorizedRowBatch batch) throws IOException;

148

}

149

```

150

151

[DataStream API Integration](./datastream-api.md)

152

153

### Configuration and Advanced Features

154

155

Configuration options for performance tuning and compression, plus filter pushdown through `OrcFilters`, which converts Flink filter expressions into ORC predicates (signatures below).

156

157

```java { .api }

158

public class OrcFilters {

159

public static Predicate toOrcPredicate(Expression expression);

160

161

public abstract static class Predicate implements Serializable { }

162

public static class Equals extends BinaryPredicate { }

163

public static class LessThan extends BinaryPredicate { }

164

public static class LessThanEquals extends BinaryPredicate { }

165

public static class IsNull extends ColumnPredicate { }

166

public static class Between extends ColumnPredicate { }

167

public static class In extends ColumnPredicate { }

168

}

169

```

170

171

[Configuration and Advanced Usage](./configuration.md)

172

173

## Types

174

175

Core type definitions used throughout the ORC connector:

176

177

```java { .api }

178

// Input format for reading ORC files

179

public abstract class AbstractOrcFileInputFormat<T, BatchT, SplitT> implements BulkFormat<T, SplitT> {

180

public AbstractOrcFileInputFormat(

181

Path[] filePaths,

182

TypeDescription schema,

183

int[] selectedFields,

184

List<Predicate> conjunctPredicates,

185

int batchSize,

186

Configuration orcConfig,

187

SerializableHadoopConfigWrapper hadoopConfigWrapper

188

);

189

190

public boolean isSplittable();

191

public abstract TypeInformation<T> getProducedType();

192

}

193

194

// Concrete implementation for RowData

195

public class OrcColumnarRowFileInputFormat<BatchT, SplitT> extends AbstractOrcFileInputFormat<RowData, BatchT, SplitT> {

196

public static <SplitT extends FileSourceSplit> OrcColumnarRowFileInputFormat<VectorizedRowBatch, SplitT>

197

createPartitionedFormat(

198

Configuration orcConfig,

199

RowType tableType,

200

SerializableHadoopConfigWrapper hadoopConfigWrapper,

201

List<String> partitionKeys,

202

PartitionFieldExtractor<SplitT> extractor,

203

List<Predicate> conjunctPredicates,

204

int batchSize,

205

boolean caseSensitive

206

);

207

}

208

209

// Version compatibility interface

210

public interface OrcShim<BATCH> extends Serializable {

211

RecordReader createRecordReader(

212

Configuration conf,

213

TypeDescription schema,

214

int[] selectedFields,

215

List<OrcFilters.Predicate> conjunctPredicates,

216

org.apache.flink.core.fs.Path path,

217

long splitStart,

218

long splitLength

219

) throws IOException;

220

OrcVectorizedBatchWrapper<BATCH> createBatchWrapper(TypeDescription schema, int batchSize);

221

boolean nextBatch(RecordReader reader, BATCH batch) throws IOException;

222

223

static OrcShim<VectorizedRowBatch> defaultShim();

224

static OrcShim<VectorizedRowBatch> createShim(String hiveDependencyVersion);

225

}

226

```