# Low-Level Coroutines

Coroutine-based parsing pipeline components for building custom JSON processing workflows. These provide maximum flexibility for advanced use cases requiring custom processing logic, filtering, transformation, or integration with existing coroutine-based systems.

## Capabilities

### Basic Parsing Coroutines

Low-level coroutine for processing raw JSON parsing events without path context.

```python { .api }
def basic_parse_coro(target, **config):
    """
    Coroutine for low-level parsing events.

    Parameters:
    - target: Coroutine or object with send() method to receive events
    - **config: Backend-specific configuration options

    Returns:
    Coroutine that accepts data chunks and sends (event, value) tuples to target

    Events sent to target:
    - ('null', None): JSON null value
    - ('boolean', bool): JSON boolean value
    - ('number', int/Decimal): JSON number value
    - ('string', str): JSON string value
    - ('map_key', str): JSON object key
    - ('start_map', None): Start of JSON object
    - ('end_map', None): End of JSON object
    - ('start_array', None): Start of JSON array
    - ('end_array', None): End of JSON array
    """
```
### Context-Aware Parsing Coroutines

Coroutine that adds path context to parsing events, providing full location information within the JSON document.

```python { .api }
def parse_coro(target, **config):
    """
    Coroutine for parsing with path context.

    Parameters:
    - target: Coroutine or object with send() method to receive events
    - **config: Backend-specific configuration options

    Returns:
    Coroutine that accepts data chunks and sends (prefix, event, value) tuples to target

    Events sent to target:
    - (prefix, event, value) where prefix is the JSON path string
    """
```
### Object Extraction Coroutines

Coroutine for extracting complete Python objects from specific locations in JSON streams.

```python { .api }
def items_coro(target, prefix, map_type=None, **config):
    """
    Coroutine for extracting objects under prefix.

    Parameters:
    - target: Coroutine or object with send() method to receive objects
    - prefix (str): JSON path prefix targeting objects to extract
    - map_type (type, optional): Custom mapping type for objects (default: dict)
    - **config: Backend-specific configuration options

    Returns:
    Coroutine that accepts data chunks and sends Python objects to target
    """
```

### Key-Value Extraction Coroutines

Coroutine for extracting key-value pairs from JSON objects.

```python { .api }
def kvitems_coro(target, prefix, map_type=None, **config):
    """
    Coroutine for extracting key-value pairs under prefix.

    Parameters:
    - target: Coroutine or object with send() method to receive pairs
    - prefix (str): JSON path prefix targeting objects to extract pairs from
    - map_type (type, optional): Custom mapping type for nested objects (default: dict)
    - **config: Backend-specific configuration options

    Returns:
    Coroutine that accepts data chunks and sends (key, value) tuples to target
    """
```
## Coroutine Utilities

### Pipeline Construction

```python { .api }
def chain(sink, *coro_pipeline):
    """
    Chain coroutines into a processing pipeline.

    Parameters:
    - sink: Final destination for processed data (coroutine or sendable object)
    - *coro_pipeline: Tuples of (coroutine_func, args, kwargs) defining pipeline
      stages, listed from the stage closest to the sink to the stage closest to
      the data source (data sent to the chain enters the last tuple's coroutine)

    Returns:
    Chained coroutine that feeds data through the entire pipeline
    """
```

### Sendable Collections

```python { .api }
class sendable_list(list):
    """
    List that can receive data via send() method for use as a pipeline sink.

    Methods:
    - send(value): Append value to list (alias for append)
    """
```
### Coroutine Decorator

```python { .api }
def coroutine(func):
    """
    Decorator for generator-based coroutines.

    Automatically advances the coroutine to its first yield point, which is
    required before a generator-based coroutine can accept values via send().
    """
```
### Pipeline to Generator Conversion

```python { .api }
def coros2gen(source, *coro_pipeline):
    """
    Convert a coroutine pipeline to a generator.

    Parameters:
    - source: Iterable providing input data chunks
    - *coro_pipeline: Pipeline specification tuples

    Returns:
    Generator yielding results from the coroutine pipeline
    """
```

### File Data Source

```python { .api }
def file_source(f, buf_size=64*1024):
    """
    Generator that yields data from a file-like object.

    Parameters:
    - f: File-like object with read() method
    - buf_size (int): Buffer size for reading chunks (default: 64*1024)

    Returns:
    Generator yielding data chunks from the file
    """
```
## Usage Examples

### Custom Event Filtering

```python
import ijson
from ijson.utils import coroutine, sendable_list, chain

# Custom coroutine to filter events
@coroutine
def filter_strings(target):
    while True:
        event, value = (yield)
        if event == 'string' and len(value) > 10:
            target.send((event, value))

# Build processing pipeline
results = sendable_list()
json_data = b'{"short": "hi", "long": "this is a long string", "number": 42}'

# Chain: data -> basic_parse -> filter_strings -> results
# (stages are listed sink-first, so data enters the last stage)
pipeline = chain(
    results,
    (filter_strings, (), {}),
    (ijson.basic_parse_coro, (), {})
)

# Send data through the pipeline as bytes chunks
for chunk in [json_data]:
    pipeline.send(chunk)
pipeline.close()

print(results)  # [('string', 'this is a long string')]
```
### Custom Object Transformation

```python
import ijson
from ijson.utils import coroutine, sendable_list, chain

# Transform objects as they're extracted
@coroutine
def transform_users(target):
    while True:
        user = (yield)
        # Add computed field
        user['display_name'] = f"{user.get('first', '')} {user.get('last', '')}"
        target.send(user)

# Process JSON with transformation
json_data = b'{"users": [{"first": "Alice", "last": "Smith"}, {"first": "Bob", "last": "Jones"}]}'
results = sendable_list()

pipeline = chain(
    results,
    (transform_users, (), {}),
    (ijson.items_coro, ('users.item',), {})
)

for chunk in [json_data]:
    pipeline.send(chunk)
pipeline.close()

for user in results:
    print(user['display_name'])  # "Alice Smith", "Bob Jones"
```
### Multi-Stage Processing Pipeline

```python
import ijson
from ijson.utils import coroutine, coros2gen

# First stage after extraction: validate items
@coroutine
def validate_items(target):
    while True:
        item = (yield)
        if 'id' in item and 'name' in item:
            target.send(item)

# Second stage: add metadata
@coroutine
def add_metadata(target):
    counter = 0
    while True:
        item = (yield)
        counter += 1
        item['_sequence'] = counter
        item['_processed'] = True
        target.send(item)

# Use as a generator
json_data = b'{"items": [{"id": 1, "name": "A"}, {"id": 2}, {"id": 3, "name": "C"}]}'

# Convert the pipeline to a generator (stages listed sink-first)
processed_items = coros2gen(
    [json_data],
    (add_metadata, (), {}),
    (validate_items, (), {}),
    (ijson.items_coro, ('items.item',), {})
)

for item in processed_items:
    print(item)
# {'id': 1, 'name': 'A', '_sequence': 1, '_processed': True}
# {'id': 3, 'name': 'C', '_sequence': 2, '_processed': True}
```
### Real-Time Stream Processing

```python
import ijson
from ijson.utils import coroutine, chain, sendable_list

# Real-time processing coroutine
@coroutine
def real_time_processor(target):
    batch = []
    while True:
        try:
            item = (yield)
            batch.append(item)

            if len(batch) >= 10:  # Process in batches
                for result in process_batch(batch):
                    target.send(result)
                batch = []
        except GeneratorExit:
            # Flush remaining items when this coroutine is closed
            if batch:
                for result in process_batch(batch):
                    target.send(result)
            break

def process_batch(items):
    # Simulate batch processing
    return [{'processed': item, 'batch_size': len(items)} for item in items]

# Set up real-time processing. Keep a handle on the batching stage so it can
# be closed explicitly: closing the head of the chain does not propagate
# GeneratorExit to downstream stages.
results = sendable_list()
batcher = real_time_processor(results)
processor = chain(batcher, (ijson.items_coro, ('stream.item',), {}))

# Simulate streaming data
stream_data = '{"stream": [' + ','.join(f'{{"id": {i}}}' for i in range(25)) + ']}'
processor.send(stream_data.encode('utf-8'))
processor.close()
batcher.close()  # triggers the final batch flush

print(f"Processed {len(results)} items in batches")
```
### Error Handling in Pipelines

```python
import ijson
from ijson.utils import coroutine, sendable_list, chain
from ijson.common import JSONError

@coroutine
def error_handler(target):
    while True:
        try:
            data = (yield)
            target.send(data)
        except Exception as e:
            # Handle errors raised by downstream stages (e.g. a failing sink)
            target.send({'error': 'Processing failed', 'details': str(e)})

# Pipeline with error handling
results = sendable_list()
safe_parser = chain(
    results,
    (error_handler, (), {}),
    (ijson.items_coro, ('data.item',), {})
)

# Test with malformed JSON. Parse errors are raised inside items_coro and
# surface from send(), so they must be caught at the caller, not in a
# downstream coroutine.
malformed_json = b'{"data": [{"valid": true}, {"invalid": }]}'
try:
    safe_parser.send(malformed_json)
    safe_parser.close()
except JSONError as e:
    print('JSON parsing failed:', e)

for result in results:
    print(result)  # the valid item extracted before the error
```
## Advanced Patterns

### Custom Backend Integration

```python
import ijson
from ijson.utils import coroutine, sendable_list, chain

@coroutine
def custom_number_handler(target):
    """Convert all numbers to strings"""
    while True:
        event, value = (yield)
        if event == 'number':
            target.send(('string', str(value)))
        else:
            target.send((event, value))

# Create a custom parsing pipeline
def parse_with_string_numbers(source):
    results = sendable_list()
    pipeline = chain(
        results,
        (custom_number_handler, (), {}),
        (ijson.basic_parse_coro, (), {})
    )

    # Treat a single bytes/str chunk as a one-element stream
    chunks = [source] if isinstance(source, (bytes, str)) else source
    for chunk in chunks:
        pipeline.send(chunk)
    pipeline.close()

    return results
```
### Memory-Efficient Processing

```python
import ijson
from ijson.utils import coroutine, chain

@coroutine
def memory_efficient_processor(target):
    """Process items immediately without accumulation"""
    while True:
        item = (yield)
        # Process immediately and send the result onward
        # (process_item_immediately is a user-supplied placeholder)
        processed = process_item_immediately(item)
        target.send(processed)
        # The item becomes garbage-collectable here

def process_large_stream(prefix):
    """Build a pipeline for processing large JSON streams with minimal memory usage"""
    @coroutine
    def streaming_sink():
        while True:
            result = (yield)
            # Handle each result immediately (save to DB, send to API, etc.)
            # (handle_result_immediately is a user-supplied placeholder)
            handle_result_immediately(result)

    pipeline = chain(
        streaming_sink(),
        (memory_efficient_processor, (), {}),
        (ijson.items_coro, (prefix,), {})
    )
    return pipeline
```
## Performance Considerations

- **Coroutines**: More flexible but slightly slower than direct function calls
- **Pipeline Depth**: Deeper pipelines have more overhead but enable complex processing
- **Memory Usage**: Coroutines maintain minimal state, enabling efficient stream processing
- **Error Propagation**: Exceptions raised in pipeline stages propagate to the caller of send()
- **Generator Conversion**: `coros2gen()` adds iteration overhead but provides a familiar interface