or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

changelog-writers.md, configuration-options.md, index.md, metrics-monitoring.md, recovery-system.md, storage-factory.md, storage-implementation.md, upload-system.md

docs/configuration-options.md

0

# Configuration Options

1

2

Comprehensive configuration options for performance tuning, retry policies, and storage behavior optimization. All options are declared with Flink's `ConfigOption` framework. Options whose keys were renamed still accept the old key as a deprecated fallback; for example, `BASE_PATH` also reads `dstl.dfs.base-path`.

3

4

## Capabilities

5

6

### FsStateChangelogOptions

7

8

Configuration options class containing all settings for filesystem-based changelog storage.

9

10

```java { .api }

11

/**

12

* Configuration options for FsStateChangelogStorage.

13

* All options use the "state.changelog.dstl.dfs" prefix.

14

*/

15

public class FsStateChangelogOptions {

16

17

/** Base path for storing changelog files */

18

public static final ConfigOption<String> BASE_PATH;

19

20

/** Enable/disable compression for changelog serialization */

21

public static final ConfigOption<Boolean> COMPRESSION_ENABLED;

22

23

/** Size threshold for preemptive persistence */

24

public static final ConfigOption<MemorySize> PREEMPTIVE_PERSIST_THRESHOLD;

25

26

/** Delay before persisting changelog after checkpoint trigger */

27

public static final ConfigOption<Duration> PERSIST_DELAY;

28

29

/** Size threshold for batched persistence operations */

30

public static final ConfigOption<MemorySize> PERSIST_SIZE_THRESHOLD;

31

32

/** Buffer size for upload operations */

33

public static final ConfigOption<MemorySize> UPLOAD_BUFFER_SIZE;

34

35

/** Number of threads for upload operations */

36

public static final ConfigOption<Integer> NUM_UPLOAD_THREADS;

37

38

/** Number of threads for discarding unused changelog data */

39

public static final ConfigOption<Integer> NUM_DISCARD_THREADS;

40

41

/** Maximum amount of data allowed to be in-flight */

42

public static final ConfigOption<MemorySize> IN_FLIGHT_DATA_LIMIT;

43

44

/** Retry policy for failed uploads */

45

public static final ConfigOption<String> RETRY_POLICY;

46

47

/** Timeout for individual upload operations */

48

public static final ConfigOption<Duration> UPLOAD_TIMEOUT;

49

50

/** Maximum number of retry attempts */

51

public static final ConfigOption<Integer> RETRY_MAX_ATTEMPTS;

52

53

/** Delay before next retry attempt after failure */

54

public static final ConfigOption<Duration> RETRY_DELAY_AFTER_FAILURE;

55

56

/** Cache idle timeout for recovery operations */

57

public static final ConfigOption<Duration> CACHE_IDLE_TIMEOUT;

58

}

59

```

60

61

### Storage Path Configuration

62

63

Configure the base path where changelog files are stored:

64

65

```java { .api }

66

/**

67

* Base path to store changelog files

68

* Key: "state.changelog.dstl.dfs.base-path"

69

* Deprecated key: "dstl.dfs.base-path"

70

* Type: String

71

* Required: Yes

72

*/

73

public static final ConfigOption<String> BASE_PATH =

74

ConfigOptions.key("state.changelog.dstl.dfs.base-path")

75

.stringType()

76

.noDefaultValue()

77

.withDeprecatedKeys("dstl.dfs.base-path")

78

.withDescription("Base path to store changelog files.");

79

```

80

81

**Usage Examples:**

82

83

```java

84

Configuration config = new Configuration();

85

86

// HDFS path

87

config.set(FsStateChangelogOptions.BASE_PATH, "hdfs://namenode:8020/flink/changelog");

88

89

// S3 path

90

config.set(FsStateChangelogOptions.BASE_PATH, "s3://my-bucket/changelog");

91

92

// Local filesystem (for testing)

93

config.set(FsStateChangelogOptions.BASE_PATH, "file:///tmp/changelog");

94

```

95

96

### Performance Tuning Options

97

98

Configure performance-related settings for optimal throughput:

99

100

```java { .api }

101

/**

102

* Enable compression when serializing changelog

103

* Key: "state.changelog.dstl.dfs.compression.enabled"

104

* Default: false

105

*/

106

public static final ConfigOption<Boolean> COMPRESSION_ENABLED;

107

108

/**

109

* Size threshold for preemptive persistence

110

* Key: "state.changelog.dstl.dfs.preemptive-persist-threshold"

111

* Default: 5MB

112

*/

113

public static final ConfigOption<MemorySize> PREEMPTIVE_PERSIST_THRESHOLD;

114

115

/**

116

* Buffer size used when uploading change sets

117

* Key: "state.changelog.dstl.dfs.upload.buffer-size"

118

* Default: 1MB

119

*/

120

public static final ConfigOption<MemorySize> UPLOAD_BUFFER_SIZE;

121

122

/**

123

* Number of threads to use for upload operations

124

* Key: "state.changelog.dstl.dfs.upload.num-threads"

125

* Default: 5

126

*/

127

public static final ConfigOption<Integer> NUM_UPLOAD_THREADS;

128

```

129

130

**Usage Examples:**

131

132

```java

133

Configuration config = new Configuration();

134

135

// Enable compression for better storage efficiency

136

config.set(FsStateChangelogOptions.COMPRESSION_ENABLED, true);

137

138

// Reduce preemptive threshold for faster checkpoints

139

config.set(FsStateChangelogOptions.PREEMPTIVE_PERSIST_THRESHOLD, MemorySize.parse("2MB"));

140

141

// Increase buffer size for high-throughput workloads

142

config.set(FsStateChangelogOptions.UPLOAD_BUFFER_SIZE, MemorySize.parse("4MB"));

143

144

// Increase upload threads for better parallelism

145

config.set(FsStateChangelogOptions.NUM_UPLOAD_THREADS, 10);

146

```

147

148

### Batching and Flow Control

149

150

Configure batching behavior and backpressure settings:

151

152

```java { .api }

153

/**

154

* Delay before persisting changelog after receiving persist request

155

* Key: "state.changelog.dstl.dfs.batch.persist-delay"

156

* Default: 10ms

157

*/

158

public static final ConfigOption<Duration> PERSIST_DELAY;

159

160

/**

161

* Size threshold for accumulated changes waiting for persist delay

162

* Key: "state.changelog.dstl.dfs.batch.persist-size-threshold"

163

* Default: 10MB

164

*/

165

public static final ConfigOption<MemorySize> PERSIST_SIZE_THRESHOLD;

166

167

/**

168

* Maximum amount of data allowed to be in-flight

169

* Key: "state.changelog.dstl.dfs.upload.max-in-flight"

170

* Default: 100MB

171

*/

172

public static final ConfigOption<MemorySize> IN_FLIGHT_DATA_LIMIT;

173

```

174

175

**Usage Examples:**

176

177

```java

178

// Configure batching for better efficiency

179

config.set(FsStateChangelogOptions.PERSIST_DELAY, Duration.ofMillis(50));

180

config.set(FsStateChangelogOptions.PERSIST_SIZE_THRESHOLD, MemorySize.parse("20MB"));

181

182

// Configure backpressure limit

183

config.set(FsStateChangelogOptions.IN_FLIGHT_DATA_LIMIT, MemorySize.parse("200MB"));

184

```

185

186

### Retry Policy Configuration

187

188

Configure retry behavior for failed upload operations:

189

190

```java { .api }

191

/**

192

* Retry policy for failed uploads

193

* Key: "state.changelog.dstl.dfs.upload.retry-policy"

194

* Default: "fixed"

195

* Valid values: "none", "fixed"

196

*/

197

public static final ConfigOption<String> RETRY_POLICY;

198

199

/**

200

* Upload timeout duration

201

* Key: "state.changelog.dstl.dfs.upload.timeout"

202

* Default: 1 second

203

*/

204

public static final ConfigOption<Duration> UPLOAD_TIMEOUT;

205

206

/**

207

* Maximum number of retry attempts

208

* Key: "state.changelog.dstl.dfs.upload.max-attempts"

209

* Default: 3

210

*/

211

public static final ConfigOption<Integer> RETRY_MAX_ATTEMPTS;

212

213

/**

214

* Delay before next retry attempt after failure

215

* Key: "state.changelog.dstl.dfs.upload.next-attempt-delay"

216

* Default: 500ms

217

*/

218

public static final ConfigOption<Duration> RETRY_DELAY_AFTER_FAILURE;

219

```

220

221

**Usage Examples:**

222

223

```java

224

// Configure aggressive retry policy

225

config.set(FsStateChangelogOptions.RETRY_POLICY, "fixed");

226

config.set(FsStateChangelogOptions.UPLOAD_TIMEOUT, Duration.ofSeconds(5));

227

config.set(FsStateChangelogOptions.RETRY_MAX_ATTEMPTS, 5);

228

config.set(FsStateChangelogOptions.RETRY_DELAY_AFTER_FAILURE, Duration.ofSeconds(1));

229

230

// Disable retries for fast-fail behavior

231

config.set(FsStateChangelogOptions.RETRY_POLICY, "none");

232

```

233

234

### Recovery and Cleanup Options

235

236

Configure recovery and cleanup behavior:

237

238

```java { .api }

239

/**

240

* Number of threads for discarding unused changelog data

241

* Key: "state.changelog.dstl.dfs.discard.num-threads"

242

* Default: 1

243

*/

244

public static final ConfigOption<Integer> NUM_DISCARD_THREADS;

245

246

/**

247

* Cache idle timeout for recovery operations

248

* Key: "state.changelog.dstl.dfs.download.local-cache.idle-timeout-ms"

249

* Default: 10 minutes

250

*/

251

public static final ConfigOption<Duration> CACHE_IDLE_TIMEOUT;

252

```

253

254

**Usage Examples:**

255

256

```java

257

// Increase discard threads for faster cleanup

258

config.set(FsStateChangelogOptions.NUM_DISCARD_THREADS, 3);

259

260

// Reduce cache timeout for memory efficiency

261

config.set(FsStateChangelogOptions.CACHE_IDLE_TIMEOUT, Duration.ofMinutes(5));

262

```

263

264

### Complete Configuration Example

265

266

```java

267

import org.apache.flink.configuration.Configuration;

268

import org.apache.flink.configuration.MemorySize;

import java.time.Duration;

269

import static org.apache.flink.changelog.fs.FsStateChangelogOptions.*;

270

271

// Complete configuration for high-throughput workload

272

Configuration config = new Configuration();

273

274

// Storage location

275

config.set(BASE_PATH, "hdfs://namenode:8020/flink/changelog");

276

277

// Performance tuning

278

config.set(COMPRESSION_ENABLED, true);

279

config.set(PREEMPTIVE_PERSIST_THRESHOLD, MemorySize.parse("3MB"));

280

config.set(UPLOAD_BUFFER_SIZE, MemorySize.parse("2MB"));

281

config.set(NUM_UPLOAD_THREADS, 8);

282

283

// Batching configuration

284

config.set(PERSIST_DELAY, Duration.ofMillis(20));

285

config.set(PERSIST_SIZE_THRESHOLD, MemorySize.parse("15MB"));

286

config.set(IN_FLIGHT_DATA_LIMIT, MemorySize.parse("150MB"));

287

288

// Retry configuration

289

config.set(RETRY_POLICY, "fixed");

290

config.set(UPLOAD_TIMEOUT, Duration.ofSeconds(3));

291

config.set(RETRY_MAX_ATTEMPTS, 4);

292

config.set(RETRY_DELAY_AFTER_FAILURE, Duration.ofMillis(750));

293

294

// Cleanup configuration

295

config.set(NUM_DISCARD_THREADS, 2);

296

config.set(CACHE_IDLE_TIMEOUT, Duration.ofMinutes(15));

297

```

298

299

### Configuration Validation

300

301

Important constraints and validation rules:

302

303

- `PERSIST_SIZE_THRESHOLD` must not exceed `IN_FLIGHT_DATA_LIMIT`

304

- `BASE_PATH` is required and must be accessible by all TaskManagers

305

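- `UPLOAD_TIMEOUT * RETRY_MAX_ATTEMPTS` should be less than the checkpoint timeout (`execution.checkpointing.timeout`). Note that `UPLOAD_TIMEOUT` only takes effect when `RETRY_POLICY` is `"fixed"`.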

306

- Thread counts (`NUM_UPLOAD_THREADS`, `NUM_DISCARD_THREADS`) should be sized for the CPU cores available to each TaskManager

307

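- Memory-size options (`UPLOAD_BUFFER_SIZE`, `PREEMPTIVE_PERSIST_THRESHOLD`, `PERSIST_SIZE_THRESHOLD`, `IN_FLIGHT_DATA_LIMIT`) should be sized with the TaskManager's available heap in mind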