# State Management and Exactly-Once Processing

The Quarkus Kafka extension supports exactly-once processing through checkpoint state management. Applications can keep processing state across restarts and recover cleanly from message processing failures.

## Checkpoint State Management

### CheckpointMetadata

Access and update checkpoint state from within a message processor.

```java { .api }
import java.util.concurrent.CompletionStage;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.reactive.messaging.kafka.commit.CheckpointMetadata;

@Incoming("channel-name")
public CompletionStage<Void> consume(Message<DataType> message) {
    // Retrieve the checkpoint metadata attached to the incoming message
    CheckpointMetadata<StateType> checkpoint = CheckpointMetadata.fromMessage(message);

    // Transform state; new StateType() is the starting point when no state exists yet
    checkpoint.transform(new StateType(), state -> {
        // Modify state
        return state;
    });

    return message.ack();
}
```
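
To make the `transform` contract concrete, the following is a minimal plain-Java sketch of the behaviour described here: start from the initial state when nothing has been stored yet, otherwise apply the function to the existing state. `CheckpointSketch` is a hypothetical stand-in written for illustration, not the SmallRye `CheckpointMetadata` class, and it ignores offsets and persistence entirely.

```java
import java.util.function.Function;

// Hypothetical stand-in for a checkpoint holder; not the SmallRye class.
final class CheckpointSketch<S> {
    private S current; // last restored or computed state; null if none exists yet

    CheckpointSketch(S restored) {
        this.current = restored;
    }

    // Start from initialState when no state exists, then apply the transformer.
    S transform(S initialState, Function<S, S> stateTransformer) {
        S base = (current != null) ? current : initialState;
        current = stateTransformer.apply(base);
        return current;
    }

    public static void main(String[] args) {
        CheckpointSketch<Integer> checkpoint = new CheckpointSketch<>(null);
        checkpoint.transform(0, count -> count + 1); // no prior state: starts from 0
        int result = checkpoint.transform(0, count -> count + 1); // prior state 1 is reused
        System.out.println(result); // prints 2
    }
}
```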

**Methods:**
- `fromMessage(Message<?> message)`: Extracts the checkpoint metadata from a message
- `transform(S initialState, Function<S, S> stateTransformer)`: Applies `stateTransformer` to the current state, starting from `initialState` when no state exists yet

## State Store Implementations

### Hibernate ORM State Store

Database-backed state store that uses Hibernate ORM for transactional persistence.

```java { .api }
package io.quarkus.smallrye.reactivemessaging.kafka;

import java.util.Collection;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.commit.CheckpointStateStore;
import io.smallrye.reactive.messaging.kafka.commit.ProcessingState;

public class HibernateOrmStateStore implements CheckpointStateStore {
    public static final String HIBERNATE_ORM_STATE_STORE = "quarkus-hibernate-orm";

    // Loads the persisted state for each of the given partitions
    public Uni<Map<TopicPartition, ProcessingState<?>>> fetchProcessingState(
            Collection<TopicPartition> partitions);

    // Persists the given per-partition state
    public Uni<Void> persistProcessingState(
            Map<TopicPartition, ProcessingState<?>> state);
}
```

### Hibernate Reactive State Store

Reactive database-backed state store that uses Hibernate Reactive.

```java { .api }
package io.quarkus.smallrye.reactivemessaging.kafka;

import java.util.Collection;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.commit.CheckpointStateStore;
import io.smallrye.reactive.messaging.kafka.commit.ProcessingState;

public class HibernateReactiveStateStore implements CheckpointStateStore {
    public static final String HIBERNATE_REACTIVE_STATE_STORE = "quarkus-hibernate-reactive";

    // Loads the persisted state for each of the given partitions
    public Uni<Map<TopicPartition, ProcessingState<?>>> fetchProcessingState(
            Collection<TopicPartition> partitions);

    // Persists the given per-partition state
    public Uni<Void> persistProcessingState(
            Map<TopicPartition, ProcessingState<?>> state);
}
```

### Redis State Store

Redis-backed state store for distributed state management.

```java { .api }
package io.quarkus.smallrye.reactivemessaging.kafka;

import java.util.Collection;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.commit.CheckpointStateStore;
import io.smallrye.reactive.messaging.kafka.commit.ProcessingState;

public class RedisStateStore implements CheckpointStateStore {
    public static final String REDIS_STATE_STORE = "quarkus-redis";

    // Loads the persisted state for each of the given partitions
    public Uni<Map<TopicPartition, ProcessingState<?>>> fetchProcessingState(
            Collection<TopicPartition> partitions);

    // Persists the given per-partition state
    public Uni<Void> persistProcessingState(
            Map<TopicPartition, ProcessingState<?>> state);

    // Releases resources held by the store
    public void close();
}
```

## Database Entity Classes

### CheckpointEntity

Base entity class for database-backed checkpoint persistence.

```java { .api }
package io.quarkus.smallrye.reactivemessaging.kafka;

import jakarta.persistence.EmbeddedId;
import jakarta.persistence.MappedSuperclass;

import org.apache.kafka.common.TopicPartition;

import io.smallrye.reactive.messaging.kafka.commit.ProcessingState;

@MappedSuperclass
public class CheckpointEntity {
    @EmbeddedId
    public CheckpointEntityId id;

    public Long offset;

    // Static factory method
    public static <S extends CheckpointEntity> S from(ProcessingState<S> state, CheckpointEntityId entityId);

    // Static utility method
    public static TopicPartition topicPartition(CheckpointEntity entity);

    // Standard getters and setters
    public CheckpointEntityId getId();
    public void setId(CheckpointEntityId id);
    public Long getOffset();
    public void setOffset(Long offset);
}
```

### CheckpointEntityId

Composite ID for checkpoint entities (consumer group + topic + partition).

```java { .api }
package io.quarkus.smallrye.reactivemessaging.kafka;

import jakarta.persistence.Embeddable;
import jakarta.persistence.Column;
import org.apache.kafka.common.TopicPartition;
import java.io.Serializable;

@Embeddable
public class CheckpointEntityId implements Serializable {
    @Column(name = "consumer_group_id", insertable = false)
    public String consumerGroupId;
    public String topic;
    public int partition;

    // Constructors
    public CheckpointEntityId();
    public CheckpointEntityId(String consumerGroupId, TopicPartition topicPartition);

    // Standard getters, setters, equals, hashCode, toString
    public String getConsumerGroupId();
    public void setConsumerGroupId(String consumerGroupId);
    public String getTopic();
    public void setTopic(String topic);
    public int getPartition();
    public void setPartition(int partition);
}
```
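
A composite ID only works as a lookup key if two instances with the same consumer group, topic, and partition compare equal, which is why `equals` and `hashCode` matter here. The sketch below illustrates that point with a plain-Java key class; `CheckpointKeySketch` is a hypothetical class written for this example and is not part of the extension.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical key type mirroring the three identifying fields of a checkpoint.
final class CheckpointKeySketch {
    final String consumerGroupId;
    final String topic;
    final int partition;

    CheckpointKeySketch(String consumerGroupId, String topic, int partition) {
        this.consumerGroupId = consumerGroupId;
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof CheckpointKeySketch)) {
            return false;
        }
        CheckpointKeySketch that = (CheckpointKeySketch) other;
        return partition == that.partition
                && Objects.equals(consumerGroupId, that.consumerGroupId)
                && Objects.equals(topic, that.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(consumerGroupId, topic, partition);
    }

    public static void main(String[] args) {
        Map<CheckpointKeySketch, Long> offsets = new HashMap<>();
        offsets.put(new CheckpointKeySketch("group-a", "user-topic", 0), 42L);

        // A second, equal key instance finds the same entry.
        System.out.println(offsets.get(new CheckpointKeySketch("group-a", "user-topic", 0))); // prints 42
        // A different consumer group identifies a different checkpoint.
        System.out.println(offsets.get(new CheckpointKeySketch("group-b", "user-topic", 0))); // prints null
    }
}
```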

## Processing State Codec

### DatabindProcessingStateCodec

Jackson-based codec for serializing and deserializing processing state.

```java { .api }
package io.quarkus.smallrye.reactivemessaging.kafka;

import io.smallrye.reactive.messaging.kafka.commit.ProcessingStateCodec;
import io.smallrye.reactive.messaging.kafka.commit.ProcessingState;

public class DatabindProcessingStateCodec implements ProcessingStateCodec {

    public ProcessingState<?> decode(byte[] bytes);
    public byte[] encode(ProcessingState<?> object);

    // Factory for creating codec instances
    public static class Factory implements ProcessingStateCodec.Factory {
        public ProcessingStateCodec create();
    }
}
```
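
The job of a codec is a lossless round trip: whatever `encode` writes, `decode` must read back as the same offset and state. The sketch below shows that round trip using only JDK streams. It is an illustration under stated assumptions, not the Jackson-based implementation: `StateCodecSketch` and `OffsetAndState` are hypothetical names, and the state is simplified to a `String`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class StateCodecSketch {

    // Hypothetical pairing of an offset with a (simplified) string state.
    static final class OffsetAndState {
        final long offset;
        final String state;

        OffsetAndState(long offset, String state) {
            this.offset = offset;
            this.state = state;
        }
    }

    // Writes the offset followed by the state into a byte array.
    static byte[] encode(OffsetAndState value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeLong(value.offset);
            out.writeUTF(value.state);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Reads the fields back in the same order they were written.
    static OffsetAndState decode(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            long offset = in.readLong();
            String state = in.readUTF();
            return new OffsetAndState(offset, state);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        OffsetAndState decoded = decode(encode(new OffsetAndState(7L, "alice;bob")));
        System.out.println(decoded.offset + " " + decoded.state); // prints "7 alice;bob"
    }
}
```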

## Usage Examples

### Basic State Management with Hibernate ORM

First, create a checkpoint entity that holds the processing state:

```java
import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntity;
import jakarta.persistence.Entity;
import jakarta.persistence.Table;

@Entity
@Table(name = "user_processing_checkpoints")
public class UserProcessingCheckpoint extends CheckpointEntity {
    // Inherits the id and offset fields; the fields below carry the processing state
    public String processedNames;
    public int totalCount;
}
```

Then use it as the state type in message processing:

```java
import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.reactive.messaging.kafka.commit.CheckpointMetadata;

@ApplicationScoped
public class UserProcessor {

    @Incoming("users")
    public CompletionStage<Void> processUser(Message<User> message) {
        CheckpointMetadata<UserProcessingCheckpoint> checkpoint =
                CheckpointMetadata.fromMessage(message);

        User user = message.getPayload();

        // Start from an empty checkpoint entity when no state has been stored yet
        checkpoint.transform(new UserProcessingCheckpoint(), state -> {
            if (state.processedNames == null) {
                state.processedNames = user.getName();
            } else {
                state.processedNames += ";" + user.getName();
            }
            state.totalCount++;
            return state;
        });

        return message.ack();
    }
}
```

### Configuration for State Stores

#### Hibernate ORM State Store

```properties
# Enable the checkpoint commit strategy with the Hibernate ORM state store
mp.messaging.incoming.users.connector=smallrye-kafka
mp.messaging.incoming.users.topic=user-topic
mp.messaging.incoming.users.commit-strategy=checkpoint
mp.messaging.incoming.users.checkpoint.state-store=quarkus-hibernate-orm
# Fully qualified name of the state entity (the com.example package is a placeholder)
mp.messaging.incoming.users.checkpoint.state-type=com.example.UserProcessingCheckpoint

# Database configuration
quarkus.datasource.db-kind=postgresql
quarkus.datasource.jdbc.url=jdbc:postgresql://localhost:5432/mydb
quarkus.hibernate-orm.database.generation=update
```

#### Hibernate Reactive State Store

```properties
# Enable the checkpoint commit strategy with the Hibernate Reactive state store
mp.messaging.incoming.users.connector=smallrye-kafka
mp.messaging.incoming.users.topic=user-topic
mp.messaging.incoming.users.commit-strategy=checkpoint
mp.messaging.incoming.users.checkpoint.state-store=quarkus-hibernate-reactive
# Fully qualified name of the state entity (the com.example package is a placeholder)
mp.messaging.incoming.users.checkpoint.state-type=com.example.UserProcessingCheckpoint

# Reactive database configuration
quarkus.datasource.db-kind=postgresql
quarkus.datasource.reactive.url=postgresql://localhost:5432/mydb
quarkus.hibernate-orm.database.generation=update
```

#### Redis State Store

```properties
# Enable the checkpoint commit strategy with the Redis state store
mp.messaging.incoming.users.connector=smallrye-kafka
mp.messaging.incoming.users.topic=user-topic
mp.messaging.incoming.users.commit-strategy=checkpoint
mp.messaging.incoming.users.checkpoint.state-store=quarkus-redis

# Redis configuration
quarkus.redis.hosts=redis://localhost:6379
```

### Custom State Codec

For custom state serialization:

```properties
# Configure custom codec
mp.messaging.incoming.users.checkpoint.state-codec-factory=com.example.MyStateCodecFactory
```

## Advanced Features

### State Recovery

State stores automatically recover processing state during application startup, ensuring exactly-once processing guarantees across restarts.
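
The recovery idea can be sketched in plain Java: the state and the offset it reflects are stored together, so after a restart any redelivered record at or below the stored offset is skipped instead of being applied twice. Everything in this sketch is hypothetical (`RecoverySketch`, its `Checkpoint` class, and the map standing in for a durable store); it illustrates the principle rather than the extension's actual recovery code.

```java
import java.util.HashMap;
import java.util.Map;

final class RecoverySketch {

    // Hypothetical persisted checkpoint: last processed offset plus a running count.
    static final class Checkpoint {
        final long offset;
        final int count;

        Checkpoint(long offset, int count) {
            this.offset = offset;
            this.count = count;
        }
    }

    // Processes the given offsets, skipping any already reflected in the stored state.
    // The map stands in for a durable store that survives restarts.
    static Checkpoint run(Map<String, Checkpoint> store, String partitionKey, long[] offsets) {
        Checkpoint checkpoint = store.getOrDefault(partitionKey, new Checkpoint(-1L, 0));
        for (long offset : offsets) {
            if (offset <= checkpoint.offset) {
                continue; // redelivered record: its effect is already in the state
            }
            checkpoint = new Checkpoint(offset, checkpoint.count + 1);
            store.put(partitionKey, checkpoint); // state and offset are stored together
        }
        return checkpoint;
    }

    public static void main(String[] args) {
        Map<String, Checkpoint> store = new HashMap<>();
        run(store, "user-topic-0", new long[] {0, 1, 2}); // first run processes three records
        // After a restart, offsets 1 and 2 are redelivered along with new records 3 and 4.
        Checkpoint after = run(store, "user-topic-0", new long[] {1, 2, 3, 4});
        System.out.println(after.offset + " " + after.count); // prints "4 5"
    }
}
```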

### Transactional Processing

Database-backed state stores (Hibernate ORM/Reactive) provide transactional guarantees, ensuring state consistency.

### Distributed State

Redis state store enables distributed state management across multiple application instances.

## Types

```java { .api }
// State management types
import io.smallrye.reactive.messaging.kafka.commit.CheckpointMetadata;
import io.smallrye.reactive.messaging.kafka.commit.CheckpointStateStore;
import io.smallrye.reactive.messaging.kafka.commit.ProcessingState;
import io.smallrye.reactive.messaging.kafka.commit.ProcessingStateCodec;

// Entity types
import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntity;
import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntityId;

// State store implementations
import io.quarkus.smallrye.reactivemessaging.kafka.HibernateOrmStateStore;
import io.quarkus.smallrye.reactivemessaging.kafka.HibernateReactiveStateStore;
import io.quarkus.smallrye.reactivemessaging.kafka.RedisStateStore;

// Codec implementation
import io.quarkus.smallrye.reactivemessaging.kafka.DatabindProcessingStateCodec;

// Kafka types
import org.apache.kafka.common.TopicPartition;

// JPA annotations
import jakarta.persistence.Entity;
import jakarta.persistence.Table;
import jakarta.persistence.Embeddable;
import jakarta.persistence.EmbeddedId;
import jakarta.persistence.MappedSuperclass;
import jakarta.persistence.Column;

// Java types
import java.io.Serializable;
```