CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-pulsar--pulsar-io-core

Core interfaces and abstractions for building Apache Pulsar IO connectors

Pending
Overview
Eval results
Files

docs/context-interfaces.md

Context Interfaces

Context interfaces give a connector access to its runtime environment and to Pulsar platform capabilities.

SourceContext

Context interface providing source runtime environment and capabilities for publishing data to Pulsar topics.

package org.apache.pulsar.io.core;

/**
 * Runtime context handed to a source connector.
 *
 * <p>Exposes the source's name, output topic and configuration, and lets the
 * source build outgoing messages or consumers on other topics. The members
 * inherited from {@code BaseContext} are repeated at the bottom for reference.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface SourceContext extends BaseContext {
    /**
     * Get the name of the source.
     *
     * @return source name
     */
    String getSourceName();

    /**
     * Get the output topic name where the source publishes messages.
     *
     * @return output topic name
     */
    String getOutputTopic();

    /**
     * Get the source configuration.
     *
     * @return source configuration object
     */
    SourceConfig getSourceConfig();

    /**
     * Create a new output message builder for publishing to a specific topic.
     *
     * @param <T> type of the message value
     * @param topicName name of the topic to publish to
     * @param schema schema for message serialization
     * @return typed message builder for constructing messages
     * @throws PulsarClientException if unable to create message builder
     */
    <T> TypedMessageBuilder<T> newOutputMessage(String topicName, Schema<T> schema) throws PulsarClientException;

    /**
     * Create a new consumer builder for reading from topics.
     * This is useful for sources that need to consume from other Pulsar topics.
     *
     * @param <T> type of the message value
     * @param schema schema for message deserialization
     * @return consumer builder for creating consumers
     * @throws PulsarClientException if unable to create consumer builder
     */
    <T> ConsumerBuilder<T> newConsumerBuilder(Schema<T> schema) throws PulsarClientException;

    // ------------------------------------------------------------------
    // BaseContext inherited methods (repeated here for reference).
    //
    // Methods marked `default` are optional operations: a `default` method
    // must carry a body to be valid Java, so each one is shown throwing
    // UnsupportedOperationException until an implementation overrides it.
    // NOTE(review): the exception messages are believed to match upstream
    // BaseContext - confirm against the Pulsar version you target.
    // ------------------------------------------------------------------

    // Tenant, namespace and instance information
    String getTenant();
    String getNamespace();
    int getInstanceId();
    int getNumInstances();

    // Logging and secrets
    Logger getLogger();
    String getSecret(String secretName);

    // State stores (optional)
    default <X extends StateStore> X getStateStore(String name) {
        throw new UnsupportedOperationException("Component cannot get state store");
    }

    default <X extends StateStore> X getStateStore(String tenant, String ns, String name) {
        throw new UnsupportedOperationException("Component cannot get state store");
    }

    // Key/value state, each with a CompletableFuture-returning variant
    void putState(String key, ByteBuffer value);
    CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
    ByteBuffer getState(String key);
    CompletableFuture<ByteBuffer> getStateAsync(String key);
    void deleteState(String key);
    CompletableFuture<Void> deleteStateAsync(String key);

    // Counters, each with a CompletableFuture-returning variant
    void incrCounter(String key, long amount);
    CompletableFuture<Void> incrCounterAsync(String key, long amount);
    long getCounter(String key);
    CompletableFuture<Long> getCounterAsync(String key);

    // Metrics
    void recordMetric(String metricName, double value);

    // Pulsar client access (optional)
    default PulsarClient getPulsarClient() {
        throw new UnsupportedOperationException("not implemented");
    }

    default ClientBuilder getPulsarClientBuilder() {
        throw new UnsupportedOperationException("not implemented");
    }

    // Fatal error reporting
    void fatal(Throwable t);
}

Usage Example

/**
 * Example source that reads rows from a JDBC database and publishes them
 * through the {@link SourceContext}.
 *
 * <p>Relies on {@code readFromDatabase()} and {@code SimpleRecord}, which are
 * not shown in this example.
 */
public class DatabaseSource implements Source<Map<String, Object>> {
    private SourceContext context;
    private Connection connection;

    /**
     * Stores the context and opens the JDBC connection.
     *
     * @param config connector configuration; must contain a non-empty {@code jdbc.url} string
     * @param sourceContext runtime context supplied to this source
     * @throws IllegalArgumentException if {@code jdbc.url} is missing, empty or not a string
     * @throws Exception if the JDBC connection cannot be opened
     */
    @Override
    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        this.context = sourceContext;

        // Access source configuration
        String sourceName = context.getSourceName();
        String outputTopic = context.getOutputTopic();

        // Get source-specific config
        SourceConfig sourceConfig = context.getSourceConfig();
        context.getLogger().info("Opening source {} (output topic {}, config {})",
                sourceName, outputTopic, sourceConfig);

        // Fail fast with a clear message instead of handing null to the driver.
        Object jdbcUrl = config.get("jdbc.url");
        if (!(jdbcUrl instanceof String) || ((String) jdbcUrl).isEmpty()) {
            throw new IllegalArgumentException(
                    "Missing required config 'jdbc.url' for source " + sourceName);
        }

        // Initialize database connection
        this.connection = DriverManager.getConnection((String) jdbcUrl);
    }

    /**
     * Reads one item from the database, sends it to the output topic and
     * returns it as a record.
     *
     * @return record wrapping the data that was read
     * @throws Exception if reading or sending fails
     */
    @Override
    public Record<Map<String, Object>> read() throws Exception {
        // Read data from database
        Map<String, Object> data = readFromDatabase();

        // Map.class is a raw Class<Map>, so Schema.JSON(Map.class) is a Schema<Map>;
        // narrow it explicitly so it matches the builder's value type.
        @SuppressWarnings("unchecked")
        Schema<Map<String, Object>> schema =
                (Schema<Map<String, Object>>) (Schema<?>) Schema.JSON(Map.class);

        // Create output message with specific schema
        TypedMessageBuilder<Map<String, Object>> messageBuilder =
                context.newOutputMessage(context.getOutputTopic(), schema);
        messageBuilder.value(data);
        messageBuilder.property("source", context.getSourceName());

        // NOTE(review): the connector runtime presumably also publishes the record
        // returned below to the output topic; if so, sending here as well would
        // publish the data twice - confirm before copying this pattern.
        MessageId messageId = messageBuilder.send();
        return new SimpleRecord<>(messageId.toString(), data);
    }

    /**
     * Releases the JDBC connection opened in {@link #open}. Safe to call when
     * {@code open} never ran or failed, and safe to call more than once.
     *
     * @throws Exception if closing the connection fails
     */
    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
            connection = null;
        }
    }
}

SinkContext

Context interface providing sink runtime environment and capabilities for consuming data from Pulsar topics.

package org.apache.pulsar.io.core;

/**
 * Runtime context handed to a sink connector.
 *
 * <p>Exposes the sink's name, input topics and configuration, plus optional
 * subscription controls (seek, pause, resume). The members inherited from
 * {@code BaseContext} are repeated at the bottom for reference.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface SinkContext extends BaseContext {
    /**
     * Get the name of the sink.
     *
     * @return sink name
     */
    String getSinkName();

    /**
     * Get the input topics that the sink consumes from.
     *
     * @return collection of input topic names
     */
    Collection<String> getInputTopics();

    /**
     * Get the sink configuration.
     *
     * @return sink configuration object
     */
    SinkConfig getSinkConfig();

    /**
     * Get the subscription type used by the sink.
     * Default implementation throws UnsupportedOperationException.
     *
     * @return subscription type
     * @throws UnsupportedOperationException if not supported
     */
    default SubscriptionType getSubscriptionType() {
        throw new UnsupportedOperationException("getSubscriptionType not implemented");
    }

    /**
     * Reset subscription position to a specific message ID.
     * Default implementation throws UnsupportedOperationException.
     *
     * @param topic topic name
     * @param partition partition number
     * @param messageId message ID to seek to
     * @throws PulsarClientException if seek operation fails
     * @throws UnsupportedOperationException if not supported
     */
    default void seek(String topic, int partition, MessageId messageId) throws PulsarClientException {
        throw new UnsupportedOperationException("seek not implemented");
    }

    /**
     * Pause message consumption from a specific topic partition.
     * Default implementation throws UnsupportedOperationException.
     *
     * @param topic topic name
     * @param partition partition number
     * @throws PulsarClientException if pause operation fails
     * @throws UnsupportedOperationException if not supported
     */
    default void pause(String topic, int partition) throws PulsarClientException {
        throw new UnsupportedOperationException("pause not implemented");
    }

    /**
     * Resume message consumption from a specific topic partition.
     * Default implementation throws UnsupportedOperationException.
     *
     * @param topic topic name
     * @param partition partition number
     * @throws PulsarClientException if resume operation fails
     * @throws UnsupportedOperationException if not supported
     */
    default void resume(String topic, int partition) throws PulsarClientException {
        throw new UnsupportedOperationException("resume not implemented");
    }

    // ------------------------------------------------------------------
    // BaseContext inherited methods (repeated here for reference).
    //
    // Methods marked `default` are optional operations: a `default` method
    // must carry a body to be valid Java, so each one is shown throwing
    // UnsupportedOperationException until an implementation overrides it.
    // NOTE(review): the exception messages are believed to match upstream
    // BaseContext - confirm against the Pulsar version you target.
    // ------------------------------------------------------------------

    // Tenant, namespace and instance information
    String getTenant();
    String getNamespace();
    int getInstanceId();
    int getNumInstances();

    // Logging and secrets
    Logger getLogger();
    String getSecret(String secretName);

    // State stores (optional)
    default <X extends StateStore> X getStateStore(String name) {
        throw new UnsupportedOperationException("Component cannot get state store");
    }

    default <X extends StateStore> X getStateStore(String tenant, String ns, String name) {
        throw new UnsupportedOperationException("Component cannot get state store");
    }

    // Key/value state, each with a CompletableFuture-returning variant
    void putState(String key, ByteBuffer value);
    CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
    ByteBuffer getState(String key);
    CompletableFuture<ByteBuffer> getStateAsync(String key);
    void deleteState(String key);
    CompletableFuture<Void> deleteStateAsync(String key);

    // Counters, each with a CompletableFuture-returning variant
    void incrCounter(String key, long amount);
    CompletableFuture<Void> incrCounterAsync(String key, long amount);
    long getCounter(String key);
    CompletableFuture<Long> getCounterAsync(String key);

    // Metrics
    void recordMetric(String metricName, double value);

    // Pulsar client access (optional)
    default PulsarClient getPulsarClient() {
        throw new UnsupportedOperationException("not implemented");
    }

    default ClientBuilder getPulsarClientBuilder() {
        throw new UnsupportedOperationException("not implemented");
    }

    // Fatal error reporting
    void fatal(Throwable t);
}

Usage Example

/**
 * Example sink that indexes each record's value into Elasticsearch and uses
 * the {@link SinkContext} flow-control operations (pause, seek).
 *
 * <p>Relies on {@code determineIndex}, {@code shouldPauseProcessing} and
 * {@code ElasticsearchClient}, which are not shown in this example.
 */
public class ElasticsearchSink implements Sink<Map<String, Object>> {
    private SinkContext context;
    private ElasticsearchClient client;

    /**
     * Stores the context, logs the sink's setup and creates the Elasticsearch client.
     *
     * @param config connector configuration; {@code elasticsearch.url} is read as a string
     * @param sinkContext runtime context supplied to this sink
     * @throws Exception if initialization fails
     */
    @Override
    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        this.context = sinkContext;
        Logger log = context.getLogger();

        // Access sink configuration
        String sinkName = context.getSinkName();
        Collection<String> inputTopics = context.getInputTopics();
        SinkConfig sinkConfig = context.getSinkConfig();
        log.info("Opening sink {} (input topics {}, config {})", sinkName, inputTopics, sinkConfig);

        // Log subscription type if available; the default implementation throws.
        try {
            SubscriptionType subType = context.getSubscriptionType();
            log.info("Using subscription type: {}", subType);
        } catch (UnsupportedOperationException ignored) {
            log.info("Subscription type not available");
        }

        // Initialize Elasticsearch client
        String esUrl = (String) config.get("elasticsearch.url");
        this.client = new ElasticsearchClient(esUrl);
    }

    /**
     * Indexes the record's value and acknowledges the record once indexing
     * succeeded. Exceptions from indexing propagate to the caller.
     *
     * @param record record to index
     * @throws Exception if indexing or pausing fails
     */
    @Override
    public void write(Record<Map<String, Object>> record) throws Exception {
        Map<String, Object> document = record.getValue();
        String indexName = determineIndex(record);

        // Index document in Elasticsearch
        client.index(indexName, document);

        // Acknowledge only after the document was indexed.
        record.ack();

        // Optionally pause based on processing results
        if (shouldPauseProcessing(document)) {
            String topic = record.getTopicName().orElse(null);
            if (topic != null) {
                // Use the record's own partition rather than assuming partition 0.
                context.pause(topic, record.getPartitionIndex().orElse(0));
            } else {
                context.getLogger().warn("Cannot pause: record has no topic name");
            }
        }
    }

    /**
     * Releases this sink's reference to the Elasticsearch client.
     *
     * <p>NOTE(review): {@code ElasticsearchClient} is not shown here; if it
     * holds connections, close it in this method before dropping the reference.
     */
    @Override
    public void close() {
        client = null;
    }

    /**
     * Example: seek back to the failed message so it is delivered again.
     * Does nothing beyond logging when the record has no topic name or no
     * underlying Pulsar message.
     *
     * @param record record whose processing failed
     * @param error failure that triggered the retry
     */
    private void handleProcessingError(Record<Map<String, Object>> record, Exception error) {
        Logger log = context.getLogger();
        log.warn("Processing failed, attempting to seek back", error);

        // The message ID comes from the underlying Pulsar message; the record
        // key is an Optional of the message key and is not a serialized MessageId.
        MessageId messageId = record.getMessage().map(m -> m.getMessageId()).orElse(null);
        String topic = record.getTopicName().orElse(null);
        if (messageId == null || topic == null) {
            log.warn("Cannot seek: record has no topic name or message ID");
            return;
        }

        try {
            context.seek(topic, record.getPartitionIndex().orElse(0), messageId);
        } catch (Exception e) {
            log.error("Failed to seek on topic " + topic, e);
        }
    }
}

Flow Control Example

/**
 * Example sink that throttles processing with a rate limiter and pauses the
 * input topics while it waits for a permit.
 *
 * <p>Relies on {@code processRecord} and a {@code RateLimiter} type, which
 * are not shown in this example.
 */
public class RateLimitedSink implements Sink<String> {
    private SinkContext context;
    private RateLimiter rateLimiter;
    // Topics this sink has paused and not yet resumed.
    private final Map<String, Boolean> topicPausedState = new ConcurrentHashMap<>();

    /**
     * Stores the context and creates the rate limiter.
     *
     * @param config connector configuration; {@code max.rate.per.second} must be numeric
     * @param sinkContext runtime context supplied to this sink
     * @throws IllegalArgumentException if {@code max.rate.per.second} is missing or not a number
     * @throws Exception if initialization fails
     */
    @Override
    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        this.context = sinkContext;

        // Accept any numeric type: a value written as 100 arrives as an Integer,
        // which a direct (Double) cast would reject.
        Object configuredRate = config.get("max.rate.per.second");
        if (!(configuredRate instanceof Number)) {
            throw new IllegalArgumentException(
                    "Config 'max.rate.per.second' must be a number, got: " + configuredRate);
        }
        this.rateLimiter = RateLimiter.create(((Number) configuredRate).doubleValue());
    }

    /**
     * Processes a record once a permit is available. If no permit arrives
     * within one second, the input topics are paused until one does.
     *
     * @param record record to process
     * @throws Exception if processing fails
     */
    @Override
    public void write(Record<String> record) throws Exception {
        // Acquire rate limit permit
        if (!rateLimiter.tryAcquire(1, TimeUnit.SECONDS)) {
            // Rate limit exceeded, pause all input topics
            pauseAllTopics();

            // Wait for permit
            rateLimiter.acquire();

            // Resume topics after rate limit allows
            resumeAllTopics();
        }

        // Process the record
        processRecord(record);
    }

    /**
     * Clears the paused-topic bookkeeping. The sink holds no other resources
     * visible in this example.
     */
    @Override
    public void close() {
        topicPausedState.clear();
    }

    /** Pauses partition 0 of every input topic; a failure is logged and the loop continues. */
    private void pauseAllTopics() {
        for (String topic : context.getInputTopics()) {
            try {
                context.pause(topic, 0);
                topicPausedState.put(topic, true);
            } catch (Exception e) {
                context.getLogger().warn("Failed to pause topic " + topic, e);
            }
        }
    }

    /** Resumes every topic paused earlier; a topic stays recorded as paused if resuming fails. */
    private void resumeAllTopics() {
        for (String topic : topicPausedState.keySet()) {
            try {
                context.resume(topic, 0);
                topicPausedState.remove(topic);
            } catch (Exception e) {
                context.getLogger().warn("Failed to resume topic " + topic, e);
            }
        }
    }
}

Types

// Required imports
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.functions.api.BaseContext;
import org.apache.pulsar.functions.api.StateStore;
import org.apache.pulsar.io.core.SinkConfig;
import org.apache.pulsar.io.core.SourceConfig;
import org.slf4j.Logger;

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-pulsar--pulsar-io-core

docs

connector-annotations.md

context-interfaces.md

index.md

push-sources.md

sink-interfaces.md

source-interfaces.md

utility-classes.md

tile.json