The officially supported Python client for YDB, a distributed SQL database.
It offers a standard interface compliant with Python DB-API 2.0 (PEP 249), providing familiar database connectivity patterns.
The YDB DB-API module (`ydb.dbapi`) implements this standard database interface.
import ydb.dbapi
# DB-API 2.0 module information (PEP 249 module globals)
version: str = "0.0.31"
# NOTE(review): version_info (1, 0, 0) does not agree with ``version``
# ("0.0.31"); confirm which one is authoritative before relying on either.
version_info: Tuple[int, int, int] = (1, 0, 0)
# PEP 249 ``apilevel`` states the supported DB-API level. This module exposes
# the DB-API 2.0 surface (cursor objects, the full exception hierarchy,
# executemany/fetchmany/arraysize), so it advertises "2.0".
apilevel: str = "2.0"
# PEP 249 ``threadsafety`` level 0: threads may not share the module.
threadsafety: int = 0
# PEP 249 ``paramstyle`` "qmark": question-mark placeholders, e.g. "WHERE a = ?".
paramstyle: str = "qmark"
# Exception hierarchy: every DB-API exception class exported as one tuple,
# listed with the root classes first and the DatabaseError family after.
errors: Tuple[Type[Exception], ...] = (
    Warning,
    Error,
    InterfaceError,
    DatabaseError,
    # DatabaseError subclasses
    DataError, OperationalError, IntegrityError,
    InternalError, ProgrammingError, NotSupportedError,
)
def connect(*args, **kwargs) -> Connection:
    """Open a new DB-API connection to YDB.

    Args:
        *args: Positional connection arguments (endpoint, database, etc.).
        **kwargs: Keyword connection arguments.

    Returns:
        Connection: The DB-API connection object.
    """


# Database connection implementing DB-API 2.0 connection interface.
class Connection:
    """Database connection implementing the DB-API 2.0 connection interface."""

    def __init__(
        self,
        endpoint: str = None,
        database: str = None,
        credentials: ydb.Credentials = None,
        driver_config: ydb.DriverConfig = None,
        **kwargs
    ):
        """Open a connection to a YDB database.

        Args:
            endpoint (str, optional): URL of the YDB endpoint.
            database (str, optional): Path of the database.
            credentials (ydb.Credentials, optional): Credentials used to
                authenticate.
            driver_config (ydb.DriverConfig, optional): Configuration for the
                driver.
            **kwargs: Further connection parameters.
        """

    def cursor(self) -> Cursor:
        """Create a cursor for running statements on this connection.

        Returns:
            Cursor: A new DB-API cursor.
        """

    def commit(self):
        """Commit the current transaction.

        YDB manages transactions per session; this method is provided so the
        connection stays consistent with the DB-API contract.
        """

    def rollback(self):
        """Roll back the current transaction.

        YDB manages transactions per session; this method is provided so the
        connection stays consistent with the DB-API contract.
        """

    def close(self):
        """Close the connection and free its resources."""

    def __enter__(self) -> 'Connection':
        """Start a ``with`` block using this connection.

        Returns:
            Connection: This same connection instance.
        """

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Finish a ``with`` block and close the connection."""

    @property
    def closed(self) -> bool:
        """Whether the connection has been closed.

        Returns:
            bool: True once the connection is closed.
        """

    def describe(self, table_path: str) -> List[TableColumn]:
        """Describe a table's columns (YDB-specific extension).

        Args:
            table_path (str): Path of the table to describe.

        Returns:
            List[TableColumn]: One description per table column.
        """

    def execute_scheme_query(self, query: str):
        """Run a schema (DDL) query (YDB-specific extension).

        Args:
            query (str): The DDL statement to run.
        """


# Cursor for executing SQL statements and fetching results.
class Cursor:
    """Cursor for executing SQL statements and fetching results."""

    def __init__(self, connection: Connection):
        """Bind a new cursor to ``connection``.

        Args:
            connection (Connection): The connection that owns this cursor.
        """

    def execute(
        self,
        query: str,
        parameters: Optional[Union[Tuple, Dict]] = None
    ):
        """Run one SQL statement, optionally with bound parameters.

        Args:
            query (str): The SQL text to run.
            parameters (Optional[Union[Tuple, Dict]]): Values for the query's
                placeholders, if any.
        """

    def executemany(
        self,
        query: str,
        seq_of_parameters: Sequence[Union[Tuple, Dict]]
    ):
        """Run the same SQL statement once per parameter set.

        Args:
            query (str): The SQL text to run.
            seq_of_parameters (Sequence): One parameter set per execution.
        """

    def fetchone(self) -> Optional[Tuple]:
        """Return the next row of the current result.

        Returns:
            Optional[Tuple]: The row as a tuple, or None when no rows remain.
        """

    def fetchmany(self, size: Optional[int] = None) -> List[Tuple]:
        """Return several rows from the current result.

        Args:
            size (Optional[int]): How many rows to fetch; ``arraysize`` is
                used when omitted.

        Returns:
            List[Tuple]: The fetched rows, each as a tuple.
        """

    def fetchall(self) -> List[Tuple]:
        """Return every remaining row of the current result.

        Returns:
            List[Tuple]: The remaining rows, each as a tuple.
        """

    def close(self):
        """Close the cursor and free its resources."""

    def __enter__(self) -> 'Cursor':
        """Start a ``with`` block using this cursor."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Finish a ``with`` block and close the cursor."""

    def __iter__(self) -> Iterator[Tuple]:
        """Iterate over the rows of the current result.

        Returns:
            Iterator[Tuple]: An iterator that yields rows.
        """

    def __next__(self) -> Tuple:
        """Return the next row during iteration.

        Returns:
            Tuple: The next row.

        Raises:
            StopIteration: Once no rows remain.
        """

    @property
    def description(self) -> Optional[List[Tuple]]:
        """Column metadata for the current result set.

        Returns:
            Optional[List[Tuple]]: One tuple per column, laid out as
                (name, type_code, display_size, internal_size,
                precision, scale, null_ok).
        """

    @property
    def rowcount(self) -> int:
        """Row count of the most recent operation.

        Returns:
            int: Number of affected rows, or -1 when it is not available.
        """

    @property
    def arraysize(self) -> int:
        """Default number of rows that ``fetchmany()`` returns.

        Returns:
            int: Rows fetched per ``fetchmany()`` call.
        """

    @arraysize.setter
    def arraysize(self, size: int):
        """Change the default number of rows that ``fetchmany()`` returns.

        Args:
            size (int): The new batch size.
        """

    def setinputsizes(self, sizes: Sequence[Optional[int]]):
        """Predefine input parameter sizes; a no-op for YDB.

        Args:
            sizes (Sequence[Optional[int]]): Per-parameter sizes.
        """

    def setoutputsize(self, size: int, column: Optional[int] = None):
        """Predefine an output column size; a no-op for YDB.

        Args:
            size (int): The output size.
            column (Optional[int]): Index of the column it applies to.
        """

    def callproc(self, procname: str, parameters: Tuple = None) -> Tuple:
        """Call a stored procedure, which YDB does not support.

        Args:
            procname (str): Name of the procedure.
            parameters (Tuple, optional): Arguments for the procedure.

        Returns:
            Tuple: The procedure's results.

        Raises:
            NotSupportedError: Always, because YDB has no stored procedures.
        """


# Standard DB-API exception hierarchy for error handling.
class Warning(Exception):
    """Raised to report important warnings."""


class Error(Exception):
    """Root class of every database error."""


class InterfaceError(Error):
    """Raised for errors in the database interface itself."""


class DatabaseError(Error):
    """Raised for errors that concern the database."""


class DataError(DatabaseError):
    """Raised when processing data fails."""


class OperationalError(DatabaseError):
    """Raised for operational errors outside the user's control."""


class IntegrityError(DatabaseError):
    """Raised when a database integrity constraint is violated."""


class InternalError(DatabaseError):
    """Raised when the database reports an internal error."""


class ProgrammingError(DatabaseError):
    """Raised for programming mistakes in SQL or in API usage."""


class NotSupportedError(DatabaseError):
    """Raised when a database feature is not supported."""
def _map_ydb_error(ydb_error: ydb.Error) -> DatabaseError:
    """Translate a YDB error into the matching DB-API exception.

    Args:
        ydb_error (ydb.Error): The YDB-specific error to translate.

    Returns:
        DatabaseError: The DB-API exception that corresponds to ``ydb_error``.
    """


# Utilities for converting between YDB types and Python DB-API types.
class YdbTypeConverter:
    """Translate values and type codes between YDB and the Python DB-API."""

    @staticmethod
    def ydb_to_python(ydb_value, ydb_type: ydb.Type):
        """Turn a YDB value into the equivalent Python value.

        Args:
            ydb_value: The YDB-typed value.
            ydb_type (ydb.Type): Type information describing ``ydb_value``.

        Returns:
            Any: The Python value.
        """

    @staticmethod
    def python_to_ydb(python_value: Any) -> Tuple[Any, ydb.Type]:
        """Turn a Python value into a YDB value together with its YDB type.

        Args:
            python_value (Any): The Python value.

        Returns:
            Tuple[Any, ydb.Type]: The YDB value and its type.
        """

    @staticmethod
    def get_type_code(ydb_type: ydb.Type) -> str:
        """Look up the DB-API type code for a YDB type.

        Args:
            ydb_type (ydb.Type): The YDB type.

        Returns:
            str: The DB-API type code.
        """
# Standard DB-API type objects
class DBAPIType:
    """Names of the standard DB-API type objects, used for parameter binding.

    Every attribute's value is the attribute's own name.
    """

    # Text and raw bytes
    STRING: str = "STRING"
    BINARY: str = "BINARY"
    # Numbers and points in time
    NUMBER: str = "NUMBER"
    DATETIME: str = "DATETIME"
    # Row identifiers
    ROWID: str = "ROWID"
def Date(year: int, month: int, day: int) -> datetime.date:
    """Construct a date value (PEP 249 type constructor).

    Args:
        year (int): Year
        month (int): Month
        day (int): Day

    Returns:
        datetime.date: Date object

    Raises:
        ValueError: If the components do not form a valid calendar date.
    """
    return datetime.date(year, month, day)
def Time(hour: int, minute: int, second: int) -> datetime.time:
    """Construct a time-of-day value (PEP 249 type constructor).

    Args:
        hour (int): Hour
        minute (int): Minute
        second (int): Second

    Returns:
        datetime.time: Naive time object (no tzinfo)

    Raises:
        ValueError: If a component is out of range.
    """
    return datetime.time(hour, minute, second)
def Timestamp(
    year: int,
    month: int,
    day: int,
    hour: int,
    minute: int,
    second: int
) -> datetime.datetime:
    """Construct a timestamp value (PEP 249 type constructor).

    Args:
        year (int): Year
        month (int): Month
        day (int): Day
        hour (int): Hour
        minute (int): Minute
        second (int): Second

    Returns:
        datetime.datetime: Naive timestamp object (no tzinfo)

    Raises:
        ValueError: If the components do not form a valid date and time.
    """
    return datetime.datetime(year, month, day, hour, minute, second)
def DateFromTicks(ticks: float) -> datetime.date:
    """Construct a date from POSIX ticks (PEP 249 type constructor).

    The ticks are interpreted in the local time zone, as with
    ``time.localtime``.

    Args:
        ticks (float): Seconds since the epoch

    Returns:
        datetime.date: Local date corresponding to ``ticks``
    """
    return datetime.date.fromtimestamp(ticks)
def TimeFromTicks(ticks: float) -> datetime.time:
    """Construct a time of day from POSIX ticks (PEP 249 type constructor).

    The ticks are interpreted in the local time zone, as with
    ``time.localtime``. A fractional part is kept as microseconds.

    Args:
        ticks (float): Seconds since the epoch

    Returns:
        datetime.time: Naive local time of day corresponding to ``ticks``
    """
    return datetime.datetime.fromtimestamp(ticks).time()
def TimestampFromTicks(ticks: float) -> datetime.datetime:
    """Construct a timestamp from POSIX ticks (PEP 249 type constructor).

    The ticks are interpreted in the local time zone, as with
    ``time.localtime``. The result is naive (no tzinfo).

    Args:
        ticks (float): Seconds since the epoch

    Returns:
        datetime.datetime: Local timestamp corresponding to ``ticks``
    """
    return datetime.datetime.fromtimestamp(ticks)
def Binary(data: bytes) -> bytes:
    """Construct a binary value (PEP 249 type constructor).

    Any bytes-like object (``bytes``, ``bytearray``, ``memoryview``...) is
    accepted and copied into an immutable ``bytes`` object.

    Args:
        data (bytes): Binary data (any bytes-like object)

    Returns:
        bytes: Binary object

    Raises:
        TypeError: If ``data`` is not bytes-like (e.g. ``str`` or ``int``).
    """
    # Going through memoryview rejects ints, which plain bytes(n) would
    # silently turn into n zero bytes.
    return bytes(memoryview(data))


# Factory functions for creating standardized connections.
def connect(
    endpoint: str = None,
    database: str = None,
    user: str = None,
    password: str = None,
    host: str = None,
    port: int = None,
    dsn: str = None,
    credentials: ydb.Credentials = None,
    **kwargs
) -> Connection:
    """Open a database connection, accepting several ways to address it.

    Args:
        endpoint (str, optional): URL of the YDB endpoint.
        database (str, optional): Path of the database.
        user (str, optional): User name, used to build credentials.
        password (str, optional): Password, used to build credentials.
        host (str, optional): Host name; an alternative to ``endpoint``.
        port (int, optional): Port number; an alternative to ``endpoint``.
        dsn (str, optional): Data source name.
        credentials (ydb.Credentials, optional): Ready-made credentials.
        **kwargs: Further connection parameters.

    Returns:
        Connection: The DB-API connection.
    """
def create_connection_from_string(connection_string: str, **kwargs) -> Connection:
    """Open a connection described by a YDB connection string.

    Args:
        connection_string (str): The YDB connection string.
        **kwargs: Further connection parameters.

    Returns:
        Connection: The DB-API connection.
    """
class ConnectionPool:
    """A basic pool of DB-API connections."""

    def __init__(
        self,
        connection_factory: Callable[[], Connection],
        max_connections: int = 10,
        min_connections: int = 1
    ):
        """Build a connection pool.

        Args:
            connection_factory (Callable): Zero-argument callable that
                creates a new connection.
            max_connections (int): Upper bound on the pool size.
            min_connections (int): Lower bound on the pool size.
        """

    def get_connection(self, timeout: float = None) -> Connection:
        """Take a connection out of the pool.

        Args:
            timeout (float, optional): How long to wait to acquire one.

        Returns:
            Connection: A connection that is available for use.
        """

    def return_connection(self, connection: Connection):
        """Hand a connection back to the pool.

        Args:
            connection (Connection): The connection being returned.
        """

    def close_all(self):
        """Close every connection held by the pool."""


import ydb.dbapi
# Connect to YDB
connection = ydb.dbapi.connect(
    endpoint="grpc://localhost:2136",
    database="/local"
)
try:
    # Create cursor. It gets its own try/finally so that a failing
    # connection.cursor() does not leave ``cursor`` unbound in the cleanup
    # (which would raise NameError and hide the original error).
    cursor = connection.cursor()
    try:
        # Execute simple query
        cursor.execute("SELECT COUNT(*) FROM users")
        result = cursor.fetchone()
        print(f"User count: {result[0]}")
        # Execute query with parameters
        cursor.execute(
            "SELECT name, age FROM users WHERE age > ? AND active = ?",
            (25, True)
        )
        # Fetch results
        rows = cursor.fetchall()
        for row in rows:
            print(f"Name: {row[0]}, Age: {row[1]}")
    finally:
        cursor.close()
finally:
    connection.close()
# Using context managers for automatic cleanup
# The connection and the cursor are both closed when their ``with`` blocks exit.
with ydb.dbapi.connect(
    endpoint="grpc://localhost:2136",
    database="/local"
) as connection:
    with connection.cursor() as cursor:
        # Execute multiple queries
        queries = [
            "SELECT COUNT(*) FROM users WHERE active = true",
            "SELECT COUNT(*) FROM orders WHERE status = 'completed'",
            "SELECT COUNT(*) FROM products WHERE in_stock = true"
        ]
        results = {}
        for query in queries:
            cursor.execute(query)
            count = cursor.fetchone()[0]
            # Extract table name from query for results
            table_name = query.split("FROM ")[1].split()[0]
            results[table_name] = count
        print("Table counts:", results)


def demonstrate_parameters():
    """Demonstrate different parameter styles and safety."""
    with ydb.dbapi.connect(
        endpoint="grpc://localhost:2136",
        database="/local"
    ) as connection:
        cursor = connection.cursor()
        # Positional parameters (qmark style)
        cursor.execute(
            "SELECT * FROM users WHERE age BETWEEN ? AND ? AND city = ?",
            (25, 35, "New York")
        )
        young_professionals = cursor.fetchall()
        print(f"Found {len(young_professionals)} young professionals")
        # Execute many with different parameter sets
        insert_query = "INSERT INTO user_activity (user_id, activity, timestamp) VALUES (?, ?, ?)"
        activity_data = [
            (1, "login", "2024-01-01T10:00:00Z"),
            (2, "purchase", "2024-01-01T11:30:00Z"),
            (3, "logout", "2024-01-01T12:00:00Z"),
            (1, "view_product", "2024-01-01T14:15:00Z"),
        ]
        cursor.executemany(insert_query, activity_data)
        connection.commit()
        print(f"Inserted {len(activity_data)} activity records")


demonstrate_parameters()


def process_large_result_set():
    """Demonstrate efficient result set processing."""
    with ydb.dbapi.connect(
        endpoint="grpc://localhost:2136",
        database="/local"
    ) as connection:
        cursor = connection.cursor()
        # Execute query that might return many rows
        cursor.execute("""
            SELECT user_id, name, email, registration_date, last_login
            FROM users
            WHERE registration_date >= '2023-01-01'
            ORDER BY registration_date DESC
        """)
        # Check result metadata
        if cursor.description:
            print("Column information:")
            for col_info in cursor.description:
                # 7-tuple: (name, type_code, display_size, internal_size,
                # precision, scale, null_ok); only three fields are used here.
                name, type_code, *_, null_ok = col_info
                print(f" {name}: {type_code} (nullable: {null_ok})")
        # Process results in batches
        cursor.arraysize = 100  # Fetch 100 rows at a time
        batch_number = 1
        while True:
            rows = cursor.fetchmany()
            if not rows:
                break
            print(f"Processing batch {batch_number} ({len(rows)} rows)")
            for row in rows:
                user_id, name, email, reg_date, last_login = row
                # Process individual row
                if last_login is None:
                    print(f"User {name} ({email}) never logged in")
                else:
                    print(f"User {name} last seen: {last_login}")
            batch_number += 1
        print(f"Processed {batch_number - 1} batches total")


process_large_result_set()


def handle_dbapi_errors():
    """Demonstrate DB-API error handling patterns."""
    try:
        connection = ydb.dbapi.connect(
            endpoint="grpc://invalid:2136",
            database="/local"
        )
    except ydb.dbapi.OperationalError as e:
        print(f"Connection failed: {e}")
        return
    except ydb.dbapi.InterfaceError as e:
        print(f"Interface error: {e}")
        return
    try:
        with connection:
            cursor = connection.cursor()
            # This might cause a programming error
            try:
                cursor.execute("SELECT * FROM nonexistent_table")
            except ydb.dbapi.ProgrammingError as e:
                print(f"Query error (table doesn't exist): {e}")
            # This might cause a data error
            try:
                cursor.execute("INSERT INTO users (id) VALUES (?)", ("not_a_number",))
            except ydb.dbapi.DataError as e:
                print(f"Data type error: {e}")
            # This might cause an integrity error
            try:
                cursor.execute("INSERT INTO users (id, email) VALUES (?, ?)", (1, "duplicate@email.com"))
                cursor.execute("INSERT INTO users (id, email) VALUES (?, ?)", (2, "duplicate@email.com"))
                connection.commit()
            except ydb.dbapi.IntegrityError as e:
                print(f"Integrity constraint violation: {e}")
                connection.rollback()
            # Handle general database errors
            try:
                cursor.execute("SOME INVALID SQL SYNTAX")
            except ydb.dbapi.DatabaseError as e:
                print(f"General database error: {e}")
    except ydb.dbapi.Error as e:
        print(f"General YDB error: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")
    finally:
        # ``connection`` is always bound here (a failed connect() returned
        # early above), but ``with connection:`` may already have closed it.
        if not connection.closed:
            connection.close()


handle_dbapi_errors()


def demonstrate_transactions():
    """Demonstrate transaction handling with DB-API."""
    # ``datetime.now()`` is used below, so the datetime class must be in scope.
    from datetime import datetime

    with ydb.dbapi.connect(
        endpoint="grpc://localhost:2136",
        database="/local"
    ) as connection:
        cursor = connection.cursor()
        try:
            # Start transaction (implicit)
            # Transfer money between accounts
            cursor.execute(
                "UPDATE accounts SET balance = balance - ? WHERE account_id = ?",
                (100.0, "account_1")
            )
            cursor.execute(
                "UPDATE accounts SET balance = balance + ? WHERE account_id = ?",
                (100.0, "account_2")
            )
            # Log transaction
            cursor.execute(
                "INSERT INTO transaction_log (from_account, to_account, amount, timestamp) VALUES (?, ?, ?, ?)",
                ("account_1", "account_2", 100.0, datetime.now())
            )
            # Commit transaction
            connection.commit()
            print("Transaction committed successfully")
        except Exception as e:
            # Rollback on any error
            connection.rollback()
            print(f"Transaction rolled back due to error: {e}")


def demonstrate_autocommit_mode():
    """Demonstrate autocommit behavior."""
    # ``datetime.now()`` is used below, so the datetime class must be in scope.
    from datetime import datetime

    with ydb.dbapi.connect(
        endpoint="grpc://localhost:2136",
        database="/local"
    ) as connection:
        cursor = connection.cursor()
        # In YDB, each statement is typically auto-committed
        # unless explicitly wrapped in a transaction
        cursor.execute(
            "INSERT INTO audit_log (action, timestamp) VALUES (?, ?)",
            ("user_login", datetime.now())
        )
        # This is automatically committed
        print("Audit log entry added (auto-committed)")
        # For multi-statement transactions, use explicit commit/rollback on
        # the DB-API connection rather than raw BEGIN/COMMIT SQL statements,
        # so the work is committed exactly once and rolled back on failure.
        try:
            cursor.execute(
                "UPDATE user_stats SET login_count = login_count + 1 WHERE user_id = ?",
                (123,)
            )
            cursor.execute(
                "INSERT INTO session_log (user_id, session_start) VALUES (?, ?)",
                (123, datetime.now())
            )
            connection.commit()
        except Exception:
            connection.rollback()
            raise
        print("Multi-statement transaction committed")


demonstrate_transactions()
demonstrate_autocommit_mode()


def demonstrate_advanced_patterns():
    """Demonstrate advanced DB-API usage patterns."""
    # Connection pooling
    def create_connection():
        return ydb.dbapi.connect(
            endpoint="grpc://localhost:2136",
            database="/local"
        )

    # Prepared statement pattern (simulate)
    def execute_user_query(cursor, user_id, status):
        cursor.execute(
            "SELECT * FROM users WHERE id = ? AND status = ?",
            (user_id, status)
        )
        return cursor.fetchall()

    pool = ydb.dbapi.ConnectionPool(
        connection_factory=create_connection,
        max_connections=10,
        min_connections=2
    )
    try:
        # Use pooled connection. It is acquired before the inner ``try`` so
        # the ``finally`` below never sees an unbound name when acquisition
        # fails.
        connection = pool.get_connection(timeout=5.0)
        try:
            # Scope only the cursor: ``with connection:`` would close the
            # connection before it is handed back to the pool.
            with connection.cursor() as cursor:
                # Execute multiple times with different parameters
                active_users = []
                for user_id in [1, 2, 3, 4, 5]:
                    users = execute_user_query(cursor, user_id, "active")
                    active_users.extend(users)
                print(f"Found {len(active_users)} active users")
                # Cursor as iterator
                cursor.execute("SELECT id, name FROM users LIMIT 10")
                print("Users (using iterator):")
                for row in cursor:
                    user_id, name = row
                    print(f" ID: {user_id}, Name: {name}")
                # Custom result processing
                cursor.execute("SELECT * FROM user_preferences")
                # Convert rows to dictionaries
                if cursor.description:
                    columns = [desc[0] for desc in cursor.description]
                    preferences = [dict(zip(columns, row)) for row in cursor.fetchall()]
                    print(f"Loaded {len(preferences)} user preferences as dicts")
                    # Process as needed
                    for pref in preferences:
                        if pref.get('theme') == 'dark':
                            print(f"User {pref['user_id']} prefers dark theme")
        finally:
            pool.return_connection(connection)
    finally:
        pool.close_all()


def demonstrate_ydb_extensions():
    """Demonstrate YDB-specific DB-API extensions."""
    with ydb.dbapi.connect(
        endpoint="grpc://localhost:2136",
        database="/local"
    ) as connection:
        # Use YDB-specific table description
        try:
            table_info = connection.describe("/local/users")
            print("Table schema:")
            for column in table_info:
                print(f" {column.name}: {column.type}")
        except AttributeError:
            print("Table description not available")
        # Execute schema queries (YDB extension)
        try:
            connection.execute_scheme_query("""
                CREATE TABLE test_table (
                    id UInt64,
                    name Utf8,
                    PRIMARY KEY (id)
                )
            """)
            print("Schema query executed")
        except AttributeError:
            print("Schema query execution not available")
        except Exception as e:
            print(f"Schema query failed: {e}")


demonstrate_advanced_patterns()
demonstrate_ydb_extensions()


def integrate_with_pandas():
    """Demonstrate integration with pandas for data analysis."""
    try:
        import pandas as pd
        with ydb.dbapi.connect(
            endpoint="grpc://localhost:2136",
            database="/local"
        ) as connection:
            # Read data into pandas DataFrame
            query = """
                SELECT
                    user_id,
                    age,
                    registration_date,
                    last_login_date,
                    total_orders,
                    total_spent
                FROM user_analytics
                WHERE registration_date >= '2023-01-01'
            """
            # Execute query and get results
            with connection.cursor() as cursor:
                cursor.execute(query)
                # Get column names
                columns = [desc[0] for desc in cursor.description]
                # Fetch all data
                rows = cursor.fetchall()
            # Create DataFrame
            df = pd.DataFrame(rows, columns=columns)
            print("Data loaded into pandas:")
            # DataFrame.info() prints its report itself and returns None.
            df.info()
            print("\nSample data:")
            print(df.head())
            # Perform analysis
            print("\nAnalysis:")
            print(f"Average age: {df['age'].mean():.1f}")
            print(f"Average total spent: ${df['total_spent'].mean():.2f}")
            print(f"Users with no logins: {df['last_login_date'].isna().sum()}")
            # Group analysis. Coerce to datetime64 first: the ``.dt`` accessor
            # fails on an object-dtype column of plain ``date`` values.
            monthly_registrations = df.groupby(
                pd.to_datetime(df['registration_date']).dt.to_period('M')
            ).size()
            print("\nMonthly registrations:")
            print(monthly_registrations)
    except ImportError:
        print("pandas not available for integration")
    except Exception as e:
        print(f"Error in pandas integration: {e}")


def demonstrate_connection_url_formats():
    """Show different ways to specify connection parameters."""
    # Each factory is called inside the test loop, so one failing connect()
    # is reported, the remaining methods are still tried, and no connection
    # opened earlier is leaked.
    connection_factories = [
        # Method 1: Individual parameters
        lambda: ydb.dbapi.connect(
            endpoint="grpc://localhost:2136",
            database="/local"
        ),
        # Method 2: Using host and port
        lambda: ydb.dbapi.connect(
            host="localhost",
            port=2136,
            database="/local"
        ),
        # Method 3: Using DSN-style string
        lambda: ydb.dbapi.create_connection_from_string(
            "ydb://localhost:2136/local"
        ),
        # Method 4: With credentials
        # NOTE(review): a raw token is wrapped by ydb.AccessTokenCredentials;
        # ydb.StaticCredentials is the user/password flavour -- confirm
        # against the installed SDK version.
        lambda: ydb.dbapi.connect(
            endpoint="grpcs://ydb.serverless.yandexcloud.net:2135",
            database="/ru-central1/b1g8skpblkos03malf3s/etn01lrprvnlnhv8v5kf",
            credentials=ydb.AccessTokenCredentials("your-token")
        ),
    ]
    # Test connections
    for i, factory in enumerate(connection_factories, 1):
        conn = None
        try:
            conn = factory()
            with conn.cursor() as cursor:
                cursor.execute("SELECT 1")
                result = cursor.fetchone()
            print(f"Connection {i}: Success - {result}")
        except Exception as e:
            print(f"Connection {i}: Failed - {e}")
        finally:
            if conn is not None and not conn.closed:
                conn.close()


integrate_with_pandas()
demonstrate_connection_url_formats()
# Type aliases for DB-API interface
# Short aliases for the core DB-API classes
DBConnection = Connection
DBCursor = Cursor
DBError = Error

# Parameter types: one parameter set (positional, named, or none) and a batch
Parameters = Optional[Union[Tuple, Dict[str, Any]]]
ParameterSequence = Sequence[Parameters]

# Result types: a row, a full result, and one cursor.description entry
# laid out as (name, type_code, display_size, internal_size, precision,
# scale, null_ok)
Row = Tuple[Any, ...]
ResultSet = List[Row]
ColumnDescription = Tuple[
    str, str, Optional[int], Optional[int], Optional[int], Optional[int], bool
]

# Connection parameters
ConnectionParams = Dict[str, Any]
ConnectionString = str
ConnectionFactory = Callable[[], Connection]

# Type objects for parameter binding
TypeObject = Union[str, type]
TypeMapping = Dict[str, TypeObject]

# Callback signatures for common patterns
QueryExecutor = Callable[[Cursor], Any]
ResultProcessor = Callable[[ResultSet], Any]
ErrorHandler = Callable[[Exception], bool]