or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async.md · core-rpc.md · index.md · plugins.md · protocols.md · stats.md · transports.md

stats.mddocs/

0

# Statistics and Monitoring

1

2

Apache Avro IPC provides built-in performance monitoring capabilities through histogram-based statistics collection, latency tracking, payload analysis, and web-based visualization.

3

4

## Capabilities

5

6

### Statistics Collection Plugin

7

8

The `StatsPlugin` automatically collects comprehensive RPC performance metrics including call counts, latency distributions, and payload size analysis.

9

10

```java { .api }

11

public class StatsPlugin extends RPCPlugin {

12

// Constructors

13

public StatsPlugin();

14

public StatsPlugin(Ticks ticks, Segmenter<?, Float> floatSegmenter, Segmenter<?, Integer> integerSegmenter);

15

16

// Server startup tracking

17

public Date startupTime;

18

19

// Default segmenters for bucketing metrics

20

public static final Segmenter<String, Float> LATENCY_SEGMENTER;

21

public static final Segmenter<String, Integer> PAYLOAD_SEGMENTER;

22

23

// Utility methods

24

public static float nanosToMillis(long elapsedNanos);

25

26

// Inherited plugin methods for metric collection

27

public void clientStartConnect(RPCContext context);

28

public void clientFinishConnect(RPCContext context);

29

public void clientSendRequest(RPCContext context);

30

public void clientReceiveResponse(RPCContext context);

31

public void serverConnecting(RPCContext context);

32

public void serverReceiveRequest(RPCContext context);

33

public void serverSendResponse(RPCContext context);

34

}

35

```

36

37

#### Usage Examples

38

39

```java

40

// Basic statistics collection

41

StatsPlugin statsPlugin = new StatsPlugin();

42

43

// Add to requestor and responder

44

requestor.addRPCPlugin(statsPlugin);

45

responder.addRPCPlugin(statsPlugin);

46

47

// Statistics are automatically collected for all RPC calls

48

MyService client = SpecificRequestor.getClient(MyService.class, transceiver);

49

String result = client.processData("test data"); // Metrics collected automatically

50

51

// Access startup time

52

System.out.println("Server started at: " + statsPlugin.startupTime);

53

54

// Custom segmenters for specialized bucketing

55

Segmenter<String, Float> customLatencySegmenter = new Segmenter<String, Float>() {

56

@Override

57

public int size() { return 5; }

58

59

@Override

60

public int segment(Float value) {

61

if (value < 10) return 0; // < 10ms

62

if (value < 50) return 1; // 10-50ms

63

if (value < 200) return 2; // 50-200ms

64

if (value < 1000) return 3; // 200ms-1s

65

return 4; // > 1s

66

}

67

68

@Override

69

public Iterator<String> getBuckets() {

70

return Arrays.asList("<10ms", "10-50ms", "50-200ms", "200ms-1s", ">1s").iterator();

71

}

72

73

@Override

74

public List<String> getBoundaryLabels() {

75

return Arrays.asList("10", "50", "200", "1000");

76

}

77

78

@Override

79

public List<String> getBucketLabels() {

80

return Arrays.asList("<10ms", "10-50ms", "50-200ms", "200ms-1s", ">1s");

81

}

82

};

83

84

StatsPlugin customStatsPlugin = new StatsPlugin(StatsPlugin.SYSTEM_TICKS,

85

customLatencySegmenter, StatsPlugin.PAYLOAD_SEGMENTER);

86

```

87

88

### Web-Based Statistics Viewer

89

90

The `StatsServlet` provides a web interface for viewing collected statistics with histograms, summaries, and real-time metrics.

91

92

```java { .api }

93

public class StatsServlet extends HttpServlet {

94

// Constructor

95

public StatsServlet(StatsPlugin statsPlugin);

96

97

// Web interface methods

98

protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException;

99

public void writeStats(Writer w) throws IOException;

100

101

// Utility methods

102

protected static List<String> escapeStringArray(List<String> input);

103

104

// Inner classes for rendering

105

public static class RenderableMessage {

106

// Public fields and methods for Velocity template access

107

}

108

}

109

```

110

111

#### Usage Examples

112

113

```java

114

// Web-based statistics viewer

115

StatsPlugin statsPlugin = new StatsPlugin();

116

StatsServlet statsServlet = new StatsServlet(statsPlugin);

117

118

// Deploy to servlet container (example with Jetty)

119

ServletContextHandler context = new ServletContextHandler();

120

context.addServlet(new ServletHolder(statsServlet), "/stats");

121

122

// Access statistics via HTTP

123

// GET http://localhost:8080/stats

124

// Returns HTML page with histograms and metrics

125

126

// Programmatic statistics access

127

StringWriter writer = new StringWriter();

128

statsServlet.writeStats(writer);

129

String statsHtml = writer.toString();

130

System.out.println(statsHtml);

131

```

132

133

### Histogram Data Structures

134

135

Generic histogram implementation for collecting and analyzing metric distributions.

136

137

#### Base Histogram Class

138

139

```java { .api }

140

public class Histogram<B,T> {

141

public static final int MAX_HISTORY_SIZE = 20;

142

143

// Constructor

144

public Histogram(Segmenter<B,T> segmenter);

145

146

// Data collection

147

public void add(T value);

148

149

// Data access

150

public int[] getHistogram();

151

public Segmenter<B,T> getSegmenter();

152

public List<T> getRecentAdditions();

153

public int getCount();

154

public Iterable<Entry<B>> entries();

155

156

// Inner interfaces and classes

157

public interface Segmenter<B,T> {

158

int size();

159

int segment(T value);

160

Iterator<B> getBuckets();

161

List<String> getBoundaryLabels();

162

List<String> getBucketLabels();

163

}

164

165

public static class SegmenterException extends RuntimeException {

166

public SegmenterException(String message);

167

public SegmenterException(String message, Throwable cause);

168

}

169

170

public static class TreeMapSegmenter<T extends Comparable<T>> implements Segmenter<String,T> {

171

public TreeMapSegmenter(T[] boundaries, String[] labels);

172

// Implementation of Segmenter interface

173

}

174

175

public static class Entry<B> {

176

public B bucket;

177

public int count;

178

// Constructor and methods

179

}

180

}

181

```

182

183

#### Float Histogram with Statistics

184

185

```java { .api }

186

public class FloatHistogram<B> extends Histogram<B, Float> {

187

// Constructor

188

public FloatHistogram(Segmenter<B,Float> segmenter);

189

190

// Statistical calculations

191

public float getMean();

192

public float getUnbiasedStdDev();

193

194

// Inherited methods from Histogram

195

public void add(Float value);

196

public int getCount();

197

public int[] getHistogram();

198

}

199

```

200

201

#### Integer Histogram with Statistics

202

203

```java { .api }

204

public class IntegerHistogram<B> extends Histogram<B, Integer> {

205

// Constructor

206

public IntegerHistogram(Segmenter<B,Integer> segmenter);

207

208

// Statistical calculations

209

public float getMean();

210

public float getUnbiasedStdDev();

211

212

// Inherited methods from Histogram

213

public void add(Integer value);

214

public int getCount();

215

public int[] getHistogram();

216

}

217

```

218

219

#### Usage Examples

220

221

```java

222

// Latency histogram

223

Segmenter<String, Float> latencySegmenter = new Histogram.TreeMapSegmenter<>(

224

new Float[]{10.0f, 50.0f, 200.0f, 1000.0f},

225

new String[]{"<10ms", "10-50ms", "50-200ms", "200ms-1s", ">1s"}

226

);

227

228

FloatHistogram<String> latencyHistogram = new FloatHistogram<>(latencySegmenter);

229

230

// Collect latency data

231

latencyHistogram.add(15.5f); // 10-50ms bucket

232

latencyHistogram.add(75.2f); // 50-200ms bucket

233

latencyHistogram.add(5.1f); // <10ms bucket

234

235

// Analyze statistics

236

System.out.println("Mean latency: " + latencyHistogram.getMean() + "ms");

237

System.out.println("Std deviation: " + latencyHistogram.getUnbiasedStdDev() + "ms");

238

System.out.println("Total samples: " + latencyHistogram.getCount());

239

240

// Get histogram distribution

241

int[] bucketCounts = latencyHistogram.getHistogram();

242

for (int i = 0; i < bucketCounts.length; i++) {

243

System.out.println("Bucket " + i + ": " + bucketCounts[i] + " samples");

244

}

245

246

// Payload size histogram

247

Segmenter<String, Integer> payloadSegmenter = new Histogram.TreeMapSegmenter<>(

248

new Integer[]{1024, 10240, 102400, 1048576},

249

new String[]{"<1KB", "1-10KB", "10-100KB", "100KB-1MB", ">1MB"}

250

);

251

252

IntegerHistogram<String> payloadHistogram = new IntegerHistogram<>(payloadSegmenter);

253

payloadHistogram.add(2048); // 1-10KB bucket

254

payloadHistogram.add(512); // <1KB bucket

255

payloadHistogram.add(204800); // 100KB-1MB bucket

256

```

257

258

### Time Measurement Utilities

259

260

Precise time measurement for performance tracking and latency analysis.

261

262

#### Stopwatch Class

263

264

```java { .api }

265

public class Stopwatch {

266

// Time source interface

267

public interface Ticks {

268

long ticks();

269

}

270

271

// System time implementation

272

public static final Ticks SYSTEM_TICKS;

273

274

// Constructor

275

public Stopwatch(Ticks ticks);

276

277

// Timing methods

278

public void start();

279

public void stop();

280

public long elapsedNanos();

281

}

282

```

283

284

#### Usage Examples

285

286

```java

287

// Basic stopwatch usage

288

Stopwatch stopwatch = new Stopwatch(Stopwatch.SYSTEM_TICKS);

289

290

stopwatch.start();

291

// ... perform operation to measure

292

performExpensiveOperation();

293

stopwatch.stop();

294

295

long elapsedNanos = stopwatch.elapsedNanos();

296

float elapsedMillis = StatsPlugin.nanosToMillis(elapsedNanos);

297

System.out.println("Operation took: " + elapsedMillis + "ms");

298

299

// Custom time source for testing

300

Stopwatch testStopwatch = new Stopwatch(new Stopwatch.Ticks() {

301

private long currentTime = 0;

302

303

@Override

304

public long ticks() {

305

return currentTime += 1000000; // Add 1ms per tick

306

}

307

});

308

309

testStopwatch.start();

310

// Simulated passage of time

311

testStopwatch.stop();

312

System.out.println("Test elapsed: " + testStopwatch.elapsedNanos() + "ns");

313

```

314

315

## Advanced Monitoring Examples

316

317

### Custom Statistics Collection

318

319

```java

320

public class CustomStatsPlugin extends RPCPlugin {

321

private final Map<String, FloatHistogram<String>> methodLatencies = new ConcurrentHashMap<>();

322

private final Map<String, IntegerHistogram<String>> methodPayloads = new ConcurrentHashMap<>();

323

private final Map<String, AtomicLong> methodCounts = new ConcurrentHashMap<>();

324

private final ThreadLocal<Stopwatch> requestStopwatch = new ThreadLocal<>();

325

326

@Override

327

public void serverReceiveRequest(RPCContext context) {

328

// Start timing

329

Stopwatch stopwatch = new Stopwatch(Stopwatch.SYSTEM_TICKS);

330

stopwatch.start();

331

requestStopwatch.set(stopwatch);

332

333

// Count method invocations

334

String methodName = context.getMessage().getName();

335

methodCounts.computeIfAbsent(methodName, k -> new AtomicLong()).incrementAndGet();

336

}

337

338

@Override

339

public void serverSendResponse(RPCContext context) {

340

// Stop timing and collect latency

341

Stopwatch stopwatch = requestStopwatch.get();

342

if (stopwatch != null) {

343

stopwatch.stop();

344

float latencyMs = StatsPlugin.nanosToMillis(stopwatch.elapsedNanos());

345

346

String methodName = context.getMessage().getName();

347

FloatHistogram<String> latencyHist = methodLatencies.computeIfAbsent(methodName,

348

k -> new FloatHistogram<>(StatsPlugin.LATENCY_SEGMENTER));

349

latencyHist.add(latencyMs);

350

351

requestStopwatch.remove();

352

}

353

354

// Collect payload size

355

List<ByteBuffer> responsePayload = context.getResponsePayload();

356

if (responsePayload != null) {

357

int totalSize = responsePayload.stream()

358

.mapToInt(ByteBuffer::remaining)

359

.sum();

360

361

String methodName = context.getMessage().getName();

362

IntegerHistogram<String> payloadHist = methodPayloads.computeIfAbsent(methodName,

363

k -> new IntegerHistogram<>(StatsPlugin.PAYLOAD_SEGMENTER));

364

payloadHist.add(totalSize);

365

}

366

}

367

368

// Public methods to access collected statistics

369

public Map<String, FloatHistogram<String>> getMethodLatencies() {

370

return Collections.unmodifiableMap(methodLatencies);

371

}

372

373

public Map<String, AtomicLong> getMethodCounts() {

374

return Collections.unmodifiableMap(methodCounts);

375

}

376

377

public void printStatistics() {

378

System.out.println("=== Custom RPC Statistics ===");

379

380

for (Map.Entry<String, AtomicLong> entry : methodCounts.entrySet()) {

381

String method = entry.getKey();

382

long count = entry.getValue().get();

383

384

FloatHistogram<String> latencyHist = methodLatencies.get(method);

385

float avgLatency = latencyHist != null ? latencyHist.getMean() : 0;

386

387

System.out.printf("Method: %s, Calls: %d, Avg Latency: %.2fms%n",

388

method, count, avgLatency);

389

}

390

}

391

}

392

```

393

394

### JMX Integration

395

396

```java

397

public class JMXStatsPlugin extends RPCPlugin implements JMXStatsPluginMBean {

398

private final AtomicLong totalRequests = new AtomicLong();

399

private final AtomicLong errorCount = new AtomicLong();

400

private final FloatHistogram<String> latencyHistogram;

401

private final ThreadLocal<Stopwatch> requestTimer = new ThreadLocal<>();

402

403

public JMXStatsPlugin() {

404

this.latencyHistogram = new FloatHistogram<>(StatsPlugin.LATENCY_SEGMENTER);

405

406

// Register with JMX

407

try {

408

MBeanServer server = ManagementFactory.getPlatformMBeanServer();

409

ObjectName name = new ObjectName("org.apache.avro.ipc:type=Stats");

410

server.registerMBean(this, name);

411

} catch (Exception e) {

412

System.err.println("Failed to register JMX bean: " + e.getMessage());

413

}

414

}

415

416

@Override

417

public void serverReceiveRequest(RPCContext context) {

418

totalRequests.incrementAndGet();

419

Stopwatch timer = new Stopwatch(Stopwatch.SYSTEM_TICKS);

420

timer.start();

421

requestTimer.set(timer);

422

}

423

424

@Override

425

public void serverSendResponse(RPCContext context) {

426

Stopwatch timer = requestTimer.get();

427

if (timer != null) {

428

timer.stop();

429

float latencyMs = StatsPlugin.nanosToMillis(timer.elapsedNanos());

430

latencyHistogram.add(latencyMs);

431

requestTimer.remove();

432

}

433

434

if (context.isError()) {

435

errorCount.incrementAndGet();

436

}

437

}

438

439

// JMX interface methods

440

@Override

441

public long getTotalRequests() {

442

return totalRequests.get();

443

}

444

445

@Override

446

public long getErrorCount() {

447

return errorCount.get();

448

}

449

450

@Override

451

public double getAverageLatency() {

452

return latencyHistogram.getMean();

453

}

454

455

@Override

456

public double getErrorRate() {

457

long total = totalRequests.get();

458

return total > 0 ? (double) errorCount.get() / total : 0.0;

459

}

460

}

461

462

// JMX interface

463

public interface JMXStatsPluginMBean {

464

long getTotalRequests();

465

long getErrorCount();

466

double getAverageLatency();

467

double getErrorRate();

468

}

469

```

470

471

### Health Check Integration

472

473

```java

474

public class HealthCheckPlugin extends RPCPlugin {

475

private final AtomicReference<HealthStatus> healthStatus = new AtomicReference<>(HealthStatus.HEALTHY);

476

private final CircularBuffer<Long> recentLatencies = new CircularBuffer<>(100);

477

private final AtomicLong consecutiveErrors = new AtomicLong();

478

479

private static final float LATENCY_THRESHOLD_MS = 1000.0f;

480

private static final long ERROR_THRESHOLD = 5;

481

482

@Override

483

public void serverReceiveRequest(RPCContext context) {

484

// Reset consecutive errors on successful request receipt

485

if (healthStatus.get() == HealthStatus.DEGRADED) {

486

consecutiveErrors.set(0);

487

}

488

}

489

490

@Override

491

public void serverSendResponse(RPCContext context) {

492

if (context.isError()) {

493

long errors = consecutiveErrors.incrementAndGet();

494

if (errors >= ERROR_THRESHOLD) {

495

healthStatus.set(HealthStatus.UNHEALTHY);

496

}

497

} else {

498

consecutiveErrors.set(0);

499

500

// Check latency for health degradation

501

// (This would need actual latency measurement)

502

checkLatencyHealth();

503

}

504

}

505

506

private void checkLatencyHealth() {

507

if (recentLatencies.size() >= 10) {

508

double avgLatency = recentLatencies.stream()

509

.mapToLong(Long::longValue)

510

.average()

511

.orElse(0.0);

512

513

if (avgLatency > LATENCY_THRESHOLD_MS) {

514

healthStatus.set(HealthStatus.DEGRADED);

515

} else if (healthStatus.get() == HealthStatus.DEGRADED) {

516

healthStatus.set(HealthStatus.HEALTHY);

517

}

518

}

519

}

520

521

public HealthStatus getHealthStatus() {

522

return healthStatus.get();

523

}

524

525

public enum HealthStatus {

526

HEALTHY, DEGRADED, UNHEALTHY

527

}

528

}

529

```

530

531

## Performance Impact and Best Practices

532

533

### Statistics Collection Overhead

534

535

- `StatsPlugin` adds minimal overhead (< 1% typically)

536

- Histogram operations are O(1) for bucket assignment

537

- Memory usage scales with number of buckets and history size

538

- Web servlet generates HTML on-demand (no constant overhead)

539

540

### Optimization Guidelines

541

542

```java

543

// Good: Efficient statistics collection

544

StatsPlugin statsPlugin = new StatsPlugin();

545

// Uses default segmenters with reasonable bucket counts

546

547

// Good: Custom segmenter with appropriate bucket count

548

Segmenter<String, Float> efficientSegmenter = new Histogram.TreeMapSegmenter<>(

549

new Float[]{10.0f, 100.0f, 1000.0f}, // Only 4 buckets

550

new String[]{"<10ms", "10-100ms", "100ms-1s", ">1s"}

551

);

552

553

// Bad: Too many buckets

554

Segmenter<String, Float> inefficientSegmenter = new Histogram.TreeMapSegmenter<>(

555

new Float[]{1.0f, 2.0f, 3.0f, /* ... 100 boundaries ... */}, // 100+ buckets

556

new String[]{/* ... 100+ labels ... */}

557

);

558

559

// Good: Bounded history size (default MAX_HISTORY_SIZE = 20)

560

// Bad: Unbounded data collection that grows indefinitely

561

```

562

563

### Memory Management

564

565

```java

566

// Monitor histogram memory usage

567

public void printHistogramStats(Histogram<?, ?> histogram) {

568

System.out.println("Histogram buckets: " + histogram.getSegmenter().size());

569

System.out.println("Recent additions: " + histogram.getRecentAdditions().size());

570

System.out.println("Total count: " + histogram.getCount());

571

}

572

573

// Periodic cleanup for long-running applications

574

public class RotatingStatsPlugin extends StatsPlugin {

575

private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

576

577

public RotatingStatsPlugin() {

578

super();

579

// Reset statistics every hour

580

scheduler.scheduleAtFixedRate(this::resetStatistics, 1, 1, TimeUnit.HOURS);

581

}

582

583

private void resetStatistics() {

584

// Reset internal histograms (implementation-specific)

585

System.out.println("Statistics reset at: " + new Date());

586

}

587

}

588

```