asyncio bridge to the standard sqlite3 module
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Extended SQLite functionality including user-defined functions, database backups, progress handlers, extension loading, and database utilities. These features provide advanced capabilities for specialized use cases and database management.
Create custom SQL functions that can be called within SQL statements.
async def create_function(
self,
name: str,
num_params: int,
func: Callable,
deterministic: bool = False
) -> None:
"""
Create user-defined function that can be later used within SQL statements.
Must be run within the same thread that query executions take place,
so execution is deferred to the connection's worker thread.
Parameters:
- name: Function name to use in SQL statements
- num_params: Number of parameters the function accepts (-1 for variable)
- func: Python function to execute when SQL function is called
- deterministic: If True, function is marked as deterministic for optimization
Notes:
- deterministic flag requires SQLite 3.8.3+, raises NotSupportedError on older versions
- Function executes in the database thread, not the main async context
"""
Usage example:
import aiosqlite
import math
async def setup_custom_functions():
async with aiosqlite.connect("database.db") as db:
# Simple mathematical function
def square(x):
    """Return x multiplied by itself; None (SQL NULL) is passed through as None."""
    if x is None:
        return None
    return x * x
await db.create_function("square", 1, square, deterministic=True)
# String manipulation function
def reverse_string(s):
    """Return s with its elements in reverse order; None (SQL NULL) stays None."""
    if s is None:
        return None
    return s[::-1]
await db.create_function("reverse", 1, reverse_string, deterministic=True)
# Multi-argument scalar function: Euclidean distance between two points
def distance(x1, y1, x2, y2):
    """Return the Euclidean distance between points (x1, y1) and (x2, y2).

    Parameters:
    - x1, y1: Coordinates of the first point
    - x2, y2: Coordinates of the second point

    Returns:
    The distance as a float, or None if any coordinate is None (SQL NULL),
    so that NULL inputs propagate the way built-in SQL functions do.
    """
    if any(v is None for v in (x1, y1, x2, y2)):
        return None
    # math.hypot avoids the OverflowError that squaring very large float
    # coordinates by hand would raise.
    return math.hypot(x2 - x1, y2 - y1)
await db.create_function("distance", 4, distance, deterministic=True)
# Test the functions
async with db.execute("SELECT square(5) as result") as cursor:
row = await cursor.fetchone()
print(f"Square of 5: {row[0]}") # 25
async with db.execute("SELECT reverse('hello') as result") as cursor:
row = await cursor.fetchone()
print(f"Reverse of 'hello': {row[0]}") # 'olleh'
# Use in complex queries
await db.execute("""
CREATE TABLE IF NOT EXISTS points (
id INTEGER PRIMARY KEY,
x REAL, y REAL,
name TEXT
)
""")
await db.execute("INSERT INTO points (x, y, name) VALUES (0, 0, 'origin')")
await db.execute("INSERT INTO points (x, y, name) VALUES (3, 4, 'point1')")
await db.commit()
async with db.execute("""
SELECT name, distance(0, 0, x, y) as dist_from_origin
FROM points
ORDER BY dist_from_origin
""") as cursor:
async for row in cursor:
print(f"{row[0]}: distance = {row[1]}")

Create backups of the current database to another database connection.
async def backup(
self,
target: Union["Connection", sqlite3.Connection],
*,
pages: int = 0,
progress: Optional[Callable[[int, int, int], None]] = None,
name: str = "main",
sleep: float = 0.250
) -> None:
"""
Make a backup of the current database to the target database.
Takes either a standard sqlite3 or aiosqlite Connection object as target.
Parameters:
- target: Destination database connection (aiosqlite.Connection or sqlite3.Connection)
- pages: Pages to copy at once (0 = all pages, default: 0)
- progress: Optional callback function called during backup progress
- name: Database name to backup (default: "main")
- sleep: Sleep time between page batches in seconds (default: 0.250)
Notes:
- Progress callback receives (status, remaining, total) parameters
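- With pages > 0 the backup proceeds in batches of that many pages, allowing concurrent access between batches; with the default pages=0 the entire database is copied in a single step
- The sleep interval between batches helps prevent blocking other operations
"""
Usage example: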
import aiosqlite
import sqlite3
async def backup_database():
# Source database
async with aiosqlite.connect("production.db") as source_db:
# Backup to another aiosqlite connection
async with aiosqlite.connect("backup.db") as backup_db:
def progress_callback(status, remaining, total):
    """Print how many pages of the backup have been copied so far.

    Parameters:
    - status: Status value passed by the backup (not used here)
    - remaining: Number of pages still to be copied
    - total: Total number of pages in the backup
    """
    pages_done = total - remaining
    print(f"Backup progress: {pages_done}/{total} pages")
await source_db.backup(
backup_db,
pages=100, # Copy 100 pages at a time
progress=progress_callback,
sleep=0.1 # Brief pause between batches
)
print("Backup to aiosqlite connection completed")
# Backup to standard sqlite3 connection
sqlite_conn = sqlite3.connect("backup2.db")
try:
await source_db.backup(sqlite_conn)
print("Backup to sqlite3 connection completed")
finally:
sqlite_conn.close()

Export database structure and data as SQL statements.
async def iterdump(self) -> AsyncIterator[str]:
"""
Return an async iterator to dump the database in SQL text format.
Generates SQL statements that can recreate the database structure
and data. Each iteration yields a single SQL statement.
Returns:
AsyncIterator[str]: Async iterator yielding SQL statements
Usage:
async for line in db.iterdump():
print(line)
"""
Usage example:
import aiosqlite
async def dump_database():
async with aiosqlite.connect("database.db") as db:
# Dump to file
with open("database_dump.sql", "w") as f:
f.write("-- Database dump generated by aiosqlite\n")
async for line in db.iterdump():
f.write(line + "\n")
print("Database dumped to database_dump.sql")
# Print schema only (filter out INSERT statements)
print("\nDatabase schema:")
async for line in db.iterdump():
if not line.strip().startswith("INSERT"):
print(line)

Set up progress callbacks for long-running operations.
async def set_progress_handler(
self,
handler: Callable[[], Optional[int]],
n: int
) -> None:
"""
Set progress handler callback for long-running operations.
The handler is called every n virtual machine instructions during
SQL statement execution. Can be used to provide progress feedback
or to interrupt long-running queries.
Parameters:
- handler: Callback function called during execution
- n: Number of VM instructions between handler calls
Notes:
- Handler returning non-zero interrupts the operation
- Useful for providing user feedback or implementing timeouts
"""
Usage example:
import aiosqlite
import time
async def long_running_operation_with_progress():
async with aiosqlite.connect("database.db") as db:
# Monotonic clock: elapsed-time measurement is unaffected by wall-clock changes.
start_time = time.monotonic()
# Elapsed seconds at which the next progress line is due. Kept in a mutable
# container so the handler can update it without a nonlocal/global statement.
progress_state = {"next_report": 5.0}

def progress_handler():
    """Interrupt the running statement after 30 seconds; report progress about every 5 seconds.

    The handler is invoked every n virtual machine instructions, which can be
    many times per second. Progress output is therefore throttled with a
    "next report due" threshold; testing whether the current whole second is a
    multiple of 5 would print on every call during that second (including the
    very first second of the operation).

    Returns:
    1 to interrupt the running operation, 0 to let it continue.
    """
    elapsed = time.monotonic() - start_time
    if elapsed > 30:  # Timeout after 30 seconds
        print("Operation timeout - interrupting")
        return 1  # Non-zero return interrupts operation
    if elapsed >= progress_state["next_report"]:  # Progress every 5 seconds
        print(f"Operation running for {elapsed:.1f} seconds...")
        progress_state["next_report"] = elapsed + 5.0
    return 0  # Continue operation
# Set progress handler to be called every 1000 VM instructions
await db.set_progress_handler(progress_handler, 1000)
try:
# Long-running operation
await db.execute("""
CREATE TABLE large_table AS
WITH RECURSIVE series(x) AS (
SELECT 1 UNION ALL SELECT x+1 FROM series WHERE x < 1000000
)
SELECT x as id, 'data_' || x as value FROM series
""")
await db.commit()
print("Large table created successfully")
except aiosqlite.OperationalError as e:
if "interrupted" in str(e):
print("Operation was interrupted by progress handler")
else:
raise

Set up trace callbacks for debugging and monitoring SQL execution.
async def set_trace_callback(self, handler: Callable) -> None:
"""
Set trace callback handler for SQL statement execution.
The handler is called with each SQL statement before execution,
useful for debugging, logging, or performance monitoring.
Parameters:
- handler: Callback function receiving SQL statement as parameter
Notes:
- Handler receives the SQL statement string as its only parameter
- Called for every SQL statement executed on this connection
- Useful for debugging and performance analysis
"""
Usage example:
import aiosqlite
import time
async def trace_sql_execution():
async with aiosqlite.connect("database.db") as db:
# Set up SQL tracing
def trace_handler(sql_statement):
    """Print the traced SQL statement, stripped of surrounding whitespace,
    prefixed with the current local time."""
    now = time.localtime()
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S", now)
    print(f"[{timestamp}] SQL: {sql_statement.strip()}")
await db.set_trace_callback(trace_handler)
# Now all SQL statements will be traced
await db.execute("CREATE TABLE IF NOT EXISTS test (id INTEGER, name TEXT)")
await db.execute("INSERT INTO test VALUES (1, 'Alice')")
await db.execute("INSERT INTO test VALUES (2, 'Bob')")
await db.commit()
async with db.execute("SELECT * FROM test ORDER BY name") as cursor:
async for row in cursor:
print(f"Row: {row}")

Load SQLite extensions for additional functionality.
async def enable_load_extension(self, value: bool) -> None:
"""
Enable or disable loading of SQLite extensions.
Parameters:
- value: True to enable extension loading, False to disable
Notes:
- Extension loading is disabled by default for security
- Must be enabled before calling load_extension()
- Should be disabled after loading required extensions
"""
async def load_extension(self, path: str):
"""
Load an extension from the specified path.
Parameters:
- path: Path to the extension file (.so on Unix, .dll on Windows)
Notes:
- Extension loading must be enabled first with enable_load_extension(True)
- Extensions provide additional SQL functions and capabilities
- Common extensions include FTS (full-text search), spatial functions
"""
Usage example:
import aiosqlite
async def load_extensions():
async with aiosqlite.connect("database.db") as db:
try:
# Enable extension loading
await db.enable_load_extension(True)
# Load extension (example path - actual path varies by system)
# await db.load_extension("/usr/lib/sqlite3/libspatialite.so")
# Extensions would provide additional functions
# async with db.execute("SELECT ST_Distance(point1, point2) FROM locations") as cursor:
# ...
except aiosqlite.OperationalError as e:
print(f"Extension loading failed: {e}")
finally:
# Disable extension loading for security
await db.enable_load_extension(False)

Interrupt long-running database operations.
async def interrupt(self) -> None:
"""
Interrupt pending queries.
Calls sqlite3.Connection.interrupt() to cancel long-running operations.
Operations may still complete normally if interruption occurs too late
in the execution process.
Notes:
- Safe to call from any thread or coroutine
- May not immediately stop all operations
- Interrupted operations raise OperationalError with "interrupted" message
"""
Usage example:
import aiosqlite
import asyncio
async def interruptible_operation():
async with aiosqlite.connect("database.db") as db:
# Start a long-running operation
async def long_query():
try:
await db.execute("""
WITH RECURSIVE huge_series(x) AS (
SELECT 1 UNION ALL
SELECT x+1 FROM huge_series WHERE x < 10000000
)
SELECT COUNT(*) FROM huge_series
""")
print("Query completed normally")
except aiosqlite.OperationalError as e:
if "interrupted" in str(e):
print("Query was interrupted")
else:
raise
# Start the query
query_task = asyncio.create_task(long_query())
# Interrupt after 2 seconds
await asyncio.sleep(2)
await db.interrupt()
# Wait for query to complete (either normally or with interruption)
await query_task

Install with Tessl CLI
npx tessl i tessl/pypi-aiosqlite