CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-connector-base

Base classes and utilities for building Apache Flink connectors with async sink patterns, source readers, and advanced streaming capabilities

Pending
Overview
Eval results
Files

docs/rate-limiting.md

Rate Limiting & Scaling

The Rate Limiting & Scaling framework provides sophisticated strategies for controlling throughput, handling backpressure, and dynamically adjusting to changing conditions. It includes pluggable rate limiting strategies and scaling algorithms optimized for different workload patterns.

Core Components

RateLimitingStrategy

The main interface for controlling request rates and backpressure.

// API summary (signatures only): the contract a sink uses to decide whether a
// request may be sent now and how large the next batch may be.
@PublicEvolving
public interface RateLimitingStrategy {
    // Notifies the strategy that a request has been sent and is now in flight.
    void registerInFlightRequest(RequestInfo requestInfo)
    // Notifies the strategy that a previously sent request has completed.
    void registerCompletedRequest(ResultInfo resultInfo)
    // Returns true when the described request should be held back for now.
    boolean shouldBlock(RequestInfo requestInfo)
    // Returns the largest batch size the strategy currently allows.
    int getMaxBatchSize()
}

ScalingStrategy

Interface for controlling scale up/down behavior.

// API summary (signatures only): computes the next value of a tunable quantity
// of type T when a caller decides to scale it up or down.
@PublicEvolving
public interface ScalingStrategy<T> {
    // Returns the value to use after scaling up from currentValue.
    T scaleUp(T currentValue)
    // Returns the value to use after scaling down from currentValue.
    T scaleDown(T currentValue)
}

CongestionControlRateLimitingStrategy

Advanced rate limiting strategy that scales based on success/failure patterns.

// API summary (signatures only): rate limiting strategy that adjusts its limit
// through a pluggable ScalingStrategy<Integer>.
@PublicEvolving
public class CongestionControlRateLimitingStrategy implements RateLimitingStrategy {
    
    // Builder methods
    public static CongestionControlRateLimitingStrategyBuilder builder()
    
    // Builder interface
    // Each setter returns the builder itself so calls can be chained, as in
    // builder().setMaxInFlightRequests(..).setScalingStrategy(..).build().
    public interface CongestionControlRateLimitingStrategyBuilder {
        CongestionControlRateLimitingStrategyBuilder setMaxInFlightRequests(int maxInFlightRequests)
        CongestionControlRateLimitingStrategyBuilder setInitialMaxInFlightMessages(int initialMaxInFlightMessages)
        CongestionControlRateLimitingStrategyBuilder setScalingStrategy(ScalingStrategy<Integer> scalingStrategy)
        CongestionControlRateLimitingStrategy build()
    }
}

AIMDScalingStrategy

Additive Increase/Multiplicative Decrease scaling strategy.

// API summary (signatures only): Additive Increase / Multiplicative Decrease
// scaling of an Integer rate.
@PublicEvolving
public class AIMDScalingStrategy implements ScalingStrategy<Integer> {
    
    // Constructor
    // NOTE(review): rateThreshold presumably caps the value scaleUp may return - confirm.
    public AIMDScalingStrategy(int increaseRate, double decreaseFactor, int rateThreshold)
    
    // Scaling methods
    public Integer scaleUp(Integer currentRate)
    public Integer scaleDown(Integer currentRate)
    
    // Builder
    // rateThreshold is the only value that must be supplied up front; the
    // increase rate and decrease factor are set through the builder.
    public static AIMDScalingStrategyBuilder builder(int rateThreshold)
    
    public interface AIMDScalingStrategyBuilder {
        AIMDScalingStrategyBuilder setIncreaseRate(int increaseRate)
        AIMDScalingStrategyBuilder setDecreaseFactor(double decreaseFactor)
        AIMDScalingStrategy build()
    }
}

Implementation Examples

Basic Rate Limiting Configuration

// Step 1: AIMD scaling policy built with a rate threshold of 1000.
AIMDScalingStrategy scalingStrategy =
    AIMDScalingStrategy.builder(1000)
        .setIncreaseRate(10)       // additive step: +10 per success
        .setDecreaseFactor(0.5)    // multiplicative step: halve on failure
        .build();

// Step 2: congestion-control rate limiter driven by that scaling policy.
CongestionControlRateLimitingStrategy rateLimiting =
    CongestionControlRateLimitingStrategy.builder()
        .setMaxInFlightRequests(50)
        .setInitialMaxInFlightMessages(100)
        .setScalingStrategy(scalingStrategy)
        .build();

// Step 3: hand the rate limiter to the AsyncSinkWriter configuration.
AsyncSinkWriterConfiguration config =
    AsyncSinkWriterConfiguration.builder()
        .setMaxBatchSize(200)
        .setMaxBatchSizeInBytes(2 * 1024 * 1024)   // 2MB
        .setMaxInFlightRequests(50)
        .setMaxBufferedRequests(2000)
        .setMaxTimeInBufferMS(3000)
        .setMaxRecordSizeInBytes(512 * 1024)       // 512KB
        .setRateLimitingStrategy(rateLimiting)
        .build();

Custom Rate Limiting Strategy

/**
 * Token-bucket rate limiter: every message in a sent batch consumes one token and
 * tokens are replenished at {@code refillRate} per second up to {@code maxTokens}.
 *
 * <p>Tokens are refilled lazily whenever the strategy is consulted, so no background
 * thread is started and nothing has to be shut down when the strategy is discarded.
 */
public class TokenBucketRateLimitingStrategy implements RateLimitingStrategy {
    private static final long REFILL_INTERVAL_NANOS = 1_000_000_000L;

    private final int maxTokens;
    private final int refillRate;  // Tokens per second
    private final AtomicInteger tokens;
    // Monotonic timestamp (System.nanoTime) of the last applied refill; guarded by "this".
    private long lastRefillTime;

    /**
     * @param maxTokens bucket capacity; the bucket starts full
     * @param refillRate tokens added per second
     * @throws IllegalArgumentException if {@code maxTokens} is not positive or
     *     {@code refillRate} is negative
     */
    public TokenBucketRateLimitingStrategy(int maxTokens, int refillRate) {
        if (maxTokens <= 0) {
            throw new IllegalArgumentException("maxTokens must be positive, got " + maxTokens);
        }
        if (refillRate < 0) {
            throw new IllegalArgumentException("refillRate must not be negative, got " + refillRate);
        }
        this.maxTokens = maxTokens;
        this.refillRate = refillRate;
        this.tokens = new AtomicInteger(maxTokens);
        this.lastRefillTime = System.nanoTime();
    }

    /** Consumes one token per message in the batch, never dropping below zero. */
    @Override
    public void registerInFlightRequest(RequestInfo requestInfo) {
        refillTokens();
        int batchSize = requestInfo.getBatchSize();
        tokens.updateAndGet(current -> Math.max(0, current - batchSize));
    }

    @Override
    public void registerCompletedRequest(ResultInfo resultInfo) {
        // No action needed for token bucket on completion
    }

    /** Blocks while fewer tokens are available than the batch needs. */
    @Override
    public boolean shouldBlock(RequestInfo requestInfo) {
        refillTokens();
        int requiredTokens = requestInfo.getBatchSize();
        return tokens.get() < requiredTokens;
    }

    /** Returns the number of tokens currently available (never more than {@code maxTokens}). */
    @Override
    public int getMaxBatchSize() {
        refillTokens();
        // The token count is capped at maxTokens on every update, so no extra min() is needed.
        return tokens.get();
    }

    /**
     * Adds the tokens earned since the last refill. Nothing happens until a full second
     * has elapsed; the elapsed time keeps accumulating, so no tokens are lost by waiting.
     */
    private synchronized void refillTokens() {
        long now = System.nanoTime();
        long elapsedNanos = now - lastRefillTime;
        if (elapsedNanos < REFILL_INTERVAL_NANOS) {
            return;
        }

        // Clamp in double space first so the later addition cannot overflow.
        double earned = refillRate * (elapsedNanos / (double) REFILL_INTERVAL_NANOS);
        long tokensToAdd = (long) Math.min((double) maxTokens, earned);
        tokens.updateAndGet(current -> (int) Math.min((long) maxTokens, current + tokensToAdd));
        lastRefillTime = now;
    }
}

Adaptive Rate Limiting Strategy

/**
 * Rate limiting strategy that grows or shrinks its per-batch message limit from the
 * recent success rate and latency of completed requests.
 *
 * <p>The limit is scaled up when the averaged success rate is above 95% and the averaged
 * latency below one second, scaled down when the success rate drops below 80% or latency
 * exceeds five seconds, and quartered when no fully successful batch has been seen for
 * 30 seconds.
 */
public class AdaptiveRateLimitingStrategy implements RateLimitingStrategy {
    private static final int METRIC_WINDOW_SAMPLES = 10;
    private static final long EMERGENCY_TIMEOUT_MILLIS = 30000;

    private final ScalingStrategy<Integer> scalingStrategy;
    private final int maxInFlightRequests;
    private final AtomicInteger currentMaxInFlightMessages;
    private final AtomicInteger inFlightRequests;
    
    // Performance metrics
    private final MovingAverage successRate;
    private final MovingAverage latency;
    private volatile long lastSuccessTime = System.currentTimeMillis();
    
    /**
     * @param maxInFlightRequests number of concurrent requests at which new ones are blocked
     * @param initialMaxInFlightMessages starting value of the adaptive batch-size limit
     * @param scalingStrategy computes the next limit on scale-up and scale-down
     */
    public AdaptiveRateLimitingStrategy(
            int maxInFlightRequests,
            int initialMaxInFlightMessages,
            ScalingStrategy<Integer> scalingStrategy) {
        
        this.maxInFlightRequests = maxInFlightRequests;
        this.currentMaxInFlightMessages = new AtomicInteger(initialMaxInFlightMessages);
        this.inFlightRequests = new AtomicInteger(0);
        this.scalingStrategy = scalingStrategy;
        
        // Metrics are averaged over the most recent samples (a sample count, not a time span)
        this.successRate = new MovingAverage(METRIC_WINDOW_SAMPLES);
        this.latency = new MovingAverage(METRIC_WINDOW_SAMPLES);
    }
    
    /** Counts the request as in flight and stamps its start time when the type supports it. */
    @Override
    public void registerInFlightRequest(RequestInfo requestInfo) {
        inFlightRequests.incrementAndGet();
        
        // Track request start time for latency calculation
        if (requestInfo instanceof TimestampedRequestInfo) {
            TimestampedRequestInfo timestamped = (TimestampedRequestInfo) requestInfo;
            timestamped.setStartTime(System.currentTimeMillis());
        }
    }
    
    /** Records the outcome of a finished request and re-evaluates the limit. */
    @Override
    public void registerCompletedRequest(ResultInfo resultInfo) {
        inFlightRequests.decrementAndGet();
        
        int failedMessages = resultInfo.getFailedMessages();
        int totalMessages = resultInfo.getBatchSize();
        
        // An empty batch carries no signal; dividing by zero would feed NaN into the average.
        if (totalMessages <= 0) {
            return;
        }
        
        // Update success rate
        double batchSuccessRate = (double) (totalMessages - failedMessages) / totalMessages;
        successRate.addValue(batchSuccessRate);
        
        long now = System.currentTimeMillis();
        
        // Update latency if available
        if (resultInfo instanceof TimestampedResultInfo) {
            TimestampedResultInfo timestamped = (TimestampedResultInfo) resultInfo;
            latency.addValue(now - timestamped.getStartTime());
        }
        
        // Refresh the success timestamp BEFORE scaling; otherwise the first success after a
        // long outage would still trip the emergency scale-down below.
        if (failedMessages == 0) {
            lastSuccessTime = now;
        }
        
        // Adaptive scaling based on performance
        adaptiveScale(now);
    }
    
    @Override
    public boolean shouldBlock(RequestInfo requestInfo) {
        // Block if too many in-flight requests
        if (inFlightRequests.get() >= maxInFlightRequests) {
            return true;
        }
        
        // Block if batch would exceed current limit
        int currentLimit = currentMaxInFlightMessages.get();
        return requestInfo.getBatchSize() > currentLimit;
    }
    
    @Override
    public int getMaxBatchSize() {
        return currentMaxInFlightMessages.get();
    }
    
    /** Applies the scale-up, scale-down and emergency rules to the current limit. */
    private void adaptiveScale(long now) {
        double avgSuccessRate = successRate.getAverage();
        double avgLatency = latency.getAverage();
        
        // Scale up conditions: high success rate and low latency
        if (avgSuccessRate > 0.95 && avgLatency < 1000) {
            int newLimit = scalingStrategy.scaleUp(currentMaxInFlightMessages.get());
            currentMaxInFlightMessages.set(newLimit);
            
        // Scale down conditions: low success rate or high latency
        } else if (avgSuccessRate < 0.8 || avgLatency > 5000) {
            int newLimit = scalingStrategy.scaleDown(currentMaxInFlightMessages.get());
            currentMaxInFlightMessages.set(Math.max(1, newLimit));
        }
        
        // Emergency scale down if no success for too long
        long timeSinceLastSuccess = now - lastSuccessTime;
        if (timeSinceLastSuccess > EMERGENCY_TIMEOUT_MILLIS) {
            int emergencyLimit = currentMaxInFlightMessages.get() / 4;
            currentMaxInFlightMessages.set(Math.max(1, emergencyLimit));
        }
    }
}

// Helper classes for timestamped metrics
/**
 * {@link RequestInfo} that additionally carries a mutable start timestamp, so the moment a
 * request was sent can be attached to it after construction.
 */
public class TimestampedRequestInfo implements RequestInfo {
    /** Start timestamp; stays 0 until {@link #setStartTime(long)} is called. */
    private volatile long startTime;
    private final int batchSize;

    /** Creates the info for a batch of the given size with no start time recorded yet. */
    public TimestampedRequestInfo(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Returns the recorded start timestamp. */
    public long getStartTime() {
        return this.startTime;
    }

    /** Records the start timestamp of the request. */
    public void setStartTime(long startTime) {
        this.startTime = startTime;
    }

    /** Returns the number of messages in the batch. */
    @Override
    public int getBatchSize() {
        return this.batchSize;
    }
}

/**
 * Immutable {@link ResultInfo} that also remembers when the originating request started,
 * which lets a consumer derive the request latency.
 */
public class TimestampedResultInfo implements ResultInfo {
    private final long startTime;
    private final int batchSize;
    private final int failedMessages;

    /**
     * @param failedMessages number of messages in the batch that failed
     * @param batchSize total number of messages in the batch
     * @param startTime timestamp at which the request was started
     */
    public TimestampedResultInfo(int failedMessages, int batchSize, long startTime) {
        this.startTime = startTime;
        this.batchSize = batchSize;
        this.failedMessages = failedMessages;
    }

    /** Returns the timestamp at which the request was started. */
    public long getStartTime() {
        return this.startTime;
    }

    /** Returns the total number of messages in the batch. */
    @Override
    public int getBatchSize() {
        return this.batchSize;
    }

    /** Returns how many messages of the batch failed. */
    @Override
    public int getFailedMessages() {
        return this.failedMessages;
    }
}

// Moving average helper
/**
 * Arithmetic mean over the most recent {@code windowSize} samples, kept in a ring buffer.
 * Both public methods are synchronized on the instance.
 */
public class MovingAverage {
    private final int windowSize;
    private final double[] values;
    private int index = 0;   // slot the next sample is written to
    private int count = 0;   // number of valid samples, at most windowSize
    
    /**
     * @param windowSize number of most recent samples to average over
     * @throws IllegalArgumentException if {@code windowSize} is not positive
     */
    public MovingAverage(int windowSize) {
        // Fail fast: a zero window would otherwise only blow up on the first addValue()
        // (modulo by zero) and a negative one with an unrelated array-size error.
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive, got " + windowSize);
        }
        this.windowSize = windowSize;
        this.values = new double[windowSize];
    }
    
    /** Adds a sample, overwriting the oldest one once the window is full. */
    public synchronized void addValue(double value) {
        values[index] = value;
        index = (index + 1) % windowSize;
        count = Math.min(count + 1, windowSize);
    }
    
    /** Returns the mean of the samples currently in the window, or 0.0 when there are none. */
    public synchronized double getAverage() {
        if (count == 0) {
            return 0.0;
        }
        
        // Until the buffer wraps, the valid samples occupy slots [0, count); afterwards
        // count == windowSize and every slot is valid.
        double sum = 0.0;
        for (int i = 0; i < count; i++) {
            sum += values[i];
        }
        return sum / count;
    }
}

Circuit Breaker Rate Limiting

/**
 * Circuit-breaker style rate limiting strategy.
 *
 * <p>CLOSED lets everything through and counts failed messages. Once the count reaches
 * {@code failureThreshold} the circuit goes OPEN and blocks every request for
 * {@code timeoutMillis}. After the timeout the circuit becomes HALF_OPEN and admits at most
 * {@code halfOpenMaxRequests} trial requests: if that many complete without failures the
 * circuit closes, and any failure re-opens it.
 */
public class CircuitBreakerRateLimitingStrategy implements RateLimitingStrategy {
    
    public enum State {
        CLOSED,      // Normal operation
        OPEN,        // Circuit is open, blocking all requests
        HALF_OPEN    // Testing if circuit can be closed
    }
    
    private final int failureThreshold;
    private final long timeoutMillis;
    private final int halfOpenMaxRequests;
    
    private volatile State state = State.CLOSED;
    private final AtomicInteger failureCount = new AtomicInteger(0);
    private final AtomicInteger halfOpenSuccessCount = new AtomicInteger(0);
    // Trial requests admitted since entering HALF_OPEN. This is what gates admission: the
    // success count cannot, because reaching the limit closes the circuit and resets it.
    private final AtomicInteger halfOpenAttemptCount = new AtomicInteger(0);
    private volatile long lastFailureTime = 0;
    
    /**
     * @param failureThreshold failed-message count at which the circuit opens
     * @param timeoutMillis how long the circuit stays open before trial requests are allowed
     * @param halfOpenMaxRequests number of trial requests admitted, and of successes required,
     *     while half-open
     */
    public CircuitBreakerRateLimitingStrategy(
            int failureThreshold,
            long timeoutMillis,
            int halfOpenMaxRequests) {
        
        this.failureThreshold = failureThreshold;
        this.timeoutMillis = timeoutMillis;
        this.halfOpenMaxRequests = halfOpenMaxRequests;
    }
    
    /** Counts the request against the trial budget when the circuit is half-open. */
    @Override
    public void registerInFlightRequest(RequestInfo requestInfo) {
        if (state == State.HALF_OPEN) {
            halfOpenAttemptCount.incrementAndGet();
        }
    }
    
    /** Feeds a request outcome into the state machine. */
    @Override
    public void registerCompletedRequest(ResultInfo resultInfo) {
        int failedMessages = resultInfo.getFailedMessages();
        
        switch (state) {
            case CLOSED:
                if (failedMessages > 0) {
                    int failures = failureCount.addAndGet(failedMessages);
                    if (failures >= failureThreshold) {
                        // Open the circuit
                        state = State.OPEN;
                        lastFailureTime = System.currentTimeMillis();
                        LOG.warn("Circuit breaker opened after {} failures", failures);
                    }
                } else {
                    // Reset failure count on success
                    failureCount.set(0);
                }
                break;
                
            case HALF_OPEN:
                if (failedMessages > 0) {
                    // Failure in half-open state - go back to open
                    state = State.OPEN;
                    lastFailureTime = System.currentTimeMillis();
                    resetHalfOpenCounters();
                    LOG.warn("Circuit breaker re-opened due to failure in half-open state");
                } else {
                    // Success in half-open state
                    int successes = halfOpenSuccessCount.incrementAndGet();
                    if (successes >= halfOpenMaxRequests) {
                        // Close the circuit
                        state = State.CLOSED;
                        failureCount.set(0);
                        resetHalfOpenCounters();
                        LOG.info("Circuit breaker closed after {} successful requests", successes);
                    }
                }
                break;
                
            case OPEN:
                // A request sent before the circuit opened may complete while it is open;
                // its outcome is ignored, but it can still advance the timeout transition.
                if (openTimeoutElapsed()) {
                    enterHalfOpen();
                    LOG.info("Circuit breaker moved to half-open state");
                }
                break;
        }
    }
    
    /** Returns true when the circuit currently refuses the request. */
    @Override
    public boolean shouldBlock(RequestInfo requestInfo) {
        switch (state) {
            case CLOSED:
                return false;
                
            case OPEN:
                // Check if timeout has passed
                if (openTimeoutElapsed()) {
                    enterHalfOpen();
                    return false; // Allow first half-open request
                }
                return true; // Block all requests
                
            case HALF_OPEN:
                // Admit only the configured number of trial requests
                return halfOpenAttemptCount.get() >= halfOpenMaxRequests;
                
            default:
                return false;
        }
    }
    
    @Override
    public int getMaxBatchSize() {
        switch (state) {
            case CLOSED:
                return Integer.MAX_VALUE; // No limit
                
            case OPEN:
                return 0; // Block all
                
            case HALF_OPEN:
                return 1; // Small batches for testing
                
            default:
                return Integer.MAX_VALUE;
        }
    }
    
    public State getCurrentState() {
        return state;
    }
    
    /** Returns true once the circuit has been open for at least the configured timeout. */
    private boolean openTimeoutElapsed() {
        return System.currentTimeMillis() - lastFailureTime >= timeoutMillis;
    }
    
    /** Switches to HALF_OPEN with a fresh trial budget. */
    private void enterHalfOpen() {
        resetHalfOpenCounters();
        state = State.HALF_OPEN;
    }
    
    /** Clears the success and attempt counters used while half-open. */
    private void resetHalfOpenCounters() {
        halfOpenSuccessCount.set(0);
        halfOpenAttemptCount.set(0);
    }
}

Advanced Scaling Strategies

// TCP-like congestion control scaling
/**
 * Scaling strategy modelled on TCP congestion control: the value doubles on each scale-up
 * while in slow start (until {@code slowStartThreshold} is reached), then grows by one per
 * scale-up; every scale-down halves the value and ends slow start.
 *
 * <p>The value passed in by the caller is treated as the current window, so a scale-up
 * never returns less than it was given. A separately tracked window starting at 1 would
 * make the first scale-up from, say, 100 return 2.
 */
public class TCPLikeScalingStrategy implements ScalingStrategy<Integer> {
    private final int slowStartThreshold;
    private volatile boolean inSlowStart = true;
    
    /** @param slowStartThreshold window size at which exponential growth stops */
    public TCPLikeScalingStrategy(int slowStartThreshold) {
        this.slowStartThreshold = slowStartThreshold;
    }
    
    /** Returns the next, larger window: doubled during slow start, otherwise plus one. */
    @Override
    public Integer scaleUp(Integer currentValue) {
        int window = Math.max(1, currentValue);
        
        // A window already at or past the threshold is in congestion avoidance
        if (inSlowStart && window >= slowStartThreshold) {
            inSlowStart = false;
        }
        
        if (inSlowStart) {
            // Exponential growth in slow start (saturating instead of overflowing)
            int doubled = window > Integer.MAX_VALUE / 2 ? Integer.MAX_VALUE : window * 2;
            if (doubled >= slowStartThreshold) {
                inSlowStart = false;
            }
            return doubled;
        }
        
        // Linear growth in congestion avoidance
        return window == Integer.MAX_VALUE ? window : window + 1;
    }
    
    /** Returns half of the current window (at least 1) and leaves slow start. */
    @Override
    public Integer scaleDown(Integer currentValue) {
        inSlowStart = false; // Exit slow start on congestion
        return Math.max(1, currentValue / 2);
    }
}

// Exponential backoff with jitter
/**
 * Scaling strategy whose scale-down step grows with consecutive scale-downs and is
 * randomised with jitter; any scale-up resets the backoff.
 */
public class ExponentialBackoffScalingStrategy implements ScalingStrategy<Integer> {
    private final Random random = new Random();
    private volatile int backoffMultiplier = 1;

    /**
     * Clears the accumulated backoff and returns the smaller of "double" and "plus ten".
     */
    @Override
    public Integer scaleUp(Integer currentValue) {
        backoffMultiplier = 1;

        int doubled = currentValue * 2;
        int plusTen = currentValue + 10;
        return doubled < plusTen ? doubled : plusTen;
    }

    /**
     * Doubles the backoff multiplier (capped at 64) and subtracts a jittered share of the
     * current value, never returning less than 1.
     */
    @Override
    public Integer scaleDown(Integer currentValue) {
        int multiplier = Math.min(backoffMultiplier * 2, 64);
        backoffMultiplier = multiplier;

        // Jitter is drawn uniformly from [0.5, 1.0)
        double jitterFactor = 0.5 + (random.nextDouble() * 0.5);
        int reduction = (int) (currentValue * 0.5 * multiplier * jitterFactor);

        int remaining = currentValue - reduction;
        return remaining > 1 ? remaining : 1;
    }
}

// Percentile-based scaling
/**
 * Scaling strategy driven by observed latency percentiles: it scales up only while p95 and
 * p99 are well below their targets and scales down once either target is exceeded.
 * Latencies must be supplied through {@link #recordLatency(long)}.
 */
public class PercentileBasedScalingStrategy implements ScalingStrategy<Integer> {
    private final PercentileTracker latencyTracker;
    private final long targetLatencyP95;
    private final long targetLatencyP99;
    
    /**
     * @param targetLatencyP95 target for the 95th percentile latency
     * @param targetLatencyP99 target for the 99th percentile latency
     */
    public PercentileBasedScalingStrategy(long targetLatencyP95, long targetLatencyP99) {
        this.targetLatencyP95 = targetLatencyP95;
        this.targetLatencyP99 = targetLatencyP99;
        this.latencyTracker = new PercentileTracker();
    }
    
    /** Records one latency observation for the percentile computation. */
    public void recordLatency(long latency) {
        latencyTracker.record(latency);
    }
    
    /** Grows the value by about 10% (at least 1) when both percentiles are under 70% of target. */
    @Override
    public Integer scaleUp(Integer currentValue) {
        long p95 = latencyTracker.getPercentile(95);
        long p99 = latencyTracker.getPercentile(99);
        
        // Scale up only if latency is well below targets
        if (p95 < targetLatencyP95 * 0.7 && p99 < targetLatencyP99 * 0.7) {
            // Integer division makes the 10% step zero below 10; without the floor of 1 a
            // value that was scaled down to 1..9 could never grow again.
            int step = Math.max(1, currentValue / 10);
            // For positive values this never exceeds double the input; saturate on overflow.
            return (int) Math.min((long) currentValue + step, (long) Integer.MAX_VALUE);
        }
        
        return currentValue;
    }
    
    /** Quarters the value when p99 exceeds its target, halves it when only p95 does. */
    @Override
    public Integer scaleDown(Integer currentValue) {
        long p95 = latencyTracker.getPercentile(95);
        long p99 = latencyTracker.getPercentile(99);
        
        // Aggressive scale down if latency is too high
        if (p99 > targetLatencyP99) {
            return Math.max(1, currentValue / 4);
        } else if (p95 > targetLatencyP95) {
            return Math.max(1, currentValue / 2);
        }
        
        return currentValue;
    }
}

Configuration Patterns

Multi-Tier Rate Limiting

/**
 * Composite strategy that applies several rate limiting strategies at once: a request is
 * blocked as soon as one tier blocks it, and the batch size is the smallest any tier allows.
 */
public class MultiTierRateLimitingStrategy implements RateLimitingStrategy {
    private final List<RateLimitingStrategy> strategies;
    
    /**
     * @param strategies the tiers, consulted in the given order
     * @throws NullPointerException if the array or any of its elements is null
     */
    public MultiTierRateLimitingStrategy(RateLimitingStrategy... strategies) {
        // Immutable snapshot: later changes to the caller's array must not alter the tiers,
        // and a null tier is rejected here rather than on the first request.
        this.strategies = List.of(strategies);
    }
    
    /** Forwards the in-flight notification to every tier. */
    @Override
    public void registerInFlightRequest(RequestInfo requestInfo) {
        strategies.forEach(strategy -> strategy.registerInFlightRequest(requestInfo));
    }
    
    /** Forwards the completion notification to every tier. */
    @Override
    public void registerCompletedRequest(ResultInfo resultInfo) {
        strategies.forEach(strategy -> strategy.registerCompletedRequest(resultInfo));
    }
    
    @Override
    public boolean shouldBlock(RequestInfo requestInfo) {
        // Block if ANY strategy says to block. Evaluation stops at the first tier that
        // blocks, so later tiers are not consulted for that request.
        return strategies.stream().anyMatch(strategy -> strategy.shouldBlock(requestInfo));
    }
    
    @Override
    public int getMaxBatchSize() {
        // Use the minimum batch size from all strategies; unlimited when there are no tiers
        return strategies.stream()
            .mapToInt(RateLimitingStrategy::getMaxBatchSize)
            .min()
            .orElse(Integer.MAX_VALUE);
    }
}

// Usage
// Tier 1: token bucket limit
RateLimitingStrategy tokenBucketTier = new TokenBucketRateLimitingStrategy(1000, 100);

// Tier 2: circuit breaker
RateLimitingStrategy circuitBreakerTier = new CircuitBreakerRateLimitingStrategy(10, 30000, 5);

// Tier 3: congestion control with an AIMD scaling strategy left at its builder defaults
RateLimitingStrategy congestionControlTier =
    CongestionControlRateLimitingStrategy.builder()
        .setMaxInFlightRequests(50)
        .setInitialMaxInFlightMessages(100)
        .setScalingStrategy(AIMDScalingStrategy.builder(500).build())
        .build();

// All tiers must agree before a request is sent
RateLimitingStrategy multiTier =
    new MultiTierRateLimitingStrategy(tokenBucketTier, circuitBreakerTier, congestionControlTier);

Environment-Aware Configuration

/**
 * Factory that picks a rate limiting configuration from a deployment environment name
 * ("production", "staging" or "development", matched case-insensitively).
 */
public class EnvironmentAwareRateLimitingFactory {
    
    /**
     * Returns the strategy for the named environment.
     *
     * @param environment environment name; {@code null} or an unrecognised name yields the
     *     default strategy
     */
    public static RateLimitingStrategy createForEnvironment(String environment) {
        if (environment == null) {
            return createDefaultStrategy();
        }
        
        // Locale.ROOT keeps the match independent of the JVM's default locale
        // (e.g. "PRODUCTION" must not lower-case to a dotless i under a Turkish locale).
        switch (environment.toLowerCase(java.util.Locale.ROOT)) {
            case "production":
                return createProductionStrategy();
            case "staging":
                return createStagingStrategy();
            case "development":
                return createDevelopmentStrategy();
            default:
                return createDefaultStrategy();
        }
    }
    
    private static RateLimitingStrategy createProductionStrategy() {
        // Conservative settings for production
        AIMDScalingStrategy scalingStrategy = AIMDScalingStrategy.builder(2000)
            .setIncreaseRate(5)          // Slow increase
            .setDecreaseFactor(0.7)      // Moderate decrease
            .build();
        
        return new MultiTierRateLimitingStrategy(
            CongestionControlRateLimitingStrategy.builder()
                .setMaxInFlightRequests(100)
                .setInitialMaxInFlightMessages(50)
                .setScalingStrategy(scalingStrategy)
                .build(),
            new CircuitBreakerRateLimitingStrategy(20, 60000, 10)
        );
    }
    
    private static RateLimitingStrategy createStagingStrategy() {
        // Moderate settings for staging
        AIMDScalingStrategy scalingStrategy = AIMDScalingStrategy.builder(1000)
            .setIncreaseRate(10)
            .setDecreaseFactor(0.6)
            .build();
        
        return CongestionControlRateLimitingStrategy.builder()
            .setMaxInFlightRequests(50)
            .setInitialMaxInFlightMessages(25)
            .setScalingStrategy(scalingStrategy)
            .build();
    }
    
    private static RateLimitingStrategy createDevelopmentStrategy() {
        // Permissive settings for development
        return new NoOpRateLimitingStrategy();
    }
    
    private static RateLimitingStrategy createDefaultStrategy() {
        // Unknown environment: fall back to the conservative production profile so that a
        // misspelled or missing environment name never disables protection.
        return createProductionStrategy();
    }
}

/**
 * Strategy that imposes no limits: it never blocks, ignores every notification and reports
 * an unbounded batch size.
 */
public class NoOpRateLimitingStrategy implements RateLimitingStrategy {

    /** Always {@code false}: no request is ever held back. */
    @Override
    public boolean shouldBlock(RequestInfo requestInfo) {
        return false;
    }

    /** Always {@link Integer#MAX_VALUE}: batches are not limited by this strategy. */
    @Override
    public int getMaxBatchSize() {
        return Integer.MAX_VALUE;
    }

    @Override
    public void registerInFlightRequest(RequestInfo requestInfo) {
        // Intentionally empty: nothing is tracked.
    }

    @Override
    public void registerCompletedRequest(ResultInfo resultInfo) {
        // Intentionally empty: nothing is tracked.
    }
}

Best Practices

Performance Monitoring and Metrics

/**
 * Decorator that adds metrics to another rate limiting strategy: it counts how many
 * requests were blocked or allowed and exposes the delegate's current batch size as a gauge.
 * Every decision itself is taken by the delegate.
 */
public class MetricsEnabledRateLimitingStrategy implements RateLimitingStrategy {
    private final RateLimitingStrategy delegate;
    private final MetricGroup metricGroup;
    
    // Metrics
    private final Counter requestsBlocked;
    private final Counter requestsAllowed;
    private final Gauge<Integer> currentBatchSize;
    private final Histogram requestLatency;
    
    /**
     * @param delegate strategy that makes the actual rate limiting decisions
     * @param metricGroup group the metrics are registered in
     */
    public MetricsEnabledRateLimitingStrategy(
            RateLimitingStrategy delegate, 
            MetricGroup metricGroup) {
        this.delegate = delegate;
        this.metricGroup = metricGroup;
        
        this.requestsBlocked = metricGroup.counter("requests_blocked");
        this.requestsAllowed = metricGroup.counter("requests_allowed");
        this.currentBatchSize = metricGroup.gauge("current_batch_size", delegate::getMaxBatchSize);
        // NOTE(review): Flink's MetricGroup#histogram appears to require a Histogram
        // instance as a second argument - confirm and supply an implementation. Nothing in
        // this class updates the histogram yet.
        this.requestLatency = metricGroup.histogram("request_latency");
    }
    
    /** Asks the delegate and counts the outcome as blocked or allowed. */
    @Override
    public boolean shouldBlock(RequestInfo requestInfo) {
        boolean shouldBlock = delegate.shouldBlock(requestInfo);
        
        if (shouldBlock) {
            requestsBlocked.inc();
        } else {
            requestsAllowed.inc();
        }
        
        return shouldBlock;
    }
    
    /** Passes the in-flight notification straight to the delegate. */
    @Override
    public void registerInFlightRequest(RequestInfo requestInfo) {
        delegate.registerInFlightRequest(requestInfo);
    }
    
    /** Passes the completion notification straight to the delegate. */
    @Override
    public void registerCompletedRequest(ResultInfo resultInfo) {
        delegate.registerCompletedRequest(resultInfo);
    }
    
    /** Returns the delegate's current maximum batch size. */
    @Override
    public int getMaxBatchSize() {
        return delegate.getMaxBatchSize();
    }
}

Testing and Validation

/**
 * Replays scripted request patterns against a rate limiting strategy and records which
 * requests were blocked and how long each test case took.
 */
public class RateLimitingStrategyTester {
    
    /**
     * Runs every test case of the scenario against the strategy.
     *
     * <p>Each admitted request is registered as in flight and its completion is delivered
     * after the request's simulated latency. A test case is only closed once all of its
     * completions have been delivered, so the recorded duration includes them and no late
     * completion leaks into the next test case.
     *
     * @param strategy strategy under test
     * @param scenario test cases to replay
     * @return the collected results
     * @throws java.util.concurrent.CompletionException if the strategy throws while a
     *     simulated completion is delivered
     */
    public static TestResults testStrategy(
            RateLimitingStrategy strategy, 
            TestScenario scenario) {
        
        TestResults results = new TestResults();
        
        for (TestCase testCase : scenario.getTestCases()) {
            long startTime = System.currentTimeMillis();
            java.util.List<CompletableFuture<Void>> pendingCompletions = new java.util.ArrayList<>();
            
            // Simulate requests
            for (RequestPattern request : testCase.getRequests()) {
                boolean blocked = strategy.shouldBlock(request.getRequestInfo());
                
                if (!blocked) {
                    strategy.registerInFlightRequest(request.getRequestInfo());
                    
                    // Simulate request completion after the request's latency
                    pendingCompletions.add(
                        CompletableFuture.runAsync(
                            () -> strategy.registerCompletedRequest(request.getResultInfo()),
                            CompletableFuture.delayedExecutor(
                                request.getLatency(), TimeUnit.MILLISECONDS)));
                }
                
                results.recordRequest(blocked, request);
            }
            
            // Wait for every simulated completion before closing the test case
            CompletableFuture.allOf(pendingCompletions.toArray(new CompletableFuture<?>[0])).join();
            
            long endTime = System.currentTimeMillis();
            results.recordTestCase(testCase, endTime - startTime);
        }
        
        return results;
    }
}

The Rate Limiting & Scaling framework provides sophisticated tools for building resilient, high-performance systems that can adapt to changing conditions while maintaining optimal throughput and protecting downstream systems from overload.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-base

docs

async-sink.md

hybrid-source.md

index.md

rate-limiting.md

source-reader.md

table-api.md

tile.json