CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-minos-microservice-networks

The networks core of the Minos Framework providing networking components for reactive microservices

Pending
Overview
Eval results
Files

docs/requests.md

Request and Response Handling

Core request/response abstractions used across all transport types. Provides unified interfaces for handling different types of requests with content, parameters, and user context.

Capabilities

Core Request Interface

Abstract base class providing unified request handling across all transport types.

class Request:
    """Abstract base class giving every transport type a common request interface."""
    user: Optional[UUID]  # user associated with the request; None when there is none
    has_content: bool  # whether the request carries content
    has_params: bool  # whether the request carries params
    async def content(self, **kwargs) -> Any: ...  # public accessor; presumably delegates to _content() — confirm against the library
    async def params(self, **kwargs) -> dict[str, Any]: ...  # public accessor; presumably delegates to _params() — confirm against the library
    async def _content(self, **kwargs) -> Any: ...  # Abstract: provided by each concrete request
    async def _params(self, **kwargs) -> dict[str, Any]: ...  # Abstract: provided by each concrete request
    def __eq__(self, other: Any) -> bool: ...  # Abstract
    def __repr__(self) -> str: ...  # Abstract

Core Response Interface

Concrete response class for returning data with status codes, plus the ResponseException raised to signal an error response (its status defaults to 400).

class Response:
    """Concrete response that pairs returned data with a status code."""
    def __init__(self, data: Any = None, *, status: int = 200): ...  # status is keyword-only and defaults to 200
    has_content: bool  # whether the response carries content
    status: int  # status code given at construction
    async def content(self, **kwargs) -> Any: ...  # awaitable accessor for the response data
    def __eq__(self, other: Any) -> bool: ...
    def __repr__(self) -> str: ...
    def __hash__(self) -> int: ...  # defined alongside __eq__, so responses are hashable

class ResponseException(Exception):
    """Exception raised by a handler to signal an error response.

    Declared as an ``Exception`` subclass because it is used with ``raise``;
    a class that does not derive from ``BaseException`` cannot be raised.
    NOTE(review): the library may use its own exception base class — confirm.

    Args:
        *args: Positional arguments describing the error (e.g. a message).
        status: Keyword-only status code for the error response; defaults to 400.
    """
    def __init__(self, *args, status: int = 400): ...
    status: int  # status code carried by the exception

Usage Examples:

from minos.networks import Response, ResponseException

# Create a successful response; the status defaults to 200
response = Response({"user_id": "123", "name": "John"})
print(response.status)  # 200

# Create a response with a custom status code (status is keyword-only)
created_response = Response({"id": "123"}, status=201)

# Read the content only when the response has any.
# NOTE: `await` is only valid inside a coroutine; run this from an async handler.
if response.has_content:
    data = await response.content()

# Signal a failure by raising a response exception.
# `valid_data` is a placeholder for the caller's own validation result.
if not valid_data:
    raise ResponseException("Invalid data", status=400)

In-Memory Request Implementation

Concrete request implementation for testing and in-memory usage.

class InMemoryRequest(Request):
    """Concrete request for testing and in-memory usage, built from values passed to the constructor."""
    def __init__(self, content: Any = None, params: Optional[dict[str, Any]] = None, user: Optional[UUID] = None): ...  # every argument is optional
    user: Optional[UUID]  # user given at construction, or None
    has_content: bool  # whether the request carries content
    has_params: bool  # whether the request carries params
    async def _content(self, **kwargs) -> Any: ...  # concrete implementation of the abstract hook
    async def _params(self, **kwargs) -> dict[str, Any]: ...  # concrete implementation of the abstract hook
    def __eq__(self, other: Any) -> bool: ...
    def __repr__(self) -> str: ...

Usage Examples:

from minos.networks import InMemoryRequest
from uuid import UUID

# Create a request with content, params and a user
request = InMemoryRequest(
    content={"name": "John", "email": "john@example.com"},
    params={"user_id": "123"},
    user=UUID("12345678-1234-5678-9012-123456789012")
)

# Access request data through the awaitable accessors.
# NOTE: `await` is only valid inside a coroutine; run this from an async handler.
if request.has_content:
    content = await request.content()
    print(content["name"])  # "John"

if request.has_params:
    params = await request.params()
    print(params["user_id"])  # "123"

# The user is a plain attribute, not a coroutine
print(request.user)  # UUID object

Wrapped Request Implementation

Request wrapper that applies transformations to content and parameters.

from typing import Callable, Union, Awaitable

# Signatures of the transformation callbacks accepted by WrappedRequest.
# Each callback receives the value to transform (the content, or the params
# dict) as its first positional argument and may return either the transformed
# value or an awaitable resolving to it.
# ``Callable[..., R]`` is the supported spelling for "any arguments"; an
# ellipsis inside the parameter list (``Callable[[X, ...], R]``) is not a valid
# Callable form and raises TypeError on older interpreters.
ContentAction = Callable[..., Union[Any, Awaitable[Any]]]
ParamsAction = Callable[..., Union[dict[str, Any], Awaitable[dict[str, Any]]]]

class WrappedRequest(Request):
    """Request wrapper that applies optional transformations to a base request's content and params."""
    def __init__(self, base: Request, content_action: Optional[ContentAction] = None, params_action: Optional[ParamsAction] = None): ...  # both actions are optional; each may be sync or async
    user: Optional[UUID]  # presumably taken from the base request — confirm against the library
    has_content: bool  # whether the request carries content
    has_params: bool  # whether the request carries params
    async def _content(self, **kwargs) -> Any: ...  # content after content_action has been applied
    async def _params(self, **kwargs) -> dict[str, Any]: ...  # params after params_action has been applied
    def __eq__(self, other: Any) -> bool: ...
    def __repr__(self) -> str: ...

Usage Examples:

from minos.networks import WrappedRequest, InMemoryRequest

# Base request whose content the wrappers below transform
base_request = InMemoryRequest(content={"name": "john"})

# Transform content to uppercase
def uppercase_name(content):
    """Return ``content`` with its ``"name"`` value upper-cased.

    Anything that is not a dict containing a ``"name"`` key is returned
    unchanged. A new dict is built instead of mutating the argument, so the
    content held by the underlying base request is not altered as a side
    effect of reading it through a wrapper.

    Args:
        content: The request content to transform.

    Returns:
        A copy of ``content`` with ``"name"`` upper-cased, or ``content``
        itself when no transformation applies.
    """
    if isinstance(content, dict) and "name" in content:
        # Unpacking keeps the original key order; only the "name" value changes.
        return {**content, "name": content["name"].upper()}
    return content

# Create a wrapped request that applies the transformation to the base content
wrapped_request = WrappedRequest(
    base=base_request,
    content_action=uppercase_name
)

# The transformation is applied when the content is accessed.
# NOTE: `await` is only valid inside a coroutine; run this from an async handler.
content = await wrapped_request.content()
print(content["name"])  # "JOHN"

# Async transformation example
async def async_transform(content):
    """Asynchronously return ``content`` flagged with ``"processed": True``.

    A new dict is returned instead of mutating the argument, so the content
    held by the underlying base request is left untouched.

    Args:
        content: Mapping holding the request content.

    Returns:
        A copy of ``content`` with the ``"processed"`` key set to ``True``.
    """
    # Imported locally so the example runs without a module-level import.
    import asyncio

    # Simulate async processing
    await asyncio.sleep(0.1)
    return {**content, "processed": True}

# An async callable is passed exactly like a synchronous one
wrapped_async = WrappedRequest(
    base=base_request,
    content_action=async_transform
)

Context Variables

Context variables for passing request metadata across async boundaries.

from contextvars import ContextVar
from uuid import UUID

# Context variable holding the user (a UUID, or None) associated with the request being processed
REQUEST_USER_CONTEXT_VAR: ContextVar[Optional[UUID]]

Usage Examples:

from minos.networks import REQUEST_USER_CONTEXT_VAR
from uuid import UUID

# Store the user in the context variable so code running in the same context can read it
user_id = UUID("12345678-1234-5678-9012-123456789012")
REQUEST_USER_CONTEXT_VAR.set(user_id)

# Access user context in handlers
def get_current_user() -> Optional[UUID]:
    """Return the user held by ``REQUEST_USER_CONTEXT_VAR``, or ``None`` when no value is set."""
    # Passing a default keeps ``get`` from raising LookupError when the variable is unset.
    current = REQUEST_USER_CONTEXT_VAR.get(None)
    return current

@enroute.broker.command("user.update")
async def update_user(request: Request) -> Response:
    """Handle the ``user.update`` broker command.

    Prints the requesting user when one is available from
    ``get_current_user`` and replies with ``{"updated": True}``.
    """
    requester = get_current_user()
    if requester:
        print(f"User {requester} is making the request")

    # Process update
    payload = {"updated": True}
    return Response(payload)

Advanced Usage

Custom Request Implementation

class DatabaseRequest(Request):
    """Request whose content and params are fetched from a database on access.

    Two requests compare equal when they share the same ``query_id``; the hash
    is derived from the same field so equal requests hash alike and instances
    stay usable in sets and as dict keys.

    Args:
        query_id: Identifier of the row in the ``requests`` table.
        db_connection: Connection object exposing an awaitable
            ``fetch_one(query, args)`` method.
    """

    def __init__(self, query_id: str, db_connection):
        self.query_id = query_id
        self.db_connection = db_connection
        self._user = None

    @property
    def user(self) -> Optional[UUID]:
        """User associated with the request; always ``None`` unless ``_user`` is assigned."""
        return self._user

    @property
    def has_content(self) -> bool:
        """Always ``True``: content is looked up in the database on demand."""
        return True

    @property
    def has_params(self) -> bool:
        """Always ``True``: params are looked up in the database on demand."""
        return True

    async def _content(self, **kwargs) -> Any:
        """Return the stored content, or ``None`` when no row matches ``query_id``."""
        # Fetch content from database (parameterized query, no string building)
        result = await self.db_connection.fetch_one(
            "SELECT content FROM requests WHERE id = ?",
            (self.query_id,)
        )
        return result["content"] if result else None

    async def _params(self, **kwargs) -> dict[str, Any]:
        """Return the stored params, or an empty dict when no row matches ``query_id``."""
        # Fetch params from database (parameterized query, no string building)
        result = await self.db_connection.fetch_one(
            "SELECT params FROM requests WHERE id = ?",
            (self.query_id,)
        )
        return result["params"] if result else {}

    def __eq__(self, other):
        return isinstance(other, DatabaseRequest) and other.query_id == self.query_id

    def __hash__(self) -> int:
        # Defining __eq__ alone sets __hash__ to None; hash on the same field
        # that equality compares so equal requests hash alike.
        return hash(self.query_id)

    def __repr__(self):
        return f"DatabaseRequest(query_id={self.query_id})"

Request/Response Middleware Patterns

class LoggingRequest(WrappedRequest):
    """Wrapped request that prints a line whenever its content or params are accessed."""

    def __init__(self, base: Request):
        async def _trace_content(payload):
            # Report only the type of the content, then pass it through untouched.
            print(f"Accessing content: {type(payload)}")
            return payload

        async def _trace_params(mapping):
            # Report only the parameter names, then pass the params through untouched.
            print(f"Accessing params: {list(mapping.keys())}")
            return mapping

        super().__init__(base=base, content_action=_trace_content, params_action=_trace_params)

class ValidatedRequest(WrappedRequest):
    """Wrapped request that validates its content against a schema when the content is accessed."""

    def __init__(self, base: Request, schema: dict):
        self.schema = schema

        async def _checked(payload):
            # Validate content against schema; hand it back only when it passes.
            if self._validate(payload, self.schema):
                return payload
            raise ValueError("Content validation failed")

        super().__init__(base=base, content_action=_checked)

    def _validate(self, data, schema):
        """Validation hook; this placeholder accepts every input."""
        # Implement validation logic
        return True

# Usage in handlers
@enroute.broker.command("user.create")
async def create_user(request: Request) -> Response:
    """Handle the ``user.create`` broker command with content validation.

    Replies with ``{"created": True}`` on success, or with the validation
    error message and status 400 when a ``ValueError`` is raised.
    """
    # Wrap request with validation
    guarded = ValidatedRequest(
        request,
        {"type": "object", "required": ["email", "name"]},
    )

    try:
        user_data = await guarded.content()
        # Process creation
        return Response({"created": True})
    except ValueError as error:
        failure = {"error": str(error)}
        return Response(failure, status=400)

Response Builders and Factories

class ResponseBuilder:
    """Fluent builder that collects data, status and headers before creating a ``Response``."""

    def __init__(self):
        # Defaults: no data, status 200, no headers.
        self._data, self._status, self._headers = None, 200, {}

    def with_data(self, data: Any):
        """Set the response data and return the builder for chaining."""
        self._data = data
        return self

    def with_status(self, status: int):
        """Set the status code and return the builder for chaining."""
        self._status = status
        return self

    def with_header(self, key: str, value: str):
        """Record a header and return the builder for chaining."""
        self._headers.update({key: value})
        return self

    def build(self) -> Response:
        """Create the ``Response`` from the collected data and status.

        The recorded headers are not applied here.
        """
        # Custom response with headers would need HttpResponse
        return Response(self._data, status=self._status)

# Factory methods for common responses
class ResponseFactory:
    """Static factory methods for commonly used responses."""

    @staticmethod
    def success(data: Any = None) -> Response:
        """Return a response with status 200 carrying ``data``."""
        return Response(data, status=200)

    @staticmethod
    def created(data: Any = None) -> Response:
        """Return a response with status 201 carrying ``data``."""
        return Response(data, status=201)

    @staticmethod
    def not_found(message: str = "Not found") -> Response:
        """Return a response with status 404 whose body holds ``message`` under ``"error"``."""
        body = {"error": message}
        return Response(body, status=404)

    @staticmethod
    def bad_request(message: str = "Bad request") -> Response:
        """Return a response with status 400 whose body holds ``message`` under ``"error"``."""
        body = {"error": message}
        return Response(body, status=400)

# Usage
@enroute.rest.command("/users", method="POST")
async def create_user(request: Request) -> Response:
    """Handle ``POST /users``.

    Replies with the created user via ``ResponseFactory.created``, or with a
    bad-request response when a ``ValueError`` is raised.
    """
    try:
        payload = await request.content()
        created = create_user_logic(payload)
        return ResponseFactory.created(created)
    except ValueError:
        return ResponseFactory.bad_request("Invalid user data")

Request Processing Pipelines

class RequestPipeline:
    """Ordered chain of async middlewares, each mapping a request to a request."""

    def __init__(self):
        self.middlewares = []

    def add_middleware(self, middleware_func):
        """Append ``middleware_func`` to the chain and return the pipeline for chaining."""
        self.middlewares.append(middleware_func)
        return self

    async def process(self, request: Request) -> Request:
        """Pass ``request`` through every middleware in insertion order and return the result."""
        # Each middleware receives the output of the previous one.
        for step in self.middlewares:
            request = await step(request)
        return request

# Middleware functions
async def auth_middleware(request: Request) -> Request:
    """Store the authenticated user in ``REQUEST_USER_CONTEXT_VAR`` when possible.

    The request is returned unchanged in every case; requests without a
    ``headers`` attribute or without an ``Authorization`` header are skipped.
    """
    # Add user authentication
    if not hasattr(request, 'headers'):
        return request

    credentials = request.headers.get("Authorization")
    if credentials:
        identity = authenticate_user(credentials)
        REQUEST_USER_CONTEXT_VAR.set(identity)
    return request

async def logging_middleware(request: Request) -> Request:
    """Print the type of the incoming request and return it wrapped in ``LoggingRequest``."""
    request_type = type(request)
    print(f"Processing request: {request_type}")
    wrapped = LoggingRequest(request)
    return wrapped

# Usage in handlers: build the pipeline once; middlewares run in the order they are added
pipeline = RequestPipeline()
pipeline.add_middleware(auth_middleware)
pipeline.add_middleware(logging_middleware)

@enroute.broker.command("user.update")
async def update_user(request: Request) -> Response:
    """Handle the ``user.update`` broker command after running the request through ``pipeline``."""
    # Process request through pipeline
    prepared = await pipeline.process(request)

    # Handle the processed request
    user_data = await prepared.content()
    outcome = {"updated": True}
    return Response(outcome)

Install with Tessl CLI

npx tessl i tessl/pypi-minos-microservice-networks

docs

brokers.md

decorators.md

discovery.md

http.md

index.md

requests.md

routers.md

scheduling.md

specs.md

system-utils.md

tile.json