CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-io-quarkus--quarkus-messaging-kafka

Connect to Kafka with Reactive Messaging

Pending
Overview
Eval results
Files

configuration.mddocs/

Configuration and Customization

The Quarkus Kafka extension provides extensive configuration options and customization capabilities for Kafka client behavior, security settings, and messaging properties.

Configuration Classes

ReactiveMessagingKafkaConfig

Main configuration class for Kafka extension settings.

package io.quarkus.smallrye.reactivemessaging.kafka;

import io.quarkus.runtime.annotations.ConfigItem;
import io.quarkus.runtime.annotations.ConfigRoot;

@ConfigRoot(name = "messaging.kafka")
public class ReactiveMessagingKafkaConfig {
    
    /**
     * Enables the graceful shutdown in dev and test modes.
     * The graceful shutdown waits until the inflight records have been processed 
     * and the offset committed to Kafka. While this setting is highly recommended 
     * in production, in dev and test modes, it's disabled by default.
     */
    @ConfigItem(defaultValue = "false")
    public boolean enableGracefulShutdownInDevAndTestMode;
}

Properties:

  • enableGracefulShutdownInDevAndTestMode: Controls graceful shutdown behavior (default: false)

KafkaConfigCustomizer

Customizer for Kafka client configuration, particularly for TLS and security settings.

package io.quarkus.smallrye.reactivemessaging.kafka;

import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.config.Config;
import io.smallrye.reactive.messaging.ClientCustomizer;

@ApplicationScoped
public class KafkaConfigCustomizer implements ClientCustomizer<Map<String, Object>> {
    
    /**
     * Customize Kafka client configuration for a specific channel.
     * 
     * @param channel The channel name being configured
     * @param channelConfig The configuration for the channel
     * @param config The Kafka client configuration map to customize
     * @return The customized configuration map
     */
    @Override
    public Map<String, Object> customize(String channel, Config channelConfig, 
                                       Map<String, Object> config);
}

Configuration Properties

Core Configuration Prefixes

The extension supports configuration under these prefixes:

MicroProfile Reactive Messaging

# Channel-level configuration
mp.messaging.incoming.[channel-name].[property]
mp.messaging.outgoing.[channel-name].[property]

# Connector-level configuration  
mp.messaging.connector.smallrye-kafka.[property]

Quarkus Messaging Configuration

# General messaging configuration
quarkus.messaging.[property]

# Kafka-specific configuration
quarkus.messaging.kafka.[property]

Kafka Client Configuration

# Direct Kafka configuration
quarkus.kafka.[property]

Channel Configuration Examples

Basic Consumer Configuration

# Configure incoming channel
mp.messaging.incoming.my-topic.connector=smallrye-kafka
mp.messaging.incoming.my-topic.topic=input-topic
mp.messaging.incoming.my-topic.bootstrap.servers=localhost:9092
mp.messaging.incoming.my-topic.group.id=my-consumer-group
mp.messaging.incoming.my-topic.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.my-topic.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

Basic Producer Configuration

# Configure outgoing channel
mp.messaging.outgoing.my-output.connector=smallrye-kafka
mp.messaging.outgoing.my-output.topic=output-topic
mp.messaging.outgoing.my-output.bootstrap.servers=localhost:9092
mp.messaging.outgoing.my-output.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.my-output.value.serializer=org.apache.kafka.common.serialization.StringSerializer

Advanced Consumer Configuration

# Consumer with advanced settings
mp.messaging.incoming.advanced-consumer.connector=smallrye-kafka
mp.messaging.incoming.advanced-consumer.topic=advanced-topic
mp.messaging.incoming.advanced-consumer.bootstrap.servers=kafka1:9092,kafka2:9092
mp.messaging.incoming.advanced-consumer.group.id=advanced-group
mp.messaging.incoming.advanced-consumer.auto.offset.reset=earliest
mp.messaging.incoming.advanced-consumer.enable.auto.commit=false
mp.messaging.incoming.advanced-consumer.max.poll.records=100
mp.messaging.incoming.advanced-consumer.session.timeout.ms=30000
mp.messaging.incoming.advanced-consumer.heartbeat.interval.ms=3000

Security Configuration

TLS/SSL Configuration

# Enable TLS with Quarkus TLS configuration
mp.messaging.incoming.secure-topic.connector=smallrye-kafka
mp.messaging.incoming.secure-topic.topic=secure-topic
mp.messaging.incoming.secure-topic.bootstrap.servers=kafka-ssl:9093
mp.messaging.incoming.secure-topic.tls-configuration-name=kafka-tls

# TLS configuration
quarkus.tls.kafka-tls.trust-store.p12.path=kafka.client.truststore.p12
quarkus.tls.kafka-tls.trust-store.password=truststore-password
quarkus.tls.kafka-tls.key-store.p12.path=kafka.client.keystore.p12
quarkus.tls.kafka-tls.key-store.password=keystore-password

SASL Authentication

# SASL/PLAIN authentication
mp.messaging.incoming.sasl-topic.connector=smallrye-kafka
mp.messaging.incoming.sasl-topic.topic=sasl-topic
mp.messaging.incoming.sasl-topic.bootstrap.servers=kafka-sasl:9092
mp.messaging.incoming.sasl-topic.security.protocol=SASL_PLAINTEXT
mp.messaging.incoming.sasl-topic.sasl.mechanism=PLAIN
mp.messaging.incoming.sasl-topic.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="password";

# SASL/SCRAM authentication
mp.messaging.incoming.scram-topic.sasl.mechanism=SCRAM-SHA-256
mp.messaging.incoming.scram-topic.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="password";

State Store Configuration

Hibernate ORM State Store

# Configure Hibernate ORM state store
mp.messaging.incoming.stateful-topic.connector=smallrye-kafka
mp.messaging.incoming.stateful-topic.topic=stateful-topic
mp.messaging.incoming.stateful-topic.checkpoint.state-store=quarkus-hibernate-orm

# Optional: Custom state codec
mp.messaging.incoming.stateful-topic.checkpoint.state-codec-factory=com.example.MyCodecFactory

Redis State Store

# Configure Redis state store
mp.messaging.incoming.redis-topic.connector=smallrye-kafka
mp.messaging.incoming.redis-topic.topic=redis-topic  
mp.messaging.incoming.redis-topic.checkpoint.state-store=quarkus-redis

# Redis client configuration
quarkus.redis.hosts=redis://localhost:6379
quarkus.redis.password=redis-password
quarkus.redis.database=0

Serialization Configuration

JSON Serialization with Jackson

# Jackson JSON serialization
mp.messaging.incoming.json-topic.connector=smallrye-kafka
mp.messaging.incoming.json-topic.topic=json-topic
mp.messaging.incoming.json-topic.value.deserializer=io.quarkus.kafka.client.serialization.ObjectMapperDeserializer
mp.messaging.incoming.json-topic.value.deserializer.type=com.example.MyObject

mp.messaging.outgoing.json-output.connector=smallrye-kafka
mp.messaging.outgoing.json-output.topic=json-output-topic
mp.messaging.outgoing.json-output.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer

Avro Serialization with Schema Registry

# Avro with Apicurio Registry
mp.messaging.incoming.avro-topic.connector=smallrye-kafka
mp.messaging.incoming.avro-topic.topic=avro-topic
mp.messaging.incoming.avro-topic.value.deserializer=io.apicurio.registry.serde.avro.AvroKafkaDeserializer
mp.messaging.incoming.avro-topic.apicurio.registry.url=http://schema-registry:8080/apis/registry/v2
mp.messaging.incoming.avro-topic.apicurio.registry.auto-register=true

Custom Configuration Examples

Custom Kafka Config Customizer

import jakarta.enterprise.context.ApplicationScoped;
import io.smallrye.reactive.messaging.ClientCustomizer;

@ApplicationScoped  
public class MyKafkaConfigCustomizer implements ClientCustomizer<Map<String, Object>> {
    
    @Override
    public Map<String, Object> customize(String channel, Config channelConfig, 
                                       Map<String, Object> config) {
        // Add custom configuration based on channel
        if ("secure-channel".equals(channel)) {
            config.put("ssl.endpoint.identification.algorithm", "");
            config.put("ssl.truststore.type", "PKCS12");
        }
        
        // Add monitoring configuration
        config.put("interceptor.classes", "com.example.MyKafkaInterceptor");
        
        return config;
    }
}

Environment-Specific Configuration

# Development configuration
%dev.mp.messaging.incoming.events.bootstrap.servers=localhost:9092
%dev.quarkus.messaging.kafka.enableGracefulShutdownInDevAndTestMode=true

# Production configuration  
%prod.mp.messaging.incoming.events.bootstrap.servers=kafka-cluster:9092
%prod.mp.messaging.incoming.events.security.protocol=SASL_SSL
%prod.mp.messaging.incoming.events.sasl.mechanism=SCRAM-SHA-256

# Test configuration
%test.mp.messaging.incoming.events.bootstrap.servers=${kafka.bootstrap.servers:localhost:9092}

Global Connector Configuration

# Apply to all channels using smallrye-kafka connector
mp.messaging.connector.smallrye-kafka.bootstrap.servers=kafka:9092
mp.messaging.connector.smallrye-kafka.security.protocol=SASL_SSL
mp.messaging.connector.smallrye-kafka.sasl.mechanism=SCRAM-SHA-256
mp.messaging.connector.smallrye-kafka.retries=3
mp.messaging.connector.smallrye-kafka.retry.backoff.ms=1000

Graceful Shutdown Configuration

Extension-Level Shutdown Settings

# Enable graceful shutdown in dev/test modes
quarkus.messaging.kafka.enableGracefulShutdownInDevAndTestMode=true

# Global graceful shutdown timeout
quarkus.shutdown.timeout=30s

Channel-Level Shutdown Settings

# Channel-specific graceful shutdown
mp.messaging.incoming.my-topic.graceful-shutdown=true
mp.messaging.incoming.my-topic.graceful-shutdown.timeout=15s

Error Handling Configuration

Dead Letter Queue

# Configure dead letter queue for failed messages
mp.messaging.incoming.messages.connector=smallrye-kafka
mp.messaging.incoming.messages.topic=main-topic
mp.messaging.incoming.messages.failure-strategy=dead-letter-queue
mp.messaging.incoming.messages.dead-letter-queue.topic=failed-messages
mp.messaging.incoming.messages.dead-letter-queue.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.incoming.messages.dead-letter-queue.value.serializer=org.apache.kafka.common.serialization.StringSerializer

Retry Configuration

# Configure retry behavior
mp.messaging.incoming.retry-topic.connector=smallrye-kafka
mp.messaging.incoming.retry-topic.topic=retry-topic
mp.messaging.incoming.retry-topic.failure-strategy=retry
mp.messaging.incoming.retry-topic.retry.max-retries=3
mp.messaging.incoming.retry-topic.retry.delay=5s

Types

// Configuration types
import io.quarkus.smallrye.reactivemessaging.kafka.ReactiveMessagingKafkaConfig;
import io.quarkus.smallrye.reactivemessaging.kafka.KafkaConfigCustomizer;

// Client customization
import io.smallrye.reactive.messaging.ClientCustomizer;
import org.eclipse.microprofile.config.Config;

// Annotations
import io.quarkus.runtime.annotations.ConfigItem;
import io.quarkus.runtime.annotations.ConfigRoot;
import jakarta.enterprise.context.ApplicationScoped;

// Standard Java types
import java.util.Map;

Install with Tessl CLI

npx tessl i tessl/maven-io-quarkus--quarkus-messaging-kafka

docs

configuration.md

index.md

message-processing.md

state-management.md

tile.json