0
# Thread-Safe Storage
1
2
Async-aware local storage that works correctly with both threading and asyncio, replacing threading.local for async applications. This provides context isolation that respects both thread boundaries and async task boundaries.
3
4
## Capabilities
5
6
### Local Storage Class
7
8
Async-safe replacement for threading.local that works correctly with asyncio tasks and context variables.
9
10
```python { .api }
11
class Local:
12
"""Thread and async-task safe local storage."""
13
14
def __init__(self, thread_critical=False):
15
"""
16
Initialize local storage.
17
18
Parameters:
19
- thread_critical: bool, whether to use thread-based storage (default False)
20
If False, uses context variables for async safety
21
If True, uses traditional thread-local storage
22
"""
23
24
def __getattr__(self, key):
25
"""
26
Get attribute from local storage.
27
28
Parameters:
29
- key: str, attribute name
30
31
Returns:
32
Any: Value stored for this context/thread
33
34
Raises:
35
AttributeError: If attribute not found in current context
36
"""
37
38
def __setattr__(self, key, value):
39
"""
40
Set attribute in local storage.
41
42
Parameters:
43
- key: str, attribute name
44
- value: Any, value to store for this context/thread
45
"""
46
47
def __delattr__(self, key):
48
"""
49
Delete attribute from local storage.
50
51
Parameters:
52
- key: str, attribute name to remove
53
54
Raises:
55
AttributeError: If attribute not found in current context
56
"""
57
```
58
59
## Usage Examples
60
61
### Basic Local Storage
62
63
```python
64
from asgiref.local import Local
65
import asyncio
66
67
# Create local storage instance
68
local = Local()
69
70
async def worker_task(worker_id):
71
# Each task gets its own storage context
72
local.worker_id = worker_id
73
local.data = []
74
75
for i in range(3):
76
local.data.append(f"item_{i}")
77
await asyncio.sleep(0.1)
78
79
print(f"Worker {local.worker_id}: {local.data}")
80
81
async def main():
82
# Run multiple tasks concurrently
83
tasks = [worker_task(i) for i in range(3)]
84
await asyncio.gather(*tasks)
85
86
# Each worker will have isolated storage
87
# asyncio.run(main())
88
```
89
90
### Request Context in Web Applications
91
92
```python
93
from asgiref.local import Local
94
95
# Global request context
96
request_context = Local()
97
98
async def asgi_app(scope, receive, send):
99
# Store request information in local context
100
request_context.scope = scope
101
request_context.user_id = scope.get('user', {}).get('id')
102
request_context.request_id = scope.get('headers', {}).get('x-request-id')
103
104
try:
105
await handle_request(scope, receive, send)
106
finally:
107
# Context automatically cleaned up when task ends
108
pass
109
110
async def handle_request(scope, receive, send):
111
# Can access request context from anywhere in the call stack
112
print(f"Handling request {request_context.request_id} for user {request_context.user_id}")
113
114
await business_logic()
115
116
await send({
117
'type': 'http.response.start',
118
'status': 200,
119
'headers': [[b'content-type', b'text/plain']],
120
})
121
await send({
122
'type': 'http.response.body',
123
'body': f'Request {request_context.request_id} processed'.encode(),
124
})
125
126
async def business_logic():
127
# Deep in the call stack, still has access to request context
128
if request_context.user_id:
129
print(f"Processing for authenticated user: {request_context.user_id}")
130
else:
131
print("Processing for anonymous user")
132
```
133
134
### Database Connection Management
135
136
```python
137
from asgiref.local import Local
138
import asyncio
139
140
class DatabaseManager:
141
def __init__(self):
142
self.local = Local()
143
144
async def get_connection(self):
145
"""Get or create connection for current context."""
146
if not hasattr(self.local, 'connection'):
147
# Create new connection for this context
148
self.local.connection = await self.create_connection()
149
print(f"Created new connection: {id(self.local.connection)}")
150
return self.local.connection
151
152
async def create_connection(self):
153
"""Simulate creating a database connection."""
154
await asyncio.sleep(0.1) # Simulate connection time
155
return {"id": id({}), "status": "connected"}
156
157
async def query(self, sql):
158
"""Execute query using context-local connection."""
159
conn = await self.get_connection()
160
print(f"Executing '{sql}' on connection {conn['id']}")
161
return {"result": "data", "connection_id": conn["id"]}
162
163
db = DatabaseManager()
164
165
async def service_function(service_id):
166
"""Service function that uses database."""
167
result1 = await db.query("SELECT * FROM users")
168
await asyncio.sleep(0.1)
169
result2 = await db.query("SELECT * FROM orders")
170
171
print(f"Service {service_id} results: {result1['connection_id']}, {result2['connection_id']}")
172
173
async def main():
174
# Each service call gets its own database connection
175
await asyncio.gather(
176
service_function("A"),
177
service_function("B"),
178
service_function("C")
179
)
180
181
# Each service will use its own connection consistently
182
# asyncio.run(main())
183
```
184
185
### Thread-Critical Mode
186
187
```python
188
from asgiref.local import Local
189
import threading
190
import asyncio
191
192
# Thread-critical local storage
193
thread_local = Local(thread_critical=True)
194
195
# Regular async-safe local storage
196
async_local = Local(thread_critical=False)
197
198
def thread_worker(thread_id):
199
"""Function running in separate thread."""
200
thread_local.thread_id = thread_id
201
async_local.thread_id = thread_id # This won't work across threads
202
203
print(f"Thread {thread_id} - thread_local: {thread_local.thread_id}")
204
205
try:
206
print(f"Thread {thread_id} - async_local: {async_local.thread_id}")
207
except AttributeError:
208
print(f"Thread {thread_id} - async_local: Not accessible from different thread")
209
210
async def async_worker(worker_id):
211
"""Async function running in same thread."""
212
async_local.worker_id = worker_id
213
print(f"Async worker {worker_id} - async_local: {async_local.worker_id}")
214
215
def demonstrate_differences():
216
# Set up initial values in main thread
217
thread_local.main_value = "main_thread"
218
async_local.main_value = "main_async"
219
220
# Start thread workers
221
threads = []
222
for i in range(2):
223
t = threading.Thread(target=thread_worker, args=(i,))
224
threads.append(t)
225
t.start()
226
227
# Wait for threads
228
for t in threads:
229
t.join()
230
231
# Run async workers in same thread
232
async def run_async_workers():
233
await asyncio.gather(
234
async_worker("A"),
235
async_worker("B")
236
)
237
238
asyncio.run(run_async_workers())
239
240
# demonstrate_differences()
241
```
242
243
### Middleware with Local Context
244
245
```python
246
from asgiref.local import Local
247
import time
248
import uuid
249
250
# Global context for request tracking
251
request_local = Local()
252
253
class RequestTrackingMiddleware:
254
"""Middleware that tracks request context."""
255
256
def __init__(self, app):
257
self.app = app
258
259
async def __call__(self, scope, receive, send):
260
# Initialize request context
261
request_local.request_id = str(uuid.uuid4())
262
request_local.start_time = time.time()
263
request_local.path = scope['path']
264
265
print(f"Request {request_local.request_id} started: {request_local.path}")
266
267
try:
268
await self.app(scope, receive, send)
269
finally:
270
duration = time.time() - request_local.start_time
271
print(f"Request {request_local.request_id} completed in {duration:.3f}s")
272
273
async def business_app(scope, receive, send):
274
"""Business application that uses request context."""
275
# Can access request context from anywhere
276
print(f"Processing request {request_local.request_id} for path {request_local.path}")
277
278
await asyncio.sleep(0.1) # Simulate work
279
280
await send({
281
'type': 'http.response.start',
282
'status': 200,
283
'headers': [[b'content-type', b'text/plain']],
284
})
285
await send({
286
'type': 'http.response.body',
287
'body': f'Processed by request {request_local.request_id}'.encode(),
288
})
289
290
# Wrap application with middleware
291
app = RequestTrackingMiddleware(business_app)
292
```
293
294
## Context Variable vs Thread Local
295
296
The `Local` class automatically chooses the appropriate storage mechanism:
297
298
- **Context Variables** (default): Works with asyncio tasks, coroutines, and async generators
299
- **Thread Local** (thread_critical=True): Works with traditional threading
300
301
Context variables are preferred for async applications as they properly isolate data across concurrent tasks running in the same thread.