Python library for using asyncio in Qt-based applications
—
Thread pool executor using Qt's QThread for CPU-intensive tasks. QThreadExecutor provides a concurrent.futures.Executor-compatible interface that integrates seamlessly with asyncio's run_in_executor functionality.
Creates and manages a pool of QThread workers for executing blocking or CPU-intensive operations without blocking the main UI thread.
class QThreadExecutor:
"""
ThreadExecutor that produces QThreads.
Same API as concurrent.futures.Executor with Qt integration.
Args:
max_workers: Maximum number of worker threads (default: 10)
stack_size: Stack size for each thread in bytes (auto-detected if None)
"""
def __init__(self, max_workers=10, stack_size=None): ...
import asyncio
import time
from qasync import QEventLoop, QThreadExecutor
def cpu_intensive_task(n: int) -> int:
    """Simulate CPU-intensive work by summing a large range of integers.

    Args:
        n: Workload size; the loop runs ``n * 1_000_000`` iterations.

    Returns:
        The sum of ``range(n * 1_000_000)``; 0 when ``n`` is zero or negative.
    """
    total = 0
    # Deliberately a plain Python loop: the point is to keep a worker busy.
    for i in range(n * 1_000_000):
        total += i
    return total
async def main():
loop = asyncio.get_event_loop()
# Method 1: Use with event loop's run_in_executor
with QThreadExecutor(5) as executor:
result = await loop.run_in_executor(executor, cpu_intensive_task, 100)
print(f"Result: {result}")
# Method 2: Direct submission
executor = QThreadExecutor(3)
try:
future = executor.submit(cpu_intensive_task, 50)
result = future.result() # Blocks until complete
print(f"Direct result: {result}")
finally:
executor.shutdown()Submit callables to be executed in the thread pool, returning Future objects for result retrieval.
def submit(self, callback, *args, **kwargs):
"""
Submit a callable to be executed in a worker thread.
Args:
callback: Callable to execute
*args: Positional arguments for callback
**kwargs: Keyword arguments for callback
Returns:
concurrent.futures.Future: Future representing the execution
Raises:
RuntimeError: If executor has been shut down
"""
from qasync import QThreadExecutor
import time
def blocking_operation(duration: float, message: str) -> str:
    """Block the calling thread for a while, then report completion.

    Args:
        duration: Number of seconds to sleep.
        message: Label included in the returned string.

    Returns:
        The string ``"Completed: <message>"``.
    """
    time.sleep(duration)
    return f"Completed: {message}"
executor = QThreadExecutor(2)
# Submit multiple tasks
future1 = executor.submit(blocking_operation, 1, "Task 1")
future2 = executor.submit(blocking_operation, 2, "Task 2")
# Wait for results
print(future1.result()) # "Completed: Task 1"
print(future2.result()) # "Completed: Task 2"
executor.shutdown()
Control the lifecycle of the thread pool executor, including graceful shutdown and resource cleanup.
def shutdown(self, wait=True):
"""
Shutdown the executor and clean up worker threads.
Args:
wait: If True, wait for all pending tasks to complete
Raises:
RuntimeError: If executor has already been shut down
"""
Use QThreadExecutor as a context manager for automatic resource management.
def __enter__(self):
"""
Context manager entry.
Returns:
QThreadExecutor: Self
Raises:
RuntimeError: If executor has been shutdown
"""
def __exit__(self, *args):
"""Context manager exit - shuts down executor."""
from qasync import QThreadExecutor
def process_data(data):
    """Return a new list containing every element of ``data`` doubled.

    Args:
        data: Iterable of values that support ``* 2``.

    Returns:
        list: The doubled values, in input order.
    """
    # Stands in for a blocking operation in the surrounding example.
    return [x * 2 for x in data]
# Automatic cleanup with context manager
with QThreadExecutor(4) as executor:
futures = []
for i in range(5):
future = executor.submit(process_data, list(range(i * 10, (i + 1) * 10)))
futures.append(future)
# Collect results
results = [future.result() for future in futures]
print("All tasks completed:", len(results))
# Executor is automatically shut down
Seamless integration with asyncio event loops through run_in_executor.
import asyncio
from qasync import QEventLoop, QThreadExecutor
def blocking_computation(n: int) -> int:
    """Return the sum of the squares of the integers in ``range(n)``.

    Args:
        n: Exclusive upper bound of the range to square and sum.

    Returns:
        ``0**2 + 1**2 + ... + (n - 1)**2``; 0 when ``n`` is zero or negative.
    """
    return sum(i * i for i in range(n))
async def async_workflow():
    """Run three blocking computations concurrently on a QThreadExecutor.

    Each call is scheduled through the loop's ``run_in_executor`` and the
    results are collected with ``asyncio.gather``, then printed.
    """
    # get_running_loop() is the preferred accessor inside a coroutine.
    loop = asyncio.get_running_loop()

    # Run blocking operations concurrently
    with QThreadExecutor(3) as executor:
        tasks = [
            loop.run_in_executor(executor, blocking_computation, 1000),
            loop.run_in_executor(executor, blocking_computation, 2000),
            loop.run_in_executor(executor, blocking_computation, 3000),
        ]
        results = await asyncio.gather(*tasks)
        print("Concurrent results:", results)
# Run with Qt event loop
import sys
from PySide6.QtWidgets import QApplication
app = QApplication(sys.argv)
asyncio.run(async_workflow(), loop_factory=QEventLoop)
The executor automatically configures appropriate stack sizes based on the platform:
Custom stack sizes can be specified during initialization:
# Custom stack size (8 MB)
executor = QThreadExecutor(max_workers=5, stack_size=8 * 1024 * 1024)
Each QThreadExecutor manages a fixed pool of QThread workers that:
The executor properly handles and propagates exceptions from worker threads:
from qasync import QThreadExecutor
def failing_task():
    """Always fail, to demonstrate exception propagation from a worker.

    Raises:
        ValueError: Unconditionally, with the message "Something went wrong!".
    """
    raise ValueError("Something went wrong!")
executor = QThreadExecutor(1)
future = executor.submit(failing_task)
try:
result = future.result()
except ValueError as e:
print(f"Task failed: {e}")
executor.shutdown()
def map(self, func, *iterables, timeout=None):
"""
Not implemented - raises NotImplementedError.
Use asyncio.gather with run_in_executor instead.
"""
For mapping operations, use asyncio patterns instead:
import asyncio
from qasync import QThreadExecutor
async def map_with_executor(func, iterable):
loop = asyncio.get_event_loop()
with QThreadExecutor() as executor:
tasks = [loop.run_in_executor(executor, func, item) for item in iterable]
return await asyncio.gather(*tasks)Install with Tessl CLI
npx tessl i tessl/pypi-qasync