# Main Storage Implementation

Core filesystem-based implementation that provides changelog writers and manages upload operations for high-throughput streaming applications. It coordinates writers, upload schedulers, and recovery components.

## Capabilities

### FsStateChangelogStorage

The main storage class: it creates changelog writers for running streaming jobs and manages their upload operations.

```java { .api }
/**
 * Filesystem-based implementation of StateChangelogStorage for write operations.
 * Thread-safe; manages multiple changelog writers for different operators.
 */
public class FsStateChangelogStorage extends FsStateChangelogStorageForRecovery
        implements StateChangelogStorage<ChangelogStateHandleStreamImpl> {

    /**
     * Creates a new changelog writer for the given operator and key group range.
     *
     * @param operatorID unique identifier of the operator
     * @param keyGroupRange key group range this writer handles
     * @param mailboxExecutor executor for processing callbacks in the task's mailbox
     * @return FsStateChangelogWriter instance for the operator
     */
    public FsStateChangelogWriter createWriter(
            String operatorID,
            KeyGroupRange keyGroupRange,
            MailboxExecutor mailboxExecutor);

    /**
     * Closes the storage and all associated resources.
     *
     * @throws Exception if cleanup fails
     */
    public void close() throws Exception;

    /**
     * Returns the availability provider used for backpressure control.
     *
     * @return AvailabilityProvider indicating when the storage can accept more data
     */
    public AvailabilityProvider getAvailabilityProvider();
}
```
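To make the writer-creation contract concrete, here is a minimal, self-contained model (not Flink code) of a storage that hands out writers, each tagged with a unique log ID drawn from a shared `AtomicInteger`, in the spirit of the `logIdGenerator` field listed among the internal components. The classes `SketchStorage` and `SketchWriter` are hypothetical, and a key group range is reduced to an inclusive pair of ints.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a changelog writer: only records its identity. */
final class SketchWriter {
    final int logId;
    final String operatorId;
    final int firstKeyGroup;
    final int lastKeyGroup;

    SketchWriter(int logId, String operatorId, int firstKeyGroup, int lastKeyGroup) {
        this.logId = logId;
        this.operatorId = operatorId;
        this.firstKeyGroup = firstKeyGroup;
        this.lastKeyGroup = lastKeyGroup;
    }

    /** Rejects key groups outside the range this writer was created for. */
    void checkKeyGroup(int keyGroup) {
        if (keyGroup < firstKeyGroup || keyGroup > lastKeyGroup) {
            throw new IllegalArgumentException(
                    "key group " + keyGroup + " not in [" + firstKeyGroup + ", " + lastKeyGroup + "]");
        }
    }
}

/** Hypothetical storage model: thread-safe writer creation with unique log IDs. */
final class SketchStorage {
    private final AtomicInteger logIdGenerator = new AtomicInteger(0);
    private final List<SketchWriter> writers = Collections.synchronizedList(new ArrayList<>());

    SketchWriter createWriter(String operatorId, int firstKeyGroup, int lastKeyGroup) {
        if (firstKeyGroup > lastKeyGroup) {
            throw new IllegalArgumentException("empty key group range");
        }
        // getAndIncrement is atomic, so concurrent callers never share a log ID
        SketchWriter writer = new SketchWriter(
                logIdGenerator.getAndIncrement(), operatorId, firstKeyGroup, lastKeyGroup);
        writers.add(writer);
        return writer;
    }

    int writerCount() {
        return writers.size();
    }
}
```

In this model the only shared mutable state touched by `createWriter` is the atomic counter and a synchronized list, which is one simple way to keep writer creation safe when several task threads call it.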

### Constructors

Multiple constructor variants support different initialization scenarios:

```java { .api }
/**
 * Main constructor for production use
 */
public FsStateChangelogStorage(
        JobID jobID,
        Configuration config,
        TaskManagerJobMetricGroup metricGroup,
        LocalRecoveryConfig localRecoveryConfig
) throws IOException;

/**
 * Constructor with custom changelog registry
 */
public FsStateChangelogStorage(
        JobID jobID,
        Configuration config,
        TaskManagerJobMetricGroup metricGroup,
        TaskChangelogRegistry changelogRegistry,
        LocalRecoveryConfig localRecoveryConfig
) throws IOException;

/**
 * Testing constructor with direct parameters
 */
public FsStateChangelogStorage(
        JobID jobID,
        Path basePath,
        boolean compression,
        int bufferSize,
        ChangelogStorageMetricGroup metricGroup,
        TaskChangelogRegistry changelogRegistry,
        LocalRecoveryConfig localRecoveryConfig
) throws IOException;

/**
 * Advanced constructor with custom upload scheduler
 */
public FsStateChangelogStorage(
        StateChangeUploadScheduler uploader,
        long preEmptivePersistThresholdInBytes,
        TaskChangelogRegistry changelogRegistry,
        LocalRecoveryConfig localRecoveryConfig
);
```
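Constructor variants like these are usually easiest to read as a telescoping chain, where convenience constructors fill in defaults and delegate to a more general one. The sketch below shows that pattern with hypothetical types (`ChainRegistry`, `ChainConfig`, `ChainedStorage`); it assumes, rather than asserts, that a constructor without a registry falls back to a no-op registry and that the configuration-driven variant derives the scheduler and threshold before delegating to the fully injected form.

```java
/** Hypothetical registry abstraction with a shared no-op instance. */
interface ChainRegistry {
    ChainRegistry NO_OP = new ChainRegistry() {
        @Override
        public String name() {
            return "no-op";
        }
    };

    String name();
}

/** Hypothetical, minimal configuration holder. */
final class ChainConfig {
    final long preEmptivePersistThresholdBytes;

    ChainConfig(long preEmptivePersistThresholdBytes) {
        this.preEmptivePersistThresholdBytes = preEmptivePersistThresholdBytes;
    }
}

/** Telescoping constructors: each variant fills in defaults and delegates. */
final class ChainedStorage {
    final String uploader;
    final long preEmptivePersistThresholdBytes;
    final ChainRegistry registry;

    // Convenience variant: no registry supplied, so fall back to the no-op one
    ChainedStorage(ChainConfig config) {
        this(config, ChainRegistry.NO_OP);
    }

    // Registry-aware variant: derives uploader and threshold from the config
    ChainedStorage(ChainConfig config, ChainRegistry registry) {
        this("uploader-from-config", config.preEmptivePersistThresholdBytes, registry);
    }

    // Most general variant: every dependency is injected (convenient in tests)
    ChainedStorage(String uploader, long preEmptivePersistThresholdBytes, ChainRegistry registry) {
        this.uploader = uploader;
        this.preEmptivePersistThresholdBytes = preEmptivePersistThresholdBytes;
        this.registry = registry;
    }
}
```

Keeping every field assignment in the most general constructor means defaults and validation live in exactly one place.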

**Usage Examples:**

```java
import org.apache.flink.changelog.fs.FsStateChangelogStorage;
import org.apache.flink.changelog.fs.FsStateChangelogWriter;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.state.KeyGroupRange;

// Create the storage instance (typically done by the storage factory).
// jobId, config, metricGroup, localRecoveryConfig and mailboxExecutor are
// supplied by the surrounding task context.
FsStateChangelogStorage storage = new FsStateChangelogStorage(
    jobId, config, metricGroup, localRecoveryConfig
);

// Create writers for different operators
FsStateChangelogWriter operatorWriter1 = storage.createWriter(
    "map-operator",
    KeyGroupRange.of(0, 63),
    mailboxExecutor
);

FsStateChangelogWriter operatorWriter2 = storage.createWriter(
    "filter-operator",
    KeyGroupRange.of(64, 127),
    mailboxExecutor
);

// Check whether the storage can accept more data
AvailabilityProvider availability = storage.getAvailabilityProvider();
if (availability.isAvailable()) {
    // Safe to write more data; keyGroup must lie within the writer's range (0..63)
    operatorWriter1.append(keyGroup, stateChangeBytes); // may throw IOException
}

// Clean up when done
storage.close();
```

### Integration with Upload System

The storage implementation integrates with the upload scheduling system through the following internal state:

```java { .api }
/**
 * Illustrative grouping of the internal fields held by FsStateChangelogStorage
 * (not a separate class in the codebase).
 */
class InternalComponents {
    private final StateChangeUploadScheduler uploader;
    private final long preEmptivePersistThresholdInBytes;
    private final TaskChangelogRegistry changelogRegistry;
    private final AtomicInteger logIdGenerator;
    private final LocalChangelogRegistry localChangelogRegistry;
}
```

The storage automatically:
- Creates upload schedulers based on configuration
- Assigns a unique log ID to each writer
- Coordinates with the changelog registry for state lifecycle management
- Handles local recovery integration when enabled
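The first bullet refers to the upload scheduler built from configuration. One common scheduling strategy, sketched here as an assumption rather than a description of Flink's exact scheduler, is batching: changes from many writers are buffered and sent as a single upload once either a size threshold or a maximum delay is reached. `BatchingScheduler`, its thresholds, and the `Consumer`-based upload callback are hypothetical; the current time is passed in explicitly so the logic stays deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical batching scheduler: flushes on a size threshold or an elapsed delay. */
final class BatchingScheduler {
    private final long sizeThresholdBytes;
    private final long maxDelayMillis;
    private final Consumer<List<byte[]>> uploader;
    private final List<byte[]> pending = new ArrayList<>();
    private long pendingBytes = 0;
    private long firstPendingAtMillis = -1;

    BatchingScheduler(long sizeThresholdBytes, long maxDelayMillis, Consumer<List<byte[]>> uploader) {
        this.sizeThresholdBytes = sizeThresholdBytes;
        this.maxDelayMillis = maxDelayMillis;
        this.uploader = uploader;
    }

    /** Buffers a change and uploads the batch immediately if it grew large enough. */
    synchronized void schedule(byte[] change, long nowMillis) {
        if (pending.isEmpty()) {
            firstPendingAtMillis = nowMillis;
        }
        pending.add(change);
        pendingBytes += change.length;
        if (pendingBytes >= sizeThresholdBytes) {
            flush();
        }
    }

    /** Called periodically (e.g. by a timer); uploads the batch once it is old enough. */
    synchronized void tick(long nowMillis) {
        if (!pending.isEmpty() && nowMillis - firstPendingAtMillis >= maxDelayMillis) {
            flush();
        }
    }

    private void flush() {
        uploader.accept(new ArrayList<>(pending)); // hand off a copy of the batch
        pending.clear();
        pendingBytes = 0;
        firstPendingAtMillis = -1;
    }
}
```

Combining a size trigger with a delay trigger bounds both the number of small files written and the latency a single change can incur while waiting for a batch to fill.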

### Local Recovery Support

When local recovery is enabled, the storage also manages a local changelog registry:

```java
// Local recovery configuration, e.g. obtained from the task's TaskStateManager
// (assumed here to have local recovery enabled)
LocalRecoveryConfig localRecoveryConfig = taskStateManager.createLocalRecoveryConfig();

FsStateChangelogStorage storage = new FsStateChangelogStorage(
    jobId, config, metricGroup, localRecoveryConfig
);

// With local recovery enabled, the storage creates a LocalChangelogRegistryImpl
// and persists changelog data both remotely and locally
```
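The registry choice above is made once, at construction time. A common way to express such a choice, sketched here under assumptions, is the null-object pattern: when local recovery is disabled the storage holds a no-op registry, so call sites never need an "is local recovery enabled?" check. `LocalRegistrySketch`, `TrackingLocalRegistry`, and the string handle IDs are hypothetical and far simpler than Flink's `LocalChangelogRegistry`.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical local registry: tracks handles persisted to local disk. */
interface LocalRegistrySketch {
    void register(String handleId);

    int trackedCount();

    /** Null object used when local recovery is disabled: ignores every call. */
    LocalRegistrySketch NO_OP = new LocalRegistrySketch() {
        @Override
        public void register(String handleId) {}

        @Override
        public int trackedCount() {
            return 0;
        }
    };

    /** Selects the tracking implementation only when local recovery is enabled. */
    static LocalRegistrySketch create(boolean localRecoveryEnabled) {
        return localRecoveryEnabled ? new TrackingLocalRegistry() : NO_OP;
    }
}

/** Tracking implementation that remembers each distinct handle once. */
final class TrackingLocalRegistry implements LocalRegistrySketch {
    private final Set<String> handles = new HashSet<>();

    @Override
    public synchronized void register(String handleId) {
        handles.add(handleId);
    }

    @Override
    public synchronized int trackedCount() {
        return handles.size();
    }
}
```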

### Backpressure and Flow Control

The storage signals backpressure through its availability provider:

```java
import org.apache.flink.runtime.io.AvailabilityProvider;

// writer, keyGroup, data and mailboxExecutor come from the surrounding task code
AvailabilityProvider availability = storage.getAvailabilityProvider();

// The future completes once the storage can accept more data. Writers are meant
// to be used from the task thread, so hop back onto it via the mailbox executor
// instead of appending on whichever thread completes the future.
availability.getAvailableFuture().thenRun(() ->
    mailboxExecutor.execute(
        () -> writer.append(keyGroup, data), // append may throw IOException
        "append changelog data after backpressure"));
```
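The availability future can be modelled with plain `CompletableFuture`s. In the sketch below, which assumes a simple in-flight byte budget rather than describing Flink's exact policy, the tracker becomes unavailable once too many bytes are awaiting upload and completes the pending future when finished uploads bring it back under the limit. `InFlightAvailability` and its methods are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical availability tracker driven by an in-flight byte budget. */
final class InFlightAvailability {
    private static final CompletableFuture<Void> AVAILABLE = CompletableFuture.completedFuture(null);

    private final long maxInFlightBytes;
    private long inFlightBytes = 0;
    private CompletableFuture<Void> availableFuture = AVAILABLE;

    InFlightAvailability(long maxInFlightBytes) {
        this.maxInFlightBytes = maxInFlightBytes;
    }

    /** Future that is complete while more data may be written. */
    synchronized CompletableFuture<Void> getAvailableFuture() {
        return availableFuture;
    }

    synchronized boolean isAvailable() {
        return availableFuture.isDone();
    }

    /** Records bytes handed to the uploader; may switch to "unavailable". */
    synchronized void uploadStarted(long bytes) {
        inFlightBytes += bytes;
        if (inFlightBytes >= maxInFlightBytes && availableFuture.isDone()) {
            availableFuture = new CompletableFuture<>();
        }
    }

    /** Records finished uploads; completes the pending future once under budget. */
    void uploadFinished(long bytes) {
        CompletableFuture<Void> toComplete = null;
        synchronized (this) {
            inFlightBytes -= bytes;
            if (inFlightBytes < maxInFlightBytes && !availableFuture.isDone()) {
                toComplete = availableFuture;
                availableFuture = AVAILABLE;
            }
        }
        // Complete outside the lock so dependent callbacks cannot deadlock on it
        if (toComplete != null) {
            toComplete.complete(null);
        }
    }
}
```

Completing the future outside the monitor is deliberate: callbacks chained with `thenRun` execute on the completing thread, and running them while holding the lock could deadlock if they call back into the tracker.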

### Error Handling and Lifecycle

Callers are responsible for closing the storage, including on error paths:

```java
FsStateChangelogStorage storage = null;
try {
    storage = new FsStateChangelogStorage(
        jobId, config, metricGroup, localRecoveryConfig
    );

    // Use storage...

} catch (IOException e) {
    // Handle initialization or operation errors
    log.error("Changelog storage operation failed", e);
} finally {
    // Always close to release upload threads and other resources
    if (storage != null) {
        try {
            storage.close(); // declared to throw Exception
        } catch (Exception e) {
            log.warn("Failed to close changelog storage", e);
        }
    }
}
```

The storage ensures:
- Proper cleanup of upload threads and resources
- Graceful handling of filesystem failures
- Coordination with Flink's checkpoint lifecycle
- Thread-safe operations across multiple writers
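The first guarantee, cleanup of upload threads, typically relies on `close()` being idempotent and on shutting down the upload executor in an orderly way. Below is a minimal sketch under that assumption; `ClosableUploader` is a hypothetical class, not Flink's implementation, and a single-threaded `ExecutorService` stands in for the upload thread pool.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical uploader owning a background thread; close() is idempotent. */
final class ClosableUploader implements AutoCloseable {
    private final ExecutorService uploadExecutor = Executors.newSingleThreadExecutor();
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /** Submits an upload task; rejects new work once the uploader is closed. */
    void upload(Runnable task) {
        if (closed.get()) {
            throw new IllegalStateException("uploader is closed");
        }
        uploadExecutor.execute(task);
    }

    boolean isClosed() {
        return closed.get();
    }

    @Override
    public void close() {
        // Only the first caller performs the shutdown; later calls are no-ops
        if (!closed.compareAndSet(false, true)) {
            return;
        }
        uploadExecutor.shutdown(); // let queued uploads finish, accept no new ones
        try {
            if (!uploadExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                uploadExecutor.shutdownNow(); // interrupt uploads that appear stuck
            }
        } catch (InterruptedException e) {
            uploadExecutor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
    }
}
```

Because `close()` swallows nothing but interruption and is safe to call twice, it can be invoked from both a `finally` block and a shutdown hook without extra coordination.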