# DB-API 2.0 Interface

A standard Python DB-API 2.0 (PEP 249) compliant interface to YDB that provides familiar database connectivity patterns.

## Capabilities

### DB-API Module Properties

The `ydb.dbapi` module exposes the module-level attributes, the exception classes, and the `connect()` entry point defined by the DB-API specification.

```python { .api }
import ydb.dbapi

# DB-API module information (module globals defined by PEP 249)
version: str = "0.0.31"                          # version string of the DB-API module
version_info: Tuple[int, int, int] = (1, 0, 0)   # version as a tuple
# NOTE(review): version_info (1, 0, 0) does not match version "0.0.31" - confirm
# against the installed module.
apilevel: str = "1.0"                            # DB-API level reported by the module
# NOTE(review): the module reports "1.0" although this page describes a
# DB-API 2.0 interface - confirm against the installed module.
threadsafety: int = 0                            # 0 = threads may not share the module
paramstyle: str = "qmark"                        # question-mark placeholders: WHERE id = ?

# Exception hierarchy: every exception class exported by the module
errors: Tuple[Type[Exception], ...] = (
    Warning,
    Error,
    InterfaceError,
    DatabaseError,
    DataError,
    OperationalError,
    IntegrityError,
    InternalError,
    ProgrammingError,
    NotSupportedError,
)

def connect(*args, **kwargs) -> Connection:
    """
    Create new database connection.

    Args:
        *args: Connection arguments (endpoint, database, etc.)
        **kwargs: Connection keyword arguments

    Returns:
        Connection: DB-API connection object
    """
```

### Connection Interface

A database connection that implements the DB-API 2.0 connection interface, plus two YDB-specific extensions (`describe` and `execute_scheme_query`).

```python { .api }
class Connection:
    """DB-API connection to a YDB database."""

    def __init__(
        self,
        endpoint: Optional[str] = None,
        database: Optional[str] = None,
        credentials: Optional[ydb.Credentials] = None,
        driver_config: Optional[ydb.DriverConfig] = None,
        **kwargs
    ):
        """
        Create YDB database connection.

        Args:
            endpoint (str, optional): YDB endpoint URL
            database (str, optional): Database path
            credentials (ydb.Credentials, optional): Authentication credentials
            driver_config (ydb.DriverConfig, optional): Driver configuration
            **kwargs: Additional connection parameters
        """

    def cursor(self) -> Cursor:
        """
        Create new cursor for executing statements.

        Returns:
            Cursor: DB-API cursor object
        """

    def commit(self):
        """
        Commit current transaction.

        Note: YDB handles transactions at the session level.
        This method ensures consistency with DB-API.
        """

    def rollback(self):
        """
        Rollback current transaction.

        Note: YDB handles transactions at the session level.
        This method ensures consistency with DB-API.
        """

    def close(self):
        """
        Close database connection and release resources.
        """

    def __enter__(self) -> 'Connection':
        """
        Enter connection context manager.

        Returns:
            Connection: Connection instance
        """

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Exit connection context manager and close connection.
        """

    @property
    def closed(self) -> bool:
        """
        Check if connection is closed.

        Returns:
            bool: True if connection is closed
        """

    def describe(self, table_path: str) -> List[TableColumn]:
        """
        Get table description (YDB-specific extension).

        Args:
            table_path (str): Path to table

        Returns:
            List[TableColumn]: Table column descriptions
        """

    def execute_scheme_query(self, query: str):
        """
        Execute schema query (YDB-specific extension).

        Args:
            query (str): DDL query to execute
        """
```

### Cursor Interface

A cursor executes SQL statements on a connection and fetches their results.

```python { .api }
class Cursor:
    """DB-API cursor used to execute statements and fetch result rows."""

    def __init__(self, connection: Connection):
        """
        Create cursor for connection.

        Args:
            connection (Connection): Parent connection
        """

    def execute(
        self,
        query: str,
        parameters: Optional[Union[Tuple, Dict]] = None
    ):
        """
        Execute SQL query with optional parameters.

        Args:
            query (str): SQL query to execute
            parameters (Optional[Union[Tuple, Dict]]): Query parameters
        """

    def executemany(
        self,
        query: str,
        seq_of_parameters: Sequence[Union[Tuple, Dict]]
    ):
        """
        Execute query multiple times with different parameter sets.

        Args:
            query (str): SQL query to execute
            seq_of_parameters (Sequence): Sequence of parameter sets
        """

    def fetchone(self) -> Optional[Tuple]:
        """
        Fetch next row from query result.

        Returns:
            Optional[Tuple]: Next row as tuple or None if no more rows
        """

    def fetchmany(self, size: Optional[int] = None) -> List[Tuple]:
        """
        Fetch multiple rows from query result.

        Args:
            size (Optional[int]): Number of rows to fetch (default: arraysize)

        Returns:
            List[Tuple]: List of rows as tuples
        """

    def fetchall(self) -> List[Tuple]:
        """
        Fetch all remaining rows from query result.

        Returns:
            List[Tuple]: All remaining rows as tuples
        """

    def close(self):
        """
        Close cursor and release resources.
        """

    def __enter__(self) -> 'Cursor':
        """Enter cursor context manager."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit cursor context manager and close cursor."""

    def __iter__(self) -> Iterator[Tuple]:
        """
        Iterate over query results.

        Returns:
            Iterator[Tuple]: Row iterator
        """

    def __next__(self) -> Tuple:
        """
        Get next row from iteration.

        Returns:
            Tuple: Next row

        Raises:
            StopIteration: When no more rows available
        """

    @property
    def description(self) -> Optional[List[Tuple]]:
        """
        Get result set description.

        Returns:
            Optional[List[Tuple]]: Column descriptions as tuples
                (name, type_code, display_size, internal_size,
                precision, scale, null_ok)
        """

    @property
    def rowcount(self) -> int:
        """
        Number of rows affected by last operation.

        Returns:
            int: Row count (-1 if not available)
        """

    @property
    def arraysize(self) -> int:
        """
        Default number of rows to fetch with fetchmany().

        Returns:
            int: Array size for fetchmany()
        """

    @arraysize.setter
    def arraysize(self, size: int):
        """
        Set array size for fetchmany().

        Args:
            size (int): New array size
        """

    def setinputsizes(self, sizes: Sequence[Optional[int]]):
        """
        Set input parameter sizes (no-op for YDB).

        Args:
            sizes (Sequence[Optional[int]]): Parameter sizes
        """

    def setoutputsize(self, size: int, column: Optional[int] = None):
        """
        Set output column size (no-op for YDB).

        Args:
            size (int): Output size
            column (Optional[int]): Column index
        """

    def callproc(self, procname: str, parameters: Optional[Tuple] = None) -> Tuple:
        """
        Call stored procedure (not supported in YDB).

        Args:
            procname (str): Procedure name
            parameters (Tuple, optional): Procedure parameters

        Returns:
            Tuple: Procedure results

        Raises:
            NotSupportedError: Always raised as YDB doesn't support stored procedures
        """
```

### DB-API Exception Hierarchy

The standard DB-API exception hierarchy used for error handling.

```python { .api }
class Warning(Exception):
    """
    Exception for important warnings.

    The name intentionally shadows the built-in ``Warning``: PEP 249
    prescribes it, and the class derives directly from ``Exception``.
    """


class Error(Exception):
    """
    Base class for all database errors.
    """


class InterfaceError(Error):
    """
    Exception for interface-related errors.
    """


class DatabaseError(Error):
    """
    Exception for database-related errors.
    """


class DataError(DatabaseError):
    """
    Exception for data processing errors.
    """


class OperationalError(DatabaseError):
    """
    Exception for operational errors not under user control.
    """


class IntegrityError(DatabaseError):
    """
    Exception for database integrity constraint violations.
    """


class InternalError(DatabaseError):
    """
    Exception for database internal errors.
    """


class ProgrammingError(DatabaseError):
    """
    Exception for programming errors in SQL or API usage.
    """


class NotSupportedError(DatabaseError):
    """
    Exception for unsupported database features.
    """

def _map_ydb_error(ydb_error: ydb.Error) -> DatabaseError:
    """
    Map YDB error to appropriate DB-API exception.

    Args:
        ydb_error (ydb.Error): YDB-specific error

    Returns:
        DatabaseError: Appropriate DB-API exception
    """
```

### Type Mapping

Utilities for converting between YDB types and Python DB-API types, together with the standard DB-API type objects and value constructors.

```python { .api }
class YdbTypeConverter:
    """
    Converter between YDB types and Python DB-API types.
    """

    @staticmethod
    def ydb_to_python(ydb_value, ydb_type: ydb.Type):
        """
        Convert YDB value to Python value.

        Args:
            ydb_value: YDB-typed value
            ydb_type (ydb.Type): YDB type information

        Returns:
            Any: Python value
        """

    @staticmethod
    def python_to_ydb(python_value: Any) -> Tuple[Any, ydb.Type]:
        """
        Convert Python value to YDB value and type.

        Args:
            python_value (Any): Python value

        Returns:
            Tuple[Any, ydb.Type]: YDB value and type
        """

    @staticmethod
    def get_type_code(ydb_type: ydb.Type) -> str:
        """
        Get DB-API type code for YDB type.

        Args:
            ydb_type (ydb.Type): YDB type

        Returns:
            str: DB-API type code
        """

# Standard DB-API type objects
class DBAPIType:
    """DB-API type objects for parameter binding."""

    STRING: str = "STRING"
    BINARY: str = "BINARY"
    NUMBER: str = "NUMBER"
    DATETIME: str = "DATETIME"
    ROWID: str = "ROWID"

def Date(year: int, month: int, day: int) -> datetime.date:
    """
    Construct date value.

    Args:
        year (int): Year
        month (int): Month
        day (int): Day

    Returns:
        datetime.date: Date object
    """

def Time(hour: int, minute: int, second: int) -> datetime.time:
    """
    Construct time value.

    Args:
        hour (int): Hour
        minute (int): Minute
        second (int): Second

    Returns:
        datetime.time: Time object
    """

def Timestamp(
    year: int,
    month: int,
    day: int,
    hour: int,
    minute: int,
    second: int
) -> datetime.datetime:
    """
    Construct timestamp value.

    Args:
        year (int): Year
        month (int): Month
        day (int): Day
        hour (int): Hour
        minute (int): Minute
        second (int): Second

    Returns:
        datetime.datetime: Timestamp object
    """

def DateFromTicks(ticks: float) -> datetime.date:
    """
    Construct date from time ticks.

    Args:
        ticks (float): Seconds since the epoch (as used by the ``time`` module)

    Returns:
        datetime.date: Date object
    """

def TimeFromTicks(ticks: float) -> datetime.time:
    """
    Construct time from time ticks.

    Args:
        ticks (float): Seconds since the epoch (as used by the ``time`` module)

    Returns:
        datetime.time: Time object
    """

def TimestampFromTicks(ticks: float) -> datetime.datetime:
    """
    Construct timestamp from time ticks.

    Args:
        ticks (float): Seconds since the epoch (as used by the ``time`` module)

    Returns:
        datetime.datetime: Timestamp object
    """

def Binary(data: bytes) -> bytes:
    """
    Construct binary value.

    Args:
        data (bytes): Binary data

    Returns:
        bytes: Binary object
    """
```

### Connection Factory

Factory functions and a basic connection pool for creating DB-API connections.

```python { .api }
def connect(
    endpoint: Optional[str] = None,
    database: Optional[str] = None,
    user: Optional[str] = None,
    password: Optional[str] = None,
    host: Optional[str] = None,
    port: Optional[int] = None,
    dsn: Optional[str] = None,
    credentials: Optional[ydb.Credentials] = None,
    **kwargs
) -> Connection:
    """
    Create database connection with flexible parameter handling.

    Args:
        endpoint (str, optional): YDB endpoint URL
        database (str, optional): Database path
        user (str, optional): Username (for credential creation)
        password (str, optional): Password (for credential creation)
        host (str, optional): Hostname (alternative to endpoint)
        port (int, optional): Port number (alternative to endpoint)
        dsn (str, optional): Data source name
        credentials (ydb.Credentials, optional): Pre-configured credentials
        **kwargs: Additional connection parameters

    Returns:
        Connection: DB-API connection
    """

def create_connection_from_string(connection_string: str, **kwargs) -> Connection:
    """
    Create connection from connection string.

    Args:
        connection_string (str): YDB connection string
        **kwargs: Additional parameters

    Returns:
        Connection: DB-API connection
    """

class ConnectionPool:
    """
    Basic connection pool for DB-API connections.
    """

    def __init__(
        self,
        connection_factory: Callable[[], Connection],
        max_connections: int = 10,
        min_connections: int = 1
    ):
        """
        Create connection pool.

        Args:
            connection_factory (Callable): Function to create new connections
            max_connections (int): Maximum pool size
            min_connections (int): Minimum pool size
        """

    def get_connection(self, timeout: Optional[float] = None) -> Connection:
        """
        Get connection from pool.

        Args:
            timeout (float, optional): Acquisition timeout

        Returns:
            Connection: Available connection
        """

    def return_connection(self, connection: Connection):
        """
        Return connection to pool.

        Args:
            connection (Connection): Connection to return
        """

    def close_all(self):
        """Close all connections in pool."""
```

## Usage Examples

### Basic DB-API Usage

```python
import ydb.dbapi

# Connect to YDB
connection = ydb.dbapi.connect(
    endpoint="grpc://localhost:2136",
    database="/local"
)

try:
    # Create cursor
    cursor = connection.cursor()

    # The cursor gets its own try/finally: if cursor() itself fails there is
    # no cursor to close, and the connection is still released below.
    try:
        # Execute simple query
        cursor.execute("SELECT COUNT(*) FROM users")
        result = cursor.fetchone()
        print(f"User count: {result[0]}")

        # Execute query with parameters
        cursor.execute(
            "SELECT name, age FROM users WHERE age > ? AND active = ?",
            (25, True)
        )

        # Fetch results
        rows = cursor.fetchall()
        for row in rows:
            print(f"Name: {row[0]}, Age: {row[1]}")

    finally:
        cursor.close()

finally:
    connection.close()
```

### Context Manager Usage

```python
import ydb.dbapi

# Using context managers for automatic cleanup
with ydb.dbapi.connect(
    endpoint="grpc://localhost:2136",
    database="/local"
) as connection:

    with connection.cursor() as cursor:
        # Execute multiple queries; keying them by table name avoids
        # parsing the table name back out of the SQL text
        queries = {
            "users": "SELECT COUNT(*) FROM users WHERE active = true",
            "orders": "SELECT COUNT(*) FROM orders WHERE status = 'completed'",
            "products": "SELECT COUNT(*) FROM products WHERE in_stock = true",
        }

        results = {}
        for table_name, query in queries.items():
            cursor.execute(query)
            results[table_name] = cursor.fetchone()[0]

        print("Table counts:", results)
```

### Parameterized Queries

```python
def demonstrate_parameters():
    """Demonstrate positional (qmark) parameters with execute() and executemany()."""

    with ydb.dbapi.connect(
        endpoint="grpc://localhost:2136",
        database="/local"
    ) as connection:

        # The cursor is used as a context manager so it is closed even if
        # one of the statements below fails
        with connection.cursor() as cursor:

            # Positional parameters (qmark style)
            cursor.execute(
                "SELECT * FROM users WHERE age BETWEEN ? AND ? AND city = ?",
                (25, 35, "New York")
            )

            young_professionals = cursor.fetchall()
            print(f"Found {len(young_professionals)} young professionals")

            # Execute many with different parameter sets
            insert_query = "INSERT INTO user_activity (user_id, activity, timestamp) VALUES (?, ?, ?)"

            activity_data = [
                (1, "login", "2024-01-01T10:00:00Z"),
                (2, "purchase", "2024-01-01T11:30:00Z"),
                (3, "logout", "2024-01-01T12:00:00Z"),
                (1, "view_product", "2024-01-01T14:15:00Z"),
            ]

            cursor.executemany(insert_query, activity_data)
            connection.commit()

            print(f"Inserted {len(activity_data)} activity records")


demonstrate_parameters()
```

### Result Processing

```python
def process_large_result_set():
    """Demonstrate batched processing of a result set that may be large."""

    with ydb.dbapi.connect(
        endpoint="grpc://localhost:2136",
        database="/local"
    ) as connection:

        # Close the cursor automatically, even if processing fails midway
        with connection.cursor() as cursor:

            # Execute query that might return many rows
            cursor.execute("""
                SELECT user_id, name, email, registration_date, last_login
                FROM users
                WHERE registration_date >= '2023-01-01'
                ORDER BY registration_date DESC
            """)

            # Check result metadata
            if cursor.description:
                print("Column information:")
                for col_info in cursor.description:
                    # Only name, type_code and null_ok are displayed; the
                    # four size/precision fields in between are skipped
                    name, type_code, *_, null_ok = col_info
                    print(f"  {name}: {type_code} (nullable: {null_ok})")

            # Process results in batches
            cursor.arraysize = 100  # Fetch 100 rows at a time

            batch_count = 0
            while True:
                rows = cursor.fetchmany()
                if not rows:
                    break

                # Count a batch only once it has actually been fetched
                batch_count += 1
                print(f"Processing batch {batch_count} ({len(rows)} rows)")

                for _user_id, name, email, _reg_date, last_login in rows:
                    # Process individual row
                    if last_login is None:
                        print(f"User {name} ({email}) never logged in")
                    else:
                        print(f"User {name} last seen: {last_login}")

            print(f"Processed {batch_count} batches total")


process_large_result_set()
```

### Error Handling

```python
def handle_dbapi_errors():
    """Demonstrate DB-API error handling patterns."""

    try:
        connection = ydb.dbapi.connect(
            endpoint="grpc://invalid:2136",
            database="/local"
        )

    except ydb.dbapi.OperationalError as e:
        print(f"Connection failed: {e}")
        return

    except ydb.dbapi.InterfaceError as e:
        print(f"Interface error: {e}")
        return

    try:
        # Leaving the `with` block closes the connection, so no separate
        # close() call (or `'connection' in locals()` check) is needed
        with connection:
            with connection.cursor() as cursor:

                # This might cause a programming error
                try:
                    cursor.execute("SELECT * FROM nonexistent_table")

                except ydb.dbapi.ProgrammingError as e:
                    print(f"Query error (table doesn't exist): {e}")

                # This might cause a data error
                try:
                    cursor.execute("INSERT INTO users (id) VALUES (?)", ("not_a_number",))

                except ydb.dbapi.DataError as e:
                    print(f"Data type error: {e}")

                # This might cause an integrity error
                try:
                    cursor.execute("INSERT INTO users (id, email) VALUES (?, ?)", (1, "duplicate@email.com"))
                    cursor.execute("INSERT INTO users (id, email) VALUES (?, ?)", (2, "duplicate@email.com"))
                    connection.commit()

                except ydb.dbapi.IntegrityError as e:
                    print(f"Integrity constraint violation: {e}")
                    connection.rollback()

                # Handle general database errors
                try:
                    cursor.execute("SOME INVALID SQL SYNTAX")

                except ydb.dbapi.DatabaseError as e:
                    print(f"General database error: {e}")

    except ydb.dbapi.Error as e:
        print(f"General YDB error: {e}")

    except Exception as e:
        # Last-resort boundary for the demo: report anything unexpected
        print(f"Unexpected error: {e}")


handle_dbapi_errors()
```

### Transaction Management

```python
def demonstrate_transactions():
    """Demonstrate transaction handling with DB-API."""
    # Imported here so the example is self-contained: the log entry below
    # needs datetime.now(), which the snippet otherwise never imports
    from datetime import datetime

    with ydb.dbapi.connect(
        endpoint="grpc://localhost:2136",
        database="/local"
    ) as connection:

        with connection.cursor() as cursor:

            try:
                # Start transaction (implicit)

                # Transfer money between accounts
                cursor.execute(
                    "UPDATE accounts SET balance = balance - ? WHERE account_id = ?",
                    (100.0, "account_1")
                )

                cursor.execute(
                    "UPDATE accounts SET balance = balance + ? WHERE account_id = ?",
                    (100.0, "account_2")
                )

                # Log transaction
                cursor.execute(
                    "INSERT INTO transaction_log (from_account, to_account, amount, timestamp) VALUES (?, ?, ?, ?)",
                    ("account_1", "account_2", 100.0, datetime.now())
                )

                # Commit transaction
                connection.commit()
                print("Transaction committed successfully")

            except Exception as e:
                # Rollback on any error
                connection.rollback()
                print(f"Transaction rolled back due to error: {e}")

def demonstrate_autocommit_mode():
    """Demonstrate autocommit behavior."""
    # Imported here so the example is self-contained: the inserts below
    # need datetime.now(), which the snippet otherwise never imports
    from datetime import datetime

    with ydb.dbapi.connect(
        endpoint="grpc://localhost:2136",
        database="/local"
    ) as connection:

        with connection.cursor() as cursor:

            # In YDB, each statement is typically auto-committed
            # unless explicitly wrapped in a transaction

            cursor.execute(
                "INSERT INTO audit_log (action, timestamp) VALUES (?, ?)",
                ("user_login", datetime.now())
            )

            # This is automatically committed
            print("Audit log entry added (auto-committed)")

            # For multi-statement transactions, use explicit commit/rollback
            cursor.execute("BEGIN TRANSACTION")

            cursor.execute(
                "UPDATE user_stats SET login_count = login_count + 1 WHERE user_id = ?",
                (123,)
            )

            cursor.execute(
                "INSERT INTO session_log (user_id, session_start) VALUES (?, ?)",
                (123, datetime.now())
            )

            # Explicitly commit the transaction
            # NOTE(review): the COMMIT statement followed by connection.commit()
            # looks redundant - confirm which of the two YDB expects here.
            cursor.execute("COMMIT TRANSACTION")
            connection.commit()

            print("Multi-statement transaction committed")

demonstrate_transactions()
demonstrate_autocommit_mode()
```

### Advanced Usage Patterns

```python
def demonstrate_advanced_patterns():
    """Demonstrate advanced DB-API usage patterns."""

    # Connection pooling
    def create_connection():
        return ydb.dbapi.connect(
            endpoint="grpc://localhost:2136",
            database="/local"
        )

    # Prepared statement pattern (simulate)
    def execute_user_query(cursor, user_id, status):
        cursor.execute(
            "SELECT * FROM users WHERE id = ? AND status = ?",
            (user_id, status)
        )
        return cursor.fetchall()

    pool = ydb.dbapi.ConnectionPool(
        connection_factory=create_connection,
        max_connections=10,
        min_connections=2
    )

    try:
        # Use pooled connection
        connection = pool.get_connection(timeout=5.0)

        # The inner try starts only after the connection has been acquired,
        # so a failed get_connection() never reaches return_connection()
        try:
            # The pooled connection is deliberately not used as a context
            # manager: leaving `with connection:` closes it, and a closed
            # connection would then be handed back to the pool
            with connection.cursor() as cursor:

                # Execute multiple times with different parameters
                active_users = []
                for user_id in [1, 2, 3, 4, 5]:
                    users = execute_user_query(cursor, user_id, "active")
                    active_users.extend(users)

                print(f"Found {len(active_users)} active users")

                # Cursor as iterator
                cursor.execute("SELECT id, name FROM users LIMIT 10")

                print("Users (using iterator):")
                for user_id, name in cursor:
                    print(f"  ID: {user_id}, Name: {name}")

                # Custom result processing
                cursor.execute("SELECT * FROM user_preferences")

                # Convert rows to dictionaries
                if cursor.description:
                    columns = [desc[0] for desc in cursor.description]

                    preferences = [
                        dict(zip(columns, row)) for row in cursor.fetchall()
                    ]

                    print(f"Loaded {len(preferences)} user preferences as dicts")

                    # Process as needed
                    for pref in preferences:
                        if pref.get('theme') == 'dark':
                            print(f"User {pref['user_id']} prefers dark theme")

        finally:
            pool.return_connection(connection)

    finally:
        pool.close_all()

def demonstrate_ydb_extensions():
    """Demonstrate YDB-specific DB-API extensions."""

    with ydb.dbapi.connect(
        endpoint="grpc://localhost:2136",
        database="/local"
    ) as connection:

        # Use YDB-specific table description. The extension is looked up
        # explicitly so that an AttributeError raised while describing or
        # printing is not mistaken for "extension not available".
        describe = getattr(connection, "describe", None)
        if describe is None:
            print("Table description not available")
        else:
            table_info = describe("/local/users")

            print("Table schema:")
            for column in table_info:
                print(f"  {column.name}: {column.type}")

        # Execute schema queries (YDB extension)
        execute_scheme_query = getattr(connection, "execute_scheme_query", None)
        if execute_scheme_query is None:
            print("Schema query execution not available")
        else:
            try:
                execute_scheme_query("""
                    CREATE TABLE test_table (
                        id UInt64,
                        name Utf8,
                        PRIMARY KEY (id)
                    )
                """)
                print("Schema query executed")

            except Exception as e:
                print(f"Schema query failed: {e}")

demonstrate_advanced_patterns()
demonstrate_ydb_extensions()
```

### Integration with Other Libraries

```python
def integrate_with_pandas():
    """Demonstrate integration with pandas for data analysis."""

    # Guard only the import, so an ImportError raised later by other code
    # is not misreported as "pandas not available"
    try:
        import pandas as pd
    except ImportError:
        print("pandas not available for integration")
        return

    query = """
        SELECT
            user_id,
            age,
            registration_date,
            last_login_date,
            total_orders,
            total_spent
        FROM user_analytics
        WHERE registration_date >= '2023-01-01'
    """

    try:
        with ydb.dbapi.connect(
            endpoint="grpc://localhost:2136",
            database="/local"
        ) as connection:

            # Execute query and get results
            with connection.cursor() as cursor:
                cursor.execute(query)

                # Get column names
                columns = [desc[0] for desc in cursor.description]

                # Fetch all data
                rows = cursor.fetchall()

        # Create DataFrame
        df = pd.DataFrame(rows, columns=columns)

        print("Data loaded into pandas:")
        df.info()  # info() prints its report itself and returns None
        print("\nSample data:")
        print(df.head())

        # Perform analysis
        print("\nAnalysis:")
        print(f"Average age: {df['age'].mean():.1f}")
        print(f"Average total spent: ${df['total_spent'].mean():.2f}")
        print(f"Users with no logins: {df['last_login_date'].isna().sum()}")

        # Group analysis. The column is converted explicitly because the
        # .dt accessor only works on datetime-like values.
        registration_dates = pd.to_datetime(df['registration_date'])
        monthly_registrations = df.groupby(
            registration_dates.dt.to_period('M')
        ).size()

        print("\nMonthly registrations:")
        print(monthly_registrations)

    except Exception as e:
        print(f"Error in pandas integration: {e}")

def demonstrate_connection_url_formats():
    """Show different ways to specify connection parameters."""

    # Token-based credentials for Method 4.
    # NOTE(review): ydb.AccessTokenCredentials is used for a bare token;
    # ydb.StaticCredentials is for user/password login - confirm against the
    # installed SDK version.
    credentials = ydb.AccessTokenCredentials("your-token")

    # Each factory opens one connection. They are invoked one at a time in
    # the loop below, so a failing method cannot leak connections that were
    # opened for the other methods.
    connection_factories = [
        # Method 1: Individual parameters
        lambda: ydb.dbapi.connect(
            endpoint="grpc://localhost:2136",
            database="/local"
        ),
        # Method 2: Using host and port
        lambda: ydb.dbapi.connect(
            host="localhost",
            port=2136,
            database="/local"
        ),
        # Method 3: Using DSN-style string
        lambda: ydb.dbapi.create_connection_from_string(
            "ydb://localhost:2136/local"
        ),
        # Method 4: With credentials
        lambda: ydb.dbapi.connect(
            endpoint="grpcs://ydb.serverless.yandexcloud.net:2135",
            database="/ru-central1/b1g8skpblkos03malf3s/etn01lrprvnlnhv8v5kf",
            credentials=credentials
        ),
    ]

    # Test connections
    for i, open_connection in enumerate(connection_factories, 1):
        try:
            # Both context managers close their resource exactly once,
            # whether the query succeeds or fails
            with open_connection() as conn, conn.cursor() as cursor:
                cursor.execute("SELECT 1")
                result = cursor.fetchone()
                print(f"Connection {i}: Success - {result}")

        except Exception as e:
            print(f"Connection {i}: Failed - {e}")

integrate_with_pandas()
demonstrate_connection_url_formats()
```

## Type Definitions

```python { .api }
# Type aliases for DB-API interface
DBConnection = Connection
DBCursor = Cursor
DBError = Error

# Parameter types
Parameters = Union[Tuple, Dict[str, Any], None]
ParameterSequence = Sequence[Parameters]

# Result types
Row = Tuple[Any, ...]
ResultSet = List[Row]
# (name, type_code, display_size, internal_size, precision, scale, null_ok);
# per PEP 249 only name and type_code are mandatory, the rest may be None
ColumnDescription = Tuple[str, str, Optional[int], Optional[int], Optional[int], Optional[int], Optional[bool]]

# Connection parameters
ConnectionParams = Dict[str, Any]
ConnectionString = str
ConnectionFactory = Callable[[], Connection]

# Type objects for parameter binding
TypeObject = Union[str, type]
TypeMapping = Dict[str, TypeObject]

# Common patterns
QueryExecutor = Callable[[Cursor], Any]
ResultProcessor = Callable[[ResultSet], Any]
ErrorHandler = Callable[[Exception], bool]
```