CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-io-quarkus--quarkus-messaging-kafka

Connect to Kafka with Reactive Messaging

Pending
Overview
Eval results
Files

state-management.mddocs/

State Management and Exactly-Once Processing

The Quarkus Kafka extension provides exactly-once processing capabilities through checkpoint state management. This enables applications to maintain processing state across restarts and handle message processing failures gracefully.

Checkpoint State Management

CheckpointMetadata

Access and manipulate checkpoint state within message processors.

import io.smallrye.reactive.messaging.kafka.commit.CheckpointMetadata;

/**
 * Minimal consumer showing how checkpoint state is reached from an incoming message.
 *
 * @param message the incoming message carrying the checkpoint metadata
 * @return the completion stage produced by acknowledging the message
 */
@Incoming("channel-name")
public CompletionStage<Void> consume(Message<DataType> message) {
    // Look up the checkpoint metadata attached to this message.
    final CheckpointMetadata<StateType> metadata = CheckpointMetadata.fromMessage(message);

    // Run the transformer against the checkpoint state; a fresh StateType is passed as the initial state.
    metadata.transform(new StateType(), current -> {
        // Placeholder: apply changes to `current` here before handing it back.
        return current;
    });

    // Acknowledge once the state transformation has been registered.
    return message.ack();
}

Methods:

  • fromMessage(Message<?> message): Extract checkpoint metadata from message
  • transform(S initialState, Function<S, S> stateTransformer): Apply stateTransformer to the current checkpoint state, starting from initialState when no state has been stored yet

State Store Implementations

Hibernate ORM State Store

Database-backed state store using Hibernate ORM for transactional persistence.

package io.quarkus.smallrye.reactivemessaging.kafka;

import io.smallrye.reactive.messaging.kafka.commit.CheckpointStateStore;

/**
 * Database-backed checkpoint state store that uses Hibernate ORM.
 *
 * <p>A channel selects this store by setting {@code checkpoint.state-store} to the value of
 * {@link #HIBERNATE_ORM_STATE_STORE}.
 */
public class HibernateOrmStateStore implements CheckpointStateStore {
    /** Identifier of this store as used in channel configuration. */
    public static final String HIBERNATE_ORM_STATE_STORE = "quarkus-hibernate-orm";
    
    /**
     * Fetches the processing state for the given topic partitions.
     *
     * @param partitions the topic partitions whose state is requested
     * @return a {@code Uni} emitting the processing state keyed by topic partition
     */
    public Uni<Map<TopicPartition, ProcessingState<?>>> fetchProcessingState(
        Collection<TopicPartition> partitions);
    
    /**
     * Persists the given processing state.
     *
     * @param state the processing state to store, keyed by topic partition
     * @return a {@code Uni<Void>}; presumably completes once the state has been stored (confirm
     *         against the implementation)
     */
    public Uni<Void> persistProcessingState(
        Map<TopicPartition, ProcessingState<?>> state);
}

Hibernate Reactive State Store

Reactive database-backed state store using Hibernate Reactive.

package io.quarkus.smallrye.reactivemessaging.kafka;

import io.smallrye.reactive.messaging.kafka.commit.CheckpointStateStore;

/**
 * Reactive database-backed checkpoint state store that uses Hibernate Reactive.
 *
 * <p>A channel selects this store by setting {@code checkpoint.state-store} to the value of
 * {@link #HIBERNATE_REACTIVE_STATE_STORE}.
 */
public class HibernateReactiveStateStore implements CheckpointStateStore {
    /** Identifier of this store as used in channel configuration. */
    public static final String HIBERNATE_REACTIVE_STATE_STORE = "quarkus-hibernate-reactive";
    
    /**
     * Fetches the processing state for the given topic partitions.
     *
     * @param partitions the topic partitions whose state is requested
     * @return a {@code Uni} emitting the processing state keyed by topic partition
     */
    public Uni<Map<TopicPartition, ProcessingState<?>>> fetchProcessingState(
        Collection<TopicPartition> partitions);
    
    /**
     * Persists the given processing state.
     *
     * @param state the processing state to store, keyed by topic partition
     * @return a {@code Uni<Void>}; presumably completes once the state has been stored (confirm
     *         against the implementation)
     */
    public Uni<Void> persistProcessingState(
        Map<TopicPartition, ProcessingState<?>> state);
}

Redis State Store

Redis-backed state store for distributed state management.

package io.quarkus.smallrye.reactivemessaging.kafka;

import io.smallrye.reactive.messaging.kafka.commit.CheckpointStateStore;

/**
 * Redis-backed checkpoint state store.
 *
 * <p>A channel selects this store by setting {@code checkpoint.state-store} to the value of
 * {@link #REDIS_STATE_STORE}.
 */
public class RedisStateStore implements CheckpointStateStore {
    /** Identifier of this store as used in channel configuration. */
    public static final String REDIS_STATE_STORE = "quarkus-redis";
    
    /**
     * Fetches the processing state for the given topic partitions.
     *
     * @param partitions the topic partitions whose state is requested
     * @return a {@code Uni} emitting the processing state keyed by topic partition
     */
    public Uni<Map<TopicPartition, ProcessingState<?>>> fetchProcessingState(
        Collection<TopicPartition> partitions);
    
    /**
     * Persists the given processing state.
     *
     * @param state the processing state to store, keyed by topic partition
     * @return a {@code Uni<Void>}; presumably completes once the state has been stored (confirm
     *         against the implementation)
     */
    public Uni<Void> persistProcessingState(
        Map<TopicPartition, ProcessingState<?>> state);
    
    /**
     * Closes this store. NOTE(review): presumably releases the underlying Redis resources —
     * confirm against the implementation.
     */
    public void close();
}

Database Entity Classes

CheckpointEntity

Base entity class for database-backed checkpoint persistence.

package io.quarkus.smallrye.reactivemessaging.kafka;

import io.smallrye.reactive.messaging.kafka.commit.ProcessingState;
import jakarta.persistence.Embeddable;
import jakarta.persistence.EmbeddedId;
import jakarta.persistence.MappedSuperclass;
import org.apache.kafka.common.TopicPartition;

/**
 * Base entity class for database-backed checkpoint persistence.
 *
 * <p>Declared as a mapped superclass: concrete entities extend it and inherit the composite
 * {@code id} and the {@code offset} field.
 */
@MappedSuperclass
public class CheckpointEntity {
    /** Embedded composite identifier of the checkpoint row. */
    @EmbeddedId
    public CheckpointEntityId id;
    
    /** Offset recorded for this checkpoint. */
    public Long offset;
    
    // Static factory method
    /**
     * Produces a checkpoint entity from a processing state and an entity id.
     *
     * @param state the processing state wrapping the entity
     * @param entityId the composite id to associate with the entity
     * @param <S> the concrete entity type
     * @return the resulting entity; NOTE(review): presumably the state's entity with id and
     *         offset populated — confirm against the implementation
     */
    public static <S extends CheckpointEntity> S from(ProcessingState<S> state, CheckpointEntityId entityId);
    
    // Static utility method
    /**
     * Returns the topic partition corresponding to the given entity.
     *
     * @param entity the checkpoint entity
     * @return the topic partition; presumably built from the topic and partition of the entity id
     */
    public static TopicPartition topicPartition(CheckpointEntity entity);
    
    // Standard getters and setters
    public CheckpointEntityId getId();
    public void setId(CheckpointEntityId id);
    public Long getOffset();
    public void setOffset(Long offset);
}

CheckpointEntityId

Composite ID for checkpoint entities (consumer group + topic + partition).

package io.quarkus.smallrye.reactivemessaging.kafka;

import jakarta.persistence.Embeddable;
import jakarta.persistence.Column;
import org.apache.kafka.common.TopicPartition;
import java.io.Serializable;

/**
 * Composite identifier for checkpoint entities, made up of the consumer group id, the topic
 * and the partition.
 */
@Embeddable
public class CheckpointEntityId implements Serializable {
    /** Consumer group id, mapped to the {@code consumer_group_id} column. */
    @Column(name = "consumer_group_id", insertable = false)
    public String consumerGroupId;
    /** Topic name. */
    public String topic;
    /** Partition number within the topic. */
    public int partition;
    
    // Constructors
    /** Creates an id with no fields set. */
    public CheckpointEntityId();
    /**
     * Creates an id for a consumer group and topic partition.
     *
     * @param consumerGroupId the consumer group id
     * @param topicPartition the topic partition; presumably supplies the topic and partition fields
     */
    public CheckpointEntityId(String consumerGroupId, TopicPartition topicPartition);
    
    // Standard getters, setters, equals, hashCode, toString
    public String getConsumerGroupId();
    public void setConsumerGroupId(String consumerGroupId);
    public String getTopic();
    public void setTopic(String topic);
    public int getPartition();
    public void setPartition(int partition);
}

Processing State Codec

DatabindProcessingStateCodec

Jackson-based codec for serializing/deserializing processing state.

package io.quarkus.smallrye.reactivemessaging.kafka;

import io.smallrye.reactive.messaging.kafka.commit.ProcessingStateCodec;
import io.smallrye.reactive.messaging.kafka.commit.ProcessingState;

/**
 * Jackson-based codec for serializing and deserializing processing state.
 */
public class DatabindProcessingStateCodec implements ProcessingStateCodec {
    
    /**
     * Decodes a processing state from its byte representation.
     *
     * @param bytes the encoded processing state
     * @return the decoded processing state
     */
    public ProcessingState<?> decode(byte[] bytes);
    /**
     * Encodes a processing state into bytes.
     *
     * @param object the processing state to encode
     * @return the encoded bytes
     */
    public byte[] encode(ProcessingState<?> object);
    
    // Factory for creating codec instances
    public static class Factory implements ProcessingStateCodec.Factory {
        /**
         * Creates a codec instance.
         *
         * @return a processing state codec
         */
        public ProcessingStateCodec create();
    }
}

Usage Examples

Basic State Management with Hibernate ORM

First, create a checkpoint entity:

import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntity;
import jakarta.persistence.Entity;
import jakarta.persistence.Table;

/**
 * Checkpoint entity for user processing, stored in the {@code user_processing_checkpoints} table.
 *
 * <p>Declares no members of its own; everything comes from {@code CheckpointEntity}.
 */
@Entity
@Table(name = "user_processing_checkpoints")
public class UserProcessingCheckpoint extends CheckpointEntity {
    // Entity automatically inherits id and offset fields
}

Then use it in message processing:

import io.smallrye.reactive.messaging.kafka.commit.CheckpointMetadata;

/**
 * Example processor that folds every consumed user into checkpointed state.
 *
 * <p>NOTE(review): with the Hibernate-backed state stores the state type presumably has to be
 * an entity extending {@code CheckpointEntity} rather than a plain nested class — confirm
 * against the Quarkus Kafka guide before copying this example.
 */
@ApplicationScoped
public class UserProcessor {

    /** State carried by the checkpoint: user names joined with ';' plus a running count. */
    static class UserProcessingState {
        public String processedNames;
        public int totalCount;
    }

    /**
     * Appends the user's name to the checkpointed name list, bumps the counter and
     * acknowledges the message.
     *
     * @param message the incoming message whose payload is the user to record
     * @return the completion stage produced by acknowledging the message
     */
    @Incoming("users")
    public CompletionStage<Void> processUser(Message<User> message) {
        CheckpointMetadata<UserProcessingState> metadata = CheckpointMetadata.fromMessage(message);
        User incoming = message.getPayload();

        metadata.transform(new UserProcessingState(), current -> {
            // The first recorded name stands alone; later names are appended after a ';' separator.
            current.processedNames = (current.processedNames == null)
                    ? incoming.getName()
                    : current.processedNames + ";" + incoming.getName();
            current.totalCount += 1;
            return current;
        });

        return message.ack();
    }
}

Configuration for State Stores

Hibernate ORM State Store

# Enable Hibernate ORM state store
mp.messaging.incoming.users.connector=smallrye-kafka
mp.messaging.incoming.users.topic=user-topic
# The checkpoint.* attributes are only used by the checkpoint commit strategy,
# so the strategy has to be selected explicitly for the state store to be active.
mp.messaging.incoming.users.commit-strategy=checkpoint
mp.messaging.incoming.users.checkpoint.state-store=quarkus-hibernate-orm

# Database configuration
quarkus.datasource.db-kind=postgresql
quarkus.datasource.jdbc.url=jdbc:postgresql://localhost:5432/mydb
quarkus.hibernate-orm.database.generation=update

Hibernate Reactive State Store

# Enable Hibernate Reactive state store
mp.messaging.incoming.users.connector=smallrye-kafka
mp.messaging.incoming.users.topic=user-topic
# The checkpoint.* attributes are only used by the checkpoint commit strategy,
# so the strategy has to be selected explicitly for the state store to be active.
mp.messaging.incoming.users.commit-strategy=checkpoint
mp.messaging.incoming.users.checkpoint.state-store=quarkus-hibernate-reactive

# Reactive database configuration
quarkus.datasource.reactive.url=postgresql://localhost:5432/mydb
quarkus.hibernate-orm.database.generation=update

Redis State Store

# Enable Redis state store
mp.messaging.incoming.users.connector=smallrye-kafka
mp.messaging.incoming.users.topic=user-topic
# The checkpoint.* attributes are only used by the checkpoint commit strategy,
# so the strategy has to be selected explicitly for the state store to be active.
mp.messaging.incoming.users.commit-strategy=checkpoint
mp.messaging.incoming.users.checkpoint.state-store=quarkus-redis

# Redis configuration
quarkus.redis.hosts=redis://localhost:6379

Custom State Codec

For custom state serialization:

# Configure custom codec
# NOTE(review): the value is presumably the fully qualified name of a class implementing
# ProcessingStateCodec.Factory; confirm the attribute name against the connector reference.
mp.messaging.incoming.users.checkpoint.state-codec-factory=com.example.MyStateCodecFactory

Advanced Features

State Recovery

State stores automatically recover processing state during application startup, ensuring exactly-once processing guarantees across restarts.

Transactional Processing

Database-backed state stores (Hibernate ORM/Reactive) provide transactional guarantees, ensuring state consistency.

Distributed State

Redis state store enables distributed state management across multiple application instances.

Types

// State management types
import io.smallrye.reactive.messaging.kafka.commit.CheckpointMetadata;
import io.smallrye.reactive.messaging.kafka.commit.CheckpointStateStore;
import io.smallrye.reactive.messaging.kafka.commit.ProcessingState;
import io.smallrye.reactive.messaging.kafka.commit.ProcessingStateCodec;

// Entity types
import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntity;
import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntityId;

// State store implementations
import io.quarkus.smallrye.reactivemessaging.kafka.HibernateOrmStateStore;
import io.quarkus.smallrye.reactivemessaging.kafka.HibernateReactiveStateStore;  
import io.quarkus.smallrye.reactivemessaging.kafka.RedisStateStore;

// Codec implementation
import io.quarkus.smallrye.reactivemessaging.kafka.DatabindProcessingStateCodec;

// Kafka types
import org.apache.kafka.common.TopicPartition;

// JPA annotations
import jakarta.persistence.Entity;
import jakarta.persistence.Table;
import jakarta.persistence.Embeddable;
import jakarta.persistence.EmbeddedId;
import jakarta.persistence.MappedSuperclass;
import jakarta.persistence.Column;

// Java types
import java.io.Serializable;

Install with Tessl CLI

npx tessl i tessl/maven-io-quarkus--quarkus-messaging-kafka

docs

configuration.md

index.md

message-processing.md

state-management.md

tile.json