CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-aiosqlite

asyncio bridge to the standard sqlite3 module

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

advanced.mddocs/

Advanced Features

Extended SQLite functionality including user-defined functions, database backups, progress handlers, extension loading, and database utilities. These features provide advanced capabilities for specialized use cases and database management.

Capabilities

User-Defined Functions

Create custom SQL functions that can be called within SQL statements.

async def create_function(
    self,
    name: str,
    num_params: int,
    func: Callable,
    deterministic: bool = False
) -> None:
    """
    Create user-defined function that can be later used within SQL statements.
    
    Must be run within the same thread that query executions take place,
    so execution is deferred to the connection's worker thread.
    
    Parameters:
    - name: Function name to use in SQL statements
    - num_params: Number of parameters the function accepts (-1 for variable)
    - func: Python function to execute when SQL function is called
    - deterministic: If True, function is marked as deterministic for optimization
    
    Notes:
    - deterministic flag requires SQLite 3.8.3+, raises NotSupportedError on older versions
    - Function executes in the database thread, not the main async context
    """

Usage example:

import aiosqlite
import math

async def setup_custom_functions():
    async with aiosqlite.connect("database.db") as db:
        # Simple mathematical function
        def square(x):
            return x * x if x is not None else None
        
        await db.create_function("square", 1, square, deterministic=True)
        
        # String manipulation function  
        def reverse_string(s):
            return s[::-1] if s is not None else None
            
        await db.create_function("reverse", 1, reverse_string, deterministic=True)
        
        # Aggregate-like function (though SQLite handles aggregation)
        def distance(x1, y1, x2, y2):
            if any(v is None for v in [x1, y1, x2, y2]):
                return None
            return math.sqrt((x2 - x1)**2 + (y2 - y1)**2)
        
        await db.create_function("distance", 4, distance, deterministic=True)
        
        # Test the functions
        async with db.execute("SELECT square(5) as result") as cursor:
            row = await cursor.fetchone()
            print(f"Square of 5: {row[0]}")  # 25
        
        async with db.execute("SELECT reverse('hello') as result") as cursor:
            row = await cursor.fetchone()
            print(f"Reverse of 'hello': {row[0]}")  # 'olleh'
        
        # Use in complex queries
        await db.execute("""
            CREATE TABLE IF NOT EXISTS points (
                id INTEGER PRIMARY KEY,
                x REAL, y REAL, 
                name TEXT
            )
        """)
        
        await db.execute("INSERT INTO points (x, y, name) VALUES (0, 0, 'origin')")
        await db.execute("INSERT INTO points (x, y, name) VALUES (3, 4, 'point1')")
        await db.commit()
        
        async with db.execute("""
            SELECT name, distance(0, 0, x, y) as dist_from_origin 
            FROM points 
            ORDER BY dist_from_origin
        """) as cursor:
            async for row in cursor:
                print(f"{row[0]}: distance = {row[1]}")

Database Backup

Create backups of the current database to another database connection.

async def backup(
    self,
    target: Union["Connection", sqlite3.Connection],
    *,
    pages: int = 0,
    progress: Optional[Callable[[int, int, int], None]] = None,
    name: str = "main",
    sleep: float = 0.250
) -> None:
    """
    Make a backup of the current database to the target database.
    
    Takes either a standard sqlite3 or aiosqlite Connection object as target.
    
    Parameters:
    - target: Destination database connection (aiosqlite.Connection or sqlite3.Connection)
    - pages: Pages to copy at once (0 = all pages, default: 0)
    - progress: Optional callback function called during backup progress
    - name: Database name to backup (default: "main")
    - sleep: Sleep time between page batches in seconds (default: 0.250)
    
    Notes:
    - Progress callback receives (status, remaining, total) parameters
    - Backup is performed page by page to allow concurrent access
    - Sleep parameter helps prevent blocking other operations
    """

Usage example:

import aiosqlite
import sqlite3

async def backup_database():
    # Source database
    async with aiosqlite.connect("production.db") as source_db:
        # Backup to another aiosqlite connection
        async with aiosqlite.connect("backup.db") as backup_db:
            def progress_callback(status, remaining, total):
                print(f"Backup progress: {total - remaining}/{total} pages")
            
            await source_db.backup(
                backup_db,
                pages=100,  # Copy 100 pages at a time
                progress=progress_callback,
                sleep=0.1   # Brief pause between batches
            )
            print("Backup to aiosqlite connection completed")
        
        # Backup to standard sqlite3 connection
        sqlite_conn = sqlite3.connect("backup2.db")
        try:
            await source_db.backup(sqlite_conn)
            print("Backup to sqlite3 connection completed")
        finally:
            sqlite_conn.close()

Database Dump

Export database structure and data as SQL statements.

async def iterdump(self) -> AsyncIterator[str]:
    """
    Return an async iterator to dump the database in SQL text format.
    
    Generates SQL statements that can recreate the database structure
    and data. Each iteration yields a single SQL statement.
    
    Returns:
    AsyncIterator[str]: Async iterator yielding SQL statements
    
    Usage:
    async for line in db.iterdump():
        print(line)
    """

Usage example:

import aiosqlite

async def dump_database():
    async with aiosqlite.connect("database.db") as db:
        # Dump to file
        with open("database_dump.sql", "w") as f:
            f.write("-- Database dump generated by aiosqlite\n")
            async for line in db.iterdump():
                f.write(line + "\n")
        
        print("Database dumped to database_dump.sql")
        
        # Print schema only (filter out INSERT statements)
        print("\nDatabase schema:")
        async for line in db.iterdump():
            if not line.strip().startswith("INSERT"):
                print(line)

Progress Monitoring

Set up progress callbacks for long-running operations.

async def set_progress_handler(
    self,
    handler: Callable[[], Optional[int]],
    n: int
) -> None:
    """
    Set progress handler callback for long-running operations.
    
    The handler is called every n virtual machine instructions during
    SQL statement execution. Can be used to provide progress feedback
    or to interrupt long-running queries.
    
    Parameters:
    - handler: Callback function called during execution
    - n: Number of VM instructions between handler calls
    
    Notes:
    - Handler returning non-zero interrupts the operation
    - Useful for providing user feedback or implementing timeouts
    """

Usage example:

import aiosqlite
import time

async def long_running_operation_with_progress():
    async with aiosqlite.connect("database.db") as db:
        start_time = time.time()
        
        def progress_handler():
            elapsed = time.time() - start_time
            if elapsed > 30:  # Timeout after 30 seconds
                print("Operation timeout - interrupting")
                return 1  # Non-zero return interrupts operation
            
            if int(elapsed) % 5 == 0:  # Progress every 5 seconds
                print(f"Operation running for {elapsed:.1f} seconds...")
            
            return 0  # Continue operation
        
        # Set progress handler to be called every 1000 VM instructions
        await db.set_progress_handler(progress_handler, 1000)
        
        try:
            # Long-running operation
            await db.execute("""
                CREATE TABLE large_table AS 
                WITH RECURSIVE series(x) AS (
                    SELECT 1 UNION ALL SELECT x+1 FROM series WHERE x < 1000000
                )
                SELECT x as id, 'data_' || x as value FROM series
            """)
            await db.commit()
            print("Large table created successfully")
            
        except aiosqlite.OperationalError as e:
            if "interrupted" in str(e):
                print("Operation was interrupted by progress handler")
            else:
                raise

Query Tracing

Set up trace callbacks for debugging and monitoring SQL execution.

async def set_trace_callback(self, handler: Callable) -> None:
    """
    Set trace callback handler for SQL statement execution.
    
    The handler is called with each SQL statement before execution,
    useful for debugging, logging, or performance monitoring.
    
    Parameters:
    - handler: Callback function receiving SQL statement as parameter
    
    Notes:
    - Handler receives the SQL statement string as its only parameter
    - Called for every SQL statement executed on this connection
    - Useful for debugging and performance analysis
    """

Usage example:

import aiosqlite
import time

async def trace_sql_execution():
    async with aiosqlite.connect("database.db") as db:
        # Set up SQL tracing
        def trace_handler(sql_statement):
            timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
            print(f"[{timestamp}] SQL: {sql_statement.strip()}")
        
        await db.set_trace_callback(trace_handler)
        
        # Now all SQL statements will be traced
        await db.execute("CREATE TABLE IF NOT EXISTS test (id INTEGER, name TEXT)")
        await db.execute("INSERT INTO test VALUES (1, 'Alice')")
        await db.execute("INSERT INTO test VALUES (2, 'Bob')")
        await db.commit()
        
        async with db.execute("SELECT * FROM test ORDER BY name") as cursor:
            async for row in cursor:
                print(f"Row: {row}")

Extension Loading

Load SQLite extensions for additional functionality.

async def enable_load_extension(self, value: bool) -> None:
    """
    Enable or disable loading of SQLite extensions.
    
    Parameters:
    - value: True to enable extension loading, False to disable
    
    Notes:
    - Extension loading is disabled by default for security
    - Must be enabled before calling load_extension()
    - Should be disabled after loading required extensions
    """

async def load_extension(self, path: str):
    """
    Load an extension from the specified path.
    
    Parameters:
    - path: Path to the extension file (.so on Unix, .dll on Windows)
    
    Notes:
    - Extension loading must be enabled first with enable_load_extension(True)
    - Extensions provide additional SQL functions and capabilities
    - Common extensions include FTS (full-text search), spatial functions
    """

Usage example:

import aiosqlite

async def load_extensions():
    async with aiosqlite.connect("database.db") as db:
        try:
            # Enable extension loading
            await db.enable_load_extension(True)
            
            # Load extension (example path - actual path varies by system)
            # await db.load_extension("/usr/lib/sqlite3/libspatialite.so")
            
            # Extensions would provide additional functions
            # async with db.execute("SELECT ST_Distance(point1, point2) FROM locations") as cursor:
            #     ...
            
        except aiosqlite.OperationalError as e:
            print(f"Extension loading failed: {e}")
        finally:
            # Disable extension loading for security
            await db.enable_load_extension(False)

Connection Interruption

Interrupt long-running database operations.

async def interrupt(self) -> None:
    """
    Interrupt pending queries.
    
    Calls sqlite3.Connection.interrupt() to cancel long-running operations.
    Operations may still complete normally if interruption occurs too late
    in the execution process.
    
    Notes:
    - Safe to call from any thread or coroutine
    - May not immediately stop all operations
    - Interrupted operations raise OperationalError with "interrupted" message
    """

Usage example:

import aiosqlite
import asyncio

async def interruptible_operation():
    async with aiosqlite.connect("database.db") as db:
        # Start a long-running operation
        async def long_query():
            try:
                await db.execute("""
                    WITH RECURSIVE huge_series(x) AS (
                        SELECT 1 UNION ALL 
                        SELECT x+1 FROM huge_series WHERE x < 10000000
                    )
                    SELECT COUNT(*) FROM huge_series
                """)
                print("Query completed normally")
            except aiosqlite.OperationalError as e:
                if "interrupted" in str(e):
                    print("Query was interrupted")
                else:
                    raise
        
        # Start the query
        query_task = asyncio.create_task(long_query())
        
        # Interrupt after 2 seconds
        await asyncio.sleep(2)
        await db.interrupt()
        
        # Wait for query to complete (either normally or with interruption)
        await query_task

Install with Tessl CLI

npx tessl i tessl/pypi-aiosqlite

docs

advanced.md

connection.md

index.md

queries.md

transactions.md

tile.json