or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

asyncio-operations.mdchange-streams.mdclient-encryption.mdcursor-operations.mdgridfs-operations.mdindex.mdtornado-operations.mdweb-integration.md

tornado-operations.mddocs/

0

# Tornado Operations

1

2

Core MongoDB client, database, and collection operations optimized for Tornado's IOLoop and Future objects. These classes integrate seamlessly with Tornado applications and provide callback-based asynchronous operations.

3

4

## Capabilities

5

6

### Tornado Client

7

8

The main entry point for Tornado-based MongoDB operations, providing connection management and database access with Tornado Future objects.

9

10

```python { .api }

11

class MotorClient:

12

def __init__(

13

self,

14

host: Union[str, List[str]] = 'localhost',

15

port: int = 27017,

16

document_class: type = dict,

17

tz_aware: bool = False,

18

connect: bool = True,

19

io_loop=None,

20

**kwargs

21

):

22

"""

23

Create a new MotorClient connection to MongoDB.

24

25

Parameters:

26

- host: MongoDB host(s) to connect to

27

- port: Port number for MongoDB connection

28

- document_class: Default class for documents returned from queries

29

- tz_aware: Whether datetime objects should be timezone-aware

30

- connect: Whether to connect immediately or lazily

31

- io_loop: Tornado IOLoop instance (optional, uses current if None)

32

- **kwargs: Additional connection options (maxPoolSize, ssl, etc.)

33

"""

34

35

def get_database(self, name: Optional[str] = None, **kwargs) -> MotorDatabase:

36

"""Get a database instance."""

37

38

def get_default_database(self, **kwargs) -> MotorDatabase:

39

"""Get the default database specified in connection URI."""

40

41

def list_databases(self, session=None, **kwargs) -> tornado.concurrent.Future:

42

"""List all databases on the MongoDB server."""

43

44

def list_database_names(self, session=None, **kwargs) -> tornado.concurrent.Future:

45

"""Get names of all databases on the server."""

46

47

def server_info(self) -> tornado.concurrent.Future:

48

"""Get information about the MongoDB server."""

49

50

def start_session(self, **kwargs) -> tornado.concurrent.Future:

51

"""Start a logical session for use with transactions."""

52

53

def drop_database(self, name_or_database: Union[str, MotorDatabase], session=None) -> tornado.concurrent.Future:

54

"""Drop a database."""

55

56

def close(self) -> None:

57

"""Close all connections to MongoDB."""

58

59

def watch(self, pipeline: Optional[List[Dict[str, Any]]] = None, **kwargs) -> MotorChangeStream:

60

"""Watch for changes across all collections in all databases."""

61

62

# Read-only properties

63

@property

64

def address(self) -> Optional[Tuple[str, int]]:

65

"""Current connection address."""

66

67

@property

68

def primary(self) -> Optional[Tuple[str, int]]:

69

"""Primary server address in replica set."""

70

71

@property

72

def secondaries(self) -> Set[Tuple[str, int]]:

73

"""Secondary server addresses in replica set."""

74

75

@property

76

def is_primary(self) -> bool:

77

"""Whether connected to a primary server."""

78

79

@property

80

def is_mongos(self) -> bool:

81

"""Whether connected to a mongos router."""

82

```

83

84

### Tornado Database

85

86

Represents a MongoDB database with Tornado Future-based operations.

87

88

```python { .api }

89

class MotorDatabase:

90

@property

91

def name(self) -> str:

92

"""Database name."""

93

94

@property

95

def client(self) -> MotorClient:

96

"""The client that owns this database."""

97

98

def get_collection(self, name: str, **kwargs) -> MotorCollection:

99

"""Get a collection in this database."""

100

101

def __getitem__(self, name: str) -> MotorCollection:

102

"""Get a collection using dictionary-style access."""

103

104

def __getattr__(self, name: str) -> MotorCollection:

105

"""Get a collection using attribute-style access."""

106

107

def create_collection(

108

self,

109

name: str,

110

codec_options=None,

111

read_preference=None,

112

write_concern=None,

113

read_concern=None,

114

session=None,

115

**kwargs

116

) -> tornado.concurrent.Future:

117

"""Create a new collection in this database."""

118

119

def drop_collection(

120

self,

121

name_or_collection: Union[str, MotorCollection],

122

session=None

123

) -> tornado.concurrent.Future:

124

"""Drop a collection."""

125

126

def list_collection_names(

127

self,

128

session=None,

129

filter: Optional[Dict[str, Any]] = None,

130

**kwargs

131

) -> tornado.concurrent.Future:

132

"""Get names of all collections in this database."""

133

134

def list_collections(

135

self,

136

session=None,

137

filter: Optional[Dict[str, Any]] = None,

138

**kwargs

139

) -> tornado.concurrent.Future:

140

"""List collections with metadata."""

141

142

def command(

143

self,

144

command: Union[str, Dict[str, Any]],

145

value: Any = 1,

146

check: bool = True,

147

allowable_errors: Optional[List[str]] = None,

148

session=None,

149

**kwargs

150

) -> tornado.concurrent.Future:

151

"""Execute a database command."""

152

153

def aggregate(

154

self,

155

pipeline: List[Dict[str, Any]],

156

session=None,

157

**kwargs

158

) -> MotorCommandCursor:

159

"""Execute an aggregation pipeline on the database."""

160

161

def watch(

162

self,

163

pipeline: Optional[List[Dict[str, Any]]] = None,

164

session=None,

165

**kwargs

166

) -> MotorChangeStream:

167

"""Watch for changes on all collections in this database."""

168

```

169

170

### Tornado Collection

171

172

Represents a MongoDB collection with Tornado Future-based document operations.

173

174

```python { .api }

175

class MotorCollection:

176

@property

177

def name(self) -> str:

178

"""Collection name."""

179

180

@property

181

def full_name(self) -> str:

182

"""Full collection name (database.collection)."""

183

184

@property

185

def database(self) -> MotorDatabase:

186

"""The database that owns this collection."""

187

188

# Insert Operations

189

def insert_one(

190

self,

191

document: Dict[str, Any],

192

bypass_document_validation: bool = False,

193

session=None

194

) -> tornado.concurrent.Future:

195

"""Insert a single document."""

196

197

def insert_many(

198

self,

199

documents: List[Dict[str, Any]],

200

ordered: bool = True,

201

bypass_document_validation: bool = False,

202

session=None

203

) -> tornado.concurrent.Future:

204

"""Insert multiple documents."""

205

206

# Find Operations

207

def find_one(

208

self,

209

filter: Optional[Dict[str, Any]] = None,

210

*args,

211

projection: Optional[Dict[str, Any]] = None,

212

session=None,

213

**kwargs

214

) -> tornado.concurrent.Future:

215

"""Find a single document."""

216

217

def find(

218

self,

219

filter: Optional[Dict[str, Any]] = None,

220

projection: Optional[Dict[str, Any]] = None,

221

skip: int = 0,

222

limit: int = 0,

223

no_cursor_timeout: bool = False,

224

cursor_type=None,

225

sort: Optional[List[Tuple[str, int]]] = None,

226

allow_partial_results: bool = False,

227

batch_size: int = 0,

228

collation=None,

229

hint: Optional[Union[str, List[Tuple[str, int]]]] = None,

230

max_time_ms: Optional[int] = None,

231

session=None,

232

**kwargs

233

) -> MotorCursor:

234

"""Find multiple documents, returns a cursor."""

235

236

def find_one_and_delete(

237

self,

238

filter: Dict[str, Any],

239

projection: Optional[Dict[str, Any]] = None,

240

sort: Optional[List[Tuple[str, int]]] = None,

241

session=None,

242

**kwargs

243

) -> tornado.concurrent.Future:

244

"""Find and delete a single document."""

245

246

def find_one_and_replace(

247

self,

248

filter: Dict[str, Any],

249

replacement: Dict[str, Any],

250

projection: Optional[Dict[str, Any]] = None,

251

sort: Optional[List[Tuple[str, int]]] = None,

252

upsert: bool = False,

253

return_document: bool = False,

254

session=None,

255

**kwargs

256

) -> tornado.concurrent.Future:

257

"""Find and replace a single document."""

258

259

def find_one_and_update(

260

self,

261

filter: Dict[str, Any],

262

update: Dict[str, Any],

263

projection: Optional[Dict[str, Any]] = None,

264

sort: Optional[List[Tuple[str, int]]] = None,

265

upsert: bool = False,

266

return_document: bool = False,

267

session=None,

268

**kwargs

269

) -> tornado.concurrent.Future:

270

"""Find and update a single document."""

271

272

# Update Operations

273

def update_one(

274

self,

275

filter: Dict[str, Any],

276

update: Dict[str, Any],

277

upsert: bool = False,

278

bypass_document_validation: bool = False,

279

collation=None,

280

array_filters: Optional[List[Dict[str, Any]]] = None,

281

hint: Optional[Union[str, List[Tuple[str, int]]]] = None,

282

session=None

283

) -> tornado.concurrent.Future:

284

"""Update a single document."""

285

286

def update_many(

287

self,

288

filter: Dict[str, Any],

289

update: Dict[str, Any],

290

upsert: bool = False,

291

array_filters: Optional[List[Dict[str, Any]]] = None,

292

bypass_document_validation: bool = False,

293

collation=None,

294

hint: Optional[Union[str, List[Tuple[str, int]]]] = None,

295

session=None

296

) -> tornado.concurrent.Future:

297

"""Update multiple documents."""

298

299

def replace_one(

300

self,

301

filter: Dict[str, Any],

302

replacement: Dict[str, Any],

303

upsert: bool = False,

304

bypass_document_validation: bool = False,

305

collation=None,

306

hint: Optional[Union[str, List[Tuple[str, int]]]] = None,

307

session=None

308

) -> tornado.concurrent.Future:

309

"""Replace a single document."""

310

311

# Delete Operations

312

def delete_one(

313

self,

314

filter: Dict[str, Any],

315

collation=None,

316

hint: Optional[Union[str, List[Tuple[str, int]]]] = None,

317

session=None

318

) -> tornado.concurrent.Future:

319

"""Delete a single document."""

320

321

def delete_many(

322

self,

323

filter: Dict[str, Any],

324

collation=None,

325

hint: Optional[Union[str, List[Tuple[str, int]]]] = None,

326

session=None

327

) -> tornado.concurrent.Future:

328

"""Delete multiple documents."""

329

330

# Count Operations

331

def count_documents(

332

self,

333

filter: Dict[str, Any],

334

session=None,

335

**kwargs

336

) -> tornado.concurrent.Future:

337

"""Count documents matching filter."""

338

339

def estimated_document_count(self, **kwargs) -> tornado.concurrent.Future:

340

"""Estimate total document count."""

341

342

# Index Operations

343

def create_index(

344

self,

345

keys: Union[str, List[Tuple[str, int]]],

346

session=None,

347

**kwargs

348

) -> tornado.concurrent.Future:

349

"""Create a single index."""

350

351

def create_indexes(

352

self,

353

indexes: List[Dict[str, Any]],

354

session=None,

355

**kwargs

356

) -> tornado.concurrent.Future:

357

"""Create multiple indexes."""

358

359

def drop_index(

360

self,

361

index: Union[str, List[Tuple[str, int]]],

362

session=None,

363

**kwargs

364

) -> tornado.concurrent.Future:

365

"""Drop a single index."""

366

367

def list_indexes(self, session=None) -> MotorCommandCursor:

368

"""List all indexes on the collection."""

369

370

# Aggregation Operations

371

def aggregate(

372

self,

373

pipeline: List[Dict[str, Any]],

374

session=None,

375

**kwargs

376

) -> MotorCommandCursor:

377

"""Execute an aggregation pipeline."""

378

379

def distinct(

380

self,

381

key: str,

382

filter: Optional[Dict[str, Any]] = None,

383

session=None,

384

**kwargs

385

) -> tornado.concurrent.Future:

386

"""Get distinct values for a field."""

387

388

# Bulk Operations

389

def bulk_write(

390

self,

391

requests: List[Any],

392

ordered: bool = True,

393

bypass_document_validation: bool = False,

394

session=None

395

) -> tornado.concurrent.Future:

396

"""Execute bulk write operations."""

397

398

# Change Streams

399

def watch(

400

self,

401

pipeline: Optional[List[Dict[str, Any]]] = None,

402

full_document: Optional[str] = None,

403

resume_after: Optional[Dict[str, Any]] = None,

404

max_await_time_ms: Optional[int] = None,

405

batch_size: Optional[int] = None,

406

collation=None,

407

start_at_operation_time=None,

408

session=None,

409

start_after: Optional[Dict[str, Any]] = None,

410

**kwargs

411

) -> MotorChangeStream:

412

"""Watch for changes on the collection."""

413

414

# Collection Management

415

def drop(self, session=None) -> tornado.concurrent.Future:

416

"""Drop the collection."""

417

418

def rename(self, new_name: str, session=None, **kwargs) -> tornado.concurrent.Future:

419

"""Rename the collection."""

420

```

421

422

### Tornado Client Session

423

424

Client session for transaction support and causally consistent reads in Tornado applications.

425

426

```python { .api }

427

class MotorClientSession:

428

"""

429

A session for ordering sequential operations and transactions.

430

431

Created via MotorClient.start_session(), not directly instantiated.

432

"""

433

434

# Properties

435

@property

436

def client(self) -> MotorClient:

437

"""The client this session was created from."""

438

439

@property

440

def cluster_time(self) -> Optional[Dict[str, Any]]:

441

"""The cluster time returned by the last operation."""

442

443

@property

444

def has_ended(self) -> bool:

445

"""Whether this session has ended."""

446

447

@property

448

def in_transaction(self) -> bool:

449

"""Whether this session is in an active transaction."""

450

451

@property

452

def operation_time(self) -> Optional[Any]:

453

"""The operation time returned by the last operation."""

454

455

@property

456

def options(self) -> Dict[str, Any]:

457

"""The options used to create this session."""

458

459

@property

460

def session_id(self) -> Dict[str, Any]:

461

"""A BSON document identifying this session."""

462

463

# Transaction Methods

464

def start_transaction(

465

self,

466

read_concern: Optional[Any] = None,

467

write_concern: Optional[Any] = None,

468

read_preference: Optional[Any] = None,

469

max_commit_time_ms: Optional[int] = None

470

) -> Any:

471

"""

472

Start a multi-statement transaction.

473

474

Returns a context manager for the transaction.

475

Use with context manager syntax.

476

477

Parameters:

478

- read_concern: Read concern for the transaction

479

- write_concern: Write concern for the transaction

480

- read_preference: Read preference for the transaction

481

- max_commit_time_ms: Maximum time for commit operation

482

483

Returns:

484

Transaction context manager

485

"""

486

487

def commit_transaction(self) -> tornado.concurrent.Future:

488

"""Commit the current transaction."""

489

490

def abort_transaction(self) -> tornado.concurrent.Future:

491

"""Abort the current transaction."""

492

493

def with_transaction(

494

self,

495

coro: Callable,

496

read_concern: Optional[Any] = None,

497

write_concern: Optional[Any] = None,

498

read_preference: Optional[Any] = None,

499

max_commit_time_ms: Optional[int] = None

500

) -> tornado.concurrent.Future:

501

"""

502

Execute a coroutine within a transaction.

503

504

Automatically handles transaction retry logic for transient errors.

505

Will retry the entire transaction for up to 120 seconds.

506

507

Parameters:

508

- coro: Function that takes this session as first argument

509

- read_concern: Read concern for the transaction

510

- write_concern: Write concern for the transaction

511

- read_preference: Read preference for the transaction

512

- max_commit_time_ms: Maximum time for commit operation

513

514

Returns:

515

Tornado Future with return value from the coroutine

516

"""

517

518

# Session Management

519

def end_session(self) -> tornado.concurrent.Future:

520

"""End this session."""

521

522

def advance_cluster_time(self, cluster_time: Dict[str, Any]) -> None:

523

"""Advance the cluster time for this session."""

524

525

def advance_operation_time(self, operation_time: Any) -> None:

526

"""Advance the operation time for this session."""

527

```

528

529

## Usage Examples

530

531

### Basic Tornado Operations

532

533

```python

534

import tornado.ioloop

535

import motor.motor_tornado

536

537

async def example():

538

client = motor.motor_tornado.MotorClient()

539

db = client.test_database

540

collection = db.test_collection

541

542

# Insert operations

543

result = await collection.insert_one({"name": "Alice", "age": 30})

544

print(f"Inserted ID: {result.inserted_id}")

545

546

results = await collection.insert_many([

547

{"name": "Bob", "age": 25},

548

{"name": "Charlie", "age": 35}

549

])

550

print(f"Inserted IDs: {results.inserted_ids}")

551

552

# Find operations

553

document = await collection.find_one({"name": "Alice"})

554

print(f"Found: {document}")

555

556

# Update operations

557

result = await collection.update_one(

558

{"name": "Alice"},

559

{"$set": {"age": 31}}

560

)

561

print(f"Modified {result.modified_count} document(s)")

562

563

# Delete operations

564

result = await collection.delete_one({"name": "Alice"})

565

print(f"Deleted {result.deleted_count} document(s)")

566

567

client.close()

568

569

# Run with Tornado IOLoop

570

tornado.ioloop.IOLoop.current().run_sync(example)

571

```

572

573

### Callback-Style Operations (Legacy)

574

575

```python

576

import tornado.ioloop

577

import motor.motor_tornado

578

579

def handle_result(result, error):

580

if error:

581

print(f"Error: {error}")

582

else:

583

print(f"Result: {result}")

584

tornado.ioloop.IOLoop.current().stop()

585

586

def callback_example():

587

client = motor.motor_tornado.MotorClient()

588

collection = client.test_database.test_collection

589

590

# Using callbacks (deprecated style)

591

future = collection.find_one({"name": "Alice"})

592

tornado.ioloop.IOLoop.current().add_future(

593

future,

594

lambda f: handle_result(f.result(), f.exception())

595

)

596

597

tornado.ioloop.IOLoop.current().run_sync(callback_example)

598

```

599

600

### Generator-Style Operations (Legacy)

601

602

```python

603

import tornado.gen

604

import tornado.ioloop

605

import motor.motor_tornado

606

607

@tornado.gen.coroutine

608

def gen_example():

609

client = motor.motor_tornado.MotorClient()

610

collection = client.test_database.test_collection

611

612

# Generator-based coroutines (legacy)

613

try:

614

result = yield collection.insert_one({"name": "test"})

615

print(f"Inserted: {result.inserted_id}")

616

617

document = yield collection.find_one({"name": "test"})

618

print(f"Found: {document}")

619

finally:

620

client.close()

621

622

tornado.ioloop.IOLoop.current().run_sync(gen_example)

623

```

624

625

### Web Application Integration

626

627

```python

628

import tornado.web

629

import tornado.ioloop

630

import motor.motor_tornado

631

632

class MainHandler(tornado.web.RequestHandler):

633

async def get(self):

634

db = self.application.settings['db']

635

collection = db.users

636

637

# Find users

638

users = []

639

async for user in collection.find().limit(10):

640

users.append(user)

641

642

self.write({"users": users})

643

644

def make_app():

645

client = motor.motor_tornado.MotorClient()

646

647

return tornado.web.Application([

648

(r"/", MainHandler),

649

], db=client.test_database)

650

651

if __name__ == "__main__":

652

app = make_app()

653

app.listen(8888)

654

tornado.ioloop.IOLoop.current().start()

655

```

656

657

## Types

658

659

```python { .api }

660

from typing import Any, Optional, Union, Dict, List, Tuple, Set

661

import tornado.concurrent

662

663

MotorClientSession = Any # Actual session type from motor

664

MotorCommandCursor = Any # Command cursor type

665

MotorCursor = Any # Query cursor type

666

MotorChangeStream = Any # Change stream type

667

```