CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-pulsar--pulsar-io-core

Core interfaces and abstractions for building Apache Pulsar IO connectors

Pending
Overview
Eval results
Files

docs/source-interfaces.md

Source Interfaces

Source interfaces define the contract for reading data from external systems and publishing to Pulsar topics.

Source<T>

The basic pull-based source interface for reading data from external sources.

package org.apache.pulsar.io.core;

/**
 * Pull-based contract for a connector that reads data from an external system.
 *
 * <p>Implementations are closed through {@link #close()}, which this interface
 * inherits from {@link AutoCloseable} and re-declares.
 *
 * @param <T> type of the value carried by each {@code Record} returned from {@link #read()}
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Source<T> extends AutoCloseable {
    /**
     * Opens the connector with the supplied configuration.
     *
     * @param config initialization config, keyed by setting name
     * @param sourceContext environment where the source connector is running
     * @throws Exception IO type exceptions when opening a connector
     */
    void open(Map<String, Object> config, SourceContext sourceContext) throws Exception;

    /**
     * Reads the next message from the source.
     *
     * <p>If the source does not have any new messages, this call should block
     * rather than return without a record.
     *
     * @return next message from the source; the result should never be {@code null}
     * @throws Exception if the next message cannot be read
     */
    Record<T> read() throws Exception;

    /**
     * Closes the connector and cleans up its resources.
     *
     * @throws Exception if the resources cannot be released
     */
    void close() throws Exception;
}

Usage Example

/**
 * Example {@link Source} that emits each line of a text file as a record and,
 * once the current end of the file is reached, keeps polling for further lines.
 */
public class FileSource implements Source<String> {
    /** Pause between polls after the reader has reached the current end of the file. */
    private static final long POLL_INTERVAL_MS = 1000;

    private BufferedReader reader;
    private SourceContext context;

    /**
     * Opens the file named by the {@code file.path} config entry.
     *
     * @param config initialization config; must map {@code file.path} to a String
     * @param sourceContext environment where the source connector is running
     * @throws IllegalArgumentException if {@code file.path} is missing or not a String
     * @throws Exception if the file cannot be opened
     */
    @Override
    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        this.context = sourceContext;
        Object configuredPath = config.get("file.path");
        // Fail with a descriptive message instead of an opaque NullPointerException
        // or ClassCastException when the setting is absent or has the wrong type.
        if (!(configuredPath instanceof String)) {
            throw new IllegalArgumentException(
                "Config entry 'file.path' must be a String but was: " + configuredPath);
        }
        this.reader = new BufferedReader(new FileReader((String) configuredPath));
    }

    /**
     * Returns the next line of the file, blocking until one is available.
     *
     * @return a record wrapping the next line; never {@code null}
     * @throws Exception if reading fails or the thread is interrupted while waiting
     */
    @Override
    public Record<String> read() throws Exception {
        // Loop rather than recurse: a file that stays idle for a long time would
        // otherwise add one stack frame per poll and eventually overflow the stack.
        while (true) {
            String line = reader.readLine();
            if (line != null) {
                return new SimpleRecord<>(null, line);
            }
            // No data yet; wait before polling again. An InterruptedException
            // propagates to the caller, which lets a shutdown break the wait.
            Thread.sleep(POLL_INTERVAL_MS);
        }
    }

    /**
     * Closes the underlying reader if one was opened.
     *
     * @throws Exception if closing the reader fails
     */
    @Override
    public void close() throws Exception {
        if (reader != null) {
            reader.close();
        }
    }
}

BatchSource<T>

Interface for batch-based sources that process data in batches with distinct lifecycle phases.

package org.apache.pulsar.io.core;

/**
 * Contract for a batch-based source that processes data in batches with distinct
 * lifecycle phases: {@link #open}, {@link #discover}, {@link #prepare},
 * {@link #readNext} and {@link #close}.
 *
 * @param <T> type of the value carried by each {@code Record} returned from {@link #readNext()}
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface BatchSource<T> extends AutoCloseable {
    /**
     * Opens and initializes the source with the supplied configuration.
     *
     * @param config initialization config, keyed by setting name
     * @param context environment where the source connector is running
     * @throws Exception IO type exceptions when opening a connector
     */
    void open(Map<String, Object> config, SourceContext context) throws Exception;

    /**
     * Discovery phase: finds the tasks/partitions that are available to process
     * and reports each one to {@code taskEater}.
     *
     * @param taskEater consumer that accepts discovered task identifiers, each
     *                  encoded as a byte array
     * @throws Exception if discovery fails
     */
    void discover(Consumer<byte[]> taskEater) throws Exception;

    /**
     * Prepares to process one specific task identified during discovery.
     *
     * @param task task identifier from the discovery phase
     * @throws Exception if the task cannot be prepared
     */
    void prepare(byte[] task) throws Exception;

    /**
     * Reads the next record from the current task.
     *
     * @return next record, or {@code null} when the current task is complete
     * @throws Exception if the next record cannot be read
     */
    Record<T> readNext() throws Exception;

    /**
     * Closes the connector and cleans up its resources.
     *
     * @throws Exception if the resources cannot be released
     */
    void close() throws Exception;
}

Usage Example

/**
 * Example {@link BatchSource} that treats every table reported by the JDBC
 * metadata as one task and emits each row of the prepared table as a
 * column-name-to-value map.
 */
public class DatabaseBatchSource implements BatchSource<Map<String, Object>> {
    private Connection connection;
    private PreparedStatement currentQuery;
    private ResultSet currentResults;
    private SourceContext context;

    /**
     * Opens the JDBC connection named by the {@code jdbc.url} config entry.
     *
     * @param config initialization config; {@code jdbc.url} holds the connection URL
     * @param context environment where the source connector is running
     * @throws Exception if the connection cannot be established
     */
    @Override
    public void open(Map<String, Object> config, SourceContext context) throws Exception {
        this.context = context;
        String jdbcUrl = (String) config.get("jdbc.url");
        this.connection = DriverManager.getConnection(jdbcUrl);
    }

    /**
     * Reports the name of every table as a task identifier.
     *
     * @param taskEater consumer that receives each table name as UTF-8 bytes
     * @throws Exception if the metadata cannot be read
     */
    @Override
    public void discover(Consumer<byte[]> taskEater) throws Exception {
        // try-with-resources so the metadata cursor is released even if the
        // consumer or the driver throws part-way through.
        try (ResultSet tables =
                 connection.getMetaData().getTables(null, null, "%", new String[]{"TABLE"})) {
            while (tables.next()) {
                String tableName = tables.getString("TABLE_NAME");
                // Fixed charset: the task id must decode to the same name in
                // prepare() regardless of the platform default encoding.
                taskEater.accept(tableName.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            }
        }
    }

    /**
     * Starts a full scan of the table named by {@code task}, releasing any
     * statement and cursor left over from a previously prepared task.
     *
     * @param task table name encoded as UTF-8 bytes, as produced by {@link #discover}
     * @throws Exception if the query cannot be prepared or executed
     */
    @Override
    public void prepare(byte[] task) throws Exception {
        closeCurrentTask();
        String tableName = new String(task, java.nio.charset.StandardCharsets.UTF_8);
        // Identifiers cannot be bound as '?' parameters, so the name is quoted instead.
        currentQuery = connection.prepareStatement("SELECT * FROM " + quoteIdentifier(tableName));
        currentResults = currentQuery.executeQuery();
    }

    /**
     * Reads the next row of the prepared table.
     *
     * @return a record wrapping the row as a column-name-to-value map, or
     *         {@code null} when the current task has no more rows
     * @throws IllegalStateException if no task has been prepared
     * @throws Exception if the row cannot be read
     */
    @Override
    public Record<Map<String, Object>> readNext() throws Exception {
        if (currentResults == null) {
            throw new IllegalStateException(
                "No task is prepared; call prepare(byte[]) before readNext()");
        }
        if (currentResults.next()) {
            ResultSetMetaData metadata = currentResults.getMetaData();
            int columnCount = metadata.getColumnCount();
            Map<String, Object> row = new HashMap<>();
            // JDBC column indexes are 1-based.
            for (int i = 1; i <= columnCount; i++) {
                row.put(metadata.getColumnName(i), currentResults.getObject(i));
            }
            return new SimpleRecord<>(null, row);
        }
        return null; // Task complete
    }

    /**
     * Closes the current cursor, statement and connection. The connection is
     * closed even if closing the cursor or statement fails.
     *
     * @throws Exception if any resource fails to close
     */
    @Override
    public void close() throws Exception {
        try {
            closeCurrentTask();
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }

    /** Closes and forgets the cursor and statement of the current task, if any. */
    private void closeCurrentTask() throws Exception {
        try {
            if (currentResults != null) {
                currentResults.close();
            }
        } finally {
            currentResults = null;
            try {
                if (currentQuery != null) {
                    currentQuery.close();
                }
            } finally {
                currentQuery = null;
            }
        }
    }

    /**
     * Wraps an identifier in the quote string reported by the driver, doubling
     * any embedded quote string. Returns the identifier unchanged when the
     * driver reports that quoting is unsupported (a blank quote string).
     */
    private String quoteIdentifier(String identifier) throws Exception {
        String quote = connection.getMetaData().getIdentifierQuoteString();
        if (quote == null || quote.trim().isEmpty()) {
            return identifier;
        }
        return quote + identifier.replace(quote, quote + quote) + quote;
    }
}

BatchSourceTriggerer

Interface for triggering discovery in batch sources, allowing external systems to control when batch processing should begin.

package org.apache.pulsar.io.core;

/**
 * Contract for a component that decides when a batch source should run
 * discovery, by invoking a callback handed to {@link #start}.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface BatchSourceTriggerer {
    /**
     * Initializes the triggerer with the supplied configuration.
     *
     * @param config initialization config, keyed by setting name
     * @param sourceContext environment where the source connector is running
     * @throws Exception if initialization fails
     */
    void init(Map<String, Object> config, SourceContext sourceContext) throws Exception;

    /**
     * Starts triggering discovery through the given callback.
     *
     * @param trigger callback function to invoke when discovery should be triggered
     *                (NOTE(review): the meaning of the String passed to the callback
     *                is not specified here; confirm against the caller)
     */
    void start(Consumer<String> trigger);

    /**
     * Stops triggering discovery.
     */
    void stop();
}

Usage Example

/**
 * Example {@link BatchSourceTriggerer} that invokes the discovery callback on a
 * fixed hourly schedule.
 */
public class ScheduledBatchTriggerer implements BatchSourceTriggerer {
    private ScheduledExecutorService scheduler;
    private SourceContext context;

    /**
     * Stores the context and creates the scheduler with a core pool size of one.
     *
     * @param config initialization config (not read by this example)
     * @param sourceContext environment where the source connector is running
     * @throws Exception declared by the interface
     */
    @Override
    public void init(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        context = sourceContext;
        scheduler = Executors.newScheduledThreadPool(1);
    }

    /**
     * Schedules the callback to run immediately and then once every hour.
     *
     * @param trigger callback invoked with the label {@code "scheduled-trigger"}
     */
    @Override
    public void start(Consumer<String> trigger) {
        Runnable fireDiscovery = () -> trigger.accept("scheduled-trigger");
        long initialDelay = 0;
        long period = 1;
        scheduler.scheduleAtFixedRate(fireDiscovery, initialDelay, period, TimeUnit.HOURS);
    }

    /**
     * Requests shutdown of the scheduler if one was created; does not wait for
     * termination.
     */
    @Override
    public void stop() {
        if (scheduler == null) {
            return;
        }
        scheduler.shutdown();
    }
}

Types

// Required imports
import java.util.Map;
import java.util.function.Consumer;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-pulsar--pulsar-io-core

docs

connector-annotations.md

context-interfaces.md

index.md

push-sources.md

sink-interfaces.md

source-interfaces.md

utility-classes.md

tile.json