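# Split Assignment

Split assignment controls how the file splits discovered by a `FileSource` are distributed to its parallel readers. The assigner runs inside the source's split enumerator and answers each reader's request for work. It trades data locality off against an even spread of load.

## Capabilities

### FileSplitAssigner Interface

Core interface for managing the assignment of file splits to reader nodes. Readers pull work: whenever a reader asks for a split, the enumerator calls `getNext` with the hostname of that reader.
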
```java { .api }
/**
 * Interface for assigning file splits to reader nodes with locality and load balancing
 */
public interface FileSplitAssigner {
    /**
     * Gets the next split for assignment to a specific hostname
     * @param hostname Hostname of the requesting reader node (may be null if unknown)
     * @return Optional containing the next split, or empty if no splits are available
     */
    Optional<FileSourceSplit> getNext(String hostname);

    /**
     * Adds splits to the assignment queue, for example newly discovered splits
     * or splits returned after a reader failure
     * @param splits Collection of splits to add for assignment
     */
    void addSplits(Collection<FileSourceSplit> splits);

    /**
     * Returns all remaining unassigned splits (used when checkpointing)
     * @return Collection of splits not yet assigned
     */
    Collection<FileSourceSplit> remainingSplits();
}
```
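
The sketch below makes the pull-based contract concrete. It mimics how a bounded file source's enumerator drives an assigner from its `handleSplitRequest` callback. Each reader request calls `getNext`. An empty result is answered with a "no more splits" signal, after which that reader stops asking. `Split`, `Assigner`, `FifoAssigner` and `PullLoop.drive` are hypothetical stand-ins for `FileSourceSplit`, `FileSplitAssigner` and the enumerator, so the example runs without Flink on the classpath.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for FileSourceSplit: only an id is needed here.
record Split(String id) {}

// Hypothetical stand-in mirroring the three FileSplitAssigner methods.
interface Assigner {
    Optional<Split> getNext(String hostname);
    void addSplits(Collection<Split> splits);
    Collection<Split> remainingSplits();
}

// A FIFO assigner used only to exercise the driver below.
class FifoAssigner implements Assigner {
    private final Deque<Split> queue = new ArrayDeque<>();

    FifoAssigner(Collection<Split> splits) { queue.addAll(splits); }

    public Optional<Split> getNext(String hostname) { return Optional.ofNullable(queue.poll()); }
    public void addSplits(Collection<Split> splits) { queue.addAll(splits); }
    public Collection<Split> remainingSplits() { return new ArrayList<>(queue); }
}

class PullLoop {
    /**
     * Simulates one reader per host requesting splits in turn until each has been
     * told there are no more splits. Returns the split ids received per host.
     */
    static Map<String, List<String>> drive(Assigner assigner, List<String> hosts) {
        Map<String, List<String>> assigned = new LinkedHashMap<>();
        for (String host : hosts) assigned.put(host, new ArrayList<>());
        List<String> active = new ArrayList<>(hosts);
        while (!active.isEmpty()) {
            List<String> stillActive = new ArrayList<>();
            for (String host : active) {
                Optional<Split> next = assigner.getNext(host);
                if (next.isPresent()) {
                    assigned.get(host).add(next.get().id());
                    stillActive.add(host); // this reader will ask again
                }
                // else: "no more splits" was signalled, so this reader stops requesting
            }
            active = stillActive;
        }
        return assigned;
    }
}
```

In this bounded model an empty result ends a reader's work. An assigner should therefore return empty only when it holds no splits at all; otherwise splits it withholds may never be read.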
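
### FileSplitAssigner.Provider Interface

This factory interface, nested in `FileSplitAssigner`, creates assigner instances. It extends `Serializable` because the provider is configured on the client and shipped with the source to the JobManager. There, the enumerator calls `create` with the initial or restored splits. As a result, the assigner it creates does not need to be serializable. Because the interface has a single method, a constructor reference such as `LocalityAwareSplitAssigner::new` or a lambda can serve as a provider.

```java { .api }
/**
 * Factory interface for creating FileSplitAssigner instances
 */
public interface Provider extends Serializable {
    /**
     * Creates a new FileSplitAssigner with initial splits
     * @param splits Initial collection of splits to manage
     * @return FileSplitAssigner implementation
     */
    FileSplitAssigner create(Collection<FileSourceSplit> splits);
}
```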
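
The factory indirection can be illustrated without Flink. A lambda whose target interface extends `Serializable` is serialized together with the values it captures, while the assigner it creates never has to be serializable. The sketch uses hypothetical stand-ins (`Split`, `Assigner`, `AssignerProvider`, `SizeOrderedAssigner`) and plain Java serialization as a rough model of shipping the provider; Flink's own transport is not shown.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-ins for FileSourceSplit and FileSplitAssigner.
record Split(String id, long length) {}

interface Assigner {
    Collection<Split> remainingSplits();
}

// Shaped like FileSplitAssigner.Provider: a single method, and Serializable.
interface AssignerProvider extends Serializable {
    Assigner create(Collection<Split> splits);
}

// Deliberately NOT Serializable: only the provider is shipped, not the assigner.
class SizeOrderedAssigner implements Assigner {
    private final List<Split> ordered = new ArrayList<>();

    SizeOrderedAssigner(Collection<Split> splits, long threshold) {
        // Splits above the threshold first, then the rest, each group in input order.
        for (Split s : splits) if (s.length() > threshold) ordered.add(s);
        for (Split s : splits) if (s.length() <= threshold) ordered.add(s);
    }

    public Collection<Split> remainingSplits() { return new ArrayList<>(ordered); }
}

class ProviderShipping {
    static AssignerProvider sizeOrdered(long threshold) {
        // The lambda captures only a long, so it can be serialized.
        return splits -> new SizeOrderedAssigner(splits, threshold);
    }

    /** Serializes and deserializes a value, standing in for shipping it elsewhere. */
    static <T> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```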

### LocalityAwareSplitAssigner

This is the default assigner used by `FileSource` (`FileSource.DEFAULT_SPLIT_ASSIGNER`). It prefers to give a reader a split whose data is stored on the reader's host, and hands out a remote split only when no local one is left. Pass the constructor reference `LocalityAwareSplitAssigner::new` wherever a `FileSplitAssigner.Provider` is expected.

```java { .api }
/**
 * Split assigner that considers data locality for optimal performance.
 * Prefers assigning splits to nodes where the data is locally available.
 */
public class LocalityAwareSplitAssigner implements FileSplitAssigner {
    /**
     * Creates locality-aware assigner with initial splits
     * @param splits Initial collection of splits to manage
     */
    public LocalityAwareSplitAssigner(Collection<FileSourceSplit> splits);

    /**
     * Gets next split with locality preference for the given hostname
     * @param hostname Hostname of requesting reader (used for locality matching; may be null)
     * @return Optional containing a local split if one exists, otherwise a remote split;
     *         empty only when no splits remain
     */
    @Override
    public Optional<FileSourceSplit> getNext(String hostname);

    @Override
    public void addSplits(Collection<FileSourceSplit> splits);

    @Override
    public Collection<FileSourceSplit> remainingSplits();
}
```
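
The locality preference can be sketched as follows. This is a simplified model, not Flink's implementation. It first scans for a split that lists the requesting host. If none is found, it falls back to the remote split that lists the fewest hosts, a heuristic chosen for this sketch so that splits readable locally by many hosts stay available for them. The `Split` record is a hypothetical stand-in for `FileSourceSplit`, whose `hostnames()` plays the same role.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for FileSourceSplit: an id plus the hosts that store its data.
record Split(String id, String... hostnames) {
    boolean isLocalTo(String host) {
        return host != null && Arrays.asList(hostnames).contains(host);
    }
}

class LocalityFirstAssigner {
    private final List<Split> unassigned;

    LocalityFirstAssigner(Collection<Split> splits) {
        this.unassigned = new ArrayList<>(splits);
    }

    Optional<Split> getNext(String hostname) {
        if (unassigned.isEmpty()) {
            return Optional.empty();
        }
        // 1. Prefer a split whose data lives on the requesting host.
        for (int i = 0; i < unassigned.size(); i++) {
            if (unassigned.get(i).isLocalTo(hostname)) {
                return Optional.of(unassigned.remove(i));
            }
        }
        // 2. Otherwise hand out the remote split listing the fewest hosts
        //    (ties keep the earliest split).
        int best = 0;
        for (int i = 1; i < unassigned.size(); i++) {
            if (unassigned.get(i).hostnames().length < unassigned.get(best).hostnames().length) {
                best = i;
            }
        }
        return Optional.of(unassigned.remove(best));
    }
}
```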

### SimpleSplitAssigner

This basic assigner ignores locality and hands out the remaining splits in no particular order. Pass `SimpleSplitAssigner::new` as the provider.

```java { .api }
/**
 * Simple split assigner without locality consideration.
 * Suitable for scenarios where data locality is not important.
 */
public class SimpleSplitAssigner implements FileSplitAssigner {
    /**
     * Creates simple assigner with initial splits
     * @param splits Initial collection of splits to manage
     */
    public SimpleSplitAssigner(Collection<FileSourceSplit> splits);

    /**
     * Gets the next remaining split, ignoring the hostname
     * @param hostname Hostname (ignored by this implementation)
     * @return Optional containing the next available split, or empty if none remain
     */
    @Override
    public Optional<FileSourceSplit> getNext(String hostname);

    @Override
    public void addSplits(Collection<FileSourceSplit> splits);

    @Override
    public Collection<FileSourceSplit> remainingSplits();
}
```
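
Because readers pull splits, an assigner that ignores locality can be little more than a list. The following is a minimal sketch, not necessarily how Flink implements `SimpleSplitAssigner`, with a hypothetical `Split` stand-in. Taking from the end of an `ArrayList` makes each hand-out cheap, and the order carries no meaning.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Optional;

// Hypothetical stand-in for FileSourceSplit.
record Split(String id) {}

class ListAssigner {
    private final ArrayList<Split> splits;

    ListAssigner(Collection<Split> splits) {
        this.splits = new ArrayList<>(splits);
    }

    // The hostname is accepted for interface compatibility but ignored.
    Optional<Split> getNext(String hostname) {
        int size = splits.size();
        // Removing the last element avoids shifting the rest of the list.
        return size == 0 ? Optional.empty() : Optional.of(splits.remove(size - 1));
    }

    void addSplits(Collection<Split> more) {
        splits.addAll(more);
    }

    Collection<Split> remainingSplits() {
        return new ArrayList<>(splits);
    }
}
```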

**Usage Examples:**

```java
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner;
import org.apache.flink.connector.file.src.assigners.SimpleSplitAssigner;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;

// Locality-aware assignment (recommended when readers run on the hosts storing the data, e.g. HDFS)
FileSource<String> localitySource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("/hdfs/data"))
    .setSplitAssigner(LocalityAwareSplitAssigner::new)
    .build();

// Locality-agnostic assignment
FileSource<String> simpleSource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("/local/data"))
    .setSplitAssigner(SimpleSplitAssigner::new)
    .build();

// Default behavior: LocalityAwareSplitAssigner is used when no assigner is set
FileSource<String> defaultSource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data"))
    .build();
```

### Custom Split Assignment Implementation

This example implements a custom split assigner that prioritizes splits by size: splits longer than a threshold are handed out before the rest.

```java { .api }
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Queue;

import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;

/**
 * Example custom split assigner that prioritizes splits by size
 */
public class SizeBasedSplitAssigner implements FileSplitAssigner {
    private final Queue<FileSourceSplit> largeSplits = new ArrayDeque<>();
    private final Queue<FileSourceSplit> smallSplits = new ArrayDeque<>();
    private final long sizeThreshold;

    public SizeBasedSplitAssigner(Collection<FileSourceSplit> splits, long sizeThreshold) {
        this.sizeThreshold = sizeThreshold;
        enqueue(splits);
    }

    @Override
    public Optional<FileSourceSplit> getNext(String hostname) {
        // Hand out large splits first so long-running work starts early and small
        // splits fill in at the end; the hostname is ignored
        FileSourceSplit split = largeSplits.poll();
        if (split == null) {
            split = smallSplits.poll();
        }
        return Optional.ofNullable(split);
    }

    @Override
    public void addSplits(Collection<FileSourceSplit> splits) {
        // Newly discovered splits and splits returned after a reader failure are routed the same way
        enqueue(splits);
    }

    @Override
    public Collection<FileSourceSplit> remainingSplits() {
        // Return a copy so callers cannot modify the internal queues
        List<FileSourceSplit> remaining = new ArrayList<>(largeSplits);
        remaining.addAll(smallSplits);
        return remaining;
    }

    // Separate splits by size
    private void enqueue(Collection<FileSourceSplit> splits) {
        for (FileSourceSplit split : splits) {
            if (split.length() > sizeThreshold) {
                largeSplits.offer(split);
            } else {
                smallSplits.offer(split);
            }
        }
    }

    /** Serializable factory that carries the threshold to the enumerator */
    public static class Provider implements FileSplitAssigner.Provider {
        private static final long serialVersionUID = 1L;

        private final long sizeThreshold;

        public Provider(long sizeThreshold) {
            this.sizeThreshold = sizeThreshold;
        }

        @Override
        public FileSplitAssigner create(Collection<FileSourceSplit> splits) {
            return new SizeBasedSplitAssigner(splits, sizeThreshold);
        }
    }
}
```
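
Handing out the largest splits first, as `SizeBasedSplitAssigner` does, is the classic longest-processing-time-first (LPT) scheduling heuristic. The self-contained sketch below treats split length as processing time and all readers as identical (both simplifying assumptions). It simulates pull-based assignment and reports when the last reader finishes, so largest-first and smallest-first ordering can be compared. The `LptDemo` class is hypothetical.

```java
import java.util.Arrays;
import java.util.PriorityQueue;

class LptDemo {
    /**
     * Simulates readers that each request a new split as soon as they finish one.
     * Split lengths stand in for processing time; returns when the last reader finishes.
     */
    static long makespan(long[] splitLengthsInOrder, int readers) {
        // Min-heap of the times at which each reader becomes idle.
        PriorityQueue<Long> freeAt = new PriorityQueue<>();
        for (int r = 0; r < readers; r++) freeAt.add(0L);
        for (long length : splitLengthsInOrder) {
            long start = freeAt.poll(); // the earliest idle reader pulls the next split
            freeAt.add(start + length);
        }
        long end = 0;
        for (long t : freeAt) end = Math.max(end, t);
        return end;
    }

    /** Returns a sorted copy, largest first when descending is true. */
    static long[] sorted(long[] lengths, boolean descending) {
        long[] copy = lengths.clone();
        Arrays.sort(copy);
        if (descending) {
            for (int i = 0, j = copy.length - 1; i < j; i++, j--) {
                long tmp = copy[i];
                copy[i] = copy[j];
                copy[j] = tmp;
            }
        }
        return copy;
    }
}
```

With lengths `{8, 1, 1, 1, 1, 1, 1, 1, 1}` and two readers, largest-first finishes at time 8, while smallest-first finishes at 12 because the long split starts last. LPT is a heuristic, not a guarantee of optimal balance, and real processing time also depends on the format being read.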
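
### Advanced Split Assignment Patterns

The following examples show advanced split assignment strategies for specific use cases. Two properties of the file source shape these designs. First, readers pull splits, so a faster node already asks for work more often. Second, the bounded enumerator answers an empty `getNext` result by telling that reader there are no more splits. For that reason, the weighted assigner below never withholds a split while any remain. Node weights only decide whether a node receives the largest or the smallest remaining split. The hostname passed to `getNext` is that of the host running the requesting reader, and it may be `null`.

```java { .api }
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;

/**
 * Weighted split assigner that considers node capacity.
 * A node that has received no more than its weighted share of splits gets the
 * largest remaining split; a node ahead of its share gets the smallest one.
 * A split is always returned while any remain, because the bounded enumerator
 * treats an empty result as "no more splits" for the requesting reader.
 */
public class WeightedSplitAssigner implements FileSplitAssigner {
    private final Map<String, Integer> nodeWeights;
    private final Map<String, Integer> assignedCounts = new HashMap<>();
    // Remaining splits, kept sorted by length in descending order
    private final List<FileSourceSplit> availableSplits = new ArrayList<>();

    public WeightedSplitAssigner(Collection<FileSourceSplit> splits,
                                 Map<String, Integer> nodeWeights) {
        this.nodeWeights = new HashMap<>(nodeWeights);
        enqueue(splits);
    }

    @Override
    public Optional<FileSourceSplit> getNext(String hostname) {
        if (availableSplits.isEmpty()) {
            return Optional.empty();
        }
        // Largest split for nodes behind their share, smallest for nodes ahead of it
        int index = isBehindShare(hostname) ? 0 : availableSplits.size() - 1;
        assignedCounts.merge(hostname, 1, Integer::sum);
        return Optional.of(availableSplits.remove(index));
    }

    @Override
    public void addSplits(Collection<FileSourceSplit> splits) {
        enqueue(splits);
    }

    @Override
    public Collection<FileSourceSplit> remainingSplits() {
        return new ArrayList<>(availableSplits);
    }

    private void enqueue(Collection<FileSourceSplit> splits) {
        availableSplits.addAll(splits);
        availableSplits.sort(Comparator.comparingLong(FileSourceSplit::length).reversed());
    }

    // True if assigned/weight for this host is at most the cluster-wide ratio.
    // Unknown (or null) hostnames count with weight 1; HashMap accepts null keys.
    private boolean isBehindShare(String hostname) {
        long weight = nodeWeights.getOrDefault(hostname, 1);
        long assigned = assignedCounts.getOrDefault(hostname, 0);
        long totalWeight = 0;
        long totalAssigned = 0;
        for (Map.Entry<String, Integer> entry : nodeWeights.entrySet()) {
            totalWeight += entry.getValue();
            totalAssigned += assignedCounts.getOrDefault(entry.getKey(), 0);
        }
        // assigned / weight <= totalAssigned / totalWeight, compared without division
        return assigned * totalWeight <= totalAssigned * weight;
    }
}
```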

**Advanced Usage Examples:**

```java
import java.util.Map;

import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;

// Size-based assignment for mixed file sizes
long sizeThreshold = 64L * 1024 * 1024; // 64 MB
FileSource<String> sizeBasedSource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("/mixed-sizes"))
    .setSplitAssigner(new SizeBasedSplitAssigner.Provider(sizeThreshold))
    .build();

// Weighted assignment based on node capacity; keys must match the hostnames
// that the readers' hosts report
Map<String, Integer> nodeWeights = Map.of(
    "worker-1", 4,  // High-capacity node
    "worker-2", 2,  // Medium-capacity node
    "worker-3", 1   // Low-capacity node
);

// The lambda is a FileSplitAssigner.Provider, so everything it captures
// (here an immutable Map.of map) must be serializable
FileSource<String> weightedSource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data"))
    .setSplitAssigner(splits -> new WeightedSplitAssigner(splits, nodeWeights))
    .build();

// Combining a custom enumerator and assigner for complete control
// (CustomEnumerator and CustomSplitAssigner are placeholders for your own classes)
FileSource<String> fullyCustomSource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data"))
    .setFileEnumerator(() -> new CustomEnumerator())
    .setSplitAssigner(new CustomSplitAssigner.Provider())
    .build();
```
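
A provider lambda is serialized with the source, so everything it captures must be serializable as well. The sketch below shows a lambda that captures an immutable `Map.of` map serializing cleanly, while one that captures a non-serializable helper fails. It uses a hypothetical `AssignerProvider` interface shaped like `FileSplitAssigner.Provider`, a hypothetical `WeightTable` helper, and plain Java serialization.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;

// Hypothetical stand-in shaped like FileSplitAssigner.Provider.
interface AssignerProvider extends Serializable {
    Object create(Collection<String> splits);
}

// A helper that does not implement Serializable.
class WeightTable {
    final Map<String, Integer> weights;

    WeightTable(Map<String, Integer> weights) { this.weights = weights; }
}

class CaptureCheck {
    static AssignerProvider capturingMap(Map<String, Integer> weights) {
        return splits -> weights.size(); // captures a serializable Map.of instance
    }

    static AssignerProvider capturingHelper(WeightTable table) {
        return splits -> table.weights.size(); // captures a non-serializable object
    }

    /** Returns true if the value can be written with Java serialization. */
    static boolean isSerializable(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

A captured helper like this should either implement `Serializable` or be replaced by the serializable data it wraps.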
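
### Integration with Flink's Source Framework

Split assigners integrate with Flink's unified source framework through the file source's split enumerator, which also handles checkpointing. When a checkpoint is taken, the enumerator stores the assigner's `remainingSplits()` in a `PendingSplitsCheckpoint`. Splits already assigned to readers are checkpointed as reader state instead. On recovery, the stored splits are passed to `FileSplitAssigner.Provider.create(...)` to build a new assigner. Any other state an assigner keeps, such as per-host counters, is therefore not restored. The `SplitAssignerState` class below is an illustrative container for such state; it is not part of Flink's API.

```java { .api }
import java.util.Collection;
import java.util.Map;

import org.apache.flink.connector.file.src.FileSourceSplit;

/**
 * Illustrative container for split assigner state (not part of Flink's API)
 */
public class SplitAssignerState {
    private final Collection<FileSourceSplit> remainingSplits;
    private final Map<String, Object> assignerSpecificState;

    public SplitAssignerState(
            Collection<FileSourceSplit> remainingSplits,
            Map<String, Object> assignerSpecificState) {
        this.remainingSplits = remainingSplits;
        this.assignerSpecificState = assignerSpecificState;
    }

    public Collection<FileSourceSplit> getRemainingSplits() {
        return remainingSplits;
    }

    public Map<String, Object> getAssignerSpecificState() {
        return assignerSpecificState;
    }
}
```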
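
The checkpoint cycle can be modelled in plain Java. The model assumes, as in Flink's file source, that only `remainingSplits()` is stored and that a new assigner is rebuilt from it through the provider. In the sketch, an assigner-internal counter starts again from zero after recovery. `Split`, `CountingAssigner` and `SnapshotRestore` are hypothetical, and a `java.util.function.Function` stands in for the provider.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical stand-in for FileSourceSplit.
record Split(String id) {}

// Counts how many splits this instance has handed out; the count is not checkpointed.
class CountingAssigner {
    private final Deque<Split> queue;
    private int handedOut = 0;

    CountingAssigner(Collection<Split> splits) { this.queue = new ArrayDeque<>(splits); }

    Optional<Split> getNext(String hostname) {
        Split next = queue.poll();
        if (next != null) handedOut++;
        return Optional.ofNullable(next);
    }

    Collection<Split> remainingSplits() { return new ArrayList<>(queue); }

    int handedOut() { return handedOut; }
}

class SnapshotRestore {
    /** "Checkpoint": an immutable copy of the remaining splits only. */
    static List<Split> snapshot(CountingAssigner assigner) {
        return List.copyOf(assigner.remainingSplits());
    }

    /** "Recovery": a brand-new assigner built from the checkpointed splits. */
    static CountingAssigner restore(List<Split> checkpoint,
                                    Function<Collection<Split>, CountingAssigner> provider) {
        return provider.apply(checkpoint);
    }
}
```

An assigner that depends on extra state, such as the per-host counters of a weighted assigner, should therefore tolerate starting from a fresh state after a restore.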
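
## Error Handling

The split enumerator calls assigner methods from a single thread on the JobManager. Most failure modes therefore come from custom assigner code or from the sheer number of splits:

- **IllegalArgumentException**: thrown by custom assigners that validate their input, for example a negative size threshold or duplicate split IDs
- **ConcurrentModificationException**: arises only if an assigner's collections are modified while being iterated, for example when `remainingSplits()` returns a live view instead of a copy
- **OutOfMemoryError**: too many splits, or very large split metadata, held in enumerator memory on the JobManager

```java
import java.util.Optional;

import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
import org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner;

// `splits` is a Collection<FileSourceSplit> obtained elsewhere
try {
    FileSplitAssigner assigner = new LocalityAwareSplitAssigner(splits);
    Optional<FileSourceSplit> split = assigner.getNext("worker-node-1");

    if (split.isPresent()) {
        // Process the assigned split
    } else {
        // No split left for this request
    }
} catch (IllegalArgumentException e) {
    // Handle invalid parameters (e.g. rejected by a custom assigner)
} catch (RuntimeException e) {
    // Handle other assignment errors
}
```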
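
As an example of the first case, a custom assigner can reject duplicate split IDs up front with an `IllegalArgumentException`, because handing the same split to two readers would read its data twice. The sketch below only checks against splits that are currently queued. That way, a split handed out and later returned through `addSplits` (for example after a reader failure) is still accepted. `Split` and `ValidatingAssigner` are hypothetical stand-ins.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for FileSourceSplit.
record Split(String id) {}

final class ValidatingAssigner {
    private final Deque<Split> queue = new ArrayDeque<>();
    // IDs of the splits currently queued (not of every split ever seen).
    private final Set<String> queuedIds = new HashSet<>();

    ValidatingAssigner(Collection<Split> splits) { addSplits(splits); }

    void addSplits(Collection<Split> splits) {
        // Validate the whole batch before changing any state.
        Set<String> batch = new HashSet<>();
        for (Split s : splits) {
            if (s == null) throw new IllegalArgumentException("null split");
            if (queuedIds.contains(s.id()) || !batch.add(s.id())) {
                throw new IllegalArgumentException("duplicate split id: " + s.id());
            }
        }
        for (Split s : splits) {
            queue.add(s);
            queuedIds.add(s.id());
        }
    }

    Optional<Split> getNext(String hostname) {
        Split next = queue.poll();
        if (next != null) queuedIds.remove(next.id());
        return Optional.ofNullable(next);
    }
}
```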

## Performance Considerations

- Use `LocalityAwareSplitAssigner` (the default) when readers run on the hosts that store the data, as with HDFS DataNodes; object stores such as S3 usually report no useful locality, so the choice matters little there
- Use `SimpleSplitAssigner` for local file systems or when locality doesn't matter
- Consider node capacity and processing power when implementing custom assigners; since readers pull splits, faster nodes already request work more often
- Monitor split assignment patterns to ensure balanced load distribution
- Avoid creating many small splits: each one costs a request to the enumerator plus per-split bookkeeping in the enumerator and reader
- Consider file access patterns and reader parallelism when designing assignment strategies
- Use data structures that keep `getNext` cheap (for example, avoid a full scan per call) when there are many splits
- Balance locality optimization against load balancing requirements
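
For the monitoring point above, one simple measure is the ratio of the busiest reader's assigned bytes to the mean, where 1.0 means perfectly even. The sketch assumes the per-reader byte totals are collected elsewhere, for example by logging assignments in a custom assigner; that collection step and the `AssignmentBalance` class are hypothetical.

```java
import java.util.Map;

class AssignmentBalance {
    /**
     * Returns max / mean of the bytes assigned per reader.
     * 1.0 means perfectly even; 2.0 means the busiest reader got twice the average.
     */
    static double imbalance(Map<String, Long> bytesPerReader) {
        if (bytesPerReader.isEmpty()) {
            throw new IllegalArgumentException("no readers");
        }
        long max = 0;
        long total = 0;
        for (long bytes : bytesPerReader.values()) {
            max = Math.max(max, bytes);
            total += bytes;
        }
        if (total == 0) {
            return 1.0; // nothing assigned yet: treat as balanced
        }
        double mean = (double) total / bytesPerReader.size();
        return max / mean;
    }
}
```

For readers assigned 300, 100 and 200 bytes, the mean is 200 and the ratio is 1.5.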