The networks core of the Minos Framework providing networking components for reactive microservices
—
Core request/response abstractions used across all transport types. Provides unified interfaces for handling different types of requests with content, parameters, and user context.
Abstract base class providing unified request handling across all transport types.
class Request:
user: Optional[UUID]
has_content: bool
has_params: bool
async def content(self, **kwargs) -> Any: ...
async def params(self, **kwargs) -> dict[str, Any]: ...
async def _content(self, **kwargs) -> Any: ... # Abstract
async def _params(self, **kwargs) -> dict[str, Any]: ... # Abstract
def __eq__(self, other: Any) -> bool: ... # Abstract
def __repr__(self) -> str: ... # Abstract

Concrete response class for returning data with status codes.
class Response:
def __init__(self, data: Any = None, *, status: int = 200): ...
has_content: bool
status: int
async def content(self, **kwargs) -> Any: ...
def __eq__(self, other: Any) -> bool: ...
def __repr__(self) -> str: ...
def __hash__(self) -> int: ...
class ResponseException:
def __init__(self, *args, status: int = 400): ...
status: int

Usage Examples:
from minos.networks import Response, ResponseException
# Create successful response
response = Response({"user_id": "123", "name": "John"})
print(response.status) # 200
# Create response with custom status
created_response = Response({"id": "123"}, status=201)
# Check if response has content
if response.has_content:
data = await response.content()
# Raise response exception
if not valid_data:
raise ResponseException("Invalid data", status=400)

Concrete request implementation for testing and in-memory usage.
class InMemoryRequest(Request):
def __init__(self, content: Any = None, params: dict[str, Any] = None, user: Optional[UUID] = None): ...
user: Optional[UUID]
has_content: bool
has_params: bool
async def _content(self, **kwargs) -> Any: ...
async def _params(self, **kwargs) -> dict[str, Any]: ...
def __eq__(self, other: Any) -> bool: ...
def __repr__(self) -> str: ...

Usage Examples:
from minos.networks import InMemoryRequest
from uuid import UUID
# Create request with content
request = InMemoryRequest(
content={"name": "John", "email": "john@example.com"},
params={"user_id": "123"},
user=UUID("12345678-1234-5678-9012-123456789012")
)
# Access request data
if request.has_content:
content = await request.content()
print(content["name"]) # "John"
if request.has_params:
params = await request.params()
print(params["user_id"]) # "123"
print(request.user) # UUID object

Request wrapper that applies transformations to content and parameters.
from typing import Callable, Union, Awaitable
ContentAction = Callable[[Any, ...], Union[Any, Awaitable[Any]]]
ParamsAction = Callable[[dict[str, Any], ...], Union[dict[str, Any], Awaitable[dict[str, Any]]]]
class WrappedRequest(Request):
def __init__(self, base: Request, content_action: Optional[ContentAction] = None, params_action: Optional[ParamsAction] = None): ...
user: Optional[UUID]
has_content: bool
has_params: bool
async def _content(self, **kwargs) -> Any: ...
async def _params(self, **kwargs) -> dict[str, Any]: ...
def __eq__(self, other: Any) -> bool: ...
def __repr__(self) -> str: ...

Usage Examples:
from minos.networks import WrappedRequest, InMemoryRequest
# Base request
base_request = InMemoryRequest(content={"name": "john"})
# Transform content to uppercase
def uppercase_name(content):
    """Return ``content`` with its ``"name"`` entry upper-cased.

    Values that are not dictionaries, and dictionaries without a ``"name"``
    key, are returned unchanged.

    The input is never modified: a shallow copy carrying the upper-cased
    name is returned instead, so the object held by the wrapped base request
    keeps its original value.
    """
    if not isinstance(content, dict) or "name" not in content:
        return content
    # Build a new dict rather than assigning into ``content``; the caller's
    # dictionary may be shared with the base request.
    return {**content, "name": content["name"].upper()}
# Create wrapped request with transformation
wrapped_request = WrappedRequest(
base=base_request,
content_action=uppercase_name
)
# Content is transformed when accessed
content = await wrapped_request.content()
print(content["name"]) # "JOHN"
# Async transformation example
async def async_transform(content):
    """Return a copy of ``content`` flagged with ``"processed": True``.

    The coroutine awaits a short sleep to stand in for real asynchronous
    work. The input mapping is left untouched; a new dictionary is returned.
    """
    # Imported locally: the surrounding example never imports ``asyncio``,
    # so a module-level reference would raise ``NameError``.
    import asyncio

    # Simulate async processing
    await asyncio.sleep(0.1)
    return {**content, "processed": True}
wrapped_async = WrappedRequest(
base=base_request,
content_action=async_transform
)

Context variables for passing request metadata across async boundaries.
from contextvars import ContextVar
from uuid import UUID
REQUEST_USER_CONTEXT_VAR: ContextVar[Optional[UUID]]

Usage Examples:
from minos.networks import REQUEST_USER_CONTEXT_VAR
from uuid import UUID
# Set user context
user_id = UUID("12345678-1234-5678-9012-123456789012")
REQUEST_USER_CONTEXT_VAR.set(user_id)
# Access user context in handlers
def get_current_user() -> Optional[UUID]:
    """Return the value held by ``REQUEST_USER_CONTEXT_VAR``, or ``None`` when it is unset."""
    current_user = REQUEST_USER_CONTEXT_VAR.get(None)
    return current_user
@enroute.broker.command("user.update")
async def update_user(request: Request) -> Response:
    """Handle the ``user.update`` command and report the requesting user, if any."""
    requester = get_current_user()
    if requester:
        print(f"User {requester} is making the request")
    # Process update
    return Response({"updated": True})


class DatabaseRequest(Request):
    """Request whose content and params are fetched through a database connection.

    Both values are looked up by ``query_id`` using the connection's
    ``fetch_one`` coroutine each time they are requested.
    """

    # Parameterised statements used by ``_content`` and ``_params``.
    _CONTENT_QUERY = "SELECT content FROM requests WHERE id = ?"
    _PARAMS_QUERY = "SELECT params FROM requests WHERE id = ?"

    def __init__(self, query_id: str, db_connection):
        self._user = None
        self.db_connection = db_connection
        self.query_id = query_id

    @property
    def user(self) -> Optional[UUID]:
        """User bound to this request (initialised to ``None``)."""
        return self._user

    @property
    def has_content(self) -> bool:
        """Always ``True``."""
        return True

    @property
    def has_params(self) -> bool:
        """Always ``True``."""
        return True

    async def _content(self, **kwargs) -> Any:
        """Return the ``content`` field of the fetched row, or ``None`` if no row came back."""
        row = await self.db_connection.fetch_one(
            self._CONTENT_QUERY,
            (self.query_id,),
        )
        if not row:
            return None
        return row["content"]

    async def _params(self, **kwargs) -> dict[str, Any]:
        """Return the ``params`` field of the fetched row, or ``{}`` if no row came back."""
        row = await self.db_connection.fetch_one(
            self._PARAMS_QUERY,
            (self.query_id,),
        )
        if not row:
            return {}
        return row["params"]

    def __eq__(self, other):
        # Equal only to another DatabaseRequest carrying the same query id.
        if not isinstance(other, DatabaseRequest):
            return False
        return other.query_id == self.query_id

    def __repr__(self):
        return f"DatabaseRequest(query_id={self.query_id})"


class LoggingRequest(WrappedRequest):
    """Wrapped request that prints a line whenever content or params are accessed."""

    def __init__(self, base: Request):
        async def log_content(content):
            # Only the type is printed, not the payload itself.
            print(f"Accessing content: {type(content)}")
            return content

        async def log_params(params):
            # Only the parameter names are printed.
            print(f"Accessing params: {list(params.keys())}")
            return params

        super().__init__(
            base=base,
            content_action=log_content,
            params_action=log_params,
        )
class ValidatedRequest(WrappedRequest):
    """Wrapped request that checks its content against ``schema`` on access.

    Content for which ``_validate`` returns a falsy value is rejected with
    ``ValueError("Content validation failed")``.
    """

    def __init__(self, base: Request, schema: dict):
        self.schema = schema

        async def validate_content(content):
            # Validate content against schema
            if self._validate(content, self.schema):
                return content
            raise ValueError("Content validation failed")

        super().__init__(base=base, content_action=validate_content)

    def _validate(self, data, schema):
        """Placeholder hook: accepts every input until real validation is implemented."""
        # Implement validation logic
        return True
# Usage in handlers
@enroute.broker.command("user.create")
async def create_user(request: Request) -> Response:
    """Handle ``user.create``; answer with status 400 when content validation fails."""
    # Wrap request with validation
    validated_request = ValidatedRequest(
        request,
        {"type": "object", "required": ["email", "name"]},
    )
    try:
        user_data = await validated_request.content()
        # Process creation
        return Response({"created": True})
    except ValueError as e:
        return Response({"error": str(e)}, status=400)


class ResponseBuilder:
    """Fluent builder that collects data, status and headers for a ``Response``."""

    def __init__(self):
        self._headers = {}
        self._status = 200
        self._data = None

    def with_data(self, data: Any):
        """Store the response payload and return the builder for chaining."""
        self._data = data
        return self

    def with_status(self, status: int):
        """Store the status code and return the builder for chaining."""
        self._status = status
        return self

    def with_header(self, key: str, value: str):
        """Record a header and return the builder for chaining."""
        self._headers[key] = value
        return self

    def build(self) -> Response:
        """Create the ``Response`` from the stored data and status.

        Recorded headers are not passed on here.
        """
        # Custom response with headers would need HttpResponse
        return Response(self._data, status=self._status)
# Factory methods for common responses
class ResponseFactory:
    """Static shortcuts for the most common ``Response`` shapes."""

    @staticmethod
    def _error(message: str, status: int) -> Response:
        # Shared shape of every error payload: {"error": <message>}.
        return Response({"error": message}, status=status)

    @staticmethod
    def success(data: Any = None) -> Response:
        """Build a response with status 200 carrying ``data``."""
        return Response(data, status=200)

    @staticmethod
    def created(data: Any = None) -> Response:
        """Build a response with status 201 carrying ``data``."""
        return Response(data, status=201)

    @staticmethod
    def not_found(message: str = "Not found") -> Response:
        """Build a response with status 404 and an ``{"error": message}`` payload."""
        return ResponseFactory._error(message, 404)

    @staticmethod
    def bad_request(message: str = "Bad request") -> Response:
        """Build a response with status 400 and an ``{"error": message}`` payload."""
        return ResponseFactory._error(message, 400)
# Usage
@enroute.rest.command("/users", method="POST")
async def create_user(request: Request) -> Response:
    """Handle ``POST /users``; a ``ValueError`` is answered with a bad-request response."""
    try:
        new_user = create_user_logic(await request.content())
        return ResponseFactory.created(new_user)
    except ValueError:
        return ResponseFactory.bad_request("Invalid user data")


class RequestPipeline:
    """Ordered chain of async middlewares applied to a request one after another."""

    def __init__(self):
        self.middlewares = []

    def add_middleware(self, middleware_func):
        """Append ``middleware_func`` to the chain and return the pipeline for chaining."""
        self.middlewares.append(middleware_func)
        return self

    async def process(self, request: Request) -> Request:
        """Await each middleware in insertion order, feeding it the previous result."""
        result = request
        for step in self.middlewares:
            result = await step(result)
        return result
# Middleware functions
async def auth_middleware(request: Request) -> Request:
    """Store the authenticated user in ``REQUEST_USER_CONTEXT_VAR`` when possible.

    The request is returned unchanged in every case. Nothing is stored when
    the request has no ``headers`` attribute or its ``Authorization`` header
    is missing or empty.
    """
    # Add user authentication
    if not hasattr(request, 'headers'):
        return request

    auth_header = request.headers.get("Authorization")
    if not auth_header:
        return request

    REQUEST_USER_CONTEXT_VAR.set(authenticate_user(auth_header))
    return request
async def logging_middleware(request: Request) -> Request:
    """Print the request's type and return the request wrapped in a ``LoggingRequest``."""
    print(f"Processing request: {type(request)}")
    wrapped = LoggingRequest(request)
    return wrapped
# Usage in handlers
pipeline = RequestPipeline()
pipeline.add_middleware(auth_middleware)
pipeline.add_middleware(logging_middleware)
@enroute.broker.command("user.update")
async def update_user(request: Request) -> Response:
# Process request through pipeline
processed_request = await pipeline.process(request)
# Handle the processed request
user_data = await processed_request.content()
return Response({"updated": True})

Install with Tessl CLI
npx tessl i tessl/pypi-minos-microservice-networks