0
# Async Operations
1
2
Full async/await interface providing asynchronous versions of all core functionality through the `ydb.aio` module.
3
4
## Capabilities
5
6
### Async Driver
7
8
Asynchronous database driver with context manager support and automatic connection management.
9
10
```python { .api }
11
import ydb.aio as ydb_aio
12
13
class Driver:
14
def __init__(
15
self,
16
endpoint: str,
17
database: str,
18
credentials: Credentials = None,
19
**kwargs
20
):
21
"""
22
Create asynchronous YDB driver.
23
24
Args:
25
endpoint (str): YDB cluster endpoint
26
database (str): Database path
27
credentials (Credentials, optional): Authentication credentials
28
**kwargs: Additional driver configuration
29
"""
30
31
async def __aenter__(self) -> 'Driver':
32
"""
33
Enter async context manager.
34
35
Returns:
36
Driver: Initialized driver instance
37
"""
38
39
async def __aexit__(self, exc_type, exc_val, exc_tb):
40
"""
41
Exit async context manager and cleanup resources.
42
"""
43
44
async def wait(self, fail_fast: bool = True, timeout: float = None) -> bool:
45
"""
46
Wait for driver to be ready asynchronously.
47
48
Args:
49
fail_fast (bool): Fail immediately on first error
50
timeout (float, optional): Maximum wait time in seconds
51
52
Returns:
53
bool: True if driver is ready, False on timeout
54
"""
55
56
async def stop(self, timeout: float = None):
57
"""
58
Stop the driver and cleanup resources.
59
60
Args:
61
timeout (float, optional): Shutdown timeout in seconds
62
"""
63
64
@property
65
def discovery_debug_details(self) -> str:
66
"""Get discovery debug information."""
67
68
def scheme_client(self) -> 'SchemeClient':
69
"""
70
Create async scheme client for schema operations.
71
72
Returns:
73
SchemeClient: Async scheme client instance
74
"""
75
76
def table_client(self) -> 'TableClient':
77
"""
78
Create async table client for table operations.
79
80
Returns:
81
TableClient: Async table client instance
82
"""
83
84
def query_session_pool(self, **kwargs) -> 'QuerySessionPool':
85
"""
86
Create async query session pool.
87
88
Returns:
89
QuerySessionPool: Async query session pool
90
"""
91
```
92
93
### Async Session Pool
94
95
Asynchronous session pool with automatic session lifecycle management and retry capabilities.
96
97
```python { .api }
98
class SessionPool:
99
def __init__(
100
self,
101
driver: Driver,
102
size: int = None,
103
creation_timeout: float = None,
104
**kwargs
105
):
106
"""
107
Create asynchronous session pool.
108
109
Args:
110
driver (Driver): Async YDB driver instance
111
size (int, optional): Maximum pool size
112
creation_timeout (float, optional): Session creation timeout
113
"""
114
115
async def __aenter__(self) -> 'SessionPool':
116
"""
117
Enter async context manager.
118
119
Returns:
120
SessionPool: Initialized session pool
121
"""
122
123
async def __aexit__(self, exc_type, exc_val, exc_tb):
124
"""
125
Exit async context manager and stop pool.
126
"""
127
128
async def acquire(self, timeout: float = None) -> 'Session':
129
"""
130
Acquire session from pool asynchronously.
131
132
Args:
133
timeout (float, optional): Acquisition timeout
134
135
Returns:
136
Session: Available async session
137
"""
138
139
async def release(self, session: 'Session'):
140
"""
141
Release session back to pool.
142
143
Args:
144
session (Session): Session to release
145
"""
146
147
async def retry_operation(
148
self,
149
callee: Callable[['Session'], Awaitable[Any]],
150
retry_settings: RetrySettings = None,
151
*args,
152
**kwargs
153
) -> Any:
154
"""
155
Execute async operation with automatic retry and session management.
156
157
Args:
158
callee (Callable): Async function to execute with session
159
retry_settings (RetrySettings, optional): Custom retry configuration
160
*args: Additional arguments for callee
161
**kwargs: Additional keyword arguments for callee
162
163
Returns:
164
Any: Result of callee execution
165
"""
166
167
async def stop(self, timeout: float = None):
168
"""
169
Stop the session pool and close all sessions.
170
171
Args:
172
timeout (float, optional): Shutdown timeout
173
"""
174
175
def checkout(self) -> 'AsyncSessionCheckout':
176
"""
177
Create async session checkout context manager.
178
179
Returns:
180
AsyncSessionCheckout: Async session context manager
181
"""
182
183
class AsyncSessionCheckout:
184
def __init__(self, pool: SessionPool):
185
"""
186
Async context manager for session checkout.
187
188
Args:
189
pool (SessionPool): Parent session pool
190
"""
191
192
async def __aenter__(self) -> 'Session':
193
"""
194
Acquire session from pool.
195
196
Returns:
197
Session: Available session
198
"""
199
200
async def __aexit__(self, exc_type, exc_val, exc_tb):
201
"""
202
Release session back to pool.
203
"""
204
```
205
206
### Async Sessions
207
208
Asynchronous database session for query execution and transaction management.
209
210
```python { .api }
211
class Session:
212
def __init__(self, driver: Driver):
213
"""
214
Create asynchronous database session.
215
216
Args:
217
driver (Driver): Async YDB driver instance
218
"""
219
220
async def __aenter__(self) -> 'Session':
221
"""
222
Enter async context manager.
223
224
Returns:
225
Session: Initialized session
226
"""
227
228
async def __aexit__(self, exc_type, exc_val, exc_tb):
229
"""
230
Exit async context manager and close session.
231
"""
232
233
async def create_table(
234
self,
235
path: str,
236
table_description: TableDescription,
237
settings: CreateTableSettings = None
238
):
239
"""
240
Create table asynchronously.
241
242
Args:
243
path (str): Table path
244
table_description (TableDescription): Table structure definition
245
settings (CreateTableSettings, optional): Creation settings
246
"""
247
248
async def drop_table(self, path: str, settings: DropTableSettings = None):
249
"""
250
Drop table asynchronously.
251
252
Args:
253
path (str): Table path
254
settings (DropTableSettings, optional): Drop settings
255
"""
256
257
async def alter_table(
258
self,
259
path: str,
260
alter_table_settings: AlterTableSettings
261
):
262
"""
263
Alter table structure asynchronously.
264
265
Args:
266
path (str): Table path
267
alter_table_settings (AlterTableSettings): Alteration settings
268
"""
269
270
async def copy_table(
271
self,
272
source_path: str,
273
destination_path: str,
274
settings: CopyTableSettings = None
275
):
276
"""
277
Copy table asynchronously.
278
279
Args:
280
source_path (str): Source table path
281
destination_path (str): Destination table path
282
settings (CopyTableSettings, optional): Copy settings
283
"""
284
285
async def describe_table(
286
self,
287
path: str,
288
settings: DescribeTableSettings = None
289
) -> TableDescription:
290
"""
291
Describe table structure asynchronously.
292
293
Args:
294
path (str): Table path
295
settings (DescribeTableSettings, optional): Describe settings
296
297
Returns:
298
TableDescription: Table structure information
299
"""
300
301
async def execute_query(
302
self,
303
query: str,
304
parameters: Dict[str, Any] = None,
305
settings: ExecuteQuerySettings = None
306
) -> List[ResultSet]:
307
"""
308
Execute YQL query asynchronously.
309
310
Args:
311
query (str): YQL query text
312
parameters (Dict[str, Any], optional): Query parameters
313
settings (ExecuteQuerySettings, optional): Execution settings
314
315
Returns:
316
List[ResultSet]: Query results
317
"""
318
319
async def execute_scheme_query(self, query: str):
320
"""
321
Execute scheme query asynchronously.
322
323
Args:
324
query (str): Scheme query text (DDL)
325
"""
326
327
async def prepare_query(
328
self,
329
query: str,
330
settings: PrepareQuerySettings = None
331
) -> DataQuery:
332
"""
333
Prepare query for execution asynchronously.
334
335
Args:
336
query (str): YQL query text
337
settings (PrepareQuerySettings, optional): Preparation settings
338
339
Returns:
340
DataQuery: Prepared query object
341
"""
342
343
async def transaction(self, tx_mode: TxMode = None) -> 'AsyncTxContext':
344
"""
345
Begin transaction asynchronously.
346
347
Args:
348
tx_mode (TxMode, optional): Transaction mode
349
350
Returns:
351
AsyncTxContext: Async transaction context
352
"""
353
354
async def read_table(
355
self,
356
path: str,
357
key_range: KeyRange = None,
358
columns: List[str] = None,
359
settings: ReadTableSettings = None
360
) -> AsyncIterator[ResultSet]:
361
"""
362
Read table data asynchronously with streaming.
363
364
Args:
365
path (str): Table path
366
key_range (KeyRange, optional): Key range to read
367
columns (List[str], optional): Columns to read
368
settings (ReadTableSettings, optional): Read settings
369
370
Returns:
371
AsyncIterator[ResultSet]: Streaming result sets
372
"""
373
374
async def bulk_upsert(
375
self,
376
path: str,
377
rows: Union[List[Dict], pd.DataFrame],
378
column_types: Dict[str, Type] = None,
379
settings: BulkUpsertSettings = None
380
):
381
"""
382
Bulk upsert data asynchronously.
383
384
Args:
385
path (str): Table path
386
rows (Union[List[Dict], pd.DataFrame]): Data to upsert
387
column_types (Dict[str, Type], optional): Column type overrides
388
settings (BulkUpsertSettings, optional): Upsert settings
389
"""
390
391
async def close(self):
392
"""
393
Close session and release resources asynchronously.
394
"""
395
396
@property
397
def session_id(self) -> str:
398
"""Get session identifier."""
399
```
400
401
### Async Transaction Context
402
403
Asynchronous transaction management with automatic commit/rollback handling.
404
405
```python { .api }
406
class AsyncTxContext:
407
def __init__(self, session: Session, tx_mode: TxMode = None):
408
"""
409
Asynchronous transaction context.
410
411
Args:
412
session (Session): Parent async session
413
tx_mode (TxMode, optional): Transaction isolation mode
414
"""
415
416
async def __aenter__(self) -> 'AsyncTxContext':
417
"""
418
Enter async transaction context.
419
420
Returns:
421
AsyncTxContext: Transaction context
422
"""
423
424
async def __aexit__(self, exc_type, exc_val, exc_tb):
425
"""
426
Exit async transaction context with automatic commit/rollback.
427
"""
428
429
async def execute(
430
self,
431
query: str,
432
parameters: Dict[str, Any] = None,
433
commit_tx: bool = False,
434
settings: ExecuteQuerySettings = None
435
) -> List[ResultSet]:
436
"""
437
Execute query within transaction asynchronously.
438
439
Args:
440
query (str): YQL query text
441
parameters (Dict[str, Any], optional): Query parameters
442
commit_tx (bool): Commit transaction after execution
443
settings (ExecuteQuerySettings, optional): Execution settings
444
445
Returns:
446
List[ResultSet]: Query results
447
"""
448
449
async def commit(self, settings: CommitTxSettings = None):
450
"""
451
Commit transaction asynchronously.
452
453
Args:
454
settings (CommitTxSettings, optional): Commit settings
455
"""
456
457
async def rollback(self, settings: RollbackTxSettings = None):
458
"""
459
Rollback transaction asynchronously.
460
461
Args:
462
settings (RollbackTxSettings, optional): Rollback settings
463
"""
464
465
@property
466
def tx_id(self) -> str:
467
"""Get transaction identifier."""
468
```
469
470
### Async Query Service
471
472
Asynchronous query service with modern YQL interface and session pooling.
473
474
```python { .api }
475
class QuerySessionPool:
476
def __init__(
477
self,
478
driver: Driver,
479
size: int = None,
480
query_client_settings: QueryClientSettings = None
481
):
482
"""
483
Asynchronous query session pool.
484
485
Args:
486
driver (Driver): Async YDB driver instance
487
size (int, optional): Maximum pool size
488
query_client_settings (QueryClientSettings, optional): Default settings
489
"""
490
491
async def __aenter__(self) -> 'QuerySessionPool':
492
"""Enter async context manager."""
493
494
async def __aexit__(self, exc_type, exc_val, exc_tb):
495
"""Exit async context manager."""
496
497
async def acquire(self, timeout: float = None) -> 'QuerySession':
498
"""
499
Acquire query session from pool asynchronously.
500
501
Args:
502
timeout (float, optional): Acquisition timeout
503
504
Returns:
505
QuerySession: Available async query session
506
"""
507
508
async def release(self, session: 'QuerySession'):
509
"""
510
Release query session back to pool.
511
512
Args:
513
session (QuerySession): Session to release
514
"""
515
516
async def retry_operation(
517
self,
518
callee: Callable[['QuerySession'], Awaitable[Any]],
519
retry_settings: RetrySettings = None,
520
*args,
521
**kwargs
522
) -> Any:
523
"""
524
Execute async operation with automatic retry.
525
526
Args:
527
callee (Callable): Async function to execute
528
retry_settings (RetrySettings, optional): Retry configuration
529
530
Returns:
531
Any: Result of callee execution
532
"""
533
534
async def stop(self, timeout: float = None):
535
"""Stop the query session pool."""
536
537
class QuerySession:
538
def __init__(self, driver: Driver, settings: QueryClientSettings = None):
539
"""
540
Asynchronous query session.
541
542
Args:
543
driver (Driver): Async YDB driver instance
544
settings (QueryClientSettings, optional): Session configuration
545
"""
546
547
async def execute_query(
548
self,
549
query: str,
550
parameters: Dict[str, Any] = None,
551
tx_control: QueryTxControl = None,
552
settings: ExecuteQuerySettings = None
553
) -> AsyncIterator[ResultSet]:
554
"""
555
Execute query asynchronously with streaming results.
556
557
Args:
558
query (str): YQL query text
559
parameters (Dict[str, Any], optional): Query parameters
560
tx_control (QueryTxControl, optional): Transaction control
561
settings (ExecuteQuerySettings, optional): Execution settings
562
563
Returns:
564
AsyncIterator[ResultSet]: Streaming query results
565
"""
566
567
async def transaction(
568
self,
569
tx_settings: QueryTxSettings = None
570
) -> 'AsyncQueryTxContext':
571
"""
572
Begin async query transaction.
573
574
Args:
575
tx_settings (QueryTxSettings, optional): Transaction settings
576
577
Returns:
578
AsyncQueryTxContext: Async transaction context
579
"""
580
581
async def close(self):
582
"""Close async query session."""
583
584
class AsyncQueryTxContext:
585
async def __aenter__(self) -> 'AsyncQueryTxContext':
586
"""Enter async transaction context."""
587
588
async def __aexit__(self, exc_type, exc_val, exc_tb):
589
"""Exit async transaction context."""
590
591
async def execute(
592
self,
593
query: str,
594
parameters: Dict[str, Any] = None,
595
settings: ExecuteQuerySettings = None
596
) -> AsyncIterator[ResultSet]:
597
"""Execute query within async transaction."""
598
599
async def commit(self):
600
"""Commit async transaction."""
601
602
async def rollback(self):
603
"""Rollback async transaction."""
604
```
605
606
### Async Scheme Operations
607
608
Asynchronous schema and directory operations client.
609
610
```python { .api }
611
class SchemeClient:
612
def __init__(self, driver: Driver):
613
"""
614
Asynchronous scheme client for schema operations.
615
616
Args:
617
driver (Driver): Async YDB driver instance
618
"""
619
620
async def make_directory(
621
self,
622
path: str,
623
settings: MakeDirectorySettings = None
624
):
625
"""
626
Create directory asynchronously.
627
628
Args:
629
path (str): Directory path
630
settings (MakeDirectorySettings, optional): Creation settings
631
"""
632
633
async def remove_directory(
634
self,
635
path: str,
636
settings: RemoveDirectorySettings = None
637
):
638
"""
639
Remove directory asynchronously.
640
641
Args:
642
path (str): Directory path
643
settings (RemoveDirectorySettings, optional): Removal settings
644
"""
645
646
async def list_directory(
647
self,
648
path: str,
649
settings: ListDirectorySettings = None
650
) -> Directory:
651
"""
652
List directory contents asynchronously.
653
654
Args:
655
path (str): Directory path
656
settings (ListDirectorySettings, optional): Listing settings
657
658
Returns:
659
Directory: Directory information with entries
660
"""
661
662
async def describe_path(
663
self,
664
path: str,
665
settings: DescribePathSettings = None
666
) -> SchemeEntry:
667
"""
668
Describe path entry asynchronously.
669
670
Args:
671
path (str): Entry path
672
settings (DescribePathSettings, optional): Describe settings
673
674
Returns:
675
SchemeEntry: Path entry information
676
"""
677
678
async def modify_permissions(
679
self,
680
path: str,
681
permissions: Permissions,
682
settings: ModifyPermissionsSettings = None
683
):
684
"""
685
Modify path permissions asynchronously.
686
687
Args:
688
path (str): Entry path
689
permissions (Permissions): Permission changes
690
settings (ModifyPermissionsSettings, optional): Modify settings
691
"""
692
```
693
694
### Async Retry Operations
695
696
Asynchronous retry functionality with backoff and error handling.
697
698
```python { .api }
699
async def retry_operation(
700
callee: Callable[..., Awaitable[Any]],
701
retry_settings: RetrySettings = None,
702
session_pool: SessionPool = None,
703
*args,
704
**kwargs
705
) -> Any:
706
"""
707
Execute async operation with retry logic.
708
709
Args:
710
callee (Callable): Async function to execute
711
retry_settings (RetrySettings, optional): Retry configuration
712
session_pool (SessionPool, optional): Session pool for session-based operations
713
*args: Additional arguments for callee
714
**kwargs: Additional keyword arguments for callee
715
716
Returns:
717
Any: Result of successful callee execution
718
"""
719
720
class AsyncRetrySettings:
721
def __init__(
722
self,
723
max_retries: int = 10,
724
max_session_acquire_timeout: float = None,
725
fast_backoff_settings: BackoffSettings = None,
726
slow_backoff_settings: BackoffSettings = None,
727
**kwargs
728
):
729
"""
730
Retry settings for async operations.
731
732
Args:
733
max_retries (int): Maximum number of retry attempts
734
max_session_acquire_timeout (float, optional): Session acquisition timeout
735
fast_backoff_settings (BackoffSettings, optional): Fast backoff configuration
736
slow_backoff_settings (BackoffSettings, optional): Slow backoff configuration
737
"""
738
```
739
740
## Usage Examples
741
742
### Basic Async Driver Usage
743
744
```python
745
import asyncio
746
import ydb.aio as ydb_aio
747
748
async def main():
749
# Create async driver with context manager
750
async with ydb_aio.Driver(
751
endpoint="grpc://localhost:2136",
752
database="/local",
753
credentials=ydb.AnonymousCredentials()
754
) as driver:
755
# Wait for driver to be ready
756
await driver.wait(fail_fast=True, timeout=5)
757
758
# Create session pool
759
async with ydb_aio.SessionPool(driver) as pool:
760
# Execute operation
761
async def query_operation(session):
762
result_sets = await session.execute_query("SELECT 1 AS value")
763
return [row.value for row in result_sets[0].rows]
764
765
results = await pool.retry_operation(query_operation)
766
print(f"Results: {results}")
767
768
# Run async main function
769
asyncio.run(main())
770
```
771
772
### Async Transaction Management
773
774
```python
775
async def transfer_funds(session, from_account, to_account, amount):
776
# Execute multiple queries in async transaction
777
async with await session.transaction(ydb.SerializableReadWrite()) as tx:
778
# Debit from source account
779
await tx.execute(
780
"""
781
UPDATE accounts
782
SET balance = balance - $amount
783
WHERE account_id = $from_account
784
""",
785
parameters={
786
"$from_account": from_account,
787
"$amount": amount
788
}
789
)
790
791
# Credit to destination account
792
await tx.execute(
793
"""
794
UPDATE accounts
795
SET balance = balance + $amount
796
WHERE account_id = $to_account
797
""",
798
parameters={
799
"$to_account": to_account,
800
"$amount": amount
801
}
802
)
803
# Transaction automatically commits on context exit
804
805
# Usage with session pool
806
async with pool.checkout() as session:
807
await transfer_funds(session, "acc1", "acc2", 100.0)
808
```
809
810
### Async Query Service
811
812
```python
813
async def execute_analytics_query():
814
async with ydb_aio.Driver(...) as driver:
815
query_pool = driver.query_session_pool(size=5)
816
817
async with query_pool as pool:
818
async def analytics_operation(session):
819
query = """
820
SELECT
821
DATE_TRUNC('month', created_at) as month,
822
COUNT(*) as orders_count,
823
SUM(total_amount) as total_revenue
824
FROM orders
825
WHERE created_at >= $start_date
826
GROUP BY month
827
ORDER BY month
828
"""
829
830
parameters = {"$start_date": datetime(2024, 1, 1)}
831
832
async for result_set in session.execute_query(query, parameters):
833
async for row in result_set.rows:
834
print(f"Month: {row.month}, Orders: {row.orders_count}, Revenue: {row.total_revenue}")
835
836
await pool.retry_operation(analytics_operation)
837
```
838
839
### Async Bulk Operations
840
841
```python
842
async def bulk_insert_users(session, user_data):
843
# Prepare bulk data
844
rows = [
845
{"user_id": user["id"], "name": user["name"], "email": user["email"]}
846
for user in user_data
847
]
848
849
# Define column types
850
column_types = {
851
"user_id": ydb.PrimitiveType.Uint64,
852
"name": ydb.PrimitiveType.Utf8,
853
"email": ydb.PrimitiveType.Utf8
854
}
855
856
# Perform bulk upsert asynchronously
857
await session.bulk_upsert(
858
"/local/users",
859
rows,
860
column_types=column_types
861
)
862
863
# Usage
864
async with pool.checkout() as session:
865
await bulk_insert_users(session, user_data_list)
866
```
867
868
### Async Schema Operations
869
870
```python
871
async def setup_database_schema():
872
async with ydb_aio.Driver(...) as driver:
873
scheme_client = driver.scheme_client()
874
875
# Create directories
876
await scheme_client.make_directory("/local/app")
877
await scheme_client.make_directory("/local/app/tables")
878
879
# List directory contents
880
directory = await scheme_client.list_directory("/local/app")
881
for entry in directory.children:
882
print(f"Entry: {entry.name}, Type: {entry.type}")
883
884
# Create table through session
885
async with ydb_aio.SessionPool(driver) as pool:
886
async def create_table_operation(session):
887
table_description = (
888
ydb.TableDescription()
889
.with_column(ydb.TableColumn("id", ydb.OptionalType(ydb.PrimitiveType.Uint64)))
890
.with_column(ydb.TableColumn("name", ydb.OptionalType(ydb.PrimitiveType.Utf8)))
891
.with_primary_key("id")
892
)
893
894
await session.create_table("/local/app/tables/users", table_description)
895
896
await pool.retry_operation(create_table_operation)
897
```
898
899
## Type Definitions
900
901
```python { .api }
902
# Type aliases for async operations
903
AsyncQueryCallback = Callable[['Session'], Awaitable[Any]]
904
AsyncQuerySessionCallback = Callable[['QuerySession'], Awaitable[Any]]
905
AsyncResultIterator = AsyncIterator[ResultSet]
906
907
# Common async context managers
908
AsyncDriverContext = AsyncContextManager[Driver]
909
AsyncSessionPoolContext = AsyncContextManager[SessionPool]
910
AsyncSessionContext = AsyncContextManager[Session]
911
AsyncTxContextManager = AsyncContextManager[AsyncTxContext]
912
```