or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

analytics-operations.md, async-operations.md, cluster-operations.md, document-operations.md, index.md, management-operations.md, n1ql-queries.md, search-operations.md, subdocument-operations.md, view-operations.md

docs/async-operations.md

0

# Asynchronous Operations

1

2

Asyncio-based asynchronous operations for high-performance, non-blocking applications. They expose the same API surface as the synchronous operations, but every I/O call is awaitable (async/await), which improves concurrency and scalability.

3

4

## Capabilities

5

6

### Asynchronous Cluster Operations

7

8

Asyncio-compatible cluster connection and management.

9

10

```python { .api }

11

class ACluster:

12

def __init__(self, connection_string: str, options: ClusterOptions = None):

13

"""

14

Create asynchronous cluster instance.

15

16

Args:

17

connection_string (str): Connection string

18

options (ClusterOptions, optional): Cluster options

19

"""

20

21

async def bucket(self, bucket_name: str) -> ABucket:

22

"""

23

Get asynchronous bucket reference.

24

25

Args:

26

bucket_name (str): Bucket name

27

28

Returns:

29

ABucket: Asynchronous bucket instance

30

"""

31

32

async def query(self, statement: str, options: QueryOptions = None) -> QueryResult:

33

"""

34

Execute N1QL query asynchronously.

35

36

Args:

37

statement (str): N1QL query statement

38

options (QueryOptions, optional): Query options

39

40

Returns:

41

QueryResult: Async query results iterator

42

"""

43

44

async def analytics_query(self, statement: str, options: AnalyticsOptions = None) -> AnalyticsResult:

45

"""

46

Execute Analytics query asynchronously.

47

48

Args:

49

statement (str): Analytics query statement

50

options (AnalyticsOptions, optional): Analytics options

51

52

Returns:

53

AnalyticsResult: Async analytics results iterator

54

"""

55

56

async def search_query(self, index: str, query: SearchQuery, options: SearchOptions = None) -> SearchResult:

57

"""

58

Execute search query asynchronously.

59

60

Args:

61

index (str): Search index name

62

query (SearchQuery): Search query

63

options (SearchOptions, optional): Search options

64

65

Returns:

66

SearchResult: Async search results iterator

67

"""

68

69

async def ping(self, options: PingOptions = None) -> PingResult:

70

"""

71

Ping cluster services asynchronously.

72

73

Args:

74

options (PingOptions, optional): Ping options

75

76

Returns:

77

PingResult: Connectivity status

78

"""

79

80

async def diagnostics(self, options: DiagnosticsOptions = None) -> DiagnosticsResult:

81

"""

82

Get cluster diagnostics asynchronously.

83

84

Args:

85

options (DiagnosticsOptions, optional): Diagnostic options

86

87

Returns:

88

DiagnosticsResult: Cluster health information

89

"""

90

91

async def close(self) -> None:

92

"""Close cluster connection and cleanup resources."""

93

```

94

95

### Asynchronous Document Operations

96

97

Async key-value operations for document management.

98

99

```python { .api }

100

class AsyncCBCollection:

101

async def get(self, key: str, options: GetOptions = None) -> AsyncGetResult:

102

"""

103

Retrieve document asynchronously.

104

105

Args:

106

key (str): Document key

107

options (GetOptions, optional): Retrieval options

108

109

Returns:

110

AsyncGetResult: Document content and metadata

111

"""

112

113

async def upsert(self, key: str, value: Any, options: UpsertOptions = None) -> AsyncMutationResult:

114

"""

115

Upsert document asynchronously.

116

117

Args:

118

key (str): Document key

119

value (Any): Document content

120

options (UpsertOptions, optional): Upsert options

121

122

Returns:

123

AsyncMutationResult: Operation result

124

"""

125

126

async def insert(self, key: str, value: Any, options: InsertOptions = None) -> AsyncMutationResult:

127

"""

128

Insert document asynchronously.

129

130

Args:

131

key (str): Document key

132

value (Any): Document content

133

options (InsertOptions, optional): Insert options

134

135

Returns:

136

AsyncMutationResult: Operation result

137

"""

138

139

async def replace(self, key: str, value: Any, options: ReplaceOptions = None) -> AsyncMutationResult:

140

"""

141

Replace document asynchronously.

142

143

Args:

144

key (str): Document key

145

value (Any): New document content

146

options (ReplaceOptions, optional): Replace options

147

148

Returns:

149

AsyncMutationResult: Operation result

150

"""

151

152

async def remove(self, key: str, options: RemoveOptions = None) -> AsyncMutationResult:

153

"""

154

Remove document asynchronously.

155

156

Args:

157

key (str): Document key

158

options (RemoveOptions, optional): Remove options

159

160

Returns:

161

AsyncMutationResult: Operation result

162

"""

163

164

async def exists(self, key: str, options: ExistsOptions = None) -> ExistsResult:

165

"""

166

Check document existence asynchronously.

167

168

Args:

169

key (str): Document key

170

options (ExistsOptions, optional): Existence check options

171

172

Returns:

173

ExistsResult: Existence status

174

"""

175

176

async def touch(self, key: str, expiry: timedelta, options: TouchOptions = None) -> AsyncMutationResult:

177

"""

178

Update document expiration asynchronously.

179

180

Args:

181

key (str): Document key

182

expiry (timedelta): New expiration time

183

options (TouchOptions, optional): Touch options

184

185

Returns:

186

AsyncMutationResult: Operation result

187

"""

188

189

async def get_and_touch(self, key: str, expiry: timedelta, options: GetAndTouchOptions = None) -> AsyncGetResult:

190

"""

191

Get and touch document asynchronously.

192

193

Args:

194

key (str): Document key

195

expiry (timedelta): New expiration time

196

options (GetAndTouchOptions, optional): Operation options

197

198

Returns:

199

AsyncGetResult: Document content with updated expiry

200

"""

201

202

async def get_and_lock(self, key: str, lock_time: timedelta, options: GetAndLockOptions = None) -> AsyncGetResult:

203

"""

204

Get and lock document asynchronously.

205

206

Args:

207

key (str): Document key

208

lock_time (timedelta): Lock duration

209

options (GetAndLockOptions, optional): Lock options

210

211

Returns:

212

AsyncGetResult: Document content with lock

213

"""

214

215

async def unlock(self, key: str, cas: int, options: UnlockOptions = None) -> None:

216

"""

217

Unlock document asynchronously.

218

219

Args:

220

key (str): Document key

221

cas (int): CAS value from get_and_lock

222

options (UnlockOptions, optional): Unlock options

223

"""

224

```

225

226

### Asynchronous Subdocument Operations

227

228

Async subdocument operations for efficient partial document updates.

229

230

```python { .api }

231

class AsyncCBCollection:

232

async def lookup_in(self, key: str, spec: List[Spec], options: LookupInOptions = None) -> LookupInResult:

233

"""

234

Perform subdocument lookups asynchronously.

235

236

Args:

237

key (str): Document key

238

spec (List[Spec]): Lookup specifications

239

options (LookupInOptions, optional): Lookup options

240

241

Returns:

242

LookupInResult: Lookup results

243

"""

244

245

async def mutate_in(self, key: str, spec: List[Spec], options: MutateInOptions = None) -> AsyncMutateInResult:

246

"""

247

Perform subdocument mutations asynchronously.

248

249

Args:

250

key (str): Document key

251

spec (List[Spec]): Mutation specifications

252

options (MutateInOptions, optional): Mutation options

253

254

Returns:

255

AsyncMutateInResult: Mutation results

256

"""

257

```

258

259

### Asynchronous Binary Operations

260

261

Async binary data and counter operations.

262

263

```python { .api }

264

class AsyncBinaryCollection:

265

async def append(self, key: str, value: bytes, options: AppendOptions = None) -> AsyncMutationResult:

266

"""

267

Append binary data asynchronously.

268

269

Args:

270

key (str): Document key

271

value (bytes): Data to append

272

options (AppendOptions, optional): Append options

273

274

Returns:

275

AsyncMutationResult: Operation result

276

"""

277

278

async def prepend(self, key: str, value: bytes, options: PrependOptions = None) -> AsyncMutationResult:

279

"""

280

Prepend binary data asynchronously.

281

282

Args:

283

key (str): Document key

284

value (bytes): Data to prepend

285

options (PrependOptions, optional): Prepend options

286

287

Returns:

288

AsyncMutationResult: Operation result

289

"""

290

291

async def increment(self, key: str, options: IncrementOptions = None) -> CounterResult:

292

"""

293

Increment counter asynchronously.

294

295

Args:

296

key (str): Counter key

297

options (IncrementOptions, optional): Increment options

298

299

Returns:

300

CounterResult: New counter value

301

"""

302

303

async def decrement(self, key: str, options: DecrementOptions = None) -> CounterResult:

304

"""

305

Decrement counter asynchronously.

306

307

Args:

308

key (str): Counter key

309

options (DecrementOptions, optional): Decrement options

310

311

Returns:

312

CounterResult: New counter value

313

"""

314

```

315

316

### Asynchronous Management Operations

317

318

Async administrative operations for cluster management.

319

320

```python { .api }

321

class AUserManager:

322

async def upsert_user(self, user: User, options: UpsertUserOptions = None) -> None:

323

"""Create or update user asynchronously."""

324

325

async def drop_user(self, username: str, options: DropUserOptions = None) -> None:

326

"""Delete user asynchronously."""

327

328

async def get_user(self, username: str, options: GetUserOptions = None) -> UserAndMetadata:

329

"""Get user information asynchronously."""

330

331

async def get_all_users(self, options: GetAllUsersOptions = None) -> List[UserAndMetadata]:

332

"""Get all users asynchronously."""

333

334

class ABucketManager:

335

async def create_bucket(self, settings: CreateBucketSettings, options: CreateBucketOptions = None) -> None:

336

"""Create bucket asynchronously."""

337

338

async def update_bucket(self, settings: BucketSettings, options: UpdateBucketOptions = None) -> None:

339

"""Update bucket asynchronously."""

340

341

async def drop_bucket(self, bucket_name: str, options: DropBucketOptions = None) -> None:

342

"""Delete bucket asynchronously."""

343

344

async def get_bucket(self, bucket_name: str, options: GetBucketOptions = None) -> BucketSettings:

345

"""Get bucket settings asynchronously."""

346

347

async def get_all_buckets(self, options: GetAllBucketsOptions = None) -> Dict[str, BucketSettings]:

348

"""Get all bucket settings asynchronously."""

349

350

class ACollectionManager:

351

async def create_scope(self, scope_name: str, options: CreateScopeOptions = None) -> None:

352

"""Create scope asynchronously."""

353

354

async def drop_scope(self, scope_name: str, options: DropScopeOptions = None) -> None:

355

"""Delete scope asynchronously."""

356

357

async def create_collection(self, collection_spec: CollectionSpec, options: CreateCollectionOptions = None) -> None:

358

"""Create collection asynchronously."""

359

360

async def drop_collection(self, collection_spec: CollectionSpec, options: DropCollectionOptions = None) -> None:

361

"""Delete collection asynchronously."""

362

363

async def get_all_scopes(self, options: GetAllScopesOptions = None) -> List[ScopeSpec]:

364

"""Get all scopes asynchronously."""

365

366

class AQueryIndexManager:

367

async def create_index(self, bucket_name: str, index_name: str, keys: List[str], options: CreateQueryIndexOptions = None) -> None:

368

"""Create N1QL index asynchronously."""

369

370

async def drop_index(self, bucket_name: str, index_name: str, options: DropQueryIndexOptions = None) -> None:

371

"""Drop N1QL index asynchronously."""

372

373

async def get_all_indexes(self, bucket_name: str, options: GetAllQueryIndexesOptions = None) -> List[QueryIndex]:

374

"""Get all indexes asynchronously."""

375

376

async def build_deferred_indexes(self, bucket_name: str, options: BuildDeferredQueryIndexOptions = None) -> None:

377

"""Build deferred indexes asynchronously."""

378

```

379

380

## Async Result Types

381

382

```python { .api }

383

class AsyncGetResult:

384

@property

385

def content_as(self) -> ContentProxy:

386

"""Access document content with type conversion."""

387

388

@property

389

def cas(self) -> int:

390

"""Document CAS value."""

391

392

@property

393

def expiry_time(self) -> datetime:

394

"""Document expiration time (if requested)."""

395

396

class AsyncMutationResult:

397

@property

398

def cas(self) -> int:

399

"""New CAS value after mutation."""

400

401

@property

402

def mutation_token(self) -> MutationToken:

403

"""Mutation token for consistency."""

404

405

class AsyncMutateInResult:

406

def content_as(self, index: int, target_type: type):

407

"""Get content of mutation operation at index."""

408

409

@property

410

def cas(self) -> int:

411

"""New document CAS value."""

412

413

@property

414

def mutation_token(self) -> MutationToken:

415

"""Mutation token for consistency."""

416

```

417

418

## Usage Examples

419

420

### Basic Async Connection

421

422

```python

423

import asyncio

424

from acouchbase.cluster import ACluster

425

from couchbase.auth import PasswordAuthenticator

426

from couchbase.options import ClusterOptions

427

428

async def main():

429

# Connect to cluster

430

cluster = ACluster("couchbase://localhost",

431

ClusterOptions(PasswordAuthenticator("user", "pass")))

432

433

# Get bucket and collection

434

bucket = await cluster.bucket("travel-sample")

435

collection = bucket.default_collection()

436

437

# Document operations

438

doc = {"name": "Alice", "age": 30}

439

result = await collection.upsert("user::async", doc)

440

print(f"CAS: {result.cas}")

441

442

get_result = await collection.get("user::async")

443

print(f"Document: {get_result.content_as[dict]}")

444

445

# Close connection

446

await cluster.close()

447

448

# Run async function

449

asyncio.run(main())

450

```

451

452

### Async Bulk Operations

453

454

```python

455

async def bulk_operations(collection):

456

# Prepare documents

457

docs = {

458

f"user::{i}": {"id": i, "name": f"User {i}", "active": True}

459

for i in range(100)

460

}

461

462

# Bulk upsert using asyncio.gather for concurrency

463

tasks = [

464

collection.upsert(key, doc)

465

for key, doc in docs.items()

466

]

467

468

results = await asyncio.gather(*tasks)

469

print(f"Upserted {len(results)} documents")

470

471

# Bulk get

472

keys = list(docs.keys())

473

get_tasks = [collection.get(key) for key in keys]

474

get_results = await asyncio.gather(*get_tasks, return_exceptions=True)

475

476

successful = [r for r in get_results if not isinstance(r, Exception)]

477

print(f"Retrieved {len(successful)} documents")

478

```

479

480

### Async Query Operations

481

482

```python

483

async def query_operations(cluster):

484

# Simple query

485

query = "SELECT name, age FROM `travel-sample` WHERE type = 'user' LIMIT 10"

486

result = await cluster.query(query)

487

488

async for row in result:

489

print(f"User: {row['name']}, Age: {row['age']}")

490

491

# Parameterized query

492

from couchbase.options import QueryOptions

493

494

query = "SELECT * FROM `travel-sample` WHERE type = $type AND age > $min_age"

495

options = QueryOptions(type="user", min_age=25)

496

result = await cluster.query(query, options)

497

498

# Collect all results

499

users = []

500

async for row in result:

501

users.append(row)

502

503

print(f"Found {len(users)} users")

504

505

# Get metadata

506

metadata = result.metadata()

507

print(f"Query took: {metadata.metrics.elapsed_time}")

508

```

509

510

### Async Subdocument Operations

511

512

```python

513

import couchbase.subdocument as SD

514

515

async def subdoc_operations(collection):

516

# Setup document

517

doc = {

518

"name": "John",

519

"stats": {"views": 0, "likes": 0},

520

"tags": ["user", "active"]

521

}

522

await collection.upsert("user::subdoc", doc)

523

524

# Async subdocument mutations

525

await collection.mutate_in("user::subdoc", [

526

SD.replace("name", "Johnny"),

527

SD.increment("stats.views", 1),

528

SD.array_append("tags", "premium")

529

])

530

531

# Async subdocument lookups

532

result = await collection.lookup_in("user::subdoc", [

533

SD.get("name"),

534

SD.get("stats"),

535

SD.count("tags")

536

])

537

538

name = result.content_as(0, str)

539

stats = result.content_as(1, dict)

540

tag_count = result.content_as(2, int)

541

542

print(f"Name: {name}")

543

print(f"Stats: {stats}")

544

print(f"Tag count: {tag_count}")

545

```

546

547

### Async Management Operations

548

549

```python

550

async def management_operations(cluster):

551

# User management

552

user_mgr = cluster.users()

553

554

from couchbase.management.users import User, Role

555

556

user = User(

557

username="async_user",

558

display_name="Async User",

559

password="secure_pass",

560

roles=[Role("bucket_admin", bucket="travel-sample")]

561

)

562

563

await user_mgr.upsert_user(user)

564

565

# Get user info

566

user_info = await user_mgr.get_user("async_user")

567

print(f"Created user: {user_info.user.display_name}")

568

569

# Bucket management

570

bucket_mgr = cluster.buckets()

571

572

from couchbase.management.buckets import BucketSettings, BucketType

573

574

settings = BucketSettings(

575

name="async-bucket",

576

bucket_type=BucketType.COUCHBASE,

577

ram_quota_mb=128

578

)

579

580

await bucket_mgr.create_bucket(settings)

581

print("Created async bucket")

582

583

# List all buckets

584

all_buckets = await bucket_mgr.get_all_buckets()

585

for name, settings in all_buckets.items():

586

print(f"Bucket: {name}")

587

```

588

589

### Error Handling in Async Operations

590

591

```python

592

from couchbase.exceptions import DocumentNotFoundException, CouchbaseException

593

594

async def error_handling_example(collection):

595

try:

596

# This will fail

597

result = await collection.get("nonexistent-key")

598

except DocumentNotFoundException:

599

print("Document not found")

600

except CouchbaseException as e:

601

print(f"Couchbase error: {e}")

602

except Exception as e:

603

print(f"Unexpected error: {e}")

604

605

# Using asyncio.gather with error handling

606

keys = ["key1", "key2", "nonexistent-key", "key4"]

607

tasks = [collection.get(key) for key in keys]

608

609

results = await asyncio.gather(*tasks, return_exceptions=True)

610

611

for i, result in enumerate(results):

612

if isinstance(result, Exception):

613

print(f"Error getting {keys[i]}: {result}")

614

else:

615

print(f"Got {keys[i]}: {result.content_as[dict]}")

616

```

617

618

### Context Manager Support

619

620

```python

621

async def context_manager_example():

622

async with ACluster("couchbase://localhost",

623

ClusterOptions(PasswordAuthenticator("user", "pass"))) as cluster:

624

bucket = await cluster.bucket("travel-sample")

625

collection = bucket.default_collection()

626

627

result = await collection.get("some-key")

628

print(f"Document: {result.content_as[dict]}")

629

630

# Cluster automatically closed when exiting context

631

```

632

633

## Twisted Framework Support

634

635

The Couchbase SDK also supports the Twisted framework, where asynchronous operations return `Deferred` objects instead of awaitables.

636

637

### Basic Twisted Operations

638

639

```python { .api }

640

class TxCluster:

641

def __init__(self, connection_string: str, options: ClusterOptions = None):

642

"""

643

Initialize Twisted cluster connection.

644

645

Args:

646

connection_string (str): Connection string

647

options (ClusterOptions, optional): Cluster options

648

"""

649

650

def bucket(self, bucket_name: str) -> Deferred[TxBucket]:

651

"""Get bucket reference (returns Deferred)."""

652

653

def query(self, statement: str, options: QueryOptions = None) -> Deferred[QueryResult]:

654

"""Execute N1QL query (returns Deferred)."""

655

656

class TxCollection:

657

def get(self, key: str, options: GetOptions = None) -> Deferred[GetResult]:

658

"""Get document (returns Deferred)."""

659

660

def upsert(self, key: str, value: Any, options: UpsertOptions = None) -> Deferred[MutationResult]:

661

"""Upsert document (returns Deferred)."""

662

```

663

664

### Twisted Usage Examples

665

666

```python

667

from twisted.internet import reactor, defer

668

from txcouchbase.cluster import TxCluster

669

from couchbase.auth import PasswordAuthenticator

670

from couchbase.options import ClusterOptions

671

672

@defer.inlineCallbacks

673

def twisted_example():

674

try:

675

# Connect to cluster

676

auth = PasswordAuthenticator("username", "password")

677

cluster = TxCluster("couchbase://localhost", ClusterOptions(auth))

678

679

# Get bucket and collection

680

bucket = yield cluster.bucket("travel-sample")

681

collection = bucket.default_collection()

682

683

# Document operations with deferreds

684

doc = {"name": "Alice", "age": 25}

685

result = yield collection.upsert("user::alice", doc)

686

print(f"Upsert CAS: {result.cas}")

687

688

# Retrieve document

689

get_result = yield collection.get("user::alice")

690

print(f"Document: {get_result.content_as[dict]}")

691

692

# Query with deferreds

693

query_result = yield cluster.query(

694

"SELECT name, age FROM `travel-sample` WHERE type = 'user' LIMIT 5"

695

)

696

697

for row in query_result:

698

print(f"User: {row}")

699

700

except Exception as e:

701

print(f"Error: {e}")

702

finally:

703

reactor.stop()

704

705

if __name__ == "__main__":

706

reactor.callWhenRunning(twisted_example)

707

reactor.run()

708

```

709

710

### Deferred Chaining

711

712

```python

713

from twisted.internet import defer

714

715

def deferred_chaining_example():

716

auth = PasswordAuthenticator("username", "password")

717

cluster = TxCluster("couchbase://localhost", ClusterOptions(auth))

718

719

def on_bucket_ready(bucket):

720

collection = bucket.default_collection()

721

return collection.get("some-key")

722

723

def on_document_retrieved(result):

724

print(f"Got document: {result.content_as[dict]}")

725

return result

726

727

def on_error(failure):

728

print(f"Operation failed: {failure}")

729

730

# Chain deferred operations

731

d = cluster.bucket("travel-sample")

732

d.addCallback(on_bucket_ready)

733

d.addCallback(on_document_retrieved)

734

d.addErrback(on_error)

735

736

return d

737

```