Describes mavenpkg:maven/org.apache.kafka/kafka_2.13@4.1.x

tessl/maven-org-apache-kafka--kafka-2-13

tessl install tessl/maven-org-apache-kafka--kafka-2-13@4.1.0

Apache Kafka is a distributed event streaming platform that combines publish-subscribe messaging, durable storage, and real-time stream processing capabilities.

docs/connect/data-schemas.md

Data Schemas

Kafka Connect provides a schema system for describing structured data, including logical types for dates, times, timestamps, and arbitrary-precision decimals.

Schema

Data type definition for Connect records.

package org.apache.kafka.connect.data;

public interface Schema {
    enum Type {
        INT8, INT16, INT32, INT64,
        FLOAT32, FLOAT64,
        BOOLEAN, STRING, BYTES,
        ARRAY, MAP, STRUCT
    }

    // Schema metadata
    Type type();
    boolean isOptional();
    Object defaultValue();
    String name();
    Integer version();
    String doc();
    Map<String, String> parameters();

    // Complex type accessors
    Schema keySchema();    // For MAP
    Schema valueSchema();  // For MAP and ARRAY
    List<Field> fields();  // For STRUCT
    Field field(String fieldName);

    // Predefined schemas
    Schema INT8_SCHEMA;
    Schema INT16_SCHEMA;
    Schema INT32_SCHEMA;
    Schema INT64_SCHEMA;
    Schema FLOAT32_SCHEMA;
    Schema FLOAT64_SCHEMA;
    Schema BOOLEAN_SCHEMA;
    Schema STRING_SCHEMA;
    Schema BYTES_SCHEMA;

    // Optional variants
    Schema OPTIONAL_INT8_SCHEMA;
    Schema OPTIONAL_INT16_SCHEMA;
    Schema OPTIONAL_INT32_SCHEMA;
    Schema OPTIONAL_INT64_SCHEMA;
    Schema OPTIONAL_FLOAT32_SCHEMA;
    Schema OPTIONAL_FLOAT64_SCHEMA;
    Schema OPTIONAL_BOOLEAN_SCHEMA;
    Schema OPTIONAL_STRING_SCHEMA;
    Schema OPTIONAL_BYTES_SCHEMA;
}
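
A schema's metadata and structure can be inspected at runtime; this is how converters and transformations walk Connect records generically. A minimal sketch using the accessors above:

import org.apache.kafka.connect.data.*;

Schema schema = SchemaBuilder.struct()
    .name("com.example.User")
    .field("id", Schema.INT64_SCHEMA)
    .field("name", Schema.OPTIONAL_STRING_SCHEMA)
    .build();

schema.type();        // Type.STRUCT
schema.name();        // "com.example.User"
schema.isOptional();  // false

// Walk the struct's fields generically
for (Field f : schema.fields()) {
    System.out.printf("%s: %s (optional=%b)%n",
        f.name(), f.schema().type(), f.schema().isOptional());
}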

SchemaBuilder

Fluent API for building schemas.

package org.apache.kafka.connect.data;

public class SchemaBuilder implements Schema {
    // Primitive types
    public static SchemaBuilder int8();
    public static SchemaBuilder int16();
    public static SchemaBuilder int32();
    public static SchemaBuilder int64();
    public static SchemaBuilder float32();
    public static SchemaBuilder float64();
    public static SchemaBuilder bool();
    public static SchemaBuilder string();
    public static SchemaBuilder bytes();

    // Complex types
    public static SchemaBuilder struct();
    public static SchemaBuilder array(Schema valueSchema);
    public static SchemaBuilder map(Schema keySchema, Schema valueSchema);

    // Configuration
    public SchemaBuilder optional();
    public SchemaBuilder required();
    public SchemaBuilder defaultValue(Object value);
    public SchemaBuilder name(String name);
    public SchemaBuilder version(Integer version);
    public SchemaBuilder doc(String doc);
    public SchemaBuilder parameter(String name, String value);
    public SchemaBuilder parameters(Map<String, String> params);

    // Struct fields
    public SchemaBuilder field(String fieldName, Schema fieldSchema);

    // Build
    public Schema build();
}

Usage Examples:

import org.apache.kafka.connect.data.*;

// Primitive schema
Schema intSchema = SchemaBuilder.int32()
    .optional()
    .defaultValue(0)
    .doc("User age")
    .build();

// Struct schema
Schema userSchema = SchemaBuilder.struct()
    .name("com.example.User")
    .version(1)
    .doc("User record")
    .field("id", Schema.INT64_SCHEMA)
    .field("name", Schema.STRING_SCHEMA)
    .field("email", Schema.OPTIONAL_STRING_SCHEMA)
    .field("age", Schema.INT32_SCHEMA)
    .field("active", Schema.BOOLEAN_SCHEMA)
    .build();

// Array schema
Schema tagsSchema = SchemaBuilder.array(Schema.STRING_SCHEMA)
    .optional()
    .build();

// Map schema
Schema metadataSchema = SchemaBuilder.map(
    Schema.STRING_SCHEMA,
    Schema.STRING_SCHEMA
).optional().build();

// Nested struct
Schema addressSchema = SchemaBuilder.struct()
    .field("street", Schema.STRING_SCHEMA)
    .field("city", Schema.STRING_SCHEMA)
    .field("zipCode", Schema.STRING_SCHEMA)
    .build();

Schema personSchema = SchemaBuilder.struct()
    .field("name", Schema.STRING_SCHEMA)
    .field("address", addressSchema)
    .build();

Struct

Structured record value.

package org.apache.kafka.connect.data;

public class Struct {
    // Constructor
    public Struct(Schema schema);

    // Schema
    public Schema schema();

    // Generic get/put
    public Object get(String fieldName);
    public Object get(Field field);
    public Struct put(String fieldName, Object value);
    public Struct put(Field field, Object value);

    // Type-specific getters
    public Byte getInt8(String fieldName);
    public Short getInt16(String fieldName);
    public Integer getInt32(String fieldName);
    public Long getInt64(String fieldName);
    public Float getFloat32(String fieldName);
    public Double getFloat64(String fieldName);
    public Boolean getBoolean(String fieldName);
    public String getString(String fieldName);
    public byte[] getBytes(String fieldName);
    public <T> List<T> getArray(String fieldName);
    public <K, V> Map<K, V> getMap(String fieldName);
    public Struct getStruct(String fieldName);

    // Validation
    public void validate();
}

Usage Examples:

import org.apache.kafka.connect.data.*;

// Create struct with schema
Schema userSchema = SchemaBuilder.struct()
    .field("id", Schema.INT64_SCHEMA)
    .field("name", Schema.STRING_SCHEMA)
    .field("email", Schema.OPTIONAL_STRING_SCHEMA)
    .field("age", Schema.INT32_SCHEMA)
    .build();

Struct user = new Struct(userSchema)
    .put("id", 123L)
    .put("name", "John Doe")
    .put("email", "john@example.com")
    .put("age", 30);

// Validate
user.validate();

// Read values
Long id = user.getInt64("id");
String name = user.getString("name");
Integer age = user.getInt32("age");

// Nested struct
Schema addressSchema = SchemaBuilder.struct()
    .field("street", Schema.STRING_SCHEMA)
    .field("city", Schema.STRING_SCHEMA)
    .build();

Schema personSchema = SchemaBuilder.struct()
    .field("name", Schema.STRING_SCHEMA)
    .field("address", addressSchema)
    .build();

Struct address = new Struct(addressSchema)
    .put("street", "123 Main St")
    .put("city", "Springfield");

Struct person = new Struct(personSchema)
    .put("name", "Jane Doe")
    .put("address", address);

Struct retrievedAddress = person.getStruct("address");
String city = retrievedAddress.getString("city");

Field

Struct field definition.

package org.apache.kafka.connect.data;

public class Field {
    // Constructor
    public Field(String name, int index, Schema schema);

    // Accessors
    public String name();
    public int index();
    public Schema schema();
}
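
Fields are created implicitly by SchemaBuilder.field(...); a short sketch of reading them back and using them with Struct:

import org.apache.kafka.connect.data.*;

Schema schema = SchemaBuilder.struct()
    .field("id", Schema.INT64_SCHEMA)
    .field("name", Schema.STRING_SCHEMA)
    .build();

Field nameField = schema.field("name");
nameField.name();    // "name"
nameField.index();   // 1 (position within the struct)
nameField.schema();  // STRING schema

// Field objects work directly with Struct get/put
Struct s = new Struct(schema).put(nameField, "Jane");
Object value = s.get(nameField);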

Logical Types

Date

Days since Unix epoch (INT32).

package org.apache.kafka.connect.data;

public class Date {
    public static final String LOGICAL_NAME = "org.apache.kafka.connect.data.Date";
    public static final Schema SCHEMA;

    // Builder
    public static SchemaBuilder builder();

    // Conversion
    public static int fromLogical(Schema schema, java.util.Date value);
    public static java.util.Date toLogical(Schema schema, int value);
}

Usage:

import org.apache.kafka.connect.data.Date;

// Create date schema
Schema dateSchema = Date.builder().build();
// or, equivalently: Schema dateSchema = Date.SCHEMA;

// Convert Date to int
java.util.Date javaDate = new java.util.Date();
int daysSinceEpoch = Date.fromLogical(dateSchema, javaDate);

// Convert int to Date
java.util.Date converted = Date.toLogical(dateSchema, daysSinceEpoch);

// Use in struct
Schema personSchema = SchemaBuilder.struct()
    .field("name", Schema.STRING_SCHEMA)
    .field("birthDate", Date.SCHEMA)
    .build();

Struct person = new Struct(personSchema)
    .put("name", "John")
    .put("birthDate", Date.fromLogical(Date.SCHEMA, new java.util.Date()));

Time

Milliseconds since midnight (INT32).

package org.apache.kafka.connect.data;

public class Time {
    public static final String LOGICAL_NAME = "org.apache.kafka.connect.data.Time";
    public static final Schema SCHEMA;

    public static SchemaBuilder builder();
    public static int fromLogical(Schema schema, java.util.Date value);
    public static java.util.Date toLogical(Schema schema, int value);
}
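
Usage:

Conversion mirrors Date, with one caveat: fromLogical expects a java.util.Date whose date portion is the Unix epoch day, since only the time-of-day is representable. A minimal sketch under that assumption:

import org.apache.kafka.connect.data.Time;

Schema timeSchema = Time.SCHEMA;

// The Date must fall on the epoch day; only the time-of-day portion is kept
java.util.Date timeOfDay = new java.util.Date(14 * 60 * 60 * 1000L); // 14:00:00 UTC
int millisSinceMidnight = Time.fromLogical(timeSchema, timeOfDay);   // 50400000
java.util.Date converted = Time.toLogical(timeSchema, millisSinceMidnight);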

Timestamp

Milliseconds since Unix epoch (INT64).

package org.apache.kafka.connect.data;

public class Timestamp {
    public static final String LOGICAL_NAME = "org.apache.kafka.connect.data.Timestamp";
    public static final Schema SCHEMA;

    public static SchemaBuilder builder();
    public static long fromLogical(Schema schema, java.util.Date value);
    public static java.util.Date toLogical(Schema schema, long value);
}

Usage:

import org.apache.kafka.connect.data.Timestamp;

// Timestamp schema
Schema timestampSchema = Timestamp.SCHEMA;

// Convert
java.util.Date now = new java.util.Date();
long millisSinceEpoch = Timestamp.fromLogical(timestampSchema, now);
java.util.Date converted = Timestamp.toLogical(timestampSchema, millisSinceEpoch);

// Use in struct
Schema eventSchema = SchemaBuilder.struct()
    .field("id", Schema.STRING_SCHEMA)
    .field("timestamp", Timestamp.SCHEMA)
    .build();

Struct event = new Struct(eventSchema)
    .put("id", "event-123")
    .put("timestamp", Timestamp.fromLogical(Timestamp.SCHEMA, new java.util.Date()));

Decimal

Arbitrary precision decimal (BYTES).

package org.apache.kafka.connect.data;

public class Decimal {
    public static final String LOGICAL_NAME = "org.apache.kafka.connect.data.Decimal";
    public static final String SCALE_FIELD = "scale";

    // Builder
    public static SchemaBuilder builder(int scale);
    public static Schema schema(int scale);

    // Conversion
    public static byte[] fromLogical(Schema schema, BigDecimal value);
    public static BigDecimal toLogical(Schema schema, byte[] value);
}

Usage:

import org.apache.kafka.connect.data.Decimal;
import java.math.BigDecimal;

// Decimal schema with scale=2 (for currency)
Schema priceSchema = Decimal.schema(2);

// Convert BigDecimal to bytes (the value's scale must match the schema's scale)
BigDecimal price = new BigDecimal("19.99");
byte[] encoded = Decimal.fromLogical(priceSchema, price);

// Convert bytes to BigDecimal
BigDecimal decoded = Decimal.toLogical(priceSchema, encoded);

// Use in struct
Schema productSchema = SchemaBuilder.struct()
    .field("name", Schema.STRING_SCHEMA)
    .field("price", Decimal.schema(2))
    .build();

Struct product = new Struct(productSchema)
    .put("name", "Widget")
    .put("price", Decimal.fromLogical(Decimal.schema(2), new BigDecimal("29.99")));

SchemaAndValue

Combined schema and value container.

package org.apache.kafka.connect.data;

public class SchemaAndValue {
    // Constant
    public static final SchemaAndValue NULL;

    // Constructor
    public SchemaAndValue(Schema schema, Object value);

    // Accessors
    public Schema schema();
    public Object value();
}
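
SchemaAndValue typically appears as a return type, e.g. from Converter.toConnectData or Values.parseString, rather than being constructed by hand. A minimal sketch:

import org.apache.kafka.connect.data.*;

SchemaAndValue pair = new SchemaAndValue(Schema.STRING_SCHEMA, "hello");
Schema schema = pair.schema();  // STRING schema
Object value = pair.value();    // "hello"

// NULL is the canonical "no schema, no value" pair
SchemaAndValue empty = SchemaAndValue.NULL;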

Values Utility

Type conversion utilities.

package org.apache.kafka.connect.data;

public class Values {
    // Type conversions
    public static Boolean convertToBoolean(Schema schema, Object value);
    public static Byte convertToByte(Schema schema, Object value);
    public static Short convertToShort(Schema schema, Object value);
    public static Integer convertToInteger(Schema schema, Object value);
    public static Long convertToLong(Schema schema, Object value);
    public static Float convertToFloat(Schema schema, Object value);
    public static Double convertToDouble(Schema schema, Object value);
    public static String convertToString(Schema schema, Object value);
    public static byte[] convertToBytes(Schema schema, Object value);
    public static List<?> convertToList(Schema schema, Object value);
    public static Map<?, ?> convertToMap(Schema schema, Object value);
    public static Struct convertToStruct(Schema schema, Object value);

    // Logical type conversions
    public static java.util.Date convertToDate(Schema schema, Object value);
    public static java.util.Date convertToTime(Schema schema, Object value);
    public static java.util.Date convertToTimestamp(Schema schema, Object value);
    public static BigDecimal convertToDecimal(Schema schema, Object value);

    // Schema inference
    public static Schema inferSchema(Object value);

    // String parsing (returns the value together with its inferred schema)
    public static SchemaAndValue parseString(String value);
}

Usage:

import org.apache.kafka.connect.data.*;
import java.util.*;

// Convert between types (the schema argument describes the input value)
Integer intValue = Values.convertToInteger(Schema.STRING_SCHEMA, "123");
String strValue = Values.convertToString(Schema.INT32_SCHEMA, 123);
Long longValue = Values.convertToLong(Schema.INT32_SCHEMA, 456);

// Infer schema from value
Map<String, String> map = new HashMap<>();
map.put("firstName", "John");
map.put("lastName", "Doe");
Schema inferredSchema = Values.inferSchema(map);  // MAP of STRING to STRING

// Parse string into a SchemaAndValue (schema is inferred)
SchemaAndValue parsed = Values.parseString("123");   // narrowest integer schema that fits, value 123
SchemaAndValue parsed2 = Values.parseString("true"); // BOOLEAN schema, value true

SchemaProjector

Project data between schemas.

package org.apache.kafka.connect.data;

public class SchemaProjector {
    /**
     * Project a record from source schema to target schema.
     */
    public static Object project(Schema source, Object record, Schema target);
}

Usage:

import org.apache.kafka.connect.data.*;

// Source schema
Schema sourceSchema = SchemaBuilder.struct()
    .field("id", Schema.INT64_SCHEMA)
    .field("name", Schema.STRING_SCHEMA)
    .field("email", Schema.STRING_SCHEMA)
    .field("age", Schema.INT32_SCHEMA)
    .build();

// Target schema (subset of fields)
Schema targetSchema = SchemaBuilder.struct()
    .field("id", Schema.INT64_SCHEMA)
    .field("name", Schema.STRING_SCHEMA)
    .build();

// Source record
Struct source = new Struct(sourceSchema)
    .put("id", 123L)
    .put("name", "John")
    .put("email", "john@example.com")
    .put("age", 30);

// Project to target
Struct projected = (Struct) SchemaProjector.project(sourceSchema, source, targetSchema);
// Result only contains id and name fields
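
Projection also works toward a wider schema, the more common Connect use case: target fields missing from the source must be optional or carry a default. A sketch reusing the schemas above:

// Target adds an optional field the source lacks
Schema widerSchema = SchemaBuilder.struct()
    .field("id", Schema.INT64_SCHEMA)
    .field("name", Schema.STRING_SCHEMA)
    .field("email", Schema.OPTIONAL_STRING_SCHEMA)
    .build();

Struct widened = (Struct) SchemaProjector.project(targetSchema, projected, widerSchema);
// "email" stays null: it is optional and absent from the source.
// A missing required field without a default throws SchemaProjectorException.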

Complete Example

import org.apache.kafka.connect.data.*;
import org.apache.kafka.connect.source.*;
import java.util.*;

public class UserEvent {
    public static final Schema SCHEMA = SchemaBuilder.struct()
        .name("com.example.UserEvent")
        .version(1)
        .doc("User lifecycle event")
        .field("userId", Schema.INT64_SCHEMA)
        .field("username", Schema.STRING_SCHEMA)
        .field("email", Schema.OPTIONAL_STRING_SCHEMA)
        .field("eventType", Schema.STRING_SCHEMA)
        .field("eventTime", Timestamp.SCHEMA)
        .field("metadata", SchemaBuilder.map(
            Schema.STRING_SCHEMA,
            Schema.STRING_SCHEMA
        ).optional().build())
        .build();

    // java.util.Date is fully qualified: org.apache.kafka.connect.data.Date is also imported
    public static Struct create(long userId, String username, String email,
                                String eventType, java.util.Date eventTime, Map<String, String> metadata) {
        Struct struct = new Struct(SCHEMA)
            .put("userId", userId)
            .put("username", username)
            .put("email", email)
            .put("eventType", eventType)
            .put("eventTime", Timestamp.fromLogical(Timestamp.SCHEMA, eventTime));

        if (metadata != null) {
            struct.put("metadata", metadata);
        }

        return struct;
    }

    public static SourceRecord toSourceRecord(Struct event, String topic) {
        return new SourceRecord(
            null,  // source partition
            null,  // source offset
            topic,
            null,  // kafka partition
            Schema.INT64_SCHEMA,  // key schema
            event.getInt64("userId"),  // key
            SCHEMA,  // value schema
            event,  // value
            null,  // timestamp
            null   // headers
        );
    }
}

// Usage
Struct userEvent = UserEvent.create(
    123L,
    "johndoe",
    "john@example.com",
    "USER_CREATED",
    new java.util.Date(),
    Collections.singletonMap("source", "api")
);

SourceRecord record = UserEvent.toSourceRecord(userEvent, "user-events");

Schema Best Practices

Schema Versioning

import org.apache.kafka.connect.data.*;

public class VersionedSchemaExample {
    
    // Version 1: Initial schema
    public static final Schema SCHEMA_V1 = SchemaBuilder.struct()
        .name("com.example.User")
        .version(1)
        .field("id", Schema.INT64_SCHEMA)
        .field("name", Schema.STRING_SCHEMA)
        .build();
    
    // Version 2: Added optional email field (backward compatible)
    public static final Schema SCHEMA_V2 = SchemaBuilder.struct()
        .name("com.example.User")
        .version(2)
        .field("id", Schema.INT64_SCHEMA)
        .field("name", Schema.STRING_SCHEMA)
        .field("email", Schema.OPTIONAL_STRING_SCHEMA) // Optional = backward compatible
        .build();
    
    // Version 3: Added required field (NOT backward compatible)
    public static final Schema SCHEMA_V3 = SchemaBuilder.struct()
        .name("com.example.User")
        .version(3)
        .field("id", Schema.INT64_SCHEMA)
        .field("name", Schema.STRING_SCHEMA)
        .field("email", Schema.OPTIONAL_STRING_SCHEMA)
        .field("created_at", Timestamp.SCHEMA) // Required = breaking change
        .build();
    
    // Handle multiple schema versions
    public Struct migrateToLatest(Struct oldRecord, Schema oldSchema) {
        int oldVersion = oldSchema.version() != null ? oldSchema.version() : 1;
        
        if (oldVersion == 3) {
            return oldRecord; // Already latest
        }
        
        // Migrate from v1 or v2 to v3
        Struct newRecord = new Struct(SCHEMA_V3);
        newRecord.put("id", oldRecord.getInt64("id"));
        newRecord.put("name", oldRecord.getString("name"));
        
        // Handle optional fields
        if (oldVersion >= 2 && oldSchema.field("email") != null) {
            newRecord.put("email", oldRecord.getString("email"));
        } else {
            newRecord.put("email", null);
        }
        
        // Set default for new required field
        newRecord.put("created_at", new Date());
        
        return newRecord;
    }
}
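
A short sketch exercising the migration path above (names as defined in this example):

VersionedSchemaExample migrator = new VersionedSchemaExample();

Struct v1Record = new Struct(VersionedSchemaExample.SCHEMA_V1)
    .put("id", 42L)
    .put("name", "Ada");

Struct latest = migrator.migrateToLatest(v1Record, VersionedSchemaExample.SCHEMA_V1);
latest.validate();  // id and name carried over, email null, created_at populated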

Edge Cases

Null and Optional Fields

import org.apache.kafka.connect.data.*;
import org.apache.kafka.connect.errors.DataException;

// Optional vs required fields
Schema schema = SchemaBuilder.struct()
    .field("required_field", Schema.STRING_SCHEMA) // Required
    .field("optional_field", Schema.OPTIONAL_STRING_SCHEMA) // Optional
    .build();

Struct record = new Struct(schema);
record.put("required_field", "value");
record.put("optional_field", null); // Valid for optional field

// Validation
try {
    record.validate(); // Succeeds
} catch (DataException e) {
    System.err.println("Validation failed: " + e.getMessage());
}

// Missing required field
Struct invalidRecord = new Struct(schema);
invalidRecord.put("optional_field", "value");
// Missing required_field

try {
    invalidRecord.validate(); // Throws DataException
} catch (DataException e) {
    System.err.println("Missing required field: " + e.getMessage());
}
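
Default values interact with optional fields as well: when a field is unset, Struct.get falls back to the schema's default. A minimal sketch, assuming that fallback behavior:

Schema withDefault = SchemaBuilder.struct()
    .field("retries", SchemaBuilder.int32().optional().defaultValue(3).build())
    .build();

Struct config = new Struct(withDefault);
Integer retries = config.getInt32("retries");  // 3: unset field resolves to the default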

Complex Nested Structures

import org.apache.kafka.connect.data.*;
import java.util.*;

// Deeply nested schemas
Schema addressSchema = SchemaBuilder.struct()
    .field("street", Schema.STRING_SCHEMA)
    .field("city", Schema.STRING_SCHEMA)
    .field("country", Schema.STRING_SCHEMA)
    .build();

Schema contactSchema = SchemaBuilder.struct()
    .field("email", Schema.STRING_SCHEMA)
    .field("phone", Schema.OPTIONAL_STRING_SCHEMA)
    .field("address", addressSchema) // Nested struct
    .build();

Schema userSchema = SchemaBuilder.struct()
    .field("id", Schema.INT64_SCHEMA)
    .field("name", Schema.STRING_SCHEMA)
    .field("contacts", SchemaBuilder.array(contactSchema).build()) // Array of structs
    .field("metadata", SchemaBuilder.map(
        Schema.STRING_SCHEMA, 
        Schema.STRING_SCHEMA).build()) // Map
    .build();

// Create nested structure
Struct address = new Struct(addressSchema)
    .put("street", "123 Main St")
    .put("city", "Springfield")
    .put("country", "USA");

Struct contact = new Struct(contactSchema)
    .put("email", "user@example.com")
    .put("phone", "+1234567890")
    .put("address", address);

Struct user = new Struct(userSchema)
    .put("id", 123L)
    .put("name", "John Doe")
    .put("contacts", Collections.singletonList(contact))
    .put("metadata", Collections.singletonMap("source", "api"));

// Access nested values (getArray's generic result needs an explicit List type)
List<Struct> contacts = user.getArray("contacts");
Struct retrievedContact = contacts.get(0);
Struct retrievedAddress = retrievedContact.getStruct("address");
String city = retrievedAddress.getString("city");