or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

change-streams.mdcollection-crud.mdconnection-management.mddatabase-operations.mdencryption.mdgridfs.mdindex-management.mdindex.mdquery-aggregation.mdsessions-transactions.md

change-streams.mddocs/

0

# Change Streams

1

2

Real-time change monitoring at cluster, database, and collection levels with resume token support, filtering capabilities, and comprehensive change event information.

3

4

## Capabilities

5

6

### ChangeStreamIterable Interface

7

8

Primary interface for configuring and consuming change streams with filtering and resume capabilities.

9

10

```java { .api }

11

/**

12

* Interface for configuring and consuming change streams

13

*/

14

public interface ChangeStreamIterable<TResult> extends MongoIterable<ChangeStreamDocument<TResult>> {

15

/**

16

* Returns a change stream cursor for iterating over change events

17

* @return MongoChangeStreamCursor for consuming change events

18

*/

19

MongoChangeStreamCursor<ChangeStreamDocument<TResult>> cursor();

20

21

/**

22

* Sets the full document option for change events

23

* @param fullDocument option for including full documents in change events

24

* @return ChangeStreamIterable with full document option

25

*/

26

ChangeStreamIterable<TResult> fullDocument(FullDocument fullDocument);

27

28

/**

29

* Sets the full document before change option

30

* @param fullDocumentBeforeChange option for including pre-change documents

31

* @return ChangeStreamIterable with before change document option

32

*/

33

ChangeStreamIterable<TResult> fullDocumentBeforeChange(FullDocumentBeforeChange fullDocumentBeforeChange);

34

35

/**

36

* Sets the resume token to resume change stream from a specific point

37

* @param resumeToken the resume token as BsonDocument

38

* @return ChangeStreamIterable that resumes from the specified token

39

*/

40

ChangeStreamIterable<TResult> resumeAfter(BsonDocument resumeToken);

41

42

/**

43

* Sets the start after token to begin change stream after a specific event

44

* @param startAfter the start after token as BsonDocument

45

* @return ChangeStreamIterable that starts after the specified token

46

*/

47

ChangeStreamIterable<TResult> startAfter(BsonDocument startAfter);

48

49

/**

50

* Sets the cluster time to start the change stream from

51

* @param startAtOperationTime the cluster time to start from

52

* @return ChangeStreamIterable that starts at the specified time

53

*/

54

ChangeStreamIterable<TResult> startAtOperationTime(BsonTimestamp startAtOperationTime);

55

56

/**

57

* Sets the maximum time to wait for changes when using await

58

* @param maxAwaitTime the maximum await time

59

* @param timeUnit the time unit

60

* @return ChangeStreamIterable with await time limit

61

*/

62

ChangeStreamIterable<TResult> maxAwaitTime(long maxAwaitTime, TimeUnit timeUnit);

63

64

/**

65

* Sets collation for string operations in the change stream pipeline

66

* @param collation the collation specification

67

* @return ChangeStreamIterable with applied collation

68

*/

69

ChangeStreamIterable<TResult> collation(Collation collation);

70

71

/**

72

* Sets the batch size for change stream cursor

73

* @param batchSize the batch size

74

* @return ChangeStreamIterable with specified batch size

75

*/

76

ChangeStreamIterable<TResult> batchSize(int batchSize);

77

78

/**

79

* Adds a comment to the change stream operation

80

* @param comment the comment string

81

* @return ChangeStreamIterable with comment

82

*/

83

ChangeStreamIterable<TResult> comment(String comment);

84

85

/**

86

* Enables showing expanded events in change streams

87

* @param showExpandedEvents true to show expanded events

88

* @return ChangeStreamIterable with expanded events option

89

*/

90

ChangeStreamIterable<TResult> showExpandedEvents(Boolean showExpandedEvents);

91

}

92

```

93

94

**Usage Examples:**

95

96

```java

97

import com.mongodb.client.ChangeStreamIterable;

98

import com.mongodb.client.MongoChangeStreamCursor;

99

import com.mongodb.client.model.changestream.ChangeStreamDocument;

100

import com.mongodb.client.model.changestream.FullDocument;

101

102

// Basic change stream monitoring

103

ChangeStreamIterable<Document> changeStream = collection.watch();

104

105

try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = changeStream.cursor()) {

106

while (cursor.hasNext()) {

107

ChangeStreamDocument<Document> change = cursor.next();

108

System.out.println("Operation: " + change.getOperationType());

109

System.out.println("Document: " + change.getFullDocument());

110

System.out.println("Resume token: " + change.getResumeToken());

111

}

112

}

113

114

// Change stream with full document lookup

115

ChangeStreamIterable<Document> fullDocStream = collection.watch()

116

.fullDocument(FullDocument.UPDATE_LOOKUP)

117

.fullDocumentBeforeChange(FullDocumentBeforeChange.WHEN_AVAILABLE);

118

119

// Filtered change stream

120

List<Bson> pipeline = Arrays.asList(

121

Aggregates.match(Filters.in("operationType", Arrays.asList("insert", "update", "delete")))

122

);

123

124

ChangeStreamIterable<Document> filteredStream = collection.watch(pipeline)

125

.maxAwaitTime(5, TimeUnit.SECONDS)

126

.batchSize(10);

127

```

128

129

### MongoChangeStreamCursor Interface

130

131

Enhanced cursor interface specifically for change streams with resume token access.

132

133

```java { .api }

134

/**

135

* Cursor interface for change streams with resume token support

136

*/

137

public interface MongoChangeStreamCursor<TResult> extends MongoCursor<TResult> {

138

/**

139

* Gets the current resume token for the change stream

140

* @return BsonDocument containing the resume token

141

*/

142

BsonDocument getResumeToken();

143

144

/**

145

* Returns the next change event without blocking

146

* @return next change event or null if none available

147

*/

148

TResult tryNext();

149

}

150

```

151

152

**Usage Examples:**

153

154

```java

155

// Robust change stream with resume capability

156

BsonDocument resumeToken = null;

157

158

while (true) {

159

try {

160

ChangeStreamIterable<Document> stream = collection.watch();

161

162

// Resume from last known position if available

163

if (resumeToken != null) {

164

stream = stream.resumeAfter(resumeToken);

165

}

166

167

try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = stream.cursor()) {

168

while (cursor.hasNext()) {

169

ChangeStreamDocument<Document> change = cursor.next();

170

171

// Process change event

172

processChangeEvent(change);

173

174

// Store resume token for fault tolerance

175

resumeToken = cursor.getResumeToken();

176

persistResumeToken(resumeToken);

177

}

178

}

179

180

} catch (MongoException e) {

181

System.err.println("Change stream error: " + e.getMessage());

182

// Wait before reconnecting

183

Thread.sleep(1000);

184

}

185

}

186

```

187

188

### Change Stream Levels

189

190

Change streams can be created at different levels for various monitoring scopes.

191

192

```java { .api }

193

// Cluster-level change streams (monitor all databases)

194

ChangeStreamIterable<Document> clusterChanges = mongoClient.watch();

195

196

// Database-level change streams (monitor all collections in database)

197

ChangeStreamIterable<Document> databaseChanges = database.watch();

198

199

// Collection-level change streams (monitor specific collection)

200

ChangeStreamIterable<Document> collectionChanges = collection.watch();

201

```

202

203

**Usage Examples:**

204

205

```java

206

// Monitor all database operations

207

try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = mongoClient.watch().cursor()) {

208

while (cursor.hasNext()) {

209

ChangeStreamDocument<Document> change = cursor.next();

210

MongoNamespace namespace = change.getNamespace();

211

212

System.out.println("Database: " + namespace.getDatabaseName());

213

System.out.println("Collection: " + namespace.getCollectionName());

214

System.out.println("Operation: " + change.getOperationType());

215

}

216

}

217

218

// Monitor specific database with filtering

219

List<Bson> dbPipeline = Arrays.asList(

220

Aggregates.match(Filters.and(

221

Filters.in("operationType", Arrays.asList("insert", "update", "delete")),

222

Filters.not(Filters.regex("ns.coll", "^system\\.")) // Exclude system collections

223

))

224

);

225

226

ChangeStreamIterable<Document> filteredDbStream = database.watch(dbPipeline);

227

228

// Monitor with session for causally consistent reads

229

try (ClientSession session = mongoClient.startSession()) {

230

ChangeStreamIterable<Document> sessionStream = collection.watch(session);

231

// Process changes within session context

232

}

233

```

234

235

### Change Event Processing

236

237

Comprehensive handling of different types of change events and their data.

238

239

```java { .api }

240

private void processChangeEvent(ChangeStreamDocument<Document> change) {

241

OperationType operationType = change.getOperationType();

242

243

switch (operationType) {

244

case INSERT:

245

handleInsert(change);

246

break;

247

case UPDATE:

248

handleUpdate(change);

249

break;

250

case REPLACE:

251

handleReplace(change);

252

break;

253

case DELETE:

254

handleDelete(change);

255

break;

256

case INVALIDATE:

257

handleInvalidate(change);

258

break;

259

case DROP:

260

handleDrop(change);

261

break;

262

case DROP_DATABASE:

263

handleDropDatabase(change);

264

break;

265

case RENAME:

266

handleRename(change);

267

break;

268

default:

269

System.out.println("Unknown operation type: " + operationType);

270

}

271

}

272

273

private void handleInsert(ChangeStreamDocument<Document> change) {

274

Document newDocument = change.getFullDocument();

275

BsonDocument documentKey = change.getDocumentKey();

276

277

System.out.println("New document inserted:");

278

System.out.println("ID: " + documentKey.get("_id"));

279

System.out.println("Document: " + newDocument.toJson());

280

281

// Trigger post-insert processing

282

onDocumentInserted(newDocument);

283

}

284

285

private void handleUpdate(ChangeStreamDocument<Document> change) {

286

BsonDocument documentKey = change.getDocumentKey();

287

UpdateDescription updateDescription = change.getUpdateDescription();

288

Document fullDocumentAfter = change.getFullDocument(); // If UPDATE_LOOKUP enabled

289

Document fullDocumentBefore = change.getFullDocumentBeforeChange(); // If enabled

290

291

System.out.println("Document updated:");

292

System.out.println("ID: " + documentKey.get("_id"));

293

294

if (updateDescription != null) {

295

System.out.println("Updated fields: " + updateDescription.getUpdatedFields());

296

System.out.println("Removed fields: " + updateDescription.getRemovedFields());

297

System.out.println("Truncated arrays: " + updateDescription.getTruncatedArrays());

298

}

299

300

// Compare before and after if available

301

if (fullDocumentBefore != null && fullDocumentAfter != null) {

302

analyzeChanges(fullDocumentBefore, fullDocumentAfter);

303

}

304

305

// Trigger post-update processing

306

onDocumentUpdated(documentKey, updateDescription);

307

}

308

309

private void handleDelete(ChangeStreamDocument<Document> change) {

310

BsonDocument documentKey = change.getDocumentKey();

311

Document fullDocumentBefore = change.getFullDocumentBeforeChange();

312

313

System.out.println("Document deleted:");

314

System.out.println("ID: " + documentKey.get("_id"));

315

316

if (fullDocumentBefore != null) {

317

System.out.println("Deleted document: " + fullDocumentBefore.toJson());

318

}

319

320

// Trigger post-delete processing

321

onDocumentDeleted(documentKey);

322

}

323

```

324

325

### Advanced Change Stream Patterns

326

327

Complex change stream patterns for real-world applications.

328

329

```java { .api }

330

// Real-time cache invalidation

331

private void setupCacheInvalidationStream() {

332

List<Bson> pipeline = Arrays.asList(

333

Aggregates.match(Filters.or(

334

Filters.eq("operationType", "update"),

335

Filters.eq("operationType", "delete"),

336

Filters.eq("operationType", "replace")

337

))

338

);

339

340

ChangeStreamIterable<Document> cacheStream = collection.watch(pipeline)

341

.fullDocument(FullDocument.UPDATE_LOOKUP);

342

343

// Run in background thread

344

CompletableFuture.runAsync(() -> {

345

try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = cacheStream.cursor()) {

346

while (!Thread.currentThread().isInterrupted()) {

347

ChangeStreamDocument<Document> change = cursor.tryNext();

348

if (change != null) {

349

String cacheKey = extractCacheKey(change.getDocumentKey());

350

cacheManager.invalidate(cacheKey);

351

System.out.println("Cache invalidated for key: " + cacheKey);

352

}

353

Thread.sleep(100); // Prevent tight loop

354

}

355

} catch (InterruptedException e) {

356

Thread.currentThread().interrupt();

357

}

358

});

359

}

360

361

// Data synchronization between systems

362

private void setupDataSyncStream() {

363

// Start from current time to avoid processing historical data

364

BsonTimestamp now = new BsonTimestamp((int) (System.currentTimeMillis() / 1000), 0);

365

366

ChangeStreamIterable<Document> syncStream = collection.watch()

367

.startAtOperationTime(now)

368

.fullDocument(FullDocument.UPDATE_LOOKUP)

369

.fullDocumentBeforeChange(FullDocumentBeforeChange.WHEN_AVAILABLE);

370

371

try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = syncStream.cursor()) {

372

while (cursor.hasNext()) {

373

ChangeStreamDocument<Document> change = cursor.next();

374

375

// Sync to external system

376

syncToExternalSystem(change);

377

378

// Update high-water mark

379

updateSyncPosition(cursor.getResumeToken());

380

}

381

}

382

}

383

384

// Audit trail generation

385

private void setupAuditTrailStream() {

386

List<Bson> auditPipeline = Arrays.asList(

387

Aggregates.match(Filters.in("operationType",

388

Arrays.asList("insert", "update", "delete", "replace"))),

389

Aggregates.project(Projections.fields(

390

Projections.include("_id", "operationType", "ns", "documentKey"),

391

Projections.computed("timestamp", new Document("$toDate", "$clusterTime")),

392

Projections.computed("user", "$$USER"), // If authentication enabled

393

Projections.excludeId()

394

))

395

);

396

397

ChangeStreamIterable<Document> auditStream = collection.watch(auditPipeline);

398

399

MongoCollection<Document> auditCollection = database.getCollection("audit_log");

400

401

try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = auditStream.cursor()) {

402

while (cursor.hasNext()) {

403

ChangeStreamDocument<Document> change = cursor.next();

404

405

Document auditEntry = new Document()

406

.append("changeId", change.getResumeToken())

407

.append("operation", change.getOperationType().getValue())

408

.append("collection", change.getNamespace().getCollectionName())

409

.append("documentId", change.getDocumentKey())

410

.append("timestamp", new Date())

411

.append("clusterTime", change.getClusterTime());

412

413

auditCollection.insertOne(auditEntry);

414

}

415

}

416

}

417

418

// Horizontal scaling with change stream routing

419

private void setupShardedChangeStreamProcessing() {

420

// Distribute processing across multiple consumers based on document ID

421

int consumerCount = 4;

422

int consumerId = 0; // This consumer's ID (0-3)

423

424

List<Bson> shardingPipeline = Arrays.asList(

425

Aggregates.match(Filters.expr(

426

new Document("$eq", Arrays.asList(

427

new Document("$mod", Arrays.asList("$documentKey._id", consumerCount)),

428

consumerId

429

))

430

))

431

);

432

433

ChangeStreamIterable<Document> shardedStream = collection.watch(shardingPipeline);

434

435

try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = shardedStream.cursor()) {

436

while (cursor.hasNext()) {

437

ChangeStreamDocument<Document> change = cursor.next();

438

processChangeForShard(change, consumerId);

439

}

440

}

441

}

442

```

443

444

### Error Handling and Resilience

445

446

Best practices for handling change stream errors and maintaining resilience.

447

448

```java { .api }

449

private void robustChangeStreamProcessing() {

450

BsonDocument resumeToken = loadLastResumeToken();

451

int reconnectAttempts = 0;

452

final int maxReconnectAttempts = 10;

453

454

while (reconnectAttempts < maxReconnectAttempts) {

455

try {

456

ChangeStreamIterable<Document> stream = collection.watch();

457

458

if (resumeToken != null) {

459

stream = stream.resumeAfter(resumeToken);

460

}

461

462

try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = stream.cursor()) {

463

reconnectAttempts = 0; // Reset on successful connection

464

465

while (cursor.hasNext()) {

466

try {

467

ChangeStreamDocument<Document> change = cursor.next();

468

469

// Process change with retry logic

470

processChangeWithRetry(change);

471

472

// Update resume token

473

resumeToken = cursor.getResumeToken();

474

saveResumeToken(resumeToken);

475

476

} catch (Exception e) {

477

System.err.println("Error processing change event: " + e.getMessage());

478

// Continue processing other events

479

}

480

}

481

}

482

483

} catch (MongoChangeStreamException e) {

484

if (e.getErrorCode() == 40585) { // Resume token not found

485

System.warn("Resume token expired, starting from current time");

486

resumeToken = null; // Start fresh

487

} else {

488

System.err.println("Change stream error: " + e.getMessage());

489

}

490

491

reconnectAttempts++;

492

if (reconnectAttempts < maxReconnectAttempts) {

493

try {

494

Thread.sleep(Math.min(1000 * reconnectAttempts, 30000)); // Exponential backoff

495

} catch (InterruptedException ie) {

496

Thread.currentThread().interrupt();

497

break;

498

}

499

}

500

501

} catch (MongoException e) {

502

System.err.println("MongoDB error: " + e.getMessage());

503

reconnectAttempts++;

504

505

if (reconnectAttempts < maxReconnectAttempts) {

506

try {

507

Thread.sleep(5000);

508

} catch (InterruptedException ie) {

509

Thread.currentThread().interrupt();

510

break;

511

}

512

}

513

}

514

}

515

516

if (reconnectAttempts >= maxReconnectAttempts) {

517

System.err.println("Max reconnect attempts exceeded, stopping change stream processing");

518

}

519

}

520

521

private void processChangeWithRetry(ChangeStreamDocument<Document> change) {

522

int retryCount = 0;

523

final int maxRetries = 3;

524

525

while (retryCount < maxRetries) {

526

try {

527

processChangeEvent(change);

528

return; // Success

529

530

} catch (Exception e) {

531

retryCount++;

532

if (retryCount >= maxRetries) {

533

// Send to dead letter queue or log as failed

534

logFailedChangeEvent(change, e);

535

throw new RuntimeException("Failed to process change event after " + maxRetries + " attempts", e);

536

}

537

538

try {

539

Thread.sleep(1000 * retryCount); // Linear backoff

540

} catch (InterruptedException ie) {

541

Thread.currentThread().interrupt();

542

throw new RuntimeException("Interrupted during retry", ie);

543

}

544

}

545

}

546

}

547

```