Schema Registry for Apache Kafka - covers schema management (Avro, Protobuf, JSON Schema), compatibility modes, schema evolution, REST API, serializer/deserializer configuration, Kafka Connect converters, Flink SQL integration, and Confluent Cloud.
Confluent Cloud provides a fully managed Schema Registry as part of Stream Governance. Each Confluent Cloud environment has its own Schema Registry instance (one per environment, shared across clusters in that environment).
```shell
# Create a Schema Registry API key via Confluent CLI
confluent api-key create --resource <schema-registry-cluster-id>

# Use it in REST calls
curl -u <SR_API_KEY>:<SR_API_SECRET> \
  https://<SR_ENDPOINT>/subjects
```

Find the Schema Registry endpoint:

```shell
# Via Confluent CLI
confluent schema-registry cluster describe
# Output includes an endpoint URL like:
# https://psrc-xxxxx.us-east-2.aws.confluent.cloud
```

Configure Java producers and consumers with the endpoint and credentials:

```properties
schema.registry.url=https://psrc-xxxxx.us-east-2.aws.confluent.cloud
basic.auth.credentials.source=USER_INFO
basic.auth.user.info=<SR_API_KEY>:<SR_API_SECRET>
```

Common schema CLI commands:

```shell
# List schemas
confluent schema-registry schema list

# Describe a schema
confluent schema-registry schema describe --subject my-topic-value --version latest

# Create a schema
confluent schema-registry schema create --subject my-topic-value \
  --schema schema.avsc --type AVRO

# Delete a schema
confluent schema-registry schema delete --subject my-topic-value --version latest

# Get/set compatibility
confluent schema-registry compatibility describe --subject my-topic-value
confluent schema-registry compatibility update --subject my-topic-value \
  --compatibility FULL_TRANSITIVE
```

The REST API is identical to the open-source API but accessed over HTTPS with authentication:
```shell
# List subjects
curl -u <KEY>:<SECRET> https://psrc-xxxxx.us-east-2.aws.confluent.cloud/subjects

# Register a schema
curl -u <KEY>:<SECRET> \
  -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "...", "schemaType": "AVRO"}' \
  https://psrc-xxxxx.us-east-2.aws.confluent.cloud/subjects/my-topic-value/versions

# Test compatibility
curl -u <KEY>:<SECRET> \
  -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "..."}' \
  "https://psrc-xxxxx.us-east-2.aws.confluent.cloud/compatibility/subjects/my-topic-value/versions/latest?verbose=true"
```

Data contracts extend schemas with rules, metadata, and tags (Stream Governance Advanced tier).
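The schema registration call in the curl example above can be sketched in Python with only the standard library. `build_register_request` is a hypothetical helper for illustration, not part of any Confluent SDK; the endpoint and credentials are placeholders:

```python
import base64
import json

def build_register_request(endpoint, subject, schema_str, schema_type, api_key, api_secret):
    """Build the URL, headers, and body for POST /subjects/<subject>/versions."""
    url = f"{endpoint}/subjects/{subject}/versions"
    # Basic auth header from the Schema Registry API key and secret
    credentials = base64.b64encode(f"{api_key}:{api_secret}".encode()).decode()
    headers = {
        "Content-Type": "application/vnd.schemaregistry.v1+json",
        "Authorization": f"Basic {credentials}",
    }
    # The schema itself is passed as an escaped string inside the JSON body
    body = json.dumps({"schema": schema_str, "schemaType": schema_type})
    return url, headers, body

url, headers, body = build_register_request(
    "https://psrc-xxxxx.us-east-2.aws.confluent.cloud",  # placeholder endpoint
    "my-topic-value",
    '{"type": "string"}',  # trivial example schema
    "AVRO",
    "<SR_API_KEY>",
    "<SR_API_SECRET>",
)
print(url)
```

The returned pieces can be fed to any HTTP client (e.g., `urllib.request`); a `200` response carries the assigned schema ID.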
Attach business metadata to schemas:
```json
{
  "schema": "...",
  "metadata": {
    "properties": {
      "owner": "data-team",
      "classification": "PII"
    },
    "tags": {
      "*.name": ["PII", "SENSITIVE"]
    }
  }
}
```

Rules execute during serialization/deserialization:
```json
{
  "schema": "...",
  "ruleSet": {
    "domainRules": [
      {
        "name": "encrypt-pii",
        "kind": "TRANSFORM",
        "type": "ENCRYPT",
        "mode": "WRITEREAD",
        "tags": ["PII"],
        "params": {
          "encrypt.kek.name": "my-kek",
          "encrypt.kms.type": "aws-kms"
        }
      }
    ],
    "migrationRules": [
      {
        "name": "upgrade-v1-to-v2",
        "kind": "TRANSFORM",
        "type": "JSONATA",
        "mode": "UPGRADE",
        "expr": "$ ~> |$|{\"new_field\": \"default\"}|"
      }
    ]
  }
}
```

Rule types:
- `CONDITION`: validate data quality (fail if the condition is not met)
- `TRANSFORM`: transform data on read/write (e.g., encryption, migration)

Rule modes:

- `WRITE`: applied during serialization
- `READ`: applied during deserialization
- `WRITEREAD`: applied in both directions
- `UPGRADE`: applied when reading older schema versions
- `DOWNGRADE`: applied when reading newer schema versions

Broker-side schema validation rejects messages that don't match the registered schema. Enable it at the topic level:
```shell
confluent kafka topic update my-topic \
  --config confluent.value.schema.validation=true
```

This ensures all messages on the topic conform to the registered schema, even from producers that don't use Schema Registry serializers.
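The rule modes listed above determine which direction of the data path a rule fires in. A conceptual sketch of that dispatch (a hypothetical helper for illustration, not Confluent's implementation):

```python
# Conceptual sketch (not Confluent's implementation): which serialization
# direction a rule mode fires in. UPGRADE/DOWNGRADE are migration modes that
# only apply on the read path, when reader and writer schema versions differ.
def rule_applies(mode: str, direction: str) -> bool:
    """direction is 'WRITE' (serialization) or 'READ' (deserialization)."""
    if mode == "WRITEREAD":
        return True
    if mode in ("UPGRADE", "DOWNGRADE"):
        return direction == "READ"
    return mode == direction

print(rule_applies("WRITEREAD", "READ"))  # True
print(rule_applies("WRITE", "READ"))      # False
```

Under this model, the `encrypt-pii` rule above (`WRITEREAD`) encrypts on produce and decrypts on consume, while the JSONATA migration rule (`UPGRADE`) runs only when a consumer reads data written with an older schema version.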
Schema Linking mirrors schemas between Schema Registry instances (e.g., across environments or regions).
```shell
# Create a schema exporter
confluent schema-registry exporter create my-exporter \
  --subjects "orders-*" \
  --context-type CUSTOM \
  --context ".dest-context" \
  --config exporter-config.properties
```

Use cases:

- Promoting schemas between environments (e.g., dev to prod)
- Keeping schemas in sync across regions for disaster recovery
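Exported schemas land in a schema context, which namespaces subject names on the destination registry. Based on the `--context ".dest-context"` flag above, a qualified subject name prefixes the context as `:<context>:<subject>`; a minimal sketch (hypothetical helper):

```python
def qualified_subject(context: str, subject: str) -> str:
    # Schema Registry qualifies subjects as ":<context>:<subject>", where the
    # context name itself starts with "." (e.g. ".dest-context").
    # ":.:" denotes the default context.
    return f":{context}:{subject}"

print(qualified_subject(".dest-context", "orders-value"))  # :.dest-context:orders-value
```

This is why the tag example below addresses a field as `":.:<subject>:1:name"` — the leading `:.:` is the default-context qualifier.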
Classify schemas and fields with business tags:
```shell
# Create a tag definition
confluent schema-registry tag create --name PII --description "Personally Identifiable Information"

# Tag a field
confluent schema-registry tag add --entity-type sr_field \
  --entity-name ":.:<subject>:1:name" --tag PII
```

Search and discover schemas, topics, and their relationships:
```shell
# Search by tag
confluent schema-registry search --tag PII

# Search by name
confluent schema-registry search --name "orders*"
```

For migrating schemas between environments:
```shell
# Export schemas
confluent schema-registry exporter list
confluent schema-registry exporter describe my-exporter
confluent schema-registry exporter status my-exporter
```

Install with the Tessl CLI:

```shell
npx tessl i gamussa/schema-registry@0.2.0
```