CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-wso2-siddhi--siddhi-core

Siddhi Core is a high-performing Complex Event Processing engine providing stream processing and complex event processing capabilities through Streaming SQL.

Overview
Eval results
Files

queries-and-callbacks.mddocs/

Queries and Callbacks

Query processing and callback handling provide mechanisms for executing streaming SQL queries and receiving processed results. This includes stream callbacks for output events, query callbacks for query-specific results, and store queries for on-demand data retrieval.

Stream Callbacks

StreamCallback

Base class for receiving events from streams. Must be extended by users to handle output events from stream processing operations.

public abstract class StreamCallback {
    // Stream Configuration
    public String getStreamId();
    public void setStreamId(String streamId);
    public AbstractDefinition getStreamDefinition();
    public void setStreamDefinition(AbstractDefinition streamDefinition);
    public void setContext(SiddhiAppContext siddhiAppContext);
    
    // Event Processing (Abstract - Must Implement)
    public abstract void receive(Event[] events);
    
    // Lifecycle Management
    public void startProcessing();
    public void stopProcessing();
    
    // Utility Methods
    public Map<String, Object> toMap(Event event);
    public Map<String, Object>[] toMap(Event[] events);
}

Usage Example

// Create custom stream callback
StreamCallback stockCallback = new StreamCallback() {
    @Override
    public void receive(Event[] events) {
        for (Event event : events) {
            String symbol = (String) event.getData(0);
            Double price = (Double) event.getData(1);
            Long volume = (Long) event.getData(2);
            
            System.out.println("Received: " + symbol + " @ " + price + " vol: " + volume);
            
            // Convert to map for easier access
            Map<String, Object> eventMap = toMap(event);
            processStockData(eventMap);
        }
    }
    
    private void processStockData(Map<String, Object> data) {
        // Custom processing logic
        if ((Double) data.get("price") > 150.0) {
            alertHighPrice((String) data.get("symbol"), (Double) data.get("price"));
        }
    }
};

// Register callback with runtime
siddhiAppRuntime.addCallback("HighPriceStocks", stockCallback);

Query Callbacks

QueryCallback

Base class for receiving results from Siddhi queries. Provides access to both incoming and removed events from query processing.

public abstract class QueryCallback {
    // Query Configuration
    public void setQuery(Query query);
    public void setContext(SiddhiAppContext siddhiAppContext);
    
    // Event Processing (Abstract - Must Implement)
    public abstract void receive(long timestamp, Event[] inEvents, Event[] removeEvents);
    
    // Lifecycle Management
    public void startProcessing();
    public void stopProcessing();
}

Usage Example

// Create query-specific callback
QueryCallback avgPriceCallback = new QueryCallback() {
    @Override
    public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) {
        // Handle incoming events
        if (inEvents != null) {
            for (Event event : inEvents) {
                String symbol = (String) event.getData(0);
                Double avgPrice = (Double) event.getData(1);
                Long count = (Long) event.getData(2);
                
                System.out.println("Average price for " + symbol + ": " + avgPrice + 
                                 " (based on " + count + " events)");
                updateDashboard(symbol, avgPrice, count);
            }
        }
        
        // Handle removed events (for window-based queries)
        if (removeEvents != null) {
            for (Event event : removeEvents) {
                String symbol = (String) event.getData(0);
                System.out.println("Removing old average for: " + symbol);
                cleanupOldData(symbol);
            }
        }
    }
};

// Register callback with specific query
siddhiAppRuntime.addCallback("avgPriceQuery", avgPriceCallback);

Store Queries

Store queries provide on-demand querying capabilities for tables, windows, and aggregations within Siddhi applications.

public class SiddhiAppRuntime {
    // Execute store queries
    public Event[] query(String storeQuery);
    public Event[] query(StoreQuery storeQuery);
    
    // Query metadata
    public Attribute[] getStoreQueryOutputAttributes(String storeQuery);
}

Usage Examples

// Simple store query
String query = "from StockTable select symbol, price";
Event[] results = siddhiAppRuntime.query(query);

for (Event event : results) {
    System.out.println("Symbol: " + event.getData(0) + ", Price: " + event.getData(1));
}

// Parameterized store query
String paramQuery = "from StockTable on symbol == 'IBM' select *";
Event[] ibmResults = siddhiAppRuntime.query(paramQuery);

// Complex aggregation query
String aggQuery = "from StockAggregation " +
                 "within '2023-01-01 00:00:00', '2023-12-31 23:59:59' " +
                 "per 'day' " +
                 "select symbol, avg(price) as avgPrice, sum(volume) as totalVolume";
Event[] aggResults = siddhiAppRuntime.query(aggQuery);

// Get query output attributes
Attribute[] attributes = siddhiAppRuntime.getStoreQueryOutputAttributes(query);
for (Attribute attr : attributes) {
    System.out.println("Attribute: " + attr.getName() + " Type: " + attr.getType());
}

// Window-based query
String windowQuery = "from StockWindow select symbol, price order by price desc limit 10";
Event[] topStocks = siddhiAppRuntime.query(windowQuery);

Processing Chain

Processor

Parent interface for all event processors in Siddhi execution chain, enabling custom processing logic.

public interface Processor {
    // Event Processing
    void process(ComplexEventChunk complexEventChunk);
    
    // Processor Chain Management
    Processor getNextProcessor();
    void setNextProcessor(Processor processor);
    void setToLast(Processor processor);
    
    // Processor Lifecycle
    Processor cloneProcessor(String key);
    void clean();
}

Advanced Callback Patterns

Conditional Processing

StreamCallback conditionalCallback = new StreamCallback() {
    @Override
    public void receive(Event[] events) {
        for (Event event : events) {
            // Apply business rules based on event content
            String eventType = (String) event.getData(0);
            
            switch (eventType) {
                case "ALERT":
                    handleAlert(event);
                    break;
                case "WARNING":
                    handleWarning(event);
                    break;
                case "INFO":
                    logInformation(event);
                    break;
                default:
                    handleUnknownEvent(event);
            }
        }
    }
    
    private void handleAlert(Event event) {
        // Send notifications, update dashboards
        Map<String, Object> alertData = toMap(event);
        notificationService.sendAlert(alertData);
    }
};

Batch Processing

StreamCallback batchProcessor = new StreamCallback() {
    private final List<Event> batch = new ArrayList<>();
    private final int batchSize = 100;
    
    @Override
    public void receive(Event[] events) {
        synchronized (batch) {
            batch.addAll(Arrays.asList(events));
            
            if (batch.size() >= batchSize) {
                processBatch(new ArrayList<>(batch));
                batch.clear();
            }
        }
    }
    
    private void processBatch(List<Event> events) {
        // Efficient batch processing
        bulkInsertToDatabase(events);
        updateStatistics(events.size());
    }
};

Multi-Stream Coordination

// Callback for coordinating multiple streams
public class MultiStreamCallback extends StreamCallback {
    private final Map<String, List<Event>> streamBuffers = new ConcurrentHashMap<>();
    
    @Override
    public void receive(Event[] events) {
        String streamId = getStreamId();
        streamBuffers.computeIfAbsent(streamId, k -> new ArrayList<>())
                    .addAll(Arrays.asList(events));
        
        // Check if we have data from all required streams
        if (hasDataFromAllStreams()) {
            correlateAndProcess();
        }
    }
    
    private void correlateAndProcess() {
        // Correlate events from multiple streams
        // Apply complex business logic
        // Clear buffers after processing
    }
}

Query Execution Patterns

Synchronous Query Execution

// Immediate query execution with results
public List<StockInfo> getCurrentHighPriceStocks() {
    String query = "from StockTable on price > 100 select symbol, price, volume";
    Event[] results = siddhiAppRuntime.query(query);
    
    return Arrays.stream(results)
                 .map(event -> new StockInfo(
                     (String) event.getData(0),
                     (Double) event.getData(1),
                     (Long) event.getData(2)
                 ))
                 .collect(Collectors.toList());
}

Asynchronous Query Processing

// Asynchronous query execution with CompletableFuture
public CompletableFuture<Event[]> queryAsync(String storeQuery) {
    return CompletableFuture.supplyAsync(() -> {
        try {
            return siddhiAppRuntime.query(storeQuery);
        } catch (Exception e) {
            throw new RuntimeException("Query execution failed", e);
        }
    });
}

// Usage
queryAsync("from StockAggregation select *")
    .thenAccept(results -> processResults(results))
    .exceptionally(throwable -> {
        logger.error("Query failed", throwable);
        return null;
    });

Types

public interface Query {
    // Represents a parsed Siddhi query
}

public interface StoreQuery {
    // Represents a parsed store query
}

public interface AbstractDefinition {
    // Base interface for stream/table definitions
}

public interface Attribute {
    String getName();
    Attribute.Type getType();
    
    enum Type {
        STRING, INT, LONG, FLOAT, DOUBLE, BOOL, OBJECT
    }
}

public interface SiddhiAppContext {
    // Application context providing access to runtime resources
}

Install with Tessl CLI

npx tessl i tessl/maven-org-wso2-siddhi--siddhi-core

docs

aggregations.md

core-management.md

event-handling.md

exceptions.md

extensions.md

index.md

persistence.md

queries-and-callbacks.md

statistics.md

tile.json