# Configuration and Customization

The Quarkus Kafka extension provides extensive configuration options and customization capabilities for Kafka client behavior, security settings, and messaging properties.

## Configuration Classes

### ReactiveMessagingKafkaConfig

Main configuration class for Kafka extension settings.

```java { .api }
package io.quarkus.smallrye.reactivemessaging.kafka;

import io.quarkus.runtime.annotations.ConfigItem;
import io.quarkus.runtime.annotations.ConfigRoot;

@ConfigRoot(name = "messaging.kafka")
public class ReactiveMessagingKafkaConfig {

    /**
     * Enables the graceful shutdown in dev and test modes.
     * The graceful shutdown waits until the inflight records have been processed
     * and the offset committed to Kafka. While this setting is highly recommended
     * in production, in dev and test modes, it's disabled by default.
     */
    @ConfigItem(defaultValue = "false")
    public boolean enableGracefulShutdownInDevAndTestMode;
}
```

**Properties:**

- `enableGracefulShutdownInDevAndTestMode`: Enables graceful shutdown in dev and test modes (default: `false`). In configuration files the key is written in kebab case: `quarkus.messaging.kafka.enable-graceful-shutdown-in-dev-and-test-mode`.

### KafkaConfigCustomizer

Customizer for Kafka client configuration, particularly for TLS and security settings.

```java { .api }
package io.quarkus.smallrye.reactivemessaging.kafka;

import java.util.Map;

import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.config.Config;
import io.smallrye.reactive.messaging.ClientCustomizer;

@ApplicationScoped
public class KafkaConfigCustomizer implements ClientCustomizer<Map<String, Object>> {

    /**
     * Customize Kafka client configuration for a specific channel.
     *
     * @param channel The channel name being configured
     * @param channelConfig The configuration for the channel
     * @param config The Kafka client configuration map to customize
     * @return The customized configuration map
     */
    @Override
    public Map<String, Object> customize(String channel, Config channelConfig,
            Map<String, Object> config);
}
```

## Configuration Properties

### Core Configuration Prefixes

The extension supports configuration under these prefixes:

#### MicroProfile Reactive Messaging

```properties
# Channel-level configuration
mp.messaging.incoming.[channel-name].[property]
mp.messaging.outgoing.[channel-name].[property]

# Connector-level configuration
mp.messaging.connector.smallrye-kafka.[property]
```

#### Quarkus Messaging Configuration

```properties
# General messaging configuration
quarkus.messaging.[property]

# Kafka-specific configuration
quarkus.messaging.kafka.[property]
```

#### Kafka Client Configuration

```properties
# Direct Kafka configuration
quarkus.kafka.[property]
```
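
The channel-level prefixes follow a fixed layout: `mp.messaging`, a direction, the channel name, then the attribute. As a rough illustration of that layout, the sketch below splits a key into its parts. `ChannelKey` is a hypothetical helper, not part of Quarkus or SmallRye, and it assumes channel names that contain no dots.

```java
import java.util.Optional;

/** Hypothetical helper: splits a channel-level key into direction, channel, and attribute. */
final class ChannelKey {
    final String direction; // "incoming" or "outgoing"
    final String channel;   // assumed to contain no dots
    final String attribute; // everything after the channel name, e.g. "group.id"

    private ChannelKey(String direction, String channel, String attribute) {
        this.direction = direction;
        this.channel = channel;
        this.attribute = attribute;
    }

    /** Returns an empty Optional for keys that are not channel-level keys. */
    static Optional<ChannelKey> parse(String key) {
        String prefix = "mp.messaging.";
        if (!key.startsWith(prefix)) {
            return Optional.empty();
        }
        String rest = key.substring(prefix.length());
        int firstDot = rest.indexOf('.');
        if (firstDot < 0) {
            return Optional.empty();
        }
        String direction = rest.substring(0, firstDot);
        if (!direction.equals("incoming") && !direction.equals("outgoing")) {
            return Optional.empty(); // e.g. connector-level keys
        }
        String tail = rest.substring(firstDot + 1);
        int secondDot = tail.indexOf('.');
        if (secondDot <= 0 || secondDot == tail.length() - 1) {
            return Optional.empty(); // missing channel name or attribute
        }
        return Optional.of(new ChannelKey(direction,
                tail.substring(0, secondDot),
                tail.substring(secondDot + 1)));
    }
}
```

For example, `mp.messaging.incoming.my-topic.group.id` yields the direction `incoming`, the channel `my-topic`, and the attribute `group.id`, while a connector-level key yields an empty result.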

### Channel Configuration Examples

#### Basic Consumer Configuration

```properties
# Configure incoming channel
mp.messaging.incoming.my-topic.connector=smallrye-kafka
mp.messaging.incoming.my-topic.topic=input-topic
mp.messaging.incoming.my-topic.bootstrap.servers=localhost:9092
mp.messaging.incoming.my-topic.group.id=my-consumer-group
mp.messaging.incoming.my-topic.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.my-topic.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
```

#### Basic Producer Configuration

```properties
# Configure outgoing channel
mp.messaging.outgoing.my-output.connector=smallrye-kafka
mp.messaging.outgoing.my-output.topic=output-topic
mp.messaging.outgoing.my-output.bootstrap.servers=localhost:9092
mp.messaging.outgoing.my-output.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.my-output.value.serializer=org.apache.kafka.common.serialization.StringSerializer
```

#### Advanced Consumer Configuration

```properties
# Consumer with advanced settings
mp.messaging.incoming.advanced-consumer.connector=smallrye-kafka
mp.messaging.incoming.advanced-consumer.topic=advanced-topic
mp.messaging.incoming.advanced-consumer.bootstrap.servers=kafka1:9092,kafka2:9092
mp.messaging.incoming.advanced-consumer.group.id=advanced-group
mp.messaging.incoming.advanced-consumer.auto.offset.reset=earliest
mp.messaging.incoming.advanced-consumer.enable.auto.commit=false
mp.messaging.incoming.advanced-consumer.max.poll.records=100
mp.messaging.incoming.advanced-consumer.session.timeout.ms=30000
mp.messaging.incoming.advanced-consumer.heartbeat.interval.ms=3000
```
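
Kafka's consumer documentation recommends keeping `heartbeat.interval.ms` at no more than one third of `session.timeout.ms`; the values above (3000 and 30000) satisfy that. A minimal sketch of such a sanity check follows. `ConsumerTimeoutCheck` is a hypothetical helper, and it assumes both keys are set explicitly on the channel rather than inherited from defaults.

```java
import java.util.Properties;

/** Hypothetical sanity check for the two consumer timeouts of one incoming channel. */
final class ConsumerTimeoutCheck {

    /** True when heartbeat.interval.ms is at most one third of session.timeout.ms. */
    static boolean heartbeatWithinGuideline(Properties props, String channel) {
        String prefix = "mp.messaging.incoming." + channel + ".";
        long sessionMs = required(props, prefix + "session.timeout.ms");
        long heartbeatMs = required(props, prefix + "heartbeat.interval.ms");
        return heartbeatMs * 3 <= sessionMs;
    }

    // Reads a numeric property and fails loudly when it is absent.
    private static long required(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing property: " + key);
        }
        return Long.parseLong(value.trim());
    }
}
```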

### Security Configuration

#### TLS/SSL Configuration

```properties
# Enable TLS with Quarkus TLS configuration
mp.messaging.incoming.secure-topic.connector=smallrye-kafka
mp.messaging.incoming.secure-topic.topic=secure-topic
mp.messaging.incoming.secure-topic.bootstrap.servers=kafka-ssl:9093
mp.messaging.incoming.secure-topic.tls-configuration-name=kafka-tls

# TLS configuration
quarkus.tls.kafka-tls.trust-store.p12.path=kafka.client.truststore.p12
quarkus.tls.kafka-tls.trust-store.password=truststore-password
quarkus.tls.kafka-tls.key-store.p12.path=kafka.client.keystore.p12
quarkus.tls.kafka-tls.key-store.password=keystore-password
```

#### SASL Authentication

```properties
# SASL/PLAIN authentication
mp.messaging.incoming.sasl-topic.connector=smallrye-kafka
mp.messaging.incoming.sasl-topic.topic=sasl-topic
mp.messaging.incoming.sasl-topic.bootstrap.servers=kafka-sasl:9092
mp.messaging.incoming.sasl-topic.security.protocol=SASL_PLAINTEXT
mp.messaging.incoming.sasl-topic.sasl.mechanism=PLAIN
mp.messaging.incoming.sasl-topic.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="password";

# SASL/SCRAM authentication
mp.messaging.incoming.scram-topic.sasl.mechanism=SCRAM-SHA-256
mp.messaging.incoming.scram-topic.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="password";
```
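
The `sasl.jaas.config` value has a fixed shape: login module class, the `required` flag, quoted credentials, and a trailing semicolon. When credentials come from a secret store, it can help to assemble the string in code. The sketch below is one way to do that; `JaasConfig` is a hypothetical helper, and the backslash escaping of quotes is an assumption about how the value is parsed rather than documented behavior.

```java
/** Hypothetical helper that builds a sasl.jaas.config value for PLAIN or SCRAM. */
final class JaasConfig {

    static String forMechanism(String mechanism, String username, String password) {
        String module;
        if (mechanism.equals("PLAIN")) {
            module = "org.apache.kafka.common.security.plain.PlainLoginModule";
        } else if (mechanism.startsWith("SCRAM-")) {
            module = "org.apache.kafka.common.security.scram.ScramLoginModule";
        } else {
            throw new IllegalArgumentException("Unsupported mechanism: " + mechanism);
        }
        return module + " required username=\"" + escape(username)
                + "\" password=\"" + escape(password) + "\";";
    }

    // Assumption: backslashes and double quotes inside values are backslash-escaped.
    private static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }
}
```

Calling `JaasConfig.forMechanism("PLAIN", "user", "password")` produces the same string as the SASL/PLAIN line above.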

### State Store Configuration

#### Hibernate ORM State Store

```properties
# Configure Hibernate ORM state store
mp.messaging.incoming.stateful-topic.connector=smallrye-kafka
mp.messaging.incoming.stateful-topic.topic=stateful-topic
mp.messaging.incoming.stateful-topic.checkpoint.state-store=quarkus-hibernate-orm

# Optional: Custom state codec
mp.messaging.incoming.stateful-topic.checkpoint.state-codec-factory=com.example.MyCodecFactory
```

#### Redis State Store

```properties
# Configure Redis state store
mp.messaging.incoming.redis-topic.connector=smallrye-kafka
mp.messaging.incoming.redis-topic.topic=redis-topic
mp.messaging.incoming.redis-topic.checkpoint.state-store=quarkus-redis

# Redis client configuration
quarkus.redis.hosts=redis://localhost:6379
quarkus.redis.password=redis-password
quarkus.redis.database=0
```

### Serialization Configuration

#### JSON Serialization with Jackson

```properties
# Jackson JSON serialization
mp.messaging.incoming.json-topic.connector=smallrye-kafka
mp.messaging.incoming.json-topic.topic=json-topic
mp.messaging.incoming.json-topic.value.deserializer=io.quarkus.kafka.client.serialization.ObjectMapperDeserializer
mp.messaging.incoming.json-topic.value.deserializer.type=com.example.MyObject

mp.messaging.outgoing.json-output.connector=smallrye-kafka
mp.messaging.outgoing.json-output.topic=json-output-topic
mp.messaging.outgoing.json-output.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer
```

#### Avro Serialization with Schema Registry

```properties
# Avro with Apicurio Registry
mp.messaging.incoming.avro-topic.connector=smallrye-kafka
mp.messaging.incoming.avro-topic.topic=avro-topic
mp.messaging.incoming.avro-topic.value.deserializer=io.apicurio.registry.serde.avro.AvroKafkaDeserializer
mp.messaging.incoming.avro-topic.apicurio.registry.url=http://schema-registry:8080/apis/registry/v2
mp.messaging.incoming.avro-topic.apicurio.registry.auto-register=true
```

## Custom Configuration Examples

### Custom Kafka Config Customizer

```java
import java.util.Map;

import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.config.Config;
import io.smallrye.reactive.messaging.ClientCustomizer;

@ApplicationScoped
public class MyKafkaConfigCustomizer implements ClientCustomizer<Map<String, Object>> {

    @Override
    public Map<String, Object> customize(String channel, Config channelConfig,
            Map<String, Object> config) {
        // Add custom configuration based on channel
        if ("secure-channel".equals(channel)) {
            // An empty value disables server hostname verification
            config.put("ssl.endpoint.identification.algorithm", "");
            config.put("ssl.truststore.type", "PKCS12");
        }

        // Add monitoring configuration
        config.put("interceptor.classes", "com.example.MyKafkaInterceptor");

        return config;
    }
}
```
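
The per-channel logic in a customizer is easier to unit test when it lives in a plain function that the CDI bean delegates to. The sketch below mirrors the rules from the example above using only the JDK. `ChannelOverrides` is a hypothetical name, and returning a copy instead of mutating the input is a design choice of this sketch, not something the `ClientCustomizer` contract is known to require.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, framework-free version of the channel rules shown above. */
final class ChannelOverrides {

    /** Returns a new map with the overrides applied; the input map is left untouched. */
    static Map<String, Object> apply(String channel, Map<String, Object> config) {
        Map<String, Object> result = new HashMap<>(config);
        if ("secure-channel".equals(channel)) {
            // An empty value disables server hostname verification
            result.put("ssl.endpoint.identification.algorithm", "");
            result.put("ssl.truststore.type", "PKCS12");
        }
        // Applied to every channel
        result.put("interceptor.classes", "com.example.MyKafkaInterceptor");
        return result;
    }
}
```

A customizer bean could then return `ChannelOverrides.apply(channel, config)` from its `customize` method, keeping the bean itself trivial.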

### Environment-Specific Configuration

```properties
# Development configuration
%dev.mp.messaging.incoming.events.bootstrap.servers=localhost:9092
%dev.quarkus.messaging.kafka.enable-graceful-shutdown-in-dev-and-test-mode=true

# Production configuration
%prod.mp.messaging.incoming.events.bootstrap.servers=kafka-cluster:9092
%prod.mp.messaging.incoming.events.security.protocol=SASL_SSL
%prod.mp.messaging.incoming.events.sasl.mechanism=SCRAM-SHA-256

# Test configuration
%test.mp.messaging.incoming.events.bootstrap.servers=${kafka.bootstrap.servers:localhost:9092}
```
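
A `%profile.` prefix makes a key apply only when that profile is active, and a profile-specific value takes precedence over the unprefixed key. The sketch below models just that lookup rule with `java.util.Properties`. `ProfileLookup` is a hypothetical helper; it is a simplified model and does not expand `${...}` expressions or consult other config sources.

```java
import java.util.Properties;

/** Hypothetical model of profile-aware lookup: "%profile.key" wins over "key". */
final class ProfileLookup {

    /** Returns the profile-specific value if present, otherwise the unprefixed one (or null). */
    static String resolve(Properties props, String profile, String key) {
        String profiled = props.getProperty("%" + profile + "." + key);
        return profiled != null ? profiled : props.getProperty(key);
    }
}
```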

### Global Connector Configuration

```properties
# Apply to all channels using smallrye-kafka connector
mp.messaging.connector.smallrye-kafka.bootstrap.servers=kafka:9092
mp.messaging.connector.smallrye-kafka.security.protocol=SASL_SSL
mp.messaging.connector.smallrye-kafka.sasl.mechanism=SCRAM-SHA-256
mp.messaging.connector.smallrye-kafka.retries=3
mp.messaging.connector.smallrye-kafka.retry.backoff.ms=1000
```
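
Connector-level values act as defaults: an attribute set on a channel overrides the same attribute set on the connector. The sketch below models that precedence with `java.util.Properties`. `EffectiveAttribute` is a hypothetical helper and a simplified model of the lookup order, not the connector's actual resolution code.

```java
import java.util.Properties;

/** Hypothetical model of attribute precedence: channel value first, connector value second. */
final class EffectiveAttribute {

    static String lookup(Properties props, String direction, String channel,
            String connector, String attribute) {
        String channelKey = "mp.messaging." + direction + "." + channel + "." + attribute;
        String channelValue = props.getProperty(channelKey);
        if (channelValue != null) {
            return channelValue; // channel-level setting wins
        }
        // Fall back to the connector-wide default (may be null)
        return props.getProperty("mp.messaging.connector." + connector + "." + attribute);
    }
}
```

With `mp.messaging.connector.smallrye-kafka.bootstrap.servers=kafka:9092` and `mp.messaging.incoming.my-topic.bootstrap.servers=localhost:9092` both set, the lookup for `my-topic` returns `localhost:9092`, and any other channel gets `kafka:9092`.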

## Graceful Shutdown Configuration

### Extension-Level Shutdown Settings

```properties
# Enable graceful shutdown in dev/test modes
quarkus.messaging.kafka.enable-graceful-shutdown-in-dev-and-test-mode=true

# Global graceful shutdown timeout
quarkus.shutdown.timeout=30s
```

### Channel-Level Shutdown Settings

```properties
# Channel-specific graceful shutdown
mp.messaging.incoming.my-topic.graceful-shutdown=true
mp.messaging.incoming.my-topic.graceful-shutdown.timeout=15s
```
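
Graceful shutdown means two things: stop taking new work, then wait a bounded time for in-flight records to finish. The sketch below illustrates that idea with a plain `ExecutorService`. `GracefulStop` is a hypothetical helper meant only to show the semantics of a shutdown timeout; it is not how the extension implements shutdown.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration of "drain in-flight work, but only up to a timeout". */
final class GracefulStop {

    /** Returns true if all in-flight tasks completed before the timeout elapsed. */
    static boolean stop(ExecutorService workers, Duration timeout) {
        workers.shutdown(); // reject new tasks, let submitted ones run
        try {
            boolean drained = workers.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
            if (!drained) {
                workers.shutdownNow(); // timeout hit: interrupt what is still running
            }
            return drained;
        } catch (InterruptedException e) {
            workers.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```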

## Error Handling Configuration

### Dead Letter Queue

```properties
# Configure dead letter queue for failed messages
mp.messaging.incoming.messages.connector=smallrye-kafka
mp.messaging.incoming.messages.topic=main-topic
mp.messaging.incoming.messages.failure-strategy=dead-letter-queue
mp.messaging.incoming.messages.dead-letter-queue.topic=failed-messages
mp.messaging.incoming.messages.dead-letter-queue.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.incoming.messages.dead-letter-queue.value.serializer=org.apache.kafka.common.serialization.StringSerializer
```
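
With a dead letter queue, a record whose processing fails is set aside on a separate topic and consumption continues with the next record. The sketch below models that flow in memory. `DeadLetterRouter` is a hypothetical class, and the in-memory list stands in for the `failed-messages` topic; it does not talk to Kafka.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical in-memory model of dead-letter routing. */
final class DeadLetterRouter<T> {
    // Stand-in for the dead letter topic
    private final List<T> deadLetters = new ArrayList<>();

    /** Processes each message; failures are diverted instead of stopping the loop. */
    void process(List<T> messages, Consumer<T> handler) {
        for (T message : messages) {
            try {
                handler.accept(message);
            } catch (RuntimeException e) {
                deadLetters.add(message);
            }
        }
    }

    /** Returns a snapshot of the messages that failed so far. */
    List<T> deadLetters() {
        return new ArrayList<>(deadLetters);
    }
}
```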

### Retry Configuration

```properties
# Configure retry behavior
mp.messaging.incoming.retry-topic.connector=smallrye-kafka
mp.messaging.incoming.retry-topic.topic=retry-topic
mp.messaging.incoming.retry-topic.failure-strategy=retry
mp.messaging.incoming.retry-topic.retry.max-retries=3
mp.messaging.incoming.retry-topic.retry.delay=5s
```
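
The two retry settings above read as "try again up to 3 times, waiting 5 seconds between attempts". The sketch below spells out those semantics in plain Java. `Retry` is a hypothetical helper that only illustrates bounded retries with a fixed delay; it makes no claim about how the connector schedules retries internally.

```java
import java.time.Duration;
import java.util.function.Supplier;

/** Hypothetical illustration of bounded retries with a fixed delay between attempts. */
final class Retry {

    /** Runs the action once, then retries up to maxRetries times; rethrows the last failure. */
    static <T> T withRetries(Supplier<T> action, int maxRetries, Duration delay) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxRetries) {
                    pause(delay); // wait before the next attempt
                }
            }
        }
        throw last; // every attempt failed
    }

    private static void pause(Duration delay) {
        try {
            Thread.sleep(delay.toMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting to retry", e);
        }
    }
}
```

With `maxRetries` set to 3 the action runs at most four times: the initial attempt plus three retries.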

## Types

```java { .api }
// Configuration types
import io.quarkus.smallrye.reactivemessaging.kafka.ReactiveMessagingKafkaConfig;
import io.quarkus.smallrye.reactivemessaging.kafka.KafkaConfigCustomizer;

// Client customization
import io.smallrye.reactive.messaging.ClientCustomizer;
import org.eclipse.microprofile.config.Config;

// Annotations
import io.quarkus.runtime.annotations.ConfigItem;
import io.quarkus.runtime.annotations.ConfigRoot;
import jakarta.enterprise.context.ApplicationScoped;

// Standard Java types
import java.util.Map;
```