Core interfaces and abstractions for building Apache Pulsar IO connectors
—
Context interfaces provide connector runtime environment and access to Pulsar platform capabilities.
Context interface providing source runtime environment and capabilities for publishing data to Pulsar topics.
package org.apache.pulsar.io.core;
/**
* Context made available to a source connector.
*
* Extends {@link BaseContext} and adds source-specific accessors (name, output
* topic, configuration) together with factories for typed message builders and
* consumer builders.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface SourceContext extends BaseContext {
/**
* Get the name of the source.
*
* @return source name
*/
String getSourceName();
/**
* Get the output topic name where the source publishes messages.
*
* @return output topic name
*/
String getOutputTopic();
/**
* Get the source configuration.
*
* @return source configuration object
*/
SourceConfig getSourceConfig();
/**
* Create a new output message builder for publishing to a specific topic.
*
* @param <T> value type carried by the message, fixed by {@code schema}
* @param topicName name of the topic to publish to
* @param schema schema for message serialization
* @return typed message builder for constructing messages
* @throws PulsarClientException if unable to create message builder
*/
<T> TypedMessageBuilder<T> newOutputMessage(String topicName, Schema<T> schema) throws PulsarClientException;
/**
* Create a new consumer builder for reading from topics.
* This is useful for sources that need to consume from other Pulsar topics.
*
* @param <T> value type of the consumed messages, fixed by {@code schema}
* @param schema schema for message deserialization
* @return consumer builder for creating consumers
* @throws PulsarClientException if unable to create consumer builder
*/
<T> ConsumerBuilder<T> newConsumerBuilder(Schema<T> schema) throws PulsarClientException;
// ---------------------------------------------------------------------------
// Members inherited from BaseContext, repeated here as a signature reference.
//
// NOTE(review): the entries marked `default` below have no body, which is not
// valid Java as written. Presumably their bodies live in BaseContext and this
// listing only signals "has a default implementation" -- confirm before copying
// these lines into real source.
// ---------------------------------------------------------------------------
// Deployment identity: tenant, namespace, and this instance's id and total count.
String getTenant();
String getNamespace();
int getInstanceId();
int getNumInstances();
// Logger supplied by the context.
Logger getLogger();
// Secret lookup by name.
String getSecret(String secretName);
// Named state stores; the second overload additionally takes tenant and namespace.
default <X extends StateStore> X getStateStore(String name);
default <X extends StateStore> X getStateStore(String tenant, String ns, String name);
// Keyed ByteBuffer state: each operation has a blocking form and a CompletableFuture form.
void putState(String key, ByteBuffer value);
CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
ByteBuffer getState(String key);
CompletableFuture<ByteBuffer> getStateAsync(String key);
void deleteState(String key);
CompletableFuture<Void> deleteStateAsync(String key);
// Keyed long counters, again in blocking and CompletableFuture forms.
void incrCounter(String key, long amount);
CompletableFuture<Void> incrCounterAsync(String key, long amount);
long getCounter(String key);
CompletableFuture<Long> getCounterAsync(String key);
// Record a double-valued sample under a metric name.
void recordMetric(String metricName, double value);
// Access to the Pulsar client and client builder.
default PulsarClient getPulsarClient();
default ClientBuilder getPulsarClientBuilder();
// NOTE(review): presumably reports an unrecoverable error to the runtime; the
// exact semantics are not visible in this listing -- confirm against BaseContext.
void fatal(Throwable t);
}

/**
 * Example source that reads rows from a JDBC database and publishes them
 * through the {@link SourceContext}.
 *
 * <p>{@code readFromDatabase()} and {@code SimpleRecord} are not defined in this
 * snippet; they are expected to be supplied by the surrounding project.
 */
public class DatabaseSource implements Source<Map<String, Object>> {

    private SourceContext context;
    private Connection connection;

    /**
     * Stores the context and opens the JDBC connection.
     *
     * @param config connector configuration; must contain a non-empty string under {@code "jdbc.url"}
     * @param sourceContext context supplied by the runtime
     * @throws IllegalArgumentException if {@code "jdbc.url"} is missing, empty, or not a string
     * @throws Exception if the JDBC connection cannot be established
     */
    @Override
    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        this.context = sourceContext;

        // Access source configuration
        String sourceName = context.getSourceName();
        String outputTopic = context.getOutputTopic();
        context.getLogger().info("Opening source {} (output topic {})", sourceName, outputTopic);

        // Get source-specific config (shown for illustration; not needed by this minimal example)
        SourceConfig sourceConfig = context.getSourceConfig();

        // Fail with a clear message instead of an opaque driver error or ClassCastException
        Object jdbcUrl = config.get("jdbc.url");
        if (!(jdbcUrl instanceof String) || ((String) jdbcUrl).isEmpty()) {
            throw new IllegalArgumentException("Missing or invalid required config 'jdbc.url'");
        }

        // Initialize database connection
        this.connection = DriverManager.getConnection((String) jdbcUrl);
    }

    /**
     * Reads one row, publishes it with an explicit message builder, and returns it as a record.
     *
     * <p>NOTE(review): the runtime normally also publishes the record returned from
     * {@code read()}; confirm that the explicit {@code send()} here does not result
     * in the same payload being published twice.
     *
     * @return record keyed by the id of the message that was sent
     * @throws Exception if reading from the database or sending the message fails
     */
    @Override
    public Record<Map<String, Object>> read() throws Exception {
        // Read data from database
        Map<String, Object> data = readFromDatabase();

        // Create output message with specific schema
        TypedMessageBuilder<Map<String, Object>> messageBuilder =
                context.newOutputMessage(context.getOutputTopic(), mapSchema());
        messageBuilder.value(data);
        messageBuilder.property("source", context.getSourceName());

        // Send message and return record
        MessageId messageId = messageBuilder.send();
        return new SimpleRecord<>(messageId.toString(), data);
    }

    /**
     * Releases the JDBC connection opened in {@link #open}. Safe to call when
     * {@code open} never ran or failed, and safe to call more than once.
     *
     * @throws Exception if closing the connection fails
     */
    @Override
    public void close() throws Exception {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } finally {
            connection = null;
        }
    }

    /**
     * Builds the JSON schema for {@code Map<String, Object>} values.
     *
     * <p>{@code Schema.JSON(Map.class)} is typed as {@code Schema<Map>} because a
     * class literal cannot carry type arguments, so the result is narrowed here.
     * The cast is unchecked but only affects compile-time typing.
     */
    @SuppressWarnings("unchecked")
    private static Schema<Map<String, Object>> mapSchema() {
        return (Schema<Map<String, Object>>) (Schema<?>) Schema.JSON(Map.class);
    }
}

Context interface providing sink runtime environment and capabilities for consuming data from Pulsar topics.
package org.apache.pulsar.io.core;
/**
* Context made available to a sink connector.
*
* Extends {@link BaseContext} and adds sink-specific accessors (name, input
* topics, configuration) plus optional subscription controls. The optional
* controls ({@code getSubscriptionType}, {@code seek}, {@code pause},
* {@code resume}) have default bodies that throw
* {@link UnsupportedOperationException}.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface SinkContext extends BaseContext {
/**
* Get the name of the sink.
*
* @return sink name
*/
String getSinkName();
/**
* Get the input topics that the sink consumes from.
*
* @return collection of input topic names
*/
Collection<String> getInputTopics();
/**
* Get the sink configuration.
*
* @return sink configuration object
*/
SinkConfig getSinkConfig();
/**
* Get the subscription type used by the sink.
* Default implementation throws UnsupportedOperationException.
*
* @return subscription type
* @throws UnsupportedOperationException if not supported
*/
default SubscriptionType getSubscriptionType() {
throw new UnsupportedOperationException("getSubscriptionType not implemented");
}
/**
* Reset subscription position to a specific message ID.
* Default implementation throws UnsupportedOperationException.
*
* @param topic topic name
* @param partition partition number
* @param messageId message ID to seek to
* @throws PulsarClientException if seek operation fails
* @throws UnsupportedOperationException if not supported
*/
default void seek(String topic, int partition, MessageId messageId) throws PulsarClientException {
throw new UnsupportedOperationException("seek not implemented");
}
/**
* Pause message consumption from a specific topic partition.
* Default implementation throws UnsupportedOperationException.
*
* @param topic topic name
* @param partition partition number
* @throws PulsarClientException if pause operation fails
* @throws UnsupportedOperationException if not supported
*/
default void pause(String topic, int partition) throws PulsarClientException {
throw new UnsupportedOperationException("pause not implemented");
}
/**
* Resume message consumption from a specific topic partition.
* Default implementation throws UnsupportedOperationException.
*
* @param topic topic name
* @param partition partition number
* @throws PulsarClientException if resume operation fails
* @throws UnsupportedOperationException if not supported
*/
default void resume(String topic, int partition) throws PulsarClientException {
throw new UnsupportedOperationException("resume not implemented");
}
// ---------------------------------------------------------------------------
// Members inherited from BaseContext, repeated here as a signature reference.
//
// NOTE(review): the entries marked `default` below have no body, which is not
// valid Java as written. Presumably their bodies live in BaseContext and this
// listing only signals "has a default implementation" -- confirm before copying
// these lines into real source.
// ---------------------------------------------------------------------------
// Deployment identity: tenant, namespace, and this instance's id and total count.
String getTenant();
String getNamespace();
int getInstanceId();
int getNumInstances();
// Logger supplied by the context.
Logger getLogger();
// Secret lookup by name.
String getSecret(String secretName);
// Named state stores; the second overload additionally takes tenant and namespace.
default <X extends StateStore> X getStateStore(String name);
default <X extends StateStore> X getStateStore(String tenant, String ns, String name);
// Keyed ByteBuffer state: each operation has a blocking form and a CompletableFuture form.
void putState(String key, ByteBuffer value);
CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
ByteBuffer getState(String key);
CompletableFuture<ByteBuffer> getStateAsync(String key);
void deleteState(String key);
CompletableFuture<Void> deleteStateAsync(String key);
// Keyed long counters, again in blocking and CompletableFuture forms.
void incrCounter(String key, long amount);
CompletableFuture<Void> incrCounterAsync(String key, long amount);
long getCounter(String key);
CompletableFuture<Long> getCounterAsync(String key);
// Record a double-valued sample under a metric name.
void recordMetric(String metricName, double value);
// Access to the Pulsar client and client builder.
default PulsarClient getPulsarClient();
default ClientBuilder getPulsarClientBuilder();
// NOTE(review): presumably reports an unrecoverable error to the runtime; the
// exact semantics are not visible in this listing -- confirm against BaseContext.
void fatal(Throwable t);
}

/**
 * Example sink that indexes each record into Elasticsearch and demonstrates the
 * optional {@link SinkContext} controls ({@code getSubscriptionType},
 * {@code pause}, {@code seek}).
 *
 * <p>{@code ElasticsearchClient}, {@code determineIndex} and
 * {@code shouldPauseProcessing} are not defined in this snippet; they are
 * expected to be supplied by the surrounding project.
 */
public class ElasticsearchSink implements Sink<Map<String, Object>> {

    private SinkContext context;
    private ElasticsearchClient client;

    /**
     * Stores the context, logs the sink's identity and creates the Elasticsearch client.
     *
     * @param config connector configuration; must contain a non-empty string under {@code "elasticsearch.url"}
     * @param sinkContext context supplied by the runtime
     * @throws IllegalArgumentException if {@code "elasticsearch.url"} is missing, empty, or not a string
     * @throws Exception if the client cannot be created
     */
    @Override
    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        this.context = sinkContext;

        // Access sink configuration
        String sinkName = context.getSinkName();
        Collection<String> inputTopics = context.getInputTopics();
        // Shown for illustration; not needed by this minimal example
        SinkConfig sinkConfig = context.getSinkConfig();
        context.getLogger().info("Opening sink {} (input topics {})", sinkName, inputTopics);

        // Log subscription type if available; the default implementation throws
        try {
            SubscriptionType subType = context.getSubscriptionType();
            context.getLogger().info("Using subscription type: {}", subType);
        } catch (UnsupportedOperationException e) {
            context.getLogger().info("Subscription type not available");
        }

        // Fail with a clear message instead of a ClassCastException or a null URL
        Object esUrl = config.get("elasticsearch.url");
        if (!(esUrl instanceof String) || ((String) esUrl).isEmpty()) {
            throw new IllegalArgumentException("Missing or invalid required config 'elasticsearch.url'");
        }

        // Initialize Elasticsearch client
        this.client = new ElasticsearchClient((String) esUrl);
    }

    /**
     * Indexes the record's value, acknowledges the record, and optionally pauses
     * consumption of the partition the record came from.
     *
     * @param record record to index
     * @throws Exception if indexing or pausing fails; the record is not acknowledged
     *         when indexing fails
     */
    @Override
    public void write(Record<Map<String, Object>> record) throws Exception {
        Map<String, Object> document = record.getValue();
        String indexName = determineIndex(record);

        // Index document in Elasticsearch
        client.index(indexName, document);

        // Acknowledge only after the document was indexed successfully
        record.ack();

        // Optionally pause based on processing results
        if (shouldPauseProcessing(document)) {
            String topic = record.getTopicName().orElse(null);
            if (topic == null) {
                // Pausing a made-up topic name would be meaningless, so skip instead
                context.getLogger().warn("Cannot pause: record carries no topic name");
            } else {
                // Falls back to partition 0 when the record has no partition index
                context.pause(topic, record.getPartitionIndex().orElse(0));
            }
        }
    }

    /**
     * Best-effort rewind to the failed record's message so that it is redelivered.
     * Does nothing (beyond logging) when the record has no topic name or no
     * underlying message, and never throws.
     *
     * @param record record whose processing failed
     * @param error the processing failure, logged for context
     */
    private void handleProcessingError(Record<Map<String, Object>> record, Exception error) {
        String topic = record.getTopicName().orElse(null);
        // The message id comes from the underlying message; the record key is
        // application data and is not a serialized MessageId.
        MessageId messageId = record.getMessage().map(message -> message.getMessageId()).orElse(null);
        if (topic == null || messageId == null) {
            context.getLogger().warn("Cannot seek: record has no topic name or message id", error);
            return;
        }

        // Example: seek back to retry failed message
        try {
            context.seek(topic, record.getPartitionIndex().orElse(0), messageId);
        } catch (Exception e) {
            // Deliberately best-effort: a failed seek must not mask the original error
            context.getLogger().error("Failed to seek: " + e.getMessage(), e);
        }
    }
}

/**
 * Example sink that throttles writes with a rate limiter and uses
 * {@link SinkContext#pause} / {@link SinkContext#resume} to stop consumption
 * while it waits for a permit.
 *
 * <p>{@code processRecord} is not defined in this snippet; it is expected to be
 * supplied by the surrounding project.
 */
public class RateLimitedSink implements Sink<String> {

    private SinkContext context;
    private RateLimiter rateLimiter;
    // Topics that were successfully paused and still need a matching resume
    private final Map<String, Boolean> topicPausedState = new ConcurrentHashMap<>();

    /**
     * Stores the context and creates the rate limiter from {@code "max.rate.per.second"}.
     *
     * @param config connector configuration; {@code "max.rate.per.second"} must be a
     *        positive number, given either as a numeric value or a numeric string
     * @param sinkContext context supplied by the runtime
     * @throws IllegalArgumentException if the rate is missing, not numeric, or not positive
     */
    @Override
    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        this.context = sinkContext;

        // Accept any numeric representation: a plain cast to Double fails for
        // Integer/Long/String values and for a missing key.
        Object configured = config.get("max.rate.per.second");
        double maxRate;
        if (configured instanceof Number) {
            maxRate = ((Number) configured).doubleValue();
        } else if (configured instanceof String) {
            maxRate = Double.parseDouble(((String) configured).trim());
        } else {
            throw new IllegalArgumentException(
                    "Missing or non-numeric required config 'max.rate.per.second': " + configured);
        }
        // Written as a negated comparison so that NaN is rejected as well
        if (!(maxRate > 0.0)) {
            throw new IllegalArgumentException("'max.rate.per.second' must be positive but was " + maxRate);
        }

        this.rateLimiter = RateLimiter.create(maxRate);
    }

    /**
     * Processes the record once a permit is available. If no permit arrives within
     * one second, all input topics are paused until a permit has been acquired.
     *
     * @param record record to process
     * @throws Exception if processing the record fails
     */
    @Override
    public void write(Record<String> record) throws Exception {
        // Acquire rate limit permit
        if (!rateLimiter.tryAcquire(1, TimeUnit.SECONDS)) {
            // Rate limit exceeded, pause all input topics
            pauseAllTopics();
            try {
                // Wait for permit
                rateLimiter.acquire();
            } finally {
                // Always resume, so topics are not left paused if the wait exits abnormally
                resumeAllTopics();
            }
        }

        // Process the record
        processRecord(record);
    }

    /**
     * Best-effort pause of every input topic; a topic is remembered as paused only
     * when the pause call succeeded.
     *
     * <p>NOTE(review): only partition 0 is paused because the context exposes topic
     * names but no partition list -- confirm this is sufficient for partitioned topics.
     */
    private void pauseAllTopics() {
        for (String topic : context.getInputTopics()) {
            try {
                context.pause(topic, 0);
                topicPausedState.put(topic, true);
            } catch (Exception e) {
                context.getLogger().warn("Failed to pause topic " + topic + ": " + e.getMessage(), e);
            }
        }
    }

    /**
     * Best-effort resume of every topic recorded as paused. A topic whose resume
     * fails stays recorded so that a later call retries it.
     */
    private void resumeAllTopics() {
        // Removing during iteration is safe here: the map is a ConcurrentHashMap
        for (String topic : topicPausedState.keySet()) {
            try {
                context.resume(topic, 0);
                topicPausedState.remove(topic);
            } catch (Exception e) {
                context.getLogger().warn("Failed to resume topic " + topic + ": " + e.getMessage(), e);
            }
        }
    }
}

// Required imports
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.functions.api.BaseContext;
import org.apache.pulsar.functions.api.StateStore;
import org.apache.pulsar.io.core.SinkConfig;
import org.apache.pulsar.io.core.SourceConfig;
import org.slf4j.Logger;

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-pulsar--pulsar-io-core