Avro inter-process communication components providing RPC framework with multiple transport mechanisms and protocol implementations
—
Apache Avro IPC provides built-in performance monitoring capabilities through histogram-based statistics collection, latency tracking, payload analysis, and web-based visualization.
The StatsPlugin automatically collects comprehensive RPC performance metrics including call counts, latency distributions, and payload size analysis.
// API outline (signatures only, not compilable as-is): StatsPlugin hooks the
// RPCPlugin callbacks to record call counts, latency distributions, and
// payload-size distributions for each RPC message.
public class StatsPlugin extends RPCPlugin {
// Constructors
// No-arg form uses the default segmenters below; presumably also a
// system-time tick source — confirm against the Avro javadoc.
public StatsPlugin();
// ticks: time source for latency measurement (see Stopwatch.Ticks below);
// the segmenters define histogram bucketing for latency (Float) and
// payload size (Integer).
public StatsPlugin(Ticks ticks, Segmenter<?, Float> floatSegmenter, Segmenter<?, Integer> integerSegmenter);
// Server startup tracking
public Date startupTime;
// Default segmenters for bucketing metrics
public static final Segmenter<String, Float> LATENCY_SEGMENTER;
public static final Segmenter<String, Integer> PAYLOAD_SEGMENTER;
// Utility methods
// Converts an elapsed nanosecond count to (possibly fractional) milliseconds.
public static float nanosToMillis(long elapsedNanos);
// RPCPlugin callbacks overridden to collect metrics on both sides of a call
public void clientStartConnect(RPCContext context);
public void clientFinishConnect(RPCContext context);
public void clientSendRequest(RPCContext context);
public void clientReceiveResponse(RPCContext context);
public void serverConnecting(RPCContext context);
public void serverReceiveRequest(RPCContext context);
public void serverSendResponse(RPCContext context);
}// Basic statistics collection
// Collect RPC statistics with the default segmenters.
StatsPlugin stats = new StatsPlugin();

// Register the same plugin instance on both ends of the connection.
requestor.addRPCPlugin(stats);
responder.addRPCPlugin(stats);

// Every call made through the generated proxy is now measured transparently.
MyService client = SpecificRequestor.getClient(MyService.class, transceiver);
String result = client.processData("test data");

// The plugin also records when the server came up.
System.out.println("Server started at: " + stats.startupTime);
// Custom segmenter bucketing latencies into five human-readable ranges.
// Boundaries are in milliseconds and must agree with getBoundaryLabels().
Segmenter<String, Float> customLatencySegmenter = new Segmenter<String, Float>() {
    @Override
    public int size() { return 5; }

    @Override
    public int segment(Float value) {
        if (value < 10) return 0;   // < 10ms
        if (value < 50) return 1;   // 10-50ms
        if (value < 200) return 2;  // 50-200ms
        if (value < 1000) return 3; // 200ms-1s
        return 4;                   // >= 1s
    }

    @Override
    public Iterator<String> getBuckets() {
        // Same labels as getBucketLabels(), exposed as an iterator.
        return getBucketLabels().iterator();
    }

    @Override
    public List<String> getBoundaryLabels() {
        return Arrays.asList("10", "50", "200", "1000");
    }

    @Override
    public List<String> getBucketLabels() {
        return Arrays.asList("<10ms", "10-50ms", "50-200ms", "200ms-1s", ">1s");
    }
};
// FIX: the system tick source is Stopwatch.SYSTEM_TICKS (declared on the
// Stopwatch class below) — StatsPlugin declares no SYSTEM_TICKS constant.
StatsPlugin customStatsPlugin = new StatsPlugin(Stopwatch.SYSTEM_TICKS,
    customLatencySegmenter, StatsPlugin.PAYLOAD_SEGMENTER);

The StatsServlet provides a web interface for viewing collected statistics with histograms, summaries, and real-time metrics.
// API outline (signatures only): servlet that renders a StatsPlugin's
// collected metrics as an HTML page (histograms and summaries).
public class StatsServlet extends HttpServlet {
// Constructor
// statsPlugin: the plugin instance whose metrics this servlet renders.
public StatsServlet(StatsPlugin statsPlugin);
// Web interface methods
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException;
// Writes the HTML report to an arbitrary Writer (presumably what doGet
// delegates to — confirm in the Avro javadoc).
public void writeStats(Writer w) throws IOException;
// Utility methods
protected static List<String> escapeStringArray(List<String> input);
// Inner classes for rendering
public static class RenderableMessage {
// Public fields and methods for Velocity template access
}
}// Web-based statistics viewer
// Expose collected statistics over HTTP.
StatsPlugin plugin = new StatsPlugin();
StatsServlet servlet = new StatsServlet(plugin);

// Deploy into a servlet container (Jetty shown here).
ServletContextHandler handler = new ServletContextHandler();
handler.addServlet(new ServletHolder(servlet), "/stats");

// Browse the metrics: GET http://localhost:8080/stats
// returns an HTML page with histograms and summary metrics.

// Or render the same HTML programmatically:
StringWriter out = new StringWriter();
servlet.writeStats(out);
String html = out.toString();
System.out.println(html);

Generic histogram implementation for collecting and analyzing metric distributions.
// API outline (signatures only): generic histogram over bucket type B and
// value type T; a Segmenter maps each value to a bucket index.
public class Histogram<B,T> {
    // Cap on how many recent raw values are retained for getRecentAdditions().
    public static final int MAX_HISTORY_SIZE = 20;

    // Constructor
    public Histogram(Segmenter<B,T> segmenter);

    // Data collection
    public void add(T value);

    // Data access
    public int[] getHistogram();
    public Segmenter<B,T> getSegmenter();
    public List<T> getRecentAdditions();
    public int getCount();
    public Iterable<Entry<B>> entries();

    // Strategy mapping a value to one of size() bucket indices.
    public interface Segmenter<B,T> {
        int size();
        int segment(T value);
        Iterator<B> getBuckets();
        List<String> getBoundaryLabels();
        List<String> getBucketLabels();
    }

    // Presumably thrown when a value cannot be mapped to a bucket — confirm.
    public static class SegmenterException extends RuntimeException {
        public SegmenterException(String message);
        public SegmenterException(String message, Throwable cause);
    }

    // Ready-made segmenter built from boundary values plus display labels.
    public static class TreeMapSegmenter<T extends Comparable<T>> implements Segmenter<String,T> {
        public TreeMapSegmenter(T[] boundaries, String[] labels);
        // Implementation of Segmenter interface
    }

    // One (bucket, count) pair as exposed by entries().
    public static class Entry<B> {
        public B bucket;
        public int count;
        // Constructor and methods
    }
}

// Float-valued histogram adding mean / standard-deviation statistics.
public class FloatHistogram<B> extends Histogram<B, Float> {
    // Constructor
    public FloatHistogram(Segmenter<B,Float> segmenter);

    // Statistical calculations
    public float getMean();
    public float getUnbiasedStdDev();

    // Inherited methods from Histogram
    public void add(Float value);
    public int getCount();
    public int[] getHistogram();
}

// Integer-valued histogram with the same statistics.
public class IntegerHistogram<B> extends Histogram<B, Integer> {
    // Constructor
    public IntegerHistogram(Segmenter<B,Integer> segmenter);

    // Statistical calculations
    public float getMean();
    public float getUnbiasedStdDev();

    // Inherited methods from Histogram
    public void add(Integer value);
    public int getCount();
    public int[] getHistogram();
}

// Latency histogram
// Latency histogram (bucket boundaries in milliseconds).
Segmenter<String, Float> latencyBuckets = new Histogram.TreeMapSegmenter<>(
    new Float[]{10.0f, 50.0f, 200.0f, 1000.0f},
    new String[]{"<10ms", "10-50ms", "50-200ms", "200ms-1s", ">1s"}
);
FloatHistogram<String> latencies = new FloatHistogram<>(latencyBuckets);

// Record a few latency samples.
latencies.add(15.5f); // lands in 10-50ms
latencies.add(75.2f); // lands in 50-200ms
latencies.add(5.1f);  // lands in <10ms

// Summary statistics.
System.out.println("Mean latency: " + latencies.getMean() + "ms");
System.out.println("Std deviation: " + latencies.getUnbiasedStdDev() + "ms");
System.out.println("Total samples: " + latencies.getCount());

// Per-bucket distribution.
int[] counts = latencies.getHistogram();
for (int b = 0; b < counts.length; b++) {
    System.out.println("Bucket " + b + ": " + counts[b] + " samples");
}

// Payload-size histogram (boundaries in bytes).
Segmenter<String, Integer> payloadBuckets = new Histogram.TreeMapSegmenter<>(
    new Integer[]{1024, 10240, 102400, 1048576},
    new String[]{"<1KB", "1-10KB", "10-100KB", "100KB-1MB", ">1MB"}
);
IntegerHistogram<String> payloads = new IntegerHistogram<>(payloadBuckets);
payloads.add(2048);   // 1-10KB
payloads.add(512);    // <1KB
payloads.add(204800); // 100KB-1MB

Precise time measurement for performance tracking and latency analysis.
// API outline (signatures only): nanosecond stopwatch with a pluggable time
// source, so timing can be made deterministic in tests.
public class Stopwatch {
// Time source interface
// Supplies monotonically advancing ticks in nanoseconds (see elapsedNanos).
public interface Ticks {
long ticks();
}
// System time implementation
// NOTE(review): presumably backed by System.nanoTime() — confirm in javadoc.
public static final Ticks SYSTEM_TICKS;
// Constructor
public Stopwatch(Ticks ticks);
// Timing methods
public void start();
public void stop();
public long elapsedNanos();
}// Basic stopwatch usage
Stopwatch watch = new Stopwatch(Stopwatch.SYSTEM_TICKS);
watch.start();
// ... perform operation to measure
performExpensiveOperation();
watch.stop();
long nanos = watch.elapsedNanos();
float millis = StatsPlugin.nanosToMillis(nanos);
System.out.println("Operation took: " + millis + "ms");

// Custom time source for testing: advances exactly 1ms on every ticks()
// call, making elapsed time fully deterministic.
Stopwatch testWatch = new Stopwatch(new Stopwatch.Ticks() {
    private long fakeNanos = 0;

    @Override
    public long ticks() {
        return fakeNanos += 1000000; // +1ms per call
    }
});
testWatch.start();
// Simulated passage of time
testWatch.stop();
System.out.println("Test elapsed: " + testWatch.elapsedNanos() + "ns");

public class CustomStatsPlugin extends RPCPlugin {
// Per-method latency histograms, payload-size histograms, and call counters.
private final Map<String, FloatHistogram<String>> methodLatencies = new ConcurrentHashMap<>();
// NOTE(review): methodPayloads is collected below but never exposed via a
// getter — add one if callers need it.
private final Map<String, IntegerHistogram<String>> methodPayloads = new ConcurrentHashMap<>();
private final Map<String, AtomicLong> methodCounts = new ConcurrentHashMap<>();
// One in-flight stopwatch per thread; assumes request and response for a
// given call are handled on the same thread — NOTE(review): confirm this
// holds for the transport in use.
private final ThreadLocal<Stopwatch> requestStopwatch = new ThreadLocal<>();
@Override
public void serverReceiveRequest(RPCContext context) {
// Start timing
Stopwatch stopwatch = new Stopwatch(Stopwatch.SYSTEM_TICKS);
stopwatch.start();
requestStopwatch.set(stopwatch);
// Count method invocations
String methodName = context.getMessage().getName();
methodCounts.computeIfAbsent(methodName, k -> new AtomicLong()).incrementAndGet();
}
@Override
public void serverSendResponse(RPCContext context) {
// Stop timing and collect latency; the ThreadLocal is cleared afterwards
// so pooled threads do not carry a stale stopwatch into the next request.
Stopwatch stopwatch = requestStopwatch.get();
if (stopwatch != null) {
stopwatch.stop();
float latencyMs = StatsPlugin.nanosToMillis(stopwatch.elapsedNanos());
String methodName = context.getMessage().getName();
// NOTE(review): Histogram's outline shows no synchronization; concurrent
// add() calls on a shared histogram may race — confirm thread-safety.
FloatHistogram<String> latencyHist = methodLatencies.computeIfAbsent(methodName,
k -> new FloatHistogram<>(StatsPlugin.LATENCY_SEGMENTER));
latencyHist.add(latencyMs);
requestStopwatch.remove();
}
// Collect payload size (sum of remaining bytes across response buffers);
// recorded even when no stopwatch was found above.
List<ByteBuffer> responsePayload = context.getResponsePayload();
if (responsePayload != null) {
int totalSize = responsePayload.stream()
.mapToInt(ByteBuffer::remaining)
.sum();
String methodName = context.getMessage().getName();
IntegerHistogram<String> payloadHist = methodPayloads.computeIfAbsent(methodName,
k -> new IntegerHistogram<>(StatsPlugin.PAYLOAD_SEGMENTER));
payloadHist.add(totalSize);
}
}
// Public methods to access collected statistics (read-only views)
public Map<String, FloatHistogram<String>> getMethodLatencies() {
return Collections.unmodifiableMap(methodLatencies);
}
public Map<String, AtomicLong> getMethodCounts() {
return Collections.unmodifiableMap(methodCounts);
}
// Dumps one line per method: call count and mean latency (0 if no samples).
public void printStatistics() {
System.out.println("=== Custom RPC Statistics ===");
for (Map.Entry<String, AtomicLong> entry : methodCounts.entrySet()) {
String method = entry.getKey();
long count = entry.getValue().get();
FloatHistogram<String> latencyHist = methodLatencies.get(method);
float avgLatency = latencyHist != null ? latencyHist.getMean() : 0;
System.out.printf("Method: %s, Calls: %d, Avg Latency: %.2fms%n",
method, count, avgLatency);
}
}
}

// RPC plugin that publishes aggregate request counters and latency via JMX.
public class JMXStatsPlugin extends RPCPlugin implements JMXStatsPluginMBean {
    private final AtomicLong totalRequests = new AtomicLong();
    private final AtomicLong errorCount = new AtomicLong();
    private final FloatHistogram<String> latencyHistogram;
    // Per-thread timer for the request currently in flight.
    private final ThreadLocal<Stopwatch> requestTimer = new ThreadLocal<>();

    public JMXStatsPlugin() {
        latencyHistogram = new FloatHistogram<>(StatsPlugin.LATENCY_SEGMENTER);
        // Best-effort JMX registration: stats are still collected if it fails.
        try {
            MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
            ObjectName beanName = new ObjectName("org.apache.avro.ipc:type=Stats");
            mbeanServer.registerMBean(this, beanName);
        } catch (Exception e) {
            System.err.println("Failed to register JMX bean: " + e.getMessage());
        }
    }

    @Override
    public void serverReceiveRequest(RPCContext context) {
        totalRequests.incrementAndGet();
        Stopwatch sw = new Stopwatch(Stopwatch.SYSTEM_TICKS);
        sw.start();
        requestTimer.set(sw);
    }

    @Override
    public void serverSendResponse(RPCContext context) {
        // Record the latency for this thread's request, then clear the timer.
        Stopwatch sw = requestTimer.get();
        if (sw != null) {
            sw.stop();
            latencyHistogram.add(StatsPlugin.nanosToMillis(sw.elapsedNanos()));
            requestTimer.remove();
        }
        if (context.isError()) {
            errorCount.incrementAndGet();
        }
    }

    // JMX interface methods

    @Override
    public long getTotalRequests() {
        return totalRequests.get();
    }

    @Override
    public long getErrorCount() {
        return errorCount.get();
    }

    @Override
    public double getAverageLatency() {
        return latencyHistogram.getMean();
    }

    @Override
    public double getErrorRate() {
        long total = totalRequests.get();
        if (total == 0) {
            return 0.0;
        }
        return (double) errorCount.get() / total;
    }
}
// JMX interface: read-only view of the plugin's counters for JMX consumers.
public interface JMXStatsPluginMBean {
    long getTotalRequests();
    long getErrorCount();
    double getAverageLatency();
    double getErrorRate();
}

public class HealthCheckPlugin extends RPCPlugin {
// Derives an overall health status from error streaks and (intended) latency.
private final AtomicReference<HealthStatus> healthStatus = new AtomicReference<>(HealthStatus.HEALTHY);
// NOTE(review): CircularBuffer is not a JDK type — assumed to be a
// project-local bounded buffer (capacity 100) providing size()/stream();
// confirm its API before use.
private final CircularBuffer<Long> recentLatencies = new CircularBuffer<>(100);
private final AtomicLong consecutiveErrors = new AtomicLong();
// Average latency above this (ms) marks the server DEGRADED.
private static final float LATENCY_THRESHOLD_MS = 1000.0f;
// This many consecutive errors marks the server UNHEALTHY.
private static final long ERROR_THRESHOLD = 5;
@Override
public void serverReceiveRequest(RPCContext context) {
// Reset consecutive errors on successful request receipt
// NOTE(review): this actually resets whenever the status is DEGRADED,
// regardless of whether the incoming request will succeed.
if (healthStatus.get() == HealthStatus.DEGRADED) {
consecutiveErrors.set(0);
}
}
@Override
public void serverSendResponse(RPCContext context) {
if (context.isError()) {
long errors = consecutiveErrors.incrementAndGet();
if (errors >= ERROR_THRESHOLD) {
// NOTE(review): no path in this class ever transitions UNHEALTHY back
// to HEALTHY — intentional latching or a bug? Confirm.
healthStatus.set(HealthStatus.UNHEALTHY);
}
} else {
consecutiveErrors.set(0);
// Check latency for health degradation
// (This would need actual latency measurement)
checkLatencyHealth();
}
}
// NOTE(review): nothing in this class ever adds to recentLatencies, so the
// size() guard below can never pass — the DEGRADED/HEALTHY transitions here
// are dead code until latency recording is wired in (the inline comment
// above admits this).
private void checkLatencyHealth() {
if (recentLatencies.size() >= 10) {
double avgLatency = recentLatencies.stream()
.mapToLong(Long::longValue)
.average()
.orElse(0.0);
if (avgLatency > LATENCY_THRESHOLD_MS) {
healthStatus.set(HealthStatus.DEGRADED);
} else if (healthStatus.get() == HealthStatus.DEGRADED) {
healthStatus.set(HealthStatus.HEALTHY);
}
}
}
// Current health snapshot, e.g. for an external health-check endpoint.
public HealthStatus getHealthStatus() {
return healthStatus.get();
}
public enum HealthStatus {
HEALTHY, DEGRADED, UNHEALTHY
}
}

StatsPlugin adds minimal overhead (< 1% typically).

// Good: Efficient statistics collection
StatsPlugin statsPlugin = new StatsPlugin();
// Uses default segmenters with reasonable bucket counts

// Good: Custom segmenter with an appropriate bucket count
Segmenter<String, Float> efficientSegmenter = new Histogram.TreeMapSegmenter<>(
    new Float[]{10.0f, 100.0f, 1000.0f}, // only 4 buckets
    new String[]{"<10ms", "10-100ms", "100ms-1s", ">1s"}
);

// Bad: Too many buckets
Segmenter<String, Float> inefficientSegmenter = new Histogram.TreeMapSegmenter<>(
    new Float[]{1.0f, 2.0f, 3.0f, /* ... 100 boundaries ... */}, // 100+ buckets
    new String[]{/* ... 100+ labels ... */}
);

// Good: Bounded history size (default MAX_HISTORY_SIZE = 20)
// Bad: Unbounded data collection that grows indefinitely

// Monitor histogram memory usage
// Logs a histogram's bucket count, retained-sample count, and total
// observations so memory growth can be watched over time.
public void printHistogramStats(Histogram<?, ?> histogram) {
    int bucketCount = histogram.getSegmenter().size();
    int recentCount = histogram.getRecentAdditions().size();
    int totalCount = histogram.getCount();
    System.out.println("Histogram buckets: " + bucketCount);
    System.out.println("Recent additions: " + recentCount);
    System.out.println("Total count: " + totalCount);
}
// Periodic cleanup for long-running applications: resets statistics on a
// fixed schedule so histograms do not accumulate stale history forever.
public class RotatingStatsPlugin extends StatsPlugin {
    // FIX: use a daemon thread so the scheduler cannot keep the JVM alive
    // after the application exits (the default factory creates non-daemon
    // threads, and nothing ever shut the executor down).
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "stats-rotation");
            t.setDaemon(true);
            return t;
        });

    public RotatingStatsPlugin() {
        super();
        // Reset statistics every hour (first reset one hour after startup).
        scheduler.scheduleAtFixedRate(this::resetStatistics, 1, 1, TimeUnit.HOURS);
    }

    /** Stops the rotation task; call when this plugin is retired. */
    public void shutdown() {
        scheduler.shutdownNow();
    }

    private void resetStatistics() {
        // Reset internal histograms (implementation-specific placeholder).
        System.out.println("Statistics reset at: " + new Date());
    }
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-avro--avro-ipc