Schema Registry for Apache Kafka - covers schema management (Avro, Protobuf, JSON Schema), compatibility modes, schema evolution, REST API, serializer/deserializer configuration, Kafka Connect converters, Flink SQL integration, and Confluent Cloud.
| Format | Serializer | Deserializer |
|---|---|---|
| Avro | io.confluent.kafka.serializers.KafkaAvroSerializer | io.confluent.kafka.serializers.KafkaAvroDeserializer |
| Protobuf | io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer | io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer |
| JSON Schema | io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer | io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer |
| Property | Description |
|---|---|
| schema.registry.url | Schema Registry URL(s), comma-separated for HA |
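When more than one URL is listed, the client fails over between registry instances. A minimal sketch of the client-side setting, using plain `java.util.Properties` (the `sr1`/`sr2` hostnames are placeholders):

```java
import java.util.Properties;

public class RegistryFailoverConfig {
    // sr1 and sr2 are placeholder hostnames; the serializer tries each URL in turn
    public static Properties haConfig() {
        Properties props = new Properties();
        props.put("schema.registry.url", "http://sr1:8081,http://sr2:8081");
        return props;
    }
}
```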
| Property | Default | Description |
|---|---|---|
| auto.register.schemas | true | Automatically register schemas on produce |
| use.latest.version | false | Use latest registered schema instead of auto-registering |
| latest.compatibility.strict | true | Validate backward compatibility when using latest version |
| normalize.schemas | false | Normalize schema before registration |
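A common production setup combines these properties to disable implicit registration and pin serializers to the latest registered schema. A hedged sketch using `java.util.Properties` (values are strings, as the serializers accept):

```java
import java.util.Properties;

public class ProducerGovernanceConfig {
    // Sketch of a locked-down serializer config: schemas are registered
    // out-of-band (CI/CD), not by producing clients.
    public static Properties productionSerializerConfig() {
        Properties props = new Properties();
        props.put("schema.registry.url", "http://localhost:8081");
        // Do not let producers register new schemas implicitly
        props.put("auto.register.schemas", "false");
        // Serialize against the latest schema already in the registry
        props.put("use.latest.version", "true");
        // Reject writes whose schema is not backward compatible with latest
        props.put("latest.compatibility.strict", "true");
        return props;
    }
}
```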
| Property | Default | Description |
|---|---|---|
| key.subject.name.strategy | TopicNameStrategy | Subject naming for message keys |
| value.subject.name.strategy | TopicNameStrategy | Subject naming for message values |
| reference.subject.name.strategy | DefaultReferenceSubjectNameStrategy | Naming for auto-registered schema references |
Strategy classes (under io.confluent.kafka.serializers.subject):
- TopicNameStrategy: subject is `<topic>-key` or `<topic>-value` (e.g. `users-value`)
- RecordNameStrategy: subject is the fully qualified record name (e.g. `com.example.User`)
- TopicRecordNameStrategy: subject combines both (e.g. `users-com.example.User`)

| Property | Description |
|---|---|
| basic.auth.credentials.source | URL, USER_INFO, or SASL_INHERIT |
| basic.auth.user.info | username:password (when source is USER_INFO) |
| bearer.auth.token | Bearer token for authentication |
| schema.registry.ssl.truststore.location | SSL truststore path |
| schema.registry.ssl.truststore.password | SSL truststore password |
| schema.registry.ssl.keystore.location | SSL keystore path |
| schema.registry.ssl.keystore.password | SSL keystore password |
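For example, authenticating to a registry secured with basic auth (as on Confluent Cloud), the serializer config combines the two USER_INFO properties above. A minimal sketch; the key and secret values are placeholders:

```java
import java.util.Properties;

public class RegistryAuthConfig {
    // Basic auth toward a secured Schema Registry; apiKey/apiSecret are
    // placeholders supplied by the caller.
    public static Properties basicAuthConfig(String apiKey, String apiSecret) {
        Properties props = new Properties();
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", apiKey + ":" + apiSecret);
        return props;
    }
}
```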
```java
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");
KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);

String schemaString = "{\"type\":\"record\",\"name\":\"User\",\"fields\":" +
    "[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}";
Schema schema = new Schema.Parser().parse(schemaString);
GenericRecord user = new GenericData.Record(schema);
user.put("id", 1);
user.put("name", "Alice");

producer.send(new ProducerRecord<>("users", "key1", user));
```

```java
// User class generated by avro-maven-plugin from .avsc file
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "user-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put("schema.registry.url", "http://localhost:8081");
props.put("specific.avro.reader", "true");

KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("users"));
ConsumerRecords<String, User> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, User> record : records) {
    User user = record.value();
    System.out.println(user.getName());
}
```

```xml
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.11.3</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals><goal>schema</goal></goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>
```

```java
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");
KafkaProducer<String, UserOuterClass.User> producer = new KafkaProducer<>(props);

UserOuterClass.User user = UserOuterClass.User.newBuilder()
    .setId(1)
    .setName("Alice")
    .build();
producer.send(new ProducerRecord<>("users", "key1", user));
```

Protobuf serializers automatically register referenced/imported schemas recursively.
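The Protobuf producer above assumes generated classes. As a hedged illustration, a minimal `user.proto` that would yield `UserOuterClass.User` (the package name is illustrative; protoc derives the outer class name `UserOuterClass` because the file `user.proto` contains a message also named `User`):

```protobuf
// user.proto
syntax = "proto3";

package example;

message User {
  int32 id = 1;
  string name = 2;
}
```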
```java
// JsonSchema annotation on POJO
public class User {
    @JsonProperty
    public int id;
    @JsonProperty
    public String name;

    public User(int id, String name) {  // constructor used by the producer below
        this.id = id;
        this.name = name;
    }
}
```

```java
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaJsonSchemaSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");
props.put("json.fail.invalid.schema", "true");
KafkaProducer<String, User> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("users", "key1", new User(1, "Alice")));
```

```java
// Alternatively, send free-form JSON as a JsonNode (producer typed <String, JsonNode>)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaJsonSchemaSerializer.class);
ObjectMapper mapper = new ObjectMapper();
JsonNode node = mapper.createObjectNode()
    .put("id", 1)
    .put("name", "Alice");
producer.send(new ProducerRecord<>("users", "key1", node));
```

| Format | Converter Class |
|---|---|
| Avro | io.confluent.connect.avro.AvroConverter |
| Protobuf | io.confluent.connect.protobuf.ProtobufConverter |
| JSON Schema | io.confluent.connect.json.JsonSchemaConverter |
```properties
# connect-distributed.properties
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
```

```json
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
  "value.converter.schema.registry.url": "http://localhost:8081",
  "value.converter.enhanced.protobuf.schema.support": "true"
}
```

Avro:
| Property | Default | Description |
|---|---|---|
| enhanced.avro.schema.support | false | Preserve package info and enums |
| schemas.cache.config | 1000 | Schema cache size |
| scrub.invalid.names | false | Replace invalid characters in names |
| connect.meta.data | true | Maintain Connect metadata |
Protobuf:
| Property | Default | Description |
|---|---|---|
| enhanced.protobuf.schema.support | false | Retain package structure |
| generate.index.for.unions | false | Field name suffixing for oneOf |
| int.for.enums | false | Represent enums as integers |
JSON Schema:
| Property | Default | Description |
|---|---|---|
| object.additional.properties | true | Allow extra properties |
| decimal.format | BASE64 | BASE64 or NUMERIC |
| replace.null.with.default | true | Substitute defaults for nulls |
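In worker or connector config, the converter options above take the `key.converter.` or `value.converter.` prefix. A hedged sketch for a worker properties file (values chosen for illustration):

```properties
# Tuning Avro converter behavior on a Connect worker
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.enhanced.avro.schema.support=true
value.converter.scrub.invalid.names=true
```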
```properties
value.converter.schema.registry.ssl.truststore.location=/path/to/truststore.jks
value.converter.schema.registry.ssl.truststore.password=password
value.converter.schema.registry.ssl.keystore.location=/path/to/keystore.jks
value.converter.schema.registry.ssl.keystore.password=password
```

```xml
<repositories>
  <repository>
    <id>confluent</id>
    <url>https://packages.confluent.io/maven/</url>
  </repository>
</repositories>
```
```xml
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-avro-serializer</artifactId>
  <version>7.6.0</version>
</dependency>
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-protobuf-serializer</artifactId>
  <version>7.6.0</version>
</dependency>
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-json-schema-serializer</artifactId>
  <version>7.6.0</version>
</dependency>
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-connect-avro-converter</artifactId>
  <version>7.6.0</version>
</dependency>
```

Install with Tessl CLI:

```shell
npx tessl i gamussa/schema-registry@0.2.0
```