# Change Streams

Real-time change monitoring at cluster, database, and collection levels with resume token support, filtering capabilities, and comprehensive change event information.

## Capabilities

### ChangeStreamIterable Interface

Primary interface for configuring and consuming change streams with filtering and resume capabilities.

```java { .api }
11
/**
12
* Interface for configuring and consuming change streams
13
*/
14
public interface ChangeStreamIterable<TResult> extends MongoIterable<ChangeStreamDocument<TResult>> {
15
/**
16
* Returns a change stream cursor for iterating over change events
17
* @return MongoChangeStreamCursor for consuming change events
18
*/
19
MongoChangeStreamCursor<ChangeStreamDocument<TResult>> cursor();
20
21
/**
22
* Sets the full document option for change events
23
* @param fullDocument option for including full documents in change events
24
* @return ChangeStreamIterable with full document option
25
*/
26
ChangeStreamIterable<TResult> fullDocument(FullDocument fullDocument);
27
28
/**
29
* Sets the full document before change option
30
* @param fullDocumentBeforeChange option for including pre-change documents
31
* @return ChangeStreamIterable with before change document option
32
*/
33
ChangeStreamIterable<TResult> fullDocumentBeforeChange(FullDocumentBeforeChange fullDocumentBeforeChange);
34
35
/**
36
* Sets the resume token to resume change stream from a specific point
37
* @param resumeToken the resume token as BsonDocument
38
* @return ChangeStreamIterable that resumes from the specified token
39
*/
40
ChangeStreamIterable<TResult> resumeAfter(BsonDocument resumeToken);
41
42
/**
43
* Sets the start after token to begin change stream after a specific event
44
* @param startAfter the start after token as BsonDocument
45
* @return ChangeStreamIterable that starts after the specified token
46
*/
47
ChangeStreamIterable<TResult> startAfter(BsonDocument startAfter);
48
49
/**
50
* Sets the cluster time to start the change stream from
51
* @param startAtOperationTime the cluster time to start from
52
* @return ChangeStreamIterable that starts at the specified time
53
*/
54
ChangeStreamIterable<TResult> startAtOperationTime(BsonTimestamp startAtOperationTime);
55
56
/**
57
* Sets the maximum time to wait for changes when using await
58
* @param maxAwaitTime the maximum await time
59
* @param timeUnit the time unit
60
* @return ChangeStreamIterable with await time limit
61
*/
62
ChangeStreamIterable<TResult> maxAwaitTime(long maxAwaitTime, TimeUnit timeUnit);
63
64
/**
65
* Sets collation for string operations in the change stream pipeline
66
* @param collation the collation specification
67
* @return ChangeStreamIterable with applied collation
68
*/
69
ChangeStreamIterable<TResult> collation(Collation collation);
70
71
/**
72
* Sets the batch size for change stream cursor
73
* @param batchSize the batch size
74
* @return ChangeStreamIterable with specified batch size
75
*/
76
ChangeStreamIterable<TResult> batchSize(int batchSize);
77
78
/**
79
* Adds a comment to the change stream operation
80
* @param comment the comment string
81
* @return ChangeStreamIterable with comment
82
*/
83
ChangeStreamIterable<TResult> comment(String comment);
84
85
/**
86
* Enables showing expanded events in change streams
87
* @param showExpandedEvents true to show expanded events
88
* @return ChangeStreamIterable with expanded events option
89
*/
90
ChangeStreamIterable<TResult> showExpandedEvents(Boolean showExpandedEvents);
91
}
92
```

**Usage Examples:**

```java
97
import com.mongodb.client.ChangeStreamIterable;
98
import com.mongodb.client.MongoChangeStreamCursor;
99
import com.mongodb.client.model.changestream.ChangeStreamDocument;
100
import com.mongodb.client.model.changestream.FullDocument;
101
102
// Basic change stream monitoring
103
ChangeStreamIterable<Document> changeStream = collection.watch();
104
105
try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = changeStream.cursor()) {
106
while (cursor.hasNext()) {
107
ChangeStreamDocument<Document> change = cursor.next();
108
System.out.println("Operation: " + change.getOperationType());
109
System.out.println("Document: " + change.getFullDocument());
110
System.out.println("Resume token: " + change.getResumeToken());
111
}
112
}
113
114
// Change stream with full document lookup
115
ChangeStreamIterable<Document> fullDocStream = collection.watch()
116
.fullDocument(FullDocument.UPDATE_LOOKUP)
117
.fullDocumentBeforeChange(FullDocumentBeforeChange.WHEN_AVAILABLE);
118
119
// Filtered change stream
120
List<Bson> pipeline = Arrays.asList(
121
Aggregates.match(Filters.in("operationType", Arrays.asList("insert", "update", "delete")))
122
);
123
124
ChangeStreamIterable<Document> filteredStream = collection.watch(pipeline)
125
.maxAwaitTime(5, TimeUnit.SECONDS)
126
.batchSize(10);
127
```

### MongoChangeStreamCursor Interface

Enhanced cursor interface specifically for change streams with resume token access.

```java { .api }
134
/**
135
* Cursor interface for change streams with resume token support
136
*/
137
public interface MongoChangeStreamCursor<TResult> extends MongoCursor<TResult> {
138
/**
139
* Gets the current resume token for the change stream
140
* @return BsonDocument containing the resume token
141
*/
142
BsonDocument getResumeToken();
143
144
/**
145
* Returns the next change event without blocking
146
* @return next change event or null if none available
147
*/
148
TResult tryNext();
149
}
150
```

**Usage Examples:**

```java
155
// Robust change stream with resume capability
156
BsonDocument resumeToken = null;
157
158
while (true) {
159
try {
160
ChangeStreamIterable<Document> stream = collection.watch();
161
162
// Resume from last known position if available
163
if (resumeToken != null) {
164
stream = stream.resumeAfter(resumeToken);
165
}
166
167
try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = stream.cursor()) {
168
while (cursor.hasNext()) {
169
ChangeStreamDocument<Document> change = cursor.next();
170
171
// Process change event
172
processChangeEvent(change);
173
174
// Store resume token for fault tolerance
175
resumeToken = cursor.getResumeToken();
176
persistResumeToken(resumeToken);
177
}
178
}
179
180
} catch (MongoException e) {
181
System.err.println("Change stream error: " + e.getMessage());
182
// Wait before reconnecting
183
Thread.sleep(1000);
184
}
185
}
186
```

### Change Stream Levels

Change streams can be created at different levels for various monitoring scopes.

```java { .api }
193
// Cluster-level change streams (monitor all databases)
194
ChangeStreamIterable<Document> clusterChanges = mongoClient.watch();
195
196
// Database-level change streams (monitor all collections in database)
197
ChangeStreamIterable<Document> databaseChanges = database.watch();
198
199
// Collection-level change streams (monitor specific collection)
200
ChangeStreamIterable<Document> collectionChanges = collection.watch();
201
```

**Usage Examples:**

```java
206
// Monitor all database operations
207
try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = mongoClient.watch().cursor()) {
208
while (cursor.hasNext()) {
209
ChangeStreamDocument<Document> change = cursor.next();
210
MongoNamespace namespace = change.getNamespace();
211
212
System.out.println("Database: " + namespace.getDatabaseName());
213
System.out.println("Collection: " + namespace.getCollectionName());
214
System.out.println("Operation: " + change.getOperationType());
215
}
216
}
217
218
// Monitor specific database with filtering
219
List<Bson> dbPipeline = Arrays.asList(
220
Aggregates.match(Filters.and(
221
Filters.in("operationType", Arrays.asList("insert", "update", "delete")),
222
Filters.not(Filters.regex("ns.coll", "^system\\.")) // Exclude system collections
223
))
224
);
225
226
ChangeStreamIterable<Document> filteredDbStream = database.watch(dbPipeline);
227
228
// Monitor with session for causally consistent reads
229
try (ClientSession session = mongoClient.startSession()) {
230
ChangeStreamIterable<Document> sessionStream = collection.watch(session);
231
// Process changes within session context
232
}
233
```

### Change Event Processing

Comprehensive handling of different types of change events and their data.

```java { .api }
240
private void processChangeEvent(ChangeStreamDocument<Document> change) {
241
OperationType operationType = change.getOperationType();
242
243
switch (operationType) {
244
case INSERT:
245
handleInsert(change);
246
break;
247
case UPDATE:
248
handleUpdate(change);
249
break;
250
case REPLACE:
251
handleReplace(change);
252
break;
253
case DELETE:
254
handleDelete(change);
255
break;
256
case INVALIDATE:
257
handleInvalidate(change);
258
break;
259
case DROP:
260
handleDrop(change);
261
break;
262
case DROP_DATABASE:
263
handleDropDatabase(change);
264
break;
265
case RENAME:
266
handleRename(change);
267
break;
268
default:
269
System.out.println("Unknown operation type: " + operationType);
270
}
271
}
272
273
private void handleInsert(ChangeStreamDocument<Document> change) {
274
Document newDocument = change.getFullDocument();
275
BsonDocument documentKey = change.getDocumentKey();
276
277
System.out.println("New document inserted:");
278
System.out.println("ID: " + documentKey.get("_id"));
279
System.out.println("Document: " + newDocument.toJson());
280
281
// Trigger post-insert processing
282
onDocumentInserted(newDocument);
283
}
284
285
private void handleUpdate(ChangeStreamDocument<Document> change) {
286
BsonDocument documentKey = change.getDocumentKey();
287
UpdateDescription updateDescription = change.getUpdateDescription();
288
Document fullDocumentAfter = change.getFullDocument(); // If UPDATE_LOOKUP enabled
289
Document fullDocumentBefore = change.getFullDocumentBeforeChange(); // If enabled
290
291
System.out.println("Document updated:");
292
System.out.println("ID: " + documentKey.get("_id"));
293
294
if (updateDescription != null) {
295
System.out.println("Updated fields: " + updateDescription.getUpdatedFields());
296
System.out.println("Removed fields: " + updateDescription.getRemovedFields());
297
System.out.println("Truncated arrays: " + updateDescription.getTruncatedArrays());
298
}
299
300
// Compare before and after if available
301
if (fullDocumentBefore != null && fullDocumentAfter != null) {
302
analyzeChanges(fullDocumentBefore, fullDocumentAfter);
303
}
304
305
// Trigger post-update processing
306
onDocumentUpdated(documentKey, updateDescription);
307
}
308
309
private void handleDelete(ChangeStreamDocument<Document> change) {
310
BsonDocument documentKey = change.getDocumentKey();
311
Document fullDocumentBefore = change.getFullDocumentBeforeChange();
312
313
System.out.println("Document deleted:");
314
System.out.println("ID: " + documentKey.get("_id"));
315
316
if (fullDocumentBefore != null) {
317
System.out.println("Deleted document: " + fullDocumentBefore.toJson());
318
}
319
320
// Trigger post-delete processing
321
onDocumentDeleted(documentKey);
322
}
323
```

### Advanced Change Stream Patterns

Complex change stream patterns for real-world applications.

```java { .api }
330
// Real-time cache invalidation
331
private void setupCacheInvalidationStream() {
332
List<Bson> pipeline = Arrays.asList(
333
Aggregates.match(Filters.or(
334
Filters.eq("operationType", "update"),
335
Filters.eq("operationType", "delete"),
336
Filters.eq("operationType", "replace")
337
))
338
);
339
340
ChangeStreamIterable<Document> cacheStream = collection.watch(pipeline)
341
.fullDocument(FullDocument.UPDATE_LOOKUP);
342
343
// Run in background thread
344
CompletableFuture.runAsync(() -> {
345
try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = cacheStream.cursor()) {
346
while (!Thread.currentThread().isInterrupted()) {
347
ChangeStreamDocument<Document> change = cursor.tryNext();
348
if (change != null) {
349
String cacheKey = extractCacheKey(change.getDocumentKey());
350
cacheManager.invalidate(cacheKey);
351
System.out.println("Cache invalidated for key: " + cacheKey);
352
}
353
Thread.sleep(100); // Prevent tight loop
354
}
355
} catch (InterruptedException e) {
356
Thread.currentThread().interrupt();
357
}
358
});
359
}
360
361
// Data synchronization between systems
362
private void setupDataSyncStream() {
363
// Start from current time to avoid processing historical data
364
BsonTimestamp now = new BsonTimestamp((int) (System.currentTimeMillis() / 1000), 0);
365
366
ChangeStreamIterable<Document> syncStream = collection.watch()
367
.startAtOperationTime(now)
368
.fullDocument(FullDocument.UPDATE_LOOKUP)
369
.fullDocumentBeforeChange(FullDocumentBeforeChange.WHEN_AVAILABLE);
370
371
try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = syncStream.cursor()) {
372
while (cursor.hasNext()) {
373
ChangeStreamDocument<Document> change = cursor.next();
374
375
// Sync to external system
376
syncToExternalSystem(change);
377
378
// Update high-water mark
379
updateSyncPosition(cursor.getResumeToken());
380
}
381
}
382
}
383
384
// Audit trail generation
385
private void setupAuditTrailStream() {
386
List<Bson> auditPipeline = Arrays.asList(
387
Aggregates.match(Filters.in("operationType",
388
Arrays.asList("insert", "update", "delete", "replace"))),
389
Aggregates.project(Projections.fields(
390
Projections.include("_id", "operationType", "ns", "documentKey"),
391
Projections.computed("timestamp", new Document("$toDate", "$clusterTime")),
392
Projections.computed("user", "$$USER"), // If authentication enabled
393
Projections.excludeId()
394
))
395
);
396
397
ChangeStreamIterable<Document> auditStream = collection.watch(auditPipeline);
398
399
MongoCollection<Document> auditCollection = database.getCollection("audit_log");
400
401
try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = auditStream.cursor()) {
402
while (cursor.hasNext()) {
403
ChangeStreamDocument<Document> change = cursor.next();
404
405
Document auditEntry = new Document()
406
.append("changeId", change.getResumeToken())
407
.append("operation", change.getOperationType().getValue())
408
.append("collection", change.getNamespace().getCollectionName())
409
.append("documentId", change.getDocumentKey())
410
.append("timestamp", new Date())
411
.append("clusterTime", change.getClusterTime());
412
413
auditCollection.insertOne(auditEntry);
414
}
415
}
416
}
417
418
// Horizontal scaling with change stream routing
419
private void setupShardedChangeStreamProcessing() {
420
// Distribute processing across multiple consumers based on document ID
421
int consumerCount = 4;
422
int consumerId = 0; // This consumer's ID (0-3)
423
424
List<Bson> shardingPipeline = Arrays.asList(
425
Aggregates.match(Filters.expr(
426
new Document("$eq", Arrays.asList(
427
new Document("$mod", Arrays.asList("$documentKey._id", consumerCount)),
428
consumerId
429
))
430
))
431
);
432
433
ChangeStreamIterable<Document> shardedStream = collection.watch(shardingPipeline);
434
435
try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = shardedStream.cursor()) {
436
while (cursor.hasNext()) {
437
ChangeStreamDocument<Document> change = cursor.next();
438
processChangeForShard(change, consumerId);
439
}
440
}
441
}
442
```

### Error Handling and Resilience

Best practices for handling change stream errors and maintaining resilience.

```java { .api }
449
private void robustChangeStreamProcessing() {
450
BsonDocument resumeToken = loadLastResumeToken();
451
int reconnectAttempts = 0;
452
final int maxReconnectAttempts = 10;
453
454
while (reconnectAttempts < maxReconnectAttempts) {
455
try {
456
ChangeStreamIterable<Document> stream = collection.watch();
457
458
if (resumeToken != null) {
459
stream = stream.resumeAfter(resumeToken);
460
}
461
462
try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = stream.cursor()) {
463
reconnectAttempts = 0; // Reset on successful connection
464
465
while (cursor.hasNext()) {
466
try {
467
ChangeStreamDocument<Document> change = cursor.next();
468
469
// Process change with retry logic
470
processChangeWithRetry(change);
471
472
// Update resume token
473
resumeToken = cursor.getResumeToken();
474
saveResumeToken(resumeToken);
475
476
} catch (Exception e) {
477
System.err.println("Error processing change event: " + e.getMessage());
478
// Continue processing other events
479
}
480
}
481
}
482
483
} catch (MongoChangeStreamException e) {
484
if (e.getErrorCode() == 40585) { // Resume token not found
485
System.warn("Resume token expired, starting from current time");
486
resumeToken = null; // Start fresh
487
} else {
488
System.err.println("Change stream error: " + e.getMessage());
489
}
490
491
reconnectAttempts++;
492
if (reconnectAttempts < maxReconnectAttempts) {
493
try {
494
Thread.sleep(Math.min(1000 * reconnectAttempts, 30000)); // Exponential backoff
495
} catch (InterruptedException ie) {
496
Thread.currentThread().interrupt();
497
break;
498
}
499
}
500
501
} catch (MongoException e) {
502
System.err.println("MongoDB error: " + e.getMessage());
503
reconnectAttempts++;
504
505
if (reconnectAttempts < maxReconnectAttempts) {
506
try {
507
Thread.sleep(5000);
508
} catch (InterruptedException ie) {
509
Thread.currentThread().interrupt();
510
break;
511
}
512
}
513
}
514
}
515
516
if (reconnectAttempts >= maxReconnectAttempts) {
517
System.err.println("Max reconnect attempts exceeded, stopping change stream processing");
518
}
519
}
520
521
private void processChangeWithRetry(ChangeStreamDocument<Document> change) {
522
int retryCount = 0;
523
final int maxRetries = 3;
524
525
while (retryCount < maxRetries) {
526
try {
527
processChangeEvent(change);
528
return; // Success
529
530
} catch (Exception e) {
531
retryCount++;
532
if (retryCount >= maxRetries) {
533
// Send to dead letter queue or log as failed
534
logFailedChangeEvent(change, e);
535
throw new RuntimeException("Failed to process change event after " + maxRetries + " attempts", e);
536
}
537
538
try {
539
Thread.sleep(1000 * retryCount); // Linear backoff
540
} catch (InterruptedException ie) {
541
Thread.currentThread().interrupt();
542
throw new RuntimeException("Interrupted during retry", ie);
543
}
544
}
545
}
546
}
547
```