Ray is a unified framework for scaling AI and Python applications.

Ray Serve provides scalable model serving and application deployment with automatic scaling, request batching, and multi-model support. It enables production deployment of ML models and Python applications.

Basic serving functionality and deployment management.
class HTTPOptions:
    """HTTP server configuration options."""

    def __init__(self, *, host="127.0.0.1", port=8000, middlewares=None,
                 location="EveryNode", num_cpus=0):
        """
        Initialize HTTP options.

        Args:
            host (str): Host to bind to
            port (int): Port to bind to
            middlewares (list, optional): ASGI middlewares
            location (str): Where to run HTTP servers
            num_cpus (int): CPUs for HTTP servers
        """


# NOTE(review): HTTPOptions is defined above start() because the default
# argument below is evaluated once at function-definition time; with the
# original ordering (class after the def) this raised NameError. All calls
# that omit http_options share that single default instance.
def start(detached=False, http_options=HTTPOptions(), **kwargs):
    """
    Start Ray Serve.

    Args:
        detached (bool): Whether to run in detached mode
        http_options (HTTPOptions, optional): HTTP configuration
        **kwargs: Additional Ray initialization arguments
    """


def shutdown():
    """Shutdown Ray Serve."""


def run(target, *, name=None, route_prefix=None, blocking=True, **kwargs):
    """
    Deploy and run a deployment.

    Args:
        target: Deployment target (function, class, or Deployment)
        name (str, optional): Deployment name
        route_prefix (str, optional): HTTP route prefix
        blocking (bool): Whether to block until deployment is ready
        **kwargs: Additional deployment options

    Returns:
        DeploymentHandle: Handle to deployment
    """


def status():
    """
    Get status of Ray Serve deployments.

    Returns:
        str: Status information
    """


# Create and configure deployments.
def deployment(func_or_class=None, *, name=None, version=None,
               num_replicas=None, route_prefix=None, ray_actor_options=None,
               user_config=None, max_concurrent_queries=None,
               autoscaling_config=None, graceful_shutdown_wait_loop_s=None,
               graceful_shutdown_timeout_s=None, health_check_period_s=None,
               health_check_timeout_s=None, is_driver_deployment=None):
    """
    Decorator to create a Ray Serve deployment.

    Args:
        func_or_class: Function or class to deploy
        name (str, optional): Deployment name
        version (str, optional): Deployment version
        num_replicas (int, optional): Number of replicas
        route_prefix (str, optional): HTTP route prefix
        ray_actor_options (dict, optional): Ray actor options
        user_config: User configuration
        max_concurrent_queries (int, optional): Max concurrent queries per replica
        autoscaling_config (AutoscalingConfig, optional): Autoscaling configuration
        graceful_shutdown_wait_loop_s (float, optional): Graceful shutdown wait
        graceful_shutdown_timeout_s (float, optional): Graceful shutdown timeout
        health_check_period_s (float, optional): Health check period
        health_check_timeout_s (float, optional): Health check timeout
        is_driver_deployment (bool, optional): Whether this is a driver deployment

    Returns:
        Deployment: Deployment object
    """


class Deployment:
    """Ray Serve deployment."""

    def deploy(self, *init_args, _blocking=True, **init_kwargs):
        """
        Deploy this deployment.

        Args:
            *init_args: Arguments for deployment initialization
            _blocking (bool): Whether to block until ready
            **init_kwargs: Keyword arguments for initialization

        Returns:
            DeploymentHandle: Handle to deployment
        """

    def delete(self):
        """Delete this deployment."""

    def get_handle(self, sync=None):
        """
        Get a handle to this deployment.

        Args:
            sync (bool, optional): Whether to use a sync handle

        Returns:
            DeploymentHandle: Handle to deployment
        """

    def options(self, *, func_or_class=None, **kwargs):
        """
        Create a new deployment with modified options.

        Args:
            func_or_class: New function or class
            **kwargs: Options to modify

        Returns:
            Deployment: New deployment with modified options
        """


def multiplexed(max_num_models_per_replica=None, *, buffer_size_bytes=100_000_000,
                buffer_size_bytes_per_replica=None, max_num_models=None):
    """
    Decorator for multiplexed deployments supporting multiple models.

    Args:
        max_num_models_per_replica (int, optional): Max models per replica
        buffer_size_bytes (int): Buffer size in bytes
        buffer_size_bytes_per_replica (int, optional): Buffer size per replica
        max_num_models (int, optional): Maximum total models

    Returns:
        Decorator function for multiplexed deployment
    """


def get_multiplexed_model_id():
    """
    Get the current multiplexed model ID within a deployment.

    Returns:
        str: Current model ID
    """


class AutoscalingConfig:
    """Configuration for deployment autoscaling."""

    def __init__(self, *, min_replicas=None, max_replicas=None,
                 target_num_ongoing_requests_per_replica=None,
                 metrics_interval_s=None, look_back_period_s=None,
                 smoothing_factor=None, downscale_delay_s=None,
                 upscale_delay_s=None):
        """
        Initialize autoscaling configuration.

        Args:
            min_replicas (int, optional): Minimum number of replicas
            max_replicas (int, optional): Maximum number of replicas
            target_num_ongoing_requests_per_replica (float, optional): Target requests per replica
            metrics_interval_s (float, optional): Metrics collection interval
            look_back_period_s (float, optional): Metrics lookback period
            smoothing_factor (float, optional): Smoothing factor for metrics
            downscale_delay_s (float, optional): Delay before downscaling
            upscale_delay_s (float, optional): Delay before upscaling
        """


# Interact with deployed models and services.
class DeploymentHandle:
    """Handle for interacting with a deployment."""

    def remote(self, *args, **kwargs):
        """
        Make an async request to the deployment.

        Args:
            *args: Arguments to pass
            **kwargs: Keyword arguments to pass

        Returns:
            DeploymentResponse: Response object
        """

    def options(self, *, method_name=None, multiplexed_model_id=None, **kwargs):
        """
        Create a handle with modified options.

        Args:
            method_name (str, optional): Method to call
            multiplexed_model_id (str, optional): Model ID for multiplexing
            **kwargs: Additional options

        Returns:
            DeploymentHandle: Handle with modified options
        """


class DeploymentResponse:
    """Response from a deployment."""

    def result(self, *, timeout_s=None):
        """
        Get the result (blocking).

        Args:
            timeout_s (float, optional): Timeout in seconds

        Returns:
            Result of the deployment call
        """


class DeploymentResponseGenerator:
    """Generator for streaming deployment responses."""

    def __iter__(self):
        """Iterate over streaming responses."""

    def __next__(self):
        """Get the next response."""


# Build complex serving applications.
class Application:
    """Ray Serve application."""

    def __init__(self, import_path, *, args=None, kwargs=None):
        """
        Initialize the application.

        Args:
            import_path (str): Import path to the application
            args (list, optional): Arguments for the application
            kwargs (dict, optional): Keyword arguments for the application
        """


def build(app_or_deployment, *args, **kwargs):
    """
    Build an application from a deployment or function.

    Args:
        app_or_deployment: Application or deployment to build
        *args: Arguments for building
        **kwargs: Keyword arguments for building

    Returns:
        Application: Built application
    """


# Batch requests for improved throughput.
class Batched:
    """Decorator for batched request handling."""

    def __init__(self, *, max_batch_size=None, batch_wait_timeout_s=None):
        """
        Initialize the batching decorator.

        Args:
            max_batch_size (int, optional): Maximum batch size
            batch_wait_timeout_s (float, optional): Batch wait timeout
        """


def batch(max_batch_size=None, batch_wait_timeout_s=None):
    """
    Decorator for batched request handling.

    Args:
        max_batch_size (int, optional): Maximum batch size
        batch_wait_timeout_s (float, optional): Batch wait timeout

    Returns:
        Batched: Batching decorator
    """


# Handle HTTP requests and routing.
class Ingress:
    """Base class for custom HTTP ingress."""

    async def __call__(self, request):
        """
        Handle an HTTP request.

        Args:
            request: HTTP request

        Returns:
            HTTP response
        """


def ingress(app):
    """
    Mark a deployment as HTTP ingress.

    Args:
        app: Deployment to mark as ingress

    Returns:
        Deployment with ingress configuration
    """


# Serve multiple models from a single deployment.
class MultiplexedReplicaResult:
    """Result from a multiplexed model call."""

    def __init__(self, result):
        """Initialize with the result."""


# NOTE(review): this redefines get_multiplexed_model_id from the deployment
# section above; at module level the later definition shadows the earlier one.
def get_multiplexed_model_id():
    """
    Get the current multiplexed model ID.

    Returns:
        str: Current model ID
    """


# Runtime configuration and context access.
def get_replica_context():
    """
    Get the current replica context.

    Returns:
        ReplicaContext: Current replica context
    """


class ReplicaContext:
    """Context for the current replica."""

    @property
    def deployment(self):
        """Current deployment name."""

    @property
    def replica_tag(self):
        """Current replica tag."""

    @property
    def servable_object(self):
        """Current servable object."""


# --- Usage example: basic model serving ---
import ray
from ray import serve

import numpy as np
import requests

# Start Ray Serve.
serve.start()


# Define a simple model.
@serve.deployment
class SimpleModel:
    def __init__(self):
        # Load your model here.
        self.model = self._load_model()

    def _load_model(self):
        # Placeholder for model loading.
        return lambda x: x * 2

    def __call__(self, request):
        # NOTE(review): with a Starlette request, `request.json()` is a
        # coroutine and would normally need `await` in an async handler —
        # confirm against the Serve version in use.
        data = request.json()
        input_data = np.array(data["input"])
        prediction = self.model(input_data)
        return {"prediction": prediction.tolist()}


# Deploy the model.
SimpleModel.deploy()

# Make a request over HTTP.
response = requests.post("http://127.0.0.1:8000/SimpleModel",
                         json={"input": [1, 2, 3, 4]})
print(response.json())  # {"prediction": [2, 4, 6, 8]}

serve.shutdown()

# --- Usage example: batched PyTorch serving ---
import ray
from ray import serve

import torch

serve.start()


@serve.deployment(
    num_replicas=2,
    ray_actor_options={"num_cpus": 1, "num_gpus": 0.5},
)
class PyTorchModel:
    def __init__(self, model_path):
        # Load the serialized model once per replica.
        self.model = torch.load(model_path)
        self.model.eval()

    @serve.batch(max_batch_size=32, batch_wait_timeout_s=0.1)
    async def predict_batch(self, inputs):
        # `inputs` is the list of per-request tensors collected by serve.batch.
        batch = torch.stack(inputs)
        with torch.no_grad():
            predictions = self.model(batch)
        return predictions.numpy()

    async def __call__(self, request):
        data = torch.tensor(request.json()["input"])
        prediction = await self.predict_batch(data)
        return {"prediction": prediction.tolist()}


# Deploy with a specific autoscaling configuration.
PyTorchModel.options(
    autoscaling_config=serve.AutoscalingConfig(
        min_replicas=1,
        max_replicas=5,
        target_num_ongoing_requests_per_replica=2,
    )
).deploy("model.pt")

# --- Usage example: model composition / routing ---
import ray
from ray import serve

serve.start()


@serve.deployment
class ModelRouter:
    def __init__(self):
        # Handles are resolved when the replica starts, so ModelA and ModelB
        # must already be deployed (see deploy order below).
        self.model_a = ModelA.get_handle()
        self.model_b = ModelB.get_handle()

    async def __call__(self, request):
        data = request.json()
        model_type = data.get("model", "a")
        if model_type == "a":
            result = await self.model_a.remote(data)
        else:
            result = await self.model_b.remote(data)
        return result


@serve.deployment
class ModelA:
    async def __call__(self, data):
        return {"model": "a", "result": data["input"] * 2}


@serve.deployment
class ModelB:
    async def __call__(self, data):
        return {"model": "b", "result": data["input"] + 10}


# Deploy all models; the router last so its handles resolve.
ModelA.deploy()
ModelB.deploy()
ModelRouter.deploy()

# --- Usage example: custom HTTP ingress ---
import ray
from ray import serve
from starlette.requests import Request
from starlette.responses import JSONResponse

serve.start()


# NOTE(review): the original decorated this class with `@serve.ingress(app)`,
# but `app` was not defined until after the class body (NameError at import
# time). `serve.ingress` expects an existing ASGI app (e.g. FastAPI); since
# this class implements its own `__call__` handler, the decorator is dropped.
@serve.deployment
class CustomIngress:
    def __init__(self):
        self.model = MLModel.get_handle()

    async def __call__(self, request: Request):
        if request.method == "GET":
            # Health-check endpoint.
            return JSONResponse({"status": "healthy"})
        elif request.method == "POST":
            data = await request.json()
            result = await self.model.remote(data)
            return JSONResponse(result)
        else:
            return JSONResponse({"error": "Method not allowed"},
                                status_code=405)


@serve.deployment
class MLModel:
    def __init__(self):
        # Initialize your model
        pass

    async def predict(self, data):
        # Model prediction logic
        return {"prediction": "result"}


# Build and run the application.
app = serve.build(CustomIngress)
serve.run(app)

# --- Usage example: production configuration ---
import ray
from ray import serve

# Production serving configuration: detached mode survives the driver exiting,
# and the HTTP proxy runs on every node.
serve.start(
    detached=True,
    http_options=serve.HTTPOptions(
        host="0.0.0.0",
        port=8000,
        location="EveryNode",
    ),
)


@serve.deployment(
    name="production-model",
    version="v1.0",
    num_replicas=4,
    autoscaling_config=serve.AutoscalingConfig(
        min_replicas=2,
        max_replicas=10,
        target_num_ongoing_requests_per_replica=5,
    ),
    ray_actor_options={
        "num_cpus": 2,
        "num_gpus": 1,
        "memory": 4000 * 1024 * 1024,  # 4GB
    },
    health_check_period_s=10,
    health_check_timeout_s=30,
    graceful_shutdown_timeout_s=60,
)
class ProductionModel:
    def __init__(self, model_config):
        self.model = self._load_model(model_config)
        self.preprocessor = self._load_preprocessor()

    def _load_model(self, config):
        # Load production model
        pass

    def _load_preprocessor(self):
        # Load data preprocessor
        pass

    @serve.batch(max_batch_size=64, batch_wait_timeout_s=0.05)
    async def predict_batch(self, inputs):
        # Batch prediction with preprocessing
        processed = [self.preprocessor(inp) for inp in inputs]
        predictions = self.model.predict(processed)
        return predictions

    async def __call__(self, request):
        data = request.json()
        prediction = await self.predict_batch(data["input"])
        return {"prediction": prediction, "version": "v1.0"}


# Deploy the production model.
ProductionModel.deploy({"model_path": "s3://models/production-v1.0"})

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-ray