
tessl/pypi-qasync

Python library for using asyncio in Qt-based applications


Thread Execution

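Thread pool executor built on Qt's QThread for running blocking or CPU-intensive tasks off the main thread. QThreadExecutor provides a concurrent.futures.Executor-style interface (submit, shutdown, and context manager support; map is not implemented, see Limitations) and can be passed directly to asyncio's run_in_executor.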

Capabilities

Thread Pool Management

Creates and manages a pool of QThread workers for executing blocking or CPU-intensive operations without blocking the main UI thread.

class QThreadExecutor:
    """
    ThreadExecutor that produces QThreads.
    
    Same API as concurrent.futures.Executor with Qt integration.
    
    Args:
        max_workers: Maximum number of worker threads (default: 10)
        stack_size: Stack size for each thread in bytes (auto-detected if None)
    """
    def __init__(self, max_workers=10, stack_size=None): ...

Usage Example

import asyncio
from qasync import QThreadExecutor

def cpu_intensive_task(n):
    # Simulate CPU-intensive work
    total = 0
    for i in range(n * 1000000):
        total += i
    return total

async def main():
    loop = asyncio.get_running_loop()
    
    # Method 1: Use with event loop's run_in_executor
    with QThreadExecutor(5) as executor:
        result = await loop.run_in_executor(executor, cpu_intensive_task, 100)
        print(f"Result: {result}")
    
    # Method 2: Direct submission
    executor = QThreadExecutor(3)
    try:
        future = executor.submit(cpu_intensive_task, 50)
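        # result() blocks the calling thread until the task completes; inside a
        # coroutine this stalls the event loop, so prefer Method 1 there
        result = future.result()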
        print(f"Direct result: {result}")
    finally:
        executor.shutdown()

Task Submission

Submit callables to be executed in the thread pool, returning Future objects for result retrieval.

def submit(self, callback, *args, **kwargs):
    """
    Submit a callable to be executed in a worker thread.
    
    Args:
        callback: Callable to execute
        *args: Positional arguments for callback
        **kwargs: Keyword arguments for callback
        
    Returns:
        concurrent.futures.Future: Future representing the execution
        
    Raises:
        RuntimeError: If executor has been shutdown
    """

Usage Example

from qasync import QThreadExecutor
import time

def blocking_operation(duration, message):
    time.sleep(duration)
    return f"Completed: {message}"

executor = QThreadExecutor(2)

# Submit multiple tasks
future1 = executor.submit(blocking_operation, 1, "Task 1")
future2 = executor.submit(blocking_operation, 2, "Task 2")

# Wait for results
print(future1.result())  # "Completed: Task 1"
print(future2.result())  # "Completed: Task 2"

executor.shutdown()
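
Since submit returns concurrent.futures.Future objects, the standard library helper concurrent.futures.as_completed should be able to consume them. The sketch below collects results in the order tasks finish rather than the order they were submitted. It uses the standard library's ThreadPoolExecutor as a stand-in so it runs without Qt; substituting QThreadExecutor(2) is an assumption based on the "same API" claim above, and results_in_completion_order is a hypothetical helper name.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def blocking_operation(duration, message):
    time.sleep(duration)
    return f"Completed: {message}"


def results_in_completion_order(executor, jobs):
    """Submit (duration, message) jobs and return results as they finish."""
    futures = [executor.submit(blocking_operation, d, m) for d, m in jobs]
    return [future.result() for future in as_completed(futures)]


# Stand-in executor (assumption): use QThreadExecutor(2) inside a Qt application.
with ThreadPoolExecutor(max_workers=2) as executor:
    finished = results_in_completion_order(
        executor, [(0.3, "slow"), (0.05, "fast")]
    )
```

Both jobs start at once because two workers are available, so the shorter job is reported first.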

Executor Lifecycle

Control the lifecycle of the thread pool executor, including graceful shutdown and resource cleanup.

def shutdown(self, wait=True):
    """
    Shutdown the executor and clean up worker threads.
    
    Args:
        wait: If True, wait for all pending tasks to complete
        
    Raises:
        RuntimeError: If executor has already been shutdown
    """

Context Manager Support

Use QThreadExecutor as a context manager for automatic resource management.

def __enter__(self):
    """
    Context manager entry.
    
    Returns:
        QThreadExecutor: Self
        
    Raises:
        RuntimeError: If executor has been shutdown
    """

def __exit__(self, *args):
    """Context manager exit - shuts down executor."""

Usage Example

from qasync import QThreadExecutor

def process_data(data):
    # Process data (blocking operation)
    return [x * 2 for x in data]

# Automatic cleanup with context manager
with QThreadExecutor(4) as executor:
    futures = []
    for i in range(5):
        future = executor.submit(process_data, list(range(i * 10, (i + 1) * 10)))
        futures.append(future)
    
    # Collect results
    results = [future.result() for future in futures]
    print("All tasks completed:", len(results))
# Executor is automatically shutdown
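
shutdown is documented above as raising RuntimeError when the executor has already been shut down, and __exit__ shuts the executor down, so a manual shutdown() inside a with block would presumably make the block's exit raise. One way to guard code paths that may shut down twice is sketched below. StubExecutor is a stand-in that mimics only the documented shutdown contract so the example runs without Qt, and shutdown_once is a hypothetical helper, not part of qasync.

```python
class StubExecutor:
    """Stand-in (assumption) mimicking only the documented shutdown contract."""

    def __init__(self):
        self._shut_down = False

    def shutdown(self, wait=True):
        if self._shut_down:
            raise RuntimeError("executor has been shutdown")
        self._shut_down = True


def shutdown_once(executor, wait=True):
    """Shut down `executor`; return False if it was already shut down."""
    try:
        executor.shutdown(wait=wait)
    except RuntimeError:
        return False
    return True


executor = StubExecutor()
first = shutdown_once(executor)   # performs the shutdown
second = shutdown_once(executor)  # swallows the documented RuntimeError
```

Catching RuntimeError this broadly can hide unrelated failures, so a helper like this is best kept to teardown code.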

Asyncio Integration

Pass a QThreadExecutor to loop.run_in_executor to await blocking functions from coroutines without stalling the event loop.

Usage Example

import asyncio
from qasync import QEventLoop, QThreadExecutor

def blocking_computation(n):
    result = sum(i * i for i in range(n))
    return result

async def async_workflow():
    loop = asyncio.get_running_loop()
    
    # Run blocking operations concurrently
    with QThreadExecutor(3) as executor:
        tasks = [
            loop.run_in_executor(executor, blocking_computation, 1000),
            loop.run_in_executor(executor, blocking_computation, 2000),
            loop.run_in_executor(executor, blocking_computation, 3000),
        ]
        
        results = await asyncio.gather(*tasks)
        print("Concurrent results:", results)

# Run with Qt event loop (the loop_factory argument of asyncio.run requires Python 3.12+)
import sys
from PySide6.QtWidgets import QApplication

app = QApplication(sys.argv)
asyncio.run(async_workflow(), loop_factory=QEventLoop)
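
run_in_executor forwards positional arguments only, so a blocking function that needs keyword arguments has to be wrapped first, typically with functools.partial. The sketch below shows that pattern. It runs on a plain asyncio loop with ThreadPoolExecutor standing in for QThreadExecutor (an assumption, so the example does not need Qt); scale and scaled_in_executor are hypothetical names.

```python
import asyncio
import functools
from concurrent.futures import ThreadPoolExecutor


def scale(values, factor=1):
    """Blocking helper (hypothetical) that takes a keyword argument."""
    return [v * factor for v in values]


async def scaled_in_executor(executor, values, factor):
    loop = asyncio.get_running_loop()
    # Bind the keyword argument up front; run_in_executor only passes *args.
    call = functools.partial(scale, values, factor=factor)
    return await loop.run_in_executor(executor, call)


# Stand-in executor and default asyncio loop (assumption), so this runs without Qt.
with ThreadPoolExecutor(max_workers=1) as executor:
    scaled = asyncio.run(scaled_in_executor(executor, [1, 2, 3], factor=10))
```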

Thread Configuration

Stack Size Management

When stack_size is None, the executor chooses a stack size based on the platform:

  • macOS: 16 MB stack size
  • FreeBSD: 4 MB stack size
  • AIX: 2 MB stack size
  • Other platforms: Uses system default

Custom stack sizes can be specified during initialization:

# Custom stack size (8 MB)
executor = QThreadExecutor(max_workers=5, stack_size=8 * 1024 * 1024)
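
The platform defaults listed above can be written out as a small lookup. This is only an illustration of the table: default_stack_size is a hypothetical function, and matching on sys.platform prefixes is an assumption about how such detection might be done.

```python
import sys

MIB = 1024 * 1024


def default_stack_size(platform=sys.platform):
    """Return the documented default stack size in bytes, or None."""
    if platform.startswith("darwin"):  # macOS
        return 16 * MIB
    if platform.startswith("freebsd"):
        return 4 * MIB
    if platform.startswith("aix"):
        return 2 * MIB
    return None  # other platforms: keep the system default
```

A return value of None corresponds to leaving the thread's stack size untouched.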

Worker Thread Management

Each QThreadExecutor manages a fixed pool of QThread workers that:

  • Start immediately upon executor creation
  • Process tasks from a shared queue
  • Handle exceptions and propagate them through Future objects
  • Stop and release their resources when the executor shuts down; with wait=True, shutdown blocks until they have finished
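
The worker behavior described in the list above can be illustrated with a queue-driven loop. The sketch uses threading.Thread in place of QThread and a None sentinel as the stop signal; both are assumptions made to keep it runnable without Qt, and the code is not qasync's actual implementation. worker_loop and submit are hypothetical names.

```python
import queue
import threading
from concurrent.futures import Future


def worker_loop(tasks):
    """Run queued tasks until a None sentinel arrives."""
    while True:
        item = tasks.get()
        if item is None:  # stop signal
            break
        future, callback, args, kwargs = item
        if not future.set_running_or_notify_cancel():
            continue  # the future was cancelled before it started
        try:
            future.set_result(callback(*args, **kwargs))
        except Exception as exc:
            future.set_exception(exc)  # re-raised later by future.result()


def submit(tasks, callback, *args, **kwargs):
    """Queue a call and return the Future that will hold its outcome."""
    future = Future()
    tasks.put((future, callback, args, kwargs))
    return future


tasks = queue.Queue()
workers = [threading.Thread(target=worker_loop, args=(tasks,)) for _ in range(2)]
for w in workers:
    w.start()  # workers start right away, before any task is queued

ok = submit(tasks, pow, 2, 10)
bad = submit(tasks, int, "not a number")
ok_result = ok.result()
bad_error = bad.exception()

for _ in workers:
    tasks.put(None)  # one sentinel per worker
for w in workers:
    w.join()  # comparable to shutdown(wait=True)
```

Because the sentinels are queued behind any pending work, tasks submitted before shutdown still run to completion in this design.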

Error Handling

An exception raised inside a task is captured in the worker thread and re-raised in the caller when the Future's result() is called:

from qasync import QThreadExecutor

def failing_task():
    raise ValueError("Something went wrong!")

executor = QThreadExecutor(1)
future = executor.submit(failing_task)

try:
    result = future.result()
except ValueError as e:
    print(f"Task failed: {e}")

executor.shutdown()
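
When several tasks are awaited together through run_in_executor, asyncio.gather with return_exceptions=True returns failures alongside successes instead of raising on the first error. The sketch below shows that pattern, again with ThreadPoolExecutor and a plain asyncio loop as stand-ins for QThreadExecutor and QEventLoop (an assumption); parse_port and parse_all are hypothetical names.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def parse_port(text):
    """Blocking helper (hypothetical) that raises ValueError on bad input."""
    return int(text)


async def parse_all(executor, texts):
    loop = asyncio.get_running_loop()
    tasks = [loop.run_in_executor(executor, parse_port, t) for t in texts]
    # Exceptions are returned as values, in the same order as the inputs.
    return await asyncio.gather(*tasks, return_exceptions=True)


# Stand-in executor and default asyncio loop (assumption), so this runs without Qt.
with ThreadPoolExecutor(max_workers=2) as executor:
    outcomes = asyncio.run(parse_all(executor, ["8080", "oops"]))
```

The caller can then separate results from errors with an isinstance check on each entry.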

Limitations

def map(self, func, *iterables, timeout=None):
    """
    Not implemented - raises NotImplementedError.
    Use asyncio.gather with run_in_executor instead.
    """

For mapping operations, use asyncio patterns instead:

import asyncio
from qasync import QThreadExecutor

async def map_with_executor(func, iterable):
    loop = asyncio.get_running_loop()
    with QThreadExecutor() as executor:
        tasks = [loop.run_in_executor(executor, func, item) for item in iterable]
        return await asyncio.gather(*tasks)
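
Outside a coroutine, an ordered map can also be approximated with submit alone. The sketch below is one such substitute; map_with_submit is a hypothetical helper, and ThreadPoolExecutor stands in for QThreadExecutor (an assumption) so the example runs without Qt.

```python
from concurrent.futures import ThreadPoolExecutor


def map_with_submit(executor, func, iterable):
    """Apply func to each item via submit(); results keep input order."""
    futures = [executor.submit(func, item) for item in iterable]
    return [future.result() for future in futures]


# Stand-in executor (assumption): use QThreadExecutor(3) inside a Qt application.
with ThreadPoolExecutor(max_workers=3) as executor:
    squares = map_with_submit(executor, lambda x: x * x, range(5))
```

This version blocks the calling thread until every task has finished, so it is unsuitable for calling from a coroutine running on the Qt event loop.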
