tessl install tessl/maven-org-apache-kafka--kafka-2-13@4.1.0

Apache Kafka is a distributed event streaming platform that combines publish-subscribe messaging, durable storage, and real-time stream processing capabilities.
Kafka Connect provides a schema system for structured data with logical types for common data formats.
Data type definition for Connect records.
package org.apache.kafka.connect.data;
public interface Schema {
enum Type {
INT8, INT16, INT32, INT64,
FLOAT32, FLOAT64,
BOOLEAN, STRING, BYTES,
ARRAY, MAP, STRUCT
}
// Schema metadata
Type type();
boolean isOptional();
Object defaultValue();
String name();
Integer version();
String doc();
Map<String, String> parameters();
// Complex type accessors
Schema keySchema(); // For MAP
Schema valueSchema(); // For MAP and ARRAY
List<Field> fields(); // For STRUCT
Field field(String fieldName);
// Predefined schemas
Schema INT8_SCHEMA;
Schema INT16_SCHEMA;
Schema INT32_SCHEMA;
Schema INT64_SCHEMA;
Schema FLOAT32_SCHEMA;
Schema FLOAT64_SCHEMA;
Schema BOOLEAN_SCHEMA;
Schema STRING_SCHEMA;
Schema BYTES_SCHEMA;
// Optional variants
Schema OPTIONAL_INT8_SCHEMA;
Schema OPTIONAL_INT16_SCHEMA;
Schema OPTIONAL_INT32_SCHEMA;
Schema OPTIONAL_INT64_SCHEMA;
Schema OPTIONAL_FLOAT32_SCHEMA;
Schema OPTIONAL_FLOAT64_SCHEMA;
Schema OPTIONAL_BOOLEAN_SCHEMA;
Schema OPTIONAL_STRING_SCHEMA;
Schema OPTIONAL_BYTES_SCHEMA;
}

Fluent API for building schemas.
package org.apache.kafka.connect.data;
public class SchemaBuilder implements Schema {
// Primitive types
public static SchemaBuilder int8();
public static SchemaBuilder int16();
public static SchemaBuilder int32();
public static SchemaBuilder int64();
public static SchemaBuilder float32();
public static SchemaBuilder float64();
public static SchemaBuilder bool();
public static SchemaBuilder string();
public static SchemaBuilder bytes();
// Complex types
public static SchemaBuilder struct();
public static SchemaBuilder array(Schema valueSchema);
public static SchemaBuilder map(Schema keySchema, Schema valueSchema);
// Configuration
public SchemaBuilder optional();
public SchemaBuilder required();
public SchemaBuilder defaultValue(Object value);
public SchemaBuilder name(String name);
public SchemaBuilder version(Integer version);
public SchemaBuilder doc(String doc);
public SchemaBuilder parameter(String name, String value);
public SchemaBuilder parameters(Map<String, String> params);
// Struct fields
public SchemaBuilder field(String fieldName, Schema fieldSchema);
// Build
public Schema build();
}

Usage Examples:
import org.apache.kafka.connect.data.*;
// Primitive schema
Schema intSchema = SchemaBuilder.int32()
.optional()
.defaultValue(0)
.doc("User age")
.build();
// Struct schema
Schema userSchema = SchemaBuilder.struct()
.name("com.example.User")
.version(1)
.doc("User record")
.field("id", Schema.INT64_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.field("email", Schema.OPTIONAL_STRING_SCHEMA)
.field("age", Schema.INT32_SCHEMA)
.field("active", Schema.BOOLEAN_SCHEMA)
.build();
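// Inspect a built schema through the Schema accessors (illustrative sketch; reuses userSchema from above)
Schema.Type type = userSchema.type(); // Type.STRUCT
boolean structOptional = userSchema.isOptional(); // false
int fieldCount = userSchema.fields().size(); // 5
Field emailField = userSchema.field("email");
boolean emailOptional = emailField.schema().isOptional(); // true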
// Array schema
Schema tagsSchema = SchemaBuilder.array(Schema.STRING_SCHEMA)
.optional()
.build();
// Map schema
Schema metadataSchema = SchemaBuilder.map(
Schema.STRING_SCHEMA,
Schema.STRING_SCHEMA
).optional().build();
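// Schemas can also carry a logical name, doc string, and application-defined parameters
// (illustrative sketch; the name and parameter values are hypothetical)
Schema scoreSchema = SchemaBuilder.float64()
.name("com.example.Score")
.doc("Normalized score")
.parameter("unit", "points")
.build();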
// Nested struct
Schema addressSchema = SchemaBuilder.struct()
.field("street", Schema.STRING_SCHEMA)
.field("city", Schema.STRING_SCHEMA)
.field("zipCode", Schema.STRING_SCHEMA)
.build();
Schema personSchema = SchemaBuilder.struct()
.field("name", Schema.STRING_SCHEMA)
.field("address", addressSchema)
.build();

Structured record value.
package org.apache.kafka.connect.data;
public class Struct {
// Constructor
public Struct(Schema schema);
// Schema
public Schema schema();
// Generic get/put
public Object get(String fieldName);
public Object get(Field field);
public Struct put(String fieldName, Object value);
public Struct put(Field field, Object value);
// Type-specific getters
public Byte getInt8(String fieldName);
public Short getInt16(String fieldName);
public Integer getInt32(String fieldName);
public Long getInt64(String fieldName);
public Float getFloat32(String fieldName);
public Double getFloat64(String fieldName);
public Boolean getBoolean(String fieldName);
public String getString(String fieldName);
public byte[] getBytes(String fieldName);
public <T> List<T> getArray(String fieldName);
public <K, V> Map<K, V> getMap(String fieldName);
public Struct getStruct(String fieldName);
// Validation
public void validate();
}

Usage Examples:
import org.apache.kafka.connect.data.*;
// Create struct with schema
Schema userSchema = SchemaBuilder.struct()
.field("id", Schema.INT64_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.field("email", Schema.OPTIONAL_STRING_SCHEMA)
.field("age", Schema.INT32_SCHEMA)
.build();
Struct user = new Struct(userSchema)
.put("id", 123L)
.put("name", "John Doe")
.put("email", "john@example.com")
.put("age", 30);
// Validate
user.validate();
// Read values
Long id = user.getInt64("id");
String name = user.getString("name");
Integer age = user.getInt32("age");
// Nested struct
Schema addressSchema = SchemaBuilder.struct()
.field("street", Schema.STRING_SCHEMA)
.field("city", Schema.STRING_SCHEMA)
.build();
Schema personSchema = SchemaBuilder.struct()
.field("name", Schema.STRING_SCHEMA)
.field("address", addressSchema)
.build();
Struct address = new Struct(addressSchema)
.put("street", "123 Main St")
.put("city", "Springfield");
Struct person = new Struct(personSchema)
.put("name", "Jane Doe")
.put("address", address);
Struct retrievedAddress = person.getStruct("address");
String city = retrievedAddress.getString("city");Struct field definition.
package org.apache.kafka.connect.data;
public class Field {
// Constructor
public Field(String name, int index, Schema schema);
// Accessors
public String name();
public int index();
public Schema schema();
}

Days since Unix epoch (INT32).
package org.apache.kafka.connect.data;
public class Date {
public static final String LOGICAL_NAME = "org.apache.kafka.connect.data.Date";
public static final Schema SCHEMA;
// Builder
public static SchemaBuilder builder();
// Conversion
public static int fromLogical(Schema schema, java.util.Date value);
public static java.util.Date toLogical(Schema schema, int value);
}

Usage:
import org.apache.kafka.connect.data.*;
// Create date schema
Schema dateSchema = Date.builder().build();
// or use the predefined constant
Schema dateSchema2 = Date.SCHEMA;
// Convert Date to int; the value must be at midnight UTC (a non-zero time-of-day raises DataException)
java.util.Calendar cal = java.util.Calendar.getInstance(java.util.TimeZone.getTimeZone("UTC"));
cal.clear();
cal.set(2024, java.util.Calendar.JANUARY, 1);
java.util.Date javaDate = cal.getTime();
int daysSinceEpoch = Date.fromLogical(dateSchema, javaDate);
// Convert int to Date
java.util.Date converted = Date.toLogical(dateSchema, daysSinceEpoch);
// Use in struct
Schema personSchema = SchemaBuilder.struct()
.field("name", Schema.STRING_SCHEMA)
.field("birthDate", Date.SCHEMA)
.build();
Struct person = new Struct(personSchema)
.put("name", "John")
.put("birthDate", Date.fromLogical(Date.SCHEMA, new java.util.Date()));Milliseconds since midnight (INT32).
package org.apache.kafka.connect.data;
public class Time {
public static final String LOGICAL_NAME = "org.apache.kafka.connect.data.Time";
public static final Schema SCHEMA;
public static SchemaBuilder builder();
public static int fromLogical(Schema schema, java.util.Date value);
public static java.util.Date toLogical(Schema schema, int value);
}

Milliseconds since Unix epoch (INT64).
package org.apache.kafka.connect.data;
public class Timestamp {
public static final String LOGICAL_NAME = "org.apache.kafka.connect.data.Timestamp";
public static final Schema SCHEMA;
public static SchemaBuilder builder();
public static long fromLogical(Schema schema, java.util.Date value);
public static java.util.Date toLogical(Schema schema, long value);
}

Usage:
import org.apache.kafka.connect.data.*;
// Timestamp schema
Schema timestampSchema = Timestamp.SCHEMA;
// Convert
java.util.Date now = new java.util.Date();
long millisSinceEpoch = Timestamp.fromLogical(timestampSchema, now);
java.util.Date converted = Timestamp.toLogical(timestampSchema, millisSinceEpoch);
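// Time (above) follows the same pattern but encodes milliseconds since midnight (illustrative sketch);
// the java.util.Date passed to Time.fromLogical must fall on the epoch day (1970-01-01 UTC)
java.util.Calendar timeCal = java.util.Calendar.getInstance(java.util.TimeZone.getTimeZone("UTC"));
timeCal.clear();
timeCal.set(java.util.Calendar.HOUR_OF_DAY, 14);
timeCal.set(java.util.Calendar.MINUTE, 30);
int millisSinceMidnight = Time.fromLogical(Time.SCHEMA, timeCal.getTime());
java.util.Date timeOfDay = Time.toLogical(Time.SCHEMA, millisSinceMidnight);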
// Use in struct
Schema eventSchema = SchemaBuilder.struct()
.field("id", Schema.STRING_SCHEMA)
.field("timestamp", Timestamp.SCHEMA)
.build();
Struct event = new Struct(eventSchema)
.put("id", "event-123")
.put("timestamp", Timestamp.fromLogical(Timestamp.SCHEMA, new java.util.Date()));Arbitrary precision decimal (BYTES).
package org.apache.kafka.connect.data;
public class Decimal {
public static final String LOGICAL_NAME = "org.apache.kafka.connect.data.Decimal";
public static final String SCALE_FIELD = "scale";
// Builder
public static SchemaBuilder builder(int scale);
public static Schema schema(int scale);
// Conversion
public static byte[] fromLogical(Schema schema, BigDecimal value);
public static BigDecimal toLogical(Schema schema, byte[] value);
}

Usage:
import org.apache.kafka.connect.data.*;
import java.math.BigDecimal;
// Decimal schema with scale=2 (for currency)
Schema priceSchema = Decimal.schema(2);
// Convert BigDecimal to bytes
BigDecimal price = new BigDecimal("19.99");
byte[] encoded = Decimal.fromLogical(priceSchema, price);
// Convert bytes to BigDecimal
BigDecimal decoded = Decimal.toLogical(priceSchema, encoded);
// Use in struct
Schema productSchema = SchemaBuilder.struct()
.field("name", Schema.STRING_SCHEMA)
.field("price", Decimal.schema(2))
.build();
Struct product = new Struct(productSchema)
.put("name", "Widget")
.put("price", Decimal.fromLogical(Decimal.schema(2), new BigDecimal("29.99")));Combined schema and value container.
package org.apache.kafka.connect.data;
public class SchemaAndValue {
// Constant
public static final SchemaAndValue NULL;
// Constructor
public SchemaAndValue(Schema schema, Object value);
// Accessors
public Schema schema();
public Object value();
}

Type conversion utilities.
package org.apache.kafka.connect.data;
public class Values {
// Type conversions
public static Boolean convertToBoolean(Schema schema, Object value);
public static Byte convertToByte(Schema schema, Object value);
public static Short convertToShort(Schema schema, Object value);
public static Integer convertToInteger(Schema schema, Object value);
public static Long convertToLong(Schema schema, Object value);
public static Float convertToFloat(Schema schema, Object value);
public static Double convertToDouble(Schema schema, Object value);
public static String convertToString(Schema schema, Object value);
public static byte[] convertToBytes(Schema schema, Object value);
public static List<?> convertToList(Schema schema, Object value);
public static Map<?, ?> convertToMap(Schema schema, Object value);
public static Struct convertToStruct(Schema schema, Object value);
// Logical type conversions
public static java.util.Date convertToDate(Schema schema, Object value);
public static java.util.Date convertToTime(Schema schema, Object value);
public static java.util.Date convertToTimestamp(Schema schema, Object value);
public static BigDecimal convertToDecimal(Schema schema, Object value);
// Schema inference
public static Schema inferSchema(Object value);
// String parsing
public static SchemaAndValue parseString(String value);
}

Usage:
import org.apache.kafka.connect.data.*;
import java.util.*;
// Convert between types (the schema argument describes the input value and may be null)
Integer intValue = Values.convertToInteger(Schema.STRING_SCHEMA, "123");
String strValue = Values.convertToString(Schema.INT32_SCHEMA, 123);
Long longValue = Values.convertToLong(Schema.INT32_SCHEMA, 456);
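// Conversions that cannot be performed throw DataException (illustrative sketch)
try {
Values.convertToInteger(null, "not a number");
} catch (org.apache.kafka.connect.errors.DataException e) {
System.err.println("Conversion failed: " + e.getMessage());
}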
// Infer a schema from a value (container elements must share a common schema, otherwise null is returned)
Map<String, String> map = new HashMap<>();
map.put("firstName", "John");
map.put("lastName", "Doe");
Schema inferredSchema = Values.inferSchema(map); // MAP<STRING, STRING>
// Parse a string into a SchemaAndValue
SchemaAndValue parsed = Values.parseString("123"); // narrowest integer schema with a numeric value
SchemaAndValue parsed2 = Values.parseString("true"); // BOOLEAN schema with Boolean true
Schema parsedSchema = parsed.schema();
Object parsedValue = parsed.value();

Project data between schemas.
package org.apache.kafka.connect.data;
public class SchemaProjector {
/**
* Project a record from source schema to target schema.
*/
public static Object project(Schema source, Object record, Schema target);
}

Usage:
import org.apache.kafka.connect.data.*;
// Source schema
Schema sourceSchema = SchemaBuilder.struct()
.field("id", Schema.INT64_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.field("email", Schema.STRING_SCHEMA)
.field("age", Schema.INT32_SCHEMA)
.build();
// Target schema (subset of fields)
Schema targetSchema = SchemaBuilder.struct()
.field("id", Schema.INT64_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.build();
// Source record
Struct source = new Struct(sourceSchema)
.put("id", 123L)
.put("name", "John")
.put("email", "john@example.com")
.put("age", 30);
// Project to target
Struct projected = (Struct) SchemaProjector.project(sourceSchema, source, targetSchema);
// Result only contains id and name fields

import org.apache.kafka.connect.data.*;
import org.apache.kafka.connect.source.*;
import java.util.*;
public class UserEvent {
public static final Schema SCHEMA = SchemaBuilder.struct()
.name("com.example.UserEvent")
.version(1)
.doc("User lifecycle event")
.field("userId", Schema.INT64_SCHEMA)
.field("username", Schema.STRING_SCHEMA)
.field("email", Schema.OPTIONAL_STRING_SCHEMA)
.field("eventType", Schema.STRING_SCHEMA)
.field("eventTime", Timestamp.SCHEMA)
.field("metadata", SchemaBuilder.map(
Schema.STRING_SCHEMA,
Schema.STRING_SCHEMA
).optional().build())
.build();
public static Struct create(long userId, String username, String email,
String eventType, java.util.Date eventTime, Map<String, String> metadata) {
Struct struct = new Struct(SCHEMA)
.put("userId", userId)
.put("username", username)
.put("email", email)
.put("eventType", eventType)
.put("eventTime", Timestamp.fromLogical(Timestamp.SCHEMA, eventTime));
if (metadata != null) {
struct.put("metadata", metadata);
}
return struct;
}
public static SourceRecord toSourceRecord(Struct event, String topic) {
return new SourceRecord(
null, // source partition
null, // source offset
topic,
null, // kafka partition
Schema.INT64_SCHEMA, // key schema
event.getInt64("userId"), // key
SCHEMA, // value schema
event, // value
null, // timestamp
null // headers
);
}
}
// Usage
Struct userEvent = UserEvent.create(
123L,
"johndoe",
"john@example.com",
"USER_CREATED",
new java.util.Date(),
Collections.singletonMap("source", "api")
);
SourceRecord record = UserEvent.toSourceRecord(userEvent, "user-events");

import org.apache.kafka.connect.data.*;
public class VersionedSchemaExample {
// Version 1: Initial schema
public static final Schema SCHEMA_V1 = SchemaBuilder.struct()
.name("com.example.User")
.version(1)
.field("id", Schema.INT64_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.build();
// Version 2: Added optional email field (backward compatible)
public static final Schema SCHEMA_V2 = SchemaBuilder.struct()
.name("com.example.User")
.version(2)
.field("id", Schema.INT64_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.field("email", Schema.OPTIONAL_STRING_SCHEMA) // Optional = backward compatible
.build();
// Version 3: Added required field (NOT backward compatible)
public static final Schema SCHEMA_V3 = SchemaBuilder.struct()
.name("com.example.User")
.version(3)
.field("id", Schema.INT64_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.field("email", Schema.OPTIONAL_STRING_SCHEMA)
.field("created_at", Timestamp.SCHEMA) // Required = breaking change
.build();
// Handle multiple schema versions
public Struct migrateToLatest(Struct oldRecord, Schema oldSchema) {
int oldVersion = oldSchema.version() != null ? oldSchema.version() : 1;
if (oldVersion == 3) {
return oldRecord; // Already latest
}
// Migrate from v1 or v2 to v3
Struct newRecord = new Struct(SCHEMA_V3);
newRecord.put("id", oldRecord.getInt64("id"));
newRecord.put("name", oldRecord.getString("name"));
// Handle optional fields
if (oldVersion >= 2 && oldSchema.field("email") != null) {
newRecord.put("email", oldRecord.getString("email"));
} else {
newRecord.put("email", null);
}
// Set default for new required field
newRecord.put("created_at", new Date());
return newRecord;
}
}

import org.apache.kafka.connect.data.*;
import org.apache.kafka.connect.errors.DataException;
// Optional vs required fields
Schema schema = SchemaBuilder.struct()
.field("required_field", Schema.STRING_SCHEMA) // Required
.field("optional_field", Schema.OPTIONAL_STRING_SCHEMA) // Optional
.build();
Struct record = new Struct(schema);
record.put("required_field", "value");
record.put("optional_field", null); // Valid for optional field
// Validation
try {
record.validate(); // Succeeds
} catch (DataException e) {
System.err.println("Validation failed: " + e.getMessage());
}
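// Type checks happen at put() time; assigning a value of the wrong type throws immediately (illustrative sketch)
try {
record.put("required_field", 42); // not a String
} catch (DataException e) {
System.err.println("Type mismatch: " + e.getMessage());
}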
// Missing required field
Struct invalidRecord = new Struct(schema);
invalidRecord.put("optional_field", "value");
// Missing required_field
try {
invalidRecord.validate(); // Throws DataException
} catch (DataException e) {
System.err.println("Missing required field: " + e.getMessage());
}

// Deeply nested schemas
import org.apache.kafka.connect.data.*;
import java.util.*;
Schema addressSchema = SchemaBuilder.struct()
.field("street", Schema.STRING_SCHEMA)
.field("city", Schema.STRING_SCHEMA)
.field("country", Schema.STRING_SCHEMA)
.build();
Schema contactSchema = SchemaBuilder.struct()
.field("email", Schema.STRING_SCHEMA)
.field("phone", Schema.OPTIONAL_STRING_SCHEMA)
.field("address", addressSchema) // Nested struct
.build();
Schema userSchema = SchemaBuilder.struct()
.field("id", Schema.INT64_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.field("contacts", SchemaBuilder.array(contactSchema).build()) // Array of structs
.field("metadata", SchemaBuilder.map(
Schema.STRING_SCHEMA,
Schema.STRING_SCHEMA).build()) // Map
.build();
// Create nested structure
Struct address = new Struct(addressSchema)
.put("street", "123 Main St")
.put("city", "Springfield")
.put("country", "USA");
Struct contact = new Struct(contactSchema)
.put("email", "user@example.com")
.put("phone", "+1234567890")
.put("address", address);
Struct user = new Struct(userSchema)
.put("id", 123L)
.put("name", "John Doe")
.put("contacts", Collections.singletonList(contact))
.put("metadata", Collections.singletonMap("source", "api"));
// Access nested values
Struct retrievedContact = user.getArray("contacts").get(0);
Struct retrievedAddress = retrievedContact.getStruct("address");
String city = retrievedAddress.getString("city");