# Flink DSTL DFS


Flink DSTL DFS provides a distributed file system-based implementation of changelog storage for Apache Flink's streaming state backend. It enables efficient state change tracking and recovery by persisting state modifications to distributed file systems like HDFS or S3. The library supports fault-tolerance guarantees by providing reliable state change persistence with built-in metrics, configurable upload scheduling, and preemptive persistence optimizations.


## Package Information

- **Package Name**: flink-dstl-dfs
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-dstl-dfs
- **Installation**: Add to Maven dependencies:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-dstl-dfs</artifactId>
    <version>2.1.0</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.changelog.fs.FsStateChangelogStorage;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
```

## Basic Usage

```java
import java.io.File;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.changelog.fs.FsStateChangelogStorage;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.changelog.fs.FsStateChangelogWriter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.SequenceNumber;

// Placeholders assumed to be supplied by the surrounding Flink runtime:
// taskManagerJobMetricGroup, localRecoveryConfig, mailboxExecutor,
// stateChangeBytes, metadataBytes, checkpointId

// Configure filesystem changelog storage
Configuration config = new Configuration();
FsStateChangelogStorageFactory.configure(
    config,
    new File("/path/to/changelog/storage"),
    Duration.ofSeconds(10), // upload timeout
    3                       // max upload attempts
);

// Create storage factory
FsStateChangelogStorageFactory factory = new FsStateChangelogStorageFactory();

// Create storage instance
FsStateChangelogStorage storage = (FsStateChangelogStorage) factory.createStorage(
    new JobID(),
    config,
    taskManagerJobMetricGroup,
    localRecoveryConfig
);

// Create changelog writer for an operator
FsStateChangelogWriter writer = storage.createWriter(
    "operator-id",
    KeyGroupRange.of(0, 127),
    mailboxExecutor
);

// Remember where the changes for the upcoming checkpoint start
SequenceNumber sequenceNumber = writer.nextSequenceNumber();

// Append state changes
writer.append(0, stateChangeBytes);
writer.appendMeta(metadataBytes);

// Persist changes during checkpoint
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> result =
    writer.persist(sequenceNumber, checkpointId);
```

## Architecture

The Flink DSTL DFS library is organized around several key components:

- **Storage Factory**: `FsStateChangelogStorageFactory` creates storage instances and provides service loader integration
- **Storage Implementation**: `FsStateChangelogStorage` manages changelog writers and upload scheduling
- **Writers**: `FsStateChangelogWriter` handles state change appending and persistence operations
- **Upload System**: Configurable upload schedulers handle batching and asynchronous persistence to distributed file systems
- **Recovery System**: `FsStateChangelogStorageForRecovery` provides read-only access for checkpoint recovery
- **Configuration**: Comprehensive options for tuning performance, retry policies, and storage behavior

## Capabilities


### Storage Factory and Configuration


Factory for creating filesystem-based changelog storage instances with comprehensive configuration support for distributed file systems.

```java { .api }
public class FsStateChangelogStorageFactory implements StateChangelogStorageFactory {
    public static final String IDENTIFIER = "filesystem";

    public String getIdentifier();
    public StateChangelogStorage<?> createStorage(
        JobID jobID,
        Configuration configuration,
        TaskManagerJobMetricGroup metricGroup,
        LocalRecoveryConfig localRecoveryConfig
    ) throws IOException;
    public StateChangelogStorageView<?> createStorageView(Configuration configuration);
    public static void configure(
        Configuration configuration,
        File newFolder,
        Duration uploadTimeout,
        int maxUploadAttempts
    );
}
```

[Storage Factory and Configuration](./storage-factory.md)


### Main Storage Implementation


Core filesystem-based implementation providing changelog writers and managing upload operations for high-throughput streaming applications.

```java { .api }
public class FsStateChangelogStorage extends FsStateChangelogStorageForRecovery
        implements StateChangelogStorage<ChangelogStateHandleStreamImpl> {

    public FsStateChangelogWriter createWriter(
        String operatorID,
        KeyGroupRange keyGroupRange,
        MailboxExecutor mailboxExecutor
    );
    public void close() throws Exception;
    public AvailabilityProvider getAvailabilityProvider();
}
```

[Main Storage Implementation](./storage-implementation.md)


### Changelog Writers


Writers for appending state changes and managing persistence operations with preemptive flushing and checkpoint coordination.

```java { .api }
interface StateChangelogWriter<T> {
    void append(int keyGroup, byte[] value) throws IOException;
    void appendMeta(byte[] value) throws IOException;
    SequenceNumber nextSequenceNumber();
    CompletableFuture<SnapshotResult<T>> persist(SequenceNumber from, long checkpointId) throws IOException;
    void truncate(SequenceNumber to);
    void truncateAndClose(SequenceNumber from);
    void confirm(SequenceNumber from, SequenceNumber to, long checkpointId);
    void reset(SequenceNumber from, SequenceNumber to, long checkpointId);
    void close();
}
```

[Changelog Writers](./changelog-writers.md)
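
The writer contract above revolves around sequence numbers: changes are appended, a checkpoint persists everything from a given sequence number onward, and truncation discards changes that are no longer needed. The following is a minimal in-memory sketch of that idea, not the Flink implementation. The class `InMemoryChangelog` and its methods are hypothetical, `long` stands in for `SequenceNumber`, and the sketch assumes one sequence number per appended change and that `truncate(to)` drops everything below `to`; the real writer may differ on both points.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of an append-only changelog (illustration only). */
final class InMemoryChangelog {

    /** One appended change together with the sequence number assigned to it. */
    static final class Change {
        final long sequenceNumber;
        final int keyGroup;
        final byte[] value;

        Change(long sequenceNumber, int keyGroup, byte[] value) {
            this.sequenceNumber = sequenceNumber;
            this.keyGroup = keyGroup;
            this.value = value;
        }
    }

    private final List<Change> changes = new ArrayList<>();
    private long nextSequenceNumber = 0;

    /** Appends a change and assigns it the next sequence number (assumption: one per change). */
    void append(int keyGroup, byte[] value) {
        changes.add(new Change(nextSequenceNumber++, keyGroup, value));
    }

    /** Sequence number that the next appended change will receive. */
    long nextSequenceNumber() {
        return nextSequenceNumber;
    }

    /** Returns the retained changes with sequence number >= from, as a checkpoint would persist. */
    List<Change> changesFrom(long from) {
        List<Change> result = new ArrayList<>();
        for (Change change : changes) {
            if (change.sequenceNumber >= from) {
                result.add(change);
            }
        }
        return result;
    }

    /** Drops all changes with sequence number < to (assumed semantics for this sketch). */
    void truncate(long to) {
        changes.removeIf(change -> change.sequenceNumber < to);
    }

    public static void main(String[] args) {
        InMemoryChangelog log = new InMemoryChangelog();
        long checkpointStart = log.nextSequenceNumber();
        log.append(0, new byte[] {1});
        log.append(1, new byte[] {2});
        System.out.println(log.changesFrom(checkpointStart).size()); // prints 2
        log.truncate(log.nextSequenceNumber());
        System.out.println(log.changesFrom(0).size()); // prints 0
    }
}
```

Capturing `nextSequenceNumber()` before appending is what lets a later persist call select exactly the changes that belong to the checkpoint.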


### Configuration Options


Comprehensive configuration options for performance tuning, retry policies, and storage behavior optimization.

```java { .api }
public class FsStateChangelogOptions {
    public static final ConfigOption<String> BASE_PATH;
    public static final ConfigOption<Boolean> COMPRESSION_ENABLED;
    public static final ConfigOption<MemorySize> PREEMPTIVE_PERSIST_THRESHOLD;
    public static final ConfigOption<Duration> PERSIST_DELAY;
    public static final ConfigOption<MemorySize> PERSIST_SIZE_THRESHOLD;
    public static final ConfigOption<MemorySize> UPLOAD_BUFFER_SIZE;
    public static final ConfigOption<Integer> NUM_UPLOAD_THREADS;
    public static final ConfigOption<MemorySize> IN_FLIGHT_DATA_LIMIT;
    public static final ConfigOption<String> RETRY_POLICY;
    public static final ConfigOption<Duration> UPLOAD_TIMEOUT;
    public static final ConfigOption<Integer> RETRY_MAX_ATTEMPTS;
}
```

[Configuration Options](./configuration-options.md)


### Upload Scheduling and Management


Upload scheduling system with batching, throttling, and retry capabilities for efficient distributed file system operations.

```java { .api }
public interface StateChangeUploadScheduler extends AutoCloseable {
    void upload(UploadTask task);
    AvailabilityProvider getAvailabilityProvider();
    void close() throws Exception;

    static StateChangeUploadScheduler fromConfig(
        JobID jobID,
        Configuration config,
        ChangelogStorageMetricGroup metricGroup,
        TaskChangelogRegistry changelogRegistry,
        LocalRecoveryConfig localRecoveryConfig
    ) throws IOException;
}

public interface StateChangeUploader extends AutoCloseable {
    UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException;
}
```

[Upload System](./upload-system.md)
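
To make the batching idea concrete, here is a minimal sketch of size-based batching: submitted changes are buffered and handed to an uploader as one batch once their accumulated size reaches a threshold. This is an illustration under stated assumptions, not the library's scheduler. The class `SizeThresholdBatcher`, its methods, and the use of raw `byte[]` payloads are hypothetical, and the time-based delay, throttling, and retries mentioned above are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical batcher that flushes buffered changes once a size threshold is reached. */
final class SizeThresholdBatcher {
    private final long sizeThresholdBytes;
    private final Consumer<List<byte[]>> uploader;
    private List<byte[]> pending = new ArrayList<>();
    private long pendingBytes = 0;

    SizeThresholdBatcher(long sizeThresholdBytes, Consumer<List<byte[]>> uploader) {
        this.sizeThresholdBytes = sizeThresholdBytes;
        this.uploader = uploader;
    }

    /** Buffers one change and flushes when the buffered size reaches the threshold. */
    void submit(byte[] change) {
        pending.add(change);
        pendingBytes += change.length;
        if (pendingBytes >= sizeThresholdBytes) {
            flush();
        }
    }

    /** Hands all buffered changes to the uploader as a single batch; no-op when empty. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<byte[]> batch = pending;
        pending = new ArrayList<>();
        pendingBytes = 0;
        uploader.accept(batch);
    }

    public static void main(String[] args) {
        SizeThresholdBatcher batcher = new SizeThresholdBatcher(
            10, batch -> System.out.println("uploading " + batch.size() + " changes"));
        batcher.submit(new byte[4]);
        batcher.submit(new byte[4]); // 8 bytes buffered, below the threshold
        batcher.submit(new byte[4]); // 12 bytes buffered, prints "uploading 3 changes"
        batcher.flush();             // nothing buffered, prints nothing
    }
}
```

Batching trades a little latency for fewer, larger writes, which tends to suit distributed file systems where each request carries a fixed overhead.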


### Recovery and State Management


Recovery system providing read-only access to persisted changelog data and lifecycle management for state handles.

```java { .api }
public class FsStateChangelogStorageForRecovery
        implements StateChangelogStorageView<ChangelogStateHandleStreamImpl> {

    public StateChangelogHandleReader<ChangelogStateHandleStreamImpl> createReader();
    public void close() throws Exception;
}

public interface TaskChangelogRegistry {
    void startTracking(StreamStateHandle handle, long refCount);
    void stopTracking(StreamStateHandle handle);
    void release(StreamStateHandle handle);
}
```

[Recovery and State Management](./recovery-system.md)
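
The `startTracking`/`stopTracking`/`release` trio suggests reference counting: a handle is tracked with an initial count, each release lowers it, and the underlying data can be discarded once nobody uses it. The sketch below illustrates that pattern only. The class `RefCountingRegistry`, the `String` handles, and the discard callback are hypothetical, and the assumption that `stopTracking` hands ownership elsewhere without discarding is mine rather than something stated in this document.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical reference-counting registry (illustration only, single-threaded). */
final class RefCountingRegistry {
    private final Map<String, Long> refCounts = new HashMap<>();
    private final Consumer<String> discarder;

    RefCountingRegistry(Consumer<String> discarder) {
        this.discarder = discarder;
    }

    /** Starts tracking a handle that is currently used by refCount users. */
    void startTracking(String handle, long refCount) {
        refCounts.put(handle, refCount);
    }

    /** Stops tracking without discarding (assumption: ownership moved elsewhere). */
    void stopTracking(String handle) {
        refCounts.remove(handle);
    }

    /** Marks the handle as unused by one user; discards it when the last user releases. */
    void release(String handle) {
        Long count = refCounts.get(handle);
        if (count == null) {
            return; // not tracked here, so this registry must not discard it
        }
        if (count <= 1) {
            refCounts.remove(handle);
            discarder.accept(handle);
        } else {
            refCounts.put(handle, count - 1);
        }
    }

    public static void main(String[] args) {
        RefCountingRegistry registry =
            new RefCountingRegistry(handle -> System.out.println("discarding " + handle));
        registry.startTracking("file-1", 2);
        registry.release("file-1"); // one user left, nothing printed
        registry.release("file-1"); // prints "discarding file-1"
    }
}
```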


### Metrics and Monitoring


Comprehensive metrics collection for monitoring upload performance, failure rates, and system health in production environments.

```java { .api }
public class ChangelogStorageMetricGroup extends ProxyMetricGroup<MetricGroup> {
    // Provides counters for uploads, failures, batch sizes, latencies, and retry attempts
}
```

[Metrics and Monitoring](./metrics-monitoring.md)


## Types

```java { .api }
public final class UploadResult {
    public final StreamStateHandle streamStateHandle;
    public final StreamStateHandle localStreamHandle;
    public final long offset;
    public final long localOffset;
    public final SequenceNumber sequenceNumber;
    public final long size;

    public UploadResult(StreamStateHandle streamStateHandle, long offset,
                        SequenceNumber sequenceNumber, long size);
    public StreamStateHandle getStreamStateHandle();
    public long getOffset();
    public SequenceNumber getSequenceNumber();
    public long getSize();
}

public class StateChangeSet {
    public StateChangeSet(UUID logId, SequenceNumber sequenceNumber, List<StateChange> changes);
    public UUID getLogId();
    public List<StateChange> getChanges();
    public SequenceNumber getSequenceNumber();
    public long getSize();
}

public interface RetryPolicy {
    long timeoutFor(int attempt);
    long retryAfter(int failedAttempt, Exception exception);

    static RetryPolicy fromConfig(ReadableConfig config);
    static RetryPolicy fixed(int maxAttempts, long timeout, long delayAfterFailure);
    RetryPolicy NONE = /* ... */;
}
```
```
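
As a rough illustration of how a fixed retry policy such as the one created by `RetryPolicy.fixed(...)` can drive an upload loop, the sketch below retries an action with a constant delay until a maximum number of attempts is reached. It is a simplified model, not Flink's implementation: `FixedRetrySketch`, its nested `Policy` interface, and the `run` helper are hypothetical, per-attempt timeouts (`timeoutFor`) are omitted, and the convention that a negative return value from `retryAfter` means "give up" is an assumption of this sketch.

```java
import java.util.function.Supplier;

/** Hypothetical retry helper illustrating a fixed retry policy (illustration only). */
final class FixedRetrySketch {

    /** Simplified policy: delay in ms before the next attempt, or a negative value to give up. */
    interface Policy {
        long retryAfter(int failedAttempt, Exception exception);
    }

    /** Allows up to maxAttempts attempts in total, waiting delayMs after each failure. */
    static Policy fixed(int maxAttempts, long delayMs) {
        return (failedAttempt, exception) -> failedAttempt < maxAttempts ? delayMs : -1L;
    }

    /** Runs the action, retrying on runtime exceptions for as long as the policy allows. */
    static <T> T run(Supplier<T> action, Policy policy) {
        int attempt = 1;
        while (true) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                long delayMs = policy.retryAfter(attempt, e);
                if (delayMs < 0) {
                    throw e; // policy says give up: surface the last failure
                }
                sleep(delayMs);
                attempt++;
            }
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting to retry", e);
        }
    }
}
```

With `fixed(3, 0)`, an action that fails twice and then succeeds is invoked three times in total; with `fixed(2, 0)`, the second failure is rethrown to the caller.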