ASGI specs, helper code, and adapters for bridging synchronous and asynchronous Python web applications
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
A specialized executor implementation that runs submitted code in the thread where it was instantiated, enabling async code running in other threads to execute synchronous operations in their originating thread context.
An executor that runs code in the thread it was instantiated in, rather than a separate thread pool. This is essential for async-to-sync conversion scenarios where synchronous code must run in the main thread context.
class CurrentThreadExecutor:
def __init__(self, old_executor: CurrentThreadExecutor | None) -> None:
"""
Initialize the executor for the current thread.
Args:
old_executor: Previous executor in the chain, used for nested contexts
"""
def run_until_future(self, future: Future[Any]) -> None:
"""
Runs the code in the work queue until a result is available from the future.
Must be called from the thread the executor was initialized in.
Args:
future: Future to wait for completion
Raises:
RuntimeError: If called from a different thread than initialization
"""
def submit(
self,
fn: Callable[_P, _R],
/,
*args: _P.args,
**kwargs: _P.kwargs,
) -> Future[_R]:
"""
Submit a callable to be executed in the executor's thread.
Cannot be called from the same thread as the executor.
Args:
fn: Callable to execute
*args: Positional arguments for the callable
**kwargs: Keyword arguments for the callable
Returns:
Future representing the execution result
Raises:
RuntimeError: If called from the executor's own thread or if executor is broken
"""import threading
from concurrent.futures import Future
from asgiref.current_thread_executor import CurrentThreadExecutor
# Create executor in main thread
executor = CurrentThreadExecutor(None)
def worker_thread():
"""Function that runs in a separate thread but needs to execute code in main thread"""
def sync_operation():
return "executed in main thread"
# Submit work to be executed in main thread
future = executor.submit(sync_operation)
return future
# Start worker thread
future_result = None
def start_worker():
global future_result
future_result = worker_thread()
worker = threading.Thread(target=start_worker)
worker.start()
# In main thread, process the submitted work
if future_result:
executor.run_until_future(future_result)
result = future_result.result()
print(result) # "executed in main thread"
worker.join()import asyncio
from asgiref.current_thread_executor import CurrentThreadExecutor
from concurrent.futures import Future
# This is typically used internally by async_to_sync
def main_thread_operation():
"""Synchronous operation that must run in main thread"""
return "main thread result"
# Create executor in main thread context
executor = CurrentThreadExecutor(None)
async def async_context():
"""Async function that needs to call sync code in main thread"""
# This would typically be handled by async_to_sync internally
future = executor.submit(main_thread_operation)
# The main thread would call run_until_future to process this
return future
# Usage pattern (simplified - actual usage is more complex)
async def demo():
future = await async_context()
# Main thread processing would happen here via run_until_future
return futurefrom typing import Any, Callable, TypeVar
from concurrent.futures import Future, Executor
_T = TypeVar("_T")
_P = ParamSpec("_P")
_R = TypeVar("_R")
class _WorkItem:
"""Internal work item representation for executor queue"""
def __init__(
self,
future: Future[_R],
fn: Callable[_P, _R],
*args: _P.args,
**kwargs: _P.kwargs,
): ...
def run(self) -> None: ...The CurrentThreadExecutor is a critical component for asgiref's sync-to-async conversion system. It enables:
old_executor parameterThis executor is primarily used internally by async_to_sync but can be used directly for advanced threading scenarios where precise thread control is required.
Install with Tessl CLI
npx tessl i tessl/pypi-asgiref