Core interfaces and abstractions for building Apache Pulsar IO connectors
—
Push-based source classes provide queue-based functionality for asynchronous data ingestion using consumer callback patterns.
Base abstract class providing queue-based push source functionality with internal buffering.
package org.apache.pulsar.io.core;
public abstract class AbstractPushSource<T> {
/**
 * Default capacity of the internal buffering queue: 1000.
 *
 * <p>Declared without an access modifier, so it is visible only inside
 * {@code org.apache.pulsar.io.core}.
 */
static final int DEFAULT_QUEUE_LENGTH = 1000;
/**
 * Creates the push source and initializes its internal queue with the default capacity
 * (presumably {@code DEFAULT_QUEUE_LENGTH} — confirm against the implementation).
 */
public AbstractPushSource();
/**
 * Reads the next record from the internal queue.
 *
 * <p>Protected: meant to be called by push source implementations (subclasses),
 * not by external systems.
 *
 * <p>NOTE(review): this listing does not show whether the call blocks while the
 * queue is empty — confirm against the implementation before relying on the
 * {@code null} return described below.
 *
 * @return next record from the queue, or {@code null} if the queue is empty
 * @throws Exception if the read fails; presumably this is where errors reported
 *         through {@code notifyError(Exception)} surface — TODO confirm
 */
protected Record<T> readNext() throws Exception;
/**
 * Adds a record to the internal queue.
 *
 * <p>This is the entry point external systems (for example a client callback)
 * call to push data into the source.
 *
 * <p>NOTE(review): behavior when the queue is already at capacity is not shown
 * in this listing — confirm whether the call blocks or rejects the record.
 *
 * @param record record to add to the queue
 */
public void consume(Record<T> record);
/**
 * Returns the capacity of the internal queue.
 *
 * @return queue capacity; 1000 by default, the same value as {@code DEFAULT_QUEUE_LENGTH}
 */
public int getQueueLength();
/**
 * Reports an error that occurred asynchronously.
 *
 * <p>External systems call this from their own callbacks or threads to hand the
 * source an exception raised during an asynchronous operation.
 *
 * <p>NOTE(review): how the reported exception reaches the reader is not shown in
 * this listing — presumably it is rethrown from the next read; confirm.
 *
 * @param ex exception that occurred
 */
public void notifyError(Exception ex);
}

Push-based source that uses a consumer callback pattern, extending AbstractPushSource and implementing the Source interface.
package org.apache.pulsar.io.core;
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class PushSource<T> extends AbstractPushSource<T> implements Source<T> {
/**
 * Reads the next message using the push mechanism.
 *
 * <p>Implements {@code Source.read()} on top of the internal queue inherited from
 * {@code AbstractPushSource}. It is declared {@code public} because an interface
 * method cannot be implemented with weaker access.
 *
 * @return next message from the source
 * @throws Exception if the read fails; presumably this includes errors reported
 *         through {@code notifyError(Exception)} — TODO confirm
 */
@Override
public Record<T> read() throws Exception;
}public class WebSocketPushSource extends PushSource<String> {
private WebSocketClient client;
private SourceContext context;
/**
 * Opens the source: registers the WebSocket callbacks, then connects.
 *
 * <p>Each received message is pushed to the internal queue through {@code consume};
 * each client error is forwarded through {@code notifyError}. The callbacks are
 * registered before {@code connect} is called.
 *
 * @param config connector configuration; must map {@code "websocket.url"} to a non-empty string
 * @param sourceContext context supplied by the runtime, stored in {@code context}
 * @throws IllegalArgumentException if {@code "websocket.url"} is missing, not a string, or empty
 * @throws Exception whatever {@code WebSocketClient.connect} propagates
 */
@Override
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
    this.context = sourceContext;

    // Fail fast with a clear message instead of passing null (or hitting a
    // ClassCastException) further down.
    Object configuredUrl = config.get("websocket.url");
    if (!(configuredUrl instanceof String) || ((String) configuredUrl).trim().isEmpty()) {
        throw new IllegalArgumentException(
                "Missing or invalid required config \"websocket.url\": " + configuredUrl);
    }
    String wsUrl = (String) configuredUrl;

    this.client = new WebSocketClient();
    this.client.onMessage(message -> {
        // Push received message to internal queue
        this.consume(new SimpleRecord<>(null, message));
    });
    this.client.onError(error -> {
        // Notify of async errors
        this.notifyError(error);
    });
    this.client.connect(wsUrl);
}
/**
 * Disconnects the WebSocket client if {@code open} created one; otherwise does nothing.
 *
 * @throws Exception whatever {@code disconnect} propagates
 */
@Override
public void close() throws Exception {
    if (client == null) {
        return;
    }
    client.disconnect();
}
}

Batch push source combining batch processing with the push pattern, extending AbstractPushSource and implementing BatchSource.
package org.apache.pulsar.io.core;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class BatchPushSource<T> extends AbstractPushSource<T> implements BatchSource<T> {
/**
 * Reads the next record using the push mechanism.
 *
 * <p>Implements {@code BatchSource.readNext()} on top of the internal queue inherited
 * from {@code AbstractPushSource}. It is declared {@code public} because it implements
 * an interface method and overrides the inherited {@code protected} method; an override
 * may not reduce visibility.
 *
 * @return next record, or {@code null} when the current task is complete
 * @throws Exception if the read fails; presumably this includes errors reported
 *         through {@code notifyError(Exception)} — TODO confirm
 */
@Override
public Record<T> readNext() throws Exception;
}public class KafkaBatchPushSource extends BatchPushSource<byte[]> {
private KafkaConsumer<String, byte[]> consumer;
private SourceContext context;
private String currentTopic;
/**
 * Opens the source by building a Kafka consumer from the connector configuration.
 *
 * <p>Keys are read as {@code String} and values as {@code byte[]}, per the deserializers
 * configured below.
 *
 * @param config connector configuration; must contain {@code "kafka.brokers"} and
 *               {@code "kafka.group.id"}
 * @param context context supplied by the runtime, stored in {@code context}
 * @throws NullPointerException if a required configuration key is missing
 * @throws Exception whatever the {@code KafkaConsumer} constructor propagates
 */
@Override
public void open(Map<String, Object> config, SourceContext context) throws Exception {
    this.context = context;

    // Properties (a Hashtable) rejects null values with a message-less
    // NullPointerException; check up front so the failure names the missing key.
    Object brokers = java.util.Objects.requireNonNull(
            config.get("kafka.brokers"), "Missing required config: kafka.brokers");
    Object groupId = java.util.Objects.requireNonNull(
            config.get("kafka.group.id"), "Missing required config: kafka.group.id");

    Properties props = new Properties();
    props.put("bootstrap.servers", brokers);
    props.put("group.id", groupId);
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());
    this.consumer = new KafkaConsumer<>(props);
}
/**
 * Discovers the available Kafka topics and emits one task per topic.
 *
 * <p>Each task is the topic name encoded as UTF-8 bytes. The charset is explicit so
 * the encoding does not depend on the JVM's platform default.
 *
 * @param taskEater callback that receives each discovered task
 * @throws Exception whatever {@code listTopics} propagates
 */
@Override
public void discover(Consumer<byte[]> taskEater) throws Exception {
    // Discover available Kafka topics
    Map<String, List<PartitionInfo>> topics = consumer.listTopics();
    for (String topic : topics.keySet()) {
        taskEater.accept(topic.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }
}
/**
 * Prepares the source for one task: subscribes to the task's topic and starts the
 * background polling that pushes records to the internal queue.
 *
 * <p>The task bytes are decoded as UTF-8. The charset is explicit so decoding does
 * not depend on the JVM's platform default.
 *
 * <p>NOTE(review): every call starts another polling thread, and nothing here stops a
 * thread started for an earlier task — confirm whether {@code prepare} can be invoked
 * more than once per instance.
 *
 * @param task topic name as bytes
 * @throws Exception whatever {@code subscribe} propagates
 */
@Override
public void prepare(byte[] task) throws Exception {
    this.currentTopic = new String(task, java.nio.charset.StandardCharsets.UTF_8);
    consumer.subscribe(Collections.singletonList(currentTopic));
    // Start background polling that pushes records to queue
    startBackgroundPolling();
}
/**
 * Starts a background thread that polls Kafka and pushes every record to the
 * internal queue.
 *
 * <p>The thread is named after the current topic so it can be identified in thread
 * dumps. It is a daemon thread so that it cannot keep the JVM alive on its own, since
 * nothing in this class interrupts it. Any exception ends the loop and is reported
 * through {@code notifyError}.
 *
 * <p>NOTE(review): {@code close()} closes the same consumer from the caller's thread
 * while this thread may be inside {@code poll}. {@code KafkaConsumer} is documented as
 * not safe for multi-threaded access — confirm and coordinate shutdown (for example
 * with {@code wakeup()}).
 */
private void startBackgroundPolling() {
    Thread poller = new Thread(() -> {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, byte[]> record : records) {
                    // Push each record to internal queue
                    this.consume(new SimpleRecord<>(record.key(), record.value()));
                }
            }
        } catch (Exception e) {
            this.notifyError(e);
        }
    }, "kafka-batch-push-source-poller-" + currentTopic);
    poller.setDaemon(true);
    poller.start();
}
/**
 * Closes the Kafka consumer if {@code open} created one; otherwise does nothing.
 *
 * @throws Exception whatever closing the consumer propagates
 */
@Override
public void close() throws Exception {
    if (consumer == null) {
        return;
    }
    consumer.close();
}
}public class EventDrivenPushSource extends PushSource<Map<String, Object>> {
private EventBus eventBus;
private SourceContext context;
/**
 * Opens the source: creates the event bus, registers the handlers that feed the
 * internal queue, and starts event processing.
 *
 * @param config connector configuration (not read by this example)
 * @param sourceContext context supplied by the runtime, stored in {@code context}
 * @throws Exception declared by the open contract
 */
@Override
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
    this.context = sourceContext;
    this.eventBus = new EventBus();

    // The anonymous listener forwards bus events to the enclosing source.
    eventBus.register(new Object() {
        /** Wraps a data event's payload in a record keyed by the event id and queues it. */
        @Subscribe
        public void handleDataEvent(DataEvent event) {
            Map<String, Object> payload = event.getData();
            EventDrivenPushSource.this.consume(new SimpleRecord<>(event.getId(), payload));
        }

        /** Hands an error event's exception to the source. */
        @Subscribe
        public void handleErrorEvent(ErrorEvent event) {
            EventDrivenPushSource.this.notifyError(event.getException());
        }
    });

    // Handlers are in place; begin processing events.
    eventBus.start();
}
/**
 * Stops the event bus if {@code open} created one; otherwise does nothing.
 *
 * @throws Exception whatever stopping the bus propagates
 */
@Override
public void close() throws Exception {
    if (eventBus == null) {
        return;
    }
    eventBus.stop();
}
}

// Required imports
import java.util.Map;
import java.util.function.Consumer;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;

Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-pulsar--pulsar-io-core