# Asynchronous Inference

High-performance asynchronous inference engine for concurrent request handling, streaming responses, and integration with async frameworks. Enables scalable serving of LLMs with efficient resource utilization and non-blocking operations.

## Capabilities

### Async LLM Engine

Core asynchronous inference engine supporting concurrent request processing, streaming generation, and distributed execution with Ray.

```python { .api }
class AsyncLLMEngine:
    @classmethod
    def from_engine_args(cls, engine_args: AsyncEngineArgs) -> "AsyncLLMEngine":
        """
        Create AsyncLLMEngine from configuration arguments.

        Parameters:
        - engine_args: Configuration for engine initialization

        Returns:
        AsyncLLMEngine instance ready for inference
        """

    async def generate(
        self,
        prompt: Optional[str],
        sampling_params: SamplingParams,
        request_id: str,
        prompt_token_ids: Optional[List[int]] = None,
        lora_request: Optional[LoRARequest] = None
    ) -> AsyncIterator[RequestOutput]:
        """
        Generate text asynchronously with streaming output.

        Parameters:
        - prompt: Input text prompt
        - sampling_params: Generation parameters
        - request_id: Unique identifier for request tracking
        - prompt_token_ids: Pre-tokenized input (optional)
        - lora_request: LoRA adapter configuration

        Yields:
        RequestOutput objects as generation progresses
        """

    async def encode(
        self,
        prompt: Optional[str],
        pooling_params: PoolingParams,
        request_id: str,
        prompt_token_ids: Optional[List[int]] = None,
        lora_request: Optional[LoRARequest] = None
    ) -> AsyncIterator[EmbeddingRequestOutput]:
        """
        Generate embeddings asynchronously.

        Parameters:
        - prompt: Input text for embedding
        - pooling_params: Embedding pooling configuration
        - request_id: Unique request identifier
        - prompt_token_ids: Pre-tokenized input (optional)
        - lora_request: LoRA adapter configuration

        Yields:
        EmbeddingRequestOutput with vector representations
        """

    async def abort(self, request_id: str) -> None:
        """
        Cancel a running request.

        Parameters:
        - request_id: ID of request to abort
        """

    async def get_model_config(self) -> ModelConfig:
        """Get the model configuration."""

    async def get_num_unfinished_requests(self) -> int:
        """Get count of active requests."""

    async def check_health(self) -> None:
        """
        Health check for the engine.

        Raises:
        RuntimeError: If engine is unhealthy
        """

    async def add_request(
        self,
        request_id: str,
        prompt: Optional[str],
        params: Union[SamplingParams, PoolingParams],
        arrival_time: Optional[float] = None,
        prompt_token_ids: Optional[List[int]] = None,
        lora_request: Optional[LoRARequest] = None,
        trace_headers: Optional[Dict[str, str]] = None,
        prompt_adapter_request: Optional["PromptAdapterRequest"] = None,
        priority: int = 0
    ) -> AsyncIterator[RequestOutput]:
        """
        Add request and return async generator for results.

        Parameters:
        - request_id: Unique identifier for the request
        - prompt: Input text prompt
        - params: Sampling or pooling parameters
        - arrival_time: Request arrival timestamp
        - prompt_token_ids: Pre-tokenized input
        - lora_request: LoRA adapter configuration
        - trace_headers: Tracing headers
        - prompt_adapter_request: Prompt adapter configuration
        - priority: Request priority level

        Yields:
        RequestOutput objects as processing progresses
        """

    async def get_tokenizer(self, lora_request: Optional[LoRARequest] = None) -> "PreTrainedTokenizerBase":
        """
        Get tokenizer asynchronously with optional LoRA support.

        Parameters:
        - lora_request: Optional LoRA adapter configuration

        Returns:
        Tokenizer instance configured for this model
        """

    async def get_input_preprocessor(self) -> "InputPreprocessor":
        """
        Get input preprocessor for this engine.

        Returns:
        Input preprocessor instance
        """

    async def start_background_loop(self) -> None:
        """Start background loop for processing requests."""

    async def shutdown_background_loop(self) -> None:
        """Shutdown background processing loop."""

    async def get_vllm_config(self) -> "VllmConfig":
        """Get complete vLLM configuration."""

    async def get_parallel_config(self) -> "ParallelConfig":
        """Get parallel execution configuration."""

    async def get_decoding_config(self) -> "DecodingConfig":
        """Get decoding configuration."""

    async def get_scheduler_config(self) -> "SchedulerConfig":
        """Get request scheduler configuration."""

    async def get_lora_config(self) -> Optional["LoRAConfig"]:
        """Get LoRA adapter configuration."""

    async def do_log_stats(
        self,
        scheduler_outputs: Optional[Any] = None,
        model_output: Optional[Any] = None
    ) -> None:
        """
        Log performance statistics.

        Parameters:
        - scheduler_outputs: Optional scheduler output data
        - model_output: Optional model output data
        """

    async def is_tracing_enabled(self) -> bool:
        """Check if request tracing is enabled."""

    async def add_logger(self, logger_name: str, logger: Any) -> None:
        """
        Add custom logger to the engine.

        Parameters:
        - logger_name: Name for the logger
        - logger: Logger instance to add
        """

    async def remove_logger(self, logger_name: str) -> None:
        """
        Remove logger from the engine.

        Parameters:
        - logger_name: Name of logger to remove
        """

    async def start_profile(self) -> None:
        """Start performance profiling for async engine."""

    async def stop_profile(self) -> None:
        """Stop performance profiling and save results."""

    async def reset_mm_cache(self) -> None:
        """Reset multimodal processing cache."""

    async def reset_prefix_cache(self, device: Optional[Union[str, int]] = None) -> None:
        """
        Reset prefix cache for memory optimization.

        Parameters:
        - device: Optional device specification for cache reset
        """

    async def sleep(self, level: int = 1) -> None:
        """
        Put async engine to sleep to free resources.

        Parameters:
        - level: Sleep level (1=light, 2=deep)
        """

    async def wake_up(self, tags: Optional[List[str]] = None) -> None:
        """
        Wake up sleeping async engine.

        Parameters:
        - tags: Optional tags for selective wake-up
        """

    async def is_sleeping(self) -> bool:
        """Check if engine is currently sleeping."""

    async def add_lora(self, lora_request: LoRARequest) -> None:
        """
        Add LoRA adapter to engine.

        Parameters:
        - lora_request: LoRA adapter configuration to add
        """

    async def collective_rpc(
        self,
        method: str,
        timeout: Optional[float] = None,
        args: Tuple[Any, ...] = (),
        kwargs: Optional[Dict[str, Any]] = None
    ) -> List[Any]:
        """
        Execute RPC calls on all workers asynchronously.

        Parameters:
        - method: Method name to call on workers
        - timeout: Optional timeout for RPC calls
        - args: Positional arguments for method
        - kwargs: Keyword arguments for method

        Returns:
        List of results from all workers
        """

    @property
    def is_running(self) -> bool:
        """Check if background loop is running."""

    @property
    def is_stopped(self) -> bool:
        """Check if engine is stopped."""

    @property
    def errored(self) -> bool:
        """Check if engine has errored."""

    @property
    def dead_error(self) -> Optional[BaseException]:
        """Get dead error exception if any."""
```

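The lifecycle and monitoring methods above can be combined into a simple health and utilization probe. A minimal sketch, assuming the `AsyncLLMEngine` API exactly as documented in this section (the model name and sleep level are illustrative):

```python
import asyncio

from vllm import AsyncLLMEngine, AsyncEngineArgs

async def monitor_engine():
    # Model name is illustrative; any supported model works here.
    engine = AsyncLLMEngine.from_engine_args(
        AsyncEngineArgs(model="microsoft/DialoGPT-medium")
    )

    # check_health() raises if the engine is unhealthy.
    try:
        await engine.check_health()
        print("Engine is healthy")
    except RuntimeError as exc:
        print(f"Engine unhealthy: {exc}")
        return

    # Report how many requests are still in flight.
    print(f"Unfinished requests: {await engine.get_num_unfinished_requests()}")

    # Free resources while idle, then resume before serving traffic again.
    await engine.sleep(level=1)
    print(f"Sleeping: {await engine.is_sleeping()}")
    await engine.wake_up()

asyncio.run(monitor_engine())
```

As the docstrings above note, `sleep()` frees resources between bursts of traffic and `wake_up()` restores the engine before new requests arrive.
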
### Streaming Generation

Support for real-time streaming of generated tokens as they are produced, enabling responsive user interfaces and immediate feedback.

```python { .api }
async def stream_generate(
    engine: AsyncLLMEngine,
    prompt: str,
    sampling_params: SamplingParams,
    request_id: str
) -> AsyncIterator[str]:
    """
    Stream generated text tokens in real-time.

    Parameters:
    - engine: AsyncLLMEngine instance
    - prompt: Input prompt
    - sampling_params: Generation configuration
    - request_id: Unique request identifier

    Yields:
    Individual text tokens as they are generated
    """
```

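One way to implement `stream_generate` on top of `AsyncLLMEngine.generate` is to track how much text has already been emitted and yield only the new suffix. A minimal sketch, assuming each `RequestOutput` carries the cumulative generated text in `output.outputs[0].text`, as in the usage examples below:

```python
from typing import AsyncIterator

from vllm import AsyncLLMEngine, SamplingParams

async def stream_generate(
    engine: AsyncLLMEngine,
    prompt: str,
    sampling_params: SamplingParams,
    request_id: str
) -> AsyncIterator[str]:
    # Each RequestOutput carries the full text generated so far;
    # yield only the portion that has not been emitted yet.
    emitted = 0
    async for output in engine.generate(prompt, sampling_params, request_id):
        if not output.outputs:
            continue
        text = output.outputs[0].text
        if len(text) > emitted:
            yield text[emitted:]
            emitted = len(text)
```
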
### Concurrent Request Processing

Handle multiple requests simultaneously with intelligent batching and resource management. An implementation sketch follows the interface below.

```python { .api }
class RequestManager:
    async def add_request(
        self,
        request_id: str,
        prompt: str,
        sampling_params: SamplingParams,
        prompt_token_ids: Optional[List[int]] = None,
        arrival_time: Optional[float] = None,
        lora_request: Optional[LoRARequest] = None
    ) -> None:
        """Add new request to processing queue."""

    async def abort_request(self, request_id: str) -> None:
        """Cancel pending or running request."""

    async def get_request_status(self, request_id: str) -> RequestStatus:
        """Check status of specific request."""
```

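A minimal sketch of how such a manager might wrap `AsyncLLMEngine`, tracking per-request state with `asyncio` tasks and the `RequestStatus` enum from the Types section; the internal structure is an assumption for illustration, not the library's implementation:

```python
import asyncio
from enum import Enum
from typing import Dict, List, Optional

from vllm import AsyncLLMEngine, SamplingParams
from vllm.lora.request import LoRARequest

class RequestStatus(str, Enum):  # mirrors the enum in the Types section below
    WAITING = "waiting"
    RUNNING = "running"
    COMPLETED = "completed"
    CANCELLED = "cancelled"
    FAILED = "failed"

class RequestManager:
    """Sketch: tracks requests submitted to an AsyncLLMEngine."""

    def __init__(self, engine: AsyncLLMEngine):
        self.engine = engine
        self._status: Dict[str, RequestStatus] = {}
        self._tasks: Dict[str, asyncio.Task] = {}

    async def add_request(
        self,
        request_id: str,
        prompt: str,
        sampling_params: SamplingParams,
        prompt_token_ids: Optional[List[int]] = None,
        arrival_time: Optional[float] = None,
        lora_request: Optional[LoRARequest] = None
    ) -> None:
        """Add new request and track it as a background task."""
        # prompt_token_ids and arrival_time are accepted only to match the
        # interface above; this sketch forwards just the prompt and parameters.
        self._status[request_id] = RequestStatus.WAITING
        self._tasks[request_id] = asyncio.create_task(
            self._run(request_id, prompt, sampling_params, lora_request)
        )

    async def _run(self, request_id, prompt, sampling_params, lora_request):
        self._status[request_id] = RequestStatus.RUNNING
        try:
            async for _ in self.engine.generate(
                prompt, sampling_params, request_id, lora_request=lora_request
            ):
                pass  # results could be buffered or forwarded here
            self._status[request_id] = RequestStatus.COMPLETED
        except asyncio.CancelledError:
            self._status[request_id] = RequestStatus.CANCELLED
            raise
        except Exception:
            self._status[request_id] = RequestStatus.FAILED

    async def abort_request(self, request_id: str) -> None:
        """Cancel pending or running request."""
        await self.engine.abort(request_id)
        task = self._tasks.get(request_id)
        if task is not None:
            task.cancel()

    async def get_request_status(self, request_id: str) -> RequestStatus:
        """Check status of specific request."""
        return self._status[request_id]
```
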
## Usage Examples

### Basic Async Generation

```python
import asyncio
from vllm import AsyncLLMEngine, AsyncEngineArgs, SamplingParams

async def main():
    # Initialize async engine
    engine_args = AsyncEngineArgs(
        model="microsoft/DialoGPT-medium",
        tensor_parallel_size=1
    )
    engine = AsyncLLMEngine.from_engine_args(engine_args)

    # Generate text asynchronously
    sampling_params = SamplingParams(temperature=0.8, max_tokens=100)
    request_id = "req-001"

    results = []
    async for output in engine.generate("Hello world", sampling_params, request_id):
        results.append(output)
        print(f"Generated: {output.outputs[0].text}")

    return results

# Run the async function
asyncio.run(main())
```

### Streaming Generation

```python
import asyncio
from vllm import AsyncLLMEngine, AsyncEngineArgs, SamplingParams

async def stream_text():
    engine_args = AsyncEngineArgs(model="microsoft/DialoGPT-medium")
    engine = AsyncLLMEngine.from_engine_args(engine_args)

    # generate() already streams outputs; no extra flag is needed
    sampling_params = SamplingParams(
        temperature=0.8,
        max_tokens=150
    )

    request_id = "stream-001"
    prompt = "Tell me a story about"

    print("Streaming generation:")
    printed_len = 0
    async for output in engine.generate(prompt, sampling_params, request_id):
        if output.outputs:
            # Each output carries the full text so far; print only the new delta
            text = output.outputs[0].text
            print(text[printed_len:], end="", flush=True)
            printed_len = len(text)

    print("\nGeneration complete!")

asyncio.run(stream_text())
```

### Concurrent Multiple Requests

```python
import asyncio
from vllm import AsyncLLMEngine, AsyncEngineArgs, SamplingParams

async def process_request(engine, prompt, request_id):
    sampling_params = SamplingParams(temperature=0.7, max_tokens=100)

    results = []
    async for output in engine.generate(prompt, sampling_params, request_id):
        results.append(output)

    return request_id, results[-1].outputs[0].text

async def concurrent_generation():
    engine_args = AsyncEngineArgs(
        model="microsoft/DialoGPT-medium",
        max_num_seqs=10  # Support up to 10 concurrent requests
    )
    engine = AsyncLLMEngine.from_engine_args(engine_args)

    # Process multiple requests concurrently
    prompts = [
        "The future of AI is",
        "Once upon a time",
        "In a galaxy far away",
        "The secret to happiness"
    ]

    # Create concurrent tasks
    tasks = [
        process_request(engine, prompt, f"req-{i}")
        for i, prompt in enumerate(prompts)
    ]

    # Wait for all requests to complete
    results = await asyncio.gather(*tasks)

    for request_id, generated_text in results:
        print(f"{request_id}: {generated_text}")

asyncio.run(concurrent_generation())
```

### Request Management and Cancellation

```python
import asyncio
from vllm import AsyncLLMEngine, AsyncEngineArgs, SamplingParams

async def request_with_timeout():
    engine_args = AsyncEngineArgs(model="microsoft/DialoGPT-medium")
    engine = AsyncLLMEngine.from_engine_args(engine_args)

    sampling_params = SamplingParams(
        temperature=0.8,
        max_tokens=500  # Long generation
    )

    request_id = "timeout-test"

    async def generate_text():
        # Consume the stream inside a coroutine so the whole
        # generation can be wrapped in a single awaitable.
        results = []
        async for output in engine.generate(
            "Write a very long story",
            sampling_params,
            request_id
        ):
            print(f"Generated so far: {len(output.outputs[0].text)} chars")
            results.append(output)
        return results

    try:
        # Set a timeout for the whole generation
        results = await asyncio.wait_for(generate_text(), timeout=5.0)

    except asyncio.TimeoutError:
        print("Generation timed out, cancelling request...")
        await engine.abort(request_id)
        print("Request cancelled successfully")

asyncio.run(request_with_timeout())
```

### Integration with Web Framework

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from vllm import AsyncLLMEngine, AsyncEngineArgs, SamplingParams
import json
import uuid

app = FastAPI()

# Initialize engine at startup
engine_args = AsyncEngineArgs(model="microsoft/DialoGPT-medium")
engine = AsyncLLMEngine.from_engine_args(engine_args)

@app.post("/generate")
async def generate_text(prompt: str, max_tokens: int = 100):
    request_id = str(uuid.uuid4())
    sampling_params = SamplingParams(
        temperature=0.8,
        max_tokens=max_tokens
    )

    async def generate_stream():
        async for output in engine.generate(prompt, sampling_params, request_id):
            yield f"data: {json.dumps({'text': output.outputs[0].text})}\n\n"

    return StreamingResponse(
        generate_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            # Expose the request id so clients can cancel via /cancel/{request_id}
            "X-Request-Id": request_id
        }
    )

@app.delete("/cancel/{request_id}")
async def cancel_request(request_id: str):
    await engine.abort(request_id)
    return {"message": "Request cancelled"}
```

## Types

```python { .api }
class RequestStatus(str, Enum):
    WAITING = "waiting"
    RUNNING = "running"
    COMPLETED = "completed"
    CANCELLED = "cancelled"
    FAILED = "failed"

class AsyncEngineArgs(EngineArgs):
    # Inherits all EngineArgs fields plus:
    worker_use_ray: bool = False
    engine_use_ray: bool = False
    disable_log_requests: bool = False
    max_log_len: Optional[int] = None

class StreamingRequest:
    request_id: str
    prompt: str
    sampling_params: SamplingParams
    arrival_time: float
    lora_request: Optional[LoRARequest]
```
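
A brief sketch of constructing `AsyncEngineArgs` with the async-specific fields listed above; the model name and values are illustrative:

```python
from vllm import AsyncEngineArgs, AsyncLLMEngine

engine_args = AsyncEngineArgs(
    model="microsoft/DialoGPT-medium",  # illustrative model name
    tensor_parallel_size=1,             # inherited from EngineArgs
    disable_log_requests=True,          # suppress per-request logging
    max_log_len=256                     # truncate logged prompts
)
engine = AsyncLLMEngine.from_engine_args(engine_args)
```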