# Configuration

Configuration classes provide options for customizing file naming patterns, writer properties, and behavior of the file sink components.

## Capabilities

### OutputFileConfig

Configuration class for customizing part file naming patterns.

```java { .api }
/**
 * Part file name configuration
 * Allows defining a prefix and suffix to the part file name
 */
public class OutputFileConfig implements Serializable {
    /**
     * Initiates the OutputFileConfig with values passed as parameters
     * @param partPrefix the beginning of part file name
     * @param partSuffix the ending of part file name
     */
    public OutputFileConfig(String partPrefix, String partSuffix);

    /**
     * The prefix for the part name
     * @return the part file prefix
     */
    public String getPartPrefix();

    /**
     * The suffix for the part name
     * @return the part file suffix
     */
    public String getPartSuffix();

    /**
     * Creates a builder to create the part file configuration
     * @return new OutputFileConfigBuilder instance
     */
    public static OutputFileConfigBuilder builder();
}
```

### OutputFileConfigBuilder

Builder for creating `OutputFileConfig` instances with a fluent API.

```java { .api }
/**
 * Builder to create the part file configuration
 */
public static class OutputFileConfigBuilder {
    /** Default part prefix: "part" */
    private static final String DEFAULT_PART_PREFIX = "part";

    /** Default part suffix: "" (empty string) */
    private static final String DEFAULT_PART_SUFFIX = "";

    /**
     * Sets the prefix for part files
     * @param prefix the desired prefix
     * @return this builder instance
     */
    public OutputFileConfigBuilder withPartPrefix(String prefix);

    /**
     * Sets the suffix for part files
     * @param suffix the desired suffix
     * @return this builder instance
     */
    public OutputFileConfigBuilder withPartSuffix(String suffix);

    /**
     * Creates the OutputFileConfig instance
     * @return configured OutputFileConfig
     */
    public OutputFileConfig build();
}
```

**Usage Examples:**

```java
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;

// Default configuration: prefix="part", suffix=""
OutputFileConfig defaultConfig = OutputFileConfig.builder().build();
// The sink inserts its own tokens between prefix and suffix. With
// StreamingFileSink these are the subtask index and a part counter,
// so files look like: part-0-0, part-0-1, part-1-0

// Custom prefix and suffix
OutputFileConfig customConfig = OutputFileConfig.builder()
    .withPartPrefix("data")
    .withPartSuffix(".txt")
    .build();
// Generates files like: data-0-0.txt, data-0-1.txt

// JSON files with timestamp prefix.
// The timestamp is evaluated once, when this configuration is created, not per file.
OutputFileConfig jsonConfig = OutputFileConfig.builder()
    .withPartPrefix("events-" + System.currentTimeMillis())
    .withPartSuffix(".json")
    .build();
// Generates files like: events-1640995200000-0-0.json

// Log files with descriptive naming
OutputFileConfig logConfig = OutputFileConfig.builder()
    .withPartPrefix("application-logs")
    .withPartSuffix(".log")
    .build();
// Generates files like: application-logs-0-0.log
```
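
How the prefix and suffix end up in a file name can be sketched without Flink on the classpath. The helper below is hypothetical (`PartFileNames` and `assemble` are not Flink API). It assumes the layout `<prefix>-<subtaskIndex>-<partCounter><suffix>`; other sinks or Flink versions may place a different token between the prefix and the counter.

```java
/** Hypothetical helper, not part of Flink: illustrates an assumed part file layout. */
final class PartFileNames {
    private PartFileNames() {}

    /** Builds "<prefix>-<subtaskIndex>-<partCounter><suffix>". */
    static String assemble(String prefix, int subtaskIndex, long partCounter, String suffix) {
        return prefix + "-" + subtaskIndex + "-" + partCounter + suffix;
    }

    public static void main(String[] args) {
        // Default prefix and suffix
        System.out.println(assemble("part", 0, 0, ""));
        // Custom prefix and suffix
        System.out.println(assemble("data", 2, 7, ".txt"));
    }
}
```

The prefix and suffix are the only parts under the control of `OutputFileConfig`; the tokens in between are chosen by the sink.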

### WriterProperties

Configuration class describing the properties and capabilities of a BucketWriter.

```java { .api }
/**
 * Class describing the property of the BucketWriter
 */
public class WriterProperties {
    /**
     * Creates WriterProperties with serializers and resume support flag
     * @param inProgressFileRecoverableSerializer serializer for in-progress file recoverables
     * @param pendingFileRecoverableSerializer serializer for pending file recoverables
     * @param supportsResume whether the BucketWriter supports appending data to restored files
     */
    public WriterProperties(
        SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverableSerializer,
        SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableSerializer,
        boolean supportsResume);

    /**
     * @return Whether the BucketWriter supports appending data to the restored in-progress file
     */
    public boolean supportsResume();

    /**
     * @return the serializer for the PendingFileRecoverable
     */
    public SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable>
        getPendingFileRecoverableSerializer();

    /**
     * @return the serializer for the InProgressFileRecoverable
     */
    public SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable>
        getInProgressFileRecoverableSerializer();
}
```

**Usage Example:**

```java
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties;

// Example of creating WriterProperties for a custom BucketWriter.
// Declared abstract because the remaining BucketWriter methods are omitted.
public abstract class MyBucketWriter<IN, BucketID> implements BucketWriter<IN, BucketID> {
    private final WriterProperties properties;

    // The serializers for recovery state are supplied by the caller
    public MyBucketWriter(
            SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> inProgressSerializer,
            SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> pendingSerializer) {
        // Configure properties
        this.properties = new WriterProperties(
            inProgressSerializer,
            pendingSerializer,
            true // This writer supports resume operations
        );
    }

    @Override
    public WriterProperties getProperties() {
        return properties;
    }

    // ... other BucketWriter methods
}
```
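
The two serializers passed to `WriterProperties` are versioned: each reports a version number and is handed a version again when bytes are read back. The following sketch shows that contract with a local stand-in interface so that it runs without Flink. `VersionedSerializer`, `PendingFile`, and `PendingFileSerializer` are hypothetical names; a real implementation would implement Flink's `SimpleVersionedSerializer` for the actual recoverable types.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/** Local stand-in with the shape of a versioned serializer (hypothetical, not Flink API). */
interface VersionedSerializer<E> {
    int getVersion();

    byte[] serialize(E obj) throws IOException;

    E deserialize(int version, byte[] serialized) throws IOException;
}

/** Hypothetical recoverable state: only the path of a pending file. */
final class PendingFile {
    final String path;

    PendingFile(String path) {
        this.path = path;
    }
}

/** Stores the path as UTF-8 bytes and rejects versions it does not know. */
final class PendingFileSerializer implements VersionedSerializer<PendingFile> {
    private static final int VERSION = 1;

    @Override
    public int getVersion() {
        return VERSION;
    }

    @Override
    public byte[] serialize(PendingFile obj) {
        return obj.path.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public PendingFile deserialize(int version, byte[] serialized) throws IOException {
        if (version != VERSION) {
            throw new IOException("Unsupported version: " + version);
        }
        return new PendingFile(new String(serialized, StandardCharsets.UTF_8));
    }
}
```

Rejecting unknown versions explicitly turns an incompatible state format into a clear error during recovery instead of silently misreading the bytes.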

## Configuration Patterns

### File Naming Strategies

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;

// Time-based naming (the timestamp is fixed when the configuration is created)
String startedAt = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss"));
OutputFileConfig timeBasedConfig = OutputFileConfig.builder()
    .withPartPrefix("data-" + startedAt)
    .withPartSuffix(".avro")
    .build();

// Environment-specific naming
String environment = System.getProperty("app.environment", "dev");
OutputFileConfig envConfig = OutputFileConfig.builder()
    .withPartPrefix(environment + "-events")
    .withPartSuffix(".parquet")
    .build();

// Content-type specific naming
OutputFileConfig csvConfig = OutputFileConfig.builder()
    .withPartPrefix("export")
    .withPartSuffix(".csv")
    .build();
```
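
A prefix built from the current time is computed once, when the configuration object is created, so every part file written with that configuration shares it. The sketch below makes this explicit and testable by passing in a `java.time.Clock`. `TimestampedPrefix` and `of` are hypothetical names, not Flink API.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper (not Flink API) that builds a time-stamped part prefix. */
final class TimestampedPrefix {
    private static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss");

    private TimestampedPrefix() {}

    /** Returns "<base>-<yyyyMMdd-HHmmss>" using the time and zone of the given clock. */
    static String of(String base, Clock clock) {
        return base + "-" + LocalDateTime.now(clock).format(FORMAT);
    }

    public static void main(String[] args) {
        // A fixed clock keeps the result reproducible
        Clock fixed = Clock.fixed(Instant.parse("2022-01-01T00:00:00Z"), ZoneOffset.UTC);
        System.out.println(of("data", fixed));
    }
}
```

The returned string would then be passed to `withPartPrefix`; outside of tests, `Clock.systemUTC()` is one possible clock to supply.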

### Writer Properties Configuration

```java
// The serializer variables used here are placeholders for
// SimpleVersionedSerializer instances defined elsewhere.

// Writer with resume support
WriterProperties highPerfProperties = new WriterProperties(
    customInProgressSerializer,
    customPendingSerializer,
    true // Can append to a restored in-progress file
);

// Simple writer without resume capability
WriterProperties simpleProperties = new WriterProperties(
    basicInProgressSerializer,
    basicPendingSerializer,
    false // No resume support: restored in-progress files cannot be appended to
);
```

## Integration with File Sink

These configuration classes are typically used when setting up file sinks:

```java
import java.time.Duration;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

// Complete file sink configuration
StreamingFileSink<String> sink = StreamingFileSink
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
    .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd/HH"))
    // Duration-based setters are assumed here; some Flink releases
    // take these intervals as milliseconds (long) instead.
    .withRollingPolicy(DefaultRollingPolicy.builder()
        .withRolloverInterval(Duration.ofMinutes(15))
        .withInactivityInterval(Duration.ofMinutes(5))
        .build())
    .withOutputFileConfig(OutputFileConfig.builder()
        .withPartPrefix("events")
        .withPartSuffix(".txt")
        .build())
    .build();
```

## Error Handling

- Prefix or suffix strings containing characters that the target file system does not accept may cause file system errors
- Null values for prefix or suffix are not allowed (checked by Preconditions)
- A mismatch between the `supportsResume` flag and what the writer implementation can actually do will cause runtime failures
- Serializer failures during recovery will cause a job restart
- File naming conflicts may occur if multiple sinks write to the same location with the same configuration
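
The first two points can be checked before the sink is built. The sketch below is a hypothetical pre-check (`PartNameValidator` is not Flink API). It rejects null values, in line with the null check mentioned above, and additionally rejects path separators, which is an assumption about what makes a name unsafe on common file systems.

```java
import java.util.Objects;

/** Hypothetical pre-check for part file prefixes and suffixes (not Flink API). */
final class PartNameValidator {
    private PartNameValidator() {}

    /** Returns the value unchanged if it is non-null and free of path separators. */
    static String requireValid(String value, String label) {
        Objects.requireNonNull(value, label + " must not be null");
        if (value.indexOf('/') >= 0 || value.indexOf('\\') >= 0) {
            throw new IllegalArgumentException(
                label + " must not contain path separators: " + value);
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(requireValid("events", "prefix"));
        System.out.println(requireValid(".json", "suffix"));
    }
}
```

Because the method returns its argument, it can wrap the values passed to `withPartPrefix` and `withPartSuffix` directly.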