Apache Flink SQL ORC format connector providing comprehensive support for reading and writing ORC files in Flink's Table API and SQL environments.
—
Direct integration with Flink's DataStream API for programmatic ORC reading and writing, supporting custom data processing pipelines, streaming applications, and batch processing with full control over vectorization and performance characteristics.
Factory for creating ORC bulk writers in DataStream API applications.
/**
 * Factory for creating ORC bulk writers for use with the DataStream API.
 * Supports custom vectorization and configuration options.
 *
 * @param <T> The type of elements to write
 */
public class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> {

    /**
     * Constructor with custom vectorizer.
     *
     * @param vectorizer Vectorizer for converting elements to ORC batches
     */
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer);

    /**
     * Constructor with vectorizer and ORC writer configuration.
     *
     * <p>NOTE(review): {@code writerConfiguration} has the same {@code Configuration} type that
     * the three-argument constructor documents as the Hadoop configuration; confirm which
     * settings this argument is expected to carry.
     *
     * @param vectorizer Vectorizer for converting elements
     * @param writerConfiguration ORC writer configuration
     */
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Configuration writerConfiguration);

    /**
     * Full constructor with all configuration options.
     *
     * @param vectorizer Vectorizer for converting elements
     * @param writerProperties ORC writer properties
     * @param hadoopConfiguration Hadoop configuration for HDFS access
     */
    public OrcBulkWriterFactory(
            Vectorizer<T> vectorizer,
            Properties writerProperties,
            Configuration hadoopConfiguration);

    /**
     * Creates a bulk writer for the given output stream.
     *
     * @param out Output stream to write to
     * @return Configured ORC bulk writer
     * @throws IOException If writer creation fails
     */
    @Override
    public BulkWriter<T> create(FSDataOutputStream out) throws IOException;
}
Usage Example:
import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.*;
// Define schema for sales records
String orcSchema = "struct<user_id:bigint,product_id:bigint,amount:decimal(10,2),purchase_time:timestamp>";
LogicalType[] fieldTypes = {
new BigIntType(),
new BigIntType(),
new DecimalType(10, 2),
new TimestampType(3)
};
// Create vectorizer
RowDataVectorizer vectorizer = new RowDataVectorizer(orcSchema, fieldTypes);
// Configure ORC writer properties
Properties writerProps = new Properties();
writerProps.setProperty("orc.compress", "SNAPPY");
writerProps.setProperty("orc.stripe.size", "67108864"); // 64MB
writerProps.setProperty("orc.row.index.stride", "10000");
// Create writer factory
OrcBulkWriterFactory<RowData> writerFactory = new OrcBulkWriterFactory<>(
vectorizer, writerProps, new Configuration()
);
// Create streaming file sink
StreamingFileSink<RowData> sink = StreamingFileSink
.forBulkFormat(new Path("hdfs://namenode:port/sales-data"), writerFactory)
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH"))
.build();
// Write data stream to ORC files
DataStream<RowData> salesStream = // ... your data stream
salesStream.addSink(sink);Core vectorization components for converting elements to ORC format.
/**
* Abstract base class for converting elements to ORC VectorizedRowBatch format.
* Handles schema management, accepts user metadata, and leaves the per-element
* conversion to subclasses through the abstract vectorize method.
* @param <T> The type of elements to vectorize
*/
public abstract class Vectorizer<T> {
/**
* Constructor with ORC schema string
* @param schema ORC schema in string format (e.g., "struct<id:bigint,name:string>")
*/
public Vectorizer(String schema);
/**
* Returns the ORC schema description.
* NOTE(review): presumably derived from the schema string given to the constructor; confirm.
* @return The ORC schema as a TypeDescription
*/
public TypeDescription getSchema();
/**
* Add user metadata to be written to ORC file
* @param key Metadata key
* @param value Metadata value as ByteBuffer
*/
public void addUserMetadata(String key, ByteBuffer value);
/**
* Abstract method for vectorizing an element into a batch.
* Each concrete vectorizer implements this to copy one element's fields into the batch.
* @param element Element to vectorize
* @param batch VectorizedRowBatch to populate
* @throws IOException If vectorization fails
*/
public abstract void vectorize(T element, VectorizedRowBatch batch) throws IOException;
}
/**
 * Concrete vectorizer implementation for RowData elements.
 * Optimized for Flink's internal RowData representation.
 */
public class RowDataVectorizer extends Vectorizer<RowData> {

    /**
     * Constructor for RowData vectorizer.
     *
     * @param schema ORC schema string
     * @param fieldTypes Array of Flink logical types corresponding to schema fields
     */
    public RowDataVectorizer(String schema, LogicalType[] fieldTypes);

    /**
     * Vectorize a RowData element into the batch.
     *
     * @param element RowData element to vectorize
     * @param batch VectorizedRowBatch to populate
     * @throws IOException If vectorization fails
     */
    @Override
    public void vectorize(RowData element, VectorizedRowBatch batch) throws IOException;
}
Usage Example:
// Custom vectorizer for POJO objects
/**
 * Vectorizer that writes {@code SalesRecord} objects into a four-column ORC batch:
 * user_id (bigint), product_id (bigint), amount (decimal(10,2)) and category (string).
 */
public class SalesRecordVectorizer extends Vectorizer<SalesRecord> {

    /** Registers the ORC schema whose column order {@link #vectorize} relies on. */
    public SalesRecordVectorizer() {
        super("struct<user_id:bigint,product_id:bigint,amount:decimal(10,2),category:string>");
    }

    /**
     * Appends one record as the next row of the batch.
     *
     * @param record Record to write; a {@code null} category is stored as an ORC null
     * @param batch VectorizedRowBatch to populate
     * @throws IOException If vectorization fails
     */
    @Override
    public void vectorize(SalesRecord record, VectorizedRowBatch batch) throws IOException {
        // Claim the next free row slot and advance the batch's row count.
        int row = batch.size++;

        // Set user_id (bigint)
        ((LongColumnVector) batch.cols[0]).vector[row] = record.getUserId();

        // Set product_id (bigint)
        ((LongColumnVector) batch.cols[1]).vector[row] = record.getProductId();

        // Set amount (decimal)
        // NOTE(review): assumes getAmount() returns a type that a HiveDecimalWritable
        // constructor accepts; confirm against SalesRecord.
        HiveDecimalWritable decimal = new HiveDecimalWritable(record.getAmount());
        ((DecimalColumnVector) batch.cols[2]).set(row, decimal);

        // Set category (string)
        String category = record.getCategory();
        BytesColumnVector categoryColumn = (BytesColumnVector) batch.cols[3];
        if (category == null) {
            // Flag the cell as null rather than failing with a NullPointerException.
            categoryColumn.noNulls = false;
            categoryColumn.isNull[row] = true;
        } else {
            byte[] categoryBytes = category.getBytes(StandardCharsets.UTF_8);
            categoryColumn.setRef(row, categoryBytes, 0, categoryBytes.length);
        }
    }
}
Adapters that bridge Hive ORC column vectors with Flink's vectorized processing.
/**
* Abstract base adapter class for Hive column vectors to Flink column vectors.
* Provides unified interface for different column vector types.
* The two static factory methods below are the entry points for obtaining an adapter.
*/
public abstract class AbstractOrcColumnVector {
/**
* Create a Flink column vector from a Hive column vector
* @param hiveVector Hive column vector
* @return Flink-compatible column vector
*/
public static ColumnVector createFlinkVector(org.apache.hadoop.hive.ql.exec.vector.ColumnVector hiveVector);
/**
* Create a constant Flink vector from a Hive column vector
* @param hiveVector Hive column vector with constant value
* @param batchSize Size of the batch
* @param type Logical type of the column
* @return Flink-compatible constant column vector
*/
public static ColumnVector createFlinkVectorFromConstant(
org.apache.hadoop.hive.ql.exec.vector.ColumnVector hiveVector,
int batchSize,
LogicalType type
);
/**
* Check if value at index is null
* @param i Index of the value to check
* @return true if the value at index i is null
*/
public boolean isNullAt(int i);
}
/**
* Factory interface for creating vectorized column batches.
* Declared as a functional interface with the single method create.
* @param <BatchT> Type of the batch (e.g., VectorizedRowBatch)
* @param <SplitT> Type of the split (e.g., FileSourceSplit)
*/
@FunctionalInterface
public interface ColumnBatchFactory<BatchT, SplitT> {
/**
* Create a vectorized column batch
* @param split File split being processed
* @param batch Original batch from ORC reader
* @return Vectorized column batch for Flink processing
*/
VectorizedColumnBatch create(SplitT split, BatchT batch);
}
/**
 * Wrapper interface for unifying different ORC batch types across versions.
 *
 * @param <T> Type of the underlying batch
 */
public interface OrcVectorizedBatchWrapper<T> {

    /**
     * Get the underlying batch object.
     *
     * @return The wrapped batch
     */
    T getBatch();

    /**
     * Get the size (number of rows) in this batch.
     *
     * @return Number of rows in the batch
     */
    int size();
}
Usage Example:
// Reading ORC files programmatically in DataStream API
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure ORC input format
Configuration orcConfig = new Configuration();
// Size options are given as a plain byte count, matching the writer example.
orcConfig.set("orc.stripe.size", "67108864"); // 64MB

Path[] inputPaths = {new Path("hdfs://namenode:port/input/sales-2023.orc")};
String[] fieldNames = {"user_id", "product_id", "amount", "category"};
LogicalType[] fieldTypes = {new BigIntType(), new BigIntType(), new DecimalType(10,2), new VarCharType(50)};

// NOTE(review): the constructor arguments and the readFile call below are kept as in the
// original example; verify them against the Flink version in use.
OrcColumnarRowFileInputFormat<VectorizedRowBatch, FileSourceSplit> inputFormat =
    new OrcColumnarRowFileInputFormat<>(
        inputPaths,
        fieldNames,
        fieldTypes,
        new int[]{0, 1, 2, 3}, // Select all fields
        Collections.emptyList(), // No predicates
        1000, // Batch size
        orcConfig,
        new SerializableHadoopConfigWrapper(new Configuration())
    );

// Create source from input format
DataStream<RowData> orcStream = env.readFile(inputFormat, "hdfs://namenode:port/input");

// Process the stream
orcStream
    // DecimalData exposes its value through toBigDecimal(); it has no doubleValue() of its own.
    .filter(row -> row.getDecimal(2, 10, 2).toBigDecimal().doubleValue() > 100.0) // Amount > 100
    .keyBy(row -> row.getLong(0)) // Key by user_id
    .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
    .aggregate(new SalesAggregator())
    .print();
Wrapper implementations for different ORC batch types and version compatibility.
/**
 * Hive-specific batch wrapper implementation.
 * Exposes a Hive {@code VectorizedRowBatch} through {@code OrcVectorizedBatchWrapper}.
 */
public class HiveOrcBatchWrapper implements OrcVectorizedBatchWrapper<VectorizedRowBatch> {

    /**
     * Wraps the given batch.
     *
     * @param batch Hive batch to wrap
     */
    public HiveOrcBatchWrapper(VectorizedRowBatch batch);

    /**
     * Get the underlying batch object.
     * NOTE(review): presumably the batch passed to the constructor; confirm.
     *
     * @return The underlying batch
     */
    @Override
    public VectorizedRowBatch getBatch();

    /**
     * Get the size (number of rows) in this batch.
     *
     * @return Number of rows in the batch
     */
    @Override
    public int size();
}
The DataStream API integration provides full programmatic control over ORC file processing, allowing for custom vectorization strategies, fine-tuned performance configurations, and seamless integration with complex stream processing pipelines.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-orc-2-12