A streaming multipart parser for Python that enables efficient handling of file uploads and form data in web applications
—
Base class and low-level streaming parsers for specific content types with callback-based processing. BaseParser provides common functionality for all parsers, while specialized parsers provide fine-grained control over parsing behavior, custom callback handling, and memory-efficient processing of large payloads through incremental data processing.
Base class that provides common functionality for all parsers including callback management and lifecycle control.
class BaseParser:
    """
    Base class for all parsers with callback functionality.

    Only the public signatures are listed here; method bodies are omitted.
    """

    def __init__(self):
        """Initialize base parser."""

    def callback(
        self,
        name: str,
        data: bytes | None = None,
        start: int | None = None,
        end: int | None = None
    ) -> None:
        """
        Execute named callback with optional data parameters.

        Parameters:
        - name: Callback name to execute
        - data: Data bytes for data callbacks
        - start: Start index for data slice
        - end: End index for data slice
        """

    def set_callback(self, name: str, new_func) -> None:
        """
        Set callback function for specific event.

        Parameters:
        - name: Callback name
        - new_func: Callback function or None to remove
        """

    def close(self) -> None:
        """Close parser and clean up resources."""

    def finalize(self) -> None:
        """Finalize parsing."""

Usage Example:
from python_multipart import BaseParser
class CustomParser(BaseParser):
def __init__(self):
super().__init__()
self.data_buffer = []
def process_data(self, data):
# Use inherited callback functionality
self.callback('on_data_start')
self.callback('on_data', data, 0, len(data))
self.callback('on_data_end')
def setup_callbacks(self):
def on_data(data, start, end):
chunk = data[start:end]
self.data_buffer.append(chunk)
def on_data_start():
print("Starting data processing")
def on_data_end():
print("Finished data processing")
# Set callbacks using inherited method
self.set_callback('on_data', on_data)
self.set_callback('on_data_start', on_data_start)
self.set_callback('on_data_end', on_data_end)
# Usage
parser = CustomParser()
parser.setup_callbacks()
parser.process_data(b"Hello World")
print(f"Buffered data: {b''.join(parser.data_buffer)}")Streaming parser for multipart/form-data content with comprehensive callback support for all parsing events.
class MultipartParser(BaseParser):
    """
    Streaming multipart/form-data parser with callback-based processing.

    Only the public signatures are listed here; method bodies are omitted.
    """

    def __init__(
        self,
        boundary: bytes | str,
        callbacks: dict = {},
        max_size: float = float("inf")
    ):
        """
        Initialize MultipartParser.

        Parameters:
        - boundary: Multipart boundary string/bytes
        - callbacks: Dict of callback functions for parsing events
        - max_size: Maximum data size to process (defaults to no limit)
        """

    def write(self, data: bytes) -> int:
        """
        Process data chunk.

        Parameters:
        - data: Bytes to process

        Returns:
        Number of bytes processed
        """

    def finalize(self) -> None:
        """
        Finalize parsing. Call when no more data will be written.
        """

Supported Callbacks:
- on_part_begin: Called when a new part starts
- on_part_data(data, start, end): Called with part data chunks
- on_part_end: Called when current part ends
- on_header_begin: Called when header section starts
- on_header_field(data, start, end): Called with header field name data
- on_header_value(data, start, end): Called with header value data
- on_header_end: Called when current header ends
- on_headers_finished: Called when all headers are parsed
- on_end: Called when parsing completes

Usage Example:
import hashlib
from python_multipart import MultipartParser
from python_multipart.multipart import parse_options_header
def calculate_file_hashes(content_type_header, input_stream):
# Extract boundary from Content-Type header
content_type, params = parse_options_header(content_type_header)
boundary = params.get(b'boundary')
if not boundary:
raise ValueError("No boundary found in Content-Type header")
# Track current part state
current_hash = None
part_hashes = []
current_headers = {}
current_header_name = None
def on_part_begin():
nonlocal current_hash, current_headers
current_hash = hashlib.sha256()
current_headers = {}
def on_part_data(data, start, end):
if current_hash:
current_hash.update(data[start:end])
def on_part_end():
if current_hash:
part_info = {
'hash': current_hash.hexdigest(),
'headers': current_headers.copy()
}
part_hashes.append(part_info)
current_hash = None
def on_header_field(data, start, end):
nonlocal current_header_name
current_header_name = data[start:end].decode('utf-8').lower()
def on_header_value(data, start, end):
if current_header_name:
current_headers[current_header_name] = data[start:end].decode('utf-8')
# Set up callbacks
callbacks = {
'on_part_begin': on_part_begin,
'on_part_data': on_part_data,
'on_part_end': on_part_end,
'on_header_field': on_header_field,
'on_header_value': on_header_value
}
# Create parser and process data
parser = MultipartParser(boundary, callbacks)
while True:
chunk = input_stream.read(8192)
if not chunk:
break
parser.write(chunk)
parser.finalize()
return part_hashesStreaming parser for application/x-www-form-urlencoded data with field-level callbacks.
class QuerystringParser(BaseParser):
    """
    Streaming querystring parser for URL-encoded form data.

    Only the public signatures are listed here; method bodies are omitted.
    """

    def __init__(
        self,
        callbacks: dict = {},
        strict_parsing: bool = False,
        max_size: float = float("inf")
    ):
        """
        Initialize QuerystringParser.

        Parameters:
        - callbacks: Dict of callback functions
        - strict_parsing: Whether to parse strictly
        - max_size: Maximum data size to process (defaults to no limit)
        """

    def write(self, data: bytes) -> int:
        """Write some data to the parser, which will perform size verification,
        parse into either a field name or value, and then pass the
        corresponding data to the underlying callback. If an error is
        encountered while parsing, a QuerystringParseError will be raised.

        Parameters:
        - data: The data to write to the parser

        Returns:
        The number of bytes written

        Raises:
        QuerystringParseError: If parsing error occurs
        """

    def finalize(self) -> None:
        """Finalize parsing."""

Supported Callbacks:
- on_field_start: Called when a new field starts
- on_field_name(data, start, end): Called with field name data
- on_field_data(data, start, end): Called with field value data
- on_field_end: Called when current field ends
- on_end: Called when parsing completes

Usage Example:
from python_multipart import QuerystringParser
import urllib.parse
def parse_url_encoded_form(data_stream):
fields = {}
current_field_name = b''
current_field_data = b''
def on_field_name(data, start, end):
nonlocal current_field_name
current_field_name += data[start:end]
def on_field_data(data, start, end):
nonlocal current_field_data
current_field_data += data[start:end]
def on_field_end():
nonlocal current_field_name, current_field_data
if current_field_name:
# URL decode the field name and data
name = urllib.parse.unquote_plus(current_field_name.decode('utf-8'))
value = urllib.parse.unquote_plus(current_field_data.decode('utf-8'))
fields[name] = value
# Reset for next field
current_field_name = b''
current_field_data = b''
callbacks = {
'on_field_name': on_field_name,
'on_field_data': on_field_data,
'on_field_end': on_field_end
}
parser = QuerystringParser(callbacks)
while True:
chunk = data_stream.read(1024)
if not chunk:
break
parser.write(chunk)
parser.finalize()
return fieldsStreaming parser for application/octet-stream and binary data with simple data callbacks.
class OctetStreamParser(BaseParser):
    """
    Streaming parser for binary octet-stream data.

    Only the public signatures are listed here; method bodies are omitted.
    """

    def __init__(
        self,
        callbacks: dict = {},
        max_size: float = float("inf")
    ):
        """
        Initialize OctetStreamParser.

        Parameters:
        - callbacks: Dict of callback functions
        - max_size: Maximum data size to process (defaults to no limit)
        """

    def write(self, data: bytes) -> int:
        """Write some data to the parser, which will perform size verification,
        and then pass the data to the underlying callback.

        Parameters:
        - data: The data to write to the parser

        Returns:
        The number of bytes written
        """

    def finalize(self) -> None:
        """Finalize parsing."""

Supported Callbacks:
- on_start: Called when parsing begins
- on_data(data, start, end): Called with each data chunk
- on_end: Called when parsing completes

Usage Example:
from python_multipart import OctetStreamParser
import hashlib
def process_binary_upload(input_stream, output_file_path):
"""Stream binary data while calculating hash and saving to file."""
file_hash = hashlib.md5()
bytes_processed = 0
with open(output_file_path, 'wb') as output_file:
def on_data(data, start, end):
nonlocal bytes_processed
chunk = data[start:end]
file_hash.update(chunk)
output_file.write(chunk)
bytes_processed += len(chunk)
callbacks = {
'on_data': on_data
}
parser = OctetStreamParser(callbacks)
while True:
chunk = input_stream.read(8192)
if not chunk:
break
parser.write(chunk)
parser.finalize()
return {
'bytes_processed': bytes_processed,
'md5_hash': file_hash.hexdigest()
}Base class that provides common functionality for all parsers including callback management.
class BaseParser:
    """
    Base class for all parsers with callback functionality.

    Only the public signatures are listed here; method bodies are omitted.
    """

    def __init__(self):
        """Initialize base parser."""

    def callback(self, name: str, data: bytes | None = None, start: int | None = None, end: int | None = None) -> None:
        """
        Execute named callback with optional data parameters.

        Parameters:
        - name: Callback name to execute
        - data: Data bytes for data callbacks
        - start: Start index for data slice
        - end: End index for data slice
        """

    def set_callback(self, name: str, new_func) -> None:
        """
        Set callback function for specific event.

        Parameters:
        - name: Callback name
        - new_func: Callback function or None to remove
        """

    def close(self) -> None:
        """Close parser and clean up resources."""

    def finalize(self) -> None:
        """Finalize parsing."""

Each parser maintains internal state using enums to track parsing progress:
class QuerystringState(IntEnum):
    """Internal states used to track querystring parsing progress."""

    BEFORE_FIELD = 0
    FIELD_NAME = 1
    FIELD_DATA = 2
class MultipartState(IntEnum):
    """Internal states used to track multipart parsing progress."""

    START = 0
    START_BOUNDARY = 1
    HEADER_FIELD_START = 2
    HEADER_FIELD = 3
    HEADER_VALUE_START = 4
    HEADER_VALUE = 5
    HEADER_VALUE_ALMOST_DONE = 6
    HEADERS_ALMOST_DONE = 7
    PART_DATA_START = 8
    PART_DATA = 9
    PART_DATA_END = 10
    END_BOUNDARY = 11
    END = 12

These states enable proper parsing flow control and error detection during stream processing.
Parses Content-Type headers into (content_type, parameters) format for boundary extraction and content type detection.
def parse_options_header(value: str | bytes | None) -> tuple[bytes, dict[bytes, bytes]]:
    """
    Parse Content-Type header into content type and parameters.

    Only the signature is listed here; the function body is omitted.

    Parameters:
    - value: Content-Type header value as string or bytes

    Returns:
    Tuple of (content_type, parameters_dict); the content type and the
    parameter names and values are bytes
    """

Usage Example:
from python_multipart.multipart import parse_options_header
# Parse multipart Content-Type header
content_type_header = "multipart/form-data; boundary=----WebKitFormBoundary7MA4YWxkTrZu0gW"
content_type, params = parse_options_header(content_type_header)
print(f"Content type: {content_type}") # b'multipart/form-data'
print(f"Boundary: {params.get(b'boundary')}") # b'----WebKitFormBoundary7MA4YWxkTrZu0gW'
# Parse with charset
content_type_header = "text/plain; charset=utf-8"
content_type, params = parse_options_header(content_type_header)
print(f"Content type: {content_type}") # b'text/plain'
print(f"Charset: {params.get(b'charset')}") # b'utf-8'Install with Tessl CLI
npx tessl i tessl/pypi-python-multipart