Connect to Kafka with Reactive Messaging
—
The Quarkus Kafka extension provides extensive configuration options and customization capabilities for Kafka client behavior, security settings, and messaging properties.
Main configuration class for Kafka extension settings.
package io.quarkus.smallrye.reactivemessaging.kafka;
import io.quarkus.runtime.annotations.ConfigItem;
import io.quarkus.runtime.annotations.ConfigRoot;
/**
 * Configuration root of the Kafka messaging extension, registered under the
 * config root name {@code messaging.kafka}.
 */
@ConfigRoot(name = "messaging.kafka")
public class ReactiveMessagingKafkaConfig {
    /**
     * Enables the graceful shutdown in dev and test modes.
     * The graceful shutdown waits until the inflight records have been processed
     * and the offset committed to Kafka. While this setting is highly recommended
     * in production, in dev and test modes, it's disabled by default.
     */
    @ConfigItem(defaultValue = "false")
    public boolean enableGracefulShutdownInDevAndTestMode;
}
Properties:
enableGracefulShutdownInDevAndTestMode: Enables graceful shutdown in dev and test modes (default: false)

Customizer for Kafka client configuration, particularly for TLS and security settings.
package io.quarkus.smallrye.reactivemessaging.kafka;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.config.Config;
import io.smallrye.reactive.messaging.ClientCustomizer;
/**
 * Application-scoped {@link ClientCustomizer} operating on the Kafka client
 * configuration map of a channel.
 *
 * NOTE: this listing is an API signature only; the body of
 * {@code customize} is not shown here.
 */
@ApplicationScoped
public class KafkaConfigCustomizer implements ClientCustomizer<Map<String, Object>> {
    /**
     * Customize Kafka client configuration for a specific channel.
     *
     * @param channel The channel name being configured
     * @param channelConfig The configuration for the channel
     * @param config The Kafka client configuration map to customize
     * @return The customized configuration map
     */
    @Override
    public Map<String, Object> customize(String channel, Config channelConfig,
            Map<String, Object> config);
}
The extension supports configuration under these prefixes:
# Channel-level configuration
mp.messaging.incoming.[channel-name].[property]
mp.messaging.outgoing.[channel-name].[property]
# Connector-level configuration
mp.messaging.connector.smallrye-kafka.[property]

# General messaging configuration
quarkus.messaging.[property]
# Kafka-specific configuration
quarkus.messaging.kafka.[property]

# Direct Kafka configuration
quarkus.kafka.[property]

# Configure incoming channel
mp.messaging.incoming.my-topic.connector=smallrye-kafka
mp.messaging.incoming.my-topic.topic=input-topic
mp.messaging.incoming.my-topic.bootstrap.servers=localhost:9092
mp.messaging.incoming.my-topic.group.id=my-consumer-group
mp.messaging.incoming.my-topic.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.my-topic.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Configure outgoing channel
mp.messaging.outgoing.my-output.connector=smallrye-kafka
mp.messaging.outgoing.my-output.topic=output-topic
mp.messaging.outgoing.my-output.bootstrap.servers=localhost:9092
mp.messaging.outgoing.my-output.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.my-output.value.serializer=org.apache.kafka.common.serialization.StringSerializer

# Consumer with advanced settings
mp.messaging.incoming.advanced-consumer.connector=smallrye-kafka
mp.messaging.incoming.advanced-consumer.topic=advanced-topic
mp.messaging.incoming.advanced-consumer.bootstrap.servers=kafka1:9092,kafka2:9092
mp.messaging.incoming.advanced-consumer.group.id=advanced-group
mp.messaging.incoming.advanced-consumer.auto.offset.reset=earliest
mp.messaging.incoming.advanced-consumer.enable.auto.commit=false
mp.messaging.incoming.advanced-consumer.max.poll.records=100
mp.messaging.incoming.advanced-consumer.session.timeout.ms=30000
mp.messaging.incoming.advanced-consumer.heartbeat.interval.ms=3000

# Enable TLS with Quarkus TLS configuration
mp.messaging.incoming.secure-topic.connector=smallrye-kafka
mp.messaging.incoming.secure-topic.topic=secure-topic
mp.messaging.incoming.secure-topic.bootstrap.servers=kafka-ssl:9093
mp.messaging.incoming.secure-topic.tls-configuration-name=kafka-tls
# TLS configuration
quarkus.tls.kafka-tls.trust-store.p12.path=kafka.client.truststore.p12
quarkus.tls.kafka-tls.trust-store.password=truststore-password
quarkus.tls.kafka-tls.key-store.p12.path=kafka.client.keystore.p12
quarkus.tls.kafka-tls.key-store.password=keystore-password

# SASL/PLAIN authentication
mp.messaging.incoming.sasl-topic.connector=smallrye-kafka
mp.messaging.incoming.sasl-topic.topic=sasl-topic
mp.messaging.incoming.sasl-topic.bootstrap.servers=kafka-sasl:9092
mp.messaging.incoming.sasl-topic.security.protocol=SASL_PLAINTEXT
mp.messaging.incoming.sasl-topic.sasl.mechanism=PLAIN
mp.messaging.incoming.sasl-topic.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="password";
# SASL/SCRAM authentication
mp.messaging.incoming.scram-topic.sasl.mechanism=SCRAM-SHA-256
mp.messaging.incoming.scram-topic.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="password";

# Configure Hibernate ORM state store
mp.messaging.incoming.stateful-topic.connector=smallrye-kafka
mp.messaging.incoming.stateful-topic.topic=stateful-topic
mp.messaging.incoming.stateful-topic.checkpoint.state-store=quarkus-hibernate-orm
# Optional: Custom state codec
mp.messaging.incoming.stateful-topic.checkpoint.state-codec-factory=com.example.MyCodecFactory

# Configure Redis state store
mp.messaging.incoming.redis-topic.connector=smallrye-kafka
mp.messaging.incoming.redis-topic.topic=redis-topic
mp.messaging.incoming.redis-topic.checkpoint.state-store=quarkus-redis
# Redis client configuration
quarkus.redis.hosts=redis://localhost:6379
quarkus.redis.password=redis-password
quarkus.redis.database=0

# Jackson JSON serialization
mp.messaging.incoming.json-topic.connector=smallrye-kafka
mp.messaging.incoming.json-topic.topic=json-topic
mp.messaging.incoming.json-topic.value.deserializer=io.quarkus.kafka.client.serialization.ObjectMapperDeserializer
mp.messaging.incoming.json-topic.value.deserializer.type=com.example.MyObject
mp.messaging.outgoing.json-output.connector=smallrye-kafka
mp.messaging.outgoing.json-output.topic=json-output-topic
mp.messaging.outgoing.json-output.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer

# Avro with Apicurio Registry
mp.messaging.incoming.avro-topic.connector=smallrye-kafka
mp.messaging.incoming.avro-topic.topic=avro-topic
mp.messaging.incoming.avro-topic.value.deserializer=io.apicurio.registry.serde.avro.AvroKafkaDeserializer
mp.messaging.incoming.avro-topic.apicurio.registry.url=http://schema-registry:8080/apis/registry/v2
mp.messaging.incoming.avro-topic.apicurio.registry.auto-register=true

import java.util.Map;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.config.Config;
import io.smallrye.reactive.messaging.ClientCustomizer;
/**
 * Example customizer that adjusts the Kafka client configuration of each
 * channel before the client is created.
 */
@ApplicationScoped
public class MyKafkaConfigCustomizer implements ClientCustomizer<Map<String, Object>> {

    /** Name of the channel that receives the extra TLS-related settings. */
    private static final String SECURE_CHANNEL = "secure-channel";

    /**
     * Applies channel-specific settings first, then the settings shared by
     * every channel.
     *
     * @param channel The channel name being configured
     * @param channelConfig The configuration for the channel (not read here)
     * @param config The Kafka client configuration map; it is modified in place
     * @return the same {@code config} instance that was passed in
     */
    @Override
    public Map<String, Object> customize(String channel, Config channelConfig,
            Map<String, Object> config) {
        if (SECURE_CHANNEL.equals(channel)) {
            applySecureChannelSettings(config);
        }
        applyMonitoring(config);
        return config;
    }

    /** Adds the TLS-related entries used only by the secure channel. */
    private static void applySecureChannelSettings(Map<String, Object> clientConfig) {
        // WARNING: an empty identification algorithm switches off server
        // hostname verification in the Kafka client. Keep this out of
        // production unless the risk is understood and accepted.
        clientConfig.put("ssl.endpoint.identification.algorithm", "");
        clientConfig.put("ssl.truststore.type", "PKCS12");
    }

    /** Registers the monitoring interceptor; replaces any value already set for the key. */
    private static void applyMonitoring(Map<String, Object> clientConfig) {
        clientConfig.put("interceptor.classes", "com.example.MyKafkaInterceptor");
    }
}
# Development configuration
%dev.mp.messaging.incoming.events.bootstrap.servers=localhost:9092
%dev.quarkus.messaging.kafka.enable-graceful-shutdown-in-dev-and-test-mode=true
# Production configuration
%prod.mp.messaging.incoming.events.bootstrap.servers=kafka-cluster:9092
%prod.mp.messaging.incoming.events.security.protocol=SASL_SSL
%prod.mp.messaging.incoming.events.sasl.mechanism=SCRAM-SHA-256
# Test configuration
%test.mp.messaging.incoming.events.bootstrap.servers=${kafka.bootstrap.servers:localhost:9092}

# Apply to all channels using smallrye-kafka connector
mp.messaging.connector.smallrye-kafka.bootstrap.servers=kafka:9092
mp.messaging.connector.smallrye-kafka.security.protocol=SASL_SSL
mp.messaging.connector.smallrye-kafka.sasl.mechanism=SCRAM-SHA-256
mp.messaging.connector.smallrye-kafka.retries=3
mp.messaging.connector.smallrye-kafka.retry.backoff.ms=1000

# Enable graceful shutdown in dev/test modes
quarkus.messaging.kafka.enable-graceful-shutdown-in-dev-and-test-mode=true
# Global graceful shutdown timeout
quarkus.shutdown.timeout=30s

# Channel-specific graceful shutdown
mp.messaging.incoming.my-topic.graceful-shutdown=true
mp.messaging.incoming.my-topic.graceful-shutdown.timeout=15s

# Configure dead letter queue for failed messages
mp.messaging.incoming.messages.connector=smallrye-kafka
mp.messaging.incoming.messages.topic=main-topic
mp.messaging.incoming.messages.failure-strategy=dead-letter-queue
mp.messaging.incoming.messages.dead-letter-queue.topic=failed-messages
mp.messaging.incoming.messages.dead-letter-queue.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.incoming.messages.dead-letter-queue.value.serializer=org.apache.kafka.common.serialization.StringSerializer

# Configure retry behavior
mp.messaging.incoming.retry-topic.connector=smallrye-kafka
mp.messaging.incoming.retry-topic.topic=retry-topic
mp.messaging.incoming.retry-topic.failure-strategy=retry
mp.messaging.incoming.retry-topic.retry.max-retries=3
mp.messaging.incoming.retry-topic.retry.delay=5s

// Configuration types
import io.quarkus.smallrye.reactivemessaging.kafka.ReactiveMessagingKafkaConfig;
import io.quarkus.smallrye.reactivemessaging.kafka.KafkaConfigCustomizer;
// Client customization
import io.smallrye.reactive.messaging.ClientCustomizer;
import org.eclipse.microprofile.config.Config;
// Annotations
import io.quarkus.runtime.annotations.ConfigItem;
import io.quarkus.runtime.annotations.ConfigRoot;
import jakarta.enterprise.context.ApplicationScoped;
// Standard Java types
import java.util.Map;

Install with Tessl CLI
npx tessl i tessl/maven-io-quarkus--quarkus-messaging-kafka