0
# Statistics and Monitoring
1
2
Apache Avro IPC provides built-in performance monitoring capabilities through histogram-based statistics collection, latency tracking, payload analysis, and web-based visualization.
3
4
## Capabilities
5
6
### Statistics Collection Plugin
7
8
The `StatsPlugin` automatically collects comprehensive RPC performance metrics including call counts, latency distributions, and payload size analysis.
9
10
```java { .api }
11
// API summary of StatsPlugin: an RPCPlugin that collects RPC latency and
// payload-size statistics into segmented histograms.
public class StatsPlugin extends RPCPlugin {
12
// Constructors
// The no-arg form uses the default LATENCY_SEGMENTER and PAYLOAD_SEGMENTER.
13
public StatsPlugin();
14
// ticks: time source (Stopwatch.Ticks, e.g. Stopwatch.SYSTEM_TICKS);
// floatSegmenter: buckets latencies in milliseconds;
// integerSegmenter: buckets payload sizes (presumably bytes — confirm).
public StatsPlugin(Ticks ticks, Segmenter<?, Float> floatSegmenter, Segmenter<?, Integer> integerSegmenter);
15
16
// Server startup tracking
// NOTE(review): presumably initialized when the plugin is constructed — confirm.
17
public Date startupTime;
18
19
// Default segmenters for bucketing metrics; also reusable when building
// custom histograms or plugins, as the examples below do.
20
public static final Segmenter<String, Float> LATENCY_SEGMENTER;
21
public static final Segmenter<String, Integer> PAYLOAD_SEGMENTER;
22
23
// Utility methods
// Converts a nanosecond duration (e.g. Stopwatch.elapsedNanos()) to milliseconds.
24
public static float nanosToMillis(long elapsedNanos);
25
26
// RPCPlugin hooks overridden to collect metrics on the client and server side.
27
public void clientStartConnect(RPCContext context);
28
public void clientFinishConnect(RPCContext context);
29
public void clientSendRequest(RPCContext context);
30
public void clientReceiveResponse(RPCContext context);
31
public void serverConnecting(RPCContext context);
32
public void serverReceiveRequest(RPCContext context);
33
public void serverSendResponse(RPCContext context);
34
}
35
```
36
37
#### Usage Examples
38
39
```java
40
// Basic statistics collection
StatsPlugin statsPlugin = new StatsPlugin();

// Attach the plugin to the requestor the client proxy actually uses, and to the responder.
// SpecificRequestor.getClient(iface, transceiver) creates its own internal requestor, so a
// plugin added to a separately constructed requestor would never see the client's calls.
SpecificRequestor requestor = new SpecificRequestor(MyService.class, transceiver);
requestor.addRPCPlugin(statsPlugin);
responder.addRPCPlugin(statsPlugin);

// Statistics are collected automatically for every call made through this client
MyService client = SpecificRequestor.getClient(MyService.class, requestor);
String result = client.processData("test data"); // Metrics collected automatically

// Access startup time
System.out.println("Server started at: " + statsPlugin.startupTime);

// Custom segmenter for specialized bucketing.
// Each bucket includes its lower bound and excludes its upper bound.
// getBuckets() and getBucketLabels() share one label list, so they cannot drift apart.
final List<String> latencyBucketLabels = Collections.unmodifiableList(
    Arrays.asList("<10ms", "10-50ms", "50-200ms", "200ms-1s", ">1s"));

Segmenter<String, Float> customLatencySegmenter = new Segmenter<String, Float>() {
  @Override
  public int size() { return latencyBucketLabels.size(); }

  @Override
  public int segment(Float value) {
    if (value < 10) return 0;   // [0, 10) ms
    if (value < 50) return 1;   // [10, 50) ms
    if (value < 200) return 2;  // [50, 200) ms
    if (value < 1000) return 3; // [200, 1000) ms
    return 4;                   // >= 1000 ms
  }

  @Override
  public Iterator<String> getBuckets() {
    return latencyBucketLabels.iterator();
  }

  @Override
  public List<String> getBoundaryLabels() {
    return Arrays.asList("10", "50", "200", "1000");
  }

  @Override
  public List<String> getBucketLabels() {
    return latencyBucketLabels;
  }
};

// SYSTEM_TICKS is declared on Stopwatch; StatsPlugin has no such constant.
StatsPlugin customStatsPlugin = new StatsPlugin(Stopwatch.SYSTEM_TICKS,
    customLatencySegmenter, StatsPlugin.PAYLOAD_SEGMENTER);
86
```
87
88
### Web-Based Statistics Viewer
89
90
The `StatsServlet` provides a web interface for viewing collected statistics with histograms, summaries, and real-time metrics.
91
92
```java { .api }
93
// API summary of StatsServlet: an HttpServlet that renders the statistics
// gathered by one StatsPlugin as an HTML page.
public class StatsServlet extends HttpServlet {
94
// Constructor
// statsPlugin: the plugin whose collected statistics are rendered; it must be
// attached to a requestor/responder elsewhere, or there is nothing to show.
95
public StatsServlet(StatsPlugin statsPlugin);
96
97
// Web interface methods
// doGet serves the statistics page (the usage example maps it to /stats).
98
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException;
// writeStats writes the same HTML to any Writer, e.g. a StringWriter for programmatic access.
99
public void writeStats(Writer w) throws IOException;
100
101
// Utility methods
// NOTE(review): presumably escapes each string for safe embedding in the page — confirm.
102
protected static List<String> escapeStringArray(List<String> input);
103
104
// Inner classes for rendering
105
public static class RenderableMessage {
106
// Public fields and methods for Velocity template access
107
}
108
}
109
```
110
111
#### Usage Examples
112
113
```java
114
// Web-based statistics viewer
115
StatsPlugin statsPlugin = new StatsPlugin();
116
StatsServlet statsServlet = new StatsServlet(statsPlugin);
117
118
// Deploy to servlet container (example with Jetty)
119
ServletContextHandler context = new ServletContextHandler();
120
context.addServlet(new ServletHolder(statsServlet), "/stats");
121
122
// Access statistics via HTTP
123
// GET http://localhost:8080/stats
124
// Returns HTML page with histograms and metrics
125
126
// Programmatic statistics access
127
StringWriter writer = new StringWriter();
128
statsServlet.writeStats(writer);
129
String statsHtml = writer.toString();
130
System.out.println(statsHtml);
131
```
132
133
### Histogram Data Structures
134
135
Generic histogram implementation for collecting and analyzing metric distributions.
136
137
#### Base Histogram Class
138
139
```java { .api }
140
// API summary of Histogram: counts values into buckets chosen by a Segmenter
// and remembers a bounded list of the most recent values.
// B = bucket descriptor type, T = recorded value type.
public class Histogram<B,T> {
141
// Upper bound on how many recent values getRecentAdditions() retains.
public static final int MAX_HISTORY_SIZE = 20;
142
143
// Constructor
144
public Histogram(Segmenter<B,T> segmenter);
145
146
// Data collection
// Adds one value: increments the count of the bucket chosen by segmenter.segment(value).
147
public void add(T value);
148
149
// Data access
// Per-bucket counts, indexed by the bucket index that Segmenter.segment returns.
150
public int[] getHistogram();
151
public Segmenter<B,T> getSegmenter();
152
public List<T> getRecentAdditions();
// Total number of values added.
153
public int getCount();
// NOTE(review): presumably one (bucket, count) Entry per bucket — confirm.
154
public Iterable<Entry<B>> entries();
155
156
// Inner interfaces and classes
// Segmenter maps a value to a bucket index in [0, size()) and supplies labels for display.
157
public interface Segmenter<B,T> {
158
int size();
159
int segment(T value);
160
Iterator<B> getBuckets();
161
List<String> getBoundaryLabels();
162
List<String> getBucketLabels();
163
}
164
165
// NOTE(review): presumably thrown when a value cannot be assigned to a bucket — confirm.
public static class SegmenterException extends RuntimeException {
166
public SegmenterException(String message);
167
public SegmenterException(String message, Throwable cause);
168
}
169
170
// Segmenter backed by sorted boundary values; labels name the resulting buckets.
// NOTE(review): upstream Avro's TreeMapSegmenter appears to take a SortedSet<T> of left
// endpoints instead of (T[], String[]) — verify this signature against the Avro version in use.
public static class TreeMapSegmenter<T extends Comparable<T>> implements Segmenter<String,T> {
171
public TreeMapSegmenter(T[] boundaries, String[] labels);
172
// Implementation of Segmenter interface
173
}
174
175
// A bucket paired with the number of values counted in it.
public static class Entry<B> {
176
public B bucket;
177
public int count;
178
// Constructor and methods
179
}
180
}
181
```
182
183
#### Float Histogram with Statistics
184
185
```java { .api }
186
// API summary of FloatHistogram: a Histogram of Float values (e.g. latencies in ms)
// that also reports summary statistics over every value added.
public class FloatHistogram<B> extends Histogram<B, Float> {
187
// Constructor
188
public FloatHistogram(Segmenter<B,Float> segmenter);
189
190
// Statistical calculations
// getMean: arithmetic mean of all added values.
// getUnbiasedStdDev: sample standard deviation (n - 1 denominator, per the name).
// NOTE(review): the result for an empty histogram (getCount() == 0) is not specified here —
// check getCount() before displaying or dividing by these values.
191
public float getMean();
192
public float getUnbiasedStdDev();
193
194
// Inherited methods from Histogram (listed for reference)
195
public void add(Float value);
196
public int getCount();
197
public int[] getHistogram();
198
}
199
```
200
201
#### Integer Histogram with Statistics
202
203
```java { .api }
204
// API summary of IntegerHistogram: a Histogram of Integer values (e.g. payload sizes)
// that also reports summary statistics over every value added.
public class IntegerHistogram<B> extends Histogram<B, Integer> {
205
// Constructor
206
public IntegerHistogram(Segmenter<B,Integer> segmenter);
207
208
// Statistical calculations (returned as float even though the samples are integers)
// getMean: arithmetic mean of all added values.
// getUnbiasedStdDev: sample standard deviation (n - 1 denominator, per the name).
// NOTE(review): the result for an empty histogram (getCount() == 0) is not specified here —
// check getCount() before displaying these values.
209
public float getMean();
210
public float getUnbiasedStdDev();
211
212
// Inherited methods from Histogram (listed for reference)
213
public void add(Integer value);
214
public int getCount();
215
public int[] getHistogram();
216
}
217
```
218
219
#### Usage Examples
220
221
```java
222
// Latency histogram
223
Segmenter<String, Float> latencySegmenter = new Histogram.TreeMapSegmenter<>(
224
new Float[]{10.0f, 50.0f, 200.0f, 1000.0f},
225
new String[]{"<10ms", "10-50ms", "50-200ms", "200ms-1s", ">1s"}
226
);
227
228
FloatHistogram<String> latencyHistogram = new FloatHistogram<>(latencySegmenter);
229
230
// Collect latency data
231
latencyHistogram.add(15.5f); // 10-50ms bucket
232
latencyHistogram.add(75.2f); // 50-200ms bucket
233
latencyHistogram.add(5.1f); // <10ms bucket
234
235
// Analyze statistics
236
System.out.println("Mean latency: " + latencyHistogram.getMean() + "ms");
237
System.out.println("Std deviation: " + latencyHistogram.getUnbiasedStdDev() + "ms");
238
System.out.println("Total samples: " + latencyHistogram.getCount());
239
240
// Get histogram distribution
241
int[] bucketCounts = latencyHistogram.getHistogram();
242
for (int i = 0; i < bucketCounts.length; i++) {
243
System.out.println("Bucket " + i + ": " + bucketCounts[i] + " samples");
244
}
245
246
// Payload size histogram
247
Segmenter<String, Integer> payloadSegmenter = new Histogram.TreeMapSegmenter<>(
248
new Integer[]{1024, 10240, 102400, 1048576},
249
new String[]{"<1KB", "1-10KB", "10-100KB", "100KB-1MB", ">1MB"}
250
);
251
252
IntegerHistogram<String> payloadHistogram = new IntegerHistogram<>(payloadSegmenter);
253
payloadHistogram.add(2048); // 1-10KB bucket
254
payloadHistogram.add(512); // <1KB bucket
255
payloadHistogram.add(204800); // 100KB-1MB bucket
256
```
257
258
### Time Measurement Utilities
259
260
Precise time measurement for performance tracking and latency analysis.
261
262
#### Stopwatch Class
263
264
```java { .api }
265
// API summary of Stopwatch: measures the interval between start() and stop()
// using a pluggable time source.
public class Stopwatch {
266
// Time source interface
// ticks() returns the current time in nanoseconds; only differences between calls matter,
// which lets tests substitute a fake clock (see the usage example).
267
public interface Ticks {
268
long ticks();
269
}
270
271
// System time implementation
// NOTE(review): presumably backed by System.nanoTime() — confirm.
272
public static final Ticks SYSTEM_TICKS;
273
274
// Constructor
275
public Stopwatch(Ticks ticks);
276
277
// Timing methods
// elapsedNanos(): nanoseconds between start() and stop(); convert with StatsPlugin.nanosToMillis.
278
public void start();
279
public void stop();
280
public long elapsedNanos();
281
}
282
```
283
284
#### Usage Examples
285
286
```java
287
// Time a real operation against the system clock
Stopwatch timer = new Stopwatch(Stopwatch.SYSTEM_TICKS);
timer.start();
performExpensiveOperation(); // ... the operation being measured
timer.stop();

float elapsedMillis = StatsPlugin.nanosToMillis(timer.elapsedNanos());
System.out.println("Operation took: " + elapsedMillis + "ms");

// Deterministic time source for tests: every ticks() call advances a fake clock by
// 1 ms (1,000,000 ns), so the measured interval depends only on how often ticks() is
// called, never on wall-clock time.
final long[] fakeClockNanos = {0L};
Stopwatch testStopwatch = new Stopwatch(() -> fakeClockNanos[0] += 1_000_000L);

testStopwatch.start();
// Simulated passage of time happens inside the fake Ticks
testStopwatch.stop();
System.out.println("Test elapsed: " + testStopwatch.elapsedNanos() + "ns");
313
```
314
315
## Advanced Monitoring Examples
316
317
### Custom Statistics Collection
318
319
```java
320
public class CustomStatsPlugin extends RPCPlugin {
321
private final Map<String, FloatHistogram<String>> methodLatencies = new ConcurrentHashMap<>();
322
private final Map<String, IntegerHistogram<String>> methodPayloads = new ConcurrentHashMap<>();
323
private final Map<String, AtomicLong> methodCounts = new ConcurrentHashMap<>();
324
private final ThreadLocal<Stopwatch> requestStopwatch = new ThreadLocal<>();
325
326
@Override
327
public void serverReceiveRequest(RPCContext context) {
328
// Start timing
329
Stopwatch stopwatch = new Stopwatch(Stopwatch.SYSTEM_TICKS);
330
stopwatch.start();
331
requestStopwatch.set(stopwatch);
332
333
// Count method invocations
334
String methodName = context.getMessage().getName();
335
methodCounts.computeIfAbsent(methodName, k -> new AtomicLong()).incrementAndGet();
336
}
337
338
@Override
339
public void serverSendResponse(RPCContext context) {
340
// Stop timing and collect latency
341
Stopwatch stopwatch = requestStopwatch.get();
342
if (stopwatch != null) {
343
stopwatch.stop();
344
float latencyMs = StatsPlugin.nanosToMillis(stopwatch.elapsedNanos());
345
346
String methodName = context.getMessage().getName();
347
FloatHistogram<String> latencyHist = methodLatencies.computeIfAbsent(methodName,
348
k -> new FloatHistogram<>(StatsPlugin.LATENCY_SEGMENTER));
349
latencyHist.add(latencyMs);
350
351
requestStopwatch.remove();
352
}
353
354
// Collect payload size
355
List<ByteBuffer> responsePayload = context.getResponsePayload();
356
if (responsePayload != null) {
357
int totalSize = responsePayload.stream()
358
.mapToInt(ByteBuffer::remaining)
359
.sum();
360
361
String methodName = context.getMessage().getName();
362
IntegerHistogram<String> payloadHist = methodPayloads.computeIfAbsent(methodName,
363
k -> new IntegerHistogram<>(StatsPlugin.PAYLOAD_SEGMENTER));
364
payloadHist.add(totalSize);
365
}
366
}
367
368
// Public methods to access collected statistics
369
public Map<String, FloatHistogram<String>> getMethodLatencies() {
370
return Collections.unmodifiableMap(methodLatencies);
371
}
372
373
public Map<String, AtomicLong> getMethodCounts() {
374
return Collections.unmodifiableMap(methodCounts);
375
}
376
377
public void printStatistics() {
378
System.out.println("=== Custom RPC Statistics ===");
379
380
for (Map.Entry<String, AtomicLong> entry : methodCounts.entrySet()) {
381
String method = entry.getKey();
382
long count = entry.getValue().get();
383
384
FloatHistogram<String> latencyHist = methodLatencies.get(method);
385
float avgLatency = latencyHist != null ? latencyHist.getMean() : 0;
386
387
System.out.printf("Method: %s, Calls: %d, Avg Latency: %.2fms%n",
388
method, count, avgLatency);
389
}
390
}
391
}
392
```
393
394
### JMX Integration
395
396
```java
397
public class JMXStatsPlugin extends RPCPlugin implements JMXStatsPluginMBean {
398
private final AtomicLong totalRequests = new AtomicLong();
399
private final AtomicLong errorCount = new AtomicLong();
400
private final FloatHistogram<String> latencyHistogram;
401
private final ThreadLocal<Stopwatch> requestTimer = new ThreadLocal<>();
402
403
public JMXStatsPlugin() {
404
this.latencyHistogram = new FloatHistogram<>(StatsPlugin.LATENCY_SEGMENTER);
405
406
// Register with JMX
407
try {
408
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
409
ObjectName name = new ObjectName("org.apache.avro.ipc:type=Stats");
410
server.registerMBean(this, name);
411
} catch (Exception e) {
412
System.err.println("Failed to register JMX bean: " + e.getMessage());
413
}
414
}
415
416
@Override
417
public void serverReceiveRequest(RPCContext context) {
418
totalRequests.incrementAndGet();
419
Stopwatch timer = new Stopwatch(Stopwatch.SYSTEM_TICKS);
420
timer.start();
421
requestTimer.set(timer);
422
}
423
424
@Override
425
public void serverSendResponse(RPCContext context) {
426
Stopwatch timer = requestTimer.get();
427
if (timer != null) {
428
timer.stop();
429
float latencyMs = StatsPlugin.nanosToMillis(timer.elapsedNanos());
430
latencyHistogram.add(latencyMs);
431
requestTimer.remove();
432
}
433
434
if (context.isError()) {
435
errorCount.incrementAndGet();
436
}
437
}
438
439
// JMX interface methods
440
@Override
441
public long getTotalRequests() {
442
return totalRequests.get();
443
}
444
445
@Override
446
public long getErrorCount() {
447
return errorCount.get();
448
}
449
450
@Override
451
public double getAverageLatency() {
452
return latencyHistogram.getMean();
453
}
454
455
@Override
456
public double getErrorRate() {
457
long total = totalRequests.get();
458
return total > 0 ? (double) errorCount.get() / total : 0.0;
459
}
460
}
461
462
// JMX interface
463
/**
 * Standard MBean interface for {@code JMXStatsPlugin}. JMX associates it with the
 * implementation through the {@code <ClassName>MBean} naming convention and publishes
 * each getter as a read-only attribute.
 */
public interface JMXStatsPluginMBean {
  /** Requests received by the server so far. */
  long getTotalRequests();

  /** Responses that were flagged as errors. */
  long getErrorCount();

  /** Error count divided by total requests. */
  double getErrorRate();

  /** Mean server-side latency in milliseconds. */
  double getAverageLatency();
}
469
```
470
471
### Health Check Integration
472
473
```java
474
public class HealthCheckPlugin extends RPCPlugin {
475
private final AtomicReference<HealthStatus> healthStatus = new AtomicReference<>(HealthStatus.HEALTHY);
476
private final CircularBuffer<Long> recentLatencies = new CircularBuffer<>(100);
477
private final AtomicLong consecutiveErrors = new AtomicLong();
478
479
private static final float LATENCY_THRESHOLD_MS = 1000.0f;
480
private static final long ERROR_THRESHOLD = 5;
481
482
@Override
483
public void serverReceiveRequest(RPCContext context) {
484
// Reset consecutive errors on successful request receipt
485
if (healthStatus.get() == HealthStatus.DEGRADED) {
486
consecutiveErrors.set(0);
487
}
488
}
489
490
@Override
491
public void serverSendResponse(RPCContext context) {
492
if (context.isError()) {
493
long errors = consecutiveErrors.incrementAndGet();
494
if (errors >= ERROR_THRESHOLD) {
495
healthStatus.set(HealthStatus.UNHEALTHY);
496
}
497
} else {
498
consecutiveErrors.set(0);
499
500
// Check latency for health degradation
501
// (This would need actual latency measurement)
502
checkLatencyHealth();
503
}
504
}
505
506
private void checkLatencyHealth() {
507
if (recentLatencies.size() >= 10) {
508
double avgLatency = recentLatencies.stream()
509
.mapToLong(Long::longValue)
510
.average()
511
.orElse(0.0);
512
513
if (avgLatency > LATENCY_THRESHOLD_MS) {
514
healthStatus.set(HealthStatus.DEGRADED);
515
} else if (healthStatus.get() == HealthStatus.DEGRADED) {
516
healthStatus.set(HealthStatus.HEALTHY);
517
}
518
}
519
}
520
521
public HealthStatus getHealthStatus() {
522
return healthStatus.get();
523
}
524
525
public enum HealthStatus {
526
HEALTHY, DEGRADED, UNHEALTHY
527
}
528
}
529
```
530
531
## Performance Impact and Best Practices
532
533
### Statistics Collection Overhead
534
535
- `StatsPlugin` adds little per-call overhead (typically < 1%, but verify under your own workload before relying on that figure)
536
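- Recording a value costs one segmenter lookup plus a constant-time counter update. The lookup is O(log b) for a TreeMap-based segmenter with b buckets, and constant for a fixed if-chain like the custom segmenter shown earlier.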
537
- Memory usage scales with number of buckets and history size
538
- Web servlet generates HTML on-demand (no constant overhead)
539
540
### Optimization Guidelines
541
542
```java
543
// Illustrative snippet: the "Bad" segmenter elides its boundaries and labels, so it is
// not compilable as written.
// Good: Efficient statistics collection
544
StatsPlugin statsPlugin = new StatsPlugin();
545
// Uses default segmenters with reasonable bucket counts
546
547
// Good: Custom segmenter with appropriate bucket count
548
Segmenter<String, Float> efficientSegmenter = new Histogram.TreeMapSegmenter<>(
549
new Float[]{10.0f, 100.0f, 1000.0f}, // 3 boundaries -> 4 buckets
550
new String[]{"<10ms", "10-100ms", "100ms-1s", ">1s"}
551
);
552
553
// Bad: Too many buckets. Each bucket is another counter held per histogram, and the
// resulting distribution is harder to read than a handful of meaningful ranges.
554
Segmenter<String, Float> inefficientSegmenter = new Histogram.TreeMapSegmenter<>(
555
new Float[]{1.0f, 2.0f, 3.0f, /* ... 100 boundaries ... */}, // ~100 boundaries -> ~101 buckets
556
new String[]{/* ... 100+ labels ... */}
557
);
558
559
// Good: Bounded history size (default MAX_HISTORY_SIZE = 20)
560
// Bad: Unbounded data collection that grows indefinitely
561
```
562
563
### Memory Management
564
565
```java
566
// Monitor histogram memory usage
567
public void printHistogramStats(Histogram<?, ?> histogram) {
568
System.out.println("Histogram buckets: " + histogram.getSegmenter().size());
569
System.out.println("Recent additions: " + histogram.getRecentAdditions().size());
570
System.out.println("Total count: " + histogram.getCount());
571
}
572
573
// Periodic cleanup for long-running applications
574
public class RotatingStatsPlugin extends StatsPlugin {
575
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
576
577
public RotatingStatsPlugin() {
578
super();
579
// Reset statistics every hour
580
scheduler.scheduleAtFixedRate(this::resetStatistics, 1, 1, TimeUnit.HOURS);
581
}
582
583
private void resetStatistics() {
584
// Reset internal histograms (implementation-specific)
585
System.out.println("Statistics reset at: " + new Date());
586
}
587
}
588
```