or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.mdindex.mdmessage-processing.mdstate-management.md

state-management.mddocs/

0

# State Management and Exactly-Once Processing

1

2

The Quarkus Kafka extension provides exactly-once processing capabilities through checkpoint state management. This enables applications to maintain processing state across restarts and handle message processing failures gracefully.

3

4

## Checkpoint State Management

5

6

### CheckpointMetadata

7

8

Access and manipulate checkpoint state within message processors.

9

10

```java { .api }

11

import io.smallrye.reactive.messaging.kafka.commit.CheckpointMetadata;

12

13

@Incoming("channel-name")

14

public CompletionStage<Void> consume(Message<DataType> message) {

15

CheckpointMetadata<StateType> checkpoint = CheckpointMetadata.fromMessage(message);

16

17

// Transform state

18

checkpoint.transform(new StateType(), state -> {

19

// Modify state

20

return state;

21

});

22

23

return message.ack();

24

}

25

```

26

27

**Methods:**

28

- `fromMessage(Message<?> message)`: Extract checkpoint metadata from message

29

- `transform(S initialState, Function<S, S> stateTransformer)`: Transform checkpoint state

30

31

## State Store Implementations

32

33

### Hibernate ORM State Store

34

35

Database-backed state store using Hibernate ORM for transactional persistence.

36

37

```java { .api }

38

package io.quarkus.smallrye.reactivemessaging.kafka;

39

40

import io.smallrye.reactive.messaging.kafka.commit.CheckpointStateStore;

41

42

public class HibernateOrmStateStore implements CheckpointStateStore {

43

public static final String HIBERNATE_ORM_STATE_STORE = "quarkus-hibernate-orm";

44

45

public Uni<Map<TopicPartition, ProcessingState<?>>> fetchProcessingState(

46

Collection<TopicPartition> partitions);

47

48

public Uni<Void> persistProcessingState(

49

Map<TopicPartition, ProcessingState<?>> state);

50

}

51

```

52

53

### Hibernate Reactive State Store

54

55

Reactive database-backed state store using Hibernate Reactive.

56

57

```java { .api }

58

package io.quarkus.smallrye.reactivemessaging.kafka;

59

60

import io.smallrye.reactive.messaging.kafka.commit.CheckpointStateStore;

61

62

public class HibernateReactiveStateStore implements CheckpointStateStore {

63

public static final String HIBERNATE_REACTIVE_STATE_STORE = "quarkus-hibernate-reactive";

64

65

public Uni<Map<TopicPartition, ProcessingState<?>>> fetchProcessingState(

66

Collection<TopicPartition> partitions);

67

68

public Uni<Void> persistProcessingState(

69

Map<TopicPartition, ProcessingState<?>> state);

70

}

71

```

72

73

### Redis State Store

74

75

Redis-backed state store for distributed state management.

76

77

```java { .api }

78

package io.quarkus.smallrye.reactivemessaging.kafka;

79

80

import io.smallrye.reactive.messaging.kafka.commit.CheckpointStateStore;

81

82

public class RedisStateStore implements CheckpointStateStore {

83

public static final String REDIS_STATE_STORE = "quarkus-redis";

84

85

public Uni<Map<TopicPartition, ProcessingState<?>>> fetchProcessingState(

86

Collection<TopicPartition> partitions);

87

88

public Uni<Void> persistProcessingState(

89

Map<TopicPartition, ProcessingState<?>> state);

90

91

public void close();

92

}

93

```

94

95

## Database Entity Classes

96

97

### CheckpointEntity

98

99

Base entity class for database-backed checkpoint persistence.

100

101

```java { .api }

102

package io.quarkus.smallrye.reactivemessaging.kafka;

103

104

import jakarta.persistence.Embeddable;

105

import jakarta.persistence.MappedSuperclass;

106

import org.apache.kafka.common.TopicPartition;

107

108

@MappedSuperclass

109

public class CheckpointEntity {

110

@EmbeddedId

111

public CheckpointEntityId id;

112

113

public Long offset;

114

115

// Static factory method

116

public static <S extends CheckpointEntity> S from(ProcessingState<S> state, CheckpointEntityId entityId);

117

118

// Static utility method

119

public static TopicPartition topicPartition(CheckpointEntity entity);

120

121

// Standard getters and setters

122

public CheckpointEntityId getId();

123

public void setId(CheckpointEntityId id);

124

public Long getOffset();

125

public void setOffset(Long offset);

126

}

127

```

128

129

### CheckpointEntityId

130

131

Composite ID for checkpoint entities (consumer group + topic + partition).

132

133

```java { .api }

134

package io.quarkus.smallrye.reactivemessaging.kafka;

135

136

import jakarta.persistence.Embeddable;

137

import jakarta.persistence.Column;

138

import org.apache.kafka.common.TopicPartition;

139

import java.io.Serializable;

140

141

@Embeddable

142

public class CheckpointEntityId implements Serializable {

143

@Column(name = "consumer_group_id", insertable = false)

144

public String consumerGroupId;

145

public String topic;

146

public int partition;

147

148

// Constructors

149

public CheckpointEntityId();

150

public CheckpointEntityId(String consumerGroupId, TopicPartition topicPartition);

151

152

// Standard getters, setters, equals, hashCode, toString

153

public String getConsumerGroupId();

154

public void setConsumerGroupId(String consumerGroupId);

155

public String getTopic();

156

public void setTopic(String topic);

157

public int getPartition();

158

public void setPartition(int partition);

159

}

160

```

161

162

## Processing State Codec

163

164

### DatabindProcessingStateCodec

165

166

Jackson-based codec for serializing/deserializing processing state.

167

168

```java { .api }

169

package io.quarkus.smallrye.reactivemessaging.kafka;

170

171

import io.smallrye.reactive.messaging.kafka.commit.ProcessingStateCodec;

172

import io.smallrye.reactive.messaging.kafka.commit.ProcessingState;

173

174

public class DatabindProcessingStateCodec implements ProcessingStateCodec {

175

176

public ProcessingState<?> decode(byte[] bytes);

177

public byte[] encode(ProcessingState<?> object);

178

179

// Factory for creating codec instances

180

public static class Factory implements ProcessingStateCodec.Factory {

181

public ProcessingStateCodec create();

182

}

183

}

184

```

185

186

## Usage Examples

187

188

### Basic State Management with Hibernate ORM

189

190

First, create a checkpoint entity:

191

192

```java

193

import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntity;

194

import jakarta.persistence.Entity;

195

import jakarta.persistence.Table;

196

197

@Entity

198

@Table(name = "user_processing_checkpoints")

199

public class UserProcessingCheckpoint extends CheckpointEntity {

200

// Entity automatically inherits id and offset fields

201

}

202

```

203

204

Then use it in message processing:

205

206

```java

207

import io.smallrye.reactive.messaging.kafka.commit.CheckpointMetadata;

208

209

@ApplicationScoped

210

public class UserProcessor {

211

212

static class UserProcessingState {

213

public String processedNames;

214

public int totalCount;

215

}

216

217

@Incoming("users")

218

public CompletionStage<Void> processUser(Message<User> message) {

219

CheckpointMetadata<UserProcessingState> checkpoint =

220

CheckpointMetadata.fromMessage(message);

221

222

User user = message.getPayload();

223

224

checkpoint.transform(new UserProcessingState(), state -> {

225

if (state.processedNames == null) {

226

state.processedNames = user.getName();

227

} else {

228

state.processedNames += ";" + user.getName();

229

}

230

state.totalCount++;

231

return state;

232

});

233

234

return message.ack();

235

}

236

}

237

```

238

239

### Configuration for State Stores

240

241

#### Hibernate ORM State Store

242

243

```properties

244

# Enable Hibernate ORM state store

245

mp.messaging.incoming.users.connector=smallrye-kafka

246

mp.messaging.incoming.users.topic=user-topic

247

mp.messaging.incoming.users.checkpoint.state-store=quarkus-hibernate-orm

248

249

# Database configuration

250

quarkus.datasource.db-kind=postgresql

251

quarkus.datasource.jdbc.url=jdbc:postgresql://localhost:5432/mydb

252

quarkus.hibernate-orm.database.generation=update

253

```

254

255

#### Hibernate Reactive State Store

256

257

```properties

258

# Enable Hibernate Reactive state store

259

mp.messaging.incoming.users.connector=smallrye-kafka

260

mp.messaging.incoming.users.topic=user-topic

261

mp.messaging.incoming.users.checkpoint.state-store=quarkus-hibernate-reactive

262

263

# Reactive database configuration

264

quarkus.datasource.reactive.url=postgresql://localhost:5432/mydb

265

quarkus.hibernate-orm.database.generation=update

266

```

267

268

#### Redis State Store

269

270

```properties

271

# Enable Redis state store

272

mp.messaging.incoming.users.connector=smallrye-kafka

273

mp.messaging.incoming.users.topic=user-topic

274

mp.messaging.incoming.users.checkpoint.state-store=quarkus-redis

275

276

# Redis configuration

277

quarkus.redis.hosts=redis://localhost:6379

278

```

279

280

### Custom State Codec

281

282

For custom state serialization:

283

284

```properties

285

# Configure custom codec

286

mp.messaging.incoming.users.checkpoint.state-codec-factory=com.example.MyStateCodecFactory

287

```

288

289

## Advanced Features

290

291

### State Recovery

292

293

State stores automatically recover processing state during application startup, ensuring exactly-once processing guarantees across restarts.

294

295

### Transactional Processing

296

297

Database-backed state stores (Hibernate ORM/Reactive) provide transactional guarantees, ensuring state consistency.

298

299

### Distributed State

300

301

Redis state store enables distributed state management across multiple application instances.

302

303

## Types

304

305

```java { .api }

306

// State management types

307

import io.smallrye.reactive.messaging.kafka.commit.CheckpointMetadata;

308

import io.smallrye.reactive.messaging.kafka.commit.CheckpointStateStore;

309

import io.smallrye.reactive.messaging.kafka.commit.ProcessingState;

310

import io.smallrye.reactive.messaging.kafka.commit.ProcessingStateCodec;

311

312

// Entity types

313

import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntity;

314

import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntityId;

315

316

// State store implementations

317

import io.quarkus.smallrye.reactivemessaging.kafka.HibernateOrmStateStore;

318

import io.quarkus.smallrye.reactivemessaging.kafka.HibernateReactiveStateStore;

319

import io.quarkus.smallrye.reactivemessaging.kafka.RedisStateStore;

320

321

// Codec implementation

322

import io.quarkus.smallrye.reactivemessaging.kafka.DatabindProcessingStateCodec;

323

324

// Kafka types

325

import org.apache.kafka.common.TopicPartition;

326

327

// JPA annotations

328

import jakarta.persistence.Entity;

329

import jakarta.persistence.Table;

330

import jakarta.persistence.Embeddable;

331

import jakarta.persistence.EmbeddedId;

332

import jakarta.persistence.MappedSuperclass;

333

import jakarta.persistence.Column;

334

335

// Java types

336

import java.io.Serializable;

337

```