# File Writers

Writers handle the actual file I/O operations for the different data formats. The filesystem connector provides several built-in writer implementations and allows custom writers through the `Writer` interface.

## Writer Interface

The base interface that all writers must implement.

```java { .api }
public interface Writer<T> extends Serializable
```

### Core Methods

```java { .api }
void open(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path path) throws IOException
```

Initializes the writer for a newly opened bucket file.

**Parameters:**

- `fs` - The Hadoop `FileSystem` containing the file
- `path` - The `Path` of the newly opened file

```java { .api }
void write(T element) throws IOException
```

Writes one element to the bucket file.

**Parameters:**

- `element` - The element to write

```java { .api }
long flush() throws IOException
```

Flushes internally buffered data and returns the offset to which the file must be truncated during recovery.

**Returns:** The file offset to which the file must be truncated on recovery
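The contract above can be sketched with plain JDK file I/O: the offset returned by `flush()` marks the last checkpointed byte, and on recovery the file is truncated back to that offset, discarding any unconfirmed tail. The class and file names below are invented for illustration; only `java.nio.file` and `RandomAccessFile` are used, not the Hadoop API.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class TruncateRecoveryDemo {
    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("bucket", ".txt");

        // Two records are written and flushed; a checkpoint records this offset.
        Files.write(file, "a\nb\n".getBytes(StandardCharsets.UTF_8));
        long checkpointOffset = Files.size(file); // 4 bytes

        // More data arrives after the checkpoint, then the job "crashes".
        Files.write(file, "c\n".getBytes(StandardCharsets.UTF_8), StandardOpenOption.APPEND);

        // On recovery, truncate back to the checkpointed offset,
        // dropping the uncheckpointed tail ("c\n").
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw")) {
            raf.setLength(checkpointOffset);
        }

        String recovered = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        System.out.println(recovered.replace("\n", "|")); // a|b|
    }
}
```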

```java { .api }
long getPos() throws IOException
```

Retrieves the current position (size) of the output file.

**Returns:** Current file position in bytes

```java { .api }
void close() throws IOException
```

Closes the writer and releases any associated resources. Safe to call multiple times.

```java { .api }
Writer<T> duplicate()
```

Creates a duplicate of this writer for use in parallel sink instances.

**Returns:** A new `Writer` instance
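To make the lifecycle concrete (open → write → flush/getPos → close, with `duplicate()` supplying one writer per parallel sink instance), here is a toy stand-in using only the JDK. `SimpleWriter` and `NewlineWriter` are invented names for illustration, and `java.nio.file.Path` stands in for the Hadoop `FileSystem`/`Path` pair; the real contract is the `Writer` interface documented above.

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class WriterLifecycleDemo {
    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("bucket", ".txt");
        SimpleWriter<String> writer = new NewlineWriter();
        writer.open(file);            // the sink opened a new bucket file
        writer.write("hello");
        writer.write("world");
        long offset = writer.flush(); // a checkpoint would record this offset
        writer.close();
        writer.close();               // closing twice is safe
        String content = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        System.out.println(offset + ":" + content.replace("\n", "|"));
    }
}

// Simplified stand-in for the connector's Writer interface.
interface SimpleWriter<T> {
    void open(Path path) throws IOException;
    void write(T element) throws IOException;
    long flush() throws IOException;
    long getPos() throws IOException;
    void close() throws IOException;
    SimpleWriter<T> duplicate();
}

// Newline-separated UTF-8 writer, loosely analogous to StringWriter.
class NewlineWriter implements SimpleWriter<String> {
    private OutputStream out;
    private long pos;

    @Override public void open(Path path) throws IOException {
        out = new BufferedOutputStream(Files.newOutputStream(path));
        pos = 0;
    }
    @Override public void write(String element) throws IOException {
        byte[] bytes = (element + "\n").getBytes(StandardCharsets.UTF_8);
        out.write(bytes);
        pos += bytes.length;
    }
    @Override public long flush() throws IOException {
        out.flush();
        return pos; // offset the file may be truncated to on recovery
    }
    @Override public long getPos() { return pos; }
    @Override public void close() throws IOException {
        if (out != null) { out.close(); out = null; } // idempotent
    }
    @Override public SimpleWriter<String> duplicate() { return new NewlineWriter(); }
}
```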

## StringWriter

Writes elements as strings, separated by newlines, using a configurable character encoding.

### Constructors

```java { .api }
public StringWriter()
```

Creates a StringWriter using UTF-8 encoding.

```java { .api }
public StringWriter(String charsetName)
```

Creates a StringWriter with the specified character encoding.

**Parameters:**

- `charsetName` - Character set name (e.g., "UTF-8", "ISO-8859-1")

### Usage Example

```java
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

// UTF-8 encoding (default)
StringWriter<String> utf8Writer = new StringWriter<>();

// Custom encoding
StringWriter<String> latin1Writer = new StringWriter<>("ISO-8859-1");

BucketingSink<String> sink = new BucketingSink<>("/tmp/output");
sink.setWriter(utf8Writer);
```
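The charset chosen for StringWriter determines the exact bytes written per record (each element is encoded and followed by a newline). A standalone JDK snippet, unrelated to the connector API, illustrating the difference:

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class CharsetDemo {
    public static void main(String[] args) {
        String record = "café";
        // Encode record plus trailing newline, as a newline-separated writer would
        byte[] utf8 = (record + "\n").getBytes(StandardCharsets.UTF_8);
        byte[] latin1 = (record + "\n").getBytes(Charset.forName("ISO-8859-1"));
        // 'é' takes 2 bytes in UTF-8 but 1 byte in ISO-8859-1
        System.out.println(utf8.length + " " + latin1.length);
    }
}
```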

## SequenceFileWriter

Writes Hadoop SequenceFiles for Tuple2 elements containing Hadoop Writable types.

### Type Parameters

- `K extends Writable` - Key type (must implement Hadoop `Writable`)
- `V extends Writable` - Value type (must implement Hadoop `Writable`)

### Constructors

```java { .api }
public SequenceFileWriter()
```

Creates a SequenceFileWriter without compression.

```java { .api }
public SequenceFileWriter(String compressionCodecName)
```

Creates a SequenceFileWriter with the specified compression codec.

**Parameters:**

- `compressionCodecName` - Name of the compression codec (e.g., "org.apache.hadoop.io.compress.GzipCodec")

```java { .api }
public SequenceFileWriter(String compressionCodecName, org.apache.hadoop.io.SequenceFile.CompressionType compressionType)
```

Creates a SequenceFileWriter with full compression control.

**Parameters:**

- `compressionCodecName` - Name of the compression codec
- `compressionType` - Type of compression (`NONE`, `RECORD`, or `BLOCK`)

### Usage Example

```java
import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// No compression
SequenceFileWriter<LongWritable, Text> plainWriter = new SequenceFileWriter<>();

// With Gzip compression
SequenceFileWriter<LongWritable, Text> gzipWriter = new SequenceFileWriter<>(
    "org.apache.hadoop.io.compress.GzipCodec"
);

// Full compression control
SequenceFileWriter<LongWritable, Text> blockGzipWriter = new SequenceFileWriter<>(
    "org.apache.hadoop.io.compress.GzipCodec",
    SequenceFile.CompressionType.BLOCK
);

BucketingSink<Tuple2<LongWritable, Text>> sink = new BucketingSink<>("/tmp/output");
sink.setWriter(blockGzipWriter);
```

## AvroKeyValueSinkWriter

Writes Avro key-value records for Tuple2 elements using specified Avro schemas.

### Type Parameters

- `K` - Key type
- `V` - Value type

### Constructors

```java { .api }
public AvroKeyValueSinkWriter(org.apache.avro.Schema keySchema, org.apache.avro.Schema valueSchema)
```

Creates an AvroKeyValueSinkWriter with the specified schemas.

**Parameters:**

- `keySchema` - Avro schema for keys
- `valueSchema` - Avro schema for values

```java { .api }
public AvroKeyValueSinkWriter(org.apache.avro.Schema keySchema, org.apache.avro.Schema valueSchema, org.apache.avro.file.CodecFactory codecFactory)
```

Creates an AvroKeyValueSinkWriter with compression.

**Parameters:**

- `keySchema` - Avro schema for keys
- `valueSchema` - Avro schema for values
- `codecFactory` - Avro compression codec factory

### Additional Constructors

```java { .api }
public AvroKeyValueSinkWriter(org.apache.avro.Schema keySchema, org.apache.avro.Schema valueSchema, org.apache.avro.file.CodecFactory codecFactory, int syncInterval)
```

Creates an AvroKeyValueSinkWriter with a custom sync interval.

**Parameters:**

- `syncInterval` - Sync interval for the Avro files

```java { .api }
public AvroKeyValueSinkWriter(org.apache.avro.Schema keySchema, org.apache.avro.Schema valueSchema, org.apache.avro.file.CodecFactory codecFactory, int syncInterval, Map<String, String> metadata)
```

Creates an AvroKeyValueSinkWriter with custom file metadata.

**Parameters:**

- `metadata` - Metadata map for the Avro files

### Usage Example

```java
import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.flink.api.java.tuple.Tuple2;

// Define Avro schemas
Schema keySchema = Schema.create(Schema.Type.LONG);
Schema valueSchema = Schema.create(Schema.Type.STRING);

// Basic writer
AvroKeyValueSinkWriter<Long, String> basicWriter = new AvroKeyValueSinkWriter<>(keySchema, valueSchema);

// With compression
AvroKeyValueSinkWriter<Long, String> snappyWriter = new AvroKeyValueSinkWriter<>(
    keySchema, valueSchema, CodecFactory.snappyCodec()
);

BucketingSink<Tuple2<Long, String>> sink = new BucketingSink<>("/tmp/output");
sink.setWriter(snappyWriter);
```

## StreamWriterBase

Abstract base class providing common functionality for writer implementations.

```java { .api }
public abstract class StreamWriterBase<T> implements Writer<T>
```

This class provides default implementations for common writer operations and can be extended to create custom writers.

### Custom Writer Implementation

`StreamWriterBase` opens the output stream in `open()` and exposes it to subclasses via `getStream()`, so a custom writer typically only needs to override `write()` and `duplicate()`:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FSDataOutputStream;

public class CustomWriter<T> extends StreamWriterBase<T> {

    @Override
    public void write(T element) throws IOException {
        // Custom write logic: one line per element
        FSDataOutputStream outputStream = getStream();
        String data = processElement(element);
        outputStream.write(data.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public Writer<T> duplicate() {
        return new CustomWriter<>();
    }

    private String processElement(T element) {
        // Custom processing logic
        return element.toString() + "\n";
    }
}
```