# Apache Flink DSTL

Apache Flink DSTL (Durable Short-term Log) provides a filesystem-based implementation of the state changelog used by Flink's state management. It durably stores state changes so that they can be replayed to reconstruct state during recovery in distributed streaming jobs.

## Package Information

- **Package Name**: flink-dstl
- **Package Type**: maven
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-dstl-dfs
- **Language**: Java
- **Installation**: Add `org.apache.flink:flink-dstl-dfs:1.20.2` to your Maven dependencies

## Core Imports

```java
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.changelog.fs.FsStateChangelogStorage;
import org.apache.flink.changelog.fs.FsStateChangelogWriter;
import org.apache.flink.changelog.fs.StateChangeUploadScheduler;
import org.apache.flink.changelog.fs.TaskChangelogRegistry;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
```

## Basic Usage

```java
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;

// jobID, metricGroup, localRecoveryConfig, operatorID, keyGroupRange, and
// mailboxExecutor are assumed to be supplied by the surrounding task runtime.

// Configure the storage
Configuration config = new Configuration();
config.set(FsStateChangelogOptions.BASE_PATH, "/path/to/changelog");
config.set(FsStateChangelogOptions.COMPRESSION_ENABLED, true);

// Create storage factory
FsStateChangelogStorageFactory factory = new FsStateChangelogStorageFactory();

// Create storage instance (may throw IOException)
StateChangelogStorage<?> storage = factory.createStorage(
    jobID,
    config,
    metricGroup,
    localRecoveryConfig
);

// Create writer for an operator; the interface type is used because
// FsStateChangelogWriter is not a public class
StateChangelogWriter<?> writer = storage.createWriter(
    operatorID,
    keyGroupRange,
    mailboxExecutor
);
```

## Architecture

The DSTL module is built around several key components:

- **Storage Factory**: `FsStateChangelogStorageFactory` creates storage instances and uses the identifier `"filesystem"`
- **Storage Implementation**: `FsStateChangelogStorage` provides the main storage functionality
- **Configuration**: `FsStateChangelogOptions` defines all configuration parameters
- **Writers**: `FsStateChangelogWriter` writes state changes to the filesystem
- **Upload System**: Pluggable upload schedulers and uploaders handle persistence
- **Registry**: `TaskChangelogRegistry` tracks changelog segments on the TaskManager side

## Capabilities

### Storage Factory and Configuration

Factory for creating filesystem-based changelog storage instances, configured through `FsStateChangelogOptions`.

```java { .api }
@Internal
public class FsStateChangelogStorageFactory implements StateChangelogStorageFactory {
    public static final String IDENTIFIER = "filesystem";

    public String getIdentifier();
    public StateChangelogStorage<?> createStorage(
        JobID jobID,
        Configuration configuration,
        TaskManagerJobMetricGroup metricGroup,
        LocalRecoveryConfig localRecoveryConfig
    ) throws IOException;
    public StateChangelogStorageView<?> createStorageView(Configuration configuration);
    public static void configure(
        Configuration configuration,
        File newFolder,
        Duration uploadTimeout,
        int maxUploadAttempts
    );
}
```

[Storage Factory and Configuration](./storage-factory.md)
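
The factory above exposes an identifier (`"filesystem"`), which suggests that storages are selected by name. The following is a minimal, self-contained sketch of identifier-based factory selection. It is not how Flink discovers factories: `DemoStorage`, `DemoStorageLoader`, and the exact-match lookup are hypothetical and exist only for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical stand-in for a changelog storage; not a Flink type. */
interface DemoStorage {
    String describe();
}

/** Hypothetical lookup table that maps an identifier to a storage supplier. */
final class DemoStorageLoader {
    private final Map<String, Supplier<DemoStorage>> factories = new HashMap<>();

    /** Registers a factory under the given identifier, replacing any earlier one. */
    void register(String identifier, Supplier<DemoStorage> factory) {
        factories.put(identifier, factory);
    }

    /** Creates a storage for the identifier, or fails if none is registered. */
    DemoStorage load(String identifier) {
        Supplier<DemoStorage> factory = factories.get(identifier);
        if (factory == null) {
            throw new IllegalArgumentException("No storage factory for: " + identifier);
        }
        return factory.get();
    }
}
```

Failing fast on an unknown identifier is a design choice of this sketch; a real loader could equally fall back to a default storage.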

### Main Storage Implementation

Thread-safe, filesystem-based implementation of `StateChangelogStorage` that creates writers for individual operators.

```java { .api }
@Experimental
@ThreadSafe
public class FsStateChangelogStorage
    extends FsStateChangelogStorageForRecovery
    implements StateChangelogStorage<ChangelogStateHandleStreamImpl> {

    public FsStateChangelogStorage(
        JobID jobID,
        Configuration config,
        TaskManagerJobMetricGroup metricGroup,
        LocalRecoveryConfig localRecoveryConfig
    ) throws IOException;

    public FsStateChangelogWriter createWriter(
        String operatorID,
        KeyGroupRange keyGroupRange,
        MailboxExecutor mailboxExecutor
    );

    public void close() throws Exception;
    public AvailabilityProvider getAvailabilityProvider();
}
```

[Main Storage Implementation](./storage-implementation.md)

### State Change Writers

Writer that persists state changes to the filesystem, batching them and coordinating their upload.

```java { .api }
@NotThreadSafe
class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandleStreamImpl> {
    public void appendMeta(byte[] value) throws IOException;
    public void append(int keyGroup, byte[] value) throws IOException;
    public SequenceNumber initialSequenceNumber();
    public SequenceNumber nextSequenceNumber();
    public CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> persist(
        SequenceNumber from,
        long checkpointId
    );
    public void close() throws Exception;
    public void truncate(SequenceNumber to);
    public void truncateAndClose(SequenceNumber from);
    public void confirm(SequenceNumber from, SequenceNumber to, long checkpointId);
    public void reset(SequenceNumber from, SequenceNumber to, long checkpointId);
}
```

[State Change Writers](./writers.md)
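
To make the interplay of `append`, `nextSequenceNumber`, `persist`, and `truncate` more concrete, here is a small in-memory model. It is a sketch under assumptions, not Flink's writer: `DemoChangelog` is hypothetical, sequence numbers are plain `long` values, `persist` is synchronous, and the inclusive `from` and exclusive `to` bounds are assumptions of this model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical in-memory model of a changelog writer; not FsStateChangelogWriter. */
final class DemoChangelog {
    private final TreeMap<Long, List<byte[]>> changesBySqn = new TreeMap<>();
    private long activeSqn = 0L;

    /** Appends a change under the currently active sequence number. */
    void append(byte[] value) {
        changesBySqn.computeIfAbsent(activeSqn, k -> new ArrayList<>()).add(value);
    }

    /** Rolls over to a new sequence number if the active one holds changes. */
    long nextSequenceNumber() {
        if (changesBySqn.containsKey(activeSqn)) {
            activeSqn++;
        }
        return activeSqn;
    }

    /** Returns every change with a sequence number >= from, in order. */
    List<byte[]> persist(long from) {
        nextSequenceNumber(); // later appends must not join this snapshot
        List<byte[]> result = new ArrayList<>();
        changesBySqn.tailMap(from, true).values().forEach(result::addAll);
        return result;
    }

    /** Drops every change with a sequence number < to. */
    void truncate(long to) {
        changesBySqn.headMap(to, false).clear();
    }
}
```

A caller would typically remember the value returned by `nextSequenceNumber()` at one checkpoint and pass it as `from` or `to` at a later one.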

### Upload Scheduling and Management

Upload scheduler interface and factory methods for batching and coordinating state change uploads.

```java { .api }
@Internal
public interface StateChangeUploadScheduler extends AutoCloseable {
    void upload(UploadTask uploadTask) throws IOException;

    static StateChangeUploadScheduler directScheduler(StateChangeUploader uploader);
    static StateChangeUploadScheduler fromConfig(
        JobID jobID,
        ReadableConfig config,
        ChangelogStorageMetricGroup metricGroup,
        TaskChangelogRegistry changelogRegistry,
        LocalRecoveryConfig localRecoveryConfig
    ) throws IOException;

    default AvailabilityProvider getAvailabilityProvider();
}
```

[Upload Scheduling and Management](./upload-scheduling.md)
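
Batching, as mentioned above, can be illustrated with a size-based buffer that hands a whole batch to an uploader once a threshold is reached. This is a simplified, single-threaded sketch and not Flink's scheduler: `DemoBatchingScheduler`, its byte-array payloads, and the `uploader` callback are hypothetical, and the threshold only loosely mirrors an option such as `PERSIST_SIZE_THRESHOLD`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical size-based batcher; a simplified model, not Flink's scheduler. */
final class DemoBatchingScheduler {
    private final long sizeThresholdBytes;
    private final Consumer<List<byte[]>> uploader;
    private final List<byte[]> pending = new ArrayList<>();
    private long pendingBytes = 0L;

    DemoBatchingScheduler(long sizeThresholdBytes, Consumer<List<byte[]>> uploader) {
        this.sizeThresholdBytes = sizeThresholdBytes;
        this.uploader = uploader;
    }

    /** Queues a payload and uploads the batch once the threshold is reached. */
    void upload(byte[] payload) {
        pending.add(payload);
        pendingBytes += payload.length;
        if (pendingBytes >= sizeThresholdBytes) {
            flush();
        }
    }

    /** Uploads whatever is queued, for example when a delay timer fires or on close. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        uploader.accept(new ArrayList<>(pending)); // hand over a copy of the batch
        pending.clear();
        pendingBytes = 0L;
    }
}
```

Passing a copy of the batch to the uploader keeps the internal buffer safe to clear even if the uploader retains the list.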

### Registry and Tracking

TaskManager-side registry for tracking changelog segments and managing their lifecycle.

```java { .api }
@Internal
public interface TaskChangelogRegistry {
    TaskChangelogRegistry NO_OP = new TaskChangelogRegistry() { /* no-op implementation */ };

    void startTracking(StreamStateHandle handle, long refCount);
    void stopTracking(StreamStateHandle handle);
    void release(StreamStateHandle handle);

    static TaskChangelogRegistry defaultChangelogRegistry(int numAsyncDiscardThreads);
    static TaskChangelogRegistry defaultChangelogRegistry(Executor executor);
}
```

[Registry and Tracking](./registry.md)
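
The `startTracking`, `stopTracking`, and `release` methods above suggest a reference-counting lifecycle. The sketch below models that idea in a simplified, single-threaded form. It is not Flink's implementation: `DemoChangelogRegistry`, the string handles, and the `discarder` callback are hypothetical, and ignoring `release` for an untracked handle is an assumption of this model.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical reference-counting registry keyed by handle name. */
final class DemoChangelogRegistry {
    private final Map<String, Long> refCounts = new HashMap<>();
    private final Consumer<String> discarder;

    DemoChangelogRegistry(Consumer<String> discarder) {
        this.discarder = discarder;
    }

    /** Starts tracking a handle that is referenced refCount times. */
    void startTracking(String handle, long refCount) {
        if (refCount <= 0) {
            throw new IllegalArgumentException("refCount must be positive");
        }
        refCounts.put(handle, refCount);
    }

    /** Forgets the handle without discarding it, e.g. after ownership moved elsewhere. */
    void stopTracking(String handle) {
        refCounts.remove(handle);
    }

    /** Drops one reference and discards the handle when none remain. */
    void release(String handle) {
        Long count = refCounts.get(handle);
        if (count == null) {
            return; // assumption: untracked handles are ignored
        }
        if (count <= 1) {
            refCounts.remove(handle);
            discarder.accept(handle);
        } else {
            refCounts.put(handle, count - 1);
        }
    }
}
```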

## Types

### Configuration Options

```java { .api }
@Experimental
public class FsStateChangelogOptions {
    public static final ConfigOption<String> BASE_PATH;
    public static final ConfigOption<Boolean> COMPRESSION_ENABLED;
    public static final ConfigOption<MemorySize> PREEMPTIVE_PERSIST_THRESHOLD;
    public static final ConfigOption<Duration> PERSIST_DELAY;
    public static final ConfigOption<MemorySize> PERSIST_SIZE_THRESHOLD;
    public static final ConfigOption<MemorySize> UPLOAD_BUFFER_SIZE;
    public static final ConfigOption<Integer> NUM_UPLOAD_THREADS;
    public static final ConfigOption<Integer> NUM_DISCARD_THREADS;
    public static final ConfigOption<MemorySize> IN_FLIGHT_DATA_LIMIT;
    public static final ConfigOption<String> RETRY_POLICY;
    public static final ConfigOption<Duration> UPLOAD_TIMEOUT;
    public static final ConfigOption<Integer> RETRY_MAX_ATTEMPTS;
    public static final ConfigOption<Duration> RETRY_DELAY_AFTER_FAILURE;
    public static final ConfigOption<Duration> CACHE_IDLE_TIMEOUT;
}
```

### Core Data Structures

```java { .api }
@ThreadSafe
@Internal
class StateChangeSet {
    public StateChangeSet(UUID logId, SequenceNumber sequenceNumber, List<StateChange> changes);

    public UUID getLogId();
    public SequenceNumber getSequenceNumber();
    public List<StateChange> getChanges();
    public long getSize();
}

@Internal
final class UploadResult {
    public final StreamStateHandle streamStateHandle;
    public final @Nullable StreamStateHandle localStreamHandle;
    public final long offset;
    public final long localOffset;
    public final SequenceNumber sequenceNumber;
    public final long size;

    public UploadResult(
        StreamStateHandle streamStateHandle,
        long offset,
        SequenceNumber sequenceNumber,
        long size
    );

    public UploadResult(
        StreamStateHandle streamStateHandle,
        @Nullable StreamStateHandle localStreamHandle,
        long offset,
        long localOffset,
        SequenceNumber sequenceNumber,
        long size
    );

    public static UploadResult of(
        StreamStateHandle streamStateHandle,
        StreamStateHandle localStreamHandle,
        StateChangeSet changeSet,
        long offset,
        long localOffset
    );

    public StreamStateHandle getStreamStateHandle();
    public StreamStateHandle getLocalStreamHandleStateHandle();
    public long getOffset();
    public long getLocalOffset();
    public SequenceNumber getSequenceNumber();
    public long getSize();
}
```

### Upload Task Definition

```java { .api }
@ThreadSafe
final class UploadTask {
    final Collection<StateChangeSet> changeSets;
    final Consumer<List<UploadResult>> successCallback;
    final BiConsumer<List<SequenceNumber>, Throwable> failureCallback;

    public UploadTask(
        Collection<StateChangeSet> changeSets,
        Consumer<List<UploadResult>> successCallback,
        BiConsumer<List<SequenceNumber>, Throwable> failureCallback
    );

    public void complete(List<UploadResult> results);
    public void fail(Throwable error);
    public long getSize();
    public Collection<StateChangeSet> getChangeSets();
}
```
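
The callback pair in `UploadTask` can be illustrated with a reduced model in which the first of `complete` or `fail` wins and later calls are ignored. This is a sketch, not Flink's class: `DemoUploadTask` is hypothetical, results are plain strings, sequence numbers are `Long` values, and the "finish at most once" rule is an assumption of this model.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Hypothetical upload task that mirrors only the callback shape shown above. */
final class DemoUploadTask {
    private final List<Long> sequenceNumbers;
    private final Consumer<List<String>> successCallback;
    private final BiConsumer<List<Long>, Throwable> failureCallback;
    private final AtomicBoolean finished = new AtomicBoolean(false);

    DemoUploadTask(
            List<Long> sequenceNumbers,
            Consumer<List<String>> successCallback,
            BiConsumer<List<Long>, Throwable> failureCallback) {
        this.sequenceNumbers = sequenceNumbers;
        this.successCallback = successCallback;
        this.failureCallback = failureCallback;
    }

    /** Reports uploaded results unless the task has already finished. */
    void complete(List<String> results) {
        if (finished.compareAndSet(false, true)) {
            successCallback.accept(results);
        }
    }

    /** Reports a failure with the affected sequence numbers unless already finished. */
    void fail(Throwable error) {
        if (finished.compareAndSet(false, true)) {
            failureCallback.accept(sequenceNumbers, error);
        }
    }
}
```

Using `compareAndSet` keeps the "exactly one callback" guarantee even if an uploader thread and a timeout handler race to finish the task.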

### Retry Policy Interface

```java { .api }
@Internal
public interface RetryPolicy {
    RetryPolicy NONE = new NoRetryPolicy();

    static RetryPolicy fromConfig(ReadableConfig config);
    static RetryPolicy fixed(int maxAttempts, long timeout, long delayAfterFailure);

    long timeoutFor(int attempt);
    long retryAfter(int failedAttempt, Exception exception);
}
```
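
As an illustration of the two methods above, the following sketch shows what a fixed policy could look like. It is not Flink's implementation: `DemoFixedRetryPolicy` is hypothetical, and both the `-1` "do not retry" return value and the rule that only `IOException` is retried are assumptions of this model.

```java
import java.io.IOException;

/** Hypothetical fixed retry policy; the real rules live in Flink's RetryPolicy. */
final class DemoFixedRetryPolicy {
    private final int maxAttempts;
    private final long timeoutMs;
    private final long delayAfterFailureMs;

    DemoFixedRetryPolicy(int maxAttempts, long timeoutMs, long delayAfterFailureMs) {
        this.maxAttempts = maxAttempts;
        this.timeoutMs = timeoutMs;
        this.delayAfterFailureMs = delayAfterFailureMs;
    }

    /** Every attempt gets the same timeout. */
    long timeoutFor(int attempt) {
        return timeoutMs;
    }

    /** Returns the delay before the next attempt, or -1 when no retry should happen. */
    long retryAfter(int failedAttempt, Exception exception) {
        if (failedAttempt >= maxAttempts) {
            return -1L; // attempts exhausted
        }
        // Assumption: only I/O failures are treated as transient.
        return exception instanceof IOException ? delayAfterFailureMs : -1L;
    }
}
```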