# Upload System

The upload system schedules state change uploads to a distributed file system, with batching, throttling, and retry support. It coordinates uploads from multiple changelog writers and applies backpressure when too much data is in flight.

## Capabilities

### StateChangeUploadScheduler Interface

Core interface for scheduling and managing upload operations with backpressure support.

```java { .api }
/**
 * Interface for scheduling state change uploads with backpressure control.
 * Implementations handle batching, scheduling, and coordination between multiple writers.
 */
public interface StateChangeUploadScheduler extends AutoCloseable {

    /**
     * Schedules an upload task for execution
     * @param task Upload task containing change sets and completion callbacks
     */
    void upload(UploadTask task);

    /**
     * Returns availability provider for backpressure control
     * @return AvailabilityProvider indicating when scheduler can accept more tasks
     */
    AvailabilityProvider getAvailabilityProvider();

    /**
     * Closes the scheduler and releases all resources
     * @throws Exception if cleanup fails
     */
    void close() throws Exception;

    /**
     * Creates a scheduler from configuration
     * @param jobID Job identifier
     * @param config Flink configuration
     * @param metricGroup Metric group for collecting upload metrics
     * @param changelogRegistry Registry for managing state handle lifecycle
     * @param localRecoveryConfig Configuration for local recovery
     * @return Configured StateChangeUploadScheduler instance
     * @throws IOException if scheduler creation fails
     */
    static StateChangeUploadScheduler fromConfig(
        JobID jobID,
        Configuration config,
        ChangelogStorageMetricGroup metricGroup,
        TaskChangelogRegistry changelogRegistry,
        LocalRecoveryConfig localRecoveryConfig
    ) throws IOException;

    /**
     * Creates a direct scheduler that uploads immediately
     * @param uploader State change uploader implementation
     * @return Direct scheduler with no batching
     */
    static StateChangeUploadScheduler directScheduler(StateChangeUploader uploader);
}
```

### StateChangeUploader Interface

Interface for the actual upload operations to distributed file systems.

```java { .api }
/**
 * Interface for uploading state changes to distributed file systems.
 * Implementations handle the actual persistence operations.
 */
public interface StateChangeUploader extends AutoCloseable {

    /**
     * Executes upload tasks and returns results
     * @param tasks Collection of upload tasks to execute
     * @return UploadTasksResult containing successful and failed uploads
     * @throws IOException if upload execution fails
     */
    UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException;

    /**
     * Closes the uploader and releases resources
     * @throws Exception if cleanup fails
     */
    void close() throws Exception;
}
```

### Upload Task Structure

Upload tasks encapsulate the work to be performed and completion callbacks.

```java { .api }
/**
 * Represents an upload task containing change sets and completion callbacks
 */
public class UploadTask {

    /**
     * Creates an upload task
     * @param changeSets Collection of state change sets to upload
     * @param successCallback Callback for successful uploads
     * @param failureCallback Callback for failed uploads
     */
    public UploadTask(
        Collection<StateChangeSet> changeSets,
        Consumer<List<UploadResult>> successCallback,
        BiConsumer<List<SequenceNumber>, Throwable> failureCallback
    );

    /**
     * Completes the task with upload results
     * @param results List of upload results
     */
    public void complete(List<UploadResult> results);

    /**
     * Fails the task with an exception
     * @param exception Failure cause
     */
    public void fail(Throwable exception);
}

/**
 * Result of executing upload tasks
 */
public class UploadTasksResult {

    /**
     * Creates upload result
     * @param successful Map of successfully uploaded tasks to their results
     * @param failed Map of failed tasks to their exceptions
     */
    public UploadTasksResult(
        Map<UploadTask, List<UploadResult>> successful,
        Map<UploadTask, Throwable> failed
    );

    public Map<UploadTask, List<UploadResult>> getSuccessful();
    public Map<UploadTask, Throwable> getFailed();
}
```

**Usage Examples:**

```java
import org.apache.flink.changelog.fs.*;

import java.util.Arrays;
import java.util.Collection;

// jobId, config, metricGroup, changelogRegistry, localRecoveryConfig,
// changeSet1, changeSet2, nextTask, and log are assumed to be in scope.

// Create scheduler from configuration
StateChangeUploadScheduler scheduler = StateChangeUploadScheduler.fromConfig(
    jobId, config, metricGroup, changelogRegistry, localRecoveryConfig
);

// Create upload task
Collection<StateChangeSet> changeSets = Arrays.asList(changeSet1, changeSet2);
UploadTask task = new UploadTask(
    changeSets,
    results -> {
        // Handle successful upload
        log.info("Uploaded {} change sets", results.size());
        for (UploadResult result : results) {
            log.debug("Uploaded sequence {}, size {}",
                result.getSequenceNumber(), result.getSize());
        }
    },
    (failedSequenceNumbers, throwable) -> {
        // Handle upload failure
        log.error("Upload failed for sequences: {}", failedSequenceNumbers, throwable);
    }
);

// Schedule upload
scheduler.upload(task);

// Check backpressure
AvailabilityProvider availability = scheduler.getAvailabilityProvider();
if (!availability.isAvailable()) {
    // Wait until the scheduler can accept more tasks
    availability.getAvailableFuture().thenRun(() -> {
        // Scheduler is available again
        scheduler.upload(nextTask);
    });
}
```

### BatchingStateChangeUploadScheduler

Implementation that batches multiple upload requests for efficiency.

```java { .api }
/**
 * Upload scheduler that batches requests to reduce the number of upload operations.
 * Collects upload tasks for a configurable delay period before executing them together.
 */
public class BatchingStateChangeUploadScheduler implements StateChangeUploadScheduler {

    /**
     * Creates batching scheduler
     * @param uploader Underlying uploader for executing batched requests
     * @param persistDelay Delay before executing batched uploads
     * @param persistSizeThreshold Size threshold to trigger immediate upload
     * @param inFlightDataLimit Maximum in-flight data for backpressure
     * @param executor Executor for running upload operations
     * @param metricGroup Metrics for tracking upload performance
     */
    public BatchingStateChangeUploadScheduler(
        StateChangeUploader uploader,
        Duration persistDelay,
        long persistSizeThreshold,
        long inFlightDataLimit,
        Executor executor,
        ChangelogStorageMetricGroup metricGroup
    );
}
```

The batching scheduler:

- Collects upload tasks for the configured `persistDelay` period
- Triggers immediate upload when accumulated size exceeds `persistSizeThreshold`
- Provides backpressure when in-flight data exceeds `inFlightDataLimit`
- Merges compatible tasks to reduce filesystem operations
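The size-triggered part of this behavior can be sketched in plain Java. This is a simplified illustration under stated assumptions, not Flink's implementation: `BatchBufferSketch`, `add`, and `flush` are hypothetical names, payloads are modeled as byte arrays, the sink stands in for the underlying uploader, and the `persistDelay` timer is represented by an explicit call to `flush()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch: buffers payloads and hands them to a sink as one batch. */
class BatchBufferSketch {
    private final long sizeThreshold;           // plays the role of persistSizeThreshold
    private final Consumer<List<byte[]>> sink;  // stands in for the underlying uploader
    private final List<byte[]> pending = new ArrayList<>();
    private long pendingBytes;

    BatchBufferSketch(long sizeThreshold, Consumer<List<byte[]>> sink) {
        this.sizeThreshold = sizeThreshold;
        this.sink = sink;
    }

    /** Buffers a payload and flushes immediately once the accumulated size exceeds the threshold. */
    synchronized void add(byte[] payload) {
        pending.add(payload);
        pendingBytes += payload.length;
        if (pendingBytes > sizeThreshold) {
            flush();
        }
    }

    /** Emits everything buffered so far as a single batch; a timer would call this after the delay. */
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<byte[]> batch = new ArrayList<>(pending);
        pending.clear();
        pendingBytes = 0;
        sink.accept(batch);
    }

    synchronized int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        BatchBufferSketch buffer =
            new BatchBufferSketch(10, batch -> batchSizes.add(batch.size()));
        buffer.add(new byte[4]); // 4 bytes buffered
        buffer.add(new byte[4]); // 8 bytes buffered
        buffer.add(new byte[4]); // 12 bytes exceeds 10, so all three go out together
        buffer.add(new byte[1]); // stays buffered until the delay-based flush
        buffer.flush();          // simulates the persistDelay timer firing
        System.out.println(batchSizes); // prints [3, 1]
    }
}
```

A real scheduler would also need the in-flight limit and task merging described above; the sketch isolates only the two flush triggers.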
222
223
### StateChangeFsUploader
224
225
Filesystem-specific implementation for uploading to distributed file systems.
226
227
```java { .api }
228
/**
229
* Filesystem-based uploader for state changes.
230
* Handles serialization, compression, and persistence to distributed file systems.
231
*/
232
public class StateChangeFsUploader extends AbstractStateChangeFsUploader {
233
234
/**
235
* Creates filesystem uploader
236
* @param jobID Job identifier for organizing files
237
* @param basePath Base path for changelog files
238
* @param fileSystem FileSystem instance for the base path
239
* @param compression Whether to enable compression
240
* @param bufferSize Buffer size for write operations
241
* @param metricGroup Metrics for tracking upload performance
242
* @param changelogRegistry Registry for managing uploaded state
243
*/
244
public StateChangeFsUploader(
245
JobID jobID,
246
Path basePath,
247
org.apache.flink.core.fs.FileSystem fileSystem,
248
boolean compression,
249
int bufferSize,
250
ChangelogStorageMetricGroup metricGroup,
251
TaskChangelogRegistry changelogRegistry
252
);
253
}
254
```
255
256
### DuplicatingStateChangeFsUploader

Specialized uploader that creates both remote and local copies for recovery.

```java { .api }
/**
 * Uploader that creates duplicates for local recovery.
 * Writes to both distributed file system and local storage simultaneously.
 */
public class DuplicatingStateChangeFsUploader extends AbstractStateChangeFsUploader {

    /**
     * Creates duplicating uploader
     * @param remoteUploader Primary uploader for distributed file system
     * @param localUploader Secondary uploader for local storage
     */
    public DuplicatingStateChangeFsUploader(
        StateChangeFsUploader remoteUploader,
        StateChangeFsUploader localUploader
    );
}
```

### Retry and Error Handling

The upload system integrates with retry policies and error handling:

```java { .api }
/**
 * Executor that applies retry policies to upload operations
 */
public class RetryingExecutor {

    /**
     * Executes operation with retry policy
     * @param operation Operation to execute
     * @param retryPolicy Retry policy for handling failures
     * @return Result of successful execution
     * @throws Exception if all retry attempts fail
     */
    public <T> T execute(
        Callable<T> operation,
        RetryPolicy retryPolicy
    ) throws Exception;
}
```

**Error Handling Examples:**

```java
// Configure a fixed retry policy
RetryPolicy retryPolicy = RetryPolicy.fixed(
    3,                                 // max attempts
    Duration.ofSeconds(5).toMillis(),  // timeout, in milliseconds
    Duration.ofMillis(500).toMillis()  // delay after failure, in milliseconds
);

// Execute the upload and dispatch per-task results.
// Note: this call does not itself apply the retry policy configured above.
try {
    UploadTasksResult result = uploader.upload(tasks);

    // Process successful uploads
    result.getSuccessful().forEach((task, uploadResults) -> {
        task.complete(uploadResults);
    });

    // Handle failed uploads
    result.getFailed().forEach((task, exception) -> {
        task.fail(exception);
    });

} catch (IOException e) {
    log.error("Upload operation failed", e);
    // Trigger checkpoint failure and recovery
}
```
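To make the fixed retry policy concrete, the following is a minimal sketch of retrying an operation a bounded number of times with a constant delay between attempts. It is an illustration only and does not reproduce `RetryingExecutor` or `RetryPolicy`: `FixedRetrySketch` and `callWithRetry` are hypothetical names, and the per-attempt timeout is left out for brevity.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of a fixed retry loop: bounded attempts, constant delay. */
class FixedRetrySketch {

    /**
     * Calls the operation until it succeeds or maxAttempts is reached.
     * The last failure is rethrown when every attempt fails.
     */
    static <T> T callWithRetry(Callable<T> operation, int maxAttempts, long delayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.call();
            } catch (InterruptedException e) {
                // Do not retry on interruption; restore the flag and propagate.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMillis); // fixed delay after a failed attempt
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        String result = callWithRetry(() -> {
            // Fail twice with a transient error, then succeed on the third call.
            if (calls.incrementAndGet() < 3) {
                throw new IOException("transient failure");
            }
            return "uploaded";
        }, 3, 10);
        System.out.println(result + " after " + calls.get() + " attempts");
        // prints "uploaded after 3 attempts"
    }
}
```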

### Throttling and Flow Control

Upload throttling prevents overwhelming the distributed file system:

```java { .api }
/**
 * Throttle for controlling upload rate and preventing system overload
 */
public class UploadThrottle {

    /**
     * Requests permission to upload data
     * @param size Size of data to upload
     * @return CompletableFuture that completes when upload is permitted
     */
    public CompletableFuture<Void> requestUpload(long size);

    /**
     * Notifies throttle of completed upload
     * @param size Size of completed upload
     */
    public void uploadCompleted(long size);
}
```

The throttling system:

- Limits concurrent in-flight data based on `IN_FLIGHT_DATA_LIMIT`
- Provides backpressure to prevent memory exhaustion
- Coordinates across multiple writers and operators
- Integrates with Flink's availability provider system
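One way to picture an in-flight data limit is a byte counter with a queue of waiting requests. The sketch below borrows the method names `requestUpload` and `uploadCompleted` from the API summary above, but the class `InFlightLimiterSketch` and all of its internals are assumptions made for illustration, not the actual `UploadThrottle` logic.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch: admits uploads while the in-flight byte count stays within a limit. */
class InFlightLimiterSketch {

    /** A queued request together with the future handed back to its caller. */
    private static final class Waiter {
        final long size;
        final CompletableFuture<Void> permit = new CompletableFuture<>();

        Waiter(long size) {
            this.size = size;
        }
    }

    private final long limitBytes;
    private long inFlightBytes;
    private final Queue<Waiter> waiters = new ArrayDeque<>();

    InFlightLimiterSketch(long limitBytes) {
        this.limitBytes = limitBytes;
    }

    /** Returns a future that completes once the request is admitted (FIFO order). */
    synchronized CompletableFuture<Void> requestUpload(long size) {
        Waiter waiter = new Waiter(size);
        if (waiters.isEmpty() && fits(size)) {
            inFlightBytes += size;
            waiter.permit.complete(null);
        } else {
            waiters.add(waiter);
        }
        return waiter.permit;
    }

    /** Releases capacity and admits queued requests that now fit. */
    synchronized void uploadCompleted(long size) {
        inFlightBytes -= size;
        while (!waiters.isEmpty() && fits(waiters.peek().size)) {
            Waiter next = waiters.poll();
            inFlightBytes += next.size;
            // For simplicity the future is completed while holding the lock.
            next.permit.complete(null);
        }
    }

    /** An oversized request is admitted when nothing is in flight, so it cannot wait forever. */
    private boolean fits(long size) {
        return inFlightBytes == 0 || inFlightBytes + size <= limitBytes;
    }

    public static void main(String[] args) {
        InFlightLimiterSketch throttle = new InFlightLimiterSketch(100);
        CompletableFuture<Void> first = throttle.requestUpload(80);
        CompletableFuture<Void> second = throttle.requestUpload(50);
        System.out.println(first.isDone() + " " + second.isDone()); // prints "true false"
        throttle.uploadCompleted(80);
        System.out.println(second.isDone()); // prints "true"
    }
}
```

Callers chain their upload onto the returned future, so a writer that exceeds the limit is delayed rather than rejected, which is the backpressure effect described in the list above.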

### Performance Optimization

The upload system includes several optimizations:

- **Batching**: Reduces filesystem operation overhead
- **Compression**: Reduces network and storage usage
- **Parallel uploads**: Multiple threads for concurrent operations
- **Buffer management**: Configurable buffer sizes for different workloads
- **Connection pooling**: Reuses filesystem connections when possible