Connect to Kafka with Reactive Messaging
—
The Quarkus Kafka extension provides exactly-once processing capabilities through checkpoint state management. This enables applications to maintain processing state across restarts and handle message processing failures gracefully.
Access and manipulate checkpoint state within message processors.
import io.smallrye.reactive.messaging.kafka.commit.CheckpointMetadata;
@Incoming("channel-name")
public CompletionStage<Void> consume(Message<DataType> message) {
CheckpointMetadata<StateType> checkpoint = CheckpointMetadata.fromMessage(message);
// Transform state
checkpoint.transform(new StateType(), state -> {
// Modify state
return state;
});
return message.ack();
}Methods:
fromMessage(Message<?> message): Extract checkpoint metadata from messagetransform(S initialState, Function<S, S> stateTransformer): Transform checkpoint stateDatabase-backed state store using Hibernate ORM for transactional persistence.
package io.quarkus.smallrye.reactivemessaging.kafka;
import io.smallrye.reactive.messaging.kafka.commit.CheckpointStateStore;
public class HibernateOrmStateStore implements CheckpointStateStore {
public static final String HIBERNATE_ORM_STATE_STORE = "quarkus-hibernate-orm";
public Uni<Map<TopicPartition, ProcessingState<?>>> fetchProcessingState(
Collection<TopicPartition> partitions);
public Uni<Void> persistProcessingState(
Map<TopicPartition, ProcessingState<?>> state);
}Reactive database-backed state store using Hibernate Reactive.
package io.quarkus.smallrye.reactivemessaging.kafka;
import io.smallrye.reactive.messaging.kafka.commit.CheckpointStateStore;
public class HibernateReactiveStateStore implements CheckpointStateStore {
public static final String HIBERNATE_REACTIVE_STATE_STORE = "quarkus-hibernate-reactive";
public Uni<Map<TopicPartition, ProcessingState<?>>> fetchProcessingState(
Collection<TopicPartition> partitions);
public Uni<Void> persistProcessingState(
Map<TopicPartition, ProcessingState<?>> state);
}Redis-backed state store for distributed state management.
package io.quarkus.smallrye.reactivemessaging.kafka;
import io.smallrye.reactive.messaging.kafka.commit.CheckpointStateStore;
public class RedisStateStore implements CheckpointStateStore {
public static final String REDIS_STATE_STORE = "quarkus-redis";
public Uni<Map<TopicPartition, ProcessingState<?>>> fetchProcessingState(
Collection<TopicPartition> partitions);
public Uni<Void> persistProcessingState(
Map<TopicPartition, ProcessingState<?>> state);
public void close();
}Base entity class for database-backed checkpoint persistence.
package io.quarkus.smallrye.reactivemessaging.kafka;
import jakarta.persistence.Embeddable;
import jakarta.persistence.MappedSuperclass;
import org.apache.kafka.common.TopicPartition;
@MappedSuperclass
public class CheckpointEntity {
@EmbeddedId
public CheckpointEntityId id;
public Long offset;
// Static factory method
public static <S extends CheckpointEntity> S from(ProcessingState<S> state, CheckpointEntityId entityId);
// Static utility method
public static TopicPartition topicPartition(CheckpointEntity entity);
// Standard getters and setters
public CheckpointEntityId getId();
public void setId(CheckpointEntityId id);
public Long getOffset();
public void setOffset(Long offset);
}Composite ID for checkpoint entities (consumer group + topic + partition).
package io.quarkus.smallrye.reactivemessaging.kafka;
import jakarta.persistence.Embeddable;
import jakarta.persistence.Column;
import org.apache.kafka.common.TopicPartition;
import java.io.Serializable;
@Embeddable
public class CheckpointEntityId implements Serializable {
@Column(name = "consumer_group_id", insertable = false)
public String consumerGroupId;
public String topic;
public int partition;
// Constructors
public CheckpointEntityId();
public CheckpointEntityId(String consumerGroupId, TopicPartition topicPartition);
// Standard getters, setters, equals, hashCode, toString
public String getConsumerGroupId();
public void setConsumerGroupId(String consumerGroupId);
public String getTopic();
public void setTopic(String topic);
public int getPartition();
public void setPartition(int partition);
}Jackson-based codec for serializing/deserializing processing state.
package io.quarkus.smallrye.reactivemessaging.kafka;
import io.smallrye.reactive.messaging.kafka.commit.ProcessingStateCodec;
import io.smallrye.reactive.messaging.kafka.commit.ProcessingState;
public class DatabindProcessingStateCodec implements ProcessingStateCodec {
public ProcessingState<?> decode(byte[] bytes);
public byte[] encode(ProcessingState<?> object);
// Factory for creating codec instances
public static class Factory implements ProcessingStateCodec.Factory {
public ProcessingStateCodec create();
}
}First, create a checkpoint entity:
import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntity;
import jakarta.persistence.Entity;
import jakarta.persistence.Table;
@Entity
@Table(name = "user_processing_checkpoints")
public class UserProcessingCheckpoint extends CheckpointEntity {
// Entity automatically inherits id and offset fields
}Then use it in message processing:
import io.smallrye.reactive.messaging.kafka.commit.CheckpointMetadata;
@ApplicationScoped
public class UserProcessor {
static class UserProcessingState {
public String processedNames;
public int totalCount;
}
@Incoming("users")
public CompletionStage<Void> processUser(Message<User> message) {
CheckpointMetadata<UserProcessingState> checkpoint =
CheckpointMetadata.fromMessage(message);
User user = message.getPayload();
checkpoint.transform(new UserProcessingState(), state -> {
if (state.processedNames == null) {
state.processedNames = user.getName();
} else {
state.processedNames += ";" + user.getName();
}
state.totalCount++;
return state;
});
return message.ack();
}
}# Enable Hibernate ORM state store
mp.messaging.incoming.users.connector=smallrye-kafka
mp.messaging.incoming.users.topic=user-topic
mp.messaging.incoming.users.checkpoint.state-store=quarkus-hibernate-orm
# Database configuration
quarkus.datasource.db-kind=postgresql
quarkus.datasource.jdbc.url=jdbc:postgresql://localhost:5432/mydb
quarkus.hibernate-orm.database.generation=update# Enable Hibernate Reactive state store
mp.messaging.incoming.users.connector=smallrye-kafka
mp.messaging.incoming.users.topic=user-topic
mp.messaging.incoming.users.checkpoint.state-store=quarkus-hibernate-reactive
# Reactive database configuration
quarkus.datasource.reactive.url=postgresql://localhost:5432/mydb
quarkus.hibernate-orm.database.generation=update# Enable Redis state store
mp.messaging.incoming.users.connector=smallrye-kafka
mp.messaging.incoming.users.topic=user-topic
mp.messaging.incoming.users.checkpoint.state-store=quarkus-redis
# Redis configuration
quarkus.redis.hosts=redis://localhost:6379For custom state serialization:
# Configure custom codec
mp.messaging.incoming.users.checkpoint.state-codec-factory=com.example.MyStateCodecFactoryState stores automatically recover processing state during application startup, ensuring exactly-once processing guarantees across restarts.
Database-backed state stores (Hibernate ORM/Reactive) provide transactional guarantees, ensuring state consistency.
Redis state store enables distributed state management across multiple application instances.
// State management types
import io.smallrye.reactive.messaging.kafka.commit.CheckpointMetadata;
import io.smallrye.reactive.messaging.kafka.commit.CheckpointStateStore;
import io.smallrye.reactive.messaging.kafka.commit.ProcessingState;
import io.smallrye.reactive.messaging.kafka.commit.ProcessingStateCodec;
// Entity types
import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntity;
import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntityId;
// State store implementations
import io.quarkus.smallrye.reactivemessaging.kafka.HibernateOrmStateStore;
import io.quarkus.smallrye.reactivemessaging.kafka.HibernateReactiveStateStore;
import io.quarkus.smallrye.reactivemessaging.kafka.RedisStateStore;
// Codec implementation
import io.quarkus.smallrye.reactivemessaging.kafka.DatabindProcessingStateCodec;
// Kafka types
import org.apache.kafka.common.TopicPartition;
// JPA annotations
import jakarta.persistence.Entity;
import jakarta.persistence.Table;
import jakarta.persistence.Embeddable;
import jakarta.persistence.EmbeddedId;
import jakarta.persistence.MappedSuperclass;
import jakarta.persistence.Column;
// Java types
import java.io.Serializable;Install with Tessl CLI
npx tessl i tessl/maven-io-quarkus--quarkus-messaging-kafka