CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-ray

Ray is a unified framework for scaling AI and Python applications.

Pending
Overview
Eval results
Files

model-serving.mddocs/

Model Serving

Ray Serve provides scalable model serving and application deployment with automatic scaling, batching, and multi-model support. It enables production deployment of ML models and Python applications.

Capabilities

Core Serving Framework

Basic serving functionality and deployment management.

def start(detached=False, http_options=None, **kwargs):
    """
    Start Ray Serve.

    Args:
        detached (bool): Whether to run in detached mode (the Serve
            instance outlives the driver script).
        http_options (HTTPOptions, optional): HTTP server configuration.
            ``None`` (the default) means a default ``HTTPOptions()`` is
            used. A literal ``HTTPOptions()`` default would be evaluated
            once at function-definition time and would raise NameError
            here, since ``HTTPOptions`` is defined later in this file.
        **kwargs: Additional Ray initialization arguments.
    """
    if http_options is None:
        # Resolve lazily: avoids the def-time forward reference and
        # avoids sharing one mutable options object across calls.
        http_options = HTTPOptions()

def shutdown():
    """Stop the running Ray Serve instance and tear down its deployments."""

def run(target, *, name=None, route_prefix=None, blocking=True, **kwargs):
    """
    Deploy *target* and begin serving it.

    Args:
        target: What to deploy — a function, class, or Deployment.
        name (str, optional): Name to register the deployment under.
        route_prefix (str, optional): URL prefix the deployment answers on.
        blocking (bool): If true, wait until the deployment is ready.
        **kwargs: Extra deployment options passed through.

    Returns:
        DeploymentHandle: Handle for invoking the deployment.
    """

def status():
    """
    Report the status of all Ray Serve deployments.

    Returns:
        str: Human-readable status information.
    """

class HTTPOptions:
    """Configuration for Ray Serve's HTTP proxy servers."""

    def __init__(self, *, host="127.0.0.1", port=8000, middlewares=None,
                 location="EveryNode", num_cpus=0):
        """
        Build an HTTP options bundle.

        Args:
            host (str): Interface address the server binds to.
            port (int): TCP port the server listens on.
            middlewares (list, optional): ASGI middlewares to install.
            location (str): Placement policy for HTTP servers
                (e.g. ``"EveryNode"``).
            num_cpus (int): CPU reservation for the HTTP servers.
        """

Deployment Decorator and Configuration

Create and configure deployments.

def deployment(func_or_class=None, *, name=None, version=None,
               num_replicas=None, route_prefix=None, ray_actor_options=None,
               user_config=None, max_concurrent_queries=None,
               autoscaling_config=None, graceful_shutdown_wait_loop_s=None,
               graceful_shutdown_timeout_s=None, health_check_period_s=None,
               health_check_timeout_s=None, is_driver_deployment=None):
    """
    Turn a function or class into a Ray Serve deployment.

    Usable bare (``@deployment``) or parameterized (``@deployment(...)``).

    Args:
        func_or_class: The callable or class being deployed.
        name (str, optional): Registered name of the deployment.
        version (str, optional): Version label for updates.
        num_replicas (int, optional): How many replicas to run.
        route_prefix (str, optional): URL prefix for HTTP traffic.
        ray_actor_options (dict, optional): Options for the backing actors.
        user_config: Arbitrary user configuration handed to replicas.
        max_concurrent_queries (int, optional): Per-replica concurrency cap.
        autoscaling_config (AutoscalingConfig, optional): Autoscaling settings.
        graceful_shutdown_wait_loop_s (float, optional): Pause between
            graceful-shutdown checks.
        graceful_shutdown_timeout_s (float, optional): Hard shutdown deadline.
        health_check_period_s (float, optional): Interval between health checks.
        health_check_timeout_s (float, optional): Health-check deadline.
        is_driver_deployment (bool, optional): Marks the driver deployment.

    Returns:
        Deployment: The configured deployment object.
    """

class Deployment:
    """A unit of deployable code managed by Ray Serve."""

    def deploy(self, *init_args, _blocking=True, **init_kwargs):
        """
        Launch this deployment.

        Args:
            *init_args: Positional arguments for replica construction.
            _blocking (bool): If true, wait until replicas are ready.
            **init_kwargs: Keyword arguments for replica construction.

        Returns:
            DeploymentHandle: Handle for invoking the deployment.
        """

    def delete(self):
        """Tear this deployment down."""

    def get_handle(self, sync=None):
        """
        Obtain a handle for calling this deployment.

        Args:
            sync (bool, optional): Request a synchronous handle.

        Returns:
            DeploymentHandle: Handle for invoking the deployment.
        """

    def options(self, *, func_or_class=None, **kwargs):
        """
        Derive a copy of this deployment with updated options.

        Args:
            func_or_class: Replacement function or class, if any.
            **kwargs: Option overrides.

        Returns:
            Deployment: A new deployment carrying the overrides.
        """

def multiplexed(max_num_models_per_replica=None, *, buffer_size_bytes=100_000_000,
                buffer_size_bytes_per_replica=None, max_num_models=None):
    """
    Decorator that lets one deployment serve several models.

    Args:
        max_num_models_per_replica (int, optional): Cap on models held by
            a single replica.
        buffer_size_bytes (int): Total model buffer size, in bytes.
        buffer_size_bytes_per_replica (int, optional): Per-replica buffer size.
        max_num_models (int, optional): Cap on total models served.

    Returns:
        Decorator function producing a multiplexed deployment.
    """

def get_multiplexed_model_id():
    """
    Return the ID of the multiplexed model handling the current request.

    Only meaningful when called from inside a deployment.

    Returns:
        str: The active model ID.
    """

class AutoscalingConfig:
    """Settings controlling how a deployment scales its replica count."""

    def __init__(self, *, min_replicas=None, max_replicas=None,
                 target_num_ongoing_requests_per_replica=None,
                 metrics_interval_s=None, look_back_period_s=None,
                 smoothing_factor=None, downscale_delay_s=None,
                 upscale_delay_s=None):
        """
        Build an autoscaling configuration.

        Args:
            min_replicas (int, optional): Floor on replica count.
            max_replicas (int, optional): Ceiling on replica count.
            target_num_ongoing_requests_per_replica (float, optional):
                Desired in-flight requests per replica.
            metrics_interval_s (float, optional): How often metrics are sampled.
            look_back_period_s (float, optional): Window of metrics considered.
            smoothing_factor (float, optional): Damping applied to metrics.
            downscale_delay_s (float, optional): Wait before scaling down.
            upscale_delay_s (float, optional): Wait before scaling up.
        """

Deployment Handles

Interact with deployed models and services.

class DeploymentHandle:
    """Client-side handle used to call into a deployment."""

    def remote(self, *args, **kwargs):
        """
        Send an asynchronous request to the deployment.

        Args:
            *args: Positional arguments forwarded to the deployment.
            **kwargs: Keyword arguments forwarded to the deployment.

        Returns:
            DeploymentResponse: Object representing the pending result.
        """

    def options(self, *, method_name=None, multiplexed_model_id=None, **kwargs):
        """
        Derive a handle with different call options.

        Args:
            method_name (str, optional): Which deployment method to invoke.
            multiplexed_model_id (str, optional): Target model when multiplexing.
            **kwargs: Further handle options.

        Returns:
            DeploymentHandle: A handle carrying the new options.
        """

class DeploymentResponse:
    """Represents the eventual result of a deployment call."""

    def result(self, *, timeout_s=None):
        """
        Block until the result is available and return it.

        Args:
            timeout_s (float, optional): Maximum seconds to wait.

        Returns:
            The value produced by the deployment call.
        """

class DeploymentResponseGenerator:
    """Iterator over a deployment's streaming responses."""

    def __iter__(self):
        """Return an iterator over the response stream."""

    def __next__(self):
        """Yield the next streamed response."""

Application Framework

Build complex serving applications.

class Application:
    """A Ray Serve application composed of one or more deployments."""

    def __init__(self, import_path, *, args=None, kwargs=None):
        """
        Build an application description.

        Args:
            import_path (str): Dotted import path locating the application.
            args (list, optional): Positional arguments for the application.
            kwargs (dict, optional): Keyword arguments for the application.
        """

def build(app_or_deployment, *args, **kwargs):
    """
    Assemble a runnable application from a deployment or callable.

    Args:
        app_or_deployment: The application or deployment to assemble.
        *args: Positional build arguments.
        **kwargs: Keyword build arguments.

    Returns:
        Application: The assembled application.
    """

Batching Support

Batch requests for improved throughput.

class Batched:
    """Decorator object that groups individual requests into batches."""

    def __init__(self, *, max_batch_size=None, batch_wait_timeout_s=None):
        """
        Configure batching behavior.

        Args:
            max_batch_size (int, optional): Upper bound on batch size.
            batch_wait_timeout_s (float, optional): Max seconds to wait
                while assembling a batch.
        """

def batch(max_batch_size=None, batch_wait_timeout_s=None):
    """
    Create a decorator that batches incoming requests.

    Args:
        max_batch_size (int, optional): Upper bound on batch size.
        batch_wait_timeout_s (float, optional): Max seconds to wait while
            assembling a batch.

    Returns:
        Batched: The configured batching decorator.
    """

Ingress and Routing

Handle HTTP requests and routing.

class Ingress:
    """Base class for deployments that terminate HTTP traffic directly."""

    async def __call__(self, request):
        """
        Process one HTTP request.

        Args:
            request: The incoming HTTP request.

        Returns:
            The HTTP response to send back.
        """

def ingress(app):
    """
    Designate a deployment as the HTTP entry point.

    Args:
        app: The deployment to expose as ingress.

    Returns:
        The deployment, configured as ingress.
    """

Model Multiplexing

Serve multiple models from a single deployment.

class MultiplexedReplicaResult:
    """Wraps the outcome of a call to a multiplexed model."""

    def __init__(self, result):
        """Wrap *result* from a multiplexed model call."""

def get_multiplexed_model_id():
    """
    Report which multiplexed model is active for the current request.

    Returns:
        str: The active model ID.
    """

Configuration and Context

Runtime configuration and context access.

def get_replica_context():
    """
    Fetch the context of the replica handling the current request.

    Returns:
        ReplicaContext: Context describing the current replica.
    """

class ReplicaContext:
    """Read-only view of the replica currently executing."""

    @property
    def deployment(self):
        """Name of the deployment this replica belongs to."""

    @property
    def replica_tag(self):
        """Unique tag identifying this replica."""

    @property
    def servable_object(self):
        """The user object this replica is serving."""

Usage Examples

Basic Model Serving

# Example: minimal Ray Serve deployment using the legacy 1.x-style API
# (serve.start() followed by Deployment.deploy()).
# NOTE(review): newer Ray Serve versions favor serve.run(...) — confirm
# which Ray version these docs target.
import ray
from ray import serve
import numpy as np

# Start Ray Serve
serve.start()

# Define a simple model
@serve.deployment
class SimpleModel:
    def __init__(self):
        # Load your model here
        self.model = self._load_model()
    
    def _load_model(self):
        # Placeholder for model loading; stands in for real model weights.
        return lambda x: x * 2
    
    def __call__(self, request):
        # NOTE(review): request.json() is called synchronously here; with a
        # Starlette Request this is a coroutine — confirm the request type
        # Ray hands to synchronous handlers in the targeted version.
        data = request.json()
        input_data = np.array(data["input"])
        prediction = self.model(input_data)
        # .tolist() makes the numpy result JSON-serializable.
        return {"prediction": prediction.tolist()}

# Deploy the model
SimpleModel.deploy()

# Make a request; deployments are served under their class name by default.
import requests
response = requests.post("http://127.0.0.1:8000/SimpleModel", 
                        json={"input": [1, 2, 3, 4]})
print(response.json())  # {"prediction": [2, 4, 6, 8]}

serve.shutdown()

Advanced Model with Batching

# Example: batched PyTorch inference with fractional GPU allocation.
import ray
from ray import serve
import torch

serve.start()

@serve.deployment(
    num_replicas=2,
    # 0.5 GPUs per replica lets the two replicas share one physical GPU.
    ray_actor_options={"num_cpus": 1, "num_gpus": 0.5}
)
class PyTorchModel:
    def __init__(self, model_path):
        self.model = torch.load(model_path)
        # eval() disables dropout/batch-norm training behavior.
        self.model.eval()
    
    # Individual calls are queued and delivered to this method as a list:
    # up to 32 inputs, or whatever arrived within 0.1 s.
    @serve.batch(max_batch_size=32, batch_wait_timeout_s=0.1)
    async def predict_batch(self, inputs):
        batch = torch.stack(inputs)
        # no_grad avoids building the autograd graph during inference.
        with torch.no_grad():
            predictions = self.model(batch)
        return predictions.numpy()
    
    async def __call__(self, request):
        data = torch.tensor(request.json()["input"])
        # Awaiting the batched method submits this single tensor and
        # receives only this request's slice of the batch result.
        prediction = await self.predict_batch(data)
        return {"prediction": prediction.tolist()}

# Deploy with specific configuration; deploy("model.pt") forwards the
# path to __init__ on each replica.
PyTorchModel.options(
    autoscaling_config=serve.AutoscalingConfig(
        min_replicas=1,
        max_replicas=5,
        target_num_ongoing_requests_per_replica=2
    )
).deploy("model.pt")

Multi-Model Deployment

# Example: routing requests across multiple model deployments via handles.
import ray
from ray import serve

serve.start()

@serve.deployment
class ModelRouter:
    def __init__(self):
        # Handles are resolved at replica start-up, so this requires
        # ModelA and ModelB to be deployed before ModelRouter (see the
        # deploy order at the bottom of this example).
        self.model_a = ModelA.get_handle()
        self.model_b = ModelB.get_handle()
    
    async def __call__(self, request):
        data = request.json()
        # Route on the "model" field; anything other than "a" goes to B.
        model_type = data.get("model", "a")
        
        if model_type == "a":
            result = await self.model_a.remote(data)
        else:
            result = await self.model_b.remote(data)
        
        return result

@serve.deployment
class ModelA:
    async def __call__(self, data):
        return {"model": "a", "result": data["input"] * 2}

@serve.deployment  
class ModelB:
    async def __call__(self, data):
        return {"model": "b", "result": data["input"] + 10}

# Deploy all models — the router last, since its __init__ fetches
# handles to the other two.
ModelA.deploy()
ModelB.deploy()
ModelRouter.deploy()

Application with Custom Ingress

# Example: a deployment acting as a custom HTTP entry point.
import ray
from ray import serve
from starlette.requests import Request
from starlette.responses import JSONResponse

serve.start()

# CustomIngress handles raw HTTP itself via __call__, so no extra ingress
# wrapper is needed. (A previous revision decorated this class with
# @serve.ingress(app) before `app` was assigned, which raises NameError
# at class-definition time.)
@serve.deployment
class CustomIngress:
    """HTTP front door that forwards POSTs to the MLModel deployment."""

    def __init__(self):
        # Resolved at replica start-up, after MLModel is defined below.
        self.model = MLModel.get_handle()
    
    async def __call__(self, request: Request):
        # Lightweight health probe on GET.
        if request.method == "GET":
            return JSONResponse({"status": "healthy"})
        
        elif request.method == "POST":
            data = await request.json()
            # MLModel exposes `predict`, not `__call__`, so select the
            # method explicitly on the handle.
            result = await self.model.options(method_name="predict").remote(data)
            return JSONResponse(result)
        
        else:
            return JSONResponse({"error": "Method not allowed"}, 
                              status_code=405)

@serve.deployment
class MLModel:
    """Backend model deployment invoked by CustomIngress."""

    def __init__(self):
        # Initialize your model
        pass
    
    async def predict(self, data):
        # Model prediction logic
        return {"prediction": "result"}

# Build and run application
app = serve.build(CustomIngress)
serve.run(app)

Production Configuration

# Example: production-oriented serving configuration.
import ray
from ray import serve

# Production serving configuration: detached mode keeps Serve alive after
# this driver script exits, and 0.0.0.0 exposes it on all interfaces.
serve.start(
    detached=True,
    http_options=serve.HTTPOptions(
        host="0.0.0.0",
        port=8000,
        location="EveryNode"
    )
)

@serve.deployment(
    name="production-model",
    version="v1.0",
    num_replicas=4,
    autoscaling_config=serve.AutoscalingConfig(
        min_replicas=2,
        max_replicas=10,
        target_num_ongoing_requests_per_replica=5
    ),
    ray_actor_options={
        "num_cpus": 2,
        "num_gpus": 1,
        "memory": 4000 * 1024 * 1024  # 4GB, expressed in bytes
    },
    # Health checks and graceful shutdown keep rolling updates safe.
    health_check_period_s=10,
    health_check_timeout_s=30,
    graceful_shutdown_timeout_s=60
)
class ProductionModel:
    def __init__(self, model_config):
        self.model = self._load_model(model_config)
        self.preprocessor = self._load_preprocessor()
    
    def _load_model(self, config):
        # Load production model
        pass
    
    def _load_preprocessor(self):
        # Load data preprocessor
        pass
    
    # Larger batches with a short wait favor throughput over latency.
    @serve.batch(max_batch_size=64, batch_wait_timeout_s=0.05)
    async def predict_batch(self, inputs):
        # Batch prediction with preprocessing
        processed = [self.preprocessor(inp) for inp in inputs]
        predictions = self.model.predict(processed)
        return predictions
    
    async def __call__(self, request):
        # NOTE(review): request.json() is called synchronously here; with a
        # Starlette Request this would be a coroutine — confirm against the
        # targeted Ray version.
        data = request.json()
        prediction = await self.predict_batch(data["input"])
        return {"prediction": prediction, "version": "v1.0"}

# Deploy production model; the dict is forwarded to __init__ on each replica.
ProductionModel.deploy({"model_path": "s3://models/production-v1.0"})

Install with Tessl CLI

npx tessl i tessl/pypi-ray

docs

core-distributed.md

data-processing.md

distributed-training.md

hyperparameter-tuning.md

index.md

model-serving.md

reinforcement-learning.md

utilities-advanced.md

tile.json