# Upload Scheduling and Management

Upload scheduler interfaces and implementations for batching and coordinating state change uploads. The upload system provides pluggable strategies for persistence with retry mechanisms and availability tracking.

## Capabilities

### StateChangeUploadScheduler Interface

Core interface for scheduling upload tasks with support for batching and backpressure handling.

```java { .api }
/**
 * Interface for scheduling upload tasks for state changes
 */
@Internal
public interface StateChangeUploadScheduler extends AutoCloseable {

    /**
     * Schedules an upload task for execution
     * @param uploadTask Task containing state changes to upload
     * @throws IOException If the upload cannot be scheduled
     */
    void upload(UploadTask uploadTask) throws IOException;

    /**
     * Creates a direct scheduler that executes each upload immediately on the calling thread
     * @param uploader The uploader to use for executing tasks
     * @return StateChangeUploadScheduler that uploads directly
     */
    static StateChangeUploadScheduler directScheduler(StateChangeUploader uploader);

    /**
     * Creates a scheduler from configuration with batching and threading
     * @param jobID Job identifier
     * @param config Configuration containing scheduler settings
     * @param metricGroup Metrics for monitoring upload behavior
     * @param changelogRegistry Registry for tracking changelog segments
     * @param localRecoveryConfig Local recovery configuration
     * @return Configured StateChangeUploadScheduler instance
     * @throws IOException If scheduler creation fails
     */
    static StateChangeUploadScheduler fromConfig(
            JobID jobID,
            ReadableConfig config,
            ChangelogStorageMetricGroup metricGroup,
            TaskChangelogRegistry changelogRegistry,
            LocalRecoveryConfig localRecoveryConfig)
            throws IOException;

    /**
     * Returns availability provider for backpressure coordination
     * @return AvailabilityProvider indicating when scheduler can accept more uploads
     */
    default AvailabilityProvider getAvailabilityProvider() {
        // AVAILABLE is an already-completed future, so the scheduler is always available
        return () -> AvailabilityProvider.AVAILABLE;
    }
}
```

**Basic Scheduler Usage Example:**

```java
import org.apache.flink.changelog.fs.StateChangeFsUploader;
import org.apache.flink.changelog.fs.StateChangeSet;
import org.apache.flink.changelog.fs.StateChangeUploadScheduler;
import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask;

import java.util.Collections;

// Create a direct scheduler: each task is uploaded immediately, without batching
StateChangeFsUploader uploader = new StateChangeFsUploader(/* ... */);
StateChangeUploadScheduler directScheduler =
        StateChangeUploadScheduler.directScheduler(uploader);

// Create an upload task for one change set
// (logId, sequenceNumber and changes are assumed to be defined elsewhere)
StateChangeSet changeSet = new StateChangeSet(logId, sequenceNumber, changes);
UploadTask task = new UploadTask(
        Collections.singletonList(changeSet),
        results -> System.out.println("Upload completed: " + results),
        (sequenceNumbers, error) ->
                System.err.println("Upload failed for " + sequenceNumbers + ": " + error));

// Schedule upload
directScheduler.upload(task);

// Clean up
directScheduler.close();
```
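
The direct scheduler can be pictured as a thin wrapper that hands each task to the uploader as a one-element batch and completes the result right away. The sketch below models that idea with simplified stand-in types; `Task`, `BatchResult`, `Uploader`, `Scheduler` and `RecordingUploader` are hypothetical names, not Flink classes, and the sketch is an assumption about the behavior rather than the Flink implementation.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Simplified stand-ins for the changelog types; these are not the Flink classes.
public class DirectSchedulerSketch {

    /** Hypothetical task: payloads to upload plus a success callback. */
    static final class Task {
        final List<String> payloads;
        final Consumer<List<String>> onSuccess;

        Task(List<String> payloads, Consumer<List<String>> onSuccess) {
            this.payloads = payloads;
            this.onSuccess = onSuccess;
        }
    }

    /** Outcome of one uploader call; complete() fires the tasks' callbacks. */
    interface BatchResult {
        void complete();
    }

    interface Uploader extends AutoCloseable {
        BatchResult upload(Collection<Task> tasks) throws IOException;
    }

    interface Scheduler extends AutoCloseable {
        void upload(Task task) throws IOException;
    }

    /** Direct scheduling: no queue and no threads, one task per uploader call. */
    static Scheduler directScheduler(Uploader uploader) {
        return new Scheduler() {
            @Override
            public void upload(Task task) throws IOException {
                // Runs on the caller's thread; an IOException propagates to the caller
                uploader.upload(Collections.singletonList(task)).complete();
            }

            @Override
            public void close() throws Exception {
                uploader.close();
            }
        };
    }

    /** Test uploader that records the size of every batch it receives. */
    static final class RecordingUploader implements Uploader {
        final List<Integer> batchSizes = new ArrayList<>();

        @Override
        public BatchResult upload(Collection<Task> tasks) {
            batchSizes.add(tasks.size());
            return () -> tasks.forEach(t -> t.onSuccess.accept(t.payloads));
        }

        @Override
        public void close() {}
    }
}
```

Because nothing is queued, every call reaches the uploader as a batch of exactly one task, which is the trade-off against the batching scheduler described later.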

### UploadTask Definition

A task bundles one or more state change sets with the callbacks to invoke when their upload succeeds or fails.

```java { .api }
/**
 * Upload Task for StateChangeUploadScheduler
 */
@ThreadSafe
final class UploadTask {
    /** Collection of state change sets to upload */
    final Collection<StateChangeSet> changeSets;

    /** Callback invoked when upload completes successfully */
    final Consumer<List<UploadResult>> successCallback;

    /** Callback invoked when upload fails */
    final BiConsumer<List<SequenceNumber>, Throwable> failureCallback;

    /**
     * Creates upload task with change sets and callbacks
     * @param changeSets Collection of state change sets to upload
     * @param successCallback Callback for successful upload with results
     * @param failureCallback Callback for failed upload with sequence numbers and error
     */
    public UploadTask(
            Collection<StateChangeSet> changeSets,
            Consumer<List<UploadResult>> successCallback,
            BiConsumer<List<SequenceNumber>, Throwable> failureCallback);

    /**
     * Completes the task with successful results
     * @param results List of upload results
     */
    public void complete(List<UploadResult> results);

    /**
     * Fails the task with an error
     * @param error Throwable representing the failure
     */
    public void fail(Throwable error);

    /**
     * Gets total size of all change sets in this task
     * @return Total size in bytes
     */
    public long getSize();

    /**
     * Gets the collection of change sets
     * @return Collection of StateChangeSet objects
     */
    public Collection<StateChangeSet> getChangeSets();
}
```
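
`UploadTask` is marked `@ThreadSafe`, so a task may be finished from more than one thread. One plausible way to guarantee that only one of its callbacks ever fires is an `AtomicBoolean` guard, sketched below with hypothetical stand-in types (`ChangeSet`, `Task`; upload results are plain strings). The sketch also shows `getSize()` as a sum over the change sets and `fail()` reporting the sequence numbers of all change sets in the task, which is what the failure callback's signature suggests; it is not the Flink source.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public class UploadTaskSketch {

    /** Hypothetical stand-in for StateChangeSet: a sequence number and a size in bytes. */
    static final class ChangeSet {
        final long sequenceNumber;
        final long size;

        ChangeSet(long sequenceNumber, long size) {
            this.sequenceNumber = sequenceNumber;
            this.size = size;
        }
    }

    /** Task whose callbacks fire at most once, even if complete() and fail() race. */
    static final class Task {
        final List<ChangeSet> changeSets;
        final Consumer<List<String>> successCallback;
        final BiConsumer<List<Long>, Throwable> failureCallback;
        private final AtomicBoolean finished = new AtomicBoolean(false);

        Task(List<ChangeSet> changeSets,
             Consumer<List<String>> successCallback,
             BiConsumer<List<Long>, Throwable> failureCallback) {
            this.changeSets = changeSets;
            this.successCallback = successCallback;
            this.failureCallback = failureCallback;
        }

        void complete(List<String> results) {
            // Only the first caller of complete() or fail() flips the flag
            if (finished.compareAndSet(false, true)) {
                successCallback.accept(results);
            }
        }

        void fail(Throwable error) {
            if (finished.compareAndSet(false, true)) {
                // Report every sequence number in this task as not persisted
                List<Long> sequenceNumbers = changeSets.stream()
                        .map(cs -> cs.sequenceNumber)
                        .collect(Collectors.toList());
                failureCallback.accept(sequenceNumbers, error);
            }
        }

        long getSize() {
            // Total size of all change sets in the task
            return changeSets.stream().mapToLong(cs -> cs.size).sum();
        }
    }
}
```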

**UploadTask Usage Examples:**

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Create upload task with multiple change sets
Collection<StateChangeSet> changeSets = Arrays.asList(
        new StateChangeSet(logId1, sequenceNumber1, stateChanges1),
        new StateChangeSet(logId2, sequenceNumber2, stateChanges2));

// Success callback: receives the list of upload results
Consumer<List<UploadResult>> successCallback = uploadResults -> {
    for (UploadResult result : uploadResults) {
        System.out.println("Uploaded to: " + result.streamStateHandle);
        System.out.println("Offset: " + result.offset);
        System.out.println("Size: " + result.size);
    }
    // Hypothetical helper: update tracking or notify other components
    updateCheckpointTracking(uploadResults);
};

// Failure callback: receives the sequence numbers of the failed change sets and the error
BiConsumer<List<SequenceNumber>, Throwable> failureCallback = (sequenceNumbers, throwable) -> {
    System.err.println("Upload failed for sequences: " + sequenceNumbers);
    System.err.println("Error: " + throwable.getMessage());

    // Hypothetical helper: retry, fail checkpoint, etc.
    handleUploadFailure(sequenceNumbers, throwable);
};

// Create and schedule the task
UploadTask task = new UploadTask(changeSets, successCallback, failureCallback);
scheduler.upload(task);
```

### Configuration-Based Scheduler Creation

Factory method for creating schedulers with batching, threading, and retry configuration.

```java { .api }
/**
 * Creates scheduler from configuration with batching and advanced features
 * @param jobID Job identifier for naming and metrics
 * @param config Configuration containing scheduler settings
 * @param metricGroup Metrics group for monitoring
 * @param changelogRegistry Registry for tracking uploaded segments
 * @param localRecoveryConfig Local recovery configuration
 * @return Configured scheduler with batching and threading
 * @throws IOException If scheduler creation fails
 */
static StateChangeUploadScheduler fromConfig(
        JobID jobID,
        ReadableConfig config,
        ChangelogStorageMetricGroup metricGroup,
        TaskChangelogRegistry changelogRegistry,
        LocalRecoveryConfig localRecoveryConfig)
        throws IOException;
```

**Configuration-Based Scheduler Example:**

```java
import org.apache.flink.api.common.JobID;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.changelog.fs.StateChangeUploadScheduler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;

import java.time.Duration;

// Configure upload behavior
Configuration config = new Configuration();
config.set(FsStateChangelogOptions.BASE_PATH, "s3://my-bucket/changelog"); // placeholder path
config.set(FsStateChangelogOptions.NUM_UPLOAD_THREADS, 10);
config.set(FsStateChangelogOptions.UPLOAD_BUFFER_SIZE, MemorySize.parse("2MB"));
config.set(FsStateChangelogOptions.IN_FLIGHT_DATA_LIMIT, MemorySize.parse("200MB"));
config.set(FsStateChangelogOptions.RETRY_MAX_ATTEMPTS, 5);
config.set(FsStateChangelogOptions.UPLOAD_TIMEOUT, Duration.ofSeconds(30));

// Create scheduler with configuration
// (metricGroup, changelogRegistry and localRecoveryConfig are assumed to be provided)
StateChangeUploadScheduler scheduler = StateChangeUploadScheduler.fromConfig(
        new JobID(),
        config,
        metricGroup,
        changelogRegistry,
        localRecoveryConfig);

// Use scheduler with batching and threading
for (StateChangeSet changeSet : changeSets) {
    UploadTask task = createUploadTask(changeSet); // hypothetical helper
    scheduler.upload(task);
}
```

### StateChangeUploader Interface

Core uploader interface that handles the actual upload execution for collections of tasks.

```java { .api }
/**
 * Interface for uploading state change tasks
 */
@Internal
public interface StateChangeUploader extends AutoCloseable {

    /**
     * Uploads a collection of tasks and returns results
     * @param tasks Collection of upload tasks to execute
     * @return UploadTasksResult containing individual results
     * @throws IOException If upload operation fails
     */
    UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException;
}
```
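
Because `UploadTasksResult` holds one stream handle plus per-task offsets, an uploader can write all change sets of a batch into a single stream and remember where each one starts. The in-memory sketch below shows that bookkeeping with a `ByteArrayOutputStream`. Tasks are identified by name and change sets are plain strings, both simplifications; `SingleStreamUploaderSketch` and `BatchResult` are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SingleStreamUploaderSketch {

    /** Hypothetical batch result: one stream plus, per task, the offset of each change set. */
    static final class BatchResult {
        final byte[] stream;
        final Map<String, Map<String, Integer>> offsets;

        BatchResult(byte[] stream, Map<String, Map<String, Integer>> offsets) {
            this.stream = stream;
            this.offsets = offsets;
        }
    }

    /** Writes all change sets of all tasks into one stream, recording where each starts. */
    static BatchResult upload(Map<String, List<String>> tasks) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Map<String, Map<String, Integer>> offsets = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> task : tasks.entrySet()) {
            Map<String, Integer> taskOffsets = new LinkedHashMap<>();
            for (String changeSet : task.getValue()) {
                // The current stream size is the offset at which this change set begins
                taskOffsets.put(changeSet, out.size());
                byte[] bytes = changeSet.getBytes(StandardCharsets.UTF_8);
                out.write(bytes, 0, bytes.length);
            }
            offsets.put(task.getKey(), taskOffsets);
        }
        // A real uploader would write to durable storage and return a handle instead
        return new BatchResult(out.toByteArray(), offsets);
    }
}
```

Writing one stream per batch means one file (or object) per upload call rather than one per task, at the cost of tracking offsets.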

### UploadTasksResult Structure

Result of a batch upload: a handle to the single uploaded stream (plus an optional local handle when local recovery is enabled) and, for each task, the offsets of its change sets within that stream.

```java { .api }
/**
 * Result of executing one or more upload tasks
 */
final class UploadTasksResult {
    /** Mapping of tasks to their state change set offsets in the uploaded stream */
    private final Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> tasksOffsets;

    /** Handle to the uploaded remote stream */
    private final StreamStateHandle handle;

    /** Handle to the local backup stream (if local recovery enabled) */
    private final StreamStateHandle localHandle;

    /**
     * Creates result with task offsets and remote handle
     * @param tasksOffsets Mapping of tasks to their offsets in the stream
     * @param handle Remote stream handle
     */
    public UploadTasksResult(
            Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> tasksOffsets,
            StreamStateHandle handle);

    /**
     * Creates result with task offsets, remote and local handles
     * @param tasksOffsets Mapping of tasks to their offsets in the stream
     * @param handle Remote stream handle
     * @param localHandle Local stream handle (nullable)
     */
    public UploadTasksResult(
            Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> tasksOffsets,
            StreamStateHandle handle,
            @Nullable StreamStateHandle localHandle);

    /**
     * Completes all tasks in this result by calling their completion callbacks
     */
    public void complete();

    /**
     * Gets the total state size of the uploaded stream
     * @return Size in bytes
     */
    public long getStateSize();

    /**
     * Discards the uploaded state handle
     * @throws Exception If discard fails
     */
    public void discard() throws Exception;
}
```
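
`complete()` fans a batch result back out to the individual tasks: each task receives results that point into the shared stream at the offsets recorded for its own change sets. The sketch below mirrors that shape with hypothetical types (`Result`, `Task`, `TasksResult`). It keeps a single offset per change set, whereas the real map stores a `Tuple2<Long, Long>` per change set, so treat it as an illustration of the fan-out only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class TasksResultSketch {

    /** Hypothetical stand-in for UploadResult: the shared stream and an offset into it. */
    static final class Result {
        final String handle;
        final long offset;

        Result(String handle, long offset) {
            this.handle = handle;
            this.offset = offset;
        }

        @Override
        public String toString() {
            return handle + "@" + offset;
        }
    }

    static final class Task {
        final Consumer<List<Result>> successCallback;

        Task(Consumer<List<Result>> successCallback) {
            this.successCallback = successCallback;
        }
    }

    /** Same shape as UploadTasksResult, simplified to one offset per change set. */
    static final class TasksResult {
        final Map<Task, Map<String, Long>> tasksOffsets;
        final String handle;

        TasksResult(Map<Task, Map<String, Long>> tasksOffsets, String handle) {
            this.tasksOffsets = tasksOffsets;
            this.handle = handle;
        }

        /** Gives each task the results for its own change sets, all pointing into one stream. */
        void complete() {
            for (Map.Entry<Task, Map<String, Long>> entry : tasksOffsets.entrySet()) {
                List<Result> results = new ArrayList<>();
                for (long offset : entry.getValue().values()) {
                    results.add(new Result(handle, offset));
                }
                entry.getKey().successCallback.accept(results);
            }
        }
    }
}
```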

**Uploader Implementation Example:**

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StreamStateHandle;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Custom uploader implementation: writes all change sets of a batch into one stream
public class MyStateChangeUploader implements StateChangeUploader {

    @Override
    public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException {
        // For each task: the position of each of its change sets in the uploaded stream
        Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> tasksOffsets = new HashMap<>();

        // Hypothetical helper: serializes every change set of every task into a single
        // stream, fills tasksOffsets, and returns a handle to the written stream.
        // An IOException propagates to the caller (the scheduler).
        StreamStateHandle handle = writeToFileSystem(tasks, tasksOffsets);

        // Task callbacks are not invoked here: calling complete() on the returned
        // result invokes each task's completion callback.
        return new UploadTasksResult(tasksOffsets, handle);
    }

    private StreamStateHandle writeToFileSystem(
            Collection<UploadTask> tasks,
            Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> tasksOffsets)
            throws IOException {
        // Implementation-specific upload logic
        throw new UnsupportedOperationException("not implemented in this example");
    }

    @Override
    public void close() throws Exception {
        // Clean up resources
    }
}
```

### Batching Upload Scheduler

Internal implementation that groups upload tasks into batches, so that several tasks can be written by a single call to the underlying uploader.

```java { .api }
/**
 * Upload scheduler that batches tasks for efficient uploading
 */
@ThreadSafe
class BatchingStateChangeUploadScheduler implements StateChangeUploadScheduler {

    /**
     * Creates batching scheduler with configuration
     * @param uploader Underlying uploader for executing batches
     * @param maxBatchSize Maximum number of tasks per batch
     * @param batchTimeout Timeout for incomplete batches
     * @param executor Executor for upload operations
     */
    public BatchingStateChangeUploadScheduler(
            StateChangeUploader uploader,
            int maxBatchSize,
            Duration batchTimeout,
            Executor executor);
}
```
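
The batching idea can be sketched without Flink: tasks accumulate until `maxBatchSize` is reached, and a timer would call `flush()` once `batchTimeout` elapses so that a partial batch is not held indefinitely. `BatchingSketch` is hypothetical; the timer is left out to keep it deterministic, and each batch is handed to the `Executor`, mirroring the parameters listed above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

/** Hypothetical batching scheduler: flushes on a full batch or an explicit flush(). */
public class BatchingSketch<T> {
    private final int maxBatchSize;
    private final Consumer<List<T>> uploader;
    private final Executor executor;
    private final List<T> pending = new ArrayList<>();

    public BatchingSketch(int maxBatchSize, Consumer<List<T>> uploader, Executor executor) {
        if (maxBatchSize < 1) {
            throw new IllegalArgumentException("maxBatchSize must be >= 1");
        }
        this.maxBatchSize = maxBatchSize;
        this.uploader = uploader;
        this.executor = executor;
    }

    /** Queues a task and uploads the batch as soon as it is full. */
    public synchronized void upload(T task) {
        pending.add(task);
        if (pending.size() >= maxBatchSize) {
            flush();
        }
    }

    /** Uploads whatever is pending; a real scheduler would also call this from a timer. */
    public synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<T> batch = new ArrayList<>(pending);
        pending.clear();
        // With a thread pool, the upload runs off the caller's thread
        executor.execute(() -> uploader.accept(batch));
    }
}
```

Larger batches mean fewer, bigger uploads; the timeout bounds how long a task waits for companions before it is uploaded anyway.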

### Availability and Backpressure

Upload schedulers expose an `AvailabilityProvider` so that upstream components can hold back new uploads while the scheduler is saturated. The default implementation always reports the scheduler as available.

```java { .api }
/**
 * Returns availability provider for backpressure coordination
 * @return AvailabilityProvider indicating scheduler capacity
 */
default AvailabilityProvider getAvailabilityProvider();
```

**Backpressure Handling Example:**

```java
import org.apache.flink.runtime.io.AvailabilityProvider;

import java.io.IOException;

StateChangeUploadScheduler scheduler = StateChangeUploadScheduler.fromConfig(/* ... */);

// Check availability before scheduling uploads
AvailabilityProvider availability = scheduler.getAvailabilityProvider();

if (availability.isAvailable()) {
    // Scheduler can accept more uploads
    scheduler.upload(uploadTask);
} else {
    // Defer the upload until the scheduler becomes available again; the callback
    // runs on the thread that completes the availability future
    availability.getAvailableFuture().thenRun(() -> {
        try {
            scheduler.upload(uploadTask);
        } catch (IOException e) {
            System.err.println("Failed to schedule upload: " + e.getMessage());
        }
    });
}
```
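
A scheduler that caps in-flight data (compare `IN_FLIGHT_DATA_LIMIT` in the configuration example) could back its availability with a future that is replaced when the limit is exceeded and completed when enough uploads finish. The class below is a hypothetical sketch of that pattern using only `CompletableFuture`; its `isAvailable()` and `getAvailableFuture()` methods mirror the `AvailabilityProvider` calls, but it is not the Flink implementation.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical byte-based backpressure: unavailable while in-flight bytes exceed the limit. */
public class InFlightLimiterSketch {
    private final long limitBytes;
    private long inFlightBytes;
    // A completed future means "available"; an incomplete one completes when capacity returns
    private CompletableFuture<Void> availableFuture = CompletableFuture.completedFuture(null);

    public InFlightLimiterSketch(long limitBytes) {
        this.limitBytes = limitBytes;
    }

    /** Call when an upload is scheduled. */
    public synchronized void acquire(long bytes) {
        inFlightBytes += bytes;
        if (inFlightBytes > limitBytes && availableFuture.isDone()) {
            availableFuture = new CompletableFuture<>();
        }
    }

    /** Call when an upload finishes, successfully or not. */
    public void release(long bytes) {
        CompletableFuture<Void> toComplete = null;
        synchronized (this) {
            inFlightBytes -= bytes;
            if (inFlightBytes <= limitBytes && !availableFuture.isDone()) {
                toComplete = availableFuture;
                availableFuture = CompletableFuture.completedFuture(null);
            }
        }
        // Complete outside the lock so dependent callbacks do not run while it is held
        if (toComplete != null) {
            toComplete.complete(null);
        }
    }

    public synchronized boolean isAvailable() {
        return availableFuture.isDone();
    }

    public synchronized CompletableFuture<Void> getAvailableFuture() {
        return availableFuture;
    }
}
```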

### Retry Policy Integration

Upload schedulers apply a configurable retry policy to failed upload attempts, so that transient failures do not immediately fail the upload.

**Retry Configuration Example:**

```java
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.configuration.Configuration;

import java.time.Duration;

Configuration config = new Configuration();
config.set(FsStateChangelogOptions.BASE_PATH, "s3://my-bucket/changelog"); // placeholder path
config.set(FsStateChangelogOptions.RETRY_POLICY, "fixed");
config.set(FsStateChangelogOptions.RETRY_MAX_ATTEMPTS, 5);
config.set(FsStateChangelogOptions.RETRY_DELAY_AFTER_FAILURE, Duration.ofSeconds(2));
config.set(FsStateChangelogOptions.UPLOAD_TIMEOUT, Duration.ofSeconds(30));

StateChangeUploadScheduler scheduler = StateChangeUploadScheduler.fromConfig(
        jobID, config, metricGroup, changelogRegistry, localRecoveryConfig);

// The scheduler retries failed uploads according to this policy:
// a fixed policy with up to 5 attempts and a 2-second delay after each failure
```
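
A "fixed" retry policy can be reduced to a loop: try up to `maxAttempts` times and wait a fixed delay after each failure. `FixedRetrySketch.callWithRetry` is a hypothetical helper that illustrates how `RETRY_MAX_ATTEMPTS` and `RETRY_DELAY_AFTER_FAILURE` interact; it does not model `UPLOAD_TIMEOUT` and is not Flink's retry implementation.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

/** Hypothetical "fixed" retry: at most maxAttempts tries, a fixed pause after each failure. */
public class FixedRetrySketch {

    static <T> T callWithRetry(Callable<T> action, int maxAttempts, Duration delayAfterFailure)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                lastFailure = e;
                // Pause only if another attempt follows
                if (attempt < maxAttempts) {
                    Thread.sleep(delayAfterFailure.toMillis());
                }
            }
        }
        // All attempts failed: surface the last error to the caller
        throw lastFailure;
    }
}
```

With `maxAttempts = 5` and a 2-second delay, a permanently failing upload is abandoned after roughly 8 seconds of waiting plus the time spent in the five attempts themselves.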

### Error Handling and Monitoring

Failures surface in two places: `upload` throws an `IOException` if the task cannot be scheduled, and failures during the upload itself are reported to the task's failure callback together with the sequence numbers of the affected change sets. Upload behavior is also reported through the `ChangelogStorageMetricGroup` passed to `fromConfig`.

**Error Handling Example:**

```java
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.TimeoutException;

StateChangeUploadScheduler scheduler = StateChangeUploadScheduler.fromConfig(/* ... */);

// Handle different failure scenarios in the failure callback
UploadTask task = new UploadTask(
        Collections.singletonList(changeSet),
        results -> { /* success handling */ },
        (sequenceNumbers, throwable) -> {
            if (throwable instanceof IOException) {
                System.err.println("I/O error during upload: " + throwable.getMessage());
                // May retry or fail checkpoint
            } else if (throwable instanceof TimeoutException) {
                System.err.println("Upload timed out: " + throwable.getMessage());
                // May increase timeout or fail
            } else {
                System.err.println("Unexpected upload failure: " + throwable.getMessage());
                // Log and fail checkpoint
            }
        });

try {
    scheduler.upload(task);
} catch (IOException e) {
    System.err.println("Failed to schedule upload: " + e.getMessage());
    // Scheduler may be overloaded or closed
}
```