# Message Processing

The Quarkus Kafka extension uses MicroProfile Reactive Messaging annotations to provide declarative message processing. Messages flow through channels that connect to Kafka topics.

## Core Annotations

### @Incoming

Marks a method as a message consumer from a specific channel.

```java { .api }
@Incoming("channel-name")
public void consume(String message);

@Incoming("channel-name")
public CompletionStage<Void> consume(Message<String> message);

@Incoming("channel-name")
public Uni<Void> consume(String message);
```

**Parameters:**
- `value`: Channel name (connects to Kafka topic via configuration)

### @Outgoing

Marks a method as a message producer to a specific channel.

```java { .api }
@Outgoing("channel-name")
public String produce();

@Outgoing("channel-name")
public Multi<String> produce();

@Outgoing("channel-name")
public Message<String> produce();
```

**Parameters:**
- `value`: Channel name (connects to Kafka topic via configuration)
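
As a concrete sketch of the `Multi` variant (the channel name and payload format are illustrative), an `@Outgoing` method can generate a continuous stream, here one message per second using Mutiny's `ticks()`:

```java
import java.time.Duration;

import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;
import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class TickProducer {

    // Emits "tick-0", "tick-1", ... to the "ticks" channel, one per second.
    @Outgoing("ticks")
    public Multi<String> produce() {
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .map(n -> "tick-" + n);
    }
}
```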

### Combined Processing

Process messages from one channel and send to another.

```java { .api }
@Incoming("input-channel")
@Outgoing("output-channel")
public String transform(String input);

@Incoming("input-channel")
@Outgoing("output-channel")
public Message<String> transform(Message<String> input);
```

## Working with Records

### Kafka Records

Access Kafka-specific metadata using `Record<K, V>`.

```java { .api }
import io.smallrye.reactive.messaging.kafka.Record;

@Incoming("keyed-messages")
public void consume(Record<String, Person> record) {
    String key = record.key();
    Person value = record.value();
}
```
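
`Record` works on the producing side as well: `Record.of(key, value)` sets the Kafka record key. A minimal sketch (the `Person` type and channel name are illustrative):

```java
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.kafka.Record;
import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PersonProducer {

    record Person(String name) {}

    // Keying the record by name routes all records for the same person
    // to the same partition.
    @Outgoing("keyed-messages-out")
    public Record<String, Person> produce() {
        Person person = new Person("alice");
        return Record.of(person.name(), person);
    }
}
```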

### Message Metadata

Access Kafka metadata either injected alongside the payload, as below, or retrieved from a `Message` via `message.getMetadata(IncomingKafkaRecordMetadata.class)`.

```java { .api }
import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;

@Incoming("data-with-metadata-in")
public void consume(String data, IncomingKafkaRecordMetadata<String, String> metadata) {
    String topic = metadata.getTopic();
    int partition = metadata.getPartition();
    long offset = metadata.getOffset();
    String key = metadata.getKey();
}
```
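
On the producing side, `OutgoingKafkaRecordMetadata` can be attached to a `Message` to control the record key, target topic, or headers. A sketch (the channel name and key are illustrative):

```java
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class MetadataProducer {

    @Outgoing("events-out")
    public Message<String> produce() {
        // Override the record key per message; withTopic(...) and
        // withHeaders(...) are available on the same builder.
        OutgoingKafkaRecordMetadata<String> metadata =
                OutgoingKafkaRecordMetadata.<String>builder()
                        .withKey("event-key")
                        .build();
        return Message.of("payload").addMetadata(metadata);
    }
}
```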

## Message Processing Patterns

### Simple Consumer

Basic message consumption with automatic acknowledgment.

```java
@ApplicationScoped
public class SimpleConsumer {

    @Incoming("notifications")
    public void process(String notification) {
        System.out.println("Received: " + notification);
        // Message automatically acknowledged
    }
}
```

### Message Transformation

Transform messages between topics.

```java
@ApplicationScoped
public class MessageTransformer {

    @Incoming("raw-data")
    @Outgoing("processed-data")
    public String transform(String rawData) {
        return rawData.toUpperCase().trim();
    }
}
```

### Async Processing with CompletionStage

Handle messages asynchronously with manual acknowledgment control.

```java
@ApplicationScoped
public class AsyncProcessor {

    @Incoming("async-messages")
    public CompletionStage<Void> processAsync(Message<String> message) {
        // Run the processing off the calling thread, then acknowledge
        // the message only once processing has completed.
        return CompletableFuture
                .runAsync(() -> processData(message.getPayload()))
                .thenCompose(v -> message.ack());
    }
}
```
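
When the work is blocking rather than genuinely asynchronous, an alternative is the `@Blocking` annotation, which runs the method on a worker thread and acknowledges the message when it returns:

```java
import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.common.annotation.Blocking;
import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class BlockingProcessor {

    // Executed on a worker thread, so blocking work (JDBC calls,
    // file I/O, ...) is safe here.
    @Incoming("async-messages")
    @Blocking
    public void process(String payload) {
        // process the payload synchronously
    }
}
```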

### Reactive Streams with Mutiny

Use Mutiny reactive types for advanced stream processing.

```java
@ApplicationScoped
public class ReactiveProcessor {

    @Incoming("stream-data")
    public Uni<Void> processReactive(String data) {
        return Uni.createFrom().item(data)
                .map(String::toUpperCase)
                .onItem().invoke(processed -> saveToDatabase(processed))
                .replaceWithVoid();
    }
}
```

### Keyed Message Processing

Process keyed messages with `KeyedMulti`, which groups the incoming stream by key and invokes the method once per key.

```java { .api }
import io.smallrye.reactive.messaging.keyed.KeyedMulti;

@Incoming("keyed-input")
@Outgoing("keyed-output")
public Multi<String> processKeyed(KeyedMulti<String, String> keyedData) {
    return keyedData.map(value -> keyedData.key() + ":" + value);
}
```

## Configuration Examples

### Basic Channel Configuration

```properties
# Consumer configuration
mp.messaging.incoming.notifications.connector=smallrye-kafka
mp.messaging.incoming.notifications.topic=notification-topic
mp.messaging.incoming.notifications.bootstrap.servers=localhost:9092
mp.messaging.incoming.notifications.group.id=notification-consumer

# Producer configuration
mp.messaging.outgoing.processed-data.connector=smallrye-kafka
mp.messaging.outgoing.processed-data.topic=processed-topic
mp.messaging.outgoing.processed-data.bootstrap.servers=localhost:9092
```
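
When `topic` is omitted, the smallrye-kafka connector uses the channel name itself as the topic name, so a minimal configuration can be as short as:

```properties
# Reads from the "notifications" topic, since no explicit topic is set
mp.messaging.incoming.notifications.connector=smallrye-kafka
```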

### Advanced Configuration

```properties
# Consumer with a Jackson-based deserializer; in practice you usually register
# a subclass of ObjectMapperDeserializer that fixes the target type
mp.messaging.incoming.complex-data.connector=smallrye-kafka
mp.messaging.incoming.complex-data.topic=complex-topic
mp.messaging.incoming.complex-data.value.deserializer=io.quarkus.kafka.client.serialization.ObjectMapperDeserializer

# Producer with a Jackson-based serializer
mp.messaging.outgoing.events.connector=smallrye-kafka
mp.messaging.outgoing.events.topic=event-topic
mp.messaging.outgoing.events.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer
```

## Error Handling

### Exception Handling in Consumers

```java
@ApplicationScoped
public class ErrorHandlingConsumer {

    private static final Logger logger = Logger.getLogger(ErrorHandlingConsumer.class);

    @Incoming("messages")
    public CompletionStage<Void> consume(Message<String> message) {
        try {
            processMessage(message.getPayload());
            return message.ack();
        } catch (Exception e) {
            logger.error("Failed to process message", e);
            // Negative acknowledgment triggers the channel's failure strategy
            return message.nack(e);
        }
    }
}
```

### Dead Letter Queue

Configure dead letter topics for failed messages:

```properties
mp.messaging.incoming.messages.connector=smallrye-kafka
mp.messaging.incoming.messages.topic=main-topic
mp.messaging.incoming.messages.failure-strategy=dead-letter-queue
mp.messaging.incoming.messages.dead-letter-queue.topic=failed-messages
```
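
`dead-letter-queue` is one of three supported failure strategies; the others handle a `nack` without redirecting the record:

```properties
# Stop processing and mark the application unhealthy on failure (default)
mp.messaging.incoming.messages.failure-strategy=fail

# Log the failure and continue with the next record
mp.messaging.incoming.messages.failure-strategy=ignore
```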

## Types

```java { .api }
// Core MicroProfile Reactive Messaging types
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

// Kafka-specific types
import io.smallrye.reactive.messaging.kafka.Record;
import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;

// Keyed processing types
import io.smallrye.reactive.messaging.keyed.KeyedMulti;
import io.smallrye.reactive.messaging.keyed.Keyed;

// Mutiny reactive types
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.Multi;
```