Core interfaces and abstractions for building Apache Pulsar IO connectors
—
Annotation-based metadata system for connector discovery, configuration, and documentation.
Annotation for documenting connector metadata, providing essential information for Pulsar to discover and manage connectors.
package org.apache.pulsar.io.core.annotations;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@InterfaceAudience.Public
@InterfaceStability.Stable
public @interface Connector {
    /**
     * Name of the connector.
     *
     * @return connector name
     */
    String name();

    /**
     * Type of the connector (SOURCE or SINK).
     *
     * @return connector type
     */
    IOType type();

    /**
     * Description of what the connector does.
     *
     * @return connector help text
     */
    String help();

    /**
     * Configuration class that defines the connector's configuration schema.
     *
     * <p>Declared with the raw {@code Class} type, so any class literal is accepted here.
     *
     * @return configuration class
     */
    Class configClass();
}

@Connector(
    name = "file-source",
    type = IOType.SOURCE,
    help = "Reads data from files and publishes to Pulsar topics",
    configClass = FileSourceConfig.class
)
public class FileSource implements Source<String> {
    /**
     * Initializes the source. Skeleton only; the configuration schema for {@code config} is
     * {@link FileSourceConfig}, as declared by {@code configClass} above.
     */
    @Override
    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        // Implementation
    }

    /**
     * Returns the next record from the file.
     *
     * <p>The {@code Source#read()} contract requires this call to block until data is available
     * and to never return {@code null}. Until it is implemented, the skeleton fails loudly
     * instead of handing {@code null} to the runtime.
     */
    @Override
    public Record<String> read() throws Exception {
        // Implementation
        throw new UnsupportedOperationException("FileSource.read() is not implemented");
    }

    @Override
    public void close() throws Exception {
        // Implementation
    }
}
// Configuration class referenced by @Connector
public class FileSourceConfig {
    // FieldDoc.defaultValue() declares no default, so it must be given even for required
    // fields; "" means "no default value".
    @FieldDoc(
        required = true,
        defaultValue = "",
        help = "Path to the input file"
    )
    private String filePath;

    @FieldDoc(
        required = false,
        defaultValue = "1000",
        help = "Polling interval in milliseconds"
    )
    private int pollingIntervalMs = 1000;

    public String getFilePath() { return filePath; }
    public void setFilePath(String filePath) { this.filePath = filePath; }
    public int getPollingIntervalMs() { return pollingIntervalMs; }
    public void setPollingIntervalMs(int pollingIntervalMs) { this.pollingIntervalMs = pollingIntervalMs; }
}
Annotation for documenting configuration fields, providing metadata about field requirements, defaults, and descriptions.
package org.apache.pulsar.io.core.annotations;
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@InterfaceAudience.Public
@InterfaceStability.Stable
public @interface FieldDoc {
    /**
     * Whether this field is required.
     *
     * @return true if field is required, false otherwise (default: false)
     */
    boolean required() default false;

    /**
     * Default value description for the field.
     *
     * <p>This element declares no default, so every {@code @FieldDoc} usage must set it
     * explicitly (e.g. {@code defaultValue = ""} for a field without a default value).
     *
     * @return default value description
     */
    String defaultValue();

    /**
     * Whether this field contains sensitive data.
     * Sensitive fields may be handled differently for security purposes.
     *
     * @return true if field is sensitive, false otherwise (default: false)
     */
    boolean sensitive() default false;

    /**
     * Help text describing what this field does.
     *
     * @return field description
     */
    String help();
}

public class DatabaseSinkConfig {
    @FieldDoc(
        required = true,
        defaultValue = "",
        help = "JDBC URL for database connection"
    )
    private String jdbcUrl;

    @FieldDoc(
        required = true,
        defaultValue = "",
        sensitive = true,
        help = "Database username"
    )
    private String username;

    @FieldDoc(
        required = true,
        defaultValue = "",
        sensitive = true,
        help = "Database password"
    )
    private String password;

    @FieldDoc(
        required = false,
        defaultValue = "data_table",
        help = "Name of the table to insert data into"
    )
    private String tableName = "data_table";

    @FieldDoc(
        required = false,
        defaultValue = "100",
        help = "Batch size for bulk inserts"
    )
    private int batchSize = 100;

    @FieldDoc(
        required = false,
        defaultValue = "30000",
        help = "Connection timeout in milliseconds"
    )
    private int connectionTimeoutMs = 30000;

    public String getJdbcUrl() { return jdbcUrl; }
    public void setJdbcUrl(String jdbcUrl) { this.jdbcUrl = jdbcUrl; }
    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    public String getPassword() { return password; }
    public void setPassword(String password) { this.password = password; }
    public String getTableName() { return tableName; }
    public void setTableName(String tableName) { this.tableName = tableName; }
    public int getBatchSize() { return batchSize; }
    public void setBatchSize(int batchSize) { this.batchSize = batchSize; }
    public int getConnectionTimeoutMs() { return connectionTimeoutMs; }
    public void setConnectionTimeoutMs(int connectionTimeoutMs) { this.connectionTimeoutMs = connectionTimeoutMs; }
}
Enumeration of connector types used by the @Connector annotation.
package org.apache.pulsar.io.core.annotations;
@InterfaceAudience.Public
@InterfaceStability.Stable
public enum IOType {
    /**
     * Source connector type - reads data from external systems into Pulsar.
     */
    SOURCE,
    /**
     * Sink connector type - writes data from Pulsar to external systems.
     */
    SINK
}

@Connector(
    name = "elasticsearch-sink",
    type = IOType.SINK,
    help = "Writes data from Pulsar topics to Elasticsearch indices",
    configClass = ElasticsearchSinkConfig.class
)
public class ElasticsearchSink implements Sink<Map<String, Object>> {
    private ElasticsearchClient client;
    private ElasticsearchSinkConfig config;

    @Override
    public void open(Map<String, Object> configMap, SinkContext sinkContext) throws Exception {
        // Convert Map to strongly typed config object
        this.config = ConfigurationUtils.create(configMap, ElasticsearchSinkConfig.class,
            sinkContext.getSinkConfig().getConfigs());
        // Initialize Elasticsearch client
        this.client = ElasticsearchClient.builder()
            .hosts(config.getElasticsearchUrl())
            .username(config.getUsername())
            .password(config.getPassword())
            .connectTimeout(config.getConnectionTimeoutMs())
            .build();
    }

    /**
     * Indexes the record's value into the configured index, then acknowledges the record.
     *
     * <p>{@code ack()} is called only after {@code client.index} returns normally. If indexing
     * throws, the exception propagates and the record is not acknowledged.
     */
    @Override
    public void write(Record<Map<String, Object>> record) throws Exception {
        Map<String, Object> document = record.getValue();
        String indexName = config.getIndexName();
        client.index(indexName, document);
        // The sink is responsible for acknowledging; without this the record is never
        // confirmed to Pulsar as processed.
        record.ack();
    }

    /** Closes the client if one was opened; safe to call more than once. */
    @Override
    public void close() throws Exception {
        if (client != null) {
            try {
                client.close();
            } finally {
                client = null;
            }
        }
    }
}
public class ElasticsearchSinkConfig {
    // FieldDoc.defaultValue() declares no default, so it is set explicitly ("" = no default).
    @FieldDoc(
        required = true,
        defaultValue = "",
        help = "Elasticsearch server URL (e.g., http://localhost:9200)"
    )
    private String elasticsearchUrl;

    @FieldDoc(
        required = false,
        defaultValue = "pulsar-data",
        help = "Name of the Elasticsearch index to write to"
    )
    private String indexName = "pulsar-data";

    @FieldDoc(
        required = false,
        defaultValue = "",
        sensitive = true,
        help = "Username for Elasticsearch authentication"
    )
    private String username;

    @FieldDoc(
        required = false,
        defaultValue = "",
        sensitive = true,
        help = "Password for Elasticsearch authentication"
    )
    private String password;

    @FieldDoc(
        required = false,
        defaultValue = "30000",
        help = "Connection timeout in milliseconds"
    )
    private int connectionTimeoutMs = 30000;

    @FieldDoc(
        required = false,
        defaultValue = "100",
        help = "Batch size for bulk operations"
    )
    private int batchSize = 100;

    public String getElasticsearchUrl() { return elasticsearchUrl; }
    public void setElasticsearchUrl(String elasticsearchUrl) { this.elasticsearchUrl = elasticsearchUrl; }
    public String getIndexName() { return indexName; }
    public void setIndexName(String indexName) { this.indexName = indexName; }
    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    public String getPassword() { return password; }
    public void setPassword(String password) { this.password = password; }
    public int getConnectionTimeoutMs() { return connectionTimeoutMs; }
    public void setConnectionTimeoutMs(int connectionTimeoutMs) { this.connectionTimeoutMs = connectionTimeoutMs; }
    public int getBatchSize() { return batchSize; }
    public void setBatchSize(int batchSize) { this.batchSize = batchSize; }
}
// Utility class for discovering connectors using annotations
public class ConnectorDiscovery {
    /**
     * Builds connector metadata from the {@link Connector} annotation on {@code connectorClass}
     * and from the {@link FieldDoc}-annotated fields of its configuration class, including
     * fields inherited from superclasses of that configuration class.
     *
     * @param connectorClass class expected to carry {@code @Connector}
     * @return metadata describing the connector and its documented configuration fields
     * @throws NullPointerException if {@code connectorClass} is null
     * @throws IllegalArgumentException if {@code connectorClass} is not annotated with
     *     {@code @Connector}
     */
    public static ConnectorMetadata getConnectorMetadata(Class<?> connectorClass) {
        if (connectorClass == null) {
            throw new NullPointerException("connectorClass");
        }
        Connector connectorAnnotation = connectorClass.getAnnotation(Connector.class);
        if (connectorAnnotation == null) {
            throw new IllegalArgumentException(
                "Class " + connectorClass.getName() + " is not annotated with @Connector");
        }
        ConnectorMetadata metadata = new ConnectorMetadata();
        metadata.setName(connectorAnnotation.name());
        metadata.setType(connectorAnnotation.type());
        metadata.setHelp(connectorAnnotation.help());
        metadata.setConfigClass(connectorAnnotation.configClass());
        // Discover configuration fields
        addConfigFields(connectorAnnotation.configClass(), metadata);
        return metadata;
    }

    /**
     * Adds every {@code @FieldDoc} field declared by {@code type} or any of its superclasses
     * (superclass fields first) to {@code metadata}.
     */
    private static void addConfigFields(Class<?> type, ConnectorMetadata metadata) {
        if (type == null || type == Object.class) {
            return;
        }
        // getDeclaredFields() excludes inherited fields, so visit the superclass chain first.
        addConfigFields(type.getSuperclass(), metadata);
        // Note: the JDK does not specify the order of getDeclaredFields().
        for (Field field : type.getDeclaredFields()) {
            FieldDoc fieldDoc = field.getAnnotation(FieldDoc.class);
            if (fieldDoc == null) {
                continue;
            }
            ConfigFieldMetadata fieldMetadata = new ConfigFieldMetadata();
            fieldMetadata.setName(field.getName());
            fieldMetadata.setType(field.getType());
            fieldMetadata.setRequired(fieldDoc.required());
            fieldMetadata.setDefaultValue(fieldDoc.defaultValue());
            fieldMetadata.setSensitive(fieldDoc.sensitive());
            fieldMetadata.setHelp(fieldDoc.help());
            metadata.addConfigField(fieldMetadata);
        }
    }
}
// Required imports
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;Install with Tessl CLI
npx tessl i tessl/maven-org-apache-pulsar--pulsar-io-core