Apache Flink file connector library for unified file processing in both batch and streaming modes with support for various formats, compression, and distributed processing capabilities.
—
Split assignment manages the distribution of file splits to reader nodes with locality awareness and load balancing for optimal distributed processing performance.
Core interface for managing the assignment of file splits to reader nodes.
/**
 * Interface for assigning file splits to reader nodes with locality and load balancing.
 *
 * <p>Implementations hold the set of unassigned splits and hand them out one at a time
 * as reader nodes request work. NOTE(review): thread-safety requirements are not shown
 * here — confirm whether instances may be shared across threads before doing so.
 */
public interface FileSplitAssigner {
/**
 * Gets the next split for assignment to a specific hostname.
 * @param hostname Hostname of the requesting reader node (null if no preference)
 * @return Optional containing the next split, or empty if no splits available
 */
Optional<FileSourceSplit> getNext(String hostname);
/**
 * Adds new splits to the assignment queue, e.g. splits discovered after startup
 * or splits returned by a failed reader.
 * @param splits Collection of splits to add for assignment
 */
void addSplits(Collection<FileSourceSplit> splits);
/**
 * Returns all remaining unassigned splits (used, e.g., when snapshotting assigner
 * state for checkpointing — see SplitAssignerState).
 * @return Collection of splits not yet assigned
 */
Collection<FileSourceSplit> remainingSplits();
}Factory interface for creating FileSplitAssigner instances with serialization support.
/**
 * Factory interface for creating FileSplitAssigner instances.
 *
 * <p>Extends Serializable so the provider can be shipped with the job graph to
 * wherever the split enumerator runs.
 */
public interface Provider extends Serializable {
/**
 * Creates a new FileSplitAssigner that manages the given initial splits.
 * @param splits Initial collection of splits to manage
 * @return FileSplitAssigner implementation
 */
FileSplitAssigner create(Collection<FileSourceSplit> splits);
}Default split assigner that considers data locality for performance optimization.
/**
 * Split assigner that considers data locality for optimal performance.
 * Prefers assigning splits to nodes where the data is locally available.
 */
public class LocalityAwareSplitAssigner implements FileSplitAssigner {
/**
 * Creates a locality-aware assigner with initial splits.
 * @param splits Initial collection of splits to manage
 */
public LocalityAwareSplitAssigner(Collection<FileSourceSplit> splits);
/**
 * Gets the next split with locality preference for the given hostname; per the
 * return contract, falls back to any available split when no local one exists.
 * @param hostname Hostname of requesting reader (used for locality matching)
 * @return Optional containing locally preferred split, or any available split
 */
@Override
public Optional<FileSourceSplit> getNext(String hostname);
@Override
public void addSplits(Collection<FileSourceSplit> splits);
@Override
public Collection<FileSourceSplit> remainingSplits();
/**
 * Ready-made Provider instance for use with the FileSource builder.
 */
public static final Provider PROVIDER = LocalityAwareSplitAssigner::new;
}Basic round-robin split assigner without locality awareness.
/**
 * Simple round-robin split assigner without locality consideration.
 * Suitable for scenarios where data locality is not important.
 */
public class SimpleSplitAssigner implements FileSplitAssigner {
/**
 * Creates a simple assigner with initial splits.
 * @param splits Initial collection of splits to manage
 */
public SimpleSplitAssigner(Collection<FileSourceSplit> splits);
/**
 * Gets the next split in round-robin fashion; the hostname is ignored.
 * @param hostname Hostname (ignored by this implementation)
 * @return Optional containing next available split
 */
@Override
public Optional<FileSourceSplit> getNext(String hostname);
@Override
public void addSplits(Collection<FileSourceSplit> splits);
@Override
public Collection<FileSourceSplit> remainingSplits();
/**
 * Ready-made Provider instance for use with the FileSource builder.
 */
public static final Provider PROVIDER = SimpleSplitAssigner::new;
}Usage Examples:
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.assigners.*;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
// Using locality-aware assignment (recommended for distributed file systems)
FileSource<String> localitySource = FileSource
.forRecordStreamFormat(new TextLineInputFormat(), new Path("/hdfs/data"))
.setSplitAssigner(LocalityAwareSplitAssigner.PROVIDER)
.build();
// Using simple round-robin assignment (locality lookups add no value on local disks)
FileSource<String> simpleSource = FileSource
.forRecordStreamFormat(new TextLineInputFormat(), new Path("/local/data"))
.setSplitAssigner(SimpleSplitAssigner.PROVIDER)
.build();
// Default behavior (locality-aware is used automatically when no assigner is set)
FileSource<String> defaultSource = FileSource
.forRecordStreamFormat(new TextLineInputFormat(), new Path("/data"))
.build(); // Uses LocalityAwareSplitAssigner by default

Example of implementing a custom split assigner with priority-based assignment.
/**
 * Example custom split assigner that prioritizes splits by size.
 *
 * <p>Splits larger than {@code sizeThreshold} bytes are handed out before smaller
 * ones, so the longest-running work starts as early as possible and small splits
 * fill the tail of the job. The hostname passed to {@link #getNext(String)} is
 * ignored (no locality awareness).
 */
public class SizeBasedSplitAssigner implements FileSplitAssigner {
    private final Queue<FileSourceSplit> largeSplits;
    private final Queue<FileSourceSplit> smallSplits;
    private final long sizeThreshold;

    /**
     * Creates the assigner and classifies the initial splits by size.
     *
     * @param splits initial collection of splits to manage
     * @param sizeThreshold size in bytes above which a split counts as "large"
     */
    public SizeBasedSplitAssigner(Collection<FileSourceSplit> splits, long sizeThreshold) {
        this.sizeThreshold = sizeThreshold;
        this.largeSplits = new ArrayDeque<>();
        this.smallSplits = new ArrayDeque<>();
        // Classify via the private helper rather than the overridable addSplits(...)
        // so construction stays safe for potential subclasses.
        for (FileSourceSplit split : splits) {
            enqueue(split);
        }
    }

    /** Routes a single split into the large or small queue based on the threshold. */
    private void enqueue(FileSourceSplit split) {
        if (split.length() > sizeThreshold) {
            largeSplits.offer(split);
        } else {
            smallSplits.offer(split);
        }
    }

    /**
     * Returns the next split, draining the large-split queue before the small one.
     *
     * @param hostname ignored by this implementation
     * @return the next split by size priority, or empty when none remain
     */
    @Override
    public Optional<FileSourceSplit> getNext(String hostname) {
        // Prioritize large splits first for better load balancing
        FileSourceSplit split = largeSplits.poll();
        if (split == null) {
            split = smallSplits.poll();
        }
        return Optional.ofNullable(split);
    }

    /** Adds splits, classifying each one by size (same rule as the constructor). */
    @Override
    public void addSplits(Collection<FileSourceSplit> splits) {
        for (FileSourceSplit split : splits) {
            enqueue(split);
        }
    }

    /**
     * Returns all unassigned splits, large splits first (matching assignment order).
     */
    @Override
    public Collection<FileSourceSplit> remainingSplits() {
        // Presize to avoid intermediate array growth.
        List<FileSourceSplit> remaining =
                new ArrayList<>(largeSplits.size() + smallSplits.size());
        remaining.addAll(largeSplits);
        remaining.addAll(smallSplits);
        return remaining;
    }

    /** Serializable factory for use with the FileSource builder; carries the threshold. */
    public static class Provider implements FileSplitAssigner.Provider {
        private final long sizeThreshold;

        public Provider(long sizeThreshold) {
            this.sizeThreshold = sizeThreshold;
        }

        @Override
        public FileSplitAssigner create(Collection<FileSourceSplit> splits) {
            return new SizeBasedSplitAssigner(splits, sizeThreshold);
        }
    }
}

Examples of advanced split assignment strategies for specific use cases.
/**
 * Weighted split assigner that considers node capacity.
 *
 * <p>Each hostname receives at most as many splits as its configured weight; hostnames
 * absent from the weight map default to a weight of 1. NOTE(review): the weight acts as
 * an absolute lifetime cap rather than a ratio — once a node has received its quota it
 * is never offered another split, which can leave splits permanently unassigned.
 * Confirm this is the intended semantics before production use.
 */
public class WeightedSplitAssigner implements FileSplitAssigner {
    private final Map<String, Integer> nodeWeights;
    private final Map<String, Integer> assignedCounts;
    private final Queue<FileSourceSplit> availableSplits;

    /**
     * Creates the assigner.
     *
     * @param splits initial splits to distribute
     * @param nodeWeights per-hostname capacity; hosts absent from the map get weight 1
     */
    public WeightedSplitAssigner(Collection<FileSourceSplit> splits,
            Map<String, Integer> nodeWeights) {
        this.nodeWeights = new HashMap<>(nodeWeights);
        this.assignedCounts = new HashMap<>();
        this.availableSplits = new ArrayDeque<>(splits);
        // Every configured host starts with zero assignments on record.
        for (String host : this.nodeWeights.keySet()) {
            this.assignedCounts.put(host, 0);
        }
    }

    /**
     * Hands out the next split if the requesting host is still under its weight cap.
     *
     * @param hostname requesting reader's hostname
     * @return a split, or empty when none remain or the host has exhausted its quota
     */
    @Override
    public Optional<FileSourceSplit> getNext(String hostname) {
        if (availableSplits.isEmpty()) {
            return Optional.empty();
        }
        int capacity = nodeWeights.getOrDefault(hostname, 1);
        int alreadyAssigned = assignedCounts.getOrDefault(hostname, 0);
        if (alreadyAssigned >= capacity) {
            // Host has reached its cap; refuse further work for it.
            return Optional.empty();
        }
        FileSourceSplit next = availableSplits.poll();
        if (next != null) {
            assignedCounts.put(hostname, alreadyAssigned + 1);
        }
        return Optional.ofNullable(next);
    }

    /** Appends the given splits to the shared availability queue. */
    @Override
    public void addSplits(Collection<FileSourceSplit> splits) {
        availableSplits.addAll(splits);
    }

    /** Returns a snapshot copy of the splits not yet handed out. */
    @Override
    public Collection<FileSourceSplit> remainingSplits() {
        return new ArrayList<>(availableSplits);
    }
}

Advanced Usage Examples:
// Size-based assignment for mixed file sizes
long sizeThreshold = 64 * 1024 * 1024; // 64MB
FileSource<String> sizeBasedSource = FileSource
.forRecordStreamFormat(new TextLineInputFormat(), new Path("/mixed-sizes"))
.setSplitAssigner(new SizeBasedSplitAssigner.Provider(sizeThreshold))
.build();
// Weighted assignment based on node capacity (weight = max splits per node)
Map<String, Integer> nodeWeights = Map.of(
"worker-1", 4, // High capacity node
"worker-2", 2, // Medium capacity node
"worker-3", 1 // Low capacity node
);
// Lambda satisfies the Provider functional interface directly
FileSource<String> weightedSource = FileSource
.forRecordStreamFormat(new TextLineInputFormat(), new Path("/data"))
.setSplitAssigner(splits -> new WeightedSplitAssigner(splits, nodeWeights))
.build();
// Combining with custom enumeration for complete control
FileSource<String> fullyCustomSource = FileSource
.forRecordStreamFormat(new TextLineInputFormat(), new Path("/data"))
.setFileEnumerator(() -> new CustomEnumerator())
.setSplitAssigner(new CustomSplitAssigner.Provider())
.build();Split assigners integrate with Flink's unified source framework for state management.
/**
* Split assigner state for checkpointing and recovery
*/
public class SplitAssignerState {
private final Collection<FileSourceSplit> remainingSplits;
private final Map<String, Object> assignerSpecificState;
public SplitAssignerState(
Collection<FileSourceSplit> remainingSplits,
Map<String, Object> assignerSpecificState) {
this.remainingSplits = remainingSplits;
this.assignerSpecificState = assignerSpecificState;
}
public Collection<FileSourceSplit> getRemainingSplits() {
return remainingSplits;
}
public Map<String, Object> getAssignerSpecificState() {
return assignerSpecificState;
}
}Split assigners handle various error conditions during split assignment:
try {
FileSplitAssigner assigner = new LocalityAwareSplitAssigner(splits);
Optional<FileSourceSplit> split = assigner.getNext("worker-node-1");
if (split.isPresent()) {
// Process the assigned split
}
} catch (IllegalArgumentException e) {
// Handle invalid parameters
} catch (Exception e) {
// Handle other assignment errors (broad catch shown for illustration only)
}

Choosing an assigner:
- LocalityAwareSplitAssigner: for distributed file systems (HDFS, S3, etc.)
- SimpleSplitAssigner: for local file systems or when locality doesn't matter

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-files