or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-operations.md authentication.md data-types.md dbapi-interface.md driver-connection.md error-handling.md index.md query-service.md schema-operations.md sqlalchemy-integration.md table-operations.md topic-operations.md

docs/async-operations.md

0

# Async Operations

1

2

Full async/await interface providing asynchronous versions of all core functionality through the `ydb.aio` module.

3

4

## Capabilities

5

6

### Async Driver

7

8

Asynchronous database driver with context manager support and automatic connection management.

9

10

```python { .api }

11

import ydb.aio as ydb_aio

12

13

class Driver:

14

def __init__(

15

self,

16

endpoint: str,

17

database: str,

18

credentials: Credentials = None,

19

**kwargs

20

):

21

"""

22

Create asynchronous YDB driver.

23

24

Args:

25

endpoint (str): YDB cluster endpoint

26

database (str): Database path

27

credentials (Credentials, optional): Authentication credentials

28

**kwargs: Additional driver configuration

29

"""

30

31

async def __aenter__(self) -> 'Driver':

32

"""

33

Enter async context manager.

34

35

Returns:

36

Driver: Initialized driver instance

37

"""

38

39

async def __aexit__(self, exc_type, exc_val, exc_tb):

40

"""

41

Exit async context manager and cleanup resources.

42

"""

43

44

async def wait(self, fail_fast: bool = True, timeout: float = None) -> bool:

45

"""

46

Wait for driver to be ready asynchronously.

47

48

Args:

49

fail_fast (bool): Fail immediately on first error

50

timeout (float, optional): Maximum wait time in seconds

51

52

Returns:

53

bool: True if driver is ready, False on timeout

54

"""

55

56

async def stop(self, timeout: float = None):

57

"""

58

Stop the driver and cleanup resources.

59

60

Args:

61

timeout (float, optional): Shutdown timeout in seconds

62

"""

63

64

@property

65

def discovery_debug_details(self) -> str:

66

"""Get discovery debug information."""

67

68

def scheme_client(self) -> 'SchemeClient':

69

"""

70

Create async scheme client for schema operations.

71

72

Returns:

73

SchemeClient: Async scheme client instance

74

"""

75

76

def table_client(self) -> 'TableClient':

77

"""

78

Create async table client for table operations.

79

80

Returns:

81

TableClient: Async table client instance

82

"""

83

84

def query_session_pool(self, **kwargs) -> 'QuerySessionPool':

85

"""

86

Create async query session pool.

87

88

Returns:

89

QuerySessionPool: Async query session pool

90

"""

91

```

92

93

### Async Session Pool

94

95

Asynchronous session pool with automatic session lifecycle management and retry capabilities.

96

97

```python { .api }

98

class SessionPool:

99

def __init__(

100

self,

101

driver: Driver,

102

size: int = None,

103

creation_timeout: float = None,

104

**kwargs

105

):

106

"""

107

Create asynchronous session pool.

108

109

Args:

110

driver (Driver): Async YDB driver instance

111

size (int, optional): Maximum pool size

112

creation_timeout (float, optional): Session creation timeout

113

"""

114

115

async def __aenter__(self) -> 'SessionPool':

116

"""

117

Enter async context manager.

118

119

Returns:

120

SessionPool: Initialized session pool

121

"""

122

123

async def __aexit__(self, exc_type, exc_val, exc_tb):

124

"""

125

Exit async context manager and stop pool.

126

"""

127

128

async def acquire(self, timeout: float = None) -> 'Session':

129

"""

130

Acquire session from pool asynchronously.

131

132

Args:

133

timeout (float, optional): Acquisition timeout

134

135

Returns:

136

Session: Available async session

137

"""

138

139

async def release(self, session: 'Session'):

140

"""

141

Release session back to pool.

142

143

Args:

144

session (Session): Session to release

145

"""

146

147

async def retry_operation(

148

self,

149

callee: Callable[['Session'], Awaitable[Any]],

150

retry_settings: RetrySettings = None,

151

*args,

152

**kwargs

153

) -> Any:

154

"""

155

Execute async operation with automatic retry and session management.

156

157

Args:

158

callee (Callable): Async function to execute with session

159

retry_settings (RetrySettings, optional): Custom retry configuration

160

*args: Additional arguments for callee

161

**kwargs: Additional keyword arguments for callee

162

163

Returns:

164

Any: Result of callee execution

165

"""

166

167

async def stop(self, timeout: float = None):

168

"""

169

Stop the session pool and close all sessions.

170

171

Args:

172

timeout (float, optional): Shutdown timeout

173

"""

174

175

def checkout(self) -> 'AsyncSessionCheckout':

176

"""

177

Create async session checkout context manager.

178

179

Returns:

180

AsyncSessionCheckout: Async session context manager

181

"""

182

183

class AsyncSessionCheckout:

184

def __init__(self, pool: SessionPool):

185

"""

186

Async context manager for session checkout.

187

188

Args:

189

pool (SessionPool): Parent session pool

190

"""

191

192

async def __aenter__(self) -> 'Session':

193

"""

194

Acquire session from pool.

195

196

Returns:

197

Session: Available session

198

"""

199

200

async def __aexit__(self, exc_type, exc_val, exc_tb):

201

"""

202

Release session back to pool.

203

"""

204

```

205

206

### Async Sessions

207

208

Asynchronous database session for query execution and transaction management.

209

210

```python { .api }

211

class Session:

212

def __init__(self, driver: Driver):

213

"""

214

Create asynchronous database session.

215

216

Args:

217

driver (Driver): Async YDB driver instance

218

"""

219

220

async def __aenter__(self) -> 'Session':

221

"""

222

Enter async context manager.

223

224

Returns:

225

Session: Initialized session

226

"""

227

228

async def __aexit__(self, exc_type, exc_val, exc_tb):

229

"""

230

Exit async context manager and close session.

231

"""

232

233

async def create_table(

234

self,

235

path: str,

236

table_description: TableDescription,

237

settings: CreateTableSettings = None

238

):

239

"""

240

Create table asynchronously.

241

242

Args:

243

path (str): Table path

244

table_description (TableDescription): Table structure definition

245

settings (CreateTableSettings, optional): Creation settings

246

"""

247

248

async def drop_table(self, path: str, settings: DropTableSettings = None):

249

"""

250

Drop table asynchronously.

251

252

Args:

253

path (str): Table path

254

settings (DropTableSettings, optional): Drop settings

255

"""

256

257

async def alter_table(

258

self,

259

path: str,

260

alter_table_settings: AlterTableSettings

261

):

262

"""

263

Alter table structure asynchronously.

264

265

Args:

266

path (str): Table path

267

alter_table_settings (AlterTableSettings): Alteration settings

268

"""

269

270

async def copy_table(

271

self,

272

source_path: str,

273

destination_path: str,

274

settings: CopyTableSettings = None

275

):

276

"""

277

Copy table asynchronously.

278

279

Args:

280

source_path (str): Source table path

281

destination_path (str): Destination table path

282

settings (CopyTableSettings, optional): Copy settings

283

"""

284

285

async def describe_table(

286

self,

287

path: str,

288

settings: DescribeTableSettings = None

289

) -> TableDescription:

290

"""

291

Describe table structure asynchronously.

292

293

Args:

294

path (str): Table path

295

settings (DescribeTableSettings, optional): Describe settings

296

297

Returns:

298

TableDescription: Table structure information

299

"""

300

301

async def execute_query(

302

self,

303

query: str,

304

parameters: Dict[str, Any] = None,

305

settings: ExecuteQuerySettings = None

306

) -> List[ResultSet]:

307

"""

308

Execute YQL query asynchronously.

309

310

Args:

311

query (str): YQL query text

312

parameters (Dict[str, Any], optional): Query parameters

313

settings (ExecuteQuerySettings, optional): Execution settings

314

315

Returns:

316

List[ResultSet]: Query results

317

"""

318

319

async def execute_scheme_query(self, query: str):

320

"""

321

Execute scheme query asynchronously.

322

323

Args:

324

query (str): Scheme query text (DDL)

325

"""

326

327

async def prepare_query(

328

self,

329

query: str,

330

settings: PrepareQuerySettings = None

331

) -> DataQuery:

332

"""

333

Prepare query for execution asynchronously.

334

335

Args:

336

query (str): YQL query text

337

settings (PrepareQuerySettings, optional): Preparation settings

338

339

Returns:

340

DataQuery: Prepared query object

341

"""

342

343

async def transaction(self, tx_mode: TxMode = None) -> 'AsyncTxContext':

344

"""

345

Begin transaction asynchronously.

346

347

Args:

348

tx_mode (TxMode, optional): Transaction mode

349

350

Returns:

351

AsyncTxContext: Async transaction context

352

"""

353

354

async def read_table(

355

self,

356

path: str,

357

key_range: KeyRange = None,

358

columns: List[str] = None,

359

settings: ReadTableSettings = None

360

) -> AsyncIterator[ResultSet]:

361

"""

362

Read table data asynchronously with streaming.

363

364

Args:

365

path (str): Table path

366

key_range (KeyRange, optional): Key range to read

367

columns (List[str], optional): Columns to read

368

settings (ReadTableSettings, optional): Read settings

369

370

Returns:

371

AsyncIterator[ResultSet]: Streaming result sets

372

"""

373

374

async def bulk_upsert(

375

self,

376

path: str,

377

rows: Union[List[Dict], pd.DataFrame],

378

column_types: Dict[str, Type] = None,

379

settings: BulkUpsertSettings = None

380

):

381

"""

382

Bulk upsert data asynchronously.

383

384

Args:

385

path (str): Table path

386

rows (Union[List[Dict], pd.DataFrame]): Data to upsert

387

column_types (Dict[str, Type], optional): Column type overrides

388

settings (BulkUpsertSettings, optional): Upsert settings

389

"""

390

391

async def close(self):

392

"""

393

Close session and release resources asynchronously.

394

"""

395

396

@property

397

def session_id(self) -> str:

398

"""Get session identifier."""

399

```

400

401

### Async Transaction Context

402

403

Asynchronous transaction management with automatic commit/rollback handling.

404

405

```python { .api }

406

class AsyncTxContext:

407

def __init__(self, session: Session, tx_mode: TxMode = None):

408

"""

409

Asynchronous transaction context.

410

411

Args:

412

session (Session): Parent async session

413

tx_mode (TxMode, optional): Transaction isolation mode

414

"""

415

416

async def __aenter__(self) -> 'AsyncTxContext':

417

"""

418

Enter async transaction context.

419

420

Returns:

421

AsyncTxContext: Transaction context

422

"""

423

424

async def __aexit__(self, exc_type, exc_val, exc_tb):

425

"""

426

Exit async transaction context with automatic commit/rollback.

427

"""

428

429

async def execute(

430

self,

431

query: str,

432

parameters: Dict[str, Any] = None,

433

commit_tx: bool = False,

434

settings: ExecuteQuerySettings = None

435

) -> List[ResultSet]:

436

"""

437

Execute query within transaction asynchronously.

438

439

Args:

440

query (str): YQL query text

441

parameters (Dict[str, Any], optional): Query parameters

442

commit_tx (bool): Commit transaction after execution

443

settings (ExecuteQuerySettings, optional): Execution settings

444

445

Returns:

446

List[ResultSet]: Query results

447

"""

448

449

async def commit(self, settings: CommitTxSettings = None):

450

"""

451

Commit transaction asynchronously.

452

453

Args:

454

settings (CommitTxSettings, optional): Commit settings

455

"""

456

457

async def rollback(self, settings: RollbackTxSettings = None):

458

"""

459

Rollback transaction asynchronously.

460

461

Args:

462

settings (RollbackTxSettings, optional): Rollback settings

463

"""

464

465

@property

466

def tx_id(self) -> str:

467

"""Get transaction identifier."""

468

```

469

470

### Async Query Service

471

472

Asynchronous query service with modern YQL interface and session pooling.

473

474

```python { .api }

475

class QuerySessionPool:

476

def __init__(

477

self,

478

driver: Driver,

479

size: int = None,

480

query_client_settings: QueryClientSettings = None

481

):

482

"""

483

Asynchronous query session pool.

484

485

Args:

486

driver (Driver): Async YDB driver instance

487

size (int, optional): Maximum pool size

488

query_client_settings (QueryClientSettings, optional): Default settings

489

"""

490

491

async def __aenter__(self) -> 'QuerySessionPool':

492

"""Enter async context manager."""

493

494

async def __aexit__(self, exc_type, exc_val, exc_tb):

495

"""Exit async context manager."""

496

497

async def acquire(self, timeout: float = None) -> 'QuerySession':

498

"""

499

Acquire query session from pool asynchronously.

500

501

Args:

502

timeout (float, optional): Acquisition timeout

503

504

Returns:

505

QuerySession: Available async query session

506

"""

507

508

async def release(self, session: 'QuerySession'):

509

"""

510

Release query session back to pool.

511

512

Args:

513

session (QuerySession): Session to release

514

"""

515

516

async def retry_operation(

517

self,

518

callee: Callable[['QuerySession'], Awaitable[Any]],

519

retry_settings: RetrySettings = None,

520

*args,

521

**kwargs

522

) -> Any:

523

"""

524

Execute async operation with automatic retry.

525

526

Args:

527

callee (Callable): Async function to execute

528

retry_settings (RetrySettings, optional): Retry configuration

*args: Additional arguments for callee

**kwargs: Additional keyword arguments for callee

529

530

Returns:

531

Any: Result of callee execution

532

"""

533

534

async def stop(self, timeout: float = None):

535

"""Stop the query session pool."""

536

537

class QuerySession:

538

def __init__(self, driver: Driver, settings: QueryClientSettings = None):

539

"""

540

Asynchronous query session.

541

542

Args:

543

driver (Driver): Async YDB driver instance

544

settings (QueryClientSettings, optional): Session configuration

545

"""

546

547

async def execute_query(

548

self,

549

query: str,

550

parameters: Dict[str, Any] = None,

551

tx_control: QueryTxControl = None,

552

settings: ExecuteQuerySettings = None

553

) -> AsyncIterator[ResultSet]:

554

"""

555

Execute query asynchronously with streaming results.

556

557

Args:

558

query (str): YQL query text

559

parameters (Dict[str, Any], optional): Query parameters

560

tx_control (QueryTxControl, optional): Transaction control

561

settings (ExecuteQuerySettings, optional): Execution settings

562

563

Returns:

564

AsyncIterator[ResultSet]: Streaming query results

565

"""

566

567

async def transaction(

568

self,

569

tx_settings: QueryTxSettings = None

570

) -> 'AsyncQueryTxContext':

571

"""

572

Begin async query transaction.

573

574

Args:

575

tx_settings (QueryTxSettings, optional): Transaction settings

576

577

Returns:

578

AsyncQueryTxContext: Async transaction context

579

"""

580

581

async def close(self):

582

"""Close async query session."""

583

584

class AsyncQueryTxContext:

585

async def __aenter__(self) -> 'AsyncQueryTxContext':

586

"""Enter async transaction context."""

587

588

async def __aexit__(self, exc_type, exc_val, exc_tb):

589

"""Exit async transaction context."""

590

591

async def execute(

592

self,

593

query: str,

594

parameters: Dict[str, Any] = None,

595

settings: ExecuteQuerySettings = None

596

) -> AsyncIterator[ResultSet]:

597

"""Execute query within async transaction."""

598

599

async def commit(self):

600

"""Commit async transaction."""

601

602

async def rollback(self):

603

"""Rollback async transaction."""

604

```

605

606

### Async Scheme Operations

607

608

Asynchronous schema and directory operations client.

609

610

```python { .api }

611

class SchemeClient:

612

def __init__(self, driver: Driver):

613

"""

614

Asynchronous scheme client for schema operations.

615

616

Args:

617

driver (Driver): Async YDB driver instance

618

"""

619

620

async def make_directory(

621

self,

622

path: str,

623

settings: MakeDirectorySettings = None

624

):

625

"""

626

Create directory asynchronously.

627

628

Args:

629

path (str): Directory path

630

settings (MakeDirectorySettings, optional): Creation settings

631

"""

632

633

async def remove_directory(

634

self,

635

path: str,

636

settings: RemoveDirectorySettings = None

637

):

638

"""

639

Remove directory asynchronously.

640

641

Args:

642

path (str): Directory path

643

settings (RemoveDirectorySettings, optional): Removal settings

644

"""

645

646

async def list_directory(

647

self,

648

path: str,

649

settings: ListDirectorySettings = None

650

) -> Directory:

651

"""

652

List directory contents asynchronously.

653

654

Args:

655

path (str): Directory path

656

settings (ListDirectorySettings, optional): Listing settings

657

658

Returns:

659

Directory: Directory information with entries

660

"""

661

662

async def describe_path(

663

self,

664

path: str,

665

settings: DescribePathSettings = None

666

) -> SchemeEntry:

667

"""

668

Describe path entry asynchronously.

669

670

Args:

671

path (str): Entry path

672

settings (DescribePathSettings, optional): Describe settings

673

674

Returns:

675

SchemeEntry: Path entry information

676

"""

677

678

async def modify_permissions(

679

self,

680

path: str,

681

permissions: Permissions,

682

settings: ModifyPermissionsSettings = None

683

):

684

"""

685

Modify path permissions asynchronously.

686

687

Args:

688

path (str): Entry path

689

permissions (Permissions): Permission changes

690

settings (ModifyPermissionsSettings, optional): Modify settings

691

"""

692

```

693

694

### Async Retry Operations

695

696

Asynchronous retry functionality with backoff and error handling.

697

698

```python { .api }

699

async def retry_operation(

700

callee: Callable[..., Awaitable[Any]],

701

retry_settings: RetrySettings = None,

702

session_pool: SessionPool = None,

703

*args,

704

**kwargs

705

) -> Any:

706

"""

707

Execute async operation with retry logic.

708

709

Args:

710

callee (Callable): Async function to execute

711

retry_settings (RetrySettings, optional): Retry configuration

712

session_pool (SessionPool, optional): Session pool for session-based operations

713

*args: Additional arguments for callee

714

**kwargs: Additional keyword arguments for callee

715

716

Returns:

717

Any: Result of successful callee execution

718

"""

719

720

class AsyncRetrySettings:

721

def __init__(

722

self,

723

max_retries: int = 10,

724

max_session_acquire_timeout: float = None,

725

fast_backoff_settings: BackoffSettings = None,

726

slow_backoff_settings: BackoffSettings = None,

727

**kwargs

728

):

729

"""

730

Retry settings for async operations.

731

732

Args:

733

max_retries (int): Maximum number of retry attempts

734

max_session_acquire_timeout (float, optional): Session acquisition timeout

735

fast_backoff_settings (BackoffSettings, optional): Fast backoff configuration

736

slow_backoff_settings (BackoffSettings, optional): Slow backoff configuration

737

"""

738

```

739

740

## Usage Examples

741

742

### Basic Async Driver Usage

743

744

```python

745

import asyncio

746

import ydb
import ydb.aio as ydb_aio

747

748

async def main():

749

# Create async driver with context manager

750

async with ydb_aio.Driver(

751

endpoint="grpc://localhost:2136",

752

database="/local",

753

credentials=ydb.AnonymousCredentials()

754

) as driver:

755

# Wait for driver to be ready

756

await driver.wait(fail_fast=True, timeout=5)

757

758

# Create session pool

759

async with ydb_aio.SessionPool(driver) as pool:

760

# Execute operation

761

async def query_operation(session):

762

result_sets = await session.execute_query("SELECT 1 AS value")

763

return [row.value for row in result_sets[0].rows]

764

765

results = await pool.retry_operation(query_operation)

766

print(f"Results: {results}")

767

768

# Run async main function

769

asyncio.run(main())

770

```

771

772

### Async Transaction Management

773

774

```python

775

async def transfer_funds(session, from_account, to_account, amount):

776

# Execute multiple queries in async transaction

777

async with await session.transaction(ydb.SerializableReadWrite()) as tx:

778

# Debit from source account

779

await tx.execute(

780

"""

781

UPDATE accounts

782

SET balance = balance - $amount

783

WHERE account_id = $from_account

784

""",

785

parameters={

786

"$from_account": from_account,

787

"$amount": amount

788

}

789

)

790

791

# Credit to destination account

792

await tx.execute(

793

"""

794

UPDATE accounts

795

SET balance = balance + $amount

796

WHERE account_id = $to_account

797

""",

798

parameters={

799

"$to_account": to_account,

800

"$amount": amount

801

}

802

)

803

# Transaction automatically commits on context exit

804

805

# Usage with session pool

806

async with pool.checkout() as session:

807

await transfer_funds(session, "acc1", "acc2", 100.0)

808

```

809

810

### Async Query Service

811

812

```python

813

async def execute_analytics_query():

814

async with ydb_aio.Driver(...) as driver:

815

query_pool = driver.query_session_pool(size=5)

816

817

async with query_pool as pool:

818

async def analytics_operation(session):

819

query = """

820

SELECT

821

DATE_TRUNC('month', created_at) as month,

822

COUNT(*) as orders_count,

823

SUM(total_amount) as total_revenue

824

FROM orders

825

WHERE created_at >= $start_date

826

GROUP BY month

827

ORDER BY month

828

"""

829

830

parameters = {"$start_date": datetime(2024, 1, 1)}

831

832

async for result_set in session.execute_query(query, parameters):

833

for row in result_set.rows:

834

print(f"Month: {row.month}, Orders: {row.orders_count}, Revenue: {row.total_revenue}")

835

836

await pool.retry_operation(analytics_operation)

837

```

838

839

### Async Bulk Operations

840

841

```python

842

async def bulk_insert_users(session, user_data):

843

# Prepare bulk data

844

rows = [

845

{"user_id": user["id"], "name": user["name"], "email": user["email"]}

846

for user in user_data

847

]

848

849

# Define column types

850

column_types = {

851

"user_id": ydb.PrimitiveType.Uint64,

852

"name": ydb.PrimitiveType.Utf8,

853

"email": ydb.PrimitiveType.Utf8

854

}

855

856

# Perform bulk upsert asynchronously

857

await session.bulk_upsert(

858

"/local/users",

859

rows,

860

column_types=column_types

861

)

862

863

# Usage

864

async with pool.checkout() as session:

865

await bulk_insert_users(session, user_data_list)

866

```

867

868

### Async Schema Operations

869

870

```python

871

async def setup_database_schema():

872

async with ydb_aio.Driver(...) as driver:

873

scheme_client = driver.scheme_client()

874

875

# Create directories

876

await scheme_client.make_directory("/local/app")

877

await scheme_client.make_directory("/local/app/tables")

878

879

# List directory contents

880

directory = await scheme_client.list_directory("/local/app")

881

for entry in directory.children:

882

print(f"Entry: {entry.name}, Type: {entry.type}")

883

884

# Create table through session

885

async with ydb_aio.SessionPool(driver) as pool:

886

async def create_table_operation(session):

887

table_description = (

888

ydb.TableDescription()

889

.with_column(ydb.TableColumn("id", ydb.OptionalType(ydb.PrimitiveType.Uint64)))

890

.with_column(ydb.TableColumn("name", ydb.OptionalType(ydb.PrimitiveType.Utf8)))

891

.with_primary_key("id")

892

)

893

894

await session.create_table("/local/app/tables/users", table_description)

895

896

await pool.retry_operation(create_table_operation)

897

```

898

899

## Type Definitions

900

901

```python { .api }

902

# Type aliases for async operations

903

AsyncQueryCallback = Callable[['Session'], Awaitable[Any]]

904

AsyncQuerySessionCallback = Callable[['QuerySession'], Awaitable[Any]]

905

AsyncResultIterator = AsyncIterator[ResultSet]

906

907

# Common async context managers

908

AsyncDriverContext = AsyncContextManager[Driver]

909

AsyncSessionPoolContext = AsyncContextManager[SessionPool]

910

AsyncSessionContext = AsyncContextManager[Session]

911

AsyncTxContextManager = AsyncContextManager[AsyncTxContext]

912

```