Iterative JSON parser with standard Python iterator interfaces for processing large JSON data streams without loading entire documents into memory
—
Async variants of all parsing functions for use with asyncio and async file objects. These functions enable non-blocking JSON processing in concurrent applications, web servers, and other async environments.
Asynchronous version of items() for processing JSON objects without blocking the event loop.
async def items_async(source, prefix, map_type=None, buf_size=64*1024, **config):
"""
Async version of items() for async file objects.
Parameters:
- source: Async file-like object with async read() method
- prefix (str): JSON path prefix targeting the objects to extract
- map_type (type, optional): Custom mapping type for objects (default: dict)
- buf_size (int): Buffer size for reading file data (default: 64*1024)
- **config: Backend-specific configuration options
Returns:
Async generator yielding Python objects
Raises:
- JSONError: For malformed JSON
- IncompleteJSONError: For truncated JSON data
"""Usage Examples:
import asyncio
import ijson
import aiofiles
async def process_large_json():
async with aiofiles.open('large_data.json', 'rb') as file:
async for item in ijson.items_async(file, 'data.item'):
await process_item_async(item)
# Run async processing
asyncio.run(process_large_json())

Asynchronous version of kvitems() for extracting key-value pairs without blocking.
async def kvitems_async(source, prefix, map_type=None, buf_size=64*1024, **config):
"""
Async version of kvitems() for async file objects.
Parameters:
- source: Async file-like object with async read() method
- prefix (str): JSON path prefix targeting objects to extract pairs from
- map_type (type, optional): Custom mapping type for nested objects (default: dict)
- buf_size (int): Buffer size for reading file data (default: 64*1024)
- **config: Backend-specific configuration options
Returns:
Async generator yielding (key, value) tuples
Raises:
- JSONError: For malformed JSON
- IncompleteJSONError: For truncated JSON data
"""Usage Examples:
import asyncio
import ijson
import aiofiles
async def process_config():
async with aiofiles.open('config.json', 'rb') as file:
async for key, value in ijson.kvitems_async(file, 'settings'):
await apply_setting_async(key, value)
asyncio.run(process_config())

Asynchronous version of parse() providing events with path context.
async def parse_async(source, buf_size=64*1024, **config):
"""
Async version of parse() for async file objects.
Parameters:
- source: Async file-like object with async read() method
- buf_size (int): Buffer size for reading file data (default: 64*1024)
- **config: Backend-specific configuration options
Returns:
Async generator yielding (prefix, event, value) tuples
Raises:
- JSONError: For malformed JSON
- IncompleteJSONError: For truncated JSON data
"""Usage Examples:
import asyncio
import ijson
import aiofiles
async def analyze_json_structure():
async with aiofiles.open('data.json', 'rb') as file:
async for prefix, event, value in ijson.parse_async(file):
if event == 'start_array':
print(f"Found array at: {prefix}")
elif event == 'start_map':
print(f"Found object at: {prefix}")
asyncio.run(analyze_json_structure())

Asynchronous version of basic_parse() for low-level event processing.
async def basic_parse_async(source, buf_size=64*1024, **config):
"""
Async version of basic_parse() for async file objects.
Parameters:
- source: Async file-like object with async read() method
- buf_size (int): Buffer size for reading file data (default: 64*1024)
- **config: Backend-specific configuration options
Returns:
Async generator yielding (event, value) tuples
Raises:
- JSONError: For malformed JSON
- IncompleteJSONError: For truncated JSON data
"""Usage Examples:
import asyncio
import ijson
import aiofiles
from ijson.common import ObjectBuilder
async def build_objects_async():
builder = ObjectBuilder()
async with aiofiles.open('data.json', 'rb') as file:
async for event, value in ijson.basic_parse_async(file):
builder.event(event, value)
if event == 'end_map': # Complete object
yield builder.value
builder = ObjectBuilder()
async def main():
async for obj in build_objects_async():
await process_object_async(obj)
asyncio.run(main())

The async functions automatically detect and work with async file objects that have an async read() method:
import asyncio
import ijson
class AsyncStringReader:
def __init__(self, data):
self.data = data
self.pos = 0
async def read(self, size=-1):
if self.pos >= len(self.data):
return b''
if size == -1:
result = self.data[self.pos:].encode('utf-8')
self.pos = len(self.data)
else:
result = self.data[self.pos:self.pos + size].encode('utf-8')
self.pos += size
return result
async def parse_custom_async():
json_data = '{"items": [1, 2, 3, 4, 5]}'
reader = AsyncStringReader(json_data)
async for item in ijson.items_async(reader, 'items.item'):
print(f"Item: {item}")
asyncio.run(parse_custom_async())

Example: streaming JSON uploads in a FastAPI endpoint:

from fastapi import FastAPI, UploadFile
import ijson
app = FastAPI()
@app.post("/process-json/")
async def process_json(file: UploadFile):
results = []
async for item in ijson.items_async(file.file, 'data.item'):
# Process each item without loading entire file
processed = await process_item_async(item)
results.append(processed)
return {"processed_count": len(results)}from aiohttp import web
import ijson
async def handle_json_upload(request):
reader = await request.multipart()
field = await reader.next()
results = []
async for item in ijson.items_async(field, 'records.item'):
processed = await process_record_async(item)
results.append(processed)
return web.json_response({"status": "processed", "count": len(results)})
app = web.Application()
app.router.add_post('/upload', handle_json_upload)

Async functions enable processing multiple JSON streams concurrently:
import asyncio
import ijson
import aiofiles
async def process_file(filename):
async with aiofiles.open(filename, 'rb') as file:
async for item in ijson.items_async(file, 'data.item'):
await process_item_async(item)
async def process_multiple_files(filenames):
tasks = [process_file(filename) for filename in filenames]
await asyncio.gather(*tasks)
# Process multiple large JSON files concurrently
files = ['data1.json', 'data2.json', 'data3.json']
asyncio.run(process_multiple_files(files))

Async functions maintain the same memory efficiency benefits as their sync counterparts while enabling non-blocking I/O:
import asyncio
import ijson
import aiofiles
async def stream_large_dataset():
# Process 1GB+ JSON file without blocking event loop
async with aiofiles.open('huge_dataset.json', 'rb') as file:
count = 0
async for record in ijson.items_async(file, 'records.item'):
await process_record_async(record)
count += 1
if count % 1000 == 0:
print(f"Processed {count} records")
asyncio.run(stream_large_dataset())

Async functions raise the same exceptions as their sync counterparts:
import asyncio
import ijson
from ijson.common import JSONError, IncompleteJSONError
async def safe_async_parsing(source):
try:
async for item in ijson.items_async(source, 'data.item'):
await process_item_async(item)
except IncompleteJSONError:
print("JSON stream was incomplete")
except JSONError as e:
print(f"JSON parsing error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")Install with Tessl CLI
npx tessl i tessl/pypi-ijson