0
# Storage Factory and Configuration
1
2
`FsStateChangelogStorageFactory` creates filesystem-based changelog storage instances, and `FsStateChangelogOptions` provides the configuration options that control them. The factory is the main entry point for using the DSTL (Durable Short-term Log) module.
3
4
## Capabilities
5
6
### FsStateChangelogStorageFactory
7
8
Factory class that implements the StateChangelogStorageFactory interface for creating filesystem-based changelog storage.
9
10
```java { .api }
11
/**
12
* Factory for creating FsStateChangelogStorage instances
13
*/
14
@Internal
15
public class FsStateChangelogStorageFactory implements StateChangelogStorageFactory {
16
public static final String IDENTIFIER = "filesystem";
17
18
/**
19
* Returns the identifier for this storage factory
20
* @return "filesystem" identifier string
21
*/
22
public String getIdentifier();
23
24
/**
25
* Creates a new StateChangelogStorage instance
26
* @param jobID The job identifier
27
* @param configuration Configuration settings
28
* @param metricGroup Metric group for monitoring
29
* @param localRecoveryConfig Local recovery configuration
30
* @return StateChangelogStorage instance
31
* @throws IOException If storage creation fails
32
*/
33
public StateChangelogStorage<?> createStorage(
34
JobID jobID,
35
Configuration configuration,
36
TaskManagerJobMetricGroup metricGroup,
37
LocalRecoveryConfig localRecoveryConfig
38
) throws IOException;
39
40
/**
41
* Creates a storage view for recovery operations
42
* @param configuration Configuration settings
43
* @return StateChangelogStorageView instance for recovery
44
*/
45
public StateChangelogStorageView<?> createStorageView(Configuration configuration);
46
47
/**
48
* Helper method for programmatic configuration
49
* @param configuration Configuration object to modify
50
* @param newFolder Base folder for changelog storage
51
* @param uploadTimeout Timeout for upload operations
52
* @param maxUploadAttempts Maximum number of upload retry attempts
53
*/
54
public static void configure(
55
Configuration configuration,
56
File newFolder,
57
Duration uploadTimeout,
58
int maxUploadAttempts
59
);
60
}
61
```
62
63
**Usage Example:**
64
65
```java
66
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
67
import org.apache.flink.configuration.Configuration;
import java.io.File;
import java.time.Duration;
68
69
// Create factory
70
FsStateChangelogStorageFactory factory = new FsStateChangelogStorageFactory();
71
72
// Verify identifier
73
String identifier = factory.getIdentifier(); // Returns "filesystem"
74
75
// Configure programmatically
76
Configuration config = new Configuration();
77
FsStateChangelogStorageFactory.configure(
78
config,
79
new File("/tmp/changelog"),
80
Duration.ofSeconds(30),
81
5
82
);
83
```
84
85
### Configuration Options
86
87
All configuration options for the filesystem-based changelog storage are defined as static constants in `FsStateChangelogOptions`.
88
89
```java { .api }
90
/**
91
* Configuration options for FsStateChangelogStorage
92
*/
93
@Experimental
94
public class FsStateChangelogOptions {
95
96
/**
97
* Base path to store changelog files. Required setting.
98
*/
99
public static final ConfigOption<String> BASE_PATH =
100
ConfigOptions.key("state.changelog.dstl.dfs.base-path")
101
.stringType()
102
.noDefaultValue()
103
.withDescription("Base path to store changelog files.");
104
105
/**
106
* Whether to enable compression when serializing changelog. Default: false.
107
*/
108
public static final ConfigOption<Boolean> COMPRESSION_ENABLED =
109
ConfigOptions.key("state.changelog.dstl.dfs.compression.enabled")
110
.booleanType()
111
.defaultValue(false)
112
.withDescription("Whether to enable compression when serializing changelog.");
113
114
/**
115
* Size threshold for preemptive persistence. Default: 5MB.
116
*/
117
public static final ConfigOption<MemorySize> PREEMPTIVE_PERSIST_THRESHOLD =
118
ConfigOptions.key("state.changelog.dstl.dfs.preemptive-persist-threshold")
119
.memoryType()
120
.defaultValue(MemorySize.parse("5MB"))
121
.withDescription("Size threshold for preemptive persistence.");
122
123
/**
124
* Delay before persisting changelog. Default: 10ms.
125
*/
126
public static final ConfigOption<Duration> PERSIST_DELAY =
127
ConfigOptions.key("state.changelog.dstl.dfs.persist-delay")
128
.durationType()
129
.defaultValue(Duration.ofMillis(10))
130
.withDescription("Delay before persisting changelog.");
131
132
/**
133
* Size threshold for batch persistence. Default: 10MB.
134
*/
135
public static final ConfigOption<MemorySize> PERSIST_SIZE_THRESHOLD =
136
ConfigOptions.key("state.changelog.dstl.dfs.persist-size-threshold")
137
.memoryType()
138
.defaultValue(MemorySize.parse("10MB"))
139
.withDescription("Size threshold for batch persistence.");
140
141
/**
142
* Buffer size for uploads. Default: 1MB.
143
*/
144
public static final ConfigOption<MemorySize> UPLOAD_BUFFER_SIZE =
145
ConfigOptions.key("state.changelog.dstl.dfs.upload-buffer-size")
146
.memoryType()
147
.defaultValue(MemorySize.parse("1MB"))
148
.withDescription("Buffer size for uploads.");
149
150
/**
151
* Number of upload threads. Default: 5.
152
*/
153
public static final ConfigOption<Integer> NUM_UPLOAD_THREADS =
154
ConfigOptions.key("state.changelog.dstl.dfs.num-upload-threads")
155
.intType()
156
.defaultValue(5)
157
.withDescription("Number of upload threads.");
158
159
/**
160
* Number of discard threads. Default: 1.
161
*/
162
public static final ConfigOption<Integer> NUM_DISCARD_THREADS =
163
ConfigOptions.key("state.changelog.dstl.dfs.num-discard-threads")
164
.intType()
165
.defaultValue(1)
166
.withDescription("Number of discard threads.");
167
168
/**
169
* Maximum in-flight data. Default: 100MB.
170
*/
171
public static final ConfigOption<MemorySize> IN_FLIGHT_DATA_LIMIT =
172
ConfigOptions.key("state.changelog.dstl.dfs.in-flight-data-limit")
173
.memoryType()
174
.defaultValue(MemorySize.parse("100MB"))
175
.withDescription("Maximum in-flight data.");
176
177
/**
178
* Retry policy for uploads. Default: "fixed".
179
*/
180
public static final ConfigOption<String> RETRY_POLICY =
181
ConfigOptions.key("state.changelog.dstl.dfs.retry-policy")
182
.stringType()
183
.defaultValue("fixed")
184
.withDescription("Retry policy for uploads.");
185
186
/**
187
* Upload timeout. Default: 1s.
188
*/
189
public static final ConfigOption<Duration> UPLOAD_TIMEOUT =
190
ConfigOptions.key("state.changelog.dstl.dfs.upload-timeout")
191
.durationType()
192
.defaultValue(Duration.ofSeconds(1))
193
.withDescription("Upload timeout.");
194
195
/**
196
* Maximum retry attempts. Default: 3.
197
*/
198
public static final ConfigOption<Integer> RETRY_MAX_ATTEMPTS =
199
ConfigOptions.key("state.changelog.dstl.dfs.retry-max-attempts")
200
.intType()
201
.defaultValue(3)
202
.withDescription("Maximum retry attempts.");
203
204
/**
205
* Delay between retries. Default: 500ms.
206
*/
207
public static final ConfigOption<Duration> RETRY_DELAY_AFTER_FAILURE =
208
ConfigOptions.key("state.changelog.dstl.dfs.retry-delay-after-failure")
209
.durationType()
210
.defaultValue(Duration.ofMillis(500))
211
.withDescription("Delay between retries.");
212
213
/**
214
* Cache file idle timeout. Default: 10min.
215
*/
216
public static final ConfigOption<Duration> CACHE_IDLE_TIMEOUT =
217
ConfigOptions.key("state.changelog.dstl.dfs.cache-idle-timeout")
218
.durationType()
219
.defaultValue(Duration.ofMinutes(10))
220
.withDescription("Cache file idle timeout.");
221
}
222
```
223
224
**Configuration Usage Examples:**
225
226
```java
227
import org.apache.flink.configuration.Configuration;
228
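import org.apache.flink.configuration.MemorySize;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import java.time.Duration;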
229
230
Configuration config = new Configuration();
231
232
// Required: Set base path
233
config.set(FsStateChangelogOptions.BASE_PATH, "/path/to/changelog/storage");
234
235
// Optional: Enable compression
236
config.set(FsStateChangelogOptions.COMPRESSION_ENABLED, true);
237
238
// Optional: Adjust thresholds
239
config.set(FsStateChangelogOptions.PREEMPTIVE_PERSIST_THRESHOLD, MemorySize.parse("10MB"));
240
config.set(FsStateChangelogOptions.PERSIST_SIZE_THRESHOLD, MemorySize.parse("20MB"));
241
242
// Optional: Configure upload behavior
243
config.set(FsStateChangelogOptions.NUM_UPLOAD_THREADS, 10);
244
config.set(FsStateChangelogOptions.UPLOAD_TIMEOUT, Duration.ofSeconds(30));
245
config.set(FsStateChangelogOptions.RETRY_MAX_ATTEMPTS, 5);
246
247
// Use configuration with factory
248
FsStateChangelogStorageFactory factory = new FsStateChangelogStorageFactory();
249
StateChangelogStorage<?> storage = factory.createStorage(
250
jobID, config, metricGroup, localRecoveryConfig
251
);
252
```
253
254
### Recovery Storage View
255
256
`FsStateChangelogStorageForRecovery` is the `StateChangelogStorageView` implementation for recovery-only operations; it is used when reading existing changelog data.
257
258
```java { .api }
259
/**
260
* Recovery-only implementation of changelog storage
261
*/
262
@Experimental
263
@ThreadSafe
264
public class FsStateChangelogStorageForRecovery
265
implements StateChangelogStorageView<ChangelogStateHandleStreamImpl> {
266
267
/**
268
* Creates a reader for changelog handles
269
* @return StateChangelogHandleReader for reading changelog data
270
*/
271
public StateChangelogHandleReader<ChangelogStateHandleStreamImpl> createReader();
272
273
/**
274
* Closes the storage view and releases resources
275
* @throws Exception If closing fails
276
*/
277
public void close() throws Exception;
278
}
279
```
280
281
**Recovery Usage Example:**
282
283
```java
284
// Create storage view for recovery
285
FsStateChangelogStorageFactory factory = new FsStateChangelogStorageFactory();
286
StateChangelogStorageView<?> storageView = factory.createStorageView(config);
287
288
// Create reader for recovery
289
StateChangelogHandleReader<?> reader = storageView.createReader();
290
291
// Use reader to iterate through state changes
292
// (reader usage depends on specific changelog handles)
293
```