or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

aggregations.md · core-management.md · event-handling.md · exceptions.md · extensions.md · index.md · persistence.md · queries-and-callbacks.md · statistics.md

docs/queries-and-callbacks.md

0

# Queries and Callbacks

1

2

Query processing and callback handling provide mechanisms for executing streaming SQL queries and receiving processed results. This includes stream callbacks for output events, query callbacks for query-specific results, and store queries for on-demand data retrieval.

3

4

## Stream Callbacks

5

6

### StreamCallback

7

8

Abstract base class for receiving events from a stream. Extend it and implement `receive(Event[] events)` to handle the output events produced by stream processing.

9

10

```java { .api }

11

public abstract class StreamCallback {

12

// Stream Configuration

13

public String getStreamId();

14

public void setStreamId(String streamId);

15

public AbstractDefinition getStreamDefinition();

16

public void setStreamDefinition(AbstractDefinition streamDefinition);

17

public void setContext(SiddhiAppContext siddhiAppContext);

18

19

// Event Processing (Abstract - Must Implement)

20

public abstract void receive(Event[] events);

21

22

// Lifecycle Management

23

public void startProcessing();

24

public void stopProcessing();

25

26

// Utility Methods

27

public Map<String, Object> toMap(Event event);

28

public Map<String, Object>[] toMap(Event[] events);

29

}

30

```

31

32

### Usage Example

33

34

```java

35

// Create custom stream callback

36

StreamCallback stockCallback = new StreamCallback() {

37

@Override

38

public void receive(Event[] events) {

39

for (Event event : events) {

40

String symbol = (String) event.getData(0);

41

Double price = (Double) event.getData(1);

42

Long volume = (Long) event.getData(2);

43

44

System.out.println("Received: " + symbol + " @ " + price + " vol: " + volume);

45

46

// Convert to map for easier access

47

Map<String, Object> eventMap = toMap(event);

48

processStockData(eventMap);

49

}

50

}

51

52

private void processStockData(Map<String, Object> data) {

53

// Custom processing logic

54

if ((Double) data.get("price") > 150.0) {

55

alertHighPrice((String) data.get("symbol"), (Double) data.get("price"));

56

}

57

}

58

};

59

60

// Register callback with runtime

61

siddhiAppRuntime.addCallback("HighPriceStocks", stockCallback);

62

```

63

64

## Query Callbacks

65

66

### QueryCallback

67

68

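Abstract base class for receiving results from Siddhi queries. Extend it and implement `receive(long timestamp, Event[] inEvents, Event[] removeEvents)` to get both the incoming and the removed events produced by a query; as the usage example below does, check each array for `null` before iterating over it.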

69

70

```java { .api }

71

public abstract class QueryCallback {

72

// Query Configuration

73

public void setQuery(Query query);

74

public void setContext(SiddhiAppContext siddhiAppContext);

75

76

// Event Processing (Abstract - Must Implement)

77

public abstract void receive(long timestamp, Event[] inEvents, Event[] removeEvents);

78

79

// Lifecycle Management

80

public void startProcessing();

81

public void stopProcessing();

82

}

83

```

84

85

### Usage Example

86

87

```java

88

// Create query-specific callback

89

QueryCallback avgPriceCallback = new QueryCallback() {

90

@Override

91

public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) {

92

// Handle incoming events

93

if (inEvents != null) {

94

for (Event event : inEvents) {

95

String symbol = (String) event.getData(0);

96

Double avgPrice = (Double) event.getData(1);

97

Long count = (Long) event.getData(2);

98

99

System.out.println("Average price for " + symbol + ": " + avgPrice +

100

" (based on " + count + " events)");

101

updateDashboard(symbol, avgPrice, count);

102

}

103

}

104

105

// Handle removed events (for window-based queries)

106

if (removeEvents != null) {

107

for (Event event : removeEvents) {

108

String symbol = (String) event.getData(0);

109

System.out.println("Removing old average for: " + symbol);

110

cleanupOldData(symbol);

111

}

112

}

113

}

114

};

115

116

// Register callback with specific query

117

siddhiAppRuntime.addCallback("avgPriceQuery", avgPriceCallback);

118

```

119

120

## Store Queries

121

122

Store queries provide on-demand querying capabilities for tables, windows, and aggregations within Siddhi applications.

123

124

```java { .api }

125

public class SiddhiAppRuntime {

126

// Execute store queries

127

public Event[] query(String storeQuery);

128

public Event[] query(StoreQuery storeQuery);

129

130

// Query metadata

131

public Attribute[] getStoreQueryOutputAttributes(String storeQuery);

132

}

133

```

134

135

### Usage Examples

136

137

```java

138

// Simple store query

139

String query = "from StockTable select symbol, price";

140

Event[] results = siddhiAppRuntime.query(query);

141

142

for (Event event : results) {

143

System.out.println("Symbol: " + event.getData(0) + ", Price: " + event.getData(1));

144

}

145

146

// Filtered store query (the `on` condition restricts results to a single symbol)

147

String paramQuery = "from StockTable on symbol == 'IBM' select *";

148

Event[] ibmResults = siddhiAppRuntime.query(paramQuery);

149

150

// Complex aggregation query

151

String aggQuery = "from StockAggregation " +

152

"within '2023-01-01 00:00:00', '2023-12-31 23:59:59' " +

153

"per 'day' " +

154

"select symbol, avg(price) as avgPrice, sum(volume) as totalVolume";

155

Event[] aggResults = siddhiAppRuntime.query(aggQuery);

156

157

// Get query output attributes

158

Attribute[] attributes = siddhiAppRuntime.getStoreQueryOutputAttributes(query);

159

for (Attribute attr : attributes) {

160

System.out.println("Attribute: " + attr.getName() + " Type: " + attr.getType());

161

}

162

163

// Window-based query

164

String windowQuery = "from StockWindow select symbol, price order by price desc limit 10";

165

Event[] topStocks = siddhiAppRuntime.query(windowQuery);

166

```

167

168

## Processing Chain

169

170

### Processor

171

172

Parent interface for all event processors in the Siddhi execution chain, enabling custom processing logic.

173

174

```java { .api }

175

public interface Processor {

176

// Event Processing

177

void process(ComplexEventChunk complexEventChunk);

178

179

// Processor Chain Management

180

Processor getNextProcessor();

181

void setNextProcessor(Processor processor);

182

void setToLast(Processor processor);

183

184

// Processor Lifecycle

185

Processor cloneProcessor(String key);

186

void clean();

187

}

188

```

189

190

## Advanced Callback Patterns

191

192

### Conditional Processing

193

194

```java

195

StreamCallback conditionalCallback = new StreamCallback() {

196

@Override

197

public void receive(Event[] events) {

198

for (Event event : events) {

199

// Apply business rules based on event content

200

String eventType = (String) event.getData(0);

201

202

switch (eventType) {

203

case "ALERT":

204

handleAlert(event);

205

break;

206

case "WARNING":

207

handleWarning(event);

208

break;

209

case "INFO":

210

logInformation(event);

211

break;

212

default:

213

handleUnknownEvent(event);

214

}

215

}

216

}

217

218

private void handleAlert(Event event) {

219

// Send notifications, update dashboards

220

Map<String, Object> alertData = toMap(event);

221

notificationService.sendAlert(alertData);

222

}

223

};

224

```

225

226

### Batch Processing

227

228

```java

229

StreamCallback batchProcessor = new StreamCallback() {

230

private final List<Event> batch = new ArrayList<>();

231

private final int batchSize = 100;

232

233

@Override

234

public void receive(Event[] events) {

235

synchronized (batch) {

236

batch.addAll(Arrays.asList(events));

237

238

if (batch.size() >= batchSize) {

239

processBatch(new ArrayList<>(batch));

240

batch.clear();

241

}

242

}

243

}

244

245

private void processBatch(List<Event> events) {

246

// Efficient batch processing

247

bulkInsertToDatabase(events);

248

updateStatistics(events.size());

249

}

250

};

251

```

252

253

### Multi-Stream Coordination

254

255

```java

256

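// Callback for coordinating multiple streams (caution: the per-stream ArrayList buffers are not thread-safe; guard them if receive() can be invoked concurrently)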

257

public class MultiStreamCallback extends StreamCallback {

258

private final Map<String, List<Event>> streamBuffers = new ConcurrentHashMap<>();

259

260

@Override

261

public void receive(Event[] events) {

262

String streamId = getStreamId();

263

streamBuffers.computeIfAbsent(streamId, k -> new ArrayList<>())

264

.addAll(Arrays.asList(events));

265

266

// Check if we have data from all required streams

267

if (hasDataFromAllStreams()) {

268

correlateAndProcess();

269

}

270

}

271

272

private void correlateAndProcess() {

273

// Correlate events from multiple streams

274

// Apply complex business logic

275

// Clear buffers after processing

276

}

277

}

278

```

279

280

## Query Execution Patterns

281

282

### Synchronous Query Execution

283

284

```java

285

// Immediate query execution with results

286

public List<StockInfo> getCurrentHighPriceStocks() {

287

String query = "from StockTable on price > 100 select symbol, price, volume";

288

Event[] results = siddhiAppRuntime.query(query);

289

290

return Arrays.stream(results)

291

.map(event -> new StockInfo(

292

(String) event.getData(0),

293

(Double) event.getData(1),

294

(Long) event.getData(2)

295

))

296

.collect(Collectors.toList());

297

}

298

```

299

300

### Asynchronous Query Processing

301

302

```java

303

// Asynchronous query execution with CompletableFuture

304

public CompletableFuture<Event[]> queryAsync(String storeQuery) {

305

return CompletableFuture.supplyAsync(() -> {

306

try {

307

return siddhiAppRuntime.query(storeQuery);

308

} catch (Exception e) {

309

throw new RuntimeException("Query execution failed", e);

310

}

311

});

312

}

313

314

// Usage

315

queryAsync("from StockAggregation select *")

316

.thenAccept(results -> processResults(results))

317

.exceptionally(throwable -> {

318

logger.error("Query failed", throwable);

319

return null;

320

});

321

```

322

323

## Types

324

325

```java { .api }

326

public interface Query {

327

// Represents a parsed Siddhi query

328

}

329

330

public interface StoreQuery {

331

// Represents a parsed store query

332

}

333

334

public interface AbstractDefinition {

335

// Base interface for stream/table definitions

336

}

337

338

public interface Attribute {

339

String getName();

340

Attribute.Type getType();

341

342

enum Type {

343

STRING, INT, LONG, FLOAT, DOUBLE, BOOL, OBJECT

344

}

345

}

346

347

public interface SiddhiAppContext {

348

// Application context providing access to runtime resources

349

}

350

```