# Queries and Callbacks

Query processing and callback handling provide the mechanisms for executing streaming SQL queries and receiving the processed results. This includes stream callbacks for output events, query callbacks for query-specific results, and store queries for on-demand data retrieval.
3
4
## Stream Callbacks
5
6
### StreamCallback
7
8
Base class for receiving events from streams. Users must extend it to handle the output events produced by stream processing operations.
9
10
```java { .api }
11
public abstract class StreamCallback {
12
// Stream Configuration
13
public String getStreamId();
14
public void setStreamId(String streamId);
15
public AbstractDefinition getStreamDefinition();
16
public void setStreamDefinition(AbstractDefinition streamDefinition);
17
public void setContext(SiddhiAppContext siddhiAppContext);
18
19
// Event Processing (Abstract - Must Implement)
20
public abstract void receive(Event[] events);
21
22
// Lifecycle Management
23
public void startProcessing();
24
public void stopProcessing();
25
26
// Utility Methods
27
public Map<String, Object> toMap(Event event);
28
public Map<String, Object>[] toMap(Event[] events);
29
}
30
```
31
32
### Usage Example
33
34
```java
35
// Create custom stream callback
StreamCallback stockCallback = new StreamCallback() {
    @Override
    public void receive(Event[] events) {
        for (Event event : events) {
            String symbol = (String) event.getData(0);
            Double price = (Double) event.getData(1);
            Long volume = (Long) event.getData(2);

            System.out.println("Received: " + symbol + " @ " + price + " vol: " + volume);

            // Convert to map for easier access
            Map<String, Object> eventMap = toMap(event);
            processStockData(eventMap);
        }
    }

    private void processStockData(Map<String, Object> data) {
        // Custom processing logic
        Double price = (Double) data.get("price");
        // A missing or null price must not reach the comparison: unboxing null throws
        // NullPointerException
        if (price != null && price > 150.0) {
            alertHighPrice((String) data.get("symbol"), price);
        }
    }
};

// Register callback with runtime
siddhiAppRuntime.addCallback("HighPriceStocks", stockCallback);
62
```
63
64
## Query Callbacks
65
66
### QueryCallback
67
68
Base class for receiving results from Siddhi queries. Provides access to both incoming and removed events from query processing.
69
70
```java { .api }
71
public abstract class QueryCallback {
72
// Query Configuration
73
public void setQuery(Query query);
74
public void setContext(SiddhiAppContext siddhiAppContext);
75
76
// Event Processing (Abstract - Must Implement)
77
public abstract void receive(long timestamp, Event[] inEvents, Event[] removeEvents);
78
79
// Lifecycle Management
80
public void startProcessing();
81
public void stopProcessing();
82
}
83
```
84
85
### Usage Example
86
87
```java
88
// Create query-specific callback
QueryCallback avgPriceCallback = new QueryCallback() {
    @Override
    public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) {
        // Handle incoming events (either array may be null, so check before iterating)
        if (inEvents != null) {
            for (Event event : inEvents) {
                String symbol = (String) event.getData(0);
                Double avgPrice = (Double) event.getData(1);
                Long count = (Long) event.getData(2);

                System.out.println("Average price for " + symbol + ": " + avgPrice +
                        " (based on " + count + " events)");
                updateDashboard(symbol, avgPrice, count);
            }
        }

        // Handle removed events (for window-based queries)
        if (removeEvents != null) {
            for (Event event : removeEvents) {
                String symbol = (String) event.getData(0);
                System.out.println("Removing old average for: " + symbol);
                cleanupOldData(symbol);
            }
        }
    }
};

// Register callback with specific query
siddhiAppRuntime.addCallback("avgPriceQuery", avgPriceCallback);
118
```
119
120
## Store Queries
121
122
Store queries provide on-demand querying capabilities for tables, windows, and aggregations within Siddhi applications.
123
124
```java { .api }
125
public class SiddhiAppRuntime {
126
// Execute store queries
127
public Event[] query(String storeQuery);
128
public Event[] query(StoreQuery storeQuery);
129
130
// Query metadata
131
public Attribute[] getStoreQueryOutputAttributes(String storeQuery);
132
}
133
```
134
135
### Usage Examples
136
137
```java
138
// Simple store query
139
String query = "from StockTable select symbol, price";
140
Event[] results = siddhiAppRuntime.query(query);
141
142
for (Event event : results) {
143
System.out.println("Symbol: " + event.getData(0) + ", Price: " + event.getData(1));
144
}
145
146
// Parameterized store query
147
String paramQuery = "from StockTable on symbol == 'IBM' select *";
148
Event[] ibmResults = siddhiAppRuntime.query(paramQuery);
149
150
// Complex aggregation query
151
String aggQuery = "from StockAggregation " +
152
"within '2023-01-01 00:00:00', '2023-12-31 23:59:59' " +
153
"per 'day' " +
154
"select symbol, avg(price) as avgPrice, sum(volume) as totalVolume";
155
Event[] aggResults = siddhiAppRuntime.query(aggQuery);
156
157
// Get query output attributes
158
Attribute[] attributes = siddhiAppRuntime.getStoreQueryOutputAttributes(query);
159
for (Attribute attr : attributes) {
160
System.out.println("Attribute: " + attr.getName() + " Type: " + attr.getType());
161
}
162
163
// Window-based query
164
String windowQuery = "from StockWindow select symbol, price order by price desc limit 10";
165
Event[] topStocks = siddhiAppRuntime.query(windowQuery);
166
```
167
168
## Processing Chain
169
170
### Processor
171
172
Parent interface for all event processors in the Siddhi execution chain, enabling custom processing logic.
173
174
```java { .api }
175
public interface Processor {
176
// Event Processing
177
void process(ComplexEventChunk complexEventChunk);
178
179
// Processor Chain Management
180
Processor getNextProcessor();
181
void setNextProcessor(Processor processor);
182
void setToLast(Processor processor);
183
184
// Processor Lifecycle
185
Processor cloneProcessor(String key);
186
void clean();
187
}
188
```
189
190
## Advanced Callback Patterns
191
192
### Conditional Processing
193
194
```java
195
StreamCallback conditionalCallback = new StreamCallback() {
196
@Override
197
public void receive(Event[] events) {
198
for (Event event : events) {
199
// Apply business rules based on event content
200
String eventType = (String) event.getData(0);
201
202
switch (eventType) {
203
case "ALERT":
204
handleAlert(event);
205
break;
206
case "WARNING":
207
handleWarning(event);
208
break;
209
case "INFO":
210
logInformation(event);
211
break;
212
default:
213
handleUnknownEvent(event);
214
}
215
}
216
}
217
218
private void handleAlert(Event event) {
219
// Send notifications, update dashboards
220
Map<String, Object> alertData = toMap(event);
221
notificationService.sendAlert(alertData);
222
}
223
};
224
```
225
226
### Batch Processing
227
228
```java
229
StreamCallback batchProcessor = new StreamCallback() {
230
private final List<Event> batch = new ArrayList<>();
231
private final int batchSize = 100;
232
233
@Override
234
public void receive(Event[] events) {
235
synchronized (batch) {
236
batch.addAll(Arrays.asList(events));
237
238
if (batch.size() >= batchSize) {
239
processBatch(new ArrayList<>(batch));
240
batch.clear();
241
}
242
}
243
}
244
245
private void processBatch(List<Event> events) {
246
// Efficient batch processing
247
bulkInsertToDatabase(events);
248
updateStatistics(events.size());
249
}
250
};
251
```
252
253
### Multi-Stream Coordination
254
255
```java
256
// Callback for coordinating multiple streams
257
public class MultiStreamCallback extends StreamCallback {
258
private final Map<String, List<Event>> streamBuffers = new ConcurrentHashMap<>();
259
260
@Override
261
public void receive(Event[] events) {
262
String streamId = getStreamId();
263
streamBuffers.computeIfAbsent(streamId, k -> new ArrayList<>())
264
.addAll(Arrays.asList(events));
265
266
// Check if we have data from all required streams
267
if (hasDataFromAllStreams()) {
268
correlateAndProcess();
269
}
270
}
271
272
private void correlateAndProcess() {
273
// Correlate events from multiple streams
274
// Apply complex business logic
275
// Clear buffers after processing
276
}
277
}
278
```
279
280
## Query Execution Patterns
281
282
### Synchronous Query Execution
283
284
```java
285
// Immediate query execution with results
286
public List<StockInfo> getCurrentHighPriceStocks() {
287
String query = "from StockTable on price > 100 select symbol, price, volume";
288
Event[] results = siddhiAppRuntime.query(query);
289
290
return Arrays.stream(results)
291
.map(event -> new StockInfo(
292
(String) event.getData(0),
293
(Double) event.getData(1),
294
(Long) event.getData(2)
295
))
296
.collect(Collectors.toList());
297
}
298
```
299
300
### Asynchronous Query Processing
301
302
```java
303
// Asynchronous query execution with CompletableFuture
public CompletableFuture<Event[]> queryAsync(String storeQuery) {
    return CompletableFuture.supplyAsync(() -> {
        try {
            return siddhiAppRuntime.query(storeQuery);
        } catch (Exception e) {
            // Rethrow with context; the original exception is kept as the cause
            throw new RuntimeException("Query execution failed", e);
        }
    });
}

// Usage
queryAsync("from StockAggregation select *")
        .thenAccept(results -> processResults(results))
        .exceptionally(throwable -> {
            logger.error("Query failed", throwable);
            return null;
        });
321
```
322
323
## Types
324
325
```java { .api }
326
public interface Query {
327
// Represents a parsed Siddhi query
328
}
329
330
public interface StoreQuery {
331
// Represents a parsed store query
332
}
333
334
public interface AbstractDefinition {
335
// Base interface for stream/table definitions
336
}
337
338
public interface Attribute {
339
String getName();
340
Attribute.Type getType();
341
342
enum Type {
343
STRING, INT, LONG, FLOAT, DOUBLE, BOOL, OBJECT
344
}
345
}
346
347
public interface SiddhiAppContext {
348
// Application context providing access to runtime resources
349
}
350
```