or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-operations.md · authentication.md · data-types.md · dbapi-interface.md · driver-connection.md · error-handling.md · index.md · query-service.md · schema-operations.md · sqlalchemy-integration.md · table-operations.md · topic-operations.md

docs/sqlalchemy-integration.md

0

# SQLAlchemy Integration

1

2

SQLAlchemy dialect for YDB enabling ORM usage, custom YDB types, connection management, and standard SQLAlchemy operations.

3

4

## Capabilities

5

6

### YDB SQLAlchemy Dialect

7

8

The YDB SQLAlchemy dialect enables using YDB with SQLAlchemy ORM and core functionality.

9

10

```python { .api }

11

import ydb.sqlalchemy

12

13

class YqlDialect(DefaultDialect):
    """
    SQLAlchemy dialect for YDB (YQL).

    Provides SQLAlchemy integration for YDB database operations
    including ORM support, query compilation, and type mapping.
    """

    # --- Identification and general capabilities ---
    name = "yql"
    supports_alter = False
    max_identifier_length = 63
    supports_sane_rowcount = False
    supports_statement_cache = False

    # --- Native type support ---
    supports_native_enum = False
    supports_native_boolean = True
    supports_smallserial = False

    # --- Sequences / autoincrement ---
    supports_sequences = False
    sequences_optional = True
    preexecute_autoincrement_sequences = True
    postfetch_lastrowid = False

    # --- INSERT behaviour and parameter style ---
    supports_default_values = False
    supports_empty_insert = False
    supports_multivalues_insert = True
    default_paramstyle = "qmark"

    isolation_level = None

    @staticmethod
    def dbapi():
        """
        Get YDB DB-API module.

        Returns:
            module: YDB DB-API 2.0 module
        """
        # Imported lazily so that the driver is only loaded when requested.
        import ydb.dbapi

        return ydb.dbapi

    def get_columns(
        self,
        connection,
        table_name: str,
        schema: str = None,
        **kwargs
    ) -> List[Dict]:
        """
        Get table column information for SQLAlchemy reflection.

        Args:
            connection: SQLAlchemy connection
            table_name (str): Table name
            schema (str, optional): Schema name (not supported)
            **kwargs: Additional arguments

        Returns:
            List[Dict]: Column information dictionaries
        """

    def has_table(
        self,
        connection,
        table_name: str,
        schema: str = None
    ) -> bool:
        """
        Check if table exists.

        Args:
            connection: SQLAlchemy connection
            table_name (str): Table name to check
            schema (str, optional): Schema name (not supported)

        Returns:
            bool: True if table exists
        """

91

92

def register_dialect(
    name: str = "yql",
    module: str = None,
    cls: str = "YqlDialect"
):
    """
    Register YDB SQLAlchemy dialect.

    This listing is an API signature only; the body is intentionally empty.

    Args:
        name (str): Dialect name
        module (str, optional): Module path
        cls (str): Dialect class name
    """

105

```

106

107

### YDB-Specific SQLAlchemy Types

108

109

Custom SQLAlchemy types for YDB-specific data types.

110

111

```python { .api }

112

from sqlalchemy.types import Integer, TypeDecorator

113

114

class UInt8(Integer):
    """
    SQLAlchemy type for YDB UInt8.

    Maps to YDB's 8-bit unsigned integer type.
    """

    # Selects the type compiler's ``visit_uint8`` hook; without this the visit
    # name inherited from ``Integer`` would be used instead.
    __visit_name__ = "uint8"

    def __init__(self):
        """Create UInt8 type."""
        super().__init__()

124

125

class UInt32(Integer):
    """
    SQLAlchemy type for YDB UInt32.

    Maps to YDB's 32-bit unsigned integer type.
    """

    # Selects the type compiler's ``visit_uint32`` hook; without this the visit
    # name inherited from ``Integer`` would be used instead.
    __visit_name__ = "uint32"

    def __init__(self):
        """Create UInt32 type."""
        super().__init__()

135

136

class UInt64(Integer):
    """
    SQLAlchemy type for YDB UInt64.

    Maps to YDB's 64-bit unsigned integer type.
    """

    # Selects the type compiler's ``visit_uint64`` hook; without this the visit
    # name inherited from ``Integer`` would be used instead.
    __visit_name__ = "uint64"

    def __init__(self):
        """Create UInt64 type."""
        super().__init__()

146

147

class YdbDateTime(TypeDecorator):
    """
    SQLAlchemy type decorator for YDB datetime handling.

    Wraps ``sa.DATETIME``. No bind/result processing is defined in this
    listing, so values pass through the underlying type unchanged.
    """

    impl = sa.DATETIME
    # Declares the type safe to use as part of SQLAlchemy's statement cache key.
    cache_ok = True

156

157

class YdbDecimal(TypeDecorator):
    """
    SQLAlchemy type decorator for YDB decimal type.

    Handles YDB decimal precision and scale requirements.
    """

    impl = sa.DECIMAL
    # Declares the type safe to use as part of SQLAlchemy's statement cache key.
    cache_ok = True

    def __init__(self, precision: int = 22, scale: int = 9):
        """
        Create YDB decimal type.

        Args:
            precision (int): Decimal precision (max 35)
            scale (int): Decimal scale (max precision)

        Raises:
            ValueError: If ``precision`` is outside 1..35 or ``scale`` is
                outside 0..precision.
        """
        # Enforce the documented limits here so a bad column definition fails
        # at declaration time rather than when a statement reaches the server.
        if not 1 <= precision <= 35:
            raise ValueError(f"precision must be between 1 and 35, got {precision}")
        if not 0 <= scale <= precision:
            raise ValueError(
                f"scale must be between 0 and precision ({precision}), got {scale}"
            )
        super().__init__(precision=precision, scale=scale)

176

177

class YdbJson(TypeDecorator):
    """
    SQLAlchemy type decorator for YDB JSON types.

    Wraps ``sa.JSON``; intended for both the Json and JsonDocument YDB types.
    """

    impl = sa.JSON
    # Declares the type safe to use as part of SQLAlchemy's statement cache key.
    cache_ok = True

186

```

187

188

### Type Compiler

189

190

Custom type compiler for translating SQLAlchemy types to YQL types.

191

192

```python { .api }

193

class YqlTypeCompiler(GenericTypeCompiler):
    """
    Type compiler for converting SQLAlchemy types to YQL.

    Every ``visit_*`` hook takes the type instance (``type_``) plus arbitrary
    keyword arguments, ignores both, and returns a fixed YQL type name.
    """

    def visit_VARCHAR(self, type_, **kwargs) -> str:
        """
        Convert VARCHAR to YQL STRING type.

        Args:
            type_: SQLAlchemy VARCHAR type
            **kwargs: Additional arguments

        Returns:
            str: YQL type string
        """
        return "STRING"

    def visit_unicode(self, type_, **kwargs) -> str:
        """
        Convert Unicode to YQL UTF8 type.

        Args:
            type_: SQLAlchemy Unicode type
            **kwargs: Additional arguments

        Returns:
            str: YQL type string
        """
        return "UTF8"

    def visit_TEXT(self, type_, **kwargs) -> str:
        """
        Convert TEXT to YQL UTF8 type.

        Args:
            type_: SQLAlchemy TEXT type
            **kwargs: Additional arguments

        Returns:
            str: YQL type string
        """
        return "UTF8"

    def visit_BOOLEAN(self, type_, **kwargs) -> str:
        """
        Convert BOOLEAN to YQL BOOL type.

        Args:
            type_: SQLAlchemy BOOLEAN type
            **kwargs: Additional arguments

        Returns:
            str: YQL type string
        """
        return "BOOL"

    def visit_FLOAT(self, type_, **kwargs) -> str:
        """
        Convert FLOAT to YQL DOUBLE type.

        Args:
            type_: SQLAlchemy FLOAT type
            **kwargs: Additional arguments

        Returns:
            str: YQL type string
        """
        return "DOUBLE"

    def visit_uint32(self, type_, **kwargs) -> str:
        """
        Convert UInt32 to YQL UInt32 type.

        Args:
            type_: YDB UInt32 type
            **kwargs: Additional arguments

        Returns:
            str: YQL type string
        """
        return "UInt32"

    def visit_uint64(self, type_, **kwargs) -> str:
        """
        Convert UInt64 to YQL UInt64 type.

        Args:
            type_: YDB UInt64 type
            **kwargs: Additional arguments

        Returns:
            str: YQL type string
        """
        return "UInt64"

    def visit_uint8(self, type_, **kwargs) -> str:
        """
        Convert UInt8 to YQL UInt8 type.

        Args:
            type_: YDB UInt8 type
            **kwargs: Additional arguments

        Returns:
            str: YQL type string
        """
        return "UInt8"

301

```

302

303

### Query Compiler

304

305

Custom SQL compiler for generating YQL from SQLAlchemy expressions.

306

307

```python { .api }

308

class YqlCompiler(SQLCompiler):
    """
    SQL compiler for generating YQL queries from SQLAlchemy expressions.
    """

    def group_by_clause(self, select, **kwargs):
        """
        Generate GROUP BY clause for YQL.

        Args:
            select: SQLAlchemy select object
            **kwargs: Additional arguments

        Returns:
            str: YQL GROUP BY clause
        """
        # Force column-clause rendering for the GROUP BY expressions before
        # delegating to the stock implementation.
        kwargs.update(within_columns_clause=True)
        return super().group_by_clause(select, **kwargs)

    def visit_function(self, func, add_to_result_map=None, **kwargs):
        """
        Visit function expressions and convert to YQL syntax.

        A compiler method named ``visit_<lowercased name>_func``, when present,
        takes precedence. Otherwise the function name is prefixed with its
        package names joined by ``::`` (YQL namespace syntax) and followed by
        the rendered argument list.

        Args:
            func: SQLAlchemy function expression
            add_to_result_map: Result map callback (accepted but unused here)
            **kwargs: Additional arguments

        Returns:
            str: YQL function call
        """
        # Handle YQL namespace syntax (::) instead of SQL (.)
        disp = getattr(self, f"visit_{func.name.lower()}_func", None)
        if disp:
            return disp(func, **kwargs)

        # Convert function names to YQL format
        name = func.name
        packagenames = getattr(func, "packagenames", None)
        if packagenames:
            # ``packagenames`` is a tuple on SQLAlchemy function objects, so it
            # cannot be concatenated with a list; unpack it instead.
            name = "::".join([*packagenames, name])

        return f"{name}{self.function_argspec(func, **kwargs)}"

    def visit_lambda(self, lambda_, **kwargs):
        """
        Visit lambda expressions for YQL.

        Args:
            lambda_: Lambda expression; its ``func`` attribute is the Python
                callable whose positional argument names become YQL ``$args``
            **kwargs: Additional arguments

        Returns:
            str: YQL lambda expression
        """
        func = lambda_.func
        spec = inspect_getfullargspec(func)

        # Build YQL lambda syntax: ($arg1, $arg2) -> { RETURN expression; }
        args_str = "(" + ", ".join(f"${arg}" for arg in spec.args) + ")"

        # Create literal columns for lambda parameters, then call the Python
        # function with them to obtain the expression tree of the body.
        args = [literal_column(f"${arg}") for arg in spec.args]
        body_expr = func(*args)
        body_str = self.process(body_expr, **kwargs)

        return f"{args_str} -> {{ RETURN {body_str}; }}"

374

375

class ParametrizedFunction(functions.Function):
    """
    SQLAlchemy function with YQL-style parameters.

    Supports YQL functions that take type parameters.
    """

    __visit_name__ = "parametrized_function"

    def __init__(self, name: str, params: List, *args, **kwargs):
        """
        Create parametrized function.

        Args:
            name (str): Function name
            params (List): Type parameters
            *args: Function arguments
            **kwargs: Additional arguments
        """
        super().__init__(name, *args, **kwargs)
        self._func_name = name
        self._func_params = params
        # Comma-separated, grouped clause list holding the type parameters.
        self.params_expr = ClauseList(
            *params,
            operator=functions.operators.comma_op,
            group_contents=True,
        ).self_group()

402

```

403

404

### Identifier Preparer

405

406

Custom identifier handling for YQL naming conventions.

407

408

```python { .api }

409

class YqlIdentifierPreparer(IdentifierPreparer):
    """
    Identifier preparer for YQL naming conventions.

    Handles quoting and escaping of identifiers in YQL.
    """

    def __init__(self, dialect):
        """
        Create YQL identifier preparer.

        Args:
            dialect: YQL dialect instance
        """
        super().__init__(
            dialect,
            initial_quote="`",
            final_quote="`",
        )

    def _requires_quotes(self, value: str) -> bool:
        """
        Determine if identifier requires quoting.

        Args:
            value (str): Identifier value

        Returns:
            bool: True if quoting is required
        """
        # Force quoting unless already quoted
        return not (
            value.startswith(self.initial_quote) and
            value.endswith(self.final_quote)
        )

    def quote_identifier(self, value: str) -> str:
        """
        Quote identifier for YQL.

        Args:
            value (str): Identifier to quote

        Returns:
            str: Quoted identifier, or ``value`` unchanged if it is already
            wrapped in backticks
        """
        if not self._requires_quotes(value):
            return value
        # Delegate to the base class so quote characters embedded in the
        # identifier are escaped instead of terminating the quoted name early.
        return super().quote_identifier(value)

458

```

459

460

### Connection and Engine Creation

461

462

Utilities for creating SQLAlchemy engines and connections to YDB.

463

464

```python { .api }

465

def create_ydb_engine(
    endpoint: str,
    database: str,
    credentials: "ydb.Credentials" = None,
    **engine_kwargs
) -> "sa.Engine":
    """
    Create SQLAlchemy engine for YDB.

    Args:
        endpoint (str): YDB endpoint URL, with or without a scheme
            (e.g. ``grpc://localhost:2136`` or ``localhost:2136``)
        database (str): Database path, with or without the leading ``/``
        credentials (ydb.Credentials, optional): Authentication credentials
        **engine_kwargs: Additional engine arguments

    Returns:
        sa.Engine: SQLAlchemy engine configured for YDB
    """
    # Register YDB dialect
    register_dialect()

    # Build connection string. Any scheme is dropped (not only ``grpc://``),
    # because the SQLAlchemy URL carries its own ``yql://`` scheme.
    # NOTE(review): a ``grpcs://`` scheme implies TLS, which is not carried
    # over by the URL alone -- confirm how the driver expects it to be set.
    host = endpoint.split("://", 1)[-1].rstrip("/")
    # Exactly one "/" must separate host:port from the database path.
    db_path = "/" + database.lstrip("/")
    connection_string = f"yql://{host}{db_path}"

    # ``create_engine`` rejects keyword arguments it does not recognise, so
    # driver-level options such as credentials travel through ``connect_args``.
    if credentials is not None:
        connect_args = dict(engine_kwargs.pop("connect_args", None) or {})
        connect_args.setdefault("credentials", credentials)
        engine_kwargs["connect_args"] = connect_args

    # Create engine with YDB-specific settings
    engine = sa.create_engine(connection_string, **engine_kwargs)

    return engine

497

498

def get_ydb_metadata(engine: "sa.Engine") -> "sa.MetaData":
    """
    Get SQLAlchemy metadata with YDB table reflection.

    Args:
        engine (sa.Engine): YDB SQLAlchemy engine

    Returns:
        sa.MetaData: Metadata with reflected tables
    """
    metadata = sa.MetaData()
    # Populate the fresh MetaData by reflecting through the given engine.
    metadata.reflect(bind=engine)
    return metadata

511

512

class YdbEngineBuilder:
    """
    Builder for YDB SQLAlchemy engines with configuration.

    Each ``with_*`` method stores one setting and returns the builder itself
    so calls can be chained; ``build()`` creates the engine.
    """

    def __init__(self):
        """Create engine builder with nothing configured."""
        self.endpoint = None
        self.database = None
        self.credentials = None
        # Extra keyword arguments forwarded to the engine factory by build().
        self.engine_options = {}

    def with_endpoint(self, endpoint: str) -> 'YdbEngineBuilder':
        """
        Set YDB endpoint.

        Args:
            endpoint (str): YDB endpoint URL

        Returns:
            YdbEngineBuilder: Self for chaining
        """
        self.endpoint = endpoint
        return self

    def with_database(self, database: str) -> 'YdbEngineBuilder':
        """
        Set database path.

        Args:
            database (str): Database path

        Returns:
            YdbEngineBuilder: Self for chaining
        """
        self.database = database
        return self

    def with_credentials(self, credentials: 'ydb.Credentials') -> 'YdbEngineBuilder':
        """
        Set authentication credentials.

        Args:
            credentials (ydb.Credentials): YDB credentials

        Returns:
            YdbEngineBuilder: Self for chaining
        """
        self.credentials = credentials
        return self

    def with_pool_size(self, size: int) -> 'YdbEngineBuilder':
        """
        Set connection pool size.

        Args:
            size (int): Pool size, stored under the ``pool_size`` engine option

        Returns:
            YdbEngineBuilder: Self for chaining
        """
        self.engine_options['pool_size'] = size
        return self

    def build(self) -> 'sa.Engine':
        """
        Build SQLAlchemy engine.

        Returns:
            sa.Engine: Configured YDB engine

        Raises:
            ValueError: If the endpoint or the database has not been set.
        """
        if not self.endpoint or not self.database:
            raise ValueError("Endpoint and database are required")

        return create_ydb_engine(
            self.endpoint,
            self.database,
            self.credentials,
            **self.engine_options
        )

592

```

593

594

## Usage Examples

595

596

### Basic SQLAlchemy Setup

597

598

```python

599

import sqlalchemy as sa
import ydb.sqlalchemy
# declarative_base lives in sqlalchemy.orm; the sqlalchemy.ext.declarative
# location is deprecated.
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import sessionmaker

# Register YDB dialect
ydb.sqlalchemy.register_dialect()

# Create engine
engine = sa.create_engine("yql://localhost:2136/local")

# Create metadata and base class
Base = declarative_base()
metadata = sa.MetaData()


# Define ORM model
class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = 'users'

    id = sa.Column(ydb.sqlalchemy.UInt64, primary_key=True)
    name = sa.Column(sa.Unicode(255), nullable=False)
    email = sa.Column(sa.Unicode(255), unique=True)
    age = sa.Column(ydb.sqlalchemy.UInt32)
    created_at = sa.Column(sa.DateTime, default=sa.func.CurrentUtcDatetime())
    is_active = sa.Column(sa.Boolean, default=True)
    metadata_json = sa.Column(sa.JSON)

    def __repr__(self):
        return f"<User(id={self.id}, name='{self.name}', email='{self.email}')>"


# Create session factory
Session = sessionmaker(bind=engine)

631

```

632

633

### ORM Operations

634

635

```python

636

def demonstrate_orm_operations():
    """Demonstrate basic ORM operations with YDB.

    Creates a user, queries, updates and aggregates, then runs a filtered and
    ordered query. Any exception rolls the session back and is printed; the
    session is always closed.
    """

    session = Session()

    try:
        # Create new user
        new_user = User(
            id=1001,
            name="Alice Johnson",
            email="alice@example.com",
            age=25,
            metadata_json={"department": "engineering", "level": "senior"}
        )

        session.add(new_user)
        session.commit()

        # Query users
        users = session.query(User).filter(User.age > 21).all()
        print(f"Found {len(users)} users over 21")

        # Update user
        user = session.query(User).filter(User.email == "alice@example.com").first()
        if user:
            user.age = 26
            user.metadata_json = {"department": "engineering", "level": "lead"}
            session.commit()
            print(f"Updated user: {user}")

        # Aggregate query
        avg_age = session.query(sa.func.avg(User.age)).scalar()
        print(f"Average age: {avg_age}")

        # Query with multiple filters and ordering
        active_engineers = session.query(User).filter(
            # "== True" is intentional: it builds a SQL expression.
            User.is_active == True,  # noqa: E712
            # Generic sa.JSON exposes as_string(); .astext exists only on the
            # PostgreSQL-specific JSON type.
            User.metadata_json['department'].as_string() == 'engineering'
        ).order_by(User.created_at.desc()).all()

        print(f"Active engineers: {len(active_engineers)}")

    except Exception as e:
        session.rollback()
        print(f"Error in ORM operations: {e}")

    finally:
        session.close()

684

685

demonstrate_orm_operations()

686

```

687

688

### Advanced YQL Features

689

690

```python

691

def demonstrate_yql_features():
    """Demonstrate YQL-specific features through SQLAlchemy.

    Any exception rolls the session back and is printed; the session is
    always closed.
    """

    session = Session()

    try:
        # Use YQL functions through SQLAlchemy
        query = session.query(
            User.name,
            sa.func.ListLength(
                # Attribute access on sa.func records "String" as a package
                # name, which the YQL compiler joins with "::" to produce
                # String::SplitToList(...).
                sa.func.String.SplitToList(User.name, sa.literal(" "))
            ).label("name_parts_count")
        ).filter(User.is_active == True)  # noqa: E712 - builds a SQL expression

        for user_name, parts_count in query:
            print(f"User: {user_name}, Name parts: {parts_count}")

        # YQL lambda expressions (requires custom compilation)
        # This would need additional dialect customization

        # Window functions
        ranked_users = session.query(
            User.name,
            User.age,
            sa.func.row_number().over(
                order_by=User.age.desc()
            ).label("age_rank")
        ).all()

        for name, age, rank in ranked_users:
            print(f"Rank {rank}: {name} (age {age})")

        # JSON operations
        # NOTE(review): confirm "JsonValue" is the function name YDB expects
        # (YQL documents JSON_VALUE) before relying on this query.
        engineering_users = session.query(User).filter(
            sa.func.JsonValue(
                User.metadata_json,
                'strict $.department'
            ) == 'engineering'
        ).all()

        print(f"Engineering users: {len(engineering_users)}")

    except Exception as e:
        session.rollback()
        print(f"Error in YQL operations: {e}")

    finally:
        session.close()

739

740

demonstrate_yql_features()

741

```

742

743

### Table Reflection and Inspection

744

745

```python

746

def inspect_ydb_schema():
    """Inspect YDB schema through SQLAlchemy reflection.

    Prints every reflected table with its columns and indexes, then lists
    table names and column details obtained from the engine inspector.
    """

    # Reflect existing tables
    metadata = sa.MetaData()
    metadata.reflect(bind=engine)

    print("Reflected tables:")
    for table_name, table in metadata.tables.items():
        print(f"\nTable: {table_name}")
        print("Columns:")

        for column in table.columns:
            nullable = "NULL" if column.nullable else "NOT NULL"
            primary = "PRIMARY KEY" if column.primary_key else ""
            print(f" {column.name}: {column.type} {nullable} {primary}")

        # Print indexes if any
        if table.indexes:
            print("Indexes:")
            for index in table.indexes:
                unique = "UNIQUE" if index.unique else ""
                columns = ", ".join([col.name for col in index.columns])
                print(f" {index.name}: {unique} ({columns})")

    # Get table info using engine
    inspector = sa.inspect(engine)

    print("\nInspected table names:")
    table_names = inspector.get_table_names()
    for name in table_names:
        print(f" {name}")

        # Get column info
        columns = inspector.get_columns(name)
        for col in columns:
            print(f" {col['name']}: {col['type']} (nullable: {col['nullable']})")

783

784

inspect_ydb_schema()

785

```

786

787

### Custom YDB Types Usage

788

789

```python

790

from decimal import Decimal

791

from datetime import datetime

792

793

def demonstrate_custom_types():
    """Demonstrate YDB-specific SQLAlchemy types.

    Declares a model that uses the YDB types, inserts one record holding the
    maximum unsigned values, and queries it back. Any exception rolls the
    session back and is printed; the session is always closed.
    """

    # Define table with YDB-specific types
    class YdbTypeDemo(Base):
        __tablename__ = 'ydb_type_demo'

        id = sa.Column(ydb.sqlalchemy.UInt64, primary_key=True)
        small_int = sa.Column(ydb.sqlalchemy.UInt8)
        medium_int = sa.Column(ydb.sqlalchemy.UInt32)
        large_int = sa.Column(ydb.sqlalchemy.UInt64)
        precise_decimal = sa.Column(ydb.sqlalchemy.YdbDecimal(precision=22, scale=9))
        timestamp_field = sa.Column(ydb.sqlalchemy.YdbDateTime)
        json_data = sa.Column(ydb.sqlalchemy.YdbJson)

    # Create table (would need to be done outside SQLAlchemy for YDB)
    # Base.metadata.create_all(engine) # Not supported in YDB

    session = Session()

    try:
        # Insert with YDB types
        demo_record = YdbTypeDemo(
            id=1,
            small_int=255,  # Max UInt8
            medium_int=4294967295,  # Max UInt32
            large_int=18446744073709551615,  # Max UInt64
            precise_decimal=Decimal('123456789.123456789'),
            timestamp_field=datetime.now(),
            json_data={
                "nested": {"key": "value"},
                "array": [1, 2, 3],
                "boolean": True
            }
        )

        session.add(demo_record)
        session.commit()

        # Query with type-specific operations
        results = session.query(YdbTypeDemo).filter(
            YdbTypeDemo.small_int > 100,
            YdbTypeDemo.precise_decimal > Decimal('100')
        ).all()

        for record in results:
            print(f"ID: {record.id}")
            print(f"Small int: {record.small_int}")
            print(f"Precise decimal: {record.precise_decimal}")
            print(f"JSON data: {record.json_data}")

    except Exception as e:
        session.rollback()
        print(f"Error with custom types: {e}")

    finally:
        session.close()

850

851

demonstrate_custom_types()

852

```

853

854

### Connection Management

855

856

```python

857

def demonstrate_connection_management():
    """Demonstrate advanced connection management with SQLAlchemy.

    Covers building an engine, pool configuration, raw YQL with and without
    parameters, an explicit transaction block, and a custom session factory.
    """
    # Local import: this listing uses datetime without importing it.
    from datetime import datetime

    # Create engine with custom configuration
    engine = (
        ydb.sqlalchemy.YdbEngineBuilder()
        .with_endpoint("grpc://localhost:2136")
        .with_database("/local")
        .with_credentials(ydb.AnonymousCredentials())
        .with_pool_size(10)
        .build()
    )

    # Connection pooling with custom settings
    engine_with_pool = sa.create_engine(
        "yql://localhost:2136/local",
        pool_size=20,
        max_overflow=30,
        pool_timeout=30,
        pool_recycle=3600,  # Recycle connections every hour
        echo=True  # Log SQL queries
    )

    # Use connection context managers
    with engine.connect() as connection:
        # Execute raw YQL
        result = connection.execute(sa.text("""
SELECT COUNT(*) as user_count
FROM users
WHERE is_active = true
"""))

        count = result.scalar()
        print(f"Active users: {count}")

        # Execute with parameters
        result = connection.execute(
            sa.text("SELECT * FROM users WHERE age > :min_age"),
            {"min_age": 25}
        )

        for row in result:
            print(f"User: {row.name}, Age: {row.age}")

    # Transaction management
    with engine.begin() as connection:
        # All operations in this block are in a transaction
        connection.execute(
            sa.text("UPDATE users SET is_active = false WHERE age < :min_age"),
            {"min_age": 18}
        )

        connection.execute(
            sa.text("INSERT INTO audit_log (action, timestamp) VALUES (:action, :ts)"),
            {"action": "deactivated_minors", "ts": datetime.now()}
        )
        # Automatically commits on successful exit

    # Session with custom configuration.
    # The "autocommit" flag is omitted: SQLAlchemy 2.0 no longer accepts it,
    # and explicit commits are the only supported mode.
    SessionFactory = sessionmaker(
        bind=engine,
        expire_on_commit=False,
        autoflush=True,
    )

    session = SessionFactory()
    try:
        # Perform operations
        users = session.query(User).limit(10).all()
        print(f"Retrieved {len(users)} users")

    finally:
        session.close()

929

930

demonstrate_connection_management()

931

```

932

933

### Error Handling with SQLAlchemy

934

935

```python

936

def handle_sqlalchemy_errors():
    """Demonstrate error handling with SQLAlchemy and YDB.

    Attempts an insert and handles failures from most to least specific.
    Every handler rolls the session back; the session is always closed.
    """

    session = Session()

    try:
        # Operation that might fail
        user = User(
            id=1,  # Might conflict with existing ID
            name="Test User",
            email="test@example.com"
        )

        session.add(user)
        session.commit()

    except sa.exc.IntegrityError as e:
        session.rollback()
        print(f"Integrity error (likely duplicate key): {e}")

    except sa.exc.OperationalError as e:
        session.rollback()
        print(f"Operational error (YDB-specific): {e}")

        # Check if it's a retryable YDB error; ``orig`` holds the exception
        # raised by the underlying driver.
        original_error = e.orig
        if isinstance(original_error, ydb.RetryableError):
            print("This error can be retried")
        elif isinstance(original_error, ydb.BadRequestError):
            print("Bad request - fix the query")

    # Listed after its more specific subclasses above so they match first.
    except sa.exc.DatabaseError as e:
        session.rollback()
        print(f"Database error: {e}")

    except Exception as e:
        session.rollback()
        print(f"Unexpected error: {e}")

    finally:
        session.close()

977

978

# Retry logic with SQLAlchemy

979

def retry_sqlalchemy_operation(operation_func, max_retries=3):
    """Retry SQLAlchemy operations with YDB error handling.

    Each attempt opens a fresh session, calls ``operation_func(session)`` and
    commits. An ``OperationalError`` whose underlying error is a
    ``ydb.RetryableError`` is retried after an exponential backoff of
    ``2 ** attempt`` seconds; any other failure, or a retryable failure on
    the last attempt, is re-raised.

    Args:
        operation_func: Callable taking the session; its result is returned.
        max_retries: Maximum number of attempts.

    Returns:
        Whatever ``operation_func`` returned on the successful attempt.

    Raises:
        RuntimeError: If no attempt was made (``max_retries`` below 1).
    """
    # Local import: this listing calls time.sleep without importing time.
    import time

    for attempt in range(max_retries):
        session = Session()

        try:
            result = operation_func(session)
            session.commit()
            return result

        except sa.exc.OperationalError as e:
            session.rollback()

            # Check if the underlying YDB error is retryable
            retryable = isinstance(getattr(e, 'orig', None), ydb.RetryableError)
            if retryable and attempt < max_retries - 1:
                backoff_time = 2 ** attempt
                print(f"Retrying in {backoff_time}s (attempt {attempt + 1})")
                time.sleep(backoff_time)
                continue

            raise

        except Exception:
            session.rollback()
            raise

        finally:
            session.close()

    raise RuntimeError("Max retries exceeded")

1011

1012

# Use retry logic

1013

def create_user_operation(session):
    """Add a test user to ``session`` and return it; the caller commits."""
    user = User(
        id=9999,
        name="Retry Test User",
        email="retry@example.com"
    )
    session.add(user)
    return user


try:
    user = retry_sqlalchemy_operation(create_user_operation)
    print(f"Created user with retry: {user}")

except Exception as e:
    print(f"Failed even with retries: {e}")

1028

1029

handle_sqlalchemy_errors()

1030

```

1031

1032

### Performance Optimization

1033

1034

```python

1035

def optimize_sqlalchemy_performance():
    """Demonstrate performance optimization techniques.

    Defines helpers for bulk insert, efficient querying and pooled
    connections, then times a bulk insert of generated users and runs the
    query examples. ``optimize_connections`` is defined but not called here.
    """
    import time

    # Bulk operations (more efficient than individual inserts)
    def bulk_insert_users(user_data_list):
        """Efficiently insert multiple users."""

        session = Session()

        try:
            # Method 1: bulk_insert_mappings (fastest)
            session.bulk_insert_mappings(User, user_data_list)
            session.commit()

        except Exception as e:
            session.rollback()
            print(f"Bulk insert failed: {e}")

        finally:
            session.close()

    # Efficient querying with eager loading
    def efficient_user_queries():
        """Demonstrate efficient querying patterns."""

        session = Session()

        try:
            # Use specific columns instead of SELECT *
            user_summaries = session.query(
                User.id,
                User.name,
                User.email
            ).filter(User.is_active == True).all()  # noqa: E712

            # Use LIMIT to avoid large result sets
            recent_users = (
                session.query(User)
                .order_by(User.created_at.desc())
                .limit(100)
                .all()
            )

            # Use EXISTS for existence checks
            has_active_users = session.query(
                session.query(User)
                .filter(User.is_active == True)  # noqa: E712
                .exists()
            ).scalar()

            print(f"Has active users: {has_active_users}")

            # Batch processing for large datasets
            batch_size = 1000
            offset = 0

            while True:
                # A stable ORDER BY is required for OFFSET/LIMIT paging;
                # without it rows may be skipped or repeated between batches.
                batch = (
                    session.query(User)
                    .order_by(User.id)
                    .offset(offset)
                    .limit(batch_size)
                    .all()
                )

                if not batch:
                    break

                # Process batch
                for user in batch:
                    # Process individual user
                    pass

                offset += batch_size

        finally:
            session.close()

    # Connection optimization
    def optimize_connections():
        """Optimize connection usage."""

        # Use connection pooling
        optimized_engine = sa.create_engine(
            "yql://localhost:2136/local",
            pool_size=20,  # Number of connections to maintain
            max_overflow=50,  # Additional connections when needed
            pool_timeout=30,  # Timeout when getting connection
            pool_recycle=3600,  # Recycle connections after 1 hour
            pool_pre_ping=True,  # Verify connections before use
            echo_pool=True  # Log pool events
        )

        # Use scoped sessions for thread safety
        from sqlalchemy.orm import scoped_session

        ScopedSession = scoped_session(sessionmaker(bind=optimized_engine))

        # Session will be automatically managed per thread
        session = ScopedSession()

        try:
            users = session.query(User).limit(10).all()
            return users

        finally:
            ScopedSession.remove()  # Clean up thread-local session

    # Generate test data
    test_users = [
        {
            "id": i,
            "name": f"User {i}",
            "email": f"user{i}@example.com",
            "age": 20 + (i % 50),
            "is_active": i % 10 != 0
        }
        for i in range(1000, 2000)
    ]

    # Perform bulk insert
    print("Starting bulk insert...")
    start_time = time.time()

    bulk_insert_users(test_users)

    end_time = time.time()
    print(f"Bulk insert completed in {end_time - start_time:.2f} seconds")

    # Run efficient queries
    efficient_user_queries()

1162

1163

optimize_sqlalchemy_performance()

1164

```

1165

1166

## Type Definitions

1167

1168

```python { .api }

1169

# Type aliases for SQLAlchemy integration
YdbEngine = sa.Engine
YdbConnection = sa.Connection
YdbSession = sa.orm.Session
YdbMetadata = sa.MetaData

# ORM types
YdbModel = declarative_base()
YdbColumn = sa.Column
YdbTable = sa.Table

# Query types
YdbQuery = sa.orm.Query
YdbResult = sa.engine.Result
# SQLAlchemy 1.4+ replaced ResultProxy with CursorResult; the alias name is
# kept for backward compatibility.
YdbResultProxy = sa.engine.CursorResult

# Type mapping
SqlaType = sa.types.TypeEngine
YdbType = Union[UInt8, UInt32, UInt64, YdbDecimal, YdbDateTime, YdbJson]
TypeMapping = Dict[str, SqlaType]

# Connection string format
# yql://[host]:[port]/[database]?[parameters]
ConnectionString = str

1193

```