
# RowData Writers


High-performance writers for Flink's internal RowData format, providing comprehensive schema mapping and configuration support for writing Parquet files.


## Capabilities


### ParquetRowDataBuilder


Builder class for creating Parquet writers specifically designed for Flink's RowData format with full schema conversion and configuration support.


```java { .api }
/**
 * Builder for creating Parquet writers that handle Flink RowData.
 * Extends ParquetWriter.Builder with RowData-specific functionality.
 */
public class ParquetRowDataBuilder extends ParquetWriter.Builder<RowData, ParquetRowDataBuilder> {

    /**
     * Creates a new ParquetRowDataBuilder instance.
     * @param path Output file path for the Parquet writer
     * @param rowType Flink logical row type defining the schema
     * @param utcTimestamp Whether to use UTC timezone for timestamp conversion
     */
    public ParquetRowDataBuilder(OutputFile path, RowType rowType, boolean utcTimestamp);

    /**
     * Creates a complete ParquetWriterFactory for RowData.
     * @param rowType Flink logical row type defining the data schema
     * @param conf Hadoop configuration for Parquet settings
     * @param utcTimestamp Whether to use UTC timezone for timestamp conversion
     * @return ParquetWriterFactory configured for RowData writing
     */
    public static ParquetWriterFactory<RowData> createWriterFactory(
            RowType rowType,
            Configuration conf,
            boolean utcTimestamp);
}
```


### ParquetWriterFactory


Generic factory for creating Parquet bulk writers using user-provided builder configurations.


```java { .api }
/**
 * Factory that creates Parquet BulkWriter instances.
 * Uses a user-supplied ParquetBuilder to configure the underlying ParquetWriter.
 */
public class ParquetWriterFactory<T> implements BulkWriter.Factory<T> {

    /**
     * Creates a new ParquetWriterFactory.
     * @param writerBuilder Builder to construct the ParquetWriter
     */
    public ParquetWriterFactory(ParquetBuilder<T> writerBuilder);

    /**
     * Creates a BulkWriter for the given output stream.
     * @param stream FSDataOutputStream to write Parquet data to
     * @return BulkWriter instance wrapping a ParquetWriter
     * @throws IOException if writer creation fails
     */
    public BulkWriter<T> create(FSDataOutputStream stream) throws IOException;
}
```
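
Since the factory is generic over `T`, it can wrap any `ParquetBuilder`. The sketch below (not part of the API above; it assumes the `rowType` defined in the usage examples further down) supplies the builder as a lambda that delegates to `ParquetRowDataBuilder`:

```java
import org.apache.flink.formats.parquet.ParquetBuilder;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

// ParquetBuilder has a single createWriter(OutputFile) method, so a lambda
// suffices; here it delegates to ParquetRowDataBuilder, but any builder
// that produces a ParquetWriter<T> could be plugged in instead.
ParquetBuilder<RowData> builder = out ->
        new ParquetRowDataBuilder(out, rowType, true)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .build();

ParquetWriterFactory<RowData> factory = new ParquetWriterFactory<>(builder);
```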


### ParquetBulkWriter


BulkWriter implementation that wraps Parquet writers for integration with Flink's streaming sinks.


```java { .api }
/**
 * BulkWriter implementation wrapping a ParquetWriter.
 * Provides the interface between Flink's streaming framework and Parquet writing.
 */
public class ParquetBulkWriter<T> implements BulkWriter<T> {

    /**
     * Creates a new ParquetBulkWriter.
     * @param parquetWriter The underlying ParquetWriter to wrap
     */
    public ParquetBulkWriter(ParquetWriter<T> parquetWriter);

    /**
     * Adds an element to the Parquet file.
     * @param datum The data element to write
     * @throws IOException if writing fails
     */
    public void addElement(T datum) throws IOException;

    /**
     * Flushes any buffered data (no-op for Parquet).
     */
    public void flush();

    /**
     * Finishes writing and closes the Parquet file.
     * @throws IOException if closing fails
     */
    public void finish() throws IOException;
}
```
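
This writer is normally driven by Flink's sink machinery rather than by user code. The following is a minimal sketch of that lifecycle, assuming the `writerFactory`, `outputStream`, and `rowData` variables from the examples further down:

```java
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.table.data.RowData;

// The sink obtains one BulkWriter per in-progress part file ...
BulkWriter<RowData> bulkWriter = writerFactory.create(outputStream);
bulkWriter.addElement(rowData); // called once per record
bulkWriter.flush();             // no-op: Parquet buffers whole row groups internally
bulkWriter.finish();            // writes the Parquet footer, finalizing the file
```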


### FlinkParquetBuilder


Internal builder implementation for creating RowData-specific Parquet writers with proper configuration.


```java { .api }
/**
 * Internal ParquetBuilder implementation for RowData.
 * Handles Hadoop configuration and Parquet writer setup.
 */
public static class FlinkParquetBuilder implements ParquetBuilder<RowData> {

    /**
     * Creates a new FlinkParquetBuilder.
     * @param rowType Flink logical row type
     * @param conf Hadoop configuration
     * @param utcTimestamp UTC timezone flag
     */
    public FlinkParquetBuilder(RowType rowType, Configuration conf, boolean utcTimestamp);

    /**
     * Creates a configured ParquetWriter for the given output file.
     * @param out OutputFile to write to
     * @return Configured ParquetWriter instance
     * @throws IOException if writer creation fails
     */
    public ParquetWriter<RowData> createWriter(OutputFile out) throws IOException;
}
```
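
`createWriterFactory` is essentially a convenience around this class. As a sketch (assuming a `rowType` as in the examples below), the equivalent manual wiring looks like:

```java
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder.FlinkParquetBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;

// Equivalent to ParquetRowDataBuilder.createWriterFactory(rowType, conf, true)
ParquetWriterFactory<RowData> factory =
        new ParquetWriterFactory<>(new FlinkParquetBuilder(rowType, new Configuration(), true));
```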


## Usage Examples


### Basic RowData Writer


```java
import java.util.Arrays;

import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.conf.Configuration;

// Define the schema as a Flink RowType
RowType rowType = new RowType(Arrays.asList(
        new RowType.RowField("id", new BigIntType()),
        new RowType.RowField("name", new VarCharType(255)),
        new RowType.RowField("timestamp", new TimestampType(3)) // TIMESTAMP_WITHOUT_TIME_ZONE
));

// Create writer factory
Configuration hadoopConfig = new Configuration();
boolean useUtcTimezone = true;

ParquetWriterFactory<RowData> writerFactory =
        ParquetRowDataBuilder.createWriterFactory(rowType, hadoopConfig, useUtcTimezone);
```


### Integration with File Sink


```java
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.table.data.RowData;

DataStream<RowData> dataStream = // ... your data stream

// Create file sink with the Parquet writer factory from above.
// Bulk formats such as Parquet roll part files on checkpoint, so
// checkpointing must be enabled and only checkpoint-based rolling
// policies are supported.
FileSink<RowData> parquetSink = FileSink
    .forBulkFormat(new Path("/output/path"), writerFactory)
    .withRollingPolicy(OnCheckpointRollingPolicy.build())
    .build();

dataStream.sinkTo(parquetSink);
```


### Custom Configuration


```java
import org.apache.hadoop.conf.Configuration;

// Configure Hadoop settings for Parquet
Configuration conf = new Configuration();
conf.set("parquet.compression", "SNAPPY");
conf.setInt("parquet.block.size", 134217728); // 128 MB
conf.setInt("parquet.page.size", 1048576); // 1 MB
conf.setBoolean("parquet.enable.dictionary", true);

// Create writer factory with the custom configuration
ParquetWriterFactory<RowData> customWriterFactory =
        ParquetRowDataBuilder.createWriterFactory(rowType, conf, false);
```


### Manual Writer Creation


```java
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.formats.parquet.StreamOutputFile;
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

// Create output stream
FSDataOutputStream outputStream = // ... create output stream
StreamOutputFile outputFile = new StreamOutputFile(outputStream);

// Build writer manually
ParquetWriter<RowData> writer = new ParquetRowDataBuilder(outputFile, rowType, true)
    .withCompressionCodec(CompressionCodecName.SNAPPY)
    .withRowGroupSize(134217728) // 128 MB
    .withPageSize(1048576) // 1 MB
    .withDictionaryEncoding(true)
    .build();

// Write data
RowData rowData = // ... create row data
writer.write(rowData);
writer.close();
```


## Configuration Options


### Compression Settings


```java
// Available compression codecs
.withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
.withCompressionCodec(CompressionCodecName.SNAPPY)  // Good balance of speed and size
.withCompressionCodec(CompressionCodecName.GZIP)    // High compression, slower
.withCompressionCodec(CompressionCodecName.LZO)
.withCompressionCodec(CompressionCodecName.BROTLI)  // Very high compression
.withCompressionCodec(CompressionCodecName.LZ4)     // Fast compression
.withCompressionCodec(CompressionCodecName.ZSTD)    // Good compression + speed
```
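
Codec availability depends on the Parquet and Hadoop versions on the classpath; LZO and Brotli in particular typically require additional codec libraries, while Snappy and GZIP work out of the box in most setups.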


### Performance Tuning


```java
// Row group size (default: 128 MB)
.withRowGroupSize(268435456) // 256 MB for better compression

// Page size (default: 1 MB)
.withPageSize(2097152) // 2 MB for reduced metadata overhead

// Dictionary encoding (default: true)
.withDictionaryEncoding(false) // Disable for high-cardinality data

// Dictionary page size (default: 1 MB)
.withDictionaryPageSize(2097152)
```
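
These builder methods apply when constructing a writer manually via `ParquetRowDataBuilder`. When going through `createWriterFactory`, the equivalent settings are picked up from the Hadoop `Configuration` keys shown in the Custom Configuration example above.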


## Error Handling


Common exceptions and error scenarios:


```java
try {
    ParquetWriterFactory<RowData> factory =
            ParquetRowDataBuilder.createWriterFactory(rowType, conf, true);
} catch (IllegalArgumentException e) {
    // Invalid row type or unsupported data type
} catch (RuntimeException e) {
    // Configuration errors or schema conversion failures
}

try {
    writer.write(rowData);
} catch (IOException e) {
    // File system errors, disk full, permission issues
} catch (RuntimeException e) {
    // Data conversion errors, schema mismatches
}
```