
**tessl/maven-org-apache-flink--flink-orc**

Apache Flink ORC format connector for reading and writing ORC (Optimized Row Columnar) data files

Workspace: tessl
Visibility: Public
Describes: `mavenpkg:maven/org.apache.flink/flink-orc@2.1.x`

To install, run:

```
npx @tessl/cli install tessl/maven-org-apache-flink--flink-orc@2.1.0
```

# Apache Flink ORC Format Connector

The Apache Flink ORC format connector provides support for reading and writing ORC (Optimized Row Columnar) data files within the Flink ecosystem. It enables high-performance columnar data processing, with features including vectorized reading, predicate pushdown, Table API integration, and complete data type mapping.

## Package Information

- **Package Name**: flink-orc
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-orc
- **Version**: 2.1.0
- **Installation**: Add the dependency to your `pom.xml`:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-orc</artifactId>
    <version>2.1.0</version>
</dependency>
```

## Core Imports

```java
// Table API - main format factory
import org.apache.flink.orc.OrcFileFormatFactory;

// Writer API
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.orc.vector.Vectorizer;
import org.apache.flink.orc.vector.RowDataVectorizer;

// Reader API
import org.apache.flink.orc.OrcColumnarRowInputFormat;

// Filtering
import org.apache.flink.orc.OrcFilters;
```

## Basic Usage

### Table API Integration

The ORC format integrates with Flink's Table API through the format identifier `"orc"`:

```java
// Create a table backed by ORC files
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

tableEnv.executeSql(
    "CREATE TABLE users (" +
    "  id BIGINT," +
    "  name STRING," +
    "  age INT," +
    "  active BOOLEAN" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = '/path/to/orc/files'," +
    "  'format' = 'orc'" +
    ")"
);

// Query ORC data
Table result = tableEnv.sqlQuery("SELECT * FROM users WHERE active = true");
```

### DataStream API - Writing ORC Files

```java
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.*;

// Field types matching the ORC schema below
LogicalType[] fieldTypes = {
    new BigIntType(),
    new VarCharType(255),
    new IntType(),
    new BooleanType()
};

// Create a vectorizer for RowData
RowDataVectorizer vectorizer = new RowDataVectorizer(
    "struct<id:bigint,name:string,age:int,active:boolean>",
    fieldTypes
);

// Create the ORC writer factory
OrcBulkWriterFactory<RowData> writerFactory = new OrcBulkWriterFactory<>(vectorizer);

// Use in a sink; FileSink replaces StreamingFileSink, which was removed in Flink 2.x
DataStream<RowData> dataStream = // ... your data stream
dataStream.sinkTo(
    FileSink.forBulkFormat(
        new Path("/path/to/output"),
        writerFactory
    ).build()
);
```

## Architecture

The Flink ORC connector is organized into several key components:

- **Format Factory**: Central integration point with Flink's Table API (`OrcFileFormatFactory`)
- **Reader Layer**: Vectorized columnar reading with predicate pushdown (`OrcColumnarRowInputFormat`)
- **Writer Layer**: Bulk writing with custom vectorization support (`OrcBulkWriterFactory`, `Vectorizer`)
- **Vector System**: Column vector implementations for all supported data types
- **Filter System**: Predicate pushdown with ORC-native filtering (`OrcFilters`)
- **Type System**: Complete mapping between Flink and ORC type systems
- **Utility Layer**: Statistics, configuration, and compatibility utilities

## Capabilities

### Table API Integration

Main integration point for the ORC format in Flink's Table API and SQL.

```java { .api }
public class OrcFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {
    public static final String IDENTIFIER = "orc";

    public String factoryIdentifier();
    public BulkDecodingFormat<RowData> createDecodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
    public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
}
```

[Table API Integration](./table-api.md)

### Bulk Writing

High-performance bulk writing of data to ORC files with custom vectorization.

```java { .api }
@PublicEvolving
public class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> {
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer);
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Configuration configuration);
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Properties writerProperties, Configuration configuration);

    public BulkWriter<T> create(FSDataOutputStream out) throws IOException;
}
```

```java { .api }
@PublicEvolving
public abstract class Vectorizer<T> implements Serializable {
    public Vectorizer(String schema);
    public TypeDescription getSchema();
    public void addUserMetadata(String key, ByteBuffer value);
    public abstract void vectorize(T element, VectorizedRowBatch batch) throws IOException;
}
```

[Bulk Writing](./bulk-writing.md)
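A custom `Vectorizer` fills one slot of the current `VectorizedRowBatch` per incoming element. A minimal sketch, assuming a hypothetical `Event(long id, String name)` record (the vector classes come from the Hive storage API pulled in by `orc-core`):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.orc.vector.Vectorizer;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

// Hypothetical element type, for illustration only
record Event(long id, String name) {}

public class EventVectorizer extends Vectorizer<Event> {
    public EventVectorizer() {
        // ORC schema string matching the columns written below
        super("struct<id:bigint,name:string>");
    }

    @Override
    public void vectorize(Event element, VectorizedRowBatch batch) throws IOException {
        int row = batch.size++; // claim the next row slot in the batch
        LongColumnVector id = (LongColumnVector) batch.cols[0];
        BytesColumnVector name = (BytesColumnVector) batch.cols[1];
        id.vector[row] = element.id();
        byte[] bytes = element.name().getBytes(StandardCharsets.UTF_8);
        name.setVal(row, bytes, 0, bytes.length);
    }
}
```

The vectorizer then plugs into the factory as `new OrcBulkWriterFactory<>(new EventVectorizer())`.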

### Columnar Reading

Vectorized columnar reading with partition support and statistics reporting.

```java { .api }
public class OrcColumnarRowInputFormat<BatchT, SplitT extends FileSourceSplit>
        extends AbstractOrcFileInputFormat<RowData, BatchT, SplitT>
        implements FileBasedStatisticsReportableInputFormat {

    public static <SplitT extends FileSourceSplit> OrcColumnarRowInputFormat<VectorizedRowBatch, SplitT>
            createPartitionedFormat(
                OrcShim<VectorizedRowBatch> shim,
                Configuration hadoopConfig,
                RowType tableType,
                List<String> partitionKeys,
                PartitionFieldExtractor<SplitT> extractor,
                int[] selectedFields,
                List<OrcFilters.Predicate> conjunctPredicates,
                int batchSize,
                Function<RowType, TypeInformation<RowData>> rowTypeInfoFactory
            );

    public TableStats reportStatistics(List<Path> files, DataType producedDataType);
}
```

[Columnar Reading](./columnar-reading.md)
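Wiring the format into a `FileSource` might look like the following sketch; the default shim, the batch size of 2048, the `null` extractor for an unpartitioned table, and the `InternalTypeInfo::of` factory are illustrative choices, not prescribed by this package:

```java
import java.util.Collections;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.OrcColumnarRowInputFormat;
import org.apache.flink.orc.shim.OrcShim;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.*;
import org.apache.hadoop.conf.Configuration;

// Row type produced by the source
RowType rowType = RowType.of(
    new LogicalType[] {new BigIntType(), new VarCharType(255)},
    new String[] {"id", "name"});

OrcColumnarRowInputFormat<?, FileSourceSplit> format =
    OrcColumnarRowInputFormat.createPartitionedFormat(
        OrcShim.defaultShim(),     // default ORC shim
        new Configuration(),       // Hadoop configuration
        rowType,                   // table row type
        Collections.emptyList(),   // no partition keys
        null,                      // no partition field extractor needed here
        new int[] {0, 1},          // project both columns
        Collections.emptyList(),   // no pushed-down predicates
        2048,                      // rows per batch
        InternalTypeInfo::of);     // row TypeInformation factory

FileSource<RowData> source =
    FileSource.forBulkFileFormat(format, new Path("/path/to/orc/files")).build();
```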

### Predicate Pushdown

Advanced filtering capabilities with ORC-native predicate pushdown.

```java { .api }
public class OrcFilters {
    public static Predicate toOrcPredicate(Expression expression);

    public abstract static class Predicate implements Serializable {
        public abstract SearchArgument.Builder add(SearchArgument.Builder builder);
    }
}
```

[Predicate Pushdown](./predicate-pushdown.md)
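Besides `toOrcPredicate`, predicates can be composed by hand. A sketch, assuming the nested predicate classes (`Equals`, `LessThan`, `And`, ...) that `OrcFilters` exposes, each taking a column name, a `PredicateLeaf.Type`, and a literal:

```java
import org.apache.flink.orc.OrcFilters;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;

// age < 30 AND active = true, composed from OrcFilters' nested predicate classes
OrcFilters.Predicate predicate = new OrcFilters.And(
    new OrcFilters.LessThan("age", PredicateLeaf.Type.LONG, 30L),
    new OrcFilters.Equals("active", PredicateLeaf.Type.BOOLEAN, true));
```

A list of such predicates is what `createPartitionedFormat` accepts as its `conjunctPredicates` argument.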

### Vector Processing

Low-level column vector system for high-performance data processing.

```java { .api }
public abstract class AbstractOrcColumnVector {
    public static ColumnVector createFlinkVector(org.apache.hadoop.hive.ql.exec.vector.ColumnVector orcVector, LogicalType type);
    public static ColumnVector createFlinkVectorFromConstant(LogicalType type, Object value, int batchSize);
}
```

[Vector Processing](./vector-processing.md)

## Types

### Core Types

```java { .api }
// Vectorizer for RowData
public class RowDataVectorizer extends Vectorizer<RowData> {
    public RowDataVectorizer(String schema, LogicalType[] fieldTypes);
    public void vectorize(RowData row, VectorizedRowBatch batch);
}

// Bulk writer implementation
@Internal
public class OrcBulkWriter<T> implements BulkWriter<T> {
    public void addElement(T element) throws IOException;
    public void flush() throws IOException;
    public void finish() throws IOException;
}

// Statistics reporting utility
public class OrcFormatStatisticsReportUtil {
    public static TableStats getTableStatistics(List<Path> files, DataType producedDataType);
    public static TableStats getTableStatistics(List<Path> files, DataType producedDataType, Configuration hadoopConfig);
}

// Configuration wrapper for serialization
public class SerializableHadoopConfigWrapper implements Serializable {
    public SerializableHadoopConfigWrapper(Configuration configuration);
    public Configuration get();
}
```

### Supported Data Types

The connector supports comprehensive type mapping between Flink and ORC:

- **Primitive Types**: `BOOLEAN`, `TINYINT`, `SMALLINT`, `INTEGER`, `BIGINT`, `FLOAT`, `DOUBLE`
- **String Types**: `CHAR`, `VARCHAR`
- **Binary Types**: `BINARY`, `VARBINARY`
- **Decimal Types**: `DECIMAL` with precision and scale
- **Temporal Types**: `DATE`, `TIMESTAMP_WITHOUT_TIME_ZONE`, `TIMESTAMP_WITH_LOCAL_TIME_ZONE`
- **Complex Types**: `ARRAY`, `MAP`, `ROW` (nested structures)
- **Null Handling**: Full null value support across all types
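The type mapping shows up concretely in the ORC schema strings passed to `Vectorizer`/`RowDataVectorizer`. A sketch of one Flink row type and the corresponding ORC struct string (the column names are illustrative):

```java
import org.apache.flink.table.types.logical.*;

// Flink field types ...
LogicalType[] fieldTypes = {
    new DecimalType(10, 2),       // DECIMAL(10, 2)
    new DateType(),               // DATE
    new ArrayType(new IntType())  // ARRAY<INT>
};

// ... and the matching ORC struct schema string
String orcSchema = "struct<amount:decimal(10,2),day:date,scores:array<int>>";
```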