
tessl/maven-io-quarkus--quarkus-messaging-kafka

Connect to Kafka with Reactive Messaging

Message Processing

The Quarkus Kafka extension uses MicroProfile Reactive Messaging annotations to provide declarative message processing. Messages flow through named channels, and each channel is bound to a Kafka topic through configuration.

Core Annotations

@Incoming

Marks a method as a message consumer from a specific channel.

@Incoming("channel-name")
public void consume(String message);

@Incoming("channel-name") 
public CompletionStage<Void> consume(Message<String> message);

@Incoming("channel-name")
public Uni<Void> consume(String message);

Parameters:

  • value: Channel name (connects to Kafka topic via configuration)

@Outgoing

Marks a method as a message producer to a specific channel.

@Outgoing("channel-name")
public String produce();

@Outgoing("channel-name")
public Multi<String> produce();

@Outgoing("channel-name") 
public Message<String> produce();

Parameters:

  • value: Channel name (connects to Kafka topic via configuration)
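The link between a channel name and its configuration can be illustrated without any Quarkus dependency. The sketch below is plain Java; `ChannelKeys` and its methods are hypothetical names used only for illustration. It mirrors the `mp.messaging.<direction>.<channel>.<attribute>` key layout used in the configuration examples of this document, and it assumes the Kafka connector's behaviour of falling back to the channel name when no `topic` attribute is set.

```java
import java.util.Properties;

// Hypothetical helper, for illustration only: not part of Quarkus or SmallRye.
final class ChannelKeys {

    // Builds a key such as "mp.messaging.incoming.notifications.topic".
    static String key(String direction, String channel, String attribute) {
        return "mp.messaging." + direction + "." + channel + "." + attribute;
    }

    // Resolves the topic for a channel; assumes a fallback to the channel name.
    static String topicFor(Properties config, String direction, String channel) {
        return config.getProperty(key(direction, channel, "topic"), channel);
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("mp.messaging.incoming.notifications.topic", "notification-topic");

        // Explicit topic attribute present for this channel
        System.out.println(topicFor(config, "incoming", "notifications"));
        // No topic attribute: the channel name itself is used
        System.out.println(topicFor(config, "outgoing", "processed-data"));
    }
}
```

In other words, the string passed to `@Incoming` or `@Outgoing` is only a lookup key; the topic, connector, and serializers all come from properties under that key.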

Combined Processing

Process messages from one channel and send to another.

@Incoming("input-channel")
@Outgoing("output-channel")
public String transform(String input);

@Incoming("input-channel")
@Outgoing("output-channel") 
public Message<String> transform(Message<String> input);

Working with Records

Kafka Records

Receive the Kafka record key together with its value using Record<K, V>.

import io.smallrye.reactive.messaging.kafka.Record;

@Incoming("keyed-messages")
public void consume(Record<String, Person> record) {
    String key = record.key();
    Person value = record.value();
}

Message Metadata

Access Kafka record metadata by declaring it as an extra method parameter after the payload.

import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;

@Incoming("data-with-metadata-in")
public void consume(String data, IncomingKafkaRecordMetadata<String, String> metadata) {
    String topic = metadata.getTopic();
    int partition = metadata.getPartition();
    long offset = metadata.getOffset();
    String key = metadata.getKey();
}

Message Processing Patterns

Simple Consumer

Basic message consumption with automatic acknowledgment.

@ApplicationScoped
public class SimpleConsumer {
    
    @Incoming("notifications")
    public void process(String notification) {
        System.out.println("Received: " + notification);
        // Acknowledged automatically once this method returns without throwing
    }
}

Message Transformation

Transform messages between topics.

@ApplicationScoped  
public class MessageTransformer {
    
    @Incoming("raw-data")
    @Outgoing("processed-data")
    public String transform(String rawData) {
        return rawData.toUpperCase().trim();
    }
}

Async Processing with CompletionStage

Handle messages asynchronously with manual acknowledgment control.

@ApplicationScoped
public class AsyncProcessor {
    
    @Incoming("async-messages")
    public CompletionStage<Void> processAsync(Message<String> message) {
        return CompletableFuture
            // processData is a placeholder for application logic
            .runAsync(() -> processData(message.getPayload()))
            // Acknowledge only after processing has completed successfully
            .thenCompose(v -> message.ack());
    }
}

Reactive Streams with Mutiny

Use Mutiny reactive types for advanced stream processing.

@ApplicationScoped
public class ReactiveProcessor {
    
    @Incoming("stream-data")
    public Uni<Void> processReactive(String data) {
        return Uni.createFrom().item(data)
            .map(String::toUpperCase)
            .onItem().invoke(processed -> saveToDatabase(processed))
            .replaceWithVoid();
    }
}

Keyed Message Processing

Group records by key with KeyedMulti. The method is invoked once per distinct key, and each KeyedMulti carries only the values for that key.

import io.smallrye.reactive.messaging.keyed.KeyedMulti;

@Incoming("keyed-input")
@Outgoing("keyed-output")
public Multi<String> processKeyed(KeyedMulti<String, String> keyedData) {
    return keyedData.map(value -> keyedData.key() + ":" + value);
}

Configuration Examples

Basic Channel Configuration

# Consumer configuration
mp.messaging.incoming.notifications.connector=smallrye-kafka
mp.messaging.incoming.notifications.topic=notification-topic
mp.messaging.incoming.notifications.bootstrap.servers=localhost:9092
mp.messaging.incoming.notifications.group.id=notification-consumer

# Producer configuration  
mp.messaging.outgoing.processed-data.connector=smallrye-kafka
mp.messaging.outgoing.processed-data.topic=processed-topic
mp.messaging.outgoing.processed-data.bootstrap.servers=localhost:9092

Advanced Configuration

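# Consumer with a custom JSON deserializer.
# ObjectMapperDeserializer has no no-arg constructor, so reference your own subclass;
# org.acme.ComplexDataDeserializer is a placeholder name.
mp.messaging.incoming.complex-data.connector=smallrye-kafka
mp.messaging.incoming.complex-data.topic=complex-topic
mp.messaging.incoming.complex-data.value.deserializer=org.acme.ComplexDataDeserializer
# Only read by Apicurio Registry serdes (for example Avro); a Jackson-based deserializer ignores it
mp.messaging.incoming.complex-data.apicurio.registry.url=http://localhost:8080/apis/registry/v2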

# Producer with custom serializer
mp.messaging.outgoing.events.connector=smallrye-kafka  
mp.messaging.outgoing.events.topic=event-topic
mp.messaging.outgoing.events.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer

Error Handling

Exception Handling in Consumers

@ApplicationScoped
public class ErrorHandlingConsumer {
    
    @Incoming("messages")
    public CompletionStage<Void> consume(Message<String> message) {
        try {
            processMessage(message.getPayload());
            return message.ack();
        } catch (Exception e) {
            logger.error("Failed to process message", e);
            return message.nack(e);
        }
    }
}
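The `try`/`catch` above only sees exceptions thrown synchronously. When processing runs on another thread, as in the `CompletionStage` pattern, the failure arrives inside the stage and has to be routed to `nack` explicitly. The following is a minimal sketch in plain Java; `AckableMessage` and `AsyncAck` are hypothetical names, with `AckableMessage` standing in for the `getPayload()`, `ack()` and `nack(Throwable)` methods used on `Message` in this document so that the example compiles without Quarkus on the classpath.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;

// Hypothetical stand-in for the parts of a reactive messaging Message used here.
interface AckableMessage<T> {
    T getPayload();
    CompletionStage<Void> ack();
    CompletionStage<Void> nack(Throwable reason);
}

final class AsyncAck {

    // Runs the handler asynchronously, then acks on success or nacks on failure.
    static <T> CompletionStage<Void> process(AckableMessage<T> message, Consumer<T> handler) {
        return CompletableFuture
            .runAsync(() -> handler.accept(message.getPayload()))
            // handle() sees both outcomes, so a failed stage is never silently dropped
            .handle((ignored, failure) ->
                failure == null ? message.ack() : message.nack(unwrap(failure)))
            // Flatten the nested stage returned by ack()/nack()
            .thenCompose(stage -> stage);
    }

    // runAsync wraps handler exceptions in CompletionException; report the original cause.
    private static Throwable unwrap(Throwable failure) {
        return (failure instanceof CompletionException && failure.getCause() != null)
            ? failure.getCause()
            : failure;
    }
}
```

With a real `Message<String>`, the same `handle` and `thenCompose` chain could be applied directly to the stage returned by the asynchronous work, so that every message ends in exactly one `ack()` or `nack()`.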

Dead Letter Queue

Configure dead letter topics for failed messages:

mp.messaging.incoming.messages.connector=smallrye-kafka
mp.messaging.incoming.messages.topic=main-topic
mp.messaging.incoming.messages.failure-strategy=dead-letter-queue
mp.messaging.incoming.messages.dead-letter-queue.topic=failed-messages
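The effect of this strategy can be pictured with a small conceptual model. The sketch below is plain Java and is not how the connector is implemented: `DeadLetterSketch`, `DeadLetter` and `consumeAll` are hypothetical names, and an in-memory list stands in for the `failed-messages` topic. It only illustrates the idea that a failing record is set aside with its failure reason while consumption continues with the next record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Conceptual model only: the real routing is done by the connector, not by application code.
final class DeadLetterSketch {

    // A rejected payload plus the reason it was rejected.
    static final class DeadLetter {
        final String payload;
        final String reason;

        DeadLetter(String payload, String reason) {
            this.payload = payload;
            this.reason = reason;
        }
    }

    // Applies the handler to every payload; failures are collected instead of stopping the loop.
    static List<DeadLetter> consumeAll(List<String> payloads, Consumer<String> handler) {
        List<DeadLetter> deadLetters = new ArrayList<>();
        for (String payload : payloads) {
            try {
                handler.accept(payload);
            } catch (RuntimeException e) {
                deadLetters.add(new DeadLetter(payload, e.getMessage()));
            }
        }
        return deadLetters;
    }

    public static void main(String[] args) {
        // "x" is not a number, so it is the only payload that ends up dead-lettered
        List<DeadLetter> failed = consumeAll(List.of("1", "x", "3"), Integer::parseInt);
        for (DeadLetter letter : failed) {
            System.out.println(letter.payload + " -> " + letter.reason);
        }
    }
}
```

In the real setup the rejected records land on the configured dead-letter topic, where a separate `@Incoming` channel can inspect or replay them.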

Types

// Core MicroProfile Reactive Messaging types
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

// Kafka-specific types
import io.smallrye.reactive.messaging.kafka.Record;
import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;

// Keyed processing types
import io.smallrye.reactive.messaging.keyed.KeyedMulti;
import io.smallrye.reactive.messaging.keyed.Keyed;

// Mutiny reactive types
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.Multi;

Install with Tessl CLI

npx tessl i tessl/maven-io-quarkus--quarkus-messaging-kafka
