or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md, registry.md, storage-factory.md, storage-implementation.md, upload-scheduling.md, writers.md

docs/storage-factory.md

0

# Storage Factory and Configuration

1

2

`FsStateChangelogStorageFactory` creates filesystem-based changelog storage instances and exposes their configuration options. It is the main entry point for using the DSTL (Durable Short-term Log) module.

3

4

## Capabilities

5

6

### FsStateChangelogStorageFactory

7

8

Factory class that implements the StateChangelogStorageFactory interface for creating filesystem-based changelog storage.

9

10

```java { .api }

11

/**

12

* Factory for creating FsStateChangelogStorage instances

13

*/

14

@Internal

15

public class FsStateChangelogStorageFactory implements StateChangelogStorageFactory {

16

public static final String IDENTIFIER = "filesystem";

17

18

/**

19

* Returns the identifier for this storage factory

20

* @return "filesystem" identifier string

21

*/

22

public String getIdentifier();

23

24

/**

25

* Creates a new StateChangelogStorage instance

26

* @param jobID The job identifier

27

* @param configuration Configuration settings

28

* @param metricGroup Metric group for monitoring

29

* @param localRecoveryConfig Local recovery configuration

30

* @return StateChangelogStorage instance

31

* @throws IOException If storage creation fails

32

*/

33

public StateChangelogStorage<?> createStorage(

34

JobID jobID,

35

Configuration configuration,

36

TaskManagerJobMetricGroup metricGroup,

37

LocalRecoveryConfig localRecoveryConfig

38

) throws IOException;

39

40

/**

41

* Creates a storage view for recovery operations

42

* @param configuration Configuration settings

43

* @return StateChangelogStorageView instance for recovery

44

*/

45

public StateChangelogStorageView<?> createStorageView(Configuration configuration);

46

47

/**

48

* Helper method for programmatic configuration

49

* @param configuration Configuration object to modify

50

* @param newFolder Base folder for changelog storage

51

* @param uploadTimeout Timeout for upload operations

52

* @param maxUploadAttempts Maximum number of upload retry attempts

53

*/

54

public static void configure(

55

Configuration configuration,

56

File newFolder,

57

Duration uploadTimeout,

58

int maxUploadAttempts

59

);

60

}

61

```

62

63

**Usage Example:**

64

65

```java

66

import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;

67

import org.apache.flink.configuration.Configuration;

68

69

// Create factory

70

FsStateChangelogStorageFactory factory = new FsStateChangelogStorageFactory();

71

72

// Verify identifier

73

String identifier = factory.getIdentifier(); // Returns "filesystem"

74

75

// Configure programmatically

76

Configuration config = new Configuration();

77

FsStateChangelogStorageFactory.configure(

78

config,

79

new File("/tmp/changelog"),

80

Duration.ofSeconds(30),

81

5

82

);

83

```

84

85

### Configuration Options

86

87

All configuration options for the filesystem-based changelog storage are defined as static constants in `FsStateChangelogOptions`.

88

89

```java { .api }

90

/**

91

* Configuration options for FsStateChangelogStorage

92

*/

93

@Experimental

94

public class FsStateChangelogOptions {

95

96

/**

97

* Base path to store changelog files. Required setting.

98

*/

99

public static final ConfigOption<String> BASE_PATH =

100

ConfigOptions.key("state.changelog.dstl.dfs.base-path")

101

.stringType()

102

.noDefaultValue()

103

.withDescription("Base path to store changelog files.");

104

105

/**

106

* Whether to enable compression when serializing changelog. Default: false.

107

*/

108

public static final ConfigOption<Boolean> COMPRESSION_ENABLED =

109

ConfigOptions.key("state.changelog.dstl.dfs.compression.enabled")

110

.booleanType()

111

.defaultValue(false)

112

.withDescription("Whether to enable compression when serializing changelog.");

113

114

/**

115

* Size threshold for preemptive persistence. Default: 5MB.

116

*/

117

public static final ConfigOption<MemorySize> PREEMPTIVE_PERSIST_THRESHOLD =

118

ConfigOptions.key("state.changelog.dstl.dfs.preemptive-persist-threshold")

119

.memoryType()

120

.defaultValue(MemorySize.parse("5MB"))

121

.withDescription("Size threshold for preemptive persistence.");

122

123

/**

124

* Delay before persisting changelog. Default: 10ms.

125

*/

126

public static final ConfigOption<Duration> PERSIST_DELAY =

127

ConfigOptions.key("state.changelog.dstl.dfs.persist-delay")

128

.durationType()

129

.defaultValue(Duration.ofMillis(10))

130

.withDescription("Delay before persisting changelog.");

131

132

/**

133

* Size threshold for batch persistence. Default: 10MB.

134

*/

135

public static final ConfigOption<MemorySize> PERSIST_SIZE_THRESHOLD =

136

ConfigOptions.key("state.changelog.dstl.dfs.persist-size-threshold")

137

.memoryType()

138

.defaultValue(MemorySize.parse("10MB"))

139

.withDescription("Size threshold for batch persistence.");

140

141

/**

142

* Buffer size for uploads. Default: 1MB.

143

*/

144

public static final ConfigOption<MemorySize> UPLOAD_BUFFER_SIZE =

145

ConfigOptions.key("state.changelog.dstl.dfs.upload-buffer-size")

146

.memoryType()

147

.defaultValue(MemorySize.parse("1MB"))

148

.withDescription("Buffer size for uploads.");

149

150

/**

151

* Number of upload threads. Default: 5.

152

*/

153

public static final ConfigOption<Integer> NUM_UPLOAD_THREADS =

154

ConfigOptions.key("state.changelog.dstl.dfs.num-upload-threads")

155

.intType()

156

.defaultValue(5)

157

.withDescription("Number of upload threads.");

158

159

/**

160

* Number of discard threads. Default: 1.

161

*/

162

public static final ConfigOption<Integer> NUM_DISCARD_THREADS =

163

ConfigOptions.key("state.changelog.dstl.dfs.num-discard-threads")

164

.intType()

165

.defaultValue(1)

166

.withDescription("Number of discard threads.");

167

168

/**

169

* Maximum in-flight data. Default: 100MB.

170

*/

171

public static final ConfigOption<MemorySize> IN_FLIGHT_DATA_LIMIT =

172

ConfigOptions.key("state.changelog.dstl.dfs.in-flight-data-limit")

173

.memoryType()

174

.defaultValue(MemorySize.parse("100MB"))

175

.withDescription("Maximum in-flight data.");

176

177

/**

178

* Retry policy for uploads. Default: "fixed".

179

*/

180

public static final ConfigOption<String> RETRY_POLICY =

181

ConfigOptions.key("state.changelog.dstl.dfs.retry-policy")

182

.stringType()

183

.defaultValue("fixed")

184

.withDescription("Retry policy for uploads.");

185

186

/**

187

* Upload timeout. Default: 1s.

188

*/

189

public static final ConfigOption<Duration> UPLOAD_TIMEOUT =

190

ConfigOptions.key("state.changelog.dstl.dfs.upload-timeout")

191

.durationType()

192

.defaultValue(Duration.ofSeconds(1))

193

.withDescription("Upload timeout.");

194

195

/**

196

* Maximum retry attempts. Default: 3.

197

*/

198

public static final ConfigOption<Integer> RETRY_MAX_ATTEMPTS =

199

ConfigOptions.key("state.changelog.dstl.dfs.retry-max-attempts")

200

.intType()

201

.defaultValue(3)

202

.withDescription("Maximum retry attempts.");

203

204

/**

205

* Delay between retries. Default: 500ms.

206

*/

207

public static final ConfigOption<Duration> RETRY_DELAY_AFTER_FAILURE =

208

ConfigOptions.key("state.changelog.dstl.dfs.retry-delay-after-failure")

209

.durationType()

210

.defaultValue(Duration.ofMillis(500))

211

.withDescription("Delay between retries.");

212

213

/**

214

* Cache file idle timeout. Default: 10min.

215

*/

216

public static final ConfigOption<Duration> CACHE_IDLE_TIMEOUT =

217

ConfigOptions.key("state.changelog.dstl.dfs.cache-idle-timeout")

218

.durationType()

219

.defaultValue(Duration.ofMinutes(10))

220

.withDescription("Cache file idle timeout.");

221

}

222

```

223

224

**Configuration Usage Examples:**

225

226

```java

227

import org.apache.flink.configuration.Configuration;

228

import org.apache.flink.changelog.fs.FsStateChangelogOptions;

229

230

Configuration config = new Configuration();

231

232

// Required: Set base path

233

config.set(FsStateChangelogOptions.BASE_PATH, "/path/to/changelog/storage");

234

235

// Optional: Enable compression

236

config.set(FsStateChangelogOptions.COMPRESSION_ENABLED, true);

237

238

// Optional: Adjust thresholds

239

config.set(FsStateChangelogOptions.PREEMPTIVE_PERSIST_THRESHOLD, MemorySize.parse("10MB"));

240

config.set(FsStateChangelogOptions.PERSIST_SIZE_THRESHOLD, MemorySize.parse("20MB"));

241

242

// Optional: Configure upload behavior

243

config.set(FsStateChangelogOptions.NUM_UPLOAD_THREADS, 10);

244

config.set(FsStateChangelogOptions.UPLOAD_TIMEOUT, Duration.ofSeconds(30));

245

config.set(FsStateChangelogOptions.RETRY_MAX_ATTEMPTS, 5);

246

247

// Use configuration with factory

248

FsStateChangelogStorageFactory factory = new FsStateChangelogStorageFactory();

249

StateChangelogStorage<?> storage = factory.createStorage(

250

jobID, config, metricGroup, localRecoveryConfig

251

);

252

```

253

254

### Recovery Storage View

255

256

Storage view implementation for recovery-only operations, used when reading existing changelog data.

257

258

```java { .api }

259

/**

260

* Recovery-only implementation of changelog storage

261

*/

262

@Experimental

263

@ThreadSafe

264

public class FsStateChangelogStorageForRecovery

265

implements StateChangelogStorageView<ChangelogStateHandleStreamImpl> {

266

267

/**

268

* Creates a reader for changelog handles

269

* @return StateChangelogHandleReader for reading changelog data

270

*/

271

public StateChangelogHandleReader<ChangelogStateHandleStreamImpl> createReader();

272

273

/**

274

* Closes the storage view and releases resources

275

* @throws Exception If closing fails

276

*/

277

public void close() throws Exception;

278

}

279

```

280

281

**Recovery Usage Example:**

282

283

```java

284

// Create storage view for recovery

285

FsStateChangelogStorageFactory factory = new FsStateChangelogStorageFactory();

286

StateChangelogStorageView<?> storageView = factory.createStorageView(config);

287

288

// Create reader for recovery

289

StateChangelogHandleReader<?> reader = storageView.createReader();

290

291

// Use reader to iterate through state changes

292

// (reader usage depends on specific changelog handles)

293

```