tessl/maven-org-apache-pulsar--pulsar-io-core

Core interfaces and abstractions for building Apache Pulsar IO connectors

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.pulsar/pulsar-io-core@4.0.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-pulsar--pulsar-io-core@4.0.0


Pulsar IO Core

Apache Pulsar IO Core provides the foundational interfaces and abstractions for building Pulsar IO connectors, which move data between Pulsar and external systems. It defines interfaces for data ingestion (Sources) and data egress (Sinks), a batch-oriented source interface, and metadata annotations for connector discovery and configuration.

Package Information

  • Package Name: pulsar-io-core
  • Package Type: maven
  • Language: Java
  • Group ID: org.apache.pulsar
  • Artifact ID: pulsar-io-core
  • Installation (Maven):

    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-io-core</artifactId>
        <version>4.0.6</version>
    </dependency>

Core Imports

import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.BatchSource;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.KeyValue;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.FieldDoc;
import org.apache.pulsar.io.core.annotations.IOType;

Basic Usage

Simple Source Connector

import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.functions.api.Record;
import java.util.Map;

public class MySource implements Source<String> {
    @Override
    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        // Initialize your source connector with configuration
    }

    @Override
    public Record<String> read() throws Exception {
        // Read the next item from your external system and wrap it in a Record<String>.
        // Block here while no data is available rather than busy-polling.
        return null; // Placeholder: return the actual Record<String> object
    }

    @Override
    public void close() throws Exception {
        // Clean up resources
    }
}

Simple Sink Connector

import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.functions.api.Record;
import java.util.Map;

public class MySink implements Sink<String> {
    @Override
    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        // Initialize your sink connector with configuration
    }

    @Override
    public void write(Record<String> record) throws Exception {
        // Write the record to your external system
    }

    @Override
    public void close() throws Exception {
        // Clean up resources
    }
}

Architecture

Pulsar IO Core follows a clean separation of concerns:

  • Connector Interfaces: Core abstractions (Source, Sink, BatchSource) define the contract for data movement
  • Context Objects: Runtime environment (SourceContext, SinkContext) provides access to Pulsar capabilities
  • Push vs Pull Patterns: Support for both traditional pull-based (Source) and push-based (PushSource) patterns
  • Batch Processing: Specialized interfaces for efficient batch data processing
  • Lifecycle Management: Consistent initialization through open() and cleanup through AutoCloseable.close()
  • Metadata Annotations: Declarative connector configuration and documentation
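
The open, read, close lifecycle named in the list above can be sketched without a Pulsar dependency. This is only an illustration under stated assumptions: `DemoSource`, `CountingSource`, `LifecycleDemo`, and the `limit` setting are hypothetical stand-ins that mirror the shape of `Source<T>`, and returning null to end the loop is a demo convenience (a real `read()` is expected to block while no data is available).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the shape of Source<T>: open, read, and close via AutoCloseable.
// The real interface also receives a SourceContext and returns Record<T>.
interface DemoSource<T> extends AutoCloseable {
    void open(Map<String, Object> config) throws Exception;
    T read() throws Exception;
}

// Emits "msg-0", "msg-1", ... up to a configured limit, then returns null (demo only).
class CountingSource implements DemoSource<String> {
    private int limit;
    private int next;
    boolean closed;

    @Override
    public void open(Map<String, Object> config) {
        // Read the hypothetical "limit" setting, defaulting to 3.
        limit = ((Number) config.getOrDefault("limit", 3)).intValue();
    }

    @Override
    public String read() {
        return next < limit ? "msg-" + next++ : null;
    }

    @Override
    public void close() {
        closed = true;
    }
}

class LifecycleDemo {
    // Drives one source through open, a read loop, and close.
    static List<String> drain(DemoSource<String> source, Map<String, Object> config) throws Exception {
        List<String> out = new ArrayList<>();
        try (DemoSource<String> s = source) { // close() runs even if open() or read() throws
            s.open(config);
            for (String v = s.read(); v != null; v = s.read()) {
                out.add(v);
            }
        }
        return out;
    }
}
```

Because the stand-in extends AutoCloseable, try-with-resources guarantees cleanup; in a deployed connector the Pulsar runtime, not user code, is what calls these methods.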

Capabilities

Core Source Interfaces

Primary interfaces for reading data from external systems and publishing to Pulsar topics.

// Basic pull-based source interface
public interface Source<T> extends AutoCloseable {
    void open(Map<String, Object> config, SourceContext sourceContext) throws Exception;
    Record<T> read() throws Exception;
}

// Batch processing source interface  
public interface BatchSource<T> extends AutoCloseable {
    void open(Map<String, Object> config, SourceContext context) throws Exception;
    void discover(Consumer<byte[]> taskEater) throws Exception;
    void prepare(byte[] task) throws Exception;
    Record<T> readNext() throws Exception;
}
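
How the three BatchSource methods listed above fit together may be easier to see in a small single-process sketch. It assumes that tasks handed to `taskEater` are later passed to `prepare`, and that `readNext` returning null marks the end of the current task. `DemoBatchSource`, `TwoLineFileSource`, and `BatchDriver` are hypothetical names, and the driver only imitates what the framework would do.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in mirroring the discover/prepare/readNext methods listed above.
// The real BatchSource<T> also has open(...) and returns Record<T> from readNext().
interface DemoBatchSource<T> {
    void discover(Consumer<byte[]> taskEater) throws Exception;
    void prepare(byte[] task) throws Exception;
    T readNext() throws Exception;
}

// Treats each file name as one task and each task as two records.
class TwoLineFileSource implements DemoBatchSource<String> {
    private final Deque<String> pending = new ArrayDeque<>();

    @Override
    public void discover(Consumer<byte[]> taskEater) {
        for (String name : new String[] {"a.txt", "b.txt"}) {
            taskEater.accept(name.getBytes(StandardCharsets.UTF_8));
        }
    }

    @Override
    public void prepare(byte[] task) {
        String name = new String(task, StandardCharsets.UTF_8);
        pending.add(name + ":line1");
        pending.add(name + ":line2");
    }

    @Override
    public String readNext() {
        return pending.poll(); // null once the current task is exhausted
    }
}

class BatchDriver {
    // Plays the framework's role in one process: collect tasks, then run each one.
    static List<String> runAll(DemoBatchSource<String> source) throws Exception {
        List<byte[]> tasks = new ArrayList<>();
        source.discover(tasks::add);
        List<String> out = new ArrayList<>();
        for (byte[] task : tasks) {
            source.prepare(task);
            for (String r = source.readNext(); r != null; r = source.readNext()) {
                out.add(r);
            }
        }
        return out;
    }
}
```

Tasks are opaque byte arrays, so a connector is free to serialize whatever it needs (a file name here) to describe one unit of work.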

See: Source Interfaces (source-interfaces.md)

Core Sink Interfaces

Primary interfaces for writing data from Pulsar to external systems.

// Basic sink interface
public interface Sink<T> extends AutoCloseable {
    void open(Map<String, Object> config, SinkContext sinkContext) throws Exception;
    void write(Record<T> record) throws Exception;
}

See: Sink Interfaces (sink-interfaces.md)

Push-Based Sources

Abstract classes providing queue-based push source functionality for asynchronous data ingestion.

// Push-based source using consumer callback pattern
public abstract class PushSource<T> extends AbstractPushSource<T> implements Source<T> {
    // Inherits push mechanism functionality
}

// Base class for push sources with internal queue
public abstract class AbstractPushSource<T> {
    static final int DEFAULT_QUEUE_LENGTH = 1000;
    public AbstractPushSource();
    public void consume(Record<T> record);
    public void notifyError(Exception ex);
    public int getQueueLength();
    protected Record<T> readNext() throws Exception;
}
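
The queue-based pattern behind these signatures can be illustrated with a bounded `LinkedBlockingQueue`. This is a sketch of the general technique, not the library's implementation: `DemoPushSource` and its `Item` wrapper are hypothetical, and the choice to rethrow queued errors from `readNext` is an assumption made for the example.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the queue-based pattern; not the real AbstractPushSource.
class DemoPushSource<T> {
    // Wrapper so that data and errors travel through the same bounded queue.
    private static final class Item<T> {
        final T value;
        final Exception error;

        Item(T value, Exception error) {
            this.value = value;
            this.error = error;
        }
    }

    private final LinkedBlockingQueue<Item<T>> queue;

    DemoPushSource(int queueLength) {
        queue = new LinkedBlockingQueue<>(queueLength);
    }

    // Called from the external system's callback thread; blocks while the queue is full.
    void consume(T value) {
        put(new Item<>(value, null));
    }

    // Hands an asynchronous failure to whoever calls readNext().
    void notifyError(Exception ex) {
        put(new Item<>(null, ex));
    }

    // Called by the reading side; blocks until an item arrives and rethrows queued errors.
    T readNext() throws Exception {
        Item<T> item = queue.take();
        if (item.error != null) {
            throw item.error;
        }
        return item.value;
    }

    private void put(Item<T> item) {
        try {
            queue.put(item);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while enqueueing", e);
        }
    }
}
```

The bounded queue gives back-pressure: a fast producer calling `consume` stalls once the queue is full, instead of growing memory without limit.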

See: Push Source Classes (push-sources.md)

Runtime Context

Context interfaces providing connector runtime environment and Pulsar platform capabilities.

// Source runtime context
public interface SourceContext extends BaseContext {
    String getSourceName();
    String getOutputTopic();
    SourceConfig getSourceConfig();
    <T> TypedMessageBuilder<T> newOutputMessage(String topicName, Schema<T> schema) throws PulsarClientException;
    <T> ConsumerBuilder<T> newConsumerBuilder(Schema<T> schema) throws PulsarClientException;
    
    // Inherited from BaseContext (selection): tenant/namespace info, logging, state, counters, metrics
    String getTenant();
    String getNamespace();
    Logger getLogger();
    void putState(String key, ByteBuffer value);
    ByteBuffer getState(String key);
    void incrCounter(String key, long amount);
    long getCounter(String key);
    void recordMetric(String metricName, double value);
}

// Sink runtime context
public interface SinkContext extends BaseContext {
    String getSinkName();
    Collection<String> getInputTopics();
    SinkConfig getSinkConfig();
    default SubscriptionType getSubscriptionType();
    default void seek(String topic, int partition, MessageId messageId) throws PulsarClientException;
    default void pause(String topic, int partition) throws PulsarClientException;
    default void resume(String topic, int partition) throws PulsarClientException;
    
    // Inherited from BaseContext (selection): tenant/namespace info, logging, state, counters, metrics
    String getTenant();
    String getNamespace();
    Logger getLogger();
    void putState(String key, ByteBuffer value);
    ByteBuffer getState(String key);
    void incrCounter(String key, long amount);
    long getCounter(String key);
    void recordMetric(String metricName, double value);
}
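
Since `putState` and `getState` exchange `ByteBuffer` values, a connector has to pick its own encoding. The sketch below shows one possible way to checkpoint a numeric offset; `DemoContext` is a hypothetical in-memory stand-in for the state and counter methods listed above, and the `last-offset` and `checkpoints` keys are made up for the example.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the state and counter methods listed above.
class DemoContext {
    private final Map<String, ByteBuffer> state = new HashMap<>();
    private final Map<String, Long> counters = new HashMap<>();

    void putState(String key, ByteBuffer value) { state.put(key, value); }
    ByteBuffer getState(String key) { return state.get(key); }
    void incrCounter(String key, long amount) { counters.merge(key, amount, Long::sum); }
    long getCounter(String key) { return counters.getOrDefault(key, 0L); }
}

class OffsetCheckpoint {
    static final String KEY = "last-offset"; // hypothetical state key

    // Encodes the offset as 8 big-endian bytes and stores it.
    static void save(DemoContext ctx, long offset) {
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES);
        buf.putLong(offset);
        buf.flip(); // switch from writing to reading so the 8 bytes are visible
        ctx.putState(KEY, buf);
        ctx.incrCounter("checkpoints", 1);
    }

    // Returns the stored offset, or -1 if nothing has been saved yet.
    static long load(DemoContext ctx) {
        ByteBuffer buf = ctx.getState(KEY);
        // duplicate() leaves the stored buffer's position untouched for later reads
        return buf == null ? -1L : buf.duplicate().getLong();
    }
}
```

Forgetting `flip()` before storing is a common mistake with `ByteBuffer`: the buffer would then expose zero readable bytes to the next reader.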

See: Context Interfaces (context-interfaces.md)

Connector Annotations

Annotation-based metadata system for connector discovery, configuration, and documentation.

// Connector metadata annotation
@Target(TYPE)
@Retention(RUNTIME)
public @interface Connector {
    String name();
    IOType type();
    String help();
    Class configClass();
}

// Configuration field documentation
@Target(FIELD)
@Retention(RUNTIME)
public @interface FieldDoc {
    boolean required() default false;
    String defaultValue();
    boolean sensitive() default false;
    String help();
}
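
Because both annotations are retained at runtime, tooling can read them through reflection. The following sketch shows that mechanism on a local copy of the FieldDoc shape so that it compiles on its own; `DemoFieldDoc`, `DemoJdbcConfig`, and `ConfigDocs` are hypothetical names, and real code would import `org.apache.pulsar.io.core.annotations.FieldDoc` instead.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local copy of the FieldDoc shape listed above, so the sketch compiles alone.
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@interface DemoFieldDoc {
    boolean required() default false;
    String defaultValue();
    boolean sensitive() default false;
    String help();
}

// Hypothetical connector configuration class.
class DemoJdbcConfig {
    @DemoFieldDoc(required = true, defaultValue = "", help = "JDBC URL of the target database")
    String jdbcUrl;

    @DemoFieldDoc(defaultValue = "", sensitive = true, help = "Password used to connect")
    String password;

    @DemoFieldDoc(defaultValue = "100", help = "Records per batch")
    int batchSize;
}

class ConfigDocs {
    // Builds a "field name -> description" map from the runtime-retained annotations.
    static Map<String, String> describe(Class<?> configClass) {
        Map<String, String> docs = new TreeMap<>();
        for (Field field : configClass.getDeclaredFields()) {
            DemoFieldDoc doc = field.getAnnotation(DemoFieldDoc.class);
            if (doc == null) {
                continue; // skip fields without documentation
            }
            String flags = (doc.required() ? "required" : "optional")
                    + (doc.sensitive() ? ", sensitive" : "");
            docs.put(field.getName(), doc.help() + " (" + flags + ")");
        }
        return docs;
    }
}
```

Note that `defaultValue` and `help` have no defaults in the listing above, so every annotated field must supply both.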

See: Connector Annotations (connector-annotations.md)

Utility Classes

Helper classes for common data structures and operations.

// Generic key-value pair container
public class KeyValue<K, V> {
    public KeyValue(K key, V value);
    K getKey();
    V getValue();
    void setKey(K key);
    void setValue(V value);
}

See: Utility Classes (utility-classes.md)