gamussa/schema-registry

Schema Registry for Apache Kafka - covers schema management (Avro, Protobuf, JSON Schema), compatibility modes, schema evolution, REST API, serializer/deserializer configuration, Kafka Connect converters, Flink SQL integration, and Confluent Cloud.


Serializers, Deserializers, and Kafka Connect Converters

Table of Contents

  1. Kafka Producer/Consumer Serializers
  2. Configuration Properties
  3. Avro SerDes
  4. Protobuf SerDes
  5. JSON Schema SerDes
  6. Kafka Connect Converters
  7. Maven Dependencies

Kafka Producer/Consumer Serializers

Class Names

| Format | Serializer | Deserializer |
| --- | --- | --- |
| Avro | `io.confluent.kafka.serializers.KafkaAvroSerializer` | `io.confluent.kafka.serializers.KafkaAvroDeserializer` |
| Protobuf | `io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer` | `io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer` |
| JSON Schema | `io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer` | `io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer` |
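
All three serializers frame messages in the Confluent wire format: one magic byte (0), a 4-byte big-endian schema ID, then the encoded payload. A minimal sketch of that framing using only the JDK (`WireFormat` is an illustrative helper, not a Confluent class):

```java
import java.nio.ByteBuffer;

public class WireFormat {
    // Confluent wire format: magic byte 0x0, 4-byte big-endian schema ID, payload.
    public static byte[] frame(int schemaId, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + payload.length);
        buf.put((byte) 0x0).putInt(schemaId).put(payload);
        return buf.array();
    }

    // Extract the schema ID a deserializer would look up in Schema Registry.
    public static int schemaId(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        byte magic = buf.get();
        if (magic != 0x0) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        return buf.getInt();
    }
}
```

This is why consumers need access to the same registry: each message carries only the schema ID, not the schema itself.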

Configuration Properties

Required

| Property | Description |
| --- | --- |
| `schema.registry.url` | Schema Registry URL(s), comma-separated for HA |

Schema Registration

| Property | Default | Description |
| --- | --- | --- |
| `auto.register.schemas` | true | Automatically register schemas on produce |
| `use.latest.version` | false | Use latest registered schema instead of auto-registering |
| `latest.compatibility.strict` | true | Validate backward compatibility when using latest version |
| `normalize.schemas` | false | Normalize schema before registration |
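
A common production setup disables client-side registration (schemas are registered out of band, e.g. from CI/CD) and serializes against the latest reviewed schema. A sketch of that combination (only the two registration properties shown; the rest of the producer config is unchanged):

```java
import java.util.Properties;

public class ProductionSerializerConfig {
    public static Properties props() {
        Properties props = new Properties();
        // Do not let producers register new schemas implicitly.
        props.put("auto.register.schemas", "false");
        // Serialize using the latest schema already registered for the subject.
        props.put("use.latest.version", "true");
        return props;
    }
}
```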

Subject Naming

| Property | Default | Description |
| --- | --- | --- |
| `key.subject.name.strategy` | TopicNameStrategy | Subject naming for message keys |
| `value.subject.name.strategy` | TopicNameStrategy | Subject naming for message values |
| `reference.subject.name.strategy` | DefaultReferenceSubjectNameStrategy | Naming for auto-registered schema references |

Strategy classes (under io.confluent.kafka.serializers.subject):

  • TopicNameStrategy
  • RecordNameStrategy
  • TopicRecordNameStrategy
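
The subject each strategy derives can be sketched as plain string rules (`SubjectNames` is an illustrative helper mirroring the documented behavior, not the Confluent implementation):

```java
public class SubjectNames {
    // TopicNameStrategy: <topic>-key or <topic>-value.
    public static String topicName(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    // RecordNameStrategy: fully qualified record name; topic is ignored.
    public static String recordName(String fullRecordName) {
        return fullRecordName;
    }

    // TopicRecordNameStrategy: <topic>-<fully qualified record name>.
    public static String topicRecordName(String topic, String fullRecordName) {
        return topic + "-" + fullRecordName;
    }
}
```

TopicNameStrategy ties one schema type to a topic; the record-name strategies allow multiple event types in the same topic.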

Authentication

| Property | Description |
| --- | --- |
| `basic.auth.credentials.source` | URL, USER_INFO, or SASL_INHERIT |
| `basic.auth.user.info` | username:password (when source is USER_INFO) |
| `bearer.auth.token` | Bearer token for authentication |
| `schema.registry.ssl.truststore.location` | SSL truststore path |
| `schema.registry.ssl.truststore.password` | SSL truststore password |
| `schema.registry.ssl.keystore.location` | SSL keystore path |
| `schema.registry.ssl.keystore.password` | SSL keystore password |
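
For a hosted registry secured with API keys (e.g. Confluent Cloud), the usual combination is USER_INFO credentials. A sketch with placeholder endpoint and key/secret (substitute your own values):

```java
import java.util.Properties;

public class RegistryAuthConfig {
    public static Properties props() {
        Properties props = new Properties();
        // Placeholder URL and API key/secret for illustration only.
        props.put("schema.registry.url", "https://psrc-xxxxx.us-east-1.aws.confluent.cloud");
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", "SR_API_KEY:SR_API_SECRET");
        return props;
    }
}
```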

Avro SerDes

Generic Record (no generated classes)

```java
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");

KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);

String schemaString = "{\"type\":\"record\",\"name\":\"User\",\"fields\":" +
    "[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}";
Schema schema = new Schema.Parser().parse(schemaString);

GenericRecord user = new GenericData.Record(schema);
user.put("id", 1);
user.put("name", "Alice");

producer.send(new ProducerRecord<>("users", "key1", user));
```

Specific Record (code-generated classes)

```java
// User class generated by avro-maven-plugin from the .avsc file
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "user-consumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put("schema.registry.url", "http://localhost:8081");
props.put("specific.avro.reader", "true"); // deserialize into generated classes, not GenericRecord

KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("users"));

ConsumerRecords<String, User> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, User> record : records) {
    User user = record.value();
    System.out.println(user.getName());
}
```

Avro Maven Plugin (code generation)

```xml
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.11.3</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals><goal>schema</goal></goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>
```

Protobuf SerDes

```java
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");

KafkaProducer<String, UserOuterClass.User> producer = new KafkaProducer<>(props);

UserOuterClass.User user = UserOuterClass.User.newBuilder()
    .setId(1)
    .setName("Alice")
    .build();

producer.send(new ProducerRecord<>("users", "key1", user));
```

Protobuf serializers automatically register referenced/imported schemas recursively.

JSON Schema SerDes

With POJO (recommended)

```java
// Plain POJO; Jackson @JsonProperty annotations drive the JSON Schema the serializer derives
public class User {
    @JsonProperty
    public int id;
    @JsonProperty
    public String name;

    public User(int id, String name) {
        this.id = id;
        this.name = name;
    }
}
```

```java
Properties props = new Properties();
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaJsonSchemaSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");
props.put("json.fail.invalid.schema", "true");

KafkaProducer<String, User> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("users", "key1", new User(1, "Alice")));
```

With JsonNode (generic)

```java
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaJsonSchemaSerializer.class);

ObjectMapper mapper = new ObjectMapper();
JsonNode node = mapper.createObjectNode()
    .put("id", 1)
    .put("name", "Alice");

producer.send(new ProducerRecord<>("users", "key1", node));
```

Kafka Connect Converters

Converter Class Names

| Format | Converter Class |
| --- | --- |
| Avro | `io.confluent.connect.avro.AvroConverter` |
| Protobuf | `io.confluent.connect.protobuf.ProtobufConverter` |
| JSON Schema | `io.confluent.connect.json.JsonSchemaConverter` |

Worker-Level Configuration

```properties
# connect-distributed.properties
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
```

Connector-Level Override

```json
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
  "value.converter.schema.registry.url": "http://localhost:8081",
  "value.converter.enhanced.protobuf.schema.support": "true"
}
```

Format-Specific Converter Options

Avro:

| Property | Default | Description |
| --- | --- | --- |
| `enhanced.avro.schema.support` | false | Preserve package info and enums |
| `schemas.cache.config` | 1000 | Schema cache size |
| `scrub.invalid.names` | false | Replace invalid characters in names |
| `connect.meta.data` | true | Maintain Connect metadata |

Protobuf:

| Property | Default | Description |
| --- | --- | --- |
| `enhanced.protobuf.schema.support` | false | Retain package structure |
| `generate.index.for.unions` | false | Field name suffixing for oneOf |
| `int.for.enums` | false | Represent enums as integers |

JSON Schema:

| Property | Default | Description |
| --- | --- | --- |
| `object.additional.properties` | true | Allow extra properties |
| `decimal.format` | BASE64 | BASE64 or NUMERIC |
| `replace.null.with.default` | true | Substitute defaults for nulls |
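
`decimal.format` controls how Connect's Decimal logical type is rendered in JSON: NUMERIC writes a plain number, while BASE64 encodes the big-endian two's-complement bytes of the unscaled value. A sketch of the BASE64 form using only the JDK:

```java
import java.math.BigDecimal;
import java.util.Base64;

public class DecimalFormatDemo {
    // BASE64 mode encodes the unscaled value's bytes; the scale lives in the schema.
    public static String base64Form(BigDecimal value) {
        byte[] unscaled = value.unscaledValue().toByteArray();
        return Base64.getEncoder().encodeToString(unscaled);
    }
}
```

A decimal such as 10.2 (unscaled 102, scale 1) therefore appears as an opaque base64 string rather than a readable number, which is why sinks that expect numbers often need NUMERIC.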

SSL Configuration for Converters

```properties
value.converter.schema.registry.ssl.truststore.location=/path/to/truststore.jks
value.converter.schema.registry.ssl.truststore.password=password
value.converter.schema.registry.ssl.keystore.location=/path/to/keystore.jks
value.converter.schema.registry.ssl.keystore.password=password
```

Maven Dependencies

Confluent Avro Serializer

```xml
<!-- Confluent artifacts are served from the Confluent Maven repository -->
<repositories>
  <repository>
    <id>confluent</id>
    <url>https://packages.confluent.io/maven/</url>
  </repository>
</repositories>

<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-avro-serializer</artifactId>
  <version>7.6.0</version>
</dependency>
```

Confluent Protobuf Serializer

```xml
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-protobuf-serializer</artifactId>
  <version>7.6.0</version>
</dependency>
```

Confluent JSON Schema Serializer

```xml
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-json-schema-serializer</artifactId>
  <version>7.6.0</version>
</dependency>
```

Kafka Connect Converters

```xml
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-connect-avro-converter</artifactId>
  <version>7.6.0</version>
</dependency>
```

