CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-mitmproxy

An interactive, SSL/TLS-capable intercepting proxy for HTTP/1, HTTP/2, and WebSockets.

Pending
Overview
Eval results
Files

content.mddocs/

Content Processing

Flexible content viewing, transformation, and analysis with support for various data formats and encoding schemes. Includes syntax highlighting, interactive content exploration, and extensible content view system.

Capabilities

Content View System

Extensible system for viewing and processing different content types.

class Contentview:
    """
    Base class for content viewers.
    
    Content viewers transform raw bytes into human-readable representations
    with optional syntax highlighting and formatting.
    """
    name: str
    content_types: List[str]
    
    def __call__(self, data: bytes, **metadata) -> Tuple[str, Iterator[Tuple[str, bytes]]]:
        """
        Transform content for viewing.
        
        Parameters:
        - data: Raw content bytes
        - **metadata: Additional metadata (content_type, etc.)
        
        Returns:
        - Tuple of (description, formatted_lines)
        """

class InteractiveContentview(Contentview):
    """
    Interactive content viewer with user input handling.
    
    Extends basic content viewing with interactive capabilities
    for exploring complex data structures.
    """
    def render_priority(self, data: bytes, **metadata) -> float:
        """
        Return priority for this viewer (higher = preferred).
        
        Parameters:
        - data: Content to potentially view
        - **metadata: Content metadata
        
        Returns:
        - Priority score (0.0 to 1.0)
        """

class SyntaxHighlight(Contentview):
    """
    Syntax highlighting content viewer.
    
    Provides syntax highlighting for code and structured data formats.
    """

def add(view: Contentview) -> None:
    """
    Register a custom content view.
    
    Parameters:
    - view: Content view instance to register
    """

class Metadata:
    """
    Content metadata container.
    
    Holds information about content type, encoding, and other properties
    used by content viewers for processing decisions.
    """
    content_type: Optional[str]
    charset: Optional[str]
    filename: Optional[str]
    size: int

Encoding Utilities

Content encoding and decoding support for various compression and transformation schemes.

def encode(data: bytes, encoding: str) -> bytes:
    """
    Encode content using specified encoding scheme.
    
    Parameters:
    - data: Raw content bytes to encode
    - encoding: Encoding scheme name (gzip, deflate, brotli, etc.)
    
    Returns:
    - Encoded content bytes
    
    Raises:
    - ValueError: If encoding scheme is not supported
    """

def decode(data: bytes, encoding: str) -> bytes:
    """
    Decode content using specified encoding scheme.
    
    Parameters:
    - data: Encoded content bytes to decode
    - encoding: Encoding scheme name (gzip, deflate, brotli, etc.)
    
    Returns:
    - Decoded content bytes
    
    Raises:
    - ValueError: If encoding scheme is not supported or data is invalid
    """

# Supported encoding schemes
ENCODINGS = {
    "gzip": "GNU zip compression",
    "deflate": "DEFLATE compression",
    "brotli": "Brotli compression", 
    "identity": "No encoding (pass-through)",
    "compress": "Unix compress format",
    "x-gzip": "Legacy gzip",
    "x-deflate": "Legacy deflate"
}

Usage Examples

Custom Content Viewer

from mitmproxy import contentviews
from mitmproxy.contentviews import base
import json
import yaml

class YAMLContentView(base.Contentview):
    """Custom content viewer for YAML files."""
    
    name = "YAML"
    content_types = ["application/yaml", "application/x-yaml", "text/yaml"]
    
    def __call__(self, data, **metadata):
        try:
            # Parse YAML content
            parsed = yaml.safe_load(data.decode('utf-8'))
            
            # Convert to pretty-printed JSON for display
            formatted = json.dumps(parsed, indent=2, ensure_ascii=False)
            
            # Return formatted content with syntax highlighting
            lines = []
            for i, line in enumerate(formatted.split('\n')):
                # Simple syntax highlighting for JSON
                if line.strip().startswith('"') and ':' in line:
                    # Key lines
                    lines.append(("text", f"{i+1:4d} "), ("key", line.encode('utf-8')))
                elif line.strip() in ['{', '}', '[', ']']:
                    # Structural lines
                    lines.append(("text", f"{i+1:4d} "), ("punctuation", line.encode('utf-8')))
                else:
                    # Value lines
                    lines.append(("text", f"{i+1:4d} "), ("value", line.encode('utf-8')))
            
            return "YAML", lines
            
        except (yaml.YAMLError, UnicodeDecodeError) as e:
            return "YAML (parse error)", [("error", str(e).encode('utf-8'))]
    
    def render_priority(self, data, **metadata):
        # High priority for YAML content types
        content_type = metadata.get("content_type", "")
        if any(ct in content_type for ct in self.content_types):
            return 0.9
        
        # Medium priority if content looks like YAML
        try:
            text = data.decode('utf-8')
            if any(indicator in text[:100] for indicator in ['---', '- ', ': ']):
                return 0.5
        except UnicodeDecodeError:
            pass
        
        return 0.0

# Register the custom viewer
contentviews.add(YAMLContentView())

class XMLContentView(base.Contentview):
    """Custom content viewer for XML with pretty printing."""
    
    name = "XML Pretty"
    content_types = ["application/xml", "text/xml"]
    
    def __call__(self, data, **metadata):
        try:
            import xml.etree.ElementTree as ET
            from xml.dom import minidom
            
            # Parse and pretty-print XML
            root = ET.fromstring(data)
            rough_string = ET.tostring(root, encoding='unicode')
            reparsed = minidom.parseString(rough_string)
            pretty = reparsed.toprettyxml(indent="  ")
            
            # Remove empty lines
            lines = [line for line in pretty.split('\n') if line.strip()]
            
            # Format for display with line numbers
            formatted_lines = []
            for i, line in enumerate(lines):
                formatted_lines.append(("text", f"{i+1:4d} "), ("xml", line.encode('utf-8')))
            
            return f"XML ({len(lines)} lines)", formatted_lines
            
        except ET.ParseError as e:
            return "XML (parse error)", [("error", str(e).encode('utf-8'))]
    
    def render_priority(self, data, **metadata):
        content_type = metadata.get("content_type", "")
        if any(ct in content_type for ct in self.content_types):
            return 0.8
        
        # Check if content starts with XML declaration
        try:
            text = data.decode('utf-8').strip()
            if text.startswith('<?xml') or text.startswith('<'):
                return 0.6
        except UnicodeDecodeError:
            pass
        
        return 0.0

contentviews.add(XMLContentView())

Content Processing in Addons

from mitmproxy import http, contentviews
from mitmproxy.net import encoding
import mitmproxy.ctx as ctx
import gzip
import json

class ContentProcessorAddon:
    """Addon for comprehensive content processing."""
    
    def response(self, flow: http.HTTPFlow):
        """Process response content."""
        if not flow.response:
            return
        
        content_type = flow.response.headers.get("content-type", "")
        content_encoding = flow.response.headers.get("content-encoding", "")
        
        # Decode compressed content
        if content_encoding:
            try:
                decoded_content = encoding.decode(flow.response.content, content_encoding)
                ctx.log.info(f"Decoded {content_encoding} content: {len(flow.response.content)} -> {len(decoded_content)} bytes")
                
                # Store original for potential re-encoding
                flow.metadata["original_encoding"] = content_encoding
                flow.metadata["original_content"] = flow.response.content
                
                # Update response with decoded content
                flow.response.content = decoded_content
                del flow.response.headers["content-encoding"]
                
            except ValueError as e:
                ctx.log.error(f"Failed to decode {content_encoding}: {e}")
        
        # Process JSON content
        if "application/json" in content_type:
            self.process_json_content(flow)
        
        # Process HTML content
        elif "text/html" in content_type:
            self.process_html_content(flow)
        
        # Process image content
        elif content_type.startswith("image/"):
            self.process_image_content(flow)
    
    def process_json_content(self, flow: http.HTTPFlow):
        """Process JSON response content."""
        try:
            data = flow.response.json()
            
            # Log JSON structure
            ctx.log.info(f"JSON response structure: {type(data).__name__}")
            if isinstance(data, dict):
                ctx.log.info(f"JSON keys: {list(data.keys())}")
            elif isinstance(data, list):
                ctx.log.info(f"JSON array length: {len(data)}")
            
            # Pretty-print JSON for debugging
            pretty_json = json.dumps(data, indent=2, ensure_ascii=False)
            ctx.log.info(f"JSON content preview:\n{pretty_json[:500]}...")
            
            # Could modify JSON data here
            if isinstance(data, dict) and "debug" not in data:
                data["debug"] = {"processed_by": "mitmproxy", "timestamp": flow.response.timestamp_start}
                flow.response.set_text(json.dumps(data))
        
        except ValueError as e:
            ctx.log.error(f"Invalid JSON in response: {e}")
    
    def process_html_content(self, flow: http.HTTPFlow):
        """Process HTML response content."""
        try:
            html_content = flow.response.get_text()
            
            # Log HTML info
            title_start = html_content.find("<title>")
            title_end = html_content.find("</title>")
            if title_start != -1 and title_end != -1:
                title = html_content[title_start + 7:title_end]
                ctx.log.info(f"HTML page title: {title}")
            
            # Count common elements
            element_counts = {
                "links": html_content.count("<a "),
                "images": html_content.count("<img "),
                "scripts": html_content.count("<script"),
                "forms": html_content.count("<form")
            }
            ctx.log.info(f"HTML elements: {element_counts}")
            
        except UnicodeDecodeError as e:
            ctx.log.error(f"Failed to decode HTML: {e}")
    
    def process_image_content(self, flow: http.HTTPFlow):
        """Process image response content."""
        content_type = flow.response.headers.get("content-type", "")
        content_size = len(flow.response.content)
        
        ctx.log.info(f"Image: {content_type}, {content_size} bytes")
        
        # Could analyze image properties here
        if content_type == "image/jpeg":
            # Simple JPEG header analysis
            if flow.response.content.startswith(b'\xff\xd8\xff'):
                ctx.log.info("Valid JPEG header detected")
        elif content_type == "image/png":
            # PNG header analysis
            if flow.response.content.startswith(b'\x89PNG\r\n\x1a\n'):
                ctx.log.info("Valid PNG header detected")

addons = [ContentProcessorAddon()]

Advanced Content Analysis

from mitmproxy import http
import mitmproxy.ctx as ctx
import hashlib
import magic  # python-magic library for file type detection
import re

class ContentAnalyzerAddon:
    """Advanced content analysis and classification."""
    
    def __init__(self):
        self.content_stats = {
            "total_bytes": 0,
            "content_types": {},
            "encodings": {},
            "file_types": {}
        }
    
    def response(self, flow: http.HTTPFlow):
        """Analyze response content comprehensively."""
        if not flow.response or not flow.response.content:
            return
        
        content = flow.response.content
        content_size = len(content)
        content_type = flow.response.headers.get("content-type", "unknown")
        content_encoding = flow.response.headers.get("content-encoding", "none")
        
        # Update statistics
        self.content_stats["total_bytes"] += content_size
        self.content_stats["content_types"][content_type] = self.content_stats["content_types"].get(content_type, 0) + 1
        self.content_stats["encodings"][content_encoding] = self.content_stats["encodings"].get(content_encoding, 0) + 1
        
        # Detect actual file type using magic numbers
        try:
            detected_type = magic.from_buffer(content, mime=True)
            self.content_stats["file_types"][detected_type] = self.content_stats["file_types"].get(detected_type, 0) + 1
            
            # Check for content type mismatch
            if detected_type != content_type.split(';')[0]:
                ctx.log.warn(f"Content type mismatch: declared={content_type}, detected={detected_type}")
        
        except Exception as e:
            ctx.log.error(f"File type detection failed: {e}")
        
        # Calculate content hash
        content_hash = hashlib.sha256(content).hexdigest()[:16]
        
        # Security analysis
        self.analyze_security(flow, content, content_type)
        
        # Performance analysis
        self.analyze_performance(flow, content, content_size)
        
        # Log analysis summary
        ctx.log.info(f"Content analysis: {flow.request.url}")
        ctx.log.info(f"  Size: {content_size} bytes, Type: {content_type}")
        ctx.log.info(f"  Hash: {content_hash}, Encoding: {content_encoding}")
    
    def analyze_security(self, flow, content, content_type):
        """Analyze content for security issues."""
        security_issues = []
        
        # Check for potential XSS in HTML
        if "text/html" in content_type:
            try:
                html_text = content.decode('utf-8', errors='ignore')
                
                # Simple XSS pattern detection
                xss_patterns = [
                    r'<script[^>]*>.*?javascript:',
                    r'on\w+\s*=\s*["\'].*?javascript:',
                    r'<iframe[^>]*src\s*=\s*["\']javascript:',
                ]
                
                for pattern in xss_patterns:
                    if re.search(pattern, html_text, re.IGNORECASE | re.DOTALL):
                        security_issues.append("Potential XSS vector detected")
                        break
                
                # Check for inline scripts
                if '<script' in html_text and 'javascript:' in html_text:
                    security_issues.append("Inline JavaScript detected")
                
            except UnicodeDecodeError:
                pass
        
        # Check for exposed sensitive data in JSON
        elif "application/json" in content_type:
            try:
                json_text = content.decode('utf-8', errors='ignore').lower()
                
                sensitive_keywords = ['password', 'token', 'secret', 'key', 'api_key', 'private']
                for keyword in sensitive_keywords:
                    if keyword in json_text:
                        security_issues.append(f"Potentially sensitive data: {keyword}")
                
            except UnicodeDecodeError:
                pass
        
        # Log security issues
        if security_issues:
            ctx.log.warn(f"Security analysis for {flow.request.url}:")
            for issue in security_issues:
                ctx.log.warn(f"  - {issue}")
    
    def analyze_performance(self, flow, content, content_size):
        """Analyze content for performance implications."""
        performance_notes = []
        
        # Large content warning
        if content_size > 1024 * 1024:  # > 1MB
            performance_notes.append(f"Large response: {content_size / (1024*1024):.2f} MB")
        
        # Check compression effectiveness
        content_encoding = flow.response.headers.get("content-encoding", "")
        if not content_encoding and content_size > 1024:  # > 1KB uncompressed
            performance_notes.append("Content could benefit from compression")
        
        # Check caching headers
        cache_control = flow.response.headers.get("cache-control", "")
        expires = flow.response.headers.get("expires", "")
        etag = flow.response.headers.get("etag", "")
        
        if not any([cache_control, expires, etag]):
            performance_notes.append("No caching headers present")
        
        # Log performance notes
        if performance_notes:
            ctx.log.info(f"Performance analysis for {flow.request.url}:")
            for note in performance_notes:
                ctx.log.info(f"  - {note}")
    
    def done(self):
        """Log final content statistics."""
        stats = self.content_stats
        ctx.log.info("Content Analysis Summary:")
        ctx.log.info(f"  Total bytes processed: {stats['total_bytes']:,}")
        ctx.log.info(f"  Unique content types: {len(stats['content_types'])}")
        ctx.log.info(f"  Most common content type: {max(stats['content_types'], key=stats['content_types'].get) if stats['content_types'] else 'None'}")
        ctx.log.info(f"  Encoding distribution: {dict(list(stats['encodings'].items())[:5])}")

addons = [ContentAnalyzerAddon()]

Content Transformation

from mitmproxy import http
from mitmproxy.net import encoding
import mitmproxy.ctx as ctx
import json
import re

class ContentTransformerAddon:
    """Transform content based on rules and filters."""
    
    def __init__(self):
        self.transformation_rules = {
            # URL pattern -> transformation function
            r".*\.json$": self.transform_json,
            r".*/api/.*": self.transform_api_response,
            r".*\.html$": self.transform_html,
        }
    
    def response(self, flow: http.HTTPFlow):
        """Apply content transformations based on URL patterns."""
        if not flow.response:
            return
        
        url = flow.request.url
        
        # Find matching transformation rules
        for pattern, transform_func in self.transformation_rules.items():
            if re.match(pattern, url):
                try:
                    transform_func(flow)
                except Exception as e:
                    ctx.log.error(f"Transformation failed for {url}: {e}")
    
    def transform_json(self, flow: http.HTTPFlow):
        """Transform JSON responses."""
        try:
            data = flow.response.json()
            
            # Add metadata to all JSON responses
            if isinstance(data, dict):
                data["_metadata"] = {
                    "processed_by": "mitmproxy",
                    "original_size": len(flow.response.content),
                    "url": flow.request.url
                }
            
            # Pretty-format JSON
            flow.response.set_text(json.dumps(data, indent=2, ensure_ascii=False))
            
            ctx.log.info(f"Transformed JSON response: {flow.request.url}")
            
        except ValueError:
            ctx.log.warn(f"Failed to parse JSON: {flow.request.url}")
    
    def transform_api_response(self, flow: http.HTTPFlow):
        """Transform API responses with additional headers."""
        # Add API processing headers
        flow.response.headers["X-API-Processed"] = "true"
        flow.response.headers["X-Processing-Time"] = str(int(time.time()))
        
        # Add CORS headers for development
        flow.response.headers["Access-Control-Allow-Origin"] = "*"
        flow.response.headers["Access-Control-Allow-Methods"] = "GET,POST,PUT,DELETE,OPTIONS"
        flow.response.headers["Access-Control-Allow-Headers"] = "Content-Type,Authorization"
        
        ctx.log.info(f"Transformed API response: {flow.request.url}")
    
    def transform_html(self, flow: http.HTTPFlow):
        """Transform HTML responses."""
        try:
            html_content = flow.response.get_text()
            
            # Inject debugging script
            debug_script = """
            <script>
            console.log('Page processed by mitmproxy');
            window.mitmproxy_processed = true;
            </script>
            """
            
            # Insert before closing </body> tag
            if "</body>" in html_content:
                html_content = html_content.replace("</body>", debug_script + "</body>")
            else:
                html_content += debug_script
            
            # Add meta tag
            meta_tag = '<meta name="processed-by" content="mitmproxy">'
            if "<head>" in html_content:
                html_content = html_content.replace("<head>", "<head>" + meta_tag)
            
            flow.response.set_text(html_content)
            
            ctx.log.info(f"Transformed HTML response: {flow.request.url}")
            
        except UnicodeDecodeError:
            ctx.log.warn(f"Failed to decode HTML: {flow.request.url}")

addons = [ContentTransformerAddon()]

Install with Tessl CLI

npx tessl i tessl/pypi-mitmproxy

docs

addons.md

commands.md

configuration.md

connections.md

content.md

flow-io.md

http-flows.md

index.md

protocols.md

tile.json