or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

aggregations.md core-management.md event-handling.md exceptions.md extensions.md index.md persistence.md queries-and-callbacks.md statistics.md

docs/extensions.md

0

# Extensions

1

2

Extension points provide a pluggable architecture for creating custom sources, sinks, functions, and processors to extend Siddhi capabilities. The extension system enables developers to add domain-specific functionality while maintaining integration with the core Siddhi processing engine.

3

4

## Core Extension Interfaces

5

6

### Source

7

8

Abstract class for creating custom input sources that can feed data into Siddhi streams from external systems.

9

10

```java { .api }

11

public abstract class Source {

12

// Abstract methods that must be implemented

13

public abstract void init(SourceEventListener sourceEventListener, OptionHolder optionHolder,

14

String[] requestedTransportPropertyNames, ConfigReader configReader,

15

SiddhiAppContext siddhiAppContext);

16

public abstract Class[] getOutputEventClasses();

17

public abstract void connect(ConnectionCallback connectionCallback);

18

public abstract void disconnect();

19

public abstract void destroy();

20

public abstract void pause();

21

public abstract void resume();

22

23

// Concrete methods available

24

public void connectWithRetry();

25

public SourceMapper getMapper();

26

public void shutdown();

27

public String getType();

28

public StreamDefinition getStreamDefinition();

29

}

30

```

31

32

### Sink

33

34

Abstract class for creating custom output sinks that can send processed data to external systems.

35

36

```java { .api }

37

public abstract class Sink {

38

// Abstract methods that must be implemented

39

public abstract Class[] getSupportedInputEventClasses();

40

public abstract String[] getSupportedDynamicOptions();

41

public abstract void init(StreamDefinition outputStreamDefinition, OptionHolder optionHolder,

42

ConfigReader sinkConfigReader, SiddhiAppContext siddhiAppContext);

43

public abstract void publish(Object payload, DynamicOptions transportOptions);

44

public abstract void connect();

45

public abstract void disconnect();

46

public abstract void destroy();

47

48

// Concrete methods available

49

public void publish(Object payload);

50

public void connectWithRetry();

51

public void shutdown();

52

public String getType();

53

public SinkMapper getMapper();

54

public SinkHandler getHandler();

55

public StreamDefinition getStreamDefinition();

56

public boolean isConnected();

57

}

58

```

59

60

### FunctionExecutor

61

62

Abstract class for custom functions that can be used in Siddhi queries for data transformation and computation.

63

64

```java { .api }

65

public abstract class FunctionExecutor implements ExpressionExecutor {

66

// Abstract methods that must be implemented

67

public abstract void init(ExpressionExecutor[] attributeExpressionExecutors,

68

ConfigReader configReader, SiddhiAppContext siddhiAppContext);

69

public abstract Object execute(Object[] data);

70

public abstract Object execute(Object data);

71

72

// Concrete methods available from ExpressionExecutor

73

public void initExecutor(ExpressionExecutor[] attributeExpressionExecutors,

74

SiddhiAppContext siddhiAppContext, String queryName,

75

ConfigReader configReader);

76

public Object execute(ComplexEvent event);

77

public ExpressionExecutor cloneExecutor(String key);

78

public String getElementId();

79

public void clean();

80

}

81

```

82

83

### StreamProcessor

84

85

Abstract class for custom stream processors that transform or filter events in the processing pipeline.

86

87

```java { .api }

88

public abstract class StreamProcessor {

89

// Initialization

90

public abstract void init(MetaStreamEvent metaStreamEvent,

91

AbstractDefinition inputDefinition,

92

ExpressionExecutor[] attributeExpressionExecutors,

93

ConfigReader configReader,

94

StreamEventClonerHolder streamEventClonerHolder,

95

boolean outputExpectsExpiredEvents,

96

boolean findToBeExecuted,

97

SiddhiAppContext siddhiAppContext);

98

99

// Processing

100

public abstract void process(ComplexEventChunk<StreamEvent> streamEventChunk,

101

Processor nextProcessor,

102

StreamEventCloner streamEventCloner,

103

ComplexEventPopulater complexEventPopulater);

104

105

// Lifecycle

106

public abstract void start();

107

public abstract void stop();

108

109

// Configuration

110

public abstract List<Attribute> getReturnAttributes();

111

}

112

```

113

114

## Extension Registration

115

116

### SiddhiManager Extension Registration

117

118

```java { .api }

119

public class SiddhiManager {

120

// Extension Management

121

public void setExtension(String name, Class clazz);

122

public Map<String, Class> getExtensions();

123

public void removeExtension(String name);

124

}

125

```

126

127

### Usage Examples

128

129

```java

130

// Register custom extensions

131

SiddhiManager siddhiManager = new SiddhiManager();

132

133

// Register custom function

134

siddhiManager.setExtension("math:factorial", FactorialFunctionExecutor.class);

135

136

// Register custom source

137

siddhiManager.setExtension("source:kafka", KafkaSource.class);

138

139

// Register custom sink

140

siddhiManager.setExtension("sink:elasticsearch", ElasticsearchSink.class);

141

142

// Register custom stream processor

143

siddhiManager.setExtension("ml:predict", MLPredictionProcessor.class);

144

145

// Use extensions in Siddhi app

146

String siddhiApp =

147

"@source(type='kafka', topic='stock-data', bootstrap.servers='localhost:9092', " +

148

" @map(type='json')) " +

149

"define stream StockStream (symbol string, price double, volume long); " +

150

151

"@sink(type='elasticsearch', hostname='localhost', port='9200', " +

152

" index.name='stock-analysis', @map(type='json')) " +

153

"define stream ProcessedStream (symbol string, processedPrice double, prediction string); " +

154

155

"from StockStream " +

156

"select symbol, " +

157

" math:factorial(volume % 10) as processedPrice, " +

158

" ml:predict(price, volume) as prediction " +

159

"insert into ProcessedStream;";

160

```

161

162

## Built-in Extensions

163

164

### InMemorySource

165

166

Built-in in-memory event source for testing and development.

167

168

```java { .api }

169

public class InMemorySource extends Source {

170

// Built-in source for in-memory event generation

171

// Useful for testing and development scenarios

172

}

173

```

174

175

### InMemorySink

176

177

Built-in in-memory event sink for collecting results during testing.

178

179

```java { .api }

180

public class InMemorySink extends Sink {

181

// Built-in sink for in-memory event collection

182

// Useful for testing and result collection

183

}

184

```

185

186

### LogSink

187

188

Built-in logging sink for debugging and monitoring.

189

190

```java { .api }

191

public class LogSink extends Sink {

192

// Built-in sink for logging events

193

// Useful for debugging and monitoring

194

}

195

```

196

197

## Extension Examples

198

199

### Custom Function Example

200

201

```java

202

// Custom mathematical function

203

public class FactorialFunctionExecutor extends FunctionExecutor {

204

205

@Override

206

public void init(ExpressionExecutor[] attributeExpressionExecutors,

207

ConfigReader configReader, SiddhiAppContext siddhiAppContext) {

208

if (attributeExpressionExecutors.length != 1) {

209

throw new SiddhiAppValidationException("Factorial function requires exactly one parameter");

210

}

211

212

if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.INT &&

213

attributeExpressionExecutors[0].getReturnType() != Attribute.Type.LONG) {

214

throw new SiddhiAppValidationException("Factorial function requires integer input");

215

}

216

}

217

218

@Override

219

public Object execute(Object[] data) {

220

if (data[0] == null) {

221

return null;

222

}

223

224

int n = ((Number) data[0]).intValue();

225

if (n < 0) {

226

throw new SiddhiAppRuntimeException("Factorial not defined for negative numbers");

227

}

228

229

long result = 1;

230

for (int i = 2; i <= n; i++) {

231

result *= i;

232

}

233

return result;

234

}

235

236

@Override

237

public Attribute.Type getReturnType() {

238

return Attribute.Type.LONG;

239

}

240

}

241

```

242

243

### Custom Source Example

244

245

```java

246

// Custom HTTP source

247

public class HttpSource extends Source {

248

private SourceEventListener sourceEventListener;

249

private String url;

250

private int pollInterval;

251

private HttpClient httpClient;

252

private ScheduledExecutorService scheduler;

253

254

@Override

255

public void init(SourceEventListener sourceEventListener, OptionHolder optionHolder,

256

String[] requestedTransportPropertyNames, ConfigReader configReader,

257

SiddhiAppContext siddhiAppContext) {

258

this.sourceEventListener = sourceEventListener;

259

this.url = optionHolder.validateAndGetStaticValue("url");

260

this.pollInterval = Integer.parseInt(optionHolder.validateAndGetStaticValue("poll.interval", "5000"));

261

this.httpClient = HttpClient.newHttpClient();

262

this.scheduler = Executors.newScheduledThreadPool(1);

263

}

264

265

@Override

266

public void connect(ConnectionCallback connectionCallback) {

267

scheduler.scheduleAtFixedRate(() -> {

268

try {

269

HttpRequest request = HttpRequest.newBuilder()

270

.uri(URI.create(url))

271

.build();

272

273

HttpResponse<String> response = httpClient.send(request,

274

HttpResponse.BodyHandlers.ofString());

275

276

if (response.statusCode() == 200) {

277

// Parse response and send to Siddhi

278

Object[] eventData = parseResponse(response.body());

279

sourceEventListener.onEvent(eventData, null);

280

}

281

} catch (Exception e) {

282

connectionCallback.onError(e);

283

}

284

}, 0, pollInterval, TimeUnit.MILLISECONDS);

285

286

connectionCallback.onConnect();

287

}

288

289

@Override

290

public void disconnect() {

291

if (scheduler != null) {

292

scheduler.shutdown();

293

}

294

}

295

296

@Override

297

public void destroy() {

298

disconnect();

299

}

300

301

@Override

302

public void pause() {

303

// Implementation for pausing

304

}

305

306

@Override

307

public void resume() {

308

// Implementation for resuming

309

}

310

311

@Override

312

public Class[] getOutputEventClasses() {

313

return new Class[]{Map.class, Object[].class};

314

}

315

316

@Override

317

public String[] getSupportedDynamicOptions() {

318

return new String[]{"url"};

319

}

320

321

private Object[] parseResponse(String responseBody) {

322

// Parse HTTP response into event data

323

// Implementation depends on response format

324

return new Object[]{responseBody, System.currentTimeMillis()};

325

}

326

}

327

```

328

329

### Custom Sink Example

330

331

```java

332

// Custom database sink

333

public class DatabaseSink extends Sink {

334

private DataSource dataSource;

335

private String tableName;

336

private String[] columnNames;

337

338

@Override

339

public void init(StreamDefinition outputStreamDefinition, OptionHolder optionHolder,

340

ConfigReader configReader, SiddhiAppContext siddhiAppContext) {

341

String dataSourceName = optionHolder.validateAndGetStaticValue("datasource");

342

this.dataSource = siddhiAppContext.getSiddhiDataSource(dataSourceName);

343

this.tableName = optionHolder.validateAndGetStaticValue("table.name");

344

345

// Extract column names from stream definition

346

List<Attribute> attributes = outputStreamDefinition.getAttributeList();

347

this.columnNames = attributes.stream()

348

.map(Attribute::getName)

349

.toArray(String[]::new);

350

}

351

352

@Override

353

public void connect() {

354

// Verify database connection

355

try (Connection conn = dataSource.getConnection()) {

356

// Test connection

357

} catch (SQLException e) {

358

throw new ConnectionUnavailableException("Database connection failed", e);

359

}

360

}

361

362

@Override

363

public void publish(Object payload, DynamicOptions dynamicOptions) {

364

Object[] eventData = (Object[]) payload;

365

366

StringBuilder sql = new StringBuilder("INSERT INTO ");

367

sql.append(tableName).append(" (");

368

sql.append(String.join(", ", columnNames));

369

sql.append(") VALUES (");

370

sql.append(String.join(", ", Collections.nCopies(columnNames.length, "?")));

371

sql.append(")");

372

373

try (Connection conn = dataSource.getConnection();

374

PreparedStatement stmt = conn.prepareStatement(sql.toString())) {

375

376

for (int i = 0; i < eventData.length; i++) {

377

stmt.setObject(i + 1, eventData[i]);

378

}

379

380

stmt.executeUpdate();

381

} catch (SQLException e) {

382

throw new ConnectionUnavailableException("Database insert failed", e);

383

}

384

}

385

386

@Override

387

public void disconnect() {

388

// Cleanup resources

389

}

390

391

@Override

392

public void destroy() {

393

disconnect();

394

}

395

396

@Override

397

public Class[] getSupportedInputEventClasses() {

398

return new Class[]{Object[].class, Map.class};

399

}

400

401

@Override

402

public String[] getSupportedDynamicOptions() {

403

return new String[]{"table.name"};

404

}

405

}

406

```

407

408

## Extension Mappers

409

410

### SourceMapper

411

412

Interface for custom source data mapping to convert external data formats to Siddhi events.

413

414

```java { .api }

415

public interface SourceMapper {

416

void init(StreamDefinition streamDefinition, OptionHolder optionHolder,

417

Map<String, TemplateBuilder> payloadTemplateBuilderMap,

418

ConfigReader mapperConfigReader, SiddhiAppContext siddhiAppContext);

419

420

void mapAndSend(Object[] transportProperties, List<AttributeMapping> transportMapping,

421

Object eventObject, SourceEventListener sourceEventListener);

422

423

Class[] getSupportedInputEventClasses();

424

}

425

```

426

427

### SinkMapper

428

429

Interface for custom sink data mapping to convert Siddhi events to external data formats.

430

431

```java { .api }

432

public interface SinkMapper {

433

void init(StreamDefinition streamDefinition, OptionHolder optionHolder,

434

Map<String, TemplateBuilder> payloadTemplateBuilderMap,

435

ConfigReader mapperConfigReader, SiddhiAppContext siddhiAppContext);

436

437

void mapAndSend(Event[] events, OptionHolder optionHolder,

438

Map<String, TemplateBuilder> payloadTemplateBuilderMap,

439

SinkListener sinkListener);

440

441

Class[] getSupportedInputEventClasses();

442

}

443

```

444

445

## Extension Holders

446

447

Extension holders manage different types of extensions and provide common functionality.

448

449

```java { .api }

450

public abstract class AbstractExtensionHolder {

451

// Common functionality for extension management

452

protected Map<String, Class<?>> extensions;

453

454

public void addExtension(String name, Class<?> extensionClass);

455

public Class<?> getExtension(String name);

456

public void removeExtension(String name);

457

}

458

459

public class FunctionExecutorExtensionHolder extends AbstractExtensionHolder {

460

// Manages function executor extensions

461

}

462

463

public class SourceExtensionHolder extends AbstractExtensionHolder {

464

// Manages source extensions

465

}

466

467

public class SinkExtensionHolder extends AbstractExtensionHolder {

468

// Manages sink extensions

469

}

470

```

471

472

## Types

473

474

```java { .api }

475

public interface SourceEventListener {

476

void onEvent(Object eventObject, Object[] transportProperties);

477

void onEvent(Object eventObject, Object[] transportProperties, String[] transportSyncProperties);

478

}

479

480

public interface ConnectionCallback {

481

void onConnect();

482

void onError(Exception e);

483

}

484

485

public interface SinkListener {

486

void publish(Object payload);

487

}

488

489

public interface OptionHolder {

490

String validateAndGetStaticValue(String key);

491

String validateAndGetStaticValue(String key, String defaultValue);

492

String getOrCreateOption(String key, String defaultValue);

493

}

494

495

public interface ConfigReader {

496

String readConfig(String key, String defaultValue);

497

Map<String, String> getAllConfigs();

498

}

499

500

public interface DynamicOptions {

501

String get(String key);

502

}

503

504

public interface State {

505

boolean canDestroy();

506

Map<String, Object> getState();

507

void restoreState(Map<String, Object> state);

508

}

509

510

public interface AttributeMapping {

511

String getName();

512

String getMapping();

513

}

514

515

public interface TemplateBuilder {

516

String build(Event event);

517

}

518

519

public interface ExpressionExecutor {

520

void initExecutor(ExpressionExecutor[] attributeExpressionExecutors,

521

SiddhiAppContext siddhiAppContext, String queryName,

522

ConfigReader configReader);

523

Object execute(ComplexEvent event);

524

ExpressionExecutor cloneExecutor(String key);

525

String getElementId();

526

void clean();

527

Attribute.Type getReturnType();

528

}

529

530

public interface Snapshotable {

531

Map<String, Object> currentState();

532

void restoreState(Map<String, Object> state);

533

}

534

535

public interface SinkHandler {

536

// Handler for sink operations

537

}

538

539

public interface SinkMapper {

540

void init(StreamDefinition streamDefinition, OptionHolder optionHolder,

541

Map<String, TemplateBuilder> payloadTemplateBuilderMap,

542

ConfigReader mapperConfigReader, SiddhiAppContext siddhiAppContext);

543

void mapAndSend(Event[] events, OptionHolder optionHolder,

544

Map<String, TemplateBuilder> payloadTemplateBuilderMap,

545

SinkListener sinkListener);

546

Class[] getSupportedInputEventClasses();

547

}

548

549

public interface SourceMapper {

550

void init(StreamDefinition streamDefinition, OptionHolder optionHolder,

551

Map<String, TemplateBuilder> payloadTemplateBuilderMap,

552

ConfigReader mapperConfigReader, SiddhiAppContext siddhiAppContext);

553

void mapAndSend(Object[] transportProperties, List<AttributeMapping> transportMapping,

554

Object eventObject, SourceEventListener sourceEventListener);

555

Class[] getSupportedInputEventClasses();

556

}

557

```