
# Low-Level Coroutines

Coroutine-based parsing pipeline components for building custom JSON processing workflows. These provide maximum flexibility for advanced use cases requiring custom processing logic, filtering, transformation, or integration with existing coroutine-based systems.

## Capabilities

### Basic Parsing Coroutines

Low-level coroutine for processing raw JSON parsing events without path context.

```python { .api }
def basic_parse_coro(target, **config):
    """
    Coroutine for low-level parsing events.

    Parameters:
    - target: Coroutine or object with send() method to receive events
    - **config: Backend-specific configuration options

    Returns:
    Coroutine that accepts data chunks and sends (event, value) tuples to target

    Events sent to target:
    - ('null', None): JSON null value
    - ('boolean', bool): JSON boolean value
    - ('number', int/Decimal): JSON number value
    - ('string', str): JSON string value
    - ('map_key', str): JSON object key
    - ('start_map', None): Start of JSON object
    - ('end_map', None): End of JSON object
    - ('start_array', None): Start of JSON array
    - ('end_array', None): End of JSON array
    """
```

### Context-Aware Parsing Coroutines

Coroutine that adds path context to parsing events, providing full location information within the JSON document.

```python { .api }
def parse_coro(target, **config):
    """
    Coroutine for parsing with path context.

    Parameters:
    - target: Coroutine or object with send() method to receive events
    - **config: Backend-specific configuration options

    Returns:
    Coroutine that accepts data chunks and sends (prefix, event, value) tuples to target

    Events sent to target:
    - (prefix, event, value) where prefix is the JSON path string
    """
```

### Object Extraction Coroutines

Coroutine for extracting complete Python objects from specific locations in JSON streams.

```python { .api }
def items_coro(target, prefix, map_type=None, **config):
    """
    Coroutine for extracting objects under prefix.

    Parameters:
    - target: Coroutine or object with send() method to receive objects
    - prefix (str): JSON path prefix targeting objects to extract
    - map_type (type, optional): Custom mapping type for objects (default: dict)
    - **config: Backend-specific configuration options

    Returns:
    Coroutine that accepts data chunks and sends Python objects to target
    """
```

### Key-Value Extraction Coroutines

Coroutine for extracting key-value pairs from JSON objects.

```python { .api }
def kvitems_coro(target, prefix, map_type=None, **config):
    """
    Coroutine for extracting key-value pairs under prefix.

    Parameters:
    - target: Coroutine or object with send() method to receive pairs
    - prefix (str): JSON path prefix targeting objects to extract pairs from
    - map_type (type, optional): Custom mapping type for nested objects (default: dict)
    - **config: Backend-specific configuration options

    Returns:
    Coroutine that accepts data chunks and sends (key, value) tuples to target
    """
```

## Coroutine Utilities

### Pipeline Construction

```python { .api }
def chain(sink, *coro_pipeline):
    """
    Chain coroutines into a processing pipeline.

    Parameters:
    - sink: Final destination for processed data (coroutine or sendable object)
    - *coro_pipeline: Tuples of (coroutine_func, args, kwargs) defining pipeline stages

    Returns:
    Chained coroutine that feeds data through the entire pipeline
    """
```
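
The wiring `chain` performs can be sketched in pure Python (stand-in `coroutine` and `SendableList` definitions are included so the sketch runs without ijson). As the usage examples below suggest, the last-listed pipeline tuple is the stage that receives input first:

```python
def coroutine(func):
    # Stand-in for ijson.utils.coroutine: prime the generator
    def wrapper(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)
        return gen
    return wrapper

class SendableList(list):
    # Stand-in for ijson.utils.sendable_list
    send = list.append

@coroutine
def keep_even(target):
    while True:
        value = (yield)
        if value % 2 == 0:
            target.send(value)

@coroutine
def double(target):
    while True:
        target.send(2 * (yield))

sink = SendableList()
# Equivalent to chain(sink, (double, (), {}), (keep_even, (), {})):
# each stage wraps the next, so data enters the last-listed stage first.
pipeline = keep_even(double(sink))
for value in [1, 2, 3, 4]:
    pipeline.send(value)

print(sink)  # [4, 8]
```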

### Sendable Collections

```python { .api }
class sendable_list(list):
    """
    List that can receive data via send() method for use as pipeline sink.

    Methods:
    - send(value): Append value to list (alias for append)
    """
```

### Coroutine Decorator

```python { .api }
def coroutine(func):
    """
    Decorator for generator-based coroutines.

    Automatically advances coroutine to first yield point.
    Required for proper coroutine initialization in Python.
    """
```
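
The priming behaviour can be sketched as follows (a pure-Python equivalent, not ijson's actual implementation): a freshly created generator must be advanced once before it can accept `send()`, and the decorator does that for you:

```python
import functools

def coroutine(func):
    """Sketch of a priming decorator matching the documented behaviour."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)  # advance to the first yield so send() works immediately
        return gen
    return wrapper

@coroutine
def collector(out):
    while True:
        out.append((yield))

received = []
collector(received).send('hello')  # no manual next() needed
print(received)  # ['hello']
```

Without priming, the first `send('hello')` would raise `TypeError: can't send non-None value to a just-started generator`.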

### Pipeline to Generator Conversion

```python { .api }
def coros2gen(source, *coro_pipeline):
    """
    Convert coroutine pipeline to generator.

    Parameters:
    - source: Iterable providing input data
    - *coro_pipeline: Pipeline specification tuples

    Returns:
    Generator yielding results from coroutine pipeline
    """
```

### File Data Source

```python { .api }
def file_source(f, buf_size=64*1024):
    """
    Generator that yields data from a file-like object.

    Parameters:
    - f: File-like object with read() method
    - buf_size (int): Buffer size for reading chunks (default: 64*1024)

    Returns:
    Generator yielding data chunks from file
    """
```
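
Behaviourally this is a simple chunked-read generator. The following stand-in (not ijson's actual code) illustrates it with an in-memory `io.BytesIO` in place of a real file:

```python
import io

def file_source(f, buf_size=64 * 1024):
    # Stand-in mirroring the documented behaviour: yield fixed-size chunks
    # until read() returns an empty result.
    while True:
        chunk = f.read(buf_size)
        if not chunk:
            return
        yield chunk

payload = b'{"a": 1}' * 500
chunks = list(file_source(io.BytesIO(payload), buf_size=1024))

# Every chunk except possibly the last is exactly buf_size bytes
print(len(chunks), len(chunks[0]))
```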

## Usage Examples

### Custom Event Filtering

```python
import ijson
from ijson.utils import coroutine, sendable_list, chain

# Custom coroutine to filter events
@coroutine
def filter_strings(target):
    while True:
        event, value = (yield)
        if event == 'string' and len(value) > 10:
            target.send((event, value))

# Build processing pipeline (note: ijson expects bytes input)
results = sendable_list()
json_data = b'{"short": "hi", "long": "this is a long string", "number": 42}'

# Chain: data -> basic_parse -> filter_strings -> results
pipeline = chain(
    results,
    (filter_strings, (), {}),
    (ijson.basic_parse_coro, (), {}),
)

# Send data through pipeline
for chunk in [json_data]:
    pipeline.send(chunk)
pipeline.close()

print(results)  # [('string', 'this is a long string')]
```

### Custom Object Transformation

```python
import ijson
from ijson.utils import coroutine, sendable_list, chain

# Transform objects as they're extracted
@coroutine
def transform_users(target):
    while True:
        user = (yield)
        # Add computed field
        user['display_name'] = f"{user.get('first', '')} {user.get('last', '')}"
        target.send(user)

# Process JSON with transformation
json_data = b'{"users": [{"first": "Alice", "last": "Smith"}, {"first": "Bob", "last": "Jones"}]}'
results = sendable_list()

pipeline = chain(
    results,
    (transform_users, (), {}),
    (ijson.items_coro, ('users.item',), {}),
)

for chunk in [json_data]:
    pipeline.send(chunk)
pipeline.close()

for user in results:
    print(user['display_name'])  # "Alice Smith", "Bob Jones"
```

### Multi-Stage Processing Pipeline

```python
import ijson
from ijson.utils import coroutine, coros2gen

# First stage: drop items missing required fields
@coroutine
def validate_items(target):
    while True:
        item = (yield)
        if 'id' in item and 'name' in item:
            target.send(item)

# Second stage: add metadata
@coroutine
def add_metadata(target):
    counter = 0
    while True:
        item = (yield)
        counter += 1
        item['_sequence'] = counter
        item['_processed'] = True
        target.send(item)

# Use as generator
json_data = b'{"items": [{"id": 1, "name": "A"}, {"id": 2}, {"id": 3, "name": "C"}]}'

# Convert pipeline to generator; data flows through the last-listed stage first
processed_items = coros2gen(
    [json_data],
    (add_metadata, (), {}),
    (validate_items, (), {}),
    (ijson.items_coro, ('items.item',), {}),
)

for item in processed_items:
    print(item)
# {'id': 1, 'name': 'A', '_sequence': 1, '_processed': True}
# {'id': 3, 'name': 'C', '_sequence': 2, '_processed': True}
```

### Real-Time Stream Processing

```python
import ijson
from ijson.utils import coroutine, sendable_list, chain

# Real-time processing coroutine
@coroutine
def real_time_processor(target):
    batch = []
    while True:
        try:
            item = (yield)
            batch.append(item)

            if len(batch) >= 10:  # Process in batches
                for result in process_batch(batch):
                    target.send(result)
                batch = []
        except GeneratorExit:
            # Flush remaining items on close()
            if batch:
                for result in process_batch(batch):
                    target.send(result)
            if hasattr(target, 'close'):  # sinks like sendable_list have no close()
                target.close()
            break

def process_batch(items):
    # Simulate batch processing
    return [{'processed': item, 'batch_size': len(items)} for item in items]

# Set up real-time processing
results = sendable_list()
processor = chain(
    results,
    (real_time_processor, (), {}),
    (ijson.items_coro, ('stream.item',), {}),
)

# Simulate streaming data
stream_data = ('{"stream": [' + ','.join(f'{{"id": {i}}}' for i in range(25)) + ']}').encode()
processor.send(stream_data)
processor.close()

print(f"Processed {len(results)} items in batches")
```

### Error Handling in Pipelines

```python
import ijson
from ijson.utils import coroutine, sendable_list, chain
from ijson.common import JSONError

@coroutine
def error_handler(target):
    while True:
        try:
            data = (yield)
            target.send(data)
        except JSONError as e:
            # Handle JSON errors thrown into this stage gracefully
            target.send({'error': 'JSON parsing failed', 'details': str(e)})
        except Exception as e:
            # Handle other errors
            target.send({'error': 'Processing failed', 'details': str(e)})

# Pipeline with error handling
results = sendable_list()
safe_parser = chain(
    results,
    (error_handler, (), {}),
    (ijson.items_coro, ('data.item',), {}),
)

# Test with malformed JSON; note that parse errors raised while feeding
# the pipeline surface in the send() call itself
malformed_json = b'{"data": [{"valid": true}, {"invalid": }]}'
try:
    safe_parser.send(malformed_json)
    safe_parser.close()
except JSONError:
    pass  # objects parsed before the error are already in results

for result in results:
    print(result)
```

## Advanced Patterns

### Custom Backend Integration

```python
import ijson
from ijson.utils import coroutine, sendable_list, chain

@coroutine
def custom_number_handler(target):
    """Convert all numbers to strings"""
    while True:
        event, value = (yield)
        if event == 'number':
            target.send(('string', str(value)))
        else:
            target.send((event, value))

# Create custom parsing pipeline
def parse_with_string_numbers(source):
    results = sendable_list()
    pipeline = chain(
        results,
        (custom_number_handler, (), {}),
        (ijson.basic_parse_coro, (), {}),
    )

    for chunk in source if hasattr(source, '__iter__') else [source]:
        pipeline.send(chunk)
    pipeline.close()

    return results
```

### Memory-Efficient Processing

```python
import ijson
from ijson.utils import coroutine, chain

@coroutine
def memory_efficient_processor(target):
    """Process items immediately without accumulation"""
    while True:
        item = (yield)
        # Process immediately and send the result onwards
        processed = process_item_immediately(item)  # user-supplied processing
        target.send(processed)
        # item is garbage collected here

def process_large_stream(prefix):
    """Build a pipeline that processes large JSON streams with minimal
    memory usage; the caller feeds chunks via pipeline.send()."""
    @coroutine
    def streaming_sink():
        while True:
            result = (yield)
            # Handle result immediately (save to DB, send to API, etc.)
            handle_result_immediately(result)  # user-supplied handler

    sink = streaming_sink()
    pipeline = chain(
        sink,
        (memory_efficient_processor, (), {}),
        (ijson.items_coro, (prefix,), {}),
    )

    return pipeline
```

## Performance Considerations

- **Coroutines**: More flexible but slightly slower than direct function calls
- **Pipeline Depth**: Deeper pipelines have more overhead but enable complex processing
- **Memory Usage**: Coroutines maintain minimal state, enabling efficient stream processing
- **Error Propagation**: Exceptions bubble up through pipeline stages to the caller of send()
- **Generator Conversion**: `coros2gen()` adds iteration overhead but provides a familiar interface