or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

asyncio-operations.md · change-streams.md · client-encryption.md · cursor-operations.md · gridfs-operations.md · index.md · tornado-operations.md · web-integration.md

cursor-operations.md · docs/

0

# Cursor Operations

1

2

Cursor functionality for iterating over query results, command results, and change streams. Motor provides comprehensive cursor support with async iteration, traditional cursor methods, and framework-specific optimizations.

3

4

## Capabilities

5

6

### Query Cursor

7

8

Cursor for iterating over document query results with support for sorting, limiting, skipping, and async iteration.

9

10

```python { .api }

11

# AsyncIO Query Cursor

12

class AsyncIOMotorCursor:

13

# Cursor Configuration

14

def limit(self, limit: int) -> AsyncIOMotorCursor:

15

"""Limit the number of results returned."""

16

17

def skip(self, skip: int) -> AsyncIOMotorCursor:

18

"""Skip a number of documents."""

19

20

def sort(

21

self,

22

key_or_list: Union[str, List[Tuple[str, int]]],

23

direction: Optional[int] = None

24

) -> AsyncIOMotorCursor:

25

"""

26

Sort the results.

27

28

Parameters:

29

- key_or_list: Field name or list of (field, direction) tuples

30

- direction: 1 for ascending, -1 for descending (when key_or_list is string)

31

"""

32

33

def batch_size(self, batch_size: int) -> AsyncIOMotorCursor:

34

"""Set the batch size for cursor operations."""

35

36

def max_time_ms(self, max_time_ms: int) -> AsyncIOMotorCursor:

37

"""Set maximum time in milliseconds for cursor operations."""

38

39

def hint(self, index: Union[str, List[Tuple[str, int]]]) -> AsyncIOMotorCursor:

40

"""Hint which index to use."""

41

42

def comment(self, comment: str) -> AsyncIOMotorCursor:

43

"""Add a comment to the cursor."""

44

45

def collation(self, collation: Dict[str, Any]) -> AsyncIOMotorCursor:

46

"""Set collation options."""

47

48

def allow_partial_results(self, allow_partial_results: bool) -> AsyncIOMotorCursor:

49

"""Allow partial results from mongos if some shards are down."""

50

51

# Cursor Execution

52

async def to_list(self, length: Optional[int] = None) -> List[Dict[str, Any]]:

53

"""

54

Convert cursor to a list.

55

56

Parameters:

57

- length: Maximum number of documents to return (None for all)

58

59

Returns:

60

List of documents

61

"""

62

63

async def count(self, with_limit_and_skip: bool = False) -> int:

64

"""

65

Count documents (deprecated; removed in PyMongo 4 / Motor 3 — use Collection.count_documents() instead).

66

"""

67

68

async def distinct(self, key: str) -> List[Any]:

69

"""Get distinct values for a key."""

70

71

# Async Iterator Protocol

72

def __aiter__(self) -> AsyncIOMotorCursor:

73

"""Return self for async iteration."""

74

75

async def __anext__(self) -> Dict[str, Any]:

76

"""Get the next document."""

77

78

# Cursor Properties

79

@property

80

def address(self) -> Optional[Tuple[str, int]]:

81

"""Server address for this cursor."""

82

83

@property

84

def cursor_id(self) -> Optional[int]:

85

"""Cursor ID on the server."""

86

87

@property

88

def alive(self) -> bool:

89

"""Whether the cursor is still alive on the server."""

90

91

# Cursor Management

92

async def close(self) -> None:

93

"""Close the cursor."""

94

95

def clone(self) -> AsyncIOMotorCursor:

96

"""Create a copy of this cursor."""

97

98

# Tornado Query Cursor

99

class MotorCursor:

100

# Cursor Configuration (identical API, returns MotorCursor)

101

def limit(self, limit: int) -> MotorCursor: ...

102

def skip(self, skip: int) -> MotorCursor: ...

103

def sort(

104

self,

105

key_or_list: Union[str, List[Tuple[str, int]]],

106

direction: Optional[int] = None

107

) -> MotorCursor: ...

108

def batch_size(self, batch_size: int) -> MotorCursor: ...

109

def max_time_ms(self, max_time_ms: int) -> MotorCursor: ...

110

def hint(self, index: Union[str, List[Tuple[str, int]]]) -> MotorCursor: ...

111

def comment(self, comment: str) -> MotorCursor: ...

112

def collation(self, collation: Dict[str, Any]) -> MotorCursor: ...

113

def allow_partial_results(self, allow_partial_results: bool) -> MotorCursor: ...

114

115

# Cursor Execution (returns Tornado Futures)

116

def to_list(self, length: Optional[int] = None) -> tornado.concurrent.Future: ...

117

def count(self, with_limit_and_skip: bool = False) -> tornado.concurrent.Future: ...

118

def distinct(self, key: str) -> tornado.concurrent.Future: ...

119

120

# Properties

121

@property

122

def address(self) -> Optional[Tuple[str, int]]: ...

123

@property

124

def cursor_id(self) -> Optional[int]: ...

125

@property

126

def alive(self) -> bool: ...

127

128

# Management

129

def close(self) -> tornado.concurrent.Future: ...

130

def clone(self) -> MotorCursor: ...

131

132

# Legacy iteration methods

133

def each(self, callback) -> None:

134

"""Deprecated: Iterate with callback."""

135

136

def next_object(self) -> tornado.concurrent.Future:

137

"""Deprecated: Get next object."""

138

```

139

140

### Command Cursor

141

142

Cursor for iterating over database command results like aggregation pipelines and administrative commands.

143

144

```python { .api }

145

# AsyncIO Command Cursor

146

class AsyncIOMotorCommandCursor:

147

# Cursor Configuration

148

def batch_size(self, batch_size: int) -> AsyncIOMotorCommandCursor:

149

"""Set the batch size for cursor operations."""

150

151

def max_time_ms(self, max_time_ms: int) -> AsyncIOMotorCommandCursor:

152

"""Set maximum time in milliseconds for cursor operations."""

153

154

# Cursor Execution

155

async def to_list(self, length: Optional[int] = None) -> List[Dict[str, Any]]:

156

"""Convert cursor to a list."""

157

158

# Async Iterator Protocol

159

def __aiter__(self) -> AsyncIOMotorCommandCursor:

160

"""Return self for async iteration."""

161

162

async def __anext__(self) -> Dict[str, Any]:

163

"""Get the next document."""

164

165

# Cursor Properties

166

@property

167

def address(self) -> Optional[Tuple[str, int]]:

168

"""Server address for this cursor."""

169

170

@property

171

def cursor_id(self) -> Optional[int]:

172

"""Cursor ID on the server."""

173

174

@property

175

def alive(self) -> bool:

176

"""Whether the cursor is still alive on the server."""

177

178

# Cursor Management

179

async def close(self) -> None:

180

"""Close the cursor."""

181

182

# Tornado Command Cursor

183

class MotorCommandCursor:

184

def batch_size(self, batch_size: int) -> MotorCommandCursor: ...

185

def max_time_ms(self, max_time_ms: int) -> MotorCommandCursor: ...

186

187

def to_list(self, length: Optional[int] = None) -> tornado.concurrent.Future: ...

188

189

@property

190

def address(self) -> Optional[Tuple[str, int]]: ...

191

@property

192

def cursor_id(self) -> Optional[int]: ...

193

@property

194

def alive(self) -> bool: ...

195

196

def close(self) -> tornado.concurrent.Future: ...

197

198

# Legacy methods

199

def each(self, callback) -> None: ...

200

def next_object(self) -> tornado.concurrent.Future: ...

201

```

202

203

### Latent Command Cursor

204

205

Special cursor type for deferred command execution, used primarily for aggregation operations that return cursors.

206

207

```python { .api }

208

# AsyncIO Latent Command Cursor

209

class AsyncIOMotorLatentCommandCursor(AsyncIOMotorCommandCursor):

210

"""

211

A command cursor that defers execution until first iteration.

212

213

Created by operations like aggregate() that return cursors.

214

The actual command isn't sent to MongoDB until iteration begins.

215

"""

216

217

def batch_size(self, batch_size: int) -> AsyncIOMotorLatentCommandCursor:

218

"""Set the batch size and return self for chaining."""

219

220

async def to_list(self, length: Optional[int] = None) -> List[Dict[str, Any]]:

221

"""Convert cursor to a list, executing the deferred command."""

222

223

def __aiter__(self) -> AsyncIOMotorLatentCommandCursor:

224

"""Return self for async iteration."""

225

226

async def __anext__(self) -> Dict[str, Any]:

227

"""Get the next document, executing command on first call."""

228

229

# Tornado Latent Command Cursor

230

class MotorLatentCommandCursor(MotorCommandCursor):

231

"""

232

A command cursor that defers execution until first iteration.

233

234

Created by operations like aggregate() that return cursors.

235

The actual command isn't sent to MongoDB until iteration begins.

236

"""

237

238

def batch_size(self, batch_size: int) -> MotorLatentCommandCursor:

239

"""Set the batch size and return self for chaining."""

240

241

def to_list(self, length: Optional[int] = None) -> tornado.concurrent.Future:

242

"""Convert cursor to a list, executing the deferred command."""

243

244

def each(self, callback) -> None:

245

"""Iterate with callback, executing command on first call."""

246

247

def next_object(self) -> tornado.concurrent.Future:

248

"""Get next object, executing command on first call."""

249

```

250

251

### Raw Batch Cursors

252

253

Specialized cursors for handling raw BSON data with minimal processing overhead.

254

255

```python { .api }

256

# AsyncIO Raw Batch Cursor

257

class AsyncIOMotorRawBatchCursor(AsyncIOMotorCursor):

258

"""Cursor that returns raw BSON bytes instead of decoded documents."""

259

260

async def __anext__(self) -> bytes:

261

"""Get the next raw BSON document."""

262

263

# AsyncIO Raw Batch Command Cursor

264

class AsyncIOMotorRawBatchCommandCursor(AsyncIOMotorCommandCursor):

265

"""Command cursor that returns raw BSON bytes."""

266

267

async def __anext__(self) -> bytes:

268

"""Get the next raw BSON document."""

269

270

# Tornado equivalents

271

class MotorRawBatchCursor(MotorCursor):

272

"""Tornado cursor for raw BSON data."""

273

274

class MotorRawBatchCommandCursor(MotorCommandCursor):

275

"""Tornado command cursor for raw BSON data."""

276

```

277

278

## Usage Examples

279

280

### Basic Cursor Operations

281

282

```python

283

import asyncio

284

import motor.motor_asyncio

285

286

async def cursor_example():

287

client = motor.motor_asyncio.AsyncIOMotorClient()

288

collection = client.test_database.test_collection

289

290

# Insert sample data

291

await collection.insert_many([

292

{"name": "Alice", "age": 30, "city": "New York"},

293

{"name": "Bob", "age": 25, "city": "San Francisco"},

294

{"name": "Charlie", "age": 35, "city": "Chicago"},

295

{"name": "Diana", "age": 28, "city": "New York"},

296

{"name": "Eve", "age": 32, "city": "San Francisco"}

297

])

298

299

# Basic cursor usage

300

cursor = collection.find({"age": {"$gte": 25}})

301

302

# Async iteration

303

print("All users 25 or older:")

304

async for document in cursor:

305

print(f" {document['name']} ({document['age']}) - {document['city']}")

306

307

# Convert to list

308

cursor = collection.find({"city": "New York"})

309

users = await cursor.to_list(length=None)

310

print(f"\nFound {len(users)} users in New York")

311

312

# Cursor chaining

313

cursor = collection.find()\

314

.sort("age", -1)\

315

.limit(3)\

316

.skip(1)

317

318

print("\nTop 3 oldest users (skipping 1st):")

319

async for document in cursor:

320

print(f" {document['name']} ({document['age']})")

321

322

client.close()

323

324

asyncio.run(cursor_example())

325

```

326

327

### Advanced Cursor Configuration

328

329

```python

330

import asyncio

331

import motor.motor_asyncio

332

import pymongo

333

334

async def advanced_cursor_example():

335

client = motor.motor_asyncio.AsyncIOMotorClient()

336

collection = client.test_database.products

337

338

# Create index for examples

339

await collection.create_index([("price", 1), ("category", 1)])

340

341

# Insert sample products

342

await collection.insert_many([

343

{"name": "Laptop", "price": 999, "category": "Electronics", "brand": "Dell"},

344

{"name": "Phone", "price": 699, "category": "Electronics", "brand": "Apple"},

345

{"name": "Tablet", "price": 399, "category": "Electronics", "brand": "Samsung"},

346

{"name": "Book", "price": 19, "category": "Books", "brand": "Penguin"},

347

{"name": "Headphones", "price": 199, "category": "Electronics", "brand": "Sony"}

348

])

349

350

# Complex cursor with multiple options

351

cursor = collection.find(

352

{"category": "Electronics"},

353

{"name": 1, "price": 1, "brand": 1} # Projection

354

).sort([

355

("price", pymongo.DESCENDING),

356

("name", pymongo.ASCENDING)

357

]).limit(10).batch_size(2).hint([("price", 1), ("category", 1)])

358

359

print("Electronics sorted by price (desc), then name (asc):")

360

async for product in cursor:

361

print(f" {product['name']}: ${product['price']} ({product['brand']})")

362

363

# Cursor with collation for case-insensitive sorting

364

cursor = collection.find().sort("name", 1).collation({

365

"locale": "en",

366

"strength": 2 # Case insensitive

367

})

368

369

print("\nProducts sorted case-insensitively:")

370

async for product in cursor:

371

print(f" {product['name']}")

372

373

# Cursor with comment and max time

374

cursor = collection.find({"price": {"$lt": 500}})\

375

.comment("Finding affordable products")\

376

.max_time_ms(5000) # 5 second timeout

377

378

print("\nAffordable products (under $500):")

379

try:

380

async for product in cursor:

381

print(f" {product['name']}: ${product['price']}")

382

except pymongo.errors.ExecutionTimeout:

383

print("Query timed out!")

384

385

client.close()

386

387

asyncio.run(advanced_cursor_example())

388

```

389

390

### Command Cursor Usage

391

392

```python

393

import asyncio

394

import motor.motor_asyncio

395

396

async def command_cursor_example():

397

client = motor.motor_asyncio.AsyncIOMotorClient()

398

db = client.test_database

399

collection = db.sales

400

401

# Insert sample sales data

402

await collection.insert_many([

403

{"product": "Laptop", "amount": 999, "date": "2023-01-15", "region": "North"},

404

{"product": "Phone", "amount": 699, "date": "2023-01-16", "region": "South"},

405

{"product": "Laptop", "amount": 999, "date": "2023-01-17", "region": "North"},

406

{"product": "Tablet", "amount": 399, "date": "2023-01-18", "region": "East"},

407

{"product": "Phone", "amount": 699, "date": "2023-01-19", "region": "West"}

408

])

409

410

# Aggregation pipeline

411

pipeline = [

412

{"$group": {

413

"_id": "$product",

414

"total_sales": {"$sum": "$amount"},

415

"count": {"$sum": 1}

416

}},

417

{"$sort": {"total_sales": -1}}

418

]

419

420

# Get command cursor from aggregation

421

cursor = collection.aggregate(pipeline)

422

423

print("Sales by Product:")

424

async for result in cursor:

425

print(f" {result['_id']}: ${result['total_sales']} ({result['count']} sales)")

426

427

# List collections command cursor

428

cursor = db.list_collections()

429

430

print("\nCollections in database:")

431

async for collection_info in cursor:

432

print(f" {collection_info['name']}: {collection_info['type']}")

433

434

# List databases command cursor

435

cursor = client.list_databases()

436

437

print("\nDatabases:")

438

async for db_info in cursor:

439

print(f" {db_info['name']}: {db_info['sizeOnDisk']} bytes")

440

441

client.close()

442

443

asyncio.run(command_cursor_example())

444

```

445

446

### Cursor Performance Optimization

447

448

```python

449

import asyncio

450

import motor.motor_asyncio

451

import time

452

453

async def cursor_performance_example():

454

client = motor.motor_asyncio.AsyncIOMotorClient()

455

collection = client.test_database.large_collection

456

457

# Insert large amount of test data

458

print("Inserting test data...")

459

batch_size = 1000

460

for i in range(10): # 10,000 documents

461

batch = [

462

{"index": i * batch_size + j, "value": f"value_{i * batch_size + j}"}

463

for j in range(batch_size)

464

]

465

await collection.insert_many(batch)

466

467

print("Testing cursor performance...")

468

469

# Test 1: Default batch size

470

start_time = time.time()

471

cursor = collection.find()

472

count = 0

473

async for doc in cursor:

474

count += 1

475

476

default_time = time.time() - start_time

477

print(f"Default batch size: {count} docs in {default_time:.2f}s")

478

479

# Test 2: Large batch size

480

start_time = time.time()

481

cursor = collection.find().batch_size(1000)

482

count = 0

483

async for doc in cursor:

484

count += 1

485

486

large_batch_time = time.time() - start_time

487

print(f"Large batch size (1000): {count} docs in {large_batch_time:.2f}s")

488

489

# Test 3: Convert to list (still fetched in batches, but buffers every document in memory)

490

start_time = time.time()

491

cursor = collection.find()

492

docs = await cursor.to_list(length=None)

493

list_time = time.time() - start_time

494

print(f"to_list(): {len(docs)} docs in {list_time:.2f}s")

495

496

# Test 4: Limited results

497

start_time = time.time()

498

cursor = collection.find().limit(1000)

499

docs = await cursor.to_list(1000)

500

limited_time = time.time() - start_time

501

print(f"Limited (1000): {len(docs)} docs in {limited_time:.2f}s")

502

503

# Cleanup

504

await collection.drop()

505

client.close()

506

507

asyncio.run(cursor_performance_example())

508

```

509

510

### Cursor Error Handling

511

512

```python

513

import asyncio

514

import motor.motor_asyncio

515

import pymongo.errors

516

517

async def cursor_error_handling_example():

518

client = motor.motor_asyncio.AsyncIOMotorClient()

519

collection = client.test_database.test_collection

520

521

try:

522

# Cursor with timeout

523

cursor = collection.find().max_time_ms(1) # Very short timeout

524

525

async for document in cursor:

526

print(document)

527

528

except pymongo.errors.ExecutionTimeout:

529

print("Cursor operation timed out")

530

531

try:

532

# Invalid sort specification

533

cursor = collection.find().sort("invalid_field", 999) # Invalid direction

534

await cursor.to_list(None)

535

536

except pymongo.errors.OperationFailure as e:

537

print(f"Sort operation failed: {e}")

538

539

try:

540

# Cursor on dropped collection

541

await collection.drop()

542

cursor = collection.find()

543

544

# This might work (empty result) or fail depending on timing

545

async for document in cursor:

546

print(document)

547

548

except pymongo.errors.OperationFailure as e:

549

print(f"Cursor on dropped collection: {e}")

550

551

client.close()

552

553

asyncio.run(cursor_error_handling_example())

554

```

555

556

## Types

557

558

```python { .api }

559

from typing import Any, Optional, Union, Dict, List, Tuple, Iterator, AsyncIterator

560

import tornado.concurrent

561

562

# Sort specifications

563

SortKey = Union[str, List[Tuple[str, int]]]

564

SortDirection = int # 1 for ascending, -1 for descending

565

566

# Cursor result types

567

Document = Dict[str, Any]

568

RawBSONBytes = bytes

569

570

# Cursor state

571

CursorId = Optional[int]

572

ServerAddress = Optional[Tuple[str, int]]

573

```