tessl install tessl/maven-org-apache-kafka--kafka-2-13@4.1.0
Apache Kafka is a distributed event streaming platform that combines publish-subscribe messaging, durable storage, and real-time stream processing.
Kafka Connect is a framework for integrating Kafka with external systems through connectors.
Base class for all connectors.
package org.apache.kafka.connect.connector;
public abstract class Connector implements Versioned {
/**
* Initialize the connector with context.
*/
public void initialize(ConnectorContext ctx);
public void initialize(ConnectorContext ctx, List<Map<String, String>> taskConfigs);
/**
* Start the connector with configuration.
*/
public abstract void start(Map<String, String> props);
/**
* Stop the connector.
*/
public abstract void stop();
/**
* Reconfigure the connector (optional).
*/
public void reconfigure(Map<String, String> props);
/**
* Return the Task implementation for this connector.
*/
public abstract Class<? extends Task> taskClass();
/**
* Return task configurations for max number of tasks.
*/
public abstract List<Map<String, String>> taskConfigs(int maxTasks);
/**
* Validate the connector configuration.
*/
public Config validate(Map<String, String> connectorConfigs);
/**
* Return the configuration definition.
*/
public abstract ConfigDef config();
/**
* Get connector context.
*/
protected ConnectorContext context();
/**
* Get connector version (from Versioned interface).
*/
@Override
public String version();
}
Pull data from external systems into Kafka.
package org.apache.kafka.connect.source;
public abstract class SourceConnector extends Connector {
/**
* Exactly-once support level.
*/
public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig);
/**
* Whether connector can define transaction boundaries.
*/
public ConnectorTransactionBoundaries canDefineTransactionBoundaries(
Map<String, String> connectorConfig);
/**
* Alter offsets for this connector.
*/
public boolean alterOffsets(Map<String, String> connectorConfig,
Map<Map<String, ?>, Map<String, ?>> offsets);
/**
* Get source connector context.
*/
@Override
protected SourceConnectorContext context();
}
ExactlyOnceSupport:
package org.apache.kafka.connect.source;
public enum ExactlyOnceSupport {
SUPPORTED,
UNSUPPORTED
}
ConnectorTransactionBoundaries:
package org.apache.kafka.connect.source;
public enum ConnectorTransactionBoundaries {
SUPPORTED,
UNSUPPORTED
}
Task implementation for pulling data.
package org.apache.kafka.connect.source;
public abstract class SourceTask implements Task {
/**
* Initialize the source task.
*/
public void initialize(SourceTaskContext context);
/**
* Start the task with configuration.
*/
@Override
public abstract void start(Map<String, String> props);
/**
* Poll for new records.
* @return List of source records, or null if no data available
*/
public abstract List<SourceRecord> poll() throws InterruptedException;
/**
* Commit offsets (optional).
*/
public void commit() throws InterruptedException;
/**
* Commit individual record metadata (optional).
*/
public void commitRecord(SourceRecord record,
RecordMetadata metadata) throws InterruptedException;
/**
* Stop the task.
*/
@Override
public abstract void stop();
/**
* Get task version.
*/
@Override
public abstract String version();
// Transaction boundary constants
public static final String TRANSACTION_BOUNDARY_CONFIG = "transaction.boundary";
public enum TransactionBoundary {
POLL, // Transaction per poll()
INTERVAL, // Transaction per time interval
CONNECTOR // Connector-defined boundaries
}
}
Record produced by a source connector.
package org.apache.kafka.connect.source;
public class SourceRecord extends ConnectRecord<SourceRecord> {
/**
* Create a source record.
*/
public SourceRecord(Map<String, ?> sourcePartition,
Map<String, ?> sourceOffset,
String topic,
Integer partition,
Schema keySchema,
Object key,
Schema valueSchema,
Object value);
public SourceRecord(Map<String, ?> sourcePartition,
Map<String, ?> sourceOffset,
String topic,
Integer partition,
Schema keySchema,
Object key,
Schema valueSchema,
Object value,
Long timestamp);
public SourceRecord(Map<String, ?> sourcePartition,
Map<String, ?> sourceOffset,
String topic,
Integer partition,
Schema keySchema,
Object key,
Schema valueSchema,
Object value,
Long timestamp,
Iterable<Header> headers);
/**
* Get source partition identifier.
*/
public Map<String, ?> sourcePartition();
/**
* Get source offset.
*/
public Map<String, ?> sourceOffset();
}
Example Source Connector:
import org.apache.kafka.connect.source.*;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.common.config.ConfigDef;
import java.io.*;
import java.util.*;
public class FileSourceConnector extends SourceConnector {
private Map<String, String> config;
@Override
public void start(Map<String, String> props) {
this.config = props;
}
@Override
public Class<? extends Task> taskClass() {
return FileSourceTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
// Create task configs (one per file or split work)
List<Map<String, String>> taskConfigs = new ArrayList<>();
for (int i = 0; i < maxTasks; i++) {
Map<String, String> taskConfig = new HashMap<>(config);
taskConfig.put("task.id", String.valueOf(i));
taskConfigs.add(taskConfig);
}
return taskConfigs;
}
@Override
public void stop() {
// Cleanup
}
@Override
public ConfigDef config() {
return new ConfigDef()
.define("file.path", ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH, "File path to read")
.define("topic", ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH, "Target topic");
}
@Override
public String version() {
return "1.0.0";
}
}
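The connector above can also advertise its exactly-once capabilities to the framework using the ExactlyOnceSupport and ConnectorTransactionBoundaries enums shown earlier. A minimal sketch; the ExactlyOnceFileSourceConnector name and the assumption that file positions are tracked reliably are illustrative:
import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import java.util.Map;
// Hypothetical variant of the FileSourceConnector above that declares exactly-once support.
public class ExactlyOnceFileSourceConnector extends FileSourceConnector {
    @Override
    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
        // File positions are deterministic offsets, so exactly-once delivery can be supported.
        return ExactlyOnceSupport.SUPPORTED;
    }
    @Override
    public ConnectorTransactionBoundaries canDefineTransactionBoundaries(
            Map<String, String> connectorConfig) {
        // This connector does not define its own transaction boundaries;
        // the framework falls back to poll- or interval-based boundaries.
        return ConnectorTransactionBoundaries.UNSUPPORTED;
    }
}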
public class FileSourceTask extends SourceTask {
private String filePath;
private String topic;
private BufferedReader reader;
private long offset;
@Override
public void start(Map<String, String> props) {
this.filePath = props.get("file.path");
this.topic = props.get("topic");
// Load offset
Map<String, Object> partition = Collections.singletonMap("file", filePath);
Map<String, Object> storedOffset = context.offsetStorageReader().offset(partition);
this.offset = storedOffset != null ? (Long) storedOffset.get("position") : 0L;
try {
this.reader = new BufferedReader(new FileReader(filePath));
reader.skip(offset);
} catch (IOException e) {
throw new ConnectException("Failed to open file", e);
}
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
List<SourceRecord> records = new ArrayList<>();
try {
String line;
int count = 0;
while ((line = reader.readLine()) != null && count < 100) {
Map<String, Object> partition = Collections.singletonMap("file", filePath);
Map<String, Object> offset = Collections.singletonMap("position", this.offset);
SourceRecord record = new SourceRecord(
partition,
offset,
topic,
null, // partition
Schema.STRING_SCHEMA,
"key",
Schema.STRING_SCHEMA,
line
);
records.add(record);
this.offset += line.length() + 1; // +1 for newline
count++;
}
} catch (IOException e) {
throw new ConnectException("Error reading file", e);
}
return records.isEmpty() ? null : records;
}
@Override
public void stop() {
try {
if (reader != null) {
reader.close();
}
} catch (IOException e) {
// Log error
}
}
@Override
public String version() {
return "1.0.0";
}
}
Push data from Kafka to external systems.
package org.apache.kafka.connect.sink;
public abstract class SinkConnector extends Connector {
// Configuration constant
public static final String TOPICS_CONFIG = "topics";
/**
* Alter offsets for this connector.
*/
public boolean alterOffsets(Map<String, String> connectorConfig,
Map<TopicPartition, Long> offsets);
/**
* Get sink connector context.
*/
@Override
protected SinkConnectorContext context();
}
Task implementation for pushing data.
package org.apache.kafka.connect.sink;
public abstract class SinkTask implements Task {
// Configuration constants
public static final String TOPICS_CONFIG = "topics";
public static final String TOPICS_REGEX_CONFIG = "topics.regex";
/**
* Initialize the sink task.
*/
public void initialize(SinkTaskContext context);
/**
* Start the task with configuration.
*/
@Override
public abstract void start(Map<String, String> props);
/**
* Put records to external system.
*/
public abstract void put(Collection<SinkRecord> records);
/**
* Flush data to external system (optional).
*/
public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets);
/**
* Pre-commit hook (optional).
*/
public Map<TopicPartition, OffsetAndMetadata> preCommit(
Map<TopicPartition, OffsetAndMetadata> currentOffsets);
/**
* Partition assignment notification (optional).
*/
public void open(Collection<TopicPartition> partitions);
/**
* Partition revocation notification (optional).
*/
public void close(Collection<TopicPartition> partitions);
/**
* Stop the task.
*/
@Override
public abstract void stop();
/**
* Get task version.
*/
@Override
public abstract String version();
}
Record consumed by a sink connector.
package org.apache.kafka.connect.sink;
public class SinkRecord extends ConnectRecord<SinkRecord> {
/**
* Create a sink record.
*/
public SinkRecord(String topic,
int partition,
Schema keySchema,
Object key,
Schema valueSchema,
Object value,
long kafkaOffset);
public SinkRecord(String topic,
int partition,
Schema keySchema,
Object key,
Schema valueSchema,
Object value,
long kafkaOffset,
Long timestamp,
TimestampType timestampType);
public SinkRecord(String topic,
int partition,
Schema keySchema,
Object key,
Schema valueSchema,
Object value,
long kafkaOffset,
Long timestamp,
TimestampType timestampType,
Iterable<Header> headers);
/**
* Get Kafka offset.
*/
public long kafkaOffset();
/**
* Get timestamp type.
*/
public TimestampType timestampType();
/**
* Get original topic (before transformations).
*/
public String originalTopic();
/**
* Get original partition.
*/
public Integer originalKafkaPartition();
/**
* Get original offset.
*/
public Long originalKafkaOffset();
}
Example Sink Connector:
import org.apache.kafka.connect.sink.*;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigDef;
import java.io.*;
import java.util.*;
public class FileSinkConnector extends SinkConnector {
private Map<String, String> config;
@Override
public void start(Map<String, String> props) {
this.config = props;
}
@Override
public Class<? extends Task> taskClass() {
return FileSinkTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> taskConfigs = new ArrayList<>();
for (int i = 0; i < maxTasks; i++) {
taskConfigs.add(new HashMap<>(config));
}
return taskConfigs;
}
@Override
public void stop() {
// Cleanup
}
@Override
public ConfigDef config() {
return new ConfigDef()
.define("file.path", ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH, "Output file path")
.define("topics", ConfigDef.Type.LIST,
ConfigDef.Importance.HIGH, "Topics to consume");
}
@Override
public String version() {
return "1.0.0";
}
}
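The sink connector can also opt in to externally requested offset resets by overriding alterOffsets. A minimal sketch; it assumes the connector keeps no state outside Kafka's committed offsets, so any requested offsets are acceptable (the ResettableFileSinkConnector name is illustrative):
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
// Hypothetical variant of the FileSinkConnector above that accepts external offset alterations.
public class ResettableFileSinkConnector extends FileSinkConnector {
    @Override
    public boolean alterOffsets(Map<String, String> connectorConfig,
                                Map<TopicPartition, Long> offsets) {
        // No external bookkeeping to adjust; signal that the requested offsets can be applied.
        return true;
    }
}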
public class FileSinkTask extends SinkTask {
private String filePath;
private PrintWriter writer;
@Override
public void start(Map<String, String> props) {
this.filePath = props.get("file.path");
try {
this.writer = new PrintWriter(new FileWriter(filePath, true));
} catch (IOException e) {
throw new ConnectException("Failed to open file", e);
}
}
@Override
public void put(Collection<SinkRecord> records) {
for (SinkRecord record : records) {
writer.println(record.value());
}
}
@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
writer.flush();
}
@Override
public void stop() {
if (writer != null) {
writer.close();
}
}
@Override
public String version() {
return "1.0.0";
}
}
Runtime context for connectors.
package org.apache.kafka.connect.connector;
public interface ConnectorContext {
/**
* Request task reconfiguration.
*/
void requestTaskReconfiguration();
/**
* Raise an error.
*/
void raiseError(Exception e);
/**
* Get plugin metrics.
*/
PluginMetrics pluginMetrics();
}
Context for source connectors.
package org.apache.kafka.connect.source;
public interface SourceConnectorContext extends ConnectorContext {
/**
* Get offset storage reader.
*/
OffsetStorageReader offsetStorageReader();
}
Context for sink connectors.
package org.apache.kafka.connect.sink;
public interface SinkConnectorContext extends ConnectorContext {
// Marker interface
}
Context for source tasks.
package org.apache.kafka.connect.source;
public interface SourceTaskContext {
/**
* Get task configuration.
*/
Map<String, String> configs();
/**
* Get offset storage reader.
*/
OffsetStorageReader offsetStorageReader();
/**
* Get transaction context (for exactly-once).
*/
TransactionContext transactionContext();
/**
* Get plugin metrics.
*/
PluginMetrics pluginMetrics();
}
Context for sink tasks.
package org.apache.kafka.connect.sink;
public interface SinkTaskContext {
/**
* Get task configuration.
*/
Map<String, String> configs();
/**
* Reset consumer offsets.
*/
void offset(Map<TopicPartition, Long> offsets);
void offset(TopicPartition tp, long offset);
/**
* Set timeout for next poll.
*/
void timeout(long timeoutMs);
/**
* Get assigned partitions.
*/
Set<TopicPartition> assignment();
/**
* Pause consumption.
*/
void pause(TopicPartition... partitions);
/**
* Resume consumption.
*/
void resume(TopicPartition... partitions);
/**
* Request offset commit.
*/
void requestCommit();
/**
* Get errant record reporter (for DLQ).
*/
ErrantRecordReporter errantRecordReporter();
/**
* Get plugin metrics.
*/
PluginMetrics pluginMetrics();
}
Read stored offsets for source connectors.
package org.apache.kafka.connect.storage;
public interface OffsetStorageReader {
/**
* Get offset for a partition.
*/
<T> Map<String, Object> offset(Map<String, T> partition);
/**
* Get offsets for multiple partitions.
*/
<T> Map<Map<String, T>, Map<String, Object>> offsets(
Collection<Map<String, T>> partitions);
}
Control transaction boundaries for exactly-once source connectors.
package org.apache.kafka.connect.source;
public interface TransactionContext {
/**
* Commit current transaction.
*/
void commitTransaction();
void commitTransaction(SourceRecord record);
/**
* Abort current transaction.
*/
void abortTransaction();
void abortTransaction(SourceRecord record);
}
Report errant records to the dead letter queue.
package org.apache.kafka.connect.sink;
public interface ErrantRecordReporter {
/**
* Report a failed record.
* @return Future that completes when record is written to DLQ
*/
Future<Void> report(SinkRecord record, Throwable error);
}
Usage Example:
import org.apache.kafka.connect.sink.*;
import org.apache.kafka.connect.errors.ConnectException;
import java.util.*;
public class SafeSinkTask extends SinkTask {
private SinkTaskContext context;
@Override
public void initialize(SinkTaskContext context) {
this.context = context;
}
@Override
public void put(Collection<SinkRecord> records) {
ErrantRecordReporter reporter = context.errantRecordReporter();
for (SinkRecord record : records) {
try {
// Process record
processRecord(record);
} catch (Exception e) {
if (reporter != null) {
// Send to DLQ
reporter.report(record, e);
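// Note (illustrative): the returned Future completes once the errant record has been
// reported; it can be stored and awaited (e.g. in flush()/preCommit()) before offsets
// are committed, if stronger delivery guarantees to the DLQ are needed.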
} else {
// No DLQ configured, throw exception
throw new ConnectException("Failed to process record", e);
}
}
}
}
private void processRecord(SinkRecord record) throws Exception {
// Processing logic
}
@Override
public void start(Map<String, String> props) {}
@Override
public void stop() {}
@Override
public String version() {
return "1.0.0";
}
}
Define connector configuration.
import org.apache.kafka.common.config.ConfigDef;
ConfigDef configDef = new ConfigDef()
.define("connection.url",
ConfigDef.Type.STRING,
ConfigDef.NO_DEFAULT_VALUE,
ConfigDef.Importance.HIGH,
"Database connection URL")
.define("batch.size",
ConfigDef.Type.INT,
1000,
ConfigDef.Range.atLeast(1),
ConfigDef.Importance.MEDIUM,
"Batch size for writes")
.define("poll.interval.ms",
ConfigDef.Type.LONG,
5000L,
ConfigDef.Importance.LOW,
"Poll interval in milliseconds");// 1. Worker creates connector instance
Connector connector = new MyConnector();
// 2. Initialize with context
connector.initialize(connectorContext);
// 3. Start with configuration
connector.start(config);
// 4. Get task configurations
List<Map<String, String>> taskConfigs = connector.taskConfigs(maxTasks);
// 5. Worker creates task instances
for (Map<String, String> taskConfig : taskConfigs) {
Task task = connector.taskClass().getDeclaredConstructor().newInstance();
task.initialize(taskContext);
task.start(taskConfig);
// For source tasks: poll() called repeatedly
// For sink tasks: put() called with records
}
// 6. Stop tasks
task.stop();
// 7. Stop connector
connector.stop();
Symptoms:
Causes:
Solutions:
import org.apache.kafka.connect.source.*;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.common.config.*;
import java.io.File;
import java.util.*;
public class RobustSourceConnector extends SourceConnector {
private Map<String, String> config;
@Override
public void start(Map<String, String> props) {
// Validate configuration thoroughly
ConfigDef configDef = config();
try {
configDef.parse(props);
} catch (ConfigException e) {
throw new ConnectException("Invalid configuration: " + e.getMessage(), e);
}
// Validate required resources
String filePath = props.get("file.path");
if (filePath == null || filePath.isEmpty()) {
throw new ConnectException("file.path is required");
}
File file = new File(filePath);
if (!file.exists()) {
throw new ConnectException("File does not exist: " + filePath);
}
if (!file.canRead()) {
throw new ConnectException("Cannot read file: " + filePath);
}
this.config = props;
System.out.println("Connector started successfully");
}
@Override
public ConfigDef config() {
return new ConfigDef()
.define("file.path",
ConfigDef.Type.STRING,
ConfigDef.NO_DEFAULT_VALUE,
new ConfigDef.NonEmptyString(),
ConfigDef.Importance.HIGH,
"Path to input file")
.define("topic",
ConfigDef.Type.STRING,
ConfigDef.NO_DEFAULT_VALUE,
ConfigDef.Importance.HIGH,
"Output topic name")
.define("batch.size",
ConfigDef.Type.INT,
100,
ConfigDef.Range.atLeast(1),
ConfigDef.Importance.MEDIUM,
"Records per batch");
}
// ... other methods
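// Hypothetical addition: if the connector watches its source for external changes,
// it can ask the framework to reconfigure/restart its tasks via the ConnectorContext.
private void onSourceFileChanged() {
    context().requestTaskReconfiguration();
}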
}
Prevention:
Symptoms:
Causes:
Solutions:
import org.apache.kafka.connect.source.*;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import java.io.*;
import java.util.*;
public class ResilientSourceTask extends SourceTask {
private String filePath;
private BufferedReader reader;
private long position; // position in the file, used as the source offset
private int maxRetries = 3;
@Override
public void start(Map<String, String> props) {
this.filePath = props.get("file.path");
this.maxRetries = Integer.parseInt(
props.getOrDefault("max.retries", "3"));
// Initialize with retry
initializeWithRetry();
}
private void initializeWithRetry() {
int attempt = 0;
while (attempt < maxRetries) {
try {
this.reader = new BufferedReader(new FileReader(filePath));
System.out.println("Successfully opened file: " + filePath);
return;
} catch (IOException e) {
attempt++;
System.err.println("Failed to open file (attempt " + attempt +
"): " + e.getMessage());
if (attempt >= maxRetries) {
throw new ConnectException("Failed to open file after " +
maxRetries + " attempts", e);
}
try {
Thread.sleep(1000L * attempt); // Linear backoff between attempts
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ConnectException("Interrupted during retry", ie);
}
}
}
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
try {
String line = reader.readLine();
if (line == null) {
return null; // End of file
}
position += line.length() + 1; // advance past the line and its newline
SourceRecord record = new SourceRecord(
Collections.singletonMap("file", filePath),
Collections.singletonMap("position", position),
"output-topic",
Schema.STRING_SCHEMA,
line
);
return Collections.singletonList(record);
} catch (IOException e) {
// Throw RetriableException for transient errors
throw new RetriableException("Error reading file: " + e.getMessage(), e);
}
}
@Override
public void stop() {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
System.err.println("Error closing reader: " + e.getMessage());
}
}
}
@Override
public String version() {
return "1.0.0";
}
}
Prevention:
// Source tasks must track offsets so they can resume after a restart
// (offset tracking is also a prerequisite for exactly-once delivery)
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.*;
import java.util.*;
public class OffsetTrackingSourceTask extends SourceTask {
private Map<String, Object> sourcePartition;
private Map<String, Object> currentOffset;
private long recordCount = 0;
@Override
public void start(Map<String, String> props) {
// Define source partition (identifies this task's data source)
this.sourcePartition = Collections.singletonMap("file", props.get("file.path"));
// Load last committed offset from Connect
Map<String, Object> offset = context.offsetStorageReader()
.offset(sourcePartition);
if (offset != null) {
this.recordCount = (Long) offset.get("record_count");
System.out.println("Resuming from offset: " + recordCount);
} else {
this.recordCount = 0;
System.out.println("Starting from beginning");
}
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
// Read data
String data = readNextRecord();
if (data == null) {
return null;
}
// Increment offset
recordCount++;
this.currentOffset = Collections.singletonMap("record_count", recordCount);
// Create source record with offset
SourceRecord record = new SourceRecord(
sourcePartition,
currentOffset, // Offset stored by Connect
"output-topic",
Schema.STRING_SCHEMA,
data
);
return Collections.singletonList(record);
}
@Override
public void commit() throws InterruptedException {
// Called periodically by Connect to commit offsets
// Offsets from SourceRecord are committed automatically
System.out.println("Committing offset: " + recordCount);
}
private String readNextRecord() {
// Implementation
return null;
}
}
import org.apache.kafka.connect.data.*;
import org.apache.kafka.connect.source.*;
import java.util.*;
public class SchemaEvolutionTask extends SourceTask {
private Schema currentSchema;
private int schemaVersion = 1;
private Map<String, Object> sourcePartition; // set in start()
private Map<String, Object> sourceOffset;    // updated as records are produced
@Override
public List<SourceRecord> poll() throws InterruptedException {
// Detect schema changes
Schema newSchema = detectSchema();
if (!newSchema.equals(currentSchema)) {
// Schema evolved
schemaVersion++;
SchemaBuilder builder = SchemaBuilder.struct()
.name("com.example.Record")
.version(schemaVersion);
for (Field field : newSchema.fields()) {
builder.field(field.name(), field.schema());
}
currentSchema = builder.build();
System.out.println("Schema evolved to version: " + schemaVersion);
}
// Create record with current schema
Struct value = new Struct(currentSchema);
// ... populate value
SourceRecord record = new SourceRecord(
sourcePartition,
sourceOffset,
"output-topic",
currentSchema,
value
);
return Collections.singletonList(record);
}
private Schema detectSchema() {
// Detect schema from data source
return SchemaBuilder.struct()
.field("id", Schema.INT64_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.build();
}
}
// Source tasks with exactly-once support
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.*;
import java.util.*;
public class ExactlyOnceSourceTask extends SourceTask {
private Map<String, Object> sourcePartition; // identifies this task's data source
private Map<String, Object> sourceOffset;    // updated as records are read
@Override
public void start(Map<String, String> props) {
// Check transaction boundary configuration
String txnBoundary = props.get(TRANSACTION_BOUNDARY_CONFIG);
if (TransactionBoundary.CONNECTOR.toString().equals(txnBoundary)) {
// Connector defines transaction boundaries
System.out.println("Using connector-defined transaction boundaries");
} else {
// Framework defines boundaries (POLL or INTERVAL)
System.out.println("Using framework-defined transaction boundaries: " +
txnBoundary);
}
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
// Return batch of records
// Framework commits transaction after poll() returns
List<SourceRecord> records = new ArrayList<>();
// Read batch of records
for (int i = 0; i < 100; i++) {
String data = readRecord();
if (data == null) break;
records.add(new SourceRecord(
sourcePartition,
sourceOffset,
"output-topic",
Schema.STRING_SCHEMA,
data
));
}
return records.isEmpty() ? null : records;
}
@Override
public void commitRecord(SourceRecord record, RecordMetadata metadata)
throws InterruptedException {
// Called after each record successfully written
// Use for fine-grained offset tracking
System.out.println("Record committed at offset: " + metadata.offset());
}
private String readRecord() {
// Implementation
return null;
}
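// Hypothetical helper for connector-defined transaction boundaries
// (transaction.boundary=connector). The TransactionContext comes from the
// SourceTaskContext and is null when exactly-once support is not enabled.
private void maybeCommitTransaction(SourceRecord lastRecordInBatch) {
    TransactionContext txn = context.transactionContext();
    if (txn != null) {
        // Commit the open transaction once the given record has been dispatched.
        txn.commitTransaction(lastRecordInBatch);
    }
}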
}
import org.apache.kafka.common.config.*;
import org.apache.kafka.connect.source.SourceConnector;
import java.util.*;
public class ValidatedConnector extends SourceConnector {
@Override
public ConfigDef config() {
return new ConfigDef()
// Required string
.define("connection.url",
ConfigDef.Type.STRING,
ConfigDef.NO_DEFAULT_VALUE,
new ConfigDef.NonEmptyString(),
ConfigDef.Importance.HIGH,
"Database connection URL")
// Integer with range
.define("batch.size",
ConfigDef.Type.INT,
100,
ConfigDef.Range.between(1, 10000),
ConfigDef.Importance.MEDIUM,
"Records per batch")
// String with valid values
.define("mode",
ConfigDef.Type.STRING,
"incremental",
ConfigDef.ValidString.in("bulk", "incremental", "timestamp"),
ConfigDef.Importance.MEDIUM,
"Query mode")
// Password (hidden in logs)
.define("connection.password",
ConfigDef.Type.PASSWORD,
ConfigDef.NO_DEFAULT_VALUE,
ConfigDef.Importance.HIGH,
"Database password")
// List of strings
.define("tables",
ConfigDef.Type.LIST,
Collections.emptyList(),
ConfigDef.Importance.HIGH,
"Tables to replicate");
}
@Override
public Config validate(Map<String, String> connectorConfigs) {
// Custom validation beyond ConfigDef
Config config = super.validate(connectorConfigs);
// Add custom validation
String url = connectorConfigs.get("connection.url");
if (url != null && !url.startsWith("jdbc:")) {
config.configValues().stream()
.filter(cv -> cv.name().equals("connection.url"))
.forEach(cv -> cv.addErrorMessage("URL must start with 'jdbc:'"));
}
return config;
}
}
import org.apache.kafka.connect.errors.*;
import org.apache.kafka.connect.sink.*;
import java.io.IOException;
import java.util.Collection;
public class ErrorHandlingSinkTask extends SinkTask {
@Override
public void put(Collection<SinkRecord> records) {
for (SinkRecord record : records) {
try {
writeToExternalSystem(record);
} catch (IOException e) {
// Retriable error - will retry
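// Optionally call context.timeout(...) here (context is the protected SinkTaskContext
// field on SinkTask), e.g. context.timeout(5000), to control how long Connect waits
// before redelivering this batch.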
throw new RetriableException(
"Failed to write record: " + e.getMessage(), e);
} catch (IllegalArgumentException e) {
// Non-retriable error - skip record
System.err.println("Invalid record, skipping: " + e.getMessage());
// Log to dead-letter queue
logToDeadLetterQueue(record, e);
} catch (Exception e) {
// Unknown error - fail task
throw new ConnectException(
"Unexpected error: " + e.getMessage(), e);
}
}
}
private void writeToExternalSystem(SinkRecord record) throws IOException {
// Implementation
}
private void logToDeadLetterQueue(SinkRecord record, Exception e) {
// Send to the DLQ topic (in practice via context.errantRecordReporter(); see SafeSinkTask above)
System.err.println("DLQ: " + record + " - " + e.getMessage());
}
}
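The open(), close(), and preCommit() hooks are not exercised by the examples above. Below is a minimal sketch of a sink task that buffers records per partition and only lets Connect commit offsets for data that has actually been written to the external system; the BufferingSinkTask name and the writeBatch() helper are illustrative:
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import java.util.*;
// Hypothetical sink task: buffers per partition and commits only flushed offsets.
public class BufferingSinkTask extends SinkTask {
    private final Map<TopicPartition, List<SinkRecord>> buffers = new HashMap<>();
    private final Map<TopicPartition, OffsetAndMetadata> flushedOffsets = new HashMap<>();
    @Override
    public void open(Collection<TopicPartition> partitions) {
        // Called when partitions are assigned: allocate per-partition buffers.
        for (TopicPartition tp : partitions) {
            buffers.put(tp, new ArrayList<>());
        }
    }
    @Override
    public void close(Collection<TopicPartition> partitions) {
        // Called on revocation: drop state for partitions this task no longer owns.
        for (TopicPartition tp : partitions) {
            buffers.remove(tp);
            flushedOffsets.remove(tp);
        }
    }
    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            // Use the original topic/partition so commits stay correct even if a
            // transformation rewrote the record's topic or partition.
            TopicPartition tp = new TopicPartition(record.originalTopic(),
                                                   record.originalKafkaPartition());
            buffers.computeIfAbsent(tp, k -> new ArrayList<>()).add(record);
        }
    }
    @Override
    public Map<TopicPartition, OffsetAndMetadata> preCommit(
            Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        // Flush buffered records, then tell Connect which offsets are safe to commit.
        for (Map.Entry<TopicPartition, List<SinkRecord>> entry : buffers.entrySet()) {
            List<SinkRecord> buffer = entry.getValue();
            if (!buffer.isEmpty()) {
                writeBatch(buffer); // hypothetical external write
                SinkRecord last = buffer.get(buffer.size() - 1);
                flushedOffsets.put(entry.getKey(),
                    new OffsetAndMetadata(last.originalKafkaOffset() + 1));
                buffer.clear();
            }
        }
        return flushedOffsets;
    }
    private void writeBatch(List<SinkRecord> records) {
        // Write the batch to the external system here.
    }
    @Override
    public void start(Map<String, String> props) {}
    @Override
    public void stop() {}
    @Override
    public String version() {
        return "1.0.0";
    }
}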