CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-aiohttp

Async http client/server framework (asyncio)

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/data.md

Data Handling

Comprehensive data processing and payload system supporting various data types, multipart content, form data processing, and streaming operations. Includes automatic content encoding/decoding and extensive customization options for different data formats.

Capabilities

Form Data Processing

Form data creation and processing for HTML forms and file uploads with support for multiple encoding types.

class FormData:
    """Container that accumulates form fields for an HTTP request body,
    serialized either url-encoded or as multipart/form-data."""

    def __init__(self, quote_fields: bool = True, charset: str = 'utf-8'):
        """
        Create form data container.

        Parameters:
        - quote_fields (bool): Quote field names and values when encoding
        - charset (str): Text encoding applied to string values
        """

    def add_field(
        self,
        name: str,
        value,
        *,
        content_type=None,
        filename=None,
        content_transfer_encoding=None
    ) -> None:
        """
        Add a single form field.

        Parameters:
        - name (str): Field name
        - value: Field value (str, bytes, or file-like object)
        - content_type (str): Explicit content type for this field
        - filename (str): Filename; providing one marks the field as a
          file upload
        - content_transfer_encoding (str): Transfer encoding for the part
        """

    def add_fields(self, *fields) -> None:
        """Add multiple fields from one or more iterables of fields."""

    def is_multipart(self) -> bool:
        """Return True if the accumulated fields require multipart
        encoding (e.g. a file upload was added); False for plain
        url-encoded forms."""

Multipart Content Processing

Reading and writing multipart content for file uploads, mixed content types, and complex data structures.

class MultipartReader:
    """Sequential reader over the parts of a multipart body."""

    def __init__(self, headers, content):
        """
        Create multipart content reader.

        Parameters:
        - headers: Request headers
        - content: Content stream the parts are read from
        """

    async def next(self):
        """
        Get the next part from the multipart content.

        Returns:
        BodyPartReader or None: Next body part, or None when all parts
        have been consumed
        """

    async def release(self) -> None:
        """Release reader resources."""

    def at_eof(self) -> bool:
        """Return True once the end of the multipart content is reached."""

class BodyPartReader:
    """Reader for a single part of a multipart body."""

    @property
    def headers(self):
        """Headers of this part."""

    @property
    def name(self):
        """Part name, taken from the Content-Disposition header."""

    @property
    def filename(self):
        """Part filename, taken from the Content-Disposition header."""

    async def read(self, decode: bool = False):
        """
        Read the full part content.

        Parameters:
        - decode (bool): Decode the content based on the part's headers

        Returns:
        bytes: Part content
        """

    async def read_chunk(self, size: int = 8192):
        """Read at most ``size`` bytes of the part content."""

    async def text(self, encoding=None):
        """Read the part content and decode it to str."""

    async def json(self, encoding=None):
        """Read the part content and parse it as JSON."""

    async def release(self) -> None:
        """Release part resources."""

class MultipartWriter:
    """Builder that assembles multipart bodies from heterogeneous parts."""

    def __init__(self, subtype: str = 'mixed', boundary=None):
        """
        Create multipart content writer.

        Parameters:
        - subtype (str): Multipart subtype (e.g. 'mixed', 'form-data')
        - boundary (str): Multipart boundary string
        """

    def append(self, obj, headers=None):
        """
        Append an object to the multipart content.

        Parameters:
        - obj: Object to append (str, bytes, file-like, or payload)
        - headers (dict): Headers attached to the new part

        Returns:
        Payload: Created payload for the part
        """

    def append_json(self, obj, headers=None):
        """Append an object serialized as JSON to the multipart content."""

    def append_form(self, obj, headers=None):
        """Append form data to the multipart content."""

    @property
    def boundary(self):
        """The multipart boundary string in use."""

    @property
    def size(self):
        """Total content size if known, otherwise None — TODO confirm
        the unknown-size sentinel against the aiohttp version in use."""

def content_disposition_filename(name, fallback=None):
    """
    Extract the filename from a Content-Disposition header value.

    Parameters:
    - name (str): Content-Disposition header value
    - fallback (str): Value returned when no filename can be extracted

    Returns:
    str or None: Extracted filename, or the fallback
    """

def parse_content_disposition(header):
    """
    Parse a Content-Disposition header into its type and parameters.

    Parameters:
    - header (str): Content-Disposition header value

    Returns:
    tuple: (disposition_type, parameters_dict)
    """

class BadContentDispositionHeader(Exception):
    """Raised for an invalid Content-Disposition header."""

class BadContentDispositionParam(Exception):
    """Raised for an invalid Content-Disposition parameter."""

Payload System

Flexible payload system supporting various data types with automatic serialization and content type detection.

class Payload:
    """Base class for request/response body payloads; concrete subclasses
    adapt specific Python data types for streaming over HTTP."""

    def __init__(self, value, headers=None, content_type=None, filename=None, encoding=None):
        """
        Base payload class.

        Parameters:
        - value: Payload data
        - headers (dict): Payload headers
        - content_type (str): Content type
        - filename (str): Filename for file payloads
        - encoding (str): Text encoding
        """

    @property
    def size(self):
        """Payload size in bytes if known."""

    @property
    def headers(self):
        """Payload headers."""

    @property
    def content_type(self):
        """Payload content type."""

    async def write(self, writer) -> None:
        """Write the payload data to a stream writer."""

class BytesPayload(Payload):
    """Payload wrapping raw bytes data."""

class StringPayload(Payload):
    """Payload wrapping string data."""

class IOBasePayload(Payload):
    """Payload that streams data from a file-like object."""

class BufferedReaderPayload(IOBasePayload):
    """Payload for buffered binary readers."""

class TextIOPayload(IOBasePayload):
    """Payload for text-mode I/O objects."""

class StringIOPayload(TextIOPayload):
    """Payload for io.StringIO objects."""

class BytesIOPayload(IOBasePayload):
    """Payload for io.BytesIO objects."""

class JsonPayload(Payload):
    def __init__(
        self,
        value,
        encoding='utf-8',
        content_type='application/json',
        dumps=None,
        **kwargs
    ):
        """
        Payload that serializes a Python object as JSON.

        Parameters:
        - value: Object to serialize as JSON
        - encoding (str): Text encoding for the serialized output
        - content_type (str): Content type
        - dumps: JSON serialization function (custom encoder hook)
        """

class AsyncIterablePayload(Payload):
    def __init__(self, value, **kwargs):
        """
        Payload that streams data from an async iterable.

        Parameters:
        - value: Async iterable data source
        """

def get_payload(data, *args, **kwargs):
    """
    Select and construct the appropriate payload class for ``data``.

    Parameters:
    - data: Data to create a payload for

    Returns:
    Payload: Appropriate payload instance
    """

def payload_type(payload_class):
    """
    Register a payload class to handle specific data types.

    Parameters:
    - payload_class: Payload class to register

    Returns:
    Function: Registration decorator
    """

# Global payload registry mapping data types to payload classes.
PAYLOAD_REGISTRY = None  # PayloadRegistry instance (None in this stub)

Streaming Support

Streaming payload creation and data streaming utilities.

def streamer(func):
    """
    Decorator to create a streaming payload from a generator function.

    NOTE(review): in modern aiohttp this decorator is deprecated in favor
    of passing an async generator directly as request ``data`` — confirm
    against the aiohttp version in use.

    Parameters:
    - func: Async generator function

    Returns:
    Function: Payload factory function
    """

Cookie Handling

Cookie management and storage implementations.

class CookieJar:
    """Default cookie jar: stores cookies from responses and selects the
    ones applicable to each outgoing request."""

    def __init__(self, *, unsafe=False, quote_cookie=True, treat_as_secure_origin=None):
        """
        Default cookie jar implementation.

        Parameters:
        - unsafe (bool): Allow unsafe cookies
        - quote_cookie (bool): Quote cookie values
        - treat_as_secure_origin: Origins to treat as secure
        """

    def update_cookies(self, cookies, response_url=None) -> None:
        """Store cookies received in a response."""

    def filter_cookies(self, request_url=None):
        """Return the cookies applicable to a request URL."""

    def clear(self, predicate=None) -> None:
        """Remove cookies matching the predicate (all cookies when None)."""

    def clear_domain(self, domain) -> None:
        """Remove all cookies belonging to the given domain."""

    def __iter__(self):
        """Iterate over the stored cookies."""

    def __len__(self) -> int:
        """Number of stored cookies."""

class DummyCookieJar:
    """No-op cookie jar that stores no cookies; useful to disable cookie
    processing entirely."""

    def update_cookies(self, cookies, response_url=None) -> None:
        """Ignore all cookies (no-op)."""

    def filter_cookies(self, request_url=None):
        """Return an empty cookie collection."""

Stream Processing

Low-level stream readers and data queues for advanced data processing.

class StreamReader:
    """Asynchronous byte-stream reader fed by a protocol via feed_data()."""

    def __init__(self, protocol, limit: int = 2**16, loop=None):
        """
        Asynchronous stream reader.

        Parameters:
        - protocol: Stream protocol that feeds this reader
        - limit (int): Internal buffer size limit in bytes
        - loop: Event loop
        """

    async def read(self, n: int = -1):
        """
        Read up to n bytes.

        Parameters:
        - n (int): Number of bytes to read (-1 reads until EOF)

        Returns:
        bytes: Read data
        """

    async def readline(self):
        """Read one line of data."""

    async def readexactly(self, n: int):
        """Read exactly n bytes."""

    async def readuntil(self, separator=b'\n'):
        """Read data until the separator sequence is found."""

    def at_eof(self) -> bool:
        """Return True when the stream has ended and the buffer is drained."""

    def feed_eof(self) -> None:
        """Signal end of stream."""

    def feed_data(self, data) -> None:
        """Feed incoming bytes into the stream buffer."""

class DataQueue:
    def __init__(self, *, loop=None):
        """Generic asynchronous data queue.

        Parameters:
        - loop: Event loop
        """

    def __aiter__(self):
        """Async iterator over queue items."""

    async def read(self):
        """Read the next item from the queue, waiting if necessary."""

    def feed_data(self, data, size: int = 0) -> None:
        """Add an item to the queue; size is its accounted byte size."""

    def feed_eof(self) -> None:
        """Signal that no more data will be queued."""

    def is_eof(self) -> bool:
        """Check if EOF has been signalled on the queue."""

    def at_eof(self) -> bool:
        """Check if the queue is finished (EOF signalled and drained)."""

class FlowControlDataQueue(DataQueue):
    def __init__(self, protocol, limit: int = 2**16, *, loop=None):
        """
        Data queue with flow control: backpressure is applied through the
        protocol when the buffered size exceeds the limit.

        Parameters:
        - protocol: Flow control protocol
        - limit (int): Buffer size limit in bytes
        - loop: Event loop
        """

class EofStream(Exception):
    """Raised to signal the end of a stream."""

# Empty payload singleton, shared wherever a body-less payload is needed.
EMPTY_PAYLOAD = None  # a Payload instance in the real package; None in this stub

Usage Examples

Form Data Submission

import aiohttp

async def submit_form():
    """Submit a form with text fields and one file upload.

    Returns:
    The decoded JSON response body.
    """
    # Create form data
    form = aiohttp.FormData()
    form.add_field('name', 'John Doe')
    form.add_field('email', 'john@example.com')
    form.add_field('age', '30')

    # BUG FIX: the file must remain open until the request has been sent.
    # aiohttp streams file-like payloads lazily when the request body is
    # written, so closing the file before session.post() would raise
    # "I/O operation on closed file". Keep the request inside the `with`.
    with open('document.pdf', 'rb') as f:
        form.add_field('file', f,
                       filename='document.pdf',
                       content_type='application/pdf')

        # Submit form while the file handle is still valid
        async with aiohttp.ClientSession() as session:
            async with session.post('https://api.example.com/submit',
                                    data=form) as response:
                result = await response.json()
                return result

Multipart Content Processing

from aiohttp import web

async def upload_handler(request):
    """Handle a multipart/form-data upload.

    File parts are saved under uploads/; plain fields are collected into a
    dict. Responds with a JSON summary of both.
    """
    import os  # local import keeps this snippet self-contained

    reader = await request.multipart()

    uploaded_files = []
    form_fields = {}

    async for part in reader:
        if part.filename:
            # SECURITY: basename() strips directory components a malicious
            # client could smuggle into the filename (e.g. '../../etc/passwd')
            # — never use a client-supplied filename as a path directly.
            filename = os.path.basename(part.filename)
            content = await part.read()

            # BUG FIX: the destination path template was corrupted
            # ('uploads/(unknown)'); interpolate the sanitized filename.
            with open(f'uploads/{filename}', 'wb') as f:
                f.write(content)

            uploaded_files.append({
                'filename': filename,
                'size': len(content),
                'content_type': part.headers.get('Content-Type')
            })

        else:
            # aiohttp already parses Content-Disposition: use part.name
            # instead of fragile manual string splitting of the raw header.
            field_name = part.name
            field_value = await part.text()
            form_fields[field_name] = field_value

    return web.json_response({
        'files': uploaded_files,
        'fields': form_fields
    })

app = web.Application()
app.router.add_post('/upload', upload_handler)

JSON Payload Streaming

import aiohttp
import asyncio
import json

async def json_stream_generator(count=1000, delay=0.01):
    """Asynchronously generate a JSON array, one element at a time.

    BUG FIX: the original decorated this with ``@aiohttp.streamer``, but
    that decorator wraps a writer-taking function (it calls
    ``writer.write(...)``) — it does not consume an async generator that
    yields bytes, so the example would not stream correctly. It is also
    deprecated. Modern aiohttp accepts a plain async generator directly as
    request ``data``, which is what this produces.

    Parameters:
    - count (int): Number of array elements to emit (default matches the
      original hard-coded 1000)
    - delay (float): Pause between elements in seconds (default matches
      the original 0.01)

    Yields:
    bytes: Chunks that concatenate into one valid JSON array
    """
    yield b'['

    for i in range(count):
        if i > 0:
            yield b','

        data = {'id': i, 'value': f'item_{i}'}
        yield json.dumps(data).encode('utf-8')

        # Simulate processing delay
        await asyncio.sleep(delay)

    yield b']'

async def stream_json_data():
    """POST the streaming JSON payload and return the parsed JSON reply."""
    payload = json_stream_generator()
    request_headers = {'Content-Type': 'application/json'}

    async with aiohttp.ClientSession() as session:
        response = await session.post(
            'https://api.example.com/bulk',
            data=payload,
            headers=request_headers,
        )
        async with response:
            return await response.json()

Custom Payload Type

import aiohttp
from aiohttp.payload import Payload

class CSVPayload(Payload):
    """Payload that serializes Python data as CSV (text/csv)."""

    def __init__(self, data, **kwargs):
        """Build a CSV payload.

        A non-empty list of dicts is rendered as a CSV table whose header
        row comes from the first dict's keys; any other input is
        stringified as-is.
        """
        import csv
        import io

        looks_tabular = (
            isinstance(data, list)
            and data
            and isinstance(data[0], dict)
        )

        if looks_tabular:
            buffer = io.StringIO()
            table = csv.DictWriter(buffer, fieldnames=data[0].keys())
            table.writeheader()
            table.writerows(data)
            encoded = buffer.getvalue().encode('utf-8')
        else:
            encoded = str(data).encode('utf-8')

        super().__init__(encoded,
                         content_type='text/csv',
                         **kwargs)

# Register custom payload type
@aiohttp.payload_type(list)
def list_to_csv_payload(data, *args, **kwargs):
    """Route lists of dicts to CSVPayload; defer anything else to aiohttp."""
    looks_tabular = bool(data) and isinstance(data[0], dict)
    if looks_tabular:
        return CSVPayload(data, *args, **kwargs)
    return aiohttp.get_payload(data, *args, **kwargs)

# Usage
async def send_csv_data():
    """POST a list of dict records; the registered payload type renders CSV."""
    records = [
        {'name': 'John', 'age': 30, 'city': 'New York'},
        {'name': 'Jane', 'age': 25, 'city': 'Boston'},
        {'name': 'Bob', 'age': 35, 'city': 'Chicago'},
    ]

    async with aiohttp.ClientSession() as session:
        response = await session.post(
            'https://api.example.com/data.csv',
            data=records,
        )
        async with response:
            return await response.text()

Advanced File Upload with Progress

import aiohttp
import asyncio
from pathlib import Path

class ProgressFilePayload(aiohttp.IOBasePayload):
    """File payload that reports upload progress through an async callback."""

    def __init__(self, file_path, progress_callback=None, **kwargs):
        """
        Parameters:
        - file_path: Path of the file to upload
        - progress_callback: Optional async callable invoked after each
          chunk as (percent, uploaded_bytes, total_bytes)
        """
        path = Path(file_path)
        self._file_path = path
        self._progress_callback = progress_callback
        self._total_size = path.stat().st_size
        self._uploaded = 0

        super().__init__(
            open(file_path, 'rb'),
            filename=path.name,
            **kwargs
        )

    async def write(self, writer):
        """Stream the file in 8 KiB chunks, reporting progress after each."""
        block = 8192

        # self._value is the open file object held by IOBasePayload;
        # iterate until read() returns an empty chunk.
        for chunk in iter(lambda: self._value.read(block), b''):
            await writer.write(chunk)
            self._uploaded += len(chunk)

            if self._progress_callback:
                percent = (self._uploaded / self._total_size) * 100
                await self._progress_callback(percent, self._uploaded, self._total_size)

async def progress_callback(percent, uploaded, total):
    """Print a one-line upload progress report."""
    report = f"Upload progress: {percent:.1f}% ({uploaded}/{total} bytes)"
    print(report)

async def upload_large_file():
    """Upload a large file as multipart form data with progress reporting."""
    file_path = 'large_file.zip'

    # Wrap the file in the progress-tracking payload before building the form.
    tracked_payload = ProgressFilePayload(file_path, progress_callback)

    form = aiohttp.FormData()
    form.add_field('description', 'Large file upload')
    form.add_field('file',
                   tracked_payload,
                   filename='large_file.zip',
                   content_type='application/zip')

    async with aiohttp.ClientSession() as session:
        async with session.post('https://api.example.com/upload',
                                data=form) as response:
            return await response.json()

# Run upload
asyncio.run(upload_large_file())

Install with Tessl CLI

npx tessl i tessl/pypi-aiohttp

docs

client.md

data.md

index.md

server.md

testing.md

utilities.md

websocket.md

tile.json