Core interfaces and abstractions for building Apache Pulsar IO connectors
npx @tessl/cli install tessl/maven-org-apache-pulsar--pulsar-io-core@4.0.0

Apache Pulsar IO Core provides the foundational interfaces and abstractions for building Pulsar IO connectors that enable data integration between Pulsar and external systems. It includes core interfaces for data ingestion (Sources) and data egress (Sinks), batch processing capabilities, and metadata annotations for connector discovery and configuration.
<dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-io-core</artifactId><version>4.0.6</version></dependency>import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.BatchSource;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.KeyValue;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.FieldDoc;
import org.apache.pulsar.io.core.annotations.IOType;import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.functions.api.Record;
import java.util.Map;
/**
 * Example pull-based source that produces {@code String} records.
 *
 * <p>Every method below is a placeholder to be filled in with connector-specific logic.
 */
public class MySource implements Source<String> {

    /**
     * Sets up the connector.
     *
     * @param config connector configuration
     * @param sourceContext runtime context supplied to the source
     */
    @Override
    public void open(final Map<String, Object> config, final SourceContext sourceContext) throws Exception {
        // TODO: initialize the connector from config.
    }

    /**
     * Fetches the next record from the external system.
     *
     * <p>Implementations should block while no data is available instead of returning early.
     *
     * @return the next record; this placeholder returns {@code null}
     */
    @Override
    public Record<String> read() throws Exception {
        // TODO: build and return a real Record<String> instead of this placeholder.
        return null;
    }

    /** Releases any resources held by the connector. */
    @Override
    public void close() throws Exception {
        // TODO: clean up resources.
    }
}

import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.functions.api.Record;
import java.util.Map;
/**
 * Example sink that writes {@code String} records to an external system.
 *
 * <p>Every method below is a placeholder to be filled in with connector-specific logic.
 */
public class MySink implements Sink<String> {

    /**
     * Sets up the connector.
     *
     * @param config connector configuration
     * @param sinkContext runtime context supplied to the sink
     */
    @Override
    public void open(final Map<String, Object> config, final SinkContext sinkContext) throws Exception {
        // TODO: initialize the connector from config.
    }

    /**
     * Delivers one record to the external system.
     *
     * @param record the record to write
     */
    @Override
    public void write(final Record<String> record) throws Exception {
        // TODO: push the record to the external system.
    }

    /** Releases any resources held by the connector. */
    @Override
    public void close() throws Exception {
        // TODO: clean up resources.
    }
}

Pulsar IO Core follows a clean separation of concerns:
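- Core interfaces (`Source`, `Sink`, `BatchSource`) define the contract for data movement.
- Runtime contexts (`SourceContext`, `SinkContext`) provide access to Pulsar capabilities.
- Both pull-based (`Source`) and push-based (`PushSource`) patterns are supported.
- The connector interfaces extend `AutoCloseable`, so resources are released through `close()`.

Primary interfaces for reading data from external systems and publishing to Pulsar topics.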
/**
 * Basic pull-based source: the connector is opened once, then asked repeatedly for records.
 *
 * @param <T> type of the record payload
 */
public interface Source<T> extends AutoCloseable {

    /**
     * Prepares the source for reading.
     *
     * @param config connector configuration
     * @param sourceContext runtime context supplied to the source
     */
    void open(Map<String, Object> config, SourceContext sourceContext) throws Exception;

    /**
     * Returns the next record; implementations are expected to block while no data is available.
     */
    Record<T> read() throws Exception;
}
/**
 * Batch processing source: work is split into tasks, each serialized as a {@code byte[]}.
 *
 * @param <T> type of the record payload
 */
public interface BatchSource<T> extends AutoCloseable {

    /**
     * Prepares the source.
     *
     * @param config connector configuration
     * @param context runtime context supplied to the source
     */
    void open(Map<String, Object> config, SourceContext context) throws Exception;

    /**
     * Discovery step; hands task payloads to {@code taskEater}.
     * NOTE(review): exact scheduling semantics are not shown here; see the upstream Javadoc.
     */
    void discover(Consumer<byte[]> taskEater) throws Exception;

    /** Prepares the source to process the given serialized task. */
    void prepare(byte[] task) throws Exception;

    /** Returns the next record of the task currently being processed. */
    Record<T> readNext() throws Exception;
}

Primary interfaces for writing data from Pulsar to external systems.
/**
 * Basic sink: receives records from Pulsar and writes them to an external system.
 *
 * @param <T> type of the record payload
 */
public interface Sink<T> extends AutoCloseable {

    /**
     * Prepares the sink for writing.
     *
     * @param config connector configuration
     * @param sinkContext runtime context supplied to the sink
     */
    void open(Map<String, Object> config, SinkContext sinkContext) throws Exception;

    /** Writes a single record to the external system. */
    void write(Record<T> record) throws Exception;
}

Abstract classes providing queue-based push source functionality for asynchronous data ingestion.
/**
 * Push-based source using a consumer-callback pattern.
 *
 * <p>Combines the push machinery of {@code AbstractPushSource} with the {@code Source} contract.
 *
 * @param <T> type of the record payload
 */
public abstract class PushSource<T> extends AbstractPushSource<T> implements Source<T> {
    // consume / notifyError / readNext are inherited from AbstractPushSource.
}
// Base class for push sources with internal queue
// NOTE(review): API summary only - the constructor and methods below are listed without
// bodies, so this snippet is not compilable Java as written; consult the upstream source
// for the implementations.
public abstract class AbstractPushSource<T> {
// Default queue length (1000); presumably the value getQueueLength() reports unless overridden - confirm.
static final int DEFAULT_QUEUE_LENGTH = 1000;
public AbstractPushSource();
// Producer-side callback that hands a record to the source; assumed to enqueue it - confirm.
public void consume(Record<T> record);
// Producer-side error report; assumed to surface to the reading side - confirm.
public void notifyError(Exception ex);
// Length of the internal queue used by this source.
public int getQueueLength();
// Reader-side access to the next pushed record; presumably blocks on the internal queue - confirm.
protected Record<T> readNext() throws Exception;
}Context interfaces providing connector runtime environment and Pulsar platform capabilities.
/**
 * Runtime context handed to a source when it is opened.
 *
 * <p>Exposes the source's identity and output topic, builders for producing and consuming
 * messages, and the shared {@code BaseContext} facilities (tenant/namespace info, logging,
 * state, counters, metrics).
 */
public interface SourceContext extends BaseContext {

    /** Returns the name of this source. */
    String getSourceName();

    /** Returns the output topic of this source. */
    String getOutputTopic();

    /** Returns this source's configuration. */
    SourceConfig getSourceConfig();

    /** Creates a builder for a message to {@code topicName}, using {@code schema}. */
    <T> TypedMessageBuilder<T> newOutputMessage(String topicName, Schema<T> schema) throws PulsarClientException;

    /** Creates a consumer builder that uses {@code schema}. */
    <T> ConsumerBuilder<T> newConsumerBuilder(Schema<T> schema) throws PulsarClientException;

    // BaseContext methods for tenant/namespace info, logging, state, counters, metrics

    String getTenant();

    String getNamespace();

    Logger getLogger();

    /** Stores {@code value} under {@code key}. */
    void putState(String key, ByteBuffer value);

    /** Reads the value stored under {@code key}. */
    ByteBuffer getState(String key);

    /** Increments the counter {@code key} by {@code amount}. */
    void incrCounter(String key, long amount);

    long getCounter(String key);

    void recordMetric(String metricName, double value);
}
/**
 * Runtime context handed to a sink when it is opened.
 *
 * <p>Exposes the sink's identity and input topics, optional consumer controls (seek, pause,
 * resume), and the shared {@code BaseContext} facilities (tenant/namespace info, logging,
 * state, counters, metrics).
 */
public interface SinkContext extends BaseContext {

    /** Returns the name of this sink. */
    String getSinkName();

    /** Returns the topics this sink consumes from. */
    Collection<String> getInputTopics();

    /** Returns this sink's configuration. */
    SinkConfig getSinkConfig();

    // A default method must have a body (the previous body-less declarations were not valid
    // Java). These defaults reject the call; a runtime context that supports the operation
    // must override it.
    // NOTE(review): the messages are believed to match upstream Pulsar's defaults - confirm.

    /**
     * Returns the subscription type used by this sink.
     *
     * @throws UnsupportedOperationException unless overridden by the runtime context
     */
    default SubscriptionType getSubscriptionType() {
        throw new UnsupportedOperationException("Context does not provide SubscriptionType");
    }

    /**
     * Repositions consumption of {@code topic}/{@code partition} to {@code messageId}.
     *
     * @throws UnsupportedOperationException unless overridden by the runtime context
     */
    default void seek(String topic, int partition, MessageId messageId) throws PulsarClientException {
        throw new UnsupportedOperationException("not implemented");
    }

    /**
     * Pauses consumption of {@code topic}/{@code partition}.
     *
     * @throws UnsupportedOperationException unless overridden by the runtime context
     */
    default void pause(String topic, int partition) throws PulsarClientException {
        throw new UnsupportedOperationException("not implemented");
    }

    /**
     * Resumes consumption of {@code topic}/{@code partition}.
     *
     * @throws UnsupportedOperationException unless overridden by the runtime context
     */
    default void resume(String topic, int partition) throws PulsarClientException {
        throw new UnsupportedOperationException("not implemented");
    }

    // BaseContext methods for tenant/namespace info, logging, state, counters, metrics
    String getTenant();
    String getNamespace();
    Logger getLogger();
    void putState(String key, ByteBuffer value);
    ByteBuffer getState(String key);
    void incrCounter(String key, long amount);
    long getCounter(String key);
    void recordMetric(String metricName, double value);
}

Annotation-based metadata system for connector discovery, configuration, and documentation.
/**
 * Connector metadata annotation, placed on a connector class so that it can be discovered.
 */
@Retention(RUNTIME)
@Target(TYPE)
public @interface Connector {

    /** Name of the connector. */
    String name();

    /** I/O type of the connector (see {@code IOType}). */
    IOType type();

    /** Human-readable description of the connector. */
    String help();

    /** Configuration class of the connector; presumably the one whose fields carry {@code @FieldDoc}. */
    Class configClass();
}
/**
 * Documents a single configuration field of a connector; retained at runtime.
 */
@Retention(RUNTIME)
@Target(FIELD)
public @interface FieldDoc {

    /** Whether the field is mandatory; {@code false} unless specified. */
    boolean required() default false;

    /** Default value rendered as a string; has no default, so every usage must supply it. */
    String defaultValue();

    /** Whether the value is sensitive; {@code false} unless specified. */
    boolean sensitive() default false;

    /** Help text for the field; has no default, so every usage must supply it. */
    String help();
}

Helper classes for common data structures and operations.
/**
 * Generic, mutable key-value pair container.
 *
 * <p>Equality and hash code are value-based: two pairs are equal when their keys and values are
 * equal ({@code null}-safe).
 *
 * @param <K> key type
 * @param <V> value type
 */
public class KeyValue<K, V> {

    private K key;
    private V value;

    /**
     * Creates a pair.
     *
     * @param key the key, may be {@code null}
     * @param value the value, may be {@code null}
     */
    public KeyValue(K key, V value) {
        this.key = key;
        this.value = value;
    }

    /** Returns the key. */
    public K getKey() {
        return key;
    }

    /** Returns the value. */
    public V getValue() {
        return value;
    }

    /** Replaces the key. */
    public void setKey(K key) {
        this.key = key;
    }

    /** Replaces the value. */
    public void setValue(V value) {
        this.value = value;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        // Exact-class check keeps equals symmetric even if the class is subclassed.
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        KeyValue<?, ?> other = (KeyValue<?, ?>) o;
        return java.util.Objects.equals(key, other.key)
                && java.util.Objects.equals(value, other.value);
    }

    @Override
    public int hashCode() {
        return java.util.Objects.hash(key, value);
    }

    @Override
    public String toString() {
        return "KeyValue(key=" + key + ", value=" + value + ")";
    }
}