tessl install tessl/maven-org-apache-kafka--kafka-2-13@4.1.0

Apache Kafka is a distributed event streaming platform that combines publish-subscribe messaging, durable storage, and real-time stream processing capabilities.
Single Message Transformations (SMTs) modify records as they pass through Connect.
package org.apache.kafka.connect.transforms;
public interface Transformation<R extends ConnectRecord<R>>
extends Configurable, Closeable {
/**
* Apply transformation to a record.
* @param record The record to transform
* @return Transformed record, or null to filter out
*/
R apply(R record);
/**
* Configuration definition for this transformation.
*/
ConfigDef config();
/**
* Close and cleanup resources.
*/
void close();
}

Conditional execution of transformations.
package org.apache.kafka.connect.transforms.predicates;
public interface Predicate<R extends ConnectRecord<R>>
extends Configurable, AutoCloseable {
/**
* Configuration definition for this predicate.
*/
ConfigDef config();
/**
* Test if predicate matches the record.
*/
boolean test(R record);
/**
* Close and cleanup resources.
*/
void close();
}

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.data.*;
import java.util.HashMap;
import java.util.Map;
public class MaskField<R extends ConnectRecord<R>> implements Transformation<R> {
private static final String FIELD_CONFIG = "field";
private static final String REPLACEMENT_CONFIG = "replacement";
private String fieldName;
private String replacement;
@Override
public void configure(Map<String, ?> configs) {
this.fieldName = (String) configs.get(FIELD_CONFIG);
this.replacement = (String) configs.get(REPLACEMENT_CONFIG);
if (replacement == null) {
replacement = "****";
}
}
@Override
public R apply(R record) {
if (record.valueSchema() == null) {
return applySchemaless(record);
} else {
return applyWithSchema(record);
}
}
private R applySchemaless(R record) {
if (!(record.value() instanceof Map)) {
return record;
}
@SuppressWarnings("unchecked")
Map<String, Object> value = (Map<String, Object>) record.value();
Map<String, Object> updatedValue = new HashMap<>(value);
updatedValue.put(fieldName, replacement);
return record.newRecord(
record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
null, // no schema
updatedValue,
record.timestamp()
);
}
private R applyWithSchema(R record) {
Struct value = (Struct) record.value();
Schema schema = record.valueSchema();
// Check if field exists
Field field = schema.field(fieldName);
if (field == null) {
return record;
}
// Create new struct with masked field
Struct updatedValue = new Struct(schema);
for (Field f : schema.fields()) {
if (f.name().equals(fieldName)) {
updatedValue.put(f, replacement);
} else {
updatedValue.put(f, value.get(f));
}
}
return record.newRecord(
record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
schema,
updatedValue,
record.timestamp()
);
}
@Override
public ConfigDef config() {
return new ConfigDef()
.define(FIELD_CONFIG,
ConfigDef.Type.STRING,
ConfigDef.NO_DEFAULT_VALUE,
ConfigDef.Importance.HIGH,
"Field name to mask")
.define(REPLACEMENT_CONFIG,
ConfigDef.Type.STRING,
"****",
ConfigDef.Importance.MEDIUM,
"Replacement value");
}
@Override
public void close() {
// Cleanup resources if needed
}
}

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import java.util.Map;
import java.util.regex.Pattern;
public class TopicMatches<R extends ConnectRecord<R>> implements Predicate<R> {
private static final String PATTERN_CONFIG = "pattern";
private Pattern pattern;
@Override
public void configure(Map<String, ?> configs) {
// Compile the regex once at configure time instead of on every record
this.pattern = Pattern.compile((String) configs.get(PATTERN_CONFIG));
}
@Override
public boolean test(R record) {
return pattern.matcher(record.topic()).matches();
}
@Override
public ConfigDef config() {
return new ConfigDef()
.define(PATTERN_CONFIG,
ConfigDef.Type.STRING,
ConfigDef.NO_DEFAULT_VALUE,
ConfigDef.Importance.HIGH,
"Regex pattern to match topic names");
}
@Override
public void close() {
// Cleanup
}
}

Kafka Connect provides several built-in Single Message Transformations (SMTs) for common data manipulation tasks. Each transformation can operate on the record key or value, specified by using either the $Key or $Value suffix in the transformation class name.
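For example, the same MaskField SMT can target the key or the value purely through the configured class name; a minimal sketch (the aliases and field names below are illustrative):

transforms=maskKey,maskValue
transforms.maskKey.type=org.apache.kafka.connect.transforms.MaskField$Key
transforms.maskKey.fields=userId
transforms.maskValue.type=org.apache.kafka.connect.transforms.MaskField$Value
transforms.maskValue.fields=ssn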
Cast fields or entire key/value to a specific type.
Purpose: Force type conversions for fields, useful for ensuring correct data types in downstream systems or reducing numeric precision.
Supported types: int8, int16, int32, int64, float32, float64, boolean, string. Binary fields can be cast to string (base64 encoded).
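As a small illustration of the binary case, a bytes field cast to string comes out base64 encoded (the castBinary alias and payload field are hypothetical):

transforms=castBinary
transforms.castBinary.type=org.apache.kafka.connect.transforms.Cast$Value
transforms.castBinary.spec=payload:string
// Before: {"payload": <raw bytes of "hello">}
// After: {"payload": "aGVsbG8="}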
Configuration properties:
spec (required): List of field:type pairs (e.g., age:int32,score:float64) or a single type to cast the entire value
replace.null.with.default (default: true): Whether to replace null fields with their default values
Configuration example:
// Cast specific fields
transforms=castTypes
transforms.castTypes.type=org.apache.kafka.connect.transforms.Cast$Value
transforms.castTypes.spec=age:int32,score:float64,active:boolean
// Cast entire value
transforms=castWholeValue
transforms.castWholeValue.type=org.apache.kafka.connect.transforms.Cast$Value
transforms.castWholeValue.spec=int64

Before/After example:
// Before: {"age": "25", "score": "98.5", "active": "true"}
// After: {"age": 25, "score": 98.5, "active": true}Extract a single field from a Struct or Map.
Purpose: Simplify records by extracting only one field from a complex structure. Useful for extracting IDs or simple values.
Configuration properties:
field (required): Field name to extract
replace.null.with.default (default: true): Whether to replace null fields with their default values
Configuration example:
transforms=extractId
transforms.extractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractId.field=id

Before/After example:
// Before: {"id": 123, "name": "Alice", "email": "alice@example.com"}
// After: 123

Flatten nested structures into a single-level structure.
Purpose: Convert nested objects into flat structures by concatenating field names with a delimiter. Essential for systems that don't support nested data.
Configuration properties:
delimiter (default: "."): Delimiter character to insert between field names
Configuration example:
transforms=flatten
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
transforms.flatten.delimiter=_

Before/After example:
// Before: {"user": {"id": 123, "name": "Alice"}, "timestamp": 1234567890}
// After: {"user_id": 123, "user_name": "Alice", "timestamp": 1234567890}Wrap data in a Struct or Map with a single named field.
Purpose: Add a wrapping layer to records, useful for adding context or meeting schema requirements of downstream systems.
Configuration properties:
field (required): Field name for the single field in the resulting Struct/Map
Configuration example:
transforms=wrapValue
transforms.wrapValue.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.wrapValue.field=payload

Before/After example:
// Before: {"id": 123, "name": "Alice"}
// After: {"payload": {"id": 123, "name": "Alice"}}Add fields to records using record metadata or static values.
Purpose: Enrich records with metadata (topic, partition, offset, timestamp) or static values for tracking and routing.
Configuration properties:
topic.field: Field name for Kafka topic (suffix with ! for required, ? for optional)
partition.field: Field name for Kafka partition
offset.field: Field name for Kafka offset (sink connectors only)
timestamp.field: Field name for record timestamp
static.field: Field name for static data
static.value: Static value to insert
replace.null.with.default (default: true): Whether to replace null fields with their default values
Configuration example:
transforms=addMetadata
transforms.addMetadata.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.addMetadata.topic.field=sourceTopic
transforms.addMetadata.timestamp.field=processedAt
transforms.addMetadata.static.field=source
transforms.addMetadata.static.value=my-connector

Before/After example:
// Before: {"id": 123, "name": "Alice"}
// After: {"id": 123, "name": "Alice", "sourceTopic": "users",
// "processedAt": 1234567890000, "source": "my-connector"}Mask sensitive field values with null equivalents or custom replacements.
Purpose: Protect sensitive data (PII, credentials) by replacing with masked values. Essential for data privacy compliance.
Configuration properties:
fields (required): List of field names to mask
replacement: Custom replacement value (for numeric and string fields only)
replace.null.with.default (default: true): Whether to replace null fields with their default values
Masking behavior: Without a custom replacement, each field is set to a null-equivalent value for its type: 0 for numeric fields, an empty string for string fields, false for booleans, and empty collections for arrays and maps.
Configuration example:
transforms=maskPII
transforms.maskPII.type=org.apache.kafka.connect.transforms.MaskField$Value
transforms.maskPII.fields=ssn,creditCard,password
transforms.maskPII.replacement=***REDACTED***

Before/After example:
// Before: {"name": "Alice", "ssn": "123-45-6789", "age": 30}
// After: {"name": "Alice", "ssn": "***REDACTED***", "age": 30}
// Without replacement:
// Before: {"balance": 1000.50, "account": "12345", "active": true}
// After: {"balance": 0.0, "account": "", "active": false}Filter or rename fields in records.
Purpose: Remove unwanted fields, rename fields for compatibility, or select specific fields to include.
Configuration properties:
exclude: List of fields to exclude (takes precedence over include)
include: List of fields to include (if specified, only these fields are kept)
renames: Field rename mappings in the format oldName:newName,other:renamed
replace.null.with.default (default: true): Whether to replace null fields with their default values
Configuration example:
// Exclude fields
transforms=dropSensitive
transforms.dropSensitive.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.dropSensitive.exclude=password,internalId
// Rename fields
transforms=renameFields
transforms.renameFields.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.renameFields.renames=userId:user_id,createdAt:created_at
// Include only specific fields
transforms=selectFields
transforms.selectFields.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.selectFields.include=id,name,email

Before/After example:
// Exclude example:
// Before: {"id": 123, "name": "Alice", "password": "secret", "email": "alice@example.com"}
// After: {"id": 123, "name": "Alice", "email": "alice@example.com"}
// Rename example:
// Before: {"userId": 123, "createdAt": 1234567890}
// After: {"user_id": 123, "created_at": 1234567890}Set schema name and/or version on the record's key or value schema.
Purpose: Tag schemas with names and versions for schema registry compatibility and schema evolution tracking.
Configuration properties:
schema.name: Schema name to set
schema.version: Schema version to set (integer)
replace.null.with.default (default: true): Whether to replace null fields with their default values
Note: At least one of schema.name or schema.version must be specified.
Configuration example:
transforms=setSchema
transforms.setSchema.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
transforms.setSchema.schema.name=com.example.UserRecord
transforms.setSchema.schema.version=2

Before/After example:
// Before: Schema(name=null, version=null, fields=[id, name])
// After: Schema(name=com.example.UserRecord, version=2, fields=[id, name])

Convert timestamps between different formats.
Purpose: Transform timestamps between Unix epoch, strings, and Connect Date/Time/Timestamp types for compatibility with various systems.
Supported formats:
unix: Unix epoch time (configurable precision)
string: String format with configurable pattern (SimpleDateFormat)
Date: Connect Date type (date only, no time)
Time: Connect Time type (time only, no date)
Timestamp: Connect Timestamp type (date and time)
Configuration properties:
field: Field containing the timestamp (empty string for entire key/value)
target.type (required): Target format (string, unix, Date, Time, or Timestamp)
format: SimpleDateFormat pattern (required when target.type=string)
unix.precision (default: milliseconds): Unix timestamp precision (seconds, milliseconds, microseconds, nanoseconds)
replace.null.with.default (default: true): Whether to replace null fields with their default values
Configuration example:
// Convert Unix timestamp to string
transforms=formatTimestamp
transforms.formatTimestamp.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.formatTimestamp.field=timestamp
transforms.formatTimestamp.target.type=string
transforms.formatTimestamp.format=yyyy-MM-dd HH:mm:ss
// Convert string to Timestamp
transforms=parseTimestamp
transforms.parseTimestamp.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.parseTimestamp.field=createdAt
transforms.parseTimestamp.target.type=Timestamp
transforms.parseTimestamp.format=yyyy-MM-dd'T'HH:mm:ss.SSS'Z'
// Emit Unix epoch timestamps with millisecond precision
transforms=convertPrecision
transforms.convertPrecision.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.convertPrecision.field=timestamp
transforms.convertPrecision.target.type=unix
transforms.convertPrecision.unix.precision=milliseconds

Before/After example:
// String conversion:
// Before: {"timestamp": 1609459200000}
// After: {"timestamp": "2021-01-01 00:00:00"}
// Timestamp parsing:
// Before: {"createdAt": "2021-01-01T12:30:00.000Z"}
// After: {"createdAt": 1609502400000}Route records to topics based on timestamp and original topic name.
Purpose: Implement time-based topic routing for sink connectors, enabling time-partitioned storage (e.g., daily tables).
Configuration properties:
topic.format (default: "${topic}-${timestamp}"): Format string with ${topic} and ${timestamp} placeholders
timestamp.format (default: "yyyyMMdd"): SimpleDateFormat pattern for the timestamp
Configuration example:
transforms=routeByDay
transforms.routeByDay.type=org.apache.kafka.connect.transforms.TimestampRouter
transforms.routeByDay.topic.format=${topic}-${timestamp}
transforms.routeByDay.timestamp.format=yyyy-MM-dd

Before/After example:
// Before: topic="events", timestamp=1609459200000 (2021-01-01)
// After: topic="events-2021-01-01", timestamp=1609459200000Route records to topics based on regex pattern matching.
Purpose: Dynamically route records to different topics based on the original topic name using regex patterns.
Configuration properties:
regex (required): Regular expression for matching topic names
replacement (required): Replacement string (supports regex capture groups)
Configuration example:
transforms=renameTopic
transforms.renameTopic.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.renameTopic.regex=(.*)-(.*)-(.*)
transforms.renameTopic.replacement=$3-$2-$1
// Example: route all dev topics to staging
transforms=routeToStaging
transforms.routeToStaging.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.routeToStaging.regex=dev-(.*)
transforms.routeToStaging.replacement=staging-$1

Before/After example:
// Before: topic="dev-users-events"
// After: topic="staging-users-events"Replace record key with fields extracted from the record value.
Purpose: Derive record keys from value fields, useful when source data doesn't have proper keys or for rekeying data.
Configuration properties:
fields (required): List of field names from the value to extract as the key
replace.null.with.default (default: true): Whether to replace null fields with their default values
Configuration example:
transforms=extractKey
transforms.extractKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.extractKey.fields=userId,tenantId

Before/After example:
// Single field:
// Before: key=null, value={"userId": 123, "name": "Alice"}
// After: key={"userId": 123}, value={"userId": 123, "name": "Alice"}
// Multiple fields:
// Before: key=null, value={"userId": 123, "tenantId": "acme", "name": "Alice"}
// After: key={"userId": 123, "tenantId": "acme"}, value={"userId": 123, "tenantId": "acme", "name": "Alice"}Drop records from the pipeline.
Purpose: Filter out records conditionally (requires use with predicates). Returning null drops the record.
Configuration properties: None (works with predicates)
Configuration example:
// Filter out tombstone records
transforms=filterTombstones
transforms.filterTombstones.type=org.apache.kafka.connect.transforms.Filter
transforms.filterTombstones.predicate=isTombstone
predicates=isTombstone
predicates.isTombstone.type=org.apache.kafka.connect.transforms.predicates.RecordIsTombstone
// Filter records from specific topics
transforms=filterTestTopics
transforms.filterTestTopics.type=org.apache.kafka.connect.transforms.Filter
transforms.filterTestTopics.predicate=isTestTopic
predicates=isTestTopic
predicates.isTestTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.isTestTopic.pattern=test-.*

Before/After example:
// Before: {"id": 123, "name": null} (tombstone record)
// After: (record dropped)

Remove specific headers from records.
Purpose: Clean up headers, remove sensitive metadata, or reduce message size by dropping unnecessary headers.
Configuration properties:
headers (required): List of header names to remove
Configuration example:
transforms=dropHeaders
transforms.dropHeaders.type=org.apache.kafka.connect.transforms.DropHeaders
transforms.dropHeaders.headers=internalId,debugInfo

Before/After example:
// Before: headers=[internalId: "abc", userId: "123", debugInfo: "test"]
// After: headers=[userId: "123"]

Move or copy fields from key/value to headers.
Purpose: Promote fields to headers for routing, filtering, or metadata without parsing the entire message body.
Configuration properties:
fields (required): List of field names to move/copy
headers (required): List of header names (must match the fields list length)
operation (required): Either "move" (remove from key/value) or "copy" (keep in key/value)
replace.null.with.default (default: true): Whether to replace null fields with their default values
Configuration example:
// Copy fields to headers
transforms=copyToHeaders
transforms.copyToHeaders.type=org.apache.kafka.connect.transforms.HeaderFrom$Value
transforms.copyToHeaders.fields=userId,tenantId
transforms.copyToHeaders.headers=user-id,tenant-id
transforms.copyToHeaders.operation=copy
// Move fields to headers
transforms=moveToHeaders
transforms.moveToHeaders.type=org.apache.kafka.connect.transforms.HeaderFrom$Value
transforms.moveToHeaders.fields=routingKey
transforms.moveToHeaders.headers=routing-key
transforms.moveToHeaders.operation=move

Before/After example:
// Copy operation:
// Before: value={"userId": 123, "name": "Alice"}, headers=[]
// After: value={"userId": 123, "name": "Alice"}, headers=[userId: 123]
// Move operation:
// Before: value={"userId": 123, "routingKey": "us-east", "name": "Alice"}, headers=[]
// After: value={"userId": 123, "name": "Alice"}, headers=[routingKey: "us-east"]Add a static header to all records.
Purpose: Tag records with static metadata for routing, filtering, or tracking connector source.
Configuration properties:
header (required): Header name
value.literal (required): Static value to set
Configuration example:
transforms=addSourceHeader
transforms.addSourceHeader.type=org.apache.kafka.connect.transforms.InsertHeader
transforms.addSourceHeader.header=source
transforms.addSourceHeader.value.literal=mysql-connector

Before/After example:
// Before: headers=[]
// After: headers=[source: "mysql-connector"]

Predicates enable conditional transformation execution. A transformation with a predicate is only applied when the predicate returns true (or false if negate=true).
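As a sketch, the wiring always follows the same pattern; the angle-bracketed names below are placeholders, not literal configuration values:

transforms=<transformAlias>
transforms.<transformAlias>.type=<transformation class>
transforms.<transformAlias>.predicate=<predicateAlias>
transforms.<transformAlias>.negate=false
predicates=<predicateAlias>
predicates.<predicateAlias>.type=<predicate class>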
Match tombstone records (records with null value).
Purpose: Identify delete markers in change data capture (CDC) streams or filter out tombstone records.
Configuration properties: None
Configuration example:
transforms=handleTombstones
transforms.handleTombstones.type=org.apache.kafka.connect.transforms.Filter
transforms.handleTombstones.predicate=isTombstone
predicates=isTombstone
predicates.isTombstone.type=org.apache.kafka.connect.transforms.predicates.RecordIsTombstone
// Or use with negate to apply transformation only to non-tombstone records
transforms=enrichData
transforms.enrichData.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.enrichData.timestamp.field=processedAt
transforms.enrichData.predicate=isTombstone
transforms.enrichData.negate=true

Match records based on topic name pattern.
Purpose: Apply transformations selectively to specific topics using regex patterns.
Configuration properties:
pattern (required): Java regular expression for matching topic names
Configuration example:
transforms=maskUserData
transforms.maskUserData.type=org.apache.kafka.connect.transforms.MaskField$Value
transforms.maskUserData.fields=ssn,email
transforms.maskUserData.predicate=isUserTopic
predicates=isUserTopic
predicates.isUserTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.isUserTopic.pattern=users-.*

Match records that have a specific header.
Purpose: Apply transformations based on header presence, useful for conditional processing based on metadata.
Configuration properties:
name (required): Header name to check for
Configuration example:
transforms=routeImportant
transforms.routeImportant.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.routeImportant.regex=(.*)
transforms.routeImportant.replacement=$1-priority
transforms.routeImportant.predicate=hasHighPriority
predicates=hasHighPriority
predicates.hasHighPriority.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
predicates.hasHighPriority.name=priority

// Apply transformations in connector configuration
Map<String, String> connectorConfig = new HashMap<>();
connectorConfig.put("connector.class", "com.example.MySourceConnector");
connectorConfig.put("tasks.max", "1");
// Define transformations chain
connectorConfig.put("transforms", "maskPII,addMetadata,filter");
// First transformation: mask PII fields
connectorConfig.put("transforms.maskPII.type",
"org.apache.kafka.connect.transforms.MaskField$Value");
connectorConfig.put("transforms.maskPII.fields", "ssn,creditCard");
// Second transformation: add metadata
connectorConfig.put("transforms.addMetadata.type",
"org.apache.kafka.connect.transforms.InsertField$Value");
connectorConfig.put("transforms.addMetadata.static.field", "source");
connectorConfig.put("transforms.addMetadata.static.value", "my-connector");
// Third transformation: filter nulls
connectorConfig.put("transforms.filter.type",
"org.apache.kafka.connect.transforms.Filter");
connectorConfig.put("transforms.filter.predicate", "isNull");
// Define predicate
connectorConfig.put("predicates", "isNull");
connectorConfig.put("predicates.isNull.type",
"org.apache.kafka.connect.predicates.RecordIsTombstone");Use predicates to apply transformations conditionally:
// Apply transformation only to specific topics
connectorConfig.put("transforms", "maskEmail");
connectorConfig.put("transforms.maskEmail.type", "com.example.MaskField");
connectorConfig.put("transforms.maskEmail.field", "email");
connectorConfig.put("transforms.maskEmail.predicate", "userTopic");
// Define predicate
connectorConfig.put("predicates", "userTopic");
connectorConfig.put("predicates.userTopic.type",
"org.apache.kafka.connect.transforms.predicates.TopicNameMatches");
connectorConfig.put("predicates.userTopic.pattern", "users-.*");
// Optionally invert the predicate (set to "true" to apply the transform only when the predicate does not match)
connectorConfig.put("transforms.maskEmail.negate", "false");

Complex transformation pipeline:
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.*;
import org.apache.kafka.common.config.ConfigDef;
import java.util.*;
public class EnrichRecord<R extends ConnectRecord<R>> implements Transformation<R> {
private static final String ENRICHMENT_FIELD_CONFIG = "enrichment.field";
private static final String LOOKUP_URL_CONFIG = "lookup.url";
private String enrichmentField;
private String lookupUrl;
private Map<String, String> cache;
@Override
public void configure(Map<String, ?> configs) {
this.enrichmentField = (String) configs.get(ENRICHMENT_FIELD_CONFIG);
this.lookupUrl = (String) configs.get(LOOKUP_URL_CONFIG);
this.cache = new HashMap<>();
}
@Override
public R apply(R record) {
if (record.value() == null) {
return record;
}
if (record.valueSchema() == null) {
return enrichSchemaless(record);
} else {
return enrichWithSchema(record);
}
}
private R enrichSchemaless(R record) {
@SuppressWarnings("unchecked")
Map<String, Object> value = (Map<String, Object>) record.value();
Map<String, Object> enriched = new HashMap<>(value);
// Perform lookup and add enrichment
String key = (String) value.get("id");
String enrichmentValue = lookup(key);
enriched.put(enrichmentField, enrichmentValue);
return record.newRecord(
record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
null,
enriched,
record.timestamp()
);
}
private R enrichWithSchema(R record) {
Struct value = (Struct) record.value();
Schema originalSchema = record.valueSchema();
// Build new schema with enrichment field
SchemaBuilder builder = SchemaBuilder.struct();
for (Field field : originalSchema.fields()) {
builder.field(field.name(), field.schema());
}
builder.field(enrichmentField, Schema.OPTIONAL_STRING_SCHEMA);
Schema enrichedSchema = builder.build();
// Copy original fields and add enrichment
Struct enriched = new Struct(enrichedSchema);
for (Field field : originalSchema.fields()) {
enriched.put(field.name(), value.get(field));
}
String key = value.getString("id");
String enrichmentValue = lookup(key);
enriched.put(enrichmentField, enrichmentValue);
return record.newRecord(
record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
enrichedSchema,
enriched,
record.timestamp()
);
}
private String lookup(String key) {
// Check cache first
if (cache.containsKey(key)) {
return cache.get(key);
}
// Perform lookup (simplified)
String result = performHttpLookup(lookupUrl, key);
cache.put(key, result);
return result;
}
private String performHttpLookup(String url, String key) {
// HTTP lookup implementation
return "enriched-" + key;
}
@Override
public ConfigDef config() {
return new ConfigDef()
.define(ENRICHMENT_FIELD_CONFIG,
ConfigDef.Type.STRING,
ConfigDef.NO_DEFAULT_VALUE,
ConfigDef.Importance.HIGH,
"Field name for enrichment data")
.define(LOOKUP_URL_CONFIG,
ConfigDef.Type.STRING,
ConfigDef.NO_DEFAULT_VALUE,
ConfigDef.Importance.HIGH,
"URL for enrichment lookups");
}
@Override
public void close() {
cache.clear();
}
}

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.data.*;
import org.junit.Test;
import static org.junit.Assert.*;
import java.util.*;
public class MaskFieldTest {
@Test
public void testMaskFieldWithSchema() {
// Create transformation
MaskField<SourceRecord> transform = new MaskField<>();
Map<String, Object> config = new HashMap<>();
config.put("field", "email");
config.put("replacement", "***@***.com");
transform.configure(config);
// Create test record
Schema schema = SchemaBuilder.struct()
.field("id", Schema.INT64_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.field("email", Schema.STRING_SCHEMA)
.build();
Struct value = new Struct(schema)
.put("id", 123L)
.put("name", "John Doe")
.put("email", "john@example.com");
SourceRecord record = new SourceRecord(
null, null, "test-topic", 0,
schema, value
);
// Apply transformation
SourceRecord transformed = transform.apply(record);
// Verify
Struct transformedValue = (Struct) transformed.value();
assertEquals("***@***.com", transformedValue.getString("email"));
assertEquals("John Doe", transformedValue.getString("name"));
assertEquals(123L, transformedValue.getInt64("id").longValue());
}
@Test
public void testMaskFieldSchemaless() {
// Create transformation
MaskField<SourceRecord> transform = new MaskField<>();
Map<String, Object> config = new HashMap<>();
config.put("field", "password");
transform.configure(config);
// Create schemaless record
Map<String, Object> value = new HashMap<>();
value.put("username", "john");
value.put("password", "secret123");
SourceRecord record = new SourceRecord(
null, null, "test-topic", 0,
null, value
);
// Apply transformation
SourceRecord transformed = transform.apply(record);
// Verify
@SuppressWarnings("unchecked")
Map<String, Object> transformedValue = (Map<String, Object>) transformed.value();
assertEquals("****", transformedValue.get("password"));
assertEquals("john", transformedValue.get("username"));
}
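// Hypothetical companion test (not part of the original suite): a minimal sketch that
// exercises the custom TopicMatches predicate defined earlier, assuming it is on the classpath.
@Test
public void testTopicMatchesPredicate() {
TopicMatches<SourceRecord> predicate = new TopicMatches<>();
Map<String, Object> config = new HashMap<>();
config.put("pattern", "users-.*");
predicate.configure(config);
SourceRecord matching = new SourceRecord(
null, null, "users-events", 0, null, "payload");
SourceRecord nonMatching = new SourceRecord(
null, null, "orders", 0, null, "payload");
assertTrue(predicate.test(matching));
assertFalse(predicate.test(nonMatching));
predicate.close();
}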
}

// Cache expensive operations
public class CachingTransformation<R extends ConnectRecord<R>>
implements Transformation<R> {
private Map<String, String> cache = new ConcurrentHashMap<>();
private long cacheHits = 0;
private long cacheMisses = 0;
@Override
public R apply(R record) {
String key = extractKey(record);
// Look up the cached result; compute and cache it on a miss
// (extractKey, performExpensiveOperation, and applyResult are assumed helper methods, not shown)
String result = cache.get(key);
if (result != null) {
cacheHits++;
} else {
cacheMisses++;
result = cache.computeIfAbsent(key, this::performExpensiveOperation);
}
return applyResult(record, result);
}
@Override
public void close() {
System.out.println("Cache hits: " + cacheHits +
", misses: " + cacheMisses);
cache.clear();
}
}

@Override
public R apply(R record) {
try {
return doTransform(record);
} catch (Exception e) {
// Log error with record details
System.err.println("Transformation failed for record: " +
"topic=" + record.topic() +
", partition=" + record.kafkaPartition() +
", timestamp=" + record.timestamp());
e.printStackTrace();
// Option 1: Return original record
return record;
// Option 2: Throw exception to fail pipeline
// throw new DataException("Transformation failed", e);
// Option 3: Return null to filter out problematic record
// return null;
}
}

@Override
public ConfigDef config() {
return new ConfigDef()
.define("field",
ConfigDef.Type.STRING,
ConfigDef.NO_DEFAULT_VALUE,
ConfigDef.Importance.HIGH,
"Field to transform")
.define("replacement",
ConfigDef.Type.STRING,
"****",
new ConfigDef.Validator() {
@Override
public void ensureValid(String name, Object value) {
String str = (String) value;
if (str == null || str.isEmpty()) {
throw new ConfigException(name,
value, "Replacement cannot be empty");
}
}
},
ConfigDef.Importance.MEDIUM,
"Replacement value");
}

// Connect configuration for chaining multiple transformations
import java.util.*;
public class TransformationChainExample {
public Map<String, String> createConnectorConfig() {
Map<String, String> config = new HashMap<>();
// Define transformation chain
config.put("transforms", "mask,addField,route");
// Transform 1: Mask sensitive field
config.put("transforms.mask.type", "com.example.MaskField");
config.put("transforms.mask.field", "ssn");
config.put("transforms.mask.replacement", "***-**-****");
// Transform 2: Add timestamp field
config.put("transforms.addField.type",
"org.apache.kafka.connect.transforms.InsertField$Value");
config.put("transforms.addField.timestamp.field", "processed_at");
// Transform 3: Route based on predicate
config.put("transforms.route.type",
"org.apache.kafka.connect.transforms.RegexRouter");
config.put("transforms.route.regex", "(.*)");
config.put("transforms.route.replacement", "$1-processed");
// Apply transformations conditionally
config.put("predicates", "isImportant");
config.put("predicates.isImportant.type",
"org.apache.kafka.connect.transforms.predicates.TopicNameMatches");
config.put("predicates.isImportant.pattern", "important-.*");
// Apply mask only to important topics
config.put("transforms.mask.predicate", "isImportant");
return config;
}
}

import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.*;
import org.apache.kafka.common.config.ConfigDef;
import java.util.Map;
public class SafeTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
@Override
public R apply(R record) {
try {
return doTransform(record);
} catch (Exception e) {
// Log error but don't fail entire pipeline
System.err.println("Transformation error for record " + record.key() +
": " + e.getMessage());
// Options:
// 1. Return original record unchanged
return record;
// 2. Return null to filter out problematic record
// return null;
// 3. Throw RetriableException to retry
// throw new RetriableException("Transformation failed", e);
// 4. Throw ConnectException to fail task
// throw new ConnectException("Fatal transformation error", e);
}
}
private R doTransform(R record) {
// Transformation logic that may throw exceptions
return record;
}
@Override
public void configure(Map<String, ?> configs) { }
@Override
public ConfigDef config() {
return new ConfigDef();
}
@Override
public void close() { }
}

// Handle both schema and schemaless records
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.*;
import java.util.*;
public class FlexibleTransformation<R extends ConnectRecord<R>>
implements Transformation<R> {
@Override
public R apply(R record) {
if (record.valueSchema() == null) {
return transformSchemaless(record);
} else {
return transformWithSchema(record);
}
}
private R transformSchemaless(R record) {
// Handle Map-based value
if (!(record.value() instanceof Map)) {
System.err.println("Expected Map for schemaless record");
return record;
}
@SuppressWarnings("unchecked")
Map<String, Object> value = new HashMap<>((Map<String, Object>) record.value());
// Transform
value.put("transformed", true);
value.put("timestamp", System.currentTimeMillis());
return record.newRecord(
record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
null, // No schema
value,
record.timestamp()
);
}
private R transformWithSchema(R record) {
// Handle Struct-based value
if (!(record.value() instanceof Struct)) {
System.err.println("Expected Struct for schema-based record");
return record;
}
Struct oldValue = (Struct) record.value();
Schema oldSchema = record.valueSchema();
// Create new schema with additional field
SchemaBuilder builder = SchemaBuilder.struct()
.name(oldSchema.name())
.version(oldSchema.version());
// SchemaBuilder has no bulk copy, so copy the existing fields one by one
for (Field field : oldSchema.fields()) {
builder.field(field.name(), field.schema());
}
Schema newSchema = builder
.field("transformed", Schema.BOOLEAN_SCHEMA)
.field("timestamp", Schema.INT64_SCHEMA)
.build();
// Create new struct
Struct newValue = new Struct(newSchema);
for (Field field : oldSchema.fields()) {
newValue.put(field, oldValue.get(field));
}
newValue.put("transformed", true);
newValue.put("timestamp", System.currentTimeMillis());
return record.newRecord(
record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
newSchema,
newValue,
record.timestamp()
);
}
@Override
public void configure(Map<String, ?> configs) { }
@Override
public ConfigDef config() {
return new ConfigDef();
}
@Override
public void close() { }
}

// Transformation can filter records by returning null
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.Struct;
import java.util.Map;
public class FilteringTransformation<R extends ConnectRecord<R>>
implements Transformation<R> {
private String filterField;
private String filterValue;
@Override
public void configure(Map<String, ?> configs) {
this.filterField = (String) configs.get("filter.field");
this.filterValue = (String) configs.get("filter.value");
}
@Override
public R apply(R record) {
if (record.valueSchema() != null) {
Struct value = (Struct) record.value();
Object fieldValue = value.get(filterField);
if (filterValue.equals(String.valueOf(fieldValue))) {
return null; // Filter out this record
}
}
return record; // Pass through
}
@Override
public ConfigDef config() {
return new ConfigDef()
.define("filter.field", ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH, "Field to filter on")
.define("filter.value", ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH, "Value to filter");
}
@Override
public void close() {}
}