The networks core of the Minos Framework providing networking components for reactive microservices
—
HTTP server functionality with REST support, request/response abstractions, routing, and connector patterns. Provides the foundation for building HTTP APIs and web services with the Minos framework.
Port classes for managing HTTP service lifecycle with automatic setup and teardown.
class HttpPort:
    """Port that manages the HTTP service lifecycle with automatic setup and teardown."""

    # Connector exposed by the port (examples read ``http_port.connector.host`` / ``.port``).
    connector: HttpConnector

    @staticmethod
    def _get_connector(
        connector: Optional[HttpConnector] = None,
        http_connector: Optional[HttpConnector] = None,
        **kwargs,
    ) -> HttpConnector: ...

    async def _start(self) -> None: ...

    async def _stop(self, err: Exception = None) -> None: ...
class RestService:
    """Deprecated - use HttpPort instead"""


# Usage Examples:
from minos.networks import HttpPort
from minos.common import Config
# Create HTTP port from configuration
config = Config("config.yml")
http_port = HttpPort._from_config(config)
# Start the HTTP server
await http_port.start()
# Server runs and handles requests...
# Stop the HTTP server
await http_port.stop()Adapters that manage HTTP routers and provide route aggregation.
class HttpAdapter:
    """Adapter that manages HTTP routers and provides route aggregation."""

    routes: dict[HttpEnrouteDecorator, Callable]
    routers: list[HttpRouter]

    def __init__(self, routers: list[HttpRouter]): ...

    @classmethod
    def _from_config(
        cls,
        config: Config,
        **kwargs,
    ) -> HttpAdapter: ...

    @staticmethod
    def _routers_from_config(
        config: Config,
        **kwargs,
    ) -> list[HttpRouter]: ...


# Usage Examples:
from minos.common import Config
from minos.networks import HttpAdapter, RestHttpRouter

# Load the configuration the router is built from
config = Config("config.yml")

# Create routers
router = RestHttpRouter._from_config(config)

# Create adapter with routers
adapter = HttpAdapter(routers=[router])

# Access aggregated routes
routes = adapter.routes
for decorator in routes:
    # Only the decorator (the mapping key) is needed to describe a route.
    print(f"Route: {decorator.path} {decorator.method}")


# Abstract base classes for HTTP server implementations that bridge framework-specific HTTP servers with Minos request/response system.
from typing import Awaitable, Callable, Generic, Optional, TypeVar, Union

RawRequest = TypeVar("RawRequest")
RawResponse = TypeVar("RawResponse")


class HttpConnector(Generic[RawRequest, RawResponse]):
    """Abstract base for HTTP server implementations.

    Bridges a framework-specific HTTP server (its ``RawRequest`` /
    ``RawResponse`` types) with the Minos request/response system.
    """

    def __init__(
        self,
        adapter: HttpAdapter,
        host: Optional[str] = None,
        port: Optional[int] = None,
        max_connections: int = 5,
    ): ...

    host: str  # default: "0.0.0.0"
    port: int  # default: 8080
    routes: dict[HttpEnrouteDecorator, Callable]
    adapter: HttpAdapter

    @classmethod
    def _from_config(cls, config: Config, **kwargs) -> HttpConnector: ...

    async def start(self) -> None: ...

    async def stop(self) -> None: ...

    def mount_routes(self) -> None: ...

    def mount_route(
        self,
        path: str,
        method: str,
        callback: Callable[[Request], Optional[Response]],
    ) -> None: ...

    def adapt_callback(
        self,
        callback: Callable[[Request], Union[Optional[Response], Awaitable[Optional[Response]]]],
    ) -> Callable[[RawRequest], Awaitable[RawResponse]]: ...

    # Abstract methods - implement in subclasses
    async def _start(self) -> None: ...

    async def _stop(self) -> None: ...

    def _mount_route(self, path: str, method: str, adapted_callback: Callable) -> None: ...

    async def _build_request(self, request: RawRequest) -> Request: ...

    async def _build_response(self, response: Optional[Response]) -> RawResponse: ...

    async def _build_error_response(self, message: str, status: int) -> RawResponse: ...


# Usage Examples:
# Custom connector implementation
class FastAPIConnector(HttpConnector[FastAPIRequest, FastAPIResponse]):
async def _start(self) -> None:
# Start FastAPI server
pass
async def _stop(self) -> None:
# Stop FastAPI server
pass
def _mount_route(self, path: str, method: str, adapted_callback: Callable) -> None:
# Mount route on FastAPI app
pass
async def _build_request(self, request: FastAPIRequest) -> Request:
# Convert FastAPI request to Minos Request
pass
async def _build_response(self, response: Optional[Response]) -> FastAPIResponse:
# Convert Minos Response to FastAPI response
pass
# Using the connector
connector = FastAPIConnector(adapter=adapter, host="localhost", port=8080)
await connector.start()HTTP-specific request classes with URL parameters, query parameters, and header support.
class HttpRequest:
    """HTTP-specific request with URL parameters, query parameters, and header support."""

    user: Optional[UUID]
    headers: dict[str, str]
    content_type: str
    has_url_params: bool
    has_query_params: bool

    async def url_params(
        self,
        type_: Optional[Union[type, str]] = None,
        **kwargs,
    ) -> Any: ...

    async def query_params(
        self,
        type_: Optional[Union[type, str]] = None,
        **kwargs,
    ) -> Any: ...

    # Inherited from Request
    has_content: bool
    has_params: bool

    async def content(self, **kwargs) -> Any: ...

    async def params(self, **kwargs) -> dict[str, Any]: ...


# Usage Examples:
@enroute.rest.query("/users/{user_id}", method="GET")
async def get_user(request: HttpRequest) -> Response:
    """Show how to read URL params, query params, headers and the body of a request."""
    # Get URL parameters
    user_id = (await request.url_params())["user_id"]

    # Get query parameters
    include_posts = (await request.query_params()).get("include_posts", False)

    # Access headers
    auth_header = request.headers.get("Authorization")

    # Get request body (if any)
    if request.has_content:
        body = await request.content()

    payload = {"user_id": user_id, "include_posts": include_posts}
    return Response(payload)
@enroute.rest.command("/users/{user_id}", method="PUT")
async def update_user(request: HttpRequest) -> Response:
    """Merge the request body into a user record keyed by the ``user_id`` path parameter."""
    # URL parameters from path
    user_id = (await request.url_params())["user_id"]

    # Request body content
    changes = await request.content()

    # Update user logic
    return Response({"id": user_id, **changes})


# HTTP-specific response classes with content type management and status codes.
class HttpResponse:
    """HTTP-specific response with content type management and a status code."""

    content_type: str
    status: int
    has_content: bool

    def __init__(
        self,
        *args,
        content_type: str = "application/json",
        **kwargs,
    ): ...

    async def content(self, **kwargs) -> Any: ...

    @classmethod
    def from_response(cls, response: Optional[Response]) -> HttpResponse: ...
class HttpResponseException:
    # HTTP status code carried by the exception.
    status: int # default: 400


# Usage Examples:
@enroute.rest.query("/users/{user_id}", method="GET")
async def get_user(request: HttpRequest) -> HttpResponse:
    """Return a fixed user record for the ``user_id`` path parameter."""
    user_id = (await request.url_params())["user_id"]

    # Return JSON response (default)
    record = {"id": user_id, "name": "John Doe"}
    return HttpResponse(record)
@enroute.rest.query("/users/{user_id}/avatar", method="GET")
async def get_user_avatar(request: HttpRequest) -> HttpResponse:
    """Return the avatar bytes as an ``image/png`` response."""
    # Return image response
    return HttpResponse(get_avatar_bytes(), content_type="image/png", status=200)
@enroute.rest.command("/users", method="POST")
async def create_user(request: HttpRequest) -> HttpResponse:
    """Create a user from the request body.

    Returns a 201 response with the new user. Raises
    ``HttpResponseException`` with status 400 when the ``email`` field is
    missing or empty, or when a ``ValueError`` is raised while handling the
    request.
    """
    try:
        user_data = await request.content()
        if not user_data.get("email"):
            raise HttpResponseException(status=400)

        # Create user logic
        new_user = {"id": "123", **user_data}
        return HttpResponse(new_user, status=201)
    except ValueError as exc:
        # Chain explicitly so the original error is recorded as the cause.
        raise HttpResponseException(status=400) from exc


# Integration with the routing system for HTTP request handling.
class RestHttpRouter:
    """Router that integrates the routing system with HTTP request handling."""

    routes: dict[HttpEnrouteDecorator, Callable]

    def _filter_routes(
        self,
        routes: dict[EnrouteDecorator, Callable],
    ) -> dict[EnrouteDecorator, Callable]: ...


# Usage Examples:
from minos.networks import RestHttpRouter, enroute
class UserController:
    """Minimal controller exposing a list and a create endpoint under ``/users``."""

    @enroute.rest.query("/users", method="GET")
    async def list_users(self, request: HttpRequest) -> HttpResponse:
        """Return an empty user listing."""
        listing = {"users": []}
        return HttpResponse(listing)

    @enroute.rest.command("/users", method="POST")
    async def create_user(self, request: HttpRequest) -> HttpResponse:
        """Echo the request body back with a fixed id and a 201 status."""
        payload = await request.content()
        created = {"id": "123", **payload}
        return HttpResponse(created, status=201)


# Create router and filter routes
config = Config("config.yml")
router = RestHttpRouter._from_config(config)

# Router automatically discovers and filters HTTP routes
http_routes = router.routes


from minos.networks import HttpPort, enroute, HttpRequest, HttpResponse
from minos.common import Config
class UserAPI:
    """REST handlers for listing, creating, reading, updating and deleting users."""

    @enroute.rest.query("/users", method="GET")
    async def list_users(self, request: HttpRequest) -> HttpResponse:
        """Return one page of users; ``page`` and ``limit`` default to 1 and 10."""
        pagination = await request.query_params()
        page = pagination.get("page", 1)
        limit = pagination.get("limit", 10)

        # Fetch users with pagination
        return HttpResponse({"users": fetch_users(page=page, limit=limit), "page": page})

    @enroute.rest.command("/users", method="POST")
    async def create_user(self, request: HttpRequest) -> HttpResponse:
        """Create a user, or answer 400 when the ``email`` field is missing or empty."""
        payload = await request.content()

        # Validate required fields
        if payload.get("email"):
            # Create user
            return HttpResponse(create_user(payload), status=201)

        return HttpResponse({"error": "Email required"}, status=400)

    @enroute.rest.query("/users/{user_id}", method="GET")
    async def get_user(self, request: HttpRequest) -> HttpResponse:
        """Return the user for ``user_id``, or a 404 error payload when the lookup is falsy."""
        user_id = (await request.url_params())["user_id"]

        user = get_user_by_id(user_id)
        if user:
            return HttpResponse(user)

        return HttpResponse({"error": "User not found"}, status=404)

    @enroute.rest.command("/users/{user_id}", method="PUT")
    async def update_user(self, request: HttpRequest) -> HttpResponse:
        """Apply the request body to the user identified by ``user_id``."""
        user_id = (await request.url_params())["user_id"]
        changes = await request.content()

        return HttpResponse(update_user(user_id, changes))

    @enroute.rest.command("/users/{user_id}", method="DELETE")
    async def delete_user(self, request: HttpRequest) -> HttpResponse:
        """Delete the user identified by ``user_id`` and answer with status 204."""
        user_id = (await request.url_params())["user_id"]

        delete_user(user_id)
        return HttpResponse(status=204)
# Setup HTTP service
config = Config("config.yml")
http_port = HttpPort._from_config(config)
# Start HTTP server
await http_port.start()
print(f"HTTP server running on {http_port.connector.host}:{http_port.connector.port}")
# Server handles requests automatically based on decorators
# Stop when done
await http_port.stop()@enroute.rest.query("/users/{user_id}/report", method="GET")
async def generate_user_report(request: HttpRequest) -> HttpResponse:
    """Return the user's report as PDF, CSV or JSON depending on the ``Accept`` header."""
    user_id = (await request.url_params())["user_id"]

    # Check accept header for response format
    accepted = request.headers.get("Accept", "application/json")

    report_source = get_user_data(user_id)

    if "application/pdf" in accepted:
        return HttpResponse(
            generate_pdf_report(report_source),
            content_type="application/pdf",
            status=200,
        )

    if "text/csv" in accepted:
        return HttpResponse(
            generate_csv_report(report_source),
            content_type="text/csv",
            status=200,
        )

    # Default JSON response
    return HttpResponse(report_source)


from minos.networks import HttpResponseException
@enroute.rest.command("/users/{user_id}/validate", method="POST")
async def validate_user(request: HttpRequest) -> HttpResponse:
    """Validate the posted data for the user identified by ``user_id``.

    Returns ``{"valid": <result>}`` on success. Raises
    ``HttpResponseException`` with status 401 when the ``Authorization``
    header is missing and with status 400 when the body is empty. A
    ``ValueError`` is answered with 400, a ``PermissionError`` with 403 and
    any other error with 500.
    """
    try:
        # Check authentication
        auth_header = request.headers.get("Authorization")
        if not auth_header:
            raise HttpResponseException(status=401)

        # Get and validate input
        url_params = await request.url_params()
        user_id = url_params["user_id"]
        validation_data = await request.content()

        if not validation_data:
            raise HttpResponseException(status=400)

        # Perform validation
        result = validate_user_data(user_id, validation_data)
        return HttpResponse({"valid": result})

    except HttpResponseException:
        # Let the deliberate 401/400 errors raised above propagate; without
        # this clause the generic handler below would turn them into 500s.
        raise
    except ValueError as e:
        return HttpResponse({"error": str(e)}, status=400)
    except PermissionError:
        return HttpResponse({"error": "Forbidden"}, status=403)
    except Exception:
        return HttpResponse({"error": "Internal server error"}, status=500)


# Install with Tessl CLI
npx tessl i tessl/pypi-minos-microservice-networks