CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-asgiref

ASGI specs, helper code, and adapters for bridging synchronous and asynchronous Python web applications

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

current-thread-executor.mddocs/

Current Thread Executor

A specialized executor implementation that runs submitted code in the thread where it was instantiated, enabling async code running in other threads to execute synchronous operations in their originating thread context.

Capabilities

CurrentThreadExecutor

An executor that runs code in the thread it was instantiated in, rather than a separate thread pool. This is essential for async-to-sync conversion scenarios where synchronous code must run in the main thread context.

class CurrentThreadExecutor:
    def __init__(self, old_executor: CurrentThreadExecutor | None) -> None:
        """
        Initialize the executor for the current thread.
        
        Args:
            old_executor: Previous executor in the chain, used for nested contexts
        """
    
    def run_until_future(self, future: Future[Any]) -> None:
        """
        Runs the code in the work queue until a result is available from the future.
        Must be called from the thread the executor was initialized in.
        
        Args:
            future: Future to wait for completion
            
        Raises:
            RuntimeError: If called from a different thread than initialization
        """
    
    def submit(
        self,
        fn: Callable[_P, _R],
        /,
        *args: _P.args,
        **kwargs: _P.kwargs,
    ) -> Future[_R]:
        """
        Submit a callable to be executed in the executor's thread.
        Cannot be called from the same thread as the executor.
        
        Args:
            fn: Callable to execute
            *args: Positional arguments for the callable
            **kwargs: Keyword arguments for the callable
            
        Returns:
            Future representing the execution result
            
        Raises:
            RuntimeError: If called from the executor's own thread or if executor is broken
        """

Usage Examples

Basic Thread Execution

import threading
from concurrent.futures import Future
from asgiref.current_thread_executor import CurrentThreadExecutor

# Create executor in main thread
executor = CurrentThreadExecutor(None)

def worker_thread():
    """Function that runs in a separate thread but needs to execute code in main thread"""
    def sync_operation():
        return "executed in main thread"
    
    # Submit work to be executed in main thread
    future = executor.submit(sync_operation)
    return future

# Start worker thread
future_result = None
def start_worker():
    global future_result
    future_result = worker_thread()

worker = threading.Thread(target=start_worker)
worker.start()

# In main thread, process the submitted work
if future_result:
    executor.run_until_future(future_result)
    result = future_result.result()
    print(result)  # "executed in main thread"

worker.join()

Integration with Async-to-Sync Conversion

import asyncio
from asgiref.current_thread_executor import CurrentThreadExecutor
from concurrent.futures import Future

# This is typically used internally by async_to_sync
def main_thread_operation():
    """Synchronous operation that must run in main thread"""
    return "main thread result"

# Create executor in main thread context
executor = CurrentThreadExecutor(None)

async def async_context():
    """Async function that needs to call sync code in main thread"""
    # This would typically be handled by async_to_sync internally
    future = executor.submit(main_thread_operation)
    # The main thread would call run_until_future to process this
    return future

# Usage pattern (simplified - actual usage is more complex)
async def demo():
    future = await async_context()
    # Main thread processing would happen here via run_until_future
    return future

Types

from typing import Any, Callable, TypeVar
from concurrent.futures import Future, Executor

_T = TypeVar("_T")
_P = ParamSpec("_P") 
_R = TypeVar("_R")

class _WorkItem:
    """Internal work item representation for executor queue"""
    def __init__(
        self,
        future: Future[_R],
        fn: Callable[_P, _R],
        *args: _P.args,
        **kwargs: _P.kwargs,
    ): ...
    
    def run(self) -> None: ...

Design Notes

The CurrentThreadExecutor is a critical component for asgiref's sync-to-async conversion system. It enables:

  • Thread Context Preservation: Ensures synchronous code runs in the expected thread context
  • Deadlock Prevention: Manages execution flow to prevent async/sync conversion deadlocks
  • Stack Chaining: Supports nested executor contexts through the old_executor parameter
  • Queue Management: Uses a thread-safe work queue to coordinate cross-thread execution

This executor is primarily used internally by async_to_sync but can be used directly for advanced threading scenarios where precise thread control is required.

Install with Tessl CLI

npx tessl i tessl/pypi-asgiref

docs

compatibility.md

current-thread-executor.md

index.md

local-storage.md

server-base.md

sync-async.md

testing.md

timeout.md

type-definitions.md

wsgi-integration.md

tile.json