CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-pulsar--pulsar-io-core

Core interfaces and abstractions for building Apache Pulsar IO connectors

Pending
Overview
Eval results
Files

connector-annotations.mddocs/

Connector Annotations

Annotation-based metadata system for connector discovery, configuration, and documentation.

@Connector

Annotation for documenting connector metadata, providing essential information for Pulsar to discover and manage connectors.

package org.apache.pulsar.io.core.annotations;

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@InterfaceAudience.Public
@InterfaceStability.Stable
public @interface Connector {
    /**
     * Name of the connector.
     *
     * @return connector name
     */
    String name();

    /**
     * Type of the connector (SOURCE or SINK).
     *
     * @return connector type
     */
    IOType type();

    /**
     * Description of what the connector does.
     *
     * @return connector help text
     */
    String help();

    /**
     * Configuration class that defines the connector's configuration schema.
     *
     * @return configuration class
     */
    Class configClass();
}

Usage Example

@Connector(
    name = "file-source",
    type = IOType.SOURCE,
    help = "Reads data from files and publishes to Pulsar topics",
    configClass = FileSourceConfig.class
)
public class FileSource implements Source<String> {
    @Override
    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        // Implementation
    }

    @Override
    public Record<String> read() throws Exception {
        // Implementation
        return null;
    }

    @Override
    public void close() throws Exception {
        // Implementation
    }
}

// Configuration class referenced by @Connector
public class FileSourceConfig {
    @FieldDoc(
        required = true,
        help = "Path to the input file"
    )
    private String filePath;

    @FieldDoc(
        required = false,
        defaultValue = "1000",
        help = "Polling interval in milliseconds"
    )
    private int pollingIntervalMs = 1000;

    // Getters and setters...
}

@FieldDoc

Annotation for documenting configuration fields, providing metadata about field requirements, defaults, and descriptions.

package org.apache.pulsar.io.core.annotations;

@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@InterfaceAudience.Public
@InterfaceStability.Stable
public @interface FieldDoc {
    /**
     * Whether this field is required.
     *
     * @return true if field is required, false otherwise (default: false)
     */
    boolean required() default false;

    /**
     * Default value description for the field.
     *
     * @return default value description
     */
    String defaultValue();

    /**
     * Whether this field contains sensitive data.
     * Sensitive fields may be handled differently for security purposes.
     *
     * @return true if field is sensitive, false otherwise (default: false)
     */
    boolean sensitive() default false;

    /**
     * Help text describing what this field does.
     *
     * @return field description
     */
    String help();
}

Usage Example

public class DatabaseSinkConfig {
    @FieldDoc(
        required = true,
        help = "JDBC URL for database connection"
    )
    private String jdbcUrl;

    @FieldDoc(
        required = true,
        sensitive = true,
        help = "Database username"
    )
    private String username;

    @FieldDoc(
        required = true,
        sensitive = true,
        help = "Database password"
    )
    private String password;

    @FieldDoc(
        required = false,
        defaultValue = "data_table",
        help = "Name of the table to insert data into"
    )
    private String tableName = "data_table";

    @FieldDoc(
        required = false,
        defaultValue = "100",
        help = "Batch size for bulk inserts"
    )
    private int batchSize = 100;

    @FieldDoc(
        required = false,
        defaultValue = "30000",
        help = "Connection timeout in milliseconds"
    )
    private int connectionTimeoutMs = 30000;

    // Getters and setters...
    public String getJdbcUrl() { return jdbcUrl; }
    public void setJdbcUrl(String jdbcUrl) { this.jdbcUrl = jdbcUrl; }
    
    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    
    public String getPassword() { return password; }
    public void setPassword(String password) { this.password = password; }
    
    public String getTableName() { return tableName; }
    public void setTableName(String tableName) { this.tableName = tableName; }
    
    public int getBatchSize() { return batchSize; }
    public void setBatchSize(int batchSize) { this.batchSize = batchSize; }
    
    public int getConnectionTimeoutMs() { return connectionTimeoutMs; }
    public void setConnectionTimeoutMs(int connectionTimeoutMs) { this.connectionTimeoutMs = connectionTimeoutMs; }
}

IOType

Enumeration of connector types used by the @Connector annotation.

package org.apache.pulsar.io.core.annotations;

@InterfaceAudience.Public
@InterfaceStability.Stable
public enum IOType {
    /**
     * Source connector type - reads data from external systems into Pulsar.
     */
    SOURCE,

    /**
     * Sink connector type - writes data from Pulsar to external systems.
     */
    SINK
}

Complete Sink Example with Annotations

@Connector(
    name = "elasticsearch-sink",
    type = IOType.SINK,
    help = "Writes data from Pulsar topics to Elasticsearch indices",
    configClass = ElasticsearchSinkConfig.class
)
public class ElasticsearchSink implements Sink<Map<String, Object>> {
    private ElasticsearchClient client;
    private ElasticsearchSinkConfig config;

    @Override
    public void open(Map<String, Object> configMap, SinkContext sinkContext) throws Exception {
        // Convert Map to strongly typed config object
        this.config = ConfigurationUtils.create(configMap, ElasticsearchSinkConfig.class, 
            sinkContext.getSinkConfig().getConfigs());
        
        // Initialize Elasticsearch client
        this.client = ElasticsearchClient.builder()
            .hosts(config.getElasticsearchUrl())
            .username(config.getUsername())
            .password(config.getPassword())
            .connectTimeout(config.getConnectionTimeoutMs())
            .build();
    }

    @Override
    public void write(Record<Map<String, Object>> record) throws Exception {
        Map<String, Object> document = record.getValue();
        String indexName = config.getIndexName();
        
        client.index(indexName, document);
    }

    @Override
    public void close() throws Exception {
        if (client != null) {
            client.close();
        }
    }
}

public class ElasticsearchSinkConfig {
    @FieldDoc(
        required = true,
        help = "Elasticsearch server URL (e.g., http://localhost:9200)"
    )
    private String elasticsearchUrl;

    @FieldDoc(
        required = false,
        defaultValue = "pulsar-data",
        help = "Name of the Elasticsearch index to write to"
    )
    private String indexName = "pulsar-data";

    @FieldDoc(
        required = false,
        sensitive = true,
        help = "Username for Elasticsearch authentication"
    )
    private String username;

    @FieldDoc(
        required = false,
        sensitive = true,
        help = "Password for Elasticsearch authentication"
    )
    private String password;

    @FieldDoc(
        required = false,
        defaultValue = "30000",
        help = "Connection timeout in milliseconds"
    )
    private int connectionTimeoutMs = 30000;

    @FieldDoc(
        required = false,
        defaultValue = "100",
        help = "Batch size for bulk operations"
    )
    private int batchSize = 100;

    // Getters and setters...
}

Connector Discovery Example

// Utility class for discovering connectors using annotations
public class ConnectorDiscovery {
    public static ConnectorMetadata getConnectorMetadata(Class<?> connectorClass) {
        Connector connectorAnnotation = connectorClass.getAnnotation(Connector.class);
        if (connectorAnnotation == null) {
            throw new IllegalArgumentException("Class is not annotated with @Connector");
        }

        ConnectorMetadata metadata = new ConnectorMetadata();
        metadata.setName(connectorAnnotation.name());
        metadata.setType(connectorAnnotation.type());
        metadata.setHelp(connectorAnnotation.help());
        metadata.setConfigClass(connectorAnnotation.configClass());
        
        // Discover configuration fields
        Field[] fields = connectorAnnotation.configClass().getDeclaredFields();
        for (Field field : fields) {
            FieldDoc fieldDoc = field.getAnnotation(FieldDoc.class);
            if (fieldDoc != null) {
                ConfigFieldMetadata fieldMetadata = new ConfigFieldMetadata();
                fieldMetadata.setName(field.getName());
                fieldMetadata.setType(field.getType());
                fieldMetadata.setRequired(fieldDoc.required());
                fieldMetadata.setDefaultValue(fieldDoc.defaultValue());
                fieldMetadata.setSensitive(fieldDoc.sensitive());
                fieldMetadata.setHelp(fieldDoc.help());
                
                metadata.addConfigField(fieldMetadata);
            }
        }
        
        return metadata;
    }
}

Types

// Required imports
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-pulsar--pulsar-io-core

docs

connector-annotations.md

context-interfaces.md

index.md

push-sources.md

sink-interfaces.md

source-interfaces.md

utility-classes.md

tile.json