CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-wso2-siddhi--siddhi-core

Siddhi Core is a high-performing Complex Event Processing engine providing stream processing and complex event processing capabilities through Streaming SQL.

Overview
Eval results
Files

extensions.mddocs/

Extensions

Extension points provide a pluggable architecture for creating custom sources, sinks, functions, and processors to extend Siddhi capabilities. The extension system enables developers to add domain-specific functionality while maintaining integration with the core Siddhi processing engine.

Core Extension Interfaces

Source

Abstract class for creating custom input sources that can feed data into Siddhi streams from external systems.

/**
 * Base class for custom input sources that feed data into Siddhi streams
 * from external systems. A source extension subclasses it and implements
 * the abstract methods; the remaining methods are provided by the base class.
 */
public abstract class Source {
    // Abstract methods that must be implemented
    // init receives the listener that events are handed to, plus the source's options and configuration
    public abstract void init(SourceEventListener sourceEventListener, OptionHolder optionHolder,
                             String[] requestedTransportPropertyNames, ConfigReader configReader,
                             SiddhiAppContext siddhiAppContext);
    public abstract Class[] getOutputEventClasses();
    // Lifecycle hooks: connect/disconnect/destroy and pause/resume
    public abstract void connect(ConnectionCallback connectionCallback);
    public abstract void disconnect();
    public abstract void destroy();
    public abstract void pause();
    public abstract void resume();
    
    // Concrete methods available (implemented by the base class; bodies are not shown in this listing)
    public void connectWithRetry();
    public SourceMapper getMapper();
    public void shutdown();
    public String getType();
    public StreamDefinition getStreamDefinition();
}

Sink

Abstract class for creating custom output sinks that can send processed data to external systems.

/**
 * Base class for custom output sinks that send processed data to external
 * systems. A sink extension subclasses it and implements the abstract
 * methods; the remaining methods are provided by the base class.
 */
public abstract class Sink {
    // Abstract methods that must be implemented
    public abstract Class[] getSupportedInputEventClasses();
    public abstract String[] getSupportedDynamicOptions();
    // init receives the definition of the stream being published, plus the sink's options and configuration
    public abstract void init(StreamDefinition outputStreamDefinition, OptionHolder optionHolder,
                             ConfigReader sinkConfigReader, SiddhiAppContext siddhiAppContext);
    // Two-argument publish is the one extensions implement; the one-argument overload below is concrete
    public abstract void publish(Object payload, DynamicOptions transportOptions);
    public abstract void connect();
    public abstract void disconnect();
    public abstract void destroy();
    
    // Concrete methods available (implemented by the base class; bodies are not shown in this listing)
    public void publish(Object payload);
    public void connectWithRetry();
    public void shutdown();
    public String getType();
    public SinkMapper getMapper();
    public SinkHandler getHandler();
    public StreamDefinition getStreamDefinition();
    public boolean isConnected();
}

FunctionExecutor

Abstract class for custom functions that can be used in Siddhi queries for data transformation and computation.

/**
 * Base class for custom functions usable in Siddhi queries for data
 * transformation and computation. Implements ExpressionExecutor, so a
 * subclass also has to provide that interface's remaining methods
 * (for example getReturnType()).
 */
public abstract class FunctionExecutor implements ExpressionExecutor {
    // Abstract methods that must be implemented
    // init receives the executors of the function's arguments, e.g. to validate their count and types
    public abstract void init(ExpressionExecutor[] attributeExpressionExecutors,
                             ConfigReader configReader, SiddhiAppContext siddhiAppContext);
    // Two execute overloads: one takes an array of argument values, the other a single value
    public abstract Object execute(Object[] data);
    public abstract Object execute(Object data);
    
    // Concrete methods available from ExpressionExecutor
    public void initExecutor(ExpressionExecutor[] attributeExpressionExecutors,
                            SiddhiAppContext siddhiAppContext, String queryName,
                            ConfigReader configReader);
    public Object execute(ComplexEvent event);
    public ExpressionExecutor cloneExecutor(String key);
    public String getElementId();
    public void clean();
}

StreamProcessor

Abstract class for custom stream processing that can transform or filter events in the processing pipeline.

/**
 * Base class for custom stream processing that can transform or filter
 * events in the processing pipeline. All listed methods are abstract and
 * must be implemented by the extension.
 */
public abstract class StreamProcessor {
    // Initialization
    public abstract void init(MetaStreamEvent metaStreamEvent,
                             AbstractDefinition inputDefinition,
                             ExpressionExecutor[] attributeExpressionExecutors,
                             ConfigReader configReader,
                             StreamEventClonerHolder streamEventClonerHolder,
                             boolean outputExpectsExpiredEvents,
                             boolean findToBeExecuted,
                             SiddhiAppContext siddhiAppContext);
    
    // Processing: receives a chunk of stream events together with the next processor in the chain
    public abstract void process(ComplexEventChunk<StreamEvent> streamEventChunk,
                                Processor nextProcessor,
                                StreamEventCloner streamEventCloner,
                                ComplexEventPopulater complexEventPopulater);
    
    // Lifecycle
    public abstract void start();
    public abstract void stop();
    
    // Configuration
    public abstract List<Attribute> getReturnAttributes();
}

Extension Registration

SiddhiManager Extension Registration

/**
 * Extension-related methods of SiddhiManager (partial listing; signatures only).
 */
public class SiddhiManager {
    // Extension Management
    // Registers an extension class under a name such as "math:factorial" (see Usage Examples)
    public void setExtension(String name, Class clazz);
    // Returns the registered extensions keyed by name
    public Map<String, Class> getExtensions();
    public void removeExtension(String name);
}

Usage Examples

// Register custom extensions on a manager before any app that uses them is created
SiddhiManager siddhiManager = new SiddhiManager();

// Register custom function
siddhiManager.setExtension("math:factorial", FactorialFunctionExecutor.class);

// Register custom source
siddhiManager.setExtension("kafka", KafkaSource.class);

// Register custom sink
siddhiManager.setExtension("elasticsearch", ElasticsearchSink.class);

// Register custom stream processor
siddhiManager.setExtension("ml:predict", MLPredictionProcessor.class);

// Use extensions in Siddhi app.
// processedPrice is declared as long because math:factorial returns LONG
// (FactorialFunctionExecutor.getReturnType()); the selected attribute types
// have to line up with the target stream definition.
// NOTE(review): ml:predict is registered above as a stream processor but is
// invoked below like a function inside the select clause — confirm the
// intended usage against the Siddhi query guide.
String siddhiApp = 
    "@source(type='kafka', topic='stock-data', bootstrap.servers='localhost:9092', " +
    "        @map(type='json')) " +
    "define stream StockStream (symbol string, price double, volume long); " +
    
    "@sink(type='elasticsearch', hostname='localhost', port='9200', " +
    "      index.name='stock-analysis', @map(type='json')) " +
    "define stream ProcessedStream (symbol string, processedPrice long, prediction string); " +
    
    "from StockStream " +
    "select symbol, " +
    "       math:factorial(volume % 10) as processedPrice, " +
    "       ml:predict(price, volume) as prediction " +
    "insert into ProcessedStream;";

Built-in Extensions

InMemorySource

Built-in in-memory event source for testing and development.

/**
 * Built-in in-memory event source for testing and development.
 * Source is an abstract class, so it is extended rather than implemented.
 */
public class InMemorySource extends Source {
    // Built-in source for in-memory event generation
    // Useful for testing and development scenarios
    // (method implementations omitted from this listing)
}

InMemorySink

Built-in in-memory event sink for collecting results during testing.

/**
 * Built-in in-memory event sink for collecting results during testing.
 * Sink is an abstract class, so it is extended rather than implemented.
 */
public class InMemorySink extends Sink {
    // Built-in sink for in-memory event collection
    // Useful for testing and result collection
    // (method implementations omitted from this listing)
}

LogSink

Built-in logging sink for debugging and monitoring.

/**
 * Built-in logging sink for debugging and monitoring.
 * Sink is an abstract class, so it is extended rather than implemented.
 */
public class LogSink extends Sink {
    // Built-in sink for logging events
    // Useful for debugging and monitoring
    // (method implementations omitted from this listing)
}

Extension Examples

Custom Function Example

/**
 * Custom mathematical function computing {@code n!} for a single INT or LONG
 * argument. The result type is LONG; a {@code null} argument yields {@code null}.
 */
public class FactorialFunctionExecutor extends FunctionExecutor {

    /** Largest argument whose factorial still fits in a signed 64-bit long (20! < 2^63 <= 21!). */
    private static final int MAX_ARGUMENT = 20;

    /**
     * Validates the call site: exactly one argument, of type INT or LONG.
     *
     * @param attributeExpressionExecutors executors of the function's arguments
     * @param configReader                 extension configuration (unused)
     * @param siddhiAppContext             application context (unused)
     * @throws SiddhiAppValidationException if the argument count or type is wrong
     */
    @Override
    public void init(ExpressionExecutor[] attributeExpressionExecutors,
                     ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        if (attributeExpressionExecutors.length != 1) {
            throw new SiddhiAppValidationException("Factorial function requires exactly one parameter");
        }

        Attribute.Type argumentType = attributeExpressionExecutors[0].getReturnType();
        if (argumentType != Attribute.Type.INT && argumentType != Attribute.Type.LONG) {
            throw new SiddhiAppValidationException("Factorial function requires integer input");
        }
    }

    /**
     * Array form of the call; only the first element is used because
     * {@link #init} guarantees a single argument.
     */
    @Override
    public Object execute(Object[] data) {
        return factorial(data[0]);
    }

    /**
     * Single-value form of the call. FunctionExecutor declares both overloads
     * abstract, so both are implemented and share the same computation.
     */
    @Override
    public Object execute(Object data) {
        return factorial(data);
    }

    /** @return LONG, the type of every non-null result. */
    @Override
    public Attribute.Type getReturnType() {
        return Attribute.Type.LONG;
    }

    /**
     * Computes the factorial of a numeric argument.
     *
     * @param argument an Integer or Long, or {@code null}
     * @return {@code argument!} as a Long, or {@code null} for a {@code null} argument
     * @throws SiddhiAppRuntimeException if the argument is negative or the
     *                                   result would not fit in a long
     */
    private static Long factorial(Object argument) {
        if (argument == null) {
            return null;
        }

        // longValue() instead of intValue(): a LONG argument must not be silently truncated.
        long n = ((Number) argument).longValue();
        if (n < 0) {
            throw new SiddhiAppRuntimeException("Factorial not defined for negative numbers");
        }
        if (n > MAX_ARGUMENT) {
            // Without this guard the multiplication wraps around and returns garbage.
            throw new SiddhiAppRuntimeException("Factorial of " + n
                    + " does not fit in a long (maximum argument is " + MAX_ARGUMENT + ")");
        }

        long result = 1;
        for (long i = 2; i <= n; i++) {
            result *= i;
        }
        return result;
    }
}

Custom Source Example

/**
 * Custom HTTP source that polls a URL at a fixed interval and forwards each
 * successful (HTTP 200) response to Siddhi as an event.
 *
 * <p>Options read in {@link #init}: {@code url} (required) and
 * {@code poll.interval} in milliseconds (default {@code 5000}).
 */
public class HttpSource extends Source {
    private SourceEventListener sourceEventListener;
    private String url;
    private int pollInterval;
    private HttpClient httpClient;
    private ScheduledExecutorService scheduler;
    // Written by pause()/resume(), read by the polling thread, hence volatile.
    private volatile boolean paused;

    /**
     * Stores the event listener and reads the source options.
     * The polling executor is created in {@link #connect}, not here, so the
     * source can be connected again after a disconnect.
     */
    @Override
    public void init(SourceEventListener sourceEventListener, OptionHolder optionHolder,
                     String[] requestedTransportPropertyNames, ConfigReader configReader,
                     SiddhiAppContext siddhiAppContext) {
        this.sourceEventListener = sourceEventListener;
        this.url = optionHolder.validateAndGetStaticValue("url");
        this.pollInterval = Integer.parseInt(optionHolder.validateAndGetStaticValue("poll.interval", "5000"));
        this.httpClient = HttpClient.newHttpClient();
    }

    /** @return the event classes this source hands to the listener: {@code Object[]} only. */
    @Override
    public Class[] getOutputEventClasses() {
        return new Class[]{Object[].class};
    }

    /**
     * Starts polling immediately and then every {@code poll.interval}
     * milliseconds, and signals the callback once polling is scheduled.
     *
     * @param connectionCallback notified of the connection and of polling failures
     */
    @Override
    public void connect(ConnectionCallback connectionCallback) {
        // A ScheduledExecutorService rejects tasks after shutdown(), so each
        // connect() gets a fresh one.
        scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(() -> poll(connectionCallback),
                0, pollInterval, TimeUnit.MILLISECONDS);

        connectionCallback.onConnect();
    }

    /** Stops polling; a poll already in flight is allowed to finish. */
    @Override
    public void disconnect() {
        if (scheduler != null) {
            scheduler.shutdown();
            scheduler = null;
        }
    }

    /** Releases the source by disconnecting it. */
    @Override
    public void destroy() {
        disconnect();
    }

    /** Suspends event delivery; scheduled polls become no-ops until {@link #resume}. */
    @Override
    public void pause() {
        paused = true;
    }

    /** Resumes event delivery after {@link #pause}. */
    @Override
    public void resume() {
        paused = false;
    }

    /**
     * Performs one poll: fetches the URL and, on HTTP 200, forwards the parsed
     * body to Siddhi. Failures are reported through the callback rather than
     * thrown, because an exception escaping a scheduled task cancels all
     * further executions.
     */
    private void poll(ConnectionCallback connectionCallback) {
        if (paused) {
            return;
        }
        try {
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create(url))
                    .build();

            HttpResponse<String> response = httpClient.send(request,
                    HttpResponse.BodyHandlers.ofString());

            if (response.statusCode() == 200) {
                // Parse response and send to Siddhi
                Object[] eventData = parseResponse(response.body());
                sourceEventListener.onEvent(eventData, null);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the executor can observe it.
            Thread.currentThread().interrupt();
            connectionCallback.onError(e);
        } catch (Exception e) {
            connectionCallback.onError(e);
        }
    }

    /**
     * Converts a response body into event data.
     *
     * @return a two-element array: the raw body and the current time in milliseconds
     */
    private Object[] parseResponse(String responseBody) {
        // Parse HTTP response into event data
        // Implementation depends on response format
        return new Object[]{responseBody, System.currentTimeMillis()};
    }
}

Custom Sink Example

/**
 * Custom database sink that inserts each published event as one row.
 *
 * <p>Options read in {@link #init}: {@code datasource} and {@code table.name}.
 * Column names are taken from the attributes of the output stream definition,
 * in declaration order.
 */
public class DatabaseSink extends Sink {
    private DataSource dataSource;
    private String tableName;
    private String[] columnNames;
    // Parameterized INSERT built once in init(); it depends only on static configuration.
    private String insertSql;

    /**
     * Resolves the data source, reads the table name and prepares the INSERT
     * statement text for the stream's attributes.
     */
    @Override
    public void init(StreamDefinition outputStreamDefinition, OptionHolder optionHolder,
                     ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        String dataSourceName = optionHolder.validateAndGetStaticValue("datasource");
        this.dataSource = siddhiAppContext.getSiddhiDataSource(dataSourceName);
        this.tableName = optionHolder.validateAndGetStaticValue("table.name");

        // Extract column names from stream definition
        List<Attribute> attributes = outputStreamDefinition.getAttributeList();
        this.columnNames = attributes.stream()
                .map(Attribute::getName)
                .toArray(String[]::new);

        // Values are always bound through '?' placeholders; only the
        // configured identifiers are concatenated into the statement.
        this.insertSql = "INSERT INTO " + tableName
                + " (" + String.join(", ", columnNames) + ")"
                + " VALUES (" + String.join(", ", Collections.nCopies(columnNames.length, "?")) + ")";
    }

    /**
     * Verifies that a connection can be obtained from the data source.
     *
     * @throws ConnectionUnavailableException if no connection can be obtained
     */
    @Override
    public void connect() {
        // Verify database connection
        try (Connection conn = dataSource.getConnection()) {
            // Test connection
        } catch (SQLException e) {
            throw new ConnectionUnavailableException("Database connection failed", e);
        }
    }

    /**
     * Inserts one event as a row.
     *
     * @param payload          the event data as an {@code Object[]}, one value per column
     * @param transportOptions dynamic options of the event (unused: this sink has none)
     * @throws IllegalArgumentException       if the number of values differs from the number of columns
     * @throws ConnectionUnavailableException if the insert fails with an SQLException
     */
    @Override
    public void publish(Object payload, DynamicOptions transportOptions) {
        Object[] eventData = (Object[]) payload;
        if (eventData.length != columnNames.length) {
            // A malformed event is a data problem, not a connectivity problem.
            throw new IllegalArgumentException("Expected " + columnNames.length
                    + " values for table " + tableName + " but event carries " + eventData.length);
        }

        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(insertSql)) {

            for (int i = 0; i < eventData.length; i++) {
                stmt.setObject(i + 1, eventData[i]);  // JDBC parameters are 1-based
            }

            stmt.executeUpdate();
        } catch (SQLException e) {
            throw new ConnectionUnavailableException("Database insert failed", e);
        }
    }

    /** Nothing to release: connections are opened and closed per publish. */
    @Override
    public void disconnect() {
        // Cleanup resources
    }

    /** Releases the sink by disconnecting it. */
    @Override
    public void destroy() {
        disconnect();
    }

    /** @return {@code Object[]} only, the one payload type {@link #publish} can handle. */
    @Override
    public Class[] getSupportedInputEventClasses() {
        return new Class[]{Object[].class};
    }

    /**
     * @return an empty array: {@code table.name} is read once as a static
     *         value in {@link #init}, so no option varies per event
     */
    @Override
    public String[] getSupportedDynamicOptions() {
        return new String[0];
    }
}

Extension Mappers

SourceMapper

Interface for custom source data mapping to convert external data formats to Siddhi events.

/**
 * Contract for custom source data mapping: converts external data formats
 * into Siddhi events.
 */
public interface SourceMapper {
    // Configures the mapper for the given stream definition and options
    void init(StreamDefinition streamDefinition, OptionHolder optionHolder,
              Map<String, TemplateBuilder> payloadTemplateBuilderMap,
              ConfigReader mapperConfigReader, SiddhiAppContext siddhiAppContext);
    
    // Maps one incoming event object and passes the result to the given listener
    void mapAndSend(Object[] transportProperties, List<AttributeMapping> transportMapping,
                   Object eventObject, SourceEventListener sourceEventListener);
    
    Class[] getSupportedInputEventClasses();
}

SinkMapper

Interface for custom sink data mapping to convert Siddhi events to external data formats.

/**
 * Contract for custom sink data mapping: converts Siddhi events into
 * external data formats.
 */
public interface SinkMapper {
    // Configures the mapper for the given stream definition and options
    void init(StreamDefinition streamDefinition, OptionHolder optionHolder,
              Map<String, TemplateBuilder> payloadTemplateBuilderMap,
              ConfigReader mapperConfigReader, SiddhiAppContext siddhiAppContext);
    
    // Maps a batch of events and passes the result to the given sink listener
    void mapAndSend(Event[] events, OptionHolder optionHolder,
                   Map<String, TemplateBuilder> payloadTemplateBuilderMap,
                   SinkListener sinkListener);
    
    Class[] getSupportedInputEventClasses();
}

Extension Holders

Extension holders manage different types of extensions and provide common functionality.

/**
 * Base class of the extension holders: keeps extension classes keyed by
 * name and offers add/get/remove operations on them.
 */
public abstract class AbstractExtensionHolder {
    // Common functionality for extension management
    // Extension classes keyed by extension name
    protected Map<String, Class<?>> extensions;
    
    public void addExtension(String name, Class<?> extensionClass);
    public Class<?> getExtension(String name);
    public void removeExtension(String name);
}

/** Extension holder for function executor extensions. */
public class FunctionExecutorExtensionHolder extends AbstractExtensionHolder {
    // Manages function executor extensions
}

/** Extension holder for source extensions. */
public class SourceExtensionHolder extends AbstractExtensionHolder {
    // Manages source extensions
}

/** Extension holder for sink extensions. */
public class SinkExtensionHolder extends AbstractExtensionHolder {
    // Manages sink extensions
}

Types

/**
 * Listener a source uses to hand received events to Siddhi; it is passed to
 * Source.init(...) and to SourceMapper.mapAndSend(...).
 */
public interface SourceEventListener {
    // The HttpSource example on this page passes null for transportProperties
    void onEvent(Object eventObject, Object[] transportProperties);
    void onEvent(Object eventObject, Object[] transportProperties, String[] transportSyncProperties);
}

/**
 * Callback passed to Source.connect(...) through which a source reports a
 * successful connection or an error.
 */
public interface ConnectionCallback {
    void onConnect();
    void onError(Exception e);
}

/** Receiver of mapped payloads; passed to SinkMapper.mapAndSend(...). */
public interface SinkListener {
    void publish(Object payload);
}

/**
 * Access to the options of an extension; the examples on this page use it
 * to read values such as "url" or "table.name" during init.
 */
public interface OptionHolder {
    String validateAndGetStaticValue(String key);
    // Overload taking a default value — presumably used when the option is absent; confirm against the Siddhi Javadoc
    String validateAndGetStaticValue(String key, String defaultValue);
    String getOrCreateOption(String key, String defaultValue);
}

/** Read access to extension configuration values; passed to the init methods of extensions. */
public interface ConfigReader {
    String readConfig(String key, String defaultValue);
    Map<String, String> getAllConfigs();
}

/** Key-based lookup of option values; passed to Sink.publish(...) as the transport options. */
public interface DynamicOptions {
    String get(String key);
}

/**
 * State holder that exposes its state as a map and can be restored from one.
 */
public interface State {
    // NOTE(review): the exact meaning of canDestroy() is not described on this page — confirm against the Siddhi Javadoc
    boolean canDestroy();
    Map<String, Object> getState();
    void restoreState(Map<String, Object> state);
}

/** A named mapping entry; SourceMapper.mapAndSend(...) receives a list of these as its transport mapping. */
public interface AttributeMapping {
    String getName();
    String getMapping();
}

/** Builds a String from an event; the mappers receive these keyed by name as payload template builders. */
public interface TemplateBuilder {
    String build(Event event);
}

/**
 * Executor contract implemented by FunctionExecutor: evaluates against a
 * ComplexEvent and reports the attribute type of its result.
 */
public interface ExpressionExecutor {
    void initExecutor(ExpressionExecutor[] attributeExpressionExecutors,
                     SiddhiAppContext siddhiAppContext, String queryName,
                     ConfigReader configReader);
    Object execute(ComplexEvent event);
    ExpressionExecutor cloneExecutor(String key);
    String getElementId();
    void clean();
    // Type of the value produced by execute(); the factorial example returns Attribute.Type.LONG
    Attribute.Type getReturnType();
}

/** Contract for components whose state can be captured as a map and restored from one. */
public interface Snapshotable {
    Map<String, Object> currentState();
    void restoreState(Map<String, Object> state);
}

/** Handler type returned by Sink.getHandler(); no members are listed on this page. */
public interface SinkHandler {
    // Handler for sink operations
}

/**
 * Contract for custom sink data mapping. This repeats the SinkMapper listing
 * shown in the "Extension Mappers" section of this page.
 */
public interface SinkMapper {
    void init(StreamDefinition streamDefinition, OptionHolder optionHolder,
              Map<String, TemplateBuilder> payloadTemplateBuilderMap,
              ConfigReader mapperConfigReader, SiddhiAppContext siddhiAppContext);
    void mapAndSend(Event[] events, OptionHolder optionHolder,
                   Map<String, TemplateBuilder> payloadTemplateBuilderMap,
                   SinkListener sinkListener);
    Class[] getSupportedInputEventClasses();
}

/**
 * Contract for custom source data mapping. This repeats the SourceMapper
 * listing shown in the "Extension Mappers" section of this page.
 */
public interface SourceMapper {
    void init(StreamDefinition streamDefinition, OptionHolder optionHolder,
              Map<String, TemplateBuilder> payloadTemplateBuilderMap,
              ConfigReader mapperConfigReader, SiddhiAppContext siddhiAppContext);
    void mapAndSend(Object[] transportProperties, List<AttributeMapping> transportMapping,
                   Object eventObject, SourceEventListener sourceEventListener);  
    Class[] getSupportedInputEventClasses();
}

Install with Tessl CLI

npx tessl i tessl/maven-org-wso2-siddhi--siddhi-core

docs

aggregations.md

core-management.md

event-handling.md

exceptions.md

extensions.md

index.md

persistence.md

queries-and-callbacks.md

statistics.md

tile.json