A Super Fast Async Python Web Framework with a Rust runtime.
—
HTTP request and response handling with comprehensive support for headers, query parameters, form data, file uploads, and various response types including HTML, file serving, and streaming responses.
The Request object contains all information about an incoming HTTP request.
@dataclass
class Request:
query_params: QueryParams
headers: Headers
path_params: dict[str, str]
body: Union[str, bytes]
method: str
url: Url
form_data: dict[str, str]
files: dict[str, bytes]
ip_addr: Optional[str]
identity: Optional[Identity]
def json(self) -> dict:
"""
Parse request body as JSON.
Returns:
dict: Parsed JSON data
Raises:
ValueError: If body is not valid JSON
"""

The Response object represents an HTTP response with status, headers, and body.
@dataclass
class Response:
status_code: int
headers: Union[Headers, dict]
description: Union[str, bytes]
response_type: Optional[str]
file_path: Optional[str]
def set_cookie(self, key: str, value: str):
"""
Set a cookie in the response.
Args:
key: Cookie name
value: Cookie value
"""

Handle HTTP headers with get, set, and contains operations.
class Headers:
def __init__(self, default_headers: Optional[dict] = None):
"""
Initialize headers container.
Args:
default_headers: Optional default headers dictionary
"""
def get(self, key: str) -> Optional[str]:
"""
Get header value by key.
Args:
key: Header name (case-insensitive)
Returns:
Header value or None if not found
"""
def set(self, key: str, value: str):
"""
Set header value.
Args:
key: Header name
value: Header value
"""
def append(self, key: str, value: str):
"""
Append value to existing header.
Args:
key: Header name
value: Value to append
"""
def contains(self, key: str) -> bool:
"""
Check if header exists.
Args:
key: Header name
Returns:
True if header exists, False otherwise
"""
def populate_from_dict(self, headers: dict[str, str]):
"""
Populate headers from dictionary.
Args:
headers: Dictionary of header name-value pairs
"""
def is_empty(self) -> bool:
"""
Check if headers container is empty.
Returns:
True if no headers are set
"""

Handle URL query parameters with support for multiple values per key.
class QueryParams:
def get(self, key: str, default: Optional[str] = None) -> Optional[str]:
"""
Get first value for a query parameter.
Args:
key: Parameter name
default: Default value if parameter not found
Returns:
Parameter value or default
"""
def get_first(self, key: str) -> Optional[str]:
"""
Get first value for a parameter.
Args:
key: Parameter name
Returns:
First value or None
"""
def get_all(self, key: str) -> list[str]:
"""
Get all values for a parameter.
Args:
key: Parameter name
Returns:
List of all values for the parameter
"""
def set(self, key: str, value: str):
"""
Set parameter value (replaces existing).
Args:
key: Parameter name
value: Parameter value
"""
def contains(self, key: str) -> bool:
"""
Check if parameter exists.
Args:
key: Parameter name
Returns:
True if parameter exists
"""
def empty(self) -> bool:
"""
Check if query parameters are empty.
Returns:
True if no parameters are set
"""
def extend(self, other: QueryParams):
"""
Extend with parameters from another QueryParams object.
Args:
other: Another QueryParams instance
"""
def to_dict(self) -> dict:
"""
Convert to dictionary representation.
Returns:
Dictionary of parameter name-value pairs
"""

Various utilities for creating different types of HTTP responses.
def html(html_content: str) -> Response:
"""
Create an HTML response.
Args:
html_content: HTML content string
Returns:
Response with HTML content-type
"""
def serve_file(file_path: str, file_name: Optional[str] = None) -> "FileResponse":
"""
Serve a file for download.
Args:
file_path: Path to file to serve
file_name: Optional custom filename for download
Returns:
FileResponse for file download
"""
def serve_html(file_path: str) -> "FileResponse":
"""
Serve an HTML file.
Args:
file_path: Path to HTML file
Returns:
FileResponse with HTML content-type
"""
class FileResponse:
def __init__(
self,
file_path: str,
status_code: Optional[int] = None,
headers: Optional[Headers] = None,
):
"""
Response for serving files with automatic content-type detection.
Args:
file_path: Path to file to serve
status_code: HTTP status code (defaults to 200)
headers: Optional response headers
"""

Support for streaming responses and Server-Sent Events (SSE).
class StreamingResponse:
def __init__(
self,
content: Union[Generator, AsyncGenerator],
status_code: Optional[int] = None,
headers: Optional[Headers] = None,
media_type: str = "text/event-stream"
):
"""
Create a streaming response.
Args:
content: Generator or async generator yielding response chunks
status_code: HTTP status code
headers: Response headers
media_type: Content-Type header value
"""
def SSEResponse(
content: Union[Generator, AsyncGenerator],
status_code: Optional[int] = None,
headers: Optional[Headers] = None
) -> StreamingResponse:
"""
Create a Server-Sent Events response.
Args:
content: Generator yielding SSE messages
status_code: HTTP status code
headers: Response headers
Returns:
StreamingResponse configured for SSE
"""
def SSEMessage(
data: str,
event: Optional[str] = None,
id: Optional[str] = None,
retry: Optional[int] = None
) -> str:
"""
Format a Server-Sent Events message.
Args:
data: Message data
event: Event type
id: Message ID
retry: Retry timeout in milliseconds
Returns:
Formatted SSE message string
"""

JSON serialization utilities using the high-performance orjson library.
def jsonify(input_dict: dict) -> str:
"""
Serialize dictionary to JSON string using orjson.
Args:
input_dict: Dictionary to serialize
Returns:
JSON string representation
"""

Comprehensive HTTP status code constants.
# All standard HTTP status codes available as constants
HTTP_200_OK = 200
HTTP_201_CREATED = 201
HTTP_204_NO_CONTENT = 204
HTTP_400_BAD_REQUEST = 400
HTTP_401_UNAUTHORIZED = 401
HTTP_403_FORBIDDEN = 403
HTTP_404_NOT_FOUND = 404
HTTP_405_METHOD_NOT_ALLOWED = 405
HTTP_500_INTERNAL_SERVER_ERROR = 500
# ... and many more status codes

from robyn import Robyn
app = Robyn(__file__)
@app.get("/user/<user_id>")
def get_user(request):
    """Return details about the user named in the URL.

    Demonstrates reading path parameters, query parameters, and headers
    from the incoming request.

    Args:
        request: The incoming request; its ``path_params``,
            ``query_params``, ``headers`` and ``ip_addr`` are read.

    Returns:
        dict with the user id, the parsed query options, and client details.
    """
    # Access path parameters
    user_id = request.path_params["user_id"]

    # Access query parameters (values arrive as strings)
    include_posts = request.query_params.get("include_posts", "false")
    try:
        page = int(request.query_params.get("page", "1"))
    except ValueError:
        # A malformed ?page= value must not crash the handler; use page 1.
        page = 1

    # Access headers
    auth_header = request.headers.get("Authorization")  # read for illustration only
    user_agent = request.headers.get("User-Agent")

    return {
        "user_id": user_id,
        "include_posts": include_posts == "true",
        "page": page,
        "client_ip": request.ip_addr,
        "user_agent": user_agent,
    }
@app.post("/users")
def create_user(request):
    """Create a user from a JSON body, a form submission, or a file upload.

    Args:
        request: The incoming request; its JSON body, ``form_data`` and
            ``files`` are read.

    Returns:
        dict with a confirmation message and the collected user data.
    """
    # Parse JSON body. Form and multipart submissions are not JSON, and
    # request.json() raises ValueError for them, so fall back to an empty
    # record instead of failing before the branches below can run.
    try:
        user_data = request.json()
    except ValueError:
        user_data = {}

    # Access form data (if form submission)
    if request.form_data:
        user_data["name"] = request.form_data.get("name")
        user_data["email"] = request.form_data.get("email")

    # Access uploaded files
    if request.files:
        avatar = request.files.get("avatar")
        user_data["has_avatar"] = avatar is not None

    return {"message": "User created", "data": user_data}
app.start()

from robyn import Robyn, Response, Headers, status_codes, html, jsonify
app = Robyn(__file__)
@app.get("/custom")
def custom_response(request):
    """Build a Response by hand with custom headers and a cookie."""
    # Create custom response with headers
    custom_headers = Headers()
    for header_name, header_value in (
        ("X-Custom-Header", "MyValue"),
        ("Cache-Control", "no-cache"),
    ):
        custom_headers.set(header_name, header_value)

    reply = Response(
        status_code=status_codes.HTTP_200_OK,
        headers=custom_headers,
        description="Custom response content",
    )
    reply.set_cookie("session_id", "abc123")
    return reply
@app.get("/json")
def json_response(request):
    """Return a fixed payload serialized with jsonify."""
    return jsonify({"users": ["alice", "bob"], "count": 2})
@app.get("/html")
def html_response(request):
    """Return a small static HTML page via the html() helper."""
    markup = "<h1>Hello, World!</h1><p>This is HTML content.</p>"
    return html(markup)
app.start()

from robyn import Robyn, serve_file, serve_html
app = Robyn(__file__)
@app.get("/download/<filename>")
def download_file(request):
    """Serve the file named in the URL from ./uploads as a download."""
    # NOTE(review): the name comes straight from the URL; confirm it cannot
    # point outside ./uploads before using this pattern in production.
    requested_name = request.path_params["filename"]
    return serve_file(f"./uploads/{requested_name}", requested_name)
@app.get("/report")
def view_report(request):
    """Serve the monthly report HTML file."""
    report_path = "./reports/monthly_report.html"
    return serve_html(report_path)
app.start()

from robyn import Robyn, StreamingResponse, SSEResponse, SSEMessage
import time
import json
app = Robyn(__file__)
@app.get("/stream")
def stream_data(request):
    """Stream ten plain-text chunks, pausing one second after each."""
    def chunk_source():
        index = 0
        while index < 10:
            yield f"Data chunk {index}\n"
            time.sleep(1)
            index += 1

    return StreamingResponse(chunk_source(), media_type="text/plain")
@app.get("/events")
def server_sent_events(request):
    """Emit five "update" SSE messages, pausing two seconds after each."""
    def event_source():
        for seq in range(5):
            payload = {"event": seq, "timestamp": time.time()}
            yield SSEMessage(json.dumps(payload), event="update", id=str(seq))
            time.sleep(2)

    return SSEResponse(event_source())
app.start()

from html import escape

from robyn import Robyn, Headers, Response
app = Robyn(__file__)
@app.get("/search")
def search(request):
    """Return search results as JSON or HTML depending on the Accept header.

    Args:
        request: The incoming request; the ``q``, ``page`` and repeated
            ``filter`` query parameters and the ``Accept`` header are read.

    Returns:
        A dict when the Accept header mentions ``application/json``,
        otherwise a Response carrying an HTML page and custom headers.
    """
    # Handle multiple query parameters
    query = request.query_params.get("q", "")
    try:
        page = int(request.query_params.get("page", "1"))
    except ValueError:
        # A malformed ?page= value must not crash the handler; use page 1.
        page = 1
    filters = request.query_params.get_all("filter")  # Get all filter values

    # Check for specific headers; get() returns None when the header is absent
    accept_header = request.headers.get("Accept")
    if accept_header is not None and "application/json" in accept_header:
        return {"query": query, "page": page, "filters": filters}

    # Return HTML response with custom headers
    headers = Headers()
    headers.set("Content-Type", "text/html")
    headers.set("X-Search-Results", str(len(filters)))

    # The query text is user-controlled, so escape it before embedding it
    # in markup to prevent HTML/script injection.
    html_content = f"<h1>Search Results</h1><p>Query: {escape(query)}</p>"
    return Response(
        status_code=200,
        headers=headers,
        description=html_content,
    )
app.start()

Install with Tessl CLI
npx tessl i tessl/pypi-robyn