or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.mdregistry.mdstorage-factory.mdstorage-implementation.mdupload-scheduling.mdwriters.md

upload-scheduling.mddocs/

0

# Upload Scheduling and Management

1

2

Interfaces and implementations for scheduling state change uploads, either directly or in batches. Schedulers delegate the actual upload to a pluggable `StateChangeUploader`, support configurable retries, and can signal backpressure through an `AvailabilityProvider`.

3

4

## Capabilities

5

6

### StateChangeUploadScheduler Interface

7

8

Core interface for scheduling upload tasks with support for batching and backpressure handling.

9

10

```java { .api }

11

/**

12

* Interface for scheduling upload tasks for state changes

13

*/

14

@Internal

15

public interface StateChangeUploadScheduler extends AutoCloseable {

16

17

/**

18

* Schedules an upload task for execution

19

* @param uploadTask Task containing state changes to upload

20

* @throws IOException If the upload cannot be scheduled

21

*/

22

void upload(UploadTask uploadTask) throws IOException;

23

24

/**

25

* Creates a direct scheduler that executes uploads immediately

26

* @param uploader The uploader to use for executing tasks

27

* @return StateChangeUploadScheduler that uploads directly

28

*/

29

static StateChangeUploadScheduler directScheduler(StateChangeUploader uploader);

30

31

/**

32

* Creates a scheduler from configuration with batching and threading

33

* @param jobID Job identifier

34

* @param config Configuration containing scheduler settings

35

* @param metricGroup Metrics for monitoring upload behavior

36

* @param changelogRegistry Registry for tracking changelog segments

37

* @param localRecoveryConfig Local recovery configuration

38

* @return Configured StateChangeUploadScheduler instance

39

* @throws IOException If scheduler creation fails

40

*/

41

static StateChangeUploadScheduler fromConfig(

42

JobID jobID,

43

ReadableConfig config,

44

ChangelogStorageMetricGroup metricGroup,

45

TaskChangelogRegistry changelogRegistry,

46

LocalRecoveryConfig localRecoveryConfig

47

) throws IOException;

48

49

/**

50

* Returns availability provider for backpressure coordination

51

* @return AvailabilityProvider indicating when scheduler can accept more uploads

52

*/

53

default AvailabilityProvider getAvailabilityProvider() {

54

return AvailabilityProvider.AVAILABLE;

55

}

56

}

57

```

58

59

**Basic Scheduler Usage Example:**

60

61

```java

62

import org.apache.flink.changelog.fs.StateChangeUploadScheduler;

63

import org.apache.flink.changelog.fs.StateChangeFsUploader;

64

65

// Create direct scheduler for immediate uploads

66

StateChangeFsUploader uploader = new StateChangeFsUploader(/* ... */);

67

StateChangeUploadScheduler directScheduler =

68

StateChangeUploadScheduler.directScheduler(uploader);

69

70

// Create upload task

71

StateChangeSet changeSet = new StateChangeSet(logId, sequenceNumber, changes);

72

UploadTask task = new UploadTask();

73

task.changeset = changeSet;

74

task.onCompleted = result -> System.out.println("Upload completed: " + result);

75

task.onFailed = throwable -> System.err.println("Upload failed: " + throwable);

76

77

// Schedule upload

78

directScheduler.upload(task);

79

80

// Clean up

81

directScheduler.close();

82

```

83

84

### UploadTask Definition

85

86

A unit of upload work: a collection of state change sets together with the callbacks invoked when their upload succeeds or fails.

87

88

```java { .api }

89

/**

90

* Upload Task for StateChangeUploadScheduler

91

*/

92

@ThreadSafe

93

final class UploadTask {

94

/** Collection of state change sets to upload */

95

final Collection<StateChangeSet> changeSets;

96

97

/** Callback invoked when upload completes successfully */

98

final Consumer<List<UploadResult>> successCallback;

99

100

/** Callback invoked when upload fails */

101

final BiConsumer<List<SequenceNumber>, Throwable> failureCallback;

102

103

/**

104

* Creates upload task with change sets and callbacks

105

* @param changeSets Collection of state change sets to upload

106

* @param successCallback Callback for successful upload with results

107

* @param failureCallback Callback for failed upload with sequence numbers and error

108

*/

109

public UploadTask(

110

Collection<StateChangeSet> changeSets,

111

Consumer<List<UploadResult>> successCallback,

112

BiConsumer<List<SequenceNumber>, Throwable> failureCallback

113

);

114

115

/**

116

* Completes the task with successful results

117

* @param results List of upload results

118

*/

119

public void complete(List<UploadResult> results);

120

121

/**

122

* Fails the task with an error

123

* @param error Throwable representing the failure

124

*/

125

public void fail(Throwable error);

126

127

/**

128

* Gets total size of all change sets in this task

129

* @return Total size in bytes

130

*/

131

public long getSize();

132

133

/**

134

* Gets the collection of change sets

135

* @return Collection of StateChangeSet objects

136

*/

137

public Collection<StateChangeSet> getChangeSets();

138

}

139

```

140

141

**UploadTask Usage Examples:**

142

143

```java

144

// Create upload task with multiple change sets

145

Collection<StateChangeSet> changeSets = Arrays.asList(

146

new StateChangeSet(logId1, sequenceNumber1, stateChanges1),

147

new StateChangeSet(logId2, sequenceNumber2, stateChanges2)

148

);

149

150

// Success callback - receives list of results

151

Consumer<List<UploadResult>> successCallback = uploadResults -> {

152

for (UploadResult result : uploadResults) {

153

System.out.println("Uploaded to: " + result.streamStateHandle);

154

System.out.println("Offset: " + result.offset);

155

System.out.println("Size: " + result.size);

156

}

157

// Update tracking or notify other components

158

updateCheckpointTracking(uploadResults);

159

};

160

161

// Failure callback - receives sequence numbers and error

162

BiConsumer<List<SequenceNumber>, Throwable> failureCallback = (sequenceNumbers, throwable) -> {

163

System.err.println("Upload failed for sequences: " + sequenceNumbers);

164

System.err.println("Error: " + throwable.getMessage());

165

166

// Handle failure: retry, fail checkpoint, etc.

167

handleUploadFailure(sequenceNumbers, throwable);

168

};

169

170

// Create and schedule the task

171

UploadTask task = new UploadTask(changeSets, successCallback, failureCallback);

172

scheduler.upload(task);

173

```

174

175

### Configuration-Based Scheduler Creation

176

177

Factory method for creating schedulers with batching, threading, and retry configuration.

178

179

```java { .api }

180

/**

181

* Creates scheduler from configuration with batching and advanced features

182

* @param jobID Job identifier for naming and metrics

183

* @param config Configuration containing scheduler settings

184

* @param metricGroup Metrics group for monitoring

185

* @param changelogRegistry Registry for tracking uploaded segments

186

* @param localRecoveryConfig Local recovery configuration

187

* @return Configured scheduler with batching and threading

188

* @throws IOException If scheduler creation fails

189

*/

190

static StateChangeUploadScheduler fromConfig(

191

JobID jobID,

192

ReadableConfig config,

193

ChangelogStorageMetricGroup metricGroup,

194

TaskChangelogRegistry changelogRegistry,

195

LocalRecoveryConfig localRecoveryConfig

196

) throws IOException;

197

```

198

199

**Configuration-Based Scheduler Example:**

200

201

```java

202

import org.apache.flink.changelog.fs.FsStateChangelogOptions;

203

204

// Configure upload behavior

205

Configuration config = new Configuration();

206

config.set(FsStateChangelogOptions.NUM_UPLOAD_THREADS, 10);

207

config.set(FsStateChangelogOptions.UPLOAD_BUFFER_SIZE, MemorySize.parse("2MB"));

208

config.set(FsStateChangelogOptions.IN_FLIGHT_DATA_LIMIT, MemorySize.parse("200MB"));

209

config.set(FsStateChangelogOptions.RETRY_MAX_ATTEMPTS, 5);

210

config.set(FsStateChangelogOptions.UPLOAD_TIMEOUT, Duration.ofSeconds(30));

211

212

// Create scheduler with configuration

213

StateChangeUploadScheduler scheduler = StateChangeUploadScheduler.fromConfig(

214

new JobID(),

215

config,

216

metricGroup,

217

changelogRegistry,

218

localRecoveryConfig

219

);

220

221

// Use scheduler with batching and threading

222

for (StateChangeSet changeSet : changeSets) {

223

UploadTask task = createUploadTask(changeSet);

224

scheduler.upload(task);

225

}

226

```

227

228

### StateChangeUploader Interface

229

230

Core uploader interface that handles the actual upload execution for collections of tasks.

231

232

```java { .api }

233

/**

234

* Interface for uploading state change tasks

235

*/

236

@Internal

237

public interface StateChangeUploader extends AutoCloseable {

238

239

/**

240

* Uploads a collection of tasks and returns results

241

* @param tasks Collection of upload tasks to execute

242

* @return UploadTasksResult containing individual results

243

* @throws IOException If upload operation fails

244

*/

245

UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException;

246

}

247

```

248

249

### UploadTasksResult Structure

250

251

Result structure containing outcomes of batch upload operations with task-to-offset mappings.

252

253

```java { .api }

254

/**

255

* Result of executing one or more upload tasks

256

*/

257

final class UploadTasksResult {

258

/** Mapping of tasks to their state change set offsets in the uploaded stream */

259

private final Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> tasksOffsets;

260

261

/** Handle to the uploaded remote stream */

262

private final StreamStateHandle handle;

263

264

/** Handle to the local backup stream (if local recovery enabled) */

265

private final StreamStateHandle localHandle;

266

267

/**

268

* Creates result with task offsets and remote handle

269

* @param tasksOffsets Mapping of tasks to their offsets in the stream

270

* @param handle Remote stream handle

271

*/

272

public UploadTasksResult(

273

Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> tasksOffsets,

274

StreamStateHandle handle

275

);

276

277

/**

278

* Creates result with task offsets, remote and local handles

279

* @param tasksOffsets Mapping of tasks to their offsets in the stream

280

* @param handle Remote stream handle

281

* @param localHandle Local stream handle (nullable)

282

*/

283

public UploadTasksResult(

284

Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> tasksOffsets,

285

StreamStateHandle handle,

286

@Nullable StreamStateHandle localHandle

287

);

288

289

/**

290

* Completes all tasks in this result by calling their completion callbacks

291

*/

292

public void complete();

293

294

/**

295

* Gets the total state size of the uploaded stream

296

* @return Size in bytes

297

*/

298

public long getStateSize();

299

300

/**

301

* Discards the uploaded state handle

302

* @throws Exception If discard fails

303

*/

304

public void discard() throws Exception;

305

}

306

```

307

308

**Uploader Implementation Example:**

309

310

```java

311

// Custom uploader implementation

312

public class MyStateChangeUploader implements StateChangeUploader {

313

314

@Override

315

public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException {

316

List<UploadResult> successful = new ArrayList<>();

317

Map<UploadTask, Throwable> failed = new HashMap<>();

318

319

for (UploadTask task : tasks) {

320

try {

321

// Perform upload operation

322

UploadResult result = performUpload(task.changeset);

323

successful.add(result);

324

task.onCompleted.accept(result);

325

} catch (Exception e) {

326

failed.put(task, e);

327

task.onFailed.accept(e);

328

}

329

}

330

331

return new UploadTasksResult(successful, failed);

332

}

333

334

private UploadResult performUpload(StateChangeSet changeSet) throws IOException {

335

// Implementation-specific upload logic

336

StreamStateHandle handle = writeToFileSystem(changeSet);

337

return UploadResult.of(handle, null, changeSet, 0, 0);

338

}

339

340

@Override

341

public void close() throws Exception {

342

// Clean up resources

343

}

344

}

345

```

346

347

### Batching Upload Scheduler

348

349

Internal implementation that batches upload tasks for efficiency.

350

351

```java { .api }

352

/**

353

* Upload scheduler that batches tasks for efficient uploading

354

*/

355

@ThreadSafe

356

class BatchingStateChangeUploadScheduler implements StateChangeUploadScheduler {

357

358

/**

359

* Creates batching scheduler with configuration

360

* @param uploader Underlying uploader for executing batches

361

* @param maxBatchSize Maximum number of tasks per batch

362

* @param batchTimeout Timeout for incomplete batches

363

* @param executor Executor for upload operations

364

*/

365

public BatchingStateChangeUploadScheduler(

366

StateChangeUploader uploader,

367

int maxBatchSize,

368

Duration batchTimeout,

369

Executor executor

370

);

371

}

372

```

373

374

### Availability and Backpressure

375

376

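Upload schedulers expose backpressure through an `AvailabilityProvider`, so upstream components can wait until the scheduler is able to accept more uploads. The interface's default implementation always reports the scheduler as available.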

377

378

```java { .api }

379

/**

380

* Returns availability provider for backpressure coordination

381

* @return AvailabilityProvider indicating scheduler capacity

382

*/

383

default AvailabilityProvider getAvailabilityProvider();

384

```

385

386

**Backpressure Handling Example:**

387

388

```java

389

StateChangeUploadScheduler scheduler = StateChangeUploadScheduler.fromConfig(/* ... */);

// Check availability before scheduling uploads
AvailabilityProvider availability = scheduler.getAvailabilityProvider();

if (availability.isAvailable()) {
    // Scheduler can accept more uploads
    scheduler.upload(uploadTask);
} else {
    // Defer the upload until the scheduler signals that it has capacity again
    availability.getAvailableFuture().thenRun(() -> {
        try {
            scheduler.upload(uploadTask);
        } catch (IOException e) {
            // Report the failure through the task so its failure callback runs;
            // merely printing the error would silently lose the task.
            uploadTask.fail(e);
        }
    });
}

407

```

408

409

### Retry Policy Integration

410

411

Upload schedulers integrate with retry policies for handling transient failures.

412

413

**Retry Configuration Example:**

414

415

```java

416

// Fixed-delay retry policy: at most 5 attempts, waiting 2 seconds after each failure,
// with a 30-second upload timeout
Configuration retryConfig = new Configuration();
retryConfig.set(FsStateChangelogOptions.RETRY_POLICY, "fixed");
retryConfig.set(FsStateChangelogOptions.RETRY_MAX_ATTEMPTS, 5);
retryConfig.set(FsStateChangelogOptions.RETRY_DELAY_AFTER_FAILURE, Duration.ofSeconds(2));
retryConfig.set(FsStateChangelogOptions.UPLOAD_TIMEOUT, Duration.ofSeconds(30));

// Failed uploads are retried automatically according to the policy configured above
StateChangeUploadScheduler scheduler =
        StateChangeUploadScheduler.fromConfig(
                jobID,
                retryConfig,
                metricGroup,
                changelogRegistry,
                localRecoveryConfig);

425

426

// Scheduler will automatically retry failed uploads according to policy

427

```

428

429

### Error Handling and Monitoring

430

431

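Failures surface in two places: `upload(...)` throws an `IOException` when a task cannot be scheduled, and a task's failure callback receives the affected sequence numbers and the cause when its upload fails.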

432

433

**Error Handling Example:**

434

435

```java

436

StateChangeUploadScheduler scheduler = StateChangeUploadScheduler.fromConfig(/* ... */);

// Handle different failure scenarios. The failure callback receives the sequence numbers
// of the affected change sets together with the cause.
BiConsumer<List<SequenceNumber>, Throwable> onFailure = (sequenceNumbers, throwable) -> {
    if (throwable instanceof IOException) {
        System.err.println("I/O error during upload: " + throwable.getMessage());
        // May retry or fail checkpoint
    } else if (throwable instanceof TimeoutException) {
        System.err.println("Upload timed out: " + throwable.getMessage());
        // May increase timeout or fail
    } else {
        System.err.println("Unexpected upload failure: " + throwable.getMessage());
        // Log and fail checkpoint
    }
};

// UploadTask is created through its constructor: change sets, success callback, failure callback
UploadTask task = new UploadTask(
        Collections.singletonList(changeSet),
        results -> { /* e.g. record the results for the checkpoint */ },
        onFailure);

try {
    scheduler.upload(task);
} catch (IOException e) {
    System.err.println("Failed to schedule upload: " + e.getMessage());
    // Scheduler may be overloaded or closed
}

461

```