or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication.md change-feeds.md database-management.md document-operations.md error-handling.md http-adapters.md index.md query-indexing.md replication.md scheduler-monitoring.md security-document.md views-design-documents.md

docs/change-feeds.md

0

# Change Feeds and Monitoring

1

2

Monitor database changes in real time with comprehensive support for continuous feeds, filtering, infinite monitoring, and change event processing.

3

4

## Capabilities

5

6

### Feed Classes

7

8

Iterator classes for consuming change feeds and database updates.

9

10

```python { .api }

11

class Feed:

12

"""

13

Iterator for consuming continuous and non-continuous change feeds.

14

"""

15

16

def __init__(self, source, raw_data=False, **options):

17

"""

18

Initialize feed iterator.

19

20

Parameters:

21

- source (callable): Function that returns feed data

22

- raw_data (bool): Return raw JSON response data

23

- heartbeat (int): Heartbeat interval in milliseconds

24

- timeout (int): Feed timeout in milliseconds

25

- since (str | int): Start from specific sequence

26

- limit (int): Maximum number of changes

27

- descending (bool): Process changes in reverse order

28

- include_docs (bool): Include document content in changes

29

- attachments (bool): Include attachment info

30

- att_encoding_info (bool): Include attachment encoding info

31

- conflicts (bool): Include conflict information

32

- filter (str): Filter function name

33

- **options: Additional feed parameters

34

"""

35

36

def __iter__(self):

37

"""

38

Iterator protocol implementation.

39

40

Yields:

41

dict: Change events from the feed

42

"""

43

44

def stop(self):

45

"""

46

Stop feed iteration and close connection.

47

48

Returns:

49

None

50

"""

51

52

@property

53

def last_seq(self):

54

"""

55

Last processed sequence identifier.

56

57

Returns:

58

str | int: Sequence identifier for resuming feed

59

"""

60

61

class InfiniteFeed(Feed):

62

"""

63

Perpetually refreshed feed iterator that automatically reconnects.

64

"""

65

```

66

67

### Database Change Feed Methods

68

69

Database methods for accessing change feeds.

70

71

```python { .api }

72

class CouchDatabase(dict):

73

"""Database change feed methods."""

74

75

def changes(self, raw_data=False, **kwargs):

76

"""

77

Get changes feed for database.

78

79

Parameters:

80

- raw_data (bool): Return raw response data

81

- feed (str): Feed type ('normal', 'continuous', 'longpoll')

82

- style (str): Change format ('main_only', 'all_docs')

83

- heartbeat (int): Heartbeat interval in milliseconds (continuous feeds)

84

- timeout (int): Request timeout in milliseconds

85

- since (str | int): Start sequence (0, 'now', or sequence ID)

86

- limit (int): Maximum number of changes

87

- descending (bool): Reverse chronological order

88

- include_docs (bool): Include full document content

89

- attachments (bool): Include attachment stubs

90

- att_encoding_info (bool): Include attachment encoding info

91

- conflicts (bool): Include conflict revisions

92

- deleted_conflicts (bool): Include deleted conflict revisions

93

- filter (str): Filter function name from design document

94

- doc_ids (list[str]): Only changes for specific document IDs

95

- selector (dict): Filter changes by document selector (Cloudant only)

96

- **kwargs: Additional filter parameters

97

98

Returns:

99

Feed: Change feed iterator

100

101

Raises:

102

CloudantFeedException: Feed creation failed

103

"""

104

105

def infinite_changes(self, **kwargs):

106

"""

107

Get infinite changes feed that automatically reconnects.

108

109

Parameters:

110

- **kwargs: Same options as changes() method

111

112

Returns:

113

InfiniteFeed: Perpetual change feed iterator

114

"""

115

```

116

117

### Client Database Update Feeds

118

119

Monitor database creation and deletion events.

120

121

```python { .api }

122

class CouchDB(dict):

123

"""Database update monitoring."""

124

125

def db_updates(self, raw_data=False, **kwargs):

126

"""

127

Monitor database creation and deletion events.

128

129

Parameters:

130

- raw_data (bool): Return raw response data

131

- feed (str): Feed type ('normal', 'continuous', 'longpoll')

132

- timeout (int): Request timeout

133

- heartbeat (int): Heartbeat interval (continuous feeds)

134

- since (str): Start from specific sequence

135

136

Returns:

137

Feed: Database updates feed iterator

138

"""

139

140

class Cloudant(CouchDB):

141

"""Cloudant database update monitoring."""

142

143

def infinite_db_updates(self, **kwargs):

144

"""

145

Get infinite database updates feed.

146

147

Parameters:

148

- **kwargs: Same options as db_updates()

149

150

Returns:

151

InfiniteFeed: Perpetual database updates iterator

152

"""

153

```

154

155

## Usage Examples

156

157

### Basic Change Monitoring

158

159

```python

160

from cloudant import cloudant

161

162

with cloudant('user', 'pass', account='myaccount') as client:

163

db = client['my_database']

164

165

# Get recent changes (non-continuous)

166

changes = db.changes(limit=10, include_docs=True)

167

168

for change in changes:

169

doc_id = change['id']

170

seq = change['seq']

171

172

if 'doc' in change:

173

doc = change['doc']

174

print(f"Changed document {doc_id}: {doc.get('name', 'N/A')}")

175

else:

176

print(f"Document {doc_id} changed (deleted: {change.get('deleted', False)})")

177

178

print(f"Last sequence: {changes.last_seq}")

179

```

180

181

### Continuous Change Monitoring

182

183

```python

184

from cloudant import cloudant

185

import time

186

import threading

187

188

with cloudant('user', 'pass', account='myaccount') as client:

189

db = client['my_database']

190

191

# Start continuous feed in background thread

192

def monitor_changes():

193

try:

194

changes = db.changes(

195

feed='continuous',

196

include_docs=True,

197

heartbeat=30000, # 30 second heartbeat

198

timeout=60000 # 60 second timeout

199

)

200

201

for change in changes:

202

if change: # Skip heartbeat messages

203

doc_id = change['id']

204

if change.get('deleted'):

205

print(f"Document deleted: {doc_id}")

206

else:

207

doc = change.get('doc', {})

208

print(f"Document updated: {doc_id} - {doc.get('name', 'N/A')}")

209

210

except Exception as e:

211

print(f"Change feed error: {e}")

212

213

# Start monitoring in background

214

monitor_thread = threading.Thread(target=monitor_changes)

215

monitor_thread.daemon = True

216

monitor_thread.start()

217

218

# Do other work while monitoring changes

219

print("Monitoring changes in background...")

220

time.sleep(60) # Monitor for 1 minute

221

222

print("Stopping change monitoring")

223

```

224

225

### Infinite Change Feed

226

227

```python

228

from cloudant import cloudant

229

230

with cloudant('user', 'pass', account='myaccount') as client:

231

db = client['my_database']

232

233

# Infinite feed automatically reconnects on connection loss

234

infinite_changes = db.infinite_changes(

235

include_docs=True,

236

heartbeat=15000, # 15 second heartbeat

237

since='now' # Start from current time

238

)

239

240

try:

241

for change in infinite_changes:

242

if change: # Skip heartbeat messages

243

doc_id = change['id']

244

seq = change['seq']

245

246

if change.get('deleted'):

247

print(f"[{seq}] Document deleted: {doc_id}")

248

else:

249

doc = change.get('doc', {})

250

doc_type = doc.get('type', 'unknown')

251

print(f"[{seq}] {doc_type} document updated: {doc_id}")

252

253

except KeyboardInterrupt:

254

print("Stopping infinite change feed...")

255

infinite_changes.stop()

256

```

257

258

### Filtered Change Feeds

259

260

```python

261

from cloudant import cloudant

262

263

with cloudant('user', 'pass', account='myaccount') as client:

264

db = client['my_database']

265

266

# Filter by document IDs

267

doc_ids = ['user123', 'user456', 'user789']

268

changes = db.changes(

269

doc_ids=doc_ids,

270

include_docs=True,

271

since='0' # From beginning

272

)

273

274

for change in changes:

275

print(f"User document changed: {change['id']}")

276

277

# Filter using selector (Cloudant only)

278

changes = db.changes(

279

selector={'type': 'order', 'status': 'pending'},

280

include_docs=True,

281

feed='continuous'

282

)

283

284

for change in changes:

285

if change:

286

doc = change.get('doc', {})

287

print(f"Pending order updated: {doc.get('order_id', 'N/A')}")

288

289

# Filter using design document filter function

290

changes = db.changes(

291

filter='filters/by_user_type',

292

user_type='admin', # Parameter for filter function

293

include_docs=True

294

)

295

296

for change in changes:

297

print(f"Admin user changed: {change['id']}")

298

```

299

300

### Resumable Change Processing

301

302

```python

303

from cloudant import cloudant

304

import json

305

import os

306

307

CHECKPOINT_FILE = 'change_checkpoint.json'

308

309

def load_checkpoint():

310

"""Load last processed sequence from file."""

311

if os.path.exists(CHECKPOINT_FILE):

312

with open(CHECKPOINT_FILE, 'r') as f:

313

data = json.load(f)

314

return data.get('last_seq', '0')

315

return '0'

316

317

def save_checkpoint(seq):

318

"""Save current sequence to file."""

319

with open(CHECKPOINT_FILE, 'w') as f:

320

json.dump({'last_seq': seq}, f)

321

322

with cloudant('user', 'pass', account='myaccount') as client:

323

db = client['my_database']

324

325

# Start from last checkpoint

326

last_seq = load_checkpoint()

327

print(f"Resuming from sequence: {last_seq}")

328

329

changes = db.changes(

330

since=last_seq,

331

include_docs=True,

332

limit=100 # Process in batches

333

)

334

335

processed_count = 0

336

337

for change in changes:

338

doc_id = change['id']

339

current_seq = change['seq']

340

341

# Process the change

342

if change.get('deleted'):

343

print(f"Processing deletion: {doc_id}")

344

else:

345

doc = change.get('doc', {})

346

print(f"Processing update: {doc_id} - {doc.get('type', 'unknown')}")

347

348

processed_count += 1

349

350

# Save checkpoint periodically

351

if processed_count % 10 == 0:

352

save_checkpoint(current_seq)

353

print(f"Checkpoint saved at sequence: {current_seq}")

354

355

# Save final checkpoint

356

if changes.last_seq:

357

save_checkpoint(changes.last_seq)

358

print(f"Final checkpoint: {changes.last_seq}")

359

360

print(f"Processed {processed_count} changes")

361

```

362

363

### Database Updates Monitoring

364

365

```python

366

from cloudant import cloudant

367

368

with cloudant('user', 'pass', account='myaccount') as client:

369

# Monitor database creation/deletion events

370

db_updates = client.db_updates(feed='continuous')

371

372

try:

373

for update in db_updates:

374

if update: # Skip heartbeat messages

375

db_name = update['db_name']

376

update_type = update['type']

377

378

if update_type == 'created':

379

print(f"Database created: {db_name}")

380

elif update_type == 'deleted':

381

print(f"Database deleted: {db_name}")

382

elif update_type == 'updated':

383

print(f"Database updated: {db_name}")

384

385

except KeyboardInterrupt:

386

print("Stopping database updates monitoring")

387

db_updates.stop()

388

```

389

390

### Change Feed with Error Handling

391

392

```python

393

from cloudant import cloudant

394

from cloudant.error import CloudantFeedException

395

import time

396

397

def robust_change_monitor(client, db_name, max_retries=5):

398

"""Monitor changes with automatic retry on errors."""

399

400

retry_count = 0

401

last_seq = '0'

402

403

while retry_count < max_retries:

404

try:

405

db = client[db_name]

406

407

print(f"Starting change feed from sequence: {last_seq}")

408

changes = db.changes(

409

since=last_seq,

410

feed='continuous',

411

include_docs=True,

412

heartbeat=30000,

413

timeout=60000

414

)

415

416

for change in changes:

417

if change:

418

doc_id = change['id']

419

last_seq = change['seq']

420

421

# Process change

422

if change.get('deleted'):

423

print(f"Document deleted: {doc_id}")

424

else:

425

doc = change.get('doc', {})

426

print(f"Document updated: {doc_id}")

427

428

# If we reach here, feed ended normally

429

print("Change feed ended normally")

430

break

431

432

except CloudantFeedException as e:

433

retry_count += 1

434

print(f"Feed error (attempt {retry_count}/{max_retries}): {e}")

435

436

if retry_count < max_retries:

437

# Exponential backoff

438

wait_time = 2 ** retry_count

439

print(f"Retrying in {wait_time} seconds...")

440

time.sleep(wait_time)

441

else:

442

print("Max retries reached, giving up")

443

raise

444

445

except KeyboardInterrupt:

446

print("Change monitoring interrupted by user")

447

break

448

except Exception as e:

449

print(f"Unexpected error: {e}")

450

break

451

452

# Usage

453

with cloudant('user', 'pass', account='myaccount') as client:

454

robust_change_monitor(client, 'my_database')

455

```

456

457

### Multi-Database Change Monitoring

458

459

```python

460

from cloudant import cloudant

461

import threading

462

import time

463

464

def monitor_database_changes(client, db_name, callback):

465

"""Monitor changes for a specific database."""

466

try:

467

db = client[db_name]

468

changes = db.changes(

469

feed='continuous',

470

include_docs=True,

471

heartbeat=30000

472

)

473

474

for change in changes:

475

if change:

476

callback(db_name, change)

477

478

except Exception as e:

479

print(f"Error monitoring {db_name}: {e}")

480

481

def change_handler(db_name, change):

482

"""Handle change events from any database."""

483

doc_id = change['id']

484

seq = change['seq']

485

486

if change.get('deleted'):

487

print(f"[{db_name}] Document deleted: {doc_id}")

488

else:

489

doc = change.get('doc', {})

490

doc_type = doc.get('type', 'unknown')

491

print(f"[{db_name}] {doc_type} updated: {doc_id}")

492

493

with cloudant('user', 'pass', account='myaccount') as client:

494

# Monitor multiple databases

495

databases = ['users', 'orders', 'inventory', 'logs']

496

threads = []

497

498

for db_name in databases:

499

thread = threading.Thread(

500

target=monitor_database_changes,

501

args=(client, db_name, change_handler)

502

)

503

thread.daemon = True

504

thread.start()

505

threads.append(thread)

506

print(f"Started monitoring {db_name}")

507

508

try:

509

# Keep main thread alive

510

while True:

511

time.sleep(1)

512

except KeyboardInterrupt:

513

print("Stopping all change monitors...")

514

```

515

516

### Change Feed Performance Optimization

517

518

```python

519

from cloudant import cloudant

520

import time

521

522

with cloudant('user', 'pass', account='myaccount') as client:

523

db = client['my_database']

524

525

# High-performance change processing

526

changes = db.changes(

527

feed='continuous',

528

include_docs=False, # Don't fetch full documents for performance

529

heartbeat=5000, # More frequent heartbeats

530

timeout=30000, # Shorter timeout for faster reconnection

531

limit=1000, # Stop the feed after at most 1000 changes

532

since='now' # Start from current time

533

)

534

535

batch = []

536

batch_size = 50

537

last_process_time = time.time()

538

539

for change in changes:

540

if change:

541

batch.append(change)

542

543

# Process batch when full or after timeout

544

if (len(batch) >= batch_size or

545

time.time() - last_process_time > 5):

546

547

# Process batch efficiently

548

print(f"Processing batch of {len(batch)} changes")

549

550

for change in batch:

551

doc_id = change['id']

552

553

if change.get('deleted'):

554

# Handle deletion

555

print(f"Deleted: {doc_id}")

556

else:

557

# Fetch document only if needed

558

doc = db.get(doc_id, remote=True)

559

if doc and doc.exists():

560

# Process document

561

print(f"Updated: {doc_id}")

562

563

# Clear batch

564

batch = []

565

last_process_time = time.time()

566

```

567

568

## Error Handling

569

570

Change feed operations can raise `CloudantFeedException`:

571

572

```python

573

from cloudant import cloudant

574

from cloudant.error import CloudantFeedException

575

576

with cloudant('user', 'pass', account='myaccount') as client:

577

db = client['my_database']

578

579

try:

580

# Invalid feed parameters

581

changes = db.changes(feed='invalid_feed_type')

582

for change in changes:

583

print(change)

584

except CloudantFeedException as e:

585

print(f"Feed configuration error: {e}")

586

587

try:

588

# Network timeout during feed consumption

589

changes = db.changes(

590

feed='continuous',

591

timeout=1000 # Very short timeout

592

)

593

594

for change in changes:

595

print(change)

596

597

except CloudantFeedException as e:

598

print(f"Feed network error: {e}")

599

600

try:

601

# Invalid filter function

602

changes = db.changes(filter='non_existent/filter')

603

for change in changes:

604

print(change)

605

except CloudantFeedException as e:

606

print(f"Filter error: {e}")

607

```