
# Main Storage Implementation

`FsStateChangelogStorage` is the filesystem-based implementation of `StateChangelogStorage`. It is thread-safe, creates per-operator writers, and tracks availability so that callers can apply backpressure. It is the core component that coordinates the persistence of state changes.

## Capabilities

### FsStateChangelogStorage

The main filesystem-based implementation. It extends `FsStateChangelogStorageForRecovery`, which provides read access to changelog data during recovery, and adds the ability to create writers.

```java { .api }
/**
 * Filesystem-based implementation of StateChangelogStorage
 */
@Experimental
@ThreadSafe
public class FsStateChangelogStorage
        extends FsStateChangelogStorageForRecovery
        implements StateChangelogStorage<ChangelogStateHandleStreamImpl> {

    /**
     * Creates storage with default changelog registry
     * @param jobID The job identifier
     * @param config Configuration settings
     * @param metricGroup Metric group for monitoring
     * @param localRecoveryConfig Local recovery configuration
     * @throws IOException If storage initialization fails
     */
    public FsStateChangelogStorage(
        JobID jobID,
        Configuration config,
        TaskManagerJobMetricGroup metricGroup,
        LocalRecoveryConfig localRecoveryConfig
    ) throws IOException;

    /**
     * Creates storage with custom changelog registry
     * @param jobID The job identifier
     * @param config Configuration settings
     * @param metricGroup Metric group for monitoring
     * @param changelogRegistry Custom changelog registry
     * @param localRecoveryConfig Local recovery configuration
     * @throws IOException If storage initialization fails
     */
    public FsStateChangelogStorage(
        JobID jobID,
        Configuration config,
        TaskManagerJobMetricGroup metricGroup,
        TaskChangelogRegistry changelogRegistry,
        LocalRecoveryConfig localRecoveryConfig
    ) throws IOException;

    /**
     * Creates a writer for state changes for a specific operator
     * @param operatorID Identifier of the operator
     * @param keyGroupRange Range of key groups handled by this writer
     * @param mailboxExecutor Executor for asynchronous operations
     * @return FsStateChangelogWriter instance
     */
    public FsStateChangelogWriter createWriter(
        String operatorID,
        KeyGroupRange keyGroupRange,
        MailboxExecutor mailboxExecutor
    );

    /**
     * Closes the storage and releases all resources
     * @throws Exception If closing fails
     */
    public void close() throws Exception;

    /**
     * Returns availability provider for backpressure handling
     * @return AvailabilityProvider instance
     */
    public AvailabilityProvider getAvailabilityProvider();
}
```

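**Usage Examples:**

```java
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.changelog.fs.FsStateChangelogStorage;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;

// metricGroup (TaskManagerJobMetricGroup), localRecoveryConfig (LocalRecoveryConfig),
// mailboxExecutor (MailboxExecutor), stateChangeData (byte[]) and checkpointId (long)
// are assumed to be supplied by the surrounding task runtime.

// Basic storage creation
Configuration config = new Configuration();
config.set(FsStateChangelogOptions.BASE_PATH, "/path/to/changelog");

FsStateChangelogStorage storage = new FsStateChangelogStorage(
    new JobID(),
    config,
    metricGroup,
    localRecoveryConfig
);

// Create a writer for an operator that owns key groups 0..127 (inclusive).
// createWriter returns an FsStateChangelogWriter; it is used here through
// the StateChangelogWriter interface.
StateChangelogWriter<ChangelogStateHandleStreamImpl> writer = storage.createWriter(
    "my-operator-id",
    KeyGroupRange.of(0, 127),
    mailboxExecutor
);

// Remember where this batch of changes starts, then append a change to key group 5
SequenceNumber sequenceNumber = writer.nextSequenceNumber();
writer.append(5, stateChangeData);

// Persist everything appended since sequenceNumber for the given checkpoint
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> result =
    writer.persist(sequenceNumber, checkpointId);

// Clean up
writer.close();
storage.close();
```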
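The `append`/`persist` contract used above is easier to follow with a concrete picture of sequence numbers. The following is a simplified, in-memory model, not Flink's writer. `InMemoryChangelogModel` and its methods are hypothetical names. The model assumes that each appended change receives a monotonically increasing sequence number and that `persist(from)` covers every change appended at or after `from`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of changelog sequence numbering. It is NOT
// Flink's writer: it keeps changes in memory and "persists" them instantly.
final class InMemoryChangelogModel {

    /** One appended state change, tagged with its sequence number and key group. */
    static final class Change {
        private final long sequenceNumber;
        private final int keyGroup;
        private final byte[] payload;

        Change(long sequenceNumber, int keyGroup, byte[] payload) {
            this.sequenceNumber = sequenceNumber;
            this.keyGroup = keyGroup;
            this.payload = payload;
        }

        long sequenceNumber() { return sequenceNumber; }
        int keyGroup() { return keyGroup; }
        byte[] payload() { return payload; }
    }

    private final int firstKeyGroup;
    private final int lastKeyGroup; // inclusive, like KeyGroupRange.of(first, last)
    private final List<Change> changes = new ArrayList<>();
    private long nextSequenceNumber = 0;

    InMemoryChangelogModel(int firstKeyGroup, int lastKeyGroup) {
        this.firstKeyGroup = firstKeyGroup;
        this.lastKeyGroup = lastKeyGroup;
    }

    /** Sequence number that the next appended change will receive. */
    long nextSequenceNumber() {
        return nextSequenceNumber;
    }

    /** Records a change. This model also rejects key groups outside its range. */
    void append(int keyGroup, byte[] payload) {
        if (keyGroup < firstKeyGroup || keyGroup > lastKeyGroup) {
            throw new IllegalArgumentException("key group " + keyGroup + " is outside the writer's range");
        }
        changes.add(new Change(nextSequenceNumber++, keyGroup, payload));
    }

    /** Returns every change with sequenceNumber >= from (inclusive), as a completed future. */
    CompletableFuture<List<Change>> persist(long from) {
        List<Change> selected = new ArrayList<>();
        for (Change change : changes) {
            if (change.sequenceNumber() >= from) {
                selected.add(change);
            }
        }
        return CompletableFuture.completedFuture(selected);
    }
}
```

In the real storage, the future returned by `persist` completes only after the upload finishes. The model completes it immediately so that only the sequence-number bookkeeping is visible.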

### Testing Constructor

A `@VisibleForTesting` constructor that takes the upload settings (base path, compression, buffer size) and the metric group directly, instead of reading them from a `Configuration`.

```java { .api }
/**
 * Testing constructor with direct uploader configuration
 * @param jobID The job identifier
 * @param basePath Base path for changelog storage
 * @param compression Whether to enable compression
 * @param bufferSize Buffer size, in bytes, used when uploading change sets
 * @param metricGroup Metric group for monitoring
 * @param changelogRegistry Changelog registry for tracking
 * @param localRecoveryConfig Local recovery configuration
 * @throws IOException If storage initialization fails
 */
@VisibleForTesting
public FsStateChangelogStorage(
    JobID jobID,
    Path basePath,
    boolean compression,
    int bufferSize,
    ChangelogStorageMetricGroup metricGroup,
    TaskChangelogRegistry changelogRegistry,
    LocalRecoveryConfig localRecoveryConfig
) throws IOException;
```
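The following sketch roughly illustrates what the `compression` and `bufferSize` parameters control. It wraps a target stream in a write buffer of the given size and, optionally, in a compressing stream. This is an assumption-laden stand-in, not Flink's uploader. The JDK's `GZIPOutputStream` substitutes for whatever codec Flink's changelog uses, and the helper names `openChangelogStream` and `openForRead` are hypothetical.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper illustrating the `compression` and `bufferSize` knobs.
// GZIP is a stand-in; Flink's changelog uses its own compression codec.
final class ChangelogStreams {
    private ChangelogStreams() {}

    /** Buffers writes in chunks of up to `bufferSize` bytes and optionally compresses them. */
    static OutputStream openChangelogStream(OutputStream target, boolean compression, int bufferSize)
            throws IOException {
        OutputStream buffered = new BufferedOutputStream(target, bufferSize);
        // Compression sits in front of the buffer, so compressed bytes are what get buffered.
        return compression ? new GZIPOutputStream(buffered) : buffered;
    }

    /** Reverses openChangelogStream for reading back what was written. */
    static InputStream openForRead(byte[] data, boolean compression) throws IOException {
        InputStream in = new ByteArrayInputStream(data);
        return compression ? new GZIPInputStream(in) : in;
    }
}
```

Because buffering and compression are independent layers, either one can be switched off without affecting the other.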

### Low-Level Constructor

A lower-level `@VisibleForTesting` constructor for advanced tests. It accepts a ready-made `StateChangeUploadScheduler` instead of building one from configuration.

```java { .api }
/**
 * Low-level constructor with custom upload scheduler
 * @param uploader Custom state change upload scheduler
 * @param preEmptivePersistThresholdInBytes Size of unpersisted changes, in bytes,
 *        above which a writer persists them pre-emptively
 * @param changelogRegistry Changelog registry for tracking
 * @param localRecoveryConfig Local recovery configuration
 */
@VisibleForTesting
public FsStateChangelogStorage(
    StateChangeUploadScheduler uploader,
    long preEmptivePersistThresholdInBytes,
    TaskChangelogRegistry changelogRegistry,
    LocalRecoveryConfig localRecoveryConfig
);
```
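The `preEmptivePersistThresholdInBytes` parameter indicates that a writer does not always wait for a checkpoint to persist changes. Once enough unpersisted data has accumulated, it can start persisting early. The sketch below models only that size check. `PreemptivePersistTracker` and its methods are hypothetical, and the real writer's trigger logic (for instance, exactly when its counter resets) may differ.

```java
// Hypothetical model of a size-based pre-emptive persist trigger.
final class PreemptivePersistTracker {
    private final long thresholdBytes;
    private long unpersistedBytes = 0;
    private int persistCount = 0;

    PreemptivePersistTracker(long thresholdBytes) {
        if (thresholdBytes <= 0) {
            throw new IllegalArgumentException("threshold must be positive");
        }
        this.thresholdBytes = thresholdBytes;
    }

    /** Records an appended change; returns true if this append triggered a persist. */
    boolean onAppend(int sizeInBytes) {
        unpersistedBytes += sizeInBytes;
        if (unpersistedBytes >= thresholdBytes) {
            persistCount++;       // stand-in for scheduling an upload
            unpersistedBytes = 0; // everything appended so far is now being persisted
            return true;
        }
        return false;
    }

    long unpersistedBytes() { return unpersistedBytes; }

    int persistCount() { return persistCount; }
}
```

Persisting early spreads upload work across the checkpoint interval, so less data remains to upload when a checkpoint is finally triggered.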

163

164

**Testing Usage Example:**

165

166

```java

167

// Testing with custom parameters

168

ChangelogStorageMetricGroup testMetricGroup = new ChangelogStorageMetricGroup(metricGroup);

169

TaskChangelogRegistry testRegistry = TaskChangelogRegistry.defaultChangelogRegistry(1);

170

171

FsStateChangelogStorage testStorage = new FsStateChangelogStorage(

172

new JobID(),

173

new Path("file:///tmp/test-changelog"),

174

true, // compression enabled

175

1024 * 1024, // 1MB buffer

176

testMetricGroup,

177

testRegistry,

178

localRecoveryConfig

179

);

180

```

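### Availability and Backpressure Handling

The storage exposes an `AvailabilityProvider` that signals whether it can currently accept more state changes. Streaming tasks can use this signal to apply backpressure when uploads fall behind, instead of accumulating changes without bound.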

```java { .api }
/**
 * Returns availability provider for coordinating backpressure
 * @return AvailabilityProvider that signals when storage is available for writes
 */
public AvailabilityProvider getAvailabilityProvider();
```
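One way to picture the availability signal is as a byte budget for in-flight uploads. While the budget is exceeded, the availability future stays incomplete, and it completes once enough uploads have finished. The sketch below is a simplified model under that assumption. As far as we understand, Flink ties availability to a limit on in-flight upload data, but its actual throttling may differ. `InFlightAvailability` and its methods are hypothetical; they mirror the `isAvailable()`/`getAvailabilityFuture()` shape of Flink's `AvailabilityProvider` without depending on it.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical model: availability driven by a limit on bytes currently being uploaded.
final class InFlightAvailability {
    private final long maxInFlightBytes;
    private long inFlightBytes = 0;
    // Already completed while under the limit; a fresh, incomplete future while over it.
    private CompletableFuture<Void> availableFuture = CompletableFuture.completedFuture(null);

    InFlightAvailability(long maxInFlightBytes) {
        this.maxInFlightBytes = maxInFlightBytes;
    }

    synchronized void uploadStarted(long bytes) {
        inFlightBytes += bytes;
        if (inFlightBytes > maxInFlightBytes && availableFuture.isDone()) {
            availableFuture = new CompletableFuture<>();
        }
    }

    void uploadFinished(long bytes) {
        CompletableFuture<Void> toComplete = null;
        synchronized (this) {
            inFlightBytes -= bytes;
            if (inFlightBytes <= maxInFlightBytes && !availableFuture.isDone()) {
                toComplete = availableFuture;
                availableFuture = CompletableFuture.completedFuture(null);
            }
        }
        // Complete outside the lock so that waiting callbacks do not run while holding it.
        if (toComplete != null) {
            toComplete.complete(null);
        }
    }

    synchronized boolean isAvailable() {
        return availableFuture.isDone();
    }

    synchronized CompletableFuture<?> getAvailabilityFuture() {
        return availableFuture;
    }
}
```

Callers holding an older future are still woken when it completes, even though the provider has already swapped in a completed one.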

**Availability Usage Example:**

```java
// A simplified illustration. operatorID, keyGroupRange, mailboxExecutor,
// keyGroup and stateChangeData are assumed to be in scope.
FsStateChangelogStorage storage = new FsStateChangelogStorage(/* ... */);
StateChangelogWriter<ChangelogStateHandleStreamImpl> writer =
    storage.createWriter(operatorID, keyGroupRange, mailboxExecutor);

// Check whether the storage can currently accept more writes
AvailabilityProvider availability = storage.getAvailabilityProvider();

if (availability.isAvailable()) {
    writer.append(keyGroup, stateChangeData);
} else {
    // The future may complete on an upload thread, and writers are not
    // thread-safe, so hand the append back to the task's mailbox thread.
    availability.getAvailabilityFuture().thenRun(() ->
        mailboxExecutor.execute(
            () -> writer.append(keyGroup, stateChangeData),
            "append after changelog backpressure"));
}
```

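### Local Recovery Integration

The storage integrates with Flink's local recovery mechanism. When local recovery is enabled in the `LocalRecoveryConfig`, a copy of the uploaded changelog data is also kept on local disk. A task restored on the same TaskManager can then read that copy instead of fetching the data from the remote filesystem.

**Local Recovery Configuration Example:**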

```java
import java.io.File;

import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProviderImpl;

// Configure local recovery. Inside a running task the LocalRecoveryConfig is
// normally provided by the runtime; building one by hand is mainly useful in tests.
// jobID, jobVertexID, subtaskIndex, config and metricGroup are assumed to be known.
LocalRecoveryDirectoryProvider directoryProvider =
    new LocalRecoveryDirectoryProviderImpl(
        new File("/local/recovery"), jobID, jobVertexID, subtaskIndex);

// The LocalRecoveryConfig constructor has changed between Flink versions;
// check the signature for the version in use.
LocalRecoveryConfig localRecoveryConfig =
    new LocalRecoveryConfig(true, directoryProvider); // enable local recovery

// Create storage with local recovery enabled
FsStateChangelogStorage storage = new FsStateChangelogStorage(
    jobID, config, metricGroup, localRecoveryConfig
);
```
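Conceptually, local recovery for the changelog means that each upload is written twice: once to the durable remote filesystem and once to a local directory. The sketch below expresses that idea as an output stream that duplicates every write to two targets. It is a simplified stand-in, not Flink's uploader, and the class name `DuplicatingStream` is hypothetical. In particular, it treats a failure on either target as fatal, whereas a real implementation might tolerate failures on the local (secondary) copy.

```java
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical stream that writes every byte to a primary (remote) and a secondary (local) target.
final class DuplicatingStream extends OutputStream {
    private final OutputStream primary;
    private final OutputStream secondary;

    DuplicatingStream(OutputStream primary, OutputStream secondary) {
        this.primary = primary;
        this.secondary = secondary;
    }

    @Override
    public void write(int b) throws IOException {
        primary.write(b);
        secondary.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        primary.write(b, off, len);
        secondary.write(b, off, len);
    }

    @Override
    public void flush() throws IOException {
        primary.flush();
        secondary.flush();
    }

    @Override
    public void close() throws IOException {
        // Close both targets even if the first close fails; rethrow the first failure
        // with any second failure attached as suppressed.
        IOException failure = null;
        try {
            primary.close();
        } catch (IOException e) {
            failure = e;
        }
        try {
            secondary.close();
        } catch (IOException e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }
        if (failure != null) {
            throw failure;
        }
    }
}
```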

### Error Handling

Construction fails with an `IOException` when the storage cannot be initialized. `close()` may throw any `Exception` raised while shutting down the upload machinery and releasing resources.

**Common Error Scenarios:**

```java
import java.io.IOException;

// log is assumed to be an SLF4J Logger; jobID, config, metricGroup and
// localRecoveryConfig are assumed to be in scope.
FsStateChangelogStorage storage;
try {
    storage = new FsStateChangelogStorage(
        jobID, config, metricGroup, localRecoveryConfig
    );
} catch (IOException e) {
    // Handle storage initialization failure
    // Common causes: invalid base path, permissions, filesystem issues
    log.error("Failed to initialize changelog storage", e);
    throw e;
}

try {
    // ... create writers and persist state changes ...
} finally {
    try {
        storage.close();
    } catch (Exception e) {
        // Handle cleanup failure, e.g. during upload scheduler shutdown or resource release
        log.warn("Error during storage cleanup", e);
    }
}
```
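When several changelog resources must be released (for example, each writer and then the storage), it helps to attempt every `close()` and report all failures together instead of stopping at the first one. The helper below is a minimal, JDK-only sketch of that pattern. `CloseAll.closeAll` is a hypothetical name. Flink ships its own utilities for this purpose (such as `IOUtils.closeAll`), which are likely preferable in real code.

```java
import java.util.List;

// Hypothetical helper: close every resource, then rethrow the first failure
// with any later failures attached as suppressed exceptions.
final class CloseAll {
    private CloseAll() {}

    static void closeAll(List<? extends AutoCloseable> resources) throws Exception {
        Exception first = null;
        for (AutoCloseable resource : resources) {
            if (resource == null) {
                continue; // e.g. a storage whose constructor failed
            }
            try {
                resource.close();
            } catch (Exception e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }
}
```

Closing writers before the storage matches the creation order in reverse, so no writer outlives the storage it uploads through.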