or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

bulk-writing.md · columnar-reading.md · index.md · predicate-pushdown.md · table-api.md · vector-processing.md

docs/bulk-writing.md

# Bulk Writing

The ORC format provides high-performance bulk writing capabilities through the `OrcBulkWriterFactory` and custom `Vectorizer` implementations. This enables efficient writing of large datasets to ORC files with full control over the vectorization process.

## Writer Factory

```java { .api }
@PublicEvolving
public class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> {
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer);
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Configuration configuration);
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Properties writerProperties, Configuration configuration);

    public BulkWriter<T> create(FSDataOutputStream out) throws IOException;

    @VisibleForTesting
    protected OrcFile.WriterOptions getWriterOptions();
}
```

**Factory Method Usage:**

```java
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.hadoop.conf.Configuration;
import java.util.Properties;

// Simple factory with vectorizer only
RowDataVectorizer vectorizer = new RowDataVectorizer(schema, fieldTypes);
OrcBulkWriterFactory<RowData> simpleFactory = new OrcBulkWriterFactory<>(vectorizer);

// Factory with Hadoop configuration
Configuration hadoopConfig = new Configuration();
hadoopConfig.set("fs.defaultFS", "hdfs://namenode:8020");
OrcBulkWriterFactory<RowData> configuredFactory = new OrcBulkWriterFactory<>(
    vectorizer,
    hadoopConfig
);

// Factory with writer properties and configuration
Properties writerProps = new Properties();
writerProps.setProperty("orc.compress", "SNAPPY");
writerProps.setProperty("orc.stripe.size", "134217728");
OrcBulkWriterFactory<RowData> fullFactory = new OrcBulkWriterFactory<>(
    vectorizer,
    writerProps,
    hadoopConfig
);
```

## Vectorizer Base Class

```java { .api }
@PublicEvolving
public abstract class Vectorizer<T> implements Serializable {
    public Vectorizer(String schema);

    public TypeDescription getSchema();
    public void setWriter(Writer writer);
    public void addUserMetadata(String key, ByteBuffer value);

    public abstract void vectorize(T element, VectorizedRowBatch batch) throws IOException;
}
```

## RowData Vectorizer

```java { .api }
public class RowDataVectorizer extends Vectorizer<RowData> {
    public RowDataVectorizer(String schema, LogicalType[] fieldTypes);

    public void vectorize(RowData row, VectorizedRowBatch batch);
}
```

## Usage Examples

### Basic RowData Writing

```java
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.*;

// Define schema and field types
LogicalType[] fieldTypes = {
    new BigIntType(), // id
    new VarCharType(255), // name
    new IntType(), // age
    new DecimalType(10, 2), // salary
    new BooleanType(), // active
    new TimestampType(3) // created_at
};

String orcSchema = "struct<" +
    "id:bigint," +
    "name:string," +
    "age:int," +
    "salary:decimal(10,2)," +
    "active:boolean," +
    "created_at:timestamp" +
    ">";

// Create vectorizer
RowDataVectorizer vectorizer = new RowDataVectorizer(orcSchema, fieldTypes);

// Create writer factory
OrcBulkWriterFactory<RowData> writerFactory = new OrcBulkWriterFactory<>(vectorizer);

// Use in streaming sink
DataStream<RowData> dataStream = // ... your data stream
dataStream.addSink(
    StreamingFileSink.forBulkFormat(
        new Path("/path/to/output"),
        writerFactory
    ).build()
);
```

### Writing with Configuration

```java
import org.apache.hadoop.conf.Configuration;
import java.util.Properties;

// Configure ORC writer properties
Properties writerProperties = new Properties();
writerProperties.setProperty("orc.compress", "SNAPPY");
writerProperties.setProperty("orc.stripe.size", "67108864");
writerProperties.setProperty("orc.row.index.stride", "10000");

// Configure Hadoop settings
Configuration hadoopConfig = new Configuration();
hadoopConfig.set("orc.bloom.filter.columns", "name,id");
hadoopConfig.setFloat("orc.bloom.filter.fpp", 0.05f);

// Create configured writer factory
OrcBulkWriterFactory<RowData> writerFactory = new OrcBulkWriterFactory<>(
    vectorizer,
    writerProperties,
    hadoopConfig
);
```

### Complex Types Writing

```java
// Schema with complex types
LogicalType[] complexFieldTypes = {
    new BigIntType(), // id
    new ArrayType(new VarCharType(100)), // tags array
    new MapType(new VarCharType(50), new IntType()), // metrics map
    new RowType(Arrays.asList( // address struct
        new RowType.RowField("street", new VarCharType(200)),
        new RowType.RowField("city", new VarCharType(100)),
        new RowType.RowField("zip", new VarCharType(10))
    ))
};

String complexOrcSchema = "struct<" +
    "id:bigint," +
    "tags:array<string>," +
    "metrics:map<string,int>," +
    "address:struct<street:string,city:string,zip:string>" +
    ">";

RowDataVectorizer complexVectorizer = new RowDataVectorizer(complexOrcSchema, complexFieldTypes);
```

## Custom Vectorizer Implementation

```java
// Custom vectorizer for POJO classes
public class UserVectorizer extends Vectorizer<User> {

    public UserVectorizer() {
        super("struct<id:bigint,name:string,email:string,active:boolean>");
    }

    @Override
    public void vectorize(User user, VectorizedRowBatch batch) throws IOException {
        int rowId = batch.size++;

        // Set ID column
        ((LongColumnVector) batch.cols[0]).vector[rowId] = user.getId();

        // Set name column
        byte[] nameBytes = user.getName().getBytes(StandardCharsets.UTF_8);
        ((BytesColumnVector) batch.cols[1]).setVal(rowId, nameBytes);

        // Set email column
        byte[] emailBytes = user.getEmail().getBytes(StandardCharsets.UTF_8);
        ((BytesColumnVector) batch.cols[2]).setVal(rowId, emailBytes);

        // Set active column
        ((LongColumnVector) batch.cols[3]).vector[rowId] = user.isActive() ? 1 : 0;

        // Add custom metadata
        if (user.hasSpecialAttribute()) {
            addUserMetadata("special_users", ByteBuffer.wrap("true".getBytes()));
        }
    }
}

// Usage
UserVectorizer userVectorizer = new UserVectorizer();
OrcBulkWriterFactory<User> userWriterFactory = new OrcBulkWriterFactory<>(userVectorizer);
```

## Bulk Writer Implementation

```java { .api }
@Internal
public class OrcBulkWriter<T> implements BulkWriter<T> {
    public void addElement(T element) throws IOException;
    public void flush() throws IOException;
    public void finish() throws IOException;
}
```

The `OrcBulkWriter` handles the actual writing process:

- **addElement()**: Vectorizes and buffers elements, writing batches when full
- **flush()**: Forces write of any pending batched data
- **finish()**: Finalizes and closes the ORC file

## Performance Considerations

### Batch Size Optimization

```java
// ORC uses VectorizedRowBatch with default size
VectorizedRowBatch batch = schema.createRowBatch(); // Default: 1024 rows
VectorizedRowBatch customBatch = schema.createRowBatch(2048); // Custom size
```

### Compression Settings

```java
Properties props = new Properties();
props.setProperty("orc.compress", "SNAPPY"); // Fast compression
props.setProperty("orc.compress", "ZLIB"); // Better compression ratio
props.setProperty("orc.compress", "ZSTD"); // Best compression ratio
props.setProperty("orc.compress", "NONE"); // No compression
```

### Stripe Size Configuration

```java
Properties props = new Properties();
props.setProperty("orc.stripe.size", "134217728"); // 128MB stripes
props.setProperty("orc.row.index.stride", "20000"); // Index every 20k rows
```

## Writer Lifecycle

1. **Factory Creation**: `OrcBulkWriterFactory` configured with vectorizer and options
2. **Writer Instantiation**: `create()` method called with output stream
3. **Element Processing**: `addElement()` vectorizes and batches data
4. **Batch Writing**: Full batches automatically written to ORC file
5. **Finalization**: `finish()` flushes remaining data and closes file

## Error Handling

```java
try {
    writerFactory.create(outputStream);
} catch (IOException e) {
    // Handle writer creation errors
    log.error("Failed to create ORC writer", e);
}

// Vectorizer error handling
@Override
public void vectorize(MyData data, VectorizedRowBatch batch) throws IOException {
    try {
        // Vectorization logic
    } catch (Exception e) {
        throw new IOException("Failed to vectorize data: " + data, e);
    }
}
```