
tessl/pypi-ijson

Iterative JSON parser with standard Python iterator interfaces for processing large JSON data streams without loading entire documents into memory


docs/async-processing.md

Asynchronous Processing

Async variants of all parsing functions for use with asyncio and async file objects. These functions enable non-blocking JSON processing in concurrent applications, web servers, and other async environments.

Capabilities

Async Object Extraction

Asynchronous version of items() for processing JSON objects without blocking the event loop.

async def items_async(source, prefix, map_type=None, buf_size=64*1024, **config):
    """
    Async version of items() for async file objects.
    
    Parameters:
    - source: Async file-like object with async read() method
    - prefix (str): JSON path prefix targeting the objects to extract
    - map_type (type, optional): Custom mapping type for objects (default: dict)
    - buf_size (int): Buffer size for reading file data (default: 64*1024)
    - **config: Backend-specific configuration options
    
    Returns:
    Async generator yielding Python objects
    
    Raises:
    - JSONError: For malformed JSON
    - IncompleteJSONError: For truncated JSON data
    """

Usage Examples:

import asyncio
import ijson
import aiofiles

async def process_large_json():
    async with aiofiles.open('large_data.json', 'rb') as file:
        async for item in ijson.items_async(file, 'data.item'):
            await process_item_async(item)

# Run async processing
asyncio.run(process_large_json())

Async Key-Value Processing

Asynchronous version of kvitems() for extracting key-value pairs without blocking.

async def kvitems_async(source, prefix, map_type=None, buf_size=64*1024, **config):
    """
    Async version of kvitems() for async file objects.
    
    Parameters:
    - source: Async file-like object with async read() method
    - prefix (str): JSON path prefix targeting objects to extract pairs from
    - map_type (type, optional): Custom mapping type for nested objects (default: dict)
    - buf_size (int): Buffer size for reading file data (default: 64*1024)
    - **config: Backend-specific configuration options
    
    Returns:
    Async generator yielding (key, value) tuples
    
    Raises:
    - JSONError: For malformed JSON
    - IncompleteJSONError: For truncated JSON data
    """

Usage Examples:

import asyncio
import ijson
import aiofiles

async def process_config():
    async with aiofiles.open('config.json', 'rb') as file:
        async for key, value in ijson.kvitems_async(file, 'settings'):
            await apply_setting_async(key, value)

asyncio.run(process_config())

Async Event Parsing

Asynchronous version of parse() providing events with path context.

async def parse_async(source, buf_size=64*1024, **config):
    """
    Async version of parse() for async file objects.
    
    Parameters:
    - source: Async file-like object with async read() method
    - buf_size (int): Buffer size for reading file data (default: 64*1024)
    - **config: Backend-specific configuration options
    
    Returns:
    Async generator yielding (prefix, event, value) tuples
    
    Raises:
    - JSONError: For malformed JSON
    - IncompleteJSONError: For truncated JSON data
    """

Usage Examples:

import asyncio
import ijson
import aiofiles

async def analyze_json_structure():
    async with aiofiles.open('data.json', 'rb') as file:
        async for prefix, event, value in ijson.parse_async(file):
            if event == 'start_array':
                print(f"Found array at: {prefix}")
            elif event == 'start_map':
                print(f"Found object at: {prefix}")

asyncio.run(analyze_json_structure())

Async Low-Level Parsing

Asynchronous version of basic_parse() for low-level event processing.

async def basic_parse_async(source, buf_size=64*1024, **config):
    """
    Async version of basic_parse() for async file objects.
    
    Parameters:
    - source: Async file-like object with async read() method
    - buf_size (int): Buffer size for reading file data (default: 64*1024)
    - **config: Backend-specific configuration options
    
    Returns:
    Async generator yielding (event, value) tuples
    
    Raises:
    - JSONError: For malformed JSON
    - IncompleteJSONError: For truncated JSON data
    """

Usage Examples:

import asyncio
import ijson
import aiofiles
from ijson.common import ObjectBuilder

async def build_objects_async():
    builder = ObjectBuilder()
    depth = 0
    # multiple_values=True lets basic_parse accept a stream of
    # concatenated top-level JSON objects
    async with aiofiles.open('data.json', 'rb') as file:
        async for event, value in ijson.basic_parse_async(file, multiple_values=True):
            builder.event(event, value)
            if event == 'start_map':
                depth += 1
            elif event == 'end_map':
                depth -= 1
                if depth == 0:  # a top-level object is complete, not a nested one
                    yield builder.value
                    builder = ObjectBuilder()

async def main():
    async for obj in build_objects_async():
        await process_object_async(obj)

asyncio.run(main())

Async File Support

The async functions automatically detect and work with async file objects that have an async read() method:

import asyncio
import ijson

class AsyncStringReader:
    def __init__(self, data):
        # Encode once so positions index bytes, not characters;
        # slicing the str per read could split multi-byte characters
        self.data = data.encode('utf-8')
        self.pos = 0
    
    async def read(self, size=-1):
        if self.pos >= len(self.data):
            return b''
        if size == -1:
            result = self.data[self.pos:]
            self.pos = len(self.data)
        else:
            result = self.data[self.pos:self.pos + size]
            self.pos += len(result)
        return result

async def parse_custom_async():
    json_data = '{"items": [1, 2, 3, 4, 5]}'
    reader = AsyncStringReader(json_data)
    
    async for item in ijson.items_async(reader, 'items.item'):
        print(f"Item: {item}")

asyncio.run(parse_custom_async())

Web Framework Integration

FastAPI Example

from fastapi import FastAPI, UploadFile
import ijson

app = FastAPI()

@app.post("/process-json/")
async def process_json(file: UploadFile):
    results = []
    # Pass the UploadFile itself: its read() is async, whereas
    # file.file is a plain synchronous SpooledTemporaryFile
    async for item in ijson.items_async(file, 'data.item'):
        # Process each item without loading entire file
        processed = await process_item_async(item)
        results.append(processed)
    return {"processed_count": len(results)}

aiohttp Example

from aiohttp import web
import ijson

class _PartReader:
    """Adapts aiohttp's BodyPartReader, whose read() takes no size
    argument, to the async read(size) interface ijson expects."""
    def __init__(self, part):
        self._part = part
    
    async def read(self, size=-1):
        return await self._part.read_chunk(size if size > 0 else 8192)

async def handle_json_upload(request):
    reader = await request.multipart()
    field = await reader.next()
    
    results = []
    async for item in ijson.items_async(_PartReader(field), 'records.item'):
        processed = await process_record_async(item)
        results.append(processed)
    
    return web.json_response({"status": "processed", "count": len(results)})

app = web.Application()
app.router.add_post('/upload', handle_json_upload)

Performance Considerations

Concurrency Benefits

Async functions enable processing multiple JSON streams concurrently:

import asyncio
import ijson
import aiofiles

async def process_file(filename):
    async with aiofiles.open(filename, 'rb') as file:
        async for item in ijson.items_async(file, 'data.item'):
            await process_item_async(item)

async def process_multiple_files(filenames):
    tasks = [process_file(filename) for filename in filenames]
    await asyncio.gather(*tasks)

# Process multiple large JSON files concurrently
files = ['data1.json', 'data2.json', 'data3.json']
asyncio.run(process_multiple_files(files))
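With many files queued, an unbounded `gather` opens them all at once. A sketch of capping concurrency with `asyncio.Semaphore` follows; the `asyncio.sleep(0)` is a stand-in for the `ijson.items_async` loop shown above, so the snippet stays self-contained:

```python
import asyncio

async def process_file(filename, sem, done):
    # Acquire the semaphore before opening/parsing, so at most
    # `limit` JSON streams are being processed at any moment
    async with sem:
        await asyncio.sleep(0)  # stand-in for the ijson.items_async loop
        done.append(filename)

async def process_with_limit(filenames, limit=2):
    sem = asyncio.Semaphore(limit)
    done = []
    await asyncio.gather(*(process_file(f, sem, done) for f in filenames))
    return done

processed = asyncio.run(process_with_limit(['data1.json', 'data2.json', 'data3.json']))
print(sorted(processed))  # ['data1.json', 'data2.json', 'data3.json']
```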

Memory Efficiency

Async functions maintain the same memory efficiency benefits as their sync counterparts while enabling non-blocking I/O:

import asyncio
import ijson
import aiofiles

async def stream_large_dataset():
    # Process 1GB+ JSON file without blocking event loop
    async with aiofiles.open('huge_dataset.json', 'rb') as file:
        count = 0
        async for record in ijson.items_async(file, 'records.item'):
            await process_record_async(record)
            count += 1
            if count % 1000 == 0:
                print(f"Processed {count} records")

asyncio.run(stream_large_dataset())

Error Handling

Async functions raise the same exceptions as their sync counterparts:

import asyncio
import ijson
from ijson.common import JSONError, IncompleteJSONError

async def safe_async_parsing(source):
    try:
        async for item in ijson.items_async(source, 'data.item'):
            await process_item_async(item)
    except IncompleteJSONError:
        print("JSON stream was incomplete")
    except JSONError as e:
        print(f"JSON parsing error: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")

Compatibility

  • Requires Python 3.5+ for async/await syntax
  • Compatible with all major async frameworks (asyncio, FastAPI, aiohttp, etc.)
  • Works with async file objects from aiofiles, httpx, and custom implementations
  • Maintains same backend performance characteristics as sync versions

Install with Tessl CLI

npx tessl i tessl/pypi-ijson
