Describes: mavenpkg:maven/org.apache.kafka/kafka_2.13@4.1.x

tessl/maven-org-apache-kafka--kafka-2-13

tessl install tessl/maven-org-apache-kafka--kafka-2-13@4.1.0

Apache Kafka is a distributed event streaming platform that combines publish-subscribe messaging, durable storage, and real-time stream processing capabilities.

connectors.md (docs/connect/)

Connector Framework

Kafka Connect is a framework for integrating Kafka with external systems through connectors.

Core Concepts

Connector

Base class for all connectors.

package org.apache.kafka.connect.connector;

public abstract class Connector implements Versioned {
    /**
     * Initialize the connector with context.
     */
    public void initialize(ConnectorContext ctx);
    public void initialize(ConnectorContext ctx, List<Map<String, String>> taskConfigs);

    /**
     * Start the connector with configuration.
     */
    public abstract void start(Map<String, String> props);

    /**
     * Stop the connector.
     */
    public abstract void stop();

    /**
     * Reconfigure the connector (optional).
     */
    public void reconfigure(Map<String, String> props);

    /**
     * Return the Task implementation for this connector.
     */
    public abstract Class<? extends Task> taskClass();

    /**
     * Return task configurations for max number of tasks.
     */
    public abstract List<Map<String, String>> taskConfigs(int maxTasks);

    /**
     * Validate the connector configuration.
     */
    public Config validate(Map<String, String> connectorConfigs);

    /**
     * Return the configuration definition.
     */
    public abstract ConfigDef config();

    /**
     * Get connector context.
     */
    protected ConnectorContext context();

    /**
     * Get connector version (from Versioned interface).
     */
    @Override
    public String version();
}

Source Connectors

SourceConnector

Pull data from external systems into Kafka.

package org.apache.kafka.connect.source;

public abstract class SourceConnector extends Connector {
    /**
     * Exactly-once support level.
     */
    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig);

    /**
     * Whether connector can define transaction boundaries.
     */
    public ConnectorTransactionBoundaries canDefineTransactionBoundaries(
        Map<String, String> connectorConfig);

    /**
     * Alter offsets for this connector.
     */
    public boolean alterOffsets(Map<String, String> connectorConfig,
                                Map<Map<String, ?>, Map<String, ?>> offsets);

    /**
     * Get source connector context.
     */
    @Override
    protected SourceConnectorContext context();
}

ExactlyOnceSupport:

package org.apache.kafka.connect.source;

public enum ExactlyOnceSupport {
    SUPPORTED,
    UNSUPPORTED
}

ConnectorTransactionBoundaries:

package org.apache.kafka.connect.source;

public enum ConnectorTransactionBoundaries {
    SUPPORTED,
    UNSUPPORTED
}
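
As a sketch of how a connector advertises these capabilities (the class name is illustrative and the task class simply reuses the FileSourceTask from the example below, so none of this is prescribed by the Kafka API), a source connector that records a precise offset with every record might declare exactly-once support and leave transaction boundaries to the framework:

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceConnector;
import java.util.*;

public class ExactlyOnceCapableConnector extends SourceConnector {
    private Map<String, String> config;

    @Override
    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
        // Every SourceRecord carries a precise offset, so the framework can
        // provide exactly-once delivery for this connector.
        return ExactlyOnceSupport.SUPPORTED;
    }

    @Override
    public ConnectorTransactionBoundaries canDefineTransactionBoundaries(
            Map<String, String> connectorConfig) {
        // Let the framework define transaction boundaries (POLL or INTERVAL).
        return ConnectorTransactionBoundaries.UNSUPPORTED;
    }

    @Override
    public void start(Map<String, String> props) {
        this.config = props;
    }

    @Override
    public Class<? extends Task> taskClass() {
        return FileSourceTask.class; // reuses the task from the example below
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> taskConfigs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            taskConfigs.add(new HashMap<>(config));
        }
        return taskConfigs;
    }

    @Override
    public void stop() {}

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public String version() {
        return "1.0.0";
    }
}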

SourceTask

Task implementation for pulling data.

package org.apache.kafka.connect.source;

public abstract class SourceTask implements Task {
    /**
     * Initialize the source task.
     */
    public void initialize(SourceTaskContext context);

    /**
     * Start the task with configuration.
     */
    @Override
    public abstract void start(Map<String, String> props);

    /**
     * Poll for new records.
     * @return List of source records, or null if no data available
     */
    public abstract List<SourceRecord> poll() throws InterruptedException;

    /**
     * Commit offsets (optional).
     */
    public void commit() throws InterruptedException;

    /**
     * Commit individual record metadata (optional).
     */
    public void commitRecord(SourceRecord record,
                            RecordMetadata metadata) throws InterruptedException;

    /**
     * Stop the task.
     */
    @Override
    public abstract void stop();

    /**
     * Get task version.
     */
    @Override
    public abstract String version();

    // Transaction boundary constants
    public static final String TRANSACTION_BOUNDARY_CONFIG = "transaction.boundary";

    public enum TransactionBoundary {
        POLL,       // Transaction per poll()
        INTERVAL,   // Transaction per time interval
        CONNECTOR   // Connector-defined boundaries
    }
}

SourceRecord

Record produced by a source connector.

package org.apache.kafka.connect.source;

public class SourceRecord extends ConnectRecord<SourceRecord> {
    /**
     * Create a source record.
     */
    public SourceRecord(Map<String, ?> sourcePartition,
                       Map<String, ?> sourceOffset,
                       String topic,
                       Integer partition,
                       Schema keySchema,
                       Object key,
                       Schema valueSchema,
                       Object value);

    public SourceRecord(Map<String, ?> sourcePartition,
                       Map<String, ?> sourceOffset,
                       String topic,
                       Integer partition,
                       Schema keySchema,
                       Object key,
                       Schema valueSchema,
                       Object value,
                       Long timestamp);

    public SourceRecord(Map<String, ?> sourcePartition,
                       Map<String, ?> sourceOffset,
                       String topic,
                       Integer partition,
                       Schema keySchema,
                       Object key,
                       Schema valueSchema,
                       Object value,
                       Long timestamp,
                       Iterable<Header> headers);

    /**
     * Get source partition identifier.
     */
    public Map<String, ?> sourcePartition();

    /**
     * Get source offset.
     */
    public Map<String, ?> sourceOffset();
}

Example Source Connector:

import org.apache.kafka.connect.source.*;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.common.config.ConfigDef;
import java.io.*;
import java.util.*;

public class FileSourceConnector extends SourceConnector {
    private Map<String, String> config;

    @Override
    public void start(Map<String, String> props) {
        this.config = props;
    }

    @Override
    public Class<? extends Task> taskClass() {
        return FileSourceTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Create task configs (one per file or split work)
        List<Map<String, String>> taskConfigs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            Map<String, String> taskConfig = new HashMap<>(config);
            taskConfig.put("task.id", String.valueOf(i));
            taskConfigs.add(taskConfig);
        }
        return taskConfigs;
    }

    @Override
    public void stop() {
        // Cleanup
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
            .define("file.path", ConfigDef.Type.STRING,
                ConfigDef.Importance.HIGH, "File path to read")
            .define("topic", ConfigDef.Type.STRING,
                ConfigDef.Importance.HIGH, "Target topic");
    }

    @Override
    public String version() {
        return "1.0.0";
    }
}

public class FileSourceTask extends SourceTask {
    private String filePath;
    private String topic;
    private BufferedReader reader;
    private long offset;

    @Override
    public void start(Map<String, String> props) {
        this.filePath = props.get("file.path");
        this.topic = props.get("topic");

        // Load offset
        Map<String, Object> partition = Collections.singletonMap("file", filePath);
        Map<String, Object> storedOffset = context.offsetStorageReader().offset(partition);
        this.offset = storedOffset != null ? (Long) storedOffset.get("position") : 0L;

        try {
            this.reader = new BufferedReader(new FileReader(filePath));
            reader.skip(offset);
        } catch (IOException e) {
            throw new ConnectException("Failed to open file", e);
        }
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> records = new ArrayList<>();

        try {
            String line;
            int count = 0;
            while ((line = reader.readLine()) != null && count < 100) {
                Map<String, Object> partition = Collections.singletonMap("file", filePath);
                Map<String, Object> offset = Collections.singletonMap("position", this.offset);

                SourceRecord record = new SourceRecord(
                    partition,
                    offset,
                    topic,
                    null, // partition
                    Schema.STRING_SCHEMA,
                    "key",
                    Schema.STRING_SCHEMA,
                    line
                );

                records.add(record);
                this.offset += line.length() + 1; // +1 for newline
                count++;
            }
        } catch (IOException e) {
            throw new ConnectException("Error reading file", e);
        }

        return records.isEmpty() ? null : records;
    }

    @Override
    public void stop() {
        try {
            if (reader != null) {
                reader.close();
            }
        } catch (IOException e) {
            // Log error
        }
    }

    @Override
    public String version() {
        return "1.0.0";
    }
}

Sink Connectors

SinkConnector

Push data from Kafka to external systems.

package org.apache.kafka.connect.sink;

public abstract class SinkConnector extends Connector {
    // Configuration constant
    public static final String TOPICS_CONFIG = "topics";

    /**
     * Alter offsets for this connector.
     */
    public boolean alterOffsets(Map<String, String> connectorConfig,
                                Map<TopicPartition, Long> offsets);

    /**
     * Get sink connector context.
     */
    @Override
    protected SinkConnectorContext context();
}

SinkTask

Task implementation for pushing data.

package org.apache.kafka.connect.sink;

public abstract class SinkTask implements Task {
    // Configuration constants
    public static final String TOPICS_CONFIG = "topics";
    public static final String TOPICS_REGEX_CONFIG = "topics.regex";

    /**
     * Initialize the sink task.
     */
    public void initialize(SinkTaskContext context);

    /**
     * Start the task with configuration.
     */
    @Override
    public abstract void start(Map<String, String> props);

    /**
     * Put records to external system.
     */
    public abstract void put(Collection<SinkRecord> records);

    /**
     * Flush data to external system (optional).
     */
    public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets);

    /**
     * Pre-commit hook (optional).
     */
    public Map<TopicPartition, OffsetAndMetadata> preCommit(
        Map<TopicPartition, OffsetAndMetadata> currentOffsets);

    /**
     * Partition assignment notification (optional).
     */
    public void open(Collection<TopicPartition> partitions);

    /**
     * Partition revocation notification (optional).
     */
    public void close(Collection<TopicPartition> partitions);

    /**
     * Stop the task.
     */
    @Override
    public abstract void stop();

    /**
     * Get task version.
     */
    @Override
    public abstract String version();
}
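
The optional open(), close(), and preCommit() hooks let a task keep per-partition state and control exactly which offsets the framework commits. A minimal sketch (the buffering strategy and class name are illustrative, and the external write is left as a placeholder):

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import java.util.*;

public class BufferingSinkTask extends SinkTask {
    // One buffer per assigned partition
    private final Map<TopicPartition, List<SinkRecord>> buffers = new HashMap<>();
    // Highest offset actually written to the external system, per partition
    private final Map<TopicPartition, OffsetAndMetadata> flushedOffsets = new HashMap<>();

    @Override
    public void open(Collection<TopicPartition> partitions) {
        // Partitions assigned: allocate per-partition state
        for (TopicPartition tp : partitions) {
            buffers.put(tp, new ArrayList<>());
        }
    }

    @Override
    public void close(Collection<TopicPartition> partitions) {
        // Partitions revoked: discard per-partition state
        for (TopicPartition tp : partitions) {
            buffers.remove(tp);
            flushedOffsets.remove(tp);
        }
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            TopicPartition tp = new TopicPartition(record.topic(), record.kafkaPartition());
            buffers.get(tp).add(record);
        }
    }

    @Override
    public Map<TopicPartition, OffsetAndMetadata> preCommit(
            Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        // Write buffered records, then return only offsets that were actually flushed;
        // the framework commits exactly what this method returns.
        for (Map.Entry<TopicPartition, List<SinkRecord>> entry : buffers.entrySet()) {
            for (SinkRecord record : entry.getValue()) {
                writeToExternalSystem(record);
                flushedOffsets.put(entry.getKey(),
                    new OffsetAndMetadata(record.kafkaOffset() + 1));
            }
            entry.getValue().clear();
        }
        return flushedOffsets;
    }

    private void writeToExternalSystem(SinkRecord record) {
        // Placeholder for the real write
    }

    @Override
    public void start(Map<String, String> props) {}

    @Override
    public void stop() {}

    @Override
    public String version() {
        return "1.0.0";
    }
}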

SinkRecord

Record consumed by a sink connector.

package org.apache.kafka.connect.sink;

public class SinkRecord extends ConnectRecord<SinkRecord> {
    /**
     * Create a sink record.
     */
    public SinkRecord(String topic,
                     int partition,
                     Schema keySchema,
                     Object key,
                     Schema valueSchema,
                     Object value,
                     long kafkaOffset);

    public SinkRecord(String topic,
                     int partition,
                     Schema keySchema,
                     Object key,
                     Schema valueSchema,
                     Object value,
                     long kafkaOffset,
                     Long timestamp,
                     TimestampType timestampType);

    public SinkRecord(String topic,
                     int partition,
                     Schema keySchema,
                     Object key,
                     Schema valueSchema,
                     Object value,
                     long kafkaOffset,
                     Long timestamp,
                     TimestampType timestampType,
                     Iterable<Header> headers);

    /**
     * Get Kafka offset.
     */
    public long kafkaOffset();

    /**
     * Get timestamp type.
     */
    public TimestampType timestampType();

    /**
     * Get original topic (before transformations).
     */
    public String originalTopic();

    /**
     * Get original partition.
     */
    public Integer originalKafkaPartition();

    /**
     * Get original offset.
     */
    public Long originalKafkaOffset();
}

Example Sink Connector:

import org.apache.kafka.connect.sink.*;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import java.io.*;
import java.util.*;

public class FileSinkConnector extends SinkConnector {
    private Map<String, String> config;

    @Override
    public void start(Map<String, String> props) {
        this.config = props;
    }

    @Override
    public Class<? extends Task> taskClass() {
        return FileSinkTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> taskConfigs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            taskConfigs.add(new HashMap<>(config));
        }
        return taskConfigs;
    }

    @Override
    public void stop() {
        // Cleanup
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
            .define("file.path", ConfigDef.Type.STRING,
                ConfigDef.Importance.HIGH, "Output file path")
            .define("topics", ConfigDef.Type.LIST,
                ConfigDef.Importance.HIGH, "Topics to consume");
    }

    @Override
    public String version() {
        return "1.0.0";
    }
}

public class FileSinkTask extends SinkTask {
    private String filePath;
    private PrintWriter writer;

    @Override
    public void start(Map<String, String> props) {
        this.filePath = props.get("file.path");

        try {
            this.writer = new PrintWriter(new FileWriter(filePath, true));
        } catch (IOException e) {
            throw new ConnectException("Failed to open file", e);
        }
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            writer.println(record.value());
        }
    }

    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        writer.flush();
    }

    @Override
    public void stop() {
        if (writer != null) {
            writer.close();
        }
    }

    @Override
    public String version() {
        return "1.0.0";
    }
}

Context Interfaces

ConnectorContext

Runtime context for connectors.

package org.apache.kafka.connect.connector;

public interface ConnectorContext {
    /**
     * Request task reconfiguration.
     */
    void requestTaskReconfiguration();

    /**
     * Raise an error.
     */
    void raiseError(Exception e);

    /**
     * Get plugin metrics.
     */
    PluginMetrics pluginMetrics();
}
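
A common use of this context is a background monitor that watches the external system and asks the worker to regenerate task configurations when the source changes. The sketch below assumes a hypothetical discoverTableCount() helper; only requestTaskReconfiguration() and raiseError() come from the Connect API:

import org.apache.kafka.connect.connector.ConnectorContext;

public class SourceChangeMonitor implements Runnable {
    private final ConnectorContext context;
    private volatile boolean running = true;
    private int lastKnownTableCount = -1;

    public SourceChangeMonitor(ConnectorContext context) {
        this.context = context;
    }

    @Override
    public void run() {
        while (running) {
            try {
                int tableCount = discoverTableCount();
                if (lastKnownTableCount != -1 && tableCount != lastKnownTableCount) {
                    // The set of source tables changed: ask the worker to call
                    // taskConfigs() again and restart tasks with a new assignment.
                    context.requestTaskReconfiguration();
                }
                lastKnownTableCount = tableCount;
                Thread.sleep(30_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                running = false;
            } catch (Exception e) {
                // Unrecoverable problem: report it so the worker marks the connector FAILED
                context.raiseError(e);
                running = false;
            }
        }
    }

    public void shutdown() {
        running = false;
    }

    private int discoverTableCount() {
        // Placeholder for querying the external system
        return 0;
    }
}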

SourceConnectorContext

Context for source connectors.

package org.apache.kafka.connect.source;

public interface SourceConnectorContext extends ConnectorContext {
    /**
     * Get offset storage reader.
     */
    OffsetStorageReader offsetStorageReader();
}

SinkConnectorContext

Context for sink connectors.

package org.apache.kafka.connect.sink;

public interface SinkConnectorContext extends ConnectorContext {
    // Marker interface
}

SourceTaskContext

Context for source tasks.

package org.apache.kafka.connect.source;

public interface SourceTaskContext {
    /**
     * Get task configuration.
     */
    Map<String, String> configs();

    /**
     * Get offset storage reader.
     */
    OffsetStorageReader offsetStorageReader();

    /**
     * Get transaction context (for exactly-once).
     */
    TransactionContext transactionContext();

    /**
     * Get plugin metrics.
     */
    PluginMetrics pluginMetrics();
}

SinkTaskContext

Context for sink tasks.

package org.apache.kafka.connect.sink;

public interface SinkTaskContext {
    /**
     * Get task configuration.
     */
    Map<String, String> configs();

    /**
     * Reset consumer offsets.
     */
    void offset(Map<TopicPartition, Long> offsets);
    void offset(TopicPartition tp, long offset);

    /**
     * Set timeout for next poll.
     */
    void timeout(long timeoutMs);

    /**
     * Get assigned partitions.
     */
    Set<TopicPartition> assignment();

    /**
     * Pause consumption.
     */
    void pause(TopicPartition... partitions);

    /**
     * Resume consumption.
     */
    void resume(TopicPartition... partitions);

    /**
     * Request offset commit.
     */
    void requestCommit();

    /**
     * Get errant record reporter (for DLQ).
     */
    ErrantRecordReporter errantRecordReporter();

    /**
     * Get plugin metrics.
     */
    PluginMetrics pluginMetrics();
}
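
These hooks are commonly used for backpressure: when the external system cannot keep up, the task pauses its assigned partitions and resumes them once its backlog drains. A sketch, assuming a simple high/low watermark policy and placeholder write methods:

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import java.util.*;

public class BackpressureSinkTask extends SinkTask {
    private static final int HIGH_WATER_MARK = 10_000;
    private static final int LOW_WATER_MARK = 1_000;

    private final Queue<SinkRecord> pending = new ArrayDeque<>();
    private boolean paused = false;

    @Override
    public void put(Collection<SinkRecord> records) {
        pending.addAll(records);

        // Drain as much as the external system will accept right now
        while (!pending.isEmpty() && externalSystemHasCapacity()) {
            writeToExternalSystem(pending.poll());
        }

        if (!paused && pending.size() > HIGH_WATER_MARK) {
            // Too much buffered: stop receiving new records for all assigned partitions.
            // put() may still be invoked with an empty batch, giving us a chance to keep draining.
            context.pause(context.assignment().toArray(new TopicPartition[0]));
            paused = true;
        } else if (paused && pending.size() < LOW_WATER_MARK) {
            // Backlog has drained: resume consumption
            context.resume(context.assignment().toArray(new TopicPartition[0]));
            paused = false;
        }
    }

    private boolean externalSystemHasCapacity() {
        return true; // placeholder for a real capacity check
    }

    private void writeToExternalSystem(SinkRecord record) {
        // Placeholder for the real write
    }

    @Override
    public void start(Map<String, String> props) {}

    @Override
    public void stop() {}

    @Override
    public String version() {
        return "1.0.0";
    }
}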

Storage APIs

OffsetStorageReader

Read stored offsets for source connectors.

package org.apache.kafka.connect.storage;

public interface OffsetStorageReader {
    /**
     * Get offset for a partition.
     */
    <T> Map<String, Object> offset(Map<String, T> partition);

    /**
     * Get offsets for multiple partitions.
     */
    <T> Map<Map<String, T>, Map<String, Object>> offsets(
        Collection<Map<String, T>> partitions);
}
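
The bulk offsets() call is useful when a connector or task needs to check many source partitions at once, for example to skip inputs that already have committed offsets before dividing work. A sketch with an illustrative helper class; the "file" partition key follows the earlier file examples:

import org.apache.kafka.connect.storage.OffsetStorageReader;
import java.util.*;

public class OffsetAwarePlanner {
    /**
     * Returns the subset of files that have no committed offset yet, using the
     * OffsetStorageReader available from SourceConnectorContext or SourceTaskContext.
     */
    public List<String> filesWithoutOffsets(OffsetStorageReader reader, List<String> files) {
        Collection<Map<String, String>> partitions = new ArrayList<>();
        for (String file : files) {
            partitions.add(Collections.singletonMap("file", file));
        }

        // One lookup for all partitions instead of one offset() call per file
        Map<Map<String, String>, Map<String, Object>> offsets = reader.offsets(partitions);

        List<String> unprocessed = new ArrayList<>();
        for (String file : files) {
            Map<String, Object> offset = offsets.get(Collections.singletonMap("file", file));
            if (offset == null) {
                unprocessed.add(file);
            }
        }
        return unprocessed;
    }
}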

TransactionContext

Control transaction boundaries for exactly-once source connectors.

package org.apache.kafka.connect.source;

public interface TransactionContext {
    /**
     * Commit current transaction.
     */
    void commitTransaction();
    void commitTransaction(SourceRecord record);

    /**
     * Abort current transaction.
     */
    void abortTransaction();
    void abortTransaction(SourceRecord record);
}
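
When a connector runs with transaction.boundary=connector, its tasks obtain the TransactionContext from the SourceTaskContext and demarcate transactions themselves. A sketch, assuming a hypothetical readBatch() helper that returns one logical unit of work per call:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.TransactionContext;
import java.util.*;

public class TransactionalSourceTask extends SourceTask {
    private TransactionContext transactionContext;

    @Override
    public void start(Map<String, String> props) {
        // Non-null only when exactly-once support is enabled with
        // connector-defined transaction boundaries.
        this.transactionContext = context.transactionContext();
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<String> batch = readBatch(); // hypothetical helper returning one logical unit of work
        if (batch == null || batch.isEmpty()) {
            return null;
        }

        List<SourceRecord> records = new ArrayList<>();
        for (String value : batch) {
            records.add(new SourceRecord(
                Collections.singletonMap("source", "example"),
                Collections.singletonMap("position", value.hashCode()),
                "output-topic",
                Schema.STRING_SCHEMA,
                value));
        }

        if (transactionContext != null) {
            // Commit the transaction once the last record of this batch is dispatched
            transactionContext.commitTransaction(records.get(records.size() - 1));
        }
        return records;
    }

    private List<String> readBatch() {
        return Collections.emptyList(); // placeholder
    }

    @Override
    public void stop() {}

    @Override
    public String version() {
        return "1.0.0";
    }
}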

ErrantRecordReporter

Report errant records to dead letter queue.

package org.apache.kafka.connect.sink;

public interface ErrantRecordReporter {
    /**
     * Report a failed record.
     * @return Future that completes when record is written to DLQ
     */
    Future<Void> report(SinkRecord record, Throwable error);
}

Usage Example:

import org.apache.kafka.connect.sink.*;
import org.apache.kafka.connect.errors.ConnectException;
import java.util.*;

public class SafeSinkTask extends SinkTask {
    private SinkTaskContext context;

    @Override
    public void initialize(SinkTaskContext context) {
        this.context = context;
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        ErrantRecordReporter reporter = context.errantRecordReporter();

        for (SinkRecord record : records) {
            try {
                // Process record
                processRecord(record);
            } catch (Exception e) {
                if (reporter != null) {
                    // Send to DLQ
                    reporter.report(record, e);
                } else {
                    // No DLQ configured, throw exception
                    throw new ConnectException("Failed to process record", e);
                }
            }
        }
    }

    private void processRecord(SinkRecord record) throws Exception {
        // Processing logic
    }

    @Override
    public void start(Map<String, String> props) {}

    @Override
    public void stop() {}

    @Override
    public String version() {
        return "1.0.0";
    }
}

Configuration

ConfigDef

Define connector configuration.

import org.apache.kafka.common.config.ConfigDef;

ConfigDef configDef = new ConfigDef()
    .define("connection.url",
        ConfigDef.Type.STRING,
        ConfigDef.NO_DEFAULT_VALUE,
        ConfigDef.Importance.HIGH,
        "Database connection URL")
    .define("batch.size",
        ConfigDef.Type.INT,
        1000,
        ConfigDef.Range.atLeast(1),
        ConfigDef.Importance.MEDIUM,
        "Batch size for writes")
    .define("poll.interval.ms",
        ConfigDef.Type.LONG,
        5000L,
        ConfigDef.Importance.LOW,
        "Poll interval in milliseconds");

Connector Lifecycle

// 1. Worker creates connector instance
Connector connector = new MyConnector();

// 2. Initialize with context
connector.initialize(connectorContext);

// 3. Start with configuration
connector.start(config);

// 4. Get task configurations
List<Map<String, String>> taskConfigs = connector.taskConfigs(maxTasks);

// 5. Worker creates task instances
for (Map<String, String> taskConfig : taskConfigs) {
    Task task = connector.taskClass().getDeclaredConstructor().newInstance();
    task.initialize(taskContext);
    task.start(taskConfig);

    // For source tasks: poll() called repeatedly
    // For sink tasks: put() called with records
}

// 6. Stop tasks
task.stop();

// 7. Stop connector
connector.stop();

Troubleshooting Connect

Common Connector Issues

Issue: Connector Fails to Start

Symptoms:

  • Connector stays in FAILED state
  • Configuration validation errors
  • Tasks not created

Causes:

  • Invalid configuration
  • Missing required properties
  • Resource unavailability (database, filesystem, etc.)
  • Permission issues

Solutions:

import org.apache.kafka.connect.source.*;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.common.config.*;
import java.io.File;
import java.util.*;

public class RobustSourceConnector extends SourceConnector {
    private Map<String, String> config;
    
    @Override
    public void start(Map<String, String> props) {
        // Validate configuration thoroughly
        ConfigDef configDef = config();
        try {
            configDef.parse(props);
        } catch (ConfigException e) {
            throw new ConnectException("Invalid configuration: " + e.getMessage(), e);
        }
        
        // Validate required resources
        String filePath = props.get("file.path");
        if (filePath == null || filePath.isEmpty()) {
            throw new ConnectException("file.path is required");
        }
        
        File file = new File(filePath);
        if (!file.exists()) {
            throw new ConnectException("File does not exist: " + filePath);
        }
        
        if (!file.canRead()) {
            throw new ConnectException("Cannot read file: " + filePath);
        }
        
        this.config = props;
        System.out.println("Connector started successfully");
    }
    
    @Override
    public ConfigDef config() {
        return new ConfigDef()
            .define("file.path",
                ConfigDef.Type.STRING,
                ConfigDef.NO_DEFAULT_VALUE,
                new ConfigDef.NonEmptyString(),
                ConfigDef.Importance.HIGH,
                "Path to input file")
            .define("topic",
                ConfigDef.Type.STRING,
                ConfigDef.NO_DEFAULT_VALUE,
                ConfigDef.Importance.HIGH,
                "Output topic name")
            .define("batch.size",
                ConfigDef.Type.INT,
                100,
                ConfigDef.Range.atLeast(1),
                ConfigDef.Importance.MEDIUM,
                "Records per batch");
    }
    
    // ... other methods
}

Prevention:

  • Implement comprehensive validation
  • Use ConfigDef validators
  • Check resource availability in start()
  • Provide clear error messages

Issue: Task Failures and Retries

Symptoms:

  • Tasks repeatedly failing
  • Connector in RUNNING but tasks FAILED
  • Errors in task logs

Causes:

  • Transient errors (network, resource unavailable)
  • Data format issues
  • External system errors
  • Resource exhaustion

Solutions:

import org.apache.kafka.connect.source.*;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import java.io.*;
import java.util.*;

public class ResilientSourceTask extends SourceTask {
    private String filePath;
    private BufferedReader reader;
    private long position = 0; // character position within the file, used as the source offset
    private int maxRetries = 3;
    
    @Override
    public void start(Map<String, String> props) {
        this.filePath = props.get("file.path");
        this.maxRetries = Integer.parseInt(
            props.getOrDefault("max.retries", "3"));
        
        // Initialize with retry
        initializeWithRetry();
    }
    
    private void initializeWithRetry() {
        int attempt = 0;
        while (attempt < maxRetries) {
            try {
                this.reader = new BufferedReader(new FileReader(filePath));
                System.out.println("Successfully opened file: " + filePath);
                return;
            } catch (IOException e) {
                attempt++;
                System.err.println("Failed to open file (attempt " + attempt + 
                    "): " + e.getMessage());
                
                if (attempt >= maxRetries) {
                    throw new ConnectException("Failed to open file after " + 
                        maxRetries + " attempts", e);
                }
                
                try {
                    Thread.sleep(1000 * attempt); // Exponential backoff
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new ConnectException("Interrupted during retry", ie);
                }
            }
        }
    }
    
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        try {
            String line = reader.readLine();
            if (line == null) {
                return null; // End of file
            }

            // Track the character position ourselves (readLine() strips the newline)
            position += line.length() + 1;

            SourceRecord record = new SourceRecord(
                Collections.singletonMap("file", filePath),
                Collections.singletonMap("position", position),
                "output-topic",
                Schema.STRING_SCHEMA,
                line
            );
            
            return Collections.singletonList(record);
            
        } catch (IOException e) {
            // Throw RetriableException for transient errors
            throw new RetriableException("Error reading file: " + e.getMessage(), e);
        }
    }
    
    @Override
    public void stop() {
        if (reader != null) {
            try {
                reader.close();
            } catch (IOException e) {
                System.err.println("Error closing reader: " + e.getMessage());
            }
        }
    }
    
    @Override
    public String version() {
        return "1.0.0";
    }
}

Prevention:

  • Implement retry logic for transient errors
  • Use RetriableException for recoverable errors
  • Validate data before processing
  • Monitor task health

Edge Cases

Offset Management in Source Connectors

// Source tasks must track offsets so Connect can resume from the last committed
// position after a restart (and to support exactly-once delivery)
public class OffsetTrackingSourceTask extends SourceTask {
    private Map<String, Object> sourcePartition;
    private Map<String, Object> currentOffset;
    private long recordCount = 0;
    
    @Override
    public void start(Map<String, String> props) {
        // Define source partition (identifies this task's data source)
        this.sourcePartition = Collections.singletonMap("file", props.get("file.path"));
        
        // Load last committed offset from Connect
        Map<String, Object> offset = context.offsetStorageReader()
            .offset(sourcePartition);
        
        if (offset != null) {
            this.recordCount = (Long) offset.get("record_count");
            System.out.println("Resuming from offset: " + recordCount);
        } else {
            this.recordCount = 0;
            System.out.println("Starting from beginning");
        }
    }
    
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        // Read data
        String data = readNextRecord();
        if (data == null) {
            return null;
        }
        
        // Increment offset
        recordCount++;
        this.currentOffset = Collections.singletonMap("record_count", recordCount);
        
        // Create source record with offset
        SourceRecord record = new SourceRecord(
            sourcePartition,
            currentOffset, // Offset stored by Connect
            "output-topic",
            Schema.STRING_SCHEMA,
            data
        );
        
        return Collections.singletonList(record);
    }
    
    @Override
    public void commit() throws InterruptedException {
        // Called periodically by Connect to commit offsets
        // Offsets from SourceRecord are committed automatically
        System.out.println("Committing offset: " + recordCount);
    }
    
    private String readNextRecord() {
        // Implementation
        return null;
    }
}

Handling Schema Evolution

import org.apache.kafka.connect.data.*;
import org.apache.kafka.connect.source.*;
import java.util.*;

public class SchemaEvolutionTask extends SourceTask {
    private Schema currentSchema;
    private int schemaVersion = 1;
    // Partition/offset maps for the records this task produces (populated in start() in a real task)
    private Map<String, ?> sourcePartition;
    private Map<String, ?> sourceOffset;
    
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        // Detect schema changes
        Schema newSchema = detectSchema();
        
        if (!newSchema.equals(currentSchema)) {
            // Schema evolved
            schemaVersion++;
            SchemaBuilder builder = SchemaBuilder.struct()
                .name("com.example.Record")
                .version(schemaVersion);
            for (Field field : newSchema.fields()) {
                builder.field(field.name(), field.schema());
            }
            currentSchema = builder.build();
            
            System.out.println("Schema evolved to version: " + schemaVersion);
        }
        
        // Create record with current schema
        Struct value = new Struct(currentSchema);
        // ... populate value
        
        SourceRecord record = new SourceRecord(
            sourcePartition,
            sourceOffset,
            "output-topic",
            currentSchema,
            value
        );
        
        return Collections.singletonList(record);
    }
    
    private Schema detectSchema() {
        // Detect schema from data source
        return SchemaBuilder.struct()
            .field("id", Schema.INT64_SCHEMA)
            .field("name", Schema.STRING_SCHEMA)
            .build();
    }
}

Transaction Boundary Handling

// Source connectors with exactly-once support
public class ExactlyOnceSourceTask extends SourceTask {
    // Partition/offset maps for the records this task produces (initialization omitted in this sketch)
    private Map<String, ?> sourcePartition;
    private Map<String, ?> sourceOffset;

    @Override
    public void start(Map<String, String> props) {
        // Check transaction boundary configuration
        String txnBoundary = props.get(TRANSACTION_BOUNDARY_CONFIG);
        
        if (TransactionBoundary.CONNECTOR.toString().equals(txnBoundary)) {
            // Connector defines transaction boundaries
            System.out.println("Using connector-defined transaction boundaries");
        } else {
            // Framework defines boundaries (POLL or INTERVAL)
            System.out.println("Using framework-defined transaction boundaries: " + 
                txnBoundary);
        }
    }
    
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        // Return batch of records
        // Framework commits transaction after poll() returns
        
        List<SourceRecord> records = new ArrayList<>();
        
        // Read batch of records
        for (int i = 0; i < 100; i++) {
            String data = readRecord();
            if (data == null) break;
            
            records.add(new SourceRecord(
                sourcePartition,
                sourceOffset,
                "output-topic",
                Schema.STRING_SCHEMA,
                data
            ));
        }
        
        return records.isEmpty() ? null : records;
    }
    
    @Override
    public void commitRecord(SourceRecord record, RecordMetadata metadata) 
            throws InterruptedException {
        // Called after each record successfully written
        // Use for fine-grained offset tracking
        System.out.println("Record committed at offset: " + metadata.offset());
    }
    
    private String readRecord() {
        // Implementation
        return null;
    }
}

Best Practices

Connector Configuration Validation

import org.apache.kafka.common.config.*;
import org.apache.kafka.connect.source.SourceConnector;
import java.util.*;

public class ValidatedConnector extends SourceConnector {
    
    @Override
    public ConfigDef config() {
        return new ConfigDef()
            // Required string
            .define("connection.url",
                ConfigDef.Type.STRING,
                ConfigDef.NO_DEFAULT_VALUE,
                new ConfigDef.NonEmptyString(),
                ConfigDef.Importance.HIGH,
                "Database connection URL")
            
            // Integer with range
            .define("batch.size",
                ConfigDef.Type.INT,
                100,
                ConfigDef.Range.between(1, 10000),
                ConfigDef.Importance.MEDIUM,
                "Records per batch")
            
            // String with valid values
            .define("mode",
                ConfigDef.Type.STRING,
                "incremental",
                ConfigDef.ValidString.in("bulk", "incremental", "timestamp"),
                ConfigDef.Importance.MEDIUM,
                "Query mode")
            
            // Password (hidden in logs)
            .define("connection.password",
                ConfigDef.Type.PASSWORD,
                ConfigDef.NO_DEFAULT_VALUE,
                ConfigDef.Importance.HIGH,
                "Database password")
            
            // List of strings
            .define("tables",
                ConfigDef.Type.LIST,
                Collections.emptyList(),
                ConfigDef.Importance.HIGH,
                "Tables to replicate");
    }
    
    @Override
    public Config validate(Map<String, String> connectorConfigs) {
        // Custom validation beyond ConfigDef
        Config config = super.validate(connectorConfigs);
        
        // Add custom validation
        String url = connectorConfigs.get("connection.url");
        if (url != null && !url.startsWith("jdbc:")) {
            config.configValues().stream()
                .filter(cv -> cv.name().equals("connection.url"))
                .forEach(cv -> cv.addErrorMessage("URL must start with 'jdbc:'"));
        }
        
        return config;
    }
}

Error Handling in Tasks

import org.apache.kafka.connect.errors.*;
import org.apache.kafka.connect.sink.*;
import java.io.IOException;
import java.util.Collection;

public class ErrorHandlingSinkTask extends SinkTask {
    
    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            try {
                writeToExternalSystem(record);
            } catch (IOException e) {
                // Retriable error - will retry
                throw new RetriableException(
                    "Failed to write record: " + e.getMessage(), e);
            } catch (IllegalArgumentException e) {
                // Non-retriable error - skip record
                System.err.println("Invalid record, skipping: " + e.getMessage());
                // Log to dead-letter queue
                logToDeadLetterQueue(record, e);
            } catch (Exception e) {
                // Unknown error - fail task
                throw new ConnectException(
                    "Unexpected error: " + e.getMessage(), e);
            }
        }
    }
    
    private void writeToExternalSystem(SinkRecord record) throws IOException {
        // Implementation
    }
    
    private void logToDeadLetterQueue(SinkRecord record, Exception e) {
        // Send to DLQ topic
        System.err.println("DLQ: " + record + " - " + e.getMessage());
    }
}