The MongoDB synchronous driver for Java, providing a blocking I/O API for database operations.
—
Real-time change monitoring at cluster, database, and collection levels with resume token support, filtering capabilities, and comprehensive change event information.
Primary interface for configuring and consuming change streams with filtering and resume capabilities.
/**
 * Fluent interface for configuring and consuming a MongoDB change stream.
 * Each configuration method returns the iterable itself, so options can be
 * chained before a cursor is opened with {@link #cursor()}.
 *
 * @param <TResult> the type the change event's full document is decoded into
 */
public interface ChangeStreamIterable<TResult> extends MongoIterable<ChangeStreamDocument<TResult>> {
/**
 * Opens a cursor over the stream's change events.
 * @return a MongoChangeStreamCursor for consuming change events
 */
MongoChangeStreamCursor<ChangeStreamDocument<TResult>> cursor();
/**
 * Sets whether change events should include the full (post-change) document.
 * @param fullDocument option controlling inclusion of the full document in change events
 * @return this ChangeStreamIterable, for chaining
 */
ChangeStreamIterable<TResult> fullDocument(FullDocument fullDocument);
/**
 * Sets whether change events should include the document's pre-change image.
 * @param fullDocumentBeforeChange option controlling inclusion of pre-change documents
 * @return this ChangeStreamIterable, for chaining
 */
ChangeStreamIterable<TResult> fullDocumentBeforeChange(FullDocumentBeforeChange fullDocumentBeforeChange);
/**
 * Resumes the change stream immediately after the event identified by the token.
 * @param resumeToken the resume token as a BsonDocument
 * @return this ChangeStreamIterable, configured to resume from the token
 */
ChangeStreamIterable<TResult> resumeAfter(BsonDocument resumeToken);
/**
 * Starts the change stream after the event identified by the token.
 * @param startAfter the start-after token as a BsonDocument
 * @return this ChangeStreamIterable, configured to start after the token
 */
ChangeStreamIterable<TResult> startAfter(BsonDocument startAfter);
/**
 * Starts the change stream from the given cluster (operation) time.
 * @param startAtOperationTime the cluster time to start from
 * @return this ChangeStreamIterable, configured to start at the given time
 */
ChangeStreamIterable<TResult> startAtOperationTime(BsonTimestamp startAtOperationTime);
/**
 * Sets the maximum time the server waits for new change events on getMore.
 * @param maxAwaitTime the maximum await time
 * @param timeUnit the unit of maxAwaitTime
 * @return this ChangeStreamIterable, with the await-time limit applied
 */
ChangeStreamIterable<TResult> maxAwaitTime(long maxAwaitTime, TimeUnit timeUnit);
/**
 * Sets the collation used for string comparisons in the stream's pipeline.
 * @param collation the collation specification
 * @return this ChangeStreamIterable, with the collation applied
 */
ChangeStreamIterable<TResult> collation(Collation collation);
/**
 * Sets the number of change events fetched per server batch.
 * @param batchSize the batch size
 * @return this ChangeStreamIterable, with the batch size applied
 */
ChangeStreamIterable<TResult> batchSize(int batchSize);
/**
 * Attaches a comment to the change stream operation (visible in server logs/profiling).
 * @param comment the comment string
 * @return this ChangeStreamIterable, with the comment applied
 */
ChangeStreamIterable<TResult> comment(String comment);
/**
 * Enables reporting of expanded (DDL-style) events in the stream.
 * @param showExpandedEvents true to show expanded events
 * @return this ChangeStreamIterable, with the expanded-events option applied
 */
ChangeStreamIterable<TResult> showExpandedEvents(Boolean showExpandedEvents);
}Usage Examples:
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
// Fix: FullDocumentBeforeChange is referenced below but was never imported.
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
// Basic change stream monitoring
ChangeStreamIterable<Document> changeStream = collection.watch();
try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = changeStream.cursor()) {
while (cursor.hasNext()) {
ChangeStreamDocument<Document> change = cursor.next();
System.out.println("Operation: " + change.getOperationType());
System.out.println("Document: " + change.getFullDocument());
System.out.println("Resume token: " + change.getResumeToken());
}
}
// Change stream with full document lookup and pre-change image
ChangeStreamIterable<Document> fullDocStream = collection.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP)
.fullDocumentBeforeChange(FullDocumentBeforeChange.WHEN_AVAILABLE);
// Filtered change stream: only insert/update/delete events, small batches
List<Bson> pipeline = Arrays.asList(
Aggregates.match(Filters.in("operationType", Arrays.asList("insert", "update", "delete")))
);
ChangeStreamIterable<Document> filteredStream = collection.watch(pipeline)
.maxAwaitTime(5, TimeUnit.SECONDS)
.batchSize(10);
Enhanced cursor interface specifically for change streams with resume token access.
/**
 * Cursor over change-stream events that also exposes the stream's resume
 * token, letting consumers checkpoint their position and later continue via
 * {@code resumeAfter}/{@code startAfter}.
 *
 * @param <TResult> the type of each change event returned by the cursor
 */
public interface MongoChangeStreamCursor<TResult> extends MongoCursor<TResult> {
/**
 * Gets the resume token identifying the cursor's current position.
 * @return BsonDocument containing the resume token
 */
BsonDocument getResumeToken();
/**
 * Returns the next change event without blocking.
 * @return next change event or null if none available
 */
TResult tryNext();
}Usage Examples:
// Robust change stream with resume capability: reopen the stream on error and
// continue from the last persisted resume token.
BsonDocument resumeToken = null;
while (true) {
try {
ChangeStreamIterable<Document> stream = collection.watch();
// Resume from last known position if available
if (resumeToken != null) {
stream = stream.resumeAfter(resumeToken);
}
try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = stream.cursor()) {
while (cursor.hasNext()) {
ChangeStreamDocument<Document> change = cursor.next();
// Process change event
processChangeEvent(change);
// Store resume token for fault tolerance
resumeToken = cursor.getResumeToken();
persistResumeToken(resumeToken);
}
}
} catch (MongoException e) {
System.err.println("Change stream error: " + e.getMessage());
// Fix: Thread.sleep throws the checked InterruptedException, which the
// original left unhandled (compile error); restore the interrupt flag
// and stop the reconnect loop when interrupted.
try {
Thread.sleep(1000); // Wait before reconnecting
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
Change streams can be created at different levels for various monitoring scopes.
// Cluster-level change streams (monitor all databases)
ChangeStreamIterable<Document> clusterChanges = mongoClient.watch();
// Database-level change streams (monitor all collections in database)
ChangeStreamIterable<Document> databaseChanges = database.watch();
// Collection-level change streams (monitor specific collection)
ChangeStreamIterable<Document> collectionChanges = collection.watch();Usage Examples:
// Monitor all database operations
try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = mongoClient.watch().cursor()) {
while (cursor.hasNext()) {
ChangeStreamDocument<Document> change = cursor.next();
// The namespace identifies which database/collection produced the event.
MongoNamespace namespace = change.getNamespace();
System.out.println("Database: " + namespace.getDatabaseName());
System.out.println("Collection: " + namespace.getCollectionName());
System.out.println("Operation: " + change.getOperationType());
}
}
// Monitor specific database with filtering
List<Bson> dbPipeline = Arrays.asList(
Aggregates.match(Filters.and(
Filters.in("operationType", Arrays.asList("insert", "update", "delete")),
Filters.not(Filters.regex("ns.coll", "^system\\.")) // Exclude system collections
))
);
ChangeStreamIterable<Document> filteredDbStream = database.watch(dbPipeline);
// Monitor with session for causally consistent reads
try (ClientSession session = mongoClient.startSession()) {
ChangeStreamIterable<Document> sessionStream = collection.watch(session);
// Process changes within session context
}Comprehensive handling of different types of change events and their data.
private void processChangeEvent(ChangeStreamDocument<Document> change) {
// Dispatch the event to the handler matching its operation type; anything
// unrecognized is logged rather than dropped silently.
OperationType type = change.getOperationType();
switch (type) {
case INSERT: handleInsert(change); break;
case UPDATE: handleUpdate(change); break;
case REPLACE: handleReplace(change); break;
case DELETE: handleDelete(change); break;
case INVALIDATE: handleInvalidate(change); break;
case DROP: handleDrop(change); break;
case DROP_DATABASE: handleDropDatabase(change); break;
case RENAME: handleRename(change); break;
default: System.out.println("Unknown operation type: " + type);
}
}
private void handleInsert(ChangeStreamDocument<Document> change) {
// An insert event always carries the complete new document.
BsonDocument key = change.getDocumentKey();
Document inserted = change.getFullDocument();
System.out.println("New document inserted:");
System.out.println("ID: " + key.get("_id"));
System.out.println("Document: " + inserted.toJson());
// Hand off for any downstream post-insert work.
onDocumentInserted(inserted);
}
private void handleUpdate(ChangeStreamDocument<Document> change) {
// Summarise an update event. The before/after document images are only
// populated when the stream was opened with the corresponding
// fullDocument / fullDocumentBeforeChange options.
BsonDocument key = change.getDocumentKey();
UpdateDescription delta = change.getUpdateDescription();
Document after = change.getFullDocument(); // present only with UPDATE_LOOKUP
Document before = change.getFullDocumentBeforeChange(); // present only if enabled
System.out.println("Document updated:");
System.out.println("ID: " + key.get("_id"));
if (delta != null) {
System.out.println("Updated fields: " + delta.getUpdatedFields());
System.out.println("Removed fields: " + delta.getRemovedFields());
System.out.println("Truncated arrays: " + delta.getTruncatedArrays());
}
// Diff the two images when both were captured.
if (before != null && after != null) {
analyzeChanges(before, after);
}
// Hand off for any downstream post-update work.
onDocumentUpdated(key, delta);
}
private void handleDelete(ChangeStreamDocument<Document> change) {
// For deletes only the document key survives; the pre-image is available
// solely when fullDocumentBeforeChange was enabled on the stream.
BsonDocument documentKey = change.getDocumentKey();
Document fullDocumentBefore = change.getFullDocumentBeforeChange();
System.out.println("Document deleted:");
System.out.println("ID: " + documentKey.get("_id"));
if (fullDocumentBefore != null) {
System.out.println("Deleted document: " + fullDocumentBefore.toJson());
}
// Trigger post-delete processing
onDocumentDeleted(documentKey);
}Complex change stream patterns for real-world applications.
// Real-time cache invalidation
private void setupCacheInvalidationStream() {
// Only mutation events can invalidate cached entries.
List<Bson> pipeline = Arrays.asList(
Aggregates.match(Filters.or(
Filters.eq("operationType", "update"),
Filters.eq("operationType", "delete"),
Filters.eq("operationType", "replace")
))
);
ChangeStreamIterable<Document> cacheStream = collection.watch(pipeline)
.fullDocument(FullDocument.UPDATE_LOOKUP);
// Run in background thread
CompletableFuture.runAsync(() -> {
try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = cacheStream.cursor()) {
while (!Thread.currentThread().isInterrupted()) {
ChangeStreamDocument<Document> change = cursor.tryNext();
if (change != null) {
String cacheKey = extractCacheKey(change.getDocumentKey());
cacheManager.invalidate(cacheKey);
System.out.println("Cache invalidated for key: " + cacheKey);
} else {
// Fix: back off only when no event was available, so bursts of
// changes are drained immediately instead of incurring an
// artificial 100ms delay after every single event.
Thread.sleep(100);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// Data synchronization between systems
private void setupDataSyncStream() {
// Anchor the stream at "now" so historical operations are skipped.
BsonTimestamp startPoint = new BsonTimestamp((int) (System.currentTimeMillis() / 1000), 0);
ChangeStreamIterable<Document> syncStream = collection.watch()
.startAtOperationTime(startPoint)
.fullDocument(FullDocument.UPDATE_LOOKUP)
.fullDocumentBeforeChange(FullDocumentBeforeChange.WHEN_AVAILABLE);
try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = syncStream.cursor()) {
while (cursor.hasNext()) {
ChangeStreamDocument<Document> event = cursor.next();
// Push each event downstream, then persist the resume token as a
// high-water mark so a restart continues from this position.
syncToExternalSystem(event);
updateSyncPosition(cursor.getResumeToken());
}
}
}
// Audit trail generation
private void setupAuditTrailStream() {
// Keep only mutation events and reshape them for the audit log.
// Fix: the original combined include("_id", ...) with excludeId(), which is
// contradictory — and a change-stream $project must NOT remove _id, since
// _id carries the resume token and the server rejects pipelines that drop it.
List<Bson> auditPipeline = Arrays.asList(
Aggregates.match(Filters.in("operationType",
Arrays.asList("insert", "update", "delete", "replace"))),
Aggregates.project(Projections.fields(
Projections.include("_id", "operationType", "ns", "documentKey"),
Projections.computed("timestamp", new Document("$toDate", "$clusterTime"))
// NOTE(review): the original projected "$$USER", but no such aggregation
// system variable exists; attach the acting user at the application
// layer if audit attribution is required.
))
);
ChangeStreamIterable<Document> auditStream = collection.watch(auditPipeline);
MongoCollection<Document> auditCollection = database.getCollection("audit_log");
try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = auditStream.cursor()) {
while (cursor.hasNext()) {
ChangeStreamDocument<Document> change = cursor.next();
// Persist one audit entry per change event.
Document auditEntry = new Document()
.append("changeId", change.getResumeToken())
.append("operation", change.getOperationType().getValue())
.append("collection", change.getNamespace().getCollectionName())
.append("documentId", change.getDocumentKey())
.append("timestamp", new Date())
.append("clusterTime", change.getClusterTime());
auditCollection.insertOne(auditEntry);
}
}
}
// Horizontal scaling with change stream routing
private void setupShardedChangeStreamProcessing() {
// Distribute processing across multiple consumers based on document ID
int consumerCount = 4;
int consumerId = 0; // This consumer's ID (0-3)
// Server-side filter: keep only events whose _id mod consumerCount equals
// this consumer's id.
// NOTE(review): $mod requires a numeric operand, so this only works when
// _id values are numbers — the default ObjectId _id would make the
// pipeline error. Confirm the collection uses numeric _ids, or route on
// a hash of the key instead.
List<Bson> shardingPipeline = Arrays.asList(
Aggregates.match(Filters.expr(
new Document("$eq", Arrays.asList(
new Document("$mod", Arrays.asList("$documentKey._id", consumerCount)),
consumerId
))
))
);
ChangeStreamIterable<Document> shardedStream = collection.watch(shardingPipeline);
try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = shardedStream.cursor()) {
while (cursor.hasNext()) {
ChangeStreamDocument<Document> change = cursor.next();
// Only events routed to this consumer reach here.
processChangeForShard(change, consumerId);
}
}
}Best practices for handling change stream errors and maintaining resilience.
private void robustChangeStreamProcessing() {
// Consumes the change stream indefinitely, resuming from the last persisted
// token and reconnecting (with capped linear backoff) after transient errors.
// Gives up after maxReconnectAttempts consecutive failed reconnects.
BsonDocument resumeToken = loadLastResumeToken();
int reconnectAttempts = 0;
final int maxReconnectAttempts = 10;
while (reconnectAttempts < maxReconnectAttempts) {
try {
ChangeStreamIterable<Document> stream = collection.watch();
if (resumeToken != null) {
stream = stream.resumeAfter(resumeToken);
}
try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = stream.cursor()) {
reconnectAttempts = 0; // Reset on successful connection
while (cursor.hasNext()) {
try {
ChangeStreamDocument<Document> change = cursor.next();
// Process change with retry logic
processChangeWithRetry(change);
// Checkpoint only after the event was handled successfully
resumeToken = cursor.getResumeToken();
saveResumeToken(resumeToken);
} catch (Exception e) {
System.err.println("Error processing change event: " + e.getMessage());
// Continue processing other events
}
}
}
} catch (MongoChangeStreamException e) {
// Fix: MongoException exposes getCode(), not getErrorCode().
// NOTE(review): 40585 is the legacy "resume of change stream was not
// possible" server code — confirm it matches the deployed server version.
if (e.getCode() == 40585) { // Resume token not found
// Fix: System.warn(...) does not exist; log to stderr instead.
System.err.println("Resume token expired, starting from current time");
resumeToken = null; // Start fresh
} else {
System.err.println("Change stream error: " + e.getMessage());
}
reconnectAttempts++;
if (reconnectAttempts < maxReconnectAttempts) {
try {
// Linear backoff (1s, 2s, ...) capped at 30s — not exponential.
Thread.sleep(Math.min(1000 * reconnectAttempts, 30000));
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
} catch (MongoException e) {
System.err.println("MongoDB error: " + e.getMessage());
reconnectAttempts++;
if (reconnectAttempts < maxReconnectAttempts) {
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
}
if (reconnectAttempts >= maxReconnectAttempts) {
System.err.println("Max reconnect attempts exceeded, stopping change stream processing");
}
}
private void processChangeWithRetry(ChangeStreamDocument<Document> change) {
// Attempts processChangeEvent up to maxRetries times with linear backoff
// (1s, 2s, ...); on final failure the event is logged via
// logFailedChangeEvent and a RuntimeException is thrown so the caller can
// decide whether to keep consuming the stream.
int retryCount = 0;
final int maxRetries = 3;
while (retryCount < maxRetries) {
try {
processChangeEvent(change);
return; // Success
} catch (Exception e) {
retryCount++;
if (retryCount >= maxRetries) {
// Send to dead letter queue or log as failed
logFailedChangeEvent(change, e);
throw new RuntimeException("Failed to process change event after " + maxRetries + " attempts", e);
}
try {
Thread.sleep(1000 * retryCount); // Linear backoff
} catch (InterruptedException ie) {
// Restore the interrupt flag and abort the retry loop.
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted during retry", ie);
}
}
}
}Install with Tessl CLI
npx tessl i tessl/maven-org-mongodb--mongodb-driver-sync