0
# Caching System
1
2
The caching system stores broadcast messages so that they can be replayed to clients when a connection is recovered. It provides multiple cache implementations along with utilities for inspecting and monitoring cached messages, and it is the basis for message persistence and delivery guarantees.
3
4
## Capabilities
5
6
### BroadcasterCache Interface
7
8
Core interface for caching broadcast messages with support for storage, retrieval, and cleanup operations.
9
10
```java { .api }
11
/**
12
* Cache broadcast messages for replay and recovery
13
*/
14
public interface BroadcasterCache {
15
/**
16
* Add message to cache for specific broadcaster and resource
17
* @param broadcasterId broadcaster identifier
18
* @param uuid resource UUID
19
* @param message BroadcastMessage to cache
20
*/
21
public void addToCache(String broadcasterId, String uuid, BroadcastMessage message);
22
23
/**
24
* Retrieve cached messages for resource
25
* @param broadcasterId broadcaster identifier
26
* @param uuid resource UUID
27
* @return List of cached messages
28
*/
29
public List<Object> retrieveFromCache(String broadcasterId, String uuid);
30
31
/**
32
* Clear cache for specific resource
33
* @param broadcasterId broadcaster identifier
34
* @param uuid resource UUID
35
*/
36
public void clearCache(String broadcasterId, String uuid);
37
38
/**
39
* Clear all cached messages for broadcaster
40
* @param broadcasterId broadcaster identifier
41
*/
42
public void clearCache(String broadcasterId);
43
44
/**
45
* Set cache inspector for message examination
46
* @param inspector BroadcasterCacheInspector instance
47
* @return this cache for chaining
48
*/
49
public BroadcasterCache inspector(BroadcasterCacheInspector inspector);
50
51
/**
52
* Start cache background tasks and initialization
53
*/
54
public void start();
55
56
/**
57
* Stop cache and cleanup resources
58
*/
59
public void stop();
60
}
61
```
62
63
**Usage Examples:**
64
65
```java
66
// Configure broadcaster with cache
67
Broadcaster broadcaster = BroadcasterFactory.getDefault().lookup("/chat", true);
68
BroadcasterCache cache = new UUIDBroadcasterCache();
69
broadcaster.getBroadcasterConfig().setBroadcasterCache(cache);
70
71
// Messages will automatically be cached for replay
72
broadcaster.broadcast("Welcome message");
73
74
// Retrieve cached messages for specific resource
75
List<Object> cachedMessages = cache.retrieveFromCache("/chat", resourceUuid);
76
```
77
78
### Cache Implementations
79
80
Pre-built cache implementations for different storage and retrieval strategies.
81
82
```java { .api }
83
/**
84
* Default in-memory cache implementation
85
*/
86
public class DefaultBroadcasterCache extends AbstractBroadcasterCache {
87
/**
88
* Create default cache with unlimited size
89
*/
90
public DefaultBroadcasterCache();
91
92
/**
93
* Set maximum cache size per broadcaster
94
* @param maxCacheSize maximum number of messages to cache
95
* @return this cache
96
*/
97
public DefaultBroadcasterCache setMaxCacheSize(int maxCacheSize);
98
99
/**
100
* Set cache expiration time
101
* @param expireTime expiration time value
102
* @param unit TimeUnit for expiration
103
* @return this cache
104
*/
105
public DefaultBroadcasterCache setExpireTime(long expireTime, TimeUnit unit);
106
}
107
108
/**
109
* UUID-based cache for message tracking
110
*/
111
public class UUIDBroadcasterCache extends AbstractBroadcasterCache {
112
/**
113
* Create UUID cache with default settings
114
*/
115
public UUIDBroadcasterCache();
116
117
/**
118
* Set whether to cache per resource UUID
119
* @param cachePerUUID true to cache per UUID
120
* @return this cache
121
*/
122
public UUIDBroadcasterCache setCachePerUUID(boolean cachePerUUID);
123
}
124
125
/**
126
* Session-aware cache implementation
127
*/
128
public class SessionBroadcasterCache extends AbstractBroadcasterCache {
129
/**
130
* Create session-based cache
131
*/
132
public SessionBroadcasterCache();
133
134
/**
135
* Set session timeout for cache expiration
136
* @param sessionTimeout timeout in milliseconds
137
* @return this cache
138
*/
139
public SessionBroadcasterCache setSessionTimeout(long sessionTimeout);
140
}
141
142
/**
143
* Base implementation for BroadcasterCache
144
*/
145
public abstract class AbstractBroadcasterCache implements BroadcasterCache {
146
/**
147
* Add cache listener for monitoring cache events
148
* @param listener BroadcasterCacheListener instance
149
* @return this cache
150
*/
151
public AbstractBroadcasterCache addBroadcasterCacheListener(BroadcasterCacheListener listener);
152
153
/**
154
* Remove cache listener
155
* @param listener BroadcasterCacheListener to remove
156
* @return this cache
157
*/
158
public AbstractBroadcasterCache removeBroadcasterCacheListener(BroadcasterCacheListener listener);
159
160
/**
161
* Set cleanup frequency for expired entries
162
* @param cleanupFrequency cleanup interval
163
* @param unit TimeUnit for interval
164
* @return this cache
165
*/
166
public AbstractBroadcasterCache setCleanupFrequency(long cleanupFrequency, TimeUnit unit);
167
}
168
```
169
170
**Usage Examples:**
171
172
```java
173
// Default in-memory cache with size limit
174
DefaultBroadcasterCache defaultCache = new DefaultBroadcasterCache()
175
.setMaxCacheSize(1000)
176
.setExpireTime(1, TimeUnit.HOURS);
177
178
// UUID-based cache for per-client message tracking
179
UUIDBroadcasterCache uuidCache = new UUIDBroadcasterCache()
180
.setCachePerUUID(true);
181
182
// Session-aware cache
183
SessionBroadcasterCache sessionCache = new SessionBroadcasterCache()
184
.setSessionTimeout(30 * 60 * 1000); // 30 minutes
185
186
// Configure broadcaster with cache
187
broadcaster.getBroadcasterConfig().setBroadcasterCache(defaultCache);
188
```
189
190
### Cache Data Structures
191
192
Data classes for representing cached messages and cache entries.
193
194
```java { .api }
195
/**
196
* Wrapper for cached broadcast messages
197
*/
198
public class CacheMessage {
199
/**
200
* Get the cached message content
201
* @return message object
202
*/
203
public Object getMessage();
204
205
/**
206
* Get message creation timestamp
207
* @return timestamp in milliseconds
208
*/
209
public long getCreateTime();
210
211
/**
212
* Get message expiration time
213
* @return expiration timestamp or -1 if no expiration
214
*/
215
public long getExpireTime();
216
217
/**
218
* Check if message has expired
219
* @return true if expired
220
*/
221
public boolean isExpired();
222
223
/**
224
* Get broadcaster ID that created this message
225
* @return broadcaster identifier
226
*/
227
public String getBroadcasterId();
228
}
229
230
/**
231
* Message data structure for broadcasting
232
*/
233
public class BroadcastMessage {
234
/**
235
* Get message content
236
* @return message object
237
*/
238
public Object getMessage();
239
240
/**
241
* Get target AtmosphereResource UUID
242
* @return resource UUID or null for broadcast to all
243
*/
244
public String getUuid();
245
246
/**
247
* Check if message is for specific resource
248
* @return true if targeted to specific resource
249
*/
250
public boolean isTargeted();
251
252
/**
253
* Create broadcast message
254
* @param message content to broadcast
255
* @return BroadcastMessage instance
256
*/
257
public static BroadcastMessage createBroadcastMessage(Object message);
258
259
/**
260
* Create targeted broadcast message
261
* @param message content to broadcast
262
* @param uuid target resource UUID
263
* @return BroadcastMessage instance
264
*/
265
public static BroadcastMessage createBroadcastMessage(Object message, String uuid);
266
}
267
```
268
269
### Cache Inspection and Monitoring
270
271
Interfaces for inspecting cached messages and monitoring cache operations.
272
273
```java { .api }
274
/**
275
* Inspect and modify cached messages
276
*/
277
public interface BroadcasterCacheInspector {
278
/**
279
* Inspect cache entry and determine action
280
* @param entry cache entry to inspect
281
* @return CacheInspector action (CONTINUE, SKIP, REMOVE)
282
*/
283
public CacheInspector inspect(CacheMessage entry);
284
285
/**
286
* Configure inspector with AtmosphereConfig
287
* @param config AtmosphereConfig instance
288
*/
289
public void configure(AtmosphereConfig config);
290
}
291
292
/**
293
* Cache inspection result
294
*/
295
public enum CacheInspector {
296
CONTINUE, // Continue processing entry
297
SKIP, // Skip this entry
298
REMOVE // Remove entry from cache
299
}
300
301
/**
302
* Listen to cache events for monitoring
303
*/
304
public interface BroadcasterCacheListener {
305
/**
306
* Called when message is added to cache
307
* @param broadcasterId broadcaster identifier
308
* @param uuid resource UUID
309
* @param message cached message
310
*/
311
public void onAddCache(String broadcasterId, String uuid, Object message);
312
313
/**
314
* Called when message is removed from cache
315
* @param broadcasterId broadcaster identifier
316
* @param uuid resource UUID
317
* @param message removed message
318
*/
319
public void onRemoveCache(String broadcasterId, String uuid, Object message);
320
321
/**
322
* Called when cache is cleared
323
* @param broadcasterId broadcaster identifier
324
*/
325
public void onClearCache(String broadcasterId);
326
}
327
```
328
329
**Usage Examples:**
330
331
```java
332
// Custom cache inspector for message filtering
333
public class MessageFilterInspector implements BroadcasterCacheInspector {
334
@Override
335
public CacheInspector inspect(CacheMessage entry) {
336
// Remove expired messages
337
if (entry.isExpired()) {
338
return CacheInspector.REMOVE;
339
}
340
341
// Skip messages older than 1 hour
342
long oneHourAgo = System.currentTimeMillis() - (60 * 60 * 1000);
343
if (entry.getCreateTime() < oneHourAgo) {
344
return CacheInspector.SKIP;
345
}
346
347
return CacheInspector.CONTINUE;
348
}
349
}
350
351
// Cache listener for monitoring
352
public class CacheMonitorListener implements BroadcasterCacheListener {
353
@Override
354
public void onAddCache(String broadcasterId, String uuid, Object message) {
355
System.out.println("Cached message for " + broadcasterId + ":" + uuid);
356
}
357
358
@Override
359
public void onRemoveCache(String broadcasterId, String uuid, Object message) {
360
System.out.println("Removed cached message for " + broadcasterId + ":" + uuid);
361
}
362
}
363
364
// Configure cache with inspector and listener
365
DefaultBroadcasterCache cache = new DefaultBroadcasterCache()
366
.inspector(new MessageFilterInspector())
367
.addBroadcasterCacheListener(new CacheMonitorListener());
368
```
369
370
### Cache Configuration
371
372
Configuration utilities and service annotations for cache setup.
373
374
```java { .api }
375
/**
376
* Register BroadcasterCache implementations
377
*/
378
@Target({ElementType.TYPE})
379
@Retention(RetentionPolicy.RUNTIME)
380
public @interface BroadcasterCacheService {
381
}
382
383
/**
384
* Register BroadcasterCacheInspector implementations
385
*/
386
@Target({ElementType.TYPE})
387
@Retention(RetentionPolicy.RUNTIME)
388
public @interface BroadcasterCacheInspectorService {
389
}
390
391
/**
392
* Register BroadcasterCacheListener implementations
393
*/
394
@Target({ElementType.TYPE})
395
@Retention(RetentionPolicy.RUNTIME)
396
public @interface BroadcasterCacheListenerService {
397
}
398
```
399
400
**Usage Examples:**
401
402
```java
403
@BroadcasterCacheService
404
public class RedisBroadcasterCache implements BroadcasterCache {
405
private RedisTemplate<String, Object> redisTemplate;
406
407
@Override
408
public void addToCache(String broadcasterId, String uuid, BroadcastMessage message) {
409
String key = broadcasterId + ":" + uuid;
410
redisTemplate.opsForList().rightPush(key, message);
411
412
// Set expiration
413
redisTemplate.expire(key, 1, TimeUnit.HOURS);
414
}
415
416
@Override
417
public List<Object> retrieveFromCache(String broadcasterId, String uuid) {
418
String key = broadcasterId + ":" + uuid;
419
return redisTemplate.opsForList().range(key, 0, -1);
420
}
421
422
@Override
423
public void clearCache(String broadcasterId, String uuid) {
424
String key = broadcasterId + ":" + uuid;
425
redisTemplate.delete(key);
426
}
427
}
428
429
@BroadcasterCacheInspectorService
430
public class ExpiryInspector implements BroadcasterCacheInspector {
431
private long maxAge = TimeUnit.HOURS.toMillis(2); // 2 hours
432
433
@Override
434
public CacheInspector inspect(CacheMessage entry) {
435
long age = System.currentTimeMillis() - entry.getCreateTime();
436
return age > maxAge ? CacheInspector.REMOVE : CacheInspector.CONTINUE;
437
}
438
}
439
```
440
441
### Advanced Cache Usage
442
443
Complete example showing cache setup, configuration, and usage in a real-time application.
444
445
```java
446
@ManagedService(path = "/api/notifications/{userId}")
447
public class NotificationService {
448
private BroadcasterCache cache;
449
450
@Ready
451
public void onReady(@PathParam("userId") String userId,
452
AtmosphereResource resource) {
453
// Configure cache for this broadcaster
454
if (cache == null) {
455
cache = new UUIDBroadcasterCache()
456
.setCachePerUUID(true)
457
.setCleanupFrequency(5, TimeUnit.MINUTES)
458
.inspector(new NotificationCacheInspector())
459
.addBroadcasterCacheListener(new NotificationCacheListener());
460
461
resource.getBroadcaster().getBroadcasterConfig()
462
.setBroadcasterCache(cache);
463
}
464
465
// Replay cached messages for reconnecting user
466
String resourceUuid = resource.uuid();
467
List<Object> cachedMessages = cache.retrieveFromCache(
468
resource.getBroadcaster().getID(), resourceUuid);
469
470
for (Object message : cachedMessages) {
471
resource.write(message);
472
}
473
474
resource.suspend();
475
}
476
477
@Message
478
public void onMessage(@PathParam("userId") String userId,
479
String message) {
480
// Broadcast notification (will be automatically cached)
481
NotificationMessage notification = new NotificationMessage(
482
userId, message, System.currentTimeMillis());
483
484
// Send to all users subscribed to this user's notifications
485
BroadcasterFactory.getDefault()
486
.lookup("/api/notifications/" + userId, true)
487
.broadcast(notification);
488
}
489
490
// Custom cache inspector for notifications
491
private static class NotificationCacheInspector implements BroadcasterCacheInspector {
492
@Override
493
public CacheInspector inspect(CacheMessage entry) {
494
// Keep only last 100 notifications per user
495
// Remove messages older than 24 hours
496
long dayAgo = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
497
498
if (entry.getCreateTime() < dayAgo) {
499
return CacheInspector.REMOVE;
500
}
501
502
return CacheInspector.CONTINUE;
503
}
504
}
505
506
// Cache listener for notification monitoring
507
private static class NotificationCacheListener implements BroadcasterCacheListener {
508
@Override
509
public void onAddCache(String broadcasterId, String uuid, Object message) {
510
System.out.println("Cached notification for user: " + extractUserId(broadcasterId));
511
}
512
513
@Override
514
public void onRemoveCache(String broadcasterId, String uuid, Object message) {
515
System.out.println("Removed cached notification for user: " + extractUserId(broadcasterId));
516
}
517
518
private String extractUserId(String broadcasterId) {
519
// Extract user ID from broadcaster path like "/api/notifications/user123"
520
return broadcasterId.substring(broadcasterId.lastIndexOf('/') + 1);
521
}
522
}
523
524
// Notification message class
525
private static class NotificationMessage {
526
private final String userId;
527
private final String message;
528
private final long timestamp;
529
530
public NotificationMessage(String userId, String message, long timestamp) {
531
this.userId = userId;
532
this.message = message;
533
this.timestamp = timestamp;
534
}
535
536
// Getters...
537
public String getUserId() { return userId; }
538
public String getMessage() { return message; }
539
public long getTimestamp() { return timestamp; }
540
}
541
}
542
```