0
# Current Thread Executor
1
2
A specialized executor implementation that runs submitted code in the thread where it was instantiated, enabling async code running in other threads to execute synchronous operations in their originating thread context.
3
4
## Capabilities
5
6
### CurrentThreadExecutor
7
8
An executor that runs code in the thread it was instantiated in, rather than a separate thread pool. This is essential for async-to-sync conversion scenarios where synchronous code must run in the main thread context.
9
10
```python { .api }
11
class CurrentThreadExecutor:
12
def __init__(self, old_executor: CurrentThreadExecutor | None) -> None:
13
"""
14
Initialize the executor for the current thread.
15
16
Args:
17
old_executor: Previous executor in the chain, used for nested contexts
18
"""
19
20
def run_until_future(self, future: Future[Any]) -> None:
21
"""
22
Runs the code in the work queue until a result is available from the future.
23
Must be called from the thread the executor was initialized in.
24
25
Args:
26
future: Future to wait for completion
27
28
Raises:
29
RuntimeError: If called from a different thread than initialization
30
"""
31
32
def submit(
33
self,
34
fn: Callable[_P, _R],
35
/,
36
*args: _P.args,
37
**kwargs: _P.kwargs,
38
) -> Future[_R]:
39
"""
40
Submit a callable to be executed in the executor's thread.
41
Cannot be called from the same thread as the executor.
42
43
Args:
44
fn: Callable to execute
45
*args: Positional arguments for the callable
46
**kwargs: Keyword arguments for the callable
47
48
Returns:
49
Future representing the execution result
50
51
Raises:
52
RuntimeError: If called from the executor's own thread or if executor is broken
53
"""
54
```
55
56
## Usage Examples
57
58
### Basic Thread Execution
59
60
```python
61
import threading
62
from concurrent.futures import Future
63
from asgiref.current_thread_executor import CurrentThreadExecutor
64
65
# Create executor in main thread
66
executor = CurrentThreadExecutor(None)
67
68
def worker_thread():
69
"""Function that runs in a separate thread but needs to execute code in main thread"""
70
def sync_operation():
71
return "executed in main thread"
72
73
# Submit work to be executed in main thread
74
future = executor.submit(sync_operation)
75
return future
76
77
# Start worker thread
78
future_result = None
79
def start_worker():
80
global future_result
81
future_result = worker_thread()
82
83
worker = threading.Thread(target=start_worker)
84
worker.start()
85
86
# In main thread, process the submitted work
87
if future_result:
88
executor.run_until_future(future_result)
89
result = future_result.result()
90
print(result) # "executed in main thread"
91
92
worker.join()
93
```
94
95
### Integration with Async-to-Sync Conversion
96
97
```python
98
import asyncio
99
from asgiref.current_thread_executor import CurrentThreadExecutor
100
from concurrent.futures import Future
101
102
# This is typically used internally by async_to_sync
103
def main_thread_operation():
104
"""Synchronous operation that must run in main thread"""
105
return "main thread result"
106
107
# Create executor in main thread context
108
executor = CurrentThreadExecutor(None)
109
110
async def async_context():
111
"""Async function that needs to call sync code in main thread"""
112
# This would typically be handled by async_to_sync internally
113
future = executor.submit(main_thread_operation)
114
# The main thread would call run_until_future to process this
115
return future
116
117
# Usage pattern (simplified - actual usage is more complex)
118
async def demo():
119
future = await async_context()
120
# Main thread processing would happen here via run_until_future
121
return future
122
```
123
124
## Types
125
126
```python { .api }
127
from typing import Any, Callable, TypeVar
128
from concurrent.futures import Future, Executor
129
130
_T = TypeVar("_T")
131
_P = ParamSpec("_P")
132
_R = TypeVar("_R")
133
134
class _WorkItem:
135
"""Internal work item representation for executor queue"""
136
def __init__(
137
self,
138
future: Future[_R],
139
fn: Callable[_P, _R],
140
*args: _P.args,
141
**kwargs: _P.kwargs,
142
): ...
143
144
def run(self) -> None: ...
145
```
146
147
## Design Notes
148
149
The `CurrentThreadExecutor` is a critical component for asgiref's sync-to-async conversion system. It enables:
150
151
- **Thread Context Preservation**: Ensures synchronous code runs in the expected thread context
152
- **Deadlock Prevention**: Manages execution flow to prevent async/sync conversion deadlocks
153
- **Stack Chaining**: Supports nested executor contexts through the `old_executor` parameter
154
- **Queue Management**: Uses a thread-safe work queue to coordinate cross-thread execution
155
156
This executor is primarily used internally by `async_to_sync` but can be used directly for advanced threading scenarios where precise thread control is required.