CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-atmosphere--atmosphere-runtime

The Atmosphere Framework runtime providing comprehensive support for building real-time, event-driven web applications with transparent transport protocol support including WebSockets, Server Sent Events, Long-Polling, HTTP Streaming, and JSONP.

Pending
Overview
Eval results
Files

caching.mddocs/

Caching System

Message caching and replay capabilities for connection recovery, with multiple cache implementations and inspection utilities. The caching system enables message persistence and delivery guarantees.

Capabilities

BroadcasterCache Interface

Core interface for caching broadcast messages with support for storage, retrieval, and cleanup operations.

/**
 * Cache broadcast messages for replay and recovery
 */
public interface BroadcasterCache {
    /**
     * Add message to cache for specific broadcaster and resource
     * @param broadcasterId broadcaster identifier
     * @param uuid resource UUID
     * @param message BroadcastMessage to cache
     */
    public void addToCache(String broadcasterId, String uuid, BroadcastMessage message);
    
    /**
     * Retrieve cached messages for resource
     * @param broadcasterId broadcaster identifier  
     * @param uuid resource UUID
     * @return List of cached messages
     */
    public List<Object> retrieveFromCache(String broadcasterId, String uuid);
    
    /**
     * Clear cache for specific resource
     * @param broadcasterId broadcaster identifier
     * @param uuid resource UUID
     */
    public void clearCache(String broadcasterId, String uuid);
    
    /**
     * Clear all cached messages for broadcaster
     * @param broadcasterId broadcaster identifier
     */
    public void clearCache(String broadcasterId);
    
    /**
     * Set cache inspector for message examination
     * @param inspector BroadcasterCacheInspector instance
     * @return this cache for chaining
     */
    public BroadcasterCache inspector(BroadcasterCacheInspector inspector);
    
    /**
     * Start cache background tasks and initialization
     */
    public void start();
    
    /**
     * Stop cache and cleanup resources
     */
    public void stop();
}

Usage Examples:

// Configure broadcaster with cache
Broadcaster broadcaster = BroadcasterFactory.getDefault().lookup("/chat", true);
BroadcasterCache cache = new UUIDBroadcasterCache();
broadcaster.getBroadcasterConfig().setBroadcasterCache(cache);

// Messages will automatically be cached for replay
broadcaster.broadcast("Welcome message");

// Retrieve cached messages for specific resource
List<Object> cachedMessages = cache.retrieveFromCache("/chat", resourceUuid);

Cache Implementations

Pre-built cache implementations for different storage and retrieval strategies.

/**
 * Default in-memory cache implementation
 */
public class DefaultBroadcasterCache extends AbstractBroadcasterCache {
    /**
     * Create default cache with unlimited size
     */
    public DefaultBroadcasterCache();
    
    /**
     * Set maximum cache size per broadcaster
     * @param maxCacheSize maximum number of messages to cache
     * @return this cache
     */
    public DefaultBroadcasterCache setMaxCacheSize(int maxCacheSize);
    
    /**
     * Set cache expiration time
     * @param expireTime expiration time value
     * @param unit TimeUnit for expiration
     * @return this cache
     */
    public DefaultBroadcasterCache setExpireTime(long expireTime, TimeUnit unit);
}

/**
 * UUID-based cache for message tracking
 */
public class UUIDBroadcasterCache extends AbstractBroadcasterCache {
    /**
     * Create UUID cache with default settings
     */
    public UUIDBroadcasterCache();
    
    /**
     * Set whether to cache per resource UUID
     * @param cachePerUUID true to cache per UUID
     * @return this cache
     */
    public UUIDBroadcasterCache setCachePerUUID(boolean cachePerUUID);
}

/**
 * Session-aware cache implementation
 */
public class SessionBroadcasterCache extends AbstractBroadcasterCache {
    /**
     * Create session-based cache
     */
    public SessionBroadcasterCache();
    
    /**
     * Set session timeout for cache expiration
     * @param sessionTimeout timeout in milliseconds
     * @return this cache
     */
    public SessionBroadcasterCache setSessionTimeout(long sessionTimeout);
}

/**
 * Base implementation for BroadcasterCache
 */
public abstract class AbstractBroadcasterCache implements BroadcasterCache {
    /**
     * Add cache listener for monitoring cache events
     * @param listener BroadcasterCacheListener instance
     * @return this cache
     */
    public AbstractBroadcasterCache addBroadcasterCacheListener(BroadcasterCacheListener listener);
    
    /**
     * Remove cache listener
     * @param listener BroadcasterCacheListener to remove
     * @return this cache
     */
    public AbstractBroadcasterCache removeBroadcasterCacheListener(BroadcasterCacheListener listener);
    
    /**
     * Set cleanup frequency for expired entries
     * @param cleanupFrequency cleanup interval
     * @param unit TimeUnit for interval
     * @return this cache
     */
    public AbstractBroadcasterCache setCleanupFrequency(long cleanupFrequency, TimeUnit unit);
}

Usage Examples:

// Default in-memory cache with size limit
DefaultBroadcasterCache defaultCache = new DefaultBroadcasterCache()
    .setMaxCacheSize(1000)
    .setExpireTime(1, TimeUnit.HOURS);

// UUID-based cache for per-client message tracking
UUIDBroadcasterCache uuidCache = new UUIDBroadcasterCache()
    .setCachePerUUID(true);

// Session-aware cache
SessionBroadcasterCache sessionCache = new SessionBroadcasterCache()
    .setSessionTimeout(30 * 60 * 1000); // 30 minutes

// Configure broadcaster with cache
broadcaster.getBroadcasterConfig().setBroadcasterCache(defaultCache);

Cache Data Structures

Data classes for representing cached messages and cache entries.

/**
 * Wrapper for cached broadcast messages
 */
public class CacheMessage {
    /**
     * Get the cached message content
     * @return message object
     */
    public Object getMessage();
    
    /**
     * Get message creation timestamp
     * @return timestamp in milliseconds
     */
    public long getCreateTime();
    
    /**
     * Get message expiration time
     * @return expiration timestamp or -1 if no expiration
     */
    public long getExpireTime();
    
    /**
     * Check if message has expired
     * @return true if expired
     */
    public boolean isExpired();
    
    /**
     * Get broadcaster ID that created this message
     * @return broadcaster identifier
     */
    public String getBroadcasterId();
}

/**
 * Message data structure for broadcasting
 */
public class BroadcastMessage {
    /**
     * Get message content
     * @return message object
     */
    public Object getMessage();
    
    /**
     * Get target AtmosphereResource UUID
     * @return resource UUID or null for broadcast to all
     */
    public String getUuid();
    
    /**
     * Check if message is for specific resource
     * @return true if targeted to specific resource
     */
    public boolean isTargeted();
    
    /**
     * Create broadcast message
     * @param message content to broadcast
     * @return BroadcastMessage instance
     */
    public static BroadcastMessage createBroadcastMessage(Object message);
    
    /**
     * Create targeted broadcast message
     * @param message content to broadcast
     * @param uuid target resource UUID
     * @return BroadcastMessage instance
     */
    public static BroadcastMessage createBroadcastMessage(Object message, String uuid);
}

Cache Inspection and Monitoring

Interfaces for inspecting cached messages and monitoring cache operations.

/**
 * Inspect and modify cached messages
 */
public interface BroadcasterCacheInspector {
    /**
     * Inspect cache entry and determine action
     * @param entry cache entry to inspect
     * @return CacheInspector action (CONTINUE, SKIP, REMOVE)
     */
    public CacheInspector inspect(CacheMessage entry);
    
    /**
     * Configure inspector with AtmosphereConfig
     * @param config AtmosphereConfig instance
     */
    public void configure(AtmosphereConfig config);
}

/**
 * Cache inspection result
 */
public enum CacheInspector {
    CONTINUE,  // Continue processing entry
    SKIP,      // Skip this entry  
    REMOVE     // Remove entry from cache
}

/**
 * Listen to cache events for monitoring
 */
public interface BroadcasterCacheListener {
    /**
     * Called when message is added to cache
     * @param broadcasterId broadcaster identifier
     * @param uuid resource UUID
     * @param message cached message
     */
    public void onAddCache(String broadcasterId, String uuid, Object message);
    
    /**
     * Called when message is removed from cache
     * @param broadcasterId broadcaster identifier
     * @param uuid resource UUID
     * @param message removed message
     */
    public void onRemoveCache(String broadcasterId, String uuid, Object message);
    
    /**
     * Called when cache is cleared
     * @param broadcasterId broadcaster identifier
     */
    public void onClearCache(String broadcasterId);
}

Usage Examples:

// Custom cache inspector for message filtering
public class MessageFilterInspector implements BroadcasterCacheInspector {
    @Override
    public CacheInspector inspect(CacheMessage entry) {
        // Remove expired messages
        if (entry.isExpired()) {
            return CacheInspector.REMOVE;
        }
        
        // Skip messages older than 1 hour
        long oneHourAgo = System.currentTimeMillis() - (60 * 60 * 1000);
        if (entry.getCreateTime() < oneHourAgo) {
            return CacheInspector.SKIP;
        }
        
        return CacheInspector.CONTINUE;
    }
}

// Cache listener for monitoring
public class CacheMonitorListener implements BroadcasterCacheListener {
    @Override
    public void onAddCache(String broadcasterId, String uuid, Object message) {
        System.out.println("Cached message for " + broadcasterId + ":" + uuid);
    }
    
    @Override
    public void onRemoveCache(String broadcasterId, String uuid, Object message) {
        System.out.println("Removed cached message for " + broadcasterId + ":" + uuid);
    }
}

// Configure cache with inspector and listener
DefaultBroadcasterCache cache = new DefaultBroadcasterCache()
    .inspector(new MessageFilterInspector())
    .addBroadcasterCacheListener(new CacheMonitorListener());

Cache Configuration

Configuration utilities and service annotations for cache setup.

/**
 * Register BroadcasterCache implementations
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface BroadcasterCacheService {
}

/**
 * Register BroadcasterCacheInspector implementations
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface BroadcasterCacheInspectorService {
}

/**
 * Register BroadcasterCacheListener implementations
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface BroadcasterCacheListenerService {
}

Usage Examples:

@BroadcasterCacheService
public class RedisBroadcasterCache implements BroadcasterCache {
    private RedisTemplate<String, Object> redisTemplate;
    
    @Override
    public void addToCache(String broadcasterId, String uuid, BroadcastMessage message) {
        String key = broadcasterId + ":" + uuid;
        redisTemplate.opsForList().rightPush(key, message);
        
        // Set expiration
        redisTemplate.expire(key, 1, TimeUnit.HOURS);
    }
    
    @Override
    public List<Object> retrieveFromCache(String broadcasterId, String uuid) {
        String key = broadcasterId + ":" + uuid;
        return redisTemplate.opsForList().range(key, 0, -1);
    }
    
    @Override
    public void clearCache(String broadcasterId, String uuid) {
        String key = broadcasterId + ":" + uuid;
        redisTemplate.delete(key);
    }
}

@BroadcasterCacheInspectorService
public class ExpiryInspector implements BroadcasterCacheInspector {
    private long maxAge = TimeUnit.HOURS.toMillis(2); // 2 hours
    
    @Override
    public CacheInspector inspect(CacheMessage entry) {
        long age = System.currentTimeMillis() - entry.getCreateTime();
        return age > maxAge ? CacheInspector.REMOVE : CacheInspector.CONTINUE;
    }
}

Advanced Cache Usage

Complete example showing cache setup, configuration, and usage in a real-time application.

@ManagedService(path = "/api/notifications/{userId}")
public class NotificationService {
    private BroadcasterCache cache;
    
    @Ready
    public void onReady(@PathParam("userId") String userId, 
                       AtmosphereResource resource) {
        // Configure cache for this broadcaster
        if (cache == null) {
            cache = new UUIDBroadcasterCache()
                .setCachePerUUID(true)
                .setCleanupFrequency(5, TimeUnit.MINUTES)
                .inspector(new NotificationCacheInspector())
                .addBroadcasterCacheListener(new NotificationCacheListener());
            
            resource.getBroadcaster().getBroadcasterConfig()
                   .setBroadcasterCache(cache);
        }
        
        // Replay cached messages for reconnecting user
        String resourceUuid = resource.uuid();
        List<Object> cachedMessages = cache.retrieveFromCache(
            resource.getBroadcaster().getID(), resourceUuid);
        
        for (Object message : cachedMessages) {
            resource.write(message);
        }
        
        resource.suspend();
    }
    
    @Message
    public void onMessage(@PathParam("userId") String userId, 
                         String message) {
        // Broadcast notification (will be automatically cached)
        NotificationMessage notification = new NotificationMessage(
            userId, message, System.currentTimeMillis());
        
        // Send to all users subscribed to this user's notifications
        BroadcasterFactory.getDefault()
            .lookup("/api/notifications/" + userId, true)
            .broadcast(notification);
    }
    
    // Custom cache inspector for notifications
    private static class NotificationCacheInspector implements BroadcasterCacheInspector {
        @Override
        public CacheInspector inspect(CacheMessage entry) {
            // Keep only last 100 notifications per user
            // Remove messages older than 24 hours
            long dayAgo = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
            
            if (entry.getCreateTime() < dayAgo) {
                return CacheInspector.REMOVE;
            }
            
            return CacheInspector.CONTINUE;
        }
    }
    
    // Cache listener for notification monitoring
    private static class NotificationCacheListener implements BroadcasterCacheListener {
        @Override
        public void onAddCache(String broadcasterId, String uuid, Object message) {
            System.out.println("Cached notification for user: " + extractUserId(broadcasterId));
        }
        
        @Override
        public void onRemoveCache(String broadcasterId, String uuid, Object message) {
            System.out.println("Removed cached notification for user: " + extractUserId(broadcasterId));
        }
        
        private String extractUserId(String broadcasterId) {
            // Extract user ID from broadcaster path like "/api/notifications/user123"
            return broadcasterId.substring(broadcasterId.lastIndexOf('/') + 1);
        }
    }
    
    // Notification message class
    private static class NotificationMessage {
        private final String userId;
        private final String message; 
        private final long timestamp;
        
        public NotificationMessage(String userId, String message, long timestamp) {
            this.userId = userId;
            this.message = message;
            this.timestamp = timestamp;
        }
        
        // Getters...
        public String getUserId() { return userId; }
        public String getMessage() { return message; }
        public long getTimestamp() { return timestamp; }
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-atmosphere--atmosphere-runtime

docs

annotations.md

broadcasting.md

caching.md

core-framework.md

index.md

interceptors.md

websocket.md

tile.json