Build LLM-powered applications in Java with support for chatbots, agents, RAG, tools, and much more
Chat memory implementations for maintaining conversation context. Supports message window and token window strategies with optional persistence through ChatMemoryStore.
Core interface for managing conversation history. All chat memory implementations implement this interface.
package dev.langchain4j.memory;
import dev.langchain4j.data.message.ChatMessage;
import java.util.List;
/**
* Interface for managing conversation history
*/
public interface ChatMemory {
/**
* Get memory ID
* @return Memory identifier
*/
Object id();
/**
* Add message to memory
* @param message Chat message to add
*/
void add(ChatMessage message);
/**
* Add multiple messages to memory
* @param messages Messages to add
*/
void add(Iterable<ChatMessage> messages);
/**
* Get all messages in memory
* @return List of chat messages
*/
List<ChatMessage> messages();
/**
* Clear all messages
*/
void clear();
/**
* Replace all messages (added in 1.11.0)
* @param messages New messages
*/
void set(List<ChatMessage> messages);
}

Thread safety: NOT thread-safe. Individual ChatMemory instances do not provide internal synchronization. Concurrent calls to add(), messages(), clear(), or set() from multiple threads can result in:
- ConcurrentModificationException if iterating messages while modifying

Solution: Use ChatMemoryProvider to ensure each thread/user gets isolated memory instances, or implement external synchronization if sharing a single instance across threads.

Common pitfalls:
Shared Memory Across Users: Using a single ChatMemory instance for multiple users causes conversation leakage. Always use ChatMemoryProvider with @MemoryId for multi-user scenarios.
Missing Memory ID: When using ChatMemoryProvider, forgetting to annotate the user/session parameter with @MemoryId will cause all users to share the same default memory instance.
Memory Leaks with Provider: ChatMemoryProvider lambda implementations that create new instances without bounds can cause memory leaks. Implement proper cleanup or use weak references for inactive conversations.
Assuming Persistence: By default, memory is in-memory only. Without a ChatMemoryStore, all conversation history is lost on application restart.
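The external-synchronization option can be sketched with a small decorator. The types below (SimpleMemory, InMemory, SynchronizedMemory) are hypothetical stand-ins using plain strings, not the real dev.langchain4j interfaces; they only illustrate the locking pattern:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the ChatMemory contract, using plain strings.
interface SimpleMemory {
    void add(String message);
    List<String> messages();
    void clear();
}

// Unsynchronized implementation, analogous to a plain in-memory ChatMemory.
class InMemory implements SimpleMemory {
    private final List<String> messages = new ArrayList<>();
    public void add(String message) { messages.add(message); }
    public List<String> messages() { return new ArrayList<>(messages); } // defensive copy
    public void clear() { messages.clear(); }
}

// Decorator that serializes all access on one lock, making a single
// shared instance safe for concurrent use at the cost of contention.
class SynchronizedMemory implements SimpleMemory {
    private final SimpleMemory delegate;
    private final Object lock = new Object();
    SynchronizedMemory(SimpleMemory delegate) { this.delegate = delegate; }
    public void add(String message) { synchronized (lock) { delegate.add(message); } }
    public List<String> messages() { synchronized (lock) { return delegate.messages(); } }
    public void clear() { synchronized (lock) { delegate.clear(); } }
}
```

Per-user isolation via ChatMemoryProvider is still the simpler choice; use a lock-based wrapper like this only when a single instance genuinely must be shared.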
Edge cases and notes:
- messages() on newly created memory returns an empty list (not null). Safe to iterate without null checks.
- Passing null to add(ChatMessage) or add(Iterable) is implementation-dependent. Some implementations may throw NullPointerException.
- set(null) typically throws NullPointerException. Calling set(Collections.emptyList()) clears all messages.
- Memory isolation via the memoryId passed to ChatMemoryProvider.get() relies on proper equals() and hashCode() implementations. Using mutable objects or objects without proper equality can cause memory isolation failures.
- Internal storage is typically an ArrayList. Access is O(1) for indexed operations, but eviction in windowed memories requires shifting elements.
- clear() followed by add() may be less efficient than set() for bulk replacements, depending on implementation.
- Implementations prior to version 1.11.0 may not support set().
- When backed by a ChatMemoryStore, persistence operations can throw database-specific exceptions (e.g., SQLException, connection timeouts). These are typically unchecked and propagated to the caller.

Interface for persisting chat memory. Implementations can store to databases, files, etc.
package dev.langchain4j.store.memory.chat;
import dev.langchain4j.data.message.ChatMessage;
import java.util.List;
/**
* Interface for persisting chat memory
*/
public interface ChatMemoryStore {
/**
* Get messages for given memory ID
* @param memoryId Memory identifier
* @return List of messages for this ID
*/
List<ChatMessage> getMessages(Object memoryId);
/**
* Update messages for given memory ID
* @param memoryId Memory identifier
* @param messages New message list
*/
void updateMessages(Object memoryId, List<ChatMessage> messages);
/**
* Delete messages for given memory ID
* @param memoryId Memory identifier
*/
void deleteMessages(Object memoryId);
}

Thread safety: Implementation-dependent. The interface does not mandate thread safety. Most database-backed implementations are thread-safe due to underlying database connection pooling and transaction handling. However:
Best Practice: Assume stores are NOT thread-safe unless explicitly documented. Use connection pooling or external synchronization for concurrent access.
Blocking Operations: Store operations (especially getMessages and updateMessages) can block on I/O. Calling these on the main thread in UI applications causes freezing.
No Transactions: The interface doesn't define transactional semantics. Multiple concurrent updateMessages() calls can result in lost updates if the implementation doesn't handle concurrency properly.
Unbounded Storage: Implementations that don't implement cleanup logic accumulate messages indefinitely, causing database bloat and query performance degradation.
Serialization Issues: Custom message types or content fields may not serialize correctly. Always test with your specific message types.
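A minimal in-memory implementation of the store contract can make the expected semantics concrete. This sketch uses plain strings instead of ChatMessage (SimpleMemoryStore is a hypothetical stand-in, not the real interface); it returns empty lists for missing IDs and takes defensive copies so callers never share mutable state:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in mirroring the ChatMemoryStore contract.
class SimpleMemoryStore {
    private final Map<Object, List<String>> storage = new ConcurrentHashMap<>();

    // Missing IDs return an empty list, never null.
    List<String> getMessages(Object memoryId) {
        List<String> messages = storage.get(memoryId);
        return messages == null ? new ArrayList<>() : new ArrayList<>(messages);
    }

    // Copy on write so the caller's list can't mutate stored state.
    void updateMessages(Object memoryId, List<String> messages) {
        storage.put(memoryId, new ArrayList<>(messages));
    }

    // Deleting a non-existent ID is a no-op.
    void deleteMessages(Object memoryId) {
        storage.remove(memoryId);
    }
}
```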
Edge cases and notes:
- getMessages() with a non-existent memoryId should return an empty list (not null), but behavior varies by implementation.
- Implementations typically throw NullPointerException when memoryId is null, but this is not guaranteed by the interface.
- updateMessages(id, emptyList) may either clear messages or be treated as a no-op, depending on implementation.
- deleteMessages() on a non-existent memoryId is typically a no-op but may throw exceptions in some implementations.

Provider interface for obtaining ChatMemory instances for different users or conversations.
package dev.langchain4j.memory.chat;
/**
* Provider interface for obtaining ChatMemory instances
*/
public interface ChatMemoryProvider {
/**
* Get ChatMemory for given memory ID (user/conversation)
* @param memoryId Identifier for the memory (can be any type with proper equals/hashCode)
* @return ChatMemory instance for the given ID
*/
ChatMemory get(Object memoryId);
}

Thread safety: Thread-safe by design. The provider pattern enables thread safety by ensuring each memoryId maps to an isolated ChatMemory instance. However:
- Implementations must handle concurrent get() calls safely
- Lambdas backed by ConcurrentHashMap.computeIfAbsent() are thread-safe

Recommended pattern:
Map<Object, ChatMemory> memoryMap = new ConcurrentHashMap<>();
ChatMemoryProvider provider = memoryId ->
memoryMap.computeIfAbsent(memoryId, id ->
MessageWindowChatMemory.builder()
.id(id)
.maxMessages(10)
.build()
);

Common pitfalls:
Memory Leaks: Provider implementations that cache memory instances indefinitely cause memory leaks for inactive users. Implement eviction policies (LRU, time-based) or use weak references.
Creating New Instance Every Time: Lambda that returns new MessageWindowChatMemory(...) on every get() call creates a new memory for each message, losing conversation context.
Shared State in Lambda: Capturing mutable state in the provider lambda can cause thread-safety issues:
// WRONG: captures mutable map without synchronization
Map<Object, ChatMemory> map = new HashMap<>();
provider = id -> map.computeIfAbsent(id, ...);

Missing @MemoryId Annotation: Forgetting @MemoryId on the AI Service method parameter causes all users to receive the same default memoryId (typically "default" or null).
Edge cases and notes:
- A null memoryId will typically throw NullPointerException.
- Memory IDs must implement proper equals() and hashCode(). Using mutable objects or objects without proper equality breaks memory isolation.
- If multiple threads call get() with the same new memoryId simultaneously, ensure only one memory instance is created (use computeIfAbsent).
- If get() returns null, the AI service will typically throw NullPointerException when trying to use the memory.
- Creating a new memory instance on every get() call is extremely inefficient.
- With ChatMemoryStore backends, first access for each memoryId incurs database lookup latency.
- get() may throw database exceptions during memory initialization.

ChatMemory implementation that retains a fixed number of most recent messages.
package dev.langchain4j.memory.chat;
/**
* ChatMemory implementation that retains a fixed number of most recent messages
* When the limit is reached, oldest messages are evicted
*/
public class MessageWindowChatMemory implements ChatMemory {
/**
* Create with max message limit
* Uses default in-memory storage
* @param maxMessages Maximum number of messages to retain
* @return MessageWindowChatMemory instance
*/
public static MessageWindowChatMemory withMaxMessages(int maxMessages);
/**
* Create builder for full configuration
* @return Builder instance
*/
public static Builder builder();
}

Thread safety: NOT thread-safe. Concurrent access to the same instance can cause:
- ConcurrentModificationException when iterating during modification

Solution: Use one instance per thread/user via ChatMemoryProvider, or wrap with external synchronization (e.g., Collections.synchronizedList() for internal storage).

Common pitfalls:
Window Size Too Small: Setting maxMessages=1 or 2 loses context rapidly. Most conversations need at least 10-20 messages for coherent context.
Window Size Too Large: Very large windows (e.g., 1000+ messages) cause:
System Messages Counted: System messages, user messages, and assistant messages ALL count toward the limit. A 10-message window might only hold 5 actual conversation turns.
Forgetting Persistence: Without chatMemoryStore(), messages are lost on restart. Don't assume memory persists unless explicitly configured.
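The window semantics above (every message counts, oldest evicted first) can be sketched with a small self-contained buffer. WindowBuffer is a hypothetical stand-in over plain strings, not the real MessageWindowChatMemory:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of message-window eviction: system, user, and AI
// messages all count toward the limit; oldest entries are dropped first.
class WindowBuffer {
    private final Deque<String> window = new ArrayDeque<>();
    private final int maxMessages;

    WindowBuffer(int maxMessages) {
        if (maxMessages < 1) {
            throw new IllegalArgumentException("maxMessages must be positive");
        }
        this.maxMessages = maxMessages;
    }

    void add(String message) {
        window.addLast(message);
        while (window.size() > maxMessages) {
            window.removeFirst(); // evict oldest, regardless of role
        }
    }

    List<String> messages() {
        return new ArrayList<>(window);
    }
}
```

Note how a system prompt occupies one slot of the window just like any conversation turn, which is why a 10-message window holds fewer than 10 actual exchanges.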
Edge cases and notes:
- Invalid arguments throw IllegalArgumentException in most implementations.
- add(List) with more messages than maxMessages retains only the most recent maxMessages from the batch.
- When backed by a ChatMemoryStore, each add() triggers a store update. Consider batching or async writes for high-throughput scenarios.
- withMaxMessages() rejects maxMessages < 0 (zero may be allowed but creates useless memory).
- When a store is configured, it is updated by add(), clear(), and set() operations.

/**
* Builder for MessageWindowChatMemory
*/
public class Builder {
/**
* Set memory ID
* @param id Memory identifier
* @return Builder instance
*/
public Builder id(Object id);
/**
* Set max messages to retain
* @param maxMessages Maximum number of messages
* @return Builder instance
*/
public Builder maxMessages(int maxMessages);
/**
* Set chat memory store for persistence
* @param store Chat memory store implementation
* @return Builder instance
*/
public Builder chatMemoryStore(ChatMemoryStore store);
/**
* Build the MessageWindowChatMemory
* @return MessageWindowChatMemory instance
*/
public MessageWindowChatMemory build();
}

Thread safety: NOT thread-safe. Builder instances should not be shared across threads. Each thread should create its own builder or use external synchronization.

Common pitfalls:
Reusing Builder: Calling build() multiple times on the same builder may return separate instances sharing the same underlying store reference (implementation-dependent).
Forgetting maxMessages: If maxMessages() is not called, behavior is implementation-dependent (may default to Integer.MAX_VALUE or throw exception).
Store Without ID: Setting a store without an ID may cause the memory to use a default/null ID, leading to all instances sharing the same storage.
Edge cases and notes:
- If maxMessages is not set, some implementations throw IllegalStateException, others may use a default value (e.g., 10).
- Passing null to chatMemoryStore() is typically equivalent to not calling it (uses in-memory storage).
- A null id may cause NullPointerException.
- Some builders validate eagerly and throw from build() if required parameters are missing.

ChatMemory implementation that retains messages within a token limit.
package dev.langchain4j.memory.chat;
/**
* ChatMemory implementation that retains messages within a token limit
* When adding a message would exceed the limit, oldest messages are evicted
* until the new message fits
*/
public class TokenWindowChatMemory implements ChatMemory {
/**
* Create with max token limit
* Uses default in-memory storage
* @param maxTokens Maximum number of tokens to retain
* @param tokenizer Token count estimator for counting tokens
* @return TokenWindowChatMemory instance
*/
public static TokenWindowChatMemory withMaxTokens(
int maxTokens,
TokenCountEstimator tokenizer
);
/**
* Create builder for full configuration
* @return Builder instance
*/
public static Builder builder();
}

Thread safety: NOT thread-safe. Same concurrency issues as MessageWindowChatMemory. Use ChatMemoryProvider for isolated instances per thread/user.

Common pitfalls:
Tokenizer Mismatch: Using a different tokenizer than the actual LLM model leads to incorrect token counts:
Tokenization Overhead: Token counting is CPU-intensive. Called on every add() operation. Can become a bottleneck in high-throughput scenarios.
Token Estimation Errors: Most tokenizers are estimates. Actual token count by the model may differ by 1-5%, potentially causing context limit exceeded errors.
System Message Overhead: System prompts consume tokens but are often forgotten in capacity planning. A 500-token system prompt reduces effective context window.
Window Size vs Model Limit: Setting maxTokens equal to model limit (e.g., 4096 for GPT-3.5) leaves no room for completion. Reserve 20-40% for response tokens.
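Token-window eviction can be sketched the same way: keep a running token total and drop the oldest messages until the newest one fits. TokenWindowBuffer is a hypothetical stand-in, and its whitespace word count stands in for a real TokenCountEstimator (real tokenizers count subword tokens, not words):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of token-window eviction over plain strings.
class TokenWindowBuffer {
    private final Deque<String> window = new ArrayDeque<>();
    private final int maxTokens;

    TokenWindowBuffer(int maxTokens) { this.maxTokens = maxTokens; }

    // Naive word count standing in for a real token estimator.
    static int estimateTokens(String text) {
        return text.trim().isEmpty() ? 0 : text.trim().split("\\s+").length;
    }

    void add(String message) {
        window.addLast(message);
        // Evict oldest messages until the total fits (keep at least the new one).
        while (totalTokens() > maxTokens && window.size() > 1) {
            window.removeFirst();
        }
    }

    int totalTokens() {
        return window.stream().mapToInt(TokenWindowBuffer::estimateTokens).sum();
    }

    List<String> messages() { return new ArrayList<>(window); }
}
```

In practice you would size maxTokens well below the model's context limit, since the completion and any system prompt consume tokens from the same budget.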
Edge cases and notes:
- An invalid maxTokens typically throws IllegalArgumentException.
- A null tokenizer causes NullPointerException when attempting to add messages.
- Eviction order (oldest first) matches MessageWindowChatMemory.

/**
* Builder for TokenWindowChatMemory
*/
public class Builder {
/**
* Set memory ID
* @param id Memory identifier
* @return Builder instance
*/
public Builder id(Object id);
/**
* Set max tokens to retain
* @param maxTokens Maximum number of tokens
* @return Builder instance
*/
public Builder maxTokens(int maxTokens);
/**
* Set token count estimator
* @param tokenCountEstimator Token estimator for counting tokens
* @return Builder instance
*/
public Builder tokenCountEstimator(TokenCountEstimator tokenCountEstimator);
/**
* Set chat memory store for persistence
* @param store Chat memory store implementation
* @return Builder instance
*/
public Builder chatMemoryStore(ChatMemoryStore store);
/**
* Build the TokenWindowChatMemory
* @return TokenWindowChatMemory instance
*/
public TokenWindowChatMemory build();
}

Thread safety: NOT thread-safe. Don't share builder instances across threads.

Common pitfalls:
Missing Tokenizer: Forgetting tokenCountEstimator() typically throws IllegalStateException or NullPointerException on build.
Wrong Tokenizer: Using OpenAiTokenizer with Claude or other models causes inaccurate token counts.
Reusing Builder: May create instances sharing the same tokenizer reference (usually safe) but separate memory state.
Edge cases and notes:
- Missing required parameters typically cause IllegalStateException from build().
- As with MessageWindowChatMemory.Builder, build() fails if maxTokens or tokenCountEstimator is not set.

Usage example: basic shared memory.

import dev.langchain4j.memory.chat.MessageWindowChatMemory;
import dev.langchain4j.service.AiServices;
interface Assistant {
String chat(String message);
}
// Create AI service with shared memory
Assistant assistant = AiServices.builder(Assistant.class)
.chatModel(chatModel)
.chatMemory(MessageWindowChatMemory.withMaxMessages(10))
.build();
// Conversation maintains context
String response1 = assistant.chat("My name is Alice");
String response2 = assistant.chat("What is my name?"); // Will respond "Alice"

Usage example: per-user memory with @MemoryId.

import dev.langchain4j.memory.chat.MessageWindowChatMemory;
import dev.langchain4j.memory.chat.ChatMemoryProvider;
import dev.langchain4j.service.AiServices;
import dev.langchain4j.service.MemoryId;
interface Assistant {
String chat(@MemoryId String userId, String message);
}
// Create AI service with per-user memory
Assistant assistant = AiServices.builder(Assistant.class)
.chatModel(chatModel)
.chatMemoryProvider(memoryId -> MessageWindowChatMemory.withMaxMessages(10))
.build();
// Each user has separate conversation context
String response1 = assistant.chat("user1", "My name is Alice");
String response2 = assistant.chat("user2", "My name is Bob");
String response3 = assistant.chat("user1", "What is my name?"); // Will respond "Alice"
String response4 = assistant.chat("user2", "What is my name?"); // Will respond "Bob"

Usage example: token-based memory.

import dev.langchain4j.memory.chat.TokenWindowChatMemory;
import dev.langchain4j.model.openai.OpenAiTokenizer;
import dev.langchain4j.service.AiServices;
interface Assistant {
String chat(String message);
}
// Create AI service with token-based memory
Assistant assistant = AiServices.builder(Assistant.class)
.chatModel(chatModel)
.chatMemory(TokenWindowChatMemory.withMaxTokens(
1000,
new OpenAiTokenizer()
))
.build();
// Memory automatically manages messages within token limit
String response = assistant.chat("Tell me about Java");

Usage example: persistent memory with a ChatMemoryStore.

import dev.langchain4j.memory.chat.MessageWindowChatMemory;
import dev.langchain4j.store.memory.chat.ChatMemoryStore;
interface Assistant {
String chat(@MemoryId String userId, String message);
}
// Implement or use a ChatMemoryStore for persistence
ChatMemoryStore store = // ... your persistence implementation
Assistant assistant = AiServices.builder(Assistant.class)
.chatModel(chatModel)
.chatMemoryProvider(memoryId ->
MessageWindowChatMemory.builder()
.id(memoryId)
.maxMessages(20)
.chatMemoryStore(store)
.build()
)
.build();
// Messages are persisted across application restarts
String response = assistant.chat("user1", "Hello");

Usage example: custom memory ID type.

import dev.langchain4j.memory.chat.MessageWindowChatMemory;
import dev.langchain4j.service.AiServices;
import dev.langchain4j.service.MemoryId;
// Custom memory ID type
record ConversationId(String userId, String sessionId) {
// Proper equals/hashCode automatically generated
}
interface Assistant {
String chat(@MemoryId ConversationId conversationId, String message);
}
Assistant assistant = AiServices.builder(Assistant.class)
.chatModel(chatModel)
.chatMemoryProvider(memoryId -> MessageWindowChatMemory.withMaxMessages(10))
.build();
// Use custom ID for fine-grained memory separation
ConversationId id1 = new ConversationId("user1", "session1");
ConversationId id2 = new ConversationId("user1", "session2");
String response1 = assistant.chat(id1, "My favorite color is blue");
String response2 = assistant.chat(id2, "My favorite color is red");
String response3 = assistant.chat(id1, "What is my favorite color?"); // Will respond "blue"

Testing examples.

import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
class MessageWindowChatMemoryTest {
@Test
void shouldRetainMaxMessages() {
ChatMemory memory = MessageWindowChatMemory.withMaxMessages(3);
memory.add(new UserMessage("Message 1"));
memory.add(new UserMessage("Message 2"));
memory.add(new UserMessage("Message 3"));
memory.add(new UserMessage("Message 4"));
assertThat(memory.messages()).hasSize(3);
assertThat(memory.messages().get(0).text()).isEqualTo("Message 2");
assertThat(memory.messages().get(2).text()).isEqualTo("Message 4");
}
@Test
void shouldClearAllMessages() {
ChatMemory memory = MessageWindowChatMemory.withMaxMessages(10);
memory.add(new UserMessage("Test"));
memory.clear();
assertThat(memory.messages()).isEmpty();
}
}

@Test
void shouldIsolateMemoryByUserId() {
Map<Object, ChatMemory> memoryMap = new ConcurrentHashMap<>();
ChatMemoryProvider provider = memoryId ->
memoryMap.computeIfAbsent(memoryId,
id -> MessageWindowChatMemory.withMaxMessages(10)
);
ChatMemory memory1 = provider.get("user1");
ChatMemory memory2 = provider.get("user2");
memory1.add(new UserMessage("User 1 message"));
memory2.add(new UserMessage("User 2 message"));
assertThat(memory1.messages()).hasSize(1);
assertThat(memory2.messages()).hasSize(1);
assertThat(provider.get("user1")).isSameAs(memory1); // Same instance
}

@Test
void shouldEvictByTokenCount() {
// Simplified estimator for illustration; the real TokenCountEstimator interface may require implementing several methods, not just a lambda
TokenCountEstimator tokenizer = text -> text.split("\\s+").length; // Simple word count
ChatMemory memory = TokenWindowChatMemory.withMaxTokens(10, tokenizer);
memory.add(new UserMessage("one two three")); // 3 tokens
memory.add(new UserMessage("four five six seven")); // 4 tokens
memory.add(new UserMessage("eight nine ten eleven twelve")); // 5 tokens
// Total would be 12, exceeds 10, so oldest message evicted
assertThat(memory.messages()).hasSize(2);
}

@Test
void shouldPersistAndRestore() {
ChatMemoryStore store = new InMemoryChatMemoryStore(); // Or real DB store
// Create memory, add messages, let it persist
ChatMemory memory1 = MessageWindowChatMemory.builder()
.id("user1")
.maxMessages(10)
.chatMemoryStore(store)
.build();
memory1.add(new UserMessage("First message"));
memory1.add(new UserMessage("Second message"));
// Create new memory instance with same ID
ChatMemory memory2 = MessageWindowChatMemory.builder()
.id("user1")
.maxMessages(10)
.chatMemoryStore(store)
.build();
// Should restore from store
assertThat(memory2.messages()).hasSize(2);
assertThat(memory2.messages().get(0).text()).isEqualTo("First message");
}

@Test
void shouldEvictInactiveUsers() throws InterruptedException {
// Use WeakHashMap or LRU cache for provider
Map<Object, ChatMemory> memoryMap = Collections.synchronizedMap(
new WeakHashMap<>()
);
ChatMemoryProvider provider = memoryId ->
memoryMap.computeIfAbsent(memoryId,
id -> MessageWindowChatMemory.withMaxMessages(10)
);
// Create memory for user
ChatMemory memory = provider.get("user1");
memory.add(new UserMessage("Test"));
// Remove strong reference
memory = null;
// Force GC
System.gc();
Thread.sleep(100);
// Map should eventually be empty (weak references collected)
// Note: This is non-deterministic, use for demonstration only
}

@Test
void shouldUseMemoryInAiService() {
ChatModel mockModel = mock(ChatModel.class);
when(mockModel.generate(any())).thenReturn(new AiMessage("Mocked response"));
Assistant assistant = AiServices.builder(Assistant.class)
.chatModel(mockModel)
.chatMemory(MessageWindowChatMemory.withMaxMessages(10))
.build();
assistant.chat("First message");
assistant.chat("Second message");
// Verify model received both messages in context
ArgumentCaptor<List<ChatMessage>> captor = ArgumentCaptor.forClass(List.class);
verify(mockModel, times(2)).generate(captor.capture());
List<ChatMessage> secondCall = captor.getAllValues().get(1);
assertThat(secondCall).hasSize(3); // 2 user + 1 AI response
}

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class ResilientChatMemoryStore implements ChatMemoryStore {
private static final Logger log = LoggerFactory.getLogger(ResilientChatMemoryStore.class);
private final ChatMemoryStore delegate;
private final ChatMemoryStore fallback; // E.g., in-memory store
@Override
public List<ChatMessage> getMessages(Object memoryId) {
try {
return delegate.getMessages(memoryId);
} catch (Exception e) {
log.error("Failed to get messages from primary store, using fallback", e);
return fallback.getMessages(memoryId);
}
}
@Override
public void updateMessages(Object memoryId, List<ChatMessage> messages) {
try {
delegate.updateMessages(memoryId, messages);
// Sync to fallback after successful primary write
fallback.updateMessages(memoryId, messages);
} catch (Exception e) {
log.error("Failed to update messages in primary store", e);
// Continue with in-memory fallback only
fallback.updateMessages(memoryId, messages);
}
}
@Override
public void deleteMessages(Object memoryId) {
try {
delegate.deleteMessages(memoryId);
} catch (Exception e) {
log.error("Failed to delete messages from primary store", e);
}
fallback.deleteMessages(memoryId);
}
}

class RetryingChatMemoryStore implements ChatMemoryStore {
private final ChatMemoryStore delegate;
private final int maxRetries = 3;
private final long retryDelayMs = 100;
@Override
public List<ChatMessage> getMessages(Object memoryId) {
return executeWithRetry(() -> delegate.getMessages(memoryId));
}
@Override
public void updateMessages(Object memoryId, List<ChatMessage> messages) {
executeWithRetry(() -> {
delegate.updateMessages(memoryId, messages);
return null;
});
}
private <T> T executeWithRetry(Supplier<T> operation) {
Exception lastException = null;
for (int attempt = 1; attempt <= maxRetries; attempt++) {
try {
return operation.get();
} catch (Exception e) {
lastException = e;
if (attempt < maxRetries) {
try {
Thread.sleep(retryDelayMs * attempt);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted during retry", ie);
}
}
}
}
throw new RuntimeException("Operation failed after " + maxRetries + " attempts", lastException);
}
}

class GracefullyDegradingAssistant {
private final Assistant assistant;
private final ChatMemory emergencyMemory = MessageWindowChatMemory.withMaxMessages(5);
public String chat(String userId, String message) {
try {
return assistant.chat(userId, message);
} catch (Exception e) {
log.error("AI service failed, using degraded mode", e);
// Use emergency memory to maintain minimal context
emergencyMemory.add(new UserMessage(message));
// Return fallback response
return "I'm experiencing technical difficulties. Your message has been recorded.";
}
}
}

class CircuitBreakerChatMemoryStore implements ChatMemoryStore {
private final ChatMemoryStore delegate;
private final AtomicInteger failureCount = new AtomicInteger(0);
private final int failureThreshold = 5;
private volatile boolean circuitOpen = false;
private volatile long circuitOpenedAt = 0;
private final long resetTimeoutMs = 60_000; // 1 minute
@Override
public List<ChatMessage> getMessages(Object memoryId) {
if (isCircuitOpen()) {
throw new RuntimeException("Circuit breaker open - store unavailable");
}
try {
List<ChatMessage> result = delegate.getMessages(memoryId);
onSuccess();
return result;
} catch (Exception e) {
onFailure();
throw e;
}
}
private boolean isCircuitOpen() {
if (!circuitOpen) return false;
if (System.currentTimeMillis() - circuitOpenedAt > resetTimeoutMs) {
circuitOpen = false;
failureCount.set(0);
return false;
}
return true;
}
private void onSuccess() {
failureCount.set(0);
circuitOpen = false;
}
private void onFailure() {
if (failureCount.incrementAndGet() >= failureThreshold) {
circuitOpen = true;
circuitOpenedAt = System.currentTimeMillis();
}
}
}

class SafeTokenWindowChatMemory implements ChatMemory {
private final TokenWindowChatMemory delegate;
private final int fallbackMaxMessages;
@Override
public void add(ChatMessage message) {
try {
delegate.add(message);
} catch (Exception e) {
log.error("Token estimation failed, using fallback message count", e);
// Fall back to simple message count eviction
// NOTE: delegate.messages() may return a defensive copy, in which case removing from it does not affect the delegate; shown for illustration only
List<ChatMessage> messages = delegate.messages();
if (messages.size() >= fallbackMaxMessages) {
messages.remove(0); // Evict oldest
}
// Re-add message (this may fail again, handle appropriately)
}
}
}

class SelfHealingChatMemoryStore implements ChatMemoryStore {
private final ChatMemoryStore delegate;
@Override
public List<ChatMessage> getMessages(Object memoryId) {
try {
List<ChatMessage> messages = delegate.getMessages(memoryId);
// Validate messages
if (messages == null) {
log.warn("Null messages returned for {}, initializing empty", memoryId);
return new ArrayList<>();
}
// Remove corrupted messages
List<ChatMessage> valid = messages.stream()
.filter(msg -> msg != null && msg.text() != null)
.collect(Collectors.toList());
if (valid.size() < messages.size()) {
log.warn("Removed {} corrupted messages for {}",
messages.size() - valid.size(), memoryId);
// Persist cleaned version
delegate.updateMessages(memoryId, valid);
}
return valid;
} catch (Exception e) {
log.error("Failed to get messages, returning empty list", e);
return new ArrayList<>();
}
}
}

class TimeBasedMemoryProvider implements ChatMemoryProvider {
private final Map<Object, MemoryWithTimestamp> memoryMap = new ConcurrentHashMap<>();
private final long maxIdleMs = 3600_000; // 1 hour
private final ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor();
public TimeBasedMemoryProvider() {
// Run cleanup every 10 minutes
cleaner.scheduleAtFixedRate(this::cleanup, 10, 10, TimeUnit.MINUTES);
}
@Override
public ChatMemory get(Object memoryId) {
MemoryWithTimestamp entry = memoryMap.computeIfAbsent(memoryId, id ->
new MemoryWithTimestamp(MessageWindowChatMemory.withMaxMessages(10))
);
entry.updateAccessTime();
return entry.memory;
}
private void cleanup() {
long now = System.currentTimeMillis();
memoryMap.entrySet().removeIf(entry ->
now - entry.getValue().lastAccessTime > maxIdleMs
);
}
public void shutdown() {
cleaner.shutdown();
}
private static class MemoryWithTimestamp {
final ChatMemory memory;
volatile long lastAccessTime;
MemoryWithTimestamp(ChatMemory memory) {
this.memory = memory;
this.lastAccessTime = System.currentTimeMillis();
}
void updateAccessTime() {
this.lastAccessTime = System.currentTimeMillis();
}
}
}

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
class LruChatMemoryProvider implements ChatMemoryProvider {
private final Cache<Object, ChatMemory> cache;
public LruChatMemoryProvider(int maxSize) {
this.cache = CacheBuilder.newBuilder()
.maximumSize(maxSize)
.expireAfterAccess(Duration.ofHours(1))
.removalListener(notification -> {
// Optional: persist memory before eviction
ChatMemory memory = (ChatMemory) notification.getValue();
if (memory != null) {
log.info("Evicting memory for {}", notification.getKey());
}
})
.build();
}
@Override
public ChatMemory get(Object memoryId) {
try {
return cache.get(memoryId, () ->
MessageWindowChatMemory.withMaxMessages(10)
);
} catch (ExecutionException e) {
throw new RuntimeException("Failed to create memory", e);
}
}
public void invalidate(Object memoryId) {
cache.invalidate(memoryId);
}
public void invalidateAll() {
cache.invalidateAll();
}
}

class WeakReferenceChatMemoryProvider implements ChatMemoryProvider {
private final Map<Object, WeakReference<ChatMemory>> memoryMap =
Collections.synchronizedMap(new WeakHashMap<>());
@Override
public ChatMemory get(Object memoryId) {
WeakReference<ChatMemory> ref = memoryMap.get(memoryId);
ChatMemory memory = ref != null ? ref.get() : null;
if (memory == null) {
memory = MessageWindowChatMemory.withMaxMessages(10);
memoryMap.put(memoryId, new WeakReference<>(memory));
}
return memory;
}
// Memory is automatically collected when no longer referenced
}

class PersistentMemoryManager {
private final ChatMemoryStore store;
private final Map<Object, ChatMemory> activeMemories = new ConcurrentHashMap<>();
private final ScheduledExecutorService persistScheduler =
Executors.newSingleThreadScheduledExecutor();
public PersistentMemoryManager(ChatMemoryStore store) {
this.store = store;
// Persist all active memories every 5 minutes
persistScheduler.scheduleAtFixedRate(
this::persistAll, 5, 5, TimeUnit.MINUTES
);
}
public ChatMemory getMemory(Object memoryId) {
return activeMemories.computeIfAbsent(memoryId, id ->
MessageWindowChatMemory.builder()
.id(id)
.maxMessages(20)
.chatMemoryStore(store)
.build()
);
}
private void persistAll() {
activeMemories.forEach((id, memory) -> {
try {
store.updateMessages(id, memory.messages());
} catch (Exception e) {
log.error("Failed to persist memory for {}", id, e);
}
});
}
public void closeMemory(Object memoryId) {
ChatMemory memory = activeMemories.remove(memoryId);
if (memory != null) {
store.updateMessages(memoryId, memory.messages());
}
}
public void shutdown() {
persistAll();
persistScheduler.shutdown();
}
}

class MonitoredChatMemoryProvider implements ChatMemoryProvider {
private final Map<Object, ChatMemory> memoryMap = new ConcurrentHashMap<>();
private final AtomicInteger activeUsers = new AtomicInteger(0);
private final AtomicLong totalMessages = new AtomicLong(0);
@Override
public ChatMemory get(Object memoryId) {
return memoryMap.computeIfAbsent(memoryId, id -> {
activeUsers.incrementAndGet();
ChatMemory memory = MessageWindowChatMemory.withMaxMessages(10);
// Wrap to track message additions
return new ChatMemory() {
@Override
public void add(ChatMessage message) {
memory.add(message);
totalMessages.incrementAndGet();
}
// Delegate other methods...
@Override
public List<ChatMessage> messages() { return memory.messages(); }
@Override
public void clear() { memory.clear(); }
@Override
public Object id() { return memory.id(); }
@Override
public void set(List<ChatMessage> messages) { memory.set(messages); }
@Override
public void add(Iterable<ChatMessage> messages) {
memory.add(messages);
messages.forEach(m -> totalMessages.incrementAndGet());
}
};
});
}
public int getActiveUserCount() {
return activeUsers.get();
}
public long getTotalMessageCount() {
return totalMessages.get();
}
public long getCurrentMemorySize() {
return memoryMap.values().stream()
.mapToInt(m -> m.messages().size())
.sum();
}
}

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import com.fasterxml.jackson.databind.ObjectMapper;
class RedisChatMemoryStore implements ChatMemoryStore {
private final JedisPool jedisPool;
private final ObjectMapper objectMapper = new ObjectMapper();
private final String keyPrefix = "chat:memory:";
public RedisChatMemoryStore(JedisPool jedisPool) {
this.jedisPool = jedisPool;
}
@Override
public List<ChatMessage> getMessages(Object memoryId) {
try (Jedis jedis = jedisPool.getResource()) {
String key = keyPrefix + memoryId;
String json = jedis.get(key);
if (json == null) {
return new ArrayList<>();
}
return objectMapper.readValue(json,
objectMapper.getTypeFactory().constructCollectionType(List.class, ChatMessage.class)
);
} catch (Exception e) {
throw new RuntimeException("Failed to get messages from Redis", e);
}
}
@Override
public void updateMessages(Object memoryId, List<ChatMessage> messages) {
try (Jedis jedis = jedisPool.getResource()) {
String key = keyPrefix + memoryId;
String json = objectMapper.writeValueAsString(messages);
// Set with TTL to auto-cleanup inactive conversations
jedis.setex(key, 86400, json); // 24 hour TTL
} catch (Exception e) {
throw new RuntimeException("Failed to update messages in Redis", e);
}
}
@Override
public void deleteMessages(Object memoryId) {
try (Jedis jedis = jedisPool.getResource()) {
String key = keyPrefix + memoryId;
jedis.del(key);
}
}
}
JDBC-backed store using a MySQL upsert (adapt the ON DUPLICATE KEY clause for other databases):
import java.sql.*;
import javax.sql.DataSource;
import dev.langchain4j.data.message.ChatMessageDeserializer;
import dev.langchain4j.data.message.ChatMessageSerializer;
class DatabaseChatMemoryStore implements ChatMemoryStore {
private final DataSource dataSource;
public DatabaseChatMemoryStore(DataSource dataSource) {
this.dataSource = dataSource;
}
@Override
public List<ChatMessage> getMessages(Object memoryId) {
String sql = "SELECT messages FROM chat_memory WHERE memory_id = ?";
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setString(1, memoryId.toString());
try (ResultSet rs = stmt.executeQuery()) {
if (rs.next()) {
return ChatMessageDeserializer.messagesFromJson(rs.getString("messages"));
}
}
return new ArrayList<>();
} catch (Exception e) {
throw new RuntimeException("Failed to get messages from database", e);
}
}
@Override
public void updateMessages(Object memoryId, List<ChatMessage> messages) {
String sql = "INSERT INTO chat_memory (memory_id, messages, updated_at) " +
"VALUES (?, ?, NOW()) " +
"ON DUPLICATE KEY UPDATE messages = ?, updated_at = NOW()";
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(sql)) {
String json = ChatMessageSerializer.messagesToJson(messages);
stmt.setString(1, memoryId.toString());
stmt.setString(2, json);
stmt.setString(3, json);
stmt.executeUpdate();
} catch (Exception e) {
throw new RuntimeException("Failed to update messages in database", e);
}
}
@Override
public void deleteMessages(Object memoryId) {
String sql = "DELETE FROM chat_memory WHERE memory_id = ?";
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setString(1, memoryId.toString());
stmt.executeUpdate();
} catch (Exception e) {
throw new RuntimeException("Failed to delete messages from database", e);
}
}
}
Two-tier store: a local Caffeine cache in front of a distributed backend:
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.time.Duration;
class HybridChatMemoryStore implements ChatMemoryStore {
private final ChatMemoryStore distributedStore; // Redis, DB, etc.
private final Cache<Object, List<ChatMessage>> localCache;
public HybridChatMemoryStore(ChatMemoryStore distributedStore) {
this.distributedStore = distributedStore;
this.localCache = Caffeine.newBuilder()
.maximumSize(1000)
.expireAfterWrite(Duration.ofMinutes(5))
.build();
}
@Override
public List<ChatMessage> getMessages(Object memoryId) {
// Try local cache first
List<ChatMessage> cached = localCache.getIfPresent(memoryId);
if (cached != null) {
return new ArrayList<>(cached); // Return copy
}
// Fall back to distributed store
List<ChatMessage> messages = distributedStore.getMessages(memoryId);
localCache.put(memoryId, new ArrayList<>(messages));
return messages;
}
@Override
public void updateMessages(Object memoryId, List<ChatMessage> messages) {
// Write through: update the local cache, then the distributed store
localCache.put(memoryId, new ArrayList<>(messages));
distributedStore.updateMessages(memoryId, messages);
}
@Override
public void deleteMessages(Object memoryId) {
localCache.invalidate(memoryId);
distributedStore.deleteMessages(memoryId);
}
}
Broadcasting updates across application instances via Redis pub/sub, so local caches stay consistent:
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
class EventBroadcastingChatMemoryStore implements ChatMemoryStore {
private final ChatMemoryStore delegate;
private final RedisTemplate<String, MemoryUpdateEvent> redisTemplate;
private final ChannelTopic topic = new ChannelTopic("memory:updates");
private final Cache<Object, List<ChatMessage>> localCache;
// getMessages() and deleteMessages() delegate to the backing store (omitted for brevity)
@Override
public void updateMessages(Object memoryId, List<ChatMessage> messages) {
// Update backing store
delegate.updateMessages(memoryId, messages);
// Update local cache
localCache.put(memoryId, new ArrayList<>(messages));
// Broadcast update to other instances
redisTemplate.convertAndSend(topic.getTopic(),
new MemoryUpdateEvent(memoryId, messages)
);
}
// Subscribe to updates from other instances
public void onMemoryUpdate(MemoryUpdateEvent event) {
// Invalidate local cache when other instance updates
localCache.invalidate(event.memoryId());
}
record MemoryUpdateEvent(Object memoryId, List<ChatMessage> messages) {}
}
Sharding memory across multiple stores by hashing the memory ID:
class ShardedChatMemoryStore implements ChatMemoryStore {
private final List<ChatMemoryStore> shards;
public ShardedChatMemoryStore(List<ChatMemoryStore> shards) {
this.shards = shards;
}
private ChatMemoryStore getShard(Object memoryId) {
int shardIndex = Math.abs(memoryId.hashCode() % shards.size());
return shards.get(shardIndex);
}
@Override
public List<ChatMessage> getMessages(Object memoryId) {
return getShard(memoryId).getMessages(memoryId);
}
@Override
public void updateMessages(Object memoryId, List<ChatMessage> messages) {
getShard(memoryId).updateMessages(memoryId, messages);
}
@Override
public void deleteMessages(Object memoryId) {
getShard(memoryId).deleteMessages(memoryId);
}
}
Consistent hashing with virtual nodes, so adding or removing a store remaps only a fraction of keys:
import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.TreeMap;
class ConsistentHashingMemoryStore implements ChatMemoryStore {
private final TreeMap<Long, ChatMemoryStore> ring = new TreeMap<>();
private final int virtualNodesPerStore = 150;
public ConsistentHashingMemoryStore(List<ChatMemoryStore> stores) {
for (int i = 0; i < stores.size(); i++) {
ChatMemoryStore store = stores.get(i);
for (int v = 0; v < virtualNodesPerStore; v++) {
long hash = hash("store-" + i + "-vnode-" + v);
ring.put(hash, store);
}
}
}
private ChatMemoryStore getStore(Object memoryId) {
long hash = hash(memoryId.toString());
Map.Entry<Long, ChatMemoryStore> entry = ring.ceilingEntry(hash);
if (entry == null) {
entry = ring.firstEntry();
}
return entry.getValue();
}
private long hash(String key) {
return Hashing.murmur3_128().hashString(key, StandardCharsets.UTF_8).asLong();
}
@Override
public List<ChatMessage> getMessages(Object memoryId) {
return getStore(memoryId).getMessages(memoryId);
}
@Override
public void updateMessages(Object memoryId, List<ChatMessage> messages) {
getStore(memoryId).updateMessages(memoryId, messages);
}
@Override
public void deleteMessages(Object memoryId) {
getStore(memoryId).deleteMessages(memoryId);
}
}
Primary/replica store: synchronous writes to the primary, asynchronous replication to secondaries:
class ReplicatedChatMemoryStore implements ChatMemoryStore {
private final ChatMemoryStore primaryStore;
private final List<ChatMemoryStore> replicaStores;
private final ExecutorService replicationExecutor = Executors.newFixedThreadPool(4);
@Override
public List<ChatMessage> getMessages(Object memoryId) {
// Read from primary
return primaryStore.getMessages(memoryId);
}
@Override
public void updateMessages(Object memoryId, List<ChatMessage> messages) {
// Write to primary synchronously
primaryStore.updateMessages(memoryId, messages);
// Replicate to secondaries asynchronously
replicaStores.forEach(replica ->
replicationExecutor.submit(() -> {
try {
replica.updateMessages(memoryId, messages);
} catch (Exception e) {
log.error("Failed to replicate to secondary store", e);
}
})
);
}
@Override
public void deleteMessages(Object memoryId) {
primaryStore.deleteMessages(memoryId);
replicaStores.forEach(replica ->
replicationExecutor.submit(() -> {
try {
replica.deleteMessages(memoryId);
} catch (Exception e) {
log.error("Failed to delete from secondary store", e);
}
})
);
}
public void shutdown() {
replicationExecutor.shutdown();
}
}
Install with Tessl CLI
npx tessl i tessl/maven-dev-langchain4j--langchain4j
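The ring lookup used by ConsistentHashingMemoryStore above can be exercised in isolation. The following standalone sketch uses only the JDK: the `Ring` and `RingDemo` names are illustrative, plain strings stand in for ChatMemoryStore instances, and an SHA-256-derived hash stands in for Guava's murmur3_128. It demonstrates how `ceilingEntry` selects the owning node and how adding a node remaps only a fraction of keys.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Minimal consistent-hash ring; nodes are strings standing in for stores
class Ring {
    private final TreeMap<Long, String> ring = new TreeMap<>();
    private static final int VIRTUAL_NODES = 150;

    Ring(List<String> nodes) {
        for (String node : nodes) {
            for (int v = 0; v < VIRTUAL_NODES; v++) {
                ring.put(hash(node + "-vnode-" + v), node);
            }
        }
    }

    String nodeFor(String key) {
        // First virtual node clockwise from the key's hash; wrap to the start
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(key));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    // SHA-256-derived hash (stand-in for murmur3_128 in the example above)
    private static long hash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("SHA-256")
                    .digest(s.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) h = (h << 8) | (d[i] & 0xFF);
            return h;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}

public class RingDemo {
    public static void main(String[] args) {
        Ring three = new Ring(List.of("store-0", "store-1", "store-2"));
        Ring four = new Ring(List.of("store-0", "store-1", "store-2", "store-3"));
        int moved = 0, total = 10_000;
        for (int i = 0; i < total; i++) {
            String key = "memory-" + i;
            if (!three.nodeFor(key).equals(four.nodeFor(key))) moved++;
        }
        // With consistent hashing, adding a fourth node remaps roughly a
        // quarter of the keys, versus a near-total reshuffle with modulo sharding
        System.out.println("remapped: " + moved + " of " + total);
    }
}
```

Modulo-based sharding (as in ShardedChatMemoryStore) is simpler but remaps almost every key when the shard count changes; the ring trades a little lookup complexity for stable assignment.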