or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

asyncio-operations.md, change-streams.md, client-encryption.md, cursor-operations.md, gridfs-operations.md, index.md, tornado-operations.md, web-integration.md

docs/change-streams.md

0

# Change Streams

1

2

Real-time change monitoring for watching database, collection, or document changes. Motor's change streams enable reactive applications that respond immediately to data modifications with full support for both asyncio and Tornado frameworks.

3

4

## Capabilities

5

6

### Change Stream Creation

7

8

Change streams can be created at client, database, or collection level to monitor different scopes of changes.

9

10

```python { .api }

11

# Client-level change streams (all databases)
def watch(
    pipeline: Optional[List[Dict[str, Any]]] = None,
    full_document: Optional[str] = None,
    resume_after: Optional[Dict[str, Any]] = None,
    max_await_time_ms: Optional[int] = None,
    batch_size: Optional[int] = None,
    collation: Optional[Dict[str, Any]] = None,
    start_at_operation_time: Optional[Any] = None,
    session: Optional[Any] = None,
    start_after: Optional[Dict[str, Any]] = None,
    **kwargs
) -> Union[AsyncIOMotorChangeStream, MotorChangeStream]:
    """
    Watch for changes across all collections in all databases.

    Parameters:
    - pipeline: Aggregation pipeline stages used to filter or reshape
      change events
    - full_document: 'default', 'updateLookup', 'whenAvailable', or 'required'
    - resume_after: Resume token; the stream continues with the changes
      that occurred directly after the operation the token identifies
    - max_await_time_ms: Maximum time, in milliseconds, for the server to
      wait for new changes before answering with an empty batch
    - batch_size: Maximum number of changes to return in each batch
    - collation: Collation options for string comparisons
    - start_at_operation_time: Only return changes that occurred at or
      after this timestamp
    - session: Client session to use for the operation
    - start_after: Same as resume_after, except that it can also resume
      after an 'invalidate' event
    - **kwargs: Additional change stream options (supported names depend
      on the installed driver version -- TODO confirm)

    Returns:
    Change stream object matching the framework of the client in use.
    """

38

39

# Database-level change streams (all collections in database)
def watch(
    pipeline: Optional[List[Dict[str, Any]]] = None,
    **kwargs
) -> Union[AsyncIOMotorChangeStream, MotorChangeStream]:
    """
    Watch for changes on all collections in the database.

    Parameters:
    - pipeline: Aggregation pipeline stages used to filter or reshape
      change events
    - **kwargs: Further change stream options, e.g. full_document or
      resume_after
    """

45

46

# Collection-level change streams (specific collection)
def watch(
    pipeline: Optional[List[Dict[str, Any]]] = None,
    **kwargs
) -> Union[AsyncIOMotorChangeStream, MotorChangeStream]:
    """
    Watch for changes on the collection.

    Parameters:
    - pipeline: Aggregation pipeline stages used to filter or reshape
      change events
    - **kwargs: Further change stream options, e.g. full_document or
      resume_after
    """

52

```

53

54

### AsyncIO Change Stream

55

56

Change stream implementation optimized for asyncio with native async/await support.

57

58

```python { .api }

59

class AsyncIOMotorChangeStream:
    """API outline of the asyncio change stream (method bodies omitted)."""

    # Async Iterator Protocol
    # The class name is quoted in return annotations because it is not yet
    # bound while the class body is being executed.
    def __aiter__(self) -> "AsyncIOMotorChangeStream":
        """Return self for async iteration."""

    async def __anext__(self) -> Dict[str, Any]:
        """Get the next change event."""

    # Manual Iteration
    async def next(self) -> Dict[str, Any]:
        """
        Get the next change event, waiting until one is available.

        Returns:
        Dictionary containing change event with fields like:
        - _id: Resume token for this change
        - operationType: Type of operation (insert, update, delete, etc.)
        - ns: Namespace (database and collection)
        - documentKey: Key of changed document
        - fullDocument: Full document (if full_document option used)
        - updateDescription: Description of update (for update operations)
        - clusterTime: Timestamp of the change

        Raises:
        StopAsyncIteration if the change stream is closed.
        """

    async def try_next(self) -> Optional[Dict[str, Any]]:
        """
        Try to get the next change event without blocking indefinitely.

        Returns:
        Change event dict or None if no changes available
        """

    # Change Stream Properties
    @property
    def resume_token(self) -> Optional[Dict[str, Any]]:
        """Current resume token for continuing the change stream."""

    @property
    def alive(self) -> bool:
        """Whether the change stream is still alive."""

    # Change Stream Management
    async def close(self) -> None:
        """Close the change stream."""

    async def __aenter__(self) -> "AsyncIOMotorChangeStream":
        """Async context manager entry."""

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit."""

109

110

# Tornado Change Stream
class MotorChangeStream:
    """API outline of the Tornado change stream (method bodies omitted)."""

    # Manual Iteration (returns Tornado Futures)
    def next(self) -> tornado.concurrent.Future:
        """Get the next change event."""

    def try_next(self) -> tornado.concurrent.Future:
        """Try to get the next change event without blocking indefinitely."""

    # Properties (identical to AsyncIO version)
    @property
    def resume_token(self) -> Optional[Dict[str, Any]]: ...

    @property
    def alive(self) -> bool: ...

    # Management
    def close(self) -> tornado.concurrent.Future: ...

127

```

128

129

### Change Event Structure

130

131

Change events follow a standardized structure containing information about the modification.

132

133

```python { .api }

134

class ChangeEvent:
    """
    Structure of change stream events.
    Note: This is a conceptual type - actual events are dictionaries.
    """
    _id: Dict[str, Any]  # Resume token
    operationType: str  # 'insert', 'update', 'replace', 'delete', 'invalidate', etc.
    clusterTime: Any  # Timestamp when the change occurred
    ns: Dict[str, str]  # Namespace: {'db': 'database_name', 'coll': 'collection_name'}
    documentKey: Dict[str, Any]  # Key identifying the changed document

    # Optional fields (depending on operation type and options)
    # Present for inserts and replaces; for updates only when a
    # full_document lookup option such as 'updateLookup' is requested.
    fullDocument: Optional[Dict[str, Any]]
    # Document before change (MongoDB 6.0+, pre-images must be enabled)
    fullDocumentBeforeChange: Optional[Dict[str, Any]]
    updateDescription: Optional[Dict[str, Any]]  # Update details for update operations
    txnNumber: Optional[int]  # Transaction number (for transactional changes)
    lsid: Optional[Dict[str, Any]]  # Logical session identifier

151

```

152

153

## Usage Examples

154

155

### Basic Change Stream Monitoring

156

157

```python

158

import asyncio

159

import motor.motor_asyncio

160

161

async def basic_change_stream_example():
    """Watch one collection and print the first three change events.

    A background task inserts, updates, and deletes a single document so
    that the stream has something to report.
    """
    client = motor.motor_asyncio.AsyncIOMotorClient()
    db = client.test_database
    collection = db.test_collection

    # Start watching for changes
    print("Starting change stream...")
    change_stream = collection.watch()

    # Start a background task to make changes
    async def make_changes():
        # Pause before the first write so the watcher below is already
        # iterating when the changes happen.
        await asyncio.sleep(1)

        print("Inserting document...")
        await collection.insert_one({"name": "Alice", "age": 30})

        await asyncio.sleep(1)
        print("Updating document...")
        await collection.update_one({"name": "Alice"}, {"$set": {"age": 31}})

        await asyncio.sleep(1)
        print("Deleting document...")
        await collection.delete_one({"name": "Alice"})

    # Start change-making task
    change_task = asyncio.create_task(make_changes())

    # Watch for changes
    try:
        change_count = 0
        async for change in change_stream:
            change_count += 1
            print(f"Change {change_count}:")
            print(f" Operation: {change['operationType']}")
            print(f" Document Key: {change['documentKey']}")

            if 'fullDocument' in change:
                print(f" Full Document: {change['fullDocument']}")

            if 'updateDescription' in change:
                print(f" Update: {change['updateDescription']}")

            # Stop after seeing 3 changes
            if change_count >= 3:
                break
    finally:
        try:
            await change_stream.close()
            await change_task
        finally:
            # Release the client even if closing or the writer task failed.
            client.close()

212

213

asyncio.run(basic_change_stream_example())

214

```

215

216

### Advanced Change Stream Options

217

218

```python

219

import asyncio

220

import motor.motor_asyncio

221

from datetime import datetime

222

223

async def advanced_change_stream_example():
    """Watch `users` with full-document lookup until the writer task is done.

    The stream is polled with try_next() so that the loop can stop once the
    background writer has finished and a poll comes back empty. A plain
    `async for` over the stream would wait for further changes forever.
    """
    client = motor.motor_asyncio.AsyncIOMotorClient()
    db = client.test_database
    collection = db.users

    # Background task to generate changes
    async def generate_changes():
        users = [
            {"name": "Alice", "age": 30, "status": "active"},
            {"name": "Bob", "age": 25, "status": "active"},
            {"name": "Charlie", "age": 35, "status": "inactive"},
        ]

        # Insert users
        await collection.insert_many(users)
        await asyncio.sleep(2)

        # Update users
        await collection.update_many(
            {"status": "active"},
            {"$inc": {"age": 1}},
        )
        await asyncio.sleep(2)

        # Delete inactive users
        await collection.delete_many({"status": "inactive"})

    print("Watching for changes with full document lookup...")

    try:
        # Entering the context manager opens the stream *before* the writer
        # task is started, so none of its changes can be missed.
        async with collection.watch(
            full_document='updateLookup',  # Include full document for updates
            max_await_time_ms=1000,        # Wait max 1 second for changes
            batch_size=10,                 # Process changes in batches of 10
        ) as change_stream:
            change_task = asyncio.create_task(generate_changes())
            try:
                while change_stream.alive:
                    # Sample the writer state before polling: an empty poll
                    # that started after the writer finished means every
                    # change has already been delivered.
                    writer_finished = change_task.done()
                    change = await change_stream.try_next()

                    if change is None:
                        if writer_finished:
                            print("No more changes detected")
                            break
                        continue

                    print("\nChange detected:")
                    print(f" Type: {change['operationType']}")
                    print(f" Time: {change['clusterTime']}")
                    print(f" Namespace: {change['ns']}")

                    if change['operationType'] == 'insert':
                        print(f" Inserted: {change['fullDocument']}")

                    elif change['operationType'] == 'update':
                        print(f" Updated document: {change['fullDocument']}")
                        print(f" Updated fields: {change['updateDescription']['updatedFields']}")

                    elif change['operationType'] == 'delete':
                        print(f" Deleted document key: {change['documentKey']}")

                    # Store resume token for potential resumption
                    resume_token = change['_id']
                    print(f" Resume token: {resume_token}")
            finally:
                # Wait for the writer (and surface any error it raised).
                await change_task
    finally:
        client.close()

290

291

asyncio.run(advanced_change_stream_example())

292

```

293

294

### Change Stream with Pipeline Filtering

295

296

```python

297

import asyncio

298

import motor.motor_asyncio

299

300

async def filtered_change_stream_example():
    """Watch `products` through a pipeline that keeps only Electronics changes.

    The pipeline keeps insert/update events whose document is (or becomes)
    an Electronics product and adds two convenience fields to each event.
    """
    client = motor.motor_asyncio.AsyncIOMotorClient()
    db = client.test_database
    collection = db.products

    # Pipeline to filter only certain types of changes
    pipeline = [
        # Only watch for insert and update operations
        {"$match": {
            "operationType": {"$in": ["insert", "update"]}
        }},
        # Only watch for products in Electronics category
        {"$match": {
            "$or": [
                {"fullDocument.category": "Electronics"},
                {"updateDescription.updatedFields.category": "Electronics"}
            ]
        }},
        # Add custom fields to the change event
        {"$addFields": {
            # A single "$" reads a field of the event. "$$clusterTime" would
            # name an aggregation variable that does not exist.
            "changeTimestamp": "$clusterTime",
            "productName": "$fullDocument.name"
        }}
    ]

    async def make_product_changes():
        products = [
            {"name": "Laptop", "category": "Electronics", "price": 999},
            {"name": "Book", "category": "Literature", "price": 20},
            {"name": "Phone", "category": "Electronics", "price": 699},
            {"name": "Desk", "category": "Furniture", "price": 299},
        ]

        # Insert products
        await collection.insert_many(products)
        await asyncio.sleep(1)

        # Update electronics prices (should be detected)
        await collection.update_many(
            {"category": "Electronics"},
            {"$mul": {"price": 0.9}},  # 10% discount
        )
        await asyncio.sleep(1)

        # Update furniture prices (should NOT be detected due to filter)
        await collection.update_many(
            {"category": "Furniture"},
            {"$mul": {"price": 0.8}},  # 20% discount
        )
        await asyncio.sleep(1)

        # Change category (should be detected when changing TO Electronics)
        await collection.update_one(
            {"name": "Book"},
            {"$set": {"category": "Electronics"}},  # Now it's electronics
        )

    print("Watching for Electronics product changes only...")

    try:
        # Open the stream first, then start the writer. Otherwise the first
        # inserts could land before the stream exists and the exit
        # condition below would never be reached.
        async with collection.watch(
            pipeline=pipeline,
            full_document='updateLookup',
        ) as change_stream:
            change_task = asyncio.create_task(make_product_changes())
            try:
                change_count = 0
                async for change in change_stream:
                    change_count += 1
                    print(f"\nFiltered Change {change_count}:")
                    print(f" Operation: {change['operationType']}")
                    print(f" Product: {change.get('productName', 'Unknown')}")
                    print(f" Category: {change['fullDocument']['category']}")
                    print(f" Price: ${change['fullDocument']['price']}")

                    # Stop after reasonable number of changes
                    if change_count >= 5:
                        break
            finally:
                await change_task
    finally:
        client.close()

384

385

asyncio.run(filtered_change_stream_example())

386

```

387

388

### Resume Token and Error Recovery

389

390

```python

391

import asyncio

392

import motor.motor_asyncio

393

import pymongo.errors

394

395

async def resume_token_example():
    """Show how a stored resume token lets a new stream pick up where an
    interrupted one stopped.

    Two watch sessions run back to back. Each one fails on purpose after
    three changes; the second starts from the token saved by the first.
    """
    client = motor.motor_asyncio.AsyncIOMotorClient()
    collection = client.test_database.events

    resume_token = None

    async def watch_with_resume():
        nonlocal resume_token

        # Create change stream, resuming from token if available
        if resume_token is not None:
            print(f"Resuming from token: {resume_token}")
        else:
            print("Starting new change stream")
        # resume_after=None is the default, i.e. a fresh stream.
        change_stream = collection.watch(resume_after=resume_token)

        try:
            change_count = 0
            async for change in change_stream:
                change_count += 1
                print(f"Change {change_count}: {change['operationType']}")

                # Store resume token after each change
                resume_token = change['_id']

                # Simulate error after 3 changes
                if change_count == 3:
                    raise RuntimeError("Simulated connection error")

        # Handle the simulated failure and driver errors only; anything
        # else is a programming error and should surface.
        except (RuntimeError, pymongo.errors.PyMongoError) as e:
            print(f"Error occurred: {e}")
            print(f"Last resume token: {resume_token}")

        finally:
            await change_stream.close()

    # Background task to generate changes
    async def generate_events():
        for i in range(10):
            await asyncio.sleep(1)
            await collection.insert_one({"event": f"Event {i}", "timestamp": i})

    event_task = asyncio.create_task(generate_events())

    try:
        # First watch session (will be interrupted)
        await watch_with_resume()

        print("\nRestarting change stream from resume token...")
        await asyncio.sleep(2)

        # Second watch session (resumes from where we left off)
        await watch_with_resume()

        await event_task
    finally:
        # No-op if the task already finished; stops the writer otherwise.
        event_task.cancel()
        client.close()

457

458

asyncio.run(resume_token_example())

459

```

460

461

### Multi-Collection Change Monitoring

462

463

```python

464

import asyncio

465

import motor.motor_asyncio

466

467

async def multi_collection_example():
    """Watch a whole database and report changes from selected collections.

    A background task writes to `users`, `products`, and `orders`; the
    watcher prints the four resulting events and stops.
    """
    client = motor.motor_asyncio.AsyncIOMotorClient()
    db = client.test_database

    # Filter to only certain collections
    pipeline = [
        {"$match": {
            "ns.coll": {"$in": ["users", "orders", "products"]}
        }}
    ]

    async def make_changes():
        users = db.users
        orders = db.orders
        products = db.products

        # Changes across multiple collections
        await users.insert_one({"name": "Alice", "email": "alice@example.com"})
        await asyncio.sleep(0.5)

        await products.insert_one({"name": "Laptop", "price": 999})
        await asyncio.sleep(0.5)

        await orders.insert_one({
            "user": "alice@example.com",
            "product": "Laptop",
            "quantity": 1,
            "total": 999,
        })
        await asyncio.sleep(0.5)

        # Update across collections
        await users.update_one(
            {"email": "alice@example.com"},
            {"$set": {"last_order": "Laptop"}},
        )

    print("Watching for changes across multiple collections...")

    try:
        # Watch at database level to see changes across all collections.
        # The stream is opened before the writer starts; the writer's first
        # insert happens immediately, and missing it would leave the loop
        # waiting for a fourth event that never arrives.
        async with db.watch(pipeline=pipeline) as change_stream:
            change_task = asyncio.create_task(make_changes())
            try:
                change_count = 0
                async for change in change_stream:
                    change_count += 1
                    collection_name = change['ns']['coll']
                    operation = change['operationType']

                    print(f"Change {change_count}:")
                    print(f" Collection: {collection_name}")
                    print(f" Operation: {operation}")

                    if operation == 'insert':
                        print(f" Document: {change['fullDocument']}")
                    elif operation == 'update':
                        print(f" Updated fields: {change['updateDescription']['updatedFields']}")

                    if change_count >= 4:
                        break
            finally:
                await change_task
    finally:
        client.close()

534

535

asyncio.run(multi_collection_example())

536

```

537

538

### Change Stream Context Manager

539

540

```python

541

import asyncio

542

import motor.motor_asyncio

543

544

async def context_manager_example():
    """Use a change stream as an async context manager.

    Three notifications are inserted by a background task; the watcher
    prints each one and leaves the `async with` block, which closes the
    stream.
    """
    client = motor.motor_asyncio.AsyncIOMotorClient()
    collection = client.test_database.notifications

    # Background task to generate changes
    async def send_notifications():
        notifications = [
            {"type": "email", "recipient": "user1@example.com", "message": "Welcome!"},
            {"type": "sms", "recipient": "+1234567890", "message": "Code: 123"},
            {"type": "push", "recipient": "device123", "message": "New message"},
        ]

        for notification in notifications:
            await asyncio.sleep(1)
            await collection.insert_one(notification)

    try:
        # Using change stream as async context manager
        async with collection.watch() as change_stream:
            print("Change stream started with context manager")

            notify_task = asyncio.create_task(send_notifications())

            # Process changes
            change_count = 0
            async for change in change_stream:
                change_count += 1
                doc = change['fullDocument']
                print(f"Notification {change_count}: {doc['type']} to {doc['recipient']}")

                if change_count >= 3:
                    break

            await notify_task

        # Change stream automatically closed when exiting context
        print("Change stream closed by context manager")
    finally:
        client.close()

581

582

asyncio.run(context_manager_example())

583

```

584

585

## Types

586

587

```python { .api }

588

from typing import Any, Optional, Union, Dict, List, AsyncIterator

589

import tornado.concurrent

590

591

# Change event structure
ChangeEvent = Dict[str, Any]
ResumeToken = Dict[str, Any]
OperationType = str  # 'insert', 'update', 'replace', 'delete', 'invalidate', etc.
Namespace = Dict[str, str]  # {'db': str, 'coll': str}

# Change stream options
FullDocumentOption = str  # 'default', 'updateLookup', 'whenAvailable', 'required'
Pipeline = List[Dict[str, Any]]

600

```