or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

tessl/maven-org-apache-flink--flink-sql-orc-2-12

Apache Flink SQL ORC format connector providing comprehensive support for reading and writing ORC files in Flink's Table API and SQL environments.

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.flink/flink-sql-orc_2.12@1.14.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-sql-orc-2-12@1.14.0

0

# Apache Flink SQL ORC Format Connector

1

2

The Apache Flink SQL ORC format connector supports reading and writing ORC (Optimized Row Columnar) files within Flink's Table API and SQL environments. This package is a shaded JAR that bundles the core flink-orc functionality along with all necessary dependencies, so the ORC file format can be used in distributed processing environments without adding those dependencies separately.

3

4

## Package Information

5

6

- **Package Name**: flink-sql-orc_2.12

7

- **Package Type**: maven

8

- **Language**: Java

9

- **Installation**: Add to Maven dependencies: `org.apache.flink:flink-sql-orc_2.12:1.14.6`

10

11

## Core Imports

12

13

```java

14

import org.apache.flink.orc.OrcFileFormatFactory;

15

import org.apache.flink.orc.writer.OrcBulkWriterFactory;

16

import org.apache.flink.orc.vector.RowDataVectorizer;

17

```

18

19

For specific functionality:

20

21

```java

22

// SQL/Table API format factory

23

import org.apache.flink.orc.OrcFileFormatFactory;

24

25

// DataStream API writing

26

import org.apache.flink.orc.writer.OrcBulkWriterFactory;

27

import org.apache.flink.orc.vector.RowDataVectorizer;

28

29

// Filter pushdown

30

import org.apache.flink.orc.OrcFilters;

31

32

// Input formats and readers

33

import org.apache.flink.orc.OrcColumnarRowFileInputFormat;

34

import org.apache.flink.orc.AbstractOrcFileInputFormat;

35

```

36

37

## Basic Usage

38

39

### SQL/Table API Integration

40

41

```java

42

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

43

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

44

45

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

46

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

47

48

// Create table with ORC format

49

tEnv.executeSql(

50

"CREATE TABLE orc_table (" +

51

" user_id BIGINT," +

52

" item_id BIGINT," +

53

" category_id BIGINT," +

54

" behavior STRING," +

55

" ts TIMESTAMP(3)" +

56

") WITH (" +

57

" 'connector' = 'filesystem'," +

58

" 'path' = 'file:///path/to/orc/files'," +

59

" 'format' = 'orc'" +

60

")"

61

);

62

63

// Query the ORC table

64

tEnv.executeSql("SELECT user_id, COUNT(*) FROM orc_table GROUP BY user_id").print();

65

```

66

67

### DataStream API Writing

68

69

```java

70

import org.apache.flink.api.common.typeinfo.Types;

71

import org.apache.flink.core.fs.Path;

72

import org.apache.flink.orc.vector.RowDataVectorizer;

73

import org.apache.flink.orc.writer.OrcBulkWriterFactory;

74

import org.apache.flink.streaming.api.datastream.DataStream;

75

import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

76

import org.apache.flink.table.data.RowData;

77

import org.apache.flink.table.types.logical.BigIntType;

78

import org.apache.flink.table.types.logical.LogicalType;

import org.apache.flink.table.types.logical.VarCharType;

79

80

// Create vectorizer for schema

81

String schema = "struct<user_id:bigint,item_id:bigint,behavior:varchar(50)>";

82

RowDataVectorizer vectorizer = new RowDataVectorizer(schema, new LogicalType[]{

83

new BigIntType(), new BigIntType(), new VarCharType(50)

84

});

85

86

// Create ORC writer factory

87

OrcBulkWriterFactory<RowData> writerFactory = new OrcBulkWriterFactory<>(vectorizer);

88

89

// Create file sink

90

StreamingFileSink<RowData> sink = StreamingFileSink

91

.forBulkFormat(new Path("file:///path/to/output"), writerFactory)

92

.build();

93

94

// Write data stream to ORC files

95

DataStream<RowData> dataStream = ...; // placeholder: supply your own DataStream<RowData> here

96

dataStream.addSink(sink);

97

```

98

99

## Architecture

100

101

The ORC format connector is built around several key architectural components:

102

103

- **Format Integration Layer**: `OrcFileFormatFactory` provides seamless integration with Flink's SQL/Table API format system

104

- **Vectorized Processing**: High-performance columnar processing through `VectorizedRowBatch` and column vector abstractions

105

- **Version Compatibility**: Shim layer (`OrcShim`) ensures ORC compatibility across different Hive dependency versions (2.0.x, 2.1.x, 2.3+); the matching shim is selected with `OrcShim.createShim(hiveDependencyVersion)`

106

- **Filter Pushdown**: Optimized query performance through predicate pushdown to the ORC reader level

107

- **Partition Support**: Native support for partitioned tables and partition pruning

108

- **Bulk Processing**: Efficient batch reading and writing optimized for large-scale data processing

109

110

## Capabilities

111

112

### SQL and Table API Integration

113

114

Complete integration with Flink's SQL engine and Table API, providing declarative ORC file access through DDL statements and programmatic table definitions.

115

116

```java { .api }

117

public class OrcFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {

118

public static final String IDENTIFIER = "orc";

119

120

public String factoryIdentifier();

121

public Set<ConfigOption<?>> requiredOptions();

122

public Set<ConfigOption<?>> optionalOptions();

123

public BulkDecodingFormat<RowData> createDecodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);

124

public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);

125

}

126

```

127

128

[Table API Integration](./table-api.md)

129

130

### DataStream API Integration

131

132

Direct integration with Flink's DataStream API for programmatic ORC reading and writing, supporting custom data processing pipelines and streaming applications.

133

134

```java { .api }

135

public class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> {

136

public OrcBulkWriterFactory(Vectorizer<T> vectorizer);

137

public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Configuration writerConfiguration);

138

public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Properties writerProperties, Configuration hadoopConfiguration);

139

140

public BulkWriter<T> create(FSDataOutputStream out) throws IOException;

141

}

142

143

public abstract class Vectorizer<T> {

144

public Vectorizer(String schema);

145

public TypeDescription getSchema();

146

public void addUserMetadata(String key, ByteBuffer value);

147

public abstract void vectorize(T element, VectorizedRowBatch batch) throws IOException;

148

}

149

```

150

151

[DataStream API Integration](./datastream-api.md)

152

153

### Configuration and Advanced Features

154

155

Comprehensive configuration options for performance tuning, compression settings, and advanced ORC features including custom filters and metadata handling.

156

157

```java { .api }

158

public class OrcFilters {

159

public static Predicate toOrcPredicate(Expression expression);

160

161

public abstract static class Predicate implements Serializable { }

162

public static class Equals extends BinaryPredicate { }

163

public static class LessThan extends BinaryPredicate { }

164

public static class LessThanEquals extends BinaryPredicate { }

165

public static class IsNull extends ColumnPredicate { }

166

public static class Between extends ColumnPredicate { }

167

public static class In extends ColumnPredicate { }

168

}

169

```

170

171

[Configuration and Advanced Usage](./configuration.md)

172

173

## Types

174

175

Core type definitions used throughout the ORC connector:

176

177

```java { .api }

178

// Input format for reading ORC files

179

public abstract class AbstractOrcFileInputFormat<T, BatchT, SplitT> implements BulkFormat<T, SplitT> {

180

public AbstractOrcFileInputFormat(

181

Path[] filePaths,

182

TypeDescription schema,

183

int[] selectedFields,

184

List<Predicate> conjunctPredicates,

185

int batchSize,

186

Configuration orcConfig,

187

SerializableHadoopConfigWrapper hadoopConfigWrapper

188

);

189

190

public boolean isSplittable();

191

public abstract TypeInformation<T> getProducedType();

192

}

193

194

// Concrete implementation for RowData

195

public class OrcColumnarRowFileInputFormat<BatchT, SplitT> extends AbstractOrcFileInputFormat<RowData, BatchT, SplitT> {

196

public static <SplitT extends FileSourceSplit> OrcColumnarRowFileInputFormat<VectorizedRowBatch, SplitT>

197

createPartitionedFormat(

198

Configuration orcConfig,

199

RowType tableType,

200

SerializableHadoopConfigWrapper hadoopConfigWrapper,

201

List<String> partitionKeys,

202

PartitionFieldExtractor<SplitT> extractor,

203

List<Predicate> conjunctPredicates,

204

int batchSize,

205

boolean caseSensitive

206

);

207

}

208

209

// Version compatibility interface

210

public interface OrcShim<BATCH> extends Serializable {

211

RecordReader createRecordReader(

212

Configuration conf,

213

TypeDescription schema,

214

int[] selectedFields,

215

List<OrcFilters.Predicate> conjunctPredicates,

216

org.apache.flink.core.fs.Path path,

217

long splitStart,

218

long splitLength

219

) throws IOException;

220

OrcVectorizedBatchWrapper<BATCH> createBatchWrapper(TypeDescription schema, int batchSize);

221

boolean nextBatch(RecordReader reader, BATCH batch) throws IOException;

222

223

static OrcShim<VectorizedRowBatch> defaultShim();

224

static OrcShim<VectorizedRowBatch> createShim(String hiveDependencyVersion);

225

}

226

```