CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-pulsar--pulsar-io-core

Core interfaces and abstractions for building Apache Pulsar IO connectors

Pending
Overview
Eval results
Files

docs/push-sources.md

Push Source Classes

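Push-based source classes support asynchronous data ingestion through an internal queue. External callbacks push records in with `consume()`, and records are read back out of the queue through `read()` (PushSource) or `readNext()` (BatchPushSource).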

AbstractPushSource<T>

Base abstract class providing queue-based push source functionality with internal buffering.

package org.apache.pulsar.io.core;

/**
 * Base class for push-style sources. Producers hand records to {@link #consume(Record)},
 * which places them in an internal queue, and {@link #readNext()} takes them back out.
 *
 * @param <T> type of the record values buffered by this source
 */
public abstract class AbstractPushSource<T> {
    /**
     * Default capacity of the internal record queue (1000 records).
     */
    static final int DEFAULT_QUEUE_LENGTH = 1000;

    /**
     * Creates the source and initializes its internal queue with the default capacity.
     */
    public AbstractPushSource();

    /**
     * Reads the next record from the internal queue.
     * Subclasses use this to implement their read()/readNext() method; external
     * producers push data with {@link #consume(Record)} instead.
     *
     * NOTE(review): this says {@code null} is returned when the queue is empty, while
     * BatchPushSource documents {@code null} as "current task is complete". Confirm against
     * the pulsar-io-core sources whether an empty queue blocks rather than returning null.
     *
     * @return next record from queue or null if queue is empty
     * @throws Exception if reading fails; presumably this is also how an error passed to
     *         {@link #notifyError(Exception)} reaches the reader — TODO confirm
     */
    protected Record<T> readNext() throws Exception;

    /**
     * Adds a record to the internal queue.
     * External systems and callbacks call this to push data into the source.
     *
     * @param record record to add to queue
     */
    public void consume(Record<T> record);

    /**
     * Returns the capacity of the internal queue.
     *
     * @return queue capacity (default {@code DEFAULT_QUEUE_LENGTH}, i.e. 1000)
     */
    public int getQueueLength();

    /**
     * Reports an error that occurred asynchronously, outside the reading thread.
     * This lets external systems (e.g. client callbacks) report failures from async operations.
     *
     * @param ex exception that occurred
     */
    public void notifyError(Exception ex);
}

PushSource<T>

A push-based source that uses a consumer callback pattern. It extends AbstractPushSource and implements the Source interface.

package org.apache.pulsar.io.core;

@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class PushSource<T> extends AbstractPushSource<T> implements Source<T> {
    /**
     * Reads the next message using push mechanism.
     * Overrides Source.read() to use internal queue-based mechanism.
     *
     * @return next message from source
     * @throws Exception
     */
    Record<T> read() throws Exception;
}

Usage Example

/**
 * Example push source that forwards WebSocket messages into the source's internal queue.
 * Each received message is pushed with consume(), and client errors are reported with
 * notifyError().
 */
public class WebSocketPushSource extends PushSource<String> {
    private WebSocketClient client;
    private SourceContext context;

    @Override
    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        this.context = sourceContext;
        String wsUrl = (String) config.get("websocket.url");
        // Fail fast with a clear message instead of handing a null/empty URL to the client.
        if (wsUrl == null || wsUrl.isEmpty()) {
            throw new IllegalArgumentException("Missing required config 'websocket.url'");
        }

        this.client = new WebSocketClient();
        this.client.onMessage(message -> {
            // Push received message to internal queue
            this.consume(new SimpleRecord<>(null, message));
        });

        this.client.onError(error -> {
            // Notify of async errors
            this.notifyError(error);
        });

        client.connect(wsUrl);
    }

    @Override
    public void close() throws Exception {
        // Drop the reference first so a repeated close() does not disconnect twice.
        WebSocketClient toClose = client;
        client = null;
        if (toClose != null) {
            toClose.disconnect();
        }
    }
}

BatchPushSource<T>

A batch push source that combines batch processing with the push pattern. It extends AbstractPushSource and implements BatchSource.

package org.apache.pulsar.io.core;

@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class BatchPushSource<T> extends AbstractPushSource<T> implements BatchSource<T> {
    /**
     * Read next record using push mechanism.
     * Overrides BatchSource.readNext() to use internal queue-based mechanism.
     *
     * @return next record or null when current task is complete
     * @throws Exception
     */
    Record<T> readNext() throws Exception;
}

Usage Example

/**
 * Example batch push source that treats each Kafka topic as a task. discover() emits topic
 * names, and prepare() subscribes to one topic and starts a background poller that pushes
 * every fetched record into the internal queue.
 */
public class KafkaBatchPushSource extends BatchPushSource<byte[]> {
    private KafkaConsumer<String, byte[]> consumer;
    private SourceContext context;
    private String currentTopic;
    // Poller for the current task; kept so it can be stopped before re-subscribing or closing.
    private Thread pollingThread;
    // Cleared when the poller is stopped on purpose, so shutdown is not reported as an error.
    private volatile boolean polling;

    @Override
    public void open(Map<String, Object> config, SourceContext context) throws Exception {
        this.context = context;
        Properties props = new Properties();
        props.put("bootstrap.servers", config.get("kafka.brokers"));
        props.put("group.id", config.get("kafka.group.id"));
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        this.consumer = new KafkaConsumer<>(props);
    }

    @Override
    public void discover(Consumer<byte[]> taskEater) throws Exception {
        // Discover available Kafka topics; each topic name becomes one task
        Map<String, List<PartitionInfo>> topics = consumer.listTopics();
        for (String topic : topics.keySet()) {
            // Explicit charset so prepare() decodes exactly what was encoded here,
            // independent of the JVM's default charset.
            taskEater.accept(topic.getBytes(StandardCharsets.UTF_8));
        }
    }

    @Override
    public void prepare(byte[] task) throws Exception {
        // KafkaConsumer must not be used from several threads at once: make sure the
        // previous task's poller has exited before touching the consumer here.
        stopBackgroundPolling();

        this.currentTopic = new String(task, StandardCharsets.UTF_8);
        consumer.subscribe(Collections.singletonList(currentTopic));

        // Start background polling that pushes records to queue
        startBackgroundPolling();
    }

    /** Starts the single poller thread that feeds the internal queue for the current task. */
    private void startBackgroundPolling() {
        polling = true;
        Thread poller = new Thread(() -> {
            try {
                while (polling && !Thread.currentThread().isInterrupted()) {
                    ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(1000));
                    for (ConsumerRecord<String, byte[]> record : records) {
                        // Push each record to internal queue
                        this.consume(new SimpleRecord<>(record.key(), record.value()));
                    }
                }
            } catch (Exception e) {
                // The interrupt sent by stopBackgroundPolling() may surface as an exception
                // from poll() or consume(); only report failures that occur while still polling.
                if (polling) {
                    this.notifyError(e);
                }
            }
        }, "kafka-batch-push-poller");
        pollingThread = poller;
        poller.start();
    }

    /** Stops the current poller, if any, and waits until it has exited. */
    private void stopBackgroundPolling() throws InterruptedException {
        Thread poller = pollingThread;
        if (poller == null) {
            return;
        }
        polling = false;
        // Interrupt so a poller blocked in poll() or consume() notices promptly.
        poller.interrupt();
        poller.join();
        pollingThread = null;
    }

    @Override
    public void close() throws Exception {
        if (consumer != null) {
            // Stop the poller first so consumer.close() never races with poll().
            stopBackgroundPolling();
            consumer.close();
        }
    }
}

Event-Driven Push Source Example

/**
 * Example push source that bridges an event bus into the source's internal queue.
 * Data events become records via consume(); error events are reported via notifyError().
 */
public class EventDrivenPushSource extends PushSource<Map<String, Object>> {
    private EventBus eventBus;
    private SourceContext context;

    @Override
    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        context = sourceContext;
        eventBus = new EventBus();

        // Subscriber whose @Subscribe methods forward bus events into this source.
        Object forwarder = new Object() {
            @Subscribe
            public void handleDataEvent(DataEvent event) {
                // Record key is the event id; the event's data map is the record value.
                Map<String, Object> payload = event.getData();
                consume(new SimpleRecord<>(event.getId(), payload));
            }

            @Subscribe
            public void handleErrorEvent(ErrorEvent event) {
                notifyError(event.getException());
            }
        };
        eventBus.register(forwarder);

        // Begin event processing only after the forwarder has been registered.
        eventBus.start();
    }

    @Override
    public void close() throws Exception {
        if (eventBus == null) {
            return;
        }
        eventBus.stop();
    }
}

Types

// Required imports
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.functions.api.Record;

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-pulsar--pulsar-io-core

docs

connector-annotations.md

context-interfaces.md

index.md

push-sources.md

sink-interfaces.md

source-interfaces.md

utility-classes.md

tile.json