CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-duckdb-engine

SQLAlchemy driver for DuckDB enabling seamless integration between Python applications and DuckDB's analytical database engine

Pending
Overview
Eval results
Files

connection-management.mddocs/

Connection and Cursor Management

Connection and cursor wrapper classes that adapt DuckDB's Python API to SQLAlchemy's expected interface. These classes handle special DuckDB operations, DataFrame registration, and provide compatibility with SQLAlchemy's connection and cursor protocols.

Capabilities

Connection Wrapper

Wraps DuckDB connections to provide SQLAlchemy-compatible interface and behavior.

class ConnectionWrapper:
    """
    Wraps DuckDB connection for SQLAlchemy compatibility.
    
    Provides the interface SQLAlchemy expects while delegating to 
    the underlying DuckDB connection for actual operations.
    """
    
    autocommit = None  # DuckDB doesn't support setting autocommit
    closed: bool = False
    notices: List[str]
    
    def __init__(self, c):
        """
        Initialize connection wrapper.
        
        Parameters:
        - c (DuckDBPyConnection): DuckDB connection to wrap
        """
    
    def cursor(self):
        """
        Create a cursor for executing statements.
        
        Returns:
        CursorWrapper: Wrapped DuckDB cursor
        """
    
    def close(self):
        """
        Close the connection and mark as closed.
        """
    
    def __getattr__(self, name):
        """
        Delegate attribute access to underlying DuckDB connection.
        
        Parameters:
        - name (str): Attribute name
        
        Returns:
        Any: Attribute value from DuckDB connection
        """

Cursor Wrapper

Wraps DuckDB cursors to provide SQLAlchemy-compatible interface with special handling for DuckDB operations.

class CursorWrapper:
    """
    Wraps DuckDB cursor for SQLAlchemy compatibility.
    
    Handles SQL execution, parameter binding, and special DuckDB 
    operations like DataFrame registration.
    """
    
    def __init__(self, c, connection_wrapper):
        """
        Initialize cursor wrapper.
        
        Parameters:
        - c (DuckDBPyConnection): DuckDB connection
        - connection_wrapper (ConnectionWrapper): Parent connection wrapper
        """
    
    def execute(self, statement, parameters=None, context=None):
        """
        Execute SQL statement with parameter binding.
        
        Handles special cases for DuckDB including:
        - COMMIT statements for ipython-sql compatibility
        - REGISTER statements for DataFrame registration
        - Standard SQL with parameter binding
        
        Parameters:
        - statement (str): SQL statement to execute
        - parameters (Tuple, optional): Parameter values for binding
        - context (Any, optional): Execution context (unused)
        
        Raises:
        NotImplementedError: For unsupported DuckDB operations
        RuntimeError: For other DuckDB errors
        """
    
    def executemany(self, statement, parameters=None, context=None):
        """
        Execute SQL statement multiple times with different parameters.
        
        Parameters:
        - statement (str): SQL statement to execute
        - parameters (List[Dict], optional): List of parameter dictionaries
        - context (Any, optional): Execution context (unused)
        """
    
    def fetchmany(self, size=None):
        """
        Fetch multiple rows from query results.
        
        Parameters:
        - size (int, optional): Number of rows to fetch
        
        Returns:
        List: Fetched rows
        """
    
    def close(self):
        """
        Close cursor (no-op for DuckDB cursors).
        """
    
    @property
    def connection(self):
        """
        Get parent connection wrapper.
        
        Returns:
        ConnectionWrapper: Parent connection
        """
    
    def __getattr__(self, name):
        """
        Delegate attribute access to underlying DuckDB connection.
        
        Parameters:
        - name (str): Attribute name
        
        Returns:
        Any: Attribute value from DuckDB connection
        """

Database API Compatibility

DBAPI 2.0 compatibility layer providing standard database interface constants and exceptions.

class DBAPI:
    """
    Database API compatibility class providing DBAPI 2.0 interface.
    
    Defines standard database interface constants and exception classes
    for SQLAlchemy compatibility.
    """
    
    paramstyle: str  # "numeric_dollar" for SQLAlchemy 2.0+, "qmark" for 1.x
    apilevel: str    # From duckdb.apilevel
    threadsafety: int # From duckdb.threadsafety
    
    # Exception classes
    Error: Type[Exception]              # Base exception class
    TransactionException: Type[Exception]  # Transaction-related errors
    ParserException: Type[Exception]       # SQL parsing errors
    
    @staticmethod
    def Binary(x):
        """
        Construct binary data type (pass-through for DuckDB).
        
        Parameters:
        - x (Any): Data to treat as binary
        
        Returns:
        Any: Input data unchanged
        """

Usage Examples

Direct Connection Usage

from duckdb_engine import ConnectionWrapper, CursorWrapper
import duckdb

# Create wrapped connection
raw_conn = duckdb.connect(':memory:')
conn = ConnectionWrapper(raw_conn)

# Use cursor for operations
cursor = conn.cursor()
cursor.execute("CREATE TABLE test (id INTEGER, name VARCHAR)")
cursor.execute("INSERT INTO test VALUES (?, ?)", (1, 'Alice'))

# Fetch results
cursor.execute("SELECT * FROM test")
results = cursor.fetchall()
print(results)

conn.close()

DataFrame Registration

import pandas as pd

# Create DataFrame
df = pd.DataFrame({
    'id': [1, 2, 3],
    'value': ['a', 'b', 'c']
})

# Register DataFrame with DuckDB (special REGISTER handling)
cursor.execute("REGISTER", ('my_df', df))

# Query the registered DataFrame
cursor.execute("SELECT * FROM my_df")
results = cursor.fetchall()

SQLAlchemy Integration

from sqlalchemy import create_engine

# Engine automatically uses ConnectionWrapper/CursorWrapper
engine = create_engine('duckdb:///:memory:')

with engine.connect() as conn:
    # All operations go through wrapper classes
    result = conn.execute("SELECT 1 as test")
    print(result.fetchone())

Error Handling

from duckdb_engine import DBAPI

try:
    cursor.execute("INVALID SQL")
except DBAPI.ParserException as e:
    print(f"SQL parsing error: {e}")
except DBAPI.Error as e:
    print(f"Database error: {e}")

Connection State Management

conn = ConnectionWrapper(duckdb.connect(':memory:'))

# Check connection state
print(f"Closed: {conn.closed}")
print(f"Notices: {conn.notices}")

# Close connection
conn.close()
print(f"Closed: {conn.closed}")

Install with Tessl CLI

npx tessl i tessl/pypi-duckdb-engine

docs

configuration.md

connection-management.md

core-integration.md

data-types.md

index.md

tile.json