Describes: mavenpkg:maven/org.apache.kafka/kafka_2.13@4.1.x


tessl/maven-org-apache-kafka--kafka-2-13

tessl install tessl/maven-org-apache-kafka--kafka-2-13@4.1.0

Apache Kafka is a distributed event streaming platform that combines publish-subscribe messaging, durable storage, and real-time stream processing capabilities.

docs/connect/transformations.md

Transformations

Single Message Transformations (SMTs) modify records one at a time as they pass through Connect: after a source connector produces a record and before it is written to Kafka, or after a record is read from Kafka and before it is handed to a sink connector. A minimal wiring sketch follows.
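
A minimal sketch of attaching a single built-in SMT to a connector configuration; the addSource alias and the inserted field name are arbitrary examples:

// Insert a static field into every record value
transforms=addSource
transforms.addSource.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.addSource.static.field=origin
transforms.addSource.static.value=my-connector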

Transformation Interface

package org.apache.kafka.connect.transforms;

public interface Transformation<R extends ConnectRecord<R>>
    extends Configurable, Closeable {

    /**
     * Apply transformation to a record.
     * @param record The record to transform
     * @return Transformed record, or null to filter out
     */
    R apply(R record);

    /**
     * Configuration definition for this transformation.
     */
    ConfigDef config();

    /**
     * Close and cleanup resources.
     */
    void close();
}

Predicate Interface

Conditional execution of transformations.

package org.apache.kafka.connect.transforms.predicates;

public interface Predicate<R extends ConnectRecord<R>>
    extends Configurable, AutoCloseable {

    /**
     * Configuration definition for this predicate.
     */
    ConfigDef config();

    /**
     * Test if predicate matches the record.
     */
    boolean test(R record);

    /**
     * Close and cleanup resources.
     */
    void close();
}

Custom Transformation Example

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.data.*;
import java.util.HashMap;
import java.util.Map;

public class MaskField<R extends ConnectRecord<R>> implements Transformation<R> {
    private static final String FIELD_CONFIG = "field";
    private static final String REPLACEMENT_CONFIG = "replacement";

    private String fieldName;
    private String replacement;

    @Override
    public void configure(Map<String, ?> configs) {
        this.fieldName = (String) configs.get(FIELD_CONFIG);
        this.replacement = (String) configs.get(REPLACEMENT_CONFIG);

        if (replacement == null) {
            replacement = "****";
        }
    }

    @Override
    public R apply(R record) {
        if (record.valueSchema() == null) {
            return applySchemaless(record);
        } else {
            return applyWithSchema(record);
        }
    }

    private R applySchemaless(R record) {
        if (!(record.value() instanceof Map)) {
            return record;
        }

        @SuppressWarnings("unchecked")
        Map<String, Object> value = (Map<String, Object>) record.value();
        Map<String, Object> updatedValue = new HashMap<>(value);
        updatedValue.put(fieldName, replacement);

        return record.newRecord(
            record.topic(),
            record.kafkaPartition(),
            record.keySchema(),
            record.key(),
            null,  // no schema
            updatedValue,
            record.timestamp()
        );
    }

    private R applyWithSchema(R record) {
        Struct value = (Struct) record.value();
        Schema schema = record.valueSchema();

        // Check if field exists
        Field field = schema.field(fieldName);
        if (field == null) {
            return record;
        }

        // Create new struct with masked field
        Struct updatedValue = new Struct(schema);
        for (Field f : schema.fields()) {
            if (f.name().equals(fieldName)) {
                updatedValue.put(f, replacement);
            } else {
                updatedValue.put(f, value.get(f));
            }
        }

        return record.newRecord(
            record.topic(),
            record.kafkaPartition(),
            record.keySchema(),
            record.key(),
            schema,
            updatedValue,
            record.timestamp()
        );
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
            .define(FIELD_CONFIG,
                ConfigDef.Type.STRING,
                ConfigDef.NO_DEFAULT_VALUE,
                ConfigDef.Importance.HIGH,
                "Field name to mask")
            .define(REPLACEMENT_CONFIG,
                ConfigDef.Type.STRING,
                "****",
                ConfigDef.Importance.MEDIUM,
                "Replacement value");
    }

    @Override
    public void close() {
        // Cleanup resources if needed
    }
}

Custom Predicate Example

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import java.util.Map;
import java.util.regex.Pattern;

public class TopicMatches<R extends ConnectRecord<R>> implements Predicate<R> {
    private static final String PATTERN_CONFIG = "pattern";

    private Pattern pattern;

    @Override
    public void configure(Map<String, ?> configs) {
        // Compile the regex once at configuration time rather than on every record
        this.pattern = Pattern.compile((String) configs.get(PATTERN_CONFIG));
    }

    @Override
    public boolean test(R record) {
        return pattern.matcher(record.topic()).matches();
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
            .define(PATTERN_CONFIG,
                ConfigDef.Type.STRING,
                ConfigDef.NO_DEFAULT_VALUE,
                ConfigDef.Importance.HIGH,
                "Regex pattern to match topic names");
    }

    @Override
    public void close() {
        // Cleanup
    }
}

Built-in Transformations

Kafka Connect provides several built-in Single Message Transformations (SMTs) for common data manipulation tasks. Transformations that operate on record contents come in $Key and $Value variants; the suffix on the transformation class name selects whether the record key or the record value is transformed (routing and header transformations apply to the record as a whole). A short sketch of the suffix follows.
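
As an illustration of the suffix, the same ExtractField transformation applied to the key versus the value (the keyId and valueId aliases are arbitrary):

// Extract the "id" field from the record key
transforms=keyId
transforms.keyId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.keyId.field=id

// Extract the "id" field from the record value
transforms=valueId
transforms.valueId.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.valueId.field=id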

Cast

Cast fields or entire key/value to a specific type.

Purpose: Force type conversions for fields, useful for ensuring correct data types in downstream systems or reducing numeric precision.

Supported types: int8, int16, int32, int64, float32, float64, boolean, string. Binary fields can be cast to string (base64 encoded).

Configuration properties:

  • spec (required): List of field:type pairs (e.g., age:int32,score:float64) or a single type to cast entire value
  • replace.null.with.default (default: true): Whether to replace null fields with their default values

Configuration example:

// Cast specific fields
transforms=castTypes
transforms.castTypes.type=org.apache.kafka.connect.transforms.Cast$Value
transforms.castTypes.spec=age:int32,score:float64,active:boolean

// Cast entire value
transforms=castWholeValue
transforms.castWholeValue.type=org.apache.kafka.connect.transforms.Cast$Value
transforms.castWholeValue.spec=int64

Before/After example:

// Before: {"age": "25", "score": "98.5", "active": "true"}
// After:  {"age": 25, "score": 98.5, "active": true}

ExtractField

Extract a single field from a Struct or Map.

Purpose: Simplify records by extracting only one field from a complex structure. Useful for extracting IDs or simple values.

Configuration properties:

  • field (required): Field name to extract
  • replace.null.with.default (default: true): Whether to replace null fields with their default values

Configuration example:

transforms=extractId
transforms.extractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractId.field=id

Before/After example:

// Before: {"id": 123, "name": "Alice", "email": "alice@example.com"}
// After:  123

Flatten

Flatten nested structures into a single-level structure.

Purpose: Convert nested objects into flat structures by concatenating field names with a delimiter. Essential for systems that don't support nested data.

Configuration properties:

  • delimiter (default: "."): Delimiter character to insert between field names

Configuration example:

transforms=flatten
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
transforms.flatten.delimiter=_

Before/After example:

// Before: {"user": {"id": 123, "name": "Alice"}, "timestamp": 1234567890}
// After:  {"user_id": 123, "user_name": "Alice", "timestamp": 1234567890}

HoistField

Wrap data in a Struct or Map with a single named field.

Purpose: Add a wrapping layer to records, useful for adding context or meeting schema requirements of downstream systems.

Configuration properties:

  • field (required): Field name for the single field in the resulting Struct/Map

Configuration example:

transforms=wrapValue
transforms.wrapValue.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.wrapValue.field=payload

Before/After example:

// Before: {"id": 123, "name": "Alice"}
// After:  {"payload": {"id": 123, "name": "Alice"}}

InsertField

Add fields to records using record metadata or static values.

Purpose: Enrich records with metadata (topic, partition, offset, timestamp) or static values for tracking and routing.

Configuration properties:

  • topic.field: Field name for Kafka topic (suffix with ! for required, ? for optional)
  • partition.field: Field name for Kafka partition
  • offset.field: Field name for Kafka offset (sink connectors only)
  • timestamp.field: Field name for record timestamp
  • static.field: Field name for static data
  • static.value: Static value to insert
  • replace.null.with.default (default: true): Whether to replace null fields with their default values

Configuration example:

transforms=addMetadata
transforms.addMetadata.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.addMetadata.topic.field=sourceTopic
transforms.addMetadata.timestamp.field=processedAt
transforms.addMetadata.static.field=source
transforms.addMetadata.static.value=my-connector

Before/After example:

// Before: {"id": 123, "name": "Alice"}
// After:  {"id": 123, "name": "Alice", "sourceTopic": "users",
//          "processedAt": 1234567890000, "source": "my-connector"}

MaskField

Mask sensitive field values with null equivalents or custom replacements.

Purpose: Protect sensitive data (PII, credentials) by replacing with masked values. Essential for data privacy compliance.

Configuration properties:

  • fields (required): List of field names to mask
  • replacement: Custom replacement value (for numeric and string fields only)
  • replace.null.with.default (default: true): Whether to replace null fields with their default values

Masking behavior:

  • Numeric fields: 0
  • Boolean fields: false
  • String fields: empty string ""
  • Date fields: epoch (Jan 1, 1970)
  • Collections: empty list/map

Configuration example:

transforms=maskPII
transforms.maskPII.type=org.apache.kafka.connect.transforms.MaskField$Value
transforms.maskPII.fields=ssn,creditCard,password
transforms.maskPII.replacement=***REDACTED***

Before/After example:

// Before: {"name": "Alice", "ssn": "123-45-6789", "age": 30}
// After:  {"name": "Alice", "ssn": "***REDACTED***", "age": 30}

// Without replacement:
// Before: {"balance": 1000.50, "account": "12345", "active": true}
// After:  {"balance": 0.0, "account": "", "active": false}

ReplaceField

Filter or rename fields in records.

Purpose: Remove unwanted fields, rename fields for compatibility, or select specific fields to include.

Configuration properties:

  • exclude: List of fields to exclude (takes precedence over include)
  • include: List of fields to include (if specified, only these fields are kept)
  • renames: Field rename mappings in format oldName:newName,other:renamed
  • replace.null.with.default (default: true): Whether to replace null fields with their default values

Configuration example:

// Exclude fields
transforms=dropSensitive
transforms.dropSensitive.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.dropSensitive.exclude=password,internalId

// Rename fields
transforms=renameFields
transforms.renameFields.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.renameFields.renames=userId:user_id,createdAt:created_at

// Include only specific fields
transforms=selectFields
transforms.selectFields.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.selectFields.include=id,name,email

Before/After example:

// Exclude example:
// Before: {"id": 123, "name": "Alice", "password": "secret", "email": "alice@example.com"}
// After:  {"id": 123, "name": "Alice", "email": "alice@example.com"}

// Rename example:
// Before: {"userId": 123, "createdAt": 1234567890}
// After:  {"user_id": 123, "created_at": 1234567890}

SetSchemaMetadata

Set schema name and/or version on the record's key or value schema.

Purpose: Tag schemas with names and versions for schema registry compatibility and schema evolution tracking.

Configuration properties:

  • schema.name: Schema name to set
  • schema.version: Schema version to set (integer)
  • replace.null.with.default (default: true): Whether to replace null fields with their default values

Note: At least one of schema.name or schema.version must be specified.

Configuration example:

transforms=setSchema
transforms.setSchema.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
transforms.setSchema.schema.name=com.example.UserRecord
transforms.setSchema.schema.version=2

Before/After example:

// Before: Schema(name=null, version=null, fields=[id, name])
// After:  Schema(name=com.example.UserRecord, version=2, fields=[id, name])

TimestampConverter

Convert timestamps between different formats.

Purpose: Transform timestamps between Unix epoch, strings, and Connect Date/Time/Timestamp types for compatibility with various systems.

Supported formats:

  • unix: Unix epoch time (configurable precision)
  • string: String format with configurable pattern (SimpleDateFormat)
  • Date: Connect Date type (date only, no time)
  • Time: Connect Time type (time only, no date)
  • Timestamp: Connect Timestamp type (date and time)

Configuration properties:

  • field: Field containing the timestamp (empty string for entire key/value)
  • target.type (required): Target format (string, unix, Date, Time, or Timestamp)
  • format: SimpleDateFormat pattern (required when target.type=string)
  • unix.precision (default: milliseconds): Unix timestamp precision (seconds, milliseconds, microseconds, nanoseconds)
  • replace.null.with.default (default: true): Whether to replace null fields with their default values

Configuration example:

// Convert Unix timestamp to string
transforms=formatTimestamp
transforms.formatTimestamp.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.formatTimestamp.field=timestamp
transforms.formatTimestamp.target.type=string
transforms.formatTimestamp.format=yyyy-MM-dd HH:mm:ss

// Convert string to Timestamp
transforms=parseTimestamp
transforms.parseTimestamp.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.parseTimestamp.field=createdAt
transforms.parseTimestamp.target.type=Timestamp
transforms.parseTimestamp.format=yyyy-MM-dd'T'HH:mm:ss.SSS'Z'

// Convert Unix seconds to milliseconds
transforms=convertPrecision
transforms.convertPrecision.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.convertPrecision.field=timestamp
transforms.convertPrecision.target.type=unix
transforms.convertPrecision.unix.precision=milliseconds

Before/After example:

// String conversion:
// Before: {"timestamp": 1609459200000}
// After:  {"timestamp": "2021-01-01 00:00:00"}

// Timestamp parsing (result is a Connect Timestamp, shown here as epoch millis):
// Before: {"createdAt": "2021-01-01T12:30:00.000Z"}
// After:  {"createdAt": 1609504200000}

TimestampRouter

Route records to topics based on timestamp and original topic name.

Purpose: Implement time-based topic routing for sink connectors, enabling time-partitioned storage (e.g., daily tables).

Configuration properties:

  • topic.format (default: "${topic}-${timestamp}"): Format string with ${topic} and ${timestamp} placeholders
  • timestamp.format (default: "yyyyMMdd"): SimpleDateFormat pattern for timestamp

Configuration example:

transforms=routeByDay
transforms.routeByDay.type=org.apache.kafka.connect.transforms.TimestampRouter
transforms.routeByDay.topic.format=${topic}-${timestamp}
transforms.routeByDay.timestamp.format=yyyy-MM-dd

Before/After example:

// Before: topic="events", timestamp=1609459200000 (2021-01-01)
// After:  topic="events-2021-01-01", timestamp=1609459200000

RegexRouter

Route records to topics based on regex pattern matching.

Purpose: Dynamically route records to different topics based on the original topic name using regex patterns.

Configuration properties:

  • regex (required): Regular expression for matching topic names
  • replacement (required): Replacement string (supports regex capture groups)

Configuration example:

transforms=renameTopic
transforms.renameTopic.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.renameTopic.regex=(.*)-(.*)-(.*)
transforms.renameTopic.replacement=$3-$2-$1

// Example: route all dev topics to staging
transforms=routeToStaging
transforms.routeToStaging.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.routeToStaging.regex=dev-(.*)
transforms.routeToStaging.replacement=staging-$1

Before/After example:

// Before: topic="dev-users-events"
// After:  topic="staging-users-events"

ValueToKey

Replace record key with fields extracted from the record value.

Purpose: Derive record keys from value fields, useful when source data doesn't have proper keys or for rekeying data.

Configuration properties:

  • fields (required): List of field names from value to extract as the key
  • replace.null.with.default (default: true): Whether to replace null fields with their default values

Configuration example:

transforms=extractKey
transforms.extractKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.extractKey.fields=userId,tenantId

Before/After example:

// Single field:
// Before: key=null, value={"userId": 123, "name": "Alice"}
// After:  key={"userId": 123}, value={"userId": 123, "name": "Alice"}

// Multiple fields:
// Before: key=null, value={"userId": 123, "tenantId": "acme", "name": "Alice"}
// After:  key={"userId": 123, "tenantId": "acme"}, value={"userId": 123, "tenantId": "acme", "name": "Alice"}

Filter

Drop records from the pipeline.

Purpose: Drop records from the pipeline. Filter removes every record it is applied to (its apply() returns null), so it is only useful together with a predicate that selects which records to drop.

Configuration properties: None (works with predicates)

Configuration example:

// Filter out tombstone records
transforms=filterTombstones
transforms.filterTombstones.type=org.apache.kafka.connect.transforms.Filter
transforms.filterTombstones.predicate=isTombstone

predicates=isTombstone
predicates.isTombstone.type=org.apache.kafka.connect.transforms.predicates.RecordIsTombstone

// Filter records from specific topics
transforms=filterTestTopics
transforms.filterTestTopics.type=org.apache.kafka.connect.transforms.Filter
transforms.filterTestTopics.predicate=isTestTopic

predicates=isTestTopic
predicates.isTestTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.isTestTopic.pattern=test-.*

Before/After example:

// Before: {"id": 123, "name": null} (tombstone record)
// After:  (record dropped)

DropHeaders

Remove specific headers from records.

Purpose: Clean up headers, remove sensitive metadata, or reduce message size by dropping unnecessary headers.

Configuration properties:

  • headers (required): List of header names to remove

Configuration example:

transforms=dropHeaders
transforms.dropHeaders.type=org.apache.kafka.connect.transforms.DropHeaders
transforms.dropHeaders.headers=internalId,debugInfo

Before/After example:

// Before: headers=[internalId: "abc", userId: "123", debugInfo: "test"]
// After:  headers=[userId: "123"]

HeaderFrom

Move or copy fields from key/value to headers.

Purpose: Promote fields to headers for routing, filtering, or metadata without parsing the entire message body.

Configuration properties:

  • fields (required): List of field names to move/copy
  • headers (required): List of header names (must match fields list length)
  • operation (required): Either "move" (remove from key/value) or "copy" (keep in key/value)
  • replace.null.with.default (default: true): Whether to replace null fields with their default values

Configuration example:

// Copy fields to headers
transforms=copyToHeaders
transforms.copyToHeaders.type=org.apache.kafka.connect.transforms.HeaderFrom$Value
transforms.copyToHeaders.fields=userId,tenantId
transforms.copyToHeaders.headers=user-id,tenant-id
transforms.copyToHeaders.operation=copy

// Move fields to headers
transforms=moveToHeaders
transforms.moveToHeaders.type=org.apache.kafka.connect.transforms.HeaderFrom$Value
transforms.moveToHeaders.fields=routingKey
transforms.moveToHeaders.headers=routing-key
transforms.moveToHeaders.operation=move

Before/After example:

// Copy operation (fields userId,tenantId copied to headers user-id,tenant-id):
// Before: value={"userId": 123, "tenantId": "acme", "name": "Alice"}, headers=[]
// After:  value={"userId": 123, "tenantId": "acme", "name": "Alice"}, headers=[user-id: 123, tenant-id: "acme"]

// Move operation (field routingKey moved to header routing-key):
// Before: value={"userId": 123, "routingKey": "us-east", "name": "Alice"}, headers=[]
// After:  value={"userId": 123, "name": "Alice"}, headers=[routing-key: "us-east"]

InsertHeader

Add a static header to all records.

Purpose: Tag records with static metadata for routing, filtering, or tracking connector source.

Configuration properties:

  • header (required): Header name
  • value.literal (required): Static value to set

Configuration example:

transforms=addSourceHeader
transforms.addSourceHeader.type=org.apache.kafka.connect.transforms.InsertHeader
transforms.addSourceHeader.header=source
transforms.addSourceHeader.value.literal=mysql-connector

Before/After example:

// Before: headers=[]
// After:  headers=[source: "mysql-connector"]

Built-in Predicates

Predicates enable conditional transformation execution. A transformation with an associated predicate is applied only when the predicate returns true (or only when it returns false, if negate=true is set on the transformation).

RecordIsTombstone

Match tombstone records (records with null value).

Purpose: Identify delete markers in change data capture (CDC) streams or filter out tombstone records.

Configuration properties: None

Configuration example:

transforms=handleTombstones
transforms.handleTombstones.type=org.apache.kafka.connect.transforms.Filter
transforms.handleTombstones.predicate=isTombstone

predicates=isTombstone
predicates.isTombstone.type=org.apache.kafka.connect.transforms.predicates.RecordIsTombstone

// Or use with negate to apply transformation only to non-tombstone records
transforms=enrichData
transforms.enrichData.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.enrichData.timestamp.field=processedAt
transforms.enrichData.predicate=isTombstone
transforms.enrichData.negate=true

TopicNameMatches

Match records based on topic name pattern.

Purpose: Apply transformations selectively to specific topics using regex patterns.

Configuration properties:

  • pattern (required): Java regular expression for matching topic names

Configuration example:

transforms=maskUserData
transforms.maskUserData.type=org.apache.kafka.connect.transforms.MaskField$Value
transforms.maskUserData.fields=ssn,email
transforms.maskUserData.predicate=isUserTopic

predicates=isUserTopic
predicates.isUserTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.isUserTopic.pattern=users-.*

HasHeaderKey

Match records that have a specific header.

Purpose: Apply transformations based on header presence, useful for conditional processing based on metadata.

Configuration properties:

  • name (required): Header name to check for

Configuration example:

transforms=routeImportant
transforms.routeImportant.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.routeImportant.regex=(.*)
transforms.routeImportant.replacement=$1-priority
transforms.routeImportant.predicate=hasHighPriority

predicates=hasHighPriority
predicates.hasHighPriority.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
predicates.hasHighPriority.name=priority

Using Transformations

Connector Configuration

// Apply transformations in connector configuration
Map<String, String> connectorConfig = new HashMap<>();
connectorConfig.put("connector.class", "com.example.MySourceConnector");
connectorConfig.put("tasks.max", "1");

// Define transformations chain
connectorConfig.put("transforms", "maskPII,addMetadata,filter");

// First transformation: mask PII fields
connectorConfig.put("transforms.maskPII.type",
    "org.apache.kafka.connect.transforms.MaskField$Value");
connectorConfig.put("transforms.maskPII.fields", "ssn,creditCard");

// Second transformation: add metadata
connectorConfig.put("transforms.addMetadata.type",
    "org.apache.kafka.connect.transforms.InsertField$Value");
connectorConfig.put("transforms.addMetadata.static.field", "source");
connectorConfig.put("transforms.addMetadata.static.value", "my-connector");

// Third transformation: filter nulls
connectorConfig.put("transforms.filter.type",
    "org.apache.kafka.connect.transforms.Filter");
connectorConfig.put("transforms.filter.predicate", "isNull");

// Define predicate
connectorConfig.put("predicates", "isNull");
connectorConfig.put("predicates.isNull.type",
    "org.apache.kafka.connect.predicates.RecordIsTombstone");

Conditional Transformations

Use predicates to apply transformations conditionally:

// Apply transformation only to specific topics
connectorConfig.put("transforms", "maskEmail");
connectorConfig.put("transforms.maskEmail.type", "com.example.MaskField");
connectorConfig.put("transforms.maskEmail.field", "email");
connectorConfig.put("transforms.maskEmail.predicate", "userTopic");

// Define predicate
connectorConfig.put("predicates", "userTopic");
connectorConfig.put("predicates.userTopic.type",
    "org.apache.kafka.connect.transforms.predicates.TopicNameMatches");
connectorConfig.put("predicates.userTopic.pattern", "users-.*");

// Optionally negate the predicate to apply the transformation
// to every topic that does NOT match the pattern
connectorConfig.put("transforms.maskEmail.negate", "true");

Advanced Example

Complex transformation pipeline:

import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.*;
import org.apache.kafka.common.config.ConfigDef;
import java.util.*;

public class EnrichRecord<R extends ConnectRecord<R>> implements Transformation<R> {
    private static final String ENRICHMENT_FIELD_CONFIG = "enrichment.field";
    private static final String LOOKUP_URL_CONFIG = "lookup.url";

    private String enrichmentField;
    private String lookupUrl;
    private Map<String, String> cache;

    @Override
    public void configure(Map<String, ?> configs) {
        this.enrichmentField = (String) configs.get(ENRICHMENT_FIELD_CONFIG);
        this.lookupUrl = (String) configs.get(LOOKUP_URL_CONFIG);
        this.cache = new HashMap<>();
    }

    @Override
    public R apply(R record) {
        if (record.value() == null) {
            return record;
        }

        if (record.valueSchema() == null) {
            return enrichSchemaless(record);
        } else {
            return enrichWithSchema(record);
        }
    }

    private R enrichSchemaless(R record) {
        @SuppressWarnings("unchecked")
        Map<String, Object> value = (Map<String, Object>) record.value();
        Map<String, Object> enriched = new HashMap<>(value);

        // Perform lookup and add enrichment
        String key = (String) value.get("id");
        String enrichmentValue = lookup(key);
        enriched.put(enrichmentField, enrichmentValue);

        return record.newRecord(
            record.topic(),
            record.kafkaPartition(),
            record.keySchema(),
            record.key(),
            null,
            enriched,
            record.timestamp()
        );
    }

    private R enrichWithSchema(R record) {
        Struct value = (Struct) record.value();
        Schema originalSchema = record.valueSchema();

        // Build new schema with enrichment field, preserving the original name/version
        // (in production, cache this derived schema instead of rebuilding it per record)
        SchemaBuilder builder = SchemaBuilder.struct()
            .name(originalSchema.name())
            .version(originalSchema.version());
        for (Field field : originalSchema.fields()) {
            builder.field(field.name(), field.schema());
        }
        builder.field(enrichmentField, Schema.OPTIONAL_STRING_SCHEMA);
        Schema enrichedSchema = builder.build();

        // Copy original fields and add enrichment
        Struct enriched = new Struct(enrichedSchema);
        for (Field field : originalSchema.fields()) {
            enriched.put(field.name(), value.get(field));
        }

        String key = value.getString("id");
        String enrichmentValue = lookup(key);
        enriched.put(enrichmentField, enrichmentValue);

        return record.newRecord(
            record.topic(),
            record.kafkaPartition(),
            record.keySchema(),
            record.key(),
            enrichedSchema,
            enriched,
            record.timestamp()
        );
    }

    private String lookup(String key) {
        // Check cache first
        if (cache.containsKey(key)) {
            return cache.get(key);
        }

        // Perform lookup (simplified)
        String result = performHttpLookup(lookupUrl, key);
        cache.put(key, result);

        return result;
    }

    private String performHttpLookup(String url, String key) {
        // HTTP lookup implementation
        return "enriched-" + key;
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
            .define(ENRICHMENT_FIELD_CONFIG,
                ConfigDef.Type.STRING,
                ConfigDef.NO_DEFAULT_VALUE,
                ConfigDef.Importance.HIGH,
                "Field name for enrichment data")
            .define(LOOKUP_URL_CONFIG,
                ConfigDef.Type.STRING,
                ConfigDef.NO_DEFAULT_VALUE,
                ConfigDef.Importance.HIGH,
                "URL for enrichment lookups");
    }

    @Override
    public void close() {
        cache.clear();
    }
}

Testing Transformations

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.data.*;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import java.util.*;

public class MaskFieldTest {
    @Test
    public void testMaskFieldWithSchema() {
        // Create transformation
        MaskField<SourceRecord> transform = new MaskField<>();
        Map<String, Object> config = new HashMap<>();
        config.put("field", "email");
        config.put("replacement", "***@***.com");
        transform.configure(config);

        // Create test record
        Schema schema = SchemaBuilder.struct()
            .field("id", Schema.INT64_SCHEMA)
            .field("name", Schema.STRING_SCHEMA)
            .field("email", Schema.STRING_SCHEMA)
            .build();

        Struct value = new Struct(schema)
            .put("id", 123L)
            .put("name", "John Doe")
            .put("email", "john@example.com");

        SourceRecord record = new SourceRecord(
            null, null, "test-topic", 0,
            schema, value
        );

        // Apply transformation
        SourceRecord transformed = transform.apply(record);

        // Verify
        Struct transformedValue = (Struct) transformed.value();
        assertEquals("***@***.com", transformedValue.getString("email"));
        assertEquals("John Doe", transformedValue.getString("name"));
        assertEquals(123L, transformedValue.getInt64("id").longValue());
    }

    @Test
    public void testMaskFieldSchemaless() {
        // Create transformation
        MaskField<SourceRecord> transform = new MaskField<>();
        Map<String, Object> config = new HashMap<>();
        config.put("field", "password");
        transform.configure(config);

        // Create schemaless record
        Map<String, Object> value = new HashMap<>();
        value.put("username", "john");
        value.put("password", "secret123");

        SourceRecord record = new SourceRecord(
            null, null, "test-topic", 0,
            null, value
        );

        // Apply transformation
        SourceRecord transformed = transform.apply(record);

        // Verify
        @SuppressWarnings("unchecked")
        Map<String, Object> transformedValue = (Map<String, Object>) transformed.value();
        assertEquals("****", transformedValue.get("password"));
        assertEquals("john", transformedValue.get("username"));
    }
}
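
A similar sketch for the custom TopicMatches predicate defined earlier; it exercises only test() and assumes the same JUnit 4 setup:

import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Test;
import static org.junit.Assert.*;
import java.util.*;

public class TopicMatchesTest {
    @Test
    public void testTopicPatternMatching() {
        // Configure the predicate with a topic-name regex
        TopicMatches<SourceRecord> predicate = new TopicMatches<>();
        Map<String, Object> config = new HashMap<>();
        config.put("pattern", "users-.*");
        predicate.configure(config);

        // Records on matching and non-matching topics
        SourceRecord usersRecord = new SourceRecord(
            null, null, "users-events", 0, null, "payload");
        SourceRecord ordersRecord = new SourceRecord(
            null, null, "orders-events", 0, null, "payload");

        assertTrue(predicate.test(usersRecord));
        assertFalse(predicate.test(ordersRecord));

        predicate.close();
    }
}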

Best Practices

Performance

// Cache expensive operations (configure(), config(), and the helper methods
// extractKey, performExpensiveOperation, and applyResult are omitted for brevity)
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CachingTransformation<R extends ConnectRecord<R>>
    implements Transformation<R> {

    private Map<String, String> cache = new ConcurrentHashMap<>();
    private long cacheHits = 0;
    private long cacheMisses = 0;

    @Override
    public R apply(R record) {
        String key = extractKey(record);

        // Count a hit only when the key was already cached before this lookup
        String result = cache.get(key);
        if (result != null) {
            cacheHits++;
        } else {
            cacheMisses++;
            result = performExpensiveOperation(key);
            cache.put(key, result);
        }

        return applyResult(record, result);
    }

    @Override
    public void close() {
        System.out.println("Cache hits: " + cacheHits +
            ", misses: " + cacheMisses);
        cache.clear();
    }
}
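
Unbounded caches can grow without limit on high-cardinality keys. A sketch of a size-bounded LRU cache built on LinkedHashMap (the 10,000-entry limit is an arbitrary choice); a plain LinkedHashMap is sufficient here because each transformation instance is used by a single task thread:

import java.util.LinkedHashMap;
import java.util.Map;

// Evicts the least-recently-accessed entry once the limit is exceeded
public class LruCache<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    public LruCache(int maxEntries) {
        super(16, 0.75f, true);  // access-order iteration enables LRU eviction
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries;
    }
}

// Usage inside a transformation:
// private final Map<String, String> cache = new LruCache<>(10_000);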

Error Handling

@Override
public R apply(R record) {
    try {
        return doTransform(record);
    } catch (Exception e) {
        // Log error with record details (ConnectRecord exposes topic, partition, and timestamp)
        System.err.println("Transformation failed for record: " +
            "topic=" + record.topic() +
            ", partition=" + record.kafkaPartition() +
            ", timestamp=" + record.timestamp());
        e.printStackTrace();

        // Option 1: Return original record
        return record;

        // Option 2: Throw exception to fail pipeline
        // throw new DataException("Transformation failed", e);

        // Option 3: Return null to filter out problematic record
        // return null;
    }
}

Configuration Validation

@Override
public ConfigDef config() {
    return new ConfigDef()
        .define("field",
            ConfigDef.Type.STRING,
            ConfigDef.NO_DEFAULT_VALUE,
            ConfigDef.Importance.HIGH,
            "Field to transform")
        .define("replacement",
            ConfigDef.Type.STRING,
            "****",
            new ConfigDef.Validator() {
                @Override
                public void ensureValid(String name, Object value) {
                    String str = (String) value;
                    if (str == null || str.isEmpty()) {
                        throw new ConfigException(name,
                            value, "Replacement cannot be empty");
                    }
                }
            },
            ConfigDef.Importance.MEDIUM,
            "Replacement value");
}
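
The ConfigDef can also be applied inside configure() so that defaults, type coercion, and validators run in one place. A sketch (a fragment of a transformation class) using AbstractConfig from kafka-clients; the field names match the definition above:

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import java.util.Map;

private static final ConfigDef CONFIG_DEF = new ConfigDef()
    .define("field", ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
        ConfigDef.Importance.HIGH, "Field to transform")
    .define("replacement", ConfigDef.Type.STRING, "****",
        ConfigDef.Importance.MEDIUM, "Replacement value");

private String fieldName;
private String replacement;

@Override
public void configure(Map<String, ?> configs) {
    // Parsing through the ConfigDef applies defaults and runs any validators
    AbstractConfig parsed = new AbstractConfig(CONFIG_DEF, configs);
    this.fieldName = parsed.getString("field");
    this.replacement = parsed.getString("replacement");  // falls back to the "****" default
}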

Transformation Best Practices

Chaining Transformations

// Connect configuration for chaining multiple transformations
import java.util.*;

public class TransformationChainExample {
    
    public Map<String, String> createConnectorConfig() {
        Map<String, String> config = new HashMap<>();
        
        // Define transformation chain
        config.put("transforms", "mask,addField,route");
        
        // Transform 1: Mask sensitive field
        config.put("transforms.mask.type", "com.example.MaskField");
        config.put("transforms.mask.field", "ssn");
        config.put("transforms.mask.replacement", "***-**-****");
        
        // Transform 2: Add timestamp field
        config.put("transforms.addField.type", 
            "org.apache.kafka.connect.transforms.InsertField$Value");
        config.put("transforms.addField.timestamp.field", "processed_at");
        
        // Transform 3: Append "-processed" suffix to the destination topic
        config.put("transforms.route.type", 
            "org.apache.kafka.connect.transforms.RegexRouter");
        config.put("transforms.route.regex", "(.*)");
        config.put("transforms.route.replacement", "$1-processed");
        
        // Apply transformations conditionally
        config.put("predicates", "isImportant");
        config.put("predicates.isImportant.type",
            "org.apache.kafka.connect.transforms.predicates.TopicNameMatches");
        config.put("predicates.isImportant.pattern", "important-.*");
        
        // Apply mask only to important topics
        config.put("transforms.mask.predicate", "isImportant");
        
        return config;
    }
}

Error Handling in Transformations

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.*;
import org.apache.kafka.connect.transforms.Transformation;
import java.util.Map;

public class SafeTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
    
    @Override
    public R apply(R record) {
        try {
            return doTransform(record);
        } catch (Exception e) {
            // Log error but don't fail entire pipeline
            System.err.println("Transformation error for record " + record.key() + 
                ": " + e.getMessage());
            
            // Options:
            // 1. Return original record unchanged
            return record;
            
            // 2. Return null to filter out problematic record
            // return null;
            
            // 3. Throw RetriableException to retry
            // throw new RetriableException("Transformation failed", e);
            
            // 4. Throw ConnectException to fail task
            // throw new ConnectException("Fatal transformation error", e);
        }
    }
    
    private R doTransform(R record) {
        // Transformation logic that may throw exceptions
        return record;
    }

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void close() {}
}
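
Connect's framework-level error handling settings complement whatever the transformation does internally. A sketch of the connector-level options (the dead letter queue settings apply to sink connectors only, and the topic name is an example):

// Tolerate and log conversion/transformation errors instead of failing the task
errors.tolerance=all
errors.log.enable=true
errors.log.include.messages=true

// For sink connectors, route failed records to a dead letter queue topic
errors.deadletterqueue.topic.name=my-connector-dlq
errors.deadletterqueue.context.headers.enable=true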

Edge Cases

Transforming Records with No Schema

// Handle both schema and schemaless records
// (configure(), config(), and close() are omitted for brevity)
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.*;
import org.apache.kafka.connect.transforms.Transformation;
import java.util.HashMap;
import java.util.Map;

public class FlexibleTransformation<R extends ConnectRecord<R>>
        implements Transformation<R> {
    
    @Override
    public R apply(R record) {
        if (record.valueSchema() == null) {
            return transformSchemaless(record);
        } else {
            return transformWithSchema(record);
        }
    }
    
    private R transformSchemaless(R record) {
        // Handle Map-based value
        if (!(record.value() instanceof Map)) {
            System.err.println("Expected Map for schemaless record");
            return record;
        }
        
        @SuppressWarnings("unchecked")
        Map<String, Object> value = new HashMap<>((Map<String, Object>) record.value());
        
        // Transform
        value.put("transformed", true);
        value.put("timestamp", System.currentTimeMillis());
        
        return record.newRecord(
            record.topic(),
            record.kafkaPartition(),
            record.keySchema(),
            record.key(),
            null, // No schema
            value,
            record.timestamp()
        );
    }
    
    private R transformWithSchema(R record) {
        // Handle Struct-based value
        if (!(record.value() instanceof Struct)) {
            System.err.println("Expected Struct for schema-based record");
            return record;
        }
        
        Struct oldValue = (Struct) record.value();
        Schema oldSchema = record.valueSchema();
        
        // Create a new schema with additional fields, copying the existing field definitions
        SchemaBuilder builder = SchemaBuilder.struct()
            .name(oldSchema.name())
            .version(oldSchema.version());
        for (Field field : oldSchema.fields()) {
            builder.field(field.name(), field.schema());
        }
        Schema newSchema = builder
            .field("transformed", Schema.BOOLEAN_SCHEMA)
            .field("timestamp", Schema.INT64_SCHEMA)
            .build();
        
        // Create new struct
        Struct newValue = new Struct(newSchema);
        for (Field field : oldSchema.fields()) {
            newValue.put(field, oldValue.get(field));
        }
        newValue.put("transformed", true);
        newValue.put("timestamp", System.currentTimeMillis());
        
        return record.newRecord(
            record.topic(),
            record.kafkaPartition(),
            record.keySchema(),
            record.key(),
            newSchema,
            newValue,
            record.timestamp()
        );
    }
}

Filtering Records

// Transformation can filter records by returning null
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import java.util.Map;

public class FilteringTransformation<R extends ConnectRecord<R>>
        implements Transformation<R> {
    private String filterField;
    private String filterValue;
    
    @Override
    public void configure(Map<String, ?> configs) {
        this.filterField = (String) configs.get("filter.field");
        this.filterValue = (String) configs.get("filter.value");
    }
    
    @Override
    public R apply(R record) {
        if (record.valueSchema() != null) {
            Struct value = (Struct) record.value();
            Object fieldValue = value.get(filterField);
            
            if (filterValue.equals(String.valueOf(fieldValue))) {
                return null; // Filter out this record
            }
        }
        
        return record; // Pass through
    }
    
    @Override
    public ConfigDef config() {
        return new ConfigDef()
            .define("filter.field", ConfigDef.Type.STRING, 
                ConfigDef.Importance.HIGH, "Field to filter on")
            .define("filter.value", ConfigDef.Type.STRING,
                ConfigDef.Importance.HIGH, "Value to filter");
    }
    
    @Override
    public void close() {}
}