# Main Storage Implementation

Filesystem-based implementation of StateChangelogStorage with thread-safe operations, writer creation, and availability tracking. This is the core storage component that coordinates state change persistence.

## Capabilities

### FsStateChangelogStorage

Main filesystem-based implementation that extends the recovery storage and adds writer creation capabilities.

```java { .api }
/**
 * Filesystem-based implementation of StateChangelogStorage
 */
@Experimental
@ThreadSafe
public class FsStateChangelogStorage
        extends FsStateChangelogStorageForRecovery
        implements StateChangelogStorage<ChangelogStateHandleStreamImpl> {

    /**
     * Creates storage with default changelog registry
     * @param jobID The job identifier
     * @param config Configuration settings
     * @param metricGroup Metric group for monitoring
     * @param localRecoveryConfig Local recovery configuration
     * @throws IOException If storage initialization fails
     */
    public FsStateChangelogStorage(
            JobID jobID,
            Configuration config,
            TaskManagerJobMetricGroup metricGroup,
            LocalRecoveryConfig localRecoveryConfig
    ) throws IOException;

    /**
     * Creates storage with custom changelog registry
     * @param jobID The job identifier
     * @param config Configuration settings
     * @param metricGroup Metric group for monitoring
     * @param changelogRegistry Custom changelog registry
     * @param localRecoveryConfig Local recovery configuration
     * @throws IOException If storage initialization fails
     */
    public FsStateChangelogStorage(
            JobID jobID,
            Configuration config,
            TaskManagerJobMetricGroup metricGroup,
            TaskChangelogRegistry changelogRegistry,
            LocalRecoveryConfig localRecoveryConfig
    ) throws IOException;

    /**
     * Creates a writer for state changes for a specific operator
     * @param operatorID Identifier of the operator
     * @param keyGroupRange Range of key groups handled by this writer
     * @param mailboxExecutor Executor for asynchronous operations
     * @return FsStateChangelogWriter instance
     */
    public FsStateChangelogWriter createWriter(
            String operatorID,
            KeyGroupRange keyGroupRange,
            MailboxExecutor mailboxExecutor
    );

    /**
     * Closes the storage and releases all resources
     * @throws Exception If closing fails
     */
    public void close() throws Exception;

    /**
     * Returns availability provider for backpressure handling
     * @return AvailabilityProvider instance
     */
    public AvailabilityProvider getAvailabilityProvider();
}
```

**Usage Examples:**

```java
import org.apache.flink.changelog.fs.FsStateChangelogStorage;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.changelog.fs.FsStateChangelogWriter;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import java.util.concurrent.CompletableFuture;

// metricGroup, localRecoveryConfig, mailboxExecutor, stateChangeData,
// sequenceNumber, and checkpointId are assumed to be in scope.

// Basic storage creation
Configuration config = new Configuration();
config.set(FsStateChangelogOptions.BASE_PATH, "/path/to/changelog");

FsStateChangelogStorage storage = new FsStateChangelogStorage(
        new JobID(),
        config,
        metricGroup,
        localRecoveryConfig
);

// Create writer for an operator
FsStateChangelogWriter writer = storage.createWriter(
        "my-operator-id",
        KeyGroupRange.of(0, 127),
        mailboxExecutor
);

// Use writer to persist state changes
// (key group 5 lies within the writer's range 0-127)
writer.append(5, stateChangeData);
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> result =
        writer.persist(sequenceNumber, checkpointId);

// Clean up: close the writer before the storage that created it
writer.close();
storage.close();
```

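Because `persist` returns a `CompletableFuture`, a problem during persistence may only become visible when that future completes, not at the call site. The following JDK-only sketch shows one way to branch on the outcome. `PersistResultSketch`, `describe`, and the `String` payload are hypothetical stand-ins for illustration and are not part of Flink.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class PersistResultSketch {

    /**
     * Waits for the future and reports how it finished.
     * The String payload stands in for the real snapshot result type.
     */
    static String describe(CompletableFuture<String> persistFuture) {
        return persistFuture
                .handle((result, failure) -> {
                    if (failure == null) {
                        return "persisted: " + result;
                    }
                    // Dependent stages wrap the cause in CompletionException; unwrap it.
                    Throwable cause = failure;
                    if (failure instanceof CompletionException && failure.getCause() != null) {
                        cause = failure.getCause();
                    }
                    return "failed: " + cause.getMessage();
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(describe(CompletableFuture.completedFuture("handle-1")));

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IOException("upload failed"));
        System.out.println(describe(failed));
    }
}
```

In real task code, blocking with `join()` is usually undesirable; attaching the `handle` callback and returning is closer to how an asynchronous caller would use the result.
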
### Testing Constructor

Constructor provided for testing scenarios with direct uploader configuration.

```java { .api }
/**
 * Testing constructor with direct uploader configuration
 * @param jobID The job identifier
 * @param basePath Base path for changelog storage
 * @param compression Whether to enable compression
 * @param bufferSize Buffer size for operations
 * @param metricGroup Metric group for monitoring
 * @param changelogRegistry Changelog registry for tracking
 * @param localRecoveryConfig Local recovery configuration
 * @throws IOException If storage initialization fails
 */
@VisibleForTesting
public FsStateChangelogStorage(
        JobID jobID,
        Path basePath,
        boolean compression,
        int bufferSize,
        ChangelogStorageMetricGroup metricGroup,
        TaskChangelogRegistry changelogRegistry,
        LocalRecoveryConfig localRecoveryConfig
) throws IOException;
```

### Low-Level Constructor

Low-level constructor for advanced testing scenarios with custom upload scheduler.

```java { .api }
/**
 * Low-level constructor with custom upload scheduler
 * @param uploader Custom state change upload scheduler
 * @param preEmptivePersistThresholdInBytes Threshold for preemptive persistence
 * @param changelogRegistry Changelog registry for tracking
 * @param localRecoveryConfig Local recovery configuration
 */
@VisibleForTesting
public FsStateChangelogStorage(
        StateChangeUploadScheduler uploader,
        long preEmptivePersistThresholdInBytes,
        TaskChangelogRegistry changelogRegistry,
        LocalRecoveryConfig localRecoveryConfig
);
```

**Testing Usage Example:**

```java
// Testing with custom parameters.
// metricGroup (a TaskManagerJobMetricGroup) and localRecoveryConfig are assumed to be in scope.
ChangelogStorageMetricGroup testMetricGroup = new ChangelogStorageMetricGroup(metricGroup);
TaskChangelogRegistry testRegistry = TaskChangelogRegistry.defaultChangelogRegistry(1);

FsStateChangelogStorage testStorage = new FsStateChangelogStorage(
        new JobID(),
        new Path("file:///tmp/test-changelog"),
        true,        // compression enabled
        1024 * 1024, // 1 MB buffer
        testMetricGroup,
        testRegistry,
        localRecoveryConfig
);
```

### Availability and Backpressure Handling

The storage exposes an AvailabilityProvider that signals when it can accept writes, so callers can apply backpressure by deferring writes until it becomes available again.

```java { .api }
/**
 * Returns availability provider for coordinating backpressure
 * @return AvailabilityProvider that signals when storage is available for writes
 */
public AvailabilityProvider getAvailabilityProvider();
```

**Availability Usage Example:**

```java
FsStateChangelogStorage storage = new FsStateChangelogStorage(/* ... */);

// Obtain the provider that reports whether the storage accepts writes
AvailabilityProvider availability = storage.getAvailabilityProvider();

// Wait for availability if needed
CompletableFuture<?> availabilityFuture = availability.getAvailableFuture();
availabilityFuture.thenRun(() -> {
    // Storage is now available for writes
    // (operatorID, keyGroupRange, and mailboxExecutor are assumed to be in scope)
    FsStateChangelogWriter writer = storage.createWriter(operatorID, keyGroupRange, mailboxExecutor);
    // Proceed with writing operations
});
```

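To make the availability contract concrete, here is a minimal JDK-only sketch of a gate that becomes unavailable once too many bytes are in flight and completes a future when capacity returns. It illustrates the general pattern only. `ByteBudgetGate`, its method names, and the byte-budget rule are assumptions for illustration and do not describe Flink's internal implementation.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical availability gate driven by a byte budget; not Flink code. */
final class ByteBudgetGate {
    private final long maxInFlightBytes;
    private long inFlightBytes;
    // A completed future means "available"; an incomplete one means "wait".
    private CompletableFuture<Void> available = CompletableFuture.completedFuture(null);

    ByteBudgetGate(long maxInFlightBytes) {
        this.maxInFlightBytes = maxInFlightBytes;
    }

    /** Future that is complete while writes are allowed. */
    synchronized CompletableFuture<Void> getAvailableFuture() {
        return available;
    }

    synchronized boolean isAvailable() {
        return available.isDone();
    }

    /** Records bytes handed off for upload; may flip the gate to unavailable. */
    synchronized void onUploadStarted(long bytes) {
        inFlightBytes += bytes;
        if (inFlightBytes >= maxInFlightBytes && available.isDone()) {
            available = new CompletableFuture<>();
        }
    }

    /**
     * Releases bytes after an upload; may flip the gate back to available.
     * Callbacks run while the lock is held, which keeps the sketch simple.
     */
    synchronized void onUploadFinished(long bytes) {
        inFlightBytes -= bytes;
        if (inFlightBytes < maxInFlightBytes && !available.isDone()) {
            available.complete(null);
        }
    }

    public static void main(String[] args) {
        ByteBudgetGate gate = new ByteBudgetGate(100);
        gate.onUploadStarted(100);
        gate.getAvailableFuture().thenRun(() -> System.out.println("writes may resume"));
        gate.onUploadFinished(60);
    }
}
```

A caller would check `isAvailable()` before writing and, when it returns false, chain the next write onto `getAvailableFuture()` instead of blocking a thread.
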
### Local Recovery Integration

Every constructor takes a LocalRecoveryConfig, so the storage participates in Flink's local recovery mechanism, which lets a restarted task restore from a local copy of its state instead of reading everything back from remote storage.

**Local Recovery Configuration Example:**

```java
// Configure local recovery.
// Note: the LocalRecoveryConfig constructor and the LocalRecoveryDirectoryProvider
// interface have changed across Flink releases; verify both against your version.
// jobID, jobVertexID, subtaskIndex, config, and metricGroup are assumed to be in scope.
LocalRecoveryDirectoryProvider directoryProvider = new LocalRecoveryDirectoryProviderImpl(
        new File("/local/recovery"), // allocation base directory
        jobID,
        jobVertexID,
        subtaskIndex
);
LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(directoryProvider);

// Create storage with local recovery enabled
FsStateChangelogStorage storage = new FsStateChangelogStorage(
        jobID, config, metricGroup, localRecoveryConfig
);
```

### Error Handling

The constructors that initialize filesystem resources declare IOException, and close() declares Exception. Callers should handle both.

**Common Error Scenarios:**

```java
// Declare the storage outside the try block so it is still in scope for cleanup.
FsStateChangelogStorage storage = null;
try {
    storage = new FsStateChangelogStorage(
            jobID, config, metricGroup, localRecoveryConfig
    );
} catch (IOException e) {
    // Handle storage initialization failure
    // Common causes: invalid base path, permissions, filesystem issues
    log.error("Failed to initialize changelog storage", e);
}

if (storage != null) {
    try {
        storage.close();
    } catch (Exception e) {
        // Handle cleanup failure
        // May include upload scheduler shutdown, resource cleanup
        log.warn("Error during storage cleanup", e);
    }
}
```
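
Since both the storage and its writers expose `close()`, try-with-resources is one way to get cleanup right even when a write fails. The JDK-only sketch below demonstrates the close order that construct guarantees. `CloseOrderSketch` and `Tracked` are hypothetical stand-ins, and the sketch assumes the real storage and writer types can be used as `AutoCloseable`, which should be checked against your Flink version.

```java
import java.util.ArrayList;
import java.util.List;

final class CloseOrderSketch {

    /** Hypothetical closeable resource that records when it is closed. */
    static final class Tracked implements AutoCloseable {
        private final String name;
        private final List<String> closeLog;

        Tracked(String name, List<String> closeLog) {
            this.name = name;
            this.closeLog = closeLog;
        }

        @Override
        public void close() {
            closeLog.add(name);
        }
    }

    /** Opens "storage" then "writer", optionally fails, and returns the event log. */
    static List<String> run(boolean failWhileWriting) {
        List<String> closeLog = new ArrayList<>();
        // Resources are closed in reverse declaration order: writer first, then storage.
        try (Tracked storage = new Tracked("storage", closeLog);
                Tracked writer = new Tracked("writer", closeLog)) {
            if (failWhileWriting) {
                throw new IllegalStateException("simulated write failure");
            }
        } catch (IllegalStateException e) {
            // Both resources are already closed by the time this handler runs.
            closeLog.add("caught: " + e.getMessage());
        }
        return closeLog;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

Declaring the storage before the writer means the writer is closed first, matching the manual cleanup order used in the usage example earlier in this section.
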