CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-avro--avro-ipc

Avro inter-process communication components providing an RPC framework with multiple transport mechanisms and protocol implementations.

Pending
Overview
Eval results
Files

docs/stats.md

Statistics and Monitoring

Apache Avro IPC provides built-in performance monitoring capabilities through histogram-based statistics collection, latency tracking, payload analysis, and web-based visualization.

Capabilities

Statistics Collection Plugin

The StatsPlugin automatically collects comprehensive RPC performance metrics including call counts, latency distributions, and payload size analysis.

// API summary (signatures only, no bodies): RPC plugin that collects call counts,
// latency distributions and payload-size metrics for RPC calls.
public class StatsPlugin extends RPCPlugin {
    // Constructors
    // The no-arg form uses the default segmenters declared below.
    public StatsPlugin();
    // Takes a time source plus the segmenters used to bucket Float-valued and
    // Integer-valued metrics; the bucket type of each segmenter is unconstrained.
    public StatsPlugin(Ticks ticks, Segmenter<?, Float> floatSegmenter, Segmenter<?, Integer> integerSegmenter);
    
    // Server startup tracking
    // NOTE(review): public mutable field; presumably set when the plugin is created — confirm.
    public Date startupTime;
    
    // Default segmenters for bucketing metrics
    // Both use String bucket labels; one buckets Float values, the other Integer values.
    public static final Segmenter<String, Float> LATENCY_SEGMENTER;
    public static final Segmenter<String, Integer> PAYLOAD_SEGMENTER;
    
    // Utility methods
    // Converts an elapsed time in nanoseconds to milliseconds (as a float).
    public static float nanosToMillis(long elapsedNanos);
    
    // Inherited plugin methods for metric collection
    // The client* hooks are the requestor-side callbacks, the server* hooks the
    // responder-side callbacks; each receives the context of the call in flight.
    public void clientStartConnect(RPCContext context);
    public void clientFinishConnect(RPCContext context);
    public void clientSendRequest(RPCContext context);
    public void clientReceiveResponse(RPCContext context);
    public void serverConnecting(RPCContext context);
    public void serverReceiveRequest(RPCContext context);
    public void serverSendResponse(RPCContext context);
}

Usage Examples

// Basic statistics collection
StatsPlugin statsPlugin = new StatsPlugin();

// Register the plugin on both sides: the requestor drives the client* hooks,
// the responder drives the server* hooks.
requestor.addRPCPlugin(statsPlugin);
responder.addRPCPlugin(statsPlugin);

// Build the client proxy from the instrumented requestor. Creating the proxy from
// the bare transceiver would construct a fresh requestor with no plugins registered,
// so the client-side hooks above would never fire.
// (Assumes `requestor` is a SpecificRequestor created for MyService.)
MyService client = SpecificRequestor.getClient(MyService.class, requestor);
String result = client.processData("test data"); // Metrics collected automatically

// Access startup time
System.out.println("Server started at: " + statsPlugin.startupTime);

// Custom segmenter: five latency buckets (in milliseconds) split at 10, 50, 200 and 1000
Segmenter<String, Float> customLatencySegmenter = new Segmenter<String, Float>() {
    /** Number of buckets this segmenter produces. */
    @Override
    public int size() {
        return 5;
    }

    /** Maps a latency to the index of the bucket it falls into (0 through 4). */
    @Override
    public int segment(Float value) {
        float millis = value;
        int bucket;
        if (millis < 10) {
            bucket = 0;            // < 10ms
        } else if (millis < 50) {
            bucket = 1;            // 10-50ms
        } else if (millis < 200) {
            bucket = 2;            // 50-200ms
        } else if (millis < 1000) {
            bucket = 3;            // 200ms-1s
        } else {
            bucket = 4;            // > 1s
        }
        return bucket;
    }

    /** Iterates over the bucket labels in bucket-index order. */
    @Override
    public Iterator<String> getBuckets() {
        return getBucketLabels().iterator();
    }

    /** Labels for the four boundaries that separate the five buckets. */
    @Override
    public List<String> getBoundaryLabels() {
        return Arrays.asList("10", "50", "200", "1000");
    }

    /** Human-readable label for each bucket, in bucket-index order. */
    @Override
    public List<String> getBucketLabels() {
        return Arrays.asList("<10ms", "10-50ms", "50-200ms", "200ms-1s", ">1s");
    }
};

// SYSTEM_TICKS is declared on Stopwatch (the owner of the Ticks interface), not on StatsPlugin.
StatsPlugin customStatsPlugin = new StatsPlugin(Stopwatch.SYSTEM_TICKS,
    customLatencySegmenter, StatsPlugin.PAYLOAD_SEGMENTER);

Web-Based Statistics Viewer

The StatsServlet provides a web interface for viewing collected statistics with histograms, summaries, and real-time metrics.

// API summary (signatures only, no bodies): servlet that renders the statistics
// collected by a StatsPlugin as a web page.
public class StatsServlet extends HttpServlet {
    // Constructor
    // Binds the servlet to the plugin whose statistics it will display.
    public StatsServlet(StatsPlugin statsPlugin);
    
    // Web interface methods
    // doGet serves the page over HTTP; writeStats renders the same statistics to an
    // arbitrary Writer, so it can be called without a servlet container.
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException;
    public void writeStats(Writer w) throws IOException;
    
    // Utility methods
    // NOTE(review): presumably escapes each string for safe embedding in the page — confirm target syntax.
    protected static List<String> escapeStringArray(List<String> input);
    
    // Inner classes for rendering
    public static class RenderableMessage {
        // Public fields and methods for Velocity template access
    }
}

Usage Examples

// Web-based statistics viewer: the servlet renders whatever the plugin has collected
StatsPlugin statsPlugin = new StatsPlugin();
StatsServlet statsServlet = new StatsServlet(statsPlugin);

// Deploy to servlet container (example with Jetty): wrap the servlet in a holder
// and map it to the /stats path
ServletHolder statsHolder = new ServletHolder(statsServlet);
ServletContextHandler context = new ServletContextHandler();
context.addServlet(statsHolder, "/stats");

// Access statistics via HTTP:
//   GET http://localhost:8080/stats
//   -> HTML page with histograms and metrics

// Programmatic statistics access: render the same page into an in-memory buffer
StringWriter htmlBuffer = new StringWriter();
statsServlet.writeStats(htmlBuffer);
System.out.println(htmlBuffer.toString());

Histogram Data Structures

Generic histogram implementation for collecting and analyzing metric distributions.

Base Histogram Class

// API summary (signatures only, no bodies): generic histogram that sorts values of
// type T into buckets of type B, as decided by a Segmenter.
public class Histogram<B,T> {
    // Bound on retained history; presumably caps getRecentAdditions() — confirm
    // against the implementation.
    public static final int MAX_HISTORY_SIZE = 20;
    
    // Constructor
    public Histogram(Segmenter<B,T> segmenter);
    
    // Data collection
    public void add(T value);
    
    // Data access
    // getHistogram() returns one count per bucket, indexed by bucket position;
    // getCount() is the total number of samples added.
    public int[] getHistogram();
    public Segmenter<B,T> getSegmenter();
    public List<T> getRecentAdditions();
    public int getCount();
    public Iterable<Entry<B>> entries();
    
    // Inner interfaces and classes
    // Strategy that defines the buckets: how many there are, which bucket index a
    // value maps to, and how buckets and their boundaries are labelled.
    public interface Segmenter<B,T> {
        int size();
        int segment(T value);
        Iterator<B> getBuckets();
        List<String> getBoundaryLabels();
        List<String> getBucketLabels();
    }
    
    // Unchecked exception type associated with segmenting.
    // NOTE(review): presumably thrown when a value cannot be assigned a bucket — confirm.
    public static class SegmenterException extends RuntimeException {
        public SegmenterException(String message);
        public SegmenterException(String message, Throwable cause);
    }
    
    // Segmenter with String bucket labels for Comparable values, built from an
    // array of boundaries and an array of labels.
    public static class TreeMapSegmenter<T extends Comparable<T>> implements Segmenter<String,T> {
        public TreeMapSegmenter(T[] boundaries, String[] labels);
        // Implementation of Segmenter interface
    }
    
    // One (bucket, count) pair as yielded by entries().
    public static class Entry<B> {
        public B bucket;
        public int count;
        // Constructor and methods
    }
}

Float Histogram with Statistics

// API summary (signatures only, no bodies): Histogram specialised to Float values
// that additionally reports a mean and an unbiased standard deviation.
public class FloatHistogram<B> extends Histogram<B, Float> {
    // Constructor
    public FloatHistogram(Segmenter<B,Float> segmenter);
    
    // Statistical calculations
    // NOTE(review): the result for an empty histogram is not specified here — confirm before relying on it.
    public float getMean();
    public float getUnbiasedStdDev();
    
    // Inherited methods from Histogram
    public void add(Float value);
    public int getCount();
    public int[] getHistogram();
}

Integer Histogram with Statistics

// API summary (signatures only, no bodies): Histogram specialised to Integer values
// that additionally reports a mean and an unbiased standard deviation (both as float).
public class IntegerHistogram<B> extends Histogram<B, Integer> {
    // Constructor
    public IntegerHistogram(Segmenter<B,Integer> segmenter);
    
    // Statistical calculations
    // NOTE(review): the result for an empty histogram is not specified here — confirm before relying on it.
    public float getMean();
    public float getUnbiasedStdDev();
    
    // Inherited methods from Histogram
    public void add(Integer value);
    public int getCount();
    public int[] getHistogram();
}

Usage Examples

// Latency histogram: four boundaries (in ms) split the range into five labelled buckets
Float[] latencyBoundaries = {10.0f, 50.0f, 200.0f, 1000.0f};
String[] latencyLabels = {"<10ms", "10-50ms", "50-200ms", "200ms-1s", ">1s"};
Segmenter<String, Float> latencySegmenter =
    new Histogram.TreeMapSegmenter<>(latencyBoundaries, latencyLabels);

FloatHistogram<String> latencyHistogram = new FloatHistogram<>(latencySegmenter);

// Collect latency data
latencyHistogram.add(15.5f);  // lands in the 10-50ms bucket
latencyHistogram.add(75.2f);  // lands in the 50-200ms bucket
latencyHistogram.add(5.1f);   // lands in the <10ms bucket

// Summary statistics over everything added so far
System.out.println("Mean latency: " + latencyHistogram.getMean() + "ms");
System.out.println("Std deviation: " + latencyHistogram.getUnbiasedStdDev() + "ms");
System.out.println("Total samples: " + latencyHistogram.getCount());

// Per-bucket distribution, reported in bucket-index order
int bucketIndex = 0;
for (int samples : latencyHistogram.getHistogram()) {
    System.out.println("Bucket " + bucketIndex + ": " + samples + " samples");
    bucketIndex++;
}

// Payload size histogram: boundaries at 1KB, 10KB, 100KB and 1MB (in bytes)
Integer[] payloadBoundaries = {1024, 10 * 1024, 100 * 1024, 1024 * 1024};
String[] payloadLabels = {"<1KB", "1-10KB", "10-100KB", "100KB-1MB", ">1MB"};
Segmenter<String, Integer> payloadSegmenter =
    new Histogram.TreeMapSegmenter<>(payloadBoundaries, payloadLabels);

IntegerHistogram<String> payloadHistogram = new IntegerHistogram<>(payloadSegmenter);
payloadHistogram.add(2048);    // 2KB   -> 1-10KB bucket
payloadHistogram.add(512);     // 0.5KB -> <1KB bucket
payloadHistogram.add(204800);  // 200KB -> 100KB-1MB bucket

Time Measurement Utilities

Precise time measurement for performance tracking and latency analysis.

Stopwatch Class

// API summary (signatures only, no bodies): measures the time elapsed between
// start() and stop(), reading time from a pluggable Ticks source.
public class Stopwatch {
    // Time source interface
    // Single-method interface, so a test can substitute a fake clock.
    public interface Ticks {
        long ticks();
    }
    
    // System time implementation
    public static final Ticks SYSTEM_TICKS;
    
    // Constructor
    public Stopwatch(Ticks ticks);
    
    // Timing methods
    // elapsedNanos() reports the measured interval in nanoseconds.
    // NOTE(review): behaviour when called before stop() is not specified here — confirm.
    public void start();
    public void stop();
    public long elapsedNanos();
}

Usage Examples

// Basic stopwatch usage: time one operation against the system clock
Stopwatch stopwatch = new Stopwatch(Stopwatch.SYSTEM_TICKS);

stopwatch.start();
performExpensiveOperation();  // the work being measured
stopwatch.stop();

// The stopwatch reports nanoseconds; convert to milliseconds for display
System.out.println("Operation took: "
    + StatsPlugin.nanosToMillis(stopwatch.elapsedNanos()) + "ms");

// Custom time source for testing: every read advances a fake clock by 1ms (1,000,000 ns)
final long[] fakeClockNanos = {0};
Stopwatch testStopwatch = new Stopwatch(() -> fakeClockNanos[0] += 1_000_000);

testStopwatch.start();
// Simulated passage of time: the fake clock only moves when it is read
testStopwatch.stop();
System.out.println("Test elapsed: " + testStopwatch.elapsedNanos() + "ns");

Advanced Monitoring Examples

Custom Statistics Collection

/**
 * Example server-side RPC plugin that records, per method name, the number of
 * calls, a latency histogram (milliseconds) and a response payload-size histogram
 * (bytes).
 *
 * <p>The maps are concurrent, but the histograms stored in them are mutated from
 * whichever thread handles a request, so every histogram read or write is done
 * while holding that histogram's monitor.
 */
public class CustomStatsPlugin extends RPCPlugin {
    private final Map<String, FloatHistogram<String>> methodLatencies = new ConcurrentHashMap<>();
    private final Map<String, IntegerHistogram<String>> methodPayloads = new ConcurrentHashMap<>();
    private final Map<String, AtomicLong> methodCounts = new ConcurrentHashMap<>();
    // Carries the running timer from the request hook to the response hook.
    // Assumes both hooks for one call run on the same thread.
    private final ThreadLocal<Stopwatch> requestStopwatch = new ThreadLocal<>();

    /** Starts the per-request timer and bumps the call counter for the invoked method. */
    @Override
    public void serverReceiveRequest(RPCContext context) {
        Stopwatch stopwatch = new Stopwatch(Stopwatch.SYSTEM_TICKS);
        stopwatch.start();
        requestStopwatch.set(stopwatch);

        methodCounts.computeIfAbsent(methodNameOf(context), k -> new AtomicLong()).incrementAndGet();
    }

    /** Records the latency of the finished call and the size of its response payload. */
    @Override
    public void serverSendResponse(RPCContext context) {
        recordLatency(context);
        recordPayloadSize(context);
    }

    /** Stops the timer started for this request, if any, and adds the latency to the method's histogram. */
    private void recordLatency(RPCContext context) {
        Stopwatch stopwatch = requestStopwatch.get();
        if (stopwatch == null) {
            return;
        }
        // Clear the slot first so a failure below cannot leave a stale timer on a pooled thread.
        requestStopwatch.remove();

        stopwatch.stop();
        float latencyMs = StatsPlugin.nanosToMillis(stopwatch.elapsedNanos());

        FloatHistogram<String> latencyHist = methodLatencies.computeIfAbsent(methodNameOf(context),
            k -> new FloatHistogram<>(StatsPlugin.LATENCY_SEGMENTER));
        synchronized (latencyHist) {
            latencyHist.add(latencyMs);
        }
    }

    /** Sums the remaining bytes of the response buffers, if present, and adds the total to the method's histogram. */
    private void recordPayloadSize(RPCContext context) {
        List<ByteBuffer> responsePayload = context.getResponsePayload();
        if (responsePayload == null) {
            return;
        }
        int totalSize = responsePayload.stream()
            .mapToInt(ByteBuffer::remaining)
            .sum();

        IntegerHistogram<String> payloadHist = methodPayloads.computeIfAbsent(methodNameOf(context),
            k -> new IntegerHistogram<>(StatsPlugin.PAYLOAD_SEGMENTER));
        synchronized (payloadHist) {
            payloadHist.add(totalSize);
        }
    }

    /** Name of the protocol message (method) this call is for. */
    private static String methodNameOf(RPCContext context) {
        return context.getMessage().getName();
    }

    // Public methods to access collected statistics.
    // The returned maps are read-only views; synchronize on a histogram before reading it.

    public Map<String, FloatHistogram<String>> getMethodLatencies() {
        return Collections.unmodifiableMap(methodLatencies);
    }

    public Map<String, IntegerHistogram<String>> getMethodPayloads() {
        return Collections.unmodifiableMap(methodPayloads);
    }

    public Map<String, AtomicLong> getMethodCounts() {
        return Collections.unmodifiableMap(methodCounts);
    }

    /** Prints one line per method with its call count and mean latency (0 if none was recorded). */
    public void printStatistics() {
        System.out.println("=== Custom RPC Statistics ===");

        for (Map.Entry<String, AtomicLong> entry : methodCounts.entrySet()) {
            String method = entry.getKey();
            long count = entry.getValue().get();

            FloatHistogram<String> latencyHist = methodLatencies.get(method);
            float avgLatency = 0;
            if (latencyHist != null) {
                synchronized (latencyHist) {
                    avgLatency = latencyHist.getMean();
                }
            }

            System.out.printf("Method: %s, Calls: %d, Avg Latency: %.2fms%n",
                method, count, avgLatency);
        }
    }
}

JMX Integration

/**
 * Example server-side RPC plugin that exposes request count, error count, mean
 * latency (milliseconds) and error rate as JMX attributes.
 *
 * <p>The latency histogram is shared by all request threads and by JMX readers,
 * so it is only touched while holding its monitor.
 */
public class JMXStatsPlugin extends RPCPlugin implements JMXStatsPluginMBean {
    private final AtomicLong totalRequests = new AtomicLong();
    private final AtomicLong errorCount = new AtomicLong();
    private final FloatHistogram<String> latencyHistogram;
    // Carries the running timer from the request hook to the response hook.
    // Assumes both hooks for one call run on the same thread.
    private final ThreadLocal<Stopwatch> requestTimer = new ThreadLocal<>();

    /**
     * Creates the plugin and registers it with the platform MBean server under
     * {@code org.apache.avro.ipc:type=Stats}. A registration failure (for example a
     * second instance using the same name) is reported on stderr and otherwise
     * ignored, so the plugin keeps collecting even when it is not visible over JMX.
     */
    public JMXStatsPlugin() {
        this.latencyHistogram = new FloatHistogram<>(StatsPlugin.LATENCY_SEGMENTER);

        // Register with JMX
        try {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            ObjectName name = new ObjectName("org.apache.avro.ipc:type=Stats");
            server.registerMBean(this, name);
        } catch (Exception e) {
            // Print the exception itself: its type is the useful part and getMessage() may be null.
            System.err.println("Failed to register JMX bean: " + e);
        }
    }

    /** Counts the request and starts its timer. */
    @Override
    public void serverReceiveRequest(RPCContext context) {
        totalRequests.incrementAndGet();
        Stopwatch timer = new Stopwatch(Stopwatch.SYSTEM_TICKS);
        timer.start();
        requestTimer.set(timer);
    }

    /** Records the latency of the finished call and counts it as an error if the context says so. */
    @Override
    public void serverSendResponse(RPCContext context) {
        Stopwatch timer = requestTimer.get();
        if (timer != null) {
            // Clear the slot first so a failure below cannot leave a stale timer on a pooled thread.
            requestTimer.remove();
            timer.stop();
            float latencyMs = StatsPlugin.nanosToMillis(timer.elapsedNanos());
            synchronized (latencyHistogram) {
                latencyHistogram.add(latencyMs);
            }
        }

        if (context.isError()) {
            errorCount.incrementAndGet();
        }
    }

    // JMX interface methods

    @Override
    public long getTotalRequests() {
        return totalRequests.get();
    }

    @Override
    public long getErrorCount() {
        return errorCount.get();
    }

    @Override
    public double getAverageLatency() {
        synchronized (latencyHistogram) {
            return latencyHistogram.getMean();
        }
    }

    /** Fraction of received requests that ended in an error; 0.0 before the first request. */
    @Override
    public double getErrorRate() {
        long total = totalRequests.get();
        return total > 0 ? (double) errorCount.get() / total : 0.0;
    }
}

// JMX interface
// Management attributes implemented by JMXStatsPlugin, which registers itself with
// the platform MBean server. Each getter is a read-only attribute.
public interface JMXStatsPluginMBean {
    // Number of requests received so far.
    long getTotalRequests();
    // Number of responses that were flagged as errors.
    long getErrorCount();
    // Mean of the recorded request latencies, in milliseconds.
    double getAverageLatency();
    // Error count divided by total requests; 0.0 when no request has been received.
    double getErrorRate();
}

Health Check Integration

/**
 * Example server-side RPC plugin that derives a coarse health status from recent
 * responses:
 * <ul>
 *   <li>{@code UNHEALTHY} after {@code ERROR_THRESHOLD} consecutive error responses;</li>
 *   <li>{@code DEGRADED} while the mean latency of the most recent successful
 *       responses exceeds {@code LATENCY_THRESHOLD_MS};</li>
 *   <li>{@code HEALTHY} otherwise.</li>
 * </ul>
 * A successful response ends an error streak, so the plugin recovers from
 * {@code UNHEALTHY} as soon as calls succeed again.
 */
public class HealthCheckPlugin extends RPCPlugin {
    private static final float LATENCY_THRESHOLD_MS = 1000.0f;
    private static final long ERROR_THRESHOLD = 5;
    // Number of most-recent successful latencies kept for the moving average.
    private static final int LATENCY_WINDOW_SIZE = 100;
    // Latency is not judged until at least this many samples exist.
    private static final int MIN_LATENCY_SAMPLES = 10;

    private final AtomicReference<HealthStatus> healthStatus = new AtomicReference<>(HealthStatus.HEALTHY);
    private final AtomicLong consecutiveErrors = new AtomicLong();
    // Sliding window of latencies in milliseconds; every access holds the deque's monitor.
    private final java.util.Deque<Float> recentLatencies = new java.util.ArrayDeque<>(LATENCY_WINDOW_SIZE);
    // Carries the running timer from the request hook to the response hook.
    // Assumes both hooks for one call run on the same thread.
    private final ThreadLocal<Stopwatch> requestTimer = new ThreadLocal<>();

    /** Starts timing the request; receiving a request says nothing yet about its outcome. */
    @Override
    public void serverReceiveRequest(RPCContext context) {
        Stopwatch timer = new Stopwatch(Stopwatch.SYSTEM_TICKS);
        timer.start();
        requestTimer.set(timer);
    }

    /** Updates the error streak or the latency window, then re-evaluates the health status. */
    @Override
    public void serverSendResponse(RPCContext context) {
        Stopwatch timer = requestTimer.get();
        requestTimer.remove();

        if (context.isError()) {
            if (consecutiveErrors.incrementAndGet() >= ERROR_THRESHOLD) {
                healthStatus.set(HealthStatus.UNHEALTHY);
            }
            return;
        }

        // A success breaks the error streak.
        consecutiveErrors.set(0);
        if (timer != null) {
            timer.stop();
            recordLatency(StatsPlugin.nanosToMillis(timer.elapsedNanos()));
        }
        healthStatus.set(isLatencyDegraded() ? HealthStatus.DEGRADED : HealthStatus.HEALTHY);
    }

    /** Appends a latency sample, evicting the oldest one once the window is full. */
    private void recordLatency(float latencyMs) {
        synchronized (recentLatencies) {
            if (recentLatencies.size() == LATENCY_WINDOW_SIZE) {
                recentLatencies.removeFirst();
            }
            recentLatencies.addLast(latencyMs);
        }
    }

    /** True when enough samples exist and their mean exceeds the latency threshold. */
    private boolean isLatencyDegraded() {
        synchronized (recentLatencies) {
            if (recentLatencies.size() < MIN_LATENCY_SAMPLES) {
                return false;
            }
            double totalMs = 0.0;
            for (float latencyMs : recentLatencies) {
                totalMs += latencyMs;
            }
            return totalMs / recentLatencies.size() > LATENCY_THRESHOLD_MS;
        }
    }

    public HealthStatus getHealthStatus() {
        return healthStatus.get();
    }

    public enum HealthStatus {
        HEALTHY, DEGRADED, UNHEALTHY
    }
}

Performance Impact and Best Practices

Statistics Collection Overhead

  • StatsPlugin adds minimal overhead (< 1% typically)
  • Histogram operations are O(1) for bucket assignment
  • Memory usage scales with number of buckets and history size
  • Web servlet generates HTML on-demand (no constant overhead)

Optimization Guidelines

// Good: the default constructor uses the built-in segmenters, which keep bucket counts reasonable
StatsPlugin statsPlugin = new StatsPlugin();

// Good: custom segmenter with an appropriate bucket count (three boundaries -> only 4 buckets)
Float[] coarseBoundaries = {10.0f, 100.0f, 1000.0f};
String[] coarseLabels = {"<10ms", "10-100ms", "100ms-1s", ">1s"};
Segmenter<String, Float> efficientSegmenter =
    new Histogram.TreeMapSegmenter<>(coarseBoundaries, coarseLabels);

// Bad: too many buckets (100 boundaries -> 100+ buckets)
Float[] fineBoundaries = {1.0f, 2.0f, 3.0f, /* ... 100 boundaries ... */};
String[] fineLabels = {/* ... 100+ labels ... */};
Segmenter<String, Float> inefficientSegmenter =
    new Histogram.TreeMapSegmenter<>(fineBoundaries, fineLabels);

// Good: Bounded history size (default MAX_HISTORY_SIZE = 20)
// Bad: Unbounded data collection that grows indefinitely

Memory Management

// Monitor histogram memory usage
/**
 * Prints the three numbers that describe a histogram's footprint: its bucket
 * count, the number of recently added values it retains, and its total sample count.
 */
public void printHistogramStats(Histogram<?, ?> histogram) {
    int bucketCount = histogram.getSegmenter().size();
    int retainedValues = histogram.getRecentAdditions().size();
    int totalSamples = histogram.getCount();

    System.out.println("Histogram buckets: " + bucketCount);
    System.out.println("Recent additions: " + retainedValues);
    System.out.println("Total count: " + totalSamples);
}

// Periodic cleanup for long-running applications
/**
 * StatsPlugin variant that runs a reset task once per hour on a background thread.
 *
 * <p>The background thread is a daemon, so it never keeps the JVM alive on its
 * own. Call {@link #close()} when the plugin is retired to stop the schedule.
 */
public class RotatingStatsPlugin extends StatsPlugin implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(task -> {
            Thread worker = new Thread(task, "stats-rotation");
            worker.setDaemon(true);
            return worker;
        });

    public RotatingStatsPlugin() {
        super();
        // Reset statistics every hour (first run one hour after construction)
        scheduler.scheduleAtFixedRate(this::resetStatistics, 1, 1, TimeUnit.HOURS);
    }

    /** Stops the hourly reset task and releases its thread. */
    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    private void resetStatistics() {
        // Reset internal histograms (implementation-specific)
        System.out.println("Statistics reset at: " + new Date());
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-avro--avro-ipc

docs

async.md

core-rpc.md

index.md

plugins.md

protocols.md

stats.md

transports.md

tile.json