CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-vllm

A high-throughput and memory-efficient inference and serving engine for LLMs

Overall
score

69%

Evaluation: 69%

1.33x

Agent success when using this tile

Overview
Eval results
Files

docs/async-inference.md

Asynchronous Inference

High-performance asynchronous inference engine for concurrent request handling, streaming responses, and integration with async frameworks. Enables scalable serving of LLMs with efficient resource utilization and non-blocking operations.

Capabilities

Async LLM Engine

Core asynchronous inference engine supporting concurrent request processing, streaming generation, and distributed execution with Ray.

class AsyncLLMEngine:
    """Asynchronous inference engine interface (API specification).

    Declares the async surface of vLLM's engine: streaming text generation
    (`generate`), embedding generation (`encode`), request lifecycle control
    (`add_request` / `abort`), configuration accessors, logging/profiling
    hooks, and sleep/wake resource management. Method bodies are intentionally
    omitted in this document — only the contract is described.
    """

    @classmethod
    def from_engine_args(cls, engine_args: AsyncEngineArgs) -> "AsyncLLMEngine":
        """
        Create AsyncLLMEngine from configuration arguments.

        Parameters:
        - engine_args: Configuration for engine initialization

        Returns:
        AsyncLLMEngine instance ready for inference
        """

    async def generate(
        self,
        prompt: Optional[str],
        sampling_params: SamplingParams,
        request_id: str,
        prompt_token_ids: Optional[List[int]] = None,
        lora_request: Optional[LoRARequest] = None
    ) -> AsyncIterator[RequestOutput]:
        """
        Generate text asynchronously with streaming output.

        Parameters:
        - prompt: Input text prompt
        - sampling_params: Generation parameters
        - request_id: Unique identifier for request tracking
        - prompt_token_ids: Pre-tokenized input (optional)
        - lora_request: LoRA adapter configuration

        Yields:
        RequestOutput objects as generation progresses
        """

    async def encode(
        self,
        prompt: Optional[str],
        pooling_params: PoolingParams,
        request_id: str,
        prompt_token_ids: Optional[List[int]] = None,
        lora_request: Optional[LoRARequest] = None
    ) -> AsyncIterator[EmbeddingRequestOutput]:
        """
        Generate embeddings asynchronously.

        Parameters:
        - prompt: Input text for embedding
        - pooling_params: Embedding pooling configuration
        - request_id: Unique request identifier
        - prompt_token_ids: Pre-tokenized input (optional)
        - lora_request: LoRA adapter configuration

        Yields:
        EmbeddingRequestOutput with vector representations
        """

    async def abort(self, request_id: str) -> None:
        """
        Cancel a running request.

        Parameters:
        - request_id: ID of request to abort
        """

    async def get_model_config(self) -> ModelConfig:
        """Get the model configuration."""

    async def get_num_unfinished_requests(self) -> int:
        """Get count of active requests."""

    async def check_health(self) -> None:
        """
        Health check for the engine.

        Raises:
        RuntimeError: If engine is unhealthy
        """

    async def add_request(
        self,
        request_id: str,
        prompt: Optional[str],
        params: Union[SamplingParams, PoolingParams],
        arrival_time: Optional[float] = None,
        prompt_token_ids: Optional[List[int]] = None,
        lora_request: Optional[LoRARequest] = None,
        trace_headers: Optional[Dict[str, str]] = None,
        prompt_adapter_request: Optional["PromptAdapterRequest"] = None,
        priority: int = 0
    ) -> AsyncIterator[RequestOutput]:
        """
        Add request and return async generator for results.

        Parameters:
        - request_id: Unique identifier for the request
        - prompt: Input text prompt
        - params: Sampling or pooling parameters
        - arrival_time: Request arrival timestamp
        - prompt_token_ids: Pre-tokenized input
        - lora_request: LoRA adapter configuration
        - trace_headers: Tracing headers
        - prompt_adapter_request: Prompt adapter configuration
        - priority: Request priority level

        Yields:
        RequestOutput objects as processing progresses
        """

    async def get_tokenizer(self, lora_request: Optional[LoRARequest] = None) -> "PreTrainedTokenizerBase":
        """
        Get tokenizer asynchronously with optional LoRA support.

        Parameters:
        - lora_request: Optional LoRA adapter configuration

        Returns:
        Tokenizer instance configured for this model
        """

    async def get_input_preprocessor(self) -> "InputPreprocessor":
        """
        Get input preprocessor for this engine.

        Returns:
        Input preprocessor instance
        """

    async def start_background_loop(self) -> None:
        """Start background loop for processing requests."""

    async def shutdown_background_loop(self) -> None:
        """Shutdown background processing loop."""

    async def get_vllm_config(self) -> "VllmConfig":
        """Get complete vLLM configuration."""

    async def get_parallel_config(self) -> "ParallelConfig":
        """Get parallel execution configuration."""

    async def get_decoding_config(self) -> "DecodingConfig":
        """Get decoding configuration."""

    async def get_scheduler_config(self) -> "SchedulerConfig":
        """Get request scheduler configuration."""

    async def get_lora_config(self) -> Optional["LoRAConfig"]:
        """Get LoRA adapter configuration."""

    async def do_log_stats(
        self,
        scheduler_outputs: Optional[Any] = None,
        model_output: Optional[Any] = None
    ) -> None:
        """
        Log performance statistics.

        Parameters:
        - scheduler_outputs: Optional scheduler output data
        - model_output: Optional model output data
        """

    async def is_tracing_enabled(self) -> bool:
        """Check if request tracing is enabled."""

    async def add_logger(self, logger_name: str, logger: Any) -> None:
        """
        Add custom logger to the engine.

        Parameters:
        - logger_name: Name for the logger
        - logger: Logger instance to add
        """

    async def remove_logger(self, logger_name: str) -> None:
        """
        Remove logger from the engine.

        Parameters:
        - logger_name: Name of logger to remove
        """

    async def start_profile(self) -> None:
        """Start performance profiling for async engine."""

    async def stop_profile(self) -> None:
        """Stop performance profiling and save results."""

    async def reset_mm_cache(self) -> None:
        """Reset multimodal processing cache."""

    async def reset_prefix_cache(self, device: Optional[Union[str, int]] = None) -> None:
        """
        Reset prefix cache for memory optimization.

        Parameters:
        - device: Optional device specification for cache reset
        """

    async def sleep(self, level: int = 1) -> None:
        """
        Put async engine to sleep to free resources.

        Parameters:
        - level: Sleep level (1=light, 2=deep)
        """

    async def wake_up(self, tags: Optional[List[str]] = None) -> None:
        """
        Wake up sleeping async engine.

        Parameters:
        - tags: Optional tags for selective wake-up
        """

    async def is_sleeping(self) -> bool:
        """Check if engine is currently sleeping."""

    async def add_lora(self, lora_request: LoRARequest) -> None:
        """
        Add LoRA adapter to engine.

        Parameters:
        - lora_request: LoRA adapter configuration to add
        """

    async def collective_rpc(
        self,
        method: str,
        timeout: Optional[float] = None,
        args: Tuple[Any, ...] = (),
        kwargs: Optional[Dict[str, Any]] = None
    ) -> List[Any]:
        """
        Execute RPC calls on all workers asynchronously.

        Parameters:
        - method: Method name to call on workers
        - timeout: Optional timeout for RPC calls
        - args: Positional arguments for method
        - kwargs: Keyword arguments for method

        Returns:
        List of results from all workers
        """

    @property
    def is_running(self) -> bool:
        """Check if background loop is running."""

    @property
    def is_stopped(self) -> bool:
        """Check if engine is stopped."""

    @property
    def errored(self) -> bool:
        """Check if engine has errored."""

    @property
    def dead_error(self) -> Optional[BaseException]:
        """Get dead error exception if any."""

Streaming Generation

Support for real-time streaming of generated tokens as they are produced, enabling responsive user interfaces and immediate feedback.

# NOTE(review): spec-only signature — no implementation is shown in this
# document; the contract below is the documented interface.
async def stream_generate(
    engine: AsyncLLMEngine,
    prompt: str,
    sampling_params: SamplingParams,
    request_id: str
) -> AsyncIterator[str]:
    """
    Stream generated text tokens in real-time.

    Parameters:
    - engine: AsyncLLMEngine instance
    - prompt: Input prompt
    - sampling_params: Generation configuration
    - request_id: Unique request identifier

    Yields:
    Individual text tokens as they are generated
    """

Concurrent Request Processing

Handle multiple requests simultaneously with intelligent batching and resource management.

class RequestManager:
    """Queue-level request manager interface (API specification).

    Declares async operations for enqueueing requests, cancelling them, and
    querying their `RequestStatus`. Method bodies are intentionally omitted
    in this document.
    """

    async def add_request(
        self,
        request_id: str,
        prompt: str,
        sampling_params: SamplingParams,
        prompt_token_ids: Optional[List[int]] = None,
        arrival_time: Optional[float] = None,
        lora_request: Optional[LoRARequest] = None
    ) -> None:
        """Add new request to processing queue."""

    async def abort_request(self, request_id: str) -> None:
        """Cancel pending or running request."""

    async def get_request_status(self, request_id: str) -> RequestStatus:
        """Check status of specific request."""

Usage Examples

Basic Async Generation

import asyncio
from vllm import AsyncLLMEngine, AsyncEngineArgs, SamplingParams

async def main():
    """Run one asynchronous generation request and return all its outputs."""
    # Build the engine from declarative configuration.
    args = AsyncEngineArgs(
        model="microsoft/DialoGPT-medium",
        tensor_parallel_size=1,
    )
    engine = AsyncLLMEngine.from_engine_args(args)

    params = SamplingParams(temperature=0.8, max_tokens=100)

    # Consume the async stream; each iteration prints the text so far.
    collected = []
    async for output in engine.generate("Hello world", params, "req-001"):
        collected.append(output)
        print(f"Generated: {output.outputs[0].text}")

    return collected

# Entry point: drive the coroutine to completion on a fresh event loop.
asyncio.run(main())

Streaming Generation

import asyncio
from vllm import AsyncLLMEngine, AsyncEngineArgs, SamplingParams

async def stream_text():
    """Stream one completion, printing only newly generated text each step.

    Bug fix: the original printed `output.outputs[0].text` — the cumulative
    text so far, per vLLM's RequestOutput — on every iteration while its
    comment claimed to print the delta, so output was duplicated repeatedly.
    We now track how many characters were already printed and emit only the
    new suffix.
    """
    engine_args = AsyncEngineArgs(model="microsoft/DialoGPT-medium")
    engine = AsyncLLMEngine.from_engine_args(engine_args)

    sampling_params = SamplingParams(
        temperature=0.8,
        max_tokens=150,
    )

    request_id = "stream-001"
    prompt = "Tell me a story about"

    print("Streaming generation:")
    num_printed = 0  # characters already written to stdout
    async for output in engine.generate(prompt, sampling_params, request_id):
        if output.outputs:
            text = output.outputs[0].text
            # Print only the new text (delta)
            print(text[num_printed:], end="", flush=True)
            num_printed = len(text)

    print("\nGeneration complete!")

asyncio.run(stream_text())

Concurrent Multiple Requests

import asyncio
from vllm import AsyncLLMEngine, AsyncEngineArgs, SamplingParams

async def process_request(engine, prompt, request_id):
    """Run one generation request to completion.

    Returns:
    (request_id, final_text) — final_text is "" when the request produced
    no outputs (the original indexed `results[-1]` unconditionally, which
    raises IndexError on an empty stream, e.g. an aborted request).
    """
    sampling_params = SamplingParams(temperature=0.7, max_tokens=100)

    # Keep only the latest output; each one carries the cumulative text,
    # so accumulating the whole list was wasted memory.
    final_output = None
    async for output in engine.generate(prompt, sampling_params, request_id):
        final_output = output

    if final_output is None or not final_output.outputs:
        return request_id, ""
    return request_id, final_output.outputs[0].text

async def concurrent_generation():
    """Fan several prompts out as concurrent tasks against one shared engine."""
    engine_args = AsyncEngineArgs(
        model="microsoft/DialoGPT-medium",
        max_num_seqs=10  # Support up to 10 concurrent requests
    )
    engine = AsyncLLMEngine.from_engine_args(engine_args)

    prompts = [
        "The future of AI is",
        "Once upon a time",
        "In a galaxy far away",
        "The secret to happiness"
    ]

    # One task per prompt; each needs a unique request_id for tracking.
    tasks = [
        process_request(engine, prompt, f"req-{i}")
        for i, prompt in enumerate(prompts)
    ]

    # gather preserves input order, so results line up with prompts.
    results = await asyncio.gather(*tasks)

    for request_id, generated_text in results:
        print(f"{request_id}: {generated_text}")

asyncio.run(concurrent_generation())

Request Management and Cancellation

import asyncio
from vllm import AsyncLLMEngine, AsyncEngineArgs, SamplingParams

async def request_with_timeout():
    """Generate with an overall wall-clock budget, aborting on timeout.

    Bug fix: the original wrote
    `async for output in asyncio.wait_for(generate_text(), timeout=5.0)`,
    which raises TypeError — asyncio.wait_for() requires an *awaitable*, and
    calling an async generator returns an async iterator, not an awaitable
    (nor can wait_for's result be iterated with `async for`). The
    asyncio.timeout() context manager (Python 3.11+) correctly scopes one
    deadline around the whole consumption loop.
    """
    engine_args = AsyncEngineArgs(model="microsoft/DialoGPT-medium")
    engine = AsyncLLMEngine.from_engine_args(engine_args)

    sampling_params = SamplingParams(
        temperature=0.8,
        max_tokens=500  # Long generation
    )

    request_id = "timeout-test"

    async def generate_text():
        # Re-yield engine outputs, logging progress as chunks arrive.
        async for output in engine.generate(
            "Write a very long story",
            sampling_params,
            request_id
        ):
            print(f"Generated so far: {len(output.outputs[0].text)} chars")
            yield output

    results = []
    try:
        # One 5-second budget for the entire stream, not per chunk.
        async with asyncio.timeout(5.0):
            async for output in generate_text():
                results.append(output)

    except asyncio.TimeoutError:  # alias of TimeoutError on 3.11+
        print("Generation timed out, cancelling request...")
        # Tell the engine to stop scheduling this request and free its slots.
        await engine.abort(request_id)
        print("Request cancelled successfully")

asyncio.run(request_with_timeout())

Integration with Web Framework

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from vllm import AsyncLLMEngine, AsyncEngineArgs, SamplingParams
import json
import uuid

app = FastAPI()

# Initialize the engine once at import time so all requests share it.
engine_args = AsyncEngineArgs(model="microsoft/DialoGPT-medium")
engine = AsyncLLMEngine.from_engine_args(engine_args)

@app.post("/generate")
async def generate_text(prompt: str, max_tokens: int = 100):
    """Stream generated text back to the client as Server-Sent Events."""
    request_id = str(uuid.uuid4())
    sampling_params = SamplingParams(
        temperature=0.8,
        max_tokens=max_tokens
    )

    async def generate_stream():
        async for output in engine.generate(prompt, sampling_params, request_id):
            # Bug fix: the original emitted "\\n\\n" — the literal characters
            # backslash-n — but SSE frames must end with a real blank line.
            yield f"data: {json.dumps({'text': output.outputs[0].text})}\n\n"

    return StreamingResponse(
        generate_stream(),
        # text/event-stream is the correct media type for `data:`-framed SSE.
        media_type="text/event-stream",
        # Expose the id so clients can call DELETE /cancel/{request_id}.
        headers={"Cache-Control": "no-cache", "X-Request-ID": request_id}
    )

@app.delete("/cancel/{request_id}")
async def cancel_request(request_id: str):
    """Abort an in-flight generation identified by its request id."""
    await engine.abort(request_id)
    return {"message": "Request cancelled"}

Types

class RequestStatus(str, Enum):
    """Lifecycle states for a request, as returned by
    RequestManager.get_request_status(). Subclasses str so values compare
    and serialize as plain strings."""
    WAITING = "waiting"
    RUNNING = "running"
    COMPLETED = "completed"
    CANCELLED = "cancelled"
    FAILED = "failed"

class AsyncEngineArgs(EngineArgs):
    """Configuration for AsyncLLMEngine.from_engine_args().

    Extends EngineArgs (defined elsewhere) with async-serving options.
    """
    # Inherits all EngineArgs fields plus:
    # presumably toggles Ray-backed worker processes — confirm in vLLM docs
    worker_use_ray: bool = False
    # presumably runs the engine itself as a Ray actor — confirm in vLLM docs
    engine_use_ray: bool = False
    # suppress per-request log lines when True
    disable_log_requests: bool = False
    # cap on logged prompt length; None means unlimited
    max_log_len: Optional[int] = None

class StreamingRequest:
    """Record describing one in-flight streaming generation request."""
    # Unique identifier used for tracking and abort()
    request_id: str
    # Input text prompt
    prompt: str
    # Generation parameters for this request
    sampling_params: SamplingParams
    # Timestamp when the request entered the queue
    arrival_time: float
    # Optional LoRA adapter to apply; None for the base model
    lora_request: Optional[LoRARequest]

Install with Tessl CLI

npx tessl i tessl/pypi-vllm

docs

async-inference.md

chat-completions.md

configuration.md

index.md

parameters-types.md

text-classification.md

text-embeddings.md

text-generation.md

text-scoring.md

tile.json