# Rate Limiting & Scaling

The Rate Limiting & Scaling framework provides strategies for controlling throughput, applying backpressure, and adapting dynamically to changing downstream conditions. It consists of pluggable rate limiting strategies (`RateLimitingStrategy`) and scaling algorithms (`ScalingStrategy`) that can be combined and tuned for different workload patterns.

## Core Components

### RateLimitingStrategy

The main interface for controlling request rates and applying backpressure.

```java { .api }

@PublicEvolving
public interface RateLimitingStrategy {

    // Called when a request is sent; requestInfo describes the batch being sent.
    void registerInFlightRequest(RequestInfo requestInfo)

    // Called when a request finishes; resultInfo reports its batch size and failed messages.
    void registerCompletedRequest(ResultInfo resultInfo)

    // Returns true if the request described by requestInfo should be held back for now.
    boolean shouldBlock(RequestInfo requestInfo)

    // Returns the largest batch size the strategy currently allows.
    int getMaxBatchSize()
}

```

### ScalingStrategy

Interface for computing the next value when scaling up or down.

```java { .api }

@PublicEvolving

26

public interface ScalingStrategy<T> {

27

T scaleUp(T currentValue)

28

T scaleDown(T currentValue)

29

}

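```

### CongestionControlRateLimitingStrategy

Built-in rate limiting strategy that caps concurrent requests and adjusts the number of in-flight messages with a pluggable `ScalingStrategy`, scaling up after successful requests and down after failures.

```java { .api }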

@PublicEvolving
public class CongestionControlRateLimitingStrategy implements RateLimitingStrategy {

    // Entry point for configuring a new strategy
    public static CongestionControlRateLimitingStrategyBuilder builder()

    // Builder (a static nested class); every setter returns the builder for chaining
    public static class CongestionControlRateLimitingStrategyBuilder {

        // Upper bound on concurrently outstanding requests
        public CongestionControlRateLimitingStrategyBuilder setMaxInFlightRequests(int maxInFlightRequests)

        // Starting limit for in-flight messages; adjusted at runtime by the scaling strategy
        public CongestionControlRateLimitingStrategyBuilder setInitialMaxInFlightMessages(int initialMaxInFlightMessages)

        // Algorithm used to grow/shrink the in-flight message limit (e.g. AIMDScalingStrategy)
        public CongestionControlRateLimitingStrategyBuilder setScalingStrategy(ScalingStrategy<Integer> scalingStrategy)

        public CongestionControlRateLimitingStrategy build()
    }
}

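```

### AIMDScalingStrategy

Additive Increase/Multiplicative Decrease (AIMD) scaling strategy: on scale-up the value grows by a fixed amount, and on scale-down it is multiplied by a decrease factor (for example, 0.5 halves it).

```java { .api }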

@PublicEvolving
public class AIMDScalingStrategy implements ScalingStrategy<Integer> {

    // increaseRate:   amount added on every scale-up
    // decreaseFactor: multiplier applied on every scale-down (e.g. 0.5 halves the rate)
    // rateThreshold:  upper bound for the scaled rate
    public AIMDScalingStrategy(int increaseRate, double decreaseFactor, int rateThreshold)

    // Additive increase
    public Integer scaleUp(Integer currentRate)

    // Multiplicative decrease
    public Integer scaleDown(Integer currentRate)

    // rateThreshold is mandatory; increaseRate and decreaseFactor may be left at their defaults
    public static AIMDScalingStrategyBuilder builder(int rateThreshold)

    // Builder (a static nested class); every setter returns the builder for chaining
    public static class AIMDScalingStrategyBuilder {

        public AIMDScalingStrategyBuilder setIncreaseRate(int increaseRate)

        public AIMDScalingStrategyBuilder setDecreaseFactor(double decreaseFactor)

        public AIMDScalingStrategy build()
    }
}

```

## Implementation Examples

### Basic Rate Limiting Configuration

```java

// Create AIMD scaling strategy

85

AIMDScalingStrategy scalingStrategy = AIMDScalingStrategy.builder(1000)

86

.setIncreaseRate(10) // Increase by 10 per success

87

.setDecreaseFactor(0.5) // Halve on failure

88

.build();

89

90

// Create congestion control rate limiting

91

CongestionControlRateLimitingStrategy rateLimiting =

92

CongestionControlRateLimitingStrategy.builder()

93

.setMaxInFlightRequests(50)

94

.setInitialMaxInFlightMessages(100)

95

.setScalingStrategy(scalingStrategy)

96

.build();

97

98

// Apply to AsyncSinkWriter configuration

99

AsyncSinkWriterConfiguration config = AsyncSinkWriterConfiguration.builder()

100

.setMaxBatchSize(200)

101

.setMaxBatchSizeInBytes(2 * 1024 * 1024) // 2MB

102

.setMaxInFlightRequests(50)

103

.setMaxBufferedRequests(2000)

104

.setMaxTimeInBufferMS(3000)

105

.setMaxRecordSizeInBytes(512 * 1024)

106

.setRateLimitingStrategy(rateLimiting)

107

.build();

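```

### Custom Rate Limiting Strategy

A token bucket limits how many messages can be sent per second while still allowing short bursts up to the bucket's capacity.

```java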

public class TokenBucketRateLimitingStrategy implements RateLimitingStrategy {

114

private final int maxTokens;

115

private final int refillRate; // Tokens per second

116

private final AtomicInteger tokens;

117

private final ScheduledExecutorService scheduler;

118

private volatile long lastRefillTime;

119

120

public TokenBucketRateLimitingStrategy(int maxTokens, int refillRate) {

121

this.maxTokens = maxTokens;

122

this.refillRate = refillRate;

123

this.tokens = new AtomicInteger(maxTokens);

124

this.lastRefillTime = System.currentTimeMillis();

125

126

// Schedule token refill

127

this.scheduler = Executors.newSingleThreadScheduledExecutor();

128

this.scheduler.scheduleAtFixedRate(this::refillTokens, 0, 1, TimeUnit.SECONDS);

129

}

130

131

@Override

132

public void registerInFlightRequest(RequestInfo requestInfo) {

133

// Consume tokens for the request

134

int batchSize = requestInfo.getBatchSize();

135

tokens.updateAndGet(current -> Math.max(0, current - batchSize));

136

}

137

138

@Override

139

public void registerCompletedRequest(ResultInfo resultInfo) {

140

// No action needed for token bucket on completion

141

}

142

143

@Override

144

public boolean shouldBlock(RequestInfo requestInfo) {

145

int requiredTokens = requestInfo.getBatchSize();

146

return tokens.get() < requiredTokens;

147

}

148

149

@Override

150

public int getMaxBatchSize() {

151

// Batch size limited by available tokens

152

return Math.min(maxTokens, tokens.get());

153

}

154

155

private void refillTokens() {

156

long now = System.currentTimeMillis();

157

long timeDelta = now - lastRefillTime;

158

159

if (timeDelta >= 1000) { // Refill every second

160

int tokensToAdd = (int) (refillRate * (timeDelta / 1000.0));

161

tokens.updateAndGet(current -> Math.min(maxTokens, current + tokensToAdd));

162

lastRefillTime = now;

163

}

164

}

165

}

```

### Adaptive Rate Limiting Strategy

```java

public class AdaptiveRateLimitingStrategy implements RateLimitingStrategy {

172

private final ScalingStrategy<Integer> scalingStrategy;

173

private final int maxInFlightRequests;

174

private final AtomicInteger currentMaxInFlightMessages;

175

private final AtomicInteger inFlightRequests;

176

177

// Performance metrics

178

private final MovingAverage successRate;

179

private final MovingAverage latency;

180

private volatile long lastSuccessTime = System.currentTimeMillis();

181

182

public AdaptiveRateLimitingStrategy(

183

int maxInFlightRequests,

184

int initialMaxInFlightMessages,

185

ScalingStrategy<Integer> scalingStrategy) {

186

187

this.maxInFlightRequests = maxInFlightRequests;

188

this.currentMaxInFlightMessages = new AtomicInteger(initialMaxInFlightMessages);

189

this.inFlightRequests = new AtomicInteger(0);

190

this.scalingStrategy = scalingStrategy;

191

192

// Initialize metrics with 10-second windows

193

this.successRate = new MovingAverage(10);

194

this.latency = new MovingAverage(10);

195

}

196

197

@Override

198

public void registerInFlightRequest(RequestInfo requestInfo) {

199

inFlightRequests.incrementAndGet();

200

201

// Track request start time for latency calculation

202

if (requestInfo instanceof TimestampedRequestInfo) {

203

TimestampedRequestInfo timestamped = (TimestampedRequestInfo) requestInfo;

204

timestamped.setStartTime(System.currentTimeMillis());

205

}

206

}

207

208

@Override

209

public void registerCompletedRequest(ResultInfo resultInfo) {

210

inFlightRequests.decrementAndGet();

211

212

int failedMessages = resultInfo.getFailedMessages();

213

int totalMessages = resultInfo.getBatchSize();

214

215

// Update success rate

216

double batchSuccessRate = (double) (totalMessages - failedMessages) / totalMessages;

217

successRate.addValue(batchSuccessRate);

218

219

// Update latency if available

220

if (resultInfo instanceof TimestampedResultInfo) {

221

TimestampedResultInfo timestamped = (TimestampedResultInfo) resultInfo;

222

long requestLatency = System.currentTimeMillis() - timestamped.getStartTime();

223

latency.addValue(requestLatency);

224

}

225

226

// Adaptive scaling based on performance

227

adaptiveScale(batchSuccessRate);

228

229

if (failedMessages == 0) {

230

lastSuccessTime = System.currentTimeMillis();

231

}

232

}

233

234

@Override

235

public boolean shouldBlock(RequestInfo requestInfo) {

236

// Block if too many in-flight requests

237

if (inFlightRequests.get() >= maxInFlightRequests) {

238

return true;

239

}

240

241

// Block if batch would exceed current limit

242

int currentLimit = currentMaxInFlightMessages.get();

243

return requestInfo.getBatchSize() > currentLimit;

244

}

245

246

@Override

247

public int getMaxBatchSize() {

248

return currentMaxInFlightMessages.get();

249

}

250

251

private void adaptiveScale(double batchSuccessRate) {

252

double avgSuccessRate = successRate.getAverage();

253

double avgLatency = latency.getAverage();

254

255

// Scale up conditions: high success rate and low latency

256

if (avgSuccessRate > 0.95 && avgLatency < 1000) {

257

int newLimit = scalingStrategy.scaleUp(currentMaxInFlightMessages.get());

258

currentMaxInFlightMessages.set(newLimit);

259

260

// Scale down conditions: low success rate or high latency

261

} else if (avgSuccessRate < 0.8 || avgLatency > 5000) {

262

int newLimit = scalingStrategy.scaleDown(currentMaxInFlightMessages.get());

263

currentMaxInFlightMessages.set(Math.max(1, newLimit));

264

}

265

266

// Emergency scale down if no success for too long

267

long timeSinceLastSuccess = System.currentTimeMillis() - lastSuccessTime;

268

if (timeSinceLastSuccess > 30000) { // 30 seconds

269

int emergencyLimit = currentMaxInFlightMessages.get() / 4;

270

currentMaxInFlightMessages.set(Math.max(1, emergencyLimit));

271

}

272

}

273

}

274

275

// Helper classes for timestamped metrics

276

public class TimestampedRequestInfo implements RequestInfo {

277

private final int batchSize;

278

private volatile long startTime;

279

280

public TimestampedRequestInfo(int batchSize) {

281

this.batchSize = batchSize;

282

}

283

284

@Override

285

public int getBatchSize() {

286

return batchSize;

287

}

288

289

public void setStartTime(long startTime) {

290

this.startTime = startTime;

291

}

292

293

public long getStartTime() {

294

return startTime;

295

}

296

}

297

298

public class TimestampedResultInfo implements ResultInfo {

299

private final int failedMessages;

300

private final int batchSize;

301

private final long startTime;

302

303

public TimestampedResultInfo(int failedMessages, int batchSize, long startTime) {

304

this.failedMessages = failedMessages;

305

this.batchSize = batchSize;

306

this.startTime = startTime;

307

}

308

309

@Override

310

public int getFailedMessages() {

311

return failedMessages;

312

}

313

314

@Override

315

public int getBatchSize() {

316

return batchSize;

317

}

318

319

public long getStartTime() {

320

return startTime;

321

}

322

}

323

324

// Moving average helper

/**
 * Arithmetic mean of the most recent {@code windowSize} values, stored in a fixed-size ring
 * buffer. Methods are synchronized so values can be added and read from different threads.
 */
public class MovingAverage {
    private final int windowSize;
    private final double[] values;
    private int index = 0; // next slot to overwrite
    private int count = 0; // number of valid slots, at most windowSize

    /**
     * @param windowSize number of most recent values to average; must be positive
     * @throws IllegalArgumentException if windowSize is not positive
     */
    public MovingAverage(int windowSize) {
        // Fail fast: 0 would otherwise fail later with ArrayIndexOutOfBoundsException in
        // addValue, and a negative size with NegativeArraySizeException here.
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive: " + windowSize);
        }
        this.windowSize = windowSize;
        this.values = new double[windowSize];
    }

    /** Adds a value, evicting the oldest one once the window is full. */
    public synchronized void addValue(double value) {
        values[index] = value;
        index = (index + 1) % windowSize;
        count = Math.min(count + 1, windowSize);
    }

    /** Returns the mean of the values currently in the window, or 0.0 if none were added. */
    public synchronized double getAverage() {
        if (count == 0) {
            return 0.0;
        }

        // Until the buffer wraps, the valid values occupy slots [0, count).
        double sum = 0.0;
        for (int i = 0; i < count; i++) {
            sum += values[i];
        }
        return sum / count;
    }
}

```

### Circuit Breaker Rate Limiting

```java

public class CircuitBreakerRateLimitingStrategy implements RateLimitingStrategy {

360

361

public enum State {

362

CLOSED, // Normal operation

363

OPEN, // Circuit is open, blocking all requests

364

HALF_OPEN // Testing if circuit can be closed

365

}

366

367

private final int failureThreshold;

368

private final long timeoutMillis;

369

private final int halfOpenMaxRequests;

370

371

private volatile State state = State.CLOSED;

372

private final AtomicInteger failureCount = new AtomicInteger(0);

373

private final AtomicInteger halfOpenSuccessCount = new AtomicInteger(0);

374

private volatile long lastFailureTime = 0;

375

376

public CircuitBreakerRateLimitingStrategy(

377

int failureThreshold,

378

long timeoutMillis,

379

int halfOpenMaxRequests) {

380

381

this.failureThreshold = failureThreshold;

382

this.timeoutMillis = timeoutMillis;

383

this.halfOpenMaxRequests = halfOpenMaxRequests;

384

}

385

386

@Override

387

public void registerInFlightRequest(RequestInfo requestInfo) {

388

// No action needed on request start

389

}

390

391

@Override

392

public void registerCompletedRequest(ResultInfo resultInfo) {

393

int failedMessages = resultInfo.getFailedMessages();

394

int totalMessages = resultInfo.getBatchSize();

395

396

switch (state) {

397

case CLOSED:

398

if (failedMessages > 0) {

399

int failures = failureCount.addAndGet(failedMessages);

400

if (failures >= failureThreshold) {

401

// Open the circuit

402

state = State.OPEN;

403

lastFailureTime = System.currentTimeMillis();

404

LOG.warn("Circuit breaker opened after {} failures", failures);

405

}

406

} else {

407

// Reset failure count on success

408

failureCount.set(0);

409

}

410

break;

411

412

case HALF_OPEN:

413

if (failedMessages > 0) {

414

// Failure in half-open state - go back to open

415

state = State.OPEN;

416

lastFailureTime = System.currentTimeMillis();

417

halfOpenSuccessCount.set(0);

418

LOG.warn("Circuit breaker re-opened due to failure in half-open state");

419

} else {

420

// Success in half-open state

421

int successes = halfOpenSuccessCount.incrementAndGet();

422

if (successes >= halfOpenMaxRequests) {

423

// Close the circuit

424

state = State.CLOSED;

425

failureCount.set(0);

426

halfOpenSuccessCount.set(0);

427

LOG.info("Circuit breaker closed after {} successful requests", successes);

428

}

429

}

430

break;

431

432

case OPEN:

433

// Check if timeout has passed

434

if (System.currentTimeMillis() - lastFailureTime >= timeoutMillis) {

435

state = State.HALF_OPEN;

436

halfOpenSuccessCount.set(0);

437

LOG.info("Circuit breaker moved to half-open state");

438

}

439

break;

440

}

441

}

442

443

@Override

444

public boolean shouldBlock(RequestInfo requestInfo) {

445

switch (state) {

446

case CLOSED:

447

return false;

448

449

case OPEN:

450

// Check if timeout has passed

451

if (System.currentTimeMillis() - lastFailureTime >= timeoutMillis) {

452

state = State.HALF_OPEN;

453

halfOpenSuccessCount.set(0);

454

return false; // Allow first half-open request

455

}

456

return true; // Block all requests

457

458

case HALF_OPEN:

459

// Allow limited requests in half-open state

460

return halfOpenSuccessCount.get() >= halfOpenMaxRequests;

461

462

default:

463

return false;

464

}

465

}

466

467

@Override

468

public int getMaxBatchSize() {

469

switch (state) {

470

case CLOSED:

471

return Integer.MAX_VALUE; // No limit

472

473

case OPEN:

474

return 0; // Block all

475

476

case HALF_OPEN:

477

return 1; // Small batches for testing

478

479

default:

480

return Integer.MAX_VALUE;

481

}

482

}

483

484

public State getCurrentState() {

485

return state;

486

}

487

}

```

### Advanced Scaling Strategies

```java

// TCP-like congestion control scaling

494

public class TCPLikeScalingStrategy implements ScalingStrategy<Integer> {

495

private final int slowStartThreshold;

496

private volatile boolean inSlowStart = true;

497

private volatile int congestionWindow = 1;

498

499

public TCPLikeScalingStrategy(int slowStartThreshold) {

500

this.slowStartThreshold = slowStartThreshold;

501

}

502

503

@Override

504

public Integer scaleUp(Integer currentValue) {

505

if (inSlowStart) {

506

// Exponential growth in slow start

507

congestionWindow *= 2;

508

if (congestionWindow >= slowStartThreshold) {

509

inSlowStart = false;

510

}

511

} else {

512

// Linear growth in congestion avoidance

513

congestionWindow += 1;

514

}

515

516

return Math.min(congestionWindow, currentValue * 2); // Cap at double

517

}

518

519

@Override

520

public Integer scaleDown(Integer currentValue) {

521

// Multiplicative decrease

522

congestionWindow = Math.max(1, congestionWindow / 2);

523

inSlowStart = false; // Exit slow start on congestion

524

return Math.max(1, currentValue / 2);

525

}

526

}

527

528

// Exponential backoff with jitter

529

public class ExponentialBackoffScalingStrategy implements ScalingStrategy<Integer> {

530

private final Random random = new Random();

531

private volatile int backoffMultiplier = 1;

532

533

@Override

534

public Integer scaleUp(Integer currentValue) {

535

// Reset backoff on success

536

backoffMultiplier = 1;

537

return Math.min(currentValue * 2, currentValue + 10); // Conservative increase

538

}

539

540

@Override

541

public Integer scaleDown(Integer currentValue) {

542

// Exponential backoff with jitter

543

backoffMultiplier = Math.min(backoffMultiplier * 2, 64); // Max 64x backoff

544

double jitterFactor = 0.5 + (random.nextDouble() * 0.5); // 0.5 to 1.0

545

int reduction = (int) (currentValue * 0.5 * backoffMultiplier * jitterFactor);

546

547

return Math.max(1, currentValue - reduction);

548

}

549

}

550

551

// Percentile-based scaling

552

public class PercentileBasedScalingStrategy implements ScalingStrategy<Integer> {

553

private final PercentileTracker latencyTracker;

554

private final long targetLatencyP95;

555

private final long targetLatencyP99;

556

557

public PercentileBasedScalingStrategy(long targetLatencyP95, long targetLatencyP99) {

558

this.targetLatencyP95 = targetLatencyP95;

559

this.targetLatencyP99 = targetLatencyP99;

560

this.latencyTracker = new PercentileTracker();

561

}

562

563

public void recordLatency(long latency) {

564

latencyTracker.record(latency);

565

}

566

567

@Override

568

public Integer scaleUp(Integer currentValue) {

569

long p95 = latencyTracker.getPercentile(95);

570

long p99 = latencyTracker.getPercentile(99);

571

572

// Scale up only if latency is well below targets

573

if (p95 < targetLatencyP95 * 0.7 && p99 < targetLatencyP99 * 0.7) {

574

return Math.min(currentValue + (currentValue / 10), currentValue * 2);

575

}

576

577

return currentValue;

578

}

579

580

@Override

581

public Integer scaleDown(Integer currentValue) {

582

long p95 = latencyTracker.getPercentile(95);

583

long p99 = latencyTracker.getPercentile(99);

584

585

// Aggressive scale down if latency is too high

586

if (p99 > targetLatencyP99) {

587

return Math.max(1, currentValue / 4);

588

} else if (p95 > targetLatencyP95) {

589

return Math.max(1, currentValue / 2);

590

}

591

592

return currentValue;

593

}

594

}

```

## Configuration Patterns

### Multi-Tier Rate Limiting

```java

public class MultiTierRateLimitingStrategy implements RateLimitingStrategy {

603

private final List<RateLimitingStrategy> strategies;

604

605

public MultiTierRateLimitingStrategy(RateLimitingStrategy... strategies) {

606

this.strategies = Arrays.asList(strategies);

607

}

608

609

@Override

610

public void registerInFlightRequest(RequestInfo requestInfo) {

611

strategies.forEach(strategy -> strategy.registerInFlightRequest(requestInfo));

612

}

613

614

@Override

615

public void registerCompletedRequest(ResultInfo resultInfo) {

616

strategies.forEach(strategy -> strategy.registerCompletedRequest(resultInfo));

617

}

618

619

@Override

620

public boolean shouldBlock(RequestInfo requestInfo) {

621

// Block if ANY strategy says to block

622

return strategies.stream().anyMatch(strategy -> strategy.shouldBlock(requestInfo));

623

}

624

625

@Override

626

public int getMaxBatchSize() {

627

// Use the minimum batch size from all strategies

628

return strategies.stream()

629

.mapToInt(RateLimitingStrategy::getMaxBatchSize)

630

.min()

631

.orElse(Integer.MAX_VALUE);

632

}

633

}

634

635

// Usage

636

RateLimitingStrategy multiTier = new MultiTierRateLimitingStrategy(

637

new TokenBucketRateLimitingStrategy(1000, 100), // Token bucket limit

638

new CircuitBreakerRateLimitingStrategy(10, 30000, 5), // Circuit breaker

639

CongestionControlRateLimitingStrategy.builder() // Congestion control

640

.setMaxInFlightRequests(50)

641

.setInitialMaxInFlightMessages(100)

642

.setScalingStrategy(AIMDScalingStrategy.builder(500).build())

643

.build()

644

);

```

### Environment-Aware Configuration

```java

public class EnvironmentAwareRateLimitingFactory {

651

652

public static RateLimitingStrategy createForEnvironment(String environment) {

653

switch (environment.toLowerCase()) {

654

case "production":

655

return createProductionStrategy();

656

case "staging":

657

return createStagingStrategy();

658

case "development":

659

return createDevelopmentStrategy();

660

default:

661

return createDefaultStrategy();

662

}

663

}

664

665

private static RateLimitingStrategy createProductionStrategy() {

666

// Conservative settings for production

667

AIMDScalingStrategy scalingStrategy = AIMDScalingStrategy.builder(2000)

668

.setIncreaseRate(5) // Slow increase

669

.setDecreaseFactor(0.7) // Moderate decrease

670

.build();

671

672

return new MultiTierRateLimitingStrategy(

673

CongestionControlRateLimitingStrategy.builder()

674

.setMaxInFlightRequests(100)

675

.setInitialMaxInFlightMessages(50)

676

.setScalingStrategy(scalingStrategy)

677

.build(),

678

new CircuitBreakerRateLimitingStrategy(20, 60000, 10)

679

);

680

}

681

682

private static RateLimitingStrategy createStagingStrategy() {

683

// Moderate settings for staging

684

AIMDScalingStrategy scalingStrategy = AIMDScalingStrategy.builder(1000)

685

.setIncreaseRate(10)

686

.setDecreaseFactor(0.6)

687

.build();

688

689

return CongestionControlRateLimitingStrategy.builder()

690

.setMaxInFlightRequests(50)

691

.setInitialMaxInFlightMessages(25)

692

.setScalingStrategy(scalingStrategy)

693

.build();

694

}

695

696

private static RateLimitingStrategy createDevelopmentStrategy() {

697

// Permissive settings for development

698

return new NoOpRateLimitingStrategy();

699

}

700

}

701

702

public class NoOpRateLimitingStrategy implements RateLimitingStrategy {

703

@Override

704

public void registerInFlightRequest(RequestInfo requestInfo) {}

705

706

@Override

707

public void registerCompletedRequest(ResultInfo resultInfo) {}

708

709

@Override

710

public boolean shouldBlock(RequestInfo requestInfo) {

711

return false;

712

}

713

714

@Override

715

public int getMaxBatchSize() {

716

return Integer.MAX_VALUE;

717

}

718

}

```

## Best Practices

### Performance Monitoring and Metrics

```java

public class MetricsEnabledRateLimitingStrategy implements RateLimitingStrategy {

727

private final RateLimitingStrategy delegate;

728

private final MetricGroup metricGroup;

729

730

// Metrics

731

private final Counter requestsBlocked;

732

private final Counter requestsAllowed;

733

private final Gauge<Integer> currentBatchSize;

734

private final Histogram requestLatency;

735

736

public MetricsEnabledRateLimitingStrategy(

737

RateLimitingStrategy delegate,

738

MetricGroup metricGroup) {

739

this.delegate = delegate;

740

this.metricGroup = metricGroup;

741

742

this.requestsBlocked = metricGroup.counter("requests_blocked");

743

this.requestsAllowed = metricGroup.counter("requests_allowed");

744

this.currentBatchSize = metricGroup.gauge("current_batch_size", delegate::getMaxBatchSize);

745

this.requestLatency = metricGroup.histogram("request_latency");

746

}

747

748

@Override

749

public boolean shouldBlock(RequestInfo requestInfo) {

750

boolean shouldBlock = delegate.shouldBlock(requestInfo);

751

752

if (shouldBlock) {

753

requestsBlocked.inc();

754

} else {

755

requestsAllowed.inc();

756

}

757

758

return shouldBlock;

759

}

760

761

// ... delegate other methods

762

}

```

### Testing and Validation

```java

public class RateLimitingStrategyTester {

769

770

public static TestResults testStrategy(

771

RateLimitingStrategy strategy,

772

TestScenario scenario) {

773

774

TestResults results = new TestResults();

775

776

for (TestCase testCase : scenario.getTestCases()) {

777

long startTime = System.currentTimeMillis();

778

779

// Simulate requests

780

for (RequestPattern request : testCase.getRequests()) {

781

boolean blocked = strategy.shouldBlock(request.getRequestInfo());

782

783

if (!blocked) {

784

strategy.registerInFlightRequest(request.getRequestInfo());

785

786

// Simulate request completion

787

CompletableFuture.delayedExecutor(request.getLatency(), TimeUnit.MILLISECONDS)

788

.execute(() -> {

789

strategy.registerCompletedRequest(request.getResultInfo());

790

});

791

}

792

793

results.recordRequest(blocked, request);

794

}

795

796

long endTime = System.currentTimeMillis();

797

results.recordTestCase(testCase, endTime - startTime);

798

}

799

800

return results;

801

}

802

}

```

The Rate Limiting & Scaling framework provides the building blocks for resilient, high-throughput sinks that adapt to changing conditions while protecting downstream systems from overload.