Werkzeug is a comprehensive WSGI web application library that provides essential utilities and components for building Python web applications.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
URL processing functions and WSGI utilities for encoding, decoding, parsing, and manipulating URLs and WSGI environments. These utilities provide essential functionality for handling web requests and constructing proper URLs.
Functions for converting between different URL formats and handling international characters.
def uri_to_iri(uri):
    """
    Convert a URI to an IRI (Internationalized Resource Identifier).

    Unquotes all valid UTF-8 characters while leaving reserved and invalid
    characters quoted. Decodes Punycode domains to Unicode.

    Parameters:
    - uri: URI string to convert

    Returns:
    IRI string with Unicode characters unquoted

    Examples:
    - uri_to_iri("http://xn--n3h.net/p%C3%A5th") → "http://☃.net/påth"
    - uri_to_iri("/path%20with%20spaces") → "/path with spaces"

    NOTE(review): Werkzeug 2.3+ appears to keep percent-encoded spaces
    (%20) quoted in uri_to_iri; confirm the second example against the
    targeted Werkzeug version.
    """
def iri_to_uri(iri):
    """
    Convert an IRI to a URI by encoding non-ASCII characters.

    Encodes Unicode characters to percent-encoded UTF-8 bytes and
    converts Unicode domains to Punycode.

    Parameters:
    - iri: IRI string to convert

    Returns:
    ASCII-only URI string

    Examples:
    - iri_to_uri("http://☃.net/påth") → "http://xn--n3h.net/p%C3%A5th"
    - iri_to_uri("/path with spaces") → "/path%20with%20spaces"
    """
# Import URL conversion functions
from werkzeug.urls import uri_to_iri, iri_to_uri
# Note: Werkzeug uses urllib.parse functions directly for basic URL operations
from urllib.parse import quote, unquote, quote_plus, unquote_plus, urlencode, parse_qs
# These are the standard functions used throughout Werkzeug:
def url_quote(string, charset="utf-8", errors="strict", safe="/:"):
    """
    Quote URL string (alias for urllib.parse.quote).

    Parameters:
    - string: Text to percent-encode
    - charset: Text encoding applied before quoting
    - errors: Encoding error handling scheme
    - safe: Characters that are left unquoted

    NOTE(review): the werkzeug.urls quoting helpers were deprecated in
    Werkzeug 2.3 and removed in 3.0; confirm against the targeted version.
    """
def url_unquote(string, charset="utf-8", errors="replace"):
    """
    Unquote URL string (alias for urllib.parse.unquote).

    Parameters:
    - string: Percent-encoded text to decode
    - charset: Text encoding used to decode the unquoted bytes
    - errors: Decoding error handling scheme

    NOTE(review): the werkzeug.urls quoting helpers were deprecated in
    Werkzeug 2.3 and removed in 3.0; confirm against the targeted version.
    """
def url_quote_plus(string, charset="utf-8", errors="strict"):
    """
    Quote URL string using + for spaces (alias for urllib.parse.quote_plus).

    Parameters:
    - string: Text to percent-encode
    - charset: Text encoding applied before quoting
    - errors: Encoding error handling scheme

    NOTE(review): the werkzeug.urls quoting helpers were deprecated in
    Werkzeug 2.3 and removed in 3.0; confirm against the targeted version.
    """
def url_unquote_plus(string, charset="utf-8", errors="replace"):
    """
    Unquote URL string with + as spaces (alias for urllib.parse.unquote_plus).

    Parameters:
    - string: Percent-encoded text to decode
    - charset: Text encoding used to decode the unquoted bytes
    - errors: Decoding error handling scheme

    NOTE(review): the werkzeug.urls quoting helpers were deprecated in
    Werkzeug 2.3 and removed in 3.0; confirm against the targeted version.
    """
def url_encode(obj, charset="utf-8", encode_keys=False, sort=False, key=None, separator="&"):
    """
    Encode object to URL query string (enhanced urllib.parse.urlencode).

    Parameters:
    - obj: Object holding the key/value pairs to encode
    - charset: Text encoding applied before quoting
    - encode_keys: Accepted flag; its effect is not described here
    - sort: Presumably whether the pairs are sorted before encoding (confirm)
    - key: Presumably the sort key function used when sort is set (confirm)
    - separator: String placed between encoded pairs

    NOTE(review): werkzeug.urls.url_encode was deprecated in Werkzeug 2.3
    and removed in 3.0; confirm against the targeted version.
    """
def url_decode(s, charset="utf-8", decode_keys=False, separator="&", cls=None, errors="replace"):
    """
    Decode URL query string to MultiDict or dict.

    Parameters:
    - s: Query string to decode
    - charset: Text encoding used to decode the unquoted bytes
    - decode_keys: Accepted flag; its effect is not described here
    - separator: String that separates the encoded pairs
    - cls: Presumably the container class for the result (confirm)
    - errors: Decoding error handling scheme

    NOTE(review): werkzeug.urls.url_decode was deprecated in Werkzeug 2.3
    and removed in 3.0; confirm against the targeted version.
    """


# Functions for extracting and manipulating data from WSGI environment dictionaries.
def get_current_url(environ, root_only=False, strip_querystring=False, host_only=False, trusted_hosts=None):
    """
    Reconstruct the complete URL from WSGI environment.

    Parameters:
    - environ: WSGI environment dictionary
    - root_only: Only return scheme, host, and script root
    - strip_querystring: Exclude query string from result
    - host_only: Only return scheme and host
    - trusted_hosts: List of trusted hostnames for validation

    Returns:
    Complete URL as IRI (may contain Unicode characters)

    Examples:
    - get_current_url(environ) → "http://example.com/app/path?query=value"
    - get_current_url(environ, root_only=True) → "http://example.com/app"
    - get_current_url(environ, host_only=True) → "http://example.com"

    NOTE(review): Werkzeug appears to end root_only/host_only results with
    a trailing slash ("http://example.com/app/"); confirm the examples
    against the targeted version.
    """
def get_host(environ, trusted_hosts=None):
    """
    Extract host information from WSGI environment.

    Prefers Host header, falls back to SERVER_NAME. Only includes port
    if it differs from the standard port for the protocol.

    Parameters:
    - environ: WSGI environment dictionary
    - trusted_hosts: List of trusted hostnames (raises SecurityError if not matched)

    Returns:
    Host string, optionally with port

    Examples:
    - get_host(environ) → "example.com" (standard port, so the port is omitted)
    - get_host(environ) → "localhost:8080" (non-standard port is kept)
    """
def get_content_length(environ):
    """
    Get the content length from WSGI environment.

    Parameters:
    - environ: WSGI environment dictionary

    Returns:
    Content length as integer or None if not specified/invalid
    """
def get_input_stream(environ, safe_fallback=True):
    """
    Get the input stream from WSGI environment with safety checks.

    Parameters:
    - environ: WSGI environment dictionary
    - safe_fallback: Return empty stream if wsgi.input is None

    Returns:
    Input stream for reading request body
    """
def get_path_info(environ):
    """
    Get PATH_INFO from WSGI environment with proper decoding.

    Parameters:
    - environ: WSGI environment dictionary

    Returns:
    Decoded path info string
    """


# Utilities for handling WSGI responses and file serving.
def wrap_file(environ, file, buffer_size=8192):
    """
    Wrap a file for WSGI response, with optional range support.

    Parameters:
    - environ: WSGI environment dictionary
    - file: File-like object to wrap
    - buffer_size: Buffer size for reading file chunks

    Returns:
    WSGI-compatible iterable for file content

    NOTE(review): Werkzeug's wrap_file is documented as delegating to the
    server's wsgi.file_wrapper or to FileWrapper; the "range support"
    wording above could not be confirmed and should be verified.
    """
class FileWrapper:
    # Signature-only outline of werkzeug.wsgi.FileWrapper.

    def __init__(self, file, buffer_size=8192):
        """
        Wrap a file-like object for WSGI response.

        Parameters:
        - file: File-like object to serve
        - buffer_size: Size of chunks to read
        """

    def __iter__(self):
        """Iterate over file chunks."""

    # NOTE(review): confirm that werkzeug.wsgi.FileWrapper actually defines
    # __len__ in the targeted version.
    def __len__(self):
        """Get file length if available."""
class ClosingIterator:
    # Signature-only outline of werkzeug.wsgi.ClosingIterator.

    def __init__(self, iterable, callbacks=None):
        """
        Wrap an iterable to ensure cleanup callbacks are called.

        Parameters:
        - iterable: Iterable to wrap
        - callbacks: List of functions to call when iterator is closed
        """

    def __iter__(self):
        """Iterate over wrapped iterable."""

    def close(self):
        """Call all cleanup callbacks."""
class LimitedStream:
    # Signature-only outline of werkzeug.wsgi.LimitedStream.

    def __init__(self, stream, limit):
        """
        Limit the amount of data that can be read from a stream.

        Parameters:
        - stream: Stream to wrap
        - limit: Maximum bytes to allow reading

        NOTE(review): Werkzeug 2.3+ appears to accept an additional
        is_max=False argument that marks the limit as a ceiling rather
        than an expected length; confirm against the targeted version.
        """

    def read(self, size=-1):
        """Read up to size bytes, respecting the limit."""

    def readline(self, size=-1):
        """Read one line, respecting the limit."""


# Decorators and helper functions for WSGI application development.
def responder(f):
    """
    Decorator to automatically call returned WSGI application.

    Allows returning Response objects from functions that will be
    automatically called with (environ, start_response).

    Parameters:
    - f: Function that returns a WSGI application

    Returns:
    WSGI application function

    Example:
        @responder
        def app(environ, start_response):
            return Response('Hello World!')
    """
def host_is_trusted(hostname, trusted_list):
    """
    Check if hostname is in the trusted list.

    Parameters:
    - hostname: Hostname to check
    - trusted_list: List of trusted hostnames/patterns

    Returns:
    True if hostname is trusted
    """


from werkzeug.urls import uri_to_iri, iri_to_uri
from werkzeug.wsgi import get_current_url
from urllib.parse import quote, unquote, urlencode, parse_qs
def url_handling_examples():
    """Print a walkthrough of URI/IRI conversion, quoting and query strings."""
    # Convert between URI and IRI
    uri = "http://xn--n3h.net/caf%C3%A9"
    iri = uri_to_iri(uri)
    print(f"URI: {uri}")
    print(f"IRI: {iri}")  # "http://☃.net/café"

    back_to_uri = iri_to_uri(iri)
    print(f"Back to URI: {back_to_uri}")

    # Basic URL encoding/decoding
    text = "Hello World & Co."
    encoded = quote(text)
    decoded = unquote(encoded)
    print(f"Original: {text}")
    print(f"Encoded: {encoded}")  # "Hello%20World%20%26%20Co."
    print(f"Decoded: {decoded}")

    # Query string handling
    params = {'name': 'John Doe', 'age': '30', 'city': 'New York'}
    query_string = urlencode(params)
    print(f"Query string: {query_string}")  # "name=John+Doe&age=30&city=New+York"

    # parse_qs maps each key to a list of values.
    parsed = parse_qs(query_string)
    print(f"Parsed: {parsed}")
def wsgi_url_reconstruction(environ, start_response):
    """
    Example of reconstructing URLs from WSGI environ.

    Parameters:
    - environ: WSGI environment dictionary
    - start_response: WSGI start_response callable

    Returns:
    WSGI response iterable listing the reconstructed URL variants
    """
    # Imported locally because this example's own import lines do not
    # bring Response into scope.
    from werkzeug.wrappers import Response

    # Get complete current URL
    full_url = get_current_url(environ)
    # Get just the application root
    root_url = get_current_url(environ, root_only=True)
    # Get host only
    host_only = get_current_url(environ, host_only=True)
    # URL without query string
    path_url = get_current_url(environ, strip_querystring=True)

    # Built line by line so the text does not depend on source indentation.
    response_text = (
        "\n"
        f"Full URL: {full_url}\n"
        f"Root URL: {root_url}\n"
        f"Host only: {host_only}\n"
        f"Path URL: {path_url}\n"
    )
    response = Response(response_text)
    return response(environ, start_response)
# Run the printing demo only when this file is executed as a script.
if __name__ == '__main__':
    url_handling_examples()


from werkzeug.wsgi import get_host, get_content_length, get_path_info
from werkzeug.wrappers import Request, Response
def process_wsgi_environ(environ, start_response):
    """
    Extract information from WSGI environment.

    Parameters:
    - environ: WSGI environment dictionary
    - start_response: WSGI start_response callable

    Returns:
    WSGI response iterable with one "key: value" line per collected field
    """
    # Build response with environment info: helper-derived values first,
    # then raw environ keys (missing keys show up as None).
    info = {
        'host': get_host(environ),
        'content_length': get_content_length(environ),
        'path_info': get_path_info(environ),
        'method': environ.get('REQUEST_METHOD'),
        'query_string': environ.get('QUERY_STRING'),
        'content_type': environ.get('CONTENT_TYPE'),
        'remote_addr': environ.get('REMOTE_ADDR'),
        'user_agent': environ.get('HTTP_USER_AGENT'),
    }

    response_text = '\n'.join(f'{k}: {v}' for k, v in info.items())
    response = Response(response_text, mimetype='text/plain')
    return response(environ, start_response)
# Security example with trusted hosts
def secure_host_check(environ, start_response):
    """
    Example of validating host against trusted list.

    Parameters:
    - environ: WSGI environment dictionary
    - start_response: WSGI start_response callable

    Returns:
    WSGI response iterable: a welcome message for a trusted host,
    otherwise a 400 response
    """
    # SecurityError is not imported anywhere at module level; without this
    # import the except clause itself would fail with NameError.
    from werkzeug.exceptions import SecurityError

    trusted_hosts = ['example.com', 'www.example.com', 'localhost']
    try:
        # Only the host lookup can raise SecurityError, so keep the try minimal.
        host = get_host(environ, trusted_hosts)
    except SecurityError:
        response = Response('Untrusted host', status=400)
    else:
        response = Response(f'Welcome from trusted host: {host}')
    return response(environ, start_response)


from werkzeug.wsgi import wrap_file, FileWrapper
from werkzeug.wrappers import Response
import os
import mimetypes
def _resolve_static_path(static_root, relative_path):
    """Return the real path of relative_path under static_root, or None if it escapes the root."""
    try:
        root = os.path.realpath(static_root)
        candidate = os.path.realpath(os.path.join(root, relative_path))
        inside_root = os.path.commonpath([root, candidate]) == root
    except ValueError:
        # Embedded NUL bytes or paths on different drives cannot be served.
        return None
    return candidate if inside_root else None


def serve_static_file(environ, start_response):
    """
    Serve static files using WSGI utilities.

    Parameters:
    - environ: WSGI environment dictionary
    - start_response: WSGI start_response callable

    Returns:
    WSGI response iterable streaming the file, or a 404 response when the
    path is missing, is not a regular file, or resolves outside ./static
    """
    # PATH_INFO is client-controlled: resolve it and refuse anything that
    # leaves the static root (e.g. "../" segments or symlinks pointing out).
    path_info = environ.get('PATH_INFO', '').lstrip('/')
    file_path = _resolve_static_path('./static', path_info)

    file_obj = None
    if file_path is not None and os.path.isfile(file_path):
        try:
            # Open directly instead of trusting an earlier existence check;
            # the file may disappear between the check and the open.
            file_obj = open(file_path, 'rb')
        except OSError:
            file_obj = None

    if file_obj is None:
        response = Response('File not found', status=404)
        return response(environ, start_response)

    # Get file info from the open handle so the size matches what is served.
    file_size = os.fstat(file_obj.fileno()).st_size
    mime_type, _ = mimetypes.guess_type(path_info)

    # Wrap the open file for WSGI; the response takes over the handle.
    file_wrapper = wrap_file(environ, file_obj)

    # Create response with proper headers
    response = Response(
        file_wrapper,
        mimetype=mime_type or 'application/octet-stream',
        headers={
            'Content-Length': str(file_size),
            'Accept-Ranges': 'bytes',
        },
    )
    return response(environ, start_response)
def serve_large_file_with_wrapper():
    """
    Example of using FileWrapper directly.

    Returns:
    WSGI application that streams ./large_file.dat in 16 KiB chunks
    """
    def large_file_app(environ, start_response):
        file_path = './large_file.dat'

        try:
            # Open directly rather than checking existence first, so a file
            # removed in between still yields the 404 branch.
            file_obj = open(file_path, 'rb')
        except FileNotFoundError:
            start_response('404 Not Found', [('Content-Type', 'text/plain')])
            return [b'File not found']

        # Size of the handle that is actually streamed.
        file_size = os.fstat(file_obj.fileno()).st_size

        headers = [
            ('Content-Type', 'application/octet-stream'),
            ('Content-Length', str(file_size)),
        ]
        start_response('200 OK', headers)

        # Use FileWrapper for efficient serving
        return FileWrapper(file_obj, buffer_size=16384)

    return large_file_app


from werkzeug.wsgi import responder
from werkzeug.wrappers import Response
import json
@responder
def json_api(environ, start_response):
    """
    API endpoint using responder decorator.

    Routes on PATH_INFO: /api/status and /api/info return JSON payloads,
    any other path returns a JSON 404.

    Parameters:
    - environ: WSGI environment dictionary
    - start_response: WSGI start_response callable (consumed by @responder)

    Returns:
    Response object; @responder invokes it as the WSGI application
    """
    path = environ.get('PATH_INFO', '')
    method = environ.get('REQUEST_METHOD', 'GET')

    status = 200
    if path == '/api/status':
        data = {'status': 'ok', 'method': method}
    elif path == '/api/info':
        data = {
            'host': environ.get('HTTP_HOST'),
            'user_agent': environ.get('HTTP_USER_AGENT'),
            'path': path,
        }
    else:
        status = 404
        data = {'error': 'Not found'}

    # One construction point keeps the mimetype identical on every branch.
    return Response(json.dumps(data), status=status, mimetype='application/json')
# Without responder decorator (traditional WSGI)
def traditional_wsgi_app(environ, start_response):
    """
    Same functionality without decorator.

    Calls the undecorated json_api function through its __wrapped__
    attribute and invokes the returned response manually.
    """
    response = json_api.__wrapped__(environ, start_response)
    return response(environ, start_response)


from werkzeug.wsgi import LimitedStream, get_input_stream
from werkzeug.wrappers import Request, Response
def handle_upload_with_limits(environ, start_response):
    """
    Handle file uploads with size limits.

    Parameters:
    - environ: WSGI environment dictionary
    - start_response: WSGI start_response callable

    Returns:
    WSGI response iterable: the number of bytes received, or a 413
    response when the upload exceeds the 5MB limit
    """
    # Imported locally: neither name is provided by this example's own
    # import lines, and RequestEntityTooLarge is not imported anywhere.
    from werkzeug.exceptions import RequestEntityTooLarge
    from werkzeug.wsgi import get_content_length

    max_upload_size = 1024 * 1024 * 5  # 5MB limit

    # Reject early when the client announces an oversized body.
    content_length = get_content_length(environ)
    if content_length is not None and content_length > max_upload_size:
        response = Response('File too large', status=413)
        return response(environ, start_response)

    # Get input stream with limit. is_max=True marks the limit as a ceiling
    # rather than the expected body length, so a shorter upload is not
    # treated as a client disconnect.
    # NOTE(review): is_max requires Werkzeug 2.3+; confirm the targeted version.
    input_stream = get_input_stream(environ)
    limited_stream = LimitedStream(input_stream, max_upload_size, is_max=True)

    try:
        data = limited_stream.read()
    except RequestEntityTooLarge:
        response = Response('Upload size exceeded limit', status=413)
    else:
        response = Response(f'Received {len(data)} bytes')
    return response(environ, start_response)
def safe_stream_reading():
    """
    Example of safe stream reading patterns.

    Returns:
    WSGI application that reads the request body in 8 KiB chunks and
    answers 413 once more than 1MB has been received
    """
    chunk_size = 8192
    max_size = 1024 * 1024  # 1MB limit

    def app(environ, start_response):
        input_stream = get_input_stream(environ, safe_fallback=True)

        # Read in chunks to avoid memory issues
        chunks = []
        total_size = 0
        while True:
            chunk = input_stream.read(chunk_size)
            if not chunk:
                break

            total_size += len(chunk)
            if total_size > max_size:
                # Stop as soon as the cap is crossed; the oversized chunk
                # is never buffered.
                response = Response('Request too large', status=413)
                return response(environ, start_response)
            chunks.append(chunk)

        data = b''.join(chunks)
        response = Response(f'Processed {len(data)} bytes safely')
        return response(environ, start_response)

    return app


from werkzeug.wsgi import get_current_url
from werkzeug.urls import iri_to_uri
from werkzeug.routing import Map, Rule
from urllib.parse import urlencode
class URLBuilder:
    """Helper class for building URLs in WSGI applications."""

    def __init__(self, url_map):
        """
        Store the routing map used for URL generation.

        Parameters:
        - url_map: Routing map that provides bind_to_environ()
        """
        self.url_map = url_map

    def build_url(self, environ, endpoint, values=None, external=False):
        """
        Build URL for endpoint with current request context.

        Parameters:
        - environ: WSGI environment dictionary
        - endpoint: Endpoint name to build a URL for
        - values: Optional dict of URL variable values
        - external: Force an external (absolute) URL, returned as a URI

        Returns:
        URL string, or None if the URL cannot be built
        """
        # BuildError is not imported at module level; without this import
        # the except clause itself would fail with NameError.
        from werkzeug.routing import BuildError

        adapter = self.url_map.bind_to_environ(environ)
        try:
            url = adapter.build(endpoint, values or {}, force_external=external)
        except BuildError:
            return None

        # Convert IRI to URI for external URLs
        if external:
            url = iri_to_uri(url)
        return url

    def build_external_url(self, environ, path, query_params=None):
        """
        Build external URL with query parameters.

        Parameters:
        - environ: WSGI environment dictionary
        - path: Path appended to the application root URL
        - query_params: Optional mapping encoded as the query string

        Returns:
        ASCII-only URI string
        """
        base_url = get_current_url(environ, root_only=True)

        # Join with exactly one slash regardless of whether the root URL
        # ends with "/" or the path starts with one.
        url = f"{base_url.rstrip('/')}/{path.lstrip('/')}"
        if query_params:
            url = f"{url}?{urlencode(query_params)}"
        return iri_to_uri(url)
# Example usage: routing table and shared builder for the demo app below.
url_map = Map(
    [
        Rule('/', endpoint='index'),
        Rule('/user/<int:user_id>', endpoint='user_profile'),
        Rule('/api/data', endpoint='api_data'),
    ]
)
builder = URLBuilder(url_map)
def link_generation_app(environ, start_response):
    """
    App demonstrating URL generation.

    Parameters:
    - environ: WSGI environment dictionary
    - start_response: WSGI start_response callable

    Returns:
    WSGI response iterable with an HTML page linking to the generated URLs
    """
    from html import escape

    # Imported locally because this example's own import lines do not
    # bring Response into scope.
    from werkzeug.wrappers import Response

    # Build internal URLs
    home_url = builder.build_url(environ, 'index')
    profile_url = builder.build_url(environ, 'user_profile', {'user_id': 123})

    # Build external URLs
    api_url = builder.build_url(environ, 'api_data', external=True)
    custom_url = builder.build_external_url(
        environ,
        '/search',
        {'q': 'python', 'page': '2'},
    )

    def href(url):
        # Escape for use inside a double-quoted attribute ("&" in query
        # strings becomes "&amp;"); str() keeps the rendering of None.
        return escape(str(url), quote=True)

    # Joined line by line so the markup does not depend on source indentation.
    page = "\n".join([
        "",
        "<html>",
        "<body>",
        "<h1>URL Examples</h1>",
        "<ul>",
        f'<li><a href="{href(home_url)}">Home</a></li>',
        f'<li><a href="{href(profile_url)}">User Profile</a></li>',
        f'<li><a href="{href(api_url)}">API Data</a></li>',
        f'<li><a href="{href(custom_url)}">Search Results</a></li>',
        "</ul>",
        "</body>",
        "</html>",
        "",
    ])

    response = Response(page, mimetype='text/html')
    return response(environ, start_response)


# Install with Tessl CLI
npx tessl i tessl/pypi-werkzeug