# File I/O Operations

Input and output formats for reading and writing Avro files in Flink batch processing jobs, with support for compression, file splitting, and type-safe record handling.
## AvroInputFormat

`FileInputFormat` implementation for reading Avro files in batch processing jobs.

```java { .api }
public class AvroInputFormat<E> extends FileInputFormat<E>
        implements ResultTypeQueryable<E>, CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {

    // Constructor
    public AvroInputFormat(Path filePath, Class<E> type);

    // Configuration methods
    public void setReuseAvroValue(boolean reuseAvroValue);
    public void setUnsplittable(boolean unsplittable);

    // Type information
    public TypeInformation<E> getProducedType();
}
```
### Usage Examples

**Reading Specific Records:**

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroInputFormat;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Create input format for a specific record type; the path may point to
// a single file or to a directory of Avro files
AvroInputFormat<User> inputFormat = new AvroInputFormat<>(
        new Path("hdfs://path/to/user/files"),
        User.class
);

// Configure reuse behavior
inputFormat.setReuseAvroValue(true); // Default: true for better performance

// Create dataset
DataSet<User> users = env.createInput(inputFormat);
```
**Reading Generic Records:**

```java
import org.apache.avro.generic.GenericRecord;

// Create input format for generic records
AvroInputFormat<GenericRecord> genericInputFormat = new AvroInputFormat<>(
        new Path("hdfs://path/to/data"),
        GenericRecord.class
);

// Use in batch job
DataSet<GenericRecord> records = env.createInput(genericInputFormat);

// Process generic records
DataSet<String> names = records.map(record -> record.get("name").toString());
```
**File Splitting Control:**

```java
// Allow file splitting for parallel processing (default)
inputFormat.setUnsplittable(false);

// Force reading entire files (useful for small files)
inputFormat.setUnsplittable(true);

// Process with parallelism
DataSet<User> users = env.createInput(inputFormat)
        .setParallelism(4);
```
## AvroOutputFormat

`FileOutputFormat` implementation for writing Avro files in batch processing jobs.

```java { .api }
public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializable {

    // Constructors
    public AvroOutputFormat(Path filePath, Class<E> type);
    public AvroOutputFormat(Class<E> type);

    // Configuration methods
    public void setSchema(Schema schema);
    public void setCodec(Codec codec);

    // Codec enum
    public enum Codec {
        NULL, SNAPPY, BZIP2, DEFLATE, XZ
    }
}
```
### Usage Examples

**Writing Specific Records:**

```java
import org.apache.flink.formats.avro.AvroOutputFormat;

// Create output format with path and type
AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(
        new Path("hdfs://output/path/users.avro"),
        User.class
);

// Configure compression
outputFormat.setCodec(AvroOutputFormat.Codec.SNAPPY);

// Write dataset
DataSet<User> users = ...;
users.output(outputFormat);
```
**Writing Generic Records:**

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

// Create output format for generic records
AvroOutputFormat<GenericRecord> genericOutputFormat = new AvroOutputFormat<>(
        new Path("hdfs://output/path/records.avro"),
        GenericRecord.class
);

// Set an explicit schema (required when writing generic records)
Schema schema = new Schema.Parser().parse(schemaString);
genericOutputFormat.setSchema(schema);

// Configure compression
genericOutputFormat.setCodec(AvroOutputFormat.Codec.DEFLATE);

// Write generic records
DataSet<GenericRecord> records = ...;
records.output(genericOutputFormat);
```
**Dynamic Output Paths:**

```java
// Create output format without a fixed path
AvroOutputFormat<User> dynamicOutputFormat = new AvroOutputFormat<>(User.class);

// Set the output path later, e.g. from runtime configuration
// (setOutputFilePath is inherited from FileOutputFormat)
dynamicOutputFormat.setOutputFilePath(new Path("hdfs://output/path/users.avro"));

users.output(dynamicOutputFormat);
```
## Compression Codecs

Support for several compression codecs to reduce file size and improve I/O performance.

```java { .api }
public enum Codec {
    NULL((byte) 0, CodecFactory.nullCodec()),       // No compression
    SNAPPY((byte) 1, CodecFactory.snappyCodec()),   // Fast compression
    BZIP2((byte) 2, CodecFactory.bzip2Codec()),     // High compression ratio
    DEFLATE((byte) 3, CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL)), // Standard compression
    XZ((byte) 4, CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL)); // High compression ratio
}
```
### Codec Selection Guidelines

**SNAPPY (Recommended):**
- Fast compression and decompression
- Good balance between speed and compression ratio
- A sensible default choice for most use cases (note that `AvroOutputFormat` applies no compression unless a codec is explicitly set)

**DEFLATE:**
- Standard compression algorithm
- Better compression than SNAPPY, but slower
- Good for storage-constrained environments

**BZIP2:**
- High compression ratio
- Slower than SNAPPY and DEFLATE
- Best for archival storage

**XZ:**
- Highest compression ratio
- Slowest processing
- Best for long-term storage with infrequent access

**NULL:**
- No compression
- Fastest processing
- Use when storage space is not a concern
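The guidelines above boil down to a simple lookup. The sketch below is plain Java and not part of the Flink API; the `Workload` enum and `pickCodec` helper are hypothetical names that merely encode the table above, and the returned string matches an `AvroOutputFormat.Codec` constant name.

```java
// Illustrative sketch only -- Workload and pickCodec are not Flink APIs;
// they just encode the codec selection guidelines above.
public class CodecSelection {

    enum Workload { HIGH_THROUGHPUT, STORAGE_CONSTRAINED, ARCHIVAL, NO_COMPRESSION }

    // Returns the name of an AvroOutputFormat.Codec constant for a workload.
    static String pickCodec(Workload workload) {
        switch (workload) {
            case HIGH_THROUGHPUT:     return "SNAPPY";  // speed over ratio
            case STORAGE_CONSTRAINED: return "DEFLATE"; // better ratio, slower
            case ARCHIVAL:            return "XZ";      // best ratio, slowest
            default:                  return "NULL";    // no compression
        }
    }

    public static void main(String[] args) {
        System.out.println(pickCodec(Workload.HIGH_THROUGHPUT)); // prints SNAPPY
    }
}
```

In a real job the chosen name could be resolved with `AvroOutputFormat.Codec.valueOf(...)` before passing it to `setCodec`.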
## Performance Optimization

### Input Format Optimization

**File Splitting:**
```java
// Enable splitting for large files (parallel processing)
inputFormat.setUnsplittable(false);

// Disable splitting for small files (reduce overhead)
inputFormat.setUnsplittable(true);
```
**Object Reuse:**
```java
// Enable object reuse for better memory performance (default)
inputFormat.setReuseAvroValue(true);

// Disable if objects need to be retained across operations
inputFormat.setReuseAvroValue(false);
```
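The pitfall that reuse introduces is easy to reproduce outside Flink. The sketch below is plain Java, not Flink code: it simulates a reader with reuse enabled, which mutates and hands out the same instance for every record, so retaining references yields a list in which every element reflects the last record read.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration (not Flink code) of the object-reuse pitfall.
public class ReusePitfall {

    static class Record { int value; }

    // Simulates reading with reuse enabled: one instance, mutated in place.
    static List<Record> readWithReuse(int[] values) {
        List<Record> retained = new ArrayList<>();
        Record reused = new Record();
        for (int v : values) {
            reused.value = v;      // overwrite in place
            retained.add(reused);  // every element is the SAME object
        }
        return retained;
    }

    public static void main(String[] args) {
        List<Record> records = readWithReuse(new int[] {1, 2, 3});
        // All retained elements show the last value read:
        System.out.println(records.get(0).value); // prints 3, not 1
    }
}
```

With `setReuseAvroValue(false)` the format creates a fresh record per element, which is what you want whenever records are collected, cached, or compared across invocations.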
### Output Format Optimization

**Compression Selection:**
```java
// For high-throughput scenarios
outputFormat.setCodec(AvroOutputFormat.Codec.SNAPPY);

// For storage optimization
outputFormat.setCodec(AvroOutputFormat.Codec.BZIP2);

// For archival
outputFormat.setCodec(AvroOutputFormat.Codec.XZ);
```
## Error Handling

**Input Errors:**
- File not found: throws `FileNotFoundException`
- Schema mismatch: throws `IOException` with a detailed error message
- Corrupted files: throws `AvroRuntimeException`

**Output Errors:**
- Path creation failure: throws `IOException`
- Schema validation errors: throws `IllegalArgumentException`
- Disk space issues: throws `IOException`
### Error Recovery Patterns

```java
// Robust input processing
try {
    DataSet<User> users = env.createInput(inputFormat);
    // Process data
} catch (Exception e) {
    logger.error("Failed to read Avro files", e);
    // Implement fallback or retry logic
}

// Safe output writing
try {
    users.output(outputFormat);
    env.execute("Write Avro Files");
} catch (Exception e) {
    logger.error("Failed to write Avro files", e);
    // Clean up partial files or retry
}
```
## Integration with Hadoop Ecosystem

**HDFS Integration:**
```java
// Read from HDFS (a directory of Avro files)
AvroInputFormat<User> hdfsInputFormat = new AvroInputFormat<>(
        new Path("hdfs://namenode:8020/data/users"),
        User.class
);

// Write to HDFS
AvroOutputFormat<User> hdfsOutputFormat = new AvroOutputFormat<>(
        new Path("hdfs://namenode:8020/output/users.avro"),
        User.class
);
```
**S3 Integration:**
```java
// Read from S3 (a directory of Avro files)
AvroInputFormat<User> s3InputFormat = new AvroInputFormat<>(
        new Path("s3a://bucket/data/users"),
        User.class
);

// Write to S3
AvroOutputFormat<User> s3OutputFormat = new AvroOutputFormat<>(
        new Path("s3a://bucket/output/users.avro"),
        User.class
);
```