tessl/maven-io-quarkus--quarkus-messaging-kafka

Connect to Kafka with Reactive Messaging

Workspace: tessl
Visibility: Public
Describes: pkg:maven/io.quarkus/quarkus-messaging-kafka@3.15.x (Maven)

To install, run

npx @tessl/cli install tessl/maven-io-quarkus--quarkus-messaging-kafka@3.15.0


Quarkus SmallRye Reactive Messaging Kafka Extension

The Quarkus SmallRye Reactive Messaging Kafka Extension integrates Apache Kafka with MicroProfile Reactive Messaging in Quarkus. Applications declare Kafka-backed channels in configuration and bind methods to them with the @Incoming and @Outgoing annotations, so event-driven code is written against reactive streams rather than directly against the Kafka client API.

Package Information

  • Package Name: quarkus-messaging-kafka
  • Package Type: Maven (Quarkus Extension)
  • Language: Java
  • Installation: Add the dependency to your pom.xml (no <version> element is needed when the Quarkus BOM manages dependency versions):
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-kafka</artifactId>
</dependency>

Core Imports

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.messaging.Message;
import io.smallrye.reactive.messaging.kafka.Record;
import io.smallrye.reactive.messaging.kafka.commit.CheckpointMetadata;
import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntity;
import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntityId;

Basic Usage

Simple Message Processing

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import jakarta.enterprise.context.ApplicationScoped;

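@ApplicationScoped
public class KafkaProcessor {

    // Transforms each payload from the "incoming-topic" channel
    // and publishes the result to the "outgoing-topic" channel.
    @Incoming("incoming-topic")
    @Outgoing("outgoing-topic")
    public String process(String input) {
        return input.toUpperCase();
    }

    // Terminal consumer: handles each payload from the "messages" channel.
    @Incoming("messages")
    public void consume(String message) {
        System.out.println("Received: " + message);
    }
}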

Configuration

Configure channels in application.properties:

# Incoming channel
mp.messaging.incoming.incoming-topic.connector=smallrye-kafka
mp.messaging.incoming.incoming-topic.topic=input-topic
mp.messaging.incoming.incoming-topic.bootstrap.servers=localhost:9092

# Outgoing channel  
mp.messaging.outgoing.outgoing-topic.connector=smallrye-kafka
mp.messaging.outgoing.outgoing-topic.topic=output-topic
mp.messaging.outgoing.outgoing-topic.bootstrap.servers=localhost:9092
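The channel keys above follow the pattern mp.messaging.<direction>.<channel>.<attribute>, where the attribute may itself contain dots (as in bootstrap.servers). The following is a minimal, stdlib-only sketch that groups such keys by channel to make that structure visible. ChannelConfigSketch and groupByChannel are hypothetical names used for illustration; they are not part of the extension, which reads this configuration itself.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Illustrative helper (NOT part of the extension): groups channel
// properties by "<direction>.<channel>" to show how the keys are structured.
final class ChannelConfigSketch {

    static Map<String, Map<String, String>> groupByChannel(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new IllegalStateException(e); // reading from a String should not fail
        }

        Map<String, Map<String, String>> channels = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            // Limit of 5 keeps dotted attributes such as "bootstrap.servers" in one piece.
            String[] parts = key.split("\\.", 5);
            boolean channelKey = parts.length == 5
                    && parts[0].equals("mp")
                    && parts[1].equals("messaging")
                    && (parts[2].equals("incoming") || parts[2].equals("outgoing"));
            if (!channelKey) {
                continue; // not a channel-level property
            }
            channels.computeIfAbsent(parts[2] + "." + parts[3], k -> new TreeMap<>())
                    .put(parts[4], props.getProperty(key));
        }
        return channels;
    }

    public static void main(String[] args) {
        String text = String.join("\n",
                "mp.messaging.incoming.incoming-topic.connector=smallrye-kafka",
                "mp.messaging.incoming.incoming-topic.topic=input-topic",
                "mp.messaging.outgoing.outgoing-topic.connector=smallrye-kafka",
                "mp.messaging.outgoing.outgoing-topic.topic=output-topic");
        groupByChannel(text).forEach(
                (channel, attributes) -> System.out.println(channel + " -> " + attributes));
    }
}
```

Note that the channel name (incoming-topic) and the Kafka topic (input-topic) are separate values: the channel name is what @Incoming and @Outgoing refer to, while the topic attribute names the Kafka topic behind it. The sketch assumes channel names without dots.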

Architecture

The extension provides several key components:

  • Message Processing: Declarative message processing with @Incoming and @Outgoing annotations
  • State Management: Exactly-once processing with checkpoint state stores
  • Configuration: Flexible Kafka client configuration and customization
  • TLS Integration: Automatic TLS configuration with Quarkus security

Capabilities

Message Processing with Reactive Messaging

Core message processing using MicroProfile Reactive Messaging annotations.

@Incoming("channel-name")
public void consume(String message);

@Incoming("input-channel") 
@Outgoing("output-channel")
public String process(String input);

@Incoming("keyed-channel")
public void consume(Record<String, Person> record);

See message-processing.md for details.

Exactly-Once Processing and State Management

Advanced exactly-once processing capabilities with checkpoint state management.

@Incoming("people-in")
public CompletionStage<Void> consume(Message<Person> msg) {
    CheckpointMetadata<UserState> store = CheckpointMetadata.fromMessage(msg);
    // State management logic
    return msg.ack();
}
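The "State management logic" placeholder typically folds each payload into a state object that a checkpoint store can then persist. The following is a broker-free sketch of that fold only. The fields of UserState, the InMemoryCheckpoint class, and the replay method are assumptions made for illustration (the source names UserState but does not define it), and the in-memory holder is a stand-in, not the SmallRye CheckpointMetadata API.

```java
import java.util.List;
import java.util.function.UnaryOperator;

final class StateFoldSketch {

    // Hypothetical immutable state: number of people seen and the most recent name.
    static final class UserState {
        final long count;
        final String lastName;

        UserState(long count, String lastName) {
            this.count = count;
            this.lastName = lastName;
        }

        UserState seen(String name) {
            return new UserState(count + 1, name);
        }
    }

    // In-memory stand-in for a checkpoint store (NOT the SmallRye API).
    static final class InMemoryCheckpoint<T> {
        private T current;

        InMemoryCheckpoint(T initial) {
            this.current = initial;
        }

        // Replaces the held state with the result of the update function.
        void transform(UnaryOperator<T> update) {
            current = update.apply(current);
        }

        T get() {
            return current;
        }
    }

    // Folds a sequence of payloads into the final state, one update per message.
    static UserState replay(List<String> names) {
        InMemoryCheckpoint<UserState> store = new InMemoryCheckpoint<>(new UserState(0, null));
        for (String name : names) {
            store.transform(state -> state.seen(name));
        }
        return store.get();
    }

    public static void main(String[] args) {
        UserState state = replay(List.of("Ada", "Grace"));
        System.out.println(state.count + " seen, last: " + state.lastName);
    }
}
```

Keeping the state immutable and expressing each update as a function from old state to new state makes the per-message step easy to test in isolation, independent of where the state is ultimately stored.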

See state-management.md for details.

Configuration and Customization

Configuration classes and customization options for Kafka client behavior.

@ApplicationScoped
public class KafkaConfigCustomizer implements ClientCustomizer<Map<String, Object>> {
    @Override
    public Map<String, Object> customize(String channel, Config channelConfig, Map<String, Object> config) {
        // Adjust the Kafka client properties for this channel, then return them.
        return config;
    }
}
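A customizer of this shape is essentially a function from a channel's client properties to an adjusted set of properties. The following stdlib-only sketch shows one possible body. CustomizerLogicSketch is a hypothetical name, the Config parameter is omitted so the example has no dependencies, and the "app-" naming scheme is purely illustrative; client.id is a standard Kafka client property.

```java
import java.util.HashMap;
import java.util.Map;

final class CustomizerLogicSketch {

    // Mirrors customize(channel, channelConfig, config) without the Config argument.
    static Map<String, Object> customize(String channel, Map<String, Object> config) {
        // Copying is a defensive choice of this sketch, not a documented requirement.
        Map<String, Object> customized = new HashMap<>(config);
        // Derive a per-channel client id unless one was configured explicitly.
        customized.putIfAbsent("client.id", "app-" + channel);
        return customized;
    }

    public static void main(String[] args) {
        Map<String, Object> base = new HashMap<>();
        base.put("bootstrap.servers", "localhost:9092");
        System.out.println(customize("people-in", base));
    }
}
```

Using putIfAbsent keeps any explicitly configured value in place, so the customizer only fills in a default.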

See configuration.md for details.

Configuration Properties

The extension supports configuration under these prefixes:

  • mp.messaging.* - MicroProfile Reactive Messaging configuration
  • quarkus.messaging.* - Quarkus-specific messaging configuration
  • quarkus.kafka.* - Kafka-specific configuration
  • quarkus.messaging.kafka.* - Direct extension configuration

Key Features

  • Reactive Streams: Full reactive programming model with backpressure support
  • Exactly-Once Semantics: Checkpoint-based exactly-once processing
  • Multiple State Stores: Hibernate ORM, Hibernate Reactive, and Redis backends
  • TLS Security: Automatic TLS configuration integration
  • Custom Serialization: Support for custom serializers and deserializers
  • Cloud Native: Optimized for containers and serverless deployments