
# Upload System

Upload scheduling system with batching, throttling, and retry capabilities for efficient operations on distributed file systems. The upload system coordinates uploads from multiple changelog writers and applies backpressure when too much data is in flight.

## Capabilities

### StateChangeUploadScheduler Interface

Core interface for scheduling and managing upload operations with backpressure support.

```java { .api }
/**
 * Interface for scheduling state change uploads with backpressure control.
 * Implementations handle batching, scheduling, and coordination between multiple writers.
 */
public interface StateChangeUploadScheduler extends AutoCloseable {

    /**
     * Schedules an upload task for execution.
     * @param task Upload task containing change sets and completion callbacks
     */
    void upload(UploadTask task);

    /**
     * Returns the availability provider used for backpressure control.
     * @return AvailabilityProvider indicating when the scheduler can accept more tasks
     */
    AvailabilityProvider getAvailabilityProvider();

    /**
     * Closes the scheduler and releases all resources.
     * @throws Exception if cleanup fails
     */
    void close() throws Exception;

    /**
     * Creates a scheduler from configuration.
     * @param jobID Job identifier
     * @param config Flink configuration
     * @param metricGroup Metric group for collecting upload metrics
     * @param changelogRegistry Registry for managing state handle lifecycle
     * @param localRecoveryConfig Configuration for local recovery
     * @return Configured StateChangeUploadScheduler instance
     * @throws IOException if scheduler creation fails
     */
    static StateChangeUploadScheduler fromConfig(
            JobID jobID,
            Configuration config,
            ChangelogStorageMetricGroup metricGroup,
            TaskChangelogRegistry changelogRegistry,
            LocalRecoveryConfig localRecoveryConfig) throws IOException;

    /**
     * Creates a direct scheduler that uploads immediately.
     * @param uploader State change uploader implementation
     * @return Direct scheduler with no batching
     */
    static StateChangeUploadScheduler directScheduler(StateChangeUploader uploader);
}
```
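
To illustrate what "uploads immediately" means for `directScheduler`, here is a minimal, self-contained sketch in plain Java. It does not use Flink classes and is not Flink's implementation: `SketchUploader` and `DirectSchedulerSketch` are hypothetical stand-ins, change sets are modelled as strings, and both callbacks run on the caller's thread.

```java
import java.io.IOException;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical stand-in for StateChangeUploader: uploads change sets
// (modelled as strings) and returns one result string per change set.
interface SketchUploader {
    List<String> upload(List<String> changeSets) throws IOException;
}

// Hypothetical "direct" scheduler: no batching and no delay. Each upload()
// call goes straight to the uploader on the caller's thread.
class DirectSchedulerSketch {
    private final SketchUploader uploader;

    DirectSchedulerSketch(SketchUploader uploader) {
        this.uploader = uploader;
    }

    void upload(List<String> changeSets,
                Consumer<List<String>> successCallback,
                BiConsumer<List<String>, Throwable> failureCallback) {
        List<String> results;
        try {
            results = uploader.upload(changeSets);
        } catch (IOException e) {
            // Report the change sets that were not persisted.
            failureCallback.accept(changeSets, e);
            return;
        }
        // Invoked outside the try so a failing callback is not reported as an upload failure.
        successCallback.accept(results);
    }
}
```

Because nothing is buffered, every call costs one round trip to the file system; the batching scheduler described in this document trades a little latency for fewer round trips.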

### StateChangeUploader Interface

Interface for the actual upload operations to distributed file systems.

```java { .api }
/**
 * Interface for uploading state changes to distributed file systems.
 * Implementations handle the actual persistence operations.
 */
public interface StateChangeUploader extends AutoCloseable {

    /**
     * Executes upload tasks and returns results.
     * @param tasks Collection of upload tasks to execute
     * @return UploadTasksResult containing successful and failed uploads
     * @throws IOException if upload execution fails
     */
    UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException;

    /**
     * Closes the uploader and releases resources.
     * @throws Exception if cleanup fails
     */
    void close() throws Exception;
}
```

### Upload Task Structure

Upload tasks encapsulate the work to be performed and its completion callbacks.

```java { .api }
/**
 * Represents an upload task containing change sets and completion callbacks.
 */
public class UploadTask {

    /**
     * Creates an upload task.
     * @param changeSets Collection of state change sets to upload
     * @param successCallback Callback for successful uploads
     * @param failureCallback Callback for failed uploads
     */
    public UploadTask(
            Collection<StateChangeSet> changeSets,
            Consumer<List<UploadResult>> successCallback,
            BiConsumer<List<SequenceNumber>, Throwable> failureCallback);

    /**
     * Completes the task with upload results.
     * @param results List of upload results
     */
    public void complete(List<UploadResult> results);

    /**
     * Fails the task with an exception.
     * @param exception Failure cause
     */
    public void fail(Throwable exception);
}

/**
 * Result of executing upload tasks.
 */
public class UploadTasksResult {

    /**
     * Creates an upload result.
     * @param successful Map of successfully uploaded tasks to their results
     * @param failed Map of failed tasks to their exceptions
     */
    public UploadTasksResult(
            Map<UploadTask, List<UploadResult>> successful,
            Map<UploadTask, Throwable> failed);

    public Map<UploadTask, List<UploadResult>> getSuccessful();

    public Map<UploadTask, Throwable> getFailed();
}
```

**Usage Examples:**

```java
import org.apache.flink.changelog.fs.*;
import org.apache.flink.runtime.io.AvailabilityProvider;

import java.util.Arrays;
import java.util.Collection;

// Assumes jobId, config, metricGroup, changelogRegistry, localRecoveryConfig,
// changeSet1, changeSet2, nextTask and a logger `log` are already in scope.

// Create scheduler from configuration
StateChangeUploadScheduler scheduler = StateChangeUploadScheduler.fromConfig(
        jobId, config, metricGroup, changelogRegistry, localRecoveryConfig);

// Create upload task
Collection<StateChangeSet> changeSets = Arrays.asList(changeSet1, changeSet2);
UploadTask task = new UploadTask(
        changeSets,
        results -> {
            // Handle successful upload
            log.info("Uploaded {} change sets", results.size());
            for (UploadResult result : results) {
                log.debug("Uploaded sequence {}, size {}",
                        result.getSequenceNumber(), result.getSize());
            }
        },
        (failedSequenceNumbers, throwable) -> {
            // Handle upload failure
            log.error("Upload failed for sequences: {}", failedSequenceNumbers, throwable);
        });

// Schedule upload
scheduler.upload(task);

// Check backpressure before scheduling more work
AvailabilityProvider availability = scheduler.getAvailabilityProvider();
if (availability.isAvailable()) {
    scheduler.upload(nextTask);
} else {
    // Defer the next task until the scheduler signals that it is available again
    availability.getAvailableFuture().thenRun(() -> scheduler.upload(nextTask));
}
```
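
The callback contract of upload tasks can be modelled without Flink. In the sketch below, `SketchTask` and `SketchTasksResult` are hypothetical simplifications of `UploadTask` and `UploadTasksResult`: strings replace `StateChangeSet` and `UploadResult`, and the failure callback receives the failed change sets instead of sequence numbers. Routing the two result maps to `complete` and `fail` follows the error-handling pattern used in this document; the guard that ignores a second completion is an assumption added for illustration, not documented behavior.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical simplification of UploadTask.
class SketchTask {
    private final List<String> changeSets;
    private final Consumer<List<String>> successCallback;
    private final BiConsumer<List<String>, Throwable> failureCallback;
    // Assumed safeguard: only the first complete()/fail() call has an effect.
    private final AtomicBoolean finished = new AtomicBoolean(false);

    SketchTask(List<String> changeSets,
               Consumer<List<String>> successCallback,
               BiConsumer<List<String>, Throwable> failureCallback) {
        this.changeSets = changeSets;
        this.successCallback = successCallback;
        this.failureCallback = failureCallback;
    }

    void complete(List<String> results) {
        if (finished.compareAndSet(false, true)) {
            successCallback.accept(results);
        }
    }

    void fail(Throwable error) {
        if (finished.compareAndSet(false, true)) {
            failureCallback.accept(changeSets, error);
        }
    }
}

// Hypothetical simplification of UploadTasksResult.
class SketchTasksResult {
    final Map<SketchTask, List<String>> successful = new LinkedHashMap<>();
    final Map<SketchTask, Throwable> failed = new LinkedHashMap<>();

    // Completes the successful tasks first, then fails the failed ones.
    void dispatch() {
        successful.forEach(SketchTask::complete);
        failed.forEach(SketchTask::fail);
    }
}
```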

### BatchingStateChangeUploadScheduler

Implementation that batches multiple upload requests for efficiency.

```java { .api }
/**
 * Upload scheduler that batches requests to reduce the number of upload operations.
 * Collects upload tasks for a configurable delay period before executing them together.
 */
public class BatchingStateChangeUploadScheduler implements StateChangeUploadScheduler {

    /**
     * Creates a batching scheduler.
     * @param uploader Underlying uploader for executing batched requests
     * @param persistDelay Delay before executing batched uploads
     * @param persistSizeThreshold Size threshold to trigger immediate upload
     * @param inFlightDataLimit Maximum in-flight data for backpressure
     * @param executor Executor for running upload operations
     * @param metricGroup Metrics for tracking upload performance
     */
    public BatchingStateChangeUploadScheduler(
            StateChangeUploader uploader,
            Duration persistDelay,
            long persistSizeThreshold,
            long inFlightDataLimit,
            Executor executor,
            ChangelogStorageMetricGroup metricGroup);
}
```

The batching scheduler:

- Collects upload tasks for the configured `persistDelay` period
- Triggers an immediate upload when the accumulated size exceeds `persistSizeThreshold`
- Applies backpressure when in-flight data exceeds `inFlightDataLimit`
- Merges compatible tasks to reduce file system operations
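
The delay and size triggers in the list above can be sketched with a `ScheduledExecutorService`. This is a hypothetical model, not `BatchingStateChangeUploadScheduler` itself: a task is reduced to its size in bytes, `uploadBatch` stands in for the underlying `StateChangeUploader`, and in-flight limits and task merging are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical batching model: tasks are sizes in bytes; uploadBatch stands in for the uploader.
class BatchingSketch implements AutoCloseable {
    private final long persistDelayMillis;
    private final long persistSizeThreshold;
    private final Consumer<List<Long>> uploadBatch;
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final List<Long> pending = new ArrayList<>();
    private long pendingBytes = 0;
    private ScheduledFuture<?> scheduledFlush; // non-null while a delayed flush is armed

    BatchingSketch(long persistDelayMillis, long persistSizeThreshold, Consumer<List<Long>> uploadBatch) {
        this.persistDelayMillis = persistDelayMillis;
        this.persistSizeThreshold = persistSizeThreshold;
        this.uploadBatch = uploadBatch;
    }

    synchronized void upload(long taskSizeBytes) {
        pending.add(taskSizeBytes);
        pendingBytes += taskSizeBytes;
        if (pendingBytes >= persistSizeThreshold) {
            flush(); // size threshold reached: upload now
        } else if (scheduledFlush == null) {
            // First task of a new batch: upload after persistDelay at the latest.
            scheduledFlush = timer.schedule(this::flush, persistDelayMillis, TimeUnit.MILLISECONDS);
        }
    }

    synchronized void flush() {
        if (scheduledFlush != null) {
            scheduledFlush.cancel(false);
            scheduledFlush = null;
        }
        if (pending.isEmpty()) {
            return;
        }
        List<Long> batch = new ArrayList<>(pending);
        pending.clear();
        pendingBytes = 0;
        // Simplification: the batch is handed over while the lock is held.
        uploadBatch.accept(batch);
    }

    @Override
    public void close() {
        timer.shutdownNow();
    }
}
```

Calling `uploadBatch` under the lock keeps the sketch short; a real scheduler would more likely hand the batch to an executor (such as the `executor` constructor argument above) so that slow uploads do not block callers.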

### StateChangeFsUploader

Filesystem-specific implementation for uploading to distributed file systems.

```java { .api }
/**
 * Filesystem-based uploader for state changes.
 * Handles serialization, compression, and persistence to distributed file systems.
 */
public class StateChangeFsUploader extends AbstractStateChangeFsUploader {

    /**
     * Creates a filesystem uploader.
     * @param jobID Job identifier for organizing files
     * @param basePath Base path for changelog files
     * @param fileSystem FileSystem instance for the base path
     * @param compression Whether to enable compression
     * @param bufferSize Buffer size for write operations
     * @param metricGroup Metrics for tracking upload performance
     * @param changelogRegistry Registry for managing uploaded state
     */
    public StateChangeFsUploader(
            JobID jobID,
            Path basePath,
            org.apache.flink.core.fs.FileSystem fileSystem,
            boolean compression,
            int bufferSize,
            ChangelogStorageMetricGroup metricGroup,
            TaskChangelogRegistry changelogRegistry);
}
```
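
A rough picture of the serialization, compression, and persistence steps, under stated assumptions: change sets are modelled as strings and written as length-prefixed records, an in-memory byte array stands in for the file on the distributed file system, and `java.util.zip` GZIP is used only because it ships with the JDK (it is not necessarily the codec Flink uses). `FsWriteSketch` is a hypothetical name; `compression` and `bufferSize` mirror the constructor parameters above.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical write/read round trip for a batch of change sets.
class FsWriteSketch {
    static byte[] write(List<String> changeSets, boolean compression, int bufferSize) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream(); // stands in for the DFS file
        try {
            OutputStream out = compression ? new GZIPOutputStream(sink, bufferSize) : sink;
            try (DataOutputStream data = new DataOutputStream(new BufferedOutputStream(out, bufferSize))) {
                data.writeInt(changeSets.size());
                for (String changeSet : changeSets) {
                    byte[] bytes = changeSet.getBytes(StandardCharsets.UTF_8);
                    data.writeInt(bytes.length); // length prefix
                    data.write(bytes);
                }
            } // closing flushes the buffer and writes the GZIP trailer
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.toByteArray();
    }

    static List<String> read(byte[] file, boolean compression) {
        try {
            InputStream in = new ByteArrayInputStream(file);
            if (compression) {
                in = new GZIPInputStream(in);
            }
            try (DataInputStream data = new DataInputStream(in)) {
                int count = data.readInt();
                List<String> changeSets = new ArrayList<>(count);
                for (int i = 0; i < count; i++) {
                    byte[] bytes = new byte[data.readInt()];
                    data.readFully(bytes);
                    changeSets.add(new String(bytes, StandardCharsets.UTF_8));
                }
                return changeSets;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```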

### DuplicatingStateChangeFsUploader

Specialized uploader that creates both remote and local copies for recovery.

```java { .api }
/**
 * Uploader that creates duplicates for local recovery.
 * Writes to both distributed file system and local storage simultaneously.
 */
public class DuplicatingStateChangeFsUploader extends AbstractStateChangeFsUploader {

    /**
     * Creates a duplicating uploader.
     * @param remoteUploader Primary uploader for distributed file system
     * @param localUploader Secondary uploader for local storage
     */
    public DuplicatingStateChangeFsUploader(
            StateChangeFsUploader remoteUploader,
            StateChangeFsUploader localUploader);
}
```
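
Writing to the distributed file system and to local storage at the same time amounts to a tee on the output stream. The hypothetical `DuplicatingOutputStreamSketch` below shows that idea with plain `java.io` streams; it is not the Flink class. How a failed local write should be treated (fail the upload, or only drop the local copy) is a policy question this sketch sidesteps by letting every exception propagate.

```java
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical tee stream: every byte goes to the remote (primary) stream,
// then to the local (secondary) stream.
class DuplicatingOutputStreamSketch extends OutputStream {
    private final OutputStream remote;
    private final OutputStream local;

    DuplicatingOutputStreamSketch(OutputStream remote, OutputStream local) {
        this.remote = remote;
        this.local = local;
    }

    @Override
    public void write(int b) throws IOException {
        remote.write(b);
        local.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        remote.write(b, off, len);
        local.write(b, off, len);
    }

    @Override
    public void flush() throws IOException {
        remote.flush();
        local.flush();
    }

    @Override
    public void close() throws IOException {
        // Close both streams even if closing the remote one fails.
        try {
            remote.close();
        } finally {
            local.close();
        }
    }
}
```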

### Retry and Error Handling

The upload system integrates with retry policies and error handling:

```java { .api }
/**
 * Executor that applies retry policies to upload operations.
 */
public class RetryingExecutor {

    /**
     * Executes an operation with a retry policy.
     * @param operation Operation to execute
     * @param retryPolicy Retry policy for handling failures
     * @return Result of successful execution
     * @throws Exception if all retry attempts fail
     */
    public <T> T execute(
            Callable<T> operation,
            RetryPolicy retryPolicy) throws Exception;
}
```

**Error Handling Examples:**

```java
import org.apache.flink.changelog.fs.*;

import java.io.IOException;
import java.time.Duration;

// Configure a fixed retry policy
RetryPolicy retryPolicy = RetryPolicy.fixed(
        3,                                  // max attempts
        Duration.ofSeconds(5).toMillis(),   // timeout (ms)
        Duration.ofMillis(500).toMillis()); // delay after failure (ms)

// Assumes `uploader`, `tasks` and a logger `log` are in scope, and that the
// policy is applied by the executor wrapping the uploader (not passed here).
try {
    UploadTasksResult result = uploader.upload(tasks);

    // Process successful uploads
    result.getSuccessful().forEach((task, uploadResults) -> task.complete(uploadResults));

    // Handle failed uploads
    result.getFailed().forEach((task, exception) -> task.fail(exception));
} catch (IOException e) {
    log.error("Upload operation failed after retries", e);
    // Trigger checkpoint failure and recovery
}
```
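
The three arguments passed to `RetryPolicy.fixed` above (max attempts, timeout, delay after failure) can be read as a simple loop. The sketch below is a hypothetical `FixedRetrySketch`, not Flink's `RetryingExecutor`; it assumes the timeout bounds each attempt individually and that the delay is inserted only between failed attempts.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical fixed retry loop: up to maxAttempts tries, each bounded by
// timeoutMillis, with delayAfterFailureMillis between failed tries.
class FixedRetrySketch {
    static <T> T execute(Callable<T> operation, int maxAttempts,
                         long timeoutMillis, long delayAfterFailureMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            Exception lastFailure = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                Future<T> future = pool.submit(operation);
                try {
                    return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    future.cancel(true); // interrupt the slow attempt
                    lastFailure = e;
                } catch (ExecutionException e) {
                    // Unwrap the operation's own exception where possible.
                    lastFailure = e.getCause() instanceof Exception ? (Exception) e.getCause() : e;
                }
                if (attempt < maxAttempts) {
                    Thread.sleep(delayAfterFailureMillis);
                }
            }
            throw lastFailure; // all attempts failed
        } finally {
            pool.shutdownNow();
        }
    }
}
```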

### Throttling and Flow Control

Upload throttling prevents overwhelming the distributed file system:

```java { .api }
/**
 * Throttle for controlling upload rate and preventing system overload.
 */
public class UploadThrottle {

    /**
     * Requests permission to upload data.
     * @param size Size of data to upload
     * @return CompletableFuture that completes when upload is permitted
     */
    public CompletableFuture<Void> requestUpload(long size);

    /**
     * Notifies the throttle of a completed upload.
     * @param size Size of completed upload
     */
    public void uploadCompleted(long size);
}
```

The throttling system:

- Limits concurrent in-flight data based on `IN_FLIGHT_DATA_LIMIT`
- Provides backpressure to prevent memory exhaustion
- Coordinates across multiple writers and operators
- Integrates with Flink's availability provider system
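
An in-flight data limit of this kind can be modelled as a byte budget whose waiters are released in FIFO order. `InFlightThrottleSketch` below reuses the `requestUpload`/`uploadCompleted` names from the `UploadThrottle` listing, but the class and its admission rules are assumptions for illustration: in particular, a request that alone exceeds the limit is still admitted when nothing else is in flight, so oversized uploads cannot block forever.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical byte-budget throttle; not Flink's UploadThrottle.
class InFlightThrottleSketch {
    private static final class Waiter {
        final long size;
        final CompletableFuture<Void> permit = new CompletableFuture<>();

        Waiter(long size) {
            this.size = size;
        }
    }

    private final long limit;
    private final Deque<Waiter> waiters = new ArrayDeque<>();
    private long inFlight = 0;

    InFlightThrottleSketch(long limit) {
        this.limit = limit;
    }

    // Returns a future that completes once `size` bytes may be uploaded.
    synchronized CompletableFuture<Void> requestUpload(long size) {
        Waiter waiter = new Waiter(size);
        // Admit directly only if nobody is queued, so requests stay in FIFO order.
        if (waiters.isEmpty() && fits(size)) {
            inFlight += size;
            waiter.permit.complete(null);
        } else {
            waiters.addLast(waiter);
        }
        return waiter.permit;
    }

    // Releases `size` bytes and admits queued requests that now fit.
    void uploadCompleted(long size) {
        List<CompletableFuture<Void>> admitted = new ArrayList<>();
        synchronized (this) {
            inFlight -= size;
            while (!waiters.isEmpty() && fits(waiters.peekFirst().size)) {
                Waiter next = waiters.pollFirst();
                inFlight += next.size;
                admitted.add(next.permit);
            }
        }
        // Complete outside the lock so dependent callbacks never run while it is held.
        admitted.forEach(permit -> permit.complete(null));
    }

    synchronized long bytesInFlight() {
        return inFlight;
    }

    private boolean fits(long size) {
        // An idle throttle admits any single request.
        return inFlight == 0 || inFlight + size <= limit;
    }
}
```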

### Performance Optimization

The upload system includes several optimizations:

- **Batching**: Reduces filesystem operation overhead
- **Compression**: Reduces network and storage usage
- **Parallel uploads**: Multiple threads for concurrent operations
- **Buffer management**: Configurable buffer sizes for different workloads
- **Connection pooling**: Reuses filesystem connections when possible