CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-pulsar--pulsar-io-core

Core interfaces and abstractions for building Apache Pulsar IO connectors

Pending
Overview
Eval results
Files

sink-interfaces.mddocs/

Sink Interfaces

Sink interfaces define the contract for writing data from Pulsar to external systems.

Sink<T>

The basic sink interface for writing data from Pulsar to external systems.

package org.apache.pulsar.io.core;

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Sink<T> extends AutoCloseable {
    /**
     * Open connector with configuration.
     *
     * @param config initialization config
     * @param sinkContext environment where the sink connector is running
     * @throws Exception IO type exceptions when opening a connector
     */
    void open(Map<String, Object> config, SinkContext sinkContext) throws Exception;

    /**
     * Write a message to sink.
     *
     * @param record message to write to sink
     * @throws Exception
     */
    void write(Record<T> record) throws Exception;

    /**
     * Close the connector and clean up resources.
     * @throws Exception
     */
    void close() throws Exception;
}

Usage Example

public class FileSink implements Sink<String> {
    private PrintWriter writer;
    private SinkContext context;

    @Override
    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        this.context = sinkContext;
        String filePath = (String) config.get("file.path");
        this.writer = new PrintWriter(new FileWriter(filePath, true)); // Append mode
    }

    @Override
    public void write(Record<String> record) throws Exception {
        String value = record.getValue();
        writer.println(value);
        writer.flush(); // Ensure data is written immediately
    }

    @Override
    public void close() throws Exception {
        if (writer != null) {
            writer.close();
        }
    }
}

Database Sink Example

public class DatabaseSink implements Sink<Map<String, Object>> {
    private Connection connection;
    private PreparedStatement insertStatement;
    private SinkContext context;

    @Override
    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        this.context = sinkContext;
        String jdbcUrl = (String) config.get("jdbc.url");
        String tableName = (String) config.get("table.name");
        
        this.connection = DriverManager.getConnection(jdbcUrl);
        this.insertStatement = connection.prepareStatement(
            "INSERT INTO " + tableName + " (id, name, value) VALUES (?, ?, ?)"
        );
    }

    @Override
    public void write(Record<Map<String, Object>> record) throws Exception {
        Map<String, Object> data = record.getValue();
        
        insertStatement.setObject(1, data.get("id"));
        insertStatement.setObject(2, data.get("name"));
        insertStatement.setObject(3, data.get("value"));
        
        insertStatement.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        if (insertStatement != null) {
            insertStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}

HTTP Sink Example

public class HttpSink implements Sink<String> {
    private HttpClient httpClient;
    private String endpoint;
    private SinkContext context;

    @Override
    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        this.context = sinkContext;
        this.endpoint = (String) config.get("http.endpoint");
        this.httpClient = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(30))
            .build();
    }

    @Override
    public void write(Record<String> record) throws Exception {
        String payload = record.getValue();
        
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(endpoint))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(payload))
            .build();

        HttpResponse<String> response = httpClient.send(request, 
            HttpResponse.BodyHandlers.ofString());
            
        if (response.statusCode() >= 400) {
            throw new Exception("HTTP request failed with status: " + response.statusCode());
        }
    }

    @Override
    public void close() throws Exception {
        // HttpClient doesn't need explicit closing in Java 11+
    }
}

Types

// Required imports
import java.util.Map;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-pulsar--pulsar-io-core

docs

connector-annotations.md

context-interfaces.md

index.md

push-sources.md

sink-interfaces.md

source-interfaces.md

utility-classes.md

tile.json