tessl/maven-io-quarkus--quarkus-messaging-kafka

Connect to Kafka with Reactive Messaging

Describes: `mavenpkg:maven/io.quarkus/quarkus-messaging-kafka@3.15.x`

To install, run

npx @tessl/cli install tessl/maven-io-quarkus--quarkus-messaging-kafka@3.15.0


# Quarkus SmallRye Reactive Messaging Kafka Extension


The Quarkus SmallRye Reactive Messaging Kafka Extension integrates Apache Kafka with MicroProfile Reactive Messaging in the Quarkus framework. It lets developers build reactive, event-driven applications using declarative annotations and reactive streams.


## Package Information

- **Package Name**: quarkus-messaging-kafka
- **Package Type**: Maven (Quarkus Extension)
- **Language**: Java
- **Installation**: Add dependency to your `pom.xml`

```xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-kafka</artifactId>
</dependency>
```


## Core Imports

```java { .api }
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.messaging.Message;
import io.smallrye.reactive.messaging.kafka.Record;
import io.smallrye.reactive.messaging.kafka.commit.CheckpointMetadata;
import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntity;
import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntityId;
```


## Basic Usage


### Simple Message Processing

```java
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class KafkaProcessor {

    // Reads each payload from the "incoming-topic" channel and
    // publishes the transformed value to the "outgoing-topic" channel.
    @Incoming("incoming-topic")
    @Outgoing("outgoing-topic")
    public String process(String input) {
        return input.toUpperCase();
    }

    // Terminal consumer: handles payloads from the "messages" channel.
    @Incoming("messages")
    public void consume(String message) {
        System.out.println("Received: " + message);
    }
}
```


### Configuration


Configure channels in `application.properties`:

```properties
# Incoming channel
mp.messaging.incoming.incoming-topic.connector=smallrye-kafka
mp.messaging.incoming.incoming-topic.topic=input-topic
mp.messaging.incoming.incoming-topic.bootstrap.servers=localhost:9092

# Outgoing channel
mp.messaging.outgoing.outgoing-topic.connector=smallrye-kafka
mp.messaging.outgoing.outgoing-topic.topic=output-topic
mp.messaging.outgoing.outgoing-topic.bootstrap.servers=localhost:9092
```
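
Channel properties follow the MicroProfile Reactive Messaging pattern `mp.messaging.<direction>.<channel>.<attribute>`, where the direction is `incoming` or `outgoing`. The plain-Java sketch below only illustrates how such a key breaks down. `ChannelKey` is a hypothetical helper written for this example, not a class from the extension, and it assumes channel names contain no dots.

```java
import java.util.Optional;

/** Hypothetical helper, for illustration only: splits an mp.messaging.* key into its parts. */
record ChannelKey(String direction, String channel, String attribute) {

    private static final String PREFIX = "mp.messaging.";

    /**
     * Parses keys of the form mp.messaging.<direction>.<channel>.<attribute>.
     * Assumption: the channel name contains no dots (quoted names are not handled).
     */
    static Optional<ChannelKey> parse(String key) {
        if (!key.startsWith(PREFIX)) {
            return Optional.empty();
        }
        // A limit of 3 keeps dotted attributes such as "bootstrap.servers" intact.
        String[] parts = key.substring(PREFIX.length()).split("\\.", 3);
        if (parts.length < 3) {
            return Optional.empty();
        }
        boolean knownDirection = parts[0].equals("incoming") || parts[0].equals("outgoing");
        return knownDirection
                ? Optional.of(new ChannelKey(parts[0], parts[1], parts[2]))
                : Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(parse("mp.messaging.incoming.incoming-topic.bootstrap.servers"));
    }
}
```

Read this way, the channel segment (`incoming-topic`, `outgoing-topic`) is the name used in `@Incoming` and `@Outgoing`, while the `topic` attribute names the Kafka topic itself.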


## Architecture


The extension provides several key components:

- **Message Processing**: Declarative message processing with `@Incoming` and `@Outgoing` annotations
- **State Management**: Exactly-once processing with checkpoint state stores
- **Configuration**: Flexible Kafka client configuration and customization
- **TLS Integration**: Automatic TLS configuration with Quarkus security

## Capabilities


### Message Processing with Reactive Messaging


Core message processing using MicroProfile Reactive Messaging annotations.

```java { .api }
@Incoming("channel-name")
public void consume(String message);

@Incoming("input-channel")
@Outgoing("output-channel")
public String process(String input);

@Incoming("keyed-channel")
public void consume(Record<String, Person> record);
```

[Message Processing](./message-processing.md)


### Exactly-Once Processing and State Management


Advanced exactly-once processing capabilities with checkpoint state management.

```java { .api }
@Incoming("people-in")
public CompletionStage<Void> consume(Message<Person> msg) {
    CheckpointMetadata<UserState> store = CheckpointMetadata.fromMessage(msg);
    // State management logic
    return msg.ack();
}
```

[State Management](./state-management.md)
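
Conceptually, a checkpoint pairs a processing state with the offset up to which that state is valid, so a restarted consumer can resume from the stored pair instead of reprocessing from the beginning. The sketch below simulates that idea in plain Java. `Checkpoint`, `CheckpointStore`, and `CheckpointDemo` are hypothetical stand-ins invented for this example; they are not the extension's classes, and the in-memory map takes the place of a durable state store.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in: a running total plus the next offset to process. */
record Checkpoint(long nextOffset, long total) {}

/** Hypothetical in-memory store keyed by partition; a real backend would be durable. */
final class CheckpointStore {
    private final Map<Integer, Checkpoint> byPartition = new HashMap<>();

    Checkpoint load(int partition) {
        return byPartition.getOrDefault(partition, new Checkpoint(0, 0));
    }

    void save(int partition, Checkpoint checkpoint) {
        byPartition.put(partition, checkpoint);
    }
}

final class CheckpointDemo {
    /** Processes at most maxRecords from the stored offset, saving state and offset together. */
    static long consume(CheckpointStore store, int partition, List<Long> log, int maxRecords) {
        Checkpoint current = store.load(partition);
        int end = (int) Math.min(log.size(), current.nextOffset() + maxRecords);
        for (int offset = (int) current.nextOffset(); offset < end; offset++) {
            // The new total and the new offset are written as one unit.
            current = new Checkpoint(offset + 1, current.total() + log.get(offset));
            store.save(partition, current);
        }
        return current.total();
    }

    public static void main(String[] args) {
        CheckpointStore store = new CheckpointStore();
        List<Long> log = List.of(5L, 7L, 11L);
        consume(store, 0, log, 2);                      // handles offsets 0 and 1, then stops
        System.out.println(consume(store, 0, log, 10)); // resumes at offset 2
    }
}
```

Because the total and the offset are saved together in this simulation, the second call continues from offset 2 and no value is added twice.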


### Configuration and Customization


Configuration classes and customization options for Kafka client behavior.

```java { .api }
@ApplicationScoped
public class KafkaConfigCustomizer implements ClientCustomizer<Map<String, Object>> {
    public Map<String, Object> customize(String channel, Config channelConfig, Map<String, Object> config);
}
```

[Configuration](./configuration.md)


## Configuration Properties


The extension supports configuration under these prefixes:

- `mp.messaging.*` - MicroProfile Reactive Messaging configuration
- `quarkus.messaging.*` - Quarkus-specific messaging configuration
- `quarkus.kafka.*` - Kafka-specific configuration
- `quarkus.messaging.kafka.*` - Direct extension configuration

## Key Features

- **Reactive Streams**: Full reactive programming model with backpressure support
- **Exactly-Once Semantics**: Checkpoint-based exactly-once processing
- **Multiple State Stores**: Hibernate ORM, Hibernate Reactive, and Redis backends
- **TLS Security**: Automatic TLS configuration integration
- **Custom Serialization**: Support for custom serializers and deserializers
- **Cloud Native**: Optimized for containers and serverless deployments
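
To make the backpressure idea in the first bullet concrete, here is a minimal sketch using the JDK's own `java.util.concurrent.Flow` types rather than the extension's API. The subscriber requests one item at a time, so the publisher cannot deliver faster than items are handled. `OneAtATimeSubscriber` and `BackpressureDemo` are hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/** Subscriber that pulls one item at a time, so delivery never outruns processing. */
final class OneAtATimeSubscriber implements Flow.Subscriber<String> {
    final List<String> received = new ArrayList<>();
    final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // signal demand for exactly one item
    }

    @Override
    public void onNext(String item) {
        received.add(item.toUpperCase());
        subscription.request(1); // ask for the next item only after this one is handled
    }

    @Override
    public void onError(Throwable failure) {
        done.countDown();
    }

    @Override
    public void onComplete() {
        done.countDown();
    }
}

final class BackpressureDemo {
    /** Publishes the items, waits for completion, and returns what the subscriber handled. */
    static List<String> run(List<String> items) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit); // submit blocks while the subscriber's buffer is full
        } // closing the publisher signals onComplete once pending items are delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b", "c")));
    }
}
```

The same demand-driven contract is what lets a slow `@Incoming` method limit how fast records are pulled, although the extension's internal wiring is not shown here.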