# File System Operations

## Capabilities

### Avro Input Format

File input format for reading Avro files in batch processing scenarios.

```java { .api }
/**
 * Input format for reading Avro files
 * @param <T> Type of records to read
 */
public class AvroInputFormat<T> extends FileInputFormat<T> {

    /**
     * Creates AvroInputFormat for SpecificRecord types
     * @param filePath Path to Avro file(s)
     * @param recordClazz SpecificRecord class to deserialize to
     */
    public AvroInputFormat(Path filePath, Class<T> recordClazz);

    /**
     * Creates AvroInputFormat for GenericRecord types
     * @param filePath Path to Avro file(s)
     * @param schema Avro schema for GenericRecord deserialization
     */
    public AvroInputFormat(Path filePath, Schema schema);

    /**
     * Reads the next record from the input
     * @param reuse Object to reuse for the next record (may be null)
     * @return Next record, or null if end of input
     * @throws IOException If reading fails
     */
    public T nextRecord(T reuse) throws IOException;

    /**
     * Checks if the input has been exhausted
     * @return true if no more records available
     * @throws IOException If check fails
     */
    public boolean reachedEnd() throws IOException;
}
```

### Avro Output Format

File output format for writing Avro files in batch processing scenarios.

```java { .api }
/**
 * Output format for writing Avro files
 * @param <T> Type of records to write
 */
public class AvroOutputFormat<T> extends FileOutputFormat<T> {

    /**
     * Creates AvroOutputFormat for SpecificRecord types
     * @param outputFilePath Path where to write Avro file
     * @param recordClazz SpecificRecord class to serialize from
     */
    public AvroOutputFormat(Path outputFilePath, Class<T> recordClazz);

    /**
     * Creates AvroOutputFormat for GenericRecord types
     * @param outputFilePath Path where to write Avro file
     * @param schema Avro schema for GenericRecord serialization
     */
    public AvroOutputFormat(Path outputFilePath, Schema schema);

    /**
     * Writes a record to the output
     * @param record Record to write
     * @throws IOException If writing fails
     */
    public void writeRecord(T record) throws IOException;

    /**
     * Sets the compression codec for the output file
     * @param codecName Name of compression codec (snappy, gzip, etc.)
     */
    public void setCodec(String codecName);
}
```

### Bulk Writer Support

High-performance bulk writing for streaming scenarios.

```java { .api }
/**
 * Bulk writer for Avro files that wraps an Avro DataFileWriter
 * @param <T> Type of records to write
 */
public class AvroBulkWriter<T> implements BulkWriter<T> {

    /**
     * Creates a new AvroBulkWriter wrapping the given Avro DataFileWriter
     * @param dataFileWriter The underlying Avro DataFileWriter
     */
    public AvroBulkWriter(DataFileWriter<T> dataFileWriter);

    /**
     * Adds an element to the writer
     * @param element Element to write
     * @throws IOException If writing fails
     */
    public void addElement(T element) throws IOException;

    /**
     * Flushes pending writes
     * @throws IOException If flush fails
     */
    public void flush() throws IOException;

    /**
     * Finishes writing and closes the writer
     * @throws IOException If finish fails
     */
    public void finish() throws IOException;
}

/**
 * Factory for creating Avro bulk writers
 * @param <T> Type of records to write
 */
public class AvroWriterFactory<T> implements BulkWriter.Factory<T> {

    /**
     * Creates AvroWriterFactory with AvroBuilder
     * @param avroBuilder Builder for creating DataFileWriter instances
     */
    public AvroWriterFactory(AvroBuilder<T> avroBuilder);

    /**
     * Creates a bulk writer for the given output stream
     * @param out Output stream to write to
     * @return Bulk writer instance
     * @throws IOException If creation fails
     */
    public BulkWriter<T> create(FSDataOutputStream out) throws IOException;
}

/**
 * Builder interface for creating Avro DataFileWriter instances
 * This is a functional interface that extends Serializable
 * @param <T> Type of records the writer will handle
 */
@FunctionalInterface
public interface AvroBuilder<T> extends Serializable {

    /**
     * Creates and configures an Avro writer to the given output stream
     * @param outputStream Output stream to write to
     * @return Configured DataFileWriter
     * @throws IOException If creation fails
     */
    DataFileWriter<T> createWriter(OutputStream outputStream) throws IOException;
}
```

## Usage Examples

### Reading Avro Files - Batch Processing

```java
// Reading SpecificRecord files
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// For generated SpecificRecord classes
AvroInputFormat<User> inputFormat =
    new AvroInputFormat<>(new Path("/path/to/user/files/*.avro"), User.class);

DataSet<User> users = env.createInput(inputFormat);

// Process the data
DataSet<String> usernames = users
    .filter(user -> user.getAge() > 18)
    .map(user -> user.getUsername().toString());

// Reading GenericRecord files
Schema schema = new Schema.Parser().parse(new File("user-schema.avsc"));
AvroInputFormat<GenericRecord> genericInputFormat =
    new AvroInputFormat<>(new Path("/path/to/generic/files/*.avro"), schema);

DataSet<GenericRecord> records = env.createInput(genericInputFormat);

// Process generic records
DataSet<Long> userIds = records
    .map(record -> (Long) record.get("user_id"));
```

### Writing Avro Files - Batch Processing

```java
// Writing SpecificRecord files
DataSet<User> users = // ... your user dataset

AvroOutputFormat<User> outputFormat =
    new AvroOutputFormat<>(new Path("/output/path/users.avro"), User.class);

// Set compression codec
outputFormat.setCodec("snappy");

users.output(outputFormat);

// Writing GenericRecord files with custom schema
Schema schema = SchemaBuilder.record("ProcessedUser")
    .fields()
    .name("id").type().longType().noDefault()
    .name("processed_name").type().stringType().noDefault()
    .name("score").type().doubleType().noDefault()
    .endRecord();

DataSet<GenericRecord> processedUsers = users
    .map(new MapFunction<User, GenericRecord>() {
        @Override
        public GenericRecord map(User user) throws Exception {
            GenericRecord record = new GenericData.Record(schema);
            record.put("id", user.getId());
            record.put("processed_name", user.getUsername().toString().toUpperCase());
            record.put("score", calculateScore(user));
            return record;
        }
    });

AvroOutputFormat<GenericRecord> genericOutputFormat =
    new AvroOutputFormat<>(new Path("/output/path/processed.avro"), schema);

processedUsers.output(genericOutputFormat);

env.execute("Process Avro Files");
```

### Bulk Writing - Streaming to Files

```java
// Setup for streaming to Avro files
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create Avro builder for SpecificRecord
AvroBuilder<User> avroBuilder = new AvroBuilder<User>() {
    @Override
    public DataFileWriter<User> createWriter(OutputStream out) throws IOException {
        DatumWriter<User> datumWriter = new SpecificDatumWriter<>(User.class);
        DataFileWriter<User> dataFileWriter = new DataFileWriter<>(datumWriter);

        // Set codec
        dataFileWriter.setCodec(CodecFactory.snappyCodec());

        // Create with schema from SpecificRecord
        dataFileWriter.create(User.getClassSchema(), out);
        return dataFileWriter;
    }
};

// Create writer factory
AvroWriterFactory<User> writerFactory = new AvroWriterFactory<>(avroBuilder);

// Stream to files
DataStream<User> userStream = // ... your user stream

userStream
    .addSink(StreamingFileSink
        .forBulkFormat(new Path("/streaming/output/path"), writerFactory)
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .build());

env.execute("Stream to Avro Files");
```

### Custom Avro Builder with Compression

```java
// Custom builder for GenericRecord with custom configuration.
// NOTE: an anonymous class cannot declare a constructor, so the schema is
// captured from an effectively-final variable in the enclosing scope.
final Schema schema = new Schema.Parser().parse(new File("user-schema.avsc"));

AvroBuilder<GenericRecord> customBuilder = new AvroBuilder<GenericRecord>() {
    @Override
    public DataFileWriter<GenericRecord> createWriter(OutputStream out) throws IOException {
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);

        // Configure compression
        dataFileWriter.setCodec(CodecFactory.deflateCodec(6)); // Compression level 6

        // Set custom metadata
        dataFileWriter.setMeta("created_by", "flink-sql-avro");
        dataFileWriter.setMeta("created_at", System.currentTimeMillis());

        dataFileWriter.create(schema, out);
        return dataFileWriter;
    }
};

// Use custom builder
AvroWriterFactory<GenericRecord> customWriterFactory =
    new AvroWriterFactory<>(customBuilder);
```

### File System Table Integration

```java
// Create file system table with Avro format
String createTableSQL = """
    CREATE TABLE user_files (
        user_id BIGINT,
        username STRING,
        email STRING,
        registration_date DATE,
        last_login TIMESTAMP(3)
    ) PARTITIONED BY (registration_date)
    WITH (
        'connector' = 'filesystem',
        'path' = '/data/warehouse/users',
        'format' = 'avro',
        'avro.codec' = 'snappy',
        'sink.partition-commit.policy.kind' = 'success-file'
    )
    """;

tableEnv.executeSql(createTableSQL);

// Insert data into partitioned Avro files
String insertSQL = """
    INSERT INTO user_files
    SELECT
        user_id,
        username,
        email,
        CAST(created_at AS DATE) as registration_date,
        last_login
    FROM user_stream
    """;

tableEnv.executeSql(insertSQL);

// Read from partitioned Avro files
Table result = tableEnv.sqlQuery("""
    SELECT
        COUNT(*) as daily_registrations,
        registration_date
    FROM user_files
    WHERE registration_date >= CURRENT_DATE - INTERVAL '30' DAY
    GROUP BY registration_date
    ORDER BY registration_date
    """);
```

### Performance Optimization

```java
// Optimized settings for large file processing
AvroInputFormat<GenericRecord> optimizedInput =
    new AvroInputFormat<>(new Path("/large/files/*.avro"), schema);

// Configure parallel reading
env.getConfig().setParallelism(16);

// For output, use appropriate compression
AvroOutputFormat<GenericRecord> optimizedOutput =
    new AvroOutputFormat<>(new Path("/output/compressed.avro"), schema);

// Balance between compression ratio and speed
optimizedOutput.setCodec("snappy"); // Fast compression
// optimizedOutput.setCodec("gzip"); // Better compression ratio
// optimizedOutput.setCodec("zstd"); // Good balance

// For streaming, configure checkpointing for fault tolerance
env.enableCheckpointing(60000); // Checkpoint every minute

// Bulk writing with appropriate rolling policy
StreamingFileSink<GenericRecord> sink = StreamingFileSink
    .forBulkFormat(new Path("/streaming/output"), writerFactory)
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15)) // Roll every 15 minutes
            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) // Roll after 5 minutes of inactivity
            .withMaxPartSize(1024 * 1024 * 128) // Roll at 128MB
            .build())
    .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd/HH"))
    .build();
```