Connect to Kafka with Reactive Messaging
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
The Quarkus SmallRye Reactive Messaging Kafka Extension provides seamless integration between Apache Kafka and MicroProfile Reactive Messaging within the Quarkus framework. It enables developers to build reactive, event-driven applications using declarative annotations and reactive streams.
pom.xml<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-messaging-kafka</artifactId>
</dependency>import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.messaging.Message;
import io.smallrye.reactive.messaging.kafka.Record;
import io.smallrye.reactive.messaging.kafka.commit.CheckpointMetadata;
import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntity;
import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntityId;import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class KafkaProcessor {
@Incoming("incoming-topic")
@Outgoing("outgoing-topic")
public String process(String input) {
return input.toUpperCase();
}
@Incoming("messages")
public void consume(String message) {
System.out.println("Received: " + message);
}
}Configure channels in application.properties:
# Incoming channel
mp.messaging.incoming.incoming-topic.connector=smallrye-kafka
mp.messaging.incoming.incoming-topic.topic=input-topic
mp.messaging.incoming.incoming-topic.bootstrap.servers=localhost:9092
# Outgoing channel
mp.messaging.outgoing.outgoing-topic.connector=smallrye-kafka
mp.messaging.outgoing.outgoing-topic.topic=output-topic
mp.messaging.outgoing.outgoing-topic.bootstrap.servers=localhost:9092The extension provides several key components:
@Incoming and @Outgoing annotationsCore message processing using MicroProfile Reactive Messaging annotations.
@Incoming("channel-name")
public void consume(String message);
@Incoming("input-channel")
@Outgoing("output-channel")
public String process(String input);
@Incoming("keyed-channel")
public void consume(Record<String, Person> record);Advanced exactly-once processing capabilities with checkpoint state management.
@Incoming("people-in")
public CompletionStage<Void> consume(Message<Person> msg) {
CheckpointMetadata<UserState> store = CheckpointMetadata.fromMessage(msg);
// State management logic
return msg.ack();
}Configuration classes and customization options for Kafka client behavior.
@ApplicationScoped
public class KafkaConfigCustomizer implements ClientCustomizer<Map<String, Object>> {
public Map<String, Object> customize(String channel, Config channelConfig, Map<String, Object> config);
}The extension supports configuration under these prefixes:
mp.messaging.* - MicroProfile Reactive Messaging configurationquarkus.messaging.* - Quarkus-specific messaging configurationquarkus.kafka.* - Kafka-specific configurationquarkus.messaging.kafka.* - Direct extension configuration