
# Split Assignment

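Split assignment controls which file split is handed to which source reader. When a reader requests work, the file source's split enumerator passes the reader's hostname to a `FileSplitAssigner`. The assigner can use that hostname to prefer splits whose data is stored on the same host (locality) while keeping every reader supplied with work (load balancing).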

## Capabilities

### FileSplitAssigner Interface

Core interface for managing the assignment of file splits to readers. It lives in the package `org.apache.flink.connector.file.src.assigners`.

```java { .api }

/**
 * Assigns file splits to readers, optionally taking data locality into account
 */
public interface FileSplitAssigner {

    /**
     * Gets the next split for the reader on the given host
     * @param hostname Hostname of the requesting reader, or null if unknown
     * @return The next split, or an empty Optional if no split is currently available
     */
    Optional<FileSourceSplit> getNext(String hostname);

    /**
     * Adds splits for assignment, e.g. newly discovered splits or splits returned by a failed reader
     * @param splits Splits to add for assignment
     */
    void addSplits(Collection<FileSourceSplit> splits);

    /**
     * Returns all splits that have not been assigned yet (the enumerator checkpoints these)
     * @return Splits not yet assigned
     */
    Collection<FileSourceSplit> remainingSplits();
}

```
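The contract is easiest to see in a small, dependency-free sketch. `Split`, `SplitAssigner`, and `FifoAssigner` below are hypothetical stand-ins that mirror the shape of `FileSourceSplit` and `FileSplitAssigner`; they are not Flink classes. The sketch hands out splits in arrival order and ignores the hostname.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for FileSourceSplit: an id plus a length in bytes.
record Split(String id, long length) {}

// Hypothetical stand-in mirroring the three FileSplitAssigner methods (not the Flink interface).
interface SplitAssigner {
    Optional<Split> getNext(String hostname); // hostname may be null
    void addSplits(Collection<Split> splits);
    Collection<Split> remainingSplits();
}

// Minimal assigner: hands out splits in insertion order and ignores the hostname.
final class FifoAssigner implements SplitAssigner {
    private final ArrayDeque<Split> pending;

    FifoAssigner(Collection<Split> initial) {
        this.pending = new ArrayDeque<>(initial);
    }

    @Override
    public Optional<Split> getNext(String hostname) {
        // An empty Optional means "nothing to hand out right now".
        return Optional.ofNullable(pending.poll());
    }

    @Override
    public void addSplits(Collection<Split> splits) {
        // New or returned splits join the back of the queue.
        pending.addAll(splits);
    }

    @Override
    public Collection<Split> remainingSplits() {
        // Return a copy so callers cannot mutate internal state.
        return new ArrayList<>(pending);
    }
}
```

`addSplits` is also how a split comes back to the assigner, for example when the reader holding it fails, so an assigner should accept splits it has already handed out once.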

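### FileSplitAssigner.Provider Interface

Factory for creating `FileSplitAssigner` instances. It is nested in `FileSplitAssigner` and extends `Serializable`, because the provider is configured on the `FileSource` and shipped with it when the job is submitted.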

```java { .api }

/**
 * Factory for FileSplitAssigner instances (nested in FileSplitAssigner)
 */
public interface Provider extends Serializable {
    /**
     * Creates a new FileSplitAssigner with initial splits
     * @param splits Initial collection of splits to manage
     * @return FileSplitAssigner implementation
     */
    FileSplitAssigner create(Collection<FileSourceSplit> splits);
}

```
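Because `Provider` extends `Serializable`, a lambda or constructor reference that targets it is itself serializable, which is what allows a provider to travel with the serialized source. The sketch below checks this with a plain Java serialization round trip; `ListAssigner`, `AssignerProvider`, and `ProviderDemo` are hypothetical stand-ins, not Flink types.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical assigner stand-in; only its constructor shape matters here.
final class ListAssigner {
    final List<String> splits;

    ListAssigner(Collection<String> splits) {
        this.splits = new ArrayList<>(splits);
    }
}

// Hypothetical stand-in mirroring FileSplitAssigner.Provider: a Serializable functional interface.
interface AssignerProvider extends Serializable {
    ListAssigner create(Collection<String> splits);
}

final class ProviderDemo {
    // The target type is Serializable, so this constructor reference is serializable too.
    static AssignerProvider provider() {
        return ListAssigner::new;
    }

    // Serializes and deserializes a value, similar to shipping it inside a job definition.
    static <T> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```

A provider that captures values, such as a size threshold or a map of weights, is only serializable if everything it captures is serializable as well.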

### LocalityAwareSplitAssigner

Default split assigner used by `FileSource`. It prefers giving a reader a split whose data is stored on the reader's host and falls back to a remote split when no local split is left.

```java { .api }

/**
 * Split assigner that considers data locality
 * Prefers assigning splits to readers on hosts where the data is stored locally
 */
public class LocalityAwareSplitAssigner implements FileSplitAssigner {
    /**
     * Creates a locality-aware assigner with initial splits
     * @param splits Initial collection of splits to manage
     */
    public LocalityAwareSplitAssigner(Collection<FileSourceSplit> splits);

    /**
     * Gets the next split, preferring splits local to the given hostname
     * @param hostname Hostname of the requesting reader (used for locality matching; may be null)
     * @return A local split if one exists, otherwise any remaining split; empty if none remain
     */
    @Override
    public Optional<FileSourceSplit> getNext(String hostname);

    @Override
    public void addSplits(Collection<FileSourceSplit> splits);

    @Override
    public Collection<FileSourceSplit> remainingSplits();

    // The constructor matches FileSplitAssigner.Provider, so a provider is obtained with
    // LocalityAwareSplitAssigner::new (FileSource.DEFAULT_SPLIT_ASSIGNER is defined this way)
}

```
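The core idea of locality-aware assignment is "a local split if there is one, otherwise any split". The sketch below illustrates only that idea and is not Flink's `LocalityAwareSplitAssigner`, whose bookkeeping is more elaborate; `LocSplit` and `PreferLocalAssigner` are hypothetical, and the linear scan is chosen for clarity rather than speed.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for FileSourceSplit: an id plus the hosts that store its data.
record LocSplit(String id, List<String> hosts) {}

// Simplified locality-preferring assigner (illustration only).
final class PreferLocalAssigner {
    private final List<LocSplit> pending = new ArrayList<>();

    PreferLocalAssigner(Collection<LocSplit> splits) {
        pending.addAll(splits);
    }

    Optional<LocSplit> getNext(String hostname) {
        if (hostname != null) {
            // First pass: look for a split whose data lives on the requesting host.
            Iterator<LocSplit> it = pending.iterator();
            while (it.hasNext()) {
                LocSplit split = it.next();
                if (split.hosts().contains(hostname)) {
                    it.remove();
                    return Optional.of(split);
                }
            }
        }
        // Fallback: hand out a remote split rather than leave the reader idle.
        return pending.isEmpty() ? Optional.empty() : Optional.of(pending.remove(0));
    }
}
```

Falling back to a remote split keeps readers busy; refusing remote splits would trade throughput for locality.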

### SimpleSplitAssigner

Basic split assigner that ignores locality and hands out splits in no particular order.

```java { .api }

/**
 * Simple split assigner without locality consideration; splits are handed out in no particular order
 * Suitable for scenarios where data locality is not important
 */
public class SimpleSplitAssigner implements FileSplitAssigner {
    /**
     * Creates a simple assigner with initial splits
     * @param splits Initial collection of splits to manage
     */
    public SimpleSplitAssigner(Collection<FileSourceSplit> splits);

    /**
     * Gets the next available split, ignoring the hostname
     * @param hostname Hostname (ignored by this implementation)
     * @return The next available split, or empty if none remain
     */
    @Override
    public Optional<FileSourceSplit> getNext(String hostname);

    @Override
    public void addSplits(Collection<FileSourceSplit> splits);

    @Override
    public Collection<FileSourceSplit> remainingSplits();

    // A FileSplitAssigner.Provider is obtained with the constructor reference SimpleSplitAssigner::new
}

```

**Usage Examples:**

```java

import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner;
import org.apache.flink.connector.file.src.assigners.SimpleSplitAssigner;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;

// Locality-aware assignment (useful when readers run on the hosts that store the data, e.g. HDFS)
FileSource<String> localitySource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("/hdfs/data"))
        .setSplitAssigner(LocalityAwareSplitAssigner::new)
        .build();

// Locality-agnostic assignment
FileSource<String> simpleSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("/local/data"))
        .setSplitAssigner(SimpleSplitAssigner::new)
        .build();

// Default behavior: FileSource uses LocalityAwareSplitAssigner (FileSource.DEFAULT_SPLIT_ASSIGNER)
FileSource<String> defaultSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data"))
        .build();

```

### Custom Split Assignment Implementation

Example of a custom split assigner that hands out splits larger than a size threshold before smaller ones.

```java { .api }

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;

/**
 * Example custom split assigner that hands out large splits before small ones
 */
public class SizeBasedSplitAssigner implements FileSplitAssigner {
    private final Queue<FileSourceSplit> largeSplits = new ArrayDeque<>();
    private final Queue<FileSourceSplit> smallSplits = new ArrayDeque<>();
    private final long sizeThreshold;

    public SizeBasedSplitAssigner(Collection<FileSourceSplit> splits, long sizeThreshold) {
        this.sizeThreshold = sizeThreshold;
        enqueue(splits);
    }

    @Override
    public Optional<FileSourceSplit> getNext(String hostname) {
        // Large splits first, so no reader starts a long split near the end while others
        // sit idle (the hostname is ignored)
        FileSourceSplit split = largeSplits.poll();
        if (split == null) {
            split = smallSplits.poll();
        }
        return Optional.ofNullable(split);
    }

    @Override
    public void addSplits(Collection<FileSourceSplit> splits) {
        enqueue(splits);
    }

    @Override
    public Collection<FileSourceSplit> remainingSplits() {
        List<FileSourceSplit> remaining = new ArrayList<>(largeSplits);
        remaining.addAll(smallSplits);
        return remaining;
    }

    // Separates splits by length in bytes
    private void enqueue(Collection<FileSourceSplit> splits) {
        for (FileSourceSplit split : splits) {
            if (split.length() > sizeThreshold) {
                largeSplits.offer(split);
            } else {
                smallSplits.offer(split);
            }
        }
    }

    /** Serializable provider carrying the threshold, for use with the FileSource builder */
    public static class Provider implements FileSplitAssigner.Provider {
        private static final long serialVersionUID = 1L;
        private final long sizeThreshold;

        public Provider(long sizeThreshold) {
            this.sizeThreshold = sizeThreshold;
        }

        @Override
        public FileSplitAssigner create(Collection<FileSourceSplit> splits) {
            return new SizeBasedSplitAssigner(splits, sizeThreshold);
        }
    }
}

```
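Handing out large splits first is the classic longest-processing-time-first (LPT) list-scheduling heuristic. The simulation below replays pull-based assignment (whichever reader becomes idle first takes the next split) and reports when the last reader finishes. It assumes equally fast readers and processing time proportional to split size; `PullSimulation` is a hypothetical helper, not part of Flink.

```java
import java.util.List;
import java.util.PriorityQueue;

// Simulates pull-based assignment: the reader that is idle earliest takes the next split.
final class PullSimulation {
    // Returns the time at which the last reader finishes (the makespan).
    static long makespan(List<Long> splitSizesInHandOutOrder, int readers) {
        PriorityQueue<Long> freeAt = new PriorityQueue<>();
        for (int i = 0; i < readers; i++) {
            freeAt.add(0L); // all readers start idle at t = 0
        }
        long finish = 0;
        for (long size : splitSizesInHandOutOrder) {
            long start = freeAt.poll(); // earliest idle reader requests the next split
            long end = start + size;    // assumption: time is proportional to split size
            finish = Math.max(finish, end);
            freeAt.add(end);
        }
        return finish;
    }
}
```

With two readers and splits of sizes 8, 1, 1, 1, 1, handing out the large split first finishes at time 8. Handing it out last finishes at time 10, because one reader only starts the large split after the small ones are gone.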

### Advanced Split Assignment Patterns

Examples of advanced split assignment strategies for specific use cases.

```java { .api }

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;

/**
 * Weighted split assigner that caps how many splits each node receives per round,
 * based on a configured capacity weight
 *
 * Caution: returning an empty Optional while splits remain is risky. The bounded (static)
 * file enumerator treats an empty result as "no more splits" for the requesting reader,
 * so this pattern only fits enumerators for which empty means "try again later".
 */
public class WeightedSplitAssigner implements FileSplitAssigner {
    private final Map<String, Integer> nodeWeights;
    private final Map<String, Integer> assignedCounts = new HashMap<>();
    private final Queue<FileSourceSplit> availableSplits;

    public WeightedSplitAssigner(Collection<FileSourceSplit> splits,
                                 Map<String, Integer> nodeWeights) {
        this.nodeWeights = new HashMap<>(nodeWeights);
        this.availableSplits = new ArrayDeque<>(splits);
    }

    @Override
    public Optional<FileSourceSplit> getNext(String hostname) {
        if (availableSplits.isEmpty()) {
            return Optional.empty();
        }

        // Unknown or null hostnames default to a weight of 1
        int nodeWeight = nodeWeights.getOrDefault(hostname, 1);
        int assignedCount = assignedCounts.getOrDefault(hostname, 0);

        if (assignedCount >= nodeWeight) {
            if (!allWeightedNodesAtQuota()) {
                // Over quota for this round while other nodes still have capacity
                return Optional.empty();
            }
            // Every weighted node has used its quota: start a new round instead of
            // starving nodes forever
            assignedCounts.clear();
        }

        FileSourceSplit split = availableSplits.poll();
        assignedCounts.merge(hostname, 1, Integer::sum);
        return Optional.of(split);
    }

    private boolean allWeightedNodesAtQuota() {
        for (Map.Entry<String, Integer> entry : nodeWeights.entrySet()) {
            if (assignedCounts.getOrDefault(entry.getKey(), 0) < entry.getValue()) {
                return false;
            }
        }
        return true;
    }

    @Override
    public void addSplits(Collection<FileSourceSplit> splits) {
        availableSplits.addAll(splits);
    }

    @Override
    public Collection<FileSourceSplit> remainingSplits() {
        return new ArrayList<>(availableSplits);
    }
}

```

**Advanced Usage Examples:**

```java

import java.util.Map;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;

// Size-based assignment for mixed file sizes
long sizeThreshold = 64L * 1024 * 1024; // 64 MiB
FileSource<String> sizeBasedSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("/mixed-sizes"))
        .setSplitAssigner(new SizeBasedSplitAssigner.Provider(sizeThreshold))
        .build();

// Weighted assignment based on node capacity (keys must match the readers' hostnames)
Map<String, Integer> nodeWeights = Map.of(
        "worker-1", 4, // High-capacity node
        "worker-2", 2, // Medium-capacity node
        "worker-3", 1  // Low-capacity node
);

// The lambda is serializable because FileSplitAssigner.Provider extends Serializable;
// the captured Map.of(...) result is serializable as well
FileSource<String> weightedSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data"))
        .setSplitAssigner(splits -> new WeightedSplitAssigner(splits, nodeWeights))
        .build();

// Combining with custom enumeration for complete control
// (CustomEnumerator and CustomSplitAssigner are placeholders for your own classes)
FileSource<String> fullyCustomSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data"))
        .setFileEnumerator(() -> new CustomEnumerator())
        .setSplitAssigner(new CustomSplitAssigner.Provider())
        .build();

```

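### Integration with Flink's Source Framework

Split assigners take part in fault tolerance through the split enumerator rather than through a state API of their own. In Flink's file source, the enumerator checkpoints the splits returned by `remainingSplits()` (as a `PendingSplitsCheckpoint`) and, on recovery, creates a fresh assigner by passing those splits to `FileSplitAssigner.Provider.create(...)`, so any other in-memory assigner state is not restored. The class below is a hypothetical sketch of what richer assigner state could look like; it is not a Flink API.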

```java { .api }

import java.util.Collection;
import java.util.Map;
import org.apache.flink.connector.file.src.FileSourceSplit;

/**
 * Hypothetical holder for split assigner state (illustration only, not a Flink class)
 * Flink's file source itself checkpoints only the remaining splits
 */
public class SplitAssignerState {
    private final Collection<FileSourceSplit> remainingSplits;
    private final Map<String, Object> assignerSpecificState;

    public SplitAssignerState(
            Collection<FileSourceSplit> remainingSplits,
            Map<String, Object> assignerSpecificState) {
        this.remainingSplits = remainingSplits;
        this.assignerSpecificState = assignerSpecificState;
    }

    public Collection<FileSourceSplit> getRemainingSplits() {
        return remainingSplits;
    }

    public Map<String, Object> getAssignerSpecificState() {
        return assignerSpecificState;
    }
}

```
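The snapshot-and-recreate cycle can be modeled with plain Java. The sketch assumes, as in Flink's file source enumerator, that a checkpoint stores only the assigner's `remainingSplits()` and that recovery builds a fresh assigner from them; `CountingAssigner` and `CheckpointDemo` are hypothetical stand-ins. Splits already handed to readers are not in this snapshot; in Flink they belong to the readers' own checkpointed state.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;

// Hypothetical assigner with extra in-memory state (a hand-out counter).
final class CountingAssigner {
    private final List<String> pending;
    private int handedOut; // not included in the checkpoint in this model

    CountingAssigner(Collection<String> splits) {
        this.pending = new ArrayList<>(splits);
    }

    Optional<String> getNext(String hostname) {
        if (pending.isEmpty()) {
            return Optional.empty();
        }
        handedOut++;
        return Optional.of(pending.remove(0));
    }

    Collection<String> remainingSplits() {
        return new ArrayList<>(pending);
    }

    int handedOut() {
        return handedOut;
    }
}

final class CheckpointDemo {
    // Snapshot: only the unassigned splits are captured.
    static List<String> snapshot(CountingAssigner assigner) {
        return List.copyOf(assigner.remainingSplits());
    }

    // Restore: a fresh assigner is created from the snapshot, as a provider would do.
    static CountingAssigner restore(List<String> checkpoint) {
        return new CountingAssigner(checkpoint);
    }
}
```

Counters or caches inside the assigner start from zero after recovery, so a custom assigner should not rely on them for correctness.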

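## Error Handling

Split assigners can run into the following error conditions during split assignment:

- **IllegalArgumentException**: invalid split or hostname arguments, typically thrown by custom assigners that validate their input
- **ConcurrentModificationException**: split collections modified while being iterated, for example from several threads at once. The split enumerator calls its assigner from a single thread, so custom assigners do not normally need their own synchronization
- **OutOfMemoryError**: too many splits or very large split metadata held by the enumerator; reduce the number of splits (for example, by using larger splits) rather than trying to catch the error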

```java

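import java.util.Optional;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
import org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner;

// `splits` is the Collection<FileSourceSplit> to distribute
try {
    FileSplitAssigner assigner = new LocalityAwareSplitAssigner(splits);
    Optional<FileSourceSplit> split = assigner.getNext("worker-node-1");

    if (split.isPresent()) {
        // Process the assigned split
    } else {
        // No split is available for this request
    }
} catch (IllegalArgumentException e) {
    // Handle invalid split or hostname arguments
} catch (RuntimeException e) {
    // Other unexpected assignment errors: log and rethrow so the failure stays visible
    throw e;
}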

```
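A custom assigner that wants to fail fast with `IllegalArgumentException` can validate a batch of splits before touching its internal state. `ValidatingAssigner` is a hypothetical example that uses plain string IDs in place of `FileSourceSplit`. It rejects empty IDs and IDs that are already pending, but accepts a split again once it has been handed out, since a failed reader may return it.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

// Hypothetical custom assigner that validates split IDs before changing any state.
final class ValidatingAssigner {
    private final ArrayDeque<String> pending = new ArrayDeque<>();
    private final Set<String> pendingIds = new HashSet<>();

    ValidatingAssigner(Collection<String> splitIds) {
        addSplits(splitIds);
    }

    void addSplits(Collection<String> splitIds) {
        Objects.requireNonNull(splitIds, "splitIds");
        // Validate the whole batch first so a bad element leaves the assigner unchanged.
        Set<String> batch = new HashSet<>();
        for (String id : splitIds) {
            if (id == null || id.isEmpty()) {
                throw new IllegalArgumentException("split id must be non-empty");
            }
            if (pendingIds.contains(id) || !batch.add(id)) {
                throw new IllegalArgumentException("split already pending: " + id);
            }
        }
        for (String id : splitIds) {
            pending.addLast(id);
            pendingIds.add(id);
        }
    }

    Optional<String> getNext(String hostname) {
        String id = pending.pollFirst();
        if (id != null) {
            pendingIds.remove(id); // a handed-out split may come back later via addSplits
        }
        return Optional.ofNullable(id);
    }

    int remainingCount() {
        return pending.size();
    }
}
```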

## Performance Considerations

- Use `LocalityAwareSplitAssigner` (the default) when the file system reports block locations on hosts that also run Flink readers, such as HDFS co-located with the cluster; object stores such as S3 typically expose no useful locality, although the locality-aware assigner remains a safe default there
- Use `SimpleSplitAssigner` for local file systems or when locality doesn't matter
- Consider node capacity and processing power when implementing custom assigners
- Monitor split assignment patterns to ensure balanced load distribution
- Avoid creating many small splits; each split adds bookkeeping in the enumerator and setup work in the reader
- Consider file access patterns and reader parallelism when designing assignment strategies
- Use efficient data structures (for example, per-host indexes rather than linear scans) when managing large numbers of splits

- Balance between locality optimization and load balancing requirements
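The first two guidelines can be encoded as a small selection helper. `AssignerSelector` and `AssignerChoice` are hypothetical, and treating only the `hdfs` scheme as locality-capable is an assumption to adjust for your deployment. In Flink code, the two outcomes would map to `LocalityAwareSplitAssigner::new` and `SimpleSplitAssigner::new`.

```java
import java.net.URI;
import java.util.Locale;
import java.util.Set;

// Hypothetical outcome of the choice (maps to LocalityAwareSplitAssigner / SimpleSplitAssigner).
enum AssignerChoice { LOCALITY_AWARE, SIMPLE }

final class AssignerSelector {
    // Assumption: only these schemes report block hosts that can coincide with reader hosts.
    private static final Set<String> LOCALITY_SCHEMES = Set.of("hdfs");

    static AssignerChoice choose(URI path) {
        // Assumption: a scheme-less path is local. Flink actually resolves such paths
        // against its configured default file system scheme.
        String scheme = path.getScheme() == null
                ? "file"
                : path.getScheme().toLowerCase(Locale.ROOT);
        return LOCALITY_SCHEMES.contains(scheme)
                ? AssignerChoice.LOCALITY_AWARE
                : AssignerChoice.SIMPLE;
    }
}
```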