Base classes and utilities for building Apache Flink connectors with async sink patterns, source readers, and advanced streaming capabilities
—
The Rate Limiting & Scaling framework provides sophisticated strategies for controlling throughput, handling backpressure, and dynamically adjusting to changing conditions. It includes pluggable rate limiting strategies and scaling algorithms optimized for different workload patterns.
The main interface for controlling request rates and backpressure.
@PublicEvolving
public interface RateLimitingStrategy {
void registerInFlightRequest(RequestInfo requestInfo)
void registerCompletedRequest(ResultInfo resultInfo)
boolean shouldBlock(RequestInfo requestInfo)
int getMaxBatchSize()
}Interface for controlling scale up/down behavior.
@PublicEvolving
public interface ScalingStrategy<T> {
T scaleUp(T currentValue)
T scaleDown(T currentValue)
}Advanced rate limiting strategy that scales based on success/failure patterns.
@PublicEvolving
public class CongestionControlRateLimitingStrategy implements RateLimitingStrategy {
// Builder methods
public static CongestionControlRateLimitingStrategyBuilder builder()
// Builder interface
public interface CongestionControlRateLimitingStrategyBuilder {
Builder setMaxInFlightRequests(int maxInFlightRequests)
Builder setInitialMaxInFlightMessages(int initialMaxInFlightMessages)
Builder setScalingStrategy(ScalingStrategy<Integer> scalingStrategy)
CongestionControlRateLimitingStrategy build()
}
}Additive Increase/Multiplicative Decrease scaling strategy.
@PublicEvolving
public class AIMDScalingStrategy implements ScalingStrategy<Integer> {
// Constructor
public AIMDScalingStrategy(int increaseRate, double decreaseFactor, int rateThreshold)
// Scaling methods
public Integer scaleUp(Integer currentRate)
public Integer scaleDown(Integer currentRate)
// Builder
public static AIMDScalingStrategyBuilder builder(int rateThreshold)
public interface AIMDScalingStrategyBuilder {
AIMDScalingStrategyBuilder setIncreaseRate(int increaseRate)
AIMDScalingStrategyBuilder setDecreaseFactor(double decreaseFactor)
AIMDScalingStrategy build()
}
}// Create AIMD scaling strategy
// AIMD: add 10 on each success, halve on failure, capped at rate 1000.
AIMDScalingStrategy scalingStrategy = AIMDScalingStrategy.builder(1000)
.setIncreaseRate(10) // Increase by 10 per success
.setDecreaseFactor(0.5) // Halve on failure
.build();
// Create congestion control rate limiting
// The AIMD strategy above drives how the in-flight message limit adapts.
CongestionControlRateLimitingStrategy rateLimiting =
CongestionControlRateLimitingStrategy.builder()
.setMaxInFlightRequests(50)
.setInitialMaxInFlightMessages(100)
.setScalingStrategy(scalingStrategy)
.build();
// Apply to AsyncSinkWriter configuration
// NOTE: setMaxInFlightRequests here should normally match the limit
// configured on the rate limiting strategy above.
AsyncSinkWriterConfiguration config = AsyncSinkWriterConfiguration.builder()
.setMaxBatchSize(200)
.setMaxBatchSizeInBytes(2 * 1024 * 1024) // 2MB
.setMaxInFlightRequests(50)
.setMaxBufferedRequests(2000)
.setMaxTimeInBufferMS(3000)
.setMaxRecordSizeInBytes(512 * 1024)
.setRateLimitingStrategy(rateLimiting)
.build();public class TokenBucketRateLimitingStrategy implements RateLimitingStrategy {
// Token bucket limiter: a fixed-capacity bucket is drained by in-flight
// batches and refilled at refillRate tokens/second by a background task.
private final int maxTokens;
private final int refillRate; // Tokens per second
private final AtomicInteger tokens;
private final ScheduledExecutorService scheduler;
private volatile long lastRefillTime;
/**
 * @param maxTokens  bucket capacity; must be positive
 * @param refillRate tokens restored per second; must be positive
 */
public TokenBucketRateLimitingStrategy(int maxTokens, int refillRate) {
    if (maxTokens <= 0 || refillRate <= 0) {
        throw new IllegalArgumentException("maxTokens and refillRate must be positive");
    }
    this.maxTokens = maxTokens;
    this.refillRate = refillRate;
    this.tokens = new AtomicInteger(maxTokens);
    this.lastRefillTime = System.currentTimeMillis();
    // Daemon thread so the refill task cannot keep the JVM alive if the
    // owner forgets to call close().
    this.scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> {
        Thread thread = new Thread(runnable, "token-bucket-refill");
        thread.setDaemon(true);
        return thread;
    });
    this.scheduler.scheduleAtFixedRate(this::refillTokens, 0, 1, TimeUnit.SECONDS);
}
@Override
public void registerInFlightRequest(RequestInfo requestInfo) {
    // Consume tokens for the batch; clamp at zero rather than going negative.
    int batchSize = requestInfo.getBatchSize();
    tokens.updateAndGet(current -> Math.max(0, current - batchSize));
}
@Override
public void registerCompletedRequest(ResultInfo resultInfo) {
    // Token buckets replenish on a timer, not on completion.
}
@Override
public boolean shouldBlock(RequestInfo requestInfo) {
    // Block when the bucket cannot cover the whole batch.
    int requiredTokens = requestInfo.getBatchSize();
    return tokens.get() < requiredTokens;
}
@Override
public int getMaxBatchSize() {
    // Batch size limited by currently available tokens.
    return Math.min(maxTokens, tokens.get());
}
/** Stops the background refill task. Call when the strategy is retired. */
public void close() {
    scheduler.shutdownNow();
}
private void refillTokens() {
    long now = System.currentTimeMillis();
    long timeDelta = now - lastRefillTime;
    if (timeDelta >= 1000) { // Refill at most once per second
        int tokensToAdd = (int) (refillRate * (timeDelta / 1000.0));
        tokens.updateAndGet(current -> Math.min(maxTokens, current + tokensToAdd));
        lastRefillTime = now;
    }
}
}public class AdaptiveRateLimitingStrategy implements RateLimitingStrategy {
// Adapts the in-flight message limit from observed success rate and latency.
private final ScalingStrategy<Integer> scalingStrategy;
private final int maxInFlightRequests;
private final AtomicInteger currentMaxInFlightMessages;
private final AtomicInteger inFlightRequests;
// Performance metrics over a sliding sample window
private final MovingAverage successRate;
private final MovingAverage latency;
private volatile long lastSuccessTime = System.currentTimeMillis();
/**
 * @param maxInFlightRequests        hard cap on concurrent requests
 * @param initialMaxInFlightMessages starting per-batch message limit
 * @param scalingStrategy            policy for growing/shrinking the limit
 */
public AdaptiveRateLimitingStrategy(
int maxInFlightRequests,
int initialMaxInFlightMessages,
ScalingStrategy<Integer> scalingStrategy) {
    this.maxInFlightRequests = maxInFlightRequests;
    this.currentMaxInFlightMessages = new AtomicInteger(initialMaxInFlightMessages);
    this.inFlightRequests = new AtomicInteger(0);
    this.scalingStrategy = scalingStrategy;
    // 10-sample windows for the moving averages
    this.successRate = new MovingAverage(10);
    this.latency = new MovingAverage(10);
}
@Override
public void registerInFlightRequest(RequestInfo requestInfo) {
    inFlightRequests.incrementAndGet();
    // Stamp the start time so completion can compute latency.
    if (requestInfo instanceof TimestampedRequestInfo) {
        ((TimestampedRequestInfo) requestInfo).setStartTime(System.currentTimeMillis());
    }
}
@Override
public void registerCompletedRequest(ResultInfo resultInfo) {
    inFlightRequests.decrementAndGet();
    int failedMessages = resultInfo.getFailedMessages();
    int totalMessages = resultInfo.getBatchSize();
    // Guard against empty batches to avoid division by zero; treat an empty
    // batch as fully successful.
    double batchSuccessRate =
            totalMessages == 0
                    ? 1.0
                    : (double) (totalMessages - failedMessages) / totalMessages;
    successRate.addValue(batchSuccessRate);
    // Update latency if the result carries a start timestamp.
    if (resultInfo instanceof TimestampedResultInfo) {
        TimestampedResultInfo timestamped = (TimestampedResultInfo) resultInfo;
        latency.addValue(System.currentTimeMillis() - timestamped.getStartTime());
    }
    // Adaptive scaling based on recent performance.
    adaptiveScale(batchSuccessRate);
    if (failedMessages == 0) {
        lastSuccessTime = System.currentTimeMillis();
    }
}
@Override
public boolean shouldBlock(RequestInfo requestInfo) {
    // Block if too many in-flight requests.
    if (inFlightRequests.get() >= maxInFlightRequests) {
        return true;
    }
    // Block if the batch would exceed the current adaptive limit.
    return requestInfo.getBatchSize() > currentMaxInFlightMessages.get();
}
@Override
public int getMaxBatchSize() {
    return currentMaxInFlightMessages.get();
}
// Note: the parameter is retained for signature stability; decisions use the
// windowed averages, not the single-batch rate.
private void adaptiveScale(double batchSuccessRate) {
    double avgSuccessRate = successRate.getAverage();
    double avgLatency = latency.getAverage();
    if (avgSuccessRate > 0.95 && avgLatency < 1000) {
        // Healthy: scale up atomically (the original get-then-set raced).
        currentMaxInFlightMessages.updateAndGet(scalingStrategy::scaleUp);
    } else if (avgSuccessRate < 0.8 || avgLatency > 5000) {
        // Struggling: scale down, never below 1.
        currentMaxInFlightMessages.updateAndGet(
                current -> Math.max(1, scalingStrategy.scaleDown(current)));
    }
    // Emergency quarter-capacity cut if nothing has succeeded for 30 seconds.
    if (System.currentTimeMillis() - lastSuccessTime > 30000) {
        currentMaxInFlightMessages.updateAndGet(current -> Math.max(1, current / 4));
    }
}
}
// Helper classes for timestamped metrics
public class TimestampedRequestInfo implements RequestInfo {
    private final int batchSize;
    // volatile: written by the rate limiter thread, read on completion
    private volatile long requestStartMillis;
    /** @param batchSize number of messages carried by this request */
    public TimestampedRequestInfo(int batchSize) {
        this.batchSize = batchSize;
    }
    @Override
    public int getBatchSize() {
        return batchSize;
    }
    /** Stamped by the rate limiter when the request goes in flight. */
    public void setStartTime(long startTime) {
        this.requestStartMillis = startTime;
    }
    /** Millisecond timestamp previously recorded via {@link #setStartTime(long)}. */
    public long getStartTime() {
        return requestStartMillis;
    }
}
public class TimestampedResultInfo implements ResultInfo {
    private final int failureCount;
    private final int messageCount;
    private final long requestStartMillis;
    /**
     * @param failedMessages number of messages that failed in the batch
     * @param batchSize      total messages in the batch
     * @param startTime      millisecond timestamp when the request started
     */
    public TimestampedResultInfo(int failedMessages, int batchSize, long startTime) {
        this.failureCount = failedMessages;
        this.messageCount = batchSize;
        this.requestStartMillis = startTime;
    }
    @Override
    public int getFailedMessages() {
        return failureCount;
    }
    @Override
    public int getBatchSize() {
        return messageCount;
    }
    /** Start timestamp carried over from the originating request. */
    public long getStartTime() {
        return requestStartMillis;
    }
}
// Moving average helper
public class MovingAverage {
    // Fixed-size circular buffer; a running sum makes getAverage O(1)
    // instead of the original O(n) rescan. Thread-safe via synchronization.
    private final int windowSize;
    private final double[] values;
    private int index = 0;
    private int count = 0;
    private double sum = 0.0; // sum of the samples currently in the window
    /** @param windowSize number of samples to average over; must be positive */
    public MovingAverage(int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        this.windowSize = windowSize;
        this.values = new double[windowSize];
    }
    public synchronized void addValue(double value) {
        // Subtract the slot being overwritten so the sum tracks the window.
        sum += value - values[index];
        values[index] = value;
        index = (index + 1) % windowSize;
        count = Math.min(count + 1, windowSize);
    }
    /** Average of the samples seen so far (up to windowSize); 0.0 when empty. */
    public synchronized double getAverage() {
        return count == 0 ? 0.0 : sum / count;
    }
}
public class CircuitBreakerRateLimitingStrategy implements RateLimitingStrategy {
/** Circuit breaker states. */
public enum State {
    CLOSED, // Normal operation
    OPEN, // Circuit is open, blocking all requests
    HALF_OPEN // Testing if circuit can be closed
}
private final int failureThreshold;
private final long timeoutMillis;
private final int halfOpenMaxRequests;
private volatile State state = State.CLOSED;
private final AtomicInteger failureCount = new AtomicInteger(0);
private final AtomicInteger halfOpenSuccessCount = new AtomicInteger(0);
private volatile long lastFailureTime = 0;
/**
 * @param failureThreshold    failed messages tolerated in CLOSED before opening
 * @param timeoutMillis       how long to stay OPEN before probing
 * @param halfOpenMaxRequests successes required in HALF_OPEN to close again
 */
public CircuitBreakerRateLimitingStrategy(
int failureThreshold,
long timeoutMillis,
int halfOpenMaxRequests) {
    this.failureThreshold = failureThreshold;
    this.timeoutMillis = timeoutMillis;
    this.halfOpenMaxRequests = halfOpenMaxRequests;
}
@Override
public void registerInFlightRequest(RequestInfo requestInfo) {
    // No action needed on request start.
}
// synchronized: the transitions below are check-then-act sequences on shared
// state; without the lock two concurrent completions could corrupt the
// state machine (the original relied on volatile alone).
@Override
public synchronized void registerCompletedRequest(ResultInfo resultInfo) {
    int failedMessages = resultInfo.getFailedMessages();
    switch (state) {
        case CLOSED:
            if (failedMessages > 0) {
                int failures = failureCount.addAndGet(failedMessages);
                if (failures >= failureThreshold) {
                    // Open the circuit
                    state = State.OPEN;
                    lastFailureTime = System.currentTimeMillis();
                    LOG.warn("Circuit breaker opened after {} failures", failures);
                }
            } else {
                // Reset failure count on success
                failureCount.set(0);
            }
            break;
        case HALF_OPEN:
            if (failedMessages > 0) {
                // Failure in half-open state - go back to open
                state = State.OPEN;
                lastFailureTime = System.currentTimeMillis();
                halfOpenSuccessCount.set(0);
                LOG.warn("Circuit breaker re-opened due to failure in half-open state");
            } else {
                // Success in half-open state
                int successes = halfOpenSuccessCount.incrementAndGet();
                if (successes >= halfOpenMaxRequests) {
                    // Close the circuit
                    state = State.CLOSED;
                    failureCount.set(0);
                    halfOpenSuccessCount.set(0);
                    LOG.info("Circuit breaker closed after {} successful requests", successes);
                }
            }
            break;
        case OPEN:
            // Check if the cool-down timeout has passed
            if (System.currentTimeMillis() - lastFailureTime >= timeoutMillis) {
                state = State.HALF_OPEN;
                halfOpenSuccessCount.set(0);
                LOG.info("Circuit breaker moved to half-open state");
            }
            break;
    }
}
// synchronized for the same reason: the OPEN branch mutates state.
@Override
public synchronized boolean shouldBlock(RequestInfo requestInfo) {
    switch (state) {
        case CLOSED:
            return false;
        case OPEN:
            // Check if timeout has passed
            if (System.currentTimeMillis() - lastFailureTime >= timeoutMillis) {
                state = State.HALF_OPEN;
                halfOpenSuccessCount.set(0);
                return false; // Allow first half-open request
            }
            return true; // Block all requests
        case HALF_OPEN:
            // Allow only a limited number of probe requests
            return halfOpenSuccessCount.get() >= halfOpenMaxRequests;
        default:
            return false;
    }
}
@Override
public int getMaxBatchSize() {
    switch (state) {
        case CLOSED:
            return Integer.MAX_VALUE; // No limit
        case OPEN:
            return 0; // Block all
        case HALF_OPEN:
            return 1; // Small batches for testing
        default:
            return Integer.MAX_VALUE;
    }
}
/** Current breaker state, for monitoring. */
public State getCurrentState() {
    return state;
}
}// TCP-like congestion control scaling
public class TCPLikeScalingStrategy implements ScalingStrategy<Integer> {
    private final int slowStartThreshold;
    // Guarded by synchronization below; the original used volatile fields, but
    // volatile cannot make read-modify-write updates like `congestionWindow *= 2`
    // atomic, so concurrent calls could lose updates.
    private boolean inSlowStart = true;
    private int congestionWindow = 1;
    /** @param slowStartThreshold window size at which growth switches from exponential to linear */
    public TCPLikeScalingStrategy(int slowStartThreshold) {
        this.slowStartThreshold = slowStartThreshold;
    }
    /** Exponential growth in slow start, linear afterwards; result capped at double the current value. */
    @Override
    public synchronized Integer scaleUp(Integer currentValue) {
        if (inSlowStart) {
            // Exponential growth in slow start
            congestionWindow *= 2;
            if (congestionWindow >= slowStartThreshold) {
                inSlowStart = false;
            }
        } else {
            // Linear growth in congestion avoidance
            congestionWindow += 1;
        }
        return Math.min(congestionWindow, currentValue * 2); // Cap at double
    }
    /** Multiplicative decrease; congestion always terminates slow start. */
    @Override
    public synchronized Integer scaleDown(Integer currentValue) {
        congestionWindow = Math.max(1, congestionWindow / 2);
        inSlowStart = false; // Exit slow start on congestion
        return Math.max(1, currentValue / 2);
    }
}
// Exponential backoff with jitter
public class ExponentialBackoffScalingStrategy implements ScalingStrategy<Integer> {
    private final Random random = new Random();
    // Guarded by synchronization: `backoffMultiplier * 2` is a read-modify-write
    // that the original volatile field could not make atomic.
    private int backoffMultiplier = 1;
    /** Success resets the backoff and allows a conservative increase. */
    @Override
    public synchronized Integer scaleUp(Integer currentValue) {
        backoffMultiplier = 1;
        return Math.min(currentValue * 2, currentValue + 10); // Conservative increase
    }
    /** Exponential backoff with jitter on failure. */
    @Override
    public synchronized Integer scaleDown(Integer currentValue) {
        backoffMultiplier = Math.min(backoffMultiplier * 2, 64); // Max 64x backoff
        double jitterFactor = 0.5 + (random.nextDouble() * 0.5); // 0.5 to 1.0
        // NOTE(review): for multiplier >= 2 the reduction can exceed currentValue,
        // collapsing straight to 1 — confirm this aggressive floor is intended.
        int reduction = (int) (currentValue * 0.5 * backoffMultiplier * jitterFactor);
        return Math.max(1, currentValue - reduction);
    }
}
// Percentile-based scaling
public class PercentileBasedScalingStrategy implements ScalingStrategy<Integer> {
    private final PercentileTracker latencyTracker;
    private final long targetLatencyP95;
    private final long targetLatencyP99;
    /** Scales capacity by comparing observed latency percentiles against targets. */
    public PercentileBasedScalingStrategy(long targetLatencyP95, long targetLatencyP99) {
        this.targetLatencyP95 = targetLatencyP95;
        this.targetLatencyP99 = targetLatencyP99;
        this.latencyTracker = new PercentileTracker();
    }
    /** Feeds one latency sample into the tracker. */
    public void recordLatency(long latency) {
        latencyTracker.record(latency);
    }
    /** Grows by 10% (capped at 2x) only while both percentiles sit well under target. */
    @Override
    public Integer scaleUp(Integer currentValue) {
        long observedP95 = latencyTracker.getPercentile(95);
        long observedP99 = latencyTracker.getPercentile(99);
        boolean ampleHeadroom =
                observedP95 < targetLatencyP95 * 0.7 && observedP99 < targetLatencyP99 * 0.7;
        if (!ampleHeadroom) {
            return currentValue;
        }
        return Math.min(currentValue + (currentValue / 10), currentValue * 2);
    }
    /** Cuts to a quarter on a p99 breach, half on a p95 breach, otherwise holds steady. */
    @Override
    public Integer scaleDown(Integer currentValue) {
        long observedP95 = latencyTracker.getPercentile(95);
        long observedP99 = latencyTracker.getPercentile(99);
        if (observedP99 > targetLatencyP99) {
            return Math.max(1, currentValue / 4);
        }
        if (observedP95 > targetLatencyP95) {
            return Math.max(1, currentValue / 2);
        }
        return currentValue;
    }
}public class MultiTierRateLimitingStrategy implements RateLimitingStrategy {
private final List<RateLimitingStrategy> strategies;
/**
 * Composes several strategies; a request proceeds only when every tier allows it.
 *
 * @param strategies the tiers to consult; defensively copied, null elements rejected
 */
public MultiTierRateLimitingStrategy(RateLimitingStrategy... strategies) {
    // List.copyOf prevents later mutation through the caller's varargs array
    // (Arrays.asList alone is just a view over it) and rejects null tiers early.
    this.strategies = List.copyOf(Arrays.asList(strategies));
}
@Override
public void registerInFlightRequest(RequestInfo requestInfo) {
    strategies.forEach(strategy -> strategy.registerInFlightRequest(requestInfo));
}
@Override
public void registerCompletedRequest(ResultInfo resultInfo) {
    strategies.forEach(strategy -> strategy.registerCompletedRequest(resultInfo));
}
@Override
public boolean shouldBlock(RequestInfo requestInfo) {
    // Block if ANY strategy says to block
    return strategies.stream().anyMatch(strategy -> strategy.shouldBlock(requestInfo));
}
@Override
public int getMaxBatchSize() {
    // The most restrictive tier wins
    return strategies.stream()
            .mapToInt(RateLimitingStrategy::getMaxBatchSize)
            .min()
            .orElse(Integer.MAX_VALUE);
}
}
// Usage
// Tiers are evaluated together: a request is blocked if ANY tier blocks,
// and the effective batch size is the minimum across all tiers.
RateLimitingStrategy multiTier = new MultiTierRateLimitingStrategy(
new TokenBucketRateLimitingStrategy(1000, 100), // Token bucket limit
new CircuitBreakerRateLimitingStrategy(10, 30000, 5), // Circuit breaker
CongestionControlRateLimitingStrategy.builder() // Congestion control
.setMaxInFlightRequests(50)
.setInitialMaxInFlightMessages(100)
.setScalingStrategy(AIMDScalingStrategy.builder(500).build())
.build()
);public class EnvironmentAwareRateLimitingFactory {
/**
 * Picks a rate limiting strategy appropriate for the deployment environment.
 * Unknown or null environments fall back to the default strategy.
 */
public static RateLimitingStrategy createForEnvironment(String environment) {
    if (environment == null) {
        return createDefaultStrategy();
    }
    // NOTE(review): toLowerCase() is locale-sensitive (Turkish-i problem);
    // consider toLowerCase(Locale.ROOT) if an import can be added.
    switch (environment.toLowerCase()) {
        case "production":
            return createProductionStrategy();
        case "staging":
            return createStagingStrategy();
        case "development":
            return createDevelopmentStrategy();
        default:
            return createDefaultStrategy();
    }
}
/** Conservative settings for production: slow AIMD ramp plus a circuit breaker. */
private static RateLimitingStrategy createProductionStrategy() {
    AIMDScalingStrategy scalingStrategy = AIMDScalingStrategy.builder(2000)
            .setIncreaseRate(5) // Slow increase
            .setDecreaseFactor(0.7) // Moderate decrease
            .build();
    return new MultiTierRateLimitingStrategy(
            CongestionControlRateLimitingStrategy.builder()
                    .setMaxInFlightRequests(100)
                    .setInitialMaxInFlightMessages(50)
                    .setScalingStrategy(scalingStrategy)
                    .build(),
            new CircuitBreakerRateLimitingStrategy(20, 60000, 10));
}
/** Moderate settings for staging. */
private static RateLimitingStrategy createStagingStrategy() {
    AIMDScalingStrategy scalingStrategy = AIMDScalingStrategy.builder(1000)
            .setIncreaseRate(10)
            .setDecreaseFactor(0.6)
            .build();
    return CongestionControlRateLimitingStrategy.builder()
            .setMaxInFlightRequests(50)
            .setInitialMaxInFlightMessages(25)
            .setScalingStrategy(scalingStrategy)
            .build();
}
/** Permissive settings for development. */
private static RateLimitingStrategy createDevelopmentStrategy() {
    return new NoOpRateLimitingStrategy();
}
// Fix: this was referenced by createForEnvironment but never defined,
// so the class could not compile. Staging settings are a safe middle ground.
private static RateLimitingStrategy createDefaultStrategy() {
    return createStagingStrategy();
}
}
public class NoOpRateLimitingStrategy implements RateLimitingStrategy {
    /** Tracks nothing — this strategy imposes no limits. */
    @Override
    public void registerInFlightRequest(RequestInfo requestInfo) {
        // intentionally empty
    }
    /** Tracks nothing — completions are ignored. */
    @Override
    public void registerCompletedRequest(ResultInfo resultInfo) {
        // intentionally empty
    }
    /** Never applies backpressure. */
    @Override
    public boolean shouldBlock(RequestInfo requestInfo) {
        return false;
    }
    /** No batch-size ceiling. */
    @Override
    public int getMaxBatchSize() {
        return Integer.MAX_VALUE;
    }
}
public class MetricsEnabledRateLimitingStrategy implements RateLimitingStrategy {
// Decorator that reports the wrapped strategy's decisions as metrics.
private final RateLimitingStrategy delegate;
private final MetricGroup metricGroup;
// Metrics
private final Counter requestsBlocked;
private final Counter requestsAllowed;
private final Gauge<Integer> currentBatchSize;
// NOTE(review): registered but never updated in this class — wire up or drop.
private final Histogram requestLatency;
/**
 * @param delegate    the strategy whose decisions are measured
 * @param metricGroup group under which the metrics are registered
 */
public MetricsEnabledRateLimitingStrategy(
RateLimitingStrategy delegate,
MetricGroup metricGroup) {
    this.delegate = delegate;
    this.metricGroup = metricGroup;
    this.requestsBlocked = metricGroup.counter("requests_blocked");
    this.requestsAllowed = metricGroup.counter("requests_allowed");
    this.currentBatchSize = metricGroup.gauge("current_batch_size", delegate::getMaxBatchSize);
    this.requestLatency = metricGroup.histogram("request_latency");
}
@Override
public boolean shouldBlock(RequestInfo requestInfo) {
    boolean shouldBlock = delegate.shouldBlock(requestInfo);
    if (shouldBlock) {
        requestsBlocked.inc();
    } else {
        requestsAllowed.inc();
    }
    return shouldBlock;
}
// Fix: the original left these as a "// ... delegate other methods" placeholder,
// which leaves the interface unimplemented and the class uncompilable.
@Override
public void registerInFlightRequest(RequestInfo requestInfo) {
    delegate.registerInFlightRequest(requestInfo);
}
@Override
public void registerCompletedRequest(ResultInfo resultInfo) {
    delegate.registerCompletedRequest(resultInfo);
}
@Override
public int getMaxBatchSize() {
    return delegate.getMaxBatchSize();
}
}public class RateLimitingStrategyTester {
/**
 * Replays a scenario against a strategy and collects per-request results.
 *
 * <p>Fix: the original fired completions on delayed executors and never awaited
 * them, so completions could run after results were recorded (or after the test
 * ended entirely), making outcomes nondeterministic. All simulated completions
 * are now awaited before the test case's elapsed time is recorded.
 */
public static TestResults testStrategy(
RateLimitingStrategy strategy,
TestScenario scenario) {
    TestResults results = new TestResults();
    for (TestCase testCase : scenario.getTestCases()) {
        long startTime = System.currentTimeMillis();
        // Accumulates every simulated completion for this test case.
        CompletableFuture<Void> pendingCompletions = CompletableFuture.completedFuture(null);
        for (RequestPattern request : testCase.getRequests()) {
            boolean blocked = strategy.shouldBlock(request.getRequestInfo());
            if (!blocked) {
                strategy.registerInFlightRequest(request.getRequestInfo());
                // Simulate completion after the request's configured latency.
                CompletableFuture<Void> completion =
                        CompletableFuture.runAsync(
                                () -> strategy.registerCompletedRequest(request.getResultInfo()),
                                CompletableFuture.delayedExecutor(
                                        request.getLatency(), TimeUnit.MILLISECONDS));
                pendingCompletions = CompletableFuture.allOf(pendingCompletions, completion);
            }
            results.recordRequest(blocked, request);
        }
        // Wait for all simulated completions so timings and strategy state
        // are deterministic when the test case is recorded.
        pendingCompletions.join();
        long endTime = System.currentTimeMillis();
        results.recordTestCase(testCase, endTime - startTime);
    }
    return results;
}
}
The Rate Limiting & Scaling framework provides sophisticated tools for building resilient, high-performance systems that adapt to changing conditions while maintaining optimal throughput and protecting downstream systems from overload.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-base