# SQLAlchemy Integration

A SQLAlchemy dialect for YDB that enables ORM usage, custom YDB types, connection management, and standard SQLAlchemy operations.
3
4
## Capabilities
5
6
### YDB SQLAlchemy Dialect

The YDB SQLAlchemy dialect lets you use YDB with both the SQLAlchemy ORM and SQLAlchemy Core.
9
10
```python { .api }
11
import ydb.sqlalchemy
12
13
class YqlDialect(DefaultDialect):
    """
    SQLAlchemy dialect for YDB (YQL).

    Provides SQLAlchemy integration for YDB database operations,
    including ORM support, query compilation, and type mapping.
    The class attributes are the capability flags this dialect
    declares to SQLAlchemy.
    """

    # Identification and general capabilities
    name = "yql"
    supports_alter = False
    max_identifier_length = 63
    supports_sane_rowcount = False
    supports_statement_cache = False

    # Native type support
    supports_native_enum = False
    supports_native_boolean = True
    supports_smallserial = False

    # Sequences / autoincrement
    supports_sequences = False
    sequences_optional = True
    preexecute_autoincrement_sequences = True
    postfetch_lastrowid = False

    # INSERT behaviour and parameter style
    supports_default_values = False
    supports_empty_insert = False
    supports_multivalues_insert = True
    default_paramstyle = "qmark"

    isolation_level = None

    @staticmethod
    def dbapi():
        """
        Get YDB DB-API module.

        Returns:
            module: YDB DB-API 2.0 module
        """
        # Imported lazily so the driver is only loaded when it is requested.
        import ydb.dbapi
        return ydb.dbapi

    def get_columns(
        self,
        connection,
        table_name: str,
        schema: str = None,
        **kwargs
    ) -> List[Dict]:
        """
        Get table column information for SQLAlchemy reflection.

        Args:
            connection: SQLAlchemy connection
            table_name (str): Table name
            schema (str, optional): Schema name (not supported)
            **kwargs: Additional arguments

        Returns:
            List[Dict]: Column information dictionaries
        """

    def has_table(
        self,
        connection,
        table_name: str,
        schema: str = None,
        **kwargs
    ) -> bool:
        """
        Check if table exists.

        Args:
            connection: SQLAlchemy connection
            table_name (str): Table name to check
            schema (str, optional): Schema name (not supported)
            **kwargs: Additional arguments; accepted (like ``get_columns``)
                so that reflection code which forwards extra keywords to
                the dialect does not fail with a TypeError

        Returns:
            bool: True if table exists
        """
91
92
def register_dialect(
    name: str = "yql",
    module: str = None,
    cls: str = "YqlDialect",
):
    """
    Register the YDB dialect with SQLAlchemy.

    Args:
        name (str): Name the dialect is registered under
        module (str, optional): Path of the module providing the dialect
        cls (str): Name of the dialect class inside that module
    """
105
```
106
107
### YDB-Specific SQLAlchemy Types
108
109
Custom SQLAlchemy types for YDB-specific data types.
110
111
```python { .api }
112
from sqlalchemy.types import Integer, TypeDecorator
113
114
class UInt8(Integer):
    """
    SQLAlchemy type for YDB UInt8.

    Maps to YDB's 8-bit unsigned integer type.
    """

    # Route type compilation to ``visit_uint8`` on the type compiler.
    # Without this override the class inherits Integer's visit name and
    # would be rendered as a plain integer type instead of ``UInt8``.
    __visit_name__ = "uint8"

    def __init__(self):
        """Create UInt8 type."""
        super().__init__()
124
125
class UInt32(Integer):
    """
    SQLAlchemy type for YDB UInt32.

    Maps to YDB's 32-bit unsigned integer type.
    """

    # Route type compilation to ``visit_uint32`` on the type compiler.
    # Without this override the class inherits Integer's visit name and
    # would be rendered as a plain integer type instead of ``UInt32``.
    __visit_name__ = "uint32"

    def __init__(self):
        """Create UInt32 type."""
        super().__init__()
135
136
class UInt64(Integer):
    """
    SQLAlchemy type for YDB UInt64.

    Maps to YDB's 64-bit unsigned integer type.
    """

    # Route type compilation to ``visit_uint64`` on the type compiler.
    # Without this override the class inherits Integer's visit name and
    # would be rendered as a plain integer type instead of ``UInt64``.
    __visit_name__ = "uint64"

    def __init__(self):
        """Create UInt64 type."""
        super().__init__()
146
147
class YdbDateTime(TypeDecorator):
    """
    Type decorator for datetime columns stored in YDB.

    Bridges Python ``datetime`` values and YDB datetime values by
    delegating to SQLAlchemy's ``DATETIME`` implementation.
    """

    # Underlying SQLAlchemy type this decorator wraps.
    impl = sa.DATETIME
    # Declares the type safe to take part in SQLAlchemy's cache keys.
    cache_ok = True
156
157
class YdbDecimal(TypeDecorator):
    """
    Type decorator for YDB decimal columns.

    Carries the precision and scale that a YDB decimal column requires.
    """

    # Underlying SQLAlchemy type this decorator wraps.
    impl = sa.DECIMAL
    # Declares the type safe to take part in SQLAlchemy's cache keys.
    cache_ok = True

    def __init__(self, precision: int = 22, scale: int = 9):
        """
        Build a decimal type with the given precision and scale.

        Args:
            precision (int): Decimal precision (max 35)
            scale (int): Decimal scale (max precision)
        """
        # Both values are forwarded to the wrapped DECIMAL implementation.
        super().__init__(scale=scale, precision=precision)
176
177
class YdbJson(TypeDecorator):
    """
    Type decorator for JSON columns stored in YDB.

    Intended for both the ``Json`` and ``JsonDocument`` YDB types.
    """

    # Underlying SQLAlchemy type this decorator wraps.
    impl = sa.JSON
    # Declares the type safe to take part in SQLAlchemy's cache keys.
    cache_ok = True
186
```
187
188
### Type Compiler
189
190
Custom type compiler for translating SQLAlchemy types to YQL types.
191
192
```python { .api }
193
class YqlTypeCompiler(GenericTypeCompiler):
    """
    Translate SQLAlchemy column types into YQL type names.

    Every ``visit_*`` hook takes the SQLAlchemy type instance plus
    arbitrary keyword arguments (both unused) and returns the YQL type
    name as a string.
    """

    # --- Generic SQL types -------------------------------------------------

    def visit_VARCHAR(self, type_, **kwargs) -> str:
        """Render ``VARCHAR`` as the YQL ``STRING`` type."""
        return "STRING"

    def visit_unicode(self, type_, **kwargs) -> str:
        """Render ``Unicode`` as the YQL ``UTF8`` type."""
        return "UTF8"

    def visit_TEXT(self, type_, **kwargs) -> str:
        """Render ``TEXT`` as the YQL ``UTF8`` type."""
        return "UTF8"

    def visit_BOOLEAN(self, type_, **kwargs) -> str:
        """Render ``BOOLEAN`` as the YQL ``BOOL`` type."""
        return "BOOL"

    def visit_FLOAT(self, type_, **kwargs) -> str:
        """Render ``FLOAT`` as the YQL ``DOUBLE`` type."""
        return "DOUBLE"

    # --- YDB unsigned integer types ----------------------------------------

    def visit_uint32(self, type_, **kwargs) -> str:
        """Render the YDB ``UInt32`` type."""
        return "UInt32"

    def visit_uint64(self, type_, **kwargs) -> str:
        """Render the YDB ``UInt64`` type."""
        return "UInt64"

    def visit_uint8(self, type_, **kwargs) -> str:
        """Render the YDB ``UInt8`` type."""
        return "UInt8"
301
```
302
303
### Query Compiler
304
305
Custom SQL compiler for generating YQL from SQLAlchemy expressions.
306
307
```python { .api }
308
class YqlCompiler(SQLCompiler):
    """
    SQL compiler for generating YQL queries from SQLAlchemy expressions.
    """

    def group_by_clause(self, select, **kwargs):
        """
        Generate GROUP BY clause for YQL.

        Args:
            select: SQLAlchemy select object
            **kwargs: Additional arguments

        Returns:
            str: YQL GROUP BY clause
        """
        # Compile the GROUP BY expressions as if they were inside the
        # columns clause, then defer to the base implementation.
        kwargs.update(within_columns_clause=True)
        return super().group_by_clause(select, **kwargs)

    def visit_function(self, func, add_to_result_map=None, **kwargs):
        """
        Visit function expressions and convert to YQL syntax.

        Args:
            func: SQLAlchemy function expression
            add_to_result_map: Result map callback (accepted for interface
                compatibility; not used by this implementation)
            **kwargs: Additional arguments

        Returns:
            str: YQL function call
        """
        # A dedicated ``visit_<name>_func`` hook, when present, wins.
        disp = getattr(self, f"visit_{func.name.lower()}_func", None)
        if disp:
            return disp(func, **kwargs)

        # YQL separates namespaces with ``::`` instead of SQL's ``.``.
        # ``packagenames`` may be a tuple, so unpack it into a new list
        # rather than concatenating it with a list (tuple + list raises
        # TypeError).
        packagenames = getattr(func, "packagenames", None) or ()
        name = "::".join([*packagenames, func.name])

        return f"{name}{self.function_argspec(func, **kwargs)}"

    def visit_lambda(self, lambda_, **kwargs):
        """
        Visit lambda expressions for YQL.

        Args:
            lambda_: Lambda expression
            **kwargs: Additional arguments

        Returns:
            str: YQL lambda expression
        """
        func = lambda_.func
        spec = inspect_getfullargspec(func)

        # Build YQL lambda syntax: ($arg1, $arg2) -> { RETURN expression; }
        args_str = "(" + ", ".join(f"${arg}" for arg in spec.args) + ")"

        # Call the Python callable with ``$name`` literal columns so that
        # its result is an expression over the lambda's parameters.
        args = [literal_column(f"${arg}") for arg in spec.args]
        body_expr = func(*args)
        body_str = self.process(body_expr, **kwargs)

        return f"{args_str} -> {{ RETURN {body_str}; }}"
374
375
class ParametrizedFunction(functions.Function):
    """
    SQLAlchemy function that also carries YQL-style parameters.

    Used for YQL functions that take type parameters in addition to
    their ordinary arguments.
    """

    __visit_name__ = "parametrized_function"

    def __init__(self, name: str, params: List, *args, **kwargs):
        """
        Build a parametrized function.

        Args:
            name (str): Function name
            params (List): Type parameters
            *args: Function arguments
            **kwargs: Additional arguments
        """
        super().__init__(name, *args, **kwargs)
        self._func_name = name
        self._func_params = params

        # Comma-separated, grouped clause list holding the type parameters.
        type_params = ClauseList(
            *params,
            operator=functions.operators.comma_op,
            group_contents=True,
        )
        self.params_expr = type_params.self_group()
402
```
403
404
### Identifier Preparer
405
406
Custom identifier handling for YQL naming conventions.
407
408
```python { .api }
409
class YqlIdentifierPreparer(IdentifierPreparer):
    """
    Identifier preparer implementing YQL naming conventions.

    Identifiers are wrapped in backticks unless they already are.
    """

    def __init__(self, dialect):
        """
        Build the preparer with backticks as the quote characters.

        Args:
            dialect: YQL dialect instance
        """
        super().__init__(
            dialect,
            initial_quote="`",
            final_quote="`",
        )

    def _requires_quotes(self, value: str) -> bool:
        """
        Tell whether an identifier still has to be quoted.

        Args:
            value (str): Identifier value

        Returns:
            bool: False only when the value already starts with the
                opening quote and ends with the closing quote
        """
        already_quoted = value.startswith(self.initial_quote) and value.endswith(
            self.final_quote
        )
        return not already_quoted

    def quote_identifier(self, value: str) -> str:
        """
        Wrap an identifier in YQL quotes.

        Args:
            value (str): Identifier to quote

        Returns:
            str: The value unchanged if already quoted, otherwise the
                value surrounded by the quote characters
        """
        if not self._requires_quotes(value):
            return value
        return f"{self.initial_quote}{value}{self.final_quote}"
458
```
459
460
### Connection and Engine Creation
461
462
Utilities for creating SQLAlchemy engines and connections to YDB.
463
464
```python { .api }
465
def create_ydb_engine(
    endpoint: str,
    database: str,
    credentials: ydb.Credentials = None,
    **engine_kwargs
) -> sa.Engine:
    """
    Create SQLAlchemy engine for YDB.

    Args:
        endpoint (str): YDB endpoint URL, with or without a
            ``grpc://`` / ``grpcs://`` scheme
        database (str): Database path; a leading ``/`` is added if missing
        credentials (ydb.Credentials, optional): Authentication credentials,
            forwarded to the DB-API ``connect()`` call via ``connect_args``
        **engine_kwargs: Additional engine arguments

    Returns:
        sa.Engine: SQLAlchemy engine configured for YDB
    """
    # Register YDB dialect
    register_dialect()

    # The SQLAlchemy URL carries its own ``yql://`` scheme, so drop the
    # transport scheme from the endpoint.
    # NOTE(review): removing ``grpcs://`` discards the TLS hint - confirm
    # how the DB-API driver is told to use a secure channel.
    host = endpoint
    for scheme in ("grpcs://", "grpc://"):
        if host.startswith(scheme):
            host = host[len(scheme):]
            break
    host = host.rstrip("/")

    # Keep host and database separated by exactly one slash.
    path = database if database.startswith("/") else f"/{database}"
    connection_string = f"yql://{host}{path}"

    # create_engine() rejects keyword arguments it does not recognise, so
    # driver-level options such as credentials must travel in connect_args.
    # NOTE(review): assumes the YDB DB-API connect() accepts ``credentials``.
    if credentials is not None:
        connect_args = dict(engine_kwargs.pop("connect_args", None) or {})
        connect_args.setdefault("credentials", credentials)
        engine_kwargs["connect_args"] = connect_args

    return sa.create_engine(connection_string, **engine_kwargs)
497
498
def get_ydb_metadata(engine: sa.Engine) -> sa.MetaData:
    """
    Reflect the tables reachable through ``engine`` into fresh metadata.

    Args:
        engine (sa.Engine): YDB SQLAlchemy engine

    Returns:
        sa.MetaData: Metadata with reflected tables
    """
    reflected = sa.MetaData()
    reflected.reflect(bind=engine)
    return reflected
511
512
class YdbEngineBuilder:
    """
    Fluent builder that collects settings and creates a YDB engine.

    Every ``with_*`` method stores one setting and returns the builder
    itself so calls can be chained; ``build()`` creates the engine.
    """

    def __init__(self):
        """Start with no endpoint, database or credentials, and no options."""
        self.endpoint = None
        self.database = None
        self.credentials = None
        self.engine_options = {}

    def with_endpoint(self, endpoint: str) -> 'YdbEngineBuilder':
        """
        Store the YDB endpoint.

        Args:
            endpoint (str): YDB endpoint URL

        Returns:
            YdbEngineBuilder: Self for chaining
        """
        self.endpoint = endpoint
        return self

    def with_database(self, database: str) -> 'YdbEngineBuilder':
        """
        Store the database path.

        Args:
            database (str): Database path

        Returns:
            YdbEngineBuilder: Self for chaining
        """
        self.database = database
        return self

    def with_credentials(self, credentials: ydb.Credentials) -> 'YdbEngineBuilder':
        """
        Store the authentication credentials.

        Args:
            credentials (ydb.Credentials): YDB credentials

        Returns:
            YdbEngineBuilder: Self for chaining
        """
        self.credentials = credentials
        return self

    def with_pool_size(self, size: int) -> 'YdbEngineBuilder':
        """
        Store the connection pool size as the ``pool_size`` engine option.

        Args:
            size (int): Pool size

        Returns:
            YdbEngineBuilder: Self for chaining
        """
        self.engine_options['pool_size'] = size
        return self

    def build(self) -> sa.Engine:
        """
        Create the engine from the collected settings.

        Returns:
            sa.Engine: Configured YDB engine

        Raises:
            ValueError: If the endpoint or the database has not been set
        """
        if not (self.endpoint and self.database):
            raise ValueError("Endpoint and database are required")

        options = self.engine_options
        return create_ydb_engine(
            self.endpoint,
            self.database,
            self.credentials,
            **options
        )
592
```
593
594
## Usage Examples
595
596
### Basic SQLAlchemy Setup
597
598
```python
599
import sqlalchemy as sa
600
import ydb.sqlalchemy
601
from sqlalchemy.ext.declarative import declarative_base
602
from sqlalchemy.orm import sessionmaker
603
604
# Make the "yql" dialect known to SQLAlchemy before any engine is created
ydb.sqlalchemy.register_dialect()

# Engine pointing at a local YDB instance
engine = sa.create_engine("yql://localhost:2136/local")

# Declarative base for ORM models, plus a standalone metadata object
Base = declarative_base()
metadata = sa.MetaData()


class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = 'users'

    # Key and identity columns
    id = sa.Column(ydb.sqlalchemy.UInt64, primary_key=True)
    name = sa.Column(sa.Unicode(255), nullable=False)
    email = sa.Column(sa.Unicode(255), unique=True)

    # Profile columns
    age = sa.Column(ydb.sqlalchemy.UInt32)
    created_at = sa.Column(sa.DateTime, default=sa.func.CurrentUtcDatetime())
    is_active = sa.Column(sa.Boolean, default=True)
    metadata_json = sa.Column(sa.JSON)

    def __repr__(self):
        """Return a short description with id, name and email."""
        return f"<User(id={self.id}, name='{self.name}', email='{self.email}')>"


# Factory producing sessions bound to the engine above
Session = sessionmaker(bind=engine)
631
```
632
633
### ORM Operations
634
635
```python
636
def demonstrate_orm_operations():
    """Demonstrate basic ORM operations with YDB.

    Creates a user, queries and updates it, runs an aggregate and filters
    on a JSON field. Any failure rolls the session back and is printed
    rather than re-raised; the session is always closed.
    """
    session = Session()

    try:
        # Create new user
        new_user = User(
            id=1001,
            name="Alice Johnson",
            email="alice@example.com",
            age=25,
            metadata_json={"department": "engineering", "level": "senior"},
        )

        session.add(new_user)
        session.commit()

        # Query users
        users = session.query(User).filter(User.age > 21).all()
        print(f"Found {len(users)} users over 21")

        # Update user
        user = session.query(User).filter(User.email == "alice@example.com").first()
        if user:
            user.age = 26
            user.metadata_json = {"department": "engineering", "level": "lead"}
            session.commit()
            print(f"Updated user: {user}")

        # Aggregate query
        avg_age = session.query(sa.func.avg(User.age)).scalar()
        print(f"Average age: {avg_age}")

        # Query combining a boolean filter, a JSON field filter and ordering
        active_engineers = (
            session.query(User)
            .filter(
                # ``== True`` is intentional: it builds a SQL expression.
                User.is_active == True,  # noqa: E712
                # The generic sa.JSON type exposes as_string(); ``.astext``
                # exists only on the PostgreSQL-specific JSON type.
                User.metadata_json['department'].as_string() == 'engineering',
            )
            .order_by(User.created_at.desc())
            .all()
        )

        print(f"Active engineers: {len(active_engineers)}")

    except Exception as e:
        # Demo-level handling: undo pending work and report the problem.
        session.rollback()
        print(f"Error in ORM operations: {e}")

    finally:
        session.close()
684
685
demonstrate_orm_operations()
686
```
687
688
### Advanced YQL Features
689
690
```python
691
def demonstrate_yql_features():
    """Demonstrate YQL-specific features through SQLAlchemy.

    Runs a query using YQL functions, a window-function query and a JSON
    query. Any failure rolls the session back and is printed rather than
    re-raised; the session is always closed.
    """
    session = Session()

    try:
        # Use YQL functions through SQLAlchemy.
        # ``sa.func.String.SplitToList`` carries ``String`` as a package
        # name, which the YQL compiler joins with ``::`` to produce
        # ``String::SplitToList``; an underscore name such as
        # ``String_SplitToList`` would be emitted verbatim.
        query = session.query(
            User.name,
            sa.func.ListLength(
                sa.func.String.SplitToList(User.name, sa.literal(" "))
            ).label("name_parts_count"),
        ).filter(User.is_active == True)  # noqa: E712 - SQL expression

        for user_name, parts_count in query:
            print(f"User: {user_name}, Name parts: {parts_count}")

        # YQL lambda expressions (requires custom compilation)
        # This would need additional dialect customization

        # Window functions
        ranked_users = session.query(
            User.name,
            User.age,
            sa.func.row_number().over(
                order_by=User.age.desc()
            ).label("age_rank"),
        ).all()

        for name, age, rank in ranked_users:
            print(f"Rank {rank}: {name} (age {age})")

        # JSON operations
        engineering_users = session.query(User).filter(
            sa.func.JsonValue(
                User.metadata_json,
                'strict $.department'
            ) == 'engineering'
        ).all()

        print(f"Engineering users: {len(engineering_users)}")

    except Exception as e:
        # Demo-level handling: undo pending work and report the problem.
        session.rollback()
        print(f"Error in YQL operations: {e}")

    finally:
        session.close()
739
740
demonstrate_yql_features()
741
```
742
743
### Table Reflection and Inspection
744
745
```python
746
def inspect_ydb_schema():
    """Print the YDB schema as seen through SQLAlchemy.

    First reflects all tables into a ``MetaData`` object and prints their
    columns and indexes, then lists table names and columns through an
    inspector.
    """
    # Part 1: reflection via MetaData
    reflected = sa.MetaData()
    reflected.reflect(bind=engine)

    print("Reflected tables:")
    for table_name, table in reflected.tables.items():
        print(f"\nTable: {table_name}")
        print("Columns:")

        for column in table.columns:
            if column.nullable:
                nullable = "NULL"
            else:
                nullable = "NOT NULL"
            if column.primary_key:
                primary = "PRIMARY KEY"
            else:
                primary = ""
            print(f" {column.name}: {column.type} {nullable} {primary}")

        # Indexes are printed only when the table has some
        if not table.indexes:
            continue
        print("Indexes:")
        for index in table.indexes:
            unique = "UNIQUE" if index.unique else ""
            columns = ", ".join(col.name for col in index.columns)
            print(f" {index.name}: {unique} ({columns})")

    # Part 2: the same information through the inspector API
    inspector = sa.inspect(engine)

    print("\nInspected table names:")
    for name in inspector.get_table_names():
        print(f" {name}")

        # Column details for this table
        for col in inspector.get_columns(name):
            print(f" {col['name']}: {col['type']} (nullable: {col['nullable']})")
783
784
inspect_ydb_schema()
785
```
786
787
### Custom YDB Types Usage
788
789
```python
790
from decimal import Decimal
791
from datetime import datetime
792
793
def demonstrate_custom_types():
    """Show the YDB-specific SQLAlchemy types in a model, an insert and a query."""

    class YdbTypeDemo(Base):
        """Model with one column per YDB-specific type."""

        __tablename__ = 'ydb_type_demo'

        id = sa.Column(ydb.sqlalchemy.UInt64, primary_key=True)
        small_int = sa.Column(ydb.sqlalchemy.UInt8)
        medium_int = sa.Column(ydb.sqlalchemy.UInt32)
        large_int = sa.Column(ydb.sqlalchemy.UInt64)
        precise_decimal = sa.Column(ydb.sqlalchemy.YdbDecimal(precision=22, scale=9))
        timestamp_field = sa.Column(ydb.sqlalchemy.YdbDateTime)
        json_data = sa.Column(ydb.sqlalchemy.YdbJson)

    # Create table (would need to be done outside SQLAlchemy for YDB)
    # Base.metadata.create_all(engine) # Not supported in YDB

    session = Session()

    try:
        # Row exercising the upper bound of every unsigned type
        payload = {
            "nested": {"key": "value"},
            "array": [1, 2, 3],
            "boolean": True,
        }
        demo_record = YdbTypeDemo(
            id=1,
            small_int=255,  # Max UInt8
            medium_int=4294967295,  # Max UInt32
            large_int=18446744073709551615,  # Max UInt64
            precise_decimal=Decimal('123456789.123456789'),
            timestamp_field=datetime.now(),
            json_data=payload,
        )

        session.add(demo_record)
        session.commit()

        # Filter on the integer and decimal columns
        matching = session.query(YdbTypeDemo).filter(
            YdbTypeDemo.small_int > 100,
            YdbTypeDemo.precise_decimal > Decimal('100'),
        )
        results = matching.all()

        for record in results:
            print(f"ID: {record.id}")
            print(f"Small int: {record.small_int}")
            print(f"Precise decimal: {record.precise_decimal}")
            print(f"JSON data: {record.json_data}")

    except Exception as e:
        # Undo pending work and report the problem instead of re-raising
        session.rollback()
        print(f"Error with custom types: {e}")

    finally:
        session.close()
850
851
demonstrate_custom_types()
852
```
853
854
### Connection Management
855
856
```python
857
def demonstrate_connection_management():
    """Demonstrate advanced connection management with SQLAlchemy.

    Covers building an engine with the builder, pool configuration,
    connection and transaction context managers, and a customised
    session factory.
    """
    # Local import so this example does not depend on another snippet
    # having imported ``datetime``.
    from datetime import datetime

    # Create engine with custom configuration
    engine = ydb.sqlalchemy.YdbEngineBuilder()\
        .with_endpoint("grpc://localhost:2136")\
        .with_database("/local")\
        .with_credentials(ydb.AnonymousCredentials())\
        .with_pool_size(10)\
        .build()

    # Connection pooling with custom settings (shown for its configuration;
    # the rest of the example uses ``engine`` above)
    engine_with_pool = sa.create_engine(
        "yql://localhost:2136/local",
        pool_size=20,
        max_overflow=30,
        pool_timeout=30,
        pool_recycle=3600,  # Recycle connections every hour
        echo=True,  # Log SQL queries
    )

    # Use connection context managers
    with engine.connect() as connection:
        # Execute raw YQL
        result = connection.execute(sa.text("""
            SELECT COUNT(*) as user_count
            FROM users
            WHERE is_active = true
        """))

        count = result.scalar()
        print(f"Active users: {count}")

        # Execute with parameters
        result = connection.execute(
            sa.text("SELECT * FROM users WHERE age > :min_age"),
            {"min_age": 25}
        )

        for row in result:
            print(f"User: {row.name}, Age: {row.age}")

    # Transaction management
    with engine.begin() as connection:
        # All operations in this block are in a transaction
        connection.execute(
            sa.text("UPDATE users SET is_active = false WHERE age < :min_age"),
            {"min_age": 18}
        )

        connection.execute(
            sa.text("INSERT INTO audit_log (action, timestamp) VALUES (:action, :ts)"),
            {"action": "deactivated_minors", "ts": datetime.now()}
        )
        # Automatically commits on successful exit

    # Session with custom configuration.
    # ``autocommit`` is deliberately not passed: False was the default
    # where the option existed, and SQLAlchemy 2.0 removed the parameter,
    # so passing it makes session creation fail.
    SessionFactory = sessionmaker(
        bind=engine,
        expire_on_commit=False,
        autoflush=True,
    )

    session = SessionFactory()
    try:
        # Perform operations
        users = session.query(User).limit(10).all()
        print(f"Retrieved {len(users)} users")

    finally:
        session.close()
929
930
demonstrate_connection_management()
931
```
932
933
### Error Handling with SQLAlchemy
934
935
```python
936
def handle_sqlalchemy_errors():
    """Demonstrate error handling with SQLAlchemy and YDB.

    First shows how to tell SQLAlchemy exception classes apart, then
    defines and uses a retry helper with exponential backoff.
    """
    # Local import: the retry helper sleeps between attempts and this
    # example must not depend on another snippet having imported ``time``.
    import time

    session = Session()

    try:
        # Operation that might fail
        user = User(
            id=1,  # Might conflict with existing ID
            name="Test User",
            email="test@example.com"
        )

        session.add(user)
        session.commit()

    # Handlers go from most to least specific: IntegrityError and
    # OperationalError are both DatabaseError subclasses.
    except sa.exc.IntegrityError as e:
        session.rollback()
        print(f"Integrity error (likely duplicate key): {e}")

    except sa.exc.OperationalError as e:
        session.rollback()
        print(f"Operational error (YDB-specific): {e}")

        # Check if it's a retryable YDB error
        original_error = e.orig
        if isinstance(original_error, ydb.RetryableError):
            print("This error can be retried")
        elif isinstance(original_error, ydb.BadRequestError):
            print("Bad request - fix the query")

    except sa.exc.DatabaseError as e:
        session.rollback()
        print(f"Database error: {e}")

    except Exception as e:
        session.rollback()
        print(f"Unexpected error: {e}")

    finally:
        session.close()

    # Retry logic with SQLAlchemy
    def retry_sqlalchemy_operation(operation_func, max_retries=3):
        """Retry SQLAlchemy operations with YDB error handling.

        Calls ``operation_func(session)`` with a fresh session per attempt
        and commits on success. An OperationalError whose underlying error
        is a ``ydb.RetryableError`` is retried after 1s, 2s, 4s, ... until
        ``max_retries`` attempts are used; everything else is re-raised
        after a rollback.

        Raises:
            RuntimeError: If no attempt was made (``max_retries`` < 1)
        """
        for attempt in range(max_retries):
            session = Session()

            try:
                result = operation_func(session)
                session.commit()
                return result

            except sa.exc.OperationalError as e:
                session.rollback()

                # Check if the underlying YDB error is retryable
                retryable = isinstance(getattr(e, "orig", None), ydb.RetryableError)
                if retryable and attempt < max_retries - 1:
                    backoff_time = 2 ** attempt
                    print(f"Retrying in {backoff_time}s (attempt {attempt + 1})")
                    time.sleep(backoff_time)
                    continue

                raise

            except Exception:
                session.rollback()
                raise

            finally:
                session.close()

        raise RuntimeError("Max retries exceeded")

    # Use retry logic
    def create_user_operation(session):
        """Add a user and return its printable description."""
        user = User(
            id=9999,
            name="Retry Test User",
            email="retry@example.com"
        )
        session.add(user)
        # Return plain text instead of the ORM object: after commit and
        # close the instance is expired and detached, so formatting it
        # later would fail when it tries to reload its attributes.
        return repr(user)

    try:
        user = retry_sqlalchemy_operation(create_user_operation)
        print(f"Created user with retry: {user}")

    except Exception as e:
        print(f"Failed even with retries: {e}")
1028
1029
handle_sqlalchemy_errors()
1030
```
1031
1032
### Performance Optimization
1033
1034
```python
1035
def optimize_sqlalchemy_performance():
    """Demonstrate performance optimization techniques.

    Defines helpers for bulk inserts, efficient queries and connection
    pooling, then times a bulk insert of generated users and runs the
    query examples.
    """
    # Imported up front (not in the middle of the function) for timing.
    import time

    # Bulk operations (more efficient than individual inserts)
    def bulk_insert_users(user_data_list):
        """Efficiently insert multiple users."""

        session = Session()

        try:
            # Method 1: bulk_insert_mappings (fastest)
            session.bulk_insert_mappings(User, user_data_list)
            session.commit()

        except Exception as e:
            # Demo-level handling: undo pending work and report the problem.
            session.rollback()
            print(f"Bulk insert failed: {e}")

        finally:
            session.close()

    # Efficient querying patterns
    def efficient_user_queries():
        """Demonstrate efficient querying patterns."""

        session = Session()

        try:
            # Use specific columns instead of SELECT *
            user_summaries = session.query(
                User.id,
                User.name,
                User.email
            ).filter(User.is_active == True).all()  # noqa: E712 - SQL expression

            # Use LIMIT to avoid large result sets
            recent_users = session.query(User)\
                .order_by(User.created_at.desc())\
                .limit(100)\
                .all()

            # Use EXISTS for existence checks
            has_active_users = session.query(
                session.query(User)
                .filter(User.is_active == True)  # noqa: E712 - SQL expression
                .exists()
            ).scalar()

            print(f"Has active users: {has_active_users}")

            # Batch processing for large datasets
            batch_size = 1000
            offset = 0

            while True:
                # OFFSET/LIMIT paging needs a deterministic order; without
                # ORDER BY rows may be repeated or skipped between batches.
                batch = session.query(User)\
                    .order_by(User.id)\
                    .offset(offset)\
                    .limit(batch_size)\
                    .all()

                if not batch:
                    break

                # Process batch
                for user in batch:
                    # Process individual user
                    pass

                offset += batch_size

        finally:
            session.close()

    # Connection optimization
    def optimize_connections():
        """Optimize connection usage."""

        # Use connection pooling
        optimized_engine = sa.create_engine(
            "yql://localhost:2136/local",
            pool_size=20,  # Number of connections to maintain
            max_overflow=50,  # Additional connections when needed
            pool_timeout=30,  # Timeout when getting connection
            pool_recycle=3600,  # Recycle connections after 1 hour
            pool_pre_ping=True,  # Verify connections before use
            echo_pool=True,  # Log pool events
        )

        # Use scoped sessions for thread safety
        from sqlalchemy.orm import scoped_session

        ScopedSession = scoped_session(sessionmaker(bind=optimized_engine))

        # Session will be automatically managed per thread
        session = ScopedSession()

        try:
            users = session.query(User).limit(10).all()
            return users

        finally:
            ScopedSession.remove()  # Clean up thread-local session

    # Generate test data
    test_users = [
        {
            "id": i,
            "name": f"User {i}",
            "email": f"user{i}@example.com",
            "age": 20 + (i % 50),
            "is_active": i % 10 != 0,
        }
        for i in range(1000, 2000)
    ]

    # Perform bulk insert
    print("Starting bulk insert...")
    start_time = time.time()

    bulk_insert_users(test_users)

    end_time = time.time()
    print(f"Bulk insert completed in {end_time - start_time:.2f} seconds")

    # Run efficient queries
    efficient_user_queries()
1162
1163
optimize_sqlalchemy_performance()
1164
```
1165
1166
## Type Definitions
1167
1168
```python { .api }
1169
# Type aliases for SQLAlchemy integration
YdbEngine = sa.Engine
YdbConnection = sa.Connection
YdbSession = sa.orm.Session
YdbMetadata = sa.MetaData

# ORM types
YdbModel = declarative_base()
YdbColumn = sa.Column
YdbTable = sa.Table

# Query types
YdbQuery = sa.orm.Query
YdbResult = sa.engine.Result
# ``CursorResult`` is the current name of the cursor-backed result class.
# The legacy ``ResultProxy`` name no longer exists in SQLAlchemy 2.0, the
# version that provides the top-level ``sa.Engine`` / ``sa.Connection``
# names used above.
YdbResultProxy = sa.engine.CursorResult

# Type mapping
SqlaType = sa.types.TypeEngine
YdbType = Union[UInt8, UInt32, UInt64, YdbDecimal, YdbDateTime, YdbJson]
TypeMapping = Dict[str, SqlaType]

# Connection string format
# yql://[host]:[port]/[database]?[parameters]
ConnectionString = str
1193
```