Iterative JSON parser with standard Python iterator interfaces for processing large JSON data streams without loading entire documents into memory
—
Coroutine-based parsing pipeline components for building custom JSON processing workflows. These provide maximum flexibility for advanced use cases requiring custom processing logic, filtering, transformation, or integration with existing coroutine-based systems.
Low-level coroutine for processing raw JSON parsing events without path context.
def basic_parse_coro(target, **config):
"""
Coroutine for low-level parsing events.
Parameters:
- target: Coroutine or object with send() method to receive events
- **config: Backend-specific configuration options
Returns:
Coroutine that accepts data chunks and sends (event, value) tuples to target
Events sent to target:
- ('null', None): JSON null value
- ('boolean', bool): JSON boolean value
- ('number', int/Decimal): JSON number value
- ('string', str): JSON string value
- ('map_key', str): JSON object key
- ('start_map', None): Start of JSON object
- ('end_map', None): End of JSON object
- ('start_array', None): Start of JSON array
- ('end_array', None): End of JSON array
"""Coroutine that adds path context to parsing events, providing full location information within the JSON document.
def parse_coro(target, **config):
"""
Coroutine for parsing with path context.
Parameters:
- target: Coroutine or object with send() method to receive events
- **config: Backend-specific configuration options
Returns:
Coroutine that accepts data chunks and sends (prefix, event, value) tuples to target
Events sent to target:
- (prefix, event, value) where prefix is the JSON path string
"""Coroutine for extracting complete Python objects from specific locations in JSON streams.
def items_coro(target, prefix, map_type=None, **config):
"""
Coroutine for extracting objects under prefix.
Parameters:
- target: Coroutine or object with send() method to receive objects
- prefix (str): JSON path prefix targeting objects to extract
- map_type (type, optional): Custom mapping type for objects (default: dict)
- **config: Backend-specific configuration options
Returns:
Coroutine that accepts data chunks and sends Python objects to target
"""Coroutine for extracting key-value pairs from JSON objects.
def kvitems_coro(target, prefix, map_type=None, **config):
"""
Coroutine for extracting key-value pairs under prefix.
Parameters:
- target: Coroutine or object with send() method to receive pairs
- prefix (str): JSON path prefix targeting objects to extract pairs from
- map_type (type, optional): Custom mapping type for nested objects (default: dict)
- **config: Backend-specific configuration options
Returns:
Coroutine that accepts data chunks and sends (key, value) tuples to target
"""def chain(sink, *coro_pipeline):
"""
Chain coroutines into a processing pipeline.
Parameters:
- sink: Final destination for processed data (coroutine or sendable object)
- *coro_pipeline: Tuples of (coroutine_func, args, kwargs) defining pipeline stages
Returns:
Chained coroutine that feeds data through the entire pipeline
"""class sendable_list(list):
"""
List that can receive data via send() method for use as pipeline sink.
Methods:
- send(value): Append value to list (alias for append)
"""def coroutine(func):
"""
Decorator for generator-based coroutines.
Automatically advances coroutine to first yield point.
Required for proper coroutine initialization in Python.
"""def coros2gen(source, *coro_pipeline):
"""
Convert coroutine pipeline to generator.
Parameters:
- source: Iterable providing input data
- *coro_pipeline: Pipeline specification tuples
Returns:
Generator yielding results from coroutine pipeline
"""def file_source(f, buf_size=64*1024):
"""
Generator that yields data from a file-like object.
Parameters:
- f: File-like object with read() method
- buf_size (int): Buffer size for reading chunks (default: 64*1024)
Returns:
Generator yielding data chunks from file
"""import ijson
from ijson.utils import sendable_list, chain

# Custom coroutine: forward only string events whose value is long enough.
@ijson.utils.coroutine
def filter_strings(target):
    while True:
        event, value = (yield)
        if event == 'string' and len(value) > 10:
            target.send((event, value))

# Sink that accumulates everything sent to it.
collected = sendable_list()
document = '{"short": "hi", "long": "this is a long string", "number": 42}'

# Stages are listed sink-first; data flows
# basic_parse_coro -> filter_strings -> collected.
stream = chain(
    collected,
    (filter_strings, (), {}),
    (ijson.basic_parse_coro, (), {}),
)

# Push the document through the pipeline, then flush it.
stream.send(document)
stream.close()
print(collected)  # [('string', 'this is a long string')]

import ijson
from ijson.utils import sendable_list, chain

# Enrich each extracted user object as it flows through the pipeline.
@ijson.utils.coroutine
def transform_users(target):
    while True:
        user = (yield)
        # Add computed field
        user['display_name'] = f"{user.get('first', '')} {user.get('last', '')}"
        target.send(user)

# Process JSON with transformation
document = '{"users": [{"first": "Alice", "last": "Smith"}, {"first": "Bob", "last": "Jones"}]}'
augmented = sendable_list()
flow = chain(
    augmented,
    (transform_users, (), {}),
    (ijson.items_coro, ('users.item',), {}),
)

flow.send(document)
flow.close()

for user in augmented:
    print(user['display_name'])  # "Alice Smith", "Bob Jones"

import ijson
from ijson.utils import sendable_list, chain, coros2gen

# First stage: drop records that are missing required fields.
@ijson.utils.coroutine
def validate_items(target):
    while True:
        item = (yield)
        if 'id' not in item or 'name' not in item:
            continue
        target.send(item)

# Second stage: stamp each surviving record with bookkeeping metadata.
@ijson.utils.coroutine
def add_metadata(target):
    seen = 0
    while True:
        item = (yield)
        seen += 1
        item['_sequence'] = seen
        item['_processed'] = True
        target.send(item)

# Drive the coroutine pipeline as an ordinary generator.
document = '{"items": [{"id": 1, "name": "A"}, {"id": 2}, {"id": 3, "name": "C"}]}'
processed_items = coros2gen(
    [document],
    (add_metadata, (), {}),
    (validate_items, (), {}),
    (ijson.items_coro, ('items.item',), {}),
)

for record in processed_items:
    print(record)
# {'id': 1, 'name': 'A', '_sequence': 1, '_processed': True}
# {'id': 3, 'name': 'C', '_sequence': 2, '_processed': True}

import ijson
# FIX: sendable_list is used below but was missing from this import.
from ijson.utils import coroutine, sendable_list, chain

# Real-time processing coroutine: buffers items and flushes them in batches.
@coroutine
def real_time_processor(target):
    batch = []
    while True:
        try:
            item = (yield)
            batch.append(item)
            if len(batch) >= 10:  # Process in batches
                for result in process_batch(batch):
                    target.send(result)
                batch = []
        except GeneratorExit:
            # Pipeline is closing: flush any remaining partial batch
            # so no items are lost.
            if batch:
                for result in process_batch(batch):
                    target.send(result)
            target.close()
            break

def process_batch(items):
    # Simulate batch processing
    return [{'processed': item, 'batch_size': len(items)} for item in items]

# Set up real-time processing
results = sendable_list()
processor = chain(
    results,
    (real_time_processor, (), {}),
    (ijson.items_coro, ('stream.item',), {})
)

# Simulate streaming data
stream_data = '{"stream": [' + ','.join([f'{{"id": {i}}}' for i in range(25)]) + ']}'
processor.send(stream_data)
processor.close()
print(f"Processed {len(results)} items in batches")

import ijson
from ijson.utils import coroutine, sendable_list, chain
from ijson.common import JSONError

@coroutine
def error_handler(target):
    """Forward items downstream, converting exceptions thrown into this
    coroutine into structured error records.

    NOTE(review): errors raised inside upstream parsing coroutines
    propagate out of the outer ``send()`` call rather than being thrown
    into this coroutine, so this handler only sees exceptions explicitly
    ``throw()``n into it — confirm against ijson's chaining semantics.
    """
    while True:
        try:
            data = (yield)
            target.send(data)
        except JSONError as e:
            # Handle JSON errors gracefully
            error_info = {'error': 'JSON parsing failed', 'details': str(e)}
            target.send(error_info)
        except Exception as e:
            # Handle other errors
            error_info = {'error': 'Processing failed', 'details': str(e)}
            target.send(error_info)

# Pipeline with error handling
results = sendable_list()
safe_parser = chain(
    results,
    (error_handler, (), {}),
    (ijson.items_coro, ('data.item',), {})
)

# Test with malformed JSON
malformed_json = '{"data": [{"valid": true}, {"invalid": }]}'
try:
    safe_parser.send(malformed_json)
    safe_parser.close()
except Exception:
    # FIX: narrowed from a bare ``except:`` so KeyboardInterrupt and
    # SystemExit are not silently swallowed.
    pass

for result in results:
    print(result)

import ijson
# FIX: sendable_list and chain are used below but were missing from this import.
from ijson.utils import coroutine, sendable_list, chain

@coroutine
def custom_number_handler(target):
    """Convert all numbers to strings"""
    while True:
        event, value = (yield)
        if event == 'number':
            target.send(('string', str(value)))
        else:
            target.send((event, value))

# Create custom parsing pipeline
def parse_with_string_numbers(source):
    """Parse *source*, rewriting every number event into a string event.

    Accepts either an iterable of data chunks or a single chunk, and
    returns the collected (event, value) tuples.
    """
    results = sendable_list()
    pipeline = chain(
        results,
        (custom_number_handler, (), {}),
        (ijson.basic_parse_coro, (), {})
    )
    # FIX: str/bytes are iterable, so the old ``hasattr(source, '__iter__')``
    # check fed a single string chunk character-by-character; treat
    # str/bytes as one whole chunk instead.
    if isinstance(source, (str, bytes)) or not hasattr(source, '__iter__'):
        source = [source]
    for chunk in source:
        pipeline.send(chunk)
    pipeline.close()
    return results

import ijson
# FIX: chain is used below but was missing from this import.
from ijson.utils import coroutine, chain

@coroutine
def memory_efficient_processor(target):
    """Process items immediately without accumulation"""
    while True:
        item = (yield)
        # Process immediately and send result.
        # NOTE(review): process_item_immediately is an illustrative
        # placeholder and is not defined in this snippet.
        processed = process_item_immediately(item)
        target.send(processed)
        # Item is garbage collected here

def process_large_stream(source, prefix):
    """Process large JSON streams with minimal memory usage.

    NOTE(review): the ``source`` parameter is currently unused — the
    caller is expected to feed chunks into the returned pipeline; confirm
    intent.
    """
    @coroutine
    def streaming_sink(target):
        while True:
            result = (yield)
            # Handle result immediately (save to DB, send to API, etc.)
            # NOTE(review): handle_result_immediately is an illustrative
            # placeholder and is not defined in this snippet.
            handle_result_immediately(result)

    # The sink never forwards anywhere, so its target is None.
    sink = streaming_sink(None)
    pipeline = chain(
        sink,
        (memory_efficient_processor, (), {}),
        (ijson.items_coro, (prefix,), {})
    )
    return pipeline

coros2gen() adds iteration overhead but provides a familiar iterator interface.

Install with Tessl CLI
npx tessl i tessl/pypi-ijson