0
# Asynchronous Operations
1
2
The `acouchbase` package provides asyncio-based asynchronous operations for high-performance, non-blocking applications. It exposes the same API surface as the synchronous operations, with async/await support for improved concurrency and scalability.
3
4
## Capabilities
5
6
### Asynchronous Cluster Operations
7
8
Asyncio-compatible cluster connection and management.
9
10
```python { .api }
11
class ACluster:
12
def __init__(self, connection_string: str, options: ClusterOptions = None):
13
"""
14
Create asynchronous cluster instance.
15
16
Args:
17
connection_string (str): Connection string
18
options (ClusterOptions, optional): Cluster options
19
"""
20
21
async def bucket(self, bucket_name: str) -> ABucket:
22
"""
23
Get asynchronous bucket reference.
24
25
Args:
26
bucket_name (str): Bucket name
27
28
Returns:
29
ABucket: Asynchronous bucket instance
30
"""
31
32
async def query(self, statement: str, options: QueryOptions = None) -> QueryResult:
33
"""
34
Execute N1QL query asynchronously.
35
36
Args:
37
statement (str): N1QL query statement
38
options (QueryOptions, optional): Query options
39
40
Returns:
41
QueryResult: Async query results iterator
42
"""
43
44
async def analytics_query(self, statement: str, options: AnalyticsOptions = None) -> AnalyticsResult:
45
"""
46
Execute Analytics query asynchronously.
47
48
Args:
49
statement (str): Analytics query statement
50
options (AnalyticsOptions, optional): Analytics options
51
52
Returns:
53
AnalyticsResult: Async analytics results iterator
54
"""
55
56
async def search_query(self, index: str, query: SearchQuery, options: SearchOptions = None) -> SearchResult:
57
"""
58
Execute search query asynchronously.
59
60
Args:
61
index (str): Search index name
62
query (SearchQuery): Search query
63
options (SearchOptions, optional): Search options
64
65
Returns:
66
SearchResult: Async search results iterator
67
"""
68
69
async def ping(self, options: PingOptions = None) -> PingResult:
70
"""
71
Ping cluster services asynchronously.
72
73
Args:
74
options (PingOptions, optional): Ping options
75
76
Returns:
77
PingResult: Connectivity status
78
"""
79
80
async def diagnostics(self, options: DiagnosticsOptions = None) -> DiagnosticsResult:
81
"""
82
Get cluster diagnostics asynchronously.
83
84
Args:
85
options (DiagnosticsOptions, optional): Diagnostic options
86
87
Returns:
88
DiagnosticsResult: Cluster health information
89
"""
90
91
async def close(self) -> None:
92
"""Close cluster connection and cleanup resources."""
93
```
94
95
### Asynchronous Document Operations
96
97
Async key-value operations for document management.
98
99
```python { .api }
100
class AsyncCBCollection:
101
async def get(self, key: str, options: GetOptions = None) -> AsyncGetResult:
102
"""
103
Retrieve document asynchronously.
104
105
Args:
106
key (str): Document key
107
options (GetOptions, optional): Retrieval options
108
109
Returns:
110
AsyncGetResult: Document content and metadata
111
"""
112
113
async def upsert(self, key: str, value: Any, options: UpsertOptions = None) -> AsyncMutationResult:
114
"""
115
Upsert document asynchronously.
116
117
Args:
118
key (str): Document key
119
value (Any): Document content
120
options (UpsertOptions, optional): Upsert options
121
122
Returns:
123
AsyncMutationResult: Operation result
124
"""
125
126
async def insert(self, key: str, value: Any, options: InsertOptions = None) -> AsyncMutationResult:
127
"""
128
Insert document asynchronously.
129
130
Args:
131
key (str): Document key
132
value (Any): Document content
133
options (InsertOptions, optional): Insert options
134
135
Returns:
136
AsyncMutationResult: Operation result
137
"""
138
139
async def replace(self, key: str, value: Any, options: ReplaceOptions = None) -> AsyncMutationResult:
140
"""
141
Replace document asynchronously.
142
143
Args:
144
key (str): Document key
145
value (Any): New document content
146
options (ReplaceOptions, optional): Replace options
147
148
Returns:
149
AsyncMutationResult: Operation result
150
"""
151
152
async def remove(self, key: str, options: RemoveOptions = None) -> AsyncMutationResult:
153
"""
154
Remove document asynchronously.
155
156
Args:
157
key (str): Document key
158
options (RemoveOptions, optional): Remove options
159
160
Returns:
161
AsyncMutationResult: Operation result
162
"""
163
164
async def exists(self, key: str, options: ExistsOptions = None) -> ExistsResult:
165
"""
166
Check document existence asynchronously.
167
168
Args:
169
key (str): Document key
170
options (ExistsOptions, optional): Existence check options
171
172
Returns:
173
ExistsResult: Existence status
174
"""
175
176
async def touch(self, key: str, expiry: timedelta, options: TouchOptions = None) -> AsyncMutationResult:
177
"""
178
Update document expiration asynchronously.
179
180
Args:
181
key (str): Document key
182
expiry (timedelta): New expiration time
183
options (TouchOptions, optional): Touch options
184
185
Returns:
186
AsyncMutationResult: Operation result
187
"""
188
189
async def get_and_touch(self, key: str, expiry: timedelta, options: GetAndTouchOptions = None) -> AsyncGetResult:
190
"""
191
Get and touch document asynchronously.
192
193
Args:
194
key (str): Document key
195
expiry (timedelta): New expiration time
196
options (GetAndTouchOptions, optional): Operation options
197
198
Returns:
199
AsyncGetResult: Document content with updated expiry
200
"""
201
202
async def get_and_lock(self, key: str, lock_time: timedelta, options: GetAndLockOptions = None) -> AsyncGetResult:
203
"""
204
Get and lock document asynchronously.
205
206
Args:
207
key (str): Document key
208
lock_time (timedelta): Lock duration
209
options (GetAndLockOptions, optional): Lock options
210
211
Returns:
212
AsyncGetResult: Document content with lock
213
"""
214
215
async def unlock(self, key: str, cas: int, options: UnlockOptions = None) -> None:
216
"""
217
Unlock document asynchronously.
218
219
Args:
220
key (str): Document key
221
cas (int): CAS value from get_and_lock
222
options (UnlockOptions, optional): Unlock options
223
"""
224
```
225
226
### Asynchronous Subdocument Operations
227
228
Async subdocument operations for efficient partial document updates.
229
230
```python { .api }
231
class AsyncCBCollection:
232
async def lookup_in(self, key: str, spec: List[Spec], options: LookupInOptions = None) -> LookupInResult:
233
"""
234
Perform subdocument lookups asynchronously.
235
236
Args:
237
key (str): Document key
238
spec (List[Spec]): Lookup specifications
239
options (LookupInOptions, optional): Lookup options
240
241
Returns:
242
LookupInResult: Lookup results
243
"""
244
245
async def mutate_in(self, key: str, spec: List[Spec], options: MutateInOptions = None) -> AsyncMutateInResult:
246
"""
247
Perform subdocument mutations asynchronously.
248
249
Args:
250
key (str): Document key
251
spec (List[Spec]): Mutation specifications
252
options (MutateInOptions, optional): Mutation options
253
254
Returns:
255
AsyncMutateInResult: Mutation results
256
"""
257
```
258
259
### Asynchronous Binary Operations
260
261
Async binary data and counter operations.
262
263
```python { .api }
264
class AsyncBinaryCollection:
265
async def append(self, key: str, value: bytes, options: AppendOptions = None) -> AsyncMutationResult:
266
"""
267
Append binary data asynchronously.
268
269
Args:
270
key (str): Document key
271
value (bytes): Data to append
272
options (AppendOptions, optional): Append options
273
274
Returns:
275
AsyncMutationResult: Operation result
276
"""
277
278
async def prepend(self, key: str, value: bytes, options: PrependOptions = None) -> AsyncMutationResult:
279
"""
280
Prepend binary data asynchronously.
281
282
Args:
283
key (str): Document key
284
value (bytes): Data to prepend
285
options (PrependOptions, optional): Prepend options
286
287
Returns:
288
AsyncMutationResult: Operation result
289
"""
290
291
async def increment(self, key: str, options: IncrementOptions = None) -> CounterResult:
292
"""
293
Increment counter asynchronously.
294
295
Args:
296
key (str): Counter key
297
options (IncrementOptions, optional): Increment options
298
299
Returns:
300
CounterResult: New counter value
301
"""
302
303
async def decrement(self, key: str, options: DecrementOptions = None) -> CounterResult:
304
"""
305
Decrement counter asynchronously.
306
307
Args:
308
key (str): Counter key
309
options (DecrementOptions, optional): Decrement options
310
311
Returns:
312
CounterResult: New counter value
313
"""
314
```
315
316
### Asynchronous Management Operations
317
318
Async administrative operations for cluster management.
319
320
```python { .api }
321
class AUserManager:
322
async def upsert_user(self, user: User, options: UpsertUserOptions = None) -> None:
323
"""Create or update user asynchronously."""
324
325
async def drop_user(self, username: str, options: DropUserOptions = None) -> None:
326
"""Delete user asynchronously."""
327
328
async def get_user(self, username: str, options: GetUserOptions = None) -> UserAndMetadata:
329
"""Get user information asynchronously."""
330
331
async def get_all_users(self, options: GetAllUsersOptions = None) -> List[UserAndMetadata]:
332
"""Get all users asynchronously."""
333
334
class ABucketManager:
335
async def create_bucket(self, settings: CreateBucketSettings, options: CreateBucketOptions = None) -> None:
336
"""Create bucket asynchronously."""
337
338
async def update_bucket(self, settings: BucketSettings, options: UpdateBucketOptions = None) -> None:
339
"""Update bucket asynchronously."""
340
341
async def drop_bucket(self, bucket_name: str, options: DropBucketOptions = None) -> None:
342
"""Delete bucket asynchronously."""
343
344
async def get_bucket(self, bucket_name: str, options: GetBucketOptions = None) -> BucketSettings:
345
"""Get bucket settings asynchronously."""
346
347
async def get_all_buckets(self, options: GetAllBucketsOptions = None) -> Dict[str, BucketSettings]:
348
"""Get all bucket settings asynchronously."""
349
350
class ACollectionManager:
351
async def create_scope(self, scope_name: str, options: CreateScopeOptions = None) -> None:
352
"""Create scope asynchronously."""
353
354
async def drop_scope(self, scope_name: str, options: DropScopeOptions = None) -> None:
355
"""Delete scope asynchronously."""
356
357
async def create_collection(self, collection_spec: CollectionSpec, options: CreateCollectionOptions = None) -> None:
358
"""Create collection asynchronously."""
359
360
async def drop_collection(self, collection_spec: CollectionSpec, options: DropCollectionOptions = None) -> None:
361
"""Delete collection asynchronously."""
362
363
async def get_all_scopes(self, options: GetAllScopesOptions = None) -> List[ScopeSpec]:
364
"""Get all scopes asynchronously."""
365
366
class AQueryIndexManager:
367
async def create_index(self, bucket_name: str, index_name: str, keys: List[str], options: CreateQueryIndexOptions = None) -> None:
368
"""Create N1QL index asynchronously."""
369
370
async def drop_index(self, bucket_name: str, index_name: str, options: DropQueryIndexOptions = None) -> None:
371
"""Drop N1QL index asynchronously."""
372
373
async def get_all_indexes(self, bucket_name: str, options: GetAllQueryIndexesOptions = None) -> List[QueryIndex]:
374
"""Get all indexes asynchronously."""
375
376
async def build_deferred_indexes(self, bucket_name: str, options: BuildDeferredQueryIndexOptions = None) -> None:
377
"""Build deferred indexes asynchronously."""
378
```
379
380
## Async Result Types
381
382
```python { .api }
383
class AsyncGetResult:
384
@property
385
def content_as(self) -> ContentProxy:
386
"""Access document content with type conversion."""
387
388
@property
389
def cas(self) -> int:
390
"""Document CAS value."""
391
392
@property
393
def expiry_time(self) -> datetime:
394
"""Document expiration time (if requested)."""
395
396
class AsyncMutationResult:
397
@property
398
def cas(self) -> int:
399
"""New CAS value after mutation."""
400
401
@property
402
def mutation_token(self) -> MutationToken:
403
"""Mutation token for consistency."""
404
405
class AsyncMutateInResult:
406
def content_as(self, index: int, target_type: type):
407
"""Get content of mutation operation at index."""
408
409
@property
410
def cas(self) -> int:
411
"""New document CAS value."""
412
413
@property
414
def mutation_token(self) -> MutationToken:
415
"""Mutation token for consistency."""
416
```
417
418
## Usage Examples
419
420
### Basic Async Connection
421
422
```python
423
import asyncio
424
from acouchbase.cluster import ACluster
425
from couchbase.auth import PasswordAuthenticator
426
from couchbase.options import ClusterOptions
427
428
async def main():
    """Connect, write one document, read it back, and always close the cluster."""
    # Connect to cluster
    cluster = ACluster("couchbase://localhost",
                       ClusterOptions(PasswordAuthenticator("user", "pass")))

    try:
        # Get bucket and collection
        bucket = await cluster.bucket("travel-sample")
        collection = bucket.default_collection()

        # Document operations
        doc = {"name": "Alice", "age": 30}
        result = await collection.upsert("user::async", doc)
        print(f"CAS: {result.cas}")

        get_result = await collection.get("user::async")
        print(f"Document: {get_result.content_as[dict]}")
    finally:
        # Close connection even when one of the operations above raised,
        # so a failed run does not leak the cluster connection.
        await cluster.close()


# Run async function
if __name__ == "__main__":
    asyncio.run(main())
450
```
451
452
### Async Bulk Operations
453
454
```python
455
async def bulk_operations(collection):
    """Upsert 100 generated documents concurrently, then read them all back.

    Args:
        collection: Asynchronous collection providing awaitable
            ``upsert(key, doc)`` and ``get(key)`` methods.
    """
    # Prepare documents
    docs = {
        f"user::{i}": {"id": i, "name": f"User {i}", "active": True}
        for i in range(100)
    }

    # Bulk upsert using asyncio.gather for concurrency
    results = await asyncio.gather(
        *(collection.upsert(key, doc) for key, doc in docs.items())
    )
    print(f"Upserted {len(results)} documents")

    # Bulk get; return_exceptions=True makes failures come back as values
    # in the result list instead of aborting the whole batch.
    get_results = await asyncio.gather(
        *(collection.get(key) for key in docs),
        return_exceptions=True,
    )

    # Test against BaseException rather than Exception: a cancelled get is
    # returned as asyncio.CancelledError, which does not derive from
    # Exception and would otherwise be counted as a retrieved document.
    successful = [r for r in get_results if not isinstance(r, BaseException)]
    print(f"Retrieved {len(successful)} documents")
478
```
479
480
### Async Query Operations
481
482
```python
483
async def query_operations(cluster):
    """Run a plain and a parameterized N1QL query and report on the results.

    Args:
        cluster: Connected asynchronous cluster used to issue the queries.
    """
    from couchbase.options import QueryOptions

    # Simple query
    query = "SELECT name, age FROM `travel-sample` WHERE type = 'user' LIMIT 10"
    result = await cluster.query(query)

    async for row in result:
        print(f"User: {row['name']}, Age: {row['age']}")

    # Parameterized query: the $type and $min_age placeholders are bound
    # through named_parameters, and metrics are requested explicitly so the
    # elapsed time below is populated.
    query = "SELECT * FROM `travel-sample` WHERE type = $type AND age > $min_age"
    options = QueryOptions(
        named_parameters={"type": "user", "min_age": 25},
        metrics=True,
    )
    result = await cluster.query(query, options)

    # Collect all results
    users = [row async for row in result]

    print(f"Found {len(users)} users")

    # Get metadata; metrics() and elapsed_time() are accessor methods
    metadata = result.metadata()
    print(f"Query took: {metadata.metrics().elapsed_time()}")
508
```
509
510
### Async Subdocument Operations
511
512
```python
513
import couchbase.subdocument as SD
514
515
async def subdoc_operations(collection):
    """Seed a document, mutate three of its paths, then look them up again.

    Args:
        collection: Asynchronous collection providing ``upsert``,
            ``mutate_in`` and ``lookup_in``.
    """
    doc_key = "user::subdoc"

    # Setup document
    seed = {
        "name": "John",
        "stats": {"views": 0, "likes": 0},
        "tags": ["user", "active"]
    }
    await collection.upsert(doc_key, seed)

    # Async subdocument mutations
    mutations = [
        SD.replace("name", "Johnny"),
        SD.increment("stats.views", 1),
        SD.array_append("tags", "premium"),
    ]
    await collection.mutate_in(doc_key, mutations)

    # Async subdocument lookups
    lookups = [
        SD.get("name"),
        SD.get("stats"),
        SD.count("tags"),
    ]
    result = await collection.lookup_in(doc_key, lookups)

    # Each value is addressed by the position of its spec in the lookup list
    name, stats, tag_count = (
        result.content_as(0, str),
        result.content_as(1, dict),
        result.content_as(2, int),
    )

    print(f"Name: {name}")
    print(f"Stats: {stats}")
    print(f"Tag count: {tag_count}")
545
```
546
547
### Async Management Operations
548
549
```python
550
async def management_operations(cluster):
    """Create a user and a bucket, then list every bucket on the cluster.

    Args:
        cluster: Connected asynchronous cluster exposing the ``users()``
            and ``buckets()`` manager accessors.
    """
    from couchbase.management.buckets import BucketType, CreateBucketSettings
    from couchbase.management.users import Role, User

    # User management
    user_mgr = cluster.users()

    user = User(
        username="async_user",
        display_name="Async User",
        password="secure_pass",
        roles=[Role("bucket_admin", bucket="travel-sample")],
    )

    await user_mgr.upsert_user(user)

    # Get user info
    user_info = await user_mgr.get_user("async_user")
    print(f"Created user: {user_info.user.display_name}")

    # Bucket management
    bucket_mgr = cluster.buckets()

    # create_bucket is documented as taking CreateBucketSettings
    settings = CreateBucketSettings(
        name="async-bucket",
        bucket_type=BucketType.COUCHBASE,
        ram_quota_mb=128,
    )

    await bucket_mgr.create_bucket(settings)
    print("Created async bucket")

    # List all buckets; only the names are used, so iterate the keys and
    # avoid rebinding `settings` from above.
    all_buckets = await bucket_mgr.get_all_buckets()
    for name in all_buckets:
        print(f"Bucket: {name}")
587
```
588
589
### Error Handling in Async Operations
590
591
```python
592
from couchbase.exceptions import DocumentNotFoundException, CouchbaseException
593
594
async def error_handling_example(collection):
    """Show per-call and batched error handling for asynchronous gets.

    Args:
        collection: Asynchronous collection providing an awaitable
            ``get(key)`` method.
    """
    try:
        # This will fail
        await collection.get("nonexistent-key")
    except DocumentNotFoundException:
        print("Document not found")
    except CouchbaseException as e:
        print(f"Couchbase error: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")

    # Using asyncio.gather with error handling
    keys = ["key1", "key2", "nonexistent-key", "key4"]
    results = await asyncio.gather(
        *(collection.get(key) for key in keys),
        return_exceptions=True,
    )

    # gather preserves input order, so keys and results pair up positionally.
    for key, result in zip(keys, results):
        # Test against BaseException rather than Exception: a cancelled get
        # is returned as asyncio.CancelledError, which does not derive from
        # Exception and would otherwise reach the `.content_as` access below.
        if isinstance(result, BaseException):
            print(f"Error getting {key}: {result}")
        else:
            print(f"Got {key}: {result.content_as[dict]}")
616
```
617
618
### Context Manager Support
619
620
```python
621
async def context_manager_example():
    """Fetch one document while using the cluster as an async context manager."""
    authenticator = PasswordAuthenticator("user", "pass")
    cluster_options = ClusterOptions(authenticator)

    async with ACluster("couchbase://localhost", cluster_options) as cluster:
        bucket = await cluster.bucket("travel-sample")
        collection = bucket.default_collection()

        result = await collection.get("some-key")
        print(f"Document: {result.content_as[dict]}")

    # Cluster automatically closed when exiting context
631
```
632
633
## Twisted Framework Support
634
635
The Couchbase SDK also supports the Twisted framework through the `txcouchbase` package, whose operations return Twisted `Deferred` objects rather than coroutines.
636
637
### Basic Twisted Operations
638
639
```python { .api }
640
class TxCluster:
641
def __init__(self, connection_string: str, options: ClusterOptions = None):
642
"""
643
Initialize Twisted cluster connection.
644
645
Args:
646
connection_string (str): Connection string
647
options (ClusterOptions, optional): Cluster options
648
"""
649
650
def bucket(self, bucket_name: str) -> Deferred[TxBucket]:
651
"""Get bucket reference (returns Deferred)."""
652
653
def query(self, statement: str, options: QueryOptions = None) -> Deferred[QueryResult]:
654
"""Execute N1QL query (returns Deferred)."""
655
656
class TxCollection:
657
def get(self, key: str, options: GetOptions = None) -> Deferred[GetResult]:
658
"""Get document (returns Deferred)."""
659
660
def upsert(self, key: str, value: Any, options: UpsertOptions = None) -> Deferred[MutationResult]:
661
"""Upsert document (returns Deferred)."""
662
```
663
664
### Twisted Usage Examples
665
666
```python
667
from twisted.internet import reactor, defer
668
from txcouchbase.cluster import TxCluster
669
from couchbase.auth import PasswordAuthenticator
670
from couchbase.options import ClusterOptions
671
672
@defer.inlineCallbacks
def twisted_example():
    """Upsert, fetch and query through Twisted, then stop the reactor.

    Under ``inlineCallbacks`` each ``yield`` waits on a Deferred and
    resumes with its result.
    """
    try:
        # Connect to cluster
        cluster = TxCluster(
            "couchbase://localhost",
            ClusterOptions(PasswordAuthenticator("username", "password")),
        )

        # Get bucket and collection
        bucket = yield cluster.bucket("travel-sample")
        collection = bucket.default_collection()

        # Document operations with deferreds
        result = yield collection.upsert("user::alice", {"name": "Alice", "age": 25})
        print(f"Upsert CAS: {result.cas}")

        # Retrieve document
        get_result = yield collection.get("user::alice")
        print(f"Document: {get_result.content_as[dict]}")

        # Query with deferreds
        statement = "SELECT name, age FROM `travel-sample` WHERE type = 'user' LIMIT 5"
        query_result = yield cluster.query(statement)

        for row in query_result:
            print(f"User: {row}")

    except Exception as e:
        print(f"Error: {e}")
    finally:
        # Stop the reactor on both the success and the failure path
        reactor.stop()


if __name__ == "__main__":
    reactor.callWhenRunning(twisted_example)
    reactor.run()
708
```
709
710
### Deferred Chaining
711
712
```python
713
from twisted.internet import defer
714
715
def deferred_chaining_example():
    """Fetch one document through an explicit callback chain.

    Returns:
        The Deferred obtained from ``cluster.bucket`` with the callbacks
        and the errback attached.
    """

    def on_error(failure):
        print(f"Operation failed: {failure}")

    def on_document_retrieved(result):
        print(f"Got document: {result.content_as[dict]}")
        return result

    def on_bucket_ready(bucket):
        return bucket.default_collection().get("some-key")

    cluster = TxCluster(
        "couchbase://localhost",
        ClusterOptions(PasswordAuthenticator("username", "password")),
    )

    # Chain deferred operations: callbacks are attached in the order they
    # should run, with the errback added last.
    d = cluster.bucket("travel-sample")
    for step in (on_bucket_ready, on_document_retrieved):
        d.addCallback(step)
    d.addErrback(on_error)

    return d
737
```