or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-objects.md, decoders.md, exceptions.md, form-parsing.md, index.md, streaming-parsers.md

docs/streaming-parsers.md

0

# Base Parser and Streaming Parsers

1

2

Base class and low-level streaming parsers for specific content types with callback-based processing. BaseParser provides common functionality for all parsers, while specialized parsers provide fine-grained control over parsing behavior, custom callback handling, and memory-efficient processing of large payloads through incremental data processing.

3

4

## Capabilities

5

6

### BaseParser

7

8

Base class that provides common functionality for all parsers including callback management and lifecycle control.

9

10

```python { .api }

11

class BaseParser:

12

"""

13

Base class for all parsers with callback functionality.

14

"""

15

16

def __init__(self):

17

"""Initialize base parser."""

18

19

def callback(

20

self,

21

name: str,

22

data: bytes | None = None,

23

start: int | None = None,

24

end: int | None = None

25

) -> None:

26

"""

27

Execute named callback with optional data parameters.

28

29

Parameters:

30

- name: Callback name to execute

31

- data: Data bytes for data callbacks

32

- start: Start index for data slice

33

- end: End index for data slice

34

"""

35

36

def set_callback(self, name: str, new_func) -> None:

37

"""

38

Set callback function for specific event.

39

40

Parameters:

41

- name: Callback name

42

- new_func: Callback function or None to remove

43

"""

44

45

def close(self) -> None:

46

"""Close parser and clean up resources."""

47

48

def finalize(self) -> None:

49

"""Finalize parsing."""

50

```

51

52

**Usage Example:**

53

54

```python

55

from python_multipart import BaseParser

56

57

class CustomParser(BaseParser):
    """Example parser that buffers every chunk handed to its data callback."""

    def __init__(self):
        super().__init__()
        # Chunks collected by the on_data callback, in arrival order
        self.data_buffer = []

    def process_data(self, data):
        """Fire the start, data and end callbacks for one bytes payload."""
        # Use inherited callback functionality
        self.callback('on_data_start')
        self.callback('on_data', data, 0, len(data))
        self.callback('on_data_end')

    def setup_callbacks(self):
        """Register the three callbacks that process_data triggers."""
        def on_data(data, start, end):
            chunk = data[start:end]
            self.data_buffer.append(chunk)

        def on_data_start():
            print("Starting data processing")

        def on_data_end():
            print("Finished data processing")

        # Set callbacks using inherited method
        self.set_callback('on_data', on_data)
        self.set_callback('on_data_start', on_data_start)
        self.set_callback('on_data_end', on_data_end)


# Usage
parser = CustomParser()
parser.setup_callbacks()
parser.process_data(b"Hello World")
print(f"Buffered data: {b''.join(parser.data_buffer)}")

89

```

90

91

### MultipartParser

92

93

Streaming parser for multipart/form-data content with comprehensive callback support for all parsing events.

94

95

```python { .api }

96

class MultipartParser(BaseParser):
    """
    Streaming multipart/form-data parser with callback-based processing.
    """

    def __init__(
        self,
        boundary: bytes | str,
        callbacks: dict = {},
        max_size: float = float("inf")
    ):
        """
        Initialize MultipartParser.

        Parameters:
        - boundary: Multipart boundary string/bytes
        - callbacks: Dict of callback functions for parsing events
        - max_size: Maximum data size to process
        """

    def write(self, data: bytes) -> int:
        """
        Process data chunk.

        Parameters:
        - data: Bytes to process

        Returns:
        Number of bytes processed
        """

    def finalize(self) -> None:
        """
        Finalize parsing. Call when no more data will be written.
        """

131

```

132

133

**Supported Callbacks:**

134

135

- `on_part_begin`: Called when a new part starts

136

- `on_part_data(data, start, end)`: Called with part data chunks

137

- `on_part_end`: Called when current part ends

138

- `on_header_begin`: Called when header section starts

139

- `on_header_field(data, start, end)`: Called with header field name data

140

- `on_header_value(data, start, end)`: Called with header value data

141

- `on_header_end`: Called when current header ends

142

- `on_headers_finished`: Called when all headers are parsed

143

- `on_end`: Called when parsing completes

144

145

**Usage Example:**

146

147

```python

148

import hashlib

149

from python_multipart import MultipartParser

150

from python_multipart.multipart import parse_options_header

151

152

def calculate_file_hashes(content_type_header, input_stream):
    """
    Compute a SHA-256 digest for every part of a multipart/form-data stream.

    Parameters:
    - content_type_header: Content-Type header value carrying the boundary
    - input_stream: Object whose read(size) returns bytes (empty at EOF)

    Returns:
    List with one dict per part: 'hash' is the hex digest of the part
    body, 'headers' maps lower-cased header names to their values

    Raises:
    ValueError: If the header has no boundary parameter
    """
    # Extract boundary from Content-Type header
    _, params = parse_options_header(content_type_header)
    boundary = params.get(b'boundary')

    if not boundary:
        raise ValueError("No boundary found in Content-Type header")

    # Track current part state
    current_hash = None
    part_hashes = []
    current_headers = {}
    # A header name or value can be delivered in more than one callback
    # when it straddles two write() calls, so both are buffered and only
    # committed once on_header_end fires.
    header_name = bytearray()
    header_value = bytearray()

    def on_part_begin():
        nonlocal current_hash, current_headers
        current_hash = hashlib.sha256()
        current_headers = {}

    def on_part_data(data, start, end):
        if current_hash is not None:
            current_hash.update(data[start:end])

    def on_part_end():
        # The assignment below would otherwise make current_hash local to
        # this function and the check would raise UnboundLocalError.
        nonlocal current_hash
        if current_hash is not None:
            part_hashes.append({
                'hash': current_hash.hexdigest(),
                'headers': current_headers.copy(),
            })
        current_hash = None

    def on_header_field(data, start, end):
        header_name.extend(data[start:end])

    def on_header_value(data, start, end):
        header_value.extend(data[start:end])

    def on_header_end():
        if header_name:
            name = header_name.decode('utf-8').lower()
            current_headers[name] = header_value.decode('utf-8')
        # Reset for the next header
        header_name.clear()
        header_value.clear()

    # Set up callbacks
    callbacks = {
        'on_part_begin': on_part_begin,
        'on_part_data': on_part_data,
        'on_part_end': on_part_end,
        'on_header_field': on_header_field,
        'on_header_value': on_header_value,
        'on_header_end': on_header_end,
    }

    # Create parser and process data
    parser = MultipartParser(boundary, callbacks)

    while chunk := input_stream.read(8192):
        parser.write(chunk)

    parser.finalize()
    return part_hashes

212

```

213

214

### QuerystringParser

215

216

Streaming parser for application/x-www-form-urlencoded data with field-level callbacks.

217

218

```python { .api }

219

class QuerystringParser(BaseParser):
    """
    Streaming querystring parser for URL-encoded form data.
    """

    def __init__(
        self,
        callbacks: dict = {},
        strict_parsing: bool = False,
        max_size: float = float("inf")
    ):
        """
        Initialize QuerystringParser.

        Parameters:
        - callbacks: Dict of callback functions
        - strict_parsing: Whether to parse strictly
        - max_size: Maximum data size to process
        """

    def write(self, data: bytes) -> int:
        """
        Write some data to the parser, which will perform size verification,
        parse into either a field name or value, and then pass the
        corresponding data to the underlying callback. If an error is
        encountered while parsing, a QuerystringParseError will be raised.

        Parameters:
        - data: The data to write to the parser

        Returns:
        The number of bytes written

        Raises:
        QuerystringParseError: If parsing error occurs
        """

    def finalize(self) -> None:
        """Finalize parsing."""

257

```

258

259

**Supported Callbacks:**

260

261

- `on_field_start`: Called when a new field starts

262

- `on_field_name(data, start, end)`: Called with field name data

263

- `on_field_data(data, start, end)`: Called with field value data

264

- `on_field_end`: Called when current field ends

265

- `on_end`: Called when parsing completes

266

267

**Usage Example:**

268

269

```python

270

from python_multipart import QuerystringParser

271

import urllib.parse

272

273

def parse_url_encoded_form(data_stream):
    """
    Parse an application/x-www-form-urlencoded stream into a dict.

    Parameters:
    - data_stream: Object whose read(size) returns bytes (empty at EOF)

    Returns:
    Dict mapping URL-decoded field names to URL-decoded values; when a
    name repeats, the last value wins
    """
    fields = {}
    # Mutable buffers: callbacks append in place, so no nonlocal rebinding
    # and no repeated bytes concatenation.
    name_buffer = bytearray()
    value_buffer = bytearray()

    def on_field_name(data, start, end):
        name_buffer.extend(data[start:end])

    def on_field_data(data, start, end):
        value_buffer.extend(data[start:end])

    def on_field_end():
        if name_buffer:
            # URL decode the field name and data
            name = urllib.parse.unquote_plus(name_buffer.decode('utf-8'))
            value = urllib.parse.unquote_plus(value_buffer.decode('utf-8'))
            fields[name] = value

        # Reset for next field
        name_buffer.clear()
        value_buffer.clear()

    callbacks = {
        'on_field_name': on_field_name,
        'on_field_data': on_field_data,
        'on_field_end': on_field_end,
    }

    parser = QuerystringParser(callbacks)

    while chunk := data_stream.read(1024):
        parser.write(chunk)

    parser.finalize()
    return fields

314

```

315

316

### OctetStreamParser

317

318

Streaming parser for application/octet-stream and binary data with simple data callbacks.

319

320

```python { .api }

321

class OctetStreamParser(BaseParser):
    """
    Streaming parser for binary octet-stream data.
    """

    def __init__(
        self,
        callbacks: dict = {},
        max_size: float = float("inf")
    ):
        """
        Initialize OctetStreamParser.

        Parameters:
        - callbacks: Dict of callback functions
        - max_size: Maximum data size to process
        """

    def write(self, data: bytes) -> int:
        """
        Write some data to the parser, which will perform size verification,
        and then pass the data to the underlying callback.

        Parameters:
        - data: The data to write to the parser

        Returns:
        The number of bytes written
        """

    def finalize(self) -> None:
        """Finalize parsing."""

352

```

353

354

**Supported Callbacks:**

355

356

- `on_start`: Called when parsing begins

357

- `on_data(data, start, end)`: Called with each data chunk

358

- `on_end`: Called when parsing completes

359

360

**Usage Example:**

361

362

```python

363

from python_multipart import OctetStreamParser

364

import hashlib

365

366

def process_binary_upload(input_stream, output_file_path):
    """
    Stream binary data while calculating hash and saving to file.

    Parameters:
    - input_stream: Object whose read(size) returns bytes (empty at EOF)
    - output_file_path: Path of the file the data is written to

    Returns:
    Dict with 'bytes_processed' (total bytes written) and 'md5_hash'
    (hex MD5 digest of the data)
    """
    file_hash = hashlib.md5()
    bytes_processed = 0

    # The parser runs inside the with-block so the output file is still
    # open whenever on_data writes to it.
    with open(output_file_path, 'wb') as output_file:
        def on_data(data, start, end):
            nonlocal bytes_processed
            chunk = data[start:end]
            file_hash.update(chunk)
            output_file.write(chunk)
            bytes_processed += len(chunk)

        callbacks = {
            'on_data': on_data,
        }

        parser = OctetStreamParser(callbacks)

        while chunk := input_stream.read(8192):
            parser.write(chunk)

        parser.finalize()

    return {
        'bytes_processed': bytes_processed,
        'md5_hash': file_hash.hexdigest(),
    }

398

```

399

400

### BaseParser

401

402

Base class that provides common functionality for all parsers including callback management.

403

404

```python { .api }

405

class BaseParser:

406

"""

407

Base class for all parsers with callback functionality.

408

"""

409

410

def __init__(self):

411

"""Initialize base parser."""

412

413

def callback(self, name: str, data: bytes = None, start: int = None, end: int = None) -> None:

414

"""

415

Execute named callback with optional data parameters.

416

417

Parameters:

418

- name: Callback name to execute

419

- data: Data bytes for data callbacks

420

- start: Start index for data slice

421

- end: End index for data slice

422

"""

423

424

def set_callback(self, name: str, new_func) -> None:

425

"""

426

Set callback function for specific event.

427

428

Parameters:

429

- name: Callback name

430

- new_func: Callback function or None to remove

431

"""

432

433

def close(self) -> None:

434

"""Close parser and clean up resources."""

435

436

def finalize(self) -> None:

437

"""Finalize parsing."""

438

```

439

440

### Parser States

441

442

The querystring and multipart parsers track parsing progress with internal state enums:

443

444

```python { .api }

445

class QuerystringState(IntEnum):
    """Parsing states tracked by the querystring parser."""

    BEFORE_FIELD = 0
    FIELD_NAME = 1
    FIELD_DATA = 2

449

450

class MultipartState(IntEnum):
    """Parsing states tracked by the multipart parser."""

    START = 0
    START_BOUNDARY = 1
    HEADER_FIELD_START = 2
    HEADER_FIELD = 3
    HEADER_VALUE_START = 4
    HEADER_VALUE = 5
    HEADER_VALUE_ALMOST_DONE = 6
    HEADERS_ALMOST_DONE = 7
    PART_DATA_START = 8
    PART_DATA = 9
    PART_DATA_END = 10
    END_BOUNDARY = 11
    END = 12

464

```

465

466

These states enable proper parsing flow control and error detection during stream processing.

467

468

## Utility Functions

469

470

### parse_options_header

471

472

Parses Content-Type headers into (content_type, parameters) format for boundary extraction and content type detection.

473

474

```python { .api }

475

def parse_options_header(value: str | bytes | None) -> tuple[bytes, dict[bytes, bytes]]:

476

"""

477

Parse Content-Type header into content type and parameters.

478

479

Parameters:

480

- value: Content-Type header value as string or bytes

481

482

Returns:

483

Tuple of (content_type, parameters_dict)

484

"""

485

```

486

487

**Usage Example:**

488

489

```python

490

from python_multipart.multipart import parse_options_header

491

492

# Parse multipart Content-Type header
content_type_header = "multipart/form-data; boundary=----WebKitFormBoundary7MA4YWxkTrZu0gW"
content_type, params = parse_options_header(content_type_header)

print(f"Content type: {content_type}")  # b'multipart/form-data'
print(f"Boundary: {params.get(b'boundary')}")  # b'----WebKitFormBoundary7MA4YWxkTrZu0gW'

# Parse with charset
content_type_header = "text/plain; charset=utf-8"
content_type, params = parse_options_header(content_type_header)

print(f"Content type: {content_type}")  # b'text/plain'
print(f"Charset: {params.get(b'charset')}")  # b'utf-8'

505

```