# File Writers

Writers handle the actual file I/O operations for different data formats. The filesystem connector provides several built-in writer implementations and allows custom writers through the `Writer` interface.

## Writer Interface

The base interface that all writers must implement.

```java { .api }
public interface Writer<T> extends Serializable
```

### Core Methods

```java { .api }
void open(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path path) throws IOException
```

Initializes the writer for a newly opened bucket file.

**Parameters:**
- `fs` - The Hadoop FileSystem containing the file
- `path` - The Path of the newly opened file

```java { .api }
void write(T element) throws IOException
```

Writes one element to the bucket file.

**Parameters:**
- `element` - The element to write

```java { .api }
long flush() throws IOException
```

Flushes internally held data and returns the offset up to which data is safely persisted, which is used to truncate the file during recovery.

**Returns:** The offset to which the file must be truncated on recovery

```java { .api }
long getPos() throws IOException
```

Retrieves the current position (size) of the output file.

**Returns:** The current file position in bytes

```java { .api }
void close() throws IOException
```

Closes the writer and releases associated resources. Safe to call multiple times.

```java { .api }
Writer<T> duplicate()
```

Creates a duplicate of this writer so that each parallel sink instance gets its own writer.

**Returns:** A new Writer instance
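
To illustrate the contract, here is a minimal sketch of the same lifecycle backed by an in-memory buffer instead of a Hadoop `FSDataOutputStream`. The class and method names are illustrative stand-ins, not connector classes; the point is the call order a sink follows: open, write elements, flush to obtain a safe truncation offset, then close.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Simplified stand-in for Writer<T>, for illustration only.
class ToyStringWriter {
    private ByteArrayOutputStream out;

    void open() {                      // mirrors open(fs, path)
        out = new ByteArrayOutputStream();
    }

    void write(String element) throws IOException {
        out.write((element + "\n").getBytes(StandardCharsets.UTF_8));
    }

    long flush() {                     // offset that is safe to truncate to on recovery
        return out.size();
    }

    long getPos() {                    // current size of the "file" in bytes
        return out.size();
    }

    void close() {                     // safe to call multiple times
        // nothing held open for an in-memory buffer
    }

    ToyStringWriter duplicate() {      // fresh writer for another parallel instance
        return new ToyStringWriter();
    }
}

public class WriterLifecycle {
    public static void main(String[] args) throws IOException {
        ToyStringWriter w = new ToyStringWriter();
        w.open();
        w.write("a");                  // 2 bytes: "a\n"
        w.write("bb");                 // 3 bytes: "bb\n"
        long safeOffset = w.flush();   // 5
        w.close();
        System.out.println(safeOffset + " " + (safeOffset == w.getPos()));
    }
}
```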

## StringWriter

Writes elements as strings with newline separation using a configurable character encoding.

### Constructors

```java { .api }
public StringWriter()
```

Creates a StringWriter using UTF-8 encoding.

```java { .api }
public StringWriter(String charsetName)
```

Creates a StringWriter with the specified character encoding.

**Parameters:**
- `charsetName` - Character set name (e.g., "UTF-8", "ISO-8859-1")

### Usage Example

```java
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

// UTF-8 encoding (default)
StringWriter<String> utf8Writer = new StringWriter<>();

// Custom encoding
StringWriter<String> latin1Writer = new StringWriter<>("ISO-8859-1");

BucketingSink<String> sink = new BucketingSink<>("/tmp/output");
sink.setWriter(utf8Writer);
```
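
What the `charsetName` choice means at the byte level can be shown with plain JDK calls. This sketch mirrors what a string writer does per element — encode the string and append a newline — without using any Flink classes:

```java
import java.nio.charset.StandardCharsets;

public class CharsetDemo {
    public static void main(String[] args) {
        String line = "café";

        // UTF-8 encodes 'é' as two bytes: 5 bytes + 1 newline byte
        byte[] utf8 = (line + "\n").getBytes(StandardCharsets.UTF_8);

        // ISO-8859-1 encodes 'é' as one byte: 4 bytes + 1 newline byte
        byte[] latin1 = (line + "\n").getBytes(StandardCharsets.ISO_8859_1);

        System.out.println(utf8.length + " " + latin1.length);
    }
}
```

The record size on disk, and therefore the offsets returned by `getPos()` and `flush()`, depends on the chosen encoding.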
98
99
## SequenceFileWriter
100
101
Writes Hadoop SequenceFiles for Tuple2 elements containing Hadoop Writable types.
102
103
### Type Parameters
104
105
- `K extends Writable` - Key type (must implement Hadoop Writable)
106
- `V extends Writable` - Value type (must implement Hadoop Writable)
107
108
### Constructors
109
110
```java { .api }
111
public SequenceFileWriter()
112
```
113
114
Creates a SequenceFileWriter without compression.
115
116
```java { .api }
117
public SequenceFileWriter(String compressionCodecName)
118
```
119
120
Creates a SequenceFileWriter with the specified compression codec.
121
122
**Parameters:**
123
- `compressionCodecName` - Name of the compression codec (e.g., "org.apache.hadoop.io.compress.GzipCodec")
124
125
```java { .api }
126
public SequenceFileWriter(String compressionCodecName, org.apache.hadoop.io.SequenceFile.CompressionType compressionType)
127
```
128
129
Creates a SequenceFileWriter with full compression control.
130
131
**Parameters:**
132
- `compressionCodecName` - Name of the compression codec
133
- `compressionType` - Type of compression (NONE, RECORD, BLOCK)
134
135
### Usage Example
136

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// No compression
SequenceFileWriter<LongWritable, Text> plainWriter = new SequenceFileWriter<>();

// With Gzip compression
SequenceFileWriter<LongWritable, Text> gzipWriter = new SequenceFileWriter<>(
    "org.apache.hadoop.io.compress.GzipCodec"
);

// Full compression control
SequenceFileWriter<LongWritable, Text> blockWriter = new SequenceFileWriter<>(
    "org.apache.hadoop.io.compress.GzipCodec",
    SequenceFile.CompressionType.BLOCK
);

BucketingSink<Tuple2<LongWritable, Text>> sink = new BucketingSink<>("/tmp/output");
sink.setWriter(blockWriter);
```

## AvroKeyValueSinkWriter

Writes Avro key-value records for Tuple2 elements using the specified Avro schemas.

### Type Parameters

- `K` - Key type
- `V` - Value type

### Constructors

```java { .api }
public AvroKeyValueSinkWriter(org.apache.avro.Schema keySchema, org.apache.avro.Schema valueSchema)
```

Creates an AvroKeyValueSinkWriter with the specified schemas.

**Parameters:**
- `keySchema` - Avro schema for keys
- `valueSchema` - Avro schema for values

```java { .api }
public AvroKeyValueSinkWriter(org.apache.avro.Schema keySchema, org.apache.avro.Schema valueSchema, org.apache.avro.file.CodecFactory codecFactory)
```

Creates an AvroKeyValueSinkWriter with compression.

**Parameters:**
- `keySchema` - Avro schema for keys
- `valueSchema` - Avro schema for values
- `codecFactory` - Avro compression codec factory

### Additional Constructors

```java { .api }
public AvroKeyValueSinkWriter(org.apache.avro.Schema keySchema, org.apache.avro.Schema valueSchema, org.apache.avro.file.CodecFactory codecFactory, int syncInterval)
```

**Parameters:**
- `syncInterval` - Sync interval for Avro files

```java { .api }
public AvroKeyValueSinkWriter(org.apache.avro.Schema keySchema, org.apache.avro.Schema valueSchema, org.apache.avro.file.CodecFactory codecFactory, int syncInterval, Map<String, String> metadata)
```

**Parameters:**
- `metadata` - Metadata map for Avro files

### Usage Example

```java
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

// Define Avro schemas
Schema keySchema = Schema.create(Schema.Type.LONG);
Schema valueSchema = Schema.create(Schema.Type.STRING);

// Basic writer
AvroKeyValueSinkWriter<Long, String> plainWriter =
    new AvroKeyValueSinkWriter<>(keySchema, valueSchema);

// With Snappy compression
AvroKeyValueSinkWriter<Long, String> snappyWriter =
    new AvroKeyValueSinkWriter<>(keySchema, valueSchema, CodecFactory.snappyCodec());

BucketingSink<Tuple2<Long, String>> sink = new BucketingSink<>("/tmp/output");
sink.setWriter(snappyWriter);
```

## StreamWriterBase

Abstract base class providing common functionality for writer implementations.

```java { .api }
public abstract class StreamWriterBase<T> implements Writer<T>
```

This class provides default implementations for common writer operations and can be extended to create custom writers.

### Custom Writer Implementation

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CustomWriter<T> extends StreamWriterBase<T> {

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        // StreamWriterBase opens the underlying output stream; additional
        // per-file setup (e.g. writing a header) can go here.
        super.open(fs, path);
    }

    @Override
    public void write(T element) throws IOException {
        // Custom write logic, using the stream opened by the base class
        FSDataOutputStream outputStream = getStream();
        outputStream.write(processElement(element));
    }

    @Override
    public Writer<T> duplicate() {
        return new CustomWriter<>();
    }

    private byte[] processElement(T element) {
        // Custom processing logic: one newline-terminated record per element
        return (element.toString() + "\n").getBytes(StandardCharsets.UTF_8);
    }
}
```