Apache Flink DSTL (Durable Short-term Log) - A filesystem-based state changelog implementation for Flink's state management
npx @tessl/cli install tessl/maven-org-apache-flink--flink-dstl@1.20.00
# Apache Flink DSTL

Apache Flink DSTL (Durable Short-term Log) provides a filesystem-based state changelog implementation for Flink's state management system. It stores state changes durably so they can be used for recovery and state reconstruction in distributed streaming environments.

## Package Information

- **Package Name**: flink-dstl
- **Package Type**: maven
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-dstl-dfs
- **Language**: Java
- **Installation**: Include in Maven dependencies with `org.apache.flink:flink-dstl-dfs:1.20.2`

## Core Imports

```java
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.changelog.fs.FsStateChangelogStorage;
import org.apache.flink.changelog.fs.StateChangeUploadScheduler;
import org.apache.flink.changelog.fs.TaskChangelogRegistry;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
// FsStateChangelogWriter is package-private; use the StateChangelogWriter interface instead.
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
```

## Basic Usage
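
```java
import java.io.IOException;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;

// Example wrapper class (hypothetical name).
class ChangelogSetup {
    // The job ID, metric group, recovery config, operator ID, key-group range and
    // mailbox executor are normally supplied by the TaskManager and task runtime.
    static StateChangelogWriter<?> createWriter(
            JobID jobID,
            TaskManagerJobMetricGroup metricGroup,
            LocalRecoveryConfig localRecoveryConfig,
            String operatorID,
            KeyGroupRange keyGroupRange,
            MailboxExecutor mailboxExecutor) throws IOException {
        // Configure the storage
        Configuration config = new Configuration();
        config.set(FsStateChangelogOptions.BASE_PATH, "/path/to/changelog");
        config.set(FsStateChangelogOptions.COMPRESSION_ENABLED, true);

        // Create the storage factory (identifier "filesystem")
        FsStateChangelogStorageFactory factory = new FsStateChangelogStorageFactory();

        // Create the storage instance. It must outlive its writers and be closed
        // (close()) on shutdown, so real code should keep a reference to it.
        StateChangelogStorage<?> storage =
                factory.createStorage(jobID, config, metricGroup, localRecoveryConfig);

        // Create a writer for one operator. FsStateChangelogWriter is package-private,
        // so the writer is used through the StateChangelogWriter interface.
        return storage.createWriter(operatorID, keyGroupRange, mailboxExecutor);
    }
}
```
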
## Architecture

The DSTL module is built around several key components:

- **Storage Factory**: `FsStateChangelogStorageFactory` creates storage instances and is registered under the identifier `"filesystem"`
- **Storage Implementation**: `FsStateChangelogStorage` provides the main storage functionality and creates writers
- **Configuration**: `FsStateChangelogOptions` defines all configuration parameters
- **Writers**: `FsStateChangelogWriter` writes state changes to the filesystem
- **Upload System**: pluggable upload schedulers and uploaders that persist state changes
- **Registry**: `TaskChangelogRegistry` tracks changelog segments on the TaskManager side

## Capabilities

### Storage Factory and Configuration

Factory for creating filesystem-based changelog storage instances with comprehensive configuration options.

```java { .api }
@Internal
public class FsStateChangelogStorageFactory implements StateChangelogStorageFactory {
    public static final String IDENTIFIER = "filesystem";

    public String getIdentifier();
    public StateChangelogStorage<?> createStorage(
        JobID jobID,
        Configuration configuration,
        TaskManagerJobMetricGroup metricGroup,
        LocalRecoveryConfig localRecoveryConfig
    ) throws IOException;
    public StateChangelogStorageView<?> createStorageView(Configuration configuration);
    public static void configure(
        Configuration configuration,
        File newFolder,
        Duration uploadTimeout,
        int maxUploadAttempts
    );
}
```

[Storage Factory and Configuration](./storage-factory.md)
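
A storage implementation is selected by its factory's identifier (`"filesystem"` for this module). The sketch below illustrates identifier-based selection with a plain `HashMap`; `DemoStorage`, `DemoStorageFactory`, `DemoFsFactory` and `DemoFactoryRegistry` are hypothetical stand-ins rather than Flink types, and Flink's own factory discovery is not modeled.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a changelog storage (not a Flink type).
interface DemoStorage {
    String describe();
}

// Hypothetical factory contract, loosely modeled on getIdentifier()/createStorage().
interface DemoStorageFactory {
    String getIdentifier();

    DemoStorage createStorage(String basePath);
}

// Hypothetical filesystem-flavored factory registered under "filesystem".
final class DemoFsFactory implements DemoStorageFactory {
    static final String IDENTIFIER = "filesystem";

    @Override
    public String getIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public DemoStorage createStorage(String basePath) {
        return () -> "filesystem changelog at " + basePath;
    }
}

// Hypothetical registry: picks a factory by identifier and asks it for a storage.
final class DemoFactoryRegistry {
    private final Map<String, DemoStorageFactory> factories = new HashMap<>();

    void register(DemoStorageFactory factory) {
        factories.put(factory.getIdentifier(), factory);
    }

    DemoStorage create(String identifier, String basePath) {
        DemoStorageFactory factory = factories.get(identifier);
        if (factory == null) {
            throw new IllegalArgumentException("No changelog storage factory for: " + identifier);
        }
        return factory.createStorage(basePath);
    }
}
```

Keying selection on a string identifier lets a configuration value choose the implementation without the caller depending on a concrete factory class.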

### Main Storage Implementation

Filesystem-based implementation of `StateChangelogStorage` with thread-safe operations and writer creation.

```java { .api }
@Experimental
@ThreadSafe
public class FsStateChangelogStorage
        extends FsStateChangelogStorageForRecovery
        implements StateChangelogStorage<ChangelogStateHandleStreamImpl> {

    public FsStateChangelogStorage(
        JobID jobID,
        Configuration config,
        TaskManagerJobMetricGroup metricGroup,
        LocalRecoveryConfig localRecoveryConfig
    ) throws IOException;

    public FsStateChangelogWriter createWriter(
        String operatorID,
        KeyGroupRange keyGroupRange,
        MailboxExecutor mailboxExecutor
    );

    public void close() throws Exception;
    public AvailabilityProvider getAvailabilityProvider();
}
```

[Main Storage Implementation](./storage-implementation.md)
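
`getAvailabilityProvider()` lets callers check whether the storage can accept more work. The sketch below shows one plausible backpressure policy, which is an assumption and not taken from the Flink source: availability is exposed as a `CompletableFuture` that stays incomplete while the bytes of in-flight uploads exceed a limit (compare the `IN_FLIGHT_DATA_LIMIT` option). `InFlightLimiter` is a hypothetical class.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical in-flight byte limiter (an assumed policy, not Flink's implementation).
final class InFlightLimiter {
    private final long maxInFlightBytes;
    private long inFlightBytes;
    // Completed while there is capacity; replaced by an incomplete future when over the limit.
    private CompletableFuture<Void> available = CompletableFuture.completedFuture(null);

    InFlightLimiter(long maxInFlightBytes) {
        this.maxInFlightBytes = maxInFlightBytes;
    }

    /** Records bytes handed to an upload; becomes unavailable once the limit is exceeded. */
    synchronized void acquire(long bytes) {
        inFlightBytes += bytes;
        if (inFlightBytes > maxInFlightBytes && available.isDone()) {
            available = new CompletableFuture<>();
        }
    }

    /** Records a finished upload; completes the pending future once back under the limit. */
    synchronized void release(long bytes) {
        inFlightBytes -= bytes;
        if (inFlightBytes <= maxInFlightBytes && !available.isDone()) {
            // Dependent callbacks run here, while this lock is held.
            available.complete(null);
        }
    }

    synchronized CompletableFuture<Void> getAvailableFuture() {
        return available;
    }

    synchronized long inFlightBytes() {
        return inFlightBytes;
    }
}
```

Waiters can chain work onto the returned future (for example with `thenRun`) instead of polling the byte count.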

### State Change Writers

Writer implementation that persists state changes to the filesystem, with batching and upload coordination. The class is package-private, so code outside `org.apache.flink.changelog.fs` refers to it through the `StateChangelogWriter` interface.

```java { .api }
@NotThreadSafe
class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandleStreamImpl> {
    public void appendMeta(byte[] value) throws IOException;
    public void append(int keyGroup, byte[] value) throws IOException;
    public SequenceNumber initialSequenceNumber();
    public SequenceNumber nextSequenceNumber();
    public CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> persist(
        SequenceNumber from,
        long checkpointId
    );
    public void close() throws Exception;
    public void truncate(SequenceNumber to);
    public void truncateAndClose(SequenceNumber from);
    public void confirm(SequenceNumber from, SequenceNumber to, long checkpointId);
    public void reset(SequenceNumber from, SequenceNumber to, long checkpointId);
}
```

[State Change Writers](./writers.md)
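
The writer API is organized around sequence numbers: `append` adds a change, `nextSequenceNumber()` marks a boundary, `persist(from, ...)` is expected to cover changes from `from` onward, and `truncate(to)` drops older changes. The in-memory toy below illustrates that bookkeeping under simplifying assumptions: sequence numbers are plain `long`s, changes are strings, nothing is uploaded, and `ToyChangelogWriter` is a hypothetical class rather than the real writer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical toy writer: longs as sequence numbers, nothing leaves memory.
final class ToyChangelogWriter {
    // Changes grouped by the sequence number that was active when they were appended.
    private final NavigableMap<Long, List<String>> changes = new TreeMap<>();
    private long activeSequenceNumber = 0L;

    void append(int keyGroup, String value) {
        changes.computeIfAbsent(activeSequenceNumber, k -> new ArrayList<>())
                .add(keyGroup + ":" + value);
    }

    /** Starts a new group: earlier appends get numbers below the result, later ones do not. */
    long nextSequenceNumber() {
        activeSequenceNumber++;
        return activeSequenceNumber;
    }

    /** Returns every change with sequence number >= from (what a checkpoint would upload). */
    List<String> persist(long from) {
        List<String> result = new ArrayList<>();
        for (List<String> group : changes.tailMap(from, true).values()) {
            result.addAll(group);
        }
        return result;
    }

    /** Drops changes with sequence number < to (assumed covered by a materialized snapshot). */
    void truncate(long to) {
        changes.headMap(to, false).clear();
    }
}
```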

### Upload Scheduling and Management

Upload scheduler interfaces and implementations for batching and coordinating state change uploads.

```java { .api }
@Internal
public interface StateChangeUploadScheduler extends AutoCloseable {
    void upload(UploadTask uploadTask) throws IOException;

    static StateChangeUploadScheduler directScheduler(StateChangeUploader uploader);
    static StateChangeUploadScheduler fromConfig(
        JobID jobID,
        ReadableConfig config,
        ChangelogStorageMetricGroup metricGroup,
        TaskChangelogRegistry changelogRegistry,
        LocalRecoveryConfig localRecoveryConfig
    ) throws IOException;

    default AvailabilityProvider getAvailabilityProvider();
}
```

[Upload Scheduling and Management](./upload-scheduling.md)
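
The scheduler separates when to upload from how: as its name suggests, `directScheduler` hands each task straight to an uploader, while batching groups several tasks into one upload. The sketch below shows size-triggered batching only; `ToyBatchingScheduler`, its threshold, and representing tasks by their byte size are hypothetical, and a real scheduler would presumably also flush after a delay (compare `PERSIST_DELAY` and `PERSIST_SIZE_THRESHOLD` in `FsStateChangelogOptions`).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical size-triggered batcher; tasks are represented only by their byte size.
final class ToyBatchingScheduler {
    private final long sizeThreshold;
    private final Consumer<List<Long>> uploader; // receives one batch per call
    private final List<Long> pending = new ArrayList<>();
    private long pendingBytes;

    ToyBatchingScheduler(long sizeThreshold, Consumer<List<Long>> uploader) {
        this.sizeThreshold = sizeThreshold;
        this.uploader = uploader;
    }

    /** Queues a task; uploads the whole batch once it reaches the threshold. */
    synchronized void upload(long taskBytes) {
        pending.add(taskBytes);
        pendingBytes += taskBytes;
        if (pendingBytes >= sizeThreshold) {
            flush();
        }
    }

    /** Sends whatever is queued (a timer-driven flush is not modeled here). */
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        uploader.accept(new ArrayList<>(pending));
        pending.clear();
        pendingBytes = 0;
    }
}
```

Fewer, larger uploads reduce per-file overhead on the remote filesystem at the cost of a little extra latency for the first task in each batch.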

### Registry and Tracking

TaskManager-side registry for tracking changelog segments and managing their lifecycle.

```java { .api }
@Internal
public interface TaskChangelogRegistry {
    TaskChangelogRegistry NO_OP = new TaskChangelogRegistry() { /* no-op implementation */ };

    void startTracking(StreamStateHandle handle, long refCount);
    void stopTracking(StreamStateHandle handle);
    void release(StreamStateHandle handle);

    static TaskChangelogRegistry defaultChangelogRegistry(int numAsyncDiscardThreads);
    static TaskChangelogRegistry defaultChangelogRegistry(Executor executor);
}
```

[Registry and Tracking](./registry.md)
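
`startTracking(handle, refCount)`, `release(handle)` and `stopTracking(handle)` suggest reference-counted ownership of uploaded segments. The sketch below is one reading of that contract, stated as an assumption: each `release` drops one reference and the segment is discarded at zero, while `stopTracking` forgets the handle without discarding it. `ToyChangelogRegistry` is hypothetical, handles are plain strings, and discards run on a supplied `Executor`, loosely mirroring `defaultChangelogRegistry(Executor)`.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;

// Hypothetical ref-counting registry; "discarding" just records the handle name.
final class ToyChangelogRegistry {
    private final Map<String, Long> refCounts = new ConcurrentHashMap<>();
    private final List<String> discarded = new CopyOnWriteArrayList<>();
    private final Executor discardExecutor;

    ToyChangelogRegistry(Executor discardExecutor) {
        this.discardExecutor = discardExecutor;
    }

    void startTracking(String handle, long refCount) {
        refCounts.put(handle, refCount);
    }

    /** Forgets the handle without deleting it (ownership assumed to have moved elsewhere). */
    void stopTracking(String handle) {
        refCounts.remove(handle);
    }

    /** Drops one reference; the last release schedules the discard. */
    void release(String handle) {
        boolean[] reachedZero = {false};
        refCounts.computeIfPresent(handle, (h, count) -> {
            if (count > 1) {
                return count - 1;
            }
            reachedZero[0] = true;
            return null; // returning null removes the mapping
        });
        if (reachedZero[0]) {
            discardExecutor.execute(() -> discarded.add(handle));
        }
    }

    List<String> discarded() {
        return discarded;
    }
}
```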

## Types

### Configuration Options

```java { .api }
@Experimental
public class FsStateChangelogOptions {
    public static final ConfigOption<String> BASE_PATH;
    public static final ConfigOption<Boolean> COMPRESSION_ENABLED;
    public static final ConfigOption<MemorySize> PREEMPTIVE_PERSIST_THRESHOLD;
    public static final ConfigOption<Duration> PERSIST_DELAY;
    public static final ConfigOption<MemorySize> PERSIST_SIZE_THRESHOLD;
    public static final ConfigOption<MemorySize> UPLOAD_BUFFER_SIZE;
    public static final ConfigOption<Integer> NUM_UPLOAD_THREADS;
    public static final ConfigOption<Integer> NUM_DISCARD_THREADS;
    public static final ConfigOption<MemorySize> IN_FLIGHT_DATA_LIMIT;
    public static final ConfigOption<String> RETRY_POLICY;
    public static final ConfigOption<Duration> UPLOAD_TIMEOUT;
    public static final ConfigOption<Integer> RETRY_MAX_ATTEMPTS;
    public static final ConfigOption<Duration> RETRY_DELAY_AFTER_FAILURE;
    public static final ConfigOption<Duration> CACHE_IDLE_TIMEOUT;
}
```

### Core Data Structures

```java { .api }
@ThreadSafe
@Internal
class StateChangeSet {
    public StateChangeSet(UUID logId, SequenceNumber sequenceNumber, List<StateChange> changes);

    public UUID getLogId();
    public SequenceNumber getSequenceNumber();
    public List<StateChange> getChanges();
    public long getSize();
}

@Internal
final class UploadResult {
    public final StreamStateHandle streamStateHandle;
    public final @Nullable StreamStateHandle localStreamHandle;
    public final long offset;
    public final long localOffset;
    public final SequenceNumber sequenceNumber;
    public final long size;

    public UploadResult(
        StreamStateHandle streamStateHandle,
        long offset,
        SequenceNumber sequenceNumber,
        long size
    );

    public UploadResult(
        StreamStateHandle streamStateHandle,
        @Nullable StreamStateHandle localStreamHandle,
        long offset,
        long localOffset,
        SequenceNumber sequenceNumber,
        long size
    );

    public static UploadResult of(
        StreamStateHandle streamStateHandle,
        StreamStateHandle localStreamHandle,
        StateChangeSet changeSet,
        long offset,
        long localOffset
    );

    public StreamStateHandle getStreamStateHandle();
    public StreamStateHandle getLocalStreamHandleStateHandle();
    public long getOffset();
    public long getLocalOffset();
    public SequenceNumber getSequenceNumber();
    public long getSize();
}
```
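
`UploadResult` records an `offset` (and a `localOffset`) per change set, which suggests that several change sets can share one uploaded file, each starting at its own position. The sketch below computes such offsets by writing serialized change sets back to back into an in-memory buffer; `ToyBatchLayout`, `Placed`, and the raw byte-array payloads are hypothetical, and any headers or compression a real upload adds are ignored.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical layout of several serialized change sets inside one uploaded file.
final class ToyBatchLayout {
    // Where one change set landed: its start offset and length within the file.
    static final class Placed {
        final String changeSetId;
        final long offset;
        final long size;

        Placed(String changeSetId, long offset, long size) {
            this.changeSetId = changeSetId;
            this.offset = offset;
            this.size = size;
        }
    }

    private final ByteArrayOutputStream file = new ByteArrayOutputStream();
    private final List<Placed> placements = new ArrayList<>();

    /** Appends one serialized change set and remembers where it starts. */
    void write(String changeSetId, byte[] serialized) {
        long offset = file.size(); // current end of the file = start of this change set
        file.write(serialized, 0, serialized.length);
        placements.add(new Placed(changeSetId, offset, serialized.length));
    }

    List<Placed> placements() {
        return placements;
    }

    long totalBytes() {
        return file.size();
    }
}
```

A reader can then seek to a change set's offset in the shared file instead of opening one file per change set.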

### Upload Task Definition

```java { .api }
@ThreadSafe
final class UploadTask {
    final Collection<StateChangeSet> changeSets;
    final Consumer<List<UploadResult>> successCallback;
    final BiConsumer<List<SequenceNumber>, Throwable> failureCallback;

    public UploadTask(
        Collection<StateChangeSet> changeSets,
        Consumer<List<UploadResult>> successCallback,
        BiConsumer<List<SequenceNumber>, Throwable> failureCallback
    );

    public void complete(List<UploadResult> results);
    public void fail(Throwable error);
    public long getSize();
    public Collection<StateChangeSet> getChangeSets();
}
```
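
An `UploadTask` carries its change sets plus two callbacks: `complete` hands `UploadResult`s to the success callback, and `fail` passes the affected sequence numbers and the error to the failure callback. The sketch below additionally assumes a task should finish exactly once, so a second `complete` or `fail` is ignored; that guard, the `ToyUploadTask` name, `Long` sequence numbers and string results are all assumptions for illustration.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical upload task; Long sequence numbers and string results stand in for real types.
final class ToyUploadTask {
    private final List<Long> sequenceNumbers;
    private final Consumer<List<String>> successCallback;
    private final BiConsumer<List<Long>, Throwable> failureCallback;
    // Assumed exactly-once guard: only the first outcome is delivered.
    private final AtomicBoolean finished = new AtomicBoolean(false);

    ToyUploadTask(
            List<Long> sequenceNumbers,
            Consumer<List<String>> successCallback,
            BiConsumer<List<Long>, Throwable> failureCallback) {
        this.sequenceNumbers = sequenceNumbers;
        this.successCallback = successCallback;
        this.failureCallback = failureCallback;
    }

    /** Delivers results unless the task already completed or failed. */
    void complete(List<String> results) {
        if (finished.compareAndSet(false, true)) {
            successCallback.accept(results);
        }
    }

    /** Reports which sequence numbers failed to upload; ignored after the first outcome. */
    void fail(Throwable error) {
        if (finished.compareAndSet(false, true)) {
            failureCallback.accept(sequenceNumbers, error);
        }
    }
}
```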
302
303
### Retry Policy Interface
304
305
```java { .api }
306
@Internal
307
public interface RetryPolicy {
308
RetryPolicy NONE = new NoRetryPolicy();
309
310
static RetryPolicy fromConfig(ReadableConfig config);
311
static RetryPolicy fixed(int maxAttempts, long timeout, long delayAfterFailure);
312
313
long timeoutFor(int attempt);
314
long retryAfter(int failedAttempt, Exception exception);
315
}
316
```
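
`RetryPolicy.fixed(maxAttempts, timeout, delayAfterFailure)` suggests a constant per-attempt timeout and a constant pause between attempts. The sketch below implements that reading of `timeoutFor` and `retryAfter` and adds a small retry loop; returning `-1` to mean "give up", numbering attempts from 1, and the `ToyFixedRetryPolicy` class itself are assumptions, and the loop does not enforce the timeout.

```java
import java.util.function.Supplier;

// Hypothetical fixed retry policy; -1 from retryAfter means "give up" (an assumed convention).
final class ToyFixedRetryPolicy {
    private final int maxAttempts;
    private final long timeoutMillis;
    private final long delayAfterFailureMillis;

    ToyFixedRetryPolicy(int maxAttempts, long timeoutMillis, long delayAfterFailureMillis) {
        this.maxAttempts = maxAttempts;
        this.timeoutMillis = timeoutMillis;
        this.delayAfterFailureMillis = delayAfterFailureMillis;
    }

    /** Same timeout for every attempt. */
    long timeoutFor(int attempt) {
        return timeoutMillis;
    }

    /** Delay before the next attempt, or -1 once maxAttempts attempts have failed. */
    long retryAfter(int failedAttempt, Exception exception) {
        return failedAttempt < maxAttempts ? delayAfterFailureMillis : -1L;
    }

    /** Runs the action, retrying failures as the policy allows (attempts counted from 1). */
    <T> T run(Supplier<T> action) {
        for (int attempt = 1; ; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                long delay = retryAfter(attempt, e);
                if (delay < 0) {
                    throw e; // out of attempts: surface the last failure
                }
                sleepQuietly(delay);
            }
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting to retry", ie);
        }
    }
}
```

With `maxAttempts = 3`, an always-failing action is tried three times before its exception is rethrown.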