Connect to Kafka with Reactive Messaging
npx @tessl/cli install tessl/maven-io-quarkus--quarkus-messaging-kafka@3.15.0The Quarkus SmallRye Reactive Messaging Kafka Extension provides seamless integration between Apache Kafka and MicroProfile Reactive Messaging within the Quarkus framework. It enables developers to build reactive, event-driven applications using declarative annotations and reactive streams.
pom.xml<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-messaging-kafka</artifactId>
</dependency>import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.messaging.Message;
import io.smallrye.reactive.messaging.kafka.Record;
import io.smallrye.reactive.messaging.kafka.commit.CheckpointMetadata;
import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntity;
import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntityId;import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class KafkaProcessor {
@Incoming("incoming-topic")
@Outgoing("outgoing-topic")
public String process(String input) {
return input.toUpperCase();
}
@Incoming("messages")
public void consume(String message) {
System.out.println("Received: " + message);
}
}Configure channels in application.properties:
# Incoming channel
mp.messaging.incoming.incoming-topic.connector=smallrye-kafka
mp.messaging.incoming.incoming-topic.topic=input-topic
mp.messaging.incoming.incoming-topic.bootstrap.servers=localhost:9092
# Outgoing channel
mp.messaging.outgoing.outgoing-topic.connector=smallrye-kafka
mp.messaging.outgoing.outgoing-topic.topic=output-topic
mp.messaging.outgoing.outgoing-topic.bootstrap.servers=localhost:9092The extension provides several key components:
@Incoming and @Outgoing annotationsCore message processing using MicroProfile Reactive Messaging annotations.
@Incoming("channel-name")
public void consume(String message);
@Incoming("input-channel")
@Outgoing("output-channel")
public String process(String input);
@Incoming("keyed-channel")
public void consume(Record<String, Person> record);Advanced exactly-once processing capabilities with checkpoint state management.
@Incoming("people-in")
public CompletionStage<Void> consume(Message<Person> msg) {
CheckpointMetadata<UserState> store = CheckpointMetadata.fromMessage(msg);
// State management logic
return msg.ack();
}Configuration classes and customization options for Kafka client behavior.
@ApplicationScoped
public class KafkaConfigCustomizer implements ClientCustomizer<Map<String, Object>> {
public Map<String, Object> customize(String channel, Config channelConfig, Map<String, Object> config);
}The extension supports configuration under these prefixes:
mp.messaging.* - MicroProfile Reactive Messaging configurationquarkus.messaging.* - Quarkus-specific messaging configurationquarkus.kafka.* - Kafka-specific configurationquarkus.messaging.kafka.* - Direct extension configuration