or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

bulk-writing.md, columnar-reading.md, index.md, orc-integration.md, vector-processing.md

docs/bulk-writing.md

0

# Bulk Writing

1

2

Factory and implementation for creating ORC bulk writers that efficiently write Flink RowData to ORC files without Hive dependencies. Provides high-performance batch writing with automatic type conversion and memory management.

3

4

## Capabilities

5

6

### ORC Bulk Writer Factory

7

8

Factory class that creates BulkWriter instances for writing RowData to ORC files using the no-hive ORC implementation.

9

10

```java { .api }

11

/**

12

* Factory for creating ORC bulk writers without Hive dependencies

13

* Implements BulkWriter.Factory<RowData> for integration with Flink's file sinks

14

*/

15

public class OrcNoHiveBulkWriterFactory implements BulkWriter.Factory<RowData> {

16

17

/**

18

* Creates a new ORC bulk writer factory

19

* @param conf Hadoop configuration for ORC file settings

20

* @param schema ORC schema string (e.g., "struct<name:string,age:int>")

21

* @param fieldTypes Array of Flink logical types matching the schema fields

22

*/

23

public OrcNoHiveBulkWriterFactory(Configuration conf, String schema, LogicalType[] fieldTypes);

24

25

/**

26

* Creates a BulkWriter instance for the given output stream

27

* @param out Output stream to write ORC data to

28

* @return BulkWriter instance for writing RowData

29

* @throws IOException if writer creation fails

30

*/

31

public BulkWriter<RowData> create(FSDataOutputStream out) throws IOException;

32

}

33

```

34

35

**Usage Examples:**

36

37

```java

38

import org.apache.flink.orc.nohive.OrcNoHiveBulkWriterFactory;

39

import org.apache.flink.table.data.RowData;

40

import org.apache.flink.table.types.logical.*;

41

import org.apache.hadoop.conf.Configuration;

42

43

// Define schema and types for a user record

44

String orcSchema = "struct<id:bigint,name:string,email:string,age:int,salary:decimal(10,2)>";

45

LogicalType[] fieldTypes = {

46

new BigIntType(),

47

new VarCharType(255),

48

new VarCharType(255),

49

new IntType(),

50

new DecimalType(10, 2)

51

};

52

53

// Create factory with Hadoop configuration

54

Configuration hadoopConfig = new Configuration();

55

hadoopConfig.set("orc.compress", "ZLIB"); // Optional compression

56

hadoopConfig.setInt("orc.row.batch.size", 1024); // Optional batch size

57

58

OrcNoHiveBulkWriterFactory factory = new OrcNoHiveBulkWriterFactory(

59

hadoopConfig,

60

orcSchema,

61

fieldTypes

62

);

63

64

// Use with StreamingFileSink

65

StreamingFileSink<RowData> sink = StreamingFileSink

66

.forBulkFormat(outputPath, factory)

67

.withRollingPolicy(DefaultRollingPolicy.builder()

68

.withMaxPartSize(128 * 1024 * 1024) // 128MB files

69

.build())

70

.build();

71

72

dataStream.addSink(sink);

73

```

74

75

```java

76

// Complex nested schema example

77

String complexSchema = "struct<" +

78

"user_id:bigint," +

79

"profile:struct<name:string,bio:string>," +

80

"tags:array<string>," +

81

"metrics:map<string,double>" +

82

">";

83

84

LogicalType[] complexTypes = {

85

new BigIntType(),

86

RowType.of(new VarCharType(100), new VarCharType(500)),

87

new ArrayType(new VarCharType(50)),

88

new MapType(new VarCharType(50), new DoubleType())

89

};

90

91

OrcNoHiveBulkWriterFactory complexFactory = new OrcNoHiveBulkWriterFactory(

92

hadoopConfig,

93

complexSchema,

94

complexTypes

95

);

96

```

97

98

### BulkWriter Interface

99

100

The factory creates BulkWriter instances with the following interface:

101

102

```java { .api }

103

/**

104

* BulkWriter interface for writing RowData to ORC files

105

* Created by OrcNoHiveBulkWriterFactory.create()

106

*/

107

interface BulkWriter<RowData> {

108

/**

109

* Add a single RowData element to the ORC file

110

* @param row RowData instance to write

111

* @throws IOException if write operation fails

112

*/

113

void addElement(RowData row) throws IOException;

114

115

/**

116

* Flush any buffered data to the output stream

117

* @throws IOException if flush operation fails

118

*/

119

void flush() throws IOException;

120

121

/**

122

* Finish writing and close the ORC file

123

* @throws IOException if close operation fails

124

*/

125

void finish() throws IOException;

126

}

127

```

128

129

### Physical Writer Implementation

130

131

Internal implementation that handles ORC file writing with relocated Protobuf classes for no-hive compatibility.

132

133

```java { .api }

134

/**

135

* Physical writer implementation for ORC files without Hive dependencies

136

* Handles relocated Protobuf classes in orc-core-nohive

137

*/

138

public class NoHivePhysicalWriterImpl extends PhysicalWriterImpl {

139

140

/**

141

* Creates a new no-hive physical writer

142

* @param out Output stream to write to

143

* @param opts ORC writer options and configuration

144

* @throws IOException if writer initialization fails

145

*/

146

public NoHivePhysicalWriterImpl(FSDataOutputStream out, OrcFile.WriterOptions opts) throws IOException;

147

148

/**

149

* Write ORC metadata using relocated protobuf classes

150

* @param metadata ORC metadata to write

151

* @throws IOException if write operation fails

152

*/

153

protected void writeMetadata(OrcProto.Metadata metadata) throws IOException;

154

155

/**

156

* Write ORC file footer using relocated protobuf classes

157

* @param footer ORC file footer to write

158

* @throws IOException if write operation fails

159

*/

160

protected void writeFileFooter(OrcProto.Footer footer) throws IOException;

161

162

/**

163

* Write ORC stripe footer using relocated protobuf classes

164

* @param footer ORC stripe footer to write

165

* @throws IOException if write operation fails

166

*/

167

protected void writeStripeFooter(OrcProto.StripeFooter footer) throws IOException;

168

}

169

```

170

171

## Type Conversion

172

173

The bulk writer automatically converts Flink logical types to ORC column vectors:

174

175

| Flink Type | ORC Vector Type | Conversion Notes |

176

|------------|----------------|------------------|

177

| BOOLEAN | LongColumnVector | 1 for true, 0 for false |

178

| TINYINT, SMALLINT, INTEGER, BIGINT | LongColumnVector | Direct mapping |

179

| FLOAT, DOUBLE | DoubleColumnVector | Direct mapping |

180

| CHAR, VARCHAR | BytesColumnVector | UTF-8 encoded |

181

| BINARY, VARBINARY | BytesColumnVector | Direct byte array |

182

| DECIMAL | DecimalColumnVector | Uses HiveDecimal for precision |

183

| DATE | LongColumnVector | Days since epoch |

184

| TIMESTAMP_* | TimestampColumnVector | Microsecond precision |

185

186

## Configuration Options

187

188

Configure the ORC writer through Hadoop Configuration:

189

190

```java

191

Configuration config = new Configuration();

192

193

// Compression settings

194

config.set("orc.compress", "ZLIB"); // NONE, ZLIB, SNAPPY, LZO, LZ4, ZSTD

195

config.set("orc.compress.size", "262144"); // 256KB compression blocks

196

197

// Performance settings

198

config.setInt("orc.row.batch.size", 1024); // Rows per batch

199

config.setInt("orc.stripe.size", 67108864); // 64MB stripes

200

config.setBoolean("orc.use.zerocopy", true); // Enable zero-copy reads

201

202

// Memory settings

203

config.setDouble("orc.dictionary.key.threshold", 0.8); // Dictionary encoding threshold

204

```

205

206

## Memory Management

207

208

The bulk writer manages memory efficiently through:

209

210

- **Batch Processing**: Writes data in configurable batch sizes (default 1024 rows)

211

- **Automatic Flushing**: Flushes batches when full to prevent memory buildup

212

- **Stream Management**: Properly closes underlying streams and releases resources

213

- **Vector Reuse**: Reuses vectorized row batches across write operations

214

215

## Error Handling

216

217

Common exceptions and handling strategies:

218

219

```java

220

try {

221

BulkWriter<RowData> writer = factory.create(outputStream);

222

writer.addElement(rowData);

223

writer.finish();

224

} catch (IOException e) {

225

// Handle file system errors, ORC format errors, or write failures

226

logger.error("Failed to write ORC data", e);

227

} catch (UnsupportedOperationException e) {

228

// Handle unsupported data types

229

logger.error("Unsupported data type in schema", e);

230

}

231

```