Connect to Kafka with Reactive Messaging
npx @tessl/cli install tessl/maven-io-quarkus--quarkus-messaging-kafka@3.15.00
# Quarkus SmallRye Reactive Messaging Kafka Extension
1
2
The Quarkus SmallRye Reactive Messaging Kafka Extension provides seamless integration between Apache Kafka and MicroProfile Reactive Messaging within the Quarkus framework. It enables developers to build reactive, event-driven applications using declarative annotations and reactive streams.
3
4
## Package Information
5
6
- **Package Name**: quarkus-messaging-kafka
7
- **Package Type**: Maven (Quarkus Extension)
8
- **Language**: Java
9
- **Installation**: Add dependency to your `pom.xml`
10
11
```xml
12
<dependency>
13
<groupId>io.quarkus</groupId>
14
<artifactId>quarkus-messaging-kafka</artifactId>
15
</dependency>
16
```
17
18
## Core Imports
19
20
```java { .api }
21
import org.eclipse.microprofile.reactive.messaging.Incoming;
22
import org.eclipse.microprofile.reactive.messaging.Outgoing;
23
import org.eclipse.microprofile.reactive.messaging.Message;
24
import io.smallrye.reactive.messaging.kafka.Record;
25
import io.smallrye.reactive.messaging.kafka.commit.CheckpointMetadata;
26
import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntity;
27
import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntityId;
28
```
29
30
## Basic Usage
31
32
### Simple Message Processing
33
34
```java
35
import org.eclipse.microprofile.reactive.messaging.Incoming;
36
import org.eclipse.microprofile.reactive.messaging.Outgoing;
37
import jakarta.enterprise.context.ApplicationScoped;
38
39
@ApplicationScoped
40
public class KafkaProcessor {
41
42
@Incoming("incoming-topic")
43
@Outgoing("outgoing-topic")
44
public String process(String input) {
45
return input.toUpperCase();
46
}
47
48
@Incoming("messages")
49
public void consume(String message) {
50
System.out.println("Received: " + message);
51
}
52
}
53
```
54
55
### Configuration
56
57
Configure channels in `application.properties`:
58
59
```properties
60
# Incoming channel
61
mp.messaging.incoming.incoming-topic.connector=smallrye-kafka
62
mp.messaging.incoming.incoming-topic.topic=input-topic
63
mp.messaging.incoming.incoming-topic.bootstrap.servers=localhost:9092
64
65
# Outgoing channel
66
mp.messaging.outgoing.outgoing-topic.connector=smallrye-kafka
67
mp.messaging.outgoing.outgoing-topic.topic=output-topic
68
mp.messaging.outgoing.outgoing-topic.bootstrap.servers=localhost:9092
69
```
70
71
## Architecture
72
73
The extension provides several key components:
74
75
- **Message Processing**: Declarative message processing with `@Incoming` and `@Outgoing` annotations
76
- **State Management**: Exactly-once processing with checkpoint state stores
77
- **Configuration**: Flexible Kafka client configuration and customization
78
- **TLS Integration**: Automatic TLS configuration with Quarkus security
79
80
## Capabilities
81
82
### Message Processing with Reactive Messaging
83
84
Core message processing using MicroProfile Reactive Messaging annotations.
85
86
```java { .api }
87
@Incoming("channel-name")
88
public void consume(String message);
89
90
@Incoming("input-channel")
91
@Outgoing("output-channel")
92
public String process(String input);
93
94
@Incoming("keyed-channel")
95
public void consume(Record<String, Person> record);
96
```
97
98
[Message Processing](./message-processing.md)
99
100
### Exactly-Once Processing and State Management
101
102
Advanced exactly-once processing capabilities with checkpoint state management.
103
104
```java { .api }
105
@Incoming("people-in")
106
public CompletionStage<Void> consume(Message<Person> msg) {
107
CheckpointMetadata<UserState> store = CheckpointMetadata.fromMessage(msg);
108
// State management logic
109
return msg.ack();
110
}
111
```
112
113
[State Management](./state-management.md)
114
115
### Configuration and Customization
116
117
Configuration classes and customization options for Kafka client behavior.
118
119
```java { .api }
120
@ApplicationScoped
121
public class KafkaConfigCustomizer implements ClientCustomizer<Map<String, Object>> {
122
public Map<String, Object> customize(String channel, Config channelConfig, Map<String, Object> config);
123
}
124
```
125
126
[Configuration](./configuration.md)
127
128
## Configuration Properties
129
130
The extension supports configuration under these prefixes:
131
- `mp.messaging.*` - MicroProfile Reactive Messaging configuration
132
- `quarkus.messaging.*` - Quarkus-specific messaging configuration
133
- `quarkus.kafka.*` - Kafka-specific configuration
134
- `quarkus.messaging.kafka.*` - Direct extension configuration
135
136
## Key Features
137
138
- **Reactive Streams**: Full reactive programming model with backpressure support
139
- **Exactly-Once Semantics**: Checkpoint-based exactly-once processing
140
- **Multiple State Stores**: Hibernate ORM, Hibernate Reactive, and Redis backends
141
- **TLS Security**: Automatic TLS configuration integration
142
- **Custom Serialization**: Support for custom serializers and deserializers
143
- **Cloud Native**: Optimized for containers and serverless deployments