Distributed file system-based changelog storage implementation for Apache Flink's streaming state backend.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-dstl-dfs@2.1.0
# Flink DSTL DFS

Flink DSTL DFS provides the distributed file system (DFS) implementation of the DSTL (Durable Short-Term Log), the changelog storage used by Apache Flink's changelog state backend. It records state changes and persists them to a distributed file system such as HDFS or S3, so that they can be replayed during recovery. Upload timeouts and a configurable retry policy make persistence reliable. The library also provides built-in metrics, configurable upload scheduling, and preemptive persistence once unpersisted changes grow past a size threshold.

## Package Information
- **Package Name**: flink-dstl-dfs
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-dstl-dfs
- **Installation**: Add to Maven dependencies:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-dstl-dfs</artifactId>
    <version>2.1.0</version>
</dependency>
```
## Core Imports

```java
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.changelog.fs.FsStateChangelogStorage;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
```
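## Basic Usage

```java
import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.changelog.fs.FsStateChangelogStorage;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;

class ChangelogUsage {

    // The metric group, local recovery config and mailbox executor are normally
    // supplied by the Flink runtime (inside a task); here they are parameters.
    static CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> writeAndPersist(
            TaskManagerJobMetricGroup taskManagerJobMetricGroup,
            LocalRecoveryConfig localRecoveryConfig,
            MailboxExecutor mailboxExecutor,
            byte[] stateChangeBytes,
            byte[] metadataBytes,
            long checkpointId)
            throws IOException {

        // Configure filesystem changelog storage
        Configuration config = new Configuration();
        FsStateChangelogStorageFactory.configure(
                config,
                new File("/path/to/changelog/storage"),
                Duration.ofSeconds(10), // upload timeout
                3); // max upload attempts

        // Create storage factory
        FsStateChangelogStorageFactory factory = new FsStateChangelogStorageFactory();

        // Create storage instance (createStorage is declared to return StateChangelogStorage<?>)
        FsStateChangelogStorage storage =
                (FsStateChangelogStorage)
                        factory.createStorage(
                                new JobID(), config, taskManagerJobMetricGroup, localRecoveryConfig);

        // Create a changelog writer for an operator covering key groups 0..127,
        // typed through the StateChangelogWriter interface
        StateChangelogWriter<ChangelogStateHandleStreamImpl> writer =
                storage.createWriter("operator-id", KeyGroupRange.of(0, 127), mailboxExecutor);

        // Remember where the changes for this checkpoint start
        SequenceNumber from = writer.nextSequenceNumber();

        // Append a state change for key group 0, plus metadata
        writer.append(0, stateChangeBytes);
        writer.appendMeta(metadataBytes);

        // Persist all changes since `from` as part of checkpoint `checkpointId`
        return writer.persist(from, checkpointId);
    }
}
```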
## Architecture

The Flink DSTL DFS library is organized around several key components:

- **Storage Factory**: `FsStateChangelogStorageFactory` creates storage instances and is registered for discovery through Java's service loader mechanism
- **Storage Implementation**: `FsStateChangelogStorage` creates changelog writers and owns the upload scheduler they share
- **Writers**: `FsStateChangelogWriter` appends state changes and persists them when checkpoints are taken
- **Upload System**: Configurable upload schedulers batch upload tasks and persist them asynchronously to the distributed file system
- **Recovery System**: `FsStateChangelogStorageForRecovery` provides read-only access to changelog data when recovering from a checkpoint
- **Configuration**: Options for tuning batching, upload concurrency, retry policies, and storage behavior
## Capabilities

### Storage Factory and Configuration

Factory that creates filesystem-based changelog storage instances (identifier `filesystem`) and read-only storage views for recovery. The static `configure` helper writes the base directory, the upload timeout, and the maximum number of upload attempts into a `Configuration`.

```java { .api }
public class FsStateChangelogStorageFactory implements StateChangelogStorageFactory {
    public static final String IDENTIFIER = "filesystem";

    public String getIdentifier();
    public StateChangelogStorage<?> createStorage(
        JobID jobID,
        Configuration configuration,
        TaskManagerJobMetricGroup metricGroup,
        LocalRecoveryConfig localRecoveryConfig
    ) throws IOException;
    public StateChangelogStorageView<?> createStorageView(Configuration configuration);
    public static void configure(
        Configuration configuration,
        File newFolder,
        Duration uploadTimeout,
        int maxUploadAttempts
    );
}
```

[Storage Factory and Configuration](./storage-factory.md)
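`getIdentifier()` is what ties this factory to the configured storage type. The following is a simplified, JDK-only model of identifier-based lookup, assuming factories are collected first (Flink discovers them through Java's service loader mechanism) and then matched exactly on their identifier against the configured storage name (Flink's `state.changelog.storage` option). `ChangelogStorageFactorySketch` and `FactoryRegistrySketch` are hypothetical names, not Flink classes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for StateChangelogStorageFactory; only the identifier matters here. */
interface ChangelogStorageFactorySketch {
    String getIdentifier();
}

/** Hypothetical registry that resolves a configured storage name to a discovered factory. */
final class FactoryRegistrySketch {
    private final Map<String, ChangelogStorageFactorySketch> factoriesById = new HashMap<>();

    FactoryRegistrySketch(List<ChangelogStorageFactorySketch> discovered) {
        // Discovery itself is out of scope: the factories are passed in directly.
        for (ChangelogStorageFactorySketch factory : discovered) {
            factoriesById.put(factory.getIdentifier(), factory);
        }
    }

    /** Returns the factory whose identifier equals the configured storage name, if any. */
    Optional<ChangelogStorageFactorySketch> lookup(String configuredStorage) {
        return Optional.ofNullable(factoriesById.get(configuredStorage));
    }
}
```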
### Main Storage Implementation

The main filesystem-based implementation. It creates a changelog writer for a given operator and key-group range, routes the uploads of all its writers through a shared upload scheduler, and exposes an `AvailabilityProvider` that reports whether that scheduler can accept more data.

```java { .api }
public class FsStateChangelogStorage extends FsStateChangelogStorageForRecovery
        implements StateChangelogStorage<ChangelogStateHandleStreamImpl> {

    public FsStateChangelogWriter createWriter(
        String operatorID,
        KeyGroupRange keyGroupRange,
        MailboxExecutor mailboxExecutor
    );
    public void close() throws Exception;
    public AvailabilityProvider getAvailabilityProvider();
}
```

[Main Storage Implementation](./storage-implementation.md)
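A rough model of what the availability provider expresses: the storage stops being "available" while too much data is waiting to be uploaded, which lets callers apply backpressure. This JDK-only sketch assumes availability simply means that in-flight bytes are below a limit such as `IN_FLIGHT_DATA_LIMIT`; `InFlightThrottleSketch` is hypothetical and not the Flink implementation.

```java
/** Hypothetical model of an in-flight byte limit backing an availability signal. */
final class InFlightThrottleSketch {
    private final long maxBytesInFlight;
    private long inFlightBytes;

    InFlightThrottleSketch(long maxBytesInFlight) {
        this.maxBytesInFlight = maxBytesInFlight;
    }

    /**
     * Admits an upload if capacity is available. Capacity is checked before the bytes are
     * added, so one oversized upload can still proceed while the limit is not yet reached.
     */
    boolean tryAcquire(long bytes) {
        if (!isAvailable()) {
            return false;
        }
        inFlightBytes += bytes;
        return true;
    }

    /** Called when an upload finishes (successfully or not). */
    void release(long bytes) {
        inFlightBytes -= bytes;
    }

    /** The "available" signal: true while in-flight bytes are below the limit. */
    boolean isAvailable() {
        return inFlightBytes < maxBytesInFlight;
    }

    long inFlightBytes() {
        return inFlightBytes;
    }
}
```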
### Changelog Writers

Writers append state changes (tagged with their key group) and metadata, and persist all changes starting from a given sequence number when a checkpoint is taken. If the unpersisted changes grow past the preemptive persist threshold, a writer can start uploading them before the checkpoint. `confirm` and `reset` report whether a range of changes was included in a completed or an aborted checkpoint, and `truncate` drops changes that are no longer needed.

```java { .api }
interface StateChangelogWriter<T> {
    void append(int keyGroup, byte[] value) throws IOException;
    void appendMeta(byte[] value) throws IOException;
    SequenceNumber nextSequenceNumber();
    CompletableFuture<SnapshotResult<T>> persist(SequenceNumber from, long checkpointId) throws IOException;
    void truncate(SequenceNumber to);
    void truncateAndClose(SequenceNumber from);
    void confirm(SequenceNumber from, SequenceNumber to, long checkpointId);
    void reset(SequenceNumber from, SequenceNumber to, long checkpointId);
    void close();
}
```

[Changelog Writers](./changelog-writers.md)
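The sequence-number contract is easiest to see in a toy model. This is an in-memory simplification, not `FsStateChangelogWriter`: it assumes `nextSequenceNumber()` closes the current batch of appended changes and returns the number that later appends will carry, `persist(from)` returns every change whose sequence number is at or after `from`, and `truncate(to)` drops everything before `to`. Uploading, key groups, metadata, and checkpoint IDs are left out, sequence numbers are plain `long`s, and `ChangelogWriterModel` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical in-memory model of a changelog writer's sequence-number bookkeeping. */
final class ChangelogWriterModel {
    // Sequence number carried by changes appended from now on.
    private long activeSequenceNumber = 0;
    private final List<byte[]> activeChanges = new ArrayList<>();
    // Closed batches of changes, keyed by the sequence number they were appended under.
    private final NavigableMap<Long, List<byte[]>> closedBatches = new TreeMap<>();

    void append(byte[] change) {
        activeChanges.add(change);
    }

    /** Closes the current batch (if non-empty) and returns the number later appends carry. */
    long nextSequenceNumber() {
        rollover();
        return activeSequenceNumber;
    }

    /** Returns all changes with a sequence number greater than or equal to {@code from}. */
    List<byte[]> persist(long from) {
        rollover();
        List<byte[]> result = new ArrayList<>();
        for (List<byte[]> batch : closedBatches.tailMap(from, true).values()) {
            result.addAll(batch);
        }
        return result;
    }

    /** Drops all changes with a sequence number strictly smaller than {@code to}. */
    void truncate(long to) {
        closedBatches.headMap(to, false).clear();
    }

    private void rollover() {
        if (activeChanges.isEmpty()) {
            return;
        }
        closedBatches.put(activeSequenceNumber, new ArrayList<>(activeChanges));
        activeChanges.clear();
        activeSequenceNumber++;
    }
}
```

In the real writer, `persist` returns a `CompletableFuture` because the changes are uploaded asynchronously.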
### Configuration Options

Options controlling the storage location, compression, batching (persist delay and size threshold), preemptive persistence, the upload buffer size and number of upload threads, the in-flight data limit, and upload retries and timeouts.

```java { .api }
public class FsStateChangelogOptions {
    public static final ConfigOption<String> BASE_PATH;
    public static final ConfigOption<Boolean> COMPRESSION_ENABLED;
    public static final ConfigOption<MemorySize> PREEMPTIVE_PERSIST_THRESHOLD;
    public static final ConfigOption<Duration> PERSIST_DELAY;
    public static final ConfigOption<MemorySize> PERSIST_SIZE_THRESHOLD;
    public static final ConfigOption<MemorySize> UPLOAD_BUFFER_SIZE;
    public static final ConfigOption<Integer> NUM_UPLOAD_THREADS;
    public static final ConfigOption<MemorySize> IN_FLIGHT_DATA_LIMIT;
    public static final ConfigOption<String> RETRY_POLICY;
    public static final ConfigOption<Duration> UPLOAD_TIMEOUT;
    public static final ConfigOption<Integer> RETRY_MAX_ATTEMPTS;
}
```

[Configuration Options](./configuration-options.md)
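To make `PREEMPTIVE_PERSIST_THRESHOLD` concrete, here is a hypothetical model of the rule it suggests: as soon as a writer's not-yet-persisted changes exceed the threshold, they are flushed without waiting for the next checkpoint. The exact trigger condition in Flink may differ; `PreemptivePersistModel` is an illustrative name, and sizes are plain byte counts rather than `MemorySize` values.

```java
/** Hypothetical model of size-triggered preemptive persistence for one writer. */
final class PreemptivePersistModel {
    private final long thresholdBytes;
    private long unpersistedBytes;
    private int preemptiveFlushes;

    PreemptivePersistModel(long thresholdBytes) {
        this.thresholdBytes = thresholdBytes;
    }

    /** Records an appended change and flushes early once the threshold is exceeded. */
    void append(long changeBytes) {
        unpersistedBytes += changeBytes;
        if (unpersistedBytes > thresholdBytes) {
            preemptiveFlushes++;  // stands in for starting an upload before the checkpoint
            unpersistedBytes = 0; // everything appended so far is now being persisted
        }
    }

    /** At checkpoint time, returns the bytes that still need uploading and resets the count. */
    long persistOnCheckpoint() {
        long remaining = unpersistedBytes;
        unpersistedBytes = 0;
        return remaining;
    }

    int preemptiveFlushes() {
        return preemptiveFlushes;
    }
}
```

A lower threshold spreads upload work across the checkpoint interval, at the cost of more, smaller uploads.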
### Upload Scheduling and Management

Writers hand upload tasks to a `StateChangeUploadScheduler`, which can batch them, throttles them according to the in-flight data limit, and passes them to a `StateChangeUploader` that writes them to the distributed file system. Failed uploads are retried according to the configured retry policy.

```java { .api }
public interface StateChangeUploadScheduler extends AutoCloseable {
    void upload(UploadTask task);
    AvailabilityProvider getAvailabilityProvider();
    void close() throws Exception;

    static StateChangeUploadScheduler fromConfig(
        JobID jobID,
        Configuration config,
        ChangelogStorageMetricGroup metricGroup,
        TaskChangelogRegistry changelogRegistry,
        LocalRecoveryConfig localRecoveryConfig
    ) throws IOException;
}

public interface StateChangeUploader extends AutoCloseable {
    UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException;
}
```

[Upload System](./upload-system.md)
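The batching governed by `PERSIST_DELAY` and `PERSIST_SIZE_THRESHOLD` can be modeled as follows, assuming a batch is flushed as soon as its accumulated size reaches the size threshold (or immediately when the delay is zero), and otherwise once the oldest pending task has waited for the persist delay. Time is passed in explicitly so the model stays deterministic. `BatchingSchedulerModel` is hypothetical and ignores throttling, retries, and the actual file system write.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of delay- and size-based batching of upload tasks. */
final class BatchingSchedulerModel {
    private final long sizeThresholdBytes;
    private final long persistDelayMillis;
    private final List<Integer> pending = new ArrayList<>();
    private long pendingBytes;
    private long oldestPendingAtMillis;
    /** Flushed batches in order; a real scheduler would hand each batch to the uploader. */
    final List<List<Integer>> flushedBatches = new ArrayList<>();

    BatchingSchedulerModel(long sizeThresholdBytes, long persistDelayMillis) {
        this.sizeThresholdBytes = sizeThresholdBytes;
        this.persistDelayMillis = persistDelayMillis;
    }

    /** Schedules an upload task of the given size at time {@code nowMillis}. */
    void upload(int taskBytes, long nowMillis) {
        if (pending.isEmpty()) {
            oldestPendingAtMillis = nowMillis;
        }
        pending.add(taskBytes);
        pendingBytes += taskBytes;
        if (pendingBytes >= sizeThresholdBytes || persistDelayMillis == 0) {
            flush();
        }
    }

    /** Simulates a timer firing: flushes once the oldest pending task has waited long enough. */
    void tick(long nowMillis) {
        if (!pending.isEmpty() && nowMillis - oldestPendingAtMillis >= persistDelayMillis) {
            flush();
        }
    }

    private void flush() {
        flushedBatches.add(new ArrayList<>(pending));
        pending.clear();
        pendingBytes = 0;
    }
}
```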
### Recovery and State Management

`FsStateChangelogStorageForRecovery` provides read-only access to persisted changelog data during recovery, and `TaskChangelogRegistry` tracks the lifecycle of uploaded state handles on the task side, discarding them once they are no longer referenced.

```java { .api }
public class FsStateChangelogStorageForRecovery
        implements StateChangelogStorageView<ChangelogStateHandleStreamImpl> {

    public StateChangelogHandleReader<ChangelogStateHandleStreamImpl> createReader();
    public void close() throws Exception;
}

public interface TaskChangelogRegistry {
    void startTracking(StreamStateHandle handle, long refCount);
    void stopTracking(StreamStateHandle handle);
    void release(StreamStateHandle handle);
}
```

[Recovery and State Management](./recovery-system.md)
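The `TaskChangelogRegistry` methods describe reference counting. Here is a JDK-only model, assuming `startTracking` registers a handle with an initial reference count, `release` decrements it and discards the handle at zero, and `stopTracking` hands responsibility to another component so the handle is never discarded locally. Handles are plain strings here, and `RefCountingRegistryModel` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical reference-counting model of a task-side changelog registry. */
final class RefCountingRegistryModel {
    private final Map<String, Long> refCounts = new HashMap<>();
    /** Handles whose underlying files would be deleted, in discard order. */
    final List<String> discarded = new ArrayList<>();

    void startTracking(String handle, long refCount) {
        refCounts.put(handle, refCount);
    }

    /** Stops tracking without discarding, e.g. once another component owns the handle. */
    void stopTracking(String handle) {
        refCounts.remove(handle);
    }

    /** Drops one reference; discards the handle when no references remain. */
    void release(String handle) {
        Long count = refCounts.get(handle);
        if (count == null) {
            return; // not tracked: never registered or already handed off
        }
        if (count <= 1) {
            refCounts.remove(handle);
            discarded.add(handle);
        } else {
            refCounts.put(handle, count - 1);
        }
    }
}
```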
### Metrics and Monitoring

Metrics for monitoring changelog uploads in production, covering upload counts, failures, batch sizes, latencies, and retry attempts.

```java { .api }
public class ChangelogStorageMetricGroup extends ProxyMetricGroup<MetricGroup> {
    // Provides counters for uploads, failures, batch sizes, latencies, and retry attempts
}
```

[Metrics and Monitoring](./metrics-monitoring.md)
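As an illustration of how such metrics are typically fed, here is a hypothetical recorder that is updated once per upload and derives a failure ratio and a mean latency. It uses plain fields instead of Flink's `Counter` and `Histogram` metric types, and `UploadMetricsSketch` is not part of the library.

```java
/** Hypothetical per-upload metrics recorder; not ChangelogStorageMetricGroup. */
final class UploadMetricsSketch {
    private long uploads;
    private long failures;
    private long totalLatencyNanos;

    /** Records the outcome and latency of one upload. */
    void recordUpload(boolean succeeded, long latencyNanos) {
        uploads++;
        if (!succeeded) {
            failures++;
        }
        totalLatencyNanos += latencyNanos;
    }

    long uploads() {
        return uploads;
    }

    long failures() {
        return failures;
    }

    /** Fraction of uploads that failed; 0 when nothing has been recorded. */
    double failureRatio() {
        return uploads == 0 ? 0.0 : (double) failures / uploads;
    }

    /** Mean latency over all recorded uploads; 0 when nothing has been recorded. */
    double meanLatencyNanos() {
        return uploads == 0 ? 0.0 : (double) totalLatencyNanos / uploads;
    }
}
```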
## Types

```java { .api }
public final class UploadResult {
    public final StreamStateHandle streamStateHandle;
    public final StreamStateHandle localStreamHandle;
    public final long offset;
    public final long localOffset;
    public final SequenceNumber sequenceNumber;
    public final long size;

    public UploadResult(StreamStateHandle streamStateHandle, long offset,
                        SequenceNumber sequenceNumber, long size);
    public StreamStateHandle getStreamStateHandle();
    public long getOffset();
    public SequenceNumber getSequenceNumber();
    public long getSize();
}

public class StateChangeSet {
    public StateChangeSet(UUID logId, SequenceNumber sequenceNumber, List<StateChange> changes);
    public UUID getLogId();
    public List<StateChange> getChanges();
    public SequenceNumber getSequenceNumber();
    public long getSize();
}

public interface RetryPolicy {
    long timeoutFor(int attempt);
    long retryAfter(int failedAttempt, Exception exception);

    static RetryPolicy fromConfig(ReadableConfig config);
    static RetryPolicy fixed(int maxAttempts, long timeout, long delayAfterFailure);
    RetryPolicy NONE = /* ... */;
}
```
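`RetryPolicy.fixed(maxAttempts, timeout, delayAfterFailure)` suggests the simplest policy: every attempt gets the same timeout, and a failed attempt is retried after a fixed delay until the attempt budget is used up. This sketch models that under two assumptions: a negative return value from `retryAfter` means "do not retry", and an attempt that failed with a timeout is retried immediately because the timeout has already been waited out. `RetryPolicySketch` and `FixedRetryPolicySketch` are hypothetical, self-contained types, not Flink's implementation.

```java
import java.util.concurrent.TimeoutException;

/** Hypothetical mirror of the two RetryPolicy methods in the Types listing. */
interface RetryPolicySketch {
    long timeoutFor(int attempt);

    long retryAfter(int failedAttempt, Exception exception);
}

/** Fixed timeout and fixed delay, with a cap on the number of (1-based) attempts. */
final class FixedRetryPolicySketch implements RetryPolicySketch {
    private final int maxAttempts;
    private final long timeoutMillis;
    private final long delayAfterFailureMillis;

    FixedRetryPolicySketch(int maxAttempts, long timeoutMillis, long delayAfterFailureMillis) {
        this.maxAttempts = maxAttempts;
        this.timeoutMillis = timeoutMillis;
        this.delayAfterFailureMillis = delayAfterFailureMillis;
    }

    @Override
    public long timeoutFor(int attempt) {
        return timeoutMillis; // same timeout for every attempt
    }

    @Override
    public long retryAfter(int failedAttempt, Exception exception) {
        if (failedAttempt >= maxAttempts) {
            return -1; // attempt budget exhausted: give up
        }
        if (exception instanceof TimeoutException) {
            return 0; // the timeout already elapsed, so retry right away
        }
        return delayAfterFailureMillis;
    }
}
```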