# Configuration Options

Comprehensive configuration options for performance tuning, retry policies, and storage behavior. All options are defined with Flink's `ConfigOption` framework, with support for deprecated keys and environment variable overrides.

## Capabilities

### FsStateChangelogOptions

`FsStateChangelogOptions` holds all settings for the filesystem-based changelog storage (`FsStateChangelogStorage`).

```java { .api }
/**
 * Configuration options for FsStateChangelogStorage.
 * All option keys use the "state.changelog.dstl.dfs" prefix.
 */
public class FsStateChangelogOptions {

    /** Base path for storing changelog files */
    public static final ConfigOption<String> BASE_PATH;

    /** Enable/disable compression for changelog serialization */
    public static final ConfigOption<Boolean> COMPRESSION_ENABLED;

    /** Size threshold of a single operator's state changes for pre-emptive persistence */
    public static final ConfigOption<MemorySize> PREEMPTIVE_PERSIST_THRESHOLD;

    /** Delay before persisting changelog after receiving a persist request (on checkpoint) */
    public static final ConfigOption<Duration> PERSIST_DELAY;

    /** Size threshold at which batched (delayed) changes are persisted immediately */
    public static final ConfigOption<MemorySize> PERSIST_SIZE_THRESHOLD;

    /** Buffer size for upload operations */
    public static final ConfigOption<MemorySize> UPLOAD_BUFFER_SIZE;

    /** Number of threads for upload operations */
    public static final ConfigOption<Integer> NUM_UPLOAD_THREADS;

    /** Number of threads for discarding unused changelog data */
    public static final ConfigOption<Integer> NUM_DISCARD_THREADS;

    /** Maximum amount of data allowed to be in-flight before the task is back-pressured */
    public static final ConfigOption<MemorySize> IN_FLIGHT_DATA_LIMIT;

    /** Retry policy for failed uploads ("none" or "fixed") */
    public static final ConfigOption<String> RETRY_POLICY;

    /** Time after which an individual upload is considered timed out */
    public static final ConfigOption<Duration> UPLOAD_TIMEOUT;

    /** Maximum number of upload attempts, including the initial one */
    public static final ConfigOption<Integer> RETRY_MAX_ATTEMPTS;

    /** Delay before the next attempt after a (non-timeout) failure */
    public static final ConfigOption<Duration> RETRY_DELAY_AFTER_FAILURE;

    /** Idle time after which locally cached changelog files used during recovery are deleted */
    public static final ConfigOption<Duration> CACHE_IDLE_TIMEOUT;
}
```

### Storage Path Configuration

Configure the base path where changelog files are stored:

```java { .api }
/**
 * Base path to store changelog files
 * Key: "state.changelog.dstl.dfs.base-path"
 * Deprecated key: "dstl.dfs.base-path"
 * Type: String
 * Required: Yes (no default value)
 */
public static final ConfigOption<String> BASE_PATH =
        ConfigOptions.key("state.changelog.dstl.dfs.base-path")
                .stringType()
                .noDefaultValue()
                .withDeprecatedKeys("dstl.dfs.base-path")
                .withDescription("Base path to store changelog files.");
```
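
Because of `withDeprecatedKeys("dstl.dfs.base-path")`, configurations that still use the old key keep working. The lookup order can be sketched as follows. This is a simplified, hypothetical model written against a plain `Map` (the `DeprecatedKeyLookup` class is illustrative and not part of Flink), assuming the current key takes precedence and deprecated keys are consulted in declaration order.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical, simplified model of deprecated-key fallback.
 * This is NOT Flink's Configuration implementation; it only illustrates lookup order.
 */
final class DeprecatedKeyLookup {

    private DeprecatedKeyLookup() {}

    /** Returns the value of the current key, else of the first deprecated key that is set. */
    static Optional<String> lookup(
            Map<String, String> settings, String key, List<String> deprecatedKeys) {
        // The current key always wins when present.
        if (settings.containsKey(key)) {
            return Optional.of(settings.get(key));
        }
        // Otherwise fall back to the deprecated keys, in declaration order.
        for (String deprecated : deprecatedKeys) {
            if (settings.containsKey(deprecated)) {
                return Optional.of(settings.get(deprecated));
            }
        }
        return Optional.empty();
    }
}
```

With only `dstl.dfs.base-path` set, the model returns that value; when both keys are set, `state.changelog.dstl.dfs.base-path` wins.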

**Usage Examples:**

```java
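import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.configuration.Configuration;

Configuration config = new Configuration();

// Choose one of the following; each call overwrites the previous value.

// HDFS path
config.set(FsStateChangelogOptions.BASE_PATH, "hdfs://namenode:8020/flink/changelog");

// S3 path
config.set(FsStateChangelogOptions.BASE_PATH, "s3://my-bucket/changelog");

// Local filesystem (testing only: not shared between TaskManagers on different hosts)
config.set(FsStateChangelogOptions.BASE_PATH, "file:///tmp/changelog");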
```
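
Since `BASE_PATH` must be reachable from every TaskManager, a local `file://` path is only suitable for single-machine testing. A pre-flight heuristic could flag such paths before deployment. The `BasePathCheck` class below is hypothetical: it only inspects the URI scheme and does not verify that the path exists or is writable.

```java
import java.net.URI;
import java.util.Locale;

/**
 * Hypothetical pre-flight heuristic (not part of Flink): flags base paths that are
 * unlikely to be shared across TaskManagers, i.e. "file" URIs and scheme-less paths.
 */
final class BasePathCheck {

    private BasePathCheck() {}

    /** Returns true if the path names an explicit, non-local filesystem scheme. */
    static boolean isSharedScheme(String basePath) {
        String scheme = URI.create(basePath).getScheme();
        if (scheme == null) {
            // No scheme: resolved against the configured default filesystem,
            // so this heuristic cannot tell whether it is shared.
            return false;
        }
        return !scheme.toLowerCase(Locale.ROOT).equals("file");
    }
}
```

HDFS and S3 paths pass the check, while `file:///tmp/changelog` and a bare `/tmp/changelog` are flagged for review.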

### Performance Tuning Options

Configure compression, pre-emptive persistence, and upload buffering and parallelism:

```java { .api }
/**
 * Enable compression when serializing changelog
 * Key: "state.changelog.dstl.dfs.compression.enabled"
 * Default: false
 */
public static final ConfigOption<Boolean> COMPRESSION_ENABLED;

/**
 * Size threshold for the state changes of a single operator beyond which they are
 * persisted pre-emptively, without waiting for a checkpoint
 * Key: "state.changelog.dstl.dfs.preemptive-persist-threshold"
 * Default: 5MB
 */
public static final ConfigOption<MemorySize> PREEMPTIVE_PERSIST_THRESHOLD;

/**
 * Buffer size used when uploading change sets
 * Key: "state.changelog.dstl.dfs.upload.buffer-size"
 * Default: 1MB
 */
public static final ConfigOption<MemorySize> UPLOAD_BUFFER_SIZE;

/**
 * Number of threads to use for upload operations
 * Key: "state.changelog.dstl.dfs.upload.num-threads"
 * Default: 5
 */
public static final ConfigOption<Integer> NUM_UPLOAD_THREADS;
```

**Usage Examples:**

```java
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;

Configuration config = new Configuration();

// Enable compression to reduce the size of uploaded changelog data (costs some CPU)
config.set(FsStateChangelogOptions.COMPRESSION_ENABLED, true);

// Lower the pre-emptive threshold: changes are uploaded more continuously,
// leaving less to upload when a checkpoint is triggered
config.set(FsStateChangelogOptions.PREEMPTIVE_PERSIST_THRESHOLD, MemorySize.parse("2MB"));

// Increase buffer size for high-throughput workloads
config.set(FsStateChangelogOptions.UPLOAD_BUFFER_SIZE, MemorySize.parse("4MB"));

// Increase upload threads for more parallel uploads
config.set(FsStateChangelogOptions.NUM_UPLOAD_THREADS, 10);
```
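
The effect of `PREEMPTIVE_PERSIST_THRESHOLD` can be illustrated with a toy model of a single operator: changes accumulate in memory and, once they exceed the threshold, are handed off for upload before any checkpoint is requested. The `PreemptivePersistModel` class is a hypothetical, simplified sketch (single-threaded, no real upload), not Flink's implementation.

```java
/**
 * Hypothetical, simplified model of pre-emptive persistence for ONE operator.
 * State changes accumulate; once their size exceeds the threshold they are
 * handed off for upload without waiting for a checkpoint. Not Flink's code.
 */
final class PreemptivePersistModel {

    private final long thresholdBytes;
    private long pendingBytes;
    private int preemptiveUploads;

    PreemptivePersistModel(long thresholdBytes) {
        this.thresholdBytes = thresholdBytes;
    }

    /** Records a state change; returns true if it triggered a pre-emptive upload. */
    boolean append(long changeBytes) {
        pendingBytes += changeBytes;
        if (pendingBytes > thresholdBytes) {
            // Upload everything accumulated so far and start a fresh buffer.
            preemptiveUploads++;
            pendingBytes = 0;
            return true;
        }
        return false;
    }

    /** Bytes still left to upload when the next checkpoint is triggered. */
    long pendingAtCheckpoint() {
        return pendingBytes;
    }

    int preemptiveUploads() {
        return preemptiveUploads;
    }
}
```

With a 5 MiB threshold, three 2 MiB changes trigger one pre-emptive upload; a further 1 MiB change is all that remains for the checkpoint. A lower threshold shrinks that remainder at the cost of more, smaller uploads.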

### Batching and Flow Control

Configure batching behavior and backpressure settings:

```java { .api }
/**
 * Delay before persisting changelog after receiving a persist request (on checkpoint).
 * Batches requests from multiple operators or subtasks into fewer files and requests,
 * at the cost of a longer asynchronous checkpoint phase
 * Key: "state.changelog.dstl.dfs.batch.persist-delay"
 * Default: 10ms
 */
public static final ConfigOption<Duration> PERSIST_DELAY;

/**
 * Size threshold for accumulated changes waiting for the persist delay; once reached,
 * they are persisted immediately. Must not exceed IN_FLIGHT_DATA_LIMIT
 * Key: "state.changelog.dstl.dfs.batch.persist-size-threshold"
 * Default: 10MB
 */
public static final ConfigOption<MemorySize> PERSIST_SIZE_THRESHOLD;

/**
 * Maximum amount of data allowed to be in-flight; once reached, the task is back-pressured
 * Key: "state.changelog.dstl.dfs.upload.max-in-flight"
 * Default: 100MB
 */
public static final ConfigOption<MemorySize> IN_FLIGHT_DATA_LIMIT;
```

**Usage Examples:**

```java
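import java.time.Duration;

// Reuses the Configuration instance from the previous examples.
// A longer delay batches more requests into fewer files, but lengthens the
// asynchronous checkpoint phase; the size threshold caps how much can accumulate
config.set(FsStateChangelogOptions.PERSIST_DELAY, Duration.ofMillis(50));
config.set(FsStateChangelogOptions.PERSIST_SIZE_THRESHOLD, MemorySize.parse("20MB"));

// Raise the backpressure limit (must stay >= PERSIST_SIZE_THRESHOLD)
config.set(FsStateChangelogOptions.IN_FLIGHT_DATA_LIMIT, MemorySize.parse("200MB"));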
```
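
How the three batching options interact can be sketched with a deterministic toy model: persist requests wait up to the persist delay, are flushed early once they reach the size threshold, and a flush is only admitted while in-flight data stays within the limit. `BatchingModel` is hypothetical and single-threaded, with time passed in explicitly; it is not Flink's uploader.

```java
import java.time.Duration;

/**
 * Hypothetical, simplified model of batching and flow control (not Flink's code).
 * Sizes are in bytes; time is supplied by the caller in milliseconds.
 */
final class BatchingModel {

    private final long persistDelayMillis;
    private final long persistSizeThreshold;
    private final long inFlightLimit;

    private long pendingBytes;
    private long firstPendingAtMillis = -1;
    private long inFlightBytes;

    BatchingModel(Duration persistDelay, long persistSizeThreshold, long inFlightLimit) {
        // Mirrors the documented rule: PERSIST_SIZE_THRESHOLD must not exceed IN_FLIGHT_DATA_LIMIT.
        if (persistSizeThreshold > inFlightLimit) {
            throw new IllegalArgumentException(
                    "persist size threshold must not exceed the in-flight limit");
        }
        this.persistDelayMillis = persistDelay.toMillis();
        this.persistSizeThreshold = persistSizeThreshold;
        this.inFlightLimit = inFlightLimit;
    }

    /** Adds a persist request; returns true if the batch should be flushed now. */
    boolean request(long bytes, long nowMillis) {
        if (pendingBytes == 0) {
            firstPendingAtMillis = nowMillis;
        }
        pendingBytes += bytes;
        return shouldFlush(nowMillis);
    }

    /** True once the batch reached the size threshold or waited for the full delay. */
    boolean shouldFlush(long nowMillis) {
        if (pendingBytes == 0) {
            return false;
        }
        return pendingBytes >= persistSizeThreshold
                || nowMillis - firstPendingAtMillis >= persistDelayMillis;
    }

    /** Moves the pending batch in flight; returns false (backpressure) if over the limit. */
    boolean tryFlush() {
        if (inFlightBytes + pendingBytes > inFlightLimit) {
            return false;
        }
        inFlightBytes += pendingBytes;
        pendingBytes = 0;
        firstPendingAtMillis = -1;
        return true;
    }

    /** Marks an upload of the given size as completed, freeing in-flight capacity. */
    void uploadCompleted(long bytes) {
        inFlightBytes -= bytes;
    }

    long inFlightBytes() {
        return inFlightBytes;
    }
}
```

In this model a rejected `tryFlush()` stands in for the task being back-pressured; it succeeds again once `uploadCompleted` frees enough capacity.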

### Retry Policy Configuration

Configure retry behavior for failed upload operations:

```java { .api }
/**
 * Retry policy for failed (in particular, timed-out) uploads
 * Key: "state.changelog.dstl.dfs.upload.retry-policy"
 * Default: "fixed"
 * Valid values: "none", "fixed"
 */
public static final ConfigOption<String> RETRY_POLICY;

/**
 * Time after which an upload is considered timed out and a new attempt may start;
 * only takes effect when the retry policy is "fixed"
 * Key: "state.changelog.dstl.dfs.upload.timeout"
 * Default: 1 second
 */
public static final ConfigOption<Duration> UPLOAD_TIMEOUT;

/**
 * Maximum number of attempts per upload, including the initial one;
 * only takes effect when the retry policy is "fixed"
 * Key: "state.changelog.dstl.dfs.upload.max-attempts"
 * Default: 3
 */
public static final ConfigOption<Integer> RETRY_MAX_ATTEMPTS;

/**
 * Delay before the next attempt if the failure was not caused by a timeout
 * Key: "state.changelog.dstl.dfs.upload.next-attempt-delay"
 * Default: 500ms
 */
public static final ConfigOption<Duration> RETRY_DELAY_AFTER_FAILURE;
```

**Usage Examples:**

```java
// Tolerate slow or flaky storage: longer timeout, more attempts, longer back-off.
// Worst case is roughly 5s * 5 attempts = 25s, which must stay below the checkpoint timeout
config.set(FsStateChangelogOptions.RETRY_POLICY, "fixed");
config.set(FsStateChangelogOptions.UPLOAD_TIMEOUT, Duration.ofSeconds(5));
config.set(FsStateChangelogOptions.RETRY_MAX_ATTEMPTS, 5);
config.set(FsStateChangelogOptions.RETRY_DELAY_AFTER_FAILURE, Duration.ofSeconds(1));

// Alternatively, disable retries for fail-fast behavior
// (UPLOAD_TIMEOUT and RETRY_MAX_ATTEMPTS then have no effect)
config.set(FsStateChangelogOptions.RETRY_POLICY, "none");
```
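
The "fixed" policy semantics described above (a bounded number of attempts including the first, with a constant delay after a failure) can be sketched as a plain retry loop. `FixedRetry` is a hypothetical helper, not Flink's retrying executor; in particular it omits the concurrent behavior where a new attempt starts when an upload exceeds `UPLOAD_TIMEOUT` while the original attempt keeps running.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

/**
 * Hypothetical sketch of a "fixed" retry policy (not Flink's implementation):
 * at most maxAttempts attempts in total, the initial one included, with a
 * constant sleep after each failed attempt. Timeout-triggered attempts are omitted.
 */
final class FixedRetry {

    private FixedRetry() {}

    static <T> T call(Callable<T> action, int maxAttempts, Duration delayAfterFailure)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    // Fixed back-off between attempts: no growth, no jitter.
                    Thread.sleep(delayAfterFailure.toMillis());
                }
            }
        }
        // All attempts failed: surface the last failure.
        throw last;
    }

    /** Rough upper bound if every attempt times out: uploadTimeout * maxAttempts. */
    static Duration worstCaseTimeout(Duration uploadTimeout, int maxAttempts) {
        return uploadTimeout.multipliedBy(maxAttempts);
    }
}
```

With `RETRY_MAX_ATTEMPTS` set to 3, an action that fails twice succeeds on the third call; setting the policy to "none" corresponds to a single attempt.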

### Recovery and Cleanup Options

Configure recovery and cleanup behavior:

```java { .api }
/**
 * Number of threads for discarding unused changelog data
 * (e.g. pre-emptively uploaded state that no checkpoint ended up using)
 * Key: "state.changelog.dstl.dfs.discard.num-threads"
 * Default: 1
 */
public static final ConfigOption<Integer> NUM_DISCARD_THREADS;

/**
 * Idle time after which locally cached changelog files, downloaded during
 * recovery, are deleted
 * Key: "state.changelog.dstl.dfs.download.local-cache.idle-timeout-ms"
 * Default: 10 minutes
 */
public static final ConfigOption<Duration> CACHE_IDLE_TIMEOUT;
```

**Usage Examples:**

```java
// Increase discard threads for faster cleanup
config.set(FsStateChangelogOptions.NUM_DISCARD_THREADS, 3);

// Shorten the idle timeout so cached files are deleted sooner, freeing local disk space
config.set(FsStateChangelogOptions.CACHE_IDLE_TIMEOUT, Duration.ofMinutes(5));
```
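
The idle-timeout behavior of `CACHE_IDLE_TIMEOUT` can be pictured as a cache that remembers when each file was last read and periodically evicts entries idle for longer than the timeout. `IdleTimeoutCache` is a hypothetical, simplified model with an explicit clock; the file names are placeholders and it does not reflect how Flink schedules or performs deletion.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of an idle-timeout cache (not Flink's implementation):
 * each cached file records its last access time; a sweep evicts idle entries.
 */
final class IdleTimeoutCache {

    private final long idleTimeoutMillis;
    private final Map<String, Long> lastAccessMillis = new HashMap<>();

    IdleTimeoutCache(Duration idleTimeout) {
        this.idleTimeoutMillis = idleTimeout.toMillis();
    }

    /** Records a read of the cached file at the given time. */
    void touch(String file, long nowMillis) {
        lastAccessMillis.put(file, nowMillis);
    }

    /** Removes and returns the files that have been idle longer than the timeout. */
    List<String> evictIdle(long nowMillis) {
        List<String> evicted = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastAccessMillis.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() > idleTimeoutMillis) {
                evicted.add(entry.getKey());
                it.remove();
            }
        }
        return evicted;
    }

    boolean contains(String file) {
        return lastAccessMillis.containsKey(file);
    }
}
```

Every read resets a file's idle clock, so a shorter timeout frees disk space sooner but may force re-downloads if the same file is needed again later.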

### Complete Configuration Example

```java
import java.time.Duration;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;

import static org.apache.flink.changelog.fs.FsStateChangelogOptions.*;

public class ChangelogConfigExample {

    /** Complete configuration for a high-throughput workload. */
    public static Configuration highThroughputConfig() {
        Configuration config = new Configuration();

        // Storage location
        config.set(BASE_PATH, "hdfs://namenode:8020/flink/changelog");

        // Performance tuning
        config.set(COMPRESSION_ENABLED, true);
        config.set(PREEMPTIVE_PERSIST_THRESHOLD, MemorySize.parse("3MB"));
        config.set(UPLOAD_BUFFER_SIZE, MemorySize.parse("2MB"));
        config.set(NUM_UPLOAD_THREADS, 8);

        // Batching configuration (PERSIST_SIZE_THRESHOLD <= IN_FLIGHT_DATA_LIMIT)
        config.set(PERSIST_DELAY, Duration.ofMillis(20));
        config.set(PERSIST_SIZE_THRESHOLD, MemorySize.parse("15MB"));
        config.set(IN_FLIGHT_DATA_LIMIT, MemorySize.parse("150MB"));

        // Retry configuration (3s * 4 attempts = 12s, keep below the checkpoint timeout)
        config.set(RETRY_POLICY, "fixed");
        config.set(UPLOAD_TIMEOUT, Duration.ofSeconds(3));
        config.set(RETRY_MAX_ATTEMPTS, 4);
        config.set(RETRY_DELAY_AFTER_FAILURE, Duration.ofMillis(750));

        // Cleanup configuration
        config.set(NUM_DISCARD_THREADS, 2);
        config.set(CACHE_IDLE_TIMEOUT, Duration.ofMinutes(15));

        return config;
    }
}
```

### Configuration Validation

Important constraints and validation rules:

- `PERSIST_SIZE_THRESHOLD` must not exceed `IN_FLIGHT_DATA_LIMIT`
- `BASE_PATH` is required and must be accessible by all TaskManagers
- `UPLOAD_TIMEOUT * RETRY_MAX_ATTEMPTS` should be less than the checkpoint timeout (`execution.checkpointing.timeout`)
- Thread counts (`NUM_UPLOAD_THREADS`, `NUM_DISCARD_THREADS`) should be sized to the available CPU cores
- Memory sizes should account for available heap space
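
The two hard constraints above lend themselves to a pre-deployment check. The sketch below is hypothetical (`ChangelogSettingsCheck` is not part of Flink) and works on plain values rather than a Flink `Configuration`, so it runs anywhere. Sizes are in bytes, and the thread-count check only rejects non-positive values as a basic sanity assumption rather than encoding any sizing rule.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical pre-deployment check for the documented constraints
 * (not part of Flink). An empty result means all checks passed.
 */
final class ChangelogSettingsCheck {

    private ChangelogSettingsCheck() {}

    static List<String> validate(
            long persistSizeThresholdBytes,
            long inFlightLimitBytes,
            Duration uploadTimeout,
            int maxAttempts,
            Duration checkpointTimeout,
            int uploadThreads,
            int discardThreads) {
        List<String> problems = new ArrayList<>();
        // Rule: PERSIST_SIZE_THRESHOLD must not exceed IN_FLIGHT_DATA_LIMIT.
        if (persistSizeThresholdBytes > inFlightLimitBytes) {
            problems.add("PERSIST_SIZE_THRESHOLD exceeds IN_FLIGHT_DATA_LIMIT");
        }
        // Rule: UPLOAD_TIMEOUT * RETRY_MAX_ATTEMPTS should be less than the checkpoint timeout.
        if (uploadTimeout.multipliedBy(maxAttempts).compareTo(checkpointTimeout) >= 0) {
            problems.add("UPLOAD_TIMEOUT * RETRY_MAX_ATTEMPTS is not below the checkpoint timeout");
        }
        // Basic sanity check (assumption): each thread pool needs at least one thread.
        if (uploadThreads < 1 || discardThreads < 1) {
            problems.add("thread counts must be at least 1");
        }
        return problems;
    }
}
```

Fed the documented defaults (10 MiB threshold, 100 MiB limit, 1 s timeout, 3 attempts, 5 upload threads, 1 discard thread) and a 10-minute checkpoint timeout, the check reports no problems.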