SQLAlchemy driver for DuckDB enabling seamless integration between Python applications and DuckDB's analytical database engine
—
Connection and cursor wrapper classes that adapt DuckDB's Python API to SQLAlchemy's expected interface. These classes handle special DuckDB operations, DataFrame registration, and provide compatibility with SQLAlchemy's connection and cursor protocols.
Wraps DuckDB connections to provide SQLAlchemy-compatible interface and behavior.
class ConnectionWrapper:
"""
Wraps DuckDB connection for SQLAlchemy compatibility.
Provides the interface SQLAlchemy expects while delegating to
the underlying DuckDB connection for actual operations.
"""
autocommit = None # DuckDB doesn't support setting autocommit
closed: bool = False
notices: List[str]
def __init__(self, c):
"""
Initialize connection wrapper.
Parameters:
- c (DuckDBPyConnection): DuckDB connection to wrap
"""
def cursor(self):
"""
Create a cursor for executing statements.
Returns:
CursorWrapper: Wrapped DuckDB cursor
"""
def close(self):
"""
Close the connection and mark as closed.
"""
def __getattr__(self, name):
"""
Delegate attribute access to underlying DuckDB connection.
Parameters:
- name (str): Attribute name
Returns:
Any: Attribute value from DuckDB connection
"""Wraps DuckDB cursors to provide SQLAlchemy-compatible interface with special handling for DuckDB operations.
class CursorWrapper:
"""
Wraps DuckDB cursor for SQLAlchemy compatibility.
Handles SQL execution, parameter binding, and special DuckDB
operations like DataFrame registration.
"""
def __init__(self, c, connection_wrapper):
"""
Initialize cursor wrapper.
Parameters:
- c (DuckDBPyConnection): DuckDB connection
- connection_wrapper (ConnectionWrapper): Parent connection wrapper
"""
def execute(self, statement, parameters=None, context=None):
"""
Execute SQL statement with parameter binding.
Handles special cases for DuckDB including:
- COMMIT statements for ipython-sql compatibility
- REGISTER statements for DataFrame registration
- Standard SQL with parameter binding
Parameters:
- statement (str): SQL statement to execute
- parameters (Tuple, optional): Parameter values for binding
- context (Any, optional): Execution context (unused)
Raises:
NotImplementedError: For unsupported DuckDB operations
RuntimeError: For other DuckDB errors
"""
def executemany(self, statement, parameters=None, context=None):
"""
Execute SQL statement multiple times with different parameters.
Parameters:
- statement (str): SQL statement to execute
- parameters (List[Dict], optional): List of parameter dictionaries
- context (Any, optional): Execution context (unused)
"""
def fetchmany(self, size=None):
"""
Fetch multiple rows from query results.
Parameters:
- size (int, optional): Number of rows to fetch
Returns:
List: Fetched rows
"""
def close(self):
"""
Close cursor (no-op for DuckDB cursors).
"""
@property
def connection(self):
"""
Get parent connection wrapper.
Returns:
ConnectionWrapper: Parent connection
"""
def __getattr__(self, name):
"""
Delegate attribute access to underlying DuckDB connection.
Parameters:
- name (str): Attribute name
Returns:
Any: Attribute value from DuckDB connection
"""DBAPI 2.0 compatibility layer providing standard database interface constants and exceptions.
class DBAPI:
"""
Database API compatibility class providing DBAPI 2.0 interface.
Defines standard database interface constants and exception classes
for SQLAlchemy compatibility.
"""
paramstyle: str # "numeric_dollar" for SQLAlchemy 2.0+, "qmark" for 1.x
apilevel: str # From duckdb.apilevel
threadsafety: int # From duckdb.threadsafety
# Exception classes
Error: Type[Exception] # Base exception class
TransactionException: Type[Exception] # Transaction-related errors
ParserException: Type[Exception] # SQL parsing errors
@staticmethod
def Binary(x):
"""
Construct binary data type (pass-through for DuckDB).
Parameters:
- x (Any): Data to treat as binary
Returns:
Any: Input data unchanged
"""from duckdb_engine import ConnectionWrapper, CursorWrapper
import duckdb
# Create wrapped connection
raw_conn = duckdb.connect(':memory:')
conn = ConnectionWrapper(raw_conn)
# Use cursor for operations
cursor = conn.cursor()
cursor.execute("CREATE TABLE test (id INTEGER, name VARCHAR)")
cursor.execute("INSERT INTO test VALUES (?, ?)", (1, 'Alice'))
# Fetch results
cursor.execute("SELECT * FROM test")
results = cursor.fetchall()
print(results)
conn.close()import pandas as pd
# Create DataFrame
df = pd.DataFrame({
'id': [1, 2, 3],
'value': ['a', 'b', 'c']
})
# Register DataFrame with DuckDB (special REGISTER handling)
cursor.execute("REGISTER", ('my_df', df))
# Query the registered DataFrame
cursor.execute("SELECT * FROM my_df")
results = cursor.fetchall()from sqlalchemy import create_engine
# Engine automatically uses ConnectionWrapper/CursorWrapper
engine = create_engine('duckdb:///:memory:')
with engine.connect() as conn:
# All operations go through wrapper classes
result = conn.execute("SELECT 1 as test")
print(result.fetchone())from duckdb_engine import DBAPI
try:
cursor.execute("INVALID SQL")
except DBAPI.ParserException as e:
print(f"SQL parsing error: {e}")
except DBAPI.Error as e:
print(f"Database error: {e}")conn = ConnectionWrapper(duckdb.connect(':memory:'))
# Check connection state
print(f"Closed: {conn.closed}")
print(f"Notices: {conn.notices}")
# Close connection
conn.close()
print(f"Closed: {conn.closed}")Install with Tessl CLI
npx tessl i tessl/pypi-duckdb-engine