or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs/

- avro-integration.md
- index.md
- protobuf-integration.md
- rowdata-integration.md
- table-integration.md
- utilities.md
- vectorized-reading.md
- writing-support.md

Current file: docs/writing-support.md

# Writing Support

Factory-based writers for creating Parquet files from various data formats, with support for custom ParquetWriter configurations and bulk writing operations.

## Capabilities

### ParquetWriterFactory

Generic factory class for creating BulkWriter instances that wrap ParquetWriter functionality.

```java { .api }
/**
 * Factory for creating Parquet BulkWriter instances
 * @param <T> Type of records to write
 */
@PublicEvolving
public class ParquetWriterFactory<T> implements BulkWriter.Factory<T> {

    /**
     * Creates a new ParquetWriterFactory
     * @param writerBuilder ParquetBuilder that creates configured ParquetWriter instances
     */
    public ParquetWriterFactory(ParquetBuilder<T> writerBuilder);

    /**
     * Creates a BulkWriter instance for the given output stream
     * @param out FSDataOutputStream to write to
     * @return BulkWriter instance wrapping a ParquetWriter
     * @throws IOException if writer creation fails
     */
    public BulkWriter<T> create(FSDataOutputStream out) throws IOException;
}
```

### ParquetBuilder Interface

Functional interface for creating configured ParquetWriter instances with custom settings.

```java { .api }
/**
 * Functional interface for creating ParquetWriter instances
 * @param <T> Type of records to write
 */
@FunctionalInterface
public interface ParquetBuilder<T> {

    /**
     * Creates and configures a ParquetWriter for the given output file
     * @param out OutputFile to write to
     * @return Configured ParquetWriter instance
     * @throws IOException if writer creation fails
     */
    ParquetWriter<T> createWriter(OutputFile out) throws IOException;
}
```

### ParquetBulkWriter

BulkWriter implementation that wraps ParquetWriter with Flink's bulk writing interface.

```java { .api }
/**
 * BulkWriter implementation wrapping ParquetWriter
 * @param <T> Type of records to write
 */
@PublicEvolving
public class ParquetBulkWriter<T> implements BulkWriter<T> {

    /**
     * Creates a new ParquetBulkWriter
     * @param writer ParquetWriter instance to wrap
     */
    public ParquetBulkWriter(ParquetWriter<T> writer);

    /**
     * Writes an element to the Parquet file
     * @param element Element to write
     * @throws IOException if writing fails
     */
    public void addElement(T element) throws IOException;

    /**
     * Flushes any buffered data (no-op for Parquet)
     * @throws IOException if flush fails
     */
    public void flush() throws IOException;

    /**
     * Finishes writing and closes the ParquetWriter
     * @throws IOException if finishing fails
     */
    public void finish() throws IOException;
}
```

### StreamOutputFile

Internal OutputFile implementation for Flink's streaming file system abstraction.

```java { .api }
/**
 * OutputFile implementation for streaming file systems
 */
@Internal
public class StreamOutputFile implements OutputFile {

    /**
     * Creates a new StreamOutputFile
     * @param out FSDataOutputStream to write to
     */
    public StreamOutputFile(FSDataOutputStream out);

    /**
     * Creates a position output stream for writing
     * @return PositionOutputStream for writing data
     * @throws IOException if stream creation fails
     */
    public PositionOutputStream create() throws IOException;

    /**
     * Creates a position output stream in overwrite mode
     * @return PositionOutputStream for writing data
     * @throws IOException if stream creation fails
     */
    public PositionOutputStream createOrOverwrite() throws IOException;

    /**
     * Checks if the output file supports block size setting
     * @return false (not supported for streaming)
     */
    public boolean supportsBlockSize();

    /**
     * Gets the default block size (not applicable)
     * @return 0
     */
    public long defaultBlockSize();
}
```

## Usage Examples

### Basic Writer Factory

```java
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.ParquetBuilder;
import org.apache.parquet.hadoop.ParquetWriter;

// Create a custom ParquetBuilder
ParquetBuilder<MyRecord> builder = (OutputFile out) -> {
    return MyRecordParquetWriter.builder(out)
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .withPageSize(1024 * 1024)
        .withRowGroupSize(128 * 1024 * 1024)
        .build();
};

// Create factory
ParquetWriterFactory<MyRecord> factory = new ParquetWriterFactory<>(builder);
```

### File Sink Integration

```java
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;

// Create FileSink with Parquet writer factory
FileSink<MyRecord> sink = FileSink
    .forBulkFormat(new Path("/output/path"), writerFactory)
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(Duration.ofMinutes(15))
            .withInactivityInterval(Duration.ofMinutes(5))
            .withMaxPartSize(MemorySize.ofMebiBytes(128))
            .build()
    )
    .build();

// Use in DataStream
dataStream.sinkTo(sink);
```

### Custom Configuration Example

```java
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.ParquetWriter;

// Builder with custom Parquet settings
ParquetBuilder<MyRecord> customBuilder = (OutputFile out) -> {
    return MyRecordParquetWriter.builder(out)
        .withCompressionCodec(CompressionCodecName.GZIP)
        .withDictionaryEncoding(true)
        .withValidation(false)
        .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
        .withPageSize(2 * 1024 * 1024) // 2MB pages
        .withRowGroupSize(256 * 1024 * 1024) // 256MB row groups
        .build();
};

ParquetWriterFactory<MyRecord> factory = new ParquetWriterFactory<>(customBuilder);
```

### Error Handling

```java
import org.apache.flink.api.common.functions.MapFunction;

// Handle writing errors gracefully
dataStream
    .map(new MapFunction<InputType, MyRecord>() {
        @Override
        public MyRecord map(InputType input) throws Exception {
            try {
                return convertToMyRecord(input);
            } catch (Exception e) {
                // Log error and handle appropriately
                LOG.warn("Failed to convert record: " + input, e);
                return null; // or default value
            }
        }
    })
    .filter(Objects::nonNull) // Remove failed conversions
    .sinkTo(sink);
```

### Batch Writing with Checkpointing

```java
import org.apache.flink.streaming.api.CheckpointingMode;

// Configure checkpointing for reliable writing
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30000); // Checkpoint every 30 seconds
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(60000);

// FileSink automatically handles checkpointing
dataStream.sinkTo(parquetSink);
```

## Performance Considerations

### Row Group Sizing

```java
// Optimize row group size based on your data
ParquetBuilder<T> builder = (out) ->
    writerBuilder(out)
        .withRowGroupSize(128 * 1024 * 1024) // 128MB - good for analytics
        .build();
```

### Compression Selection

```java
// Choose compression based on use case
CompressionCodecName compression;

// For write-heavy workloads (faster compression)
compression = CompressionCodecName.SNAPPY;

// For read-heavy workloads (better compression ratio)
compression = CompressionCodecName.GZIP;

// For balanced performance
compression = CompressionCodecName.LZ4;
```

### Memory Management

```java
// Configure page size for memory efficiency
ParquetBuilder<T> builder = (out) ->
    writerBuilder(out)
        .withPageSize(1024 * 1024) // 1MB pages
        .withDictionaryPageSize(512 * 1024) // 512KB dictionary pages
        .build();
```

The writing support provides flexible factory patterns that integrate seamlessly with Flink's bulk writing infrastructure while giving full control over Parquet-specific configurations.