# DataStream API Integration

Direct integration with Flink's DataStream API for programmatic ORC reading and writing. It supports custom data processing pipelines, streaming applications, and batch processing, with full control over vectorization and performance characteristics.

## Capabilities

### Bulk Writer Factory

Factory for creating ORC bulk writers in DataStream API applications.
```java { .api }
/**
 * Factory for creating ORC bulk writers for use with the DataStream API.
 * Supports custom vectorization and configuration options.
 *
 * @param <T> The type of elements to write
 */
public class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> {

    /**
     * Constructor with a custom vectorizer.
     *
     * @param vectorizer Vectorizer for converting elements to ORC batches
     */
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer);

    /**
     * Constructor with a vectorizer and ORC writer configuration.
     *
     * @param vectorizer Vectorizer for converting elements
     * @param writerConfiguration ORC writer configuration
     */
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Configuration writerConfiguration);

    /**
     * Full constructor with all configuration options.
     *
     * @param vectorizer Vectorizer for converting elements
     * @param writerProperties ORC writer properties
     * @param hadoopConfiguration Hadoop configuration for HDFS access
     */
    public OrcBulkWriterFactory(
            Vectorizer<T> vectorizer,
            Properties writerProperties,
            Configuration hadoopConfiguration);

    /**
     * Creates a bulk writer for the given output stream.
     *
     * @param out Output stream to write to
     * @return Configured ORC bulk writer
     * @throws IOException If writer creation fails
     */
    public BulkWriter<T> create(FSDataOutputStream out) throws IOException;
}
```
**Usage Example:**

```java
import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.*;
import org.apache.hadoop.conf.Configuration;

import java.util.Properties;

// Define schema for sales records
String orcSchema = "struct<user_id:bigint,product_id:bigint,amount:decimal(10,2),purchase_time:timestamp>";
LogicalType[] fieldTypes = {
    new BigIntType(),
    new BigIntType(),
    new DecimalType(10, 2),
    new TimestampType(3)
};

// Create vectorizer
RowDataVectorizer vectorizer = new RowDataVectorizer(orcSchema, fieldTypes);

// Configure ORC writer properties
Properties writerProps = new Properties();
writerProps.setProperty("orc.compress", "SNAPPY");
writerProps.setProperty("orc.stripe.size", "67108864"); // 64 MB
writerProps.setProperty("orc.row.index.stride", "10000");

// Create writer factory
OrcBulkWriterFactory<RowData> writerFactory = new OrcBulkWriterFactory<>(
    vectorizer, writerProps, new Configuration());

// Create streaming file sink
StreamingFileSink<RowData> sink = StreamingFileSink
    .forBulkFormat(new Path("hdfs://namenode:port/sales-data"), writerFactory)
    .withRollingPolicy(OnCheckpointRollingPolicy.build())
    .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH"))
    .build();

// Write data stream to ORC files
DataStream<RowData> salesStream = ...; // your data stream
salesStream.addSink(sink);
```
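
Newer Flink versions recommend the unified `FileSink` over `StreamingFileSink`; it accepts the same bulk writer factory. A minimal variant of the sink above, reusing `writerFactory` and `salesStream` from the example:

```java
import org.apache.flink.connector.file.sink.FileSink;

// FileSink accepts the same OrcBulkWriterFactory as StreamingFileSink.
FileSink<RowData> fileSink = FileSink
    .forBulkFormat(new Path("hdfs://namenode:port/sales-data"), writerFactory)
    .withRollingPolicy(OnCheckpointRollingPolicy.build())
    .build();

salesStream.sinkTo(fileSink);
```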
### Vectorization

Core vectorization components for converting elements to ORC format.
```java { .api }
/**
 * Abstract base class for converting elements to ORC VectorizedRowBatch format.
 * Handles schema management and provides the vectorization interface.
 *
 * @param <T> The type of elements to vectorize
 */
public abstract class Vectorizer<T> {

    /**
     * Constructor with an ORC schema string.
     *
     * @param schema ORC schema in string format (e.g., "struct<id:bigint,name:string>")
     */
    public Vectorizer(String schema);

    /** Returns the ORC schema description. */
    public TypeDescription getSchema();

    /**
     * Adds user metadata to be written to the ORC file.
     *
     * @param key Metadata key
     * @param value Metadata value as a ByteBuffer
     */
    public void addUserMetadata(String key, ByteBuffer value);

    /**
     * Vectorizes an element into a batch.
     *
     * @param element Element to vectorize
     * @param batch VectorizedRowBatch to populate
     * @throws IOException If vectorization fails
     */
    public abstract void vectorize(T element, VectorizedRowBatch batch) throws IOException;
}

/**
 * Concrete vectorizer implementation for RowData elements.
 * Optimized for Flink's internal RowData representation.
 */
public class RowDataVectorizer extends Vectorizer<RowData> {

    /**
     * Constructor for the RowData vectorizer.
     *
     * @param schema ORC schema string
     * @param fieldTypes Array of Flink logical types corresponding to schema fields
     */
    public RowDataVectorizer(String schema, LogicalType[] fieldTypes);

    /**
     * Vectorizes a RowData element into the batch.
     *
     * @param element RowData element to vectorize
     * @param batch VectorizedRowBatch to populate
     * @throws IOException If vectorization fails
     */
    public void vectorize(RowData element, VectorizedRowBatch batch) throws IOException;
}
```
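
User metadata registered through `addUserMetadata` is written into the ORC file footer. A minimal sketch, reusing the `vectorizer` from the earlier example (the key and value here are illustrative):

```java
// Attach custom key/value metadata to the ORC file footer.
// The key and value below are illustrative.
vectorizer.addUserMetadata(
    "writer.app", StandardCharsets.UTF_8.encode("sales-pipeline"));
```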
**Usage Example:**

```java
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Custom vectorizer for POJO objects
public class SalesRecordVectorizer extends Vectorizer<SalesRecord> {

    public SalesRecordVectorizer() {
        super("struct<user_id:bigint,product_id:bigint,amount:decimal(10,2),category:string>");
    }

    @Override
    public void vectorize(SalesRecord record, VectorizedRowBatch batch) throws IOException {
        int row = batch.size++;

        // Set user_id (bigint)
        ((LongColumnVector) batch.cols[0]).vector[row] = record.getUserId();

        // Set product_id (bigint)
        ((LongColumnVector) batch.cols[1]).vector[row] = record.getProductId();

        // Set amount (decimal); assumes getAmount() returns a java.math.BigDecimal
        HiveDecimalWritable decimal =
            new HiveDecimalWritable(HiveDecimal.create(record.getAmount()));
        ((DecimalColumnVector) batch.cols[2]).set(row, decimal);

        // Set category (string)
        byte[] categoryBytes = record.getCategory().getBytes(StandardCharsets.UTF_8);
        ((BytesColumnVector) batch.cols[3]).setRef(row, categoryBytes, 0, categoryBytes.length);
    }
}
```
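
The custom vectorizer plugs into `OrcBulkWriterFactory` just like `RowDataVectorizer`. A minimal wiring sketch, assuming a `DataStream<SalesRecord>` named `recordStream` and an illustrative output path:

```java
// Build a factory around the custom vectorizer and hand it to a bulk-format sink.
// recordStream and the output path are placeholders for this sketch.
OrcBulkWriterFactory<SalesRecord> salesFactory =
    new OrcBulkWriterFactory<>(new SalesRecordVectorizer());

StreamingFileSink<SalesRecord> salesSink = StreamingFileSink
    .forBulkFormat(new Path("hdfs://namenode:port/sales-pojo"), salesFactory)
    .withRollingPolicy(OnCheckpointRollingPolicy.build())
    .build();

recordStream.addSink(salesSink);
```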
### Column Vector Adapters

Adapters that bridge Hive ORC column vectors with Flink's vectorized processing.
```java { .api }
/**
 * Abstract base adapter class bridging Hive column vectors to Flink column vectors.
 * Provides a unified interface for different column vector types.
 */
public abstract class AbstractOrcColumnVector {

    /**
     * Creates a Flink column vector from a Hive column vector.
     *
     * @param hiveVector Hive column vector
     * @return Flink-compatible column vector
     */
    public static ColumnVector createFlinkVector(
            org.apache.hadoop.hive.ql.exec.vector.ColumnVector hiveVector);

    /**
     * Creates a constant Flink vector from a Hive column vector.
     *
     * @param hiveVector Hive column vector with a constant value
     * @param batchSize Size of the batch
     * @param type Logical type of the column
     * @return Flink-compatible constant column vector
     */
    public static ColumnVector createFlinkVectorFromConstant(
            org.apache.hadoop.hive.ql.exec.vector.ColumnVector hiveVector,
            int batchSize,
            LogicalType type);

    /** Checks whether the value at the given index is null. */
    public boolean isNullAt(int i);
}

/**
 * Factory interface for creating vectorized column batches.
 *
 * @param <BatchT> Type of the batch (e.g., VectorizedRowBatch)
 * @param <SplitT> Type of the split (e.g., FileSourceSplit)
 */
@FunctionalInterface
public interface ColumnBatchFactory<BatchT, SplitT> {

    /**
     * Creates a vectorized column batch.
     *
     * @param split File split being processed
     * @param batch Original batch from the ORC reader
     * @return Vectorized column batch for Flink processing
     */
    VectorizedColumnBatch create(SplitT split, BatchT batch);
}

/**
 * Wrapper interface unifying different ORC batch types across versions.
 *
 * @param <T> Type of the underlying batch
 */
public interface OrcVectorizedBatchWrapper<T> {

    /** Returns the underlying batch object. */
    T getBatch();

    /** Returns the size (number of rows) of this batch. */
    int size();
}
```
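
A `ColumnBatchFactory` typically adapts each Hive column vector with the static factory method above. A minimal sketch against the signatures documented here (the lambda body and per-column wrapping are illustrative):

```java
// Illustrative factory: wrap every Hive column vector in a Flink adapter
// and assemble them into a VectorizedColumnBatch.
ColumnBatchFactory<VectorizedRowBatch, FileSourceSplit> batchFactory =
    (split, batch) -> {
        ColumnVector[] vectors = new ColumnVector[batch.cols.length];
        for (int i = 0; i < batch.cols.length; i++) {
            vectors[i] = AbstractOrcColumnVector.createFlinkVector(batch.cols[i]);
        }
        return new VectorizedColumnBatch(vectors);
    };
```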
**Usage Example:**

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.OrcColumnarRowFileInputFormat;
import org.apache.flink.orc.util.SerializableHadoopConfigWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

import java.util.Collections;

// Reading ORC files programmatically in the DataStream API
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure ORC input format
Configuration orcConfig = new Configuration();
orcConfig.set("orc.stripe.size", "67108864"); // 64 MB

Path[] inputPaths = {new Path("hdfs://namenode:port/input/sales-2023.orc")};
String[] fieldNames = {"user_id", "product_id", "amount", "category"};
LogicalType[] fieldTypes = {new BigIntType(), new BigIntType(), new DecimalType(10, 2), new VarCharType(50)};

OrcColumnarRowFileInputFormat<VectorizedRowBatch, FileSourceSplit> inputFormat =
    new OrcColumnarRowFileInputFormat<>(
        inputPaths,
        fieldNames,
        fieldTypes,
        new int[] {0, 1, 2, 3},  // Select all fields
        Collections.emptyList(), // No predicates
        1000,                    // Batch size
        orcConfig,
        new SerializableHadoopConfigWrapper(new Configuration()));

// Create a source from the bulk format
FileSource<RowData> source = FileSource.forBulkFileFormat(inputFormat, inputPaths).build();
DataStream<RowData> orcStream =
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "orc-source");

// Process the stream
orcStream
    .filter(row -> row.getDecimal(2, 10, 2).toBigDecimal().doubleValue() > 100.0) // amount > 100
    .keyBy(row -> row.getLong(0)) // key by user_id
    .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
    .aggregate(new SalesAggregator()) // custom AggregateFunction
    .print();
```
### Batch Wrappers

Wrapper implementations for different ORC batch types and version compatibility.

```java { .api }
/**
 * Hive-specific batch wrapper implementation.
 */
public class HiveOrcBatchWrapper implements OrcVectorizedBatchWrapper<VectorizedRowBatch> {

    public HiveOrcBatchWrapper(VectorizedRowBatch batch);

    public VectorizedRowBatch getBatch();

    public int size();
}
```
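
A short sketch of wrapping a Hive batch so downstream code can treat it uniformly through `OrcVectorizedBatchWrapper` (the schema and batch size here are illustrative):

```java
// Create a Hive VectorizedRowBatch and wrap it; schema and batch size are illustrative.
VectorizedRowBatch hiveBatch = TypeDescription
    .fromString("struct<user_id:bigint,amount:decimal(10,2)>")
    .createRowBatch(1024);

OrcVectorizedBatchWrapper<VectorizedRowBatch> wrapper = new HiveOrcBatchWrapper(hiveBatch);

VectorizedRowBatch underlying = wrapper.getBatch(); // same object that was wrapped
int rows = wrapper.size(); // number of rows currently in the batch
```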

The DataStream API integration provides full programmatic control over ORC file processing, enabling custom vectorization strategies, fine-tuned performance configuration, and integration with complex stream processing pipelines.