Apache Flink ORC format connector for reading and writing ORC (Optimized Row Columnar) data files
—
The ORC format provides high-performance bulk writing capabilities through the OrcBulkWriterFactory and custom Vectorizer implementations. This enables efficient writing of large datasets to ORC files with full control over the vectorization process.
@PublicEvolving
public class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> {
public OrcBulkWriterFactory(Vectorizer<T> vectorizer);
public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Configuration configuration);
public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Properties writerProperties, Configuration configuration);
public BulkWriter<T> create(FSDataOutputStream out) throws IOException;
@VisibleForTesting
protected OrcFile.WriterOptions getWriterOptions();
}

Factory Method Usage:
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.hadoop.conf.Configuration;
import java.util.Properties;
// Simple factory with vectorizer only
RowDataVectorizer vectorizer = new RowDataVectorizer(schema, fieldTypes);
OrcBulkWriterFactory<RowData> simpleFactory = new OrcBulkWriterFactory<>(vectorizer);
// Factory with Hadoop configuration
Configuration hadoopConfig = new Configuration();
hadoopConfig.set("fs.defaultFS", "hdfs://namenode:8020");
OrcBulkWriterFactory<RowData> configuredFactory = new OrcBulkWriterFactory<>(
vectorizer,
hadoopConfig
);
// Factory with writer properties and configuration
Properties writerProps = new Properties();
writerProps.setProperty("orc.compress", "SNAPPY");
writerProps.setProperty("orc.stripe.size", "134217728");
OrcBulkWriterFactory<RowData> fullFactory = new OrcBulkWriterFactory<>(
vectorizer,
writerProps,
hadoopConfig
);

@PublicEvolving
public abstract class Vectorizer<T> implements Serializable {
public Vectorizer(String schema);
public TypeDescription getSchema();
public void setWriter(Writer writer);
public void addUserMetadata(String key, ByteBuffer value);
public abstract void vectorize(T element, VectorizedRowBatch batch) throws IOException;
}

public class RowDataVectorizer extends Vectorizer<RowData> {
public RowDataVectorizer(String schema, LogicalType[] fieldTypes);
public void vectorize(RowData row, VectorizedRowBatch batch);
}

import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.*;
// Define schema and field types
LogicalType[] fieldTypes = {
new BigIntType(), // id
new VarCharType(255), // name
new IntType(), // age
new DecimalType(10, 2), // salary
new BooleanType(), // active
new TimestampType(3) // created_at
};
String orcSchema = "struct<" +
"id:bigint," +
"name:string," +
"age:int," +
"salary:decimal(10,2)," +
"active:boolean," +
"created_at:timestamp" +
">";
// Create vectorizer
RowDataVectorizer vectorizer = new RowDataVectorizer(orcSchema, fieldTypes);
// Create writer factory
OrcBulkWriterFactory<RowData> writerFactory = new OrcBulkWriterFactory<>(vectorizer);
// Use in streaming sink
DataStream<RowData> dataStream = // ... your data stream
dataStream.addSink(
StreamingFileSink.forBulkFormat(
new Path("/path/to/output"),
writerFactory
).build()
);

import org.apache.hadoop.conf.Configuration;
import java.util.Properties;
// Configure ORC writer properties
Properties writerProperties = new Properties();
writerProperties.setProperty("orc.compress", "SNAPPY");
writerProperties.setProperty("orc.stripe.size", "67108864");
writerProperties.setProperty("orc.row.index.stride", "10000");
// Configure Hadoop settings
Configuration hadoopConfig = new Configuration();
hadoopConfig.set("orc.bloom.filter.columns", "name,id");
hadoopConfig.setFloat("orc.bloom.filter.fpp", 0.05f);
// Create configured writer factory
OrcBulkWriterFactory<RowData> writerFactory = new OrcBulkWriterFactory<>(
vectorizer,
writerProperties,
hadoopConfig
);

// Schema with complex types
// Flink LogicalTypes describing each field; the order here must line up
// one-to-one with the field order in the ORC schema string below.
LogicalType[] complexFieldTypes = {
new BigIntType(), // id
new ArrayType(new VarCharType(100)), // tags array
new MapType(new VarCharType(50), new IntType()), // metrics map
new RowType(Arrays.asList( // address struct
new RowType.RowField("street", new VarCharType(200)),
new RowType.RowField("city", new VarCharType(100)),
new RowType.RowField("zip", new VarCharType(10))
))
};
// Textual ORC schema — field names and types mirror complexFieldTypes above.
// NOTE(review): the VarChar lengths (100/50/200/10) are not carried into the
// ORC types; ORC "string" is unbounded — confirm this is intended.
String complexOrcSchema = "struct<" +
"id:bigint," +
"tags:array<string>," +
"metrics:map<string,int>," +
"address:struct<street:string,city:string,zip:string>" +
">";
RowDataVectorizer complexVectorizer = new RowDataVectorizer(complexOrcSchema, complexFieldTypes);

// Custom vectorizer for POJO classes
/**
 * Custom vectorizer that maps {@code User} POJOs onto ORC column vectors.
 *
 * <p>The column indices used in {@link #vectorize} must match the field
 * order of the schema string passed to the superclass constructor.
 */
public class UserVectorizer extends Vectorizer<User> {
    public UserVectorizer() {
        super("struct<id:bigint,name:string,email:string,active:boolean>");
    }

    @Override
    public void vectorize(User user, VectorizedRowBatch batch) throws IOException {
        // Claim the next free row slot; the caller owns flushing full batches.
        int rowId = batch.size++;
        // Set ID column (col 0: bigint)
        ((LongColumnVector) batch.cols[0]).vector[rowId] = user.getId();
        // Set name column (col 1: string)
        byte[] nameBytes = user.getName().getBytes(StandardCharsets.UTF_8);
        ((BytesColumnVector) batch.cols[1]).setVal(rowId, nameBytes);
        // Set email column (col 2: string)
        byte[] emailBytes = user.getEmail().getBytes(StandardCharsets.UTF_8);
        ((BytesColumnVector) batch.cols[2]).setVal(rowId, emailBytes);
        // Set active column (col 3) — booleans are stored as 0/1 longs.
        ((LongColumnVector) batch.cols[3]).vector[rowId] = user.isActive() ? 1 : 0;
        // Add custom metadata. Fixed: use an explicit charset — the no-arg
        // String.getBytes() depends on the platform default encoding, unlike
        // the explicit UTF-8 used for the column values above.
        if (user.hasSpecialAttribute()) {
            addUserMetadata("special_users",
                    ByteBuffer.wrap("true".getBytes(StandardCharsets.UTF_8)));
        }
    }
}
// Usage
UserVectorizer userVectorizer = new UserVectorizer();
OrcBulkWriterFactory<User> userWriterFactory = new OrcBulkWriterFactory<>(userVectorizer);

@Internal
public class OrcBulkWriter<T> implements BulkWriter<T> {
public void addElement(T element) throws IOException;
public void flush() throws IOException;
public void finish() throws IOException;
}

The OrcBulkWriter handles the actual writing process:
// ORC uses VectorizedRowBatch with default size
VectorizedRowBatch batch = schema.createRowBatch(); // Default: 1024 rows
VectorizedRowBatch customBatch = schema.createRowBatch(2048); // Custom size

Properties props = new Properties();
// Choose ONE codec: each call below overwrites the previous "orc.compress"
// value, so as written only the last setProperty takes effect.
props.setProperty("orc.compress", "SNAPPY"); // Fast compression
props.setProperty("orc.compress", "ZLIB"); // Better compression ratio
props.setProperty("orc.compress", "ZSTD"); // Best compression ratio
props.setProperty("orc.compress", "NONE"); // No compression

Properties props = new Properties();
props.setProperty("orc.stripe.size", "134217728"); // 128MB stripes
props.setProperty("orc.row.index.stride", "20000"); // Index every 20k rows

Writer lifecycle:
1. OrcBulkWriterFactory configured with vectorizer and options
2. create() method called with output stream
3. addElement() vectorizes and batches data
4. finish() flushes remaining data and closes file

try {
writerFactory.create(outputStream);
} catch (IOException e) {
// Handle writer creation errors
log.error("Failed to create ORC writer", e);
}
// Vectorizer error handling
@Override
public void vectorize(MyData data, VectorizedRowBatch batch) throws IOException {
try {
// Vectorization logic
} catch (Exception e) {
throw new IOException("Failed to vectorize data: " + data, e);
}
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-orc