# Configuration and Customization

The Quarkus Kafka extension provides extensive configuration options and customization capabilities for Kafka client behavior, security settings, and messaging properties.

## Configuration Classes

### ReactiveMessagingKafkaConfig

Main configuration class for Kafka extension settings.

```java { .api }
package io.quarkus.smallrye.reactivemessaging.kafka;

import io.quarkus.runtime.annotations.ConfigItem;
import io.quarkus.runtime.annotations.ConfigRoot;

@ConfigRoot(name = "messaging.kafka")
public class ReactiveMessagingKafkaConfig {

    /**
     * Enables the graceful shutdown in dev and test modes.
     * The graceful shutdown waits until the inflight records have been processed
     * and the offset committed to Kafka. While this setting is highly recommended
     * in production, in dev and test modes, it's disabled by default.
     */
    @ConfigItem(defaultValue = "false")
    public boolean enableGracefulShutdownInDevAndTestMode;
}
```

**Properties:**

- `enableGracefulShutdownInDevAndTestMode`: Controls whether graceful shutdown is applied in dev and test modes (default: `false`). Set it with the property `quarkus.messaging.kafka.enable-graceful-shutdown-in-dev-and-test-mode`.

### KafkaConfigCustomizer

Customizer for Kafka client configuration, particularly for TLS and security settings.

```java { .api }
package io.quarkus.smallrye.reactivemessaging.kafka;

import java.util.Map;

import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.config.Config;
import io.smallrye.reactive.messaging.ClientCustomizer;

@ApplicationScoped
public class KafkaConfigCustomizer implements ClientCustomizer<Map<String, Object>> {

    /**
     * Customize Kafka client configuration for a specific channel.
     *
     * @param channel The channel name being configured
     * @param channelConfig The configuration for the channel
     * @param config The Kafka client configuration map to customize
     * @return The customized configuration map
     */
    @Override
    public Map<String, Object> customize(String channel, Config channelConfig,
                                         Map<String, Object> config);
}
```

## Configuration Properties

### Core Configuration Prefixes

The extension supports configuration under these prefixes:

#### MicroProfile Reactive Messaging

```properties
# Channel-level configuration
mp.messaging.incoming.[channel-name].[property]
mp.messaging.outgoing.[channel-name].[property]

# Connector-level configuration
mp.messaging.connector.smallrye-kafka.[property]
```

#### Quarkus Messaging Configuration

```properties
# General messaging configuration
quarkus.messaging.[property]

# Kafka-specific configuration
quarkus.messaging.kafka.[property]
```

#### Kafka Client Configuration

```properties
# Direct Kafka configuration
quarkus.kafka.[property]
```

### Channel Configuration Examples

#### Basic Consumer Configuration

```properties
# Configure incoming channel
mp.messaging.incoming.my-topic.connector=smallrye-kafka
mp.messaging.incoming.my-topic.topic=input-topic
mp.messaging.incoming.my-topic.bootstrap.servers=localhost:9092
mp.messaging.incoming.my-topic.group.id=my-consumer-group
mp.messaging.incoming.my-topic.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.my-topic.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
```

#### Basic Producer Configuration

```properties
# Configure outgoing channel
mp.messaging.outgoing.my-output.connector=smallrye-kafka
mp.messaging.outgoing.my-output.topic=output-topic
mp.messaging.outgoing.my-output.bootstrap.servers=localhost:9092
mp.messaging.outgoing.my-output.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.my-output.value.serializer=org.apache.kafka.common.serialization.StringSerializer
```

#### Advanced Consumer Configuration

```properties
# Consumer with advanced settings
mp.messaging.incoming.advanced-consumer.connector=smallrye-kafka
mp.messaging.incoming.advanced-consumer.topic=advanced-topic
mp.messaging.incoming.advanced-consumer.bootstrap.servers=kafka1:9092,kafka2:9092
mp.messaging.incoming.advanced-consumer.group.id=advanced-group
mp.messaging.incoming.advanced-consumer.auto.offset.reset=earliest
mp.messaging.incoming.advanced-consumer.enable.auto.commit=false
mp.messaging.incoming.advanced-consumer.max.poll.records=100
mp.messaging.incoming.advanced-consumer.session.timeout.ms=30000
mp.messaging.incoming.advanced-consumer.heartbeat.interval.ms=3000
```
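
The two timeout settings above are related: the Kafka consumer documentation states that `heartbeat.interval.ms` must be lower than `session.timeout.ms` and is typically set to no more than one third of it. The following is a minimal sketch of a sanity check for that relationship. The class `ConsumerTimeoutCheck` and its method are hypothetical helpers written for illustration; they are not part of Quarkus, SmallRye, or the Kafka client.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: checks the relationship between two consumer timeouts. */
final class ConsumerTimeoutCheck {

    private ConsumerTimeoutCheck() {
    }

    /**
     * Returns a list of human-readable findings; an empty list means
     * the pair of values follows the documented guidance.
     */
    static List<String> check(long sessionTimeoutMs, long heartbeatIntervalMs) {
        List<String> findings = new ArrayList<>();
        if (heartbeatIntervalMs >= sessionTimeoutMs) {
            // Hard requirement from the Kafka consumer configuration docs
            findings.add("heartbeat.interval.ms must be lower than session.timeout.ms");
        } else if (heartbeatIntervalMs * 3 > sessionTimeoutMs) {
            // Soft guidance: keep the heartbeat at or below one third of the session timeout
            findings.add("heartbeat.interval.ms is usually kept at or below one third of session.timeout.ms");
        }
        return findings;
    }
}
```

With the values from the example above (`30000` and `3000`), `ConsumerTimeoutCheck.check(30000, 3000)` returns an empty list.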

### Security Configuration

#### TLS/SSL Configuration

```properties
# Enable TLS with Quarkus TLS configuration
mp.messaging.incoming.secure-topic.connector=smallrye-kafka
mp.messaging.incoming.secure-topic.topic=secure-topic
mp.messaging.incoming.secure-topic.bootstrap.servers=kafka-ssl:9093
mp.messaging.incoming.secure-topic.tls-configuration-name=kafka-tls

# TLS configuration (PKCS12 passwords are set under the same p12 group as the path)
quarkus.tls.kafka-tls.trust-store.p12.path=kafka.client.truststore.p12
quarkus.tls.kafka-tls.trust-store.p12.password=truststore-password
quarkus.tls.kafka-tls.key-store.p12.path=kafka.client.keystore.p12
quarkus.tls.kafka-tls.key-store.p12.password=keystore-password
```

#### SASL Authentication

```properties
# SASL/PLAIN authentication
# Note: SASL_PLAINTEXT sends credentials unencrypted; prefer SASL_SSL outside local testing
mp.messaging.incoming.sasl-topic.connector=smallrye-kafka
mp.messaging.incoming.sasl-topic.topic=sasl-topic
mp.messaging.incoming.sasl-topic.bootstrap.servers=kafka-sasl:9092
mp.messaging.incoming.sasl-topic.security.protocol=SASL_PLAINTEXT
mp.messaging.incoming.sasl-topic.sasl.mechanism=PLAIN
mp.messaging.incoming.sasl-topic.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="password";

# SASL/SCRAM authentication
mp.messaging.incoming.scram-topic.sasl.mechanism=SCRAM-SHA-256
mp.messaging.incoming.scram-topic.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="password";
```
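
The `sasl.jaas.config` values above follow a fixed shape: login module class, the `required` control flag, quoted options, and a terminating semicolon. When credentials come from a secret store rather than a properties file, the string can be assembled in code. The following is a minimal sketch; `JaasConfig` is a hypothetical helper, and the escaping of backslashes and double quotes is an assumption about how quoted values are parsed that should be verified against the Kafka client version in use.

```java
/** Hypothetical helper: builds sasl.jaas.config values for PLAIN and SCRAM. */
final class JaasConfig {

    private JaasConfig() {
    }

    static String plain(String username, String password) {
        return build("org.apache.kafka.common.security.plain.PlainLoginModule", username, password);
    }

    static String scram(String username, String password) {
        return build("org.apache.kafka.common.security.scram.ScramLoginModule", username, password);
    }

    private static String build(String loginModule, String username, String password) {
        // Shape: <module> required username="..." password="...";
        return loginModule + " required username=\"" + escape(username)
                + "\" password=\"" + escape(password) + "\";";
    }

    // Assumption: backslash and double quote are the characters that need escaping
    private static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }
}
```

`JaasConfig.plain("user", "password")` produces the same string as the SASL/PLAIN line in the properties example.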

### State Store Configuration

#### Hibernate ORM State Store

```properties
# Configure Hibernate ORM state store
mp.messaging.incoming.stateful-topic.connector=smallrye-kafka
mp.messaging.incoming.stateful-topic.topic=stateful-topic
# State stores are used by the checkpoint commit strategy
mp.messaging.incoming.stateful-topic.commit-strategy=checkpoint
mp.messaging.incoming.stateful-topic.checkpoint.state-store=quarkus-hibernate-orm

# Optional: Custom state codec
mp.messaging.incoming.stateful-topic.checkpoint.state-codec-factory=com.example.MyCodecFactory
```

#### Redis State Store

```properties
# Configure Redis state store
mp.messaging.incoming.redis-topic.connector=smallrye-kafka
mp.messaging.incoming.redis-topic.topic=redis-topic
# State stores are used by the checkpoint commit strategy
mp.messaging.incoming.redis-topic.commit-strategy=checkpoint
mp.messaging.incoming.redis-topic.checkpoint.state-store=quarkus-redis

# Redis client configuration
quarkus.redis.hosts=redis://localhost:6379
quarkus.redis.password=redis-password
quarkus.redis.database=0
```

### Serialization Configuration

#### JSON Serialization with Jackson

```properties
# Jackson JSON serialization
mp.messaging.incoming.json-topic.connector=smallrye-kafka
mp.messaging.incoming.json-topic.topic=json-topic
mp.messaging.incoming.json-topic.value.deserializer=io.quarkus.kafka.client.serialization.ObjectMapperDeserializer
mp.messaging.incoming.json-topic.value.deserializer.type=com.example.MyObject

mp.messaging.outgoing.json-output.connector=smallrye-kafka
mp.messaging.outgoing.json-output.topic=json-output-topic
mp.messaging.outgoing.json-output.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer
```

#### Avro Serialization with Schema Registry

```properties
# Avro with Apicurio Registry
mp.messaging.incoming.avro-topic.connector=smallrye-kafka
mp.messaging.incoming.avro-topic.topic=avro-topic
mp.messaging.incoming.avro-topic.value.deserializer=io.apicurio.registry.serde.avro.AvroKafkaDeserializer
mp.messaging.incoming.avro-topic.apicurio.registry.url=http://schema-registry:8080/apis/registry/v2
mp.messaging.incoming.avro-topic.apicurio.registry.auto-register=true
```

## Custom Configuration Examples

### Custom Kafka Config Customizer

```java
import java.util.Map;

import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.config.Config;
import io.smallrye.reactive.messaging.ClientCustomizer;

@ApplicationScoped
public class MyKafkaConfigCustomizer implements ClientCustomizer<Map<String, Object>> {

    @Override
    public Map<String, Object> customize(String channel, Config channelConfig,
                                         Map<String, Object> config) {
        // Add custom configuration based on channel
        if ("secure-channel".equals(channel)) {
            // An empty value disables server hostname verification; avoid this outside test environments
            config.put("ssl.endpoint.identification.algorithm", "");
            config.put("ssl.truststore.type", "PKCS12");
        }

        // Add monitoring configuration
        config.put("interceptor.classes", "com.example.MyKafkaInterceptor");

        return config;
    }
}
```
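
Because the customizer above depends on CDI and SmallRye types, its per-channel decisions can be easier to unit test when they live in a plain helper that the customizer delegates to. The following is a minimal sketch of that idea. `ChannelOverrides` is a hypothetical class name, and returning a copy instead of mutating the input is a design choice made here so the helper also works if it is handed an unmodifiable map.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: applies the same per-channel overrides as the customizer. */
final class ChannelOverrides {

    private ChannelOverrides() {
    }

    /** Returns a new map containing the input entries plus the overrides. */
    static Map<String, Object> apply(String channel, Map<String, Object> config) {
        Map<String, Object> result = new HashMap<>(config);

        if ("secure-channel".equals(channel)) {
            // An empty value disables server hostname verification (test environments only)
            result.put("ssl.endpoint.identification.algorithm", "");
            result.put("ssl.truststore.type", "PKCS12");
        }

        // Applied to every channel
        result.put("interceptor.classes", "com.example.MyKafkaInterceptor");
        return result;
    }
}
```

Inside `customize`, the body would then reduce to `return ChannelOverrides.apply(channel, config);`.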

### Environment-Specific Configuration

```properties
# Development configuration
%dev.mp.messaging.incoming.events.bootstrap.servers=localhost:9092
%dev.quarkus.messaging.kafka.enable-graceful-shutdown-in-dev-and-test-mode=true

# Production configuration
%prod.mp.messaging.incoming.events.bootstrap.servers=kafka-cluster:9092
%prod.mp.messaging.incoming.events.security.protocol=SASL_SSL
%prod.mp.messaging.incoming.events.sasl.mechanism=SCRAM-SHA-256

# Test configuration
%test.mp.messaging.incoming.events.bootstrap.servers=${kafka.bootstrap.servers:localhost:9092}
```
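
In the properties above, a key prefixed with `%dev.`, `%prod.`, or `%test.` applies only when that profile is active, and `${kafka.bootstrap.servers:localhost:9092}` falls back to `localhost:9092` when `kafka.bootstrap.servers` is not set. The following sketch illustrates that lookup order with plain maps. `ProfileConfigSketch` is a hypothetical class; it handles only values that consist of a single expression and is not how Quarkus or SmallRye Config is implemented.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of profile-prefixed lookup with a default expression. */
final class ProfileConfigSketch {

    private final String profile;
    private final Map<String, String> properties;

    ProfileConfigSketch(String profile, Map<String, String> properties) {
        this.profile = profile;
        this.properties = new HashMap<>(properties);
    }

    /** Looks up "%<profile>.<key>" first, then "<key>"; returns null if neither is set. */
    String get(String key) {
        String raw = properties.get("%" + profile + "." + key);
        if (raw == null) {
            raw = properties.get(key);
        }
        return raw == null ? null : expand(raw);
    }

    // Expands a value of the exact form ${name} or ${name:default}; other values pass through
    private String expand(String raw) {
        if (!raw.startsWith("${") || !raw.endsWith("}")) {
            return raw;
        }
        String body = raw.substring(2, raw.length() - 1);
        int colon = body.indexOf(':'); // the first colon separates the name from the default
        String name = colon < 0 ? body : body.substring(0, colon);
        String fallback = colon < 0 ? null : body.substring(colon + 1);
        String resolved = properties.get(name);
        return resolved != null ? resolved : fallback;
    }
}
```

Because only the first colon is treated as the separator, a default that itself contains a colon, such as `localhost:9092`, is preserved intact.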

### Global Connector Configuration

```properties
# Apply to all channels using smallrye-kafka connector
mp.messaging.connector.smallrye-kafka.bootstrap.servers=kafka:9092
mp.messaging.connector.smallrye-kafka.security.protocol=SASL_SSL
mp.messaging.connector.smallrye-kafka.sasl.mechanism=SCRAM-SHA-256
mp.messaging.connector.smallrye-kafka.retries=3
mp.messaging.connector.smallrye-kafka.retry.backoff.ms=1000
```
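
Connector-level attributes act as defaults: a channel that sets the same attribute under `mp.messaging.incoming.[channel-name]` or `mp.messaging.outgoing.[channel-name]` overrides the connector-level value for that channel. The following sketch mimics that lookup order over a plain map. `ChannelConfigResolver` is a hypothetical class used only to illustrate the precedence; it is not the SmallRye implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical illustration of channel-over-connector attribute precedence. */
final class ChannelConfigResolver {

    private static final String CONNECTOR_PREFIX = "mp.messaging.connector.smallrye-kafka.";

    private final Map<String, String> properties;

    ChannelConfigResolver(Map<String, String> properties) {
        this.properties = new HashMap<>(properties);
    }

    /**
     * Resolves an attribute for a channel.
     *
     * @param direction "incoming" or "outgoing"
     * @param channel   the channel name
     * @param attribute the attribute, for example "bootstrap.servers"
     */
    Optional<String> resolve(String direction, String channel, String attribute) {
        String channelKey = "mp.messaging." + direction + "." + channel + "." + attribute;
        String value = properties.get(channelKey);
        if (value == null) {
            // Fall back to the connector-level default
            value = properties.get(CONNECTOR_PREFIX + attribute);
        }
        return Optional.ofNullable(value);
    }
}
```

With the connector-level `bootstrap.servers=kafka:9092` above, a channel that does not set `bootstrap.servers` resolves to `kafka:9092`, while a channel that sets its own value keeps it.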

## Graceful Shutdown Configuration

### Extension-Level Shutdown Settings

```properties
# Enable graceful shutdown in dev/test modes
quarkus.messaging.kafka.enable-graceful-shutdown-in-dev-and-test-mode=true

# Global graceful shutdown timeout
quarkus.shutdown.timeout=30s
```

### Channel-Level Shutdown Settings

```properties
# Channel-specific graceful shutdown
mp.messaging.incoming.my-topic.graceful-shutdown=true
mp.messaging.incoming.my-topic.graceful-shutdown.timeout=15s
```

## Error Handling Configuration

### Dead Letter Queue

```properties
# Configure dead letter queue for failed messages
mp.messaging.incoming.messages.connector=smallrye-kafka
mp.messaging.incoming.messages.topic=main-topic
mp.messaging.incoming.messages.failure-strategy=dead-letter-queue
mp.messaging.incoming.messages.dead-letter-queue.topic=failed-messages
mp.messaging.incoming.messages.dead-letter-queue.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.incoming.messages.dead-letter-queue.value.serializer=org.apache.kafka.common.serialization.StringSerializer
```

### Retry Configuration

```properties
# Configure retry behavior
mp.messaging.incoming.retry-topic.connector=smallrye-kafka
mp.messaging.incoming.retry-topic.topic=retry-topic
mp.messaging.incoming.retry-topic.failure-strategy=retry
mp.messaging.incoming.retry-topic.retry.max-retries=3
mp.messaging.incoming.retry-topic.retry.delay=5s
```

## Types

```java { .api }
// Configuration types
import io.quarkus.smallrye.reactivemessaging.kafka.ReactiveMessagingKafkaConfig;
import io.quarkus.smallrye.reactivemessaging.kafka.KafkaConfigCustomizer;

// Client customization
import io.smallrye.reactive.messaging.ClientCustomizer;
import org.eclipse.microprofile.config.Config;

// Annotations
import io.quarkus.runtime.annotations.ConfigItem;
import io.quarkus.runtime.annotations.ConfigRoot;
import jakarta.enterprise.context.ApplicationScoped;

// Standard Java types
import java.util.Map;
```