
tessl/maven-org-apache-flink--flink-connector-files

Apache Flink file connector library for unified file processing in both batch and streaming modes with support for various formats, compression, and distributed processing capabilities.


docs/split-assignment.md

Split Assignment

Split assignment manages the distribution of file splits to reader nodes, using locality awareness and load balancing to optimize distributed processing performance.

Capabilities

FileSplitAssigner Interface

Core interface for managing the assignment of file splits to reader nodes.

/**
 * Interface for assigning file splits to reader nodes with locality and load balancing
 */
public interface FileSplitAssigner {
    /**
     * Gets the next split for assignment to a specific hostname
     * @param hostname Hostname of the requesting reader node (null if no preference)
     * @return Optional containing the next split, or empty if no splits available
     */
    Optional<FileSourceSplit> getNext(String hostname);
    
    /**
     * Adds new splits to the assignment queue
     * @param splits Collection of splits to add for assignment
     */
    void addSplits(Collection<FileSourceSplit> splits);
    
    /**
     * Returns all remaining unassigned splits
     * @return Collection of splits not yet assigned
     */
    Collection<FileSourceSplit> remainingSplits();
}

FileSplitAssigner.Provider Interface

Factory interface for creating FileSplitAssigner instances with serialization support.

/**
 * Factory interface for creating FileSplitAssigner instances
 */
public interface Provider extends Serializable {
    /**
     * Creates a new FileSplitAssigner with initial splits
     * @param splits Initial collection of splits to manage
     * @return FileSplitAssigner implementation
     */
    FileSplitAssigner create(Collection<FileSourceSplit> splits);
}
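
Because Provider declares a single abstract method, a constructor reference or lambda can serve as the factory. The following is a simplified, self-contained model of that pattern (the Split, Assigner, and Provider types here are stand-ins, not the Flink classes):

```java
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.List;
import java.util.Queue;

public class ProviderModel {
    // Stub stand-ins for FileSourceSplit and FileSplitAssigner.
    static class Split {
        final String path;
        Split(String path) { this.path = path; }
    }

    static class Assigner {
        final Queue<Split> queue;
        Assigner(Collection<Split> splits) { this.queue = new ArrayDeque<>(splits); }
    }

    // Single-method factory, mirroring FileSplitAssigner.Provider.
    interface Provider extends Serializable {
        Assigner create(Collection<Split> splits);
    }

    public static void main(String[] args) {
        // A constructor reference satisfies the factory interface,
        // analogous to LocalityAwareSplitAssigner::new in the real API.
        Provider provider = Assigner::new;
        Assigner assigner = provider.create(List.of(new Split("/data/part-0")));
        System.out.println(assigner.queue.size()); // 1
    }
}
```

Extending Serializable matters because the provider is shipped to the enumerator at job submission time; a lambda or constructor reference targeting a serializable interface is itself serializable.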

LocalityAwareSplitAssigner

Default split assigner that considers data locality for performance optimization.

/**
 * Split assigner that considers data locality for optimal performance
 * Prefers assigning splits to nodes where the data is locally available
 */
public class LocalityAwareSplitAssigner implements FileSplitAssigner {
    /**
     * Creates locality-aware assigner with initial splits
     * @param splits Initial collection of splits to manage
     */
    public LocalityAwareSplitAssigner(Collection<FileSourceSplit> splits);
    
    /**
     * Gets next split with locality preference for the given hostname
     * @param hostname Hostname of requesting reader (used for locality matching)
     * @return Optional containing locally preferred split, or any available split
     */
    @Override
    public Optional<FileSourceSplit> getNext(String hostname);
    
    @Override
    public void addSplits(Collection<FileSourceSplit> splits);
    
    @Override
    public Collection<FileSourceSplit> remainingSplits();
    
    /**
     * Provider instance for use with FileSource builder
     */
    public static final Provider PROVIDER = LocalityAwareSplitAssigner::new;
}
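
The locality preference described above can be modeled in plain Java. The sketch below is a simplified, self-contained stand-in (String paths instead of FileSourceSplit, an invented class name, and not Flink's actual implementation): splits with a preferred host go into a per-host queue, and getNext serves the local queue first before falling back to any remaining split.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class LocalityModel {
    private final Map<String, Deque<String>> byHost = new HashMap<>();
    private final Deque<String> unassigned = new ArrayDeque<>();

    // Splits with a known preferred host are queued per host;
    // the rest go into a shared queue.
    public void add(String split, String preferredHost) {
        if (preferredHost != null) {
            byHost.computeIfAbsent(preferredHost, h -> new ArrayDeque<>()).add(split);
        } else {
            unassigned.add(split);
        }
    }

    public Optional<String> getNext(String hostname) {
        Deque<String> local = byHost.get(hostname);
        if (local != null && !local.isEmpty()) {
            return Optional.of(local.poll()); // local hit
        }
        // Fallback: take a split preferred by some other host,
        // then anything from the shared queue.
        for (Deque<String> q : byHost.values()) {
            if (!q.isEmpty()) {
                return Optional.of(q.poll());
            }
        }
        return Optional.ofNullable(unassigned.poll());
    }

    public static void main(String[] args) {
        LocalityModel m = new LocalityModel();
        m.add("part-0", "worker-1");
        m.add("part-1", "worker-2");
        m.add("part-2", null);
        System.out.println(m.getNext("worker-2").get()); // part-1 (local hit)
        System.out.println(m.getNext("worker-3").get()); // part-0 (remote fallback)
    }
}
```

The key trade-off this shape exposes: serving only local splits would leave readers idle when their local queue drains, so a fallback path is required even in a locality-aware design.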

SimpleSplitAssigner

Basic round-robin split assigner without locality awareness.

/**
 * Simple round-robin split assigner without locality consideration
 * Suitable for scenarios where data locality is not important
 */
public class SimpleSplitAssigner implements FileSplitAssigner {
    /**
     * Creates simple assigner with initial splits
     * @param splits Initial collection of splits to manage
     */
    public SimpleSplitAssigner(Collection<FileSourceSplit> splits);
    
    /**
     * Gets next split in round-robin fashion, ignoring hostname
     * @param hostname Hostname (ignored by this implementation)
     * @return Optional containing next available split
     */
    @Override
    public Optional<FileSourceSplit> getNext(String hostname);
    
    @Override
    public void addSplits(Collection<FileSourceSplit> splits);
    
    @Override
    public Collection<FileSourceSplit> remainingSplits();
    
    /**
     * Provider instance for use with FileSource builder
     */
    public static final Provider PROVIDER = SimpleSplitAssigner::new;
}

Usage Examples:

import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.assigners.*;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;

// Using locality-aware assignment (recommended for distributed file systems)
FileSource<String> localitySource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("/hdfs/data"))
    .setSplitAssigner(LocalityAwareSplitAssigner.PROVIDER)
    .build();

// Using simple round-robin assignment
FileSource<String> simpleSource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("/local/data"))
    .setSplitAssigner(SimpleSplitAssigner.PROVIDER)
    .build();

// Default behavior (locality-aware is used automatically)
FileSource<String> defaultSource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data"))
    .build(); // Uses LocalityAwareSplitAssigner by default

Custom Split Assignment Implementation

Example of implementing a custom split assigner with priority-based assignment.

/**
 * Example custom split assigner that prioritizes splits by size
 */
public class SizeBasedSplitAssigner implements FileSplitAssigner {
    private final Queue<FileSourceSplit> largeSplits;
    private final Queue<FileSourceSplit> smallSplits;
    private final long sizeThreshold;
    
    public SizeBasedSplitAssigner(Collection<FileSourceSplit> splits, long sizeThreshold) {
        this.sizeThreshold = sizeThreshold;
        this.largeSplits = new ArrayDeque<>();
        this.smallSplits = new ArrayDeque<>();
        
        // Separate splits by size
        for (FileSourceSplit split : splits) {
            if (split.length() > sizeThreshold) {
                largeSplits.offer(split);
            } else {
                smallSplits.offer(split);
            }
        }
    }
    
    @Override
    public Optional<FileSourceSplit> getNext(String hostname) {
        // Prioritize large splits first for better load balancing
        FileSourceSplit split = largeSplits.poll();
        if (split == null) {
            split = smallSplits.poll();
        }
        return Optional.ofNullable(split);
    }
    
    @Override
    public void addSplits(Collection<FileSourceSplit> splits) {
        for (FileSourceSplit split : splits) {
            if (split.length() > sizeThreshold) {
                largeSplits.offer(split);
            } else {
                smallSplits.offer(split);
            }
        }
    }
    
    @Override
    public Collection<FileSourceSplit> remainingSplits() {
        List<FileSourceSplit> remaining = new ArrayList<>();
        remaining.addAll(largeSplits);
        remaining.addAll(smallSplits);
        return remaining;
    }
    
    public static class Provider implements FileSplitAssigner.Provider {
        private final long sizeThreshold;
        
        public Provider(long sizeThreshold) {
            this.sizeThreshold = sizeThreshold;
        }
        
        @Override
        public FileSplitAssigner create(Collection<FileSourceSplit> splits) {
            return new SizeBasedSplitAssigner(splits, sizeThreshold);
        }
    }
}

Advanced Split Assignment Patterns

Examples of advanced split assignment strategies for specific use cases.

/**
 * Weighted split assigner that considers node capacity
 */
public class WeightedSplitAssigner implements FileSplitAssigner {
    private final Map<String, Integer> nodeWeights;
    private final Map<String, Integer> assignedCounts;
    private final Queue<FileSourceSplit> availableSplits;
    
    public WeightedSplitAssigner(Collection<FileSourceSplit> splits, 
                                Map<String, Integer> nodeWeights) {
        this.nodeWeights = new HashMap<>(nodeWeights);
        this.assignedCounts = new HashMap<>();
        this.availableSplits = new ArrayDeque<>(splits);
        
        // Initialize assigned counts
        for (String hostname : nodeWeights.keySet()) {
            assignedCounts.put(hostname, 0);
        }
    }
    
    @Override
    public Optional<FileSourceSplit> getNext(String hostname) {
        if (availableSplits.isEmpty()) {
            return Optional.empty();
        }
        
        // Check if this node can accept more splits based on its weight.
        // Note: the weight is a hard cap in this sketch; a production
        // implementation would reset or rescale counts over time so
        // that splits are not left permanently unassigned.
        int nodeWeight = nodeWeights.getOrDefault(hostname, 1);
        int assignedCount = assignedCounts.getOrDefault(hostname, 0);
        
        if (assignedCount < nodeWeight) {
            FileSourceSplit split = availableSplits.poll();
            if (split != null) {
                assignedCounts.put(hostname, assignedCount + 1);
            }
            return Optional.ofNullable(split);
        }
        
        return Optional.empty();
    }
    
    @Override
    public void addSplits(Collection<FileSourceSplit> splits) {
        availableSplits.addAll(splits);
    }
    
    @Override
    public Collection<FileSourceSplit> remainingSplits() {
        return new ArrayList<>(availableSplits);
    }
}

Advanced Usage Examples:

// Size-based assignment for mixed file sizes
long sizeThreshold = 64 * 1024 * 1024; // 64MB
FileSource<String> sizeBasedSource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("/mixed-sizes"))
    .setSplitAssigner(new SizeBasedSplitAssigner.Provider(sizeThreshold))
    .build();

// Weighted assignment based on node capacity
Map<String, Integer> nodeWeights = Map.of(
    "worker-1", 4,  // High capacity node
    "worker-2", 2,  // Medium capacity node  
    "worker-3", 1   // Low capacity node
);

FileSource<String> weightedSource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data"))
    .setSplitAssigner(splits -> new WeightedSplitAssigner(splits, nodeWeights))
    .build();

// Combining with custom enumeration for complete control
// (CustomEnumerator and CustomSplitAssigner are placeholders for
// user-provided implementations, not library classes)
FileSource<String> fullyCustomSource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data"))
    .setFileEnumerator(() -> new CustomEnumerator())
    .setSplitAssigner(new CustomSplitAssigner.Provider())
    .build();

Integration with Flink's Source Framework

Split assigners integrate with Flink's unified source framework for state management.

/**
 * Split assigner state for checkpointing and recovery
 */
public class SplitAssignerState {
    private final Collection<FileSourceSplit> remainingSplits;
    private final Map<String, Object> assignerSpecificState;
    
    public SplitAssignerState(
        Collection<FileSourceSplit> remainingSplits,
        Map<String, Object> assignerSpecificState) {
        this.remainingSplits = remainingSplits;
        this.assignerSpecificState = assignerSpecificState;
    }
    
    public Collection<FileSourceSplit> getRemainingSplits() {
        return remainingSplits;
    }
    
    public Map<String, Object> getAssignerSpecificState() {
        return assignerSpecificState;
    }
}
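
A snapshot-and-restore cycle built on remainingSplits() and a provider factory can be sketched as below. This is a simplified, self-contained model (String splits and invented names, not Flink's actual checkpoint plumbing): at checkpoint time the remaining splits are captured, and on recovery a fresh assigner is rebuilt from that snapshot so already-assigned splits are not handed out again.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Optional;
import java.util.Queue;

public class CheckpointModel {
    interface Assigner {
        Optional<String> getNext();
        Collection<String> remainingSplits();
    }

    // Factory mirroring Provider.create: builds an assigner from
    // whatever splits a snapshot (or initial enumeration) contains.
    static Assigner create(Collection<String> splits) {
        Queue<String> queue = new ArrayDeque<>(splits);
        return new Assigner() {
            public Optional<String> getNext() {
                return Optional.ofNullable(queue.poll());
            }
            public Collection<String> remainingSplits() {
                return new ArrayList<>(queue); // defensive copy for the snapshot
            }
        };
    }

    public static void main(String[] args) {
        Assigner a = create(java.util.List.of("s1", "s2", "s3"));
        a.getNext();                                       // s1 assigned
        Collection<String> snapshot = a.remainingSplits(); // checkpoint: [s2, s3]
        // Simulated failure: rebuild from the snapshot; s1 is not re-assigned.
        Assigner recovered = create(snapshot);
        System.out.println(recovered.getNext().get());     // s2
    }
}
```

This is why remainingSplits() should return a defensive copy: the snapshot must stay stable even as the live assigner keeps handing out splits after the checkpoint.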

Error Handling

Split assigners may surface several error conditions during assignment:

  • IllegalArgumentException: Invalid split or hostname parameters
  • ConcurrentModificationException: Concurrent access to split collections
  • OutOfMemoryError: Too many splits or large split metadata

try {
    FileSplitAssigner assigner = new LocalityAwareSplitAssigner(splits);
    Optional<FileSourceSplit> split = assigner.getNext("worker-node-1");
    
    if (split.isPresent()) {
        // Process the assigned split
    }
} catch (IllegalArgumentException e) {
    // Handle invalid parameters
} catch (Exception e) {
    // Handle other assignment errors
}

Performance Considerations

  • Use LocalityAwareSplitAssigner for distributed file systems (HDFS, S3, etc.)
  • Use SimpleSplitAssigner for local file systems or when locality doesn't matter
  • Consider node capacity and processing power when implementing custom assigners
  • Monitor split assignment patterns to ensure balanced load distribution
  • Avoid creating too many small splits which can lead to overhead
  • Consider file access patterns and reader parallelism when designing assignment strategies
  • Implement efficient data structures for large numbers of splits
  • Balance between locality optimization and load balancing requirements
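
One way to act on the "monitor split assignment patterns" point above is to simulate assignment and count splits per host to spot skew. A minimal self-contained harness (invented host names, round-robin requests as a stand-in for real reader traffic):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BalanceCheck {
    // Simulate reader nodes requesting splits in round-robin order
    // and count how many splits each host receives.
    public static Map<String, Integer> simulate(int splits, List<String> hosts) {
        Map<String, Integer> counts = new HashMap<>();
        for (int i = 0; i < splits; i++) {
            String host = hosts.get(i % hosts.size());
            counts.merge(host, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts =
            simulate(10, List.of("worker-1", "worker-2", "worker-3"));
        // 10 splits over 3 hosts: worker-1 gets 4, the others 3 each.
        System.out.println(counts);
    }
}
```

Running the same tally against a locality-aware assigner (counting local vs. remote hits per host) quickly shows whether locality optimization is starving some readers.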

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-files
