tessl/pypi-ijson

Iterative JSON parser with standard Python iterator interfaces for processing large JSON data streams without loading entire documents into memory


Low-Level Coroutines

Coroutine-based parsing pipeline components for building custom JSON processing workflows. They offer maximum flexibility for advanced use cases that require custom processing logic, filtering, transformation, or integration with existing coroutine-based systems. All parsing coroutines consume the input document as chunks of bytes.

Capabilities

Basic Parsing Coroutines

Low-level coroutine for processing raw JSON parsing events without path context.

def basic_parse_coro(target, **config):
    """
    Coroutine for low-level parsing events.
    
    Parameters:
    - target: Coroutine or object with send() method to receive events
    - **config: Backend-specific configuration options
    
    Returns:
    Coroutine that accepts data chunks and sends (event, value) tuples to target
    
    Events sent to target:
    - ('null', None): JSON null value
    - ('boolean', bool): JSON boolean value  
    - ('number', int/Decimal): JSON number value
    - ('string', str): JSON string value
    - ('map_key', str): JSON object key
    - ('start_map', None): Start of JSON object
    - ('end_map', None): End of JSON object
    - ('start_array', None): Start of JSON array
    - ('end_array', None): End of JSON array
    """

Context-Aware Parsing Coroutines

Coroutine that adds path context to parsing events, providing full location information within the JSON document.

def parse_coro(target, **config):
    """
    Coroutine for parsing with path context.
    
    Parameters:
    - target: Coroutine or object with send() method to receive events
    - **config: Backend-specific configuration options
    
    Returns:
    Coroutine that accepts data chunks and sends (prefix, event, value) tuples to target
    
    Events sent to target:
    - (prefix, event, value) where prefix is the JSON path string
    """

Object Extraction Coroutines

Coroutine for extracting complete Python objects from specific locations in JSON streams.

def items_coro(target, prefix, map_type=None, **config):
    """
    Coroutine for extracting objects under prefix.
    
    Parameters:
    - target: Coroutine or object with send() method to receive objects
    - prefix (str): JSON path prefix targeting objects to extract
    - map_type (type, optional): Custom mapping type for objects (default: dict)
    - **config: Backend-specific configuration options
    
    Returns:
    Coroutine that accepts data chunks and sends Python objects to target
    """

Key-Value Extraction Coroutines

Coroutine for extracting key-value pairs from JSON objects.

def kvitems_coro(target, prefix, map_type=None, **config):
    """
    Coroutine for extracting key-value pairs under prefix.
    
    Parameters:
    - target: Coroutine or object with send() method to receive pairs
    - prefix (str): JSON path prefix targeting objects to extract pairs from
    - map_type (type, optional): Custom mapping type for nested objects (default: dict)
    - **config: Backend-specific configuration options
    
    Returns:
    Coroutine that accepts data chunks and sends (key, value) tuples to target
    """

Coroutine Utilities

Pipeline Construction

def chain(sink, *coro_pipeline):
    """
    Chain coroutines into a processing pipeline.
    
    Parameters:
    - sink: Final destination for processed data (coroutine or sendable object)
    - *coro_pipeline: Tuples of (coroutine_func, args, kwargs) defining pipeline stages
    
    Returns:
    Chained coroutine that feeds data through the entire pipeline
    """

Sendable Collections

class sendable_list(list):
    """
    List that can receive data via send() method for use as pipeline sink.
    
    Methods:
    - send(value): Append value to list (alias for append)
    """

Coroutine Decorator

def coroutine(func):
    """
    Decorator for generator-based coroutines.
    
    Automatically advances coroutine to first yield point.
    Required for proper coroutine initialization in Python.
    """

Pipeline to Generator Conversion

def coros2gen(source, *coro_pipeline):
    """
    Convert coroutine pipeline to generator.
    
    Parameters:
    - source: Iterable providing input data
    - *coro_pipeline: Pipeline specification tuples
    
    Returns:
    Generator yielding results from coroutine pipeline
    """

File Data Source

def file_source(f, buf_size=64*1024):
    """
    Generator that yields data from a file-like object.
    
    Parameters:
    - f: File-like object with read() method
    - buf_size (int): Buffer size for reading chunks (default: 64*1024)
    
    Returns:
    Generator yielding data chunks from file
    """

Usage Examples

Custom Event Filtering

import ijson
from ijson.utils import coroutine, sendable_list, chain

# Custom coroutine to filter events
@coroutine
def filter_strings(target):
    while True:
        event, value = (yield)
        if event == 'string' and len(value) > 10:
            target.send((event, value))

# Build processing pipeline
results = sendable_list()
json_data = b'{"short": "hi", "long": "this is a long string", "number": 42}'

# Chain: data -> basic_parse -> filter_strings -> results
pipeline = chain(
    results,
    (filter_strings, (), {}),
    (ijson.basic_parse_coro, (), {})
)

# Send data through pipeline (parsers expect bytes chunks)
for chunk in [json_data]:
    pipeline.send(chunk)
pipeline.close()

print(results)  # [('string', 'this is a long string')]

Custom Object Transformation

import ijson
from ijson.utils import coroutine, sendable_list, chain

# Transform objects as they're extracted
@coroutine
def transform_users(target):
    while True:
        user = (yield)
        # Add computed field
        user['display_name'] = f"{user.get('first', '')} {user.get('last', '')}"
        target.send(user)

# Process JSON with transformation (parsers expect bytes chunks)
json_data = b'{"users": [{"first": "Alice", "last": "Smith"}, {"first": "Bob", "last": "Jones"}]}'
results = sendable_list()

pipeline = chain(
    results,
    (transform_users, (), {}),
    (ijson.items_coro, ('users.item',), {})
)

for chunk in [json_data]:
    pipeline.send(chunk)
pipeline.close()

for user in results:
    print(user['display_name'])  # "Alice Smith", "Bob Jones"

Multi-Stage Processing Pipeline

import ijson
from ijson.utils import coroutine, coros2gen

# First stage: Validate items
@coroutine
def validate_items(target):
    while True:
        item = (yield)
        if 'id' in item and 'name' in item:
            target.send(item)

# Second stage: Add metadata
@coroutine
def add_metadata(target):
    counter = 0
    while True:
        item = (yield)
        counter += 1
        item['_sequence'] = counter
        item['_processed'] = True
        target.send(item)

# Use as generator (parsers expect bytes chunks)
json_data = b'{"items": [{"id": 1, "name": "A"}, {"id": 2}, {"id": 3, "name": "C"}]}'

# Convert pipeline to generator
processed_items = coros2gen(
    [json_data],
    (add_metadata, (), {}),
    (validate_items, (), {}),
    (ijson.items_coro, ('items.item',), {})
)

for item in processed_items:
    print(item)
# {'id': 1, 'name': 'A', '_sequence': 1, '_processed': True}
# {'id': 3, 'name': 'C', '_sequence': 2, '_processed': True}

Real-Time Stream Processing

import ijson
from ijson.utils import coroutine, sendable_list, chain

# Real-time processing coroutine
@coroutine
def real_time_processor(target):
    batch = []
    try:
        while True:
            item = (yield)
            batch.append(item)

            if len(batch) >= 10:  # Process in batches
                for result in process_batch(batch):
                    target.send(result)
                batch = []
    except GeneratorExit:
        # Flush any remaining items when the pipeline is closed
        if batch:
            for result in process_batch(batch):
                target.send(result)

def process_batch(items):
    # Simulate batch processing
    return [{'processed': item, 'batch_size': len(items)} for item in items]

# Set up real-time processing
results = sendable_list()
processor = chain(
    results,
    (real_time_processor, (), {}),
    (ijson.items_coro, ('stream.item',), {})
)

# Simulate streaming data (parsers expect bytes chunks)
stream_data = ('{"stream": [' + ','.join(f'{{"id": {i}}}' for i in range(25)) + ']}').encode('utf-8')
processor.send(stream_data)
processor.close()

print(f"Processed {len(results)} items in batches")

Error Handling in Pipelines

import ijson
from ijson.utils import sendable_list, chain
from ijson.common import JSONError

# Exceptions raised while parsing propagate out of the send()/close()
# calls that feed the pipeline, so catch them at the call site. A
# coroutine stage can only intercept exceptions raised by stages
# downstream of itself (i.e. inside its own target.send() calls),
# never errors from the parser upstream of it.
results = sendable_list()
parser = chain(
    results,
    (ijson.items_coro, ('data.item',), {})
)

# Test with malformed JSON
malformed_json = b'{"data": [{"valid": true}, {"invalid": }]}'
try:
    parser.send(malformed_json)
    parser.close()
except JSONError as e:
    print('JSON parsing failed:', e)

# Items fully parsed before the error have already been delivered
for result in results:
    print(result)

Advanced Patterns

Custom Backend Integration

import ijson
from ijson.utils import coroutine, sendable_list, chain

@coroutine
def custom_number_handler(target):
    """Convert all numbers to strings"""
    while True:
        event, value = (yield)
        if event == 'number':
            target.send(('string', str(value)))
        else:
            target.send((event, value))

# Create custom parsing pipeline
def parse_with_string_numbers(chunks):
    """chunks: an iterable of bytes objects"""
    results = sendable_list()
    pipeline = chain(
        results,
        (custom_number_handler, (), {}),
        (ijson.basic_parse_coro, (), {})
    )

    for chunk in chunks:
        pipeline.send(chunk)
    pipeline.close()

    return results

Memory-Efficient Processing

import ijson
from ijson.utils import coroutine, chain

@coroutine
def memory_efficient_processor(target):
    """Process items immediately without accumulation"""
    while True:
        item = (yield)
        # Process and forward right away; nothing is retained, so each
        # item becomes garbage-collectable after this iteration
        target.send(process_item_immediately(item))

def process_large_stream(source, prefix):
    """Process an iterable of bytes chunks with minimal memory usage"""
    @coroutine
    def streaming_sink():
        while True:
            result = (yield)
            # Handle each result immediately (save to DB, send to API, etc.)
            handle_result_immediately(result)

    pipeline = chain(
        streaming_sink(),
        (memory_efficient_processor, (), {}),
        (ijson.items_coro, (prefix,), {})
    )

    for chunk in source:
        pipeline.send(chunk)
    pipeline.close()

# process_item_immediately() and handle_result_immediately() are
# application-specific hooks to be supplied by the caller.

Performance Considerations

  • Coroutines: More flexible but slightly slower than direct function calls
  • Pipeline Depth: Deeper pipelines have more overhead but enable complex processing
  • Memory Usage: Coroutines maintain minimal state, enabling efficient stream processing
  • Error Propagation: Exceptions bubble up through pipeline stages
  • Generator Conversion: coros2gen() adds iteration overhead but provides familiar interface

Install with Tessl CLI

npx tessl i tessl/pypi-ijson
