or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md · datastream-api.md · index.md · table-api.md

datastream-api.mddocs/

0

# DataStream API Integration

1

2

Direct integration with Flink's DataStream API for reading and writing ORC files programmatically. Use it to build custom streaming or batch pipelines when you need full control over how records are vectorized into ORC batches and how the ORC reader and writer are tuned for performance.

3

4

## Capabilities

5

6

### Bulk Writer Factory

7

8

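Factory for creating ORC bulk writers in DataStream API applications. Pass it to a bulk-format file sink (for example `StreamingFileSink.forBulkFormat(path, writerFactory)`) to write a stream of records as ORC files.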

9

10

```java { .api }

11

/**

12

* Factory for creating ORC bulk writers for use with DataStream API.

13

* Supports custom vectorization and configuration options.

14

* @param <T> The type of elements to write

15

*/

16

public class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> {

17

/**

18

* Constructor with custom vectorizer

19

* @param vectorizer Vectorizer for converting elements to ORC batches

20

*/

21

public OrcBulkWriterFactory(Vectorizer<T> vectorizer);

22

23

/**

24

* Constructor with vectorizer and ORC writer configuration

25

* @param vectorizer Vectorizer for converting elements

26

* @param writerConfiguration ORC writer configuration

27

*/

28

public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Configuration writerConfiguration);

29

30

/**

31

* Full constructor with all configuration options

32

* @param vectorizer Vectorizer for converting elements

33

* @param writerProperties ORC writer properties

34

* @param hadoopConfiguration Hadoop configuration for HDFS access

35

*/

36

public OrcBulkWriterFactory(

37

Vectorizer<T> vectorizer,

38

Properties writerProperties,

39

Configuration hadoopConfiguration

40

);

41

42

/**

43

* Creates a bulk writer for the given output stream

44

* @param out Output stream to write to

45

* @return Configured ORC bulk writer

46

* @throws IOException If writer creation fails

47

*/

48

public BulkWriter<T> create(FSDataOutputStream out) throws IOException;

49

}

50

```

51

52

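**Usage Example:** (Besides the imports shown, the snippet assumes that `java.util.Properties`, Hadoop's `org.apache.hadoop.conf.Configuration`, and `org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner` are imported. `StreamingFileSink` is deprecated in newer Flink releases in favour of `FileSink`, which accepts the same `OrcBulkWriterFactory` via `FileSink.forBulkFormat(...)`.)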

53

54

```java

55

import org.apache.flink.core.fs.Path;

56

import org.apache.flink.orc.vector.RowDataVectorizer;

57

import org.apache.flink.orc.writer.OrcBulkWriterFactory;

58

import org.apache.flink.streaming.api.datastream.DataStream;

59

import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

60

import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

61

import org.apache.flink.table.data.RowData;

62

import org.apache.flink.table.types.logical.*;

63

64

// Define schema for sales records

65

String orcSchema = "struct<user_id:bigint,product_id:bigint,amount:decimal(10,2),purchase_time:timestamp>";

66

LogicalType[] fieldTypes = {

67

new BigIntType(),

68

new BigIntType(),

69

new DecimalType(10, 2),

70

new TimestampType(3)

71

};

72

73

// Create vectorizer

74

RowDataVectorizer vectorizer = new RowDataVectorizer(orcSchema, fieldTypes);

75

76

// Configure ORC writer properties

77

Properties writerProps = new Properties();

78

writerProps.setProperty("orc.compress", "SNAPPY");

79

writerProps.setProperty("orc.stripe.size", "67108864"); // 64MB

80

writerProps.setProperty("orc.row.index.stride", "10000");

81

82

// Create writer factory

83

OrcBulkWriterFactory<RowData> writerFactory = new OrcBulkWriterFactory<>(

84

vectorizer, writerProps, new Configuration()

85

);

86

87

// Create streaming file sink

88

StreamingFileSink<RowData> sink = StreamingFileSink

89

.forBulkFormat(new Path("hdfs://namenode:port/sales-data"), writerFactory)

90

.withRollingPolicy(OnCheckpointRollingPolicy.build())

91

.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH"))

92

.build();

93

94

// Write data stream to ORC files

95

DataStream<RowData> salesStream = // ... your data stream

96

salesStream.addSink(sink);

97

```

98

99

### Vectorization

100

101

Core vectorization components that convert stream elements into the columns of an ORC `VectorizedRowBatch` before the batch is written.

102

103

```java { .api }

104

/**

105

* Abstract base class for converting elements to ORC VectorizedRowBatch format.

106

* Handles schema management and provides vectorization interface.

107

* @param <T> The type of elements to vectorize

108

*/

109

public abstract class Vectorizer<T> {

110

/**

111

* Constructor with ORC schema string

112

* @param schema ORC schema in string format (e.g., "struct<id:bigint,name:string>")

113

*/

114

public Vectorizer(String schema);

115

116

/** Returns the ORC schema description */

117

public TypeDescription getSchema();

118

119

/**

120

* Add user metadata to be written to ORC file

121

* @param key Metadata key

122

* @param value Metadata value as ByteBuffer

123

*/

124

public void addUserMetadata(String key, ByteBuffer value);

125

126

/**

127

* Abstract method for vectorizing an element into a batch

128

* @param element Element to vectorize

129

* @param batch VectorizedRowBatch to populate

130

* @throws IOException If vectorization fails

131

*/

132

public abstract void vectorize(T element, VectorizedRowBatch batch) throws IOException;

133

}

134

135

/**

136

* Concrete vectorizer implementation for RowData elements.

137

* Optimized for Flink's internal RowData representation.

138

*/

139

public class RowDataVectorizer extends Vectorizer<RowData> {

140

/**

141

* Constructor for RowData vectorizer

142

* @param schema ORC schema string

143

* @param fieldTypes Array of Flink logical types corresponding to schema fields

144

*/

145

public RowDataVectorizer(String schema, LogicalType[] fieldTypes);

146

147

/**

148

* Vectorize a RowData element into the batch

149

* @param element RowData element to vectorize

150

* @param batch VectorizedRowBatch to populate

151

* @throws IOException If vectorization fails

152

*/

153

public void vectorize(RowData element, VectorizedRowBatch batch) throws IOException;

154

}

155

```

156

157

**Usage Example:**

158

159

```java

160

// Custom vectorizer for POJO objects

161

public class SalesRecordVectorizer extends Vectorizer<SalesRecord> {

162

public SalesRecordVectorizer() {

163

super("struct<user_id:bigint,product_id:bigint,amount:decimal(10,2),category:string>");

164

}

165

166

@Override

167

public void vectorize(SalesRecord record, VectorizedRowBatch batch) throws IOException {

168

int row = batch.size++;

169

170

// Set user_id (bigint)

171

((LongColumnVector) batch.cols[0]).vector[row] = record.getUserId();

172

173

// Set product_id (bigint)

174

((LongColumnVector) batch.cols[1]).vector[row] = record.getProductId();

175

176

// Set amount (decimal)

177

HiveDecimalWritable decimal = new HiveDecimalWritable(record.getAmount());

178

((DecimalColumnVector) batch.cols[2]).set(row, decimal);

179

180

// Set category (string)

181

byte[] categoryBytes = record.getCategory().getBytes(StandardCharsets.UTF_8);

182

((BytesColumnVector) batch.cols[3]).setRef(row, categoryBytes, 0, categoryBytes.length);

183

}

184

}

185

```

186

187

### Column Vector Adapters

188

189

Adapters that expose Hive ORC column vectors as Flink column vectors, so that batches read from ORC files can be consumed by Flink's vectorized (columnar) processing.

190

191

```java { .api }

192

/**

193

* Abstract base adapter class for Hive column vectors to Flink column vectors.

194

* Provides unified interface for different column vector types.

195

*/

196

public abstract class AbstractOrcColumnVector {

197

/**

198

* Create a Flink column vector from a Hive column vector

199

* @param hiveVector Hive column vector

200

* @return Flink-compatible column vector

201

*/

202

public static ColumnVector createFlinkVector(org.apache.hadoop.hive.ql.exec.vector.ColumnVector hiveVector);

203

204

/**

205

* Create a constant Flink vector from a Hive column vector

206

* @param hiveVector Hive column vector with constant value

207

* @param batchSize Size of the batch

208

* @param type Logical type of the column

209

* @return Flink-compatible constant column vector

210

*/

211

public static ColumnVector createFlinkVectorFromConstant(

212

org.apache.hadoop.hive.ql.exec.vector.ColumnVector hiveVector,

213

int batchSize,

214

LogicalType type

215

);

216

217

/** Check if value at index is null */

218

public boolean isNullAt(int i);

219

}

220

221

/**

222

* Factory interface for creating vectorized column batches

223

* @param <BatchT> Type of the batch (e.g., VectorizedRowBatch)

224

* @param <SplitT> Type of the split (e.g., FileSourceSplit)

225

*/

226

@FunctionalInterface

227

public interface ColumnBatchFactory<BatchT, SplitT> {

228

/**

229

* Create a vectorized column batch

230

* @param split File split being processed

231

* @param batch Original batch from ORC reader

232

* @return Vectorized column batch for Flink processing

233

*/

234

VectorizedColumnBatch create(SplitT split, BatchT batch);

235

}

236

237

/**

238

* Wrapper interface for unifying different ORC batch types across versions

239

* @param <T> Type of the underlying batch

240

*/

241

public interface OrcVectorizedBatchWrapper<T> {

242

/** Get the underlying batch object */

243

T getBatch();

244

245

/** Get the size (number of rows) in this batch */

246

int size();

247

}

248

```

249

250

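**Usage Example:** (Caveats: `OrcColumnarRowFileInputFormat` is a `BulkFormat`. In recent Flink releases it is therefore consumed through `FileSource.forBulkFileFormat(format, paths)` and `env.fromSource(...)`, not `env.readFile(...)`, which expects a legacy `FileInputFormat`. Its constructor parameters also differ between Flink versions, so check the Javadoc for your release. In addition, `DecimalData` has no `doubleValue()` method; compare amounts via `row.getDecimal(2, 10, 2).toBigDecimal()` instead.)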

251

252

```java

253

// Reading ORC files programmatically in DataStream API

254

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

255

256

// Configure ORC input format

257

Configuration orcConfig = new Configuration();

258

orcConfig.set("orc.stripe.size", "64MB");

259

260

Path[] inputPaths = {new Path("hdfs://namenode:port/input/sales-2023.orc")};

261

String[] fieldNames = {"user_id", "product_id", "amount", "category"};

262

LogicalType[] fieldTypes = {new BigIntType(), new BigIntType(), new DecimalType(10,2), new VarCharType(50)};

263

264

OrcColumnarRowFileInputFormat<VectorizedRowBatch, FileSourceSplit> inputFormat =

265

new OrcColumnarRowFileInputFormat<>(

266

inputPaths,

267

fieldNames,

268

fieldTypes,

269

new int[]{0, 1, 2, 3}, // Select all fields

270

Collections.emptyList(), // No predicates

271

1000, // Batch size

272

orcConfig,

273

new SerializableHadoopConfigWrapper(new Configuration())

274

);

275

276

// Create source from input format

277

DataStream<RowData> orcStream = env.readFile(inputFormat, "hdfs://namenode:port/input");

278

279

// Process the stream

280

orcStream

281

.filter(row -> row.getDecimal(2, 10, 2).doubleValue() > 100.0) // Amount > 100

282

.keyBy(row -> row.getLong(0)) // Key by user_id

283

.window(TumblingProcessingTimeWindows.of(Time.minutes(5)))

284

.aggregate(new SalesAggregator())

285

.print();

286

```

287

288

### Batch Wrappers

289

290

Wrapper implementations that give the different ORC batch types a common interface, keeping reader code compatible across Hive/ORC versions.

291

292

```java { .api }

293

/**

294

* Hive-specific batch wrapper implementation

295

*/

296

public class HiveOrcBatchWrapper implements OrcVectorizedBatchWrapper<VectorizedRowBatch> {

297

public HiveOrcBatchWrapper(VectorizedRowBatch batch);

298

299

public VectorizedRowBatch getBatch();

300

public int size();

301

}

302

```

303

304

In summary, the DataStream API integration gives you programmatic control over ORC file processing. You can supply custom `Vectorizer` implementations, tune ORC writer properties and the Hadoop configuration, and plug ORC sources and sinks into arbitrary stream-processing pipelines.