or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced-queries.md bson-handling.md bulk-transactions.md client-connection.md database-collection.md gridfs-storage.md index.md monitoring-events.md

docs/bulk-transactions.md

0

# Bulk Operations and Transactions

1

2

Bulk write operations, transaction support, and session management for high-performance and ACID-compliant operations.

3

4

## Capabilities

5

6

### Bulk Write Operations

7

8

Perform multiple write operations efficiently in a single request.

9

10

```python { .api }

11

class Collection:

12

def bulk_write(self, requests, ordered=True, bypass_document_validation=False, session=None):

13

"""

14

Execute bulk write operations.

15

16

Parameters:

17

- requests: list of write operation instances

18

- ordered: stop on first error if True

19

- bypass_document_validation: skip document validation

20

- session: optional ClientSession

21

22

Returns:

23

BulkWriteResult: Result summary

24

"""

25

26

def initialize_ordered_bulk_op(self):

27

"""

28

Initialize ordered bulk operation (deprecated).

29

30

Returns:

31

BulkOperationBuilder: Bulk operation builder

32

"""

33

34

def initialize_unordered_bulk_op(self):

35

"""

36

Initialize unordered bulk operation (deprecated).

37

38

Returns:

39

BulkOperationBuilder: Bulk operation builder

40

"""

41

```

42

43

### Bulk Operation Types

44

45

Individual operation types for bulk writes.

46

47

```python { .api }

48

class InsertOne:

49

def __init__(self, document):

50

"""

51

Single document insert operation.

52

53

Parameters:

54

- document: document to insert

55

"""

56

57

class UpdateOne:

58

def __init__(self, filter, update, upsert=False, collation=None, array_filters=None, hint=None):

59

"""

60

Single document update operation.

61

62

Parameters:

63

- filter: query criteria

64

- update: update specification

65

- upsert: insert if no match found

66

- collation: collation options

67

- array_filters: array update filters

68

- hint: index hint

69

"""

70

71

class UpdateMany:

72

def __init__(self, filter, update, upsert=False, collation=None, array_filters=None, hint=None):

73

"""

74

Multiple document update operation.

75

76

Parameters:

77

- filter: query criteria

78

- update: update specification

79

- upsert: insert if no match found

80

- collation: collation options

81

- array_filters: array update filters

82

- hint: index hint

83

"""

84

85

class ReplaceOne:

86

def __init__(self, filter, replacement, upsert=False, collation=None, hint=None):

87

"""

88

Single document replacement operation.

89

90

Parameters:

91

- filter: query criteria

92

- replacement: replacement document

93

- upsert: insert if no match found

94

- collation: collation options

95

- hint: index hint

96

"""

97

98

class DeleteOne:

99

def __init__(self, filter, collation=None, hint=None):

100

"""

101

Single document delete operation.

102

103

Parameters:

104

- filter: query criteria

105

- collation: collation options

106

- hint: index hint

107

"""

108

109

class DeleteMany:

110

def __init__(self, filter, collation=None, hint=None):

111

"""

112

Multiple document delete operation.

113

114

Parameters:

115

- filter: query criteria

116

- collation: collation options

117

- hint: index hint

118

"""

119

120

class IndexModel:

121

def __init__(self, keys, **kwargs):

122

"""

123

Index creation specification.

124

125

Parameters:

126

- keys: index key specification

127

- name: index name

128

- unique: unique constraint

129

- background: background build (deprecated)

130

- sparse: sparse index

131

- expireAfterSeconds: TTL seconds

132

- partialFilterExpression: partial index filter

133

- collation: collation options

134

"""

135

```

136

137

### Bulk Write Results

138

139

Result objects for bulk operations.

140

141

```python { .api }

142

class BulkWriteResult:

143

@property

144

def acknowledged(self):

145

"""

146

Write acknowledgment status.

147

148

Returns:

149

bool: True if acknowledged

150

"""

151

152

@property

153

def inserted_count(self):

154

"""

155

Number of inserted documents.

156

157

Returns:

158

int: Insert count

159

"""

160

161

@property

162

def matched_count(self):

163

"""

164

Number of matched documents for updates.

165

166

Returns:

167

int: Match count

168

"""

169

170

@property

171

def modified_count(self):

172

"""

173

Number of modified documents.

174

175

Returns:

176

int: Modification count

177

"""

178

179

@property

180

def deleted_count(self):

181

"""

182

Number of deleted documents.

183

184

Returns:

185

int: Delete count

186

"""

187

188

@property

189

def upserted_count(self):

190

"""

191

Number of upserted documents.

192

193

Returns:

194

int: Upsert count

195

"""

196

197

@property

198

def upserted_ids(self):

199

"""

200

Mapping of operation index to upserted _id.

201

202

Returns:

203

dict: {index: _id} mapping

204

"""

205

206

class BulkWriteError(OperationFailure):

207

@property

208

def details(self):

209

"""

210

Detailed error information.

211

212

Returns:

213

dict: Error details with writeErrors and writeConcernErrors

214

"""

215

```

216

217

### Transaction Support

218

219

Multi-document ACID transactions with session management.

220

221

```python { .api }

222

class MongoClient:

223

def start_session(

224

self,

225

causal_consistency=None,

226

default_transaction_options=None,

227

snapshot=False

228

):

229

"""

230

Start a client session.

231

232

Parameters:

233

- causal_consistency: enable causal consistency

234

- default_transaction_options: default TransactionOptions

235

- snapshot: enable snapshot reads

236

237

Returns:

238

ClientSession: Session for operations

239

"""

240

241

class ClientSession:

242

def start_transaction(self, read_concern=None, write_concern=None, read_preference=None, max_commit_time_ms=None):

243

"""

244

Start a multi-document transaction.

245

246

Parameters:

247

- read_concern: transaction read concern

248

- write_concern: transaction write concern

249

- read_preference: transaction read preference

250

- max_commit_time_ms: maximum commit time

251

252

Raises:

253

InvalidOperation: if transaction already started

254

"""

255

256

def commit_transaction(self):

257

"""

258

Commit the current transaction.

259

260

Raises:

261

InvalidOperation: if no active transaction

262

"""

263

264

def abort_transaction(self):

265

"""

266

Abort the current transaction.

267

268

Raises:

269

InvalidOperation: if no active transaction

270

"""

271

272

@property

273

def in_transaction(self):

274

"""

275

Check if session has active transaction.

276

277

Returns:

278

bool: True if transaction is active

279

"""

280

281

@property

282

def has_ended(self):

283

"""

284

Check if session has ended.

285

286

Returns:

287

bool: True if session ended

288

"""

289

290

def end_session(self):

291

"""End the session."""

292

293

def with_transaction(self, callback, read_concern=None, write_concern=None, read_preference=None, max_commit_time_ms=None):

294

"""

295

Execute callback in a transaction with automatic retry.

296

297

Parameters:

298

- callback: function to execute in transaction

299

- read_concern: transaction read concern

300

- write_concern: transaction write concern

301

- read_preference: transaction read preference

302

- max_commit_time_ms: maximum commit time

303

304

Returns:

305

Any: Result from callback function

306

"""

307

```

308

309

### Transaction Options

310

311

Configuration for transaction behavior.

312

313

```python { .api }

314

class TransactionOptions:

315

def __init__(

316

self,

317

read_concern=None,

318

write_concern=None,

319

read_preference=None,

320

max_commit_time_ms=None

321

):

322

"""

323

Transaction configuration options.

324

325

Parameters:

326

- read_concern: read concern for transaction

327

- write_concern: write concern for transaction

328

- read_preference: read preference for transaction

329

- max_commit_time_ms: maximum time for commit

330

"""

331

332

@property

333

def read_concern(self):

334

"""Transaction read concern."""

335

336

@property

337

def write_concern(self):

338

"""Transaction write concern."""

339

340

@property

341

def read_preference(self):

342

"""Transaction read preference."""

343

344

@property

345

def max_commit_time_ms(self):

346

"""Maximum commit time in milliseconds."""

347

```

348

349

## Usage Examples

350

351

### Bulk Write Operations

352

353

```python

354

from pymongo import MongoClient

355

from pymongo.operations import InsertOne, UpdateOne, DeleteOne, ReplaceOne
from pymongo.errors import BulkWriteError

356

357

client = MongoClient()

358

collection = client.mydb.mycollection

359

360

# Prepare bulk operations

361

requests = [

362

InsertOne({"name": "Alice", "age": 25}),

363

InsertOne({"name": "Bob", "age": 30}),

364

UpdateOne({"name": "Charlie"}, {"$set": {"age": 35}}),

365

ReplaceOne(

366

{"name": "David"},

367

{"name": "David", "age": 40, "status": "active"},

368

upsert=True

369

),

370

DeleteOne({"name": "Eve"})

371

]

372

373

# Execute bulk write

374

try:

375

result = collection.bulk_write(requests)

376

print(f"Inserted: {result.inserted_count}")

377

print(f"Modified: {result.modified_count}")

378

print(f"Deleted: {result.deleted_count}")

379

print(f"Upserted: {result.upserted_count}")

380

if result.upserted_ids:

381

print(f"Upserted IDs: {result.upserted_ids}")

382

except BulkWriteError as e:

383

print(f"Bulk write error: {e.details}")

384

385

# Unordered bulk write (continues on errors)

386

result = collection.bulk_write(requests, ordered=False)

387

```

388

389

### Transaction Usage

390

391

```python

392

from pymongo import MongoClient

393

from pymongo.errors import PyMongoError

394

395

client = MongoClient()

396

db = client.mydb

397

398

# Basic transaction

399

with client.start_session() as session:

400

with session.start_transaction():

401

db.accounts.update_one(

402

{"name": "Alice"},

403

{"$inc": {"balance": -100}},

404

session=session

405

)

406

db.accounts.update_one(

407

{"name": "Bob"},

408

{"$inc": {"balance": 100}},

409

session=session

410

)

411

# Transaction commits automatically on context exit

412

413

# Manual transaction control

414

session = client.start_session()

415

try:

416

session.start_transaction()

417

418

# Perform operations

419

db.inventory.update_one(

420

{"item": "widget", "qty": {"$gte": 5}},

421

{"$inc": {"qty": -5}},

422

session=session

423

)

424

db.orders.insert_one(

425

{"item": "widget", "qty": 5, "status": "pending"},

426

session=session

427

)

428

429

# Commit transaction

430

session.commit_transaction()

431

print("Transaction committed")

432

433

except PyMongoError as e:

434

session.abort_transaction()

435

print(f"Transaction aborted: {e}")

436

finally:

437

session.end_session()

438

```

439

440

### Transaction with Callback

441

442

```python

443

def transfer_funds(session):

444

"""Transfer funds between accounts."""

445

# This function will be called within a transaction

446

accounts = session.client.mydb.accounts

447

448

# Check source account balance

449

source = accounts.find_one({"name": "Alice"}, session=session)

450

if source["balance"] < 100:

451

raise ValueError("Insufficient funds")

452

453

# Perform transfer

454

accounts.update_one(

455

{"name": "Alice"},

456

{"$inc": {"balance": -100}},

457

session=session

458

)

459

accounts.update_one(

460

{"name": "Bob"},

461

{"$inc": {"balance": 100}},

462

session=session

463

)

464

465

return "Transfer completed"

466

467

# Execute with automatic retry on transient errors

468

with client.start_session() as session:

469

try:

470

result = session.with_transaction(transfer_funds)

471

print(result)

472

except ValueError as e:

473

print(f"Business logic error: {e}")

474

```

475

476

### Advanced Bulk Operations

477

478

```python

479

from pymongo.operations import UpdateMany, InsertOne

480

from pymongo.write_concern import WriteConcern

481

482

# Bulk operations using an index hint and a collation

483

requests = [

484

InsertOne({"category": "electronics", "price": 299.99}),

485

UpdateMany(

486

{"category": "electronics"},

487

{"$mul": {"price": 0.9}}, # 10% discount

488

hint="category_1" # Use specific index

489

),

490

UpdateMany(

491

{"status": "pending", "created": {"$lt": "2023-01-01"}},

492

{"$set": {"status": "expired"}},

493

collation={"locale": "en", "strength": 2}

494

)

495

]

496

497

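# Execute as an ordered bulk write (bulk_write takes no write concern argument; to use a custom one, call it on collection.with_options(write_concern=WriteConcern(w="majority")))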

498

result = collection.bulk_write(

499

requests,

500

ordered=True,

501

bypass_document_validation=False

502

)

503

504

print(f"Bulk operation result: {result.bulk_api_result}")

505

```

506

507

### Session with Read Preferences

508

509

```python

510

from pymongo import ReadPreference

511

from pymongo.read_concern import ReadConcern
from pymongo.client_session import TransactionOptions
from datetime import datetime

512

513

# Start session with specific options

514

with client.start_session(

515

causal_consistency=True,

516

default_transaction_options=TransactionOptions(

517

read_concern=ReadConcern("snapshot"),

518

write_concern=WriteConcern(w="majority"),

519

read_preference=ReadPreference.PRIMARY

520

)

521

) as session:

522

523

# Reads outside a transaction are causally consistent within this session; the default transaction options apply only inside transactions

524

docs = list(collection.find({}, session=session))

525

526

# Transaction inherits default options

527

with session.start_transaction():

528

collection.insert_one({"timestamp": datetime.now()}, session=session)

529

collection.update_many(

530

{"processed": False},

531

{"$set": {"processed": True}},

532

session=session

533

)

534

```