# Asynchronous Processing

Async variants of all parsing functions for use with asyncio and async file objects. These functions enable non-blocking JSON processing in concurrent applications, web servers, and other async environments.

## Capabilities

### Async Object Extraction

Asynchronous version of `items()` for processing JSON objects without blocking the event loop.
```python { .api }
async def items_async(source, prefix, map_type=None, buf_size=64*1024, **config):
    """
    Async version of items() for async file objects.

    Parameters:
    - source: Async file-like object with async read() method
    - prefix (str): JSON path prefix targeting the objects to extract
    - map_type (type, optional): Custom mapping type for objects (default: dict)
    - buf_size (int): Buffer size for reading file data (default: 64*1024)
    - **config: Backend-specific configuration options

    Returns:
    Async generator yielding Python objects

    Raises:
    - JSONError: For malformed JSON
    - IncompleteJSONError: For truncated JSON data
    """
```
**Usage Examples:**

```python
import asyncio
import ijson
import aiofiles

async def process_large_json():
    async with aiofiles.open('large_data.json', 'rb') as file:
        async for item in ijson.items_async(file, 'data.item'):
            await process_item_async(item)  # user-defined coroutine

# Run async processing
asyncio.run(process_large_json())
```
### Async Key-Value Processing

Asynchronous version of `kvitems()` for extracting key-value pairs without blocking.

```python { .api }
async def kvitems_async(source, prefix, map_type=None, buf_size=64*1024, **config):
    """
    Async version of kvitems() for async file objects.

    Parameters:
    - source: Async file-like object with async read() method
    - prefix (str): JSON path prefix targeting objects to extract pairs from
    - map_type (type, optional): Custom mapping type for nested objects (default: dict)
    - buf_size (int): Buffer size for reading file data (default: 64*1024)
    - **config: Backend-specific configuration options

    Returns:
    Async generator yielding (key, value) tuples

    Raises:
    - JSONError: For malformed JSON
    - IncompleteJSONError: For truncated JSON data
    """
```
**Usage Examples:**

```python
import asyncio
import ijson
import aiofiles

async def process_config():
    async with aiofiles.open('config.json', 'rb') as file:
        async for key, value in ijson.kvitems_async(file, 'settings'):
            await apply_setting_async(key, value)  # user-defined coroutine

asyncio.run(process_config())
```
### Async Event Parsing

Asynchronous version of `parse()` providing events with path context.

```python { .api }
async def parse_async(source, buf_size=64*1024, **config):
    """
    Async version of parse() for async file objects.

    Parameters:
    - source: Async file-like object with async read() method
    - buf_size (int): Buffer size for reading file data (default: 64*1024)
    - **config: Backend-specific configuration options

    Returns:
    Async generator yielding (prefix, event, value) tuples

    Raises:
    - JSONError: For malformed JSON
    - IncompleteJSONError: For truncated JSON data
    """
```
**Usage Examples:**

```python
import asyncio
import ijson
import aiofiles

async def analyze_json_structure():
    async with aiofiles.open('data.json', 'rb') as file:
        async for prefix, event, value in ijson.parse_async(file):
            if event == 'start_array':
                print(f"Found array at: {prefix}")
            elif event == 'start_map':
                print(f"Found object at: {prefix}")

asyncio.run(analyze_json_structure())
```
### Async Low-Level Parsing

Asynchronous version of `basic_parse()` for low-level event processing.

```python { .api }
async def basic_parse_async(source, buf_size=64*1024, **config):
    """
    Async version of basic_parse() for async file objects.

    Parameters:
    - source: Async file-like object with async read() method
    - buf_size (int): Buffer size for reading file data (default: 64*1024)
    - **config: Backend-specific configuration options

    Returns:
    Async generator yielding (event, value) tuples

    Raises:
    - JSONError: For malformed JSON
    - IncompleteJSONError: For truncated JSON data
    """
```
**Usage Examples:**

```python
import asyncio
import ijson
import aiofiles
from ijson.common import ObjectBuilder

async def build_objects_async():
    # Assumes the file holds a stream of concatenated top-level objects,
    # hence multiple_values=True. Track nesting depth so we yield only
    # complete top-level objects, not on every nested end_map event.
    builder = ObjectBuilder()
    depth = 0
    async with aiofiles.open('data.json', 'rb') as file:
        async for event, value in ijson.basic_parse_async(file, multiple_values=True):
            builder.event(event, value)
            if event == 'start_map':
                depth += 1
            elif event == 'end_map':
                depth -= 1
                if depth == 0:  # Complete top-level object
                    yield builder.value
                    builder = ObjectBuilder()

async def main():
    async for obj in build_objects_async():
        await process_object_async(obj)  # user-defined coroutine

asyncio.run(main())
```
## Async File Support

The async functions automatically detect and work with async file objects that have an `async read()` method:
```python
import asyncio
import ijson

class AsyncStringReader:
    def __init__(self, data):
        # Encode once up front so reads are byte-accurate even when the
        # string contains multi-byte UTF-8 characters
        self.data = data.encode('utf-8')
        self.pos = 0

    async def read(self, size=-1):
        if self.pos >= len(self.data):
            return b''
        if size == -1:
            result = self.data[self.pos:]
            self.pos = len(self.data)
        else:
            result = self.data[self.pos:self.pos + size]
            self.pos += len(result)
        return result

async def parse_custom_async():
    json_data = '{"items": [1, 2, 3, 4, 5]}'
    reader = AsyncStringReader(json_data)

    async for item in ijson.items_async(reader, 'items.item'):
        print(f"Item: {item}")

asyncio.run(parse_custom_async())
```
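Any object with a compatible `async read()` also works, including `asyncio.StreamReader`, so data arriving over a socket can be parsed as it streams in. A stdlib-only sketch of the read interface the async functions consume (the `feed_data` calls stand in for a real network transport):

```python
import asyncio

async def demo():
    # asyncio.StreamReader exposes `async read(n)`, which is the exact
    # interface the *_async functions expect from a source.
    reader = asyncio.StreamReader()
    reader.feed_data(b'{"items": [1, 2, 3]}')
    reader.feed_eof()

    # An ijson consumer pulls chunks in a loop like this until EOF:
    chunks = []
    while True:
        chunk = await reader.read(8)  # small buffer to show chunked reads
        if not chunk:
            break
        chunks.append(chunk)
    return b''.join(chunks)

data = asyncio.run(demo())
print(data)  # b'{"items": [1, 2, 3]}'
```

In practice you would pass the `StreamReader` straight to `ijson.items_async` instead of reading it manually.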
## Web Framework Integration

### FastAPI Example

```python
from fastapi import FastAPI, UploadFile
import ijson

app = FastAPI()

@app.post("/process-json/")
async def process_json(file: UploadFile):
    results = []
    # Pass the UploadFile itself: it exposes an async read() method
    # (file.file is the underlying synchronous file object)
    async for item in ijson.items_async(file, 'data.item'):
        # Process each item without loading the entire file
        processed = await process_item_async(item)
        results.append(processed)
    return {"processed_count": len(results)}
```
### aiohttp Example

```python
from aiohttp import web
import ijson

async def handle_json_upload(request):
    # request.content is an asyncio StreamReader with an async read(),
    # so the raw JSON request body can be parsed as it arrives.
    # (A multipart BodyPartReader cannot be passed directly: its read()
    # does not accept a size argument.)
    results = []
    async for item in ijson.items_async(request.content, 'records.item'):
        processed = await process_record_async(item)
        results.append(processed)

    return web.json_response({"status": "processed", "count": len(results)})

app = web.Application()
app.router.add_post('/upload', handle_json_upload)
```
## Performance Considerations

### Concurrency Benefits

Async functions enable processing multiple JSON streams concurrently (I/O waits are interleaved on one event loop; the parsing itself still runs on a single thread):

```python
import asyncio
import ijson
import aiofiles

async def process_file(filename):
    async with aiofiles.open(filename, 'rb') as file:
        async for item in ijson.items_async(file, 'data.item'):
            await process_item_async(item)

async def process_multiple_files(filenames):
    tasks = [process_file(filename) for filename in filenames]
    await asyncio.gather(*tasks)

# Process multiple large JSON files concurrently
files = ['data1.json', 'data2.json', 'data3.json']
asyncio.run(process_multiple_files(files))
```
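An unbounded `gather` over many files opens them all at once; a semaphore caps how many are processed concurrently. A stdlib-only sketch (the `process_file` stub stands in for the ijson coroutine above):

```python
import asyncio

async def process_file(filename):
    # Stand-in for the real ijson processing coroutine
    await asyncio.sleep(0.01)
    return filename

async def process_with_limit(filenames, limit=4):
    # At most `limit` files are being parsed at any moment; the rest
    # wait their turn on the semaphore.
    sem = asyncio.Semaphore(limit)

    async def bounded(name):
        async with sem:
            return await process_file(name)

    # gather preserves input order in its result list
    return await asyncio.gather(*(bounded(n) for n in filenames))

results = asyncio.run(process_with_limit([f'data{i}.json' for i in range(10)]))
print(len(results))  # 10
```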
### Memory Efficiency

Async functions maintain the same memory efficiency benefits as their sync counterparts while enabling non-blocking I/O:

```python
import asyncio
import ijson
import aiofiles

async def stream_large_dataset():
    # Process a 1GB+ JSON file without blocking the event loop
    async with aiofiles.open('huge_dataset.json', 'rb') as file:
        count = 0
        async for record in ijson.items_async(file, 'records.item'):
            await process_record_async(record)
            count += 1
            if count % 1000 == 0:
                print(f"Processed {count} records")

asyncio.run(stream_large_dataset())
```
## Error Handling

Async functions raise the same exceptions as their sync counterparts:

```python
import asyncio
import ijson
from ijson.common import JSONError, IncompleteJSONError

async def safe_async_parsing(source):
    try:
        async for item in ijson.items_async(source, 'data.item'):
            await process_item_async(item)
    except IncompleteJSONError:
        print("JSON stream was incomplete")
    except JSONError as e:
        print(f"JSON parsing error: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")
```
## Compatibility

- Requires Python 3.5+ for async/await syntax
- Compatible with all major async frameworks (asyncio, FastAPI, aiohttp, etc.)
- Works with async file objects from aiofiles, httpx, and custom implementations
- Maintains the same backend performance characteristics as the sync versions
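When a source offers only a blocking `read()` (for example, a plain file opened without aiofiles), it can be adapted for the async API by delegating reads to a worker thread. This `AsyncReadAdapter` is a hypothetical helper, not part of ijson, and `asyncio.to_thread` requires Python 3.9+:

```python
import asyncio
import io

class AsyncReadAdapter:
    """Hypothetical adapter: wraps a blocking binary file so it exposes
    the async read() interface the *_async functions expect."""

    def __init__(self, sync_file):
        self._file = sync_file

    async def read(self, size=-1):
        # Run the blocking read in a worker thread so the event loop
        # stays responsive while waiting on disk I/O
        return await asyncio.to_thread(self._file.read, size)

async def demo():
    # io.BytesIO stands in for any synchronous binary file object
    source = AsyncReadAdapter(io.BytesIO(b'{"ok": true}'))
    return await source.read()

print(asyncio.run(demo()))  # b'{"ok": true}'
```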