Async http client/server framework (asyncio)
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Comprehensive data processing and payload system supporting various data types, multipart content, form data processing, and streaming operations. Includes automatic content encoding/decoding and extensive customization options for different data formats.
Form data creation and processing for HTML forms and file uploads with support for multiple encoding types.
class FormData:
def __init__(self, quote_fields=True, charset='utf-8'):
"""
Create form data container.
Parameters:
- quote_fields (bool): Quote field names and values
- charset (str): Text encoding
"""
def add_field(
self,
name,
value,
*,
content_type=None,
filename=None,
content_transfer_encoding=None
):
"""
Add form field.
Parameters:
- name (str): Field name
- value: Field value (str, bytes, or file-like)
- content_type (str): Content type for field
- filename (str): Filename for file fields
- content_transfer_encoding (str): Transfer encoding
"""
def add_fields(self, *fields):
"""Add multiple fields from iterable."""
def is_multipart(self):
"""Check if form requires multipart encoding."""

Reading and writing multipart content for file uploads, mixed content types, and complex data structures.
class MultipartReader:
def __init__(self, headers, content):
"""
Create multipart content reader.
Parameters:
- headers: Request headers
- content: Content stream
"""
async def next(self):
"""
Get next part from multipart content.
Returns:
BodyPartReader or None: Next body part or None if finished
"""
async def release(self):
"""Release reader resources."""
def at_eof(self):
"""Check if at end of content."""
class BodyPartReader:
@property
def headers(self):
"""Part headers."""
@property
def name(self):
"""Part name from Content-Disposition."""
@property
def filename(self):
"""Part filename from Content-Disposition."""
async def read(self, decode=False):
"""
Read part content.
Parameters:
- decode (bool): Decode content based on headers
Returns:
bytes: Part content
"""
async def read_chunk(self, size=8192):
"""Read content chunk."""
async def text(self, encoding=None):
"""Read part content as text."""
async def json(self, encoding=None):
"""Parse part content as JSON."""
async def release(self):
"""Release part resources."""
class MultipartWriter:
def __init__(self, subtype='mixed', boundary=None):
"""
Create multipart content writer.
Parameters:
- subtype (str): Multipart subtype
- boundary (str): Multipart boundary
"""
def append(self, obj, headers=None):
"""
Append object to multipart content.
Parameters:
- obj: Object to append (str, bytes, file-like, or payload)
- headers (dict): Part headers
Returns:
Payload: Created payload for the part
"""
def append_json(self, obj, headers=None):
"""Append JSON object to multipart content."""
def append_form(self, obj, headers=None):
"""Append form data to multipart content."""
@property
def boundary(self):
"""Multipart boundary."""
@property
def size(self):
"""Content size if known."""
def content_disposition_filename(name, fallback=None):
"""
Extract filename from Content-Disposition header value.
Parameters:
- name (str): Content-Disposition header value
- fallback (str): Fallback filename
Returns:
str or None: Extracted filename
"""
def parse_content_disposition(header):
"""
Parse Content-Disposition header.
Parameters:
- header (str): Content-Disposition header value
Returns:
tuple: (disposition_type, parameters_dict)
"""
class BadContentDispositionHeader(Exception):
"""Invalid Content-Disposition header."""
class BadContentDispositionParam(Exception):
"""Invalid Content-Disposition parameter."""

Flexible payload system supporting various data types with automatic serialization and content type detection.
class Payload:
def __init__(self, value, headers=None, content_type=None, filename=None, encoding=None):
"""
Base payload class.
Parameters:
- value: Payload data
- headers (dict): Payload headers
- content_type (str): Content type
- filename (str): Filename for file payloads
- encoding (str): Text encoding
"""
@property
def size(self):
"""Payload size if known."""
@property
def headers(self):
"""Payload headers."""
@property
def content_type(self):
"""Payload content type."""
async def write(self, writer):
"""Write payload to stream writer."""
class BytesPayload(Payload):
"""Payload for bytes data."""
class StringPayload(Payload):
"""Payload for string data."""
class IOBasePayload(Payload):
"""Payload for file-like objects."""
class BufferedReaderPayload(IOBasePayload):
"""Payload for buffered readers."""
class TextIOPayload(IOBasePayload):
"""Payload for text I/O objects."""
class StringIOPayload(TextIOPayload):
"""Payload for StringIO objects."""
class BytesIOPayload(IOBasePayload):
"""Payload for BytesIO objects."""
class JsonPayload(Payload):
def __init__(
self,
value,
encoding='utf-8',
content_type='application/json',
dumps=None,
**kwargs
):
"""
JSON payload.
Parameters:
- value: Object to serialize as JSON
- encoding (str): Text encoding
- content_type (str): Content type
- dumps: JSON serialization function
"""
class AsyncIterablePayload(Payload):
def __init__(self, value, **kwargs):
"""
Payload for async iterables.
Parameters:
- value: Async iterable data source
"""
def get_payload(data, *args, **kwargs):
"""
Get appropriate payload for data.
Parameters:
- data: Data to create payload for
Returns:
Payload: Appropriate payload instance
"""
def payload_type(payload_class):
"""
Register payload type for specific data types.
Parameters:
- payload_class: Data type that the decorated payload factory handles (for example ``list``)
Returns:
Function: Registration decorator
"""
# Global payload registry
PAYLOAD_REGISTRY = None # PayloadRegistry instance

Streaming payload creation and data streaming utilities.
def streamer(func):
"""
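Decorator that turns a coroutine function into a streaming payload factory.
The coroutine receives the stream writer as its first argument and sends data with ``await writer.write(chunk)``. Deprecated in aiohttp: an async generator can be passed as ``data`` directly.
Parameters:
- func: Coroutine function whose first parameter is the stream writer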
Returns:
Function: Payload factory function
"""

Cookie management and storage implementations.
class CookieJar:
def __init__(self, *, unsafe=False, quote_cookie=True, treat_as_secure_origin=None):
"""
Default cookie jar implementation.
Parameters:
- unsafe (bool): Allow unsafe cookies
- quote_cookie (bool): Quote cookie values
- treat_as_secure_origin: Origins to treat as secure
"""
def update_cookies(self, cookies, response_url=None):
"""Update cookies from response."""
def filter_cookies(self, request_url=None):
"""Filter cookies for request."""
def clear(self, predicate=None):
"""Clear cookies matching predicate."""
def clear_domain(self, domain):
"""Clear cookies for domain."""
def __iter__(self):
"""Iterate over cookies."""
def __len__(self):
"""Number of stored cookies."""
class DummyCookieJar:
"""No-op cookie jar that stores no cookies."""
def update_cookies(self, cookies, response_url=None):
"""No-op cookie update."""
def filter_cookies(self, request_url=None):
"""Return empty cookie collection."""

Low-level stream readers and data queues for advanced data processing.
class StreamReader:
def __init__(self, protocol, limit=2**16, loop=None):
"""
Asynchronous stream reader.
Parameters:
- protocol: Stream protocol
- limit (int): Buffer size limit
- loop: Event loop
"""
async def read(self, n=-1):
"""
Read up to n bytes.
Parameters:
- n (int): Number of bytes to read (-1 for all)
Returns:
bytes: Read data
"""
async def readline(self):
"""Read one line."""
async def readexactly(self, n):
"""Read exactly n bytes."""
async def readuntil(self, separator=b'\n'):
"""Read until separator."""
def at_eof(self):
"""Check if at end of stream."""
def feed_eof(self):
"""Signal end of stream."""
def feed_data(self, data):
"""Feed data to stream."""
class DataQueue:
def __init__(self, *, loop=None):
"""Generic data queue."""
def __aiter__(self):
"""Async iterator over queue items."""
async def read(self):
"""Read next item from queue."""
def feed_data(self, data, size=0):
"""Add data to queue."""
def feed_eof(self):
"""Signal end of queue."""
def is_eof(self):
"""Check if queue is at EOF."""
def at_eof(self):
"""Check if queue is finished."""
class FlowControlDataQueue(DataQueue):
def __init__(self, protocol, limit=2**16, *, loop=None):
"""
Flow-controlled data queue.
Parameters:
- protocol: Flow control protocol
- limit (int): Buffer size limit
- loop: Event loop
"""
class EofStream(Exception):
"""End of stream exception."""
# Empty payload singleton
EMPTY_PAYLOAD = None

import aiohttp
async def submit_form():
    """Submit a multipart form with three text fields and one file upload.

    Returns:
        The server's response body decoded as JSON.
    """
    # Create form data
    form = aiohttp.FormData()
    form.add_field('name', 'John Doe')
    form.add_field('email', 'john@example.com')
    form.add_field('age', '30')

    # Add file upload. The form only holds a reference to the open handle, so
    # the request is issued inside this ``with`` block: the file must still be
    # open when the body is actually sent.
    with open('document.pdf', 'rb') as f:
        form.add_field('file', f,
                       filename='document.pdf',
                       content_type='application/pdf')

        # Submit form
        async with aiohttp.ClientSession() as session:
            async with session.post('https://api.example.com/submit',
                                    data=form) as response:
                result = await response.json()
    return result


from aiohttp import web
async def upload_handler(request):
    """Handle a multipart upload: save file parts and collect text fields.

    Parts that carry a filename are written under ``uploads/``; every other
    part is read as text and stored under its field name.

    Parameters:
    - request: Incoming request whose body is multipart content

    Returns:
        A JSON response with the saved files' metadata under ``files`` and
        the plain form fields under ``fields``.

    Raises:
        web.HTTPBadRequest: If a file part's name has no usable final
        path component.
    """
    import os

    reader = await request.multipart()
    uploaded_files = []
    form_fields = {}
    async for part in reader:
        if part.filename:
            # Handle file upload. The filename is supplied by the client, so
            # keep only its last path component (treating backslashes as
            # separators too); otherwise a name such as ``../../x`` would be
            # written outside ``uploads/``.
            filename = os.path.basename(part.filename.replace('\\', '/'))
            if filename in ('', '.', '..'):
                raise web.HTTPBadRequest(text='Invalid upload filename')
            content = await part.read()
            # Save file
            with open(f'uploads/{filename}', 'wb') as f:
                f.write(content)
            uploaded_files.append({
                'filename': filename,
                'size': len(content),
                'content_type': part.headers.get('Content-Type')
            })
        else:
            # Handle form field. ``part.name`` is the part name taken from
            # Content-Disposition, so the header is not parsed by hand.
            field_name = part.name
            field_value = await part.text()
            form_fields[field_name] = field_value
    return web.json_response({
        'files': uploaded_files,
        'fields': form_fields
    })
app = web.Application()
app.router.add_post('/upload', upload_handler)

import aiohttp
import asyncio
import json
async def json_stream_generator(count=1000, delay=0.01):
    """Generate a JSON array as a stream of byte chunks.

    This is a plain async generator, so the object returned by calling it
    can be passed directly as request ``data``. It is deliberately not
    wrapped in ``aiohttp.streamer``: that decorator is meant for coroutines
    that take a ``writer`` argument, not for generators that yield chunks.

    Parameters:
    - count (int): Number of items to emit in the array
    - delay (float): Seconds to sleep after each item (simulated work)

    Yields:
        bytes: ``[``, then the JSON-encoded items separated by ``,``,
        then ``]``.
    """
    yield b'['
    for i in range(count):
        if i > 0:
            yield b','
        data = {'id': i, 'value': f'item_{i}'}
        yield json.dumps(data).encode('utf-8')
        # Simulate processing delay
        await asyncio.sleep(delay)
    yield b']'
async def stream_json_data():
    """POST the streamed JSON array and return the decoded JSON reply."""
    request_headers = {'Content-Type': 'application/json'}
    async with aiohttp.ClientSession() as session:
        # Create streaming payload
        body = json_stream_generator()
        async with session.post(
            'https://api.example.com/bulk',
            data=body,
            headers=request_headers,
        ) as response:
            parsed = await response.json()
    return parsed


import aiohttp
from aiohttp.payload import Payload
class CSVPayload(aiohttp.BytesPayload):
    """Custom payload that sends data as UTF-8 encoded CSV.

    Derives from ``BytesPayload`` rather than the bare ``Payload`` base:
    the serialized CSV is plain bytes, and the base class leaves ``write``
    for subclasses to implement.
    """

    def __init__(self, data, **kwargs):
        """Serialize ``data`` to CSV bytes and initialise the payload.

        Parameters:
        - data: A non-empty list whose first element is a dict is written
          as CSV with a header row taken from that first dict's keys; any
          other value is sent as ``str(data)``
        - **kwargs: Forwarded to the parent payload; ``content_type``
          defaults to ``text/csv`` when not given
        """
        # Convert data to CSV format
        if isinstance(data, list) and data and isinstance(data[0], dict):
            # Convert list of dicts to CSV
            import csv
            import io
            output = io.StringIO()
            writer = csv.DictWriter(output, fieldnames=data[0].keys())
            writer.writeheader()
            writer.writerows(data)
            csv_data = output.getvalue().encode('utf-8')
        else:
            csv_data = str(data).encode('utf-8')
        # Fill in the default instead of passing ``content_type`` as a second
        # keyword, which would raise TypeError if the caller also supplied one.
        if kwargs.get('content_type') is None:
            kwargs['content_type'] = 'text/csv'
        super().__init__(csv_data, **kwargs)
# Register custom payload type
@aiohttp.payload_type(list)
def list_to_csv_payload(data, *args, **kwargs):
    """Convert a list of dicts to a CSV payload.

    Parameters:
    - data: List to convert
    - *args, **kwargs: Forwarded to ``CSVPayload``

    Returns:
        CSVPayload: Payload for ``data`` when it is a non-empty list whose
        first element is a dict.

    Raises:
        aiohttp.payload.LookupError: For any other list, signalling that
        this factory has no payload for the value.
    """
    if data and isinstance(data[0], dict):
        return CSVPayload(data, *args, **kwargs)
    # Do not fall back to ``aiohttp.get_payload(data, ...)`` here: this
    # function is the factory registered for ``list``, so that lookup would
    # dispatch straight back to it and recurse without end.
    raise aiohttp.payload.LookupError()
# Usage
async def send_csv_data():
    """POST three sample records as request data and return the reply text."""
    rows = [
        {'name': 'John', 'age': 30, 'city': 'New York'},
        {'name': 'Jane', 'age': 25, 'city': 'Boston'},
        {'name': 'Bob', 'age': 35, 'city': 'Chicago'},
    ]
    async with aiohttp.ClientSession() as session:
        async with session.post(
            'https://api.example.com/data.csv',
            data=rows,
        ) as response:
            reply_text = await response.text()
    return reply_text


import aiohttp
import asyncio
from pathlib import Path
class ProgressFilePayload(aiohttp.IOBasePayload):
    """File payload that reports upload progress through a callback."""

    def __init__(self, file_path, progress_callback=None, **kwargs):
        """Open ``file_path`` for reading and record its size.

        Parameters:
        - file_path: Path of the file to upload
        - progress_callback: Optional awaitable callable invoked as
          ``progress_callback(percent, uploaded, total)`` after each chunk
        - **kwargs: Forwarded to ``IOBasePayload``
        """
        self._file_path = Path(file_path)
        self._progress_callback = progress_callback
        self._total_size = self._file_path.stat().st_size
        self._uploaded = 0
        super().__init__(
            open(file_path, 'rb'),
            filename=self._file_path.name,
            **kwargs
        )

    async def write(self, writer):
        """Write file with progress tracking.

        Reads the file in 8192-byte chunks, writes each chunk to ``writer``
        and then awaits the progress callback, if one was given. The file
        is closed once writing finishes or fails.
        """
        chunk_size = 8192
        try:
            while True:
                chunk = self._value.read(chunk_size)
                if not chunk:
                    break
                await writer.write(chunk)
                self._uploaded += len(chunk)
                if self._progress_callback:
                    # The size was sampled in __init__; guard the division in
                    # case the file was empty then but has data now.
                    if self._total_size:
                        progress = (self._uploaded / self._total_size) * 100
                    else:
                        progress = 100.0
                    await self._progress_callback(progress, self._uploaded, self._total_size)
        finally:
            # This class opened the handle in __init__, so it is responsible
            # for releasing it; without this the descriptor leaks.
            self._value.close()
async def progress_callback(percent, uploaded, total):
    """Print the upload percentage followed by the uploaded/total bytes."""
    message = f"Upload progress: {percent:.1f}% ({uploaded}/{total} bytes)"
    print(message)
async def upload_large_file():
    """Upload ``large_file.zip`` in a form and return the JSON reply.

    The file is wrapped in ``ProgressFilePayload`` so that
    ``progress_callback`` is invoked while it is being sent.
    """
    file_path = 'large_file.zip'

    # Create form data with progress tracking
    form = aiohttp.FormData()
    form.add_field('description', 'Large file upload')
    tracked_file = ProgressFilePayload(file_path, progress_callback)
    form.add_field(
        'file',
        tracked_file,
        filename='large_file.zip',
        content_type='application/zip',
    )

    async with aiohttp.ClientSession() as session:
        async with session.post(
            'https://api.example.com/upload',
            data=form,
        ) as response:
            result = await response.json()
    return result
# Run upload
asyncio.run(upload_large_file())

Install with Tessl CLI
npx tessl i tessl/pypi-aiohttp