Connect to Kafka with Reactive Messaging
—
The Quarkus Kafka extension uses MicroProfile Reactive Messaging annotations to provide declarative message processing. Messages flow through channels that connect to Kafka topics.
Marks a method as a message consumer from a specific channel.
@Incoming("channel-name")
public void consume(String message);
@Incoming("channel-name")
public CompletionStage<Void> consume(Message<String> message);
@Incoming("channel-name")
public Uni<Void> consume(String message);

Parameters:
value: Channel name (connects to Kafka topic via configuration)

Marks a method as a message producer to a specific channel.
@Outgoing("channel-name")
public String produce();
@Outgoing("channel-name")
public Multi<String> produce();
@Outgoing("channel-name")
public Message<String> produce();

Parameters:
value: Channel name (connects to Kafka topic via configuration)

Process messages from one channel and send to another.
@Incoming("input-channel")
@Outgoing("output-channel")
public String transform(String input);
@Incoming("input-channel")
@Outgoing("output-channel")
public Message<String> transform(Message<String> input);

Access Kafka-specific metadata using Record<K, V>.
import io.smallrye.reactive.messaging.kafka.Record;
@Incoming("keyed-messages")
public void consume(Record<String, Person> record) {
String key = record.key();
Person value = record.value();
}

Access Kafka record metadata (topic, partition, offset, key) by declaring an IncomingKafkaRecordMetadata parameter.
import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
@Incoming("data-with-metadata-in")
public void consume(String data, IncomingKafkaRecordMetadata<String, String> metadata) {
String topic = metadata.getTopic();
int partition = metadata.getPartition();
long offset = metadata.getOffset();
String key = metadata.getKey();
}

Basic message consumption with automatic acknowledgment.
@ApplicationScoped
public class SimpleConsumer {
/**
 * Prints each payload received on the "notifications" channel to standard output.
 *
 * @param notification payload of the incoming message
 */
@Incoming("notifications")
public void process(String notification) {
    final String line = "Received: " + notification;
    System.out.println(line);
    // Message automatically acknowledged
}
}

Transform messages between topics.
@ApplicationScoped
public class MessageTransformer {
/**
 * Upper-cases and trims each payload read from "raw-data" and emits the result on
 * "processed-data".
 *
 * @param rawData payload of the incoming message
 * @return the upper-cased payload with leading and trailing whitespace removed
 */
@Incoming("raw-data")
@Outgoing("processed-data")
public String transform(String rawData) {
    final String upperCased = rawData.toUpperCase();
    return upperCased.trim();
}
}

Handle messages asynchronously with manual acknowledgment control.
@ApplicationScoped
public class AsyncProcessor {
/**
 * Runs {@code processData} on the payload asynchronously and settles the message once the
 * task finishes.
 *
 * <p>The message is acknowledged when the task completes normally and negatively
 * acknowledged with the failure when it completes exceptionally.
 *
 * @param message incoming message whose payload is processed
 * @return a stage that completes when the ack or nack of {@code message} completes
 */
@Incoming("async-messages")
public CompletionStage<Void> processAsync(Message<String> message) {
    return CompletableFuture
            // Async processing logic
            .runAsync(() -> processData(message.getPayload()))
            // handle() observes both outcomes, so a failed task is nacked explicitly
            // instead of skipping acknowledgment handling altogether.
            .handle((ignored, failure) -> {
                if (failure != null) {
                    return message.nack(failure);
                }
                return message.ack();
            })
            // Flatten the nested stage so the caller waits for the ack/nack itself.
            .thenCompose(settled -> settled);
}
}

Use Mutiny reactive types for advanced stream processing.
@ApplicationScoped
public class ReactiveProcessor {
/**
 * Upper-cases each payload from "stream-data", passes it to {@code saveToDatabase}, and
 * discards the value.
 *
 * @param data payload of the incoming message
 * @return a {@code Uni} that emits {@code null} once the pipeline has run for this item
 */
@Incoming("stream-data")
public Uni<Void> processReactive(String data) {
    final Uni<String> upperCased = Uni.createFrom().item(data).map(String::toUpperCase);
    return upperCased
            .onItem().invoke(item -> saveToDatabase(item))
            .replaceWithVoid();
}
}

Process keyed messages with KeyedMulti for partitioned processing.
import io.smallrye.reactive.messaging.keyed.KeyedMulti;
@Incoming("keyed-input")
@Outgoing("keyed-output")
public Multi<String> processKeyed(KeyedMulti<String, String> keyedData) {
return keyedData.map(value -> keyedData.key() + ":" + value);
}

# Consumer configuration
mp.messaging.incoming.notifications.connector=smallrye-kafka
mp.messaging.incoming.notifications.topic=notification-topic
mp.messaging.incoming.notifications.bootstrap.servers=localhost:9092
mp.messaging.incoming.notifications.group.id=notification-consumer
# Producer configuration
mp.messaging.outgoing.processed-data.connector=smallrye-kafka
mp.messaging.outgoing.processed-data.topic=processed-topic
mp.messaging.outgoing.processed-data.bootstrap.servers=localhost:9092

# Consumer with custom deserializer
mp.messaging.incoming.complex-data.connector=smallrye-kafka
mp.messaging.incoming.complex-data.topic=complex-topic
mp.messaging.incoming.complex-data.value.deserializer=io.quarkus.kafka.client.serialization.ObjectMapperDeserializer
mp.messaging.incoming.complex-data.apicurio.registry.url=http://localhost:8080/apis/registry/v2
# Producer with custom serializer
mp.messaging.outgoing.events.connector=smallrye-kafka
mp.messaging.outgoing.events.topic=event-topic
mp.messaging.outgoing.events.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer

@ApplicationScoped
public class ErrorHandlingConsumer {
/**
 * Processes the payload and acknowledges the message; if processing or the ack call
 * throws, logs the failure and negatively acknowledges the message with that exception.
 *
 * @param message incoming message to process
 * @return the stage returned by {@code ack()} on success or by {@code nack(...)} on failure
 */
@Incoming("messages")
public CompletionStage<Void> consume(Message<String> message) {
    CompletionStage<Void> outcome;
    try {
        processMessage(message.getPayload());
        outcome = message.ack();
    } catch (Exception failure) {
        logger.error("Failed to process message", failure);
        outcome = message.nack(failure);
    }
    return outcome;
}
}

Configure dead letter topics for failed messages:
mp.messaging.incoming.messages.connector=smallrye-kafka
mp.messaging.incoming.messages.topic=main-topic
mp.messaging.incoming.messages.failure-strategy=dead-letter-queue
mp.messaging.incoming.messages.dead-letter-queue.topic=failed-messages

// Core MicroProfile Reactive Messaging types
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
// Kafka-specific types
import io.smallrye.reactive.messaging.kafka.Record;
import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
// Keyed processing types
import io.smallrye.reactive.messaging.keyed.KeyedMulti;
import io.smallrye.reactive.messaging.keyed.Keyed;
// Mutiny reactive types
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.Multi;

Install with Tessl CLI
npx tessl i tessl/maven-io-quarkus--quarkus-messaging-kafka