or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.mdindex.mdmessage-processing.mdstate-management.md

message-processing.mddocs/

0

# Message Processing

1

2

The Quarkus Kafka extension uses MicroProfile Reactive Messaging annotations to provide declarative message processing. Messages flow through channels that connect to Kafka topics.

3

4

## Core Annotations

5

6

### @Incoming

7

8

Marks a method as a message consumer from a specific channel.

9

10

```java { .api }

11

@Incoming("channel-name")

12

public void consume(String message);

13

14

@Incoming("channel-name")

15

public CompletionStage<Void> consume(Message<String> message);

16

17

@Incoming("channel-name")

18

public Uni<Void> consume(String message);

19

```

20

21

**Parameters:**

22

- `value`: Channel name (connects to Kafka topic via configuration)

23

24

### @Outgoing

25

26

Marks a method as a message producer to a specific channel.

27

28

```java { .api }

29

@Outgoing("channel-name")

30

public String produce();

31

32

@Outgoing("channel-name")

33

public Multi<String> produce();

34

35

@Outgoing("channel-name")

36

public Message<String> produce();

37

```

38

39

**Parameters:**

40

- `value`: Channel name (connects to Kafka topic via configuration)

41

42

### Combined Processing

43

44

Process messages from one channel and send to another.

45

46

```java { .api }

47

@Incoming("input-channel")

48

@Outgoing("output-channel")

49

public String transform(String input);

50

51

@Incoming("input-channel")

52

@Outgoing("output-channel")

53

public Message<String> transform(Message<String> input);

54

```

55

56

## Working with Records

57

58

### Kafka Records

59

60

Access Kafka-specific metadata using `Record<K, V>`.

61

62

```java { .api }

63

import io.smallrye.reactive.messaging.kafka.Record;

64

65

@Incoming("keyed-messages")

66

public void consume(Record<String, Person> record) {

67

String key = record.key();

68

Person value = record.value();

69

}

70

```

71

72

### Message Metadata

73

74

Access Kafka metadata from Message objects.

75

76

```java { .api }

77

import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;

78

79

@Incoming("data-with-metadata-in")

80

public void consume(String data, IncomingKafkaRecordMetadata<String, String> metadata) {

81

String topic = metadata.getTopic();

82

int partition = metadata.getPartition();

83

long offset = metadata.getOffset();

84

String key = metadata.getKey();

85

}

86

```

87

88

## Message Processing Patterns

89

90

### Simple Consumer

91

92

Basic message consumption with automatic acknowledgment.

93

94

```java

95

@ApplicationScoped

96

public class SimpleConsumer {

97

98

@Incoming("notifications")

99

public void process(String notification) {

100

System.out.println("Received: " + notification);

101

// Message automatically acknowledged

102

}

103

}

104

```

105

106

### Message Transformation

107

108

Transform messages between topics.

109

110

```java

111

@ApplicationScoped

112

public class MessageTransformer {

113

114

@Incoming("raw-data")

115

@Outgoing("processed-data")

116

public String transform(String rawData) {

117

return rawData.toUpperCase().trim();

118

}

119

}

120

```

121

122

### Async Processing with CompletionStage

123

124

Handle messages asynchronously with manual acknowledgment control.

125

126

```java

127

@ApplicationScoped

128

public class AsyncProcessor {

129

130

@Incoming("async-messages")

131

public CompletionStage<Void> processAsync(Message<String> message) {

132

return CompletableFuture

133

.supplyAsync(() -> {

134

// Async processing logic

135

processData(message.getPayload());

136

return null;

137

})

138

.thenCompose(v -> message.ack());

139

}

140

}

141

```

142

143

### Reactive Streams with Mutiny

144

145

Use Mutiny reactive types for advanced stream processing.

146

147

```java

148

@ApplicationScoped

149

public class ReactiveProcessor {

150

151

@Incoming("stream-data")

152

public Uni<Void> processReactive(String data) {

153

return Uni.createFrom().item(data)

154

.map(String::toUpperCase)

155

.onItem().invoke(processed -> saveToDatabase(processed))

156

.replaceWithVoid();

157

}

158

}

159

```

160

161

### Keyed Message Processing

162

163

Process keyed messages with KeyedMulti for partitioned processing.

164

165

```java { .api }

166

import io.smallrye.reactive.messaging.keyed.KeyedMulti;

167

168

@Incoming("keyed-input")

169

@Outgoing("keyed-output")

170

public Multi<String> processKeyed(KeyedMulti<String, String> keyedData) {

171

return keyedData.map(value -> keyedData.key() + ":" + value);

172

}

173

```

174

175

## Configuration Examples

176

177

### Basic Channel Configuration

178

179

```properties

180

# Consumer configuration

181

mp.messaging.incoming.notifications.connector=smallrye-kafka

182

mp.messaging.incoming.notifications.topic=notification-topic

183

mp.messaging.incoming.notifications.bootstrap.servers=localhost:9092

184

mp.messaging.incoming.notifications.group.id=notification-consumer

185

186

# Producer configuration

187

mp.messaging.outgoing.processed-data.connector=smallrye-kafka

188

mp.messaging.outgoing.processed-data.topic=processed-topic

189

mp.messaging.outgoing.processed-data.bootstrap.servers=localhost:9092

190

```

191

192

### Advanced Configuration

193

194

```properties

195

# Consumer with custom deserializer

196

mp.messaging.incoming.complex-data.connector=smallrye-kafka

197

mp.messaging.incoming.complex-data.topic=complex-topic

198

mp.messaging.incoming.complex-data.value.deserializer=io.quarkus.kafka.client.serialization.ObjectMapperDeserializer

199

mp.messaging.incoming.complex-data.apicurio.registry.url=http://localhost:8080/apis/registry/v2

200

201

# Producer with custom serializer

202

mp.messaging.outgoing.events.connector=smallrye-kafka

203

mp.messaging.outgoing.events.topic=event-topic

204

mp.messaging.outgoing.events.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer

205

```

206

207

## Error Handling

208

209

### Exception Handling in Consumers

210

211

```java

212

@ApplicationScoped

213

public class ErrorHandlingConsumer {

214

215

@Incoming("messages")

216

public CompletionStage<Void> consume(Message<String> message) {

217

try {

218

processMessage(message.getPayload());

219

return message.ack();

220

} catch (Exception e) {

221

logger.error("Failed to process message", e);

222

return message.nack(e);

223

}

224

}

225

}

226

```

227

228

### Dead Letter Queue

229

230

Configure dead letter topics for failed messages:

231

232

```properties

233

mp.messaging.incoming.messages.connector=smallrye-kafka

234

mp.messaging.incoming.messages.topic=main-topic

235

mp.messaging.incoming.messages.failure-strategy=dead-letter-queue

236

mp.messaging.incoming.messages.dead-letter-queue.topic=failed-messages

237

```

238

239

## Types

240

241

```java { .api }

242

// Core MicroProfile Reactive Messaging types

243

import org.eclipse.microprofile.reactive.messaging.Message;

244

import org.eclipse.microprofile.reactive.messaging.Incoming;

245

import org.eclipse.microprofile.reactive.messaging.Outgoing;

246

247

// Kafka-specific types

248

import io.smallrye.reactive.messaging.kafka.Record;

249

import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;

250

import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;

251

252

// Keyed processing types

253

import io.smallrye.reactive.messaging.keyed.KeyedMulti;

254

import io.smallrye.reactive.messaging.keyed.Keyed;

255

256

// Mutiny reactive types

257

import io.smallrye.mutiny.Uni;

258

import io.smallrye.mutiny.Multi;

259

```