Siddhi Core is a high-performing Complex Event Processing engine providing stream processing and complex event processing capabilities through Streaming SQL.
Query processing and callback handling provide mechanisms for executing streaming SQL queries and receiving processed results. This includes stream callbacks for output events, query callbacks for query-specific results, and store queries for on-demand data retrieval.
Base class for receiving events from streams. Must be extended by users to handle output events from stream processing operations.
public abstract class StreamCallback {
// Stream Configuration
public String getStreamId();
public void setStreamId(String streamId);
public AbstractDefinition getStreamDefinition();
public void setStreamDefinition(AbstractDefinition streamDefinition);
public void setContext(SiddhiAppContext siddhiAppContext);
// Event Processing (Abstract - Must Implement)
public abstract void receive(Event[] events);
// Lifecycle Management
public void startProcessing();
public void stopProcessing();
// Utility Methods
public Map<String, Object> toMap(Event event);
public Map<String, Object>[] toMap(Event[] events);
}// Create custom stream callback
StreamCallback stockCallback = new StreamCallback() {
@Override
public void receive(Event[] events) {
for (Event event : events) {
String symbol = (String) event.getData(0);
Double price = (Double) event.getData(1);
Long volume = (Long) event.getData(2);
System.out.println("Received: " + symbol + " @ " + price + " vol: " + volume);
// Convert to map for easier access
Map<String, Object> eventMap = toMap(event);
processStockData(eventMap);
}
}
private void processStockData(Map<String, Object> data) {
// Custom processing logic
if ((Double) data.get("price") > 150.0) {
alertHighPrice((String) data.get("symbol"), (Double) data.get("price"));
}
}
};
// Register callback with runtime
siddhiAppRuntime.addCallback("HighPriceStocks", stockCallback);Base class for receiving results from Siddhi queries. Provides access to both incoming and removed events from query processing.
public abstract class QueryCallback {
// Query Configuration
public void setQuery(Query query);
public void setContext(SiddhiAppContext siddhiAppContext);
// Event Processing (Abstract - Must Implement)
public abstract void receive(long timestamp, Event[] inEvents, Event[] removeEvents);
// Lifecycle Management
public void startProcessing();
public void stopProcessing();
}// Create query-specific callback
QueryCallback avgPriceCallback = new QueryCallback() {
@Override
public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) {
// Handle incoming events
if (inEvents != null) {
for (Event event : inEvents) {
String symbol = (String) event.getData(0);
Double avgPrice = (Double) event.getData(1);
Long count = (Long) event.getData(2);
System.out.println("Average price for " + symbol + ": " + avgPrice +
" (based on " + count + " events)");
updateDashboard(symbol, avgPrice, count);
}
}
// Handle removed events (for window-based queries)
if (removeEvents != null) {
for (Event event : removeEvents) {
String symbol = (String) event.getData(0);
System.out.println("Removing old average for: " + symbol);
cleanupOldData(symbol);
}
}
}
};
// Register callback with specific query
siddhiAppRuntime.addCallback("avgPriceQuery", avgPriceCallback);Store queries provide on-demand querying capabilities for tables, windows, and aggregations within Siddhi applications.
public class SiddhiAppRuntime {
// Execute store queries
public Event[] query(String storeQuery);
public Event[] query(StoreQuery storeQuery);
// Query metadata
public Attribute[] getStoreQueryOutputAttributes(String storeQuery);
}// Simple store query
String query = "from StockTable select symbol, price";
Event[] results = siddhiAppRuntime.query(query);
for (Event event : results) {
System.out.println("Symbol: " + event.getData(0) + ", Price: " + event.getData(1));
}
// Parameterized store query
String paramQuery = "from StockTable on symbol == 'IBM' select *";
Event[] ibmResults = siddhiAppRuntime.query(paramQuery);
// Complex aggregation query
String aggQuery = "from StockAggregation " +
"within '2023-01-01 00:00:00', '2023-12-31 23:59:59' " +
"per 'day' " +
"select symbol, avg(price) as avgPrice, sum(volume) as totalVolume";
Event[] aggResults = siddhiAppRuntime.query(aggQuery);
// Get query output attributes
Attribute[] attributes = siddhiAppRuntime.getStoreQueryOutputAttributes(query);
for (Attribute attr : attributes) {
System.out.println("Attribute: " + attr.getName() + " Type: " + attr.getType());
}
// Window-based query
String windowQuery = "from StockWindow select symbol, price order by price desc limit 10";
Event[] topStocks = siddhiAppRuntime.query(windowQuery);Parent interface for all event processors in Siddhi execution chain, enabling custom processing logic.
public interface Processor {
// Event Processing
void process(ComplexEventChunk complexEventChunk);
// Processor Chain Management
Processor getNextProcessor();
void setNextProcessor(Processor processor);
void setToLast(Processor processor);
// Processor Lifecycle
Processor cloneProcessor(String key);
void clean();
}StreamCallback conditionalCallback = new StreamCallback() {
@Override
public void receive(Event[] events) {
for (Event event : events) {
// Apply business rules based on event content
String eventType = (String) event.getData(0);
switch (eventType) {
case "ALERT":
handleAlert(event);
break;
case "WARNING":
handleWarning(event);
break;
case "INFO":
logInformation(event);
break;
default:
handleUnknownEvent(event);
}
}
}
private void handleAlert(Event event) {
// Send notifications, update dashboards
Map<String, Object> alertData = toMap(event);
notificationService.sendAlert(alertData);
}
};StreamCallback batchProcessor = new StreamCallback() {
private final List<Event> batch = new ArrayList<>();
private final int batchSize = 100;
@Override
public void receive(Event[] events) {
synchronized (batch) {
batch.addAll(Arrays.asList(events));
if (batch.size() >= batchSize) {
processBatch(new ArrayList<>(batch));
batch.clear();
}
}
}
private void processBatch(List<Event> events) {
// Efficient batch processing
bulkInsertToDatabase(events);
updateStatistics(events.size());
}
};// Callback for coordinating multiple streams
public class MultiStreamCallback extends StreamCallback {
private final Map<String, List<Event>> streamBuffers = new ConcurrentHashMap<>();
@Override
public void receive(Event[] events) {
String streamId = getStreamId();
streamBuffers.computeIfAbsent(streamId, k -> new ArrayList<>())
.addAll(Arrays.asList(events));
// Check if we have data from all required streams
if (hasDataFromAllStreams()) {
correlateAndProcess();
}
}
private void correlateAndProcess() {
// Correlate events from multiple streams
// Apply complex business logic
// Clear buffers after processing
}
}// Immediate query execution with results
public List<StockInfo> getCurrentHighPriceStocks() {
String query = "from StockTable on price > 100 select symbol, price, volume";
Event[] results = siddhiAppRuntime.query(query);
return Arrays.stream(results)
.map(event -> new StockInfo(
(String) event.getData(0),
(Double) event.getData(1),
(Long) event.getData(2)
))
.collect(Collectors.toList());
}// Asynchronous query execution with CompletableFuture
public CompletableFuture<Event[]> queryAsync(String storeQuery) {
return CompletableFuture.supplyAsync(() -> {
try {
return siddhiAppRuntime.query(storeQuery);
} catch (Exception e) {
throw new RuntimeException("Query execution failed", e);
}
});
}
// Usage
queryAsync("from StockAggregation select *")
.thenAccept(results -> processResults(results))
.exceptionally(throwable -> {
logger.error("Query failed", throwable);
return null;
});public interface Query {
// Represents a parsed Siddhi query
}
public interface StoreQuery {
// Represents a parsed store query
}
public interface AbstractDefinition {
// Base interface for stream/table definitions
}
public interface Attribute {
String getName();
Attribute.Type getType();
enum Type {
STRING, INT, LONG, FLOAT, DOUBLE, BOOL, OBJECT
}
}
public interface SiddhiAppContext {
// Application context providing access to runtime resources
}Install with Tessl CLI
npx tessl i tessl/maven-org-wso2-siddhi--siddhi-core