# Main Storage Implementation

The core filesystem-based implementation provides changelog writers and manages upload operations for high-throughput streaming applications. It coordinates writers, upload schedulers, and recovery components.

## Capabilities

### FsStateChangelogStorage

Main storage implementation that manages changelog writers and upload operations for active streaming jobs.

```java { .api }
/**
 * Filesystem-based implementation of StateChangelogStorage for write operations.
 * Thread-safe and manages multiple changelog writers for different operators.
 */
public class FsStateChangelogStorage extends FsStateChangelogStorageForRecovery
        implements StateChangelogStorage<ChangelogStateHandleStreamImpl> {

    /**
     * Creates a new changelog writer for a specific operator and key group range
     * @param operatorID Unique identifier for the operator
     * @param keyGroupRange Key group range this writer handles
     * @param mailboxExecutor Executor for callback processing
     * @return FsStateChangelogWriter instance for the operator
     */
    public FsStateChangelogWriter createWriter(
        String operatorID,
        KeyGroupRange keyGroupRange,
        MailboxExecutor mailboxExecutor
    );

    /**
     * Closes the storage and all associated resources
     * @throws Exception if cleanup fails
     */
    public void close() throws Exception;

    /**
     * Returns availability provider for backpressure control
     * @return AvailabilityProvider indicating when storage can accept more data
     */
    public AvailabilityProvider getAvailabilityProvider();
}
```

### Constructors

Multiple constructor variants support different initialization scenarios:

```java { .api }
/**
 * Main constructor for production use
 */
public FsStateChangelogStorage(
    JobID jobID,
    Configuration config,
    TaskManagerJobMetricGroup metricGroup,
    LocalRecoveryConfig localRecoveryConfig
) throws IOException;

/**
 * Constructor with custom changelog registry
 */
public FsStateChangelogStorage(
    JobID jobID,
    Configuration config,
    TaskManagerJobMetricGroup metricGroup,
    TaskChangelogRegistry changelogRegistry,
    LocalRecoveryConfig localRecoveryConfig
) throws IOException;

/**
 * Testing constructor with direct parameters
 */
public FsStateChangelogStorage(
    JobID jobID,
    Path basePath,
    boolean compression,
    int bufferSize,
    ChangelogStorageMetricGroup metricGroup,
    TaskChangelogRegistry changelogRegistry,
    LocalRecoveryConfig localRecoveryConfig
) throws IOException;

/**
 * Advanced constructor with custom upload scheduler
 */
public FsStateChangelogStorage(
    StateChangeUploadScheduler uploader,
    long preEmptivePersistThresholdInBytes,
    TaskChangelogRegistry changelogRegistry,
    LocalRecoveryConfig localRecoveryConfig
);
```

**Usage Examples:**

```java
import org.apache.flink.changelog.fs.FsStateChangelogStorage;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.state.KeyGroupRange;

// Create storage instance (typically done by factory)
FsStateChangelogStorage storage = new FsStateChangelogStorage(
    jobId, config, metricGroup, localRecoveryConfig
);

// Create writers for different operators
FsStateChangelogWriter operatorWriter1 = storage.createWriter(
    "map-operator",
    KeyGroupRange.of(0, 63),
    mailboxExecutor
);

FsStateChangelogWriter operatorWriter2 = storage.createWriter(
    "filter-operator",
    KeyGroupRange.of(64, 127),
    mailboxExecutor
);

// Check if storage can accept more data
AvailabilityProvider availability = storage.getAvailabilityProvider();
if (availability.isAvailable()) {
    // Safe to write more data
    operatorWriter1.append(keyGroup, stateChangeBytes);
}

// Cleanup when done
storage.close();
```

### Integration with Upload System

The storage implementation integrates with the upload scheduling system:

```java { .api }
/**
 * Internal components managed by FsStateChangelogStorage
 */
class InternalComponents {
    private final StateChangeUploadScheduler uploader;
    private final long preEmptivePersistThresholdInBytes;
    private final TaskChangelogRegistry changelogRegistry;
    private final AtomicInteger logIdGenerator;
    private final LocalChangelogRegistry localChangelogRegistry;
}
```

The storage automatically:

- Creates upload schedulers based on configuration
- Manages unique log IDs for different writers
- Coordinates with the changelog registry for state lifecycle management
- Handles local recovery integration when enabled
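
The log ID bookkeeping can be illustrated with a small JDK-only sketch. It assumes the `AtomicInteger logIdGenerator` field listed above is used as a shared counter; `LogIdSketch` and `nextLogId` are hypothetical names for illustration and are not part of the Flink API.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in, not a Flink class: shows how one shared counter
// can hand out distinct log IDs to writers created from several threads.
class LogIdSketch {
    private final AtomicInteger logIdGenerator = new AtomicInteger(0);

    /** Returns the next unused log ID; safe to call concurrently. */
    int nextLogId() {
        // getAndIncrement is atomic, so two callers never see the same value
        return logIdGenerator.getAndIncrement();
    }

    public static void main(String[] args) {
        LogIdSketch storage = new LogIdSketch();
        int firstWriterLogId = storage.nextLogId();
        int secondWriterLogId = storage.nextLogId();
        System.out.println(firstWriterLogId + " " + secondWriterLogId);
    }
}
```

An atomic counter avoids locking on the writer-creation path while still guaranteeing that each writer receives its own ID.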

### Local Recovery Support

When local recovery is enabled, the storage manages local changelog registries:

```java
// Local recovery configuration
LocalRecoveryConfig localRecoveryConfig = LocalRecoveryConfig.enabled(localStateDirectory);

FsStateChangelogStorage storage = new FsStateChangelogStorage(
    jobId, config, metricGroup, localRecoveryConfig
);

// Storage automatically creates LocalChangelogRegistryImpl when enabled
// Handles both remote and local persistence of changelog data
```

### Backpressure and Flow Control

The storage provides backpressure mechanisms through availability providers:

```java
import org.apache.flink.runtime.io.AvailabilityProvider;

// Monitor storage availability
AvailabilityProvider availability = storage.getAvailabilityProvider();

// Use in async context
availability.getAvailabilityFuture().thenRun(() -> {
    // Storage is available, safe to continue writing
    writer.append(keyGroup, data);
});
```
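
To make the availability-future pattern concrete, here is a minimal JDK-only model of a backpressure gate. It is a sketch under the assumption that availability is driven by the number of bytes queued for upload; `BackpressureSketch`, `onBytesQueued`, `onBytesUploaded`, and `maxInFlightBytes` are hypothetical names and do not describe how Flink implements this internally.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical model, not Flink code: the gate becomes unavailable once the
// queued bytes reach a limit and completes its future when they drop below it.
class BackpressureSketch {
    private final long maxInFlightBytes;
    private long inFlightBytes = 0;
    private CompletableFuture<Void> availability = CompletableFuture.completedFuture(null);

    BackpressureSketch(long maxInFlightBytes) {
        this.maxInFlightBytes = maxInFlightBytes;
    }

    /** Called when a writer hands bytes to the uploader. */
    synchronized void onBytesQueued(long bytes) {
        inFlightBytes += bytes;
        if (inFlightBytes >= maxInFlightBytes && availability.isDone()) {
            // Swap in an incomplete future: callers must now wait
            availability = new CompletableFuture<>();
        }
    }

    /** Called when an upload finishes and its bytes are no longer pending. */
    synchronized void onBytesUploaded(long bytes) {
        inFlightBytes -= bytes;
        if (inFlightBytes < maxInFlightBytes && !availability.isDone()) {
            // Completing the future runs any thenRun callbacks registered on it
            availability.complete(null);
        }
    }

    synchronized boolean isAvailable() {
        return availability.isDone();
    }

    synchronized CompletableFuture<Void> getAvailabilityFuture() {
        return availability;
    }

    public static void main(String[] args) {
        BackpressureSketch gate = new BackpressureSketch(100);
        gate.onBytesQueued(120);
        gate.getAvailabilityFuture().thenRun(() -> System.out.println("resumed"));
        gate.onBytesUploaded(50);
    }
}
```

In this model a caller that registers a callback while the gate is closed is resumed as soon as enough uploads complete, which mirrors the `thenRun` usage shown above.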

### Error Handling and Lifecycle

The storage handles various error conditions and lifecycle events:

```java
// Declare outside the try block so the finally block can see it
FsStateChangelogStorage storage = null;
try {
    storage = new FsStateChangelogStorage(
        jobId, config, metricGroup, localRecoveryConfig
    );

    // Use storage...

} catch (IOException e) {
    // Handle initialization or operation errors
    log.error("Storage operation failed", e);
} finally {
    // Always close to clean up resources; close() is declared to throw Exception
    if (storage != null) {
        try {
            storage.close();
        } catch (Exception e) {
            log.error("Failed to close storage", e);
        }
    }
}
```

The storage ensures:

- Proper cleanup of upload threads and resources
- Graceful handling of filesystem failures
- Coordination with Flink's checkpoint lifecycle
- Thread-safe operations across multiple writers