A high-throughput and memory-efficient inference and serving engine for LLMs
Overall score: 69%
Evaluation — 69%
↑ 1.33× agent success when using this tile
High-performance asynchronous inference engine for concurrent request handling, streaming responses, and integration with async frameworks. Enables scalable serving of LLMs with efficient resource utilization and non-blocking operations.
Core asynchronous inference engine supporting concurrent request processing, streaming generation, and distributed execution with Ray.
class AsyncLLMEngine:
@classmethod
def from_engine_args(cls, engine_args: AsyncEngineArgs) -> "AsyncLLMEngine":
"""
Create AsyncLLMEngine from configuration arguments.
Parameters:
- engine_args: Configuration for engine initialization
Returns:
AsyncLLMEngine instance ready for inference
"""
async def generate(
self,
prompt: Optional[str],
sampling_params: SamplingParams,
request_id: str,
prompt_token_ids: Optional[List[int]] = None,
lora_request: Optional[LoRARequest] = None
) -> AsyncIterator[RequestOutput]:
"""
Generate text asynchronously with streaming output.
Parameters:
- prompt: Input text prompt
- sampling_params: Generation parameters
- request_id: Unique identifier for request tracking
- prompt_token_ids: Pre-tokenized input (optional)
- lora_request: LoRA adapter configuration
Yields:
RequestOutput objects as generation progresses
"""
async def encode(
self,
prompt: Optional[str],
pooling_params: PoolingParams,
request_id: str,
prompt_token_ids: Optional[List[int]] = None,
lora_request: Optional[LoRARequest] = None
) -> AsyncIterator[EmbeddingRequestOutput]:
"""
Generate embeddings asynchronously.
Parameters:
- prompt: Input text for embedding
- pooling_params: Embedding pooling configuration
- request_id: Unique request identifier
- prompt_token_ids: Pre-tokenized input (optional)
- lora_request: LoRA adapter configuration
Yields:
EmbeddingRequestOutput with vector representations
"""
async def abort(self, request_id: str) -> None:
"""
Cancel a running request.
Parameters:
- request_id: ID of request to abort
"""
async def get_model_config(self) -> ModelConfig:
"""Get the model configuration."""
async def get_num_unfinished_requests(self) -> int:
"""Get count of active requests."""
async def check_health(self) -> None:
"""
Health check for the engine.
Raises:
RuntimeError: If engine is unhealthy
"""
async def add_request(
self,
request_id: str,
prompt: Optional[str],
params: Union[SamplingParams, PoolingParams],
arrival_time: Optional[float] = None,
prompt_token_ids: Optional[List[int]] = None,
lora_request: Optional[LoRARequest] = None,
trace_headers: Optional[Dict[str, str]] = None,
prompt_adapter_request: Optional["PromptAdapterRequest"] = None,
priority: int = 0
) -> AsyncIterator[RequestOutput]:
"""
Add request and return async generator for results.
Parameters:
- request_id: Unique identifier for the request
- prompt: Input text prompt
- params: Sampling or pooling parameters
- arrival_time: Request arrival timestamp
- prompt_token_ids: Pre-tokenized input
- lora_request: LoRA adapter configuration
- trace_headers: Tracing headers
- prompt_adapter_request: Prompt adapter configuration
- priority: Request priority level
Yields:
RequestOutput objects as processing progresses
"""
async def get_tokenizer(self, lora_request: Optional[LoRARequest] = None) -> "PreTrainedTokenizerBase":
"""
Get tokenizer asynchronously with optional LoRA support.
Parameters:
- lora_request: Optional LoRA adapter configuration
Returns:
Tokenizer instance configured for this model
"""
async def get_input_preprocessor(self) -> "InputPreprocessor":
"""
Get input preprocessor for this engine.
Returns:
Input preprocessor instance
"""
async def start_background_loop(self) -> None:
"""Start background loop for processing requests."""
async def shutdown_background_loop(self) -> None:
"""Shutdown background processing loop."""
async def get_vllm_config(self) -> "VllmConfig":
"""Get complete vLLM configuration."""
async def get_parallel_config(self) -> "ParallelConfig":
"""Get parallel execution configuration."""
async def get_decoding_config(self) -> "DecodingConfig":
"""Get decoding configuration."""
async def get_scheduler_config(self) -> "SchedulerConfig":
"""Get request scheduler configuration."""
async def get_lora_config(self) -> Optional["LoRAConfig"]:
"""Get LoRA adapter configuration."""
async def do_log_stats(
self,
scheduler_outputs: Optional[Any] = None,
model_output: Optional[Any] = None
) -> None:
"""
Log performance statistics.
Parameters:
- scheduler_outputs: Optional scheduler output data
- model_output: Optional model output data
"""
async def is_tracing_enabled(self) -> bool:
"""Check if request tracing is enabled."""
async def add_logger(self, logger_name: str, logger: Any) -> None:
"""
Add custom logger to the engine.
Parameters:
- logger_name: Name for the logger
- logger: Logger instance to add
"""
async def remove_logger(self, logger_name: str) -> None:
"""
Remove logger from the engine.
Parameters:
- logger_name: Name of logger to remove
"""
async def start_profile(self) -> None:
"""Start performance profiling for async engine."""
async def stop_profile(self) -> None:
"""Stop performance profiling and save results."""
async def reset_mm_cache(self) -> None:
"""Reset multimodal processing cache."""
async def reset_prefix_cache(self, device: Optional[Union[str, int]] = None) -> None:
"""
Reset prefix cache for memory optimization.
Parameters:
- device: Optional device specification for cache reset
"""
async def sleep(self, level: int = 1) -> None:
"""
Put async engine to sleep to free resources.
Parameters:
- level: Sleep level (1=light, 2=deep)
"""
async def wake_up(self, tags: Optional[List[str]] = None) -> None:
"""
Wake up sleeping async engine.
Parameters:
- tags: Optional tags for selective wake-up
"""
async def is_sleeping(self) -> bool:
"""Check if engine is currently sleeping."""
async def add_lora(self, lora_request: LoRARequest) -> None:
"""
Add LoRA adapter to engine.
Parameters:
- lora_request: LoRA adapter configuration to add
"""
async def collective_rpc(
self,
method: str,
timeout: Optional[float] = None,
args: Tuple[Any, ...] = (),
kwargs: Optional[Dict[str, Any]] = None
) -> List[Any]:
"""
Execute RPC calls on all workers asynchronously.
Parameters:
- method: Method name to call on workers
- timeout: Optional timeout for RPC calls
- args: Positional arguments for method
- kwargs: Keyword arguments for method
Returns:
List of results from all workers
"""
@property
def is_running(self) -> bool:
"""Check if background loop is running."""
@property
def is_stopped(self) -> bool:
"""Check if engine is stopped."""
@property
def errored(self) -> bool:
"""Check if engine has errored."""
@property
def dead_error(self) -> Optional[BaseException]:
"""Get dead error exception if any."""Support for real-time streaming of generated tokens as they are produced, enabling responsive user interfaces and immediate feedback.
async def stream_generate(
engine: AsyncLLMEngine,
prompt: str,
sampling_params: SamplingParams,
request_id: str
) -> AsyncIterator[str]:
"""
Stream generated text tokens in real-time.
Parameters:
- engine: AsyncLLMEngine instance
- prompt: Input prompt
- sampling_params: Generation configuration
- request_id: Unique request identifier
Yields:
Individual text tokens as they are generated
"""Handle multiple requests simultaneously with intelligent batching and resource management.
class RequestManager:
    """Tracks queued and in-flight generation requests.

    (API stub listing: bodies are documented signatures only.)
    """

    async def add_request(
        self,
        request_id: str,
        prompt: str,
        sampling_params: SamplingParams,
        prompt_token_ids: Optional[List[int]] = None,
        arrival_time: Optional[float] = None,
        lora_request: Optional[LoRARequest] = None
    ) -> None:
        """Queue a new request for processing."""

    async def abort_request(self, request_id: str) -> None:
        """Cancel a pending or running request."""

    async def get_request_status(self, request_id: str) -> RequestStatus:
        """Return the current lifecycle state of *request_id*."""
import asyncio
from vllm import AsyncLLMEngine, AsyncEngineArgs, SamplingParams


async def main():
    """Minimal async generation example: one request, streamed to stdout."""
    # Build the engine from configuration.
    args = AsyncEngineArgs(
        model="microsoft/DialoGPT-medium",
        tensor_parallel_size=1
    )
    engine = AsyncLLMEngine.from_engine_args(args)

    # Submit a single streaming request and collect every partial output.
    params = SamplingParams(temperature=0.8, max_tokens=100)
    req_id = "req-001"
    collected = []
    async for chunk in engine.generate("Hello world", params, req_id):
        collected.append(chunk)
        print(f"Generated: {chunk.outputs[0].text}")
    return collected


# Run the async function
asyncio.run(main())
import asyncio
from vllm import AsyncLLMEngine, AsyncEngineArgs, SamplingParams


async def stream_text():
    """Stream generated text to stdout token-by-token.

    Bug fix: the original printed `output.outputs[0].text` on every
    iteration, but that field accumulates the full generation so far,
    so each step re-printed everything already shown. We now remember
    how many characters have been written and emit only the new suffix.
    """
    engine_args = AsyncEngineArgs(model="microsoft/DialoGPT-medium")
    engine = AsyncLLMEngine.from_engine_args(engine_args)
    sampling_params = SamplingParams(
        temperature=0.8,
        max_tokens=150,
    )
    request_id = "stream-001"
    prompt = "Tell me a story about"
    print("Streaming generation:")
    printed = 0  # characters already written to stdout
    async for output in engine.generate(prompt, sampling_params, request_id):
        if output.outputs:
            text = output.outputs[0].text
            # Print only the new text (delta), not the cumulative string.
            print(text[printed:], end="", flush=True)
            printed = len(text)
    print("\nGeneration complete!")


asyncio.run(stream_text())
import asyncio
from vllm import AsyncLLMEngine, AsyncEngineArgs, SamplingParams


async def process_request(engine, prompt, request_id):
    """Run one generation request to completion; return (id, final text)."""
    sampling_params = SamplingParams(temperature=0.7, max_tokens=100)
    final = None
    async for output in engine.generate(prompt, sampling_params, request_id):
        final = output
    return request_id, final.outputs[0].text


async def concurrent_generation():
    """Fan several prompts out to one engine and gather the results."""
    engine_args = AsyncEngineArgs(
        model="microsoft/DialoGPT-medium",
        max_num_seqs=10  # Support up to 10 concurrent requests
    )
    engine = AsyncLLMEngine.from_engine_args(engine_args)
    prompts = [
        "The future of AI is",
        "Once upon a time",
        "In a galaxy far away",
        "The secret to happiness"
    ]
    # One task per prompt; the engine batches them internally.
    tasks = [
        process_request(engine, text, f"req-{idx}")
        for idx, text in enumerate(prompts)
    ]
    # Wait for every request, then report in submission order.
    for request_id, generated_text in await asyncio.gather(*tasks):
        print(f"{request_id}: {generated_text}")


asyncio.run(concurrent_generation())
import asyncio
from vllm import AsyncLLMEngine, AsyncEngineArgs, SamplingParams


async def request_with_timeout():
    """Abort a long-running generation when it exceeds a wall-clock budget.

    Bug fix: `asyncio.wait_for()` requires an awaitable (coroutine/future),
    but the original passed an async *generator* and then tried to
    `async for` over the result — a TypeError at runtime. The consuming
    loop is now wrapped in a coroutine so the timeout applies correctly.
    """
    engine_args = AsyncEngineArgs(model="microsoft/DialoGPT-medium")
    engine = AsyncLLMEngine.from_engine_args(engine_args)
    sampling_params = SamplingParams(
        temperature=0.8,
        max_tokens=500  # Long generation
    )
    request_id = "timeout-test"

    async def generate_text():
        # Consume the stream to completion, reporting progress as we go.
        results = []
        async for output in engine.generate(
            "Write a very long story",
            sampling_params,
            request_id
        ):
            print(f"Generated so far: {len(output.outputs[0].text)} chars")
            results.append(output)
        return results

    try:
        # Set a timeout for generation
        results = await asyncio.wait_for(generate_text(), timeout=5.0)
    except asyncio.TimeoutError:
        print("Generation timed out, cancelling request...")
        await engine.abort(request_id)
        print("Request cancelled successfully")


asyncio.run(request_with_timeout())
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from vllm import AsyncLLMEngine, AsyncEngineArgs, SamplingParams
import json
import uuid
app = FastAPI()
# Initialize engine at startup
engine_args = AsyncEngineArgs(model="microsoft/DialoGPT-medium")
engine = AsyncLLMEngine.from_engine_args(engine_args)
@app.post("/generate")
async def generate_text(prompt: str, max_tokens: int = 100):
request_id = str(uuid.uuid4())
sampling_params = SamplingParams(
temperature=0.8,
max_tokens=max_tokens
)
async def generate_stream():
async for output in engine.generate(prompt, sampling_params, request_id):
yield f"data: {json.dumps({'text': output.outputs[0].text})}\\n\\n"
return StreamingResponse(
generate_stream(),
media_type="text/plain",
headers={"Cache-Control": "no-cache"}
)
@app.delete("/cancel/{request_id}")
async def cancel_request(request_id: str):
await engine.abort(request_id)
return {"message": "Request cancelled"}class RequestStatus(str, Enum):
WAITING = "waiting"
RUNNING = "running"
COMPLETED = "completed"
CANCELLED = "cancelled"
FAILED = "failed"
class AsyncEngineArgs(EngineArgs):
    """Configuration arguments for constructing an AsyncLLMEngine."""

    # Inherits all EngineArgs fields plus:
    worker_use_ray: bool = False
    engine_use_ray: bool = False
    disable_log_requests: bool = False
    max_log_len: Optional[int] = None
class StreamingRequest:
request_id: str
prompt: str
sampling_params: SamplingParams
arrival_time: float
lora_request: Optional[LoRARequest]Install with Tessl CLI
npx tessl i tessl/pypi-vllm
docs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10