Core interfaces and abstractions for building Apache Pulsar IO connectors
—
Source interfaces define the contract for reading data from external systems and publishing it to Pulsar topics.
`Source<T>` is the basic pull-based source interface: the runtime repeatedly calls `read()` to pull the next record from the external system.
package org.apache.pulsar.io.core;
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Source<T> extends AutoCloseable {
    /**
     * Open connector with configuration.
     *
     * @param config initialization config
     * @param sourceContext environment where the source connector is running
     * @throws Exception IO type exceptions when opening a connector
     */
    void open(Map<String, Object> config, SourceContext sourceContext) throws Exception;

    /**
     * Reads the next message from source.
     * If source does not have any new messages, this call should block.
     *
     * @return next message from source. The return result should never be null
     * @throws Exception if the next record cannot be read from the external system
     */
    Record<T> read() throws Exception;

    /**
     * Close the connector and clean up resources.
     *
     * @throws Exception if releasing resources fails
     */
    void close() throws Exception;
}

/**
 * Example {@link Source} that emits each line of a text file as a record, then keeps polling
 * the file for appended lines ("tail -f" style). It never signals end-of-input.
 *
 * <p>Config: {@code file.path} (required) - path of the file to read, decoded as UTF-8.
 */
public class FileSource implements Source<String> {
    /** Delay between polls once {@link #read()} has reached the current end of the file. */
    private static final long POLL_INTERVAL_MS = 1000;

    private BufferedReader reader;
    private SourceContext context;

    /**
     * Opens the file named by {@code file.path}.
     *
     * @throws IllegalArgumentException if {@code file.path} is missing or empty
     * @throws Exception if the file cannot be opened
     */
    @Override
    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        this.context = sourceContext;
        String filePath = (String) config.get("file.path");
        if (filePath == null || filePath.isEmpty()) {
            throw new IllegalArgumentException("Missing required config 'file.path'");
        }
        // Explicit charset: FileReader(String) would use the platform default encoding.
        this.reader = new BufferedReader(new FileReader(filePath, StandardCharsets.UTF_8));
    }

    /**
     * Returns the next line of the file. At end-of-file it polls every {@code POLL_INTERVAL_MS}
     * until more data is appended, so it never returns null.
     *
     * <p>NOTE(review): if a writer has appended text without its line terminator yet,
     * {@code readLine()} returns that partial text as a line - confirm this is acceptable.
     *
     * @return a record whose value is the next line, without its line terminator
     * @throws Exception if reading fails, or InterruptedException if the thread is interrupted
     *     while waiting (e.g. during shutdown)
     */
    @Override
    public Record<String> read() throws Exception {
        String line;
        // Poll in a loop (not by recursing into read()) so an idle file cannot grow the stack.
        while ((line = reader.readLine()) == null) {
            Thread.sleep(POLL_INTERVAL_MS);
        }
        return new SimpleRecord<>(null, line);
    }

    /** Closes the underlying file reader, if it was opened. */
    @Override
    public void close() throws Exception {
        if (reader != null) {
            reader.close();
        }
    }
}
Interface for batch-based sources that process data in batches with distinct lifecycle phases.
package org.apache.pulsar.io.core;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface BatchSource<T> extends AutoCloseable {
    /**
     * Open and initialize the source with configuration.
     *
     * @param config initialization config
     * @param context environment where the source connector is running
     * @throws Exception IO type exceptions when opening a connector
     */
    void open(Map<String, Object> config, SourceContext context) throws Exception;

    /**
     * Discovery phase: finds the tasks/partitions available to process.
     *
     * @param taskEater consumer that accepts each discovered task identifier; these identifiers
     *     are later passed to {@link #prepare(byte[])}
     * @throws Exception if discovery fails
     */
    void discover(Consumer<byte[]> taskEater) throws Exception;

    /**
     * Prepare to process a specific task identified during discovery.
     *
     * @param task task identifier from discovery phase
     * @throws Exception if the task cannot be prepared
     */
    void prepare(byte[] task) throws Exception;

    /**
     * Read next record from current task.
     *
     * @return next record, or null when the current task is complete
     * @throws Exception if reading fails
     */
    Record<T> readNext() throws Exception;

    /**
     * Close the connector and clean up resources.
     *
     * @throws Exception if releasing resources fails
     */
    void close() throws Exception;
}

/**
 * Example {@link BatchSource} that treats every table reported by a JDBC connection's metadata
 * as one task, and emits each row of that table as a column-name to value map.
 *
 * <p>Config: {@code jdbc.url} (required) - URL passed to {@link DriverManager#getConnection(String)}.
 *
 * <p>NOTE(review): discover() and readNext() share one {@link Connection}; if the runtime calls
 * them from different threads, confirm the JDBC driver tolerates that.
 */
public class DatabaseBatchSource implements BatchSource<Map<String, Object>> {
    private Connection connection;
    private PreparedStatement currentQuery;
    private ResultSet currentResults;
    private SourceContext context;

    /**
     * Opens the JDBC connection.
     *
     * @throws IllegalArgumentException if {@code jdbc.url} is missing or empty
     */
    @Override
    public void open(Map<String, Object> config, SourceContext context) throws Exception {
        this.context = context;
        String jdbcUrl = (String) config.get("jdbc.url");
        if (jdbcUrl == null || jdbcUrl.isEmpty()) {
            throw new IllegalArgumentException("Missing required config 'jdbc.url'");
        }
        this.connection = DriverManager.getConnection(jdbcUrl);
    }

    /** Emits one task per table: the table name encoded as UTF-8. */
    @Override
    public void discover(Consumer<byte[]> taskEater) throws Exception {
        // The metadata ResultSet holds driver resources, so close it once iteration finishes.
        try (ResultSet tables =
                connection.getMetaData().getTables(null, null, "%", new String[] {"TABLE"})) {
            while (tables.next()) {
                String tableName = tables.getString("TABLE_NAME");
                taskEater.accept(tableName.getBytes(StandardCharsets.UTF_8));
            }
        }
    }

    /**
     * Starts a full scan of the table named by {@code task}, first releasing the previous task's
     * statement and result set.
     *
     * <p>NOTE(review): tables are discovered across all schemas but queried unqualified, so a
     * table outside the connection's default schema may not resolve - confirm for your database.
     */
    @Override
    public void prepare(byte[] task) throws Exception {
        closeCurrentTask();
        String tableName = new String(task, StandardCharsets.UTF_8);
        // Identifiers cannot be bound as '?' parameters, so quote the name rather than splice it raw.
        currentQuery = connection.prepareStatement("SELECT * FROM " + quoteIdentifier(tableName));
        currentResults = currentQuery.executeQuery();
    }

    /**
     * Returns the next row of the current table as a column-name to value map.
     *
     * @return the next row, or null when no task is active or the current table is exhausted
     */
    @Override
    public Record<Map<String, Object>> readNext() throws Exception {
        if (currentResults == null) {
            return null; // no task in progress (or the previous one already finished)
        }
        if (!currentResults.next()) {
            closeCurrentTask(); // task complete: free the statement now rather than at next prepare()
            return null;
        }
        ResultSetMetaData metadata = currentResults.getMetaData();
        int columnCount = metadata.getColumnCount();
        Map<String, Object> row = new HashMap<>();
        for (int i = 1; i <= columnCount; i++) {
            row.put(metadata.getColumnName(i), currentResults.getObject(i));
        }
        return new SimpleRecord<>(null, row);
    }

    /** Releases the current task's resources and the connection, even if one close() throws. */
    @Override
    public void close() throws Exception {
        try (Connection toClose = connection) {
            connection = null;
            closeCurrentTask();
        }
    }

    /** Closes and clears the current result set and statement; null resources are skipped. */
    private void closeCurrentTask() throws Exception {
        // try-with-resources closes in reverse order (results, then query) and still closes the
        // statement if closing the result set throws.
        try (PreparedStatement query = currentQuery; ResultSet results = currentResults) {
            currentQuery = null;
            currentResults = null;
        }
    }

    /** Quotes {@code identifier} with the driver's identifier quote string, doubling embedded quotes. */
    private String quoteIdentifier(String identifier) throws Exception {
        String quote = connection.getMetaData().getIdentifierQuoteString();
        if (quote == null || quote.trim().isEmpty()) {
            return identifier; // JDBC reports " " when identifier quoting is unsupported
        }
        return quote + identifier.replace(quote, quote + quote) + quote;
    }
}
Interface for triggering discovery in batch sources, allowing external systems to control when batch processing should begin.
package org.apache.pulsar.io.core;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface BatchSourceTriggerer {
    /**
     * Initialize the triggerer with configuration.
     *
     * @param config initialization config
     * @param sourceContext environment where the source connector is running
     * @throws Exception if initialization fails
     */
    void init(Map<String, Object> config, SourceContext sourceContext) throws Exception;

    /**
     * Start triggering discovery with callback function.
     *
     * @param trigger callback to invoke whenever discovery should run; the String argument
     *     presumably describes the trigger (the example below passes "scheduled-trigger") - confirm
     */
    void start(Consumer<String> trigger);

    /**
     * Stop triggering discovery.
     */
    void stop();
}

/** Example {@link BatchSourceTriggerer} that fires discovery immediately and then once every hour. */
public class ScheduledBatchTriggerer implements BatchSourceTriggerer {
    private ScheduledExecutorService scheduler;
    private SourceContext context;

    @Override
    public void init(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        this.context = sourceContext;
        this.scheduler = Executors.newScheduledThreadPool(1);
    }

    /**
     * Schedules {@code trigger} to run now and then every hour.
     *
     * @throws IllegalStateException if {@link #init} has not been called
     */
    @Override
    public void start(Consumer<String> trigger) {
        if (scheduler == null) {
            throw new IllegalStateException("init() must be called before start()");
        }
        scheduler.scheduleAtFixedRate(() -> fire(trigger), 0, 1, TimeUnit.HOURS);
    }

    /** Invokes {@code trigger} once, logging failures instead of letting them escape. */
    private void fire(Consumer<String> trigger) {
        try {
            trigger.accept("scheduled-trigger");
        } catch (RuntimeException e) {
            // With scheduleAtFixedRate, one thrown exception suppresses every later run, and the
            // exception is only recorded in the unobserved ScheduledFuture; log and keep the schedule.
            if (context != null) {
                context.getLogger().error("Scheduled discovery trigger failed; retrying next period", e);
            }
        }
    }

    /** Shuts the scheduler down; already-scheduled periodic triggers will not run again. */
    @Override
    public void stop() {
        if (scheduler != null) {
            scheduler.shutdown();
        }
    }
}
// Required imports
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.functions.api.Record;
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-pulsar--pulsar-io-core