or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced-queries.mdbson-handling.mdbulk-transactions.mdclient-connection.mddatabase-collection.mdgridfs-storage.mdindex.mdmonitoring-events.md

monitoring-events.mddocs/

0

# Monitoring and Change Streams

1

2

Change streams, monitoring capabilities, and event handling for real-time data updates and application performance monitoring.

3

4

## Capabilities

5

6

### Change Streams

7

8

Monitor real-time changes to collections, databases, or entire deployments.

9

10

```python { .api }

11

class Collection:

12

def watch(self, pipeline=None, full_document=None, resume_after=None, max_await_time_ms=None, batch_size=None, collation=None, start_at_operation_time=None, session=None, start_after=None, show_expanded_events=None):

13

"""

14

Open change stream to monitor collection changes.

15

16

Parameters:

17

- pipeline: aggregation pipeline to filter/transform change events

18

- full_document: when to return full document ('default', 'updateLookup', 'whenAvailable', 'required')

19

- resume_after: resume token to continue from specific point

20

- max_await_time_ms: maximum time to wait for changes

21

- batch_size: change event batch size

22

- collation: collation options

23

- start_at_operation_time: start watching from specific time

24

- session: optional ClientSession

25

- start_after: start after specific change event

26

- show_expanded_events: include additional change event types

27

28

Returns:

29

ChangeStream: Change stream cursor

30

"""

31

32

class Database:

33

def watch(self, pipeline=None, **kwargs):

34

"""

35

Open change stream to monitor database changes.

36

37

Parameters:

38

- pipeline: aggregation pipeline for filtering

39

- kwargs: same options as Collection.watch()

40

41

Returns:

42

ChangeStream: Change stream cursor

43

"""

44

45

class MongoClient:

46

def watch(self, pipeline=None, **kwargs):

47

"""

48

Open change stream to monitor all database changes.

49

50

Parameters:

51

- pipeline: aggregation pipeline for filtering

52

- kwargs: same options as Collection.watch()

53

54

Returns:

55

ChangeStream: Change stream cursor

56

"""

57

```

58

59

### Change Stream Operations

60

61

Handle and process change stream events.

62

63

```python { .api }

64

class ChangeStream:

65

def __iter__(self):

66

"""Iterate over change events."""

67

68

def __next__(self):

69

"""Get next change event."""

70

71

def next(self):

72

"""Get next change event (Python 2 compatibility)."""

73

74

def try_next(self):

75

"""

76

Try to get next change event without blocking.

77

78

Returns:

79

dict: Change event or None if no events available

80

"""

81

82

def close(self):

83

"""Close the change stream."""

84

85

@property

86

def alive(self):

87

"""

88

Check if change stream is alive.

89

90

Returns:

91

bool: True if stream is active

92

"""

93

94

@property

95

def resume_token(self):

96

"""

97

Get current resume token.

98

99

Returns:

100

dict: Resume token for stream continuation

101

"""

102

103

def __enter__(self):

104

"""Context manager entry."""

105

106

def __exit__(self, exc_type, exc_val, exc_tb):

107

"""Context manager exit."""

108

```

109

110

### Change Event Types

111

112

Structure of change stream events.

113

114

```python { .api }

115

# Change event document structure:

116

{

117

"_id": { # Resume token

118

"_data": "resume_token_string"

119

},

120

"operationType": "insert|update|replace|delete|drop|rename|dropDatabase|invalidate",

121

"clusterTime": "Timestamp(...)",

122

"ns": { # Namespace

123

"db": "database_name",

124

"coll": "collection_name"

125

},

126

"documentKey": { # Document identifier

127

"_id": "ObjectId(...)"

128

},

129

"fullDocument": {...}, # Full document (based on full_document option)

130

"fullDocumentBeforeChange": {...}, # Previous document state

131

"updateDescription": { # For update operations

132

"updatedFields": {...},

133

"removedFields": [...],

134

"truncatedArrays": [...]

135

}

136

}

137

```

138

139

### Command Monitoring

140

141

Monitor database commands for performance and debugging.

142

143

```python { .api }

144

from pymongo import monitoring

145

146

class CommandListener(monitoring.CommandListener):

147

def started(self, event):

148

"""

149

Handle command started event.

150

151

Parameters:

152

- event: CommandStartedEvent

153

"""

154

155

def succeeded(self, event):

156

"""

157

Handle command succeeded event.

158

159

Parameters:

160

- event: CommandSucceededEvent

161

"""

162

163

def failed(self, event):

164

"""

165

Handle command failed event.

166

167

Parameters:

168

- event: CommandFailedEvent

169

"""

170

171

class CommandStartedEvent:

172

@property

173

def command_name(self):

174

"""Command name."""

175

176

@property

177

def request_id(self):

178

"""Request identifier."""

179

180

@property

181

def connection_id(self):

182

"""Connection identifier."""

183

184

@property

185

def command(self):

186

"""Command document."""

187

188

class CommandSucceededEvent:

189

@property

190

def duration_micros(self):

191

"""Command duration in microseconds."""

192

193

@property

194

def reply(self):

195

"""Command reply document."""

196

197

@property

198

def command_name(self):

199

"""Command name."""

200

201

@property

202

def request_id(self):

203

"""Request identifier."""

204

205

class CommandFailedEvent:

206

@property

207

def duration_micros(self):

208

"""Command duration in microseconds."""

209

210

@property

211

def failure(self):

212

"""Failure details."""

213

214

@property

215

def command_name(self):

216

"""Command name."""

217

218

@property

219

def request_id(self):

220

"""Request identifier."""

221

222

# Register command listener

223

monitoring.register(CommandListener())

224

```

225

226

### Connection Pool Monitoring

227

228

Monitor connection pool events for performance tuning.

229

230

```python { .api }

231

from pymongo import monitoring

232

233

class PoolListener(monitoring.PoolListener):

234

def pool_created(self, event):

235

"""

236

Handle pool created event.

237

238

Parameters:

239

- event: PoolCreatedEvent

240

"""

241

242

def pool_ready(self, event):

243

"""

244

Handle pool ready event.

245

246

Parameters:

247

- event: PoolReadyEvent

248

"""

249

250

def pool_cleared(self, event):

251

"""

252

Handle pool cleared event.

253

254

Parameters:

255

- event: PoolClearedEvent

256

"""

257

258

def pool_closed(self, event):

259

"""

260

Handle pool closed event.

261

262

Parameters:

263

- event: PoolClosedEvent

264

"""

265

266

def connection_created(self, event):

267

"""

268

Handle connection created event.

269

270

Parameters:

271

- event: ConnectionCreatedEvent

272

"""

273

274

def connection_ready(self, event):

275

"""

276

Handle connection ready event.

277

278

Parameters:

279

- event: ConnectionReadyEvent

280

"""

281

282

def connection_closed(self, event):

283

"""

284

Handle connection closed event.

285

286

Parameters:

287

- event: ConnectionClosedEvent

288

"""

289

290

def connection_check_out_started(self, event):

291

"""

292

Handle connection checkout started event.

293

294

Parameters:

295

- event: ConnectionCheckOutStartedEvent

296

"""

297

298

def connection_check_out_failed(self, event):

299

"""

300

Handle connection checkout failed event.

301

302

Parameters:

303

- event: ConnectionCheckOutFailedEvent

304

"""

305

306

def connection_checked_out(self, event):

307

"""

308

Handle connection checked out event.

309

310

Parameters:

311

- event: ConnectionCheckedOutEvent

312

"""

313

314

def connection_checked_in(self, event):

315

"""

316

Handle connection checked in event.

317

318

Parameters:

319

- event: ConnectionCheckedInEvent

320

"""

321

322

# Register pool listener

323

monitoring.register(PoolListener())

324

```

325

326

### Server Monitoring

327

328

Monitor server discovery and topology changes.

329

330

```python { .api }

331

from pymongo import monitoring

332

333

class ServerListener(monitoring.ServerListener):

334

def opened(self, event):

335

"""

336

Handle server opened event.

337

338

Parameters:

339

- event: ServerOpeningEvent

340

"""

341

342

def description_changed(self, event):

343

"""

344

Handle server description changed event.

345

346

Parameters:

347

- event: ServerDescriptionChangedEvent

348

"""

349

350

def closed(self, event):

351

"""

352

Handle server closed event.

353

354

Parameters:

355

- event: ServerClosedEvent

356

"""

357

358

class TopologyListener(monitoring.TopologyListener):

359

def opened(self, event):

360

"""

361

Handle topology opened event.

362

363

Parameters:

364

- event: TopologyOpenedEvent

365

"""

366

367

def description_changed(self, event):

368

"""

369

Handle topology description changed event.

370

371

Parameters:

372

- event: TopologyDescriptionChangedEvent

373

"""

374

375

def closed(self, event):

376

"""

377

Handle topology closed event.

378

379

Parameters:

380

- event: TopologyClosedEvent

381

"""

382

383

# Register server and topology listeners

384

monitoring.register(ServerListener())

385

monitoring.register(TopologyListener())

386

```

387

388

### Heartbeat Monitoring

389

390

Monitor server heartbeat events for connection health.

391

392

```python { .api }

393

from pymongo import monitoring

394

395

class HeartbeatListener(monitoring.ServerHeartbeatListener):

396

def started(self, event):

397

"""

398

Handle heartbeat started event.

399

400

Parameters:

401

- event: ServerHeartbeatStartedEvent

402

"""

403

404

def succeeded(self, event):

405

"""

406

Handle heartbeat succeeded event.

407

408

Parameters:

409

- event: ServerHeartbeatSucceededEvent

410

"""

411

412

def failed(self, event):

413

"""

414

Handle heartbeat failed event.

415

416

Parameters:

417

- event: ServerHeartbeatFailedEvent

418

"""

419

420

# Register heartbeat listener

421

monitoring.register(HeartbeatListener())

422

```

423

424

## Usage Examples

425

426

### Basic Change Streams

427

428

```python

429

from pymongo import MongoClient

430

import pymongo

431

432

client = MongoClient()

433

db = client.mydb

434

collection = db.orders

435

436

# Watch for all changes to collection

437

with collection.watch() as stream:

438

for change in stream:

439

print(f"Change detected: {change['operationType']}")

440

print(f"Document: {change.get('fullDocument', 'N/A')}")

441

print(f"Resume token: {change['_id']}")

442

443

# Watch with pipeline filter

444

pipeline = [

445

{"$match": {"operationType": {"$in": ["insert", "update"]}}},

446

{"$match": {"fullDocument.status": "urgent"}}

447

]

448

449

with collection.watch(pipeline) as stream:

450

for change in stream:

451

print(f"Urgent order change: {change['fullDocument']['_id']}")

452

453

# Watch with full document lookup

454

with collection.watch(full_document="updateLookup") as stream:

455

for change in stream:

456

if change["operationType"] == "update":

457

print(f"Updated document: {change['fullDocument']}")

458

print(f"Changed fields: {change['updateDescription']['updatedFields']}")

459

```

460

461

### Resumable Change Streams

462

463

```python

464

from pymongo import MongoClient

465

from pymongo.errors import PyMongoError

466

import time

467

468

client = MongoClient()

469

collection = client.mydb.inventory

470

471

resume_token = None

472

473

def process_changes():

474

global resume_token

475

476

try:

477

# Resume from last token if available

478

with collection.watch(resume_after=resume_token) as stream:

479

for change in stream:

480

# Process change event

481

process_inventory_change(change)

482

483

# Save resume token for recovery

484

resume_token = stream.resume_token

485

486

except PyMongoError as e:

487

print(f"Change stream error: {e}")

488

time.sleep(5) # Wait before retry

489

process_changes() # Retry with last resume token

490

491

def process_inventory_change(change):

492

"""Process inventory change event."""

493

op_type = change["operationType"]

494

495

if op_type == "insert":

496

print(f"New product added: {change['fullDocument']['name']}")

497

elif op_type == "update":

498

updates = change["updateDescription"]["updatedFields"]

499

if "quantity" in updates:

500

print(f"Quantity updated for {change['documentKey']['_id']}")

501

elif op_type == "delete":

502

print(f"Product deleted: {change['documentKey']['_id']}")

503

504

# Start monitoring

505

process_changes()

506

```

507

508

### Database and Client-level Change Streams

509

510

```python

511

from pymongo import MongoClient

512

513

client = MongoClient()

514

db = client.ecommerce

515

516

# Watch entire database

517

pipeline = [

518

{"$match": {"ns.coll": {"$in": ["orders", "inventory", "customers"]}}},

519

{"$project": {

520

"operationType": 1,

521

"ns": 1,

522

"documentKey": 1,

523

"fullDocument.status": 1

524

}}

525

]

526

527

with db.watch(pipeline) as stream:

528

for change in stream:

529

collection_name = change["ns"]["coll"]

530

print(f"Change in {collection_name}: {change['operationType']}")

531

532

# Watch all databases (requires appropriate permissions)

533

with client.watch() as stream:

534

for change in stream:

535

db_name = change["ns"]["db"]

536

coll_name = change["ns"]["coll"]

537

print(f"Change in {db_name}.{coll_name}")

538

```

539

540

### Command Monitoring

541

542

```python

543

import pymongo

544

from pymongo import monitoring

545

import logging

546

547

# Set up logging

548

logging.basicConfig(level=logging.INFO)

549

logger = logging.getLogger(__name__)

550

551

class CommandLogger(monitoring.CommandListener):

552

def started(self, event):

553

logger.info(f"Command {event.command_name} started on {event.connection_id}")

554

if event.command_name in ["find", "insert", "update", "delete"]:

555

logger.info(f"Command details: {event.command}")

556

557

def succeeded(self, event):

558

logger.info(f"Command {event.command_name} succeeded in {event.duration_micros}μs")

559

560

def failed(self, event):

561

logger.error(f"Command {event.command_name} failed after {event.duration_micros}μs: {event.failure}")

562

563

# Register the listener

564

monitoring.register(CommandLogger())

565

566

# Now all MongoDB commands will be logged

567

client = pymongo.MongoClient()

568

collection = client.mydb.mycollection

569

570

# These operations will generate log entries

571

collection.insert_one({"name": "test"})

572

collection.find_one({"name": "test"})

573

collection.update_one({"name": "test"}, {"$set": {"updated": True}})

574

```

575

576

### Connection Pool Monitoring

577

578

```python

579

import pymongo

580

from pymongo import monitoring

581

from datetime import datetime

582

583

class PoolMonitor(monitoring.PoolListener):

584

def __init__(self):

585

self.pool_stats = {}

586

587

def pool_created(self, event):

588

print(f"Pool created for {event.address}")

589

self.pool_stats[event.address] = {

590

"created": datetime.now(),

591

"connections": 0,

592

"checkouts": 0

593

}

594

595

def connection_created(self, event):

596

stats = self.pool_stats.get(event.address, {})

597

stats["connections"] = stats.get("connections", 0) + 1

598

print(f"Connection created for {event.address} (total: {stats['connections']})")

599

600

def connection_checked_out(self, event):

601

stats = self.pool_stats.get(event.address, {})

602

stats["checkouts"] = stats.get("checkouts", 0) + 1

603

print(f"Connection checked out from {event.address}")

604

605

def connection_check_out_failed(self, event):

606

print(f"Connection checkout failed for {event.address}: {event.reason}")

607

608

def pool_cleared(self, event):

609

print(f"Pool cleared for {event.address}")

610

611

# Register pool monitor

612

monitoring.register(PoolMonitor())

613

614

# Create client (will trigger pool creation)

615

client = pymongo.MongoClient(maxPoolSize=10, minPoolSize=2)

616

```

617

618

### Performance Monitoring

619

620

```python

621

import pymongo

622

from pymongo import monitoring

623

import time

624

from collections import defaultdict

625

626

class PerformanceMonitor(monitoring.CommandListener):

627

def __init__(self):

628

self.command_times = defaultdict(list)

629

self.slow_queries = []

630

631

def started(self, event):

632

event.start_time = time.time()

633

634

def succeeded(self, event):

635

duration_ms = event.duration_micros / 1000

636

command_name = event.command_name

637

638

self.command_times[command_name].append(duration_ms)

639

640

# Log slow queries (>100ms)

641

if duration_ms > 100:

642

self.slow_queries.append({

643

"command": command_name,

644

"duration_ms": duration_ms,

645

"details": event.command

646

})

647

648

def get_stats(self):

649

"""Get performance statistics."""

650

stats = {}

651

for cmd, times in self.command_times.items():

652

stats[cmd] = {

653

"count": len(times),

654

"avg_ms": sum(times) / len(times),

655

"max_ms": max(times),

656

"min_ms": min(times)

657

}

658

return stats

659

660

def get_slow_queries(self, limit=10):

661

"""Get slowest queries."""

662

return sorted(

663

self.slow_queries,

664

key=lambda x: x["duration_ms"],

665

reverse=True

666

)[:limit]

667

668

# Set up monitoring

669

perf_monitor = PerformanceMonitor()

670

monitoring.register(perf_monitor)

671

672

# Run some operations

673

client = pymongo.MongoClient()

674

collection = client.testdb.testcoll

675

676

# Generate some operations

677

for i in range(100):

678

collection.insert_one({"index": i, "data": f"test_data_{i}"})

679

680

# Create index (slow operation)

681

collection.create_index("index")

682

683

# Run some queries

684

collection.find({"index": {"$gt": 50}}).limit(10).to_list()

685

686

# Get performance stats

687

stats = perf_monitor.get_stats()

688

print("Command Performance Stats:")

689

for cmd, stat in stats.items():

690

print(f"{cmd}: {stat['count']} ops, avg: {stat['avg_ms']:.2f}ms")

691

692

slow_queries = perf_monitor.get_slow_queries()

693

print(f"\nTop {len(slow_queries)} Slow Queries:")

694

for query in slow_queries:

695

print(f"{query['command']}: {query['duration_ms']:.2f}ms")

696

```