Core interfaces and abstractions for building Apache Pulsar IO connectors
—
Sink interfaces define the contract for writing data from Pulsar to external systems.
`Sink` is the basic interface for writing data from Pulsar to an external system.
package org.apache.pulsar.io.core;
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Sink<T> extends AutoCloseable {
/**
 * Opens the connector with the supplied configuration.
 *
 * @param config initialization config
 * @param sinkContext environment where the sink connector is running
 * @throws Exception IO type exceptions when opening a connector
 */
void open(Map<String, Object> config, SinkContext sinkContext) throws Exception;
/**
 * Writes a single message to the sink.
 *
 * @param record message to write to sink
 * @throws Exception if writing the record fails
 */
void write(Record<T> record) throws Exception;
/**
 * Closes the connector and cleans up its resources.
 *
 * @throws Exception if closing the connector fails
 */
void close() throws Exception;
}public class FileSink implements Sink<String> {
private PrintWriter writer;
private SinkContext context;
/**
 * Opens the output file named by the {@code "file.path"} config entry in append mode.
 *
 * @param config connector configuration; must contain a non-blank {@code "file.path"} string
 * @param sinkContext context object, stored for later use
 * @throws IllegalArgumentException if {@code "file.path"} is missing or blank
 * @throws Exception if the file cannot be opened for writing
 */
@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
    String filePath = (String) config.get("file.path");
    // Fail with a clear message instead of an opaque NullPointerException from FileWriter.
    if (filePath == null || filePath.trim().isEmpty()) {
        throw new IllegalArgumentException("Required config 'file.path' is missing or empty");
    }
    this.context = sinkContext;
    this.writer = new PrintWriter(new FileWriter(filePath, true)); // Append mode
}
/**
 * Appends the record's value as one line to the output file and flushes it.
 *
 * @param record record whose value is written
 * @throws java.io.IOException if the underlying writer reported an I/O error
 * @throws Exception for any other failure while writing
 */
@Override
public void write(Record<String> record) throws Exception {
    String value = record.getValue();
    writer.println(value);
    // PrintWriter never throws on I/O failure; checkError() flushes the stream
    // and reports whether any write so far has failed.
    if (writer.checkError()) {
        throw new java.io.IOException("Failed to write record to file");
    }
}
/**
 * Closes the file writer if one was opened; does nothing otherwise.
 *
 * @throws Exception declared by the interface contract
 */
@Override
public void close() throws Exception {
    if (writer == null) {
        return;
    }
    writer.close();
}
}public class DatabaseSink implements Sink<Map<String, Object>> {
private Connection connection;
private PreparedStatement insertStatement;
private SinkContext context;
/**
 * Connects to the database and prepares the insert statement.
 *
 * @param config connector configuration; must contain {@code "jdbc.url"} and
 *        {@code "table.name"} (a plain or dot-qualified SQL identifier)
 * @param sinkContext context object, stored for later use
 * @throws IllegalArgumentException if a required entry is missing or the table name
 *         is not a simple identifier
 * @throws Exception if the connection or statement cannot be created
 */
@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
    String jdbcUrl = (String) config.get("jdbc.url");
    String tableName = (String) config.get("table.name");
    if (jdbcUrl == null || jdbcUrl.trim().isEmpty()) {
        throw new IllegalArgumentException("Required config 'jdbc.url' is missing or empty");
    }
    // The table name is concatenated into SQL (identifiers cannot be bound as
    // parameters), so restrict it to plain, optionally dot-qualified identifiers.
    if (tableName == null
            || !tableName.matches("[A-Za-z_][A-Za-z0-9_$]*(\\.[A-Za-z_][A-Za-z0-9_$]*)*")) {
        throw new IllegalArgumentException(
                "Config 'table.name' must be a valid SQL identifier, got: " + tableName);
    }
    this.context = sinkContext;
    Connection opened = DriverManager.getConnection(jdbcUrl);
    try {
        this.insertStatement = opened.prepareStatement(
                "INSERT INTO " + tableName + " (id, name, value) VALUES (?, ?, ?)");
    } catch (Exception e) {
        // Do not leak the connection when statement preparation fails.
        try {
            opened.close();
        } catch (Exception closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
    this.connection = opened;
}
/**
 * Inserts one row built from the record's {@code id}, {@code name} and {@code value} entries.
 *
 * @param record record whose value map supplies the column values
 * @throws Exception if binding parameters or executing the insert fails
 */
@Override
public void write(Record<Map<String, Object>> record) throws Exception {
    Map<String, Object> row = record.getValue();
    // Column keys in the same order as the placeholders of the prepared INSERT.
    String[] columns = {"id", "name", "value"};
    for (int i = 0; i < columns.length; i++) {
        insertStatement.setObject(i + 1, row.get(columns[i]));
    }
    insertStatement.executeUpdate();
}
/**
 * Closes the prepared statement and then the connection.
 *
 * <p>The connection is closed even if closing the statement throws.
 *
 * @throws Exception if closing either resource fails
 */
@Override
public void close() throws Exception {
    // try-with-resources skips null resources and closes in reverse declaration
    // order (statement first, then connection), closing the connection even when
    // the statement's close() throws.
    try (Connection conn = connection; PreparedStatement stmt = insertStatement) {
        // Nothing to do: the block exists only to close both resources.
    }
}
}public class HttpSink implements Sink<String> {
private HttpClient httpClient;
private String endpoint;
private SinkContext context;
/**
 * Reads the target endpoint from config and builds the HTTP client.
 *
 * @param config connector configuration; must contain a valid URI under {@code "http.endpoint"}
 * @param sinkContext context object, stored for later use
 * @throws IllegalArgumentException if {@code "http.endpoint"} is missing, blank, or not a valid URI
 * @throws Exception declared by the interface contract
 */
@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
    String configuredEndpoint = (String) config.get("http.endpoint");
    if (configuredEndpoint == null || configuredEndpoint.trim().isEmpty()) {
        throw new IllegalArgumentException("Required config 'http.endpoint' is missing or empty");
    }
    // Fail fast on a malformed endpoint rather than on the first write.
    URI.create(configuredEndpoint);
    this.context = sinkContext;
    this.endpoint = configuredEndpoint;
    this.httpClient = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(30))
            .build();
}
/**
 * POSTs the record's value to the configured endpoint as a JSON body.
 *
 * @param record record whose value is sent as the request payload
 * @throws java.io.IOException if the request fails or the response status is not 2xx
 * @throws Exception for any other failure, including interruption while sending
 */
@Override
public void write(Record<String> record) throws Exception {
    String payload = record.getValue();
    HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(endpoint))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(payload))
            .build();
    HttpResponse<String> response = httpClient.send(request,
            HttpResponse.BodyHandlers.ofString());
    int status = response.statusCode();
    // The client is built without a redirect policy, so 3xx responses are returned
    // as-is; treat anything outside 2xx as a failed delivery.
    if (status < 200 || status >= 300) {
        throw new java.io.IOException("HTTP request failed with status: " + status);
    }
}
/**
 * Intentionally a no-op: this sink holds no resources that it closes explicitly.
 *
 * @throws Exception declared by the interface contract; this implementation throws nothing
 */
@Override
public void close() throws Exception {
// HttpClient doesn't need explicit closing in Java 11+
}
}// Required imports
import java.util.Map;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;

Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-pulsar--pulsar-io-core